1035 lines
43 KiB
Python
1035 lines
43 KiB
Python
"""SQLite database — schema, migrations, connection management."""
|
|
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
|
|
from . import config
|
|
|
|
logger = logging.getLogger("pipeline.db")
|
|
|
|
SCHEMA_VERSION = 27
|
|
|
|
SCHEMA_SQL = """
|
|
CREATE TABLE IF NOT EXISTS schema_version (
|
|
version INTEGER PRIMARY KEY,
|
|
applied_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS sources (
|
|
path TEXT PRIMARY KEY,
|
|
status TEXT NOT NULL DEFAULT 'unprocessed',
|
|
-- unprocessed, triaging, extracting, extracted, null_result,
|
|
-- needs_reextraction, error
|
|
priority TEXT DEFAULT 'medium',
|
|
-- critical, high, medium, low, skip
|
|
priority_log TEXT DEFAULT '[]',
|
|
-- JSON array: [{stage, priority, reasoning, ts}]
|
|
extraction_model TEXT,
|
|
claims_count INTEGER DEFAULT 0,
|
|
pr_number INTEGER,
|
|
transient_retries INTEGER DEFAULT 0,
|
|
substantive_retries INTEGER DEFAULT 0,
|
|
last_error TEXT,
|
|
feedback TEXT,
|
|
-- eval feedback for re-extraction (JSON)
|
|
cost_usd REAL DEFAULT 0,
|
|
-- v26: provenance — publisher (news org / venue) + content author.
|
|
-- publisher_id references publishers(id) when source is from a known org.
|
|
-- original_author_handle references contributors(handle) when author is in our system.
|
|
-- original_author is free-text fallback ("Kim et al.", "Robin Hanson") — not credit-bearing.
|
|
publisher_id INTEGER REFERENCES publishers(id),
|
|
content_type TEXT,
|
|
-- article | paper | tweet | conversation | self_authored | webpage | podcast
|
|
original_author TEXT,
|
|
original_author_handle TEXT REFERENCES contributors(handle),
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
updated_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS prs (
|
|
number INTEGER PRIMARY KEY,
|
|
source_path TEXT REFERENCES sources(path),
|
|
branch TEXT,
|
|
status TEXT NOT NULL DEFAULT 'open',
|
|
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
|
|
-- conflict: rebase failed or merge timed out — needs human intervention
|
|
domain TEXT,
|
|
agent TEXT,
|
|
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')),
|
|
tier TEXT,
|
|
-- LIGHT, STANDARD, DEEP
|
|
tier0_pass INTEGER,
|
|
-- 0/1
|
|
leo_verdict TEXT DEFAULT 'pending',
|
|
-- pending, approve, request_changes, skipped, failed
|
|
domain_verdict TEXT DEFAULT 'pending',
|
|
domain_agent TEXT,
|
|
domain_model TEXT,
|
|
priority TEXT,
|
|
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
|
|
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
|
|
-- Human PRs: 'critical' (detected via missing source_path or non-agent author)
|
|
origin TEXT DEFAULT 'pipeline',
|
|
-- pipeline | human | external
|
|
transient_retries INTEGER DEFAULT 0,
|
|
substantive_retries INTEGER DEFAULT 0,
|
|
last_error TEXT,
|
|
last_attempt TEXT,
|
|
cost_usd REAL DEFAULT 0,
|
|
auto_merge INTEGER DEFAULT 0,
|
|
github_pr INTEGER,
|
|
source_channel TEXT,
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
merged_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS costs (
|
|
date TEXT,
|
|
model TEXT,
|
|
stage TEXT,
|
|
calls INTEGER DEFAULT 0,
|
|
input_tokens INTEGER DEFAULT 0,
|
|
output_tokens INTEGER DEFAULT 0,
|
|
cost_usd REAL DEFAULT 0,
|
|
duration_ms INTEGER DEFAULT 0,
|
|
cache_read_tokens INTEGER DEFAULT 0,
|
|
cache_write_tokens INTEGER DEFAULT 0,
|
|
cost_estimate_usd REAL DEFAULT 0,
|
|
PRIMARY KEY (date, model, stage)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS circuit_breakers (
|
|
name TEXT PRIMARY KEY,
|
|
state TEXT DEFAULT 'closed',
|
|
-- closed, open, halfopen
|
|
failures INTEGER DEFAULT 0,
|
|
successes INTEGER DEFAULT 0,
|
|
tripped_at TEXT,
|
|
last_success_at TEXT,
|
|
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
|
|
last_update TEXT DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS audit_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT DEFAULT (datetime('now')),
|
|
stage TEXT,
|
|
event TEXT,
|
|
detail TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS response_audit (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
|
|
chat_id INTEGER,
|
|
user TEXT,
|
|
agent TEXT DEFAULT 'rio',
|
|
model TEXT,
|
|
query TEXT,
|
|
conversation_window TEXT,
|
|
-- JSON: prior N messages for context
|
|
-- NOTE: intentional duplication of transcript data for audit self-containment.
|
|
-- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt
|
|
-- context inline for retrieval-quality diagnosis. Primary driver of row size —
|
|
-- target for cleanup when 90-day retention policy lands.
|
|
entities_matched TEXT,
|
|
-- JSON: [{name, path, score, used_in_response}]
|
|
claims_matched TEXT,
|
|
-- JSON: [{path, title, score, source, used_in_response}]
|
|
retrieval_layers_hit TEXT,
|
|
-- JSON: ["keyword","qdrant","graph"]
|
|
retrieval_gap TEXT,
|
|
-- What the KB was missing (if anything)
|
|
market_data TEXT,
|
|
-- JSON: injected token prices
|
|
research_context TEXT,
|
|
-- Haiku pre-pass results if any
|
|
kb_context_text TEXT,
|
|
-- Full context string sent to model
|
|
tool_calls TEXT,
|
|
-- JSON: ordered array [{tool, input, output, duration_ms, ts}]
|
|
raw_response TEXT,
|
|
display_response TEXT,
|
|
confidence_score REAL,
|
|
-- Model self-rated retrieval quality 0.0-1.0
|
|
response_time_ms INTEGER,
|
|
-- Eval pipeline columns (v10)
|
|
prompt_tokens INTEGER,
|
|
completion_tokens INTEGER,
|
|
generation_cost REAL,
|
|
embedding_cost REAL,
|
|
total_cost REAL,
|
|
blocked INTEGER DEFAULT 0,
|
|
block_reason TEXT,
|
|
query_type TEXT,
|
|
created_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
|
|
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
|
|
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
|
|
CREATE INDEX IF NOT EXISTS idx_prs_source_path ON prs(source_path) WHERE source_path IS NOT NULL;
|
|
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
|
|
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
|
|
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
|
|
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
|
|
|
|
-- Event-sourced contributions (schema v24).
|
|
-- One row per credit-earning event. Idempotent via two partial UNIQUE indexes
|
|
-- (SQLite treats NULL != NULL in UNIQUE constraints, so a single composite
|
|
-- UNIQUE with nullable claim_path would allow evaluator-event duplicates).
|
|
-- Leaderboards are SQL aggregations over this table; contributors becomes a materialized cache.
|
|
CREATE TABLE IF NOT EXISTS contribution_events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
handle TEXT NOT NULL,
|
|
kind TEXT NOT NULL DEFAULT 'person',
|
|
-- person | org | agent
|
|
role TEXT NOT NULL,
|
|
-- author | originator | challenger | synthesizer | evaluator
|
|
weight REAL NOT NULL,
|
|
pr_number INTEGER NOT NULL,
|
|
claim_path TEXT,
|
|
-- NULL for PR-level events (e.g. evaluator). Set for per-claim events.
|
|
domain TEXT,
|
|
channel TEXT,
|
|
-- telegram | github | agent | web | unknown
|
|
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
-- Per-claim events: unique on (handle, role, pr_number, claim_path) when path IS NOT NULL.
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
|
|
handle, role, pr_number, claim_path
|
|
) WHERE claim_path IS NOT NULL;
|
|
-- PR-level events (evaluator, author, trailer-based): unique on (handle, role, pr_number) when path IS NULL.
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
|
|
handle, role, pr_number
|
|
) WHERE claim_path IS NULL;
|
|
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
|
|
|
|
-- Handle aliasing. @thesensatore → thesensatore. cameron → cameron-s1.
|
|
-- Writers call resolve_alias(handle) before inserting events or upserting contributors.
|
|
CREATE TABLE IF NOT EXISTS contributor_aliases (
|
|
alias TEXT PRIMARY KEY,
|
|
canonical TEXT NOT NULL,
|
|
created_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
|
|
|
|
-- Publishers: news orgs, academic venues, social platforms. NOT contributors — these
|
|
-- provide metadata/provenance for sources, never earn leaderboard credit. Separating
|
|
-- these from contributors prevents CNBC/SpaceNews from dominating the leaderboard.
|
|
-- (Apr 24 Cory directive: "only credit the original source if its on X or tg")
|
|
CREATE TABLE IF NOT EXISTS publishers (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL UNIQUE,
|
|
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
|
|
url_pattern TEXT,
|
|
created_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
|
|
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
|
|
|
|
-- Multi-platform identity: one contributor, many handles. Enables the leaderboard to
|
|
-- unify @thesensatore (X) + thesensatore (TG) + thesensatore@github into one person.
|
|
-- Writers check this table after resolving aliases to find canonical contributor handle.
|
|
CREATE TABLE IF NOT EXISTS contributor_identities (
|
|
contributor_handle TEXT NOT NULL,
|
|
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
|
|
platform_handle TEXT NOT NULL,
|
|
verified INTEGER DEFAULT 0,
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
PRIMARY KEY (platform, platform_handle)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
|
|
"""
|
|
|
|
|
|
def get_connection(readonly: bool = False) -> sqlite3.Connection:
    """Open a SQLite connection to ``config.DB_PATH`` with the pipeline's settings.

    The parent directory of the database file is created when missing. The
    connection is opened in autocommit mode (``isolation_level=None``), so
    transactions must be started explicitly (see ``transaction``). Rows are
    returned as ``sqlite3.Row``. WAL journaling, a busy timeout and foreign-key
    enforcement are requested through PRAGMAs.

    Args:
        readonly: When true, ``PRAGMA query_only=ON`` is applied after the
            other PRAGMAs have run.

    Returns:
        The configured connection. The caller owns it and must close it.

    Raises:
        sqlite3.Error: If connecting or any PRAGMA fails. In the PRAGMA case
            the half-configured connection is closed before re-raising so the
            file handle is not leaked.
    """
    config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(
        str(config.DB_PATH),
        timeout=30,
        isolation_level=None,  # autocommit — we manage transactions explicitly
    )
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        # NOTE(review): this PRAGMA presumably replaces the busy handler that
        # timeout=30 installed above, making the effective wait 10s — confirm
        # which of the two values is intended.
        conn.execute("PRAGMA busy_timeout=10000")
        conn.execute("PRAGMA foreign_keys=ON")
        if readonly:
            conn.execute("PRAGMA query_only=ON")
    except BaseException:
        # Setup failed part-way: release the handle instead of leaking it.
        conn.close()
        raise
    return conn
