"""Merge stage — domain-serialized priority queue with rebase-before-merge. Design reviewed by Ganymede (round 2) and Rhea. Key decisions: - Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery) - Rebase-before-merge with pinned force-with-lease SHA (Ganymede) - Priority queue: COALESCE(p.priority, s.priority, 'medium') — PR > source > default - Human PRs default to 'high', not 'critical' (Ganymede — prevents DoS on pipeline) - 5-minute merge timeout — force-reset to 'conflict' (Rhea) - Ack comment on human PR discovery (Rhea) - Pagination on all Forgejo list endpoints (Ganymede standing rule) """ import asyncio import json import logging import os import random import re import shutil from collections import defaultdict from . import config, db from .db import classify_branch from .dedup import dedup_evidence_blocks from .domains import detect_domain_from_branch from .cascade import cascade_after_merge from .forgejo import api as forgejo_api # Pipeline-owned branch prefixes — these get auto-merged via cherry-pick. # Originally restricted to pipeline-only branches because rebase orphaned agent commits. # Now safe for all branches: cherry-pick creates a fresh branch from main, never # rewrites the source branch. 
# (Original issue: Leo directive, PRs #2141, #157, #2142, #2180)
PIPELINE_OWNED_PREFIXES = (
    "extract/",
    "ingestion/",
    "epimetheus/",
    "reweave/",
    "fix/",
    "theseus/",
    "rio/",
    "astra/",
    "vida/",
    "clay/",
    "leo/",
    "argus/",
    "oberon/",
)

# Import worktree lock — file at /opt/teleo-eval/pipeline/lib/worktree_lock.py
# Fallback path supports running this module outside the package (e.g. ad-hoc scripts).
try:
    from .worktree_lock import async_main_worktree_lock
except ImportError:
    import sys

    sys.path.insert(0, os.path.dirname(__file__))
    from worktree_lock import async_main_worktree_lock

from .forgejo import get_agent_token, get_pr_diff, repo_path

logger = logging.getLogger("pipeline.merge")

# In-memory domain locks — fast path, lost on crash (durable layer is prs.status)
_domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

# Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea)
MERGE_TIMEOUT_SECONDS = 300  # 5 minutes


# --- Git helpers ---


async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
    """Run a git command async. Returns (returncode, stdout+stderr).

    On timeout the subprocess is killed and (-1, <timeout message>) is
    returned, so callers can treat -1 like any other nonzero exit code.
    """
    proc = await asyncio.create_subprocess_exec(
        "git",
        *args,
        cwd=cwd or str(config.REPO_DIR),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return -1, f"git {args[0]} timed out after {timeout}s"
    output = (stdout or b"").decode().strip()
    if stderr:
        # stderr is appended after stdout so callers can grep either stream.
        output += "\n" + stderr.decode().strip()
    return proc.returncode, output


# --- PR Discovery (Multiplayer v1) ---


async def discover_external_prs(conn) -> int:
    """Scan Forgejo for open PRs not tracked in SQLite.

    Human PRs (non-pipeline author) get priority 'high' and origin 'human'.
    Critical is reserved for explicit human override only. (Ganymede)
    Pagination on all Forgejo list endpoints. (Ganymede standing rule #5)

    Returns the number of newly discovered PRs inserted into the `prs` table.
    """
    known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()}
    discovered = 0
    page = 1
    while True:
        prs = await forgejo_api(
            "GET",
            repo_path(f"pulls?state=open&limit=50&page={page}"),
        )
        if not prs:
            break
        for pr in prs:
            if pr["number"] not in known:
                # Detect origin: pipeline agents have per-agent Forgejo users
                pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
                author = pr.get("user", {}).get("login", "")
                is_pipeline = author.lower() in pipeline_users
                origin = "pipeline" if is_pipeline else "human"
                # Human PRs get explicit 'high'; pipeline PRs inherit via COALESCE in _claim_next_pr.
                priority = "high" if origin == "human" else None
                domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
                agent, commit_type = classify_branch(pr["head"]["ref"])
                conn.execute(
                    """INSERT OR IGNORE INTO prs (number, branch, status, origin, priority, domain, agent, commit_type) VALUES (?, ?, 'open', ?, ?, ?, ?, ?)""",
                    (pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type),
                )
                db.audit(
                    conn,
                    "merge",
                    "pr_discovered",
                    json.dumps(
                        {
                            "pr": pr["number"],
                            "origin": origin,
                            "author": pr.get("user", {}).get("login"),
                            "priority": priority or "inherited",
                        }
                    ),
                )
                # Ack comment on human PRs so contributor feels acknowledged (Rhea)
                if origin == "human":
                    await _post_ack_comment(pr["number"])
                discovered += 1
        if len(prs) < 50:
            break  # Last page
        page += 1
    if discovered:
        logger.info("Discovered %d external PRs", discovered)
    return discovered


async def _post_ack_comment(pr_number: int):
    """Post acknowledgment comment on human-submitted PR. (Rhea)

    Contributor should feel acknowledged immediately, not wonder if their
    PR disappeared into a void.
    """
    body = (
        "Thanks for the contribution! Your PR is queued for evaluation "
        "(priority: high). Expected review time: ~5 minutes.\n\n"
        "_This is an automated message from the Teleo pipeline._"
    )
    await forgejo_api(
        "POST",
        repo_path(f"issues/{pr_number}/comments"),
        {"body": body},
    )


# --- Merge operations ---


