teleo-infrastructure/diagnostics/claims_api.py
m3taversal 61007042bc fix(claims): proper-prefix slug fallback + event-loop unblock
Ganymede review 2026-05-11 — three issues addressed.

MUST-FIX — prefix fallback broken in both directions:
  Old code used common-prefix matching with a 32-char anchor. This admitted
  two failure modes:

  1. False-positive: stems "X-A" and "X-B" (sharing a 50+ char prefix) both
     pass the threshold for a request "X-C-something". The loop picks whichever
     iterates first; dict iteration order follows filesystem walk order, so
     which claim gets served is non-deterministic. Two instances with
     identical data could disagree.

  2. False-negative: a 24-char stem that is a proper prefix of a longer
     request never reaches the 32-char anchor. The endpoint returns 404
     despite the correct match sitting right there in by_stem.

  Fix: require norm_req.startswith(norm_stem). Proper prefix is much stronger
  than common prefix — drop the anchor to 16 chars without admitting noise.
  Pull to module constant _PREFIX_ANCHOR_MIN.

  Verified against real KB collisions (semaglutide pair, liquidity-weighted-price
  pair, attractor-digital-feudalism short-stem case):
    - Common-prefix XYZZY collision: 200 -> 404 (correct)
    - Proper-prefix match: resolves to shorter B stem (correct, deterministic)
    - 27-char proper-prefix request: 404 -> 200 (correct)
    - All 4 of yesterday's long-slug repros: still 200

WARNING — _build_indexes blocks event loop for ~3.3s on cold cache:
  Routed through asyncio.to_thread. Warm-cache overhead negligible (dict
  access), cold-cache concurrent requests no longer stall.

NITS:
  - _split_frontmatter catches yaml.YAMLError specifically, logs WARNING
    with file path. Bare Exception was hiding KB integrity drift.
  - _INDEX_CACHE_TTL bumped 60s -> 300s to match commit-message intent.
  - _PREFIX_ANCHOR_MIN pulled to module constant with calibration comment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 13:12:49 +01:00

582 lines
22 KiB
Python

"""Claims API — list endpoint + canonical claim detail page.
Owner: Argus
Routes:
GET /api/claims — list/filter (frontmatter scan, lightweight)
GET /api/claims/{slug} — full claim detail (Ship contract)
GET /api/domains — domain rollups for sidebar
The detail endpoint is the canonical /claims/{slug} backend per Ship's
2026-04-29 brief. One round-trip, no N+1 cascade. Wikilinks resolved
server-side via title→slug index built from a tree walk.
"""
import asyncio
import json
import logging
import re
import sqlite3
import threading
import time
from pathlib import Path

