"""SQLite database — schema, migrations, connection management."""

import json
import logging
import sqlite3
from contextlib import contextmanager
from typing import Optional

from . import config

logger = logging.getLogger("pipeline.db")

SCHEMA_VERSION = 12

# Baseline schema. Every statement is idempotent (IF NOT EXISTS) because
# migrate() replays this script on every start-up before the incremental steps.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER PRIMARY KEY,
    applied_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS sources (
    path TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'unprocessed',
        -- unprocessed, triaging, extracting, extracted, null_result,
        -- needs_reextraction, error
    priority TEXT DEFAULT 'medium',
        -- critical, high, medium, low, skip
    priority_log TEXT DEFAULT '[]',
        -- JSON array: [{stage, priority, reasoning, ts}]
    extraction_model TEXT,
    claims_count INTEGER DEFAULT 0,
    pr_number INTEGER,
    transient_retries INTEGER DEFAULT 0,
    substantive_retries INTEGER DEFAULT 0,
    last_error TEXT,
    feedback TEXT,
        -- eval feedback for re-extraction (JSON)
    cost_usd REAL DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now')),
    updated_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS prs (
    number INTEGER PRIMARY KEY,
    source_path TEXT REFERENCES sources(path),
    branch TEXT,
    status TEXT NOT NULL DEFAULT 'open',
        -- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
        -- conflict: rebase failed or merge timed out — needs human intervention
    domain TEXT,
    agent TEXT,
    commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')),
    tier TEXT,
        -- LIGHT, STANDARD, DEEP
    tier0_pass INTEGER,
        -- 0/1
    leo_verdict TEXT DEFAULT 'pending',
        -- pending, approve, request_changes, skipped, failed
    domain_verdict TEXT DEFAULT 'pending',
    domain_agent TEXT,
    domain_model TEXT,
    priority TEXT,
        -- NULL = inherit from source. Set explicitly for human-submitted PRs.
        -- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
        -- Human PRs: 'critical' (detected via missing source_path or non-agent author)
    origin TEXT DEFAULT 'pipeline',
        -- pipeline | human | external
    transient_retries INTEGER DEFAULT 0,
    substantive_retries INTEGER DEFAULT 0,
    last_error TEXT,
    last_attempt TEXT,
    cost_usd REAL DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now')),
    merged_at TEXT
);

CREATE TABLE IF NOT EXISTS costs (
    date TEXT,
    model TEXT,
    stage TEXT,
    calls INTEGER DEFAULT 0,
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cost_usd REAL DEFAULT 0,
    PRIMARY KEY (date, model, stage)
);

CREATE TABLE IF NOT EXISTS circuit_breakers (
    name TEXT PRIMARY KEY,
    state TEXT DEFAULT 'closed',
        -- closed, open, halfopen
    failures INTEGER DEFAULT 0,
    successes INTEGER DEFAULT 0,
    tripped_at TEXT,
    last_success_at TEXT,
        -- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
    last_update TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS audit_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT DEFAULT (datetime('now')),
    stage TEXT,
    event TEXT,
    detail TEXT
);

CREATE TABLE IF NOT EXISTS response_audit (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
    chat_id INTEGER,
    user TEXT,
    agent TEXT DEFAULT 'rio',
    model TEXT,
    query TEXT,
    conversation_window TEXT,
        -- JSON: prior N messages for context
        -- NOTE: intentional duplication of transcript data for audit self-containment.
        -- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt
        -- context inline for retrieval-quality diagnosis. Primary driver of row size —
        -- target for cleanup when 90-day retention policy lands.
    entities_matched TEXT,
        -- JSON: [{name, path, score, used_in_response}]
    claims_matched TEXT,
        -- JSON: [{path, title, score, source, used_in_response}]
    retrieval_layers_hit TEXT,
        -- JSON: ["keyword","qdrant","graph"]
    retrieval_gap TEXT,
        -- What the KB was missing (if anything)
    market_data TEXT,
        -- JSON: injected token prices
    research_context TEXT,
        -- Haiku pre-pass results if any
    kb_context_text TEXT,
        -- Full context string sent to model
    tool_calls TEXT,
        -- JSON: ordered array [{tool, input, output, duration_ms, ts}]
    raw_response TEXT,
    display_response TEXT,
    confidence_score REAL,
        -- Model self-rated retrieval quality 0.0-1.0
    response_time_ms INTEGER,
    -- Eval pipeline columns (v10)
    prompt_tokens INTEGER,
    completion_tokens INTEGER,
    generation_cost REAL,
    embedding_cost REAL,
    total_cost REAL,
    blocked INTEGER DEFAULT 0,
    block_reason TEXT,
    query_type TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
"""

# Columns insert_response_audit() accepts. Also serves as the allow-list that
# makes interpolating column names into its INSERT statement safe.
_RESPONSE_AUDIT_COLUMNS = (
    "timestamp", "chat_id", "user", "agent", "model", "query",
    "conversation_window", "entities_matched", "claims_matched",
    "retrieval_layers_hit", "retrieval_gap", "market_data",
    "research_context", "kb_context_text", "tool_calls",
    "raw_response", "display_response", "confidence_score",
    "response_time_ms",
    # Eval pipeline columns (v10)
    "prompt_tokens", "completion_tokens", "generation_cost",
    "embedding_cost", "total_cost", "blocked", "block_reason",
    "query_type",
)


def get_connection(readonly: bool = False) -> sqlite3.Connection:
    """Create a SQLite connection with WAL mode and proper settings.

    Args:
        readonly: When true, the connection is switched to ``query_only`` so
            any write statement issued on it fails.

    Returns:
        A connection in autocommit mode (``isolation_level=None``) whose rows
        are ``sqlite3.Row`` objects. Callers open transactions explicitly.
    """
    config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(
        str(config.DB_PATH),
        timeout=30,
        isolation_level=None,  # autocommit — we manage transactions explicitly
    )
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # NOTE: this replaces the 30 s busy handler installed by `timeout=30`
        # above; the effective lock wait is 10 s.
        conn.execute("PRAGMA busy_timeout=10000")
        conn.execute("PRAGMA foreign_keys=ON")
        if readonly:
            conn.execute("PRAGMA query_only=ON")
    except BaseException:
        # Do not leak the handle when configuration fails part-way through.
        conn.close()
        raise
    return conn


@contextmanager
def transaction(conn: sqlite3.Connection):
    """Context manager for explicit transactions.

    Issues ``BEGIN`` on entry, ``COMMIT`` on clean exit and ``ROLLBACK`` when
    the body raises; the exception is always re-raised.
    """
    conn.execute("BEGIN")
    try:
        yield conn
        conn.execute("COMMIT")
    except BaseException:
        # BaseException so KeyboardInterrupt etc. do not leave the transaction
        # open. SQLite may already have rolled back on its own (e.g. disk
        # full); a second ROLLBACK would raise and hide the real error.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise


@contextmanager
def _write_transaction(conn: sqlite3.Connection):
    """Run the body in a write transaction, joining one that is already open.

    When no transaction is active a ``BEGIN IMMEDIATE`` takes the write lock
    up front, so a read-modify-write sequence waits on ``busy_timeout``
    instead of failing when it later upgrades a stale read snapshot. When the
    caller already holds a transaction, commit and rollback are left to it.
    """
    if conn.in_transaction:
        yield conn
        return
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield conn
        conn.execute("COMMIT")
    except BaseException:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise


# Branch prefix → (agent, commit_type) mapping.
# Single source of truth — used by merge.py at INSERT time and migration v7 backfill.
# Unknown prefixes → ('unknown', 'unknown') + warning log.
BRANCH_PREFIX_MAP = {
    "extract": ("pipeline", "extract"),
    "ingestion": ("pipeline", "extract"),
    "epimetheus": ("epimetheus", "extract"),
    "rio": ("rio", "research"),
    "theseus": ("theseus", "research"),
    "astra": ("astra", "research"),
    "vida": ("vida", "research"),
    "clay": ("clay", "research"),
    "leo": ("leo", "entity"),
    "reweave": ("pipeline", "reweave"),
    "fix": ("pipeline", "fix"),
}


def classify_branch(branch: str) -> tuple[str, str]:
    """Derive (agent, commit_type) from branch prefix.

    The prefix is everything before the first ``/`` (or the whole name when
    there is none).

    Returns ('unknown', 'unknown') and logs a warning for unrecognized prefixes.
    """
    # split() returns the whole string as element 0 when "/" is absent.
    prefix = branch.split("/", 1)[0]
    result = BRANCH_PREFIX_MAP.get(prefix)
    if result is None:
        logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", prefix, branch)
        return ("unknown", "unknown")
    return result


def _table_columns(conn: sqlite3.Connection, table: str) -> set:
    """Return the names of the columns currently defined on *table*.

    *table* is interpolated into the PRAGMA (it cannot be bound as a
    parameter), so pass module-internal constants only.
    """
    # Positional access (index 1 = name) works with and without sqlite3.Row.
    return {info[1] for info in conn.execute(f"PRAGMA table_info({table})")}


def _add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, definition: str) -> bool:
    """Add *column* to *table* unless it already exists.

    Checking first keeps the migration idempotent without swallowing unrelated
    ``OperationalError``s (locked database, malformed DDL, ...).

    Returns:
        True when the column was added, False when it was already present.
    """
    if column in _table_columns(conn, table):
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
    return True


def _rebuild_prs_table(conn: sqlite3.Connection) -> None:
    """Recreate ``prs`` with the expanded ``commit_type`` CHECK constraint (v9).

    SQLite cannot alter a CHECK constraint in place, so the table is copied
    into ``prs_new`` and swapped. Columns that exist on the live table but not
    in the canonical definition below (``eval_attempts``, ``eval_issues`` and
    ``fix_attempts`` from v3/v4, or anything added by hand) are re-added to
    ``prs_new`` with their declared type and default, so the copy neither
    fails nor loses data.
    """
    old_columns = conn.execute("PRAGMA table_info(prs)").fetchall()
    col_list = ", ".join(info[1] for info in old_columns)

    # A previous attempt may have died after creating prs_new.
    conn.execute("DROP TABLE IF EXISTS prs_new")
    conn.execute("""
        CREATE TABLE prs_new (
            number INTEGER PRIMARY KEY,
            source_path TEXT REFERENCES sources(path),
            branch TEXT,
            status TEXT NOT NULL DEFAULT 'open',
            domain TEXT,
            agent TEXT,
            commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
            tier TEXT,
            tier0_pass INTEGER,
            leo_verdict TEXT DEFAULT 'pending',
            domain_verdict TEXT DEFAULT 'pending',
            domain_agent TEXT,
            domain_model TEXT,
            priority TEXT,
            origin TEXT DEFAULT 'pipeline',
            transient_retries INTEGER DEFAULT 0,
            substantive_retries INTEGER DEFAULT 0,
            last_error TEXT,
            last_attempt TEXT,
            cost_usd REAL DEFAULT 0,
            created_at TEXT DEFAULT (datetime('now')),
            merged_at TEXT
        )
    """)

    canonical = _table_columns(conn, "prs_new")
    for info in old_columns:
        # table_info row layout: cid, name, type, notnull, dflt_value, pk
        name, decl_type, notnull, default = info[1], info[2], info[3], info[4]
        if name in canonical:
            continue
        ddl = f"ALTER TABLE prs_new ADD COLUMN {name} {decl_type}"
        if default is not None:
            ddl += f" DEFAULT {default}"
            if notnull:
                # ADD COLUMN only accepts NOT NULL together with a default.
                ddl += " NOT NULL"
        conn.execute(ddl)

    try:
        # Copy + swap inside one transaction so a crash cannot leave the
        # database without a prs table. DROP TABLE also drops the table's
        # indexes, so they are recreated before committing.
        conn.executescript(f"""
            BEGIN;
            INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs;
            DROP TABLE prs;
            ALTER TABLE prs_new RENAME TO prs;
            CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
            CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
            COMMIT;
        """)
    except sqlite3.Error:
        if conn.in_transaction:
            conn.rollback()
        raise


def migrate(conn: sqlite3.Connection):
    """Run schema migrations.

    Applies the idempotent baseline schema, then every incremental step newer
    than the highest version recorded in ``schema_version``, and finally
    records ``SCHEMA_VERSION``. Each step is safe to re-run.

    The connection must use ``sqlite3.Row`` as its row factory (as
    ``get_connection`` configures) because rows are read by column name.
    """
    conn.executescript(SCHEMA_SQL)

    # Check current version
    try:
        row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
        current = row["v"] if row and row["v"] else 0
    except sqlite3.OperationalError:
        current = 0

    # --- Incremental migrations ---
    if current < 2:
        # Phase 2: add multiplayer columns to prs table
        _add_column_if_missing(conn, "prs", "priority", "TEXT")
        _add_column_if_missing(conn, "prs", "origin", "TEXT DEFAULT 'pipeline'")
        _add_column_if_missing(conn, "prs", "last_error", "TEXT")
        logger.info("Migration v2: added priority, origin, last_error to prs")

    if current < 3:
        # Phase 3: retry budget — track eval attempts and issue tags per PR
        _add_column_if_missing(conn, "prs", "eval_attempts", "INTEGER DEFAULT 0")
        _add_column_if_missing(conn, "prs", "eval_issues", "TEXT DEFAULT '[]'")
        logger.info("Migration v3: added eval_attempts, eval_issues to prs")

    if current < 4:
        # Phase 4: auto-fixer — track fix attempts per PR
        _add_column_if_missing(conn, "prs", "fix_attempts", "INTEGER DEFAULT 0")
        logger.info("Migration v4: added fix_attempts to prs")

    if current < 5:
        # Phase 5: contributor identity system — tracks who contributed what
        # Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
        # CI is COMPUTED from raw counts × weights, never stored.
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS contributors (
                handle TEXT PRIMARY KEY,
                display_name TEXT,
                agent_id TEXT,
                first_contribution TEXT,
                last_contribution TEXT,
                tier TEXT DEFAULT 'new',
                    -- new, contributor, veteran
                sourcer_count INTEGER DEFAULT 0,
                extractor_count INTEGER DEFAULT 0,
                challenger_count INTEGER DEFAULT 0,
                synthesizer_count INTEGER DEFAULT 0,
                reviewer_count INTEGER DEFAULT 0,
                claims_merged INTEGER DEFAULT 0,
                challenges_survived INTEGER DEFAULT 0,
                domains TEXT DEFAULT '[]',
                highlights TEXT DEFAULT '[]',
                identities TEXT DEFAULT '{}',
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier);
        """)
        logger.info("Migration v5: added contributors table")

    if current < 6:
        # Phase 6: analytics — time-series metrics snapshots for trending dashboard
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS metrics_snapshots (
                ts TEXT DEFAULT (datetime('now')),
                throughput_1h INTEGER,
                approval_rate REAL,
                open_prs INTEGER,
                merged_total INTEGER,
                closed_total INTEGER,
                conflict_total INTEGER,
                evaluated_24h INTEGER,
                fix_success_rate REAL,
                rejection_broken_wiki_links INTEGER DEFAULT 0,
                rejection_frontmatter_schema INTEGER DEFAULT 0,
                rejection_near_duplicate INTEGER DEFAULT 0,
                rejection_confidence INTEGER DEFAULT 0,
                rejection_other INTEGER DEFAULT 0,
                extraction_model TEXT,
                eval_domain_model TEXT,
                eval_leo_model TEXT,
                prompt_version TEXT,
                pipeline_version TEXT,
                source_origin_agent INTEGER DEFAULT 0,
                source_origin_human INTEGER DEFAULT 0,
                source_origin_scraper INTEGER DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts);
        """)
        logger.info("Migration v6: added metrics_snapshots table for analytics dashboard")

    if current < 7:
        # Phase 7: agent attribution + commit_type for dashboard
        # commit_type column + backfill agent/commit_type from branch prefix
        # (on fresh databases the column already exists from CREATE TABLE)
        _add_column_if_missing(
            conn,
            "prs",
            "commit_type",
            "TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))",
        )

        # Backfill agent and commit_type from branch prefix
        rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall()
        backfilled = 0
        for row in rows:
            agent, commit_type = classify_branch(row["branch"])
            cursor = conn.execute(
                "UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)",
                (agent, commit_type, row["number"]),
            )
            # Count rows actually changed, not rows merely inspected.
            backfilled += cursor.rowcount
        logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled)

    if current < 8:
        # Phase 8: response audit — full-chain visibility for agent response quality
        # Captures: query → tool calls → retrieval → context → response → confidence
        # Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops)
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS response_audit (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL DEFAULT (datetime('now')),
                chat_id INTEGER,
                user TEXT,
                agent TEXT DEFAULT 'rio',
                model TEXT,
                query TEXT,
                conversation_window TEXT,
                    -- intentional transcript duplication for audit self-containment
                entities_matched TEXT,
                claims_matched TEXT,
                retrieval_layers_hit TEXT,
                retrieval_gap TEXT,
                market_data TEXT,
                research_context TEXT,
                kb_context_text TEXT,
                tool_calls TEXT,
                raw_response TEXT,
                display_response TEXT,
                confidence_score REAL,
                response_time_ms INTEGER,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
            CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
            CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
        """)
        logger.info("Migration v8: added response_audit table for agent response auditing")

    if current < 9:
        # Phase 9: rebuild prs table to expand CHECK constraint on commit_type.
        # SQLite cannot ALTER CHECK constraints in-place — must rebuild table.
        # Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown
        # New constraint: adds challenge,enrich,synthesize
        # Also re-derive commit_type from branch prefix for rows with invalid/NULL values.
        _rebuild_prs_table(conn)
        logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")

        # Re-derive commit_type from branch prefix for invalid/NULL values
        rows = conn.execute(
            """SELECT number, branch FROM prs
               WHERE branch IS NOT NULL
                 AND (commit_type IS NULL
                      OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))"""
        ).fetchall()
        fixed = 0
        for row in rows:
            agent, commit_type = classify_branch(row["branch"])
            conn.execute(
                "UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?",
                (agent, commit_type, row["number"]),
            )
            fixed += 1
        conn.commit()
        logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed)

    if current < 10:
        # Add eval pipeline columns to response_audit
        # VPS may already be at v10/v11 from prior (incomplete) deploys — skip existing columns
        for column, definition in [
            ("prompt_tokens", "INTEGER"),
            ("completion_tokens", "INTEGER"),
            ("generation_cost", "REAL"),
            ("embedding_cost", "REAL"),
            ("total_cost", "REAL"),
            ("blocked", "INTEGER DEFAULT 0"),
            ("block_reason", "TEXT"),
            ("query_type", "TEXT"),
        ]:
            _add_column_if_missing(conn, "response_audit", column, definition)
        conn.commit()
        logger.info("Migration v10: added eval pipeline columns to response_audit")

    if current < 11:
        # Phase 11: compute tracking — extended costs table columns
        # (May already exist on VPS from manual deploy — idempotent ALTERs)
        for column, definition in [
            ("duration_ms", "INTEGER DEFAULT 0"),
            ("cache_read_tokens", "INTEGER DEFAULT 0"),
            ("cache_write_tokens", "INTEGER DEFAULT 0"),
            ("cost_estimate_usd", "REAL DEFAULT 0"),
        ]:
            _add_column_if_missing(conn, "costs", column, definition)
        conn.commit()
        logger.info("Migration v11: added compute tracking columns to costs")

    if current < 12:
        # Phase 12: structured review records — captures all evaluation outcomes
        # including rejections, disagreements, and approved-with-changes.
        # Schema locked with Leo (2026-04-01).
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS review_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pr_number INTEGER NOT NULL,
                claim_path TEXT,
                domain TEXT,
                agent TEXT,
                reviewer TEXT NOT NULL,
                reviewer_model TEXT,
                outcome TEXT NOT NULL
                    CHECK (outcome IN ('approved', 'approved-with-changes', 'rejected')),
                rejection_reason TEXT
                    CHECK (rejection_reason IS NULL OR rejection_reason IN (
                        'fails-standalone-test', 'duplicate', 'scope-mismatch',
                        'evidence-insufficient', 'framing-poor', 'other'
                    )),
                disagreement_type TEXT
                    CHECK (disagreement_type IS NULL OR disagreement_type IN (
                        'factual', 'scope', 'framing', 'evidence'
                    )),
                notes TEXT,
                batch_id TEXT,
                claims_in_batch INTEGER DEFAULT 1,
                reviewed_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number);
            CREATE INDEX IF NOT EXISTS idx_review_records_outcome ON review_records(outcome);
            CREATE INDEX IF NOT EXISTS idx_review_records_domain ON review_records(domain);
            CREATE INDEX IF NOT EXISTS idx_review_records_reviewer ON review_records(reviewer);
        """)
        logger.info("Migration v12: created review_records table")

    if current < SCHEMA_VERSION:
        conn.execute(
            "INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
            (SCHEMA_VERSION,),
        )
        conn.commit()  # Explicit commit — executescript auto-commits DDL but not subsequent DML
        logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
    else:
        logger.debug("Database at schema version %d", current)


def audit(conn: sqlite3.Connection, stage: str, event: str, detail: Optional[str] = None):
    """Write an audit log entry.

    Inserts one ``audit_log`` row; the timestamp comes from the column default.
    """
    conn.execute(
        "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
        (stage, event, detail),
    )


def record_review(
    conn,
    pr_number: int,
    reviewer: str,
    outcome: str,
    *,
    claim_path: Optional[str] = None,
    domain: Optional[str] = None,
    agent: Optional[str] = None,
    reviewer_model: Optional[str] = None,
    rejection_reason: Optional[str] = None,
    disagreement_type: Optional[str] = None,
    notes: Optional[str] = None,
    claims_in_batch: int = 1,
):
    """Record a structured review outcome.

    Called from evaluate stage after Leo/domain reviewer returns a verdict.
    outcome must be: approved, approved-with-changes, or rejected.

    ``rejection_reason`` and ``disagreement_type`` are constrained by CHECK
    clauses on ``review_records``; a value outside the allowed sets makes the
    insert raise ``sqlite3.IntegrityError``. ``batch_id`` is stored as the
    stringified PR number.
    """
    batch_id = str(pr_number)
    conn.execute(
        """INSERT INTO review_records
           (pr_number, claim_path, domain, agent, reviewer, reviewer_model,
            outcome, rejection_reason, disagreement_type, notes,
            batch_id, claims_in_batch)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (pr_number, claim_path, domain, agent, reviewer, reviewer_model,
         outcome, rejection_reason, disagreement_type, notes,
         batch_id, claims_in_batch),
    )


