teleo-infrastructure/diagnostics/app.py
m3taversal 6fb3f2258f feat: queue staleness diagnostic + vector GC reconciliation
Add queue_staleness to vital signs (sources >7d unprocessed, bucketed by age).
Add ops/vector-gc.py to reconcile Qdrant vectors against filesystem claims —
reports orphan vectors and missing embeddings, with --purge flag for cleanup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 01:14:01 +01:00

1567 lines
65 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Argus — Diagnostics dashboard + search API for the Teleo pipeline.
Separate aiohttp service (port 8081) that reads pipeline.db read-only.
Provides Chart.js operational dashboard, quality vital signs, contributor analytics,
semantic search via Qdrant, and claim usage logging.
Owner: Argus <69AF7290-758F-464B-B472-04AFCA4AB340>
Data source: Epimetheus's pipeline.db (read-only SQLite), Qdrant vector DB
"""
import json
import logging
import os
import sqlite3
import statistics
import sys
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Add pipeline lib to path so we can import shared modules
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "pipeline"))
from aiohttp import web
from lib.search import search as kb_search, embed_query, search_qdrant
# Module-wide logger, registered under the name "argus".
logger = logging.getLogger("argus")
# --- Config ---
# Each setting reads an environment variable and falls back to the literal default shown.
# SQLite database file opened by _get_db() (read-only) and _get_write_db() (read-write).
DB_PATH = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
# Port number; converted with int() at import time, so a non-numeric ARGUS_PORT raises ValueError.
PORT = int(os.environ.get("ARGUS_PORT", "8081"))
# Repository checkout directory.
# NOTE(review): not referenced in the portion of the file reviewed here — confirm it is still used.
REPO_DIR = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
# URL fetched by _fetch_claim_index().
CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim-index")
# Search config — moved to lib/search.py (shared with Telegram bot + agents)
# Auth config
# File expected to hold the API key.
# NOTE(review): the code that loads it into app["api_key"] is not visible here — confirm.
API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secrets/argus-api-key"))
# Endpoints that skip auth (dashboard is public for now, can lock later)
# auth_middleware tests request.path for exact membership in this set.
_PUBLIC_PATHS = frozenset({"/", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains", "/api/audit"})
def _get_db() -> sqlite3.Connection:
    """Return a new read-only connection to the database at DB_PATH.

    The connection yields sqlite3.Row rows, waits up to 30 s on connect and
    has the WAL journal-mode and 10 s busy-timeout pragmas issued on it.
    """
    # URI mode for true OS-level read-only (Rhea: belt and suspenders)
    db_uri = f"file:{DB_PATH}?mode=ro"
    ro_conn = sqlite3.connect(db_uri, uri=True, timeout=30)
    ro_conn.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=10000"):
        ro_conn.execute(pragma)
    return ro_conn
def _conn(request) -> sqlite3.Connection:
"""Get DB connection with health check. Reopens if stale."""
conn = request.app["db"]
try:
conn.execute("SELECT 1")
except sqlite3.Error:
conn = _get_db()
request.app["db"] = conn
return conn
# ─── Data queries ────────────────────────────────────────────────────────────
def _current_metrics(conn) -> dict:
"""Compute current operational metrics from live DB state."""
# Throughput (merged in last hour)
merged_1h = conn.execute(
"SELECT COUNT(*) as n FROM prs WHERE merged_at > datetime('now', '-1 hour')"
).fetchone()["n"]
# PR status counts
statuses = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
status_map = {r["status"]: r["n"] for r in statuses}
# Approval rate (24h) from audit_log
evaluated = conn.execute(
"SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
"AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') "
"AND timestamp > datetime('now','-24 hours')"
).fetchone()["n"]
approved = conn.execute(
"SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' "
"AND event='approved' AND timestamp > datetime('now','-24 hours')"
).fetchone()["n"]
approval_rate = round(approved / evaluated, 3) if evaluated else 0
# Rejection reasons (24h) — count events AND unique PRs
reasons = conn.execute(
"""SELECT value as tag, COUNT(*) as cnt,
COUNT(DISTINCT json_extract(detail, '$.pr')) as unique_prs
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now','-24 hours')
GROUP BY tag ORDER BY cnt DESC LIMIT 10"""
).fetchall()
# Fix cycle
fix_stats = conn.execute(
"SELECT COUNT(*) as attempted, "
"SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as succeeded "
"FROM prs WHERE fix_attempts > 0"
).fetchone()
fix_attempted = fix_stats["attempted"] or 0
fix_succeeded = fix_stats["succeeded"] or 0
fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted else 0
# Median time to merge (24h)
merge_times = conn.execute(
"SELECT (julianday(merged_at) - julianday(created_at)) * 24 * 60 as minutes "
"FROM prs WHERE merged_at IS NOT NULL AND merged_at > datetime('now', '-24 hours')"
).fetchall()
durations = [r["minutes"] for r in merge_times if r["minutes"] and r["minutes"] > 0]
median_ttm = round(statistics.median(durations), 1) if durations else None
# Source pipeline
source_statuses = conn.execute(
"SELECT status, COUNT(*) as n FROM sources GROUP BY status"
).fetchall()
source_map = {r["status"]: r["n"] for r in source_statuses}
# Domain breakdown
domain_counts = conn.execute(
"SELECT domain, status, COUNT(*) as n FROM prs GROUP BY domain, status"
).fetchall()
domains = {}
for r in domain_counts:
d = r["domain"] or "unknown"
if d not in domains:
domains[d] = {}
domains[d][r["status"]] = r["n"]
# Breakers
breakers = conn.execute(
"SELECT name, state, failures, last_success_at FROM circuit_breakers"
).fetchall()
breaker_map = {}
for b in breakers:
info = {"state": b["state"], "failures": b["failures"]}
if b["last_success_at"]:
last = datetime.fromisoformat(b["last_success_at"])
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
age_s = (datetime.now(timezone.utc) - last).total_seconds()
info["age_s"] = round(age_s)
breaker_map[b["name"]] = info
return {
"throughput_1h": merged_1h,
"approval_rate": approval_rate,
"evaluated_24h": evaluated,
"approved_24h": approved,
"status_map": status_map,
"source_map": source_map,
"rejection_reasons": [{"tag": r["tag"], "count": r["cnt"], "unique_prs": r["unique_prs"]} for r in reasons],
"fix_rate": fix_rate,
"fix_attempted": fix_attempted,
"fix_succeeded": fix_succeeded,
"median_ttm_minutes": median_ttm,
"domains": domains,
"breakers": breaker_map,
}
def _snapshot_history(conn, days: int = 7) -> list[dict]:
"""Get metrics_snapshots time series."""
rows = conn.execute(
"SELECT * FROM metrics_snapshots WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
(f"-{days}",),
).fetchall()
return [dict(r) for r in rows]
def _version_changes(conn, days: int = 30) -> list[dict]:
"""Get prompt/pipeline version change events for chart annotations."""
rows = conn.execute(
"SELECT ts, prompt_version, pipeline_version FROM metrics_snapshots "
"WHERE ts > datetime('now', ? || ' days') ORDER BY ts ASC",
(f"-{days}",),
).fetchall()
changes = []
prev_prompt = prev_pipeline = None
for row in rows:
if row["prompt_version"] != prev_prompt and prev_prompt is not None:
changes.append({"ts": row["ts"], "type": "prompt", "from": prev_prompt, "to": row["prompt_version"]})
if row["pipeline_version"] != prev_pipeline and prev_pipeline is not None:
changes.append({"ts": row["ts"], "type": "pipeline", "from": prev_pipeline, "to": row["pipeline_version"]})
prev_prompt = row["prompt_version"]
prev_pipeline = row["pipeline_version"]
return changes
def _has_column(conn, table: str, column: str) -> bool:
"""Check if a column exists in a table (graceful schema migration support)."""
cols = conn.execute(f"PRAGMA table_info({table})").fetchall()
return any(c["name"] == column for c in cols)
def _contributor_leaderboard(conn, limit: int = 20, view: str = "principal") -> list[dict]:
    """Rank contributors by weighted CI score and return the top ``limit``.

    view="agent"     — one entry per row of the contributors table
    view="principal" — rows sharing a principal are merged into one entry keyed
                       by that principal; rows without one keep their own handle.
                       Used only when the contributors table has a ``principal``
                       column, otherwise the agent view is produced.
    """
    has_principal = _has_column(conn, "contributors", "principal")
    query = (
        "SELECT handle, tier, claims_merged, sourcer_count, extractor_count, "
        "challenger_count, synthesizer_count, reviewer_count, domains, last_contribution"
    )
    if has_principal:
        query += ", principal"
    query += " FROM contributors ORDER BY claims_merged DESC"
    rows = conn.execute(query).fetchall()

    # Weights reward quality over volume (Cory-approved)
    weights = {"sourcer": 0.15, "extractor": 0.05, "challenger": 0.35, "synthesizer": 0.25, "reviewer": 0.20}
    role_keys = list(weights.keys())

    def _score(counts) -> float:
        # ``counts`` is a DB row or a bucket dict; NULL counts contribute 0.
        return round(sum((counts[f"{role}_count"] or 0) * w for role, w in weights.items()), 2)

    if view == "principal" and has_principal:
        # Aggregate by principal — agents with a principal roll up to the human
        tier_rank = {"veteran": 2, "contributor": 1, "new": 0}
        buckets: dict[str, dict] = {}
        for row in rows:
            principal = row["principal"]
            key = principal or row["handle"]
            bucket = buckets.get(key)
            if bucket is None:
                bucket = {
                    "handle": key,
                    "tier": row["tier"],
                    "claims_merged": 0,
                    "domains": set(),
                    "last_contribution": None,
                    "agents": [],
                    **{f"{role}_count": 0 for role in role_keys},
                }
                buckets[key] = bucket
            bucket["claims_merged"] += row["claims_merged"] or 0
            for role in role_keys:
                bucket[f"{role}_count"] += row[f"{role}_count"] or 0
            if row["domains"]:
                bucket["domains"].update(json.loads(row["domains"]))
            latest = row["last_contribution"]
            if latest and (not bucket["last_contribution"] or latest > bucket["last_contribution"]):
                bucket["last_contribution"] = latest
            # Upgrade tier (veteran > contributor > new)
            if tier_rank.get(row["tier"], 0) > tier_rank.get(bucket["tier"], 0):
                bucket["tier"] = row["tier"]
            if principal:
                bucket["agents"].append(row["handle"])
        result = [
            {
                "handle": bucket["handle"],
                "tier": bucket["tier"],
                "claims_merged": bucket["claims_merged"],
                "ci": _score(bucket),
                "domains": sorted(bucket["domains"])[:5],
                "last_contribution": bucket["last_contribution"],
                "agents": bucket["agents"],
            }
            for bucket in buckets.values()
        ]
    else:
        # By-agent view (original behavior)
        result = []
        for row in rows:
            entry = {
                "handle": row["handle"],
                "tier": row["tier"],
                "claims_merged": row["claims_merged"] or 0,
                "ci": _score(row),
                "domains": json.loads(row["domains"]) if row["domains"] else [],
                "last_contribution": row["last_contribution"],
            }
            if has_principal:
                entry["principal"] = row["principal"]
            result.append(entry)

    # Stable sort: ties keep the claims_merged ordering from the query.
    result.sort(key=lambda item: item["ci"], reverse=True)
    return result[:limit]
# ─── Vital signs (Vida's five) ───────────────────────────────────────────────
def _fetch_claim_index() -> dict | None:
    """Download and parse the JSON document at CLAIM_INDEX_URL.

    Uses a 5 s timeout. Any failure (network, HTTP, JSON) is logged as a
    warning and reported to the caller as None.
    """
    try:
        with urllib.request.urlopen(CLAIM_INDEX_URL, timeout=5) as resp:
            payload = resp.read()
        return json.loads(payload)
    except Exception as exc:
        logger.warning("Failed to fetch claim-index from %s: %s", CLAIM_INDEX_URL, exc)
        return None
def _forgejo_open_pr_counts() -> tuple[int, int] | None:
    """Return (open PR count, unmergeable PR count) from Forgejo, or None if unavailable."""
    try:
        import requests as _req

        token_path = Path("/opt/teleo-eval/secrets/forgejo-token")
        token = token_path.read_text().strip() if token_path.exists() else ""
        # NOTE: the request asks for at most 50 PRs, so both counts saturate at 50.
        resp = _req.get(
            "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls?state=open&limit=50",
            headers={"Authorization": f"token {token}"} if token else {},
            timeout=10,
        )
        if resp.status_code != 200:
            logger.warning("Forgejo PR query returned HTTP %s", resp.status_code)
            return None
        prs = resp.json()
        return len(prs), sum(1 for p in prs if not p.get("mergeable", True))
    except Exception as e:
        # Best-effort source: the caller falls back to DB counts.
        logger.warning("Forgejo PR query failed: %s", e)
        return None


def _orphan_status(ratio) -> str:
    """Map an orphan ratio (or None) to unavailable/healthy/warning/critical."""
    if ratio is None:
        return "unavailable"
    if ratio < 0.15:
        return "healthy"
    if ratio < 0.30:
        return "warning"
    return "critical"


def _compute_vital_signs(conn) -> dict:
    """Compute Vida's five vital signs from DB state + claim-index.

    Args:
        conn: SQLite connection whose rows are addressable by column name.

    Returns:
        Dict with the keys claim_index_status, review_throughput, orphan_ratio,
        linkage_density, confidence_distribution (raw counts per confidence
        label), evidence_freshness, domain_activity, funnel and queue_staleness.
        Claim-index derived entries are None / empty when the index cannot be
        fetched or contains no claims.

    Raises:
        sqlite3.Error: propagated from any of the DB queries.
    """
    # 1. Review throughput — backlog and latency
    # Query Forgejo directly for authoritative PR counts (DB misses agent-created PRs)
    forgejo_counts = _forgejo_open_pr_counts()
    if forgejo_counts is None:
        # Fallback to DB counts if Forgejo is unreachable or answers with an
        # error status; otherwise the backlog would read 0 and look healthy.
        open_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='open'").fetchone()["n"]
        conflict_prs = 0
    else:
        open_prs, conflict_prs = forgejo_counts
    conflict_permanent_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='conflict_permanent'").fetchone()["n"]
    approved_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='approved'").fetchone()["n"]
    reviewing_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='reviewing'").fetchone()["n"]
    backlog = open_prs
    oldest_open = conn.execute(
        "SELECT MIN(created_at) as oldest FROM prs WHERE status='open'"
    ).fetchone()
    review_latency_h = None
    if oldest_open and oldest_open["oldest"]:
        oldest = datetime.fromisoformat(oldest_open["oldest"])
        if oldest.tzinfo is None:
            # Naive DB timestamps are treated as UTC.
            oldest = oldest.replace(tzinfo=timezone.utc)
        review_latency_h = round((datetime.now(timezone.utc) - oldest).total_seconds() / 3600, 1)

    # 2-5. Claim-index vital signs
    ci = _fetch_claim_index()
    if not isinstance(ci, dict):
        # Anything other than a JSON object is treated as "index unavailable".
        ci = None
    orphan_ratio = None
    orphan_count = ci.get("orphan_count") if ci else None
    total_claims = ci.get("total_claims") if ci else None
    linkage_density = None
    confidence_dist = {}
    evidence_freshness = None
    claim_index_status = "unavailable"
    if ci and ci.get("claims"):
        claims = ci["claims"]
        total = len(claims)
        claim_index_status = "live"
        # 2. Orphan ratio (Vida: <15% healthy)
        if orphan_count is None:
            # Index did not report a count: derive it from claims with no incoming links.
            orphan_count = sum(1 for c in claims if c.get("incoming_count", 0) == 0)
        if total_claims is None:
            total_claims = total
        orphan_ratio = round(orphan_count / total, 3) if total else 0
        # 3. Linkage density — avg outgoing links per claim + cross-domain ratio
        total_outgoing = sum(c.get("outgoing_count", 0) for c in claims)
        avg_links = round(total_outgoing / total, 2) if total else 0
        cross_domain = ci.get("cross_domain_links", 0)
        linkage_density = {
            "avg_outgoing_links": avg_links,
            "cross_domain_links": cross_domain,
            "cross_domain_ratio": round(cross_domain / total_outgoing, 3) if total_outgoing else 0,
        }
        # 4. Confidence distribution (raw counts per confidence label)
        for c in claims:
            conf = c.get("confidence", "unknown")
            confidence_dist[conf] = confidence_dist.get(conf, 0) + 1
        # 5. Evidence freshness — avg age of claims in days
        today = datetime.now(timezone.utc).date()
        ages = []
        for c in claims:
            try:
                if c.get("created"):
                    created = datetime.strptime(c["created"], "%Y-%m-%d").date()
                    ages.append((today - created).days)
            except (ValueError, KeyError, TypeError):
                # Claims with an unparseable creation date are left out of the age stats.
                pass
        avg_age_days = round(statistics.mean(ages)) if ages else None
        median_age_days = round(statistics.median(ages)) if ages else None
        fresh_30d = sum(1 for a in ages if a <= 30)
        evidence_freshness = {
            "avg_age_days": avg_age_days,
            "median_age_days": median_age_days,
            "fresh_30d_count": fresh_30d,
            "fresh_30d_pct": round(fresh_30d / total * 100, 1) if total else 0,
        }

    # Domain activity (last 7 days) — stagnation detection
    domain_activity = conn.execute(
        "SELECT domain, COUNT(*) as n, MAX(last_attempt) as latest "
        "FROM prs WHERE last_attempt > datetime('now', '-7 days') GROUP BY domain"
    ).fetchall()
    active_domains = [
        {"domain": r["domain"], "prs_7d": r["n"], "latest": r["latest"]}
        for r in domain_activity
    ]
    all_domains = conn.execute("SELECT DISTINCT domain FROM prs WHERE domain IS NOT NULL").fetchall()
    active_names = {r["domain"] for r in domain_activity}
    stagnant_domains = [r["domain"] for r in all_domains if r["domain"] not in active_names]

    # Pipeline funnel
    total_sources = conn.execute("SELECT COUNT(*) as n FROM sources").fetchone()["n"]
    queued_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources WHERE status='unprocessed'"
    ).fetchone()["n"]
    extracted_sources = conn.execute(
        "SELECT COUNT(*) as n FROM sources WHERE status='extracted'"
    ).fetchone()["n"]
    merged_prs = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
    total_prs = conn.execute("SELECT COUNT(*) as n FROM prs").fetchone()["n"]
    funnel = {
        "sources_total": total_sources,
        "sources_queued": queued_sources,
        "sources_extracted": extracted_sources,
        "prs_total": total_prs,
        "prs_merged": merged_prs,
        "conversion_rate": round(merged_prs / total_prs, 3) if total_prs else 0,
    }

    # Queue staleness — sources unprocessed for >7 days
    stale_buckets = conn.execute("""
