diff --git a/lib/evaluate.py b/lib/evaluate.py index a11e4e2..033f59a 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -25,6 +25,9 @@ from . import config, db logger = logging.getLogger("pipeline.evaluate") +# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8) +_active_subprocesses: set = set() + # ─── Constants ────────────────────────────────────────────────────────────── DOMAIN_AGENT_MAP = { @@ -45,6 +48,19 @@ DOMAIN_AGENT_MAP = { } +async def kill_active_subprocesses(): + """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown.""" + for proc in list(_active_subprocesses): + if proc.returncode is None: + logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid) + try: + proc.kill() + await proc.wait() + except ProcessLookupError: + pass + _active_subprocesses.clear() + + def _agent_token(agent_name: str) -> str | None: """Read Forgejo token for a named agent. Returns token string or None.""" token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token" @@ -243,6 +259,7 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) + _active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8) try: stdout, stderr = await asyncio.wait_for( proc.communicate(input=prompt.encode()), @@ -253,19 +270,23 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: await proc.wait() logger.error("Claude CLI timed out after %ds", timeout_sec) return None + finally: + _active_subprocesses.discard(proc) + + out_text = (stdout or b"").decode() + err_text = (stderr or b"").decode() + + # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message + combined_lower = (out_text + err_text).lower() + if "hit your limit" in combined_lower or "rate limit" in combined_lower: + logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200]) + return "RATE_LIMITED" if proc.returncode != 0: - err = (stderr or b"").decode()[:500] - out = (stdout or b"").decode()[:500] - # Check for rate limit — Claude CLI puts limit message on stdout, not stderr - combined = (err + out).lower() - if "hit your limit" in combined or "rate limit" in combined: - logger.warning("Claude Max rate limited (stdout: %s)", out[:200]) - return "RATE_LIMITED" - logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err[:200], out[:200]) + logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200]) return None - return (stdout or b"").decode().strip() + return out_text.strip() # ─── Diff helpers ────────────────────────────────────────────────────────── @@ -481,6 +502,15 @@ async def _post_formal_approvals(pr_number: int, pr_author: str): async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: """Evaluate a single PR. Returns result dict.""" + # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11) + cursor = conn.execute( + "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'", + (pr_number,), + ) + if cursor.rowcount == 0: + logger.debug("PR #%d already claimed by another worker, skipping", pr_number) + return {"pr": pr_number, "skipped": True, "reason": "already_claimed"} + # Fetch diff diff = await _get_pr_diff(pr_number) if not diff: @@ -526,9 +556,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: tier = await _triage_pr(diff) conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number)) - # Mark as reviewing + # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above) conn.execute( - "UPDATE prs SET status = 'reviewing', last_attempt = datetime('now') WHERE number = ?", + "UPDATE prs SET last_attempt = datetime('now') WHERE number = ?", (pr_number,), ) @@ -541,6 +571,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # Step 2: Domain review FIRST (Sonnet — high volume filter) # Skip if already completed from a previous attempt + domain_review = None # Initialize — used later for feedback extraction (Ganymede #12) if existing_domain_verdict not in ("pending", None): domain_verdict = existing_domain_verdict logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict) @@ -640,7 +671,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) # Store feedback for re-extraction path feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier} - if domain_verdict == "request_changes": + if domain_verdict == "request_changes" and domain_review is not None: feedback["domain_issues"] = _parse_issues(domain_review) conn.execute( "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)", @@ -665,6 +696,14 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: } +# ─── Rate limit backoff ─────────────────────────────────────────────────── + +# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted +# CLI calls overnight when Opus is exhausted. +_rate_limit_backoff_until: datetime | None = None +_RATE_LIMIT_BACKOFF_MINUTES = 15 + + # ─── Main entry point ────────────────────────────────────────────────────── async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: @@ -673,6 +712,19 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: Finds PRs with status='open', tier0_pass=1, and no pending verdicts. Evaluates in priority order. """ + global _rate_limit_backoff_until + + # If we're in rate-limit backoff, skip this cycle entirely + if _rate_limit_backoff_until is not None: + now = datetime.now(timezone.utc) + if now < _rate_limit_backoff_until: + remaining = int((_rate_limit_backoff_until - now).total_seconds()) + logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining) + return 0, 0 + else: + logger.info("Rate limit backoff expired, resuming eval cycles") + _rate_limit_backoff_until = None + # Find PRs ready for evaluation: # - status = 'open' # - tier0_pass = 1 (passed validation) @@ -709,8 +761,20 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: try: result = await evaluate_pr(conn, row["number"], tier=row["tier"]) if result.get("skipped"): - # Rate limited — don't count as failure, just skip - logger.debug("PR #%d skipped: %s", row["number"], result.get("reason")) + reason = result.get("reason", "") + logger.debug("PR #%d skipped: %s", row["number"], reason) + # Any rate limit — stop the entire cycle. No point trying more PRs + # when the model is exhausted. The 10-minute backoff on last_attempt + # prevents re-processing the same PR; breaking here prevents + # cycling through OTHER PRs that will also hit the same limit. + if "rate_limited" in reason: + from datetime import timedelta + _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( + minutes=_RATE_LIMIT_BACKOFF_MINUTES + ) + logger.info("Rate limited (%s) — backing off for %d minutes", + reason, _RATE_LIMIT_BACKOFF_MINUTES) + break else: succeeded += 1 except Exception: diff --git a/lib/health.py b/lib/health.py index 00d4045..3ef09dd 100644 --- a/lib/health.py +++ b/lib/health.py @@ -76,9 +76,9 @@ async def handle_health(request): "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue}, "budget": budget, "metabolic": { - "null_result_rate_24h": round(null_rate["rate"] or 0, 3), - "domain_approval_rate_24h": round(approval_rate["domain_rate"] or 0, 3) if approval_rate["domain_rate"] else None, - "leo_approval_rate_24h": round(approval_rate["leo_rate"] or 0, 3) if approval_rate["leo_rate"] else None, + "null_result_rate_24h": round(null_rate["rate"], 3) if null_rate and null_rate["rate"] is not None else None, + "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3) if approval_rate and approval_rate["domain_rate"] is not None else None, + "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3) if approval_rate and approval_rate["leo_rate"] is not None else None, }, "recent_activity": [ {"stage": r["stage"], "event": r["event"], "count": r["n"]} @@ -111,7 +111,7 @@ async def handle_health(request): if not budget["ok"]: body["status"] = "budget_exhausted" # Rubber-stamp warning (Vida) - if (approval_rate["domain_rate"] or 0) > 0.95: + if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95: body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping" status_code = 200 if body["status"] == "healthy" else 503 diff --git a/teleo-pipeline.py b/teleo-pipeline.py index 002412e..ad66b97 100644 --- a/teleo-pipeline.py +++ b/teleo-pipeline.py @@ -20,16 +20,13 @@ from lib.health import start_health_server, stop_health_server from lib.breaker import CircuitBreaker from lib.merge import merge_cycle from lib.validate import validate_cycle -from lib.evaluate import evaluate_cycle +from lib.evaluate import evaluate_cycle, kill_active_subprocesses logger = logging.getLogger("pipeline") # Global shutdown event — stages check this between iterations shutdown_event = asyncio.Event() -# Track active subprocesses for cleanup -active_subprocesses: set = set() - async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker): """Generic stage loop with interval, shutdown check, and circuit breaker.""" @@ -84,16 +81,8 @@ def handle_signal(sig): async def kill_subprocesses(): - """Kill any lingering Claude CLI subprocesses.""" - for proc in list(active_subprocesses): - if proc.returncode is None: - logger.warning("Killing lingering subprocess PID %d", proc.pid) - try: - proc.kill() - await proc.wait() - except ProcessLookupError: - pass - active_subprocesses.clear() + """Kill any lingering Claude CLI subprocesses (delegates to evaluate module).""" + await kill_active_subprocesses() async def cleanup_orphan_worktrees():