import yaml
from aiohttp import web
# Module logger; records are emitted under the "argus.claims" name.
logger = logging.getLogger("argus.claims")
# Codex tree roots — claims live in three places (Sourcer Apr 26 fix scope).
# Index entries store paths relative to CODEX_BASE, and the detail handler
# joins them back onto it to locate the claim file.
CODEX_BASE = Path("/opt/teleo-eval/workspaces/main")
CLAIM_TREES = [CODEX_BASE / "domains", CODEX_BASE / "foundations", CODEX_BASE / "core"]
# pipeline.db for joins (review_records, prs, sources). The detail handler
# opens it read-only, one connection per request.
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
# In-process caches. "ts" is the time.time() value recorded when the cache
# was last filled; together with the None payload it marks "never filled".
# _list_cache["data"] is the list of list-view entries.
_list_cache = {"data": None, "ts": 0}
_LIST_CACHE_TTL = 300 # 5 min — list view tolerates staleness
# _index_cache["by_title"] maps normalized title/stem → file stem;
# _index_cache["by_stem"] maps file stem → path relative to CODEX_BASE.
_index_cache = {"by_title": None, "by_stem": None, "ts": 0}
_INDEX_CACHE_TTL = 300 # 5 min — title→slug index for wikilink resolution
# Minimum normalized-stem length for prefix-fallback resolution.
# Stems shorter than this are too generic to be unambiguous in the prefix
# space (e.g. a "rio" stem would match any request starting with "rio").
# Proper-prefix matching is much stronger than common-prefix at preventing
# spurious hits, so this can be lower than the original common-prefix anchor.
_PREFIX_ANCHOR_MIN = 16
# Sent on every JSON response from this module (any origin may read).
CORS_HEADERS = {"Access-Control-Allow-Origin": "*"}
# Wikilink pattern. [[text]] or [[text|alias]] — we keep the link text only.
# A "#anchor" suffix is tolerated the same way as "|alias": group 1 is the
# text before the first "|" or "#".
_WIKILINK_RE = re.compile(r"\[\[([^\]|#]+?)(?:[#|][^\]]*)?\]\]")
# ─── Normalization ─────────────────────────────────────────────────────────
def _normalize_for_match(s):
    """Collapse a title or slug to a comparable form.

    Rules (from Ship's brief — match the link-fixer canonicalization):
      - lowercase
      - hyphens and underscores are treated as spaces
      - every character that is neither a word character nor whitespace is
        dropped (not only trailing punctuation); ``\\w`` is Unicode-aware, so
        non-ASCII letters and digits are kept
      - runs of whitespace collapse to one space, leading/trailing whitespace
        is stripped

    Whitespace is collapsed *after* punctuation is removed. Doing it the
    other way round leaves a double space wherever a standalone punctuation
    mark sat between two words ("a & b" -> "a  b"), and such a title can then
    never match its own slug ("a-b" -> "a b").

    NOTE: lib/attribution.py exposes only normalize_handle today, not the
    title normalizer Ship referenced. Implementing inline; if a canonical
    helper lands later we point at it.

    Args:
        s: Title, slug or wikilink text. Any falsy value yields "".

    Returns:
        The normalized string (possibly empty).
    """
    if not s:
        return ""
    s = str(s).lower()
    # Hyphens and underscores are word separators in slugs.
    s = s.replace("-", " ").replace("_", " ")
    # Drop punctuation that filenames lose (".", "?", "!", ":", "&", dashes...).
    s = re.sub(r"[^\w\s]", "", s)
    # Collapse last, so gaps left by removed punctuation are squeezed too.
    s = re.sub(r"\s+", " ", s)
    return s.strip()
# ─── Frontmatter parse ─────────────────────────────────────────────────────
# Matches a file whose entire content is wrapped in one ```markdown / ```md /
# bare ``` fence; group 1 is the wrapped content.
_CODE_FENCE_WRAPPER_RE = re.compile(r"^\s*```(?:markdown|md)?\s*\n(.*?)\n```\s*$", re.DOTALL)


def _split_frontmatter(text, filepath=None):
    """Split a claim file into its YAML frontmatter and markdown body.

    A file wrapped in a single top-level code fence is unwrapped once before
    the frontmatter is looked for — some agents have produced such files.

    Args:
        text: Full file content.
        filepath: Only used to label the warning logged on a YAML failure.

    Returns:
        (frontmatter_dict, body_str), or (None, None) when the text is empty,
        does not open with ``---``, has no closing ``---`` line, fails to
        parse as YAML, or parses to something other than a mapping. YAML
        failures are logged at WARNING so broken files surface in the logs
        rather than silently turning into 404s.
    """
    if not text:
        return None, None

    fenced = _CODE_FENCE_WRAPPER_RE.match(text)
    content = (fenced.group(1) if fenced else text).lstrip()
    if not content.startswith("---"):
        return None, None

    # Start the search after the opening "---" so it cannot match itself.
    close_at = content.find("\n---", 3)
    if close_at == -1:
        return None, None

    try:
        parsed = yaml.safe_load(content[3:close_at])
    except yaml.YAMLError as e:
        logger.warning("YAML parse failed in %s: %s", filepath or "<unknown>", e)
        return None, None

    if not isinstance(parsed, dict):
        return None, None
    # Skip the 4 characters of "\n---" to reach the body.
    return parsed, content[close_at + 4:].lstrip()
