"""Post-extraction validator — deterministic fixes and quality gate. Runs AFTER LLM extraction, BEFORE git commit. Pure Python, $0 cost. Catches the mechanical issues that account for 73% of eval rejections: - Frontmatter schema violations (missing/invalid fields) - Broken wiki links (strips brackets, keeps text) - Date errors (wrong format, source date instead of today) - Filename convention violations - Title precision (too short, not a proposition) - Duplicate detection against existing KB Design principles (Leo): - Mechanical rules belong in code, not prompts - Fix what's fixable, reject what's not - Never silently drop content — log everything Epimetheus owns this module. Leo reviews changes. """ import json import logging import os import re from datetime import date, datetime from difflib import SequenceMatcher from pathlib import Path logger = logging.getLogger("pipeline.post_extract") # ─── Constants ────────────────────────────────────────────────────────────── VALID_DOMAINS = frozenset({ "internet-finance", "entertainment", "health", "ai-alignment", "space-development", "grand-strategy", "mechanisms", "living-capital", "living-agents", "teleohumanity", "critical-systems", "collective-intelligence", "teleological-economics", "cultural-dynamics", }) VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"}) REQUIRED_CLAIM_FIELDS = ("type", "domain", "description", "confidence", "source", "created") REQUIRED_ENTITY_FIELDS = ("type", "domain", "description") WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]") # Minimum title word count for claims (Leo: titles must name specific mechanism) MIN_TITLE_WORDS = 8 DEDUP_THRESHOLD = 0.85 # ─── YAML parsing ────────────────────────────────────────────────────────── def parse_frontmatter(text: str) -> tuple[dict | None, str]: """Extract YAML frontmatter from markdown. Returns (frontmatter_dict, body).""" if not text.startswith("---"): return None, text end = text.find("---", 3) if end == -1: return None, text raw = text[3:end] body = text[end + 3:].strip() try: import yaml fm = yaml.safe_load(raw) if not isinstance(fm, dict): return None, body return fm, body except ImportError: pass except Exception: return None, body # Fallback: simple key-value parser fm = {} for line in raw.strip().split("\n"): line = line.strip() if not line or line.startswith("#"): continue if ":" not in line: continue key, _, val = line.partition(":") key = key.strip() val = val.strip().strip('"').strip("'") if val.lower() == "null" or val == "": val = None elif val.startswith("["): val = [v.strip().strip('"').strip("'") for v in val.strip("[]").split(",") if v.strip()] fm[key] = val return fm if fm else None, body # ─── Fixers (modify content, return fixed version) ───────────────────────── def fix_frontmatter(content: str, domain: str, agent: str) -> tuple[str, list[str]]: """Fix common frontmatter issues. Returns (fixed_content, list_of_fixes_applied).""" fixes = [] fm, body = parse_frontmatter(content) if fm is None: return content, ["unfixable:no_frontmatter"] changed = False ftype = fm.get("type", "claim") # Fix 1: created = extraction date, always today. No parsing, no comparison. # "created" means "when this was extracted," period. Source publication date # belongs in a separate field if needed. (Ganymede review) today_str = date.today().isoformat() if ftype == "claim": old_created = fm.get("created") fm["created"] = today_str if old_created != today_str: fixes.append(f"set_created:{today_str}") changed = True # Fix 2: type field if "type" not in fm: fm["type"] = "claim" fixes.append("added_type:claim") changed = True # Fix 3: domain field if "domain" not in fm or fm["domain"] not in VALID_DOMAINS: fm["domain"] = domain fixes.append(f"fixed_domain:{fm.get('domain', 'missing')}->{domain}") changed = True # Fix 4: confidence field (claims only) if ftype == "claim": conf = fm.get("confidence") if conf is None: fm["confidence"] = "experimental" fixes.append("added_confidence:experimental") changed = True elif conf not in VALID_CONFIDENCE: fm["confidence"] = "experimental" fixes.append(f"fixed_confidence:{conf}->experimental") changed = True # Fix 5: description field if "description" not in fm or not fm["description"]: # Try to derive from body's first sentence first_sentence = body.split(".")[0].strip().lstrip("# ") if body else "" if first_sentence and len(first_sentence) > 10: fm["description"] = first_sentence[:200] fixes.append("derived_description_from_body") changed = True # Fix 6: source field (claims only) if ftype == "claim" and ("source" not in fm or not fm["source"]): fm["source"] = f"extraction by {agent}" fixes.append("added_default_source") changed = True if not changed: return content, [] # Reconstruct frontmatter return _rebuild_content(fm, body), fixes def fix_wiki_links(content: str, existing_claims: set[str]) -> tuple[str, list[str]]: """Fix or strip broken wiki links. Resolves slug→space mismatches before stripping. The LLM often generates wiki links as slugs (hyphens) but KB filenames use spaces. Try normalizing hyphens→spaces before giving up and stripping brackets. """ fixes = [] # Build a lookup: normalized (lowercased, hyphens→spaces) → original stem _normalized_lookup: dict[str, str] = {} for stem in existing_claims: _normalized_lookup[stem.lower().replace("-", " ")] = stem def replace_broken(match): link = match.group(1).strip() if link in existing_claims: return match.group(0) # Exact match — keep as-is # Try normalizing slug to spaces normalized = link.lower().replace("-", " ") if normalized in _normalized_lookup: resolved = _normalized_lookup[normalized] fixes.append(f"resolved_wiki_link:{link[:40]}->{resolved[:40]}") return f"[[{resolved}]]" fixes.append(f"stripped_wiki_link:{link[:60]}") return link # Keep text, remove brackets fixed = WIKI_LINK_RE.sub(replace_broken, content) return fixed, fixes def fix_trailing_newline(content: str) -> tuple[str, list[str]]: """Ensure file ends with exactly one newline.""" if not content.endswith("\n"): return content + "\n", ["added_trailing_newline"] return content, [] def fix_h1_title_match(content: str, filename: str) -> tuple[str, list[str]]: """Ensure the content has an H1 title. Does NOT replace existing H1s. The H1 title in the content is authoritative — the filename is derived from it and may be truncated or slightly different. We only add a missing H1, never overwrite an existing one. """ expected_title = Path(filename).stem.replace("-", " ") fm, body = parse_frontmatter(content) if fm is None: return content, [] # Find existing H1 h1_match = re.search(r"^# (.+)$", body, re.MULTILINE) if h1_match: # H1 exists — leave it alone. The content's H1 is authoritative. return content, [] elif body and not body.startswith("#"): # No H1 at all — add one derived from filename body = f"# {expected_title}\n\n{body}" return _rebuild_content(fm, body), ["added_h1_title"] return content, [] # ─── Validators (check without modifying, return issues) ────────────────── def validate_claim(filename: str, content: str, existing_claims: set[str], agent: str | None = None) -> list[str]: """Validate a claim file. Returns list of issues (empty = pass).""" issues = [] fm, body = parse_frontmatter(content) if fm is None: return ["no_frontmatter"] ftype = fm.get("type", "claim") # Schema check required = REQUIRED_CLAIM_FIELDS if ftype == "claim" else REQUIRED_ENTITY_FIELDS for field in required: if field not in fm or fm[field] is None: issues.append(f"missing_field:{field}") # Domain check domain = fm.get("domain") if domain and domain not in VALID_DOMAINS: issues.append(f"invalid_domain:{domain}") # Confidence check (claims only) if ftype == "claim": conf = fm.get("confidence") if conf and conf not in VALID_CONFIDENCE: issues.append(f"invalid_confidence:{conf}") # Title checks (claims only, not entities) # Use H1 from body if available (authoritative), fall back to filename if ftype in ("claim", "framework"): h1_match = re.search(r"^# (.+)$", body, re.MULTILINE) title = h1_match.group(1).strip() if h1_match else Path(filename).stem.replace("-", " ") words = title.split() # Always enforce minimum 4 words — a 2-3 word title is never specific # enough to disagree with. (Ganymede review) if len(words) < 4: issues.append("title_too_few_words") elif len(words) < 8: # For 4-7 word titles, also require a verb/connective has_verb = bool(re.search( r"\b(is|are|was|were|will|would|can|could|should|must|has|have|had|" r"does|did|do|may|might|shall|" r"because|therefore|however|although|despite|since|through|by|" r"when|where|while|if|unless|" r"rather than|instead of|not just|more than|" r"\w+(?:s|ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns))\b", title, re.IGNORECASE, )) if not has_verb: issues.append("title_not_proposition") # Description quality desc = fm.get("description", "") if isinstance(desc, str) and len(desc.strip()) < 10: issues.append("description_too_short") # Attribution check: extractor must be identified. (Leo: block extractor, warn sourcer) if ftype == "claim": from .attribution import validate_attribution issues.extend(validate_attribution(fm, agent=agent)) # OPSEC check: flag claims containing dollar amounts + internal entity references. # Rio's rule: never extract LivingIP/Teleo deal terms to public codex. (Ganymede review) if ftype == "claim": combined_text = (title + " " + desc + " " + body).lower() has_dollar = bool(re.search(r"\$[\d,.]+[mkb]?\b", combined_text, re.IGNORECASE)) has_internal = bool(re.search( r"\b(livingip|teleo|internal|deal terms?|valuation|equity percent)", combined_text, re.IGNORECASE, )) if has_dollar and has_internal: issues.append("opsec_internal_deal_terms") # Body substance check (claims only) if ftype == "claim" and body: # Strip the H1 title line and check remaining content body_no_h1 = re.sub(r"^# .+\n*", "", body).strip() # Remove "Relevant Notes" and "Topics" sections body_content = re.split(r"\n---\n", body_no_h1)[0].strip() if len(body_content) < 50: issues.append("body_too_thin") # Near-duplicate check (claims only, not entities) if ftype != "entity": title_lower = Path(filename).stem.replace("-", " ").lower() title_words = set(title_lower.split()[:6]) for existing in existing_claims: # Normalize existing stem: hyphens → spaces for consistent comparison existing_normalized = existing.replace("-", " ").lower() if len(title_words & set(existing_normalized.split()[:6])) < 2: continue ratio = SequenceMatcher(None, title_lower, existing_normalized).ratio() if ratio >= DEDUP_THRESHOLD: issues.append(f"near_duplicate:{existing[:80]}") break # One is enough to flag return issues # ─── Main entry point ────────────────────────────────────────────────────── def validate_and_fix_claims( claims: list[dict], domain: str, agent: str, existing_claims: set[str], repo_root: str = ".", ) -> tuple[list[dict], list[dict], dict]: """Validate and fix extracted claims. Returns (kept_claims, rejected_claims, stats). Each claim dict has: filename, domain, content Returned claims have content fixed where possible. Stats: {total, kept, fixed, rejected, fixes_applied: [...], rejections: [...]} """ kept = [] rejected = [] all_fixes = [] all_rejections = [] # Add intra-batch stems to existing claims (avoid false positive duplicates within same extraction) batch_stems = {Path(c["filename"]).stem for c in claims} existing_plus_batch = existing_claims | batch_stems for claim in claims: filename = claim.get("filename", "") content = claim.get("content", "") claim_domain = claim.get("domain", domain) if not filename or not content: rejected.append(claim) all_rejections.append(f"{filename or '?'}:missing_filename_or_content") continue # Phase 1: Apply fixers content, fixes1 = fix_frontmatter(content, claim_domain, agent) content, fixes2 = fix_wiki_links(content, existing_plus_batch) content, fixes3 = fix_trailing_newline(content) content, fixes4 = fix_h1_title_match(content, filename) fixes = fixes1 + fixes2 + fixes3 + fixes4 if fixes: all_fixes.extend([f"{filename}:{f}" for f in fixes]) # Phase 2: Validate (after fixes) issues = validate_claim(filename, content, existing_claims, agent=agent) # Separate hard failures from warnings hard_failures = [i for i in issues if not i.startswith("near_duplicate")] warnings = [i for i in issues if i.startswith("near_duplicate")] if hard_failures: rejected.append({**claim, "content": content, "issues": hard_failures}) all_rejections.extend([f"{filename}:{i}" for i in hard_failures]) else: if warnings: all_fixes.extend([f"{filename}:WARN:{w}" for w in warnings]) kept.append({**claim, "content": content}) stats = { "total": len(claims), "kept": len(kept), "fixed": len([f for f in all_fixes if ":WARN:" not in f]), "rejected": len(rejected), "fixes_applied": all_fixes, "rejections": all_rejections, } logger.info( "Post-extraction: %d/%d claims kept (%d fixed, %d rejected)", stats["kept"], stats["total"], stats["fixed"], stats["rejected"], ) return kept, rejected, stats def validate_and_fix_entities( entities: list[dict], domain: str, existing_claims: set[str], ) -> tuple[list[dict], list[dict], dict]: """Validate and fix extracted entities. Returns (kept, rejected, stats). Lighter validation than claims — entities are factual records, not arguable propositions. """ kept = [] rejected = [] all_issues = [] for ent in entities: filename = ent.get("filename", "") content = ent.get("content", "") action = ent.get("action", "create") if not filename: rejected.append(ent) all_issues.append("missing_filename") continue issues = [] if action == "create" and content: fm, body = parse_frontmatter(content) if fm is None: issues.append("no_frontmatter") else: if fm.get("type") != "entity": issues.append("wrong_type") if "entity_type" not in fm: issues.append("missing_entity_type") if "domain" not in fm: issues.append("missing_domain") # decision_market specific checks if fm.get("entity_type") == "decision_market": for field in ("parent_entity", "platform", "category", "status"): if field not in fm: issues.append(f"dm_missing:{field}") # Fix trailing newline if content and not content.endswith("\n"): ent["content"] = content + "\n" elif action == "update": timeline = ent.get("timeline_entry", "") if not timeline: issues.append("update_no_timeline") if issues: rejected.append({**ent, "issues": issues}) all_issues.extend([f"{filename}:{i}" for i in issues]) else: kept.append(ent) stats = { "total": len(entities), "kept": len(kept), "rejected": len(rejected), "issues": all_issues, } return kept, rejected, stats def load_existing_claims_from_repo(repo_root: str) -> set[str]: """Build set of known claim/entity stems from the repo.""" claims: set[str] = set() base = Path(repo_root) for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas", "entities"]: full = base / subdir if not full.is_dir(): continue for f in full.rglob("*.md"): claims.add(f.stem) return claims # ─── Helpers ──────────────────────────────────────────────────────────────── def _rebuild_content(fm: dict, body: str) -> str: """Rebuild markdown content from frontmatter dict and body.""" # Order frontmatter fields consistently field_order = ["type", "entity_type", "name", "domain", "description", "confidence", "source", "created", "status", "parent_entity", "platform", "proposer", "proposal_url", "proposal_date", "resolution_date", "category", "summary", "tracked_by", "secondary_domains", "challenged_by"] lines = ["---"] written = set() for field in field_order: if field in fm and fm[field] is not None: lines.append(_yaml_line(field, fm[field])) written.add(field) # Write remaining fields not in the order list for key, val in fm.items(): if key not in written and val is not None: lines.append(_yaml_line(key, val)) lines.append("---") lines.append("") lines.append(body) content = "\n".join(lines) if not content.endswith("\n"): content += "\n" return content def _yaml_line(key: str, val) -> str: """Format a single YAML key-value line.""" if isinstance(val, dict): # Nested YAML block (e.g. attribution with sub-keys) lines = [f"{key}:"] for sub_key, sub_val in val.items(): if isinstance(sub_val, list) and sub_val: lines.append(f" {sub_key}:") for item in sub_val: if isinstance(item, dict): first = True for ik, iv in item.items(): prefix = " - " if first else " " lines.append(f'{prefix}{ik}: "{iv}"') first = False else: lines.append(f' - "{item}"') else: lines.append(f" {sub_key}: []") return "\n".join(lines) if isinstance(val, list): return f"{key}: {json.dumps(val)}" if isinstance(val, bool): return f"{key}: {'true' if val else 'false'}" if isinstance(val, (int, float)): return f"{key}: {val}" if isinstance(val, date): return f"{key}: {val.isoformat()}" # String — quote if it contains special chars s = str(val) if any(c in s for c in ":#{}[]|>&*!%@`"): return f'{key}: "{s}"' return f"{key}: {s}"