teleo-infrastructure/lib/watchdog.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
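Item 1's transient/permanent distinction with jittered retries could be sketched roughly as follows; the status-code sets, helper names, and backoff shape are assumptions for illustration, not code from the repo:

```python
import random
import time

# Assumed classification: codes retried with jitter vs. surfaced immediately.
TRANSIENT = {409, 423, 500, 502, 503}
PERMANENT = {401, 403, 404, 405}

def merge_with_recovery(attempt_merge, max_attempts=3, base_delay=1.0):
    """Retry a merge call on transient errors, backing off with jitter.

    attempt_merge is a callable returning an HTTP-style status code.
    """
    for attempt in range(1, max_attempts + 1):
        status = attempt_merge()
        if status == 200:
            return "merged"
        if status in PERMANENT:
            return f"permanent:{status}"
        if status in TRANSIENT and attempt < max_attempts:
            # Jitter avoids synchronized retry storms against the forge API.
            time.sleep(base_delay * attempt + random.uniform(0, base_delay))
            continue
        # Unknown codes and exhausted transient retries both stop here.
        return f"exhausted:{status}"
```

A pre-flight approval check (as in item 1) would run before the first `attempt_merge` call, so permanent failures never burn retry budget.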

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors
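The conflict retry with a 3-attempt budget plus a permanent-conflict handoff might look like this minimal sketch (field names and the callables are hypothetical):

```python
CONFLICT_BUDGET = 3  # attempt budget stated in the summary

def handle_conflict(pr, rebase, mark_permanent_conflict):
    """Retry a conflicting PR up to the budget, then hand off as permanent.

    pr is a mutable mapping; rebase and mark_permanent_conflict are callables.
    """
    pr["conflict_attempts"] = pr.get("conflict_attempts", 0) + 1
    if pr["conflict_attempts"] > CONFLICT_BUDGET:
        # Budget exhausted: route to the permanent-conflict handler.
        mark_permanent_conflict(pr)
        return "permanent"
    if rebase(pr):
        pr["conflict_attempts"] = 0  # a clean rebase resets the budget
        return "resolved"
    return "retry"
```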

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
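The per-user-per-chat conversation window with its 5-message unanswered counter could be modeled like this; the exact open/close semantics are an assumption based on the summary line, and all names are illustrative:

```python
WINDOW_LIMIT = 5  # unanswered-message threshold from the summary

class ConversationWindows:
    """Track how many of a user's messages have gone unanswered per chat."""

    def __init__(self):
        self._unanswered = {}  # (chat_id, user_id) -> count

    def should_reply(self, chat_id, user_id):
        # Window stays open until the unanswered counter hits the limit.
        return self._unanswered.get((chat_id, user_id), 0) < WINDOW_LIMIT

    def record_user_message(self, chat_id, user_id):
        key = (chat_id, user_id)
        self._unanswered[key] = self._unanswered.get(key, 0) + 1

    def record_bot_reply(self, chat_id, user_id):
        # A bot reply resets the counter and reopens the window.
        self._unanswered[(chat_id, user_id)] = 0
```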

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
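worktree_lock.py is described as a file-based `flock` lock for main-worktree coordination; a minimal sketch of that pattern (the lock-file path and function name are illustrative, not taken from the repo):

```python
import fcntl
import os
from contextlib import contextmanager

@contextmanager
def worktree_lock(lock_path="/tmp/worktree.lock"):
    """Hold an exclusive advisory lock for the duration of the block."""
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until the lock is free
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)
```

Callers contending for the worktree (archive writes, sync scripts) would wrap their git operations in `with worktree_lock(): ...`; `flock` locks are advisory, so every writer must use the same lock file.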

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

"""Pipeline health watchdog — detects stalls and model failures fast.

Runs every 60 seconds (inside the existing health check or as its own stage).
Checks for conditions that have caused pipeline stalls:

1. Eval stall: open PRs with tier0_pass=1 but no eval event in 5 minutes
2. Breaker open: any circuit breaker in open state
3. Model API failure: 400/401 errors indicating invalid model ID or auth failure
4. Zombie accumulation: PRs with exhausted fix budget sitting in open

When a condition is detected, logs a WARNING with a specific diagnosis.
Future: could trigger Pentagon notification or webhook.

Epimetheus owns this module. Born from 3 stall incidents in 2 sessions.
"""
import json
import logging
from datetime import datetime, timezone

from . import config, db

logger = logging.getLogger("pipeline.watchdog")


async def watchdog_check(conn) -> dict:
    """Run all health checks. Returns {healthy: bool, issues: [...]}.

    Called every 60 seconds by the pipeline daemon.
    """
    issues = []

    # 1. Eval stall: open PRs ready for eval but no eval event in 5 minutes
    eval_ready = conn.execute(
        """SELECT COUNT(*) as n FROM prs
           WHERE status = 'open' AND tier0_pass = 1
           AND domain_verdict = 'pending' AND eval_attempts < ?""",
        (config.MAX_EVAL_ATTEMPTS,),
    ).fetchone()["n"]
    if eval_ready > 0:
        last_eval = conn.execute(
            "SELECT MAX(timestamp) as ts FROM audit_log WHERE stage = 'evaluate'"
        ).fetchone()
        if last_eval and last_eval["ts"]:
            try:
                last_ts = datetime.fromisoformat(last_eval["ts"].replace("Z", "+00:00"))
                age_seconds = (datetime.now(timezone.utc) - last_ts).total_seconds()
                if age_seconds > 300:  # 5 minutes
                    issues.append({
                        "type": "eval_stall",
                        "severity": "critical",
                        "detail": f"{eval_ready} PRs ready for eval but no eval event in {int(age_seconds)}s",
                        "action": "Check eval breaker state and model API availability",
                    })
            except (ValueError, TypeError):
                pass

    # 2. Breaker open
    breakers = conn.execute(
        "SELECT name, state, failures FROM circuit_breakers WHERE state = 'open'"
    ).fetchall()
    for b in breakers:
        issues.append({
            "type": "breaker_open",
            "severity": "critical",
            "detail": f"Breaker '{b['name']}' is OPEN ({b['failures']} failures)",
            "action": f"Check {b['name']} stage logs for root cause",
        })

    # 3. Model API failure pattern: 3+ recent errors matching known signatures
    recent_errors = conn.execute(
        """SELECT detail FROM audit_log
           WHERE stage = 'evaluate' AND event IN ('error', 'domain_rejected')
           AND timestamp > datetime('now', '-10 minutes')
           ORDER BY id DESC LIMIT 10"""
    ).fetchall()
    error_count = 0
    for row in recent_errors:
        detail = row["detail"] or ""
        if "400" in detail or "not a valid model" in detail or "401" in detail:
            error_count += 1
    if error_count >= 3:
        issues.append({
            "type": "model_api_failure",
            "severity": "critical",
            "detail": f"{error_count} model API errors in last 10 minutes — possible invalid model ID or auth failure",
            "action": "Check OpenRouter model IDs in config.py and API key validity",
        })

    # 4. Zombie PRs: open with exhausted fix budget and request_changes
    zombies = conn.execute(
        """SELECT COUNT(*) as n FROM prs
           WHERE status = 'open' AND fix_attempts >= ?
           AND (domain_verdict = 'request_changes' OR leo_verdict = 'request_changes')""",
        (config.MAX_FIX_ATTEMPTS,),
    ).fetchone()["n"]
    if zombies > 0:
        issues.append({
            "type": "zombie_prs",
            "severity": "warning",
            "detail": f"{zombies} PRs with exhausted fix budget still open",
            "action": "GC should auto-close these — check fixer.py GC logic",
        })

    # 5. Tier0 blockage: many PRs with tier0_pass=0 (potential validation bug)
    tier0_blocked = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE status = 'open' AND tier0_pass = 0"
    ).fetchone()["n"]
    if tier0_blocked >= 5:
        issues.append({
            "type": "tier0_blockage",
            "severity": "warning",
            "detail": f"{tier0_blocked} PRs blocked at tier0_pass=0",
            "action": "Check validate.py — may be the modified-file or wiki-link bug recurring",
        })

    # Log issues
    healthy = len(issues) == 0
    if not healthy:
        for issue in issues:
            if issue["severity"] == "critical":
                logger.warning("WATCHDOG CRITICAL: %s: %s", issue["type"], issue["detail"])
            else:
                logger.info("WATCHDOG: %s: %s", issue["type"], issue["detail"])
    return {"healthy": healthy, "issues": issues, "checks_run": 5}


async def watchdog_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Pipeline stage entry point. Returns (1, 0) on success."""
    result = await watchdog_check(conn)
    if not result["healthy"]:
        db.audit(
            conn, "watchdog", "issues_detected",
            json.dumps({"issues": result["issues"]}),
        )
    return 1, 0