"""Argus — Diagnostics dashboard for the Teleo pipeline.

Separate aiohttp service (port 8081) that reads pipeline.db read-only.
Provides a Chart.js operational dashboard, quality vital signs, and
contributor analytics.

Owner: Argus <0ECBE5A7-EFAD-4A59-B491-635A1AEDF5DE>
Data source: Epimetheus's pipeline.db (read-only SQLite)
"""

import html
import json
import logging
import os
import sqlite3
import statistics
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

from aiohttp import web

logger = logging.getLogger("argus")

# --- Config ---
DB_PATH = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
PORT = int(os.environ.get("ARGUS_PORT", "8081"))
REPO_DIR = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index")


def _get_db() -> sqlite3.Connection:
    """Open a read-only connection to pipeline.db.

    Returns:
        sqlite3.Connection with Row factory and a 10s busy timeout.
    """
    # URI mode for true OS-level read-only (Rhea: belt and suspenders)
    conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30)
    conn.row_factory = sqlite3.Row
    # NOTE(review): a read-only connection cannot *switch* the journal mode;
    # this is a harmless no-op only when the writer already created the DB in
    # WAL mode — confirm that, otherwise this statement raises OperationalError.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=10000")
    return conn


def _conn(request) -> sqlite3.Connection:
    """Return the app's DB connection, reopening it if it has gone stale."""
    conn = request.app["db"]
    try:
        conn.execute("SELECT 1")  # cheap liveness probe
    except sqlite3.Error:
        conn = _get_db()
        request.app["db"] = conn
    return conn


# ─── Data queries ────────────────────────────────────────────────────────────

def _current_metrics(conn) -> dict:
    """Compute current operational metrics from live DB state.

    Returns a dict with throughput, approval/fix rates, status & source
    breakdowns, top rejection reasons, median time-to-merge, per-domain
    counts and circuit-breaker states.
    """
    # Throughput (merged in last hour)
    merged_1h = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE merged_at > datetime('now', '-1 hour')"
    ).fetchone()["n"]

    # PR status counts
    statuses = conn.execute(
        "SELECT status, COUNT(*) as n FROM prs GROUP BY status"
    ).fetchall()
    status_map = {r["status"]: r["n"] for r in statuses}

    # Approval rate (24h) from audit_log
    evaluated = conn.execute(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
        "AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') "
        "AND timestamp > datetime('now','-24 hours')"
    ).fetchone()["n"]
    approved = conn.execute(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
        "AND event='approved' AND timestamp > datetime('now','-24 hours')"
    ).fetchone()["n"]
    approval_rate = round(approved / evaluated, 3) if evaluated else 0

    # Rejection reasons (24h) — count events AND unique PRs
    reasons = conn.execute(
        """SELECT value as tag, COUNT(*) as cnt,
                  COUNT(DISTINCT json_extract(detail, '$.pr')) as unique_prs
           FROM audit_log, json_each(json_extract(detail, '$.issues'))
           WHERE stage='evaluate'
             AND event IN ('changes_requested','domain_rejected','tier05_rejected')
             AND timestamp > datetime('now','-24 hours')
           GROUP BY tag ORDER BY cnt DESC LIMIT 10"""
    ).fetchall()

    # Fix cycle: how often a PR that needed fixing eventually merged
    fix_stats = conn.execute(
        "SELECT COUNT(*) as attempted, "
        "SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as succeeded "
        "FROM prs WHERE fix_attempts > 0"
    ).fetchone()
    fix_attempted = fix_stats["attempted"] or 0
    fix_succeeded = fix_stats["succeeded"] or 0
    fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted else 0

    # Median time to merge (24h), in minutes
    merge_times = conn.execute(
        "SELECT (julianday(merged_at) - julianday(created_at)) * 24 * 60 as minutes "
        "FROM prs WHERE merged_at IS NOT NULL AND merged_at > datetime('now', '-24 hours')"
    ).fetchall()
    # Guard against NULL/negative durations from clock skew or bad rows
    durations = [r["minutes"] for r in merge_times if r["minutes"] and r["minutes"] > 0]
    median_ttm = round(statistics.median(durations), 1) if durations else None

    # Source pipeline
    source_statuses = conn.execute(
        "SELECT status, COUNT(*) as n FROM sources GROUP BY status"
    ).fetchall()
    source_map = {r["status"]: r["n"] for r in source_statuses}

    # Domain breakdown: {domain: {status: count}}
    domain_counts = conn.execute(
        "SELECT domain, status, COUNT(*) as n FROM prs GROUP BY domain, status"
    ).fetchall()
    domains = {}
    for r in domain_counts:
        d = r["domain"] or "unknown"
        if d not in domains:
            domains[d] = {}
        domains[d][r["status"]] = r["n"]

    # Circuit breakers, with age of last success where recorded
    breakers = conn.execute(
        "SELECT name, state, failures, last_success_at FROM circuit_breakers"
    ).fetchall()
    breaker_map = {}
    for b in breakers:
        info = {"state": b["state"], "failures": b["failures"]}
        if b["last_success_at"]:
            last = datetime.fromisoformat(b["last_success_at"])
            if last.tzinfo is None:
                # stored timestamps are assumed UTC — TODO confirm with writer
                last = last.replace(tzinfo=timezone.utc)
            age_s = (datetime.now(timezone.utc) - last).total_seconds()
            info["age_s"] = round(age_s)
        breaker_map[b["name"]] = info

    return {
        "throughput_1h": merged_1h,
        "approval_rate": approval_rate,
        "evaluated_24h": evaluated,
        "approved_24h": approved,
        "status_map": status_map,
        "source_map": source_map,
        "rejection_reasons": [
            {"tag": r["tag"], "count": r["cnt"], "unique_prs": r["unique_prs"]}
            for r in reasons
        ],
        "fix_rate": fix_rate,
        "fix_attempted": fix_attempted,
        "fix_succeeded": fix_succeeded,
        "median_ttm_minutes": median_ttm,
        "domains": domains,
        "breakers": breaker_map,
    }


