teleo-infrastructure/openrouter-extract-v2.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

628 lines
24 KiB
Python

#!/usr/bin/env python3
"""Extract claims from a source file — v2.
Uses lean prompt (judgment only) + deterministic post-extraction validation ($0).
Replaces the 1331-line openrouter-extract.py.
Changes from v1:
- Prompt: ~100 lines (was ~400). Mechanical rules removed — code handles them.
- Pass 2: Replaced Haiku LLM review with Python validator. $0 instead of ~$0.01/source.
- Entity enrichment: Entities enqueued to JSON queue, applied to main by batch processor.
Extraction branches create NEW claim files only — no entity modifications on branches.
Eliminates merge conflicts + 83% near_duplicate false positive rate.
- Fix mode: Removed. Rejected claims re-extract with feedback baked into prompt.
Usage:
python3 openrouter-extract-v2.py <source-file> [--model MODEL] [--dry-run]
"""
import argparse
import csv
import glob
import json
import os
import re
import sys
from datetime import date
from pathlib import Path
import requests
# ─── Add lib/ to path for imports ──────────────────────────────────────────
# Add pipeline lib/ to path. Script lives at /opt/teleo-eval/ but lib/ is at /opt/teleo-eval/pipeline/lib/
sys.path.insert(0, str(Path(__file__).parent / "pipeline"))
sys.path.insert(0, str(Path(__file__).parent))
from lib.extraction_prompt import build_extraction_prompt
from lib.post_extract import (
load_existing_claims_from_repo,
validate_and_fix_claims,
validate_and_fix_entities,
)
# ─── Source registration (Argus: pipeline funnel tracking) ─────────────────
def _source_db_conn():
"""Get connection to pipeline.db for source registration."""
try:
from lib import db
return db.get_connection()
except Exception:
return None
def _register_source(conn, path, status, domain=None, model=None, claims_count=0, error=None):
"""Register or update a source in pipeline.db for funnel tracking."""
if conn is None:
return
try:
conn.execute(
"""INSERT INTO sources (path, status, priority, extraction_model, claims_count, created_at, updated_at)
VALUES (?, ?, 'medium', ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(path) DO UPDATE SET
status = excluded.status,
extraction_model = COALESCE(excluded.extraction_model, extraction_model),
claims_count = excluded.claims_count,
last_error = ?,
updated_at = datetime('now')""",
(path, status, model, claims_count, error),
)
except Exception as e:
print(f" WARN: Source registration failed: {e}", file=sys.stderr)
# ─── Constants ──────────────────────────────────────────────────────────────
# OpenRouter chat-completions endpoint (single-shot, non-streaming).
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# Model used when --model is not given.
DEFAULT_MODEL = "anthropic/claude-sonnet-4.5"
# Shared append-only CSV ledger of per-extraction token usage (see log_usage).
USAGE_CSV = "/opt/teleo-eval/logs/openrouter-usage.csv"
# Maps a source's `domain:` frontmatter value to the extracting agent persona.
# Domains absent from this table fall back to "leo" (see main()).
DOMAIN_AGENTS = {
    "internet-finance": "rio",
    "entertainment": "clay",
    "ai-alignment": "theseus",
    "health": "vida",
    "space-development": "astra",
    "grand-strategy": "leo",
    "mechanisms": "leo",
    "living-capital": "rio",
    "living-agents": "theseus",
    "teleohumanity": "leo",
    "critical-systems": "theseus",
    "collective-intelligence": "theseus",
    "teleological-economics": "rio",
    "cultural-dynamics": "clay",
    "decision-markets": "rio",
}
# ─── Helpers ────────────────────────────────────────────────────────────────
def read_file(path):
    """Return the file's full text, or "" when the file does not exist.

    Other I/O errors (permissions, directories) propagate to the caller.
    """
    try:
        fh = open(path)
    except FileNotFoundError:
        return ""
    with fh:
        return fh.read()