async def _claim_next_pr(conn, domain: str) -> dict | None:
    """Claim the next approved PR for a domain via atomic UPDATE.

    Priority inheritance: COALESCE(p.priority, s.priority, 'medium')
    - Explicit PR priority (human PRs) > source priority (pipeline) > default medium
    - NULL priorities fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2)
    - This is intentional: unclassified PRs don't jump ahead of triaged ones
      (Rhea: document the precedence for future maintainers)

    NOTE(review): the "NULL → ELSE 4" claim above looks stale — COALESCE maps
    all-NULL to 'medium' (WHEN 2); ELSE 4 can only catch *unrecognized* priority
    strings, not NULLs. Confirm against the sources schema before relying on it.

    NOT EXISTS enforces domain serialization in SQL — defense-in-depth even if
    asyncio.Lock is bypassed. (Ganymede: approved)

    Returns the claimed row as a dict (number, source_path, branch, domain),
    or None when no eligible PR exists for this domain.
    """
    # Build prefix filter for pipeline-owned branches only
    # Agent branches stay approved but are NOT auto-merged (Leo: PRs #2141, #157, #2142, #2180)
    prefix_clauses = " OR ".join("p.branch LIKE ?" for _ in PIPELINE_OWNED_PREFIXES)
    prefix_params = [f"{pfx}%" for pfx in PIPELINE_OWNED_PREFIXES]
    row = conn.execute(
        f"""UPDATE prs SET status = 'merging', last_attempt = datetime('now')
            WHERE number = (
                SELECT p.number FROM prs p
                LEFT JOIN sources s ON p.source_path = s.path
                WHERE p.status = 'approved'
                  AND p.domain = ?
                  AND ({prefix_clauses})
                  AND NOT EXISTS (
                      SELECT 1 FROM prs p2
                      WHERE p2.domain = p.domain AND p2.status = 'merging'
                  )
                ORDER BY
                    CASE COALESCE(p.priority, s.priority, 'medium')
                        WHEN 'critical' THEN 0
                        WHEN 'high' THEN 1
                        WHEN 'medium' THEN 2
                        WHEN 'low' THEN 3
                        ELSE 4
                    END,
                    -- Dependency ordering: PRs with fewer broken wiki links merge first.
                    -- "Creator" PRs (0 broken links) land before "consumer" PRs that
                    -- reference them, naturally resolving the dependency chain. (Rhea+Ganymede)
                    CASE WHEN p.eval_issues LIKE '%broken_wiki_links%' THEN 1 ELSE 0 END,
                    p.created_at ASC
                LIMIT 1
            )
            RETURNING number, source_path, branch, domain""",
        (domain, *prefix_params),
    ).fetchone()
    return dict(row) if row else None


async def _dedup_enriched_files(worktree_path: str) -> int:
    """Scan rebased worktree for duplicate evidence blocks and dedup them.

    Returns count of files fixed.
    """
    # Get list of modified claim files in this branch vs origin/main
    rc, out = await _git("diff", "--name-only", "origin/main..HEAD", cwd=worktree_path)
    if rc != 0:
        return 0
    fixed = 0
    for fpath in out.strip().split("\n"):
        fpath = fpath.strip()
        if not fpath or not fpath.endswith(".md"):
            continue
        # Only process claim files (domains/, core/, foundations/)
        if not any(fpath.startswith(p) for p in ("domains/", "core/", "foundations/")):
            continue
        full_path = os.path.join(worktree_path, fpath)
        if not os.path.exists(full_path):
            continue
        with open(full_path, "r") as f:
            content = f.read()
        deduped = dedup_evidence_blocks(content)
        if deduped != content:
            with open(full_path, "w") as f:
                f.write(deduped)
            # Stage the fix
            await _git("add", fpath, cwd=worktree_path)
            fixed += 1
    if fixed > 0:
        # Amend the last commit to include dedup fixes (no new commit)
        await _git(
            "-c",
            "core.editor=true",
            "commit",
            "--amend",
            "--no-edit",
            cwd=worktree_path,
            timeout=30,
        )
        logger.info("Deduped evidence blocks in %d file(s) after rebase", fixed)
    return fixed


async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
    """Cherry-pick extraction commits onto a fresh branch from main.

    Replaces rebase-retry: extraction commits ADD new files, so cherry-pick
    applies cleanly ~99% of the time. For enrichments (editing existing files),
    cherry-pick reports the exact conflict for human review.

    Leo's manual fix pattern (PRs #2178, #2141, #157, #2142):
    1. git checkout -b clean-branch main
    2. git cherry-pick
    3. Merge to main

    Returns (ok, message). On success the source branch is force-pushed
    (with lease pinned to its pre-pick SHA) to point at the clean branch tip.
    """
    worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
    clean_branch = f"_clean/{branch.replace('/', '-')}"
    # Fetch latest state — separate calls to avoid refspec issues with long branch names
    rc, out = await _git("fetch", "origin", "main", timeout=15)
    if rc != 0:
        return False, f"fetch main failed: {out}"
    rc, out = await _git("fetch", "origin", branch, timeout=15)
    if rc != 0:
        return False, f"fetch branch failed: {out}"
    # Check if already up to date
    rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
    rc2, main_sha = await _git("rev-parse", "origin/main")
    if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
        return True, "already up to date"
    # Get extraction commits (oldest first)
    rc, commits_out = await _git(
        "log",
        f"origin/main..origin/{branch}",
        "--format=%H",
        "--reverse",
        timeout=10,
    )
    if rc != 0 or not commits_out.strip():
        return False, f"no commits found on {branch}"
    commit_list = [c.strip() for c in commits_out.strip().split("\n") if c.strip()]
    # Create worktree from origin/main (fresh branch)
    # Delete stale local branch if it exists from a previous failed attempt
    await _git("branch", "-D", clean_branch)
    rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main")
    if rc != 0:
        return False, f"worktree add failed: {out}"
    try:
        # Cherry-pick each extraction commit
        dropped_entities: set[str] = set()
        picked_count = 0
        for commit_sha in commit_list:
            # Detect merge commits — cherry-pick needs -m 1 to pick first-parent diff.
            # cat-file -p prints the raw commit object; each parent is a "parent <sha>"
            # line, so counting "\nparent " (plus a defensive startswith check) counts parents.
            rc_parents, parents_out = await _git(
                "cat-file",
                "-p",
                commit_sha,
                cwd=worktree_path,
                timeout=5,
            )
            parent_count = parents_out.count("\nparent ") + (1 if parents_out.startswith("parent ") else 0)
            is_merge = parent_count >= 2
            pick_args = ["cherry-pick"]
            if is_merge:
                pick_args.extend(["-m", "1"])
                logger.info("Cherry-pick %s: merge commit, using -m 1", commit_sha[:8])
            pick_args.append(commit_sha)
            rc, out = await _git(*pick_args, cwd=worktree_path, timeout=60)
            if rc != 0 and "empty" in out.lower():
                # Content already on main — skip this commit
                await _git("cherry-pick", "--skip", cwd=worktree_path)
                logger.info("Cherry-pick %s: empty (already on main), skipping", commit_sha[:8])
                continue
            picked_count += 1
            if rc != 0:
                # Check if conflict is entity-only (same auto-resolution as before)
                rc_ls, conflicting = await _git(
                    "diff", "--name-only", "--diff-filter=U", cwd=worktree_path
                )
                conflict_files = [
                    f.strip() for f in conflicting.split("\n") if f.strip()
                ] if rc_ls == 0 else []
                if conflict_files and all(f.startswith("entities/") for f in conflict_files):
                    # Entity conflicts: take main's version (entities are recoverable)
                    # In cherry-pick: --ours = branch we're ON (clean branch from origin/main)
                    #                 --theirs = commit being cherry-picked (extraction branch)
                    for cf in conflict_files:
                        await _git("checkout", "--ours", cf, cwd=worktree_path)
                        await _git("add", cf, cwd=worktree_path)
                    dropped_entities.update(conflict_files)
                    rc_cont, cont_out = await _git(
                        "-c",
                        "core.editor=true",
                        "cherry-pick",
                        "--continue",
                        cwd=worktree_path,
                        timeout=60,
                    )
                    if rc_cont != 0:
                        await _git("cherry-pick", "--abort", cwd=worktree_path)
                        return False, f"cherry-pick entity resolution failed on {commit_sha[:8]}: {cont_out}"
                    logger.info(
                        "Cherry-pick entity conflict auto-resolved: dropped %s (recoverable)",
                        ", ".join(sorted(conflict_files)),
                    )
                else:
                    # Real conflict — report exactly what conflicted
                    conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
                    await _git("cherry-pick", "--abort", cwd=worktree_path)
                    return False, f"cherry-pick conflict on {commit_sha[:8]}: {conflict_detail}"
        if dropped_entities:
            logger.info(
                "Cherry-pick auto-resolved entity conflicts in %s",
                ", ".join(sorted(dropped_entities)),
            )
        # All commits were empty — content already on main
        if picked_count == 0:
            return True, "already merged (all commits empty)"
        # Post-pick dedup: remove duplicate evidence blocks (Leo: PRs #1751, #1752)
        await _dedup_enriched_files(worktree_path)
        # Force-push clean branch as the original branch name
        # Capture expected SHA for force-with-lease
        rc, expected_sha = await _git("rev-parse", f"origin/{branch}")
        if rc != 0:
            return False, f"rev-parse origin/{branch} failed: {expected_sha}"
        expected_sha = expected_sha.strip().split("\n")[0]
        rc, out = await _git(
            "push",
            f"--force-with-lease={branch}:{expected_sha}",
            "origin",
            f"HEAD:{branch}",
            cwd=worktree_path,
            timeout=30,
        )
        if rc != 0:
            return False, f"push rejected: {out}"
        return True, "cherry-picked and pushed"
    finally:
        # Cleanup worktree and temp branch
        await _git("worktree", "remove", "--force", worktree_path)
        await _git("branch", "-D", clean_branch)