def _read_claim_file(filepath):
    """Load a claim file and split it into (frontmatter, body).

    Args:
        filepath: Path-like object exposing ``read_text``.

    Returns:
        Whatever ``_split_frontmatter`` returns for the file's text, or
        (None, None) when the file cannot be read or is not valid UTF-8.
    """
    try:
        raw = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Unreadable file is treated exactly like "not a claim file".
        return None, None
    else:
        return _split_frontmatter(raw, filepath)
# ─── Tree walk + indexing ──────────────────────────────────────────────────
def _walk_claim_files():
    """Yield Path objects for every .md claim file in domains/, foundations/, core/.

    Roots that do not exist are skipped, as are ``_map.md`` files. Within
    each root the files are yielded in sorted path order. The directory
    listing order behind ``rglob`` is not guaranteed, and the index built
    from this walk resolves duplicate stems/titles by encounter order, so an
    unsorted walk could make two instances with identical data resolve the
    same slug differently.
    """
    for root in CLAIM_TREES:
        if not root.exists():
            continue
        for f in sorted(root.rglob("*.md")):
            if f.name == "_map.md":
                continue
            yield f
# Serializes index rebuilds. The detail handler calls _build_indexes through
# asyncio.to_thread, so several worker threads can hit a cold cache at once;
# without the lock each of them would run its own full tree walk.
_index_build_lock = threading.Lock()


def _build_indexes():
    """Build (title→stem, stem→relpath) indexes for wikilink resolution.

    The indexes come from a filesystem walk over the claim trees and are
    cached in ``_index_cache`` for ``_INDEX_CACHE_TTL`` seconds.

    by_stem maps each file stem to its path relative to CODEX_BASE.
    by_title maps normalized text to a stem: every stem is indexed under its
    own normalized form (this always overwrites), and a frontmatter ``title``
    is added only when its normalized form is not already taken.

    Rebuilds are single-flight: the freshness check is repeated after the
    lock is acquired, so threads that waited behind a rebuild return the
    fresh result instead of walking again.

    Returns:
        (by_title, by_stem). The dicts are the cached objects themselves and
        must be treated as read-only by callers.
    """
    def _fresh():
        # by_stem is always published before by_title (see below), so a
        # non-None by_title implies by_stem is populated as well.
        cached_titles = _index_cache["by_title"]
        if cached_titles is not None and time.time() - _index_cache["ts"] < _INDEX_CACHE_TTL:
            return cached_titles, _index_cache["by_stem"]
        return None

    cached = _fresh()
    if cached is not None:
        return cached

    with _index_build_lock:
        # Another thread may have finished a rebuild while we were waiting.
        cached = _fresh()
        if cached is not None:
            return cached

        by_title = {}
        by_stem = {}
        for f in _walk_claim_files():
            stem = f.stem
            by_stem[stem] = str(f.relative_to(CODEX_BASE))
            # Index by stem-as-normalized too (covers wikilinks that use the slug)
            by_title[_normalize_for_match(stem)] = stem
            # Also try parsing the title from frontmatter for higher-fidelity matches
            fm, _ = _read_claim_file(f)
            if fm:
                title = fm.get("title")
                if title:
                    key = _normalize_for_match(title)
                    if key and key not in by_title:
                        by_title[key] = stem

        # Publish fully-built dicts; the timestamp goes last so the lock-free
        # fast path never reports a half-written cache as fresh. The TTL is
        # counted from the end of the walk.
        _index_cache["by_stem"] = by_stem
        _index_cache["by_title"] = by_title
        _index_cache["ts"] = time.time()
        return by_title, by_stem
def _resolve_wikilinks(body, by_title):
    """Map each distinct [[wikilink]] text in *body* to a claim stem.

    Args:
        body: Markdown body; None is treated as empty.
        by_title: Normalized title/stem → stem index.

    Returns:
        Dict keyed by the stripped link text, in first-occurrence order. The
        value is the resolved stem, or None when the index has no match.
    """
    resolved = {}
    for found in _WIKILINK_RE.finditer(body or ""):
        label = found.group(1).strip()
        # Skip empty captures and links already handled.
        if label and label not in resolved:
            resolved[label] = by_title.get(_normalize_for_match(label))
    return resolved
