fix: zombie retry loop + cost tracking

Gate 3 in batch-extract-50.sh: query pipeline.db for closed PRs before
re-extracting. Sources with >=3 closed PRs are skipped (zombie protection).

Cost tracking: openrouter_call() now returns (text, usage) tuple with
prompt_tokens and completion_tokens from the OpenRouter API response.
All callers updated to unpack and pass tokens to costs.record_usage().
Added missing triage cost recording. Fixed batch domain review recording
cost once per batch instead of once per PR.

Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
m3taversal 2026-03-28 11:29:58 +00:00
parent 89692fda2d
commit 0457c49094
4 changed files with 95 additions and 43 deletions

View file

@ -5,7 +5,8 @@
# Skip logic uses two checks instead of local marker files (Ganymede v3 review):
# Gate 1: Is source already in archive/{domain}/? → already processed, dedup
# Gate 2: Does extraction branch exist on Forgejo? → extraction in progress
# Neither → extract
# Gate 3: Does pipeline.db show ≥3 closed PRs for this source? → zombie, skip
# All gates pass → extract
#
# Architecture: Ganymede (two-gate) + Rhea (separate worktrees)
@ -14,9 +15,11 @@ MAIN_REPO=/opt/teleo-eval/workspaces/main
EXTRACT=/opt/teleo-eval/openrouter-extract-v2.py
CLEANUP=/opt/teleo-eval/post-extract-cleanup.py
LOG=/opt/teleo-eval/logs/batch-extract-50.log
DB=/opt/teleo-eval/pipeline/pipeline.db
TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-leo-token)
FORGEJO_URL="http://localhost:3000"
MAX=50
MAX_CLOSED=3 # zombie retry limit: skip source after this many closed PRs
COUNT=0
SUCCESS=0
FAILED=0
@ -171,6 +174,17 @@ print(matches[0]['number'] if matches else '')
fi
fi
# Gate 3: Check pipeline.db for zombie sources — too many closed PRs means
# the source keeps failing eval. Skip after MAX_CLOSED rejections. (Epimetheus)
if [ -f "$DB" ]; then
CLOSED_COUNT=$(sqlite3 "$DB" "SELECT COUNT(*) FROM prs WHERE branch = 'extract/$BASENAME' AND status = 'closed'" 2>/dev/null || echo 0)
if [ "$CLOSED_COUNT" -ge "$MAX_CLOSED" ]; then
echo "[$(date)] [$COUNT/$MAX] SKIP $BASENAME (zombie: $CLOSED_COUNT closed PRs >= $MAX_CLOSED limit)" >> $LOG
SKIPPED=$((SKIPPED + 1))
continue
fi
fi
echo "[$(date)] [$COUNT/$MAX] Processing $BASENAME" >> $LOG
# Reset to main (log errors — don't swallow)

View file

