"""Research session tracking + cost attribution for the Teleo pipeline.

This module adds three capabilities:
1. research_sessions table — tracks WHY agents researched, what they found interesting,
   session cost, and links to generated sources
2. Extraction cost attribution — writes per-source cost to sources.cost_usd after extraction
3. Source → claim linkage — ensures prs.source_path is always populated

Designed for Epimetheus to integrate into the pipeline. Argus built the spec;
Ganymede reviews; Epimetheus wires it in.

Data flow:
    Agent research session → research_sessions row (with reasoning + summary)
    → sources created (with session_id FK)
    → extraction runs (cost written to sources.cost_usd + costs table)
    → PRs created (source_path populated)
    → claims merged (traceable back to session)
"""

import csv
import json
import logging
import sqlite3
from datetime import datetime, timezone
from typing import Optional

logger = logging.getLogger("research_tracking")

# ---------------------------------------------------------------------------
# Migration v11: research_sessions table + sources.session_id FK
# (v9 is current; v10 is Epimetheus's eval pipeline migration)
#
# NOTE: the ALTER TABLE below is NOT idempotent — SQLite raises an error if
# the column already exists. Run this migration exactly once, guarded by the
# schema_version check in the migration runner.
# ---------------------------------------------------------------------------

MIGRATION_V11_SQL = """
-- Research session tracking table
CREATE TABLE IF NOT EXISTS research_sessions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent TEXT NOT NULL,
    -- Which agent ran the research (leo, rio, astra, etc.)
    domain TEXT,
    -- Primary domain of the research
    topic TEXT NOT NULL,
    -- What they researched (short description)
    reasoning TEXT,
    -- WHY they chose this topic (agent's own explanation)
    summary TEXT,
    -- What they found most interesting/relevant
    sources_planned INTEGER DEFAULT 0,
    -- How many sources they intended to produce
    sources_produced INTEGER DEFAULT 0,
    -- How many actually materialized
    model TEXT,
    -- Model used for research (e.g. claude-opus-4-6)
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cost_usd REAL DEFAULT 0,
    -- Total research session cost (LLM calls for discovery + writing)
    status TEXT DEFAULT 'running',
    -- running, completed, failed, partial
    started_at TEXT DEFAULT (datetime('now')),
    completed_at TEXT,
    metadata TEXT DEFAULT '{}'
    -- JSON: any extra context (prompt version, search queries used, etc.)
);

CREATE INDEX IF NOT EXISTS idx_rs_agent ON research_sessions(agent);
CREATE INDEX IF NOT EXISTS idx_rs_domain ON research_sessions(domain);
CREATE INDEX IF NOT EXISTS idx_rs_started ON research_sessions(started_at);

-- Add session_id FK to sources table
ALTER TABLE sources ADD COLUMN session_id INTEGER REFERENCES research_sessions(id);
CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);

-- Record migration
INSERT INTO schema_version (version) VALUES (11);
"""

# ---------------------------------------------------------------------------
# Cost attribution: write extraction cost to sources.cost_usd
# ---------------------------------------------------------------------------

# Pricing per million tokens (as of March 2026)
MODEL_PRICING = {
    "anthropic/claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
    "anthropic/claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
    "anthropic/claude-haiku-4.5": {"input": 0.80, "output": 4.00},
    "anthropic/claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
    "minimax/minimax-m2.5": {"input": 0.14, "output": 0.56},
}


