#!/usr/bin/env python3
"""
Bidirectional source↔claim linker.

Phase 1: Build index from sources that already have claims_extracted
Phase 2: For sources WITHOUT claims_extracted, fuzzy-match via claim source: field
Phase 3: Write sourced_from: into claim frontmatter (reverse link)
Phase 4: Backfill claims_extracted: into source frontmatter (forward link)

Usage:
    python3 link-sources-claims.py --dry-run    # report what would change
    python3 link-sources-claims.py --apply      # write changes
    python3 link-sources-claims.py --validate   # test against known-good pairs

The codex root defaults to the CODEX path below and can be overridden by
setting the TELEO_CODEX environment variable.
"""

import os
import re
import sys
import glob
from pathlib import Path
from collections import defaultdict
from difflib import SequenceMatcher

try:
    import yaml
except ImportError:  # PyYAML is third-party; report it clearly at first use.
    yaml = None

CODEX = Path(os.environ.get("TELEO_CODEX", "/Users/coryabdalla/Pentagon/teleo-codex"))
ARCHIVE = CODEX / "inbox" / "archive"
DOMAINS = CODEX / "domains"
NULLRESULT = CODEX / "inbox" / "null-result"

_MODES = ("--dry-run", "--apply", "--validate")
_YAML_MISSING_MSG = "PyYAML is required to read frontmatter (pip install pyyaml)"

# Minimum SequenceMatcher ratio for a claims_extracted entry to be accepted
# as a fuzzy match for a claim filename.
_FUZZY_MATCH_THRESHOLD = 0.85

# Scoring used when inferring links from a claim's `source:` field.
_AUTHOR_WEIGHT = 0.4
_TITLE_WEIGHT = 0.5
_MIN_TITLE_RATIO = 0.4
_MIN_SCORE = 0.6

# Words that show up in `author:` fields but do not identify an author.
_AUTHOR_STOPWORDS = frozenset({
    "analysis", "research", "report", "paper", "journal",
    "multiple", "authors", "various", "company", "team",
    "university", "institute", "foundation", "network",
    "open", "source", "national", "international",
})

# Long-ish words that carry no signal when they appear in a source title.
_TITLE_STOPWORDS = frozenset({
    "about", "their", "these", "those", "which", "would",
    "could", "should", "being", "having", "through",
    "between", "during", "before", "after",
})

_NAME_SPLIT_RE = re.compile(r'[,;&()\[\]]')

# Characters that are always safe inside an unquoted YAML list item.
_PLAIN_SCALAR_RE = re.compile(r'[\w./](?:[\w./ -]*[\w./-])?')


def _as_text(value):
    """Return *value* as a string, mapping None to "" instead of "None"."""
    return "" if value is None else str(value)


def _strip_md_suffix(name):
    """Remove one trailing ".md" from *name*.

    str.rstrip(".md") must not be used for this: it strips any run of the
    characters '.', 'm' and 'd', so "alarm" would become "alar".
    """
    return name[:-3] if name.endswith(".md") else name


def _frontmatter_span(lines):
    """Return (open_idx, close_idx) of the first two '---' lines, or None."""
    open_idx = None
    for idx, line in enumerate(lines):
        if line.strip() != "---":
            continue
        if open_idx is None:
            open_idx = idx
        else:
            return open_idx, idx
    return None


def parse_frontmatter(filepath):
    """Extract YAML frontmatter from a markdown file.

    Args:
        filepath: Path of the markdown file.

    Returns:
        (frontmatter_dict, body) when the file starts with a '---' line and
        the block up to the next '---' line parses to a YAML mapping;
        (None, full_text) when there is no usable frontmatter;
        (None, None) when the file cannot be read or decoded.

    Raises:
        RuntimeError: if PyYAML is not installed.
    """
    if yaml is None:
        raise RuntimeError(_YAML_MISSING_MSG)
    try:
        text = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None, None

    lines = text.split("\n")
    span = _frontmatter_span(lines)
    if span is None or span[0] != 0:
        return None, text
    open_idx, close_idx = span

    try:
        fm = yaml.safe_load("\n".join(lines[open_idx + 1:close_idx]))
    except (yaml.YAMLError, ValueError, TypeError):
        return None, text
    # Empty or non-mapping frontmatter cannot be queried with .get().
    if not isinstance(fm, dict):
        return None, text
    return fm, "\n".join(lines[close_idx + 1:])


