diff --git a/lib/merge.py b/lib/merge.py
index cc79c6c..9910fb8 100644
--- a/lib/merge.py
+++ b/lib/merge.py
@@ -43,13 +43,6 @@ for _prefix in PIPELINE_OWNED_PREFIXES:
         assert not _prefix.startswith(f"{_agent}/"), \
             f"FATAL: Agent prefix '{_agent}/' found in PIPELINE_OWNED_PREFIXES — this bypasses Leo's review gate"
 
-# Import worktree lock — file at /opt/teleo-eval/pipeline/lib/worktree_lock.py
-try:
-    from .worktree_lock import async_main_worktree_lock
-except ImportError:
-    import sys
-    sys.path.insert(0, os.path.dirname(__file__))
-    from worktree_lock import async_main_worktree_lock
 from .cascade import cascade_after_merge
 from .cross_domain import cross_domain_after_merge
 from .forgejo import get_agent_token, get_pr_diff, repo_path
@@ -413,12 +406,16 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
 
 from .frontmatter import (
     REWEAVE_EDGE_FIELDS,
-    RECIPROCAL_EDGE_MAP,
     parse_yaml_frontmatter,
     union_edge_lists,
-    serialize_edge_fields,
     serialize_frontmatter,
 )
+from .post_merge import (
+    embed_merged_claims,
+    reciprocal_edges,
+    archive_source_for_pr,
+    commit_source_moves,
+)
 
 
 async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
@@ -685,365 +682,6 @@ async def _delete_remote_branch(branch: str):
         logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
 
 
