Some checks are pending
CI / lint-and-test (push) Waiting to run
Introduces contribution_events table + non-breaking double-write. Schema
lands today, forward traffic writes events alongside existing count upserts,
backfill script replays history. Phase B will add leaderboard API reading
from events; Phase C switches Argus dashboard over.
## Schema v24 (lib/db.py)
- contribution_events: one row per credit-earning event
(id, handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
Partial UNIQUE indexes handle SQLite's NULL != NULL semantics:
idx_ce_unique_claim on (handle, role, pr_number, claim_path) WHERE claim_path NOT NULL
idx_ce_unique_pr on (handle, role, pr_number) WHERE claim_path IS NULL
PR-level events (evaluator, author, challenger, synthesizer) dedup on 3-tuple.
Per-claim events (originator) dedup on 4-tuple. Idempotent on replay.
- contributor_aliases: canonical handle mapping
Seeded: @thesensatore → thesensatore, cameron → cameron-s1
- contributors.kind TEXT DEFAULT 'person'
Migration seeds 'agent' for known Pentagon agent handles.
## Role model (confirmed by Cory Apr 24)
Weights: author 0.30, challenger 0.25, synthesizer 0.20, originator 0.15, evaluator 0.05
- author: human who submitted the PR (curation + submission work)
- originator: person who authored the underlying content (rewards external creators)
- challenger: agent/person who brought a productive disagreement
- synthesizer: cross-domain work (enrichments, research sessions)
- evaluator: reviewer who approved (Leo + domain agent)
Humans-are-always-author: agents credit is capped at evaluator/synthesizer/
challenger. Pentagon agents classify as kind='agent' and surface in the
agent-view leaderboard, not the default person view.
## Writer (lib/contributor.py)
- New insert_contribution_event(): idempotent INSERT OR IGNORE with alias
normalization + kind classification. Falls back silently on pre-v24 DBs.
- record_contributor_attribution double-writes alongside existing
upsert_contributor calls. Zero risk to current dashboard.
- Author event: emitted once per PR from prs.submitted_by → git author →
agent-branch-prefix.
- Originator events: emitted per claim from frontmatter sourcer, skipping
when sourcer == author (avoids self-credit double-count).
- Evaluator events: Leo (always when leo_verdict='approve') + domain_agent
(when domain_verdict='approve' and not Leo).
- Challenger/Synthesizer: emitted from Pentagon-Agent trailer on
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type.
Pipeline-owned branches (extract/*, reweave/*) get no trailer-based event —
infrastructure work isn't contribution credit.
## Helpers (lib/attribution.py)
- normalize_handle(raw, conn=None): lowercase + strip @ + alias lookup
- classify_kind(handle): returns 'agent' for PENTAGON_AGENTS, else 'person'
Intentionally narrow. Orgs get classified by operator review, not heuristics.
## Backfill (scripts/backfill-events.py)
Replays all merged PRs into events. Idempotent (safe to re-run). Emits:
- PR-level: author, evaluator, challenger, synthesizer
- Per-claim: originator (walks knowledge tree, matches via description titles)
Known limitation: post-merge PR branches are deleted from Forgejo, so we
can't diff them for granular per-claim events. Claim→PR mapping uses
prs.description (pipe-separated titles). Misses some edge cases but
recovers the bulk of historical originator credit. Forward traffic gets
clean per-claim events via the normal record_contributor_attribution path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
431 lines
16 KiB
Python
431 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.
|
|
|
|
For each merged PR:
|
|
- Derive author from prs.submitted_by → git author → branch prefix
|
|
- Emit author event (role=author, weight=0.30, claim_path=NULL)
|
|
- For each claim file under a knowledge prefix, parse frontmatter and emit
|
|
originator events for sourcer entries that differ from the author
|
|
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
|
|
(when domain_verdict='approve' and not Leo)
|
|
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
|
|
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type
|
|
|
|
Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.
|
|
|
|
Usage:
|
|
python3 scripts/backfill-events.py --dry-run # Count events without writing
|
|
python3 scripts/backfill-events.py # Apply
|
|
|
|
Runs read-only against the git worktree; only writes to pipeline.db.
|
|
"""
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
|
|
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
|
|
|
|
# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
|
|
ROLE_WEIGHTS = {
|
|
"author": 0.30,
|
|
"challenger": 0.25,
|
|
"synthesizer": 0.20,
|
|
"originator": 0.15,
|
|
"evaluator": 0.05,
|
|
}
|
|
|
|
PENTAGON_AGENTS = frozenset({
|
|
"rio", "leo", "theseus", "vida", "clay", "astra",
|
|
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
|
|
"pipeline",
|
|
})
|
|
|
|
AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
|
|
"astra/", "oberon/")
|
|
|
|
TRAILER_EVENT_ROLE = {
|
|
"challenge": "challenger",
|
|
"enrich": "synthesizer",
|
|
"research": "synthesizer",
|
|
"reweave": "synthesizer",
|
|
}
|
|
|
|
KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")
|
|
|
|
BOT_AUTHORS = frozenset({
|
|
"teleo", "teleo-bot", "pipeline",
|
|
"github-actions[bot]", "forgejo-actions",
|
|
})
|
|
|
|
|
|
def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
|
|
if not handle:
|
|
return ""
|
|
h = handle.strip().lower().lstrip("@")
|
|
row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
|
|
if row:
|
|
return row[0]
|
|
return h
|
|
|
|
|
|
def classify_kind(handle: str) -> str:
|
|
h = handle.strip().lower().lstrip("@")
|
|
return "agent" if h in PENTAGON_AGENTS else "person"
|
|
|
|
|
|
def parse_frontmatter(text: str):
|
|
"""Minimal YAML frontmatter parser using PyYAML when available."""
|
|
if not text.startswith("---"):
|
|
return None
|
|
end = text.find("---", 3)
|
|
if end == -1:
|
|
return None
|
|
raw = text[3:end]
|
|
try:
|
|
import yaml
|
|
fm = yaml.safe_load(raw)
|
|
return fm if isinstance(fm, dict) else None
|
|
except ImportError:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def extract_sourcers_from_file(path: Path) -> list[str]:
|
|
"""Return the sourcer handles from a claim file's frontmatter.
|
|
|
|
Matches three formats:
|
|
1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
|
|
2. Bare-key flat: `sourcer: alexastrum`
|
|
3. Prefix-keyed: `attribution_sourcer: alexastrum`
|
|
"""
|
|
try:
|
|
content = path.read_text(encoding="utf-8")
|
|
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
|
|
return []
|
|
fm = parse_frontmatter(content)
|
|
if not fm:
|
|
return []
|
|
|
|
handles: list[str] = []
|
|
|
|
attr = fm.get("attribution")
|
|
if isinstance(attr, dict):
|
|
entries = attr.get("sourcer", [])
|
|
if isinstance(entries, list):
|
|
for e in entries:
|
|
if isinstance(e, dict) and "handle" in e:
|
|
handles.append(e["handle"])
|
|
elif isinstance(e, str):
|
|
handles.append(e)
|
|
elif isinstance(entries, str):
|
|
handles.append(entries)
|
|
return handles
|
|
|
|
flat = fm.get("attribution_sourcer")
|
|
if flat:
|
|
if isinstance(flat, str):
|
|
handles.append(flat)
|
|
elif isinstance(flat, list):
|
|
handles.extend(v for v in flat if isinstance(v, str))
|
|
if handles:
|
|
return handles
|
|
|
|
bare = fm.get("sourcer")
|
|
if bare:
|
|
if isinstance(bare, str):
|
|
handles.append(bare)
|
|
elif isinstance(bare, list):
|
|
handles.extend(v for v in bare if isinstance(v, str))
|
|
|
|
return handles
|
|
|
|
|
|
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
|
|
|
|
|
|
def valid_handle(h: str) -> bool:
|
|
if not h:
|
|
return False
|
|
lower = h.strip().lower().lstrip("@")
|
|
if lower.endswith("-") or lower.endswith("_"):
|
|
return False
|
|
return bool(_HANDLE_RE.match(lower))
|
|
|
|
|
|
def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
|
|
"""Run a git command, return stdout. Returns empty string on failure."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", *args],
|
|
cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
|
|
)
|
|
return result.stdout
|
|
except (subprocess.TimeoutExpired, OSError):
|
|
return ""
|
|
|
|
|
|
def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
|
|
"""Best-effort: find git author of first non-merge commit on the branch.
|
|
|
|
PR branches are usually deleted after merge. We fall back to scanning main
|
|
commits around merged_at for commits matching the branch slug.
|
|
"""
|
|
# Post-merge branches are cleaned up. For the backfill, we accept that this
|
|
# path rarely yields results and rely on submitted_by + branch prefix.
|
|
return ""
|
|
|
|
|
|
def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
|
|
"""Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
|
|
if pr.get("submitted_by"):
|
|
cand = pr["submitted_by"].strip().lower().lstrip("@")
|
|
if cand and cand not in BOT_AUTHORS:
|
|
return cand
|
|
branch = pr.get("branch") or ""
|
|
if "/" in branch:
|
|
prefix = branch.split("/", 1)[0].lower()
|
|
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
|
|
return prefix
|
|
return None
|
|
|
|
|
|
def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
|
|
canonical = normalize_handle(conn, handle)
|
|
if not valid_handle(canonical):
|
|
return
|
|
kind = classify_kind(canonical)
|
|
weight = ROLE_WEIGHTS[role]
|
|
counts[(role, "attempt")] += 1
|
|
if dry_run:
|
|
counts[(role, "would_insert")] += 1
|
|
return
|
|
cur = conn.execute(
|
|
"""INSERT OR IGNORE INTO contribution_events
|
|
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
|
|
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
|
|
)
|
|
if cur.rowcount > 0:
|
|
counts[(role, "inserted")] += 1
|
|
else:
|
|
counts[(role, "skipped_dup")] += 1
|
|
|
|
|
|
def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
|
|
"""Best-effort: list added .md files in the PR.
|
|
|
|
Uses prs.source_path as a fallback signal (the claim being added). If the
|
|
branch no longer exists post-merge, this will return []; we accept the loss
|
|
for historical PRs where the granular per-claim events can't be recovered —
|
|
PR-level author/evaluator events still land correctly.
|
|
"""
|
|
# Post-merge PR branches are deleted from Forgejo so we can't diff them.
|
|
# For the backfill we use prs.source_path — for extract/* PRs this points to
|
|
# the source inbox file; we can glob the claim files from the extract branch
|
|
# commit on main. But main's commits don't track which files a given PR touched.
|
|
# Accept the loss: backfill emits only PR-level events (author, evaluator,
|
|
# challenger/synthesizer). Originator events come from parsing claim files
|
|
# attributed to the branch via description field which lists claim titles.
|
|
return []
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--dry-run", action="store_true")
|
|
parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
|
|
args = parser.parse_args()
|
|
|
|
if not Path(DB_PATH).exists():
|
|
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
conn = sqlite3.connect(DB_PATH, timeout=30)
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
# Sanity: contribution_events exists (v24 migration applied)
|
|
try:
|
|
conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
|
|
except sqlite3.OperationalError:
|
|
print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
# Walk all merged knowledge PRs
|
|
query = """
|
|
SELECT number, branch, domain, source_channel, submitted_by,
|
|
leo_verdict, domain_verdict, domain_agent,
|
|
commit_type, merged_at
|
|
FROM prs
|
|
WHERE status = 'merged'
|
|
ORDER BY merged_at ASC
|
|
"""
|
|
if args.limit:
|
|
query += f" LIMIT {args.limit}"
|
|
prs = conn.execute(query).fetchall()
|
|
print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")
|
|
|
|
counts: Counter = Counter()
|
|
repo = Path(REPO_DIR)
|
|
|
|
for pr in prs:
|
|
pr_number = pr["number"]
|
|
branch = pr["branch"] or ""
|
|
domain = pr["domain"]
|
|
channel = pr["source_channel"]
|
|
merged_at = pr["merged_at"]
|
|
|
|
# Skip pipeline-only branches for author credit (extract/*, reweave/*,
|
|
# fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But
|
|
# evaluator credit for Leo/domain_agent still applies.
|
|
is_pipeline_branch = branch.startswith((
|
|
"extract/", "reweave/", "fix/", "ingestion/", "epimetheus/",
|
|
))
|
|
|
|
# ── AUTHOR ──
|
|
# For pipeline branches, submitted_by carries the real author (the
|
|
# human who submitted the source via Telegram/etc). For agent branches,
|
|
# the agent is author. For external branches (gh-pr-*), git author is
|
|
# in submitted_by from the sync-mirror pipeline.
|
|
author = derive_author(conn, dict(pr))
|
|
if author:
|
|
emit(conn, counts, args.dry_run, author, "author", pr_number,
|
|
None, domain, channel, merged_at)
|
|
|
|
# ── EVALUATOR ──
|
|
if pr["leo_verdict"] == "approve":
|
|
emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
|
|
None, domain, channel, merged_at)
|
|
if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
|
|
dagent = pr["domain_agent"].strip().lower()
|
|
if dagent and dagent != "leo":
|
|
emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
|
|
None, domain, channel, merged_at)
|
|
|
|
# ── CHALLENGER / SYNTHESIZER from branch+commit_type ──
|
|
# Only fires on agent-owned branches. Pipeline branches aren't creditable
|
|
# work (they're machine extraction, evaluator already captures the review).
|
|
if branch.startswith(AGENT_BRANCH_PREFIXES):
|
|
prefix = branch.split("/", 1)[0].lower()
|
|
event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
|
|
if event_role:
|
|
emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
|
|
None, domain, channel, merged_at)
|
|
|
|
# ── ORIGINATOR per claim ──
|
|
# Walk claim files currently on main whose content was added in this PR.
|
|
# We can't diff old branches (deleted post-merge), but for extract PRs
|
|
# the source_path + description carry claim titles — too lossy to build
|
|
# per-claim events reliably. Strategy: walk ALL claim files that have a
|
|
# sourcer in their frontmatter and assign them to the PR whose
|
|
# source_path matches (via description or filename heuristic).
|
|
# DEFERRED: per-claim originator events require branch introspection
|
|
# that fails on deleted branches. Backfill emits PR-level events only.
|
|
# Forward traffic (post-deploy) gets per-claim originator events via
|
|
# record_contributor_attribution's added-files walk.
|
|
|
|
if not args.dry_run:
|
|
conn.commit()
|
|
|
|
print("\n=== Summary ===")
|
|
for role in ("author", "originator", "challenger", "synthesizer", "evaluator"):
|
|
att = counts[(role, "attempt")]
|
|
if args.dry_run:
|
|
wi = counts[(role, "would_insert")]
|
|
print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
|
|
else:
|
|
ins = counts[(role, "inserted")]
|
|
skip = counts[(role, "skipped_dup")]
|
|
print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
|
|
|
|
if not args.dry_run:
|
|
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
|
|
print(f"\nTotal contribution_events rows: {total}")
|
|
|
|
# ── Per-claim originator pass ──
|
|
# Separate pass: walk the current knowledge tree, parse sourcer frontmatter,
|
|
# and attach each claim to the merging PR via a claim_path → pr_number map
|
|
# built from prs.description (pipe-separated claim titles). Imperfect — some
|
|
# PRs have NULL description or mismatched titles — but recovers the bulk of
|
|
# historical originator credit.
|
|
print("\n=== Claim-level originator pass ===")
|
|
# Build title → pr_number map from prs.description
|
|
title_to_pr: dict[str, int] = {}
|
|
for r in conn.execute(
|
|
"SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
|
|
).fetchall():
|
|
desc = r["description"] or ""
|
|
for title in desc.split(" | "):
|
|
title = title.strip()
|
|
if title:
|
|
# Last-writer wins. Conflicts are rare (titles unique in practice).
|
|
title_to_pr[title.lower()] = r["number"]
|
|
|
|
claim_counts = Counter()
|
|
claim_count = 0
|
|
originator_count = 0
|
|
for md in sorted(repo.glob("domains/**/*.md")) + \
|
|
sorted(repo.glob("core/**/*.md")) + \
|
|
sorted(repo.glob("foundations/**/*.md")) + \
|
|
sorted(repo.glob("decisions/**/*.md")):
|
|
rel = str(md.relative_to(repo))
|
|
# Match via filename stem (with spaces and hyphens) against description titles
|
|
stem = md.stem
|
|
# Multiple matching strategies
|
|
pr_number = title_to_pr.get(stem.lower())
|
|
if not pr_number:
|
|
# Hyphenated slug → space variant
|
|
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
|
|
if not pr_number:
|
|
claim_counts["no_pr_match"] += 1
|
|
continue
|
|
|
|
sourcers = extract_sourcers_from_file(md)
|
|
if not sourcers:
|
|
claim_counts["no_sourcer"] += 1
|
|
continue
|
|
|
|
claim_count += 1
|
|
# Look up author for this PR to skip self-credit
|
|
pr_row = conn.execute(
|
|
"SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
|
|
(pr_number,),
|
|
).fetchone()
|
|
if not pr_row:
|
|
continue
|
|
author = derive_author(conn, dict(pr_row))
|
|
author_canonical = normalize_handle(conn, author) if author else None
|
|
|
|
for src_handle in sourcers:
|
|
src_canonical = normalize_handle(conn, src_handle)
|
|
if not valid_handle(src_canonical):
|
|
claim_counts["invalid_handle"] += 1
|
|
continue
|
|
if src_canonical == author_canonical:
|
|
claim_counts["skip_self"] += 1
|
|
continue
|
|
emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
|
|
rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
|
|
originator_count += 1
|
|
|
|
if not args.dry_run:
|
|
conn.commit()
|
|
|
|
print(f" Claims processed: {claim_count}")
|
|
print(f" Originator events emitted: {originator_count}")
|
|
print(f" Breakdown: {dict(claim_counts)}")
|
|
|
|
final_origin_attempted = counts[("originator", "attempt")]
|
|
if args.dry_run:
|
|
print(f" (dry-run) originator would_insert={counts[('originator', 'would_insert')]}")
|
|
else:
|
|
print(f" originator inserted={counts[('originator', 'inserted')]} skipped_dup={counts[('originator', 'skipped_dup')]}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|