# ─── Edge extraction from frontmatter ──────────────────────────────────────
# Frontmatter key → edge bucket in the detail response. Several keys may feed
# the same bucket.
_EDGE_FIELDS = {
    "supports": "supports",
    "challenges": "challenges",
    "challenged_by": "challenges",  # canonical: store as challenges direction
    "related": "related",
    "related_claims": "related",
    "depends_on": "depends_on",
}


def _extract_edges(fm, by_title, by_stem):
    """Return edges dict shaped per Ship's contract.

    Each edge is {slug, title, exists}. The slug is resolved through the
    title index first and, failing that, by an exact stem match.

    Args:
        fm: Claim frontmatter dict.
        by_title: Normalized title/stem → stem index.
        by_stem: Stem → relative path index.

    Returns:
        Dict with keys supports / challenges / related / depends_on, each a
        list of edges in frontmatter order. A field holding a single string
        is treated as a one-item list; non-string items are skipped.
    """
    edges = {"supports": [], "challenges": [], "related": [], "depends_on": []}
    for fm_key, edge_kind in _EDGE_FIELDS.items():
        raw = fm.get(fm_key)
        if not raw:
            continue
        items = raw if isinstance(raw, list) else [raw]
        for item in items:
            if not isinstance(item, str):
                continue
            # Cut pipe annotations first: "[[link|alias]]" style or
            # "claim | edge_type | date". Splitting before the bracket strip
            # matters for "[[claim]] | supports | date": the closing "]]" is
            # then at the end of the kept part and is actually removed,
            # instead of surviving as "claim]]".
            text = item.split("|")[0].strip()
            # Strip wikilink wrapping if present
            text = re.sub(r"^\[\[|\]\]$", "", text).strip()
            if not text:
                continue
            # Try title match first, fall back to stem match
            slug = by_title.get(_normalize_for_match(text))
            if not slug and text in by_stem:
                slug = text
            edges[edge_kind].append({
                "slug": slug,
                "title": text,
                "exists": slug is not None,
            })
    return edges
# ─── Source provenance ─────────────────────────────────────────────────────
def _resolve_sourced_from(conn, claim_filepath, fm, title, stem):
    """Build sourced_from list for the claim.

    Strategy: find PRs whose description contains the title or whose branch
    contains the stem, take their distinct non-empty ``source_path`` values
    (at most 10) and resolve each through ``_resolve_source_file``. If that
    yields nothing, fall back to the raw ``source`` string from the claim's
    own frontmatter, truncated to 200 characters.

    Both `title` and `stem` must be non-empty — caller (handler) already
    falls back stem→title; passing empty values would leak `LIKE '%%'`
    and match unrelated PRs. For the same reason LIKE wildcards inside the
    values ("%" and "_") are escaped, so a title such as "100% growth" is
    matched literally instead of as "100<anything> growth".

    Args:
        conn: sqlite3 connection whose rows support access by column name.
        claim_filepath: Unused here; kept for interface compatibility.
        fm: Claim frontmatter dict.
        title: Claim title.
        stem: Claim file stem.

    Returns:
        List of {"path", "title", "url"} dicts; empty when nothing is known.
    """
    def _contains_pattern(value):
        # Backslash is the ESCAPE character, so it is escaped first.
        escaped = value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return f"%{escaped}%"

    out = []
    seen_paths = set()
    pr_rows = []
    if (title or "").strip() and (stem or "").strip():
        try:
            pr_rows = conn.execute(
                """SELECT DISTINCT source_path
                   FROM prs
                   WHERE source_path IS NOT NULL AND source_path != ''
                     AND (description LIKE ? ESCAPE '\\' OR branch LIKE ? ESCAPE '\\')
                   LIMIT 10""",
                (_contains_pattern(title), _contains_pattern(stem)),
            ).fetchall()
        except sqlite3.OperationalError:
            # Best-effort: a failing query degrades to the frontmatter fallback.
            pr_rows = []
    for row in pr_rows:
        path = row["source_path"]
        if not path or path in seen_paths:
            continue
        seen_paths.add(path)
        out.append(_resolve_source_file(path))
    # Fallback: parse raw source frontmatter field if no PR match
    if not out:
        raw = fm.get("source")
        if isinstance(raw, str) and raw.strip():
            out.append({"path": None, "title": raw.strip()[:200], "url": None})
    return out
