leo: implement retry budget — stop infinite eval loops

Schema migration v3: adds eval_attempts (INTEGER) and eval_issues (TEXT/JSON)
columns to prs table.

Retry budget logic (Ganymede-approved design):
- Increment eval_attempts on each evaluate_pr() call
- Hard cap: eval_attempts >= 3 → terminal (close PR, tag source needs_reextraction)
- Attempt 1: normal — back to open, wait for fix
- Attempt 2: classify issues as mechanical/substantive
  - Mechanical only (schema, wiki links, dedup): keep open for one more try
  - Substantive (factual, confidence, scope, title): close PR, requeue source
- Issue tags parsed from reviewer comments, stored in eval_issues column
- SHA-based reset: new commits on PR branch → eval_attempts=0, verdicts reset
- Post-migration stagger: LIMIT 5 for first batch to avoid OpenRouter spike
- Cost recording updated: domain review → OpenRouter, Leo → tier-dependent

Stops the 32-PR infinite loop burning ~$0.03/cycle with no terminal state.

Pentagon-Agent: Leo <294C3CA1-0205-4668-82FA-B984D54F48AD>
This commit is contained in:
m3taversal 2026-03-13 17:14:12 +00:00
parent c0a6adf9ed
commit e7c902bac8
4 changed files with 185 additions and 18 deletions

View file

@ -91,6 +91,12 @@ BACKPRESSURE_THROTTLE_WORKERS = 2 # workers when throttled
TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits
SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes
SUBSTANTIVE_RETRY_DEEP = 3 # same, for DEEP-tier reviews — presumably; confirm against eval tiers
MAX_EVAL_ATTEMPTS = 3 # Hard cap on eval cycles per PR before terminal
# Issue tags that can be fixed mechanically (Python fixer or Haiku)
MECHANICAL_ISSUE_TAGS = {"frontmatter_schema", "broken_wiki_links", "near_duplicate"}
# Issue tags that require re-extraction (substantive quality problems)
SUBSTANTIVE_ISSUE_TAGS = {"factual_discrepancy", "confidence_miscalibration", "scope_error", "title_overclaims"}
# --- Circuit breakers ---
BREAKER_THRESHOLD = 5

View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 2
SCHEMA_VERSION = 3
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -165,6 +165,18 @@ def migrate(conn: sqlite3.Connection):
pass # Column already exists (idempotent)
logger.info("Migration v2: added priority, origin, last_error to prs")
if current < 3:
# Phase 3: retry budget — track eval attempts and issue tags per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0",
"ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v3: added eval_attempts, eval_issues to prs")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

View file