def slugify(title):
    """Convert a claim title to its likely filename slug."""
    s = title.lower().strip()
    s = re.sub(r'[^\w\s-]', '', s)
    s = re.sub(r'[\s_]+', '-', s)
    s = re.sub(r'-+', '-', s)
    return s.strip('-')


def title_to_slug_variants(title):
    """Return the slug of *title* with and without a ".md" extension."""
    base = slugify(title)
    return [base, base + ".md"]


def normalize_for_match(s):
    """Normalize string for fuzzy matching.

    Lowercases, drops every character that is neither a word character nor
    whitespace, and collapses whitespace runs to a single space.
    """
    s = s.lower().strip()
    s = re.sub(r'[^\w\s]', '', s)
    s = re.sub(r'\s+', ' ', s)
    return s


def _coerce_claim_refs(value):
    """Return the usable string entries of a claims_extracted value.

    Non-list values yield []. None, blank, mapping and list entries are
    dropped; other scalars are converted with str() so that later string
    handling cannot fail on them.
    """
    if not isinstance(value, list):
        return []
    refs = []
    for item in value:
        if item is None or isinstance(item, (dict, list)):
            continue
        text = item if isinstance(item, str) else str(item)
        if text.strip():
            refs.append(text)
    return refs


def load_all_sources():
    """Load all source files with their metadata.

    Returns:
        dict mapping each source's path relative to CODEX to a record with
        the keys path, rel, fm, body, title, author, url, domain,
        claims_extracted and status. Files without parseable frontmatter
        are skipped.
    """
    sources = {}
    for pattern in [
        str(ARCHIVE / "**" / "*.md"),
        str(CODEX / "inbox" / "*.md"),
    ]:
        # Sorted so that later matching and reporting order is stable.
        for filepath in sorted(glob.glob(pattern, recursive=True)):
            p = Path(filepath)
            fm, body = parse_frontmatter(p)
            if fm is None:
                continue
            rel = str(p.relative_to(CODEX))
            sources[rel] = {
                "path": p,
                "rel": rel,
                "fm": fm,
                "body": body or "",
                "title": _as_text(fm.get("title")),
                "author": _as_text(fm.get("author")),
                "url": _as_text(fm.get("url")),
                "domain": _as_text(fm.get("domain")),
                "claims_extracted": _coerce_claim_refs(fm.get("claims_extracted")),
                "status": _as_text(fm.get("status")),
            }
    return sources


def load_all_claims():
    """Load all claim files with their metadata.

    Returns:
        dict mapping each claim's path relative to CODEX to a record with
        the keys path, rel, fm, body, title (the filename stem),
        source_field, domain and sourced_from. Files whose name starts with
        "_" or "divergence-", or whose frontmatter type is not "claim", are
        skipped.
    """
    claims = {}
    for filepath in sorted(glob.glob(str(DOMAINS / "**" / "*.md"), recursive=True)):
        p = Path(filepath)
        if p.name.startswith(("_", "divergence-")):
            continue
        fm, body = parse_frontmatter(p)
        if fm is None:
            continue
        if fm.get("type") != "claim":
            continue
        rel = str(p.relative_to(CODEX))
        claims[rel] = {
            "path": p,
            "rel": rel,
            "fm": fm,
            "body": body or "",
            "title": p.stem,
            "source_field": fm.get("source", ""),
            "domain": fm.get("domain", ""),
            "sourced_from": fm.get("sourced_from", []) or [],
        }
    return claims


def _best_fuzzy_claim(ref_norm, normalized_claims):
    """Return the claim path whose normalized slug best matches *ref_norm*.

    Returns None when no candidate reaches _FUZZY_MATCH_THRESHOLD. On ties
    the first candidate wins.
    """
    best_score = 0.0
    best_path = None
    matcher = SequenceMatcher(None, ref_norm, "")
    for cslug_norm, cpath in normalized_claims:
        matcher.set_seq2(cslug_norm)
        # real_quick_ratio() and quick_ratio() are upper bounds on ratio(),
        # so a candidate that cannot reach the threshold or beat the current
        # best can be skipped without changing the result.
        upper = matcher.real_quick_ratio()
        if upper < _FUZZY_MATCH_THRESHOLD or upper <= best_score:
            continue
        upper = matcher.quick_ratio()
        if upper < _FUZZY_MATCH_THRESHOLD or upper <= best_score:
            continue
        score = matcher.ratio()
        if score > best_score:
            best_score = score
            best_path = cpath
    return best_path if best_score >= _FUZZY_MATCH_THRESHOLD else None