async def _resubmit_approvals(pr_number: int):
    """Re-submit 2 formal Forgejo approvals after force-push invalidated them.

    Force-push (rebase) invalidates existing approvals. Branch protection
    requires 2 approvals before the merge API will accept the request.
    Same pattern as evaluate._post_formal_approvals.
    """
    pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
    pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
    approvals = 0
    for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
        # An author cannot approve their own PR.
        if agent_name == pr_author:
            continue
        if approvals >= 2:
            break
        token = get_agent_token(agent_name)
        if token:
            result = await forgejo_api(
                "POST",
                repo_path(f"pulls/{pr_number}/reviews"),
                {"body": "Approved (post-rebase re-approval).", "event": "APPROVED"},
                token=token,
            )
            if result is not None:
                approvals += 1
                logger.debug(
                    "Post-rebase approval for PR #%d by %s (%d/2)",
                    pr_number,
                    agent_name,
                    approvals,
                )
    if approvals < 2:
        logger.warning(
            "Only %d/2 approvals submitted for PR #%d after rebase",
            approvals,
            pr_number,
        )


async def _merge_pr(pr_number: int) -> tuple[bool, str]:
    """Merge PR via Forgejo API.

    CURRENTLY UNUSED — local ff-push is the primary merge path. Kept as
    fallback: re-enable if Forgejo fixes the 405 bug (Ganymede's API-first
    design). The local ff-push in _merge_domain_queue replaced this due to
    persistent 405 errors.

    Returns (ok, message).
    """
    # Check if already merged/closed on Forgejo (prevents 405 on re-merge attempts)
    pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
    if pr_info:
        if pr_info.get("merged"):
            logger.info("PR #%d already merged on Forgejo, syncing status", pr_number)
            return True, "already merged"
        if pr_info.get("state") == "closed":
            logger.warning("PR #%d closed on Forgejo but not merged", pr_number)
            return False, "PR closed without merge"
    # Merge whitelist only allows leo and m3taversal — use Leo's token
    leo_token = get_agent_token("leo")
    if not leo_token:
        return False, "no leo token for merge (merge whitelist requires leo)"
    # Pre-flight: verify approvals exist before attempting merge (Rhea: catches 405)
    reviews = await forgejo_api("GET", repo_path(f"pulls/{pr_number}/reviews"))
    if reviews is not None:
        approval_count = sum(1 for r in reviews if r.get("state") == "APPROVED")
        if approval_count < 2:
            logger.info("PR #%d: only %d/2 approvals, resubmitting before merge", pr_number, approval_count)
            await _resubmit_approvals(pr_number)
    # Retry with backoff + jitter for transient errors (Rhea: jitter prevents thundering herd)
    delays = [0, 5, 15, 45]
    for attempt, base_delay in enumerate(delays, 1):
        if base_delay:
            # Jitter in [0.8, 1.2) of the base delay.
            jittered = base_delay * (0.8 + random.random() * 0.4)
            await asyncio.sleep(jittered)
        result = await forgejo_api(
            "POST",
            repo_path(f"pulls/{pr_number}/merge"),
            {"Do": "merge", "merge_message_field": ""},
            token=leo_token,
        )
        if result is not None:
            return True, "merged"
        # Check if merge succeeded despite API error (timeout case — Rhea)
        pr_check = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
        if pr_check and pr_check.get("merged"):
            return True, "already merged"
        # Distinguish transient from permanent failures (Ganymede)
        if pr_check and not pr_check.get("mergeable", True):
            # PR not mergeable — branch diverged or conflict. Rebase needed, not retry.
            return False, "merge rejected: PR not mergeable (needs rebase)"
        if attempt < len(delays):
            logger.info("PR #%d: merge attempt %d failed (transient), retrying in %.0fs", pr_number, attempt, delays[attempt] if attempt < len(delays) else 0)
    return False, "Forgejo merge API failed after 4 attempts (transient)"


async def _delete_remote_branch(branch: str):
    """Delete remote branch immediately after merge. (Ganymede Q4: immediate, not batch)

    If DELETE fails, log and move on — stale branch is cosmetic, stale merge
    is operational.
    """
    result = await forgejo_api(
        "DELETE",
        repo_path(f"branches/{branch}"),
    )
    if result is None:
        logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)


# --- Contributor attribution ---


