# NOTE(review): reconstructed from a whitespace-mangled patch. The patch also
# touches diagnostics/app.py: create_app() imports register_claims_routes from
# this module and calls it on the aiohttp Application (alongside
# contributor_profile_api.register_contributor_routes).
"""Claims API endpoints — serve claim data parsed from the codex filesystem.

Claims are Markdown files with YAML frontmatter (``type: claim``) laid out as
``CODEX_ROOT/<domain>/<slug>.md``. Parsed metadata is cached in-process for
``CACHE_TTL`` seconds; the full Markdown body is re-read on demand for the
detail endpoint.
"""
import os
import re
import time
from pathlib import Path

import yaml
from aiohttp import web

CODEX_ROOT = Path("/opt/teleo-eval/workspaces/main/domains")
# Module-level parse cache: {"data": list-of-claim-dicts or None, "ts": epoch seconds}.
_cache = {"data": None, "ts": 0}
CACHE_TTL = 300  # seconds (5 minutes)

# Hoisted out of the per-file parse loop; matches [[wiki-link]] targets.
_WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]")


def _parse_frontmatter(filepath):
    """Parse one claim file into a metadata dict, or return None.

    Returns None for files without a leading ``---`` YAML frontmatter block,
    for frontmatter whose ``type`` is not ``"claim"``, and on any read/parse
    error — the tree scan is deliberately best-effort.
    """
    try:
        text = filepath.read_text(encoding="utf-8")
        if not text.startswith("---"):
            return None
        # BUG FIX: find the *closing* delimiter at a line start. A bare
        # index("---", 3) could match a "---" appearing inside the YAML and
        # truncate the frontmatter early.
        end = text.index("\n---", 3) + 1
        fm = yaml.safe_load(text[3:end])
        if not isinstance(fm, dict) or fm.get("type") != "claim":
            return None
        body = text[end + 3:].strip()
        # Count outgoing wiki-links in the body.
        links = _WIKI_LINK_RE.findall(body)
        # First non-heading paragraph, capped at 300 chars, serves as summary.
        paragraphs = [
            p.strip()
            for p in body.split("\n\n")
            if p.strip() and not p.strip().startswith("#")
        ]
        summary = paragraphs[0][:300] if paragraphs else ""
        source = fm.get("source")
        return {
            "slug": filepath.stem,
            "title": fm.get("title", filepath.stem.replace("-", " ")),
            "domain": fm.get("domain", "unknown"),
            "confidence": fm.get("confidence", "unknown"),
            "agent": fm.get("agent"),
            "scope": fm.get("scope"),
            "created": str(fm.get("created", "")),
            "source": source if isinstance(source, str) else "",
            "sourcer": fm.get("sourcer", ""),
            "wiki_link_count": len(links),
            "summary": summary,
            "challenged_by": fm.get("challenged_by"),
            "related_claims": fm.get("related_claims", []),
        }
    except Exception:
        # Best-effort: skip unreadable/malformed files instead of failing the
        # whole listing.
        return None


def _load_all_claims():
    """Return every parsed claim, re-scanning the tree at most every CACHE_TTL s."""
    now = time.time()
    if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
        return _cache["data"]

    claims = []
    for domain_dir in sorted(CODEX_ROOT.iterdir()):
        if not domain_dir.is_dir():
            continue
        for f in sorted(domain_dir.glob("*.md")):
            if f.name == "_map.md":  # domain index file, not a claim
                continue
            c = _parse_frontmatter(f)
            if c:
                claims.append(c)

    _cache["data"] = claims
    _cache["ts"] = now
    return claims


def _int_query(request, name, default):
    """Parse an integer query parameter, falling back to *default* on junk input."""
    try:
        return int(request.query.get(name, default))
    except (TypeError, ValueError):
        return default


