feat: per-file frontmatter union for reweave PR merge

Reweave PRs modify existing files (appending YAML edges). Cherry-pick
fails ~75% of the time when main moves between PR creation and merge.

_merge_reweave_pr() reads each changed file from both main HEAD and
branch HEAD, unions the edge arrays (order-preserving, main-first),
and writes the result. Eliminates merge conflicts structurally.

Key design decisions (Ganymede + Theseus approved):
- Order-preserving dedup: main's edges first, branch-new appended
- Superset assertion: logs a warning if the branch is missing edges that main has
- Uses main's body text (reweave only touches frontmatter)
- Loud failure on parse errors (no cherry-pick fallback)
- Append-only contract: reweave adds edges, never removes

18 tests covering parse, union, serialize, superset, and full workflow.
This commit is contained in:
m3taversal 2026-04-04 13:43:32 +01:00
parent 2253f48993
commit 6b3a5833df
2 changed files with 448 additions and 4 deletions

View file

@ -390,6 +390,215 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
await _git("branch", "-D", clean_branch)
# Frontmatter keys whose values are treated as edge lists. _merge_reweave_pr
# iterates these fields to compare and union main's and the branch's edges.
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str]:
"""Parse YAML frontmatter from markdown text.
Returns (frontmatter_dict, body_text_including_closing_delimiter).
Returns (None, text) if no valid frontmatter found.
"""
import yaml
if not text.startswith("---"):
return None, text
end = text.find("\n---", 3)
if end == -1:
return None, text
try:
fm = yaml.safe_load(text[3:end])
body = text[end:] # includes closing --- and body
return fm if isinstance(fm, dict) else None, body
except Exception:
return None, text
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
"""Union two edge lists, preserving order from main (append new at end).
Deduplicates by lowercase slug. Main's order is preserved; branch-only
edges are appended in their original order.
"""
seen = set()
result = []
for edge in main_edges:
key = str(edge).strip().lower()
if key not in seen:
seen.add(key)
result.append(edge)
for edge in branch_edges:
key = str(edge).strip().lower()
if key not in seen:
seen.add(key)
result.append(edge)
return result
def _serialize_frontmatter(fm: dict, body: str) -> str:
    """Serialize a frontmatter dict plus body back to markdown text.

    Args:
        fm: Frontmatter mapping; dumped as block-style YAML with key order
            preserved (``sort_keys=False``) and non-ASCII text kept as-is.
        body: Text following the frontmatter. In the form returned by
            ``_parse_yaml_frontmatter`` it already starts with the newline
            that precedes the closing ``---`` delimiter.

    Returns:
        ``---``, the YAML dump, then ``body``. A newline separator is added
        only when ``body`` does not already start with one, so a parsed body
        no longer gains a blank line before the closing delimiter.
    """
    import yaml

    fm_str = yaml.dump(fm, default_flow_style=False, allow_unicode=True, sort_keys=False).rstrip("\n")
    separator = "" if body.startswith("\n") else "\n"
    return f"---\n{fm_str}{separator}{body}"