def _snapshot_history(conn, days: int = 7) -> list[dict]:
    """Get the metrics_snapshots time series for the last *days* days."""
    rows = conn.execute(
        "SELECT * FROM metrics_snapshots WHERE ts > datetime('now', ? || ' days') "
        "ORDER BY ts ASC",
        (f"-{days}",),
    ).fetchall()
    return [dict(r) for r in rows]


def _version_changes(conn, days: int = 30) -> list[dict]:
    """Get prompt/pipeline version change events for chart annotations.

    Walks snapshots in time order and records each transition; the very
    first observed version is not a "change" and is skipped.
    """
    rows = conn.execute(
        "SELECT ts, prompt_version, pipeline_version FROM metrics_snapshots "
        "WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
        (f"-{days}",),
    ).fetchall()
    changes = []
    prev_prompt = prev_pipeline = None
    for row in rows:
        if row["prompt_version"] != prev_prompt and prev_prompt is not None:
            changes.append({"ts": row["ts"], "type": "prompt",
                            "from": prev_prompt, "to": row["prompt_version"]})
        if row["pipeline_version"] != prev_pipeline and prev_pipeline is not None:
            changes.append({"ts": row["ts"], "type": "pipeline",
                            "from": prev_pipeline, "to": row["pipeline_version"]})
        prev_prompt = row["prompt_version"]
        prev_pipeline = row["pipeline_version"]
    return changes


def _contributor_leaderboard(conn, limit: int = 20) -> list[dict]:
    """Top contributors, scored by weighted contribution index (CI).

    NOTE(review): LIMIT is applied on claims_merged *before* the CI re-sort,
    so a contributor with high CI but few merged claims can be cut off.
    """
    rows = conn.execute(
        "SELECT handle, tier, claims_merged, sourcer_count, extractor_count, "
        "challenger_count, synthesizer_count, reviewer_count, domains, last_contribution "
        "FROM contributors ORDER BY claims_merged DESC LIMIT ?",
        (limit,),
    ).fetchall()
    # Role weights for the contribution index
    weights = {"sourcer": 0.15, "extractor": 0.40, "challenger": 0.20,
               "synthesizer": 0.15, "reviewer": 0.10}
    result = []
    for r in rows:
        ci = sum((r[f"{role}_count"] or 0) * w for role, w in weights.items())
        result.append({
            "handle": r["handle"],
            "tier": r["tier"],
            "claims_merged": r["claims_merged"] or 0,
            "ci": round(ci, 2),
            "domains": json.loads(r["domains"]) if r["domains"] else [],
            "last_contribution": r["last_contribution"],
        })
    return sorted(result, key=lambda x: x["ci"], reverse=True)


