teleo-infrastructure/scripts/backfill-research-session-attribution.py
Teleo Agents 74bf0461e8
fix(attribution): canonicalize submitted_by at write time + historical normalizer
Companion / write-side fix to fix/activity-feed-canonical-handle.

The activity-feed canonicalization was a read-side guard. The bug at the
source is that extract.py and two backfill scripts write decorated
strings ("Vida (self-directed)", "pipeline (reweave)", "@m3taversal") into
prs.submitted_by and sources.submitted_by. Downstream readers
(lib.contributor.insert_contribution_event, scripts/scoring_digest,
diagnostics/activity_feed_api) all strip the decorator on read, but
anything that reads the column verbatim (like /api/activity-feed before
the read-side fix) 404s on /contributors/{decorated-handle}.

Stop writing the decorator. The self-directed signal is already carried
by intake_tier == research-task plus the prs.agent column; the suffix
is redundant string noise that costs us correctness at every consumer
that forgets to strip.
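
To illustrate why the suffix is redundant, here is a minimal sketch of how a
consumer could derive the self-directed flag from the two existing fields.
The helper name is_self_directed and the dict-shaped row are hypothetical;
only intake_tier, the research-task value, and the agent column come from
the description above.

```python
def is_self_directed(row: dict) -> bool:
    """Hypothetical helper: True when a row represents agent-initiated research.

    Assumes `row` exposes the `intake_tier` and `agent` fields named in the
    commit message; the real schema may store them in different tables.
    """
    return row.get("intake_tier") == "research-task" and bool(row.get("agent"))


# Toy rows for illustration only.
print(is_self_directed({"intake_tier": "research-task", "agent": "vida"}))
print(is_self_directed({"intake_tier": "directed", "agent": "vida"}))
```

With a check like this available, submitted_by can stay a plain handle and
no consumer has to parse a decorator out of it.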

Changes:

- lib/extract.py:690 — write canonical handle via attribution.normalize_handle.
  Direct elif for intake_tier == research-task now stores just agent_name.
  @m3taversal -> m3taversal.

- diagnostics/backfill_submitted_by.py — same fix in two branches plus
  the reweave branch (pipeline (reweave) -> pipeline).

- scripts/backfill-research-session-attribution.py — UPDATE prs sets
  agent handle alone, no suffix. Docstring + log line updated.

- scripts/normalize-submitted-by.py (new) — one-time backfill that
  canonicalizes existing prs.submitted_by and sources.submitted_by rows.
  Strips trailing parenthetical decorators, lowercases, drops @. Defaults
  to dry-run; --apply to commit. Skips rows that would normalize to
  invalid handles (no garbage falls through silently).
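
The normalization rules listed above (strip trailing parenthetical
decorators, lowercase, drop @, reject invalid results) can be sketched as
follows. This is not the code in attribution.normalize_handle or
scripts/normalize-submitted-by.py: the function name canonicalize_handle
and the validity pattern are assumptions made for illustration.

```python
import re
from typing import Optional

# Assumption: a valid handle is lowercase alphanumerics plus '-' and '_'.
# The real validity rule may differ.
_VALID_HANDLE = re.compile(r"^[a-z0-9][a-z0-9_-]*$")
# One or more trailing " (...)" groups, e.g. "Vida (self-directed)".
_TRAILING_DECORATOR = re.compile(r"(\s*\([^()]*\))+\s*$")


def canonicalize_handle(raw: Optional[str]) -> Optional[str]:
    """Hypothetical sketch: return the canonical handle, or None if invalid."""
    if raw is None:
        return None
    handle = _TRAILING_DECORATOR.sub("", raw.strip())
    handle = handle.lstrip("@").strip().lower()
    # Returning None lets the caller skip the row instead of writing garbage.
    return handle if _VALID_HANDLE.match(handle) else None


for raw in ["Vida (self-directed)", "pipeline (reweave)", "@m3taversal", "(reweave)"]:
    print(repr(raw), "->", canonicalize_handle(raw))
```

Returning None for anything that fails validation mirrors the "skip rows
that would normalize to invalid handles" behavior: the caller decides what
to do with a rejected value rather than storing it.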

Dry-run against live pipeline.db:
  prs:     3008 rows need normalization (clean mappings, 0 invalid)
  sources: 730 rows need normalization (clean mappings, 0 invalid)
  Total:   3738 rows. All map to existing handle column values.

After this lands + auto-deploys, the operator should run
  python3 scripts/normalize-submitted-by.py --apply
once to clean historical rows. The read-side canonicalization in
diagnostics/activity_feed_api.py (fix/activity-feed-canonical-handle)
becomes redundant defense-in-depth instead of load-bearing.
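
The dry-run-by-default workflow can be tried against a throwaway in-memory
database. The sketch below is an assumption-laden illustration: the
single-table layout, the toy rows, and the normalize_rows function are
hypothetical and do not reproduce scripts/normalize-submitted-by.py (for
brevity it also omits the invalid-handle check).

```python
import re
import sqlite3

_DECORATOR = re.compile(r"(\s*\([^()]*\))+\s*$")


def _canonical(raw: str) -> str:
    """Illustrative rule only: strip trailing "(...)", drop '@', lowercase."""
    return _DECORATOR.sub("", raw.strip()).lstrip("@").strip().lower()


def normalize_rows(conn: sqlite3.Connection, apply: bool = False) -> int:
    """Count rows needing normalization; rewrite them only when apply=True."""
    rows = conn.execute(
        "SELECT number, submitted_by FROM prs WHERE submitted_by IS NOT NULL"
    ).fetchall()
    pending = [(_canonical(s), n) for n, s in rows if _canonical(s) != s]
    if apply and pending:
        with conn:  # commits on success, rolls back on exception
            conn.executemany(
                "UPDATE prs SET submitted_by = ? WHERE number = ?", pending
            )
    return len(pending)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER PRIMARY KEY, submitted_by TEXT)")
conn.executemany(
    "INSERT INTO prs VALUES (?, ?)",
    [(1, "Vida (self-directed)"), (2, "@m3taversal"), (3, "rio")],  # toy rows
)
conn.commit()

print(normalize_rows(conn))              # dry-run: reports, changes nothing
print(normalize_rows(conn, apply=True))  # rewrites the pending rows
print(normalize_rows(conn))              # a second pass finds nothing left
```

Because the pass only touches rows whose canonical form differs from the
stored value, running it a second time is a no-op, which is what makes a
one-time --apply safe to repeat.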

No KB writes.
2026-05-13 02:56:50 +00:00


#!/usr/bin/env python3
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.

Problem: research-session.sh used to write source frontmatter without
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
`contribution_events` as a `handle='m3taversal', role='author'` row per
research-derived claim. Result: agent research credited to the human.

Forward fix is a frontmatter-template patch to research-session.sh.
This script corrects historical records.

Identification:
    Research-session source archives are committed to teleo-codex with a
    message matching `^<agent>: research session YYYY-MM-DD —`. The diff
    for that commit lists `inbox/queue/*.md` files the agent created. Any
    PR whose `source_path` matches one of those filenames is research-derived.

Touch list (per matched PR):
    1. UPDATE prs SET submitted_by = '<agent>' (canonical handle, lowercase, no
       trailing "(self-directed)" suffix; see lib/extract.py and
       diagnostics/activity_feed_api.py for why decorators leak into 404s)
    2. DELETE FROM contribution_events
       WHERE handle='m3taversal' AND role='author' AND pr_number=?
    3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
       kind='agent', role='author', weight=0.30, original timestamp/domain/channel.

Dry-run is the default (there is no --dry-run flag). Pass --apply to commit
changes.

Usage:
    python3 backfill-research-session-attribution.py --days 30           # dry-run
    python3 backfill-research-session-attribution.py --apply --days 30   # commit
"""

import argparse
import logging
import os
import re
import sqlite3
import subprocess
import sys
from collections import defaultdict
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("backfill-research-attr")

DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
AUTHOR_WEIGHT = 0.30


def git(repo: Path, *args: str) -> str:
    """Run a git command in repo, return stdout. Raises on non-zero."""
    result = subprocess.run(
        ["git", "-C", str(repo), *args],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
    """Return {source_filename_basename: agent_handle} for last N days.

    Walks teleo-codex `git log --since`, filters to research-session commits,
    parses agent from message header, lists inbox/queue/*.md files added in
    that commit's diff. Maps the basename (which becomes source_path on extract)
    to the agent who created it.
    """
    log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
    file_to_agent: dict[str, str] = {}
    commits_seen = 0
    commits_matched = 0
    for line in log.splitlines():
        if not line or "|" not in line:
            continue
        commits_seen += 1
        sha, _, subject = line.partition("|")
        m = COMMIT_HEADER_RE.match(subject)
        if not m:
            continue
        agent = m.group(1)
        if agent not in KNOWN_AGENTS:
            logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
            continue
        commits_matched += 1
        # List files added in this commit (inbox/queue/*.md only)
        try:
            added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
                        "--diff-filter=A", sha)
        except subprocess.CalledProcessError:
            logger.warning("diff-tree failed for %s", sha[:8])
            continue
        for f in added.splitlines():
            if f.startswith("inbox/queue/") and f.endswith(".md"):
                basename = Path(f).name
                if basename in file_to_agent and file_to_agent[basename] != agent:
                    logger.warning(
                        "filename collision: %s — was %s, now %s (keeping first)",
                        basename, file_to_agent[basename], agent,
                    )
                    continue
                file_to_agent.setdefault(basename, agent)
    logger.info(
        "scanned %d commits, %d research-session matches, %d unique source files",
        commits_seen, commits_matched, len(file_to_agent),
    )
    return file_to_agent


def find_misattributed_prs(
    conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int
) -> list[dict]:
    """Return a list of dicts describing misattributed PRs.

    Each dict has the keys: pr, current_submitted_by, source_path, basename,
    agent, domain, channel, merged_at.

    Only includes PRs:
    - with source_path basename in our research-session map
    - currently attributed to '@m3taversal'
    - merged within the last N days (cap on temporal scope)
    """
    rows = conn.execute(
        """SELECT number, submitted_by, source_path, domain, source_channel, merged_at
           FROM prs
           WHERE submitted_by = '@m3taversal'
             AND source_path IS NOT NULL
             AND status = 'merged'
             AND merged_at > datetime('now', ?)""",
        (f"-{days} days",),
    ).fetchall()
    matches = []
    for row in rows:
        basename = Path(row["source_path"]).name
        agent = file_to_agent.get(basename)
        if agent:
            matches.append({
                "pr": row["number"],
                "current_submitted_by": row["submitted_by"],
                "source_path": row["source_path"],
                "basename": basename,
                "agent": agent,
                "domain": row["domain"],
                "channel": row["source_channel"],
                "merged_at": row["merged_at"],
            })
    return matches


def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
    """Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
    return conn.execute(
        """SELECT COUNT(*) FROM contribution_events
           WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
        (handle, role, pr),
    ).fetchone()[0]


def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
    """Apply the backfill. Returns counters."""
    counters = defaultdict(int)
    if not dry_run:
        conn.execute("BEGIN")
    try:
        for m in matches:
            pr = m["pr"]
            agent = m["agent"]
            # Pre-checks for accurate dry-run reporting
            old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
            new_event_exists = existing_event_count(conn, pr, agent, "author") > 0
            if dry_run:
                logger.info(
                    "would update pr=%d submitted_by '%s' -> '%s' "
                    "[m3ta_event=%s, agent_event=%s]",
                    pr, m["current_submitted_by"], agent,
                    old_event_exists, new_event_exists,
                )
                counters["prs"] += 1
                if old_event_exists:
                    counters["events_to_delete"] += 1
                if not new_event_exists:
                    counters["events_to_insert"] += 1
                continue
            # 1. UPDATE prs.submitted_by — canonical handle only.
            conn.execute(
                "UPDATE prs SET submitted_by = ? WHERE number = ?",
                (agent, pr),
            )
            counters["prs"] += 1
            # 2. INSERT new agent author event (idempotent via UNIQUE index)
            cur = conn.execute(
                """INSERT OR IGNORE INTO contribution_events
                   (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
                   VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
                (agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
            )
            if cur.rowcount > 0:
                counters["events_inserted"] += 1
            # 3. DELETE old m3taversal author event
            cur = conn.execute(
                """DELETE FROM contribution_events
                   WHERE handle = 'm3taversal' AND role = 'author'
                     AND pr_number = ? AND claim_path IS NULL""",
                (pr,),
            )
            if cur.rowcount > 0:
                counters["events_deleted"] += 1
        if not dry_run:
            conn.execute("COMMIT")
    except Exception:
        # Undo every change from this run if any step fails.
        if not dry_run:
            conn.execute("ROLLBACK")
        raise
    return dict(counters)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
    parser.add_argument("--db", type=Path, default=DEFAULT_DB)
    parser.add_argument("--days", type=int, default=30)
    parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
    parser.add_argument("--limit", type=int, default=0,
                        help="cap PR updates (0 = no cap; useful for testing on a small slice)")
    args = parser.parse_args()

    dry_run = not args.apply
    logger.info("repo=%s db=%s days=%d mode=%s",
                args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")

    if not args.repo.exists():
        logger.error("repo not found: %s", args.repo)
        sys.exit(1)
    if not args.db.exists():
        logger.error("db not found: %s", args.db)
        sys.exit(1)

    file_to_agent = discover_research_session_archives(args.repo, args.days)
    if not file_to_agent:
        logger.warning("no research-session source files found in last %d days", args.days)
        sys.exit(0)

    # Per-agent breakdown
    by_agent = defaultdict(int)
    for agent in file_to_agent.values():
        by_agent[agent] += 1
    for agent, count in sorted(by_agent.items()):
        logger.info(" research-session sources by %s: %d", agent, count)

    conn = sqlite3.connect(args.db)
    conn.row_factory = sqlite3.Row
    matches = find_misattributed_prs(conn, file_to_agent, args.days)
    logger.info("misattributed PRs found: %d", len(matches))
    if args.limit and len(matches) > args.limit:
        logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
        matches = matches[:args.limit]
    if not matches:
        logger.info("nothing to do")
        return

    # Per-agent breakdown of misattribution
    miss_by_agent = defaultdict(int)
    for m in matches:
        miss_by_agent[m["agent"]] += 1
    logger.info("misattributed PR breakdown:")
    for agent, count in sorted(miss_by_agent.items()):
        logger.info(" %s: %d", agent, count)

    counters = apply_backfill(conn, matches, dry_run)
    logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)


if __name__ == "__main__":
    main()