teleo-infrastructure/extract-decisions.py
m3taversal 6c6cd0d14e feat: support fundraise record_type alongside decision_market
LLM now classifies proposals as either decision_market (governance votes)
or fundraise (ICO/launch capital raises). Both handled by same extractor.

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-23 18:04:30 +00:00

374 lines
14 KiB
Python

#!/usr/bin/env python3
"""Extract decision records from proposal sources.
Reads event_type: proposal sources from archive, produces decision records
in decisions/{domain}/ with full verbatim proposal text + LLM-generated
summary, significance, and KB connections.
Usage:
python3 extract-decisions.py [--dry-run] [--limit N] [--source FILE]
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
"""
import argparse
import csv
import json
import os
import re
import sys
from datetime import date
from pathlib import Path
import requests
import yaml
# ─── Constants ──────────────────────────────────────────────────────────────
# Chat-completions endpoint that call_llm() POSTs to.
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# Model identifier sent in every request and recorded in the usage log.
MODEL = "anthropic/claude-sonnet-4.5"
# CSV file that call_llm() appends one token-usage row to per request.
USAGE_CSV = "/opt/teleo-eval/logs/openrouter-usage.csv"
# Root of the workspace checkout; the two directories below live inside it.
REPO_DIR = Path("/opt/teleo-eval/workspaces/main")
# Scanned recursively for *.md proposal sources by find_proposal_sources().
ARCHIVE_DIR = REPO_DIR / "inbox" / "archive"
# Decision records are written to DECISIONS_DIR/<domain>/<slug>.md.
DECISIONS_DIR = REPO_DIR / "decisions"
# ─── LLM Call ───────────────────────────────────────────────────────────────
def call_llm(prompt: str, max_tokens: int = 4096) -> str | None:
"""Call OpenRouter API."""
api_key = os.environ.get("OPENROUTER_API_KEY", "")
if not api_key:
# Try reading from file (same location as openrouter-extract-v2.py)
key_file = Path("/opt/teleo-eval/secrets/openrouter-key")
if key_file.exists():
api_key = key_file.read_text().strip()
if not api_key:
print("ERROR: No OPENROUTER_API_KEY", file=sys.stderr)
return None
resp = requests.post(
OPENROUTER_URL,
headers={"Authorization": f"Bearer {api_key}"},
json={
"model": MODEL,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": 0.3,
},
timeout=120,
)
if resp.status_code != 200:
print(f"ERROR: OpenRouter {resp.status_code}: {resp.text[:200]}", file=sys.stderr)
return None
data = resp.json()
# Log usage
usage = data.get("usage", {})
try:
with open(USAGE_CSV, "a") as f:
writer = csv.writer(f)
writer.writerow([
date.today().isoformat(),
"extract-decisions",
MODEL,
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
"",
])
except Exception:
pass
return data["choices"][0]["message"]["content"]
# ─── Frontmatter Parsing ───────────────────────────────────────────────────
def parse_frontmatter(path: Path) -> tuple[dict | None, str]:
"""Parse YAML frontmatter and body."""
text = path.read_text(errors="replace")
if not text.startswith("---"):
return None, text
end = text.find("\n---", 3)
if end == -1:
return None, text
try:
fm = yaml.safe_load(text[3:end])
if not isinstance(fm, dict):
return None, text
body = text[end + 4:].strip()
return fm, body
except Exception:
return None, text
# ─── Find Unprocessed Proposal Sources ──────────────────────────────────────
def find_proposal_sources() -> list[Path]:
    """Return archive markdown files that are proposals awaiting extraction.

    Every ``*.md`` file under ``ARCHIVE_DIR`` is visited in sorted order. A
    file qualifies when its frontmatter has ``event_type: proposal`` and its
    ``status`` is either ``unprocessed`` or absent. Files whose frontmatter
    cannot be read or parsed are ignored.
    """
    def _is_pending_proposal(candidate: Path) -> bool:
        try:
            meta, _body = parse_frontmatter(candidate)
        except Exception:
            return False
        if not meta:
            return False
        if meta.get("event_type") != "proposal":
            return False
        return meta.get("status") in ("unprocessed", None)

    return [
        candidate
        for candidate in sorted(ARCHIVE_DIR.rglob("*.md"))
        if _is_pending_proposal(candidate)
    ]
