#!/usr/bin/env python3 """Backfill contribution_events by replaying merged PRs from pipeline.db + worktree. For each merged PR: - Derive author from prs.submitted_by → git author → branch prefix - Emit author event (role=author, weight=0.30, claim_path=NULL) - For each claim file under a knowledge prefix, parse frontmatter and emit originator events for sourcer entries that differ from the author - Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent (when domain_verdict='approve' and not Leo) - Emit challenger/synthesizer events for Pentagon-Agent trailers on agent-owned branches (theseus/*, rio/*, etc.) based on commit_type Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run. Usage: python3 scripts/backfill-events.py --dry-run # Count events without writing python3 scripts/backfill-events.py # Apply Runs read-only against the git worktree; only writes to pipeline.db. """ import argparse import os import re import sqlite3 import subprocess import sys from collections import Counter from pathlib import Path DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db") REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main") # Role weights — must match lib/contributor.py ROLE_WEIGHTS. ROLE_WEIGHTS = { "author": 0.30, "challenger": 0.25, "synthesizer": 0.20, "originator": 0.15, "evaluator": 0.05, } PENTAGON_AGENTS = frozenset({ "rio", "leo", "theseus", "vida", "clay", "astra", "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", "pipeline", }) # Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES. # Duplicated here because this script runs standalone (no pipeline package import). AGENT_BRANCH_PREFIXES = ( "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/", ) TRAILER_EVENT_ROLE = { "challenge": "challenger", "enrich": "synthesizer", "research": "synthesizer", "reweave": "synthesizer", } KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/") BOT_AUTHORS = frozenset({ "teleo", "teleo-bot", "pipeline", "github-actions[bot]", "forgejo-actions", }) def normalize_handle(conn: sqlite3.Connection, handle: str) -> str: if not handle: return "" h = handle.strip().lower().lstrip("@") row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone() if row: return row[0] return h def classify_kind(handle: str) -> str: h = handle.strip().lower().lstrip("@") return "agent" if h in PENTAGON_AGENTS else "person" def parse_frontmatter(text: str): """Minimal YAML frontmatter parser using PyYAML when available.""" if not text.startswith("---"): return None end = text.find("---", 3) if end == -1: return None raw = text[3:end] try: import yaml fm = yaml.safe_load(raw) return fm if isinstance(fm, dict) else None except ImportError: return None except Exception: return None def extract_sourcers_from_file(path: Path) -> list[str]: """Return the sourcer handles from a claim file's frontmatter. Matches three formats: 1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }` 2. Bare-key flat: `sourcer: alexastrum` 3. Prefix-keyed: `attribution_sourcer: alexastrum` """ try: content = path.read_text(encoding="utf-8") except (FileNotFoundError, PermissionError, UnicodeDecodeError): return [] fm = parse_frontmatter(content) if not fm: return [] handles: list[str] = [] attr = fm.get("attribution") if isinstance(attr, dict): entries = attr.get("sourcer", []) if isinstance(entries, list): for e in entries: if isinstance(e, dict) and "handle" in e: handles.append(e["handle"]) elif isinstance(e, str): handles.append(e) elif isinstance(entries, str): handles.append(entries) return handles flat = fm.get("attribution_sourcer") if flat: if isinstance(flat, str): handles.append(flat) elif isinstance(flat, list): handles.extend(v for v in flat if isinstance(v, str)) if handles: return handles bare = fm.get("sourcer") if bare: if isinstance(bare, str): handles.append(bare) elif isinstance(bare, list): handles.extend(v for v in bare if isinstance(v, str)) return handles _HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$") def valid_handle(h: str) -> bool: if not h: return False lower = h.strip().lower().lstrip("@") if lower.endswith("-") or lower.endswith("_"): return False return bool(_HANDLE_RE.match(lower)) def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str: """Run a git command, return stdout. Returns empty string on failure.""" try: result = subprocess.run( ["git", *args], cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False, ) return result.stdout except (subprocess.TimeoutExpired, OSError): return "" def git_first_commit_author(pr_branch: str, merged_at: str) -> str: """Best-effort: find git author of first non-merge commit on the branch. PR branches are usually deleted after merge. We fall back to scanning main commits around merged_at for commits matching the branch slug. """ # Post-merge branches are cleaned up. For the backfill, we accept that this # path rarely yields results and rely on submitted_by + branch prefix. return "" def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None: """Author precedence: submitted_by → branch-prefix agent for agent-owned branches.""" if pr.get("submitted_by"): cand = pr["submitted_by"].strip().lower().lstrip("@") if cand and cand not in BOT_AUTHORS: return cand branch = pr.get("branch") or "" if "/" in branch: prefix = branch.split("/", 1)[0].lower() if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"): return prefix return None def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp): canonical = normalize_handle(conn, handle) if not valid_handle(canonical): return kind = classify_kind(canonical) weight = ROLE_WEIGHTS[role] counts[(role, "attempt")] += 1 if dry_run: counts[(role, "would_insert")] += 1 return cur = conn.execute( """INSERT OR IGNORE INTO contribution_events (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""", (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp), ) if cur.rowcount > 0: counts[(role, "inserted")] += 1 else: counts[(role, "skipped_dup")] += 1 def files_added_in_pr(pr_number: int, branch: str) -> list[str]: """Best-effort: list added .md files in the PR. Uses prs.source_path as a fallback signal (the claim being added). If the branch no longer exists post-merge, this will return []; we accept the loss for historical PRs where the granular per-claim events can't be recovered — PR-level author/evaluator events still land correctly. """ # Post-merge PR branches are deleted from Forgejo so we can't diff them. # For the backfill we use prs.source_path — for extract/* PRs this points to # the source inbox file; we can glob the claim files from the extract branch # commit on main. But main's commits don't track which files a given PR touched. # Accept the loss: backfill emits only PR-level events (author, evaluator, # challenger/synthesizer). Originator events come from parsing claim files # attributed to the branch via description field which lists claim titles. return [] def main(): parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)") args = parser.parse_args() if not Path(DB_PATH).exists(): print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr) sys.exit(1) conn = sqlite3.connect(DB_PATH, timeout=30) conn.row_factory = sqlite3.Row # Sanity: contribution_events exists (v24 migration applied) try: conn.execute("SELECT 1 FROM contribution_events LIMIT 1") except sqlite3.OperationalError: print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr) sys.exit(2) # Walk all merged knowledge PRs query = """ SELECT number, branch, domain, source_channel, submitted_by, leo_verdict, domain_verdict, domain_agent, commit_type, merged_at FROM prs WHERE status = 'merged' ORDER BY merged_at ASC """ if args.limit: query += f" LIMIT {args.limit}" prs = conn.execute(query).fetchall() print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...") counts: Counter = Counter() repo = Path(REPO_DIR) for pr in prs: pr_number = pr["number"] branch = pr["branch"] or "" domain = pr["domain"] channel = pr["source_channel"] merged_at = pr["merged_at"] # Skip pipeline-only branches for author credit (extract/*, reweave/*, # fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But # evaluator credit for Leo/domain_agent still applies. is_pipeline_branch = branch.startswith(( "extract/", "reweave/", "fix/", "ingestion/", "epimetheus/", )) # ── AUTHOR ── # For pipeline branches, submitted_by carries the real author (the # human who submitted the source via Telegram/etc). For agent branches, # the agent is author. For external branches (gh-pr-*), git author is # in submitted_by from the sync-mirror pipeline. author = derive_author(conn, dict(pr)) if author: emit(conn, counts, args.dry_run, author, "author", pr_number, None, domain, channel, merged_at) # ── EVALUATOR ── if pr["leo_verdict"] == "approve": emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number, None, domain, channel, merged_at) if pr["domain_verdict"] == "approve" and pr["domain_agent"]: dagent = pr["domain_agent"].strip().lower() if dagent and dagent != "leo": emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number, None, domain, channel, merged_at) # ── CHALLENGER / SYNTHESIZER from branch+commit_type ── # Only fires on agent-owned branches. Pipeline branches aren't creditable # work (they're machine extraction, evaluator already captures the review). if branch.startswith(AGENT_BRANCH_PREFIXES): prefix = branch.split("/", 1)[0].lower() event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "") if event_role: emit(conn, counts, args.dry_run, prefix, event_role, pr_number, None, domain, channel, merged_at) # ── ORIGINATOR per claim ── # Walk claim files currently on main whose content was added in this PR. # We can't diff old branches (deleted post-merge), but for extract PRs # the source_path + description carry claim titles — too lossy to build # per-claim events reliably. Strategy: walk ALL claim files that have a # sourcer in their frontmatter and assign them to the PR whose # source_path matches (via description or filename heuristic). # DEFERRED: per-claim originator events require branch introspection # that fails on deleted branches. Backfill emits PR-level events only. # Forward traffic (post-deploy) gets per-claim originator events via # record_contributor_attribution's added-files walk. if not args.dry_run: conn.commit() # Originator is emitted in the claim-level pass below, not the PR-level pass. # Previous summary listed it here with attempted=0 which confused operators. print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===") for role in ("author", "challenger", "synthesizer", "evaluator"): att = counts[(role, "attempt")] if args.dry_run: wi = counts[(role, "would_insert")] print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}") else: ins = counts[(role, "inserted")] skip = counts[(role, "skipped_dup")] print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}") # ── Per-claim originator pass ── # Separate pass: walk the current knowledge tree, parse sourcer frontmatter, # and attach each claim to the merging PR via a claim_path → pr_number map # built from prs.description (pipe-separated claim titles). Imperfect — some # PRs have NULL description or mismatched titles — but recovers the bulk of # historical originator credit. print("\n=== Claim-level originator pass ===") # Build title → pr_number map from prs.description title_to_pr: dict[str, int] = {} for r in conn.execute( "SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''" ).fetchall(): desc = r["description"] or "" for title in desc.split(" | "): title = title.strip() if title: # Last-writer wins. Conflicts are rare (titles unique in practice). title_to_pr[title.lower()] = r["number"] claim_counts = Counter() claim_count = 0 originator_count = 0 for md in sorted(repo.glob("domains/**/*.md")) + \ sorted(repo.glob("core/**/*.md")) + \ sorted(repo.glob("foundations/**/*.md")) + \ sorted(repo.glob("decisions/**/*.md")): rel = str(md.relative_to(repo)) # Match via filename stem (with spaces and hyphens) against description titles stem = md.stem # Multiple matching strategies pr_number = title_to_pr.get(stem.lower()) if not pr_number: # Hyphenated slug → space variant pr_number = title_to_pr.get(stem.replace("-", " ").lower()) if not pr_number: claim_counts["no_pr_match"] += 1 continue sourcers = extract_sourcers_from_file(md) if not sourcers: claim_counts["no_sourcer"] += 1 continue claim_count += 1 # Look up author for this PR to skip self-credit pr_row = conn.execute( "SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?", (pr_number,), ).fetchone() if not pr_row: continue author = derive_author(conn, dict(pr_row)) author_canonical = normalize_handle(conn, author) if author else None for src_handle in sourcers: src_canonical = normalize_handle(conn, src_handle) if not valid_handle(src_canonical): claim_counts["invalid_handle"] += 1 continue if src_canonical == author_canonical: claim_counts["skip_self"] += 1 continue emit(conn, counts, args.dry_run, src_handle, "originator", pr_number, rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"]) originator_count += 1 if not args.dry_run: conn.commit() print(f" Claims processed: {claim_count}") print(f" Originator events emitted: {originator_count}") print(f" Breakdown: {dict(claim_counts)}") att = counts[("originator", "attempt")] if args.dry_run: wi = counts[("originator", "would_insert")] print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}") else: ins = counts[("originator", "inserted")] skip = counts[("originator", "skipped_dup")] print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}") if not args.dry_run: total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0] print(f"\nTotal contribution_events rows: {total}") if __name__ == "__main__": main()