teleo-infrastructure/lib/db.py
m3taversal 799249d470 Initial commit: Pipeline v2 daemon + infrastructure docs
- teleo-pipeline.py: async daemon with 4 stage loops (ingest/validate/evaluate/merge)
- lib/: config, db, evaluate, validate, merge, breaker, costs, health, log modules
- INFRASTRUCTURE.md: comprehensive deep-dive for onboarding
- teleo-pipeline.service: systemd unit file

Pentagon-Agent: Leo <294C3CA1-0205-4668-82FA-B984D54F48AD>
2026-03-12 14:11:18 +00:00

228 lines
7.3 KiB
Python

"""SQLite database — schema, migrations, connection management."""
import sqlite3
import json
import logging
from contextlib import contextmanager
from pathlib import Path
from . import config
# Module-level logger; records are emitted under the "pipeline.db" name.
logger = logging.getLogger("pipeline.db")
# Schema version this module targets. migrate() compares the highest version
# stored in the schema_version table against this value and records it once
# the incremental migrations have run.
SCHEMA_VERSION = 2
# Full DDL for the current schema, executed as one script by migrate().
# Every statement uses IF NOT EXISTS, so re-running the script against an
# existing database is a no-op for objects that are already present.
# Consequence: a column added to a CREATE TABLE below is NOT added to a table
# that already exists — it also needs an ALTER TABLE step in migrate().
# Tables: schema_version, sources, prs, costs, circuit_breakers, audit_log.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sources (
path TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'unprocessed',
-- unprocessed, triaging, extracting, extracted, null_result,
-- needs_reextraction, error
priority TEXT DEFAULT 'medium',
-- critical, high, medium, low, skip
priority_log TEXT DEFAULT '[]',
-- JSON array: [{stage, priority, reasoning, ts}]
extraction_model TEXT,
claims_count INTEGER DEFAULT 0,
pr_number INTEGER,
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS prs (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
-- conflict: rebase failed or merge timed out — needs human intervention
domain TEXT,
agent TEXT,
tier TEXT,
-- LIGHT, STANDARD, DEEP
tier0_pass INTEGER,
-- 0/1
leo_verdict TEXT DEFAULT 'pending',
-- pending, approve, request_changes, skipped, failed
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
-- Human PRs: 'critical' (detected via missing source_path or non-agent author)
origin TEXT DEFAULT 'pipeline',
-- pipeline | human | external
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
CREATE TABLE IF NOT EXISTS costs (
date TEXT,
model TEXT,
stage TEXT,
calls INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
PRIMARY KEY (date, model, stage)
);
CREATE TABLE IF NOT EXISTS circuit_breakers (
name TEXT PRIMARY KEY,
state TEXT DEFAULT 'closed',
-- closed, open, halfopen
failures INTEGER DEFAULT 0,
successes INTEGER DEFAULT 0,
tripped_at TEXT,
last_success_at TEXT,
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
last_update TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
stage TEXT,
event TEXT,
detail TEXT
);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
"""
def get_connection(readonly: bool = False) -> sqlite3.Connection:
    """Open a configured SQLite connection to the pipeline database.

    The parent directory of ``config.DB_PATH`` is created if missing. The
    connection is opened in autocommit mode (``isolation_level=None``), uses
    ``sqlite3.Row`` as its row factory, and has WAL journaling and foreign-key
    enforcement switched on.

    Args:
        readonly: When true, ``PRAGMA query_only=ON`` is set so statements
            that would modify the database are rejected on this connection.

    Returns:
        The configured connection. The caller owns it and must close it.

    Raises:
        sqlite3.Error: If opening or configuring the connection fails. The
            half-configured connection is closed before the error propagates.
    """
    config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(
        str(config.DB_PATH),
        timeout=30,
        isolation_level=None,  # autocommit — we manage transactions explicitly
    )
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # NOTE(review): this PRAGMA replaces the busy handler installed by
        # connect(timeout=30), so the effective lock wait is 10 s, not 30 s.
        # Confirm which of the two values is intended.
        conn.execute("PRAGMA busy_timeout=10000")
        conn.execute("PRAGMA foreign_keys=ON")
        if readonly:
            conn.execute("PRAGMA query_only=ON")
    except BaseException:
        # Do not leak the handle when configuration fails (e.g. the file is
        # locked or is not a database).
        conn.close()
        raise
    return conn
@contextmanager
def transaction(conn: sqlite3.Connection):
    """Context manager for explicit transactions.

    Issues ``BEGIN`` on entry and ``COMMIT`` when the ``with`` body finishes
    normally. If the body (or the commit itself) raises, the transaction is
    rolled back and the exception is re-raised unchanged.

    Args:
        conn: Connection to run the transaction on.

    Yields:
        The same ``conn``, for convenience.

    Raises:
        sqlite3.OperationalError: If ``BEGIN`` fails, e.g. because a
            transaction is already open on ``conn``.
    """
    conn.execute("BEGIN")
    try:
        yield conn
        conn.execute("COMMIT")
    except BaseException:
        # BaseException, not Exception: KeyboardInterrupt and
        # asyncio.CancelledError must not leave the transaction open.
        # Only roll back if a transaction is still active; otherwise the
        # "no transaction is active" error would mask the real exception.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
def migrate(conn: sqlite3.Connection):
    """Run schema migrations.

    First executes ``SCHEMA_SQL`` (all ``IF NOT EXISTS``), then reads the
    highest version recorded in ``schema_version`` and applies every
    incremental migration above it. Finally records ``SCHEMA_VERSION``.

    Args:
        conn: Writable connection to the pipeline database.

    Raises:
        sqlite3.OperationalError: If a migration statement fails for any
            reason other than the column already being present.
    """
    conn.executescript(SCHEMA_SQL)

    # Check current version. Read by position so this works whether or not
    # the connection uses sqlite3.Row as its row factory.
    try:
        row = conn.execute(
            "SELECT MAX(version) as v FROM schema_version"
        ).fetchone()
        current = row[0] if row and row[0] else 0
    except sqlite3.OperationalError:
        current = 0

    # --- Incremental migrations ---
    if current < 2:
        # Phase 2: add multiplayer columns to prs table
        for stmt in (
            "ALTER TABLE prs ADD COLUMN priority TEXT",
            "ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
            "ALTER TABLE prs ADD COLUMN last_error TEXT",
        ):
            try:
                conn.execute(stmt)
            except sqlite3.OperationalError as exc:
                # A fresh database already has these columns from SCHEMA_SQL;
                # that case is expected and keeps the migration idempotent.
                # Anything else (locked database, I/O error, ...) must not be
                # swallowed, or the version below would be recorded for a
                # schema that was never actually migrated.
                if "duplicate column name" not in str(exc).lower():
                    raise
        logger.info("Migration v2: added priority, origin, last_error to prs")

    if current < SCHEMA_VERSION:
        conn.execute(
            "INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
            (SCHEMA_VERSION,),
        )
        logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
    else:
        logger.debug("Database at schema version %d", current)
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None):
    """Record one event in the ``audit_log`` table.

    Only ``stage``, ``event`` and ``detail`` are supplied; the row's ``id``
    and ``timestamp`` come from the table's own defaults. No transaction
    handling happens here — the insert runs in whatever transaction state
    ``conn`` is currently in.

    Args:
        conn: Connection to write through.
        stage: Name of the pipeline stage reporting the event.
        event: Short event identifier.
        detail: Optional free-form detail; stored as NULL when omitted.
    """
    record = (stage, event, detail)
    insert_sql = "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)"
    conn.execute(insert_sql, record)
@contextmanager
def _priority_write_txn(conn: sqlite3.Connection):
    """Run the enclosed statements atomically as a write transaction.

    If ``conn`` already has a transaction open, the statements simply join it
    and commit/rollback is left to whoever opened it. Otherwise a
    ``BEGIN IMMEDIATE`` transaction is opened, committed on success and rolled
    back on any exception (which is then re-raised).
    """
    if conn.in_transaction:
        yield
        return
    # IMMEDIATE takes the write lock up front. A plain (deferred) BEGIN would
    # start as a reader and can fail with SQLITE_BUSY when upgrading to a
    # writer after another connection committed in between.
    conn.execute("BEGIN IMMEDIATE")
    try:
        yield
        conn.execute("COMMIT")
    except BaseException:
        # Guarded so a missing transaction cannot mask the original error.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise


def append_priority_log(
    conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str
):
    """Append a priority assessment to a source's priority_log.

    NOTE: This does NOT update the source's priority column. The priority
    column is the authoritative priority, set only by initial triage or human
    override. The priority_log records each stage's opinion for offline
    calibration analysis.

    The read-modify-write of the JSON array runs inside one write transaction
    so concurrent appends cannot overwrite each other. When the caller already
    has a transaction open on ``conn``, the append joins it instead of
    starting a new one.

    Each entry has the keys ``stage``, ``priority``, ``reasoning`` and ``ts``
    (the shape documented on the ``sources.priority_log`` column). ``ts`` is
    SQLite's ``datetime('now')``, the same clock used for ``updated_at``.

    Args:
        conn: Writable connection to the pipeline database.
        path: Primary key of the source row.
        stage: Stage recording the assessment.
        priority: Priority suggested by that stage.
        reasoning: Free-form justification.

    Returns:
        None. If no source row exists for ``path`` nothing is written.

    Raises:
        ValueError: If the stored priority_log is not valid JSON.
        sqlite3.Error: On database failure; the transaction opened here is
            rolled back first.
    """
    with _priority_write_txn(conn):
        row = conn.execute(
            "SELECT priority_log, datetime('now') FROM sources WHERE path = ?",
            (path,),
        ).fetchone()
        if row is None:
            return
        # Positional access works with or without sqlite3.Row.
        entries = json.loads(row[0] or "[]")
        entries.append(
            {
                "stage": stage,
                "priority": priority,
                "reasoning": reasoning,
                "ts": row[1],
            }
        )
        conn.execute(
            "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
            (json.dumps(entries), path),
        )


def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
    """Set a source's authoritative priority. Used for human overrides and initial triage.

    The priority column update and the matching ``"override"`` entry in
    priority_log are written in a single transaction, so a failure while
    appending the log entry also undoes the priority change.

    Args:
        conn: Writable connection to the pipeline database.
        path: Primary key of the source row.
        priority: New authoritative priority.
        reason: Text stored as the log entry's ``reasoning``.

    Raises:
        ValueError: If the stored priority_log is not valid JSON.
        sqlite3.Error: On database failure.
    """
    with _priority_write_txn(conn):
        conn.execute(
            "UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
            (priority, path),
        )
        # Joins the transaction opened above.
        append_priority_log(conn, path, "override", priority, reason)