diff --git a/diagnostics/alerting.py b/diagnostics/alerting.py deleted file mode 100644 index 33dde714e..000000000 --- a/diagnostics/alerting.py +++ /dev/null @@ -1,537 +0,0 @@ -"""Argus active monitoring — health watchdog, quality regression, throughput anomaly detection. - -Provides check functions that detect problems and return structured alerts. -Called by /check endpoint (periodic cron) or on-demand. - -Alert schema: - { - "id": str, # unique key for dedup (e.g. "dormant:ganymede") - "severity": str, # "critical" | "warning" | "info" - "category": str, # "health" | "quality" | "throughput" | "failure_pattern" - "title": str, # human-readable headline - "detail": str, # actionable description - "agent": str|None, # affected agent (if applicable) - "domain": str|None, # affected domain (if applicable) - "detected_at": str, # ISO timestamp - "auto_resolve": bool, # clears when condition clears - } -""" - -import json -import sqlite3 -import statistics -from datetime import datetime, timezone - - -# ─── Agent-domain mapping (static config, maintained by Argus) ────────────── - -AGENT_DOMAINS = { - "rio": ["internet-finance"], - "clay": ["creative-industries"], - "ganymede": None, # reviewer — cross-domain - "epimetheus": None, # infra - "leo": None, # standards - "oberon": None, # evolution tracking - "vida": None, # health monitoring - "hermes": None, # comms - "astra": None, # research -} - -# Thresholds -DORMANCY_HOURS = 48 -APPROVAL_DROP_THRESHOLD = 15 # percentage points below 7-day baseline -THROUGHPUT_DROP_RATIO = 0.5 # alert if today < 50% of 7-day SMA -REJECTION_SPIKE_RATIO = 0.20 # single reason > 20% of recent rejections -STUCK_LOOP_THRESHOLD = 3 # same agent + same rejection reason > N times in 6h -COST_SPIKE_RATIO = 2.0 # daily cost > 2x 7-day average - - -def _now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -# ─── Check: Agent Health (dormancy detection) ─────────────────────────────── - - -def check_agent_health(conn: sqlite3.Connection) -> list[dict]: - """Detect agents with no PR activity in the last DORMANCY_HOURS hours.""" - alerts = [] - - # Get last activity per agent - rows = conn.execute( - """SELECT agent, MAX(last_attempt) as latest, COUNT(*) as total_prs - FROM prs WHERE agent IS NOT NULL - GROUP BY agent""" - ).fetchall() - - now = datetime.now(timezone.utc) - for r in rows: - agent = r["agent"] - latest = r["latest"] - if not latest: - continue - - last_dt = datetime.fromisoformat(latest) - if last_dt.tzinfo is None: - last_dt = last_dt.replace(tzinfo=timezone.utc) - - hours_since = (now - last_dt).total_seconds() / 3600 - - if hours_since > DORMANCY_HOURS: - alerts.append({ - "id": f"dormant:{agent}", - "severity": "warning", - "category": "health", - "title": f"Agent '{agent}' dormant for {int(hours_since)}h", - "detail": ( - f"No PR activity since {latest}. " - f"Last seen {int(hours_since)}h ago (threshold: {DORMANCY_HOURS}h). " - f"Total historical PRs: {r['total_prs']}." - ), - "agent": agent, - "domain": None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - return alerts - - -# ─── Check: Quality Regression (approval rate drop) ───────────────────────── - - -def check_quality_regression(conn: sqlite3.Connection) -> list[dict]: - """Detect approval rate drops vs 7-day baseline, per agent and per domain.""" - alerts = [] - - # 7-day baseline approval rate (overall) - baseline = conn.execute( - """SELECT - COUNT(CASE WHEN event='approved' THEN 1 END) as approved, - COUNT(*) as total - FROM audit_log - WHERE stage='evaluate' - AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-7 days')""" - ).fetchone() - baseline_rate = (baseline["approved"] / baseline["total"] * 100) if baseline["total"] else None - - # 24h approval rate (overall) - recent = conn.execute( - """SELECT - COUNT(CASE WHEN event='approved' THEN 1 END) as approved, - COUNT(*) as total - FROM audit_log - WHERE stage='evaluate' - AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours')""" - ).fetchone() - recent_rate = (recent["approved"] / recent["total"] * 100) if recent["total"] else None - - if baseline_rate is not None and recent_rate is not None: - drop = baseline_rate - recent_rate - if drop > APPROVAL_DROP_THRESHOLD: - alerts.append({ - "id": "quality_regression:overall", - "severity": "critical", - "category": "quality", - "title": f"Approval rate dropped {drop:.0f}pp (24h: {recent_rate:.0f}% vs 7d: {baseline_rate:.0f}%)", - "detail": ( - f"24h approval rate ({recent_rate:.1f}%) is {drop:.1f} percentage points below " - f"7-day baseline ({baseline_rate:.1f}%). " - f"Evaluated {recent['total']} PRs in last 24h." - ), - "agent": None, - "domain": None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - # Per-agent approval rate (24h vs 7d) — only for agents with >=5 evals in each window - # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) - _check_approval_by_dimension(conn, alerts, "agent", "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))") - - # Per-domain approval rate (24h vs 7d) — Theseus addition - _check_approval_by_dimension(conn, alerts, "domain", "json_extract(detail, '$.domain')") - - return alerts - - -def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr): - """Check approval rate regression grouped by a dimension (agent or domain).""" - # 7-day baseline per dimension - baseline_rows = conn.execute( - f"""SELECT {dim_expr} as dim_val, - COUNT(CASE WHEN event='approved' THEN 1 END) as approved, - COUNT(*) as total - FROM audit_log - WHERE stage='evaluate' - AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-7 days') - AND {dim_expr} IS NOT NULL - GROUP BY dim_val HAVING total >= 5""" - ).fetchall() - baselines = {r["dim_val"]: (r["approved"] / r["total"] * 100) for r in baseline_rows} - - # 24h per dimension - recent_rows = conn.execute( - f"""SELECT {dim_expr} as dim_val, - COUNT(CASE WHEN event='approved' THEN 1 END) as approved, - COUNT(*) as total - FROM audit_log - WHERE stage='evaluate' - AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours') - AND {dim_expr} IS NOT NULL - GROUP BY dim_val HAVING total >= 5""" - ).fetchall() - - for r in recent_rows: - val = r["dim_val"] - if val not in baselines: - continue - recent_rate = r["approved"] / r["total"] * 100 - base_rate = baselines[val] - drop = base_rate - recent_rate - if drop > APPROVAL_DROP_THRESHOLD: - alerts.append({ - "id": f"quality_regression:{dim_name}:{val}", - "severity": "warning", - "category": "quality", - "title": f"{dim_name.title()} '{val}' approval dropped {drop:.0f}pp", - "detail": ( - f"24h: {recent_rate:.1f}% vs 7d baseline: {base_rate:.1f}% " - f"({r['total']} evals in 24h)." - ), - "agent": val if dim_name == "agent" else None, - "domain": val if dim_name == "domain" else None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - -# ─── Check: Throughput Anomaly ────────────────────────────────────────────── - - -def check_throughput(conn: sqlite3.Connection) -> list[dict]: - """Detect throughput stalling — today vs 7-day SMA.""" - alerts = [] - - # Daily merged counts for last 7 days - rows = conn.execute( - """SELECT date(merged_at) as day, COUNT(*) as n - FROM prs WHERE merged_at > datetime('now', '-7 days') - GROUP BY day ORDER BY day""" - ).fetchall() - - if len(rows) < 2: - return alerts # Not enough data - - daily_counts = [r["n"] for r in rows] - sma = statistics.mean(daily_counts[:-1]) if len(daily_counts) > 1 else daily_counts[0] - today_count = daily_counts[-1] - - if sma > 0 and today_count < sma * THROUGHPUT_DROP_RATIO: - alerts.append({ - "id": "throughput:stalling", - "severity": "warning", - "category": "throughput", - "title": f"Throughput stalling: {today_count} merges today vs {sma:.0f}/day avg", - "detail": ( - f"Today's merge count ({today_count}) is below {THROUGHPUT_DROP_RATIO:.0%} of " - f"7-day average ({sma:.1f}/day). Daily counts: {daily_counts}." - ), - "agent": None, - "domain": None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - return alerts - - -# ─── Check: Rejection Reason Spike ───────────────────────────────────────── - - -def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]: - """Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections.""" - alerts = [] - - # Total rejections in 24h - total = conn.execute( - """SELECT COUNT(*) as n FROM audit_log - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours')""" - ).fetchone()["n"] - - if total < 10: - return alerts # Not enough data - - # Count by rejection tag - tags = conn.execute( - """SELECT value as tag, COUNT(*) as cnt - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours') - GROUP BY tag ORDER BY cnt DESC""" - ).fetchall() - - for t in tags: - ratio = t["cnt"] / total - if ratio > REJECTION_SPIKE_RATIO: - alerts.append({ - "id": f"rejection_spike:{t['tag']}", - "severity": "warning", - "category": "quality", - "title": f"Rejection reason '{t['tag']}' at {ratio:.0%} of rejections", - "detail": ( - f"'{t['tag']}' accounts for {t['cnt']}/{total} rejections in 24h " - f"({ratio:.1%}). Threshold: {REJECTION_SPIKE_RATIO:.0%}." - ), - "agent": None, - "domain": None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - return alerts - - -# ─── Check: Stuck Loops ──────────────────────────────────────────────────── - - -def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]: - """Detect agents repeatedly failing on the same rejection reason.""" - alerts = [] - - # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) - rows = conn.execute( - """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, - value as tag, - COUNT(*) as cnt - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-6 hours') - AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL - GROUP BY agent, tag - HAVING cnt > ?""", - (STUCK_LOOP_THRESHOLD,), - ).fetchall() - - for r in rows: - alerts.append({ - "id": f"stuck_loop:{r['agent']}:{r['tag']}", - "severity": "critical", - "category": "health", - "title": f"Agent '{r['agent']}' stuck: '{r['tag']}' failed {r['cnt']}x in 6h", - "detail": ( - f"Agent '{r['agent']}' has been rejected for '{r['tag']}' " - f"{r['cnt']} times in the last 6 hours (threshold: {STUCK_LOOP_THRESHOLD}). " - f"Stop and reassess." - ), - "agent": r["agent"], - "domain": None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - return alerts - - -# ─── Check: Cost Spikes ──────────────────────────────────────────────────── - - -def check_cost_spikes(conn: sqlite3.Connection) -> list[dict]: - """Detect daily cost exceeding 2x of 7-day average per agent.""" - alerts = [] - - # Check if costs table exists and has agent column - try: - cols = conn.execute("PRAGMA table_info(costs)").fetchall() - col_names = {c["name"] for c in cols} - except sqlite3.Error: - return alerts - - if "agent" not in col_names or "cost_usd" not in col_names: - # Fall back to per-PR cost tracking - rows = conn.execute( - """SELECT agent, - SUM(CASE WHEN created_at > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost, - SUM(CASE WHEN created_at > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily - FROM prs WHERE agent IS NOT NULL AND cost_usd > 0 - GROUP BY agent - HAVING avg_daily > 0""" - ).fetchall() - else: - rows = conn.execute( - """SELECT agent, - SUM(CASE WHEN timestamp > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost, - SUM(CASE WHEN timestamp > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily - FROM costs WHERE agent IS NOT NULL - GROUP BY agent - HAVING avg_daily > 0""" - ).fetchall() - - for r in rows: - if r["avg_daily"] and r["today_cost"] > r["avg_daily"] * COST_SPIKE_RATIO: - ratio = r["today_cost"] / r["avg_daily"] - alerts.append({ - "id": f"cost_spike:{r['agent']}", - "severity": "warning", - "category": "health", - "title": f"Agent '{r['agent']}' cost spike: ${r['today_cost']:.2f} today ({ratio:.1f}x avg)", - "detail": ( - f"Today's cost (${r['today_cost']:.2f}) is {ratio:.1f}x the 7-day daily average " - f"(${r['avg_daily']:.2f}). Threshold: {COST_SPIKE_RATIO}x." - ), - "agent": r["agent"], - "domain": None, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - return alerts - - -# ─── Check: Domain Rejection Patterns (Theseus addition) ─────────────────── - - -def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]: - """Track rejection reason shift per domain — surfaces domain maturity issues.""" - alerts = [] - - # Per-domain rejection breakdown in 24h - rows = conn.execute( - """SELECT json_extract(detail, '$.domain') as domain, - value as tag, - COUNT(*) as cnt - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND timestamp > datetime('now', '-24 hours') - AND json_extract(detail, '$.domain') IS NOT NULL - GROUP BY domain, tag - ORDER BY domain, cnt DESC""" - ).fetchall() - - # Group by domain - domain_tags = {} - for r in rows: - d = r["domain"] - if d not in domain_tags: - domain_tags[d] = [] - domain_tags[d].append({"tag": r["tag"], "count": r["cnt"]}) - - # Flag if a domain has >50% of rejections from a single reason (concentrated failure) - for domain, tags in domain_tags.items(): - total = sum(t["count"] for t in tags) - if total < 5: - continue - top = tags[0] - ratio = top["count"] / total - if ratio > 0.5: - alerts.append({ - "id": f"domain_rejection_pattern:{domain}:{top['tag']}", - "severity": "info", - "category": "failure_pattern", - "title": f"Domain '{domain}': {ratio:.0%} of rejections are '{top['tag']}'", - "detail": ( - f"In domain '{domain}', {top['count']}/{total} rejections (24h) are for " - f"'{top['tag']}'. This may indicate a systematic issue with evidence standards " - f"or schema compliance in this domain." - ), - "agent": None, - "domain": domain, - "detected_at": _now_iso(), - "auto_resolve": True, - }) - - return alerts - - -# ─── Failure Report Generator ─────────────────────────────────────────────── - - -def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 24) -> dict | None: - """Compile a failure report for a specific agent. - - Returns top rejection reasons, example PRs, and suggested fixes. - Designed to be sent directly to the agent via Pentagon messaging. - """ - hours = int(hours) # defensive — callers should pass int, but enforce it - rows = conn.execute( - """SELECT value as tag, COUNT(*) as cnt, - GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers - FROM audit_log, json_each(json_extract(detail, '$.issues')) - WHERE stage='evaluate' - AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND json_extract(detail, '$.agent') = ? - AND timestamp > datetime('now', ? || ' hours') - GROUP BY tag ORDER BY cnt DESC - LIMIT 5""", - (agent, f"-{hours}"), - ).fetchall() - - if not rows: - return None - - total_rejections = sum(r["cnt"] for r in rows) - top_reasons = [] - for r in rows: - prs = r["pr_numbers"].split(",")[:3] if r["pr_numbers"] else [] - top_reasons.append({ - "reason": r["tag"], - "count": r["cnt"], - "pct": round(r["cnt"] / total_rejections * 100, 1), - "example_prs": prs, - "suggestion": _suggest_fix(r["tag"]), - }) - - return { - "agent": agent, - "period_hours": hours, - "total_rejections": total_rejections, - "top_reasons": top_reasons, - "generated_at": _now_iso(), - } - - -def _suggest_fix(rejection_tag: str) -> str: - """Map known rejection reasons to actionable suggestions.""" - suggestions = { - "broken_wiki_links": "Check that all [[wiki links]] in claims resolve to existing files. Run link validation before submitting.", - "near_duplicate": "Search existing claims before creating new ones. Use semantic search to find similar claims.", - "frontmatter_schema": "Validate YAML frontmatter against the claim schema. Required fields: title, domain, confidence, type.", - "weak_evidence": "Add concrete sources, data points, or citations. Claims need evidence that can be independently verified.", - "missing_confidence": "Every claim needs a confidence level: proven, likely, experimental, or speculative.", - "domain_mismatch": "Ensure claims are filed under the correct domain. Check domain definitions if unsure.", - "too_broad": "Break broad claims into specific, testable sub-claims.", - "missing_links": "Claims should link to related claims, entities, or sources. Isolated claims are harder to verify.", - } - return suggestions.get(rejection_tag, f"Review rejection reason '{rejection_tag}' and adjust extraction accordingly.") - - -# ─── Run All Checks ──────────────────────────────────────────────────────── - - -def run_all_checks(conn: sqlite3.Connection) -> list[dict]: - """Execute all check functions and return combined alerts.""" - alerts = [] - alerts.extend(check_agent_health(conn)) - alerts.extend(check_quality_regression(conn)) - alerts.extend(check_throughput(conn)) - alerts.extend(check_rejection_spike(conn)) - alerts.extend(check_stuck_loops(conn)) - alerts.extend(check_cost_spikes(conn)) - alerts.extend(check_domain_rejection_patterns(conn)) - return alerts - - -def format_alert_message(alert: dict) -> str: - """Format an alert for Pentagon messaging.""" - severity_icon = {"critical": "!!", "warning": "!", "info": "~"} - icon = severity_icon.get(alert["severity"], "?") - return f"[{icon}] {alert['title']}\n{alert['detail']}" diff --git a/diagnostics/alerting_routes.py b/diagnostics/alerting_routes.py deleted file mode 100644 index fd3574071..000000000 --- a/diagnostics/alerting_routes.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Route handlers for /check and /api/alerts endpoints. - -Import into app.py and register routes in create_app(). -""" - -import json -import logging -from datetime import datetime, timezone - -from aiohttp import web -from alerting import run_all_checks, generate_failure_report, format_alert_message # requires CWD = deploy dir; switch to relative import if packaged - -logger = logging.getLogger("argus.alerting") - -# In-memory alert store (replaced each /check cycle, persists between requests) -_active_alerts: list[dict] = [] -_last_check: str | None = None - - -async def handle_check(request): - """GET /check — run all monitoring checks, update active alerts, return results. - - Designed to be called by systemd timer every 5 minutes. - Returns JSON summary of all detected issues. - """ - conn = request.app["_alerting_conn_func"]() - try: - alerts = run_all_checks(conn) - except Exception as e: - logger.error("Check failed: %s", e) - return web.json_response({"error": str(e)}, status=500) - - global _active_alerts, _last_check - _active_alerts = alerts - _last_check = datetime.now(timezone.utc).isoformat() - - # Generate failure reports for agents with stuck loops - failure_reports = {} - stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]} - for agent in stuck_agents: - report = generate_failure_report(conn, agent) - if report: - failure_reports[agent] = report - - result = { - "checked_at": _last_check, - "alert_count": len(alerts), - "critical": sum(1 for a in alerts if a["severity"] == "critical"), - "warning": sum(1 for a in alerts if a["severity"] == "warning"), - "info": sum(1 for a in alerts if a["severity"] == "info"), - "alerts": alerts, - "failure_reports": failure_reports, - } - - logger.info( - "Check complete: %d alerts (%d critical, %d warning)", - len(alerts), - result["critical"], - result["warning"], - ) - - return web.json_response(result) - - -async def handle_api_alerts(request): - """GET /api/alerts — return current active alerts. - - Query params: - severity: filter by severity (critical, warning, info) - category: filter by category (health, quality, throughput, failure_pattern) - agent: filter by agent name - domain: filter by domain - """ - alerts = list(_active_alerts) - - # Filters - severity = request.query.get("severity") - if severity: - alerts = [a for a in alerts if a["severity"] == severity] - - category = request.query.get("category") - if category: - alerts = [a for a in alerts if a["category"] == category] - - agent = request.query.get("agent") - if agent: - alerts = [a for a in alerts if a.get("agent") == agent] - - domain = request.query.get("domain") - if domain: - alerts = [a for a in alerts if a.get("domain") == domain] - - return web.json_response({ - "alerts": alerts, - "total": len(alerts), - "last_check": _last_check, - }) - - -async def handle_api_failure_report(request): - """GET /api/failure-report/{agent} — generate failure report for an agent. - - Query params: - hours: lookback window (default 24) - """ - agent = request.match_info["agent"] - hours = int(request.query.get("hours", "24")) - conn = request.app["_alerting_conn_func"]() - - report = generate_failure_report(conn, agent, hours) - if not report: - return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours}) - - return web.json_response(report) - - -def register_alerting_routes(app, get_conn_func): - """Register alerting routes on the app. - - get_conn_func: callable that returns a read-only sqlite3.Connection - """ - app["_alerting_conn_func"] = get_conn_func - app.router.add_get("/check", handle_check) - app.router.add_get("/api/alerts", handle_api_alerts) - app.router.add_get("/api/failure-report/{agent}", handle_api_failure_report) diff --git a/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md b/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md new file mode 100644 index 000000000..a420a386b --- /dev/null +++ b/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md @@ -0,0 +1,47 @@ +# Diagnostics Consolidation Diff Log +# Branch: epimetheus/consolidate-infra +# Date: 2026-04-13 + +## Files with multiple copies — resolution + +### alerting.py +- ROOT diagnostics/alerting.py (22320 bytes) — KEPT (newer: has _ALLOWED_DIM_EXPRS SQL injection protection, stricter dim_expr validation) +- ops/diagnostics/alerting.py (22039 bytes) — OVERWRITTEN (missing SQL injection guards) +- VPS /opt/teleo-eval/diagnostics/alerting.py (22039 bytes) — matches ops/ version, needs deploy + +### alerting_routes.py +- ROOT diagnostics/alerting_routes.py (4216 bytes) — KEPT (newer: proper try/finally/conn.close, ValueError catch on hours param) +- ops/diagnostics/alerting_routes.py (4043 bytes) — OVERWRITTEN (missing error handling, missing conn.close) +- VPS /opt/teleo-eval/diagnostics/alerting_routes.py (4043 bytes) — matches ops/ version, needs deploy + +### vitality.py +- ROOT diagnostics/vitality.py (25548 bytes) — KEPT (only copy in repo, larger than VPS) +- VPS /opt/teleo-eval/diagnostics/vitality.py (18539 bytes) — older version, needs deploy +- MOVED TO: ops/diagnostics/vitality.py + +### vitality_routes.py +- ROOT diagnostics/vitality_routes.py (10824 bytes) — KEPT (only copy in repo, larger than VPS) +- VPS /opt/teleo-eval/diagnostics/vitality_routes.py (9729 bytes) — older version, needs deploy +- MOVED TO: ops/diagnostics/vitality_routes.py + +## Files moved + +| From | To | Reason | +|------|-----|--------| +| diagnostics/vitality.py | ops/diagnostics/vitality.py | Consolidate to canonical location | +| diagnostics/vitality_routes.py | ops/diagnostics/vitality_routes.py | Consolidate to canonical location | +| diagnostics/alerting.py | ops/diagnostics/alerting.py | Newer version overwrites older | +| diagnostics/alerting_routes.py | ops/diagnostics/alerting_routes.py | Newer version overwrites older | + +## Root diagnostics/ after consolidation +- PATCH_INSTRUCTIONS.md — kept (documentation, not code) +- evolution.md — kept (documentation) +- weekly/2026-03-25-week3.md — kept (report) +- ops/sessions/*.json — kept (session data) +- All .py files REMOVED from root diagnostics/ + +## VPS .bak files inventory (30+ files) +All in /opt/teleo-eval/diagnostics/. Git is the backup now. Safe to delete after consolidation verified. + +## VPS deploy needed after merge +alerting.py, alerting_routes.py, vitality.py, vitality_routes.py — all local versions are newer than VPS. diff --git a/ops/diagnostics/alerting.py b/ops/diagnostics/alerting.py index 0c84ae5b4..c0dab371a 100644 --- a/ops/diagnostics/alerting.py +++ b/ops/diagnostics/alerting.py @@ -157,8 +157,17 @@ def check_quality_regression(conn: sqlite3.Connection) -> list[dict]: return alerts +_ALLOWED_DIM_EXPRS = frozenset({ + "json_extract(detail, '$.agent')", + "json_extract(detail, '$.domain')", + "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))", +}) + + def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr): - """Check approval rate regression grouped by a dimension (agent or domain).""" + """Check approval rate regression grouped by a dimension. dim_expr must be in _ALLOWED_DIM_EXPRS.""" + if dim_expr not in _ALLOWED_DIM_EXPRS: + raise ValueError(f"untrusted dim_expr: {dim_expr}") # 7-day baseline per dimension baseline_rows = conn.execute( f"""SELECT {dim_expr} as dim_val, @@ -468,7 +477,7 @@ def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 2 FROM audit_log, json_each(json_extract(detail, '$.issues')) WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected') - AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) = ? + AND json_extract(detail, '$.agent') = ? AND timestamp > datetime('now', ? || ' hours') GROUP BY tag ORDER BY cnt DESC LIMIT 5""", diff --git a/ops/diagnostics/alerting_routes.py b/ops/diagnostics/alerting_routes.py index fd3574071..6e736b110 100644 --- a/ops/diagnostics/alerting_routes.py +++ b/ops/diagnostics/alerting_routes.py @@ -26,22 +26,24 @@ async def handle_check(request): conn = request.app["_alerting_conn_func"]() try: alerts = run_all_checks(conn) + + # Generate failure reports for agents with stuck loops + failure_reports = {} + stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]} + for agent in stuck_agents: + report = generate_failure_report(conn, agent) + if report: + failure_reports[agent] = report except Exception as e: logger.error("Check failed: %s", e) return web.json_response({"error": str(e)}, status=500) + finally: + conn.close() global _active_alerts, _last_check _active_alerts = alerts _last_check = datetime.now(timezone.utc).isoformat() - # Generate failure reports for agents with stuck loops - failure_reports = {} - stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]} - for agent in stuck_agents: - report = generate_failure_report(conn, agent) - if report: - failure_reports[agent] = report - result = { "checked_at": _last_check, "alert_count": len(alerts), @@ -104,10 +106,15 @@ async def handle_api_failure_report(request): hours: lookback window (default 24) """ agent = request.match_info["agent"] - hours = int(request.query.get("hours", "24")) + try: + hours = min(int(request.query.get("hours", "24")), 168) + except ValueError: + hours = 24 conn = request.app["_alerting_conn_func"]() - - report = generate_failure_report(conn, agent, hours) + try: + report = generate_failure_report(conn, agent, hours) + finally: + conn.close() if not report: return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours}) diff --git a/ops/diagnostics/vitality.py b/ops/diagnostics/vitality.py new file mode 100644 index 000000000..9eebe37f8 --- /dev/null +++ b/ops/diagnostics/vitality.py @@ -0,0 +1,629 @@ +"""Agent Vitality Diagnostics — data collection and schema. + +Records daily vitality snapshots per agent across 10 dimensions. +Designed as the objective function for agent "aliveness" ranking. + +Owner: Ship (data collection) + Argus (storage, API, dashboard) +Data sources: pipeline.db (read-only), claim-index API, agent-state filesystem, review_records + +Dimension keys (agreed with Leo 2026-04-08): + knowledge_output, knowledge_quality, contributor_engagement, + review_performance, spend_efficiency, autonomy, + infrastructure_health, social_reach, capital, external_impact +""" + +import json +import logging +import os +import sqlite3 +import urllib.request +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger("vitality") + +# Known domain agents and their primary domains +AGENT_DOMAINS = { + "rio": ["internet-finance"], + "theseus": ["collective-intelligence", "living-agents"], + "astra": ["space-development", "energy", "manufacturing", "robotics"], + "vida": ["health"], + "clay": ["entertainment", "cultural-dynamics"], + "leo": ["grand-strategy", "teleohumanity"], + "hermes": [], # communications, no domain + "rhea": [], # infrastructure ops, no domain + "ganymede": [], # code review, no domain + "epimetheus": [], # pipeline, no domain + "oberon": [], # dashboard, no domain + "argus": [], # diagnostics, no domain + "ship": [], # engineering, no domain +} + +# Agent file path prefixes — for matching claims by location, not just domain field. +# Handles claims in core/ and foundations/ that may not have a standard domain field +# in the claim-index (domain derived from directory path). +AGENT_PATHS = { + "rio": ["domains/internet-finance/"], + "theseus": ["domains/ai-alignment/", "core/living-agents/", "core/collective-intelligence/", + "foundations/collective-intelligence/"], + "astra": ["domains/space-development/", "domains/energy/", + "domains/manufacturing/", "domains/robotics/"], + "vida": ["domains/health/"], + "clay": ["domains/entertainment/", "foundations/cultural-dynamics/"], + "leo": ["core/grand-strategy/", "core/teleohumanity/", "core/mechanisms/", + "core/living-capital/", "foundations/teleological-economics/", + "foundations/critical-systems/"], +} + +ALL_AGENTS = list(AGENT_DOMAINS.keys()) + +# Agent-state directory (VPS filesystem) +AGENT_STATE_DIR = Path(os.environ.get( + "AGENT_STATE_DIR", "/opt/teleo-eval/agent-state" +)) + +MIGRATION_SQL = """ +CREATE TABLE IF NOT EXISTS vitality_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_name TEXT NOT NULL, + dimension TEXT NOT NULL, + metric TEXT NOT NULL, + value REAL NOT NULL DEFAULT 0, + unit TEXT NOT NULL DEFAULT '', + source TEXT, + recorded_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(agent_name, dimension, metric, recorded_at) +); +CREATE INDEX IF NOT EXISTS idx_vitality_agent_time + ON vitality_snapshots(agent_name, recorded_at); +CREATE INDEX IF NOT EXISTS idx_vitality_dimension + ON vitality_snapshots(dimension, recorded_at); +""" + +# Add source column if missing (idempotent upgrade from v1 schema) +UPGRADE_SQL = """ +ALTER TABLE vitality_snapshots ADD COLUMN source TEXT; +""" + + +def ensure_schema(db_path: str): + """Create vitality_snapshots table if it doesn't exist.""" + conn = sqlite3.connect(db_path, timeout=30) + try: + conn.executescript(MIGRATION_SQL) + try: + conn.execute(UPGRADE_SQL) + except sqlite3.OperationalError: + pass # column already exists + conn.commit() + logger.info("vitality_snapshots schema ensured") + finally: + conn.close() + + +def _fetch_claim_index(url: str = "http://localhost:8080/claim-index") -> dict | None: + """Fetch claim-index from pipeline health API.""" + try: + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=10) as resp: + return json.loads(resp.read()) + except Exception as e: + logger.warning("claim-index fetch failed: %s", e) + return None + + +def _ro_conn(db_path: str) -> sqlite3.Connection: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30) + conn.row_factory = sqlite3.Row + return conn + + +# --------------------------------------------------------------------------- +# Dimension 1: knowledge_output — "How much has this agent produced?" +# --------------------------------------------------------------------------- + +def collect_knowledge_output(conn: sqlite3.Connection, agent: str) -> list[dict]: + """Claims merged, domain count, PRs submitted.""" + metrics = [] + + row = conn.execute( + "SELECT COUNT(*) as cnt FROM prs WHERE agent = ? AND status = 'merged'", + (agent,), + ).fetchone() + metrics.append({"metric": "claims_merged", "value": row["cnt"], "unit": "claims"}) + + row = conn.execute( + "SELECT COUNT(DISTINCT domain) as cnt FROM prs " + "WHERE agent = ? AND domain IS NOT NULL AND status = 'merged'", + (agent,), + ).fetchone() + metrics.append({"metric": "domains_contributed", "value": row["cnt"], "unit": "domains"}) + + row = conn.execute( + "SELECT COUNT(*) as cnt FROM prs WHERE agent = ? AND created_at > datetime('now', '-7 days')", + (agent,), + ).fetchone() + metrics.append({"metric": "prs_7d", "value": row["cnt"], "unit": "PRs"}) + + return metrics + + +# --------------------------------------------------------------------------- +# Dimension 2: knowledge_quality — "How good is the output?" +# --------------------------------------------------------------------------- + +def collect_knowledge_quality( + conn: sqlite3.Connection, claim_index: dict | None, agent: str +) -> list[dict]: + """Evidence density, challenge rate, cross-domain links, domain coverage.""" + metrics = [] + agent_domains = AGENT_DOMAINS.get(agent, []) + + # Challenge rate = challenge PRs / total PRs + rows = conn.execute( + "SELECT commit_type, COUNT(*) as cnt FROM prs " + "WHERE agent = ? AND commit_type IS NOT NULL GROUP BY commit_type", + (agent,), + ).fetchall() + total = sum(r["cnt"] for r in rows) + type_counts = {r["commit_type"]: r["cnt"] for r in rows} + challenge_rate = type_counts.get("challenge", 0) / total if total > 0 else 0 + metrics.append({"metric": "challenge_rate", "value": round(challenge_rate, 4), "unit": "ratio"}) + + # Activity breadth (distinct commit types) + metrics.append({"metric": "activity_breadth", "value": len(type_counts), "unit": "types"}) + + # Evidence density + cross-domain links from claim-index + # Match by domain field OR file path prefix (catches core/, foundations/ claims) + agent_paths = AGENT_PATHS.get(agent, []) + if claim_index and (agent_domains or agent_paths): + claims = claim_index.get("claims", []) + agent_claims = [ + c for c in claims + if c.get("domain") in agent_domains + or any(c.get("file", "").startswith(p) for p in agent_paths) + ] + total_claims = len(agent_claims) + + # Evidence density: claims with incoming links / total claims + linked = sum(1 for c in agent_claims if c.get("incoming_count", 0) > 0) + density = linked / total_claims if total_claims > 0 else 0 + metrics.append({"metric": "evidence_density", "value": round(density, 4), "unit": "ratio"}) + + # Cross-domain links + cross_domain = sum( + 1 for c in agent_claims + for link in c.get("outgoing_links", []) + if any(d in link for d in claim_index.get("domains", {}).keys() + if d not in agent_domains) + ) + metrics.append({"metric": "cross_domain_links", "value": cross_domain, "unit": "links"}) + + # Domain coverage: agent's claims / average domain size + domains_data = claim_index.get("domains", {}) + agent_claim_count = sum(domains_data.get(d, 0) for d in agent_domains) + avg_domain_size = (sum(domains_data.values()) / len(domains_data)) if domains_data else 1 + coverage = min(agent_claim_count / avg_domain_size, 1.0) if avg_domain_size > 0 else 0 + metrics.append({"metric": "domain_coverage", "value": round(coverage, 4), "unit": "ratio"}) + else: + metrics.append({"metric": "evidence_density", "value": 0, "unit": "ratio"}) + metrics.append({"metric": "cross_domain_links", "value": 0, "unit": "links"}) + metrics.append({"metric": "domain_coverage", "value": 0, "unit": "ratio"}) + + return metrics + + +# --------------------------------------------------------------------------- +# Dimension 3: contributor_engagement — "Who contributes to this agent's domain?" +# --------------------------------------------------------------------------- + +def collect_contributor_engagement(conn: sqlite3.Connection, agent: str) -> list[dict]: + """Unique submitters to this agent's domain.""" + row = conn.execute( + "SELECT COUNT(DISTINCT submitted_by) as cnt FROM prs " + "WHERE agent = ? AND submitted_by IS NOT NULL AND submitted_by != ''", + (agent,), + ).fetchone() + return [ + {"metric": "unique_submitters", "value": row["cnt"], "unit": "contributors"}, + ] + + +# --------------------------------------------------------------------------- +# Dimension 4: review_performance — "How good is the evaluator feedback loop?" +# --------------------------------------------------------------------------- + +def collect_review_performance(conn: sqlite3.Connection, agent: str) -> list[dict]: + """Approval rate, rejection reasons from review_records.""" + metrics = [] + + # Check if review_records table exists + table_check = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='review_records'" + ).fetchone() + if not table_check: + return [ + {"metric": "approval_rate", "value": 0, "unit": "ratio"}, + {"metric": "total_reviews", "value": 0, "unit": "reviews"}, + ] + + # Overall approval rate for this agent's claims (join through prs table) + row = conn.execute( + "SELECT COUNT(*) as total, " + "SUM(CASE WHEN r.outcome = 'approved' THEN 1 ELSE 0 END) as approved, " + "SUM(CASE WHEN r.outcome = 'approved-with-changes' THEN 1 ELSE 0 END) as with_changes, " + "SUM(CASE WHEN r.outcome = 'rejected' THEN 1 ELSE 0 END) as rejected " + "FROM review_records r " + "JOIN prs p ON r.pr_number = p.pr_number " + "WHERE LOWER(p.agent) = LOWER(?)", + (agent,), + ).fetchone() + total = row["total"] or 0 + approved = (row["approved"] or 0) + (row["with_changes"] or 0) + rejected = row["rejected"] or 0 + approval_rate = approved / total if total > 0 else 0 + + metrics.append({"metric": "total_reviews", "value": total, "unit": "reviews"}) + metrics.append({"metric": "approval_rate", "value": round(approval_rate, 4), "unit": "ratio"}) + metrics.append({"metric": "approved", "value": row["approved"] or 0, "unit": "reviews"}) + metrics.append({"metric": "approved_with_changes", "value": row["with_changes"] or 0, "unit": "reviews"}) + metrics.append({"metric": "rejected", "value": rejected, "unit": "reviews"}) + + # Top rejection reasons (last 30 days) + reasons = conn.execute( + "SELECT r.rejection_reason, COUNT(*) as cnt FROM review_records r " + "JOIN prs p ON r.pr_number = p.pr_number " + "WHERE LOWER(p.agent) = LOWER(?) AND r.outcome = 'rejected' " + "AND r.rejection_reason IS NOT NULL " + "AND r.review_date > datetime('now', '-30 days') " + "GROUP BY r.rejection_reason ORDER BY cnt DESC", + (agent,), + ).fetchall() + for r in reasons: + metrics.append({ + "metric": f"rejection_{r['rejection_reason']}", + "value": r["cnt"], + "unit": "rejections", + }) + + return metrics + + +# --------------------------------------------------------------------------- +# Dimension 5: spend_efficiency — "What does it cost per merged claim?" +# --------------------------------------------------------------------------- + +def collect_spend_efficiency(conn: sqlite3.Connection, agent: str) -> list[dict]: + """Cost per merged claim, total spend, response costs.""" + metrics = [] + + # Pipeline cost attributed to this agent (from prs.cost_usd) + row = conn.execute( + "SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as merged " + "FROM prs WHERE agent = ? AND status = 'merged'", + (agent,), + ).fetchone() + total_cost = row["cost"] or 0 + merged = row["merged"] or 0 + cost_per_claim = total_cost / merged if merged > 0 else 0 + + metrics.append({"metric": "total_pipeline_cost", "value": round(total_cost, 4), "unit": "USD"}) + metrics.append({"metric": "cost_per_merged_claim", "value": round(cost_per_claim, 4), "unit": "USD"}) + + # Response audit costs (Telegram bot) — per-agent + row = conn.execute( + "SELECT COALESCE(SUM(generation_cost), 0) as cost, COUNT(*) as cnt " + "FROM response_audit WHERE agent = ?", + (agent,), + ).fetchone() + metrics.append({"metric": "response_cost_total", "value": round(row["cost"], 4), "unit": "USD"}) + metrics.append({"metric": "total_responses", "value": row["cnt"], "unit": "responses"}) + + # 24h spend snapshot + row = conn.execute( + "SELECT COALESCE(SUM(generation_cost), 0) as cost " + "FROM response_audit WHERE agent = ? AND timestamp > datetime('now', '-24 hours')", + (agent,), + ).fetchone() + metrics.append({"metric": "response_cost_24h", "value": round(row["cost"], 4), "unit": "USD"}) + + return metrics + + +# --------------------------------------------------------------------------- +# Dimension 6: autonomy — "How independently does this agent act?" +# --------------------------------------------------------------------------- + +def collect_autonomy(conn: sqlite3.Connection, agent: str) -> list[dict]: + """Self-directed actions, active days.""" + metrics = [] + + # Autonomous responses in last 24h + row = conn.execute( + "SELECT COUNT(*) as cnt FROM response_audit " + "WHERE agent = ? AND timestamp > datetime('now', '-24 hours')", + (agent,), + ).fetchone() + metrics.append({"metric": "autonomous_responses_24h", "value": row["cnt"], "unit": "actions"}) + + # Active days in last 7 + row = conn.execute( + "SELECT COUNT(DISTINCT date(created_at)) as days FROM prs " + "WHERE agent = ? AND created_at > datetime('now', '-7 days')", + (agent,), + ).fetchone() + metrics.append({"metric": "active_days_7d", "value": row["days"], "unit": "days"}) + + return metrics + + +# --------------------------------------------------------------------------- +# Dimension 7: infrastructure_health — "Is the agent's machinery working?" +# --------------------------------------------------------------------------- + +def collect_infrastructure_health(conn: sqlite3.Connection, agent: str) -> list[dict]: + """Circuit breakers, PR success rate, agent-state liveness.""" + metrics = [] + + # Circuit breakers + rows = conn.execute( + "SELECT name, state FROM circuit_breakers WHERE name LIKE ?", + (f"%{agent}%",), + ).fetchall() + open_breakers = sum(1 for r in rows if r["state"] != "closed") + metrics.append({"metric": "open_circuit_breakers", "value": open_breakers, "unit": "breakers"}) + + # PR success rate last 7 days + row = conn.execute( + "SELECT COUNT(*) as total, " + "SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as merged " + "FROM prs WHERE agent = ? AND created_at > datetime('now', '-7 days')", + (agent,), + ).fetchone() + total = row["total"] + rate = row["merged"] / total if total > 0 else 0 + metrics.append({"metric": "merge_rate_7d", "value": round(rate, 4), "unit": "ratio"}) + + # Agent-state liveness (read metrics.json from filesystem) + state_file = AGENT_STATE_DIR / agent / "metrics.json" + if state_file.exists(): + try: + with open(state_file) as f: + state = json.load(f) + lifetime = state.get("lifetime", {}) + metrics.append({ + "metric": "sessions_total", + "value": lifetime.get("sessions_total", 0), + "unit": "sessions", + }) + metrics.append({ + "metric": "sessions_timeout", + "value": lifetime.get("sessions_timeout", 0), + "unit": "sessions", + }) + metrics.append({ + "metric": "sessions_error", + "value": lifetime.get("sessions_error", 0), + "unit": "sessions", + }) + except (json.JSONDecodeError, OSError) as e: + logger.warning("Failed to read agent-state for %s: %s", agent, e) + + return metrics + + +# --------------------------------------------------------------------------- +# Dimensions 8-10: Stubs (no data sources yet) +# --------------------------------------------------------------------------- + +def collect_social_reach(agent: str) -> list[dict]: + """Social dimension: stub zeros until X API accounts are active.""" + return [ + {"metric": "followers", "value": 0, "unit": "followers"}, + {"metric": "impressions_7d", "value": 0, "unit": "impressions"}, + {"metric": "engagement_rate", "value": 0, "unit": "ratio"}, + ] + + +def collect_capital(agent: str) -> list[dict]: + """Capital dimension: stub zeros until treasury/revenue tracking exists.""" + return [ + {"metric": "aum", "value": 0, "unit": "USD"}, + {"metric": "treasury", "value": 0, "unit": "USD"}, + ] + + +def collect_external_impact(agent: str) -> list[dict]: + """External impact dimension: stub zeros until manual tracking exists.""" + return [ + {"metric": "decisions_informed", "value": 0, "unit": "decisions"}, + {"metric": "deals_sourced", "value": 0, "unit": "deals"}, + ] + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + +DIMENSION_MAP = { + "knowledge_output": lambda conn, ci, agent: collect_knowledge_output(conn, agent), + "knowledge_quality": collect_knowledge_quality, + "contributor_engagement": lambda conn, ci, agent: collect_contributor_engagement(conn, agent), + "review_performance": lambda conn, ci, agent: collect_review_performance(conn, agent), + "spend_efficiency": lambda conn, ci, agent: collect_spend_efficiency(conn, agent), + "autonomy": lambda conn, ci, agent: collect_autonomy(conn, agent), + "infrastructure_health": lambda conn, ci, agent: collect_infrastructure_health(conn, agent), + "social_reach": lambda conn, ci, agent: collect_social_reach(agent), + "capital": lambda conn, ci, agent: collect_capital(agent), + "external_impact": lambda conn, ci, agent: collect_external_impact(agent), +} + + +def collect_all_for_agent( + db_path: str, + agent: str, + claim_index_url: str = "http://localhost:8080/claim-index", +) -> dict: + """Collect all 10 vitality dimensions for a single agent. + Returns {dimension: [metrics]}. + """ + claim_index = _fetch_claim_index(claim_index_url) + conn = _ro_conn(db_path) + try: + result = {} + for dim_key, collector in DIMENSION_MAP.items(): + try: + result[dim_key] = collector(conn, claim_index, agent) + except Exception as e: + logger.error("collector %s failed for %s: %s", dim_key, agent, e) + result[dim_key] = [] + return result + finally: + conn.close() + + +def collect_system_aggregate( + db_path: str, + claim_index_url: str = "http://localhost:8080/claim-index", +) -> dict: + """System-level aggregate vitality metrics.""" + claim_index = _fetch_claim_index(claim_index_url) + conn = _ro_conn(db_path) + try: + metrics = {} + + # Knowledge totals + total_claims = claim_index["total_claims"] if claim_index else 0 + orphan_ratio = claim_index.get("orphan_ratio", 0) if claim_index else 0 + domain_count = len(claim_index.get("domains", {})) if claim_index else 0 + + metrics["knowledge_output"] = [ + {"metric": "total_claims", "value": total_claims, "unit": "claims"}, + {"metric": "total_domains", "value": domain_count, "unit": "domains"}, + {"metric": "orphan_ratio", "value": round(orphan_ratio, 4), "unit": "ratio"}, + ] + + # Cross-domain citation rate + if claim_index: + claims = claim_index.get("claims", []) + total_links = sum(c.get("outgoing_count", 0) for c in claims) + cross_domain = 0 + for c in claims: + src_domain = c.get("domain") + for link in c.get("outgoing_links", []): + linked_claims = [ + x for x in claims + if x.get("stem") in link or x.get("file", "").endswith(link + ".md") + ] + for lc in linked_claims: + if lc.get("domain") != src_domain: + cross_domain += 1 + metrics["knowledge_quality"] = [ + {"metric": "cross_domain_citation_rate", + "value": round(cross_domain / max(total_links, 1), 4), + "unit": "ratio"}, + ] + + # Pipeline throughput + row = conn.execute( + "SELECT COUNT(*) as merged FROM prs " + "WHERE status='merged' AND merged_at > datetime('now', '-24 hours')" + ).fetchone() + row2 = conn.execute("SELECT COUNT(*) as total FROM sources").fetchone() + row3 = conn.execute( + "SELECT COUNT(*) as pending FROM prs " + "WHERE status NOT IN ('merged','rejected','closed')" + ).fetchone() + + metrics["infrastructure_health"] = [ + {"metric": "prs_merged_24h", "value": row["merged"], "unit": "PRs/day"}, + {"metric": "total_sources", "value": row2["total"], "unit": "sources"}, + {"metric": "queue_depth", "value": row3["pending"], "unit": "PRs"}, + ] + + # Total spend + row = conn.execute( + "SELECT COALESCE(SUM(cost_usd), 0) as cost " + "FROM costs WHERE date > date('now', '-1 day')" + ).fetchone() + row2 = conn.execute( + "SELECT COALESCE(SUM(generation_cost), 0) as cost FROM response_audit " + "WHERE timestamp > datetime('now', '-24 hours')" + ).fetchone() + metrics["spend_efficiency"] = [ + {"metric": "pipeline_cost_24h", "value": round(row["cost"], 4), "unit": "USD"}, + {"metric": "response_cost_24h", "value": round(row2["cost"], 4), "unit": "USD"}, + {"metric": "total_cost_24h", + "value": round(row["cost"] + row2["cost"], 4), "unit": "USD"}, + ] + + # Stubs + metrics["social_reach"] = [{"metric": "total_followers", "value": 0, "unit": "followers"}] + metrics["capital"] = [{"metric": "total_aum", "value": 0, "unit": "USD"}] + + return metrics + finally: + conn.close() + + +def record_snapshot( + db_path: str, + claim_index_url: str = "http://localhost:8080/claim-index", +): + """Run a full vitality snapshot — one row per agent per dimension per metric.""" + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + rows = [] + + # Per-agent snapshots + for agent in ALL_AGENTS: + try: + dimensions = collect_all_for_agent(db_path, agent, claim_index_url) + for dim_name, metrics in dimensions.items(): + collector_name = f"{dim_name}_collector" + for m in metrics: + rows.append(( + agent, dim_name, m["metric"], m["value"], + m["unit"], collector_name, now, + )) + except Exception as e: + logger.error("vitality collection failed for %s: %s", agent, e) + + # System aggregate + try: + system = collect_system_aggregate(db_path, claim_index_url) + for dim_name, metrics in system.items(): + for m in metrics: + rows.append(( + "_system", dim_name, m["metric"], m["value"], + m["unit"], "system_aggregate", now, + )) + except Exception as e: + logger.error("vitality system aggregate failed: %s", e) + + # Write all rows + ensure_schema(db_path) + conn = sqlite3.connect(db_path, timeout=30) + try: + conn.executemany( + "INSERT OR REPLACE INTO vitality_snapshots " + "(agent_name, dimension, metric, value, unit, source, recorded_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + rows, + ) + conn.commit() + logger.info( + "vitality snapshot recorded: %d rows for %d agents + system", + len(rows), len(ALL_AGENTS), + ) + return {"rows_written": len(rows), "agents": len(ALL_AGENTS), "recorded_at": now} + finally: + conn.close() + + +if __name__ == "__main__": + """CLI: python3 vitality.py [db_path] — runs a snapshot.""" + import sys + logging.basicConfig(level=logging.INFO) + db = sys.argv[1] if len(sys.argv) > 1 else "/opt/teleo-eval/pipeline/pipeline.db" + result = record_snapshot(db) + print(json.dumps(result, indent=2)) diff --git a/ops/diagnostics/vitality_routes.py b/ops/diagnostics/vitality_routes.py new file mode 100644 index 000000000..f2799a13c --- /dev/null +++ b/ops/diagnostics/vitality_routes.py @@ -0,0 +1,293 @@ +"""Vitality API routes for Argus diagnostics dashboard. + +Endpoints: + GET /api/vitality — latest snapshot + time-series for all agents or one + GET /api/vitality/snapshot — trigger a new snapshot (POST-like via GET for cron curl) + GET /api/vitality/leaderboard — agents ranked by composite vitality score + +Owner: Argus +""" + +import json +import logging +import sqlite3 +from pathlib import Path + +from aiohttp import web + +from vitality import ( + ALL_AGENTS, + MIGRATION_SQL, + collect_all_for_agent, + collect_system_aggregate, + record_snapshot, +) + +logger = logging.getLogger("argus.vitality") + +# Composite vitality weights — Leo-approved 2026-04-08 +# Dimension keys match Ship's refactored vitality.py DIMENSION_MAP +VITALITY_WEIGHTS = { + "knowledge_output": 0.30, # primary output — highest weight + "knowledge_quality": 0.20, # was "diversity" — quality of output + "contributor_engagement": 0.15, # attracting external contributors + "review_performance": 0.00, # new dim, zero until review_records populated + "autonomy": 0.15, # independent action + "infrastructure_health": 0.05, # machinery working + "spend_efficiency": 0.05, # cost discipline + "social_reach": 0.00, # zero until accounts active + "capital": 0.00, # zero until treasury exists + "external_impact": 0.00, # zero until measurable +} + +# Public paths (no auth required) +VITALITY_PUBLIC_PATHS = frozenset({ + "/api/vitality", + "/api/vitality/snapshot", + "/api/vitality/leaderboard", +}) + + +def _ro_conn(db_path: str) -> sqlite3.Connection: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30) + conn.row_factory = sqlite3.Row + return conn + + +async def handle_vitality(request: web.Request) -> web.Response: + """GET /api/vitality?agent=&days=7 + + Returns latest snapshot and time-series data. + If agent is specified, returns that agent only. Otherwise returns all. + """ + db_path = request.app["db_path"] + agent = request.query.get("agent") + try: + days = min(int(request.query.get("days", "7")), 90) + except ValueError: + days = 7 + + conn = _ro_conn(db_path) + try: + # Check if table exists + table_check = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'" + ).fetchone() + if not table_check: + return web.json_response({ + "error": "No vitality data yet. Trigger a snapshot first via /api/vitality/snapshot", + "has_data": False + }) + + # Latest snapshot timestamp + latest = conn.execute( + "SELECT MAX(recorded_at) as ts FROM vitality_snapshots" + ).fetchone() + latest_ts = latest["ts"] if latest else None + + if not latest_ts: + return web.json_response({"has_data": False}) + + # Latest snapshot data + if agent: + agents_filter = [agent] + else: + agents_filter = ALL_AGENTS + ["_system"] + + result = {"latest_snapshot": latest_ts, "agents": {}} + + for a in agents_filter: + rows = conn.execute( + "SELECT dimension, metric, value, unit FROM vitality_snapshots " + "WHERE agent_name = ? AND recorded_at = ?", + (a, latest_ts) + ).fetchall() + + if not rows: + continue + + dimensions = {} + for r in rows: + dim = r["dimension"] + if dim not in dimensions: + dimensions[dim] = [] + dimensions[dim].append({ + "metric": r["metric"], + "value": r["value"], + "unit": r["unit"], + }) + result["agents"][a] = dimensions + + # Time-series for trend charts (one data point per snapshot) + ts_query_agent = agent if agent else "_system" + ts_rows = conn.execute( + "SELECT recorded_at, dimension, metric, value " + "FROM vitality_snapshots " + "WHERE agent_name = ? AND recorded_at > datetime('now', ?)" + "ORDER BY recorded_at", + (ts_query_agent, f"-{days} days") + ).fetchall() + + time_series = {} + for r in ts_rows: + key = f"{r['dimension']}.{r['metric']}" + if key not in time_series: + time_series[key] = [] + time_series[key].append({ + "t": r["recorded_at"], + "v": r["value"], + }) + result["time_series"] = time_series + result["has_data"] = True + + return web.json_response(result) + finally: + conn.close() + + +async def handle_vitality_snapshot(request: web.Request) -> web.Response: + """GET /api/vitality/snapshot — trigger a new snapshot collection. + + Used by cron: curl http://localhost:8081/api/vitality/snapshot + Requires ?confirm=1 to prevent accidental triggers from crawlers/prefetch. + """ + if request.query.get("confirm") != "1": + return web.json_response( + {"status": "noop", "error": "Add ?confirm=1 to trigger a snapshot write"}, + status=400, + ) + db_path = request.app["db_path"] + claim_index_url = request.app.get("claim_index_url", "http://localhost:8080/claim-index") + + try: + result = record_snapshot(db_path, claim_index_url) + return web.json_response({"status": "ok", **result}) + except Exception as e: + logger.error("vitality snapshot failed: %s", e) + return web.json_response({"status": "error", "error": str(e)}, status=500) + + +async def handle_vitality_leaderboard(request: web.Request) -> web.Response: + """GET /api/vitality/leaderboard — agents ranked by composite vitality score. + + Scoring approach: + - Each dimension gets a 0-1 normalized score based on the metric values + - Weighted sum produces composite score + - Agents ranked by composite score descending + """ + db_path = request.app["db_path"] + conn = _ro_conn(db_path) + try: + table_check = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'" + ).fetchone() + if not table_check: + return web.json_response({"error": "No vitality data yet", "has_data": False}) + + latest = conn.execute( + "SELECT MAX(recorded_at) as ts FROM vitality_snapshots" + ).fetchone() + if not latest or not latest["ts"]: + return web.json_response({"has_data": False}) + + latest_ts = latest["ts"] + + # Collect all agents' latest data + agent_scores = [] + for agent in ALL_AGENTS: + rows = conn.execute( + "SELECT dimension, metric, value FROM vitality_snapshots " + "WHERE agent_name = ? AND recorded_at = ?", + (agent, latest_ts) + ).fetchall() + if not rows: + continue + + dims = {} + for r in rows: + dim = r["dimension"] + if dim not in dims: + dims[dim] = {} + dims[dim][r["metric"]] = r["value"] + + # Normalize each dimension to 0-1 + # Dimension keys match Ship's refactored vitality.py DIMENSION_MAP + dim_scores = {} + + # knowledge_output: claims_merged (cap at 100 = 1.0) + ko = dims.get("knowledge_output", {}) + claims = ko.get("claims_merged", 0) + dim_scores["knowledge_output"] = min(claims / 100, 1.0) + + # knowledge_quality: challenge_rate + breadth + evidence_density + domain_coverage + kq = dims.get("knowledge_quality", {}) + cr = kq.get("challenge_rate", 0) + breadth = kq.get("activity_breadth", 0) + evidence = kq.get("evidence_density", 0) + coverage = kq.get("domain_coverage", 0) + dim_scores["knowledge_quality"] = min( + (cr / 0.1 * 0.2 + breadth / 4 * 0.2 + evidence * 0.3 + coverage * 0.3), 1.0 + ) + + # contributor_engagement: unique_submitters (cap at 5 = 1.0) + ce = dims.get("contributor_engagement", {}) + dim_scores["contributor_engagement"] = min(ce.get("unique_submitters", 0) / 5, 1.0) + + # review_performance: approval_rate from review_records (0 until populated) + rp = dims.get("review_performance", {}) + dim_scores["review_performance"] = rp.get("approval_rate", 0) + + # autonomy: active_days_7d (7 = 1.0) + am = dims.get("autonomy", {}) + dim_scores["autonomy"] = min(am.get("active_days_7d", 0) / 7, 1.0) + + # infrastructure_health: merge_rate_7d directly (already 0-1) + ih = dims.get("infrastructure_health", {}) + dim_scores["infrastructure_health"] = ih.get("merge_rate_7d", 0) + + # spend_efficiency: inverted — lower cost per claim is better + se = dims.get("spend_efficiency", {}) + daily_cost = se.get("response_cost_24h", 0) + dim_scores["spend_efficiency"] = max(1.0 - daily_cost / 10.0, 0) + + # Social/Capital/External: stubbed at 0 + dim_scores["social_reach"] = 0 + dim_scores["capital"] = 0 + dim_scores["external_impact"] = 0 + + # Composite weighted score + composite = sum( + dim_scores.get(dim, 0) * weight + for dim, weight in VITALITY_WEIGHTS.items() + ) + + agent_scores.append({ + "agent": agent, + "composite_score": round(composite, 4), + "dimension_scores": {k: round(v, 4) for k, v in dim_scores.items()}, + "raw_highlights": { + "claims_merged": int(claims), + "merge_rate": round(ih.get("merge_rate_7d", 0) * 100, 1), + "active_days": int(am.get("active_days_7d", 0)), + "challenge_rate": round(cr * 100, 1), + "evidence_density": round(evidence * 100, 1), + }, + }) + + # Sort by composite score descending + agent_scores.sort(key=lambda x: x["composite_score"], reverse=True) + + return web.json_response({ + "has_data": True, + "snapshot_at": latest_ts, + "leaderboard": agent_scores, + }) + finally: + conn.close() + + +def register_vitality_routes(app: web.Application): + """Register vitality endpoints on the aiohttp app.""" + app.router.add_get("/api/vitality", handle_vitality) + app.router.add_get("/api/vitality/snapshot", handle_vitality_snapshot) + app.router.add_get("/api/vitality/leaderboard", handle_vitality_leaderboard)