# ─── Check if Decision Record Exists ────────────────────────────────────────
def decision_exists(slug: str, domain: str = "internet-finance") -> bool:
"""Check if a decision record already exists."""
target_dir = DECISIONS_DIR / domain
if not target_dir.exists():
return False
# Check exact slug match
if (target_dir / f"{slug}.md").exists():
return True
# Check partial match (slug might be truncated)
for f in target_dir.iterdir():
if slug[:40] in f.name:
return True
return False
def slugify(text: str) -> str:
    """Turn arbitrary text into a lowercase, hyphen-separated filename slug.

    Characters other than ``a-z``, ``0-9``, whitespace and hyphens are
    dropped, whitespace runs become single hyphens, repeated hyphens are
    collapsed, and the result is capped at 80 characters.
    """
    slug = re.sub(r'[^a-z0-9\s-]', '', text.lower()).strip()
    # First turn whitespace runs into hyphens, then squeeze hyphen runs.
    for pattern in (r'[\s]+', r'-+'):
        slug = re.sub(pattern, '-', slug)
    return slug[:80]
# ─── Build Decision Record ──────────────────────────────────────────────────
# Prompt template for the LLM analysis step. It is filled in with
# ANALYSIS_PROMPT.format(source_text=...), so {source_text} is the only
# placeholder; any other literal brace added here must be doubled.
# The model is asked to reply with a single JSON object whose keys
# (name, status, proposer, proposal_date, resolution_date, record_type,
# category, summary, significance, related_claims) are read back by
# build_decision_record().
ANALYSIS_PROMPT = """You are analyzing a futarchy/governance proposal to create a structured decision record for a knowledge base.
Given this proposal source, produce a JSON object with these fields:
- "name": The full proposal name (e.g., "MetaDAO: Hire Robin Hanson as Advisor")
- "status": "passed" or "failed" or "active" (from the source data)
- "proposer": Who proposed it (name or handle)
- "proposal_date": ISO date when created
- "resolution_date": ISO date when resolved (null if active)
- "record_type": One of: "decision_market" (governance proposals voted on via futarchy) or "fundraise" (ICO/launch raising capital through MetaDAO or Futardio)
- "category": One of: treasury, hiring, product, governance, fundraise, incentives, migration, other
- "summary": 1-2 sentence summary of what this proposal does and why it matters. Be specific — include dollar amounts, key parameters, and outcomes.
- "significance": 2-3 paragraphs analyzing why this proposal matters for the futarchy ecosystem. What does it prove or test? What precedent does it set? How does it relate to broader governance patterns?
- "related_claims": List of 2-5 wiki-link titles from the Teleo knowledge base that this proposal is evidence for or against. Use full prose-as-title format like "futarchy-governed DAOs converge on traditional corporate governance scaffolding for treasury operations because market mechanisms alone cannot provide operational security and legal compliance"
IMPORTANT: Only output valid JSON. No markdown, no commentary.
Here is the proposal source:
{source_text}
"""
# Substrings (matched case-insensitively) that flag a source line as market data.
_MARKET_KEYWORDS = (
    "status:", "total volume", "pass", "fail", "spot", "outcome",
    "autocrat", "proposal account", "dao account", "proposer:",
)

# A top-level ``status: unprocessed`` line inside a frontmatter block (bytes).
_UNPROCESSED_STATUS = re.compile(rb"^(status:[ \t]*)unprocessed(?![\w-])", re.MULTILINE)


def _strip_code_fence(response: str) -> str:
    """Remove a leading ```` ``` ````/```` ```json ```` fence and a trailing fence, if present."""
    cleaned = re.sub(r'^```(?:json)?\s*', '', response.strip(), flags=re.IGNORECASE)
    return re.sub(r'\s*```$', '', cleaned)


def _extract_market_lines(body: str) -> list[str]:
    """Collect bullet (``- ``) or bold (``**``) lines of *body* that mention a market keyword."""
    found = []
    for raw_line in body.split("\n"):
        stripped = raw_line.strip()
        if not stripped.startswith(("- ", "**")):
            continue
        lowered = stripped.lower()
        if any(keyword in lowered for keyword in _MARKET_KEYWORDS):
            found.append(stripped)
    return found