|
|
|
|
|
|
@contextmanager
def transaction(conn: sqlite3.Connection):
    """Run the ``with`` body inside an explicit BEGIN ... COMMIT block.

    Intended for connections in autocommit mode (``isolation_level=None``).
    Issues ``BEGIN`` on entry and ``COMMIT`` when the body finishes normally.
    If the body or the ``COMMIT`` raises, the transaction is rolled back and
    the original exception propagates unchanged.

    Args:
        conn: Connection on which to run the transaction.

    Yields:
        The same ``conn`` object, for convenience.

    Raises:
        sqlite3.Error: If ``BEGIN`` fails (for example when a transaction is
            already open on ``conn``), or from the body / ``COMMIT``.
    """
    conn.execute("BEGIN")
    try:
        yield conn
        conn.execute("COMMIT")
    except BaseException:
        # BaseException rather than Exception: KeyboardInterrupt and
        # asyncio.CancelledError do not derive from Exception, and skipping the
        # rollback for them would leave the transaction open so that the next
        # BEGIN on this connection fails.
        # Only roll back when a transaction is still active; SQLite may already
        # have ended it, and a failing ROLLBACK would hide the original error.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
|
|
|
|
|
|
# Branch prefix → (agent, commit_type) mapping.
# Single source of truth — used by merge.py at INSERT time and migration v7 backfill.
# Unknown prefixes → ('unknown', 'unknown') + warning log.
# Keep in sync with _CHANNEL_MAP below.
# NOTE(review): the two maps currently differ — "oberon" has a _CHANNEL_MAP
# entry but none here, and "contrib" is listed here but not in _CHANNEL_MAP.
# Confirm whether that is intentional.
# NOTE(review): the "contrib" commit_type is not among the values accepted by
# the prs.commit_type CHECK constraint in SCHEMA_SQL — confirm that the schema
# actually in use permits it before writing this value to prs.
BRANCH_PREFIX_MAP = {
    "extract": ("pipeline", "extract"),
    "ingestion": ("pipeline", "extract"),
    "epimetheus": ("epimetheus", "extract"),
    "rio": ("rio", "research"),
    "theseus": ("theseus", "research"),
    "astra": ("astra", "research"),
    "vida": ("vida", "research"),
    "clay": ("clay", "research"),
    "leo": ("leo", "entity"),
    "reweave": ("pipeline", "reweave"),
    "fix": ("pipeline", "fix"),
    "contrib": ("external", "contrib"),
}
|
|
|
|
|
|
def classify_branch(branch: str) -> tuple[str, str]:
    """Map a branch name to its ``(agent, commit_type)`` pair.

    The prefix is the text before the first ``/`` (the whole name when there
    is no ``/``). Prefixes starting with ``gh-pr-`` are treated as fork PR
    branches; everything else is looked up in ``BRANCH_PREFIX_MAP``.

    Args:
        branch: Full branch name, e.g. ``"rio/some-topic"``.

    Returns:
        ``("external", "contrib")`` for ``gh-pr-*`` prefixes, the mapped pair
        for known prefixes, and ``("unknown", "unknown")`` otherwise (a
        warning is logged in that last case).
    """
    head, _sep, _tail = branch.partition("/")

    # Fork PR branches: gh-pr-N/original-branch
    if head.startswith("gh-pr-"):
        return ("external", "contrib")

    mapped = BRANCH_PREFIX_MAP.get(head)
    if mapped is not None:
        return mapped

    logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", head, branch)
    return ("unknown", "unknown")