# ─── Vital signs (Vida's five) ───────────────────────────────────────────────

def _fetch_claim_index() -> dict | None:
    """Fetch the claim-index from Epimetheus.

    Returns parsed JSON or None on any failure (logged, never raised).

    NOTE(review): this is a blocking HTTP call invoked from async handlers;
    it can stall the event loop for up to 5s — consider running it in an
    executor or using aiohttp's client.
    """
    try:
        with urllib.request.urlopen(CLAIM_INDEX_URL, timeout=5) as resp:
            return json.loads(resp.read())
    except Exception as e:
        logger.warning("Failed to fetch claim-index from %s: %s", CLAIM_INDEX_URL, e)
        return None


def _compute_vital_signs(conn) -> dict:
    """Compute Vida's five vital signs from DB state + claim-index."""
    # 1. Review throughput — backlog and latency
    open_prs = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status='open'").fetchone()["n"]
    conflict_prs = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status='conflict'").fetchone()["n"]
    conflict_permanent_prs = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status='conflict_permanent'").fetchone()["n"]
    approved_prs = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status='approved'").fetchone()["n"]
    reviewing_prs = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status='reviewing'").fetchone()["n"]
    backlog = open_prs + approved_prs + conflict_prs + reviewing_prs

    oldest_open = conn.execute(
        "SELECT MIN(created_at) as oldest FROM prs WHERE status='open'"
    ).fetchone()
    review_latency_h = None
    if oldest_open and oldest_open["oldest"]:
        oldest = datetime.fromisoformat(oldest_open["oldest"])
        if oldest.tzinfo is None:
            # naive timestamps assumed UTC — TODO confirm with pipeline writer
            oldest = oldest.replace(tzinfo=timezone.utc)
        review_latency_h = round(
            (datetime.now(timezone.utc) - oldest).total_seconds() / 3600, 1)

    # 2-5. Claim-index vital signs
    ci = _fetch_claim_index()
    orphan_ratio = None
    linkage_density = None
    confidence_dist = {}
    evidence_freshness = None
    claim_index_status = "unavailable"

    if ci and ci.get("claims"):
        claims = ci["claims"]
        total = len(claims)
        claim_index_status = "live"

        # 2. Orphan ratio (Vida: <15% healthy)
        orphan_count = ci.get(
            "orphan_count",
            sum(1 for c in claims if c.get("incoming_count", 0) == 0))
        orphan_ratio = round(orphan_count / total, 3) if total else 0

        # 3. Linkage density — avg outgoing links per claim + cross-domain ratio
        total_outgoing = sum(c.get("outgoing_count", 0) for c in claims)
        avg_links = round(total_outgoing / total, 2) if total else 0
        cross_domain = ci.get("cross_domain_links", 0)
        linkage_density = {
            "avg_outgoing_links": avg_links,
            "cross_domain_links": cross_domain,
            "cross_domain_ratio":
                round(cross_domain / total_outgoing, 3) if total_outgoing else 0,
        }

        # 4. Confidence distribution
        for c in claims:
            conf = c.get("confidence", "unknown")
            confidence_dist[conf] = confidence_dist.get(conf, 0) + 1

        # 5. Evidence freshness — avg/median age of claims in days
        today = datetime.now(timezone.utc).date()
        ages = []
        for c in claims:
            try:
                if c.get("created"):
                    created = datetime.strptime(c["created"], "%Y-%m-%d").date()
                    ages.append((today - created).days)
            except (ValueError, KeyError, TypeError):
                pass  # malformed/absent dates are simply skipped
        avg_age_days = round(statistics.mean(ages)) if ages else None
        median_age_days = round(statistics.median(ages)) if ages else None
        fresh_30d = sum(1 for a in ages if a <= 30)
        evidence_freshness = {
            "avg_age_days": avg_age_days,
            "median_age_days": median_age_days,
            "fresh_30d_count": fresh_30d,
            "fresh_30d_pct": round(fresh_30d / total * 100, 1) if total else 0,
        }

    # Domain activity (last 7 days) — stagnation detection
    domain_activity = conn.execute(
        "SELECT domain, COUNT(*) as n, MAX(last_attempt) as latest "
        "FROM prs WHERE last_attempt > datetime('now', '-7 days') GROUP BY domain"
    ).fetchall()
    stagnant_domains = []
    active_domains = [
        {"domain": r["domain"], "prs_7d": r["n"], "latest": r["latest"]}
        for r in domain_activity
    ]
    all_domains = conn.execute(
        "SELECT DISTINCT domain FROM prs WHERE domain IS NOT NULL").fetchall()
    active_names = {r["domain"] for r in domain_activity}
    for r in all_domains:
        if r["domain"] not in active_names:
            stagnant_domains.append(r["domain"])

    # Pipeline funnel: sources → PRs → merged
    total_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources").fetchone()["n"]
    queued_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources WHERE status='unprocessed'"
    ).fetchone()["n"]
    extracted_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources WHERE status='extracted'"
    ).fetchone()["n"]
    merged_prs = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
    total_prs = conn.execute("SELECT COUNT(*) as n FROM prs").fetchone()["n"]
    funnel = {
        "sources_total": total_sources,
        "sources_queued": queued_sources,
        "sources_extracted": extracted_sources,
        "prs_total": total_prs,
        "prs_merged": merged_prs,
        "conversion_rate": round(merged_prs / total_prs, 3) if total_prs else 0,
    }

    # Orphan status: explicit comparison — a ratio of exactly 0.0 is healthy,
    # not critical (the original truthiness check misclassified it).
    if orphan_ratio is None:
        orphan_status = "unavailable"
    elif orphan_ratio < 0.15:
        orphan_status = "healthy"
    elif orphan_ratio < 0.30:
        orphan_status = "warning"
    else:
        orphan_status = "critical"

    return {
        "claim_index_status": claim_index_status,
        "review_throughput": {
            "backlog": backlog,
            "open_prs": open_prs,
            "approved_waiting": approved_prs,
            "conflict_prs": conflict_prs,
            "conflict_permanent_prs": conflict_permanent_prs,
            "reviewing_prs": reviewing_prs,
            "oldest_open_hours": review_latency_h,
            "status": "healthy" if backlog <= 3 else (
                "warning" if backlog <= 10 else "critical"),
        },
        "orphan_ratio": {
            "ratio": orphan_ratio,
            "count": ci.get("orphan_count") if ci else None,
            "total": ci.get("total_claims") if ci else None,
            "status": orphan_status,
        },
        "linkage_density": linkage_density,
        "confidence_distribution": confidence_dist,
        "evidence_freshness": evidence_freshness,
        "domain_activity": {
            "active": active_domains,
            "stagnant": stagnant_domains,
            "status": "healthy" if not stagnant_domains else "warning",
        },
        "funnel": funnel,
    }


