"""PR state transitions — single source of truth for all status changes. Every UPDATE prs SET status = ... MUST go through this module. Invariants enforced: - close: always syncs Forgejo (opt-out for reconciliation only) - approve: requires non-empty domain (ValueError) - merged: always sets merged_at, clears last_error - conflict: always increments merge_failures, sets merge_cycled Why this exists: 36 hand-crafted status transitions across evaluate.py and merge.py produced 3 incidents (domain NULL, Forgejo ghost PRs, merge_cycled missing). Centralizing eliminates the entire class of "forgot to update X in this one code path" bugs. """ import logging from .forgejo import api as forgejo_api, repo_path logger = logging.getLogger("pipeline.pr_state") async def close_pr( conn, pr_number: int, *, last_error: str = None, merge_cycled: bool = False, inc_merge_failures: bool = False, close_on_forgejo: bool = True, ): """Close a PR in DB and on Forgejo. Args: close_on_forgejo: False only when caller already closed on Forgejo (reconciliation, ghost PR cleanup after manual close). """ if close_on_forgejo: await forgejo_api("PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"}) parts = ["status = 'closed'"] params = [] if last_error is not None: parts.append("last_error = ?") params.append(last_error) if merge_cycled: parts.append("merge_cycled = 1") if inc_merge_failures: parts.append("merge_failures = COALESCE(merge_failures, 0) + 1") params.append(pr_number) conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params) def approve_pr( conn, pr_number: int, *, domain: str, auto_merge: int = 0, leo_verdict: str = None, domain_verdict: str = None, ): """Approve a PR. Raises ValueError if domain is empty/None.""" if not domain: raise ValueError(f"Cannot approve PR #{pr_number} without domain") parts = ["status = 'approved'", "domain = COALESCE(domain, ?)"] params = [domain] parts.append("auto_merge = ?") params.append(auto_merge) if leo_verdict is not None: parts.append("leo_verdict = ?") params.append(leo_verdict) if domain_verdict is not None: parts.append("domain_verdict = ?") params.append(domain_verdict) params.append(pr_number) conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params) def mark_merged(conn, pr_number: int): """Mark PR as merged. Always sets merged_at, clears last_error.""" conn.execute( "UPDATE prs SET status = 'merged', merged_at = datetime('now'), " "last_error = NULL WHERE number = ?", (pr_number,), ) def mark_conflict(conn, pr_number: int, *, last_error: str = None): """Mark PR as conflict. Always increments merge_failures, sets merge_cycled.""" conn.execute( "UPDATE prs SET status = 'conflict', merge_cycled = 1, " "merge_failures = COALESCE(merge_failures, 0) + 1, " "last_error = ? WHERE number = ?", (last_error, pr_number), ) def mark_conflict_permanent( conn, pr_number: int, *, last_error: str = None, conflict_rebase_attempts: int = None, ): """Mark PR as permanently conflicted (no more retries).""" parts = ["status = 'conflict_permanent'"] params = [] if last_error is not None: parts.append("last_error = ?") params.append(last_error) if conflict_rebase_attempts is not None: parts.append("conflict_rebase_attempts = ?") params.append(conflict_rebase_attempts) params.append(pr_number) conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params) def reopen_pr( conn, pr_number: int, *, leo_verdict: str = None, domain_verdict: str = None, last_error: str = None, eval_issues: str = None, dec_eval_attempts: bool = False, reset_for_reeval: bool = False, conflict_rebase_attempts: int = None, ): """Set PR back to open. Covers all reopen scenarios: - Transient failure (API error): no extra args - Rejection: leo_verdict + last_error + eval_issues - Batch overflow: dec_eval_attempts=True - Conflict resolved: reset_for_reeval=True """ parts = ["status = 'open'"] params = [] if reset_for_reeval: parts.extend([ "leo_verdict = 'pending'", "domain_verdict = 'pending'", "eval_attempts = 0", ]) else: if leo_verdict is not None: parts.append("leo_verdict = ?") params.append(leo_verdict) if domain_verdict is not None: parts.append("domain_verdict = ?") params.append(domain_verdict) if last_error is not None: parts.append("last_error = ?") params.append(last_error) if eval_issues is not None: parts.append("eval_issues = ?") params.append(eval_issues) if dec_eval_attempts: parts.append("eval_attempts = COALESCE(eval_attempts, 1) - 1") if conflict_rebase_attempts is not None: parts.append("conflict_rebase_attempts = ?") params.append(conflict_rebase_attempts) params.append(pr_number) conn.execute(f"UPDATE prs SET {', '.join(parts)} WHERE number = ?", params) def start_review(conn, pr_number: int) -> bool: """Atomically claim PR for review (status open -> reviewing). Returns True if claimed, False if already claimed by another worker. """ cursor = conn.execute( "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'", (pr_number,), ) return cursor.rowcount > 0