teleo-infrastructure/scripts/openrouter-extract-v2.py
m3taversal d2aec7fee3
feat: reorganize repo with clear directory boundaries and agent ownership
Move scattered root-level files into categorized directories:
- deploy/ — deployment + mirror scripts (Ship)
- scripts/ — one-off backfills + migrations (Ship)
- research/ — nightly research + prompts (Ship)
- docs/ — all operational documentation (shared)

Delete 3 dead cron scripts replaced by pipeline daemon:
- batch-extract-50.sh, evaluate-trigger.sh, extract-cron.sh

Add CODEOWNERS mapping every path to its owning agent.
Add README with directory structure, ownership table, and VPS layout.
Update deploy.sh paths to match new structure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 18:20:13 +01:00

#!/usr/bin/env python3
"""Extract claims from a source file — v2.
Uses lean prompt (judgment only) + deterministic post-extraction validation ($0).
Replaces the 1331-line openrouter-extract.py.
Changes from v1:
- Prompt: ~100 lines (was ~400). Mechanical rules removed — code handles them.
- Pass 2: Replaced Haiku LLM review with Python validator. $0 instead of ~$0.01/source.
- Entity enrichment: Entities enqueued to JSON queue, applied to main by batch processor.
Extraction branches create NEW claim files only — no entity modifications on branches.
Eliminates merge conflicts and the 83% near_duplicate false-positive rate.
- Fix mode: Removed. Rejected claims re-extract with feedback baked into prompt.
Usage:
python3 openrouter-extract-v2.py <source-file> [--model MODEL] [--dry-run]
"""
import argparse
import csv
import glob
import json
import os
import re
import sys
from datetime import date
from pathlib import Path
import requests
# ─── Add pipeline lib/ to path ──────────────────────────────────────────────
# Script lives at /opt/teleo-eval/, but lib/ is at /opt/teleo-eval/pipeline/lib/.
sys.path.insert(0, str(Path(__file__).parent / "pipeline"))
sys.path.insert(0, str(Path(__file__).parent))
from lib.extraction_prompt import build_extraction_prompt
from lib.post_extract import (
load_existing_claims_from_repo,
validate_and_fix_claims,
validate_and_fix_entities,
)
from lib.connect import connect_new_claims
# --- Prior art lookup (extract-time connection) ---
def _find_prior_art(source_title: str, source_body: str, limit: int = 10) -> list[dict]:
"""Search Qdrant for existing claims similar to this source.
Uses source title + first 500 chars of body as the search query.
Returns list of {claim_title, claim_path, description, score} dicts.
Non-fatal — returns empty list on any failure.
"""
try:
from lib.search import embed_query, search_qdrant
except ImportError:
return []
query = f"{source_title} {source_body[:500]}".strip()
if len(query) < 20:
return []
vector = embed_query(query)
if vector is None:
return []
hits = search_qdrant(vector, limit=limit, score_threshold=0.55)
results = []
for hit in hits:
payload = hit.get("payload", {})
results.append({
"claim_title": payload.get("claim_title", ""),
"claim_path": payload.get("claim_path", ""),
"description": payload.get("description", ""),
"score": hit.get("score", 0),
})
return results
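# Illustrative hit returned by this helper. The payload keys match what is read
# above; the values are hypothetical:
#   {"claim_title": "metadao-governs-by-decision-markets",
#    "claim_path": "domains/decision-markets/metadao-governs-by-decision-markets.md",
#    "description": "MetaDAO routes treasury decisions through markets ...",
#    "score": 0.68}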
# ─── Source registration (Argus: pipeline funnel tracking) ─────────────────
def _source_db_conn():
"""Get connection to pipeline.db for source registration."""
try:
from lib import db
return db.get_connection()
except Exception:
return None
def _register_source(conn, path, status, domain=None, model=None, claims_count=0, error=None):
"""Register or update a source in pipeline.db for funnel tracking."""
if conn is None:
return
try:
conn.execute(
"""INSERT INTO sources (path, status, priority, extraction_model, claims_count, created_at, updated_at)
VALUES (?, ?, 'medium', ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(path) DO UPDATE SET
status = excluded.status,
extraction_model = COALESCE(excluded.extraction_model, extraction_model),
claims_count = excluded.claims_count,
last_error = ?,
updated_at = datetime('now')""",
(path, status, model, claims_count, error),
)
except Exception as e:
print(f" WARN: Source registration failed: {e}", file=sys.stderr)
# ─── Constants ──────────────────────────────────────────────────────────────
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
DEFAULT_MODEL = "anthropic/claude-sonnet-4.5"
USAGE_CSV = "/opt/teleo-eval/logs/openrouter-usage.csv"
DOMAIN_AGENTS = {
"internet-finance": "rio",
"entertainment": "clay",
"ai-alignment": "theseus",
"health": "vida",
"space-development": "astra",
"grand-strategy": "leo",
"mechanisms": "leo",
"living-capital": "rio",
"living-agents": "theseus",
"teleohumanity": "leo",
"critical-systems": "theseus",
"collective-intelligence": "theseus",
"teleological-economics": "rio",
"cultural-dynamics": "clay",
"decision-markets": "rio",
}
# ─── Helpers ────────────────────────────────────────────────────────────────
def read_file(path):
try:
with open(path) as f:
return f.read()
except FileNotFoundError:
return ""
def get_domain_from_source(source_content):
match = re.search(r"^domain:\s*(.+)$", source_content, re.MULTILINE)
return match.group(1).strip() if match else None
def get_kb_index(domain):
"""Build fresh KB index for duplicate checking and wiki-link targets.
Regenerated before each extraction (not cached from cron) so the index
reflects the current KB state. Stale indexes cause duplicate claims and
broken wiki links. (Leo's fix #1)
"""
lines = []
# Primary domain claims
domain_dir = f"domains/{domain}"
for f in sorted(glob.glob(os.path.join(domain_dir, "*.md"))):
basename = os.path.basename(f)
if not basename.startswith("_"):
title = basename.replace(".md", "").replace("-", " ")
lines.append(f"- {basename}: {title}")
# Cross-domain claims from core/ and foundations/ (for wiki-link targets)
for subdir in ["core", "foundations"]:
for f in sorted(glob.glob(os.path.join(subdir, "**", "*.md"), recursive=True)):
basename = os.path.basename(f)
if not basename.startswith("_"):
title = basename.replace(".md", "").replace("-", " ")
lines.append(f"- {basename}: {title}")
# Entities in this domain (for enrichment detection)
entity_dir = f"entities/{domain}"
for f in sorted(glob.glob(os.path.join(entity_dir, "*.md"))):
basename = os.path.basename(f)
if not basename.startswith("_"):
lines.append(f"- [entity] {basename}: {basename.replace('.md', '').replace('-', ' ')}")
if not lines:
return "No existing claims in this domain."
# Cap at 200 entries to keep prompt size reasonable
    if len(lines) > 200:
        extra = len(lines) - 200
        lines = lines[:200]
        lines.append(f"... and {extra} more (truncated)")
return "\n".join(lines)
def call_openrouter(prompt, model, api_key):
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://livingip.xyz",
"X-Title": "Teleo Codex Extraction",
}
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"max_tokens": 16000,
}
resp = requests.post(OPENROUTER_URL, headers=headers, json=payload, timeout=120)
resp.raise_for_status()
data = resp.json()
content = data["choices"][0]["message"]["content"]
usage = data.get("usage", {})
return content, usage
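# The indexing above assumes the OpenAI-style chat completion schema that
# OpenRouter returns: generated text at choices[0].message.content and token
# counts under usage.prompt_tokens / usage.completion_tokens.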
def parse_response(content):
"""Parse JSON response, handling markdown fencing and truncation."""
content = content.strip()
if content.startswith("```"):
content = re.sub(r"^```(?:json)?\s*\n?", "", content)
content = re.sub(r"\n?```\s*$", "", content)
try:
return json.loads(content)
except json.JSONDecodeError:
pass
# Fix common JSON issues
fixed = re.sub(r",\s*([}\]])", r"\1", content)
open_braces = fixed.count("{") - fixed.count("}")
open_brackets = fixed.count("[") - fixed.count("]")
fixed += "]" * max(0, open_brackets) + "}" * max(0, open_braces)
try:
parsed = json.loads(fixed)
print(" WARN: Fixed malformed JSON (trailing commas or truncation)")
return parsed
except json.JSONDecodeError:
pass
# Last resort: try to salvage claims with regex
result = {"claims": [], "enrichments": [], "entities": [], "facts": []}
claim_pattern = r'\{"filename":\s*"([^"]+)"[^}]*"content":\s*"((?:[^"\\]|\\.)*)"\s*\}'
for match in re.finditer(claim_pattern, content, re.DOTALL):
filename = match.group(1)
claim_content = match.group(2).replace("\\n", "\n").replace('\\"', '"')
domain_match = re.search(r'"domain":\s*"([^"]+)"', match.group(0))
result["claims"].append({
"filename": filename,
"domain": domain_match.group(1) if domain_match else "",
"content": claim_content,
})
if result["claims"]:
print(f" WARN: Salvaged {len(result['claims'])} claims from malformed JSON")
return result
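# Expected well-formed response shape, before any salvage. The exact field sets
# are defined by lib/extraction_prompt.py; the keys sketched here are the ones
# this script reads downstream (illustrative only):
#   {
#     "claims":      [{"filename": "...", "domain": "...", "title": "...",
#                      "description": "...", "body": "...", "confidence": "...",
#                      "connections": [{"target": "...", "relationship": "related"}],
#                      "related_claims": ["..."]}],
#     "enrichments": [{"target_file": "...", "type": "confirm", "evidence": "..."}],
#     "entities":    [{"filename": "...", "action": "update"}],
#     "decisions":   [{"filename": "...", "content": "...", "parent_entity": "...",
#                      "parent_timeline_entry": "..."}],
#     "facts":       ["..."]
#   }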
def reconstruct_claim_content(claim, domain, agent):
"""Build markdown content from structured claim fields (lean prompt output format)."""
title = claim.get("title", claim.get("filename", "").replace(".md", "").replace("-", " "))
desc = claim.get("description", "")
conf = claim.get("confidence", "experimental")
source = claim.get("source", f"extraction by {agent}")
body_text = claim.get("body", desc)
related = claim.get("related_claims", [])
connections = claim.get("connections", [])
sourcer = claim.get("sourcer", "")
# Build attribution block (v1: extractor always known, sourcer best-effort)
attr_lines = [
"attribution:",
" extractor:",
f' - handle: "{agent}"',
]
if sourcer:
sourcer_handle = sourcer.strip().lower().lstrip("@").replace(" ", "-")
attr_lines.extend([
" sourcer:",
f' - handle: "{sourcer_handle}"',
f' context: "{source}"',
])
# Build typed edge fields from connections array
edge_fields = {"supports": [], "challenges": [], "related": []}
for conn in connections:
target = conn.get("target", "")
rel = conn.get("relationship", "related")
if target and rel in edge_fields:
# Normalize: strip .md extension if present
target = target.replace(".md", "")
if target not in edge_fields[rel]:
edge_fields[rel].append(target)
# Also fold related_claims into "related" edges (backwards compat)
for r in related[:5]:
r_clean = r.replace(".md", "")
if r_clean not in edge_fields["related"]:
edge_fields["related"].append(r_clean)
# Build edge lines for frontmatter
edge_lines = []
for edge_type in ("supports", "challenges", "related"):
targets = edge_fields[edge_type]
if targets:
edge_lines.append(f"{edge_type}:")
for t in targets:
edge_lines.append(f" - {t}")
lines = [
"---",
"type: claim",
f"domain: {domain}",
f'description: "{desc}"',
f"confidence: {conf}",
f'source: "{source}"',
f"created: {date.today().isoformat()}",
*attr_lines,
*edge_lines,
"---",
"",
f"# {title}",
"",
body_text,
"",
"---",
"",
"Relevant Notes:",
]
for r in related[:5]:
lines.append(f"- [[{r}]]")
lines.extend(["", "Topics:", ""])
return "\n".join(lines)
def update_source_file(source_path, source_content, update_info):
"""Update source file frontmatter with processing info."""
updated = re.sub(
r"^status:\s*.+$",
f"status: {update_info['status']}",
source_content,
count=1,
flags=re.MULTILINE,
)
parts = updated.split("---", 2)
if len(parts) >= 3:
fm = parts[1]
fm += f"processed_by: {update_info['processed_by']}\n"
fm += f"processed_date: {update_info['processed_date']}\n"
if update_info.get("claims_extracted"):
fm += f"claims_extracted: {json.dumps(update_info['claims_extracted'])}\n"
if update_info.get("enrichments_applied"):
fm += f"enrichments_applied: {json.dumps(update_info['enrichments_applied'])}\n"
if update_info.get("entities_updated"):
fm += f"entities_updated: {json.dumps(update_info['entities_updated'])}\n"
if update_info.get("model"):
fm += f'extraction_model: "{update_info["model"]}"\n'
if update_info.get("notes"):
fm += f'extraction_notes: "{update_info["notes"]}"\n'
updated = f"---{fm}---{parts[2]}"
key_facts = update_info.get("key_facts", [])
if key_facts:
updated += "\n\n## Key Facts\n"
for fact in key_facts:
updated += f"- {fact}\n"
with open(source_path, "w") as f:
f.write(updated)
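# Net effect on the source file: the existing "status:" line is rewritten in
# place; processed_by / processed_date (plus any claims_extracted,
# enrichments_applied, entities_updated, extraction_model, extraction_notes)
# are appended to the frontmatter; and a "## Key Facts" section is appended
# to the body when key facts were returned.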
def log_usage(agent, model, source_file, usage):
write_header = not os.path.exists(USAGE_CSV)
with open(USAGE_CSV, "a", newline="") as f:
writer = csv.writer(f)
if write_header:
writer.writerow(["date", "agent", "model", "source_file", "input_tokens", "output_tokens"])
writer.writerow([
date.today().isoformat(), agent, model,
os.path.basename(source_file),
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
])
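# Example row appended to openrouter-usage.csv (values hypothetical):
#   2026-04-14,rio,anthropic/claude-sonnet-4.5,metadao-q1-report.md,18422,3107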
# ─── Main ───────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Extract claims via OpenRouter (v2)")
parser.add_argument("source_file", help="Path to source file in inbox/archive/")
parser.add_argument("--model", default=DEFAULT_MODEL, help=f"Model (default: {DEFAULT_MODEL})")
parser.add_argument("--domain", default=None, help="Override domain")
parser.add_argument("--dry-run", action="store_true", help="Print prompt, don't call API")
parser.add_argument("--no-review", action="store_true", help="No-op (v1 compat). Pass 2 is always Python validator in v2.")
parser.add_argument("--key-file", default="/opt/teleo-eval/secrets/openrouter-key")
args = parser.parse_args()
# Read API key
api_key = read_file(args.key_file).strip()
if not api_key and not args.dry_run:
print("ERROR: No API key found", file=sys.stderr)
sys.exit(1)
# Read source
source_content = read_file(args.source_file)
if not source_content:
print(f"ERROR: Cannot read {args.source_file}", file=sys.stderr)
sys.exit(1)
# Get domain and agent
domain = args.domain or get_domain_from_source(source_content)
if not domain:
print(f"ERROR: No domain field in {args.source_file}", file=sys.stderr)
sys.exit(1)
agent = DOMAIN_AGENTS.get(domain, "leo")
# Get KB index for dedup
kb_index = get_kb_index(domain)
# Load existing claims for post-extraction validation
existing_claims = load_existing_claims_from_repo(".")
# ── Build lean prompt ──
# Extract rationale and intake_tier from source frontmatter (directed contribution)
rationale = None
intake_tier = None
proposed_by = None
rationale_match = re.search(r"^rationale:\s*[\"']?(.+?)[\"']?\s*$", source_content, re.MULTILINE)
if rationale_match:
rationale = rationale_match.group(1).strip()
tier_match = re.search(r"^intake_tier:\s*(\S+)", source_content, re.MULTILINE)
if tier_match:
intake_tier = tier_match.group(1).strip()
proposed_match = re.search(r"^proposed_by:\s*[\"']?(.+?)[\"']?\s*$", source_content, re.MULTILINE)
if proposed_match:
proposed_by = proposed_match.group(1).strip()
# Set intake tier based on rationale presence
if rationale and not intake_tier:
intake_tier = "directed"
elif not intake_tier:
intake_tier = "undirected"
if rationale:
print(f" Directed contribution from {proposed_by or '?'}: {rationale[:80]}...")
# ── Prior art lookup (extract-time connection) ──
# Search Qdrant for existing claims similar to this source.
# Injected into prompt so LLM can classify connections at extraction time.
source_title = os.path.basename(args.source_file).replace(".md", "").replace("-", " ")
prior_art = _find_prior_art(source_title, source_content)
if prior_art:
print(f" Prior art: {len(prior_art)} connection candidates (top: {prior_art[0]['claim_title'][:50]}... @ {prior_art[0]['score']:.2f})")
prompt = build_extraction_prompt(
args.source_file, source_content, domain, agent, kb_index,
rationale=rationale, intake_tier=intake_tier, proposed_by=proposed_by,
prior_art=prior_art,
)
if args.dry_run:
print(f"=== DRY RUN ===")
print(f"Source: {args.source_file}")
print(f"Domain: {domain}, Agent: {agent}")
print(f"Model: {args.model}")
print(f"Existing claims: {len(existing_claims)}")
print(f"Prompt length: {len(prompt)} chars")
print(f"\n=== PROMPT ===\n{prompt[:1000]}...")
return
print(f"Extracting from {args.source_file} via {args.model}...")
print(f"Domain: {domain}, Agent: {agent}, Existing claims: {len(existing_claims)}")
# Register source as extracting (Argus: pipeline funnel)
_src_conn = _source_db_conn()
_register_source(_src_conn, args.source_file, "extracting", domain, args.model)
# ── Pass 1: LLM extraction ──
try:
content, usage = call_openrouter(prompt, args.model, api_key)
except requests.exceptions.RequestException as e:
_register_source(_src_conn, args.source_file, "error", domain, args.model, error=str(e))
print(f"ERROR: API call failed: {e}", file=sys.stderr)
sys.exit(1)
p1_in = usage.get("prompt_tokens", "?")
p1_out = usage.get("completion_tokens", "?")
print(f"LLM tokens: {p1_in} in, {p1_out} out")
result = parse_response(content)
raw_claims = result.get("claims", [])
enrichments = result.get("enrichments", [])
entities = result.get("entities", [])
facts = result.get("facts", [])
decisions = result.get("decisions", [])
print(f"LLM output: {len(raw_claims)} claims, {len(enrichments)} enrichments, {len(entities)} entities, {len(decisions)} decisions, {len(facts)} facts")
# ── Pass 2: Deterministic validation ($0) ──
# Reconstruct content for claims that used the lean format (title/body fields instead of content)
for claim in raw_claims:
if "content" not in claim or not claim["content"]:
claim["content"] = reconstruct_claim_content(claim, domain, agent)
kept_claims, rejected_claims, claim_stats = validate_and_fix_claims(
raw_claims, domain, agent, existing_claims,
)
kept_entities, rejected_entities, entity_stats = validate_and_fix_entities(
entities, domain, existing_claims,
)
print(f"Validation: {claim_stats['kept']}/{claim_stats['total']} claims kept "
f"({claim_stats['fixed']} fixed, {claim_stats['rejected']} rejected)")
if entity_stats["total"]:
print(f"Entities: {entity_stats['kept']}/{entity_stats['total']} kept")
if claim_stats["rejections"]:
print(f"Rejections: {claim_stats['rejections']}")
# ── Write claim files ──
domain_dir = f"domains/{domain}"
os.makedirs(domain_dir, exist_ok=True)
written = []
for claim in kept_claims:
filename = claim["filename"]
claim_path = os.path.join(domain_dir, filename)
if os.path.exists(claim_path):
print(f" WARN: {claim_path} exists, skipping")
continue
with open(claim_path, "w") as f:
f.write(claim["content"])
written.append(filename)
print(f" Wrote: {claim_path}")
# ── Atomic connect: wire new claims to existing KB via vector search ──
connect_stats = {"connected": 0, "edges_added": 0}
if written:
written_paths = [os.path.join(domain_dir, f) for f in written]
try:
connect_stats = connect_new_claims(written_paths, domain=domain)
if connect_stats["connected"] > 0:
print(f" Connected: {connect_stats['connected']}/{len(written)} claims → {connect_stats['edges_added']} edges")
for conn in connect_stats.get("connections", []):
print(f" {conn['claim']}{', '.join(n[:40] for n in conn['neighbors'][:3])}")
if connect_stats.get("skipped_embed_failed"):
print(f" WARN: {connect_stats['skipped_embed_failed']} claims failed embedding (Qdrant unreachable?)")
except Exception as e:
print(f" WARN: Extract-and-connect failed (non-fatal): {e}", file=sys.stderr)
# ── Apply enrichments ──
enriched = []
for enr in enrichments:
target = enr.get("target_file", "")
evidence = enr.get("evidence", "")
enr_type = enr.get("type", "confirm")
source_ref = enr.get("source_ref", os.path.basename(args.source_file))
if not target or not evidence:
continue
target_path = os.path.join(domain_dir, target)
if not os.path.exists(target_path):
print(f" WARN: Enrichment target {target_path} not found, skipping")
continue
existing_content = read_file(target_path)
source_slug = os.path.basename(args.source_file).replace(".md", "")
# Dedup: skip if this source already enriched this claim (idempotency)
if f"[[{source_slug}]]" in existing_content:
print(f" SKIP: {target} already enriched by {source_slug}")
continue
enrichment_block = (
f"\n\n### Additional Evidence ({enr_type})\n"
f"*Source: [[{source_slug}]] | Added: {date.today().isoformat()}*\n\n"
f"{evidence}\n"
)
# Insert enrichment before "Relevant Notes:" or "Topics:" section.
# Do NOT split on "---" — it matches frontmatter delimiters and corrupts YAML
# when files lack a body separator. (Leo: root cause of PRs #1504, #1509)
# Two tiers only (Ganymede: tier 2 delimiter counting dropped — horizontal rule edge case)
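        # Examples of anchors the pattern below matches (illustrative):
        #   "Relevant Notes:", "## Topics", "relevant notes" at the start of a line.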
notes_match = re.search(r'\n(?:#{0,3}\s*)?(?:[Rr]elevant [Nn]otes|[Tt]opics)\s*:?', existing_content)
if notes_match:
insert_pos = notes_match.start()
updated = existing_content[:insert_pos] + enrichment_block + existing_content[insert_pos:]
else:
# No anchor found — append to end (always safe)
updated = existing_content.rstrip() + enrichment_block + "\n"
with open(target_path, "w") as f:
f.write(updated)
enriched.append(target)
print(f" Enriched: {target_path} ({enr_type})")
# ── Enqueue entities (NOT written to branch — applied to main by batch) ──
# Entity enrichments on branches cause merge conflicts because 20+ PRs
# modify the same entity file (futardio.md, metadao.md). Enqueuing to a
# JSON queue eliminates this: branches only create NEW claim files, entity
# updates are applied to main by entity_batch.py. (Leo's #1 fix)
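    # Illustrative entry handed to enqueue() below (the queue schema is owned by
    # lib/entity_queue.py; only "filename" and "action" are read here):
    #   {"filename": "metadao.md", "domain": "decision-markets",
    #    "action": "update", "timeline_entry": "2026-04-14: ..."}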
entities_enqueued = []
for ent in kept_entities:
try:
from lib.entity_queue import enqueue
entry_id = enqueue(ent, args.source_file, agent)
entities_enqueued.append(ent["filename"])
print(f" Entity enqueued: {ent['filename']} ({ent.get('action', '?')}) → queue:{entry_id}")
except Exception as e:
# No fallback — fail loudly if queue unavailable. Direct writes to branches
# defeat the entire queue architecture. (Ganymede review)
print(f" ERROR: Failed to enqueue entity {ent.get('filename', '?')}: {e}", file=sys.stderr)
# ── Write decision files + enqueue parent timeline entries ──
decisions_written = []
for dec in decisions:
filename = dec.get("filename", "")
dec_domain = dec.get("domain", domain)
content = dec.get("content", "")
parent = dec.get("parent_entity", "")
parent_timeline = dec.get("parent_timeline_entry", "")
if not filename:
continue
# Write decision file to branch (goes through PR eval like claims)
if content:
dec_dir = os.path.join("decisions", dec_domain)
os.makedirs(dec_dir, exist_ok=True)
dec_path = os.path.join(dec_dir, filename)
if not os.path.exists(dec_path):
with open(dec_path, "w") as f:
f.write(content)
decisions_written.append(filename)
print(f" Decision written: {dec_path}")
# Enqueue parent entity timeline entry (applied to main by entity_batch)
if parent and parent_timeline:
try:
from lib.entity_queue import enqueue
entry_id = enqueue({
"filename": parent,
"domain": dec_domain,
"action": "update",
"timeline_entry": parent_timeline,
}, args.source_file, agent)
print(f" Decision → parent timeline: {parent} (queue:{entry_id})")
except Exception as e:
print(f" WARN: Failed to enqueue parent timeline for {parent}: {e}", file=sys.stderr)
if decisions_written:
print(f" Decisions: {len(decisions_written)} written")
# ── Update source file ──
if written or decisions_written:
status = "processed"
elif enriched or entities_enqueued:
status = "enrichment"
else:
status = "null-result"
source_update = {
"status": status,
"processed_by": agent,
"processed_date": date.today().isoformat(),
"claims_extracted": written,
"model": args.model,
}
if enriched:
source_update["enrichments_applied"] = enriched
if entities_enqueued:
source_update["entities_enqueued"] = entities_enqueued
if facts:
source_update["key_facts"] = facts
if not written and not enriched and not entities_enqueued:
source_update["notes"] = (
f"LLM returned {len(raw_claims)} claims, "
f"{claim_stats['rejected']} rejected by validator"
)
update_source_file(args.source_file, source_content, source_update)
print(f" Updated: {args.source_file} → status: {status}")
# Register final status (Argus: pipeline funnel)
db_status = "extracted" if status == "processed" else ("null_result" if status == "null-result" else status)
_register_source(_src_conn, args.source_file, db_status, domain, args.model, len(written))
# ── Save debug info for rejected claims ──
if rejected_claims:
debug_dir = os.path.join(os.path.dirname(args.source_file) or ".", ".extraction-debug")
os.makedirs(debug_dir, exist_ok=True)
debug_path = os.path.join(debug_dir, os.path.basename(args.source_file).replace(".md", ".json"))
with open(debug_path, "w") as f:
json.dump({
"rejected_claims": [
{"filename": c.get("filename"), "issues": c.get("issues", [])}
for c in rejected_claims
],
"validation_stats": claim_stats,
"model": args.model,
"date": date.today().isoformat(),
}, f, indent=2)
print(f" Debug: {debug_path}")
# ── Log usage ──
log_usage(agent, args.model, args.source_file, usage)
# ── Summary ──
print(f"\n{'='*60}")
print(f" EXTRACTION COMPLETE (v2)")
print(f" Source: {args.source_file}")
print(f" Agent: {agent}")
print(f" Model: {args.model} ({p1_in} in / {p1_out} out)")
print(f" Pass 2: Python validator ($0)")
print(f" Claims: {len(written)} written, {claim_stats['rejected']} rejected, {claim_stats['fixed']} auto-fixed")
print(f" Connected: {connect_stats.get('connected', 0)} claims → {connect_stats.get('edges_added', 0)} edges (Qdrant)")
print(f" Enrichments: {len(enriched)} applied")
if entities_enqueued:
print(f" Entities: {len(entities_enqueued)} enqueued (applied by batch on main)")
if facts:
print(f" Facts: {len(facts)} stored in source notes")
print(f"{'='*60}")
if __name__ == "__main__":
main()