Wrote sourced_from: into 414 claim files pointing back to their origin source. Backfilled claims_extracted: into 252 source files that were processed but missing this field. Matching uses author+title overlap against claim source: field, validated against 296 known-good pairs from existing claims_extracted. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
462 lines
17 KiB
Python
462 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Bidirectional source↔claim linker.
|
|
|
|
Phase 1: Build index from sources that already have claims_extracted
|
|
Phase 2: For sources WITHOUT claims_extracted, fuzzy-match via claim source: field
|
|
Phase 3: Write sourced_from: into claim frontmatter (reverse link)
|
|
Phase 4: Backfill claims_extracted: into source frontmatter (forward link)
|
|
|
|
Usage:
|
|
python3 link-sources-claims.py --dry-run # report what would change
|
|
python3 link-sources-claims.py --apply # write changes
|
|
python3 link-sources-claims.py --validate # test against known-good pairs
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import yaml
|
|
import glob
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from difflib import SequenceMatcher
|
|
|
|
# Repository layout — every path in this script is rooted at the
# teleo-codex checkout below (hard-coded to the author's machine).
CODEX = Path("/Users/coryabdalla/Pentagon/teleo-codex")
# Processed source notes live under inbox/archive (searched recursively).
ARCHIVE = CODEX / "inbox" / "archive"
# Claim files live under domains/.
DOMAINS = CODEX / "domains"
# Null-result inbox.  NOTE(review): not referenced elsewhere in this file —
# presumably kept for future phases; confirm before removing.
NULLRESULT = CODEX / "inbox" / "null-result"
|
|
|
|
def parse_frontmatter(filepath):
    """Extract YAML frontmatter from a markdown file.

    Returns a ``(frontmatter, body)`` pair:
    - ``(None, None)``      when the file cannot be read at all;
    - ``(None, full_text)`` when no parseable ``--- ... ---`` block exists;
    - ``(dict_or_None, body)`` on success (``yaml.safe_load`` of an empty
      header yields ``None``, matching the original behavior).
    """
    try:
        raw = filepath.read_text(encoding="utf-8")
    except Exception:
        # Unreadable file (missing, permissions, bad bytes) — no data at all.
        return None, None

    if not raw.startswith("---"):
        return None, raw

    pieces = raw.split("---", 2)
    if len(pieces) < 3:
        # Opening fence without a closing one.
        return None, raw

    _, header, remainder = pieces
    try:
        return yaml.safe_load(header), remainder
    except (yaml.YAMLError, ValueError, TypeError):
        # Malformed YAML — treat the whole file as body.
        return None, raw
|
|
|
|
def slugify(title):
    """Convert a claim title to its likely filename slug.

    Lower-cases, strips punctuation, turns whitespace/underscores into
    single hyphens, and trims hyphens from both ends.
    """
    text = title.strip().lower()
    text = re.sub(r'[^\w\s-]', '', text)   # drop punctuation
    text = re.sub(r'[\s_]+', '-', text)    # whitespace / underscores -> hyphen
    text = re.sub(r'-+', '-', text)        # squeeze hyphen runs
    return text.strip('-')
|
|
|
|
def title_to_slug_variants(title):
    """Generate filename variants for matching.

    Returns the bare slug and the ``.md`` filename form of *title*.
    """
    # Slugify inline: lower-case, drop punctuation, hyphenate whitespace.
    slug = title.lower().strip()
    slug = re.sub(r'[^\w\s-]', '', slug)
    slug = re.sub(r'[\s_]+', '-', slug)
    slug = re.sub(r'-+', '-', slug)
    slug = slug.strip('-')
    # Also try without common trailing words
    return [slug, slug + ".md"]
|
|
|
|
def normalize_for_match(s):
    """Normalize a string for fuzzy matching.

    Lower-cases, removes punctuation (keeping word chars and spaces),
    and collapses whitespace runs to single spaces.
    """
    lowered = s.strip().lower()
    no_punct = re.sub(r'[^\w\s]', '', lowered)
    return re.sub(r'\s+', ' ', no_punct)