SELECT
CASE
WHEN created_at < datetime('now', '-30 days') THEN '30d+'
WHEN created_at < datetime('now', '-14 days') THEN '14-30d'
WHEN created_at < datetime('now', '-7 days') THEN '7-14d'
ELSE 'fresh'
END as age_bucket,
COUNT(*) as cnt
FROM sources
WHERE status = 'unprocessed'
GROUP BY age_bucket
""").fetchall()
    stale_map = {r["age_bucket"]: r["cnt"] for r in stale_buckets}
    # Everything except the 'fresh' bucket counts as stale.
    stale_total = sum(v for k, v in stale_map.items() if k != "fresh")
    oldest_unprocessed = conn.execute(
        "SELECT MIN(created_at) as oldest FROM sources WHERE status='unprocessed'"
    ).fetchone()
    oldest_age_days = None
    if oldest_unprocessed and oldest_unprocessed["oldest"]:
        oldest_dt = datetime.fromisoformat(oldest_unprocessed["oldest"])
        if oldest_dt.tzinfo is None:
            oldest_dt = oldest_dt.replace(tzinfo=timezone.utc)
        oldest_age_days = round((datetime.now(timezone.utc) - oldest_dt).total_seconds() / 86400, 1)
    queue_staleness = {
        "stale_count": stale_total,
        "buckets": stale_map,
        "oldest_age_days": oldest_age_days,
        "status": "healthy" if stale_total == 0 else ("warning" if stale_total <= 10 else "critical"),
    }

    return {
        "claim_index_status": claim_index_status,
        "review_throughput": {
            "backlog": backlog,
            "open_prs": open_prs,
            "approved_waiting": approved_prs,
            "conflict_prs": conflict_prs,
            "conflict_permanent_prs": conflict_permanent_prs,
            "reviewing_prs": reviewing_prs,
            "oldest_open_hours": review_latency_h,
            "status": "healthy" if backlog <= 3 else ("warning" if backlog <= 10 else "critical"),
        },
        "orphan_ratio": {
            "ratio": orphan_ratio,
            "count": orphan_count,
            "total": total_claims,
            # Explicit None check: a ratio of exactly 0 is healthy, not critical.
            "status": _orphan_status(orphan_ratio),
        },
        "linkage_density": linkage_density,
        "confidence_distribution": confidence_dist,
        "evidence_freshness": evidence_freshness,
        "domain_activity": {
            "active": active_domains,
            "stagnant": stagnant_domains,
            "status": "healthy" if not stagnant_domains else "warning",
        },
        "funnel": funnel,
        "queue_staleness": queue_staleness,
    }
# ─── Auth ────────────────────────────────────────────────────────────────────
def _load_secret(path: Path) -> str | None:
"""Load a secret from a file. Returns None if missing."""
try:
return path.read_text().strip()
except Exception:
return None
@web.middleware
async def auth_middleware(request, handler):
    """API key check. Public paths skip auth. Protected paths require X-Api-Key header.

    Requests whose path is in _PUBLIC_PATHS are passed straight through. When
    no key is stored under app["api_key"] every endpoint is open (development
    mode). Otherwise the X-Api-Key header must match the stored key, or a JSON
    401 response is returned.
    """
    import hmac  # function-scope import: this is the only place that needs it

    if request.path in _PUBLIC_PATHS:
        return await handler(request)
    expected = request.app.get("api_key")
    if not expected:
        # No key configured — all endpoints open (development mode)
        return await handler(request)
    provided = request.headers.get("X-Api-Key", "")
    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Both sides are encoded to bytes because compare_digest
    # rejects non-ASCII str input. Assumes the stored key is a str.
    if not hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8")):
        return web.json_response({"error": "unauthorized"}, status=401)
    return await handler(request)
# ─── Embedding + Search ──────────────────────────────────────────────────────
# Moved to lib/search.py — imported at top of file as kb_search, embed_query, search_qdrant
# ─── Usage logging ───────────────────────────────────────────────────────────
def _get_write_db() -> sqlite3.Connection | None:
    """Open read-write connection for usage logging only.

    Separate from the main read-only connection. The claim_usage table is
    created if it does not exist yet. Returns None if the DB cannot be opened
    or prepared; the failure is logged and a half-initialised connection is
    closed rather than leaked.
    """
    conn = None
    try:
        conn = sqlite3.connect(str(DB_PATH), timeout=10)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=10000")
        # Ensure claim_usage table exists (Epimetheus creates it, but be safe)
        conn.execute("""
