"""SQLite database — schema, migrations, connection management."""

import json
import logging
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Iterator, Optional, Sequence, Tuple

from . import config

logger = logging.getLogger("pipeline.db")

SCHEMA_VERSION = 3

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER PRIMARY KEY,
    applied_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS sources (
    path TEXT PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'unprocessed',
        -- unprocessed, triaging, extracting, extracted, null_result,
        -- needs_reextraction, error
    priority TEXT DEFAULT 'medium',
        -- critical, high, medium, low, skip
    priority_log TEXT DEFAULT '[]',
        -- JSON array: [{stage, priority, reasoning, ts}]
    extraction_model TEXT,
    claims_count INTEGER DEFAULT 0,
    pr_number INTEGER,
    transient_retries INTEGER DEFAULT 0,
    substantive_retries INTEGER DEFAULT 0,
    last_error TEXT,
    feedback TEXT, -- eval feedback for re-extraction (JSON)
    cost_usd REAL DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now')),
    updated_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS prs (
    number INTEGER PRIMARY KEY,
    source_path TEXT REFERENCES sources(path),
    branch TEXT,
    status TEXT NOT NULL DEFAULT 'open',
        -- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
        -- conflict: rebase failed or merge timed out — needs human intervention
    domain TEXT,
    agent TEXT,
    tier TEXT, -- LIGHT, STANDARD, DEEP
    tier0_pass INTEGER, -- 0/1
    leo_verdict TEXT DEFAULT 'pending',
        -- pending, approve, request_changes, skipped, failed
    domain_verdict TEXT DEFAULT 'pending',
    domain_agent TEXT,
    domain_model TEXT,
    priority TEXT,
        -- NULL = inherit from source. Set explicitly for human-submitted PRs.
        -- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
        -- Human PRs: 'critical' (detected via missing source_path or non-agent author)
    origin TEXT DEFAULT 'pipeline', -- pipeline | human | external
    transient_retries INTEGER DEFAULT 0,
    substantive_retries INTEGER DEFAULT 0,
    last_error TEXT,
    last_attempt TEXT,
    cost_usd REAL DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now')),
    merged_at TEXT
);

CREATE TABLE IF NOT EXISTS costs (
    date TEXT,
    model TEXT,
    stage TEXT,
    calls INTEGER DEFAULT 0,
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cost_usd REAL DEFAULT 0,
    PRIMARY KEY (date, model, stage)
);

CREATE TABLE IF NOT EXISTS circuit_breakers (
    name TEXT PRIMARY KEY,
    state TEXT DEFAULT 'closed', -- closed, open, halfopen
    failures INTEGER DEFAULT 0,
    successes INTEGER DEFAULT 0,
    tripped_at TEXT,
    last_success_at TEXT,
        -- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
    last_update TEXT DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS audit_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT DEFAULT (datetime('now')),
    stage TEXT,
    event TEXT,
    detail TEXT
);

CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
"""


def get_connection(readonly: bool = False) -> sqlite3.Connection:
    """Create a SQLite connection with WAL mode and proper settings.

    Args:
        readonly: When True, ``PRAGMA query_only=ON`` is set so any write
            attempted through this connection fails.

    Returns:
        An autocommit connection (``isolation_level=None``) whose rows are
        ``sqlite3.Row`` objects. Use :func:`transaction` to group statements.
    """
    config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(
        str(config.DB_PATH),
        timeout=30,
        isolation_level=None,  # autocommit — we manage transactions explicitly
    )
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # NOTE(review): this installs a new busy handler, replacing the one set
        # by timeout=30 above, so the effective lock wait is 10 s. Reconcile the
        # two values if a 30 s wait was intended.
        conn.execute("PRAGMA busy_timeout=10000")
        conn.execute("PRAGMA foreign_keys=ON")
        if readonly:
            conn.execute("PRAGMA query_only=ON")
    except BaseException:
        # Don't leak the handle if connection setup fails part-way.
        conn.close()
        raise
    return conn


@contextmanager
def transaction(
    conn: sqlite3.Connection, *, immediate: bool = False
) -> Iterator[sqlite3.Connection]:
    """Context manager for explicit transactions.

    Commits when the ``with`` body completes normally; on any exception
    (including ``KeyboardInterrupt``) rolls back and re-raises, so the
    connection is never left inside an open transaction.

    Args:
        conn: An autocommit connection (see :func:`get_connection`).
        immediate: Start with ``BEGIN IMMEDIATE``, taking the write lock up
            front (waiting up to the busy timeout). Use this for
            read-then-write sequences: a deferred transaction that reads and
            later writes fails with SQLITE_BUSY, without waiting, if another
            connection wrote in between.

    Yields:
        The same connection.
    """
    conn.execute("BEGIN IMMEDIATE" if immediate else "BEGIN")
    try:
        yield conn
        conn.execute("COMMIT")
    except BaseException:
        # SQLite may already have rolled back automatically (e.g. on
        # SQLITE_FULL / SQLITE_IOERR); a second ROLLBACK would raise and mask
        # the original exception.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise


def _current_schema_version(conn: sqlite3.Connection) -> int:
    """Return the highest recorded schema version, or 0 if none is recorded."""
    try:
        row = conn.execute("SELECT MAX(version) AS v FROM schema_version").fetchone()
    except sqlite3.OperationalError:
        return 0
    # MAX() over an empty table yields a single NULL row.
    return (row[0] if row else None) or 0


def _add_missing_columns(
    conn: sqlite3.Connection, table: str, columns: Sequence[Tuple[str, str]]
) -> None:
    """Add each ``(name, declaration)`` column to *table* unless it already exists.

    *table* and the column specs are interpolated into SQL (identifiers cannot
    be bound as parameters), so they must be module-internal constants, never
    external input.
    """
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    existing = {info[1] for info in conn.execute(f"PRAGMA table_info({table})")}
    for name, declaration in columns:
        if name not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {name} {declaration}")


def migrate(conn: sqlite3.Connection) -> None:
    """Run schema migrations.

    Creates any missing tables/indexes from ``SCHEMA_SQL``, then applies the
    incremental column migrations up to ``SCHEMA_VERSION`` and records that
    version. Columns that already exist are skipped explicitly, so genuine
    failures (e.g. a locked database) propagate instead of being swallowed
    while the version is still stamped. The incremental part runs in a single
    ``BEGIN IMMEDIATE`` transaction: concurrent migrators serialize, and a
    failed migration leaves neither partial columns nor a version record.
    """
    conn.executescript(SCHEMA_SQL)

    with transaction(conn, immediate=True):
        current = _current_schema_version(conn)

        # --- Incremental migrations ---
        if current < 2:
            # Phase 2: add multiplayer columns to prs table.
            # (Fresh databases already have them via SCHEMA_SQL.)
            _add_missing_columns(conn, "prs", [
                ("priority", "TEXT"),
                ("origin", "TEXT DEFAULT 'pipeline'"),
                ("last_error", "TEXT"),
            ])
            logger.info("Migration v2: added priority, origin, last_error to prs")

        if current < 3:
            # Phase 3: retry budget — track eval attempts and issue tags per PR
            _add_missing_columns(conn, "prs", [
                ("eval_attempts", "INTEGER DEFAULT 0"),
                ("eval_issues", "TEXT DEFAULT '[]'"),
            ])
            logger.info("Migration v3: added eval_attempts, eval_issues to prs")

        if current < SCHEMA_VERSION:
            conn.execute(
                "INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
                (SCHEMA_VERSION,),
            )

    if current < SCHEMA_VERSION:
        logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
    else:
        logger.debug("Database at schema version %d", current)


def audit(conn: sqlite3.Connection, stage: str, event: str, detail: Optional[str] = None):
    """Write an audit log entry (timestamp is filled in by the column default)."""
    conn.execute(
        "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
        (stage, event, detail),
    )


def _utc_timestamp() -> str:
    """Current UTC time in the same format as SQLite's ``datetime('now')``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def _append_priority_log_entry(
    conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str
) -> bool:
    """Append one entry to a source's priority_log.

    The caller must already hold a write transaction (``BEGIN IMMEDIATE``) so
    the read-modify-write below cannot interleave with another writer.

    Returns:
        False, writing nothing, when no source has this *path*; True otherwise.
    """
    row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone()
    if row is None:
        return False
    log = json.loads(row[0] or "[]")
    log.append({
        "stage": stage,
        "priority": priority,
        "reasoning": reasoning,
        "ts": _utc_timestamp(),  # documented in the schema: {stage, priority, reasoning, ts}
    })
    conn.execute(
        "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
        (json.dumps(log), path),
    )
    return True


def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
    """Append a priority assessment to a source's priority_log.

    NOTE: This does NOT update the source's priority column. The priority column
    is the authoritative priority, set only by initial triage or human override.
    The priority_log records each stage's opinion for offline calibration analysis.
    (Bug caught by Theseus — original version overwrote priority with each stage's opinion.)
    (Race condition fix per Vida — read-then-write wrapped in transaction. The
    transaction is IMMEDIATE so concurrent appenders wait for the write lock
    instead of failing with SQLITE_BUSY when upgrading a stale read.)

    Does nothing if no source has this *path*. Must not be called while *conn*
    is already inside a transaction.
    """
    with transaction(conn, immediate=True):
        _append_priority_log_entry(conn, path, stage, priority, reasoning)


def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
    """Set a source's authoritative priority. Used for human overrides and initial triage.

    The priority update and its "override" priority_log entry are written in
    one transaction, so the change is never recorded without its log entry
    (or vice versa). Does nothing if no source has this *path*.
    """
    with transaction(conn, immediate=True):
        conn.execute(
            "UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
            (priority, path),
        )
        _append_priority_log_entry(conn, path, "override", priority, reason)