teleo-infrastructure/diagnostics/dashboard_routes.py
m3taversal 88e8e15c6d feat: add /api/digest/latest endpoint for scoring digest data
Serves the latest scoring-digest-latest.json from cron output.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 10:55:24 +01:00


"""New API endpoints for the 4-page dashboard.
Endpoints:
GET /api/stage-times — median dwell time per pipeline stage
GET /api/herfindahl — domain concentration index
GET /api/agent-state — live agent-state from filesystem
GET /api/extraction-yield-by-domain — sources→claims conversion per domain
GET /api/agents-dashboard — batched agent performance payload
Owner: Argus
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import statistics
import time
import urllib.request
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from aiohttp import web
logger = logging.getLogger("argus.dashboard_routes")
# ─── Claim-index cache (60s TTL) ───────────────────────────────────────────
_claim_index_cache: dict | None = None
_claim_index_ts: float = 0
CLAIM_INDEX_TTL = 60 # seconds
CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index")
AGENT_STATE_DIR = Path(os.environ.get("AGENT_STATE_DIR", "/opt/teleo-eval/agent-state"))
def get_claim_index() -> dict | None:
"""Fetch claim-index with 60s cache."""
global _claim_index_cache, _claim_index_ts
now = time.monotonic()
if _claim_index_cache is not None and (now - _claim_index_ts) < CLAIM_INDEX_TTL:
return _claim_index_cache
try:
with urllib.request.urlopen(CLAIM_INDEX_URL, timeout=5) as resp:
data = json.loads(resp.read())
_claim_index_cache = data
_claim_index_ts = now
return data
except Exception as e:
logger.warning("Failed to fetch claim-index: %s", e)
# Return stale cache if available
return _claim_index_cache
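# Minimal usage sketch (illustrative, not a registered route): callers should
# treat a None return as "claim-index unreachable and nothing cached yet", e.g.
#
#     index = get_claim_index()
#     if index is None:
#         return web.json_response({"error": "claim-index unavailable"}, status=503)
#
# Note: urllib blocks the event loop for up to the 5s timeout; the 60s cache is
# what keeps those blocking fetches rare.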
# ─── GET /api/stage-times ──────────────────────────────────────────────────
async def handle_stage_times(request):
"""Median dwell time per pipeline stage from audit_log timestamps.
Stages: discover → validate → evaluate → merge
Returns median minutes between consecutive stages.
"""
conn = request.app["_get_conn"]()
try:
hours = int(request.query.get("hours", "24"))
# Get per-PR event timestamps
rows = conn.execute(
"""SELECT json_extract(detail, '$.pr') as pr, event, timestamp
FROM audit_log
WHERE timestamp > datetime('now', ? || ' hours')
AND json_extract(detail, '$.pr') IS NOT NULL
ORDER BY json_extract(detail, '$.pr'), timestamp""",
(f"-{hours}",),
).fetchall()
# Group by PR
pr_events: dict[int, list] = {}
for r in rows:
pr = r["pr"]
if pr not in pr_events:
pr_events[pr] = []
pr_events[pr].append({"event": r["event"], "ts": r["timestamp"]})
# Compute stage dwell times
stage_pairs = [
("pr_discovered", "tier0_complete", "Ingest → Validate"),
("tier0_complete", "approved", "Validate → Approve"),
("tier0_complete", "domain_rejected", "Validate → Reject"),
("approved", "merged", "Approve → Merge"),
]
stage_times = {}
for start_event, end_event, label in stage_pairs:
durations = []
for pr, events in pr_events.items():
start_ts = None
end_ts = None
for e in events:
if e["event"] == start_event and start_ts is None:
start_ts = e["ts"]
if e["event"] == end_event and end_ts is None:
end_ts = e["ts"]
if start_ts and end_ts:
try:
start_dt = datetime.fromisoformat(start_ts)
end_dt = datetime.fromisoformat(end_ts)
mins = (end_dt - start_dt).total_seconds() / 60
if mins >= 0:
durations.append(mins)
except (ValueError, TypeError):
pass
if durations:
stage_times[label] = {
"median_minutes": round(statistics.median(durations), 1),
"p90_minutes": round(sorted(durations)[int(len(durations) * 0.9)], 1) if len(durations) >= 5 else None,
"count": len(durations),
}
return web.json_response({"hours": hours, "stages": stage_times})
finally:
conn.close()
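# Example response shape (illustrative values only):
#
#     {"hours": 24, "stages": {
#         "Ingest → Validate": {"median_minutes": 12.5, "p90_minutes": 41.0, "count": 18},
#         "Approve → Merge": {"median_minutes": 3.2, "p90_minutes": None, "count": 4}}}
#
# p90_minutes is null whenever a stage has fewer than 5 samples.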
# ─── GET /api/herfindahl ──────────────────────────────────────────────────
async def handle_herfindahl(request):
"""Domain concentration index (Herfindahl-Hirschman).
HHI = sum of (domain_share^2). 1.0 = single domain, lower = more diverse.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
rows = conn.execute(
"""SELECT domain, COUNT(*) as cnt
FROM prs WHERE status='merged' AND domain IS NOT NULL
AND merged_at > datetime('now', ? || ' days')
GROUP BY domain""",
(f"-{days}",),
).fetchall()
if not rows:
return web.json_response({"hhi": 0, "domains": [], "days": days})
total = sum(r["cnt"] for r in rows)
domains = []
hhi = 0
for r in rows:
share = r["cnt"] / total
hhi += share ** 2
domains.append({
"domain": r["domain"],
"count": r["cnt"],
"share": round(share, 4),
})
domains.sort(key=lambda x: x["count"], reverse=True)
# Interpret: HHI < 0.15 = diverse, 0.15-0.25 = moderate, >0.25 = concentrated
status = "diverse" if hhi < 0.15 else ("moderate" if hhi < 0.25 else "concentrated")
return web.json_response({
"hhi": round(hhi, 4),
"status": status,
"domains": domains,
"total_merged": total,
"days": days,
})
finally:
conn.close()
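# A minimal worked example of the HHI arithmetic above (hypothetical counts),
# kept as a doctest-style helper rather than production code:
def _hhi_example() -> float:
    """Illustrative only; mirrors the share**2 sum in handle_herfindahl.

    Merged counts of 5/3/2 give shares 0.5/0.3/0.2, so
    HHI = 0.5**2 + 0.3**2 + 0.2**2 = 0.38 → "concentrated" under the
    thresholds above.

    >>> round(_hhi_example(), 2)
    0.38
    """
    counts = {"ai": 5, "bio": 3, "econ": 2}  # hypothetical domain → merged count
    total = sum(counts.values())
    return sum((c / total) ** 2 for c in counts.values())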
# ─── GET /api/agent-state ─────────────────────────────────────────────────
async def handle_agent_state(request):
"""Read live agent-state from filesystem. 6 agents, ~1KB each."""
if not AGENT_STATE_DIR.exists():
return web.json_response({"error": "agent-state directory not found", "path": str(AGENT_STATE_DIR)}, status=404)
agents = {}
for agent_dir in sorted(AGENT_STATE_DIR.iterdir()):
if not agent_dir.is_dir():
continue
name = agent_dir.name
state = {"name": name}
# metrics.json
metrics_file = agent_dir / "metrics.json"
if metrics_file.exists():
try:
m = json.loads(metrics_file.read_text())
state["last_active"] = m.get("updated_at")
state["metrics"] = m
except (json.JSONDecodeError, OSError):
state["metrics_error"] = True
# tasks.json
tasks_file = agent_dir / "tasks.json"
if tasks_file.exists():
try:
t = json.loads(tasks_file.read_text())
state["tasks"] = t if isinstance(t, list) else []
state["task_count"] = len(state["tasks"])
except (json.JSONDecodeError, OSError):
state["tasks"] = []
# session.json
session_file = agent_dir / "session.json"
if session_file.exists():
try:
s = json.loads(session_file.read_text())
state["session"] = s
except (json.JSONDecodeError, OSError):
pass
# inbox depth
inbox_dir = agent_dir / "inbox"
if inbox_dir.exists() and inbox_dir.is_dir():
state["inbox_depth"] = len(list(inbox_dir.iterdir()))
else:
state["inbox_depth"] = 0
agents[name] = state
return web.json_response({"agents": agents, "agent_count": len(agents)})
# ─── GET /api/extraction-yield-by-domain ──────────────────────────────────
async def handle_extraction_yield_by_domain(request):
"""Sources → claims conversion rate per domain."""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
# Yield proxy: merged / total PRs per domain. The source_path LIKE-join
# against the sources table is not reliable enough to count distinct
# sources, so the per-PR ratio is used instead.
sources_by_domain = conn.execute(
"""SELECT domain, COUNT(*) as total_prs,
SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as merged
FROM prs WHERE domain IS NOT NULL
AND created_at > datetime('now', ? || ' days')
GROUP BY domain""",
(f"-{days}",),
).fetchall()
domains = []
for r in sources_by_domain:
total = r["total_prs"] or 0
merged = r["merged"] or 0
domains.append({
"domain": r["domain"],
"total_prs": total,
"merged": merged,
"yield": round(merged / total, 3) if total else 0,
})
domains.sort(key=lambda x: x["merged"], reverse=True)
return web.json_response({"days": days, "domains": domains})
finally:
conn.close()
# ─── GET /api/agents-dashboard ─────────────────────────────────────────────
async def handle_agents_dashboard(request):
"""Batched agent performance payload for Page 3.
Returns per-agent: merged count, rejection rate, yield, CI score,
top rejection reasons, contribution trend (weekly).
All in one response to avoid N client-side fetches.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
# Per-agent merged + rejected counts
agent_stats = conn.execute(
"""SELECT
COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
COUNT(*) as evaluated,
SUM(CASE WHEN event='approved' THEN 1 ELSE 0 END) as approved,
SUM(CASE WHEN event IN ('changes_requested','domain_rejected','tier05_rejected') THEN 1 ELSE 0 END) as rejected
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', ? || ' days')
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL
GROUP BY agent""",
(f"-{days}",),
).fetchall()
agents = {}
for r in agent_stats:
name = r["agent"]
ev = r["evaluated"] or 0
ap = r["approved"] or 0
rj = r["rejected"] or 0
agents[name] = {
"evaluated": ev,
"approved": ap,
"rejected": rj,
"yield": round(ap / ev, 3) if ev else 0,
"rejection_rate": round(rj / ev, 3) if ev else 0,
}
# Per-agent top rejection reasons from prs.eval_issues (Epimetheus correction 2026-04-02)
tag_rows = conn.execute(
"""SELECT agent, value as tag, COUNT(*) as cnt
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND agent IS NOT NULL
AND created_at > datetime('now', ? || ' days')
GROUP BY agent, tag
ORDER BY agent, cnt DESC""",
(f"-{days}",),
).fetchall()
for r in tag_rows:
name = r["agent"]
if name in agents:
if "top_rejections" not in agents[name]:
agents[name]["top_rejections"] = []
if len(agents[name]["top_rejections"]) < 5:
agents[name]["top_rejections"].append({"tag": r["tag"], "count": r["cnt"]})
# Weekly contribution trend per agent
weekly = conn.execute(
"""SELECT
COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
strftime('%Y-W%W', timestamp) as week,
SUM(CASE WHEN event='approved' THEN 1 ELSE 0 END) as merged,
COUNT(*) as evaluated
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', ? || ' days')
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL
GROUP BY agent, week
ORDER BY agent, week""",
(f"-{days}",),
).fetchall()
for r in weekly:
name = r["agent"]
if name in agents:
if "weekly_trend" not in agents[name]:
agents[name]["weekly_trend"] = []
agents[name]["weekly_trend"].append({
"week": r["week"],
"merged": r["merged"] or 0,
"evaluated": r["evaluated"] or 0,
})
# CI scores from contributors table
weights = {"sourcer": 0.15, "extractor": 0.05, "challenger": 0.35, "synthesizer": 0.25, "reviewer": 0.20}
try:
contribs = conn.execute(
"SELECT handle, sourcer_count, extractor_count, challenger_count, "
"synthesizer_count, reviewer_count, claims_merged, tier FROM contributors"
).fetchall()
for c in contribs:
name = c["handle"]
if name not in agents:
agents[name] = {}
ci = sum((c[f"{role}_count"] or 0) * w for role, w in weights.items())
agents[name]["ci_score"] = round(ci, 2)
agents[name]["claims_merged"] = c["claims_merged"] or 0
agents[name]["tier"] = c["tier"]
except sqlite3.Error:
pass
return web.json_response({"days": days, "agents": agents})
finally:
conn.close()
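# A minimal worked example of the CI weighting used above (hypothetical role
# counts), kept as a doctest-style helper rather than production code:
def _ci_score_example() -> float:
    """Illustrative only; mirrors the contributors-table CI computation.

    sourcer=4, extractor=10, challenger=2, synthesizer=1, reviewer=3 gives
    4*0.15 + 10*0.05 + 2*0.35 + 1*0.25 + 3*0.20 = 2.65.

    >>> _ci_score_example()
    2.65
    """
    weights = {"sourcer": 0.15, "extractor": 0.05, "challenger": 0.35,
               "synthesizer": 0.25, "reviewer": 0.20}
    counts = {"sourcer": 4, "extractor": 10, "challenger": 2,
              "synthesizer": 1, "reviewer": 3}  # hypothetical contributor row
    return round(sum(counts[role] * w for role, w in weights.items()), 2)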
# ─── GET /api/cascade-coverage ────────────────────────────────────────────
async def handle_cascade_coverage(request):
"""Cascade coverage from audit_log stage='cascade' events.
Returns: triggered count, by-agent breakdown, claims affected.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
triggered = conn.execute(
"""SELECT
json_extract(detail, '$.agent') as agent,
COUNT(*) as cnt,
SUM(json_array_length(json_extract(detail, '$.source_claims'))) as claims_affected
FROM audit_log
WHERE stage='cascade' AND event='cascade_triggered'
AND timestamp > datetime('now', ? || ' days')
GROUP BY agent""",
(f"-{days}",),
).fetchall()
summaries = conn.execute(
"""SELECT
SUM(json_extract(detail, '$.notifications_sent')) as total_notifications,
COUNT(*) as total_merges_with_cascade
FROM audit_log
WHERE stage='cascade' AND event='cascade_summary'
AND timestamp > datetime('now', ? || ' days')""",
(f"-{days}",),
).fetchone()
reviewed = conn.execute(
"""SELECT COUNT(*) as cnt
FROM audit_log
WHERE stage='cascade' AND event='cascade_reviewed'
AND timestamp > datetime('now', ? || ' days')""",
(f"-{days}",),
).fetchone()
total_triggered = sum(r["cnt"] for r in triggered)
total_reviewed = reviewed["cnt"] if reviewed else 0
completion_rate = round(total_reviewed / total_triggered, 3) if total_triggered else None
by_agent = [
{"agent": r["agent"], "triggered": r["cnt"], "claims_affected": r["claims_affected"] or 0}
for r in triggered
]
insufficient_data = total_triggered < 5
return web.json_response({
"days": days,
"total_triggered": total_triggered,
"total_reviewed": total_reviewed,
"completion_rate": completion_rate,
"total_notifications": summaries["total_notifications"] if summaries else 0,
"merges_with_cascade": summaries["total_merges_with_cascade"] if summaries else 0,
"by_agent": by_agent,
"insufficient_data": insufficient_data,
})
finally:
conn.close()
# ─── GET /api/review-summary ─────────────────────────────────────────────
async def handle_review_summary(request):
"""Structured review data from review_records table (migration v12).
Cleaner than audit_log parsing — structured outcome, rejection_reason,
disagreement_type columns.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
# Check if table exists and has data
try:
total = conn.execute(
"SELECT COUNT(*) as cnt FROM review_records WHERE reviewed_at > datetime('now', ? || ' days')",
(f"-{days}",),
).fetchone()["cnt"]
except sqlite3.OperationalError:
return web.json_response({"error": "review_records table not available", "populated": False})
if total == 0:
return web.json_response({"populated": False, "total": 0, "days": days})
# Outcome breakdown
outcomes = conn.execute(
"""SELECT outcome, COUNT(*) as cnt
FROM review_records
WHERE reviewed_at > datetime('now', ? || ' days')
GROUP BY outcome""",
(f"-{days}",),
).fetchall()
# Rejection reasons — try review_records first, fall back to prs.eval_issues
reasons = conn.execute(
"""SELECT rejection_reason, COUNT(*) as cnt
FROM review_records
WHERE rejection_reason IS NOT NULL
AND reviewed_at > datetime('now', ? || ' days')
GROUP BY rejection_reason ORDER BY cnt DESC""",
(f"-{days}",),
).fetchall()
rejection_source = "review_records"
if not reasons:
reasons = conn.execute(
"""SELECT value AS rejection_reason, COUNT(*) as cnt
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND created_at > datetime('now', ? || ' days')
GROUP BY value ORDER BY cnt DESC""",
(f"-{days}",),
).fetchall()
rejection_source = "prs.eval_issues"
# Per-reviewer breakdown
reviewers = conn.execute(
"""SELECT reviewer,
SUM(CASE WHEN outcome='approved' THEN 1 ELSE 0 END) as approved,
SUM(CASE WHEN outcome='approved-with-changes' THEN 1 ELSE 0 END) as approved_with_changes,
SUM(CASE WHEN outcome='rejected' THEN 1 ELSE 0 END) as rejected,
COUNT(*) as total
FROM review_records
WHERE reviewed_at > datetime('now', ? || ' days')
GROUP BY reviewer ORDER BY total DESC""",
(f"-{days}",),
).fetchall()
# Per-domain breakdown
domains = conn.execute(
"""SELECT domain,
SUM(CASE WHEN outcome='rejected' THEN 1 ELSE 0 END) as rejected,
COUNT(*) as total
FROM review_records
WHERE domain IS NOT NULL
AND reviewed_at > datetime('now', ? || ' days')
GROUP BY domain ORDER BY total DESC""",
(f"-{days}",),
).fetchall()
return web.json_response({
"populated": True,
"days": days,
"total": total,
"outcomes": {r["outcome"]: r["cnt"] for r in outcomes},
"rejection_reasons": [{"reason": r["rejection_reason"], "count": r["cnt"]} for r in reasons],
"rejection_source": rejection_source,
"reviewers": [
{"reviewer": r["reviewer"], "approved": r["approved"], "approved_with_changes": r["approved_with_changes"],
"rejected": r["rejected"], "total": r["total"]}
for r in reviewers
],
"domains": [
{"domain": r["domain"], "rejected": r["rejected"], "total": r["total"],
"rejection_rate": round(r["rejected"] / r["total"], 3) if r["total"] else 0}
for r in domains
],
})
finally:
conn.close()
# ─── GET /api/agent-scorecard ──────────────────────────────────────────────
async def handle_agent_scorecard(request):
"""Per-agent scorecard: PRs submitted, review outcomes, rejection reasons.
Data from review_records (structured reviews) + prs (submission counts).
Falls back to prs.eval_issues for rejection reasons when review_records
has no rejections yet.
"""
conn = request.app["_get_conn"]()
try:
try:
days = min(int(request.query.get("days", "30")), 90)
except ValueError:
days = 30
day_filter = f"-{days}"
# PRs submitted per agent
prs_by_agent = conn.execute(
"""SELECT agent, COUNT(*) as cnt FROM prs
WHERE agent IS NOT NULL
AND created_at > datetime('now', ? || ' days')
GROUP BY agent""",
(day_filter,),
).fetchall()
prs_map = {r["agent"]: r["cnt"] for r in prs_by_agent}
# Review outcomes from review_records
review_data = {}
try:
reviews = conn.execute(
"""SELECT reviewer as agent, outcome, COUNT(*) as cnt
FROM review_records
WHERE reviewed_at > datetime('now', ? || ' days')
GROUP BY reviewer, outcome""",
(day_filter,),
).fetchall()
for r in reviews:
agent = r["agent"]
if agent not in review_data:
review_data[agent] = {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0}
review_data[agent][r["outcome"].replace("-", "_")] = r["cnt"]
review_data[agent]["total"] += r["cnt"]
except sqlite3.OperationalError:
pass
# If review_records is empty, fall back to audit_log eval events
if not review_data:
evals = conn.execute(
"""SELECT
COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
event, COUNT(*) as cnt
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', ? || ' days')
GROUP BY agent, event""",
(day_filter,),
).fetchall()
for r in evals:
agent = r["agent"]
if not agent:
continue
if agent not in review_data:
review_data[agent] = {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0}
if r["event"] == "approved":
review_data[agent]["approved"] += r["cnt"]
elif r["event"] == "changes_requested": # fixer auto-remediated; equivalent in pre-review_records era
review_data[agent]["approved_with_changes"] += r["cnt"]
else:
review_data[agent]["rejected"] += r["cnt"]
review_data[agent]["total"] += r["cnt"]
# Rejection reasons from prs.eval_issues (canonical source)
reason_rows = conn.execute(
"""SELECT agent, value as reason, COUNT(*) as cnt
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND agent IS NOT NULL
AND created_at > datetime('now', ? || ' days')
GROUP BY agent, reason ORDER BY agent, cnt DESC""",
(day_filter,),
).fetchall()
reasons_map = {}
for r in reason_rows:
if r["agent"] not in reasons_map:
reasons_map[r["agent"]] = {}
reasons_map[r["agent"]][r["reason"]] = r["cnt"]
# Build scorecards
all_agents = sorted(a for a in set(list(prs_map.keys()) + list(review_data.keys())) if a)
scorecards = []
for agent in all_agents:
if agent in ("unknown", None):
continue
rd = review_data.get(agent, {"approved": 0, "approved_with_changes": 0, "rejected": 0, "total": 0})
total_reviews = rd["total"]
approved = rd["approved"]
approved_wc = rd["approved_with_changes"]
rejected = rd["rejected"]
approval_rate = ((approved + approved_wc) / total_reviews * 100) if total_reviews else 0
scorecards.append({
"agent": agent,
"total_prs": prs_map.get(agent, 0),
"total_reviews": total_reviews,
"approved": approved,
"approved_with_changes": approved_wc,
"rejected": rejected,
"approval_rate": round(approval_rate, 1),
"rejection_reasons": reasons_map.get(agent, {}),
})
scorecards.sort(key=lambda x: x["total_reviews"], reverse=True)
return web.json_response({"days": days, "scorecards": scorecards})
finally:
conn.close()
# ─── Trace endpoint ────────────────────────────────────────────────────────
async def handle_trace(request: web.Request) -> web.Response:
"""Return the full lifecycle of a source/PR through the pipeline.
GET /api/trace/1234 → all audit_log + review_records + costs for PR 1234.
One thread, every stage, chronological.
"""
trace_id = request.match_info["trace_id"]
conn = request.app["_get_conn"]()
try:
events = conn.execute(
"""SELECT timestamp, stage, event, detail
FROM audit_log
WHERE trace_id = ?
ORDER BY timestamp""",
(trace_id,),
).fetchall()
if not events:
events = conn.execute(
"""SELECT timestamp, stage, event, detail
FROM audit_log
WHERE CAST(json_extract(detail, '$.pr') AS TEXT) = ?
ORDER BY timestamp""",
(trace_id,),
).fetchall()
reviews = conn.execute(
"""SELECT reviewed_at, reviewer, reviewer_model, outcome,
rejection_reason, disagreement_type, notes, claim_path
FROM review_records
WHERE pr_number = ?
ORDER BY reviewed_at""",
(trace_id,),
).fetchall()
pr = conn.execute(
"""SELECT number, source_path, domain, agent, tier, status,
origin, created_at, merged_at
FROM prs
WHERE number = ?""",
(trace_id,),
).fetchone()
result = {
"trace_id": trace_id,
"pr": dict(pr) if pr else None,
"timeline": [
{"timestamp": r[0], "stage": r[1], "event": r[2],
"detail": json.loads(r[3]) if r[3] else None}
for r in events
],
"reviews": [
{"reviewed_at": r[0], "reviewer": r[1], "model": r[2],
"outcome": r[3], "rejection_reason": r[4],
"disagreement_type": r[5], "notes": r[6], "claim_path": r[7]}
for r in reviews
],
}
return web.json_response(result)
finally:
conn.close()
# ─── GET /api/growth ──────────────────────────────────────────────────────
async def handle_growth(request):
"""Cumulative growth of sources, PRs, and merged claims over time.
Returns daily data points with running totals for each series.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "90"))
# Daily new sources
source_rows = conn.execute(
"""SELECT date(created_at) as day, COUNT(*) as cnt
FROM sources
WHERE created_at > datetime('now', ? || ' days')
GROUP BY day ORDER BY day""",
(f"-{days}",),
).fetchall()
# Daily new PRs
pr_rows = conn.execute(
"""SELECT date(created_at) as day, COUNT(*) as cnt
FROM prs
WHERE created_at > datetime('now', ? || ' days')
GROUP BY day ORDER BY day""",
(f"-{days}",),
).fetchall()
# Daily merged PRs
merged_rows = conn.execute(
"""SELECT date(merged_at) as day, COUNT(*) as cnt
FROM prs
WHERE status = 'merged' AND merged_at IS NOT NULL
AND merged_at > datetime('now', ? || ' days')
GROUP BY day ORDER BY day""",
(f"-{days}",),
).fetchall()
# Get totals BEFORE the window for correct cumulative baseline
source_base = conn.execute(
"SELECT COUNT(*) as cnt FROM sources WHERE created_at <= datetime('now', ? || ' days')",
(f"-{days}",),
).fetchone()["cnt"]
pr_base = conn.execute(
"SELECT COUNT(*) as cnt FROM prs WHERE created_at <= datetime('now', ? || ' days')",
(f"-{days}",),
).fetchone()["cnt"]
merged_base = conn.execute(
"""SELECT COUNT(*) as cnt FROM prs
WHERE status = 'merged' AND merged_at IS NOT NULL
AND merged_at <= datetime('now', ? || ' days')""",
(f"-{days}",),
).fetchone()["cnt"]
# Collect all unique dates
all_dates = sorted(set(
[r["day"] for r in source_rows] +
[r["day"] for r in pr_rows] +
[r["day"] for r in merged_rows]
))
# Build lookup dicts
src_by_day = {r["day"]: r["cnt"] for r in source_rows}
pr_by_day = {r["day"]: r["cnt"] for r in pr_rows}
mrg_by_day = {r["day"]: r["cnt"] for r in merged_rows}
# Build cumulative arrays
dates = []
sources_cum = []
prs_cum = []
merged_cum = []
s_total = source_base
p_total = pr_base
m_total = merged_base
for day in all_dates:
s_total += src_by_day.get(day, 0)
p_total += pr_by_day.get(day, 0)
m_total += mrg_by_day.get(day, 0)
dates.append(day)
sources_cum.append(s_total)
prs_cum.append(p_total)
merged_cum.append(m_total)
return web.json_response({
"days": days,
"dates": dates,
"sources": sources_cum,
"prs": prs_cum,
"merged": merged_cum,
"current": {
"sources": s_total,
"prs": p_total,
"merged": m_total,
},
})
finally:
conn.close()
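# Baseline example (illustrative numbers): with days=90, 120 sources created
# before the window, and daily new-source counts of [3, 0, 5] inside it, the
# cumulative series is [123, 123, 128]; the chart starts from the true running
# total rather than from zero.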
# Strips a leading YYYY-MM-DD(-) date prefix from branch names
_DATE_PREFIX_RE = re.compile(r"^\d{4}-\d{2}-\d{2}-?")
# ─── GET /api/pr-lifecycle ────────────────────────────────────────────────
async def handle_pr_lifecycle(request):
"""All PRs with eval rounds, reviews, and time-to-merge in one payload.
Returns: summary KPIs + per-PR array for the table.
Joins prs + audit_log (eval rounds) + review_records.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
day_clause = "AND p.created_at > datetime('now', ? || ' days')" if days < 9999 else ""
params = (f"-{days}",) if days < 9999 else ()
# Base PR data (include cost_usd for actual cost tracking)
pr_rows = conn.execute(
f"""SELECT p.number, p.agent, p.domain, p.tier, p.status,
p.created_at, p.merged_at, p.leo_verdict, p.description,
p.domain_agent, p.domain_model, p.branch, p.cost_usd
FROM prs p
WHERE 1=1 {day_clause}
ORDER BY p.number DESC""",
params,
).fetchall()
# Actual costs from costs table (aggregated, same date window as PRs)
cost_day_clause = "AND date > date('now', ? || ' days')" if days < 9999 else ""
actual_cost_rows = conn.execute(
f"""SELECT SUM(cost_usd) as total_actual_cost,
SUM(calls) as total_calls,
SUM(input_tokens) as total_input_tokens,
SUM(output_tokens) as total_output_tokens
FROM costs
WHERE cost_usd > 0 {cost_day_clause}""",
params,
).fetchone()
actual_total_cost = actual_cost_rows["total_actual_cost"] if actual_cost_rows and actual_cost_rows["total_actual_cost"] else 0
# Eval round counts per PR (from audit_log)
eval_rows = conn.execute(
f"""SELECT CAST(json_extract(detail, '$.pr') AS INTEGER) as pr,
COUNT(*) as rounds
FROM audit_log
WHERE stage = 'evaluate'
AND event IN ('approved', 'changes_requested', 'domain_rejected', 'tier05_rejected')
AND json_extract(detail, '$.pr') IS NOT NULL
GROUP BY pr""",
).fetchall()
eval_map = {r["pr"]: r["rounds"] for r in eval_rows}
# Review outcomes per PR (from review_records)
review_rows = conn.execute(
"""SELECT pr_number, outcome,
GROUP_CONCAT(DISTINCT reviewer) as reviewers,
COUNT(*) as review_count
FROM review_records
GROUP BY pr_number, outcome""",
).fetchall()
review_map = {}
for r in review_rows:
pr = r["pr_number"]
if pr not in review_map:
review_map[pr] = {"outcomes": [], "reviewers": set(), "count": 0}
review_map[pr]["outcomes"].append(r["outcome"])
if r["reviewers"]:
review_map[pr]["reviewers"].update(r["reviewers"].split(","))
review_map[pr]["count"] += r["review_count"]
# Review snippets for closed PRs — from review_text or issues list
snippet_rows = conn.execute(
"""SELECT CAST(json_extract(detail, '$.pr') AS INTEGER) as pr,
COALESCE(
json_extract(detail, '$.review_text'),
json_extract(detail, '$.domain_review_text'),
json_extract(detail, '$.leo_review_text')
) as review_text,
json_extract(detail, '$.issues') as issues,
json_extract(detail, '$.leo') as leo_verdict
FROM audit_log
WHERE stage = 'evaluate'
AND event IN ('domain_rejected', 'changes_requested')
AND json_extract(detail, '$.pr') IS NOT NULL
ORDER BY timestamp DESC""",
).fetchall()
snippet_map = {}
for r in snippet_rows:
pr = r["pr"]
if pr not in snippet_map:
if r["review_text"]:
text = r["review_text"].strip()
lines = [ln.strip() for ln in text.split("\n") if ln.strip() and not ln.strip().startswith("#")]
snippet_map[pr] = lines[0][:200] if lines else text[:200]
elif r["issues"]:
try:
issues = json.loads(r["issues"]) if isinstance(r["issues"], str) else r["issues"]
if isinstance(issues, list) and issues:
snippet_map[pr] = "Issues: " + ", ".join(str(i).replace("_", " ") for i in issues)
except (json.JSONDecodeError, TypeError):
pass
TIER_COST_EST = {
"LIGHT": 0.002,
"STANDARD": 0.018,
"DEEP": 0.12,
}
EXTRACT_COST_EST = 0.025
LEO_MODEL_BY_TIER = {
"DEEP": "claude-opus-4-20250514",
"STANDARD": "anthropic/claude-sonnet-4.5",
"LIGHT": None,
}
# Build PR list
prs = []
ttm_values = []
round_values = []
merged_count = 0
closed_count = 0
open_count = 0
for r in pr_rows:
pr_num = r["number"]
ttm = None
if r["merged_at"] and r["created_at"]:
try:
created = datetime.fromisoformat(r["created_at"])
merged = datetime.fromisoformat(r["merged_at"])
ttm = (merged - created).total_seconds() / 60
if ttm >= 0:
ttm_values.append(ttm)
else:
ttm = None
except (ValueError, TypeError):
pass
rounds = eval_map.get(pr_num, 0)
if rounds > 0:
round_values.append(rounds)
review_info = review_map.get(pr_num)
status = r["status"] or "unknown"
if status == "merged":
merged_count += 1
elif status == "closed":
closed_count += 1
elif status == "open":
open_count += 1
desc = r["description"] or ""
claim_titles = [t.strip() for t in desc.split("|") if t.strip()] if desc.strip() else []
claims_count = len(claim_titles) if claim_titles else 1
summary = None
if claim_titles:
summary = claim_titles[0][:120]
if not summary:
branch = r["branch"] or ""
prefix = ""
if "/" in branch:
prefix = branch.split("/", 1)[0]
branch = branch.split("/", 1)[1]
branch = _DATE_PREFIX_RE.sub("", branch)
branch = re.sub(r"-[0-9a-f]{4}$", "", branch)
if branch:
summary = branch.replace("-", " ").replace("_", " ").strip()[:120]
elif prefix:
summary = prefix
tier = r["tier"] or "STANDARD"
actual_cost = r["cost_usd"] if r["cost_usd"] and r["cost_usd"] > 0 else None
if actual_cost is not None:
cost = round(actual_cost, 4)
cost_is_actual = True
else:
eval_cost = TIER_COST_EST.get(tier, 0.018) * max(rounds, 1)
cost = round(EXTRACT_COST_EST + eval_cost, 4)
cost_is_actual = False
leo_model = LEO_MODEL_BY_TIER.get(tier)
prs.append({
"number": pr_num,
"agent": r["agent"],
"domain": r["domain"],
"tier": tier,
"status": status,
"claims_count": claims_count,
"claim_titles": claim_titles,
"eval_rounds": rounds,
"ttm_minutes": round(ttm, 1) if ttm is not None else None,
"created_at": r["created_at"],
"merged_at": r["merged_at"],
"leo_verdict": r["leo_verdict"],
"review_count": review_info["count"] if review_info else 0,
"summary": summary,
"description": desc if desc.strip() else None,
"review_snippet": snippet_map.get(pr_num),
"domain_agent": r["domain_agent"],
"domain_model": r["domain_model"],
"leo_model": leo_model,
"cost": cost,
"cost_is_actual": cost_is_actual,
})
# Summary KPIs
ttm_values.sort()
round_values.sort()
def median(vals):
if not vals:
return None
n = len(vals)
if n % 2 == 0:
return (vals[n // 2 - 1] + vals[n // 2]) / 2
return vals[n // 2]
def p90(vals):
if len(vals) < 5:
return None
return vals[int(len(vals) * 0.9)]
# Compute cost summary: actual where available, estimated where not
total_actual = sum(p["cost"] for p in prs if p["cost_is_actual"])
total_estimated = sum(p["cost"] for p in prs if not p["cost_is_actual"])
prs_with_actual_cost = sum(1 for p in prs if p["cost_is_actual"])
med_ttm = median(ttm_values)
med_rounds = median(round_values)
return web.json_response({
"days": days,
"total": len(prs),
"merged": merged_count,
"closed": closed_count,
"open": open_count,
"median_ttm": round(med_ttm, 1) if med_ttm is not None else None,
"p90_ttm": round(p90(ttm_values), 1) if p90(ttm_values) is not None else None,
"median_rounds": round(med_rounds, 1) if med_rounds is not None else None,
"max_rounds": max(round_values) if round_values else None,
"actual_total_cost": round(actual_total_cost, 2),
"cost_summary": {
"total_actual": round(total_actual, 2),
"total_estimated": round(total_estimated, 2),
"prs_with_actual_cost": prs_with_actual_cost,
"prs_with_estimated_cost": len(prs) - prs_with_actual_cost,
},
"prs": prs,
})
finally:
conn.close()
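# Cost fallback example (illustrative): a STANDARD-tier PR with 3 eval rounds
# and no recorded cost_usd is estimated as
#     EXTRACT_COST_EST + 3 * TIER_COST_EST["STANDARD"] = 0.025 + 3 * 0.018 = 0.079
# and is flagged cost_is_actual=False so the UI can distinguish estimates from
# measured spend.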
# ─── GET /api/telegram-extractions ───────────────────────────────────────
async def handle_telegram_extractions(request):
"""Review surface for Telegram conversation extractions.
Shows recent PRs sourced from Telegram conversations with claim titles,
status, and source info. Designed for quick daily spot-checking.
Query params:
days (int): lookback window (default 7, max 90)
"""
conn = request.app["_get_conn"]()
try:
days = min(int(request.query.get("days", "7")), 90)
day_filter = f"-{days}"
# Find PRs from Telegram sources (source_path contains 'telegram' or 'futardio')
rows = conn.execute(
"""SELECT p.number, p.agent, p.domain, p.tier, p.status,
p.created_at, p.merged_at, p.description, p.source_path,
p.submitted_by, p.branch, p.eval_issues, p.leo_verdict
FROM prs p
WHERE (p.source_path LIKE '%telegram%' OR p.source_path LIKE '%futardio%')
AND p.created_at > datetime('now', ? || ' days')
ORDER BY p.number DESC""",
(day_filter,),
).fetchall()
prs = []
for r in rows:
desc = r["description"] or ""
claim_titles = [t.strip() for t in desc.split("|") if t.strip()] if desc.strip() else []
issues = None
if r["eval_issues"]:
try:
issues = json.loads(r["eval_issues"]) if isinstance(r["eval_issues"], str) else r["eval_issues"]
except (json.JSONDecodeError, TypeError):
pass
prs.append({
"number": r["number"],
"agent": r["agent"],
"domain": r["domain"],
"tier": r["tier"],
"status": r["status"],
"created_at": r["created_at"],
"merged_at": r["merged_at"],
"claim_titles": claim_titles,
"source_path": r["source_path"],
"submitted_by": r["submitted_by"],
"eval_issues": issues,
"leo_verdict": r["leo_verdict"],
})
# Summary stats
merged = sum(1 for p in prs if p["status"] == "merged")
closed = sum(1 for p in prs if p["status"] == "closed")
open_prs = sum(1 for p in prs if p["status"] == "open")
return web.json_response({
"days": days,
"total": len(prs),
"merged": merged,
"closed": closed,
"open": open_prs,
"merge_rate": round(merged / len(prs) * 100, 1) if prs else 0,
"prs": prs,
})
finally:
conn.close()
# ─── GET /api/contributor-growth ─────────────────────────────────────────
CODEX_WORKTREE = Path(os.environ.get("MAIN_WORKTREE", "/opt/teleo-eval/workspaces/main"))
FOUNDING_CUTOFF = "2026-03-15"
CONTRIBUTOR_EXCLUDE = {"Teleo Agents", "Teleo Pipeline"}
_growth_cache: dict | None = None
_growth_cache_ts: float = 0
GROWTH_CACHE_TTL = 300
async def handle_contributor_growth(request):
"""Cumulative unique contributors and claims over time from git log.
Returns time-series data for Chart.js line charts.
Cached for 5 minutes since git log is expensive.
"""
global _growth_cache, _growth_cache_ts
now = time.monotonic()
if _growth_cache is not None and (now - _growth_cache_ts) < GROWTH_CACHE_TTL:
return web.json_response(_growth_cache)
codex_path = str(CODEX_WORKTREE)
if not CODEX_WORKTREE.exists():
return web.json_response(
{"error": "codex worktree not found", "path": codex_path}, status=404
)
proc = await asyncio.create_subprocess_exec(
"git", "log", "--format=%ad|%an", "--date=format:%Y-%m-%d", "--all",
cwd=codex_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
return web.json_response(
{"error": "git log failed", "detail": stderr.decode()[:500]}, status=500
)
first_seen: dict[str, str] = {}
daily_commits: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for line in stdout.decode().strip().split("\n"):
if "|" not in line:
continue
date, author = line.split("|", 1)
if author in CONTRIBUTOR_EXCLUDE:
continue
daily_commits[date][author] += 1
if author not in first_seen or date < first_seen[author]:
first_seen[author] = date
by_date: dict[str, list[str]] = defaultdict(list)
for author, date in first_seen.items():
by_date[date].append(author)
contributors_timeline = []
seen: set[str] = set()
for date in sorted(by_date.keys()):
new_authors = by_date[date]
seen.update(new_authors)
contributors_timeline.append({
"date": date,
"cumulative": len(seen),
"new": [{"name": a, "founding": date <= FOUNDING_CUTOFF} for a in sorted(new_authors)],
})
proc2 = await asyncio.create_subprocess_exec(
"git", "log", "--format=%ad", "--date=format:%Y-%m-%d",
"--all", "--diff-filter=A", "--", "domains/*.md",
cwd=codex_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout2, _ = await proc2.communicate()
claim_counts: dict[str, int] = defaultdict(int)
for line in stdout2.decode().strip().split("\n"):
line = line.strip()
if line:
claim_counts[line] += 1
claims_timeline = []
cumulative = 0
for date in sorted(claim_counts.keys()):
cumulative += claim_counts[date]
claims_timeline.append({"date": date, "cumulative": cumulative, "added": claim_counts[date]})
all_contributors = set(first_seen.keys())
founding = sorted(a for a in all_contributors if first_seen[a] <= FOUNDING_CUTOFF)
result = {
"generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"summary": {
"total_contributors": len(all_contributors),
"founding_contributors": founding,
"total_claims": cumulative,
"days_active": (datetime.now(timezone.utc) - datetime(2026, 3, 5, tzinfo=timezone.utc)).days,
},
"cumulative_contributors": contributors_timeline,
"cumulative_claims": claims_timeline,
}
_growth_cache = result
_growth_cache_ts = now
return web.json_response(result)
# ─── Registration ──────────────────────────────────────────────────────────
def register_dashboard_routes(app: web.Application, get_conn):
"""Register new dashboard API routes."""
app["_get_conn"] = get_conn
app.router.add_get("/api/stage-times", handle_stage_times)
app.router.add_get("/api/herfindahl", handle_herfindahl)
app.router.add_get("/api/agent-state", handle_agent_state)
app.router.add_get("/api/extraction-yield-by-domain", handle_extraction_yield_by_domain)
app.router.add_get("/api/agents-dashboard", handle_agents_dashboard)
app.router.add_get("/api/cascade-coverage", handle_cascade_coverage)
app.router.add_get("/api/review-summary", handle_review_summary)
app.router.add_get("/api/agent-scorecard", handle_agent_scorecard)
app.router.add_get("/api/trace/{trace_id}", handle_trace)
app.router.add_get("/api/growth", handle_growth)
app.router.add_get("/api/pr-lifecycle", handle_pr_lifecycle)
app.router.add_get("/api/telegram-extractions", handle_telegram_extractions)
app.router.add_get("/api/contributor-growth", handle_contributor_growth)
app.router.add_get("/api/digest/latest", handle_digest_latest)
async def handle_digest_latest(request):
"""GET /api/digest/latest — return the most recent scoring digest from cron output."""
digest_path = "/opt/teleo-eval/logs/scoring-digest-latest.json"
try:
with open(digest_path) as f:
data = json.load(f)
return web.json_response(data)
except FileNotFoundError:
return web.json_response({"error": "No digest available yet"}, status=404)
except Exception as e:
return web.json_response({"error": str(e)}, status=500)