async def handle_claims(request):
    """GET /api/claims — filtered, sorted, paginated claim listing.

    Query params: ``domain``, ``q`` (substring of title/summary),
    ``confidence``, ``agent``, ``sort`` (recent|alpha|domain),
    ``limit`` (capped at 200), ``offset``.
    """
    claims = _load_all_claims()

    domain = request.query.get("domain")
    search = request.query.get("q", "").lower()
    confidence = request.query.get("confidence")
    agent = request.query.get("agent")
    sort = request.query.get("sort", "recent")  # recent, alpha, domain

    filtered = claims
    if domain:
        filtered = [c for c in filtered if c["domain"] == domain]
    if confidence:
        filtered = [c for c in filtered if c["confidence"] == confidence]
    if agent:
        filtered = [c for c in filtered if c["agent"] == agent]
    if search:
        filtered = [
            c for c in filtered
            if search in c["title"].lower() or search in c["summary"].lower()
        ]

    # BUG FIX: with no filters applied, `filtered` aliases the shared cache
    # list; the previous in-place .sort() silently reordered the cache and
    # raced across concurrent requests. sorted() leaves the cache untouched.
    if sort == "recent":
        # "created" is stored as a string; ISO-style dates sort correctly
        # lexically — TODO confirm all claim files use ISO dates.
        filtered = sorted(filtered, key=lambda c: c["created"], reverse=True)
    elif sort == "alpha":
        filtered = sorted(filtered, key=lambda c: c["title"].lower())
    elif sort == "domain":
        filtered = sorted(filtered, key=lambda c: (c["domain"], c["title"].lower()))

    # Pagination. BUG FIX: non-numeric limit/offset used to raise an uncaught
    # ValueError and surface as a 500; now they fall back to defaults and are
    # clamped to sane ranges.
    limit = max(0, min(_int_query(request, "limit", 50), 200))
    offset = max(0, _int_query(request, "offset", 0))
    page = filtered[offset:offset + limit]

    # Per-domain claim counts for the sidebar (over ALL claims, not filtered).
    domain_counts = {}
    for c in claims:
        domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1

    return web.json_response(
        {
            "claims": page,
            "total": len(filtered),
            "offset": offset,
            "limit": limit,
            "domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
            "confidence_levels": sorted({c["confidence"] for c in claims}),
            "agents": sorted({c["agent"] for c in claims if c["agent"]}),
        },
        headers={"Access-Control-Allow-Origin": "*"},
    )


async def handle_claim_detail(request):
    """GET /api/claims/{slug} — one claim's metadata plus its full Markdown body."""
    slug = request.match_info["slug"]
    for c in _load_all_claims():
        if c["slug"] != slug:
            continue
        # BUG FIX: build the response on a copy. Assigning c["body"] directly
        # mutated the cached claim dict, so full Markdown bodies leaked into
        # every subsequent /api/claims listing response.
        detail = dict(c)
        for domain_dir in CODEX_ROOT.iterdir():
            if not domain_dir.is_dir():
                continue
            f = domain_dir / f"{slug}.md"
            if f.exists():
                # BUG FIX: guard the re-read — the file may have vanished or
                # lost its frontmatter since caching; previously an unguarded
                # index() raised ValueError and surfaced as a 500.
                try:
                    text = f.read_text(encoding="utf-8")
                    end = text.index("\n---", 3) + 1
                    detail["body"] = text[end + 3:].strip()
                except (OSError, ValueError):
                    pass  # serve cached metadata without a body
                break
        return web.json_response(detail, headers={"Access-Control-Allow-Origin": "*"})
    return web.json_response({"error": "claim not found"}, status=404)


async def handle_domains(request):
    """GET /api/domains — per-domain claim counts, agent lists, confidence mix."""
    claims = _load_all_claims()
    domains = {}
    for c in claims:
        d = c["domain"]
        if d not in domains:
            domains[d] = {"name": d, "count": 0, "agents": set(), "confidence_dist": {}}
        entry = domains[d]
        entry["count"] += 1
        if c["agent"]:
            entry["agents"].add(c["agent"])
        conf = c["confidence"]
        entry["confidence_dist"][conf] = entry["confidence_dist"].get(conf, 0) + 1

    result = []
    for d in sorted(domains.values(), key=lambda x: -x["count"]):
        d["agents"] = sorted(d["agents"])  # sets are not JSON-serializable
        result.append(d)

    return web.json_response(result, headers={"Access-Control-Allow-Origin": "*"})


def register_claims_routes(app):
    """Attach the claims endpoints to an aiohttp Application (called by app.py)."""
    app.router.add_get("/api/claims", handle_claims)
    app.router.add_get("/api/claims/{slug}", handle_claim_detail)
    app.router.add_get("/api/domains", handle_domains)