fix(diagnostics): commit claims_api + register routes that were VPS-only
Some checks are pending
CI / lint-and-test (push) Waiting to run
Some checks are pending
CI / lint-and-test (push) Waiting to run
Root cause (per Epi audit):
- /api/claims, /api/contributors/list, /api/contributors/{handle} returned
404 in prod. The route registrations and claims_api.py module existed only
on VPS — never committed. Today's auto-deploy of an unrelated app.py change
rsync'd the repo (registration-less) version over the VPS edits, wiping
endpoints Vercel depended on.
- Recurrence of the deploy-without-commit pattern (blindspot #2).
Brings repo to parity with the live, working VPS state:
- Add diagnostics/claims_api.py (161 lines, was VPS-only)
- Wire register_claims_routes + register_contributor_routes in app.py
alongside the existing register_activity_feed call
beliefs_routes.py is also VPS-only and currently unregistered (orphaned by
the same Apr 21 manual edit that dropped its registration). Left out of this
commit pending a decision on whether to revive or delete.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
2f6424617b
commit
cfcb06a6dc
2 changed files with 167 additions and 0 deletions
|
|
@ -2289,6 +2289,12 @@ def create_app() -> web.Application:
|
|||
# Gamification activity feed (hot/recent/important sort)
|
||||
from activity_feed_api import register as register_activity_feed
|
||||
register_activity_feed(app)
|
||||
# Claims browser + detail
|
||||
from claims_api import register_claims_routes
|
||||
register_claims_routes(app)
|
||||
# Contributor profile (handle lookup, leaderboard with action CI)
|
||||
from contributor_profile_api import register_contributor_routes
|
||||
register_contributor_routes(app)
|
||||
app.on_cleanup.append(_cleanup)
|
||||
return app
|
||||
|
||||
|
|
|
|||
161
diagnostics/claims_api.py
Normal file
161
diagnostics/claims_api.py
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
"""Claims API endpoint — serves claim data from the codex filesystem."""
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from aiohttp import web
|
||||
|
||||
# Root of the codex checkout; each immediate subdirectory is one claim domain.
CODEX_ROOT = Path("/opt/teleo-eval/workspaces/main/domains")
# Module-level memo for _load_all_claims: parsed claim records plus scan timestamp.
_cache = {"data": None, "ts": 0}
CACHE_TTL = 300  # 5 minutes
|
||||
|
||||
def _parse_frontmatter(filepath):
    """Parse a codex markdown file's YAML frontmatter into a claim record.

    Args:
        filepath: Path to a ``*.md`` file whose first bytes may be a
            ``---``-delimited YAML frontmatter block.

    Returns:
        dict with the claim's metadata, link count, and a truncated summary,
        or ``None`` when the file has no frontmatter, is not ``type: claim``,
        or cannot be read/parsed (best-effort: bad files are skipped).
    """
    try:
        raw = filepath.read_text(encoding="utf-8")
        if not raw.startswith("---"):
            return None
        # Locate the closing frontmatter delimiter; everything after is body.
        close = raw.index("---", 3)
        meta = yaml.safe_load(raw[3:close])
        if not meta or meta.get("type") != "claim":
            return None
        content = raw[close + 3:].strip()
        # Count [[wiki-links]] in the body.
        wiki_links = re.findall(r"\[\[([^\]]+)\]\]", content)
        # First non-heading paragraph (capped at 300 chars) becomes the summary.
        prose = [
            p.strip()
            for p in content.split("\n\n")
            if p.strip() and not p.strip().startswith("#")
        ]
        first_para = prose[0][:300] if prose else ""
        raw_source = meta.get("source")
        return {
            "slug": filepath.stem,
            "title": meta.get("title", filepath.stem.replace("-", " ")),
            "domain": meta.get("domain", "unknown"),
            "confidence": meta.get("confidence", "unknown"),
            "agent": meta.get("agent"),
            "scope": meta.get("scope"),
            "created": str(meta.get("created", "")),
            # Non-string sources (e.g. YAML dicts) are normalized to "".
            "source": raw_source if isinstance(raw_source, str) else "",
            "sourcer": meta.get("sourcer", ""),
            "wiki_link_count": len(wiki_links),
            "summary": first_para,
            "challenged_by": meta.get("challenged_by"),
            "related_claims": meta.get("related_claims", []),
        }
    except Exception:
        # Deliberate best-effort: unreadable or malformed files are skipped.
        return None
|
||||
|
||||
|
||||
def _load_all_claims():
    """Scan every domain directory under ``CODEX_ROOT`` for claim records.

    Results are memoized in the module-level ``_cache`` for ``CACHE_TTL``
    seconds so repeated API hits do not re-read the filesystem.

    Returns:
        list[dict]: one record per parseable claim file (see
        ``_parse_frontmatter``); possibly empty.
    """
    now = time.time()
    # Compare against None, not truthiness: an empty claims list is a valid
    # cached result and must not force a full rescan on every request.
    if _cache["data"] is not None and now - _cache["ts"] < CACHE_TTL:
        return _cache["data"]

    claims = []
    # Guard against a missing codex checkout (e.g. fresh deploy) rather than
    # letting iterdir() raise and turn every request into a 500.
    if CODEX_ROOT.is_dir():
        for domain_dir in sorted(CODEX_ROOT.iterdir()):
            if not domain_dir.is_dir():
                continue
            for f in sorted(domain_dir.glob("*.md")):
                if f.name == "_map.md":  # per-domain index file, not a claim
                    continue
                c = _parse_frontmatter(f)
                if c:
                    claims.append(c)

    _cache["data"] = claims
    _cache["ts"] = now
    return claims
|
||||
|
||||
|
||||
async def handle_claims(request):
    """GET /api/claims — filtered, sorted, paginated claim listing.

    Query params: ``domain``, ``q`` (substring of title/summary),
    ``confidence``, ``agent``, ``sort`` (recent|alpha|domain),
    ``limit`` (clamped to 0..200, default 50), ``offset`` (>= 0).

    Returns a JSON payload with the page of claims, total match count,
    per-domain counts for the sidebar, and the distinct confidence levels
    and agents across all claims.
    """
    claims = _load_all_claims()

    # Filters
    domain = request.query.get("domain")
    search = request.query.get("q", "").lower()
    confidence = request.query.get("confidence")
    agent = request.query.get("agent")
    sort = request.query.get("sort", "recent")  # recent, alpha, domain

    # Copy before filtering/sorting: sort() below is in-place, and without a
    # copy an unfiltered request would reorder the shared cached list.
    filtered = list(claims)
    if domain:
        filtered = [c for c in filtered if c["domain"] == domain]
    if confidence:
        filtered = [c for c in filtered if c["confidence"] == confidence]
    if agent:
        filtered = [c for c in filtered if c["agent"] == agent]
    if search:
        filtered = [c for c in filtered if search in c["title"].lower() or search in c["summary"].lower()]

    # Sort
    if sort == "recent":
        filtered.sort(key=lambda c: c["created"], reverse=True)
    elif sort == "alpha":
        filtered.sort(key=lambda c: c["title"].lower())
    elif sort == "domain":
        filtered.sort(key=lambda c: (c["domain"], c["title"].lower()))

    # Pagination. Non-numeric limit/offset previously raised ValueError and
    # surfaced as a 500; fall back to defaults and clamp to sane bounds.
    try:
        limit = int(request.query.get("limit", "50"))
    except ValueError:
        limit = 50
    limit = max(0, min(limit, 200))
    try:
        offset = int(request.query.get("offset", "0"))
    except ValueError:
        offset = 0
    offset = max(0, offset)
    page = filtered[offset:offset + limit]

    # Domain counts for sidebar (over all claims, not the filtered subset).
    domain_counts = {}
    for c in claims:
        domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1

    return web.json_response({
        "claims": page,
        "total": len(filtered),
        "offset": offset,
        "limit": limit,
        "domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
        "confidence_levels": sorted(set(c["confidence"] for c in claims)),
        "agents": sorted(set(c["agent"] for c in claims if c["agent"])),
    }, headers={"Access-Control-Allow-Origin": "*"})
|
||||
|
||||
|
||||
async def handle_claim_detail(request):
    """GET /api/claims/{slug} — a single claim with its full markdown body.

    Looks the slug up in the cached claim list, then re-reads the source
    file to attach the body. Returns 404 JSON when the slug is unknown.
    """
    slug = request.match_info["slug"]
    claims = _load_all_claims()
    for c in claims:
        if c["slug"] != slug:
            continue
        # Work on a copy: writing "body" into the cached record would leak
        # full markdown bodies into every subsequent /api/claims listing.
        detail = dict(c)
        # Slug is unique across domains, so scan directories for the file.
        for domain_dir in CODEX_ROOT.iterdir():
            if not domain_dir.is_dir():
                continue
            f = domain_dir / f"{slug}.md"
            if f.exists():
                text = f.read_text(encoding="utf-8")
                try:
                    end = text.index("---", 3)
                except ValueError:
                    # Malformed frontmatter: serve the record without a body
                    # instead of letting the unhandled ValueError 500.
                    break
                detail["body"] = text[end + 3:].strip()
                break
        return web.json_response(detail, headers={"Access-Control-Allow-Origin": "*"})
    # Include the CORS header on the 404 too so browser clients can read it.
    return web.json_response(
        {"error": "claim not found"},
        status=404,
        headers={"Access-Control-Allow-Origin": "*"},
    )
|
||||
|
||||
|
||||
async def handle_domains(request):
    """GET /api/domains — per-domain claim counts, agents, and confidence mix.

    Returns a JSON list of domain summaries ordered by descending claim count.
    """
    claims = _load_all_claims()

    by_domain = {}
    for claim in claims:
        name = claim["domain"]
        entry = by_domain.setdefault(
            name, {"name": name, "count": 0, "agents": set(), "confidence_dist": {}}
        )
        entry["count"] += 1
        if claim["agent"]:
            entry["agents"].add(claim["agent"])
        level = claim["confidence"]
        entry["confidence_dist"][level] = entry["confidence_dist"].get(level, 0) + 1

    # Sets are not JSON-serializable: emit sorted agent lists, biggest domain first.
    result = []
    for entry in sorted(by_domain.values(), key=lambda e: -e["count"]):
        entry["agents"] = sorted(entry["agents"])
        result.append(entry)

    return web.json_response(result, headers={"Access-Control-Allow-Origin": "*"})
|
||||
|
||||
|
||||
def register_claims_routes(app):
    """Attach the claims-browser endpoints to an aiohttp application.

    Registration order matters: the literal /api/claims route is added
    before the parameterized /api/claims/{slug} route.
    """
    routes = (
        ("/api/claims", handle_claims),
        ("/api/claims/{slug}", handle_claim_detail),
        ("/api/domains", handle_domains),
    )
    for path, handler in routes:
        app.router.add_get(path, handler)
|
||||
Loading…
Reference in a new issue