diff --git a/lib/attribution.py b/lib/attribution.py index 681e029..664c39c 100644 --- a/lib/attribution.py +++ b/lib/attribution.py @@ -21,6 +21,14 @@ logger = logging.getLogger("pipeline.attribution") VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"}) +# Agent-owned branch prefixes — PRs from these branches get Pentagon-Agent trailer +# credit for challenger/synthesizer roles. Pipeline-infra branches (extract/ reweave/ +# fix/ ingestion/) are deliberately excluded: they're automation, not contribution. +# Single source of truth; imported by contributor.py and backfill-events.py. +AGENT_BRANCH_PREFIXES = ( + "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/", +) + # Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches # GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy" # or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene diff --git a/lib/contributor.py b/lib/contributor.py index f1249cc..a2117d6 100644 --- a/lib/contributor.py +++ b/lib/contributor.py @@ -14,7 +14,7 @@ import logging import re from . import config, db -from .attribution import classify_kind, normalize_handle +from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle from .forgejo import get_pr_diff logger = logging.getLogger("pipeline.contributor") @@ -186,7 +186,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ # Refine commit_type from diff content (branch prefix may be too broad) row = conn.execute( "SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, " - "domain_verdict, domain_agent FROM prs WHERE number = ?", + "domain_verdict, domain_agent, merged_at FROM prs WHERE number = ?", (pr_number,), ).fetchone() branch_type = row["commit_type"] if row and row["commit_type"] else "extract" @@ -199,6 +199,10 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ pr_domain = row["domain"] if row else None pr_channel = row["source_channel"] if row else None pr_submitted_by = row["submitted_by"] if row else None + # Use the PR's merged_at timestamp so event time matches the actual merge. + # If a merge retries after a crash, this keeps forward-emitted and backfilled + # events on the same timeline. Falls back to datetime('now') in the writer. + pr_merged_at = row["merged_at"] if row and row["merged_at"] else None # ── AUTHOR event (schema v24, double-write) ── # Humans-are-always-author rule: the human in the loop gets author credit. @@ -211,27 +215,33 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ if pr_submitted_by: author_candidate = pr_submitted_by else: - # External GitHub PRs: git author is the real submitter. - rc_author_head, author_head = await git_fn( + # External GitHub PRs: git author of the FIRST commit on the branch is + # the real submitter. `git log -1` would return the latest commit, which + # mis-credits multi-commit PRs where a reviewer rebased or force-pushed. + # Take the last line of the unreversed log (= oldest commit, since git + # log defaults to reverse-chronological). Ganymede review, Apr 24. + rc_author_log, author_log = await git_fn( "log", f"origin/main..origin/{branch}", "--no-merges", - "--format=%an", "-1", timeout=5, + "--format=%an", timeout=5, ) - if rc_author_head == 0 and author_head.strip(): - candidate = author_head.strip().lower() - if candidate and candidate not in {"teleo", "teleo-bot", "pipeline", - "github-actions[bot]", "forgejo-actions"}: - author_candidate = candidate + if rc_author_log == 0 and author_log.strip(): + lines = [line for line in author_log.strip().split("\n") if line.strip()] + if lines: + candidate = lines[-1].strip().lower() + if candidate and candidate not in {"teleo", "teleo-bot", "pipeline", + "github-actions[bot]", "forgejo-actions"}: + author_candidate = candidate # Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc. - if not author_candidate and "/" in branch: - prefix = branch.split("/", 1)[0] - # Exclude pipeline-infrastructure prefixes — those are not authors. - if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"): - author_candidate = prefix + if not author_candidate and branch.startswith(AGENT_BRANCH_PREFIXES): + # Autonomous agent PR (theseus/research-*, leo/entity-*, etc.) — + # credit goes to the agent as author per Cory's directive. + author_candidate = branch.split("/", 1)[0] if author_candidate: insert_contribution_event( conn, author_candidate, "author", pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, ) # ── EVALUATOR events (schema v24) ── @@ -243,6 +253,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ insert_contribution_event( conn, "leo", "evaluator", pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, ) if row["domain_verdict"] == "approve" and row["domain_agent"]: dagent = row["domain_agent"].strip().lower() @@ -250,6 +261,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ insert_contribution_event( conn, dagent, "evaluator", pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, ) # Parse Pentagon-Agent trailer from branch commit messages @@ -257,9 +269,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ # Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent # challenger/synthesizer credit based on refined commit_type. Pipeline-owned # branches (extract/*, reweave/*, etc.) don't — those are infra, not work. - _AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/", - "astra/", "oberon/") - is_agent_branch = branch.startswith(_AGENT_BRANCH_PREFIXES) + is_agent_branch = branch.startswith(AGENT_BRANCH_PREFIXES) _TRAILER_EVENT_ROLE = { "challenge": "challenger", "enrich": "synthesizer", @@ -285,6 +295,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ insert_contribution_event( conn, agent_name, event_role, pr_number, claim_path=None, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, ) agents_found.add(agent_name) @@ -346,6 +357,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ conn, handle, "originator", pr_number, claim_path=rel_path, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, ) # Fallback: if no Pentagon-Agent trailer found, try git commit authors @@ -364,13 +376,35 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_ if author_name and author_name not in _BOT_AUTHORS: role = commit_type_to_role(refined_type) upsert_contributor(conn, author_name, None, role, today) + # Event-model parity: emit challenger/synthesizer event when + # the fallback credits a human/agent for that kind of work. + # Without this, external-contributor challenge/enrich PRs + # accumulate legacy counts but disappear from event-sourced + # leaderboards when Phase B cuts over. (Ganymede review.) + event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type) + if event_role_fb: + insert_contribution_event( + conn, author_name, event_role_fb, pr_number, + claim_path=None, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, + ) agents_found.add(author_name) if not agents_found: - row = conn.execute("SELECT agent FROM prs WHERE number = ?", (pr_number,)).fetchone() - if row and row["agent"] and row["agent"] != "external": + fb_row = conn.execute( + "SELECT agent FROM prs WHERE number = ?", (pr_number,) + ).fetchone() + if fb_row and fb_row["agent"] and fb_row["agent"] != "external": + pr_agent = fb_row["agent"].lower() role = commit_type_to_role(refined_type) - upsert_contributor(conn, row["agent"].lower(), None, role, today) + upsert_contributor(conn, pr_agent, None, role, today) + event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type) + if event_role_fb: + insert_contribution_event( + conn, pr_agent, event_role_fb, pr_number, + claim_path=None, domain=pr_domain, channel=pr_channel, + timestamp=pr_merged_at, + ) def upsert_contributor( diff --git a/lib/db.py b/lib/db.py index 1d321b9..f28f295 100644 --- a/lib/db.py +++ b/lib/db.py @@ -9,7 +9,7 @@ from . import config logger = logging.getLogger("pipeline.db") -SCHEMA_VERSION = 24 +SCHEMA_VERSION = 25 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -737,9 +737,13 @@ def migrate(conn: sqlite3.Connection): ], ) # Seed kind='agent' for known Pentagon agents so the events writer picks it up. + # Must stay in sync with lib/attribution.PENTAGON_AGENTS — drift causes + # contributors.kind to disagree with classify_kind() output for future + # inserts. (Ganymede review: "pipeline" was missing until Apr 24.) pentagon_agents = [ "rio", "leo", "theseus", "vida", "clay", "astra", "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship", + "pipeline", ] for agent in pentagon_agents: conn.execute( @@ -749,6 +753,17 @@ def migrate(conn: sqlite3.Connection): conn.commit() logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column") + if current < 25: + # v24 seeded 13 Pentagon agents but missed "pipeline" — classify_kind() + # treats it as agent so contributors.kind drifted from event-insert output. + # Idempotent corrective UPDATE: fresh installs have no "pipeline" row + # (no-op), upgraded envs flip it if it exists. (Ganymede review Apr 24.) + conn.execute( + "UPDATE contributors SET kind = 'agent' WHERE handle = 'pipeline'" + ) + conn.commit() + logger.info("Migration v25: patched kind='agent' for pipeline handle") + if current < SCHEMA_VERSION: conn.execute( "INSERT OR REPLACE INTO schema_version (version) VALUES (?)", diff --git a/scripts/backfill-events.py b/scripts/backfill-events.py index 84646b9..0a14543 100644 --- a/scripts/backfill-events.py +++ b/scripts/backfill-events.py @@ -46,8 +46,11 @@ PENTAGON_AGENTS = frozenset({ "pipeline", }) -AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/", - "astra/", "oberon/") +# Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES. +# Duplicated here because this script runs standalone (no pipeline package import). +AGENT_BRANCH_PREFIXES = ( + "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/", +) TRAILER_EVENT_ROLE = { "challenge": "challenger", @@ -332,8 +335,10 @@ def main(): if not args.dry_run: conn.commit() - print("\n=== Summary ===") - for role in ("author", "originator", "challenger", "synthesizer", "evaluator"): + # Originator is emitted in the claim-level pass below, not the PR-level pass. + # Previous summary listed it here with attempted=0 which confused operators. + print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===") + for role in ("author", "challenger", "synthesizer", "evaluator"): att = counts[(role, "attempt")] if args.dry_run: wi = counts[(role, "would_insert")] @@ -343,10 +348,6 @@ def main(): skip = counts[(role, "skipped_dup")] print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}") - if not args.dry_run: - total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0] - print(f"\nTotal contribution_events rows: {total}") - # ── Per-claim originator pass ── # Separate pass: walk the current knowledge tree, parse sourcer frontmatter, # and attach each claim to the merging PR via a claim_path → pr_number map @@ -419,12 +420,18 @@ def main(): print(f" Claims processed: {claim_count}") print(f" Originator events emitted: {originator_count}") print(f" Breakdown: {dict(claim_counts)}") - - final_origin_attempted = counts[("originator", "attempt")] + att = counts[("originator", "attempt")] if args.dry_run: - print(f" (dry-run) originator would_insert={counts[('originator', 'would_insert')]}") + wi = counts[("originator", "would_insert")] + print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}") else: - print(f" originator inserted={counts[('originator', 'inserted')]} skipped_dup={counts[('originator', 'skipped_dup')]}") + ins = counts[("originator", "inserted")] + skip = counts[("originator", "skipped_dup")] + print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}") + + if not args.dry_run: + total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0] + print(f"\nTotal contribution_events rows: {total}") if __name__ == "__main__":