def _resolve_source_file(rel_path):
    """Describe a source file by path, title and url. Best-effort.

    Args:
        rel_path: Source path relative to CODEX_BASE (e.g. inbox/archive/...).

    Returns:
        {"path": rel_path, "title": ..., "url": ...}. When the file exists
        and has frontmatter, the title is its ``title``, else its ``source``,
        else rel_path, and the url is its ``url``. Otherwise the title is
        derived from the filename (hyphens become spaces) and url is None.
    """
    title = None
    url = None
    candidate = CODEX_BASE / rel_path
    if candidate.exists():
        meta, _body = _read_claim_file(candidate)
        if meta:
            title = meta.get("title") or meta.get("source") or rel_path
            url = meta.get("url")
    if not title:
        # Last resort: derive from filename
        title = Path(rel_path).stem.replace("-", " ")
    return {"path": rel_path, "title": title, "url": url}
# ─── Reviews + PRs ─────────────────────────────────────────────────────────
def _load_pr_history(conn, title, stem):
    """Find PRs that touched this claim and their reviews.

    A PR matches when it is merged and its description contains the title or
    its branch contains the stem. Both values are matched literally: LIKE
    wildcards ("%" and "_") inside them are escaped, otherwise a title such
    as "100% growth" would also match "100 units of growth".

    Both title and stem must be non-empty strings — empty leaks `LIKE '%%'`
    which matches every PR. Handler already populates a fallback so this
    is a defense-in-depth guard.

    Args:
        conn: sqlite3 connection whose rows support access by column name.
        title: Claim title.
        stem: Claim file stem.

    Returns:
        (prs, reviews). prs holds up to 50 dicts (number, merged_at, kind,
        agent, branch) ordered by merged_at ascending; reviews holds the
        review records of those PRs ordered by reviewed_at ascending.
        An sqlite3.OperationalError on the PR query yields ([], []); on the
        review query it yields (prs, []).
    """
    if not (title or "").strip() or not (stem or "").strip():
        return [], []

    def _contains_pattern(value):
        # Backslash is the ESCAPE character, so it is escaped first.
        escaped = value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return f"%{escaped}%"

    try:
        pr_rows = conn.execute(
            """SELECT number, merged_at, commit_type, agent, branch, status
               FROM prs
               WHERE merged_at IS NOT NULL
                 AND (description LIKE ? ESCAPE '\\' OR branch LIKE ? ESCAPE '\\')
               ORDER BY merged_at ASC
               LIMIT 50""",
            (_contains_pattern(title), _contains_pattern(stem)),
        ).fetchall()
    except sqlite3.OperationalError:
        return [], []
    prs = [
        {
            "number": r["number"],
            "merged_at": r["merged_at"],
            "kind": r["commit_type"] or "unknown",
            "agent": r["agent"],
            "branch": r["branch"],
        }
        for r in pr_rows
    ]
    pr_numbers = [p["number"] for p in prs]
    if not pr_numbers:
        return prs, []
    # Only "?" markers are interpolated into the SQL text; the PR numbers
    # themselves are bound as parameters.
    placeholders = ",".join("?" * len(pr_numbers))
    try:
        review_rows = conn.execute(
            f"""SELECT pr_number, reviewer, reviewer_model, outcome,
                       rejection_reason, notes, reviewed_at
                FROM review_records
                WHERE pr_number IN ({placeholders})
                ORDER BY reviewed_at ASC""",
            pr_numbers,
        ).fetchall()
    except sqlite3.OperationalError:
        review_rows = []
    reviews = [
        {
            "pr_number": r["pr_number"],
            "reviewer": r["reviewer"],
            "model": r["reviewer_model"],
            "outcome": r["outcome"],
            "rejection_reason": r["rejection_reason"],
            "notes": r["notes"],
            "reviewed_at": r["reviewed_at"],
        }
        for r in review_rows
    ]
    return prs, reviews
