teleo-infrastructure/tests/test_reweave_merge.py
m3taversal 84cb001dd6 fix: handle indented YAML list items in _serialize_edge_fields
The skip loop only matched `- ` (no indent) but YAML list items are
commonly written as `  - item` (2-space indent). This caused old list
items to persist alongside new ones, corrupting frontmatter on merge.

Fix: consume any line starting with space or dash as part of the current
field's value block.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 14:01:34 +01:00

362 lines
14 KiB
Python

"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, string-level splicing.
These test the pure functions used by _merge_reweave_pr in lib/merge.py.
Copied here because lib/merge.py's relative imports make direct import impractical in tests.
If these functions change in merge.py, update them here too.
"""
import pytest
import yaml
# --- Copied from lib/merge.py (pure functions, no dependencies) ---
# Frontmatter keys whose values are rewritten by _serialize_edge_fields; all
# other frontmatter lines are passed through untouched. Fields that are missing
# from the source frontmatter are appended in this tuple's order.
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]:
if not text.startswith("---"):
return None, "", text
end = text.find("\n---", 3)
if end == -1:
return None, "", text
try:
raw_fm_text = text[4:end]
fm = yaml.safe_load(raw_fm_text)
body = text[end:]
return (fm if isinstance(fm, dict) else None), raw_fm_text, body
except Exception:
return None, "", text
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
    """Merge two edge lists, keeping first occurrences in encounter order.

    Entries of ``main_edges`` come first, followed by the entries of
    ``branch_edges`` that have not been seen yet. Two entries count as
    duplicates when their ``str()`` forms are equal after stripping surrounding
    whitespace and lower-casing; the first-seen original value is the one kept.
    """
    known_keys = set()
    union = []
    for source in (main_edges, branch_edges):
        for candidate in source:
            normalized = str(candidate).strip().lower()
            if normalized in known_keys:
                continue
            known_keys.add(normalized)
            union.append(candidate)
    return union
def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str:
    """Splice merged edge lists into raw frontmatter text, line by line.

    Lines that do not begin with one of ``REWEAVE_EDGE_FIELDS`` followed by a
    colon are copied through unchanged. For a line that does, the line itself
    and every directly following line that starts with a space or a dash are
    dropped, and the field is re-emitted from ``merged_edges`` as ``field:``
    plus one ``- value`` line per entry. A field whose merged list is missing
    or empty is removed. Fields not present in the source text are appended at
    the end in ``REWEAVE_EDGE_FIELDS`` order.

    Values are interpolated verbatim; no YAML quoting or escaping is applied.
    """

    def _emit(name: str, out: list) -> None:
        # Write one field block, or nothing when there are no edges for it.
        values = merged_edges.get(name, [])
        if values:
            out.append(f"{name}:")
            out.extend(f"- {value}" for value in values)

    source_lines = raw_fm_text.split("\n")
    total = len(source_lines)
    output = []
    handled = set()
    pos = 0

    while pos < total:
        current = source_lines[pos]
        pos += 1
        field = next(
            (name for name in REWEAVE_EDGE_FIELDS if current.startswith(f"{name}:")),
            None,
        )
        if field is None:
            output.append(current)
            continue

        handled.add(field)
        # Skip the old value block. Slicing with [:1] yields "" for an empty
        # line, so a blank line ends the block just like any other character.
        while pos < total and source_lines[pos][:1] in (" ", "-"):
            pos += 1
        _emit(field, output)

    for name in REWEAVE_EDGE_FIELDS:
        if name not in handled:
            _emit(name, output)

    return "\n".join(output)
def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str:
    """Rebuild a document from raw frontmatter, merged edge lists, and body.

    The frontmatter text is passed through ``_serialize_edge_fields`` and
    prefixed with an opening ``---`` line. ``body`` is appended directly when it
    already starts with a newline; otherwise a single newline is inserted
    first, so no extra blank line is introduced before the body.
    """
    frontmatter = _serialize_edge_fields(raw_fm_text, merged_edges)
    separator = "" if body.startswith("\n") else "\n"
    return f"---\n{frontmatter}{separator}{body}"
# --- End copied functions ---
class TestParseYamlFrontmatter:
    """Behaviour of ``_parse_yaml_frontmatter`` on well-formed and degenerate input."""

    def test_basic(self):
        text = "---\ntitle: Test Claim\nsupports:\n- claim-a\n---\nBody text here."
        fm, raw, body = _parse_yaml_frontmatter(text)
        assert fm is not None
        assert fm["title"] == "Test Claim"
        assert fm["supports"] == ["claim-a"]
        # The closing delimiter stays attached to the body, not the raw text.
        assert body == "\n---\nBody text here."
        assert raw == "title: Test Claim\nsupports:\n- claim-a"
        # Opening delimiter + raw frontmatter + body reproduces the input.
        assert "---\n" + raw + body == text

    def test_no_frontmatter(self):
        text = "Just plain text"
        fm, raw, body = _parse_yaml_frontmatter(text)
        assert fm is None
        assert raw == ""
        assert body == text

    def test_unterminated_frontmatter(self):
        """An opening delimiter without a closing one is treated as no frontmatter."""
        text = "---\ntitle: Test Claim\nno closing delimiter"
        fm, raw, body = _parse_yaml_frontmatter(text)
        assert fm is None
        assert raw == ""
        assert body == text

    def test_non_mapping_frontmatter(self):
        """Valid YAML that is not a mapping yields ``None`` but keeps raw text and body."""
        text = "---\n- a\n- b\n---\nBody"
        fm, raw, body = _parse_yaml_frontmatter(text)
        assert fm is None
        assert raw == "- a\n- b"
        assert body == "\n---\nBody"

    def test_malformed_yaml(self):
        text = "---\n: invalid: yaml: {{{\n---\nBody"
        fm, raw, body = _parse_yaml_frontmatter(text)
        assert fm is None
class TestUnionEdgeLists:
    """Order-preserving, normalised de-duplication performed by ``_union_edge_lists``."""

    def test_no_overlap(self):
        main = ["claim-a", "claim-b"]
        branch = ["claim-c", "claim-d"]
        result = _union_edge_lists(main, branch)
        assert result == ["claim-a", "claim-b", "claim-c", "claim-d"]

    def test_overlap_preserves_main_order(self):
        main = ["claim-b", "claim-a"]
        branch = ["claim-a", "claim-c"]
        result = _union_edge_lists(main, branch)
        assert result == ["claim-b", "claim-a", "claim-c"]

    def test_case_insensitive_dedup(self):
        main = ["Claim A"]
        branch = ["claim a", "Claim B"]
        result = _union_edge_lists(main, branch)
        # The first-seen spelling (from main) is the one that is kept.
        assert result == ["Claim A", "Claim B"]

    def test_whitespace_insensitive_dedup(self):
        """Surrounding whitespace is ignored when comparing, but the original value is kept."""
        result = _union_edge_lists([" claim-a "], ["claim-a", "claim-b  "])
        assert result == [" claim-a ", "claim-b  "]

    def test_empty_main(self):
        result = _union_edge_lists([], ["claim-a", "claim-b"])
        assert result == ["claim-a", "claim-b"]

    def test_empty_branch(self):
        result = _union_edge_lists(["claim-a"], [])
        assert result == ["claim-a"]

    def test_both_empty(self):
        assert _union_edge_lists([], []) == []

    def test_duplicates_within_main(self):
        """Duplicates inside the main list itself are collapsed as well."""
        result = _union_edge_lists(["claim-a", "Claim-A", "claim-b"], [])
        assert result == ["claim-a", "claim-b"]

    def test_duplicates_within_branch(self):
        main = ["claim-a"]
        branch = ["claim-b", "claim-b"]
        result = _union_edge_lists(main, branch)
        assert result == ["claim-a", "claim-b"]
class TestSerializeEdgeFields:
    """String-level splicing of edge fields by ``_serialize_edge_fields``.

    Expected values are compared exactly: substring checks alone would still
    pass if stale list items were left behind next to the rewritten ones.
    """

    def test_replaces_existing_field(self):
        raw = "title: Test\nsupports:\n- old-claim"
        merged = {"supports": ["old-claim", "new-claim"]}
        result = _serialize_edge_fields(raw, merged)
        assert result == "title: Test\nsupports:\n- old-claim\n- new-claim"

    def test_replaces_indented_list_items(self):
        """Regression: list items written as ``  - item`` must be consumed, not kept."""
        raw = "title: Test\nsupports:\n  - old-claim\ntype: claim"
        merged = {"supports": ["old-claim", "new-claim"]}
        result = _serialize_edge_fields(raw, merged)
        assert result == "title: Test\nsupports:\n- old-claim\n- new-claim\ntype: claim"
        # The old item appears once: rewritten, not duplicated.
        assert result.count("old-claim") == 1

    def test_preserves_non_edge_fields_exactly(self):
        raw = "title: 'Quoted Title'\nconfidence: 0.85\ntype: claim"
        merged = {"related": ["new-claim"]}
        result = _serialize_edge_fields(raw, merged)
        # Source text is untouched; the new field is appended after it.
        assert result == raw + "\nrelated:\n- new-claim"

    def test_appends_new_field(self):
        raw = "title: Test\ntype: claim"
        merged = {"supports": ["claim-a"]}
        result = _serialize_edge_fields(raw, merged)
        assert result == "title: Test\ntype: claim\nsupports:\n- claim-a"

    def test_empty_edges_removes_field(self):
        raw = "title: Test\nsupports:\n- old-claim\ntype: claim"
        merged = {}  # no edges to write
        result = _serialize_edge_fields(raw, merged)
        assert "supports:" not in result
        assert result == "title: Test\ntype: claim"

    def test_multiple_edge_fields(self):
        raw = "title: Test\nsupports:\n- a\nchallenges:\n- b"
        merged = {"supports": ["a", "c"], "challenges": ["b", "d"]}
        result = _serialize_edge_fields(raw, merged)
        # Both fields keep their original position and carry the merged values.
        assert result == "title: Test\nsupports:\n- a\n- c\nchallenges:\n- b\n- d"
class TestSerializeFrontmatter:
    """Whole-document reassembly by ``_serialize_frontmatter``."""

    def test_roundtrip_preserves_formatting(self):
        original = "---\ntitle: 'Quoted Title'\nconfidence: 0.85\nsupports:\n- claim-a\n---\nBody text here."
        _, raw, body = _parse_yaml_frontmatter(original)
        merged_edges = {"supports": ["claim-a", "claim-b"]}
        result = _serialize_frontmatter(raw, merged_edges, body)
        # Non-edge fields preserved exactly
        assert "title: 'Quoted Title'" in result
        assert "confidence: 0.85" in result
        # Edge fields updated
        assert "- claim-a" in result
        assert "- claim-b" in result
        # Structure preserved
        assert result.startswith("---\n")
        assert "\n---\n" in result
        assert result.endswith("Body text here.")

    def test_no_blank_line_before_closing_delimiter(self):
        """Ganymede critical: no extra blank line compounds on repeat reweaves."""
        original = "---\ntitle: Test\nsupports:\n- a\n---\nBody."
        _, raw, body = _parse_yaml_frontmatter(original)
        merged_edges = {"supports": ["a", "b"]}
        result = _serialize_frontmatter(raw, merged_edges, body)
        # Should NOT have \n\n--- (double newline before closing)
        assert "\n\n---" not in result

    def test_body_without_leading_newline(self):
        """A body that does not start with a newline gets exactly one inserted."""
        result = _serialize_frontmatter("title: Test", {}, "---\nBody.")
        assert result == "---\ntitle: Test\n---\nBody."

    def test_repeated_serialize_no_drift(self):
        """Repeated serialization should be idempotent — no accumulating blank lines."""
        text = "---\ntitle: Test\nsupports:\n- a\n---\nBody."
        merged_edges = {"supports": ["a", "b"]}
        outputs = []
        for _ in range(5):
            _, raw, body = _parse_yaml_frontmatter(text)
            text = _serialize_frontmatter(raw, merged_edges, body)
            outputs.append(text)
        assert text.count("\n\n") == 0  # no double newlines anywhere
        # Every pass yields the same document, i.e. the operation is idempotent.
        assert len(set(outputs)) == 1
        assert text == "---\ntitle: Test\nsupports:\n- a\n- b\n---\nBody."
class TestSupersetDetection:
    """Set-difference checks: which main edges are absent from the branch."""

    def test_branch_is_superset(self):
        on_main = {"claim-a", "claim-b"}
        on_branch = {"claim-a", "claim-b", "claim-c"}
        only_on_main = on_main - on_branch
        assert not only_on_main

    def test_branch_missing_edge(self):
        on_main = {"claim-a", "claim-b"}
        on_branch = {"claim-a", "claim-c"}
        only_on_main = on_main - on_branch
        assert "claim-b" in only_on_main

    def test_equal_sets(self):
        on_main = {"claim-a", "claim-b"}
        on_branch = {"claim-a", "claim-b"}
        only_on_main = on_main - on_branch
        assert not only_on_main
class TestEdgeFieldsCoverage:
    """Guards against an edge field being dropped from ``REWEAVE_EDGE_FIELDS``."""

    def test_standard_fields_present(self):
        expected_names = ("supports", "challenges", "related", "reweave_edges", "depends_on")
        for name in expected_names:
            assert name in REWEAVE_EDGE_FIELDS
class TestFullUnionWorkflow:
    """End-to-end checks: parse main and branch, union the edge fields, serialize."""

    @staticmethod
    def _merge_edge_fields(main_fm: dict, branch_fm: dict) -> dict:
        """Union every edge field of two frontmatter dicts.

        A scalar value is wrapped into a one-element list (a falsy scalar
        becomes an empty list). Fields that are empty on both sides are left
        out of the result.
        """

        def as_list(value):
            if isinstance(value, list):
                return value
            return [value] if value else []

        merged = {}
        for field in REWEAVE_EDGE_FIELDS:
            main_list = as_list(main_fm.get(field, []))
            branch_list = as_list(branch_fm.get(field, []))
            if main_list or branch_list:
                merged[field] = _union_edge_lists(main_list, branch_list)
        return merged

    def test_main_evolved_branch_stale(self):
        """Main got new edges after branch was created. Union includes both."""
        main_text = (
            "---\ntitle: Test Claim\nconfidence: 0.8\n"
            "supports:\n- claim-a\n- claim-b\n"
            "related:\n- claim-x\n"
            "---\nBody text."
        )
        branch_text = (
            "---\ntitle: Test Claim\nconfidence: 0.8\n"
            "supports:\n- claim-a\n"
            "related:\n- claim-x\n- claim-y\n"
            "reweave_edges:\n- \"claim-y|related|2026-04-04\"\n"
            "---\nBody text."
        )
        main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text)
        branch_fm, _, _ = _parse_yaml_frontmatter(branch_text)
        merged_edges = self._merge_edge_fields(main_fm, branch_fm)
        assert merged_edges["supports"] == ["claim-a", "claim-b"]
        assert "claim-x" in merged_edges["related"]
        assert "claim-y" in merged_edges["related"]
        assert len(merged_edges.get("reweave_edges", [])) == 1
        # Verify non-edge fields preserved in serialization
        result = _serialize_frontmatter(main_raw, merged_edges, main_body)
        assert "confidence: 0.8" in result
        assert "title: Test Claim" in result
        # The merged edges must survive serialization: re-parse and compare.
        reparsed, _, reparsed_body = _parse_yaml_frontmatter(result)
        assert reparsed is not None
        assert reparsed["supports"] == ["claim-a", "claim-b"]
        assert reparsed["related"] == ["claim-x", "claim-y"]
        assert reparsed["reweave_edges"] == ["claim-y|related|2026-04-04"]
        assert reparsed_body == main_body

    def test_no_edge_fields_untouched(self):
        """Non-edge fields (title, confidence, type) come from main unchanged."""
        main_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\n---\nBody."
        branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n- new-claim\n---\nBody."
        main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text)
        branch_fm, _, _ = _parse_yaml_frontmatter(branch_text)
        merged_edges = self._merge_edge_fields(main_fm, branch_fm)
        result = _serialize_frontmatter(main_raw, merged_edges, main_body)
        assert "title: Original" in result
        assert "confidence: 0.9" in result
        assert "type: claim" in result
        assert "- new-claim" in result

    def test_scalar_edge_field_converted_to_list(self):
        """Edge fields stored as scalars (not lists) are handled gracefully."""
        main_fm = {"supports": "single-claim"}
        branch_fm = {"supports": ["single-claim", "new-claim"]}
        merged_edges = self._merge_edge_fields(main_fm, branch_fm)
        assert merged_edges == {"supports": ["single-claim", "new-claim"]}

    def test_yaml_formatting_preserved_across_reweave(self):
        """The key test: non-edge YAML formatting stays byte-identical."""
        # Use unusual but valid YAML formatting
        main_text = "---\ntitle: 'A \"Quoted\" Title'\nconfidence: 0.85\nsome_custom_field: [1, 2, 3]\nsupports:\n- old-claim\n---\nBody."
        _, raw, body = _parse_yaml_frontmatter(main_text)
        merged_edges = {"supports": ["old-claim", "new-claim"]}
        result = _serialize_frontmatter(raw, merged_edges, body)
        # These non-edge fields must be byte-identical to source
        assert "title: 'A \"Quoted\" Title'" in result
        assert "confidence: 0.85" in result
        assert "some_custom_field: [1, 2, 3]" in result