#!/usr/bin/env python3 """Backfill contribution_events by replaying merged PRs from pipeline.db + worktree. For each merged PR: - Derive author from prs.submitted_by → git author → branch prefix - Emit author event (role=author, weight=0.30, claim_path=NULL) - For each claim file under a knowledge prefix, parse frontmatter and emit originator events for sourcer entries that differ from the author - Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent (when domain_verdict='approve' and not Leo) - Emit challenger/synthesizer events for Pentagon-Agent trailers on agent-owned branches (theseus/*, rio/*, etc.) based on commit_type Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run. Usage: python3 scripts/backfill-events.py --dry-run # Count events without writing python3 scripts/backfill-events.py # Apply Runs read-only against the git worktree; only writes to pipeline.db. """ import argparse import os import re import sqlite3 import subprocess import sys from collections import Counter from pathlib import Path DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db") REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main") # Role weights — must match lib/contributor.py ROLE_WEIGHTS. ROLE_WEIGHTS = { "author": 0.30, "challenger": 0.25, "synthesizer": 0.20, "originator": 0.15, "evaluator": 0.05, } PENTAGON_AGENTS = frozenset({ "rio", "leo", "theseus", "vida", "clay", "astra", "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", "pipeline", }) # Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES. # Duplicated here because this script runs standalone (no pipeline package import). AGENT_BRANCH_PREFIXES = ( "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/", ) TRAILER_EVENT_ROLE = { "challenge": "challenger", "enrich": "synthesizer", "research": "synthesizer", "reweave": "synthesizer", } KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/") BOT_AUTHORS = frozenset({ "teleo", "teleo-bot", "pipeline", "github-actions[bot]", "forgejo-actions", }) def normalize_handle(conn: sqlite3.Connection, handle: str) -> str: if not handle: return "" h = handle.strip().lower().lstrip("@") row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone() if row: return row[0] return h def classify_kind(handle: str) -> str: h = handle.strip().lower().lstrip("@") return "agent" if h in PENTAGON_AGENTS else "person" def parse_frontmatter(text: str): """Minimal YAML frontmatter parser using PyYAML when available.""" if not text.startswith("---"): return None end = text.find("---", 3) if end == -1: return None raw = text[3:end] try: import yaml fm = yaml.safe_load(raw) return fm if isinstance(fm, dict) else None except ImportError: return None except Exception: return None def extract_sourcers_from_file(path: Path) -> list[str]: """Return the sourcer handles from a claim file's frontmatter. Matches three formats: 1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }` 2. Bare-key flat: `sourcer: alexastrum` 3. Prefix-keyed: `attribution_sourcer: alexastrum` """ try: content = path.read_text(encoding="utf-8") except (FileNotFoundError, PermissionError, UnicodeDecodeError): return [] fm = parse_frontmatter(content) if not fm: return [] handles: list[str] = [] attr = fm.get("attribution") if isinstance(attr, dict): entries = attr.get("sourcer", []) if isinstance(entries, list): for e in entries: if isinstance(e, dict) and "handle" in e: handles.append(e["handle"]) elif isinstance(e, str): handles.append(e) elif isinstance(entries, str): handles.append(entries) return handles flat = fm.get("attribution_sourcer") if flat: if isinstance(flat, str): handles.append(flat) elif isinstance(flat, list): handles.extend(v for v in flat if isinstance(v, str)) if handles: return handles bare = fm.get("sourcer") if bare: if isinstance(bare, str): handles.append(bare) elif isinstance(bare, list): handles.extend(v for v in bare if isinstance(v, str)) return handles _HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$") def valid_handle(h: str) -> bool: if not h: return False lower = h.strip().lower().lstrip("@") if lower.endswith("-") or lower.endswith("_"): return False return bool(_HANDLE_RE.match(lower)) def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str: """Run a git command, return stdout. Returns empty string on failure.""" try: result = subprocess.run( ["git", *args], cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False, ) return result.stdout except (subprocess.TimeoutExpired, OSError): return "" def git_first_commit_author(pr_branch: str, merged_at: str) -> str: """Best-effort: find git author of first non-merge commit on the branch. PR branches are usually deleted after merge. We fall back to scanning main commits around merged_at for commits matching the branch slug. """ # Post-merge branches are cleaned up. For the backfill, we accept that this # path rarely yields results and rely on submitted_by + branch prefix. return "" def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None: """Author precedence: submitted_by → branch-prefix agent for agent-owned branches.""" if pr.get("submitted_by"): cand = pr["submitted_by"].strip().lower().lstrip("@") if cand and cand not in BOT_AUTHORS: return cand branch = pr.get("branch") or "" if "/" in branch: prefix = branch.split("/", 1)[0].lower() if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"): return prefix return None def find_pr_for_claim( conn: sqlite3.Connection, repo: Path, md: Path, ) -> tuple[int | None, str]: """Recover the Forgejo PR number that introduced a claim file. Returns (pr_number, strategy) — strategy is one of: 'sourced_from' — frontmatter sourced_from matched prs.source_path 'git_subject' — git log first-add commit message matched a branch pattern 'title_desc' — filename stem matched a title in prs.description 'github_pr' — recovery commit mentioned GitHub PR # → prs.github_pr 'none' — no strategy found a match Order is chosen by reliability: 1. sourced_from (explicit provenance, most reliable when present) 2. git_subject (covers Leo research, Cameron challenges, Theseus contrib) 3. title_desc (current fallback — brittle when description is NULL) 4. github_pr (recovery commits referencing erased GitHub PRs) """ rel = str(md.relative_to(repo)) # Strategy 1: sourced_from frontmatter → prs.source_path try: content = md.read_text(encoding="utf-8") except (FileNotFoundError, PermissionError, UnicodeDecodeError): content = "" fm = parse_frontmatter(content) if content else None if fm: sourced = fm.get("sourced_from") candidate_paths: list[str] = [] if isinstance(sourced, str) and sourced: candidate_paths.append(sourced) elif isinstance(sourced, list): candidate_paths.extend(s for s in sourced if isinstance(s, str)) for sp in candidate_paths: stem = Path(sp).stem if not stem: continue row = conn.execute( """SELECT number FROM prs WHERE source_path LIKE ? AND status='merged' ORDER BY merged_at ASC LIMIT 1""", (f"%{stem}.md",), ).fetchone() if row: return row["number"], "sourced_from" # Strategy 2: git log first-add commit → subject pattern → prs.branch # Default log order is reverse-chronological; take the last line (oldest) # to get the original addition, not later rewrites. log_out = git( "log", "--diff-filter=A", "--follow", "--format=%H|||%s|||%b", "--", rel, ) if log_out.strip(): # Split on the delimiter we chose. Each commit produces 3 fields but # %b can contain blank lines — group by lines that look like a SHA. blocks: list[tuple[str, str, str]] = [] current: list[str] = [] for line in log_out.splitlines(): if re.match(r"^[a-f0-9]{40}\|\|\|", line): if current: parts = "\n".join(current).split("|||", 2) if len(parts) == 3: blocks.append((parts[0], parts[1], parts[2])) current = [line] else: current.append(line) if current: parts = "\n".join(current).split("|||", 2) if len(parts) == 3: blocks.append((parts[0], parts[1], parts[2])) if blocks: # Oldest addition — git log defaults to reverse-chronological _oldest_sha, subject, body = blocks[-1] # Pattern: ": extract claims from " m = re.match(r"^(\w+):\s*extract\s+claims\s+from\s+(\S+)", subject) if m: slug = m.group(2).rstrip(".md").rstrip(".") row = conn.execute( """SELECT number FROM prs WHERE branch LIKE ? AND status='merged' ORDER BY merged_at ASC LIMIT 1""", (f"extract/{slug}%",), ).fetchone() if row: return row["number"], "git_subject" # Pattern: ": research session " m = re.match(r"^(\w+):\s*research\s+session\s+(\d{4}-\d{2}-\d{2})", subject) if m: agent = m.group(1).lower() date = m.group(2) row = conn.execute( """SELECT number FROM prs WHERE branch LIKE ? AND status='merged' ORDER BY merged_at ASC LIMIT 1""", (f"{agent}/research-{date}%",), ).fetchone() if row: return row["number"], "git_subject" # Pattern: ": challenge" / contrib challenges / entity batches m = re.match(r"^(\w+):\s*(?:challenge|contrib|entity|synthesize)", subject) if m: agent = m.group(1).lower() row = conn.execute( """SELECT number FROM prs WHERE branch LIKE ? AND status='merged' ORDER BY merged_at ASC LIMIT 1""", (f"{agent}/%",), ).fetchone() if row: return row["number"], "git_subject" # Recovery commits referencing erased GitHub PRs (Alex/Cameron). # Subject: "Recover contribution from GitHub PR #NN (...)". # Match only when a corresponding prs row exists with github_pr=NN — # otherwise the claims were direct-to-main without a Forgejo PR # record, which requires a synthetic PR row (follow-up, not in # this script's scope). gh_match = re.search(r"GitHub\s+PR\s+#(\d+)", subject + "\n" + body) if gh_match: gh_pr = int(gh_match.group(1)) row = conn.execute( "SELECT number FROM prs WHERE github_pr = ? AND status='merged' LIMIT 1", (gh_pr,), ).fetchone() if row: return row["number"], "github_pr" # Pattern: bare "Extract N claims from " (no # agent prefix). Used in early research PRs like Shaga's claims # at PR #2025. Fall back to time-proximity: find the earliest # agent-branch PR merged within 24h AFTER this commit's date. m = re.match(r"^Extract\s+\d+\s+claims\s+from\b", subject) if m: # Get commit author date date_out = git( "log", "-1", "--format=%aI", _oldest_sha, timeout=10, ) commit_date = date_out.strip() if date_out.strip() else None if commit_date: # git %aI returns ISO 8601 with T-separator; prs.merged_at # uses SQLite's space-separator. Lexicographic comparison # fails across formats (space= datetime(?) AND merged_at <= datetime(datetime(?), '+24 hours') AND (branch LIKE 'leo/%' OR branch LIKE 'theseus/%' OR branch LIKE 'rio/%' OR branch LIKE 'astra/%' OR branch LIKE 'vida/%' OR branch LIKE 'clay/%') ORDER BY merged_at ASC LIMIT 1""", (commit_date, commit_date), ).fetchone() if row: return row["number"], "git_time_proximity" return None, "none" def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp): canonical = normalize_handle(conn, handle) if not valid_handle(canonical): return kind = classify_kind(canonical) weight = ROLE_WEIGHTS[role] counts[(role, "attempt")] += 1 if dry_run: counts[(role, "would_insert")] += 1 return cur = conn.execute( """INSERT OR IGNORE INTO contribution_events (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""", (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp), ) if cur.rowcount > 0: counts[(role, "inserted")] += 1 else: counts[(role, "skipped_dup")] += 1 def files_added_in_pr(pr_number: int, branch: str) -> list[str]: """Best-effort: list added .md files in the PR. Uses prs.source_path as a fallback signal (the claim being added). If the branch no longer exists post-merge, this will return []; we accept the loss for historical PRs where the granular per-claim events can't be recovered — PR-level author/evaluator events still land correctly. """ # Post-merge PR branches are deleted from Forgejo so we can't diff them. # For the backfill we use prs.source_path — for extract/* PRs this points to # the source inbox file; we can glob the claim files from the extract branch # commit on main. But main's commits don't track which files a given PR touched. # Accept the loss: backfill emits only PR-level events (author, evaluator, # challenger/synthesizer). Originator events come from parsing claim files # attributed to the branch via description field which lists claim titles. return [] def main(): parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)") args = parser.parse_args() if not Path(DB_PATH).exists(): print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr) sys.exit(1) conn = sqlite3.connect(DB_PATH, timeout=30) conn.row_factory = sqlite3.Row # Sanity: contribution_events exists (v24 migration applied) try: conn.execute("SELECT 1 FROM contribution_events LIMIT 1") except sqlite3.OperationalError: print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr) sys.exit(2) # Walk all merged knowledge PRs query = """ SELECT number, branch, domain, source_channel, submitted_by, leo_verdict, domain_verdict, domain_agent, commit_type, merged_at FROM prs WHERE status = 'merged' ORDER BY merged_at ASC """ if args.limit: query += f" LIMIT {args.limit}" prs = conn.execute(query).fetchall() print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...") counts: Counter = Counter() repo = Path(REPO_DIR) for pr in prs: pr_number = pr["number"] branch = pr["branch"] or "" domain = pr["domain"] channel = pr["source_channel"] merged_at = pr["merged_at"] # Skip pipeline-only branches for author credit (extract/*, reweave/*, # fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But # evaluator credit for Leo/domain_agent still applies. is_pipeline_branch = branch.startswith(( "extract/", "reweave/", "fix/", "ingestion/", "epimetheus/", )) # ── AUTHOR ── # For pipeline branches, submitted_by carries the real author (the # human who submitted the source via Telegram/etc). For agent branches, # the agent is author. For external branches (gh-pr-*), git author is # in submitted_by from the sync-mirror pipeline. author = derive_author(conn, dict(pr)) if author: emit(conn, counts, args.dry_run, author, "author", pr_number, None, domain, channel, merged_at) # ── EVALUATOR ── if pr["leo_verdict"] == "approve": emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number, None, domain, channel, merged_at) if pr["domain_verdict"] == "approve" and pr["domain_agent"]: dagent = pr["domain_agent"].strip().lower() if dagent and dagent != "leo": emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number, None, domain, channel, merged_at) # ── CHALLENGER / SYNTHESIZER from branch+commit_type ── # Only fires on agent-owned branches. Pipeline branches aren't creditable # work (they're machine extraction, evaluator already captures the review). if branch.startswith(AGENT_BRANCH_PREFIXES): prefix = branch.split("/", 1)[0].lower() event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "") if event_role: emit(conn, counts, args.dry_run, prefix, event_role, pr_number, None, domain, channel, merged_at) # ── ORIGINATOR per claim ── # Walk claim files currently on main whose content was added in this PR. # We can't diff old branches (deleted post-merge), but for extract PRs # the source_path + description carry claim titles — too lossy to build # per-claim events reliably. Strategy: walk ALL claim files that have a # sourcer in their frontmatter and assign them to the PR whose # source_path matches (via description or filename heuristic). # DEFERRED: per-claim originator events require branch introspection # that fails on deleted branches. Backfill emits PR-level events only. # Forward traffic (post-deploy) gets per-claim originator events via # record_contributor_attribution's added-files walk. if not args.dry_run: conn.commit() # Originator is emitted in the claim-level pass below, not the PR-level pass. # Previous summary listed it here with attempted=0 which confused operators. print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===") for role in ("author", "challenger", "synthesizer", "evaluator"): att = counts[(role, "attempt")] if args.dry_run: wi = counts[(role, "would_insert")] print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}") else: ins = counts[(role, "inserted")] skip = counts[(role, "skipped_dup")] print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}") # ── Per-claim originator pass ── # Walk the knowledge tree, parse sourcer attribution, and attach each claim # to its merging PR via find_pr_for_claim's multi-strategy recovery. # Apr 24 rewrite (Ganymede-approved): replaces the single-strategy # title→description match with four strategies in reliability order. # Previous script missed PRs with NULL description (Cameron #3377) and # cross-context claims (Shaga's Leo research). Fallback title-match is # preserved to recover anything the git-log path misses. print("\n=== Claim-level originator pass ===") # Build title → pr_number map from prs.description (strategy 3 fallback) title_to_pr: dict[str, int] = {} for r in conn.execute( "SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''" ).fetchall(): desc = r["description"] or "" for title in desc.split(" | "): title = title.strip() if title: # Last-writer wins. Conflicts are rare (titles unique in practice). title_to_pr[title.lower()] = r["number"] claim_counts = Counter() strategy_counts = Counter() claim_count = 0 originator_count = 0 for md in sorted(repo.glob("domains/**/*.md")) + \ sorted(repo.glob("core/**/*.md")) + \ sorted(repo.glob("foundations/**/*.md")) + \ sorted(repo.glob("decisions/**/*.md")): rel = str(md.relative_to(repo)) stem = md.stem # Strategies 1, 2, 4 via the helper (sourced_from, git_subject, github_pr). pr_number, strategy = find_pr_for_claim(conn, repo, md) # Strategy 3 (fallback): title-match against prs.description. if not pr_number: pr_number = title_to_pr.get(stem.lower()) if not pr_number: pr_number = title_to_pr.get(stem.replace("-", " ").lower()) if pr_number: strategy = "title_desc" if not pr_number: claim_counts["no_pr_match"] += 1 continue sourcers = extract_sourcers_from_file(md) if not sourcers: claim_counts["no_sourcer"] += 1 continue claim_count += 1 strategy_counts[strategy] += 1 # Look up author for this PR to skip self-credit pr_row = conn.execute( "SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?", (pr_number,), ).fetchone() if not pr_row: continue author = derive_author(conn, dict(pr_row)) author_canonical = normalize_handle(conn, author) if author else None for src_handle in sourcers: src_canonical = normalize_handle(conn, src_handle) if not valid_handle(src_canonical): claim_counts["invalid_handle"] += 1 continue if src_canonical == author_canonical: claim_counts["skip_self"] += 1 continue emit(conn, counts, args.dry_run, src_handle, "originator", pr_number, rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"]) originator_count += 1 if not args.dry_run: conn.commit() print(f" Claims processed: {claim_count}") print(f" Originator events emitted: {originator_count}") print(f" Breakdown: {dict(claim_counts)}") print(f" Strategy hits: {dict(strategy_counts)}") att = counts[("originator", "attempt")] if args.dry_run: wi = counts[("originator", "would_insert")] print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}") else: ins = counts[("originator", "inserted")] skip = counts[("originator", "skipped_dup")] print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}") if not args.dry_run: total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0] print(f"\nTotal contribution_events rows: {total}") if __name__ == "__main__": main()