# ─── List view (preserved) ─────────────────────────────────────────────────
def _parse_list_entry(filepath):
    """Build the list-view entry for one claim file.

    Args:
        filepath: Path of the markdown file.

    Returns:
        A dict of list-view fields, or None when the file has no frontmatter
        or its ``type`` is not "claim".

        ``title``, ``domain`` and ``confidence`` are always non-empty
        strings. A key that is present but empty (``title:`` parses to None)
        or non-string (``title: 2024``) previously flowed through unchanged
        and broke ``.lower()`` and sorting in the list handler. Missing or
        empty values fall back to the de-hyphenated stem / "unknown",
        matching the ``or`` fallback the detail handler uses for the title.
        ``summary`` is the first non-heading paragraph, capped at 300 chars.
    """
    fm, body = _read_claim_file(filepath)
    if not fm or fm.get("type") != "claim":
        return None

    def _text(key, fallback):
        value = fm.get(key)
        return fallback if value is None or value == "" else str(value)

    body = body or ""
    links = _WIKILINK_RE.findall(body)
    paragraphs = [p.strip() for p in body.split("\n\n")
                  if p.strip() and not p.strip().startswith("#")]
    summary = paragraphs[0][:300] if paragraphs else ""
    source = fm.get("source")
    return {
        "slug": filepath.stem,
        "title": _text("title", filepath.stem.replace("-", " ")),
        "domain": _text("domain", "unknown"),
        "confidence": _text("confidence", "unknown"),
        "agent": fm.get("agent"),
        "scope": fm.get("scope"),
        "created": str(fm.get("created", "")),
        # Structured (non-string) source values are not shown in the list view.
        "source": source if isinstance(source, str) else "",
        "sourcer": fm.get("sourcer", ""),
        "wiki_link_count": len(links),
        "summary": summary,
        "challenged_by": fm.get("challenged_by"),
        "related_claims": fm.get("related_claims", []),
    }
def _load_all_claims_list():
    """Return list-view entries for every claim, cached for _LIST_CACHE_TTL.

    The cache counts as filled once ``data`` is not None, so an empty result
    is cached like any other (the same ``is not None`` test the index cache
    uses) instead of triggering a fresh tree walk on every call.

    Returns:
        The cached list object itself. It is shared between callers, so
        callers must not mutate it (sort or filter into a copy instead).
    """
    now = time.time()
    cached = _list_cache["data"]
    if cached is not None and now - _list_cache["ts"] < _LIST_CACHE_TTL:
        return cached
    # _parse_list_entry returns None for files that are not claims.
    claims = [entry for entry in map(_parse_list_entry, _walk_claim_files()) if entry]
    _list_cache["data"] = claims
    _list_cache["ts"] = now
    return claims
