Add research_tracking.py to diagnostics (Phase 1 consolidation)
Argus's research lifecycle tracking module. Was in root diagnostics/ only — missing from both repos. Completes Phase 1 file inventory. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c0cc4ef090
commit
380ebd9124
1 changed files with 419 additions and 0 deletions
419
diagnostics/research_tracking.py
Normal file
419
diagnostics/research_tracking.py
Normal file
|
|
@ -0,0 +1,419 @@
|
|||
"""Research session tracking + cost attribution for the Teleo pipeline.
|
||||
|
||||
This module adds three capabilities:
|
||||
1. research_sessions table — tracks WHY agents researched, what they found interesting,
|
||||
session cost, and links to generated sources
|
||||
2. Extraction cost attribution — writes per-source cost to sources.cost_usd after extraction
|
||||
3. Source → claim linkage — ensures prs.source_path is always populated
|
||||
|
||||
Designed for Epimetheus to integrate into the pipeline. Argus built the spec;
|
||||
Ganymede reviews; Epimetheus wires it in.
|
||||
|
||||
Data flow:
|
||||
Agent research session → research_sessions row (with reasoning + summary)
|
||||
→ sources created (with session_id FK)
|
||||
→ extraction runs (cost written to sources.cost_usd + costs table)
|
||||
→ PRs created (source_path populated)
|
||||
→ claims merged (traceable back to session)
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger("research_tracking")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Migration v11: research_sessions table + sources.session_id FK
|
||||
# (v9 is current; v10 is Epimetheus's eval pipeline migration)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
MIGRATION_V11_SQL = """
|
||||
-- Research session tracking table
|
||||
CREATE TABLE IF NOT EXISTS research_sessions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent TEXT NOT NULL,
|
||||
-- Which agent ran the research (leo, rio, astra, etc.)
|
||||
domain TEXT,
|
||||
-- Primary domain of the research
|
||||
topic TEXT NOT NULL,
|
||||
-- What they researched (short description)
|
||||
reasoning TEXT,
|
||||
-- WHY they chose this topic (agent's own explanation)
|
||||
summary TEXT,
|
||||
-- What they found most interesting/relevant
|
||||
sources_planned INTEGER DEFAULT 0,
|
||||
-- How many sources they intended to produce
|
||||
sources_produced INTEGER DEFAULT 0,
|
||||
-- How many actually materialized
|
||||
model TEXT,
|
||||
-- Model used for research (e.g. claude-opus-4-6)
|
||||
input_tokens INTEGER DEFAULT 0,
|
||||
output_tokens INTEGER DEFAULT 0,
|
||||
cost_usd REAL DEFAULT 0,
|
||||
-- Total research session cost (LLM calls for discovery + writing)
|
||||
status TEXT DEFAULT 'running',
|
||||
-- running, completed, failed, partial
|
||||
started_at TEXT DEFAULT (datetime('now')),
|
||||
completed_at TEXT,
|
||||
metadata TEXT DEFAULT '{}'
|
||||
-- JSON: any extra context (prompt version, search queries used, etc.)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rs_agent ON research_sessions(agent);
|
||||
CREATE INDEX IF NOT EXISTS idx_rs_domain ON research_sessions(domain);
|
||||
CREATE INDEX IF NOT EXISTS idx_rs_started ON research_sessions(started_at);
|
||||
|
||||
-- Add session_id FK to sources table
|
||||
ALTER TABLE sources ADD COLUMN session_id INTEGER REFERENCES research_sessions(id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
|
||||
|
||||
-- Record migration
|
||||
INSERT INTO schema_version (version) VALUES (11);
|
||||
"""
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cost attribution: write extraction cost to sources.cost_usd
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Pricing per million tokens (as of March 2026)
|
||||
MODEL_PRICING = {
|
||||
"anthropic/claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
|
||||
"anthropic/claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
|
||||
"anthropic/claude-haiku-4.5": {"input": 0.80, "output": 4.00},
|
||||
"anthropic/claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
|
||||
"minimax/minimax-m2.5": {"input": 0.14, "output": 0.56},
|
||||
}
|
||||
|
||||
|
||||
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
|
||||
"""Calculate USD cost from model name and token counts."""
|
||||
pricing = MODEL_PRICING.get(model)
|
||||
if not pricing:
|
||||
# Default to Sonnet 4.5 pricing as conservative estimate
|
||||
logger.warning("Unknown model %s — using Sonnet 4.5 pricing", model)
|
||||
pricing = {"input": 3.00, "output": 15.00}
|
||||
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
|
||||
|
||||
|
||||
def record_extraction_cost(
|
||||
conn: sqlite3.Connection,
|
||||
source_path: str,
|
||||
model: str,
|
||||
input_tokens: int,
|
||||
output_tokens: int,
|
||||
):
|
||||
"""Write extraction cost to both sources.cost_usd and costs table.
|
||||
|
||||
Call this after each successful extraction call in openrouter-extract-v2.py.
|
||||
This is the missing link — the CSV logger records tokens but never writes
|
||||
cost back to the DB.
|
||||
"""
|
||||
cost = calculate_cost(model, input_tokens, output_tokens)
|
||||
|
||||
# Update source row
|
||||
conn.execute(
|
||||
"UPDATE sources SET cost_usd = cost_usd + ?, extraction_model = ? WHERE path = ?",
|
||||
(cost, model, source_path),
|
||||
)
|
||||
|
||||
# Also record in costs table for dashboard aggregation
|
||||
date = datetime.utcnow().strftime("%Y-%m-%d")
|
||||
conn.execute(
|
||||
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
|
||||
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
|
||||
ON CONFLICT(date, model, stage)
|
||||
DO UPDATE SET calls = calls + 1,
|
||||
input_tokens = input_tokens + excluded.input_tokens,
|
||||
output_tokens = output_tokens + excluded.output_tokens,
|
||||
cost_usd = cost_usd + excluded.cost_usd""",
|
||||
(date, model, input_tokens, output_tokens, cost),
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
logger.info(
|
||||
"Recorded extraction cost for %s: $%.4f (%d in, %d out, %s)",
|
||||
source_path, cost, input_tokens, output_tokens, model,
|
||||
)
|
||||
return cost
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Research session lifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def start_session(
|
||||
conn: sqlite3.Connection,
|
||||
agent: str,
|
||||
topic: str,
|
||||
domain: Optional[str] = None,
|
||||
reasoning: Optional[str] = None,
|
||||
sources_planned: int = 0,
|
||||
model: Optional[str] = None,
|
||||
metadata: Optional[dict] = None,
|
||||
) -> int:
|
||||
"""Call at the START of a research session. Returns session_id.
|
||||
|
||||
The agent should call this before it begins producing sources,
|
||||
explaining what it plans to research and why.
|
||||
"""
|
||||
cur = conn.execute(
|
||||
"""INSERT INTO research_sessions
|
||||
(agent, domain, topic, reasoning, sources_planned, model, metadata)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
agent,
|
||||
domain,
|
||||
topic,
|
||||
reasoning,
|
||||
sources_planned,
|
||||
model,
|
||||
json.dumps(metadata or {}),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
session_id = cur.lastrowid
|
||||
logger.info("Started research session #%d: %s / %s", session_id, agent, topic)
|
||||
return session_id
|
||||
|
||||
|
||||
def link_source_to_session(
|
||||
conn: sqlite3.Connection,
|
||||
source_path: str,
|
||||
session_id: int,
|
||||
):
|
||||
"""Link a source file to its research session.
|
||||
|
||||
Call this when a source is written to inbox/ during a research session.
|
||||
"""
|
||||
conn.execute(
|
||||
"UPDATE sources SET session_id = ? WHERE path = ?",
|
||||
(session_id, source_path),
|
||||
)
|
||||
conn.execute(
|
||||
"""UPDATE research_sessions
|
||||
SET sources_produced = sources_produced + 1
|
||||
WHERE id = ?""",
|
||||
(session_id,),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def complete_session(
|
||||
conn: sqlite3.Connection,
|
||||
session_id: int,
|
||||
summary: str,
|
||||
input_tokens: int = 0,
|
||||
output_tokens: int = 0,
|
||||
cost_usd: float = 0,
|
||||
status: str = "completed",
|
||||
):
|
||||
"""Call at the END of a research session.
|
||||
|
||||
The agent should summarize what it found most interesting/relevant.
|
||||
Cost should include ALL LLM calls made during the session (web search,
|
||||
analysis, source writing — everything).
|
||||
"""
|
||||
conn.execute(
|
||||
"""UPDATE research_sessions
|
||||
SET summary = ?, input_tokens = ?, output_tokens = ?,
|
||||
cost_usd = ?, status = ?, completed_at = datetime('now')
|
||||
WHERE id = ?""",
|
||||
(summary, input_tokens, output_tokens, cost_usd, status, session_id),
|
||||
)
|
||||
conn.commit()
|
||||
logger.info("Completed research session #%d: %s", session_id, status)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Source → PR linkage fix
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def ensure_source_path_on_pr(
|
||||
conn: sqlite3.Connection,
|
||||
pr_number: int,
|
||||
source_path: str,
|
||||
):
|
||||
"""Ensure prs.source_path is populated. Call during PR creation.
|
||||
|
||||
Currently 0/1451 PRs have source_path set. This is the fix.
|
||||
"""
|
||||
conn.execute(
|
||||
"UPDATE prs SET source_path = ? WHERE number = ? AND (source_path IS NULL OR source_path = '')",
|
||||
(source_path, pr_number),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backfill: attribute extraction costs from existing CSV log
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def backfill_extraction_costs(conn: sqlite3.Connection, csv_path: str):
|
||||
"""One-time backfill: read openrouter-usage.csv and write costs to sources + costs tables.
|
||||
|
||||
Run once to fill in the ~$338 of extraction costs that were logged to CSV
|
||||
but never written to the database.
|
||||
|
||||
Safe to re-run — only updates sources where cost_usd = 0, so partial
|
||||
runs can be resumed without double-counting.
|
||||
"""
|
||||
import csv
|
||||
|
||||
count = 0
|
||||
total_cost = 0.0
|
||||
with open(csv_path) as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
source_file = row.get("source_file", "")
|
||||
model = row.get("model", "")
|
||||
try:
|
||||
in_tok = int(row.get("input_tokens", 0) or 0)
|
||||
out_tok = int(row.get("output_tokens", 0) or 0)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
cost = calculate_cost(model, in_tok, out_tok)
|
||||
if cost <= 0:
|
||||
continue
|
||||
|
||||
# Try to match source_file to sources.path
|
||||
# CSV has filename, DB has full path — match on exact suffix
|
||||
# Use ORDER BY length(path) to prefer shortest (most specific) match
|
||||
matched = conn.execute(
|
||||
"SELECT path FROM sources WHERE path LIKE ? AND cost_usd = 0 ORDER BY length(path) LIMIT 1",
|
||||
(f"%/{source_file}" if "/" not in source_file else f"%{source_file}",),
|
||||
).fetchone()
|
||||
|
||||
if matched:
|
||||
conn.execute(
|
||||
"UPDATE sources SET cost_usd = ?, extraction_model = ? WHERE path = ?",
|
||||
(cost, model, matched[0]),
|
||||
)
|
||||
|
||||
# Always record in costs table
|
||||
date = row.get("date", "unknown")
|
||||
conn.execute(
|
||||
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
|
||||
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
|
||||
ON CONFLICT(date, model, stage)
|
||||
DO UPDATE SET calls = calls + 1,
|
||||
input_tokens = input_tokens + excluded.input_tokens,
|
||||
output_tokens = output_tokens + excluded.output_tokens,
|
||||
cost_usd = cost_usd + excluded.cost_usd""",
|
||||
(date, model, in_tok, out_tok, cost),
|
||||
)
|
||||
|
||||
count += 1
|
||||
total_cost += cost
|
||||
|
||||
conn.commit()
|
||||
logger.info("Backfilled %d extraction cost records, total $%.2f", count, total_cost)
|
||||
return count, total_cost
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backfill: populate prs.source_path from branch naming convention
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def backfill_source_paths(conn: sqlite3.Connection):
|
||||
"""One-time backfill: derive source_path for existing PRs from branch names.
|
||||
|
||||
Branch format: extract/YYYY-MM-DD-source-name or similar patterns.
|
||||
Source path format: inbox/queue/YYYY-MM-DD-source-name.md
|
||||
"""
|
||||
rows = conn.execute(
|
||||
"SELECT number, branch FROM prs WHERE source_path IS NULL AND branch IS NOT NULL"
|
||||
).fetchall()
|
||||
|
||||
count = 0
|
||||
for number, branch in rows:
|
||||
# Try to extract source name from branch
|
||||
# Common patterns: extract/source-name, claims/source-name
|
||||
parts = branch.split("/", 1)
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
source_stem = parts[1]
|
||||
|
||||
# Try to find matching source in DB — exact suffix match, shortest path wins
|
||||
matched = conn.execute(
|
||||
"SELECT path FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
|
||||
(f"%/{source_stem}%" if source_stem else "",),
|
||||
).fetchone()
|
||||
|
||||
if matched:
|
||||
conn.execute(
|
||||
"UPDATE prs SET source_path = ? WHERE number = ?",
|
||||
(matched[0], number),
|
||||
)
|
||||
count += 1
|
||||
|
||||
conn.commit()
|
||||
logger.info("Backfilled source_path for %d PRs", count)
|
||||
return count
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration points (for Epimetheus to wire in)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
INTEGRATION_GUIDE = """
|
||||
## Where to wire this in
|
||||
|
||||
### 1. openrouter-extract-v2.py — after successful extraction call
|
||||
|
||||
from research_tracking import record_extraction_cost
|
||||
|
||||
# After line 430 (content, usage = call_openrouter(...))
|
||||
# After line 672 (log_usage(...))
|
||||
record_extraction_cost(
|
||||
conn, args.source_file, args.model,
|
||||
usage.get("prompt_tokens", 0),
|
||||
usage.get("completion_tokens", 0),
|
||||
)
|
||||
|
||||
### 2. Agent research scripts — wrap research sessions
|
||||
|
||||
from research_tracking import start_session, link_source_to_session, complete_session
|
||||
|
||||
# At start of research:
|
||||
session_id = start_session(conn, agent="leo", topic="weapons stigmatization campaigns",
|
||||
domain="grand-strategy",
|
||||
reasoning="Following up on EU AI Act national security exclusion — exploring how stigmatization
|
||||
campaigns have historically driven arms control policy",
|
||||
sources_planned=6, model="claude-opus-4-6")
|
||||
|
||||
# As each source is written:
|
||||
link_source_to_session(conn, source_path, session_id)
|
||||
|
||||
# At end of research:
|
||||
complete_session(conn, session_id,
|
||||
summary="Ottawa Treaty mine ban model is the strongest parallel to AI weapons — same
|
||||
3-condition framework (humanitarian harm + low military utility + civil society
|
||||
coalition). Ukraine Shahed case is a near-miss triggering event.",
|
||||
input_tokens=total_in, output_tokens=total_out, cost_usd=total_cost)
|
||||
|
||||
### 3. PR creation in lib/merge.py or lib/validate.py — ensure source_path
|
||||
|
||||
from research_tracking import ensure_source_path_on_pr
|
||||
|
||||
# When creating a PR, pass the source:
|
||||
ensure_source_path_on_pr(conn, pr_number, source_path)
|
||||
|
||||
### 4. One-time backfills (run manually after migration)
|
||||
|
||||
from research_tracking import backfill_extraction_costs, backfill_source_paths
|
||||
|
||||
backfill_extraction_costs(conn, "/opt/teleo-eval/logs/openrouter-usage.csv")
|
||||
backfill_source_paths(conn)
|
||||
|
||||
### 5. Migration
|
||||
|
||||
Run MIGRATION_V11_SQL against pipeline.db after backing up.
|
||||
"""
|
||||
Loading…
Reference in a new issue