# ─── Route handlers ──────────────────────────────────────────────────────────

async def handle_dashboard(request):
    """GET / — main Chart.js operational dashboard."""
    try:
        conn = _conn(request)
        metrics = _current_metrics(conn)
        snapshots = _snapshot_history(conn, days=7)
        changes = _version_changes(conn, days=30)
        vital_signs = _compute_vital_signs(conn)
        contributors = _contributor_leaderboard(conn, limit=10)
    except sqlite3.Error as e:
        return web.Response(
            text=_render_error(f"Pipeline database unavailable: {e}"),
            content_type="text/html",
            status=503,
        )
    now = datetime.now(timezone.utc)
    page = _render_dashboard(metrics, snapshots, changes, vital_signs, contributors, now)
    return web.Response(text=page, content_type="text/html")


async def handle_api_metrics(request):
    """GET /api/metrics — JSON operational metrics."""
    conn = _conn(request)
    return web.json_response(_current_metrics(conn))


async def handle_api_snapshots(request):
    """GET /api/snapshots?days=7 — time-series data for charts."""
    conn = _conn(request)
    days = int(request.query.get("days", "7"))
    snapshots = _snapshot_history(conn, days)
    changes = _version_changes(conn, days)
    return web.json_response(
        {"snapshots": snapshots, "version_changes": changes, "days": days})


async def handle_api_vital_signs(request):
    """GET /api/vital-signs — Vida's five vital signs."""
    conn = _conn(request)
    return web.json_response(_compute_vital_signs(conn))