|
|
|
|
def load_all_sources():
    """Load all source files with their metadata.

    Scans the archive (recursively) and the top-level inbox for markdown
    files, keyed by path relative to the codex root.  Files without
    parseable frontmatter are skipped.
    """
    result = {}
    patterns = (
        str(ARCHIVE / "**" / "*.md"),
        str(CODEX / "inbox" / "*.md"),
    )
    for pattern in patterns:
        for hit in glob.glob(pattern, recursive=True):
            path = Path(hit)
            fm, body = parse_frontmatter(path)
            if fm is None:
                continue
            rel = str(path.relative_to(CODEX))
            # claims_extracted must be a list; coerce anything else to [].
            extracted = fm.get("claims_extracted", [])
            if not isinstance(extracted, list):
                extracted = []
            result[rel] = {
                "path": path,
                "rel": rel,
                "fm": fm,
                "body": body or "",
                "title": str(fm.get("title", "")),
                "author": str(fm.get("author", "")),
                "url": str(fm.get("url", "")),
                "domain": str(fm.get("domain", "")),
                "claims_extracted": extracted,
                "status": str(fm.get("status", "")),
            }
    return result
|
|
|
|
def load_all_claims():
    """Load all claim files with their metadata.

    Walks domains/ recursively, skipping underscore-prefixed meta files
    and divergence records, and keeps only files whose frontmatter has
    ``type: claim``.  Keyed by path relative to the codex root.
    """
    result = {}
    for hit in glob.glob(str(DOMAINS / "**" / "*.md"), recursive=True):
        path = Path(hit)
        # Index/meta files and divergence records are not claims.
        if path.name.startswith(("_", "divergence-")):
            continue
        fm, body = parse_frontmatter(path)
        if fm is None or fm.get("type") != "claim":
            continue
        rel = str(path.relative_to(CODEX))
        result[rel] = {
            "path": path,
            "rel": rel,
            "fm": fm,
            "body": body or "",
            # Filename stem doubles as the claim's title/slug.
            "title": path.stem,
            "source_field": fm.get("source", ""),
            "domain": fm.get("domain", ""),
            "sourced_from": fm.get("sourced_from", []) or [],
        }
    return result
|
|
|
|
def build_known_index(sources, claims):
    """Build index from sources that already have claims_extracted.

    Resolves each ``claims_extracted`` entry to a claim file: first by
    exact slug, then by fuzzy match (SequenceMatcher ratio >= 0.85).

    Returns:
        known_links: dict claim_path -> [source_rel_path, ...]
        unresolved:  list of (source_path, claim_ref) that matched nothing
        claim_by_slug: dict slug -> claim_path (with and without ``.md``)
    """
    # Map claim title slugs to claim paths.
    claim_by_slug = {}
    for cpath, claim in claims.items():
        slug = claim["title"]
        claim_by_slug[slug] = cpath
        # Also index without the .md extension.
        if slug.endswith(".md"):
            claim_by_slug[slug[:-3]] = cpath

    known_links = {}  # claim_path -> [source_rel_path]
    unresolved = []

    for spath, source in sources.items():
        for claim_ref in source["claims_extracted"]:
            # claim_ref might be a title string or a filename.
            # BUG FIX: the original used claim_ref.rstrip(".md"), which
            # strips any trailing '.', 'm', or 'd' characters (e.g.
            # "random" -> "rando"); remove the suffix explicitly instead.
            ref = claim_ref[:-3] if claim_ref.endswith(".md") else claim_ref
            ref_slug = slugify(ref)
            matched = None

            # Try exact slug match first (with and without .md).
            if ref_slug in claim_by_slug:
                matched = claim_by_slug[ref_slug]
            elif ref_slug + ".md" in claim_by_slug:
                matched = claim_by_slug[ref_slug + ".md"]
            else:
                # Fuzzy match against claim filenames; accept only a
                # high-confidence best hit.
                best_score = 0
                best_match = None
                ref_norm = normalize_for_match(claim_ref)
                for cslug, cpath in claim_by_slug.items():
                    cslug_norm = normalize_for_match(cslug)
                    score = SequenceMatcher(None, ref_norm, cslug_norm).ratio()
                    if score > best_score:
                        best_score = score
                        best_match = cpath
                if best_score >= 0.85:
                    matched = best_match

            if matched:
                known_links.setdefault(matched, []).append(spath)
            else:
                unresolved.append((spath, claim_ref))

    return known_links, unresolved, claim_by_slug
