teleo-infrastructure/diagnostics/app.py
m3taversal 8ff4784fcb fix: dashboard queries Forgejo directly for PR backlog, not just DB
Dashboard showed 1 conflict when Forgejo had 30 open PRs because it
only queried pipeline.db — which misses all agent-created PRs (Rio,
Leo, etc.). Now queries Forgejo API for authoritative open/unmergeable
counts. Falls back to DB if Forgejo unreachable.

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-24 15:12:58 +00:00

944 lines
39 KiB
Python

"""Argus — Diagnostics dashboard for the Teleo pipeline.
Separate aiohttp service (port 8081) that reads pipeline.db read-only.
Provides Chart.js operational dashboard, quality vital signs, and contributor analytics.
Owner: Argus <0ECBE5A7-EFAD-4A59-B491-635A1AEDF5DE>
Data source: Epimetheus's pipeline.db (read-only SQLite)
"""
import json
import logging
import os
import sqlite3
import statistics
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from aiohttp import web
# Module-level logger for the service; its output format is configured in main().
logger = logging.getLogger("argus")
# --- Config ---
# Every setting below can be overridden through the environment variable named in its lookup.
# Location of the pipeline SQLite database (opened read-only by _get_db()).
DB_PATH = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
# TCP port the aiohttp server binds to in main().
PORT = int(os.environ.get("ARGUS_PORT", "8081"))
# Repository checkout directory. NOTE(review): not referenced in the visible part of this file.
REPO_DIR = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
# URL that _fetch_claim_index() requests to obtain the claim-index JSON.
CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index")
def _get_db() -> sqlite3.Connection:
    """Create a fresh read-only SQLite connection to the pipeline database.

    Rows are returned as ``sqlite3.Row`` so callers can index columns by name.
    """
    # mode=ro in the URI asks SQLite itself to open the file read-only
    # (Rhea: belt and suspenders).
    read_only_uri = f"file:{DB_PATH}?mode=ro"
    db = sqlite3.connect(read_only_uri, uri=True, timeout=30)
    db.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=10000"):
        db.execute(pragma)
    return db
def _conn(request) -> sqlite3.Connection:
    """Return the app-wide DB connection, replacing it when it no longer responds.

    The connection stored under ``request.app["db"]`` is probed with a trivial
    query; if that raises ``sqlite3.Error`` a new connection is opened via
    ``_get_db()`` and stored back on the application.
    """
    app = request.app
    current = app["db"]
    try:
        current.execute("SELECT 1")
    except sqlite3.Error:
        # Probe failed: swap in a freshly opened connection for later requests too.
        current = _get_db()
        app["db"] = current
    return current
# ─── Data queries ────────────────────────────────────────────────────────────
def _current_metrics(conn) -> dict:
    """Build the live operational metrics dictionary from the pipeline database.

    Reads the ``prs``, ``audit_log``, ``sources`` and ``circuit_breakers``
    tables through *conn* (rows must be addressable by column name) and
    returns throughput, approval/fix rates, rejection reasons, median time to
    merge, per-status and per-domain counts, and circuit-breaker state.
    """

    def scalar(sql: str):
        # Every single-value query here aliases its one column as "n".
        return conn.execute(sql).fetchone()["n"]

    # Throughput: PRs merged during the last hour.
    merged_last_hour = scalar(
        "SELECT COUNT(*) as n FROM prs WHERE merged_at > datetime('now', '-1 hour')"
    )

    # PR counts keyed by status.
    status_map = {
        row["status"]: row["n"]
        for row in conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status")
    }

    # Approval rate over the last 24h, taken from evaluate-stage audit events.
    evaluated = scalar(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
        "AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') "
        "AND timestamp > datetime('now','-24 hours')"
    )
    approved = scalar(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
        "AND event='approved' AND timestamp > datetime('now','-24 hours')"
    )
    if evaluated:
        approval_rate = round(approved / evaluated, 3)
    else:
        approval_rate = 0

    # Rejection reasons (24h): number of events and number of distinct PRs per issue tag.
    reason_rows = conn.execute(
        """SELECT value as tag, COUNT(*) as cnt,
COUNT(DISTINCT json_extract(detail, '$.pr')) as unique_prs
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now','-24 hours')
GROUP BY tag ORDER BY cnt DESC LIMIT 10"""
    ).fetchall()
    rejection_reasons = []
    for row in reason_rows:
        rejection_reasons.append(
            {"tag": row["tag"], "count": row["cnt"], "unique_prs": row["unique_prs"]}
        )

    # Fix cycle: PRs that needed at least one fix attempt, and how many of them merged.
    fix_row = conn.execute(
        "SELECT COUNT(*) as attempted, "
        "SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as succeeded "
        "FROM prs WHERE fix_attempts > 0"
    ).fetchone()
    fix_attempted = fix_row["attempted"] or 0
    fix_succeeded = fix_row["succeeded"] or 0
    if fix_attempted:
        fix_rate = round(fix_succeeded / fix_attempted, 3)
    else:
        fix_rate = 0

    # Median minutes from creation to merge for PRs merged in the last 24h.
    positive_minutes = []
    for row in conn.execute(
        "SELECT (julianday(merged_at) - julianday(created_at)) * 24 * 60 as minutes "
        "FROM prs WHERE merged_at IS NOT NULL AND merged_at > datetime('now', '-24 hours')"
    ).fetchall():
        minutes = row["minutes"]
        if minutes and minutes > 0:
            positive_minutes.append(minutes)
    median_ttm = None
    if positive_minutes:
        median_ttm = round(statistics.median(positive_minutes), 1)

    # Source counts keyed by status.
    source_map = {
        row["status"]: row["n"]
        for row in conn.execute("SELECT status, COUNT(*) as n FROM sources GROUP BY status")
    }

    # Per-domain status counts; PRs without a domain are grouped under "unknown".
    domains = {}
    for row in conn.execute(
        "SELECT domain, status, COUNT(*) as n FROM prs GROUP BY domain, status"
    ).fetchall():
        domains.setdefault(row["domain"] or "unknown", {})[row["status"]] = row["n"]

    # Circuit breakers, with seconds elapsed since the last recorded success.
    breaker_map = {}
    for row in conn.execute(
        "SELECT name, state, failures, last_success_at FROM circuit_breakers"
    ).fetchall():
        entry = {"state": row["state"], "failures": row["failures"]}
        stamp = row["last_success_at"]
        if stamp:
            seen = datetime.fromisoformat(stamp)
            if seen.tzinfo is None:
                # Timestamps without an offset are treated as UTC.
                seen = seen.replace(tzinfo=timezone.utc)
            entry["age_s"] = round((datetime.now(timezone.utc) - seen).total_seconds())
        breaker_map[row["name"]] = entry

    return {
        "throughput_1h": merged_last_hour,
        "approval_rate": approval_rate,
        "evaluated_24h": evaluated,
        "approved_24h": approved,
        "status_map": status_map,
        "source_map": source_map,
        "rejection_reasons": rejection_reasons,
        "fix_rate": fix_rate,
        "fix_attempted": fix_attempted,
        "fix_succeeded": fix_succeeded,
        "median_ttm_minutes": median_ttm,
        "domains": domains,
        "breakers": breaker_map,
    }
