From 58fa8c5276129b385d3bfb6f20f2b532415c9abf Mon Sep 17 00:00:00 2001 From: m3taversal Date: Fri, 24 Apr 2026 13:59:22 +0100 Subject: [PATCH] =?UTF-8?q?feat(attribution):=20Phase=20A=20=E2=80=94=20ev?= =?UTF-8?q?ent-sourced=20contribution=20ledger=20(schema=20v24)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces contribution_events table + non-breaking double-write. Schema lands today, forward traffic writes events alongside existing count upserts, backfill script replays history. Phase B will add leaderboard API reading from events; Phase C switches Argus dashboard over. ## Schema v24 (lib/db.py) - contribution_events: one row per credit-earning event (id, handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp) Partial UNIQUE indexes handle SQLite's NULL != NULL semantics: idx_ce_unique_claim on (handle, role, pr_number, claim_path) WHERE claim_path IS NOT NULL idx_ce_unique_pr on (handle, role, pr_number) WHERE claim_path IS NULL PR-level events (evaluator, author, challenger, synthesizer) dedup on 3-tuple. Per-claim events (originator) dedup on 4-tuple. Idempotent on replay. - contributor_aliases: canonical handle mapping Seeded: @thesensatore → thesensatore, cameron → cameron-s1 - contributors.kind TEXT DEFAULT 'person' Migration seeds 'agent' for known Pentagon agent handles. ## Role model (confirmed by Cory Apr 24) Weights: author 0.30, challenger 0.25, synthesizer 0.20, originator 0.15, evaluator 0.05 - author: human who submitted the PR (curation + submission work) - originator: person who authored the underlying content (rewards external creators) - challenger: agent/person who brought a productive disagreement - synthesizer: cross-domain work (enrichments, research sessions) - evaluator: reviewer who approved (Leo + domain agent) Humans-are-always-author: agents' credit is capped at evaluator/synthesizer/ 
Pentagon agents are classified as kind='agent' and surface in the agent-view leaderboard, not the default person view. ## Writer (lib/contributor.py) - New insert_contribution_event(): idempotent INSERT OR IGNORE with alias normalization + kind classification. Falls back silently on pre-v24 DBs. - record_contributor_attribution double-writes alongside existing upsert_contributor calls. Zero risk to current dashboard. - Author event: emitted once per PR from prs.submitted_by → git author → agent-branch-prefix. - Originator events: emitted per claim from frontmatter sourcer, skipping when sourcer == author (avoids self-credit double-count). - Evaluator events: Leo (always when leo_verdict='approve') + domain_agent (when domain_verdict='approve' and not Leo). - Challenger/Synthesizer: emitted from Pentagon-Agent trailer on agent-owned branches (theseus/*, rio/*, etc.) based on commit_type. Pipeline-owned branches (extract/*, reweave/*) get no trailer-based event — infrastructure work isn't contribution credit. ## Helpers (lib/attribution.py) - normalize_handle(raw, conn=None): lowercase + strip @ + alias lookup - classify_kind(handle): returns 'agent' for PENTAGON_AGENTS, else 'person' Intentionally narrow. Orgs get classified by operator review, not heuristics. ## Backfill (scripts/backfill-events.py) Replays all merged PRs into events. Idempotent (safe to re-run). Emits: - PR-level: author, evaluator, challenger, synthesizer - Per-claim: originator (walks knowledge tree, matches via description titles) Known limitation: post-merge PR branches are deleted from Forgejo, so we can't diff them for granular per-claim events. Claim→PR mapping uses prs.description (pipe-separated titles). Misses some edge cases but recovers the bulk of historical originator credit. Forward traffic gets clean per-claim events via the normal record_contributor_attribution path. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/attribution.py | 52 +++++ lib/contributor.py | 157 +++++++++++++- lib/db.py | 110 +++++++++- scripts/backfill-events.py | 431 +++++++++++++++++++++++++++++++++++++ 4 files changed, 748 insertions(+), 2 deletions(-) create mode 100644 scripts/backfill-events.py diff --git a/lib/attribution.py b/lib/attribution.py index 2034e1f..681e029 100644 --- a/lib/attribution.py +++ b/lib/attribution.py @@ -48,6 +48,58 @@ def _filter_valid_handles(result: dict) -> dict: return filtered +# ─── Handle normalization + kind classification (schema v24) ────────────── + +# Known Pentagon agents. Used to classify contributor kind='agent' so the +# leaderboard can filter them out of the default person view. +PENTAGON_AGENTS = frozenset({ + "rio", "leo", "theseus", "vida", "clay", "astra", + "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", + "pipeline", # pipeline-owned commits (extract/*, reweave/*, fix/*) +}) + + +def normalize_handle(handle: str, conn=None) -> str: + """Canonicalize a handle: lowercase, strip @, resolve alias if conn provided. + + Examples: + '@thesensatore' → 'thesensatore' + 'Cameron' → 'cameron' → 'cameron-s1' (via alias if seeded) + 'CNBC' → 'cnbc' + + Always lowercases and strips @ prefix. Alias resolution requires a conn + argument (not always available at parse time; merge-time writer passes it). + """ + if not handle: + return "" + h = handle.strip().lower().lstrip("@") + if conn is None: + return h + try: + row = conn.execute( + "SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,), + ).fetchone() + if row: + return row["canonical"] if isinstance(row, dict) or hasattr(row, "keys") else row[0] + except Exception: + # Alias table might not exist yet on pre-v24 DBs — degrade gracefully. 
+ logger.debug("normalize_handle: alias lookup failed for %r", h, exc_info=True) + return h + + +def classify_kind(handle: str) -> str: + """Return 'agent' for known Pentagon agents, 'person' otherwise. + + The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review, + not inferred here. Keeping heuristics narrow: we know our own agents; + everything else defaults to person until explicitly classified. + """ + h = handle.strip().lower().lstrip("@") + if h in PENTAGON_AGENTS: + return "agent" + return "person" + + # ─── Parse attribution from claim content ────────────────────────────────── diff --git a/lib/contributor.py b/lib/contributor.py index 5150c67..f1249cc 100644 --- a/lib/contributor.py +++ b/lib/contributor.py @@ -5,6 +5,7 @@ Extracted from merge.py (Phase 5 decomposition). Functions: - refine_commit_type: extract → challenge/enrich refinement from diff content - record_contributor_attribution: parse trailers + frontmatter, upsert contributors - upsert_contributor: insert/update contributor record with role counts +- insert_contribution_event: event-sourced credit log (schema v24) - recalculate_tier: tier promotion based on config rules """ @@ -13,11 +14,69 @@ import logging import re from . import config, db +from .attribution import classify_kind, normalize_handle from .forgejo import get_pr_diff logger = logging.getLogger("pipeline.contributor") +# ─── Event schema (v24) ─────────────────────────────────────────────────── + +# Role → CI weight, per Cory's confirmed schema (Apr 24 conversation). +# Humans-are-always-author rule: agents never accumulate author credit; +# evaluator (0.05) is the only agent-facing role. Internal agents still earn +# author/challenger/synthesizer on their own autonomous research PRs but +# surface in the kind='agent' leaderboard, not the default person view. 
+ROLE_WEIGHTS = { + "author": 0.30, + "challenger": 0.25, + "synthesizer": 0.20, + "originator": 0.15, + "evaluator": 0.05, +} + + +def insert_contribution_event( + conn, + handle: str, + role: str, + pr_number: int, + *, + claim_path: str | None = None, + domain: str | None = None, + channel: str | None = None, + timestamp: str | None = None, +) -> bool: + """Emit a contribution_events row. Idempotent via UNIQUE constraint. + + Returns True if the event was inserted, False if the constraint blocked it + (same handle/role/pr/claim_path combo already recorded — safe to replay). + + Canonicalizes handle via alias table. Classifies kind from handle. + Falls back silently if contribution_events table doesn't exist yet (pre-v24). + """ + if role not in ROLE_WEIGHTS: + logger.warning("insert_contribution_event: unknown role %r", role) + return False + weight = ROLE_WEIGHTS[role] + canonical = normalize_handle(handle, conn=conn) + if not canonical: + return False + kind = classify_kind(canonical) + try: + cur = conn.execute( + """INSERT OR IGNORE INTO contribution_events + (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""", + (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp), + ) + return cur.rowcount > 0 + except Exception: + logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r", + pr_number, canonical, role, exc_info=True) + return False + + def is_knowledge_pr(diff: str) -> bool: """Check if a PR touches knowledge files (claims, decisions, core, foundations). 
@@ -125,15 +184,88 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ return # Refine commit_type from diff content (branch prefix may be too broad) - row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone() + row = conn.execute( + "SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, " + "domain_verdict, domain_agent FROM prs WHERE number = ?", + (pr_number,), + ).fetchone() branch_type = row["commit_type"] if row and row["commit_type"] else "extract" refined_type = refine_commit_type(diff, branch_type) if refined_type != branch_type: conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number)) logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type) + # Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites. + pr_domain = row["domain"] if row else None + pr_channel = row["source_channel"] if row else None + pr_submitted_by = row["submitted_by"] if row else None + + # ── AUTHOR event (schema v24, double-write) ── + # Humans-are-always-author rule: the human in the loop gets author credit. + # Precedence: prs.submitted_by (set by extract.py from source proposed_by, or + # by discover for human PRs) → git author of first commit → branch-prefix agent. + # Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get + # author events from branch prefix; extract/ PRs carry submitted_by from the + # source's proposed_by field so the human who submitted gets credit via path 1. + author_candidate: str | None = None + if pr_submitted_by: + author_candidate = pr_submitted_by + else: + # External GitHub PRs: git author is the real submitter. 
+ rc_author_head, author_head = await git_fn( + "log", f"origin/main..origin/{branch}", "--no-merges", + "--format=%an", "-1", timeout=5, + ) + if rc_author_head == 0 and author_head.strip(): + candidate = author_head.strip().lower() + if candidate and candidate not in {"teleo", "teleo-bot", "pipeline", + "github-actions[bot]", "forgejo-actions"}: + author_candidate = candidate + # Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc. + if not author_candidate and "/" in branch: + prefix = branch.split("/", 1)[0] + # Exclude pipeline-infrastructure prefixes — those are not authors. + if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"): + author_candidate = prefix + + if author_candidate: + insert_contribution_event( + conn, author_candidate, "author", pr_number, + claim_path=None, domain=pr_domain, channel=pr_channel, + ) + + # ── EVALUATOR events (schema v24) ── + # Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second + # reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when + # verdict is 'request_changes' — failed review isn't contribution credit. + if row: + if row["leo_verdict"] == "approve": + insert_contribution_event( + conn, "leo", "evaluator", pr_number, + claim_path=None, domain=pr_domain, channel=pr_channel, + ) + if row["domain_verdict"] == "approve" and row["domain_agent"]: + dagent = row["domain_agent"].strip().lower() + if dagent and dagent != "leo": # don't double-credit leo + insert_contribution_event( + conn, dagent, "evaluator", pr_number, + claim_path=None, domain=pr_domain, channel=pr_channel, + ) + # Parse Pentagon-Agent trailer from branch commit messages agents_found: set[str] = set() + # Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent + # challenger/synthesizer credit based on refined commit_type. Pipeline-owned + # branches (extract/*, reweave/*, etc.) don't — those are infra, not work. 
+ _AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/", + "astra/", "oberon/") + is_agent_branch = branch.startswith(_AGENT_BRANCH_PREFIXES) + _TRAILER_EVENT_ROLE = { + "challenge": "challenger", + "enrich": "synthesizer", + "research": "synthesizer", + "reweave": "synthesizer", + } rc, log_output = await git_fn( "log", f"origin/main..origin/{branch}", "--format=%b%n%N", timeout=10, @@ -146,6 +278,14 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ upsert_contributor( conn, agent_name, agent_uuid, role, today, ) + # Event-emit only for agent-owned branches where the trailer's agent + # actually did the substantive work (challenger/synthesizer). + event_role = _TRAILER_EVENT_ROLE.get(refined_type) + if is_agent_branch and event_role: + insert_contribution_event( + conn, agent_name, event_role, pr_number, + claim_path=None, domain=pr_domain, channel=pr_channel, + ) agents_found.add(agent_name) # Parse attribution from NEWLY ADDED knowledge files via the canonical attribution @@ -175,6 +315,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ # already matches via the claim file. Widening requires Cory sign-off # since it would change leaderboard accounting (entity-only PRs → CI credit). knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/") + author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None for rel_path in files_output.strip().split("\n"): rel_path = rel_path.strip() if not rel_path.endswith(".md"): @@ -192,6 +333,20 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ upsert_contributor( conn, handle, entry.get("agent_id"), role, today, ) + # Event-emit: only 'sourcer' frontmatter entries become + # originator events. 'extractor' frontmatter = infrastructure + # (the Sonnet extraction agent), no event. challenger/ + # synthesizer frontmatter is extremely rare at extract time. 
+ # Skip originator if same as author — avoids double-credit + # when someone submits their own content (self-authored). + if role == "sourcer": + origin_canonical = normalize_handle(handle, conn=conn) + if origin_canonical and origin_canonical != author_canonical: + insert_contribution_event( + conn, handle, "originator", pr_number, + claim_path=rel_path, + domain=pr_domain, channel=pr_channel, + ) # Fallback: if no Pentagon-Agent trailer found, try git commit authors _BOT_AUTHORS = frozenset({ diff --git a/lib/db.py b/lib/db.py index 7f86605..1d321b9 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 23 +SCHEMA_VERSION = 24 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -163,6 +163,50 @@ CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage); CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp); CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent); CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp); + +-- Event-sourced contributions (schema v24). +-- One row per credit-earning event. Idempotent via two partial UNIQUE indexes +-- (SQLite treats NULL != NULL in UNIQUE constraints, so a single composite +-- UNIQUE with nullable claim_path would allow evaluator-event duplicates). +-- Leaderboards are SQL aggregations over this table; contributors becomes a materialized cache. +CREATE TABLE IF NOT EXISTS contribution_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + handle TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'person', + -- person | org | agent + role TEXT NOT NULL, + -- author | originator | challenger | synthesizer | evaluator + weight REAL NOT NULL, + pr_number INTEGER NOT NULL, + claim_path TEXT, + -- NULL for PR-level events (e.g. evaluator). Set for per-claim events. 
+ domain TEXT, + channel TEXT, + -- telegram | github | agent | web | unknown + timestamp TEXT NOT NULL DEFAULT (datetime('now')) +); +-- Per-claim events: unique on (handle, role, pr_number, claim_path) when path IS NOT NULL. +CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events( + handle, role, pr_number, claim_path +) WHERE claim_path IS NOT NULL; +-- PR-level events (evaluator, author, trailer-based): unique on (handle, role, pr_number) when path IS NULL. +CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events( + handle, role, pr_number +) WHERE claim_path IS NULL; +CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp); +CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp); +CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number); +CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp); +CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp); + +-- Handle aliasing. @thesensatore → thesensatore. cameron → cameron-s1. +-- Writers call resolve_alias(handle) before inserting events or upserting contributors. +CREATE TABLE IF NOT EXISTS contributor_aliases ( + alias TEXT PRIMARY KEY, + canonical TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) +); +CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical); """ @@ -641,6 +685,70 @@ def migrate(conn: sqlite3.Connection): conn.commit() logger.info("Migration v23: added idx_prs_source_path for auto-close dedup lookup") + if current < 24: + # Event-sourced contributions table + alias table + kind column on contributors. + # Non-breaking: contributors table stays; events are written in addition via + # double-write in merge.py. Leaderboards switch to events in Phase B. 
+ conn.executescript(""" + CREATE TABLE IF NOT EXISTS contribution_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + handle TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'person', + role TEXT NOT NULL, + weight REAL NOT NULL, + pr_number INTEGER NOT NULL, + claim_path TEXT, + domain TEXT, + channel TEXT, + timestamp TEXT NOT NULL DEFAULT (datetime('now')) + ); + -- Partial unique indexes handle SQLite's NULL != NULL UNIQUE semantics. + -- Per-claim events dedup on 4-tuple; PR-level events dedup on 3-tuple. + CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events( + handle, role, pr_number, claim_path + ) WHERE claim_path IS NOT NULL; + CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events( + handle, role, pr_number + ) WHERE claim_path IS NULL; + CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp); + CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp); + CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number); + CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp); + CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp); + + CREATE TABLE IF NOT EXISTS contributor_aliases ( + alias TEXT PRIMARY KEY, + canonical TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical); + """) + try: + conn.execute("ALTER TABLE contributors ADD COLUMN kind TEXT DEFAULT 'person'") + except sqlite3.OperationalError: + pass # column already exists + # Seed known aliases. @thesensatore → thesensatore catches the zombie row Argus flagged. + # cameron → cameron-s1 reconciles the Leo-flagged missing contributor. 
+ conn.executemany( + "INSERT OR IGNORE INTO contributor_aliases (alias, canonical) VALUES (?, ?)", + [ + ("@thesensatore", "thesensatore"), + ("cameron", "cameron-s1"), + ], + ) + # Seed kind='agent' for known Pentagon agents so the events writer picks it up. + pentagon_agents = [ + "rio", "leo", "theseus", "vida", "clay", "astra", + "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", + ] + for agent in pentagon_agents: + conn.execute( + "UPDATE contributors SET kind = 'agent' WHERE handle = ?", + (agent,), + ) + conn.commit() + logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", diff --git a/scripts/backfill-events.py b/scripts/backfill-events.py new file mode 100644 index 0000000..84646b9 --- /dev/null +++ b/scripts/backfill-events.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python3 +"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree. + +For each merged PR: + - Derive author from prs.submitted_by → git author → branch prefix + - Emit author event (role=author, weight=0.30, claim_path=NULL) + - For each claim file under a knowledge prefix, parse frontmatter and emit + originator events for sourcer entries that differ from the author + - Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent + (when domain_verdict='approve' and not Leo) + - Emit challenger/synthesizer events for Pentagon-Agent trailers on + agent-owned branches (theseus/*, rio/*, etc.) based on commit_type + +Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run. + +Usage: + python3 scripts/backfill-events.py --dry-run # Count events without writing + python3 scripts/backfill-events.py # Apply + +Runs read-only against the git worktree; only writes to pipeline.db. 
+""" +import argparse +import os +import re +import sqlite3 +import subprocess +import sys +from collections import Counter +from pathlib import Path + +DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db") +REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main") + +# Role weights — must match lib/contributor.py ROLE_WEIGHTS. +ROLE_WEIGHTS = { + "author": 0.30, + "challenger": 0.25, + "synthesizer": 0.20, + "originator": 0.15, + "evaluator": 0.05, +} + +PENTAGON_AGENTS = frozenset({ + "rio", "leo", "theseus", "vida", "clay", "astra", + "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", + "pipeline", +}) + +AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/", + "astra/", "oberon/") + +TRAILER_EVENT_ROLE = { + "challenge": "challenger", + "enrich": "synthesizer", + "research": "synthesizer", + "reweave": "synthesizer", +} + +KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/") + +BOT_AUTHORS = frozenset({ + "teleo", "teleo-bot", "pipeline", + "github-actions[bot]", "forgejo-actions", +}) + + +def normalize_handle(conn: sqlite3.Connection, handle: str) -> str: + if not handle: + return "" + h = handle.strip().lower().lstrip("@") + row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone() + if row: + return row[0] + return h + + +def classify_kind(handle: str) -> str: + h = handle.strip().lower().lstrip("@") + return "agent" if h in PENTAGON_AGENTS else "person" + + +def parse_frontmatter(text: str): + """Minimal YAML frontmatter parser using PyYAML when available.""" + if not text.startswith("---"): + return None + end = text.find("---", 3) + if end == -1: + return None + raw = text[3:end] + try: + import yaml + fm = yaml.safe_load(raw) + return fm if isinstance(fm, dict) else None + except ImportError: + return None + except Exception: + return None + + +def extract_sourcers_from_file(path: Path) -> list[str]: + """Return the 
sourcer handles from a claim file's frontmatter. + + Matches three formats: + 1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }` + 2. Bare-key flat: `sourcer: alexastrum` + 3. Prefix-keyed: `attribution_sourcer: alexastrum` + """ + try: + content = path.read_text(encoding="utf-8") + except (FileNotFoundError, PermissionError, UnicodeDecodeError): + return [] + fm = parse_frontmatter(content) + if not fm: + return [] + + handles: list[str] = [] + + attr = fm.get("attribution") + if isinstance(attr, dict): + entries = attr.get("sourcer", []) + if isinstance(entries, list): + for e in entries: + if isinstance(e, dict) and "handle" in e: + handles.append(e["handle"]) + elif isinstance(e, str): + handles.append(e) + elif isinstance(entries, str): + handles.append(entries) + return handles + + flat = fm.get("attribution_sourcer") + if flat: + if isinstance(flat, str): + handles.append(flat) + elif isinstance(flat, list): + handles.extend(v for v in flat if isinstance(v, str)) + if handles: + return handles + + bare = fm.get("sourcer") + if bare: + if isinstance(bare, str): + handles.append(bare) + elif isinstance(bare, list): + handles.extend(v for v in bare if isinstance(v, str)) + + return handles + + +_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$") + + +def valid_handle(h: str) -> bool: + if not h: + return False + lower = h.strip().lower().lstrip("@") + if lower.endswith("-") or lower.endswith("_"): + return False + return bool(_HANDLE_RE.match(lower)) + + +def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str: + """Run a git command, return stdout. Returns empty string on failure.""" + try: + result = subprocess.run( + ["git", *args], + cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False, + ) + return result.stdout + except (subprocess.TimeoutExpired, OSError): + return "" + + +def git_first_commit_author(pr_branch: str, merged_at: str) -> str: + """Best-effort: find git author of first non-merge commit on the branch. 
+ + PR branches are usually deleted after merge. We fall back to scanning main + commits around merged_at for commits matching the branch slug. + """ + # Post-merge branches are cleaned up. For the backfill, we accept that this + # path rarely yields results and rely on submitted_by + branch prefix. + return "" + + +def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None: + """Author precedence: submitted_by → branch-prefix agent for agent-owned branches.""" + if pr.get("submitted_by"): + cand = pr["submitted_by"].strip().lower().lstrip("@") + if cand and cand not in BOT_AUTHORS: + return cand + branch = pr.get("branch") or "" + if "/" in branch: + prefix = branch.split("/", 1)[0].lower() + if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"): + return prefix + return None + + +def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp): + canonical = normalize_handle(conn, handle) + if not valid_handle(canonical): + return + kind = classify_kind(canonical) + weight = ROLE_WEIGHTS[role] + counts[(role, "attempt")] += 1 + if dry_run: + counts[(role, "would_insert")] += 1 + return + cur = conn.execute( + """INSERT OR IGNORE INTO contribution_events + (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""", + (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp), + ) + if cur.rowcount > 0: + counts[(role, "inserted")] += 1 + else: + counts[(role, "skipped_dup")] += 1 + + +def files_added_in_pr(pr_number: int, branch: str) -> list[str]: + """Best-effort: list added .md files in the PR. + + Uses prs.source_path as a fallback signal (the claim being added). If the + branch no longer exists post-merge, this will return []; we accept the loss + for historical PRs where the granular per-claim events can't be recovered — + PR-level author/evaluator events still land correctly. 
+ """ + # Post-merge PR branches are deleted from Forgejo so we can't diff them. + # For the backfill we use prs.source_path — for extract/* PRs this points to + # the source inbox file; we can glob the claim files from the extract branch + # commit on main. But main's commits don't track which files a given PR touched. + # Accept the loss: backfill emits only PR-level events (author, evaluator, + # challenger/synthesizer). Originator events come from parsing claim files + # attributed to the branch via description field which lists claim titles. + return [] + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)") + args = parser.parse_args() + + if not Path(DB_PATH).exists(): + print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr) + sys.exit(1) + + conn = sqlite3.connect(DB_PATH, timeout=30) + conn.row_factory = sqlite3.Row + + # Sanity: contribution_events exists (v24 migration applied) + try: + conn.execute("SELECT 1 FROM contribution_events LIMIT 1") + except sqlite3.OperationalError: + print("ERROR: contribution_events table missing. 
Run migration v24 first.", file=sys.stderr) + sys.exit(2) + + # Walk all merged knowledge PRs + query = """ + SELECT number, branch, domain, source_channel, submitted_by, + leo_verdict, domain_verdict, domain_agent, + commit_type, merged_at + FROM prs + WHERE status = 'merged' + ORDER BY merged_at ASC + """ + if args.limit: + query += f" LIMIT {args.limit}" + prs = conn.execute(query).fetchall() + print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...") + + counts: Counter = Counter() + repo = Path(REPO_DIR) + + for pr in prs: + pr_number = pr["number"] + branch = pr["branch"] or "" + domain = pr["domain"] + channel = pr["source_channel"] + merged_at = pr["merged_at"] + + # Skip pipeline-only branches for author credit (extract/*, reweave/*, + # fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But + # evaluator credit for Leo/domain_agent still applies. + is_pipeline_branch = branch.startswith(( + "extract/", "reweave/", "fix/", "ingestion/", "epimetheus/", + )) + + # ── AUTHOR ── + # For pipeline branches, submitted_by carries the real author (the + # human who submitted the source via Telegram/etc). For agent branches, + # the agent is author. For external branches (gh-pr-*), git author is + # in submitted_by from the sync-mirror pipeline. + author = derive_author(conn, dict(pr)) + if author: + emit(conn, counts, args.dry_run, author, "author", pr_number, + None, domain, channel, merged_at) + + # ── EVALUATOR ── + if pr["leo_verdict"] == "approve": + emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number, + None, domain, channel, merged_at) + if pr["domain_verdict"] == "approve" and pr["domain_agent"]: + dagent = pr["domain_agent"].strip().lower() + if dagent and dagent != "leo": + emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number, + None, domain, channel, merged_at) + + # ── CHALLENGER / SYNTHESIZER from branch+commit_type ── + # Only fires on agent-owned branches. 
Pipeline branches aren't creditable + # work (they're machine extraction, evaluator already captures the review). + if branch.startswith(AGENT_BRANCH_PREFIXES): + prefix = branch.split("/", 1)[0].lower() + event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "") + if event_role: + emit(conn, counts, args.dry_run, prefix, event_role, pr_number, + None, domain, channel, merged_at) + + # ── ORIGINATOR per claim ── + # Walk claim files currently on main whose content was added in this PR. + # We can't diff old branches (deleted post-merge), but for extract PRs + # the source_path + description carry claim titles — too lossy to build + # per-claim events reliably. Strategy: walk ALL claim files that have a + # sourcer in their frontmatter and assign them to the PR whose + # source_path matches (via description or filename heuristic). + # DEFERRED: per-claim originator events require branch introspection + # that fails on deleted branches. Backfill emits PR-level events only. + # Forward traffic (post-deploy) gets per-claim originator events via + # record_contributor_attribution's added-files walk. 
+
+    if not args.dry_run:
+        conn.commit()
+
+    print("\n=== Summary ===")
+    for role in ("author", "originator", "challenger", "synthesizer", "evaluator"):
+        att = counts[(role, "attempt")]
+        if args.dry_run:
+            wi = counts[(role, "would_insert")]
+            print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
+        else:
+            ins = counts[(role, "inserted")]
+            skip = counts[(role, "skipped_dup")]
+            print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
+
+    if not args.dry_run:
+        total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
+        print(f"\nTotal contribution_events rows: {total}")
+
+    # ── Per-claim originator pass ──
+    # Separate pass: walk the current knowledge tree, parse sourcer frontmatter,
+    # and attach each claim to the merging PR via a claim_path → pr_number map
+    # built from prs.description (pipe-separated claim titles). Imperfect — some
+    # PRs have NULL description or mismatched titles — but recovers the bulk of
+    # historical originator credit.
+    print("\n=== Claim-level originator pass ===")
+    # Build title → pr_number map from prs.description
+    title_to_pr: dict[str, int] = {}
+    for r in conn.execute(
+        "SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
+    ).fetchall():
+        desc = r["description"] or ""
+        for title in desc.split(" | "):
+            title = title.strip()
+            if title:
+                # Last-writer wins. Conflicts are rare (titles unique in practice).
+                title_to_pr[title.lower()] = r["number"]
+
+    claim_counts = Counter()
+    claim_count = 0
+    originator_count = 0
+    for md in sorted(repo.glob("domains/**/*.md")) + \
+              sorted(repo.glob("core/**/*.md")) + \
+              sorted(repo.glob("foundations/**/*.md")) + \
+              sorted(repo.glob("decisions/**/*.md")):
+        rel = str(md.relative_to(repo))
+        # Match via filename stem (with spaces and hyphens) against description titles
+        stem = md.stem
+        # Multiple matching strategies
+        pr_number = title_to_pr.get(stem.lower())
+        if not pr_number:
+            # Hyphenated slug → space variant
+            pr_number = title_to_pr.get(stem.replace("-", " ").lower())
+        if not pr_number:
+            claim_counts["no_pr_match"] += 1
+            continue
+
+        sourcers = extract_sourcers_from_file(md)
+        if not sourcers:
+            claim_counts["no_sourcer"] += 1
+            continue
+
+        claim_count += 1
+        # Look up author for this PR to skip self-credit
+        pr_row = conn.execute(
+            "SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
+            (pr_number,),
+        ).fetchone()
+        if not pr_row:
+            continue
+        author = derive_author(conn, dict(pr_row))
+        author_canonical = normalize_handle(conn, author) if author else None
+
+        for src_handle in sourcers:
+            src_canonical = normalize_handle(conn, src_handle)
+            if not valid_handle(src_canonical):
+                claim_counts["invalid_handle"] += 1
+                continue
+            if src_canonical == author_canonical:
+                claim_counts["skip_self"] += 1
+                continue
+            emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
+                 rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
+            originator_count += 1
+
+    if not args.dry_run:
+        conn.commit()
+
+    print(f" Claims processed: {claim_count}")
+    print(f" Originator events emitted: {originator_count}")
+    print(f" Breakdown: {dict(claim_counts)}")
+
+    final_origin_attempted = counts[("originator", "attempt")]
+    if args.dry_run:
+        print(f" (dry-run) originator would_insert={counts[('originator', 'would_insert')]}")
+    else:
+        print(f" originator inserted={counts[('originator', 'inserted')]} skipped_dup={counts[('originator', 'skipped_dup')]}")
+
+
+if __name__ == "__main__":
+    main()