diff --git a/lib/fixer.py b/lib/fixer.py index c08f186..822afae 100644 --- a/lib/fixer.py +++ b/lib/fixer.py @@ -22,6 +22,7 @@ import logging from pathlib import Path from . import config, db +from .pr_state import close_pr, reset_for_reeval, start_fixing from .validate import WIKI_LINK_RE, load_existing_claims logger = logging.getLogger("pipeline.fixer") @@ -62,19 +63,9 @@ async def _fix_wiki_links_in_pr(conn, pr_number: int) -> dict: between new claims in the same PR are preserved. """ # Atomic claim — prevent concurrent fixers and evaluators - cursor = conn.execute( - "UPDATE prs SET status = 'fixing', last_attempt = datetime('now') WHERE number = ? AND status = 'open'", - (pr_number,), - ) - if cursor.rowcount == 0: + if not start_fixing(conn, pr_number): return {"pr": pr_number, "skipped": True, "reason": "not_open"} - # Increment fix_attempts - conn.execute( - "UPDATE prs SET fix_attempts = COALESCE(fix_attempts, 0) + 1 WHERE number = ?", - (pr_number,), - ) - # Get PR branch from DB first, fall back to Forgejo API row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone() branch = row["branch"] if row and row["branch"] else None @@ -177,18 +168,7 @@ async def _fix_wiki_links_in_pr(conn, pr_number: int) -> dict: # Reset eval state BEFORE push — if daemon crashes between push and # reset, the PR would be permanently stuck at max eval_attempts. # Reset-first: worst case is one wasted eval cycle on old content. - conn.execute( - """UPDATE prs SET - status = 'open', - eval_attempts = 0, - eval_issues = '[]', - tier0_pass = NULL, - domain_verdict = 'pending', - leo_verdict = 'pending', - last_error = NULL - WHERE number = ?""", - (pr_number,), - ) + reset_for_reeval(conn, pr_number) rc, out = await _git("push", "origin", branch, cwd=worktree_path, timeout=30) if rc != 0: @@ -242,15 +222,11 @@ async def fix_cycle(conn, max_workers=None) -> tuple[int, int]: try: await _gc_forgejo("POST", _gc_repo_path(f"issues/{pr_num}/comments"), {"body": "Auto-closed: fix budget exhausted. Source will be re-extracted."}) - await _gc_forgejo("PATCH", _gc_repo_path(f"pulls/{pr_num}"), {"state": "closed"}) + await close_pr(conn, pr_num, last_error='fix budget exhausted — auto-closed') if branch: await _gc_forgejo("DELETE", _gc_repo_path(f"branches/{branch}")) except Exception as e: logger.warning("GC: failed to close PR #%d on Forgejo: %s", pr_num, e) - conn.execute( - "UPDATE prs SET status = 'closed', last_error = 'fix budget exhausted — auto-closed' WHERE number = ?", - (pr_num,), - ) logger.info("GC: closed %d exhausted PRs (DB + Forgejo + branch cleanup)", len(gc_rows)) batch_limit = min(max_workers or config.MAX_FIX_PER_CYCLE, config.MAX_FIX_PER_CYCLE) diff --git a/lib/merge.py b/lib/merge.py index 0951d87..6c92ff1 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -1735,7 +1735,7 @@ async def _handle_permanent_conflicts(conn) -> int: await close_pr(conn, pr_number, last_error='conflict_permanent: closed + filed in archive', - close_on_forgejo=False) # Already closed at line 1718 + close_on_forgejo=False) # Already closed above (Forgejo PATCH at top of loop) handled += 1 logger.info("Permanent conflict handled: PR #%d closed, source filed", pr_number) diff --git a/lib/pr_state.py b/lib/pr_state.py index c5b50fe..10dcf71 100644 --- a/lib/pr_state.py +++ b/lib/pr_state.py @@ -185,6 +185,42 @@ def reopen_pr( conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params) +def start_fixing(conn, pr_number: int) -> bool: + """Atomically claim PR for fixing (status open -> fixing). + + Also increments fix_attempts and sets last_attempt in one statement. + Returns True if claimed, False if already claimed. + """ + cursor = conn.execute( + "UPDATE prs SET status = 'fixing', " + "fix_attempts = COALESCE(fix_attempts, 0) + 1, " + "last_attempt = datetime('now') " + "WHERE number = ? AND status = 'open'", + (pr_number,), + ) + return cursor.rowcount > 0 + + +def reset_for_reeval(conn, pr_number: int): + """Reset a PR for re-evaluation after a fix. + + Clears all eval state so the PR goes through the full eval cycle again. + Used by both mechanical fixer and substantive fixer after successful fixes. + """ + conn.execute( + """UPDATE prs SET + status = 'open', + eval_attempts = 0, + eval_issues = '[]', + tier0_pass = NULL, + domain_verdict = 'pending', + leo_verdict = 'pending', + last_error = NULL + WHERE number = ?""", + (pr_number,), + ) + + def start_review(conn, pr_number: int) -> bool: """Atomically claim PR for review (status open -> reviewing). diff --git a/lib/substantive_fixer.py b/lib/substantive_fixer.py index 6b7e8ca..9fa33c3 100644 --- a/lib/substantive_fixer.py +++ b/lib/substantive_fixer.py @@ -24,6 +24,7 @@ from pathlib import Path from . import config, db from .forgejo import api as forgejo_api, get_agent_token, get_pr_diff, repo_path +from .pr_state import close_pr, reset_for_reeval, start_fixing from .llm import openrouter_call logger = logging.getLogger("pipeline.substantive_fixer") @@ -225,20 +226,10 @@ def _classify_substantive(issues: list[str]) -> str: async def _fix_pr(conn, pr_number: int) -> dict: """Attempt a substantive fix on a single PR. Returns result dict.""" - # Atomic claim - cursor = conn.execute( - "UPDATE prs SET status = 'fixing', last_attempt = datetime('now') WHERE number = ? AND status = 'open'", - (pr_number,), - ) - if cursor.rowcount == 0: + # Atomic claim — prevent concurrent fixers and evaluators + if not start_fixing(conn, pr_number): return {"pr": pr_number, "skipped": True, "reason": "not_open"} - # Increment fix attempts - conn.execute( - "UPDATE prs SET fix_attempts = COALESCE(fix_attempts, 0) + 1 WHERE number = ?", - (pr_number,), - ) - row = conn.execute( "SELECT branch, source_path, domain, eval_issues, fix_attempts FROM prs WHERE number = ?", (pr_number,), @@ -271,10 +262,7 @@ async def _fix_pr(conn, pr_number: int) -> dict: if classification == "droppable": logger.info("PR #%d: droppable (%s) — closing", pr_number, issues) - conn.execute( - "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?", - (f"droppable: {issues}", pr_number), - ) + await close_pr(conn, pr_number, last_error=f"droppable: {issues}") return {"pr": pr_number, "action": "closed_droppable", "issues": issues} # Refresh main worktree for source read (Ganymede: ensure freshness) @@ -302,11 +290,8 @@ async def _fix_pr(conn, pr_number: int) -> dict: conn, pr_number, claim_files, domain, ) if result.get("converted"): - conn.execute( - "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?", - (f"auto-enriched: {result['target_claim']} (sim={result['similarity']:.2f})", pr_number), - ) - await forgejo_api("PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"}) + await close_pr(conn, pr_number, + last_error=f"auto-enriched: {result['target_claim']} (sim={result['similarity']:.2f})") await forgejo_api("POST", repo_path(f"issues/{pr_number}/comments"), { "body": ( f"**Auto-converted:** Evidence from this PR enriched " @@ -394,18 +379,7 @@ async def _fix_pr(conn, pr_number: int) -> dict: return {"pr": pr_number, "skipped": True, "reason": "nothing_to_commit"} # Reset eval state BEFORE push (same pattern as fixer.py) - conn.execute( - """UPDATE prs SET - status = 'open', - eval_attempts = 0, - eval_issues = '[]', - tier0_pass = NULL, - domain_verdict = 'pending', - leo_verdict = 'pending', - last_error = NULL - WHERE number = ?""", - (pr_number,), - ) + reset_for_reeval(conn, pr_number) rc, out = await _git("push", "origin", branch, cwd=worktree_path, timeout=30) if rc != 0: @@ -499,13 +473,7 @@ async def _auto_convert_near_duplicate( async def _close_and_reextract(conn, pr_number: int, issues: list[str]): """Close PR and mark source for re-extraction with feedback.""" - await forgejo_api( - "PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"}, - ) - conn.execute( - "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?", - (f"unfixable: {', '.join(issues)}", pr_number), - ) + await close_pr(conn, pr_number, last_error=f"unfixable: {', '.join(issues)}") conn.execute( """UPDATE sources SET status = 'needs_reextraction', feedback = ?, updated_at = datetime('now') diff --git a/tests/test_pr_state.py b/tests/test_pr_state.py index c377020..6c0cb98 100644 --- a/tests/test_pr_state.py +++ b/tests/test_pr_state.py @@ -21,6 +21,8 @@ from lib.pr_state import ( mark_conflict_permanent, mark_merged, reopen_pr, + reset_for_reeval, + start_fixing, start_review, ) @@ -51,6 +53,8 @@ def _make_db(): last_error TEXT, merged_at TEXT, last_attempt TEXT, + tier0_pass INTEGER, + fix_attempts INTEGER DEFAULT 0, cost_usd REAL DEFAULT 0, created_at TEXT DEFAULT (datetime('now')) ) @@ -334,3 +338,88 @@ class TestStartReview: assert start_review(conn, 100) is True assert start_review(conn, 100) is False + + +# --------------------------------------------------------------------------- +# start_fixing +# --------------------------------------------------------------------------- + +class TestStartFixing: + def test_claims_open_pr(self): + conn = _make_db() + _insert_pr(conn, 200) + + assert start_fixing(conn, 200) is True + + row = _get_pr(conn, 200) + assert row["status"] == "fixing" + assert row["fix_attempts"] == 1 + + def test_increments_fix_attempts(self): + conn = _make_db() + _insert_pr(conn, 200, fix_attempts=3) + + assert start_fixing(conn, 200) is True + + row = _get_pr(conn, 200) + assert row["fix_attempts"] == 4 + + def test_sets_last_attempt(self): + conn = _make_db() + _insert_pr(conn, 200) + + start_fixing(conn, 200) + + row = _get_pr(conn, 200) + assert row["last_attempt"] is not None + + def test_rejects_non_open_pr(self): + conn = _make_db() + _insert_pr(conn, 200, status="reviewing") + + assert start_fixing(conn, 200) is False + + def test_double_claim_fails(self): + conn = _make_db() + _insert_pr(conn, 200) + + assert start_fixing(conn, 200) is True + assert start_fixing(conn, 200) is False + + +# --------------------------------------------------------------------------- +# reset_for_reeval +# --------------------------------------------------------------------------- + +class TestResetForReeval: + def test_resets_all_eval_state(self): + conn = _make_db() + _insert_pr(conn, 300, status="fixing", + eval_attempts=3, leo_verdict="request_changes", + domain_verdict="approve") + conn.execute( + "UPDATE prs SET eval_issues = ?, tier0_pass = 1, last_error = 'some error' WHERE number = 300", + ('["broken_wiki_links"]',), + ) + + reset_for_reeval(conn, 300) + + row = _get_pr(conn, 300) + assert row["status"] == "open" + assert row["eval_attempts"] == 0 + assert row["eval_issues"] == "[]" + assert row["tier0_pass"] is None + assert row["domain_verdict"] == "pending" + assert row["leo_verdict"] == "pending" + assert row["last_error"] is None + + def test_preserves_non_eval_fields(self): + conn = _make_db() + _insert_pr(conn, 300, status="fixing", domain="internet-finance", + fix_attempts=2) + + reset_for_reeval(conn, 300) + + row = _get_pr(conn, 300) + assert row["domain"] == "internet-finance" + assert row["fix_attempts"] == 2