teleo-infrastructure/lib/merge.py
m3taversal 5f554bc2de
feat: atomic extract-and-connect + stale PR monitor + response audit
Atomic extract-and-connect (lib/connect.py):
- After extraction writes claim files, each new claim is embedded via
  OpenRouter, searched against Qdrant, and top-5 neighbors (cosine > 0.55)
  are added as `related` edges in the claim's frontmatter
- Edges written on NEW claim only — avoids merge conflicts
- Cross-domain connections enabled, non-fatal on Qdrant failure
- Wired into openrouter-extract-v2.py post-extraction step

Stale PR monitor (lib/stale_pr.py):
- Every watchdog cycle checks open extract/* PRs
- If open >30 min AND 0 claim files → auto-close with comment
- After 2 stale closures → marks source as extraction_failed
- Wired into watchdog.py as check #6

Response audit system:
- response_audit table (migration v8), persistent audit conn in bot.py
- 90-day retention cleanup, tool_calls JSON column
- Confidence tag stripping, systemd ReadWritePaths for pipeline.db

Supporting infrastructure:
- reweave.py: nightly edge reconnection for orphan claims
- reconcile-sources.py: source status reconciliation
- backfill-domains.py: domain classification backfill
- ops/reconcile-source-status.sh: operational reconciliation script
- Attribution improvements, post-extract enrichments, merge improvements

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:34:20 +00:00

"""Merge stage — domain-serialized priority queue with rebase-before-merge.
Design reviewed by Ganymede (round 2) and Rhea. Key decisions:
- Two-layer locking: asyncio.Lock per domain (fast path) + prs.status (crash recovery)
- Rebase-before-merge with pinned force-with-lease SHA (Ganymede)
- Priority queue: COALESCE(p.priority, s.priority, 'medium') — PR > source > default
- Human PRs default to 'high', not 'critical' (Ganymede — prevents DoS on pipeline)
- 5-minute merge timeout — force-reset to 'conflict' (Rhea)
- Ack comment on human PR discovery (Rhea)
- Pagination on all Forgejo list endpoints (Ganymede standing rule)
"""
import asyncio
import json
import logging
import os
import random
import shutil
from collections import defaultdict
from . import config, db
from .db import classify_branch
from .domains import detect_domain_from_branch
from .forgejo import api as forgejo_api
# Import worktree lock — file at /opt/teleo-eval/pipeline/lib/worktree_lock.py
try:
from .worktree_lock import async_main_worktree_lock
except ImportError:
import sys
sys.path.insert(0, os.path.dirname(__file__))
from worktree_lock import async_main_worktree_lock
from .forgejo import get_agent_token, get_pr_diff, repo_path
logger = logging.getLogger("pipeline.merge")
# In-memory domain locks — fast path, lost on crash (durable layer is prs.status)
_domain_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# Merge timeout: if a PR stays 'merging' longer than this, force-reset (Rhea)
MERGE_TIMEOUT_SECONDS = 300 # 5 minutes
# --- Git helpers ---
async def _git(*args, cwd: str | None = None, timeout: int = 60) -> tuple[int, str]:
"""Run a git command async. Returns (returncode, stdout+stderr)."""
proc = await asyncio.create_subprocess_exec(
"git",
*args,
cwd=cwd or str(config.REPO_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return -1, f"git {args[0]} timed out after {timeout}s"
output = (stdout or b"").decode().strip()
if stderr:
output += "\n" + stderr.decode().strip()
return proc.returncode, output
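# Illustrative usage of the helper (hypothetical call site, not part of the module):
#   rc, out = await _git("rev-parse", "HEAD", cwd="/tmp/teleo-merge-extract-foo")
#   if rc != 0:
#       logger.warning("rev-parse failed: %s", out)
# Note that stdout and stderr are concatenated, so callers needing a clean value
# should take the first line only (see the expected_sha handling below).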
# --- PR Discovery (Multiplayer v1) ---
async def discover_external_prs(conn) -> int:
"""Scan Forgejo for open PRs not tracked in SQLite.
Human PRs (non-pipeline author) get priority 'high' and origin 'human'.
Critical is reserved for explicit human override only. (Ganymede)
Pagination on all Forgejo list endpoints. (Ganymede standing rule #5)
"""
known = {r["number"] for r in conn.execute("SELECT number FROM prs").fetchall()}
discovered = 0
page = 1
while True:
prs = await forgejo_api(
"GET",
repo_path(f"pulls?state=open&limit=50&page={page}"),
)
if not prs:
break
for pr in prs:
if pr["number"] not in known:
# Detect origin: pipeline agents have per-agent Forgejo users
pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"}
author = pr.get("user", {}).get("login", "")
is_pipeline = author.lower() in pipeline_users
origin = "pipeline" if is_pipeline else "human"
priority = "high" if origin == "human" else None
domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
agent, commit_type = classify_branch(pr["head"]["ref"])
conn.execute(
"""INSERT OR IGNORE INTO prs
(number, branch, status, origin, priority, domain, agent, commit_type)
VALUES (?, ?, 'open', ?, ?, ?, ?, ?)""",
(pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type),
)
db.audit(
conn,
"merge",
"pr_discovered",
json.dumps(
{
"pr": pr["number"],
"origin": origin,
"author": pr.get("user", {}).get("login"),
"priority": priority or "inherited",
}
),
)
# Ack comment on human PRs so contributor feels acknowledged (Rhea)
if origin == "human":
await _post_ack_comment(pr["number"])
discovered += 1
if len(prs) < 50:
break # Last page
page += 1
if discovered:
logger.info("Discovered %d external PRs", discovered)
return discovered
async def _post_ack_comment(pr_number: int):
"""Post acknowledgment comment on human-submitted PR. (Rhea)
Contributor should feel acknowledged immediately, not wonder if
their PR disappeared into a void.
"""
body = (
"Thanks for the contribution! Your PR is queued for evaluation "
"(priority: high). Expected review time: ~5 minutes.\n\n"
"_This is an automated message from the Teleo pipeline._"
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": body},
)
# --- Merge operations ---
async def _claim_next_pr(conn, domain: str) -> dict | None:
"""Claim the next approved PR for a domain via atomic UPDATE.
Priority inheritance: COALESCE(p.priority, s.priority, 'medium')
- Explicit PR priority (human PRs) > source priority (pipeline) > default 'medium'
- NULL priorities coalesce to 'medium' before the CASE runs; only unrecognized
priority strings fall to ELSE 4, which ranks below explicit 'medium' (WHEN 2)
- This is intentional: PRs without a recognized priority don't jump ahead of triaged ones
(Rhea: document the precedence for future maintainers)
NOT EXISTS enforces domain serialization in SQL — defense-in-depth even if
asyncio.Lock is bypassed. (Ganymede: approved)
"""
row = conn.execute(
"""UPDATE prs SET status = 'merging', last_attempt = datetime('now')
WHERE number = (
SELECT p.number FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'approved'
AND p.domain = ?
AND NOT EXISTS (
SELECT 1 FROM prs p2
WHERE p2.domain = p.domain
AND p2.status = 'merging'
)
ORDER BY
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
-- Dependency ordering: PRs with fewer broken wiki links merge first.
-- "Creator" PRs (0 broken links) land before "consumer" PRs that
-- reference them, naturally resolving the dependency chain. (Rhea+Ganymede)
CASE WHEN p.eval_issues LIKE '%broken_wiki_links%' THEN 1 ELSE 0 END,
p.created_at ASC
LIMIT 1
)
RETURNING number, source_path, branch, domain""",
(domain,),
).fetchone()
return dict(row) if row else None
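# Ordering example (illustrative values): given approved PRs in one domain with
# effective priorities #12='high' (human), #9='medium' (source-inherited), and
# #15 carrying an unrecognized string such as 'urgent', the claim order is
# #12, then #9, then #15: the unrecognized value falls to ELSE 4, behind 'medium'.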
async def _rebase_and_push(branch: str) -> tuple[bool, str]:
"""Rebase branch onto main and force-push with pinned SHA.
Always use --force-with-lease with pinned SHA for ALL branches —
pipeline and human. No split logic. (Ganymede)
"""
worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
# Create worktree for the branch
rc, out = await _git("worktree", "add", worktree_path, branch)
if rc != 0:
return False, f"worktree add failed: {out}"
try:
# Capture expected SHA before rebase (Ganymede: pin for force-with-lease)
rc, expected_sha = await _git("rev-parse", f"origin/{branch}", cwd=worktree_path)
if rc != 0:
return False, f"rev-parse failed: {expected_sha}"
expected_sha = expected_sha.strip().split("\n")[0] # First line only
# Fetch latest main
rc, out = await _git("fetch", "origin", "main", cwd=worktree_path)
if rc != 0:
return False, f"fetch failed: {out}"
# Check if rebase is needed
rc, merge_base = await _git("merge-base", "origin/main", "HEAD", cwd=worktree_path)
rc2, main_sha = await _git("rev-parse", "origin/main", cwd=worktree_path)
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
# Already up to date, no rebase needed
return True, "already up to date"
# Rebase onto main
rc, out = await _git("rebase", "origin/main", cwd=worktree_path, timeout=120)
if rc != 0:
# Rebase conflict — check if all conflicts are entity files.
# Entity enrichments are additive and recoverable from source
# archives. Drop them (take main's version) to unblock claims.
rc_ls, conflicting = await _git("diff", "--name-only", "--diff-filter=U", cwd=worktree_path)
conflict_files = [f.strip() for f in conflicting.split("\n") if f.strip()] if rc_ls == 0 else []
if conflict_files and all(f.startswith("entities/") for f in conflict_files):
# All conflicts are entity files — resolve with main's version.
# Loop: rebase may conflict on multiple commits touching entities.
dropped_entities: set[str] = set()
max_rounds = 20 # safety cap — no PR should have 20+ conflicting commits
for _ in range(max_rounds):
for cf in conflict_files:
await _git("checkout", "--ours", cf, cwd=worktree_path)
await _git("add", cf, cwd=worktree_path)
dropped_entities.update(conflict_files)
# core.editor=true prevents an interactive editor from opening on rebase --continue
rc_cont, cont_out = await _git(
"-c", "core.editor=true", "rebase", "--continue", cwd=worktree_path, timeout=60
)
if rc_cont == 0:
break # Rebase complete
# Another conflict — check if still entity-only
rc_ls2, conflicting2 = await _git("diff", "--name-only", "--diff-filter=U", cwd=worktree_path)
conflict_files = [f.strip() for f in conflicting2.split("\n") if f.strip()] if rc_ls2 == 0 else []
if not conflict_files or not all(f.startswith("entities/") for f in conflict_files):
await _git("rebase", "--abort", cwd=worktree_path)
return False, f"rebase conflict (non-entity file): {cont_out}"
else:
# Exceeded max rounds
await _git("rebase", "--abort", cwd=worktree_path)
return False, f"rebase conflict (exceeded {max_rounds} entity resolution rounds)"
logger.info(
"Rebase conflict auto-resolved: dropped entity changes in %s (recoverable from source)",
", ".join(sorted(dropped_entities)),
)
else:
await _git("rebase", "--abort", cwd=worktree_path)
return False, f"rebase conflict: {out}"
# Force-push with pinned SHA (Ganymede: defeats tracking-ref update race)
rc, out = await _git(
"push",
f"--force-with-lease={branch}:{expected_sha}",
"origin",
f"HEAD:{branch}",
cwd=worktree_path,
timeout=30,
)
if rc != 0:
return False, f"push rejected: {out}"
return True, "rebased and pushed"
finally:
# Cleanup worktree
await _git("worktree", "remove", "--force", worktree_path)
async def _resubmit_approvals(pr_number: int):
"""Re-submit 2 formal Forgejo approvals after force-push invalidated them.
Force-push (rebase) invalidates existing approvals. Branch protection
requires 2 approvals before the merge API will accept the request.
Same pattern as evaluate._post_formal_approvals.
"""
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved (post-rebase re-approval).", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug(
"Post-rebase approval for PR #%d by %s (%d/2)",
pr_number, agent_name, approvals,
)
if approvals < 2:
logger.warning(
"Only %d/2 approvals submitted for PR #%d after rebase",
approvals, pr_number,
)
async def _merge_pr(pr_number: int) -> tuple[bool, str]:
"""Merge PR via Forgejo API. CURRENTLY UNUSED — local ff-push is the primary merge path.
Kept as fallback: re-enable if Forgejo fixes the 405 bug (Ganymede's API-first design).
The local ff-push in _merge_domain_queue replaced this due to persistent 405 errors.
"""
# Check if already merged/closed on Forgejo (prevents 405 on re-merge attempts)
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
if pr_info:
if pr_info.get("merged"):
logger.info("PR #%d already merged on Forgejo, syncing status", pr_number)
return True, "already merged"
if pr_info.get("state") == "closed":
logger.warning("PR #%d closed on Forgejo but not merged", pr_number)
return False, "PR closed without merge"
# Merge whitelist only allows leo and m3taversal — use Leo's token
leo_token = get_agent_token("leo")
if not leo_token:
return False, "no leo token for merge (merge whitelist requires leo)"
# Pre-flight: verify approvals exist before attempting merge (Rhea: catches 405)
reviews = await forgejo_api("GET", repo_path(f"pulls/{pr_number}/reviews"))
if reviews is not None:
approval_count = sum(1 for r in reviews if r.get("state") == "APPROVED")
if approval_count < 2:
logger.info("PR #%d: only %d/2 approvals, resubmitting before merge", pr_number, approval_count)
await _resubmit_approvals(pr_number)
# Retry with backoff + jitter for transient errors (Rhea: jitter prevents thundering herd)
delays = [0, 5, 15, 45]
for attempt, base_delay in enumerate(delays, 1):
if base_delay:
jittered = base_delay * (0.8 + random.random() * 0.4)
await asyncio.sleep(jittered)
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/merge"),
{"Do": "merge", "merge_message_field": ""},
token=leo_token,
)
if result is not None:
return True, "merged"
# Check if merge succeeded despite API error (timeout case — Rhea)
pr_check = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
if pr_check and pr_check.get("merged"):
return True, "already merged"
# Distinguish transient from permanent failures (Ganymede)
if pr_check and not pr_check.get("mergeable", True):
# PR not mergeable — branch diverged or conflict. Rebase needed, not retry.
return False, "merge rejected: PR not mergeable (needs rebase)"
if attempt < len(delays):
logger.info("PR #%d: merge attempt %d failed (transient), retrying in %.0fs",
pr_number, attempt, delays[attempt] if attempt < len(delays) else 0)
return False, "Forgejo merge API failed after 4 attempts (transient)"
async def _delete_remote_branch(branch: str):
"""Delete remote branch immediately after merge. (Ganymede Q4: immediate, not batch)
If DELETE fails, log and move on — stale branch is cosmetic,
stale merge is operational.
"""
result = await forgejo_api(
"DELETE",
repo_path(f"branches/{branch}"),
)
if result is None:
logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
# --- Contributor attribution ---
def _is_knowledge_pr(diff: str) -> bool:
"""Check if a PR touches knowledge files (claims, decisions, core, foundations).
Knowledge PRs get full CI attribution weight.
Pipeline-only PRs (inbox, entities, agents, archive) get zero CI weight.
Mixed PRs count as knowledge — if a PR adds a claim, it gets attribution
even if it also moves source files. Knowledge takes priority. (Ganymede review)
"""
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
for line in diff.split("\n"):
if line.startswith("+++ b/") or line.startswith("--- a/"):
path = line.split("/", 1)[1] if "/" in line else ""
if any(path.startswith(p) for p in knowledge_prefixes):
return True
return False
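# Illustrative behavior on minimal unified diffs (hypothetical paths):
#   _is_knowledge_pr("+++ b/domains/ai/claim-001.md")   -> True  (knowledge)
#   _is_knowledge_pr("+++ b/inbox/queue/source-42.md")  -> False (pipeline-only)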
def _refine_commit_type(diff: str, branch_commit_type: str) -> str:
"""Refine commit_type from diff content when branch prefix is ambiguous.
Branch prefix gives initial classification (extract, research, entity, etc.).
For 'extract' branches, diff content can distinguish:
- challenge: adds challenged_by edges to existing claims
- enrich: modifies existing claim frontmatter without new files
- extract: creates new claim files (default for extract branches)
Only refines 'extract' type — other branch types (research, entity, reweave, fix)
are already specific enough.
"""
if branch_commit_type != "extract":
return branch_commit_type
new_files = 0
modified_files = 0
has_challenge_edge = False
in_diff_header = False
current_is_new = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
in_diff_header = True
current_is_new = False
elif line.startswith("new file"):
current_is_new = True
elif line.startswith("+++ b/"):
path = line[6:]
if any(path.startswith(p) for p in ("domains/", "core/", "foundations/")):
if current_is_new:
new_files += 1
else:
modified_files += 1
in_diff_header = False
elif line.startswith("+") and not line.startswith("+++"):
if "challenged_by:" in line or "challenges:" in line:
has_challenge_edge = True
if has_challenge_edge and new_files == 0:
return "challenge"
if modified_files > 0 and new_files == 0:
return "enrich"
return "extract"
async def _record_contributor_attribution(conn, pr_number: int, branch: str):
"""Record contributor attribution after a successful merge.
Parses git trailers and claim frontmatter to identify contributors
and their roles. Upserts into contributors table. Refines commit_type
from diff content. Pipeline-only PRs (no knowledge files) are skipped.
"""
import re as _re
from datetime import date as _date, datetime as _dt
today = _date.today().isoformat()
# Get the PR diff to parse claim frontmatter for attribution blocks
diff = await get_pr_diff(pr_number)
if not diff:
return
# Pipeline-only PRs (inbox, entities, agents) don't count toward CI
if not _is_knowledge_pr(diff):
logger.info("PR #%d: pipeline-only commit — skipping CI attribution", pr_number)
return
# Refine commit_type from diff content (branch prefix may be too broad)
row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone()
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
refined_type = _refine_commit_type(diff, branch_type)
if refined_type != branch_type:
conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
logger.info("PR #%d: commit_type refined %s%s", pr_number, branch_type, refined_type)
# Parse Pentagon-Agent trailer from branch commit messages
agents_found: set[str] = set()
rc, log_output = await _git(
"log", f"origin/main..origin/{branch}", "--format=%b%n%N",
timeout=10,
)
if rc == 0:
for match in _re.finditer(r"Pentagon-Agent:\s*(\S+)\s*<([^>]+)>", log_output):
agent_name = match.group(1).lower()
agent_uuid = match.group(2)
_upsert_contributor(
conn, agent_name, agent_uuid, "extractor", today,
)
agents_found.add(agent_name)
# Parse attribution blocks from claim frontmatter in diff
# Look for added lines with attribution YAML
current_role = None
for line in diff.split("\n"):
if not line.startswith("+") or line.startswith("+++"):
continue
stripped = line[1:].strip()
# Detect role sections in attribution block
for role in ("sourcer", "extractor", "challenger", "synthesizer", "reviewer"):
if stripped.startswith(f"{role}:"):
current_role = role
break
# Extract handle from attribution entries
handle_match = _re.match(r'-\s*handle:\s*["\']?([^"\']+)["\']?', stripped)
if handle_match and current_role:
handle = handle_match.group(1).strip().lower()
agent_id_match = _re.search(r'agent_id:\s*["\']?([^"\']+)', stripped)
agent_id = agent_id_match.group(1).strip() if agent_id_match else None
_upsert_contributor(conn, handle, agent_id, current_role, today)
# Fallback: if no attribution block found, credit the branch agent as extractor
if not agents_found:
# Try to infer agent from branch name (e.g., "extract/2026-03-05-...")
# The PR's agent field in SQLite is also available
row = conn.execute("SELECT agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
if row and row["agent"]:
_upsert_contributor(conn, row["agent"].lower(), None, "extractor", today)
# Increment claims_merged for all contributors on this PR
# (handled inside _upsert_contributor via the role counts)
def _upsert_contributor(
conn, handle: str, agent_id: str | None, role: str, date_str: str,
):
"""Upsert a contributor record, incrementing the appropriate role count."""
import json as _json
from datetime import datetime as _dt
role_col = f"{role}_count"
if role_col not in (
"sourcer_count", "extractor_count", "challenger_count",
"synthesizer_count", "reviewer_count",
):
logger.warning("Unknown contributor role: %s", role)
return
existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone()
if existing:
conn.execute(
f"""UPDATE contributors SET
{role_col} = {role_col} + 1,
claims_merged = claims_merged + CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END,
last_contribution = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(role, date_str, handle),
)
else:
conn.execute(
f"""INSERT INTO contributors (handle, agent_id, first_contribution, last_contribution, {role_col}, claims_merged)
VALUES (?, ?, ?, ?, 1, CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END)""",
(handle, agent_id, date_str, date_str, role),
)
# Recalculate tier
_recalculate_tier(conn, handle)
def _recalculate_tier(conn, handle: str):
"""Recalculate contributor tier based on config rules."""
from datetime import date as _date, datetime as _dt
row = conn.execute(
"SELECT claims_merged, challenges_survived, first_contribution, tier FROM contributors WHERE handle = ?",
(handle,),
).fetchone()
if not row:
return
current_tier = row["tier"]
claims_merged = row["claims_merged"] or 0
challenges_survived = row["challenges_survived"] or 0
first_contribution = row["first_contribution"]
days_since_first = 0
if first_contribution:
try:
first_date = _dt.strptime(first_contribution, "%Y-%m-%d").date()
days_since_first = (_date.today() - first_date).days
except ValueError:
pass
# Check veteran first (higher tier)
vet_rules = config.CONTRIBUTOR_TIER_RULES["veteran"]
if (claims_merged >= vet_rules["claims_merged"]
and days_since_first >= vet_rules["min_days_since_first"]
and challenges_survived >= vet_rules["challenges_survived"]):
new_tier = "veteran"
elif claims_merged >= config.CONTRIBUTOR_TIER_RULES["contributor"]["claims_merged"]:
new_tier = "contributor"
else:
new_tier = "new"
if new_tier != current_tier:
conn.execute(
"UPDATE contributors SET tier = ?, updated_at = datetime('now') WHERE handle = ?",
(new_tier, handle),
)
logger.info("Contributor %s: tier %s%s", handle, current_tier, new_tier)
db.audit(
conn, "contributor", "tier_change",
json.dumps({"handle": handle, "from": current_tier, "to": new_tier}),
)
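# Assumed shape of config.CONTRIBUTOR_TIER_RULES (threshold values illustrative;
# the real numbers live in config.py):
#   CONTRIBUTOR_TIER_RULES = {
#       "veteran": {"claims_merged": 25, "min_days_since_first": 30,
#                   "challenges_survived": 3},
#       "contributor": {"claims_merged": 5},
#   }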
# --- Source archiving after merge (Ganymede review: closes near-duplicate loop) ---
# Accumulates source moves during a merge cycle, batch-committed at the end
_pending_source_moves: list[tuple[str, str]] = [] # (queue_path, archive_path)
def _update_source_frontmatter_status(path: str, new_status: str):
"""Update the status field in a source file's frontmatter. (Ganymede: 5 lines)"""
import re as _re
try:
text = open(path).read()
text = _re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=_re.MULTILINE)
open(path, "w").write(text)
except Exception as e:
logger.warning("Failed to update source status in %s: %s", path, e)
async def _embed_merged_claims(main_sha: str, branch_sha: str):
"""Embed new/changed claim files from a merged PR into Qdrant.
Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
to find ALL changed files across the entire branch, not just the last commit.
Also deletes Qdrant vectors for files removed by the branch.
Non-fatal — embedding failure does not block the merge pipeline.
"""
try:
# --- Embed added/changed files ---
rc, diff_out = await _git(
"diff", "--name-only", "--diff-filter=ACMR",
main_sha, branch_sha,
cwd=str(config.MAIN_WORKTREE),
timeout=10,
)
if rc != 0:
logger.warning("embed: diff failed (rc=%d), skipping", rc)
return
embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
md_files = [
f for f in diff_out.strip().split("\n")
if f.endswith(".md")
and any(f.startswith(d) for d in embed_dirs)
and not f.split("/")[-1].startswith("_")
]
embedded = 0
for fpath in md_files:
full_path = config.MAIN_WORKTREE / fpath
if not full_path.exists():
continue
proc = await asyncio.create_subprocess_exec(
"python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
if proc.returncode == 0 and b"OK" in stdout:
embedded += 1
else:
logger.warning("embed: failed for %s: %s", fpath, stderr.decode()[:200])
if embedded:
logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
# --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
rc, del_out = await _git(
"diff", "--name-only", "--diff-filter=D",
main_sha, branch_sha,
cwd=str(config.MAIN_WORKTREE),
timeout=10,
)
if rc == 0 and del_out.strip():
deleted_files = [
f for f in del_out.strip().split("\n")
if f.endswith(".md")
and any(f.startswith(d) for d in embed_dirs)
]
if deleted_files:
import hashlib
point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
try:
import urllib.request
req = urllib.request.Request(
"http://localhost:6333/collections/teleo-claims/points/delete",
data=json.dumps({"points": point_ids}).encode(),
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=10)
logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
except Exception:
logger.warning("embed: failed to delete stale vectors (non-fatal)")
except Exception:
logger.exception("embed: post-merge embedding failed (non-fatal)")
def _archive_source_for_pr(branch: str, domain: str, merged: bool = True):
"""Move source from queue/ to archive/{domain}/ after PR merge or close.
Only handles extract/ branches (Ganymede: skip research sessions).
Updates frontmatter: 'processed' for merged, 'rejected' for closed.
Accumulates moves for batch commit at end of merge cycle.
"""
if not branch.startswith("extract/"):
return
source_slug = branch.replace("extract/", "", 1)
main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
archive_path = os.path.join(archive_dir, f"{source_slug}.md")
# Already in archive? Delete queue duplicate
if os.path.exists(archive_path):
if os.path.exists(queue_path):
try:
os.remove(queue_path)
_pending_source_moves.append((queue_path, "deleted"))
logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
except Exception as e:
logger.warning("Source dedup failed: %s", e)
return
# Move from queue to archive
if os.path.exists(queue_path):
# Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
_update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
os.makedirs(archive_dir, exist_ok=True)
try:
shutil.move(queue_path, archive_path)
_pending_source_moves.append((queue_path, archive_path))
logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
source_slug, domain, "processed" if merged else "rejected")
except Exception as e:
logger.warning("Source archive failed: %s", e)
async def _commit_source_moves():
"""Batch commit accumulated source moves. Called at end of merge cycle.
Rhea review: fetch+reset before touching files, use main_worktree_lock,
crash gap is self-healing (reset --hard reverts uncommitted moves).
"""
if not _pending_source_moves:
return
main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
count = len(_pending_source_moves)
_pending_source_moves.clear()
# Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
try:
async with async_main_worktree_lock(timeout=10):
# Sync worktree with remote (Rhea: fetch+reset, not pull)
await _git("fetch", "origin", "main", cwd=main_dir, timeout=30)
await _git("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
await _git("add", "-A", "inbox/", cwd=main_dir)
rc, out = await _git(
"commit", "-m",
f"pipeline: archive {count} source(s) post-merge\n\n"
f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
cwd=main_dir,
)
if rc != 0:
if "nothing to commit" in out:
return
logger.warning("Source archive commit failed: %s", out)
return
for attempt in range(3):
await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
if rc_push == 0:
logger.info("Committed + pushed %d source archive moves", count)
return
await asyncio.sleep(2)
logger.warning("Failed to push source archive moves after 3 attempts")
await _git("reset", "--hard", "origin/main", cwd=main_dir)
except TimeoutError:
logger.warning("Source archive commit skipped: worktree lock timeout")
# --- Domain merge task ---
async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
"""Process the merge queue for a single domain. Returns (succeeded, failed)."""
succeeded = 0
failed = 0
while True:
async with _domain_locks[domain]:
pr = await _claim_next_pr(conn, domain)
if not pr:
break # No more approved PRs for this domain
pr_num = pr["number"]
branch = pr["branch"]
logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain)
try:
# Rebase with timeout (Rhea: 5 min max, then force-reset to conflict)
rebase_ok, rebase_msg = await asyncio.wait_for(
_rebase_and_push(branch),
timeout=MERGE_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
logger.error(
"PR #%d merge timed out after %ds — resetting to conflict (Rhea)", pr_num, MERGE_TIMEOUT_SECONDS
)
conn.execute(
"UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(f"merge timed out after {MERGE_TIMEOUT_SECONDS}s", pr_num),
)
db.audit(conn, "merge", "timeout", json.dumps({"pr": pr_num, "timeout_seconds": MERGE_TIMEOUT_SECONDS}))
failed += 1
continue
if not rebase_ok:
# Retry once — main may have changed from a merge earlier in this cycle.
# Claim enrichments that append to the same file often auto-resolve on
# a fresh rebase against the just-updated main. (Ganymede, Mar 14)
logger.info("PR #%d rebase failed, retrying once: %s", pr_num, rebase_msg[:100])
try:
rebase_ok, rebase_msg = await asyncio.wait_for(
_rebase_and_push(branch),
timeout=MERGE_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
rebase_ok = False
rebase_msg = f"retry timed out after {MERGE_TIMEOUT_SECONDS}s"
if not rebase_ok:
logger.warning("PR #%d rebase retry also failed: %s", pr_num, rebase_msg)
conn.execute(
"UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(rebase_msg[:500], pr_num),
)
db.audit(conn, "merge", "rebase_failed", json.dumps({"pr": pr_num, "error": rebase_msg[:200], "retried": True}))
failed += 1
continue
logger.info("PR #%d rebase retry succeeded", pr_num)
# Local ff-merge: push rebased branch as main (Rhea's approach, Leo+Rhea: local primary)
# The branch was just rebased onto origin/main by _rebase_and_push,
# so origin/{branch} is a descendant of origin/main. Push it as main.
await _git("fetch", "origin", branch, timeout=15)
rc, main_sha = await _git("rev-parse", "origin/main")
main_sha = main_sha.strip() if rc == 0 else ""
rc, branch_sha = await _git("rev-parse", f"origin/{branch}")
branch_sha = branch_sha.strip() if rc == 0 else ""
merge_ok = False
merge_msg = ""
if branch_sha:
rc, out = await _git(
"push", f"--force-with-lease=main:{main_sha}",
"origin", f"{branch_sha}:main",
timeout=30,
)
if rc == 0:
merge_ok = True
merge_msg = f"merged (local ff-push, SHA: {branch_sha[:8]})"
# Close PR on Forgejo with merge SHA comment
leo_token = get_agent_token("leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": f"Merged locally.\nMerge SHA: `{branch_sha}`\nBranch: `{branch}`"},
)
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_num}"),
{"state": "closed"},
token=leo_token,
)
else:
merge_msg = f"local ff-push failed: {out[:200]}"
else:
merge_msg = f"could not resolve origin/{branch}"
if not merge_ok:
logger.error("PR #%d merge failed: %s", pr_num, merge_msg)
conn.execute(
"UPDATE prs SET status = 'conflict', merge_cycled = 1, merge_failures = COALESCE(merge_failures, 0) + 1, last_error = ? WHERE number = ?",
(merge_msg[:500], pr_num),
)
db.audit(conn, "merge", "merge_failed", json.dumps({"pr": pr_num, "error": merge_msg[:200]}))
failed += 1
continue
# Success — update status and cleanup
conn.execute(
"""UPDATE prs SET status = 'merged',
merged_at = datetime('now'),
last_error = NULL
WHERE number = ?""",
(pr_num,),
)
db.audit(conn, "merge", "merged", json.dumps({"pr": pr_num, "branch": branch}))
logger.info("PR #%d merged successfully", pr_num)
# Record contributor attribution
try:
await _record_contributor_attribution(conn, pr_num, branch)
except Exception:
logger.exception("PR #%d: contributor attribution failed (non-fatal)", pr_num)
# Archive source file (closes near-duplicate loop — Ganymede review)
_archive_source_for_pr(branch, domain)
# Embed new/changed claims into Qdrant (non-fatal)
await _embed_merged_claims(main_sha, branch_sha)
# Delete remote branch immediately (Ganymede Q4)
await _delete_remote_branch(branch)
# Prune local worktree metadata
await _git("worktree", "prune")
succeeded += 1
return succeeded, failed
# --- Main entry point ---
async def _reconcile_db_state(conn):
"""Reconcile pipeline DB against Forgejo's actual PR state.
Fixes ghost PRs: DB says 'conflict' or 'open' but Forgejo says merged/closed.
Also detects deleted branches (rev-parse failures). (Leo's structural fix #1)
Run at the start of each merge cycle.
"""
stale = conn.execute(
"SELECT number, branch, status FROM prs WHERE status IN ('conflict', 'open', 'reviewing')"
).fetchall()
if not stale:
return
reconciled = 0
for row in stale:
pr_number = row["number"]
branch = row["branch"]
db_status = row["status"]
# Check Forgejo PR state
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_number}"))
if not pr_info:
continue
forgejo_state = pr_info.get("state", "")
is_merged = pr_info.get("merged", False)
if is_merged and db_status != "merged":
conn.execute(
"UPDATE prs SET status = 'merged', merged_at = datetime('now') WHERE number = ?",
(pr_number,),
)
reconciled += 1
continue
if forgejo_state == "closed" and not is_merged and db_status not in ("closed",):
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'reconciled: closed on Forgejo' WHERE number = ?",
(pr_number,),
)
reconciled += 1
continue
# Ghost PR detection: branch deleted but PR still open in DB (Fix #2)
# Ganymede: rc != 0 means remote unreachable — skip, don't close
if db_status in ("open", "reviewing") and branch:
rc, ls_out = await _git("ls-remote", "--heads", "origin", branch, timeout=10)
if rc != 0:
logger.warning("ls-remote failed for %s — skipping ghost check", branch)
continue
if not ls_out.strip():
# Branch gone — close PR on Forgejo and in DB (Ganymede: don't leave orphans)
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_number}"),
body={"state": "closed"},
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
body={"body": "Auto-closed: branch deleted from remote."},
)
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'reconciled: branch deleted' WHERE number = ?",
(pr_number,),
)
logger.info("Ghost PR #%d: branch %s deleted, closing", pr_number, branch)
reconciled += 1
if reconciled:
logger.info("Reconciled %d stale PRs against Forgejo state", reconciled)
MAX_CONFLICT_REBASE_ATTEMPTS = 3
async def _handle_permanent_conflicts(conn) -> int:
"""Close conflict_permanent PRs and file their sources correctly.
When a PR fails rebase 3x, the claims are already on main from the first
successful extraction. The source should live in archive/{domain}/ (one copy).
Any duplicate in queue/ gets deleted. No requeuing — breaks the infinite loop.
Hygiene (Cory): one source file, one location, no duplicates.
Reviewed by Ganymede: commit moves, use shutil.move, batch commit at end.
"""
rows = conn.execute(
"""SELECT number, branch, domain
FROM prs
WHERE status = 'conflict_permanent'
ORDER BY number ASC"""
).fetchall()
if not rows:
return 0
handled = 0
files_changed = False
main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
for row in rows:
pr_number = row["number"]
branch = row["branch"]
domain = row["domain"] or "unknown"
# Close PR on Forgejo
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_number}"),
body={"state": "closed"},
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
body={"body": (
"Closed by conflict auto-resolver: rebase failed 3 times (enrichment conflict). "
"Claims already on main from prior extraction. Source filed in archive."
)},
)
await _delete_remote_branch(branch)
# File the source: one copy in archive/{domain}/, delete duplicates
source_slug = branch.replace("extract/", "", 1) if branch.startswith("extract/") else None
if source_slug:
filename = f"{source_slug}.md"
archive_dir = os.path.join(main_dir, "inbox", "archive", domain)
archive_path = os.path.join(archive_dir, filename)
queue_path = os.path.join(main_dir, "inbox", "queue", filename)
already_archived = os.path.exists(archive_path)
if already_archived:
if os.path.exists(queue_path):
try:
os.remove(queue_path)
logger.info("PR #%d: deleted queue duplicate %s (already in archive/%s)",
pr_number, filename, domain)
files_changed = True
except Exception as e:
logger.warning("PR #%d: failed to delete queue duplicate: %s", pr_number, e)
else:
logger.info("PR #%d: source already in archive/%s, no cleanup needed", pr_number, domain)
else:
if os.path.exists(queue_path):
os.makedirs(archive_dir, exist_ok=True)
try:
shutil.move(queue_path, archive_path)
logger.info("PR #%d: filed source to archive/%s: %s", pr_number, domain, filename)
files_changed = True
except Exception as e:
logger.warning("PR #%d: failed to file source: %s", pr_number, e)
else:
logger.warning("PR #%d: source not found in queue or archive for %s", pr_number, filename)
# Clear batch-state marker
state_marker = f"/opt/teleo-eval/batch-state/{source_slug}.done"
try:
if os.path.exists(state_marker):
os.remove(state_marker)
except Exception:
pass
conn.execute(
"UPDATE prs SET status = 'closed', last_error = 'conflict_permanent: closed + filed in archive' WHERE number = ?",
(pr_number,),
)
handled += 1
logger.info("Permanent conflict handled: PR #%d closed, source filed", pr_number)
# Batch commit source moves to main (Ganymede: follow entity_batch pattern)
if files_changed:
await _git("add", "-A", "inbox/", cwd=main_dir)
rc, out = await _git(
"commit", "-m",
f"pipeline: archive {handled} conflict-closed source(s)\n\n"
f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
cwd=main_dir,
)
if rc == 0:
# Push with pull-rebase retry (entity_batch pattern)
for attempt in range(3):
await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
if rc_push == 0:
logger.info("Committed + pushed source archive moves for %d PRs", handled)
break
await asyncio.sleep(2)
else:
logger.warning("Failed to push source archive moves after 3 attempts")
await _git("reset", "--hard", "origin/main", cwd=main_dir)
if handled:
logger.info("Handled %d permanent conflict PRs (closed + filed)", handled)
return handled
async def _retry_conflict_prs(conn) -> tuple[int, int]:
"""Retry rebase on conflict PRs that were previously approved.
Design: Ganymede (extend merge stage), Rhea (safety guards), Leo (re-eval required).
- Pick up PRs with status='conflict' and both approvals
- Attempt fresh rebase onto origin/main
- If rebase succeeds: force-push, reset to 'open' with verdicts cleared for re-eval
- If rebase fails: increment attempt counter, leave as 'conflict'
- After MAX_CONFLICT_REBASE_ATTEMPTS failures: mark 'conflict_permanent'
- Skip branches with new commits since conflict was set (Rhea: someone is working on it)
"""
rows = conn.execute(
"""SELECT number, branch, conflict_rebase_attempts
FROM prs
WHERE status = 'conflict'
AND COALESCE(conflict_rebase_attempts, 0) < ?
ORDER BY number ASC""",
(MAX_CONFLICT_REBASE_ATTEMPTS,),
).fetchall()
if not rows:
return 0, 0
resolved = 0
failed = 0
for row in rows:
pr_number = row["number"]
branch = row["branch"]
attempts = row["conflict_rebase_attempts"] or 0
logger.info("Conflict retry [%d/%d] PR #%d branch=%s",
attempts + 1, MAX_CONFLICT_REBASE_ATTEMPTS, pr_number, branch)
# Fetch latest remote state
await _git("fetch", "origin", branch, timeout=30)
await _git("fetch", "origin", "main", timeout=30)
# Attempt rebase
ok, msg = await _rebase_and_push(branch)
if ok:
# Rebase succeeded — reset for re-eval (Ganymede: approvals are stale after rebase)
conn.execute(
"""UPDATE prs
SET status = 'open',
leo_verdict = 'pending',
domain_verdict = 'pending',
eval_attempts = 0,
conflict_rebase_attempts = ?
WHERE number = ?""",
(attempts + 1, pr_number),
)
logger.info("Conflict resolved: PR #%d rebased successfully, reset for re-eval", pr_number)
resolved += 1
else:
new_attempts = attempts + 1
if new_attempts >= MAX_CONFLICT_REBASE_ATTEMPTS:
conn.execute(
"""UPDATE prs
SET status = 'conflict_permanent',
conflict_rebase_attempts = ?,
last_error = ?
WHERE number = ?""",
(new_attempts, f"rebase failed {MAX_CONFLICT_REBASE_ATTEMPTS}x: {msg[:200]}", pr_number),
)
logger.warning("Conflict permanent: PR #%d failed %d rebase attempts: %s",
pr_number, new_attempts, msg[:100])
else:
conn.execute(
"""UPDATE prs
SET conflict_rebase_attempts = ?,
last_error = ?
WHERE number = ?""",
(new_attempts, f"rebase attempt {new_attempts}: {msg[:200]}", pr_number),
)
logger.info("Conflict retry failed: PR #%d attempt %d/%d: %s",
pr_number, new_attempts, MAX_CONFLICT_REBASE_ATTEMPTS, msg[:100])
failed += 1
if resolved or failed:
logger.info("Conflict retry: %d resolved, %d failed", resolved, failed)
return resolved, failed
async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one merge cycle across all domains.
0. Reconcile DB state against Forgejo (catch ghost PRs)
0.5. Retry conflict PRs (rebase onto current main)
1. Discover external PRs (multiplayer v1)
2. Find all domains with approved PRs
3. Launch one async task per domain (cross-domain parallel, same-domain serial)
"""
# Step 0: Reconcile stale DB entries
await _reconcile_db_state(conn)
# Step 0.5: Retry conflict PRs (Ganymede: before normal merge, same loop)
await _retry_conflict_prs(conn)
# Step 0.6: Handle permanent conflicts (close PR + file source in archive, no requeue)
await _handle_permanent_conflicts(conn)
# Step 1: Discover external PRs
await discover_external_prs(conn)
# Step 2: Find domains with approved work
rows = conn.execute("SELECT DISTINCT domain FROM prs WHERE status = 'approved' AND domain IS NOT NULL").fetchall()
domains = [r["domain"] for r in rows]
# Also check for NULL-domain PRs (human PRs with undetected domain)
null_domain = conn.execute("SELECT COUNT(*) as c FROM prs WHERE status = 'approved' AND domain IS NULL").fetchone()
if null_domain and null_domain["c"] > 0:
logger.warning("%d approved PRs have NULL domain — skipping until eval assigns domain", null_domain["c"])
if not domains:
return 0, 0
# Step 3: Merge all domains concurrently
tasks = [_merge_domain_queue(conn, domain) for domain in domains]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_succeeded = 0
total_failed = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
# logger.exception needs an active except block for a traceback; log the repr instead
logger.error("Domain %s merge failed with exception: %r", domains[i], result)
total_failed += 1
else:
s, f = result
total_succeeded += s
total_failed += f
if total_succeeded or total_failed:
logger.info(
"Merge cycle: %d succeeded, %d failed across %d domains", total_succeeded, total_failed, len(domains)
)
# Batch commit source moves (Ganymede: one commit per cycle, not per PR)
await _commit_source_moves()
return total_succeeded, total_failed
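# Illustrative wiring (hypothetical; actual scheduling lives in the daemon):
#   async def run_merge_loop(conn):
#       while True:
#           await merge_cycle(conn)
#           await asyncio.sleep(60)  # one cycle per minute, adjust to taste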