Three new files for the engineering acceleration initiative:
- alerting.py: 7 health check functions (dormant agents, quality regression,
  throughput anomaly, rejection spikes, stuck loops, cost spikes, domain
  rejection patterns) + failure report generator
- alerting_routes.py: /check, /api/alerts, /api/failure-report/{agent} endpoints
- PATCH_INSTRUCTIONS.md: integration guide for app.py (imports, route
  registration, auth middleware bypass, DB connection)
Observe and alert only — no pipeline modification. The independence constraint
is load-bearing for measurement trustworthiness.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
125 lines
3.9 KiB
Python
"""Route handlers for /check and /api/alerts endpoints.
|
|
|
|
Import into app.py and register routes in create_app().
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from aiohttp import web
|
|
from alerting import run_all_checks, generate_failure_report, format_alert_message # requires CWD = deploy dir; switch to relative import if packaged
|
|
|
|
# Module-level logger; child of the application's "argus" logger hierarchy.
logger = logging.getLogger("argus.alerting")

# In-memory alert store (replaced each /check cycle, persists between requests)
# Overwritten wholesale by handle_check with the output of run_all_checks();
# read (and optionally filtered) by handle_api_alerts.
_active_alerts: list[dict] = []
# ISO-8601 UTC timestamp of the most recent completed /check run, or None
# before the first run; set by handle_check, echoed by handle_api_alerts.
_last_check: str | None = None

async def handle_check(request):
    """GET /check — run all monitoring checks, update active alerts, return results.

    Designed to be called by systemd timer every 5 minutes.
    Returns JSON summary of all detected issues.
    """
    global _active_alerts, _last_check

    # Connection lifecycle is owned by the injected factory; we only use it.
    db = request.app["_alerting_conn_func"]()
    try:
        found = run_all_checks(db)
    except Exception as exc:
        logger.error("Check failed: %s", exc)
        return web.json_response({"error": str(exc)}, status=500)

    # Publish the fresh alert snapshot for /api/alerts readers.
    _active_alerts = found
    _last_check = datetime.now(timezone.utc).isoformat()

    # Generate failure reports for agents flagged with stuck-loop alerts.
    reports = {}
    stuck = {
        a["agent"]
        for a in found
        if a["category"] == "health" and "stuck" in a["id"] and a["agent"]
    }
    for name in stuck:
        rpt = generate_failure_report(db, name)
        if rpt:
            reports[name] = rpt

    # Severity tallies, computed once and reused for both payload and log.
    n_critical = sum(1 for a in found if a["severity"] == "critical")
    n_warning = sum(1 for a in found if a["severity"] == "warning")
    n_info = sum(1 for a in found if a["severity"] == "info")

    payload = {
        "checked_at": _last_check,
        "alert_count": len(found),
        "critical": n_critical,
        "warning": n_warning,
        "info": n_info,
        "alerts": found,
        "failure_reports": reports,
    }

    logger.info(
        "Check complete: %d alerts (%d critical, %d warning)",
        len(found),
        n_critical,
        n_warning,
    )

    return web.json_response(payload)

async def handle_api_alerts(request):
    """GET /api/alerts — return current active alerts.

    Query params:
        severity: filter by severity (critical, warning, info)
        category: filter by category (health, quality, throughput, failure_pattern)
        agent: filter by agent name
        domain: filter by domain
    """
    matched = list(_active_alerts)

    # severity/category are mandatory alert keys (direct indexing);
    # agent/domain may be absent on some alerts, hence .get().
    filter_specs = (
        ("severity", True),
        ("category", True),
        ("agent", False),
        ("domain", False),
    )
    for key, always_present in filter_specs:
        wanted = request.query.get(key)
        if not wanted:
            continue
        if always_present:
            matched = [a for a in matched if a[key] == wanted]
        else:
            matched = [a for a in matched if a.get(key) == wanted]

    return web.json_response({
        "alerts": matched,
        "total": len(matched),
        "last_check": _last_check,
    })

async def handle_api_failure_report(request):
    """GET /api/failure-report/{agent} — generate failure report for an agent.

    Query params:
        hours: lookback window in hours (default 24; must be a positive integer)

    Returns:
        200 with the report JSON, or a "no_rejections" status object when the
        report is empty.
        400 when ``hours`` is not a positive integer (previously this raised
        an unhandled ValueError and surfaced as a 500).
    """
    agent = request.match_info["agent"]

    # Validate client-supplied lookback window instead of crashing on bad input.
    raw_hours = request.query.get("hours", "24")
    try:
        hours = int(raw_hours)
    except ValueError:
        return web.json_response(
            {"error": f"invalid hours value: {raw_hours!r}"}, status=400
        )
    if hours <= 0:
        return web.json_response(
            {"error": "hours must be a positive integer"}, status=400
        )

    conn = request.app["_alerting_conn_func"]()

    report = generate_failure_report(conn, agent, hours)
    if not report:
        return web.json_response(
            {"agent": agent, "status": "no_rejections", "period_hours": hours}
        )

    return web.json_response(report)

def register_alerting_routes(app, get_conn_func):
    """Register alerting routes on the app.

    get_conn_func: callable that returns a read-only sqlite3.Connection
    """
    # Handlers fetch the connection factory from app state at request time.
    app["_alerting_conn_func"] = get_conn_func

    route_table = (
        ("/check", handle_check),
        ("/api/alerts", handle_api_alerts),
        ("/api/failure-report/{agent}", handle_api_failure_report),
    )
    for path, handler in route_table:
        app.router.add_get(path, handler)
