"""Contributor attribution — tracks who contributed what and calculates tiers. Extracted from merge.py (Phase 5 decomposition). Functions: - is_knowledge_pr: diff classification (knowledge vs pipeline-only) - refine_commit_type: extract → challenge/enrich refinement from diff content - record_contributor_attribution: parse trailers + frontmatter, upsert contributors - upsert_contributor: insert/update contributor record with role counts - insert_contribution_event: event-sourced credit log (schema v24) - recalculate_tier: tier promotion based on config rules """ import json import logging import re from . import config, db from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle from .forgejo import get_pr_diff logger = logging.getLogger("pipeline.contributor") # ─── Event schema (v24) ─────────────────────────────────────────────────── # Role → CI weight, per Cory's confirmed schema (Apr 24 conversation). # Humans-are-always-author rule: agents never accumulate author credit; # evaluator (0.05) is the only agent-facing role. Internal agents still earn # author/challenger/synthesizer on their own autonomous research PRs but # surface in the kind='agent' leaderboard, not the default person view. ROLE_WEIGHTS = { "author": 0.30, "challenger": 0.25, "synthesizer": 0.20, "originator": 0.15, "evaluator": 0.05, } def insert_contribution_event( conn, handle: str, role: str, pr_number: int, *, claim_path: str | None = None, domain: str | None = None, channel: str | None = None, timestamp: str | None = None, ) -> bool: """Emit a contribution_events row. Idempotent via UNIQUE constraint. Returns True if the event was inserted, False if the constraint blocked it (same handle/role/pr/claim_path combo already recorded — safe to replay). Canonicalizes handle via alias table. Classifies kind from handle. Falls back silently if contribution_events table doesn't exist yet (pre-v24). """ if role not in ROLE_WEIGHTS: logger.warning("insert_contribution_event: unknown role %r", role) return False weight = ROLE_WEIGHTS[role] canonical = normalize_handle(handle, conn=conn) if not canonical: return False kind = classify_kind(canonical) try: cur = conn.execute( """INSERT OR IGNORE INTO contribution_events (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""", (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp), ) return cur.rowcount > 0 except Exception: logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r", pr_number, canonical, role, exc_info=True) return False def is_knowledge_pr(diff: str) -> bool: """Check if a PR touches knowledge files (claims, decisions, core, foundations). Knowledge PRs get full CI attribution weight. Pipeline-only PRs (inbox, entities, agents, archive) get zero CI weight. Mixed PRs count as knowledge — if a PR adds a claim, it gets attribution even if it also moves source files. Knowledge takes priority. (Ganymede review) """ knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/") for line in diff.split("\n"): if line.startswith("+++ b/") or line.startswith("--- a/"): path = line.split("/", 1)[1] if "/" in line else "" if any(path.startswith(p) for p in knowledge_prefixes): return True return False COMMIT_TYPE_TO_ROLE = { "challenge": "challenger", "enrich": "synthesizer", "extract": "extractor", "research": "synthesizer", "entity": "extractor", "reweave": "synthesizer", "fix": "extractor", } def commit_type_to_role(commit_type: str) -> str: """Map a refined commit_type to a contributor role.""" return COMMIT_TYPE_TO_ROLE.get(commit_type, "extractor") def refine_commit_type(diff: str, branch_commit_type: str) -> str: """Refine commit_type from diff content when branch prefix is ambiguous. Branch prefix gives initial classification (extract, research, entity, etc.). For 'extract' branches, diff content can distinguish: - challenge: adds challenged_by edges to existing claims - enrich: modifies existing claim frontmatter without new files - extract: creates new claim files (default for extract branches) Only refines 'extract' type — other branch types (research, entity, reweave, fix) are already specific enough. """ if branch_commit_type != "extract": return branch_commit_type new_files = 0 modified_files = 0 has_challenge_edge = False in_diff_header = False current_is_new = False for line in diff.split("\n"): if line.startswith("diff --git"): in_diff_header = True current_is_new = False elif line.startswith("new file"): current_is_new = True elif line.startswith("+++ b/"): path = line[6:] if any(path.startswith(p) for p in ("domains/", "core/", "foundations/")): if current_is_new: new_files += 1 else: modified_files += 1 in_diff_header = False elif line.startswith("+") and not line.startswith("+++"): if "challenged_by:" in line or "challenges:" in line: has_challenge_edge = True if has_challenge_edge and new_files == 0: return "challenge" if modified_files > 0 and new_files == 0: return "enrich" return "extract" async def record_contributor_attribution(conn, pr_number: int, branch: str, git_fn): """Record contributor attribution after a successful merge. Parses git trailers and claim frontmatter to identify contributors and their roles. Upserts into contributors table. Refines commit_type from diff content. Pipeline-only PRs (no knowledge files) are skipped. Args: git_fn: async callable matching _git signature (for git log parsing). """ from datetime import date as _date today = _date.today().isoformat() # Get the PR diff to parse claim frontmatter for attribution blocks diff = await get_pr_diff(pr_number) if not diff: return # Pipeline-only PRs (inbox, entities, agents) don't count toward CI if not is_knowledge_pr(diff): logger.info("PR #%d: pipeline-only commit — skipping CI attribution", pr_number) return # Refine commit_type from diff content (branch prefix may be too broad) row = conn.execute( "SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, " "domain_verdict, domain_agent, merged_at FROM prs WHERE number = ?", (pr_number,), ).fetchone() branch_type = row["commit_type"] if row and row["commit_type"] else "extract" refined_type = refine_commit_type(diff, branch_type) if refined_type != branch_type: conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number)) logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type) # Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites. pr_domain = row["domain"] if row else None pr_channel = row["source_channel"] if row else None pr_submitted_by = row["submitted_by"] if row else None # Use the PR's merged_at timestamp so event time matches the actual merge. # If a merge retries after a crash, this keeps forward-emitted and backfilled # events on the same timeline. Falls back to datetime('now') in the writer. pr_merged_at = row["merged_at"] if row and row["merged_at"] else None # ── AUTHOR event (schema v24, double-write) ── # Humans-are-always-author rule: the human in the loop gets author credit. # Precedence: prs.submitted_by (set by extract.py from source proposed_by, or # by discover for human PRs) → git author of first commit → branch-prefix agent. # Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get # author events from branch prefix; extract/ PRs carry submitted_by from the # source's proposed_by field so the human who submitted gets credit via path 1. author_candidate: str | None = None if pr_submitted_by: author_candidate = pr_submitted_by else: # External GitHub PRs: git author of the FIRST commit on the branch is # the real submitter. `git log -1` would return the latest commit, which # mis-credits multi-commit PRs where a reviewer rebased or force-pushed. # Take the last line of the unreversed log (= oldest commit, since git # log defaults to reverse-chronological). Ganymede review, Apr 24. rc_author_log, author_log = await git_fn( "log", f"origin/main..origin/{branch}", "--no-merges", "--format=%an", timeout=5, ) if rc_author_log == 0 and author_log.strip(): lines = [line for line in author_log.strip().split("\n") if line.strip()] if lines: candidate = lines[-1].strip().lower() if candidate and candidate not in {"teleo", "teleo-bot", "pipeline", "github-actions[bot]", "forgejo-actions"}: author_candidate = candidate # Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc. if not author_candidate and branch.startswith(AGENT_BRANCH_PREFIXES): # Autonomous agent PR (theseus/research-*, leo/entity-*, etc.) — # credit goes to the agent as author per Cory's directive. author_candidate = branch.split("/", 1)[0] if author_candidate: insert_contribution_event( conn, author_candidate, "author", pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) # ── EVALUATOR events (schema v24) ── # Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second # reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when # verdict is 'request_changes' — failed review isn't contribution credit. if row: if row["leo_verdict"] == "approve": insert_contribution_event( conn, "leo", "evaluator", pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) if row["domain_verdict"] == "approve" and row["domain_agent"]: dagent = row["domain_agent"].strip().lower() if dagent and dagent != "leo": # don't double-credit leo insert_contribution_event( conn, dagent, "evaluator", pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) # Parse Pentagon-Agent trailer from branch commit messages agents_found: set[str] = set() # Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent # challenger/synthesizer credit based on refined commit_type. Pipeline-owned # branches (extract/*, reweave/*, etc.) don't — those are infra, not work. is_agent_branch = branch.startswith(AGENT_BRANCH_PREFIXES) _TRAILER_EVENT_ROLE = { "challenge": "challenger", "enrich": "synthesizer", "research": "synthesizer", "reweave": "synthesizer", } rc, log_output = await git_fn( "log", f"origin/main..origin/{branch}", "--format=%b%n%N", timeout=10, ) if rc == 0: for match in re.finditer(r"Pentagon-Agent:\s*(\S+)\s*<([^>]+)>", log_output): agent_name = match.group(1).lower() agent_uuid = match.group(2) role = commit_type_to_role(refined_type) upsert_contributor( conn, agent_name, agent_uuid, role, today, ) # Event-emit only for agent-owned branches where the trailer's agent # actually did the substantive work (challenger/synthesizer). event_role = _TRAILER_EVENT_ROLE.get(refined_type) if is_agent_branch and event_role: insert_contribution_event( conn, agent_name, event_role, pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) agents_found.add(agent_name) # Parse attribution from NEWLY ADDED knowledge files via the canonical attribution # parser (lib/attribution.py). The previous diff-line regex parser dropped # both the bare-key flat format (`sourcer: alexastrum`) and the nested # `attribution:` block format because it only matched `- handle: "X"` lines. # The Apr 24 incident traced missing leaderboard entries (alexastrum=0, # thesensatore=0, cameron-s1=0) directly to this parser's blind spots. # # --diff-filter=A restricts to added files only (Ganymede review): enrich and # challenge PRs modify existing claims, and re-crediting the existing sourcer on # every modification would inflate counts. The synthesizer/challenger/reviewer # roles for those PRs are credited via the Pentagon-Agent trailer path above. rc_files, files_output = await git_fn( "diff", "--name-only", "--diff-filter=A", f"origin/main...origin/{branch}", timeout=10, ) if rc_files == 0 and files_output: from pathlib import Path from . import config from .attribution import parse_attribution_from_file main_root = Path(config.MAIN_WORKTREE) # Match is_knowledge_pr's gate exactly. Entities/convictions are excluded # here because is_knowledge_pr skips entity-only PRs at line 123 — so a # broader list here only matters for mixed PRs where the narrower list # already matches via the claim file. Widening requires Cory sign-off # since it would change leaderboard accounting (entity-only PRs → CI credit). knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/") author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None for rel_path in files_output.strip().split("\n"): rel_path = rel_path.strip() if not rel_path.endswith(".md"): continue if not rel_path.startswith(knowledge_prefixes): continue full = main_root / rel_path if not full.exists(): continue # file removed in this PR attribution = parse_attribution_from_file(str(full)) for role, entries in attribution.items(): for entry in entries: handle = entry.get("handle") if handle: upsert_contributor( conn, handle, entry.get("agent_id"), role, today, ) # Event-emit: only 'sourcer' frontmatter entries become # originator events. 'extractor' frontmatter = infrastructure # (the Sonnet extraction agent), no event. challenger/ # synthesizer frontmatter is extremely rare at extract time. # Skip originator if same as author — avoids double-credit # when someone submits their own content (self-authored). if role == "sourcer": origin_canonical = normalize_handle(handle, conn=conn) if origin_canonical and origin_canonical != author_canonical: insert_contribution_event( conn, handle, "originator", pr_number, claim_path=rel_path, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) # Fallback: if no Pentagon-Agent trailer found, try git commit authors _BOT_AUTHORS = frozenset({ "m3taversal", "teleo", "teleo-bot", "pipeline", "github-actions[bot]", "forgejo-actions", }) if not agents_found: rc_author, author_output = await git_fn( "log", f"origin/main..origin/{branch}", "--no-merges", "--format=%an", timeout=10, ) if rc_author == 0 and author_output.strip(): for author_line in author_output.strip().split("\n"): author_name = author_line.strip().lower() if author_name and author_name not in _BOT_AUTHORS: role = commit_type_to_role(refined_type) upsert_contributor(conn, author_name, None, role, today) # Event-model parity: emit challenger/synthesizer event when # the fallback credits a human/agent for that kind of work. # Without this, external-contributor challenge/enrich PRs # accumulate legacy counts but disappear from event-sourced # leaderboards when Phase B cuts over. (Ganymede review.) event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type) if event_role_fb: insert_contribution_event( conn, author_name, event_role_fb, pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) agents_found.add(author_name) if not agents_found: fb_row = conn.execute( "SELECT agent FROM prs WHERE number = ?", (pr_number,) ).fetchone() if fb_row and fb_row["agent"] and fb_row["agent"] != "external": pr_agent = fb_row["agent"].lower() role = commit_type_to_role(refined_type) upsert_contributor(conn, pr_agent, None, role, today) event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type) if event_role_fb: insert_contribution_event( conn, pr_agent, event_role_fb, pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, timestamp=pr_merged_at, ) def upsert_contributor( conn, handle: str, agent_id: str | None, role: str, date_str: str, ): """Upsert a contributor record, incrementing the appropriate role count.""" role_col = f"{role}_count" if role_col not in ( "sourcer_count", "extractor_count", "challenger_count", "synthesizer_count", "reviewer_count", ): logger.warning("Unknown contributor role: %s", role) return existing = conn.execute( "SELECT handle FROM contributors WHERE handle = ?", (handle,) ).fetchone() if existing: conn.execute( f"""UPDATE contributors SET {role_col} = {role_col} + 1, claims_merged = claims_merged + CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END, last_contribution = ?, updated_at = datetime('now') WHERE handle = ?""", (role, date_str, handle), ) else: conn.execute( f"""INSERT INTO contributors (handle, agent_id, first_contribution, last_contribution, {role_col}, claims_merged) VALUES (?, ?, ?, ?, 1, CASE WHEN ? IN ('extractor', 'sourcer') THEN 1 ELSE 0 END)""", (handle, agent_id, date_str, date_str, role), ) # Recalculate tier recalculate_tier(conn, handle) def recalculate_tier(conn, handle: str): """Recalculate contributor tier based on config rules.""" from datetime import date as _date, datetime as _dt row = conn.execute( "SELECT claims_merged, challenges_survived, first_contribution, tier FROM contributors WHERE handle = ?", (handle,), ).fetchone() if not row: return current_tier = row["tier"] claims_merged = row["claims_merged"] or 0 challenges_survived = row["challenges_survived"] or 0 first_contribution = row["first_contribution"] days_since_first = 0 if first_contribution: try: first_date = _dt.strptime(first_contribution, "%Y-%m-%d").date() days_since_first = (_date.today() - first_date).days except ValueError: pass # Check veteran first (higher tier) vet_rules = config.CONTRIBUTOR_TIER_RULES["veteran"] if (claims_merged >= vet_rules["claims_merged"] and days_since_first >= vet_rules["min_days_since_first"] and challenges_survived >= vet_rules["challenges_survived"]): new_tier = "veteran" elif claims_merged >= config.CONTRIBUTOR_TIER_RULES["contributor"]["claims_merged"]: new_tier = "contributor" else: new_tier = "new" if new_tier != current_tier: conn.execute( "UPDATE contributors SET tier = ?, updated_at = datetime('now') WHERE handle = ?", (new_tier, handle), ) logger.info("Contributor %s: tier %s → %s", handle, current_tier, new_tier) db.audit( conn, "contributor", "tier_change", json.dumps({"handle": handle, "from": current_tier, "to": new_tier}), )