teleo-codex/ops/diagnostics/research_tracking.py
m3taversal bf3af00d5d consolidate diagnostics: copy newer/unique files from root /diagnostics/ into teleo-codex/ops/diagnostics/
Files consolidated:
- dashboard_routes.py: root copy (39K) overwrites teleo-codex (34K) — has cost fix + connection leak fix
- dashboard_prs.py: root copy overwrites — has cost display rewrite
- dashboard_epistemic.py: root copy overwrites — has Ship rename
- research_tracking.py: new file, existed only in root /diagnostics/ (reviewed by Ganymede, never committed here)
- research_routes.py: new file, same situation
- ops/db.py: new file, unique to root /diagnostics/ops/

After this commit, root /diagnostics/ contains only stale copies and patch files — safe to delete.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 10:14:40 +02:00

419 lines
15 KiB
Python

"""Research session tracking + cost attribution for the Teleo pipeline.
This module adds three capabilities:
1. research_sessions table — tracks WHY agents researched, what they found interesting,
session cost, and links to generated sources
2. Extraction cost attribution — writes per-source cost to sources.cost_usd after extraction
3. Source → claim linkage — ensures prs.source_path is always populated
Designed for Epimetheus to integrate into the pipeline. Argus built the spec;
Ganymede reviews; Epimetheus wires it in.
Data flow:
Agent research session → research_sessions row (with reasoning + summary)
→ sources created (with session_id FK)
→ extraction runs (cost written to sources.cost_usd + costs table)
→ PRs created (source_path populated)
→ claims merged (traceable back to session)
"""
import json
import logging
import sqlite3
from datetime import datetime
from typing import Optional
logger = logging.getLogger("research_tracking")
# ---------------------------------------------------------------------------
# Migration v11: research_sessions table + sources.session_id FK
# (v9 is current; v10 is Epimetheus's eval pipeline migration)
# ---------------------------------------------------------------------------
MIGRATION_V11_SQL = """
-- Research session tracking table
CREATE TABLE IF NOT EXISTS research_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
-- Which agent ran the research (leo, rio, astra, etc.)
domain TEXT,
-- Primary domain of the research
topic TEXT NOT NULL,
-- What they researched (short description)
reasoning TEXT,
-- WHY they chose this topic (agent's own explanation)
summary TEXT,
-- What they found most interesting/relevant
sources_planned INTEGER DEFAULT 0,
-- How many sources they intended to produce
sources_produced INTEGER DEFAULT 0,
-- How many actually materialized
model TEXT,
-- Model used for research (e.g. claude-opus-4-6)
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
-- Total research session cost (LLM calls for discovery + writing)
status TEXT DEFAULT 'running',
-- running, completed, failed, partial
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
metadata TEXT DEFAULT '{}'
-- JSON: any extra context (prompt version, search queries used, etc.)
);
CREATE INDEX IF NOT EXISTS idx_rs_agent ON research_sessions(agent);
CREATE INDEX IF NOT EXISTS idx_rs_domain ON research_sessions(domain);
CREATE INDEX IF NOT EXISTS idx_rs_started ON research_sessions(started_at);
-- Add session_id FK to sources table
ALTER TABLE sources ADD COLUMN session_id INTEGER REFERENCES research_sessions(id);
CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
-- Record migration
INSERT INTO schema_version (version) VALUES (11);
"""
# ---------------------------------------------------------------------------
# Cost attribution: write extraction cost to sources.cost_usd
# ---------------------------------------------------------------------------
# Pricing per million tokens (as of March 2026)
MODEL_PRICING = {
"anthropic/claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
"anthropic/claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
"anthropic/claude-haiku-4.5": {"input": 0.80, "output": 4.00},
"anthropic/claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
"minimax/minimax-m2.5": {"input": 0.14, "output": 0.56},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate USD cost from model name and token counts."""
pricing = MODEL_PRICING.get(model)
if not pricing:
# Default to Sonnet 4.5 pricing as conservative estimate
logger.warning("Unknown model %s — using Sonnet 4.5 pricing", model)
pricing = {"input": 3.00, "output": 15.00}
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
def record_extraction_cost(
conn: sqlite3.Connection,
source_path: str,
model: str,
input_tokens: int,
output_tokens: int,
):
"""Write extraction cost to both sources.cost_usd and costs table.
Call this after each successful extraction call in openrouter-extract-v2.py.
This is the missing link — the CSV logger records tokens but never writes
cost back to the DB.
"""
cost = calculate_cost(model, input_tokens, output_tokens)
# Update source row
conn.execute(
"UPDATE sources SET cost_usd = cost_usd + ?, extraction_model = ? WHERE path = ?",
(cost, model, source_path),
)
# Also record in costs table for dashboard aggregation
date = datetime.utcnow().strftime("%Y-%m-%d")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, input_tokens, output_tokens, cost),
)
conn.commit()
logger.info(
"Recorded extraction cost for %s: $%.4f (%d in, %d out, %s)",
source_path, cost, input_tokens, output_tokens, model,
)
return cost
# ---------------------------------------------------------------------------
# Research session lifecycle
# ---------------------------------------------------------------------------
def start_session(
conn: sqlite3.Connection,
agent: str,
topic: str,
domain: Optional[str] = None,
reasoning: Optional[str] = None,
sources_planned: int = 0,
model: Optional[str] = None,
metadata: Optional[dict] = None,
) -> int:
"""Call at the START of a research session. Returns session_id.
The agent should call this before it begins producing sources,
explaining what it plans to research and why.
"""
cur = conn.execute(
"""INSERT INTO research_sessions
(agent, domain, topic, reasoning, sources_planned, model, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
agent,
domain,
topic,
reasoning,
sources_planned,
model,
json.dumps(metadata or {}),
),
)
conn.commit()
session_id = cur.lastrowid
logger.info("Started research session #%d: %s / %s", session_id, agent, topic)
return session_id
def link_source_to_session(
conn: sqlite3.Connection,
source_path: str,
session_id: int,
):
"""Link a source file to its research session.
Call this when a source is written to inbox/ during a research session.
"""
conn.execute(
"UPDATE sources SET session_id = ? WHERE path = ?",
(session_id, source_path),
)
conn.execute(
"""UPDATE research_sessions
SET sources_produced = sources_produced + 1
WHERE id = ?""",
(session_id,),
)
conn.commit()
def complete_session(
conn: sqlite3.Connection,
session_id: int,
summary: str,
input_tokens: int = 0,
output_tokens: int = 0,
cost_usd: float = 0,
status: str = "completed",
):
"""Call at the END of a research session.
The agent should summarize what it found most interesting/relevant.
Cost should include ALL LLM calls made during the session (web search,
analysis, source writing — everything).
"""
conn.execute(
"""UPDATE research_sessions
SET summary = ?, input_tokens = ?, output_tokens = ?,
cost_usd = ?, status = ?, completed_at = datetime('now')
WHERE id = ?""",
(summary, input_tokens, output_tokens, cost_usd, status, session_id),
)
conn.commit()
logger.info("Completed research session #%d: %s", session_id, status)
# ---------------------------------------------------------------------------
# Source → PR linkage fix
# ---------------------------------------------------------------------------
def ensure_source_path_on_pr(
conn: sqlite3.Connection,
pr_number: int,
source_path: str,
):
"""Ensure prs.source_path is populated. Call during PR creation.
Currently 0/1451 PRs have source_path set. This is the fix.
"""
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ? AND (source_path IS NULL OR source_path = '')",
(source_path, pr_number),
)
conn.commit()
# ---------------------------------------------------------------------------
# Backfill: attribute extraction costs from existing CSV log
# ---------------------------------------------------------------------------
def backfill_extraction_costs(conn: sqlite3.Connection, csv_path: str):
"""One-time backfill: read openrouter-usage.csv and write costs to sources + costs tables.
Run once to fill in the ~$338 of extraction costs that were logged to CSV
but never written to the database.
Safe to re-run — only updates sources where cost_usd = 0, so partial
runs can be resumed without double-counting.
"""
import csv
count = 0
total_cost = 0.0
with open(csv_path) as f:
reader = csv.DictReader(f)
for row in reader:
source_file = row.get("source_file", "")
model = row.get("model", "")
try:
in_tok = int(row.get("input_tokens", 0) or 0)
out_tok = int(row.get("output_tokens", 0) or 0)
except (ValueError, TypeError):
continue
cost = calculate_cost(model, in_tok, out_tok)
if cost <= 0:
continue
# Try to match source_file to sources.path
# CSV has filename, DB has full path — match on exact suffix
# Use ORDER BY length(path) to prefer shortest (most specific) match
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? AND cost_usd = 0 ORDER BY length(path) LIMIT 1",
(f"%/{source_file}" if "/" not in source_file else f"%{source_file}",),
).fetchone()
if matched:
conn.execute(
"UPDATE sources SET cost_usd = ?, extraction_model = ? WHERE path = ?",
(cost, model, matched[0]),
)
# Always record in costs table
date = row.get("date", "unknown")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, in_tok, out_tok, cost),
)
count += 1
total_cost += cost
conn.commit()
logger.info("Backfilled %d extraction cost records, total $%.2f", count, total_cost)
return count, total_cost
# ---------------------------------------------------------------------------
# Backfill: populate prs.source_path from branch naming convention
# ---------------------------------------------------------------------------
def backfill_source_paths(conn: sqlite3.Connection):
"""One-time backfill: derive source_path for existing PRs from branch names.
Branch format: extract/YYYY-MM-DD-source-name or similar patterns.
Source path format: inbox/queue/YYYY-MM-DD-source-name.md
"""
rows = conn.execute(
"SELECT number, branch FROM prs WHERE source_path IS NULL AND branch IS NOT NULL"
).fetchall()
count = 0
for number, branch in rows:
# Try to extract source name from branch
# Common patterns: extract/source-name, claims/source-name
parts = branch.split("/", 1)
if len(parts) < 2:
continue
source_stem = parts[1]
# Try to find matching source in DB — exact suffix match, shortest path wins
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_stem}%" if source_stem else "",),
).fetchone()
if matched:
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ?",
(matched[0], number),
)
count += 1
conn.commit()
logger.info("Backfilled source_path for %d PRs", count)
return count
# ---------------------------------------------------------------------------
# Integration points (for Epimetheus to wire in)
# ---------------------------------------------------------------------------
INTEGRATION_GUIDE = """
## Where to wire this in
### 1. openrouter-extract-v2.py — after successful extraction call
from research_tracking import record_extraction_cost
# After line 430 (content, usage = call_openrouter(...))
# After line 672 (log_usage(...))
record_extraction_cost(
conn, args.source_file, args.model,
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
)
### 2. Agent research scripts — wrap research sessions
from research_tracking import start_session, link_source_to_session, complete_session
# At start of research:
session_id = start_session(conn, agent="leo", topic="weapons stigmatization campaigns",
domain="grand-strategy",
reasoning="Following up on EU AI Act national security exclusion — exploring how stigmatization
campaigns have historically driven arms control policy",
sources_planned=6, model="claude-opus-4-6")
# As each source is written:
link_source_to_session(conn, source_path, session_id)
# At end of research:
complete_session(conn, session_id,
summary="Ottawa Treaty mine ban model is the strongest parallel to AI weapons — same
3-condition framework (humanitarian harm + low military utility + civil society
coalition). Ukraine Shahed case is a near-miss triggering event.",
input_tokens=total_in, output_tokens=total_out, cost_usd=total_cost)
### 3. PR creation in lib/merge.py or lib/validate.py — ensure source_path
from research_tracking import ensure_source_path_on_pr
# When creating a PR, pass the source:
ensure_source_path_on_pr(conn, pr_number, source_path)
### 4. One-time backfills (run manually after migration)
from research_tracking import backfill_extraction_costs, backfill_source_paths
backfill_extraction_costs(conn, "/opt/teleo-eval/logs/openrouter-usage.csv")
backfill_source_paths(conn)
### 5. Migration
Run MIGRATION_V11_SQL against pipeline.db after backing up.
"""