teleo-codex/diagnostics/alerting.py
m3taversal 33e670b436 argus: add active alerting system (Phase 1)
Three new files for the engineering acceleration initiative:
- alerting.py: 7 health check functions (dormant agents, quality regression,
  throughput anomaly, rejection spikes, stuck loops, cost spikes, domain
  rejection patterns) + failure report generator
- alerting_routes.py: /check, /api/alerts, /api/failure-report/{agent} endpoints
- PATCH_INSTRUCTIONS.md: integration guide for app.py (imports, route
  registration, auth middleware bypass, DB connection)

Observe and alert only — no pipeline modification. Independence constraint
is load-bearing for measurement trustworthiness.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:45:07 +00:00

537 lines
22 KiB
Python

"""Argus active monitoring — health watchdog, quality regression, throughput anomaly detection.
Provides check functions that detect problems and return structured alerts.
Called by /check endpoint (periodic cron) or on-demand.
Alert schema:
{
"id": str, # unique key for dedup (e.g. "dormant:ganymede")
"severity": str, # "critical" | "warning" | "info"
"category": str, # "health" | "quality" | "throughput" | "failure_pattern"
"title": str, # human-readable headline
"detail": str, # actionable description
"agent": str|None, # affected agent (if applicable)
"domain": str|None, # affected domain (if applicable)
"detected_at": str, # ISO timestamp
"auto_resolve": bool, # clears when condition clears
}
"""
import json
import sqlite3
import statistics
from datetime import datetime, timezone
# ─── Agent-domain mapping (static config, maintained by Argus) ──────────────
AGENT_DOMAINS = {
"rio": ["internet-finance"],
"clay": ["creative-industries"],
"ganymede": None, # reviewer — cross-domain
"epimetheus": None, # infra
"leo": None, # standards
"oberon": None, # evolution tracking
"vida": None, # health monitoring
"hermes": None, # comms
"astra": None, # research
}
# Thresholds
DORMANCY_HOURS = 48
APPROVAL_DROP_THRESHOLD = 15 # percentage points below 7-day baseline
THROUGHPUT_DROP_RATIO = 0.5 # alert if today < 50% of 7-day SMA
REJECTION_SPIKE_RATIO = 0.20 # single reason > 20% of recent rejections
STUCK_LOOP_THRESHOLD = 3 # same agent + same rejection reason > N times in 6h
COST_SPIKE_RATIO = 2.0 # daily cost > 2x 7-day average
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
# ─── Check: Agent Health (dormancy detection) ───────────────────────────────
def check_agent_health(conn: sqlite3.Connection) -> list[dict]:
"""Detect agents with no PR activity in the last DORMANCY_HOURS hours."""
alerts = []
# Get last activity per agent
rows = conn.execute(
"""SELECT agent, MAX(last_attempt) as latest, COUNT(*) as total_prs
FROM prs WHERE agent IS NOT NULL
GROUP BY agent"""
).fetchall()
now = datetime.now(timezone.utc)
for r in rows:
agent = r["agent"]
latest = r["latest"]
if not latest:
continue
last_dt = datetime.fromisoformat(latest)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=timezone.utc)
hours_since = (now - last_dt).total_seconds() / 3600
if hours_since > DORMANCY_HOURS:
alerts.append({
"id": f"dormant:{agent}",
"severity": "warning",
"category": "health",
"title": f"Agent '{agent}' dormant for {int(hours_since)}h",
"detail": (
f"No PR activity since {latest}. "
f"Last seen {int(hours_since)}h ago (threshold: {DORMANCY_HOURS}h). "
f"Total historical PRs: {r['total_prs']}."
),
"agent": agent,
"domain": None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
return alerts
# ─── Check: Quality Regression (approval rate drop) ─────────────────────────
def check_quality_regression(conn: sqlite3.Connection) -> list[dict]:
"""Detect approval rate drops vs 7-day baseline, per agent and per domain."""
alerts = []
# 7-day baseline approval rate (overall)
baseline = conn.execute(
"""SELECT
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
COUNT(*) as total
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-7 days')"""
).fetchone()
baseline_rate = (baseline["approved"] / baseline["total"] * 100) if baseline["total"] else None
# 24h approval rate (overall)
recent = conn.execute(
"""SELECT
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
COUNT(*) as total
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')"""
).fetchone()
recent_rate = (recent["approved"] / recent["total"] * 100) if recent["total"] else None
if baseline_rate is not None and recent_rate is not None:
drop = baseline_rate - recent_rate
if drop > APPROVAL_DROP_THRESHOLD:
alerts.append({
"id": "quality_regression:overall",
"severity": "critical",
"category": "quality",
"title": f"Approval rate dropped {drop:.0f}pp (24h: {recent_rate:.0f}% vs 7d: {baseline_rate:.0f}%)",
"detail": (
f"24h approval rate ({recent_rate:.1f}%) is {drop:.1f} percentage points below "
f"7-day baseline ({baseline_rate:.1f}%). "
f"Evaluated {recent['total']} PRs in last 24h."
),
"agent": None,
"domain": None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
# Per-agent approval rate (24h vs 7d) — only for agents with >=5 evals in each window
# COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28)
_check_approval_by_dimension(conn, alerts, "agent", "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))")
# Per-domain approval rate (24h vs 7d) — Theseus addition
_check_approval_by_dimension(conn, alerts, "domain", "json_extract(detail, '$.domain')")
return alerts
def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr):
"""Check approval rate regression grouped by a dimension (agent or domain)."""
# 7-day baseline per dimension
baseline_rows = conn.execute(
f"""SELECT {dim_expr} as dim_val,
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
COUNT(*) as total
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-7 days')
AND {dim_expr} IS NOT NULL
GROUP BY dim_val HAVING total >= 5"""
).fetchall()
baselines = {r["dim_val"]: (r["approved"] / r["total"] * 100) for r in baseline_rows}
# 24h per dimension
recent_rows = conn.execute(
f"""SELECT {dim_expr} as dim_val,
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
COUNT(*) as total
FROM audit_log
WHERE stage='evaluate'
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')
AND {dim_expr} IS NOT NULL
GROUP BY dim_val HAVING total >= 5"""
).fetchall()
for r in recent_rows:
val = r["dim_val"]
if val not in baselines:
continue
recent_rate = r["approved"] / r["total"] * 100
base_rate = baselines[val]
drop = base_rate - recent_rate
if drop > APPROVAL_DROP_THRESHOLD:
alerts.append({
"id": f"quality_regression:{dim_name}:{val}",
"severity": "warning",
"category": "quality",
"title": f"{dim_name.title()} '{val}' approval dropped {drop:.0f}pp",
"detail": (
f"24h: {recent_rate:.1f}% vs 7d baseline: {base_rate:.1f}% "
f"({r['total']} evals in 24h)."
),
"agent": val if dim_name == "agent" else None,
"domain": val if dim_name == "domain" else None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
# ─── Check: Throughput Anomaly ──────────────────────────────────────────────
def check_throughput(conn: sqlite3.Connection) -> list[dict]:
"""Detect throughput stalling — today vs 7-day SMA."""
alerts = []
# Daily merged counts for last 7 days
rows = conn.execute(
"""SELECT date(merged_at) as day, COUNT(*) as n
FROM prs WHERE merged_at > datetime('now', '-7 days')
GROUP BY day ORDER BY day"""
).fetchall()
if len(rows) < 2:
return alerts # Not enough data
daily_counts = [r["n"] for r in rows]
sma = statistics.mean(daily_counts[:-1]) if len(daily_counts) > 1 else daily_counts[0]
today_count = daily_counts[-1]
if sma > 0 and today_count < sma * THROUGHPUT_DROP_RATIO:
alerts.append({
"id": "throughput:stalling",
"severity": "warning",
"category": "throughput",
"title": f"Throughput stalling: {today_count} merges today vs {sma:.0f}/day avg",
"detail": (
f"Today's merge count ({today_count}) is below {THROUGHPUT_DROP_RATIO:.0%} of "
f"7-day average ({sma:.1f}/day). Daily counts: {daily_counts}."
),
"agent": None,
"domain": None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
return alerts
# ─── Check: Rejection Reason Spike ─────────────────────────────────────────
def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]:
"""Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections."""
alerts = []
# Total rejections in 24h
total = conn.execute(
"""SELECT COUNT(*) as n FROM audit_log
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')"""
).fetchone()["n"]
if total < 10:
return alerts # Not enough data
# Count by rejection tag
tags = conn.execute(
"""SELECT value as tag, COUNT(*) as cnt
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')
GROUP BY tag ORDER BY cnt DESC"""
).fetchall()
for t in tags:
ratio = t["cnt"] / total
if ratio > REJECTION_SPIKE_RATIO:
alerts.append({
"id": f"rejection_spike:{t['tag']}",
"severity": "warning",
"category": "quality",
"title": f"Rejection reason '{t['tag']}' at {ratio:.0%} of rejections",
"detail": (
f"'{t['tag']}' accounts for {t['cnt']}/{total} rejections in 24h "
f"({ratio:.1%}). Threshold: {REJECTION_SPIKE_RATIO:.0%}."
),
"agent": None,
"domain": None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
return alerts
# ─── Check: Stuck Loops ────────────────────────────────────────────────────
def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]:
"""Detect agents repeatedly failing on the same rejection reason."""
alerts = []
# COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28)
rows = conn.execute(
"""SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
value as tag,
COUNT(*) as cnt
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-6 hours')
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL
GROUP BY agent, tag
HAVING cnt > ?""",
(STUCK_LOOP_THRESHOLD,),
).fetchall()
for r in rows:
alerts.append({
"id": f"stuck_loop:{r['agent']}:{r['tag']}",
"severity": "critical",
"category": "health",
"title": f"Agent '{r['agent']}' stuck: '{r['tag']}' failed {r['cnt']}x in 6h",
"detail": (
f"Agent '{r['agent']}' has been rejected for '{r['tag']}' "
f"{r['cnt']} times in the last 6 hours (threshold: {STUCK_LOOP_THRESHOLD}). "
f"Stop and reassess."
),
"agent": r["agent"],
"domain": None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
return alerts
# ─── Check: Cost Spikes ────────────────────────────────────────────────────
def check_cost_spikes(conn: sqlite3.Connection) -> list[dict]:
"""Detect daily cost exceeding 2x of 7-day average per agent."""
alerts = []
# Check if costs table exists and has agent column
try:
cols = conn.execute("PRAGMA table_info(costs)").fetchall()
col_names = {c["name"] for c in cols}
except sqlite3.Error:
return alerts
if "agent" not in col_names or "cost_usd" not in col_names:
# Fall back to per-PR cost tracking
rows = conn.execute(
"""SELECT agent,
SUM(CASE WHEN created_at > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost,
SUM(CASE WHEN created_at > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily
FROM prs WHERE agent IS NOT NULL AND cost_usd > 0
GROUP BY agent
HAVING avg_daily > 0"""
).fetchall()
else:
rows = conn.execute(
"""SELECT agent,
SUM(CASE WHEN timestamp > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost,
SUM(CASE WHEN timestamp > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily
FROM costs WHERE agent IS NOT NULL
GROUP BY agent
HAVING avg_daily > 0"""
).fetchall()
for r in rows:
if r["avg_daily"] and r["today_cost"] > r["avg_daily"] * COST_SPIKE_RATIO:
ratio = r["today_cost"] / r["avg_daily"]
alerts.append({
"id": f"cost_spike:{r['agent']}",
"severity": "warning",
"category": "health",
"title": f"Agent '{r['agent']}' cost spike: ${r['today_cost']:.2f} today ({ratio:.1f}x avg)",
"detail": (
f"Today's cost (${r['today_cost']:.2f}) is {ratio:.1f}x the 7-day daily average "
f"(${r['avg_daily']:.2f}). Threshold: {COST_SPIKE_RATIO}x."
),
"agent": r["agent"],
"domain": None,
"detected_at": _now_iso(),
"auto_resolve": True,
})
return alerts
# ─── Check: Domain Rejection Patterns (Theseus addition) ───────────────────
def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]:
"""Track rejection reason shift per domain — surfaces domain maturity issues."""
alerts = []
# Per-domain rejection breakdown in 24h
rows = conn.execute(
"""SELECT json_extract(detail, '$.domain') as domain,
value as tag,
COUNT(*) as cnt
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')
AND json_extract(detail, '$.domain') IS NOT NULL
GROUP BY domain, tag
ORDER BY domain, cnt DESC"""
).fetchall()
# Group by domain
domain_tags = {}
for r in rows:
d = r["domain"]
if d not in domain_tags:
domain_tags[d] = []
domain_tags[d].append({"tag": r["tag"], "count": r["cnt"]})
# Flag if a domain has >50% of rejections from a single reason (concentrated failure)
for domain, tags in domain_tags.items():
total = sum(t["count"] for t in tags)
if total < 5:
continue
top = tags[0]
ratio = top["count"] / total
if ratio > 0.5:
alerts.append({
"id": f"domain_rejection_pattern:{domain}:{top['tag']}",
"severity": "info",
"category": "failure_pattern",
"title": f"Domain '{domain}': {ratio:.0%} of rejections are '{top['tag']}'",
"detail": (
f"In domain '{domain}', {top['count']}/{total} rejections (24h) are for "
f"'{top['tag']}'. This may indicate a systematic issue with evidence standards "
f"or schema compliance in this domain."
),
"agent": None,
"domain": domain,
"detected_at": _now_iso(),
"auto_resolve": True,
})
return alerts
# ─── Failure Report Generator ───────────────────────────────────────────────
def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 24) -> dict | None:
"""Compile a failure report for a specific agent.
Returns top rejection reasons, example PRs, and suggested fixes.
Designed to be sent directly to the agent via Pentagon messaging.
"""
hours = int(hours) # defensive — callers should pass int, but enforce it
rows = conn.execute(
"""SELECT value as tag, COUNT(*) as cnt,
GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND json_extract(detail, '$.agent') = ?
AND timestamp > datetime('now', ? || ' hours')
GROUP BY tag ORDER BY cnt DESC
LIMIT 5""",
(agent, f"-{hours}"),
).fetchall()
if not rows:
return None
total_rejections = sum(r["cnt"] for r in rows)
top_reasons = []
for r in rows:
prs = r["pr_numbers"].split(",")[:3] if r["pr_numbers"] else []
top_reasons.append({
"reason": r["tag"],
"count": r["cnt"],
"pct": round(r["cnt"] / total_rejections * 100, 1),
"example_prs": prs,
"suggestion": _suggest_fix(r["tag"]),
})
return {
"agent": agent,
"period_hours": hours,
"total_rejections": total_rejections,
"top_reasons": top_reasons,
"generated_at": _now_iso(),
}
def _suggest_fix(rejection_tag: str) -> str:
"""Map known rejection reasons to actionable suggestions."""
suggestions = {
"broken_wiki_links": "Check that all [[wiki links]] in claims resolve to existing files. Run link validation before submitting.",
"near_duplicate": "Search existing claims before creating new ones. Use semantic search to find similar claims.",
"frontmatter_schema": "Validate YAML frontmatter against the claim schema. Required fields: title, domain, confidence, type.",
"weak_evidence": "Add concrete sources, data points, or citations. Claims need evidence that can be independently verified.",
"missing_confidence": "Every claim needs a confidence level: proven, likely, experimental, or speculative.",
"domain_mismatch": "Ensure claims are filed under the correct domain. Check domain definitions if unsure.",
"too_broad": "Break broad claims into specific, testable sub-claims.",
"missing_links": "Claims should link to related claims, entities, or sources. Isolated claims are harder to verify.",
}
return suggestions.get(rejection_tag, f"Review rejection reason '{rejection_tag}' and adjust extraction accordingly.")
# ─── Run All Checks ────────────────────────────────────────────────────────
def run_all_checks(conn: sqlite3.Connection) -> list[dict]:
"""Execute all check functions and return combined alerts."""
alerts = []
alerts.extend(check_agent_health(conn))
alerts.extend(check_quality_regression(conn))
alerts.extend(check_throughput(conn))
alerts.extend(check_rejection_spike(conn))
alerts.extend(check_stuck_loops(conn))
alerts.extend(check_cost_spikes(conn))
alerts.extend(check_domain_rejection_patterns(conn))
return alerts
def format_alert_message(alert: dict) -> str:
"""Format an alert for Pentagon messaging."""
severity_icon = {"critical": "!!", "warning": "!", "info": "~"}
icon = severity_icon.get(alert["severity"], "?")
return f"[{icon}] {alert['title']}\n{alert['detail']}"