# ─── Handlers ──────────────────────────────────────────────────────────────
async def handle_claims(request):
    """GET /api/claims — filtered, sorted, paginated claim list.

    Query parameters:
        domain, confidence, agent: exact-match filters.
        q: case-insensitive substring search over title and summary.
        sort: "recent" (default, created descending), "alpha" or "domain";
            any other value leaves the cached order untouched.
        limit: page size, default 50, clamped to 0..200.
        offset: start index, default 0, negative values clamped to 0.

    Returns:
        JSON with the page plus ``total`` (after filtering) and the
        ``domains`` / ``confidence_levels`` / ``agents`` facets, which are
        computed over all claims rather than the filtered set. A non-integer
        ``limit`` or ``offset`` yields a 400 instead of an unhandled
        ValueError.
    """
    query = request.query
    try:
        limit = int(query.get("limit", "50"))
        offset = int(query.get("offset", "0"))
    except ValueError:
        return web.json_response({"error": "limit and offset must be integers"},
                                 status=400, headers=CORS_HEADERS)
    # Negative values would otherwise turn into from-the-end slices.
    limit = max(0, min(limit, 200))
    offset = max(0, offset)

    # A cold list cache means a full tree walk of sync I/O; keep it off the
    # event loop, as the detail handler does for the index build.
    claims = await asyncio.to_thread(_load_all_claims_list)

    domain = query.get("domain")
    search = query.get("q", "").lower()
    confidence = query.get("confidence")
    agent = query.get("agent")
    sort = query.get("sort", "recent")

    filtered = claims
    if domain:
        filtered = [c for c in filtered if c["domain"] == domain]
    if confidence:
        filtered = [c for c in filtered if c["confidence"] == confidence]
    if agent:
        filtered = [c for c in filtered if c["agent"] == agent]
    if search:
        filtered = [c for c in filtered
                    if search in c["title"].lower() or search in c["summary"].lower()]

    # sorted() rather than .sort(): with no filters `filtered` is the shared
    # cached list, and sorting it in place would reorder the cache for every
    # later request.
    if sort == "recent":
        filtered = sorted(filtered, key=lambda c: c["created"], reverse=True)
    elif sort == "alpha":
        filtered = sorted(filtered, key=lambda c: c["title"].lower())
    elif sort == "domain":
        filtered = sorted(filtered, key=lambda c: (c["domain"], c["title"].lower()))

    page = filtered[offset:offset + limit]
    domain_counts = {}
    for c in claims:
        domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1
    return web.json_response({
        "claims": page,
        "total": len(filtered),
        "offset": offset,
        "limit": limit,
        "domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
        "confidence_levels": sorted(set(c["confidence"] for c in claims)),
        "agents": sorted(set(c["agent"] for c in claims if c["agent"])),
    }, headers=CORS_HEADERS)
def _resolve_claim_slug(requested_slug, by_title, by_stem):
    """Map a requested slug onto an indexed claim stem.

    Resolution order:
      1. exact stem match;
      2. normalized lookup in the title index (covers slugified frontmatter
         titles and hyphen/underscore/case variants of a stem);
      3. proper-prefix fallback: the longest indexed stem whose normalized
         form is a prefix of the normalized request, i.e. the request is
         LONGER than the on-disk stem. Stems shorter than _PREFIX_ANCHOR_MIN
         (normalized) are ignored as too generic. Ties on length go to the
         lexicographically smallest stem, so the result does not depend on
         dict iteration order.

    Returns:
        (stem, rel_path). rel_path is None when nothing matched, in which
        case stem is the requested slug unchanged.
    """
    rel_path = by_stem.get(requested_slug)
    if rel_path:
        return requested_slug, rel_path

    norm_req = _normalize_for_match(requested_slug)

    resolved_stem = by_title.get(norm_req)
    if resolved_stem:
        rel_path = by_stem.get(resolved_stem)
        if rel_path:
            return resolved_stem, rel_path

    best_stem = None
    best_len = 0
    for stem in by_stem:
        norm_stem = _normalize_for_match(stem)
        stem_len = len(norm_stem)
        if stem_len < _PREFIX_ANCHOR_MIN:
            continue  # too generic in prefix space
        if not norm_req.startswith(norm_stem):
            continue
        if stem_len > best_len or (stem_len == best_len and stem < best_stem):
            best_stem = stem
            best_len = stem_len
    if best_stem:
        return best_stem, by_stem.get(best_stem)
    return requested_slug, None


def _load_claim_db_context(filepath, fm, title, slug):
    """Run the pipeline.db lookups for one claim on a read-only connection.

    Synchronous; meant to be run in a worker thread. The connection is
    opened, used and closed inside this one call, so it never crosses
    threads.

    Returns:
        (prs, reviews, sourced_from).
    """
    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    try:
        prs, reviews = _load_pr_history(conn, title, slug)
        sourced_from = _resolve_sourced_from(conn, filepath, fm, title, slug)
    finally:
        conn.close()
    return prs, reviews, sourced_from