|
|
|
|
def match_unlinked_sources(sources, claims, claim_by_slug):
    """For sources without claims_extracted, try to match via claim source: field.

    A claim is linked to a source only when BOTH the source's author name
    AND enough of its title keywords appear in the claim's ``source:``
    field, and the combined score reaches 0.6.  Returns
    ``(inferred_forward, inferred_reverse)`` dicts of match-info records.
    """
    # Only consider sources that were actually processed but never linked.
    unlinked_sources = {k: v for k, v in sources.items()
                        if not v["claims_extracted"] and v["status"] in ("processed", "enrichment")}

    # Build author last-name index for sources
    # Key: normalized last name or handle -> [source paths]
    author_index = defaultdict(list)
    for spath, source in unlinked_sources.items():
        author = source.get("author", "")
        if not author:
            continue
        # Extract meaningful name parts (last names, handles)
        for part in re.split(r'[,;&()\[\]]', author):
            part = part.strip().strip('"').strip("'")
            words = part.split()
            for w in words:
                w_clean = re.sub(r'[^\w]', '', w).lower()
                # Skip short tokens and generic institutional words that
                # would match almost any source field.
                if len(w_clean) >= 4 and w_clean not in (
                    "analysis", "research", "report", "paper", "journal",
                    "multiple", "authors", "various", "company", "team",
                    "university", "institute", "foundation", "network",
                    "open", "source", "national", "international",
                ):
                    author_index[w_clean].append(spath)

    inferred_forward = defaultdict(list)  # source_path -> [match dicts]
    inferred_reverse = defaultdict(list)  # claim_path -> [match dicts]

    for cpath, claim in claims.items():
        source_field = str(claim.get("source_field", ""))
        if not source_field:
            continue

        source_field_norm = normalize_for_match(source_field)
        claim_domain = claim.get("domain", "")

        # Find candidate sources by author mention in source field
        candidates = set()
        for author_key, spaths in author_index.items():
            if author_key in source_field_norm:
                candidates.update(spaths)

        # Also check domain-matched sources (but require stronger evidence)
        # (harmless to over-collect here: the author+title gate below
        # filters candidates that only share a domain).
        for spath, source in unlinked_sources.items():
            if source.get("domain") == claim_domain and spath not in candidates:
                candidates.add(spath)

        for spath in candidates:
            source = unlinked_sources[spath]
            score = 0
            reasons = []

            author = source.get("author", "")
            title = source.get("title", "")
            # NOTE(review): s_domain is assigned but never used below —
            # presumably left over from an earlier domain-scoring scheme.
            s_domain = source.get("domain", "")

            # Strong signal: author last name in claim's source field
            author_matched = False
            if author:
                # Extract significant name parts (same tokenization as the
                # author_index above, minus the stop-word list).
                name_parts = []
                for part in re.split(r'[,;&()\[\]]', author):
                    part = part.strip().strip('"').strip("'")
                    words = part.split()
                    for w in words:
                        w_clean = re.sub(r'[^\w]', '', w).lower()
                        if len(w_clean) >= 4:
                            name_parts.append(w_clean)

                matched_names = [n for n in name_parts if n in source_field_norm]
                if matched_names:
                    author_matched = True
                    score += 0.4
                    reasons.append(f"author:{','.join(matched_names[:2])}")

            # Strong signal: source title keywords in claim's source field
            title_matched = False
            if title:
                # Keep only distinctive title words (>= 5 chars, not a
                # common English function word).
                title_words = [w for w in normalize_for_match(title).split()
                               if len(w) >= 5 and w not in (
                                   "about", "their", "these", "those", "which",
                                   "would", "could", "should", "being", "having",
                                   "through", "between", "during", "before", "after",
                               )]
                if title_words:
                    hits = sum(1 for w in title_words if w in source_field_norm)
                    ratio = hits / len(title_words)
                    # At least 40% of distinctive title words must appear.
                    if ratio >= 0.4:
                        title_matched = True
                        score += ratio * 0.5
                        reasons.append(f"title:{ratio:.0%}({hits}/{len(title_words)})")

            # Require BOTH author AND title match
            # Author alone is too noisy (one author has many sources)
            # Domain alone adds nothing meaningful
            if not (author_matched and title_matched):
                continue

            # With both gates passed, score is 0.4 (author) + >=0.16
            # (title); the 0.6 floor effectively requires a strong title hit.
            if score >= 0.6:
                match_info = {
                    "claim_path": cpath,
                    "claim_title": claim["title"],
                    "source_path": spath,
                    "score": score,
                    "reasons": reasons,
                }
                inferred_forward[spath].append(match_info)
                inferred_reverse[cpath].append(match_info)

    return dict(inferred_forward), dict(inferred_reverse)