def get_domain_from_source(source_content):
    """Pull the first `domain:` value out of source frontmatter.

    Returns the stripped value, or None when no `domain:` line exists.
    """
    found = re.search(r"^domain:\s*(.+)$", source_content, re.MULTILINE)
    if found is None:
        return None
    return found.group(1).strip()
def get_kb_index(domain):
    """Build fresh KB index for duplicate checking and wiki-link targets.

    Regenerated before each extraction (not cached from cron) so the index
    reflects the current KB state. Stale indexes cause duplicate claims and
    broken wiki links. (Leo's fix #1)

    Paths are relative to the current working directory — callers run from
    the repo root.

    Fix: the truncation notice now reports the real overflow count. v1
    sliced the list to 200 first, so `len(lines) - 200` was always 0 and it
    printed "... and 0 more (truncated)".
    """
    lines = []
    # Primary domain claims
    domain_dir = f"domains/{domain}"
    for f in sorted(glob.glob(os.path.join(domain_dir, "*.md"))):
        basename = os.path.basename(f)
        if not basename.startswith("_"):
            title = basename.replace(".md", "").replace("-", " ")
            lines.append(f"- {basename}: {title}")
    # Cross-domain claims from core/ and foundations/ (for wiki-link targets)
    for subdir in ["core", "foundations"]:
        for f in sorted(glob.glob(os.path.join(subdir, "**", "*.md"), recursive=True)):
            basename = os.path.basename(f)
            if not basename.startswith("_"):
                title = basename.replace(".md", "").replace("-", " ")
                lines.append(f"- {basename}: {title}")
    # Entities in this domain (for enrichment detection)
    entity_dir = f"entities/{domain}"
    for f in sorted(glob.glob(os.path.join(entity_dir, "*.md"))):
        basename = os.path.basename(f)
        if not basename.startswith("_"):
            lines.append(f"- [entity] {basename}: {basename.replace('.md', '').replace('-', ' ')}")
    if not lines:
        return "No existing claims in this domain."
    # Cap at 200 entries to keep prompt size reasonable
    if len(lines) > 200:
        overflow = len(lines) - 200  # count BEFORE slicing (v1 bug: always 0)
        lines = lines[:200]
        lines.append(f"... and {overflow} more (truncated)")
    return "\n".join(lines)
def call_openrouter(prompt, model, api_key):
    """POST a single-message chat completion to OpenRouter.

    Returns (message content, usage dict). Raises requests.HTTPError on a
    non-2xx response and requests exceptions on network failure.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://livingip.xyz",
        "X-Title": "Teleo Codex Extraction",
    }
    body = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 16000,
    }
    response = requests.post(OPENROUTER_URL, headers=request_headers, json=body, timeout=120)
    response.raise_for_status()
    reply = response.json()
    message_text = reply["choices"][0]["message"]["content"]
    return message_text, reply.get("usage", {})
def parse_response(content):
    """Parse the model's JSON reply, tolerating markdown fences and damage.

    Three attempts, in order:
      1. plain json.loads after stripping an optional ``` fence;
      2. repair trailing commas and close unbalanced brackets/braces;
      3. regex-salvage individual claim objects into a skeleton result.
    """
    text = content.strip()
    if text.startswith("```"):
        text = re.sub(r"^```(?:json)?\s*\n?", "", text)
        text = re.sub(r"\n?```\s*$", "", text)
    # Attempt 1: already valid JSON.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Attempt 2: drop trailing commas, then append closers for any
    # unbalanced brackets/braces left by output truncation.
    repaired = re.sub(r",\s*([}\]])", r"\1", text)
    missing_braces = repaired.count("{") - repaired.count("}")
    missing_brackets = repaired.count("[") - repaired.count("]")
    repaired += "]" * max(0, missing_brackets) + "}" * max(0, missing_braces)
    try:
        salvaged = json.loads(repaired)
        print(" WARN: Fixed malformed JSON (trailing commas or truncation)")
        return salvaged
    except json.JSONDecodeError:
        pass
    # Attempt 3: scrape whatever claim objects survive in the raw text.
    result = {"claims": [], "enrichments": [], "entities": [], "facts": []}
    claim_pattern = r'\{"filename":\s*"([^"]+)"[^}]*"content":\s*"((?:[^"\\]|\\.)*)"\s*\}'
    for hit in re.finditer(claim_pattern, text, re.DOTALL):
        claim_body = hit.group(2).replace("\\n", "\n").replace('\\"', '"')
        domain_hit = re.search(r'"domain":\s*"([^"]+)"', hit.group(0))
        result["claims"].append({
            "filename": hit.group(1),
            "domain": domain_hit.group(1) if domain_hit else "",
            "content": claim_body,
        })
    if result["claims"]:
        print(f" WARN: Salvaged {len(result['claims'])} claims from malformed JSON")
    return result
