teleo-infrastructure/lib/post_extract.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
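Item 1's transient/permanent split plus backoff jitter can be sketched as follows. This is a minimal sketch, not the deployed code: the status-code sets, function name, and call shape are assumptions.

```python
import random
import time

TRANSIENT = {429, 500, 502, 503, 504}   # worth retrying
PERMANENT = {401, 403, 404, 405, 409, 422}  # fail fast, no retry

def retry_with_jitter(call, attempts=3, base=2.0):
    """Retry `call` on transient HTTP failures with exponential backoff + jitter.

    `call` returns (status_code, result). Permanent failures raise immediately;
    transient ones sleep base**attempt seconds plus up to 1s of jitter so
    concurrent workers don't retry in lockstep.
    """
    for attempt in range(attempts):
        status, result = call()
        if status < 400:
            return result
        if status in PERMANENT:
            raise RuntimeError(f"permanent failure: HTTP {status}")
        time.sleep(base ** attempt + random.random())
    raise RuntimeError("transient failures exhausted retry budget")
```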

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
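The tag regex needs a word boundary so a handle does not fire inside a longer word. A minimal reconstruction (the exact pattern and flags are assumptions):

```python
import re

# Hypothetical reconstruction of the bot-tag trigger; handles taken from the
# changelog entry above. \b stops "@teleo" matching inside "@teleology".
TAG_RE = re.compile(r"@(?:teleo|futairdbot)\b", re.IGNORECASE)

print(bool(TAG_RE.search("hey @teleo what moved today?")))
print(bool(TAG_RE.search("@teleology is unrelated")))
```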

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
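The `worktree_lock.py` entry describes a blocking `flock` on a sentinel file; a minimal sketch of that pattern (lock path and API shape are assumptions, POSIX-only):

```python
import fcntl
import os
from contextlib import contextmanager

@contextmanager
def worktree_lock(lock_path="/tmp/worktree.lock"):
    """Block until the exclusive file lock is acquired; release on exit.

    flock locks are advisory and released automatically if the holder dies,
    so a crashed writer can never wedge the worktree.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until no other holder
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)
```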

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00


"""Post-extraction validator — deterministic fixes and quality gate.
Runs AFTER LLM extraction, BEFORE git commit. Pure Python, $0 cost.
Catches the mechanical issues that account for 73% of eval rejections:
- Frontmatter schema violations (missing/invalid fields)
- Broken wiki links (strips brackets, keeps text)
- Date errors (wrong format, source date instead of today)
- Filename convention violations
- Title precision (too short, not a proposition)
- Duplicate detection against existing KB
Design principles (Leo):
- Mechanical rules belong in code, not prompts
- Fix what's fixable, reject what's not
- Never silently drop content — log everything
Epimetheus owns this module. Leo reviews changes.
"""
import json
import logging
import re
from datetime import date
from difflib import SequenceMatcher
from pathlib import Path
logger = logging.getLogger("pipeline.post_extract")
# ─── Constants ──────────────────────────────────────────────────────────────
VALID_DOMAINS = frozenset({
    "internet-finance", "entertainment", "health", "ai-alignment",
    "space-development", "grand-strategy", "mechanisms", "living-capital",
    "living-agents", "teleohumanity", "critical-systems",
    "collective-intelligence", "teleological-economics", "cultural-dynamics",
})
VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"})
REQUIRED_CLAIM_FIELDS = ("type", "domain", "description", "confidence", "source", "created")
REQUIRED_ENTITY_FIELDS = ("type", "domain", "description")
WIKI_LINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
# Minimum title word count for claims (Leo: titles must name specific mechanism)
MIN_TITLE_WORDS = 8
DEDUP_THRESHOLD = 0.85
# ─── YAML parsing ──────────────────────────────────────────────────────────
def parse_frontmatter(text: str) -> tuple[dict | None, str]:
    """Extract YAML frontmatter from markdown. Returns (frontmatter_dict, body)."""
    if not text.startswith("---"):
        return None, text
    end = text.find("---", 3)
    if end == -1:
        return None, text
    raw = text[3:end]
    body = text[end + 3:].strip()
    try:
        import yaml
        fm = yaml.safe_load(raw)
        if not isinstance(fm, dict):
            return None, body
        return fm, body
    except ImportError:
        pass
    except Exception:
        return None, body
    # Fallback: simple key-value parser (used when PyYAML is unavailable)
    fm = {}
    for line in raw.strip().split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if ":" not in line:
            continue
        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if val.lower() == "null" or val == "":
            val = None
        elif val.startswith("["):
            val = [v.strip().strip('"').strip("'") for v in val.strip("[]").split(",") if v.strip()]
        fm[key] = val
    return (fm if fm else None), body
# ─── Fixers (modify content, return fixed version) ─────────────────────────
def fix_frontmatter(content: str, domain: str, agent: str) -> tuple[str, list[str]]:
    """Fix common frontmatter issues. Returns (fixed_content, list_of_fixes_applied)."""
    fixes = []
    fm, body = parse_frontmatter(content)
    if fm is None:
        return content, ["unfixable:no_frontmatter"]
    changed = False
    ftype = fm.get("type", "claim")
    # Fix 1: created = extraction date, always today. No parsing, no comparison.
    # "created" means "when this was extracted," period. Source publication date
    # belongs in a separate field if needed. (Ganymede review)
    today_str = date.today().isoformat()
    if ftype == "claim":
        old_created = fm.get("created")
        fm["created"] = today_str
        if old_created != today_str:
            fixes.append(f"set_created:{today_str}")
            changed = True
    # Fix 2: type field
    if "type" not in fm:
        fm["type"] = "claim"
        fixes.append("added_type:claim")
        changed = True
    # Fix 3: domain field
    if "domain" not in fm or fm["domain"] not in VALID_DOMAINS:
        old_domain = fm.get("domain", "missing")  # capture before overwriting so the log shows the real transition
        fm["domain"] = domain
        fixes.append(f"fixed_domain:{old_domain}->{domain}")
        changed = True
    # Fix 4: confidence field (claims only)
    if ftype == "claim":
        conf = fm.get("confidence")
        if conf is None:
            fm["confidence"] = "experimental"
            fixes.append("added_confidence:experimental")
            changed = True
        elif conf not in VALID_CONFIDENCE:
            fm["confidence"] = "experimental"
            fixes.append(f"fixed_confidence:{conf}->experimental")
            changed = True
    # Fix 5: description field
    if "description" not in fm or not fm["description"]:
        # Try to derive from the body's first sentence
        first_sentence = body.split(".")[0].strip().lstrip("# ") if body else ""
        if first_sentence and len(first_sentence) > 10:
            fm["description"] = first_sentence[:200]
            fixes.append("derived_description_from_body")
            changed = True
    # Fix 6: source field (claims only)
    if ftype == "claim" and ("source" not in fm or not fm["source"]):
        fm["source"] = f"extraction by {agent}"
        fixes.append("added_default_source")
        changed = True
    if not changed:
        return content, []
    # Reconstruct frontmatter
    return _rebuild_content(fm, body), fixes
def fix_wiki_links(content: str, existing_claims: set[str]) -> tuple[str, list[str]]:
    """Strip brackets from broken wiki links, keeping the text. Returns (fixed_content, fixes)."""
    fixes = []

    def replace_broken(match):
        link = match.group(1).strip()
        if link not in existing_claims:
            fixes.append(f"stripped_wiki_link:{link[:60]}")
            return link  # Keep text, remove brackets
        return match.group(0)

    fixed = WIKI_LINK_RE.sub(replace_broken, content)
    return fixed, fixes
def fix_trailing_newline(content: str) -> tuple[str, list[str]]:
    """Ensure the file ends with a trailing newline."""
    if not content.endswith("\n"):
        return content + "\n", ["added_trailing_newline"]
    return content, []
def fix_h1_title_match(content: str, filename: str) -> tuple[str, list[str]]:
    """Ensure the content has an H1 title. Does NOT replace existing H1s.

    The H1 title in the content is authoritative — the filename is derived from it
    and may be truncated or slightly different. We only add a missing H1, never
    overwrite an existing one.
    """
    expected_title = Path(filename).stem.replace("-", " ")
    fm, body = parse_frontmatter(content)
    if fm is None:
        return content, []
    # Find existing H1
    h1_match = re.search(r"^# (.+)$", body, re.MULTILINE)
    if h1_match:
        # H1 exists — leave it alone. The content's H1 is authoritative.
        return content, []
    elif body and not body.startswith("#"):
        # No H1 at all — add one derived from the filename
        body = f"# {expected_title}\n\n{body}"
        return _rebuild_content(fm, body), ["added_h1_title"]
    return content, []
# ─── Validators (check without modifying, return issues) ──────────────────
def validate_claim(filename: str, content: str, existing_claims: set[str]) -> list[str]:
    """Validate a claim file. Returns list of issues (empty = pass)."""
    issues = []
    fm, body = parse_frontmatter(content)
    if fm is None:
        return ["no_frontmatter"]
    ftype = fm.get("type", "claim")
    # Schema check
    required = REQUIRED_CLAIM_FIELDS if ftype == "claim" else REQUIRED_ENTITY_FIELDS
    for field in required:
        if field not in fm or fm[field] is None:
            issues.append(f"missing_field:{field}")
    # Domain check
    domain = fm.get("domain")
    if domain and domain not in VALID_DOMAINS:
        issues.append(f"invalid_domain:{domain}")
    # Confidence check (claims only)
    if ftype == "claim":
        conf = fm.get("confidence")
        if conf and conf not in VALID_CONFIDENCE:
            issues.append(f"invalid_confidence:{conf}")
    # Title checks (claims and frameworks, not entities)
    # Use H1 from body if available (authoritative), fall back to filename
    title = ""
    if ftype in ("claim", "framework"):
        h1_match = re.search(r"^# (.+)$", body, re.MULTILINE)
        title = h1_match.group(1).strip() if h1_match else Path(filename).stem.replace("-", " ")
        words = title.split()
        # Always enforce a minimum of 4 words — a 2-3 word title is never
        # specific enough to disagree with. (Ganymede review)
        if len(words) < 4:
            issues.append("title_too_few_words")
        elif len(words) < MIN_TITLE_WORDS:
            # For 4-7 word titles, also require a verb/connective
            has_verb = bool(re.search(
                r"\b(is|are|was|were|will|would|can|could|should|must|has|have|had|"
                r"does|did|do|may|might|shall|"
                r"because|therefore|however|although|despite|since|through|by|"
                r"when|where|while|if|unless|"
                r"rather than|instead of|not just|more than|"
                r"\w+(?:s|ed|ing|es|tes|ses|zes|ves|cts|pts|nts|rns))\b",
                title, re.IGNORECASE,
            ))
            if not has_verb:
                issues.append("title_not_proposition")
    # Description quality
    desc = fm.get("description", "")
    if isinstance(desc, str) and len(desc.strip()) < 10:
        issues.append("description_too_short")
    # Attribution check: extractor must be identified. (Leo: block extractor, warn sourcer)
    if ftype == "claim":
        from .attribution import validate_attribution
        issues.extend(validate_attribution(fm))
    # OPSEC check: flag claims containing dollar amounts + internal entity references.
    # Rio's rule: never extract LivingIP/Teleo deal terms to public codex. (Ganymede review)
    if ftype == "claim":
        desc_text = desc if isinstance(desc, str) else ""  # guard: description may be null/non-string
        combined_text = f"{title} {desc_text} {body}".lower()
        has_dollar = bool(re.search(r"\$[\d,.]+[mkb]?\b", combined_text, re.IGNORECASE))
        has_internal = bool(re.search(
            r"\b(livingip|teleo|internal|deal terms?|valuation|equity percent)",
            combined_text, re.IGNORECASE,
        ))
        if has_dollar and has_internal:
            issues.append("opsec_internal_deal_terms")
    # Body substance check (claims only)
    if ftype == "claim" and body:
        # Strip the H1 title line and check remaining content
        body_no_h1 = re.sub(r"^# .+\n*", "", body).strip()
        # Remove "Relevant Notes" and "Topics" sections
        body_content = re.split(r"\n---\n", body_no_h1)[0].strip()
        if len(body_content) < 50:
            issues.append("body_too_thin")
    # Near-duplicate check (claims only, not entities)
    if ftype != "entity":
        title_lower = Path(filename).stem.replace("-", " ").lower()
        title_words = set(title_lower.split()[:6])
        for existing in existing_claims:
            # Normalize existing stem: hyphens → spaces for consistent comparison
            existing_normalized = existing.replace("-", " ").lower()
            if len(title_words & set(existing_normalized.split()[:6])) < 2:
                continue
            ratio = SequenceMatcher(None, title_lower, existing_normalized).ratio()
            if ratio >= DEDUP_THRESHOLD:
                issues.append(f"near_duplicate:{existing[:80]}")
                break  # One match is enough to flag
    return issues
# ─── Main entry point ──────────────────────────────────────────────────────
def validate_and_fix_claims(
    claims: list[dict],
    domain: str,
    agent: str,
    existing_claims: set[str],
    repo_root: str = ".",
) -> tuple[list[dict], list[dict], dict]:
    """Validate and fix extracted claims. Returns (kept_claims, rejected_claims, stats).

    Each claim dict has: filename, domain, content
    Returned claims have content fixed where possible.
    Stats: {total, kept, fixed, rejected, fixes_applied: [...], rejections: [...]}
    """
    kept = []
    rejected = []
    all_fixes = []
    all_rejections = []
    # Treat intra-batch stems as existing for link fixing, so wiki links between
    # claims extracted in the same batch are not stripped as broken
    batch_stems = {Path(c["filename"]).stem for c in claims}
    existing_plus_batch = existing_claims | batch_stems
    for claim in claims:
        filename = claim.get("filename", "")
        content = claim.get("content", "")
        claim_domain = claim.get("domain", domain)
        if not filename or not content:
            rejected.append(claim)
            all_rejections.append(f"{filename or '?'}:missing_filename_or_content")
            continue
        # Phase 1: Apply fixers
        content, fixes1 = fix_frontmatter(content, claim_domain, agent)
        content, fixes2 = fix_wiki_links(content, existing_plus_batch)
        content, fixes3 = fix_trailing_newline(content)
        content, fixes4 = fix_h1_title_match(content, filename)
        fixes = fixes1 + fixes2 + fixes3 + fixes4
        if fixes:
            all_fixes.extend([f"{filename}:{f}" for f in fixes])
        # Phase 2: Validate (after fixes)
        issues = validate_claim(filename, content, existing_claims)
        # Separate hard failures from warnings
        hard_failures = [i for i in issues if not i.startswith("near_duplicate")]
        warnings = [i for i in issues if i.startswith("near_duplicate")]
        if hard_failures:
            rejected.append({**claim, "content": content, "issues": hard_failures})
            all_rejections.extend([f"{filename}:{i}" for i in hard_failures])
        else:
            if warnings:
                all_fixes.extend([f"{filename}:WARN:{w}" for w in warnings])
            kept.append({**claim, "content": content})
    stats = {
        "total": len(claims),
        "kept": len(kept),
        "fixed": len([f for f in all_fixes if ":WARN:" not in f]),
        "rejected": len(rejected),
        "fixes_applied": all_fixes,
        "rejections": all_rejections,
    }
    logger.info(
        "Post-extraction: %d/%d claims kept (%d fixed, %d rejected)",
        stats["kept"], stats["total"], stats["fixed"], stats["rejected"],
    )
    return kept, rejected, stats
def validate_and_fix_entities(
    entities: list[dict],
    domain: str,
    existing_claims: set[str],
) -> tuple[list[dict], list[dict], dict]:
    """Validate and fix extracted entities. Returns (kept, rejected, stats).

    Lighter validation than claims — entities are factual records, not arguable propositions.
    """
    kept = []
    rejected = []
    all_issues = []
    for ent in entities:
        filename = ent.get("filename", "")
        content = ent.get("content", "")
        action = ent.get("action", "create")
        if not filename:
            rejected.append(ent)
            all_issues.append("missing_filename")
            continue
        issues = []
        if action == "create" and content:
            fm, body = parse_frontmatter(content)
            if fm is None:
                issues.append("no_frontmatter")
            else:
                if fm.get("type") != "entity":
                    issues.append("wrong_type")
                if "entity_type" not in fm:
                    issues.append("missing_entity_type")
                if "domain" not in fm:
                    issues.append("missing_domain")
                # decision_market specific checks
                if fm.get("entity_type") == "decision_market":
                    for field in ("parent_entity", "platform", "category", "status"):
                        if field not in fm:
                            issues.append(f"dm_missing:{field}")
            # Fix trailing newline
            if not content.endswith("\n"):
                ent["content"] = content + "\n"
        elif action == "update":
            timeline = ent.get("timeline_entry", "")
            if not timeline:
                issues.append("update_no_timeline")
        if issues:
            rejected.append({**ent, "issues": issues})
            all_issues.extend([f"{filename}:{i}" for i in issues])
        else:
            kept.append(ent)
    stats = {
        "total": len(entities),
        "kept": len(kept),
        "rejected": len(rejected),
        "issues": all_issues,
    }
    return kept, rejected, stats
def load_existing_claims_from_repo(repo_root: str) -> set[str]:
    """Build set of known claim/entity stems from the repo."""
    claims: set[str] = set()
    base = Path(repo_root)
    for subdir in ["domains", "core", "foundations", "maps", "agents", "schemas", "entities"]:
        full = base / subdir
        if not full.is_dir():
            continue
        for f in full.rglob("*.md"):
            claims.add(f.stem)
    return claims
# ─── Helpers ────────────────────────────────────────────────────────────────
def _rebuild_content(fm: dict, body: str) -> str:
    """Rebuild markdown content from frontmatter dict and body."""
    # Order frontmatter fields consistently
    field_order = ["type", "entity_type", "name", "domain", "description",
                   "confidence", "source", "created", "status", "parent_entity",
                   "platform", "proposer", "proposal_url", "proposal_date",
                   "resolution_date", "category", "summary", "tracked_by",
                   "secondary_domains", "challenged_by"]
    lines = ["---"]
    written = set()
    for field in field_order:
        if field in fm and fm[field] is not None:
            lines.append(_yaml_line(field, fm[field]))
            written.add(field)
    # Write remaining fields not in the order list
    for key, val in fm.items():
        if key not in written and val is not None:
            lines.append(_yaml_line(key, val))
    lines.append("---")
    lines.append("")
    lines.append(body)
    content = "\n".join(lines)
    if not content.endswith("\n"):
        content += "\n"
    return content
def _yaml_line(key: str, val) -> str:
    """Format a single YAML key-value line."""
    if isinstance(val, list):
        return f"{key}: {json.dumps(val)}"
    if isinstance(val, bool):  # must precede the int check: bool subclasses int
        return f"{key}: {'true' if val else 'false'}"
    if isinstance(val, (int, float)):
        return f"{key}: {val}"
    if isinstance(val, date):
        return f"{key}: {val.isoformat()}"
    # String — quote if it contains special chars (json.dumps escapes embedded quotes)
    s = str(val)
    if any(c in s for c in ":#{}[]|>&*!%@`\""):
        return f"{key}: {json.dumps(s)}"
    return f"{key}: {s}"