def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
    """Append a priority assessment to a source's priority_log.

    NOTE: This does NOT update the source's priority column. The priority column is
    the authoritative priority, set only by initial triage or human override.
    The priority_log records each stage's opinion for offline calibration analysis.
    (Bug caught by Theseus — original version overwrote priority with each stage's opinion.)
    (Race condition fix per Vida — read-then-write wrapped in transaction.)

    The read-modify-write runs under ``BEGIN IMMEDIATE`` so concurrent
    appenders queue on the write lock instead of failing when a deferred
    transaction tries to upgrade a stale snapshot. If the caller already has a
    transaction open, the work joins it and the caller commits. Unknown paths
    are ignored.

    NOTE(review): the schema comment on ``sources.priority_log`` lists a ``ts``
    field, but entries written here carry none — confirm which is intended.
    """
    with _write_transaction(conn):
        row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone()
        if not row:
            return
        log = json.loads(row["priority_log"] or "[]")
        log.append({"stage": stage, "priority": priority, "reasoning": reasoning})
        conn.execute(
            "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
            (json.dumps(log), path),
        )


def insert_response_audit(conn: sqlite3.Connection, **kwargs):
    """Insert a response audit record.

    Only keyword arguments that name a ``response_audit`` column and are not
    ``None`` are written; everything else is dropped silently, and nothing is
    inserted when no usable field remains.

    NOTE(review): callers are expected to supply ``query``, but that is not
    enforced here.
    """
    present = {k: v for k, v in kwargs.items() if k in _RESPONSE_AUDIT_COLUMNS and v is not None}
    if not present:
        return
    # Column names come from the allow-list above, never from raw input.
    col_names = ", ".join(present)
    placeholders = ", ".join("?" for _ in present)
    conn.execute(
        f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})",
        tuple(present.values()),
    )


def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
    """Set a source's authoritative priority. Used for human overrides and initial triage.

    The priority update and the matching ``priority_log`` entry (stage
    ``"override"``) are written in one transaction, so a failure cannot leave
    the column changed without its log record.
    """
    with _write_transaction(conn):
        conn.execute(
            "UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
            (priority, path),
        )
        # Joins the transaction opened above.
        append_priority_log(conn, path, "override", priority, reason)