@ -124,11 +124,115 @@ async def _post_formal_approvals(pr_number: int, pr_author: str):
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
# ─── Retry budget helpers ─────────────────────────────────────────────────
async def _terminate_pr(conn, pr_number: int, reason: str):
    """Terminal state: close the PR on Forgejo and flag its source for re-extraction.

    Posts an explanatory comment, closes the PR via the Forgejo API, marks the
    local prs row 'closed' (storing *reason* in last_error), and sets the
    originating sources row to status 'needs_reextraction' so the pipeline
    re-queues the material instead of re-evaluating this PR.

    Args:
        conn: open sqlite3 connection (caller owns commit/rollback).
        pr_number: Forgejo PR number to terminate.
        reason: human-readable reason, recorded in prs.last_error and the audit log.
    """
    # Close PR on Forgejo with explanation
    await forgejo_api(
        "POST",
        repo_path(f"issues/{pr_number}/comments"),
        {"body": f"**Closed by eval pipeline** — {reason}.\n\n"
            f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
            f"Source material will be re-queued for extraction with review feedback attached.\n\n"
            f"See eval_issues for specific problems."},
    )
    await forgejo_api(
        "PATCH",
        repo_path(f"pulls/{pr_number}"),
        {"state": "closed"},
    )
    # Update PR status
    conn.execute(
        "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
        (reason, pr_number),
    )
    # Tag source for re-extraction with feedback
    conn.execute(
        """UPDATE sources SET status = 'needs_reextraction',
        updated_at = datetime('now')
        WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
        (pr_number,),
    )
    db.audit(conn, "evaluate", "pr_terminated", json.dumps({
        "pr": pr_number, "reason": reason,
    }))
    logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
def _classify_issues(issues: list[str]) -> str:
    """Classify issue tags as 'mechanical', 'substantive', or 'mixed'.

    Returns 'unknown' for an empty list, or when no tag belongs to either
    config.MECHANICAL_ISSUE_TAGS or config.SUBSTANTIVE_ISSUE_TAGS.
    """
    if not issues:
        return "unknown"
    tags = set(issues)
    has_mechanical = bool(tags & config.MECHANICAL_ISSUE_TAGS)
    has_substantive = bool(tags & config.SUBSTANTIVE_ISSUE_TAGS)
    if has_mechanical and has_substantive:
        return "mixed"
    if has_substantive:
        return "substantive"
    if has_mechanical:
        return "mechanical"
    # Tags present, but none recognized by either category.
    return "unknown"
async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
    """Disposition logic for rejected PRs on attempt 2+.

    Attempt 1: normal back to open, wait for fix.
    Attempt 2: check issue classification.
      - Mechanical only: keep open for one more attempt (auto-fix future).
      - Substantive or mixed: close PR, requeue source.
    Attempt 3+: terminal.
    """
    # Attempt 1: normal retry — nothing to decide yet.
    if eval_attempts < 2:
        return

    # Hard cap reached: terminal regardless of what kind of issues remain.
    if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
        await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
        return

    classification = _classify_issues(all_issues)
    if classification == "mechanical":
        # Mechanical issues only — keep open for one more attempt.
        # Future: auto-fix module will push fixes here.
        logger.info(
            "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
            pr_number, eval_attempts, all_issues,
        )
        db.audit(conn, "evaluate", "mechanical_retry", json.dumps({
            "pr": pr_number, "attempt": eval_attempts, "issues": all_issues,
        }))
        return

    # Substantive, mixed, or unknown — close and requeue
    logger.info(
        "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
        pr_number, eval_attempts, classification, all_issues,
    )
    await _terminate_pr(conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}")
# ─── Single PR evaluation ─────────────────────────────────────────────────
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
"""Evaluate a single PR. Returns result dict."""
# Check eval attempt budget before claiming
row = conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)
).fetchone()
eval_attempts = (row["eval_attempts"] or 0) if row else 0
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal — hard cap reached. Close PR, tag source.
logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS)
await _terminate_pr(conn, pr_number, "eval budget exhausted")
return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"}
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
@ -138,6 +242,13 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
# Increment eval_attempts
conn.execute(
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?",
(pr_number,),
)
eval_attempts += 1
# Fetch diff
diff = await get_pr_diff(pr_number)
if not diff:
@ -228,17 +339,24 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# If domain review rejects, skip Leo review (save Opus)
if domain_verdict == "request_changes":
logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
domain_issues = _parse_issues(domain_review) if domain_review else []
conn.execute(
"""UPDATE prs SET status = 'open', leo_verdict = 'skipped',
last_error = 'domain review requested changes'
last_error = 'domain review requested changes',
eval_issues = ?
WHERE number = ?""",
(pr_number,),
(json.dumps(domain_issues), pr_number),
)
db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent}))
return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"}
db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues}))
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped", "eval_attempts": eval_attempts}
# Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
leo_verdict = "skipped"
leo_review = None # Initialize — used later for issue extraction
if tier != "LIGHT":
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
leo_review = await run_leo_review(review_diff, files, tier)
@ -290,14 +408,19 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict)
else:
# Collect all issue tags from both reviews
all_issues = []
if domain_verdict == "request_changes" and domain_review is not None:
all_issues.extend(_parse_issues(domain_review))
if leo_verdict == "request_changes" and leo_review is not None:
all_issues.extend(_parse_issues(leo_review))
conn.execute(
"UPDATE prs SET status = 'open' WHERE number = ?",
(pr_number,),
"UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
(json.dumps(all_issues), pr_number),
)
# Store feedback for re-extraction path
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
if domain_verdict == "request_changes" and domain_review is not None:
feedback["domain_issues"] = _parse_issues(domain_review)
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues}
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_number),
@ -306,16 +429,22 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
conn,
"evaluate",
"changes_requested",
json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict}),
json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}),
)
logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)", pr_number, leo_verdict, domain_verdict)
logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", pr_number, leo_verdict, domain_verdict, all_issues)
# Record cost (domain review)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
# Record cost (domain review on OpenRouter, Leo depends on tier)
from . import costs
costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max")
costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
if tier != "LIGHT":
costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
if tier == "DEEP":
costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
else:
costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
return {
"pr": pr_number,
@ -370,11 +499,21 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
else:
verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
# Stagger first pass after migration: if there are previously-rejected PRs
# with eval_attempts=0 (freshly migrated), limit batch to avoid OpenRouter spike.
migrated_count = conn.execute(
"""SELECT COUNT(*) as c FROM prs
WHERE status = 'open' AND eval_attempts = 0
AND (domain_verdict NOT IN ('pending') OR leo_verdict NOT IN ('pending'))"""
).fetchone()["c"]
stagger_limit = 5 if migrated_count > 5 else None
rows = conn.execute(
f"""SELECT p.number, p.tier FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass = 1
AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS}
{verdict_filter}
AND (p.last_attempt IS NULL
OR p.last_attempt < datetime('now', '-10 minutes'))
@ -388,9 +527,12 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
END,
p.created_at ASC
LIMIT ?""",
(max_workers or config.MAX_EVAL_WORKERS,),
(stagger_limit or max_workers or config.MAX_EVAL_WORKERS,),
).fetchall()
if stagger_limit and rows:
logger.info("Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count)
if not rows:
return 0, 0

View file

@ -474,9 +474,16 @@ async def validate_pr(conn, pr_number: int) -> dict:
# Post comment
await _post_validation_comment(pr_number, results, head_sha)
# Update PR record
# Update PR record — reset eval state on new commits (unconditional SHA reset).
# New commit = new code to evaluate. Reset eval_attempts, verdicts, and issues
# so the PR gets a fresh evaluation cycle. Cost: 1 extra eval ($0.03) if the
# commit was a no-op. Cheaper than parsing commit messages. (Ganymede Q2)
conn.execute(
"UPDATE prs SET tier0_pass = ? WHERE number = ?",
"""UPDATE prs SET tier0_pass = ?,
eval_attempts = 0, eval_issues = '[]',
domain_verdict = 'pending', leo_verdict = 'pending',
last_error = NULL
WHERE number = ?""",
(1 if all_pass else 0, pr_number),
)
db.audit(