CREATE TABLE IF NOT EXISTS claim_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
claim_path TEXT NOT NULL,
agent TEXT,
context TEXT,
ts TEXT DEFAULT (datetime('now'))
)
""")
        conn.commit()
        return conn
    except Exception as e:
        logger.warning("Failed to open write DB for usage logging: %s", e)
        if conn is not None:
            # connect() succeeded but setup failed: release the handle.
            try:
                conn.close()
            except sqlite3.Error as close_err:
                logger.warning("Failed to close write DB after setup error: %s", close_err)
        return None
# ─── Route handlers ─────────────────────────────────────────────────────────
async def handle_dashboard(request):
    """GET / — main Chart.js operational dashboard.

    Gathers every dataset the page needs and renders it to HTML. If any of
    the queries raises sqlite3.Error, an HTML error page is returned with
    status 503 instead.
    """
    try:
        db = _conn(request)
        metrics = _current_metrics(db)
        snapshots = _snapshot_history(db, days=7)
        changes = _version_changes(db, days=30)
        vital_signs = _compute_vital_signs(db)
        by_principal = _contributor_leaderboard(db, limit=10, view="principal")
        by_agent = _contributor_leaderboard(db, limit=10, view="agent")
        domain_breakdown = _domain_breakdown(db)
    except sqlite3.Error as e:
        error_page = _render_error(f"Pipeline database unavailable: {e}")
        return web.Response(text=error_page, content_type="text/html", status=503)

    rendered_at = datetime.now(timezone.utc)
    page = _render_dashboard(
        metrics,
        snapshots,
        changes,
        vital_signs,
        by_principal,
        by_agent,
        domain_breakdown,
        rendered_at,
    )
    return web.Response(text=page, content_type="text/html")
async def handle_api_metrics(request):
    """GET /api/metrics — current operational metrics as JSON."""
    payload = _current_metrics(_conn(request))
    return web.json_response(payload)
async def handle_api_snapshots(request):
    """GET /api/snapshots?days=7 — time-series data for charts.

    Query params:
        days: how many days back (default 7); must be a non-negative integer

    Responds with the snapshot rows, the version-change events for the same
    window and the ``days`` value used. A ``days`` value that is not a
    non-negative integer gets a JSON 400 instead of an unhandled ValueError.
    """
    try:
        days = int(request.query.get("days", "7"))
    except ValueError:
        return web.json_response({"error": "days must be an integer"}, status=400)
    if days < 0:
        # A negative value would produce an invalid "--N days" SQLite modifier.
        return web.json_response({"error": "days must be >= 0"}, status=400)
    conn = _conn(request)
    snapshots = _snapshot_history(conn, days)
    changes = _version_changes(conn, days)
    return web.json_response({"snapshots": snapshots, "version_changes": changes, "days": days})
async def handle_api_vital_signs(request):
    """GET /api/vital-signs — Vida's five vital signs as JSON."""
    vitals = _compute_vital_signs(_conn(request))
    return web.json_response(vitals)
async def handle_api_contributors(request):
    """GET /api/contributors — contributor leaderboard.

    Query params:
        limit: max entries (default 50); must be a non-negative integer
        view: "principal" (default, rolls up agents) or "agent" (one row per handle)

    A ``limit`` that is not a non-negative integer gets a JSON 400. An
    unrecognised ``view`` silently falls back to "principal".
    """
    try:
        limit = int(request.query.get("limit", "50"))
    except ValueError:
        return web.json_response({"error": "limit must be an integer"}, status=400)
    if limit < 0:
        # A negative limit would slice entries off the end of the leaderboard.
        return web.json_response({"error": "limit must be >= 0"}, status=400)
    view = request.query.get("view", "principal")
    if view not in ("principal", "agent"):
        view = "principal"
    conn = _conn(request)
    contributors = _contributor_leaderboard(conn, limit, view=view)
    return web.json_response({"contributors": contributors, "view": view})