|
|
|
|
|
|
# Branch prefix → source_channel mapping, consulted by classify_source_channel().
# Keep in sync with BRANCH_PREFIX_MAP above.
# NOTE(review): the two maps currently differ — "oberon" appears here but not
# in BRANCH_PREFIX_MAP, and "contrib" appears there but not here. Confirm
# whether that is intentional.
#
# Valid source_channel values: github | telegram | agent | maintenance | web | unknown
# - github: external contributor PR (set via sync-mirror.sh github_pr linking,
# or from gh-pr-* branches, or any time github_pr is provided)
# - telegram: message captured by telegram bot (must be tagged explicitly by
# ingestion — extract/* default is "unknown" because the bare branch prefix
# can no longer distinguish telegram-origin from github-origin extractions)
# - agent: per-agent research branches (rio/, theseus/, etc.)
# - maintenance: pipeline housekeeping (reweave/, epimetheus/, fix/)
# - web: future in-app submissions (chat UI or form posts)
# - unknown: fallback when provenance cannot be determined
_CHANNEL_MAP = {
    "extract": "unknown",
    "ingestion": "unknown",
    "rio": "agent",
    "theseus": "agent",
    "astra": "agent",
    "vida": "agent",
    "clay": "agent",
    "leo": "agent",
    "oberon": "agent",
    "reweave": "maintenance",
    "epimetheus": "maintenance",
    "fix": "maintenance",
}
|
|
|
|
|
|
def classify_source_channel(branch: str, *, github_pr: int = None) -> str:
    """Work out the ``source_channel`` value for a PR.

    Checks are applied in this order: an explicit ``github_pr`` value, then a
    ``gh-pr-`` branch name, then a ``_CHANNEL_MAP`` lookup on the text before
    the first ``/``. ``extract/*`` maps to "unknown", so callers that know the
    real origin (telegram bot, web submission handler) must override the
    value at PR-insert time.

    Args:
        branch: Full branch name.
        github_pr: Linked GitHub PR number, if any. Any value other than
            ``None`` (including ``0``) selects the "github" channel.

    Returns:
        "github" for GitHub-linked PRs and ``gh-pr-*`` branches, otherwise the
        mapped channel, falling back to "unknown" for unmapped prefixes.
    """
    # An explicit GitHub link wins over anything the branch name suggests.
    if github_pr is not None:
        return "github"
    if branch.startswith("gh-pr-"):
        return "github"

    head = branch.partition("/")[0]
    return _CHANNEL_MAP.get(head, "unknown")