def _snapshot_history(conn, days: int = 7) -> list[dict]:
    """Return ``metrics_snapshots`` rows from the last *days* days, oldest first.

    Each row is converted to a plain ``dict`` keyed by column name.
    """
    # The SQL appends " days" to this value to form the datetime() modifier.
    window = (f"-{days}",)
    cursor = conn.execute(
        "SELECT * FROM metrics_snapshots WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
        window,
    )
    return list(map(dict, cursor.fetchall()))
def _version_changes(conn, days: int = 30) -> list[dict]:
    """List prompt/pipeline version transitions seen in recent snapshots.

    Walks ``metrics_snapshots`` rows from the last *days* days in time order
    and emits one event (``ts``, ``type``, ``from``, ``to``) each time a
    version column differs from the previous row's value. The first row only
    seeds the comparison and never produces an event. Used for chart
    annotations.
    """
    snapshots = conn.execute(
        "SELECT ts, prompt_version, pipeline_version FROM metrics_snapshots "
        "WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
        (f"-{days}",),
    ).fetchall()

    events = []
    last_seen = {"prompt": None, "pipeline": None}
    for snap in snapshots:
        # Prompt is checked before pipeline so events for one row keep that order.
        for kind in ("prompt", "pipeline"):
            current = snap[f"{kind}_version"]
            previous = last_seen[kind]
            if previous is not None and current != previous:
                events.append({"ts": snap["ts"], "type": kind, "from": previous, "to": current})
            last_seen[kind] = current
    return events
def _contributor_leaderboard(conn, limit: int = 20) -> list[dict]:
    """Return the top *limit* contributors ranked by CI score.

    The CI score is a weighted sum of each contributor's role counts
    (sourcer, extractor, challenger, synthesizer, reviewer), rounded to two
    decimals. Ranking happens before truncation, so a contributor with a high
    CI but few merged claims is not cut off by the row limit. Ties keep the
    ``claims_merged`` descending order of the query.

    Args:
        conn: SQLite connection whose rows are addressable by column name.
        limit: Maximum number of entries to return. A negative value returns
            every contributor, matching SQLite's ``LIMIT`` semantics that the
            previous query-level limit followed.

    Returns:
        A list of dicts with ``handle``, ``tier``, ``claims_merged``, ``ci``,
        ``domains`` (decoded from the JSON column, ``[]`` when empty) and
        ``last_contribution``.
    """
    rows = conn.execute(
        "SELECT handle, tier, claims_merged, sourcer_count, extractor_count, "
        "challenger_count, synthesizer_count, reviewer_count, domains, last_contribution "
        "FROM contributors ORDER BY claims_merged DESC"
    ).fetchall()
    weights = {"sourcer": 0.15, "extractor": 0.40, "challenger": 0.20, "synthesizer": 0.15, "reviewer": 0.10}

    scored = []
    for r in rows:
        ci = sum((r[f"{role}_count"] or 0) * w for role, w in weights.items())
        scored.append((round(ci, 2), r))
    # list.sort is stable, so equal scores stay in claims_merged order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    if limit >= 0:
        scored = scored[:limit]

    # Decode the domains JSON only for the rows that are actually returned.
    return [
        {
            "handle": r["handle"],
            "tier": r["tier"],
            "claims_merged": r["claims_merged"] or 0,
            "ci": ci,
            "domains": json.loads(r["domains"]) if r["domains"] else [],
            "last_contribution": r["last_contribution"],
        }
        for ci, r in scored
    ]
# ─── Vital signs (Vida's five) ───────────────────────────────────────────────
def _fetch_claim_index() -> dict | None:
    """Download and decode the claim-index JSON from ``CLAIM_INDEX_URL``.

    Any failure (network, HTTP, decoding) is logged as a warning and reported
    to the caller as ``None`` rather than raised.
    """
    try:
        with urllib.request.urlopen(CLAIM_INDEX_URL, timeout=5) as response:
            payload = response.read()
            return json.loads(payload)
    except Exception as exc:
        logger.warning("Failed to fetch claim-index from %s: %s", CLAIM_INDEX_URL, exc)
    return None
