teleo-infrastructure/lib/health.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

720 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Health API — HTTP server on configurable port for monitoring."""
import json
import logging
import statistics
from collections import Counter
from datetime import date, datetime, timezone

from aiohttp import web

from . import config, costs, db
from .analytics import get_snapshot_history, get_version_changes
from .claim_index import build_claim_index, write_claim_index
from .feedback import get_agent_error_patterns, get_all_agent_patterns
# Module logger in the "pipeline.*" namespace, so whatever handler/level
# configuration the pipeline applies to its root logger covers this API too.
logger = logging.getLogger("pipeline.health")
def _conn(request):
"""Get the persistent readonly connection from app state."""
return request.app["db"]
async def handle_health(request):
    """GET /health — overall pipeline health.

    Aggregates circuit-breaker state, queue depths, per-domain merge queue,
    budget, 24h metabolic rates and last-hour audit activity into one JSON
    body. Returns HTTP 200 when status is "healthy", 503 otherwise so
    external monitors can alert on the status code alone.
    """
    conn = _conn(request)
    # Stage status from circuit breakers
    breakers = conn.execute(
        "SELECT name, state, failures, last_success_at, last_update FROM circuit_breakers"
    ).fetchall()
    # Queue depths
    sources_by_status = conn.execute("SELECT status, COUNT(*) as n FROM sources GROUP BY status").fetchall()
    prs_by_status = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
    # Per-domain merge queue depth (Vida)
    merge_queue = conn.execute(
        "SELECT domain, COUNT(*) as n FROM prs WHERE status = 'approved' GROUP BY domain"
    ).fetchall()
    # Cost
    budget = costs.check_budget(conn)
    # Metabolic metrics (Vida) — NULLIF avoids division-by-zero, yields NULL
    null_rate = conn.execute(
        """SELECT
             CAST(SUM(CASE WHEN status = 'null_result' THEN 1 ELSE 0 END) AS REAL) /
             NULLIF(COUNT(*), 0) as rate
           FROM sources
           WHERE updated_at > datetime('now', '-24 hours')
             AND status IN ('extracted', 'null_result', 'error')"""
    ).fetchone()
    approval_rate = conn.execute(
        """SELECT
             CAST(SUM(CASE WHEN domain_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
             NULLIF(COUNT(*), 0) as domain_rate,
             CAST(SUM(CASE WHEN leo_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
             NULLIF(COUNT(*), 0) as leo_rate
           FROM prs
           WHERE last_attempt > datetime('now', '-24 hours')
             AND domain_verdict != 'pending'"""
    ).fetchone()
    # Recent activity (last hour)
    recent = conn.execute(
        """SELECT stage, event, COUNT(*) as n
           FROM audit_log
           WHERE timestamp > datetime('now', '-1 hour')
           GROUP BY stage, event"""
    ).fetchall()
    body = {
        "status": "healthy",
        "breakers": {},
        "sources": {r["status"]: r["n"] for r in sources_by_status},
        "prs": {r["status"]: r["n"] for r in prs_by_status},
        "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
        "budget": budget,
        "metabolic": {
            "null_result_rate_24h": round(null_rate["rate"], 3)
            if null_rate and null_rate["rate"] is not None
            else None,
            "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3)
            if approval_rate and approval_rate["domain_rate"] is not None
            else None,
            "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3)
            if approval_rate and approval_rate["leo_rate"] is not None
            else None,
        },
        "recent_activity": [{"stage": r["stage"], "event": r["event"], "count": r["n"]} for r in recent],
    }
    # Stall thresholds: 2x each stage's loop interval. Hoisted out of the
    # loop below — previously this dict was rebuilt on every breaker row.
    intervals = {
        "ingest": config.INGEST_INTERVAL,
        "validate": config.VALIDATE_INTERVAL,
        "evaluate": config.EVAL_INTERVAL,
        "merge": config.MERGE_INTERVAL,
    }
    # Breaker state + stall detection (Vida: last_success_at heartbeat)
    for r in breakers:
        breaker_info = {"state": r["state"], "failures": r["failures"]}
        if r["last_success_at"]:
            last = datetime.fromisoformat(r["last_success_at"])
            if last.tzinfo is None:
                # Naive timestamps are treated as UTC
                last = last.replace(tzinfo=timezone.utc)
            age_s = (datetime.now(timezone.utc) - last).total_seconds()
            breaker_info["last_success_age_s"] = round(age_s)
            # Stall detection: no success in 2x the stage's interval
            # (unknown stage names fall back to a 60s interval)
            threshold = intervals.get(r["name"], 60) * 2
            if age_s > threshold:
                breaker_info["stalled"] = True
        body["breakers"][r["name"]] = breaker_info
    # Overall status — later checks take precedence; budget exhaustion
    # overrides degraded/stalled, matching the original ordering.
    if any(b.get("stalled") for b in body["breakers"].values()):
        body["status"] = "stalled"
    if any(b["state"] == "open" for b in body["breakers"].values()):
        body["status"] = "degraded"
    if not budget["ok"]:
        body["status"] = "budget_exhausted"
    # Rubber-stamp warning (Vida)
    if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95:
        body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"
    status_code = 200 if body["status"] == "healthy" else 503
    return web.json_response(body, status=status_code)
async def handle_costs(request):
    """GET /costs — daily cost breakdown for ?date=YYYY-MM-DD (default today)."""
    conn = _conn(request)
    default_day = date.today().isoformat()
    day = request.query.get("date", default_day)
    payload = {
        "date": day,
        "budget": costs.check_budget(conn),
        "breakdown": costs.get_daily_breakdown(conn, day),
    }
    return web.json_response(payload)
async def handle_sources(request):
    """GET /sources — source pipeline status.

    Optional ?status=X filters to a single status. Returns at most the 50
    most recently updated rows.
    """
    conn = _conn(request)
    status_filter = request.query.get("status")
    # One base query with an optional WHERE clause — the previous version
    # duplicated the full SELECT, letting the two column lists drift apart.
    query = (
        "SELECT path, status, priority, claims_count, transient_retries, "
        "substantive_retries, updated_at FROM sources"
    )
    params = ()
    if status_filter:
        query += " WHERE status = ?"
        params = (status_filter,)
    query += " ORDER BY updated_at DESC LIMIT 50"
    rows = conn.execute(query, params).fetchall()
    return web.json_response({"sources": [dict(r) for r in rows]})
async def handle_prs(request):
    """GET /prs — PR pipeline status.

    Optional ?status=X filters to a single status. Returns at most the 50
    highest-numbered (newest) PRs.
    """
    conn = _conn(request)
    status_filter = request.query.get("status")
    # One base query with an optional WHERE clause — the previous version
    # copy-pasted the full SELECT, a drift hazard for the column list.
    query = (
        "SELECT number, source_path, status, domain, tier, leo_verdict, "
        "domain_verdict, transient_retries, substantive_retries FROM prs"
    )
    params = ()
    if status_filter:
        query += " WHERE status = ?"
        params = (status_filter,)
    query += " ORDER BY number DESC LIMIT 50"
    rows = conn.execute(query, params).fetchall()
    return web.json_response({"prs": [dict(r) for r in rows]})
async def handle_breakers(request):
    """GET /breakers — all circuit breaker rows, one dict per breaker."""
    db_conn = _conn(request)
    breaker_rows = db_conn.execute("SELECT * FROM circuit_breakers").fetchall()
    payload = [dict(row) for row in breaker_rows]
    return web.json_response({"breakers": payload})
async def handle_calibration(request):
    """GET /calibration — priority calibration analysis (Vida).

    Compares the first vs last entry of each source's priority_log to find
    where downstream evaluation disagreed with the ingest priority.
    Focus on upgrades (Theseus: upgrades are the learnable signal).
    """
    conn = _conn(request)
    # Only sources whose priority changed at least once
    rows = conn.execute(
        """SELECT path, priority, priority_log FROM sources
           WHERE json_array_length(priority_log) >= 2"""
    ).fetchall()
    # Ordinal priority scale; unknown labels default to "medium" (2).
    # Hoisted out of the loop — previously rebuilt per row, alongside a
    # redundant `import json` (json is already imported at module level).
    levels = {"critical": 4, "high": 3, "medium": 2, "low": 1, "skip": 0}
    upgrades = []
    downgrades = []
    for r in rows:
        log = json.loads(r["priority_log"] or "[]")
        if len(log) < 2:
            continue
        first = log[0]["priority"]
        last = log[-1]["priority"]
        if levels.get(last, 2) > levels.get(first, 2):
            upgrades.append({"path": r["path"], "from": first, "to": last})
        elif levels.get(last, 2) < levels.get(first, 2):
            downgrades.append({"path": r["path"], "from": first, "to": last})
    return web.json_response(
        {
            "upgrades": upgrades[:20],
            "downgrades_count": len(downgrades),
            "upgrades_count": len(upgrades),
            "note": "Focus on upgrades — downgrades are expected (downstream has more context)",
        }
    )
async def handle_metrics(request):
    """GET /metrics — operational health metrics (Rhea).

    Leo's three numbers plus rejection reasons, time-to-merge, and fix
    effectiveness. Data from audit_log + prs tables. Curl-friendly JSON.
    """
    conn = _conn(request)
    # --- 1. Throughput: PRs processed in last hour ---
    throughput = conn.execute(
        """SELECT COUNT(*) as n FROM audit_log
           WHERE timestamp > datetime('now', '-1 hour')
             AND event IN ('approved', 'changes_requested', 'merged')"""
    ).fetchone()
    prs_per_hour = throughput["n"] if throughput else 0
    # --- 2. Approval rate (24h) ---
    verdicts_24h = conn.execute(
        """SELECT
             COUNT(*) as total,
             SUM(CASE WHEN status = 'merged' THEN 1 ELSE 0 END) as merged,
             SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END) as approved,
             SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed
           FROM prs
           WHERE last_attempt > datetime('now', '-24 hours')"""
    ).fetchone()
    total_24h = verdicts_24h["total"] if verdicts_24h else 0
    passed_24h = (verdicts_24h["merged"] or 0) + (verdicts_24h["approved"] or 0)
    approval_rate_24h = round(passed_24h / total_24h, 3) if total_24h > 0 else None
    # --- 3. Backlog depth by status ---
    backlog_rows = conn.execute(
        "SELECT status, COUNT(*) as n FROM prs GROUP BY status"
    ).fetchall()
    backlog = {r["status"]: r["n"] for r in backlog_rows}
    # --- 4. Rejection reasons (top 10) ---
    issue_rows = conn.execute(
        """SELECT eval_issues FROM prs
           WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
             AND last_attempt > datetime('now', '-24 hours')"""
    ).fetchall()
    # Counter replaces the hand-rolled dict tally; most_common(10) is
    # documented as equivalent to sorted(..., reverse=True)[:10], so tie
    # ordering (insertion order) is preserved.
    tag_counts: Counter[str] = Counter()
    for row in issue_rows:
        try:
            tags = json.loads(row["eval_issues"])
        except (json.JSONDecodeError, TypeError):
            continue  # one malformed row shouldn't sink the endpoint
        tag_counts.update(tag for tag in tags if isinstance(tag, str))
    rejection_reasons = tag_counts.most_common(10)
    # --- 5. Median time-to-merge (24h, in minutes) ---
    merge_times = conn.execute(
        """SELECT
             (julianday(merged_at) - julianday(created_at)) * 24 * 60 as minutes
           FROM prs
           WHERE merged_at IS NOT NULL
             AND merged_at > datetime('now', '-24 hours')"""
    ).fetchall()
    durations = [r["minutes"] for r in merge_times if r["minutes"] is not None and r["minutes"] > 0]
    median_ttm_minutes = round(statistics.median(durations), 1) if durations else None
    # --- 6. Fix cycle effectiveness ---
    fix_stats = conn.execute(
        """SELECT
             COUNT(*) as attempted,
             SUM(CASE WHEN status IN ('merged', 'approved') THEN 1 ELSE 0 END) as succeeded
           FROM prs
           WHERE fix_attempts > 0"""
    ).fetchone()
    fix_attempted = fix_stats["attempted"] if fix_stats else 0
    # Parenthesized: the old `x or 0 if cond else 0` relied on the ternary
    # binding looser than `or` — correct, but it read like a precedence bug.
    fix_succeeded = (fix_stats["succeeded"] or 0) if fix_stats else 0
    fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted > 0 else None
    # --- 7. Cost summary (today) ---
    budget = costs.check_budget(conn)
    return web.json_response({
        "throughput_prs_per_hour": prs_per_hour,
        "approval_rate_24h": approval_rate_24h,
        "backlog": backlog,
        "rejection_reasons_24h": [{"tag": t, "count": c} for t, c in rejection_reasons],
        "median_time_to_merge_minutes_24h": median_ttm_minutes,
        "fix_cycle": {
            "attempted": fix_attempted,
            "succeeded": fix_succeeded,
            "success_rate": fix_rate,
        },
        "cost_today": budget,
        "prs_with_merge_times_24h": len(durations),
        "prs_evaluated_24h": total_24h,
    })
async def handle_activity(request):
    """GET /activity — condensed PR activity feed (Rhea).

    Recent PR outcomes at a glance. Optional ?hours=N (default 1).
    Summary line at top, then individual PRs sorted most-recent-first.
    """
    conn = _conn(request)
    hours = int(request.query.get("hours", "1"))
    # Recent PRs with activity inside the requested window
    rows = conn.execute(
        """SELECT number, source_path, domain, status, tier,
                  domain_verdict, leo_verdict, eval_issues,
                  eval_attempts, fix_attempts, last_attempt, merged_at
           FROM prs
           WHERE last_attempt > datetime('now', ? || ' hours')
           ORDER BY last_attempt DESC
           LIMIT 50""",
        (f"-{hours}",),
    ).fetchall()
    # Summary counts by status
    counts: dict[str, int] = {}
    prs = []
    for r in rows:
        s = r["status"]
        counts[s] = counts.get(s, 0) + 1
        # Parse issue tags; tolerate NULL or malformed JSON
        try:
            issues = json.loads(r["eval_issues"] or "[]")
        except (json.JSONDecodeError, TypeError):
            issues = []
        # Build reviewer string from non-pending verdicts
        reviewers = []
        if r["domain_verdict"] and r["domain_verdict"] != "pending":
            reviewers.append(f"domain:{r['domain_verdict']}")
        if r["leo_verdict"] and r["leo_verdict"] != "pending":
            reviewers.append(f"leo:{r['leo_verdict']}")
        # Human-readable age since last activity ("42m" / "3h7m")
        age = ""
        if r["last_attempt"]:
            try:
                last = datetime.fromisoformat(r["last_attempt"])
                if last.tzinfo is None:
                    last = last.replace(tzinfo=timezone.utc)  # naive → UTC
                delta = datetime.now(timezone.utc) - last
                mins = int(delta.total_seconds() / 60)
                age = f"{mins}m" if mins < 60 else f"{mins // 60}h{mins % 60}m"
            except ValueError:
                pass  # unparseable timestamp — leave age blank
        # Source name: basename without the .md extension. rsplit already
        # returns the whole string when "/" is absent, so the old
        # `if "/" in source` guard was redundant; removesuffix replaces
        # the endswith + slice pair.
        source = r["source_path"] or ""
        source = source.rsplit("/", 1)[-1].removesuffix(".md")
        prs.append({
            "pr": r["number"],
            "source": source,
            "domain": r["domain"],
            "status": r["status"],
            "tier": r["tier"],
            "issues": issues if issues else None,
            "reviewers": ", ".join(reviewers) if reviewers else None,
            "fixes": r["fix_attempts"] if r["fix_attempts"] else None,
            "age": age,
        })
    return web.json_response({
        "window": f"{hours}h",
        "summary": counts,
        "prs": prs,
    })
async def handle_contributor(request):
    """GET /contributor/{handle} — contributor profile. ?detail=card|summary|full"""
    conn = _conn(request)
    handle = request.match_info["handle"].lower().lstrip("@")
    detail = request.query.get("detail", "card")
    row = conn.execute(
        "SELECT * FROM contributors WHERE handle = ?", (handle,)
    ).fetchone()
    if row is None:
        return web.json_response({"error": f"contributor '{handle}' not found"}, status=404)
    # Card (~50 tokens): the minimal profile
    card = {
        "handle": row["handle"],
        "tier": row["tier"],
        "claims_merged": row["claims_merged"] or 0,
        "domains": json.loads(row["domains"]) if row["domains"] else [],
        "last_contribution": row["last_contribution"],
    }
    if detail == "card":
        return web.json_response(card)
    # Summary (~200 tokens) — add per-role counts + CI
    role_names = ("sourcer", "extractor", "challenger", "synthesizer", "reviewer")
    roles = {name: row[f"{name}_count"] or 0 for name in role_names}
    # CI: role count × configured weight, rounded per-role, then summed
    ci_components = {
        name: round(count * config.CONTRIBUTION_ROLE_WEIGHTS.get(name, 0), 2)
        for name, count in roles.items()
    }
    ci_total = sum(ci_components.values())
    summary = {
        **card,
        "first_contribution": row["first_contribution"],
        "agent_id": row["agent_id"],
        "roles": roles,
        "challenges_survived": row["challenges_survived"] or 0,
        "highlights": json.loads(row["highlights"]) if row["highlights"] else [],
        "ci": {
            **ci_components,
            "total": round(ci_total, 2),
        },
    }
    if detail == "summary":
        return web.json_response(summary)
    # Full — everything, including raw identities and bookkeeping timestamps
    full = {
        **summary,
        "identities": json.loads(row["identities"]) if row["identities"] else {},
        "display_name": row["display_name"],
        "created_at": row["created_at"],
        "updated_at": row["updated_at"],
    }
    return web.json_response(full)
async def handle_contributors_list(request):
    """GET /contributors — list all contributors, sorted by merged claims.

    NOTE(review): the previous docstring said "sorted by CI", but the query
    orders by claims_merged DESC — documented the actual behavior here. If
    CI ordering was intended, sort the assembled list on "ci" instead.
    """
    conn = _conn(request)
    rows = conn.execute(
        "SELECT handle, tier, claims_merged, sourcer_count, extractor_count, "
        "challenger_count, synthesizer_count, reviewer_count, last_contribution "
        "FROM contributors ORDER BY claims_merged DESC"
    ).fetchall()
    role_names = ("sourcer", "extractor", "challenger", "synthesizer", "reviewer")
    contributors = []
    for row in rows:
        # Contribution Index: weighted sum of per-role activity counts
        ci_total = sum(
            (row[f"{role}_count"] or 0) * config.CONTRIBUTION_ROLE_WEIGHTS.get(role, 0)
            for role in role_names
        )
        contributors.append({
            "handle": row["handle"],
            "tier": row["tier"],
            "claims_merged": row["claims_merged"] or 0,
            "ci": round(ci_total, 2),
            "last_contribution": row["last_contribution"],
        })
    return web.json_response({"contributors": contributors, "total": len(contributors)})
async def handle_dashboard(request):
    """GET /dashboard — human-readable HTML metrics page.

    Renders roughly the same numbers as /metrics as a dark-themed HTML page
    that auto-refreshes via a 30s meta-refresh. No JS; safe for curl too.
    """
    conn = _conn(request)
    # Gather same data as /metrics
    now = datetime.now(timezone.utc)
    today_str = now.strftime("%Y-%m-%d")  # NOTE(review): appears unused below — kept as-is
    statuses = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
    status_map = {r["status"]: r["n"] for r in statuses}
    # Approval rate (24h): approved / all evaluation verdicts in audit_log
    evaluated = conn.execute(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected') AND timestamp > datetime('now','-24 hours')"
    ).fetchone()["n"]
    approved = conn.execute(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' AND event='approved' AND timestamp > datetime('now','-24 hours')"
    ).fetchone()["n"]
    approval_rate = round(approved / evaluated, 3) if evaluated else 0
    # Throughput: merges in the last hour
    merged_1h = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE merged_at > datetime('now','-1 hour')"
    ).fetchone()["n"]
    # Rejection reasons: unpack the JSON issue tags stored in audit detail
    reasons = conn.execute(
        """SELECT value as tag, COUNT(*) as cnt
        FROM audit_log, json_each(json_extract(detail, '$.issues'))
        WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected')
        AND timestamp > datetime('now','-24 hours')
        GROUP BY tag ORDER BY cnt DESC LIMIT 10"""
    ).fetchall()
    # Fix cycle: how many PRs that entered fixing ended up merged
    fix_attempted = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE fix_attempts > 0"
    ).fetchone()["n"]
    fix_succeeded = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE fix_attempts > 0 AND status = 'merged'"
    ).fetchone()["n"]
    fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted else 0
    # Build HTML — backlog table shows only non-zero statuses, fixed order
    status_rows = "".join(
        f"<tr><td>{s}</td><td><strong>{status_map.get(s, 0)}</strong></td></tr>"
        for s in ["open", "merged", "closed", "approved", "conflict", "reviewing"]
        if status_map.get(s, 0) > 0
    )
    reason_rows = "".join(
        f"<tr><td>{r['tag']}</td><td>{r['cnt']}</td></tr>"
        for r in reasons
    )
    # Thresholds for card colors: approval >30% green / >15% yellow / else
    # red; fix success <10% red / else yellow.
    html = f"""<!DOCTYPE html>
<html><head>
<meta charset="utf-8"><title>Pipeline Dashboard</title>
<meta http-equiv="refresh" content="30">
<style>
body {{ font-family: -apple-system, system-ui, sans-serif; max-width: 900px; margin: 40px auto; padding: 0 20px; background: #0d1117; color: #c9d1d9; }}
h1 {{ color: #58a6ff; margin-bottom: 5px; }}
.subtitle {{ color: #8b949e; margin-bottom: 30px; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 16px; margin-bottom: 30px; }}
.card {{ background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 20px; }}
.card .label {{ color: #8b949e; font-size: 13px; text-transform: uppercase; letter-spacing: 0.5px; }}
.card .value {{ font-size: 32px; font-weight: 700; margin-top: 4px; }}
.green {{ color: #3fb950; }}
.yellow {{ color: #d29922; }}
.red {{ color: #f85149; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 10px; }}
th, td {{ text-align: left; padding: 8px 12px; border-bottom: 1px solid #21262d; }}
th {{ color: #8b949e; font-size: 12px; text-transform: uppercase; }}
h2 {{ color: #58a6ff; margin-top: 30px; font-size: 16px; }}
</style>
</head><body>
<h1>Teleo Pipeline</h1>
<p class="subtitle">Auto-refreshes every 30s &middot; {now.strftime("%Y-%m-%d %H:%M UTC")}</p>
<div class="grid">
<div class="card">
<div class="label">Throughput</div>
<div class="value">{merged_1h}<span style="font-size:16px;color:#8b949e">/hr</span></div>
</div>
<div class="card">
<div class="label">Approval Rate (24h)</div>
<div class="value {'green' if approval_rate > 0.3 else 'yellow' if approval_rate > 0.15 else 'red'}">{approval_rate:.1%}</div>
</div>
<div class="card">
<div class="label">Open PRs</div>
<div class="value">{status_map.get('open', 0)}</div>
</div>
<div class="card">
<div class="label">Merged</div>
<div class="value green">{status_map.get('merged', 0)}</div>
</div>
<div class="card">
<div class="label">Fix Success</div>
<div class="value {'red' if fix_rate < 0.1 else 'yellow'}">{fix_rate:.1%}</div>
</div>
<div class="card">
<div class="label">Evaluated (24h)</div>
<div class="value">{evaluated}</div>
</div>
</div>
<h2>Backlog</h2>
<table>{status_rows}</table>
<h2>Top Rejection Reasons (24h)</h2>
<table><tr><th>Issue</th><th>Count</th></tr>{reason_rows}</table>
<p style="margin-top:40px;color:#484f58;font-size:12px;">
<a href="/metrics" style="color:#484f58;">JSON API</a> &middot;
<a href="/health" style="color:#484f58;">Health</a> &middot;
<a href="/activity" style="color:#484f58;">Activity</a>
</p>
</body></html>"""
    return web.Response(text=html, content_type="text/html")
async def handle_feedback(request):
    """GET /feedback/{agent} — per-agent rejection patterns with actionable guidance.

    Returns top rejection reasons, approval rate, and fix instructions.
    Agents query this to learn from their mistakes. (Epimetheus)
    Optional ?hours=N (default 168 = 7 days).
    """
    agent_name = request.match_info["agent"]
    window_hours = int(request.query.get("hours", "168"))
    patterns = get_agent_error_patterns(_conn(request), agent_name, window_hours)
    return web.json_response(patterns)
async def handle_feedback_all(request):
    """GET /feedback — rejection patterns across every agent.

    Optional ?hours=N (default 168 = 7 days).
    """
    window_hours = int(request.query.get("hours", "168"))
    patterns = get_all_agent_patterns(_conn(request), window_hours)
    return web.json_response(patterns)
async def handle_claim_index(request):
    """GET /claim-index — structured index of all KB claims.

    Returns full claim index with titles, domains, confidence, wiki links,
    incoming/outgoing counts, orphan ratio, cross-domain link count.
    Consumed by Argus (dashboard), Vida (vital signs).
    Also writes to disk for file-based consumers.
    """
    repo_root = str(config.MAIN_WORKTREE)
    index = build_claim_index(repo_root)
    # Also write to disk (atomic). Best-effort: the API response is the
    # primary output, but log the failure instead of swallowing it so a
    # stale on-disk index is diagnosable.
    try:
        write_claim_index(repo_root)
    except Exception:
        logger.warning("claim-index disk write failed", exc_info=True)
    return web.json_response(index)
async def handle_analytics_data(request):
    """GET /analytics/data — time-series snapshot history for Chart.js.

    Returns snapshot array + version change annotations.
    Optional ?days=N (default 7).
    """
    conn = _conn(request)
    window_days = int(request.query.get("days", "7"))
    snapshots = get_snapshot_history(conn, window_days)
    payload = {
        "snapshots": snapshots,
        "version_changes": get_version_changes(conn, window_days),
        "days": window_days,
        "count": len(snapshots),
    }
    return web.json_response(payload)
def create_app() -> web.Application:
    """Build the health API application with all routes and cleanup wired."""
    app = web.Application()
    # Persistent readonly connection — one connection, no churn (Ganymede)
    app["db"] = db.get_connection(readonly=True)
    # Route table, registered in the original order ("/" reuses dashboard)
    routes = (
        ("/health", handle_health),
        ("/costs", handle_costs),
        ("/sources", handle_sources),
        ("/prs", handle_prs),
        ("/breakers", handle_breakers),
        ("/metrics", handle_metrics),
        ("/dashboard", handle_dashboard),
        ("/contributor/{handle}", handle_contributor),
        ("/contributors", handle_contributors_list),
        ("/", handle_dashboard),
        ("/activity", handle_activity),
        ("/calibration", handle_calibration),
        ("/feedback/{agent}", handle_feedback),
        ("/feedback", handle_feedback_all),
        ("/analytics/data", handle_analytics_data),
        ("/claim-index", handle_claim_index),
    )
    for path, handler in routes:
        app.router.add_get(path, handler)
    app.on_cleanup.append(_cleanup)
    return app
async def _cleanup(app):
app["db"].close()
async def start_health_server(runner_ref: list):
    """Start the health HTTP server; the runner is appended to runner_ref
    so the caller can shut it down later."""
    runner = web.AppRunner(create_app())
    await runner.setup()
    # Bind to all interfaces — metrics are read-only, no sensitive data (Cory, Mar 14)
    site = web.TCPSite(runner, "0.0.0.0", config.HEALTH_PORT)
    await site.start()
    runner_ref.append(runner)
    logger.info("Health API listening on 0.0.0.0:%d", config.HEALTH_PORT)
async def stop_health_server(runner_ref: list):
    """Tear down every runner collected in runner_ref, then log shutdown."""
    for active_runner in runner_ref:
        await active_runner.cleanup()
    logger.info("Health API stopped")