diff --git a/batch-extract-50.sh b/batch-extract-50.sh
index 58347ba..924403c 100755
--- a/batch-extract-50.sh
+++ b/batch-extract-50.sh
@@ -5,7 +5,8 @@
 # Skip logic uses two checks instead of local marker files (Ganymede v3 review):
 # Gate 1: Is source already in archive/{domain}/? → already processed, dedup
 # Gate 2: Does extraction branch exist on Forgejo? → extraction in progress
-# Neither → extract
+# Gate 3: Does pipeline.db show ≥3 closed PRs for this source? → zombie, skip
+# All gates pass → extract
 #
 # Architecture: Ganymede (two-gate) + Rhea (separate worktrees)
@@ -14,9 +15,11 @@ MAIN_REPO=/opt/teleo-eval/workspaces/main
 EXTRACT=/opt/teleo-eval/openrouter-extract-v2.py
 CLEANUP=/opt/teleo-eval/post-extract-cleanup.py
 LOG=/opt/teleo-eval/logs/batch-extract-50.log
+DB=/opt/teleo-eval/pipeline/pipeline.db
 TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-leo-token)
 FORGEJO_URL="http://localhost:3000"
 MAX=50
+MAX_CLOSED=3 # zombie retry limit: skip source after this many closed PRs
 COUNT=0
 SUCCESS=0
 FAILED=0
@@ -171,6 +174,17 @@ print(matches[0]['number'] if matches else '')
         fi
     fi

+    # Gate 3: Check pipeline.db for zombie sources — too many closed PRs means
+    # the source keeps failing eval. Skip after MAX_CLOSED rejections. (Epimetheus)
+    if [ -f "$DB" ]; then
+        CLOSED_COUNT=$(sqlite3 "$DB" "SELECT COUNT(*) FROM prs WHERE branch = 'extract/$BASENAME' AND status = 'closed'" 2>/dev/null || echo 0)
+        if [ "$CLOSED_COUNT" -ge "$MAX_CLOSED" ]; then
+            echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (zombie: $CLOSED_COUNT closed PRs >= $MAX_CLOSED limit)" >> $LOG
+            SKIPPED=$((SKIPPED + 1))
+            continue
+        fi
+    fi
+
     echo "[$(date)] [$COUNT/$MAX] Processing $BASENAME" >> $LOG

     # Reset to main (log errors — don't swallow)
diff --git a/lib/evaluate.py b/lib/evaluate.py
index ec973c8..3636afb 100644
--- a/lib/evaluate.py
+++ b/lib/evaluate.py
@@ -580,7 +580,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
             json.dumps({"pr": pr_number, "tier": tier}),
         )
     else:
-        tier = await triage_pr(diff)
+        tier, triage_usage = await triage_pr(diff)
+        # Record triage cost
+        from . import costs
+        costs.record_usage(
+            conn, config.TRIAGE_MODEL, "eval_triage",
+            input_tokens=triage_usage.get("prompt_tokens", 0),
+            output_tokens=triage_usage.get("completion_tokens", 0),
+            backend="openrouter",
+        )

     # Tier overrides (claim-shape detector + random promotion)
     # Order matters: claim-shape catches obvious cases, random promotion catches the rest.
@@ -618,6 +626,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
     # LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback)
     # Skip if already completed from a previous attempt
     domain_review = None  # Initialize — used later for feedback extraction (Ganymede #12)
+    domain_usage = {"prompt_tokens": 0, "completion_tokens": 0}
+    leo_usage = {"prompt_tokens": 0, "completion_tokens": 0}
     if tier == "LIGHT" and config.LIGHT_SKIP_LLM:
         domain_verdict = "skipped"
         logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number)
@@ -630,7 +640,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
         logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
     else:
         logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
-        domain_review = await run_domain_review(review_diff, files, domain or "general", agent)
+        domain_review, domain_usage = await run_domain_review(review_diff, files, domain or "general", agent)

         if domain_review is None:
             # OpenRouter failure (timeout, error) — revert to open for retry.
@@ -683,7 +693,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
     leo_review = None  # Initialize — used later for issue extraction
     if tier != "LIGHT":
         logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
-        leo_review = await run_leo_review(review_diff, files, tier)
+        leo_review, leo_usage = await run_leo_review(review_diff, files, tier)

         if leo_review is None:
             # DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
@@ -773,12 +783,22 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
     from . import costs

     if domain_verdict != "skipped":
