"""Claim index generator — structured index of all KB claims.

Produces claim-index.json: every claim with title, domain, confidence,
wiki links (outgoing + incoming counts), created date, word count,
challenged_by status.

Consumed by:
- Argus (diagnostics dashboard — charts, vital signs)
- Vida (KB health diagnostics — orphan ratio, linkage density, freshness)
- Extraction prompt (KB index for dedup — could replace /tmp/kb-indexes/)

Generated after each merge (post-merge hook) or on demand.
Served via GET /claim-index on the health API.

Epimetheus owns this module.
"""

import json
import logging
import os
import re
from collections import Counter, defaultdict
from datetime import date, datetime, timezone
from pathlib import Path

from . import config

logger = logging.getLogger("pipeline.claim_index")

WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]")

# A single H1 heading at the very top of the body. The body starts right
# after the closing frontmatter delimiter, i.e. with a newline, so leading
# whitespace has to be tolerated for the heading to be recognised at all.
_LEADING_H1_RE = re.compile(r"\A\s*# .+(?:\n|\Z)")

# A rule on its own line; everything after the first one is excluded from
# the word count.
_SECTION_RULE_RE = re.compile(r"\n---\n")

# Top-level directories (relative to the repo root) scanned for claims.
_CLAIM_SUBDIRS = ("domains", "core", "foundations", "decisions")

# Frontmatter ``type`` values that are indexed; a missing type is treated
# as a claim.
_INDEXED_TYPES = ("claim", "framework", None)


def _split_frontmatter(text: str) -> tuple[str, str] | None:
    """Split *text* into ``(raw_frontmatter, body)``.

    Returns None when the text does not start with ``---`` or has no
    second ``---`` after it.
    """
    if not text.startswith("---"):
        return None
    end = text.find("---", 3)
    if end == -1:
        return None
    return text[3:end], text[end + 3:]


def _load_frontmatter(raw: str) -> dict | None:
    """Parse raw frontmatter text into a dict.

    Uses ``yaml.safe_load`` when PyYAML is importable; any YAML error or a
    non-mapping document yields None. Without PyYAML, falls back to a flat
    ``key: value`` line parser in which every value is a string or None.
    """
    try:
        import yaml
    except ImportError:
        yaml = None

    if yaml is not None:
        try:
            fm = yaml.safe_load(raw)
        except Exception:
            # Best effort: a malformed header just means "not indexable".
            return None
        return fm if isinstance(fm, dict) else None

    # Fallback parser
    fm = {}
    for line in raw.strip().split("\n"):
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, _, val = line.partition(":")
        val = val.strip().strip('"').strip("'")
        fm[key.strip()] = None if val == "" or val.lower() == "null" else val
    return fm or None


def _parse_frontmatter(text: str) -> dict | None:
    """Quick YAML frontmatter parser.

    Returns the frontmatter of *text* as a dict, or None when there is no
    frontmatter block or it cannot be parsed into a mapping.
    """
    parts = _split_frontmatter(text)
    if parts is None:
        return None
    return _load_frontmatter(parts[0])


def _build_claim_entry(path: Path, base: Path, subdir: str) -> dict | None:
    """Build the index entry for one markdown file.

    Returns None when the file cannot be read, has no parseable
    frontmatter, or has a ``type`` that is not indexed (entities,
    sources, etc.).
    """
    try:
        content = path.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as exc:
        logger.warning("Skipping unreadable file %s: %s", path, exc)
        return None

    parts = _split_frontmatter(content)
    if parts is None:
        return None
    raw, body = parts
    fm = _load_frontmatter(raw)
    if fm is None:
        return None

    ftype = fm.get("type")
    if ftype not in _INDEXED_TYPES:
        return None

    outgoing_links = [
        link.strip() for link in WIKI_LINK_RE.findall(body) if link.strip()
    ]

    # Word count covers the body only: drop the leading H1 title, then keep
    # the text before the first horizontal rule.
    body_text = _LEADING_H1_RE.sub("", body, count=1).strip()
    body_text = _SECTION_RULE_RE.split(body_text)[0]

    created = fm.get("created")
    if isinstance(created, date):
        # YAML parses bare dates into date objects, which JSON can't encode.
        created = created.isoformat()

    return {
        "file": str(path.relative_to(base)),
        "stem": path.stem,
        "title": path.stem.replace("-", " "),
        # An empty/null domain falls back to the directory, same as a
        # missing one.
        "domain": fm.get("domain") or subdir,
        "confidence": fm.get("confidence"),
        "created": created,
        "outgoing_links": outgoing_links,
        "outgoing_count": len(outgoing_links),
        "incoming_count": 0,  # Filled in by build_claim_index
        "has_challenged_by": bool(fm.get("challenged_by")),
        "word_count": len(body_text.split()),
        "type": ftype or "claim",
    }


def build_claim_index(repo_root: str | None = None) -> dict:
    """Build the full claim index from the repo.

    Scans the ``domains``, ``core``, ``foundations`` and ``decisions``
    directories under *repo_root* (``config.MAIN_WORKTREE`` when omitted)
    for ``*.md`` files whose names do not start with ``_``. Files are
    visited in sorted path order so the output is reproducible.

    Returns {generated_at, total_claims, domains: {...}, orphan_count,
    orphan_ratio, cross_domain_links, claims: [...]}.
    """
    base = Path(repo_root) if repo_root else config.MAIN_WORKTREE

    # Phase 1: Collect all claims with outgoing links
    claims = []
    for subdir in _CLAIM_SUBDIRS:
        full = base / subdir
        if not full.is_dir():
            continue
        for f in sorted(full.rglob("*.md")):
            if f.name.startswith("_"):
                continue
            entry = _build_claim_entry(f, base, subdir)
            if entry is not None:
                claims.append(entry)

    # stem → every domain a claim with that stem lives in. Doubles as the
    # set of link targets that resolve to an indexed claim.
    domains_by_stem: defaultdict[str, set] = defaultdict(set)
    for claim in claims:
        domains_by_stem[claim["stem"]].add(claim["domain"])

    # Phase 2: Count incoming links (only links that resolve to a claim)
    incoming_counts = Counter(
        link
        for claim in claims
        for link in claim["outgoing_links"]
        if link in domains_by_stem
    )
    for claim in claims:
        claim["incoming_count"] = incoming_counts.get(claim["stem"], 0)

    # Domain summary
    domain_counts = dict(Counter(claim["domain"] for claim in claims))

    # Orphan detection (0 incoming links)
    orphans = sum(1 for claim in claims if claim["incoming_count"] == 0)

    # Cross-domain links: a link counts once if any claim with the target
    # stem belongs to a different domain than the linking claim.
    cross_domain_links = sum(
        1
        for claim in claims
        for link in claim["outgoing_links"]
        if any(d != claim["domain"] for d in domains_by_stem.get(link, ()))
    )

    # Same "...Z" format as before, without the deprecated utcnow().
    generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    return {
        "generated_at": generated_at,
        "total_claims": len(claims),
        "domains": domain_counts,
        "orphan_count": orphans,
        "orphan_ratio": round(orphans / len(claims), 3) if claims else 0,
        "cross_domain_links": cross_domain_links,
        "claims": claims,
    }


def write_claim_index(repo_root: str | None = None, output_path: str | None = None) -> str:
    """Build and write claim-index.json. Returns the output path.

    When *output_path* is omitted the file goes to
    ``~/.pentagon/workspace/collective/claim-index.json``. Parent
    directories are created as needed.
    """
    index = build_claim_index(repo_root)

    if output_path is None:
        output_path = str(Path.home() / ".pentagon" / "workspace" / "collective" / "claim-index.json")

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Atomic write: dump to a sibling temp file, then swap it into place.
    # os.replace overwrites an existing destination on every platform.
    tmp = output_path + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(index, f, indent=2)
        os.replace(tmp, output_path)
    except Exception:
        # Don't leave a half-written temp file behind.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise

    logger.info("Wrote claim-index.json: %d claims, %d orphans, %d cross-domain links",
                index["total_claims"], index["orphan_count"], index["cross_domain_links"])

    return output_path