teleo-infrastructure/lib/pr_state.py
m3taversal 376b77999f
Some checks are pending
CI / lint-and-test (push) Waiting to run
refactor: Phase 3 — fix close_pr ghost bug, wire stale_pr, extract eval_parse
Critical bug fix: close_pr now checks forgejo_api return value and
skips DB update on Forgejo failure, preventing ghost PRs (DB closed,
Forgejo open). Returns bool so callers can handle failures.

_terminate_pr checks return value — skips source requeue on failure.
stale_pr.py migrated from raw Forgejo+DB to close_pr (last raw close
transition eliminated).

eval_parse.py: 15 pure parsing functions extracted from evaluate.py
(~370 lines removed). Zero I/O, zero async, independently testable.
evaluate.py drops from ~1510 to ~1140 lines.

Tests: 295 passed (42 new eval_parse + 2 new close_pr), zero regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 12:40:23 +01:00

241 lines
7.2 KiB
Python

"""PR state transitions — single source of truth for all status changes.
Every UPDATE prs SET status = ... MUST go through this module.
Invariants enforced:
- close: always syncs Forgejo (opt-out for reconciliation only)
- approve: requires non-empty domain (ValueError)
- merged: always sets merged_at, clears last_error
- conflict: always increments merge_failures, sets merge_cycled
Why this exists: 36 hand-crafted status transitions across evaluate.py
and merge.py produced 3 incidents (domain NULL, Forgejo ghost PRs,
merge_cycled missing). Centralizing eliminates the entire class of
"forgot to update X in this one code path" bugs.
"""
import logging
from .forgejo import api as forgejo_api, repo_path
logger = logging.getLogger("pipeline.pr_state")
async def close_pr(
    conn,
    pr_number: int,
    *,
    last_error: str = None,
    merge_cycled: bool = False,
    inc_merge_failures: bool = False,
    close_on_forgejo: bool = True,
) -> bool:
    """Transition a PR to 'closed', syncing Forgejo first unless told not to.

    The Forgejo PATCH runs before the database write. When that call yields
    None the function logs an error and returns without touching the row, so
    the database never claims "closed" for a PR that is still open on Forgejo.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
        last_error: Stored in ``last_error`` when not None; otherwise the
            column is left as it is.
        merge_cycled: When truthy, sets ``merge_cycled = 1``.
        inc_merge_failures: When truthy, increments ``merge_failures``
            (NULL counts as 0).
        close_on_forgejo: Pass False only when the PR is already closed on
            Forgejo (reconciliation, cleanup after a manual close); the API
            call is then skipped entirely.

    Returns:
        True once the UPDATE has been executed, False when the Forgejo call
        returned None and the database update was skipped.
    """
    if close_on_forgejo:
        endpoint = repo_path(f"pulls/{pr_number}")
        response = await forgejo_api("PATCH", endpoint, {"state": "closed"})
        if response is None:
            logger.error(
                "close_pr: Forgejo API failed for PR #%d, skipping DB update",
                pr_number,
            )
            return False

    # Optional SET fragments, listed in the order they appear in the statement.
    optional = (
        (last_error is not None, "last_error = ?"),
        (merge_cycled, "merge_cycled = 1"),
        (inc_merge_failures, "merge_failures = COALESCE(merge_failures, 0) + 1"),
    )
    assignments = ["status = 'closed'"]
    assignments += [fragment for wanted, fragment in optional if wanted]

    # last_error is the only optional fragment carrying a placeholder.
    bound = [] if last_error is None else [last_error]
    bound.append(pr_number)

    set_clause = ", ".join(assignments)
    conn.execute(f"UPDATE prs SET {set_clause} WHERE number = ?", bound)
    return True