-        costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
+        costs.record_usage(
+            conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
+            input_tokens=domain_usage.get("prompt_tokens", 0),
+            output_tokens=domain_usage.get("completion_tokens", 0),
+            backend="openrouter",
+        )
     if leo_verdict not in ("skipped",):
         if tier == "DEEP":
             costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
         else:
-            costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
+            costs.record_usage(
+                conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
+                input_tokens=leo_usage.get("prompt_tokens", 0),
+                output_tokens=leo_usage.get("completion_tokens", 0),
+                backend="openrouter",
+            )

     return {
         "pr": pr_number,
@@ -1056,7 +1076,7 @@ async def _run_batch_domain_eval(
         domain,
         ", ".join(f"#{p['number']}" for p in pr_diffs),
     )
-    batch_response = await run_batch_domain_review(pr_diffs, domain, agent)
+    batch_response, batch_domain_usage = await run_batch_domain_review(pr_diffs, domain, agent)

     if batch_response is None:
         # Complete failure — revert all to open
@@ -1079,6 +1099,15 @@ async def _run_batch_domain_eval(
             }),
         )

+    # Record batch domain review cost ONCE for the whole batch (not per-PR)
+    from . import costs
+    costs.record_usage(
+        conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
+        input_tokens=batch_domain_usage.get("prompt_tokens", 0),
+        output_tokens=batch_domain_usage.get("completion_tokens", 0),
+        backend="openrouter",
+    )
+
     # Step 4: Process valid reviews — post comments + continue to Leo
     for pr_data in pr_diffs:
         pr_num = pr_data["number"]