def build_known_index(sources, claims):
    """Build index from sources that already have claims_extracted.

    Args:
        sources: mapping of source path -> record with "claims_extracted".
        claims: mapping of claim path -> record with "title".

    Returns:
        (known_links, unresolved, claim_by_slug) where known_links maps a
        claim path to the de-duplicated list of source paths that reference
        it, unresolved is a list of (source_path, claim_ref) pairs that
        matched no claim, and claim_by_slug maps claim titles to claim paths.
    """
    # Map claim title slugs to claim paths
    claim_by_slug = {}
    for cpath, claim in claims.items():
        slug = claim["title"]
        claim_by_slug[slug] = cpath
        # Also index without .md
        if slug.endswith(".md"):
            claim_by_slug[slug[:-3]] = cpath

    # Normalized once here instead of once per unresolved reference.
    normalized_claims = [
        (normalize_for_match(cslug), cpath)
        for cslug, cpath in claim_by_slug.items()
    ]

    known_links = {}  # claim_path -> [source_rel_path]
    unresolved = []

    for spath, source in sources.items():
        for claim_ref in source["claims_extracted"]:
            # claim_ref might be a title string or a filename
            ref_name = _strip_md_suffix(claim_ref.strip())
            ref_slug = slugify(ref_name)

            # Exact slug match first, then fuzzy match against filenames.
            matched = claim_by_slug.get(ref_slug)
            if matched is None:
                matched = claim_by_slug.get(ref_slug + ".md")
            if matched is None:
                matched = _best_fuzzy_claim(
                    normalize_for_match(ref_name), normalized_claims
                )

            if matched is None:
                unresolved.append((spath, claim_ref))
                continue
            linked = known_links.setdefault(matched, [])
            if spath not in linked:
                linked.append(spath)

    return known_links, unresolved, claim_by_slug


def _author_name_parts(author):
    """Return the lowercase name tokens of *author* that can identify it.

    Tokens shorter than four characters and generic words such as
    "research" or "team" are dropped.
    """
    parts = []
    for chunk in _NAME_SPLIT_RE.split(author):
        chunk = chunk.strip().strip('"').strip("'")
        for word in chunk.split():
            cleaned = re.sub(r'[^\w]', '', word).lower()
            if len(cleaned) >= 4 and cleaned not in _AUTHOR_STOPWORDS:
                parts.append(cleaned)
    return parts


def _title_keywords(title):
    """Return the normalized words of *title* used for title matching."""
    return [
        w for w in normalize_for_match(title).split()
        if len(w) >= 5 and w not in _TITLE_STOPWORDS
    ]


def match_unlinked_sources(sources, claims, claim_by_slug):
    """For sources without claims_extracted, try to match via claim source: field.

    Only sources with status "processed" or "enrichment" and an empty
    claims_extracted list are considered. A claim is linked to a source when
    the claim's source field mentions one of the source's author name parts
    AND at least 40% of the source's title keywords.

    Args:
        sources: mapping of source path -> source record.
        claims: mapping of claim path -> claim record.
        claim_by_slug: unused; kept for call compatibility.

    Returns:
        (inferred_forward, inferred_reverse): dicts mapping source path and
        claim path respectively to lists of match dicts with the keys
        claim_path, claim_title, source_path, score and reasons.
    """
    unlinked_sources = {
        k: v for k, v in sources.items()
        if not v["claims_extracted"] and v["status"] in ("processed", "enrichment")
    }

    # Per-source matching data, computed once rather than once per claim.
    profiles = {}  # source path -> (author name parts, title keywords)
    # Key: normalized last name or handle -> [source paths]
    author_index = defaultdict(list)
    for spath, source in unlinked_sources.items():
        name_parts = _author_name_parts(_as_text(source.get("author")))
        profiles[spath] = (name_parts, _title_keywords(_as_text(source.get("title"))))
        for part in name_parts:
            author_index[part].append(spath)

    inferred_forward = defaultdict(list)  # source_path -> [match dicts]
    inferred_reverse = defaultdict(list)  # claim_path -> [match dicts]

    for cpath, claim in claims.items():
        source_field = _as_text(claim.get("source_field"))
        if not source_field:
            continue
        source_field_norm = normalize_for_match(source_field)

        # Both an author and a title match are required, so only sources
        # whose author is mentioned in the source field can ever qualify.
        # Author alone is too noisy (one author has many sources).
        candidates = set()
        for author_key, spaths in author_index.items():
            if author_key in source_field_norm:
                candidates.update(spaths)

        # Sorted so that the order of matches is the same on every run.
        for spath in sorted(candidates):
            name_parts, title_words = profiles[spath]
            if not title_words:
                continue

            # Strong signal: author last name in claim's source field
            matched_names = [n for n in name_parts if n in source_field_norm]
            if not matched_names:
                continue

            # Strong signal: source title keywords in claim's source field
            hits = sum(1 for w in title_words if w in source_field_norm)
            ratio = hits / len(title_words)
            if ratio < _MIN_TITLE_RATIO:
                continue

            score = _AUTHOR_WEIGHT + ratio * _TITLE_WEIGHT
            if score < _MIN_SCORE:
                continue

            match_info = {
                "claim_path": cpath,
                "claim_title": claim["title"],
                "source_path": spath,
                "score": score,
                "reasons": [
                    f"author:{','.join(matched_names[:2])}",
                    f"title:{ratio:.0%}({hits}/{len(title_words)})",
                ],
            }
            inferred_forward[spath].append(match_info)
            inferred_reverse[cpath].append(match_info)

    return dict(inferred_forward), dict(inferred_reverse)