def _is_knowledge_pr(diff: str) -> bool:
    """Check if a PR touches knowledge files (claims, decisions, core, foundations).

    Knowledge PRs get full CI attribution weight. Pipeline-only PRs (inbox,
    entities, agents, archive) get zero CI weight. Mixed PRs count as
    knowledge — if a PR adds a claim, it gets attribution even if it also
    moves source files. Knowledge takes priority. (Ganymede review)
    """
    knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
    for line in diff.split("\n"):
        # Unified-diff file headers: "+++ b/<path>" / "--- a/<path>".
        if line.startswith("+++ b/") or line.startswith("--- a/"):
            path = line.split("/", 1)[1] if "/" in line else ""
            if any(path.startswith(p) for p in knowledge_prefixes):
                return True
    return False


def _refine_commit_type(diff: str, branch_commit_type: str) -> str:
    """Refine commit_type from diff content when branch prefix is ambiguous.

    Branch prefix gives initial classification (extract, research, entity, etc.).
    For 'extract' branches, diff content can distinguish:
    - challenge: adds challenged_by edges to existing claims
    - enrich: modifies existing claim frontmatter without new files
    - extract: creates new claim files (default for extract branches)

    Only refines 'extract' type — other branch types (research, entity,
    reweave, fix) are already specific enough.
    """
    if branch_commit_type != "extract":
        return branch_commit_type
    new_files = 0
    modified_files = 0
    has_challenge_edge = False
    in_diff_header = False
    current_is_new = False
    for line in diff.split("\n"):
        if line.startswith("diff --git"):
            in_diff_header = True
            current_is_new = False
        elif line.startswith("new file"):
            current_is_new = True
        elif line.startswith("+++ b/"):
            path = line[6:]
            if any(path.startswith(p) for p in ("domains/", "core/", "foundations/")):
                if current_is_new:
                    new_files += 1
                else:
                    modified_files += 1
            in_diff_header = False
        elif line.startswith("+") and not line.startswith("+++"):
            if "challenged_by:" in line or "challenges:" in line:
                has_challenge_edge = True
    if has_challenge_edge and new_files == 0:
        return "challenge"
    if modified_files > 0 and new_files == 0:
        return "enrich"
    return "extract"


async def _record_contributor_attribution(conn, pr_number: int, branch: str):
    """Record contributor attribution after a successful merge.

    Parses git trailers and claim frontmatter to identify contributors and
    their roles. Upserts into contributors table. Refines commit_type from
    diff content. Pipeline-only PRs (no knowledge files) are skipped.
    """
    import re as _re
    from datetime import date as _date, datetime as _dt

    today = _date.today().isoformat()
    # Get the PR diff to parse claim frontmatter for attribution blocks
    diff = await get_pr_diff(pr_number)
    if not diff:
        return
    # Pipeline-only PRs (inbox, entities, agents) don't count toward CI
    if not _is_knowledge_pr(diff):
        logger.info("PR #%d: pipeline-only commit — skipping CI attribution", pr_number)
        return
    # Refine commit_type from diff content (branch prefix may be too broad)
    row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone()
    branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
    refined_type = _refine_commit_type(diff, branch_type)
    if refined_type != branch_type:
        conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
        logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type)
    # Parse Pentagon-Agent trailer from branch commit messages
    agents_found: set[str] = set()
    rc, log_output = await _git(
        "log",
        f"origin/main..origin/{branch}",
        "--format=%b%n%N",
        timeout=10,
    )
    if rc == 0:
        for match in _re.finditer(r"Pentagon-Agent:\s*(\S+)\s*<([^>]+)>", log_output):
            agent_name = match.group(1).lower()
            agent_uuid = match.group(2)
            _upsert_contributor(
                conn,
                agent_name,
                agent_uuid,
                "extractor",
                today,
            )
            agents_found.add(agent_name)
    # Parse attribution blocks from claim frontmatter in diff
    # Look for added lines with attribution YAML
    current_role = None
    for line in diff.split("\n"):
        if not line.startswith("+") or line.startswith("+++"):
            continue
        stripped = line[1:].strip()
        # Detect role sections in attribution block
        for role in ("sourcer", "extractor", "challenger", "synthesizer", "reviewer"):
            if stripped.startswith(f"{role}:"):
                current_role = role
                break
        # Extract handle from attribution entries
        handle_match = _re.match(r'-\s*handle:\s*["\']?([^"\']+)["\']?', stripped)
        if handle_match and current_role:
            handle = handle_match.group(1).strip().lower()
            agent_id_match = _re.search(r'agent_id:\s*["\']?([^"\']+)', stripped)
            agent_id = agent_id_match.group(1).strip() if agent_id_match else None
            _upsert_contributor(conn, handle, agent_id, current_role, today)
    # Fallback: if no attribution block found, credit the branch agent as extractor
    if not agents_found:
        # Try to infer agent from branch name (e.g., "extract/2026-03-05-...")
        # The PR's agent field in SQLite is also available
        row = conn.execute("SELECT agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
        if row and row["agent"]:
            _upsert_contributor(conn, row["agent"].lower(), None, "extractor", today)
    # Increment claims_merged for all contributors on this PR
    # (handled inside _upsert_contributor via the role counts)


def _upsert_contributor(
    conn,
    handle: str,
    agent_id: str | None,
    role: str,
    date_str: str,
):
    """Upsert a contributor record, incrementing the appropriate role count.

    claims_merged is bumped only for 'extractor'/'sourcer' roles (the CASE
    expression in the SQL below); other roles increment their count only.
    """
    import json as _json
    from datetime import datetime as _dt

    role_col = f"{role}_count"
    # Whitelist the column name — role_col is interpolated into SQL below.
    if role_col not in (
        "sourcer_count",
        "extractor_count",
        "challenger_count",
        "synthesizer_count",
        "reviewer_count",
    ):
        logger.warning("Unknown contributor role: %s", role)
        return
    existing = conn.execute(
        "SELECT handle FROM contributors WHERE handle = ?", (handle,)
    ).fetchone()
    if existing:
        conn.execute(
            f"""UPDATE contributors SET {role_col} = {role_col} + 1,
                claims_merged = claims_merged + CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END,
                last_contribution = ?, updated_at = datetime('now')
                WHERE handle = ?""",
            (role, date_str, handle),
        )
    else:
        conn.execute(
            f"""INSERT INTO contributors (handle, agent_id, first_contribution, last_contribution, {role_col}, claims_merged)
                VALUES (?, ?, ?, ?, 1, CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END)""",
            (handle, agent_id, date_str, date_str, role),
        )
    # Recalculate tier
    _recalculate_tier(conn, handle)


def _recalculate_tier(conn, handle: str):
    """Recalculate contributor tier based on config rules."""
    from datetime import date as _date, datetime as _dt

    row = conn.execute(
        "SELECT claims_merged, challenges_survived, first_contribution, tier FROM contributors WHERE handle = ?",
        (handle,),
    ).fetchone()
    if not row:
        return
    current_tier = row["tier"]
    claims_merged = row["claims_merged"] or 0
    challenges_survived = row["challenges_survived"] or 0
    first_contribution = row["first_contribution"]
    days_since_first = 0
    if first_contribution:
        try:
            first_date = _dt.strptime(first_contribution, "%Y-%m-%d").date()
            days_since_first = (_date.today() - first_date).days
        except ValueError:
            # Malformed date in DB — treat as day 0 rather than crash.
            pass
    # Check veteran first (higher tier)
    vet_rules = config.CONTRIBUTOR_TIER_RULES["veteran"]
    if (claims_merged >= vet_rules["claims_merged"]
            and days_since_first >= vet_rules["min_days_since_first"]
            and challenges_survived >= vet_rules["challenges_survived"]):
        new_tier = "veteran"
    elif claims_merged >= config.CONTRIBUTOR_TIER_RULES["contributor"]["claims_merged"]:
        new_tier = "contributor"
    else:
        new_tier = "new"
    if new_tier != current_tier:
        conn.execute(
            "UPDATE contributors SET tier = ?, updated_at = datetime('now') WHERE handle = ?",
            (new_tier, handle),
        )
        logger.info("Contributor %s: tier %s → %s", handle, current_tier, new_tier)
        db.audit(
            conn,
            "contributor",
            "tier_change",
            json.dumps({"handle": handle, "from": current_tier, "to": new_tier}),
        )


# --- Source archiving after merge (Ganymede review: closes near-duplicate loop) ---

# Accumulates source moves during a merge cycle, batch-committed at the end
_pending_source_moves: list[tuple[str, str]] = []  # (queue_path, archive_path)


def _update_source_frontmatter_status(path: str, new_status: str):
    """Update the status field in a source file's frontmatter. (Ganymede: 5 lines)"""
    import re as _re

    try:
        text = open(path).read()
        # Only the first "status:" line in the file is rewritten (count=1).
        text = _re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=_re.MULTILINE)
        open(path, "w").write(text)
    except Exception as e:
        logger.warning("Failed to update source status in %s: %s", path, e)