def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Calculate USD cost from model name and token counts.

    Unknown models fall back to Sonnet 4.5 pricing (the most expensive tier
    in the table) so costs are over- rather than under-estimated.
    """
    pricing = MODEL_PRICING.get(model)
    if not pricing:
        # Default to Sonnet 4.5 pricing as conservative estimate
        logger.warning("Unknown model %s — using Sonnet 4.5 pricing", model)
        pricing = {"input": 3.00, "output": 15.00}
    return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000


def record_extraction_cost(
    conn: sqlite3.Connection,
    source_path: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
) -> float:
    """Write extraction cost to both sources.cost_usd and the costs table.

    Call this after each successful extraction call in openrouter-extract-v2.py.
    This is the missing link — the CSV logger records tokens but never writes
    cost back to the DB.

    Returns the USD cost recorded for this call.
    """
    cost = calculate_cost(model, input_tokens, output_tokens)

    # Update source row. COALESCE guards against a NULL cost_usd (the v11
    # migration does not guarantee a default on that column).
    conn.execute(
        "UPDATE sources SET cost_usd = COALESCE(cost_usd, 0) + ?, extraction_model = ? WHERE path = ?",
        (cost, model, source_path),
    )

    # Also record in costs table for dashboard aggregation.
    # Assumes a UNIQUE index on costs(date, model, stage) for the upsert.
    date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    conn.execute(
        """INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
           VALUES (?, ?, 'extraction', 1, ?, ?, ?)
           ON CONFLICT(date, model, stage)
           DO UPDATE SET calls = calls + 1,
                         input_tokens = input_tokens + excluded.input_tokens,
                         output_tokens = output_tokens + excluded.output_tokens,
                         cost_usd = cost_usd + excluded.cost_usd""",
        (date, model, input_tokens, output_tokens, cost),
    )

    conn.commit()
    logger.info(
        "Recorded extraction cost for %s: $%.4f (%d in, %d out, %s)",
        source_path, cost, input_tokens, output_tokens, model,
    )
    return cost


# ---------------------------------------------------------------------------
# Research session lifecycle
# ---------------------------------------------------------------------------


def start_session(
    conn: sqlite3.Connection,
    agent: str,
    topic: str,
    domain: Optional[str] = None,
    reasoning: Optional[str] = None,
    sources_planned: int = 0,
    model: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> int:
    """Call at the START of a research session. Returns session_id.

    The agent should call this before it begins producing sources,
    explaining what it plans to research and why.
    """
    cur = conn.execute(
        """INSERT INTO research_sessions
           (agent, domain, topic, reasoning, sources_planned, model, metadata)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (
            agent,
            domain,
            topic,
            reasoning,
            sources_planned,
            model,
            json.dumps(metadata or {}),
        ),
    )
    conn.commit()
    session_id = cur.lastrowid
    logger.info("Started research session #%d: %s / %s", session_id, agent, topic)
    return session_id


def link_source_to_session(
    conn: sqlite3.Connection,
    source_path: str,
    session_id: int,
) -> None:
    """Link a source file to its research session.

    Call this when a source is written to inbox/ during a research session.
    Also bumps the session's sources_produced counter.
    """
    conn.execute(
        "UPDATE sources SET session_id = ? WHERE path = ?",
        (session_id, source_path),
    )
    conn.execute(
        """UPDATE research_sessions
           SET sources_produced = sources_produced + 1
           WHERE id = ?""",
        (session_id,),
    )
    conn.commit()


def complete_session(
    conn: sqlite3.Connection,
    session_id: int,
    summary: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    cost_usd: float = 0,
    status: str = "completed",
) -> None:
    """Call at the END of a research session.

    The agent should summarize what it found most interesting/relevant.
    Cost should include ALL LLM calls made during the session (web search,
    analysis, source writing — everything).
    """
    conn.execute(
        """UPDATE research_sessions
           SET summary = ?, input_tokens = ?, output_tokens = ?,
               cost_usd = ?, status = ?, completed_at = datetime('now')
           WHERE id = ?""",
        (summary, input_tokens, output_tokens, cost_usd, status, session_id),
    )
    conn.commit()
    logger.info("Completed research session #%d: %s", session_id, status)


# ---------------------------------------------------------------------------
# Source → PR linkage fix
# ---------------------------------------------------------------------------


def ensure_source_path_on_pr(
    conn: sqlite3.Connection,
    pr_number: int,
    source_path: str,
) -> None:
    """Ensure prs.source_path is populated. Call during PR creation.

    Currently 0/1451 PRs have source_path set. This is the fix. Only fills
    empty values — never overwrites an existing source_path.
    """
    conn.execute(
        "UPDATE prs SET source_path = ? WHERE number = ? AND (source_path IS NULL OR source_path = '')",
        (source_path, pr_number),
    )
    conn.commit()


# ---------------------------------------------------------------------------
# Backfill: attribute extraction costs from existing CSV log
# ---------------------------------------------------------------------------


