feat(attribution): Phase A — event-sourced contribution ledger (schema v24)
Introduces a contribution_events table plus a non-breaking double-write. The
schema lands today; forward traffic writes events alongside the existing count
upserts; a backfill script replays history. Phase B will add a leaderboard API
reading from events; Phase C switches the Argus dashboard over.
## Schema v24 (lib/db.py)
- contribution_events: one row per credit-earning event
(id, handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
Partial UNIQUE indexes handle SQLite's NULL != NULL semantics:
idx_ce_unique_claim on (handle, role, pr_number, claim_path) WHERE claim_path IS NOT NULL
idx_ce_unique_pr on (handle, role, pr_number) WHERE claim_path IS NULL
PR-level events (evaluator, author, challenger, synthesizer) dedup on 3-tuple.
Per-claim events (originator) dedup on 4-tuple. Idempotent on replay.
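The dedup behavior is easy to verify in isolation. A minimal sketch against an
in-memory SQLite database, with the table pared down to the columns the indexes
touch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE contribution_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    handle TEXT NOT NULL, role TEXT NOT NULL,
    pr_number INTEGER NOT NULL, claim_path TEXT
);
-- With a plain UNIQUE(handle, role, pr_number, claim_path), SQLite's
-- NULL != NULL rule would let PR-level replays insert duplicate rows.
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
    handle, role, pr_number, claim_path) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
    handle, role, pr_number) WHERE claim_path IS NULL;
""")

def emit(handle, role, pr, claim_path=None):
    # Mirrors the writer's INSERT OR IGNORE + rowcount check.
    cur = conn.execute(
        "INSERT OR IGNORE INTO contribution_events "
        "(handle, role, pr_number, claim_path) VALUES (?, ?, ?, ?)",
        (handle, role, pr, claim_path))
    return cur.rowcount > 0

assert emit("leo", "evaluator", 101) is True
assert emit("leo", "evaluator", 101) is False              # PR-level replay blocked
assert emit("alex", "originator", 101, "domains/a.md") is True
assert emit("alex", "originator", 101, "domains/a.md") is False  # per-claim replay blocked
assert emit("alex", "originator", 101, "domains/b.md") is True   # different claim, new event
```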
- contributor_aliases: canonical handle mapping
Seeded: @thesensatore → thesensatore, cameron → cameron-s1
- contributors.kind TEXT DEFAULT 'person'
Migration seeds 'agent' for known Pentagon agent handles.
## Role model (confirmed by Cory Apr 24)
Weights: author 0.30, challenger 0.25, synthesizer 0.20, originator 0.15, evaluator 0.05
- author: human who submitted the PR (curation + submission work)
- originator: person who authored the underlying content (rewards external creators)
- challenger: agent/person who brought a productive disagreement
- synthesizer: cross-domain work (enrichments, research sessions)
- evaluator: reviewer who approved (Leo + domain agent)
Humans-are-always-author: agent credit is capped at evaluator/synthesizer/
challenger. Pentagon agents are classified as kind='agent' and surface in the
agent-view leaderboard, not the default person view.
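The weights above imply per-PR credit splits; a small sketch of the arithmetic
(the events are hypothetical, with handles borrowed from names that appear
elsewhere in this commit):

```python
ROLE_WEIGHTS = {"author": 0.30, "challenger": 0.25, "synthesizer": 0.20,
                "originator": 0.15, "evaluator": 0.05}

# Hypothetical events for one merged PR: a human author, an external sourcer,
# and Leo plus a domain agent as approving evaluators.
events = [("cameron-s1", "author"), ("alexastrum", "originator"),
          ("leo", "evaluator"), ("astra", "evaluator")]

credit: dict[str, float] = {}
for handle, role in events:
    credit[handle] = round(credit.get(handle, 0.0) + ROLE_WEIGHTS[role], 2)

assert credit["cameron-s1"] == 0.30
assert credit["leo"] == 0.05
```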
## Writer (lib/contributor.py)
- New insert_contribution_event(): idempotent INSERT OR IGNORE with alias
normalization + kind classification. Falls back silently on pre-v24 DBs.
- record_contributor_attribution double-writes alongside existing
upsert_contributor calls. Zero risk to current dashboard.
- Author event: emitted once per PR from prs.submitted_by → git author →
agent-branch-prefix.
- Originator events: emitted per claim from frontmatter sourcer, skipping
when sourcer == author (avoids self-credit double-count).
- Evaluator events: Leo (always when leo_verdict='approve') + domain_agent
(when domain_verdict='approve' and not Leo).
- Challenger/Synthesizer: emitted from Pentagon-Agent trailer on
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type.
Pipeline-owned branches (extract/*, reweave/*) get no trailer-based event —
infrastructure work isn't contribution credit.
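The trailer-based emit can be sketched end to end. The exact
`Pentagon-Agent: <name>` trailer syntax and the parsing regex here are
assumptions; the real parser lives in lib/contributor.py and is not shown in
this diff:

```python
import re

# Assumed trailer syntax; illustrative only.
TRAILER_RE = re.compile(r"^Pentagon-Agent:\s*(\S+)\s*$", re.MULTILINE)

# Mirrors _TRAILER_EVENT_ROLE from the diff below.
TRAILER_EVENT_ROLE = {
    "challenge": "challenger",
    "enrich": "synthesizer",
    "research": "synthesizer",
    "reweave": "synthesizer",
}

AGENT_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/", "astra/", "oberon/")

def trailer_events(commit_body: str, branch: str, commit_type: str) -> list[tuple[str, str]]:
    # Pipeline-owned branches (extract/*, reweave/*) never yield trailer events.
    if not branch.startswith(AGENT_PREFIXES):
        return []
    role = TRAILER_EVENT_ROLE.get(commit_type)
    if role is None:
        return []
    return [(m.group(1).lower(), role) for m in TRAILER_RE.finditer(commit_body)]

body = "Challenge stale cadence claim.\n\nPentagon-Agent: theseus\n"
assert trailer_events(body, "theseus/challenge-cadence", "challenge") == [("theseus", "challenger")]
assert trailer_events(body, "extract/tg-123", "challenge") == []  # infra branch: no event
```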
## Helpers (lib/attribution.py)
- normalize_handle(raw, conn=None): lowercase + strip @ + alias lookup
- classify_kind(handle): returns 'agent' for PENTAGON_AGENTS, else 'person'
Intentionally narrow. Orgs get classified by operator review, not heuristics.
## Backfill (scripts/backfill-events.py)
Replays all merged PRs into events. Idempotent (safe to re-run). Emits:
- PR-level: author, evaluator, challenger, synthesizer
- Per-claim: originator (walks knowledge tree, matches via description titles)
Known limitation: post-merge PR branches are deleted from Forgejo, so we
can't diff them for granular per-claim events. Claim→PR mapping uses
prs.description (pipe-separated titles). Misses some edge cases but
recovers the bulk of historical originator credit. Forward traffic gets
clean per-claim events via the normal record_contributor_attribution path.
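The claim→PR matching described above can be sketched; the pipe separator and
case-insensitive comparison are assumptions based on the note about
prs.description, and the claim paths below are illustrative:

```python
def match_claims_to_pr(description: str, claim_titles: dict[str, str]) -> list[str]:
    """Return claim paths whose title appears in the PR's pipe-separated description.

    `claim_titles` maps claim_path -> title. The " | " join format is an
    assumption from the backfill notes, not a confirmed schema detail.
    """
    pr_titles = {t.strip().lower() for t in description.split("|") if t.strip()}
    return [path for path, title in claim_titles.items()
            if title.strip().lower() in pr_titles]

claims = {
    "domains/launch/raptor-3-thrust.md": "Raptor 3 thrust",
    "domains/launch/ift-4-reentry.md": "IFT-4 reentry",
}
assert match_claims_to_pr("Raptor 3 thrust | Something else", claims) == \
    ["domains/launch/raptor-3-thrust.md"]
```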
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 93917f9fc2, commit 58fa8c5276
4 changed files with 748 additions and 2 deletions
lib/attribution.py

@@ -48,6 +48,58 @@ def _filter_valid_handles(result: dict) -> dict:
     return filtered
 
 
+# ─── Handle normalization + kind classification (schema v24) ──────────────
+
+# Known Pentagon agents. Used to classify contributor kind='agent' so the
+# leaderboard can filter them out of the default person view.
+PENTAGON_AGENTS = frozenset({
+    "rio", "leo", "theseus", "vida", "clay", "astra",
+    "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
+    "pipeline",  # pipeline-owned commits (extract/*, reweave/*, fix/*)
+})
+
+
+def normalize_handle(handle: str, conn=None) -> str:
+    """Canonicalize a handle: lowercase, strip @, resolve alias if conn provided.
+
+    Examples:
+        '@thesensatore' → 'thesensatore'
+        'Cameron' → 'cameron' → 'cameron-s1' (via alias if seeded)
+        'CNBC' → 'cnbc'
+
+    Always lowercases and strips @ prefix. Alias resolution requires a conn
+    argument (not always available at parse time; merge-time writer passes it).
+    """
+    if not handle:
+        return ""
+    h = handle.strip().lower().lstrip("@")
+    if conn is None:
+        return h
+    try:
+        row = conn.execute(
+            "SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,),
+        ).fetchone()
+        if row:
+            return row["canonical"] if isinstance(row, dict) or hasattr(row, "keys") else row[0]
+    except Exception:
+        # Alias table might not exist yet on pre-v24 DBs — degrade gracefully.
+        logger.debug("normalize_handle: alias lookup failed for %r", h, exc_info=True)
+    return h
+
+
+def classify_kind(handle: str) -> str:
+    """Return 'agent' for known Pentagon agents, 'person' otherwise.
+
+    The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review,
+    not inferred here. Keeping heuristics narrow: we know our own agents;
+    everything else defaults to person until explicitly classified.
+    """
+    h = handle.strip().lower().lstrip("@")
+    if h in PENTAGON_AGENTS:
+        return "agent"
+    return "person"
+
+
 # ─── Parse attribution from claim content ──────────────────────────────────
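The conn=None path of the helpers above can be exercised standalone. This
sketch re-implements just that path (no alias-table lookup) to show the
expected normalization:

```python
PENTAGON_AGENTS = frozenset({
    "rio", "leo", "theseus", "vida", "clay", "astra",
    "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
    "pipeline",
})

# Re-derived from the diff above: normalize_handle with conn=None,
# plus classify_kind.
def normalize_handle(handle: str) -> str:
    if not handle:
        return ""
    return handle.strip().lower().lstrip("@")

def classify_kind(handle: str) -> str:
    return "agent" if normalize_handle(handle) in PENTAGON_AGENTS else "person"

assert normalize_handle("@thesensatore") == "thesensatore"
assert normalize_handle("CNBC") == "cnbc"
assert classify_kind("Theseus") == "agent"
assert classify_kind("cnbc") == "person"
```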
lib/contributor.py

@@ -5,6 +5,7 @@ Extracted from merge.py (Phase 5 decomposition). Functions:
 - refine_commit_type: extract → challenge/enrich refinement from diff content
 - record_contributor_attribution: parse trailers + frontmatter, upsert contributors
 - upsert_contributor: insert/update contributor record with role counts
+- insert_contribution_event: event-sourced credit log (schema v24)
 - recalculate_tier: tier promotion based on config rules
 """
@@ -13,11 +14,69 @@ import logging
 import re
 
 from . import config, db
+from .attribution import classify_kind, normalize_handle
 from .forgejo import get_pr_diff
 
 logger = logging.getLogger("pipeline.contributor")
 
+
+# ─── Event schema (v24) ───────────────────────────────────────────────────
+
+# Role → CI weight, per Cory's confirmed schema (Apr 24 conversation).
+# Humans-are-always-author rule: agents never accumulate author credit;
+# evaluator (0.05) is the only agent-facing role. Internal agents still earn
+# author/challenger/synthesizer on their own autonomous research PRs but
+# surface in the kind='agent' leaderboard, not the default person view.
+ROLE_WEIGHTS = {
+    "author": 0.30,
+    "challenger": 0.25,
+    "synthesizer": 0.20,
+    "originator": 0.15,
+    "evaluator": 0.05,
+}
+
+
+def insert_contribution_event(
+    conn,
+    handle: str,
+    role: str,
+    pr_number: int,
+    *,
+    claim_path: str | None = None,
+    domain: str | None = None,
+    channel: str | None = None,
+    timestamp: str | None = None,
+) -> bool:
+    """Emit a contribution_events row. Idempotent via UNIQUE constraint.
+
+    Returns True if the event was inserted, False if the constraint blocked it
+    (same handle/role/pr/claim_path combo already recorded — safe to replay).
+
+    Canonicalizes handle via alias table. Classifies kind from handle.
+    Falls back silently if contribution_events table doesn't exist yet (pre-v24).
+    """
+    if role not in ROLE_WEIGHTS:
+        logger.warning("insert_contribution_event: unknown role %r", role)
+        return False
+    weight = ROLE_WEIGHTS[role]
+    canonical = normalize_handle(handle, conn=conn)
+    if not canonical:
+        return False
+    kind = classify_kind(canonical)
+    try:
+        cur = conn.execute(
+            """INSERT OR IGNORE INTO contribution_events
+               (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
+               VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
+            (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
+        )
+        return cur.rowcount > 0
+    except Exception:
+        logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r",
+                     pr_number, canonical, role, exc_info=True)
+        return False
+
+
 def is_knowledge_pr(diff: str) -> bool:
     """Check if a PR touches knowledge files (claims, decisions, core, foundations).
@@ -125,15 +184,88 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
         return
 
     # Refine commit_type from diff content (branch prefix may be too broad)
-    row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone()
+    row = conn.execute(
+        "SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, "
+        "domain_verdict, domain_agent FROM prs WHERE number = ?",
+        (pr_number,),
+    ).fetchone()
     branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
     refined_type = refine_commit_type(diff, branch_type)
     if refined_type != branch_type:
         conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
         logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type)
 
+    # Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites.
+    pr_domain = row["domain"] if row else None
+    pr_channel = row["source_channel"] if row else None
+    pr_submitted_by = row["submitted_by"] if row else None
+
+    # ── AUTHOR event (schema v24, double-write) ──
+    # Humans-are-always-author rule: the human in the loop gets author credit.
+    # Precedence: prs.submitted_by (set by extract.py from source proposed_by, or
+    # by discover for human PRs) → git author of first commit → branch-prefix agent.
+    # Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get
+    # author events from branch prefix; extract/ PRs carry submitted_by from the
+    # source's proposed_by field so the human who submitted gets credit via path 1.
+    author_candidate: str | None = None
+    if pr_submitted_by:
+        author_candidate = pr_submitted_by
+    else:
+        # External GitHub PRs: git author is the real submitter.
+        rc_author_head, author_head = await git_fn(
+            "log", f"origin/main..origin/{branch}", "--no-merges",
+            "--format=%an", "-1", timeout=5,
+        )
+        if rc_author_head == 0 and author_head.strip():
+            candidate = author_head.strip().lower()
+            if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
+                                               "github-actions[bot]", "forgejo-actions"}:
+                author_candidate = candidate
+    # Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc.
+    if not author_candidate and "/" in branch:
+        prefix = branch.split("/", 1)[0]
+        # Exclude pipeline-infrastructure prefixes — those are not authors.
+        if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
+            author_candidate = prefix
+
+    if author_candidate:
+        insert_contribution_event(
+            conn, author_candidate, "author", pr_number,
+            claim_path=None, domain=pr_domain, channel=pr_channel,
+        )
+
+    # ── EVALUATOR events (schema v24) ──
+    # Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second
+    # reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when
+    # verdict is 'request_changes' — failed review isn't contribution credit.
+    if row:
+        if row["leo_verdict"] == "approve":
+            insert_contribution_event(
+                conn, "leo", "evaluator", pr_number,
+                claim_path=None, domain=pr_domain, channel=pr_channel,
+            )
+        if row["domain_verdict"] == "approve" and row["domain_agent"]:
+            dagent = row["domain_agent"].strip().lower()
+            if dagent and dagent != "leo":  # don't double-credit leo
+                insert_contribution_event(
+                    conn, dagent, "evaluator", pr_number,
+                    claim_path=None, domain=pr_domain, channel=pr_channel,
+                )
+
     # Parse Pentagon-Agent trailer from branch commit messages
     agents_found: set[str] = set()
+    # Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent
+    # challenger/synthesizer credit based on refined commit_type. Pipeline-owned
+    # branches (extract/*, reweave/*, etc.) don't — those are infra, not work.
+    _AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
+                              "astra/", "oberon/")
+    is_agent_branch = branch.startswith(_AGENT_BRANCH_PREFIXES)
+    _TRAILER_EVENT_ROLE = {
+        "challenge": "challenger",
+        "enrich": "synthesizer",
+        "research": "synthesizer",
+        "reweave": "synthesizer",
+    }
     rc, log_output = await git_fn(
         "log", f"origin/main..origin/{branch}", "--format=%b%n%N",
         timeout=10,
@@ -146,6 +278,14 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
             upsert_contributor(
                 conn, agent_name, agent_uuid, role, today,
             )
+            # Event-emit only for agent-owned branches where the trailer's agent
+            # actually did the substantive work (challenger/synthesizer).
+            event_role = _TRAILER_EVENT_ROLE.get(refined_type)
+            if is_agent_branch and event_role:
+                insert_contribution_event(
+                    conn, agent_name, event_role, pr_number,
+                    claim_path=None, domain=pr_domain, channel=pr_channel,
+                )
             agents_found.add(agent_name)
 
     # Parse attribution from NEWLY ADDED knowledge files via the canonical attribution
@@ -175,6 +315,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
     # already matches via the claim file. Widening requires Cory sign-off
     # since it would change leaderboard accounting (entity-only PRs → CI credit).
     knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
+    author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None
     for rel_path in files_output.strip().split("\n"):
         rel_path = rel_path.strip()
         if not rel_path.endswith(".md"):
@@ -192,6 +333,20 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
                 upsert_contributor(
                     conn, handle, entry.get("agent_id"), role, today,
                 )
+                # Event-emit: only 'sourcer' frontmatter entries become
+                # originator events. 'extractor' frontmatter = infrastructure
+                # (the Sonnet extraction agent), no event. challenger/
+                # synthesizer frontmatter is extremely rare at extract time.
+                # Skip originator if same as author — avoids double-credit
+                # when someone submits their own content (self-authored).
+                if role == "sourcer":
+                    origin_canonical = normalize_handle(handle, conn=conn)
+                    if origin_canonical and origin_canonical != author_canonical:
+                        insert_contribution_event(
+                            conn, handle, "originator", pr_number,
+                            claim_path=rel_path,
+                            domain=pr_domain, channel=pr_channel,
+                        )
 
     # Fallback: if no Pentagon-Agent trailer found, try git commit authors
     _BOT_AUTHORS = frozenset({
lib/db.py (110 changed lines)
@@ -9,7 +9,7 @@ from . import config
 
 logger = logging.getLogger("pipeline.db")
 
-SCHEMA_VERSION = 23
+SCHEMA_VERSION = 24
 
 SCHEMA_SQL = """
 CREATE TABLE IF NOT EXISTS schema_version (
@@ -163,6 +163,50 @@ CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
 CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
 CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
 CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
 
+-- Event-sourced contributions (schema v24).
+-- One row per credit-earning event. Idempotent via two partial UNIQUE indexes
+-- (SQLite treats NULL != NULL in UNIQUE constraints, so a single composite
+-- UNIQUE with nullable claim_path would allow evaluator-event duplicates).
+-- Leaderboards are SQL aggregations over this table; contributors becomes a materialized cache.
+CREATE TABLE IF NOT EXISTS contribution_events (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    handle TEXT NOT NULL,
+    kind TEXT NOT NULL DEFAULT 'person',  -- person | org | agent
+    role TEXT NOT NULL,                   -- author | originator | challenger | synthesizer | evaluator
+    weight REAL NOT NULL,
+    pr_number INTEGER NOT NULL,
+    claim_path TEXT,                      -- NULL for PR-level events (e.g. evaluator). Set for per-claim events.
+    domain TEXT,
+    channel TEXT,                         -- telegram | github | agent | web | unknown
+    timestamp TEXT NOT NULL DEFAULT (datetime('now'))
+);
+-- Per-claim events: unique on (handle, role, pr_number, claim_path) when path IS NOT NULL.
+CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
+    handle, role, pr_number, claim_path
+) WHERE claim_path IS NOT NULL;
+-- PR-level events (evaluator, author, trailer-based): unique on (handle, role, pr_number) when path IS NULL.
+CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
+    handle, role, pr_number
+) WHERE claim_path IS NULL;
+CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
+CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
+CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
+CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
+CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
+
+-- Handle aliasing. @thesensatore → thesensatore. cameron → cameron-s1.
+-- Writers call resolve_alias(handle) before inserting events or upserting contributors.
+CREATE TABLE IF NOT EXISTS contributor_aliases (
+    alias TEXT PRIMARY KEY,
+    canonical TEXT NOT NULL,
+    created_at TEXT DEFAULT (datetime('now'))
+);
+CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
 """
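The DDL comment notes that leaderboards become SQL aggregations over
contribution_events. A sketch of what a Phase B person-view query might look
like; Phase B is not part of this commit, so the query shape is an assumption:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE contribution_events (
    handle TEXT NOT NULL, kind TEXT NOT NULL DEFAULT 'person',
    role TEXT NOT NULL, weight REAL NOT NULL, pr_number INTEGER NOT NULL)""")
conn.executemany(
    "INSERT INTO contribution_events VALUES (?, ?, ?, ?, ?)",
    [("cameron-s1", "person", "author", 0.30, 101),
     ("leo", "agent", "evaluator", 0.05, 101),
     ("cameron-s1", "person", "author", 0.30, 102)],
)
# Default person-view leaderboard: sum of weights, agents filtered out
# via the kind column seeded by the v24 migration.
rows = conn.execute("""
    SELECT handle, ROUND(SUM(weight), 2) AS credit
    FROM contribution_events
    WHERE kind = 'person'
    GROUP BY handle
    ORDER BY credit DESC
""").fetchall()
assert rows == [("cameron-s1", 0.6)]
```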
@@ -641,6 +685,70 @@ def migrate(conn: sqlite3.Connection):
         conn.commit()
         logger.info("Migration v23: added idx_prs_source_path for auto-close dedup lookup")
 
+    if current < 24:
+        # Event-sourced contributions table + alias table + kind column on contributors.
+        # Non-breaking: contributors table stays; events are written in addition via
+        # double-write in merge.py. Leaderboards switch to events in Phase B.
+        conn.executescript("""
+            CREATE TABLE IF NOT EXISTS contribution_events (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                handle TEXT NOT NULL,
+                kind TEXT NOT NULL DEFAULT 'person',
+                role TEXT NOT NULL,
+                weight REAL NOT NULL,
+                pr_number INTEGER NOT NULL,
+                claim_path TEXT,
+                domain TEXT,
+                channel TEXT,
+                timestamp TEXT NOT NULL DEFAULT (datetime('now'))
+            );
+            -- Partial unique indexes handle SQLite's NULL != NULL UNIQUE semantics.
+            -- Per-claim events dedup on 4-tuple; PR-level events dedup on 3-tuple.
+            CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
+                handle, role, pr_number, claim_path
+            ) WHERE claim_path IS NOT NULL;
+            CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
+                handle, role, pr_number
+            ) WHERE claim_path IS NULL;
+            CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
+            CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
+            CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
+            CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
+            CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
+
+            CREATE TABLE IF NOT EXISTS contributor_aliases (
+                alias TEXT PRIMARY KEY,
+                canonical TEXT NOT NULL,
+                created_at TEXT DEFAULT (datetime('now'))
+            );
+            CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
+        """)
+        try:
+            conn.execute("ALTER TABLE contributors ADD COLUMN kind TEXT DEFAULT 'person'")
+        except sqlite3.OperationalError:
+            pass  # column already exists
+        # Seed known aliases. @thesensatore → thesensatore catches the zombie row Argus flagged.
+        # cameron → cameron-s1 reconciles the Leo-flagged missing contributor.
+        conn.executemany(
+            "INSERT OR IGNORE INTO contributor_aliases (alias, canonical) VALUES (?, ?)",
+            [
+                ("@thesensatore", "thesensatore"),
+                ("cameron", "cameron-s1"),
+            ],
+        )
+        # Seed kind='agent' for known Pentagon agents so the events writer picks it up.
+        pentagon_agents = [
+            "rio", "leo", "theseus", "vida", "clay", "astra",
+            "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
+        ]
+        for agent in pentagon_agents:
+            conn.execute(
+                "UPDATE contributors SET kind = 'agent' WHERE handle = ?",
+                (agent,),
+            )
+        conn.commit()
+        logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column")
+
     if current < SCHEMA_VERSION:
         conn.execute(
             "INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
431
scripts/backfill-events.py
Normal file
431
scripts/backfill-events.py
Normal file
|
|
@ -0,0 +1,431 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.
|
||||||
|
|
||||||
|
For each merged PR:
|
||||||
|
- Derive author from prs.submitted_by → git author → branch prefix
|
||||||
|
- Emit author event (role=author, weight=0.30, claim_path=NULL)
|
||||||
|
- For each claim file under a knowledge prefix, parse frontmatter and emit
|
||||||
|
originator events for sourcer entries that differ from the author
|
||||||
|
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
|
||||||
|
(when domain_verdict='approve' and not Leo)
|
||||||
|
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
|
||||||
|
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type
|
||||||
|
|
||||||
|
Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 scripts/backfill-events.py --dry-run # Count events without writing
|
||||||
|
python3 scripts/backfill-events.py # Apply
|
||||||
|
|
||||||
|
Runs read-only against the git worktree; only writes to pipeline.db.
|
||||||
|
"""
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from collections import Counter
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
|
||||||
|
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
|
||||||
|
|
||||||
|
# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
|
||||||
|
ROLE_WEIGHTS = {
|
||||||
|
"author": 0.30,
|
||||||
|
"challenger": 0.25,
|
||||||
|
"synthesizer": 0.20,
|
||||||
|
"originator": 0.15,
|
||||||
|
"evaluator": 0.05,
|
||||||
|
}
|
||||||
|
|
||||||
|
PENTAGON_AGENTS = frozenset({
|
||||||
|
"rio", "leo", "theseus", "vida", "clay", "astra",
|
||||||
|
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
|
||||||
|
"pipeline",
|
||||||
|
})
|
||||||
|
|
||||||
|
AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
|
||||||
|
"astra/", "oberon/")
|
||||||
|
|
||||||
|
TRAILER_EVENT_ROLE = {
|
||||||
|
"challenge": "challenger",
|
||||||
|
"enrich": "synthesizer",
|
||||||
|
"research": "synthesizer",
|
||||||
|
"reweave": "synthesizer",
|
||||||
|
}
|
||||||
|
|
||||||
|
KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")
|
||||||
|
|
||||||
|
BOT_AUTHORS = frozenset({
|
||||||
|
"teleo", "teleo-bot", "pipeline",
|
||||||
|
"github-actions[bot]", "forgejo-actions",
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
|
||||||
|
if not handle:
|
||||||
|
return ""
|
||||||
|
h = handle.strip().lower().lstrip("@")
|
||||||
|
row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
|
||||||
|
if row:
|
||||||
|
return row[0]
|
||||||
|
return h
|
||||||
|
|
||||||
|
|
||||||
|
def classify_kind(handle: str) -> str:
|
||||||
|
h = handle.strip().lower().lstrip("@")
|
||||||
|
return "agent" if h in PENTAGON_AGENTS else "person"
|
||||||
|
|
||||||
|
|
||||||
|
def parse_frontmatter(text: str):
|
||||||
|
"""Minimal YAML frontmatter parser using PyYAML when available."""
|
||||||
|
if not text.startswith("---"):
|
||||||
|
return None
|
||||||
|
end = text.find("---", 3)
|
||||||
|
if end == -1:
|
||||||
|
return None
|
||||||
|
raw = text[3:end]
|
||||||
|
try:
|
||||||
|
import yaml
|
||||||
|
fm = yaml.safe_load(raw)
|
||||||
|
return fm if isinstance(fm, dict) else None
|
||||||
|
except ImportError:
|
||||||
|
return None
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def extract_sourcers_from_file(path: Path) -> list[str]:
    """Return the sourcer handles from a claim file's frontmatter.

    Matches three formats, in order of precedence:
    1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
    2. Prefix-keyed: `attribution_sourcer: alexastrum`
    3. Bare-key flat: `sourcer: alexastrum`
    """
    try:
        content = path.read_text(encoding="utf-8")
    except (FileNotFoundError, PermissionError, UnicodeDecodeError):
        return []
    fm = parse_frontmatter(content)
    if not fm:
        return []

    handles: list[str] = []

    attr = fm.get("attribution")
    if isinstance(attr, dict):
        entries = attr.get("sourcer", [])
        if isinstance(entries, list):
            for e in entries:
                if isinstance(e, dict) and "handle" in e:
                    handles.append(e["handle"])
                elif isinstance(e, str):
                    handles.append(e)
        elif isinstance(entries, str):
            handles.append(entries)
        # Fall through to the flat formats when the block yields nothing.
        if handles:
            return handles

    flat = fm.get("attribution_sourcer")
    if flat:
        if isinstance(flat, str):
            handles.append(flat)
        elif isinstance(flat, list):
            handles.extend(v for v in flat if isinstance(v, str))
    if handles:
        return handles

    bare = fm.get("sourcer")
    if bare:
        if isinstance(bare, str):
            handles.append(bare)
        elif isinstance(bare, list):
            handles.extend(v for v in bare if isinstance(v, str))

    return handles


_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")


def valid_handle(h: str) -> bool:
    """True if the normalized handle looks like a legitimate account name."""
    if not h:
        return False
    lower = h.strip().lower().lstrip("@")
    if lower.endswith(("-", "_")):
        return False
    return bool(_HANDLE_RE.match(lower))


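A quick sketch of the handle rules above (GitHub-style: 1-39 chars, lowercase alphanumeric start, no trailing separator). `demo_valid` is an illustrative standalone copy of the validation logic:

```python
import re

HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")

def demo_valid(h: str) -> bool:
    if not h:
        return False
    # Normalize the same way the writer does: trim, lowercase, strip "@".
    lower = h.strip().lower().lstrip("@")
    if lower.endswith(("-", "_")):
        return False
    return bool(HANDLE_RE.match(lower))

print(demo_valid("@Cameron-S1"), demo_valid("bad-"), demo_valid("-lead"))
# → True False False
```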
def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
    """Run a git command and return stdout; returns an empty string on failure."""
    try:
        result = subprocess.run(
            ["git", *args],
            cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
        )
        return result.stdout
    except (subprocess.TimeoutExpired, OSError):
        return ""


def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
    """Best-effort: find the git author of the first non-merge commit on the branch.

    PR branches are usually deleted after merge, so this path rarely yields
    results; callers fall back to submitted_by plus the branch prefix.
    """
    # Post-merge branches are cleaned up. For the backfill we accept that this
    # lookup almost never succeeds and rely on the fallbacks above.
    return ""


def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
    """Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
    if pr.get("submitted_by"):
        cand = pr["submitted_by"].strip().lower().lstrip("@")
        if cand and cand not in BOT_AUTHORS:
            return cand
    branch = pr.get("branch") or ""
    if "/" in branch:
        prefix = branch.split("/", 1)[0].lower()
        if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
            return prefix
    return None


def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
    """Normalize, validate, and idempotently insert one contribution event."""
    canonical = normalize_handle(conn, handle)
    if not valid_handle(canonical):
        return
    kind = classify_kind(canonical)
    weight = ROLE_WEIGHTS[role]
    counts[(role, "attempt")] += 1
    if dry_run:
        counts[(role, "would_insert")] += 1
        return
    cur = conn.execute(
        """INSERT OR IGNORE INTO contribution_events
           (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
        (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
    )
    if cur.rowcount > 0:
        counts[(role, "inserted")] += 1
    else:
        counts[(role, "skipped_dup")] += 1


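A minimal sketch of the v24 dedup behavior that makes replay idempotent: partial UNIQUE indexes sidestep SQLite's NULL != NULL semantics, so `INSERT OR IGNORE` collapses duplicates for both PR-level (NULL claim_path) and per-claim events. Index names mirror the real schema; the column set is simplified:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE contribution_events (
    id INTEGER PRIMARY KEY, handle TEXT, role TEXT,
    pr_number INTEGER, claim_path TEXT)""")
# Per-claim events dedup on the 4-tuple...
conn.execute("""CREATE UNIQUE INDEX idx_ce_unique_claim
    ON contribution_events(handle, role, pr_number, claim_path)
    WHERE claim_path IS NOT NULL""")
# ...PR-level events (claim_path IS NULL) dedup on the 3-tuple.
conn.execute("""CREATE UNIQUE INDEX idx_ce_unique_pr
    ON contribution_events(handle, role, pr_number)
    WHERE claim_path IS NULL""")

row = ("leo", "evaluator", 42, None)
for _ in range(2):  # replaying twice inserts once
    conn.execute(
        "INSERT OR IGNORE INTO contribution_events"
        " (handle, role, pr_number, claim_path) VALUES (?, ?, ?, ?)", row)
print(conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0])  # 1
```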
def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
    """Best-effort: list the .md files added in the PR.

    Uses prs.source_path as a fallback signal (the claim being added). If the
    branch no longer exists post-merge, this returns []; we accept the loss
    for historical PRs where granular per-claim events can't be recovered.
    PR-level author/evaluator events still land correctly.
    """
    # Post-merge PR branches are deleted from Forgejo, so we can't diff them.
    # For the backfill we use prs.source_path: for extract/* PRs it points to
    # the source inbox file, and we could glob the claim files from the extract
    # branch commit on main, but main's commits don't track which files a given
    # PR touched. Accept the loss: the backfill emits only PR-level events
    # (author, evaluator, challenger/synthesizer). Originator events come from
    # parsing claim files matched to the PR via the description field, which
    # lists claim titles.
    return []


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
    args = parser.parse_args()

    if not Path(DB_PATH).exists():
        print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
        sys.exit(1)

    conn = sqlite3.connect(DB_PATH, timeout=30)
    conn.row_factory = sqlite3.Row

    # Sanity: contribution_events exists (v24 migration applied)
    try:
        conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
    except sqlite3.OperationalError:
        print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
        sys.exit(2)

    # Walk all merged knowledge PRs
    query = """
        SELECT number, branch, domain, source_channel, submitted_by,
               leo_verdict, domain_verdict, domain_agent,
               commit_type, merged_at
        FROM prs
        WHERE status = 'merged'
        ORDER BY merged_at ASC
    """
    if args.limit:
        query += f" LIMIT {args.limit}"
    prs = conn.execute(query).fetchall()
    print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")

    counts: Counter = Counter()
    repo = Path(REPO_DIR)

    for pr in prs:
        pr_number = pr["number"]
        branch = pr["branch"] or ""
        domain = pr["domain"]
        channel = pr["source_channel"]
        merged_at = pr["merged_at"]

        # ── AUTHOR ──
        # For pipeline branches (extract/*, reweave/*, fix/*, ingestion/*,
        # epimetheus/*), submitted_by carries the real author: the human who
        # submitted the source via Telegram/etc. For agent branches, the agent
        # is the author. For external branches (gh-pr-*), the git author is in
        # submitted_by from the sync-mirror pipeline.
        author = derive_author(conn, dict(pr))
        if author:
            emit(conn, counts, args.dry_run, author, "author", pr_number,
                 None, domain, channel, merged_at)

        # ── EVALUATOR ──
        if pr["leo_verdict"] == "approve":
            emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
                 None, domain, channel, merged_at)
        if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
            dagent = pr["domain_agent"].strip().lower()
            if dagent and dagent != "leo":
                emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
                     None, domain, channel, merged_at)

        # ── CHALLENGER / SYNTHESIZER from branch + commit_type ──
        # Only fires on agent-owned branches. Pipeline branches aren't
        # creditable work (they're machine extraction; evaluator credit
        # already captures the review).
        if branch.startswith(AGENT_BRANCH_PREFIXES):
            prefix = branch.split("/", 1)[0].lower()
            event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
            if event_role:
                emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
                     None, domain, channel, merged_at)

        # ── ORIGINATOR per claim ──
        # DEFERRED in this loop: per-claim originator events would require
        # diffing branches that are deleted post-merge. The backfill emits
        # PR-level events here and recovers historical originator credit in
        # the claim-level pass below. Forward traffic (post-deploy) gets
        # per-claim originator events via record_contributor_attribution's
        # added-files walk.

    if not args.dry_run:
        conn.commit()

    print("\n=== Summary ===")
    for role in ("author", "originator", "challenger", "synthesizer", "evaluator"):
        att = counts[(role, "attempt")]
        if args.dry_run:
            wi = counts[(role, "would_insert")]
            print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
        else:
            ins = counts[(role, "inserted")]
            skip = counts[(role, "skipped_dup")]
            print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")

    if not args.dry_run:
        total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
        print(f"\nTotal contribution_events rows: {total}")

    # ── Per-claim originator pass ──
    # Separate pass: walk the current knowledge tree, parse sourcer
    # frontmatter, and attach each claim to its merging PR via a title →
    # pr_number map built from prs.description (pipe-separated claim titles).
    # Imperfect (some PRs have NULL descriptions or mismatched titles), but it
    # recovers the bulk of historical originator credit.
    print("\n=== Claim-level originator pass ===")
    # Build a title → pr_number map from prs.description
    title_to_pr: dict[str, int] = {}
    for r in conn.execute(
        "SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
    ).fetchall():
        desc = r["description"] or ""
        for title in desc.split(" | "):
            title = title.strip()
            if title:
                # Last writer wins. Conflicts are rare (titles unique in practice).
                title_to_pr[title.lower()] = r["number"]

    claim_counts: Counter = Counter()
    claim_count = 0
    originator_count = 0
    for md in sorted(repo.glob("domains/**/*.md")) + \
              sorted(repo.glob("core/**/*.md")) + \
              sorted(repo.glob("foundations/**/*.md")) + \
              sorted(repo.glob("decisions/**/*.md")):
        rel = str(md.relative_to(repo))
        # Match the filename stem against description titles: raw stem first,
        # then the hyphenated-slug → spaces variant.
        stem = md.stem
        pr_number = title_to_pr.get(stem.lower())
        if not pr_number:
            pr_number = title_to_pr.get(stem.replace("-", " ").lower())
        if not pr_number:
            claim_counts["no_pr_match"] += 1
            continue

        sourcers = extract_sourcers_from_file(md)
        if not sourcers:
            claim_counts["no_sourcer"] += 1
            continue

        claim_count += 1
        # Look up the PR's author to skip self-credit
        pr_row = conn.execute(
            "SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
            (pr_number,),
        ).fetchone()
        if not pr_row:
            continue
        author = derive_author(conn, dict(pr_row))
        author_canonical = normalize_handle(conn, author) if author else None

        for src_handle in sourcers:
            src_canonical = normalize_handle(conn, src_handle)
            if not valid_handle(src_canonical):
                claim_counts["invalid_handle"] += 1
                continue
            if src_canonical == author_canonical:
                claim_counts["skip_self"] += 1
                continue
            emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
                 rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
            originator_count += 1

    if not args.dry_run:
        conn.commit()

    print(f" Claims processed: {claim_count}")
    print(f" Originator events emitted: {originator_count}")
    print(f" Breakdown: {dict(claim_counts)}")

    origin_attempted = counts[("originator", "attempt")]
    if args.dry_run:
        print(f" (dry-run) originator attempted={origin_attempted} would_insert={counts[('originator', 'would_insert')]}")
    else:
        print(f" originator attempted={origin_attempted} inserted={counts[('originator', 'inserted')]} skipped_dup={counts[('originator', 'skipped_dup')]}")


if __name__ == "__main__":
    main()