"""Cross-domain citation index — detect entity overlap across domains.

Hook point: called from merge.py after cascade_after_merge.

After a claim merges, checks if its referenced entities also appear in claims
from other domains. Logs connections to audit_log for silo detection.

Two detection methods:
1. Entity name matching — entity names appearing in claim body text
   (word-boundary)
2. Source overlap — claims citing the same source archive files

At ~600 claims and ~100 entities, full scan per merge takes <1 second.
"""

import asyncio
import json
import logging
import os
import re
from pathlib import Path

logger = logging.getLogger("pipeline.cross_domain")

# Minimum entity name length to avoid false positives (ORE, QCX, etc)
MIN_ENTITY_NAME_LEN = 4

# Entity names that are common English words — skip to avoid false positives
ENTITY_STOPLIST = {"versus", "island", "loyal", "saber", "nebula", "helium", "coal", "snapshot", "dropout"}

# Wiki-link to a source archive file: [[YYYY-MM-DD-...]] with a 20xx year.
_SOURCE_REF_RE = re.compile(r"\[\[(20\d{2}-\d{2}-\d{2}-[^\]]+)\]\]")

# Seconds to wait for `git diff` before giving up on the hook.
_GIT_DIFF_TIMEOUT_S = 10


def _is_claim_filename(filename: str) -> bool:
    """Return True unless *filename* is an underscore-prefixed or directory.md file."""
    return not filename.startswith("_") and filename != "directory.md"


def _build_entity_names(worktree: Path) -> dict[str, str]:
    """Build mapping of entity_slug -> display_name from entity files.

    Every ``*.md`` file under ``<worktree>/entities`` (recursively) whose file
    name does not start with ``_`` is read, and its first line starting with
    ``name:`` supplies the display name (surrounding quotes stripped). Only
    that first ``name:`` line is considered. Names shorter than
    MIN_ENTITY_NAME_LEN or listed in ENTITY_STOPLIST are dropped. Unreadable
    files are skipped.

    Returns an empty dict when the entities directory does not exist.
    """
    names: dict[str, str] = {}
    entity_dir = worktree / "entities"
    if not entity_dir.exists():
        return names

    # Sorted so that, if two files share a stem, the winner is deterministic.
    for md_file in sorted(entity_dir.rglob("*.md")):
        if md_file.name.startswith("_"):
            continue
        try:
            content = md_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue
        for line in content.split("\n"):
            if not line.startswith("name:"):
                continue
            name = line.split(":", 1)[1].strip().strip('"').strip("'")
            if len(name) >= MIN_ENTITY_NAME_LEN and name.lower() not in ENTITY_STOPLIST:
                names[md_file.stem] = name
            break
    return names


def _compile_entity_patterns(entity_names: dict[str, str]) -> dict[str, re.Pattern]:
    """Pre-compile a case-insensitive whole-word regex for each entity name.

    The name must not be directly preceded or followed by a word character.
    Lookarounds are used instead of ``\\b`` because ``\\b`` next to a non-word
    character (a name such as "Yahoo!" or "$SOL") requires a word character on
    the other side, so such names would never match in normal prose. For names
    that start and end with word characters the two forms are equivalent.
    """
    patterns: dict[str, re.Pattern] = {}
    for slug, name in entity_names.items():
        try:
            patterns[slug] = re.compile(
                r"(?<!\w)" + re.escape(name) + r"(?!\w)", re.IGNORECASE
            )
        except re.error:
            continue
    return patterns


def _extract_source_refs(content: str) -> set[str]:
    """Extract source archive references ([[YYYY-MM-DD-...]]) from content."""
    return set(_SOURCE_REF_RE.findall(content))


def _find_entity_mentions(content: str, patterns: dict[str, re.Pattern]) -> set[str]:
    """Find entity slugs whose names appear in the content (whole-word match)."""
    return {slug for slug, pattern in patterns.items() if pattern.search(content)}


def _scan_domain_claims(worktree: Path, patterns: dict[str, re.Pattern]) -> dict[str, list[dict]]:
    """Build domain -> [claim_info] mapping for all claims.

    Each immediate subdirectory of ``<worktree>/domains`` is a domain; each
    ``*.md`` file directly inside it (excluding ``_``-prefixed files and
    ``directory.md``) is a claim. A claim_info dict has the keys ``slug``,
    ``entities`` (set of entity slugs mentioned) and ``sources`` (set of
    source references). Unreadable files are skipped.

    Returns an empty dict when the domains directory does not exist.
    """
    domain_claims: dict[str, list[dict]] = {}
    domains_dir = worktree / "domains"
    if not domains_dir.exists():
        return domain_claims

    # Sorted iteration keeps the (later truncated) connection lists stable
    # from run to run; directory order is otherwise filesystem-dependent.
    for domain_dir in sorted(domains_dir.iterdir()):
        if not domain_dir.is_dir():
            continue
        claims = []
        for claim_file in sorted(domain_dir.glob("*.md")):
            if not _is_claim_filename(claim_file.name):
                continue
            try:
                content = claim_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue
            claims.append({
                "slug": claim_file.stem,
                "entities": _find_entity_mentions(content, patterns),
                "sources": _extract_source_refs(content),
            })
        domain_claims[domain_dir.name] = claims
    return domain_claims


