"""New API endpoints for the 4-page dashboard. Endpoints: GET /api/stage-times — median dwell time per pipeline stage GET /api/herfindahl — domain concentration index GET /api/agent-state — live agent-state from filesystem GET /api/extraction-yield-by-domain — sources→claims conversion per domain GET /api/agents-dashboard — batched agent performance payload Owner: Argus """ import asyncio import json import logging import os import sqlite3 import statistics import time import urllib.request from collections import defaultdict from datetime import datetime, timezone from pathlib import Path from aiohttp import web logger = logging.getLogger("argus.dashboard_routes") # ─── Claim-index cache (60s TTL) ─────────────────────────────────────────── _claim_index_cache: dict | None = None _claim_index_ts: float = 0 CLAIM_INDEX_TTL = 60 # seconds CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index") AGENT_STATE_DIR = Path(os.environ.get("AGENT_STATE_DIR", "/opt/teleo-eval/agent-state")) def get_claim_index() -> dict | None: """Fetch claim-index with 60s cache.""" global _claim_index_cache, _claim_index_ts now = time.monotonic() if _claim_index_cache is not None and (now - _claim_index_ts) < CLAIM_INDEX_TTL: return _claim_index_cache try: with urllib.request.urlopen(CLAIM_INDEX_URL, timeout=5) as resp: data = json.loads(resp.read()) _claim_index_cache = data _claim_index_ts = now return data except Exception as e: logger.warning("Failed to fetch claim-index: %s", e) # Return stale cache if available return _claim_index_cache # ─── GET /api/stage-times ────────────────────────────────────────────────── async def handle_stage_times(request): """Median dwell time per pipeline stage from audit_log timestamps. Stages: discover → validate → evaluate → merge Returns median minutes between consecutive stages. """ conn = request.app["_get_conn"]() try: hours = int(request.query.get("hours", "24")) # Get per-PR event timestamps rows = conn.execute( """SELECT json_extract(detail, '$.pr') as pr, event, timestamp FROM audit_log WHERE timestamp > datetime('now', ? || ' hours') AND json_extract(detail, '$.pr') IS NOT NULL ORDER BY json_extract(detail, '$.pr'), timestamp""", (f"-{hours}",), ).fetchall() # Group by PR pr_events: dict[int, list] = {} for r in rows: pr = r["pr"] if pr not in pr_events: pr_events[pr] = [] pr_events[pr].append({"event": r["event"], "ts": r["timestamp"]}) # Compute stage dwell times stage_pairs = [ ("pr_discovered", "tier0_complete", "Ingest → Validate"), ("tier0_complete", "approved", "Validate → Approve"), ("tier0_complete", "domain_rejected", "Validate → Reject"), ("approved", "merged", "Approve → Merge"), ] stage_times = {} for start_event, end_event, label in stage_pairs: durations = [] for pr, events in pr_events.items(): start_ts = None end_ts = None for e in events: if e["event"] == start_event and start_ts is None: start_ts = e["ts"] if e["event"] == end_event and end_ts is None: end_ts = e["ts"] if start_ts and end_ts: try: s = datetime.fromisoformat(start_ts) e = datetime.fromisoformat(end_ts) mins = (e - s).total_seconds() / 60 if mins >= 0: durations.append(mins) except (ValueError, TypeError): pass if durations: stage_times[label] = { "median_minutes": round(statistics.median(durations), 1), "p90_minutes": round(sorted(durations)[int(len(durations) * 0.9)], 1) if len(durations) >= 5 else None, "count": len(durations), } return web.json_response({"hours": hours, "stages": stage_times}) finally: conn.close() # ─── GET /api/herfindahl ────────────────────────────────────────────────── async def handle_herfindahl(request): """Domain concentration index (Herfindahl-Hirschman). HHI = sum of (domain_share^2). 1.0 = single domain, lower = more diverse. """ conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "30")) rows = conn.execute( """SELECT domain, COUNT(*) as cnt FROM prs WHERE status='merged' AND domain IS NOT NULL AND merged_at > datetime('now', ? || ' days') GROUP BY domain""", (f"-{days}",), ).fetchall() if not rows: return web.json_response({"hhi": 0, "domains": [], "days": days}) total = sum(r["cnt"] for r in rows) domains = [] hhi = 0 for r in rows: share = r["cnt"] / total hhi += share ** 2 domains.append({ "domain": r["domain"], "count": r["cnt"], "share": round(share, 4), }) domains.sort(key=lambda x: x["count"], reverse=True) # Interpret: HHI < 0.15 = diverse, 0.15-0.25 = moderate, >0.25 = concentrated status = "diverse" if hhi < 0.15 else ("moderate" if hhi < 0.25 else "concentrated") return web.json_response({ "hhi": round(hhi, 4), "status": status, "domains": domains, "total_merged": total, "days": days, }) finally: conn.close() # ─── GET /api/agent-state ───────────────────────────────────────────────── async def handle_agent_state(request): """Read live agent-state from filesystem. 6 agents, ~1KB each.""" if not AGENT_STATE_DIR.exists(): return web.json_response({"error": "agent-state directory not found", "path": str(AGENT_STATE_DIR)}, status=404) agents = {} for agent_dir in sorted(AGENT_STATE_DIR.iterdir()): if not agent_dir.is_dir(): continue name = agent_dir.name state = {"name": name} # metrics.json metrics_file = agent_dir / "metrics.json" if metrics_file.exists(): try: m = json.loads(metrics_file.read_text()) state["last_active"] = m.get("updated_at") state["metrics"] = m except (json.JSONDecodeError, OSError): state["metrics_error"] = True # tasks.json tasks_file = agent_dir / "tasks.json" if tasks_file.exists(): try: t = json.loads(tasks_file.read_text()) state["tasks"] = t if isinstance(t, list) else [] state["task_count"] = len(state["tasks"]) except (json.JSONDecodeError, OSError): state["tasks"] = [] # session.json session_file = agent_dir / "session.json" if session_file.exists(): try: s = json.loads(session_file.read_text()) state["session"] = s except (json.JSONDecodeError, OSError): pass # inbox depth inbox_dir = agent_dir / "inbox" if inbox_dir.exists() and inbox_dir.is_dir(): state["inbox_depth"] = len(list(inbox_dir.iterdir())) else: state["inbox_depth"] = 0 agents[name] = state return web.json_response({"agents": agents, "agent_count": len(agents)}) # ─── GET /api/extraction-yield-by-domain ────────────────────────────────── async def handle_extraction_yield_by_domain(request): """Sources → claims conversion rate per domain.""" conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "30")) # Sources per domain (approximate from PR source_path domain) source_counts = conn.execute( """SELECT domain, COUNT(DISTINCT path) as sources FROM sources s JOIN prs p ON p.source_path LIKE '%' || s.path || '%' WHERE s.created_at > datetime('now', ? || ' days') GROUP BY domain""", (f"-{days}",), ).fetchall() # Fallback: simpler query if the join doesn't work well merged_by_domain = conn.execute( """SELECT domain, COUNT(*) as merged FROM prs WHERE status='merged' AND domain IS NOT NULL AND merged_at > datetime('now', ? || ' days') GROUP BY domain""", (f"-{days}",), ).fetchall() sources_by_domain = conn.execute( """SELECT domain, COUNT(*) as total_prs, SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as merged FROM prs WHERE domain IS NOT NULL AND created_at > datetime('now', ? || ' days') GROUP BY domain""", (f"-{days}",), ).fetchall() domains = [] for r in sources_by_domain: total = r["total_prs"] or 0 merged = r["merged"] or 0 domains.append({ "domain": r["domain"], "total_prs": total, "merged": merged, "yield": round(merged / total, 3) if total else 0, }) domains.sort(key=lambda x: x["merged"], reverse=True) return web.json_response({"days": days, "domains": domains}) finally: conn.close() # ─── GET /api/agents-dashboard ───────────────────────────────────────────── async def handle_agents_dashboard(request): """Batched agent performance payload for Page 3. Returns per-agent: merged count, rejection rate, yield, CI score, top rejection reasons, contribution trend (weekly). All in one response to avoid N client-side fetches. """ conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "30")) # Per-agent merged + rejected counts agent_stats = conn.execute( """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, COUNT(*) as evaluated, SUM(CASE WHEN event='approved' THEN 1 ELSE 0 END) as approved, SUM(CASE WHEN event IN ('changes_requested','domain_rejected','tier05_rejected') THEN 1 ELSE 0 END) as rejected FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', ? || ' days') AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL GROUP BY agent""", (f"-{days}",), ).fetchall() agents = {} for r in agent_stats: name = r["agent"] ev = r["evaluated"] or 0 ap = r["approved"] or 0 rj = r["rejected"] or 0 agents[name] = { "evaluated": ev, "approved": ap, "rejected": rj, "yield": round(ap / ev, 3) if ev else 0, "rejection_rate": round(rj / ev, 3) if ev else 0, } # Per-agent top rejection reasons from prs.eval_issues (Epimetheus correction 2026-04-02) tag_rows = conn.execute( """SELECT agent, value as tag, COUNT(*) as cnt FROM prs, json_each(prs.eval_issues) WHERE eval_issues IS NOT NULL AND eval_issues != '[]' AND agent IS NOT NULL AND created_at > datetime('now', ? || ' days') GROUP BY agent, tag ORDER BY agent, cnt DESC""", (f"-{days}",), ).fetchall() for r in tag_rows: name = r["agent"] if name in agents: if "top_rejections" not in agents[name]: agents[name]["top_rejections"] = [] if len(agents[name]["top_rejections"]) < 5: agents[name]["top_rejections"].append({"tag": r["tag"], "count": r["cnt"]}) # Weekly contribution trend per agent weekly = conn.execute( """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, strftime('%Y-W%W', timestamp) as week, SUM(CASE WHEN event='approved' THEN 1 ELSE 0 END) as merged, COUNT(*) as evaluated FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', ? || ' days') AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL GROUP BY agent, week ORDER BY agent, week""", (f"-{days}",), ).fetchall() for r in weekly: name = r["agent"] if name in agents: if "weekly_trend" not in agents[name]: agents[name]["weekly_trend"] = [] agents[name]["weekly_trend"].append({ "week": r["week"], "merged": r["merged"] or 0, "evaluated": r["evaluated"] or 0, }) # CI scores from contributors table weights = {"sourcer": 0.15, "extractor": 0.05, "challenger": 0.35, "synthesizer": 0.25, "reviewer": 0.20} try: contribs = conn.execute( "SELECT handle, sourcer_count, extractor_count, challenger_count, " "synthesizer_count, reviewer_count, claims_merged, tier FROM contributors" ).fetchall() for c in contribs: name = c["handle"] if name not in agents: agents[name] = {} ci = sum((c[f"{role}_count"] or 0) * w for role, w in weights.items()) agents[name]["ci_score"] = round(ci, 2) agents[name]["claims_merged"] = c["claims_merged"] or 0 agents[name]["tier"] = c["tier"] except sqlite3.Error: pass return web.json_response({"days": days, "agents": agents}) finally: conn.close() # ─── GET /api/cascade-coverage ──────────────────────────────────────────── async def handle_cascade_coverage(request): """Cascade coverage from audit_log stage='cascade' events. Returns: triggered count, by-agent breakdown, claims affected. """ conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "30")) triggered = conn.execute( """SELECT json_extract(detail, '$.agent') as agent, COUNT(*) as cnt, SUM(json_array_length(json_extract(detail, '$.source_claims'))) as claims_affected FROM audit_log WHERE stage='cascade' AND event='cascade_triggered' AND timestamp > datetime('now', ? || ' days') GROUP BY agent""", (f"-{days}",), ).fetchall() summaries = conn.execute( """SELECT SUM(json_extract(detail, '$.notifications_sent')) as total_notifications, COUNT(*) as total_merges_with_cascade FROM audit_log WHERE stage='cascade' AND event='cascade_summary' AND timestamp > datetime('now', ? || ' days')""", (f"-{days}",), ).fetchone() reviewed = conn.execute( """SELECT COUNT(*) as cnt FROM audit_log WHERE stage='cascade' AND event='cascade_reviewed' AND timestamp > datetime('now', ? || ' days')""", (f"-{days}",), ).fetchone() total_triggered = sum(r["cnt"] for r in triggered) total_reviewed = reviewed["cnt"] if reviewed else 0 completion_rate = round(total_reviewed / total_triggered, 3) if total_triggered else None by_agent = [ {"agent": r["agent"], "triggered": r["cnt"], "claims_affected": r["claims_affected"] or 0} for r in triggered ] insufficient_data = total_triggered < 5 return web.json_response({ "days": days, "total_triggered": total_triggered, "total_reviewed": total_reviewed, "completion_rate": completion_rate, "total_notifications": summaries["total_notifications"] if summaries else 0, "merges_with_cascade": summaries["total_merges_with_cascade"] if summaries else 0, "by_agent": by_agent, "insufficient_data": insufficient_data, }) finally: conn.close() # ─── GET /api/review-summary ───────────────────────────────────────────── async def handle_review_summary(request): """Structured review data from review_records table (migration v12). Cleaner than audit_log parsing — structured outcome, rejection_reason, disagreement_type columns. """ conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "30")) # Check if table exists and has data try: total = conn.execute( "SELECT COUNT(*) as cnt FROM review_records WHERE reviewed_at > datetime('now', ? || ' days')", (f"-{days}",), ).fetchone()["cnt"] except Exception: return web.json_response({"error": "review_records table not available", "populated": False}) if total == 0: return web.json_response({"populated": False, "total": 0, "days": days}) # Outcome breakdown outcomes = conn.execute( """SELECT outcome, COUNT(*) as cnt FROM review_records WHERE reviewed_at > datetime('now', ? || ' days') GROUP BY outcome""", (f"-{days}",), ).fetchall() # Rejection reasons — try review_records first, fall back to prs.eval_issues reasons = conn.execute( """SELECT rejection_reason, COUNT(*) as cnt FROM review_records WHERE rejection_reason IS NOT NULL AND reviewed_at > datetime('now', ? || ' days') GROUP BY rejection_reason ORDER BY cnt DESC""", (f"-{days}",), ).fetchall() rejection_source = "review_records" if not reasons: reasons = conn.execute( """SELECT value AS rejection_reason, COUNT(*) as cnt FROM prs, json_each(prs.eval_issues) WHERE eval_issues IS NOT NULL AND eval_issues != '[]' AND created_at > datetime('now', ? || ' days') GROUP BY value ORDER BY cnt DESC""", (f"-{days}",), ).fetchall() rejection_source = "prs.eval_issues" # Per-reviewer breakdown reviewers = conn.execute( """SELECT reviewer, SUM(CASE WHEN outcome='approved' THEN 1 ELSE 0 END) as approved, SUM(CASE WHEN outcome='approved-with-changes' THEN 1 ELSE 0 END) as approved_with_changes, SUM(CASE WHEN outcome='rejected' THEN 1 ELSE 0 END) as rejected, COUNT(*) as total FROM review_records WHERE reviewed_at > datetime('now', ? || ' days') GROUP BY reviewer ORDER BY total DESC""", (f"-{days}",), ).fetchall() # Per-domain breakdown domains = conn.execute( """SELECT domain, SUM(CASE WHEN outcome='rejected' THEN 1 ELSE 0 END) as rejected, COUNT(*) as total FROM review_records WHERE domain IS NOT NULL AND reviewed_at > datetime('now', ? || ' days') GROUP BY domain ORDER BY total DESC""", (f"-{days}",), ).fetchall() return web.json_response({ "populated": True, "days": days, "total": total, "outcomes": {r["outcome"]: r["cnt"] for r in outcomes}, "rejection_reasons": [{"reason": r["rejection_reason"], "count": r["cnt"]} for r in reasons], "rejection_source": rejection_source, "reviewers": [ {"reviewer": r["reviewer"], "approved": r["approved"], "approved_with_changes": r["approved_with_changes"], "rejected": r["rejected"], "total": r["total"]} for r in reviewers ], "domains": [ {"domain": r["domain"], "rejected": r["rejected"], "total": r["total"], "rejection_rate": round(r["rejected"] / r["total"], 3) if r["total"] else 0} for r in domains ], }) finally: conn.close() # ─── GET /api/agent-scorecard ────────────────────────────────────────────── async def handle_agent_scorecard(request): """Per-agent scorecard: PRs submitted, review outcomes, rejection reasons. Data from review_records (structured reviews) + prs (submission counts). Falls back to prs.eval_issues for rejection reasons when review_records has no rejections yet. """ conn = request.app["_get_conn"]() try: try: days = min(int(request.query.get("days", "30")), 90) except ValueError: days = 30 day_filter = f"-{days}" # PRs submitted per agent prs_by_agent = conn.execute( """SELECT agent, COUNT(*) as cnt FROM prs WHERE agent IS NOT NULL AND created_at > datetime('now', ? || ' days') GROUP BY agent""", (day_filter,), ).fetchall() prs_map = {r["agent"]: r["cnt"] for r in prs_by_agent} # Review outcomes from review_records review_data = {} try: reviews = conn.execute( """SELECT reviewer as agent, outcome, COUNT(*) as cnt FROM review_records WHERE reviewed_at > datetime('now', ? || ' days') GROUP BY reviewer, outcome""", (day_filter,), ).fetchall() for r in reviews: agent = r["agent"] if agent not in review_data: review_data[agent] = {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0} review_data[agent][r["outcome"].replace("-", "_")] = r["cnt"] review_data[agent]["total"] += r["cnt"] except sqlite3.OperationalError: pass # If review_records is empty, fall back to audit_log eval events if not review_data: evals = conn.execute( """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, event, COUNT(*) as cnt FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', ? || ' days') GROUP BY agent, event""", (day_filter,), ).fetchall() for r in evals: agent = r["agent"] if not agent: continue if agent not in review_data: review_data[agent] = {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0} if r["event"] == "approved": review_data[agent]["approved"] += r["cnt"] elif r["event"] == "changes_requested": # fixer auto-remediated; equivalent in pre-review_records era review_data[agent]["approved_with_changes"] += r["cnt"] else: review_data[agent]["rejected"] += r["cnt"] review_data[agent]["total"] += r["cnt"] # Rejection reasons from prs.eval_issues (canonical source) reason_rows = conn.execute( """SELECT agent, value as reason, COUNT(*) as cnt FROM prs, json_each(prs.eval_issues) WHERE eval_issues IS NOT NULL AND eval_issues != '[]' AND agent IS NOT NULL AND created_at > datetime('now', ? || ' days') GROUP BY agent, reason ORDER BY agent, cnt DESC""", (day_filter,), ).fetchall() reasons_map = {} for r in reason_rows: if r["agent"] not in reasons_map: reasons_map[r["agent"]] = {} reasons_map[r["agent"]][r["reason"]] = r["cnt"] # Build scorecards all_agents = sorted(set(list(prs_map.keys()) + list(review_data.keys()))) scorecards = [] for agent in all_agents: if agent in ("unknown", None): continue rd = review_data.get(agent, {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0}) total_reviews = rd["total"] approved = rd["approved"] approved_wc = rd["approved_with_changes"] rejected = rd["rejected"] approval_rate = ((approved + approved_wc) / total_reviews * 100) if total_reviews else 0 scorecards.append({ "agent": agent, "total_prs": prs_map.get(agent, 0), "total_reviews": total_reviews, "approved": approved, "approved_with_changes": approved_wc, "rejected": rejected, "approval_rate": round(approval_rate, 1), "rejection_reasons": reasons_map.get(agent, {}), }) scorecards.sort(key=lambda x: x["total_reviews"], reverse=True) return web.json_response({"days": days, "scorecards": scorecards}) finally: conn.close() # ─── Trace endpoint ──────────────────────────────────────────────────────── async def handle_trace(request: web.Request) -> web.Response: """Return the full lifecycle of a source/PR through the pipeline. GET /api/trace/1234 → all audit_log + review_records + costs for PR 1234. One thread, every stage, chronological. """ trace_id = request.match_info["trace_id"] conn = request.app["_get_conn"]() try: events = conn.execute( """SELECT timestamp, stage, event, detail FROM audit_log WHERE trace_id = ? ORDER BY timestamp""", (trace_id,), ).fetchall() if not events: events = conn.execute( """SELECT timestamp, stage, event, detail FROM audit_log WHERE CAST(json_extract(detail, '$.pr') AS TEXT) = ? ORDER BY timestamp""", (trace_id,), ).fetchall() reviews = conn.execute( """SELECT reviewed_at, reviewer, reviewer_model, outcome, rejection_reason, disagreement_type, notes, claim_path FROM review_records WHERE pr_number = ? ORDER BY reviewed_at""", (trace_id,), ).fetchall() pr = conn.execute( """SELECT number, source_path, domain, agent, tier, status, origin, created_at, merged_at FROM prs WHERE number = ?""", (trace_id,), ).fetchone() result = { "trace_id": trace_id, "pr": dict(pr) if pr else None, "timeline": [ {"timestamp": r[0], "stage": r[1], "event": r[2], "detail": json.loads(r[3]) if r[3] else None} for r in events ], "reviews": [ {"reviewed_at": r[0], "reviewer": r[1], "model": r[2], "outcome": r[3], "rejection_reason": r[4], "disagreement_type": r[5], "notes": r[6], "claim_path": r[7]} for r in reviews ], } return web.json_response(result) finally: conn.close() # ─── GET /api/growth ────────────────────────────────────────────────────── async def handle_growth(request): """Cumulative growth of sources, PRs, and merged claims over time. Returns daily data points with running totals for each series. """ conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "90")) # Daily new sources source_rows = conn.execute( """SELECT date(created_at) as day, COUNT(*) as cnt FROM sources WHERE created_at > datetime('now', ? || ' days') GROUP BY day ORDER BY day""", (f"-{days}",), ).fetchall() # Daily new PRs pr_rows = conn.execute( """SELECT date(created_at) as day, COUNT(*) as cnt FROM prs WHERE created_at > datetime('now', ? || ' days') GROUP BY day ORDER BY day""", (f"-{days}",), ).fetchall() # Daily merged PRs merged_rows = conn.execute( """SELECT date(merged_at) as day, COUNT(*) as cnt FROM prs WHERE status = 'merged' AND merged_at IS NOT NULL AND merged_at > datetime('now', ? || ' days') GROUP BY day ORDER BY day""", (f"-{days}",), ).fetchall() # Get totals BEFORE the window for correct cumulative baseline source_base = conn.execute( "SELECT COUNT(*) as cnt FROM sources WHERE created_at <= datetime('now', ? || ' days')", (f"-{days}",), ).fetchone()["cnt"] pr_base = conn.execute( "SELECT COUNT(*) as cnt FROM prs WHERE created_at <= datetime('now', ? || ' days')", (f"-{days}",), ).fetchone()["cnt"] merged_base = conn.execute( """SELECT COUNT(*) as cnt FROM prs WHERE status = 'merged' AND merged_at IS NOT NULL AND merged_at <= datetime('now', ? || ' days')""", (f"-{days}",), ).fetchone()["cnt"] # Collect all unique dates all_dates = sorted(set( [r["day"] for r in source_rows] + [r["day"] for r in pr_rows] + [r["day"] for r in merged_rows] )) # Build lookup dicts src_by_day = {r["day"]: r["cnt"] for r in source_rows} pr_by_day = {r["day"]: r["cnt"] for r in pr_rows} mrg_by_day = {r["day"]: r["cnt"] for r in merged_rows} # Build cumulative arrays dates = [] sources_cum = [] prs_cum = [] merged_cum = [] s_total = source_base p_total = pr_base m_total = merged_base for day in all_dates: s_total += src_by_day.get(day, 0) p_total += pr_by_day.get(day, 0) m_total += mrg_by_day.get(day, 0) dates.append(day) sources_cum.append(s_total) prs_cum.append(p_total) merged_cum.append(m_total) return web.json_response({ "days": days, "dates": dates, "sources": sources_cum, "prs": prs_cum, "merged": merged_cum, "current": { "sources": s_total, "prs": p_total, "merged": m_total, }, }) finally: conn.close() import re _DATE_PREFIX_RE = re.compile(r"^\d{4}-\d{2}-\d{2}-?") # ─── GET /api/pr-lifecycle ──────────────────────────────────────────────── async def handle_pr_lifecycle(request): """All PRs with eval rounds, reviews, and time-to-merge in one payload. Returns: summary KPIs + per-PR array for the table. Joins prs + audit_log (eval rounds) + review_records. """ conn = request.app["_get_conn"]() try: days = int(request.query.get("days", "30")) day_clause = "AND p.created_at > datetime('now', ? || ' days')" if days < 9999 else "" params = (f"-{days}",) if days < 9999 else () # Base PR data (include cost_usd for actual cost tracking) pr_rows = conn.execute( f"""SELECT p.number, p.agent, p.domain, p.tier, p.status, p.created_at, p.merged_at, p.leo_verdict, p.description, p.domain_agent, p.domain_model, p.branch, p.cost_usd FROM prs p WHERE 1=1 {day_clause} ORDER BY p.number DESC""", params, ).fetchall() # Actual costs from costs table (aggregated, same date window as PRs) cost_day_clause = "AND date > date('now', ? || ' days')" if days < 9999 else "" actual_cost_rows = conn.execute( f"""SELECT SUM(cost_usd) as total_actual_cost, SUM(calls) as total_calls, SUM(input_tokens) as total_input_tokens, SUM(output_tokens) as total_output_tokens FROM costs WHERE cost_usd > 0 {cost_day_clause}""", params, ).fetchone() actual_total_cost = actual_cost_rows["total_actual_cost"] if actual_cost_rows and actual_cost_rows["total_actual_cost"] else 0 # Eval round counts per PR (from audit_log) eval_rows = conn.execute( f"""SELECT CAST(json_extract(detail, '$.pr') AS INTEGER) as pr, COUNT(*) as rounds FROM audit_log WHERE stage = 'evaluate' AND event IN ('approved', 'changes_requested', 'domain_rejected', 'tier05_rejected') AND json_extract(detail, '$.pr') IS NOT NULL GROUP BY pr""", ).fetchall() eval_map = {r["pr"]: r["rounds"] for r in eval_rows} # Review outcomes per PR (from review_records) review_rows = conn.execute( """SELECT pr_number, outcome, GROUP_CONCAT(DISTINCT reviewer) as reviewers, COUNT(*) as review_count FROM review_records GROUP BY pr_number, outcome""", ).fetchall() review_map = {} for r in review_rows: pr = r["pr_number"] if pr not in review_map: review_map[pr] = {"outcomes": [], "reviewers": set(), "count": 0} review_map[pr]["outcomes"].append(r["outcome"]) if r["reviewers"]: review_map[pr]["reviewers"].update(r["reviewers"].split(",")) review_map[pr]["count"] += r["review_count"] # Review snippets for closed PRs — from review_text or issues list snippet_rows = conn.execute( """SELECT CAST(json_extract(detail, '$.pr') AS INTEGER) as pr, COALESCE( json_extract(detail, '$.review_text'), json_extract(detail, '$.domain_review_text'), json_extract(detail, '$.leo_review_text') ) as review_text, json_extract(detail, '$.issues') as issues, json_extract(detail, '$.leo') as leo_verdict FROM audit_log WHERE stage = 'evaluate' AND event IN ('domain_rejected', 'changes_requested') AND json_extract(detail, '$.pr') IS NOT NULL ORDER BY timestamp DESC""", ).fetchall() snippet_map = {} for r in snippet_rows: pr = r["pr"] if pr not in snippet_map: if r["review_text"]: text = r["review_text"].strip() lines = [ln.strip() for ln in text.split("\n") if ln.strip() and not ln.strip().startswith("#")] snippet_map[pr] = lines[0][:200] if lines else text[:200] elif r["issues"]: try: issues = json.loads(r["issues"]) if isinstance(r["issues"], str) else r["issues"] if isinstance(issues, list) and issues: snippet_map[pr] = "Issues: " + ", ".join(str(i).replace("_", " ") for i in issues) except (json.JSONDecodeError, TypeError): pass TIER_COST_EST = { "LIGHT": 0.002, "STANDARD": 0.018, "DEEP": 0.12, } EXTRACT_COST_EST = 0.025 LEO_MODEL_BY_TIER = { "DEEP": "claude-opus-4-20250514", "STANDARD": "anthropic/claude-sonnet-4.5", "LIGHT": None, } # Build PR list prs = [] ttm_values = [] round_values = [] merged_count = 0 closed_count = 0 open_count = 0 for r in pr_rows: pr_num = r["number"] ttm = None if r["merged_at"] and r["created_at"]: try: created = datetime.fromisoformat(r["created_at"]) merged = datetime.fromisoformat(r["merged_at"]) ttm = (merged - created).total_seconds() / 60 if ttm >= 0: ttm_values.append(ttm) else: ttm = None except (ValueError, TypeError): pass rounds = eval_map.get(pr_num, 0) if rounds > 0: round_values.append(rounds) review_info = review_map.get(pr_num) status = r["status"] or "unknown" if status == "merged": merged_count += 1 elif status == "closed": closed_count += 1 elif status == "open": open_count += 1 desc = r["description"] or "" claim_titles = [t.strip() for t in desc.split("|") if t.strip()] if desc.strip() else [] claims_count = len(claim_titles) if claim_titles else 1 summary = None if claim_titles: summary = claim_titles[0][:120] if not summary: branch = r["branch"] or "" prefix = "" if "/" in branch: prefix = branch.split("/", 1)[0] branch = branch.split("/", 1)[1] branch = _DATE_PREFIX_RE.sub("", branch) branch = re.sub(r"-[0-9a-f]{4}$", "", branch) if branch: summary = branch.replace("-", " ").replace("_", " ").strip()[:120] elif prefix: summary = prefix tier = r["tier"] or "STANDARD" actual_cost = r["cost_usd"] if r["cost_usd"] and r["cost_usd"] > 0 else None if actual_cost is not None: cost = round(actual_cost, 4) cost_is_actual = True else: eval_cost = TIER_COST_EST.get(tier, 0.018) * max(rounds, 1) cost = round(EXTRACT_COST_EST + eval_cost, 4) cost_is_actual = False leo_model = LEO_MODEL_BY_TIER.get(tier) prs.append({ "number": pr_num, "agent": r["agent"], "domain": r["domain"], "tier": tier, "status": status, "claims_count": claims_count, "claim_titles": claim_titles, "eval_rounds": rounds, "ttm_minutes": round(ttm, 1) if ttm is not None else None, "created_at": r["created_at"], "merged_at": r["merged_at"], "leo_verdict": r["leo_verdict"], "review_count": review_info["count"] if review_info else 0, "summary": summary, "description": desc if desc.strip() else None, "review_snippet": snippet_map.get(pr_num), "domain_agent": r["domain_agent"], "domain_model": r["domain_model"], "leo_model": leo_model, "cost": cost, "cost_is_actual": cost_is_actual, }) # Summary KPIs ttm_values.sort() round_values.sort() def median(vals): if not vals: return None n = len(vals) if n % 2 == 0: return (vals[n // 2 - 1] + vals[n // 2]) / 2 return vals[n // 2] def p90(vals): if len(vals) < 5: return None return vals[int(len(vals) * 0.9)] # Compute cost summary: actual where available, estimated where not total_actual = sum(p["cost"] for p in prs if p["cost_is_actual"]) total_estimated = sum(p["cost"] for p in prs if not p["cost_is_actual"]) prs_with_actual_cost = sum(1 for p in prs if p["cost_is_actual"]) med_ttm = median(ttm_values) med_rounds = median(round_values) return web.json_response({ "days": days, "total": len(prs), "merged": merged_count, "closed": closed_count, "open": open_count, "median_ttm": round(med_ttm, 1) if med_ttm is not None else None, "p90_ttm": round(p90(ttm_values), 1) if p90(ttm_values) is not None else None, "median_rounds": round(med_rounds, 1) if med_rounds is not None else None, "max_rounds": max(round_values) if round_values else None, "actual_total_cost": round(actual_total_cost, 2), "cost_summary": { "total_actual": round(total_actual, 2), "total_estimated": round(total_estimated, 2), "prs_with_actual_cost": prs_with_actual_cost, "prs_with_estimated_cost": len(prs) - prs_with_actual_cost, }, "prs": prs, }) finally: conn.close() # ─── GET /api/telegram-extractions ─────────────────────────────────────── async def handle_telegram_extractions(request): """Review surface for Telegram conversation extractions. Shows recent PRs sourced from Telegram conversations with claim titles, status, and source info. Designed for quick daily spot-checking. Query params: days (int): lookback window (default 7, max 90) """ conn = request.app["_get_conn"]() try: days = min(int(request.query.get("days", "7")), 90) day_filter = f"-{days}" # Find PRs from Telegram sources (source_path contains 'telegram' or submitted_by is @m3taversal via bot) rows = conn.execute( """SELECT p.number, p.agent, p.domain, p.tier, p.status, p.created_at, p.merged_at, p.description, p.source_path, p.submitted_by, p.branch, p.eval_issues, p.leo_verdict FROM prs p WHERE (p.source_path LIKE '%telegram%' OR p.source_path LIKE '%futardio%') AND p.created_at > datetime('now', ? || ' days') ORDER BY p.number DESC""", (day_filter,), ).fetchall() prs = [] for r in rows: desc = r["description"] or "" claim_titles = [t.strip() for t in desc.split("|") if t.strip()] if desc.strip() else [] issues = None if r["eval_issues"]: try: issues = json.loads(r["eval_issues"]) if isinstance(r["eval_issues"], str) else r["eval_issues"] except (json.JSONDecodeError, TypeError): pass prs.append({ "number": r["number"], "agent": r["agent"], "domain": r["domain"], "tier": r["tier"], "status": r["status"], "created_at": r["created_at"], "merged_at": r["merged_at"], "claim_titles": claim_titles, "source_path": r["source_path"], "submitted_by": r["submitted_by"], "eval_issues": issues, "leo_verdict": r["leo_verdict"], }) # Summary stats merged = sum(1 for p in prs if p["status"] == "merged") closed = sum(1 for p in prs if p["status"] == "closed") open_prs = sum(1 for p in prs if p["status"] == "open") return web.json_response({ "days": days, "total": len(prs), "merged": merged, "closed": closed, "open": open_prs, "merge_rate": round(merged / len(prs) * 100, 1) if prs else 0, "prs": prs, }) finally: conn.close() # ─── GET /api/contributor-growth ───────────────────────────────────────── CODEX_WORKTREE = Path(os.environ.get("MAIN_WORKTREE", "/opt/teleo-eval/workspaces/main")) FOUNDING_CUTOFF = "2026-03-15" CONTRIBUTOR_EXCLUDE = {"Teleo Agents", "Teleo Pipeline"} _growth_cache: dict | None = None _growth_cache_ts: float = 0 GROWTH_CACHE_TTL = 300 async def handle_contributor_growth(request): """Cumulative unique contributors and claims over time from git log. Returns time-series data for Chart.js line charts. Cached for 5 minutes since git log is expensive. """ global _growth_cache, _growth_cache_ts now = time.monotonic() if _growth_cache is not None and (now - _growth_cache_ts) < GROWTH_CACHE_TTL: return web.json_response(_growth_cache) codex_path = str(CODEX_WORKTREE) if not CODEX_WORKTREE.exists(): return web.json_response( {"error": "codex worktree not found", "path": codex_path}, status=404 ) proc = await asyncio.create_subprocess_exec( "git", "log", "--format=%ad|%an", "--date=format:%Y-%m-%d", "--all", cwd=codex_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: return web.json_response( {"error": "git log failed", "detail": stderr.decode()[:500]}, status=500 ) first_seen: dict[str, str] = {} daily_commits: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for line in stdout.decode().strip().split("\n"): if "|" not in line: continue date, author = line.split("|", 1) if author in CONTRIBUTOR_EXCLUDE: continue daily_commits[date][author] += 1 if author not in first_seen or date < first_seen[author]: first_seen[author] = date by_date: dict[str, list[str]] = defaultdict(list) for author, date in first_seen.items(): by_date[date].append(author) contributors_timeline = [] seen: set[str] = set() for date in sorted(by_date.keys()): new_authors = by_date[date] seen.update(new_authors) contributors_timeline.append({ "date": date, "cumulative": len(seen), "new": [{"name": a, "founding": date <= FOUNDING_CUTOFF} for a in sorted(new_authors)], }) proc2 = await asyncio.create_subprocess_exec( "git", "log", "--format=%ad", "--date=format:%Y-%m-%d", "--all", "--diff-filter=A", "--", "domains/*.md", cwd=codex_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout2, _ = await proc2.communicate() claim_counts: dict[str, int] = defaultdict(int) for line in stdout2.decode().strip().split("\n"): line = line.strip() if line: claim_counts[line] += 1 claims_timeline = [] cumulative = 0 for date in sorted(claim_counts.keys()): cumulative += claim_counts[date] claims_timeline.append({"date": date, "cumulative": cumulative, "added": claim_counts[date]}) all_contributors = set(first_seen.keys()) founding = sorted(a for a in all_contributors if first_seen[a] <= FOUNDING_CUTOFF) result = { "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "summary": { "total_contributors": len(all_contributors), "founding_contributors": founding, "total_claims": cumulative, "days_active": (datetime.now(timezone.utc) - datetime(2026, 3, 5, tzinfo=timezone.utc)).days, }, "cumulative_contributors": contributors_timeline, "cumulative_claims": claims_timeline, } _growth_cache = result _growth_cache_ts = now return web.json_response(result) # ─── Registration ────────────────────────────────────────────────────────── def register_dashboard_routes(app: web.Application, get_conn): """Register new dashboard API routes.""" app["_get_conn"] = get_conn app.router.add_get("/api/stage-times", handle_stage_times) app.router.add_get("/api/herfindahl", handle_herfindahl) app.router.add_get("/api/agent-state", handle_agent_state) app.router.add_get("/api/extraction-yield-by-domain", handle_extraction_yield_by_domain) app.router.add_get("/api/agents-dashboard", handle_agents_dashboard) app.router.add_get("/api/cascade-coverage", handle_cascade_coverage) app.router.add_get("/api/review-summary", handle_review_summary) app.router.add_get("/api/agent-scorecard", handle_agent_scorecard) app.router.add_get("/api/trace/{trace_id}", handle_trace) app.router.add_get("/api/growth", handle_growth) app.router.add_get("/api/pr-lifecycle", handle_pr_lifecycle) app.router.add_get("/api/telegram-extractions", handle_telegram_extractions) app.router.add_get("/api/contributor-growth", handle_contributor_growth) app.router.add_get("/api/digest/latest", handle_digest_latest) app.router.add_get("/api/contributor-graph", handle_contributor_graph) async def handle_digest_latest(request): """GET /api/digest/latest — return the most recent scoring digest.""" import json as _json digest_path = "/opt/teleo-eval/logs/scoring-digest-latest.json" try: with open(digest_path) as f: data = _json.load(f) return web.json_response(data) except FileNotFoundError: return web.json_response({"error": "No digest available yet"}, status=404) except Exception as e: return web.json_response({"error": str(e)}, status=500) async def handle_contributor_graph(request): """GET /api/contributor-graph — serve the PNG chart.""" import subprocess, os png_path = "/opt/teleo-eval/static/contributor-graph.png" # Regenerate if older than 1 hour or missing regen = not os.path.exists(png_path) if not regen: age = __import__('time').time() - os.path.getmtime(png_path) regen = age > 3600 if regen: try: subprocess.run( ['python3', '/opt/teleo-eval/scripts/contributor-graph.py'], timeout=30, capture_output=True ) except Exception: pass if not os.path.exists(png_path): return web.Response(text='Chart not available', status=503) return web.FileResponse(png_path, headers={'Content-Type': 'image/png'})