def _forgejo_open_pr_counts() -> tuple[int, int] | None:
    """Ask the Forgejo API for the open-PR count and the unmergeable-PR count.

    Returns ``(open, unmergeable)``, or ``None`` when Forgejo cannot be used
    (``requests`` missing, network error, non-200 response, unexpected
    payload) so the caller can fall back to pipeline.db. Only the first page
    of results (up to 50 PRs, per the ``limit`` in the URL) is counted.
    """
    try:
        import requests as _req

        token_path = Path("/opt/teleo-eval/secrets/forgejo-token")
        token = token_path.read_text().strip() if token_path.exists() else ""
        resp = _req.get(
            "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50",
            headers={"Authorization": f"token {token}"} if token else {},
            timeout=10,
        )
        if resp.status_code != 200:
            # An auth or server error must not be reported as "0 open PRs".
            logger.warning("Forgejo PR query returned HTTP %s; falling back to pipeline.db", resp.status_code)
            return None
        prs = resp.json()
        unmergeable = sum(1 for p in prs if not p.get("mergeable", True))
        return len(prs), unmergeable
    except Exception as e:
        logger.warning("Forgejo PR query failed; falling back to pipeline.db: %s", e)
        return None


def _orphan_status(ratio: float | None) -> str:
    """Map an orphan ratio to a status label (Vida: <15% healthy, <30% warning)."""
    if ratio is None:
        return "unavailable"
    # Explicit comparisons: a ratio of exactly 0 is the best case, not a missing value.
    if ratio < 0.15:
        return "healthy"
    if ratio < 0.30:
        return "warning"
    return "critical"


def _compute_vital_signs(conn) -> dict:
    """Compute Vida's five vital signs from DB state, Forgejo and the claim-index.

    Args:
        conn: SQLite connection to pipeline.db whose rows are addressable by
            column name.

    Returns:
        A dict with ``claim_index_status`` ("live" or "unavailable"),
        ``review_throughput``, ``orphan_ratio``, ``linkage_density``,
        ``confidence_distribution`` (raw counts per confidence label),
        ``evidence_freshness``, ``domain_activity`` and ``funnel``. The
        claim-index derived entries are ``None``/empty when the index could
        not be fetched or contains no claims.
    """
    # 1. Review throughput — backlog and latency
    # Forgejo is the authoritative source for open PRs (DB misses agent-created PRs);
    # pipeline.db is used only when Forgejo cannot be queried.
    forgejo_counts = _forgejo_open_pr_counts()
    if forgejo_counts is None:
        open_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='open'").fetchone()["n"]
        conflict_prs = 0
    else:
        open_prs, conflict_prs = forgejo_counts
    conflict_permanent_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='conflict_permanent'").fetchone()["n"]
    approved_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='approved'").fetchone()["n"]
    reviewing_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='reviewing'").fetchone()["n"]
    backlog = open_prs
    oldest_open = conn.execute(
        "SELECT MIN(created_at) as oldest FROM prs WHERE status='open'"
    ).fetchone()
    review_latency_h = None
    if oldest_open and oldest_open["oldest"]:
        oldest = datetime.fromisoformat(oldest_open["oldest"])
        if oldest.tzinfo is None:
            # Timestamps without an offset are treated as UTC.
            oldest = oldest.replace(tzinfo=timezone.utc)
        review_latency_h = round((datetime.now(timezone.utc) - oldest).total_seconds() / 3600, 1)

    # 2-5. Claim-index vital signs
    ci = _fetch_claim_index()
    orphan_ratio = None
    linkage_density = None
    confidence_dist = {}
    evidence_freshness = None
    claim_index_status = "unavailable"
    # Reported alongside the ratio; start from what the index states, if anything.
    orphan_count = ci.get("orphan_count") if ci else None
    total_claims = ci.get("total_claims") if ci else None
    if ci and ci.get("claims"):
        claims = ci["claims"]
        total = len(claims)
        claim_index_status = "live"

        # 2. Orphan ratio (Vida: <15% healthy)
        if orphan_count is None:
            # Index did not provide a count: a claim with no incoming links is an orphan.
            orphan_count = sum(1 for c in claims if c.get("incoming_count", 0) == 0)
        if total_claims is None:
            total_claims = total
        orphan_ratio = round(orphan_count / total, 3) if total else 0

        # 3. Linkage density — avg outgoing links per claim + cross-domain ratio
        total_outgoing = sum(c.get("outgoing_count", 0) for c in claims)
        avg_links = round(total_outgoing / total, 2) if total else 0
        cross_domain = ci.get("cross_domain_links", 0)
        linkage_density = {
            "avg_outgoing_links": avg_links,
            "cross_domain_links": cross_domain,
            "cross_domain_ratio": round(cross_domain / total_outgoing, 3) if total_outgoing else 0,
        }

        # 4. Confidence distribution (raw counts per label)
        for c in claims:
            conf = c.get("confidence", "unknown")
            confidence_dist[conf] = confidence_dist.get(conf, 0) + 1

        # 5. Evidence freshness — age of claims in days
        today = datetime.now(timezone.utc).date()
        ages = []
        for c in claims:
            try:
                if c.get("created"):
                    created = datetime.strptime(c["created"], "%Y-%m-%d").date()
                    ages.append((today - created).days)
            except (ValueError, KeyError, TypeError):
                # Claims with an unparseable creation date are left out of the age stats.
                pass
        avg_age_days = round(statistics.mean(ages)) if ages else None
        median_age_days = round(statistics.median(ages)) if ages else None
        fresh_30d = sum(1 for a in ages if a <= 30)
        evidence_freshness = {
            "avg_age_days": avg_age_days,
            "median_age_days": median_age_days,
            "fresh_30d_count": fresh_30d,
            "fresh_30d_pct": round(fresh_30d / total * 100, 1) if total else 0,
        }

    # Domain activity (last 7 days) — stagnation detection
    domain_activity = conn.execute(
        "SELECT domain, COUNT(*) as n, MAX(last_attempt) as latest "
        "FROM prs WHERE last_attempt > datetime('now', '-7 days') GROUP BY domain"
    ).fetchall()
    active_domains = [
        {"domain": r["domain"], "prs_7d": r["n"], "latest": r["latest"]} for r in domain_activity
    ]
    all_domains = conn.execute("SELECT DISTINCT domain FROM prs WHERE domain IS NOT NULL").fetchall()
    active_names = {r["domain"] for r in domain_activity}
    # A domain is stagnant when it has PRs on record but none attempted in the last 7 days.
    stagnant_domains = [r["domain"] for r in all_domains if r["domain"] not in active_names]

    # Pipeline funnel
    total_sources = conn.execute("SELECT COUNT(*) as n FROM sources").fetchone()["n"]
    queued_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources WHERE status='unprocessed'"
    ).fetchone()["n"]
    extracted_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources WHERE status='extracted'"
    ).fetchone()["n"]
    merged_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
    total_prs = conn.execute("SELECT COUNT(*) as n FROM prs").fetchone()["n"]
    funnel = {
        "sources_total": total_sources,
        "sources_queued": queued_sources,
        "sources_extracted": extracted_sources,
        "prs_total": total_prs,
        "prs_merged": merged_prs,
        "conversion_rate": round(merged_prs / total_prs, 3) if total_prs else 0,
    }

    return {
        "claim_index_status": claim_index_status,
        "review_throughput": {
            "backlog": backlog,
            "open_prs": open_prs,
            "approved_waiting": approved_prs,
            "conflict_prs": conflict_prs,
            "conflict_permanent_prs": conflict_permanent_prs,
            "reviewing_prs": reviewing_prs,
            "oldest_open_hours": review_latency_h,
            "status": "healthy" if backlog <= 3 else ("warning" if backlog <= 10 else "critical"),
        },
        "orphan_ratio": {
            "ratio": orphan_ratio,
            "count": orphan_count,
            "total": total_claims,
            "status": _orphan_status(orphan_ratio),
        },
        "linkage_density": linkage_density,
        "confidence_distribution": confidence_dist,
        "evidence_freshness": evidence_freshness,
        "domain_activity": {
            "active": active_domains,
            "stagnant": stagnant_domains,
            "status": "healthy" if not stagnant_domains else "warning",
        },
        "funnel": funnel,
    }
