teleo-infrastructure/lib/health.py
m3taversal a7251d7529
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
ganymede: add dev infrastructure — pyproject.toml, CI, deploy script
Phase 2 of pipeline refactoring:

- pyproject.toml: Python >=3.11, aiohttp dep, dev extras (pytest,
  pytest-asyncio, ruff). Ruff configured with sane defaults + ignore
  rules for existing code patterns (implicit Optional, timezone.utc).
- .forgejo/workflows/ci.yml: Forgejo Actions CI — syntax check, ruff
  lint, ruff format, pytest on every PR and push to main.
- deploy.sh: Pull + venv update + syntax check + optional restart.
  Replaces ad-hoc scp workflow.
- tests/conftest.py: Shared fixture for in-memory SQLite with full
  schema. Ready for Phase 4 test suite.
- .gitignore: Added venv, pytest cache, coverage, build artifacts.
- Ruff auto-fixes: import sorting, unused imports removed across all
  modules. All files pass ruff check + ruff format.

Pentagon-Agent: Ganymede <F99EBFA6-547B-4096-BEEA-1D59C3E4028A>
2026-03-13 14:24:27 +00:00

244 lines
9.2 KiB
Python

"""Health API — HTTP server on configurable port for monitoring."""
import logging
from datetime import date, datetime, timezone
from aiohttp import web
from . import config, costs, db
logger = logging.getLogger("pipeline.health")
def _conn(request):
    """Return the long-lived readonly DB connection stashed on the app."""
    app = request.app
    return app["db"]