async def _embed_merged_claims(main_sha: str, branch_sha: str):
    """Embed new/changed claim files from a merged PR into Qdrant.

    Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
    to find ALL changed files across the entire branch, not just the last commit.
    Also deletes Qdrant vectors for files removed by the branch.

    Non-fatal — embedding failure does not block the merge pipeline.
    """
    try:
        # --- Embed added/changed files ---
        rc, diff_out = await _git(
            "diff",
            "--name-only",
            "--diff-filter=ACMR",
            main_sha,
            branch_sha,
            cwd=str(config.MAIN_WORKTREE),
            timeout=10,
        )
        if rc != 0:
            logger.warning("embed: diff failed (rc=%d), skipping", rc)
            return
        embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
        # Underscore-prefixed basenames are non-claim support files — skip them.
        md_files = [
            f
            for f in diff_out.strip().split("\n")
            if f.endswith(".md")
            and any(f.startswith(d) for d in embed_dirs)
            and not f.split("/")[-1].startswith("_")
        ]
        embedded = 0
        for fpath in md_files:
            full_path = config.MAIN_WORKTREE / fpath
            if not full_path.exists():
                continue
            proc = await asyncio.create_subprocess_exec(
                "python3",
                "/opt/teleo-eval/embed-claims.py",
                "--file",
                str(full_path),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # NOTE(review): on wait_for timeout the child is not killed here; the
            # TimeoutError is swallowed by the outer except — confirm acceptable.
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
            if proc.returncode == 0 and b"OK" in stdout:
                embedded += 1
            else:
                logger.warning("embed: failed for %s: %s", fpath, stderr.decode()[:200])
        if embedded:
            logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
        # --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
        rc, del_out = await _git(
            "diff",
            "--name-only",
            "--diff-filter=D",
            main_sha,
            branch_sha,
            cwd=str(config.MAIN_WORKTREE),
            timeout=10,
        )
        if rc == 0 and del_out.strip():
            deleted_files = [
                f
                for f in del_out.strip().split("\n")
                if f.endswith(".md") and any(f.startswith(d) for d in embed_dirs)
            ]
            if deleted_files:
                import hashlib

                # Point IDs are md5 of the repo-relative path (must match embed-claims.py).
                point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
                try:
                    import urllib.request

                    req = urllib.request.Request(
                        "http://localhost:6333/collections/teleo-claims/points/delete",
                        data=json.dumps({"points": point_ids}).encode(),
                        headers={"Content-Type": "application/json"},
                        method="POST",
                    )
                    urllib.request.urlopen(req, timeout=10)
                    logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
                except Exception:
                    logger.warning("embed: failed to delete stale vectors (non-fatal)")
    except Exception:
        logger.exception("embed: post-merge embedding failed (non-fatal)")


def _archive_source_for_pr(branch: str, domain: str, merged: bool = True):
    """Move source from queue/ to archive/{domain}/ after PR merge or close.

    Only handles extract/ branches (Ganymede: skip research sessions).
    Updates frontmatter: 'processed' for merged, 'rejected' for closed.
    Accumulates moves for batch commit at end of merge cycle.
    """
    if not branch.startswith("extract/"):
        return
    source_slug = branch.replace("extract/", "", 1)
    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
    queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
    archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
    archive_path = os.path.join(archive_dir, f"{source_slug}.md")
    # Already in archive? Delete queue duplicate
    if os.path.exists(archive_path):
        if os.path.exists(queue_path):
            try:
                os.remove(queue_path)
                _pending_source_moves.append((queue_path, "deleted"))
                logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
            except Exception as e:
                logger.warning("Source dedup failed: %s", e)
        return
    # Move from queue to archive
    if os.path.exists(queue_path):
        # Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
        _update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
        os.makedirs(archive_dir, exist_ok=True)
        try:
            shutil.move(queue_path, archive_path)
            _pending_source_moves.append((queue_path, archive_path))
            logger.info("Source archived: queue/%s → archive/%s/ (status=%s)", source_slug, domain, "processed" if merged else "rejected")
        except Exception as e:
            logger.warning("Source archive failed: %s", e)


async def _commit_source_moves():
    """Batch commit accumulated source moves. Called at end of merge cycle.

    Rhea review: fetch+reset before touching files, use main_worktree_lock,
    crash gap is self-healing (reset --hard reverts uncommitted moves).
    """
    if not _pending_source_moves:
        return
    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
    count = len(_pending_source_moves)
    _pending_source_moves.clear()
    # Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
    try:
        async with async_main_worktree_lock(timeout=10):
            # Sync worktree with remote (Rhea: fetch+reset, not pull)
            # NOTE(review): this reset --hard also reverts the still-uncommitted
            # archive moves made earlier in this cycle by _archive_source_for_pr,
            # so the subsequent add/commit may find nothing to commit — verify
            # the intended ordering of sync vs. move.
            await _git("fetch", "origin", "main", cwd=main_dir, timeout=30)
            await _git("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
            await _git("add", "-A", "inbox/", cwd=main_dir)
            rc, out = await _git(
                "commit",
                "-m",
                f"pipeline: archive {count} source(s) post-merge\n\n"
                f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
                cwd=main_dir,
            )
            if rc != 0:
                if "nothing to commit" in out:
                    return
                logger.warning("Source archive commit failed: %s", out)
                return
            for attempt in range(3):
                await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
                rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
                if rc_push == 0:
                    logger.info("Committed + pushed %d source archive moves", count)
                    return
                await asyncio.sleep(2)
            logger.warning("Failed to push source archive moves after 3 attempts")
            await _git("reset", "--hard", "origin/main", cwd=main_dir)
    except TimeoutError:
        logger.warning("Source archive commit skipped: worktree lock timeout")


# --- Domain merge task ---


async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
    """Process the merge queue for a single domain. Returns (succeeded, failed)."""
    succeeded = 0
    failed = 0
    while True:
        # Hold the domain lock across claim+merge — fast-path serialization;
        # the NOT EXISTS clause in _claim_next_pr is the durable backstop.
        async with _domain_locks[domain]:
            pr = await _claim_next_pr(conn, domain)
            if not pr:
                break  # No more approved PRs for this domain
            pr_num = pr["number"]
            branch = pr["branch"]
            logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain)
            try:
                # Cherry-pick onto fresh main (replaces rebase-retry — Leo+Cory directive)
                # Extraction commits ADD new files, so cherry-pick applies cleanly.
                # Rebase failed ~23% of the time due to main moving during replay.
                pick_ok, pick_msg = await asyncio.wait_for(
                    _cherry_pick_onto_main(branch),
                    timeout=MERGE_TIMEOUT_SECONDS,
                )
            except asyncio.TimeoutError:
                logger.error(
                    "PR #%d merge timed out after %ds — resetting to conflict (Rhea)",
                    pr_num,
                    MERGE_TIMEOUT_SECONDS,
                )
                conn.execute(
                    "UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
                    (f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num),
                )
                db.audit(conn, "merge", "timeout", json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS}))
                failed += 1
                continue
            if not pick_ok:
                # Cherry-pick failed — this is a genuine conflict (not a race condition).
                # No retry needed: cherry-pick onto fresh main means main can't have moved.
                logger.warning("PR #%d cherry-pick failed: %s", pr_num, pick_msg)
                conn.execute(
                    "UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
                    (pick_msg[:500], pr_num),
                )
                db.audit(conn, "merge", "cherry_pick_failed", json.dumps({"pr": pr_num, "error": pick_msg[:200]}))
                failed += 1
                continue
            # Local ff-merge: push cherry-picked branch as main (Rhea's approach, Leo+Rhea: local primary)
            # The branch was just cherry-picked onto origin/main,
            # so origin/{branch} is a descendant of origin/main. Push it as main.
            await _git("fetch", "origin", branch, timeout=15)
            rc, main_sha = await _git("rev-parse", "origin/main")
            main_sha = main_sha.strip() if rc == 0 else ""
            rc, branch_sha = await _git("rev-parse", f"origin/{branch}")
            branch_sha = branch_sha.strip() if rc == 0 else ""
            merge_ok = False
            merge_msg = ""
            if branch_sha:
                rc, out = await _git(
                    "push",
                    f"--force-with-lease=main:{main_sha}",
                    "origin",
                    f"{branch_sha}:main",
                    timeout=30,
                )
                if rc == 0:
                    merge_ok = True
                    merge_msg = f"merged (local ff-push, SHA: {branch_sha[:8]})"
                    # Close PR on Forgejo with merge SHA comment
                    leo_token = get_agent_token("leo")
                    await forgejo_api(
                        "POST",
                        repo_path(f"issues/{pr_num}/comments"),
                        {"body": f"Merged locally.\nMerge SHA: `{branch_sha}`\nBranch: `{branch}`"},
                    )
                    await forgejo_api(
                        "PATCH",
                        repo_path(f"pulls/{pr_num}"),
                        {"state": "closed"},
                        token=leo_token,
                    )
                else:
                    merge_msg = f"local ff-push failed: {out[:200]}"
            else:
                merge_msg = f"could not resolve origin/{branch}"
            if not merge_ok:
                logger.error("PR #%d merge failed: %s", pr_num, merge_msg)
                conn.execute(
                    "UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
                    (merge_msg[:500], pr_num),
                )
                db.audit(conn, "merge", "merge_failed", json.dumps({"pr": pr_num, "error": merge_msg[:200]}))
                failed += 1
                continue
            # Success — update status and cleanup
            conn.execute(
                """UPDATE prs SET status = 'merged', merged_at = datetime('now'), last_error = NULL WHERE number = ?""",
                (pr_num,),
            )
            db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch}))
            logger.info("PR #%d merged successfully", pr_num)
            # Record contributor attribution
            try:
                await _record_contributor_attribution(conn, pr_num, branch)
            except Exception:
                logger.exception("PR #%d: contributor attribution failed (non-fatal)", pr_num)
            # Archive source file (closes near-duplicate loop — Ganymede review)
            _archive_source_for_pr(branch, domain)
            # Embed new/changed claims into Qdrant (non-fatal)
            await _embed_merged_claims(main_sha, branch_sha)
            # Cascade: notify agents whose beliefs/positions depend on changed claims
            try:
                cascaded = await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE)
                if cascaded:
                    logger.info("PR #%d: %d cascade notifications sent", pr_num, cascaded)
            except Exception:
                logger.exception("PR #%d: cascade check failed (non-fatal)", pr_num)
            # Delete remote branch immediately (Ganymede Q4)
            await _delete_remote_branch(branch)
            # Prune local worktree metadata
            await _git("worktree", "prune")
            succeeded += 1
    return succeeded, failed


