diff --git a/lib/frontmatter.py b/lib/frontmatter.py
new file mode 100644
index 0000000..2286f2b
--- /dev/null
+++ b/lib/frontmatter.py
@@ -0,0 +1,178 @@
+"""Pure YAML frontmatter parsing and serialization for claim/entity files.
+
+Shared by merge (reweave merge, reciprocal edges) and reweave scripts.
+All functions are pure — zero I/O, zero async, zero DB.
+
+Extracted from merge.py Phase 6 of decomposition (Ganymede-approved plan).
+"""
+
+import json
+
+import yaml
+
+
+# Edge field names recognized in claim frontmatter.
+# Order matters: serialize_edge_fields writes them in this order when appending new fields.
+REWEAVE_EDGE_FIELDS = ("supports", "challenges", "challenged_by", "depends_on", "related", "reweave_edges")
+
+# Reciprocal edge mapping: when A has edge_type → B, B gets reciprocal → A.
+# When A supports B, B also supports A (approximately symmetric).
+# When A challenges B, B is challenged_by A (NOT symmetric — direction matters).
+RECIPROCAL_EDGE_MAP = {
+    "supports": "supports",
+    "challenges": "challenged_by",
+    "related": "related",
+    "depends_on": "related",  # A depends_on B → B is related to A (not symmetric)
+}
+
+
+def parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]:
+    """Parse YAML frontmatter from markdown text.
+
+    Returns (frontmatter_dict, raw_fm_text, body_text_including_closing_delimiter).
+    Returns (None, "", text) if no valid frontmatter found: a missing opening or
+    closing delimiter line, unparseable YAML, or YAML that is not a mapping.
+    raw_fm_text is the text between the --- delimiters (no delimiters, no leading newline).
+    """
+    no_frontmatter = (None, "", text)
+
+    # The opening delimiter must be a whole line, so "---title: x" is not frontmatter
+    # and "---\r\n" does not leak its line ending into raw_fm_text.
+    first_newline = text.find("\n")
+    if first_newline == -1 or text[:first_newline].rstrip() != "---":
+        return no_frontmatter
+
+    # The closing delimiter must be a whole "---" line too; a YAML line that merely
+    # starts with "---" must not cut the frontmatter short. Searching from the first
+    # newline keeps empty frontmatter ("---\n---") recognisable.
+    search_from = first_newline
+    while True:
+        end = text.find("\n---", search_from)
+        if end == -1:
+            return no_frontmatter
+        line_end = text.find("\n", end + 1)
+        closing_line = text[end + 1:] if line_end == -1 else text[end + 1:line_end]
+        if closing_line.rstrip() == "---":
+            break
+        search_from = end + 1
+
+    raw_fm_text = text[first_newline + 1:end]  # between the two delimiter lines
+    body = text[end:]  # includes closing \n--- and body
+    try:
+        fm = yaml.safe_load(raw_fm_text)
+    except Exception:
+        # Deliberately broad: besides yaml.YAMLError, safe_load can raise other
+        # errors, e.g. ValueError for an impossible date such as 2021-02-30.
+        return no_frontmatter
+    if not isinstance(fm, dict):
+        return no_frontmatter
+    return fm, raw_fm_text, body
+
+
+def union_edge_lists(main_edges: list, branch_edges: list) -> list:
+    """Union two edge lists, preserving order from main (append new at end).
+
+    Deduplicates by lowercase slug. Main's order is preserved; branch-only
+    edges are appended in their original order.
+    """
+    seen = set()
+    result = []
+    # Main goes first so its spelling and position win over a branch duplicate.
+    for edges in (main_edges, branch_edges):
+        for edge in edges:
+            key = str(edge).strip().lower()
+            if key not in seen:
+                seen.add(key)
+                result.append(edge)
+    return result
+
+
+def _format_edge_item(edge) -> str:
+    """Render one edge as a YAML list item that parses back to the same value.
+
+    Plain text is kept whenever it round-trips, so existing files keep their exact
+    formatting. Values YAML would read differently (": " inside, a leading "[",
+    "true", surrounding spaces, ...) are written double-quoted instead; JSON string
+    syntax is used for that because it is also valid YAML.
+    """
+    plain = f"- {edge}"
+    try:
+        round_trips = yaml.safe_load(plain) == [edge]
+    except Exception:  # not parseable as written, so it has to be quoted
+        round_trips = False
+    if round_trips:
+        return plain
+    return f"- {json.dumps(str(edge), ensure_ascii=False)}"
+
+
+def _render_edge_field(field: str, edges: list) -> list[str]:
+    """Return the frontmatter lines for one edge field, or [] when it has no edges."""
+    if not edges:
+        return []
+    return [f"{field}:", *(_format_edge_item(edge) for edge in edges)]
+
+
+def serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str:
+    """Splice merged edge fields into raw frontmatter text, preserving all other fields byte-identical.
+
+    Only modifies REWEAVE_EDGE_FIELDS lines. All other frontmatter (title, confidence, type, etc.)
+    stays exactly as it was in the source text — no yaml.dump reformatting.
+
+    An edge field missing from merged_edges (or mapped to an empty list) is removed.
+    Fields that were not in the source are appended in REWEAVE_EDGE_FIELDS order.
+    Edge values are quoted only when YAML would otherwise misread them.
+
+    Args:
+        raw_fm_text: The raw YAML text between the --- delimiters (no delimiters included).
+        merged_edges: {field_name: [edge_values]} for each edge field that should be present.
+    """
+    edge_keys = tuple(f"{field}:" for field in REWEAVE_EDGE_FIELDS)
+    lines = raw_fm_text.split("\n")
+    result_lines = []
+    fields_written = set()
+    i = 0
+
+    while i < len(lines):
+        line = lines[i]
+        # Edge fields are top-level keys, so they start at column 0
+        if not line.startswith(edge_keys):
+            result_lines.append(line)
+            i += 1
+            continue
+
+        matched_field = line.split(":", 1)[0]
+        # Skip the old field and its list items (may be indented with spaces)
+        i += 1
+        while i < len(lines) and lines[i][:1] in (" ", "-"):
+            i += 1
+        # Write the merged version once, even if the source repeated the key
+        if matched_field not in fields_written:
+            fields_written.add(matched_field)
+            edges = merged_edges.get(matched_field, [])
+            result_lines.extend(_render_edge_field(matched_field, edges))
+
+    # Append any new edge fields that didn't exist in the original
+    for field in REWEAVE_EDGE_FIELDS:
+        if field not in fields_written:
+            result_lines.extend(_render_edge_field(field, merged_edges.get(field, [])))
+
+    return "\n".join(result_lines)
+
+
+def serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str:
+    """Rebuild markdown file: splice merged edges into raw frontmatter, append body.
+
+    Uses string-level surgery — only edge fields are modified. All other frontmatter
+    stays byte-identical to the source. No yaml.dump reformatting.
+
+    Args:
+        raw_fm_text: Raw YAML between the delimiters, as returned by parse_yaml_frontmatter.
+        merged_edges: {field_name: [edge_values]}, passed on to serialize_edge_fields.
+        body: Text from the closing delimiter onwards, as returned by parse_yaml_frontmatter.
+    """
+    spliced = serialize_edge_fields(raw_fm_text, merged_edges)
+    # body normally starts with \n--- (closing delimiter + body text); add the
+    # separating newline only when it is missing so repeated reweaves never
+    # accumulate blank lines before the closing delimiter.
+    separator = "" if body.startswith("\n") else "\n"
+    return f"---\n{spliced}{separator}{body}"
diff --git a/lib/merge.py b/lib/merge.py index bbfbdcc..cc79c6c 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -411,130 +411,14 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]: await _git("branch", "-D", clean_branch) -REWEAVE_EDGE_FIELDS = ("supports", "challenges", "challenged_by", "depends_on", "related", "reweave_edges") - -# When A supports B, B also supports A (approximately symmetric). -# When A challenges B, B is challenged_by A (NOT symmetric — direction matters). -RECIPROCAL_EDGE_MAP = { - "supports": "supports", - "challenges": "challenged_by", - "related": "related", - "depends_on": "related", # A depends_on B → B is related to A (not symmetric) -} - - -def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]: - """Parse YAML frontmatter from markdown text. - - Returns (frontmatter_dict, raw_fm_text, body_text_including_closing_delimiter). - Returns (None, "", text) if no valid frontmatter found. - raw_fm_text is the text between the --- delimiters (no delimiters, no leading newline). - """ - import yaml - - if not text.startswith("---"): - return None, "", text - end = text.find("\n---", 3) - if end == -1: - return None, "", text - try: - raw_fm_text = text[4:end] # skip "---\n", stop before "\n---" - fm = yaml.safe_load(raw_fm_text) - body = text[end:] # includes closing \n--- and body - return (fm if isinstance(fm, dict) else None), raw_fm_text, body - except Exception: - return None, "", text - - -def _union_edge_lists(main_edges: list, branch_edges: list) -> list: - """Union two edge lists, preserving order from main (append new at end). - - Deduplicates by lowercase slug. Main's order is preserved; branch-only - edges are appended in their original order. 
- """ - seen = set() - result = [] - for edge in main_edges: - key = str(edge).strip().lower() - if key not in seen: - seen.add(key) - result.append(edge) - for edge in branch_edges: - key = str(edge).strip().lower() - if key not in seen: - seen.add(key) - result.append(edge) - return result - - -def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str: - """Splice merged edge fields into raw frontmatter text, preserving all other fields byte-identical. - - Only modifies REWEAVE_EDGE_FIELDS lines. All other frontmatter (title, confidence, type, etc.) - stays exactly as it was in the source text — no yaml.dump reformatting. - - Args: - raw_fm_text: The raw YAML text between the --- delimiters (no delimiters included). - merged_edges: {field_name: [edge_values]} for each edge field that should be present. - """ - import re - import yaml - - lines = raw_fm_text.split("\n") - result_lines = [] - i = 0 - fields_written = set() - - while i < len(lines): - line = lines[i] - # Check if this line starts an edge field - matched_field = None - for field in REWEAVE_EDGE_FIELDS: - if line.startswith(f"{field}:"): - matched_field = field - break - - if matched_field: - fields_written.add(matched_field) - # Skip the old field and its list items (may be indented with spaces) - i += 1 - while i < len(lines) and lines[i] and (lines[i][0] in (' ', '-')): - i += 1 - # Write the merged version - edges = merged_edges.get(matched_field, []) - if edges: - result_lines.append(f"{matched_field}:") - for edge in edges: - result_lines.append(f"- {edge}") - # Don't increment i — it's already past the old field - continue - else: - result_lines.append(line) - i += 1 - - # Append any new edge fields that didn't exist in the original - for field in REWEAVE_EDGE_FIELDS: - if field not in fields_written: - edges = merged_edges.get(field, []) - if edges: - result_lines.append(f"{field}:") - for edge in edges: - result_lines.append(f"- {edge}") - - return 
"\n".join(result_lines) - - -def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str: - """Rebuild markdown file: splice merged edges into raw frontmatter, append body. - - Uses string-level surgery — only edge fields are modified. All other frontmatter - stays byte-identical to the source. No yaml.dump reformatting. - """ - spliced = _serialize_edge_fields(raw_fm_text, merged_edges) - # body starts with \n--- (closing delimiter + body text) - if body.startswith("\n"): - return f"---\n{spliced}{body}" - return f"---\n{spliced}\n{body}" +from .frontmatter import ( + REWEAVE_EDGE_FIELDS, + RECIPROCAL_EDGE_MAP, + parse_yaml_frontmatter, + union_edge_lists, + serialize_edge_fields, + serialize_frontmatter, +) async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: @@ -605,8 +489,8 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: continue # Parse frontmatter from both versions - main_fm, main_raw_fm, main_body = _parse_yaml_frontmatter(main_content) - branch_fm, _branch_raw_fm, branch_body = _parse_yaml_frontmatter(branch_content) + main_fm, main_raw_fm, main_body = parse_yaml_frontmatter(main_content) + branch_fm, _branch_raw_fm, branch_body = parse_yaml_frontmatter(branch_content) if main_fm is None or branch_fm is None: # Parse failure = something unexpected. 
Fail the merge, don't fallback @@ -638,13 +522,13 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]: # Collect merged edges for string-level splicing if main_list or branch_list: - merged_edges[field] = _union_edge_lists(main_list, branch_list) + merged_edges[field] = union_edge_lists(main_list, branch_list) # Write merged file — splice edges into main's raw frontmatter, use main's body full_path = os.path.join(worktree_path, fpath) os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, "w") as f: - f.write(_serialize_frontmatter(main_raw_fm, merged_edges, main_body)) + f.write(serialize_frontmatter(main_raw_fm, merged_edges, main_body)) await _git("add", fpath, cwd=worktree_path) merged_count += 1 @@ -950,7 +834,7 @@ async def _reciprocal_edges(main_sha: str, branch_sha: str): except Exception: continue - fm, raw_fm, body = _parse_yaml_frontmatter(content) + fm, raw_fm, body = parse_yaml_frontmatter(content) if fm is None: continue @@ -1031,7 +915,7 @@ def _add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool: except Exception: return False - fm, raw_fm, body = _parse_yaml_frontmatter(content) + fm, raw_fm, body = parse_yaml_frontmatter(content) if fm is None: return False @@ -1058,7 +942,7 @@ def _add_edge_to_file(file_path, edge_type: str, target_slug: str) -> bool: merged_edges.setdefault(edge_type, []).append(target_slug) # Serialize using the same string-surgery approach as reweave - new_fm = _serialize_edge_fields(raw_fm, merged_edges) + new_fm = serialize_edge_fields(raw_fm, merged_edges) if body.startswith("\n"): new_content = f"---\n{new_fm}{body}" else: diff --git a/tests/test_reweave_merge.py b/tests/test_reweave_merge.py index e1af896..e54d60c 100644 --- a/tests/test_reweave_merge.py +++ b/tests/test_reweave_merge.py @@ -1,102 +1,26 @@ -"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, string-level splicing. 
+"""Tests for frontmatter helpers — frontmatter union, order-preserving dedup, string-level splicing. -These test the pure functions used by _merge_reweave_pr in lib/merge.py. -Copied here because lib/merge.py's relative imports make direct import impractical in tests. -If these functions change in merge.py, update them here too. +These test the pure functions in lib/frontmatter.py (extracted from merge.py Phase 6). """ +import sys +import os import pytest -import yaml -# --- Copied from lib/merge.py (pure functions, no dependencies) --- - -REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges") - - -def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]: - if not text.startswith("---"): - return None, "", text - end = text.find("\n---", 3) - if end == -1: - return None, "", text - try: - raw_fm_text = text[4:end] - fm = yaml.safe_load(raw_fm_text) - body = text[end:] - return (fm if isinstance(fm, dict) else None), raw_fm_text, body - except Exception: - return None, "", text - - -def _union_edge_lists(main_edges: list, branch_edges: list) -> list: - seen = set() - result = [] - for edge in main_edges: - key = str(edge).strip().lower() - if key not in seen: - seen.add(key) - result.append(edge) - for edge in branch_edges: - key = str(edge).strip().lower() - if key not in seen: - seen.add(key) - result.append(edge) - return result - - -def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str: - lines = raw_fm_text.split("\n") - result_lines = [] - i = 0 - fields_written = set() - - while i < len(lines): - line = lines[i] - matched_field = None - for field in REWEAVE_EDGE_FIELDS: - if line.startswith(f"{field}:"): - matched_field = field - break - - if matched_field: - fields_written.add(matched_field) - i += 1 - while i < len(lines) and lines[i] and (lines[i][0] in (' ', '-')): - i += 1 - edges = merged_edges.get(matched_field, []) - if edges: - result_lines.append(f"{matched_field}:") - 
for edge in edges: - result_lines.append(f"- {edge}") - continue - else: - result_lines.append(line) - i += 1 - - for field in REWEAVE_EDGE_FIELDS: - if field not in fields_written: - edges = merged_edges.get(field, []) - if edges: - result_lines.append(f"{field}:") - for edge in edges: - result_lines.append(f"- {edge}") - - return "\n".join(result_lines) - - -def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str: - spliced = _serialize_edge_fields(raw_fm_text, merged_edges) - if body.startswith("\n"): - return f"---\n{spliced}{body}" - return f"---\n{spliced}\n{body}" - -# --- End copied functions --- +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib")) +from frontmatter import ( + REWEAVE_EDGE_FIELDS, + parse_yaml_frontmatter, + union_edge_lists, + serialize_edge_fields, + serialize_frontmatter, +) class TestParseYamlFrontmatter: def test_basic(self): text = "---\ntitle: Test Claim\nsupports:\n- claim-a\n---\nBody text here." 
- fm, raw, body = _parse_yaml_frontmatter(text) + fm, raw, body = parse_yaml_frontmatter(text) assert fm is not None assert fm["title"] == "Test Claim" assert fm["supports"] == ["claim-a"] @@ -105,14 +29,14 @@ class TestParseYamlFrontmatter: def test_no_frontmatter(self): text = "Just plain text" - fm, raw, body = _parse_yaml_frontmatter(text) + fm, raw, body = parse_yaml_frontmatter(text) assert fm is None assert raw == "" assert body == text def test_malformed_yaml(self): text = "---\n: invalid: yaml: {{{\n---\nBody" - fm, raw, body = _parse_yaml_frontmatter(text) + fm, raw, body = parse_yaml_frontmatter(text) assert fm is None @@ -120,38 +44,38 @@ class TestUnionEdgeLists: def test_no_overlap(self): main = ["claim-a", "claim-b"] branch = ["claim-c", "claim-d"] - result = _union_edge_lists(main, branch) + result = union_edge_lists(main, branch) assert result == ["claim-a", "claim-b", "claim-c", "claim-d"] def test_overlap_preserves_main_order(self): main = ["claim-b", "claim-a"] branch = ["claim-a", "claim-c"] - result = _union_edge_lists(main, branch) + result = union_edge_lists(main, branch) assert result == ["claim-b", "claim-a", "claim-c"] def test_case_insensitive_dedup(self): main = ["Claim A"] branch = ["claim a", "Claim B"] - result = _union_edge_lists(main, branch) + result = union_edge_lists(main, branch) assert len(result) == 2 assert result[0] == "Claim A" assert result[1] == "Claim B" def test_empty_main(self): - result = _union_edge_lists([], ["claim-a", "claim-b"]) + result = union_edge_lists([], ["claim-a", "claim-b"]) assert result == ["claim-a", "claim-b"] def test_empty_branch(self): - result = _union_edge_lists(["claim-a"], []) + result = union_edge_lists(["claim-a"], []) assert result == ["claim-a"] def test_both_empty(self): - assert _union_edge_lists([], []) == [] + assert union_edge_lists([], []) == [] def test_duplicates_within_branch(self): main = ["claim-a"] branch = ["claim-b", "claim-b"] - result = _union_edge_lists(main, branch) + 
result = union_edge_lists(main, branch) assert result == ["claim-a", "claim-b"] @@ -159,7 +83,7 @@ class TestSerializeEdgeFields: def test_replaces_existing_field(self): raw = "title: Test\nsupports:\n- old-claim" merged = {"supports": ["old-claim", "new-claim"]} - result = _serialize_edge_fields(raw, merged) + result = serialize_edge_fields(raw, merged) assert "- old-claim" in result assert "- new-claim" in result assert "title: Test" in result @@ -167,7 +91,7 @@ class TestSerializeEdgeFields: def test_preserves_non_edge_fields_exactly(self): raw = "title: 'Quoted Title'\nconfidence: 0.85\ntype: claim" merged = {"related": ["new-claim"]} - result = _serialize_edge_fields(raw, merged) + result = serialize_edge_fields(raw, merged) assert "title: 'Quoted Title'" in result assert "confidence: 0.85" in result assert "type: claim" in result @@ -177,7 +101,7 @@ class TestSerializeEdgeFields: def test_appends_new_field(self): raw = "title: Test\ntype: claim" merged = {"supports": ["claim-a"]} - result = _serialize_edge_fields(raw, merged) + result = serialize_edge_fields(raw, merged) assert "title: Test" in result assert "supports:" in result assert "- claim-a" in result @@ -185,7 +109,7 @@ class TestSerializeEdgeFields: def test_empty_edges_removes_field(self): raw = "title: Test\nsupports:\n- old-claim\ntype: claim" merged = {} # no edges to write - result = _serialize_edge_fields(raw, merged) + result = serialize_edge_fields(raw, merged) assert "supports:" not in result assert "title: Test" in result assert "type: claim" in result @@ -193,7 +117,7 @@ class TestSerializeEdgeFields: def test_multiple_edge_fields(self): raw = "title: Test\nsupports:\n- a\nchallenges:\n- b" merged = {"supports": ["a", "c"], "challenges": ["b", "d"]} - result = _serialize_edge_fields(raw, merged) + result = serialize_edge_fields(raw, merged) lines = result.split("\n") # supports and challenges both present with merged values assert "- a" in result @@ -205,9 +129,9 @@ class 
TestSerializeEdgeFields: class TestSerializeFrontmatter: def test_roundtrip_preserves_formatting(self): original = "---\ntitle: 'Quoted Title'\nconfidence: 0.85\nsupports:\n- claim-a\n---\nBody text here." - fm, raw, body = _parse_yaml_frontmatter(original) + fm, raw, body = parse_yaml_frontmatter(original) merged_edges = {"supports": ["claim-a", "claim-b"]} - result = _serialize_frontmatter(raw, merged_edges, body) + result = serialize_frontmatter(raw, merged_edges, body) # Non-edge fields preserved exactly assert "title: 'Quoted Title'" in result @@ -223,9 +147,9 @@ class TestSerializeFrontmatter: def test_no_blank_line_before_closing_delimiter(self): """Ganymede critical: no extra blank line compounds on repeat reweaves.""" original = "---\ntitle: Test\nsupports:\n- a\n---\nBody." - fm, raw, body = _parse_yaml_frontmatter(original) + fm, raw, body = parse_yaml_frontmatter(original) merged_edges = {"supports": ["a", "b"]} - result = _serialize_frontmatter(raw, merged_edges, body) + result = serialize_frontmatter(raw, merged_edges, body) # Should NOT have \n\n--- (double newline before closing) assert "\n\n---" not in result @@ -235,8 +159,8 @@ class TestSerializeFrontmatter: merged_edges = {"supports": ["a", "b"]} for _ in range(5): - fm, raw, body = _parse_yaml_frontmatter(text) - text = _serialize_frontmatter(raw, merged_edges, body) + fm, raw, body = parse_yaml_frontmatter(text) + text = serialize_frontmatter(raw, merged_edges, body) assert text.count("\n\n") == 0 # no double newlines anywhere @@ -284,8 +208,8 @@ class TestFullUnionWorkflow: "---\nBody text." 
) - main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text) - branch_fm, _, _ = _parse_yaml_frontmatter(branch_text) + main_fm, main_raw, main_body = parse_yaml_frontmatter(main_text) + branch_fm, _, _ = parse_yaml_frontmatter(branch_text) merged_edges = {} for field in REWEAVE_EDGE_FIELDS: @@ -296,7 +220,7 @@ class TestFullUnionWorkflow: if not isinstance(branch_list, list): branch_list = [branch_list] if branch_list else [] if main_list or branch_list: - merged_edges[field] = _union_edge_lists(main_list, branch_list) + merged_edges[field] = union_edge_lists(main_list, branch_list) assert merged_edges["supports"] == ["claim-a", "claim-b"] assert "claim-x" in merged_edges["related"] @@ -304,7 +228,7 @@ class TestFullUnionWorkflow: assert len(merged_edges.get("reweave_edges", [])) == 1 # Verify non-edge fields preserved in serialization - result = _serialize_frontmatter(main_raw, merged_edges, main_body) + result = serialize_frontmatter(main_raw, merged_edges, main_body) assert "confidence: 0.8" in result assert "title: Test Claim" in result @@ -313,8 +237,8 @@ class TestFullUnionWorkflow: main_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\n---\nBody." branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n- new-claim\n---\nBody." 
- main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text) - branch_fm, _, _ = _parse_yaml_frontmatter(branch_text) + main_fm, main_raw, main_body = parse_yaml_frontmatter(main_text) + branch_fm, _, _ = parse_yaml_frontmatter(branch_text) merged_edges = {} for field in REWEAVE_EDGE_FIELDS: @@ -325,9 +249,9 @@ class TestFullUnionWorkflow: if not isinstance(branch_list, list): branch_list = [branch_list] if branch_list else [] if main_list or branch_list: - merged_edges[field] = _union_edge_lists(main_list, branch_list) + merged_edges[field] = union_edge_lists(main_list, branch_list) - result = _serialize_frontmatter(main_raw, merged_edges, main_body) + result = serialize_frontmatter(main_raw, merged_edges, main_body) assert "title: Original" in result assert "confidence: 0.9" in result assert "type: claim" in result @@ -345,16 +269,16 @@ class TestFullUnionWorkflow: if not isinstance(branch_list, list): branch_list = [branch_list] if branch_list else [] - result = _union_edge_lists(main_list, branch_list) + result = union_edge_lists(main_list, branch_list) assert result == ["single-claim", "new-claim"] def test_yaml_formatting_preserved_across_reweave(self): """The key test: non-edge YAML formatting stays byte-identical.""" # Use unusual but valid YAML formatting main_text = "---\ntitle: 'A \"Quoted\" Title'\nconfidence: 0.85\nsome_custom_field: [1, 2, 3]\nsupports:\n- old-claim\n---\nBody." - fm, raw, body = _parse_yaml_frontmatter(main_text) + fm, raw, body = parse_yaml_frontmatter(main_text) merged_edges = {"supports": ["old-claim", "new-claim"]} - result = _serialize_frontmatter(raw, merged_edges, body) + result = serialize_frontmatter(raw, merged_edges, body) # These non-edge fields must be byte-identical to source assert "title: 'A \"Quoted\" Title'" in result