teleo-codex/diagnostics/alerting_routes.py
m3taversal 33e670b436 argus: add active alerting system (Phase 1)
Three new files for the engineering acceleration initiative:
- alerting.py: 7 health check functions (dormant agents, quality regression,
  throughput anomaly, rejection spikes, stuck loops, cost spikes, domain
  rejection patterns) + failure report generator
- alerting_routes.py: /check, /api/alerts, /api/failure-report/{agent} endpoints
- PATCH_INSTRUCTIONS.md: integration guide for app.py (imports, route
  registration, auth middleware bypass, DB connection)

Observe and alert only — no pipeline modification. Independence constraint
is load-bearing for measurement trustworthiness.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:45:07 +00:00

125 lines
3.9 KiB
Python

"""Route handlers for /check and /api/alerts endpoints.
Import into app.py and register routes in create_app().
"""
import json
import logging
from datetime import datetime, timezone
from aiohttp import web
from alerting import run_all_checks, generate_failure_report, format_alert_message # requires CWD = deploy dir; switch to relative import if packaged
# Module logger; the route handlers in this file log under the "argus.alerting" name.
logger = logging.getLogger("argus.alerting")
# In-memory alert store (replaced each /check cycle, persists between requests).
# Written by handle_check and read by handle_api_alerts; being a module-level
# list, it starts empty and is not persisted anywhere by this file.
_active_alerts: list[dict] = []
# UTC ISO-8601 timestamp set by handle_check after a successful run of the
# checks; stays None until the first such run.
_last_check: str | None = None
async def handle_check(request):
    """GET /check — run all monitoring checks, update active alerts, return results.

    Designed to be called by systemd timer every 5 minutes.

    Returns a JSON summary of all detected issues: the check timestamp, the
    total and per-severity alert counts, the alerts themselves, and a failure
    report for every agent that has a stuck-loop health alert.

    If opening the connection or running the checks fails, responds with
    status 500 and ``{"error": ...}`` and leaves the previously stored alerts
    untouched. A failure while building a single agent's failure report is
    logged and that report is omitted; the alerts are still returned.
    """
    global _active_alerts, _last_check

    try:
        conn = request.app["_alerting_conn_func"]()
        alerts = run_all_checks(conn)
    except Exception as e:
        # Top-level boundary: keep the traceback so the failing check can be located.
        logger.exception("Check failed: %s", e)
        return web.json_response({"error": str(e)}, status=500)

    _active_alerts = alerts
    _last_check = datetime.now(timezone.utc).isoformat()

    # Generate failure reports for agents with stuck loops.
    # "agent" is read with .get(): handle_api_alerts treats the key as optional,
    # so an alert without it must be skipped here rather than raise KeyError.
    stuck_agents = {
        a["agent"]
        for a in alerts
        if a["category"] == "health" and "stuck" in a["id"] and a.get("agent")
    }
    failure_reports = {}
    for agent in stuck_agents:
        try:
            report = generate_failure_report(conn, agent)
        except Exception:
            # The alerts are already computed and stored; one broken report
            # should not turn the whole check response into an error.
            logger.exception("Failure report generation failed for agent %s", agent)
            continue
        if report:
            failure_reports[agent] = report

    result = {
        "checked_at": _last_check,
        "alert_count": len(alerts),
        "critical": sum(1 for a in alerts if a["severity"] == "critical"),
        "warning": sum(1 for a in alerts if a["severity"] == "warning"),
        "info": sum(1 for a in alerts if a["severity"] == "info"),
        "alerts": alerts,
        "failure_reports": failure_reports,
    }
    logger.info(
        "Check complete: %d alerts (%d critical, %d warning)",
        len(alerts),
        result["critical"],
        result["warning"],
    )
    return web.json_response(result)
async def handle_api_alerts(request):
    """GET /api/alerts — serve the alerts stored by the most recent /check run.

    Optional query params, each narrowing the result by exact match:
        severity: critical, warning, info
        category: health, quality, throughput, failure_pattern
        agent: agent name
        domain: domain name

    Responds with JSON holding the matching alerts, their count, and the
    timestamp of the last check.
    """
    # (query param == alert key, whether the key is indexed directly on each alert).
    # severity/category are looked up with [], agent/domain with .get().
    filter_specs = (
        ("severity", True),
        ("category", True),
        ("agent", False),
        ("domain", False),
    )

    # Work on a copy so filtering never touches the shared store.
    matching = list(_active_alerts)
    for field, indexed in filter_specs:
        wanted = request.query.get(field)
        if not wanted:
            continue
        if indexed:
            matching = [alert for alert in matching if alert[field] == wanted]
        else:
            matching = [alert for alert in matching if alert.get(field) == wanted]

    payload = {
        "alerts": matching,
        "total": len(matching),
        "last_check": _last_check,
    }
    return web.json_response(payload)
async def handle_api_failure_report(request):
    """GET /api/failure-report/{agent} — generate failure report for an agent.

    Query params:
        hours: lookback window in hours, a positive integer (default 24)

    Responds with the report produced by generate_failure_report, or with a
    ``no_rejections`` status object when that call returns nothing.
    A non-integer or non-positive ``hours`` yields a 400 JSON error instead of
    an unhandled ValueError; a failure while opening the connection or
    generating the report yields a 500 JSON error.
    """
    agent = request.match_info["agent"]

    # The query string is untrusted input: validate before converting.
    raw_hours = request.query.get("hours", "24")
    try:
        hours = int(raw_hours)
    except ValueError:
        hours = None
    if hours is None or hours < 1:
        return web.json_response(
            {"error": f"invalid 'hours' value {raw_hours!r}: expected a positive integer"},
            status=400,
        )

    try:
        conn = request.app["_alerting_conn_func"]()
        report = generate_failure_report(conn, agent, hours)
    except Exception as e:
        # Same JSON error shape as handle_check uses for failed checks.
        logger.exception("Failure report failed for agent %s: %s", agent, e)
        return web.json_response({"error": str(e)}, status=500)

    if not report:
        return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours})
    return web.json_response(report)
def register_alerting_routes(app, get_conn_func):
    """Attach the alerting endpoints to the application.

    get_conn_func: callable that returns a read-only sqlite3.Connection; it is
        stored on the app under "_alerting_conn_func" and the handlers call it
        with no arguments.
    """
    app["_alerting_conn_func"] = get_conn_func

    # GET routes, registered in this order.
    get_routes = (
        ("/check", handle_check),
        ("/api/alerts", handle_api_alerts),
        ("/api/failure-report/{agent}", handle_api_failure_report),
    )
    for path, handler in get_routes:
        app.router.add_get(path, handler)