apply Ganymede review fixes: delete misplaced ops/db.py, correct diff log, fix stale_pr DB update
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
13a6b60c21
commit
3461f2ad8f
3 changed files with 4 additions and 657 deletions
|
|
@ -38,7 +38,9 @@
|
||||||
- evolution.md — kept (documentation)
|
- evolution.md — kept (documentation)
|
||||||
- weekly/2026-03-25-week3.md — kept (report)
|
- weekly/2026-03-25-week3.md — kept (report)
|
||||||
- ops/sessions/*.json — kept (session data)
|
- ops/sessions/*.json — kept (session data)
|
||||||
- All .py files REMOVED from root diagnostics/
|
- alerting.py, alerting_routes.py REMOVED by this consolidation
|
||||||
|
- vitality.py, vitality_routes.py were already absent (moved in prior commit)
|
||||||
|
- No .py files remain in root diagnostics/
|
||||||
|
|
||||||
## VPS .bak files inventory (30+ files)
|
## VPS .bak files inventory (30+ files)
|
||||||
All in /opt/teleo-eval/diagnostics/. Git is the backup now. Safe to delete after consolidation verified.
|
All in /opt/teleo-eval/diagnostics/. Git is the backup now. Safe to delete after consolidation verified.
|
||||||
|
|
|
||||||
|
|
@ -1,655 +0,0 @@
|
||||||
"""SQLite database — schema, migrations, connection management."""
|
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import sqlite3
|
|
||||||
from contextlib import contextmanager
|
|
||||||
|
|
||||||
from . import config
|
|
||||||
|
|
||||||
logger = logging.getLogger("pipeline.db")
|
|
||||||
|
|
||||||
SCHEMA_VERSION = 16
|
|
||||||
|
|
||||||
SCHEMA_SQL = """
|
|
||||||
CREATE TABLE IF NOT EXISTS schema_version (
|
|
||||||
version INTEGER PRIMARY KEY,
|
|
||||||
applied_at TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS sources (
|
|
||||||
path TEXT PRIMARY KEY,
|
|
||||||
status TEXT NOT NULL DEFAULT 'unprocessed',
|
|
||||||
-- unprocessed, triaging, extracting, extracted, null_result,
|
|
||||||
-- needs_reextraction, error
|
|
||||||
priority TEXT DEFAULT 'medium',
|
|
||||||
-- critical, high, medium, low, skip
|
|
||||||
priority_log TEXT DEFAULT '[]',
|
|
||||||
-- JSON array: [{stage, priority, reasoning, ts}]
|
|
||||||
extraction_model TEXT,
|
|
||||||
claims_count INTEGER DEFAULT 0,
|
|
||||||
pr_number INTEGER,
|
|
||||||
transient_retries INTEGER DEFAULT 0,
|
|
||||||
substantive_retries INTEGER DEFAULT 0,
|
|
||||||
last_error TEXT,
|
|
||||||
feedback TEXT,
|
|
||||||
-- eval feedback for re-extraction (JSON)
|
|
||||||
cost_usd REAL DEFAULT 0,
|
|
||||||
created_at TEXT DEFAULT (datetime('now')),
|
|
||||||
updated_at TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS prs (
|
|
||||||
number INTEGER PRIMARY KEY,
|
|
||||||
source_path TEXT REFERENCES sources(path),
|
|
||||||
branch TEXT,
|
|
||||||
status TEXT NOT NULL DEFAULT 'open',
|
|
||||||
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
|
|
||||||
-- conflict: rebase failed or merge timed out — needs human intervention
|
|
||||||
domain TEXT,
|
|
||||||
agent TEXT,
|
|
||||||
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')),
|
|
||||||
tier TEXT,
|
|
||||||
-- LIGHT, STANDARD, DEEP
|
|
||||||
tier0_pass INTEGER,
|
|
||||||
-- 0/1
|
|
||||||
leo_verdict TEXT DEFAULT 'pending',
|
|
||||||
-- pending, approve, request_changes, skipped, failed
|
|
||||||
domain_verdict TEXT DEFAULT 'pending',
|
|
||||||
domain_agent TEXT,
|
|
||||||
domain_model TEXT,
|
|
||||||
priority TEXT,
|
|
||||||
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
|
|
||||||
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
|
|
||||||
-- Human PRs: 'critical' (detected via missing source_path or non-agent author)
|
|
||||||
origin TEXT DEFAULT 'pipeline',
|
|
||||||
-- pipeline | human | external
|
|
||||||
transient_retries INTEGER DEFAULT 0,
|
|
||||||
substantive_retries INTEGER DEFAULT 0,
|
|
||||||
last_error TEXT,
|
|
||||||
last_attempt TEXT,
|
|
||||||
cost_usd REAL DEFAULT 0,
|
|
||||||
created_at TEXT DEFAULT (datetime('now')),
|
|
||||||
merged_at TEXT
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS costs (
|
|
||||||
date TEXT,
|
|
||||||
model TEXT,
|
|
||||||
stage TEXT,
|
|
||||||
calls INTEGER DEFAULT 0,
|
|
||||||
input_tokens INTEGER DEFAULT 0,
|
|
||||||
output_tokens INTEGER DEFAULT 0,
|
|
||||||
cost_usd REAL DEFAULT 0,
|
|
||||||
PRIMARY KEY (date, model, stage)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS circuit_breakers (
|
|
||||||
name TEXT PRIMARY KEY,
|
|
||||||
state TEXT DEFAULT 'closed',
|
|
||||||
-- closed, open, halfopen
|
|
||||||
failures INTEGER DEFAULT 0,
|
|
||||||
successes INTEGER DEFAULT 0,
|
|
||||||
tripped_at TEXT,
|
|
||||||
last_success_at TEXT,
|
|
||||||
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
|
|
||||||
last_update TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS audit_log (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
timestamp TEXT DEFAULT (datetime('now')),
|
|
||||||
stage TEXT,
|
|
||||||
event TEXT,
|
|
||||||
detail TEXT
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS response_audit (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
|
|
||||||
chat_id INTEGER,
|
|
||||||
user TEXT,
|
|
||||||
agent TEXT DEFAULT 'rio',
|
|
||||||
model TEXT,
|
|
||||||
query TEXT,
|
|
||||||
conversation_window TEXT,
|
|
||||||
-- JSON: prior N messages for context
|
|
||||||
-- NOTE: intentional duplication of transcript data for audit self-containment.
|
|
||||||
-- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt
|
|
||||||
-- context inline for retrieval-quality diagnosis. Primary driver of row size —
|
|
||||||
-- target for cleanup when 90-day retention policy lands.
|
|
||||||
entities_matched TEXT,
|
|
||||||
-- JSON: [{name, path, score, used_in_response}]
|
|
||||||
claims_matched TEXT,
|
|
||||||
-- JSON: [{path, title, score, source, used_in_response}]
|
|
||||||
retrieval_layers_hit TEXT,
|
|
||||||
-- JSON: ["keyword","qdrant","graph"]
|
|
||||||
retrieval_gap TEXT,
|
|
||||||
-- What the KB was missing (if anything)
|
|
||||||
market_data TEXT,
|
|
||||||
-- JSON: injected token prices
|
|
||||||
research_context TEXT,
|
|
||||||
-- Haiku pre-pass results if any
|
|
||||||
kb_context_text TEXT,
|
|
||||||
-- Full context string sent to model
|
|
||||||
tool_calls TEXT,
|
|
||||||
-- JSON: ordered array [{tool, input, output, duration_ms, ts}]
|
|
||||||
raw_response TEXT,
|
|
||||||
display_response TEXT,
|
|
||||||
confidence_score REAL,
|
|
||||||
-- Model self-rated retrieval quality 0.0-1.0
|
|
||||||
response_time_ms INTEGER,
|
|
||||||
-- Eval pipeline columns (v10)
|
|
||||||
prompt_tokens INTEGER,
|
|
||||||
completion_tokens INTEGER,
|
|
||||||
generation_cost REAL,
|
|
||||||
embedding_cost REAL,
|
|
||||||
total_cost REAL,
|
|
||||||
blocked INTEGER DEFAULT 0,
|
|
||||||
block_reason TEXT,
|
|
||||||
query_type TEXT,
|
|
||||||
created_at TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
def get_connection(readonly: bool = False) -> sqlite3.Connection:
|
|
||||||
"""Create a SQLite connection with WAL mode and proper settings."""
|
|
||||||
config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
conn = sqlite3.connect(
|
|
||||||
str(config.DB_PATH),
|
|
||||||
timeout=30,
|
|
||||||
isolation_level=None, # autocommit — we manage transactions explicitly
|
|
||||||
)
|
|
||||||
conn.row_factory = sqlite3.Row
|
|
||||||
conn.execute("PRAGMA journal_mode=WAL")
|
|
||||||
conn.execute("PRAGMA busy_timeout=10000")
|
|
||||||
conn.execute("PRAGMA foreign_keys=ON")
|
|
||||||
if readonly:
|
|
||||||
conn.execute("PRAGMA query_only=ON")
|
|
||||||
return conn
|
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def transaction(conn: sqlite3.Connection):
|
|
||||||
"""Context manager for explicit transactions."""
|
|
||||||
conn.execute("BEGIN")
|
|
||||||
try:
|
|
||||||
yield conn
|
|
||||||
conn.execute("COMMIT")
|
|
||||||
except Exception:
|
|
||||||
conn.execute("ROLLBACK")
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
# Branch prefix → (agent, commit_type) mapping.
|
|
||||||
# Single source of truth — used by merge.py at INSERT time and migration v7 backfill.
|
|
||||||
# Unknown prefixes → ('unknown', 'unknown') + warning log.
|
|
||||||
BRANCH_PREFIX_MAP = {
|
|
||||||
"extract": ("pipeline", "extract"),
|
|
||||||
"ingestion": ("pipeline", "extract"),
|
|
||||||
"epimetheus": ("epimetheus", "extract"),
|
|
||||||
"rio": ("rio", "research"),
|
|
||||||
"theseus": ("theseus", "research"),
|
|
||||||
"astra": ("astra", "research"),
|
|
||||||
"vida": ("vida", "research"),
|
|
||||||
"clay": ("clay", "research"),
|
|
||||||
"leo": ("leo", "entity"),
|
|
||||||
"reweave": ("pipeline", "reweave"),
|
|
||||||
"fix": ("pipeline", "fix"),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def classify_branch(branch: str) -> tuple[str, str]:
|
|
||||||
"""Derive (agent, commit_type) from branch prefix.
|
|
||||||
|
|
||||||
Returns ('unknown', 'unknown') and logs a warning for unrecognized prefixes.
|
|
||||||
"""
|
|
||||||
prefix = branch.split("/", 1)[0] if "/" in branch else branch
|
|
||||||
result = BRANCH_PREFIX_MAP.get(prefix)
|
|
||||||
if result is None:
|
|
||||||
logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", prefix, branch)
|
|
||||||
return ("unknown", "unknown")
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def migrate(conn: sqlite3.Connection):
|
|
||||||
"""Run schema migrations."""
|
|
||||||
conn.executescript(SCHEMA_SQL)
|
|
||||||
|
|
||||||
# Check current version
|
|
||||||
try:
|
|
||||||
row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
|
|
||||||
current = row["v"] if row and row["v"] else 0
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
current = 0
|
|
||||||
|
|
||||||
# --- Incremental migrations ---
|
|
||||||
if current < 2:
|
|
||||||
# Phase 2: add multiplayer columns to prs table
|
|
||||||
for stmt in [
|
|
||||||
"ALTER TABLE prs ADD COLUMN priority TEXT",
|
|
||||||
"ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
|
|
||||||
"ALTER TABLE prs ADD COLUMN last_error TEXT",
|
|
||||||
]:
|
|
||||||
try:
|
|
||||||
conn.execute(stmt)
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # Column already exists (idempotent)
|
|
||||||
logger.info("Migration v2: added priority, origin, last_error to prs")
|
|
||||||
|
|
||||||
if current < 3:
|
|
||||||
# Phase 3: retry budget — track eval attempts and issue tags per PR
|
|
||||||
for stmt in [
|
|
||||||
"ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0",
|
|
||||||
"ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'",
|
|
||||||
]:
|
|
||||||
try:
|
|
||||||
conn.execute(stmt)
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # Column already exists (idempotent)
|
|
||||||
logger.info("Migration v3: added eval_attempts, eval_issues to prs")
|
|
||||||
|
|
||||||
if current < 4:
|
|
||||||
# Phase 4: auto-fixer — track fix attempts per PR
|
|
||||||
for stmt in [
|
|
||||||
"ALTER TABLE prs ADD COLUMN fix_attempts INTEGER DEFAULT 0",
|
|
||||||
]:
|
|
||||||
try:
|
|
||||||
conn.execute(stmt)
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # Column already exists (idempotent)
|
|
||||||
logger.info("Migration v4: added fix_attempts to prs")
|
|
||||||
|
|
||||||
if current < 5:
|
|
||||||
# Phase 5: contributor identity system — tracks who contributed what
|
|
||||||
# Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
|
|
||||||
# CI is COMPUTED from raw counts × weights, never stored.
|
|
||||||
conn.executescript("""
|
|
||||||
CREATE TABLE IF NOT EXISTS contributors (
|
|
||||||
handle TEXT PRIMARY KEY,
|
|
||||||
display_name TEXT,
|
|
||||||
agent_id TEXT,
|
|
||||||
first_contribution TEXT,
|
|
||||||
last_contribution TEXT,
|
|
||||||
tier TEXT DEFAULT 'new',
|
|
||||||
-- new, contributor, veteran
|
|
||||||
sourcer_count INTEGER DEFAULT 0,
|
|
||||||
extractor_count INTEGER DEFAULT 0,
|
|
||||||
challenger_count INTEGER DEFAULT 0,
|
|
||||||
synthesizer_count INTEGER DEFAULT 0,
|
|
||||||
reviewer_count INTEGER DEFAULT 0,
|
|
||||||
claims_merged INTEGER DEFAULT 0,
|
|
||||||
challenges_survived INTEGER DEFAULT 0,
|
|
||||||
domains TEXT DEFAULT '[]',
|
|
||||||
highlights TEXT DEFAULT '[]',
|
|
||||||
identities TEXT DEFAULT '{}',
|
|
||||||
created_at TEXT DEFAULT (datetime('now')),
|
|
||||||
updated_at TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier);
|
|
||||||
""")
|
|
||||||
logger.info("Migration v5: added contributors table")
|
|
||||||
|
|
||||||
if current < 6:
|
|
||||||
# Phase 6: analytics — time-series metrics snapshots for trending dashboard
|
|
||||||
conn.executescript("""
|
|
||||||
CREATE TABLE IF NOT EXISTS metrics_snapshots (
|
|
||||||
ts TEXT DEFAULT (datetime('now')),
|
|
||||||
throughput_1h INTEGER,
|
|
||||||
approval_rate REAL,
|
|
||||||
open_prs INTEGER,
|
|
||||||
merged_total INTEGER,
|
|
||||||
closed_total INTEGER,
|
|
||||||
conflict_total INTEGER,
|
|
||||||
evaluated_24h INTEGER,
|
|
||||||
fix_success_rate REAL,
|
|
||||||
rejection_broken_wiki_links INTEGER DEFAULT 0,
|
|
||||||
rejection_frontmatter_schema INTEGER DEFAULT 0,
|
|
||||||
rejection_near_duplicate INTEGER DEFAULT 0,
|
|
||||||
rejection_confidence INTEGER DEFAULT 0,
|
|
||||||
rejection_other INTEGER DEFAULT 0,
|
|
||||||
extraction_model TEXT,
|
|
||||||
eval_domain_model TEXT,
|
|
||||||
eval_leo_model TEXT,
|
|
||||||
prompt_version TEXT,
|
|
||||||
pipeline_version TEXT,
|
|
||||||
source_origin_agent INTEGER DEFAULT 0,
|
|
||||||
source_origin_human INTEGER DEFAULT 0,
|
|
||||||
source_origin_scraper INTEGER DEFAULT 0
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts);
|
|
||||||
""")
|
|
||||||
logger.info("Migration v6: added metrics_snapshots table for analytics dashboard")
|
|
||||||
|
|
||||||
if current < 7:
|
|
||||||
# Phase 7: agent attribution + commit_type for dashboard
|
|
||||||
# commit_type column + backfill agent/commit_type from branch prefix
|
|
||||||
try:
|
|
||||||
conn.execute("ALTER TABLE prs ADD COLUMN commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))")
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # column already exists from CREATE TABLE
|
|
||||||
# Backfill agent and commit_type from branch prefix
|
|
||||||
rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall()
|
|
||||||
for row in rows:
|
|
||||||
agent, commit_type = classify_branch(row["branch"])
|
|
||||||
conn.execute(
|
|
||||||
"UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)",
|
|
||||||
(agent, commit_type, row["number"]),
|
|
||||||
)
|
|
||||||
backfilled = len(rows)
|
|
||||||
logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled)
|
|
||||||
|
|
||||||
if current < 8:
|
|
||||||
# Phase 8: response audit — full-chain visibility for agent response quality
|
|
||||||
# Captures: query → tool calls → retrieval → context → response → confidence
|
|
||||||
# Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops)
|
|
||||||
conn.executescript("""
|
|
||||||
CREATE TABLE IF NOT EXISTS response_audit (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
|
|
||||||
chat_id INTEGER,
|
|
||||||
user TEXT,
|
|
||||||
agent TEXT DEFAULT 'rio',
|
|
||||||
model TEXT,
|
|
||||||
query TEXT,
|
|
||||||
conversation_window TEXT, -- intentional transcript duplication for audit self-containment
|
|
||||||
entities_matched TEXT,
|
|
||||||
claims_matched TEXT,
|
|
||||||
retrieval_layers_hit TEXT,
|
|
||||||
retrieval_gap TEXT,
|
|
||||||
market_data TEXT,
|
|
||||||
research_context TEXT,
|
|
||||||
kb_context_text TEXT,
|
|
||||||
tool_calls TEXT,
|
|
||||||
raw_response TEXT,
|
|
||||||
display_response TEXT,
|
|
||||||
confidence_score REAL,
|
|
||||||
response_time_ms INTEGER,
|
|
||||||
created_at TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
|
|
||||||
""")
|
|
||||||
logger.info("Migration v8: added response_audit table for agent response auditing")
|
|
||||||
|
|
||||||
if current < 9:
|
|
||||||
# Phase 9: rebuild prs table to expand CHECK constraint on commit_type.
|
|
||||||
# SQLite cannot ALTER CHECK constraints in-place — must rebuild table.
|
|
||||||
# Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown
|
|
||||||
# New constraint: adds challenge,enrich,synthesize
|
|
||||||
# Also re-derive commit_type from branch prefix for rows with invalid/NULL values.
|
|
||||||
|
|
||||||
# Step 1: Get all column names from existing table
|
|
||||||
cols_info = conn.execute("PRAGMA table_info(prs)").fetchall()
|
|
||||||
col_names = [c["name"] for c in cols_info]
|
|
||||||
col_list = ", ".join(col_names)
|
|
||||||
|
|
||||||
# Step 2: Create new table with expanded CHECK constraint
|
|
||||||
conn.executescript(f"""
|
|
||||||
CREATE TABLE prs_new (
|
|
||||||
number INTEGER PRIMARY KEY,
|
|
||||||
source_path TEXT REFERENCES sources(path),
|
|
||||||
branch TEXT,
|
|
||||||
status TEXT NOT NULL DEFAULT 'open',
|
|
||||||
domain TEXT,
|
|
||||||
agent TEXT,
|
|
||||||
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
|
|
||||||
tier TEXT,
|
|
||||||
tier0_pass INTEGER,
|
|
||||||
leo_verdict TEXT DEFAULT 'pending',
|
|
||||||
domain_verdict TEXT DEFAULT 'pending',
|
|
||||||
domain_agent TEXT,
|
|
||||||
domain_model TEXT,
|
|
||||||
priority TEXT,
|
|
||||||
origin TEXT DEFAULT 'pipeline',
|
|
||||||
transient_retries INTEGER DEFAULT 0,
|
|
||||||
substantive_retries INTEGER DEFAULT 0,
|
|
||||||
last_error TEXT,
|
|
||||||
last_attempt TEXT,
|
|
||||||
cost_usd REAL DEFAULT 0,
|
|
||||||
created_at TEXT DEFAULT (datetime('now')),
|
|
||||||
merged_at TEXT
|
|
||||||
);
|
|
||||||
INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs;
|
|
||||||
DROP TABLE prs;
|
|
||||||
ALTER TABLE prs_new RENAME TO prs;
|
|
||||||
""")
|
|
||||||
logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")
|
|
||||||
|
|
||||||
# Step 3: Re-derive commit_type from branch prefix for invalid/NULL values
|
|
||||||
rows = conn.execute(
|
|
||||||
"""SELECT number, branch FROM prs
|
|
||||||
WHERE branch IS NOT NULL
|
|
||||||
AND (commit_type IS NULL
|
|
||||||
OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))"""
|
|
||||||
).fetchall()
|
|
||||||
fixed = 0
|
|
||||||
for row in rows:
|
|
||||||
agent, commit_type = classify_branch(row["branch"])
|
|
||||||
conn.execute(
|
|
||||||
"UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?",
|
|
||||||
(agent, commit_type, row["number"]),
|
|
||||||
)
|
|
||||||
fixed += 1
|
|
||||||
conn.commit()
|
|
||||||
logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed)
|
|
||||||
|
|
||||||
if current < 10:
|
|
||||||
# Add eval pipeline columns to response_audit
|
|
||||||
# VPS may already be at v10/v11 from prior (incomplete) deploys — use IF NOT EXISTS pattern
|
|
||||||
for col_def in [
|
|
||||||
("prompt_tokens", "INTEGER"),
|
|
||||||
("completion_tokens", "INTEGER"),
|
|
||||||
("generation_cost", "REAL"),
|
|
||||||
("embedding_cost", "REAL"),
|
|
||||||
("total_cost", "REAL"),
|
|
||||||
("blocked", "INTEGER DEFAULT 0"),
|
|
||||||
("block_reason", "TEXT"),
|
|
||||||
("query_type", "TEXT"),
|
|
||||||
]:
|
|
||||||
try:
|
|
||||||
conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_def[0]} {col_def[1]}")
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # Column already exists
|
|
||||||
conn.commit()
|
|
||||||
logger.info("Migration v10: added eval pipeline columns to response_audit")
|
|
||||||
|
|
||||||
|
|
||||||
if current < 11:
|
|
||||||
# Phase 11: compute tracking — extended costs table columns
|
|
||||||
# (May already exist on VPS from manual deploy — idempotent ALTERs)
|
|
||||||
for col_def in [
|
|
||||||
("duration_ms", "INTEGER DEFAULT 0"),
|
|
||||||
("cache_read_tokens", "INTEGER DEFAULT 0"),
|
|
||||||
("cache_write_tokens", "INTEGER DEFAULT 0"),
|
|
||||||
("cost_estimate_usd", "REAL DEFAULT 0"),
|
|
||||||
]:
|
|
||||||
try:
|
|
||||||
conn.execute(f"ALTER TABLE costs ADD COLUMN {col_def[0]} {col_def[1]}")
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # Column already exists
|
|
||||||
conn.commit()
|
|
||||||
logger.info("Migration v11: added compute tracking columns to costs")
|
|
||||||
|
|
||||||
if current < 12:
|
|
||||||
# Phase 12: structured review records — captures all evaluation outcomes
|
|
||||||
# including rejections, disagreements, and approved-with-changes.
|
|
||||||
# Schema locked with Leo (2026-04-01).
|
|
||||||
conn.executescript("""
|
|
||||||
CREATE TABLE IF NOT EXISTS review_records (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
pr_number INTEGER NOT NULL,
|
|
||||||
claim_path TEXT,
|
|
||||||
domain TEXT,
|
|
||||||
agent TEXT,
|
|
||||||
reviewer TEXT NOT NULL,
|
|
||||||
reviewer_model TEXT,
|
|
||||||
outcome TEXT NOT NULL
|
|
||||||
CHECK (outcome IN ('approved', 'approved-with-changes', 'rejected')),
|
|
||||||
rejection_reason TEXT
|
|
||||||
CHECK (rejection_reason IS NULL OR rejection_reason IN (
|
|
||||||
'fails-standalone-test', 'duplicate', 'scope-mismatch',
|
|
||||||
'evidence-insufficient', 'framing-poor', 'other'
|
|
||||||
)),
|
|
||||||
disagreement_type TEXT
|
|
||||||
CHECK (disagreement_type IS NULL OR disagreement_type IN (
|
|
||||||
'factual', 'scope', 'framing', 'evidence'
|
|
||||||
)),
|
|
||||||
notes TEXT,
|
|
||||||
batch_id TEXT,
|
|
||||||
claims_in_batch INTEGER DEFAULT 1,
|
|
||||||
reviewed_at TEXT DEFAULT (datetime('now'))
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_review_records_outcome ON review_records(outcome);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_review_records_domain ON review_records(domain);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_review_records_reviewer ON review_records(reviewer);
|
|
||||||
""")
|
|
||||||
logger.info("Migration v12: created review_records table")
|
|
||||||
|
|
||||||
if current < 16:
|
|
||||||
# Phase 16: trace_id on audit_log — queryable provenance chain.
|
|
||||||
# Auto-extracted from detail JSON's "pr" field by audit().
|
|
||||||
# Backfill existing rows from their detail JSON.
|
|
||||||
try:
|
|
||||||
conn.execute("ALTER TABLE audit_log ADD COLUMN trace_id TEXT")
|
|
||||||
except sqlite3.OperationalError:
|
|
||||||
pass # Column already exists
|
|
||||||
conn.execute("""
|
|
||||||
UPDATE audit_log
|
|
||||||
SET trace_id = json_extract(detail, '$.pr')
|
|
||||||
WHERE trace_id IS NULL
|
|
||||||
AND detail IS NOT NULL
|
|
||||||
AND json_extract(detail, '$.pr') IS NOT NULL
|
|
||||||
""")
|
|
||||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_log_trace ON audit_log(trace_id)")
|
|
||||||
conn.commit()
|
|
||||||
logger.info("Migration v16: added trace_id to audit_log + backfilled from detail JSON")
|
|
||||||
|
|
||||||
if current < SCHEMA_VERSION:
|
|
||||||
conn.execute(
|
|
||||||
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
|
|
||||||
(SCHEMA_VERSION,),
|
|
||||||
)
|
|
||||||
conn.commit() # Explicit commit — executescript auto-commits DDL but not subsequent DML
|
|
||||||
logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
|
|
||||||
else:
|
|
||||||
logger.debug("Database at schema version %d", current)
|
|
||||||
|
|
||||||
|
|
||||||
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None, *, trace_id: str = None):
|
|
||||||
"""Write an audit log entry.
|
|
||||||
|
|
||||||
trace_id is auto-extracted from detail JSON's "pr" field if not provided.
|
|
||||||
This gives every audit row a queryable trace without changing any call site.
|
|
||||||
"""
|
|
||||||
if trace_id is None and detail:
|
|
||||||
try:
|
|
||||||
trace_id = str(json.loads(detail).get("pr", ""))
|
|
||||||
except (json.JSONDecodeError, TypeError, AttributeError):
|
|
||||||
pass
|
|
||||||
if trace_id == "":
|
|
||||||
trace_id = None
|
|
||||||
conn.execute(
|
|
||||||
"INSERT INTO audit_log (stage, event, detail, trace_id) VALUES (?, ?, ?, ?)",
|
|
||||||
(stage, event, detail, trace_id),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def record_review(conn, pr_number: int, reviewer: str, outcome: str, *,
|
|
||||||
claim_path: str = None, domain: str = None, agent: str = None,
|
|
||||||
reviewer_model: str = None, rejection_reason: str = None,
|
|
||||||
disagreement_type: str = None, notes: str = None,
|
|
||||||
claims_in_batch: int = 1):
|
|
||||||
"""Record a structured review outcome.
|
|
||||||
|
|
||||||
Called from evaluate stage after Leo/domain reviewer returns a verdict.
|
|
||||||
outcome must be: approved, approved-with-changes, or rejected.
|
|
||||||
"""
|
|
||||||
batch_id = str(pr_number)
|
|
||||||
conn.execute(
|
|
||||||
"""INSERT INTO review_records
|
|
||||||
(pr_number, claim_path, domain, agent, reviewer, reviewer_model,
|
|
||||||
outcome, rejection_reason, disagreement_type, notes,
|
|
||||||
batch_id, claims_in_batch)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
||||||
(pr_number, claim_path, domain, agent, reviewer, reviewer_model,
|
|
||||||
outcome, rejection_reason, disagreement_type, notes,
|
|
||||||
batch_id, claims_in_batch),
|
|
||||||
)
|
|
||||||
|
|
||||||
def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
|
|
||||||
"""Append a priority assessment to a source's priority_log.
|
|
||||||
|
|
||||||
NOTE: This does NOT update the source's priority column. The priority column
|
|
||||||
is the authoritative priority, set only by initial triage or human override.
|
|
||||||
The priority_log records each stage's opinion for offline calibration analysis.
|
|
||||||
(Bug caught by Theseus — original version overwrote priority with each stage's opinion.)
|
|
||||||
(Race condition fix per Vida — read-then-write wrapped in transaction.)
|
|
||||||
"""
|
|
||||||
conn.execute("BEGIN")
|
|
||||||
try:
|
|
||||||
row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone()
|
|
||||||
if not row:
|
|
||||||
conn.execute("ROLLBACK")
|
|
||||||
return
|
|
||||||
log = json.loads(row["priority_log"] or "[]")
|
|
||||||
log.append({"stage": stage, "priority": priority, "reasoning": reasoning})
|
|
||||||
conn.execute(
|
|
||||||
"UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
|
|
||||||
(json.dumps(log), path),
|
|
||||||
)
|
|
||||||
conn.execute("COMMIT")
|
|
||||||
except Exception:
|
|
||||||
conn.execute("ROLLBACK")
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def insert_response_audit(conn: sqlite3.Connection, **kwargs):
|
|
||||||
"""Insert a response audit record. All fields optional except query."""
|
|
||||||
cols = [
|
|
||||||
"timestamp", "chat_id", "user", "agent", "model", "query",
|
|
||||||
"conversation_window", "entities_matched", "claims_matched",
|
|
||||||
"retrieval_layers_hit", "retrieval_gap", "market_data",
|
|
||||||
"research_context", "kb_context_text", "tool_calls",
|
|
||||||
"raw_response", "display_response", "confidence_score",
|
|
||||||
"response_time_ms",
|
|
||||||
# Eval pipeline columns (v10)
|
|
||||||
"prompt_tokens", "completion_tokens", "generation_cost",
|
|
||||||
"embedding_cost", "total_cost", "blocked", "block_reason",
|
|
||||||
"query_type",
|
|
||||||
]
|
|
||||||
present = {k: v for k, v in kwargs.items() if k in cols and v is not None}
|
|
||||||
if not present:
|
|
||||||
return
|
|
||||||
col_names = ", ".join(present.keys())
|
|
||||||
placeholders = ", ".join("?" for _ in present)
|
|
||||||
conn.execute(
|
|
||||||
f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})",
|
|
||||||
tuple(present.values()),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
|
|
||||||
"""Set a source's authoritative priority. Used for human overrides and initial triage."""
|
|
||||||
conn.execute(
|
|
||||||
"UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
|
|
||||||
(priority, path),
|
|
||||||
)
|
|
||||||
append_priority_log(conn, path, "override", priority, reason)
|
|
||||||
|
|
@ -20,7 +20,7 @@ from .forgejo import api, repo_path
|
||||||
|
|
||||||
logger = logging.getLogger("pipeline.stale_pr")
|
logger = logging.getLogger("pipeline.stale_pr")
|
||||||
|
|
||||||
STALE_THRESHOLD_MINUTES = 30
|
STALE_THRESHOLD_MINUTES = 45
|
||||||
|
|
||||||
|
|
||||||
async def check_stale_prs(conn) -> tuple[int, int]:
|
async def check_stale_prs(conn) -> tuple[int, int]:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue