From c9515c770a73a4b294123865c6301c4c26b2c71d Mon Sep 17 00:00:00 2001 From: Teleo Agents Date: Wed, 13 May 2026 03:49:10 +0000 Subject: [PATCH] fix(attribution): classify submitted_by by branch prefix at PR discovery reweave.py and ingestion run as the operator Forgejo token, so the prior opener-based classifier set submitted_by=m3taversal for every system maintenance PR. backfill_submitted_by.py never overrides non-NULL rows, so this misattribution accumulated: ~2,748 reweave/ingestion PRs and ~3,706 / research/entity PRs were credited to the operator on the leaderboard and contribution_events table. Two parts: 1. lib/merge.py: at PR discovery, classify by branch prefix first. reweave/, ingestion/ -> submitted_by = 'pipeline' / (per _AGENT_NAMES) -> submitted_by = '' otherwise human -> submitted_by = author.lower() otherwise pipeline -> submitted_by = None (extract.py sets from proposed_by) Origin flag updated so domain detection and priority still fire for branch-classified pipeline PRs. Human PRs lowercased to maintain the canonical-handle contract enforced in PR #9. 2. scripts/reattribute-by-branch-prefix.py: historical cleanup. Per affected PR (atomic): - UPDATE prs.submitted_by -> target - UPDATE sources.submitted_by where source_path matches - UPDATE contribution_events handle ('m3taversal',role='author') -> target, kind='agent'. Collision (target already has author event for PR) deletes the m3ta row; target wins. Scope is deliberately conservative: extract/ branches stay attributed to m3taversal because proposed_by-missing legitimately defaults to the operator (telegram drops). Only reweave/, ingestion/, and /. Dry-run shows 6,454 PRs + 284 events to move. Pre-flight collision query returns 0; pre-flight kind check confirms m3ta has only role=author events on this set (no challenger/synthesizer/evaluator). Idempotent. Dry-run by default. Run with --apply after deploy + DB snapshot. --- lib/merge.py | 42 +++++- scripts/reattribute-by-branch-prefix.py | 168 ++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 7 deletions(-) create mode 100755 scripts/reattribute-by-branch-prefix.py diff --git a/lib/merge.py b/lib/merge.py index b9ff910..e05b8c9 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -112,16 +112,44 @@ async def discover_external_prs(conn) -> int: # Detect origin: pipeline agents have per-agent Forgejo users pipeline_users = {"teleo", "rio", "clay", "theseus", "vida", "astra", "leo"} author = pr.get("user", {}).get("login", "") - is_pipeline = author.lower() in pipeline_users + branch_ref = pr["head"]["ref"] + + # Pre-classify by branch prefix — pipeline-shape branches are + # pipeline regardless of Forgejo opener. reweave.py and + # ingestion run as the operator's token, so opener-based + # classification mis-credited system maintenance to the + # operator (~2.7k PRs on m3ta's contributor row before the + # 2026-05-12 reattribute fix). Branch prefix is the canonical + # signal: reweave/ingestion -> 'pipeline', / -> agent. + branch_target = None + if branch_ref.startswith(("reweave/", "ingestion/")): + branch_target = "pipeline" + elif branch_ref.startswith(_AGENT_NAMES): + # _AGENT_NAMES is a tuple of bare names; agent branches + # are "/..." so use the tuple as a startswith prefix + # set after appending '/'. + for name in _AGENT_NAMES: + if branch_ref.startswith(name + "/"): + branch_target = name + break + + is_pipeline = author.lower() in pipeline_users or branch_target is not None origin = "pipeline" if is_pipeline else "human" priority = "high" if origin == "human" else None - domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"]) - agent, commit_type = classify_branch(pr["head"]["ref"]) - source_channel = classify_source_channel(pr["head"]["ref"]) + domain = None if not is_pipeline else detect_domain_from_branch(branch_ref) + agent, commit_type = classify_branch(branch_ref) + source_channel = classify_source_channel(branch_ref) - # For human PRs, submitted_by is the Forgejo author. - # For pipeline PRs, submitted_by is set later by extract.py (from source proposed_by). - submitted_by = author if origin == "human" else None + # submitted_by precedence (canonical handles only): + # 1. branch prefix (pipeline/agent) — set here at discovery + # 2. Forgejo opener for human PRs — set here, lowercased + # 3. extract.py later (from source proposed_by) — left None + if branch_target is not None: + submitted_by = branch_target + elif origin == "human": + submitted_by = author.lower().lstrip("@") if author else None + else: + submitted_by = None conn.execute( """INSERT OR IGNORE INTO prs diff --git a/scripts/reattribute-by-branch-prefix.py b/scripts/reattribute-by-branch-prefix.py new file mode 100755 index 0000000..7a76562 --- /dev/null +++ b/scripts/reattribute-by-branch-prefix.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +"""Reattribute PRs and their author events from m3taversal to the true author. + +Scope (intentionally conservative): + - branch reweave/* -> pipeline (system maintenance, no human author) + - branch ingestion/* -> pipeline (pipeline-internal source intake) + - branch /* -> (autonomous agent work) + for agent in {leo, vida, rio, astra, clay, theseus}. + +NOT in scope: + - branch extract/* -- proposed_by may legitimately be absent + (telegram source drops default to operator). + +Per affected PR (atomic): + 1. UPDATE prs.submitted_by -> target + 2. UPDATE sources.submitted_by where path = pr.source_path + 3. UPDATE contribution_events.handle for every m3ta author event on this PR + (kind set to 'agent', since pipeline + the six agents are all kind='agent' + per attribution.PENTAGON_AGENTS). + +Idempotent. Dry-run by default; --apply commits. +Run AFTER scripts/normalize-submitted-by.py. +""" + +import argparse +import os +import sqlite3 +import sys +from collections import Counter + +DB_PATH = os.environ.get("DB_PATH", "/opt/teleo-eval/pipeline/pipeline.db") + +AGENT_PREFIXES = ("leo/", "vida/", "rio/", "astra/", "clay/", "theseus/") +PIPELINE_PREFIXES = ("reweave/", "ingestion/") + + +def target_for(branch): + if not branch: + return None + if branch.startswith(PIPELINE_PREFIXES): + return "pipeline" + for prefix in AGENT_PREFIXES: + if branch.startswith(prefix): + return prefix.rstrip("/") + return None + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)") + ap.add_argument("--db", default=DB_PATH) + args = ap.parse_args() + + conn = sqlite3.connect(args.db, timeout=30) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA busy_timeout = 30000") + + mode = "APPLY" if args.apply else "DRY-RUN" + print("DB: {}\nMode: {}\n".format(args.db, mode)) + + rows = conn.execute(""" + SELECT number, branch, source_path + FROM prs + WHERE submitted_by = 'm3taversal' + AND branch IS NOT NULL + """).fetchall() + + pr_targets = [] + pr_counts = Counter() + for r in rows: + tgt = target_for(r["branch"]) + if tgt is None: + continue + pr_targets.append((r["number"], r["branch"], r["source_path"], tgt)) + pr_counts[tgt] += 1 + + print("prs to reattribute: {}".format(len(pr_targets))) + for tgt, n in pr_counts.most_common(): + print(" {:6d} -> {!r}".format(n, tgt)) + + src_paths = [t[2] for t in pr_targets if t[2]] + src_count = 0 + if src_paths: + placeholders = ",".join("?" * len(src_paths)) + src_count = conn.execute( + "SELECT COUNT(*) FROM sources " + "WHERE submitted_by = 'm3taversal' AND path IN ({})".format(placeholders), + src_paths, + ).fetchone()[0] + print("\nsources rows that will be re-pointed: {}".format(src_count)) + + pr_to_target = {p[0]: p[3] for p in pr_targets} + events = [] + if pr_to_target: + pr_placeholders = ",".join("?" * len(pr_to_target)) + events = conn.execute( + "SELECT id, pr_number FROM contribution_events " + "WHERE handle = 'm3taversal' AND role = 'author' " + "AND pr_number IN ({})".format(pr_placeholders), + list(pr_to_target.keys()), + ).fetchall() + print("contribution_events author rows to move: {}".format(len(events))) + ev_counts = Counter(pr_to_target[e["pr_number"]] for e in events) + for tgt, n in ev_counts.most_common(): + print(" {:6d} events -> {!r}".format(n, tgt)) + + if not args.apply: + print("\nDry-run complete. Run with --apply to commit " + "({} PRs + {} sources + {} events).".format( + len(pr_targets), src_count, len(events))) + return 0 + + pr_updated = 0 + src_updated = 0 + ev_updated = 0 + ev_collisions = 0 + + try: + for pr_num, branch, source_path, target in pr_targets: + cur = conn.execute( + "UPDATE prs SET submitted_by = ? " + "WHERE number = ? AND submitted_by = 'm3taversal'", + (target, pr_num), + ) + pr_updated += cur.rowcount + + if source_path: + cur = conn.execute( + "UPDATE sources SET submitted_by = ? " + "WHERE path = ? AND submitted_by = 'm3taversal'", + (target, source_path), + ) + src_updated += cur.rowcount + + for ev in conn.execute( + "SELECT id FROM contribution_events " + "WHERE handle = 'm3taversal' AND role = 'author' AND pr_number = ?", + (pr_num,), + ).fetchall(): + try: + conn.execute( + "UPDATE contribution_events SET handle = ?, kind = 'agent' " + "WHERE id = ?", + (target, ev["id"]), + ) + ev_updated += 1 + except sqlite3.IntegrityError: + conn.execute( + "DELETE FROM contribution_events WHERE id = ?", + (ev["id"],), + ) + ev_collisions += 1 + + conn.commit() + except Exception: + conn.rollback() + raise + + print("\nCommitted.") + print(" prs.submitted_by moves: {}".format(pr_updated)) + print(" sources.submitted_by moves: {}".format(src_updated)) + print(" contribution_events moves: {}".format(ev_updated)) + print(" ce collisions deleted: {}".format(ev_collisions)) + return 0 + + +if __name__ == "__main__": + sys.exit(main())