teleo-infrastructure/extract-decisions.py
m3taversal d67d36b409 fix: decision extractor uses extract worktree + PR flow
Was writing directly to main worktree where daemon race condition wiped
files. Now: syncs extract worktree to main, creates branch, writes
records, commits, pushes, opens Forgejo PR. Same pattern as batch-extract.

Also checks both main and extract worktrees for existing records.

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-24 02:50:12 +00:00

450 lines
18 KiB
Python

#!/usr/bin/env python3
"""Extract decision records from proposal sources.
Reads event_type: proposal sources from archive, produces decision records
in decisions/{domain}/ with full verbatim proposal text + LLM-generated
summary, significance, and KB connections.
Usage:
python3 extract-decisions.py [--dry-run] [--limit N] [--source FILE]
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
"""
import argparse
import csv
import json
import os
import re
import sys
from datetime import date
from pathlib import Path
import requests
import yaml
# ─── Constants ──────────────────────────────────────────────────────────────
# OpenRouter chat-completions endpoint and the model id sent to it by call_llm().
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
MODEL = "anthropic/claude-sonnet-4.5"
# CSV file that call_llm() appends one token-usage row to after each 200 response.
USAGE_CSV = "/opt/teleo-eval/logs/openrouter-usage.csv"
# Checkout that proposal sources are read from.
MAIN_REPO = Path("/opt/teleo-eval/workspaces/main")
# Checkout that decision records are written to, and that the git branch/commit/push
# helpers run in.
REPO_DIR = Path("/opt/teleo-eval/workspaces/extract")
ARCHIVE_DIR = MAIN_REPO / "inbox" / "archive" # Read sources from main (canonical)
DECISIONS_DIR = REPO_DIR / "decisions" # Write records to extract worktree
# ─── LLM Call ───────────────────────────────────────────────────────────────
def call_llm(prompt: str, max_tokens: int = 4096) -> str | None:
"""Call OpenRouter API."""
api_key = os.environ.get("OPENROUTER_API_KEY", "")
if not api_key:
# Try reading from file (same location as openrouter-extract-v2.py)
key_file = Path("/opt/teleo-eval/secrets/openrouter-key")
if key_file.exists():
api_key = key_file.read_text().strip()
if not api_key:
print("ERROR: No OPENROUTER_API_KEY", file=sys.stderr)
return None
resp = requests.post(
OPENROUTER_URL,
headers={"Authorization": f"Bearer {api_key}"},
json={
"model": MODEL,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": 0.3,
},
timeout=120,
)
if resp.status_code != 200:
print(f"ERROR: OpenRouter {resp.status_code}: {resp.text[:200]}", file=sys.stderr)
return None
data = resp.json()
# Log usage
usage = data.get("usage", {})
try:
with open(USAGE_CSV, "a") as f:
writer = csv.writer(f)
writer.writerow([
date.today().isoformat(),
"extract-decisions",
MODEL,
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
"",
])
except Exception:
pass
return data["choices"][0]["message"]["content"]
# ─── Frontmatter Parsing ───────────────────────────────────────────────────
def parse_frontmatter(path: Path) -> tuple[dict | None, str]:
"""Parse YAML frontmatter and body."""
text = path.read_text(errors="replace")
if not text.startswith("---"):
return None, text
end = text.find("\n---", 3)
if end == -1:
return None, text
try:
fm = yaml.safe_load(text[3:end])
if not isinstance(fm, dict):
return None, text
body = text[end + 4:].strip()
return fm, body
except Exception:
return None, text
# ─── Find Unprocessed Proposal Sources ──────────────────────────────────────
def find_proposal_sources() -> list[Path]:
    """Collect archived Markdown sources that are proposals awaiting extraction.

    Walks ARCHIVE_DIR recursively in sorted path order. A file qualifies when
    its frontmatter has ``event_type: proposal`` and its ``status`` is either
    ``unprocessed`` or absent. Files that cannot be read or parsed are skipped.
    """
    pending = []
    for candidate in sorted(ARCHIVE_DIR.rglob("*.md")):
        try:
            meta, _body = parse_frontmatter(candidate)
        except Exception:
            # Unreadable file: ignore it and keep scanning.
            continue
        if not meta:
            continue
        is_proposal = meta.get("event_type") == "proposal"
        awaiting = meta.get("status") in ("unprocessed", None)
        if is_proposal and awaiting:
            pending.append(candidate)
    return pending