async def handle_claim_detail(request):
    """GET /api/claims/{slug} — canonical claim detail page (Ship contract).

    One round-trip, all data resolved server-side. Wikilinks pre-resolved.

    Responds 404 when the slug cannot be resolved to an indexed file, or
    when the resolved file has no parseable frontmatter (the body then
    carries "reason": "file_no_frontmatter").
    """
    requested_slug = request.match_info["slug"]
    # Cold-cache rebuild walks ~1,900 files (~3.3s of sync I/O). Route through
    # to_thread so the aiohttp event loop stays responsive while the index
    # rebuilds. Warm-cache cost is a dict access, to_thread overhead
    # negligible. Ganymede review 2026-05-11.
    by_title, by_stem = await asyncio.to_thread(_build_indexes)

    slug, rel_path = _resolve_claim_slug(requested_slug, by_title, by_stem)
    if not rel_path:
        return web.json_response({"error": "claim not found", "slug": requested_slug},
                                 status=404, headers=CORS_HEADERS)

    filepath = CODEX_BASE / rel_path
    # File read and the sqlite queries below are blocking I/O too; keep them
    # off the event loop for the same reason as the index build.
    fm, body = await asyncio.to_thread(_read_claim_file, filepath)
    if not fm:
        # File exists at this stem but has no parseable frontmatter — almost
        # always a stray enrichment fragment that landed in domains/ without
        # being merged into a parent claim. Surfacing as 404 (no claim here)
        # not 500: the caller can't act on it differently anyway.
        return web.json_response({"error": "claim not found", "slug": slug,
                                  "reason": "file_no_frontmatter"},
                                 status=404, headers=CORS_HEADERS)

    title = fm.get("title") or slug.replace("-", " ")
    prs, reviews, sourced_from = await asyncio.to_thread(
        _load_claim_db_context, filepath, fm, title, slug)

    last_review = None
    if reviews:
        # Reviews arrive ordered by reviewed_at ascending; the last is newest.
        latest = reviews[-1]
        last_review = {
            "outcome": latest["outcome"],
            "reviewer": latest["reviewer"],
            "date": (latest["reviewed_at"] or "")[:10],
        }

    # secondary_domains: explicit list, or empty
    secondary = fm.get("secondary_domains") or fm.get("cross_domain_links") or []
    if isinstance(secondary, str):
        secondary = [secondary]

    response = {
        "slug": slug,
        "title": title,
        "domain": fm.get("domain", "unknown"),
        "secondary_domains": secondary,
        "confidence": fm.get("confidence", "unknown"),
        "description": fm.get("description") or "",
        "created": str(fm.get("created", "")),
        "last_review": last_review,
        "body": body or "",
        "sourced_from": sourced_from,
        "reviews": reviews,
        "prs": prs,
        "edges": _extract_edges(fm, by_title, by_stem),
        "wikilinks": _resolve_wikilinks(body, by_title),
    }
    return web.json_response(response, headers=CORS_HEADERS)
async def handle_domains(request):
    """GET /api/domains — per-domain rollups for the sidebar.

    Returns:
        JSON list, largest domain first. Each item carries ``name``,
        ``count``, a sorted ``agents`` list and ``confidence_dist``
        (confidence value → number of claims).
    """
    # A cold list cache means a full tree walk of sync I/O; keep it off the
    # event loop, as the detail handler does for the index build.
    claims = await asyncio.to_thread(_load_all_claims_list)
    domains = {}
    for c in claims:
        d = c["domain"]
        if d not in domains:
            domains[d] = {"name": d, "count": 0, "agents": set(), "confidence_dist": {}}
        bucket = domains[d]
        bucket["count"] += 1
        if c["agent"]:
            bucket["agents"].add(c["agent"])
        conf = c["confidence"]
        bucket["confidence_dist"][conf] = bucket["confidence_dist"].get(conf, 0) + 1
    result = []
    for d in sorted(domains.values(), key=lambda x: -x["count"]):
        # Sets are not JSON-serializable; emit a sorted list instead.
        d["agents"] = sorted(d["agents"])
        result.append(d)
    return web.json_response(result, headers=CORS_HEADERS)
def register_claims_routes(app):
    """Attach the three claims GET endpoints to *app*'s router.

    Args:
        app: Application object exposing ``router.add_get(path, handler)``.
    """
    routes = (
        ("/api/claims", handle_claims),
        ("/api/claims/{slug}", handle_claim_detail),
        ("/api/domains", handle_domains),
    )
    for path, handler in routes:
        app.router.add_get(path, handler)