# --- Main entry point ---


async def _reconcile_db_state(conn):
    """Reconcile pipeline DB against Forgejo's actual PR state.

    Fixes ghost PRs: DB says 'conflict' or 'open' but Forgejo says merged/closed.
    Also detects deleted branches (rev-parse failures). (Leo's structural fix #1)
    Run at the start of each merge cycle.
""" stale = conn.execute( "SELECT number, branch, status FROM prs WHERE status IN ('conflict', 'open', 'reviewing', 'approved')" ).fetchall() if not stale: return reconciled = 0 for row in stale: pr_number = row["number"] branch = row["branch"] db_status = row["status"] # Check Forgejo PR state pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}")) if not pr_info: continue forgejo_state = pr_info.get("state", "") is_merged = pr_info.get("merged", False) if is_merged and db_status != "merged": conn.execute( "UPDATE prs SET status = 'merged', merged_at = datetime('now') WHERE number = ?", (pr_number,), ) reconciled += 1 continue if forgejo_state == "closed" and not is_merged and db_status not in ("closed",): # Agent PRs get merged via git push (not Forgejo merge API), so # Forgejo shows merged=False. Check if branch content is on main. if db_status == "approved" and branch: # Agent merges are ff-push — no merge commit exists. # Check if branch tip is an ancestor of main (content is on main). 
                rc, branch_sha = await _git(
                    "rev-parse",
                    f"origin/{branch}",
                    timeout=10,
                )
                if rc == 0 and branch_sha.strip():
                    # merge-base --is-ancestor exits 0 iff branch tip is
                    # already contained in origin/main.
                    rc2, _ = await _git(
                        "merge-base",
                        "--is-ancestor",
                        branch_sha.strip(),
                        "origin/main",
                        timeout=10,
                    )
                    if rc2 == 0:
                        conn.execute(
                            "UPDATE prs SET status = 'merged', merged_at = datetime('now') WHERE number = ?",
                            (pr_number,),
                        )
                        logger.info("Reconciled PR #%d: agent-merged (branch tip on main)", pr_number)
                        reconciled += 1
                        continue
            # Closed on Forgejo and content not proven on main: mark closed.
            conn.execute(
                "UPDATE prs SET status = 'closed', last_error = 'reconciled: closed on Forgejo' WHERE number = ?",
                (pr_number,),
            )
            reconciled += 1
            continue
        # Ghost PR detection: branch deleted but PR still open in DB (Fix #2)
        # Ganymede: rc != 0 means remote unreachable — skip, don't close
        if db_status in ("open", "reviewing") and branch:
            rc, ls_out = await _git("ls-remote", "--heads", "origin", branch, timeout=10)
            if rc != 0:
                logger.warning("ls-remote failed for %s — skipping ghost check", branch)
                continue
            if not ls_out.strip():
                # Branch gone — close PR on Forgejo and in DB (Ganymede: don't leave orphans)
                await forgejo_api(
                    "PATCH",
                    repo_path(f"pulls/{pr_number}"),
                    body={"state": "closed"},
                )
                await forgejo_api(
                    "POST",
                    repo_path(f"issues/{pr_number}/comments"),
                    body={"body": "Auto-closed: branch deleted from remote."},
                )
                conn.execute(
                    "UPDATE prs SET status = 'closed', last_error = 'reconciled: branch deleted' WHERE number = ?",
                    (pr_number,),
                )
                logger.info("Ghost PR #%d: branch %s deleted, closing", pr_number, branch)
                reconciled += 1
    if reconciled:
        logger.info("Reconciled %d stale PRs against Forgejo state", reconciled)