def approve_pr(
    conn,
    pr_number: int,
    *,
    domain: str,
    auto_merge: int = 0,
    leo_verdict: str = None,
    domain_verdict: str = None,
):
    """Approve a PR, guaranteeing the row ends up with a non-empty domain.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
        domain: Fallback domain. It is written only when the stored domain
            is NULL or the empty string; an existing non-empty domain is
            preserved.
        auto_merge: Always written to ``auto_merge``.
        leo_verdict: Written to ``leo_verdict`` when not None.
        domain_verdict: Written to ``domain_verdict`` when not None.

    Raises:
        ValueError: If ``domain`` is empty or None.
    """
    if not domain:
        raise ValueError(f"Cannot approve PR #{pr_number} without domain")

    # NULLIF folds a stored '' into NULL so COALESCE falls through to the
    # supplied domain. A plain COALESCE(domain, ?) would keep the empty string
    # and leave an approved PR without a usable domain.
    parts = [
        "status = 'approved'",
        "domain = COALESCE(NULLIF(domain, ''), ?)",
        "auto_merge = ?",
    ]
    params = [domain, auto_merge]
    if leo_verdict is not None:
        parts.append("leo_verdict = ?")
        params.append(leo_verdict)
    if domain_verdict is not None:
        parts.append("domain_verdict = ?")
        params.append(domain_verdict)
    params.append(pr_number)
    conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params)
def mark_merged(conn, pr_number: int):
    """Record a PR as merged.

    One UPDATE sets ``status`` to 'merged', stamps ``merged_at`` with the
    database's ``datetime('now')`` and resets ``last_error`` to NULL.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
    """
    statement = (
        "UPDATE prs SET status = 'merged', merged_at = datetime('now'), "
        "last_error = NULL WHERE number = ?"
    )
    bindings = (pr_number,)
    conn.execute(statement, bindings)
def mark_conflict(conn, pr_number: int, *, last_error: str = None):
    """Record a merge conflict on a PR.

    One UPDATE sets ``status`` to 'conflict', sets ``merge_cycled`` to 1,
    increments ``merge_failures`` (NULL counts as 0) and writes
    ``last_error``.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
        last_error: Always written; leaving it as None stores NULL and so
            overwrites any earlier error text.
    """
    statement = (
        "UPDATE prs SET status = 'conflict', merge_cycled = 1, "
        "merge_failures = COALESCE(merge_failures, 0) + 1, "
        "last_error = ? WHERE number = ?"
    )
    bindings = (last_error, pr_number)
    conn.execute(statement, bindings)
def mark_conflict_permanent(
    conn,
    pr_number: int,
    *,
    last_error: str = None,
    conflict_rebase_attempts: int = None,
):
    """Move a PR to 'conflict_permanent' (the terminal, no-more-retries state).

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
        last_error: Written to ``last_error`` when not None; otherwise the
            column keeps its current value.
        conflict_rebase_attempts: Written to ``conflict_rebase_attempts``
            when not None; otherwise the column keeps its current value.
    """
    # Column -> value for the optional fields; insertion order fixes both the
    # SET-clause order and the placeholder binding order.
    candidates = {
        "last_error": last_error,
        "conflict_rebase_attempts": conflict_rebase_attempts,
    }
    supplied = {column: value for column, value in candidates.items() if value is not None}

    assignments = ["status = 'conflict_permanent'"]
    assignments.extend(f"{column} = ?" for column in supplied)
    set_clause = ", ".join(assignments)

    conn.execute(
        f"UPDATE prs SET {set_clause} WHERE number = ?",
        [*supplied.values(), pr_number],
    )
def reopen_pr(
    conn,
    pr_number: int,
    *,
    leo_verdict: str = None,
    domain_verdict: str = None,
    last_error: str = None,
    eval_issues: str = None,
    dec_eval_attempts: bool = False,
    reset_for_reeval: bool = False,
    conflict_rebase_attempts: int = None,
):
    """Set a PR back to 'open'.

    Covers all reopen scenarios:
    - Transient failure (API error): no extra args
    - Rejection: leo_verdict + last_error + eval_issues
    - Batch overflow: dec_eval_attempts=True
    - Conflict resolved: reset_for_reeval=True

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
        leo_verdict: Written when not None. Ignored when
            ``reset_for_reeval`` is True (the verdict is forced to 'pending').
        domain_verdict: Written when not None. Ignored when
            ``reset_for_reeval`` is True (the verdict is forced to 'pending').
        last_error: Written when not None.
        eval_issues: Written when not None.
        dec_eval_attempts: Decrement ``eval_attempts`` by one, never going
            below zero (NULL is treated as 1). Ignored when
            ``reset_for_reeval`` is True, because the reset already sets the
            counter to 0.
        reset_for_reeval: Force both verdicts to 'pending' and
            ``eval_attempts`` to 0.
        conflict_rebase_attempts: Written when not None.
    """
    parts = ["status = 'open'"]
    params = []
    if reset_for_reeval:
        parts.extend([
            "leo_verdict = 'pending'",
            "domain_verdict = 'pending'",
            "eval_attempts = 0",
        ])
    else:
        if leo_verdict is not None:
            parts.append("leo_verdict = ?")
            params.append(leo_verdict)
        if domain_verdict is not None:
            parts.append("domain_verdict = ?")
            params.append(domain_verdict)
    if last_error is not None:
        parts.append("last_error = ?")
        params.append(last_error)
    if eval_issues is not None:
        parts.append("eval_issues = ?")
        params.append(eval_issues)
    # The reset wins over the decrement. Emitting both would assign
    # eval_attempts twice, and SQLite keeps only the rightmost assignment, so
    # the counter would be decremented instead of reset to 0.
    if dec_eval_attempts and not reset_for_reeval:
        # MAX(..., 0) keeps the counter from going negative when it is
        # already 0.
        parts.append("eval_attempts = MAX(COALESCE(eval_attempts, 1) - 1, 0)")
    if conflict_rebase_attempts is not None:
        parts.append("conflict_rebase_attempts = ?")
        params.append(conflict_rebase_attempts)
    params.append(pr_number)
    conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params)
def start_fixing(conn, pr_number: int) -> bool:
    """Try to claim a PR for fixing (status 'open' -> 'fixing').

    The claim is a single conditional UPDATE: it only touches the row when
    its status is currently 'open', and in the same statement increments
    ``fix_attempts`` (NULL counts as 0) and stamps ``last_attempt`` with the
    database's ``datetime('now')``.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.

    Returns:
        True when a row was updated, False when no row with this number was
        in the 'open' state (for example because it was already claimed).
    """
    claim_sql = (
        "UPDATE prs SET status = 'fixing', "
        "fix_attempts = COALESCE(fix_attempts, 0) + 1, "
        "last_attempt = datetime('now') "
        "WHERE number = ? AND status = 'open'"
    )
    rows_claimed = conn.execute(claim_sql, (pr_number,)).rowcount
    return rows_claimed > 0
def reset_for_reeval(conn, pr_number: int):
    """Put a PR back at the start of the evaluation cycle after a fix.

    One UPDATE sets ``status`` to 'open', zeroes ``eval_attempts``, sets
    ``eval_issues`` to the string '[]', nulls ``tier0_pass`` and
    ``last_error``, and sets both verdict columns to 'pending'. The update is
    unconditional on the row's current status.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.
    """
    wipe_eval_state = """UPDATE prs SET
        status = 'open',
        eval_attempts = 0,
        eval_issues = '[]',
        tier0_pass = NULL,
        domain_verdict = 'pending',
        leo_verdict = 'pending',
        last_error = NULL
        WHERE number = ?"""
    bindings = (pr_number,)
    conn.execute(wipe_eval_state, bindings)
def start_review(conn, pr_number: int) -> bool:
    """Try to claim a PR for review (status 'open' -> 'reviewing').

    The claim is a single conditional UPDATE that only touches the row when
    its status is currently 'open'.

    Args:
        conn: Database connection; only ``conn.execute`` is used and no
            commit is issued here.
        pr_number: Value matched against ``prs.number``.

    Returns:
        True when a row was updated, False when no row with this number was
        in the 'open' state (for example because another worker claimed it).
    """
    claim_sql = "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'"
    rows_claimed = conn.execute(claim_sql, (pr_number,)).rowcount
    return rows_claimed > 0