def reconstruct_claim_content(claim, domain, agent):
    """Render a structured claim dict (lean-prompt fields) as a markdown file.

    Produces frontmatter (type/domain/description/confidence/source/created/
    attribution), the titled body, up to 5 related-claim wiki links, and the
    Topics footer. Missing fields fall back: title from filename, body from
    description, confidence "experimental", source "extraction by <agent>".
    """
    fallback_title = claim.get("filename", "").replace(".md", "").replace("-", " ")
    title = claim.get("title", fallback_title)
    desc = claim.get("description", "")
    confidence = claim.get("confidence", "experimental")
    source = claim.get("source", f"extraction by {agent}")
    body = claim.get("body", desc)
    sourcer = claim.get("sourcer", "")

    # Attribution block (v1: extractor always known, sourcer best-effort).
    attribution = ["attribution:", " extractor:", f' - handle: "{agent}"']
    if sourcer:
        # Normalize "@Some Name" → "some-name".
        handle = sourcer.strip().lower().lstrip("@").replace(" ", "-")
        attribution += [" sourcer:", f' - handle: "{handle}"', f' context: "{source}"']

    document = [
        "---",
        "type: claim",
        f"domain: {domain}",
        f'description: "{desc}"',
        f"confidence: {confidence}",
        f'source: "{source}"',
        f"created: {date.today().isoformat()}",
        *attribution,
        "---",
        "",
        f"# {title}",
        "",
        body,
        "",
        "---",
        "",
        "Relevant Notes:",
    ]
    document += [f"- [[{link}]]" for link in claim.get("related_claims", [])[:5]]
    document += ["", "Topics:", "- [[_map]]", ""]
    return "\n".join(document)
def update_source_file(source_path, source_content, update_info):
    """Rewrite a source file's frontmatter with extraction results.

    Replaces the `status:` field in place, appends processing metadata to
    the frontmatter block, optionally appends a `## Key Facts` section, and
    writes the result back to source_path.

    Fixes vs v1:
    - Any text before the first `---` delimiter (e.g. a leading newline or
      BOM) is preserved; v1 dropped parts[0] when reassembling.
    - Both `entities_updated` and `entities_enqueued` keys are recorded.
      main() passes `entities_enqueued` since the entity-queue change, which
      v1 silently ignored.
    """
    updated = re.sub(
        r"^status:\s*.+$",
        f"status: {update_info['status']}",
        source_content,
        count=1,
        flags=re.MULTILINE,
    )
    parts = updated.split("---", 2)
    if len(parts) >= 3:
        fm = parts[1]
        fm += f"processed_by: {update_info['processed_by']}\n"
        fm += f"processed_date: {update_info['processed_date']}\n"
        if update_info.get("claims_extracted"):
            fm += f"claims_extracted: {json.dumps(update_info['claims_extracted'])}\n"
        if update_info.get("enrichments_applied"):
            fm += f"enrichments_applied: {json.dumps(update_info['enrichments_applied'])}\n"
        # Record entity work under whichever key the caller supplied.
        for key in ("entities_updated", "entities_enqueued"):
            if update_info.get(key):
                fm += f"{key}: {json.dumps(update_info[key])}\n"
        if update_info.get("model"):
            fm += f'extraction_model: "{update_info["model"]}"\n'
        if update_info.get("notes"):
            fm += f'extraction_notes: "{update_info["notes"]}"\n'
        # parts[0] keeps any prefix before the first delimiter (v1 dropped it).
        updated = f"{parts[0]}---{fm}---{parts[2]}"
    key_facts = update_info.get("key_facts", [])
    if key_facts:
        updated += "\n\n## Key Facts\n"
        for fact in key_facts:
            updated += f"- {fact}\n"
    with open(source_path, "w") as f:
        f.write(updated)
