Three new files for the engineering acceleration initiative:
- alerting.py: 7 health check functions (dormant agents, quality regression,
throughput anomaly, rejection spikes, stuck loops, cost spikes, domain
rejection patterns) + failure report generator
- alerting_routes.py: /check, /api/alerts, /api/failure-report/{agent} endpoints
- PATCH_INSTRUCTIONS.md: integration guide for app.py (imports, route
registration, auth middleware bypass, DB connection)
Observe and alert only — no pipeline modification. Independence constraint
is load-bearing for measurement trustworthiness.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
537 lines
22 KiB
Python
537 lines
22 KiB
Python
"""Argus active monitoring — health watchdog, quality regression, throughput anomaly detection.

Provides check functions that detect problems and return structured alerts.
Called by the /check endpoint (periodic cron) or on demand.

Alert schema:
    {
        "id": str,            # unique key for dedup (e.g. "dormant:ganymede")
        "severity": str,      # "critical" | "warning" | "info"
        "category": str,      # "health" | "quality" | "throughput" | "failure_pattern"
        "title": str,         # human-readable headline
        "detail": str,        # actionable description
        "agent": str|None,    # affected agent (if applicable)
        "domain": str|None,   # affected domain (if applicable)
        "detected_at": str,   # ISO timestamp
        "auto_resolve": bool, # clears when condition clears
    }
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import statistics
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
# ─── Agent-domain mapping (static config, maintained by Argus) ──────────────
|
|
|
|
# Maps each known agent to the list of domains it owns. Agents mapped to None
# have no single domain; their role is noted next to the name.
AGENT_DOMAINS = {
    "rio": ["internet-finance"],
    "clay": ["creative-industries"],
    **dict.fromkeys(
        (
            "ganymede",    # reviewer — cross-domain
            "epimetheus",  # infra
            "leo",         # standards
            "oberon",      # evolution tracking
            "vida",        # health monitoring
            "hermes",      # comms
            "astra",       # research
        ),
        None,
    ),
}
|
|
|
|
# Thresholds
|
|
# Hours without PR activity before an agent is reported as dormant.
DORMANCY_HOURS = 48

# Percentage points below the 7-day baseline that count as a quality regression.
APPROVAL_DROP_THRESHOLD = 15

# Alert if today's merges fall below 50% of the 7-day SMA.
THROUGHPUT_DROP_RATIO = 0.5

# A single rejection reason above 20% of recent rejections is a spike.
REJECTION_SPIKE_RATIO = 0.20

# Same agent + same rejection reason more than N times in 6h is a stuck loop.
STUCK_LOOP_THRESHOLD = 3

# Daily cost above 2x the 7-day average is a cost spike.
COST_SPIKE_RATIO = 2.0
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
# ─── Check: Agent Health (dormancy detection) ───────────────────────────────
|
|
|
|
|
|
def check_agent_health(conn: sqlite3.Connection) -> list[dict]:
    """Report agents whose latest PR attempt is older than DORMANCY_HOURS.

    Reads the ``prs`` table (``agent``, ``last_attempt``) through *conn*; rows
    are accessed by column name, so the connection needs a name-addressable
    row factory such as ``sqlite3.Row``. Timestamps without a UTC offset are
    interpreted as UTC. Agents with no recorded ``last_attempt`` are skipped.

    Returns a list of alert dicts, one per dormant agent.
    """
    activity = conn.execute(
        """SELECT agent, MAX(last_attempt) as latest, COUNT(*) as total_prs
           FROM prs WHERE agent IS NOT NULL
           GROUP BY agent"""
    ).fetchall()

    reference = datetime.now(timezone.utc)
    found = []
    for row in activity:
        stamp = row["latest"]
        if not stamp:
            continue

        seen_at = datetime.fromisoformat(stamp)
        if seen_at.tzinfo is None:
            # Naive timestamps are taken to be UTC so the subtraction is valid.
            seen_at = seen_at.replace(tzinfo=timezone.utc)

        idle_hours = (reference - seen_at).total_seconds() / 3600
        if idle_hours <= DORMANCY_HOURS:
            continue

        name = row["agent"]
        whole_hours = int(idle_hours)
        found.append({
            "id": f"dormant:{name}",
            "severity": "warning",
            "category": "health",
            "title": f"Agent '{name}' dormant for {whole_hours}h",
            "detail": (
                f"No PR activity since {stamp}. "
                f"Last seen {whole_hours}h ago (threshold: {DORMANCY_HOURS}h). "
                f"Total historical PRs: {row['total_prs']}."
            ),
            "agent": name,
            "domain": None,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        })

    return found
|
|
|
|
|
|
# ─── Check: Quality Regression (approval rate drop) ─────────────────────────
|
|
|
|
|
|
def check_quality_regression(conn: sqlite3.Connection) -> list[dict]:
    """Compare the 24h approval rate with the 7-day rate and alert on large drops.

    The overall comparison is done here; the per-agent and per-domain
    comparisons are delegated to ``_check_approval_by_dimension``, which
    appends to the same alert list. Only ``audit_log`` rows with
    ``stage='evaluate'`` and one of the four evaluation outcomes are counted.
    Note that the 7-day window also contains the most recent 24 hours.

    Returns the list of alert dicts (overall first, then agent, then domain).
    """
    overall_sql = """SELECT
           COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
           COUNT(*) as total
           FROM audit_log
           WHERE stage='evaluate'
           AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
           AND timestamp > datetime('now', '{window}')"""

    def _overall(window):
        """Return (approval rate in percent or None when empty, raw count row)."""
        counts = conn.execute(overall_sql.format(window=window)).fetchone()
        if not counts["total"]:
            return None, counts
        return counts["approved"] / counts["total"] * 100, counts

    week_rate, _ = _overall("-7 days")
    day_rate, day_counts = _overall("-24 hours")

    found = []
    if week_rate is not None and day_rate is not None:
        gap = week_rate - day_rate
        if gap > APPROVAL_DROP_THRESHOLD:
            found.append({
                "id": "quality_regression:overall",
                "severity": "critical",
                "category": "quality",
                "title": f"Approval rate dropped {gap:.0f}pp (24h: {day_rate:.0f}% vs 7d: {week_rate:.0f}%)",
                "detail": (
                    f"24h approval rate ({day_rate:.1f}%) is {gap:.1f} percentage points below "
                    f"7-day baseline ({week_rate:.1f}%). "
                    f"Evaluated {day_counts['total']} PRs in last 24h."
                ),
                "agent": None,
                "domain": None,
                "detected_at": _now_iso(),
                "auto_resolve": True,
            })

    # Per-agent rates. COALESCE because rejection events carry $.agent while
    # eval events carry $.domain_agent (Epimetheus 2026-03-28).
    agent_expr = "COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))"
    _check_approval_by_dimension(conn, found, "agent", agent_expr)

    # Per-domain rates (Theseus addition).
    _check_approval_by_dimension(conn, found, "domain", "json_extract(detail, '$.domain')")

    return found
|
|
|
|
|
|
def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr):
    """Append approval-rate regression alerts for each value of one dimension.

    *dim_expr* is a SQL expression evaluated against ``audit_log`` that yields
    the grouping value; it is interpolated into the query text, so it must be
    a trusted, hard-coded expression. *dim_name* ("agent" or "domain") labels
    the alert and selects which alert field receives the value.

    A value is only compared when it has at least 5 evaluations in both the
    7-day and the 24-hour window. Alerts are appended to *alerts* in place;
    nothing is returned.
    """

    def _grouped(window):
        """Fetch approved/total counts per dimension value for one time window."""
        return conn.execute(
            f"""SELECT {dim_expr} as dim_val,
                COUNT(CASE WHEN event='approved' THEN 1 END) as approved,
                COUNT(*) as total
                FROM audit_log
                WHERE stage='evaluate'
                AND event IN ('approved','changes_requested','domain_rejected','tier05_rejected')
                AND timestamp > datetime('now', '{window}')
                AND {dim_expr} IS NOT NULL
                GROUP BY dim_val HAVING total >= 5"""
        ).fetchall()

    week_rates = {}
    for row in _grouped("-7 days"):
        week_rates[row["dim_val"]] = row["approved"] / row["total"] * 100

    for row in _grouped("-24 hours"):
        key = row["dim_val"]
        if key not in week_rates:
            continue

        day_rate = row["approved"] / row["total"] * 100
        week_rate = week_rates[key]
        gap = week_rate - day_rate
        if gap <= APPROVAL_DROP_THRESHOLD:
            continue

        is_agent = dim_name == "agent"
        is_domain = dim_name == "domain"
        alerts.append({
            "id": f"quality_regression:{dim_name}:{key}",
            "severity": "warning",
            "category": "quality",
            "title": f"{dim_name.title()} '{key}' approval dropped {gap:.0f}pp",
            "detail": (
                f"24h: {day_rate:.1f}% vs 7d baseline: {week_rate:.1f}% "
                f"({row['total']} evals in 24h)."
            ),
            "agent": key if is_agent else None,
            "domain": key if is_domain else None,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        })
|
|
|
|
|
|
# ─── Check: Throughput Anomaly ──────────────────────────────────────────────
|
|
|
|
|
|
def check_throughput(conn: sqlite3.Connection) -> list[dict]:
    """Detect throughput stalling — today's merges vs the recent daily average.

    Merges from the last 7 days are counted per calendar day (as computed by
    SQLite's ``date()``). Today's count is compared with the mean of the
    other days that had at least one merge; an alert is raised when today is
    below THROUGHPUT_DROP_RATIO of that mean.

    A day without merges produces no row in the grouped query, so today's
    count is looked up by date and defaults to 0. Taking the last row instead
    would mistake the latest active day for today and hide a complete stall.

    Returns a list with at most one alert dict; empty when there is no prior
    day to compare against.
    """
    rows = conn.execute(
        """SELECT date(merged_at) as day, COUNT(*) as n
           FROM prs WHERE merged_at > datetime('now', '-7 days')
           GROUP BY day ORDER BY day"""
    ).fetchall()

    # Ask SQLite for today's date so it comes from the same clock (UTC) and
    # the same formatting as the grouped ``day`` values above.
    today = conn.execute("SELECT date('now')").fetchone()[0]

    per_day = {r["day"]: r["n"] for r in rows}
    today_count = per_day.pop(today, 0)
    history = list(per_day.values())  # earlier days, still in date order

    if not history:
        return []  # Not enough data

    # NOTE: days with zero merges are absent from ``history``, so this is the
    # average over active days rather than a zero-filled 7-day mean.
    sma = statistics.mean(history)
    daily_counts = history + [today_count]

    alerts = []
    if sma > 0 and today_count < sma * THROUGHPUT_DROP_RATIO:
        alerts.append({
            "id": "throughput:stalling",
            "severity": "warning",
            "category": "throughput",
            "title": f"Throughput stalling: {today_count} merges today vs {sma:.0f}/day avg",
            "detail": (
                f"Today's merge count ({today_count}) is below {THROUGHPUT_DROP_RATIO:.0%} of "
                f"7-day average ({sma:.1f}/day). Daily counts: {daily_counts}."
            ),
            "agent": None,
            "domain": None,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        })

    return alerts
|
|
|
|
|
|
# ─── Check: Rejection Reason Spike ─────────────────────────────────────────
|
|
|
|
|
|
def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]:
    """Alert when one rejection tag exceeds REJECTION_SPIKE_RATIO of 24h rejections.

    The denominator is the number of rejection events in the last 24 hours;
    the numerator is how often a tag appears in the ``$.issues`` arrays of
    those events. Nothing is reported when there are fewer than 10 rejection
    events, since the sample is too small.

    Returns a list of alert dicts, one per tag over the threshold.
    """
    rejection_total = conn.execute(
        """SELECT COUNT(*) as n FROM audit_log
           WHERE stage='evaluate'
           AND event IN ('changes_requested','domain_rejected','tier05_rejected')
           AND timestamp > datetime('now', '-24 hours')"""
    ).fetchone()["n"]

    if rejection_total < 10:
        return []  # Not enough data

    # One row per distinct issue tag, most frequent first.
    tag_rows = conn.execute(
        """SELECT value as tag, COUNT(*) as cnt
           FROM audit_log, json_each(json_extract(detail, '$.issues'))
           WHERE stage='evaluate'
           AND event IN ('changes_requested','domain_rejected','tier05_rejected')
           AND timestamp > datetime('now', '-24 hours')
           GROUP BY tag ORDER BY cnt DESC"""
    ).fetchall()

    found = []
    for row in tag_rows:
        label = row["tag"]
        hits = row["cnt"]
        share = hits / rejection_total
        if share <= REJECTION_SPIKE_RATIO:
            continue
        found.append({
            "id": f"rejection_spike:{label}",
            "severity": "warning",
            "category": "quality",
            "title": f"Rejection reason '{label}' at {share:.0%} of rejections",
            "detail": (
                f"'{label}' accounts for {hits}/{rejection_total} rejections in 24h "
                f"({share:.1%}). Threshold: {REJECTION_SPIKE_RATIO:.0%}."
            ),
            "agent": None,
            "domain": None,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        })

    return found
|
|
|
|
|
|
# ─── Check: Stuck Loops ────────────────────────────────────────────────────
|
|
|
|
|
|
def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]:
    """Alert on agents rejected for the same reason more than STUCK_LOOP_THRESHOLD times in 6h.

    Rejection events from the last 6 hours are grouped by agent and issue tag;
    every (agent, tag) pair whose count exceeds the threshold yields one
    critical alert.

    Returns a list of alert dicts.
    """
    # COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28)
    offenders = conn.execute(
        """SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
           value as tag,
           COUNT(*) as cnt
           FROM audit_log, json_each(json_extract(detail, '$.issues'))
           WHERE stage='evaluate'
           AND event IN ('changes_requested','domain_rejected','tier05_rejected')
           AND timestamp > datetime('now', '-6 hours')
           AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL
           GROUP BY agent, tag
           HAVING cnt > ?""",
        (STUCK_LOOP_THRESHOLD,),
    ).fetchall()

    return [
        {
            "id": f"stuck_loop:{who}:{why}",
            "severity": "critical",
            "category": "health",
            "title": f"Agent '{who}' stuck: '{why}' failed {times}x in 6h",
            "detail": (
                f"Agent '{who}' has been rejected for '{why}' "
                f"{times} times in the last 6 hours (threshold: {STUCK_LOOP_THRESHOLD}). "
                f"Stop and reassess."
            ),
            "agent": who,
            "domain": None,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        }
        for who, why, times in ((r["agent"], r["tag"], r["cnt"]) for r in offenders)
    ]
|
|
|
|
|
|
# ─── Check: Cost Spikes ────────────────────────────────────────────────────
|
|
|
|
|
|
def check_cost_spikes(conn: sqlite3.Connection) -> list[dict]:
    """Detect agents whose last-24h cost exceeds COST_SPIKE_RATIO x their 7-day daily average.

    Cost data is read from the ``costs`` table when it has ``agent``,
    ``cost_usd`` and ``timestamp`` columns; otherwise from per-PR costs in
    ``prs`` (``agent``, ``cost_usd``, ``created_at``). If neither table offers
    those columns the check is skipped and an empty list is returned, so a
    deployment without cost tracking does not fail the whole run.

    "Today" is the rolling last 24 hours, and the daily average is the 7-day
    total divided by 7 (it therefore includes the last 24 hours).

    Returns a list of alert dicts, one per agent over the threshold.
    """

    def _columns(table: str) -> set[str]:
        """Return the column names of *table*; empty when the table is missing."""
        try:
            info = conn.execute(f"PRAGMA table_info({table})").fetchall()
        except sqlite3.Error:
            return set()
        # PRAGMA table_info returns zero rows (it does not raise) for a table
        # that does not exist. Index 1 is the column name.
        return {col[1] for col in info}

    if {"agent", "cost_usd", "timestamp"} <= _columns("costs"):
        query = """SELECT agent,
               SUM(CASE WHEN timestamp > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost,
               SUM(CASE WHEN timestamp > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily
               FROM costs WHERE agent IS NOT NULL
               GROUP BY agent
               HAVING avg_daily > 0"""
    elif {"agent", "cost_usd", "created_at"} <= _columns("prs"):
        # Fall back to per-PR cost tracking
        query = """SELECT agent,
               SUM(CASE WHEN created_at > datetime('now', '-1 day') THEN cost_usd ELSE 0 END) as today_cost,
               SUM(CASE WHEN created_at > datetime('now', '-7 days') THEN cost_usd ELSE 0 END) / 7.0 as avg_daily
               FROM prs WHERE agent IS NOT NULL AND cost_usd > 0
               GROUP BY agent
               HAVING avg_daily > 0"""
    else:
        # No usable cost data in this database; nothing to check.
        return []

    alerts = []
    for r in conn.execute(query).fetchall():
        today_cost = r["today_cost"]
        avg_daily = r["avg_daily"]
        if not avg_daily or today_cost <= avg_daily * COST_SPIKE_RATIO:
            continue
        ratio = today_cost / avg_daily
        alerts.append({
            "id": f"cost_spike:{r['agent']}",
            "severity": "warning",
            "category": "health",
            "title": f"Agent '{r['agent']}' cost spike: ${today_cost:.2f} today ({ratio:.1f}x avg)",
            "detail": (
                f"Today's cost (${today_cost:.2f}) is {ratio:.1f}x the 7-day daily average "
                f"(${avg_daily:.2f}). Threshold: {COST_SPIKE_RATIO}x."
            ),
            "agent": r["agent"],
            "domain": None,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        })

    return alerts
|
|
|
|
|
|
# ─── Check: Domain Rejection Patterns (Theseus addition) ───────────────────
|
|
|
|
|
|
def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]:
    """Flag domains where one rejection tag dominates the last 24h of rejections.

    Rejection tags are counted per domain. A domain with at least 5 tag
    occurrences, more than half of which are the same tag, produces one
    informational alert naming that tag.

    Returns a list of alert dicts.
    """
    breakdown = conn.execute(
        """SELECT json_extract(detail, '$.domain') as domain,
           value as tag,
           COUNT(*) as cnt
           FROM audit_log, json_each(json_extract(detail, '$.issues'))
           WHERE stage='evaluate'
           AND event IN ('changes_requested','domain_rejected','tier05_rejected')
           AND timestamp > datetime('now', '-24 hours')
           AND json_extract(detail, '$.domain') IS NOT NULL
           GROUP BY domain, tag
           ORDER BY domain, cnt DESC"""
    ).fetchall()

    # Collect (tag, count) pairs per domain. The query orders each domain's
    # rows by count descending, so the first pair is the most frequent tag.
    by_domain = {}
    for row in breakdown:
        by_domain.setdefault(row["domain"], []).append((row["tag"], row["cnt"]))

    found = []
    for name, pairs in by_domain.items():
        overall = sum(count for _, count in pairs)
        if overall < 5:
            continue

        lead_tag, lead_count = pairs[0]
        share = lead_count / overall
        # Concentrated failure: a single reason explains over half the rejections.
        if share <= 0.5:
            continue

        found.append({
            "id": f"domain_rejection_pattern:{name}:{lead_tag}",
            "severity": "info",
            "category": "failure_pattern",
            "title": f"Domain '{name}': {share:.0%} of rejections are '{lead_tag}'",
            "detail": (
                f"In domain '{name}', {lead_count}/{overall} rejections (24h) are for "
                f"'{lead_tag}'. This may indicate a systematic issue with evidence standards "
                f"or schema compliance in this domain."
            ),
            "agent": None,
            "domain": name,
            "detected_at": _now_iso(),
            "auto_resolve": True,
        })

    return found
|
|
|
|
|
|
# ─── Failure Report Generator ───────────────────────────────────────────────
|
|
|
|
|
|
def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 24) -> dict | None:
|
|
"""Compile a failure report for a specific agent.
|
|
|
|
Returns top rejection reasons, example PRs, and suggested fixes.
|
|
Designed to be sent directly to the agent via Pentagon messaging.
|
|
"""
|
|
hours = int(hours) # defensive — callers should pass int, but enforce it
|
|
rows = conn.execute(
|
|
"""SELECT value as tag, COUNT(*) as cnt,
|
|
GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers
|
|
FROM audit_log, json_each(json_extract(detail, '$.issues'))
|
|
WHERE stage='evaluate'
|
|
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
|
|
AND json_extract(detail, '$.agent') = ?
|
|
AND timestamp > datetime('now', ? || ' hours')
|
|
GROUP BY tag ORDER BY cnt DESC
|
|
LIMIT 5""",
|
|
(agent, f"-{hours}"),
|
|
).fetchall()
|
|
|
|
if not rows:
|
|
return None
|
|
|
|
total_rejections = sum(r["cnt"] for r in rows)
|
|
top_reasons = []
|
|
for r in rows:
|
|
prs = r["pr_numbers"].split(",")[:3] if r["pr_numbers"] else []
|
|
top_reasons.append({
|
|
"reason": r["tag"],
|
|
"count": r["cnt"],
|
|
"pct": round(r["cnt"] / total_rejections * 100, 1),
|
|
"example_prs": prs,
|
|
"suggestion": _suggest_fix(r["tag"]),
|
|
})
|
|
|
|
return {
|
|
"agent": agent,
|
|
"period_hours": hours,
|
|
"total_rejections": total_rejections,
|
|
"top_reasons": top_reasons,
|
|
"generated_at": _now_iso(),
|
|
}
|
|
|
|
|
|
def _suggest_fix(rejection_tag: str) -> str:
    """Return an actionable suggestion for a rejection tag.

    Known tags map to specific advice; any other tag gets a generic prompt
    that names the tag.
    """
    known_fixes = {
        "broken_wiki_links": "Check that all [[wiki links]] in claims resolve to existing files. Run link validation before submitting.",
        "near_duplicate": "Search existing claims before creating new ones. Use semantic search to find similar claims.",
        "frontmatter_schema": "Validate YAML frontmatter against the claim schema. Required fields: title, domain, confidence, type.",
        "weak_evidence": "Add concrete sources, data points, or citations. Claims need evidence that can be independently verified.",
        "missing_confidence": "Every claim needs a confidence level: proven, likely, experimental, or speculative.",
        "domain_mismatch": "Ensure claims are filed under the correct domain. Check domain definitions if unsure.",
        "too_broad": "Break broad claims into specific, testable sub-claims.",
        "missing_links": "Claims should link to related claims, entities, or sources. Isolated claims are harder to verify.",
    }
    try:
        return known_fixes[rejection_tag]
    except KeyError:
        return f"Review rejection reason '{rejection_tag}' and adjust extraction accordingly."
|
|
|
|
|
|
# ─── Run All Checks ────────────────────────────────────────────────────────
|
|
|
|
|
|
def run_all_checks(conn: sqlite3.Connection) -> list[dict]:
    """Run every check against *conn* and return their alerts as one list.

    Checks run in a fixed order and their results are concatenated in that
    order. An exception raised by any check propagates to the caller.
    """
    checks = (
        check_agent_health,
        check_quality_regression,
        check_throughput,
        check_rejection_spike,
        check_stuck_loops,
        check_cost_spikes,
        check_domain_rejection_patterns,
    )
    combined = []
    for check in checks:
        combined.extend(check(conn))
    return combined
|
|
|
|
|
|
def format_alert_message(alert: dict) -> str:
    """Render an alert as a two-line Pentagon message: marked headline, then detail.

    The severity selects the bracketed marker ("!!", "!" or "~"); an
    unrecognized severity is shown as "?".
    """
    markers = {"critical": "!!", "warning": "!", "info": "~"}
    marker = markers.get(alert["severity"], "?")
    headline = f"[{marker}] {alert['title']}"
    body = f"{alert['detail']}"
    return "\n".join((headline, body))
|