-# --- Source archiving after merge (Ganymede review: closes near-duplicate loop) ---
-
-# Accumulates source moves during a merge cycle, batch-committed at the end
-_pending_source_moves: list[tuple[str, str]] = []  # (queue_path, archive_path)
-
-
-def _update_source_frontmatter_status(path: str, new_status: str):
-    """Update the status field in a source file's frontmatter. (Ganymede: 5 lines)"""
-    import re as _re
-    try:
-        text = open(path).read()
-        text = _re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=_re.MULTILINE)
-        open(path, "w").write(text)
-    except Exception as e:
-        logger.warning("Failed to update source status in %s: %s", path, e)
-
-
-async def _embed_merged_claims(main_sha: str, branch_sha: str):
-    """Embed new/changed claim files from a merged PR into Qdrant.
-
-    Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
-    to find ALL changed files across the entire branch, not just the last commit.
-    Also deletes Qdrant vectors for files removed by the branch.
-
-    Non-fatal — embedding failure does not block the merge pipeline.
-    """
-    try:
-        # --- Embed added/changed files ---
-        rc, diff_out = await _git(
-            "diff", "--name-only", "--diff-filter=ACMR",
-            main_sha, branch_sha,
-            cwd=str(config.MAIN_WORKTREE),
-            timeout=10,
-        )
-        if rc != 0:
-            logger.warning("embed: diff failed (rc=%d), skipping", rc)
-            return
-
-        embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
-        md_files = [
-            f for f in diff_out.strip().split("\n")
-            if f.endswith(".md")
-            and any(f.startswith(d) for d in embed_dirs)
-            and not f.split("/")[-1].startswith("_")
-        ]
-
-        embedded = 0
-        for fpath in md_files:
-            full_path = config.MAIN_WORKTREE / fpath
-            if not full_path.exists():
-                continue
-            proc = await asyncio.create_subprocess_exec(
-                "python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE,
-            )
-            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
-            if proc.returncode == 0 and b"OK" in stdout:
-                embedded += 1
-            else:
-                logger.warning("embed: failed for %s: %s", fpath, stderr.decode()[:200])
-
-        if embedded:
-            logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
-
-        # --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
-        rc, del_out = await _git(
-            "diff", "--name-only", "--diff-filter=D",
-            main_sha, branch_sha,
-            cwd=str(config.MAIN_WORKTREE),
-            timeout=10,
-        )
-        if rc == 0 and del_out.strip():
-            deleted_files = [
-                f for f in del_out.strip().split("\n")
-                if f.endswith(".md")
-                and any(f.startswith(d) for d in embed_dirs)
-            ]
-            if deleted_files:
-                import hashlib
-                point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
-                try:
-                    import urllib.request
-                    req = urllib.request.Request(
-                        "http://localhost:6333/collections/teleo-claims/points/delete",
-                        data=json.dumps({"points": point_ids}).encode(),
-                        headers={"Content-Type": "application/json"},
-                        method="POST",
-                    )
-                    urllib.request.urlopen(req, timeout=10)
-                    logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
-                except Exception:
-                    logger.warning("embed: failed to delete stale vectors (non-fatal)")
-    except Exception:
-        logger.exception("embed: post-merge embedding failed (non-fatal)")
-
-
-async def _reciprocal_edges(main_sha: str, branch_sha: str):
-    """Add reciprocal edges on existing claims after a PR merges.
-
-    When a new claim A has `supports: [B]` in its frontmatter, B should have
-    `supports: [A]` added to its own frontmatter. This gives A an incoming link,
-    preventing it from being an orphan.
-
-    Runs on main after cherry-pick merge. Non-fatal — orphans are recoverable.
-    Only processes new files (diff-filter=A), not modified files.
-    """
-    EDGE_FIELDS = ("supports", "challenges", "related")
-    # Inverse mapping: if A supports B, then B is supported-by A.
-    # For simplicity, we use the same edge type (bidirectional "supports" means
-    # both claims support each other's argument). This matches reweave behavior.
-
-    try:
-        # Find newly added claim files
-        rc, diff_out = await _git(
-            "diff", "--name-only", "--diff-filter=A",
-            main_sha, branch_sha,
-            cwd=str(config.MAIN_WORKTREE),
-            timeout=10,
-        )
-        if rc != 0:
-            logger.warning("reciprocal_edges: diff failed (rc=%d), skipping", rc)
-            return
-
-        claim_dirs = {"domains/", "core/", "foundations/"}
-        new_claims = [
-            f for f in diff_out.strip().split("\n")
-            if f.endswith(".md")
-            and any(f.startswith(d) for d in claim_dirs)
-            and not f.split("/")[-1].startswith("_")
-            and "/entities/" not in f
-            and "/decisions/" not in f
-        ]
-
-        if not new_claims:
-            return
-
-        reciprocals_added = 0
-        modified_files = set()
-        for claim_path in new_claims:
-            full_path = config.MAIN_WORKTREE / claim_path
-            if not full_path.exists():
-                continue
-
-            try:
-                content = full_path.read_text()
-            except Exception:
-                continue
-
-            fm, raw_fm, body = parse_yaml_frontmatter(content)
-            if fm is None:
-                continue
-
-            # Get the new claim's slug (filename without .md)
-            claim_slug = claim_path.rsplit("/", 1)[-1].replace(".md", "")
-
-            # Collect all edge targets from this new claim
-            for field in EDGE_FIELDS:
-                targets = fm.get(field, [])
-                if isinstance(targets, str):
-                    targets = [targets]
-                if not isinstance(targets, list):
-                    continue
-
-                for target_slug in targets:
-                    target_slug = str(target_slug).strip()
-                    if not target_slug:
-                        continue
-
-                    # Find the target file on disk
-                    target_file = _find_claim_file(target_slug)
-                    if target_file is None:
-                        continue
-
-                    # Add reciprocal edge: target now has field: [new_claim_slug]
-                    reciprocal_type = RECIPROCAL_EDGE_MAP.get(field, "related")
-                    if _add_edge_to_file(target_file, reciprocal_type, claim_slug):
-                        reciprocals_added += 1
-                        modified_files.add(str(target_file))
-
-        if reciprocals_added > 0:
-            # Stage only the files we modified (never git add -A in automation)
-            for f in modified_files:
-                await _git("add", f, cwd=str(config.MAIN_WORKTREE))
-            rc, out = await _git(
-                "commit", "-m", f"reciprocal edges: {reciprocals_added} edges from {len(new_claims)} new claims",
-                cwd=str(config.MAIN_WORKTREE),
-            )
-            if rc == 0:
-                # Push immediately — batch-extract-50.sh does reset --hard origin/main
-                # every 15 min, which destroys unpushed local commits
-                push_rc, push_out = await _git(
-                    "push", "origin", "main",
-                    cwd=str(config.MAIN_WORKTREE),
-                    timeout=30,
-                )
-                if push_rc == 0:
-                    logger.info("reciprocal_edges: %d edges pushed to main (%d new claims)", reciprocals_added, len(new_claims))
-                else:
-                    logger.warning("reciprocal_edges: push failed (commit is local only): %s", push_out[:200])
-            else:
-                logger.warning("reciprocal_edges: commit failed: %s", out[:200])
-
-    except Exception:
-        logger.exception("reciprocal_edges: failed (non-fatal)")
-
-
-def _find_claim_file(slug: str) -> "Path | None":
-    """Find a claim file on disk by its slug. Searches domains/, core/, foundations/."""
-    from pathlib import Path as _Path
-
-    worktree = config.MAIN_WORKTREE
-    for search_dir in ("domains", "core", "foundations"):
-        base = worktree / search_dir
-        if not base.is_dir():
-            continue
-        # Direct match
-        for md in base.rglob(f"{slug}.md"):
-            if not md.name.startswith("_"):
-                return md
-    return None
-
-
-def _add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool:
-    """Add a single edge to a file's frontmatter. Returns True if modified."""
-    try:
-        content = file_path.read_text()
-    except Exception:
-        return False
-
-    fm, raw_fm, body = parse_yaml_frontmatter(content)
-    if fm is None:
-        return False
-
-    # Check for existing edge (dedup)
-    existing = fm.get(edge_type, [])
-    if isinstance(existing, str):
-        existing = [existing]
-    if not isinstance(existing, list):
-        existing = []
-
-    if any(str(e).strip().lower() == target_slug.lower() for e in existing):
-        return False  # Already exists
-
-    # Build merged edges (all edge fields, only modifying the target one)
-    merged_edges = {}
-    for field in REWEAVE_EDGE_FIELDS:
-        vals = fm.get(field, [])
-        if isinstance(vals, str):
-            vals = [vals]
-        if not isinstance(vals, list):
-            vals = []
-        merged_edges[field] = list(vals)
-
-    merged_edges.setdefault(edge_type, []).append(target_slug)
-
-    # Serialize using the same string-surgery approach as reweave
-    new_fm = serialize_edge_fields(raw_fm, merged_edges)
-    if body.startswith("\n"):
-        new_content = f"---\n{new_fm}{body}"
-    else:
-        new_content = f"---\n{new_fm}\n{body}"
-
-    try:
-        file_path.write_text(new_content)
-        return True
-    except Exception:
-        return False
-
-
-def _archive_source_for_pr(branch: str, domain: str, merged: bool = True):
-    """Move source from queue/ to archive/{domain}/ after PR merge or close.
-
-    Only handles extract/ branches (Ganymede: skip research sessions).
-    Updates frontmatter: 'processed' for merged, 'rejected' for closed.
-    Accumulates moves for batch commit at end of merge cycle.
-    """
-    if not branch.startswith("extract/"):
-        return
-
-    source_slug = branch.replace("extract/", "", 1)
-    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
-    queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
-    archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
-    archive_path = os.path.join(archive_dir, f"{source_slug}.md")
-
-    # Already in archive? Delete queue duplicate
-    if os.path.exists(archive_path):
-        if os.path.exists(queue_path):
-            try:
-                os.remove(queue_path)
-                _pending_source_moves.append((queue_path, "deleted"))
-                logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
-            except Exception as e:
-                logger.warning("Source dedup failed: %s", e)
-        return
-
-    # Move from queue to archive
-    if os.path.exists(queue_path):
-        # Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
-        _update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
-        os.makedirs(archive_dir, exist_ok=True)
-        try:
-            shutil.move(queue_path, archive_path)
-            _pending_source_moves.append((queue_path, archive_path))
-            logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
-                        source_slug, domain, "processed" if merged else "rejected")
-        except Exception as e:
-            logger.warning("Source archive failed: %s", e)
-
-
-async def _commit_source_moves():
-    """Batch commit accumulated source moves. Called at end of merge cycle.
-
-    Rhea review: fetch+reset before touching files, use main_worktree_lock,
-    crash gap is self-healing (reset --hard reverts uncommitted moves).
-    """
-    if not _pending_source_moves:
-        return
-
-    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
-    count = len(_pending_source_moves)
-    _pending_source_moves.clear()
-
-    # Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
-    try:
-        async with async_main_worktree_lock(timeout=10):
-            # Sync worktree with remote (Rhea: fetch+reset, not pull)
-            await _git("fetch", "origin", "main", cwd=main_dir, timeout=30)
-            await _git("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
-
-            await _git("add", "-A", "inbox/", cwd=main_dir)
-
-            rc, out = await _git(
-                "commit", "-m",
-                f"pipeline: archive {count} source(s) post-merge\n\n"
-                f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
-                cwd=main_dir,
-            )
-            if rc != 0:
-                if "nothing to commit" in out:
-                    return
-                logger.warning("Source archive commit failed: %s", out)
-                return
-
-            for attempt in range(3):
-                await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
-                rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
-                if rc_push == 0:
-                    logger.info("Committed + pushed %d source archive moves", count)
-                    return
-                await asyncio.sleep(2)
-
-            logger.warning("Failed to push source archive moves after 3 attempts")
-            await _git("reset", "--hard", "origin/main", cwd=main_dir)
-    except TimeoutError:
-        logger.warning("Source archive commit skipped: worktree lock timeout")
-
-
 # --- Domain merge task ---
 
 
@@ -1187,14 +825,14 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
             logger.exception("PR #%d: contributor attribution failed (non-fatal)", pr_num)
 
         # Archive source file (closes near-duplicate loop — Ganymede review)
-        _archive_source_for_pr(branch, domain)
+        archive_source_for_pr(branch, domain)
 
         # Embed new/changed claims into Qdrant (non-fatal)
-        await _embed_merged_claims(main_sha, branch_sha)
+        await embed_merged_claims(main_sha, branch_sha, _git)
 
         # Add reciprocal edges on existing claims (non-fatal)
         # New claim A with supports:[B] → add supports:[A] on B's frontmatter
-        await _reciprocal_edges(main_sha, branch_sha)
+        await reciprocal_edges(main_sha, branch_sha, _git)
 
         # Cascade: notify agents whose beliefs/positions depend on changed claims
         try:
@@ -1557,6 +1195,6 @@ async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
         )
 
     # Batch commit source moves (Ganymede: one commit per cycle, not per PR)