def log_usage(agent, model, source_file, usage):
    """Append one extraction's token usage to the shared CSV ledger.

    Writes the header row first when the ledger does not exist yet.
    Token counts default to 0 when absent from the usage dict.
    """
    needs_header = not os.path.exists(USAGE_CSV)
    with open(USAGE_CSV, "a", newline="") as fh:
        ledger = csv.writer(fh)
        if needs_header:
            ledger.writerow(["date", "agent", "model", "source_file", "input_tokens", "output_tokens"])
        row = [
            date.today().isoformat(),
            agent,
            model,
            os.path.basename(source_file),
            usage.get("prompt_tokens", 0),
            usage.get("completion_tokens", 0),
        ]
        ledger.writerow(row)
# ─── Main ───────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: extract claims from one source file.

    Flow: parse args → read key/source → resolve domain+agent → build the
    lean prompt → Pass 1 (LLM via OpenRouter) → Pass 2 (deterministic
    Python validator, $0) → write claim/decision files to the working tree
    (branch), apply enrichments in place, enqueue entity updates for batch
    application on main → update the source file's frontmatter → record
    funnel status, rejection debug info, and token usage.

    Exits 1 on missing API key, unreadable source, missing domain, or
    Pass-1 API failure.
    """
    parser = argparse.ArgumentParser(description="Extract claims via OpenRouter (v2)")
    parser.add_argument("source_file", help="Path to source file in inbox/archive/")
    parser.add_argument("--model", default=DEFAULT_MODEL, help=f"Model (default: {DEFAULT_MODEL})")
    parser.add_argument("--domain", default=None, help="Override domain")
    parser.add_argument("--dry-run", action="store_true", help="Print prompt, don't call API")
    parser.add_argument("--no-review", action="store_true", help="No-op (v1 compat). Pass 2 is always Python validator in v2.")
    parser.add_argument("--key-file", default="/opt/teleo-eval/secrets/openrouter-key")
    args = parser.parse_args()
    # Read API key (not needed for --dry-run, which never calls the API)
    api_key = read_file(args.key_file).strip()
    if not api_key and not args.dry_run:
        print("ERROR: No API key found", file=sys.stderr)
        sys.exit(1)
    # Read source
    source_content = read_file(args.source_file)
    if not source_content:
        print(f"ERROR: Cannot read {args.source_file}", file=sys.stderr)
        sys.exit(1)
    # Get domain and agent
    domain = args.domain or get_domain_from_source(source_content)
    if not domain:
        print(f"ERROR: No domain field in {args.source_file}", file=sys.stderr)
        sys.exit(1)
    # "leo" is the fallback agent for domains missing from DOMAIN_AGENTS
    agent = DOMAIN_AGENTS.get(domain, "leo")
    # Get KB index for dedup
    kb_index = get_kb_index(domain)
    # Load existing claims for post-extraction validation
    existing_claims = load_existing_claims_from_repo(".")
    # ── Build lean prompt ──
    # Extract rationale and intake_tier from source frontmatter (directed contribution)
    rationale = None
    intake_tier = None
    proposed_by = None
    rationale_match = re.search(r"^rationale:\s*[\"']?(.+?)[\"']?\s*$", source_content, re.MULTILINE)
    if rationale_match:
        rationale = rationale_match.group(1).strip()
    tier_match = re.search(r"^intake_tier:\s*(\S+)", source_content, re.MULTILINE)
    if tier_match:
        intake_tier = tier_match.group(1).strip()
    proposed_match = re.search(r"^proposed_by:\s*[\"']?(.+?)[\"']?\s*$", source_content, re.MULTILINE)
    if proposed_match:
        proposed_by = proposed_match.group(1).strip()
    # Set intake tier based on rationale presence (explicit frontmatter wins)
    if rationale and not intake_tier:
        intake_tier = "directed"
    elif not intake_tier:
        intake_tier = "undirected"
    if rationale:
        print(f" Directed contribution from {proposed_by or '?'}: {rationale[:80]}...")
    prompt = build_extraction_prompt(
        args.source_file, source_content, domain, agent, kb_index,
        rationale=rationale, intake_tier=intake_tier, proposed_by=proposed_by,
    )
    # --dry-run: show what would be sent, then stop before any API/DB/file work
    if args.dry_run:
        print(f"=== DRY RUN ===")
        print(f"Source: {args.source_file}")
        print(f"Domain: {domain}, Agent: {agent}")
        print(f"Model: {args.model}")
        print(f"Existing claims: {len(existing_claims)}")
        print(f"Prompt length: {len(prompt)} chars")
        print(f"\n=== PROMPT ===\n{prompt[:1000]}...")
        return
    print(f"Extracting from {args.source_file} via {args.model}...")
    print(f"Domain: {domain}, Agent: {agent}, Existing claims: {len(existing_claims)}")
    # Register source as extracting (Argus: pipeline funnel)
    _src_conn = _source_db_conn()
    _register_source(_src_conn, args.source_file, "extracting", domain, args.model)
    # ── Pass 1: LLM extraction ──
    try:
        content, usage = call_openrouter(prompt, args.model, api_key)
    except requests.exceptions.RequestException as e:
        # Record the failure in the funnel before bailing out
        _register_source(_src_conn, args.source_file, "error", domain, args.model, error=str(e))
        print(f"ERROR: API call failed: {e}", file=sys.stderr)
        sys.exit(1)
    p1_in = usage.get("prompt_tokens", "?")
    p1_out = usage.get("completion_tokens", "?")
    print(f"LLM tokens: {p1_in} in, {p1_out} out")
    result = parse_response(content)
    raw_claims = result.get("claims", [])
    enrichments = result.get("enrichments", [])
    entities = result.get("entities", [])
    facts = result.get("facts", [])
    decisions = result.get("decisions", [])
    print(f"LLM output: {len(raw_claims)} claims, {len(enrichments)} enrichments, {len(entities)} entities, {len(decisions)} decisions, {len(facts)} facts")
    # ── Pass 2: Deterministic validation ($0) ──
    # Reconstruct content for claims that used the lean format (title/body fields instead of content)
    for claim in raw_claims:
        if "content" not in claim or not claim["content"]:
            claim["content"] = reconstruct_claim_content(claim, domain, agent)
    kept_claims, rejected_claims, claim_stats = validate_and_fix_claims(
        raw_claims, domain, agent, existing_claims,
    )
    kept_entities, rejected_entities, entity_stats = validate_and_fix_entities(
        entities, domain, existing_claims,
    )
    print(f"Validation: {claim_stats['kept']}/{claim_stats['total']} claims kept "
          f"({claim_stats['fixed']} fixed, {claim_stats['rejected']} rejected)")
    if entity_stats["total"]:
        print(f"Entities: {entity_stats['kept']}/{entity_stats['total']} kept")
    if claim_stats["rejections"]:
        print(f"Rejections: {claim_stats['rejections']}")
    # ── Write claim files ──
    domain_dir = f"domains/{domain}"
    os.makedirs(domain_dir, exist_ok=True)
    written = []
    for claim in kept_claims:
        filename = claim["filename"]
        claim_path = os.path.join(domain_dir, filename)
        # Never overwrite an existing claim — dedup is the validator's job
        if os.path.exists(claim_path):
            print(f" WARN: {claim_path} exists, skipping")
            continue
        with open(claim_path, "w") as f:
            f.write(claim["content"])
        written.append(filename)
        print(f" Wrote: {claim_path}")
    # ── Apply enrichments ──
    enriched = []
    for enr in enrichments:
        target = enr.get("target_file", "")
        evidence = enr.get("evidence", "")
        enr_type = enr.get("type", "confirm")
        source_ref = enr.get("source_ref", os.path.basename(args.source_file))
        if not target or not evidence:
            continue
        target_path = os.path.join(domain_dir, target)
        if not os.path.exists(target_path):
            print(f" WARN: Enrichment target {target_path} not found, skipping")
            continue
        existing_content = read_file(target_path)
        source_slug = os.path.basename(args.source_file).replace(".md", "")
        enrichment_block = (
            f"\n\n### Additional Evidence ({enr_type})\n"
            f"*Source: [[{source_slug}]] | Added: {date.today().isoformat()}*\n\n"
            f"{evidence}\n"
        )
        # Insert enrichment before "Relevant Notes:" or "Topics:" section.
        # Do NOT split on "---" — it matches frontmatter delimiters and corrupts YAML
        # when files lack a body separator. (Leo: root cause of PRs #1504, #1509)
        # Two tiers only (Ganymede: tier 2 delimiter counting dropped — horizontal rule edge case)
        notes_match = re.search(r'\n(?:#{0,3}\s*)?(?:[Rr]elevant [Nn]otes|[Tt]opics)\s*:?', existing_content)
        if notes_match:
            insert_pos = notes_match.start()
            updated = existing_content[:insert_pos] + enrichment_block + existing_content[insert_pos:]
        else:
            # No anchor found — append to end (always safe)
            updated = existing_content.rstrip() + enrichment_block + "\n"
        with open(target_path, "w") as f:
            f.write(updated)
        enriched.append(target)
        print(f" Enriched: {target_path} ({enr_type})")
    # ── Enqueue entities (NOT written to branch — applied to main by batch) ──
    # Entity enrichments on branches cause merge conflicts because 20+ PRs
    # modify the same entity file (futardio.md, metadao.md). Enqueuing to a
    # JSON queue eliminates this: branches only create NEW claim files, entity
    # updates are applied to main by entity_batch.py. (Leo's #1 fix)
    entities_enqueued = []
    for ent in kept_entities:
        try:
            # Import inside the loop is cheap (module cache) and keeps the
            # queue dependency optional until an entity actually needs it
            from lib.entity_queue import enqueue
            entry_id = enqueue(ent, args.source_file, agent)
            entities_enqueued.append(ent["filename"])
            print(f" Entity enqueued: {ent['filename']} ({ent.get('action', '?')}) → queue:{entry_id}")
        except Exception as e:
            # No fallback — fail loudly if queue unavailable. Direct writes to branches
            # defeat the entire queue architecture. (Ganymede review)
            print(f" ERROR: Failed to enqueue entity {ent.get('filename', '?')}: {e}", file=sys.stderr)
    # ── Write decision files + enqueue parent timeline entries ──
    # (re-read from result; same value as the `decisions` bound above)
    decisions = result.get("decisions", [])
    decisions_written = []
    for dec in decisions:
        filename = dec.get("filename", "")
        dec_domain = dec.get("domain", domain)
        content = dec.get("content", "")
        parent = dec.get("parent_entity", "")
        parent_timeline = dec.get("parent_timeline_entry", "")
        if not filename:
            continue
        # Write decision file to branch (goes through PR eval like claims)
        if content:
            dec_dir = os.path.join("decisions", dec_domain)
            os.makedirs(dec_dir, exist_ok=True)
            dec_path = os.path.join(dec_dir, filename)
            if not os.path.exists(dec_path):
                with open(dec_path, "w") as f:
                    f.write(content)
                decisions_written.append(filename)
                print(f" Decision written: {dec_path}")
        # Enqueue parent entity timeline entry (applied to main by entity_batch)
        if parent and parent_timeline:
            try:
                from lib.entity_queue import enqueue
                entry_id = enqueue({
                    "filename": parent,
                    "domain": dec_domain,
                    "action": "update",
                    "timeline_entry": parent_timeline,
                }, args.source_file, agent)
                print(f" Decision → parent timeline: {parent} (queue:{entry_id})")
            except Exception as e:
                print(f" WARN: Failed to enqueue parent timeline for {parent}: {e}", file=sys.stderr)
    if decisions_written:
        print(f" Decisions: {len(decisions_written)} written")
    # ── Update source file ──
    # Directory-is-status contract: the frontmatter status drives downstream
    # handling (processed → archive, null-result → no claims produced)
    if written or decisions_written:
        status = "processed"
    elif enriched or entities_enqueued:
        status = "enrichment"
    else:
        status = "null-result"
    source_update = {
        "status": status,
        "processed_by": agent,
        "processed_date": date.today().isoformat(),
        "claims_extracted": written,
        "model": args.model,
    }
    if enriched:
        source_update["enrichments_applied"] = enriched
    if entities_enqueued:
        source_update["entities_enqueued"] = entities_enqueued
    if facts:
        source_update["key_facts"] = facts
    if not written and not enriched and not entities_enqueued:
        source_update["notes"] = (
            f"LLM returned {len(raw_claims)} claims, "
            f"{claim_stats['rejected']} rejected by validator"
        )
    update_source_file(args.source_file, source_content, source_update)
    print(f" Updated: {args.source_file} → status: {status}")
    # Register final status (Argus: pipeline funnel)
    # DB vocabulary differs from frontmatter: processed→extracted, null-result→null_result
    db_status = "extracted" if status == "processed" else ("null_result" if status == "null-result" else status)
    _register_source(_src_conn, args.source_file, db_status, domain, args.model, len(written))
    # ── Save debug info for rejected claims ──
    if rejected_claims:
        # Debug JSON lives in a dot-directory next to the source file
        debug_dir = os.path.join(os.path.dirname(args.source_file) or ".", ".extraction-debug")
        os.makedirs(debug_dir, exist_ok=True)
        debug_path = os.path.join(debug_dir, os.path.basename(args.source_file).replace(".md", ".json"))
        with open(debug_path, "w") as f:
            json.dump({
                "rejected_claims": [
                    {"filename": c.get("filename"), "issues": c.get("issues", [])}
                    for c in rejected_claims
                ],
                "validation_stats": claim_stats,
                "model": args.model,
                "date": date.today().isoformat(),
            }, f, indent=2)
        print(f" Debug: {debug_path}")
    # ── Log usage ──
    log_usage(agent, args.model, args.source_file, usage)
    # ── Summary ──
    print(f"\n{'='*60}")
    print(f" EXTRACTION COMPLETE (v2)")
    print(f" Source: {args.source_file}")
    print(f" Agent: {agent}")
    print(f" Model: {args.model} ({p1_in} in / {p1_out} out)")
    print(f" Pass 2: Python validator ($0)")
    print(f" Claims: {len(written)} written, {claim_stats['rejected']} rejected, {claim_stats['fixed']} auto-fixed")
    print(f" Enrichments: {len(enriched)} applied")
    if entities_enqueued:
        print(f" Entities: {len(entities_enqueued)} enqueued (applied by batch on main)")
    if facts:
        print(f" Facts: {len(facts)} stored in source notes")
    print(f"{'='*60}")
if __name__ == "__main__":
    main()