async def handle_api_contributors(request):
    """GET /api/contributors — contributor leaderboard."""
    conn = _conn(request)
    limit = int(request.query.get("limit", "50"))
    return web.json_response({"contributors": _contributor_leaderboard(conn, limit)})


async def handle_api_domains(request):
    """GET /api/domains — per-domain health breakdown."""
    conn = _conn(request)
    metrics = _current_metrics(conn)
    return web.json_response({"domains": metrics["domains"]})


# ─── Dashboard HTML ──────────────────────────────────────────────────────────

# Shared dark-theme stylesheet (plain string: no f-string brace escaping).
_BASE_CSS = """
body { background: #0d1117; color: #c9d1d9; margin: 0; padding: 16px;
       font-family: -apple-system, 'Segoe UI', Roboto, sans-serif; }
h1, h2 { color: #e6edf3; }
.muted { color: #8b949e; }
.green { color: #3fb950; } .yellow { color: #d29922; } .red { color: #f85149; }
.cards { display: flex; flex-wrap: wrap; gap: 12px; margin: 12px 0; }
.card { background: #161b22; border: 1px solid #30363d; border-radius: 6px;
        padding: 12px 16px; min-width: 140px; }
.card .big { font-size: 1.8em; font-weight: 600; }
.card .sub { color: #8b949e; font-size: 0.85em; }
table { border-collapse: collapse; margin: 8px 0; }
th, td { border: 1px solid #30363d; padding: 4px 10px; text-align: left; }
th { color: #8b949e; }
.chart-box { background: #161b22; border: 1px solid #30363d; border-radius: 6px;
             padding: 12px; height: 260px; margin: 12px 0; }
.alert { background: #2d1517; border: 1px solid #f85149; border-radius: 6px;
         padding: 8px 12px; margin: 12px 0; }
"""

# Chart setup; reads the JSON payload embedded by _render_dashboard.
_DASH_JS = """
const DATA = JSON.parse(document.getElementById('chart-data').textContent);
const axis = '#8b949e';
const baseOpts = {
  responsive: true, maintainAspectRatio: false, animation: false,
  scales: {x: {ticks: {color: axis}}, y: {ticks: {color: axis}, beginAtZero: true}},
  plugins: {legend: {labels: {color: axis}}},
};
function line(label, data, color) {
  return {label: label, data: data, borderColor: color, backgroundColor: color,
          tension: 0.3, pointRadius: 0};
}
new Chart(document.getElementById('throughputChart'), {
  type: 'line',
  data: {labels: DATA.timestamps, datasets: [
    line('merged/hr', DATA.throughput, '#3fb950'),
    line('approval %', DATA.approval, '#58a6ff'),
  ]},
  options: Object.assign({}, baseOpts, {plugins: Object.assign({}, baseOpts.plugins,
    {annotation: {annotations: DATA.annotations}})}),
});
new Chart(document.getElementById('rejectionChart'), {
  type: 'line',
  data: {labels: DATA.timestamps, datasets: [
    line('wiki links', DATA.rejections.broken_wiki_links, '#f85149'),
    line('schema', DATA.rejections.frontmatter_schema, '#d29922'),
    line('duplicate', DATA.rejections.near_duplicate, '#58a6ff'),
    line('confidence', DATA.rejections.confidence, '#bc8cff'),
    line('other', DATA.rejections.other, '#8b949e'),
  ]},
  options: baseOpts,
});
new Chart(document.getElementById('backlogChart'), {
  type: 'line',
  data: {labels: DATA.timestamps, datasets: [
    line('open PRs', DATA.open_prs, '#d29922'),
    line('merged total', DATA.merged, '#3fb950'),
  ]},
  options: baseOpts,
});
new Chart(document.getElementById('originChart'), {
  type: 'line',
  data: {labels: DATA.timestamps, datasets: [
    line('agent', DATA.origins.agent, '#58a6ff'),
    line('human', DATA.origins.human, '#3fb950'),
  ]},
  options: baseOpts,
});
"""


def _render_error(message: str) -> str:
    """Render a minimal error page when the DB is unavailable."""
    return f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>Argus — Error</title>