async def _git_changed_files(main_sha: str, branch_sha: str, worktree: Path) -> list[str]:
    """Return paths added/copied/modified/renamed between the two commits.

    Runs ``git diff --name-only -z --diff-filter=ACMR`` in *worktree*. ``-z``
    makes git emit NUL-terminated, unquoted paths; without it, paths holding
    non-ASCII or special characters are C-quoted and would not survive the
    later ``domains/`` prefix check.

    Any failure (git cannot be started, timeout, non-zero exit) is logged and
    yields an empty list.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "git", "diff", "--name-only", "-z", "--diff-filter=ACMR",
            main_sha, branch_sha,
            cwd=str(worktree),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # Missing git binary or missing working directory.
        logger.warning("cross_domain: could not run git diff: %s", exc)
        return []

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=_GIT_DIFF_TIMEOUT_S
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the child so it does not linger as a zombie
        logger.warning("cross_domain: git diff timed out")
        return []

    if proc.returncode != 0:
        logger.warning(
            "cross_domain: git diff exited with %s: %s",
            proc.returncode,
            stderr.decode("utf-8", errors="replace").strip(),
        )
        return []

    # errors="replace": a path that is not valid UTF-8 must not abort the hook.
    return [p for p in stdout.decode("utf-8", errors="replace").split("\0") if p]


def _select_changed_claims(diff_files: list[str]) -> list[dict]:
    """Keep only claim files from a list of repo-relative changed paths.

    A claim path ends in ``.md``, starts with ``domains/``, has at least three
    ``/``-separated components, and has a base name that is neither
    ``_``-prefixed nor ``directory.md``. Each result dict has the keys
    ``path``, ``domain`` (second path component) and ``slug`` (file stem).
    """
    changed_claims = []
    for fpath in diff_files:
        if not fpath.endswith(".md") or not fpath.startswith("domains/"):
            continue
        parts = fpath.split("/")
        if len(parts) < 3:
            continue
        basename = os.path.basename(fpath)
        if not _is_claim_filename(basename):
            continue
        changed_claims.append({
            "path": fpath,
            "domain": parts[1],
            "slug": Path(basename).stem,
        })
    return changed_claims


def _find_connections(
    claim_domain: str,
    my_entities: set[str],
    my_sources: set[str],
    domain_claims: dict[str, list[dict]],
) -> list[dict]:
    """Return connections from one claim to claims in every other domain.

    A connection is recorded when the two claims share at least two entities,
    or at least one entity and at least one source. At most five shared
    entities and three shared sources (alphabetically first) are kept per
    connection.
    """
    connections = []
    for other_domain, other_claims in domain_claims.items():
        if other_domain == claim_domain:
            continue
        for other in other_claims:
            shared_entities = my_entities & other["entities"]
            shared_sources = my_sources & other["sources"]
            # Threshold: >=2 shared entities, OR 1 entity + 1 source
            if len(shared_entities) >= 2 or (shared_entities and shared_sources):
                connections.append({
                    "other_claim": other["slug"],
                    "other_domain": other_domain,
                    "shared_entities": sorted(shared_entities)[:5],
                    "shared_sources": sorted(shared_sources)[:3],
                })
    return connections


async def cross_domain_after_merge(
    main_sha: str,
    branch_sha: str,
    pr_num: int,
    main_worktree: Path,
    conn=None,
) -> int:
    """Detect cross-domain entity/source overlap for claims changed in this merge.

    Args:
        main_sha: Base commit of the diff.
        branch_sha: Commit whose changes relative to *main_sha* are inspected.
        pr_num: Pull-request number, used only in log and audit output.
        main_worktree: Checkout in which git is run and from which entity and
            claim files are read.
        conn: Optional object with an ``execute(sql, params)`` method
            (presumably a sqlite3 connection — confirm with caller). When
            given and connections exist, one ``audit_log`` row is inserted;
            no commit is issued here.

    Returns the number of cross-domain connections found. Returns 0 when the
    diff cannot be obtained, no claim files changed, or no usable entities
    exist. Failures writing the audit row are logged and never raised.
    """
    # 1-2. Get changed files and filter to claim files
    diff_files = await _git_changed_files(main_sha, branch_sha, main_worktree)
    changed_claims = _select_changed_claims(diff_files)
    if not changed_claims:
        return 0

    # 3. Build entity patterns and scan all claims. Both thresholds need at
    # least one shared entity, so without entities there is nothing to find.
    entity_names = _build_entity_names(main_worktree)
    if not entity_names:
        return 0
    patterns = _compile_entity_patterns(entity_names)
    domain_claims = _scan_domain_claims(main_worktree, patterns)

    # 4. For each changed claim, find cross-domain connections
    total_connections = 0
    all_connections = []
    for claim in changed_claims:
        try:
            content = (main_worktree / claim["path"]).read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue

        my_entities = _find_entity_mentions(content, patterns)
        my_sources = _extract_source_refs(content)
        if not my_entities and not my_sources:
            continue

        connections = _find_connections(
            claim["domain"], my_entities, my_sources, domain_claims
        )
        if not connections:
            continue

        total_connections += len(connections)
        all_connections.append({
            "claim": claim["slug"],
            "domain": claim["domain"],
            "connections": connections[:10],  # cap the per-claim audit payload
        })
        logger.info(
            "cross_domain: %s (%s) has %d cross-domain connections",
            claim["slug"], claim["domain"], len(connections),
        )

    # 5. Log to audit_log
    if all_connections and conn is not None:
        try:
            conn.execute(
                "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
                ("cross_domain", "connections_found", json.dumps({
                    "pr": pr_num,
                    "total_connections": total_connections,
                    "claims_with_connections": len(all_connections),
                    "details": all_connections[:10],
                })),
            )
        except Exception:
            # Best-effort bookkeeping: never let it break the merge flow.
            logger.exception("cross_domain: audit_log write failed (non-fatal)")

    if total_connections:
        logger.info(
            "cross_domain: PR #%d — %d connections across %d claims",
            pr_num, total_connections, len(all_connections),
        )

    return total_connections