From 552f44ec1c05280c80ca283842d274280ab14063 Mon Sep 17 00:00:00 2001 From: m3taversal Date: Wed, 15 Apr 2026 17:19:48 +0100 Subject: [PATCH] fix: add migration v20 for conflict retry columns + serialize worktree ops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit db.py: migration v20 adds the conflict_rebase_attempts, merge_failures, and merge_cycled columns to prs. They already exist on the VPS via a manual migration but were missing from code, so any future DB rebuild would break the retry mechanism. merge.py: replace the retry-with-backoff on config.lock with an asyncio.Lock (_bare_repo_lock) held around all worktree add/remove calls. This prevents the contention instead of retrying after it. Applied to both _cherry_pick_onto_main and _merge_reweave_pr. Co-Authored-By: Claude Opus 4.6 (1M context) --- lib/db.py | 15 ++++++++++++++- lib/merge.py | 34 +++++++++++++--------------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/lib/db.py b/lib/db.py index 06833f1..d419a6a 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ from . 
import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 19 +SCHEMA_VERSION = 20 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -533,6 +533,19 @@ def migrate(conn: sqlite3.Connection): conn.commit() logger.info("Migration v19: added submitted_by to prs and sources tables") + if current < 20: + for col, default in [ + ("conflict_rebase_attempts", "INTEGER DEFAULT 0"), + ("merge_failures", "INTEGER DEFAULT 0"), + ("merge_cycled", "INTEGER DEFAULT 0"), + ]: + try: + conn.execute(f"ALTER TABLE prs ADD COLUMN {col} {default}") + except sqlite3.OperationalError: + pass + conn.commit() + logger.info("Migration v20: added conflict retry columns to prs") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", diff --git a/lib/merge.py b/lib/merge.py index c1f3ca1..11afc54 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -57,6 +57,9 @@ logger = logging.getLogger("pipeline.merge") # In-memory domain locks — fast path, lost on crash (durable layer is prs.status) _domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) +# Bare repo lock — serializes worktree add/remove to prevent config.lock contention +_bare_repo_lock = asyncio.Lock() + # Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea) MERGE_TIMEOUT_SECONDS = 300 # 5 minutes @@ -314,14 +317,8 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]: commit_list = [c.strip() for c in commits_out.strip().split("\n") if c.strip()] - # Create worktree from origin/main (fresh branch) - # Delete stale local branch if it exists from a previous failed attempt - await _git("branch", "-D", clean_branch) - rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main") - for _retry in range(3): - if rc == 0 or "could not lock config" not in out: - break - await asyncio.sleep(random.uniform(0.5, 2.0 * (_retry + 1))) + # Serialize worktree add/remove — concurrent calls hit bare repo 
config.lock + async with _bare_repo_lock: await _git("branch", "-D", clean_branch) rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main") if rc != 0: @@ -407,9 +404,9 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]: return True, "cherry-picked and pushed" finally: - # Cleanup worktree and temp branch - await _git("worktree", "remove", "--force", worktree_path) - await _git("branch", "-D", clean_branch) + async with _bare_repo_lock: + await _git("worktree", "remove", "--force", worktree_path) + await _git("branch", "-D", clean_branch) REWEAVE_EDGE_FIELDS = ("supports", "challenges", "challenged_by", "depends_on", "related", "reweave_edges") @@ -575,14 +572,8 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: if not changed_files: return False, "no .md files changed" - # Pre-cleanup: remove stale worktree/branch from prior crash (SIGKILL, OOM, etc.) - await _git("worktree", "remove", "--force", worktree_path) - await _git("branch", "-D", clean_branch) - rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main") - for _retry in range(3): - if rc == 0 or "could not lock config" not in out: - break - await asyncio.sleep(random.uniform(0.5, 2.0 * (_retry + 1))) + async with _bare_repo_lock: + await _git("worktree", "remove", "--force", worktree_path) await _git("branch", "-D", clean_branch) rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main") if rc != 0: @@ -689,8 +680,9 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: return True, result_msg finally: - await _git("worktree", "remove", "--force", worktree_path) - await _git("branch", "-D", clean_branch) + async with _bare_repo_lock: + await _git("worktree", "remove", "--force", worktree_path) + await _git("branch", "-D", clean_branch) async def _resubmit_approvals(pr_number: int):