Compare commits

...

2 commits

Author SHA1 Message Date
380ebd9124 Add research_tracking.py to diagnostics (Phase 1 consolidation)
Argus's research lifecycle tracking module. Was in root diagnostics/
only — missing from both repos. Completes Phase 1 file inventory.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 10:15:27 +02:00
c0cc4ef090 Add vitality modules + upgrade alerting with SQL injection protection
- vitality.py (25K): 10-dimension vitality scoring (Ship + Argus, Leo-approved)
- vitality_routes.py (10K): Dashboard routes for vitality endpoints
- alerting.py: Updated with _ALLOWED_DIM_EXPRS SQL injection protection, stricter dim_expr validation
- alerting_routes.py: Added proper try/finally/conn.close, ValueError catch on hours param
- Diff log documenting multi-copy resolution decisions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 10:12:53 +02:00
6 changed files with 1417 additions and 13 deletions

View file

@ -0,0 +1,47 @@
# Diagnostics Consolidation Diff Log
# Branch: epimetheus/consolidate-infra
# Date: 2026-04-13
## Files with multiple copies — resolution
### alerting.py
- ROOT diagnostics/alerting.py (22320 bytes) — KEPT (newer: has _ALLOWED_DIM_EXPRS SQL injection protection, stricter dim_expr validation)
- ops/diagnostics/alerting.py (22039 bytes) — OVERWRITTEN (missing SQL injection guards)
- VPS /opt/teleo-eval/diagnostics/alerting.py (22039 bytes) — matches ops/ version, needs deploy
### alerting_routes.py
- ROOT diagnostics/alerting_routes.py (4216 bytes) — KEPT (newer: proper try/finally/conn.close, ValueError catch on hours param)
- ops/diagnostics/alerting_routes.py (4043 bytes) — OVERWRITTEN (missing error handling, missing conn.close)
- VPS /opt/teleo-eval/diagnostics/alerting_routes.py (4043 bytes) — matches ops/ version, needs deploy
### vitality.py
- ROOT diagnostics/vitality.py (25548 bytes) — KEPT (only copy in repo, larger than VPS)
- VPS /opt/teleo-eval/diagnostics/vitality.py (18539 bytes) — older version, needs deploy
- MOVED TO: ops/diagnostics/vitality.py
### vitality_routes.py
- ROOT diagnostics/vitality_routes.py (10824 bytes) — KEPT (only copy in repo, larger than VPS)
- VPS /opt/teleo-eval/diagnostics/vitality_routes.py (9729 bytes) — older version, needs deploy
- MOVED TO: ops/diagnostics/vitality_routes.py
## Files moved
| From | To | Reason |
|------|-----|--------|
| diagnostics/vitality.py | ops/diagnostics/vitality.py | Consolidate to canonical location |
| diagnostics/vitality_routes.py | ops/diagnostics/vitality_routes.py | Consolidate to canonical location |
| diagnostics/alerting.py | ops/diagnostics/alerting.py | Newer version overwrites older |
| diagnostics/alerting_routes.py | ops/diagnostics/alerting_routes.py | Newer version overwrites older |
## Root diagnostics/ after consolidation
- PATCH_INSTRUCTIONS.md — kept (documentation, not code)
- evolution.md — kept (documentation)
- weekly/2026-03-25-week3.md — kept (report)
- ops/sessions/*.json — kept (session data)
- All .py files REMOVED from root diagnostics/
## VPS .bak files inventory (30+ files)
All in /opt/teleo-eval/diagnostics/. Git is the backup now. Safe to delete after consolidation verified.
## VPS deploy needed after merge
alerting.py, alerting_routes.py, vitality.py, vitality_routes.py — all local versions are newer than VPS.

View file

@ -157,8 +157,17 @@ def check_quality_regression(conn: sqlite3.Connection) -> list[dict]:
return alerts return alerts
_ALLOWED_DIM_EXPRS = frozenset({
"json_extract(detail, '$.agent')",
"json_extract(detail, '$.domain')",
"COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))",
})
def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr): def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr):
"""Check approval rate regression grouped by a dimension (agent or domain).""" """Check approval rate regression grouped by a dimension. dim_expr must be in _ALLOWED_DIM_EXPRS."""
if dim_expr not in _ALLOWED_DIM_EXPRS:
raise ValueError(f"untrusted dim_expr: {dim_expr}")
# 7-day baseline per dimension # 7-day baseline per dimension
baseline_rows = conn.execute( baseline_rows = conn.execute(
f"""SELECT {dim_expr} as dim_val, f"""SELECT {dim_expr} as dim_val,
@ -468,7 +477,7 @@ def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 2
FROM audit_log, json_each(json_extract(detail, '$.issues')) FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate' WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected') AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) = ? AND json_extract(detail, '$.agent') = ?
AND timestamp > datetime('now', ? || ' hours') AND timestamp > datetime('now', ? || ' hours')
GROUP BY tag ORDER BY cnt DESC GROUP BY tag ORDER BY cnt DESC
LIMIT 5""", LIMIT 5""",

View file

@ -26,13 +26,6 @@ async def handle_check(request):
conn = request.app["_alerting_conn_func"]() conn = request.app["_alerting_conn_func"]()
try: try:
alerts = run_all_checks(conn) alerts = run_all_checks(conn)
except Exception as e:
logger.error("Check failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
global _active_alerts, _last_check
_active_alerts = alerts
_last_check = datetime.now(timezone.utc).isoformat()
# Generate failure reports for agents with stuck loops # Generate failure reports for agents with stuck loops
failure_reports = {} failure_reports = {}
@ -41,6 +34,15 @@ async def handle_check(request):
report = generate_failure_report(conn, agent) report = generate_failure_report(conn, agent)
if report: if report:
failure_reports[agent] = report failure_reports[agent] = report
except Exception as e:
logger.error("Check failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
finally:
conn.close()
global _active_alerts, _last_check
_active_alerts = alerts
_last_check = datetime.now(timezone.utc).isoformat()
result = { result = {
"checked_at": _last_check, "checked_at": _last_check,
@ -104,10 +106,15 @@ async def handle_api_failure_report(request):
hours: lookback window (default 24) hours: lookback window (default 24)
""" """
agent = request.match_info["agent"] agent = request.match_info["agent"]
hours = int(request.query.get("hours", "24")) try:
hours = min(int(request.query.get("hours", "24")), 168)
except ValueError:
hours = 24
conn = request.app["_alerting_conn_func"]() conn = request.app["_alerting_conn_func"]()
try:
report = generate_failure_report(conn, agent, hours) report = generate_failure_report(conn, agent, hours)
finally:
conn.close()
if not report: if not report:
return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours}) return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours})

View file

@ -0,0 +1,419 @@
"""Research session tracking + cost attribution for the Teleo pipeline.
This module adds three capabilities:
1. research_sessions table tracks WHY agents researched, what they found interesting,
session cost, and links to generated sources
2. Extraction cost attribution writes per-source cost to sources.cost_usd after extraction
3. Source claim linkage ensures prs.source_path is always populated
Designed for Epimetheus to integrate into the pipeline. Argus built the spec;
Ganymede reviews; Epimetheus wires it in.
Data flow:
Agent research session research_sessions row (with reasoning + summary)
sources created (with session_id FK)
extraction runs (cost written to sources.cost_usd + costs table)
PRs created (source_path populated)
claims merged (traceable back to session)
"""
import json
import logging
import sqlite3
from datetime import datetime
from typing import Optional
logger = logging.getLogger("research_tracking")
# ---------------------------------------------------------------------------
# Migration v11: research_sessions table + sources.session_id FK
# (v9 is current; v10 is Epimetheus's eval pipeline migration)
# ---------------------------------------------------------------------------
MIGRATION_V11_SQL = """
-- Research session tracking table
CREATE TABLE IF NOT EXISTS research_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
-- Which agent ran the research (leo, rio, astra, etc.)
domain TEXT,
-- Primary domain of the research
topic TEXT NOT NULL,
-- What they researched (short description)
reasoning TEXT,
-- WHY they chose this topic (agent's own explanation)
summary TEXT,
-- What they found most interesting/relevant
sources_planned INTEGER DEFAULT 0,
-- How many sources they intended to produce
sources_produced INTEGER DEFAULT 0,
-- How many actually materialized
model TEXT,
-- Model used for research (e.g. claude-opus-4-6)
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
-- Total research session cost (LLM calls for discovery + writing)
status TEXT DEFAULT 'running',
-- running, completed, failed, partial
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
metadata TEXT DEFAULT '{}'
-- JSON: any extra context (prompt version, search queries used, etc.)
);
CREATE INDEX IF NOT EXISTS idx_rs_agent ON research_sessions(agent);
CREATE INDEX IF NOT EXISTS idx_rs_domain ON research_sessions(domain);
CREATE INDEX IF NOT EXISTS idx_rs_started ON research_sessions(started_at);
-- Add session_id FK to sources table
ALTER TABLE sources ADD COLUMN session_id INTEGER REFERENCES research_sessions(id);
CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
-- Record migration
INSERT INTO schema_version (version) VALUES (11);
"""
# ---------------------------------------------------------------------------
# Cost attribution: write extraction cost to sources.cost_usd
# ---------------------------------------------------------------------------
# Pricing per million tokens (as of March 2026)
MODEL_PRICING = {
"anthropic/claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
"anthropic/claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
"anthropic/claude-haiku-4.5": {"input": 0.80, "output": 4.00},
"anthropic/claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
"minimax/minimax-m2.5": {"input": 0.14, "output": 0.56},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate USD cost from model name and token counts."""
pricing = MODEL_PRICING.get(model)
if not pricing:
# Default to Sonnet 4.5 pricing as conservative estimate
logger.warning("Unknown model %s — using Sonnet 4.5 pricing", model)
pricing = {"input": 3.00, "output": 15.00}
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
def record_extraction_cost(
conn: sqlite3.Connection,
source_path: str,
model: str,
input_tokens: int,
output_tokens: int,
):
"""Write extraction cost to both sources.cost_usd and costs table.
Call this after each successful extraction call in openrouter-extract-v2.py.
This is the missing link the CSV logger records tokens but never writes
cost back to the DB.
"""
cost = calculate_cost(model, input_tokens, output_tokens)
# Update source row
conn.execute(
"UPDATE sources SET cost_usd = cost_usd + ?, extraction_model = ? WHERE path = ?",
(cost, model, source_path),
)
# Also record in costs table for dashboard aggregation
date = datetime.utcnow().strftime("%Y-%m-%d")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, input_tokens, output_tokens, cost),
)
conn.commit()
logger.info(
"Recorded extraction cost for %s: $%.4f (%d in, %d out, %s)",
source_path, cost, input_tokens, output_tokens, model,
)
return cost
# ---------------------------------------------------------------------------
# Research session lifecycle
# ---------------------------------------------------------------------------
def start_session(
conn: sqlite3.Connection,
agent: str,
topic: str,
domain: Optional[str] = None,
reasoning: Optional[str] = None,
sources_planned: int = 0,
model: Optional[str] = None,
metadata: Optional[dict] = None,
) -> int:
"""Call at the START of a research session. Returns session_id.
The agent should call this before it begins producing sources,
explaining what it plans to research and why.
"""
cur = conn.execute(
"""INSERT INTO research_sessions
(agent, domain, topic, reasoning, sources_planned, model, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
agent,
domain,
topic,
reasoning,
sources_planned,
model,
json.dumps(metadata or {}),
),
)
conn.commit()
session_id = cur.lastrowid
logger.info("Started research session #%d: %s / %s", session_id, agent, topic)
return session_id
def link_source_to_session(
conn: sqlite3.Connection,
source_path: str,
session_id: int,
):
"""Link a source file to its research session.
Call this when a source is written to inbox/ during a research session.
"""
conn.execute(
"UPDATE sources SET session_id = ? WHERE path = ?",
(session_id, source_path),
)
conn.execute(
"""UPDATE research_sessions
SET sources_produced = sources_produced + 1
WHERE id = ?""",
(session_id,),
)
conn.commit()
def complete_session(
conn: sqlite3.Connection,
session_id: int,
summary: str,
input_tokens: int = 0,
output_tokens: int = 0,
cost_usd: float = 0,
status: str = "completed",
):
"""Call at the END of a research session.
The agent should summarize what it found most interesting/relevant.
Cost should include ALL LLM calls made during the session (web search,
analysis, source writing everything).
"""
conn.execute(
"""UPDATE research_sessions
SET summary = ?, input_tokens = ?, output_tokens = ?,
cost_usd = ?, status = ?, completed_at = datetime('now')
WHERE id = ?""",
(summary, input_tokens, output_tokens, cost_usd, status, session_id),
)
conn.commit()
logger.info("Completed research session #%d: %s", session_id, status)
# ---------------------------------------------------------------------------
# Source → PR linkage fix
# ---------------------------------------------------------------------------
def ensure_source_path_on_pr(
conn: sqlite3.Connection,
pr_number: int,
source_path: str,
):
"""Ensure prs.source_path is populated. Call during PR creation.
Currently 0/1451 PRs have source_path set. This is the fix.
"""
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ? AND (source_path IS NULL OR source_path = '')",
(source_path, pr_number),
)
conn.commit()
# ---------------------------------------------------------------------------
# Backfill: attribute extraction costs from existing CSV log
# ---------------------------------------------------------------------------
def backfill_extraction_costs(conn: sqlite3.Connection, csv_path: str):
"""One-time backfill: read openrouter-usage.csv and write costs to sources + costs tables.
Run once to fill in the ~$338 of extraction costs that were logged to CSV
but never written to the database.
Safe to re-run only updates sources where cost_usd = 0, so partial
runs can be resumed without double-counting.
"""
import csv
count = 0
total_cost = 0.0
with open(csv_path) as f:
reader = csv.DictReader(f)
for row in reader:
source_file = row.get("source_file", "")
model = row.get("model", "")
try:
in_tok = int(row.get("input_tokens", 0) or 0)
out_tok = int(row.get("output_tokens", 0) or 0)
except (ValueError, TypeError):
continue
cost = calculate_cost(model, in_tok, out_tok)
if cost <= 0:
continue
# Try to match source_file to sources.path
# CSV has filename, DB has full path — match on exact suffix
# Use ORDER BY length(path) to prefer shortest (most specific) match
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? AND cost_usd = 0 ORDER BY length(path) LIMIT 1",
(f"%/{source_file}" if "/" not in source_file else f"%{source_file}",),
).fetchone()
if matched:
conn.execute(
"UPDATE sources SET cost_usd = ?, extraction_model = ? WHERE path = ?",
(cost, model, matched[0]),
)
# Always record in costs table
date = row.get("date", "unknown")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, in_tok, out_tok, cost),
)
count += 1
total_cost += cost
conn.commit()
logger.info("Backfilled %d extraction cost records, total $%.2f", count, total_cost)
return count, total_cost
# ---------------------------------------------------------------------------
# Backfill: populate prs.source_path from branch naming convention
# ---------------------------------------------------------------------------
def backfill_source_paths(conn: sqlite3.Connection):
"""One-time backfill: derive source_path for existing PRs from branch names.
Branch format: extract/YYYY-MM-DD-source-name or similar patterns.
Source path format: inbox/queue/YYYY-MM-DD-source-name.md
"""
rows = conn.execute(
"SELECT number, branch FROM prs WHERE source_path IS NULL AND branch IS NOT NULL"
).fetchall()
count = 0
for number, branch in rows:
# Try to extract source name from branch
# Common patterns: extract/source-name, claims/source-name
parts = branch.split("/", 1)
if len(parts) < 2:
continue
source_stem = parts[1]
# Try to find matching source in DB — exact suffix match, shortest path wins
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_stem}%" if source_stem else "",),
).fetchone()
if matched:
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ?",
(matched[0], number),
)
count += 1
conn.commit()
logger.info("Backfilled source_path for %d PRs", count)
return count
# ---------------------------------------------------------------------------
# Integration points (for Epimetheus to wire in)
# ---------------------------------------------------------------------------
INTEGRATION_GUIDE = """
## Where to wire this in
### 1. openrouter-extract-v2.py — after successful extraction call
from research_tracking import record_extraction_cost
# After line 430 (content, usage = call_openrouter(...))
# After line 672 (log_usage(...))
record_extraction_cost(
conn, args.source_file, args.model,
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
)
### 2. Agent research scripts — wrap research sessions
from research_tracking import start_session, link_source_to_session, complete_session
# At start of research:
session_id = start_session(conn, agent="leo", topic="weapons stigmatization campaigns",
domain="grand-strategy",
reasoning="Following up on EU AI Act national security exclusion — exploring how stigmatization
campaigns have historically driven arms control policy",
sources_planned=6, model="claude-opus-4-6")
# As each source is written:
link_source_to_session(conn, source_path, session_id)
# At end of research:
complete_session(conn, session_id,
summary="Ottawa Treaty mine ban model is the strongest parallel to AI weapons — same
3-condition framework (humanitarian harm + low military utility + civil society
coalition). Ukraine Shahed case is a near-miss triggering event.",
input_tokens=total_in, output_tokens=total_out, cost_usd=total_cost)
### 3. PR creation in lib/merge.py or lib/validate.py — ensure source_path
from research_tracking import ensure_source_path_on_pr
# When creating a PR, pass the source:
ensure_source_path_on_pr(conn, pr_number, source_path)
### 4. One-time backfills (run manually after migration)
from research_tracking import backfill_extraction_costs, backfill_source_paths
backfill_extraction_costs(conn, "/opt/teleo-eval/logs/openrouter-usage.csv")
backfill_source_paths(conn)
### 5. Migration
Run MIGRATION_V11_SQL against pipeline.db after backing up.
"""

629
diagnostics/vitality.py Normal file
View file

@ -0,0 +1,629 @@
"""Agent Vitality Diagnostics — data collection and schema.
Records daily vitality snapshots per agent across 10 dimensions.
Designed as the objective function for agent "aliveness" ranking.
Owner: Ship (data collection) + Argus (storage, API, dashboard)
Data sources: pipeline.db (read-only), claim-index API, agent-state filesystem, review_records
Dimension keys (agreed with Leo 2026-04-08):
knowledge_output, knowledge_quality, contributor_engagement,
review_performance, spend_efficiency, autonomy,
infrastructure_health, social_reach, capital, external_impact
"""
import json
import logging
import os
import sqlite3
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger("vitality")
# Known domain agents and their primary domains
AGENT_DOMAINS = {
"rio": ["internet-finance"],
"theseus": ["collective-intelligence", "living-agents"],
"astra": ["space-development", "energy", "manufacturing", "robotics"],
"vida": ["health"],
"clay": ["entertainment", "cultural-dynamics"],
"leo": ["grand-strategy", "teleohumanity"],
"hermes": [], # communications, no domain
"rhea": [], # infrastructure ops, no domain
"ganymede": [], # code review, no domain
"epimetheus": [], # pipeline, no domain
"oberon": [], # dashboard, no domain
"argus": [], # diagnostics, no domain
"ship": [], # engineering, no domain
}
# Agent file path prefixes — for matching claims by location, not just domain field.
# Handles claims in core/ and foundations/ that may not have a standard domain field
# in the claim-index (domain derived from directory path).
AGENT_PATHS = {
"rio": ["domains/internet-finance/"],
"theseus": ["domains/ai-alignment/", "core/living-agents/", "core/collective-intelligence/",
"foundations/collective-intelligence/"],
"astra": ["domains/space-development/", "domains/energy/",
"domains/manufacturing/", "domains/robotics/"],
"vida": ["domains/health/"],
"clay": ["domains/entertainment/", "foundations/cultural-dynamics/"],
"leo": ["core/grand-strategy/", "core/teleohumanity/", "core/mechanisms/",
"core/living-capital/", "foundations/teleological-economics/",
"foundations/critical-systems/"],
}
ALL_AGENTS = list(AGENT_DOMAINS.keys())
# Agent-state directory (VPS filesystem)
AGENT_STATE_DIR = Path(os.environ.get(
"AGENT_STATE_DIR", "/opt/teleo-eval/agent-state"
))
MIGRATION_SQL = """
CREATE TABLE IF NOT EXISTS vitality_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT NOT NULL,
dimension TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL DEFAULT 0,
unit TEXT NOT NULL DEFAULT '',
source TEXT,
recorded_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(agent_name, dimension, metric, recorded_at)
);
CREATE INDEX IF NOT EXISTS idx_vitality_agent_time
ON vitality_snapshots(agent_name, recorded_at);
CREATE INDEX IF NOT EXISTS idx_vitality_dimension
ON vitality_snapshots(dimension, recorded_at);
"""
# Add source column if missing (idempotent upgrade from v1 schema)
UPGRADE_SQL = """
ALTER TABLE vitality_snapshots ADD COLUMN source TEXT;
"""
def ensure_schema(db_path: str):
"""Create vitality_snapshots table if it doesn't exist."""
conn = sqlite3.connect(db_path, timeout=30)
try:
conn.executescript(MIGRATION_SQL)
try:
conn.execute(UPGRADE_SQL)
except sqlite3.OperationalError:
pass # column already exists
conn.commit()
logger.info("vitality_snapshots schema ensured")
finally:
conn.close()
def _fetch_claim_index(url: str = "http://localhost:8080/claim-index") -> dict | None:
"""Fetch claim-index from pipeline health API."""
try:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
except Exception as e:
logger.warning("claim-index fetch failed: %s", e)
return None
def _ro_conn(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30)
conn.row_factory = sqlite3.Row
return conn
# ---------------------------------------------------------------------------
# Dimension 1: knowledge_output — "How much has this agent produced?"
# ---------------------------------------------------------------------------
def collect_knowledge_output(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Claims merged, domain count, PRs submitted."""
metrics = []
row = conn.execute(
"SELECT COUNT(*) as cnt FROM prs WHERE agent = ? AND status = 'merged'",
(agent,),
).fetchone()
metrics.append({"metric": "claims_merged", "value": row["cnt"], "unit": "claims"})
row = conn.execute(
"SELECT COUNT(DISTINCT domain) as cnt FROM prs "
"WHERE agent = ? AND domain IS NOT NULL AND status = 'merged'",
(agent,),
).fetchone()
metrics.append({"metric": "domains_contributed", "value": row["cnt"], "unit": "domains"})
row = conn.execute(
"SELECT COUNT(*) as cnt FROM prs WHERE agent = ? AND created_at > datetime('now', '-7 days')",
(agent,),
).fetchone()
metrics.append({"metric": "prs_7d", "value": row["cnt"], "unit": "PRs"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 2: knowledge_quality — "How good is the output?"
# ---------------------------------------------------------------------------
def collect_knowledge_quality(
conn: sqlite3.Connection, claim_index: dict | None, agent: str
) -> list[dict]:
"""Evidence density, challenge rate, cross-domain links, domain coverage."""
metrics = []
agent_domains = AGENT_DOMAINS.get(agent, [])
# Challenge rate = challenge PRs / total PRs
rows = conn.execute(
"SELECT commit_type, COUNT(*) as cnt FROM prs "
"WHERE agent = ? AND commit_type IS NOT NULL GROUP BY commit_type",
(agent,),
).fetchall()
total = sum(r["cnt"] for r in rows)
type_counts = {r["commit_type"]: r["cnt"] for r in rows}
challenge_rate = type_counts.get("challenge", 0) / total if total > 0 else 0
metrics.append({"metric": "challenge_rate", "value": round(challenge_rate, 4), "unit": "ratio"})
# Activity breadth (distinct commit types)
metrics.append({"metric": "activity_breadth", "value": len(type_counts), "unit": "types"})
# Evidence density + cross-domain links from claim-index
# Match by domain field OR file path prefix (catches core/, foundations/ claims)
agent_paths = AGENT_PATHS.get(agent, [])
if claim_index and (agent_domains or agent_paths):
claims = claim_index.get("claims", [])
agent_claims = [
c for c in claims
if c.get("domain") in agent_domains
or any(c.get("file", "").startswith(p) for p in agent_paths)
]
total_claims = len(agent_claims)
# Evidence density: claims with incoming links / total claims
linked = sum(1 for c in agent_claims if c.get("incoming_count", 0) > 0)
density = linked / total_claims if total_claims > 0 else 0
metrics.append({"metric": "evidence_density", "value": round(density, 4), "unit": "ratio"})
# Cross-domain links
cross_domain = sum(
1 for c in agent_claims
for link in c.get("outgoing_links", [])
if any(d in link for d in claim_index.get("domains", {}).keys()
if d not in agent_domains)
)
metrics.append({"metric": "cross_domain_links", "value": cross_domain, "unit": "links"})
# Domain coverage: agent's claims / average domain size
domains_data = claim_index.get("domains", {})
agent_claim_count = sum(domains_data.get(d, 0) for d in agent_domains)
avg_domain_size = (sum(domains_data.values()) / len(domains_data)) if domains_data else 1
coverage = min(agent_claim_count / avg_domain_size, 1.0) if avg_domain_size > 0 else 0
metrics.append({"metric": "domain_coverage", "value": round(coverage, 4), "unit": "ratio"})
else:
metrics.append({"metric": "evidence_density", "value": 0, "unit": "ratio"})
metrics.append({"metric": "cross_domain_links", "value": 0, "unit": "links"})
metrics.append({"metric": "domain_coverage", "value": 0, "unit": "ratio"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 3: contributor_engagement — "Who contributes to this agent's domain?"
# ---------------------------------------------------------------------------
def collect_contributor_engagement(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Unique submitters to this agent's domain."""
row = conn.execute(
"SELECT COUNT(DISTINCT submitted_by) as cnt FROM prs "
"WHERE agent = ? AND submitted_by IS NOT NULL AND submitted_by != ''",
(agent,),
).fetchone()
return [
{"metric": "unique_submitters", "value": row["cnt"], "unit": "contributors"},
]
# ---------------------------------------------------------------------------
# Dimension 4: review_performance — "How good is the evaluator feedback loop?"
# ---------------------------------------------------------------------------
def collect_review_performance(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Approval rate, rejection reasons from review_records."""
metrics = []
# Check if review_records table exists
table_check = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='review_records'"
).fetchone()
if not table_check:
return [
{"metric": "approval_rate", "value": 0, "unit": "ratio"},
{"metric": "total_reviews", "value": 0, "unit": "reviews"},
]
# Overall approval rate for this agent's claims (join through prs table)
row = conn.execute(
"SELECT COUNT(*) as total, "
"SUM(CASE WHEN r.outcome = 'approved' THEN 1 ELSE 0 END) as approved, "
"SUM(CASE WHEN r.outcome = 'approved-with-changes' THEN 1 ELSE 0 END) as with_changes, "
"SUM(CASE WHEN r.outcome = 'rejected' THEN 1 ELSE 0 END) as rejected "
"FROM review_records r "
"JOIN prs p ON r.pr_number = p.pr_number "
"WHERE LOWER(p.agent) = LOWER(?)",
(agent,),
).fetchone()
total = row["total"] or 0
approved = (row["approved"] or 0) + (row["with_changes"] or 0)
rejected = row["rejected"] or 0
approval_rate = approved / total if total > 0 else 0
metrics.append({"metric": "total_reviews", "value": total, "unit": "reviews"})
metrics.append({"metric": "approval_rate", "value": round(approval_rate, 4), "unit": "ratio"})
metrics.append({"metric": "approved", "value": row["approved"] or 0, "unit": "reviews"})
metrics.append({"metric": "approved_with_changes", "value": row["with_changes"] or 0, "unit": "reviews"})
metrics.append({"metric": "rejected", "value": rejected, "unit": "reviews"})
# Top rejection reasons (last 30 days)
reasons = conn.execute(
"SELECT r.rejection_reason, COUNT(*) as cnt FROM review_records r "
"JOIN prs p ON r.pr_number = p.pr_number "
"WHERE LOWER(p.agent) = LOWER(?) AND r.outcome = 'rejected' "
"AND r.rejection_reason IS NOT NULL "
"AND r.review_date > datetime('now', '-30 days') "
"GROUP BY r.rejection_reason ORDER BY cnt DESC",
(agent,),
).fetchall()
for r in reasons:
metrics.append({
"metric": f"rejection_{r['rejection_reason']}",
"value": r["cnt"],
"unit": "rejections",
})
return metrics
# ---------------------------------------------------------------------------
# Dimension 5: spend_efficiency — "What does it cost per merged claim?"
# ---------------------------------------------------------------------------
def collect_spend_efficiency(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Cost per merged claim, total spend, response costs."""
metrics = []
# Pipeline cost attributed to this agent (from prs.cost_usd)
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as merged "
"FROM prs WHERE agent = ? AND status = 'merged'",
(agent,),
).fetchone()
total_cost = row["cost"] or 0
merged = row["merged"] or 0
cost_per_claim = total_cost / merged if merged > 0 else 0
metrics.append({"metric": "total_pipeline_cost", "value": round(total_cost, 4), "unit": "USD"})
metrics.append({"metric": "cost_per_merged_claim", "value": round(cost_per_claim, 4), "unit": "USD"})
# Response audit costs (Telegram bot) — per-agent
row = conn.execute(
"SELECT COALESCE(SUM(generation_cost), 0) as cost, COUNT(*) as cnt "
"FROM response_audit WHERE agent = ?",
(agent,),
).fetchone()
metrics.append({"metric": "response_cost_total", "value": round(row["cost"], 4), "unit": "USD"})
metrics.append({"metric": "total_responses", "value": row["cnt"], "unit": "responses"})
# 24h spend snapshot
row = conn.execute(
"SELECT COALESCE(SUM(generation_cost), 0) as cost "
"FROM response_audit WHERE agent = ? AND timestamp > datetime('now', '-24 hours')",
(agent,),
).fetchone()
metrics.append({"metric": "response_cost_24h", "value": round(row["cost"], 4), "unit": "USD"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 6: autonomy — "How independently does this agent act?"
# ---------------------------------------------------------------------------
def collect_autonomy(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Self-directed actions, active days."""
metrics = []
# Autonomous responses in last 24h
row = conn.execute(
"SELECT COUNT(*) as cnt FROM response_audit "
"WHERE agent = ? AND timestamp > datetime('now', '-24 hours')",
(agent,),
).fetchone()
metrics.append({"metric": "autonomous_responses_24h", "value": row["cnt"], "unit": "actions"})
# Active days in last 7
row = conn.execute(
"SELECT COUNT(DISTINCT date(created_at)) as days FROM prs "
"WHERE agent = ? AND created_at > datetime('now', '-7 days')",
(agent,),
).fetchone()
metrics.append({"metric": "active_days_7d", "value": row["days"], "unit": "days"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 7: infrastructure_health — "Is the agent's machinery working?"
# ---------------------------------------------------------------------------
def collect_infrastructure_health(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Circuit breakers, PR success rate, agent-state liveness."""
metrics = []
# Circuit breakers
rows = conn.execute(
"SELECT name, state FROM circuit_breakers WHERE name LIKE ?",
(f"%{agent}%",),
).fetchall()
open_breakers = sum(1 for r in rows if r["state"] != "closed")
metrics.append({"metric": "open_circuit_breakers", "value": open_breakers, "unit": "breakers"})
# PR success rate last 7 days
row = conn.execute(
"SELECT COUNT(*) as total, "
"SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as merged "
"FROM prs WHERE agent = ? AND created_at > datetime('now', '-7 days')",
(agent,),
).fetchone()
total = row["total"]
rate = row["merged"] / total if total > 0 else 0
metrics.append({"metric": "merge_rate_7d", "value": round(rate, 4), "unit": "ratio"})
# Agent-state liveness (read metrics.json from filesystem)
state_file = AGENT_STATE_DIR / agent / "metrics.json"
if state_file.exists():
try:
with open(state_file) as f:
state = json.load(f)
lifetime = state.get("lifetime", {})
metrics.append({
"metric": "sessions_total",
"value": lifetime.get("sessions_total", 0),
"unit": "sessions",
})
metrics.append({
"metric": "sessions_timeout",
"value": lifetime.get("sessions_timeout", 0),
"unit": "sessions",
})
metrics.append({
"metric": "sessions_error",
"value": lifetime.get("sessions_error", 0),
"unit": "sessions",
})
except (json.JSONDecodeError, OSError) as e:
logger.warning("Failed to read agent-state for %s: %s", agent, e)
return metrics
# ---------------------------------------------------------------------------
# Dimensions 8-10: Stubs (no data sources yet)
# ---------------------------------------------------------------------------
def collect_social_reach(agent: str) -> list[dict]:
"""Social dimension: stub zeros until X API accounts are active."""
return [
{"metric": "followers", "value": 0, "unit": "followers"},
{"metric": "impressions_7d", "value": 0, "unit": "impressions"},
{"metric": "engagement_rate", "value": 0, "unit": "ratio"},
]
def collect_capital(agent: str) -> list[dict]:
"""Capital dimension: stub zeros until treasury/revenue tracking exists."""
return [
{"metric": "aum", "value": 0, "unit": "USD"},
{"metric": "treasury", "value": 0, "unit": "USD"},
]
def collect_external_impact(agent: str) -> list[dict]:
"""External impact dimension: stub zeros until manual tracking exists."""
return [
{"metric": "decisions_informed", "value": 0, "unit": "decisions"},
{"metric": "deals_sourced", "value": 0, "unit": "deals"},
]
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
DIMENSION_MAP = {
"knowledge_output": lambda conn, ci, agent: collect_knowledge_output(conn, agent),
"knowledge_quality": collect_knowledge_quality,
"contributor_engagement": lambda conn, ci, agent: collect_contributor_engagement(conn, agent),
"review_performance": lambda conn, ci, agent: collect_review_performance(conn, agent),
"spend_efficiency": lambda conn, ci, agent: collect_spend_efficiency(conn, agent),
"autonomy": lambda conn, ci, agent: collect_autonomy(conn, agent),
"infrastructure_health": lambda conn, ci, agent: collect_infrastructure_health(conn, agent),
"social_reach": lambda conn, ci, agent: collect_social_reach(agent),
"capital": lambda conn, ci, agent: collect_capital(agent),
"external_impact": lambda conn, ci, agent: collect_external_impact(agent),
}
def collect_all_for_agent(
db_path: str,
agent: str,
claim_index_url: str = "http://localhost:8080/claim-index",
) -> dict:
"""Collect all 10 vitality dimensions for a single agent.
Returns {dimension: [metrics]}.
"""
claim_index = _fetch_claim_index(claim_index_url)
conn = _ro_conn(db_path)
try:
result = {}
for dim_key, collector in DIMENSION_MAP.items():
try:
result[dim_key] = collector(conn, claim_index, agent)
except Exception as e:
logger.error("collector %s failed for %s: %s", dim_key, agent, e)
result[dim_key] = []
return result
finally:
conn.close()
def collect_system_aggregate(
db_path: str,
claim_index_url: str = "http://localhost:8080/claim-index",
) -> dict:
"""System-level aggregate vitality metrics."""
claim_index = _fetch_claim_index(claim_index_url)
conn = _ro_conn(db_path)
try:
metrics = {}
# Knowledge totals
total_claims = claim_index["total_claims"] if claim_index else 0
orphan_ratio = claim_index.get("orphan_ratio", 0) if claim_index else 0
domain_count = len(claim_index.get("domains", {})) if claim_index else 0
metrics["knowledge_output"] = [
{"metric": "total_claims", "value": total_claims, "unit": "claims"},
{"metric": "total_domains", "value": domain_count, "unit": "domains"},
{"metric": "orphan_ratio", "value": round(orphan_ratio, 4), "unit": "ratio"},
]
# Cross-domain citation rate
if claim_index:
claims = claim_index.get("claims", [])
total_links = sum(c.get("outgoing_count", 0) for c in claims)
cross_domain = 0
for c in claims:
src_domain = c.get("domain")
for link in c.get("outgoing_links", []):
linked_claims = [
x for x in claims
if x.get("stem") in link or x.get("file", "").endswith(link + ".md")
]
for lc in linked_claims:
if lc.get("domain") != src_domain:
cross_domain += 1
metrics["knowledge_quality"] = [
{"metric": "cross_domain_citation_rate",
"value": round(cross_domain / max(total_links, 1), 4),
"unit": "ratio"},
]
# Pipeline throughput
row = conn.execute(
"SELECT COUNT(*) as merged FROM prs "
"WHERE status='merged' AND merged_at > datetime('now', '-24 hours')"
).fetchone()
row2 = conn.execute("SELECT COUNT(*) as total FROM sources").fetchone()
row3 = conn.execute(
"SELECT COUNT(*) as pending FROM prs "
"WHERE status NOT IN ('merged','rejected','closed')"
).fetchone()
metrics["infrastructure_health"] = [
{"metric": "prs_merged_24h", "value": row["merged"], "unit": "PRs/day"},
{"metric": "total_sources", "value": row2["total"], "unit": "sources"},
{"metric": "queue_depth", "value": row3["pending"], "unit": "PRs"},
]
# Total spend
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0) as cost "
"FROM costs WHERE date > date('now', '-1 day')"
).fetchone()
row2 = conn.execute(
"SELECT COALESCE(SUM(generation_cost), 0) as cost FROM response_audit "
"WHERE timestamp > datetime('now', '-24 hours')"
).fetchone()
metrics["spend_efficiency"] = [
{"metric": "pipeline_cost_24h", "value": round(row["cost"], 4), "unit": "USD"},
{"metric": "response_cost_24h", "value": round(row2["cost"], 4), "unit": "USD"},
{"metric": "total_cost_24h",
"value": round(row["cost"] + row2["cost"], 4), "unit": "USD"},
]
# Stubs
metrics["social_reach"] = [{"metric": "total_followers", "value": 0, "unit": "followers"}]
metrics["capital"] = [{"metric": "total_aum", "value": 0, "unit": "USD"}]
return metrics
finally:
conn.close()
def record_snapshot(
db_path: str,
claim_index_url: str = "http://localhost:8080/claim-index",
):
"""Run a full vitality snapshot — one row per agent per dimension per metric."""
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
rows = []
# Per-agent snapshots
for agent in ALL_AGENTS:
try:
dimensions = collect_all_for_agent(db_path, agent, claim_index_url)
for dim_name, metrics in dimensions.items():
collector_name = f"{dim_name}_collector"
for m in metrics:
rows.append((
agent, dim_name, m["metric"], m["value"],
m["unit"], collector_name, now,
))
except Exception as e:
logger.error("vitality collection failed for %s: %s", agent, e)
# System aggregate
try:
system = collect_system_aggregate(db_path, claim_index_url)
for dim_name, metrics in system.items():
for m in metrics:
rows.append((
"_system", dim_name, m["metric"], m["value"],
m["unit"], "system_aggregate", now,
))
except Exception as e:
logger.error("vitality system aggregate failed: %s", e)
# Write all rows
ensure_schema(db_path)
conn = sqlite3.connect(db_path, timeout=30)
try:
conn.executemany(
"INSERT OR REPLACE INTO vitality_snapshots "
"(agent_name, dimension, metric, value, unit, source, recorded_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
logger.info(
"vitality snapshot recorded: %d rows for %d agents + system",
len(rows), len(ALL_AGENTS),
)
return {"rows_written": len(rows), "agents": len(ALL_AGENTS), "recorded_at": now}
finally:
conn.close()
if __name__ == "__main__":
"""CLI: python3 vitality.py [db_path] — runs a snapshot."""
import sys
logging.basicConfig(level=logging.INFO)
db = sys.argv[1] if len(sys.argv) > 1 else "/opt/teleo-eval/pipeline/pipeline.db"
result = record_snapshot(db)
print(json.dumps(result, indent=2))

View file

@ -0,0 +1,293 @@
"""Vitality API routes for Argus diagnostics dashboard.
Endpoints:
GET /api/vitality latest snapshot + time-series for all agents or one
GET /api/vitality/snapshot trigger a new snapshot (POST-like via GET for cron curl)
GET /api/vitality/leaderboard agents ranked by composite vitality score
Owner: Argus
"""
import json
import logging
import sqlite3
from pathlib import Path
from aiohttp import web
from vitality import (
ALL_AGENTS,
MIGRATION_SQL,
collect_all_for_agent,
collect_system_aggregate,
record_snapshot,
)
logger = logging.getLogger("argus.vitality")
# Composite vitality weights — Leo-approved 2026-04-08
# Dimension keys match Ship's refactored vitality.py DIMENSION_MAP
VITALITY_WEIGHTS = {
"knowledge_output": 0.30, # primary output — highest weight
"knowledge_quality": 0.20, # was "diversity" — quality of output
"contributor_engagement": 0.15, # attracting external contributors
"review_performance": 0.00, # new dim, zero until review_records populated
"autonomy": 0.15, # independent action
"infrastructure_health": 0.05, # machinery working
"spend_efficiency": 0.05, # cost discipline
"social_reach": 0.00, # zero until accounts active
"capital": 0.00, # zero until treasury exists
"external_impact": 0.00, # zero until measurable
}
# Public paths (no auth required)
VITALITY_PUBLIC_PATHS = frozenset({
"/api/vitality",
"/api/vitality/snapshot",
"/api/vitality/leaderboard",
})
def _ro_conn(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30)
conn.row_factory = sqlite3.Row
return conn
async def handle_vitality(request: web.Request) -> web.Response:
"""GET /api/vitality?agent=<name>&days=7
Returns latest snapshot and time-series data.
If agent is specified, returns that agent only. Otherwise returns all.
"""
db_path = request.app["db_path"]
agent = request.query.get("agent")
try:
days = min(int(request.query.get("days", "7")), 90)
except ValueError:
days = 7
conn = _ro_conn(db_path)
try:
# Check if table exists
table_check = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'"
).fetchone()
if not table_check:
return web.json_response({
"error": "No vitality data yet. Trigger a snapshot first via /api/vitality/snapshot",
"has_data": False
})
# Latest snapshot timestamp
latest = conn.execute(
"SELECT MAX(recorded_at) as ts FROM vitality_snapshots"
).fetchone()
latest_ts = latest["ts"] if latest else None
if not latest_ts:
return web.json_response({"has_data": False})
# Latest snapshot data
if agent:
agents_filter = [agent]
else:
agents_filter = ALL_AGENTS + ["_system"]
result = {"latest_snapshot": latest_ts, "agents": {}}
for a in agents_filter:
rows = conn.execute(
"SELECT dimension, metric, value, unit FROM vitality_snapshots "
"WHERE agent_name = ? AND recorded_at = ?",
(a, latest_ts)
).fetchall()
if not rows:
continue
dimensions = {}
for r in rows:
dim = r["dimension"]
if dim not in dimensions:
dimensions[dim] = []
dimensions[dim].append({
"metric": r["metric"],
"value": r["value"],
"unit": r["unit"],
})
result["agents"][a] = dimensions
# Time-series for trend charts (one data point per snapshot)
ts_query_agent = agent if agent else "_system"
ts_rows = conn.execute(
"SELECT recorded_at, dimension, metric, value "
"FROM vitality_snapshots "
"WHERE agent_name = ? AND recorded_at > datetime('now', ?)"
"ORDER BY recorded_at",
(ts_query_agent, f"-{days} days")
).fetchall()
time_series = {}
for r in ts_rows:
key = f"{r['dimension']}.{r['metric']}"
if key not in time_series:
time_series[key] = []
time_series[key].append({
"t": r["recorded_at"],
"v": r["value"],
})
result["time_series"] = time_series
result["has_data"] = True
return web.json_response(result)
finally:
conn.close()
async def handle_vitality_snapshot(request: web.Request) -> web.Response:
"""GET /api/vitality/snapshot — trigger a new snapshot collection.
Used by cron: curl http://localhost:8081/api/vitality/snapshot
Requires ?confirm=1 to prevent accidental triggers from crawlers/prefetch.
"""
if request.query.get("confirm") != "1":
return web.json_response(
{"status": "noop", "error": "Add ?confirm=1 to trigger a snapshot write"},
status=400,
)
db_path = request.app["db_path"]
claim_index_url = request.app.get("claim_index_url", "http://localhost:8080/claim-index")
try:
result = record_snapshot(db_path, claim_index_url)
return web.json_response({"status": "ok", **result})
except Exception as e:
logger.error("vitality snapshot failed: %s", e)
return web.json_response({"status": "error", "error": str(e)}, status=500)
async def handle_vitality_leaderboard(request: web.Request) -> web.Response:
"""GET /api/vitality/leaderboard — agents ranked by composite vitality score.
Scoring approach:
- Each dimension gets a 0-1 normalized score based on the metric values
- Weighted sum produces composite score
- Agents ranked by composite score descending
"""
db_path = request.app["db_path"]
conn = _ro_conn(db_path)
try:
table_check = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'"
).fetchone()
if not table_check:
return web.json_response({"error": "No vitality data yet", "has_data": False})
latest = conn.execute(
"SELECT MAX(recorded_at) as ts FROM vitality_snapshots"
).fetchone()
if not latest or not latest["ts"]:
return web.json_response({"has_data": False})
latest_ts = latest["ts"]
# Collect all agents' latest data
agent_scores = []
for agent in ALL_AGENTS:
rows = conn.execute(
"SELECT dimension, metric, value FROM vitality_snapshots "
"WHERE agent_name = ? AND recorded_at = ?",
(agent, latest_ts)
).fetchall()
if not rows:
continue
dims = {}
for r in rows:
dim = r["dimension"]
if dim not in dims:
dims[dim] = {}
dims[dim][r["metric"]] = r["value"]
# Normalize each dimension to 0-1
# Dimension keys match Ship's refactored vitality.py DIMENSION_MAP
dim_scores = {}
# knowledge_output: claims_merged (cap at 100 = 1.0)
ko = dims.get("knowledge_output", {})
claims = ko.get("claims_merged", 0)
dim_scores["knowledge_output"] = min(claims / 100, 1.0)
# knowledge_quality: challenge_rate + breadth + evidence_density + domain_coverage
kq = dims.get("knowledge_quality", {})
cr = kq.get("challenge_rate", 0)
breadth = kq.get("activity_breadth", 0)
evidence = kq.get("evidence_density", 0)
coverage = kq.get("domain_coverage", 0)
dim_scores["knowledge_quality"] = min(
(cr / 0.1 * 0.2 + breadth / 4 * 0.2 + evidence * 0.3 + coverage * 0.3), 1.0
)
# contributor_engagement: unique_submitters (cap at 5 = 1.0)
ce = dims.get("contributor_engagement", {})
dim_scores["contributor_engagement"] = min(ce.get("unique_submitters", 0) / 5, 1.0)
# review_performance: approval_rate from review_records (0 until populated)
rp = dims.get("review_performance", {})
dim_scores["review_performance"] = rp.get("approval_rate", 0)
# autonomy: active_days_7d (7 = 1.0)
am = dims.get("autonomy", {})
dim_scores["autonomy"] = min(am.get("active_days_7d", 0) / 7, 1.0)
# infrastructure_health: merge_rate_7d directly (already 0-1)
ih = dims.get("infrastructure_health", {})
dim_scores["infrastructure_health"] = ih.get("merge_rate_7d", 0)
# spend_efficiency: inverted — lower cost per claim is better
se = dims.get("spend_efficiency", {})
daily_cost = se.get("response_cost_24h", 0)
dim_scores["spend_efficiency"] = max(1.0 - daily_cost / 10.0, 0)
# Social/Capital/External: stubbed at 0
dim_scores["social_reach"] = 0
dim_scores["capital"] = 0
dim_scores["external_impact"] = 0
# Composite weighted score
composite = sum(
dim_scores.get(dim, 0) * weight
for dim, weight in VITALITY_WEIGHTS.items()
)
agent_scores.append({
"agent": agent,
"composite_score": round(composite, 4),
"dimension_scores": {k: round(v, 4) for k, v in dim_scores.items()},
"raw_highlights": {
"claims_merged": int(claims),
"merge_rate": round(ih.get("merge_rate_7d", 0) * 100, 1),
"active_days": int(am.get("active_days_7d", 0)),
"challenge_rate": round(cr * 100, 1),
"evidence_density": round(evidence * 100, 1),
},
})
# Sort by composite score descending
agent_scores.sort(key=lambda x: x["composite_score"], reverse=True)
return web.json_response({
"has_data": True,
"snapshot_at": latest_ts,
"leaderboard": agent_scores,
})
finally:
conn.close()
def register_vitality_routes(app: web.Application):
"""Register vitality endpoints on the aiohttp app."""
app.router.add_get("/api/vitality", handle_vitality)
app.router.add_get("/api/vitality/snapshot", handle_vitality_snapshot)
app.router.add_get("/api/vitality/leaderboard", handle_vitality_leaderboard)