Rewrite claim-level pass in backfill-events.py to recover the Forgejo PR that introduced each claim via a cascade of 4 strategies, ordered by reliability, replacing the single title→description match that missed PRs with a NULL description (Cameron #3377) and bare-subject extracts (Shaga's Leo research PR).

## Strategies

1. sourced_from frontmatter → prs.source_path stem match
2. git log first-add commit → subject pattern → prs.branch
   - "<agent>: extract claims from <slug>" → extract/<slug>
   - "<agent>: research session YYYY-MM-DD" → <agent>/research-<date>
   - "<agent>: (challenge|contrib|entity|synthesize)" → <agent>/*
   - "Recover X from GitHub PR #N" → prs.github_pr=N
   - "Extract N claims from X" (no prefix) → time-proximity on agent-owned branches within 24h
3. Current title_desc fallback for anything the above miss

## Dry-run projection (1,662 merged PRs)

Before:
- Claims processed: 33
- Originator events: 6
- Breakdown: {no_pr_match: 1608, no_sourcer: 26, invalid_handle: 21, skip_self: 6}

After:
- Claims processed: 505 (+472)
- Originator events: 126 (+120)
- Strategy hits: git_subject=412, sourced_from=88, git_time_proximity=5
- Breakdown: {no_pr_match: 1095, no_sourcer: 67, invalid_handle: 359, skip_self: 20}

## Verified on real VPS data

- @thesensatore claims: 3/5 resolve via git_time_proximity to leo/ PRs
- Cameron-S1, alexastrum: remain None — their recovery commits (dba00a79, da64f805) bypassed the pipeline entirely, so no Forgejo PR record exists. They require synthetic prs rows, deferred to a separate commit with its own Ganymede review (a write operation with a larger blast radius than this pure-read backfill change).

## Implementation

- New find_pr_for_claim(conn, repo, md) helper returns (pr_number, strategy)
- Claim-level pass uses it first, then falls back to the title_desc map
- Strategy counter surfaced in the summary output for operator visibility

Idempotent — backfill re-runs skip duplicate events via the partial UNIQUE index on contribution_events (sketched below).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
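A minimal runnable sketch of the idempotency guard this relies on. The real DDL ships with migration v24; the column list and index predicate below are illustrative assumptions, not the deployed schema:

```python
import sqlite3

# Simplified stand-in for the v24 schema — columns assumed for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE contribution_events (
        handle TEXT, kind TEXT, role TEXT, weight REAL,
        pr_number INTEGER, claim_path TEXT, domain TEXT,
        channel TEXT, timestamp TEXT
    )
""")
# Partial UNIQUE index: dedupes claim-level rows only; PR-level rows
# (claim_path IS NULL) would be covered by a sibling partial index.
conn.execute("""
    CREATE UNIQUE INDEX idx_events_claim
    ON contribution_events (handle, role, pr_number, claim_path)
    WHERE claim_path IS NOT NULL
""")
# Replaying the same event is a no-op, which is what makes re-runs safe.
for _ in range(2):
    conn.execute("""
        INSERT OR IGNORE INTO contribution_events
            (handle, role, weight, pr_number, claim_path)
        VALUES ('thesensatore', 'originator', 0.15, 2025, 'domains/x/claim.md')
    """)
print(conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0])  # -> 1
```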
#!/usr/bin/env python3
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.

For each merged PR:
- Derive author from prs.submitted_by → git author → branch prefix
- Emit author event (role=author, weight=0.30, claim_path=NULL)
- For each claim file under a knowledge prefix, parse frontmatter and emit
  originator events for sourcer entries that differ from the author
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
  (when domain_verdict='approve' and not Leo)
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
  agent-owned branches (theseus/*, rio/*, etc.) based on commit_type

Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.

Usage:
    python3 scripts/backfill-events.py --dry-run   # Count events without writing
    python3 scripts/backfill-events.py             # Apply

Runs read-only against the git worktree; only writes to pipeline.db.
"""
import argparse
import os
import re
import sqlite3
import subprocess
import sys
from collections import Counter
from pathlib import Path

DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")

# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
ROLE_WEIGHTS = {
    "author": 0.30,
    "challenger": 0.25,
    "synthesizer": 0.20,
    "originator": 0.15,
    "evaluator": 0.05,
}

PENTAGON_AGENTS = frozenset({
    "rio", "leo", "theseus", "vida", "clay", "astra",
    "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
    "pipeline",
})

# Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES.
# Duplicated here because this script runs standalone (no pipeline package import).
AGENT_BRANCH_PREFIXES = (
    "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)

TRAILER_EVENT_ROLE = {
    "challenge": "challenger",
    "enrich": "synthesizer",
    "research": "synthesizer",
    "reweave": "synthesizer",
}

KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")

BOT_AUTHORS = frozenset({
    "teleo", "teleo-bot", "pipeline",
    "github-actions[bot]", "forgejo-actions",
})


def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
    if not handle:
        return ""
    h = handle.strip().lower().lstrip("@")
    row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
    if row:
        return row[0]
    return h


def classify_kind(handle: str) -> str:
    h = handle.strip().lower().lstrip("@")
    return "agent" if h in PENTAGON_AGENTS else "person"


def parse_frontmatter(text: str):
    """Minimal YAML frontmatter parser using PyYAML when available."""
    if not text.startswith("---"):
        return None
    end = text.find("---", 3)
    if end == -1:
        return None
    raw = text[3:end]
    try:
        import yaml
        fm = yaml.safe_load(raw)
        return fm if isinstance(fm, dict) else None
    except ImportError:
        return None
    except Exception:
        return None
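
# Illustrative round-trip (assumes PyYAML is installed; without it the
# parser deliberately returns None rather than guessing):
#   parse_frontmatter("---\nsourcer: alexastrum\n---\nClaim body")
#     -> {"sourcer": "alexastrum"}
#   parse_frontmatter("no frontmatter")
#     -> None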


def extract_sourcers_from_file(path: Path) -> list[str]:
    """Return the sourcer handles from a claim file's frontmatter.

    Checks three formats, in precedence order (the block form wins):
    1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
    2. Prefix-keyed: `attribution_sourcer: alexastrum`
    3. Bare-key flat: `sourcer: alexastrum`
    """
    try:
        content = path.read_text(encoding="utf-8")
    except (FileNotFoundError, PermissionError, UnicodeDecodeError):
        return []
    fm = parse_frontmatter(content)
    if not fm:
        return []

    handles: list[str] = []

    attr = fm.get("attribution")
    if isinstance(attr, dict):
        entries = attr.get("sourcer", [])
        if isinstance(entries, list):
            for e in entries:
                if isinstance(e, dict) and "handle" in e:
                    handles.append(e["handle"])
                elif isinstance(e, str):
                    handles.append(e)
        elif isinstance(entries, str):
            handles.append(entries)
        return handles

    flat = fm.get("attribution_sourcer")
    if flat:
        if isinstance(flat, str):
            handles.append(flat)
        elif isinstance(flat, list):
            handles.extend(v for v in flat if isinstance(v, str))
    if handles:
        return handles

    bare = fm.get("sourcer")
    if bare:
        if isinstance(bare, str):
            handles.append(bare)
        elif isinstance(bare, list):
            handles.extend(v for v in bare if isinstance(v, str))

    return handles
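
# The three shapes, worked with illustrative handles:
#   attribution: {sourcer: [{handle: alexastrum}]}  -> ["alexastrum"]
#   attribution_sourcer: alexastrum                 -> ["alexastrum"]
#   sourcer: [alexastrum, thesensatore]             -> ["alexastrum", "thesensatore"]
# The block form short-circuits: when `attribution` is a dict, the flat
# keys are never consulted, even if the block lists no sourcer entries.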


_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")


def valid_handle(h: str) -> bool:
    if not h:
        return False
    lower = h.strip().lower().lstrip("@")
    if lower.endswith("-") or lower.endswith("_"):
        return False
    return bool(_HANDLE_RE.match(lower))
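
# Illustrative accept/reject behaviour:
#   valid_handle("@TheSensatore") -> True   ("@" and case are normalized away)
#   valid_handle("cameron-s1")    -> True
#   valid_handle("handle-")       -> False  (trailing separator)
#   valid_handle("")              -> False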


def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
    """Run a git command, return stdout. Returns empty string on failure."""
    try:
        result = subprocess.run(
            ["git", *args],
            cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
        )
        return result.stdout
    except (subprocess.TimeoutExpired, OSError):
        return ""


def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
    """Best-effort: find the git author of the first non-merge commit on the branch.

    Post-merge branches are cleaned up, so this path rarely yields results;
    for the backfill we accept that and rely on submitted_by + branch prefix.
    """
    return ""


def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
    """Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
    if pr.get("submitted_by"):
        cand = pr["submitted_by"].strip().lower().lstrip("@")
        if cand and cand not in BOT_AUTHORS:
            return cand
    branch = pr.get("branch") or ""
    if "/" in branch:
        prefix = branch.split("/", 1)[0].lower()
        if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
            return prefix
    return None
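
# Precedence, illustrated with assumed pr dicts:
#   {"submitted_by": "@Alice", "branch": "leo/x"}    -> "alice"  (submitted_by wins)
#   {"submitted_by": "teleo-bot", "branch": "leo/x"} -> "leo"    (bot filtered; branch prefix)
#   {"submitted_by": None, "branch": "extract/foo"}  -> None     (pipeline branch, no author)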


def find_pr_for_claim(
    conn: sqlite3.Connection,
    repo: Path,
    md: Path,
) -> tuple[int | None, str]:
    """Recover the Forgejo PR number that introduced a claim file.

    Returns (pr_number, strategy) — strategy is one of:
      'sourced_from'       — frontmatter sourced_from matched prs.source_path
      'git_subject'        — git log first-add commit message matched a branch pattern
      'github_pr'          — recovery commit mentioned GitHub PR # → prs.github_pr
      'git_time_proximity' — bare extract commit matched an agent PR merged within 24h
      'none'               — no strategy found a match

    Order is chosen by reliability:
    1. sourced_from (explicit provenance, most reliable when present)
    2. git_subject (covers Leo research, Cameron challenges, Theseus contrib)
    3. github_pr (recovery commits referencing erased GitHub PRs)
    4. git_time_proximity (bare "Extract N claims" subjects, last resort)

    The caller applies the title_desc fallback (filename stem matched against
    prs.description) for anything this cascade misses — brittle when the
    description is NULL, which is exactly why the cascade exists.
    """
    rel = str(md.relative_to(repo))

    # Strategy 1: sourced_from frontmatter → prs.source_path
    try:
        content = md.read_text(encoding="utf-8")
    except (FileNotFoundError, PermissionError, UnicodeDecodeError):
        content = ""
    fm = parse_frontmatter(content) if content else None
    if fm:
        sourced = fm.get("sourced_from")
        candidate_paths: list[str] = []
        if isinstance(sourced, str) and sourced:
            candidate_paths.append(sourced)
        elif isinstance(sourced, list):
            candidate_paths.extend(s for s in sourced if isinstance(s, str))
        for sp in candidate_paths:
            stem = Path(sp).stem
            if not stem:
                continue
            row = conn.execute(
                """SELECT number FROM prs
                   WHERE source_path LIKE ? AND status='merged'
                   ORDER BY merged_at ASC LIMIT 1""",
                (f"%{stem}.md",),
            ).fetchone()
            if row:
                return row["number"], "sourced_from"

    # Strategy 2: git log first-add commit → subject pattern → prs.branch
    # Default log order is reverse-chronological; take the last block (oldest)
    # to get the original addition, not later rewrites.
    log_out = git(
        "log", "--diff-filter=A", "--follow",
        "--format=%H|||%s|||%b", "--", rel,
    )
    if log_out.strip():
        # Split on the delimiter we chose. Each commit produces 3 fields but
        # %b can contain blank lines — group by lines that look like a SHA.
        blocks: list[tuple[str, str, str]] = []
        current: list[str] = []
        for line in log_out.splitlines():
            if re.match(r"^[a-f0-9]{40}\|\|\|", line):
                if current:
                    parts = "\n".join(current).split("|||", 2)
                    if len(parts) == 3:
                        blocks.append((parts[0], parts[1], parts[2]))
                current = [line]
            else:
                current.append(line)
        if current:
            parts = "\n".join(current).split("|||", 2)
            if len(parts) == 3:
                blocks.append((parts[0], parts[1], parts[2]))
        if blocks:
            # Oldest addition — git log defaults to reverse-chronological
            oldest_sha, subject, body = blocks[-1]

            # Pattern: "<agent>: extract claims from <slug>"
            m = re.match(r"^(\w+):\s*extract\s+claims\s+from\s+(\S+)", subject)
            if m:
                # str.rstrip(".md") would strip trailing '.', 'm', 'd'
                # *characters* (mangling slugs like "claim-md"); strip the
                # literal suffix instead.
                slug = m.group(2)
                if slug.endswith(".md"):
                    slug = slug[: -len(".md")]
                slug = slug.rstrip(".")
                row = conn.execute(
                    """SELECT number FROM prs
                       WHERE branch LIKE ? AND status='merged'
                       ORDER BY merged_at ASC LIMIT 1""",
                    (f"extract/{slug}%",),
                ).fetchone()
                if row:
                    return row["number"], "git_subject"

            # Pattern: "<agent>: research session <date>"
            m = re.match(r"^(\w+):\s*research\s+session\s+(\d{4}-\d{2}-\d{2})", subject)
            if m:
                agent = m.group(1).lower()
                date = m.group(2)
                row = conn.execute(
                    """SELECT number FROM prs
                       WHERE branch LIKE ? AND status='merged'
                       ORDER BY merged_at ASC LIMIT 1""",
                    (f"{agent}/research-{date}%",),
                ).fetchone()
                if row:
                    return row["number"], "git_subject"

            # Pattern: "<agent>: challenge" / contrib challenges / entity batches
            m = re.match(r"^(\w+):\s*(?:challenge|contrib|entity|synthesize)", subject)
            if m:
                agent = m.group(1).lower()
                row = conn.execute(
                    """SELECT number FROM prs
                       WHERE branch LIKE ? AND status='merged'
                       ORDER BY merged_at ASC LIMIT 1""",
                    (f"{agent}/%",),
                ).fetchone()
                if row:
                    return row["number"], "git_subject"

            # Recovery commits referencing erased GitHub PRs (Alex/Cameron).
            # Subject: "Recover <who> contribution from GitHub PR #NN (...)".
            # Match only when a corresponding prs row exists with github_pr=NN —
            # otherwise the claims were direct-to-main without a Forgejo PR
            # record, which requires a synthetic PR row (follow-up, not in
            # this script's scope).
            gh_match = re.search(r"GitHub\s+PR\s+#(\d+)", subject + "\n" + body)
            if gh_match:
                gh_pr = int(gh_match.group(1))
                row = conn.execute(
                    "SELECT number FROM prs WHERE github_pr = ? AND status='merged' LIMIT 1",
                    (gh_pr,),
                ).fetchone()
                if row:
                    return row["number"], "github_pr"

            # Pattern: bare "Extract N claims from <source-fragment>" (no
            # agent prefix). Used in early research PRs like Shaga's claims
            # at PR #2025. Fall back to time-proximity: find the earliest
            # agent-branch PR merged within 24h AFTER this commit's date.
            m = re.match(r"^Extract\s+\d+\s+claims\s+from\b", subject)
            if m:
                # Get the commit author date
                date_out = git(
                    "log", "-1", "--format=%aI", oldest_sha, timeout=10,
                )
                commit_date = date_out.strip() if date_out.strip() else None
                if commit_date:
                    row = conn.execute(
                        """SELECT number FROM prs
                           WHERE status='merged'
                             AND merged_at >= ?
                             AND merged_at <= datetime(?, '+24 hours')
                             AND (branch LIKE 'leo/%' OR branch LIKE 'theseus/%'
                                  OR branch LIKE 'rio/%' OR branch LIKE 'astra/%'
                                  OR branch LIKE 'vida/%' OR branch LIKE 'clay/%')
                           ORDER BY merged_at ASC LIMIT 1""",
                        (commit_date, commit_date),
                    ).fetchone()
                    if row:
                        return row["number"], "git_time_proximity"

    return None, "none"
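
# Typical call from the claim-level pass in main():
#   pr_number, strategy = find_pr_for_claim(conn, Path(REPO_DIR), md)
# strategy is "none" when every cascade step misses; the caller then tries
# the title_desc map before counting the claim as no_pr_match.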


def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
    canonical = normalize_handle(conn, handle)
    if not valid_handle(canonical):
        return
    kind = classify_kind(canonical)
    weight = ROLE_WEIGHTS[role]
    counts[(role, "attempt")] += 1
    if dry_run:
        counts[(role, "would_insert")] += 1
        return
    cur = conn.execute(
        """INSERT OR IGNORE INTO contribution_events
           (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
        (canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
    )
    if cur.rowcount > 0:
        counts[(role, "inserted")] += 1
    else:
        counts[(role, "skipped_dup")] += 1
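
# counts is keyed by (role, outcome); e.g. one live originator insert yields
#   counts[("originator", "attempt")] == 1 and counts[("originator", "inserted")] == 1,
# and replaying the same event increments ("originator", "skipped_dup") instead.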


def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
    """Best-effort: list added .md files in the PR. Currently always [].

    Post-merge PR branches are deleted from Forgejo, so we can't diff them,
    and main's commits don't track which files a given PR touched. We accept
    the loss for historical PRs: the backfill emits PR-level events (author,
    evaluator, challenger/synthesizer) from the prs table, and per-claim
    originator events are recovered separately by the claim-level pass via
    find_pr_for_claim. PR-level author/evaluator events still land correctly.
    """
    return []


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
    args = parser.parse_args()

    if not Path(DB_PATH).exists():
        print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
        sys.exit(1)

    conn = sqlite3.connect(DB_PATH, timeout=30)
    conn.row_factory = sqlite3.Row

    # Sanity: contribution_events exists (v24 migration applied)
    try:
        conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
    except sqlite3.OperationalError:
        print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
        sys.exit(2)

    # Walk all merged knowledge PRs
    query = """
        SELECT number, branch, domain, source_channel, submitted_by,
               leo_verdict, domain_verdict, domain_agent,
               commit_type, merged_at
        FROM prs
        WHERE status = 'merged'
        ORDER BY merged_at ASC
    """
    if args.limit:
        query += f" LIMIT {args.limit}"
    prs = conn.execute(query).fetchall()
    print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")

    counts: Counter = Counter()
    repo = Path(REPO_DIR)

    for pr in prs:
        pr_number = pr["number"]
        branch = pr["branch"] or ""
        domain = pr["domain"]
        channel = pr["source_channel"]
        merged_at = pr["merged_at"]

        # Skip pipeline-only branches for author credit (extract/*, reweave/*,
        # fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But
        # evaluator credit for Leo/domain_agent still applies. The flag is
        # informational: derive_author implements the skip by returning None
        # when submitted_by is absent and the branch isn't agent-owned.
        is_pipeline_branch = branch.startswith((
            "extract/", "reweave/", "fix/", "ingestion/", "epimetheus/",
        ))

        # ── AUTHOR ──
        # For pipeline branches, submitted_by carries the real author (the
        # human who submitted the source via Telegram/etc). For agent branches,
        # the agent is the author. For external branches (gh-pr-*), the git
        # author is in submitted_by from the sync-mirror pipeline.
        author = derive_author(conn, dict(pr))
        if author:
            emit(conn, counts, args.dry_run, author, "author", pr_number,
                 None, domain, channel, merged_at)

        # ── EVALUATOR ──
        if pr["leo_verdict"] == "approve":
            emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
                 None, domain, channel, merged_at)
        if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
            dagent = pr["domain_agent"].strip().lower()
            if dagent and dagent != "leo":
                emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
                     None, domain, channel, merged_at)

        # ── CHALLENGER / SYNTHESIZER from branch+commit_type ──
        # Only fires on agent-owned branches. Pipeline branches aren't creditable
        # work (they're machine extraction; evaluator already captures the review).
        if branch.startswith(AGENT_BRANCH_PREFIXES):
            prefix = branch.split("/", 1)[0].lower()
            event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
            if event_role:
                emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
                     None, domain, channel, merged_at)

        # ── ORIGINATOR per claim ──
        # Deferred in this loop: per-claim originator events require branch
        # introspection that fails on deleted branches, and source_path +
        # description are too lossy to build per-claim events reliably.
        # The claim-level pass below recovers them via find_pr_for_claim;
        # forward traffic (post-deploy) gets per-claim originator events via
        # record_contributor_attribution's added-files walk.

    if not args.dry_run:
        conn.commit()

    # Originator is emitted in the claim-level pass below, not the PR-level pass.
    # The previous summary listed it here with attempted=0, which confused operators.
    print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===")
    for role in ("author", "challenger", "synthesizer", "evaluator"):
        att = counts[(role, "attempt")]
        if args.dry_run:
            wi = counts[(role, "would_insert")]
            print(f"  {role:12s} attempted={att:5d} would_insert={wi:5d}")
        else:
            ins = counts[(role, "inserted")]
            skip = counts[(role, "skipped_dup")]
            print(f"  {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")

    # ── Per-claim originator pass ──
    # Walk the knowledge tree, parse sourcer attribution, and attach each claim
    # to its merging PR via find_pr_for_claim's multi-strategy recovery.
    # Apr 24 rewrite (Ganymede-approved): replaces the single-strategy
    # title→description match with four strategies in reliability order.
    # The previous script missed PRs with NULL description (Cameron #3377) and
    # cross-context claims (Shaga's Leo research). The fallback title-match is
    # preserved to recover anything the git-log path misses.
    print("\n=== Claim-level originator pass ===")
    # Build title → pr_number map from prs.description (strategy 3 fallback)
    title_to_pr: dict[str, int] = {}
    for r in conn.execute(
        "SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
    ).fetchall():
        desc = r["description"] or ""
        for title in desc.split(" | "):
            title = title.strip()
            if title:
                # Last-writer wins. Conflicts are rare (titles unique in practice).
                title_to_pr[title.lower()] = r["number"]
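
    # e.g. a hypothetical PR 42 with description "Claim about X | Another claim"
    # contributes {"claim about x": 42, "another claim": 42} to the map.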

    claim_counts = Counter()
    strategy_counts = Counter()
    claim_count = 0
    originator_count = 0
    for md in sorted(repo.glob("domains/**/*.md")) + \
              sorted(repo.glob("core/**/*.md")) + \
              sorted(repo.glob("foundations/**/*.md")) + \
              sorted(repo.glob("decisions/**/*.md")):
        rel = str(md.relative_to(repo))
        stem = md.stem

        # The cascade via the helper (sourced_from, git_subject, github_pr,
        # git_time_proximity).
        pr_number, strategy = find_pr_for_claim(conn, repo, md)

        # Fallback: title-match against prs.description.
        if not pr_number:
            pr_number = title_to_pr.get(stem.lower())
            if not pr_number:
                pr_number = title_to_pr.get(stem.replace("-", " ").lower())
            if pr_number:
                strategy = "title_desc"

        if not pr_number:
            claim_counts["no_pr_match"] += 1
            continue

        sourcers = extract_sourcers_from_file(md)
        if not sourcers:
            claim_counts["no_sourcer"] += 1
            continue

        claim_count += 1
        strategy_counts[strategy] += 1
        # Look up the author for this PR to skip self-credit
        pr_row = conn.execute(
            "SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
            (pr_number,),
        ).fetchone()
        if not pr_row:
            continue
        author = derive_author(conn, dict(pr_row))
        author_canonical = normalize_handle(conn, author) if author else None

        for src_handle in sourcers:
            src_canonical = normalize_handle(conn, src_handle)
            if not valid_handle(src_canonical):
                claim_counts["invalid_handle"] += 1
                continue
            if src_canonical == author_canonical:
                claim_counts["skip_self"] += 1
                continue
            emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
                 rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
            originator_count += 1

    if not args.dry_run:
        conn.commit()

    print(f"  Claims processed: {claim_count}")
    print(f"  Originator events emitted: {originator_count}")
    print(f"  Breakdown: {dict(claim_counts)}")
    print(f"  Strategy hits: {dict(strategy_counts)}")
    att = counts[("originator", "attempt")]
    if args.dry_run:
        wi = counts[("originator", "would_insert")]
        print(f"  {'originator':12s} attempted={att:5d} would_insert={wi:5d}")
    else:
        ins = counts[("originator", "inserted")]
        skip = counts[("originator", "skipped_dup")]
        print(f"  {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")

    if not args.dry_run:
        total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
        print(f"\nTotal contribution_events rows: {total}")


if __name__ == "__main__":
    main()
|