teleo-codex/ops/pipeline-v2/lib/merge.py
m3taversal 9925576c13 ship: add contributor attribution tracing to PR lifecycle
- Migration v19: submitted_by column on prs + sources tables
- extract.py: propagates proposed_by from source frontmatter → PR record
- merge.py: sets submitted_by from Forgejo author for human PRs
- dashboard_prs.py: redesigned with Contributor column, improved claim
  visibility in expanded rows, cost estimates, evaluator chain display
- dashboard_routes.py: submitted_by + source_path in pr-lifecycle API
- backfill_submitted_by.py: one-time backfill (1525/1777 PRs matched)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 14:56:03 +00:00

1937 lines
79 KiB
Python

"""Merge stage — domain-serialized priority queue with rebase-before-merge.
Design reviewed by Ganymede (round 2) and Rhea. Key decisions:
- Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery)
- Rebase-before-merge with pinned force-with-lease SHA (Ganymede)
- Priority queue: COALESCE(p.priority, s.priority, 'medium') — PR > source > default
- Human PRs default to 'high', not 'critical' (Ganymede — prevents DoS on pipeline)
- 5-minute merge timeout — force-reset to 'conflict' (Rhea)
- Ack comment on human PR discovery (Rhea)
- Pagination on all Forgejo list endpoints (Ganymede standing rule)
"""
import asyncio
import json
import logging
import os
import random
import re
import shutil
from collections import defaultdict
from . import config, db
from .db import classify_branch
from .dedup import dedup_evidence_blocks
from .domains import detect_domain_from_branch
from .forgejo import api as forgejo_api
# Pipeline-owned branch prefixes — only these get auto-merged.
# Agent branches (theseus/*, rio/*, astra/*, etc.) stay approved but are NOT
# rebased/force-pushed/auto-merged. Agents merge their own PRs.
# Derived from BRANCH_PREFIX_MAP where agent in ("pipeline", "epimetheus").
# (Leo directive: PRs #2141, #157, #2142, #2180 were orphaned by pipeline rebase)
PIPELINE_OWNED_PREFIXES = ("extract/", "ingestion/", "epimetheus/", "reweave/", "fix/")
# Safety assertion: agent branches MUST NOT be in PIPELINE_OWNED_PREFIXES.
# Auto-merge on eval approval bypasses Leo's review gate.
# Agent PRs use auto_merge flag instead (set by evaluate.py after two-reviewer approval).
_AGENT_NAMES = ("theseus", "rio", "astra", "vida", "clay", "leo", "argus", "oberon", "rhea", "ganymede")
for _prefix in PIPELINE_OWNED_PREFIXES:
for _agent in _AGENT_NAMES:
assert not _prefix.startswith(f"{_agent}/"), \
f"FATAL: Agent prefix '{_agent}/' found in PIPELINE_OWNED_PREFIXES — this bypasses Leo's review gate"
# Import worktree lock — file at /opt/teleo-eval/pipeline/lib/worktree_lock.py
try:
from .worktree_lock import async_main_worktree_lock
except ImportError:
import sys
sys.path.insert(0, os.path.dirname(__file__))
from worktree_lock import async_main_worktree_lock
from .cascade import cascade_after_merge
from .cross_domain import cross_domain_after_merge
from .forgejo import get_agent_token, get_pr_diff, repo_path
logger = logging.getLogger("pipeline.merge")
# In-memory domain locks — fast path, lost on crash (durable layer is prs.status)
_domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea)
MERGE_TIMEOUT_SECONDS = 300 # 5 minutes
# --- Git helpers ---
async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
"""Run a git command async. Returns (returncode, stdout+stderr)."""
proc = await asyncio.create_subprocess_exec(
"git",
*args,
cwd=cwd or str(config.REPO_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return -1, f"git {args[0]} timed out after {timeout}s"
output = (stdout or b"").decode().strip()
if stderr:
output += "\n" + stderr.decode().strip()
return proc.returncode, output
# --- PR Discovery (Multiplayer v1) ---
async def discover_external_prs(conn) -> int:
"""Scan Forgejo for open PRs not tracked in SQLite.
Human PRs (non-pipeline author) get priority 'high' and origin 'human'.
Critical is reserved for explicit human override only. (Ganymede)
Pagination on all Forgejo list endpoints. (Ganymede standing rule #5)
"""
known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()}
discovered = 0
page = 1
while True:
prs = await forgejo_api(
"GET",
repo_path(f"pulls?state=open&limit=50&page={page}"),
)
if not prs:
break
for pr in prs:
if pr["number"] not in known:
# Detect origin: pipeline agents have per-agent Forgejo users
pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
author = pr.get("user", {}).get("login", "")
is_pipeline = author.lower() in pipeline_users
origin = "pipeline" if is_pipeline else "human"
priority = "high" if origin == "human" else None
domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
agent, commit_type = classify_branch(pr["head"]["ref"])
# For human PRs, submitted_by is the Forgejo author.
# For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by).
submitted_by = author if origin == "human" else None
conn.execute(
"""INSERT OR IGNORE INTO prs
(number, branch, status, origin, priority, domain, agent, commit_type,
prompt_version, pipeline_version, submitted_by)
VALUES (?, ?, 'open', ?, ?, ?, ?, ?, ?, ?, ?)""",
(pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type, config.PROMPT_VERSION, config.PIPELINE_VERSION, submitted_by),
)
db.audit(
conn,
"merge",
"pr_discovered",
json.dumps(
{
"pr": pr["number"],
"origin": origin,
"author": pr.get("user", {}).get("login"),
"priority": priority or "inherited",
}
),
)
# Ack comment on human PRs so contributor feels acknowledged (Rhea)
if origin == "human":
await _post_ack_comment(pr["number"])
discovered += 1
if len(prs) < 50:
break # Last page
page += 1
if discovered:
logger.info("Discovered %d external PRs", discovered)
return discovered
async def _post_ack_comment(pr_number: int):
"""Post acknowledgment comment on human-submitted PR. (Rhea)
Contributor should feel acknowledged immediately, not wonder if
their PR disappeared into a void.
"""
body = (
"Thanks for the contribution! Your PR is queued for evaluation "
"(priority: high). Expected review time: ~5 minutes.\n\n"
"_This is an automated message from the Teleo pipeline._"
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": body},
)
# --- Merge operations ---
async def _claim_next_pr(conn, domain: str) -> dict | None:
"""Claim the next approved PR for a domain via atomic UPDATE.
Priority inheritance: COALESCE(p.priority, s.priority, 'medium')
- Explicit PR priority (human PRs) > source priority (pipeline) > default medium
- NULL priorities fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2)
- This is intentional: unclassified PRs don't jump ahead of triaged ones
(Rhea: document the precedence for future maintainers)
NOT EXISTS enforces domain serialization in SQL — defense-in-depth even if
asyncio.Lock is bypassed. (Ganymede: approved)
"""
# Build prefix filter for pipeline-owned branches only
# Agent branches stay approved but are NOT auto-merged (Leo: PRs #2141, #157, #2142, #2180)
prefix_clauses = " OR ".join("p.branch LIKE ?" for _ in PIPELINE_OWNED_PREFIXES)
prefix_params = [f"{pfx}%" for pfx in PIPELINE_OWNED_PREFIXES]
row = conn.execute(
f"""UPDATE prs SET status = 'merging', last_attempt = datetime('now')
WHERE number = (
SELECT p.number FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'approved'
AND p.domain = ?
AND ({prefix_clauses} OR p.auto_merge = 1)
AND NOT EXISTS (
SELECT 1 FROM prs p2
WHERE p2.domain = p.domain
AND p2.status = 'merging'
)
ORDER BY
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
-- Dependency ordering: PRs with fewer broken wiki links merge first.
-- "Creator" PRs (0 broken links) land before "consumer" PRs that
-- reference them, naturally resolving the dependency chain. (Rhea+Ganymede)
CASE WHEN p.eval_issues LIKE '%broken_wiki_links%' THEN 1 ELSE 0 END,
p.created_at ASC
LIMIT 1
)
RETURNING number, source_path, branch, domain""",
(domain, *prefix_params),
).fetchone()
return dict(row) if row else None
async def _dedup_enriched_files(worktree_path: str) -> int:
"""Scan rebased worktree for duplicate evidence blocks and dedup them.
Returns count of files fixed.
"""
# Get list of modified claim files in this branch vs origin/main
rc, out = await _git("diff", "--name-only", "origin/main..HEAD", cwd=worktree_path)
if rc != 0:
return 0
fixed = 0
for fpath in out.strip().split("\n"):
fpath = fpath.strip()
if not fpath or not fpath.endswith(".md"):
continue
# Only process claim files (domains/, core/, foundations/)
if not any(fpath.startswith(p) for p in ("domains/", "core/", "foundations/")):
continue
full_path = os.path.join(worktree_path, fpath)
if not os.path.exists(full_path):
continue
with open(full_path, "r") as f:
content = f.read()
deduped = dedup_evidence_blocks(content)
if deduped != content:
with open(full_path, "w") as f:
f.write(deduped)
# Stage the fix
await _git("add", fpath, cwd=worktree_path)
fixed += 1
if fixed > 0:
# Amend the last commit to include dedup fixes (no new commit)
await _git(
"-c", "core.editor=true", "commit", "--amend", "--no-edit",
cwd=worktree_path, timeout=30,
)
logger.info("Deduped evidence blocks in %d file(s) after rebase", fixed)
return fixed
async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
"""Cherry-pick extraction commits onto a fresh branch from main.
Replaces rebase-retry: extraction commits ADD new files, so cherry-pick
applies cleanly ~99% of the time. For enrichments (editing existing files),
cherry-pick reports the exact conflict for human review.
Leo's manual fix pattern (PRs #2178, #2141, #157, #2142):
1. git checkout -b clean-branch main
2. git cherry-pick <extraction-commit(s)>
3. Merge to main
"""
worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
clean_branch = f"_clean/{branch.replace('/', '-')}"
# Fetch latest state — separate calls to avoid refspec issues with long branch names
rc, out = await _git("fetch", "origin", "main", timeout=15)
if rc != 0:
return False, f"fetch main failed: {out}"
rc, out = await _git("fetch", "origin", branch, timeout=15)
if rc != 0:
return False, f"fetch branch failed: {out}"
# Check if already up to date
rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
rc2, main_sha = await _git("rev-parse", "origin/main")
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
return True, "already up to date"
# Get extraction commits (oldest first)
rc, commits_out = await _git(
"log", f"origin/main..origin/{branch}", "--format=%H", "--reverse",
timeout=10,
)
if rc != 0 or not commits_out.strip():
return False, f"no commits found on {branch}"
commit_list = [c.strip() for c in commits_out.strip().split("\n") if c.strip()]
# Create worktree from origin/main (fresh branch)
# Delete stale local branch if it exists from a previous failed attempt
await _git("branch", "-D", clean_branch)
rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main")
if rc != 0:
return False, f"worktree add failed: {out}"
try:
# Cherry-pick each extraction commit
dropped_entities: set[str] = set()
picked_count = 0
for commit_sha in commit_list:
rc, out = await _git("cherry-pick", commit_sha, cwd=worktree_path, timeout=60)
if rc != 0 and "empty" in out.lower():
# Content already on main — skip this commit
await _git("cherry-pick", "--skip", cwd=worktree_path)
logger.info("Cherry-pick %s: empty (already on main), skipping", commit_sha[:8])
continue
picked_count += 1
if rc != 0:
# Check if conflict is entity-only (same auto-resolution as before)
rc_ls, conflicting = await _git(
"diff", "--name-only", "--diff-filter=U", cwd=worktree_path
)
conflict_files = [
f.strip() for f in conflicting.split("\n") if f.strip()
] if rc_ls == 0 else []
if conflict_files and all(f.startswith("entities/") for f in conflict_files):
# Entity conflicts: take main's version (entities are recoverable)
# In cherry-pick: --ours = branch we're ON (clean branch from origin/main)
# --theirs = commit being cherry-picked (extraction branch)
for cf in conflict_files:
await _git("checkout", "--ours", cf, cwd=worktree_path)
await _git("add", cf, cwd=worktree_path)
dropped_entities.update(conflict_files)
rc_cont, cont_out = await _git(
"-c", "core.editor=true", "cherry-pick", "--continue",
cwd=worktree_path, timeout=60,
)
if rc_cont != 0:
await _git("cherry-pick", "--abort", cwd=worktree_path)
return False, f"cherry-pick entity resolution failed on {commit_sha[:8]}: {cont_out}"
logger.info(
"Cherry-pick entity conflict auto-resolved: dropped %s (recoverable)",
", ".join(sorted(conflict_files)),
)
else:
# Real conflict — report exactly what conflicted
conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
await _git("cherry-pick", "--abort", cwd=worktree_path)
return False, f"cherry-pick conflict on {commit_sha[:8]}: {conflict_detail}"
if dropped_entities:
logger.info(
"Cherry-pick auto-resolved entity conflicts in %s",
", ".join(sorted(dropped_entities)),
)
# All commits were empty — content already on main
if picked_count == 0:
return True, "already merged (all commits empty)"
# Post-pick dedup: remove duplicate evidence blocks (Leo: PRs #1751, #1752)
await _dedup_enriched_files(worktree_path)
# Force-push clean branch as the original branch name
# Capture expected SHA for force-with-lease
rc, expected_sha = await _git("rev-parse", f"origin/{branch}")
if rc != 0:
return False, f"rev-parse origin/{branch} failed: {expected_sha}"
expected_sha = expected_sha.strip().split("\n")[0]
rc, out = await _git(
"push",
f"--force-with-lease={branch}:{expected_sha}",
"origin",
f"HEAD:{branch}",
cwd=worktree_path,
timeout=30,
)
if rc != 0:
return False, f"push rejected: {out}"
return True, "cherry-picked and pushed"
finally:
# Cleanup worktree and temp branch
await _git("worktree", "remove", "--force", worktree_path)
await _git("branch", "-D", clean_branch)
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "challenged_by", "depends_on", "related", "reweave_edges")
# When A supports B, B also supports A (approximately symmetric).
# When A challenges B, B is challenged_by A (NOT symmetric — direction matters).
RECIPROCAL_EDGE_MAP = {
"supports": "supports",
"challenges": "challenged_by",
"related": "related",
"depends_on": "related", # A depends_on B → B is related to A (not symmetric)
}
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]:
"""Parse YAML frontmatter from markdown text.
Returns (frontmatter_dict, raw_fm_text, body_text_including_closing_delimiter).
Returns (None, "", text) if no valid frontmatter found.
raw_fm_text is the text between the --- delimiters (no delimiters, no leading newline).
"""
import yaml
if not text.startswith("---"):
return None, "", text
end = text.find("\n---", 3)
if end == -1:
return None, "", text
try:
raw_fm_text = text[4:end] # skip "---\n", stop before "\n---"
fm = yaml.safe_load(raw_fm_text)
body = text[end:] # includes closing \n--- and body
return (fm if isinstance(fm, dict) else None), raw_fm_text, body
except Exception:
return None, "", text
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
"""Union two edge lists, preserving order from main (append new at end).
Deduplicates by lowercase slug. Main's order is preserved; branch-only
edges are appended in their original order.
"""
seen = set()
result = []
for edge in main_edges:
key = str(edge).strip().lower()
if key not in seen:
seen.add(key)
result.append(edge)
for edge in branch_edges:
key = str(edge).strip().lower()
if key not in seen:
seen.add(key)
result.append(edge)
return result
def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str:
"""Splice merged edge fields into raw frontmatter text, preserving all other fields byte-identical.
Only modifies REWEAVE_EDGE_FIELDS lines. All other frontmatter (title, confidence, type, etc.)
stays exactly as it was in the source text — no yaml.dump reformatting.
Args:
raw_fm_text: The raw YAML text between the --- delimiters (no delimiters included).
merged_edges: {field_name: [edge_values]} for each edge field that should be present.
"""
import re
import yaml
lines = raw_fm_text.split("\n")
result_lines = []
i = 0
fields_written = set()
while i < len(lines):
line = lines[i]
# Check if this line starts an edge field
matched_field = None
for field in REWEAVE_EDGE_FIELDS:
if line.startswith(f"{field}:"):
matched_field = field
break
if matched_field:
fields_written.add(matched_field)
# Skip the old field and its list items (may be indented with spaces)
i += 1
while i < len(lines) and lines[i] and (lines[i][0] in (' ', '-')):
i += 1
# Write the merged version
edges = merged_edges.get(matched_field, [])
if edges:
result_lines.append(f"{matched_field}:")
for edge in edges:
result_lines.append(f"- {edge}")
# Don't increment i — it's already past the old field
continue
else:
result_lines.append(line)
i += 1
# Append any new edge fields that didn't exist in the original
for field in REWEAVE_EDGE_FIELDS:
if field not in fields_written:
edges = merged_edges.get(field, [])
if edges:
result_lines.append(f"{field}:")
for edge in edges:
result_lines.append(f"- {edge}")
return "\n".join(result_lines)
def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str:
"""Rebuild markdown file: splice merged edges into raw frontmatter, append body.
Uses string-level surgery — only edge fields are modified. All other frontmatter
stays byte-identical to the source. No yaml.dump reformatting.
"""
spliced = _serialize_edge_fields(raw_fm_text, merged_edges)
# body starts with \n--- (closing delimiter + body text)
if body.startswith("\n"):
return f"---\n{spliced}{body}"
return f"---\n{spliced}\n{body}"
async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
"""Merge a reweave PR using per-file frontmatter union instead of cherry-pick.
Reweave branches MODIFY existing files (appending YAML frontmatter edges).
Cherry-pick fails when main moved since branch creation (~75% failure rate).
This function:
1. Gets the list of files changed by the reweave branch
2. For each file, reads frontmatter from BOTH main HEAD and branch HEAD
3. Unions the edge arrays (order-preserving, main first, branch-new appended)
4. Asserts branch edges are a superset of main edges (reweave is append-only)
5. Writes merged content to a worktree, commits, pushes as the branch
Approved by Ganymede (manifest approach) and Theseus (superset assertion + order-preserving dedup).
"""
worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
clean_branch = f"_clean/{branch.replace('/', '-')}"
# Fetch latest state
rc, out = await _git("fetch", "origin", "main", timeout=15)
if rc != 0:
return False, f"fetch main failed: {out}"
rc, out = await _git("fetch", "origin", branch, timeout=15)
if rc != 0:
return False, f"fetch branch failed: {out}"
# Get files changed by the reweave branch
rc, diff_out = await _git(
"diff", "--name-only", f"origin/main...origin/{branch}", timeout=10,
)
if rc != 0 or not diff_out.strip():
return False, f"no changed files found on {branch}"
changed_files = [f.strip() for f in diff_out.strip().split("\n") if f.strip() and f.strip().endswith(".md")]
if not changed_files:
return False, "no .md files changed"
# Pre-cleanup: remove stale worktree/branch from prior crash (SIGKILL, OOM, etc.)
await _git("worktree", "remove", "--force", worktree_path)
await _git("branch", "-D", clean_branch)
rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main")
if rc != 0:
return False, f"worktree add failed: {out}"
try:
merged_count = 0
skipped_non_superset = []
for fpath in changed_files:
# Read file content from main HEAD and branch HEAD
rc_main, main_content = await _git("show", f"origin/main:{fpath}", timeout=5)
rc_branch, branch_content = await _git("show", f"origin/{branch}:{fpath}", timeout=5)
if rc_branch != 0:
logger.warning("Reweave merge: cannot read %s from branch %s", fpath, branch)
continue
if rc_main != 0:
# File only exists on branch (new file) — just write it
full_path = os.path.join(worktree_path, fpath)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, "w") as f:
f.write(branch_content)
await _git("add", fpath, cwd=worktree_path)
merged_count += 1
continue
# Parse frontmatter from both versions
main_fm, main_raw_fm, main_body = _parse_yaml_frontmatter(main_content)
branch_fm, _branch_raw_fm, branch_body = _parse_yaml_frontmatter(branch_content)
if main_fm is None or branch_fm is None:
# Parse failure = something unexpected. Fail the merge, don't fallback
# to cherry-pick. (Theseus: loud failure, not silent retry)
return False, f"frontmatter parse failed on {fpath} — manual review needed"
# Superset assertion + merge in one pass.
# Reweave only adds edges. If branch is missing an edge that main has,
# the branch was based on stale main — union is safe (adds both).
merged_edges = {}
for field in REWEAVE_EDGE_FIELDS:
main_list = main_fm.get(field, [])
branch_list = branch_fm.get(field, [])
if not isinstance(main_list, list):
main_list = [main_list] if main_list else []
if not isinstance(branch_list, list):
branch_list = [branch_list] if branch_list else []
# Superset check
main_keys = {str(v).strip().lower() for v in main_list if v}
branch_keys = {str(v).strip().lower() for v in branch_list if v}
missing = main_keys - branch_keys
if missing:
logger.warning(
"Reweave merge: %s field '%s' — branch missing edges from main: %s",
fpath, field, missing,
)
skipped_non_superset.append(f"{fpath}:{field}")
# Collect merged edges for string-level splicing
if main_list or branch_list:
merged_edges[field] = _union_edge_lists(main_list, branch_list)
# Write merged file — splice edges into main's raw frontmatter, use main's body
full_path = os.path.join(worktree_path, fpath)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, "w") as f:
f.write(_serialize_frontmatter(main_raw_fm, merged_edges, main_body))
await _git("add", fpath, cwd=worktree_path)
merged_count += 1
if merged_count == 0:
return False, "no files merged (all skipped)"
# Commit the merged changes
commit_msg = f"reweave: merge {merged_count} files via frontmatter union [auto]"
rc, out = await _git(
"commit", "-m", commit_msg, cwd=worktree_path, timeout=30,
)
if rc != 0:
return False, f"commit failed: {out}"
# Force-push as the branch (for the ff-push step in _merge_domain_queue)
rc, expected_sha = await _git("rev-parse", f"origin/{branch}")
if rc != 0:
return False, f"rev-parse origin/{branch} failed: {expected_sha}"
expected_sha = expected_sha.strip().split("\n")[0]
rc, out = await _git(
"push",
f"--force-with-lease={branch}:{expected_sha}",
"origin",
f"HEAD:{branch}",
cwd=worktree_path,
timeout=30,
)
if rc != 0:
return False, f"push rejected: {out}"
result_msg = f"frontmatter-union merged {merged_count} files"
if skipped_non_superset:
result_msg += f" (non-superset warnings: {len(skipped_non_superset)})"
return True, result_msg
finally:
await _git("worktree", "remove", "--force", worktree_path)
await _git("branch", "-D", clean_branch)
async def _resubmit_approvals(pr_number: int):
"""Re-submit 2 formal Forgejo approvals after force-push invalidated them.
Force-push (rebase) invalidates existing approvals. Branch protection
requires 2 approvals before the merge API will accept the request.
Same pattern as evaluate._post_formal_approvals.
"""
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved (post-rebase re-approval).", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug(
"Post-rebase approval for PR #%d by %s (%d/2)",
pr_number, agent_name, approvals,
)
if approvals < 2:
logger.warning(
"Only %d/2 approvals submitted for PR #%d after rebase",
approvals, pr_number,
)
async def _merge_pr(pr_number: int) -> tuple[bool, str]:
"""Merge PR via Forgejo API. CURRENTLY UNUSED — local ff-push is the primary merge path.
Kept as fallback: re-enable if Forgejo fixes the 405 bug (Ganymede's API-first design).
The local ff-push in _merge_domain_queue replaced this due to persistent 405 errors.
"""
# Check if already merged/closed on Forgejo (prevents 405 on re-merge attempts)
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
if pr_info:
if pr_info.get("merged"):
logger.info("PR #%d already merged on Forgejo, syncing status", pr_number)
return True, "already merged"
if pr_info.get("state") == "closed":
logger.warning("PR #%d closed on Forgejo but not merged", pr_number)
return False, "PR closed without merge"
# Merge whitelist only allows leo and m3taversal — use Leo's token
leo_token = get_agent_token("leo")
if not leo_token:
return False, "no leo token for merge (merge whitelist requires leo)"
# Pre-flight: verify approvals exist before attempting merge (Rhea: catches 405)
reviews = await forgejo_api("GET", repo_path(f"pulls/{pr_number}/reviews"))
if reviews is not None:
approval_count = sum(1 for r in reviews if r.get("state") == "APPROVED")
if approval_count < 2:
logger.info("PR #%d: only %d/2 approvals, resubmitting before merge", pr_number, approval_count)
await _resubmit_approvals(pr_number)
# Retry with backoff + jitter for transient errors (Rhea: jitter prevents thundering herd)
delays = [0, 5, 15, 45]
for attempt, base_delay in enumerate(delays, 1):
if base_delay:
jittered = base_delay * (0.8 + random.random() * 0.4)
await asyncio.sleep(jittered)
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/merge"),
{"Do": "merge", "merge_message_field": ""},
token=leo_token,
)
if result is not None:
return True, "merged"
# Check if merge succeeded despite API error (timeout case — Rhea)
pr_check = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
if pr_check and pr_check.get("merged"):
return True, "already merged"
# Distinguish transient from permanent failures (Ganymede)
if pr_check and not pr_check.get("mergeable", True):
# PR not mergeable — branch diverged or conflict. Rebase needed, not retry.
return False, "merge rejected: PR not mergeable (needs rebase)"
if attempt < len(delays):
logger.info("PR #%d: merge attempt %d failed (transient), retrying in %.0fs",
pr_number, attempt, delays[attempt] if attempt < len(delays) else 0)
return False, "Forgejo merge API failed after 4 attempts (transient)"
async def _delete_remote_branch(branch: str):
"""Delete remote branch immediately after merge. (Ganymede Q4: immediate, not batch)
If DELETE fails, log and move on — stale branch is cosmetic,
stale merge is operational.
"""
result = await forgejo_api(
"DELETE",
repo_path(f"branches/{branch}"),
)
if result is None:
logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
# --- Contributor attribution ---
def _is_knowledge_pr(diff: str) -> bool:
"""Check if a PR touches knowledge files (claims, decisions, core, foundations).
Knowledge PRs get full CI attribution weight.
Pipeline-only PRs (inbox, entities, agents, archive) get zero CI weight.
Mixed PRs count as knowledge — if a PR adds a claim, it gets attribution
even if it also moves source files. Knowledge takes priority. (Ganymede review)
"""
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
for line in diff.split("\n"):
if line.startswith("+++ b/") or line.startswith("--- a/"):
path = line.split("/", 1)[1] if "/" in line else ""
if any(path.startswith(p) for p in knowledge_prefixes):
return True
return False
def _refine_commit_type(diff: str, branch_commit_type: str) -> str:
"""Refine commit_type from diff content when branch prefix is ambiguous.
Branch prefix gives initial classification (extract, research, entity, etc.).
For 'extract' branches, diff content can distinguish:
- challenge: adds challenged_by edges to existing claims
- enrich: modifies existing claim frontmatter without new files
- extract: creates new claim files (default for extract branches)
Only refines 'extract' type — other branch types (research, entity, reweave, fix)
are already specific enough.
"""
if branch_commit_type != "extract":
return branch_commit_type
new_files = 0
modified_files = 0
has_challenge_edge = False
in_diff_header = False
current_is_new = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
in_diff_header = True
current_is_new = False
elif line.startswith("new file"):
current_is_new = True
elif line.startswith("+++ b/"):
path = line[6:]
if any(path.startswith(p) for p in ("domains/", "core/", "foundations/")):
if current_is_new:
new_files += 1
else:
modified_files += 1
in_diff_header = False
elif line.startswith("+") and not line.startswith("+++"):
if "challenged_by:" in line or "challenges:" in line:
has_challenge_edge = True
if has_challenge_edge and new_files == 0:
return "challenge"
if modified_files > 0 and new_files == 0:
return "enrich"
return "extract"
async def _record_contributor_attribution(conn, pr_number: int, branch: str):
"""Record contributor attribution after a successful merge.
Parses git trailers and claim frontmatter to identify contributors
and their roles. Upserts into contributors table. Refines commit_type
from diff content. Pipeline-only PRs (no knowledge files) are skipped.
"""
import re as _re
from datetime import date as _date, datetime as _dt
today = _date.today().isoformat()
# Get the PR diff to parse claim frontmatter for attribution blocks
diff = await get_pr_diff(pr_number)
if not diff:
return
# Pipeline-only PRs (inbox, entities, agents) don't count toward CI
if not _is_knowledge_pr(diff):
logger.info("PR #%d: pipeline-only commit — skipping CI attribution", pr_number)
return
# Refine commit_type from diff content (branch prefix may be too broad)
row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone()
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
refined_type = _refine_commit_type(diff, branch_type)
if refined_type != branch_type:
conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
logger.info("PR #%d: commit_type refined %s%s", pr_number, branch_type, refined_type)
# Parse Pentagon-Agent trailer from branch commit messages
agents_found: set[str] = set()
rc, log_output = await _git(
"log", f"origin/main..origin/{branch}", "--format=%b%n%N",
timeout=10,
)
if rc == 0:
for match in _re.finditer(r"Pentagon-Agent:\s*(\S+)\s*<([^>]+)>", log_output):
agent_name = match.group(1).lower()
agent_uuid = match.group(2)
_upsert_contributor(
conn, agent_name, agent_uuid, "extractor", today,
)
agents_found.add(agent_name)
# Parse attribution blocks from claim frontmatter in diff
# Look for added lines with attribution YAML
current_role = None
for line in diff.split("\n"):
if not line.startswith("+") or line.startswith("+++"):
continue
stripped = line[1:].strip()
# Detect role sections in attribution block
for role in ("sourcer", "extractor", "challenger", "synthesizer", "reviewer"):
if stripped.startswith(f"{role}:"):
current_role = role
break
# Extract handle from attribution entries
handle_match = _re.match(r'-\s*handle:\s*["\']?([^"\']+)["\']?', stripped)
if handle_match and current_role:
handle = handle_match.group(1).strip().lower()
agent_id_match = _re.search(r'agent_id:\s*["\']?([^"\']+)', stripped)
agent_id = agent_id_match.group(1).strip() if agent_id_match else None
_upsert_contributor(conn, handle, agent_id, current_role, today)
# Fallback: if no attribution block found, credit the branch agent as extractor
if not agents_found:
# Try to infer agent from branch name (e.g., "extract/2026-03-05-...")
# The PR's agent field in SQLite is also available
row = conn.execute("SELECT agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
if row and row["agent"]:
_upsert_contributor(conn, row["agent"].lower(), None, "extractor", today)
# Increment claims_merged for all contributors on this PR
# (handled inside _upsert_contributor via the role counts)
def _upsert_contributor(
conn, handle: str, agent_id: str | None, role: str, date_str: str,
):
"""Upsert a contributor record, incrementing the appropriate role count."""
import json as _json
from datetime import datetime as _dt
role_col = f"{role}_count"
if role_col not in (
"sourcer_count", "extractor_count", "challenger_count",
"synthesizer_count", "reviewer_count",
):
logger.warning("Unknown contributor role: %s", role)
return
existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone()
if existing:
conn.execute(
f"""UPDATE contributors SET
{role_col} = {role_col} + 1,
claims_merged = claims_merged + CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END,
last_contribution = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(role, date_str, handle),
)
else:
conn.execute(
f"""INSERT INTO contributors (handle, agent_id, first_contribution, last_contribution, {role_col}, claims_merged)
VALUES (?, ?, ?, ?, 1, CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END)""",
(handle, agent_id, date_str, date_str, role),
)
# Recalculate tier
_recalculate_tier(conn, handle)
def _recalculate_tier(conn, handle: str):
"""Recalculate contributor tier based on config rules."""
from datetime import date as _date, datetime as _dt
row = conn.execute(
"SELECT claims_merged, challenges_survived, first_contribution, tier FROM contributors WHERE handle = ?",
(handle,),
).fetchone()
if not row:
return
current_tier = row["tier"]
claims_merged = row["claims_merged"] or 0
challenges_survived = row["challenges_survived"] or 0
first_contribution = row["first_contribution"]
days_since_first = 0
if first_contribution:
try:
first_date = _dt.strptime(first_contribution, "%Y-%m-%d").date()
days_since_first = (_date.today() - first_date).days
except ValueError:
pass
# Check veteran first (higher tier)
vet_rules = config.CONTRIBUTOR_TIER_RULES["veteran"]
if (claims_merged >= vet_rules["claims_merged"]
and days_since_first >= vet_rules["min_days_since_first"]
and challenges_survived >= vet_rules["challenges_survived"]):
new_tier = "veteran"
elif claims_merged >= config.CONTRIBUTOR_TIER_RULES["contributor"]["claims_merged"]:
new_tier = "contributor"
else:
new_tier = "new"
if new_tier != current_tier:
conn.execute(
"UPDATE contributors SET tier = ?, updated_at = datetime('now') WHERE handle = ?",
(new_tier, handle),
)
logger.info("Contributor %s: tier %s%s", handle, current_tier, new_tier)
db.audit(
conn, "contributor", "tier_change",
json.dumps({"handle": handle, "from": current_tier, "to": new_tier}),
)
# --- Source archiving after merge (Ganymede review: closes near-duplicate loop) ---
# Accumulates source moves during a merge cycle, batch-committed at the end
_pending_source_moves: list[tuple[str, str]] = [] # (queue_path, archive_path)
def _update_source_frontmatter_status(path: str, new_status: str):
"""Update the status field in a source file's frontmatter. (Ganymede: 5 lines)"""
import re as _re
try:
text = open(path).read()
text = _re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=_re.MULTILINE)
open(path, "w").write(text)
except Exception as e:
logger.warning("Failed to update source status in %s: %s", path, e)
async def _embed_merged_claims(main_sha: str, branch_sha: str):
"""Embed new/changed claim files from a merged PR into Qdrant.
Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
to find ALL changed files across the entire branch, not just the last commit.
Also deletes Qdrant vectors for files removed by the branch.
Non-fatal — embedding failure does not block the merge pipeline.
"""
try:
# --- Embed added/changed files ---
rc, diff_out = await _git(
"diff", "--name-only", "--diff-filter=ACMR",
main_sha, branch_sha,
cwd=str(config.MAIN_WORKTREE),
timeout=10,
)
if rc != 0:
logger.warning("embed: diff failed (rc=%d), skipping", rc)
return
embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
md_files = [
f for f in diff_out.strip().split("\n")
if f.endswith(".md")
and any(f.startswith(d) for d in embed_dirs)
and not f.split("/")[-1].startswith("_")
]
embedded = 0
for fpath in md_files:
full_path = config.MAIN_WORKTREE / fpath
if not full_path.exists():
continue
proc = await asyncio.create_subprocess_exec(
"python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
if proc.returncode == 0 and b"OK" in stdout:
embedded += 1
else:
logger.warning("embed: failed for %s: %s", fpath, stderr.decode()[:200])
if embedded:
logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
# --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
rc, del_out = await _git(
"diff", "--name-only", "--diff-filter=D",
main_sha, branch_sha,
cwd=str(config.MAIN_WORKTREE),
timeout=10,
)
if rc == 0 and del_out.strip():
deleted_files = [
f for f in del_out.strip().split("\n")
if f.endswith(".md")
and any(f.startswith(d) for d in embed_dirs)
]
if deleted_files:
import hashlib
point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
try:
import urllib.request
req = urllib.request.Request(
"http://localhost:6333/collections/teleo-claims/points/delete",
data=json.dumps({"points": point_ids}).encode(),
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=10)
logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
except Exception:
logger.warning("embed: failed to delete stale vectors (non-fatal)")
except Exception:
logger.exception("embed: post-merge embedding failed (non-fatal)")
async def _reciprocal_edges(main_sha: str, branch_sha: str):
"""Add reciprocal edges on existing claims after a PR merges.
When a new claim A has `supports: [B]` in its frontmatter, B should have
`supports: [A]` added to its own frontmatter. This gives A an incoming link,
preventing it from being an orphan.
Runs on main after cherry-pick merge. Non-fatal — orphans are recoverable.
Only processes new files (diff-filter=A), not modified files.
"""
EDGE_FIELDS = ("supports", "challenges", "related")
# Inverse mapping: if A supports B, then B is supported-by A.
# For simplicity, we use the same edge type (bidirectional "supports" means
# both claims support each other's argument). This matches reweave behavior.
try:
# Find newly added claim files
rc, diff_out = await _git(
"diff", "--name-only", "--diff-filter=A",
main_sha, branch_sha,
cwd=str(config.MAIN_WORKTREE),
timeout=10,
)
if rc != 0:
logger.warning("reciprocal_edges: diff failed (rc=%d), skipping", rc)
return
claim_dirs = {"domains/", "core/", "foundations/"}
new_claims = [
f for f in diff_out.strip().split("\n")
if f.endswith(".md")
and any(f.startswith(d) for d in claim_dirs)
and not f.split("/")[-1].startswith("_")
and "/entities/" not in f
and "/decisions/" not in f
]
if not new_claims:
return
reciprocals_added = 0
modified_files = set()
for claim_path in new_claims:
full_path = config.MAIN_WORKTREE / claim_path
if not full_path.exists():
continue
try:
content = full_path.read_text()
except Exception:
continue
fm, raw_fm, body = _parse_yaml_frontmatter(content)
if fm is None:
continue
# Get the new claim's slug (filename without .md)
claim_slug = claim_path.rsplit("/", 1)[-1].replace(".md", "")
# Collect all edge targets from this new claim
for field in EDGE_FIELDS:
targets = fm.get(field, [])
if isinstance(targets, str):
targets = [targets]
if not isinstance(targets, list):
continue
for target_slug in targets:
target_slug = str(target_slug).strip()
if not target_slug:
continue
# Find the target file on disk
target_file = _find_claim_file(target_slug)
if target_file is None:
continue
# Add reciprocal edge: target now has field: [new_claim_slug]
reciprocal_type = RECIPROCAL_EDGE_MAP.get(field, "related")
if _add_edge_to_file(target_file, reciprocal_type, claim_slug):
reciprocals_added += 1
modified_files.add(str(target_file))
if reciprocals_added > 0:
# Stage only the files we modified (never git add -A in automation)
for f in modified_files:
await _git("add", f, cwd=str(config.MAIN_WORKTREE))
rc, out = await _git(
"commit", "-m", f"reciprocal edges: {reciprocals_added} edges from {len(new_claims)} new claims",
cwd=str(config.MAIN_WORKTREE),
)
if rc == 0:
# Push immediately — batch-extract-50.sh does reset --hard origin/main
# every 15 min, which destroys unpushed local commits
push_rc, push_out = await _git(
"push", "origin", "main",
cwd=str(config.MAIN_WORKTREE),
timeout=30,
)
if push_rc == 0:
logger.info("reciprocal_edges: %d edges pushed to main (%d new claims)", reciprocals_added, len(new_claims))
else:
logger.warning("reciprocal_edges: push failed (commit is local only): %s", push_out[:200])
else:
logger.warning("reciprocal_edges: commit failed: %s", out[:200])
except Exception:
logger.exception("reciprocal_edges: failed (non-fatal)")
def _find_claim_file(slug: str) -> "Path | None":
"""Find a claim file on disk by its slug. Searches domains/, core/, foundations/."""
from pathlib import Path as _Path
worktree = config.MAIN_WORKTREE
for search_dir in ("domains", "core", "foundations"):
base = worktree / search_dir
if not base.is_dir():
continue
# Direct match
for md in base.rglob(f"{slug}.md"):
if not md.name.startswith("_"):
return md
return None
def _add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool:
"""Add a single edge to a file's frontmatter. Returns True if modified."""
try:
content = file_path.read_text()
except Exception:
return False
fm, raw_fm, body = _parse_yaml_frontmatter(content)
if fm is None:
return False
# Check for existing edge (dedup)
existing = fm.get(edge_type, [])
if isinstance(existing, str):
existing = [existing]
if not isinstance(existing, list):
existing = []
if any(str(e).strip().lower() == target_slug.lower() for e in existing):
return False # Already exists
# Build merged edges (all edge fields, only modifying the target one)
merged_edges = {}
for field in REWEAVE_EDGE_FIELDS:
vals = fm.get(field, [])
if isinstance(vals, str):
vals = [vals]
if not isinstance(vals, list):
vals = []
merged_edges[field] = list(vals)
merged_edges.setdefault(edge_type, []).append(target_slug)
# Serialize using the same string-surgery approach as reweave
new_fm = _serialize_edge_fields(raw_fm, merged_edges)
if body.startswith("\n"):
new_content = f"---\n{new_fm}{body}"
else:
new_content = f"---\n{new_fm}\n{body}"
try:
file_path.write_text(new_content)
return True
except Exception:
return False
def _archive_source_for_pr(branch: str, domain: str, merged: bool = True):
"""Move source from queue/ to archive/{domain}/ after PR merge or close.
Only handles extract/ branches (Ganymede: skip research sessions).
Updates frontmatter: 'processed' for merged, 'rejected' for closed.
Accumulates moves for batch commit at end of merge cycle.
"""
if not branch.startswith("extract/"):
return
source_slug = branch.replace("extract/", "", 1)
main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
archive_path = os.path.join(archive_dir, f"{source_slug}.md")
# Already in archive? Delete queue duplicate
if os.path.exists(archive_path):
if os.path.exists(queue_path):
try:
os.remove(queue_path)
_pending_source_moves.append((queue_path, "deleted"))
logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
except Exception as e:
logger.warning("Source dedup failed: %s", e)
return
# Move from queue to archive
if os.path.exists(queue_path):
# Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
_update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
os.makedirs(archive_dir, exist_ok=True)
try:
shutil.move(queue_path, archive_path)
_pending_source_moves.append((queue_path, archive_path))
logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
source_slug, domain, "processed" if merged else "rejected")
except Exception as e:
logger.warning("Source archive failed: %s", e)
async def _commit_source_moves():
"""Batch commit accumulated source moves. Called at end of merge cycle.
Rhea review: fetch+reset before touching files, use main_worktree_lock,
crash gap is self-healing (reset --hard reverts uncommitted moves).
"""
if not _pending_source_moves:
return
main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
count = len(_pending_source_moves)
_pending_source_moves.clear()
# Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
try:
async with async_main_worktree_lock(timeout=10):
# Sync worktree with remote (Rhea: fetch+reset, not pull)
await _git("fetch", "origin", "main", cwd=main_dir, timeout=30)
await _git("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
await _git("add", "-A", "inbox/", cwd=main_dir)
rc, out = await _git(
"commit", "-m",
f"pipeline: archive {count} source(s) post-merge\n\n"
f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
cwd=main_dir,
)
if rc != 0:
if "nothing to commit" in out:
return
logger.warning("Source archive commit failed: %s", out)
return
for attempt in range(3):
await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
if rc_push == 0:
logger.info("Committed + pushed %d source archive moves", count)
return
await asyncio.sleep(2)
logger.warning("Failed to push source archive moves after 3 attempts")
await _git("reset", "--hard", "origin/main", cwd=main_dir)
except TimeoutError:
logger.warning("Source archive commit skipped: worktree lock timeout")
# --- Domain merge task ---
async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
"""Process the merge queue for a single domain. Returns (succeeded, failed)."""
succeeded = 0
failed = 0
while True:
async with _domain_locks[domain]:
pr = await _claim_next_pr(conn, domain)
if not pr:
break # No more approved PRs for this domain
pr_num = pr["number"]
branch = pr["branch"]
logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain)
try:
# Route reweave branches to frontmatter-union merge.
# Reweave MODIFIES existing files (appending YAML edges) — cherry-pick
# fails ~75% when main moved. Frontmatter union reads current main HEAD,
# unions edge lists, commits. No conflicts possible.
# (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
if branch.startswith("reweave/"):
merge_fn = _merge_reweave_pr(branch)
else:
# Extraction commits ADD new files — cherry-pick applies cleanly.
merge_fn = _cherry_pick_onto_main(branch)
pick_ok, pick_msg = await asyncio.wait_for(
merge_fn,
timeout=MERGE_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
logger.error(
"PR #%d merge timed out after %ds — resetting to conflict (Rhea)", pr_num, MERGE_TIMEOUT_SECONDS
)
conn.execute(
"UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num),
)
db.audit(conn, "merge", "timeout", json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS}))
failed += 1
continue
if not pick_ok:
logger.warning("PR #%d merge/cherry-pick failed: %s", pr_num, pick_msg)
# Reweave: close immediately, don't retry (Ship: same rationale as ff-push failure)
if branch.startswith("reweave/"):
conn.execute(
"UPDATE prs SET status = 'closed', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(f"reweave merge failed (closed, not retried): {pick_msg[:400]}", pr_num),
)
await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"), {"state": "closed"})
await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
{"body": f"Reweave merge failed — closing. Next nightly reweave will create a fresh branch.\n\nError: {pick_msg[:200]}"})
await _delete_remote_branch(branch)
else:
conn.execute(
"UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(pick_msg[:500], pr_num),
)
db.audit(conn, "merge", "cherry_pick_failed", json.dumps({"pr": pr_num, "error": pick_msg[:200]}))
failed += 1
continue
# Local ff-merge: push cherry-picked branch as main (Rhea's approach, Leo+Rhea: local primary)
# The branch was just cherry-picked onto origin/main,
# so origin/{branch} is a descendant of origin/main. Push it as main.
await _git("fetch", "origin", branch, timeout=15)
rc, main_sha = await _git("rev-parse", "origin/main")
main_sha = main_sha.strip() if rc == 0 else ""
rc, branch_sha = await _git("rev-parse", f"origin/{branch}")
branch_sha = branch_sha.strip() if rc == 0 else ""
merge_ok = False
merge_msg = ""
if branch_sha:
rc, out = await _git(
"push", f"--force-with-lease=main:{main_sha}",
"origin", f"{branch_sha}:main",
timeout=30,
)
if rc == 0:
merge_ok = True
merge_msg = f"merged (local ff-push, SHA: {branch_sha[:8]})"
# Close PR on Forgejo with merge SHA comment
leo_token = get_agent_token("leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": f"Merged locally.\nMerge SHA: `{branch_sha}`\nBranch: `{branch}`"},
)
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_num}"),
{"state": "closed"},
token=leo_token,
)
else:
merge_msg = f"local ff-push failed: {out[:200]}"
else:
merge_msg = f"could not resolve origin/{branch}"
if not merge_ok:
logger.error("PR #%d merge failed: %s", pr_num, merge_msg)
# Reweave PRs: close immediately on failure. Cherry-pick retry
# will always fail (reweave modifies existing files). Next nightly
# run creates a fresh branch from current main — retry is wasteful.
# (Ship: prevents reweave flood + wasted retry cycles)
if branch.startswith("reweave/"):
conn.execute(
"UPDATE prs SET status = 'closed', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(f"reweave merge failed (closed, not retried): {merge_msg[:400]}", pr_num),
)
await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"), {"state": "closed"})
await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
{"body": f"Reweave merge failed — closing. Next nightly reweave will create a fresh branch.\n\nError: {merge_msg[:200]}"})
await _delete_remote_branch(branch)
else:
conn.execute(
"UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(merge_msg[:500], pr_num),
)
db.audit(conn, "merge", "merge_failed", json.dumps({"pr": pr_num, "error": merge_msg[:200]}))
failed += 1
continue
# Success — update status and cleanup
conn.execute(
"""UPDATE prs SET status = 'merged',
merged_at = datetime('now'),
last_error = NULL
WHERE number = ?""",
(pr_num,),
)
db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch}))
logger.info("PR #%d merged successfully", pr_num)
# Record contributor attribution
try:
await _record_contributor_attribution(conn, pr_num, branch)
except Exception:
logger.exception("PR #%d: contributor attribution failed (non-fatal)", pr_num)
# Archive source file (closes near-duplicate loop — Ganymede review)
_archive_source_for_pr(branch, domain)
# Embed new/changed claims into Qdrant (non-fatal)
await _embed_merged_claims(main_sha, branch_sha)
# Add reciprocal edges on existing claims (non-fatal)
# New claim A with supports:[B] → add supports:[A] on B's frontmatter
await _reciprocal_edges(main_sha, branch_sha)
# Cascade: notify agents whose beliefs/positions depend on changed claims
try:
await cascade_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn)
except Exception:
logger.exception("PR #%d: cascade failed (non-fatal)", pr_num)
# Cross-domain citation index: log entity-based connections between domains
try:
await cross_domain_after_merge(main_sha, branch_sha, pr_num, config.MAIN_WORKTREE, conn=conn)
except Exception:
logger.exception("PR #%d: cross_domain failed (non-fatal)", pr_num)
conn.commit() # Commit DB writes before slow branch deletion
# Delete remote branch immediately (Ganymede Q4)
await _delete_remote_branch(branch)
# Prune local worktree metadata
await _git("worktree", "prune")
succeeded += 1
return succeeded, failed
# --- Main entry point ---
async def _reconcile_db_state(conn):
"""Reconcile pipeline DB against Forgejo's actual PR state.
Fixes ghost PRs: DB says 'conflict' or 'open' but Forgejo says merged/closed.
Also detects deleted branches (rev-parse failures). (Leo's structural fix #1)
Run at the start of each merge cycle.
"""
stale = conn.execute(
"SELECT number, branch, status FROM prs WHERE status IN ('conflict', 'open', 'reviewing')"
).fetchall()
if not stale:
return
reconciled = 0
for row in stale:
pr_number = row["number"]
branch = row["branch"]
db_status = row["status"]
# Check Forgejo PR state
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
if not pr_info:
continue
forgejo_state = pr_info.get("state", "")
is_merged = pr_info.get("merged", False)
if is_merged and db_status != "merged":
conn.execute(
"UPDATE prs SET status = 'merged', merged_at = datetime('now') WHERE number = ?",
(pr_number,),
)
reconciled += 1
continue
if forgejo_state == "closed" and not is_merged and db_status not in ("closed",):
# Clean up branch too — stale branches get rediscovered as new PRs
# (Ship: prevents reweave flood where closed PRs leave branches that
# trigger discover_external_prs → new PR → fail → close → repeat)
if branch:
await _delete_remote_branch(branch)
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'reconciled: closed on Forgejo' WHERE number = ?",
(pr_number,),
)
reconciled += 1
continue
# Ghost PR detection: branch deleted but PR still open in DB (Fix #2)
# Ganymede: rc != 0 means remote unreachable — skip, don't close
if db_status in ("open", "reviewing") and branch:
rc, ls_out = await _git("ls-remote", "--heads", "origin", branch, timeout=10)
if rc != 0:
logger.warning("ls-remote failed for %s — skipping ghost check", branch)
continue
if not ls_out.strip():
# Branch gone — close PR on Forgejo and in DB (Ganymede: don't leave orphans)
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_number}"),
body={"state": "closed"},
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
body={"body": "Auto-closed: branch deleted from remote."},
)
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'reconciled: branch deleted' WHERE number = ?",
(pr_number,),
)
logger.info("Ghost PR #%d: branch %s deleted, closing", pr_number, branch)
reconciled += 1
if reconciled:
logger.info("Reconciled %d stale PRs against Forgejo state", reconciled)
MAX_CONFLICT_REBASE_ATTEMPTS = 3
async def _handle_permanent_conflicts(conn) -> int:
"""Close conflict_permanent PRs and file their sources correctly.
When a PR fails rebase 3x, the claims are already on main from the first
successful extraction. The source should live in archive/{domain}/ (one copy).
Any duplicate in queue/ gets deleted. No requeuing — breaks the infinite loop.
Hygiene (Cory): one source file, one location, no duplicates.
Reviewed by Ganymede: commit moves, use shutil.move, batch commit at end.
"""
rows = conn.execute(
"""SELECT number, branch, domain
FROM prs
WHERE status = 'conflict_permanent'
ORDER BY number ASC"""
).fetchall()
if not rows:
return 0
handled = 0
files_changed = False
main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
for row in rows:
pr_number = row["number"]
branch = row["branch"]
domain = row["domain"] or "unknown"
# Close PR on Forgejo
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_number}"),
body={"state": "closed"},
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
body={"body": (
"Closed by conflict auto-resolver: rebase failed 3 times (enrichment conflict). "
"Claims already on main from prior extraction. Source filed in archive."
)},
)
await _delete_remote_branch(branch)
# File the source: one copy in archive/{domain}/, delete duplicates
source_slug = branch.replace("extract/", "", 1) if branch.startswith("extract/") else None
if source_slug:
filename = f"{source_slug}.md"
archive_dir = os.path.join(main_dir, "inbox", "archive", domain)
archive_path = os.path.join(archive_dir, filename)
queue_path = os.path.join(main_dir, "inbox", "queue", filename)
already_archived = os.path.exists(archive_path)
if already_archived:
if os.path.exists(queue_path):
try:
os.remove(queue_path)
logger.info("PR #%d: deleted queue duplicate %s (already in archive/%s)",
pr_number, filename, domain)
files_changed = True
except Exception as e:
logger.warning("PR #%d: failed to delete queue duplicate: %s", pr_number, e)
else:
logger.info("PR #%d: source already in archive/%s, no cleanup needed", pr_number, domain)
else:
if os.path.exists(queue_path):
os.makedirs(archive_dir, exist_ok=True)
try:
shutil.move(queue_path, archive_path)
logger.info("PR #%d: filed source to archive/%s: %s", pr_number, domain, filename)
files_changed = True
except Exception as e:
logger.warning("PR #%d: failed to file source: %s", pr_number, e)
else:
logger.warning("PR #%d: source not found in queue or archive for %s", pr_number, filename)
# Clear batch-state marker
state_marker = f"/opt/teleo-eval/batch-state/{source_slug}.done"
try:
if os.path.exists(state_marker):
os.remove(state_marker)
except Exception:
pass
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'conflict_permanent: closed + filed in archive' WHERE number = ?",
(pr_number,),
)
handled += 1
logger.info("Permanent conflict handled: PR #%d closed, source filed", pr_number)
# Batch commit source moves to main (Ganymede: follow entity_batch pattern)
if files_changed:
await _git("add", "-A", "inbox/", cwd=main_dir)
rc, out = await _git(
"commit", "-m",
f"pipeline: archive {handled} conflict-closed source(s)\n\n"
f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
cwd=main_dir,
)
if rc == 0:
# Push with pull-rebase retry (entity_batch pattern)
for attempt in range(3):
await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
if rc_push == 0:
logger.info("Committed + pushed source archive moves for %d PRs", handled)
break
await asyncio.sleep(2)
else:
logger.warning("Failed to push source archive moves after 3 attempts")
await _git("reset", "--hard", "origin/main", cwd=main_dir)
if handled:
logger.info("Handled %d permanent conflict PRs (closed + filed)", handled)
return handled
async def _retry_conflict_prs(conn) -> tuple[int, int]:
"""Retry conflict PRs via cherry-pick onto fresh main.
Design: Ganymede (extend merge stage), Rhea (safety guards), Leo (re-eval required).
- Pick up PRs with status='conflict' and both approvals
- Cherry-pick extraction commits onto fresh branch from origin/main
- If cherry-pick succeeds: force-push, reset to 'open' with verdicts cleared for re-eval
- If cherry-pick fails: increment attempt counter, leave as 'conflict'
- After MAX_CONFLICT_REBASE_ATTEMPTS failures: mark 'conflict_permanent'
- Skip branches with new commits since conflict was set (Rhea: someone is working on it)
"""
rows = conn.execute(
"""SELECT number, branch, conflict_rebase_attempts
FROM prs
WHERE status = 'conflict'
AND COALESCE(conflict_rebase_attempts, 0) < ?
ORDER BY number ASC""",
(MAX_CONFLICT_REBASE_ATTEMPTS,),
).fetchall()
if not rows:
return 0, 0
resolved = 0
failed = 0
for row in rows:
pr_number = row["number"]
branch = row["branch"]
attempts = row["conflict_rebase_attempts"] or 0
# Reweave branches modify existing files — cherry-pick will always fail.
# Close immediately and delete branch. Next nightly reweave creates fresh.
# (Ship: prevents wasting 3 retry cycles on branches that can never cherry-pick)
if branch.startswith("reweave/"):
logger.info("Reweave PR #%d: skipping retry, closing + deleting branch", pr_number)
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'reweave: closed (retry skipped, next nightly creates fresh)' WHERE number = ?",
(pr_number,),
)
await forgejo_api("PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"})
await forgejo_api("POST", repo_path(f"issues/{pr_number}/comments"),
{"body": "Reweave conflict — closing instead of retrying. Cherry-pick always fails on reweave branches (they modify existing files). Next nightly reweave will create a fresh branch from current main."})
await _delete_remote_branch(branch)
failed += 1
continue
logger.info("Conflict retry [%d/%d] PR #%d branch=%s",
attempts + 1, MAX_CONFLICT_REBASE_ATTEMPTS, pr_number, branch)
# Fetch latest remote state
await _git("fetch", "origin", branch, timeout=30)
await _git("fetch", "origin", "main", timeout=30)
# Attempt cherry-pick onto fresh main (replaces rebase — Leo+Cory directive)
ok, msg = await _cherry_pick_onto_main(branch)
if ok:
# Rebase succeeded — reset for re-eval (Ganymede: approvals are stale after rebase)
conn.execute(
"""UPDATE prs
SET status = 'open',
leo_verdict = 'pending',
domain_verdict = 'pending',
eval_attempts = 0,
conflict_rebase_attempts = ?
WHERE number = ?""",
(attempts + 1, pr_number),
)
logger.info("Conflict resolved: PR #%d rebased successfully, reset for re-eval", pr_number)
resolved += 1
else:
new_attempts = attempts + 1
if new_attempts >= MAX_CONFLICT_REBASE_ATTEMPTS:
conn.execute(
"""UPDATE prs
SET status = 'conflict_permanent',
conflict_rebase_attempts = ?,
last_error = ?
WHERE number = ?""",
(new_attempts, f"rebase failed {MAX_CONFLICT_REBASE_ATTEMPTS}x: {msg[:200]}", pr_number),
)
logger.warning("Conflict permanent: PR #%d failed %d rebase attempts: %s",
pr_number, new_attempts, msg[:100])
else:
conn.execute(
"""UPDATE prs
SET conflict_rebase_attempts = ?,
last_error = ?
WHERE number = ?""",
(new_attempts, f"rebase attempt {new_attempts}: {msg[:200]}", pr_number),
)
logger.info("Conflict retry failed: PR #%d attempt %d/%d: %s",
pr_number, new_attempts, MAX_CONFLICT_REBASE_ATTEMPTS, msg[:100])
failed += 1
if resolved or failed:
logger.info("Conflict retry: %d resolved, %d failed", resolved, failed)
return resolved, failed
async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one merge cycle across all domains.
0. Reconcile DB state against Forgejo (catch ghost PRs)
0.5. Retry conflict PRs (rebase onto current main)
1. Discover external PRs (multiplayer v1)
2. Find all domains with approved PRs
3. Launch one async task per domain (cross-domain parallel, same-domain serial)
"""
# Step 0: Reconcile stale DB entries
await _reconcile_db_state(conn)
# Step 0.5: Retry conflict PRs (Ganymede: before normal merge, same loop)
await _retry_conflict_prs(conn)
# Step 0.6: Handle permanent conflicts (close + requeue for re-extraction)
await _handle_permanent_conflicts(conn)
# Step 1: Discover external PRs
await discover_external_prs(conn)
# Step 2: Find domains with approved work
rows = conn.execute("SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL").fetchall()
domains = [r["domain"] for r in rows]
# Also check for NULL-domain PRs (human PRs with undetected domain)
null_domain = conn.execute("SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL").fetchone()
if null_domain and null_domain["c"] > 0:
logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", null_domain["c"])
if not domains:
return 0, 0
# Step 3: Merge all domains concurrently
tasks = [_merge_domain_queue(conn, domain) for domain in domains]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_succeeded = 0
total_failed = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.exception("Domain %s merge failed with exception", domains[i])
total_failed += 1
else:
s, f = result
total_succeeded += s
total_failed += f
if total_succeeded or total_failed:
logger.info(
"Merge cycle: %d succeeded, %d failed across %d domains", total_succeeded, total_failed, len(domains)
)
# Batch commit source moves (Ganymede: one commit per cycle, not per PR)
await _commit_source_moves()
return total_succeeded, total_failed