fix: string-level edge splicing in reweave merge — no yaml.dump reformatting
Two fixes from Ganymede review:
1. CRITICAL: a blank line before the closing --- compounded on repeat reweaves.
The body already starts with \n---, so emitting \n{body} produced \n\n---. Fixed by
adding the separating newline only when the body does not already start with one.
2. Replaced yaml.dump round-trip with _serialize_edge_fields() that splices
only edge arrays into raw frontmatter text. Non-edge fields (title,
confidence, type, quotes, flow styles) stay byte-identical to main HEAD.
_parse_yaml_frontmatter now returns 3-tuple: (dict, raw_fm_text, body).
_serialize_frontmatter takes (raw_fm_text, merged_edges_dict, body).
26 tests pass including idempotency (5x serialize), formatting preservation,
and no-blank-line regression test.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6b3a5833df
commit
b091642146
2 changed files with 267 additions and 69 deletions
105
lib/merge.py
105
lib/merge.py
|
|
@ -393,25 +393,27 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
|
||||||
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
|
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
|
||||||
|
|
||||||
|
|
||||||
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str]:
|
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]:
|
||||||
"""Parse YAML frontmatter from markdown text.
|
"""Parse YAML frontmatter from markdown text.
|
||||||
|
|
||||||
Returns (frontmatter_dict, body_text_including_closing_delimiter).
|
Returns (frontmatter_dict, raw_fm_text, body_text_including_closing_delimiter).
|
||||||
Returns (None, text) if no valid frontmatter found.
|
Returns (None, "", text) if no valid frontmatter found.
|
||||||
|
raw_fm_text is the text between the --- delimiters (no delimiters, no leading newline).
|
||||||
"""
|
"""
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
if not text.startswith("---"):
|
if not text.startswith("---"):
|
||||||
return None, text
|
return None, "", text
|
||||||
end = text.find("\n---", 3)
|
end = text.find("\n---", 3)
|
||||||
if end == -1:
|
if end == -1:
|
||||||
return None, text
|
return None, "", text
|
||||||
try:
|
try:
|
||||||
fm = yaml.safe_load(text[3:end])
|
raw_fm_text = text[4:end] # skip "---\n", stop before "\n---"
|
||||||
body = text[end:] # includes closing --- and body
|
fm = yaml.safe_load(raw_fm_text)
|
||||||
return fm if isinstance(fm, dict) else None, body
|
body = text[end:] # includes closing \n--- and body
|
||||||
|
return (fm if isinstance(fm, dict) else None), raw_fm_text, body
|
||||||
except Exception:
|
except Exception:
|
||||||
return None, text
|
return None, "", text
|
||||||
|
|
||||||
|
|
||||||
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
|
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
|
||||||
|
|
@ -435,12 +437,74 @@ def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def _serialize_frontmatter(fm: dict, body: str) -> str:
|
def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str:
|
||||||
"""Serialize frontmatter dict + body back to markdown text."""
|
"""Splice merged edge fields into raw frontmatter text, preserving all other fields byte-identical.
|
||||||
|
|
||||||
|
Only modifies REWEAVE_EDGE_FIELDS lines. All other frontmatter (title, confidence, type, etc.)
|
||||||
|
stays exactly as it was in the source text — no yaml.dump reformatting.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
raw_fm_text: The raw YAML text between the --- delimiters (no delimiters included).
|
||||||
|
merged_edges: {field_name: [edge_values]} for each edge field that should be present.
|
||||||
|
"""
|
||||||
|
import re
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
fm_str = yaml.dump(fm, default_flow_style=False, allow_unicode=True, sort_keys=False).rstrip("\n")
|
lines = raw_fm_text.split("\n")
|
||||||
return f"---\n{fm_str}\n{body}"
|
result_lines = []
|
||||||
|
i = 0
|
||||||
|
fields_written = set()
|
||||||
|
|
||||||
|
while i < len(lines):
|
||||||
|
line = lines[i]
|
||||||
|
# Check if this line starts an edge field
|
||||||
|
matched_field = None
|
||||||
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
|
if line.startswith(f"{field}:"):
|
||||||
|
matched_field = field
|
||||||
|
break
|
||||||
|
|
||||||
|
if matched_field:
|
||||||
|
fields_written.add(matched_field)
|
||||||
|
# Skip the old field and its list items
|
||||||
|
i += 1
|
||||||
|
while i < len(lines) and lines[i].startswith("- "):
|
||||||
|
i += 1
|
||||||
|
# Write the merged version
|
||||||
|
edges = merged_edges.get(matched_field, [])
|
||||||
|
if edges:
|
||||||
|
result_lines.append(f"{matched_field}:")
|
||||||
|
for edge in edges:
|
||||||
|
result_lines.append(f"- {edge}")
|
||||||
|
# Don't increment i — it's already past the old field
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
result_lines.append(line)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
# Append any new edge fields that didn't exist in the original
|
||||||
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
|
if field not in fields_written:
|
||||||
|
edges = merged_edges.get(field, [])
|
||||||
|
if edges:
|
||||||
|
result_lines.append(f"{field}:")
|
||||||
|
for edge in edges:
|
||||||
|
result_lines.append(f"- {edge}")
|
||||||
|
|
||||||
|
return "\n".join(result_lines)
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str:
|
||||||
|
"""Rebuild markdown file: splice merged edges into raw frontmatter, append body.
|
||||||
|
|
||||||
|
Uses string-level surgery — only edge fields are modified. All other frontmatter
|
||||||
|
stays byte-identical to the source. No yaml.dump reformatting.
|
||||||
|
"""
|
||||||
|
spliced = _serialize_edge_fields(raw_fm_text, merged_edges)
|
||||||
|
# body starts with \n--- (closing delimiter + body text)
|
||||||
|
if body.startswith("\n"):
|
||||||
|
return f"---\n{spliced}{body}"
|
||||||
|
return f"---\n{spliced}\n{body}"
|
||||||
|
|
||||||
|
|
||||||
async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
|
async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
|
||||||
|
|
@ -510,8 +574,8 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Parse frontmatter from both versions
|
# Parse frontmatter from both versions
|
||||||
main_fm, main_body = _parse_yaml_frontmatter(main_content)
|
main_fm, main_raw_fm, main_body = _parse_yaml_frontmatter(main_content)
|
||||||
branch_fm, branch_body = _parse_yaml_frontmatter(branch_content)
|
branch_fm, _branch_raw_fm, branch_body = _parse_yaml_frontmatter(branch_content)
|
||||||
|
|
||||||
if main_fm is None or branch_fm is None:
|
if main_fm is None or branch_fm is None:
|
||||||
# Parse failure = something unexpected. Fail the merge, don't fallback
|
# Parse failure = something unexpected. Fail the merge, don't fallback
|
||||||
|
|
@ -540,8 +604,8 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
|
||||||
)
|
)
|
||||||
skipped_non_superset.append(f"{fpath}:{field}")
|
skipped_non_superset.append(f"{fpath}:{field}")
|
||||||
|
|
||||||
# Union edge lists: main's edges first (order-preserved), branch-new appended
|
# Collect merged edge fields for string-level splicing
|
||||||
merged_fm = dict(main_fm) # Start with main's full frontmatter
|
merged_edges = {}
|
||||||
for field in REWEAVE_EDGE_FIELDS:
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
main_list = main_fm.get(field, [])
|
main_list = main_fm.get(field, [])
|
||||||
branch_list = branch_fm.get(field, [])
|
branch_list = branch_fm.get(field, [])
|
||||||
|
|
@ -549,15 +613,14 @@ async def _merge_reweave_pr(branch: str) -> tuple[bool, str]:
|
||||||
main_list = [main_list] if main_list else []
|
main_list = [main_list] if main_list else []
|
||||||
if not isinstance(branch_list, list):
|
if not isinstance(branch_list, list):
|
||||||
branch_list = [branch_list] if branch_list else []
|
branch_list = [branch_list] if branch_list else []
|
||||||
|
|
||||||
if main_list or branch_list:
|
if main_list or branch_list:
|
||||||
merged_fm[field] = _union_edge_lists(main_list, branch_list)
|
merged_edges[field] = _union_edge_lists(main_list, branch_list)
|
||||||
|
|
||||||
# Write merged file — use main's body (reweave doesn't touch body text)
|
# Write merged file — splice edges into main's raw frontmatter, use main's body
|
||||||
full_path = os.path.join(worktree_path, fpath)
|
full_path = os.path.join(worktree_path, fpath)
|
||||||
os.makedirs(os.path.dirname(full_path), exist_ok=True)
|
os.makedirs(os.path.dirname(full_path), exist_ok=True)
|
||||||
with open(full_path, "w") as f:
|
with open(full_path, "w") as f:
|
||||||
f.write(_serialize_frontmatter(merged_fm, main_body))
|
f.write(_serialize_frontmatter(main_raw_fm, merged_edges, main_body))
|
||||||
await _git("add", fpath, cwd=worktree_path)
|
await _git("add", fpath, cwd=worktree_path)
|
||||||
merged_count += 1
|
merged_count += 1
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, superset assertion.
|
"""Tests for _merge_reweave_pr helpers — frontmatter union, order-preserving dedup, string-level splicing.
|
||||||
|
|
||||||
These test the pure functions used by _merge_reweave_pr in lib/merge.py.
|
These test the pure functions used by _merge_reweave_pr in lib/merge.py.
|
||||||
Copied here because lib/merge.py's relative imports make direct import impractical in tests.
|
Copied here because lib/merge.py's relative imports make direct import impractical in tests.
|
||||||
|
|
@ -13,18 +13,19 @@ import yaml
|
||||||
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
|
REWEAVE_EDGE_FIELDS = ("supports", "challenges", "depends_on", "related", "reweave_edges")
|
||||||
|
|
||||||
|
|
||||||
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str]:
|
def _parse_yaml_frontmatter(text: str) -> tuple[dict | None, str, str]:
|
||||||
if not text.startswith("---"):
|
if not text.startswith("---"):
|
||||||
return None, text
|
return None, "", text
|
||||||
end = text.find("\n---", 3)
|
end = text.find("\n---", 3)
|
||||||
if end == -1:
|
if end == -1:
|
||||||
return None, text
|
return None, "", text
|
||||||
try:
|
try:
|
||||||
fm = yaml.safe_load(text[3:end])
|
raw_fm_text = text[4:end]
|
||||||
|
fm = yaml.safe_load(raw_fm_text)
|
||||||
body = text[end:]
|
body = text[end:]
|
||||||
return fm if isinstance(fm, dict) else None, body
|
return (fm if isinstance(fm, dict) else None), raw_fm_text, body
|
||||||
except Exception:
|
except Exception:
|
||||||
return None, text
|
return None, "", text
|
||||||
|
|
||||||
|
|
||||||
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
|
def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
|
||||||
|
|
@ -43,31 +44,75 @@ def _union_edge_lists(main_edges: list, branch_edges: list) -> list:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def _serialize_frontmatter(fm: dict, body: str) -> str:
|
def _serialize_edge_fields(raw_fm_text: str, merged_edges: dict[str, list]) -> str:
|
||||||
fm_str = yaml.dump(fm, default_flow_style=False, allow_unicode=True, sort_keys=False).rstrip("\n")
|
lines = raw_fm_text.split("\n")
|
||||||
return f"---\n{fm_str}\n{body}"
|
result_lines = []
|
||||||
|
i = 0
|
||||||
|
fields_written = set()
|
||||||
|
|
||||||
|
while i < len(lines):
|
||||||
|
line = lines[i]
|
||||||
|
matched_field = None
|
||||||
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
|
if line.startswith(f"{field}:"):
|
||||||
|
matched_field = field
|
||||||
|
break
|
||||||
|
|
||||||
|
if matched_field:
|
||||||
|
fields_written.add(matched_field)
|
||||||
|
i += 1
|
||||||
|
while i < len(lines) and lines[i].startswith("- "):
|
||||||
|
i += 1
|
||||||
|
edges = merged_edges.get(matched_field, [])
|
||||||
|
if edges:
|
||||||
|
result_lines.append(f"{matched_field}:")
|
||||||
|
for edge in edges:
|
||||||
|
result_lines.append(f"- {edge}")
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
result_lines.append(line)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
|
if field not in fields_written:
|
||||||
|
edges = merged_edges.get(field, [])
|
||||||
|
if edges:
|
||||||
|
result_lines.append(f"{field}:")
|
||||||
|
for edge in edges:
|
||||||
|
result_lines.append(f"- {edge}")
|
||||||
|
|
||||||
|
return "\n".join(result_lines)
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_frontmatter(raw_fm_text: str, merged_edges: dict[str, list], body: str) -> str:
|
||||||
|
spliced = _serialize_edge_fields(raw_fm_text, merged_edges)
|
||||||
|
if body.startswith("\n"):
|
||||||
|
return f"---\n{spliced}{body}"
|
||||||
|
return f"---\n{spliced}\n{body}"
|
||||||
|
|
||||||
# --- End copied functions ---
|
# --- End copied functions ---
|
||||||
|
|
||||||
|
|
||||||
class TestParseYamlFrontmatter:
|
class TestParseYamlFrontmatter:
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
text = "---\ntitle: Test Claim\nsupports:\n - claim-a\n---\nBody text here."
|
text = "---\ntitle: Test Claim\nsupports:\n- claim-a\n---\nBody text here."
|
||||||
fm, body = _parse_yaml_frontmatter(text)
|
fm, raw, body = _parse_yaml_frontmatter(text)
|
||||||
assert fm is not None
|
assert fm is not None
|
||||||
assert fm["title"] == "Test Claim"
|
assert fm["title"] == "Test Claim"
|
||||||
assert fm["supports"] == ["claim-a"]
|
assert fm["supports"] == ["claim-a"]
|
||||||
assert body.startswith("\n---")
|
assert body.startswith("\n---")
|
||||||
|
assert "title: Test Claim" in raw
|
||||||
|
|
||||||
def test_no_frontmatter(self):
|
def test_no_frontmatter(self):
|
||||||
text = "Just plain text"
|
text = "Just plain text"
|
||||||
fm, body = _parse_yaml_frontmatter(text)
|
fm, raw, body = _parse_yaml_frontmatter(text)
|
||||||
assert fm is None
|
assert fm is None
|
||||||
|
assert raw == ""
|
||||||
assert body == text
|
assert body == text
|
||||||
|
|
||||||
def test_malformed_yaml(self):
|
def test_malformed_yaml(self):
|
||||||
text = "---\n: invalid: yaml: {{{\n---\nBody"
|
text = "---\n: invalid: yaml: {{{\n---\nBody"
|
||||||
fm, body = _parse_yaml_frontmatter(text)
|
fm, raw, body = _parse_yaml_frontmatter(text)
|
||||||
assert fm is None
|
assert fm is None
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -110,18 +155,90 @@ class TestUnionEdgeLists:
|
||||||
assert result == ["claim-a", "claim-b"]
|
assert result == ["claim-a", "claim-b"]
|
||||||
|
|
||||||
|
|
||||||
class TestSerializeFrontmatter:
|
class TestSerializeEdgeFields:
|
||||||
def test_roundtrip(self):
|
def test_replaces_existing_field(self):
|
||||||
fm = {"title": "Test", "supports": ["claim-a", "claim-b"]}
|
raw = "title: Test\nsupports:\n- old-claim"
|
||||||
body = "\n---\nBody text here."
|
merged = {"supports": ["old-claim", "new-claim"]}
|
||||||
text = _serialize_frontmatter(fm, body)
|
result = _serialize_edge_fields(raw, merged)
|
||||||
assert text.startswith("---\n")
|
assert "- old-claim" in result
|
||||||
assert "title: Test" in text
|
assert "- new-claim" in result
|
||||||
assert "Body text here." in text
|
assert "title: Test" in result
|
||||||
|
|
||||||
fm2, body2 = _parse_yaml_frontmatter(text)
|
def test_preserves_non_edge_fields_exactly(self):
|
||||||
assert fm2["title"] == "Test"
|
raw = "title: 'Quoted Title'\nconfidence: 0.85\ntype: claim"
|
||||||
assert fm2["supports"] == ["claim-a", "claim-b"]
|
merged = {"related": ["new-claim"]}
|
||||||
|
result = _serialize_edge_fields(raw, merged)
|
||||||
|
assert "title: 'Quoted Title'" in result
|
||||||
|
assert "confidence: 0.85" in result
|
||||||
|
assert "type: claim" in result
|
||||||
|
assert "related:" in result
|
||||||
|
assert "- new-claim" in result
|
||||||
|
|
||||||
|
def test_appends_new_field(self):
|
||||||
|
raw = "title: Test\ntype: claim"
|
||||||
|
merged = {"supports": ["claim-a"]}
|
||||||
|
result = _serialize_edge_fields(raw, merged)
|
||||||
|
assert "title: Test" in result
|
||||||
|
assert "supports:" in result
|
||||||
|
assert "- claim-a" in result
|
||||||
|
|
||||||
|
def test_empty_edges_removes_field(self):
|
||||||
|
raw = "title: Test\nsupports:\n- old-claim\ntype: claim"
|
||||||
|
merged = {} # no edges to write
|
||||||
|
result = _serialize_edge_fields(raw, merged)
|
||||||
|
assert "supports:" not in result
|
||||||
|
assert "title: Test" in result
|
||||||
|
assert "type: claim" in result
|
||||||
|
|
||||||
|
def test_multiple_edge_fields(self):
|
||||||
|
raw = "title: Test\nsupports:\n- a\nchallenges:\n- b"
|
||||||
|
merged = {"supports": ["a", "c"], "challenges": ["b", "d"]}
|
||||||
|
result = _serialize_edge_fields(raw, merged)
|
||||||
|
lines = result.split("\n")
|
||||||
|
# supports and challenges both present with merged values
|
||||||
|
assert "- a" in result
|
||||||
|
assert "- c" in result
|
||||||
|
assert "- b" in result
|
||||||
|
assert "- d" in result
|
||||||
|
|
||||||
|
|
||||||
|
class TestSerializeFrontmatter:
|
||||||
|
def test_roundtrip_preserves_formatting(self):
|
||||||
|
original = "---\ntitle: 'Quoted Title'\nconfidence: 0.85\nsupports:\n- claim-a\n---\nBody text here."
|
||||||
|
fm, raw, body = _parse_yaml_frontmatter(original)
|
||||||
|
merged_edges = {"supports": ["claim-a", "claim-b"]}
|
||||||
|
result = _serialize_frontmatter(raw, merged_edges, body)
|
||||||
|
|
||||||
|
# Non-edge fields preserved exactly
|
||||||
|
assert "title: 'Quoted Title'" in result
|
||||||
|
assert "confidence: 0.85" in result
|
||||||
|
# Edge fields updated
|
||||||
|
assert "- claim-a" in result
|
||||||
|
assert "- claim-b" in result
|
||||||
|
# Structure preserved
|
||||||
|
assert result.startswith("---\n")
|
||||||
|
assert "\n---\n" in result
|
||||||
|
assert result.endswith("Body text here.")
|
||||||
|
|
||||||
|
def test_no_blank_line_before_closing_delimiter(self):
|
||||||
|
"""Ganymede critical: no extra blank line compounds on repeat reweaves."""
|
||||||
|
original = "---\ntitle: Test\nsupports:\n- a\n---\nBody."
|
||||||
|
fm, raw, body = _parse_yaml_frontmatter(original)
|
||||||
|
merged_edges = {"supports": ["a", "b"]}
|
||||||
|
result = _serialize_frontmatter(raw, merged_edges, body)
|
||||||
|
# Should NOT have \n\n--- (double newline before closing)
|
||||||
|
assert "\n\n---" not in result
|
||||||
|
|
||||||
|
def test_repeated_serialize_no_drift(self):
|
||||||
|
"""Repeated serialization should be idempotent — no accumulating blank lines."""
|
||||||
|
text = "---\ntitle: Test\nsupports:\n- a\n---\nBody."
|
||||||
|
merged_edges = {"supports": ["a", "b"]}
|
||||||
|
|
||||||
|
for _ in range(5):
|
||||||
|
fm, raw, body = _parse_yaml_frontmatter(text)
|
||||||
|
text = _serialize_frontmatter(raw, merged_edges, body)
|
||||||
|
|
||||||
|
assert text.count("\n\n") == 0 # no double newlines anywhere
|
||||||
|
|
||||||
|
|
||||||
class TestSupersetDetection:
|
class TestSupersetDetection:
|
||||||
|
|
@ -155,22 +272,22 @@ class TestFullUnionWorkflow:
|
||||||
"""Main got new edges after branch was created. Union includes both."""
|
"""Main got new edges after branch was created. Union includes both."""
|
||||||
main_text = (
|
main_text = (
|
||||||
"---\ntitle: Test Claim\nconfidence: 0.8\n"
|
"---\ntitle: Test Claim\nconfidence: 0.8\n"
|
||||||
"supports:\n - claim-a\n - claim-b\n"
|
"supports:\n- claim-a\n- claim-b\n"
|
||||||
"related:\n - claim-x\n"
|
"related:\n- claim-x\n"
|
||||||
"---\nBody text."
|
"---\nBody text."
|
||||||
)
|
)
|
||||||
branch_text = (
|
branch_text = (
|
||||||
"---\ntitle: Test Claim\nconfidence: 0.8\n"
|
"---\ntitle: Test Claim\nconfidence: 0.8\n"
|
||||||
"supports:\n - claim-a\n"
|
"supports:\n- claim-a\n"
|
||||||
"related:\n - claim-x\n - claim-y\n"
|
"related:\n- claim-x\n- claim-y\n"
|
||||||
"reweave_edges:\n - \"claim-y|related|2026-04-04\"\n"
|
"reweave_edges:\n- \"claim-y|related|2026-04-04\"\n"
|
||||||
"---\nBody text."
|
"---\nBody text."
|
||||||
)
|
)
|
||||||
|
|
||||||
main_fm, main_body = _parse_yaml_frontmatter(main_text)
|
main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text)
|
||||||
branch_fm, _ = _parse_yaml_frontmatter(branch_text)
|
branch_fm, _, _ = _parse_yaml_frontmatter(branch_text)
|
||||||
|
|
||||||
merged_fm = dict(main_fm)
|
merged_edges = {}
|
||||||
for field in REWEAVE_EDGE_FIELDS:
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
main_list = main_fm.get(field, [])
|
main_list = main_fm.get(field, [])
|
||||||
branch_list = branch_fm.get(field, [])
|
branch_list = branch_fm.get(field, [])
|
||||||
|
|
@ -179,23 +296,27 @@ class TestFullUnionWorkflow:
|
||||||
if not isinstance(branch_list, list):
|
if not isinstance(branch_list, list):
|
||||||
branch_list = [branch_list] if branch_list else []
|
branch_list = [branch_list] if branch_list else []
|
||||||
if main_list or branch_list:
|
if main_list or branch_list:
|
||||||
merged_fm[field] = _union_edge_lists(main_list, branch_list)
|
merged_edges[field] = _union_edge_lists(main_list, branch_list)
|
||||||
|
|
||||||
assert merged_fm["supports"] == ["claim-a", "claim-b"]
|
assert merged_edges["supports"] == ["claim-a", "claim-b"]
|
||||||
assert "claim-x" in merged_fm["related"]
|
assert "claim-x" in merged_edges["related"]
|
||||||
assert "claim-y" in merged_fm["related"]
|
assert "claim-y" in merged_edges["related"]
|
||||||
assert len(merged_fm.get("reweave_edges", [])) == 1
|
assert len(merged_edges.get("reweave_edges", [])) == 1
|
||||||
assert merged_fm["confidence"] == 0.8
|
|
||||||
|
# Verify non-edge fields preserved in serialization
|
||||||
|
result = _serialize_frontmatter(main_raw, merged_edges, main_body)
|
||||||
|
assert "confidence: 0.8" in result
|
||||||
|
assert "title: Test Claim" in result
|
||||||
|
|
||||||
def test_no_edge_fields_untouched(self):
|
def test_no_edge_fields_untouched(self):
|
||||||
"""Non-edge fields (title, confidence, type) come from main unchanged."""
|
"""Non-edge fields (title, confidence, type) come from main unchanged."""
|
||||||
main_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\n---\nBody."
|
main_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\n---\nBody."
|
||||||
branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n - new-claim\n---\nBody."
|
branch_text = "---\ntitle: Original\nconfidence: 0.9\ntype: claim\nrelated:\n- new-claim\n---\nBody."
|
||||||
|
|
||||||
main_fm, main_body = _parse_yaml_frontmatter(main_text)
|
main_fm, main_raw, main_body = _parse_yaml_frontmatter(main_text)
|
||||||
branch_fm, _ = _parse_yaml_frontmatter(branch_text)
|
branch_fm, _, _ = _parse_yaml_frontmatter(branch_text)
|
||||||
|
|
||||||
merged_fm = dict(main_fm)
|
merged_edges = {}
|
||||||
for field in REWEAVE_EDGE_FIELDS:
|
for field in REWEAVE_EDGE_FIELDS:
|
||||||
main_list = main_fm.get(field, [])
|
main_list = main_fm.get(field, [])
|
||||||
branch_list = branch_fm.get(field, [])
|
branch_list = branch_fm.get(field, [])
|
||||||
|
|
@ -204,12 +325,13 @@ class TestFullUnionWorkflow:
|
||||||
if not isinstance(branch_list, list):
|
if not isinstance(branch_list, list):
|
||||||
branch_list = [branch_list] if branch_list else []
|
branch_list = [branch_list] if branch_list else []
|
||||||
if main_list or branch_list:
|
if main_list or branch_list:
|
||||||
merged_fm[field] = _union_edge_lists(main_list, branch_list)
|
merged_edges[field] = _union_edge_lists(main_list, branch_list)
|
||||||
|
|
||||||
assert merged_fm["title"] == "Original"
|
result = _serialize_frontmatter(main_raw, merged_edges, main_body)
|
||||||
assert merged_fm["confidence"] == 0.9
|
assert "title: Original" in result
|
||||||
assert merged_fm["type"] == "claim"
|
assert "confidence: 0.9" in result
|
||||||
assert merged_fm["related"] == ["new-claim"]
|
assert "type: claim" in result
|
||||||
|
assert "- new-claim" in result
|
||||||
|
|
||||||
def test_scalar_edge_field_converted_to_list(self):
|
def test_scalar_edge_field_converted_to_list(self):
|
||||||
"""Edge fields stored as scalars (not lists) are handled gracefully."""
|
"""Edge fields stored as scalars (not lists) are handled gracefully."""
|
||||||
|
|
@ -225,3 +347,16 @@ class TestFullUnionWorkflow:
|
||||||
|
|
||||||
result = _union_edge_lists(main_list, branch_list)
|
result = _union_edge_lists(main_list, branch_list)
|
||||||
assert result == ["single-claim", "new-claim"]
|
assert result == ["single-claim", "new-claim"]
|
||||||
|
|
||||||
|
def test_yaml_formatting_preserved_across_reweave(self):
|
||||||
|
"""The key test: non-edge YAML formatting stays byte-identical."""
|
||||||
|
# Use unusual but valid YAML formatting
|
||||||
|
main_text = "---\ntitle: 'A \"Quoted\" Title'\nconfidence: 0.85\nsome_custom_field: [1, 2, 3]\nsupports:\n- old-claim\n---\nBody."
|
||||||
|
fm, raw, body = _parse_yaml_frontmatter(main_text)
|
||||||
|
merged_edges = {"supports": ["old-claim", "new-claim"]}
|
||||||
|
result = _serialize_frontmatter(raw, merged_edges, body)
|
||||||
|
|
||||||
|
# These non-edge fields must be byte-identical to source
|
||||||
|
assert "title: 'A \"Quoted\" Title'" in result
|
||||||
|
assert "confidence: 0.85" in result
|
||||||
|
assert "some_custom_field: [1, 2, 3]" in result
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue