teleo-infrastructure/lib/merge.py
m3taversal 26a8b15f56
Some checks are pending
CI / lint-and-test (push) Waiting to run
fix: skip merge commits in cherry-pick to prevent fork workflow content loss
External contributors who run `git merge main` create merge commits that
cherry-pick can't handle without -m flag. --no-merges filters these out.
Added detection for branches with only merge commits but real content diff.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 18:04:45 +01:00

1217 lines
52 KiB
Python

"""Merge stage — domain-serialized priority queue with rebase-before-merge.
Design reviewed by Ganymede (round 2) and Rhea. Key decisions:
- Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery)
- Rebase-before-merge with pinned force-with-lease SHA (Ganymede)
- Priority queue: COALESCE(p.priority, s.priority, 'medium') — PR > source > default
- Human PRs default to 'high', not 'critical' (Ganymede — prevents DoS on pipeline)
- 5-minute merge timeout — force-reset to 'conflict' (Rhea)
- Ack comment on human PR discovery (Rhea)
- Pagination on all Forgejo list endpoints (Ganymede standing rule)
"""
import asyncio
import json
import logging
import os
import random
import re
import shutil
from collections import defaultdict
from . import config, db
from .db import classify_branch
from .contributor import record_contributor_attribution
from .dedup import dedup_evidence_blocks
from .domains import detect_domain_from_branch
from .forgejo import api as forgejo_api
from .pr_state import close_pr, mark_conflict, mark_conflict_permanent, mark_merged, reopen_pr
# Pipeline-owned branch prefixes — only these get auto-merged.
# Agent branches (theseus/*, rio/*, astra/*, etc.) stay approved but are NOT
# rebased/force-pushed/auto-merged. Agents merge their own PRs.
# Derived from BRANCH_PREFIX_MAP where agent in ("pipeline", "epimetheus").
# (Leo directive: PRs #2141, #157, #2142, #2180 were orphaned by pipeline rebase)
PIPELINE_OWNED_PREFIXES = ("extract/", "ingestion/", "epimetheus/", "reweave/", "fix/")
# Safety check: agent branches MUST NOT be in PIPELINE_OWNED_PREFIXES.
# Auto-merge on eval approval bypasses Leo's review gate.
# Agent PRs use auto_merge flag instead (set by evaluate.py after two-reviewer approval).
# NOTE: raise explicitly rather than `assert` — assertions are stripped under
# `python -O`, and this gate must run in every interpreter mode.
_AGENT_NAMES = ("theseus", "rio", "astra", "vida", "clay", "leo", "argus", "oberon", "rhea", "ganymede")
for _prefix in PIPELINE_OWNED_PREFIXES:
    for _agent in _AGENT_NAMES:
        if _prefix.startswith(f"{_agent}/"):
            raise RuntimeError(
                f"FATAL: Agent prefix '{_agent}/' found in PIPELINE_OWNED_PREFIXES — this bypasses Leo's review gate"
            )