|
|
|
|
|
|
def migrate(conn: sqlite3.Connection):
|
|
"""Run schema migrations."""
|
|
conn.executescript(SCHEMA_SQL)
|
|
|
|
# Check current version
|
|
try:
|
|
row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
|
|
current = row["v"] if row and row["v"] else 0
|
|
except sqlite3.OperationalError:
|
|
current = 0
|
|
|
|
# --- Incremental migrations ---
|
|
if current < 2:
|
|
# Phase 2: add multiplayer columns to prs table
|
|
for stmt in [
|
|
"ALTER TABLE prs ADD COLUMN priority TEXT",
|
|
"ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
|
|
"ALTER TABLE prs ADD COLUMN last_error TEXT",
|
|
]:
|
|
try:
|
|
conn.execute(stmt)
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists (idempotent)
|
|
logger.info("Migration v2: added priority, origin, last_error to prs")
|
|
|
|
if current < 3:
|
|
# Phase 3: retry budget — track eval attempts and issue tags per PR
|
|
for stmt in [
|
|
"ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0",
|
|
"ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'",
|
|
]:
|
|
try:
|
|
conn.execute(stmt)
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists (idempotent)
|
|
logger.info("Migration v3: added eval_attempts, eval_issues to prs")
|
|
|
|
if current < 4:
|
|
# Phase 4: auto-fixer — track fix attempts per PR
|
|
for stmt in [
|
|
"ALTER TABLE prs ADD COLUMN fix_attempts INTEGER DEFAULT 0",
|
|
]:
|
|
try:
|
|
conn.execute(stmt)
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists (idempotent)
|
|
logger.info("Migration v4: added fix_attempts to prs")
|
|
|
|
if current < 5:
|
|
# Phase 5: contributor identity system — tracks who contributed what
|
|
# Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
|
|
# CI is COMPUTED from raw counts x weights, never stored.
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS contributors (
|
|
handle TEXT PRIMARY KEY,
|
|
display_name TEXT,
|
|
agent_id TEXT,
|
|
first_contribution TEXT,
|
|
last_contribution TEXT,
|
|
tier TEXT DEFAULT 'new',
|
|
-- new, contributor, veteran
|
|
sourcer_count INTEGER DEFAULT 0,
|
|
extractor_count INTEGER DEFAULT 0,
|
|
challenger_count INTEGER DEFAULT 0,
|
|
synthesizer_count INTEGER DEFAULT 0,
|
|
reviewer_count INTEGER DEFAULT 0,
|
|
claims_merged INTEGER DEFAULT 0,
|
|
challenges_survived INTEGER DEFAULT 0,
|
|
domains TEXT DEFAULT '[]',
|
|
highlights TEXT DEFAULT '[]',
|
|
identities TEXT DEFAULT '{}',
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
updated_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier);
|
|
""")
|
|
logger.info("Migration v5: added contributors table")
|
|
|
|
if current < 6:
|
|
# Phase 6: analytics — time-series metrics snapshots for trending dashboard
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS metrics_snapshots (
|
|
ts TEXT DEFAULT (datetime('now')),
|
|
throughput_1h INTEGER,
|
|
approval_rate REAL,
|
|
open_prs INTEGER,
|
|
merged_total INTEGER,
|
|
closed_total INTEGER,
|
|
conflict_total INTEGER,
|
|
evaluated_24h INTEGER,
|
|
fix_success_rate REAL,
|
|
rejection_broken_wiki_links INTEGER DEFAULT 0,
|
|
rejection_frontmatter_schema INTEGER DEFAULT 0,
|
|
rejection_near_duplicate INTEGER DEFAULT 0,
|
|
rejection_confidence INTEGER DEFAULT 0,
|
|
rejection_other INTEGER DEFAULT 0,
|
|
extraction_model TEXT,
|
|
eval_domain_model TEXT,
|
|
eval_leo_model TEXT,
|
|
prompt_version TEXT,
|
|
pipeline_version TEXT,
|
|
source_origin_agent INTEGER DEFAULT 0,
|
|
source_origin_human INTEGER DEFAULT 0,
|
|
source_origin_scraper INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts);
|
|
""")
|
|
logger.info("Migration v6: added metrics_snapshots table for analytics dashboard")
|
|
|
|
if current < 7:
|
|
# Phase 7: agent attribution + commit_type for dashboard
|
|
# commit_type column + backfill agent/commit_type from branch prefix
|
|
try:
|
|
conn.execute("ALTER TABLE prs ADD COLUMN commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))")
|
|
except sqlite3.OperationalError:
|
|
pass # column already exists from CREATE TABLE
|
|
# Backfill agent and commit_type from branch prefix
|
|
rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall()
|
|
for row in rows:
|
|
agent, commit_type = classify_branch(row["branch"])
|
|
conn.execute(
|
|
"UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)",
|
|
(agent, commit_type, row["number"]),
|
|
)
|
|
backfilled = len(rows)
|
|
logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled)
|
|
|
|
if current < 8:
|
|
# Phase 8: response audit — full-chain visibility for agent response quality
|
|
# Captures: query → tool calls → retrieval → context → response → confidence
|
|
# Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops)
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS response_audit (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
|
|
chat_id INTEGER,
|
|
user TEXT,
|
|
agent TEXT DEFAULT 'rio',
|
|
model TEXT,
|
|
query TEXT,
|
|
conversation_window TEXT, -- intentional transcript duplication for audit self-containment
|
|
entities_matched TEXT,
|
|
claims_matched TEXT,
|
|
retrieval_layers_hit TEXT,
|
|
retrieval_gap TEXT,
|
|
market_data TEXT,
|
|
research_context TEXT,
|
|
kb_context_text TEXT,
|
|
tool_calls TEXT,
|
|
raw_response TEXT,
|
|
display_response TEXT,
|
|
confidence_score REAL,
|
|
response_time_ms INTEGER,
|
|
created_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
|
|
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
|
|
""")
|
|
logger.info("Migration v8: added response_audit table for agent response auditing")
|
|
|
|
if current < 9:
|
|
# Phase 9: rebuild prs table to expand CHECK constraint on commit_type.
|
|
# SQLite cannot ALTER CHECK constraints in-place — must rebuild table.
|
|
# Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown
|
|
# New constraint: adds challenge,enrich,synthesize
|
|
# Also re-derive commit_type from branch prefix for rows with invalid/NULL values.
|
|
prs_sql_row = conn.execute(
|
|
"SELECT sql FROM sqlite_master WHERE type = 'table' AND name = 'prs'"
|
|
).fetchone()
|
|
prs_sql = (prs_sql_row["sql"] or "") if prs_sql_row else ""
|
|
|
|
if all(kind in prs_sql for kind in ("challenge", "enrich", "synthesize")):
|
|
logger.info("Migration v9: prs commit_type CHECK already expanded, rebuild skipped")
|
|
else:
|
|
# Step 1: Get all column names from existing table.
|
|
cols_info = conn.execute("PRAGMA table_info(prs)").fetchall()
|
|
col_names = [c["name"] for c in cols_info]
|
|
|
|
# Step 2: Create new table with the expanded CHECK constraint.
|
|
# Keep columns introduced before and after v9 when present. This keeps
|
|
# fresh DB bootstrap and partially manually-migrated VPS DBs idempotent.
|
|
target_cols = [
|
|
"number",
|
|
"source_path",
|
|
"branch",
|
|
"status",
|
|
"domain",
|
|
"agent",
|
|
"commit_type",
|
|
"tier",
|
|
"tier0_pass",
|
|
"leo_verdict",
|
|
"domain_verdict",
|
|
"domain_agent",
|
|
"domain_model",
|
|
"priority",
|
|
"origin",
|
|
"eval_attempts",
|
|
"eval_issues",
|
|
"fix_attempts",
|
|
"transient_retries",
|
|
"substantive_retries",
|
|
"last_error",
|
|
"last_attempt",
|
|
"cost_usd",
|
|
"auto_merge",
|
|
"github_pr",
|
|
"source_channel",
|
|
"prompt_version",
|
|
"pipeline_version",
|
|
"submitted_by",
|
|
"conflict_rebase_attempts",
|
|
"merge_failures",
|
|
"merge_cycled",
|
|
"created_at",
|
|
"merged_at",
|
|
]
|
|
insert_cols = [col for col in target_cols if col in col_names]
|
|
col_list = ", ".join(insert_cols)
|
|
|
|
conn.executescript("""
|
|
CREATE TABLE prs_new (
|
|
number INTEGER PRIMARY KEY,
|
|
source_path TEXT REFERENCES sources(path),
|
|
branch TEXT,
|
|
status TEXT NOT NULL DEFAULT 'open',
|
|
domain TEXT,
|
|
agent TEXT,
|
|
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
|
|
tier TEXT,
|
|
tier0_pass INTEGER,
|
|
leo_verdict TEXT DEFAULT 'pending',
|
|
domain_verdict TEXT DEFAULT 'pending',
|
|
domain_agent TEXT,
|
|
domain_model TEXT,
|
|
priority TEXT,
|
|
origin TEXT DEFAULT 'pipeline',
|
|
eval_attempts INTEGER DEFAULT 0,
|
|
eval_issues TEXT DEFAULT '[]',
|
|
fix_attempts INTEGER DEFAULT 0,
|
|
transient_retries INTEGER DEFAULT 0,
|
|
substantive_retries INTEGER DEFAULT 0,
|
|
last_error TEXT,
|
|
last_attempt TEXT,
|
|
cost_usd REAL DEFAULT 0,
|
|
auto_merge INTEGER DEFAULT 0,
|
|
github_pr INTEGER,
|
|
source_channel TEXT,
|
|
prompt_version TEXT,
|
|
pipeline_version TEXT,
|
|
submitted_by TEXT,
|
|
conflict_rebase_attempts INTEGER DEFAULT 0,
|
|
merge_failures INTEGER DEFAULT 0,
|
|
merge_cycled INTEGER DEFAULT 0,
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
merged_at TEXT
|
|
);
|
|
""")
|
|
if insert_cols:
|
|
conn.execute(f"INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs")
|
|
conn.executescript("""
|
|
DROP TABLE prs;
|
|
ALTER TABLE prs_new RENAME TO prs;
|
|
""")
|
|
logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")
|
|
|
|
# Step 3: Re-derive commit_type from branch prefix for invalid/NULL values
|
|
rows = conn.execute(
|
|
"""SELECT number, branch FROM prs
|
|
WHERE branch IS NOT NULL
|
|
AND (commit_type IS NULL
|
|
OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))"""
|
|
).fetchall()
|
|
fixed = 0
|
|
for row in rows:
|
|
agent, commit_type = classify_branch(row["branch"])
|
|
conn.execute(
|
|
"UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?",
|
|
(agent, commit_type, row["number"]),
|
|
)
|
|
fixed += 1
|
|
conn.commit()
|
|
logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed)
|
|
|
|
if current < 10:
|
|
# Add eval pipeline columns to response_audit
|
|
# VPS may already be at v10/v11 from prior (incomplete) deploys — use IF NOT EXISTS pattern
|
|
for col_def in [
|
|
("prompt_tokens", "INTEGER"),
|
|
("completion_tokens", "INTEGER"),
|
|
("generation_cost", "REAL"),
|
|
("embedding_cost", "REAL"),
|
|
("total_cost", "REAL"),
|
|
("blocked", "INTEGER DEFAULT 0"),
|
|
("block_reason", "TEXT"),
|
|
("query_type", "TEXT"),
|
|
]:
|
|
try:
|
|
conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_def[0]} {col_def[1]}")
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists
|
|
conn.commit()
|
|
logger.info("Migration v10: added eval pipeline columns to response_audit")
|
|
|
|
if current < 11:
|
|
# Add auto_merge flag for agent PR auto-merge (eval-approved agent branches)
|
|
try:
|
|
conn.execute("ALTER TABLE prs ADD COLUMN auto_merge INTEGER DEFAULT 0")
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists (VPS may be ahead of repo schema)
|
|
conn.commit()
|
|
logger.info("Migration v11: added auto_merge column to prs table")
|
|
|
|
|
|
# v12-v16 ran manually on VPS before code was version-controlled.
|
|
# Their changes are consolidated into v17+ migrations below.
|
|
|
|
if current < 17:
|
|
# Add prompt/pipeline version tracking per PR
|
|
for col, _default in [
|
|
("prompt_version", None),
|
|
("pipeline_version", None),
|
|
]:
|
|
try:
|
|
conn.execute(f"ALTER TABLE prs ADD COLUMN {col} TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists
|
|
conn.commit()
|
|
logger.info("Migration v17: added prompt_version, pipeline_version to prs table")
|
|
|
|
if current < 18:
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS review_records (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
pr_number INTEGER NOT NULL,
|
|
claim_path TEXT,
|
|
domain TEXT,
|
|
agent TEXT,
|
|
reviewer TEXT,
|
|
reviewer_model TEXT,
|
|
outcome TEXT NOT NULL,
|
|
rejection_reason TEXT,
|
|
disagreement_type TEXT,
|
|
notes TEXT,
|
|
batch_id TEXT,
|
|
claims_in_batch INTEGER,
|
|
reviewed_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number);
|
|
CREATE INDEX IF NOT EXISTS idx_review_records_agent ON review_records(agent);
|
|
""")
|
|
conn.commit()
|
|
logger.info("Migration v18: created review_records table")
|
|
|
|
if current < 19:
|
|
# Add submitted_by for contributor attribution tracing.
|
|
# Tracks who submitted the source: human handle, agent name, or "self-directed".
|
|
try:
|
|
conn.execute("ALTER TABLE prs ADD COLUMN submitted_by TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass # Column already exists
|
|
try:
|
|
conn.execute("ALTER TABLE sources ADD COLUMN submitted_by TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.commit()
|
|
logger.info("Migration v19: added submitted_by to prs and sources tables")
|
|
|
|
if current < 20:
|
|
for col, default in [
|
|
("conflict_rebase_attempts", "INTEGER DEFAULT 0"),
|
|
("merge_failures", "INTEGER DEFAULT 0"),
|
|
("merge_cycled", "INTEGER DEFAULT 0"),
|
|
]:
|
|
try:
|
|
conn.execute(f"ALTER TABLE prs ADD COLUMN {col} {default}")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.commit()
|
|
logger.info("Migration v20: added conflict retry columns to prs")
|
|
|
|
if current < 21:
|
|
try:
|
|
conn.execute("ALTER TABLE prs ADD COLUMN github_pr INTEGER")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_prs_github_pr ON prs (github_pr) WHERE github_pr IS NOT NULL"
|
|
)
|
|
conn.commit()
|
|
logger.info("Migration v21: added github_pr column + index to prs")
|
|
|
|
if current < 22:
|
|
try:
|
|
conn.execute("ALTER TABLE prs ADD COLUMN source_channel TEXT")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.execute("""
|
|
UPDATE prs SET source_channel = CASE
|
|
WHEN github_pr IS NOT NULL THEN 'github'
|
|
WHEN branch LIKE 'gh-pr-%%' THEN 'github'
|
|
WHEN branch LIKE 'theseus/%%' THEN 'agent'
|
|
WHEN branch LIKE 'rio/%%' THEN 'agent'
|
|
WHEN branch LIKE 'astra/%%' THEN 'agent'
|
|
WHEN branch LIKE 'clay/%%' THEN 'agent'
|
|
WHEN branch LIKE 'vida/%%' THEN 'agent'
|
|
WHEN branch LIKE 'oberon/%%' THEN 'agent'
|
|
WHEN branch LIKE 'leo/%%' THEN 'agent'
|
|
WHEN branch LIKE 'reweave/%%' THEN 'maintenance'
|
|
WHEN branch LIKE 'epimetheus/%%' THEN 'maintenance'
|
|
WHEN branch LIKE 'fix/%%' THEN 'maintenance'
|
|
WHEN branch LIKE 'extract/%%' THEN 'telegram'
|
|
WHEN branch LIKE 'ingestion/%%' THEN 'telegram'
|
|
ELSE 'unknown'
|
|
END
|
|
WHERE source_channel IS NULL
|
|
""")
|
|
conn.commit()
|
|
logger.info("Migration v22: added source_channel to prs + backfilled from branch prefix")
|
|
|
|
if current < 23:
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_prs_source_path ON prs(source_path) WHERE source_path IS NOT NULL"
|
|
)
|
|
conn.commit()
|
|
logger.info("Migration v23: added idx_prs_source_path for auto-close dedup lookup")
|
|
|
|
if current < 24:
|
|
# Event-sourced contributions table + alias table + kind column on contributors.
|
|
# Non-breaking: contributors table stays; events are written in addition via
|
|
# double-write in merge.py. Leaderboards switch to events in Phase B.
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS contribution_events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
handle TEXT NOT NULL,
|
|
kind TEXT NOT NULL DEFAULT 'person',
|
|
role TEXT NOT NULL,
|
|
weight REAL NOT NULL,
|
|
pr_number INTEGER NOT NULL,
|
|
claim_path TEXT,
|
|
domain TEXT,
|
|
channel TEXT,
|
|
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
-- Partial unique indexes handle SQLite's NULL != NULL UNIQUE semantics.
|
|
-- Per-claim events dedup on 4-tuple; PR-level events dedup on 3-tuple.
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
|
|
handle, role, pr_number, claim_path
|
|
) WHERE claim_path IS NOT NULL;
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
|
|
handle, role, pr_number
|
|
) WHERE claim_path IS NULL;
|
|
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
|
|
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
|
|
|
|
CREATE TABLE IF NOT EXISTS contributor_aliases (
|
|
alias TEXT PRIMARY KEY,
|
|
canonical TEXT NOT NULL,
|
|
created_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
|
|
""")
|
|
try:
|
|
conn.execute("ALTER TABLE contributors ADD COLUMN kind TEXT DEFAULT 'person'")
|
|
except sqlite3.OperationalError:
|
|
pass # column already exists
|
|
# Seed known aliases. @thesensatore → thesensatore catches the zombie row Argus flagged.
|
|
# cameron → cameron-s1 reconciles the Leo-flagged missing contributor.
|
|
conn.executemany(
|
|
"INSERT OR IGNORE INTO contributor_aliases (alias, canonical) VALUES (?, ?)",
|
|
[
|
|
("@thesensatore", "thesensatore"),
|
|
("cameron", "cameron-s1"),
|
|
],
|
|
)
|
|
# Seed kind='agent' for known Pentagon agents so the events writer picks it up.
|
|
# Must stay in sync with lib/attribution.PENTAGON_AGENTS — drift causes
|
|
# contributors.kind to disagree with classify_kind() output for future
|
|
# inserts. (Ganymede review: "pipeline" was missing until Apr 24.)
|
|
pentagon_agents = [
|
|
"rio", "leo", "theseus", "vida", "clay", "astra",
|
|
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
|
|
"pipeline",
|
|
]
|
|
for agent in pentagon_agents:
|
|
conn.execute(
|
|
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
|
|
(agent,),
|
|
)
|
|
conn.commit()
|
|
logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column")
|
|
|
|
if current < 25:
|
|
# v24 seeded 13 Pentagon agents but missed "pipeline" — classify_kind()
|
|
# treats it as agent so contributors.kind drifted from event-insert output.
|
|
# Idempotent corrective UPDATE: fresh installs have no "pipeline" row
|
|
# (no-op), upgraded envs flip it if it exists. (Ganymede review Apr 24.)
|
|
conn.execute(
|
|
"UPDATE contributors SET kind = 'agent' WHERE handle = 'pipeline'"
|
|
)
|
|
conn.commit()
|
|
logger.info("Migration v25: patched kind='agent' for pipeline handle")
|
|
|
|
if current < 26:
|
|
# Add publishers + contributor_identities. Non-breaking — new tables only.
|
|
# No existing data moved. Classification into publishers happens via a
|
|
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
|
|
# seed list. CHECK constraint on contributors.kind deferred until after
|
|
# classification completes. (Apr 24 Cory directive: "fix schema, don't
|
|
# filter output" — separate contributors from publishers at the data layer.)
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS publishers (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL UNIQUE,
|
|
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
|
|
url_pattern TEXT,
|
|
created_at TEXT DEFAULT (datetime('now'))
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
|
|
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
|
|
|
|
CREATE TABLE IF NOT EXISTS contributor_identities (
|
|
contributor_handle TEXT NOT NULL,
|
|
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
|
|
platform_handle TEXT NOT NULL,
|
|
verified INTEGER DEFAULT 0,
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
PRIMARY KEY (platform, platform_handle)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
|
|
""")
|
|
# Extend sources with provenance columns. ALTER TABLE ADD COLUMN is
|
|
# idempotent-safe via try/except because SQLite doesn't support IF NOT EXISTS
|
|
# on column adds.
|
|
for col_sql in (
|
|
"ALTER TABLE sources ADD COLUMN publisher_id INTEGER REFERENCES publishers(id)",
|
|
"ALTER TABLE sources ADD COLUMN content_type TEXT",
|
|
"ALTER TABLE sources ADD COLUMN original_author TEXT",
|
|
"ALTER TABLE sources ADD COLUMN original_author_handle TEXT REFERENCES contributors(handle)",
|
|
):
|
|
try:
|
|
conn.execute(col_sql)
|
|
except sqlite3.OperationalError as e:
|
|
if "duplicate column" not in str(e).lower():
|
|
raise
|
|
conn.commit()
|
|
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
|
|
|
|
if current < 27:
|
|
for col, definition in [
|
|
("duration_ms", "INTEGER DEFAULT 0"),
|
|
("cache_read_tokens", "INTEGER DEFAULT 0"),
|
|
("cache_write_tokens", "INTEGER DEFAULT 0"),
|
|
("cost_estimate_usd", "REAL DEFAULT 0"),
|
|
]:
|
|
try:
|
|
conn.execute(f"ALTER TABLE costs ADD COLUMN {col} {definition}")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.commit()
|
|
logger.info("Migration v27: added detailed cost accounting columns")
|
|
|
|
if current < SCHEMA_VERSION:
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
|
|
(SCHEMA_VERSION,),
|
|
)
|
|
conn.commit() # Explicit commit — executescript auto-commits DDL but not subsequent DML
|
|
logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
|
|
else:
|
|
logger.debug("Database at schema version %d", current)
|
|
|
|
|
|
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None):
    """Record a single entry in the ``audit_log`` table.

    Args:
        conn: Open SQLite connection the row is written through.
        stage: Value stored in the ``stage`` column.
        event: Value stored in the ``event`` column.
        detail: Optional value for the ``detail`` column; ``None`` is stored
            as NULL.

    No commit is issued here; the statement is only executed on ``conn``.
    """
    insert_sql = "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)"
    entry = (stage, event, detail)
    conn.execute(insert_sql, entry)
