argus: add active alerting system (Phase 1)
Three new files for the engineering acceleration initiative:
- alerting.py: 7 health check functions (dormant agents, quality regression,
throughput anomaly, rejection spikes, stuck loops, cost spikes, domain
rejection patterns) + failure report generator
- alerting_routes.py: /check, /api/alerts, /api/failure-report/{agent} endpoints
- PATCH_INSTRUCTIONS.md: integration guide for app.py (imports, route
registration, auth middleware bypass, DB connection)
Observe and alert only — no pipeline modification. Independence constraint
is load-bearing for measurement trustworthiness.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6550cad7e5
commit
33e670b436
3 changed files with 727 additions and 0 deletions
65
diagnostics/PATCH_INSTRUCTIONS.md
Normal file
65
diagnostics/PATCH_INSTRUCTIONS.md
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
# Alerting Integration Patch for app.py
|
||||
|
||||
Two changes needed in the live app.py:
|
||||
|
||||
## 1. Add import (after `from activity_endpoint import handle_activity`)
|
||||
|
||||
```python
|
||||
from alerting_routes import register_alerting_routes
|
||||
```
|
||||
|
||||
## 2. Register routes in create_app() (after the last `app.router.add_*` line)
|
||||
|
||||
```python
|
||||
# Alerting — active monitoring endpoints
|
||||
register_alerting_routes(app, _alerting_conn)
|
||||
```
|
||||
|
||||
## 3. Add helper function (before create_app)
|
||||
|
||||
```python
|
||||
def _alerting_conn() -> sqlite3.Connection:
|
||||
"""Dedicated read-only connection for alerting checks.
|
||||
|
||||
Separate from app['db'] to avoid contention with request handlers.
|
||||
Always sets row_factory for named column access.
|
||||
"""
|
||||
conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
```
|
||||
|
||||
## 4. Add /check and /api/alerts to PUBLIC_PATHS
|
||||
|
||||
```python
|
||||
_PUBLIC_PATHS = frozenset({"/", "/api/metrics", "/api/rejections", "/api/snapshots",
|
||||
"/api/vital-signs", "/api/contributors", "/api/domains",
|
||||
"/api/audit", "/check", "/api/alerts"})
|
||||
```
|
||||
|
||||
## 5. Add /api/failure-report/ prefix check in auth middleware
|
||||
|
||||
In the `@web.middleware` auth function, add this alongside the existing
|
||||
`request.path.startswith("/api/audit/")` check:
|
||||
|
||||
```python
|
||||
if request.path.startswith("/api/failure-report/"):
|
||||
return await handler(request)
|
||||
```
|
||||
|
||||
## Deploy notes
|
||||
|
||||
- `alerting.py` and `alerting_routes.py` must be in the **same directory** as `app.py`
|
||||
(i.e., `/opt/teleo-eval/diagnostics/`). The import uses a bare module name, not
|
||||
a relative import, so Python resolves it via `sys.path` which includes the working
|
||||
directory. If the deploy changes the working directory or uses a package structure,
|
||||
switch the import in `alerting_routes.py` line 11 to `from .alerting import ...`.
|
||||
|
||||
- The `/api/failure-report/{agent}` endpoint is standalone — any agent can pull their
|
||||
own report on demand via `GET /api/failure-report/<agent-name>?hours=24`.
|
||||
|
||||
## Files to deploy
|
||||
|
||||
- `alerting.py` → `/opt/teleo-eval/diagnostics/alerting.py`
|
||||
- `alerting_routes.py` → `/opt/teleo-eval/diagnostics/alerting_routes.py`
|
||||
- Patched `app.py` → `/opt/teleo-eval/diagnostics/app.py`
|
||||
537
diagnostics/alerting.py
Normal file
537
diagnostics/alerting.py
Normal file
|
|
@ -0,0 +1,537 @@
|
|||
"""Argus active monitoring — health watchdog, quality regression, throughput anomaly detection.
|
||||
|
||||
Provides check functions that detect problems and return structured alerts.
|
||||
Called by /check endpoint (periodic cron) or on-demand.
|
||||
|
||||
Alert schema:
|
||||
{
|
||||
"id": str, # unique key for dedup (e.g. "dormant:ganymede")
|
||||
"severity": str, # "critical" | "warning" | "info"
|
||||
"category": str, # "health" | "quality" | "throughput" | "failure_pattern"
|
||||
"title": str, # human-readable headline
|
||||
"detail": str, # actionable description
|
||||
"agent": str|None, # affected agent (if applicable)
|
||||
"domain": str|None, # affected domain (if applicable)
|
||||
"detected_at": str, # ISO timestamp
|
||||
"auto_resolve": bool, # clears when condition clears
|
||||
}
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import statistics
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
# ─── Agent-domain mapping (static config, maintained by Argus) ──────────────
|
||||
|
||||
AGENT_DOMAINS = {
|
||||
"rio": ["internet-finance"],
|
||||
"clay": ["creative-industries"],
|
||||
"ganymede": None, # reviewer — cross-domain
|
||||
"epimetheus": None, # infra
|
||||
"leo": None, # standards
|
||||
"oberon": None, # evolution tracking
|
||||
"vida": None, # health monitoring
|
||||
"hermes": None, # comms
|
||||
"astra": None, # research
|
||||
}
|
||||
|
||||
# Thresholds
|
||||
DORMANCY_HOURS = 48
|
||||
APPROVAL_DROP_THRESHOLD = 15 # percentage points below 7-day baseline
|
||||
THROUGHPUT_DROP_RATIO = 0.5 # alert if today < 50% of 7-day SMA
|
||||
REJECTION_SPIKE_RATIO = 0.20 # single reason > 20% of recent rejections
|
||||
STUCK_LOOP_THRESHOLD = 3 # same agent + same rejection reason > N times in 6h
|
||||
COST_SPIKE_RATIO = 2.0 # daily cost > 2x 7-day average
|
||||
|
||||
|
||||
def _now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
# ─── Check: Agent Health (dormancy detection) ───────────────────────────────
|
||||
|
||||
|
||||
def check_agent_health(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Detect agents with no PR activity in the last DORMANCY_HOURS hours."""
|
||||
alerts = []
|
||||
|
||||
# Get last activity per agent
|
||||
rows = conn.execute(
|
||||
"""SELECT agent, MAX(last_attempt) as latest, COUNT(*) as total_prs
|
||||
FROM prs WHERE agent IS NOT NULL
|
||||
GROUP BY agent"""
|
||||
).fetchall()
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
for r in rows:
|
||||
agent = r["agent"]
|
||||
latest = r["latest"]
|
||||
if not latest:
|
||||
continue
|
||||
|
||||
last_dt = datetime.fromisoformat(latest)
|
||||
if last_dt.tzinfo is None:
|
||||
last_dt = last_dt.replace(tzinfo=timezone.utc)
|
||||
|
||||
hours_since = (now - last_dt).total_seconds() / 3600
|
||||
|
||||
if hours_since > DORMANCY_HOURS:
|
||||
alerts.append({
|
||||
"id": f"dormant:{agent}",
|
||||
"severity": "warning",
|
||||
"category": "health",
|
||||
"title": f"Agent '{agent}' dormant for {int(hours_since)}h",
|
||||
"detail": (
|
||||
f"No PR activity since {latest}. "
|
||||
f"Last seen {int(hours_since)}h ago (threshold: {DORMANCY_HOURS}h). "
|
||||
f"Total historical PRs: {r['total_prs']}."
|
||||
),
|
||||
"agent": agent,
|
||||
"domain": None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ─── Check: Quality Regression (approval rate drop) ─────────────────────────
|
||||
|
||||
|
||||
def check_quality_regression(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Detect approval rate drops vs 7-day baseline, per agent and per domain."""
|
||||
alerts = []
|
||||
|
||||
# 7-day baseline approval rate (overall)
|
||||
baseline = conn.execute(
|
||||
"""SELECT
|
||||
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
|
||||
COUNT(*) as total
|
||||
FROM audit_log
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-7 days')"""
|
||||
).fetchone()
|
||||
baseline_rate = (baseline["approved"] / baseline["total"] * 100) if baseline["total"] else None
|
||||
|
||||
# 24h approval rate (overall)
|
||||
recent = conn.execute(
|
||||
"""SELECT
|
||||
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
|
||||
COUNT(*) as total
|
||||
FROM audit_log
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-24 hours')"""
|
||||
).fetchone()
|
||||
recent_rate = (recent["approved"] / recent["total"] * 100) if recent["total"] else None
|
||||
|
||||
if baseline_rate is not None and recent_rate is not None:
|
||||
drop = baseline_rate - recent_rate
|
||||
if drop > APPROVAL_DROP_THRESHOLD:
|
||||
alerts.append({
|
||||
"id": "quality_regression:overall",
|
||||
"severity": "critical",
|
||||
"category": "quality",
|
||||
"title": f"Approval rate dropped {drop:.0f}pp (24h: {recent_rate:.0f}% vs 7d: {baseline_rate:.0f}%)",
|
||||
"detail": (
|
||||
f"24h approval rate ({recent_rate:.1f}%) is {drop:.1f} percentage points below "
|
||||
f"7-day baseline ({baseline_rate:.1f}%). "
|
||||
f"Evaluated {recent['total']} PRs in last 24h."
|
||||
),
|
||||
"agent": None,
|
||||
"domain": None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
# Per-agent approval rate (24h vs 7d) — only for agents with >=5 evals in each window
|
||||
# COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28)
|
||||
_check_approval_by_dimension(conn, alerts, "agent", "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))")
|
||||
|
||||
# Per-domain approval rate (24h vs 7d) — Theseus addition
|
||||
_check_approval_by_dimension(conn, alerts, "domain", "json_extract(detail, '$.domain')")
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr):
|
||||
"""Check approval rate regression grouped by a dimension (agent or domain)."""
|
||||
# 7-day baseline per dimension
|
||||
baseline_rows = conn.execute(
|
||||
f"""SELECT {dim_expr} as dim_val,
|
||||
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
|
||||
COUNT(*) as total
|
||||
FROM audit_log
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-7 days')
|
||||
AND {dim_expr} IS NOT NULL
|
||||
GROUP BY dim_val HAVING total >= 5"""
|
||||
).fetchall()
|
||||
baselines = {r["dim_val"]: (r["approved"] / r["total"] * 100) for r in baseline_rows}
|
||||
|
||||
# 24h per dimension
|
||||
recent_rows = conn.execute(
|
||||
f"""SELECT {dim_expr} as dim_val,
|
||||
COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
|
||||
COUNT(*) as total
|
||||
FROM audit_log
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-24 hours')
|
||||
AND {dim_expr} IS NOT NULL
|
||||
GROUP BY dim_val HAVING total >= 5"""
|
||||
).fetchall()
|
||||
|
||||
for r in recent_rows:
|
||||
val = r["dim_val"]
|
||||
if val not in baselines:
|
||||
continue
|
||||
recent_rate = r["approved"] / r["total"] * 100
|
||||
base_rate = baselines[val]
|
||||
drop = base_rate - recent_rate
|
||||
if drop > APPROVAL_DROP_THRESHOLD:
|
||||
alerts.append({
|
||||
"id": f"quality_regression:{dim_name}:{val}",
|
||||
"severity": "warning",
|
||||
"category": "quality",
|
||||
"title": f"{dim_name.title()} '{val}' approval dropped {drop:.0f}pp",
|
||||
"detail": (
|
||||
f"24h: {recent_rate:.1f}% vs 7d baseline: {base_rate:.1f}% "
|
||||
f"({r['total']} evals in 24h)."
|
||||
),
|
||||
"agent": val if dim_name == "agent" else None,
|
||||
"domain": val if dim_name == "domain" else None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
|
||||
# ─── Check: Throughput Anomaly ──────────────────────────────────────────────
|
||||
|
||||
|
||||
def check_throughput(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Detect throughput stalling — today vs 7-day SMA."""
|
||||
alerts = []
|
||||
|
||||
# Daily merged counts for last 7 days
|
||||
rows = conn.execute(
|
||||
"""SELECT date(merged_at) as day, COUNT(*) as n
|
||||
FROM prs WHERE merged_at > datetime('now', '-7 days')
|
||||
GROUP BY day ORDER BY day"""
|
||||
).fetchall()
|
||||
|
||||
if len(rows) < 2:
|
||||
return alerts # Not enough data
|
||||
|
||||
daily_counts = [r["n"] for r in rows]
|
||||
sma = statistics.mean(daily_counts[:-1]) if len(daily_counts) > 1 else daily_counts[0]
|
||||
today_count = daily_counts[-1]
|
||||
|
||||
if sma > 0 and today_count < sma * THROUGHPUT_DROP_RATIO:
|
||||
alerts.append({
|
||||
"id": "throughput:stalling",
|
||||
"severity": "warning",
|
||||
"category": "throughput",
|
||||
"title": f"Throughput stalling: {today_count} merges today vs {sma:.0f}/day avg",
|
||||
"detail": (
|
||||
f"Today's merge count ({today_count}) is below {THROUGHPUT_DROP_RATIO:.0%} of "
|
||||
f"7-day average ({sma:.1f}/day). Daily counts: {daily_counts}."
|
||||
),
|
||||
"agent": None,
|
||||
"domain": None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ─── Check: Rejection Reason Spike ─────────────────────────────────────────
|
||||
|
||||
|
||||
def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections."""
|
||||
alerts = []
|
||||
|
||||
# Total rejections in 24h
|
||||
total = conn.execute(
|
||||
"""SELECT COUNT(*) as n FROM audit_log
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-24 hours')"""
|
||||
).fetchone()["n"]
|
||||
|
||||
if total < 10:
|
||||
return alerts # Not enough data
|
||||
|
||||
# Count by rejection tag
|
||||
tags = conn.execute(
|
||||
"""SELECT value as tag, COUNT(*) as cnt
|
||||
FROM audit_log, json_each(json_extract(detail, '$.issues'))
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-24 hours')
|
||||
GROUP BY tag ORDER BY cnt DESC"""
|
||||
).fetchall()
|
||||
|
||||
for t in tags:
|
||||
ratio = t["cnt"] / total
|
||||
if ratio > REJECTION_SPIKE_RATIO:
|
||||
alerts.append({
|
||||
"id": f"rejection_spike:{t['tag']}",
|
||||
"severity": "warning",
|
||||
"category": "quality",
|
||||
"title": f"Rejection reason '{t['tag']}' at {ratio:.0%} of rejections",
|
||||
"detail": (
|
||||
f"'{t['tag']}' accounts for {t['cnt']}/{total} rejections in 24h "
|
||||
f"({ratio:.1%}). Threshold: {REJECTION_SPIKE_RATIO:.0%}."
|
||||
),
|
||||
"agent": None,
|
||||
"domain": None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ─── Check: Stuck Loops ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Detect agents repeatedly failing on the same rejection reason."""
|
||||
alerts = []
|
||||
|
||||
# COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28)
|
||||
rows = conn.execute(
|
||||
"""SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
|
||||
value as tag,
|
||||
COUNT(*) as cnt
|
||||
FROM audit_log, json_each(json_extract(detail, '$.issues'))
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-6 hours')
|
||||
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL
|
||||
GROUP BY agent, tag
|
||||
HAVING cnt > ?""",
|
||||
(STUCK_LOOP_THRESHOLD,),
|
||||
).fetchall()
|
||||
|
||||
for r in rows:
|
||||
alerts.append({
|
||||
"id": f"stuck_loop:{r['agent']}:{r['tag']}",
|
||||
"severity": "critical",
|
||||
"category": "health",
|
||||
"title": f"Agent '{r['agent']}' stuck: '{r['tag']}' failed {r['cnt']}x in 6h",
|
||||
"detail": (
|
||||
f"Agent '{r['agent']}' has been rejected for '{r['tag']}' "
|
||||
f"{r['cnt']} times in the last 6 hours (threshold: {STUCK_LOOP_THRESHOLD}). "
|
||||
f"Stop and reassess."
|
||||
),
|
||||
"agent": r["agent"],
|
||||
"domain": None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ─── Check: Cost Spikes ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def check_cost_spikes(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Detect daily cost exceeding 2x of 7-day average per agent."""
|
||||
alerts = []
|
||||
|
||||
# Check if costs table exists and has agent column
|
||||
try:
|
||||
cols = conn.execute("PRAGMA table_info(costs)").fetchall()
|
||||
col_names = {c["name"] for c in cols}
|
||||
except sqlite3.Error:
|
||||
return alerts
|
||||
|
||||
if "agent" not in col_names or "cost_usd" not in col_names:
|
||||
# Fall back to per-PR cost tracking
|
||||
rows = conn.execute(
|
||||
"""SELECT agent,
|
||||
SUM(CASE WHEN created_at > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost,
|
||||
SUM(CASE WHEN created_at > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily
|
||||
FROM prs WHERE agent IS NOT NULL AND cost_usd > 0
|
||||
GROUP BY agent
|
||||
HAVING avg_daily > 0"""
|
||||
).fetchall()
|
||||
else:
|
||||
rows = conn.execute(
|
||||
"""SELECT agent,
|
||||
SUM(CASE WHEN timestamp > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost,
|
||||
SUM(CASE WHEN timestamp > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily
|
||||
FROM costs WHERE agent IS NOT NULL
|
||||
GROUP BY agent
|
||||
HAVING avg_daily > 0"""
|
||||
).fetchall()
|
||||
|
||||
for r in rows:
|
||||
if r["avg_daily"] and r["today_cost"] > r["avg_daily"] * COST_SPIKE_RATIO:
|
||||
ratio = r["today_cost"] / r["avg_daily"]
|
||||
alerts.append({
|
||||
"id": f"cost_spike:{r['agent']}",
|
||||
"severity": "warning",
|
||||
"category": "health",
|
||||
"title": f"Agent '{r['agent']}' cost spike: ${r['today_cost']:.2f} today ({ratio:.1f}x avg)",
|
||||
"detail": (
|
||||
f"Today's cost (${r['today_cost']:.2f}) is {ratio:.1f}x the 7-day daily average "
|
||||
f"(${r['avg_daily']:.2f}). Threshold: {COST_SPIKE_RATIO}x."
|
||||
),
|
||||
"agent": r["agent"],
|
||||
"domain": None,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ─── Check: Domain Rejection Patterns (Theseus addition) ───────────────────
|
||||
|
||||
|
||||
def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Track rejection reason shift per domain — surfaces domain maturity issues."""
|
||||
alerts = []
|
||||
|
||||
# Per-domain rejection breakdown in 24h
|
||||
rows = conn.execute(
|
||||
"""SELECT json_extract(detail, '$.domain') as domain,
|
||||
value as tag,
|
||||
COUNT(*) as cnt
|
||||
FROM audit_log, json_each(json_extract(detail, '$.issues'))
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
|
||||
AND timestamp > datetime('now', '-24 hours')
|
||||
AND json_extract(detail, '$.domain') IS NOT NULL
|
||||
GROUP BY domain, tag
|
||||
ORDER BY domain, cnt DESC"""
|
||||
).fetchall()
|
||||
|
||||
# Group by domain
|
||||
domain_tags = {}
|
||||
for r in rows:
|
||||
d = r["domain"]
|
||||
if d not in domain_tags:
|
||||
domain_tags[d] = []
|
||||
domain_tags[d].append({"tag": r["tag"], "count": r["cnt"]})
|
||||
|
||||
# Flag if a domain has >50% of rejections from a single reason (concentrated failure)
|
||||
for domain, tags in domain_tags.items():
|
||||
total = sum(t["count"] for t in tags)
|
||||
if total < 5:
|
||||
continue
|
||||
top = tags[0]
|
||||
ratio = top["count"] / total
|
||||
if ratio > 0.5:
|
||||
alerts.append({
|
||||
"id": f"domain_rejection_pattern:{domain}:{top['tag']}",
|
||||
"severity": "info",
|
||||
"category": "failure_pattern",
|
||||
"title": f"Domain '{domain}': {ratio:.0%} of rejections are '{top['tag']}'",
|
||||
"detail": (
|
||||
f"In domain '{domain}', {top['count']}/{total} rejections (24h) are for "
|
||||
f"'{top['tag']}'. This may indicate a systematic issue with evidence standards "
|
||||
f"or schema compliance in this domain."
|
||||
),
|
||||
"agent": None,
|
||||
"domain": domain,
|
||||
"detected_at": _now_iso(),
|
||||
"auto_resolve": True,
|
||||
})
|
||||
|
||||
return alerts
|
||||
|
||||
|
||||
# ─── Failure Report Generator ───────────────────────────────────────────────
|
||||
|
||||
|
||||
def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 24) -> dict | None:
|
||||
"""Compile a failure report for a specific agent.
|
||||
|
||||
Returns top rejection reasons, example PRs, and suggested fixes.
|
||||
Designed to be sent directly to the agent via Pentagon messaging.
|
||||
"""
|
||||
hours = int(hours) # defensive — callers should pass int, but enforce it
|
||||
rows = conn.execute(
|
||||
"""SELECT value as tag, COUNT(*) as cnt,
|
||||
GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers
|
||||
FROM audit_log, json_each(json_extract(detail, '$.issues'))
|
||||
WHERE stage='evaluate'
|
||||
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
|
||||
AND json_extract(detail, '$.agent') = ?
|
||||
AND timestamp > datetime('now', ? || ' hours')
|
||||
GROUP BY tag ORDER BY cnt DESC
|
||||
LIMIT 5""",
|
||||
(agent, f"-{hours}"),
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
return None
|
||||
|
||||
total_rejections = sum(r["cnt"] for r in rows)
|
||||
top_reasons = []
|
||||
for r in rows:
|
||||
prs = r["pr_numbers"].split(",")[:3] if r["pr_numbers"] else []
|
||||
top_reasons.append({
|
||||
"reason": r["tag"],
|
||||
"count": r["cnt"],
|
||||
"pct": round(r["cnt"] / total_rejections * 100, 1),
|
||||
"example_prs": prs,
|
||||
"suggestion": _suggest_fix(r["tag"]),
|
||||
})
|
||||
|
||||
return {
|
||||
"agent": agent,
|
||||
"period_hours": hours,
|
||||
"total_rejections": total_rejections,
|
||||
"top_reasons": top_reasons,
|
||||
"generated_at": _now_iso(),
|
||||
}
|
||||
|
||||
|
||||
def _suggest_fix(rejection_tag: str) -> str:
|
||||
"""Map known rejection reasons to actionable suggestions."""
|
||||
suggestions = {
|
||||
"broken_wiki_links": "Check that all [[wiki links]] in claims resolve to existing files. Run link validation before submitting.",
|
||||
"near_duplicate": "Search existing claims before creating new ones. Use semantic search to find similar claims.",
|
||||
"frontmatter_schema": "Validate YAML frontmatter against the claim schema. Required fields: title, domain, confidence, type.",
|
||||
"weak_evidence": "Add concrete sources, data points, or citations. Claims need evidence that can be independently verified.",
|
||||
"missing_confidence": "Every claim needs a confidence level: proven, likely, experimental, or speculative.",
|
||||
"domain_mismatch": "Ensure claims are filed under the correct domain. Check domain definitions if unsure.",
|
||||
"too_broad": "Break broad claims into specific, testable sub-claims.",
|
||||
"missing_links": "Claims should link to related claims, entities, or sources. Isolated claims are harder to verify.",
|
||||
}
|
||||
return suggestions.get(rejection_tag, f"Review rejection reason '{rejection_tag}' and adjust extraction accordingly.")
|
||||
|
||||
|
||||
# ─── Run All Checks ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def run_all_checks(conn: sqlite3.Connection) -> list[dict]:
|
||||
"""Execute all check functions and return combined alerts."""
|
||||
alerts = []
|
||||
alerts.extend(check_agent_health(conn))
|
||||
alerts.extend(check_quality_regression(conn))
|
||||
alerts.extend(check_throughput(conn))
|
||||
alerts.extend(check_rejection_spike(conn))
|
||||
alerts.extend(check_stuck_loops(conn))
|
||||
alerts.extend(check_cost_spikes(conn))
|
||||
alerts.extend(check_domain_rejection_patterns(conn))
|
||||
return alerts
|
||||
|
||||
|
||||
def format_alert_message(alert: dict) -> str:
|
||||
"""Format an alert for Pentagon messaging."""
|
||||
severity_icon = {"critical": "!!", "warning": "!", "info": "~"}
|
||||
icon = severity_icon.get(alert["severity"], "?")
|
||||
return f"[{icon}] {alert['title']}\n{alert['detail']}"
|
||||
125
diagnostics/alerting_routes.py
Normal file
125
diagnostics/alerting_routes.py
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
"""Route handlers for /check and /api/alerts endpoints.
|
||||
|
||||
Import into app.py and register routes in create_app().
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from aiohttp import web
|
||||
from alerting import run_all_checks, generate_failure_report, format_alert_message # requires CWD = deploy dir; switch to relative import if packaged
|
||||
|
||||
logger = logging.getLogger("argus.alerting")
|
||||
|
||||
# In-memory alert store (replaced each /check cycle, persists between requests)
|
||||
_active_alerts: list[dict] = []
|
||||
_last_check: str | None = None
|
||||
|
||||
|
||||
async def handle_check(request):
|
||||
"""GET /check — run all monitoring checks, update active alerts, return results.
|
||||
|
||||
Designed to be called by systemd timer every 5 minutes.
|
||||
Returns JSON summary of all detected issues.
|
||||
"""
|
||||
conn = request.app["_alerting_conn_func"]()
|
||||
try:
|
||||
alerts = run_all_checks(conn)
|
||||
except Exception as e:
|
||||
logger.error("Check failed: %s", e)
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
global _active_alerts, _last_check
|
||||
_active_alerts = alerts
|
||||
_last_check = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
# Generate failure reports for agents with stuck loops
|
||||
failure_reports = {}
|
||||
stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]}
|
||||
for agent in stuck_agents:
|
||||
report = generate_failure_report(conn, agent)
|
||||
if report:
|
||||
failure_reports[agent] = report
|
||||
|
||||
result = {
|
||||
"checked_at": _last_check,
|
||||
"alert_count": len(alerts),
|
||||
"critical": sum(1 for a in alerts if a["severity"] == "critical"),
|
||||
"warning": sum(1 for a in alerts if a["severity"] == "warning"),
|
||||
"info": sum(1 for a in alerts if a["severity"] == "info"),
|
||||
"alerts": alerts,
|
||||
"failure_reports": failure_reports,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"Check complete: %d alerts (%d critical, %d warning)",
|
||||
len(alerts),
|
||||
result["critical"],
|
||||
result["warning"],
|
||||
)
|
||||
|
||||
return web.json_response(result)
|
||||
|
||||
|
||||
async def handle_api_alerts(request):
|
||||
"""GET /api/alerts — return current active alerts.
|
||||
|
||||
Query params:
|
||||
severity: filter by severity (critical, warning, info)
|
||||
category: filter by category (health, quality, throughput, failure_pattern)
|
||||
agent: filter by agent name
|
||||
domain: filter by domain
|
||||
"""
|
||||
alerts = list(_active_alerts)
|
||||
|
||||
# Filters
|
||||
severity = request.query.get("severity")
|
||||
if severity:
|
||||
alerts = [a for a in alerts if a["severity"] == severity]
|
||||
|
||||
category = request.query.get("category")
|
||||
if category:
|
||||
alerts = [a for a in alerts if a["category"] == category]
|
||||
|
||||
agent = request.query.get("agent")
|
||||
if agent:
|
||||
alerts = [a for a in alerts if a.get("agent") == agent]
|
||||
|
||||
domain = request.query.get("domain")
|
||||
if domain:
|
||||
alerts = [a for a in alerts if a.get("domain") == domain]
|
||||
|
||||
return web.json_response({
|
||||
"alerts": alerts,
|
||||
"total": len(alerts),
|
||||
"last_check": _last_check,
|
||||
})
|
||||
|
||||
|
||||
async def handle_api_failure_report(request):
|
||||
"""GET /api/failure-report/{agent} — generate failure report for an agent.
|
||||
|
||||
Query params:
|
||||
hours: lookback window (default 24)
|
||||
"""
|
||||
agent = request.match_info["agent"]
|
||||
hours = int(request.query.get("hours", "24"))
|
||||
conn = request.app["_alerting_conn_func"]()
|
||||
|
||||
report = generate_failure_report(conn, agent, hours)
|
||||
if not report:
|
||||
return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours})
|
||||
|
||||
return web.json_response(report)
|
||||
|
||||
|
||||
def register_alerting_routes(app, get_conn_func):
|
||||
"""Register alerting routes on the app.
|
||||
|
||||
get_conn_func: callable that returns a read-only sqlite3.Connection
|
||||
"""
|
||||
app["_alerting_conn_func"] = get_conn_func
|
||||
app.router.add_get("/check", handle_check)
|
||||
app.router.add_get("/api/alerts", handle_api_alerts)
|
||||
app.router.add_get("/api/failure-report/{agent}", handle_api_failure_report)
|
||||
Loading…
Reference in a new issue