"""Route handlers for /check and /api/alerts endpoints. Import into app.py and register routes in create_app(). """ import json import logging from datetime import datetime, timezone from aiohttp import web from alerting import run_all_checks, generate_failure_report, format_alert_message # requires CWD = deploy dir; switch to relative import if packaged logger = logging.getLogger("argus.alerting") # In-memory alert store (replaced each /check cycle, persists between requests) _active_alerts: list[dict] = [] _last_check: str | None = None async def handle_check(request): """GET /check — run all monitoring checks, update active alerts, return results. Designed to be called by systemd timer every 5 minutes. Returns JSON summary of all detected issues. """ conn = request.app["_alerting_conn_func"]() try: alerts = run_all_checks(conn) except Exception as e: logger.error("Check failed: %s", e) return web.json_response({"error": str(e)}, status=500) global _active_alerts, _last_check _active_alerts = alerts _last_check = datetime.now(timezone.utc).isoformat() # Generate failure reports for agents with stuck loops failure_reports = {} stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]} for agent in stuck_agents: report = generate_failure_report(conn, agent) if report: failure_reports[agent] = report result = { "checked_at": _last_check, "alert_count": len(alerts), "critical": sum(1 for a in alerts if a["severity"] == "critical"), "warning": sum(1 for a in alerts if a["severity"] == "warning"), "info": sum(1 for a in alerts if a["severity"] == "info"), "alerts": alerts, "failure_reports": failure_reports, } logger.info( "Check complete: %d alerts (%d critical, %d warning)", len(alerts), result["critical"], result["warning"], ) return web.json_response(result) async def handle_api_alerts(request): """GET /api/alerts — return current active alerts. Query params: severity: filter by severity (critical, warning, info) category: filter by category (health, quality, throughput, failure_pattern) agent: filter by agent name domain: filter by domain """ alerts = list(_active_alerts) # Filters severity = request.query.get("severity") if severity: alerts = [a for a in alerts if a["severity"] == severity] category = request.query.get("category") if category: alerts = [a for a in alerts if a["category"] == category] agent = request.query.get("agent") if agent: alerts = [a for a in alerts if a.get("agent") == agent] domain = request.query.get("domain") if domain: alerts = [a for a in alerts if a.get("domain") == domain] return web.json_response({ "alerts": alerts, "total": len(alerts), "last_check": _last_check, }) async def handle_api_failure_report(request): """GET /api/failure-report/{agent} — generate failure report for an agent. Query params: hours: lookback window (default 24) """ agent = request.match_info["agent"] hours = int(request.query.get("hours", "24")) conn = request.app["_alerting_conn_func"]() report = generate_failure_report(conn, agent, hours) if not report: return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours}) return web.json_response(report) def register_alerting_routes(app, get_conn_func): """Register alerting routes on the app. get_conn_func: callable that returns a read-only sqlite3.Connection """ app["_alerting_conn_func"] = get_conn_func app.router.add_get("/check", handle_check) app.router.add_get("/api/alerts", handle_api_alerts) app.router.add_get("/api/failure-report/{agent}", handle_api_failure_report)