fix: enrichment idempotency — three-layer dedup prevents duplicate evidence blocks

Layer 1: Insertion-time dedup in openrouter-extract-v2.py — skip insertion if
the source_slug already appears in the claim content.
Layer 2: Insertion-time dedup in entity_batch.py — skip insertion if the PR
number has already enriched this claim.
Layer 3: Post-rebase dedup in merge.py — scan rebased files for duplicate
evidence blocks (same source reference) and remove them before force-push.

Root cause: multiple enrichment branches modify the same claim at the same
insertion point. When those branches are rebased onto main sequentially, each
rebase re-applies its insertion, duplicating the evidence block.
(Leo: PRs #1751, #1752)
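
The shape of the failure, simplified from the PR #1751 claim file that the
tests below reproduce: after rebase, the claim carries two evidence blocks
with the same *Source:* key, and Layer 3 keeps only the first.

    ### Additional Evidence (extend)
    *Source: [[2026-03-00-commercial-stations-haven1-slip-orbital-reef-delays]] | Added: 2026-03-19*

    Orbital Reef multi-party structure experiencing delays.

    ### Additional Evidence (extend)
    *Source: [[2026-03-00-commercial-stations-haven1-slip-orbital-reef-delays]] | Added: 2026-03-19*

    Orbital Reef multi-party structure experiencing delays (duplicate from rebase).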

lib/dedup.py: standalone module — parses evidence headers, deduplicates by
source key, and preserves trailing content (Relevant Notes, Topics sections).
9 tests cover all the patterns, including the real PR #1751 duplication case.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
m3taversal 2026-03-31 12:05:20 +01:00
parent ad48d7384e
commit f43f8f923f
5 changed files with 334 additions and 0 deletions

lib/dedup.py (new file)

@@ -0,0 +1,113 @@
"""Evidence block deduplication for enrichment idempotency.

Removes duplicate '### Additional Evidence' and '### Auto-enrichment' blocks
that arise from rebase of enrichment branches. (Leo: PRs #1751, #1752)
"""
import logging
import re

logger = logging.getLogger("pipeline.dedup")

# Matches start of an evidence block header
_EVIDENCE_HEADER = re.compile(
    r'^### (?:Additional Evidence|Auto-enrichment) \(',
    re.MULTILINE,
)

# Extracts source key from the *Source: ...* line
_SOURCE_LINE = re.compile(r'^\*Source: (.+)\*', re.MULTILINE)


def dedup_evidence_blocks(content: str) -> str:
    """Remove duplicate evidence blocks from a claim file.

    After rebase, two enrichment branches can produce duplicate
    evidence blocks with the same source reference. Keeps the first
    occurrence of each source, removes subsequent duplicates.
    """
    # Find all evidence block start positions
    headers = list(_EVIDENCE_HEADER.finditer(content))
    if len(headers) < 2:
        return content

    # Parse each block: find its extent and source key
    blocks = []  # (start, end, source_key)
    for i, hdr in enumerate(headers):
        block_start = hdr.start()
        # Block extends to just before the next evidence header
        # (or to end of file for the last block).
        # But we need to be careful: content after the last evidence
        # block that ISN'T evidence (Relevant Notes, ---, etc.) should
        # NOT be considered part of the block.
        if i + 1 < len(headers):
            block_end = headers[i + 1].start()
        else:
            # Last block: find where evidence content ends.
            # Look for the next non-evidence section marker after the
            # source line and evidence body.
            rest = content[block_start:]
            # Find end of this evidence block's text by looking for
            # a section boundary: ---, ## heading, Relevant Notes, Topics.
            # Skip the first line (the ### header itself).
            lines = rest.split("\n")
            end_offset = len(rest)
            past_source = False
            past_body = False
            line_pos = 0
            for j, line in enumerate(lines):
                if j == 0:
                    line_pos += len(line) + 1
                    continue
                if line.startswith("*Source:"):
                    past_source = True
                    line_pos += len(line) + 1
                    continue
                if past_source and line.strip() == "":
                    # Blank line after source — start of body
                    line_pos += len(line) + 1
                    continue
                if past_source and line.strip():
                    past_body = True
                # After we've seen body content, a blank line followed by
                # a section marker means the block is done
                if past_body and (
                    line.startswith("---")
                    or line.startswith("## ")
                    or line.startswith("### ")  # next evidence or other heading
                    or re.match(r'^(?:Relevant Notes|Topics)\s*:?', line)
                ):
                    end_offset = line_pos
                    break
                line_pos += len(line) + 1
            block_end = block_start + end_offset

        # Extract source key
        block_text = content[block_start:block_end]
        src_match = _SOURCE_LINE.search(block_text)
        source_key = src_match.group(1).strip() if src_match else f"_unknown_{i}"
        blocks.append((block_start, block_end, source_key))

    # Now rebuild content, skipping duplicate sources
    seen: set[str] = set()
    result_parts = [content[:blocks[0][0]]]
    removed = 0
    for start, end, source_key in blocks:
        if source_key in seen:
            removed += 1
            continue
        seen.add(source_key)
        result_parts.append(content[start:end])

    # Append any content after the last block
    last_end = blocks[-1][1]
    if last_end < len(content):
        result_parts.append(content[last_end:])

    if removed > 0:
        logger.info("Deduped %d duplicate evidence block(s)", removed)
    return "".join(result_parts)

lib/entity_batch.py

@@ -107,6 +107,10 @@ def _apply_claim_enrichment(claim_path: str, evidence: str, pr_number: int,
    if not content:
        return False, f"target claim empty: {claim_path}"

    # Dedup: skip if this PR already enriched this claim (idempotency)
    if f"PR #{pr_number}" in content:
        return False, f"already enriched by PR #{pr_number}"
    enrichment_block = (
        f"\n\n### Auto-enrichment (near-duplicate conversion, similarity={similarity:.2f})\n"
        f"*Source: PR #{pr_number} — \"{original_title}\"*\n"

lib/merge.py

@@ -15,11 +15,13 @@ import json
import logging
import os
import random
import re
import shutil
from collections import defaultdict
from . import config, db
from .db import classify_branch
from .dedup import dedup_evidence_blocks
from .domains import detect_domain_from_branch
from .forgejo import api as forgejo_api
@@ -200,6 +202,51 @@ async def _claim_next_pr(conn, domain: str) -> dict | None:
    return dict(row) if row else None


async def _dedup_enriched_files(worktree_path: str) -> int:
    """Scan rebased worktree for duplicate evidence blocks and dedup them.

    Returns count of files fixed.
    """
    # Get list of modified claim files in this branch vs origin/main
    rc, out = await _git("diff", "--name-only", "origin/main..HEAD", cwd=worktree_path)
    if rc != 0:
        return 0
    fixed = 0
    for fpath in out.strip().split("\n"):
        fpath = fpath.strip()
        if not fpath or not fpath.endswith(".md"):
            continue
        # Only process claim files (domains/, core/, foundations/)
        if not any(fpath.startswith(p) for p in ("domains/", "core/", "foundations/")):
            continue
        full_path = os.path.join(worktree_path, fpath)
        if not os.path.exists(full_path):
            continue
        with open(full_path, "r") as f:
            content = f.read()
        deduped = dedup_evidence_blocks(content)
        if deduped != content:
            with open(full_path, "w") as f:
                f.write(deduped)
            # Stage the fix
            await _git("add", fpath, cwd=worktree_path)
            fixed += 1
    if fixed > 0:
        # Amend the last commit to include dedup fixes (no new commit)
        await _git(
            "-c", "core.editor=true", "commit", "--amend", "--no-edit",
            cwd=worktree_path, timeout=30,
        )
        logger.info("Deduped evidence blocks in %d file(s) after rebase", fixed)
    return fixed


async def _rebase_and_push(branch: str) -> tuple[bool, str]:
    """Rebase branch onto main and force-push with pinned SHA.

@@ -275,6 +322,11 @@ async def _rebase_and_push(branch: str) -> tuple[bool, str]:
        await _git("rebase", "--abort", cwd=worktree_path)
        return False, f"rebase conflict: {out}"

    # Post-rebase dedup: remove duplicate evidence blocks created by
    # rebasing enrichment branches onto main that already has overlapping
    # enrichments. (Leo: root cause of PRs #1751, #1752)
    await _dedup_enriched_files(worktree_path)

    # Force-push with pinned SHA (Ganymede: defeats tracking-ref update race)
    rc, out = await _git(
        "push",
openrouter-extract-v2.py

@@ -489,6 +489,12 @@ def main():
        existing_content = read_file(target_path)
        source_slug = os.path.basename(args.source_file).replace(".md", "")

        # Dedup: skip if this source already enriched this claim (idempotency)
        if f"[[{source_slug}]]" in existing_content:
            print(f" SKIP: {target} already enriched by {source_slug}")
            continue

        enrichment_block = (
            f"\n\n### Additional Evidence ({enr_type})\n"
            f"*Source: [[{source_slug}]] | Added: {date.today().isoformat()}*\n\n"


@@ -0,0 +1,159 @@
"""Tests for enrichment idempotency — dedup at insertion and post-rebase."""
import os
import tempfile

import pytest

# ─── Unit tests for dedup_evidence_blocks ────────────────────────────────
from lib.dedup import dedup_evidence_blocks


class TestDedupEvidenceBlocks:
    """Test the post-rebase evidence block deduplication."""

    def test_no_blocks_unchanged(self):
        content = "---\ntype: claim\n---\n\nSome claim body.\n"
        assert dedup_evidence_blocks(content) == content

    def test_single_block_unchanged(self):
        content = (
            "---\ntype: claim\n---\n\nClaim body.\n\n"
            "### Additional Evidence (extend)\n"
            "*Source: [[some-source-2026-03-19]] | Added: 2026-03-19*\n\n"
            "Evidence text here.\n"
        )
        assert dedup_evidence_blocks(content) == content

    def test_duplicate_blocks_removed(self):
        """Two evidence blocks from the same source — second is removed."""
        block = (
            "\n\n### Additional Evidence (extend)\n"
            "*Source: [[interlune-he3-quantum-demand]] | Added: 2026-03-19*\n\n"
            "Some evidence text.\n"
        )
        content = f"---\ntype: claim\n---\n\nClaim body.{block}{block}\nRelevant Notes:\n"
        result = dedup_evidence_blocks(content)
        # Should contain exactly one occurrence
        assert result.count("[[interlune-he3-quantum-demand]]") == 1
        assert "Relevant Notes:" in result

    def test_different_sources_kept(self):
        """Two evidence blocks from different sources — both kept."""
        block1 = (
            "\n\n### Additional Evidence (extend)\n"
            "*Source: [[source-a]] | Added: 2026-03-19*\n\n"
            "Evidence A.\n"
        )
        block2 = (
            "\n\n### Additional Evidence (challenge)\n"
            "*Source: [[source-b]] | Added: 2026-03-20*\n\n"
            "Evidence B.\n"
        )
        content = f"---\ntype: claim\n---\n\nClaim body.{block1}{block2}"
        result = dedup_evidence_blocks(content)
        assert "[[source-a]]" in result
        assert "[[source-b]]" in result

    def test_auto_enrichment_dedup(self):
        """Duplicate auto-enrichment blocks from substantive fixer."""
        block = (
            "\n\n### Auto-enrichment (near-duplicate conversion, similarity=0.92)\n"
            "*Source: PR #1234 — \"Some duplicate claim\"*\n\n"
            "Converted evidence.\n"
        )
        content = f"---\ntype: claim\n---\n\nBody.{block}{block}"
        result = dedup_evidence_blocks(content)
        assert result.count("PR #1234") == 1
    def test_same_source_repeated_dedup(self):
        """Same source enriched twice (rebase duplicate) — first version kept."""
        block1 = (
            "\n\n### Additional Evidence (extend)\n"
            "*Source: [[my-source]] | Added: 2026-03-19*\n\n"
            "First version of evidence.\n"
        )
        block2 = (
            "\n\n### Additional Evidence (extend)\n"
            "*Source: [[my-source]] | Added: 2026-03-19*\n\n"
            "Second version of evidence (rebase duplicate).\n"
        )
        content = f"---\ntype: claim\n---\n\nBody.{block1}{block2}"
        result = dedup_evidence_blocks(content)
        assert result.count("[[my-source]]") == 1
        # First occurrence kept
        assert "First version" in result
        assert "Second version" not in result
    def test_real_pr1751_pattern(self):
        """Reproduce the actual PR #1751 duplicate pattern from space-development."""
        content = (
            "---\ntype: claim\ndomain: space-development\n---\n\n"
            "Claim about SpaceX vertical integration.\n\n"
            "### Additional Evidence (extend)\n"
            "*Source: [[2026-03-00-commercial-stations-haven1-slip-orbital-reef-delays]] | Added: 2026-03-19*\n\n"
            "Orbital Reef multi-party structure experiencing delays.\n\n"
            "### Additional Evidence (extend)\n"
            "*Source: [[2026-03-00-commercial-stations-haven1-slip-orbital-reef-delays]] | Added: 2026-03-19*\n\n"
            "Orbital Reef multi-party structure experiencing delays (duplicate from rebase).\n\n"
            "---\n\n"
            "### Additional Evidence (extend)\n"
            "*Source: [[2026-03-19-space-com-starship-v3-first-static-fire]] | Added: 2026-03-24*\n\n"
            "V3 Starship static fire completed.\n"
        )
        result = dedup_evidence_blocks(content)
        assert result.count("[[2026-03-00-commercial-stations-haven1-slip-orbital-reef-delays]]") == 1
        assert result.count("[[2026-03-19-space-com-starship-v3-first-static-fire]]") == 1


# ─── Insertion-time dedup tests ──────────────────────────────────────────
class TestInsertionDedup:
    """Test that enrichment insertion skips already-enriched claims."""

    def test_entity_batch_dedup(self):
        """_apply_claim_enrichment skips if PR already enriched the claim."""
        from lib.entity_batch import _apply_claim_enrichment

        with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
            f.write(
                "---\ntype: claim\n---\n\nClaim body.\n\n"
                "### Auto-enrichment (near-duplicate conversion, similarity=0.90)\n"
                "*Source: PR #100 — \"Existing enrichment\"*\n\n"
                "Already enriched evidence.\n"
            )
            f.flush()
            path = f.name
        try:
            ok, msg = _apply_claim_enrichment(path, "New evidence", 100, "Duplicate", 0.91)
            assert not ok
            assert "already enriched" in msg
            # Different PR should succeed
            ok2, msg2 = _apply_claim_enrichment(path, "New evidence", 200, "Different PR", 0.88)
            assert ok2
        finally:
            os.unlink(path)

    def test_entity_batch_first_enrichment_succeeds(self):
        """First enrichment of a claim by a PR should succeed."""
        from lib.entity_batch import _apply_claim_enrichment

        with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
            f.write("---\ntype: claim\n---\n\nClaim body with no enrichments yet.\n")
            f.flush()
            path = f.name
        try:
            ok, msg = _apply_claim_enrichment(path, "New evidence", 500, "First enrichment", 0.92)
            assert ok
            with open(path) as rf:
                content = rf.read()
            assert "PR #500" in content
        finally:
            os.unlink(path)