# ─── Check if Decision Record Exists ────────────────────────────────────────
def decision_exists(slug: str, domain: str = "internet-finance") -> bool:
    """Report whether a record for *slug* is already present in either checkout.

    Both MAIN_REPO and REPO_DIR are inspected under ``decisions/<domain>/``.
    A record counts as existing when ``<slug>.md`` is there, or when any entry
    name in that directory contains the first 40 characters of *slug*.
    """
    # NOTE(review): for slugs shorter than 40 chars this is a plain substring
    # test, so a short (or empty) slug matches unrelated files — confirm intended.
    prefix = slug[:40]
    for checkout in (MAIN_REPO, REPO_DIR):
        records_dir = checkout / "decisions" / domain
        if not records_dir.exists():
            continue
        if (records_dir / f"{slug}.md").exists():
            return True
        if any(prefix in entry.name for entry in records_dir.iterdir()):
            return True
    return False
def slugify(text: str) -> str:
    """Convert *text* to a lowercase, hyphen-separated filename slug.

    Characters other than ``a-z``, ``0-9``, whitespace and ``-`` are dropped,
    every run of whitespace and/or hyphens becomes a single ``-``, and the
    result is capped at 80 characters.

    Returns:
        The slug, never starting or ending with ``-``; an empty string when
        *text* contains no usable characters.
    """
    text = text.lower()
    text = re.sub(r'[^a-z0-9\s-]', '', text)
    text = re.sub(r'[\s-]+', '-', text.strip())
    # Strip after truncating: the 80-char cut can land right after a separator,
    # and input punctuation can leave one at either end.
    return text[:80].strip('-')
# ─── Build Decision Record ──────────────────────────────────────────────────
# Prompt template for the per-proposal analysis call. Its only replacement field
# is {source_text}, filled with str.format() in build_decision_record(); any
# literal brace added to this text must be doubled or format() will raise.
# The JSON field names requested here are the keys build_decision_record() reads
# back from the model's reply.
ANALYSIS_PROMPT = """You are analyzing a futarchy/governance proposal to create a structured decision record for a knowledge base.
Given this proposal source, produce a JSON object with these fields:
- "name": The full proposal name (e.g., "MetaDAO: Hire Robin Hanson as Advisor")
- "status": "passed" or "failed" or "active" (from the source data)
- "proposer": Who proposed it (name or handle)
- "proposal_date": ISO date when created
- "resolution_date": ISO date when resolved (null if active)
- "record_type": One of: "decision_market" (governance proposals voted on via futarchy) or "fundraise" (ICO/launch raising capital through MetaDAO or Futardio)
- "category": One of: treasury, hiring, product, governance, fundraise, incentives, migration, other
- "summary": 1-2 sentence summary of what this proposal does and why it matters. Be specific — include dollar amounts, key parameters, and outcomes.
- "significance": 2-3 paragraphs analyzing why this proposal matters for the futarchy ecosystem. What does it prove or test? What precedent does it set? How does it relate to broader governance patterns?
- "related_claims": List of 2-5 wiki-link titles from the Teleo knowledge base that this proposal is evidence for or against. Use full prose-as-title format like "futarchy-governed DAOs converge on traditional corporate governance scaffolding for treasury operations because market mechanisms alone cannot provide operational security and legal compliance"
IMPORTANT: Only output valid JSON. No markdown, no commentary.
Here is the proposal source:
{source_text}
"""
# Substrings (matched against the lowercased line) that mark a source line as market data.
_MARKET_KEYWORDS = (
    "status:", "total volume", "pass", "fail", "spot", "outcome",
    "autocrat", "proposal account", "dao account", "proposer:",
)


def _extract_market_lines(body: str) -> list[str]:
    """Return the stripped bullet (``- ``) or bold (``**``) lines of *body* that mention a market keyword."""
    found = []
    for raw_line in body.split("\n"):
        stripped = raw_line.strip()
        if not stripped.startswith(("- ", "**")):
            continue
        lowered = stripped.lower()
        if any(kw in lowered for kw in _MARKET_KEYWORDS):
            found.append(stripped)
    return found


def _mark_source_processed(source_path: Path) -> bool:
    """Flip ``status: unprocessed`` to ``status: processed`` in the frontmatter of *source_path*.

    Works on raw bytes so the rest of the file, including the verbatim
    proposal body, is written back untouched whatever its encoding. Returns
    True when the file was rewritten, False when there was nothing to change.
    """
    raw = source_path.read_bytes()
    fence_end = raw.find(b"\n---", 3) if raw.startswith(b"---") else -1
    if fence_end == -1:
        return False
    header = raw[:fence_end]
    updated_header = header.replace(b"status: unprocessed", b"status: processed", 1)
    if updated_header == header:
        # Sources with no status field are picked up too; they have nothing to flip.
        return False
    source_path.write_bytes(updated_header + raw[fence_end:])
    return True