async def handle_health(request):
    """GET /health — overall pipeline health.

    Aggregates circuit-breaker state, queue depths, budget status, and
    24h "metabolic" rates into a single JSON body. Responds 200 only
    when the computed status is "healthy"; any other status returns 503
    so external monitors can alert on the HTTP code alone.
    """
    conn = _conn(request)

    # Stage status from circuit breakers
    breakers = conn.execute(
        "SELECT name, state, failures, last_success_at, last_update FROM circuit_breakers"
    ).fetchall()

    # Queue depths
    sources_by_status = conn.execute(
        "SELECT status, COUNT(*) as n FROM sources GROUP BY status"
    ).fetchall()
    prs_by_status = conn.execute(
        "SELECT status, COUNT(*) as n FROM prs GROUP BY status"
    ).fetchall()

    # Per-domain merge queue depth (Vida)
    merge_queue = conn.execute(
        "SELECT domain, COUNT(*) as n FROM prs WHERE status = 'approved' GROUP BY domain"
    ).fetchall()

    # Cost
    budget = costs.check_budget(conn)

    # Metabolic metrics (Vida). NULLIF(COUNT(*), 0) makes the ratio NULL
    # (not a division error) when the 24h window is empty.
    null_rate = conn.execute(
        """SELECT
            CAST(SUM(CASE WHEN status = 'null_result' THEN 1 ELSE 0 END) AS REAL) /
            NULLIF(COUNT(*), 0) as rate
        FROM sources
        WHERE updated_at > datetime('now', '-24 hours')
        AND status IN ('extracted', 'null_result', 'error')"""
    ).fetchone()
    approval_rate = conn.execute(
        """SELECT
            CAST(SUM(CASE WHEN domain_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
            NULLIF(COUNT(*), 0) as domain_rate,
            CAST(SUM(CASE WHEN leo_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
            NULLIF(COUNT(*), 0) as leo_rate
        FROM prs
        WHERE last_attempt > datetime('now', '-24 hours')
        AND domain_verdict != 'pending'"""
    ).fetchone()

    # Recent activity (last hour)
    recent = conn.execute(
        """SELECT stage, event, COUNT(*) as n
        FROM audit_log
        WHERE timestamp > datetime('now', '-1 hour')
        GROUP BY stage, event"""
    ).fetchall()

    body = {
        "status": "healthy",
        "breakers": {},
        "sources": {r["status"]: r["n"] for r in sources_by_status},
        "prs": {r["status"]: r["n"] for r in prs_by_status},
        "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
        "budget": budget,
        "metabolic": {
            "null_result_rate_24h": round(null_rate["rate"], 3)
            if null_rate and null_rate["rate"] is not None
            else None,
            "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3)
            if approval_rate and approval_rate["domain_rate"] is not None
            else None,
            "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3)
            if approval_rate and approval_rate["leo_rate"] is not None
            else None,
        },
        "recent_activity": [
            {"stage": r["stage"], "event": r["event"], "count": r["n"]} for r in recent
        ],
    }

    # Stall thresholds per stage: loop-invariant, so built once here
    # rather than on every breaker iteration (was rebuilt inside the loop).
    intervals = {
        "ingest": config.INGEST_INTERVAL,
        "validate": config.VALIDATE_INTERVAL,
        "evaluate": config.EVAL_INTERVAL,
        "merge": config.MERGE_INTERVAL,
    }

    # Breaker state + stall detection (Vida: last_success_at heartbeat)
    for r in breakers:
        breaker_info = {"state": r["state"], "failures": r["failures"]}
        if r["last_success_at"]:
            last = datetime.fromisoformat(r["last_success_at"])
            if last.tzinfo is None:
                # Stored timestamps appear to be naive UTC — make aware
                # before subtracting from an aware "now".
                last = last.replace(tzinfo=timezone.utc)
            age_s = (datetime.now(timezone.utc) - last).total_seconds()
            breaker_info["last_success_age_s"] = round(age_s)
            # Stall detection: no success in 2x the stage's interval
            # (unknown stages fall back to a 60s interval).
            threshold = intervals.get(r["name"], 60) * 2
            if age_s > threshold:
                breaker_info["stalled"] = True
        body["breakers"][r["name"]] = breaker_info

    # Overall status — later checks win, so the precedence is
    # stalled < degraded < budget_exhausted.
    if any(b.get("stalled") for b in body["breakers"].values()):
        body["status"] = "stalled"
    if any(b["state"] == "open" for b in body["breakers"].values()):
        body["status"] = "degraded"
    if not budget["ok"]:
        body["status"] = "budget_exhausted"

    # Rubber-stamp warning (Vida)
    if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95:
        body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"

    status_code = 200 if body["status"] == "healthy" else 503
    return web.json_response(body, status=status_code)
async def handle_costs(request):
    """GET /costs — daily cost breakdown.

    Accepts an optional ``?date=YYYY-MM-DD`` query parameter.
    """
    conn = _conn(request)
    # NOTE(review): default is the server's *local* date, while the SQL
    # elsewhere uses UTC ('now') — confirm that mismatch is intended.
    requested = request.query.get("date")
    day = requested if requested is not None else date.today().isoformat()
    breakdown = costs.get_daily_breakdown(conn, day)
    budget = costs.check_budget(conn)
    return web.json_response({"date": day, "budget": budget, "breakdown": breakdown})
async def handle_sources(request):
    """GET /sources — source pipeline status.

    Optional ``?status=`` filter; always returns the 50 most recently
    updated matching rows.
    """
    conn = _conn(request)
    wanted = request.query.get("status")
    # Assemble one query instead of duplicating the column list twice.
    sql = (
        "SELECT path, status, priority, claims_count, transient_retries,"
        " substantive_retries, updated_at FROM sources"
    )
    params = ()
    if wanted:
        sql += " WHERE status = ?"
        params = (wanted,)
    sql += " ORDER BY updated_at DESC LIMIT 50"
    rows = conn.execute(sql, params).fetchall()
    return web.json_response({"sources": [dict(r) for r in rows]})