def _mark_source_processed(source_path: Path) -> None:
    """Flip ``status: unprocessed`` to ``processed`` in the source's frontmatter only.

    Works on raw bytes so the rest of the file (encoding, line endings, any
    ``status: unprocessed`` text quoted in the body) is left untouched.
    Sources whose frontmatter has no such line are not modified.
    """
    raw = source_path.read_bytes()
    if not raw.startswith(b"---"):
        return
    end = raw.find(b"\n---", 3)
    if end == -1:
        return
    head, count = _UNPROCESSED_STATUS.subn(rb"\1processed", raw[:end], count=1)
    if count:
        source_path.write_bytes(head + raw[end:])


def build_decision_record(source_path: Path, dry_run: bool = False) -> Path | None:
    """Build a decision record from a proposal source.

    Reads the source's frontmatter and body, asks the LLM for a structured
    analysis, writes ``DECISIONS_DIR/<domain>/<slug>.md`` containing the
    analysis plus the verbatim proposal text, and marks the source processed.

    Args:
        source_path: Markdown proposal source with YAML frontmatter.
        dry_run: When true, report what would be created and stop before the
            LLM call; nothing is written.

    Returns:
        Path of the created record, or ``None`` when the source was skipped
        (no frontmatter, record already exists, dry run) or the LLM step
        failed.
    """
    fm, body = parse_frontmatter(source_path)
    if not fm:
        print(f" SKIP: No frontmatter in {source_path.name}")
        return None

    # ``or`` fallbacks also cover keys that are present but null in the YAML.
    title = str(fm.get("title") or "")
    domain = str(fm.get("domain") or "internet-finance")
    url = fm.get("url", "")
    source_date = fm.get("date", "")
    tags = fm.get("tags") or []
    if isinstance(tags, str):
        tags = [tags]  # a scalar ``tags: foo`` would otherwise break list concatenation
    else:
        tags = list(tags)

    # Extract project name from body
    project_match = re.search(r'Project:\s*(.+)', body)
    project = project_match.group(1).strip() if project_match else "Unknown"

    # Build slug from title, falling back to the source filename
    slug = slugify(title.replace("Futardio: ", "").replace("futardio: ", ""))
    if not slug:
        slug = slugify(source_path.stem)
    if not slug:
        print(f" SKIP: Cannot derive a slug for {source_path.name}")
        return None

    # Check if already exists
    if decision_exists(slug, domain):
        print(f" SKIP: Decision record already exists for {slug}")
        return None

    # Full source text for LLM (truncate at 8K to fit in context)
    source_text = f"Title: {title}\nURL: {url}\nDate: {source_date}\n\n{body}"
    if len(source_text) > 8000:
        source_text = source_text[:8000] + "\n\n[... truncated for analysis ...]"

    if dry_run:
        print(f" DRY RUN: Would create {slug}.md from {source_path.name}")
        return None

    # Call LLM for analysis
    response = call_llm(ANALYSIS_PROMPT.format(source_text=source_text))
    if not response:
        print(f" ERROR: LLM call failed for {source_path.name}")
        return None

    # Parse LLM response
    try:
        analysis = json.loads(_strip_code_fence(response))
    except json.JSONDecodeError as e:
        print(f" ERROR: Invalid JSON from LLM for {source_path.name}: {e}")
        print(f" Response: {response[:200]}")
        return None
    if not isinstance(analysis, dict):
        # Valid JSON but not an object (e.g. a list): the .get() calls below need a mapping.
        print(f" ERROR: LLM returned non-object JSON for {source_path.name}")
        print(f" Response: {response[:200]}")
        return None

    market_lines = _extract_market_lines(body)
    has_project = project != "Unknown"
    today = str(date.today())
    name = analysis.get("name", title)
    summary = analysis.get("summary", "")
    significance = analysis.get("significance", "")
    related = analysis.get("related_claims") or []
    if isinstance(related, str):
        related = [related]  # a lone string would otherwise be iterated per character

    # Build frontmatter
    record_fm = {
        "type": "decision",
        "entity_type": analysis.get("record_type", "decision_market"),
        "name": name,
        "domain": domain,
        "status": analysis.get("status", "unknown"),
        "tracked_by": "rio",
        "created": today,
        "last_updated": today,
        "parent_entity": f"[[{project.lower()}]]" if has_project else "",
        "platform": "metadao",
        "proposer": analysis.get("proposer", ""),
        "proposal_url": url,
        # Fall back to the source date when the LLM omits the field or returns null.
        "proposal_date": analysis.get("proposal_date") or str(source_date),
        "resolution_date": analysis.get("resolution_date", ""),
        "category": analysis.get("category", "other"),
        "summary": summary,
        "tags": (tags + [project.lower()]) if has_project else tags,
    }

    # Build body
    body_parts = [f"# {name}\n", f"## Summary\n\n{summary}\n"]
    if market_lines:
        body_parts.append("## Market Data\n")
        body_parts.extend(market_lines)
        body_parts.append("")
    body_parts.append(f"## Significance\n\n{significance}\n")

    # Full proposal text — verbatim
    body_parts.append("## Full Proposal Text\n")
    body_parts.append(body)
    body_parts.append("")

    # KB relationships
    if related:
        body_parts.append("## Relationship to KB\n")
        for claim_title in related:
            slug_link = str(claim_title).replace(" ", "-").lower()
            body_parts.append(f"- [[{slug_link}]]")
        body_parts.append("")

    body_parts.append("---\n")
    body_parts.append("Relevant Entities:")
    if has_project:
        body_parts.append(f"- [[{project.lower()}]] — parent organization")
    body_parts.append("\nTopics:\n- [[internet finance and decision markets]]")

    # Write file
    target_dir = DECISIONS_DIR / domain
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir / f"{slug}.md"

    # Serialize frontmatter
    fm_str = yaml.dump(record_fm, default_flow_style=False, allow_unicode=True, sort_keys=False)
    content = f"---\n{fm_str}---\n\n" + "\n".join(body_parts)
    # allow_unicode=True emits raw non-ASCII, so the encoding must be explicit.
    target_path.write_text(content, encoding="utf-8")
    print(f" CREATED: {target_path.name} ({len(content)} chars)")

    # Mark source as processed
    _mark_source_processed(source_path)
    return target_path