MAX_CONFLICT_REBASE_ATTEMPTS = 3


async def _handle_permanent_conflicts(conn) -> int:
    """Close conflict_permanent PRs and file their sources correctly.

    When a PR fails rebase 3x, the claims are already on main from the first
    successful extraction. The source should live in archive/{domain}/ (one
    copy). Any duplicate in queue/ gets deleted. No requeuing — breaks the
    infinite loop.

    Hygiene (Cory): one source file, one location, no duplicates.
    Reviewed by Ganymede: commit moves, use shutil.move, batch commit at end.

    Returns the number of PRs handled.
    """
    rows = conn.execute(
        """SELECT number, branch, domain FROM prs WHERE status = 'conflict_permanent' ORDER BY number ASC"""
    ).fetchall()
    if not rows:
        return 0
    handled = 0
    files_changed = False
    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
    for row in rows:
        pr_number = row["number"]
        branch = row["branch"]
        domain = row["domain"] or "unknown"
        # Close PR on Forgejo and explain why in a comment.
        await forgejo_api(
            "PATCH",
            repo_path(f"pulls/{pr_number}"),
            body={"state": "closed"},
        )
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            body={"body": (
                "Closed by conflict auto-resolver: rebase failed 3 times (enrichment conflict). "
                "Claims already on main from prior extraction. Source filed in archive."
            )},
        )
        await _delete_remote_branch(branch)
        # File the source: one copy in archive/{domain}/, delete duplicates.
        # Only extract/* branches map back to a source file.
        source_slug = branch.replace("extract/", "", 1) if branch.startswith("extract/") else None
        if source_slug:
            filename = f"{source_slug}.md"
            archive_dir = os.path.join(main_dir, "inbox", "archive", domain)
            archive_path = os.path.join(archive_dir, filename)
            queue_path = os.path.join(main_dir, "inbox", "queue", filename)
            already_archived = os.path.exists(archive_path)
            if already_archived:
                # Archive copy wins; anything left in queue/ is a duplicate.
                if os.path.exists(queue_path):
                    try:
                        os.remove(queue_path)
                        logger.info("PR #%d: deleted queue duplicate %s (already in archive/%s)", pr_number, filename, domain)
                        files_changed = True
                    except Exception as e:
                        logger.warning("PR #%d: failed to delete queue duplicate: %s", pr_number, e)
                else:
                    logger.info("PR #%d: source already in archive/%s, no cleanup needed", pr_number, domain)
            else:
                if os.path.exists(queue_path):
                    os.makedirs(archive_dir, exist_ok=True)
                    try:
                        shutil.move(queue_path, archive_path)
                        logger.info("PR #%d: filed source to archive/%s: %s", pr_number, domain, filename)
                        files_changed = True
                    except Exception as e:
                        logger.warning("PR #%d: failed to file source: %s", pr_number, e)
                else:
                    logger.warning("PR #%d: source not found in queue or archive for %s", pr_number, filename)
            # Clear batch-state marker (best-effort)
            state_marker = f"/opt/teleo-eval/batch-state/{source_slug}.done"
            try:
                if os.path.exists(state_marker):
                    os.remove(state_marker)
            except Exception:
                pass
        conn.execute(
            "UPDATE prs SET status = 'closed', last_error = 'conflict_permanent: closed + filed in archive' WHERE number = ?",
            (pr_number,),
        )
        handled += 1
        logger.info("Permanent conflict handled: PR #%d closed, source filed", pr_number)
    # Batch commit source moves to main (Ganymede: follow entity_batch pattern)
    if files_changed:
        await _git("add", "-A", "inbox/", cwd=main_dir)
        rc, out = await _git(
            "commit", "-m",
            f"pipeline: archive {handled} conflict-closed source(s)\n\n"
            f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
            cwd=main_dir,
        )
        if rc == 0:
            # Push with pull-rebase retry (entity_batch pattern)
            for attempt in range(3):
                await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
                rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
                if rc_push == 0:
                    logger.info("Committed + pushed source archive moves for %d PRs", handled)
                    break
                await asyncio.sleep(2)
            else:
                # for-else: all three attempts failed — drop the local commit.
                logger.warning("Failed to push source archive moves after 3 attempts")
                await _git("reset", "--hard", "origin/main", cwd=main_dir)
    if handled:
        logger.info("Handled %d permanent conflict PRs (closed + filed)", handled)
    return handled


