teleo-infrastructure/scripts/backfill-events.py
m3taversal 10d5c275da
fix(backfill): normalize commit_date via datetime() in time-proximity query
SQLite compares TEXT timestamps lexicographically, which breaks across ISO-T
and space-separator formats: '2026-03-27 18:00:14' < '2026-03-27T17:43:04+00:00'
because space (0x20) < T (0x54). PRs merged same-day but earlier than the
commit hour were silently excluded from the time-proximity cascade.

Shaga's 3 stigmergic-coordination claims resolved to PR #2032 (later, wrong)
instead of #2025 (earlier, correct). Fixed by wrapping both window bounds in
datetime(), which normalizes commit_date to space-separator form before comparison.
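
A minimal sqlite3 repro of the mis-ordering, using the timestamps above
(illustrative only, not part of the script):

    import sqlite3
    conn = sqlite3.connect(":memory:")
    # Lexicographic TEXT comparison: space (0x20) sorts before 'T' (0x54).
    print(conn.execute(
        "SELECT '2026-03-27 18:00:14' >= '2026-03-27T17:43:04+00:00'"
    ).fetchone()[0])   # prints 0: the later PR compares as earlier, so excluded
    # datetime() normalizes both formats to UTC space-separator form.
    print(conn.execute(
        "SELECT datetime('2026-03-27 18:00:14') >= datetime('2026-03-27T17:43:04+00:00')"
    ).fetchone()[0])   # prints 1: correctly included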

Verified: all 3 Shaga claims now resolve to #2025 via git_time_proximity.
No change to totals (126 originator events, 5 proximity hits) — the fix
corrects WHICH PR each proximity-matched claim resolves to, not whether.
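
A spot-check of that verification (columns per this script's INSERT statement;
the handle spelling is hypothetical):

    SELECT claim_path, pr_number FROM contribution_events
    WHERE handle = 'shaga' AND role = 'originator';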

Caught by Ganymede review of 1d6b515.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:16:03 +01:00


#!/usr/bin/env python3
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.
For each merged PR:
- Derive author from prs.submitted_by → git author → branch prefix
- Emit author event (role=author, weight=0.30, claim_path=NULL)
- For each claim file under a knowledge prefix, parse frontmatter and emit
originator events for sourcer entries that differ from the author
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
(when domain_verdict='approve' and not Leo)
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type
Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.
Usage:
python3 scripts/backfill-events.py --dry-run # Count events without writing
python3 scripts/backfill-events.py # Apply
Runs read-only against the git worktree; only writes to pipeline.db.
"""
import argparse
import os
import re
import sqlite3
import subprocess
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
ROLE_WEIGHTS = {
"author": 0.30,
"challenger": 0.25,
"synthesizer": 0.20,
"originator": 0.15,
"evaluator": 0.05,
}
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES.
# Duplicated here because this script runs standalone (no pipeline package import).
AGENT_BRANCH_PREFIXES = (
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)
TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"research": "synthesizer",
"reweave": "synthesizer",
}
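# e.g. a merged "leo/..." PR whose commit_type is "challenge" credits leo with
# a challenger event; "enrich", "research", and "reweave" credit a synthesizer.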
KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")
BOT_AUTHORS = frozenset({
"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions",
})
def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
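    """Lowercase a handle, strip "@", and resolve it via contributor_aliases."""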
if not handle:
return ""
h = handle.strip().lower().lstrip("@")
row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
if row:
return row[0]
return h
def classify_kind(handle: str) -> str:
h = handle.strip().lower().lstrip("@")
return "agent" if h in PENTAGON_AGENTS else "person"
def parse_frontmatter(text: str):
"""Minimal YAML frontmatter parser using PyYAML when available."""
if not text.startswith("---"):
return None
end = text.find("---", 3)
if end == -1:
return None
raw = text[3:end]
try:
import yaml
fm = yaml.safe_load(raw)
return fm if isinstance(fm, dict) else None
except ImportError:
return None
except Exception:
return None
def extract_sourcers_from_file(path: Path) -> list[str]:
"""Return the sourcer handles from a claim file's frontmatter.
    Checks three formats, in the order the code tries them:
    1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
    2. Prefix-keyed: `attribution_sourcer: alexastrum`
    3. Bare-key flat: `sourcer: alexastrum`
"""
try:
content = path.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
return []
fm = parse_frontmatter(content)
if not fm:
return []
handles: list[str] = []
attr = fm.get("attribution")
if isinstance(attr, dict):
entries = attr.get("sourcer", [])
if isinstance(entries, list):
for e in entries:
if isinstance(e, dict) and "handle" in e:
handles.append(e["handle"])
elif isinstance(e, str):
handles.append(e)
elif isinstance(entries, str):
handles.append(entries)
return handles
flat = fm.get("attribution_sourcer")
if flat:
if isinstance(flat, str):
handles.append(flat)
elif isinstance(flat, list):
handles.extend(v for v in flat if isinstance(v, str))
if handles:
return handles
bare = fm.get("sourcer")
if bare:
if isinstance(bare, str):
handles.append(bare)
elif isinstance(bare, list):
handles.extend(v for v in bare if isinstance(v, str))
return handles
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
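# e.g. "@M3taversal" normalizes to "m3taversal" (valid); 40+ chars or a
# leading "-" fail the regex; trailing "-"/"_" are rejected in valid_handle.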
def valid_handle(h: str) -> bool:
if not h:
return False
lower = h.strip().lower().lstrip("@")
if lower.endswith("-") or lower.endswith("_"):
return False
return bool(_HANDLE_RE.match(lower))
def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
"""Run a git command, return stdout. Returns empty string on failure."""
try:
result = subprocess.run(
["git", *args],
cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
)
return result.stdout
except (subprocess.TimeoutExpired, OSError):
return ""
def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
"""Best-effort: find git author of first non-merge commit on the branch.
    PR branches are usually deleted after merge; rather than scan main for
    matching commits, this stub always returns "" and callers fall back to
    submitted_by + the branch prefix.
"""
# Post-merge branches are cleaned up. For the backfill, we accept that this
# path rarely yields results and rely on submitted_by + branch prefix.
return ""
def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
"""Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
if pr.get("submitted_by"):
cand = pr["submitted_by"].strip().lower().lstrip("@")
if cand and cand not in BOT_AUTHORS:
return cand
branch = pr.get("branch") or ""
if "/" in branch:
prefix = branch.split("/", 1)[0].lower()
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
return prefix
return None
def find_pr_for_claim(
conn: sqlite3.Connection,
repo: Path,
md: Path,
) -> tuple[int | None, str]:
"""Recover the Forgejo PR number that introduced a claim file.
Returns (pr_number, strategy) — strategy is one of:
'sourced_from' — frontmatter sourced_from matched prs.source_path
'git_subject' — git log first-add commit message matched a branch pattern
    'title_desc' — filename stem matched a title in prs.description
        (applied by the caller as a fallback, not returned by this function)
    'github_pr' — recovery commit mentioned GitHub PR # → prs.github_pr
    'git_time_proximity' — bare "Extract N claims" subject matched to the
        earliest agent-branch PR merged within 24h after the first-add commit
    'none' — no strategy found a match
Order is chosen by reliability:
1. sourced_from (explicit provenance, most reliable when present)
2. git_subject (covers Leo research, Cameron challenges, Theseus contrib)
    3. title_desc (caller-side fallback; brittle when description is NULL)
    4. github_pr (recovery commits referencing erased GitHub PRs)
    5. git_time_proximity (24h merge-window fallback for bare extract subjects)
"""
rel = str(md.relative_to(repo))
# Strategy 1: sourced_from frontmatter → prs.source_path
try:
content = md.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
content = ""
fm = parse_frontmatter(content) if content else None
if fm:
sourced = fm.get("sourced_from")
candidate_paths: list[str] = []
if isinstance(sourced, str) and sourced:
candidate_paths.append(sourced)
elif isinstance(sourced, list):
candidate_paths.extend(s for s in sourced if isinstance(s, str))
for sp in candidate_paths:
stem = Path(sp).stem
if not stem:
continue
row = conn.execute(
"""SELECT number FROM prs
WHERE source_path LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"%{stem}.md",),
).fetchone()
if row:
return row["number"], "sourced_from"
# Strategy 2: git log first-add commit → subject pattern → prs.branch
# Default log order is reverse-chronological; take the last line (oldest)
# to get the original addition, not later rewrites.
log_out = git(
"log", "--diff-filter=A", "--follow",
"--format=%H|||%s|||%b", "--", rel,
)
if log_out.strip():
# Split on the delimiter we chose. Each commit produces 3 fields but
# %b can contain blank lines — group by lines that look like a SHA.
blocks: list[tuple[str, str, str]] = []
current: list[str] = []
for line in log_out.splitlines():
if re.match(r"^[a-f0-9]{40}\|\|\|", line):
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
current = [line]
else:
current.append(line)
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
if blocks:
# Oldest addition — git log defaults to reverse-chronological
            oldest_sha, subject, body = blocks[-1]
# Pattern: "<agent>: extract claims from <slug>"
m = re.match(r"^(\w+):\s*extract\s+claims\s+from\s+(\S+)", subject)
if m:
                # removesuffix, not rstrip: rstrip(".md") strips any of the
                # characters '.', 'm', 'd' and can mangle slugs ending in them.
                slug = m.group(2).removesuffix(".md").rstrip(".")
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"extract/{slug}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: research session <date>"
m = re.match(r"^(\w+):\s*research\s+session\s+(\d{4}-\d{2}-\d{2})", subject)
if m:
agent = m.group(1).lower()
date = m.group(2)
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/research-{date}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: challenge" / contrib challenges / entity batches
m = re.match(r"^(\w+):\s*(?:challenge|contrib|entity|synthesize)", subject)
if m:
agent = m.group(1).lower()
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Recovery commits referencing erased GitHub PRs (Alex/Cameron).
# Subject: "Recover <who> contribution from GitHub PR #NN (...)".
# Match only when a corresponding prs row exists with github_pr=NN —
# otherwise the claims were direct-to-main without a Forgejo PR
# record, which requires a synthetic PR row (follow-up, not in
# this script's scope).
gh_match = re.search(r"GitHub\s+PR\s+#(\d+)", subject + "\n" + body)
if gh_match:
gh_pr = int(gh_match.group(1))
row = conn.execute(
"SELECT number FROM prs WHERE github_pr = ? AND status='merged' LIMIT 1",
(gh_pr,),
).fetchone()
if row:
return row["number"], "github_pr"
# Pattern: bare "Extract N claims from <source-fragment>" (no
# agent prefix). Used in early research PRs like Shaga's claims
# at PR #2025. Fall back to time-proximity: find the earliest
# agent-branch PR merged within 24h AFTER this commit's date.
m = re.match(r"^Extract\s+\d+\s+claims\s+from\b", subject)
if m:
# Get commit author date
date_out = git(
"log", "-1", "--format=%aI", _oldest_sha, timeout=10,
)
commit_date = date_out.strip() if date_out.strip() else None
if commit_date:
# git %aI returns ISO 8601 with T-separator; prs.merged_at
# uses SQLite's space-separator. Lexicographic comparison
# fails across formats (space<T), so normalize commit_date
# via datetime() before comparing. Without this, PRs merged
# within the same calendar day but earlier than the commit
# hour are silently excluded (caught by Ganymede review —
# Shaga's #2025 was dropped in favor of later #2032).
row = conn.execute(
"""SELECT number FROM prs
WHERE status='merged'
AND merged_at >= datetime(?)
AND merged_at <= datetime(datetime(?), '+24 hours')
AND (branch LIKE 'leo/%' OR branch LIKE 'theseus/%'
OR branch LIKE 'rio/%' OR branch LIKE 'astra/%'
OR branch LIKE 'vida/%' OR branch LIKE 'clay/%')
ORDER BY merged_at ASC LIMIT 1""",
(commit_date, commit_date),
).fetchone()
if row:
return row["number"], "git_time_proximity"
return None, "none"
def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
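    """Normalize `handle`, validate it, and insert one contribution event.

    INSERT OR IGNORE plus the partial UNIQUE indexes keep re-runs idempotent;
    `counts` accumulates attempt/insert/duplicate tallies per role.
    """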
canonical = normalize_handle(conn, handle)
if not valid_handle(canonical):
return
kind = classify_kind(canonical)
weight = ROLE_WEIGHTS[role]
counts[(role, "attempt")] += 1
if dry_run:
counts[(role, "would_insert")] += 1
return
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
)
if cur.rowcount > 0:
counts[(role, "inserted")] += 1
else:
counts[(role, "skipped_dup")] += 1
def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
"""Best-effort: list added .md files in the PR.
    Stubbed: post-merge branches are gone, so this always returns []. The
    granular per-claim diff can't be recovered for historical PRs; PR-level
    author/evaluator events still land correctly.
"""
    # Post-merge PR branches are deleted from Forgejo, so we can't diff them.
    # prs.source_path is only a weak signal: for extract/* PRs it points at the
    # source inbox file, not the claim files, and main's history doesn't record
    # which files a given PR touched. Accept the loss: the backfill emits only
    # PR-level events here (author, evaluator, challenger/synthesizer), and
    # originator events come from the claim-level pass in main().
return []
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: contribution_events exists (v24 migration applied)
try:
conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
sys.exit(2)
# Walk all merged knowledge PRs
query = """
SELECT number, branch, domain, source_channel, submitted_by,
leo_verdict, domain_verdict, domain_agent,
commit_type, merged_at
FROM prs
WHERE status = 'merged'
ORDER BY merged_at ASC
"""
if args.limit:
query += f" LIMIT {args.limit}"
prs = conn.execute(query).fetchall()
print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")
counts: Counter = Counter()
repo = Path(REPO_DIR)
for pr in prs:
pr_number = pr["number"]
branch = pr["branch"] or ""
domain = pr["domain"]
channel = pr["source_channel"]
merged_at = pr["merged_at"]
        # ── AUTHOR ──
        # Pipeline-only branches (extract/*, reweave/*, fix/*, ingestion/*,
        # epimetheus/*) are infrastructure: derive_author never credits them
        # to an agent, because its branch-prefix fallback only matches agent
        # prefixes. For those branches submitted_by carries the real author
        # (the human who sent the source via Telegram/etc). For agent branches
        # the agent is the author; for external branches (gh-pr-*) the git
        # author arrives in submitted_by via the sync-mirror pipeline.
        # Evaluator credit for Leo/domain_agent applies in all cases.
author = derive_author(conn, dict(pr))
if author:
emit(conn, counts, args.dry_run, author, "author", pr_number,
None, domain, channel, merged_at)
# ── EVALUATOR ──
if pr["leo_verdict"] == "approve":
emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
None, domain, channel, merged_at)
if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
dagent = pr["domain_agent"].strip().lower()
if dagent and dagent != "leo":
emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
None, domain, channel, merged_at)
# ── CHALLENGER / SYNTHESIZER from branch+commit_type ──
# Only fires on agent-owned branches. Pipeline branches aren't creditable
# work (they're machine extraction, evaluator already captures the review).
if branch.startswith(AGENT_BRANCH_PREFIXES):
prefix = branch.split("/", 1)[0].lower()
event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
if event_role:
emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
None, domain, channel, merged_at)
        # ── ORIGINATOR per claim ──
        # Not emitted in this PR-level loop: old PR branches are deleted
        # post-merge, so we can't diff them to find which claim files a PR
        # added. The claim-level pass below recovers this instead, attaching
        # each claim file on main to its merging PR via find_pr_for_claim's
        # multi-strategy recovery. Forward traffic (post-deploy) gets
        # per-claim originator events via record_contributor_attribution's
        # added-files walk.
if not args.dry_run:
conn.commit()
# Originator is emitted in the claim-level pass below, not the PR-level pass.
# Previous summary listed it here with attempted=0 which confused operators.
print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===")
for role in ("author", "challenger", "synthesizer", "evaluator"):
att = counts[(role, "attempt")]
if args.dry_run:
wi = counts[(role, "would_insert")]
print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
else:
ins = counts[(role, "inserted")]
skip = counts[(role, "skipped_dup")]
print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
# ── Per-claim originator pass ──
# Walk the knowledge tree, parse sourcer attribution, and attach each claim
# to its merging PR via find_pr_for_claim's multi-strategy recovery.
# Apr 24 rewrite (Ganymede-approved): replaces the single-strategy
# title→description match with four strategies in reliability order.
# Previous script missed PRs with NULL description (Cameron #3377) and
# cross-context claims (Shaga's Leo research). Fallback title-match is
# preserved to recover anything the git-log path misses.
print("\n=== Claim-level originator pass ===")
# Build title → pr_number map from prs.description (strategy 3 fallback)
title_to_pr: dict[str, int] = {}
for r in conn.execute(
"SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
).fetchall():
desc = r["description"] or ""
for title in desc.split(" | "):
title = title.strip()
if title:
# Last-writer wins. Conflicts are rare (titles unique in practice).
title_to_pr[title.lower()] = r["number"]
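    # e.g. a description of "Claim one | Claim two" (hypothetical titles) maps
    # both "claim one" and "claim two" to that PR's number.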
claim_counts = Counter()
strategy_counts = Counter()
claim_count = 0
originator_count = 0
    # One pass over the knowledge tree, driven by KNOWLEDGE_PREFIXES so the
    # walked prefixes can't drift from the constant.
    claim_files: list[Path] = []
    for kp in KNOWLEDGE_PREFIXES:
        claim_files.extend(sorted(repo.glob(f"{kp}**/*.md")))
    for md in claim_files:
rel = str(md.relative_to(repo))
stem = md.stem
        # Strategies 1, 2, 4, 5 via the helper (sourced_from, git_subject,
        # github_pr, git_time_proximity).
pr_number, strategy = find_pr_for_claim(conn, repo, md)
# Strategy 3 (fallback): title-match against prs.description.
if not pr_number:
pr_number = title_to_pr.get(stem.lower())
if not pr_number:
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
if pr_number:
strategy = "title_desc"
if not pr_number:
claim_counts["no_pr_match"] += 1
continue
sourcers = extract_sourcers_from_file(md)
if not sourcers:
claim_counts["no_sourcer"] += 1
continue
claim_count += 1
strategy_counts[strategy] += 1
# Look up author for this PR to skip self-credit
pr_row = conn.execute(
"SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
if not pr_row:
continue
author = derive_author(conn, dict(pr_row))
author_canonical = normalize_handle(conn, author) if author else None
for src_handle in sourcers:
src_canonical = normalize_handle(conn, src_handle)
if not valid_handle(src_canonical):
claim_counts["invalid_handle"] += 1
continue
if src_canonical == author_canonical:
claim_counts["skip_self"] += 1
continue
emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
originator_count += 1
if not args.dry_run:
conn.commit()
print(f" Claims processed: {claim_count}")
print(f" Originator events emitted: {originator_count}")
print(f" Breakdown: {dict(claim_counts)}")
print(f" Strategy hits: {dict(strategy_counts)}")
att = counts[("originator", "attempt")]
if args.dry_run:
wi = counts[("originator", "would_insert")]
print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}")
else:
ins = counts[("originator", "inserted")]
skip = counts[("originator", "skipped_dup")]
print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
if not args.dry_run:
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
print(f"\nTotal contribution_events rows: {total}")
if __name__ == "__main__":
main()