async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
    """Merge a reweave PR using per-file frontmatter union instead of cherry-pick.

    Reweave branches MODIFY existing files (appending YAML frontmatter edges).
    Cherry-pick fails when main moved since branch creation (~75% failure rate).

    This function:
      1. Gets the list of .md files changed by the reweave branch
      2. For each file, reads frontmatter from BOTH main HEAD and branch HEAD
      3. Unions the edge arrays (order-preserving, main first, branch-new appended)
      4. Logs a warning (it does not fail) when the branch lacks an edge that
         main has -- reweave is meant to be append-only
      5. Writes merged content to a worktree, commits, and force-pushes
         (with lease) the result as the branch

    Approved by Ganymede (manifest approach) and Theseus (superset assertion + order-preserving dedup).

    Args:
        branch: Name of the reweave branch on ``origin``.

    Returns:
        ``(True, summary)`` on success, ``(False, reason)`` on any failure.
        The temporary worktree and ``_clean/...`` branch are removed either way
        once the worktree has been created.
    """
    worktree_path = f"/tmp/teleo-merge-{branch.replace('/', '-')}"
    clean_branch = f"_clean/{branch.replace('/', '-')}"

    # Fetch latest state
    rc, out = await _git("fetch", "origin", "main", timeout=15)
    if rc != 0:
        return False, f"fetch main failed: {out}"
    rc, out = await _git("fetch", "origin", branch, timeout=15)
    if rc != 0:
        return False, f"fetch branch failed: {out}"

    # Get files changed by the reweave branch
    rc, diff_out = await _git(
        "diff", "--name-only", f"origin/main...origin/{branch}", timeout=10,
    )
    if rc != 0:
        # A failing diff is a different problem from an empty one; report it as such.
        return False, f"diff failed on {branch}: {diff_out}"
    if not diff_out.strip():
        return False, f"no changed files found on {branch}"
    changed_files = [
        name
        for name in (line.strip() for line in diff_out.strip().split("\n"))
        if name.endswith(".md")
    ]
    if not changed_files:
        return False, "no .md files changed"

    # Create worktree from origin/main. The branch delete is best-effort
    # cleanup of a leftover from an earlier run; its result is ignored.
    await _git("branch", "-D", clean_branch)
    rc, out = await _git("worktree", "add", "-b", clean_branch, worktree_path, "origin/main")
    if rc != 0:
        return False, f"worktree add failed: {out}"

    try:
        merged_count = 0
        non_superset_fields = []

        for fpath in changed_files:
            # Read file content from main HEAD and branch HEAD
            rc_main, main_content = await _git("show", f"origin/main:{fpath}", timeout=5)
            rc_branch, branch_content = await _git("show", f"origin/{branch}:{fpath}", timeout=5)

            if rc_branch != 0:
                logger.warning("Reweave merge: cannot read %s from branch %s", fpath, branch)
                continue

            full_path = os.path.join(worktree_path, fpath)

            if rc_main != 0:
                # File could not be read from main -- presumably it only exists
                # on the branch (new file), so just write the branch version.
                os.makedirs(os.path.dirname(full_path), exist_ok=True)
                # Explicit UTF-8: do not depend on the process locale.
                with open(full_path, "w", encoding="utf-8") as f:
                    f.write(branch_content)
                await _git("add", fpath, cwd=worktree_path)
                merged_count += 1
                continue

            # Parse frontmatter from both versions
            main_fm, main_body = _parse_yaml_frontmatter(main_content)
            branch_fm, _ = _parse_yaml_frontmatter(branch_content)

            if main_fm is None or branch_fm is None:
                # Parse failure = something unexpected. Fail the merge, don't fallback
                # to cherry-pick. (Theseus: loud failure, not silent retry)
                return False, f"frontmatter parse failed on {fpath} — manual review needed"

            merged_fm = dict(main_fm)  # Start with main's full frontmatter
            for field in REWEAVE_EDGE_FIELDS:
                main_list = _as_edge_list(main_fm.get(field, []))
                branch_list = _as_edge_list(branch_fm.get(field, []))

                # Superset check: reweave only adds edges. If the branch is
                # missing an edge that main has, the branch was based on stale
                # main -- the union below is still safe (it keeps both), so
                # this only warns.
                main_keys = {str(v).strip().lower() for v in main_list if v}
                branch_keys = {str(v).strip().lower() for v in branch_list if v}
                missing = main_keys - branch_keys
                if missing:
                    logger.warning(
                        "Reweave merge: %s field '%s' — branch missing edges from main: %s",
                        fpath, field, missing,
                    )
                    non_superset_fields.append(f"{fpath}:{field}")

                # Union: main's edges first (order-preserved), branch-new appended.
                # Fields empty on both sides are left as main had them.
                if main_list or branch_list:
                    merged_fm[field] = _union_edge_lists(main_list, branch_list)

            # Write merged file — use main's body (reweave doesn't touch body text)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            # Explicit UTF-8: the YAML is dumped with allow_unicode=True, so
            # non-ASCII text must not go through a locale-dependent encoding.
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(_serialize_frontmatter(merged_fm, main_body))
            await _git("add", fpath, cwd=worktree_path)
            merged_count += 1

        if merged_count == 0:
            return False, "no files merged (all skipped)"

        # Commit the merged changes
        commit_msg = f"reweave: merge {merged_count} files via frontmatter union [auto]"
        rc, out = await _git(
            "commit", "-m", commit_msg, cwd=worktree_path, timeout=30,
        )
        if rc != 0:
            return False, f"commit failed: {out}"

        # Force-push as the branch (for the ff-push step in _merge_domain_queue).
        # The lease pins the remote branch to the SHA we fetched, so the push
        # is rejected if the branch moved in the meantime.
        rc, expected_sha = await _git("rev-parse", f"origin/{branch}")
        if rc != 0:
            return False, f"rev-parse origin/{branch} failed: {expected_sha}"
        expected_sha = expected_sha.strip().split("\n")[0]

        rc, out = await _git(
            "push",
            f"--force-with-lease={branch}:{expected_sha}",
            "origin",
            f"HEAD:{branch}",
            cwd=worktree_path,
            timeout=30,
        )
        if rc != 0:
            return False, f"push rejected: {out}"

        result_msg = f"frontmatter-union merged {merged_count} files"
        if non_superset_fields:
            result_msg += f" (non-superset warnings: {len(non_superset_fields)})"
        return True, result_msg

    finally:
        # Always drop the temporary worktree and branch, on success or failure.
        await _git("worktree", "remove", "--force", worktree_path)
        await _git("branch", "-D", clean_branch)