@ -580,7 +580,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
json.dumps({"pr": pr_number, "tier": tier}),
)
else:
tier = await triage_pr(diff)
tier, triage_usage = await triage_pr(diff)
# Record triage cost
from . import costs
costs.record_usage(
conn, config.TRIAGE_MODEL, "eval_triage",
input_tokens=triage_usage.get("prompt_tokens", 0),
output_tokens=triage_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Tier overrides (claim-shape detector + random promotion)
# Order matters: claim-shape catches obvious cases, random promotion catches the rest.
@ -618,6 +626,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback)
# Skip if already completed from a previous attempt
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
domain_usage = {"prompt_tokens": 0, "completion_tokens": 0}
leo_usage = {"prompt_tokens": 0, "completion_tokens": 0}
if tier == "LIGHT" and config.LIGHT_SKIP_LLM:
domain_verdict = "skipped"
logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number)
@ -630,7 +640,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
else:
logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
domain_review = await run_domain_review(review_diff, files, domain or "general", agent)
domain_review, domain_usage = await run_domain_review(review_diff, files, domain or "general", agent)
if domain_review is None:
# OpenRouter failure (timeout, error) — revert to open for retry.
@ -683,7 +693,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
leo_review = None # Initialize — used later for issue extraction
if tier != "LIGHT":
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
leo_review = await run_leo_review(review_diff, files, tier)
leo_review, leo_usage = await run_leo_review(review_diff, files, tier)
if leo_review is None:
# DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
@ -773,12 +783,22 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
from . import costs
if domain_verdict != "skipped":
costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
if leo_verdict not in ("skipped",):
if tier == "DEEP":
costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
else:
costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
return {
"pr": pr_number,
@ -1056,7 +1076,7 @@ async def _run_batch_domain_eval(
domain,
", ".join(f"#{p['number']}" for p in pr_diffs),
)
batch_response = await run_batch_domain_review(pr_diffs, domain, agent)
batch_response, batch_domain_usage = await run_batch_domain_review(pr_diffs, domain, agent)
if batch_response is None:
# Complete failure — revert all to open
@ -1079,6 +1099,15 @@ async def _run_batch_domain_eval(
}),
)
# Record batch domain review cost ONCE for the whole batch (not per-PR)
from . import costs
costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=batch_domain_usage.get("prompt_tokens", 0),
output_tokens=batch_domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Step 4: Process valid reviews — post comments + continue to Leo
for pr_data in pr_diffs:
pr_num = pr_data["number"]
@ -1115,10 +1144,6 @@ async def _run_batch_domain_eval(
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
)
# Record cost
from . import costs
costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
# If domain rejects, handle disposition (same as individual path)
if domain_verdict == "request_changes":
domain_issues = _parse_issues(review_text)
@ -1145,7 +1170,7 @@ async def _run_batch_domain_eval(
review_diff = pr_data["diff"]
files = pr_data["files"]
leo_review = await run_leo_review(review_diff, files, "STANDARD")
leo_review, leo_usage = await run_leo_review(review_diff, files, "STANDARD")
if leo_review is None:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
@ -1169,7 +1194,12 @@ async def _run_batch_domain_eval(
token=leo_tok,
)
costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Final verdict
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")

View file

@ -224,12 +224,16 @@ where NUMBER is the PR number shown in the section header."""
async def openrouter_call(
model: str, prompt: str, timeout_sec: int = 120, max_tokens: int = 4096,
) -> str | None:
"""Call OpenRouter API. Returns response text or None on failure."""
) -> tuple[str | None, dict]:
"""Call OpenRouter API. Returns (response_text, usage_dict).
usage_dict has keys: prompt_tokens, completion_tokens (0 on failure).
"""
empty_usage = {"prompt_tokens": 0, "completion_tokens": 0}
key_file = config.SECRETS_DIR / "openrouter-key"
if not key_file.exists():
logger.error("OpenRouter key file not found")
return None
return None, empty_usage
key = key_file.read_text().strip()
payload = {
@ -250,12 +254,14 @@ async def openrouter_call(
if resp.status >= 400:
text = await resp.text()
logger.error("OpenRouter %s%d: %s", model, resp.status, text[:200])
return None
return None, empty_usage
data = await resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content")
usage = data.get("usage", empty_usage)
content = data.get("choices", [{}])[0].get("message", {}).get("content")
return content, usage
except Exception as e:
logger.error("OpenRouter error: %s%s", model, e)
return None
return None, empty_usage
async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None:
@ -305,31 +311,31 @@ async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd:
# ─── Review execution ─────────────────────────────────────────────────────
async def triage_pr(diff: str) -> str:
"""Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
async def triage_pr(diff: str) -> tuple[str, dict]:
"""Triage PR via Haiku → (tier, usage). tier is DEEP/STANDARD/LIGHT."""
prompt = TRIAGE_PROMPT.format(diff=diff[:50000]) # Cap diff size for triage
result = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
result, usage = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
if not result:
logger.warning("Triage failed, defaulting to STANDARD")
return "STANDARD"
return "STANDARD", usage
tier = result.split("\n")[0].strip().upper()
if tier in ("DEEP", "STANDARD", "LIGHT"):
reason = result.split("\n")[1].strip() if "\n" in result else ""
logger.info("Triage: %s%s", tier, reason[:100])
return tier
return tier, usage
logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
return "STANDARD"
return "STANDARD", usage
async def run_batch_domain_review(
pr_diffs: list[dict], domain: str, agent: str,
) -> str | None:
) -> tuple[str | None, dict]:
"""Run batched domain review for multiple PRs in one LLM call.
pr_diffs: list of {"number": int, "label": str, "diff": str, "files": str}
Returns raw response text or None on failure.
Returns (raw_response_text, usage) or (None, usage) on failure.
"""
# Build per-PR sections with anchoring labels
sections = []
@ -351,18 +357,19 @@ async def run_batch_domain_review(
# Scale max_tokens with batch size: ~3K tokens per PR review
max_tokens = min(3000 * len(pr_diffs), 16384)
result = await openrouter_call(
result, usage = await openrouter_call(
config.EVAL_DOMAIN_MODEL, prompt,
timeout_sec=config.EVAL_TIMEOUT, max_tokens=max_tokens,
)
return result
return result, usage
async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
"""Run domain review via OpenRouter GPT-4o.
async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> tuple[str | None, dict]:
"""Run domain review via OpenRouter.
Decoupled from Claude Max to avoid account-level rate limits blocking
domain reviews. Different model lineage also reduces correlated blind spots.
Returns (review_text, usage).
"""
prompt = DOMAIN_PROMPT.format(
agent=agent,
@ -373,16 +380,17 @@ async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> s
files=files,
)
result = await openrouter_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result
result, usage = await openrouter_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result, usage
async def run_leo_review(diff: str, files: str, tier: str) -> str | None:
async def run_leo_review(diff: str, files: str, tier: str) -> tuple[str | None, dict]:
"""Run Leo review. DEEP → Opus (Claude Max, queue if limited). STANDARD → GPT-4o (OpenRouter).
Opus is scarce reserved for DEEP eval and overnight research sessions.
STANDARD goes straight to GPT-4o. Domain review is the primary gate;
Leo review is a quality check that doesn't need Opus for routine claims.
Returns (review_text, usage).
"""
prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
@ -397,11 +405,11 @@ async def run_leo_review(diff: str, files: str, tier: str) -> str | None:
# result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS)
# if result == "RATE_LIMITED" or result is None:
# logger.info("Opus unavailable for DEEP Leo review — overflowing to Sonnet")
# result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS)
# return result
result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result
# result, usage = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT_OPUS)
# return result, usage
result, usage = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result, usage
else:
# STANDARD/LIGHT: Sonnet via OpenRouter — 120s timeout (routine calls)
result = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result
result, usage = await openrouter_call(config.EVAL_LEO_STANDARD_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
return result, usage

View file

@ -333,7 +333,7 @@ async def _fix_pr(conn, pr_number: int) -> dict:
fixed_any = False
for filepath, content in claim_files.items():
prompt = _build_fix_prompt(content, review_text, issues, source_content, domain_index)
result = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096)
result, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096)
if not result:
logger.warning("PR #%d: fix LLM call failed for %s", pr_number, filepath)
@ -377,7 +377,7 @@ async def _fix_pr(conn, pr_number: int) -> dict:
# Write fixed files
for filepath, content in claim_files.items():
prompt = _build_fix_prompt(content, review_text, issues, source_content, domain_index)
fixed_content = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096)
fixed_content, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=120, max_tokens=4096)
if fixed_content and not fixed_content.strip().startswith("{"):
full_path = Path(worktree_path) / filepath
full_path.parent.mkdir(parents=True, exist_ok=True)
@ -524,7 +524,7 @@ async def _flag_for_leo_review(
# Use LLM to identify candidate matches
if domain_index:
prompt = _build_fix_prompt(first_claim, review_text, ["near_duplicate"], None, domain_index)
result = await openrouter_call(FIX_MODEL, prompt, timeout_sec=60, max_tokens=1024)
result, _usage = await openrouter_call(FIX_MODEL, prompt, timeout_sec=60, max_tokens=1024)
candidates_text = result or "Could not identify candidates."
else:
candidates_text = "No domain index available."