teleo-infrastructure/lib/post_merge.py
m3taversal 46ad508de7
Some checks are pending
CI / lint-and-test (push) Waiting to run
Phase 6b: extract post_merge.py from merge.py — post-merge effects
7 functions extracted to lib/post_merge.py:
- embed_merged_claims, reciprocal_edges, find_claim_file, add_edge_to_file,
  archive_source_for_pr, commit_source_moves, update_source_frontmatter_status

git_fn injection pattern (same as contributor.py) for 3 async functions
that need git operations. Unused async_main_worktree_lock import removed
from merge.py.

merge.py: 1562 → 1200 lines (−362). Total reduction from 1912: −712 lines.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 13:20:59 +01:00

384 lines
15 KiB
Python

"""Post-merge effects: embedding, reciprocal edges, source archiving.
All functions run after a PR is merged to main. Non-fatal failures
are logged but do not block the pipeline.
Extracted from merge.py Phase 6b of decomposition.
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import shutil
from typing import Callable
from . import config
from .frontmatter import (
REWEAVE_EDGE_FIELDS,
RECIPROCAL_EDGE_MAP,
parse_yaml_frontmatter,
serialize_edge_fields,
)
try:
from .worktree_lock import async_main_worktree_lock
except ImportError:
from worktree_lock import async_main_worktree_lock
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Accumulates source moves during a merge cycle, batch-committed at the end.
# archive_source_for_pr() appends entries; commit_source_moves() reads the
# entry count and then clears the list.
# Each entry is (queue_path, archive_path). When a queue duplicate was simply
# removed, the second element is the literal string "deleted" instead of a path.
_pending_source_moves: list[tuple[str, str]] = [] # (queue_path, archive_path)
def update_source_frontmatter_status(path: str, new_status: str):
    """Rewrite the first ``status:`` line of a source file in place.

    Only the first line matching ``^status: ...`` is replaced; every other
    line is written back unchanged. This is best effort: any failure is
    logged as a warning and never raised to the caller.

    Args:
        path: Path of the source file to edit.
        new_status: Text written after ``status: ``. It is inserted
            literally (backslashes are not treated as regex escapes).
    """
    try:
        # Context managers guarantee both handles are closed, even on error.
        with open(path, encoding="utf-8") as fh:
            text = fh.read()
        # A callable replacement keeps new_status literal; a plain replacement
        # string would interpret sequences such as "\1" as group references.
        text = re.sub(
            r"^status: .*$",
            lambda _match: f"status: {new_status}",
            text,
            count=1,
            flags=re.MULTILINE,
        )
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(text)
    except Exception as e:
        logger.warning("Failed to update source status in %s: %s", path, e)
def _delete_qdrant_points(point_ids: list) -> None:
    """POST a delete request for *point_ids* to the local Qdrant collection.

    Blocking helper; callers on an event loop should run it in a thread.
    """
    import urllib.request

    req = urllib.request.Request(
        "http://localhost:6333/collections/teleo-claims/points/delete",
        data=json.dumps({"points": point_ids}).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Use the response as a context manager so the connection is closed.
    with urllib.request.urlopen(req, timeout=10):
        pass


async def embed_merged_claims(main_sha: str, branch_sha: str, git_fn: Callable):
    """Embed new/changed claim files from a merged PR into Qdrant.

    Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch
    tip) so that every file changed across the branch is considered, not
    just the last commit. Markdown files under the embed directories whose
    file name does not start with "_" are passed one by one to the external
    embed-claims.py script. Afterwards, vectors for markdown files deleted
    by the branch are removed from Qdrant.

    Non-fatal: every failure is logged and swallowed so the merge pipeline
    is never blocked. A timeout on one file only skips that file.

    Args:
        main_sha: Commit of main before the merge.
        branch_sha: Tip commit of the merged branch.
        git_fn: Async callable invoked as ``git_fn(*git_args, cwd=..., timeout=...)``
            and awaited for an ``(returncode, output)`` pair.
    """
    embed_dirs = ("domains/", "core/", "foundations/", "decisions/", "entities/")
    try:
        # --- Embed added/changed files ---
        rc, diff_out = await git_fn(
            "diff", "--name-only", "--diff-filter=ACMR",
            main_sha, branch_sha,
            cwd=str(config.MAIN_WORKTREE),
            timeout=10,
        )
        if rc != 0:
            logger.warning("embed: diff failed (rc=%d), skipping", rc)
            return

        md_files = [
            f for f in diff_out.strip().split("\n")
            if f.endswith(".md")
            and f.startswith(embed_dirs)
            and not f.split("/")[-1].startswith("_")
        ]

        embedded = 0
        for fpath in md_files:
            full_path = config.MAIN_WORKTREE / fpath
            if not full_path.exists():
                continue
            proc = await asyncio.create_subprocess_exec(
                "python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
            except asyncio.TimeoutError:
                # One slow file must not abort the remaining files or the
                # stale-vector cleanup below; also reap the child process.
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                await proc.wait()
                logger.warning("embed: timed out for %s", fpath)
                continue
            if proc.returncode == 0 and b"OK" in stdout:
                embedded += 1
            else:
                logger.warning(
                    "embed: failed for %s: %s",
                    fpath, stderr.decode(errors="replace")[:200],
                )
        if embedded:
            logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))

        # --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
        rc, del_out = await git_fn(
            "diff", "--name-only", "--diff-filter=D",
            main_sha, branch_sha,
            cwd=str(config.MAIN_WORKTREE),
            timeout=10,
        )
        if rc != 0 or not del_out.strip():
            return
        deleted_files = [
            f for f in del_out.strip().split("\n")
            if f.endswith(".md") and f.startswith(embed_dirs)
        ]
        if not deleted_files:
            return
        # Point ids are the MD5 hex digest of the repo-relative file path.
        point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
        try:
            # The HTTP call is blocking; keep it off the event loop.
            await asyncio.to_thread(_delete_qdrant_points, point_ids)
            logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
        except Exception:
            logger.warning("embed: failed to delete stale vectors (non-fatal)", exc_info=True)
    except Exception:
        logger.exception("embed: post-merge embedding failed (non-fatal)")
def find_claim_file(slug: str):
    """Find a claim file on disk by its slug.

    Recursively searches ``domains/``, ``core/`` and ``foundations/`` under
    the main worktree, in that order, for ``<slug>.md``. Files whose name
    starts with "_" are ignored.

    Args:
        slug: Claim file name without the ``.md`` extension. It is matched
            literally: glob metacharacters in it are escaped.

    Returns:
        Path of the first match, or None when nothing matches.
    """
    import glob

    # Escape *, ? and [ so a slug containing them cannot act as a wildcard
    # and resolve to an unrelated file.
    pattern = f"{glob.escape(slug)}.md"
    worktree = config.MAIN_WORKTREE
    for search_dir in ("domains", "core", "foundations"):
        base = worktree / search_dir
        if not base.is_dir():
            continue
        for md in base.rglob(pattern):
            if not md.name.startswith("_"):
                return md
    return None
def add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool:
    """Append one edge to the frontmatter of *file_path*.

    Returns True when the file was rewritten. Returns False when the file
    cannot be read or written, has no parseable frontmatter, or already
    lists *target_slug* under *edge_type* (compared case-insensitively after
    stripping whitespace).
    """

    def _listify(value):
        # A bare string is a single-item list; anything else that is not a
        # list is treated as "no values".
        if isinstance(value, str):
            return [value]
        return value if isinstance(value, list) else []

    try:
        text = file_path.read_text()
    except Exception:
        return False

    fm, raw_fm, body = parse_yaml_frontmatter(text)
    if fm is None:
        return False

    # Dedup: bail out if the edge is already present.
    for entry in _listify(fm.get(edge_type, [])):
        if str(entry).strip().lower() == target_slug.lower():
            return False

    # Carry over every known edge field untouched, then extend the requested one.
    merged_edges = {
        name: list(_listify(fm.get(name, [])))
        for name in REWEAVE_EDGE_FIELDS
    }
    merged_edges.setdefault(edge_type, []).append(target_slug)

    # Same string-surgery serialization that reweave uses.
    new_fm = serialize_edge_fields(raw_fm, merged_edges)
    separator = "" if body.startswith("\n") else "\n"
    rebuilt = f"---\n{new_fm}{separator}{body}"

    try:
        file_path.write_text(rebuilt)
    except Exception:
        return False
    return True
async def reciprocal_edges(main_sha: str, branch_sha: str, git_fn: Callable):
    """Add reciprocal edges on existing claims after a PR merges.

    When a new claim A lists B under ``supports``, ``challenges`` or
    ``related``, B gets an edge back to A so that A has an incoming link and
    is not an orphan. The edge type written on B is looked up in
    RECIPROCAL_EDGE_MAP for A's field, falling back to ``related``.

    Runs on main after cherry-pick merge. Only newly added files
    (diff-filter=A) are processed, not modified ones. If any edge was added,
    the touched files are staged individually, committed and pushed.

    Non-fatal: all errors are logged and swallowed — orphans are recoverable.

    Args:
        main_sha: Commit of main before the merge.
        branch_sha: Tip commit of the merged branch.
        git_fn: Async callable invoked as ``git_fn(*git_args, cwd=..., timeout=...)``
            and awaited for an ``(returncode, output)`` pair.
    """
    EDGE_FIELDS = ("supports", "challenges", "related")
    claim_dirs = ("domains/", "core/", "foundations/")
    try:
        # Find newly added claim files
        rc, diff_out = await git_fn(
            "diff", "--name-only", "--diff-filter=A",
            main_sha, branch_sha,
            cwd=str(config.MAIN_WORKTREE),
            timeout=10,
        )
        if rc != 0:
            logger.warning("reciprocal_edges: diff failed (rc=%d), skipping", rc)
            return

        new_claims = [
            f for f in diff_out.strip().split("\n")
            if f.endswith(".md")
            and f.startswith(claim_dirs)
            and not f.split("/")[-1].startswith("_")
            and "/entities/" not in f
            and "/decisions/" not in f
        ]
        if not new_claims:
            return

        reciprocals_added = 0
        modified_files = set()
        for claim_path in new_claims:
            full_path = config.MAIN_WORKTREE / claim_path
            if not full_path.exists():
                continue
            try:
                content = full_path.read_text()
            except Exception:
                continue
            fm, _raw_fm, _body = parse_yaml_frontmatter(content)
            if fm is None:
                continue

            # Slug = file name without the trailing ".md". Strip only the
            # suffix: str.replace would also eat ".md" inside the name.
            claim_slug = claim_path.rsplit("/", 1)[-1].removesuffix(".md")

            # Collect all edge targets from this new claim
            for field in EDGE_FIELDS:
                targets = fm.get(field, [])
                if isinstance(targets, str):
                    targets = [targets]
                if not isinstance(targets, list):
                    continue
                for target_slug in targets:
                    target_slug = str(target_slug).strip()
                    if not target_slug:
                        continue
                    target_file = find_claim_file(target_slug)
                    if target_file is None:
                        continue
                    reciprocal_type = RECIPROCAL_EDGE_MAP.get(field, "related")
                    if add_edge_to_file(target_file, reciprocal_type, claim_slug):
                        reciprocals_added += 1
                        modified_files.add(str(target_file))

        if reciprocals_added <= 0:
            return

        # Stage only the files we modified (never git add -A in automation)
        for f in sorted(modified_files):
            await git_fn("add", f, cwd=str(config.MAIN_WORKTREE))
        rc, out = await git_fn(
            "commit", "-m", f"reciprocal edges: {reciprocals_added} edges from {len(new_claims)} new claims",
            cwd=str(config.MAIN_WORKTREE),
        )
        if rc != 0:
            logger.warning("reciprocal_edges: commit failed: %s", out[:200])
            return

        # Push immediately — batch-extract-50.sh does reset --hard origin/main
        # every 15 min, which destroys unpushed local commits
        push_rc, push_out = await git_fn(
            "push", "origin", "main",
            cwd=str(config.MAIN_WORKTREE),
            timeout=30,
        )
        if push_rc == 0:
            logger.info("reciprocal_edges: %d edges pushed to main (%d new claims)", reciprocals_added, len(new_claims))
        else:
            logger.warning("reciprocal_edges: push failed (commit is local only): %s", push_out[:200])
    except Exception:
        logger.exception("reciprocal_edges: failed (non-fatal)")
def archive_source_for_pr(branch: str, domain: str, merged: bool = True):
    """Move a source from inbox/queue/ to inbox/archive/{domain}/ after a PR merges or closes.

    Only ``extract/`` branches are handled (Ganymede: skip research
    sessions); any other branch is a no-op. The source slug is the branch
    name without the ``extract/`` prefix.

    If the archive copy already exists, a leftover queue copy is deleted
    instead of moved. Otherwise the queue file's frontmatter status is set
    to ``processed`` (merged) or ``rejected`` (closed) and the file is moved.
    Each successful change is recorded in ``_pending_source_moves`` for the
    batch commit at the end of the merge cycle.

    Filesystem failures are logged as warnings and never raised.

    Args:
        branch: PR branch name.
        domain: Archive sub-directory; a falsy value maps to ``unknown``.
        merged: True when the PR was merged, False when it was closed.
    """
    if not branch.startswith("extract/"):
        return

    source_slug = branch.replace("extract/", "", 1)
    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
    # Resolve the directory name once so paths and log lines always agree.
    archive_domain = domain or "unknown"
    queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
    archive_dir = os.path.join(main_dir, "inbox", "archive", archive_domain)
    archive_path = os.path.join(archive_dir, f"{source_slug}.md")

    # Already in archive? Delete queue duplicate
    if os.path.exists(archive_path):
        if os.path.exists(queue_path):
            try:
                os.remove(queue_path)
                _pending_source_moves.append((queue_path, "deleted"))
                logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, archive_domain)
            except Exception as e:
                logger.warning("Source dedup failed: %s", e)
        return

    if not os.path.exists(queue_path):
        return

    # Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
    status = "processed" if merged else "rejected"
    update_source_frontmatter_status(queue_path, status)
    try:
        # Directory creation is inside the try so that a filesystem error
        # here is logged like a failed move instead of escaping the function.
        os.makedirs(archive_dir, exist_ok=True)
        shutil.move(queue_path, archive_path)
        _pending_source_moves.append((queue_path, archive_path))
        logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
                    source_slug, archive_domain, status)
    except Exception as e:
        logger.warning("Source archive failed: %s", e)
async def commit_source_moves(git_fn: Callable):
    """Batch commit accumulated source moves. Called at end of merge cycle.

    Does nothing when no moves are pending. Otherwise the pending list is
    cleared, the main worktree lock is taken, the worktree is synced to
    origin/main, everything under ``inbox/`` is staged and committed, and
    the commit is pushed with up to three pull --rebase + push attempts.
    If every push attempt fails the worktree is reset to origin/main.

    Rhea review: fetch+reset before touching files, use main_worktree_lock,
    crash gap is self-healing (reset --hard reverts uncommitted moves).

    Args:
        git_fn: Async callable invoked as ``git_fn(*git_args, cwd=..., timeout=...)``
            and awaited for an ``(returncode, output)`` pair.
    """
    if not _pending_source_moves:
        return
    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
    count = len(_pending_source_moves)
    # Cleared before the lock is taken: if the lock times out below, this
    # batch is not retried by a later call.
    _pending_source_moves.clear()
    # Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
    try:
        async with async_main_worktree_lock(timeout=10):
            # Sync worktree with remote (Rhea: fetch+reset, not pull)
            # NOTE(review): the moves were made on disk before this point, and
            # reset --hard restores tracked files that were removed from
            # inbox/queue/ — confirm the resulting queue/archive state is the
            # intended one before `git add -A inbox/` runs.
            await git_fn("fetch", "origin", "main", cwd=main_dir, timeout=30)
            await git_fn("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
            await git_fn("add", "-A", "inbox/", cwd=main_dir)
            rc, out = await git_fn(
                "commit", "-m",
                f"pipeline: archive {count} source(s) post-merge\n\n"
                f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
                cwd=main_dir,
            )
            if rc != 0:
                if "nothing to commit" in out:
                    return
                logger.warning("Source archive commit failed: %s", out)
                return
            for _ in range(3):
                await git_fn("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
                rc_push, _push_out = await git_fn("push", "origin", "main", cwd=main_dir, timeout=30)
                if rc_push == 0:
                    logger.info("Committed + pushed %d source archive moves", count)
                    return
                await asyncio.sleep(2)
            logger.warning("Failed to push source archive moves after 3 attempts")
            # Drop the unpushed local commit so the worktree matches the remote again.
            await git_fn("reset", "--hard", "origin/main", cwd=main_dir)
    except (TimeoutError, asyncio.TimeoutError):
        # asyncio.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; catch both so older interpreters
        # also treat a timeout here as a skipped commit.
        logger.warning("Source archive commit skipped: worktree lock timeout")