def _as_edge_list(value) -> list:
    """Normalize an edge field value: lists pass through, a truthy scalar becomes a one-item list, anything else becomes []."""
    if isinstance(value, list):
        return value
    return [value] if value else []
async def _resubmit_approvals(pr_number: int):
"""Re-submit 2 formal Forgejo approvals after force-push invalidated them.
@ -944,11 +1153,19 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
logger.info("Merging PR #%d (%s) in domain %s", pr_num, branch, domain)
try:
# Cherry-pick onto fresh main (replaces rebase-retry — Leo+Cory directive)
# Extraction commits ADD new files, so cherry-pick applies cleanly.
# Rebase failed ~23% of the time due to main moving during replay.
# Route reweave branches to frontmatter-union merge.
# Reweave MODIFIES existing files (appending YAML edges) — cherry-pick
# fails ~75% when main moved. Frontmatter union reads current main HEAD,
# unions edge lists, commits. No conflicts possible.
# (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
if branch.startswith("reweave/"):
merge_fn = _merge_reweave_pr(branch)
else:
# Extraction commits ADD new files — cherry-pick applies cleanly.
merge_fn = _cherry_pick_onto_main(branch)
pick_ok, pick_msg = await asyncio.wait_for(
_cherry_pick_onto_main(branch),
merge_fn,
timeout=MERGE_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:

227
tests/test_reweave_merge.py Normal file
View file

@ -0,0 +1,227 @@
"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, superset assertion.
These test the pure functions used by _merge_reweave_pr in lib/merge.py.
Copied here because lib/merge.py's relative imports make direct import impractical in tests.
If these functions change in merge.py, update them here too.
"""
import pytest
import yaml
# --- Copied from lib/merge.py (pure functions, no dependencies) ---
# Manual copy of the edge-field tuple from lib/merge.py; keep the two in sync.
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str]:
if not text.startswith("---"):
return None, text
end = text.find("\n---", 3)
if end == -1:
return None, text
try:
fm = yaml.safe_load(text[3:end])
body = text[end:]
return fm if isinstance(fm, dict) else None, body
except Exception:
return None, text
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
seen = set()
result = []
for edge in main_edges:
key = str(edge).strip().lower()
if key not in seen:
seen.add(key)
result.append(edge)
for edge in branch_edges:
key = str(edge).strip().lower()
if key not in seen:
seen.add(key)
result.append(edge)
return result
def _serialize_frontmatter(fm: dict, body: str) -> str:
    """Serialize a frontmatter dict plus body back to markdown (test-local copy).

    ``body`` in the form returned by ``_parse_yaml_frontmatter`` already starts
    with the newline that precedes the closing ``---`` delimiter, so a newline
    separator is added only when ``body`` does not start with one. This avoids
    inserting a blank line before the closing delimiter.
    """
    fm_str = yaml.dump(fm, default_flow_style=False, allow_unicode=True, sort_keys=False).rstrip("\n")
    separator = "" if body.startswith("\n") else "\n"
    return f"---\n{fm_str}{separator}{body}"
