ganymede: fix 4 critical bugs before pipeline restart

- Fix #12: domain_review undefined on resume path — initialize to None,
  guard _parse_issues() call. Prevents NameError on PRs resuming after
  partial eval (76 PRs in this state right now).
- Fix #11: concurrent eval workers can duplicate reviews — add atomic
  UPDATE SET status='reviewing' WHERE status='open' at top of
  evaluate_pr(). Check rowcount, skip if already claimed.
- Fix #8: subprocess tracking for graceful shutdown — _active_subprocesses
  set in evaluate module, tracked in _claude_cli_call, exposed via
  kill_active_subprocesses(). Replaces dead code in teleo-pipeline.py.
- Fix health.py divide-by-zero fallout — guard all metabolic metric reads
  against None (returned when SQL NULLIF yields NULL or the result set is
  empty). Prevents TypeError on /health when no PRs have been evaluated in 24h.

Also includes Leo's existing hot-fixes:
- Rate limit detection checks stdout regardless of exit code
- 15-minute cycle-level backoff on rate limit

Pentagon-Agent: Ganymede <F99EBFA6-547B-4096-BEEA-1D59C3E4028A>
This commit is contained in:
m3taversal 2026-03-13 14:13:25 +00:00
parent 799249d470
commit f166db4f62
3 changed files with 85 additions and 32 deletions

View file

@ -25,6 +25,9 @@ from . import config, db
logger = logging.getLogger("pipeline.evaluate")
# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
_active_subprocesses: set = set()
# ─── Constants ──────────────────────────────────────────────────────────────
DOMAIN_AGENT_MAP = {
@ -45,6 +48,19 @@ DOMAIN_AGENT_MAP = {
}
async def kill_active_subprocesses():
    """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
    # Snapshot the set: entries may be discarded concurrently by _claude_cli_call.
    for proc in list(_active_subprocesses):
        if proc.returncode is not None:
            # Process already exited — nothing to clean up.
            continue
        logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid)
        try:
            proc.kill()
            # Reap the process so it doesn't linger as a zombie.
            await proc.wait()
        except ProcessLookupError:
            # Process exited between the returncode check and kill().
            pass
    _active_subprocesses.clear()
def _agent_token(agent_name: str) -> str | None:
"""Read Forgejo token for a named agent. Returns token string or None."""
token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
@ -243,6 +259,7 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=prompt.encode()),
@ -253,19 +270,23 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
await proc.wait()
logger.error("Claude CLI timed out after %ds", timeout_sec)
return None
finally:
_active_subprocesses.discard(proc)
out_text = (stdout or b"").decode()
err_text = (stderr or b"").decode()
# Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
combined_lower = (out_text + err_text).lower()
if "hit your limit" in combined_lower or "rate limit" in combined_lower:
logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
return "RATE_LIMITED"
if proc.returncode != 0:
err = (stderr or b"").decode()[:500]
out = (stdout or b"").decode()[:500]
# Check for rate limit — Claude CLI puts limit message on stdout, not stderr
combined = (err + out).lower()
if "hit your limit" in combined or "rate limit" in combined:
logger.warning("Claude Max rate limited (stdout: %s)", out[:200])
return "RATE_LIMITED"
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err[:200], out[:200])
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
return None
return (stdout or b"").decode().strip()
return out_text.strip()
# ─── Diff helpers ──────────────────────────────────────────────────────────
@ -481,6 +502,15 @@ async def _post_formal_approvals(pr_number: int, pr_author: str):
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
"""Evaluate a single PR. Returns result dict."""
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_number,),
)
if cursor.rowcount == 0:
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
# Fetch diff
diff = await _get_pr_diff(pr_number)
if not diff:
@ -526,9 +556,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
tier = await _triage_pr(diff)
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
# Mark as reviewing
# Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
conn.execute(
"UPDATE prs SET status = 'reviewing', last_attempt = datetime('now') WHERE number = ?",
"UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
(pr_number,),
)
@ -541,6 +571,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Step 2: Domain review FIRST (Sonnet — high volume filter)
# Skip if already completed from a previous attempt
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
if existing_domain_verdict not in ("pending", None):
domain_verdict = existing_domain_verdict
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
@ -640,7 +671,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
# Store feedback for re-extraction path
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
if domain_verdict == "request_changes":
if domain_verdict == "request_changes" and domain_review is not None:
feedback["domain_issues"] = _parse_issues(domain_review)
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
@ -665,6 +696,14 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
}
# ─── Rate limit backoff ───────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
# UTC timestamp until which evaluate_cycle() skips work entirely; None means
# no backoff is currently active. Set when a worker reports a rate-limited
# skip, cleared once the deadline passes.
_rate_limit_backoff_until: datetime | None = None
# Length of the cycle-level backoff window, in minutes.
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
@ -673,6 +712,19 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
Finds PRs with status='open', tier0_pass=1, and no pending verdicts.
Evaluates in priority order.
"""
global _rate_limit_backoff_until
# If we're in rate-limit backoff, skip this cycle entirely
if _rate_limit_backoff_until is not None:
now = datetime.now(timezone.utc)
if now < _rate_limit_backoff_until:
remaining = int((_rate_limit_backoff_until - now).total_seconds())
logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining)
return 0, 0
else:
logger.info("Rate limit backoff expired, resuming eval cycles")
_rate_limit_backoff_until = None
# Find PRs ready for evaluation:
# - status = 'open'
# - tier0_pass = 1 (passed validation)
@ -709,8 +761,20 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
try:
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
if result.get("skipped"):
# Rate limited — don't count as failure, just skip
logger.debug("PR #%d skipped: %s", row["number"], result.get("reason"))
reason = result.get("reason", "")
logger.debug("PR #%d skipped: %s", row["number"], reason)
# Any rate limit — stop the entire cycle. No point trying more PRs
# when the model is exhausted. The 10-minute backoff on last_attempt
# prevents re-processing the same PR; breaking here prevents
# cycling through OTHER PRs that will also hit the same limit.
if "rate_limited" in reason:
from datetime import timedelta
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
logger.info("Rate limited (%s) — backing off for %d minutes",
reason, _RATE_LIMIT_BACKOFF_MINUTES)
break
else:
succeeded += 1
except Exception:

View file

@ -76,9 +76,9 @@ async def handle_health(request):
"merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
"budget": budget,
"metabolic": {
"null_result_rate_24h": round(null_rate["rate"] or 0, 3),
"domain_approval_rate_24h": round(approval_rate["domain_rate"] or 0, 3) if approval_rate["domain_rate"] else None,
"leo_approval_rate_24h": round(approval_rate["leo_rate"] or 0, 3) if approval_rate["leo_rate"] else None,
"null_result_rate_24h": round(null_rate["rate"], 3) if null_rate and null_rate["rate"] is not None else None,
"domain_approval_rate_24h": round(approval_rate["domain_rate"], 3) if approval_rate and approval_rate["domain_rate"] is not None else None,
"leo_approval_rate_24h": round(approval_rate["leo_rate"], 3) if approval_rate and approval_rate["leo_rate"] is not None else None,
},
"recent_activity": [
{"stage": r["stage"], "event": r["event"], "count": r["n"]}
@ -111,7 +111,7 @@ async def handle_health(request):
if not budget["ok"]:
body["status"] = "budget_exhausted"
# Rubber-stamp warning (Vida)
if (approval_rate["domain_rate"] or 0) > 0.95:
if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95:
body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"
status_code = 200 if body["status"] == "healthy" else 503

View file

@ -20,16 +20,13 @@ from lib.health import start_health_server, stop_health_server
from lib.breaker import CircuitBreaker
from lib.merge import merge_cycle
from lib.validate import validate_cycle
from lib.evaluate import evaluate_cycle
from lib.evaluate import evaluate_cycle, kill_active_subprocesses
logger = logging.getLogger("pipeline")
# Global shutdown event — stages check this between iterations
shutdown_event = asyncio.Event()
# Track active subprocesses for cleanup
active_subprocesses: set = set()
async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
"""Generic stage loop with interval, shutdown check, and circuit breaker."""
@ -84,16 +81,8 @@ def handle_signal(sig):
async def kill_subprocesses():
    """Kill any lingering Claude CLI subprocesses (delegates to evaluate module).

    The subprocess set now lives in lib.evaluate (Ganymede #8), where the
    processes are actually spawned; this wrapper keeps the pipeline's
    shutdown path stable for existing callers.
    """
    await kill_active_subprocesses()
async def cleanup_orphan_worktrees():