Phase 6b: extract post_merge.py from merge.py — post-merge effects
7 functions extracted to lib/post_merge.py:
- embed_merged_claims
- reciprocal_edges
- find_claim_file
- add_edge_to_file
- archive_source_for_pr
- commit_source_moves
- update_source_frontmatter_status

git_fn injection pattern (same as contributor.py) for the 3 async functions
that need git operations. Unused async_main_worktree_lock import removed
from merge.py.

merge.py: 1562 → 1200 lines (−362). Total reduction from 1912: −712 lines.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
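The git_fn injection mentioned above is plain dependency injection: the extracted async functions accept the caller's git helper as a parameter instead of importing merge.py's private _git, so post_merge.py never imports merge.py and no circular dependency appears. A minimal sketch of the shape involved (the _run_git and list_changed names here are hypothetical stand-ins, not code from this commit):

import asyncio
from typing import Callable

async def _run_git(*args: str, cwd: str = ".", timeout: int = 30) -> tuple[int, str]:
    """Hypothetical stand-in for merge.py's _git wrapper: returns (rc, combined output)."""
    proc = await asyncio.create_subprocess_exec(
        "git", *args, cwd=cwd,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    return proc.returncode, out.decode()

async def list_changed(main_sha: str, branch_sha: str, git_fn: Callable) -> list[str]:
    """Illustrates the injected-callable shape the extracted functions rely on."""
    rc, out = await git_fn("diff", "--name-only", main_sha, branch_sha, cwd=".")
    return out.strip().split("\n") if rc == 0 else []

# The caller owns the git wrapper and passes it in, e.g. (from this commit):
#     await embed_merged_claims(main_sha, branch_sha, _git)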
parent ed1edd6466 · commit 46ad508de7
2 changed files with 394 additions and 372 deletions
lib/merge.py (382 lines changed)
@@ -43,13 +43,6 @@ for _prefix in PIPELINE_OWNED_PREFIXES:
     assert not _prefix.startswith(f"{_agent}/"), \
         f"FATAL: Agent prefix '{_agent}/' found in PIPELINE_OWNED_PREFIXES — this bypasses Leo's review gate"
 
-# Import worktree lock — file at /opt/teleo-eval/pipeline/lib/worktree_lock.py
-try:
-    from .worktree_lock import async_main_worktree_lock
-except ImportError:
-    import sys
-    sys.path.insert(0, os.path.dirname(__file__))
-    from worktree_lock import async_main_worktree_lock
 from .cascade import cascade_after_merge
 from .cross_domain import cross_domain_after_merge
 from .forgejo import get_agent_token, get_pr_diff, repo_path
@@ -413,12 +406,16 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
 
 from .frontmatter import (
     REWEAVE_EDGE_FIELDS,
-    RECIPROCAL_EDGE_MAP,
     parse_yaml_frontmatter,
     union_edge_lists,
-    serialize_edge_fields,
     serialize_frontmatter,
 )
+from .post_merge import (
+    embed_merged_claims,
+    reciprocal_edges,
+    archive_source_for_pr,
+    commit_source_moves,
+)
 
 
 async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
@@ -685,365 +682,6 @@ async def _delete_remote_branch(branch: str):
     logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
 
 
-# --- Source archiving after merge (Ganymede review: closes near-duplicate loop) ---
-
-# Accumulates source moves during a merge cycle, batch-committed at the end
-_pending_source_moves: list[tuple[str, str]] = []  # (queue_path, archive_path)
-
-
-def _update_source_frontmatter_status(path: str, new_status: str):
-    """Update the status field in a source file's frontmatter. (Ganymede: 5 lines)"""
-    import re as _re
-    try:
-        text = open(path).read()
-        text = _re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=_re.MULTILINE)
-        open(path, "w").write(text)
-    except Exception as e:
-        logger.warning("Failed to update source status in %s: %s", path, e)
-
-
-async def _embed_merged_claims(main_sha: str, branch_sha: str):
-    """Embed new/changed claim files from a merged PR into Qdrant.
-
-    Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
-    to find ALL changed files across the entire branch, not just the last commit.
-    Also deletes Qdrant vectors for files removed by the branch.
-
-    Non-fatal — embedding failure does not block the merge pipeline.
-    """
-    try:
-        # --- Embed added/changed files ---
-        rc, diff_out = await _git(
-            "diff", "--name-only", "--diff-filter=ACMR",
-            main_sha, branch_sha,
-            cwd=str(config.MAIN_WORKTREE),
-            timeout=10,
-        )
-        if rc != 0:
-            logger.warning("embed: diff failed (rc=%d), skipping", rc)
-            return
-
-        embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
-        md_files = [
-            f for f in diff_out.strip().split("\n")
-            if f.endswith(".md")
-            and any(f.startswith(d) for d in embed_dirs)
-            and not f.split("/")[-1].startswith("_")
-        ]
-
-        embedded = 0
-        for fpath in md_files:
-            full_path = config.MAIN_WORKTREE / fpath
-            if not full_path.exists():
-                continue
-            proc = await asyncio.create_subprocess_exec(
-                "python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE,
-            )
-            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
-            if proc.returncode == 0 and b"OK" in stdout:
-                embedded += 1
-            else:
-                logger.warning("embed: failed for %s: %s", fpath, stderr.decode()[:200])
-
-        if embedded:
-            logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
-
-        # --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
-        rc, del_out = await _git(
-            "diff", "--name-only", "--diff-filter=D",
-            main_sha, branch_sha,
-            cwd=str(config.MAIN_WORKTREE),
-            timeout=10,
-        )
-        if rc == 0 and del_out.strip():
-            deleted_files = [
-                f for f in del_out.strip().split("\n")
-                if f.endswith(".md")
-                and any(f.startswith(d) for d in embed_dirs)
-            ]
-            if deleted_files:
-                import hashlib
-                point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
-                try:
-                    import urllib.request
-                    req = urllib.request.Request(
-                        "http://localhost:6333/collections/teleo-claims/points/delete",
-                        data=json.dumps({"points": point_ids}).encode(),
-                        headers={"Content-Type": "application/json"},
-                        method="POST",
-                    )
-                    urllib.request.urlopen(req, timeout=10)
-                    logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
-                except Exception:
-                    logger.warning("embed: failed to delete stale vectors (non-fatal)")
-    except Exception:
-        logger.exception("embed: post-merge embedding failed (non-fatal)")
-
-
-async def _reciprocal_edges(main_sha: str, branch_sha: str):
-    """Add reciprocal edges on existing claims after a PR merges.
-
-    When a new claim A has `supports: [B]` in its frontmatter, B should have
-    `supports: [A]` added to its own frontmatter. This gives A an incoming link,
-    preventing it from being an orphan.
-
-    Runs on main after cherry-pick merge. Non-fatal — orphans are recoverable.
-    Only processes new files (diff-filter=A), not modified files.
-    """
-    EDGE_FIELDS = ("supports", "challenges", "related")
-    # Inverse mapping: if A supports B, then B is supported-by A.
-    # For simplicity, we use the same edge type (bidirectional "supports" means
-    # both claims support each other's argument). This matches reweave behavior.
-
-    try:
-        # Find newly added claim files
-        rc, diff_out = await _git(
-            "diff", "--name-only", "--diff-filter=A",
-            main_sha, branch_sha,
-            cwd=str(config.MAIN_WORKTREE),
-            timeout=10,
-        )
-        if rc != 0:
-            logger.warning("reciprocal_edges: diff failed (rc=%d), skipping", rc)
-            return
-
-        claim_dirs = {"domains/", "core/", "foundations/"}
-        new_claims = [
-            f for f in diff_out.strip().split("\n")
-            if f.endswith(".md")
-            and any(f.startswith(d) for d in claim_dirs)
-            and not f.split("/")[-1].startswith("_")
-            and "/entities/" not in f
-            and "/decisions/" not in f
-        ]
-
-        if not new_claims:
-            return
-
-        reciprocals_added = 0
-        modified_files = set()
-        for claim_path in new_claims:
-            full_path = config.MAIN_WORKTREE / claim_path
-            if not full_path.exists():
-                continue
-
-            try:
-                content = full_path.read_text()
-            except Exception:
-                continue
-
-            fm, raw_fm, body = parse_yaml_frontmatter(content)
-            if fm is None:
-                continue
-
-            # Get the new claim's slug (filename without .md)
-            claim_slug = claim_path.rsplit("/", 1)[-1].replace(".md", "")
-
-            # Collect all edge targets from this new claim
-            for field in EDGE_FIELDS:
-                targets = fm.get(field, [])
-                if isinstance(targets, str):
-                    targets = [targets]
-                if not isinstance(targets, list):
-                    continue
-
-                for target_slug in targets:
-                    target_slug = str(target_slug).strip()
-                    if not target_slug:
-                        continue
-
-                    # Find the target file on disk
-                    target_file = _find_claim_file(target_slug)
-                    if target_file is None:
-                        continue
-
-                    # Add reciprocal edge: target now has field: [new_claim_slug]
-                    reciprocal_type = RECIPROCAL_EDGE_MAP.get(field, "related")
-                    if _add_edge_to_file(target_file, reciprocal_type, claim_slug):
-                        reciprocals_added += 1
-                        modified_files.add(str(target_file))
-
-        if reciprocals_added > 0:
-            # Stage only the files we modified (never git add -A in automation)
-            for f in modified_files:
-                await _git("add", f, cwd=str(config.MAIN_WORKTREE))
-            rc, out = await _git(
-                "commit", "-m", f"reciprocal edges: {reciprocals_added} edges from {len(new_claims)} new claims",
-                cwd=str(config.MAIN_WORKTREE),
-            )
-            if rc == 0:
-                # Push immediately — batch-extract-50.sh does reset --hard origin/main
-                # every 15 min, which destroys unpushed local commits
-                push_rc, push_out = await _git(
-                    "push", "origin", "main",
-                    cwd=str(config.MAIN_WORKTREE),
-                    timeout=30,
-                )
-                if push_rc == 0:
-                    logger.info("reciprocal_edges: %d edges pushed to main (%d new claims)", reciprocals_added, len(new_claims))
-                else:
-                    logger.warning("reciprocal_edges: push failed (commit is local only): %s", push_out[:200])
-            else:
-                logger.warning("reciprocal_edges: commit failed: %s", out[:200])
-
-    except Exception:
-        logger.exception("reciprocal_edges: failed (non-fatal)")
-
-
-def _find_claim_file(slug: str) -> "Path | None":
-    """Find a claim file on disk by its slug. Searches domains/, core/, foundations/."""
-    from pathlib import Path as _Path
-
-    worktree = config.MAIN_WORKTREE
-    for search_dir in ("domains", "core", "foundations"):
-        base = worktree / search_dir
-        if not base.is_dir():
-            continue
-        # Direct match
-        for md in base.rglob(f"{slug}.md"):
-            if not md.name.startswith("_"):
-                return md
-    return None
-
-
-def _add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool:
-    """Add a single edge to a file's frontmatter. Returns True if modified."""
-    try:
-        content = file_path.read_text()
-    except Exception:
-        return False
-
-    fm, raw_fm, body = parse_yaml_frontmatter(content)
-    if fm is None:
-        return False
-
-    # Check for existing edge (dedup)
-    existing = fm.get(edge_type, [])
-    if isinstance(existing, str):
-        existing = [existing]
-    if not isinstance(existing, list):
-        existing = []
-
-    if any(str(e).strip().lower() == target_slug.lower() for e in existing):
-        return False  # Already exists
-
-    # Build merged edges (all edge fields, only modifying the target one)
-    merged_edges = {}
-    for field in REWEAVE_EDGE_FIELDS:
-        vals = fm.get(field, [])
-        if isinstance(vals, str):
-            vals = [vals]
-        if not isinstance(vals, list):
-            vals = []
-        merged_edges[field] = list(vals)
-
-    merged_edges.setdefault(edge_type, []).append(target_slug)
-
-    # Serialize using the same string-surgery approach as reweave
-    new_fm = serialize_edge_fields(raw_fm, merged_edges)
-    if body.startswith("\n"):
-        new_content = f"---\n{new_fm}{body}"
-    else:
-        new_content = f"---\n{new_fm}\n{body}"
-
-    try:
-        file_path.write_text(new_content)
-        return True
-    except Exception:
-        return False
-
-
-def _archive_source_for_pr(branch: str, domain: str, merged: bool = True):
-    """Move source from queue/ to archive/{domain}/ after PR merge or close.
-
-    Only handles extract/ branches (Ganymede: skip research sessions).
-    Updates frontmatter: 'processed' for merged, 'rejected' for closed.
-    Accumulates moves for batch commit at end of merge cycle.
-    """
-    if not branch.startswith("extract/"):
-        return
-
-    source_slug = branch.replace("extract/", "", 1)
-    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
-    queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
-    archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
-    archive_path = os.path.join(archive_dir, f"{source_slug}.md")
-
-    # Already in archive? Delete queue duplicate
-    if os.path.exists(archive_path):
-        if os.path.exists(queue_path):
-            try:
-                os.remove(queue_path)
-                _pending_source_moves.append((queue_path, "deleted"))
-                logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
-            except Exception as e:
-                logger.warning("Source dedup failed: %s", e)
-        return
-
-    # Move from queue to archive
-    if os.path.exists(queue_path):
-        # Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
-        _update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
-        os.makedirs(archive_dir, exist_ok=True)
-        try:
-            shutil.move(queue_path, archive_path)
-            _pending_source_moves.append((queue_path, archive_path))
-            logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
-                        source_slug, domain, "processed" if merged else "rejected")
-        except Exception as e:
-            logger.warning("Source archive failed: %s", e)
-
-
-async def _commit_source_moves():
-    """Batch commit accumulated source moves. Called at end of merge cycle.
-
-    Rhea review: fetch+reset before touching files, use main_worktree_lock,
-    crash gap is self-healing (reset --hard reverts uncommitted moves).
-    """
-    if not _pending_source_moves:
-        return
-
-    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
-    count = len(_pending_source_moves)
-    _pending_source_moves.clear()
-
-    # Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
-    try:
-        async with async_main_worktree_lock(timeout=10):
-            # Sync worktree with remote (Rhea: fetch+reset, not pull)
-            await _git("fetch", "origin", "main", cwd=main_dir, timeout=30)
-            await _git("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
-
-            await _git("add", "-A", "inbox/", cwd=main_dir)
-
-            rc, out = await _git(
-                "commit", "-m",
-                f"pipeline: archive {count} source(s) post-merge\n\n"
-                f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
-                cwd=main_dir,
-            )
-            if rc != 0:
-                if "nothing to commit" in out:
-                    return
-                logger.warning("Source archive commit failed: %s", out)
-                return
-
-            for attempt in range(3):
-                await _git("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
-                rc_push, _ = await _git("push", "origin", "main", cwd=main_dir, timeout=30)
-                if rc_push == 0:
-                    logger.info("Committed + pushed %d source archive moves", count)
-                    return
-                await asyncio.sleep(2)
-
-            logger.warning("Failed to push source archive moves after 3 attempts")
-            await _git("reset", "--hard", "origin/main", cwd=main_dir)
-    except TimeoutError:
-        logger.warning("Source archive commit skipped: worktree lock timeout")
-
-
 # --- Domain merge task ---
 
 
@@ -1187,14 +825,14 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
             logger.exception("PR #%d: contributor attribution failed (non-fatal)", pr_num)
 
         # Archive source file (closes near-duplicate loop — Ganymede review)
-        _archive_source_for_pr(branch, domain)
+        archive_source_for_pr(branch, domain)
 
         # Embed new/changed claims into Qdrant (non-fatal)
-        await _embed_merged_claims(main_sha, branch_sha)
+        await embed_merged_claims(main_sha, branch_sha, _git)
 
         # Add reciprocal edges on existing claims (non-fatal)
        # New claim A with supports:[B] → add supports:[A] on B's frontmatter
-        await _reciprocal_edges(main_sha, branch_sha)
+        await reciprocal_edges(main_sha, branch_sha, _git)
 
         # Cascade: notify agents whose beliefs/positions depend on changed claims
         try:
@@ -1557,6 +1195,6 @@ async def merge_cycle(conn, max_workers=None) -> tuple[int, int]:
     )
 
     # Batch commit source moves (Ganymede: one commit per cycle, not per PR)
-    await _commit_source_moves()
+    await commit_source_moves(_git)
 
     return total_succeeded, total_failed
lib/post_merge.py (384 lines, new file)
@@ -0,0 +1,384 @@
+"""Post-merge effects: embedding, reciprocal edges, source archiving.
+
+All functions run after a PR is merged to main. Non-fatal failures
+are logged but do not block the pipeline.
+
+Extracted from merge.py in Phase 6b of the decomposition.
+"""
+
+import asyncio
+import hashlib
+import json
+import logging
+import os
+import re
+import shutil
+from typing import Callable
+
+from . import config
+from .frontmatter import (
+    REWEAVE_EDGE_FIELDS,
+    RECIPROCAL_EDGE_MAP,
+    parse_yaml_frontmatter,
+    serialize_edge_fields,
+)
+
+try:
+    from .worktree_lock import async_main_worktree_lock
+except ImportError:
+    from worktree_lock import async_main_worktree_lock
+
+logger = logging.getLogger(__name__)
+
+
+# Accumulates source moves during a merge cycle, batch-committed at the end
+_pending_source_moves: list[tuple[str, str]] = []  # (queue_path, archive_path)
+
+
+def update_source_frontmatter_status(path: str, new_status: str):
+    """Update the status field in a source file's frontmatter. (Ganymede: 5 lines)"""
+    try:
+        text = open(path).read()
+        text = re.sub(r"^status: .*$", f"status: {new_status}", text, count=1, flags=re.MULTILINE)
+        open(path, "w").write(text)
+    except Exception as e:
+        logger.warning("Failed to update source status in %s: %s", path, e)
+
+
+async def embed_merged_claims(main_sha: str, branch_sha: str, git_fn: Callable):
+    """Embed new/changed claim files from a merged PR into Qdrant.
+
+    Diffs main_sha (pre-merge main HEAD) against branch_sha (merged branch tip)
+    to find ALL changed files across the entire branch, not just the last commit.
+    Also deletes Qdrant vectors for files removed by the branch.
+
+    Non-fatal — embedding failure does not block the merge pipeline.
+    """
+    try:
+        # --- Embed added/changed files ---
+        rc, diff_out = await git_fn(
+            "diff", "--name-only", "--diff-filter=ACMR",
+            main_sha, branch_sha,
+            cwd=str(config.MAIN_WORKTREE),
+            timeout=10,
+        )
+        if rc != 0:
+            logger.warning("embed: diff failed (rc=%d), skipping", rc)
+            return
+
+        embed_dirs = {"domains/", "core/", "foundations/", "decisions/", "entities/"}
+        md_files = [
+            f for f in diff_out.strip().split("\n")
+            if f.endswith(".md")
+            and any(f.startswith(d) for d in embed_dirs)
+            and not f.split("/")[-1].startswith("_")
+        ]
+
+        embedded = 0
+        for fpath in md_files:
+            full_path = config.MAIN_WORKTREE / fpath
+            if not full_path.exists():
+                continue
+            proc = await asyncio.create_subprocess_exec(
+                "python3", "/opt/teleo-eval/embed-claims.py", "--file", str(full_path),
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
+            if proc.returncode == 0 and b"OK" in stdout:
+                embedded += 1
+            else:
+                logger.warning("embed: failed for %s: %s", fpath, stderr.decode()[:200])
+
+        if embedded:
+            logger.info("embed: %d/%d files embedded into Qdrant", embedded, len(md_files))
+
+        # --- Delete vectors for removed files (Ganymede: stale vector cleanup) ---
+        rc, del_out = await git_fn(
+            "diff", "--name-only", "--diff-filter=D",
+            main_sha, branch_sha,
+            cwd=str(config.MAIN_WORKTREE),
+            timeout=10,
+        )
+        if rc == 0 and del_out.strip():
+            deleted_files = [
+                f for f in del_out.strip().split("\n")
+                if f.endswith(".md")
+                and any(f.startswith(d) for d in embed_dirs)
+            ]
+            if deleted_files:
+                point_ids = [hashlib.md5(f.encode()).hexdigest() for f in deleted_files]
+                try:
+                    import urllib.request
+                    req = urllib.request.Request(
+                        "http://localhost:6333/collections/teleo-claims/points/delete",
+                        data=json.dumps({"points": point_ids}).encode(),
+                        headers={"Content-Type": "application/json"},
+                        method="POST",
+                    )
+                    urllib.request.urlopen(req, timeout=10)
+                    logger.info("embed: deleted %d stale vectors from Qdrant", len(point_ids))
+                except Exception:
+                    logger.warning("embed: failed to delete stale vectors (non-fatal)")
+    except Exception:
+        logger.exception("embed: post-merge embedding failed (non-fatal)")
+
+
+def find_claim_file(slug: str):
+    """Find a claim file on disk by its slug. Searches domains/, core/, foundations/.
+
+    Returns Path or None.
+    """
+    worktree = config.MAIN_WORKTREE
+    for search_dir in ("domains", "core", "foundations"):
+        base = worktree / search_dir
+        if not base.is_dir():
+            continue
+        # Direct match
+        for md in base.rglob(f"{slug}.md"):
+            if not md.name.startswith("_"):
+                return md
+    return None
+
+
+def add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool:
+    """Add a single edge to a file's frontmatter. Returns True if modified."""
+    try:
+        content = file_path.read_text()
+    except Exception:
+        return False
+
+    fm, raw_fm, body = parse_yaml_frontmatter(content)
+    if fm is None:
+        return False
+
+    # Check for existing edge (dedup)
+    existing = fm.get(edge_type, [])
+    if isinstance(existing, str):
+        existing = [existing]
+    if not isinstance(existing, list):
+        existing = []
+
+    if any(str(e).strip().lower() == target_slug.lower() for e in existing):
+        return False  # Already exists
+
+    # Build merged edges (all edge fields, only modifying the target one)
+    merged_edges = {}
+    for field in REWEAVE_EDGE_FIELDS:
+        vals = fm.get(field, [])
+        if isinstance(vals, str):
+            vals = [vals]
+        if not isinstance(vals, list):
+            vals = []
+        merged_edges[field] = list(vals)
+
+    merged_edges.setdefault(edge_type, []).append(target_slug)
+
+    # Serialize using the same string-surgery approach as reweave
+    new_fm = serialize_edge_fields(raw_fm, merged_edges)
+    if body.startswith("\n"):
+        new_content = f"---\n{new_fm}{body}"
+    else:
+        new_content = f"---\n{new_fm}\n{body}"
+
+    try:
+        file_path.write_text(new_content)
+        return True
+    except Exception:
+        return False
+
+
+async def reciprocal_edges(main_sha: str, branch_sha: str, git_fn: Callable):
+    """Add reciprocal edges on existing claims after a PR merges.
+
+    When a new claim A has `supports: [B]` in its frontmatter, B should have
+    `supports: [A]` added to its own frontmatter. This gives A an incoming link,
+    preventing it from being an orphan.
+
+    Runs on main after cherry-pick merge. Non-fatal — orphans are recoverable.
+    Only processes new files (diff-filter=A), not modified files.
+    """
+    EDGE_FIELDS = ("supports", "challenges", "related")
+
+    try:
+        # Find newly added claim files
+        rc, diff_out = await git_fn(
+            "diff", "--name-only", "--diff-filter=A",
+            main_sha, branch_sha,
+            cwd=str(config.MAIN_WORKTREE),
+            timeout=10,
+        )
+        if rc != 0:
+            logger.warning("reciprocal_edges: diff failed (rc=%d), skipping", rc)
+            return
+
+        claim_dirs = {"domains/", "core/", "foundations/"}
+        new_claims = [
+            f for f in diff_out.strip().split("\n")
+            if f.endswith(".md")
+            and any(f.startswith(d) for d in claim_dirs)
+            and not f.split("/")[-1].startswith("_")
+            and "/entities/" not in f
+            and "/decisions/" not in f
+        ]
+
+        if not new_claims:
+            return
+
+        reciprocals_added = 0
+        modified_files = set()
+        for claim_path in new_claims:
+            full_path = config.MAIN_WORKTREE / claim_path
+            if not full_path.exists():
+                continue
+
+            try:
+                content = full_path.read_text()
+            except Exception:
+                continue
+
+            fm, raw_fm, body = parse_yaml_frontmatter(content)
+            if fm is None:
+                continue
+
+            # Get the new claim's slug (filename without .md)
+            claim_slug = claim_path.rsplit("/", 1)[-1].replace(".md", "")
+
+            # Collect all edge targets from this new claim
+            for field in EDGE_FIELDS:
+                targets = fm.get(field, [])
+                if isinstance(targets, str):
+                    targets = [targets]
+                if not isinstance(targets, list):
+                    continue
+
+                for target_slug in targets:
+                    target_slug = str(target_slug).strip()
+                    if not target_slug:
+                        continue
+
+                    # Find the target file on disk
+                    target_file = find_claim_file(target_slug)
+                    if target_file is None:
+                        continue
+
+                    # Add reciprocal edge: target now has field: [new_claim_slug]
+                    reciprocal_type = RECIPROCAL_EDGE_MAP.get(field, "related")
+                    if add_edge_to_file(target_file, reciprocal_type, claim_slug):
+                        reciprocals_added += 1
+                        modified_files.add(str(target_file))
+
+        if reciprocals_added > 0:
+            # Stage only the files we modified (never git add -A in automation)
+            for f in modified_files:
+                await git_fn("add", f, cwd=str(config.MAIN_WORKTREE))
+            rc, out = await git_fn(
+                "commit", "-m", f"reciprocal edges: {reciprocals_added} edges from {len(new_claims)} new claims",
+                cwd=str(config.MAIN_WORKTREE),
+            )
+            if rc == 0:
+                # Push immediately — batch-extract-50.sh does reset --hard origin/main
+                # every 15 min, which destroys unpushed local commits
+                push_rc, push_out = await git_fn(
+                    "push", "origin", "main",
+                    cwd=str(config.MAIN_WORKTREE),
+                    timeout=30,
+                )
+                if push_rc == 0:
+                    logger.info("reciprocal_edges: %d edges pushed to main (%d new claims)", reciprocals_added, len(new_claims))
+                else:
+                    logger.warning("reciprocal_edges: push failed (commit is local only): %s", push_out[:200])
+            else:
+                logger.warning("reciprocal_edges: commit failed: %s", out[:200])
+
+    except Exception:
+        logger.exception("reciprocal_edges: failed (non-fatal)")
+
+
+def archive_source_for_pr(branch: str, domain: str, merged: bool = True):
+    """Move source from queue/ to archive/{domain}/ after PR merge or close.
+
+    Only handles extract/ branches (Ganymede: skip research sessions).
+    Updates frontmatter: 'processed' for merged, 'rejected' for closed.
+    Accumulates moves for batch commit at end of merge cycle.
+    """
+    if not branch.startswith("extract/"):
+        return
+
+    source_slug = branch.replace("extract/", "", 1)
+    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
+    queue_path = os.path.join(main_dir, "inbox", "queue", f"{source_slug}.md")
+    archive_dir = os.path.join(main_dir, "inbox", "archive", domain or "unknown")
+    archive_path = os.path.join(archive_dir, f"{source_slug}.md")
+
+    # Already in archive? Delete queue duplicate
+    if os.path.exists(archive_path):
+        if os.path.exists(queue_path):
+            try:
+                os.remove(queue_path)
+                _pending_source_moves.append((queue_path, "deleted"))
+                logger.info("Source dedup: deleted queue/%s (already in archive/%s)", source_slug, domain)
+            except Exception as e:
+                logger.warning("Source dedup failed: %s", e)
+        return
+
+    # Move from queue to archive
+    if os.path.exists(queue_path):
+        # Update frontmatter before moving (Ganymede: distinguish merged vs rejected)
+        update_source_frontmatter_status(queue_path, "processed" if merged else "rejected")
+        os.makedirs(archive_dir, exist_ok=True)
+        try:
+            shutil.move(queue_path, archive_path)
+            _pending_source_moves.append((queue_path, archive_path))
+            logger.info("Source archived: queue/%s → archive/%s/ (status=%s)",
+                        source_slug, domain, "processed" if merged else "rejected")
+        except Exception as e:
+            logger.warning("Source archive failed: %s", e)
+
+
+async def commit_source_moves(git_fn: Callable):
+    """Batch commit accumulated source moves. Called at end of merge cycle.
+
+    Rhea review: fetch+reset before touching files, use main_worktree_lock,
+    crash gap is self-healing (reset --hard reverts uncommitted moves).
+    """
+    if not _pending_source_moves:
+        return
+
+    main_dir = config.MAIN_WORKTREE if hasattr(config, "MAIN_WORKTREE") else "/opt/teleo-eval/workspaces/main"
+    count = len(_pending_source_moves)
+    _pending_source_moves.clear()
+
+    # Acquire file lock — coordinates with telegram bot and other daemon stages (Ganymede: Option C)
+    try:
+        async with async_main_worktree_lock(timeout=10):
+            # Sync worktree with remote (Rhea: fetch+reset, not pull)
+            await git_fn("fetch", "origin", "main", cwd=main_dir, timeout=30)
+            await git_fn("reset", "--hard", "origin/main", cwd=main_dir, timeout=30)
+
+            await git_fn("add", "-A", "inbox/", cwd=main_dir)
+
+            rc, out = await git_fn(
+                "commit", "-m",
+                f"pipeline: archive {count} source(s) post-merge\n\n"
+                f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>",
+                cwd=main_dir,
+            )
+            if rc != 0:
+                if "nothing to commit" in out:
+                    return
+                logger.warning("Source archive commit failed: %s", out)
+                return
+
+            for attempt in range(3):
+                await git_fn("pull", "--rebase", "origin", "main", cwd=main_dir, timeout=30)
+                rc_push, _ = await git_fn("push", "origin", "main", cwd=main_dir, timeout=30)
+                if rc_push == 0:
+                    logger.info("Committed + pushed %d source archive moves", count)
+                    return
+                await asyncio.sleep(2)
+
+            logger.warning("Failed to push source archive moves after 3 attempts")
+            await git_fn("reset", "--hard", "origin/main", cwd=main_dir)
+    except TimeoutError:
+        logger.warning("Source archive commit skipped: worktree lock timeout")