<style>{_BASE_CSS}</style></head>
<body>
<h1>Argus</h1>
<p class="red">{html.escape(message)}</p>
<p class="muted">Check if teleo-pipeline.service is running and pipeline.db exists.</p>
</body></html>"""


def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors, now) -> str:
    """Render the full operational dashboard as HTML with Chart.js."""
    esc = html.escape

    # Time-series chart payload, embedded once as JSON and read by _DASH_JS.
    chart_data = {
        "timestamps": [s["ts"] for s in snapshots],
        "throughput": [s.get("throughput_1h", 0) for s in snapshots],
        "approval": [(s.get("approval_rate") or 0) * 100 for s in snapshots],
        "open_prs": [s.get("open_prs", 0) for s in snapshots],
        "merged": [s.get("merged_total", 0) for s in snapshots],
        "rejections": {
            "broken_wiki_links": [s.get("rejection_broken_wiki_links", 0) for s in snapshots],
            "frontmatter_schema": [s.get("rejection_frontmatter_schema", 0) for s in snapshots],
            "near_duplicate": [s.get("rejection_near_duplicate", 0) for s in snapshots],
            "confidence": [s.get("rejection_confidence", 0) for s in snapshots],
            "other": [s.get("rejection_other", 0) for s in snapshots],
        },
        "origins": {
            "agent": [s.get("source_origin_agent", 0) for s in snapshots],
            "human": [s.get("source_origin_human", 0) for s in snapshots],
        },
        # chartjs-plugin-annotation vertical lines marking version changes
        "annotations": [
            {
                "type": "line",
                "xMin": c["ts"],
                "xMax": c["ts"],
                "borderColor": "#d29922" if c["type"] == "prompt" else "#58a6ff",
                "borderWidth": 1,
                "borderDash": [4, 4],
                "label": {
                    "display": True,
                    "content": f"{c['type']}: {c.get('to', '?')}",
                    "position": "start",
                    "backgroundColor": "#161b22",
                    "color": "#8b949e",
                    "font": {"size": 10},
                },
            }
            for c in changes
        ],
    }

    # Status colors for the headline cards
    sm = metrics["status_map"]
    ar = metrics["approval_rate"]
    ar_color = "green" if ar > 0.5 else ("yellow" if ar > 0.2 else "red")
    fr_color = "green" if metrics["fix_rate"] > 0.3 else (
        "yellow" if metrics["fix_rate"] > 0.1 else "red")

    vs_review = vital_signs["review_throughput"]
    vs_status_color = {"healthy": "green", "warning": "yellow",
                       "critical": "red"}.get(vs_review["status"], "yellow")

    # Time-to-merge: show a plain dash when there is no data (the original
    # rendered the dangling suffix "—min").
    ttm = metrics["median_ttm_minutes"]
    ttm_display = f"{ttm:.0f}min" if ttm is not None else "—"

    # Orphan ratio
    vs_orphan = vital_signs.get("orphan_ratio", {})
    orphan_ratio_val = vs_orphan.get("ratio")
    orphan_color = {"healthy": "green", "warning": "yellow",
                    "critical": "red"}.get(vs_orphan.get("status", ""), "")
    orphan_display = f"{orphan_ratio_val:.1%}" if orphan_ratio_val is not None else "—"

    # Linkage density
    vs_linkage = vital_signs.get("linkage_density") or {}
    linkage_display = f'{vs_linkage.get("avg_outgoing_links", "—")}'
    cross_domain_ratio = vs_linkage.get("cross_domain_ratio")
    if cross_domain_ratio is None:
        cross_domain_color = ""
    elif cross_domain_ratio >= 0.15:
        cross_domain_color = "green"
    elif cross_domain_ratio >= 0.05:
        cross_domain_color = "yellow"
    else:
        cross_domain_color = "red"

    # Evidence freshness — explicit None check so a 0-day median still renders
    vs_fresh = vital_signs.get("evidence_freshness") or {}
    median_age = vs_fresh.get("median_age_days")
    fresh_display = f"{median_age}d" if median_age is not None else "—"
    fresh_pct = vs_fresh.get("fresh_30d_pct", 0)

    vs_conf = vital_signs.get("confidence_distribution", {})

    # Rejection reasons table — unique PRs alongside event count
    reason_rows = "".join(
        f'<tr><td>{esc(str(r["tag"]))}</td><td>{r["unique_prs"]}</td>'
        f'<td>{r["count"]}</td></tr>'
        for r in metrics["rejection_reasons"]
    ) or '<tr><td colspan="3" class="muted">No rejections in 24h</td></tr>'

    # Domain table
    domain_rows = ""
    for domain, statuses in sorted(metrics["domains"].items()):
        m = statuses.get("merged", 0)
        c = statuses.get("closed", 0)
        o = statuses.get("open", 0)
        total = sum(statuses.values())
        domain_rows += (
            f"<tr><td>{esc(str(domain))}</td><td>{total}</td>"
            f"<td>{m}</td><td>{c}</td><td>{o}</td></tr>")

    # Contributor rows (handles/domains come from the DB — escape them)
    contributor_rows = "".join(
        f'<tr><td>{esc(str(c["handle"]))}</td><td>{esc(str(c["tier"]))}</td>'
        f'<td>{c["claims_merged"]}</td><td>{c["ci"]}</td>'
        f'<td>{esc(", ".join(c["domains"][:3])) if c["domains"] else "-"}</td></tr>'
        for c in contributors[:10]
    ) or '<tr><td colspan="5" class="muted">No contributors yet</td></tr>'

    # Circuit-breaker rows
    breaker_rows = ""
    for name, info in metrics["breakers"].items():
        state = info["state"]
        color = "green" if state == "closed" else ("red" if state == "open" else "yellow")
        age = f'{info.get("age_s", "?")}s ago' if "age_s" in info else "-"
        breaker_rows += (
            f'<tr><td>{esc(str(name))}</td><td class="{color}">{esc(str(state))}</td>'
            f'<td>{info["failures"]}</td><td>{age}</td></tr>')
    if not breaker_rows:
        breaker_rows = '<tr><td colspan="4" class="muted">No breaker data</td></tr>'

    # Knowledge-health section only when the claim-index was reachable
    if vital_signs.get("claim_index_status") == "live":
        conf_spread = " / ".join(
            f"{vs_conf.get(k, 0)}"
            for k in ["proven", "likely", "experimental", "speculative"])
        cross_display = (f"{cross_domain_ratio:.1%}"
                         if cross_domain_ratio is not None else "—")
        vital_html = f"""