def _yaml_double_quoted(value):
    """Return *value* as a YAML double-quoted scalar."""
    # Backslash first, otherwise the escapes added for quotes get doubled.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def _yaml_path_item(value):
    """Return *value* as a YAML list item value, quoted only when needed."""
    value = str(value)
    if "/" in value and _PLAIN_SCALAR_RE.fullmatch(value):
        return value
    return _yaml_double_quoted(value)


def _insert_frontmatter_list(path, key, items, dry_run):
    """Insert a `key:` list just before the closing '---' of the frontmatter.

    Relative paths are resolved against CODEX. Returns False without
    touching the file when *items* is empty, when no frontmatter block is
    found, or when the frontmatter already has a top-level *key*; returns
    True otherwise. The file is only written when *dry_run* is false.
    """
    if not items:
        return False
    p = path if isinstance(path, Path) else Path(path)
    if not p.is_absolute():
        p = CODEX / p

    lines = p.read_text(encoding="utf-8").split("\n")
    span = _frontmatter_span(lines)
    if span is None:
        return False
    open_idx, close_idx = span

    # Only the frontmatter is checked: the same text in the body is prose.
    prefix = f"{key}:"
    if any(line.startswith(prefix) for line in lines[open_idx + 1:close_idx]):
        return False  # already has it

    lines[close_idx:close_idx] = [prefix] + [f"- {item}" for item in items]
    if not dry_run:
        p.write_text("\n".join(lines), encoding="utf-8")
    return True


def write_sourced_from(claim_path, source_paths, dry_run=True):
    """Add sourced_from: field to claim frontmatter.

    The list is inserted immediately before the closing '---' line.

    Args:
        claim_path: claim file path (str or Path; relative to CODEX if not
            absolute).
        source_paths: source paths to list under sourced_from.
        dry_run: when true, report what would happen without writing.

    Returns:
        True if the field was (or would be) added, False otherwise.
    """
    items = [_yaml_path_item(sp) for sp in source_paths]
    return _insert_frontmatter_list(claim_path, "sourced_from", items, dry_run)


def write_claims_extracted(source_path, claim_titles, dry_run=True):
    """Add claims_extracted: field to source frontmatter.

    Titles are written as double-quoted YAML strings immediately before the
    closing '---' line.

    Args:
        source_path: source file path (str or Path; relative to CODEX if not
            absolute).
        claim_titles: claim titles to list under claims_extracted.
        dry_run: when true, report what would happen without writing.

    Returns:
        True if the field was (or would be) added, False otherwise.
    """
    items = [_yaml_double_quoted(str(title)) for title in claim_titles]
    return _insert_frontmatter_list(source_path, "claims_extracted", items, dry_run)


