feat(attribution): Phase A — event-sourced contribution ledger (schema v24)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Some checks are pending
CI / lint-and-test (push) Waiting to run
Introduces contribution_events table + non-breaking double-write. Schema
lands today, forward traffic writes events alongside existing count upserts,
backfill script replays history. Phase B will add leaderboard API reading
from events; Phase C switches Argus dashboard over.
## Schema v24 (lib/db.py)
- contribution_events: one row per credit-earning event
(id, handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
Partial UNIQUE indexes handle SQLite's NULL != NULL semantics:
idx_ce_unique_claim on (handle, role, pr_number, claim_path) WHERE claim_path NOT NULL
idx_ce_unique_pr on (handle, role, pr_number) WHERE claim_path IS NULL
PR-level events (evaluator, author, challenger, synthesizer) dedup on 3-tuple.
Per-claim events (originator) dedup on 4-tuple. Idempotent on replay.
- contributor_aliases: canonical handle mapping
Seeded: @thesensatore → thesensatore, cameron → cameron-s1
- contributors.kind TEXT DEFAULT 'person'
Migration seeds 'agent' for known Pentagon agent handles.
## Role model (confirmed by Cory Apr 24)
Weights: author 0.30, challenger 0.25, synthesizer 0.20, originator 0.15, evaluator 0.05
- author: human who submitted the PR (curation + submission work)
- originator: person who authored the underlying content (rewards external creators)
- challenger: agent/person who brought a productive disagreement
- synthesizer: cross-domain work (enrichments, research sessions)
- evaluator: reviewer who approved (Leo + domain agent)
Humans-are-always-author: agents credit is capped at evaluator/synthesizer/
challenger. Pentagon agents classify as kind='agent' and surface in the
agent-view leaderboard, not the default person view.
## Writer (lib/contributor.py)
- New insert_contribution_event(): idempotent INSERT OR IGNORE with alias
normalization + kind classification. Falls back silently on pre-v24 DBs.
- record_contributor_attribution double-writes alongside existing
upsert_contributor calls. Zero risk to current dashboard.
- Author event: emitted once per PR from prs.submitted_by → git author →
agent-branch-prefix.
- Originator events: emitted per claim from frontmatter sourcer, skipping
when sourcer == author (avoids self-credit double-count).
- Evaluator events: Leo (always when leo_verdict='approve') + domain_agent
(when domain_verdict='approve' and not Leo).
- Challenger/Synthesizer: emitted from Pentagon-Agent trailer on
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type.
Pipeline-owned branches (extract/*, reweave/*) get no trailer-based event —
infrastructure work isn't contribution credit.
## Helpers (lib/attribution.py)
- normalize_handle(raw, conn=None): lowercase + strip @ + alias lookup
- classify_kind(handle): returns 'agent' for PENTAGON_AGENTS, else 'person'
Intentionally narrow. Orgs get classified by operator review, not heuristics.
## Backfill (scripts/backfill-events.py)
Replays all merged PRs into events. Idempotent (safe to re-run). Emits:
- PR-level: author, evaluator, challenger, synthesizer
- Per-claim: originator (walks knowledge tree, matches via description titles)
Known limitation: post-merge PR branches are deleted from Forgejo, so we
can't diff them for granular per-claim events. Claim→PR mapping uses
prs.description (pipe-separated titles). Misses some edge cases but
recovers the bulk of historical originator credit. Forward traffic gets
clean per-claim events via the normal record_contributor_attribution path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
93917f9fc2
commit
58fa8c5276
4 changed files with 748 additions and 2 deletions
|
|
@ -48,6 +48,58 @@ def _filter_valid_handles(result: dict) -> dict:
|
|||
return filtered
|
||||
|
||||
|
||||
# ─── Handle normalization + kind classification (schema v24) ──────────────
|
||||
|
||||
# Known Pentagon agents. Used to classify contributor kind='agent' so the
|
||||
# leaderboard can filter them out of the default person view.
|
||||
PENTAGON_AGENTS = frozenset({
|
||||
"rio", "leo", "theseus", "vida", "clay", "astra",
|
||||
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
|
||||
"pipeline", # pipeline-owned commits (extract/*, reweave/*, fix/*)
|
||||
})
|
||||
|
||||
|
||||
def normalize_handle(handle: str, conn=None) -> str:
|
||||
"""Canonicalize a handle: lowercase, strip @, resolve alias if conn provided.
|
||||
|
||||
Examples:
|
||||
'@thesensatore' → 'thesensatore'
|
||||
'Cameron' → 'cameron' → 'cameron-s1' (via alias if seeded)
|
||||
'CNBC' → 'cnbc'
|
||||
|
||||
Always lowercases and strips @ prefix. Alias resolution requires a conn
|
||||
argument (not always available at parse time; merge-time writer passes it).
|
||||
"""
|
||||
if not handle:
|
||||
return ""
|
||||
h = handle.strip().lower().lstrip("@")
|
||||
if conn is None:
|
||||
return h
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,),
|
||||
).fetchone()
|
||||
if row:
|
||||
return row["canonical"] if isinstance(row, dict) or hasattr(row, "keys") else row[0]
|
||||
except Exception:
|
||||
# Alias table might not exist yet on pre-v24 DBs — degrade gracefully.
|
||||
logger.debug("normalize_handle: alias lookup failed for %r", h, exc_info=True)
|
||||
return h
|
||||
|
||||
|
||||
def classify_kind(handle: str) -> str:
|
||||
"""Return 'agent' for known Pentagon agents, 'person' otherwise.
|
||||
|
||||
The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review,
|
||||
not inferred here. Keeping heuristics narrow: we know our own agents;
|
||||
everything else defaults to person until explicitly classified.
|
||||
"""
|
||||
h = handle.strip().lower().lstrip("@")
|
||||
if h in PENTAGON_AGENTS:
|
||||
return "agent"
|
||||
return "person"
|
||||
|
||||
|
||||
# ─── Parse attribution from claim content ──────────────────────────────────
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ Extracted from merge.py (Phase 5 decomposition). Functions:
|
|||
- refine_commit_type: extract → challenge/enrich refinement from diff content
|
||||
- record_contributor_attribution: parse trailers + frontmatter, upsert contributors
|
||||
- upsert_contributor: insert/update contributor record with role counts
|
||||
- insert_contribution_event: event-sourced credit log (schema v24)
|
||||
- recalculate_tier: tier promotion based on config rules
|
||||
"""
|
||||
|
||||
|
|
@ -13,11 +14,69 @@ import logging
|
|||
import re
|
||||
|
||||
from . import config, db
|
||||
from .attribution import classify_kind, normalize_handle
|
||||
from .forgejo import get_pr_diff
|
||||
|
||||
logger = logging.getLogger("pipeline.contributor")
|
||||
|
||||
|
||||
# ─── Event schema (v24) ───────────────────────────────────────────────────
|
||||
|
||||
# Role → CI weight, per Cory's confirmed schema (Apr 24 conversation).
|
||||
# Humans-are-always-author rule: agents never accumulate author credit;
|
||||
# evaluator (0.05) is the only agent-facing role. Internal agents still earn
|
||||
# author/challenger/synthesizer on their own autonomous research PRs but
|
||||
# surface in the kind='agent' leaderboard, not the default person view.
|
||||
ROLE_WEIGHTS = {
|
||||
"author": 0.30,
|
||||
"challenger": 0.25,
|
||||
"synthesizer": 0.20,
|
||||
"originator": 0.15,
|
||||
"evaluator": 0.05,
|
||||
}
|
||||
|
||||
|
||||
def insert_contribution_event(
|
||||
conn,
|
||||
handle: str,
|
||||
role: str,
|
||||
pr_number: int,
|
||||
*,
|
||||
claim_path: str | None = None,
|
||||
domain: str | None = None,
|
||||
channel: str | None = None,
|
||||
timestamp: str | None = None,
|
||||
) -> bool:
|
||||
"""Emit a contribution_events row. Idempotent via UNIQUE constraint.
|
||||
|
||||
Returns True if the event was inserted, False if the constraint blocked it
|
||||
(same handle/role/pr/claim_path combo already recorded — safe to replay).
|
||||
|
||||
Canonicalizes handle via alias table. Classifies kind from handle.
|
||||
Falls back silently if contribution_events table doesn't exist yet (pre-v24).
|
||||
"""
|
||||
if role not in ROLE_WEIGHTS:
|
||||
logger.warning("insert_contribution_event: unknown role %r", role)
|
||||
return False
|
||||
weight = ROLE_WEIGHTS[role]
|
||||
canonical = normalize_handle(handle, conn=conn)
|
||||
if not canonical:
|
||||
return False
|
||||
kind = classify_kind(canonical)
|
||||
try:
|
||||
cur = conn.execute(
|
||||
"""INSERT OR IGNORE INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
|
||||
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
|
||||
)
|
||||
return cur.rowcount > 0
|
||||
except Exception:
|
||||
logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r",
|
||||
pr_number, canonical, role, exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
def is_knowledge_pr(diff: str) -> bool:
|
||||
"""Check if a PR touches knowledge files (claims, decisions, core, foundations).
|
||||
|
||||
|
|
@ -125,15 +184,88 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
|
|||
return
|
||||
|
||||
# Refine commit_type from diff content (branch prefix may be too broad)
|
||||
row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
||||
row = conn.execute(
|
||||
"SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, "
|
||||
"domain_verdict, domain_agent FROM prs WHERE number = ?",
|
||||
(pr_number,),
|
||||
).fetchone()
|
||||
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
|
||||
refined_type = refine_commit_type(diff, branch_type)
|
||||
if refined_type != branch_type:
|
||||
conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
|
||||
logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type)
|
||||
|
||||
# Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites.
|
||||
pr_domain = row["domain"] if row else None
|
||||
pr_channel = row["source_channel"] if row else None
|
||||
pr_submitted_by = row["submitted_by"] if row else None
|
||||
|
||||
# ── AUTHOR event (schema v24, double-write) ──
|
||||
# Humans-are-always-author rule: the human in the loop gets author credit.
|
||||
# Precedence: prs.submitted_by (set by extract.py from source proposed_by, or
|
||||
# by discover for human PRs) → git author of first commit → branch-prefix agent.
|
||||
# Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get
|
||||
# author events from branch prefix; extract/ PRs carry submitted_by from the
|
||||
# source's proposed_by field so the human who submitted gets credit via path 1.
|
||||
author_candidate: str | None = None
|
||||
if pr_submitted_by:
|
||||
author_candidate = pr_submitted_by
|
||||
else:
|
||||
# External GitHub PRs: git author is the real submitter.
|
||||
rc_author_head, author_head = await git_fn(
|
||||
"log", f"origin/main..origin/{branch}", "--no-merges",
|
||||
"--format=%an", "-1", timeout=5,
|
||||
)
|
||||
if rc_author_head == 0 and author_head.strip():
|
||||
candidate = author_head.strip().lower()
|
||||
if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
|
||||
"github-actions[bot]", "forgejo-actions"}:
|
||||
author_candidate = candidate
|
||||
# Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc.
|
||||
if not author_candidate and "/" in branch:
|
||||
prefix = branch.split("/", 1)[0]
|
||||
# Exclude pipeline-infrastructure prefixes — those are not authors.
|
||||
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
|
||||
author_candidate = prefix
|
||||
|
||||
if author_candidate:
|
||||
insert_contribution_event(
|
||||
conn, author_candidate, "author", pr_number,
|
||||
claim_path=None, domain=pr_domain, channel=pr_channel,
|
||||
)
|
||||
|
||||
# ── EVALUATOR events (schema v24) ──
|
||||
# Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second
|
||||
# reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when
|
||||
# verdict is 'request_changes' — failed review isn't contribution credit.
|
||||
if row:
|
||||
if row["leo_verdict"] == "approve":
|
||||
insert_contribution_event(
|
||||
conn, "leo", "evaluator", pr_number,
|
||||
claim_path=None, domain=pr_domain, channel=pr_channel,
|
||||
)
|
||||
if row["domain_verdict"] == "approve" and row["domain_agent"]:
|
||||
dagent = row["domain_agent"].strip().lower()
|
||||
if dagent and dagent != "leo": # don't double-credit leo
|
||||
insert_contribution_event(
|
||||
conn, dagent, "evaluator", pr_number,
|
||||
claim_path=None, domain=pr_domain, channel=pr_channel,
|
||||
)
|
||||
|
||||
# Parse Pentagon-Agent trailer from branch commit messages
|
||||
agents_found: set[str] = set()
|
||||
# Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent
|
||||
# challenger/synthesizer credit based on refined commit_type. Pipeline-owned
|
||||
# branches (extract/*, reweave/*, etc.) don't — those are infra, not work.
|
||||
_AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
|
||||
"astra/", "oberon/")
|
||||
is_agent_branch = branch.startswith(_AGENT_BRANCH_PREFIXES)
|
||||
_TRAILER_EVENT_ROLE = {
|
||||
"challenge": "challenger",
|
||||
"enrich": "synthesizer",
|
||||
"research": "synthesizer",
|
||||
"reweave": "synthesizer",
|
||||
}
|
||||
rc, log_output = await git_fn(
|
||||
"log", f"origin/main..origin/{branch}", "--format=%b%n%N",
|
||||
timeout=10,
|
||||
|
|
@ -146,6 +278,14 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
|
|||
upsert_contributor(
|
||||
conn, agent_name, agent_uuid, role, today,
|
||||
)
|
||||
# Event-emit only for agent-owned branches where the trailer's agent
|
||||
# actually did the substantive work (challenger/synthesizer).
|
||||
event_role = _TRAILER_EVENT_ROLE.get(refined_type)
|
||||
if is_agent_branch and event_role:
|
||||
insert_contribution_event(
|
||||
conn, agent_name, event_role, pr_number,
|
||||
claim_path=None, domain=pr_domain, channel=pr_channel,
|
||||
)
|
||||
agents_found.add(agent_name)
|
||||
|
||||
# Parse attribution from NEWLY ADDED knowledge files via the canonical attribution
|
||||
|
|
@ -175,6 +315,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
|
|||
# already matches via the claim file. Widening requires Cory sign-off
|
||||
# since it would change leaderboard accounting (entity-only PRs → CI credit).
|
||||
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
|
||||
author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None
|
||||
for rel_path in files_output.strip().split("\n"):
|
||||
rel_path = rel_path.strip()
|
||||
if not rel_path.endswith(".md"):
|
||||
|
|
@ -192,6 +333,20 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
|
|||
upsert_contributor(
|
||||
conn, handle, entry.get("agent_id"), role, today,
|
||||
)
|
||||
# Event-emit: only 'sourcer' frontmatter entries become
|
||||
# originator events. 'extractor' frontmatter = infrastructure
|
||||
# (the Sonnet extraction agent), no event. challenger/
|
||||
# synthesizer frontmatter is extremely rare at extract time.
|
||||
# Skip originator if same as author — avoids double-credit
|
||||
# when someone submits their own content (self-authored).
|
||||
if role == "sourcer":
|
||||
origin_canonical = normalize_handle(handle, conn=conn)
|
||||
if origin_canonical and origin_canonical != author_canonical:
|
||||
insert_contribution_event(
|
||||
conn, handle, "originator", pr_number,
|
||||
claim_path=rel_path,
|
||||
domain=pr_domain, channel=pr_channel,
|
||||
)
|
||||
|
||||
# Fallback: if no Pentagon-Agent trailer found, try git commit authors
|
||||
_BOT_AUTHORS = frozenset({
|
||||
|
|
|
|||
110
lib/db.py
110
lib/db.py
|
|
@ -9,7 +9,7 @@ from . import config
|
|||
|
||||
logger = logging.getLogger("pipeline.db")
|
||||
|
||||
SCHEMA_VERSION = 23
|
||||
SCHEMA_VERSION = 24
|
||||
|
||||
SCHEMA_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
|
|
@ -163,6 +163,50 @@ CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
|
|||
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
|
||||
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
|
||||
|
||||
-- Event-sourced contributions (schema v24).
|
||||
-- One row per credit-earning event. Idempotent via two partial UNIQUE indexes
|
||||
-- (SQLite treats NULL != NULL in UNIQUE constraints, so a single composite
|
||||
-- UNIQUE with nullable claim_path would allow evaluator-event duplicates).
|
||||
-- Leaderboards are SQL aggregations over this table; contributors becomes a materialized cache.
|
||||
CREATE TABLE IF NOT EXISTS contribution_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
handle TEXT NOT NULL,
|
||||
kind TEXT NOT NULL DEFAULT 'person',
|
||||
-- person | org | agent
|
||||
role TEXT NOT NULL,
|
||||
-- author | originator | challenger | synthesizer | evaluator
|
||||
weight REAL NOT NULL,
|
||||
pr_number INTEGER NOT NULL,
|
||||
claim_path TEXT,
|
||||
-- NULL for PR-level events (e.g. evaluator). Set for per-claim events.
|
||||
domain TEXT,
|
||||
channel TEXT,
|
||||
-- telegram | github | agent | web | unknown
|
||||
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
-- Per-claim events: unique on (handle, role, pr_number, claim_path) when path IS NOT NULL.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
|
||||
handle, role, pr_number, claim_path
|
||||
) WHERE claim_path IS NOT NULL;
|
||||
-- PR-level events (evaluator, author, trailer-based): unique on (handle, role, pr_number) when path IS NULL.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
|
||||
handle, role, pr_number
|
||||
) WHERE claim_path IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
|
||||
|
||||
-- Handle aliasing. @thesensatore → thesensatore. cameron → cameron-s1.
|
||||
-- Writers call resolve_alias(handle) before inserting events or upserting contributors.
|
||||
CREATE TABLE IF NOT EXISTS contributor_aliases (
|
||||
alias TEXT PRIMARY KEY,
|
||||
canonical TEXT NOT NULL,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
|
||||
"""
|
||||
|
||||
|
||||
|
|
@ -641,6 +685,70 @@ def migrate(conn: sqlite3.Connection):
|
|||
conn.commit()
|
||||
logger.info("Migration v23: added idx_prs_source_path for auto-close dedup lookup")
|
||||
|
||||
if current < 24:
|
||||
# Event-sourced contributions table + alias table + kind column on contributors.
|
||||
# Non-breaking: contributors table stays; events are written in addition via
|
||||
# double-write in merge.py. Leaderboards switch to events in Phase B.
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS contribution_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
handle TEXT NOT NULL,
|
||||
kind TEXT NOT NULL DEFAULT 'person',
|
||||
role TEXT NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
pr_number INTEGER NOT NULL,
|
||||
claim_path TEXT,
|
||||
domain TEXT,
|
||||
channel TEXT,
|
||||
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
-- Partial unique indexes handle SQLite's NULL != NULL UNIQUE semantics.
|
||||
-- Per-claim events dedup on 4-tuple; PR-level events dedup on 3-tuple.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
|
||||
handle, role, pr_number, claim_path
|
||||
) WHERE claim_path IS NOT NULL;
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
|
||||
handle, role, pr_number
|
||||
) WHERE claim_path IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS contributor_aliases (
|
||||
alias TEXT PRIMARY KEY,
|
||||
canonical TEXT NOT NULL,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
|
||||
""")
|
||||
try:
|
||||
conn.execute("ALTER TABLE contributors ADD COLUMN kind TEXT DEFAULT 'person'")
|
||||
except sqlite3.OperationalError:
|
||||
pass # column already exists
|
||||
# Seed known aliases. @thesensatore → thesensatore catches the zombie row Argus flagged.
|
||||
# cameron → cameron-s1 reconciles the Leo-flagged missing contributor.
|
||||
conn.executemany(
|
||||
"INSERT OR IGNORE INTO contributor_aliases (alias, canonical) VALUES (?, ?)",
|
||||
[
|
||||
("@thesensatore", "thesensatore"),
|
||||
("cameron", "cameron-s1"),
|
||||
],
|
||||
)
|
||||
# Seed kind='agent' for known Pentagon agents so the events writer picks it up.
|
||||
pentagon_agents = [
|
||||
"rio", "leo", "theseus", "vida", "clay", "astra",
|
||||
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
|
||||
]
|
||||
for agent in pentagon_agents:
|
||||
conn.execute(
|
||||
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
|
||||
(agent,),
|
||||
)
|
||||
conn.commit()
|
||||
logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column")
|
||||
|
||||
if current < SCHEMA_VERSION:
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
|
||||
|
|
|
|||
431
scripts/backfill-events.py
Normal file
431
scripts/backfill-events.py
Normal file
|
|
@ -0,0 +1,431 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.
|
||||
|
||||
For each merged PR:
|
||||
- Derive author from prs.submitted_by → git author → branch prefix
|
||||
- Emit author event (role=author, weight=0.30, claim_path=NULL)
|
||||
- For each claim file under a knowledge prefix, parse frontmatter and emit
|
||||
originator events for sourcer entries that differ from the author
|
||||
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
|
||||
(when domain_verdict='approve' and not Leo)
|
||||
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
|
||||
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type
|
||||
|
||||
Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.
|
||||
|
||||
Usage:
|
||||
python3 scripts/backfill-events.py --dry-run # Count events without writing
|
||||
python3 scripts/backfill-events.py # Apply
|
||||
|
||||
Runs read-only against the git worktree; only writes to pipeline.db.
|
||||
"""
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import Counter
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
|
||||
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
|
||||
|
||||
# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
|
||||
ROLE_WEIGHTS = {
|
||||
"author": 0.30,
|
||||
"challenger": 0.25,
|
||||
"synthesizer": 0.20,
|
||||
"originator": 0.15,
|
||||
"evaluator": 0.05,
|
||||
}
|
||||
|
||||
PENTAGON_AGENTS = frozenset({
|
||||
"rio", "leo", "theseus", "vida", "clay", "astra",
|
||||
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
|
||||
"pipeline",
|
||||
})
|
||||
|
||||
AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
|
||||
"astra/", "oberon/")
|
||||
|
||||
TRAILER_EVENT_ROLE = {
|
||||
"challenge": "challenger",
|
||||
"enrich": "synthesizer",
|
||||
"research": "synthesizer",
|
||||
"reweave": "synthesizer",
|
||||
}
|
||||
|
||||
KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")
|
||||
|
||||
BOT_AUTHORS = frozenset({
|
||||
"teleo", "teleo-bot", "pipeline",
|
||||
"github-actions[bot]", "forgejo-actions",
|
||||
})
|
||||
|
||||
|
||||
def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
|
||||
if not handle:
|
||||
return ""
|
||||
h = handle.strip().lower().lstrip("@")
|
||||
row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
return h
|
||||
|
||||
|
||||
def classify_kind(handle: str) -> str:
|
||||
h = handle.strip().lower().lstrip("@")
|
||||
return "agent" if h in PENTAGON_AGENTS else "person"
|
||||
|
||||
|
||||
def parse_frontmatter(text: str):
|
||||
"""Minimal YAML frontmatter parser using PyYAML when available."""
|
||||
if not text.startswith("---"):
|
||||
return None
|
||||
end = text.find("---", 3)
|
||||
if end == -1:
|
||||
return None
|
||||
raw = text[3:end]
|
||||
try:
|
||||
import yaml
|
||||
fm = yaml.safe_load(raw)
|
||||
return fm if isinstance(fm, dict) else None
|
||||
except ImportError:
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def extract_sourcers_from_file(path: Path) -> list[str]:
|
||||
"""Return the sourcer handles from a claim file's frontmatter.
|
||||
|
||||
Matches three formats:
|
||||
1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
|
||||
2. Bare-key flat: `sourcer: alexastrum`
|
||||
3. Prefix-keyed: `attribution_sourcer: alexastrum`
|
||||
"""
|
||||
try:
|
||||
content = path.read_text(encoding="utf-8")
|
||||
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
|
||||
return []
|
||||
fm = parse_frontmatter(content)
|
||||
if not fm:
|
||||
return []
|
||||
|
||||
handles: list[str] = []
|
||||
|
||||
attr = fm.get("attribution")
|
||||
if isinstance(attr, dict):
|
||||
entries = attr.get("sourcer", [])
|
||||
if isinstance(entries, list):
|
||||
for e in entries:
|
||||
if isinstance(e, dict) and "handle" in e:
|
||||
handles.append(e["handle"])
|
||||
elif isinstance(e, str):
|
||||
handles.append(e)
|
||||
elif isinstance(entries, str):
|
||||
handles.append(entries)
|
||||
return handles
|
||||
|
||||
flat = fm.get("attribution_sourcer")
|
||||
if flat:
|
||||
if isinstance(flat, str):
|
||||
handles.append(flat)
|
||||
elif isinstance(flat, list):
|
||||
handles.extend(v for v in flat if isinstance(v, str))
|
||||
if handles:
|
||||
return handles
|
||||
|
||||
bare = fm.get("sourcer")
|
||||
if bare:
|
||||
if isinstance(bare, str):
|
||||
handles.append(bare)
|
||||
elif isinstance(bare, list):
|
||||
handles.extend(v for v in bare if isinstance(v, str))
|
||||
|
||||
return handles
|
||||
|
||||
|
||||
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
|
||||
|
||||
|
||||
def valid_handle(h: str) -> bool:
|
||||
if not h:
|
||||
return False
|
||||
lower = h.strip().lower().lstrip("@")
|
||||
if lower.endswith("-") or lower.endswith("_"):
|
||||
return False
|
||||
return bool(_HANDLE_RE.match(lower))
|
||||
|
||||
|
||||
def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
|
||||
"""Run a git command, return stdout. Returns empty string on failure."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", *args],
|
||||
cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
|
||||
)
|
||||
return result.stdout
|
||||
except (subprocess.TimeoutExpired, OSError):
|
||||
return ""
|
||||
|
||||
|
||||
def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
|
||||
"""Best-effort: find git author of first non-merge commit on the branch.
|
||||
|
||||
PR branches are usually deleted after merge. We fall back to scanning main
|
||||
commits around merged_at for commits matching the branch slug.
|
||||
"""
|
||||
# Post-merge branches are cleaned up. For the backfill, we accept that this
|
||||
# path rarely yields results and rely on submitted_by + branch prefix.
|
||||
return ""
|
||||
|
||||
|
||||
def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
|
||||
"""Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
|
||||
if pr.get("submitted_by"):
|
||||
cand = pr["submitted_by"].strip().lower().lstrip("@")
|
||||
if cand and cand not in BOT_AUTHORS:
|
||||
return cand
|
||||
branch = pr.get("branch") or ""
|
||||
if "/" in branch:
|
||||
prefix = branch.split("/", 1)[0].lower()
|
||||
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
|
||||
return prefix
|
||||
return None
|
||||
|
||||
|
||||
def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
|
||||
canonical = normalize_handle(conn, handle)
|
||||
if not valid_handle(canonical):
|
||||
return
|
||||
kind = classify_kind(canonical)
|
||||
weight = ROLE_WEIGHTS[role]
|
||||
counts[(role, "attempt")] += 1
|
||||
if dry_run:
|
||||
counts[(role, "would_insert")] += 1
|
||||
return
|
||||
cur = conn.execute(
|
||||
"""INSERT OR IGNORE INTO contribution_events
|
||||
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
|
||||
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
|
||||
)
|
||||
if cur.rowcount > 0:
|
||||
counts[(role, "inserted")] += 1
|
||||
else:
|
||||
counts[(role, "skipped_dup")] += 1
|
||||
|
||||
|
||||
def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
|
||||
"""Best-effort: list added .md files in the PR.
|
||||
|
||||
Uses prs.source_path as a fallback signal (the claim being added). If the
|
||||
branch no longer exists post-merge, this will return []; we accept the loss
|
||||
for historical PRs where the granular per-claim events can't be recovered —
|
||||
PR-level author/evaluator events still land correctly.
|
||||
"""
|
||||
# Post-merge PR branches are deleted from Forgejo so we can't diff them.
|
||||
# For the backfill we use prs.source_path — for extract/* PRs this points to
|
||||
# the source inbox file; we can glob the claim files from the extract branch
|
||||
# commit on main. But main's commits don't track which files a given PR touched.
|
||||
# Accept the loss: backfill emits only PR-level events (author, evaluator,
|
||||
# challenger/synthesizer). Originator events come from parsing claim files
|
||||
# attributed to the branch via description field which lists claim titles.
|
||||
return []
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if not Path(DB_PATH).exists():
|
||||
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
conn = sqlite3.connect(DB_PATH, timeout=30)
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Sanity: contribution_events exists (v24 migration applied)
|
||||
try:
|
||||
conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
# Walk all merged knowledge PRs
|
||||
query = """
|
||||
SELECT number, branch, domain, source_channel, submitted_by,
|
||||
leo_verdict, domain_verdict, domain_agent,
|
||||
commit_type, merged_at
|
||||
FROM prs
|
||||
WHERE status = 'merged'
|
||||
ORDER BY merged_at ASC
|
||||
"""
|
||||
if args.limit:
|
||||
query += f" LIMIT {args.limit}"
|
||||
prs = conn.execute(query).fetchall()
|
||||
print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")
|
||||
|
||||
counts: Counter = Counter()
|
||||
repo = Path(REPO_DIR)
|
||||
|
||||
for pr in prs:
|
||||
pr_number = pr["number"]
|
||||
branch = pr["branch"] or ""
|
||||
domain = pr["domain"]
|
||||
channel = pr["source_channel"]
|
||||
merged_at = pr["merged_at"]
|
||||
|
||||
# Skip pipeline-only branches for author credit (extract/*, reweave/*,
|
||||
# fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But
|
||||
# evaluator credit for Leo/domain_agent still applies.
|
||||
is_pipeline_branch = branch.startswith((
|
||||
"extract/", "reweave/", "fix/", "ingestion/", "epimetheus/",
|
||||
))
|
||||
|
||||
# ── AUTHOR ──
|
||||
# For pipeline branches, submitted_by carries the real author (the
|
||||
# human who submitted the source via Telegram/etc). For agent branches,
|
||||
# the agent is author. For external branches (gh-pr-*), git author is
|
||||
# in submitted_by from the sync-mirror pipeline.
|
||||
author = derive_author(conn, dict(pr))
|
||||
if author:
|
||||
emit(conn, counts, args.dry_run, author, "author", pr_number,
|
||||
None, domain, channel, merged_at)
|
||||
|
||||
# ── EVALUATOR ──
|
||||
if pr["leo_verdict"] == "approve":
|
||||
emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
|
||||
None, domain, channel, merged_at)
|
||||
if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
|
||||
dagent = pr["domain_agent"].strip().lower()
|
||||
if dagent and dagent != "leo":
|
||||
emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
|
||||
None, domain, channel, merged_at)
|
||||
|
||||
# ── CHALLENGER / SYNTHESIZER from branch+commit_type ──
|
||||
# Only fires on agent-owned branches. Pipeline branches aren't creditable
|
||||
# work (they're machine extraction, evaluator already captures the review).
|
||||
if branch.startswith(AGENT_BRANCH_PREFIXES):
|
||||
prefix = branch.split("/", 1)[0].lower()
|
||||
event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
|
||||
if event_role:
|
||||
emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
|
||||
None, domain, channel, merged_at)
|
||||
|
||||
# ── ORIGINATOR per claim ──
|
||||
# Walk claim files currently on main whose content was added in this PR.
|
||||
# We can't diff old branches (deleted post-merge), but for extract PRs
|
||||
# the source_path + description carry claim titles — too lossy to build
|
||||
# per-claim events reliably. Strategy: walk ALL claim files that have a
|
||||
# sourcer in their frontmatter and assign them to the PR whose
|
||||
# source_path matches (via description or filename heuristic).
|
||||
# DEFERRED: per-claim originator events require branch introspection
|
||||
# that fails on deleted branches. Backfill emits PR-level events only.
|
||||
# Forward traffic (post-deploy) gets per-claim originator events via
|
||||
# record_contributor_attribution's added-files walk.
|
||||
|
||||
if not args.dry_run:
|
||||
conn.commit()
|
||||
|
||||
print("\n=== Summary ===")
|
||||
for role in ("author", "originator", "challenger", "synthesizer", "evaluator"):
|
||||
att = counts[(role, "attempt")]
|
||||
if args.dry_run:
|
||||
wi = counts[(role, "would_insert")]
|
||||
print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
|
||||
else:
|
||||
ins = counts[(role, "inserted")]
|
||||
skip = counts[(role, "skipped_dup")]
|
||||
print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
|
||||
|
||||
if not args.dry_run:
|
||||
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
|
||||
print(f"\nTotal contribution_events rows: {total}")
|
||||
|
||||
# ── Per-claim originator pass ──
|
||||
# Separate pass: walk the current knowledge tree, parse sourcer frontmatter,
|
||||
# and attach each claim to the merging PR via a claim_path → pr_number map
|
||||
# built from prs.description (pipe-separated claim titles). Imperfect — some
|
||||
# PRs have NULL description or mismatched titles — but recovers the bulk of
|
||||
# historical originator credit.
|
||||
print("\n=== Claim-level originator pass ===")
|
||||
# Build title → pr_number map from prs.description
|
||||
title_to_pr: dict[str, int] = {}
|
||||
for r in conn.execute(
|
||||
"SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
|
||||
).fetchall():
|
||||
desc = r["description"] or ""
|
||||
for title in desc.split(" | "):
|
||||
title = title.strip()
|
||||
if title:
|
||||
# Last-writer wins. Conflicts are rare (titles unique in practice).
|
||||
title_to_pr[title.lower()] = r["number"]
|
||||
|
||||
claim_counts = Counter()
|
||||
claim_count = 0
|
||||
originator_count = 0
|
||||
for md in sorted(repo.glob("domains/**/*.md")) + \
|
||||
sorted(repo.glob("core/**/*.md")) + \
|
||||
sorted(repo.glob("foundations/**/*.md")) + \
|
||||
sorted(repo.glob("decisions/**/*.md")):
|
||||
rel = str(md.relative_to(repo))
|
||||
# Match via filename stem (with spaces and hyphens) against description titles
|
||||
stem = md.stem
|
||||
# Multiple matching strategies
|
||||
pr_number = title_to_pr.get(stem.lower())
|
||||
if not pr_number:
|
||||
# Hyphenated slug → space variant
|
||||
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
|
||||
if not pr_number:
|
||||
claim_counts["no_pr_match"] += 1
|
||||
continue
|
||||
|
||||
sourcers = extract_sourcers_from_file(md)
|
||||
if not sourcers:
|
||||
claim_counts["no_sourcer"] += 1
|
||||
continue
|
||||
|
||||
claim_count += 1
|
||||
# Look up author for this PR to skip self-credit
|
||||
pr_row = conn.execute(
|
||||
"SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
|
||||
(pr_number,),
|
||||
).fetchone()
|
||||
if not pr_row:
|
||||
continue
|
||||
author = derive_author(conn, dict(pr_row))
|
||||
author_canonical = normalize_handle(conn, author) if author else None
|
||||
|
||||
for src_handle in sourcers:
|
||||
src_canonical = normalize_handle(conn, src_handle)
|
||||
if not valid_handle(src_canonical):
|
||||
claim_counts["invalid_handle"] += 1
|
||||
continue
|
||||
if src_canonical == author_canonical:
|
||||
claim_counts["skip_self"] += 1
|
||||
continue
|
||||
emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
|
||||
rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
|
||||
originator_count += 1
|
||||
|
||||
if not args.dry_run:
|
||||
conn.commit()
|
||||
|
||||
print(f" Claims processed: {claim_count}")
|
||||
print(f" Originator events emitted: {originator_count}")
|
||||
print(f" Breakdown: {dict(claim_counts)}")
|
||||
|
||||
final_origin_attempted = counts[("originator", "attempt")]
|
||||
if args.dry_run:
|
||||
print(f" (dry-run) originator would_insert={counts[('originator', 'would_insert')]}")
|
||||
else:
|
||||
print(f" originator inserted={counts[('originator', 'inserted')]} skipped_dup={counts[('originator', 'skipped_dup')]}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in a new issue