<h2>Knowledge Health (Vida's Vital Signs)</h2>
<div class="cards">
  <div class="card"><div class="sub">Orphan Ratio</div>
    <div class="big {orphan_color}">{orphan_display}</div>
    <div class="sub">{vs_orphan.get("count", "?")} / {vs_orphan.get("total", "?")} claims · target &lt;15%</div></div>
  <div class="card"><div class="sub">Avg Links/Claim</div>
    <div class="big">{linkage_display}</div>
    <div class="sub">cross-domain: <span class="{cross_domain_color}">{cross_display}</span> · target 15-30%</div></div>
  <div class="card"><div class="sub">Evidence Freshness</div>
    <div class="big">{fresh_display} median</div>
    <div class="sub">{vs_fresh.get("fresh_30d_count", "?")} claims &lt;30d old · {fresh_pct:.0f}% fresh</div></div>
  <div class="card"><div class="sub">Confidence Spread</div>
    <div class="big">{conf_spread}</div>
    <div class="sub">proven / likely / experimental / speculative</div></div>
</div>"""
    else:
        vital_html = ""

    # Stagnation alert banner
    stagnant = vital_signs["domain_activity"]["stagnant"]
    if stagnant:
        stagnant_html = (
            '<div class="alert"><strong>Stagnation Alerts</strong><br>'
            f'Domains with no PR activity in 7 days: {esc(", ".join(str(d) for d in stagnant))}'
            '</div>')
    else:
        stagnant_html = ""

    funnel = vital_signs["funnel"]

    return f"""<!DOCTYPE html>
<html><head>
<meta charset="utf-8">
<meta http-equiv="refresh" content="60">
<title>Argus — Teleo Diagnostics</title>
<style>{_BASE_CSS}</style>
</head>
<body>
<h1>Argus</h1>
<p class="muted">Teleo Pipeline Diagnostics · {now.strftime("%Y-%m-%d %H:%M UTC")} · auto-refresh 60s</p>

<div class="cards">
  <div class="card"><div class="sub">Throughput</div>
    <div class="big">{metrics["throughput_1h"]}/hr</div>
    <div class="sub">merged last hour</div></div>
  <div class="card"><div class="sub">Approval Rate (24h)</div>
    <div class="big {ar_color}">{ar:.1%}</div>
    <div class="sub">{metrics["approved_24h"]}/{metrics["evaluated_24h"]} evaluated</div></div>
  <div class="card"><div class="sub">Review Backlog</div>
    <div class="big {vs_status_color}">{vs_review["backlog"]}</div>
    <div class="sub">{vs_review["open_prs"]} open + {vs_review["reviewing_prs"]} reviewing + {vs_review["approved_waiting"]} approved + {vs_review["conflict_prs"]} conflicts</div></div>
  <div class="card"><div class="sub">Merged Total</div>
    <div class="big">{sm.get("merged", 0)}</div>
    <div class="sub">{sm.get("closed", 0)} closed</div></div>
  <div class="card"><div class="sub">Fix Success</div>
    <div class="big {fr_color}">{metrics["fix_rate"]:.1%}</div>
    <div class="sub">{metrics["fix_succeeded"]}/{metrics["fix_attempted"]} fixed</div></div>
  <div class="card"><div class="sub">Time to Merge</div>
    <div class="big">{ttm_display}</div>
    <div class="sub">median (24h)</div></div>
</div>

<h2>Pipeline Funnel</h2>
<div class="cards">
  <div class="card"><div class="big">{funnel["sources_total"]}</div><div class="sub">Sources</div></div>
  <div class="card"><div class="big">{funnel["sources_queued"]}</div><div class="sub">In Queue</div></div>
  <div class="card"><div class="big">{funnel["sources_extracted"]}</div><div class="sub">Extracted</div></div>
  <div class="card"><div class="big">{funnel["prs_total"]}</div><div class="sub">PRs Created</div></div>
  <div class="card"><div class="big">{funnel["prs_merged"]}</div><div class="sub">Merged</div></div>
  <div class="card"><div class="big">{funnel["conversion_rate"]:.1%}</div><div class="sub">Conversion</div></div>
</div>

{vital_html}

<h2>Throughput &amp; Approval Rate</h2>
<div class="chart-box"><canvas id="throughputChart"></canvas></div>
<h2>Rejection Reasons Over Time</h2>
<div class="chart-box"><canvas id="rejectionChart"></canvas></div>
<h2>PR Backlog</h2>
<div class="chart-box"><canvas id="backlogChart"></canvas></div>
<h2>Source Origins (24h snapshots)</h2>
<div class="chart-box"><canvas id="originChart"></canvas></div>

<h2>Top Rejection Reasons (24h)</h2>
<table><tr><th>Issue</th><th>PRs</th><th>Events</th></tr>{reason_rows}</table>

<h2>Circuit Breakers</h2>
<table><tr><th>Stage</th><th>State</th><th>Failures</th><th>Last Success</th></tr>{breaker_rows}</table>

<h2>Domain Breakdown</h2>
<table><tr><th>Domain</th><th>Total</th><th>Merged</th><th>Closed</th><th>Open</th></tr>{domain_rows}</table>

<h2>Top Contributors (by CI)</h2>
<table><tr><th>Handle</th><th>Tier</th><th>Claims</th><th>CI</th><th>Domains</th></tr>{contributor_rows}</table>

{stagnant_html}

<script type="application/json" id="chart-data">{json.dumps(chart_data)}</script>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3"></script>
<script>{_DASH_JS}</script>
</body></html>"""


# ─── App factory ─────────────────────────────────────────────────────────────

def create_app() -> web.Application:
    """Build the aiohttp application with its routes and shared DB handle."""
    app = web.Application()
    app["db"] = _get_db()
    app.router.add_get("/", handle_dashboard)
    app.router.add_get("/api/metrics", handle_api_metrics)
    app.router.add_get("/api/snapshots", handle_api_snapshots)
    app.router.add_get("/api/vital-signs", handle_api_vital_signs)
    app.router.add_get("/api/contributors", handle_api_contributors)
    app.router.add_get("/api/domains", handle_api_domains)
    app.on_cleanup.append(_cleanup)
    return app


async def _cleanup(app):
    """Close the shared DB connection on shutdown."""
    app["db"].close()


def main():
    """Entry point: configure logging and run the aiohttp server."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s")
    logger.info("Argus diagnostics starting on port %d, DB: %s", PORT, DB_PATH)
    app = create_app()
    web.run_app(app, host="0.0.0.0", port=PORT)


if __name__ == "__main__":
    main()