|
|
|
|
def write_sourced_from(claim_path, source_paths, dry_run=True):
    """Add a ``sourced_from:`` list to a claim file's YAML frontmatter.

    The block is inserted directly after the ``source:`` line (and any of
    its indented continuation lines) when one exists, otherwise just
    before the closing ``---`` fence.

    Args:
        claim_path:   path to the claim file (relative paths resolve
                      against CODEX).
        source_paths: iterable of source rel-paths to list.
        dry_run:      when True, compute but do not write.

    Returns:
        True if an insertion was made (or would be, in dry-run mode),
        False if the field already exists or no frontmatter was found.
    """
    p = Path(claim_path) if not isinstance(claim_path, Path) else claim_path
    if not p.is_absolute():
        p = CODEX / p
    text = p.read_text(encoding="utf-8")

    # NOTE: substring check — would also trip on "sourced_from:" appearing
    # in the body, which errs on the safe (no-write) side.
    if "sourced_from:" in text:
        return False  # already has it

    # Insert sourced_from after the source: line when possible.
    lines = text.split("\n")
    insert_idx = None
    in_frontmatter = False
    for i, line in enumerate(lines):
        if line.strip() == "---":
            if not in_frontmatter:
                in_frontmatter = True
                continue
            # End of frontmatter — fall back to inserting before closing ---
            insert_idx = i
            break
        if in_frontmatter and line.startswith("source:"):
            insert_idx = i + 1
            # Skip any indented continuation lines of the source: value.
            while insert_idx < len(lines) and lines[insert_idx].startswith(" "):
                insert_idx += 1
            # BUG FIX: the original fell through here, so the closing "---"
            # branch later overwrote insert_idx and the "after source:"
            # placement was dead code. Stop once the anchor is found.
            break

    if insert_idx is None:
        # No frontmatter fence was ever closed — nothing safe to edit.
        return False

    # Build sourced_from block
    sf_lines = ["sourced_from:"]
    for sp in source_paths:
        sf_lines.append(f"- {sp}")

    lines[insert_idx:insert_idx] = sf_lines

    if not dry_run:
        p.write_text("\n".join(lines), encoding="utf-8")

    return True
|
|
|
|
def write_claims_extracted(source_path, claim_titles, dry_run=True):
    """Add a ``claims_extracted:`` list to a source file's frontmatter.

    Titles are written as double-quoted YAML strings (inner quotes
    escaped), inserted just before the closing ``---`` fence.  Returns
    True when an insertion was made (or would be, in dry-run mode).
    """
    path = source_path if isinstance(source_path, Path) else Path(source_path)
    if not path.is_absolute():
        path = CODEX / path
    text = path.read_text(encoding="utf-8")

    # Bail out if the field is already present anywhere in the file.
    if "claims_extracted:" in text:
        return False

    # Locate the closing frontmatter fence.
    lines = text.split("\n")
    insert_at = None
    seen_open = False
    for idx, line in enumerate(lines):
        if line.strip() != "---":
            continue
        if seen_open:
            insert_at = idx
            break
        seen_open = True

    if insert_at is None:
        return False

    block = ["claims_extracted:"]
    block.extend('- "{}"'.format(t.replace('"', '\\"')) for t in claim_titles)
    lines[insert_at:insert_at] = block

    if not dry_run:
        path.write_text("\n".join(lines), encoding="utf-8")

    return True