def main():
    """Run the linker in the mode given by the first CLI argument."""
    mode = sys.argv[1] if len(sys.argv) > 1 else "--dry-run"
    if mode not in _MODES:
        # Without this an unknown flag would do all the work and then
        # silently change nothing.
        print(f"Unknown mode: {mode}", file=sys.stderr)
        print(f"Usage: {Path(sys.argv[0]).name} [{' | '.join(_MODES)}]", file=sys.stderr)
        sys.exit(2)
    if yaml is None:
        sys.exit(_YAML_MISSING_MSG)

    print("Loading sources...")
    sources = load_all_sources()
    print(f" {len(sources)} source files")

    print("Loading claims...")
    claims = load_all_claims()
    print(f" {len(claims)} claim files")

    print("\nPhase 1: Building known index from claims_extracted...")
    known_links, unresolved, claim_by_slug = build_known_index(sources, claims)
    print(f" {len(known_links)} claims linked to sources via claims_extracted")
    print(f" {len(unresolved)} unresolved references in claims_extracted")

    if mode == "--validate":
        print("\n=== VALIDATION MODE ===")
        print(f"Known links: {len(known_links)} claims -> sources")
        for cpath, spaths in sorted(known_links.items())[:20]:
            print(f" {Path(cpath).stem[:60]}...")
            for sp in spaths:
                print(f" <- {sp}")
        print(f"\nUnresolved ({len(unresolved)}):")
        for spath, ref in unresolved[:20]:
            print(f" {Path(spath).stem[:40]}: {ref[:60]}")
        return

    print("\nPhase 2: Matching unlinked sources via claim source: field...")
    inferred_fwd, inferred_rev = match_unlinked_sources(sources, claims, claim_by_slug)
    print(f" {len(inferred_fwd)} sources matched to claims")
    print(f" {len(inferred_rev)} claims matched to sources")

    # Merge known + inferred reverse links
    all_reverse = defaultdict(list)
    for cpath, spaths in known_links.items():
        all_reverse[cpath].extend(spaths)
    for cpath, matches in inferred_rev.items():
        for m in matches:
            if m["source_path"] not in all_reverse[cpath]:
                all_reverse[cpath].append(m["source_path"])

    # Merge known + inferred forward links
    all_forward = defaultdict(list)
    for spath, source in sources.items():
        for claim_ref in source["claims_extracted"]:
            ref_slug = slugify(_strip_md_suffix(claim_ref.strip()))
            if ref_slug in claim_by_slug:
                title = claims[claim_by_slug[ref_slug]]["title"]
                if title not in all_forward[spath]:
                    all_forward[spath].append(title)
    for spath, matches in inferred_fwd.items():
        for m in matches:
            title = m["claim_title"]
            if title not in all_forward[spath]:
                all_forward[spath].append(title)

    print(f"\nTotal: {len(all_reverse)} claims with source links")
    print(f"Total: {len(all_forward)} sources with claim links")

    if mode == "--dry-run":
        # Sources that already carry claims_extracted are never rewritten.
        already_linked = {s for s, src in sources.items() if src["claims_extracted"]}
        pending_forward = [k for k in all_forward if k not in already_linked]

        print("\n=== DRY RUN — no files modified ===")
        print(f"\nWould write sourced_from: to {len(all_reverse)} claim files")
        print(f"Would write claims_extracted: to {len(pending_forward)} source files")

        # Show samples
        print("\nSample reverse links (claim -> source):")
        for cpath, spaths in sorted(all_reverse.items())[:10]:
            print(f" {Path(cpath).stem[:60]}")
            for sp in spaths:
                print(f" <- {sp}")

        print("\nSample inferred forward links (source -> claims):")
        for spath, matches in sorted(inferred_fwd.items())[:10]:
            print(f" {Path(spath).stem[:50]} (score={matches[0]['score']:.2f})")
            for m in matches[:3]:
                print(f" -> {m['claim_title'][:60]} ({', '.join(m['reasons'])})")

    elif mode == "--apply":
        print("\n=== APPLYING CHANGES ===")

        # Write sourced_from to claims
        wrote_reverse = 0
        for cpath, spaths in all_reverse.items():
            if write_sourced_from(cpath, spaths, dry_run=False):
                wrote_reverse += 1

        # Write claims_extracted to sources (only unlinked ones)
        wrote_forward = 0
        for spath, titles in all_forward.items():
            if sources[spath]["claims_extracted"]:
                continue  # already has it
            if write_claims_extracted(spath, titles, dry_run=False):
                wrote_forward += 1

        print(f" Wrote sourced_from: to {wrote_reverse} claims")
        print(f" Wrote claims_extracted: to {wrote_forward} sources")


if __name__ == "__main__":
    main()