"""Vitality API routes for Argus diagnostics dashboard. Endpoints: GET /api/vitality — latest snapshot + time-series for all agents or one GET /api/vitality/snapshot — trigger a new snapshot (POST-like via GET for cron curl) GET /api/vitality/leaderboard — agents ranked by composite vitality score Owner: Argus """ import json import logging import sqlite3 from pathlib import Path from aiohttp import web from vitality import ( ALL_AGENTS, MIGRATION_SQL, collect_all_for_agent, collect_system_aggregate, record_snapshot, ) logger = logging.getLogger("argus.vitality") # Composite vitality weights — Leo-approved 2026-04-08 # Dimension keys match Ship's refactored vitality.py DIMENSION_MAP VITALITY_WEIGHTS = { "knowledge_output": 0.30, # primary output — highest weight "knowledge_quality": 0.20, # was "diversity" — quality of output "contributor_engagement": 0.15, # attracting external contributors "review_performance": 0.00, # new dim, zero until review_records populated "autonomy": 0.15, # independent action "infrastructure_health": 0.05, # machinery working "spend_efficiency": 0.05, # cost discipline "social_reach": 0.00, # zero until accounts active "capital": 0.00, # zero until treasury exists "external_impact": 0.00, # zero until measurable } # Public paths (no auth required) VITALITY_PUBLIC_PATHS = frozenset({ "/api/vitality", "/api/vitality/snapshot", "/api/vitality/leaderboard", }) def _ro_conn(db_path: str) -> sqlite3.Connection: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30) conn.row_factory = sqlite3.Row return conn async def handle_vitality(request: web.Request) -> web.Response: """GET /api/vitality?agent=&days=7 Returns latest snapshot and time-series data. If agent is specified, returns that agent only. Otherwise returns all. """ db_path = request.app["db_path"] agent = request.query.get("agent") try: days = min(int(request.query.get("days", "7")), 90) except ValueError: days = 7 conn = _ro_conn(db_path) try: # Check if table exists table_check = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'" ).fetchone() if not table_check: return web.json_response({ "error": "No vitality data yet. Trigger a snapshot first via /api/vitality/snapshot", "has_data": False }) # Latest snapshot timestamp latest = conn.execute( "SELECT MAX(recorded_at) as ts FROM vitality_snapshots" ).fetchone() latest_ts = latest["ts"] if latest else None if not latest_ts: return web.json_response({"has_data": False}) # Latest snapshot data if agent: agents_filter = [agent] else: agents_filter = ALL_AGENTS + ["_system"] result = {"latest_snapshot": latest_ts, "agents": {}} for a in agents_filter: rows = conn.execute( "SELECT dimension, metric, value, unit FROM vitality_snapshots " "WHERE agent_name = ? AND recorded_at = ?", (a, latest_ts) ).fetchall() if not rows: continue dimensions = {} for r in rows: dim = r["dimension"] if dim not in dimensions: dimensions[dim] = [] dimensions[dim].append({ "metric": r["metric"], "value": r["value"], "unit": r["unit"], }) result["agents"][a] = dimensions # Time-series for trend charts (one data point per snapshot) ts_query_agent = agent if agent else "_system" ts_rows = conn.execute( "SELECT recorded_at, dimension, metric, value " "FROM vitality_snapshots " "WHERE agent_name = ? AND recorded_at > datetime('now', ?)" "ORDER BY recorded_at", (ts_query_agent, f"-{days} days") ).fetchall() time_series = {} for r in ts_rows: key = f"{r['dimension']}.{r['metric']}" if key not in time_series: time_series[key] = [] time_series[key].append({ "t": r["recorded_at"], "v": r["value"], }) result["time_series"] = time_series result["has_data"] = True return web.json_response(result) finally: conn.close() async def handle_vitality_snapshot(request: web.Request) -> web.Response: """GET /api/vitality/snapshot — trigger a new snapshot collection. Used by cron: curl http://localhost:8081/api/vitality/snapshot Requires ?confirm=1 to prevent accidental triggers from crawlers/prefetch. """ if request.query.get("confirm") != "1": return web.json_response( {"status": "noop", "error": "Add ?confirm=1 to trigger a snapshot write"}, status=400, ) db_path = request.app["db_path"] claim_index_url = request.app.get("claim_index_url", "http://localhost:8080/claim-index") try: result = record_snapshot(db_path, claim_index_url) return web.json_response({"status": "ok", **result}) except Exception as e: logger.error("vitality snapshot failed: %s", e) return web.json_response({"status": "error", "error": str(e)}, status=500) async def handle_vitality_leaderboard(request: web.Request) -> web.Response: """GET /api/vitality/leaderboard — agents ranked by composite vitality score. Scoring approach: - Each dimension gets a 0-1 normalized score based on the metric values - Weighted sum produces composite score - Agents ranked by composite score descending """ db_path = request.app["db_path"] conn = _ro_conn(db_path) try: table_check = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'" ).fetchone() if not table_check: return web.json_response({"error": "No vitality data yet", "has_data": False}) latest = conn.execute( "SELECT MAX(recorded_at) as ts FROM vitality_snapshots" ).fetchone() if not latest or not latest["ts"]: return web.json_response({"has_data": False}) latest_ts = latest["ts"] # Collect all agents' latest data agent_scores = [] for agent in ALL_AGENTS: rows = conn.execute( "SELECT dimension, metric, value FROM vitality_snapshots " "WHERE agent_name = ? AND recorded_at = ?", (agent, latest_ts) ).fetchall() if not rows: continue dims = {} for r in rows: dim = r["dimension"] if dim not in dims: dims[dim] = {} dims[dim][r["metric"]] = r["value"] # Normalize each dimension to 0-1 # Dimension keys match Ship's refactored vitality.py DIMENSION_MAP dim_scores = {} # knowledge_output: claims_merged (cap at 100 = 1.0) ko = dims.get("knowledge_output", {}) claims = ko.get("claims_merged", 0) dim_scores["knowledge_output"] = min(claims / 100, 1.0) # knowledge_quality: challenge_rate + breadth + evidence_density + domain_coverage kq = dims.get("knowledge_quality", {}) cr = kq.get("challenge_rate", 0) breadth = kq.get("activity_breadth", 0) evidence = kq.get("evidence_density", 0) coverage = kq.get("domain_coverage", 0) dim_scores["knowledge_quality"] = min( (cr / 0.1 * 0.2 + breadth / 4 * 0.2 + evidence * 0.3 + coverage * 0.3), 1.0 ) # contributor_engagement: unique_submitters (cap at 5 = 1.0) ce = dims.get("contributor_engagement", {}) dim_scores["contributor_engagement"] = min(ce.get("unique_submitters", 0) / 5, 1.0) # review_performance: approval_rate from review_records (0 until populated) rp = dims.get("review_performance", {}) dim_scores["review_performance"] = rp.get("approval_rate", 0) # autonomy: active_days_7d (7 = 1.0) am = dims.get("autonomy", {}) dim_scores["autonomy"] = min(am.get("active_days_7d", 0) / 7, 1.0) # infrastructure_health: merge_rate_7d directly (already 0-1) ih = dims.get("infrastructure_health", {}) dim_scores["infrastructure_health"] = ih.get("merge_rate_7d", 0) # spend_efficiency: inverted — lower cost per claim is better se = dims.get("spend_efficiency", {}) daily_cost = se.get("response_cost_24h", 0) dim_scores["spend_efficiency"] = max(1.0 - daily_cost / 10.0, 0) # Social/Capital/External: stubbed at 0 dim_scores["social_reach"] = 0 dim_scores["capital"] = 0 dim_scores["external_impact"] = 0 # Composite weighted score composite = sum( dim_scores.get(dim, 0) * weight for dim, weight in VITALITY_WEIGHTS.items() ) agent_scores.append({ "agent": agent, "composite_score": round(composite, 4), "dimension_scores": {k: round(v, 4) for k, v in dim_scores.items()}, "raw_highlights": { "claims_merged": int(claims), "merge_rate": round(ih.get("merge_rate_7d", 0) * 100, 1), "active_days": int(am.get("active_days_7d", 0)), "challenge_rate": round(cr * 100, 1), "evidence_density": round(evidence * 100, 1), }, }) # Sort by composite score descending agent_scores.sort(key=lambda x: x["composite_score"], reverse=True) return web.json_response({ "has_data": True, "snapshot_at": latest_ts, "leaderboard": agent_scores, }) finally: conn.close() def register_vitality_routes(app: web.Application): """Register vitality endpoints on the aiohttp app.""" app.router.add_get("/api/vitality", handle_vitality) app.router.add_get("/api/vitality/snapshot", handle_vitality_snapshot) app.router.add_get("/api/vitality/leaderboard", handle_vitality_leaderboard)