From 33e670b436f4c54995e3b97acb7433ed8f3bce6a Mon Sep 17 00:00:00 2001 From: m3taversal Date: Sat, 28 Mar 2026 22:32:11 +0000 Subject: [PATCH] argus: add active alerting system (Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new files for the engineering acceleration initiative: - alerting.py: 7 health check functions (dormant agents, quality regression, throughput anomaly, rejection spikes, stuck loops, cost spikes, domain rejection patterns) + failure report generator - alerting_routes.py: /check, /api/alerts, /api/failure-report/{agent} endpoints - PATCH_INSTRUCTIONS.md: integration guide for app.py (imports, route registration, auth middleware bypass, DB connection) Observe and alert only — no pipeline modification. Independence constraint is load-bearing for measurement trustworthiness. Co-Authored-By: Claude Opus 4.6 (1M context) --- diagnostics/PATCH_INSTRUCTIONS.md | 65 ++++ diagnostics/alerting.py | 537 ++++++++++++++++++++++++++++++ diagnostics/alerting_routes.py | 125 +++++++ 3 files changed, 727 insertions(+) create mode 100644 diagnostics/PATCH_INSTRUCTIONS.md create mode 100644 diagnostics/alerting.py create mode 100644 diagnostics/alerting_routes.py diff --git a/diagnostics/PATCH_INSTRUCTIONS.md b/diagnostics/PATCH_INSTRUCTIONS.md new file mode 100644 index 00000000..ccb21875 --- /dev/null +++ b/diagnostics/PATCH_INSTRUCTIONS.md @@ -0,0 +1,65 @@ +# Alerting Integration Patch for app.py + +Five changes needed in the live app.py: + +## 1. Add import (after `from activity_endpoint import handle_activity`) + +```python +from alerting_routes import register_alerting_routes +``` + +## 2. Register routes in create_app() (after the last `app.router.add_*` line) + +```python + # Alerting — active monitoring endpoints + register_alerting_routes(app, _alerting_conn) +``` + +## 3. 
Add helper function (before create_app) + +```python +def _alerting_conn() -> sqlite3.Connection: + """Dedicated read-only connection for alerting checks. + + Separate from app['db'] to avoid contention with request handlers. + Always sets row_factory for named column access. + """ + conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + return conn +``` + +## 4. Add /check and /api/alerts to `_PUBLIC_PATHS` + +```python +_PUBLIC_PATHS = frozenset({"/", "/api/metrics", "/api/rejections", "/api/snapshots", + "/api/vital-signs", "/api/contributors", "/api/domains", + "/api/audit", "/check", "/api/alerts"}) +``` + +## 5. Add /api/failure-report/ prefix check in auth middleware + +In the `@web.middleware` auth function, add this alongside the existing +`request.path.startswith("/api/audit/")` check: + +```python + if request.path.startswith("/api/failure-report/"): + return await handler(request) +``` + +## Deploy notes + +- `alerting.py` and `alerting_routes.py` must be in the **same directory** as `app.py` + (i.e., `/opt/teleo-eval/diagnostics/`). The import uses a bare module name, not + a relative import, so Python resolves it via `sys.path`, which includes the launched script's + directory (or the working directory under `python -m`). If the deploy changes how the app is launched or uses a package structure, + switch the import in `alerting_routes.py` line 11 to `from .alerting import ...`. + +- The `/api/failure-report/{agent}` endpoint is standalone — any agent can pull their + own report on demand via `GET /api/failure-report/{agent}?hours=24`. 
+ +## Files to deploy + +- `alerting.py` → `/opt/teleo-eval/diagnostics/alerting.py` +- `alerting_routes.py` → `/opt/teleo-eval/diagnostics/alerting_routes.py` +- Patched `app.py` → `/opt/teleo-eval/diagnostics/app.py` diff --git a/diagnostics/alerting.py b/diagnostics/alerting.py new file mode 100644 index 00000000..33dde714 --- /dev/null +++ b/diagnostics/alerting.py @@ -0,0 +1,537 @@ +"""Argus active monitoring — health watchdog, quality regression, throughput anomaly detection. + +Provides check functions that detect problems and return structured alerts. +Called by /check endpoint (periodic cron) or on-demand. + +Alert schema: + { + "id": str, # unique key for dedup (e.g. "dormant:ganymede") + "severity": str, # "critical" | "warning" | "info" + "category": str, # "health" | "quality" | "throughput" | "failure_pattern" + "title": str, # human-readable headline + "detail": str, # actionable description + "agent": str|None, # affected agent (if applicable) + "domain": str|None, # affected domain (if applicable) + "detected_at": str, # ISO timestamp + "auto_resolve": bool, # clears when condition clears + } +""" + +import json +import sqlite3 +import statistics +from datetime import datetime, timezone + + +# ─── Agent-domain mapping (static config, maintained by Argus) ────────────── + +AGENT_DOMAINS = { + "rio": ["internet-finance"], + "clay": ["creative-industries"], + "ganymede": None, # reviewer — cross-domain + "epimetheus": None, # infra + "leo": None, # standards + "oberon": None, # evolution tracking + "vida": None, # health monitoring + "hermes": None, # comms + "astra": None, # research +} + +# Thresholds +DORMANCY_HOURS = 48 +APPROVAL_DROP_THRESHOLD = 15 # percentage points below 7-day baseline +THROUGHPUT_DROP_RATIO = 0.5 # alert if today < 50% of 7-day SMA +REJECTION_SPIKE_RATIO = 0.20 # single reason > 20% of recent rejections +STUCK_LOOP_THRESHOLD = 3 # same agent + same rejection reason > N times in 6h +COST_SPIKE_RATIO = 2.0 # daily cost > 2x 
7-day average + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +# ─── Check: Agent Health (dormancy detection) ─────────────────────────────── + + +def check_agent_health(conn: sqlite3.Connection) -> list[dict]: + """Detect agents with no PR activity in the last DORMANCY_HOURS hours.""" + alerts = [] + + # Get last activity per agent + rows = conn.execute( + """SELECT agent, MAX(last_attempt) as latest, COUNT(*) as total_prs + FROM prs WHERE agent IS NOT NULL + GROUP BY agent""" + ).fetchall() + + now = datetime.now(timezone.utc) + for r in rows: + agent = r["agent"] + latest = r["latest"] + if not latest: + continue + + last_dt = datetime.fromisoformat(latest) + if last_dt.tzinfo is None: + last_dt = last_dt.replace(tzinfo=timezone.utc) + + hours_since = (now - last_dt).total_seconds() / 3600 + + if hours_since > DORMANCY_HOURS: + alerts.append({ + "id": f"dormant:{agent}", + "severity": "warning", + "category": "health", + "title": f"Agent '{agent}' dormant for {int(hours_since)}h", + "detail": ( + f"No PR activity since {latest}. " + f"Last seen {int(hours_since)}h ago (threshold: {DORMANCY_HOURS}h). " + f"Total historical PRs: {r['total_prs']}." 
+ ), + "agent": agent, + "domain": None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + return alerts + + +# ─── Check: Quality Regression (approval rate drop) ───────────────────────── + + +def check_quality_regression(conn: sqlite3.Connection) -> list[dict]: + """Detect approval rate drops vs 7-day baseline, per agent and per domain.""" + alerts = [] + + # 7-day baseline approval rate (overall) + baseline = conn.execute( + """SELECT + COUNT(CASE WHEN event='approved' THEN 1 END) as approved, + COUNT(*) as total + FROM audit_log + WHERE stage='evaluate' + AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-7 days')""" + ).fetchone() + baseline_rate = (baseline["approved"] / baseline["total"] * 100) if baseline["total"] else None + + # 24h approval rate (overall) + recent = conn.execute( + """SELECT + COUNT(CASE WHEN event='approved' THEN 1 END) as approved, + COUNT(*) as total + FROM audit_log + WHERE stage='evaluate' + AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-24 hours')""" + ).fetchone() + recent_rate = (recent["approved"] / recent["total"] * 100) if recent["total"] else None + + if baseline_rate is not None and recent_rate is not None: + drop = baseline_rate - recent_rate + if drop > APPROVAL_DROP_THRESHOLD: + alerts.append({ + "id": "quality_regression:overall", + "severity": "critical", + "category": "quality", + "title": f"Approval rate dropped {drop:.0f}pp (24h: {recent_rate:.0f}% vs 7d: {baseline_rate:.0f}%)", + "detail": ( + f"24h approval rate ({recent_rate:.1f}%) is {drop:.1f} percentage points below " + f"7-day baseline ({baseline_rate:.1f}%). " + f"Evaluated {recent['total']} PRs in last 24h." 
+ ), + "agent": None, + "domain": None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + # Per-agent approval rate (24h vs 7d) — only for agents with >=5 evals in each window + # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) + _check_approval_by_dimension(conn, alerts, "agent", "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))") + + # Per-domain approval rate (24h vs 7d) — Theseus addition + _check_approval_by_dimension(conn, alerts, "domain", "json_extract(detail, '$.domain')") + + return alerts + + +def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr): + """Check approval rate regression grouped by a dimension (agent or domain).""" + # 7-day baseline per dimension + baseline_rows = conn.execute( + f"""SELECT {dim_expr} as dim_val, + COUNT(CASE WHEN event='approved' THEN 1 END) as approved, + COUNT(*) as total + FROM audit_log + WHERE stage='evaluate' + AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-7 days') + AND {dim_expr} IS NOT NULL + GROUP BY dim_val HAVING total >= 5""" + ).fetchall() + baselines = {r["dim_val"]: (r["approved"] / r["total"] * 100) for r in baseline_rows} + + # 24h per dimension + recent_rows = conn.execute( + f"""SELECT {dim_expr} as dim_val, + COUNT(CASE WHEN event='approved' THEN 1 END) as approved, + COUNT(*) as total + FROM audit_log + WHERE stage='evaluate' + AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-24 hours') + AND {dim_expr} IS NOT NULL + GROUP BY dim_val HAVING total >= 5""" + ).fetchall() + + for r in recent_rows: + val = r["dim_val"] + if val not in baselines: + continue + recent_rate = r["approved"] / r["total"] * 100 + base_rate = baselines[val] + drop = base_rate - recent_rate + if drop > APPROVAL_DROP_THRESHOLD: + alerts.append({ + "id": 
f"quality_regression:{dim_name}:{val}", + "severity": "warning", + "category": "quality", + "title": f"{dim_name.title()} '{val}' approval dropped {drop:.0f}pp", + "detail": ( + f"24h: {recent_rate:.1f}% vs 7d baseline: {base_rate:.1f}% " + f"({r['total']} evals in 24h)." + ), + "agent": val if dim_name == "agent" else None, + "domain": val if dim_name == "domain" else None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + +# ─── Check: Throughput Anomaly ────────────────────────────────────────────── + + +def check_throughput(conn: sqlite3.Connection) -> list[dict]: + """Detect throughput stalling — today vs 7-day SMA.""" + alerts = [] + + # Daily merged counts for last 7 days + rows = conn.execute( + """SELECT date(merged_at) as day, COUNT(*) as n + FROM prs WHERE merged_at > datetime('now', '-7 days') + GROUP BY day ORDER BY day""" + ).fetchall() + + if len(rows) < 2: + return alerts # Not enough data + + daily_counts = [r["n"] for r in rows] + sma = statistics.mean(daily_counts[:-1]) if len(daily_counts) > 1 else daily_counts[0] + today_count = daily_counts[-1] + + if sma > 0 and today_count < sma * THROUGHPUT_DROP_RATIO: + alerts.append({ + "id": "throughput:stalling", + "severity": "warning", + "category": "throughput", + "title": f"Throughput stalling: {today_count} merges today vs {sma:.0f}/day avg", + "detail": ( + f"Today's merge count ({today_count}) is below {THROUGHPUT_DROP_RATIO:.0%} of " + f"7-day average ({sma:.1f}/day). Daily counts: {daily_counts}." 
+ ), + "agent": None, + "domain": None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + return alerts + + +# ─── Check: Rejection Reason Spike ───────────────────────────────────────── + + +def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]: + """Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections.""" + alerts = [] + + # Total rejections in 24h + total = conn.execute( + """SELECT COUNT(*) as n FROM audit_log + WHERE stage='evaluate' + AND event IN ('changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-24 hours')""" + ).fetchone()["n"] + + if total < 10: + return alerts # Not enough data + + # Count by rejection tag + tags = conn.execute( + """SELECT value as tag, COUNT(*) as cnt + FROM audit_log, json_each(json_extract(detail, '$.issues')) + WHERE stage='evaluate' + AND event IN ('changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-24 hours') + GROUP BY tag ORDER BY cnt DESC""" + ).fetchall() + + for t in tags: + ratio = t["cnt"] / total + if ratio > REJECTION_SPIKE_RATIO: + alerts.append({ + "id": f"rejection_spike:{t['tag']}", + "severity": "warning", + "category": "quality", + "title": f"Rejection reason '{t['tag']}' at {ratio:.0%} of rejections", + "detail": ( + f"'{t['tag']}' accounts for {t['cnt']}/{total} rejections in 24h " + f"({ratio:.1%}). Threshold: {REJECTION_SPIKE_RATIO:.0%}." 
+ ), + "agent": None, + "domain": None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + return alerts + + +# ─── Check: Stuck Loops ──────────────────────────────────────────────────── + + +def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]: + """Detect agents repeatedly failing on the same rejection reason.""" + alerts = [] + + # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) + rows = conn.execute( + """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, + value as tag, + COUNT(*) as cnt + FROM audit_log, json_each(json_extract(detail, '$.issues')) + WHERE stage='evaluate' + AND event IN ('changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-6 hours') + AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL + GROUP BY agent, tag + HAVING cnt > ?""", + (STUCK_LOOP_THRESHOLD,), + ).fetchall() + + for r in rows: + alerts.append({ + "id": f"stuck_loop:{r['agent']}:{r['tag']}", + "severity": "critical", + "category": "health", + "title": f"Agent '{r['agent']}' stuck: '{r['tag']}' failed {r['cnt']}x in 6h", + "detail": ( + f"Agent '{r['agent']}' has been rejected for '{r['tag']}' " + f"{r['cnt']} times in the last 6 hours (threshold: {STUCK_LOOP_THRESHOLD}). " + f"Stop and reassess." 
+ ), + "agent": r["agent"], + "domain": None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + return alerts + + +# ─── Check: Cost Spikes ──────────────────────────────────────────────────── + + +def check_cost_spikes(conn: sqlite3.Connection) -> list[dict]: + """Detect daily cost exceeding 2x of 7-day average per agent.""" + alerts = [] + + # Check if costs table exists and has agent column + try: + cols = conn.execute("PRAGMA table_info(costs)").fetchall() + col_names = {c["name"] for c in cols} + except sqlite3.Error: + return alerts + + if "agent" not in col_names or "cost_usd" not in col_names: + # Fall back to per-PR cost tracking + rows = conn.execute( + """SELECT agent, + SUM(CASE WHEN created_at > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost, + SUM(CASE WHEN created_at > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily + FROM prs WHERE agent IS NOT NULL AND cost_usd > 0 + GROUP BY agent + HAVING avg_daily > 0""" + ).fetchall() + else: + rows = conn.execute( + """SELECT agent, + SUM(CASE WHEN timestamp > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost, + SUM(CASE WHEN timestamp > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily + FROM costs WHERE agent IS NOT NULL + GROUP BY agent + HAVING avg_daily > 0""" + ).fetchall() + + for r in rows: + if r["avg_daily"] and r["today_cost"] > r["avg_daily"] * COST_SPIKE_RATIO: + ratio = r["today_cost"] / r["avg_daily"] + alerts.append({ + "id": f"cost_spike:{r['agent']}", + "severity": "warning", + "category": "health", + "title": f"Agent '{r['agent']}' cost spike: ${r['today_cost']:.2f} today ({ratio:.1f}x avg)", + "detail": ( + f"Today's cost (${r['today_cost']:.2f}) is {ratio:.1f}x the 7-day daily average " + f"(${r['avg_daily']:.2f}). Threshold: {COST_SPIKE_RATIO}x." 
+ ), + "agent": r["agent"], + "domain": None, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + return alerts + + +# ─── Check: Domain Rejection Patterns (Theseus addition) ─────────────────── + + +def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]: + """Track rejection reason shift per domain — surfaces domain maturity issues.""" + alerts = [] + + # Per-domain rejection breakdown in 24h + rows = conn.execute( + """SELECT json_extract(detail, '$.domain') as domain, + value as tag, + COUNT(*) as cnt + FROM audit_log, json_each(json_extract(detail, '$.issues')) + WHERE stage='evaluate' + AND event IN ('changes_requested','domain_rejected','tier05_rejected') + AND timestamp > datetime('now', '-24 hours') + AND json_extract(detail, '$.domain') IS NOT NULL + GROUP BY domain, tag + ORDER BY domain, cnt DESC""" + ).fetchall() + + # Group by domain + domain_tags = {} + for r in rows: + d = r["domain"] + if d not in domain_tags: + domain_tags[d] = [] + domain_tags[d].append({"tag": r["tag"], "count": r["cnt"]}) + + # Flag if a domain has >50% of rejections from a single reason (concentrated failure) + for domain, tags in domain_tags.items(): + total = sum(t["count"] for t in tags) + if total < 5: + continue + top = tags[0] + ratio = top["count"] / total + if ratio > 0.5: + alerts.append({ + "id": f"domain_rejection_pattern:{domain}:{top['tag']}", + "severity": "info", + "category": "failure_pattern", + "title": f"Domain '{domain}': {ratio:.0%} of rejections are '{top['tag']}'", + "detail": ( + f"In domain '{domain}', {top['count']}/{total} rejections (24h) are for " + f"'{top['tag']}'. This may indicate a systematic issue with evidence standards " + f"or schema compliance in this domain." 
+ ), + "agent": None, + "domain": domain, + "detected_at": _now_iso(), + "auto_resolve": True, + }) + + return alerts + + +# ─── Failure Report Generator ─────────────────────────────────────────────── + + +def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 24) -> dict | None: + """Compile a failure report for a specific agent. + + Returns top rejection reasons, example PRs, and suggested fixes. + Designed to be sent directly to the agent via Pentagon messaging. + """ + hours = int(hours) # defensive — callers should pass int, but enforce it + rows = conn.execute( + """SELECT value as tag, COUNT(*) as cnt, + GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers + FROM audit_log, json_each(json_extract(detail, '$.issues')) + WHERE stage='evaluate' + AND event IN ('changes_requested','domain_rejected','tier05_rejected') + AND json_extract(detail, '$.agent') = ? + AND timestamp > datetime('now', ? || ' hours') + GROUP BY tag ORDER BY cnt DESC + LIMIT 5""", + (agent, f"-{hours}"), + ).fetchall() + + if not rows: + return None + + total_rejections = sum(r["cnt"] for r in rows) + top_reasons = [] + for r in rows: + prs = r["pr_numbers"].split(",")[:3] if r["pr_numbers"] else [] + top_reasons.append({ + "reason": r["tag"], + "count": r["cnt"], + "pct": round(r["cnt"] / total_rejections * 100, 1), + "example_prs": prs, + "suggestion": _suggest_fix(r["tag"]), + }) + + return { + "agent": agent, + "period_hours": hours, + "total_rejections": total_rejections, + "top_reasons": top_reasons, + "generated_at": _now_iso(), + } + + +def _suggest_fix(rejection_tag: str) -> str: + """Map known rejection reasons to actionable suggestions.""" + suggestions = { + "broken_wiki_links": "Check that all [[wiki links]] in claims resolve to existing files. Run link validation before submitting.", + "near_duplicate": "Search existing claims before creating new ones. 
Use semantic search to find similar claims.", + "frontmatter_schema": "Validate YAML frontmatter against the claim schema. Required fields: title, domain, confidence, type.", + "weak_evidence": "Add concrete sources, data points, or citations. Claims need evidence that can be independently verified.", + "missing_confidence": "Every claim needs a confidence level: proven, likely, experimental, or speculative.", + "domain_mismatch": "Ensure claims are filed under the correct domain. Check domain definitions if unsure.", + "too_broad": "Break broad claims into specific, testable sub-claims.", + "missing_links": "Claims should link to related claims, entities, or sources. Isolated claims are harder to verify.", + } + return suggestions.get(rejection_tag, f"Review rejection reason '{rejection_tag}' and adjust extraction accordingly.") + + +# ─── Run All Checks ──────────────────────────────────────────────────────── + + +def run_all_checks(conn: sqlite3.Connection) -> list[dict]: + """Execute all check functions and return combined alerts.""" + alerts = [] + alerts.extend(check_agent_health(conn)) + alerts.extend(check_quality_regression(conn)) + alerts.extend(check_throughput(conn)) + alerts.extend(check_rejection_spike(conn)) + alerts.extend(check_stuck_loops(conn)) + alerts.extend(check_cost_spikes(conn)) + alerts.extend(check_domain_rejection_patterns(conn)) + return alerts + + +def format_alert_message(alert: dict) -> str: + """Format an alert for Pentagon messaging.""" + severity_icon = {"critical": "!!", "warning": "!", "info": "~"} + icon = severity_icon.get(alert["severity"], "?") + return f"[{icon}] {alert['title']}\n{alert['detail']}" diff --git a/diagnostics/alerting_routes.py b/diagnostics/alerting_routes.py new file mode 100644 index 00000000..fd357407 --- /dev/null +++ b/diagnostics/alerting_routes.py @@ -0,0 +1,125 @@ +"""Route handlers for /check and /api/alerts endpoints. + +Import into app.py and register routes in create_app(). 
+""" + +import json +import logging +from datetime import datetime, timezone + +from aiohttp import web +from alerting import run_all_checks, generate_failure_report, format_alert_message # requires CWD = deploy dir; switch to relative import if packaged + +logger = logging.getLogger("argus.alerting") + +# In-memory alert store (replaced each /check cycle, persists between requests) +_active_alerts: list[dict] = [] +_last_check: str | None = None + + +async def handle_check(request): + """GET /check — run all monitoring checks, update active alerts, return results. + + Designed to be called by systemd timer every 5 minutes. + Returns JSON summary of all detected issues. + """ + conn = request.app["_alerting_conn_func"]() + try: + alerts = run_all_checks(conn) + except Exception as e: + logger.error("Check failed: %s", e) + return web.json_response({"error": str(e)}, status=500) + + global _active_alerts, _last_check + _active_alerts = alerts + _last_check = datetime.now(timezone.utc).isoformat() + + # Generate failure reports for agents with stuck loops + failure_reports = {} + stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]} + for agent in stuck_agents: + report = generate_failure_report(conn, agent) + if report: + failure_reports[agent] = report + + result = { + "checked_at": _last_check, + "alert_count": len(alerts), + "critical": sum(1 for a in alerts if a["severity"] == "critical"), + "warning": sum(1 for a in alerts if a["severity"] == "warning"), + "info": sum(1 for a in alerts if a["severity"] == "info"), + "alerts": alerts, + "failure_reports": failure_reports, + } + + logger.info( + "Check complete: %d alerts (%d critical, %d warning)", + len(alerts), + result["critical"], + result["warning"], + ) + + return web.json_response(result) + + +async def handle_api_alerts(request): + """GET /api/alerts — return current active alerts. 
+ + Query params: + severity: filter by severity (critical, warning, info) + category: filter by category (health, quality, throughput, failure_pattern) + agent: filter by agent name + domain: filter by domain + """ + alerts = list(_active_alerts) + + # Filters + severity = request.query.get("severity") + if severity: + alerts = [a for a in alerts if a["severity"] == severity] + + category = request.query.get("category") + if category: + alerts = [a for a in alerts if a["category"] == category] + + agent = request.query.get("agent") + if agent: + alerts = [a for a in alerts if a.get("agent") == agent] + + domain = request.query.get("domain") + if domain: + alerts = [a for a in alerts if a.get("domain") == domain] + + return web.json_response({ + "alerts": alerts, + "total": len(alerts), + "last_check": _last_check, + }) + + +async def handle_api_failure_report(request): + """GET /api/failure-report/{agent} — generate failure report for an agent. + + Query params: + hours: lookback window (default 24) + """ + agent = request.match_info["agent"] + hours = int(request.query.get("hours", "24")) + conn = request.app["_alerting_conn_func"]() + + report = generate_failure_report(conn, agent, hours) + if not report: + return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours}) + + return web.json_response(report) + + +def register_alerting_routes(app, get_conn_func): + """Register alerting routes on the app. + + get_conn_func: callable that returns a read-only sqlite3.Connection + """ + app["_alerting_conn_func"] = get_conn_func + app.router.add_get("/check", handle_check) + app.router.add_get("/api/alerts", handle_api_alerts) + app.router.add_get("/api/failure-report/{agent}", handle_api_failure_report)