Fix source_channel migration: add to SCHEMA_SQL, default 'unknown' not 'telegram'

Ganymede review findings:
1. source_channel was missing from CREATE TABLE (fresh installs wouldn't have it)
2. Default fallback changed from 'telegram' to 'unknown' — a branch prefix that
   is not in the map gives no evidence of the channel, so it must not be
   recorded as telegram
3. Cross-reference comments added between BRANCH_PREFIX_MAP and _CHANNEL_MAP

Also wires classify_source_channel into merge.py PR discovery INSERT.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
m3taversal 2026-04-17 13:27:15 +01:00
parent 25a537d2e1
commit ac794f5c68
2 changed files with 61 additions and 5 deletions

View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 21
SCHEMA_VERSION = 22
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -71,6 +71,7 @@ CREATE TABLE IF NOT EXISTS prs (
cost_usd REAL DEFAULT 0,
auto_merge INTEGER DEFAULT 0,
github_pr INTEGER,
source_channel TEXT,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
@ -196,6 +197,7 @@ def transaction(conn: sqlite3.Connection):
# Branch prefix → (agent, commit_type) mapping.
# Single source of truth — used by merge.py at INSERT time and migration v7 backfill.
# Unknown prefixes → ('unknown', 'unknown') + warning log.
# Keep in sync with _CHANNEL_MAP below.
BRANCH_PREFIX_MAP = {
"extract": ("pipeline", "extract"),
"ingestion": ("pipeline", "extract"),
@ -228,6 +230,31 @@ def classify_branch(branch: str) -> tuple[str, str]:
return result
# Branch prefix → source_channel value, looked up by classify_source_channel().
# Prefixes are grouped by the channel they map to.
# Keep in sync with BRANCH_PREFIX_MAP above.
_CHANNEL_MAP = {
    prefix: channel
    for channel, prefixes in (
        ("telegram", ("extract", "ingestion")),
        ("agent", ("rio", "theseus", "astra", "vida", "clay", "leo", "oberon")),
        ("maintenance", ("reweave", "epimetheus", "fix")),
    )
    for prefix in prefixes
}
def classify_source_channel(branch: str, *, github_pr: "int | None" = None) -> str:
    """Derive the source_channel value for a PR from its branch and github_pr.

    Args:
        branch: Branch name of the PR. The text before the first "/" is the
            lookup prefix; a name without "/" is looked up whole.
        github_pr: Value of the PR's github_pr field (presumably the GitHub
            PR number — confirm against callers). Any non-None value
            classifies the PR as "github" regardless of the branch name.

    Returns:
        "github" when github_pr is set or the branch starts with "gh-pr-";
        otherwise the _CHANNEL_MAP entry for the branch prefix; "unknown"
        when the prefix is not mapped or the branch is None/empty.
    """
    if github_pr is not None:
        return "github"
    # A missing branch carries no channel information: classify it as
    # 'unknown' instead of raising AttributeError on None.
    if not branch:
        return "unknown"
    if branch.startswith("gh-pr-"):
        return "github"
    # partition() yields the whole string when there is no "/", so no
    # separate "/" membership check is needed.
    prefix = branch.partition("/")[0]
    return _CHANNEL_MAP.get(prefix, "unknown")
def migrate(conn: sqlite3.Connection):
"""Run schema migrations."""
conn.executescript(SCHEMA_SQL)
@ -562,6 +589,34 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v21: added github_pr column + index to prs")
if current < 22:
try:
conn.execute("ALTER TABLE prs ADD COLUMN source_channel TEXT")
except sqlite3.OperationalError:
pass
conn.execute("""
UPDATE prs SET source_channel = CASE
WHEN github_pr IS NOT NULL THEN 'github'
WHEN branch LIKE 'gh-pr-%%' THEN 'github'
WHEN branch LIKE 'theseus/%%' THEN 'agent'
WHEN branch LIKE 'rio/%%' THEN 'agent'
WHEN branch LIKE 'astra/%%' THEN 'agent'
WHEN branch LIKE 'clay/%%' THEN 'agent'
WHEN branch LIKE 'vida/%%' THEN 'agent'
WHEN branch LIKE 'oberon/%%' THEN 'agent'
WHEN branch LIKE 'leo/%%' THEN 'agent'
WHEN branch LIKE 'reweave/%%' THEN 'maintenance'
WHEN branch LIKE 'epimetheus/%%' THEN 'maintenance'
WHEN branch LIKE 'fix/%%' THEN 'maintenance'
WHEN branch LIKE 'extract/%%' THEN 'telegram'
WHEN branch LIKE 'ingestion/%%' THEN 'telegram'
ELSE 'unknown'
END
WHERE source_channel IS NULL
""")
conn.commit()
logger.info("Migration v22: added source_channel to prs + backfilled from branch prefix")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

View file

@ -20,7 +20,7 @@ import shutil
from collections import defaultdict
from . import config, db
from .db import classify_branch
from .db import classify_branch, classify_source_channel
from .contributor import record_contributor_attribution
from .dedup import dedup_evidence_blocks
from .domains import detect_domain_from_branch
@ -117,6 +117,7 @@ async def discover_external_prs(conn) -> int:
priority = "high" if origin == "human" else None
domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
agent, commit_type = classify_branch(pr["head"]["ref"])
source_channel = classify_source_channel(pr["head"]["ref"])
# For human PRs, submitted_by is the Forgejo author.
# For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by).
@ -125,9 +126,9 @@ async def discover_external_prs(conn) -> int:
conn.execute(
"""INSERT OR IGNORE INTO prs
(number, branch, status, origin, priority, domain, agent, commit_type,
prompt_version, pipeline_version, submitted_by)
VALUES (?, ?, 'open', ?, ?, ?, ?, ?, ?, ?, ?)""",
(pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type, config.PROMPT_VERSION, config.PIPELINE_VERSION, submitted_by),
prompt_version, pipeline_version, submitted_by, source_channel)
VALUES (?, ?, 'open', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(pr["number"], pr["head"]["ref"], origin, priority, domain, agent, commit_type, config.PROMPT_VERSION, config.PIPELINE_VERSION, submitted_by, source_channel),
)
db.audit(
conn,