def _domain_breakdown(conn) -> dict:
    """Per-domain contribution breakdown for merged PRs.

    Returns a dict keyed by domain. Each value holds ``total_prs`` (all merged
    PRs), ``knowledge_prs`` (merged PRs with commit_type='knowledge') and
    ``contributors``: a list of {"handle", "claims"} ordered by descending
    count of merged knowledge PRs.

    Contributors are rolled up to their principal only when the contributors
    table has a ``principal`` column; on older schemas the PR's agent is used,
    so the query no longer fails on a missing column.
    """
    # Claims per domain from merged knowledge PRs
    domain_stats = {}
    for r in conn.execute("""
SELECT domain, count(*) as prs,
SUM(CASE WHEN commit_type='knowledge' THEN 1 ELSE 0 END) as knowledge_prs
FROM prs WHERE status='merged' AND domain IS NOT NULL
GROUP BY domain ORDER BY prs DESC
""").fetchall():
        domain_stats[r["domain"]] = {
            "total_prs": r["prs"],
            "knowledge_prs": r["knowledge_prs"] or 0,
            "contributors": [],
        }

    # Top contributors per domain (from PR agent field + principal roll-up)
    has_principal = _has_column(conn, "contributors", "principal")
    # Only reference c.principal when the column exists. Both alternatives are
    # fixed literals, so nothing user-controlled reaches the SQL text.
    if has_principal:
        contributor_expr = "COALESCE(c.principal, p.agent, 'unknown')"
    else:
        contributor_expr = "COALESCE(p.agent, 'unknown')"
    for r in conn.execute(f"""
SELECT p.domain,
{contributor_expr} as contributor,
count(*) as cnt
FROM prs p
LEFT JOIN contributors c ON LOWER(p.agent) = c.handle
WHERE p.status='merged' AND p.commit_type='knowledge' AND p.domain IS NOT NULL
GROUP BY p.domain, contributor
ORDER BY p.domain, cnt DESC
""").fetchall():
        domain = r["domain"]
        if domain in domain_stats:
            domain_stats[domain]["contributors"].append({
                "handle": r["contributor"],
                "claims": r["cnt"],
            })
    return domain_stats
async def handle_api_domains(request):
    """GET /api/domains — per-domain contribution breakdown.

    Wraps the result of _domain_breakdown() under a "domains" key.
    """
    domains = _domain_breakdown(_conn(request))
    return web.json_response({"domains": domains})
async def handle_api_search(request):
    """GET /api/search — semantic search over claims via Qdrant + graph expansion.

    Query params:
        q: search query (required)
        domain: filter by domain (optional)
        confidence: filter by confidence level (optional)
        limit: max results, default 10 (optional)
        exclude: comma-separated claim paths to exclude (optional)
        expand: enable graph expansion, default true (optional)

    Responses:
        400 when ``q`` is missing or ``limit`` is not an integer
        502 when the search library reports "embedding_failed"
        500 for any other error reported by the search library
        200 with the search result otherwise
    """
    query = request.query.get("q", "").strip()
    if not query:
        return web.json_response({"error": "q parameter required"}, status=400)
    domain = request.query.get("domain")
    confidence = request.query.get("confidence")
    try:
        limit = min(int(request.query.get("limit", "10")), 50)
    except ValueError:
        return web.json_response({"error": "limit must be an integer"}, status=400)
    # NOTE(review): ``limit`` is parsed and capped at 50 but, exactly as before
    # this change, never passed to kb_search. Its signature is not visible
    # here — confirm whether it accepts a limit and forward it if so.
    exclude_raw = request.query.get("exclude", "")
    exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
    # Anything other than the literal "false" (case-insensitive) enables expansion.
    expand = request.query.get("expand", "true").lower() != "false"
    # Use shared search library (Layer 1 + Layer 2)
    result = kb_search(query, expand=expand,
                       domain=domain, confidence=confidence, exclude=exclude)
    if "error" in result:
        error = result["error"]
        if error == "embedding_failed":
            return web.json_response({"error": "embedding failed"}, status=502)
        return web.json_response({"error": error}, status=500)
    return web.json_response(result)
async def handle_api_audit(request):
    """GET /api/audit — query response_audit table for agent response diagnostics.

    Query params:
        agent: filter by agent name (optional)
        query: search in query text (optional)
        limit: max results, default 50, max 200 (optional)
        offset: pagination offset (optional)
        days: how many days back, default 7 (optional)

    Responses:
        404 when the response_audit table does not exist
        400 when limit, offset or days is not an integer
        200 with {"total": <matching rows>, "results": [<row dicts>]}

    The claims_matched, vector_scores and tool_calls columns are decoded from
    JSON when possible and returned as stored otherwise.
    """
    conn = _conn(request)
    # Check if response_audit table exists
    table_check = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='response_audit'"
    ).fetchone()
    if not table_check:
        return web.json_response({"error": "response_audit table not found"}, status=404)
    agent = request.query.get("agent")
    query_filter = request.query.get("query", "").strip()
    try:
        limit = int(request.query.get("limit", "50"))
        offset = int(request.query.get("offset", "0"))
        days = int(request.query.get("days", "7"))
    except ValueError:
        return web.json_response(
            {"error": "limit, offset and days must be integers"}, status=400
        )
    # SQLite treats a negative LIMIT as "no upper bound", which would defeat
    # the 200-row cap, so clamp to the range 0..200.
    limit = max(0, min(limit, 200))
    offset = max(0, offset)
    # A negative value would build an invalid "--N days" modifier.
    days = max(0, days)
    where_clauses = ["timestamp > datetime('now', ?||' days')"]
    params: list = [f"-{days}"]
    if agent:
        where_clauses.append("agent = ?")
        params.append(agent)
    if query_filter:
        where_clauses.append("query LIKE ?")
        params.append(f"%{query_filter}%")
    # Only fixed clause text is joined here; all user values travel as bound parameters.
    where_sql = " AND ".join(where_clauses)
    rows = conn.execute(
        f"""SELECT id, timestamp, agent, chat_id, query, reformulated_query,
claims_matched, confidence_score, response_length,
retrieval_method, vector_scores, tool_calls,
pass_used, total_candidates
FROM response_audit
WHERE {where_sql}
ORDER BY timestamp DESC
LIMIT ? OFFSET ?""",
        params + [limit, offset],
    ).fetchall()
    total = conn.execute(
        f"SELECT COUNT(*) as n FROM response_audit WHERE {where_sql}",
        params,
    ).fetchone()["n"]
    results = []
    for r in rows:
        row_dict = dict(r)
        # Parse JSON fields for the response
        for json_field in ("claims_matched", "vector_scores", "tool_calls"):
            if row_dict.get(json_field):
                try:
                    row_dict[json_field] = json.loads(row_dict[json_field])
                except (json.JSONDecodeError, TypeError):
                    # Leave the stored value untouched when it is not valid JSON.
                    pass
        results.append(row_dict)
    return web.json_response({"total": total, "results": results})
async def handle_audit_page(request):
    """GET /audit — serve the HTML page used to browse response audit data."""
    page = _render_audit_page()
    return web.Response(text=page, content_type="text/html")
async def handle_api_usage(request):
    """POST /api/usage — log claim usage for analytics.

    Body: {"claim_path": "...", "agent": "rio", "context": "telegram-response"}

    Responses:
        400 when the body is not a JSON object or claim_path is missing,
            empty or not a string
        200 {"status": "ok"} otherwise

    The write is best-effort: it runs before the response is sent, but a
    failure is only logged and the request still gets 200.
    """
    try:
        body = await request.json()
    except Exception:
        return web.json_response({"error": "invalid JSON"}, status=400)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, number, ...).
        return web.json_response({"error": "JSON object required"}, status=400)
    raw_path = body.get("claim_path", "")
    claim_path = raw_path.strip() if isinstance(raw_path, str) else ""
    if not claim_path:
        return web.json_response({"error": "claim_path required"}, status=400)
    agent = body.get("agent", "unknown")
    context = body.get("context", "")
    write_conn = None
    try:
        write_conn = _get_write_db()
        if write_conn is not None:
            write_conn.execute(
                "INSERT INTO claim_usage (claim_path, agent, context) VALUES (?, ?, ?)",
                (claim_path, agent, context),
            )
            write_conn.commit()
    except Exception as e:
        logger.warning("Usage log failed (non-fatal): %s", e)
    finally:
        # Close the connection on the failure path as well, so it cannot leak.
        if write_conn is not None:
            write_conn.close()
    return web.json_response({"status": "ok"})