def backfill_extraction_costs(conn: sqlite3.Connection, csv_path: str):
    """One-time backfill: read openrouter-usage.csv and write costs to sources + costs tables.

    Run once to fill in the ~$338 of extraction costs that were logged to CSV
    but never written to the database.

    Safe to re-run — rows are only written (to BOTH the sources and costs
    tables) when they match a source whose cost_usd is still 0/NULL, so
    partial runs can be resumed without double-counting. Unmatched CSV rows
    are skipped entirely.

    Returns (count, total_cost) for the rows actually backfilled.
    """
    count = 0
    total_cost = 0.0
    with open(csv_path) as f:
        reader = csv.DictReader(f)
        for row in reader:
            source_file = row.get("source_file", "")
            model = row.get("model", "")
            try:
                in_tok = int(row.get("input_tokens", 0) or 0)
                out_tok = int(row.get("output_tokens", 0) or 0)
            except (ValueError, TypeError):
                continue

            cost = calculate_cost(model, in_tok, out_tok)
            if cost <= 0:
                continue

            # Try to match source_file to sources.path
            # CSV has filename, DB has full path — match on exact suffix.
            # Use ORDER BY length(path) to prefer shortest (most specific) match.
            # IFNULL makes never-costed rows (NULL) eligible too.
            matched = conn.execute(
                "SELECT path FROM sources WHERE path LIKE ? AND IFNULL(cost_usd, 0) = 0 "
                "ORDER BY length(path) LIMIT 1",
                (f"%/{source_file}" if "/" not in source_file else f"%{source_file}",),
            ).fetchone()

            if not matched:
                # Skipping unmatched rows keeps re-runs idempotent: the costs
                # table is only touched when the source row is updated.
                continue

            conn.execute(
                "UPDATE sources SET cost_usd = ?, extraction_model = ? WHERE path = ?",
                (cost, model, matched[0]),
            )

            date = row.get("date", "unknown")
            conn.execute(
                """INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
                   VALUES (?, ?, 'extraction', 1, ?, ?, ?)
                   ON CONFLICT(date, model, stage)
                   DO UPDATE SET calls = calls + 1,
                                 input_tokens = input_tokens + excluded.input_tokens,
                                 output_tokens = output_tokens + excluded.output_tokens,
                                 cost_usd = cost_usd + excluded.cost_usd""",
                (date, model, in_tok, out_tok, cost),
            )

            count += 1
            total_cost += cost

    conn.commit()
    logger.info("Backfilled %d extraction cost records, total $%.2f", count, total_cost)
    return count, total_cost


# ---------------------------------------------------------------------------
# Backfill: populate prs.source_path from branch naming convention
# ---------------------------------------------------------------------------


def backfill_source_paths(conn: sqlite3.Connection) -> int:
    """One-time backfill: derive source_path for existing PRs from branch names.

    Branch format: extract/YYYY-MM-DD-source-name or similar patterns.
    Source path format: inbox/queue/YYYY-MM-DD-source-name.md

    Returns the number of PRs updated.
    """
    rows = conn.execute(
        "SELECT number, branch FROM prs WHERE source_path IS NULL AND branch IS NOT NULL"
    ).fetchall()

    count = 0
    for number, branch in rows:
        # Try to extract source name from branch.
        # Common patterns: extract/source-name, claims/source-name
        parts = branch.split("/", 1)
        if len(parts) < 2:
            continue
        source_stem = parts[1]
        if not source_stem:
            # Empty stem would produce a useless LIKE pattern — skip.
            continue

        # Try to find matching source in DB — suffix match, shortest path wins.
        matched = conn.execute(
            "SELECT path FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
            (f"%/{source_stem}%",),
        ).fetchone()

        if matched:
            conn.execute(
                "UPDATE prs SET source_path = ? WHERE number = ?",
                (matched[0], number),
            )
            count += 1

    conn.commit()
    logger.info("Backfilled source_path for %d PRs", count)
    return count


# ---------------------------------------------------------------------------
# Integration points (for Epimetheus to wire in)
# ---------------------------------------------------------------------------

INTEGRATION_GUIDE = """
## Where to wire this in

### 1. openrouter-extract-v2.py — after successful extraction call

    from research_tracking import record_extraction_cost

    # After line 430 (content, usage = call_openrouter(...))
    # After line 672 (log_usage(...))
    record_extraction_cost(
        conn, args.source_file, args.model,
        usage.get("prompt_tokens", 0),
        usage.get("completion_tokens", 0),
    )

### 2. Agent research scripts — wrap research sessions

    from research_tracking import start_session, link_source_to_session, complete_session

    # At start of research:
    session_id = start_session(conn, agent="leo", topic="weapons stigmatization campaigns",
        domain="grand-strategy",
        reasoning="Following up on EU AI Act national security exclusion — exploring how stigmatization
        campaigns have historically driven arms control policy",
        sources_planned=6, model="claude-opus-4-6")

    # As each source is written:
    link_source_to_session(conn, source_path, session_id)

    # At end of research:
    complete_session(conn, session_id,
        summary="Ottawa Treaty mine ban model is the strongest parallel to AI weapons — same
        3-condition framework (humanitarian harm + low military utility + civil society
        coalition). Ukraine Shahed case is a near-miss triggering event.",
        input_tokens=total_in, output_tokens=total_out, cost_usd=total_cost)

### 3. PR creation in lib/merge.py or lib/validate.py — ensure source_path

    from research_tracking import ensure_source_path_on_pr

    # When creating a PR, pass the source:
    ensure_source_path_on_pr(conn, pr_number, source_path)

### 4. One-time backfills (run manually after migration)

    from research_tracking import backfill_extraction_costs, backfill_source_paths

    backfill_extraction_costs(conn, "/opt/teleo-eval/logs/openrouter-usage.csv")
    backfill_source_paths(conn)

### 5. Migration

    Run MIGRATION_V11_SQL against pipeline.db after backing up.
"""