def build_decision_record(source_path: Path, dry_run: bool = False) -> Path | None:
    """Build and write one decision record from a proposal source file.

    Reads the source's frontmatter and body, asks the LLM for a structured
    analysis, and writes ``DECISIONS_DIR/<domain>/<slug>.md`` containing
    frontmatter, summary, extracted market-data lines, significance, the
    verbatim proposal body and knowledge-base links. The source is then marked
    ``status: processed``.

    Args:
        source_path: Markdown proposal source to convert.
        dry_run: When true, only report what would be created; nothing is
            sent to the LLM and nothing is written.

    Returns:
        Path of the record that was written, or ``None`` when the source was
        skipped (no frontmatter, record already exists, dry run) or the LLM
        call or its JSON reply was unusable.
    """
    fm, body = parse_frontmatter(source_path)
    if not fm:
        print(f" SKIP: No frontmatter in {source_path.name}")
        return None

    # Frontmatter values may be null or non-string; normalise before string operations.
    title = str(fm.get("title") or "")
    domain = str(fm.get("domain") or "internet-finance")
    url = fm.get("url", "")
    source_date = fm.get("date", "")
    raw_tags = fm.get("tags") or []
    tags = [raw_tags] if isinstance(raw_tags, str) else list(raw_tags)

    # Extract project name from body
    project_match = re.search(r'Project:\s*(.+)', body)
    project = project_match.group(1).strip() if project_match else "Unknown"

    # Build slug from title, falling back to the file name
    slug = slugify(title.replace("Futardio: ", "").replace("futardio: ", ""))
    if not slug:
        slug = slugify(source_path.stem)

    # Check if already exists
    if decision_exists(slug, domain):
        print(f" SKIP: Decision record already exists for {slug}")
        return None

    # Full source text for LLM (truncate at 8K to fit in context)
    source_text = f"Title: {title}\nURL: {url}\nDate: {source_date}\n\n{body}"
    if len(source_text) > 8000:
        source_text = source_text[:8000] + "\n\n[... truncated for analysis ...]"

    if dry_run:
        print(f" DRY RUN: Would create {slug}.md from {source_path.name}")
        return None

    # Call LLM for analysis
    response = call_llm(ANALYSIS_PROMPT.format(source_text=source_text))
    if not response:
        print(f" ERROR: LLM call failed for {source_path.name}")
        return None

    # Parse LLM response, stripping a surrounding markdown code fence (with or
    # without the "json" language tag) if present
    cleaned = re.sub(r'^```(?:json)?\s*', '', response.strip())
    cleaned = re.sub(r'\s*```$', '', cleaned)
    try:
        analysis = json.loads(cleaned)
    except json.JSONDecodeError as e:
        print(f" ERROR: Invalid JSON from LLM for {source_path.name}: {e}")
        print(f" Response: {response[:200]}")
        return None
    if not isinstance(analysis, dict):
        # Valid JSON that is not an object (list, string, ...) has no fields to read.
        print(f" ERROR: Invalid JSON from LLM for {source_path.name}: expected an object")
        print(f" Response: {response[:200]}")
        return None

    # Extract market data from body if present
    market_lines = _extract_market_lines(body)

    name = analysis.get("name") or title
    summary = analysis.get("summary", "")
    significance = analysis.get("significance", "")
    related = analysis.get("related_claims") or []
    if not isinstance(related, list):
        related = []
    related = [claim for claim in related if isinstance(claim, str) and claim.strip()]

    # Build frontmatter
    record_type = analysis.get("record_type", "decision_market")
    record_fm = {
        "type": "decision",
        "entity_type": record_type,
        "name": name,
        "domain": domain,
        "status": analysis.get("status", "unknown"),
        "tracked_by": "rio",
        "created": str(date.today()),
        "last_updated": str(date.today()),
        "parent_entity": f"[[{project.lower()}]]" if project != "Unknown" else "",
        "platform": "metadao",
        "proposer": analysis.get("proposer", ""),
        "proposal_url": url,
        "proposal_date": analysis.get("proposal_date", str(source_date)),
        "resolution_date": analysis.get("resolution_date", ""),
        "category": analysis.get("category", "other"),
        "summary": summary,
        "tags": tags + [project.lower()] if project != "Unknown" else tags,
    }

    # Build body
    body_parts = [f"# {name}\n"]
    body_parts.append(f"## Summary\n\n{summary}\n")
    if market_lines:
        body_parts.append("## Market Data\n")
        body_parts.extend(market_lines)
        body_parts.append("")
    body_parts.append(f"## Significance\n\n{significance}\n")

    # Full proposal text — verbatim
    body_parts.append("## Full Proposal Text\n")
    body_parts.append(body)
    body_parts.append("")

    # KB relationships
    if related:
        body_parts.append("## Relationship to KB\n")
        for claim_title in related:
            slug_link = claim_title.replace(" ", "-").lower()
            body_parts.append(f"- [[{slug_link}]]")
        body_parts.append("")
    body_parts.append("---\n")
    body_parts.append("Relevant Entities:")
    if project != "Unknown":
        body_parts.append(f"- [[{project.lower()}]] — parent organization")
    body_parts.append("\nTopics:\n- [[internet finance and decision markets]]")

    # Write file
    target_dir = DECISIONS_DIR / domain
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir / f"{slug}.md"

    # Serialize frontmatter
    fm_str = yaml.dump(record_fm, default_flow_style=False, allow_unicode=True, sort_keys=False)
    content = f"---\n{fm_str}---\n\n" + "\n".join(body_parts)
    # The record always contains non-ASCII text (the em dash above), so the
    # encoding must not be left to the locale.
    target_path.write_text(content, encoding="utf-8")
    print(f" CREATED: {target_path.name} ({len(content)} chars)")

    # Mark source as processed. The record is already on disk, so a failure
    # here is only a warning: the record must still be reported as created so
    # that it gets committed.
    # NOTE(review): this edits the source in place wherever source_path lives;
    # that change is not part of what _commit_and_push stages — confirm intended.
    try:
        _mark_source_processed(source_path)
    except OSError as e:
        print(f" WARNING: Could not mark {source_path.name} as processed: {e}")
    return target_path
# ─── Main ───────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    With ``--source`` a single file is converted and the function returns.
    Otherwise all unprocessed proposal sources are discovered; ``--dry-run``
    lists their titles, and a normal run prepares a dated branch in the
    extract checkout, builds a record per source, prints a tally, and commits
    and pushes when at least one record was created.

    Exits with status 1 when ``--source`` does not exist or the branch cannot
    be prepared.
    """
    parser = argparse.ArgumentParser(description="Extract decision records from proposal sources")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created without writing")
    parser.add_argument("--limit", type=int, default=0, help="Max proposals to process (0 = all)")
    parser.add_argument("--source", type=str, help="Process a single source file")
    # Kept for command-line compatibility only: with store_true and default=True
    # the value is always True, and nothing below reads it. Existing records are
    # always skipped by build_decision_record().
    parser.add_argument("--skip-existing", action="store_true", default=True,
                        help="Skip sources that already have decision records")
    args = parser.parse_args()
    if args.limit < 0:
        # A negative value would otherwise slice sources from the end of the list.
        parser.error("--limit must be >= 0")

    if args.source:
        # NOTE(review): single-source mode writes the record without preparing a
        # branch or committing — confirm that is intended.
        source_path = Path(args.source)
        if not source_path.exists():
            print(f"ERROR: Source not found: {source_path}")
            sys.exit(1)
        result = build_decision_record(source_path, dry_run=args.dry_run)
        if result:
            print(f"Done: {result}")
        return

    # Find all unprocessed proposals
    sources = find_proposal_sources()
    print(f"Found {len(sources)} unprocessed proposal sources")
    batch = sources[:args.limit] if args.limit else sources

    if args.dry_run:
        for s in batch:
            fm, _ = parse_frontmatter(s)
            title = fm.get("title", s.stem) if fm else s.stem
            print(f" {title}")
        return

    if not batch:
        # Do not create a branch for an empty run: the dated branch would be
        # left behind and make a later run on the same day fail at checkout -b.
        print("Nothing to process")
        return

    # Prepare extract worktree: sync to main, create branch
    branch_name = f"epimetheus/decisions-{date.today().isoformat()}"
    if not _prepare_branch(branch_name):
        print("ERROR: Failed to prepare extract worktree branch")
        sys.exit(1)

    processed = 0
    created = 0
    skipped = 0
    errors = 0
    for source_path in batch:
        fm, _ = parse_frontmatter(source_path)
        title = fm.get("title", source_path.stem) if fm else source_path.stem
        print(f"\nProcessing: {title}")
        try:
            result = build_decision_record(source_path, dry_run=False)
            if result:
                created += 1
            else:
                skipped += 1
        except Exception as e:
            # Top-level batch boundary: one bad source must not stop the rest.
            print(f" ERROR: {e}")
            errors += 1
        processed += 1
    print(f"\nDone: {processed} processed, {created} created, {skipped} skipped, {errors} errors")

    # Commit and push for PR review
    if created > 0:
        _commit_and_push(branch_name, created)
def _prepare_branch(branch_name: str) -> bool:
    """Reset the extract checkout to ``origin/main`` and create *branch_name* from it.

    Runs, in REPO_DIR: ``git fetch origin main``, ``git checkout main``,
    ``git reset --hard origin/main`` and ``git checkout -b <branch_name>``.

    Returns:
        True when every step succeeded. False, after printing the reason,
        when a git command failed (including the branch already existing) or
        git could not be started at all, for example because REPO_DIR is
        missing.
    """
    import subprocess

    cwd = str(REPO_DIR)
    steps = (
        ["git", "fetch", "origin", "main"],
        ["git", "checkout", "main"],
        ["git", "reset", "--hard", "origin/main"],
        ["git", "checkout", "-b", branch_name],
    )
    try:
        for cmd in steps:
            subprocess.run(cmd, cwd=cwd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        detail = e.stderr.decode(errors="replace")[:200] if e.stderr else e
        print(f"ERROR preparing branch: {detail}")
        return False
    except OSError as e:
        # Raised before git even runs: missing cwd or no git executable.
        print(f"ERROR preparing branch: {e}")
        return False
    print(f"Branch created: {branch_name}")
    return True
def _commit_and_push(branch_name: str, count: int):
    """Commit the new decision records, push the branch and open a Forgejo PR.

    Stages ``decisions/`` in REPO_DIR, commits, pushes *branch_name* to
    ``origin`` and, when a Forgejo token file is present, creates a pull
    request against ``main``. All failures are printed, not raised.

    Args:
        branch_name: Branch currently checked out in REPO_DIR to push.
        count: Number of records created, used in the commit and PR text.
    """
    import subprocess

    cwd = str(REPO_DIR)
    token_file = Path("/opt/teleo-eval/secrets/forgejo-leo-token")
    try:
        token = token_file.read_text().strip() if token_file.exists() else ""
    except OSError as e:
        print(f"WARNING: Could not read Forgejo token: {e}")
        token = ""

    try:
        subprocess.run(["git", "add", "decisions/"], cwd=cwd, check=True, capture_output=True)
        # Ask about the index only: `git diff --cached --quiet` exits 0 when
        # nothing is staged, so unrelated untracked files cannot trigger a
        # commit that would then fail with "nothing to commit".
        staged = subprocess.run(["git", "diff", "--cached", "--quiet"], cwd=cwd, capture_output=True)
        if staged.returncode == 0:
            print("No changes to commit")
            return
        msg = (f"epimetheus: {count} decision records from proposal extraction\n\n"
               f"Batch extraction of event_type: proposal sources into structured\n"
               f"decision records with full verbatim text + LLM analysis.\n\n"
               f"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>")
        subprocess.run(["git", "commit", "-m", msg], cwd=cwd, check=True, capture_output=True)
        subprocess.run(["git", "push", "-u", "origin", branch_name], cwd=cwd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        detail = e.stderr.decode(errors="replace")[:200] if e.stderr else e
        print(f"ERROR committing: {detail}")
        return
    except OSError as e:
        # git could not be started (missing cwd or executable).
        print(f"ERROR committing: {e}")
        return
    print(f"Pushed branch: {branch_name}")

    # Create PR via Forgejo API
    if not token:
        print("WARNING: No Forgejo token found; branch pushed but no PR created")
        return
    try:
        resp = requests.post(
            "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls",
            headers={"Authorization": f"token {token}"},
            json={
                "title": f"epimetheus: {count} decision records from proposal extraction",
                "body": (f"## Summary\n"
                         f"- {count} decision records extracted from archived proposal sources\n"
                         f"- Full verbatim proposal text + LLM-generated summary/significance\n"
                         f"- Both decision markets and fundraises\n\n"
                         f"## Source\n"
                         f"Extracted by `extract-decisions.py` from `event_type: proposal` sources in archive/"),
                "head": branch_name,
                "base": "main",
            },
            timeout=30,
        )
    except requests.RequestException as e:
        # The push already succeeded; an unreachable Forgejo only costs the PR.
        print(f"WARNING: PR creation failed: {e}")
        return
    if resp.status_code in (200, 201):
        try:
            pr_url = resp.json().get("html_url", "")
        except (ValueError, AttributeError):
            pr_url = ""
        print(f"PR created: {pr_url}")
    else:
        print(f"WARNING: PR creation failed ({resp.status_code}): {resp.text[:200]}")
# Run the extractor only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()