ganymede: fix 4 critical bugs before pipeline restart #1
3 changed files with 85 additions and 32 deletions
|
|
@ -25,6 +25,9 @@ from . import config, db
|
|||
|
||||
# Module-level logger for the evaluate stage of the pipeline.
logger = logging.getLogger("pipeline.evaluate")

# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
# Populated by _claude_cli_call on spawn and discarded in its finally block;
# drained by kill_active_subprocesses() during shutdown.
# NOTE(review): presumably holds asyncio.subprocess.Process objects — confirm
# before tightening the annotation to set[asyncio.subprocess.Process].
_active_subprocesses: set = set()
|
||||
|
||||
# ─── Constants ──────────────────────────────────────────────────────────────
|
||||
|
||||
DOMAIN_AGENT_MAP = {
|
||||
|
|
@ -45,6 +48,19 @@ DOMAIN_AGENT_MAP = {
|
|||
}
|
||||
|
||||
|
||||
async def kill_active_subprocesses():
    """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
    # Iterate over a snapshot: entries may be discarded concurrently as
    # in-flight CLI calls finish and remove themselves from the set.
    for proc in list(_active_subprocesses):
        if proc.returncode is not None:
            # Already exited — nothing to reap.
            continue
        logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid)
        try:
            proc.kill()
            await proc.wait()
        except ProcessLookupError:
            # Process disappeared between the returncode check and the kill.
            pass
    _active_subprocesses.clear()
|
||||
|
||||
|
||||
def _agent_token(agent_name: str) -> str | None:
|
||||
"""Read Forgejo token for a named agent. Returns token string or None."""
|
||||
token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
|
||||
|
|
@ -243,6 +259,7 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
|
|||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
_active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8)
|
||||
try:
|
||||
stdout, stderr = await asyncio.wait_for(
|
||||
proc.communicate(input=prompt.encode()),
|
||||
|
|
@ -253,19 +270,23 @@ async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
|
|||
await proc.wait()
|
||||
logger.error("Claude CLI timed out after %ds", timeout_sec)
|
||||
return None
|
||||
finally:
|
||||
_active_subprocesses.discard(proc)
|
||||
|
||||
out_text = (stdout or b"").decode()
|
||||
err_text = (stderr or b"").decode()
|
||||
|
||||
# Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
|
||||
combined_lower = (out_text + err_text).lower()
|
||||
if "hit your limit" in combined_lower or "rate limit" in combined_lower:
|
||||
logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
|
||||
return "RATE_LIMITED"
|
||||
|
||||
if proc.returncode != 0:
|
||||
err = (stderr or b"").decode()[:500]
|
||||
out = (stdout or b"").decode()[:500]
|
||||
# Check for rate limit — Claude CLI puts limit message on stdout, not stderr
|
||||
combined = (err + out).lower()
|
||||
if "hit your limit" in combined or "rate limit" in combined:
|
||||
logger.warning("Claude Max rate limited (stdout: %s)", out[:200])
|
||||
return "RATE_LIMITED"
|
||||
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err[:200], out[:200])
|
||||
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
|
||||
return None
|
||||
|
||||
return (stdout or b"").decode().strip()
|
||||
return out_text.strip()
|
||||
|
||||
|
||||
# ─── Diff helpers ──────────────────────────────────────────────────────────
|
||||
|
|
@ -481,6 +502,15 @@ async def _post_formal_approvals(pr_number: int, pr_author: str):
|
|||
|
||||
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
|
||||
"""Evaluate a single PR. Returns result dict."""
|
||||
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
|
||||
cursor = conn.execute(
|
||||
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
|
||||
(pr_number,),
|
||||
)
|
||||
if cursor.rowcount == 0:
|
||||
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
|
||||
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
|
||||
|
||||
# Fetch diff
|
||||
diff = await _get_pr_diff(pr_number)
|
||||
if not diff:
|
||||
|
|
@ -526,9 +556,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
|
|||
tier = await _triage_pr(diff)
|
||||
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
|
||||
|
||||
# Mark as reviewing
|
||||
# Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
|
||||
conn.execute(
|
||||
"UPDATE prs SET status = 'reviewing', last_attempt = datetime('now') WHERE number = ?",
|
||||
"UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
|
||||
(pr_number,),
|
||||
)
|
||||
|
||||
|
|
@ -541,6 +571,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
|
|||
|
||||
# Step 2: Domain review FIRST (Sonnet — high volume filter)
|
||||
# Skip if already completed from a previous attempt
|
||||
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
|
||||
if existing_domain_verdict not in ("pending", None):
|
||||
domain_verdict = existing_domain_verdict
|
||||
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
|
||||
|
|
@ -640,7 +671,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
|
|||
)
|
||||
# Store feedback for re-extraction path
|
||||
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
|
||||
if domain_verdict == "request_changes":
|
||||
if domain_verdict == "request_changes" and domain_review is not None:
|
||||
feedback["domain_issues"] = _parse_issues(domain_review)
|
||||
conn.execute(
|
||||
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
|
||||
|
|
@ -665,6 +696,14 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
|
|||
}
|
||||
|
||||
|
||||
# ─── Rate limit backoff ───────────────────────────────────────────────────

# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
# Set by evaluate_cycle() when a worker reports a rate-limited result;
# cleared by evaluate_cycle() once the deadline passes.
# NOTE(review): compared against datetime.now(timezone.utc) elsewhere, so this
# must always be assigned a timezone-aware UTC datetime.
_rate_limit_backoff_until: datetime | None = None
_RATE_LIMIT_BACKOFF_MINUTES = 15
|
||||
|
||||
|
||||
# ─── Main entry point ──────────────────────────────────────────────────────
|
||||
|
||||
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
|
||||
|
|
@ -673,6 +712,19 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|||
Finds PRs with status='open', tier0_pass=1, and no pending verdicts.
|
||||
Evaluates in priority order.
|
||||
"""
|
||||
global _rate_limit_backoff_until
|
||||
|
||||
# If we're in rate-limit backoff, skip this cycle entirely
|
||||
if _rate_limit_backoff_until is not None:
|
||||
now = datetime.now(timezone.utc)
|
||||
if now < _rate_limit_backoff_until:
|
||||
remaining = int((_rate_limit_backoff_until - now).total_seconds())
|
||||
logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining)
|
||||
return 0, 0
|
||||
else:
|
||||
logger.info("Rate limit backoff expired, resuming eval cycles")
|
||||
_rate_limit_backoff_until = None
|
||||
|
||||
# Find PRs ready for evaluation:
|
||||
# - status = 'open'
|
||||
# - tier0_pass = 1 (passed validation)
|
||||
|
|
@ -709,8 +761,20 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|||
try:
|
||||
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
|
||||
if result.get("skipped"):
|
||||
# Rate limited — don't count as failure, just skip
|
||||
logger.debug("PR #%d skipped: %s", row["number"], result.get("reason"))
|
||||
reason = result.get("reason", "")
|
||||
logger.debug("PR #%d skipped: %s", row["number"], reason)
|
||||
# Any rate limit — stop the entire cycle. No point trying more PRs
|
||||
# when the model is exhausted. The 10-minute backoff on last_attempt
|
||||
# prevents re-processing the same PR; breaking here prevents
|
||||
# cycling through OTHER PRs that will also hit the same limit.
|
||||
if "rate_limited" in reason:
|
||||
from datetime import timedelta
|
||||
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
|
||||
minutes=_RATE_LIMIT_BACKOFF_MINUTES
|
||||
)
|
||||
logger.info("Rate limited (%s) — backing off for %d minutes",
|
||||
reason, _RATE_LIMIT_BACKOFF_MINUTES)
|
||||
break
|
||||
else:
|
||||
succeeded += 1
|
||||
except Exception:
|
||||
|
|
|
|||
|
|
@ -76,9 +76,9 @@ async def handle_health(request):
|
|||
"merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
|
||||
"budget": budget,
|
||||
"metabolic": {
|
||||
"null_result_rate_24h": round(null_rate["rate"] or 0, 3),
|
||||
"domain_approval_rate_24h": round(approval_rate["domain_rate"] or 0, 3) if approval_rate["domain_rate"] else None,
|
||||
"leo_approval_rate_24h": round(approval_rate["leo_rate"] or 0, 3) if approval_rate["leo_rate"] else None,
|
||||
"null_result_rate_24h": round(null_rate["rate"], 3) if null_rate and null_rate["rate"] is not None else None,
|
||||
"domain_approval_rate_24h": round(approval_rate["domain_rate"], 3) if approval_rate and approval_rate["domain_rate"] is not None else None,
|
||||
"leo_approval_rate_24h": round(approval_rate["leo_rate"], 3) if approval_rate and approval_rate["leo_rate"] is not None else None,
|
||||
},
|
||||
"recent_activity": [
|
||||
{"stage": r["stage"], "event": r["event"], "count": r["n"]}
|
||||
|
|
@ -111,7 +111,7 @@ async def handle_health(request):
|
|||
if not budget["ok"]:
|
||||
body["status"] = "budget_exhausted"
|
||||
# Rubber-stamp warning (Vida)
|
||||
if (approval_rate["domain_rate"] or 0) > 0.95:
|
||||
if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95:
|
||||
body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"
|
||||
|
||||
status_code = 200 if body["status"] == "healthy" else 503
|
||||
|
|
|
|||
|
|
@ -20,16 +20,13 @@ from lib.health import start_health_server, stop_health_server
|
|||
from lib.breaker import CircuitBreaker
|
||||
from lib.merge import merge_cycle
|
||||
from lib.validate import validate_cycle
|
||||
from lib.evaluate import evaluate_cycle
|
||||
from lib.evaluate import evaluate_cycle, kill_active_subprocesses
|
||||
|
||||
logger = logging.getLogger("pipeline")
|
||||
|
||||
# Global shutdown event — stages check this between iterations
|
||||
shutdown_event = asyncio.Event()
|
||||
|
||||
# Track active subprocesses for cleanup
|
||||
active_subprocesses: set = set()
|
||||
|
||||
|
||||
async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
|
||||
"""Generic stage loop with interval, shutdown check, and circuit breaker."""
|
||||
|
|
@ -84,16 +81,8 @@ def handle_signal(sig):
|
|||
|
||||
|
||||
async def kill_subprocesses():
|
||||
"""Kill any lingering Claude CLI subprocesses."""
|
||||
for proc in list(active_subprocesses):
|
||||
if proc.returncode is None:
|
||||
logger.warning("Killing lingering subprocess PID %d", proc.pid)
|
||||
try:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
active_subprocesses.clear()
|
||||
"""Kill any lingering Claude CLI subprocesses (delegates to evaluate module)."""
|
||||
await kill_active_subprocesses()
|
||||
|
||||
|
||||
async def cleanup_orphan_worktrees():
|
||||
|
|
|
|||
Loading…
Reference in a new issue