|
|
|
|
|
|
def record_review(
    conn: sqlite3.Connection,
    pr_number: int,
    outcome: str,
    *,
    domain: str = None,
    agent: str = None,
    reviewer: str = None,
    reviewer_model: str = None,
    rejection_reason: str = None,
    disagreement_type: str = None,
    notes: str = None,
    claims_in_batch: int = None,
):
    """Insert one row into ``review_records`` for an eval verdict.

    Args:
        conn: Open SQLite connection the row is written through.
        pr_number: PR the verdict applies to; its string form is also stored
            as ``batch_id``.
        outcome: Verdict value stored in the ``outcome`` column.
        domain, agent, reviewer, reviewer_model, rejection_reason,
        disagreement_type, claims_in_batch: Optional column values, stored
            as given (``None`` becomes NULL).
        notes: Optional free text. Only the first 4000 characters are kept;
            an empty string is stored as NULL.

    No commit is issued here; the statement is only executed on ``conn``.
    """
    # Falsy notes (None or "") collapse to NULL; anything else is capped at 4000 chars.
    stored_notes = notes[:4000] if notes else None
    batch_id = str(pr_number)  # batch_id = PR number

    insert_sql = """INSERT INTO review_records
        (pr_number, domain, agent, reviewer, reviewer_model, outcome,
         rejection_reason, disagreement_type, notes, batch_id, claims_in_batch)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
    row_values = (
        pr_number,
        domain,
        agent,
        reviewer,
        reviewer_model,
        outcome,
        rejection_reason,
        disagreement_type,
        stored_notes,
        batch_id,
        claims_in_batch,
    )
    conn.execute(insert_sql, row_values)
|
|
|
|
|
|
def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
    """Append a priority assessment to a source's priority_log.

    NOTE: This does NOT update the source's priority column. The priority column
    is the authoritative priority, set only by initial triage or human override.
    The priority_log records each stage's opinion for offline calibration analysis.
    (Bug caught by Theseus — original version overwrote priority with each stage's opinion.)
    (Race condition fix per Vida — read-then-write wrapped in transaction.)

    Args:
        conn: Open SQLite connection.
        path: Primary key of the row in ``sources``; if no such row exists the
            call is a no-op.
        stage: Name of the stage giving the assessment.
        priority: The priority that stage assigned.
        reasoning: Free-text justification stored with the entry.

    Each appended entry has the keys ``stage``, ``priority``, ``reasoning`` and
    ``ts`` (SQLite ``datetime('now')``, i.e. UTC ``YYYY-MM-DD HH:MM:SS``), which
    matches the format documented for the ``priority_log`` column.

    Transaction handling:
        * If ``conn`` has no transaction open, the read-modify-write runs in
          its own ``BEGIN IMMEDIATE`` transaction, which is committed on
          success and rolled back on error. IMMEDIATE takes the write lock
          before the read, so a concurrent writer cannot slip in between the
          SELECT and the UPDATE.
        * If the caller already has a transaction open, the work joins it and
          this function neither commits nor rolls back — the caller keeps
          control of the outcome. (Issuing ``BEGIN`` there would raise
          "cannot start a transaction within a transaction".)

    Raises:
        Whatever ``sqlite3`` or ``json`` raise while reading, decoding or
        writing the log; the exception is re-raised after any rollback.
    """
    owns_transaction = not conn.in_transaction
    if owns_transaction:
        conn.execute("BEGIN IMMEDIATE")
    try:
        # Fetch the timestamp in the same statement so it uses the same clock
        # and format as the table's created_at / updated_at values.
        row = conn.execute(
            "SELECT priority_log, datetime('now') FROM sources WHERE path = ?",
            (path,),
        ).fetchone()
        if row is not None:
            # Positional access works with both tuple rows and sqlite3.Row.
            entries = json.loads(row[0] or "[]")
            entries.append(
                {"stage": stage, "priority": priority, "reasoning": reasoning, "ts": row[1]}
            )
            conn.execute(
                "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
                (json.dumps(entries), path),
            )
        if owns_transaction:
            conn.execute("COMMIT")
    except Exception:
        # Only unwind a transaction this function started, and only if SQLite
        # has not already ended it (ROLLBACK with none active would itself raise).
        if owns_transaction and conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
|
|
|
|
|
|
def insert_response_audit(conn: sqlite3.Connection, **kwargs):
    """Insert one row into ``response_audit`` from keyword arguments.

    Only keywords whose name is in the column whitelist below and whose value
    is not ``None`` are written; everything else is silently dropped. If
    nothing survives the filter, no statement is executed.

    NOTE(review): the earlier docstring said every field is optional except
    ``query``. Nothing here checks for ``query`` — presumably the table schema
    enforces it; confirm against the ``response_audit`` definition.

    Args:
        conn: Open SQLite connection the row is written through.
        **kwargs: Column name → value pairs.
    """
    allowed_columns = [
        "timestamp", "chat_id", "user", "agent", "model", "query",
        "conversation_window", "entities_matched", "claims_matched",
        "retrieval_layers_hit", "retrieval_gap", "market_data",
        "research_context", "kb_context_text", "tool_calls",
        "raw_response", "display_response", "confidence_score",
        "response_time_ms",
        # Eval pipeline columns (v10)
        "prompt_tokens", "completion_tokens", "generation_cost",
        "embedding_cost", "total_cost", "blocked", "block_reason",
        "query_type",
    ]

    # Keep caller order; column names come only from the whitelist, so
    # interpolating them into the SQL text is safe. Values are always bound.
    names = []
    values = []
    for key, value in kwargs.items():
        if value is None or key not in allowed_columns:
            continue
        names.append(key)
        values.append(value)

    if not names:
        return

    column_list = ", ".join(names)
    marks = ", ".join(["?"] * len(names))
    conn.execute(
        f"INSERT INTO response_audit ({column_list}) VALUES ({marks})",
        tuple(values),
    )
|
|
|
|
|
|
def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
    """Overwrite a source's authoritative priority and log the change.

    Intended for human overrides and initial triage.

    Args:
        conn: Open SQLite connection.
        path: Primary key of the row in ``sources`` to update.
        priority: New value for the ``priority`` column.
        reason: Text recorded with the log entry; defaults to
            ``"human override"``.

    The ``priority`` and ``updated_at`` columns are updated first, then an
    entry with stage ``"override"`` is appended to the source's priority log.
    """
    update_sql = "UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?"
    conn.execute(update_sql, (priority, path))
    # Keep the log in step with the column: the override is recorded as its own stage.
    append_priority_log(conn, path, stage="override", priority=priority, reasoning=reason)
|