# --- End copied functions ---
class TestParseYamlFrontmatter:
    """Parsing of the leading ``---``-delimited YAML block."""

    def test_basic(self):
        """A well-formed document yields the mapping and a delimiter-prefixed body."""
        source = "---\ntitle: Test Claim\nsupports:\n - claim-a\n---\nBody text here."
        parsed, remainder = _parse_yaml_frontmatter(source)
        assert parsed is not None
        assert parsed["title"] == "Test Claim"
        assert parsed["supports"] == ["claim-a"]
        assert remainder.startswith("\n---")

    def test_no_frontmatter(self):
        """Text without an opening delimiter comes back untouched."""
        source = "Just plain text"
        parsed, remainder = _parse_yaml_frontmatter(source)
        assert parsed is None
        assert remainder == source

    def test_malformed_yaml(self):
        """YAML that cannot be loaded is reported as missing frontmatter."""
        source = "---\n: invalid: yaml: {{{\n---\nBody"
        parsed, _ = _parse_yaml_frontmatter(source)
        assert parsed is None
class TestUnionEdgeLists:
    """Order-preserving, case-insensitive union of two edge lists."""

    def test_no_overlap(self):
        """Disjoint lists are concatenated, main first."""
        merged = _union_edge_lists(["claim-a", "claim-b"], ["claim-c", "claim-d"])
        assert merged == ["claim-a", "claim-b", "claim-c", "claim-d"]

    def test_overlap_preserves_main_order(self):
        """Shared edges keep main's position; only new branch edges are appended."""
        merged = _union_edge_lists(["claim-b", "claim-a"], ["claim-a", "claim-c"])
        assert merged == ["claim-b", "claim-a", "claim-c"]

    def test_case_insensitive_dedup(self):
        """Edges differing only by case collapse to main's spelling."""
        merged = _union_edge_lists(["Claim A"], ["claim a", "Claim B"])
        assert len(merged) == 2
        first, second = merged
        assert first == "Claim A"
        assert second == "Claim B"

    def test_empty_main(self):
        """With nothing on main, the branch list is returned as-is."""
        assert _union_edge_lists([], ["claim-a", "claim-b"]) == ["claim-a", "claim-b"]

    def test_empty_branch(self):
        """With nothing on the branch, main's list is returned as-is."""
        assert _union_edge_lists(["claim-a"], []) == ["claim-a"]

    def test_both_empty(self):
        """Two empty inputs give an empty result."""
        merged = _union_edge_lists([], [])
        assert merged == []

    def test_duplicates_within_branch(self):
        """Repeated edges inside the branch list are emitted once."""
        merged = _union_edge_lists(["claim-a"], ["claim-b", "claim-b"])
        assert merged == ["claim-a", "claim-b"]
class TestSerializeFrontmatter:
    """Serialization of a frontmatter dict back into markdown text."""

    def test_roundtrip(self):
        """Serialized text parses back to the same title and edge list."""
        original = {"title": "Test", "supports": ["claim-a", "claim-b"]}
        rendered = _serialize_frontmatter(original, "\n---\nBody text here.")

        assert rendered.startswith("---\n")
        for fragment in ("title: Test", "Body text here."):
            assert fragment in rendered

        reparsed, _ = _parse_yaml_frontmatter(rendered)
        assert reparsed["title"] == "Test"
        assert reparsed["supports"] == ["claim-a", "claim-b"]