async def _retry_conflict_prs(conn) -> tuple[int, int]:
    """Retry conflict PRs via cherry-pick onto fresh main.

    Design: Ganymede (extend merge stage), Rhea (safety guards), Leo (re-eval required).

    - Pick up PRs with status='conflict' and both approvals
    - Cherry-pick extraction commits onto fresh branch from origin/main
    - If cherry-pick succeeds: force-push, reset to 'open' with verdicts cleared for re-eval
    - If cherry-pick fails: increment attempt counter, leave as 'conflict'
    - After MAX_CONFLICT_REBASE_ATTEMPTS failures: mark 'conflict_permanent'
    - Skip branches with new commits since conflict was set (Rhea: someone is working on it)

    Returns (resolved, failed).
    """
    rows = conn.execute(
        """SELECT number, branch, conflict_rebase_attempts FROM prs WHERE status = 'conflict' AND COALESCE(conflict_rebase_attempts, 0) < ? ORDER BY number ASC""",
        (MAX_CONFLICT_REBASE_ATTEMPTS,),
    ).fetchall()
    if not rows:
        return 0, 0
    resolved = 0
    failed = 0
    for row in rows:
        pr_number = row["number"]
        branch = row["branch"]
        attempts = row["conflict_rebase_attempts"] or 0
        logger.info("Conflict retry [%d/%d] PR #%d branch=%s", attempts + 1, MAX_CONFLICT_REBASE_ATTEMPTS, pr_number, branch)
        # Fetch latest remote state
        await _git("fetch", "origin", branch, timeout=30)
        await _git("fetch", "origin", "main", timeout=30)
        # Attempt cherry-pick onto fresh main (replaces rebase — Leo+Cory directive)
        ok, msg = await _cherry_pick_onto_main(branch)
        if ok:
            # Rebase succeeded — reset for re-eval (Ganymede: approvals are stale after rebase)
            conn.execute(
                """UPDATE prs SET status = 'open', leo_verdict = 'pending', domain_verdict = 'pending', eval_attempts = 0, conflict_rebase_attempts = ? WHERE number = ?""",
                (attempts + 1, pr_number),
            )
            logger.info("Conflict resolved: PR #%d rebased successfully, reset for re-eval", pr_number)
            resolved += 1
        else:
            new_attempts = attempts + 1
            if new_attempts >= MAX_CONFLICT_REBASE_ATTEMPTS:
                # Exhausted retries — _handle_permanent_conflicts will close it.
                conn.execute(
                    """UPDATE prs SET status = 'conflict_permanent', conflict_rebase_attempts = ?, last_error = ? WHERE number = ?""",
                    (new_attempts, f"rebase failed {MAX_CONFLICT_REBASE_ATTEMPTS}x: {msg[:200]}", pr_number),
                )
                logger.warning("Conflict permanent: PR #%d failed %d rebase attempts: %s", pr_number, new_attempts, msg[:100])
            else:
                conn.execute(
                    """UPDATE prs SET conflict_rebase_attempts = ?, last_error = ? WHERE number = ?""",
                    (new_attempts, f"rebase attempt {new_attempts}: {msg[:200]}", pr_number),
                )
                logger.info("Conflict retry failed: PR #%d attempt %d/%d: %s", pr_number, new_attempts, MAX_CONFLICT_REBASE_ATTEMPTS, msg[:100])
            failed += 1
    if resolved or failed:
        logger.info("Conflict retry: %d resolved, %d failed", resolved, failed)
    return resolved, failed


async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one merge cycle across all domains.

    0. Reconcile DB state against Forgejo (catch ghost PRs)
    0.5. Retry conflict PRs (rebase onto current main)
    1. Discover external PRs (multiplayer v1)
    2. Find all domains with approved PRs
    3. Launch one async task per domain (cross-domain parallel, same-domain serial)

    Returns (total_succeeded, total_failed).
    """
    # NOTE(review): max_workers is currently unused — TODO confirm whether a
    # concurrency cap per cycle was planned.
    # Step 0: Reconcile stale DB entries
    await _reconcile_db_state(conn)
    # Step 0.5: Retry conflict PRs (Ganymede: before normal merge, same loop)
    await _retry_conflict_prs(conn)
    # Step 0.6: Handle permanent conflicts (close + file sources in archive —
    # no requeue, per _handle_permanent_conflicts)
    await _handle_permanent_conflicts(conn)
    # Step 1: Discover external PRs
    await discover_external_prs(conn)
    # Step 2: Find domains with approved work
    rows = conn.execute("SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL").fetchall()
    domains = [r["domain"] for r in rows]
    # Also check for NULL-domain PRs (human PRs with undetected domain)
    null_domain = conn.execute("SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL").fetchone()
    if null_domain and null_domain["c"] > 0:
        logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", null_domain["c"])
    if not domains:
        return 0, 0
    # Step 3: Merge all domains concurrently
tasks = [_merge_domain_queue(conn, domain) for domain in domains] results = await asyncio.gather(*tasks, return_exceptions=True) total_succeeded = 0 total_failed = 0 for i, result in enumerate(results): if isinstance(result, Exception): logger.exception("Domain %s merge failed with exception", domains[i]) total_failed += 1 else: s, f = result total_succeeded += s total_failed += f if total_succeeded or total_failed: logger.info( "Merge cycle: %d succeeded, %d failed across %d domains", total_succeeded, total_failed, len(domains) ) # Batch commit source moves (Ganymede: one commit per cycle, not per PR) await _commit_source_moves() return total_succeeded, total_failed