teleo-codex/ops/link-sources-claims.py
m3taversal be8ff41bfe link: bidirectional source↔claim index — 414 claims + 252 sources connected
Wrote sourced_from: into 414 claim files pointing back to their origin source.
Backfilled claims_extracted: into 252 source files that were processed but
missing this field. Matching uses author+title overlap against claim source:
field, validated against 296 known-good pairs from existing claims_extracted.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 11:55:18 +01:00

462 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Bidirectional source↔claim linker.
Phase 1: Build index from sources that already have claims_extracted
Phase 2: For sources WITHOUT claims_extracted, fuzzy-match via claim source: field
Phase 3: Write sourced_from: into claim frontmatter (reverse link)
Phase 4: Backfill claims_extracted: into source frontmatter (forward link)
Usage:
python3 link-sources-claims.py --dry-run # report what would change
python3 link-sources-claims.py --apply # write changes
python3 link-sources-claims.py --validate # test against known-good pairs
"""
import os
import re
import sys
import yaml
import glob
from pathlib import Path
from collections import defaultdict
from difflib import SequenceMatcher
# Repository root of the codex; every relative path below hangs off this.
CODEX = Path("/Users/coryabdalla/Pentagon/teleo-codex")
ARCHIVE = CODEX / "inbox" / "archive"  # processed source files (scanned recursively)
DOMAINS = CODEX / "domains"  # claim files, organized by domain
NULLRESULT = CODEX / "inbox" / "null-result"  # NOTE(review): not referenced anywhere in this script
def parse_frontmatter(filepath):
    """Parse the YAML frontmatter of a markdown file.

    Returns a (frontmatter, body) pair:
    - (None, None) if the file cannot be read;
    - (None, full_text) if there is no frontmatter or it fails to parse;
    - (dict_or_scalar, body) on success.
    """
    try:
        raw = filepath.read_text(encoding="utf-8")
    except Exception:
        return None, None
    if not raw.startswith("---"):
        return None, raw
    pieces = raw.split("---", 2)
    if len(pieces) < 3:
        # Opening delimiter present but no closing one.
        return None, raw
    try:
        return yaml.safe_load(pieces[1]), pieces[2]
    except (yaml.YAMLError, ValueError, TypeError):
        return None, raw
def slugify(title):
    """Turn a claim title into its likely filename slug.

    Lowercases, strips punctuation, converts whitespace/underscore runs to
    single hyphens, and trims hyphens from both ends.
    """
    cleaned = re.sub(r'[^\w\s-]', '', title.lower().strip())
    hyphenated = re.sub(r'-+', '-', re.sub(r'[\s_]+', '-', cleaned))
    return hyphenated.strip('-')
def title_to_slug_variants(title):
    """Return candidate filename spellings for *title* (slug with and without .md)."""
    stem = slugify(title)
    # NOTE: trimming of common trailing words was planned but never implemented.
    return [stem, f"{stem}.md"]
def normalize_for_match(s):
    """Lowercase, strip punctuation, and collapse internal whitespace for fuzzy comparison."""
    no_punct = re.sub(r'[^\w\s]', '', s.lower().strip())
    return re.sub(r'\s+', ' ', no_punct)
def load_all_sources():
    """Scan the archive and inbox for source markdown files.

    Returns a dict keyed by path relative to CODEX; each value bundles the
    parsed frontmatter, body, and commonly-used fields coerced to strings.
    Files with unreadable or missing frontmatter are skipped.
    """
    patterns = [
        str(ARCHIVE / "**" / "*.md"),
        str(CODEX / "inbox" / "*.md"),
    ]
    sources = {}
    for pattern in patterns:
        for hit in glob.glob(pattern, recursive=True):
            path = Path(hit)
            fm, body = parse_frontmatter(path)
            if fm is None:
                continue  # unreadable file or no usable frontmatter
            rel = str(path.relative_to(CODEX))
            extracted = fm.get("claims_extracted", [])
            if not isinstance(extracted, list):
                extracted = []  # guard against scalar/None values in YAML
            sources[rel] = {
                "path": path,
                "rel": rel,
                "fm": fm,
                "body": body or "",
                "title": str(fm.get("title", "")),
                "author": str(fm.get("author", "")),
                "url": str(fm.get("url", "")),
                "domain": str(fm.get("domain", "")),
                "claims_extracted": extracted,
                "status": str(fm.get("status", "")),
            }
    return sources
def load_all_claims():
    """Scan domains/ recursively for claim markdown files.

    Skips underscore-prefixed and divergence- files, and keeps only files
    whose frontmatter declares type: claim. Returns a dict keyed by path
    relative to CODEX.
    """
    claims = {}
    for hit in glob.glob(str(DOMAINS / "**" / "*.md"), recursive=True):
        path = Path(hit)
        if path.name.startswith(("_", "divergence-")):
            continue
        fm, body = parse_frontmatter(path)
        if fm is None or fm.get("type") != "claim":
            continue
        rel = str(path.relative_to(CODEX))
        claims[rel] = {
            "path": path,
            "rel": rel,
            "fm": fm,
            "body": body or "",
            "title": path.stem,  # the filename stem doubles as the claim title
            "source_field": fm.get("source", ""),
            "domain": fm.get("domain", ""),
            "sourced_from": fm.get("sourced_from", []) or [],
        }
    return claims
def build_known_index(sources, claims):
    """Resolve existing claims_extracted references to actual claim files.

    Args:
        sources: dict from load_all_sources().
        claims: dict from load_all_claims().

    Returns:
        (known_links, unresolved, claim_by_slug) where known_links maps
        claim path -> [source paths], unresolved is a list of
        (source_path, claim_ref) pairs that could not be matched, and
        claim_by_slug maps filename slug -> claim path.
    """
    # Index claims by their filename stem.
    claim_by_slug = {}
    for cpath, claim in claims.items():
        slug = claim["title"]  # this is Path.stem, so it never carries ".md"
        claim_by_slug[slug] = cpath
        if slug.endswith(".md"):  # defensive only; stems shouldn't have this
            claim_by_slug[slug[:-3]] = cpath
    known_links = {}  # claim_path -> [source_rel_path]
    unresolved = []   # (source_path, claim_ref) with no match
    for spath, source in sources.items():
        for claim_ref in source["claims_extracted"]:
            # claim_ref might be a title string or a filename.
            # BUG FIX: was claim_ref.rstrip(".md"), which strips ANY trailing
            # '.', 'm', or 'd' characters (e.g. "x-random" -> "x-rando"),
            # not just the ".md" extension.
            ref = claim_ref[:-3] if claim_ref.endswith(".md") else claim_ref
            ref_slug = slugify(ref)
            matched = None
            # Try exact slug match first.
            if ref_slug in claim_by_slug:
                matched = claim_by_slug[ref_slug]
            elif ref_slug + ".md" in claim_by_slug:
                matched = claim_by_slug[ref_slug + ".md"]
            else:
                # Fall back to fuzzy matching against all claim slugs.
                best_score = 0
                best_match = None
                ref_norm = normalize_for_match(claim_ref)
                for cslug, cpath in claim_by_slug.items():
                    score = SequenceMatcher(
                        None, ref_norm, normalize_for_match(cslug)).ratio()
                    if score > best_score:
                        best_score = score
                        best_match = cpath
                if best_score >= 0.85:  # similarity cutoff for accepting a fuzzy hit
                    matched = best_match
            if matched:
                known_links.setdefault(matched, []).append(spath)
            else:
                unresolved.append((spath, claim_ref))
    return known_links, unresolved, claim_by_slug
# Generic author tokens (organizations, boilerplate) excluded when indexing —
# they would match far too many claims to identify a specific source.
AUTHOR_STOPWORDS = frozenset((
    "analysis", "research", "report", "paper", "journal",
    "multiple", "authors", "various", "company", "team",
    "university", "institute", "foundation", "network",
    "open", "source", "national", "international",
))

# Common English words excluded from title-keyword matching.
TITLE_STOPWORDS = frozenset((
    "about", "their", "these", "those", "which",
    "would", "could", "should", "being", "having",
    "through", "between", "during", "before", "after",
))


def _author_name_parts(author):
    """Split an author string into lowercase alphanumeric tokens of >= 4 chars.

    Splits on list/group punctuation first, so 'Smith, J. & Jones (MIT)'
    yields the long word tokens of each part.
    """
    parts = []
    for chunk in re.split(r'[,;&()\[\]]', author):
        chunk = chunk.strip().strip('"').strip("'")
        for word in chunk.split():
            cleaned = re.sub(r'[^\w]', '', word).lower()
            if len(cleaned) >= 4:
                parts.append(cleaned)
    return parts


def match_unlinked_sources(sources, claims, claim_by_slug):
    """For sources without claims_extracted, infer links via the claim source: field.

    A match requires BOTH an author-name hit and a title-keyword hit in the
    claim's source: field, with combined score >= 0.6.

    Args:
        sources: dict from load_all_sources().
        claims: dict from load_all_claims().
        claim_by_slug: accepted for interface compatibility; not used here.

    Returns:
        (inferred_forward, inferred_reverse): source_path -> [match dicts]
        and claim_path -> [match dicts]; each match dict records both paths,
        the claim title, the score, and human-readable match reasons.
    """
    unlinked_sources = {
        k: v for k, v in sources.items()
        if not v["claims_extracted"] and v["status"] in ("processed", "enrichment")
    }
    # Index unlinked sources by author-name token for fast candidate lookup.
    author_index = defaultdict(list)
    for spath, source in unlinked_sources.items():
        author = source.get("author", "")
        if not author:
            continue
        for name in _author_name_parts(author):
            if name not in AUTHOR_STOPWORDS:
                author_index[name].append(spath)
    inferred_forward = defaultdict(list)  # source_path -> [match dicts]
    inferred_reverse = defaultdict(list)  # claim_path -> [match dicts]
    for cpath, claim in claims.items():
        source_field = str(claim.get("source_field", ""))
        if not source_field:
            continue
        source_field_norm = normalize_for_match(source_field)
        claim_domain = claim.get("domain", "")
        # Candidates: sources whose author token appears in the source field...
        candidates = set()
        for author_key, spaths in author_index.items():
            if author_key in source_field_norm:
                candidates.update(spaths)
        # ...plus every same-domain source (scored strictly below).
        for spath, source in unlinked_sources.items():
            if source.get("domain") == claim_domain:
                candidates.add(spath)
        for spath in candidates:
            source = unlinked_sources[spath]
            score = 0
            reasons = []
            author = source.get("author", "")
            title = source.get("title", "")
            # Signal 1: author name tokens present in the claim's source field.
            author_matched = False
            if author:
                matched_names = [
                    n for n in _author_name_parts(author)
                    if n in source_field_norm
                ]
                if matched_names:
                    author_matched = True
                    score += 0.4
                    reasons.append(f"author:{','.join(matched_names[:2])}")
            # Signal 2: source-title keywords present in the claim's source field.
            title_matched = False
            if title:
                title_words = [
                    w for w in normalize_for_match(title).split()
                    if len(w) >= 5 and w not in TITLE_STOPWORDS
                ]
                if title_words:
                    hits = sum(1 for w in title_words if w in source_field_norm)
                    ratio = hits / len(title_words)
                    if ratio >= 0.4:
                        title_matched = True
                        score += ratio * 0.5
                        reasons.append(f"title:{ratio:.0%}({hits}/{len(title_words)})")
            # Require BOTH signals: author alone is too noisy (one author has
            # many sources) and domain alone adds nothing meaningful.
            if not (author_matched and title_matched):
                continue
            if score >= 0.6:
                match_info = {
                    "claim_path": cpath,
                    "claim_title": claim["title"],
                    "source_path": spath,
                    "score": score,
                    "reasons": reasons,
                }
                inferred_forward[spath].append(match_info)
                inferred_reverse[cpath].append(match_info)
    return dict(inferred_forward), dict(inferred_reverse)
def write_sourced_from(claim_path, source_paths, dry_run=True):
    """Insert a sourced_from: list into a claim file's YAML frontmatter.

    The block is placed directly after the source: line (and its indented
    continuation lines); if no source: line exists, it goes at the end of the
    frontmatter instead.

    Args:
        claim_path: path to the claim file (str or Path; relative paths are
            resolved against CODEX).
        source_paths: iterable of source paths to list under sourced_from:.
        dry_run: when True (default), report without writing.

    Returns:
        True if the field was (or would be) inserted, False if the file
        already contains "sourced_from:" or has no frontmatter.
    """
    p = Path(claim_path) if not isinstance(claim_path, Path) else claim_path
    if not p.is_absolute():
        p = CODEX / p
    text = p.read_text(encoding="utf-8")
    # Note: a plain substring test — matches the field anywhere in the file.
    if "sourced_from:" in text:
        return False  # already has it
    lines = text.split("\n")
    insert_idx = None
    in_frontmatter = False
    for i, line in enumerate(lines):
        if line.strip() == "---":
            if not in_frontmatter:
                in_frontmatter = True
                continue
            # Closing delimiter: no source: line found, so insert at the end
            # of the frontmatter.
            insert_idx = i
            break
        if in_frontmatter and line.startswith("source:"):
            insert_idx = i + 1
            # Skip indented YAML continuation lines of the source: value.
            while insert_idx < len(lines) and lines[insert_idx].startswith(" "):
                insert_idx += 1
            # BUG FIX: without this break, the loop kept running and the
            # closing '---' overwrote insert_idx, so the documented
            # "insert after source:" behavior never actually happened.
            break
    if insert_idx is None:
        return False  # never found a frontmatter to insert into
    sf_lines = ["sourced_from:"]
    for sp in source_paths:
        sf_lines.append(f"- {sp}")
    lines[insert_idx:insert_idx] = sf_lines
    if not dry_run:
        p.write_text("\n".join(lines), encoding="utf-8")
    return True
def write_claims_extracted(source_path, claim_titles, dry_run=True):
    """Insert a claims_extracted: list into a source file's YAML frontmatter.

    Titles are written as double-quoted scalars (inner quotes escaped) just
    before the closing '---'. Returns False if the field is already present
    anywhere in the file or no frontmatter is found; True otherwise.
    """
    path = source_path if isinstance(source_path, Path) else Path(source_path)
    if not path.is_absolute():
        path = CODEX / path
    text = path.read_text(encoding="utf-8")
    if "claims_extracted:" in text:
        return False  # field already present somewhere in the file
    lines = text.split("\n")
    # Locate the closing '---' of the frontmatter.
    insert_at = None
    seen_open = False
    for idx, raw in enumerate(lines):
        if raw.strip() != "---":
            continue
        if seen_open:
            insert_at = idx
            break
        seen_open = True
    if insert_at is None:
        return False
    block = ["claims_extracted:"] + [
        '- "{}"'.format(t.replace('"', '\\"')) for t in claim_titles
    ]
    lines[insert_at:insert_at] = block
    if not dry_run:
        path.write_text("\n".join(lines), encoding="utf-8")
    return True
def main():
    """CLI entry point.

    Modes (first argv argument, default --dry-run):
        --validate  show known/unresolved link samples and exit
        --dry-run   report what would change without writing
        --apply     write sourced_from: and claims_extracted: fields
    """
    mode = sys.argv[1] if len(sys.argv) > 1 else "--dry-run"
    print("Loading sources...")
    sources = load_all_sources()
    print(f" {len(sources)} source files")
    print("Loading claims...")
    claims = load_all_claims()
    print(f" {len(claims)} claim files")
    print("\nPhase 1: Building known index from claims_extracted...")
    known_links, unresolved, claim_by_slug = build_known_index(sources, claims)
    print(f" {len(known_links)} claims linked to sources via claims_extracted")
    print(f" {len(unresolved)} unresolved references in claims_extracted")
    if mode == "--validate":
        # Report-only mode: sample the resolved and unresolved links, then exit.
        print("\n=== VALIDATION MODE ===")
        print(f"Known links: {len(known_links)} claims -> sources")
        for cpath, spaths in sorted(known_links.items())[:20]:
            print(f" {Path(cpath).stem[:60]}...")
            for sp in spaths:
                print(f" <- {sp}")
        print(f"\nUnresolved ({len(unresolved)}):")
        for spath, ref in unresolved[:20]:
            print(f" {Path(spath).stem[:40]}: {ref[:60]}")
        return
    print("\nPhase 2: Matching unlinked sources via claim source: field...")
    inferred_fwd, inferred_rev = match_unlinked_sources(sources, claims, claim_by_slug)
    print(f" {len(inferred_fwd)} sources matched to claims")
    print(f" {len(inferred_rev)} claims matched to sources")
    # Merge known + inferred reverse links (claim -> source paths).
    all_reverse = defaultdict(list)
    for cpath, spaths in known_links.items():
        all_reverse[cpath].extend(spaths)
    for cpath, matches in inferred_rev.items():
        for m in matches:
            if m["source_path"] not in all_reverse[cpath]:
                all_reverse[cpath].append(m["source_path"])
    # Merge known + inferred forward links (source -> claim titles).
    all_forward = defaultdict(list)
    for spath, source in sources.items():
        for claim_ref in source["claims_extracted"]:
            # BUG FIX: was claim_ref.rstrip(".md"), which strips any trailing
            # '.', 'm', or 'd' characters, not just the ".md" extension.
            ref = claim_ref[:-3] if claim_ref.endswith(".md") else claim_ref
            ref_slug = slugify(ref)
            if ref_slug in claim_by_slug:
                all_forward[spath].append(claims[claim_by_slug[ref_slug]]["title"])
    for spath, matches in inferred_fwd.items():
        for m in matches:
            title = m["claim_title"]
            if title not in all_forward[spath]:
                all_forward[spath].append(title)
    print(f"\nTotal: {len(all_reverse)} claims with source links")
    print(f"Total: {len(all_forward)} sources with claim links")
    if mode == "--dry-run":
        print("\n=== DRY RUN — no files modified ===")
        print(f"\nWould write sourced_from: to {len(all_reverse)} claim files")
        # Only sources that don't already carry claims_extracted would be written.
        new_forward = [s for s in all_forward if not sources[s]["claims_extracted"]]
        print(f"Would write claims_extracted: to {len(new_forward)} source files")
        # Show samples of both link directions.
        print("\nSample reverse links (claim -> source):")
        for cpath, spaths in sorted(all_reverse.items())[:10]:
            print(f" {Path(cpath).stem[:60]}")
            for sp in spaths:
                print(f" <- {sp}")
        print("\nSample inferred forward links (source -> claims):")
        for spath, matches in sorted(inferred_fwd.items())[:10]:
            print(f" {Path(spath).stem[:50]} (score={matches[0]['score']:.2f})")
            for m in matches[:3]:
                print(f" -> {m['claim_title'][:60]} ({', '.join(m['reasons'])})")
    elif mode == "--apply":
        print("\n=== APPLYING CHANGES ===")
        # Write sourced_from: into claim files.
        wrote_reverse = 0
        for cpath, spaths in all_reverse.items():
            if write_sourced_from(cpath, spaths, dry_run=False):
                wrote_reverse += 1
        # Write claims_extracted: into sources that don't already have it.
        wrote_forward = 0
        for spath, titles in all_forward.items():
            if sources[spath]["claims_extracted"]:
                continue  # source already links its claims
            if write_claims_extracted(spath, titles, dry_run=False):
                wrote_forward += 1
        print(f" Wrote sourced_from: to {wrote_reverse} claims")
        print(f" Wrote claims_extracted: to {wrote_forward} sources")


if __name__ == "__main__":
    main()