diff --git a/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md b/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md index 2457d51a5..9f2593be4 100644 --- a/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md +++ b/ops/diagnostics/CONSOLIDATION-DIFF-LOG.md @@ -38,7 +38,9 @@ - evolution.md — kept (documentation) - weekly/2026-03-25-week3.md — kept (report) - ops/sessions/*.json — kept (session data) -- All .py files REMOVED from root diagnostics/ +- alerting.py, alerting_routes.py REMOVED by this consolidation +- vitality.py, vitality_routes.py were already absent (moved in prior commit) +- No .py files remain in root diagnostics/ ## VPS .bak files inventory (30+ files) All in /opt/teleo-eval/diagnostics/. Git is the backup now. Safe to delete after consolidation verified. diff --git a/ops/diagnostics/ops/db.py b/ops/diagnostics/ops/db.py deleted file mode 100644 index 518bfbdc2..000000000 --- a/ops/diagnostics/ops/db.py +++ /dev/null @@ -1,655 +0,0 @@ -"""SQLite database — schema, migrations, connection management.""" - -import json -import logging -import sqlite3 -from contextlib import contextmanager - -from . import config - -logger = logging.getLogger("pipeline.db") - -SCHEMA_VERSION = 16 - -SCHEMA_SQL = """ -CREATE TABLE IF NOT EXISTS schema_version ( - version INTEGER PRIMARY KEY, - applied_at TEXT DEFAULT (datetime('now')) -); - -CREATE TABLE IF NOT EXISTS sources ( - path TEXT PRIMARY KEY, - status TEXT NOT NULL DEFAULT 'unprocessed', - -- unprocessed, triaging, extracting, extracted, null_result, - -- needs_reextraction, error - priority TEXT DEFAULT 'medium', - -- critical, high, medium, low, skip - priority_log TEXT DEFAULT '[]', - -- JSON array: [{stage, priority, reasoning, ts}] - extraction_model TEXT, - claims_count INTEGER DEFAULT 0, - pr_number INTEGER, - transient_retries INTEGER DEFAULT 0, - substantive_retries INTEGER DEFAULT 0, - last_error TEXT, - feedback TEXT, - -- eval feedback for re-extraction (JSON) - cost_usd REAL DEFAULT 0, - created_at TEXT DEFAULT (datetime('now')), - updated_at TEXT DEFAULT (datetime('now')) -); - -CREATE TABLE IF NOT EXISTS prs ( - number INTEGER PRIMARY KEY, - source_path TEXT REFERENCES sources(path), - branch TEXT, - status TEXT NOT NULL DEFAULT 'open', - -- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict - -- conflict: rebase failed or merge timed out — needs human intervention - domain TEXT, - agent TEXT, - commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')), - tier TEXT, - -- LIGHT, STANDARD, DEEP - tier0_pass INTEGER, - -- 0/1 - leo_verdict TEXT DEFAULT 'pending', - -- pending, approve, request_changes, skipped, failed - domain_verdict TEXT DEFAULT 'pending', - domain_agent TEXT, - domain_model TEXT, - priority TEXT, - -- NULL = inherit from source. Set explicitly for human-submitted PRs. - -- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium') - -- Human PRs: 'critical' (detected via missing source_path or non-agent author) - origin TEXT DEFAULT 'pipeline', - -- pipeline | human | external - transient_retries INTEGER DEFAULT 0, - substantive_retries INTEGER DEFAULT 0, - last_error TEXT, - last_attempt TEXT, - cost_usd REAL DEFAULT 0, - created_at TEXT DEFAULT (datetime('now')), - merged_at TEXT -); - -CREATE TABLE IF NOT EXISTS costs ( - date TEXT, - model TEXT, - stage TEXT, - calls INTEGER DEFAULT 0, - input_tokens INTEGER DEFAULT 0, - output_tokens INTEGER DEFAULT 0, - cost_usd REAL DEFAULT 0, - PRIMARY KEY (date, model, stage) -); - -CREATE TABLE IF NOT EXISTS circuit_breakers ( - name TEXT PRIMARY KEY, - state TEXT DEFAULT 'closed', - -- closed, open, halfopen - failures INTEGER DEFAULT 0, - successes INTEGER DEFAULT 0, - tripped_at TEXT, - last_success_at TEXT, - -- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida) - last_update TEXT DEFAULT (datetime('now')) -); - -CREATE TABLE IF NOT EXISTS audit_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT DEFAULT (datetime('now')), - stage TEXT, - event TEXT, - detail TEXT -); - -CREATE TABLE IF NOT EXISTS response_audit ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (datetime('now')), - chat_id INTEGER, - user TEXT, - agent TEXT DEFAULT 'rio', - model TEXT, - query TEXT, - conversation_window TEXT, - -- JSON: prior N messages for context - -- NOTE: intentional duplication of transcript data for audit self-containment. - -- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt - -- context inline for retrieval-quality diagnosis. Primary driver of row size — - -- target for cleanup when 90-day retention policy lands. - entities_matched TEXT, - -- JSON: [{name, path, score, used_in_response}] - claims_matched TEXT, - -- JSON: [{path, title, score, source, used_in_response}] - retrieval_layers_hit TEXT, - -- JSON: ["keyword","qdrant","graph"] - retrieval_gap TEXT, - -- What the KB was missing (if anything) - market_data TEXT, - -- JSON: injected token prices - research_context TEXT, - -- Haiku pre-pass results if any - kb_context_text TEXT, - -- Full context string sent to model - tool_calls TEXT, - -- JSON: ordered array [{tool, input, output, duration_ms, ts}] - raw_response TEXT, - display_response TEXT, - confidence_score REAL, - -- Model self-rated retrieval quality 0.0-1.0 - response_time_ms INTEGER, - -- Eval pipeline columns (v10) - prompt_tokens INTEGER, - completion_tokens INTEGER, - generation_cost REAL, - embedding_cost REAL, - total_cost REAL, - blocked INTEGER DEFAULT 0, - block_reason TEXT, - query_type TEXT, - created_at TEXT DEFAULT (datetime('now')) -); - -CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status); -CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status); -CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain); -CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date); -CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage); -CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp); -CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent); -CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp); -""" - - -def get_connection(readonly: bool = False) -> sqlite3.Connection: - """Create a SQLite connection with WAL mode and proper settings.""" - config.DB_PATH.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect( - str(config.DB_PATH), - timeout=30, - isolation_level=None, # autocommit — we manage transactions explicitly - ) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA busy_timeout=10000") - conn.execute("PRAGMA foreign_keys=ON") - if readonly: - conn.execute("PRAGMA query_only=ON") - return conn - - -@contextmanager -def transaction(conn: sqlite3.Connection): - """Context manager for explicit transactions.""" - conn.execute("BEGIN") - try: - yield conn - conn.execute("COMMIT") - except Exception: - conn.execute("ROLLBACK") - raise - - -# Branch prefix → (agent, commit_type) mapping. -# Single source of truth — used by merge.py at INSERT time and migration v7 backfill. -# Unknown prefixes → ('unknown', 'unknown') + warning log. -BRANCH_PREFIX_MAP = { - "extract": ("pipeline", "extract"), - "ingestion": ("pipeline", "extract"), - "epimetheus": ("epimetheus", "extract"), - "rio": ("rio", "research"), - "theseus": ("theseus", "research"), - "astra": ("astra", "research"), - "vida": ("vida", "research"), - "clay": ("clay", "research"), - "leo": ("leo", "entity"), - "reweave": ("pipeline", "reweave"), - "fix": ("pipeline", "fix"), -} - - -def classify_branch(branch: str) -> tuple[str, str]: - """Derive (agent, commit_type) from branch prefix. - - Returns ('unknown', 'unknown') and logs a warning for unrecognized prefixes. - """ - prefix = branch.split("/", 1)[0] if "/" in branch else branch - result = BRANCH_PREFIX_MAP.get(prefix) - if result is None: - logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", prefix, branch) - return ("unknown", "unknown") - return result - - -def migrate(conn: sqlite3.Connection): - """Run schema migrations.""" - conn.executescript(SCHEMA_SQL) - - # Check current version - try: - row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone() - current = row["v"] if row and row["v"] else 0 - except sqlite3.OperationalError: - current = 0 - - # --- Incremental migrations --- - if current < 2: - # Phase 2: add multiplayer columns to prs table - for stmt in [ - "ALTER TABLE prs ADD COLUMN priority TEXT", - "ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'", - "ALTER TABLE prs ADD COLUMN last_error TEXT", - ]: - try: - conn.execute(stmt) - except sqlite3.OperationalError: - pass # Column already exists (idempotent) - logger.info("Migration v2: added priority, origin, last_error to prs") - - if current < 3: - # Phase 3: retry budget — track eval attempts and issue tags per PR - for stmt in [ - "ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0", - "ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'", - ]: - try: - conn.execute(stmt) - except sqlite3.OperationalError: - pass # Column already exists (idempotent) - logger.info("Migration v3: added eval_attempts, eval_issues to prs") - - if current < 4: - # Phase 4: auto-fixer — track fix attempts per PR - for stmt in [ - "ALTER TABLE prs ADD COLUMN fix_attempts INTEGER DEFAULT 0", - ]: - try: - conn.execute(stmt) - except sqlite3.OperationalError: - pass # Column already exists (idempotent) - logger.info("Migration v4: added fix_attempts to prs") - - if current < 5: - # Phase 5: contributor identity system — tracks who contributed what - # Aligned with schemas/attribution.md (5 roles) + Leo's tier system. - # CI is COMPUTED from raw counts × weights, never stored. - conn.executescript(""" - CREATE TABLE IF NOT EXISTS contributors ( - handle TEXT PRIMARY KEY, - display_name TEXT, - agent_id TEXT, - first_contribution TEXT, - last_contribution TEXT, - tier TEXT DEFAULT 'new', - -- new, contributor, veteran - sourcer_count INTEGER DEFAULT 0, - extractor_count INTEGER DEFAULT 0, - challenger_count INTEGER DEFAULT 0, - synthesizer_count INTEGER DEFAULT 0, - reviewer_count INTEGER DEFAULT 0, - claims_merged INTEGER DEFAULT 0, - challenges_survived INTEGER DEFAULT 0, - domains TEXT DEFAULT '[]', - highlights TEXT DEFAULT '[]', - identities TEXT DEFAULT '{}', - created_at TEXT DEFAULT (datetime('now')), - updated_at TEXT DEFAULT (datetime('now')) - ); - - CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier); - """) - logger.info("Migration v5: added contributors table") - - if current < 6: - # Phase 6: analytics — time-series metrics snapshots for trending dashboard - conn.executescript(""" - CREATE TABLE IF NOT EXISTS metrics_snapshots ( - ts TEXT DEFAULT (datetime('now')), - throughput_1h INTEGER, - approval_rate REAL, - open_prs INTEGER, - merged_total INTEGER, - closed_total INTEGER, - conflict_total INTEGER, - evaluated_24h INTEGER, - fix_success_rate REAL, - rejection_broken_wiki_links INTEGER DEFAULT 0, - rejection_frontmatter_schema INTEGER DEFAULT 0, - rejection_near_duplicate INTEGER DEFAULT 0, - rejection_confidence INTEGER DEFAULT 0, - rejection_other INTEGER DEFAULT 0, - extraction_model TEXT, - eval_domain_model TEXT, - eval_leo_model TEXT, - prompt_version TEXT, - pipeline_version TEXT, - source_origin_agent INTEGER DEFAULT 0, - source_origin_human INTEGER DEFAULT 0, - source_origin_scraper INTEGER DEFAULT 0 - ); - - CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts); - """) - logger.info("Migration v6: added metrics_snapshots table for analytics dashboard") - - if current < 7: - # Phase 7: agent attribution + commit_type for dashboard - # commit_type column + backfill agent/commit_type from branch prefix - try: - conn.execute("ALTER TABLE prs ADD COLUMN commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))") - except sqlite3.OperationalError: - pass # column already exists from CREATE TABLE - # Backfill agent and commit_type from branch prefix - rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall() - for row in rows: - agent, commit_type = classify_branch(row["branch"]) - conn.execute( - "UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)", - (agent, commit_type, row["number"]), - ) - backfilled = len(rows) - logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled) - - if current < 8: - # Phase 8: response audit — full-chain visibility for agent response quality - # Captures: query → tool calls → retrieval → context → response → confidence - # Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops) - conn.executescript(""" - CREATE TABLE IF NOT EXISTS response_audit ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL DEFAULT (datetime('now')), - chat_id INTEGER, - user TEXT, - agent TEXT DEFAULT 'rio', - model TEXT, - query TEXT, - conversation_window TEXT, -- intentional transcript duplication for audit self-containment - entities_matched TEXT, - claims_matched TEXT, - retrieval_layers_hit TEXT, - retrieval_gap TEXT, - market_data TEXT, - research_context TEXT, - kb_context_text TEXT, - tool_calls TEXT, - raw_response TEXT, - display_response TEXT, - confidence_score REAL, - response_time_ms INTEGER, - created_at TEXT DEFAULT (datetime('now')) - ); - - CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp); - CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent); - CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp); - """) - logger.info("Migration v8: added response_audit table for agent response auditing") - - if current < 9: - # Phase 9: rebuild prs table to expand CHECK constraint on commit_type. - # SQLite cannot ALTER CHECK constraints in-place — must rebuild table. - # Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown - # New constraint: adds challenge,enrich,synthesize - # Also re-derive commit_type from branch prefix for rows with invalid/NULL values. - - # Step 1: Get all column names from existing table - cols_info = conn.execute("PRAGMA table_info(prs)").fetchall() - col_names = [c["name"] for c in cols_info] - col_list = ", ".join(col_names) - - # Step 2: Create new table with expanded CHECK constraint - conn.executescript(f""" - CREATE TABLE prs_new ( - number INTEGER PRIMARY KEY, - source_path TEXT REFERENCES sources(path), - branch TEXT, - status TEXT NOT NULL DEFAULT 'open', - domain TEXT, - agent TEXT, - commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')), - tier TEXT, - tier0_pass INTEGER, - leo_verdict TEXT DEFAULT 'pending', - domain_verdict TEXT DEFAULT 'pending', - domain_agent TEXT, - domain_model TEXT, - priority TEXT, - origin TEXT DEFAULT 'pipeline', - transient_retries INTEGER DEFAULT 0, - substantive_retries INTEGER DEFAULT 0, - last_error TEXT, - last_attempt TEXT, - cost_usd REAL DEFAULT 0, - created_at TEXT DEFAULT (datetime('now')), - merged_at TEXT - ); - INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs; - DROP TABLE prs; - ALTER TABLE prs_new RENAME TO prs; - """) - logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint") - - # Step 3: Re-derive commit_type from branch prefix for invalid/NULL values - rows = conn.execute( - """SELECT number, branch FROM prs - WHERE branch IS NOT NULL - AND (commit_type IS NULL - OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))""" - ).fetchall() - fixed = 0 - for row in rows: - agent, commit_type = classify_branch(row["branch"]) - conn.execute( - "UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?", - (agent, commit_type, row["number"]), - ) - fixed += 1 - conn.commit() - logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed) - - if current < 10: - # Add eval pipeline columns to response_audit - # VPS may already be at v10/v11 from prior (incomplete) deploys — use IF NOT EXISTS pattern - for col_def in [ - ("prompt_tokens", "INTEGER"), - ("completion_tokens", "INTEGER"), - ("generation_cost", "REAL"), - ("embedding_cost", "REAL"), - ("total_cost", "REAL"), - ("blocked", "INTEGER DEFAULT 0"), - ("block_reason", "TEXT"), - ("query_type", "TEXT"), - ]: - try: - conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_def[0]} {col_def[1]}") - except sqlite3.OperationalError: - pass # Column already exists - conn.commit() - logger.info("Migration v10: added eval pipeline columns to response_audit") - - - if current < 11: - # Phase 11: compute tracking — extended costs table columns - # (May already exist on VPS from manual deploy — idempotent ALTERs) - for col_def in [ - ("duration_ms", "INTEGER DEFAULT 0"), - ("cache_read_tokens", "INTEGER DEFAULT 0"), - ("cache_write_tokens", "INTEGER DEFAULT 0"), - ("cost_estimate_usd", "REAL DEFAULT 0"), - ]: - try: - conn.execute(f"ALTER TABLE costs ADD COLUMN {col_def[0]} {col_def[1]}") - except sqlite3.OperationalError: - pass # Column already exists - conn.commit() - logger.info("Migration v11: added compute tracking columns to costs") - - if current < 12: - # Phase 12: structured review records — captures all evaluation outcomes - # including rejections, disagreements, and approved-with-changes. - # Schema locked with Leo (2026-04-01). - conn.executescript(""" - CREATE TABLE IF NOT EXISTS review_records ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - pr_number INTEGER NOT NULL, - claim_path TEXT, - domain TEXT, - agent TEXT, - reviewer TEXT NOT NULL, - reviewer_model TEXT, - outcome TEXT NOT NULL - CHECK (outcome IN ('approved', 'approved-with-changes', 'rejected')), - rejection_reason TEXT - CHECK (rejection_reason IS NULL OR rejection_reason IN ( - 'fails-standalone-test', 'duplicate', 'scope-mismatch', - 'evidence-insufficient', 'framing-poor', 'other' - )), - disagreement_type TEXT - CHECK (disagreement_type IS NULL OR disagreement_type IN ( - 'factual', 'scope', 'framing', 'evidence' - )), - notes TEXT, - batch_id TEXT, - claims_in_batch INTEGER DEFAULT 1, - reviewed_at TEXT DEFAULT (datetime('now')) - ); - CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number); - CREATE INDEX IF NOT EXISTS idx_review_records_outcome ON review_records(outcome); - CREATE INDEX IF NOT EXISTS idx_review_records_domain ON review_records(domain); - CREATE INDEX IF NOT EXISTS idx_review_records_reviewer ON review_records(reviewer); - """) - logger.info("Migration v12: created review_records table") - - if current < 16: - # Phase 16: trace_id on audit_log — queryable provenance chain. - # Auto-extracted from detail JSON's "pr" field by audit(). - # Backfill existing rows from their detail JSON. - try: - conn.execute("ALTER TABLE audit_log ADD COLUMN trace_id TEXT") - except sqlite3.OperationalError: - pass # Column already exists - conn.execute(""" - UPDATE audit_log - SET trace_id = json_extract(detail, '$.pr') - WHERE trace_id IS NULL - AND detail IS NOT NULL - AND json_extract(detail, '$.pr') IS NOT NULL - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_log_trace ON audit_log(trace_id)") - conn.commit() - logger.info("Migration v16: added trace_id to audit_log + backfilled from detail JSON") - - if current < SCHEMA_VERSION: - conn.execute( - "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", - (SCHEMA_VERSION,), - ) - conn.commit() # Explicit commit — executescript auto-commits DDL but not subsequent DML - logger.info("Database migrated to schema version %d", SCHEMA_VERSION) - else: - logger.debug("Database at schema version %d", current) - - -def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None, *, trace_id: str = None): - """Write an audit log entry. - - trace_id is auto-extracted from detail JSON's "pr" field if not provided. - This gives every audit row a queryable trace without changing any call site. - """ - if trace_id is None and detail: - try: - trace_id = str(json.loads(detail).get("pr", "")) - except (json.JSONDecodeError, TypeError, AttributeError): - pass - if trace_id == "": - trace_id = None - conn.execute( - "INSERT INTO audit_log (stage, event, detail, trace_id) VALUES (?, ?, ?, ?)", - (stage, event, detail, trace_id), - ) - - - - -def record_review(conn, pr_number: int, reviewer: str, outcome: str, *, - claim_path: str = None, domain: str = None, agent: str = None, - reviewer_model: str = None, rejection_reason: str = None, - disagreement_type: str = None, notes: str = None, - claims_in_batch: int = 1): - """Record a structured review outcome. - - Called from evaluate stage after Leo/domain reviewer returns a verdict. - outcome must be: approved, approved-with-changes, or rejected. - """ - batch_id = str(pr_number) - conn.execute( - """INSERT INTO review_records - (pr_number, claim_path, domain, agent, reviewer, reviewer_model, - outcome, rejection_reason, disagreement_type, notes, - batch_id, claims_in_batch) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (pr_number, claim_path, domain, agent, reviewer, reviewer_model, - outcome, rejection_reason, disagreement_type, notes, - batch_id, claims_in_batch), - ) - -def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str): - """Append a priority assessment to a source's priority_log. - - NOTE: This does NOT update the source's priority column. The priority column - is the authoritative priority, set only by initial triage or human override. - The priority_log records each stage's opinion for offline calibration analysis. - (Bug caught by Theseus — original version overwrote priority with each stage's opinion.) - (Race condition fix per Vida — read-then-write wrapped in transaction.) - """ - conn.execute("BEGIN") - try: - row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone() - if not row: - conn.execute("ROLLBACK") - return - log = json.loads(row["priority_log"] or "[]") - log.append({"stage": stage, "priority": priority, "reasoning": reasoning}) - conn.execute( - "UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?", - (json.dumps(log), path), - ) - conn.execute("COMMIT") - except Exception: - conn.execute("ROLLBACK") - raise - - -def insert_response_audit(conn: sqlite3.Connection, **kwargs): - """Insert a response audit record. All fields optional except query.""" - cols = [ - "timestamp", "chat_id", "user", "agent", "model", "query", - "conversation_window", "entities_matched", "claims_matched", - "retrieval_layers_hit", "retrieval_gap", "market_data", - "research_context", "kb_context_text", "tool_calls", - "raw_response", "display_response", "confidence_score", - "response_time_ms", - # Eval pipeline columns (v10) - "prompt_tokens", "completion_tokens", "generation_cost", - "embedding_cost", "total_cost", "blocked", "block_reason", - "query_type", - ] - present = {k: v for k, v in kwargs.items() if k in cols and v is not None} - if not present: - return - col_names = ", ".join(present.keys()) - placeholders = ", ".join("?" for _ in present) - conn.execute( - f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})", - tuple(present.values()), - ) - - -def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"): - """Set a source's authoritative priority. Used for human overrides and initial triage.""" - conn.execute( - "UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?", - (priority, path), - ) - append_priority_log(conn, path, "override", priority, reason) diff --git a/ops/pipeline-v2/lib/stale_pr.py b/ops/pipeline-v2/lib/stale_pr.py index c2873d070..abd264369 100644 --- a/ops/pipeline-v2/lib/stale_pr.py +++ b/ops/pipeline-v2/lib/stale_pr.py @@ -20,7 +20,7 @@ from .forgejo import api, repo_path logger = logging.getLogger("pipeline.stale_pr") -STALE_THRESHOLD_MINUTES = 30 +STALE_THRESHOLD_MINUTES = 45 async def check_stale_prs(conn) -> tuple[int, int]: