"""Post-extraction validator — deterministic fixes and quality gate. Runs AFTER LLM extraction, BEFORE git commit. Pure Python, $0 cost. Catches the mechanical issues that account for 73% of eval rejections: - Frontmatter schema violations (missing/invalid fields) - Broken wiki links (strips brackets, keeps text) - Date errors (wrong format, source date instead of today) - Filename convention violations - Title precision (too short, not a proposition) - Duplicate detection against existing KB Design principles (Leo): - Mechanical rules belong in code, not prompts - Fix what's fixable, reject what's not - Never silently drop content — log everything Epimetheus owns this module. Leo reviews changes. """ import json import logging import re from datetime import date, datetime from difflib import SequenceMatcher from pathlib import Path logger = logging.getLogger("pipeline.post_extract") # ─── Constants ────────────────────────────────────────────────────────────── VALID_DOMAINS = frozenset({ "internet-finance", "entertainment", "health", "ai-alignment", "space-development", "grand-strategy", "mechanisms", "living-capital", "living-agents", "teleohumanity", "critical-systems", "collective-intelligence", "teleological-economics", "cultural-dynamics", }) VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"}) REQUIRED_CLAIM_FIELDS = ("type", "domain", "description", "confidence", "source", "created") REQUIRED_ENTITY_FIELDS = ("type", "domain", "description") WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]") # Minimum title word count for claims (Leo: titles must name specific mechanism) MIN_TITLE_WORDS = 8 DEDUP_THRESHOLD = 0.85 # ─── YAML parsing ────────────────────────────────────────────────────────── def parse_frontmatter(text: str) -> tuple[dict | None, str]: """Extract YAML frontmatter from markdown. Returns (frontmatter_dict, body).""" if not text.startswith("---"): return None, text end = text.find("---", 3) if end == -1: return None, text raw = text[3:end] body = text[end + 3:].strip() try: import yaml fm = yaml.safe_load(raw) if not isinstance(fm, dict): return None, body for key, value in list(fm.items()): if isinstance(value, date | datetime): fm[key] = value.isoformat() return fm, body except ImportError: pass except Exception: return None, body # Fallback: simple key-value parser fm = {} for line in raw.strip().split("\n"): line = line.strip() if not line or line.startswith("#"): continue if ":" not in line: continue key, _, val = line.partition(":") key = key.strip() val = val.strip().strip('"').strip("'") if val.lower() == "null" or val == "": val = None elif val.startswith("["): val = [v.strip().strip('"').strip("'") for v in val.strip("[]").split(",") if v.strip()] fm[key] = val return fm if fm else None, body # ─── Fixers (modify content, return fixed version) ───────────────────────── def fix_frontmatter(content: str, domain: str, agent: str) -> tuple[str, list[str]]: """Fix common frontmatter issues. Returns (fixed_content, list_of_fixes_applied).""" fixes = [] fm, body = parse_frontmatter(content) if fm is None: return content, ["unfixable:no_frontmatter"] changed = False ftype = fm.get("type", "claim") # Fix 1: created = extraction date, always today. No parsing, no comparison. # "created" means "when this was extracted," period. Source publication date # belongs in a separate field if needed. (Ganymede review) today_str = date.today().isoformat() if ftype == "claim": old_created = fm.get("created") fm["created"] = today_str if old_created != today_str: fixes.append(f"set_created:{today_str}") changed = True # Fix 2: type field if "type" not in fm: fm["type"] = "claim" fixes.append("added_type:claim") changed = True # Fix 3: domain field if "domain" not in fm or fm["domain"] not in VALID_DOMAINS: fm["domain"] = domain fixes.append(f"fixed_domain:{fm.get('domain', 'missing')}->{domain}") changed = True # Fix 4: confidence field (claims only) if ftype == "claim": conf = fm.get("confidence") if conf is None: fm["confidence"] = "experimental" fixes.append("added_confidence:experimental") changed = True elif conf not in VALID_CONFIDENCE: fm["confidence"] = "experimental" fixes.append(f"fixed_confidence:{conf}->experimental") changed = True # Fix 5: description field if "description" not in fm or not fm["description"]: # Try to derive from the first non-empty body line. first_sentence = "" for line in body.splitlines(): first_sentence = line.strip().lstrip("# ") if first_sentence: first_sentence = first_sentence.split(".")[0].strip() break if first_sentence and len(first_sentence) > 10: fm["description"] = first_sentence[:200] fixes.append("derived_description_from_body") changed = True # Fix 6: source field (claims only) if ftype == "claim" and ("source" not in fm or not fm["source"]): fm["source"] = f"extraction by {agent}" fixes.append("added_default_source") changed = True if not changed: return content, [] # Reconstruct frontmatter return _rebuild_content(fm, body), fixes def fix_wiki_links(content: str, existing_claims: set[str]) -> tuple[str, list[str]]: """Fix or strip broken wiki links. Resolves slug→space mismatches before stripping. The LLM often generates wiki links as slugs (hyphens) but KB filenames use spaces. Try normalizing hyphens→spaces before giving up and stripping brackets. """ fixes = [] # Build a lookup: normalized (lowercased, hyphens→spaces) → original stem _normalized_lookup: dict[str, str] = {} for stem in existing_claims: _normalized_lookup[stem.lower().replace("-", " ")] = stem def replace_broken(match): link = match.group(1).strip() if link in existing_claims: return match.group(0) # Exact match — keep as-is # Try normalizing slug to spaces normalized = link.lower().replace("-", " ") if normalized in _normalized_lookup: resolved = _normalized_lookup[normalized] fixes.append(f"resolved_wiki_link:{link[:40]}->{resolved[:40]}") return f"[[{resolved}]]" fixes.append(f"stripped_wiki_link:{link[:60]}") return link # Keep text, remove brackets fixed = WIKI_LINK_RE.sub(replace_broken, content) return fixed, fixes def fix_trailing_newline(content: str) -> tuple[str, list[str]]: """Ensure file ends with exactly one newline.""" if not content.endswith("\n"): return content + "\n", ["added_trailing_newline"] return content, [] def fix_h1_title_match(content: str, filename: str) -> tuple[str, list[str]]: """Ensure the content has an H1 title. Does NOT replace existing H1s. The H1 title in the content is authoritative — the filename is derived from it and may be truncated or slightly different. We only add a missing H1, never overwrite an existing one. """ expected_title = Path(filename).stem.replace("-", " ") fm, body = parse_frontmatter(content) if fm is None: return content, [] # Find existing H1 h1_match = re.search(r"^# (.+)$", body, re.MULTILINE) if h1_match: # H1 exists — leave it alone. The content's H1 is authoritative. return content, [] elif body and not body.startswith("#"): # No H1 at all — add one derived from filename body = f"# {expected_title}\n\n{body}" return _rebuild_content(fm, body), ["added_h1_title"] return content, [] # ─── Validators (check without modifying, return issues) ────────────────── def validate_claim(filename: str, content: str, existing_claims: set[str], agent: str | None = None) -> list[str]: """Validate a claim file. Returns list of issues (empty = pass).""" issues = [] fm, body = parse_frontmatter(content) if fm is None: return ["no_frontmatter"] ftype = fm.get("type", "claim") # Schema check required = REQUIRED_CLAIM_FIELDS if ftype == "claim" else REQUIRED_ENTITY_FIELDS for field in required: if field not in fm or fm[field] is None: issues.append(f"missing_field:{field}") # Domain check domain = fm.get("domain") if domain and domain not in VALID_DOMAINS: issues.append(f"invalid_domain:{domain}") # Confidence check (claims only) if ftype == "claim": conf = fm.get("confidence") if conf and conf not in VALID_CONFIDENCE: issues.append(f"invalid_confidence:{conf}") # Title checks (claims only, not entities) # Use H1 from body if available (authoritative), fall back to filename if ftype in ("claim", "framework"): h1_match = re.search(r"^# (.+)$", body, re.MULTILINE) title = h1_match.group(1).strip() if h1_match else Path(filename).stem.replace("-", " ") words = title.split() # Always enforce minimum 4 words — a 2-3 word title is never specific # enough to disagree with. (Ganymede review) if len(words) < 4: issues.append("title_too_few_words") elif len(words) < 8: # For 4-7 word titles, also require a verb/connective has_verb = bool(re.search( r"\b(is|are|was|were|will|would|can|could|should|must|has|have|had|" r"does|did|do|may|might|shall|" r"because|therefore|however|although|despite|since|through|by|" r"when|where|while|if|unless|" r"rather than|instead of|not just|more than|" r"\w+(?:s|ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns))\b", title, re.IGNORECASE, )) if not has_verb: issues.append("title_not_proposition") # Description quality desc = fm.get("description", "") if isinstance(desc, str) and len(desc.strip()) < 10: issues.append("description_too_short") # Attribution check: extractor must be identified. (Leo: block extractor, warn sourcer) if ftype == "claim": from .attribution import validate_attribution issues.extend(validate_attribution(fm, agent=agent)) # OPSEC check: flag claims containing dollar amounts + internal entity references. # Rio's rule: never extract LivingIP/Teleo deal terms to public codex. (Ganymede review) if ftype == "claim": combined_text = (title + " " + desc + " " + body).lower() has_dollar = bool(re.search(r"\$[\d,.]+[mkb]?\b", combined_text, re.IGNORECASE)) has_internal = bool(re.search( r"\b(livingip|teleo|internal|deal terms?|valuation|equity percent)", combined_text, re.IGNORECASE, )) if has_dollar and has_internal: issues.append("opsec_internal_deal_terms") # Body substance check (claims only) if ftype == "claim" and body: # Strip the H1 title line and check remaining content body_no_h1 = re.sub(r"^# .+\n*", "", body).strip() # Remove "Relevant Notes" and "Topics" sections body_content = re.split(r"\n---\n", body_no_h1)[0].strip() if len(body_content) < 50: issues.append("body_too_thin") # Near-duplicate check (claims only, not entities) if ftype != "entity": title_lower = Path(filename).stem.replace("-", " ").lower() title_words = set(title_lower.split()[:6]) for existing in existing_claims: # Normalize existing stem: hyphens → spaces for consistent comparison existing_normalized = existing.replace("-", " ").lower() if len(title_words & set(existing_normalized.split()[:6])) < 2: continue ratio = SequenceMatcher(None, title_lower, existing_normalized).ratio() if ratio >= DEDUP_THRESHOLD: issues.append(f"near_duplicate:{existing[:80]}") break # One is enough to flag return issues # ─── Main entry point ────────────────────────────────────────────────────── def validate_and_fix_claims( claims: list[dict], domain: str, agent: str, existing_claims: set[str], repo_root: str = ".", ) -> tuple[list[dict], list[dict], dict]: """Validate and fix extracted claims. Returns (kept_claims, rejected_claims, stats). Each claim dict has: filename, domain, content Returned claims have content fixed where possible. Stats: {total, kept, fixed, rejected, fixes_applied: [...], rejections: [...]} """ kept = [] rejected = [] all_fixes = [] all_rejections = [] # Add intra-batch stems to existing claims (avoid false positive duplicates within same extraction) batch_stems = {Path(c["filename"]).stem for c in claims} existing_plus_batch = existing_claims | batch_stems for claim in claims: filename = claim.get("filename", "") content = claim.get("content", "") claim_domain = claim.get("domain", domain) if not filename or not content: rejected.append(claim) all_rejections.append(f"{filename or '?'}:missing_filename_or_content") continue # Phase 1: Apply fixers content, fixes1 = fix_frontmatter(content, claim_domain, agent) content, fixes2 = fix_wiki_links(content, existing_plus_batch) content, fixes3 = fix_trailing_newline(content) content, fixes4 = fix_h1_title_match(content, filename) fixes = fixes1 + fixes2 + fixes3 + fixes4 if fixes: all_fixes.extend([f"{filename}:{f}" for f in fixes]) # Phase 2: Validate (after fixes) issues = validate_claim(filename, content, existing_claims, agent=agent) # Separate hard failures from warnings hard_failures = [i for i in issues if not i.startswith("near_duplicate")] warnings = [i for i in issues if i.startswith("near_duplicate")] if hard_failures: rejected.append({**claim, "content": content, "issues": hard_failures}) all_rejections.extend([f"{filename}:{i}" for i in hard_failures]) else: if warnings: all_fixes.extend([f"{filename}:WARN:{w}" for w in warnings]) kept.append({**claim, "content": content}) stats = { "total": len(claims), "kept": len(kept), "fixed": len([f for f in all_fixes if ":WARN:" not in f]), "rejected": len(rejected), "fixes_applied": all_fixes, "rejections": all_rejections, } logger.info( "Post-extraction: %d/%d claims kept (%d fixed, %d rejected)", stats["kept"], stats["total"], stats["fixed"], stats["rejected"], ) return kept, rejected, stats def validate_and_fix_entities( entities: list[dict], domain: str, existing_claims: set[str], ) -> tuple[list[dict], list[dict], dict]: """Validate and fix extracted entities. Returns (kept, rejected, stats). Lighter validation than claims — entities are factual records, not arguable propositions. """ kept = [] rejected = [] all_issues = [] for ent in entities: filename = ent.get("filename", "") content = ent.get("content", "") action = ent.get("action", "create") if not filename: rejected.append(ent) all_issues.append("missing_filename") continue issues = [] if action == "create" and content: fm, _body = parse_frontmatter(content) if fm is None: issues.append("no_frontmatter") else: if fm.get("type") != "entity": issues.append("wrong_type") if "entity_type" not in fm: issues.append("missing_entity_type") if "domain" not in fm: issues.append("missing_domain") # decision_market specific checks if fm.get("entity_type") == "decision_market": for field in ("parent_entity", "platform", "category", "status"): if field not in fm: issues.append(f"dm_missing:{field}") # Fix trailing newline if content and not content.endswith("\n"): ent["content"] = content + "\n" elif action == "update": timeline = ent.get("timeline_entry", "") if not timeline: issues.append("update_no_timeline") if issues: rejected.append({**ent, "issues": issues}) all_issues.extend([f"{filename}:{i}" for i in issues]) else: kept.append(ent) stats = { "total": len(entities), "kept": len(kept), "rejected": len(rejected), "issues": all_issues, } return kept, rejected, stats def load_existing_claims_from_repo(repo_root: str) -> set[str]: """Build set of known claim/entity stems from the repo.""" claims: set[str] = set() base = Path(repo_root) for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas", "entities"]: full = base / subdir if not full.is_dir(): continue for f in full.rglob("*.md"): claims.add(f.stem) return claims # ─── Helpers ──────────────────────────────────────────────────────────────── def _rebuild_content(fm: dict, body: str) -> str: """Rebuild markdown content from frontmatter dict and body.""" # Order frontmatter fields consistently field_order = ["type", "entity_type", "name", "domain", "description", "confidence", "source", "created", "status", "parent_entity", "platform", "proposer", "proposal_url", "proposal_date", "resolution_date", "category", "summary", "tracked_by", "secondary_domains", "challenged_by"] lines = ["---"] written = set() for field in field_order: if field in fm and fm[field] is not None: lines.append(_yaml_line(field, fm[field])) written.add(field) # Write remaining fields not in the order list for key, val in fm.items(): if key not in written and val is not None: lines.append(_yaml_line(key, val)) lines.append("---") lines.append("") lines.append(body) content = "\n".join(lines) if not content.endswith("\n"): content += "\n" return content def _yaml_line(key: str, val) -> str: """Format a single YAML key-value line.""" if isinstance(val, dict): # Nested YAML block (e.g. attribution with sub-keys) lines = [f"{key}:"] for sub_key, sub_val in val.items(): if isinstance(sub_val, list) and sub_val: lines.append(f" {sub_key}:") for item in sub_val: if isinstance(item, dict): first = True for ik, iv in item.items(): prefix = " - " if first else " " lines.append(f'{prefix}{ik}: "{iv}"') first = False else: lines.append(f' - "{item}"') else: lines.append(f" {sub_key}: []") return "\n".join(lines) if isinstance(val, list): return f"{key}: {json.dumps(val)}" if isinstance(val, bool): return f"{key}: {'true' if val else 'false'}" if isinstance(val, (int, float)): return f"{key}: {val}" if isinstance(val, date): return f"{key}: {val.isoformat()}" # String — quote if it contains special chars s = str(val) if any(c in s for c in ":#{}[]|>&*!%@`"): return f'{key}: "{s}"' return f"{key}: {s}"