teleo-codex/ops/diagnostics/research_routes.py
m3taversal bf3af00d5d consolidate diagnostics: copy newer/unique files from root /diagnostics/ into teleo-codex/ops/diagnostics/
Files consolidated:
- dashboard_routes.py: root copy (39K) overwrites teleo-codex (34K) — has cost fix + connection leak fix
- dashboard_prs.py: root copy overwrites — has cost display rewrite
- dashboard_epistemic.py: root copy overwrites — has Ship rename
- research_tracking.py: new file, existed only in root /diagnostics/ (reviewed by Ganymede, never committed here)
- research_routes.py: new file, same situation
- ops/db.py: new file, unique to root /diagnostics/ops/

After this commit, root /diagnostics/ contains only stale copies and patch files — safe to delete.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 10:14:40 +02:00

279 lines
9.9 KiB
Python

"""Dashboard API routes for research session + cost tracking.
Argus-side read-only endpoints. These query the data that
research_tracking.py writes to pipeline.db.
Add to app.py after alerting_routes setup.
"""
import json
import sqlite3
from aiohttp import web
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
async def handle_api_research_sessions(request):
"""GET /api/research-sessions?agent=&domain=&days=7
Returns research sessions with linked sources and cost data.
"""
agent = request.query.get("agent")
domain = request.query.get("domain")
try:
days = int(request.query.get("days", 7))
except (ValueError, TypeError):
days = 7
conn = _conn(request.app)
try:
where = ["rs.started_at >= datetime('now', ?)"]
params = [f"-{days} days"]
if agent:
where.append("rs.agent = ?")
params.append(agent)
if domain:
where.append("rs.domain = ?")
params.append(domain)
where_clause = " AND ".join(where)
sessions = conn.execute(f"""
SELECT rs.*,
GROUP_CONCAT(s.path, '||') as source_paths,
GROUP_CONCAT(s.status, '||') as source_statuses,
GROUP_CONCAT(s.claims_count, '||') as source_claims,
GROUP_CONCAT(COALESCE(s.cost_usd, 0), '||') as source_costs
FROM research_sessions rs
LEFT JOIN sources s ON s.session_id = rs.id
WHERE {where_clause}
GROUP BY rs.id
ORDER BY rs.started_at DESC
""", params).fetchall()
result = []
for s in sessions:
sources = []
if s["source_paths"]:
paths = s["source_paths"].split("||")
statuses = (s["source_statuses"] or "").split("||")
claims = (s["source_claims"] or "").split("||")
costs = (s["source_costs"] or "").split("||")
for i, p in enumerate(paths):
sources.append({
"path": p,
"status": statuses[i] if i < len(statuses) else None,
"claims_count": int(claims[i]) if i < len(claims) and claims[i] else 0,
"extraction_cost": float(costs[i]) if i < len(costs) and costs[i] else 0,
})
result.append({
"id": s["id"],
"agent": s["agent"],
"domain": s["domain"],
"topic": s["topic"],
"reasoning": s["reasoning"],
"summary": s["summary"],
"sources_planned": s["sources_planned"],
"sources_produced": s["sources_produced"],
"model": s["model"],
"input_tokens": s["input_tokens"],
"output_tokens": s["output_tokens"],
"research_cost": s["cost_usd"],
"extraction_cost": sum(src["extraction_cost"] for src in sources),
"total_cost": s["cost_usd"] + sum(src["extraction_cost"] for src in sources),
"total_claims": sum(src["claims_count"] for src in sources),
"status": s["status"],
"started_at": s["started_at"],
"completed_at": s["completed_at"],
"sources": sources,
})
# Summary stats
total_sessions = len(result)
total_cost = sum(r["total_cost"] for r in result)
total_claims = sum(r["total_claims"] for r in result)
total_sources = sum(r["sources_produced"] for r in result)
return web.json_response({
"summary": {
"sessions": total_sessions,
"total_cost": round(total_cost, 2),
"total_claims": total_claims,
"total_sources": total_sources,
"avg_cost_per_claim": round(total_cost / total_claims, 4) if total_claims else 0,
"avg_cost_per_session": round(total_cost / total_sessions, 4) if total_sessions else 0,
},
"sessions": result,
})
finally:
conn.close()
async def handle_api_costs(request):
"""GET /api/costs?days=14&by=stage|model|date
Comprehensive cost breakdown. Works with EXISTING data in costs table
plus the new extraction costs once backfilled.
"""
try:
days = int(request.query.get("days", 14))
except (ValueError, TypeError):
days = 14
group_by = request.query.get("by", "stage")
conn = _conn(request.app)
try:
valid_groups = {"stage", "model", "date"}
if group_by not in valid_groups:
group_by = "stage"
rows = conn.execute(f"""
SELECT {group_by},
SUM(calls) as total_calls,
SUM(input_tokens) as total_input,
SUM(output_tokens) as total_output,
SUM(cost_usd) as total_cost
FROM costs
WHERE date >= date('now', ?)
GROUP BY {group_by}
ORDER BY total_cost DESC
""", (f"-{days} days",)).fetchall()
result = []
for r in rows:
result.append({
group_by: r[group_by],
"calls": r["total_calls"],
"input_tokens": r["total_input"],
"output_tokens": r["total_output"],
"cost_usd": round(r["total_cost"], 4),
})
grand_total = sum(r["cost_usd"] for r in result)
# Also get per-agent cost from sources table (extraction costs)
agent_costs = conn.execute("""
SELECT p.agent,
COUNT(DISTINCT s.path) as sources,
SUM(s.cost_usd) as extraction_cost,
SUM(s.claims_count) as claims
FROM sources s
LEFT JOIN prs p ON p.source_path = s.path
WHERE s.cost_usd > 0
GROUP BY p.agent
ORDER BY extraction_cost DESC
""").fetchall()
agent_breakdown = []
for r in agent_costs:
agent_breakdown.append({
"agent": r["agent"] or "unlinked",
"sources": r["sources"],
"extraction_cost": round(r["extraction_cost"], 2),
"claims": r["claims"],
"cost_per_claim": round(r["extraction_cost"] / r["claims"], 4) if r["claims"] else 0,
})
return web.json_response({
"period_days": days,
"grand_total": round(grand_total, 2),
"by_" + group_by: result,
"by_agent": agent_breakdown,
})
finally:
conn.close()
async def handle_api_source_detail(request):
"""GET /api/source/{path}
Full lifecycle of a single source: research session → extraction → claims → eval outcomes.
"""
source_path = request.match_info["path"]
conn = _conn(request.app)
try:
# Try exact match first, fall back to suffix match (anchored)
source = conn.execute(
"SELECT * FROM sources WHERE path = ?",
(source_path,),
).fetchone()
if not source:
# Suffix match — anchor with / prefix to avoid substring hits
source = conn.execute(
"SELECT * FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_path}",),
).fetchone()
if not source:
return web.json_response({"error": "Source not found"}, status=404)
result = dict(source)
# Get research session if linked
if source["session_id"]:
session = conn.execute(
"SELECT * FROM research_sessions WHERE id = ?",
(source["session_id"],),
).fetchone()
result["research_session"] = dict(session) if session else None
else:
result["research_session"] = None
# Get PRs from this source
prs = conn.execute(
"SELECT number, status, domain, agent, tier, leo_verdict, domain_verdict, "
"cost_usd, created_at, merged_at, commit_type, transient_retries, substantive_retries, last_error "
"FROM prs WHERE source_path = ?",
(source["path"],),
).fetchall()
result["prs"] = [dict(p) for p in prs]
# Get eval events from audit_log for those PRs
# NOTE: audit_log.detail is mixed — some rows are JSON (evaluate events),
# some are plain text. Use json_valid() to filter safely.
pr_numbers = [p["number"] for p in prs]
if pr_numbers:
placeholders = ",".join("?" * len(pr_numbers))
evals = conn.execute(f"""
SELECT * FROM audit_log
WHERE stage = 'evaluate'
AND json_valid(detail)
AND json_extract(detail, '$.pr') IN ({placeholders})
ORDER BY timestamp
""", pr_numbers).fetchall()
result["eval_history"] = [
{"timestamp": e["timestamp"], "event": e["event"],
"detail": json.loads(e["detail"]) if e["detail"] else None}
for e in evals
]
else:
result["eval_history"] = []
return web.json_response(result)
finally:
conn.close()
def setup_research_routes(app):
"""Register research tracking routes. Call from create_app()."""
app.router.add_get("/api/research-sessions", handle_api_research_sessions)
app.router.add_get("/api/costs", handle_api_costs)
app.router.add_get("/api/source/{path:.+}", handle_api_source_detail)
# Public paths to add to auth middleware
RESEARCH_PUBLIC_PATHS = frozenset({
"/api/research-sessions",
"/api/costs",
})
# /api/source/{path} needs prefix matching — add to auth middleware:
# if path.startswith("/api/source/"): allow