-    await _commit_source_moves()
+    await commit_source_moves(_git)
 
     return total_succeeded, total_failed
diff --git a/lib/post_merge.py b/lib/post_merge.py
new file mode 100644
--- /dev/null
+++ b/lib/post_merge.py
@@ -0,0 +1,418 @@
+"""Post-merge effects: embedding, reciprocal edges, source archiving.
+
+All functions run after a PR is merged to main. Non-fatal failures
+are logged but do not block the pipeline.
+
+Extracted from merge.py Phase 6b of decomposition.
+"""
+
+import asyncio
+import hashlib
+import json
+import logging
+import os
+import re
+import shutil
+from typing import Callable
+
+from . import config
+from .frontmatter import (
+    REWEAVE_EDGE_FIELDS,
+    RECIPROCAL_EDGE_MAP,
+    parse_yaml_frontmatter,
+    serialize_edge_fields,
+)
+
+try:
+    from .worktree_lock import async_main_worktree_lock
+except ImportError:
+    from worktree_lock import async_main_worktree_lock
+
+logger = logging.getLogger(__name__)
+
+
+# Accumulates source moves during a merge cycle, batch-committed at the end
+_pending_source_moves: list[tuple[str, str]] = []  # (queue_path, archive_path)
+
+
+def update_source_frontmatter_status(path: str, new_status: str):
+    """Rewrite the first ``status:`` line of the file at *path* to *new_status*.
+
+    Best-effort: any failure is logged as a warning and swallowed.
+    """
+    try:
+        # Context managers close both handles promptly; the bare open() calls
+        # this replaces left that to the garbage collector.
+        with open(path) as fh:
+            text = fh.read()
+        text = re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=re.MULTILINE)
+        with open(path, "w") as fh:
+            fh.write(text)
+    except Exception as e:
+        logger.warning("Failed to update source status in %s: %s", path, e)
+
+
+async def embed_merged_claims(main_sha: str, branch_sha: str, git_fn: Callable):
+    """Embed new/changed claim files from a merged PR into Qdrant.
+
+    Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
+    to find ALL changed files across the entire branch, not just the last commit.
+    Also deletes Qdrant vectors for files removed by the branch.
+
+    git_fn is the caller's async git runner; it is awaited as
+    ``git_fn(*args, cwd=..., timeout=...)`` and must return ``(rc, output)``.
+
+    Non-fatal — embedding failure does not block the merge pipeline.
+    """
+    try:
+        # --- Embed added/changed files ---
+        rc, diff_out = await git_fn(
+            "diff", "--name-only", "--diff-filter=ACMR",
+            main_sha, branch_sha,
+            cwd=str(config.MAIN_WORKTREE),
+            timeout=10,
+        )
+        if rc != 0:
+            logger.warning("embed: diff failed (rc=%d), skipping", rc)
+            return
+
+        embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
+        md_files = [
+            f for f in diff_out.strip().split("\n")
+            if f.endswith(".md")
+            and any(f.startswith(d) for d in embed_dirs)
+            and not f.split("/")[-1].startswith("_")
+        ]
+
+        embedded = 0
+        for fpath in md_files:
+            full_path = config.MAIN_WORKTREE / fpath
+            if not full_path.exists():
+                continue
+            proc = await asyncio.create_subprocess_exec(
+                "python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            try:
+                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
+            except asyncio.TimeoutError:
+                # One hung embedder must not abort the remaining files or the
+                # stale-vector cleanup below, and must not be left running.
+                try:
+                    proc.kill()
+                except ProcessLookupError:
+                    pass  # exited between the timeout and the kill
+                await proc.wait()
+                logger.warning("embed: timed out for %s", fpath)
+                continue
+            if proc.returncode == 0 and b"OK" in stdout:
+                embedded += 1
+            else:
+                logger.warning("embed: failed for %s: %s", fpath, stderr.decode(errors="replace")[:200])
+
+        if embedded:
+            logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
+
+        # --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
+        rc, del_out = await git_fn(
+            "diff", "--name-only", "--diff-filter=D",
+            main_sha, branch_sha,
+            cwd=str(config.MAIN_WORKTREE),
+            timeout=10,
+        )
+        if rc == 0 and del_out.strip():
+            deleted_files = [
+                f for f in del_out.strip().split("\n")
+                if f.endswith(".md")
+                and any(f.startswith(d) for d in embed_dirs)
+            ]
+            if deleted_files:
+                point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
+                try:
+                    import urllib.request
+                    req = urllib.request.Request(
+                        "http://localhost:6333/collections/teleo-claims/points/delete",
+                        data=json.dumps({"points": point_ids}).encode(),
+                        headers={"Content-Type": "application/json"},
+                        method="POST",
+                    )
+                    # Close the HTTP response instead of leaking the socket.
+                    with urllib.request.urlopen(req, timeout=10):
+                        pass
+                    logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
+                except Exception:
+                    logger.warning("embed: failed to delete stale vectors (non-fatal)")
+    except Exception:
+        logger.exception("embed: post-merge embedding failed (non-fatal)")
+
+
+def find_claim_file(slug: str):
+    """Find a claim file on disk by its slug. Searches domains/, core/, foundations/.
+
+    Returns Path or None.
+    """
+    worktree = config.MAIN_WORKTREE
+    for search_dir in ("domains", "core", "foundations"):
+        base = worktree / search_dir
+        if not base.is_dir():
+            continue
+        # Direct match
+        for md in base.rglob(f"{slug}.md"):
+            if not md.name.startswith("_"):
+                return md
+    return None
+
+
+def add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool:
+    """Add a single edge to a file's frontmatter. Returns True if modified."""
+    try:
+        content = file_path.read_text()
+    except Exception:
+        return False
+
+    fm, raw_fm, body = parse_yaml_frontmatter(content)
+    if fm is None:
+        return False
+
+    # Check for existing edge (dedup)
+    existing = fm.get(edge_type, [])
+    if isinstance(existing, str):
+        existing = [existing]
+    if not isinstance(existing, list):
+        existing = []
+
+    if any(str(e).strip().lower() == target_slug.lower() for e in existing):
+        return False  # Already exists
+
+    # Build merged edges (all edge fields, only modifying the target one)
+    merged_edges = {}
+    for field in REWEAVE_EDGE_FIELDS:
+        vals = fm.get(field, [])
+        if isinstance(vals, str):
+            vals = [vals]
+        if not isinstance(vals, list):
+            vals = []
+        merged_edges[field] = list(vals)
+
+    merged_edges.setdefault(edge_type, []).append(target_slug)
+
+    # Serialize using the same string-surgery approach as reweave
+    new_fm = serialize_edge_fields(raw_fm, merged_edges)
+    if body.startswith("\n"):
+        new_content = f"---\n{new_fm}{body}"
+    else:
+        new_content = f"---\n{new_fm}\n{body}"
+
+    try:
+        file_path.write_text(new_content)
+        return True
+    except Exception:
+        return False
+
+
+async def reciprocal_edges(main_sha: str, branch_sha: str, git_fn: Callable):
+    """Add reciprocal edges on existing claims after a PR merges.
+
+    When a new claim A has `supports: [B]` in its frontmatter, B gets an edge
+    back to A, giving A an incoming link so it is not an orphan. The edge
+    type written on B is RECIPROCAL_EDGE_MAP[field], falling back to "related".
+
+    Runs on main after cherry-pick merge. Non-fatal — orphans are recoverable.
+    Only processes new files (diff-filter=A), not modified files.
+    """
+    EDGE_FIELDS = ("supports", "challenges", "related")
+
+    try:
+        # Find newly added claim files
+        rc, diff_out = await git_fn(
+            "diff", "--name-only", "--diff-filter=A",
+            main_sha, branch_sha,
+            cwd=str(config.MAIN_WORKTREE),
+            timeout=10,
+        )
+        if rc != 0:
+            logger.warning("reciprocal_edges: diff failed (rc=%d), skipping", rc)
+            return
+
+        claim_dirs = {"domains/", "core/", "foundations/"}
+        new_claims = [
+            f for f in diff_out.strip().split("\n")
+            if f.endswith(".md")
+            and any(f.startswith(d) for d in claim_dirs)
+            and not f.split("/")[-1].startswith("_")
+            and "/entities/" not in f
+            and "/decisions/" not in f
+        ]
+
+        if not new_claims:
+            return
+
+        reciprocals_added = 0
+        modified_files = set()
+        for claim_path in new_claims:
+            full_path = config.MAIN_WORKTREE / claim_path
+            if not full_path.exists():
+                continue
+
+            try:
+                content = full_path.read_text()
+            except Exception:
+                continue
+
+            fm, raw_fm, body = parse_yaml_frontmatter(content)
+            if fm is None:
+                continue
+
+            # Get the new claim's slug (filename without .md). Strip only the
+            # trailing extension: str.replace() would also eat ".md" inside
+            # the name and yield a slug that matches no file.
+            claim_slug = claim_path.rsplit("/", 1)[-1][: -len(".md")]
+
+            # Collect all edge targets from this new claim
+            for field in EDGE_FIELDS:
+                targets = fm.get(field, [])
+                if isinstance(targets, str):
+                    targets = [targets]
+                if not isinstance(targets, list):
+                    continue
+
+                for target_slug in targets:
+                    target_slug = str(target_slug).strip()
+                    if not target_slug:
+                        continue
+
+                    # Find the target file on disk
+                    target_file = find_claim_file(target_slug)
+                    if target_file is None:
+                        continue
+
+                    # Add reciprocal edge: target now has field: [new_claim_slug]
+                    reciprocal_type = RECIPROCAL_EDGE_MAP.get(field, "related")
+                    if add_edge_to_file(target_file, reciprocal_type, claim_slug):
+                        reciprocals_added += 1
+                        modified_files.add(str(target_file))
+
+        if reciprocals_added > 0:
+            # Stage only the files we modified (never git add -A in automation)
+            for f in modified_files:
+                await git_fn("add", f, cwd=str(config.MAIN_WORKTREE))
+            rc, out = await git_fn(
+                "commit", "-m", f"reciprocal edges: {reciprocals_added} edges from {len(new_claims)} new claims",
+                cwd=str(config.MAIN_WORKTREE),
+            )
+            if rc == 0:
+                # Push immediately — batch-extract-50.sh does reset --hard origin/main
+                # every 15 min, which destroys unpushed local commits
+                push_rc, push_out = await git_fn(
+                    "push", "origin", "main",
+                    cwd=str(config.MAIN_WORKTREE),
+                    timeout=30,
+                )
+                if push_rc == 0:
+                    logger.info("reciprocal_edges: %d edges pushed to main (%d new claims)", reciprocals_added, len(new_claims))
+                else:
+                    logger.warning("reciprocal_edges: push failed (commit is local only): %s", push_out[:200])
+            else:
+                logger.warning("reciprocal_edges: commit failed: %s", out[:200])
+
+    except Exception:
+        logger.exception("reciprocal_edges: failed (non-fatal)")
+
+
+def archive_source_for_pr(branch: str, domain: str, merged: bool = True):
+    """Move source from queue/ to archive/{domain}/ after PR merge or close.
+
+    Only handles extract/ branches (Ganymede: skip research sessions).
+    Updates frontmatter: 'processed' for merged, 'rejected' for closed.
+    Accumulates moves for batch commit at end of merge cycle.
+    """
+    if not branch.startswith("extract/"):
+        return
+
+    source_slug = branch.replace("extract/", "", 1)
+    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
+    queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
+    archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
+    archive_path = os.path.join(archive_dir, f"{source_slug}.md")
+
+    # Already in archive? Delete queue duplicate
+    if os.path.exists(archive_path):
+        if os.path.exists(queue_path):
+            try:
+                os.remove(queue_path)
+                _pending_source_moves.append((queue_path, "deleted"))
+                logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
+            except Exception as e:
+                logger.warning("Source dedup failed: %s", e)
+        return
+
+    # Move from queue to archive
+    if os.path.exists(queue_path):
+        # Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
+        update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
+        os.makedirs(archive_dir, exist_ok=True)
+        try:
+            shutil.move(queue_path, archive_path)
+            _pending_source_moves.append((queue_path, archive_path))
+            logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
+                        source_slug, domain, "processed" if merged else "rejected")
+        except Exception as e:
+            logger.warning("Source archive failed: %s", e)
+
+
+async def commit_source_moves(git_fn: Callable):
+    """Batch commit accumulated source moves. Called at end of merge cycle.
+
+    Rhea review: fetch+reset before touching files, use main_worktree_lock,
+    crash gap is self-healing (reset --hard reverts uncommitted moves).
+
+    The pending list is drained only once the worktree lock is held, so a
+    lock timeout leaves the batch queued for the next cycle.
+    """
+    if not _pending_source_moves:
+        return
+
+    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
+
+    # Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
+    try:
+        async with async_main_worktree_lock(timeout=10):
+            # Drain only now that the lock is held: clearing before acquiring it
+            # silently dropped the whole batch whenever the lock timed out.
+            count = len(_pending_source_moves)
+            _pending_source_moves.clear()
+
+            # Sync worktree with remote (Rhea: fetch+reset, not pull)
+            # NOTE(review): archive_source_for_pr has already moved files by
+            # this point; confirm reset --hard does not restore the queue copies.
+            await git_fn("fetch", "origin", "main", cwd=main_dir, timeout=30)
+            await git_fn("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
+
+            await git_fn("add", "-A", "inbox/", cwd=main_dir)
+
+            rc, out = await git_fn(
+                "commit", "-m",
+                f"pipeline: archive {count} source(s) post-merge\n\n"
+                f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
+                cwd=main_dir,
+            )
+            if rc != 0:
+                if "nothing to commit" in out:
+                    return
+                logger.warning("Source archive commit failed: %s", out)
+                return
+
+            for attempt in range(3):
+                await git_fn("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
+                rc_push, _ = await git_fn("push", "origin", "main", cwd=main_dir, timeout=30)
+                if rc_push == 0:
+                    logger.info("Committed + pushed %d source archive moves", count)
+                    return
+                await asyncio.sleep(2)
+
+            logger.warning("Failed to push source archive moves after 3 attempts")
+            await git_fn("reset", "--hard", "origin/main", cwd=main_dir)
+    except (TimeoutError, asyncio.TimeoutError):
+        # asyncio.TimeoutError is a distinct class before Python 3.11
+        logger.warning("Source archive commit skipped: worktree lock timeout")