teleo-infrastructure/lib/db.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

305 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SQLite database — schema, migrations, connection management."""
import json
import logging
import sqlite3
from contextlib import contextmanager
from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 6
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sources (
path TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'unprocessed',
-- unprocessed, triaging, extracting, extracted, null_result,
-- needs_reextraction, error
priority TEXT DEFAULT 'medium',
-- critical, high, medium, low, skip
priority_log TEXT DEFAULT '[]',
-- JSON array: [{stage, priority, reasoning, ts}]
extraction_model TEXT,
claims_count INTEGER DEFAULT 0,
pr_number INTEGER,
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS prs (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
-- conflict: rebase failed or merge timed out — needs human intervention
domain TEXT,
agent TEXT,
tier TEXT,
-- LIGHT, STANDARD, DEEP
tier0_pass INTEGER,
-- 0/1
leo_verdict TEXT DEFAULT 'pending',
-- pending, approve, request_changes, skipped, failed
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
-- Human PRs: 'critical' (detected via missing source_path or non-agent author)
origin TEXT DEFAULT 'pipeline',
-- pipeline | human | external
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
CREATE TABLE IF NOT EXISTS costs (
date TEXT,
model TEXT,
stage TEXT,
calls INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
PRIMARY KEY (date, model, stage)
);
CREATE TABLE IF NOT EXISTS circuit_breakers (
name TEXT PRIMARY KEY,
state TEXT DEFAULT 'closed',
-- closed, open, halfopen
failures INTEGER DEFAULT 0,
successes INTEGER DEFAULT 0,
tripped_at TEXT,
last_success_at TEXT,
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
last_update TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
stage TEXT,
event TEXT,
detail TEXT
);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
"""
def get_connection(readonly: bool = False) -> sqlite3.Connection:
"""Create a SQLite connection with WAL mode and proper settings."""
config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(
str(config.DB_PATH),
timeout=30,
isolation_level=None, # autocommit — we manage transactions explicitly
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
conn.execute("PRAGMA foreign_keys=ON")
if readonly:
conn.execute("PRAGMA query_only=ON")
return conn
@contextmanager
def transaction(conn: sqlite3.Connection):
"""Context manager for explicit transactions."""
conn.execute("BEGIN")
try:
yield conn
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
def migrate(conn: sqlite3.Connection):
"""Run schema migrations."""
conn.executescript(SCHEMA_SQL)
# Check current version
try:
row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
current = row["v"] if row and row["v"] else 0
except sqlite3.OperationalError:
current = 0
# --- Incremental migrations ---
if current < 2:
# Phase 2: add multiplayer columns to prs table
for stmt in [
"ALTER TABLE prs ADD COLUMN priority TEXT",
"ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
"ALTER TABLE prs ADD COLUMN last_error TEXT",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v2: added priority, origin, last_error to prs")
if current < 3:
# Phase 3: retry budget — track eval attempts and issue tags per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0",
"ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v3: added eval_attempts, eval_issues to prs")
if current < 4:
# Phase 4: auto-fixer — track fix attempts per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN fix_attempts INTEGER DEFAULT 0",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v4: added fix_attempts to prs")
if current < 5:
# Phase 5: contributor identity system — tracks who contributed what
# Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
# CI is COMPUTED from raw counts × weights, never stored.
conn.executescript("""
CREATE TABLE IF NOT EXISTS contributors (
handle TEXT PRIMARY KEY,
display_name TEXT,
agent_id TEXT,
first_contribution TEXT,
last_contribution TEXT,
tier TEXT DEFAULT 'new',
-- new, contributor, veteran
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
claims_merged INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
highlights TEXT DEFAULT '[]',
identities TEXT DEFAULT '{}',
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier);
""")
logger.info("Migration v5: added contributors table")
if current < 6:
# Phase 6: analytics — time-series metrics snapshots for trending dashboard
conn.executescript("""
CREATE TABLE IF NOT EXISTS metrics_snapshots (
ts TEXT DEFAULT (datetime('now')),
throughput_1h INTEGER,
approval_rate REAL,
open_prs INTEGER,
merged_total INTEGER,
closed_total INTEGER,
conflict_total INTEGER,
evaluated_24h INTEGER,
fix_success_rate REAL,
rejection_broken_wiki_links INTEGER DEFAULT 0,
rejection_frontmatter_schema INTEGER DEFAULT 0,
rejection_near_duplicate INTEGER DEFAULT 0,
rejection_confidence INTEGER DEFAULT 0,
rejection_other INTEGER DEFAULT 0,
extraction_model TEXT,
eval_domain_model TEXT,
eval_leo_model TEXT,
prompt_version TEXT,
pipeline_version TEXT,
source_origin_agent INTEGER DEFAULT 0,
source_origin_human INTEGER DEFAULT 0,
source_origin_scraper INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts);
""")
logger.info("Migration v6: added metrics_snapshots table for analytics dashboard")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
(SCHEMA_VERSION,),
)
logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
else:
logger.debug("Database at schema version %d", current)
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None):
"""Write an audit log entry."""
conn.execute(
"INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
(stage, event, detail),
)
def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
"""Append a priority assessment to a source's priority_log.
NOTE: This does NOT update the source's priority column. The priority column
is the authoritative priority, set only by initial triage or human override.
The priority_log records each stage's opinion for offline calibration analysis.
(Bug caught by Theseus — original version overwrote priority with each stage's opinion.)
(Race condition fix per Vida — read-then-write wrapped in transaction.)
"""
conn.execute("BEGIN")
try:
row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone()
if not row:
conn.execute("ROLLBACK")
return
log = json.loads(row["priority_log"] or "[]")
log.append({"stage": stage, "priority": priority, "reasoning": reasoning})
conn.execute(
"UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
(json.dumps(log), path),
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
"""Set a source's authoritative priority. Used for human overrides and initial triage."""
conn.execute(
"UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
(priority, path),
)
append_priority_log(conn, path, "override", priority, reason)