diff --git a/lib/db.py b/lib/db.py index 1e60075..3062ac6 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 21 +SCHEMA_VERSION = 22 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -71,6 +71,7 @@ CREATE TABLE IF NOT EXISTS prs ( cost_usd REAL DEFAULT 0, auto_merge INTEGER DEFAULT 0, github_pr INTEGER, + source_channel TEXT, created_at TEXT DEFAULT (datetime('now')), merged_at TEXT ); @@ -196,6 +197,7 @@ def transaction(conn: sqlite3.Connection): # Branch prefix → (agent, commit_type) mapping. # Single source of truth — used by merge.py at INSERT time and migration v7 backfill. # Unknown prefixes → ('unknown', 'unknown') + warning log. +# Keep in sync with _CHANNEL_MAP below. BRANCH_PREFIX_MAP = { "extract": ("pipeline", "extract"), "ingestion": ("pipeline", "extract"), @@ -228,6 +230,31 @@ def classify_branch(branch: str) -> tuple[str, str]: return result +# Keep in sync with BRANCH_PREFIX_MAP above. +_CHANNEL_MAP = { + "extract": "telegram", + "ingestion": "telegram", + "rio": "agent", + "theseus": "agent", + "astra": "agent", + "vida": "agent", + "clay": "agent", + "leo": "agent", + "oberon": "agent", + "reweave": "maintenance", + "epimetheus": "maintenance", + "fix": "maintenance", +} + + +def classify_source_channel(branch: str, *, github_pr: int = None) -> str: + """Derive source_channel from branch prefix and github_pr flag.""" + if github_pr is not None or branch.startswith("gh-pr-"): + return "github" + prefix = branch.split("/", 1)[0] if "/" in branch else branch + return _CHANNEL_MAP.get(prefix, "unknown") + + def migrate(conn: sqlite3.Connection): """Run schema migrations.""" conn.executescript(SCHEMA_SQL) @@ -562,6 +589,34 @@ def migrate(conn: sqlite3.Connection): conn.commit() logger.info("Migration v21: added github_pr column + index to prs") + if current < 22: + try: + conn.execute("ALTER TABLE prs ADD COLUMN source_channel TEXT") + except sqlite3.OperationalError: + pass + conn.execute(""" + UPDATE prs SET source_channel = CASE + WHEN github_pr IS NOT NULL THEN 'github' + WHEN branch LIKE 'gh-pr-%%' THEN 'github' + WHEN branch LIKE 'theseus/%%' THEN 'agent' + WHEN branch LIKE 'rio/%%' THEN 'agent' + WHEN branch LIKE 'astra/%%' THEN 'agent' + WHEN branch LIKE 'clay/%%' THEN 'agent' + WHEN branch LIKE 'vida/%%' THEN 'agent' + WHEN branch LIKE 'oberon/%%' THEN 'agent' + WHEN branch LIKE 'leo/%%' THEN 'agent' + WHEN branch LIKE 'reweave/%%' THEN 'maintenance' + WHEN branch LIKE 'epimetheus/%%' THEN 'maintenance' + WHEN branch LIKE 'fix/%%' THEN 'maintenance' + WHEN branch LIKE 'extract/%%' THEN 'telegram' + WHEN branch LIKE 'ingestion/%%' THEN 'telegram' + ELSE 'unknown' + END + WHERE source_channel IS NULL + """) + conn.commit() + logger.info("Migration v22: added source_channel to prs + backfilled from branch prefix") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", diff --git a/lib/merge.py b/lib/merge.py index ed77924..1331ca1 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -20,7 +20,7 @@ import shutil from collections import defaultdict from . import config, db -from .db import classify_branch +from .db import classify_branch, classify_source_channel from .contributor import record_contributor_attribution from .dedup import dedup_evidence_blocks from .domains import detect_domain_from_branch @@ -117,6 +117,7 @@ async def discover_external_prs(conn) -> int: priority = "high" if origin == "human" else None domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"]) agent, commit_type = classify_branch(pr["head"]["ref"]) + source_channel = classify_source_channel(pr["head"]["ref"]) # For human PRs, submitted_by is the Forgejo author. # For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by). @@ -125,9 +126,9 @@ async def discover_external_prs(conn) -> int: conn.execute( """INSERT OR IGNORE INTO prs (number, branch, status, origin, priority, domain, agent, commit_type, - prompt_version, pipeline_version, submitted_by) - VALUES (?, ?, 'open', ?, ?, ?, ?, ?, ?, ?, ?)""", - (pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type, config.PROMPT_VERSION, config.PIPELINE_VERSION, submitted_by), + prompt_version, pipeline_version, submitted_by, source_channel) + VALUES (?, ?, 'open', ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type, config.PROMPT_VERSION, config.PIPELINE_VERSION, submitted_by, source_channel), ) db.audit( conn,