"""Claims API — list endpoint + canonical claim detail page. Owner: Argus Routes: GET /api/claims — list/filter (frontmatter scan, lightweight) GET /api/claims/{slug} — full claim detail (Ship contract) GET /api/domains — domain rollups for sidebar The detail endpoint is the canonical /claims/{slug} backend per Ship's 2026-04-29 brief. One round-trip, no N+1 cascade. Wikilinks resolved server-side via title→slug index built from a tree walk. """ import json import re import sqlite3 import time from pathlib import Path import yaml from aiohttp import web # Codex tree roots — claims live in three places (Sourcer Apr 26 fix scope) CODEX_BASE = Path("/opt/teleo-eval/workspaces/main") CLAIM_TREES = [CODEX_BASE / "domains", CODEX_BASE / "foundations", CODEX_BASE / "core"] # pipeline.db for joins (review_records, prs, sources) DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db" # In-process caches _list_cache = {"data": None, "ts": 0} _LIST_CACHE_TTL = 300 # 5 min — list view tolerates staleness _index_cache = {"by_title": None, "by_stem": None, "ts": 0} _INDEX_CACHE_TTL = 60 # 1 min — title→slug index for wikilink resolution CORS_HEADERS = {"Access-Control-Allow-Origin": "*"} # Wikilink pattern. [[text]] or [[text|alias]] — we keep the link text only. _WIKILINK_RE = re.compile(r"\[\[([^\]|#]+?)(?:[#|][^\]]*)?\]\]") # ─── Normalization ───────────────────────────────────────────────────────── def _normalize_for_match(s): """Collapse a title or slug to a comparable form. Rules (from Ship's brief — match the link-fixer canonicalization): - lowercase - hyphen ↔ space tolerant (both → single space) - collapse runs of whitespace - strip leading/trailing whitespace - drop trailing punctuation that gets stripped from filenames (`.`, `?`, `!`, `:`, `--`) NOTE: lib/attribution.py exposes only normalize_handle today, not the title normalizer Ship referenced. Implementing inline; if a canonical helper lands later we point at it. """ if not s: return "" s = str(s).lower().strip() # Treat hyphens as spaces, then collapse whitespace runs s = s.replace("-", " ").replace("_", " ") s = re.sub(r"\s+", " ", s) # Strip ASCII punctuation that filenames drop s = re.sub(r"[^\w\s]", "", s) return s.strip() # ─── Frontmatter parse ───────────────────────────────────────────────────── def _split_frontmatter(text): """Return (frontmatter_dict, body_str) or (None, None) if not a claim file.""" if not text.startswith("---"): return None, None try: end = text.index("\n---", 3) except ValueError: return None, None try: fm = yaml.safe_load(text[3:end]) except Exception: return None, None if not isinstance(fm, dict): return None, None body = text[end + 4:].lstrip() return fm, body def _read_claim_file(filepath): """Read a claim file from disk. Returns (frontmatter, body) or (None, None).""" try: text = filepath.read_text(encoding="utf-8") except (OSError, UnicodeDecodeError): return None, None return _split_frontmatter(text) # ─── Tree walk + indexing ────────────────────────────────────────────────── def _walk_claim_files(): """Yield Path objects for every .md claim file in domains/, foundations/, core/.""" for root in CLAIM_TREES: if not root.exists(): continue for f in root.rglob("*.md"): if f.name == "_map.md": continue yield f def _build_indexes(): """Build (title→stem, stem→relpath) indexes for wikilink resolution. Cached for _INDEX_CACHE_TTL. Pulls from claim-index endpoint when possible (already cached upstream) and falls back to filesystem walk. """ now = time.time() if _index_cache["by_title"] is not None and now - _index_cache["ts"] < _INDEX_CACHE_TTL: return _index_cache["by_title"], _index_cache["by_stem"] by_title = {} by_stem = {} for f in _walk_claim_files(): stem = f.stem rel = str(f.relative_to(CODEX_BASE)) by_stem[stem] = rel # Index by stem-as-normalized too (covers wikilinks that use the slug) by_title[_normalize_for_match(stem)] = stem # Also try parsing the title from frontmatter for higher-fidelity matches fm, _ = _read_claim_file(f) if fm: title = fm.get("title") if title: key = _normalize_for_match(title) if key and key not in by_title: by_title[key] = stem _index_cache["by_title"] = by_title _index_cache["by_stem"] = by_stem _index_cache["ts"] = now return by_title, by_stem def _resolve_wikilinks(body, by_title): """Extract [[link]] occurrences from body, return {link_text: slug_or_null}.""" out = {} for match in _WIKILINK_RE.finditer(body or ""): link_text = match.group(1).strip() if not link_text or link_text in out: continue norm = _normalize_for_match(link_text) out[link_text] = by_title.get(norm) return out # ─── Edge extraction from frontmatter ────────────────────────────────────── _EDGE_FIELDS = { "supports": "supports", "challenges": "challenges", "challenged_by": "challenges", # canonical: store as challenges direction "related": "related", "related_claims": "related", "depends_on": "depends_on", } def _extract_edges(fm, by_title, by_stem): """Return edges dict shaped per Ship's contract. Each edge is {slug, title, exists}. Slug resolved through title index. """ edges = {"supports": [], "challenges": [], "related": [], "depends_on": []} for fm_key, edge_kind in _EDGE_FIELDS.items(): raw = fm.get(fm_key) if not raw: continue items = raw if isinstance(raw, list) else [raw] for item in items: if not isinstance(item, str): continue text = item.strip() # Strip wikilink wrapping if present text = re.sub(r"^\[\[|\]\]$", "", text) # Strip pipe annotations: "[[link|alias]]" style or "claim | edge_type | date" text = text.split("|")[0].strip() if not text: continue # Try title match first, fall back to stem match slug = by_title.get(_normalize_for_match(text)) if not slug and text in by_stem: slug = text edges[edge_kind].append({ "slug": slug, "title": text, "exists": slug is not None, }) return edges # ─── Source provenance ───────────────────────────────────────────────────── def _resolve_sourced_from(conn, claim_filepath, fm, title, stem): """Build sourced_from list for the claim. Strategy: find PRs that produced this claim (via prs.description LIKE or branch slug match), look at prs.source_path → inbox archive file → parse that source's frontmatter for title/url. Falls back to the raw `source` string from the claim's own frontmatter. Both `title` and `stem` must be non-empty — caller (handler) already falls back stem→title; passing empty values would leak `LIKE '%%'` and match unrelated PRs. """ out = [] seen_paths = set() pr_rows = [] if (title or "").strip() and (stem or "").strip(): try: pr_rows = conn.execute( """SELECT DISTINCT source_path FROM prs WHERE source_path IS NOT NULL AND source_path != '' AND (description LIKE ? OR branch LIKE ?) LIMIT 10""", (f"%{title}%", f"%{stem}%"), ).fetchall() except sqlite3.OperationalError: pr_rows = [] for row in pr_rows: path = row["source_path"] if not path or path in seen_paths: continue seen_paths.add(path) out.append(_resolve_source_file(path)) # 2. Fallback: parse raw source frontmatter field if no PR match if not out: raw = fm.get("source") if isinstance(raw, str) and raw.strip(): out.append({"path": None, "title": raw.strip()[:200], "url": None}) return out def _resolve_source_file(rel_path): """Given inbox/archive/... path, parse frontmatter for title+url. Best-effort.""" full = CODEX_BASE / rel_path entry = {"path": rel_path, "title": None, "url": None} if full.exists(): fm, _ = _read_claim_file(full) if fm: entry["title"] = fm.get("title") or fm.get("source") or rel_path entry["url"] = fm.get("url") if not entry["title"]: # Last resort: derive from filename entry["title"] = Path(rel_path).stem.replace("-", " ") return entry # ─── Reviews + PRs ───────────────────────────────────────────────────────── def _load_pr_history(conn, title, stem): """Find PRs that touched this claim and their reviews. Both title and stem must be non-empty strings — empty leaks `LIKE '%%'` which matches every PR. Handler already populates a fallback so this is a defense-in-depth guard. """ if not (title or "").strip() or not (stem or "").strip(): return [], [] try: pr_rows = conn.execute( """SELECT number, merged_at, commit_type, agent, branch, status FROM prs WHERE merged_at IS NOT NULL AND (description LIKE ? OR branch LIKE ?) ORDER BY merged_at ASC LIMIT 50""", (f"%{title}%", f"%{stem}%"), ).fetchall() except sqlite3.OperationalError: return [], [] prs = [ { "number": r["number"], "merged_at": r["merged_at"], "kind": r["commit_type"] or "unknown", "agent": r["agent"], "branch": r["branch"], } for r in pr_rows ] pr_numbers = [p["number"] for p in prs] if not pr_numbers: return prs, [] placeholders = ",".join("?" * len(pr_numbers)) try: review_rows = conn.execute( f"""SELECT pr_number, reviewer, reviewer_model, outcome, rejection_reason, notes, reviewed_at FROM review_records WHERE pr_number IN ({placeholders}) ORDER BY reviewed_at ASC""", pr_numbers, ).fetchall() except sqlite3.OperationalError: review_rows = [] reviews = [ { "pr_number": r["pr_number"], "reviewer": r["reviewer"], "model": r["reviewer_model"], "outcome": r["outcome"], "rejection_reason": r["rejection_reason"], "notes": r["notes"], "reviewed_at": r["reviewed_at"], } for r in review_rows ] return prs, reviews # ─── List view (preserved) ───────────────────────────────────────────────── def _parse_list_entry(filepath): fm, body = _read_claim_file(filepath) if not fm or fm.get("type") != "claim": return None links = _WIKILINK_RE.findall(body or "") paragraphs = [p.strip() for p in (body or "").split("\n\n") if p.strip() and not p.strip().startswith("#")] summary = paragraphs[0][:300] if paragraphs else "" return { "slug": filepath.stem, "title": fm.get("title", filepath.stem.replace("-", " ")), "domain": fm.get("domain", "unknown"), "confidence": fm.get("confidence", "unknown"), "agent": fm.get("agent"), "scope": fm.get("scope"), "created": str(fm.get("created", "")), "source": fm.get("source", "") if isinstance(fm.get("source"), str) else "", "sourcer": fm.get("sourcer", ""), "wiki_link_count": len(links), "summary": summary, "challenged_by": fm.get("challenged_by"), "related_claims": fm.get("related_claims", []), } def _load_all_claims_list(): now = time.time() if _list_cache["data"] and now - _list_cache["ts"] < _LIST_CACHE_TTL: return _list_cache["data"] claims = [] for f in _walk_claim_files(): entry = _parse_list_entry(f) if entry: claims.append(entry) _list_cache["data"] = claims _list_cache["ts"] = now return claims # ─── Handlers ────────────────────────────────────────────────────────────── async def handle_claims(request): claims = _load_all_claims_list() domain = request.query.get("domain") search = request.query.get("q", "").lower() confidence = request.query.get("confidence") agent = request.query.get("agent") sort = request.query.get("sort", "recent") filtered = claims if domain: filtered = [c for c in filtered if c["domain"] == domain] if confidence: filtered = [c for c in filtered if c["confidence"] == confidence] if agent: filtered = [c for c in filtered if c["agent"] == agent] if search: filtered = [c for c in filtered if search in c["title"].lower() or search in c["summary"].lower()] if sort == "recent": filtered.sort(key=lambda c: c["created"], reverse=True) elif sort == "alpha": filtered.sort(key=lambda c: c["title"].lower()) elif sort == "domain": filtered.sort(key=lambda c: (c["domain"], c["title"].lower())) limit = min(int(request.query.get("limit", "50")), 200) offset = int(request.query.get("offset", "0")) page = filtered[offset:offset + limit] domain_counts = {} for c in claims: domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1 return web.json_response({ "claims": page, "total": len(filtered), "offset": offset, "limit": limit, "domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])), "confidence_levels": sorted(set(c["confidence"] for c in claims)), "agents": sorted(set(c["agent"] for c in claims if c["agent"])), }, headers=CORS_HEADERS) async def handle_claim_detail(request): """GET /api/claims/{slug} — canonical claim detail page (Ship contract). One round-trip, all data resolved server-side. Wikilinks pre-resolved. """ slug = request.match_info["slug"] by_title, by_stem = _build_indexes() rel_path = by_stem.get(slug) if not rel_path: return web.json_response({"error": "claim not found", "slug": slug}, status=404, headers=CORS_HEADERS) filepath = CODEX_BASE / rel_path fm, body = _read_claim_file(filepath) if not fm: return web.json_response({"error": "frontmatter parse failed", "slug": slug}, status=500, headers=CORS_HEADERS) # Open read-only DB connection for this request conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True) conn.row_factory = sqlite3.Row try: title = fm.get("title") or slug.replace("-", " ") prs, reviews = _load_pr_history(conn, title, slug) sourced_from = _resolve_sourced_from(conn, filepath, fm, title, slug) finally: conn.close() last_review = None if reviews: latest = reviews[-1] last_review = { "outcome": latest["outcome"], "reviewer": latest["reviewer"], "date": (latest["reviewed_at"] or "")[:10], } # secondary_domains: explicit list, or empty secondary = fm.get("secondary_domains") or fm.get("cross_domain_links") or [] if isinstance(secondary, str): secondary = [secondary] description = fm.get("description") or "" edges = _extract_edges(fm, by_title, by_stem) wikilinks = _resolve_wikilinks(body, by_title) response = { "slug": slug, "title": title, "domain": fm.get("domain", "unknown"), "secondary_domains": secondary, "confidence": fm.get("confidence", "unknown"), "description": description, "created": str(fm.get("created", "")), "last_review": last_review, "body": body or "", "sourced_from": sourced_from, "reviews": reviews, "prs": prs, "edges": edges, "wikilinks": wikilinks, } return web.json_response(response, headers=CORS_HEADERS) async def handle_domains(request): claims = _load_all_claims_list() domains = {} for c in claims: d = c["domain"] if d not in domains: domains[d] = {"name": d, "count": 0, "agents": set(), "confidence_dist": {}} domains[d]["count"] += 1 if c["agent"]: domains[d]["agents"].add(c["agent"]) conf = c["confidence"] domains[d]["confidence_dist"][conf] = domains[d]["confidence_dist"].get(conf, 0) + 1 result = [] for d in sorted(domains.values(), key=lambda x: -x["count"]): d["agents"] = sorted(d["agents"]) result.append(d) return web.json_response(result, headers=CORS_HEADERS) def register_claims_routes(app): app.router.add_get("/api/claims", handle_claims) app.router.add_get("/api/claims/{slug}", handle_claim_detail) app.router.add_get("/api/domains", handle_domains)