"""Argus — Diagnostics dashboard + search API for the Teleo pipeline.
Separate aiohttp service (port 8081) that reads pipeline.db read-only.
Provides a Chart.js operational dashboard, quality vital signs, contributor analytics,
semantic search via Qdrant, and claim usage logging.
Owner: Argus <69AF7290-758F-464B-B472-04AFCA4AB340>
Data source: Epimetheus's pipeline.db (read-only SQLite), Qdrant vector DB
"""
import json
import logging
import os
import sqlite3
import statistics
import sys
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Add pipeline lib to path so we can import shared modules
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "pipeline"))
from aiohttp import web
from lib.search import search as kb_search, embed_query, search_qdrant
logger = logging.getLogger("argus")
# --- Config ---
DB_PATH = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
PORT = int(os.environ.get("ARGUS_PORT", "8081"))
REPO_DIR = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index")
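# Example environment overrides (illustrative values only; the defaults above are
# what the deployed service uses):
#   export ARGUS_PORT=9090
#   export PIPELINE_DB=/tmp/pipeline-copy.db
#   export ARGUS_API_KEY_FILE=/tmp/argus-dev-key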
# Search config — moved to lib/search.py (shared with Telegram bot + agents)
# Auth config
API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secrets/argus-api-key"))
# Endpoints that skip auth (dashboard is public for now, can lock later)
_PUBLIC_PATHS = frozenset({"/", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains"})
def _get_db() -> sqlite3.Connection:
"""Open read-only connection to pipeline.db."""
# URI mode for true OS-level read-only (Rhea: belt and suspenders)
conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
return conn
def _conn(request) -> sqlite3.Connection:
"""Get DB connection with health check. Reopens if stale."""
conn = request.app["db"]
    try:
        conn.execute("SELECT 1")
    except sqlite3.Error:
        # Close the stale handle (best effort) before replacing it
        try:
            conn.close()
        except Exception:
            pass
        conn = _get_db()
        request.app["db"] = conn
return conn
# ─── Data queries ────────────────────────────────────────────────────────────
def _current_metrics(conn) -> dict:
"""Compute current operational metrics from live DB state."""
# Throughput (merged in last hour)
merged_1h = conn.execute(
"SELECT COUNT(*) as n FROM prs WHERE merged_at > datetime('now', '-1 hour')"
).fetchone()["n"]
# PR status counts
statuses = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
status_map = {r["status"]: r["n"] for r in statuses}
# Approval rate (24h) from audit_log
evaluated = conn.execute(
"SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
"AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') "
"AND timestamp > datetime('now','-24 hours')"
).fetchone()["n"]
approved = conn.execute(
"SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
"AND event='approved' AND timestamp > datetime('now','-24 hours')"
).fetchone()["n"]
approval_rate = round(approved / evaluated, 3) if evaluated else 0
# Rejection reasons (24h) — count events AND unique PRs
reasons = conn.execute(
"""SELECT value as tag, COUNT(*) as cnt,
COUNT(DISTINCT json_extract(detail, '$.pr')) as unique_prs
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now','-24 hours')
GROUP BY tag ORDER BY cnt DESC LIMIT 10"""
).fetchall()
# Fix cycle
fix_stats = conn.execute(
"SELECT COUNT(*) as attempted, "
"SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as succeeded "
"FROM prs WHERE fix_attempts > 0"
).fetchone()
fix_attempted = fix_stats["attempted"] or 0
fix_succeeded = fix_stats["succeeded"] or 0
fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted else 0
# Median time to merge (24h)
merge_times = conn.execute(
"SELECT (julianday(merged_at) - julianday(created_at)) * 24 * 60 as minutes "
"FROM prs WHERE merged_at IS NOT NULL AND merged_at > datetime('now', '-24 hours')"
).fetchall()
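    # julianday() returns fractional days, so (merged - created) * 24 * 60 yields minutes.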
durations = [r["minutes"] for r in merge_times if r["minutes"] and r["minutes"] > 0]
median_ttm = round(statistics.median(durations), 1) if durations else None
# Source pipeline
source_statuses = conn.execute(
"SELECT status, COUNT(*) as n FROM sources GROUP BY status"
).fetchall()
source_map = {r["status"]: r["n"] for r in source_statuses}
# Domain breakdown
domain_counts = conn.execute(
"SELECT domain, status, COUNT(*) as n FROM prs GROUP BY domain, status"
).fetchall()
domains = {}
for r in domain_counts:
d = r["domain"] or "unknown"
if d not in domains:
domains[d] = {}
domains[d][r["status"]] = r["n"]
# Breakers
breakers = conn.execute(
"SELECT name, state, failures, last_success_at FROM circuit_breakers"
).fetchall()
breaker_map = {}
for b in breakers:
info = {"state": b["state"], "failures": b["failures"]}
if b["last_success_at"]:
last = datetime.fromisoformat(b["last_success_at"])
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
age_s = (datetime.now(timezone.utc) - last).total_seconds()
info["age_s"] = round(age_s)
breaker_map[b["name"]] = info
return {
"throughput_1h": merged_1h,
"approval_rate": approval_rate,
"evaluated_24h": evaluated,
"approved_24h": approved,
"status_map": status_map,
"source_map": source_map,
"rejection_reasons": [{"tag": r["tag"], "count": r["cnt"], "unique_prs": r["unique_prs"]} for r in reasons],
"fix_rate": fix_rate,
"fix_attempted": fix_attempted,
"fix_succeeded": fix_succeeded,
"median_ttm_minutes": median_ttm,
"domains": domains,
"breakers": breaker_map,
}
def _snapshot_history(conn, days: int = 7) -> list[dict]:
"""Get metrics_snapshots time series."""
rows = conn.execute(
"SELECT * FROM metrics_snapshots WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
(f"-{days}",),
).fetchall()
return [dict(r) for r in rows]
def _version_changes(conn, days: int = 30) -> list[dict]:
"""Get prompt/pipeline version change events for chart annotations."""
rows = conn.execute(
"SELECT ts, prompt_version, pipeline_version FROM metrics_snapshots "
"WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
(f"-{days}",),
).fetchall()
changes = []
prev_prompt = prev_pipeline = None
for row in rows:
if row["prompt_version"] != prev_prompt and prev_prompt is not None:
changes.append({"ts": row["ts"], "type": "prompt", "from": prev_prompt, "to": row["prompt_version"]})
if row["pipeline_version"] != prev_pipeline and prev_pipeline is not None:
changes.append({"ts": row["ts"], "type": "pipeline", "from": prev_pipeline, "to": row["pipeline_version"]})
prev_prompt = row["prompt_version"]
prev_pipeline = row["pipeline_version"]
return changes
def _has_column(conn, table: str, column: str) -> bool:
"""Check if a column exists in a table (graceful schema migration support)."""
cols = conn.execute(f"PRAGMA table_info({table})").fetchall()
return any(c["name"] == column for c in cols)
def _contributor_leaderboard(conn, limit: int = 20, view: str = "principal") -> list[dict]:
"""Top contributors by CI score.
view="agent" — one row per contributor handle (original behavior)
view="principal" — rolls up agent contributions to their principal (human)
"""
has_principal = _has_column(conn, "contributors", "principal")
rows = conn.execute(
"SELECT handle, tier, claims_merged, sourcer_count, extractor_count, "
"challenger_count, synthesizer_count, reviewer_count, domains, last_contribution"
+ (", principal" if has_principal else "") +
" FROM contributors ORDER BY claims_merged DESC",
).fetchall()
# Weights reward quality over volume (Cory-approved)
weights = {"sourcer": 0.15, "extractor": 0.05, "challenger": 0.35, "synthesizer": 0.25, "reviewer": 0.20}
role_keys = list(weights.keys())
if view == "principal" and has_principal:
# Aggregate by principal — agents with a principal roll up to the human
buckets: dict[str, dict] = {}
for r in rows:
principal = r["principal"]
key = principal if principal else r["handle"]
if key not in buckets:
buckets[key] = {
"handle": key,
"tier": r["tier"],
"claims_merged": 0,
"domains": set(),
"last_contribution": None,
"agents": [],
**{f"{role}_count": 0 for role in role_keys},
}
b = buckets[key]
b["claims_merged"] += r["claims_merged"] or 0
for role in role_keys:
b[f"{role}_count"] += r[f"{role}_count"] or 0
if r["domains"]:
b["domains"].update(json.loads(r["domains"]))
if r["last_contribution"]:
if not b["last_contribution"] or r["last_contribution"] > b["last_contribution"]:
b["last_contribution"] = r["last_contribution"]
# Upgrade tier (veteran > contributor > new)
tier_rank = {"veteran": 2, "contributor": 1, "new": 0}
if tier_rank.get(r["tier"], 0) > tier_rank.get(b["tier"], 0):
b["tier"] = r["tier"]
if principal:
b["agents"].append(r["handle"])
result = []
for b in buckets.values():
ci = sum(b[f"{role}_count"] * w for role, w in weights.items())
result.append({
"handle": b["handle"],
"tier": b["tier"],
"claims_merged": b["claims_merged"],
"ci": round(ci, 2),
"domains": sorted(b["domains"])[:5],
"last_contribution": b["last_contribution"],
"agents": b["agents"],
})
else:
# By-agent view (original behavior)
result = []
for r in rows:
ci = sum((r[f"{role}_count"] or 0) * w for role, w in weights.items())
entry = {
"handle": r["handle"],
"tier": r["tier"],
"claims_merged": r["claims_merged"] or 0,
"ci": round(ci, 2),
"domains": json.loads(r["domains"]) if r["domains"] else [],
"last_contribution": r["last_contribution"],
}
if has_principal:
entry["principal"] = r["principal"]
result.append(entry)
result = sorted(result, key=lambda x: x["ci"], reverse=True)
return result[:limit]
# ─── Vital signs (Vida's five) ───────────────────────────────────────────────
def _fetch_claim_index() -> dict | None:
"""Fetch claim-index from Epimetheus. Returns parsed JSON or None on failure."""
try:
with urllib.request.urlopen(CLAIM_INDEX_URL, timeout=5) as resp:
return json.loads(resp.read())
except Exception as e:
logger.warning("Failed to fetch claim-index from %s: %s", CLAIM_INDEX_URL, e)
return None
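# Claim-index shape assumed below (inferred from the fields read; the schema is
# owned by Epimetheus and may carry additional keys):
#   {"total_claims": 57, "orphan_count": 3, "cross_domain_links": 12,
#    "claims": [{"incoming_count": 0, "outgoing_count": 2,
#                "confidence": "high", "created": "2024-01-31"}, ...]}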
def _compute_vital_signs(conn) -> dict:
"""Compute Vida's five vital signs from DB state + claim-index."""
# 1. Review throughput — backlog and latency
# Query Forgejo directly for authoritative PR counts (DB misses agent-created PRs)
forgejo_open = 0
forgejo_unmergeable = 0
try:
import requests as _req
_token = Path("/opt/teleo-eval/secrets/forgejo-token").read_text().strip() if Path("/opt/teleo-eval/secrets/forgejo-token").exists() else ""
_resp = _req.get(
"http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50",
headers={"Authorization": f"token {_token}"} if _token else {},
timeout=10,
)
if _resp.status_code == 200:
_prs = _resp.json()
forgejo_open = len(_prs)
forgejo_unmergeable = sum(1 for p in _prs if not p.get("mergeable", True))
except Exception:
# Fallback to DB counts if Forgejo unreachable
forgejo_open = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='open'").fetchone()["n"]
open_prs = forgejo_open
conflict_prs = forgejo_unmergeable
conflict_permanent_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='conflict_permanent'").fetchone()["n"]
approved_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='approved'").fetchone()["n"]
reviewing_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='reviewing'").fetchone()["n"]
backlog = open_prs
oldest_open = conn.execute(
"SELECT MIN(created_at) as oldest FROM prs WHERE status='open'"
).fetchone()
review_latency_h = None
if oldest_open and oldest_open["oldest"]:
oldest = datetime.fromisoformat(oldest_open["oldest"])
if oldest.tzinfo is None:
oldest = oldest.replace(tzinfo=timezone.utc)
review_latency_h = round((datetime.now(timezone.utc) - oldest).total_seconds() / 3600, 1)
# 2-5. Claim-index vital signs
ci = _fetch_claim_index()
orphan_ratio = None
linkage_density = None
confidence_dist = {}
evidence_freshness = None
claim_index_status = "unavailable"
if ci and ci.get("claims"):
claims = ci["claims"]
total = len(claims)
claim_index_status = "live"
# 2. Orphan ratio (Vida: <15% healthy)
orphan_count = ci.get("orphan_count", sum(1 for c in claims if c.get("incoming_count", 0) == 0))
orphan_ratio = round(orphan_count / total, 3) if total else 0
# 3. Linkage density — avg outgoing links per claim + cross-domain ratio
total_outgoing = sum(c.get("outgoing_count", 0) for c in claims)
avg_links = round(total_outgoing / total, 2) if total else 0
cross_domain = ci.get("cross_domain_links", 0)
linkage_density = {
"avg_outgoing_links": avg_links,
"cross_domain_links": cross_domain,
"cross_domain_ratio": round(cross_domain / total_outgoing, 3) if total_outgoing else 0,
}
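        # e.g. (hypothetical) 120 outgoing links over 60 claims gives avg 2.0;
        # 18 of those being cross-domain gives cross_domain_ratio 0.15.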
# 4. Confidence distribution + calibration
for c in claims:
conf = c.get("confidence", "unknown")
confidence_dist[conf] = confidence_dist.get(conf, 0) + 1
# Normalize to percentages
confidence_pct = {k: round(v / total * 100, 1) for k, v in sorted(confidence_dist.items())}
# 5. Evidence freshness — avg age of claims in days
today = datetime.now(timezone.utc).date()
ages = []
for c in claims:
try:
if c.get("created"):
created = datetime.strptime(c["created"], "%Y-%m-%d").date()
ages.append((today - created).days)
except (ValueError, KeyError, TypeError):
pass
avg_age_days = round(statistics.mean(ages)) if ages else None
median_age_days = round(statistics.median(ages)) if ages else None
fresh_30d = sum(1 for a in ages if a <= 30)
evidence_freshness = {
"avg_age_days": avg_age_days,
"median_age_days": median_age_days,
"fresh_30d_count": fresh_30d,
"fresh_30d_pct": round(fresh_30d / total * 100, 1) if total else 0,
}
# Domain activity (last 7 days) — stagnation detection
domain_activity = conn.execute(
"SELECT domain, COUNT(*) as n, MAX(last_attempt) as latest "
"FROM prs WHERE last_attempt > datetime('now', '-7 days') GROUP BY domain"
).fetchall()
stagnant_domains = []
active_domains = []
for r in domain_activity:
active_domains.append({"domain": r["domain"], "prs_7d": r["n"], "latest": r["latest"]})
all_domains = conn.execute("SELECT DISTINCT domain FROM prs WHERE domain IS NOT NULL").fetchall()
active_names = {r["domain"] for r in domain_activity}
for r in all_domains:
if r["domain"] not in active_names:
stagnant_domains.append(r["domain"])
# Pipeline funnel
total_sources = conn.execute("SELECT COUNT(*) as n FROM sources").fetchone()["n"]
queued_sources = conn.execute(
"SELECT COUNT(*) as n FROM sources WHERE status='unprocessed'"
).fetchone()["n"]
extracted_sources = conn.execute(
"SELECT COUNT(*) as n FROM sources WHERE status='extracted'"
).fetchone()["n"]
merged_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
total_prs = conn.execute("SELECT COUNT(*) as n FROM prs").fetchone()["n"]
funnel = {
"sources_total": total_sources,
"sources_queued": queued_sources,
"sources_extracted": extracted_sources,
"prs_total": total_prs,
"prs_merged": merged_prs,
"conversion_rate": round(merged_prs / total_prs, 3) if total_prs else 0,
}
return {
"claim_index_status": claim_index_status,
"review_throughput": {
"backlog": backlog,
"open_prs": open_prs,
"approved_waiting": approved_prs,
"conflict_prs": conflict_prs,
"conflict_permanent_prs": conflict_permanent_prs,
"reviewing_prs": reviewing_prs,
"oldest_open_hours": review_latency_h,
"status": "healthy" if backlog <= 3 else ("warning" if backlog <= 10 else "critical"),
},
"orphan_ratio": {
"ratio": orphan_ratio,
"count": ci.get("orphan_count") if ci else None,
"total": ci.get("total_claims") if ci else None,
"status": "healthy" if orphan_ratio and orphan_ratio < 0.15 else ("warning" if orphan_ratio and orphan_ratio < 0.30 else "critical") if orphan_ratio is not None else "unavailable",
},
"linkage_density": linkage_density,
"confidence_distribution": confidence_dist,
"evidence_freshness": evidence_freshness,
"domain_activity": {
"active": active_domains,
"stagnant": stagnant_domains,
"status": "healthy" if not stagnant_domains else "warning",
},
"funnel": funnel,
}
# ─── Auth ────────────────────────────────────────────────────────────────────
def _load_secret(path: Path) -> str | None:
"""Load a secret from a file. Returns None if missing."""
try:
return path.read_text().strip()
except Exception:
return None
@web.middleware
async def auth_middleware(request, handler):
"""API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
if request.path in _PUBLIC_PATHS:
return await handler(request)
expected = request.app.get("api_key")
if not expected:
# No key configured — all endpoints open (development mode)
return await handler(request)
provided = request.headers.get("X-Api-Key", "")
if provided != expected:
return web.json_response({"error": "unauthorized"}, status=401)
return await handler(request)
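# Illustrative client call for a protected endpoint (host/port depend on deployment;
# 8081 is the default ARGUS_PORT):
#   curl -H "X-Api-Key: $(cat /opt/teleo-eval/secrets/argus-api-key)" \
#        "http://localhost:8081/api/search?q=example"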
# ─── Embedding + Search ──────────────────────────────────────────────────────
# Moved to lib/search.py — imported at top of file as kb_search, embed_query, search_qdrant
# ─── Usage logging ───────────────────────────────────────────────────────────
def _get_write_db() -> sqlite3.Connection | None:
"""Open read-write connection for usage logging only.
Separate from the main read-only connection. Returns None if DB unavailable.
"""
try:
conn = sqlite3.connect(str(DB_PATH), timeout=10)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
# Ensure claim_usage table exists (Epimetheus creates it, but be safe)
conn.execute("""
CREATE TABLE IF NOT EXISTS claim_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
claim_path TEXT NOT NULL,
agent TEXT,
context TEXT,
ts TEXT DEFAULT (datetime('now'))
)
""")
conn.commit()
return conn
except Exception as e:
logger.warning("Failed to open write DB for usage logging: %s", e)
return None
# ─── Route handlers ─────────────────────────────────────────────────────────
async def handle_dashboard(request):
"""GET / — main Chart.js operational dashboard."""
try:
conn = _conn(request)
metrics = _current_metrics(conn)
snapshots = _snapshot_history(conn, days=7)
changes = _version_changes(conn, days=30)
vital_signs = _compute_vital_signs(conn)
contributors_principal = _contributor_leaderboard(conn, limit=10, view="principal")
contributors_agent = _contributor_leaderboard(conn, limit=10, view="agent")
domain_breakdown = _domain_breakdown(conn)
except sqlite3.Error as e:
return web.Response(
text=_render_error(f"Pipeline database unavailable: {e}"),
content_type="text/html",
status=503,
)
now = datetime.now(timezone.utc)
html = _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_principal, contributors_agent, domain_breakdown, now)
return web.Response(text=html, content_type="text/html")
async def handle_api_metrics(request):
"""GET /api/metrics — JSON operational metrics."""
conn = _conn(request)
return web.json_response(_current_metrics(conn))
async def handle_api_snapshots(request):
"""GET /api/snapshots?days=7 — time-series data for charts."""
conn = _conn(request)
days = int(request.query.get("days", "7"))
snapshots = _snapshot_history(conn, days)
changes = _version_changes(conn, days)
return web.json_response({"snapshots": snapshots, "version_changes": changes, "days": days})
async def handle_api_vital_signs(request):
"""GET /api/vital-signs — Vida's five vital signs."""
conn = _conn(request)
return web.json_response(_compute_vital_signs(conn))
async def handle_api_contributors(request):
"""GET /api/contributors — contributor leaderboard.
Query params:
limit: max entries (default 50)
view: "principal" (default, rolls up agents) or "agent" (one row per handle)
"""
conn = _conn(request)
limit = int(request.query.get("limit", "50"))
view = request.query.get("view", "principal")
if view not in ("principal", "agent"):
view = "principal"
contributors = _contributor_leaderboard(conn, limit, view=view)
return web.json_response({"contributors": contributors, "view": view})
def _domain_breakdown(conn) -> dict:
"""Per-domain contribution breakdown: claims, contributors, sources, decisions."""
# Claims per domain from merged knowledge PRs
domain_stats = {}
for r in conn.execute("""
SELECT domain, count(*) as prs,
SUM(CASE WHEN commit_type='knowledge' THEN 1 ELSE 0 END) as knowledge_prs
FROM prs WHERE status='merged' AND domain IS NOT NULL
GROUP BY domain ORDER BY prs DESC
""").fetchall():
domain_stats[r["domain"]] = {
"total_prs": r["prs"],
"knowledge_prs": r["knowledge_prs"] or 0,
"contributors": [],
}
    # Top contributors per domain (from PR agent field + principal roll-up)
    has_principal = _has_column(conn, "contributors", "principal")
    contributor_expr = (
        "COALESCE(c.principal, p.agent, 'unknown')" if has_principal
        else "COALESCE(p.agent, 'unknown')"
    )
    for r in conn.execute(f"""
        SELECT p.domain,
               {contributor_expr} as contributor,
               count(*) as cnt
        FROM prs p
        LEFT JOIN contributors c ON LOWER(p.agent) = c.handle
        WHERE p.status='merged' AND p.commit_type='knowledge' AND p.domain IS NOT NULL
        GROUP BY p.domain, contributor
        ORDER BY p.domain, cnt DESC
    """).fetchall():
domain = r["domain"]
if domain in domain_stats:
domain_stats[domain]["contributors"].append({
"handle": r["contributor"],
"claims": r["cnt"],
})
return domain_stats
async def handle_api_domains(request):
"""GET /api/domains — per-domain contribution breakdown.
Returns claims, contributors, and knowledge PR counts per domain.
"""
conn = _conn(request)
breakdown = _domain_breakdown(conn)
return web.json_response({"domains": breakdown})
async def handle_api_search(request):
"""GET /api/search — semantic search over claims via Qdrant + graph expansion.
Query params:
q: search query (required)
domain: filter by domain (optional)
confidence: filter by confidence level (optional)
limit: max results, default 10 (optional)
exclude: comma-separated claim paths to exclude (optional)
expand: enable graph expansion, default true (optional)
"""
query = request.query.get("q", "").strip()
if not query:
return web.json_response({"error": "q parameter required"}, status=400)
domain = request.query.get("domain")
confidence = request.query.get("confidence")
limit = min(int(request.query.get("limit", "10")), 50)
exclude_raw = request.query.get("exclude", "")
exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
expand = request.query.get("expand", "true").lower() != "false"
# Use shared search library (Layer 1 + Layer 2)
result = kb_search(query, expand=expand,
domain=domain, confidence=confidence, exclude=exclude)
if "error" in result:
error = result["error"]
if error == "embedding_failed":
return web.json_response({"error": "embedding failed"}, status=502)
return web.json_response({"error": error}, status=500)
return web.json_response(result)
async def handle_api_usage(request):
"""POST /api/usage — log claim usage for analytics.
Body: {"claim_path": "...", "agent": "rio", "context": "telegram-response"}
    Best-effort logging: failures are logged as warnings and the endpoint still returns 200.
"""
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON"}, status=400)
claim_path = body.get("claim_path", "").strip()
if not claim_path:
return web.json_response({"error": "claim_path required"}, status=400)
agent = body.get("agent", "unknown")
context = body.get("context", "")
    # Best-effort write: never let logging failures break the response
    try:
        write_conn = _get_write_db()
        if write_conn:
            try:
                write_conn.execute(
                    "INSERT INTO claim_usage (claim_path, agent, context) VALUES (?, ?, ?)",
                    (claim_path, agent, context),
                )
                write_conn.commit()
            finally:
                write_conn.close()
    except Exception as e:
        logger.warning("Usage log failed (non-fatal): %s", e)
return web.json_response({"status": "ok"})
# ─── Dashboard HTML ──────────────────────────────────────────────────────────
def _render_error(message: str) -> str:
"""Render a minimal error page when DB is unavailable."""
return f"""
Argus — Error
Argus
{message}
Check if teleo-pipeline.service is running and pipeline.db exists.
"""
def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_principal, contributors_agent, domain_breakdown, now) -> str:
"""Render the full operational dashboard as HTML with Chart.js."""
# Prepare chart data
timestamps = [s["ts"] for s in snapshots]
throughput_data = [s.get("throughput_1h", 0) for s in snapshots]
approval_data = [(s.get("approval_rate") or 0) * 100 for s in snapshots]
open_prs_data = [s.get("open_prs", 0) for s in snapshots]
merged_data = [s.get("merged_total", 0) for s in snapshots]
# Rejection breakdown
rej_wiki = [s.get("rejection_broken_wiki_links", 0) for s in snapshots]
rej_schema = [s.get("rejection_frontmatter_schema", 0) for s in snapshots]
rej_dup = [s.get("rejection_near_duplicate", 0) for s in snapshots]
rej_conf = [s.get("rejection_confidence", 0) for s in snapshots]
rej_other = [s.get("rejection_other", 0) for s in snapshots]
# Source origins
origin_agent = [s.get("source_origin_agent", 0) for s in snapshots]
origin_human = [s.get("source_origin_human", 0) for s in snapshots]
# Version annotations
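    # The keys below (type/xMin/xMax/label) follow the chartjs-plugin-annotation
    # line-annotation format, which the HTML template is assumed to load.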
annotations_js = json.dumps([
{
"type": "line",
"xMin": c["ts"],
"xMax": c["ts"],
"borderColor": "#d29922" if c["type"] == "prompt" else "#58a6ff",
"borderWidth": 1,
"borderDash": [4, 4],
"label": {
"display": True,
"content": f"{c['type']}: {c.get('to', '?')}",
"position": "start",
"backgroundColor": "#161b22",
"color": "#8b949e",
"font": {"size": 10},
},
}
for c in changes
])
# Status color helper
sm = metrics["status_map"]
ar = metrics["approval_rate"]
ar_color = "green" if ar > 0.5 else ("yellow" if ar > 0.2 else "red")
fr_color = "green" if metrics["fix_rate"] > 0.3 else ("yellow" if metrics["fix_rate"] > 0.1 else "red")
# Vital signs
vs_review = vital_signs["review_throughput"]
vs_status_color = {"healthy": "green", "warning": "yellow", "critical": "red"}.get(vs_review["status"], "yellow")
# Orphan ratio
vs_orphan = vital_signs.get("orphan_ratio", {})
orphan_ratio_val = vs_orphan.get("ratio")
orphan_color = {"healthy": "green", "warning": "yellow", "critical": "red"}.get(vs_orphan.get("status", ""), "")
orphan_display = f"{orphan_ratio_val:.1%}" if orphan_ratio_val is not None else "—"
# Linkage density
vs_linkage = vital_signs.get("linkage_density") or {}
linkage_display = f'{vs_linkage.get("avg_outgoing_links", "—")}'
cross_domain_ratio = vs_linkage.get("cross_domain_ratio")
cross_domain_color = "green" if cross_domain_ratio and cross_domain_ratio >= 0.15 else ("yellow" if cross_domain_ratio and cross_domain_ratio >= 0.05 else "red") if cross_domain_ratio is not None else ""
# Evidence freshness
vs_fresh = vital_signs.get("evidence_freshness") or {}
    median_age = vs_fresh.get("median_age_days")
    fresh_display = f"{median_age}" if median_age is not None else "—"
fresh_pct = vs_fresh.get("fresh_30d_pct", 0)
# Confidence distribution
vs_conf = vital_signs.get("confidence_distribution", {})
# Rejection reasons table — show unique PRs alongside event count
reason_rows = "".join(
        f'<tr>'
        f'<td>{r["tag"]}</td>'
        f'<td>{r["unique_prs"]}</td>'
        f'<td>{r["count"]}</td>'
        f'</tr>'
for r in metrics["rejection_reasons"]
)
# Domain table
domain_rows = ""
for domain, statuses in sorted(metrics["domains"].items()):
m = statuses.get("merged", 0)
c = statuses.get("closed", 0)
o = statuses.get("open", 0)
total = sum(statuses.values())
domain_rows += f"
{domain}
{total}
{m}
{c}
{o}
"
# Contributor rows — principal view (default)
principal_rows = "".join(
        f'<tr><td>{c["handle"]}'
        + (f' ({", ".join(c["agents"])})' if c.get("agents") else "")
        + f'</td>'
          f'<td>{c["tier"]}</td>'
          f'<td>{c["claims_merged"]}</td>'
          f'<td>{c["ci"]}</td>'
          f'<td>{", ".join(c["domains"][:3]) if c["domains"] else "-"}</td>'
          f'</tr>'
for c in contributors_principal[:10]
)
# Contributor rows — agent view
agent_rows = "".join(
        f'<tr><td>{c["handle"]}'
        + (f' → {c["principal"]}' if c.get("principal") else "")
        + f'</td>'
          f'<td>{c["tier"]}</td>'
          f'<td>{c["claims_merged"]}</td>'
          f'<td>{c["ci"]}</td>'
          f'<td>{", ".join(c["domains"][:3]) if c["domains"] else "-"}</td>'
          f'</tr>'
for c in contributors_agent[:10]
)
# Breaker status
breaker_rows = ""
for name, info in metrics["breakers"].items():
state = info["state"]
color = "green" if state == "closed" else ("red" if state == "open" else "yellow")
age = f'{info.get("age_s", "?")}s ago' if "age_s" in info else "-"
breaker_rows += f'