# ─── Route handlers ─────────────────────────────────────────────────────────
async def handle_dashboard(request):
    """GET / — serve the Chart.js operational dashboard as an HTML page.

    If any database read raises ``sqlite3.Error`` the error page is returned
    with HTTP 503 instead.
    """
    try:
        db = _conn(request)
        # Order matches the positional parameters of _render_dashboard.
        page_data = (
            _current_metrics(db),
            _snapshot_history(db, days=7),
            _version_changes(db, days=30),
            _compute_vital_signs(db),
            _contributor_leaderboard(db, limit=10),
        )
    except sqlite3.Error as exc:
        error_page = _render_error(f"Pipeline database unavailable: {exc}")
        return web.Response(text=error_page, content_type="text/html", status=503)

    rendered = _render_dashboard(*page_data, datetime.now(timezone.utc))
    return web.Response(text=rendered, content_type="text/html")
async def handle_api_metrics(request):
    """GET /api/metrics — current operational metrics as JSON."""
    payload = _current_metrics(_conn(request))
    return web.json_response(payload)
async def handle_api_snapshots(request):
    """GET /api/snapshots?days=7 — time-series data for charts.

    Responds with the snapshot rows and the version-change events for the
    requested window, plus the ``days`` value that was used.

    Raises:
        web.HTTPBadRequest: if ``days`` is not an integer or is negative.
    """
    raw_days = request.query.get("days", "7")
    try:
        days = int(raw_days)
    except ValueError:
        # Client error, not a server fault: answer 400 instead of an unhandled 500.
        raise web.HTTPBadRequest(text="'days' must be an integer") from None
    if days < 0:
        # The queries build the date modifier as "-" + days, so a negative
        # value would produce a malformed modifier such as "--5 days".
        raise web.HTTPBadRequest(text="'days' must not be negative")
    conn = _conn(request)
    snapshots = _snapshot_history(conn, days)
    changes = _version_changes(conn, days)
    return web.json_response({"snapshots": snapshots, "version_changes": changes, "days": days})
async def handle_api_vital_signs(request):
    """GET /api/vital-signs — Vida's five vital signs as JSON."""
    vital_signs = _compute_vital_signs(_conn(request))
    return web.json_response(vital_signs)
async def handle_api_contributors(request):
    """GET /api/contributors?limit=50 — contributor leaderboard as JSON.

    Raises:
        web.HTTPBadRequest: if ``limit`` is not an integer.
    """
    try:
        limit = int(request.query.get("limit", "50"))
    except ValueError:
        # Client error, not a server fault: answer 400 instead of an unhandled 500.
        raise web.HTTPBadRequest(text="'limit' must be an integer") from None
    conn = _conn(request)
    return web.json_response({"contributors": _contributor_leaderboard(conn, limit)})
async def handle_api_domains(request):
    """GET /api/domains — per-domain PR status counts as JSON."""
    # The domain breakdown is one section of the full metrics dictionary.
    per_domain = _current_metrics(_conn(request))["domains"]
    return web.json_response({"domains": per_domain})
# ─── Dashboard HTML ──────────────────────────────────────────────────────────
def _render_error(message: str) -> str:
    """Render a minimal standalone HTML error page.

    Args:
        message: Plain-text description of the failure. It is HTML-escaped
            before being embedded, so characters such as ``<`` or ``&`` in an
            exception message cannot break or inject markup.

    Returns:
        The complete HTML document as a string.
    """
    from html import escape  # function-scope import: the block brings its own dependency

    safe_message = escape(message)
    return f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>Argus — Error</title>
