diff --git a/lib/config.py b/lib/config.py index 0e8e2d2..96c66d6 100644 --- a/lib/config.py +++ b/lib/config.py @@ -91,6 +91,12 @@ BACKPRESSURE_THROTTLE_WORKERS = 2 # workers when throttled TRANSIENT_RETRY_MAX = 5 # API timeouts, rate limits SUBSTANTIVE_RETRY_STANDARD = 2 # reviewer request_changes SUBSTANTIVE_RETRY_DEEP = 3 +MAX_EVAL_ATTEMPTS = 3 # Hard cap on eval cycles per PR before terminal + +# Issue tags that can be fixed mechanically (Python fixer or Haiku) +MECHANICAL_ISSUE_TAGS = {"frontmatter_schema", "broken_wiki_links", "near_duplicate"} +# Issue tags that require re-extraction (substantive quality problems) +SUBSTANTIVE_ISSUE_TAGS = {"factual_discrepancy", "confidence_miscalibration", "scope_error", "title_overclaims"} # --- Circuit breakers --- BREAKER_THRESHOLD = 5 diff --git a/lib/db.py b/lib/db.py index 9828a4c..e36344f 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 2 +SCHEMA_VERSION = 3 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -165,6 +165,18 @@ def migrate(conn: sqlite3.Connection): pass # Column already exists (idempotent) logger.info("Migration v2: added priority, origin, last_error to prs") + if current < 3: + # Phase 3: retry budget — track eval attempts and issue tags per PR + for stmt in [ + "ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0", + "ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'", + ]: + try: + conn.execute(stmt) + except sqlite3.OperationalError: + pass # Column already exists (idempotent) + logger.info("Migration v3: added eval_attempts, eval_issues to prs") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", diff --git a/lib/evaluate.py b/lib/evaluate.py index 2957efd..86a1f4c 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -124,11 +124,115 @@ async def _post_formal_approvals(pr_number: int, pr_author: str): logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals) +# ─── Retry budget helpers ───────────────────────────────────────────────── + + +async def _terminate_pr(conn, pr_number: int, reason: str): + """Terminal state: close PR on Forgejo, mark source needs_human.""" + # Close PR on Forgejo with explanation + await forgejo_api( + "POST", + repo_path(f"issues/{pr_number}/comments"), + {"body": f"**Closed by eval pipeline** — {reason}.\n\n" + f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " + f"Source material will be re-queued for extraction with review feedback attached.\n\n" + f"See eval_issues for specific problems."}, + ) + await forgejo_api( + "PATCH", + repo_path(f"pulls/{pr_number}"), + {"state": "closed"}, + ) + + # Update PR status + conn.execute( + "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?", + (reason, pr_number), + ) + + # Tag source for re-extraction with feedback + conn.execute( + """UPDATE sources SET status = 'needs_reextraction', + updated_at = datetime('now') + WHERE path = (SELECT source_path FROM prs WHERE number = ?)""", + (pr_number,), + ) + + db.audit(conn, "evaluate", "pr_terminated", json.dumps({ + "pr": pr_number, "reason": reason, + })) + logger.info("PR #%d: TERMINATED — %s", pr_number, reason) + + +def _classify_issues(issues: list[str]) -> str: + """Classify issue tags as 'mechanical', 'substantive', or 'mixed'.""" + if not issues: + return "unknown" + mechanical = set(issues) & config.MECHANICAL_ISSUE_TAGS + substantive = set(issues) & config.SUBSTANTIVE_ISSUE_TAGS + if substantive and not mechanical: + return "substantive" + if mechanical and not substantive: + return "mechanical" + if mechanical and substantive: + return "mixed" + return "unknown" # tags not in either set + + +async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]): + """Disposition logic for rejected PRs on attempt 2+. + + Attempt 1: normal — back to open, wait for fix. + Attempt 2: check issue classification. + - Mechanical only: keep open for one more attempt (auto-fix future). + - Substantive or mixed: close PR, requeue source. + Attempt 3+: terminal. + """ + if eval_attempts < 2: + return # Attempt 1: normal retry + + classification = _classify_issues(all_issues) + + if eval_attempts >= config.MAX_EVAL_ATTEMPTS: + # Terminal + await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts") + return + + if classification == "mechanical": + # Mechanical issues only — keep open for one more attempt. + # Future: auto-fix module will push fixes here. + logger.info( + "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt", + pr_number, eval_attempts, all_issues, + ) + db.audit(conn, "evaluate", "mechanical_retry", json.dumps({ + "pr": pr_number, "attempt": eval_attempts, "issues": all_issues, + })) + else: + # Substantive, mixed, or unknown — close and requeue + logger.info( + "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source", + pr_number, eval_attempts, classification, all_issues, + ) + await _terminate_pr(conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}") + + # ─── Single PR evaluation ───────────────────────────────────────────────── async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: """Evaluate a single PR. Returns result dict.""" + # Check eval attempt budget before claiming + row = conn.execute( + "SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,) + ).fetchone() + eval_attempts = (row["eval_attempts"] or 0) if row else 0 + if eval_attempts >= config.MAX_EVAL_ATTEMPTS: + # Terminal — hard cap reached. Close PR, tag source. + logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS) + await _terminate_pr(conn, pr_number, "eval budget exhausted") + return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"} + # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11) cursor = conn.execute( "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'", @@ -138,6 +242,13 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: logger.debug("PR #%d already claimed by another worker, skipping", pr_number) return {"pr": pr_number, "skipped": True, "reason": "already_claimed"} + # Increment eval_attempts + conn.execute( + "UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?", + (pr_number,), + ) + eval_attempts += 1 + # Fetch diff diff = await get_pr_diff(pr_number) if not diff: @@ -228,17 +339,24 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # If domain review rejects, skip Leo review (save Opus) if domain_verdict == "request_changes": logger.info("PR #%d: domain rejected, skipping Leo review", pr_number) + domain_issues = _parse_issues(domain_review) if domain_review else [] conn.execute( """UPDATE prs SET status = 'open', leo_verdict = 'skipped', - last_error = 'domain review requested changes' + last_error = 'domain review requested changes', + eval_issues = ? WHERE number = ?""", - (pr_number,), + (json.dumps(domain_issues), pr_number), ) - db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent})) - return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"} + db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues})) + + # Disposition: check if this PR should be terminated or kept open + await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) + + return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped", "eval_attempts": eval_attempts} # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT) leo_verdict = "skipped" + leo_review = None # Initialize — used later for issue extraction if tier != "LIGHT": logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier) leo_review = await run_leo_review(review_diff, files, tier) @@ -290,14 +408,19 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict) else: + # Collect all issue tags from both reviews + all_issues = [] + if domain_verdict == "request_changes" and domain_review is not None: + all_issues.extend(_parse_issues(domain_review)) + if leo_verdict == "request_changes" and leo_review is not None: + all_issues.extend(_parse_issues(leo_review)) + conn.execute( - "UPDATE prs SET status = 'open' WHERE number = ?", - (pr_number,), + "UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?", + (json.dumps(all_issues), pr_number), ) # Store feedback for re-extraction path - feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier} - if domain_verdict == "request_changes" and domain_review is not None: - feedback["domain_issues"] = _parse_issues(domain_review) + feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues} conn.execute( "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)", (json.dumps(feedback), pr_number), @@ -306,16 +429,22 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: conn, "evaluate", "changes_requested", - json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict}), + json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}), ) - logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)", pr_number, leo_verdict, domain_verdict) + logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", pr_number, leo_verdict, domain_verdict, all_issues) - # Record cost (domain review) + # Disposition: check if this PR should be terminated or kept open + await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues) + + # Record cost (domain review on OpenRouter, Leo depends on tier) from . import costs - costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max") + costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter") if tier != "LIGHT": - costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max") + if tier == "DEEP": + costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max") + else: + costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter") return { "pr": pr_number, @@ -370,11 +499,21 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: else: verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')" + # Stagger first pass after migration: if there are previously-rejected PRs + # with eval_attempts=0 (freshly migrated), limit batch to avoid OpenRouter spike. + migrated_count = conn.execute( + """SELECT COUNT(*) as c FROM prs + WHERE status = 'open' AND eval_attempts = 0 + AND (domain_verdict NOT IN ('pending') OR leo_verdict NOT IN ('pending'))""" + ).fetchone()["c"] + stagger_limit = 5 if migrated_count > 5 else None + rows = conn.execute( f"""SELECT p.number, p.tier FROM prs p LEFT JOIN sources s ON p.source_path = s.path WHERE p.status = 'open' AND p.tier0_pass = 1 + AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS} {verdict_filter} AND (p.last_attempt IS NULL OR p.last_attempt < datetime('now', '-10 minutes')) @@ -388,9 +527,12 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: END, p.created_at ASC LIMIT ?""", - (max_workers or config.MAX_EVAL_WORKERS,), + (stagger_limit or max_workers or config.MAX_EVAL_WORKERS,), ).fetchall() + if stagger_limit and rows: + logger.info("Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count) + if not rows: return 0, 0 diff --git a/lib/validate.py b/lib/validate.py index afccaaf..7cd8d8d 100644 --- a/lib/validate.py +++ b/lib/validate.py @@ -474,9 +474,16 @@ async def validate_pr(conn, pr_number: int) -> dict: # Post comment await _post_validation_comment(pr_number, results, head_sha) - # Update PR record + # Update PR record — reset eval state on new commits (unconditional SHA reset). + # New commit = new code to evaluate. Reset eval_attempts, verdicts, and issues + # so the PR gets a fresh evaluation cycle. Cost: 1 extra eval ($0.03) if the + # commit was a no-op. Cheaper than parsing commit messages. (Ganymede Q2) conn.execute( - "UPDATE prs SET tier0_pass = ? WHERE number = ?", + """UPDATE prs SET tier0_pass = ?, + eval_attempts = 0, eval_issues = '[]', + domain_verdict = 'pending', leo_verdict = 'pending', + last_error = NULL + WHERE number = ?""", (1 if all_pass else 0, pr_number), ) db.audit(