From b09164214690415a2a973dbf3e448e652a3c100f Mon Sep 17 00:00:00 2001 From: m3taversal Date: Sat, 4 Apr 2026 13:48:44 +0100 Subject: [PATCH] =?UTF-8?q?fix:=20string-level=20edge=20splicing=20in=20re?= =?UTF-8?q?weave=20merge=20=E2=80=94=20no=20yaml.dump=20reformatting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes from Ganymede review: 1. CRITICAL: blank line before closing --- compounded on repeat reweaves. Body starts with \n---, so \n{body} created \n\n---. Fixed by checking body prefix. 2. Replaced yaml.dump round-trip with _serialize_edge_fields() that splices only edge arrays into raw frontmatter text. Non-edge fields (title, confidence, type, quotes, flow styles) stay byte-identical to main HEAD. _parse_yaml_frontmatter now returns 3-tuple: (dict, raw_fm_text, body). _serialize_frontmatter takes (raw_fm_text, merged_edges_dict, body). 26 tests pass including idempotency (5x serialize), formatting preservation, and no-blank-line regression test. Co-Authored-By: Claude Opus 4.6 (1M context) --- lib/merge.py | 105 ++++++++++++---- tests/test_reweave_merge.py | 231 ++++++++++++++++++++++++++++-------- 2 files changed, 267 insertions(+), 69 deletions(-) diff --git a/lib/merge.py b/lib/merge.py index cea914a..af85d85 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -393,25 +393,27 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]: REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges") -def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str]: +def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]: """Parse YAML frontmatter from markdown text. - Returns (frontmatter_dict, body_text_including_closing_delimiter). - Returns (None, text) if no valid frontmatter found. + Returns (frontmatter_dict, raw_fm_text, body_text_including_closing_delimiter). + Returns (None, "", text) if no valid frontmatter found. + raw_fm_text is the text between the --- delimiters (no delimiters, no leading newline). """ import yaml if not text.startswith("---"): - return None, text + return None, "", text end = text.find("\n---", 3) if end == -1: - return None, text + return None, "", text try: - fm = yaml.safe_load(text[3:end]) - body = text[end:] # includes closing --- and body - return fm if isinstance(fm, dict) else None, body + raw_fm_text = text[4:end] # skip "---\n", stop before "\n---" + fm = yaml.safe_load(raw_fm_text) + body = text[end:] # includes closing \n--- and body + return (fm if isinstance(fm, dict) else None), raw_fm_text, body except Exception: - return None, text + return None, "", text def _union_edge_lists(main_edges: list, branch_edges: list) -> list: @@ -435,12 +437,74 @@ def _union_edge_lists(main_edges: list, branch_edges: list) -> list: return result -def _serialize_frontmatter(fm: dict, body: str) -> str: - """Serialize frontmatter dict + body back to markdown text.""" +def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str: + """Splice merged edge fields into raw frontmatter text, preserving all other fields byte-identical. + + Only modifies REWEAVE_EDGE_FIELDS lines. All other frontmatter (title, confidence, type, etc.) + stays exactly as it was in the source text — no yaml.dump reformatting. + + Args: + raw_fm_text: The raw YAML text between the --- delimiters (no delimiters included). + merged_edges: {field_name: [edge_values]} for each edge field that should be present. + """ + import re import yaml - fm_str = yaml.dump(fm, default_flow_style=False, allow_unicode=True, sort_keys=False).rstrip("\n") - return f"---\n{fm_str}\n{body}" + lines = raw_fm_text.split("\n") + result_lines = [] + i = 0 + fields_written = set() + + while i < len(lines): + line = lines[i] + # Check if this line starts an edge field + matched_field = None + for field in REWEAVE_EDGE_FIELDS: + if line.startswith(f"{field}:"): + matched_field = field + break + + if matched_field: + fields_written.add(matched_field) + # Skip the old field and its list items + i += 1 + while i < len(lines) and lines[i].startswith("- "): + i += 1 + # Write the merged version + edges = merged_edges.get(matched_field, []) + if edges: + result_lines.append(f"{matched_field}:") + for edge in edges: + result_lines.append(f"- {edge}") + # Don't increment i — it's already past the old field + continue + else: + result_lines.append(line) + i += 1 + + # Append any new edge fields that didn't exist in the original + for field in REWEAVE_EDGE_FIELDS: + if field not in fields_written: + edges = merged_edges.get(field, []) + if edges: + result_lines.append(f"{field}:") + for edge in edges: + result_lines.append(f"- {edge}") + + return "\n".join(result_lines) + + +def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str: + """Rebuild markdown file: splice merged edges into raw frontmatter, append body. + + Uses string-level surgery — only edge fields are modified. All other frontmatter + stays byte-identical to the source. No yaml.dump reformatting. + """ + spliced = _serialize_edge_fields(raw_fm_text, merged_edges) + # body starts with \n--- (closing delimiter + body text) + if body.startswith("\n"): + return f"---\n{spliced}{body}" + return f"---\n{spliced}\n{body}" async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: @@ -510,8 +574,8 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: continue # Parse frontmatter from both versions - main_fm, main_body = _parse_yaml_frontmatter(main_content) - branch_fm, branch_body = _parse_yaml_frontmatter(branch_content) + main_fm, main_raw_fm, main_body = _parse_yaml_frontmatter(main_content) + branch_fm, _branch_raw_fm, branch_body = _parse_yaml_frontmatter(branch_content) if main_fm is None or branch_fm is None: # Parse failure = something unexpected. Fail the merge, don't fallback @@ -540,8 +604,8 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: ) skipped_non_superset.append(f"{fpath}:{field}") - # Union edge lists: main's edges first (order-preserved), branch-new appended - merged_fm = dict(main_fm) # Start with main's full frontmatter + # Collect merged edge fields for string-level splicing + merged_edges = {} for field in REWEAVE_EDGE_FIELDS: main_list = main_fm.get(field, []) branch_list = branch_fm.get(field, []) @@ -549,15 +613,14 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: main_list = [main_list] if main_list else [] if not isinstance(branch_list, list): branch_list = [branch_list] if branch_list else [] - if main_list or branch_list: - merged_fm[field] = _union_edge_lists(main_list, branch_list) + merged_edges[field] = _union_edge_lists(main_list, branch_list) - # Write merged file — use main's body (reweave doesn't touch body text) + # Write merged file — splice edges into main's raw frontmatter, use main's body full_path = os.path.join(worktree_path, fpath) os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, "w") as f: - f.write(_serialize_frontmatter(merged_fm, main_body)) + f.write(_serialize_frontmatter(main_raw_fm, merged_edges, main_body)) await _git("add", fpath, cwd=worktree_path) merged_count += 1 diff --git a/tests/test_reweave_merge.py b/tests/test_reweave_merge.py index 6ac916b..f19c398 100644 --- a/tests/test_reweave_merge.py +++ b/tests/test_reweave_merge.py @@ -1,4 +1,4 @@ -"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, superset assertion. +"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, string-level splicing. These test the pure functions used by _merge_reweave_pr in lib/merge.py. Copied here because lib/merge.py's relative imports make direct import impractical in tests. @@ -13,18 +13,19 @@ import yaml REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges") -def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str]: +def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]: if not text.startswith("---"): - return None, text + return None, "", text end = text.find("\n---", 3) if end == -1: - return None, text + return None, "", text try: - fm = yaml.safe_load(text[3:end]) + raw_fm_text = text[4:end] + fm = yaml.safe_load(raw_fm_text) body = text[end:] - return fm if isinstance(fm, dict) else None, body + return (fm if isinstance(fm, dict) else None), raw_fm_text, body except Exception: - return None, text + return None, "", text def _union_edge_lists(main_edges: list, branch_edges: list) -> list: @@ -43,31 +44,75 @@ def _union_edge_lists(main_edges: list, branch_edges: list) -> list: return result -def _serialize_frontmatter(fm: dict, body: str) -> str: - fm_str = yaml.dump(fm, default_flow_style=False, allow_unicode=True, sort_keys=False).rstrip("\n") - return f"---\n{fm_str}\n{body}" +def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str: + lines = raw_fm_text.split("\n") + result_lines = [] + i = 0 + fields_written = set() + + while i < len(lines): + line = lines[i] + matched_field = None + for field in REWEAVE_EDGE_FIELDS: + if line.startswith(f"{field}:"): + matched_field = field + break + + if matched_field: + fields_written.add(matched_field) + i += 1 + while i < len(lines) and lines[i].startswith("- "): + i += 1 + edges = merged_edges.get(matched_field, []) + if edges: + result_lines.append(f"{matched_field}:") + for edge in edges: + result_lines.append(f"- {edge}") + continue + else: + result_lines.append(line) + i += 1 + + for field in REWEAVE_EDGE_FIELDS: + if field not in fields_written: + edges = merged_edges.get(field, []) + if edges: + result_lines.append(f"{field}:") + for edge in edges: + result_lines.append(f"- {edge}") + + return "\n".join(result_lines) + + +def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str: + spliced = _serialize_edge_fields(raw_fm_text, merged_edges) + if body.startswith("\n"): + return f"---\n{spliced}{body}" + return f"---\n{spliced}\n{body}" # --- End copied functions --- class TestParseYamlFrontmatter: def test_basic(self): - text = "---\ntitle: Test Claim\nsupports:\n - claim-a\n---\nBody text here." - fm, body = _parse_yaml_frontmatter(text) + text = "---\ntitle: Test Claim\nsupports:\n- claim-a\n---\nBody text here." + fm, raw, body = _parse_yaml_frontmatter(text) assert fm is not None assert fm["title"] == "Test Claim" assert fm["supports"] == ["claim-a"] assert body.startswith("\n---") + assert "title: Test Claim" in raw def test_no_frontmatter(self): text = "Just plain text" - fm, body = _parse_yaml_frontmatter(text) + fm, raw, body = _parse_yaml_frontmatter(text) assert fm is None + assert raw == "" assert body == text def test_malformed_yaml(self): text = "---\n: invalid: yaml: {{{\n---\nBody" - fm, body = _parse_yaml_frontmatter(text) + fm, raw, body = _parse_yaml_frontmatter(text) assert fm is None @@ -110,18 +155,90 @@ class TestUnionEdgeLists: assert result == ["claim-a", "claim-b"] -class TestSerializeFrontmatter: - def test_roundtrip(self): - fm = {"title": "Test", "supports": ["claim-a", "claim-b"]} - body = "\n---\nBody text here." - text = _serialize_frontmatter(fm, body) - assert text.startswith("---\n") - assert "title: Test" in text - assert "Body text here." in text +class TestSerializeEdgeFields: + def test_replaces_existing_field(self): + raw = "title: Test\nsupports:\n- old-claim" + merged = {"supports": ["old-claim", "new-claim"]} + result = _serialize_edge_fields(raw, merged) + assert "- old-claim" in result + assert "- new-claim" in result + assert "title: Test" in result - fm2, body2 = _parse_yaml_frontmatter(text) - assert fm2["title"] == "Test" - assert fm2["supports"] == ["claim-a", "claim-b"] + def test_preserves_non_edge_fields_exactly(self): + raw = "title: 'Quoted Title'\nconfidence: 0.85\ntype: claim" + merged = {"related": ["new-claim"]} + result = _serialize_edge_fields(raw, merged) + assert "title: 'Quoted Title'" in result + assert "confidence: 0.85" in result + assert "type: claim" in result + assert "related:" in result + assert "- new-claim" in result + + def test_appends_new_field(self): + raw = "title: Test\ntype: claim" + merged = {"supports": ["claim-a"]} + result = _serialize_edge_fields(raw, merged) + assert "title: Test" in result + assert "supports:" in result + assert "- claim-a" in result + + def test_empty_edges_removes_field(self): + raw = "title: Test\nsupports:\n- old-claim\ntype: claim" + merged = {} # no edges to write + result = _serialize_edge_fields(raw, merged) + assert "supports:" not in result + assert "title: Test" in result + assert "type: claim" in result + + def test_multiple_edge_fields(self): + raw = "title: Test\nsupports:\n- a\nchallenges:\n- b" + merged = {"supports": ["a", "c"], "challenges": ["b", "d"]} + result = _serialize_edge_fields(raw, merged) + lines = result.split("\n") + # supports and challenges both present with merged values + assert "- a" in result + assert "- c" in result + assert "- b" in result + assert "- d" in result + + +class TestSerializeFrontmatter: + def test_roundtrip_preserves_formatting(self): + original = "---\ntitle: 'Quoted Title'\nconfidence: 0.85\nsupports:\n- claim-a\n---\nBody text here." + fm, raw, body = _parse_yaml_frontmatter(original) + merged_edges = {"supports": ["claim-a", "claim-b"]} + result = _serialize_frontmatter(raw, merged_edges, body) + + # Non-edge fields preserved exactly + assert "title: 'Quoted Title'" in result + assert "confidence: 0.85" in result + # Edge fields updated + assert "- claim-a" in result + assert "- claim-b" in result + # Structure preserved + assert result.startswith("---\n") + assert "\n---\n" in result + assert result.endswith("Body text here.") + + def test_no_blank_line_before_closing_delimiter(self): + """Ganymede critical: no extra blank line compounds on repeat reweaves.""" + original = "---\ntitle: Test\nsupports:\n- a\n---\nBody." + fm, raw, body = _parse_yaml_frontmatter(original) + merged_edges = {"supports": ["a", "b"]} + result = _serialize_frontmatter(raw, merged_edges, body) + # Should NOT have \n\n--- (double newline before closing) + assert "\n\n---" not in result + + def test_repeated_serialize_no_drift(self): + """Repeated serialization should be idempotent — no accumulating blank lines.""" + text = "---\ntitle: Test\nsupports:\n- a\n---\nBody." + merged_edges = {"supports": ["a", "b"]} + + for _ in range(5): + fm, raw, body = _parse_yaml_frontmatter(text) + text = _serialize_frontmatter(raw, merged_edges, body) + + assert text.count("\n\n") == 0 # no double newlines anywhere class TestSupersetDetection: @@ -155,22 +272,22 @@ class TestFullUnionWorkflow: """Main got new edges after branch was created. Union includes both.""" main_text = ( "---\ntitle: Test Claim\nconfidence: 0.8\n" - "supports:\n - claim-a\n - claim-b\n" - "related:\n - claim-x\n" + "supports:\n- claim-a\n- claim-b\n" + "related:\n- claim-x\n" "---\nBody text." ) branch_text = ( "---\ntitle: Test Claim\nconfidence: 0.8\n" - "supports:\n - claim-a\n" - "related:\n - claim-x\n - claim-y\n" - "reweave_edges:\n - \"claim-y|related|2026-04-04\"\n" + "supports:\n- claim-a\n" + "related:\n- claim-x\n- claim-y\n" + "reweave_edges:\n- \"claim-y|related|2026-04-04\"\n" "---\nBody text." ) - main_fm, main_body = _parse_yaml_frontmatter(main_text) - branch_fm, _ = _parse_yaml_frontmatter(branch_text) + main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text) + branch_fm, _, _ = _parse_yaml_frontmatter(branch_text) - merged_fm = dict(main_fm) + merged_edges = {} for field in REWEAVE_EDGE_FIELDS: main_list = main_fm.get(field, []) branch_list = branch_fm.get(field, []) @@ -179,23 +296,27 @@ class TestFullUnionWorkflow: if not isinstance(branch_list, list): branch_list = [branch_list] if branch_list else [] if main_list or branch_list: - merged_fm[field] = _union_edge_lists(main_list, branch_list) + merged_edges[field] = _union_edge_lists(main_list, branch_list) - assert merged_fm["supports"] == ["claim-a", "claim-b"] - assert "claim-x" in merged_fm["related"] - assert "claim-y" in merged_fm["related"] - assert len(merged_fm.get("reweave_edges", [])) == 1 - assert merged_fm["confidence"] == 0.8 + assert merged_edges["supports"] == ["claim-a", "claim-b"] + assert "claim-x" in merged_edges["related"] + assert "claim-y" in merged_edges["related"] + assert len(merged_edges.get("reweave_edges", [])) == 1 + + # Verify non-edge fields preserved in serialization + result = _serialize_frontmatter(main_raw, merged_edges, main_body) + assert "confidence: 0.8" in result + assert "title: Test Claim" in result def test_no_edge_fields_untouched(self): """Non-edge fields (title, confidence, type) come from main unchanged.""" main_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\n---\nBody." - branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n - new-claim\n---\nBody." + branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n- new-claim\n---\nBody." - main_fm, main_body = _parse_yaml_frontmatter(main_text) - branch_fm, _ = _parse_yaml_frontmatter(branch_text) + main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text) + branch_fm, _, _ = _parse_yaml_frontmatter(branch_text) - merged_fm = dict(main_fm) + merged_edges = {} for field in REWEAVE_EDGE_FIELDS: main_list = main_fm.get(field, []) branch_list = branch_fm.get(field, []) @@ -204,12 +325,13 @@ class TestFullUnionWorkflow: if not isinstance(branch_list, list): branch_list = [branch_list] if branch_list else [] if main_list or branch_list: - merged_fm[field] = _union_edge_lists(main_list, branch_list) + merged_edges[field] = _union_edge_lists(main_list, branch_list) - assert merged_fm["title"] == "Original" - assert merged_fm["confidence"] == 0.9 - assert merged_fm["type"] == "claim" - assert merged_fm["related"] == ["new-claim"] + result = _serialize_frontmatter(main_raw, merged_edges, main_body) + assert "title: Original" in result + assert "confidence: 0.9" in result + assert "type: claim" in result + assert "- new-claim" in result def test_scalar_edge_field_converted_to_list(self): """Edge fields stored as scalars (not lists) are handled gracefully.""" @@ -225,3 +347,16 @@ class TestFullUnionWorkflow: result = _union_edge_lists(main_list, branch_list) assert result == ["single-claim", "new-claim"] + + def test_yaml_formatting_preserved_across_reweave(self): + """The key test: non-edge YAML formatting stays byte-identical.""" + # Use unusual but valid YAML formatting + main_text = "---\ntitle: 'A \"Quoted\" Title'\nconfidence: 0.85\nsome_custom_field: [1, 2, 3]\nsupports:\n- old-claim\n---\nBody." + fm, raw, body = _parse_yaml_frontmatter(main_text) + merged_edges = {"supports": ["old-claim", "new-claim"]} + result = _serialize_frontmatter(raw, merged_edges, body) + + # These non-edge fields must be byte-identical to source + assert "title: 'A \"Quoted\" Title'" in result + assert "confidence: 0.85" in result + assert "some_custom_field: [1, 2, 3]" in result