"""Argus active monitoring — health watchdog, quality regression, throughput anomaly detection. Provides check functions that detect problems and return structured alerts. Called by /check endpoint (periodic cron) or on-demand. Alert schema: { "id": str, # unique key for dedup (e.g. "dormant:ganymede") "severity": str, # "critical" | "warning" | "info" "category": str, # "health" | "quality" | "throughput" | "failure_pattern" "title": str, # human-readable headline "detail": str, # actionable description "agent": str|None, # affected agent (if applicable) "domain": str|None, # affected domain (if applicable) "detected_at": str, # ISO timestamp "auto_resolve": bool, # clears when condition clears } """ import json import sqlite3 import statistics from datetime import datetime, timezone # ─── Agent-domain mapping (static config, maintained by Argus) ────────────── AGENT_DOMAINS = { "rio": ["internet-finance"], "clay": ["creative-industries"], "ganymede": None, # reviewer — cross-domain "epimetheus": None, # infra "leo": None, # standards "oberon": None, # evolution tracking "vida": None, # health monitoring "hermes": None, # comms "astra": None, # research } # Thresholds DORMANCY_HOURS = 48 APPROVAL_DROP_THRESHOLD = 15 # percentage points below 7-day baseline THROUGHPUT_DROP_RATIO = 0.5 # alert if today < 50% of 7-day SMA REJECTION_SPIKE_RATIO = 0.20 # single reason > 20% of recent rejections STUCK_LOOP_THRESHOLD = 3 # same agent + same rejection reason > N times in 6h COST_SPIKE_RATIO = 2.0 # daily cost > 2x 7-day average def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() # ─── Check: Agent Health (dormancy detection) ─────────────────────────────── def check_agent_health(conn: sqlite3.Connection) -> list[dict]: """Detect agents with no PR activity in the last DORMANCY_HOURS hours.""" alerts = [] # Get last activity per agent rows = conn.execute( """SELECT agent, MAX(last_attempt) as latest, COUNT(*) as total_prs FROM prs WHERE agent IS NOT NULL GROUP BY agent""" ).fetchall() now = datetime.now(timezone.utc) for r in rows: agent = r["agent"] latest = r["latest"] if not latest: continue last_dt = datetime.fromisoformat(latest) if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc) hours_since = (now - last_dt).total_seconds() / 3600 if hours_since > DORMANCY_HOURS: alerts.append({ "id": f"dormant:{agent}", "severity": "warning", "category": "health", "title": f"Agent '{agent}' dormant for {int(hours_since)}h", "detail": ( f"No PR activity since {latest}. " f"Last seen {int(hours_since)}h ago (threshold: {DORMANCY_HOURS}h). " f"Total historical PRs: {r['total_prs']}." ), "agent": agent, "domain": None, "detected_at": _now_iso(), "auto_resolve": True, }) return alerts # ─── Check: Quality Regression (approval rate drop) ───────────────────────── def check_quality_regression(conn: sqlite3.Connection) -> list[dict]: """Detect approval rate drops vs 7-day baseline, per agent and per domain.""" alerts = [] # 7-day baseline approval rate (overall) baseline = conn.execute( """SELECT COUNT(CASE WHEN event='approved' THEN 1 END) as approved, COUNT(*) as total FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-7 days')""" ).fetchone() baseline_rate = (baseline["approved"] / baseline["total"] * 100) if baseline["total"] else None # 24h approval rate (overall) recent = conn.execute( """SELECT COUNT(CASE WHEN event='approved' THEN 1 END) as approved, COUNT(*) as total FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-24 hours')""" ).fetchone() recent_rate = (recent["approved"] / recent["total"] * 100) if recent["total"] else None if baseline_rate is not None and recent_rate is not None: drop = baseline_rate - recent_rate if drop > APPROVAL_DROP_THRESHOLD: alerts.append({ "id": "quality_regression:overall", "severity": "critical", "category": "quality", "title": f"Approval rate dropped {drop:.0f}pp (24h: {recent_rate:.0f}% vs 7d: {baseline_rate:.0f}%)", "detail": ( f"24h approval rate ({recent_rate:.1f}%) is {drop:.1f} percentage points below " f"7-day baseline ({baseline_rate:.1f}%). " f"Evaluated {recent['total']} PRs in last 24h." ), "agent": None, "domain": None, "detected_at": _now_iso(), "auto_resolve": True, }) # Per-agent approval rate (24h vs 7d) — only for agents with >=5 evals in each window # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) _check_approval_by_dimension(conn, alerts, "agent", "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))") # Per-domain approval rate (24h vs 7d) — Theseus addition _check_approval_by_dimension(conn, alerts, "domain", "json_extract(detail, '$.domain')") return alerts def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr): """Check approval rate regression grouped by a dimension (agent or domain).""" # 7-day baseline per dimension baseline_rows = conn.execute( f"""SELECT {dim_expr} as dim_val, COUNT(CASE WHEN event='approved' THEN 1 END) as approved, COUNT(*) as total FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-7 days') AND {dim_expr} IS NOT NULL GROUP BY dim_val HAVING total >= 5""" ).fetchall() baselines = {r["dim_val"]: (r["approved"] / r["total"] * 100) for r in baseline_rows} # 24h per dimension recent_rows = conn.execute( f"""SELECT {dim_expr} as dim_val, COUNT(CASE WHEN event='approved' THEN 1 END) as approved, COUNT(*) as total FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-24 hours') AND {dim_expr} IS NOT NULL GROUP BY dim_val HAVING total >= 5""" ).fetchall() for r in recent_rows: val = r["dim_val"] if val not in baselines: continue recent_rate = r["approved"] / r["total"] * 100 base_rate = baselines[val] drop = base_rate - recent_rate if drop > APPROVAL_DROP_THRESHOLD: alerts.append({ "id": f"quality_regression:{dim_name}:{val}", "severity": "warning", "category": "quality", "title": f"{dim_name.title()} '{val}' approval dropped {drop:.0f}pp", "detail": ( f"24h: {recent_rate:.1f}% vs 7d baseline: {base_rate:.1f}% " f"({r['total']} evals in 24h)." ), "agent": val if dim_name == "agent" else None, "domain": val if dim_name == "domain" else None, "detected_at": _now_iso(), "auto_resolve": True, }) # ─── Check: Throughput Anomaly ────────────────────────────────────────────── def check_throughput(conn: sqlite3.Connection) -> list[dict]: """Detect throughput stalling — today vs 7-day SMA.""" alerts = [] # Daily merged counts for last 7 days rows = conn.execute( """SELECT date(merged_at) as day, COUNT(*) as n FROM prs WHERE merged_at > datetime('now', '-7 days') GROUP BY day ORDER BY day""" ).fetchall() if len(rows) < 2: return alerts # Not enough data daily_counts = [r["n"] for r in rows] sma = statistics.mean(daily_counts[:-1]) if len(daily_counts) > 1 else daily_counts[0] today_count = daily_counts[-1] if sma > 0 and today_count < sma * THROUGHPUT_DROP_RATIO: alerts.append({ "id": "throughput:stalling", "severity": "warning", "category": "throughput", "title": f"Throughput stalling: {today_count} merges today vs {sma:.0f}/day avg", "detail": ( f"Today's merge count ({today_count}) is below {THROUGHPUT_DROP_RATIO:.0%} of " f"7-day average ({sma:.1f}/day). Daily counts: {daily_counts}." ), "agent": None, "domain": None, "detected_at": _now_iso(), "auto_resolve": True, }) return alerts # ─── Check: Rejection Reason Spike ───────────────────────────────────────── def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]: """Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections.""" alerts = [] # Total rejections in 24h total = conn.execute( """SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-24 hours')""" ).fetchone()["n"] if total < 10: return alerts # Not enough data # Count by rejection tag tags = conn.execute( """SELECT value as tag, COUNT(*) as cnt FROM audit_log, json_each(json_extract(detail, '$.issues')) WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-24 hours') GROUP BY tag ORDER BY cnt DESC""" ).fetchall() for t in tags: ratio = t["cnt"] / total if ratio > REJECTION_SPIKE_RATIO: alerts.append({ "id": f"rejection_spike:{t['tag']}", "severity": "warning", "category": "quality", "title": f"Rejection reason '{t['tag']}' at {ratio:.0%} of rejections", "detail": ( f"'{t['tag']}' accounts for {t['cnt']}/{total} rejections in 24h " f"({ratio:.1%}). Threshold: {REJECTION_SPIKE_RATIO:.0%}." ), "agent": None, "domain": None, "detected_at": _now_iso(), "auto_resolve": True, }) return alerts # ─── Check: Stuck Loops ──────────────────────────────────────────────────── def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]: """Detect agents repeatedly failing on the same rejection reason.""" alerts = [] # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28) rows = conn.execute( """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent, value as tag, COUNT(*) as cnt FROM audit_log, json_each(json_extract(detail, '$.issues')) WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-6 hours') AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL GROUP BY agent, tag HAVING cnt > ?""", (STUCK_LOOP_THRESHOLD,), ).fetchall() for r in rows: alerts.append({ "id": f"stuck_loop:{r['agent']}:{r['tag']}", "severity": "critical", "category": "health", "title": f"Agent '{r['agent']}' stuck: '{r['tag']}' failed {r['cnt']}x in 6h", "detail": ( f"Agent '{r['agent']}' has been rejected for '{r['tag']}' " f"{r['cnt']} times in the last 6 hours (threshold: {STUCK_LOOP_THRESHOLD}). " f"Stop and reassess." ), "agent": r["agent"], "domain": None, "detected_at": _now_iso(), "auto_resolve": True, }) return alerts # ─── Check: Cost Spikes ──────────────────────────────────────────────────── def check_cost_spikes(conn: sqlite3.Connection) -> list[dict]: """Detect daily cost exceeding 2x of 7-day average per agent.""" alerts = [] # Check if costs table exists and has agent column try: cols = conn.execute("PRAGMA table_info(costs)").fetchall() col_names = {c["name"] for c in cols} except sqlite3.Error: return alerts if "agent" not in col_names or "cost_usd" not in col_names: # Fall back to per-PR cost tracking rows = conn.execute( """SELECT agent, SUM(CASE WHEN created_at > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost, SUM(CASE WHEN created_at > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily FROM prs WHERE agent IS NOT NULL AND cost_usd > 0 GROUP BY agent HAVING avg_daily > 0""" ).fetchall() else: rows = conn.execute( """SELECT agent, SUM(CASE WHEN timestamp > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost, SUM(CASE WHEN timestamp > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily FROM costs WHERE agent IS NOT NULL GROUP BY agent HAVING avg_daily > 0""" ).fetchall() for r in rows: if r["avg_daily"] and r["today_cost"] > r["avg_daily"] * COST_SPIKE_RATIO: ratio = r["today_cost"] / r["avg_daily"] alerts.append({ "id": f"cost_spike:{r['agent']}", "severity": "warning", "category": "health", "title": f"Agent '{r['agent']}' cost spike: ${r['today_cost']:.2f} today ({ratio:.1f}x avg)", "detail": ( f"Today's cost (${r['today_cost']:.2f}) is {ratio:.1f}x the 7-day daily average " f"(${r['avg_daily']:.2f}). Threshold: {COST_SPIKE_RATIO}x." ), "agent": r["agent"], "domain": None, "detected_at": _now_iso(), "auto_resolve": True, }) return alerts # ─── Check: Domain Rejection Patterns (Theseus addition) ─────────────────── def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]: """Track rejection reason shift per domain — surfaces domain maturity issues.""" alerts = [] # Per-domain rejection breakdown in 24h rows = conn.execute( """SELECT json_extract(detail, '$.domain') as domain, value as tag, COUNT(*) as cnt FROM audit_log, json_each(json_extract(detail, '$.issues')) WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected') AND timestamp > datetime('now', '-24 hours') AND json_extract(detail, '$.domain') IS NOT NULL GROUP BY domain, tag ORDER BY domain, cnt DESC""" ).fetchall() # Group by domain domain_tags = {} for r in rows: d = r["domain"] if d not in domain_tags: domain_tags[d] = [] domain_tags[d].append({"tag": r["tag"], "count": r["cnt"]}) # Flag if a domain has >50% of rejections from a single reason (concentrated failure) for domain, tags in domain_tags.items(): total = sum(t["count"] for t in tags) if total < 5: continue top = tags[0] ratio = top["count"] / total if ratio > 0.5: alerts.append({ "id": f"domain_rejection_pattern:{domain}:{top['tag']}", "severity": "info", "category": "failure_pattern", "title": f"Domain '{domain}': {ratio:.0%} of rejections are '{top['tag']}'", "detail": ( f"In domain '{domain}', {top['count']}/{total} rejections (24h) are for " f"'{top['tag']}'. This may indicate a systematic issue with evidence standards " f"or schema compliance in this domain." ), "agent": None, "domain": domain, "detected_at": _now_iso(), "auto_resolve": True, }) return alerts # ─── Failure Report Generator ─────────────────────────────────────────────── def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 24) -> dict | None: """Compile a failure report for a specific agent. Returns top rejection reasons, example PRs, and suggested fixes. Designed to be sent directly to the agent via Pentagon messaging. """ hours = int(hours) # defensive — callers should pass int, but enforce it rows = conn.execute( """SELECT value as tag, COUNT(*) as cnt, GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers FROM audit_log, json_each(json_extract(detail, '$.issues')) WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected') AND json_extract(detail, '$.agent') = ? AND timestamp > datetime('now', ? || ' hours') GROUP BY tag ORDER BY cnt DESC LIMIT 5""", (agent, f"-{hours}"), ).fetchall() if not rows: return None total_rejections = sum(r["cnt"] for r in rows) top_reasons = [] for r in rows: prs = r["pr_numbers"].split(",")[:3] if r["pr_numbers"] else [] top_reasons.append({ "reason": r["tag"], "count": r["cnt"], "pct": round(r["cnt"] / total_rejections * 100, 1), "example_prs": prs, "suggestion": _suggest_fix(r["tag"]), }) return { "agent": agent, "period_hours": hours, "total_rejections": total_rejections, "top_reasons": top_reasons, "generated_at": _now_iso(), } def _suggest_fix(rejection_tag: str) -> str: """Map known rejection reasons to actionable suggestions.""" suggestions = { "broken_wiki_links": "Check that all [[wiki links]] in claims resolve to existing files. Run link validation before submitting.", "near_duplicate": "Search existing claims before creating new ones. Use semantic search to find similar claims.", "frontmatter_schema": "Validate YAML frontmatter against the claim schema. Required fields: title, domain, confidence, type.", "weak_evidence": "Add concrete sources, data points, or citations. Claims need evidence that can be independently verified.", "missing_confidence": "Every claim needs a confidence level: proven, likely, experimental, or speculative.", "domain_mismatch": "Ensure claims are filed under the correct domain. Check domain definitions if unsure.", "too_broad": "Break broad claims into specific, testable sub-claims.", "missing_links": "Claims should link to related claims, entities, or sources. Isolated claims are harder to verify.", } return suggestions.get(rejection_tag, f"Review rejection reason '{rejection_tag}' and adjust extraction accordingly.") # ─── Run All Checks ──────────────────────────────────────────────────────── def run_all_checks(conn: sqlite3.Connection) -> list[dict]: """Execute all check functions and return combined alerts.""" alerts = [] alerts.extend(check_agent_health(conn)) alerts.extend(check_quality_regression(conn)) alerts.extend(check_throughput(conn)) alerts.extend(check_rejection_spike(conn)) alerts.extend(check_stuck_loops(conn)) alerts.extend(check_cost_spikes(conn)) alerts.extend(check_domain_rejection_patterns(conn)) return alerts def format_alert_message(alert: dict) -> str: """Format an alert for Pentagon messaging.""" severity_icon = {"critical": "!!", "warning": "!", "info": "~"} icon = severity_icon.get(alert["severity"], "?") return f"[{icon}] {alert['title']}\n{alert['detail']}"