# ─── Main ───────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    With ``--source FILE`` a single source is processed. Otherwise every
    unprocessed proposal source in the archive is processed (or, with
    ``--dry-run``, only listed), optionally capped by ``--limit``. A summary
    line with processed/created/skipped/error counts is printed at the end of
    a batch run. Exits with status 1 when ``--source`` does not exist and with
    argparse's usage error when ``--limit`` is negative.
    """
    parser = argparse.ArgumentParser(description="Extract decision records from proposal sources")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created without writing")
    parser.add_argument("--limit", type=int, default=0, help="Max proposals to process (0 = all)")
    parser.add_argument("--source", type=str, help="Process a single source file")
    # NOTE: store_true with default=True means this is always True, and the
    # value is not read anywhere in this function; kept for CLI compatibility.
    parser.add_argument("--skip-existing", action="store_true", default=True,
                        help="Skip sources that already have decision records")
    args = parser.parse_args()

    # A negative limit would otherwise slice sources from the tail.
    if args.limit < 0:
        parser.error("--limit must be >= 0")

    if args.source:
        source_path = Path(args.source)
        if not source_path.exists():
            print(f"ERROR: Source not found: {source_path}", file=sys.stderr)
            sys.exit(1)
        result = build_decision_record(source_path, dry_run=args.dry_run)
        if result:
            print(f"Done: {result}")
        return

    def _display_title(path: Path) -> str:
        """Return the frontmatter title of *path*, or its stem when unavailable."""
        fm, _ = parse_frontmatter(path)
        return (fm or {}).get("title") or path.stem

    # Find all unprocessed proposals
    sources = find_proposal_sources()
    print(f"Found {len(sources)} unprocessed proposal sources")
    if args.limit:
        sources = sources[:args.limit]

    if args.dry_run:
        for s in sources:
            print(f" {_display_title(s)}")
        return

    processed = 0
    created = 0
    skipped = 0
    errors = 0
    for source_path in sources:
        try:
            # Title lookup sits inside the try so one unreadable file cannot
            # abort the whole batch.
            print(f"\nProcessing: {_display_title(source_path)}")
            result = build_decision_record(source_path, dry_run=False)
            if result:
                created += 1
            else:
                skipped += 1
        except Exception as e:
            # Top-level boundary: record the failure and move on to the next source.
            print(f" ERROR: {e}", file=sys.stderr)
            errors += 1
        processed += 1

    print(f"\nDone: {processed} processed, {created} created, {skipped} skipped, {errors} errors")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()