async def handle_prs(request):
    """GET /prs — PR pipeline status.

    Optional ``?status=`` filter; always returns the 50 highest-numbered
    matching PRs.
    """
    conn = _conn(request)
    wanted = request.query.get("status")
    # Assemble one query instead of duplicating the column list twice.
    sql = (
        "SELECT number, source_path, status, domain, tier, leo_verdict,"
        " domain_verdict, transient_retries, substantive_retries FROM prs"
    )
    params = ()
    if wanted:
        sql += " WHERE status = ?"
        params = (wanted,)
    sql += " ORDER BY number DESC LIMIT 50"
    rows = conn.execute(sql, params).fetchall()
    return web.json_response({"prs": [dict(r) for r in rows]})
async def handle_breakers(request):
    """GET /breakers — circuit breaker states."""
    records = _conn(request).execute("SELECT * FROM circuit_breakers").fetchall()
    return web.json_response({"breakers": [dict(row) for row in records]})
async def handle_calibration(request):
    """GET /calibration — priority calibration analysis (Vida).

    Compares each source's first and latest logged priority and reports
    upgrades vs downgrades. Per Theseus, upgrades are the learnable
    signal; downgrades are expected downstream noise.
    """
    import json  # hoisted from the loop body — was re-executed per row

    conn = _conn(request)
    # Find sources where eval disagreed with ingest priority.
    # json_array_length >= 2 pre-filters to rows with at least two
    # priority entries; the len(log) check below re-guards after parsing.
    rows = conn.execute(
        """SELECT path, priority, priority_log FROM sources
        WHERE json_array_length(priority_log) >= 2"""
    ).fetchall()

    # Ordinal ranking of priority labels; unknown labels rank as "medium".
    # Loop-invariant, so built once (was rebuilt on every iteration).
    levels = {"critical": 4, "high": 3, "medium": 2, "low": 1, "skip": 0}

    upgrades = []
    downgrades = []
    for r in rows:
        log = json.loads(r["priority_log"] or "[]")
        if len(log) < 2:
            continue
        first = log[0]["priority"]
        last = log[-1]["priority"]
        if levels.get(last, 2) > levels.get(first, 2):
            upgrades.append({"path": r["path"], "from": first, "to": last})
        elif levels.get(last, 2) < levels.get(first, 2):
            downgrades.append({"path": r["path"], "from": first, "to": last})

    return web.json_response(
        {
            "upgrades": upgrades[:20],
            "downgrades_count": len(downgrades),
            "upgrades_count": len(upgrades),
            "note": "Focus on upgrades — downgrades are expected (downstream has more context)",
        }
    )
def create_app() -> web.Application:
    """Create the health API application."""
    app = web.Application()
    # One persistent readonly connection shared by every handler — no
    # per-request connection churn (Ganymede)
    app["db"] = db.get_connection(readonly=True)
    routes = (
        ("/health", handle_health),
        ("/costs", handle_costs),
        ("/sources", handle_sources),
        ("/prs", handle_prs),
        ("/breakers", handle_breakers),
        ("/calibration", handle_calibration),
    )
    for path, handler in routes:
        app.router.add_get(path, handler)
    app.on_cleanup.append(_cleanup)
    return app
async def _cleanup(app):
    """Close the shared readonly DB connection on app shutdown."""
    conn = app["db"]
    conn.close()
async def start_health_server(runner_ref: list):
    """Start the health HTTP server. Stores runner in runner_ref for shutdown."""
    runner = web.AppRunner(create_app())
    await runner.setup()
    # Loopback bind only — external access goes through a reverse proxy (Ganymede)
    site = web.TCPSite(runner, "127.0.0.1", config.HEALTH_PORT)
    await site.start()
    runner_ref.append(runner)
    logger.info("Health API listening on 127.0.0.1:%d", config.HEALTH_PORT)
async def stop_health_server(runner_ref: list):
    """Stop the health HTTP server."""
    # Tolerates an empty list (server never started) and, in principle,
    # multiple runners.
    for active_runner in runner_ref:
        await active_runner.cleanup()
    logger.info("Health API stopped")