ganymede: fix 4 critical bugs before pipeline restart #1

Merged
m3taversal merged 1 commit from ganymede/phase1-critical-fixes into main 2026-03-13 14:35:18 +00:00
3 changed files with 85 additions and 32 deletions

View file

@ -25,6 +25,9 @@ from . import config, db
logger = logging.getLogger("pipeline.evaluate") logger = logging.getLogger("pipeline.evaluate")
# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
# Registry of asyncio subprocess handles added in _claude_cli_call and
# swept/killed by kill_active_subprocesses() at shutdown.
_active_subprocesses: set[asyncio.subprocess.Process] = set()
# ─── Constants ────────────────────────────────────────────────────────────── # ─── Constants ──────────────────────────────────────────────────────────────
DOMAIN_AGENT_MAP = { DOMAIN_AGENT_MAP = {
@ -45,6 +48,19 @@ DOMAIN_AGENT_MAP = {
} }
async def kill_active_subprocesses():
    """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
    # Iterate over a snapshot so concurrent discard() calls can't break iteration.
    for child in list(_active_subprocesses):
        if child.returncode is not None:
            # Already exited — nothing to kill.
            continue
        logger.warning("Killing lingering Claude CLI subprocess PID %d", child.pid)
        try:
            child.kill()
            # Reap the process so it doesn't linger as a zombie.
            await child.wait()
        except ProcessLookupError:
            # Process vanished between the returncode check and kill(); fine.
            pass
    _active_subprocesses.clear()
def _agent_token(agent_name: str) -> str | None: def _agent_token(agent_name: str) -> str | None:
"""Read Forgejo token for a named agent. Returns token string or None.""" """Read Forgejo token for a named agent. Returns token string or None."""
token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token" token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
@ -243,6 +259,7 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
) )
_active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8)
try: try:
stdout, stderr = await asyncio.wait_for( stdout, stderr = await asyncio.wait_for(
proc.communicate(input=prompt.encode()), proc.communicate(input=prompt.encode()),
@ -253,19 +270,23 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
await proc.wait() await proc.wait()
logger.error("Claude CLI timed out after %ds", timeout_sec) logger.error("Claude CLI timed out after %ds", timeout_sec)
return None return None
finally:
_active_subprocesses.discard(proc)
out_text = (stdout or b"").decode()
err_text = (stderr or b"").decode()
# Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
combined_lower = (out_text + err_text).lower()
if "hit your limit" in combined_lower or "rate limit" in combined_lower:
logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
return "RATE_LIMITED"
if proc.returncode != 0: if proc.returncode != 0:
err = (stderr or b"").decode()[:500] logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
out = (stdout or b"").decode()[:500]
# Check for rate limit — Claude CLI puts limit message on stdout, not stderr
combined = (err + out).lower()
if "hit your limit" in combined or "rate limit" in combined:
logger.warning("Claude Max rate limited (stdout: %s)", out[:200])
return "RATE_LIMITED"
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err[:200], out[:200])
return None return None
return (stdout or b"").decode().strip() return out_text.strip()
# ─── Diff helpers ────────────────────────────────────────────────────────── # ─── Diff helpers ──────────────────────────────────────────────────────────
@ -481,6 +502,15 @@ async def _post_formal_approvals(pr_number: int, pr_author: str):
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
"""Evaluate a single PR. Returns result dict.""" """Evaluate a single PR. Returns result dict."""
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_number,),
)
if cursor.rowcount == 0:
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
# Fetch diff # Fetch diff
diff = await _get_pr_diff(pr_number) diff = await _get_pr_diff(pr_number)
if not diff: if not diff:
@ -526,9 +556,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
tier = await _triage_pr(diff) tier = await _triage_pr(diff)
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number)) conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
# Mark as reviewing # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
conn.execute( conn.execute(
"UPDATE prs SET status = 'reviewing', last_attempt = datetime('now') WHERE number = ?", "UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
(pr_number,), (pr_number,),
) )
@ -541,6 +571,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Step 2: Domain review FIRST (Sonnet — high volume filter) # Step 2: Domain review FIRST (Sonnet — high volume filter)
# Skip if already completed from a previous attempt # Skip if already completed from a previous attempt
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
if existing_domain_verdict not in ("pending", None): if existing_domain_verdict not in ("pending", None):
domain_verdict = existing_domain_verdict domain_verdict = existing_domain_verdict
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict) logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
@ -640,7 +671,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
) )
# Store feedback for re-extraction path # Store feedback for re-extraction path
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier} feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
if domain_verdict == "request_changes": if domain_verdict == "request_changes" and domain_review is not None:
feedback["domain_issues"] = _parse_issues(domain_review) feedback["domain_issues"] = _parse_issues(domain_review)
conn.execute( conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)", "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
@ -665,6 +696,14 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
} }
# ─── Rate limit backoff ───────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
# UTC deadline until which evaluate_cycle() skips entirely; None = no backoff
# in effect. Set when any PR is skipped with a rate_limited reason.
_rate_limit_backoff_until: datetime | None = None
# Length of the backoff window (minutes) applied on each rate-limit hit.
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Main entry point ────────────────────────────────────────────────────── # ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
@ -673,6 +712,19 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
Finds PRs with status='open', tier0_pass=1, and no pending verdicts. Finds PRs with status='open', tier0_pass=1, and no pending verdicts.
Evaluates in priority order. Evaluates in priority order.
""" """
global _rate_limit_backoff_until
# If we're in rate-limit backoff, skip this cycle entirely
if _rate_limit_backoff_until is not None:
now = datetime.now(timezone.utc)
if now < _rate_limit_backoff_until:
remaining = int((_rate_limit_backoff_until - now).total_seconds())
logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining)
return 0, 0
else:
logger.info("Rate limit backoff expired, resuming eval cycles")
_rate_limit_backoff_until = None
# Find PRs ready for evaluation: # Find PRs ready for evaluation:
# - status = 'open' # - status = 'open'
# - tier0_pass = 1 (passed validation) # - tier0_pass = 1 (passed validation)
@ -709,8 +761,20 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
try: try:
result = await evaluate_pr(conn, row["number"], tier=row["tier"]) result = await evaluate_pr(conn, row["number"], tier=row["tier"])
if result.get("skipped"): if result.get("skipped"):
# Rate limited — don't count as failure, just skip reason = result.get("reason", "")
logger.debug("PR #%d skipped: %s", row["number"], result.get("reason")) logger.debug("PR #%d skipped: %s", row["number"], reason)
# Any rate limit — stop the entire cycle. No point trying more PRs
# when the model is exhausted. The 10-minute backoff on last_attempt
# prevents re-processing the same PR; breaking here prevents
# cycling through OTHER PRs that will also hit the same limit.
if "rate_limited" in reason:
from datetime import timedelta
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
logger.info("Rate limited (%s) — backing off for %d minutes",
reason, _RATE_LIMIT_BACKOFF_MINUTES)
break
else: else:
succeeded += 1 succeeded += 1
except Exception: except Exception:

View file

@ -76,9 +76,9 @@ async def handle_health(request):
"merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue}, "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
"budget": budget, "budget": budget,
"metabolic": { "metabolic": {
"null_result_rate_24h": round(null_rate["rate"] or 0, 3), "null_result_rate_24h": round(null_rate["rate"], 3) if null_rate and null_rate["rate"] is not None else None,
"domain_approval_rate_24h": round(approval_rate["domain_rate"] or 0, 3) if approval_rate["domain_rate"] else None, "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3) if approval_rate and approval_rate["domain_rate"] is not None else None,
"leo_approval_rate_24h": round(approval_rate["leo_rate"] or 0, 3) if approval_rate["leo_rate"] else None, "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3) if approval_rate and approval_rate["leo_rate"] is not None else None,
}, },
"recent_activity": [ "recent_activity": [
{"stage": r["stage"], "event": r["event"], "count": r["n"]} {"stage": r["stage"], "event": r["event"], "count": r["n"]}
@ -111,7 +111,7 @@ async def handle_health(request):
if not budget["ok"]: if not budget["ok"]:
body["status"] = "budget_exhausted" body["status"] = "budget_exhausted"
# Rubber-stamp warning (Vida) # Rubber-stamp warning (Vida)
if (approval_rate["domain_rate"] or 0) > 0.95: if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95:
body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping" body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"
status_code = 200 if body["status"] == "healthy" else 503 status_code = 200 if body["status"] == "healthy" else 503

View file

@ -20,16 +20,13 @@ from lib.health import start_health_server, stop_health_server
from lib.breaker import CircuitBreaker from lib.breaker import CircuitBreaker
from lib.merge import merge_cycle from lib.merge import merge_cycle
from lib.validate import validate_cycle from lib.validate import validate_cycle
from lib.evaluate import evaluate_cycle from lib.evaluate import evaluate_cycle, kill_active_subprocesses
logger = logging.getLogger("pipeline") logger = logging.getLogger("pipeline")
# Global shutdown event — stages check this between iterations # Global shutdown event — stages check this between iterations
shutdown_event = asyncio.Event() shutdown_event = asyncio.Event()
# Track active subprocesses for cleanup
active_subprocesses: set = set()
async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker): async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
"""Generic stage loop with interval, shutdown check, and circuit breaker.""" """Generic stage loop with interval, shutdown check, and circuit breaker."""
@ -84,16 +81,8 @@ def handle_signal(sig):
async def kill_subprocesses(): async def kill_subprocesses():
"""Kill any lingering Claude CLI subprocesses.""" """Kill any lingering Claude CLI subprocesses (delegates to evaluate module)."""
for proc in list(active_subprocesses): await kill_active_subprocesses()
if proc.returncode is None:
logger.warning("Killing lingering subprocess PID %d", proc.pid)
try:
proc.kill()
await proc.wait()
except ProcessLookupError:
pass
active_subprocesses.clear()
async def cleanup_orphan_worktrees(): async def cleanup_orphan_worktrees():