from .cascade import cascade_after_merge
from .cross_domain import cross_domain_after_merge
from .forgejo import get_agent_token, get_pr_diff, repo_path
# Module-level logger, namespaced under the pipeline hierarchy.
logger = logging.getLogger("pipeline.merge")
# In-memory domain locks — fast path, lost on crash (durable layer is prs.status).
# defaultdict creates a fresh Lock on first access per domain name.
_domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# Bare repo lock — serializes worktree add/remove to prevent config.lock contention
_bare_repo_lock = asyncio.Lock()
# Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea)
MERGE_TIMEOUT_SECONDS = 300  # 5 minutes
# --- Git helpers ---
async def _git(*args, cwd: str | None = None, timeout: int = 60) -> tuple[int, str]:
    """Run a git command asynchronously.

    Args:
        *args: arguments passed to the ``git`` executable.
        cwd: working directory; defaults to ``config.REPO_DIR``.
             (Fixed implicit-Optional annotation: was ``cwd: str = None``.)
        timeout: seconds to wait before killing the process.

    Returns:
        (returncode, combined stdout+stderr text). returncode is -1 on timeout.
    """
    proc = await asyncio.create_subprocess_exec(
        "git",
        *args,
        cwd=cwd or str(config.REPO_DIR),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the killed process so it doesn't linger as a zombie
        return -1, f"git {args[0]} timed out after {timeout}s"
    # errors="replace": git output (filenames, diff hunks) is not guaranteed to be
    # valid UTF-8 — a strict decode here would crash the merge on odd bytes.
    output = (stdout or b"").decode(errors="replace").strip()
    if stderr:
        output += "\n" + stderr.decode(errors="replace").strip()
    return proc.returncode, output
# --- PR Discovery (Multiplayer v1) ---
async def discover_external_prs(conn) -> int:
    """Scan Forgejo for open PRs not tracked in SQLite.

    Human PRs (non-pipeline author) get priority 'high' and origin 'human'.
    Critical is reserved for explicit human override only. (Ganymede)
    Pagination on all Forgejo list endpoints. (Ganymede standing rule #5)

    Returns the number of newly discovered PRs.
    """
    known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()}
    # Detect origin: pipeline agents have per-agent Forgejo users; anyone else
    # is a human. Hoisted out of the per-PR loop — the set is loop-invariant.
    pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
    discovered = 0
    page = 1
    while True:
        prs = await forgejo_api(
            "GET",
            repo_path(f"pulls?state=open&limit=50&page={page}"),
        )
        if not prs:
            break
        for pr in prs:
            if pr["number"] not in known:
                author = pr.get("user", {}).get("login", "")
                is_pipeline = author.lower() in pipeline_users
                origin = "pipeline" if is_pipeline else "human"
                priority = "high" if origin == "human" else None
                # Domain is only derivable from pipeline branch naming conventions.
                domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
                agent, commit_type = classify_branch(pr["head"]["ref"])
                # For human PRs, submitted_by is the Forgejo author.
                # For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by).
                submitted_by = author if origin == "human" else None
                conn.execute(
                    """INSERT OR IGNORE INTO prs
                    (number, branch, status, origin, priority, domain, agent, commit_type,
                    prompt_version, pipeline_version, submitted_by)
                    VALUES (?, ?, 'open', ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type, config.PROMPT_VERSION, config.PIPELINE_VERSION, submitted_by),
                )
                db.audit(
                    conn,
                    "merge",
                    "pr_discovered",
                    json.dumps(
                        {
                            "pr": pr["number"],
                            "origin": origin,
                            "author": pr.get("user", {}).get("login"),
                            "priority": priority or "inherited",
                        }
                    ),
                )
                # Ack comment on human PRs so contributor feels acknowledged (Rhea)
                if origin == "human":
                    await _post_ack_comment(pr["number"])
                discovered += 1
        if len(prs) < 50:
            break  # Last page
        page += 1
    if discovered:
        logger.info("Discovered %d external PRs", discovered)
    return discovered
async def _post_ack_comment(pr_number: int):
    """Leave an acknowledgment comment on a human-submitted PR. (Rhea)

    The contributor should see an immediate response rather than wondering
    whether their PR vanished into a void.
    """
    await forgejo_api(
        "POST",
        repo_path(f"issues/{pr_number}/comments"),
        {
            "body": (
                "Thanks for the contribution! Your PR is queued for evaluation "
                "(priority: high). Expected review time: ~5 minutes.\n\n"
                "_This is an automated message from the Teleo pipeline._"
            )
        },
    )
# --- Merge operations ---
async def _claim_next_pr(conn, domain: str) -> dict | None:
    """Claim the next approved PR for a domain via atomic UPDATE.

    Priority inheritance: COALESCE(p.priority, s.priority, 'medium')
    - Explicit PR priority (human PRs) > source priority (pipeline) > default medium
    - NULL priorities fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2)
    - This is intentional: unclassified PRs don't jump ahead of triaged ones
      (Rhea: document the precedence for future maintainers)

    NOT EXISTS enforces domain serialization in SQL — defense-in-depth even if
    asyncio.Lock is bypassed. (Ganymede: approved)

    Returns the claimed row as a dict with keys number, source_path, branch,
    domain, or None when nothing is eligible. Uses RETURNING, so this needs
    SQLite 3.35+.
    """
    # Build prefix filter for pipeline-owned branches only.
    # Agent branches stay approved but are NOT auto-merged (Leo: PRs #2141, #157, #2142, #2180).
    # Agent PRs can still be claimed via the auto_merge flag (OR clause below).
    prefix_clauses = " OR ".join("p.branch LIKE ?" for _ in PIPELINE_OWNED_PREFIXES)
    prefix_params = [f"{pfx}%" for pfx in PIPELINE_OWNED_PREFIXES]
    # Single atomic statement: select the best candidate AND flip it to
    # 'merging' in one step, so two concurrent callers can't claim the same PR.
    row = conn.execute(
        f"""UPDATE prs SET status = 'merging', last_attempt = datetime('now')
        WHERE number = (
            SELECT p.number FROM prs p
            LEFT JOIN sources s ON p.source_path = s.path
            WHERE p.status = 'approved'
            AND p.domain = ?
            AND ({prefix_clauses} OR p.auto_merge = 1)
            AND NOT EXISTS (
                SELECT 1 FROM prs p2
                WHERE p2.domain = p.domain
                AND p2.status = 'merging'
            )
            ORDER BY
                CASE COALESCE(p.priority, s.priority, 'medium')
                    WHEN 'critical' THEN 0
                    WHEN 'high' THEN 1
                    WHEN 'medium' THEN 2
                    WHEN 'low' THEN 3
                    ELSE 4
                END,
                -- Dependency ordering: PRs with fewer broken wiki links merge first.
                -- "Creator" PRs (0 broken links) land before "consumer" PRs that
                -- reference them, naturally resolving the dependency chain. (Rhea+Ganymede)
                CASE WHEN p.eval_issues LIKE '%broken_wiki_links%' THEN 1 ELSE 0 END,
                p.created_at ASC
            LIMIT 1
        )
        RETURNING number, source_path, branch, domain""",
        (domain, *prefix_params),
    ).fetchone()
    return dict(row) if row else None
async def _dedup_enriched_files(worktree_path: str) -> int:
    """Scan rebased worktree for duplicate evidence blocks and dedup them.

    Only markdown claim files (under domains/, core/, foundations/) modified
    by this branch vs origin/main are considered. Fixed files are staged and
    folded into the branch's last commit via --amend (no extra commit).

    Returns count of files fixed.
    """
    # Get list of modified claim files in this branch vs origin/main
    rc, out = await _git("diff", "--name-only", "origin/main..HEAD", cwd=worktree_path)
    if rc != 0:
        return 0
    fixed = 0
    for fpath in out.strip().split("\n"):
        fpath = fpath.strip()
        if not fpath or not fpath.endswith(".md"):
            continue
        # Only process claim files (domains/, core/, foundations/)
        if not any(fpath.startswith(p) for p in ("domains/", "core/", "foundations/")):
            continue
        full_path = os.path.join(worktree_path, fpath)
        if not os.path.exists(full_path):
            continue
        # Explicit UTF-8: claim files are UTF-8 markdown — don't let a
        # locale-dependent default encoding corrupt or reject them.
        with open(full_path, "r", encoding="utf-8") as f:
            content = f.read()
        deduped = dedup_evidence_blocks(content)
        if deduped != content:
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(deduped)
            # Stage the fix
            await _git("add", fpath, cwd=worktree_path)
            fixed += 1
    if fixed > 0:
        # Amend the last commit to include dedup fixes (no new commit)
        await _git(
            "-c", "core.editor=true", "commit", "--amend", "--no-edit",
            cwd=worktree_path, timeout=30,
        )
        logger.info("Deduped evidence blocks in %d file(s) after rebase", fixed)
    return fixed
async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
    """Cherry-pick extraction commits onto a fresh branch from main.

    Replaces rebase-retry: extraction commits ADD new files, so cherry-pick
    applies cleanly ~99% of the time. For enrichments (editing existing files),
    cherry-pick reports the exact conflict for human review.

    Leo's manual fix pattern (PRs #2178, #2141, #157, #2142):
    1. git checkout -b clean-branch main
    2. git cherry-pick <extraction-commit(s)>
    3. Merge to main

    Returns (ok, message). On success the message is one of
    "already up to date", "already merged (all commits empty)", or
    "cherry-picked and pushed".
    """
    worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
    clean_branch = f"_clean/{branch.replace('/', '-')}"
    # Fetch latest state — separate calls to avoid refspec issues with long branch names
    rc, out = await _git("fetch", "origin", "main", timeout=15)
    if rc != 0:
        return False, f"fetch main failed: {out}"
    rc, out = await _git("fetch", "origin", branch, timeout=15)
    if rc != 0:
        return False, f"fetch branch failed: {out}"
    # Check if already up to date: branch's merge-base with main IS main's tip
    rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
    rc2, main_sha = await _git("rev-parse", "origin/main")
    if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
        return True, "already up to date"
    # Get extraction commits (oldest first), skip merge commits from fork workflows
    rc, commits_out = await _git(
        "log", f"origin/main..origin/{branch}", "--no-merges", "--format=%H", "--reverse",
        timeout=10,
    )
    if rc != 0 or not commits_out.strip():
        # Check if branch has content but only merge commits (fork workflow edge case)
        rc_diff, diff_out = await _git(
            "diff", "--stat", f"origin/main...origin/{branch}", timeout=10,
        )
        if rc_diff == 0 and diff_out.strip():
            # BUGFIX: the previous db.audit(...) call here used the wrong
            # signature (db.audit takes conn, stage, event, detail) and
            # referenced an undefined `pr_number` — it raised NameError at
            # runtime. Log instead; the caller audits the failure using the
            # returned message.
            logger.warning(
                "Branch %s has content diff but only merge commits — needs manual merge",
                branch,
            )
            return False, f"branch {branch} has only merge commits but contains content — flagged for manual merge"
        return False, f"no commits found on {branch}"
    commit_list = [c.strip() for c in commits_out.strip().split("\n") if c.strip()]
    # Serialize worktree add/remove — concurrent calls hit bare repo config.lock
    async with _bare_repo_lock:
        # Clear any stale worktree/branch left by a crashed prior run — a
        # leftover worktree blocks `worktree add` (matches the reweave path).
        await _git("worktree", "remove", "--force", worktree_path)
        await _git("branch", "-D", clean_branch)
        rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main")
        if rc != 0:
            return False, f"worktree add failed: {out}"
    try:
        # Cherry-pick each extraction commit
        dropped_entities: set[str] = set()
        picked_count = 0
        for commit_sha in commit_list:
            rc, out = await _git("cherry-pick", commit_sha, cwd=worktree_path, timeout=60)
            if rc != 0 and "empty" in out.lower():
                # Content already on main — skip this commit
                await _git("cherry-pick", "--skip", cwd=worktree_path)
                logger.info("Cherry-pick %s: empty (already on main), skipping", commit_sha[:8])
                continue
            picked_count += 1
            if rc != 0:
                # Check if conflict is entity-only (same auto-resolution as before)
                rc_ls, conflicting = await _git(
                    "diff", "--name-only", "--diff-filter=U", cwd=worktree_path
                )
                conflict_files = [
                    f.strip() for f in conflicting.split("\n") if f.strip()
                ] if rc_ls == 0 else []
                if conflict_files and all(f.startswith("entities/") for f in conflict_files):
                    # Entity conflicts: take main's version (entities are recoverable)
                    # In cherry-pick: --ours = branch we're ON (clean branch from origin/main)
                    # --theirs = commit being cherry-picked (extraction branch)
                    for cf in conflict_files:
                        await _git("checkout", "--ours", cf, cwd=worktree_path)
                        await _git("add", cf, cwd=worktree_path)
                    dropped_entities.update(conflict_files)
                    rc_cont, cont_out = await _git(
                        "-c", "core.editor=true", "cherry-pick", "--continue",
                        cwd=worktree_path, timeout=60,
                    )
                    if rc_cont != 0:
                        await _git("cherry-pick", "--abort", cwd=worktree_path)
                        return False, f"cherry-pick entity resolution failed on {commit_sha[:8]}: {cont_out}"
                    logger.info(
                        "Cherry-pick entity conflict auto-resolved: dropped %s (recoverable)",
                        ", ".join(sorted(conflict_files)),
                    )
                else:
                    # Real conflict — report exactly what conflicted
                    conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
                    await _git("cherry-pick", "--abort", cwd=worktree_path)
                    return False, f"cherry-pick conflict on {commit_sha[:8]}: {conflict_detail}"
        if dropped_entities:
            logger.info(
                "Cherry-pick auto-resolved entity conflicts in %s",
                ", ".join(sorted(dropped_entities)),
            )
        # All commits were empty — content already on main
        if picked_count == 0:
            return True, "already merged (all commits empty)"
        # Post-pick dedup: remove duplicate evidence blocks (Leo: PRs #1751, #1752)
        await _dedup_enriched_files(worktree_path)
        # Force-push clean branch as the original branch name.
        # Capture expected SHA for force-with-lease so we only clobber the
        # branch if nobody pushed to it since our fetch.
        rc, expected_sha = await _git("rev-parse", f"origin/{branch}")
        if rc != 0:
            return False, f"rev-parse origin/{branch} failed: {expected_sha}"
        expected_sha = expected_sha.strip().split("\n")[0]
        rc, out = await _git(
            "push",
            f"--force-with-lease={branch}:{expected_sha}",
            "origin",
            f"HEAD:{branch}",
            cwd=worktree_path,
            timeout=30,
        )
        if rc != 0:
            return False, f"push rejected: {out}"
        return True, "cherry-picked and pushed"
    finally:
        # Always clean up the temporary worktree and clean branch
        async with _bare_repo_lock:
            await _git("worktree", "remove", "--force", worktree_path)
            await _git("branch", "-D", clean_branch)
from .frontmatter import (
REWEAVE_EDGE_FIELDS,
parse_yaml_frontmatter,
union_edge_lists,
serialize_frontmatter,
)
from .post_merge import (
embed_merged_claims,
reciprocal_edges,
archive_source_for_pr,
commit_source_moves,
)
async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
    """Merge a reweave PR using per-file frontmatter union instead of cherry-pick.

    Reweave branches MODIFY existing files (appending YAML frontmatter edges).
    Cherry-pick fails when main moved since branch creation (~75% failure rate).

    This function:
    1. Gets the list of files changed by the reweave branch
    2. For each file, reads frontmatter from BOTH main HEAD and branch HEAD
    3. Unions the edge arrays (order-preserving, main first, branch-new appended)
    4. Asserts branch edges are a superset of main edges (reweave is append-only)
    5. Writes merged content to a worktree, commits, pushes as the branch

    Approved by Ganymede (manifest approach) and Theseus (superset assertion +
    order-preserving dedup).

    Returns (ok, message).
    """
    worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
    clean_branch = f"_clean/{branch.replace('/', '-')}"
    # Fetch latest state
    rc, out = await _git("fetch", "origin", "main", timeout=15)
    if rc != 0:
        return False, f"fetch main failed: {out}"
    rc, out = await _git("fetch", "origin", branch, timeout=15)
    if rc != 0:
        return False, f"fetch branch failed: {out}"
    # Get files changed by the reweave branch (three-dot: vs merge-base)
    rc, diff_out = await _git(
        "diff", "--name-only", f"origin/main...origin/{branch}", timeout=10,
    )
    if rc != 0 or not diff_out.strip():
        return False, f"no changed files found on {branch}"
    changed_files = [f.strip() for f in diff_out.strip().split("\n") if f.strip() and f.strip().endswith(".md")]
    if not changed_files:
        return False, "no .md files changed"
    # Serialize worktree add/remove — concurrent calls hit bare repo config.lock.
    # Remove any stale worktree from a crashed prior run before adding.
    async with _bare_repo_lock:
        await _git("worktree", "remove", "--force", worktree_path)
        await _git("branch", "-D", clean_branch)
        rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main")
        if rc != 0:
            return False, f"worktree add failed: {out}"
    try:
        merged_count = 0
        skipped_non_superset = []
        for fpath in changed_files:
            # Read file content from main HEAD and branch HEAD
            rc_main, main_content = await _git("show", f"origin/main:{fpath}", timeout=5)
            rc_branch, branch_content = await _git("show", f"origin/{branch}:{fpath}", timeout=5)
            if rc_branch != 0:
                logger.warning("Reweave merge: cannot read %s from branch %s", fpath, branch)
                continue
            if rc_main != 0:
                # File only exists on branch (new file) — just write it
                full_path = os.path.join(worktree_path, fpath)
                os.makedirs(os.path.dirname(full_path), exist_ok=True)
                with open(full_path, "w") as f:
                    f.write(branch_content)
                await _git("add", fpath, cwd=worktree_path)
                merged_count += 1
                continue
            # Parse frontmatter from both versions
            main_fm, main_raw_fm, main_body = parse_yaml_frontmatter(main_content)
            branch_fm, _branch_raw_fm, branch_body = parse_yaml_frontmatter(branch_content)
            if main_fm is None or branch_fm is None:
                # Parse failure = something unexpected. Fail the merge, don't fallback
                # to cherry-pick. (Theseus: loud failure, not silent retry)
                return False, f"frontmatter parse failed on {fpath} — manual review needed"
            # Superset assertion + merge in one pass.
            # Reweave only adds edges. If branch is missing an edge that main has,
            # the branch was based on stale main — union is safe (adds both).
            merged_edges = {}
            for field in REWEAVE_EDGE_FIELDS:
                main_list = main_fm.get(field, [])
                branch_list = branch_fm.get(field, [])
                # Normalize scalar values to single-element lists; falsy to []
                if not isinstance(main_list, list):
                    main_list = [main_list] if main_list else []
                if not isinstance(branch_list, list):
                    branch_list = [branch_list] if branch_list else []
                # Superset check (case-insensitive, whitespace-normalized keys)
                main_keys = {str(v).strip().lower() for v in main_list if v}
                branch_keys = {str(v).strip().lower() for v in branch_list if v}
                missing = main_keys - branch_keys
                if missing:
                    # Warn-and-continue: union still preserves main's edges
                    logger.warning(
                        "Reweave merge: %s field '%s' — branch missing edges from main: %s",
                        fpath, field, missing,
                    )
                    skipped_non_superset.append(f"{fpath}:{field}")
                # Collect merged edges for string-level splicing
                if main_list or branch_list:
                    merged_edges[field] = union_edge_lists(main_list, branch_list)
            # Write merged file — splice edges into main's raw frontmatter, use main's body
            full_path = os.path.join(worktree_path, fpath)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            with open(full_path, "w") as f:
                f.write(serialize_frontmatter(main_raw_fm, merged_edges, main_body))
            await _git("add", fpath, cwd=worktree_path)
            merged_count += 1
        if merged_count == 0:
            return False, "no files merged (all skipped)"
        # Commit the merged changes
        commit_msg = f"reweave: merge {merged_count} files via frontmatter union [auto]"
        rc, out = await _git(
            "commit", "-m", commit_msg, cwd=worktree_path, timeout=30,
        )
        if rc != 0:
            return False, f"commit failed: {out}"
        # Force-push as the branch (for the ff-push step in _merge_domain_queue).
        # Pin the expected SHA so force-with-lease only wins if nobody pushed
        # since our fetch.
        rc, expected_sha = await _git("rev-parse", f"origin/{branch}")
        if rc != 0:
            return False, f"rev-parse origin/{branch} failed: {expected_sha}"
        expected_sha = expected_sha.strip().split("\n")[0]
        rc, out = await _git(
            "push",
            f"--force-with-lease={branch}:{expected_sha}",
            "origin",
            f"HEAD:{branch}",
            cwd=worktree_path,
            timeout=30,
        )
        if rc != 0:
            return False, f"push rejected: {out}"
        result_msg = f"frontmatter-union merged {merged_count} files"
        if skipped_non_superset:
            result_msg += f" (non-superset warnings: {len(skipped_non_superset)})"
        return True, result_msg
    finally:
        # Always clean up the temporary worktree and clean branch
        async with _bare_repo_lock:
            await _git("worktree", "remove", "--force", worktree_path)
            await _git("branch", "-D", clean_branch)
async def _resubmit_approvals(pr_number: int):
    """Re-submit 2 formal Forgejo approvals after force-push invalidated them.

    A force-push (rebase) wipes existing approvals, and branch protection
    demands 2 before the merge API accepts the request. Mirrors
    evaluate._post_formal_approvals.
    """
    info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
    author = info.get("user", {}).get("login", "") if info else ""
    granted = 0
    for reviewer in ("leo", "vida", "theseus", "clay", "astra", "rio"):
        # A reviewer cannot approve their own PR
        if reviewer == author:
            continue
        if granted >= 2:
            break
        token = get_agent_token(reviewer)
        if not token:
            continue
        response = await forgejo_api(
            "POST",
            repo_path(f"pulls/{pr_number}/reviews"),
            {"body": "Approved (post-rebase re-approval).", "event": "APPROVED"},
            token=token,
        )
        if response is None:
            continue
        granted += 1
        logger.debug(
            "Post-rebase approval for PR #%d by %s (%d/2)",
            pr_number, reviewer, granted,
        )
    if granted < 2:
        logger.warning(
            "Only %d/2 approvals submitted for PR #%d after rebase",
            granted, pr_number,
        )
async def _merge_pr(pr_number: int) -> tuple[bool, str]:
    """Merge PR via Forgejo API. CURRENTLY UNUSED — local ff-push is the primary merge path.

    Kept as fallback: re-enable if Forgejo fixes the 405 bug (Ganymede's API-first design).
    The local ff-push in _merge_domain_queue replaced this due to persistent 405 errors.

    Returns (ok, message).
    """
    # Check if already merged/closed on Forgejo (prevents 405 on re-merge attempts)
    pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
    if pr_info:
        if pr_info.get("merged"):
            logger.info("PR #%d already merged on Forgejo, syncing status", pr_number)
            return True, "already merged"
        if pr_info.get("state") == "closed":
            logger.warning("PR #%d closed on Forgejo but not merged", pr_number)
            return False, "PR closed without merge"
    # Merge whitelist only allows leo and m3taversal — use Leo's token
    leo_token = get_agent_token("leo")
    if not leo_token:
        return False, "no leo token for merge (merge whitelist requires leo)"
    # Pre-flight: verify approvals exist before attempting merge (Rhea: catches 405)
    reviews = await forgejo_api("GET", repo_path(f"pulls/{pr_number}/reviews"))
    if reviews is not None:
        approval_count = sum(1 for r in reviews if r.get("state") == "APPROVED")
        if approval_count < 2:
            logger.info("PR #%d: only %d/2 approvals, resubmitting before merge", pr_number, approval_count)
            await _resubmit_approvals(pr_number)
    # Retry with backoff + jitter for transient errors (Rhea: jitter prevents thundering herd)
    delays = [0, 5, 15, 45]
    for attempt, base_delay in enumerate(delays, 1):
        if base_delay:
            # Jitter: sleep 0.8x–1.2x of the base delay
            jittered = base_delay * (0.8 + random.random() * 0.4)
            await asyncio.sleep(jittered)
        result = await forgejo_api(
            "POST",
            repo_path(f"pulls/{pr_number}/merge"),
            {"Do": "merge", "merge_message_field": ""},
            token=leo_token,
        )
        if result is not None:
            return True, "merged"
        # Check if merge succeeded despite API error (timeout case — Rhea)
        pr_check = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
        if pr_check and pr_check.get("merged"):
            return True, "already merged"
        # Distinguish transient from permanent failures (Ganymede)
        if pr_check and not pr_check.get("mergeable", True):
            # PR not mergeable — branch diverged or conflict. Rebase needed, not retry.
            return False, "merge rejected: PR not mergeable (needs rebase)"
        if attempt < len(delays):
            # delays[attempt] is the NEXT attempt's base delay (attempt is 1-based)
            logger.info("PR #%d: merge attempt %d failed (transient), retrying in %.0fs",
                        pr_number, attempt, delays[attempt] if attempt < len(delays) else 0)
    return False, "Forgejo merge API failed after 4 attempts (transient)"
async def _delete_remote_branch(branch: str):
    """Delete the remote branch right after merge. (Ganymede Q4: immediate, not batch)

    A failed DELETE is logged and ignored — a stale branch is cosmetic,
    a stale merge is operational.
    """
    if await forgejo_api("DELETE", repo_path(f"branches/{branch}")) is None:
        logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
# --- Domain merge task ---
async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
    """Process the merge queue for a single domain. Returns (succeeded, failed).

    The per-domain asyncio.Lock serializes claim+merge within this process
    (fast path); the SQL 'merging' status in _claim_next_pr is the durable
    backstop.
    """
    succeeded = 0
    failed = 0
    while True:
        async with _domain_locks[domain]:
            pr = await _claim_next_pr(conn, domain)
            if not pr:
                break  # No more approved PRs for this domain
            pr_num = pr["number"]
            branch = pr["branch"]
            logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain)
            try:
                # Route reweave branches to frontmatter-union merge.
                # Reweave MODIFIES existing files (appending YAML edges) — cherry-pick
                # fails ~75% when main moved. Frontmatter union reads current main HEAD,
                # unions edge lists, commits. No conflicts possible.
                # (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
                if branch.startswith("reweave/"):
                    merge_fn = _merge_reweave_pr(branch)
                else:
                    # Extraction commits ADD new files — cherry-pick applies cleanly.
                    merge_fn = _cherry_pick_onto_main(branch)
                pick_ok, pick_msg = await asyncio.wait_for(
                    merge_fn,
                    timeout=MERGE_TIMEOUT_SECONDS,
                )
            except asyncio.TimeoutError:
                # Force-reset stuck merges to 'conflict' so the queue drains (Rhea)
                logger.error(
                    "PR #%d merge timed out after %ds — resetting to conflict (Rhea)", pr_num, MERGE_TIMEOUT_SECONDS
                )
                mark_conflict(conn, pr_num, last_error=f"merge timed out after {MERGE_TIMEOUT_SECONDS}s")
                db.audit(conn, "merge", "timeout", json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS}))
                failed += 1
                continue
            if not pick_ok:
                logger.warning("PR #%d merge/cherry-pick failed: %s", pr_num, pick_msg)
                # Reweave: close immediately, don't retry (Ship: same rationale as ff-push failure)
                if branch.startswith("reweave/"):
                    await close_pr(conn, pr_num,
                                   last_error=f"reweave merge failed (closed, not retried): {pick_msg[:400]}",
                                   merge_cycled=True, inc_merge_failures=True)
                    await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
                                      {"body": f"Reweave merge failed — closing. Next nightly reweave will create a fresh branch.\n\nError: {pick_msg[:200]}"})
                    await _delete_remote_branch(branch)
                else:
                    mark_conflict(conn, pr_num, last_error=pick_msg[:500])
                db.audit(conn, "merge", "cherry_pick_failed", json.dumps({"pr": pr_num, "error": pick_msg[:200]}))
                failed += 1
                continue
            # Content already on main — close PR, skip push, clean up branch.
            # Cherry-pick returns "already merged" when all commits are empty.
            # The branch ref still points at old commits (not a descendant of main),
            # so pushing branch_sha:main would fail as non-fast-forward.
            if pick_msg in ("already merged (all commits empty)", "already up to date"):
                leo_token = get_agent_token("leo")
                await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
                                  {"body": f"Content already on main — closing.\nBranch: `{branch}`"})
                result = await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"), {"state": "closed"}, token=leo_token)
                if result is None:
                    # Keep DB and Forgejo consistent: don't mark merged if close failed
                    logger.error("PR #%d: Forgejo close failed (already-merged path), skipping DB update", pr_num)
                    failed += 1
                    continue
                mark_merged(conn, pr_num)
                db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch, "note": "content already on main"}))
                await _delete_remote_branch(branch)
                logger.info("PR #%d already merged (content on main), closed", pr_num)
                succeeded += 1
                continue
            # Local ff-push: cherry-picked branch is a descendant of origin/main.
            # Regular push = fast-forward. Non-ff rejected by default (same safety).
            # --force-with-lease removed: Forgejo categorically blocks it on protected branches.
            await _git("fetch", "origin", branch, timeout=15)
            rc, main_sha = await _git("rev-parse", "origin/main")
            main_sha = main_sha.strip() if rc == 0 else ""
            rc, branch_sha = await _git("rev-parse", f"origin/{branch}")
            branch_sha = branch_sha.strip() if rc == 0 else ""
            merge_ok = False
            merge_msg = ""
            if branch_sha:
                rc, out = await _git(
                    "push", "origin", f"{branch_sha}:main",
                    timeout=30,
                )
                if rc == 0:
                    merge_ok = True
                    merge_msg = f"merged (local ff-push, SHA: {branch_sha[:8]})"
                    # Close PR on Forgejo with merge SHA comment
                    leo_token = get_agent_token("leo")
                    await forgejo_api(
                        "POST",
                        repo_path(f"issues/{pr_num}/comments"),
                        {"body": f"Merged locally.\nMerge SHA: `{branch_sha}`\nBranch: `{branch}`"},
                    )
                    await forgejo_api(
                        "PATCH",
                        repo_path(f"pulls/{pr_num}"),
                        {"state": "closed"},
                        token=leo_token,
                    )
                else:
                    merge_msg = f"local ff-push failed: {out[:200]}"
            else:
                merge_msg = f"could not resolve origin/{branch}"
            if not merge_ok:
                logger.error("PR #%d merge failed: %s", pr_num, merge_msg)
                # Reweave PRs: close immediately on failure. Cherry-pick retry
                # will always fail (reweave modifies existing files). Next nightly
                # run creates a fresh branch from current main — retry is wasteful.
                # (Ship: prevents reweave flood + wasted retry cycles)
                if branch.startswith("reweave/"):
                    await close_pr(conn, pr_num,
                                   last_error=f"reweave merge failed (closed, not retried): {merge_msg[:400]}",
                                   merge_cycled=True, inc_merge_failures=True)
                    await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
                                      {"body": f"Reweave merge failed — closing. Next nightly reweave will create a fresh branch.\n\nError: {merge_msg[:200]}"})
                    await _delete_remote_branch(branch)
                else:
                    mark_conflict(conn, pr_num, last_error=merge_msg[:500])
                db.audit(conn, "merge", "merge_failed", json.dumps({"pr": pr_num, "error": merge_msg[:200]}))
                failed += 1
                continue
            # Success — update status and cleanup
            mark_merged(conn, pr_num)
            db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch}))
            logger.info("PR #%d merged successfully", pr_num)
            # Record contributor attribution
            try:
                await record_contributor_attribution(conn, pr_num, branch, _git)
            except Exception:
                logger.exception("PR #%d: contributor attribution failed (non-fatal)", pr_num)
            # Archive source file (closes near-duplicate loop — Ganymede review)
            archive_source_for_pr(branch, domain)
            # Embed new/changed claims into Qdrant (non-fatal)
            await embed_merged_claims(main_sha, branch_sha, _git)
            # Add reciprocal edges on existing claims (non-fatal)
            # New claim A with supports:[B] → add supports:[A] on B's frontmatter
            await reciprocal_edges(main_sha, branch_sha, _git)
            # Cascade: notify agents whose beliefs/positions depend on changed claims
            try:
                await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn)
            except Exception:
                logger.exception("PR #%d: cascade failed (non-fatal)", pr_num)
            # Cross-domain citation index: log entity-based connections between domains
            try:
                await cross_domain_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn)
            except Exception:
                logger.exception("PR #%d: cross_domain failed (non-fatal)", pr_num)
            conn.commit()  # Commit DB writes before slow branch deletion
            # Delete remote branch immediately (Ganymede Q4)
            await _delete_remote_branch(branch)
            # Prune local worktree metadata
            await _git("worktree", "prune")
            succeeded += 1
    return succeeded, failed
# --- Main entry point ---
async def _reconcile_db_state(conn):
    """Reconcile pipeline DB against Forgejo's actual PR state.

    Fixes ghost PRs: DB says 'conflict' or 'open' but Forgejo says merged/closed.
    Also detects deleted branches (rev-parse failures). (Leo's structural fix #1)
    Run at the start of each merge cycle.
    """
    candidates = conn.execute(
        "SELECT number, branch, status FROM prs WHERE status IN ('conflict', 'open', 'reviewing')"
    ).fetchall()
    if not candidates:
        return
    fixed = 0
    for rec in candidates:
        num = rec["number"]
        head_branch = rec["branch"]
        local_status = rec["status"]
        # Ask Forgejo for the authoritative PR state; skip on API failure.
        remote = await forgejo_api("GET", repo_path(f"pulls/{num}"))
        if not remote:
            continue
        remote_state = remote.get("state", "")
        merged_remotely = remote.get("merged", False)
        # Case 1: merged on Forgejo while the DB still thinks otherwise.
        if merged_remotely and local_status != "merged":
            mark_merged(conn, num)
            fixed += 1
            continue
        # Case 2: closed (not merged) on Forgejo, still live in the DB.
        if remote_state == "closed" and not merged_remotely and local_status not in ("closed",):
            # Clean up branch too — stale branches get rediscovered as new PRs
            # (Ship: prevents reweave flood where closed PRs leave branches that
            # trigger discover_external_prs → new PR → fail → close → repeat)
            if head_branch:
                await _delete_remote_branch(head_branch)
            await close_pr(conn, num, last_error='reconciled: closed on Forgejo', close_on_forgejo=False)
            fixed += 1
            continue
        # Case 3: ghost PR detection — branch deleted but PR still open in DB (Fix #2).
        # Ganymede: rc != 0 means remote unreachable — skip, don't close.
        if local_status not in ("open", "reviewing") or not head_branch:
            continue
        rc, heads = await _git("ls-remote", "--heads", "origin", head_branch, timeout=10)
        if rc != 0:
            logger.warning("ls-remote failed for %s — skipping ghost check", head_branch)
            continue
        if heads.strip():
            continue  # branch still exists on the remote — nothing to do
        # Branch gone — close PR on Forgejo and in DB (Ganymede: don't leave orphans)
        await forgejo_api(
            "PATCH",
            repo_path(f"pulls/{num}"),
            body={"state": "closed"},
        )
        await forgejo_api(
            "POST",
            repo_path(f"issues/{num}/comments"),
            body={"body": "Auto-closed: branch deleted from remote."},
        )
        await close_pr(conn, num, last_error='reconciled: branch deleted', close_on_forgejo=False)
        logger.info("Ghost PR #%d: branch %s deleted, closing", num, head_branch)
        fixed += 1
    if fixed:
        logger.info("Reconciled %d stale PRs against Forgejo state", fixed)
# Cap on cherry-pick/rebase retries for a conflicted PR before it is marked
# 'conflict_permanent' and handed to the permanent-conflict resolver.
MAX_CONFLICT_REBASE_ATTEMPTS = 3
async def _handle_permanent_conflicts(conn) -> int:
    """Close conflict_permanent PRs and file their sources correctly.

    When a PR fails rebase 3x, the claims are already on main from the first
    successful extraction. The source should live in archive/{domain}/ (one copy).
    Any duplicate in queue/ gets deleted. No requeuing — breaks the infinite loop.
    Hygiene (Cory): one source file, one location, no duplicates.
    Reviewed by Ganymede: commit moves, use shutil.move, batch commit at end.

    Returns:
        Number of permanent-conflict PRs closed and filed this cycle.
    """
    rows = conn.execute(
        """SELECT number, branch, domain
           FROM prs
           WHERE status = 'conflict_permanent'
           ORDER BY number ASC"""
    ).fetchall()
    if not rows:
        return 0
    handled = 0
    files_changed = False
    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
    for row in rows:
        pr_number = row["number"]
        branch = row["branch"]
        domain = row["domain"] or "unknown"
        # Close PR on Forgejo — check return to prevent ghost PR
        close_result = await forgejo_api(
            "PATCH",
            repo_path(f"pulls/{pr_number}"),
            body={"state": "closed"},
        )
        if close_result is None:
            logger.error("PR #%d: Forgejo close failed (permanent conflict), skipping", pr_number)
            continue
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            body={"body": (
                "Closed by conflict auto-resolver: rebase failed 3 times (enrichment conflict). "
                "Claims already on main from prior extraction. Source filed in archive."
            )},
        )
        # BUG FIX: prs.branch can be NULL (sibling reconcile code guards `if branch:`);
        # an unguarded None here crashed _delete_remote_branch/.startswith and
        # aborted the whole resolver loop.
        if branch:
            await _delete_remote_branch(branch)
        # File the source: one copy in archive/{domain}/, delete duplicates
        source_slug = branch.replace("extract/", "", 1) if branch and branch.startswith("extract/") else None
        if source_slug:
            filename = f"{source_slug}.md"
            archive_dir = os.path.join(main_dir, "inbox", "archive", domain)
            archive_path = os.path.join(archive_dir, filename)
            queue_path = os.path.join(main_dir, "inbox", "queue", filename)
            already_archived = os.path.exists(archive_path)
            if already_archived:
                # Archive copy wins — any queue copy is a duplicate to delete.
                if os.path.exists(queue_path):
                    try:
                        os.remove(queue_path)
                        logger.info("PR #%d: deleted queue duplicate %s (already in archive/%s)",
                                    pr_number, filename, domain)
                        files_changed = True
                    except Exception as e:
                        logger.warning("PR #%d: failed to delete queue duplicate: %s", pr_number, e)
                else:
                    logger.info("PR #%d: source already in archive/%s, no cleanup needed", pr_number, domain)
            else:
                if os.path.exists(queue_path):
                    os.makedirs(archive_dir, exist_ok=True)
                    try:
                        shutil.move(queue_path, archive_path)
                        logger.info("PR #%d: filed source to archive/%s: %s", pr_number, domain, filename)
                        files_changed = True
                    except Exception as e:
                        logger.warning("PR #%d: failed to file source: %s", pr_number, e)
                else:
                    logger.warning("PR #%d: source not found in queue or archive for %s", pr_number, filename)
            # Clear batch-state marker so the batcher can process this slug again.
            state_marker = f"/opt/teleo-eval/batch-state/{source_slug}.done"
            try:
                if os.path.exists(state_marker):
                    os.remove(state_marker)
            except Exception:
                pass  # best-effort cleanup; a stale marker is harmless
        await close_pr(conn, pr_number,
                       last_error='conflict_permanent: closed + filed in archive',
                       close_on_forgejo=False)  # Already closed above (Forgejo PATCH at top of loop)
        handled += 1
        logger.info("Permanent conflict handled: PR #%d closed, source filed", pr_number)
    # Batch commit source moves to main (Ganymede: follow entity_batch pattern)
    if files_changed:
        await _git("add", "-A", "inbox/", cwd=main_dir)
        rc, out = await _git(
            "commit", "-m",
            f"pipeline: archive {handled} conflict-closed source(s)\n\n"
            f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
            cwd=main_dir,
        )
        if rc == 0:
            # Push with pull-rebase retry (entity_batch pattern)
            for attempt in range(3):
                await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
                rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
                if rc_push == 0:
                    logger.info("Committed + pushed source archive moves for %d PRs", handled)
                    break
                await asyncio.sleep(2)
            else:
                # for-else: all three push attempts failed — discard local commit.
                logger.warning("Failed to push source archive moves after 3 attempts")
                await _git("reset", "--hard", "origin/main", cwd=main_dir)
    if handled:
        logger.info("Handled %d permanent conflict PRs (closed + filed)", handled)
    return handled
async def _retry_conflict_prs(conn) -> tuple[int, int]:
    """Retry conflict PRs via cherry-pick onto fresh main.

    Design: Ganymede (extend merge stage), Rhea (safety guards), Leo (re-eval required).
    - Pick up PRs with status='conflict' and both approvals
    - Cherry-pick extraction commits onto fresh branch from origin/main
    - If cherry-pick succeeds: force-push, reset to 'open' with verdicts cleared for re-eval
    - If cherry-pick fails: increment attempt counter, leave as 'conflict'
    - After MAX_CONFLICT_REBASE_ATTEMPTS failures: mark 'conflict_permanent'
    - Skip branches with new commits since conflict was set (Rhea: someone is working on it)

    Returns:
        (resolved, failed) counts for this cycle.
    """
    rows = conn.execute(
        """SELECT number, branch, conflict_rebase_attempts
           FROM prs
           WHERE status = 'conflict'
             AND COALESCE(conflict_rebase_attempts, 0) < ?
           ORDER BY number ASC""",
        (MAX_CONFLICT_REBASE_ATTEMPTS,),
    ).fetchall()
    if not rows:
        return 0, 0
    resolved = 0
    failed = 0
    for row in rows:
        pr_number = row["number"]
        branch = row["branch"]
        attempts = row["conflict_rebase_attempts"] or 0
        # BUG FIX: prs.branch can be NULL (sibling reconcile code guards `if branch:`);
        # `.startswith`/cherry-pick on None crashed the whole retry loop.
        if not branch:
            logger.warning("Conflict retry: PR #%d has no branch — skipping", pr_number)
            continue
        # Reweave branches modify existing files — cherry-pick will always fail.
        # Close immediately and delete branch. Next nightly reweave creates fresh.
        # (Ship: prevents wasting 3 retry cycles on branches that can never cherry-pick)
        if branch.startswith("reweave/"):
            logger.info("Reweave PR #%d: skipping retry, closing + deleting branch", pr_number)
            await close_pr(conn, pr_number,
                           last_error='reweave: closed (retry skipped, next nightly creates fresh)')
            await forgejo_api("POST", repo_path(f"issues/{pr_number}/comments"),
                              {"body": "Reweave conflict — closing instead of retrying. Cherry-pick always fails on reweave branches (they modify existing files). Next nightly reweave will create a fresh branch from current main."})
            await _delete_remote_branch(branch)
            failed += 1
            continue
        logger.info("Conflict retry [%d/%d] PR #%d branch=%s",
                    attempts + 1, MAX_CONFLICT_REBASE_ATTEMPTS, pr_number, branch)
        # Fetch latest remote state
        await _git("fetch", "origin", branch, timeout=30)
        await _git("fetch", "origin", "main", timeout=30)
        # Attempt cherry-pick onto fresh main (replaces rebase — Leo+Cory directive)
        ok, msg = await _cherry_pick_onto_main(branch)
        if ok:
            # Rebase succeeded — reset for re-eval (Ganymede: approvals are stale after rebase)
            reopen_pr(conn, pr_number, reset_for_reeval=True,
                      conflict_rebase_attempts=attempts + 1)
            logger.info("Conflict resolved: PR #%d rebased successfully, reset for re-eval", pr_number)
            resolved += 1
        else:
            new_attempts = attempts + 1
            if new_attempts >= MAX_CONFLICT_REBASE_ATTEMPTS:
                # Out of retries — hand off to _handle_permanent_conflicts.
                mark_conflict_permanent(conn, pr_number,
                                        last_error=f"rebase failed {MAX_CONFLICT_REBASE_ATTEMPTS}x: {msg[:200]}",
                                        conflict_rebase_attempts=new_attempts)
                logger.warning("Conflict permanent: PR #%d failed %d rebase attempts: %s",
                               pr_number, new_attempts, msg[:100])
            else:
                conn.execute(
                    """UPDATE prs
                       SET conflict_rebase_attempts = ?,
                           last_error = ?
                       WHERE number = ?""",
                    (new_attempts, f"rebase attempt {new_attempts}: {msg[:200]}", pr_number),
                )
                logger.info("Conflict retry failed: PR #%d attempt %d/%d: %s",
                            pr_number, new_attempts, MAX_CONFLICT_REBASE_ATTEMPTS, msg[:100])
            failed += 1
    if resolved or failed:
        logger.info("Conflict retry: %d resolved, %d failed", resolved, failed)
    return resolved, failed
async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one merge cycle across all domains.

    0. Reconcile DB state against Forgejo (catch ghost PRs)
    0.5. Retry conflict PRs (rebase onto current main)
    1. Discover external PRs (multiplayer v1)
    2. Find all domains with approved PRs
    3. Launch one async task per domain (cross-domain parallel, same-domain serial)

    Args:
        conn: pipeline DB connection.
        max_workers: accepted for interface compatibility; unused here —
            concurrency is one gathered task per domain. TODO confirm with
            callers before removing.

    Returns:
        (total_succeeded, total_failed) summed across all domains.
    """
    # Step 0: Reconcile stale DB entries
    await _reconcile_db_state(conn)
    # Step 0.5: Retry conflict PRs (Ganymede: before normal merge, same loop)
    await _retry_conflict_prs(conn)
    # Step 0.6: Handle permanent conflicts (close + requeue for re-extraction)
    await _handle_permanent_conflicts(conn)
    # Step 1: Discover external PRs
    await discover_external_prs(conn)
    # Step 2: Find domains with approved work
    rows = conn.execute("SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL").fetchall()
    domains = [r["domain"] for r in rows]
    # Also check for NULL-domain PRs (human PRs with undetected domain)
    null_domain = conn.execute("SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL").fetchone()
    if null_domain and null_domain["c"] > 0:
        logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", null_domain["c"])
    if not domains:
        return 0, 0
    # Step 3: Merge all domains concurrently
    tasks = [_merge_domain_queue(conn, domain) for domain in domains]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    total_succeeded = 0
    total_failed = 0
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            # BUG FIX: logger.exception() outside an `except` block logs
            # "NoneType: None" instead of the gathered exception's traceback.
            # Pass the exception instance via exc_info so it is recorded.
            logger.error("Domain %s merge failed with exception", domains[i], exc_info=result)
            total_failed += 1
        else:
            s, f = result
            total_succeeded += s
            total_failed += f
    if total_succeeded or total_failed:
        logger.info(
            "Merge cycle: %d succeeded, %d failed across %d domains", total_succeeded, total_failed, len(domains)
        )
    # Batch commit source moves (Ganymede: one commit per cycle, not per PR)
    await commit_source_moves(_git)
    return total_succeeded, total_failed