|
|
|
|
def main():
    """CLI entry point.

    Modes (first argv token): ``--dry-run`` (default) reports, ``--apply``
    writes both link directions, ``--validate`` prints the known-good
    index and exits.
    """
    mode = sys.argv[1] if len(sys.argv) > 1 else "--dry-run"

    print("Loading sources...")
    sources = load_all_sources()
    print(f" {len(sources)} source files")

    print("Loading claims...")
    claims = load_all_claims()
    print(f" {len(claims)} claim files")

    print("\nPhase 1: Building known index from claims_extracted...")
    known_links, unresolved, claim_by_slug = build_known_index(sources, claims)
    print(f" {len(known_links)} claims linked to sources via claims_extracted")
    print(f" {len(unresolved)} unresolved references in claims_extracted")

    if mode == "--validate":
        print("\n=== VALIDATION MODE ===")
        print(f"Known links: {len(known_links)} claims -> sources")
        for cpath, spaths in sorted(known_links.items())[:20]:
            print(f" {Path(cpath).stem[:60]}...")
            for sp in spaths:
                print(f" <- {sp}")
        print(f"\nUnresolved ({len(unresolved)}):")
        for spath, ref in unresolved[:20]:
            print(f" {Path(spath).stem[:40]}: {ref[:60]}")
        return

    print("\nPhase 2: Matching unlinked sources via claim source: field...")
    inferred_fwd, inferred_rev = match_unlinked_sources(sources, claims, claim_by_slug)
    print(f" {len(inferred_fwd)} sources matched to claims")
    print(f" {len(inferred_rev)} claims matched to sources")

    # Merge known + inferred reverse links (claim -> sources), de-duplicated.
    all_reverse = defaultdict(list)
    for cpath, spaths in known_links.items():
        all_reverse[cpath].extend(spaths)
    for cpath, matches in inferred_rev.items():
        for m in matches:
            if m["source_path"] not in all_reverse[cpath]:
                all_reverse[cpath].append(m["source_path"])

    # Merge known + inferred forward links (source -> claim titles).
    all_forward = defaultdict(list)
    for spath, source in sources.items():
        for claim_ref in source["claims_extracted"]:
            # BUG FIX: rstrip(".md") strips trailing '.', 'm', 'd'
            # characters rather than the suffix; remove it explicitly.
            ref = claim_ref[:-3] if claim_ref.endswith(".md") else claim_ref
            ref_slug = slugify(ref)
            if ref_slug in claim_by_slug:
                all_forward[spath].append(claims[claim_by_slug[ref_slug]]["title"])
    for spath, matches in inferred_fwd.items():
        for m in matches:
            title = m["claim_title"]
            if title not in all_forward[spath]:
                all_forward[spath].append(title)

    print(f"\nTotal: {len(all_reverse)} claims with source links")
    print(f"Total: {len(all_forward)} sources with claim links")

    if mode == "--dry-run":
        print("\n=== DRY RUN — no files modified ===")
        print(f"\nWould write sourced_from: to {len(all_reverse)} claim files")
        # Only sources that do not already carry claims_extracted would be
        # touched (same count as the original set-comprehension one-liner).
        forward_writes = sum(
            1 for k in all_forward if not sources[k]["claims_extracted"]
        )
        print(f"Would write claims_extracted: to {forward_writes} source files")

        # Show samples
        print("\nSample reverse links (claim -> source):")
        for cpath, spaths in sorted(all_reverse.items())[:10]:
            print(f" {Path(cpath).stem[:60]}")
            for sp in spaths:
                print(f" <- {sp}")

        print("\nSample inferred forward links (source -> claims):")
        for spath, matches in sorted(inferred_fwd.items())[:10]:
            print(f" {Path(spath).stem[:50]} (score={matches[0]['score']:.2f})")
            for m in matches[:3]:
                print(f" -> {m['claim_title'][:60]} ({', '.join(m['reasons'])})")

    elif mode == "--apply":
        print("\n=== APPLYING CHANGES ===")

        # Write sourced_from to claims
        wrote_reverse = 0
        for cpath, spaths in all_reverse.items():
            if write_sourced_from(cpath, spaths, dry_run=False):
                wrote_reverse += 1

        # Write claims_extracted to sources (only unlinked ones)
        wrote_forward = 0
        for spath, titles in all_forward.items():
            if sources[spath]["claims_extracted"]:
                continue  # already has it
            if write_claims_extracted(spath, titles, dry_run=False):
                wrote_forward += 1

        print(f" Wrote sourced_from: to {wrote_reverse} claims")
        print(f" Wrote claims_extracted: to {wrote_forward} sources")


if __name__ == "__main__":
    main()
|