# ─── Dashboard HTML ──────────────────────────────────────────────────────────
def _render_error(message: str) -> str:
"""Render a minimal error page when DB is unavailable."""
return f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>Argus — Error</title>
<style>body {{ font-family: -apple-system, system-ui, sans-serif; background: #0d1117; color: #c9d1d9; display: flex; align-items: center; justify-content: center; min-height: 100vh; }}
.err {{ text-align: center; }} h1 {{ color: #f85149; }} p {{ color: #8b949e; }}</style>
</head><body><div class="err"><h1>Argus</h1><p>{message}</p><p>Check if <code>teleo-pipeline.service</code> is running and pipeline.db exists.</p></div></body></html>"""
def _render_audit_page() -> str:
"""Render the response audit browser page."""
return """<!DOCTYPE html>
<html><head>
<meta charset="utf-8">
<title>Argus — Response Audit</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, system-ui, 'Segoe UI', sans-serif; background: #0d1117; color: #c9d1d9; padding: 24px; }
h1 { color: #58a6ff; margin-bottom: 8px; font-size: 22px; }
.subtitle { color: #8b949e; margin-bottom: 20px; font-size: 13px; }
.filters { display: flex; gap: 12px; margin-bottom: 20px; flex-wrap: wrap; align-items: center; }
.filters input, .filters select { background: #161b22; border: 1px solid #30363d; color: #c9d1d9; padding: 8px 12px; border-radius: 6px; font-size: 13px; }
.filters input:focus, .filters select:focus { border-color: #58a6ff; outline: none; }
.filters button { background: #238636; color: #fff; border: none; padding: 8px 16px; border-radius: 6px; cursor: pointer; font-size: 13px; }
.filters button:hover { background: #2ea043; }
.stats { color: #8b949e; font-size: 12px; margin-bottom: 12px; }
.audit-card { background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 16px; margin-bottom: 12px; }
.audit-card:hover { border-color: #484f58; }
.card-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px; }
.card-meta { font-size: 12px; color: #8b949e; }
.card-agent { display: inline-block; background: #1f6feb33; color: #58a6ff; padding: 2px 8px; border-radius: 4px; font-size: 12px; font-weight: 600; }
.card-confidence { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 12px; font-weight: 600; }
.conf-high { background: #23863633; color: #3fb950; }
.conf-mid { background: #d2992233; color: #d29922; }
.conf-low { background: #f8514933; color: #f85149; }
.card-query { font-size: 14px; margin-bottom: 8px; word-break: break-word; }
.card-query strong { color: #e6edf3; }
.card-reformulated { font-size: 13px; color: #8b949e; margin-bottom: 8px; font-style: italic; }
.card-claims { margin-top: 8px; }
.card-claims summary { cursor: pointer; color: #58a6ff; font-size: 13px; }
.claim-list { margin-top: 6px; padding-left: 16px; font-size: 12px; color: #8b949e; }
.claim-list li { margin-bottom: 4px; }
.score-badge { display: inline-block; background: #30363d; padding: 1px 6px; border-radius: 3px; font-size: 11px; margin-left: 4px; }
.card-tools { margin-top: 8px; }
.card-tools summary { cursor: pointer; color: #58a6ff; font-size: 13px; }
.tool-json { margin-top: 6px; font-size: 11px; color: #8b949e; white-space: pre-wrap; word-break: break-all; max-height: 200px; overflow-y: auto; background: #0d1117; padding: 8px; border-radius: 4px; }
.pagination { display: flex; gap: 8px; margin-top: 16px; justify-content: center; }
.pagination button { background: #21262d; color: #c9d1d9; border: 1px solid #30363d; padding: 6px 14px; border-radius: 6px; cursor: pointer; }
.pagination button:hover { background: #30363d; }
.pagination button:disabled { opacity: 0.4; cursor: not-allowed; }
.empty { text-align: center; color: #484f58; padding: 40px; font-size: 14px; }
.nav { margin-bottom: 20px; font-size: 13px; }
.nav a { color: #58a6ff; text-decoration: none; }
.nav a:hover { text-decoration: underline; }
.pass-badge { display: inline-block; background: #30363d; padding: 1px 6px; border-radius: 3px; font-size: 11px; margin-left: 4px; }
</style>
</head>
<body>
<div class="nav"><a href="/">&larr; Dashboard</a></div>
<h1>Response Audit</h1>
<p class="subtitle">Browse agent responses, retrieved claims, and search quality metrics</p>
<div class="filters">
<input type="text" id="query-filter" placeholder="Search queries..." style="width:250px">
<select id="agent-filter">
<option value="">All agents</option>
<option value="rio">rio</option>
<option value="leo">leo</option>
<option value="theseus">theseus</option>
</select>
<select id="days-filter">
<option value="1">Last 24h</option>
<option value="7" selected>Last 7 days</option>
<option value="30">Last 30 days</option>
</select>
<button onclick="loadAudit()">Search</button>
</div>
<div class="stats" id="stats"></div>
<div id="results"></div>
<div class="pagination" id="pagination"></div>
<script>
let currentOffset = 0;
const PAGE_SIZE = 25;
async function loadAudit(offset = 0) {
currentOffset = offset;
const query = document.getElementById('query-filter').value;
const agent = document.getElementById('agent-filter').value;
const days = document.getElementById('days-filter').value;
const params = new URLSearchParams({limit: PAGE_SIZE, offset, days});
if (query) params.set('query', query);
if (agent) params.set('agent', agent);
const resp = await fetch('/api/audit?' + params);
const data = await resp.json();
if (data.error) {
document.getElementById('results').innerHTML = '<div class="empty">' + data.error + '</div>';
return;
}
document.getElementById('stats').textContent =
`${data.total} response${data.total !== 1 ? 's' : ''} found — showing ${offset + 1}${Math.min(offset + PAGE_SIZE, data.total)}`;
if (data.results.length === 0) {
document.getElementById('results').innerHTML = '<div class="empty">No audit entries found. The bot logs responses as they happen.</div>';
document.getElementById('pagination').innerHTML = '';
return;
}
let html = '';
for (const r of data.results) {
const confClass = r.confidence_score >= 0.7 ? 'conf-high' : r.confidence_score >= 0.4 ? 'conf-mid' : 'conf-low';
const confLabel = r.confidence_score !== null ? (r.confidence_score * 100).toFixed(0) + '%' : '';
// Claims matched
let claimsHtml = '';
if (r.claims_matched && Array.isArray(r.claims_matched) && r.claims_matched.length > 0) {
const items = r.claims_matched.map(c => {
if (typeof c === 'string') return '<li>' + esc(c) + '</li>';
return '<li>' + esc(c.path || c.title || JSON.stringify(c)) + '</li>';
}).join('');
claimsHtml = '<details class="card-claims"><summary>' + r.claims_matched.length + ' claims retrieved</summary><ul class="claim-list">' + items + '</ul></details>';
} else {
claimsHtml = '<div style="font-size:12px;color:#484f58">No claims matched</div>';
}
// Vector scores
let scoresHtml = '';
if (r.vector_scores && Array.isArray(r.vector_scores) && r.vector_scores.length > 0) {
const scoreItems = r.vector_scores.map(s => {
if (typeof s === 'object') return '<li>' + esc(s.claim || s.path || '?') + ' <span class="score-badge">' + (s.score || s.similarity || '?') + '</span></li>';
return '<li>' + esc(String(s)) + '</li>';
}).join('');
scoresHtml = '<details class="card-claims"><summary>Vector scores</summary><ul class="claim-list">' + scoreItems + '</ul></details>';
}
// Tool calls
let toolsHtml = '';
if (r.tool_calls && Array.isArray(r.tool_calls) && r.tool_calls.length > 0) {
toolsHtml = '<details class="card-tools"><summary>' + r.tool_calls.length + ' retrieval steps</summary><div class="tool-json">' + esc(JSON.stringify(r.tool_calls, null, 2)) + '</div></details>';
}
// Reformulated query
let reformHtml = '';
if (r.reformulated_query && r.reformulated_query !== r.query) {
reformHtml = '<div class="card-reformulated">Reformulated: ' + esc(r.reformulated_query) + '</div>';
}
// Pass info
let passHtml = '';
if (r.pass_used) {
passHtml = ' <span class="pass-badge">Pass ' + esc(String(r.pass_used)) + '</span>';
}
if (r.total_candidates) {
passHtml += ' <span class="pass-badge">' + r.total_candidates + ' candidates</span>';
}
html += '<div class="audit-card">' +
'<div class="card-header">' +
'<div><span class="card-agent">' + esc(r.agent || '?') + '</span>' + passHtml + '</div>' +
'<div class="card-meta">' + esc(r.timestamp) + ' <span class="card-confidence ' + confClass + '">' + confLabel + '</span></div>' +
'</div>' +
'<div class="card-query"><strong>Q:</strong> ' + esc(r.query || '') + '</div>' +
reformHtml +
claimsHtml +
scoresHtml +
toolsHtml +
'</div>';
}
document.getElementById('results').innerHTML = html;
// Pagination
const totalPages = Math.ceil(data.total / PAGE_SIZE);
const currentPage = Math.floor(offset / PAGE_SIZE);
let pagHtml = '';
if (currentPage > 0) pagHtml += '<button onclick="loadAudit(' + ((currentPage - 1) * PAGE_SIZE) + ')">&larr; Prev</button>';
pagHtml += '<span style="color:#8b949e;padding:6px">Page ' + (currentPage + 1) + ' / ' + totalPages + '</span>';
if (currentPage < totalPages - 1) pagHtml += '<button onclick="loadAudit(' + ((currentPage + 1) * PAGE_SIZE) + ')">Next &rarr;</button>';
document.getElementById('pagination').innerHTML = pagHtml;
}
function esc(s) { const d = document.createElement('div'); d.textContent = s; return d.innerHTML; }
// Load on page open
loadAudit();
</script>
</body></html>"""
def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_principal, contributors_agent, domain_breakdown, now) -> str:
    """Render the full operational dashboard as HTML with Chart.js.

    All free-text values interpolated into markup are HTML-escaped, and all
    JSON embedded in the inline ``<script>`` is made safe against an early
    ``</script>`` terminator.

    Args:
        metrics: Dict read for the hero cards and tables. Keys used:
            ``status_map``, ``approval_rate``, ``fix_rate``, ``throughput_1h``,
            ``approved_24h``, ``evaluated_24h``, ``fix_succeeded``,
            ``fix_attempted``, ``median_ttm_minutes``, ``rejection_reasons``,
            ``domains`` and ``breakers``.
        snapshots: Sequence of dicts, each with a ``ts`` key plus optional
            per-snapshot counters; drives the four time-series charts.
        changes: Sequence of dicts with ``ts``, ``type`` and optional ``to``;
            rendered as vertical chart annotations.
        vital_signs: Dict with ``review_throughput``, ``funnel`` and
            ``domain_activity`` (required) and the optional knowledge-health
            entries, which are only shown when ``claim_index_status`` is
            ``"live"``.
        contributors_principal: Contributor dicts for the "By Human" table
            (first 10 are shown).
        contributors_agent: Contributor dicts for the "By Agent" table
            (first 10 are shown).
        domain_breakdown: Mapping of domain -> dict with ``knowledge_prs`` and
            ``contributors``.
        now: Object with ``strftime``; shown in the header as the render time.

    Returns:
        The complete HTML document as a string.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    from html import escape

    def esc(value) -> str:
        """HTML-escape *value* for use as element text or attribute content."""
        return escape(str(value))

    def js(value) -> str:
        """Serialize *value* as JSON that cannot close the enclosing <script>."""
        return json.dumps(value).replace("</", "<\\/")

    # Prepare chart data
    timestamps = [s["ts"] for s in snapshots]
    throughput_data = [s.get("throughput_1h", 0) for s in snapshots]
    approval_data = [(s.get("approval_rate") or 0) * 100 for s in snapshots]
    open_prs_data = [s.get("open_prs", 0) for s in snapshots]
    merged_data = [s.get("merged_total", 0) for s in snapshots]
    # Rejection breakdown
    rej_wiki = [s.get("rejection_broken_wiki_links", 0) for s in snapshots]
    rej_schema = [s.get("rejection_frontmatter_schema", 0) for s in snapshots]
    rej_dup = [s.get("rejection_near_duplicate", 0) for s in snapshots]
    rej_conf = [s.get("rejection_confidence", 0) for s in snapshots]
    rej_other = [s.get("rejection_other", 0) for s in snapshots]
    # Source origins
    origin_agent = [s.get("source_origin_agent", 0) for s in snapshots]
    origin_human = [s.get("source_origin_human", 0) for s in snapshots]
    # Version annotations (labels are drawn on the canvas by Chart.js, so only
    # the script-context escaping in js() is needed here)
    annotations_js = js([
        {
            "type": "line",
            "xMin": c["ts"],
            "xMax": c["ts"],
            "borderColor": "#d29922" if c["type"] == "prompt" else "#58a6ff",
            "borderWidth": 1,
            "borderDash": [4, 4],
            "label": {
                "display": True,
                "content": f"{c['type']}: {c.get('to', '?')}",
                "position": "start",
                "backgroundColor": "#161b22",
                "color": "#8b949e",
                "font": {"size": 10},
            },
        }
        for c in changes
    ])

    # Status colors for the hero cards
    status_colors = {"healthy": "green", "warning": "yellow", "critical": "red"}
    sm = metrics["status_map"]
    ar = metrics["approval_rate"]
    ar_color = "green" if ar > 0.5 else ("yellow" if ar > 0.2 else "red")
    fr_color = "green" if metrics["fix_rate"] > 0.3 else ("yellow" if metrics["fix_rate"] > 0.1 else "red")
    ttm_display = f"{metrics['median_ttm_minutes']:.0f}" if metrics["median_ttm_minutes"] else ""

    # Vital signs
    vs_review = vital_signs["review_throughput"]
    vs_status_color = status_colors.get(vs_review["status"], "yellow")
    # Orphan ratio
    vs_orphan = vital_signs.get("orphan_ratio", {})
    orphan_ratio_val = vs_orphan.get("ratio")
    orphan_color = status_colors.get(vs_orphan.get("status", ""), "")
    orphan_display = f"{orphan_ratio_val:.1%}" if orphan_ratio_val is not None else ""
    # Linkage density
    vs_linkage = vital_signs.get("linkage_density") or {}
    linkage_display = f'{vs_linkage.get("avg_outgoing_links", "")}'
    cross_domain_ratio = vs_linkage.get("cross_domain_ratio")
    if cross_domain_ratio is None:
        cross_domain_color = ""
    elif cross_domain_ratio >= 0.15:
        cross_domain_color = "green"
    elif cross_domain_ratio >= 0.05:
        cross_domain_color = "yellow"
    else:
        cross_domain_color = "red"
    # Evidence freshness
    vs_fresh = vital_signs.get("evidence_freshness") or {}
    fresh_display = f'{vs_fresh.get("median_age_days", "")}' if vs_fresh.get("median_age_days") else ""
    # `or 0` keeps the ":.0f" format below from failing on an explicit None
    fresh_pct = vs_fresh.get("fresh_30d_pct", 0) or 0
    # Confidence distribution
    vs_conf = vital_signs.get("confidence_distribution", {})

    # Rejection reasons table — show unique PRs alongside event count
    reason_rows = "".join(
        f'<tr><td><code>{esc(r["tag"])}</code></td><td>{r["unique_prs"]}</td><td style="color:#8b949e">{r["count"]}</td></tr>'
        for r in metrics["rejection_reasons"]
    )
    # The table has three columns, so the placeholder must span all three.
    reason_rows_html = reason_rows or "<tr><td colspan='3' style='color:#8b949e'>No rejections in 24h</td></tr>"

    # Domain table
    domain_rows = ""
    for domain, statuses in sorted(metrics["domains"].items()):
        merged_count = statuses.get("merged", 0)
        closed_count = statuses.get("closed", 0)
        open_count = statuses.get("open", 0)
        total = sum(statuses.values())
        domain_rows += (
            f"<tr><td>{esc(domain)}</td><td>{total}</td><td class='green'>{merged_count}</td>"
            f"<td class='red'>{closed_count}</td><td>{open_count}</td></tr>"
        )

    def contributor_rows(contributors, annotate) -> str:
        """Build table rows for the first ten contributors; *annotate* adds the handle suffix."""
        rows = []
        for c in contributors[:10]:
            domains = esc(", ".join(c["domains"][:3])) if c["domains"] else "-"
            rows.append(
                f'<tr><td>{esc(c["handle"])}{annotate(c)}</td><td>{esc(c["tier"])}</td>'
                f'<td>{c["claims_merged"]}</td><td>{c["ci"]}</td>'
                f'<td>{domains}</td></tr>'
            )
        return "".join(rows)

    def principal_suffix(c) -> str:
        if not c.get("agents"):
            return ""
        return f'<span style="color:#8b949e;font-size:11px"> ({esc(", ".join(c["agents"]))})</span>'

    def agent_suffix(c) -> str:
        if not c.get("principal"):
            return ""
        return f'<span style="color:#8b949e;font-size:11px"> → {esc(c["principal"])}</span>'

    # Contributor rows — principal view (default) and agent view
    principal_rows = contributor_rows(contributors_principal, principal_suffix)
    agent_rows = contributor_rows(contributors_agent, agent_suffix)
    no_contributors = "<tr><td colspan='5' style='color:#8b949e'>No contributors yet</td></tr>"
    principal_rows_html = principal_rows or no_contributors
    agent_rows_html = agent_rows or no_contributors

    # Breaker status
    breaker_rows = ""
    for name, info in metrics["breakers"].items():
        state = info["state"]
        color = "green" if state == "closed" else ("red" if state == "open" else "yellow")
        age = f'{info.get("age_s", "?")}s ago' if "age_s" in info else "-"
        breaker_rows += f'<tr><td>{esc(name)}</td><td class="{color}">{esc(state)}</td><td>{info["failures"]}</td><td>{esc(age)}</td></tr>'
    breaker_rows_html = breaker_rows or "<tr><td colspan='4' style='color:#8b949e'>No breaker data</td></tr>"

    # Funnel numbers
    funnel = vital_signs["funnel"]

    # Knowledge-health section: only rendered while the claim index is live
    if vital_signs.get("claim_index_status") == "live":
        cross_domain_display = f"{cross_domain_ratio:.1%}" if cross_domain_ratio is not None else ""
        conf_spread = " / ".join(f"{vs_conf.get(k, 0)}" for k in ["proven", "likely", "experimental", "speculative"])
        vital_section = f'''<div class="section">
<div class="section-title">Knowledge Health (Vida&rsquo;s Vital Signs)</div>
<div class="grid">
<div class="card">
<div class="label">Orphan Ratio</div>
<div class="value {orphan_color}">{orphan_display}</div>
<div class="detail">{vs_orphan.get("count", "?")} / {vs_orphan.get("total", "?")} claims &middot; target &lt;15%</div>
</div>
<div class="card">
<div class="label">Avg Links/Claim</div>
<div class="value">{linkage_display}</div>
<div class="detail">cross-domain: <span class="{cross_domain_color}">{cross_domain_display}</span> &middot; target 15-30%</div>
</div>
<div class="card">
<div class="label">Evidence Freshness</div>
<div class="value">{fresh_display}<span style="font-size:14px;color:#8b949e">d median</span></div>
<div class="detail">{vs_fresh.get("fresh_30d_count", "?")} claims &lt;30d old &middot; {fresh_pct:.0f}% fresh</div>
</div>
<div class="card">
<div class="label">Confidence Spread</div>
<div class="value" style="font-size:16px">{conf_spread}</div>
<div class="detail">proven / likely / experimental / speculative</div>
</div>
</div>
</div>'''
    else:
        vital_section = ""

    # Contributions-by-domain rows, busiest domain first; empty domains are hidden
    breakdown_parts = []
    ranked_domains = sorted(domain_breakdown.items(), key=lambda x: x[1]["knowledge_prs"], reverse=True)
    for bd_domain, stats in ranked_domains:
        if not stats["knowledge_prs"] > 0:
            continue
        top_contributors = ", ".join(
            f'{esc(contrib["handle"])} ({contrib["claims"]})' for contrib in stats["contributors"][:3]
        )
        breakdown_parts.append(
            "<tr>\n"
            f'<td style="color:#58a6ff">{esc(bd_domain)}</td>\n'
            f'<td>{stats["knowledge_prs"]}</td>\n'
            f'<td style="font-size:12px;color:#8b949e">{top_contributors}</td>\n'
            "</tr>"
        )
    breakdown_rows = "".join(breakdown_parts)

    # Stagnation alert: only rendered when at least one domain is stagnant
    stagnant = vital_signs["domain_activity"]["stagnant"]
    if stagnant:
        stagnation_section = f'''
<div class="section">
<div class="section-title" style="color:#d29922">Stagnation Alerts</div>
<div class="card">
<p style="color:#d29922">Domains with no PR activity in 7 days: <strong>{esc(", ".join(stagnant))}</strong></p>
</div>
</div>
'''
    else:
        stagnation_section = ""

    return f"""<!DOCTYPE html>
<html lang="en"><head>
<meta charset="utf-8">
<title>Argus — Teleo Diagnostics</title>
<meta http-equiv="refresh" content="60">
<meta name="viewport" content="width=device-width, initial-scale=1">
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.6"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns@3.0.0"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-plugin-annotation@3.1.0"></script>
<style>
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{ font-family: -apple-system, system-ui, 'Segoe UI', sans-serif; background: #0d1117; color: #c9d1d9; padding: 24px; }}
.header {{ display: flex; align-items: baseline; gap: 12px; margin-bottom: 8px; }}
h1 {{ color: #58a6ff; font-size: 24px; }}
.subtitle {{ color: #8b949e; font-size: 13px; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(160px, 1fr)); gap: 12px; margin: 20px 0; }}
.card {{ background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 16px; }}
.card .label {{ color: #8b949e; font-size: 11px; text-transform: uppercase; letter-spacing: 0.5px; }}
.card .value {{ font-size: 28px; font-weight: 700; margin-top: 2px; }}
.card .detail {{ color: #8b949e; font-size: 11px; margin-top: 2px; }}
.green {{ color: #3fb950; }}
.yellow {{ color: #d29922; }}
.red {{ color: #f85149; }}
.blue {{ color: #58a6ff; }}
.chart-container {{ background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 16px; margin: 16px 0; }}
.chart-container h2 {{ color: #c9d1d9; font-size: 14px; margin-bottom: 12px; }}
canvas {{ max-height: 260px; }}
.row {{ display: grid; grid-template-columns: 1fr 1fr; gap: 16px; }}
@media (max-width: 800px) {{ .row {{ grid-template-columns: 1fr; }} }}
table {{ width: 100%; border-collapse: collapse; font-size: 13px; }}
th {{ color: #8b949e; font-size: 11px; text-transform: uppercase; text-align: left; padding: 6px 10px; border-bottom: 1px solid #30363d; }}
td {{ padding: 6px 10px; border-bottom: 1px solid #21262d; }}
code {{ background: #21262d; padding: 2px 6px; border-radius: 3px; font-size: 12px; }}
.section {{ margin-top: 28px; }}
.section-title {{ color: #58a6ff; font-size: 15px; font-weight: 600; margin-bottom: 12px; padding-bottom: 6px; border-bottom: 1px solid #21262d; }}
.funnel {{ display: flex; align-items: center; gap: 8px; flex-wrap: wrap; }}
.funnel-step {{ text-align: center; flex: 1; min-width: 100px; }}
.funnel-step .num {{ font-size: 24px; font-weight: 700; }}
.funnel-step .lbl {{ font-size: 11px; color: #8b949e; text-transform: uppercase; }}
.funnel-arrow {{ color: #30363d; font-size: 20px; }}
.footer {{ margin-top: 40px; padding-top: 16px; border-top: 1px solid #21262d; color: #484f58; font-size: 11px; }}
.footer a {{ color: #484f58; }}
</style>
</head><body>
<div class="header">
<h1>Argus</h1>
<span class="subtitle">Teleo Pipeline Diagnostics &middot; {now.strftime("%Y-%m-%d %H:%M UTC")} &middot; auto-refresh 60s</span>
</div>
<!-- Hero Cards -->
<div class="grid">
<div class="card">
<div class="label">Throughput</div>
<div class="value">{metrics["throughput_1h"]}<span style="font-size:14px;color:#8b949e">/hr</span></div>
<div class="detail">merged last hour</div>
</div>
<div class="card">
<div class="label">Approval Rate (24h)</div>
<div class="value {ar_color}">{ar:.1%}</div>
<div class="detail">{metrics["approved_24h"]}/{metrics["evaluated_24h"]} evaluated</div>
</div>
<div class="card">
<div class="label">Review Backlog</div>
<div class="value {vs_status_color}">{vs_review["backlog"]}</div>
<div class="detail">{vs_review["open_prs"]} open + {vs_review["reviewing_prs"]} reviewing + {vs_review["approved_waiting"]} approved + {vs_review["conflict_prs"]} conflicts</div>
</div>
<div class="card">
<div class="label">Merged Total</div>
<div class="value green">{sm.get("merged", 0)}</div>
<div class="detail">{sm.get("closed", 0)} closed</div>
</div>
<div class="card">
<div class="label">Fix Success</div>
<div class="value {fr_color}">{metrics["fix_rate"]:.1%}</div>
<div class="detail">{metrics["fix_succeeded"]}/{metrics["fix_attempted"]} fixed</div>
</div>
<div class="card">
<div class="label">Time to Merge</div>
<div class="value">{ttm_display}<span style="font-size:14px;color:#8b949e">min</span></div>
<div class="detail">median (24h)</div>
</div>
</div>
<!-- Pipeline Funnel -->
<div class="section">
<div class="section-title">Pipeline Funnel</div>
<div class="funnel">
<div class="funnel-step"><div class="num">{funnel["sources_total"]}</div><div class="lbl">Sources</div></div>
<div class="funnel-arrow">→</div>
<div class="funnel-step"><div class="num" style="color: #f0883e">{funnel["sources_queued"]}</div><div class="lbl">In Queue</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num">{funnel["sources_extracted"]}</div><div class="lbl">Extracted</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num">{funnel["prs_total"]}</div><div class="lbl">PRs Created</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num green">{funnel["prs_merged"]}</div><div class="lbl">Merged</div></div>
<div class="funnel-arrow">&rarr;</div>
<div class="funnel-step"><div class="num blue">{funnel["conversion_rate"]:.1%}</div><div class="lbl">Conversion</div></div>
</div>
</div>
<!-- Vital Signs (Vida's Five) -->
{vital_section}
<!-- Charts -->
<div id="no-chart-data" class="card" style="text-align:center;padding:40px;margin:16px 0;display:none">
<p style="color:#8b949e">No time-series data yet. Charts will appear once Epimetheus wires <code>record_snapshot()</code> into the pipeline daemon.</p>
</div>
<div id="chart-section">
<div class="row">
<div class="chart-container">
<h2>Throughput &amp; Approval Rate</h2>
<canvas id="throughputChart"></canvas>
</div>
<div class="chart-container">
<h2>Rejection Reasons Over Time</h2>
<canvas id="rejectionChart"></canvas>
</div>
</div>
<div class="row">
<div class="chart-container">
<h2>PR Backlog</h2>
<canvas id="backlogChart"></canvas>
</div>
<div class="chart-container">
<h2>Source Origins (24h snapshots)</h2>
<canvas id="originChart"></canvas>
</div>
</div>
</div>
<!-- Tables -->
<div class="row">
<div class="section">
<div class="section-title">Top Rejection Reasons (24h)</div>
<div class="card">
<table>
<tr><th>Issue</th><th>PRs</th><th style="color:#8b949e">Events</th></tr>
{reason_rows_html}
</table>
</div>
</div>
<div class="section">
<div class="section-title">Circuit Breakers</div>
<div class="card">
<table>
<tr><th>Stage</th><th>State</th><th>Failures</th><th>Last Success</th></tr>
{breaker_rows_html}
</table>
</div>
</div>
</div>
<div class="row">
<div class="section">
<div class="section-title">Domain Breakdown</div>
<div class="card">
<table>
<tr><th>Domain</th><th>Total</th><th>Merged</th><th>Closed</th><th>Open</th></tr>
{domain_rows}
</table>
</div>
</div>
<div class="section">
<div class="section-title" style="display:flex;align-items:center;gap:12px">
Top Contributors (by CI)
<span style="font-size:11px;font-weight:400">
<button id="btn-principal" onclick="toggleContribView('principal')" style="background:#21262d;color:#58a6ff;border:1px solid #30363d;border-radius:4px;padding:2px 8px;cursor:pointer;font-size:11px">By Human</button>
<button id="btn-agent" onclick="toggleContribView('agent')" style="background:transparent;color:#8b949e;border:1px solid #30363d;border-radius:4px;padding:2px 8px;cursor:pointer;font-size:11px">By Agent</button>
</span>
</div>
<div class="card">
<table id="contrib-principal">
<tr><th>Contributor</th><th>Tier</th><th>Claims</th><th>CI</th><th>Domains</th></tr>
{principal_rows_html}
</table>
<table id="contrib-agent" style="display:none">
<tr><th>Agent</th><th>Tier</th><th>Claims</th><th>CI</th><th>Domains</th></tr>
{agent_rows_html}
</table>
</div>
</div>
</div>
<!-- Domain Breakdown -->
<div class="section">
<div class="section-title">Contributions by Domain</div>
<div class="card">
<table>
<tr><th>Domain</th><th>Knowledge PRs</th><th>Top Contributors</th></tr>
{breakdown_rows}
</table>
</div>
</div>
<!-- Stagnation Alerts -->
{stagnation_section}
<div class="footer">
Argus &middot; Teleo Pipeline Diagnostics &middot;
<a href="/api/metrics">API: Metrics</a> &middot;
<a href="/api/snapshots">Snapshots</a> &middot;
<a href="/api/vital-signs">Vital Signs</a> &middot;
<a href="/api/contributors">Contributors</a> &middot;
<a href="/api/domains">Domains</a> &middot;
<a href="/audit">Response Audit</a>
</div>
<script>
const timestamps = {js(timestamps)};
if (timestamps.length === 0) {{
document.getElementById('chart-section').style.display = 'none';
document.getElementById('no-chart-data').style.display = 'block';
}} else {{
const throughputData = {js(throughput_data)};
const approvalData = {js(approval_data)};
const openPrsData = {js(open_prs_data)};
const mergedData = {js(merged_data)};
const rejWiki = {js(rej_wiki)};
const rejSchema = {js(rej_schema)};
const rejDup = {js(rej_dup)};
const rejConf = {js(rej_conf)};
const rejOther = {js(rej_other)};
const originAgent = {js(origin_agent)};
const originHuman = {js(origin_human)};
const annotations = {annotations_js};
const chartDefaults = {{
color: '#8b949e',
borderColor: '#30363d',
font: {{ family: '-apple-system, system-ui, sans-serif' }},
}};
Chart.defaults.color = '#8b949e';
Chart.defaults.borderColor = '#21262d';
Chart.defaults.font.family = '-apple-system, system-ui, sans-serif';
Chart.defaults.font.size = 11;
// Throughput + Approval Rate (dual axis)
new Chart(document.getElementById('throughputChart'), {{
type: 'line',
data: {{
labels: timestamps,
datasets: [
{{
label: 'Throughput/hr',
data: throughputData,
borderColor: '#58a6ff',
backgroundColor: 'rgba(88,166,255,0.1)',
fill: true,
tension: 0.3,
yAxisID: 'y',
pointRadius: 1,
}},
{{
label: 'Approval %',
data: approvalData,
borderColor: '#3fb950',
borderDash: [4, 2],
tension: 0.3,
yAxisID: 'y1',
pointRadius: 1,
}},
],
}},
options: {{
responsive: true,
interaction: {{ mode: 'index', intersect: false }},
scales: {{
x: {{ type: 'time', time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ position: 'left', title: {{ display: true, text: 'PRs/hr' }}, min: 0 }},
y1: {{ position: 'right', title: {{ display: true, text: 'Approval %' }}, min: 0, max: 100, grid: {{ drawOnChartArea: false }} }},
}},
plugins: {{
annotation: {{ annotations: annotations }},
legend: {{ labels: {{ boxWidth: 12 }} }},
}},
}},
}});
// Rejection reasons (stacked area)
new Chart(document.getElementById('rejectionChart'), {{
type: 'line',
data: {{
labels: timestamps,
datasets: [
{{ label: 'Wiki Links', data: rejWiki, borderColor: '#f85149', backgroundColor: 'rgba(248,81,73,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Schema', data: rejSchema, borderColor: '#d29922', backgroundColor: 'rgba(210,153,34,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Duplicate', data: rejDup, borderColor: '#8b949e', backgroundColor: 'rgba(139,148,158,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Confidence', data: rejConf, borderColor: '#bc8cff', backgroundColor: 'rgba(188,140,255,0.2)', fill: true, tension: 0.3, pointRadius: 0 }},
{{ label: 'Other', data: rejOther, borderColor: '#6e7681', backgroundColor: 'rgba(110,118,129,0.15)', fill: true, tension: 0.3, pointRadius: 0 }},
],
}},
options: {{
responsive: true,
scales: {{
x: {{ type: 'time', time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ stacked: true, min: 0, title: {{ display: true, text: 'Count (24h)' }} }},
}},
plugins: {{
annotation: {{ annotations: annotations }},
legend: {{ labels: {{ boxWidth: 12 }} }},
}},
}},
}});
// PR Backlog
new Chart(document.getElementById('backlogChart'), {{
type: 'line',
data: {{
labels: timestamps,
datasets: [
{{ label: 'Open PRs', data: openPrsData, borderColor: '#d29922', backgroundColor: 'rgba(210,153,34,0.15)', fill: true, tension: 0.3, pointRadius: 1 }},
{{ label: 'Merged (total)', data: mergedData, borderColor: '#3fb950', tension: 0.3, pointRadius: 1 }},
],
}},
options: {{
responsive: true,
scales: {{
x: {{ type: 'time', time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ min: 0, title: {{ display: true, text: 'PRs' }} }},
}},
plugins: {{ legend: {{ labels: {{ boxWidth: 12 }} }} }},
}},
}});
// Source Origins
new Chart(document.getElementById('originChart'), {{
type: 'bar',
data: {{
labels: timestamps,
datasets: [
{{ label: 'Agent', data: originAgent, backgroundColor: '#58a6ff' }},
{{ label: 'Human', data: originHuman, backgroundColor: '#3fb950' }},
],
}},
options: {{
responsive: true,
scales: {{
x: {{ type: 'time', stacked: true, time: {{ unit: 'hour', displayFormats: {{ hour: 'MMM d HH:mm' }} }}, grid: {{ display: false }} }},
y: {{ stacked: true, min: 0, title: {{ display: true, text: 'Sources (24h)' }} }},
}},
plugins: {{ legend: {{ labels: {{ boxWidth: 12 }} }} }},
}},
}});
}} // end if (timestamps.length > 0)
function toggleContribView(view) {{
const principal = document.getElementById('contrib-principal');
const agent = document.getElementById('contrib-agent');
const btnP = document.getElementById('btn-principal');
const btnA = document.getElementById('btn-agent');
if (view === 'agent') {{
principal.style.display = 'none';
agent.style.display = '';
btnA.style.background = '#21262d';
btnA.style.color = '#58a6ff';
btnP.style.background = 'transparent';
btnP.style.color = '#8b949e';
}} else {{
principal.style.display = '';
agent.style.display = 'none';
btnP.style.background = '#21262d';
btnP.style.color = '#58a6ff';
btnA.style.background = 'transparent';
btnA.style.color = '#8b949e';
}}
}}
</script>
</body></html>"""
# ─── App factory ─────────────────────────────────────────────────────────────
def create_app() -> web.Application:
    """Build the Argus aiohttp application.

    Attaches the DB handle and the optional API key to the app, registers
    every route, and hooks ``_cleanup`` into application shutdown.
    """
    app = web.Application(middlewares=[auth_middleware])
    app["db"] = _get_db()

    api_key = _load_secret(API_KEY_FILE)
    app["api_key"] = api_key
    if not api_key:
        logger.info("No API key configured — all endpoints open")
    else:
        logger.info("API key auth enabled (protected endpoints require X-Api-Key)")

    # GET routes, registered in the same order as they are listed here.
    get_routes = (
        ("/", handle_dashboard),
        ("/api/metrics", handle_api_metrics),
        ("/api/snapshots", handle_api_snapshots),
        ("/api/vital-signs", handle_api_vital_signs),
        ("/api/contributors", handle_api_contributors),
        ("/api/domains", handle_api_domains),
        ("/api/search", handle_api_search),
        ("/api/audit", handle_api_audit),
        ("/audit", handle_audit_page),
    )
    router = app.router
    for path, handler in get_routes:
        router.add_get(path, handler)
    router.add_post("/api/usage", handle_api_usage)

    app.on_cleanup.append(_cleanup)
    return app
async def _cleanup(app):
    """Shutdown hook: close the DB handle stored under ``app["db"]``."""
    connection = app["db"]
    connection.close()
def main():
    """Configure logging, then build and serve the app on 0.0.0.0:PORT."""
    log_format = "%(asctime)s %(name)s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    logger.info("Argus diagnostics starting on port %d, DB: %s", PORT, DB_PATH)
    application = create_app()
    web.run_app(application, host="0.0.0.0", port=PORT)
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()