<style>body {{ font-family: -apple-system, system-ui, sans-serif; background: #0d1117; color: #c9d1d9; display: flex; align-items: center; justify-content: center; min-height: 100vh; }}
.err {{ text-align: center; }} h1 {{ color: #f85149; }} p {{ color: #8b949e; }}</style>
</head><body><div class="err"><h1>Argus</h1><p>{safe_message}</p><p>Check if <code>teleo-pipeline.service</code> is running and pipeline.db exists.</p></div></body></html>"""
def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors, now) -> str:
    """Render the full operational dashboard as HTML with Chart.js.

    Args:
        metrics: Dict shaped like the result of ``_current_metrics``.
        snapshots: Sequence of dicts, each with a ``ts`` key plus the metric
            keys read below via ``.get``; drives the time-series charts.
        changes: Sequence of dicts with ``ts``, ``type`` and ``to`` keys,
            drawn as vertical chart annotations.
        vital_signs: Dict shaped like the result of ``_compute_vital_signs``.
        contributors: Sequence of contributor dicts (``handle``, ``tier``,
            ``claims_merged``, ``ci``, ``domains``); at most 10 are shown.
        now: ``datetime`` shown in the page header.

    Returns:
        The complete HTML document as a string. Text values that originate
        from the database are HTML-escaped before being placed in markup.
    """
    from html import escape  # function-scope import: the block brings its own dependency

    def esc(value) -> str:
        # str() first so None or numeric values render the same text as plain interpolation.
        return escape(str(value))

    # Prepare chart data
    timestamps = [s["ts"] for s in snapshots]
    throughput_data = [s.get("throughput_1h", 0) for s in snapshots]
    approval_data = [(s.get("approval_rate") or 0) * 100 for s in snapshots]
    open_prs_data = [s.get("open_prs", 0) for s in snapshots]
    merged_data = [s.get("merged_total", 0) for s in snapshots]
    # Rejection breakdown
    rej_wiki = [s.get("rejection_broken_wiki_links", 0) for s in snapshots]
    rej_schema = [s.get("rejection_frontmatter_schema", 0) for s in snapshots]
    rej_dup = [s.get("rejection_near_duplicate", 0) for s in snapshots]
    rej_conf = [s.get("rejection_confidence", 0) for s in snapshots]
    rej_other = [s.get("rejection_other", 0) for s in snapshots]
    # Source origins
    origin_agent = [s.get("source_origin_agent", 0) for s in snapshots]
    origin_human = [s.get("source_origin_human", 0) for s in snapshots]
    # Version annotations: one dashed vertical line per prompt/pipeline change
    annotations_js = json.dumps([
        {
            "type": "line",
            "xMin": c["ts"],
            "xMax": c["ts"],
            "borderColor": "#d29922" if c["type"] == "prompt" else "#58a6ff",
            "borderWidth": 1,
            "borderDash": [4, 4],
            "label": {
                "display": True,
                "content": f"{c['type']}: {c.get('to', '?')}",
                "position": "start",
                "backgroundColor": "#161b22",
                "color": "#8b949e",
                "font": {"size": 10},
            },
        }
        for c in changes
    ])
    # Status colors for the hero cards
    sm = metrics["status_map"]
    ar = metrics["approval_rate"]
    ar_color = "green" if ar > 0.5 else ("yellow" if ar > 0.2 else "red")
    fr_color = "green" if metrics["fix_rate"] > 0.3 else ("yellow" if metrics["fix_rate"] > 0.1 else "red")
    # A median of 0.0 is a real value; only None means "no data".
    median_ttm = metrics["median_ttm_minutes"]
    ttm_display = f"{median_ttm:.0f}" if median_ttm is not None else ""
    # Vital signs
    vs_review = vital_signs["review_throughput"]
    vs_status_color = {"healthy": "green", "warning": "yellow", "critical": "red"}.get(vs_review["status"], "yellow")
    # Orphan ratio
    vs_orphan = vital_signs.get("orphan_ratio", {})
    orphan_ratio_val = vs_orphan.get("ratio")
    orphan_color = {"healthy": "green", "warning": "yellow", "critical": "red"}.get(vs_orphan.get("status", ""), "")
    orphan_display = f"{orphan_ratio_val:.1%}" if orphan_ratio_val is not None else ""
    # Linkage density
    vs_linkage = vital_signs.get("linkage_density") or {}
    linkage_display = f'{vs_linkage.get("avg_outgoing_links", "")}'
    cross_domain_ratio = vs_linkage.get("cross_domain_ratio")
    cross_domain_color = "green" if cross_domain_ratio and cross_domain_ratio >= 0.15 else ("yellow" if cross_domain_ratio and cross_domain_ratio >= 0.05 else "red") if cross_domain_ratio is not None else ""
    # Evidence freshness (a median age of 0 days is shown, not blanked)
    vs_fresh = vital_signs.get("evidence_freshness") or {}
    median_age = vs_fresh.get("median_age_days")
    fresh_display = f"{median_age}" if median_age is not None else ""
    fresh_pct = vs_fresh.get("fresh_30d_pct", 0)
    # Confidence distribution
    vs_conf = vital_signs.get("confidence_distribution", {})
    # Rejection reasons table — show unique PRs alongside event count
    reason_rows = "".join(
        f'<tr><td><code>{esc(r["tag"])}</code></td><td>{r["unique_prs"]}</td><td style="color:#8b949e">{r["count"]}</td></tr>'
        for r in metrics["rejection_reasons"]
    )
    # Domain table
    domain_rows = ""
    for domain, statuses in sorted(metrics["domains"].items()):
        m = statuses.get("merged", 0)
        c = statuses.get("closed", 0)
        o = statuses.get("open", 0)
        total = sum(statuses.values())
        domain_rows += f"<tr><td>{esc(domain)}</td><td>{total}</td><td class='green'>{m}</td><td class='red'>{c}</td><td>{o}</td></tr>"
    # Contributor rows
    contributor_rows = "".join(
        f'<tr><td>{esc(c["handle"])}</td><td>{esc(c["tier"])}</td>'
        f'<td>{c["claims_merged"]}</td><td>{c["ci"]}</td>'
        f'<td>{esc(", ".join(c["domains"][:3])) if c["domains"] else "-"}</td></tr>'
        for c in contributors[:10]
    )
    # Breaker status
    breaker_rows = ""
    for name, info in metrics["breakers"].items():
        state = info["state"]
        color = "green" if state == "closed" else ("red" if state == "open" else "yellow")
        age = f'{info.get("age_s", "?")}s ago' if "age_s" in info else "-"
        breaker_rows += f'<tr><td>{esc(name)}</td><td class="{color}">{esc(state)}</td><td>{info["failures"]}</td><td>{age}</td></tr>'
    # Stagnant domain names, escaped once here so the template only interpolates safe text
    stagnant_names = esc(", ".join(vital_signs["domain_activity"]["stagnant"]))
    # Funnel numbers
    funnel = vital_signs["funnel"]
    return f"""<!DOCTYPE html>
<html lang="en"><head>
<meta charset="utf-8">
<title>Argus — Teleo Diagnostics</title>
<meta http-equiv="refresh" content="60">
<meta name="viewport" content="width=device-width, initial-scale=1">
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.6"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns@3.0.0"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3.1.0"></script>
<style>
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{ font-family: -apple-system, system-ui, 'Segoe UI', sans-serif; background: #0d1117; color: #c9d1d9; padding: 24px; }}
.header {{ display: flex; align-items: baseline; gap: 12px; margin-bottom: 8px; }}
h1 {{ color: #58a6ff; font-size: 24px; }}
.subtitle {{ color: #8b949e; font-size: 13px; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(160px, 1fr)); gap: 12px; margin: 20px 0; }}
.card {{ background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 16px; }}
.card .label {{ color: #8b949e; font-size: 11px; text-transform: uppercase; letter-spacing: 0.5px; }}
.card .value {{ font-size: 28px; font-weight: 700; margin-top: 2px; }}
.card .detail {{ color: #8b949e; font-size: 11px; margin-top: 2px; }}
.green {{ color: #3fb950; }}
.yellow {{ color: #d29922; }}
.red {{ color: #f85149; }}
.blue {{ color: #58a6ff; }}
.chart-container {{ background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 16px; margin: 16px 0; }}
.chart-container h2 {{ color: #c9d1d9; font-size: 14px; margin-bottom: 12px; }}
canvas {{ max-height: 260px; }}
.row {{ display: grid; grid-template-columns: 1fr 1fr; gap: 16px; }}
@media (max-width: 800px) {{ .row {{ grid-template-columns: 1fr; }} }}
table {{ width: 100%; border-collapse: collapse; font-size: 13px; }}
th {{ color: #8b949e; font-size: 11px; text-transform: uppercase; text-align: left; padding: 6px 10px; border-bottom: 1px solid #30363d; }}
td {{ padding: 6px 10px; border-bottom: 1px solid #21262d; }}
code {{ background: #21262d; padding: 2px 6px; border-radius: 3px; font-size: 12px; }}
.section {{ margin-top: 28px; }}
.section-title {{ color: #58a6ff; font-size: 15px; font-weight: 600; margin-bottom: 12px; padding-bottom: 6px; border-bottom: 1px solid #21262d; }}
.funnel {{ display: flex; align-items: center; gap: 8px; flex-wrap: wrap; }}
.funnel-step {{ text-align: center; flex: 1; min-width: 100px; }}
.funnel-step .num {{ font-size: 24px; font-weight: 700; }}
.funnel-step .lbl {{ font-size: 11px; color: #8b949e; text-transform: uppercase; }}
.funnel-arrow {{ color: #30363d; font-size: 20px; }}
.footer {{ margin-top: 40px; padding-top: 16px; border-top: 1px solid #21262d; color: #484f58; font-size: 11px; }}
.footer a {{ color: #484f58; }}
</style>
</head><body>
<div class="header">
<h1>Argus</h1>
<span class="subtitle">Teleo Pipeline Diagnostics &middot; {now.strftime("%Y-%m-%d %H:%M UTC")} &middot; auto-refresh 60s</span>
</div>
<!-- Hero Cards -->
<div class="grid">
<div class="card">
<div class="label">Throughput</div>
<div class="value">{metrics["throughput_1h"]}<span style="font-size:14px;color:#8b949e">/hr</span></div>
<div class="detail">merged last hour</div>
</div>
<div class="card">
<div class="label">Approval Rate (24h)</div>
<div class="value {ar_color}">{ar:.1%}</div>
<div class="detail">{metrics["approved_24h"]}/{metrics["evaluated_24h"]} evaluated</div>
</div>
<div class="card">
<div class="label">Review Backlog</div>
<div class="value {vs_status_color}">{vs_review["backlog"]}</div>
<div class="detail">{vs_review["open_prs"]} open + {vs_review["reviewing_prs"]} reviewing + {vs_review["approved_waiting"]} approved + {vs_review["conflict_prs"]} conflicts</div>
</div>
<div class="card">
<div class="label">Merged Total</div>
<div class="value green">{sm.get("merged", 0)}</div>
<div class="detail">{sm.get("closed", 0)} closed</div>
</div>
<div class="card">
<div class="label">Fix Success</div>
<div class="value {fr_color}">{metrics["fix_rate"]:.1%}</div>
<div class="detail">{metrics["fix_succeeded"]}/{metrics["fix_attempted"]} fixed</div>
</div>
<div class="card">
<div class="label">Time to Merge</div>
<div class="value">{ttm_display}<span style="font-size:14px;color:#8b949e">min</span></div>
<div class="detail">median (24h)</div>
</div>
</div>
<!-- Pipeline Funnel -->
<div class="section">
<div class="section-title">Pipeline Funnel</div>
<div class="funnel">
<div class="funnel-step"><div class="num">{funnel["sources_total"]}</div><div class="lbl">Sources</div></div>
<div class="funnel-arrow">→</div>
<div class="funnel-step"><div class="num" style="color: #f0883e">{funnel["sources_queued"]}</div><div class="lbl">In Queue</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num">{funnel["sources_extracted"]}</div><div class="lbl">Extracted</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num">{funnel["prs_total"]}</div><div class="lbl">PRs Created</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num green">{funnel["prs_merged"]}</div><div class="lbl">Merged</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num blue">{funnel["conversion_rate"]:.1%}</div><div class="lbl">Conversion</div></div>
</div>
</div>
<!-- Vital Signs (Vida's Five) -->
{f'''<div class="section">
<div class="section-title">Knowledge Health (Vida&rsquo;s Vital Signs)</div>
<div class="grid">
<div class="card">
<div class="label">Orphan Ratio</div>
<div class="value {orphan_color}">{orphan_display}</div>
<div class="detail">{vs_orphan.get("count", "?")} / {vs_orphan.get("total", "?")} claims &middot; target &lt;15%</div>
</div>
<div class="card">
<div class="label">Avg Links/Claim</div>
<div class="value">{linkage_display}</div>
<div class="detail">cross-domain: <span class="{cross_domain_color}">{f"{cross_domain_ratio:.1%}" if cross_domain_ratio is not None else ""}</span> &middot; target 15-30%</div>
</div>
<div class="card">
<div class="label">Evidence Freshness</div>
<div class="value">{fresh_display}<span style="font-size:14px;color:#8b949e">d median</span></div>
<div class="detail">{vs_fresh.get("fresh_30d_count", "?")} claims &lt;30d old &middot; {fresh_pct:.0f}% fresh</div>
</div>
<div class="card">
<div class="label">Confidence Spread</div>
<div class="value" style="font-size:16px">{" / ".join(f"{vs_conf.get(k, 0)}" for k in ["proven", "likely", "experimental", "speculative"])}</div>
<div class="detail">proven / likely / experimental / speculative</div>
</div>
</div>
</div>''' if vital_signs.get("claim_index_status") == "live" else ""}
<!-- Charts -->
<div id="no-chart-data" class="card" style="text-align:center;padding:40px;margin:16px 0;display:none">
<p style="color:#8b949e">No time-series data yet. Charts will appear once Epimetheus wires <code>record_snapshot()</code> into the pipeline daemon.</p>
</div>
<div id="chart-section">
<div class="row">
<div class="chart-container">
<h2>Throughput &amp; Approval Rate</h2>
<canvas id="throughputChart"></canvas>
</div>
<div class="chart-container">
<h2>Rejection Reasons Over Time</h2>
<canvas id="rejectionChart"></canvas>
</div>
</div>
<div class="row">
<div class="chart-container">
<h2>PR Backlog</h2>
<canvas id="backlogChart"></canvas>
</div>
<div class="chart-container">
<h2>Source Origins (24h snapshots)</h2>
<canvas id="originChart"></canvas>
</div>
</div>
</div>
<!-- Tables -->
<div class="row">
<div class="section">
<div class="section-title">Top Rejection Reasons (24h)</div>
<div class="card">
<table>
<tr><th>Issue</th><th>PRs</th><th style="color:#8b949e">Events</th></tr>
{reason_rows if reason_rows else "<tr><td colspan='3' style='color:#8b949e'>No rejections in 24h</td></tr>"}
</table>
</div>
</div>
<div class="section">
<div class="section-title">Circuit Breakers</div>
<div class="card">
<table>
<tr><th>Stage</th><th>State</th><th>Failures</th><th>Last Success</th></tr>
{breaker_rows if breaker_rows else "<tr><td colspan='4' style='color:#8b949e'>No breaker data</td></tr>"}
</table>
</div>
</div>
</div>
<div class="row">
<div class="section">
<div class="section-title">Domain Breakdown</div>
<div class="card">
<table>
<tr><th>Domain</th><th>Total</th><th>Merged</th><th>Closed</th><th>Open</th></tr>
{domain_rows}
</table>
</div>
</div>
<div class="section">
<div class="section-title">Top Contributors (by CI)</div>
<div class="card">
<table>
<tr><th>Handle</th><th>Tier</th><th>Claims</th><th>CI</th><th>Domains</th></tr>
{contributor_rows if contributor_rows else "<tr><td colspan='5' style='color:#8b949e'>No contributors yet</td></tr>"}
</table>
</div>
</div>
</div>
<!-- Stagnation Alerts -->
{"" if not vital_signs["domain_activity"]["stagnant"] else f'''
<div class="section">
<div class="section-title" style="color:#d29922">Stagnation Alerts</div>
<div class="card">
<p style="color:#d29922">Domains with no PR activity in 7 days: <strong>{stagnant_names}</strong></p>
</div>
</div>
'''}
<div class="footer">
Argus &middot; Teleo Pipeline Diagnostics &middot;
<a href="/api/metrics">API: Metrics</a> &middot;
<a href="/api/snapshots">Snapshots</a> &middot;
<a href="/api/vital-signs">Vital Signs</a> &middot;
<a href="/api/contributors">Contributors</a> &middot;
<a href="/api/domains">Domains</a>
</div>
<script>
const timestamps = {json.dumps(timestamps)};
if (timestamps.length === 0) {{
document.getElementById('chart-section').style.display = 'none';
document.getElementById('no-chart-data').style.display = 'block';
}} else {{
const throughputData = {json.dumps(throughput_data)};
const approvalData = {json.dumps(approval_data)};
const openPrsData = {json.dumps(open_prs_data)};
const mergedData = {json.dumps(merged_data)};
const rejWiki = {json.dumps(rej_wiki)};
const rejSchema = {json.dumps(rej_schema)};
const rejDup = {json.dumps(rej_dup)};
const rejConf = {json.dumps(rej_conf)};
const rejOther = {json.dumps(rej_other)};
const originAgent = {json.dumps(origin_agent)};
const originHuman = {json.dumps(origin_human)};
const annotations = {annotations_js};
const chartDefaults = {{
color: '#8b949e',
borderColor: '#30363d',
font: {{ family: '-apple-system, system-ui, sans-serif' }},
}};
Chart.defaults.color = '#8b949e';
Chart.defaults.borderColor = '#21262d';
Chart.defaults.font.family = '-apple-system, system-ui, sans-serif';
Chart.defaults.font.size = 11;
// Throughput + Approval Rate (dual axis)
new Chart(document.getElementById('throughputChart'), {{
type: 'line',
data: {{
labels: timestamps,
datasets: [
{{
label: 'Throughput/hr',
data: throughputData,
borderColor: '#58a6ff',
backgroundColor: 'rgba(88,166,255,0.1)',
fill: true,
tension: 0.3,
yAxisID: 'y',
pointRadius: 1,
}},
{{
label: 'Approval %',
data: approvalData,
borderColor: '#3fb950',
borderDash: [4, 2],
tension: 0.3,
yAxisID: 'y1',
pointRadius: 1,
}},
],
}},
options: {{
responsive: true,
interaction: {{ mode: 'index', intersect: false }},
scales: {{
x: {{ type: 'time', time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ position: 'left', title: {{ display: true, text: 'PRs/hr' }}, min: 0 }},
y1: {{ position: 'right', title: {{ display: true, text: 'Approval %' }}, min: 0, max: 100, grid: {{ drawOnChartArea: false }} }},
}},
plugins: {{
annotation: {{ annotations: annotations }},
legend: {{ labels: {{ boxWidth: 12 }} }},
}},
}},
}});
// Rejection reasons (stacked area)
new Chart(document.getElementById('rejectionChart'), {{
type: 'line',
data: {{
labels: timestamps,
datasets: [
{{ label: 'Wiki Links', data: rejWiki, borderColor: '#f85149', backgroundColor: 'rgba(248,81,73,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Schema', data: rejSchema, borderColor: '#d29922', backgroundColor: 'rgba(210,153,34,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Duplicate', data: rejDup, borderColor: '#8b949e', backgroundColor: 'rgba(139,148,158,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Confidence', data: rejConf, borderColor: '#bc8cff', backgroundColor: 'rgba(188,140,255,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Other', data: rejOther, borderColor: '#6e7681', backgroundColor: 'rgba(110,118,129,0.15)', fill: true, tension: 0.3, pointRadius: 0 }},
],
}},
options: {{
responsive: true,
scales: {{
x: {{ type: 'time', time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ stacked: true, min: 0, title: {{ display: true, text: 'Count (24h)' }} }},
}},
plugins: {{
annotation: {{ annotations: annotations }},
legend: {{ labels: {{ boxWidth: 12 }} }},
}},
}},
}});
// PR Backlog
new Chart(document.getElementById('backlogChart'), {{
type: 'line',
data: {{
labels: timestamps,
datasets: [
{{ label: 'Open PRs', data: openPrsData, borderColor: '#d29922', backgroundColor: 'rgba(210,153,34,0.15)', fill: true, tension: 0.3, pointRadius: 1 }},
{{ label: 'Merged (total)', data: mergedData, borderColor: '#3fb950', tension: 0.3, pointRadius: 1 }},
],
}},
options: {{
responsive: true,
scales: {{
x: {{ type: 'time', time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ min: 0, title: {{ display: true, text: 'PRs' }} }},
}},
plugins: {{ legend: {{ labels: {{ boxWidth: 12 }} }} }},
}},
}});
// Source Origins
new Chart(document.getElementById('originChart'), {{
type: 'bar',
data: {{
labels: timestamps,
datasets: [
{{ label: 'Agent', data: originAgent, backgroundColor: '#58a6ff' }},
{{ label: 'Human', data: originHuman, backgroundColor: '#3fb950' }},
],
}},
options: {{
responsive: true,
scales: {{
x: {{ type: 'time', stacked: true, time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ stacked: true, min: 0, title: {{ display: true, text: 'Sources (24h)' }} }},
}},
plugins: {{ legend: {{ labels: {{ boxWidth: 12 }} }} }},
}},
}});
}} // end if (timestamps.length > 0)
</script>
</body></html>"""
# ─── App factory ─────────────────────────────────────────────────────────────
def create_app() -> web.Application:
    """Assemble the aiohttp application.

    Opens the shared database connection, registers every GET route, and
    schedules ``_cleanup`` to run when the application shuts down.
    """
    application = web.Application()
    application["db"] = _get_db()

    get_routes = (
        ("/", handle_dashboard),
        ("/api/metrics", handle_api_metrics),
        ("/api/snapshots", handle_api_snapshots),
        ("/api/vital-signs", handle_api_vital_signs),
        ("/api/contributors", handle_api_contributors),
        ("/api/domains", handle_api_domains),
    )
    for path, handler in get_routes:
        application.router.add_get(path, handler)

    application.on_cleanup.append(_cleanup)
    return application
async def _cleanup(app):
    """Close the database connection stored on the application."""
    shared_db = app["db"]
    shared_db.close()
def main():
    """Configure logging, build the application and serve it on ``PORT``."""
    log_format = "%(asctime)s %(name)s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    logger.info("Argus diagnostics starting on port %d, DB: %s", PORT, DB_PATH)
    # Bound to every interface, not just loopback.
    web.run_app(create_app(), host="0.0.0.0", port=PORT)
# Start the server only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()