teleo-infrastructure/lib/contributor.py
m3taversal dea1b02aa6 fix(attribution): narrow exception + document gate asymmetry (Ganymede review)
Two follow-up fixes from Ganymede's review of d0fb4c9:

1. is_publisher_handle: narrow `except Exception` to sqlite3.OperationalError.
   Pre-v26 DB fallback only needs to catch the "table doesn't exist" case;
   broader exceptions (programming errors, locks, corruption) should propagate.

2. upsert_contributor gate: add comment documenting the alias-resolution
   asymmetry between insert_contribution_event (alias-resolved via
   normalize_handle) and upsert_contributor (bare lower+lstrip-@). Today this
   is fine because the v26 classifier produced one publisher row per canonical
   handle. Branch 3 will normalize alias→canonical at writer entry points,
   tightening this gate transparently.

Unit tests for the gates (positive + negative + alias resolution) deferred to
Branch 3 alongside the auto-create flow tests.

Smoke-tested:
  - pre-v26 fallback (no publishers table) → None (correct)
  - case-insensitive match (CNBC → id=1) → correct
  - @ prefix strip (@cnbc → id=1) → correct
  - non-publisher handle (alexastrum) → None (correct)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:25:24 +01:00

512 lines
23 KiB
Python

"""Contributor attribution — tracks who contributed what and calculates tiers.
Extracted from merge.py (Phase 5 decomposition). Functions:
- is_knowledge_pr: diff classification (knowledge vs pipeline-only)
- refine_commit_type: extract → challenge/enrich refinement from diff content
- record_contributor_attribution: parse trailers + frontmatter, upsert contributors
- upsert_contributor: insert/update contributor record with role counts
- insert_contribution_event: event-sourced credit log (schema v24)
- recalculate_tier: tier promotion based on config rules
"""
import json
import logging
import re
from . import config, db
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor")
# ─── Event schema (v24) ───────────────────────────────────────────────────
# Role → CI weight, per Cory's confirmed schema (Apr 24 conversation).
# Humans-are-always-author rule: agents never accumulate author credit;
# evaluator (0.05) is the only agent-facing role. Internal agents still earn
# author/challenger/synthesizer on their own autonomous research PRs but
# surface in the kind='agent' leaderboard, not the default person view.
ROLE_WEIGHTS = {
"author": 0.30,
"challenger": 0.25,
"synthesizer": 0.20,
"originator": 0.15,
"evaluator": 0.05,
}
def insert_contribution_event(
conn,
handle: str,
role: str,
pr_number: int,
*,
claim_path: str | None = None,
domain: str | None = None,
channel: str | None = None,
timestamp: str | None = None,
) -> bool:
"""Emit a contribution_events row. Idempotent via UNIQUE constraint.
Returns True if the event was inserted, False if the constraint blocked it
(same handle/role/pr/claim_path combo already recorded — safe to replay).
Canonicalizes handle via alias table. Classifies kind from handle.
Falls back silently if contribution_events table doesn't exist yet (pre-v24).
"""
if role not in ROLE_WEIGHTS:
logger.warning("insert_contribution_event: unknown role %r", role)
return False
weight = ROLE_WEIGHTS[role]
canonical = normalize_handle(handle, conn=conn)
if not canonical:
return False
# Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
# etc.) are provenance metadata, not contributors. Don't credit them. Without
# this gate every merge re-creates org events and undoes the v26 cleanup.
if is_publisher_handle(canonical, conn) is not None:
logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
return False
kind = classify_kind(canonical)
try:
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
)
return cur.rowcount > 0
except Exception:
logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r",
pr_number, canonical, role, exc_info=True)
return False
def is_knowledge_pr(diff: str) -> bool:
"""Check if a PR touches knowledge files (claims, decisions, core, foundations).
Knowledge PRs get full CI attribution weight.
Pipeline-only PRs (inbox, entities, agents, archive) get zero CI weight.
Mixed PRs count as knowledge — if a PR adds a claim, it gets attribution
even if it also moves source files. Knowledge takes priority. (Ganymede review)
"""
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
for line in diff.split("\n"):
if line.startswith("+++ b/") or line.startswith("--- a/"):
path = line.split("/", 1)[1] if "/" in line else ""
if any(path.startswith(p) for p in knowledge_prefixes):
return True
return False
COMMIT_TYPE_TO_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"extract": "extractor",
"research": "synthesizer",
"entity": "extractor",
"reweave": "synthesizer",
"fix": "extractor",
}
def commit_type_to_role(commit_type: str) -> str:
"""Map a refined commit_type to a contributor role."""
return COMMIT_TYPE_TO_ROLE.get(commit_type, "extractor")
def refine_commit_type(diff: str, branch_commit_type: str) -> str:
"""Refine commit_type from diff content when branch prefix is ambiguous.
Branch prefix gives initial classification (extract, research, entity, etc.).
For 'extract' branches, diff content can distinguish:
- challenge: adds challenged_by edges to existing claims
- enrich: modifies existing claim frontmatter without new files
- extract: creates new claim files (default for extract branches)
Only refines 'extract' type — other branch types (research, entity, reweave, fix)
are already specific enough.
"""
if branch_commit_type != "extract":
return branch_commit_type
new_files = 0
modified_files = 0
has_challenge_edge = False
in_diff_header = False
current_is_new = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
in_diff_header = True
current_is_new = False
elif line.startswith("new file"):
current_is_new = True
elif line.startswith("+++ b/"):
path = line[6:]
if any(path.startswith(p) for p in ("domains/", "core/", "foundations/")):
if current_is_new:
new_files += 1
else:
modified_files += 1
in_diff_header = False
elif line.startswith("+") and not line.startswith("+++"):
if "challenged_by:" in line or "challenges:" in line:
has_challenge_edge = True
if has_challenge_edge and new_files == 0:
return "challenge"
if modified_files > 0 and new_files == 0:
return "enrich"
return "extract"
async def record_contributor_attribution(conn, pr_number: int, branch: str, git_fn):
"""Record contributor attribution after a successful merge.
Parses git trailers and claim frontmatter to identify contributors
and their roles. Upserts into contributors table. Refines commit_type
from diff content. Pipeline-only PRs (no knowledge files) are skipped.
Args:
git_fn: async callable matching _git signature (for git log parsing).
"""
from datetime import date as _date
today = _date.today().isoformat()
# Get the PR diff to parse claim frontmatter for attribution blocks
diff = await get_pr_diff(pr_number)
if not diff:
return
# Pipeline-only PRs (inbox, entities, agents) don't count toward CI
if not is_knowledge_pr(diff):
logger.info("PR #%d: pipeline-only commit — skipping CI attribution", pr_number)
return
# Refine commit_type from diff content (branch prefix may be too broad)
row = conn.execute(
"SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, "
"domain_verdict, domain_agent, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
refined_type = refine_commit_type(diff, branch_type)
if refined_type != branch_type:
conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
logger.info("PR #%d: commit_type refined %s%s", pr_number, branch_type, refined_type)
# Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites.
pr_domain = row["domain"] if row else None
pr_channel = row["source_channel"] if row else None
pr_submitted_by = row["submitted_by"] if row else None
# Use the PR's merged_at timestamp so event time matches the actual merge.
# If a merge retries after a crash, this keeps forward-emitted and backfilled
# events on the same timeline. Falls back to datetime('now') in the writer.
pr_merged_at = row["merged_at"] if row and row["merged_at"] else None
# ── AUTHOR event (schema v24, double-write) ──
# Humans-are-always-author rule: the human in the loop gets author credit.
# Precedence: prs.submitted_by (set by extract.py from source proposed_by, or
# by discover for human PRs) → git author of first commit → branch-prefix agent.
# Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get
# author events from branch prefix; extract/ PRs carry submitted_by from the
# source's proposed_by field so the human who submitted gets credit via path 1.
author_candidate: str | None = None
if pr_submitted_by:
author_candidate = pr_submitted_by
else:
# External GitHub PRs: git author of the FIRST commit on the branch is
# the real submitter. `git log -1` would return the latest commit, which
# mis-credits multi-commit PRs where a reviewer rebased or force-pushed.
# Take the last line of the unreversed log (= oldest commit, since git
# log defaults to reverse-chronological). Ganymede review, Apr 24.
rc_author_log, author_log = await git_fn(
"log", f"origin/main..origin/{branch}", "--no-merges",
"--format=%an", timeout=5,
)
if rc_author_log == 0 and author_log.strip():
lines = [line for line in author_log.strip().split("\n") if line.strip()]
if lines:
candidate = lines[-1].strip().lower()
if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions"}:
author_candidate = candidate
# Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc.
if not author_candidate and branch.startswith(AGENT_BRANCH_PREFIXES):
# Autonomous agent PR (theseus/research-*, leo/entity-*, etc.) —
# credit goes to the agent as author per Cory's directive.
author_candidate = branch.split("/", 1)[0]
if author_candidate:
insert_contribution_event(
conn, author_candidate, "author", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# ── EVALUATOR events (schema v24) ──
# Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second
# reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when
# verdict is 'request_changes' — failed review isn't contribution credit.
if row:
if row["leo_verdict"] == "approve":
insert_contribution_event(
conn, "leo", "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
if row["domain_verdict"] == "approve" and row["domain_agent"]:
dagent = row["domain_agent"].strip().lower()
if dagent and dagent != "leo": # don't double-credit leo
insert_contribution_event(
conn, dagent, "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Parse Pentagon-Agent trailer from branch commit messages
agents_found: set[str] = set()
# Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent
# challenger/synthesizer credit based on refined commit_type. Pipeline-owned
# branches (extract/*, reweave/*, etc.) don't — those are infra, not work.
is_agent_branch = branch.startswith(AGENT_BRANCH_PREFIXES)
_TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"research": "synthesizer",
"reweave": "synthesizer",
}
rc, log_output = await git_fn(
"log", f"origin/main..origin/{branch}", "--format=%b%n%N",
timeout=10,
)
if rc == 0:
for match in re.finditer(r"Pentagon-Agent:\s*(\S+)\s*<([^>]+)>", log_output):
agent_name = match.group(1).lower()
agent_uuid = match.group(2)
role = commit_type_to_role(refined_type)
upsert_contributor(
conn, agent_name, agent_uuid, role, today,
)
# Event-emit only for agent-owned branches where the trailer's agent
# actually did the substantive work (challenger/synthesizer).
event_role = _TRAILER_EVENT_ROLE.get(refined_type)
if is_agent_branch and event_role:
insert_contribution_event(
conn, agent_name, event_role, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(agent_name)
# Parse attribution from NEWLY ADDED knowledge files via the canonical attribution
# parser (lib/attribution.py). The previous diff-line regex parser dropped
# both the bare-key flat format (`sourcer: alexastrum`) and the nested
# `attribution:` block format because it only matched `- handle: "X"` lines.
# The Apr 24 incident traced missing leaderboard entries (alexastrum=0,
# thesensatore=0, cameron-s1=0) directly to this parser's blind spots.
#
# --diff-filter=A restricts to added files only (Ganymede review): enrich and
# challenge PRs modify existing claims, and re-crediting the existing sourcer on
# every modification would inflate counts. The synthesizer/challenger/reviewer
# roles for those PRs are credited via the Pentagon-Agent trailer path above.
rc_files, files_output = await git_fn(
"diff", "--name-only", "--diff-filter=A",
f"origin/main...origin/{branch}", timeout=10,
)
if rc_files == 0 and files_output:
from pathlib import Path
from . import config
from .attribution import parse_attribution_from_file
main_root = Path(config.MAIN_WORKTREE)
# Match is_knowledge_pr's gate exactly. Entities/convictions are excluded
# here because is_knowledge_pr skips entity-only PRs at line 123 — so a
# broader list here only matters for mixed PRs where the narrower list
# already matches via the claim file. Widening requires Cory sign-off
# since it would change leaderboard accounting (entity-only PRs → CI credit).
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None
for rel_path in files_output.strip().split("\n"):
rel_path = rel_path.strip()
if not rel_path.endswith(".md"):
continue
if not rel_path.startswith(knowledge_prefixes):
continue
full = main_root / rel_path
if not full.exists():
continue # file removed in this PR
attribution = parse_attribution_from_file(str(full))
for role, entries in attribution.items():
for entry in entries:
handle = entry.get("handle")
if handle:
upsert_contributor(
conn, handle, entry.get("agent_id"), role, today,
)
# Event-emit: only 'sourcer' frontmatter entries become
# originator events. 'extractor' frontmatter = infrastructure
# (the Sonnet extraction agent), no event. challenger/
# synthesizer frontmatter is extremely rare at extract time.
# Skip originator if same as author — avoids double-credit
# when someone submits their own content (self-authored).
if role == "sourcer":
origin_canonical = normalize_handle(handle, conn=conn)
if origin_canonical and origin_canonical != author_canonical:
insert_contribution_event(
conn, handle, "originator", pr_number,
claim_path=rel_path,
domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Fallback: if no Pentagon-Agent trailer found, try git commit authors
_BOT_AUTHORS = frozenset({
"m3taversal", "teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions",
})
if not agents_found:
rc_author, author_output = await git_fn(
"log", f"origin/main..origin/{branch}", "--no-merges",
"--format=%an", timeout=10,
)
if rc_author == 0 and author_output.strip():
for author_line in author_output.strip().split("\n"):
author_name = author_line.strip().lower()
if author_name and author_name not in _BOT_AUTHORS:
role = commit_type_to_role(refined_type)
upsert_contributor(conn, author_name, None, role, today)
# Event-model parity: emit challenger/synthesizer event when
# the fallback credits a human/agent for that kind of work.
# Without this, external-contributor challenge/enrich PRs
# accumulate legacy counts but disappear from event-sourced
# leaderboards when Phase B cuts over. (Ganymede review.)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, author_name, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(author_name)
if not agents_found:
fb_row = conn.execute(
"SELECT agent FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if fb_row and fb_row["agent"] and fb_row["agent"] != "external":
pr_agent = fb_row["agent"].lower()
role = commit_type_to_role(refined_type)
upsert_contributor(conn, pr_agent, None, role, today)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, pr_agent, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
def upsert_contributor(
conn, handle: str, agent_id: str | None, role: str, date_str: str,
):
"""Upsert a contributor record, incrementing the appropriate role count."""
role_col = f"{role}_count"
if role_col not in (
"sourcer_count", "extractor_count", "challenger_count",
"synthesizer_count", "reviewer_count",
):
logger.warning("Unknown contributor role: %s", role)
return
# Schema v26 gate: orgs/citations live in publishers table, not contributors.
# Skip without writing so the v26 classifier cleanup isn't undone by every
# merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
#
# Note: bare normalization (lower + lstrip @), no alias resolution. This is
# consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
# below — both look up by canonical-form-as-stored. Today's classifier produces
# one publisher row per canonical handle, so bare lookup hits. Branch 3 will
# normalize alias→canonical at writer entry points (extract.py, post_extract);
# at that point this gate auto-tightens because callers pass canonical handles.
canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
return
existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone()
if existing:
conn.execute(
f"""UPDATE contributors SET
{role_col} = {role_col} + 1,
claims_merged = claims_merged + CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END,
last_contribution = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(role, date_str, handle),
)
else:
conn.execute(
f"""INSERT INTO contributors (handle, agent_id, first_contribution, last_contribution, {role_col}, claims_merged)
VALUES (?, ?, ?, ?, 1, CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END)""",
(handle, agent_id, date_str, date_str, role),
)
# Recalculate tier
recalculate_tier(conn, handle)
def recalculate_tier(conn, handle: str):
"""Recalculate contributor tier based on config rules."""
from datetime import date as _date, datetime as _dt
row = conn.execute(
"SELECT claims_merged, challenges_survived, first_contribution, tier FROM contributors WHERE handle = ?",
(handle,),
).fetchone()
if not row:
return
current_tier = row["tier"]
claims_merged = row["claims_merged"] or 0
challenges_survived = row["challenges_survived"] or 0
first_contribution = row["first_contribution"]
days_since_first = 0
if first_contribution:
try:
first_date = _dt.strptime(first_contribution, "%Y-%m-%d").date()
days_since_first = (_date.today() - first_date).days
except ValueError:
pass
# Check veteran first (higher tier)
vet_rules = config.CONTRIBUTOR_TIER_RULES["veteran"]
if (claims_merged >= vet_rules["claims_merged"]
and days_since_first >= vet_rules["min_days_since_first"]
and challenges_survived >= vet_rules["challenges_survived"]):
new_tier = "veteran"
elif claims_merged >= config.CONTRIBUTOR_TIER_RULES["contributor"]["claims_merged"]:
new_tier = "contributor"
else:
new_tier = "new"
if new_tier != current_tier:
conn.execute(
"UPDATE contributors SET tier = ?, updated_at = datetime('now') WHERE handle = ?",
(new_tier, handle),
)
logger.info("Contributor %s: tier %s%s", handle, current_tier, new_tier)
db.audit(
conn, "contributor", "tier_change",
json.dumps({"handle": handle, "from": current_tier, "to": new_tier}),
)