@@ -1115,10 +1144,6 @@ async def _run_batch_domain_eval(
             (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
         )

-        # Record cost
-        from . import costs
-        costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
-
         # If domain rejects, handle disposition (same as individual path)
         if domain_verdict == "request_changes":
             domain_issues = _parse_issues(review_text)
@@ -1145,7 +1170,7 @@ async def _run_batch_domain_eval(
         review_diff = pr_data["diff"]
         files = pr_data["files"]

-        leo_review = await run_leo_review(review_diff, files, "STANDARD")
+        leo_review, leo_usage = await run_leo_review(review_diff, files, "STANDARD")

         if leo_review is None:
             conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
@@ -1169,7 +1194,12 @@ async def _run_batch_domain_eval(
             token=leo_tok,
         )

-        costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
+        costs.record_usage(
+            conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
+            input_tokens=leo_usage.get("prompt_tokens", 0),
+            output_tokens=leo_usage.get("completion_tokens", 0),
+            backend="openrouter",
+        )

         # Final verdict
         both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
diff --git a/lib/llm.py b/lib/llm.py
index dc3a09d..ed38300 100644
--- a/lib/llm.py
+++ b/lib/llm.py
@@ -224,12 +224,16 @@ where NUMBER is the PR number shown in the section header."""

 async def openrouter_call(
     model: str, prompt: str, timeout_sec: int = 120, max_tokens: int = 4096,
-) -> str | None:
-    """Call OpenRouter API. Returns response text or None on failure."""
+) -> tuple[str | None, dict]:
+    """Call OpenRouter API. Returns (response_text, usage_dict).
+
+    usage_dict has keys: prompt_tokens, completion_tokens (0 on failure).
+    """
+    empty_usage = {"prompt_tokens": 0, "completion_tokens": 0}
     key_file = config.SECRETS_DIR / "openrouter-key"
     if not key_file.exists():
         logger.error("OpenRouter key file not found")
-        return None
+        return None, empty_usage
     key = key_file.read_text().strip()

     payload = {
@@ -250,12 +254,14 @@
             if resp.status >= 400:
                 text = await resp.text()
                 logger.error("OpenRouter %s → %d: %s", model, resp.status, text[:200])
-                return None
+                return None, empty_usage
             data = await resp.json()
-            return data.get("choices", [{}])[0].get("message", {}).get("content")
+            usage = data.get("usage", empty_usage)
+            content = data.get("choices", [{}])[0].get("message", {}).get("content")
+            return content, usage
     except Exception as e:
         logger.error("OpenRouter error: %s → %s", model, e)
-        return None
+        return None, empty_usage


 async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None:
@@ -305,31 +311,31 @@ async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:

 # ─── Review execution ─────────────────────────────────────────────────────

-async def triage_pr(diff: str) -> str:
-    """Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
+async def triage_pr(diff: str) -> tuple[str, dict]:
+    """Triage PR via Haiku → (tier, usage). tier is DEEP/STANDARD/LIGHT."""
     prompt = TRIAGE_PROMPT.format(diff=diff[:50000])  # Cap diff size for triage
-    result = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
+    result, usage = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
     if not result:
         logger.warning("Triage failed, defaulting to STANDARD")
-        return "STANDARD"
+        return "STANDARD", usage
     tier = result.split("\n")[0].strip().upper()
     if tier in ("DEEP", "STANDARD", "LIGHT"):
         reason = result.split("\n")[1].strip() if "\n" in result else ""
         logger.info("Triage: %s — %s", tier, reason[:100])
-        return tier
+        return tier, usage
     logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
-    return "STANDARD"
+    return "STANDARD", usage


 async def run_batch_domain_review(
     pr_diffs: list[dict], domain: str, agent: str,
-) -> str | None:
+) -> tuple[str | None, dict]:
     """Run batched domain review for multiple PRs in one LLM call.

     pr_diffs: list of {"number": int, "label": str, "diff": str, "files": str}
-    Returns raw response text or None on failure.
+    Returns (raw_response_text, usage) or (None, usage) on failure.
     """
     # Build per-PR sections with anchoring labels
     sections = []
@@ -351,18 +357,19 @@
     # Scale max_tokens with batch size: ~3K tokens per PR review
     max_tokens = min(3000 * len(pr_diffs), 16384)
-    result = await openrouter_call(
+    result, usage = await openrouter_call(
         config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT, max_tokens=max_tokens,
     )
-    return result
+    return result, usage


-async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
-    """Run domain review via OpenRouter GPT-4o.
+async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> tuple[str | None, dict]:
+    """Run domain review via OpenRouter.

     Decoupled from Claude Max to avoid account-level rate limits blocking domain reviews.
     Different model lineage also reduces correlated blind spots.
+    Returns (review_text, usage).
     """
     prompt = DOMAIN_PROMPT.format(
         agent=agent,
@@ -373,16 +380,17 @@ async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> s
         files=files,
     )

-    result = await openrouter_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
-    return result
+    result, usage = await openrouter_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
+    return result, usage


-async def run_leo_review(diff: str, files: str, tier: str) -> str | None:
+async def run_leo_review(diff: str, files: str, tier: str) -> tuple[str | None, dict]:
     """Run Leo review. DEEP → Opus (Claude Max, queue if limited). STANDARD → GPT-4o (OpenRouter).

     Opus is scarce — reserved for DEEP eval and overnight research sessions. STANDARD
     goes straight to GPT-4o. Domain review is the primary gate; Leo review is a quality
     check that doesn't need Opus for routine claims.
+    Returns (review_text, usage).
""" prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files) @@ -397,11 +405,11 @@ async def run_leo_review(diff: str, files: str, tier: str) -> str | None: # result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS) # if result == "RATE_LIMITED" or result is None: # logger.info("Opus unavailable for DEEP Leo review — overflowing to Sonnet") - # result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS) - # return result - result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) - return result + # result, usage = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS) + # return result, usage + result, usage = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) + return result, usage else: # STANDARD/LIGHT: Sonnet via OpenRouter — 120s timeout (routine calls) - result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) - return result + result, usage = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) + return result, usage diff --git a/lib/substantive_fixer.py b/lib/substantive_fixer.py index 987b538..386b6bc 100644 --- a/lib/substantive_fixer.py +++ b/lib/substantive_fixer.py @@ -333,7 +333,7 @@ async def _fix_pr(conn, pr_number: int) -> dict: fixed_any = False for filepath, content in claim_files.items(): prompt = _build_fix_prompt(content, review_text, issues, source_content, domain_index) - result = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096) + result, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096) if not result: logger.warning("PR #%d: fix LLM call failed for %s", pr_number, filepath) @@ -377,7 +377,7 @@ async def _fix_pr(conn, pr_number: int) -> dict: # Write fixed files for filepath, content in claim_files.items(): prompt = _build_fix_prompt(content, review_text, issues, source_content, domain_index) - fixed_content = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096) + fixed_content, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096) if fixed_content and not fixed_content.strip().startswith("{"): full_path = Path(worktree_path) / filepath full_path.parent.mkdir(parents=True, exist_ok=True) @@ -524,7 +524,7 @@ async def _flag_for_leo_review( # Use LLM to identify candidate matches if domain_index: prompt = _build_fix_prompt(first_claim, review_text, ["near_duplicate"], None, domain_index) - result = await openrouter_call(FIX_MODEL, prompt, timeout_sec=60, max_tokens=1024) + result, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=60, max_tokens=1024) candidates_text = result or "Could not identify candidates." else: candidates_text = "No domain index available."