class TestSupersetDetection:
    """Set-difference checks behind the superset warning.

    These exercise plain set arithmetic on literal sets; they do not call
    into the merge code itself.
    """

    def test_branch_is_superset(self):
        """No main edge is missing when the branch contains all of them."""
        on_main = {"claim-a", "claim-b"}
        on_branch = {"claim-a", "claim-b", "claim-c"}
        absent = on_main.difference(on_branch)
        assert len(absent) == 0

    def test_branch_missing_edge(self):
        """An edge present only on main shows up in the difference."""
        on_main = {"claim-a", "claim-b"}
        on_branch = {"claim-a", "claim-c"}
        absent = on_main.difference(on_branch)
        assert "claim-b" in absent

    def test_equal_sets(self):
        """Identical edge sets leave nothing missing."""
        on_main = {"claim-a", "claim-b"}
        on_branch = {"claim-a", "claim-b"}
        absent = on_main.difference(on_branch)
        assert len(absent) == 0
class TestEdgeFieldsCoverage:
    """Guards the set of frontmatter keys treated as edge fields."""

    def test_standard_fields_present(self):
        """Each standard edge field name is listed in REWEAVE_EDGE_FIELDS."""
        expected_fields = ("supports", "challenges", "related", "reweave_edges", "depends_on")
        for field_name in expected_fields:
            assert field_name in REWEAVE_EDGE_FIELDS
class TestFullUnionWorkflow:
    """Per-field union of main and branch frontmatter, as done during a reweave merge."""

    @staticmethod
    def _union_frontmatter(main_fm: dict, branch_fm: dict) -> dict:
        """Start from main's frontmatter and union every edge field with the branch's."""
        merged = dict(main_fm)
        for field in REWEAVE_EDGE_FIELDS:
            main_list = main_fm.get(field, [])
            branch_list = branch_fm.get(field, [])
            # Scalars become one-item lists; falsy scalars become empty lists.
            if not isinstance(main_list, list):
                main_list = [main_list] if main_list else []
            if not isinstance(branch_list, list):
                branch_list = [branch_list] if branch_list else []
            if main_list or branch_list:
                merged[field] = _union_edge_lists(main_list, branch_list)
        return merged

    def test_main_evolved_branch_stale(self):
        """Main got new edges after branch was created. Union includes both."""
        main_text = (
            "---\ntitle: Test Claim\nconfidence: 0.8\n"
            "supports:\n - claim-a\n - claim-b\n"
            "related:\n - claim-x\n"
            "---\nBody text."
        )
        branch_text = (
            "---\ntitle: Test Claim\nconfidence: 0.8\n"
            "supports:\n - claim-a\n"
            "related:\n - claim-x\n - claim-y\n"
            "reweave_edges:\n - \"claim-y|related|2026-04-04\"\n"
            "---\nBody text."
        )
        main_fm, _ = _parse_yaml_frontmatter(main_text)
        branch_fm, _ = _parse_yaml_frontmatter(branch_text)

        merged_fm = self._union_frontmatter(main_fm, branch_fm)

        assert merged_fm["supports"] == ["claim-a", "claim-b"]
        for expected_edge in ("claim-x", "claim-y"):
            assert expected_edge in merged_fm["related"]
        assert len(merged_fm.get("reweave_edges", [])) == 1
        assert merged_fm["confidence"] == 0.8

    def test_no_edge_fields_untouched(self):
        """Non-edge fields (title, confidence, type) come from main unchanged."""
        main_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\n---\nBody."
        branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n - new-claim\n---\nBody."
        main_fm, _ = _parse_yaml_frontmatter(main_text)
        branch_fm, _ = _parse_yaml_frontmatter(branch_text)

        merged_fm = self._union_frontmatter(main_fm, branch_fm)

        assert merged_fm["title"] == "Original"
        assert merged_fm["confidence"] == 0.9
        assert merged_fm["type"] == "claim"
        assert merged_fm["related"] == ["new-claim"]

    def test_scalar_edge_field_converted_to_list(self):
        """Edge fields stored as scalars (not lists) are handled gracefully."""
        main_fm = {"supports": "single-claim"}
        branch_fm = {"supports": ["single-claim", "new-claim"]}

        merged_fm = self._union_frontmatter(main_fm, branch_fm)

        assert merged_fm["supports"] == ["single-claim", "new-claim"]