"""Merge stage — domain-serialized priority queue with rebase-before-merge. Design reviewed by Ganymede (round 2) and Rhea. Key decisions: - Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery) - Rebase-before-merge with pinned force-with-lease SHA (Ganymede) - Priority queue: COALESCE(p.priority, s.priority, 'medium') — PR > source > default - Human PRs default to 'high', not 'critical' (Ganymede — prevents DoS on pipeline) - 5-minute merge timeout — force-reset to 'conflict' (Rhea) - Ack comment on human PR discovery (Rhea) - Pagination on all Forgejo list endpoints (Ganymede standing rule) """ import asyncio import json import logging from collections import defaultdict from . import config, db from .domains import detect_domain_from_branch from .forgejo import api as forgejo_api from .forgejo import repo_path logger = logging.getLogger("pipeline.merge") # In-memory domain locks — fast path, lost on crash (durable layer is prs.status) _domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) # Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea) MERGE_TIMEOUT_SECONDS = 300 # 5 minutes # --- Git helpers --- async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]: """Run a git command async. Returns (returncode, stdout+stderr).""" proc = await asyncio.create_subprocess_exec( "git", *args, cwd=cwd or str(config.REPO_DIR), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() return -1, f"git {args[0]} timed out after {timeout}s" output = (stdout or b"").decode().strip() if stderr: output += "\n" + stderr.decode().strip() return proc.returncode, output # --- PR Discovery (Multiplayer v1) --- async def discover_external_prs(conn) -> int: """Scan Forgejo for open PRs not tracked in SQLite. Human PRs (non-pipeline author) get priority 'high' and origin 'human'. Critical is reserved for explicit human override only. (Ganymede) Pagination on all Forgejo list endpoints. (Ganymede standing rule #5) """ known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()} discovered = 0 page = 1 while True: prs = await forgejo_api( "GET", repo_path(f"pulls?state=open&limit=50&page={page}"), ) if not prs: break for pr in prs: if pr["number"] not in known: # Detect origin: pipeline agents have per-agent Forgejo users pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"} author = pr.get("user", {}).get("login", "") is_pipeline = author.lower() in pipeline_users origin = "pipeline" if is_pipeline else "human" priority = "high" if origin == "human" else None domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"]) conn.execute( """INSERT OR IGNORE INTO prs (number, branch, status, origin, priority, domain) VALUES (?, ?, 'open', ?, ?, ?)""", (pr["number"], pr["head"]["ref"], origin, priority, domain), ) db.audit( conn, "merge", "pr_discovered", json.dumps( { "pr": pr["number"], "origin": origin, "author": pr.get("user", {}).get("login"), "priority": priority or "inherited", } ), ) # Ack comment on human PRs so contributor feels acknowledged (Rhea) if origin == "human": await _post_ack_comment(pr["number"]) discovered += 1 if len(prs) < 50: break # Last page page += 1 if discovered: logger.info("Discovered %d external PRs", discovered) return discovered async def _post_ack_comment(pr_number: int): """Post acknowledgment comment on human-submitted PR. (Rhea) Contributor should feel acknowledged immediately, not wonder if their PR disappeared into a void. """ body = ( "Thanks for the contribution! Your PR is queued for evaluation " "(priority: high). Expected review time: ~5 minutes.\n\n" "_This is an automated message from the Teleo pipeline._" ) await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), {"body": body}, ) # --- Merge operations --- async def _claim_next_pr(conn, domain: str) -> dict | None: """Claim the next approved PR for a domain via atomic UPDATE. Priority inheritance: COALESCE(p.priority, s.priority, 'medium') - Explicit PR priority (human PRs) > source priority (pipeline) > default medium - NULL priorities fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2) - This is intentional: unclassified PRs don't jump ahead of triaged ones (Rhea: document the precedence for future maintainers) NOT EXISTS enforces domain serialization in SQL — defense-in-depth even if asyncio.Lock is bypassed. (Ganymede: approved) """ row = conn.execute( """UPDATE prs SET status = 'merging', last_attempt = datetime('now') WHERE number = ( SELECT p.number FROM prs p LEFT JOIN sources s ON p.source_path = s.path WHERE p.status = 'approved' AND p.domain = ? AND NOT EXISTS ( SELECT 1 FROM prs p2 WHERE p2.domain = p.domain AND p2.status = 'merging' ) ORDER BY CASE COALESCE(p.priority, s.priority, 'medium') WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END, p.created_at ASC LIMIT 1 ) RETURNING number, source_path, branch, domain""", (domain,), ).fetchone() return dict(row) if row else None async def _rebase_and_push(branch: str) -> tuple[bool, str]: """Rebase branch onto main and force-push with pinned SHA. Always use --force-with-lease with pinned SHA for ALL branches — pipeline and human. No split logic. (Ganymede) """ worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}" # Create worktree for the branch rc, out = await _git("worktree", "add", worktree_path, branch) if rc != 0: return False, f"worktree add failed: {out}" try: # Capture expected SHA before rebase (Ganymede: pin for force-with-lease) rc, expected_sha = await _git("rev-parse", f"origin/{branch}", cwd=worktree_path) if rc != 0: return False, f"rev-parse failed: {expected_sha}" expected_sha = expected_sha.strip().split("\n")[0] # First line only # Fetch latest main rc, out = await _git("fetch", "origin", "main", cwd=worktree_path) if rc != 0: return False, f"fetch failed: {out}" # Check if rebase is needed rc, merge_base = await _git("merge-base", "origin/main", "HEAD", cwd=worktree_path) rc2, main_sha = await _git("rev-parse", "origin/main", cwd=worktree_path) if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip(): # Already up to date, no rebase needed return True, "already up to date" # Rebase onto main rc, out = await _git("rebase", "origin/main", cwd=worktree_path, timeout=120) if rc != 0: # Rebase conflict await _git("rebase", "--abort", cwd=worktree_path) return False, f"rebase conflict: {out}" # Force-push with pinned SHA (Ganymede: defeats tracking-ref update race) rc, out = await _git( "push", f"--force-with-lease={branch}:{expected_sha}", "origin", f"HEAD:{branch}", cwd=worktree_path, timeout=30, ) if rc != 0: return False, f"push rejected: {out}" return True, "rebased and pushed" finally: # Cleanup worktree await _git("worktree", "remove", "--force", worktree_path) async def _merge_pr(pr_number: int) -> tuple[bool, str]: """Merge PR via Forgejo API. Preserves PR metadata and reviewer attribution.""" # Check if already merged/closed on Forgejo (prevents 405 on re-merge attempts) pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}")) if pr_info: if pr_info.get("merged"): logger.info("PR #%d already merged on Forgejo, syncing status", pr_number) return True, "already merged" if pr_info.get("state") == "closed": logger.warning("PR #%d closed on Forgejo but not merged", pr_number) return False, "PR closed without merge" result = await forgejo_api( "POST", repo_path(f"pulls/{pr_number}/merge"), {"Do": "merge", "merge_message_field": ""}, ) if result is None: return False, "Forgejo merge API failed" return True, "merged" async def _delete_remote_branch(branch: str): """Delete remote branch immediately after merge. (Ganymede Q4: immediate, not batch) If DELETE fails, log and move on — stale branch is cosmetic, stale merge is operational. """ result = await forgejo_api( "DELETE", repo_path(f"branches/{branch}"), ) if result is None: logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch) # --- Domain merge task --- async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]: """Process the merge queue for a single domain. Returns (succeeded, failed).""" succeeded = 0 failed = 0 while True: async with _domain_locks[domain]: pr = await _claim_next_pr(conn, domain) if not pr: break # No more approved PRs for this domain pr_num = pr["number"] branch = pr["branch"] logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain) try: # Rebase with timeout (Rhea: 5 min max, then force-reset to conflict) rebase_ok, rebase_msg = await asyncio.wait_for( _rebase_and_push(branch), timeout=MERGE_TIMEOUT_SECONDS, ) except asyncio.TimeoutError: logger.error( "PR #%d merge timed out after %ds — resetting to conflict (Rhea)", pr_num, MERGE_TIMEOUT_SECONDS ) conn.execute( "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", (f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num), ) db.audit(conn, "merge", "timeout", json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS})) failed += 1 continue if not rebase_ok: logger.warning("PR #%d rebase failed: %s", pr_num, rebase_msg) conn.execute( "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", (rebase_msg[:500], pr_num), ) db.audit(conn, "merge", "rebase_failed", json.dumps({"pr": pr_num, "error": rebase_msg[:200]})) failed += 1 continue # Merge via API merge_ok, merge_msg = await _merge_pr(pr_num) if not merge_ok: logger.error("PR #%d API merge failed: %s", pr_num, merge_msg) conn.execute( "UPDATE prs SET status = 'conflict', last_error = ? WHERE number = ?", (merge_msg[:500], pr_num), ) db.audit(conn, "merge", "api_merge_failed", json.dumps({"pr": pr_num, "error": merge_msg[:200]})) failed += 1 continue # Success — update status and cleanup conn.execute( """UPDATE prs SET status = 'merged', merged_at = datetime('now'), last_error = NULL WHERE number = ?""", (pr_num,), ) db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch})) logger.info("PR #%d merged successfully", pr_num) # Delete remote branch immediately (Ganymede Q4) await _delete_remote_branch(branch) # Prune local worktree metadata await _git("worktree", "prune") succeeded += 1 return succeeded, failed # --- Main entry point --- async def merge_cycle(conn, max_workers=None) -> tuple[int, int]: """Run one merge cycle across all domains. 1. Discover external PRs (multiplayer v1) 2. Find all domains with approved PRs 3. Launch one async task per domain (cross-domain parallel, same-domain serial) """ # Step 1: Discover external PRs await discover_external_prs(conn) # Step 2: Find domains with approved work rows = conn.execute("SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL").fetchall() domains = [r["domain"] for r in rows] # Also check for NULL-domain PRs (human PRs with undetected domain) null_domain = conn.execute("SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL").fetchone() if null_domain and null_domain["c"] > 0: logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", null_domain["c"]) if not domains: return 0, 0 # Step 3: Merge all domains concurrently tasks = [_merge_domain_queue(conn, domain) for domain in domains] results = await asyncio.gather(*tasks, return_exceptions=True) total_succeeded = 0 total_failed = 0 for i, result in enumerate(results): if isinstance(result, Exception): logger.exception("Domain %s merge failed with exception", domains[i]) total_failed += 1 else: s, f = result total_succeeded += s total_failed += f if total_succeeded or total_failed: logger.info( "Merge cycle: %d succeeded, %d failed across %d domains", total_succeeded, total_failed, len(domains) ) return total_succeeded, total_failed