"""Pipeline health watchdog — detects stalls and model failures fast. Runs every 60 seconds (inside the existing health check or as its own stage). Checks for conditions that have caused pipeline stalls: 1. Eval stall: open PRs with tier0_pass=1 but no eval event in 5 minutes 2. Breaker open: any circuit breaker in open state 3. Model API failure: 400/401 errors indicating invalid model ID or auth failure 4. Zombie accumulation: PRs with exhausted fix budget sitting in open When a condition is detected, logs a WARNING with specific diagnosis. Future: could trigger Pentagon notification or webhook. Epimetheus owns this module. Born from 3 stall incidents in 2 sessions. """ import json import logging from datetime import datetime, timezone from . import config, db from .stale_pr import check_stale_prs logger = logging.getLogger("pipeline.watchdog") async def watchdog_check(conn) -> dict: """Run all health checks. Returns {healthy: bool, issues: [...]}. Called every 60 seconds by the pipeline daemon. """ issues = [] # 1. Eval stall: open PRs ready for eval but no eval event in 5 minutes eval_ready = conn.execute( """SELECT COUNT(*) as n FROM prs WHERE status = 'open' AND tier0_pass = 1 AND domain_verdict = 'pending' AND eval_attempts < ?""", (config.MAX_EVAL_ATTEMPTS,), ).fetchone()["n"] if eval_ready > 0: last_eval = conn.execute( "SELECT MAX(timestamp) as ts FROM audit_log WHERE stage = 'evaluate'" ).fetchone() if last_eval and last_eval["ts"]: try: last_ts = datetime.fromisoformat(last_eval["ts"].replace("Z", "+00:00")) age_seconds = (datetime.now(timezone.utc) - last_ts).total_seconds() if age_seconds > 300: # 5 minutes issues.append({ "type": "eval_stall", "severity": "critical", "detail": f"{eval_ready} PRs ready for eval but no eval event in {int(age_seconds)}s", "action": "Check eval breaker state and model API availability", }) except (ValueError, TypeError): pass # 2. Breaker open breakers = conn.execute( "SELECT name, state, failures FROM circuit_breakers WHERE state = 'open'" ).fetchall() for b in breakers: issues.append({ "type": "breaker_open", "severity": "critical", "detail": f"Breaker '{b['name']}' is OPEN ({b['failures']} failures)", "action": f"Check {b['name']} stage logs for root cause", }) # 3. Model API failure pattern: 5+ recent errors from same model recent_errors = conn.execute( """SELECT detail FROM audit_log WHERE stage = 'evaluate' AND event IN ('error', 'domain_rejected') AND timestamp > datetime('now', '-10 minutes') ORDER BY id DESC LIMIT 10""" ).fetchall() error_count = 0 for row in recent_errors: detail = row["detail"] or "" if "400" in detail or "not a valid model" in detail or "401" in detail: error_count += 1 if error_count >= 3: issues.append({ "type": "model_api_failure", "severity": "critical", "detail": f"{error_count} model API errors in last 10 minutes — possible invalid model ID or auth failure", "action": "Check OpenRouter model IDs in config.py and API key validity", }) # 4. Zombie PRs: open with exhausted fix budget and request_changes zombies = conn.execute( """SELECT COUNT(*) as n FROM prs WHERE status = 'open' AND fix_attempts >= ? AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')""", (config.MAX_FIX_ATTEMPTS,), ).fetchone()["n"] if zombies > 0: issues.append({ "type": "zombie_prs", "severity": "warning", "detail": f"{zombies} PRs with exhausted fix budget still open", "action": "GC should auto-close these — check fixer.py GC logic", }) # 5. Tier0 blockage: auto-reset stuck PRs with retry cap MAX_TIER0_RESETS = 3 TIER0_RESET_COOLDOWN_S = 3600 tier0_blocked = conn.execute( "SELECT number, branch FROM prs WHERE status = 'open' AND tier0_pass = 0" ).fetchall() if tier0_blocked: reset_count = 0 permanent_count = 0 for pr in tier0_blocked: row = conn.execute( """SELECT COUNT(*) as n, MAX(timestamp) as last_ts FROM audit_log WHERE stage = 'watchdog' AND event = 'tier0_reset' AND json_extract(detail, '$.pr') = ?""", (pr["number"],), ).fetchone() prior_resets = row["n"] if prior_resets >= MAX_TIER0_RESETS: permanent_count += 1 continue last_reset = row["last_ts"] if last_reset: try: last_ts = datetime.fromisoformat(last_reset).replace(tzinfo=timezone.utc) age = (datetime.now(timezone.utc) - last_ts).total_seconds() if age < TIER0_RESET_COOLDOWN_S: continue except (ValueError, TypeError): pass conn.execute( "UPDATE prs SET tier0_pass = NULL WHERE number = ?", (pr["number"],), ) db.audit( conn, "watchdog", "tier0_reset", json.dumps({ "pr": pr["number"], "branch": pr["branch"], "attempt": prior_resets + 1, "max": MAX_TIER0_RESETS, }), ) reset_count += 1 logger.info( "WATCHDOG: auto-reset tier0 for PR #%d (attempt %d/%d)", pr["number"], prior_resets + 1, MAX_TIER0_RESETS, ) if reset_count: issues.append({ "type": "tier0_reset", "severity": "info", "detail": f"Auto-reset {reset_count} PRs stuck at tier0_pass=0 for re-validation", "action": "Monitor — if same PRs fail again, check validate.py", }) if permanent_count: issues.append({ "type": "tier0_permanent_failure", "severity": "warning", "detail": f"{permanent_count} PRs exhausted {MAX_TIER0_RESETS} tier0 retries — manual intervention needed", "action": "Inspect PR content or close stale PRs", }) # 6. Stale extraction PRs: open >30 min with no claim files try: stale_closed, stale_errors = await check_stale_prs(conn) if stale_closed > 0: issues.append({ "type": "stale_prs_closed", "severity": "info", "detail": f"Auto-closed {stale_closed} stale extraction PRs (no claims after 30 min)", "action": "Check batch-extract logs for extraction failures", }) if stale_errors > 0: issues.append({ "type": "stale_pr_close_failed", "severity": "warning", "detail": f"Failed to close {stale_errors} stale PRs", "action": "Check Forgejo API connectivity", }) except Exception as e: logger.warning("Stale PR check failed: %s", e) # Log issues healthy = len(issues) == 0 if not healthy: for issue in issues: if issue["severity"] == "critical": logger.warning("WATCHDOG CRITICAL: %s — %s", issue["type"], issue["detail"]) else: logger.info("WATCHDOG: %s — %s", issue["type"], issue["detail"]) return {"healthy": healthy, "issues": issues, "checks_run": 6} async def watchdog_cycle(conn, max_workers=None) -> tuple[int, int]: """Pipeline stage entry point. Returns (1, 0) on success.""" result = await watchdog_check(conn) if not result["healthy"]: db.audit( conn, "watchdog", "issues_detected", json.dumps({"issues": result["issues"]}), ) return 1, 0