#!/usr/bin/env python3
"""Backfill sourcer/extractor/etc. attribution from claim frontmatter.

Walks every merged knowledge file under domains/, entities/, decisions/,
foundations/, convictions/, core/ and re-runs the canonical attribution
parser (lib/attribution.py). For each parsed (handle, role) pair,
increments the corresponding *_count column on the contributors table.

Why this is needed (Apr 24 incident):
- lib/contributor.py used a diff-line regex parser that handled neither
  the bare-key flat format (`sourcer: alexastrum`, ~42% of claims) nor
  the nested `attribution: { sourcer: [...] }` block format used by
  Leo's manual extractions (Shaga's claims).
- Result: alexastrum, thesensatore, cameron-s1, and similar handles were
  silently dropped at merge time. Their contributor rows either don't
  exist or are stuck at zero counts.

Usage:
    python3 backfill-sourcer-attribution.py --dry-run  # report deltas, no writes
    python3 backfill-sourcer-attribution.py            # apply (additive: max(db, truth))
    python3 backfill-sourcer-attribution.py --reset    # destructive: set absolute truth

Default mode is ADDITIVE for safety: per-role count is set to
max(current_db, truth). This preserves any existing high counts that came
from non-frontmatter sources (e.g., m3taversal.sourcer=1011 reflects
Telegram-curator credit accumulated via a different code path; truncating
to the file-walk truth would be destructive).

Use --reset to set absolute truth from the file walk only — this clobbers
all existing role counts including legitimate non-frontmatter credit.

Idempotency: additive mode is safe to re-run. --reset run is gated by an
audit_log marker; pass --force to override.
"""

import argparse
import json
import os
import sqlite3
import sys
from collections import defaultdict
from contextlib import closing
from pathlib import Path

# Allow running from anywhere — point at pipeline lib
PIPELINE_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PIPELINE_ROOT))

from lib.attribution import parse_attribution_from_file, VALID_ROLES  # noqa: E402

DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))

KNOWLEDGE_PREFIXES = (
    "domains",
    "entities",
    "decisions",
    "foundations",
    "convictions",
    "core",
)

# The five role columns tracked on the contributors table, in column order.
ROLE_COLUMNS = ("sourcer", "extractor", "challenger", "synthesizer", "reviewer")


def collect_attributions(repo_root: Path) -> dict[str, dict[str, int]]:
    """Walk all knowledge files; return {handle: {role: count}}.

    Scans every non-underscore *.md file under the KNOWLEDGE_PREFIXES
    directories and tallies one count per (handle, role) occurrence as
    reported by the canonical parser. Progress goes to stderr so stdout
    stays clean for the delta report.
    """
    counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    files_scanned = 0
    files_with_attribution = 0
    for prefix in KNOWLEDGE_PREFIXES:
        base = repo_root / prefix
        if not base.exists():
            continue
        for path in base.rglob("*.md"):
            # Underscore-prefixed files are not claims (indexes/templates).
            if path.name.startswith("_"):
                continue
            files_scanned += 1
            attr = parse_attribution_from_file(str(path))
            had_any = False
            for role, entries in attr.items():
                for entry in entries:
                    handle = entry.get("handle")
                    if handle:
                        counts[handle][role] += 1
                        had_any = True
            if had_any:
                files_with_attribution += 1
    print(f" Scanned {files_scanned} knowledge files", file=sys.stderr)
    print(f" {files_with_attribution} had parseable attribution", file=sys.stderr)
    return counts


def existing_contributors(conn: sqlite3.Connection) -> dict[str, dict[str, int]]:
    """Return {handle: {role: count}} from the current DB.

    NULL counts are normalized to 0 so comparisons against file-walk
    truth never trip over missing values.
    """
    rows = conn.execute(
        "SELECT handle, sourcer_count, extractor_count, challenger_count, "
        "synthesizer_count, reviewer_count, claims_merged FROM contributors"
    ).fetchall()
    out: dict[str, dict[str, int]] = {}
    for r in rows:
        out[r["handle"]] = {
            "sourcer": r["sourcer_count"] or 0,
            "extractor": r["extractor_count"] or 0,
            "challenger": r["challenger_count"] or 0,
            "synthesizer": r["synthesizer_count"] or 0,
            "reviewer": r["reviewer_count"] or 0,
            "claims_merged": r["claims_merged"] or 0,
        }
    return out


def claims_merged_for(role_counts: dict[str, int]) -> int:
    """Mirror upsert_contributor logic: claims_merged += sourcer + extractor."""
    return role_counts.get("sourcer", 0) + role_counts.get("extractor", 0)


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true",
                        help="Report deltas without writing")
    parser.add_argument("--reset", action="store_true",
                        help="Destructive: set absolute truth from file walk "
                             "(default is additive max(db, truth))")
    parser.add_argument("--force", action="store_true",
                        help="Re-run even if a previous --reset marker exists")
    args = parser.parse_args()

    if not REPO.exists():
        print(f"ERROR: repo not found at {REPO}", file=sys.stderr)
        sys.exit(1)

    print(f"DB: {DB_PATH}", file=sys.stderr)
    print(f"Repo: {REPO}", file=sys.stderr)
    print("", file=sys.stderr)

    print("Walking knowledge tree...", file=sys.stderr)
    truth = collect_attributions(REPO)
    print(f" Found attributions for {len(truth)} unique handles", file=sys.stderr)
    print("", file=sys.stderr)

    # BUGFIX: the connection previously leaked on every exit path (the
    # --dry-run early return, the sys.exit(2) reset gate, and exceptions).
    # closing() guarantees conn.close() runs no matter how we leave.
    with closing(sqlite3.connect(DB_PATH, timeout=30)) as conn:
        conn.row_factory = sqlite3.Row
        current = existing_contributors(conn)

        # Compute deltas: new handles + handles with role-count mismatches
        new_handles: list[tuple[str, dict[str, int]]] = []
        role_deltas: list[tuple[str, dict[str, int], dict[str, int]]] = []
        for handle, roles in truth.items():
            if handle not in current:
                new_handles.append((handle, dict(roles)))
            else:
                cur = current[handle]
                mismatches = {r: roles.get(r, 0) for r in VALID_ROLES
                              if roles.get(r, 0) != cur.get(r, 0)}
                if mismatches:
                    role_deltas.append((handle, dict(roles), cur))

        print(f"=== {len(new_handles)} NEW contributors to insert ===")
        for handle, roles in sorted(new_handles, key=lambda x: -sum(x[1].values()))[:20]:
            roles_str = ", ".join(f"{r}={c}" for r, c in roles.items() if c > 0)
            print(f" + {handle}: {roles_str} (claims_merged={claims_merged_for(roles)})")
        if len(new_handles) > 20:
            print(f" ... and {len(new_handles) - 20} more")
        print()

        print(f"=== {len(role_deltas)} EXISTING contributors with count drift ===")
        for handle, truth_roles, cur_roles in sorted(
            role_deltas, key=lambda x: -sum(x[1].values()),
        )[:20]:
            for role in VALID_ROLES:
                t = truth_roles.get(role, 0)
                c = cur_roles.get(role, 0)
                if t != c:
                    print(f" ~ {handle}.{role}: db={c} → truth={t} (Δ{t - c:+d})")
        if len(role_deltas) > 20:
            print(f" ... and {len(role_deltas) - 20} more")
        print()

        if args.dry_run:
            mode = "RESET" if args.reset else "ADDITIVE"
            print(f"Dry run ({mode} mode) — no changes written.")
            if not args.reset:
                print("Default is ADDITIVE: existing high counts (e.g. m3taversal=1011) preserved.")
                print("Pass --reset to clobber existing counts with file-walk truth.")
            return

        # Idempotency: --reset is gated by audit marker. Additive mode is always safe.
        if args.reset:
            marker = conn.execute(
                "SELECT 1 FROM audit_log WHERE event = 'sourcer_attribution_backfill_reset' LIMIT 1"
            ).fetchone()
            if marker and not args.force:
                print("ERROR: --reset has already run (audit marker present).")
                print("Pass --force to re-run.")
                sys.exit(2)

        inserted = 0
        updated = 0
        preserved_higher = 0
        for handle, roles in truth.items():
            truth_counts = {role: roles.get(role, 0) for role in ROLE_COLUMNS}
            if handle in current:
                cur = current[handle]
                if args.reset:
                    # Preserve reviewer_count even on reset (PR-level not file-level)
                    final = dict(truth_counts)
                    final["reviewer"] = max(truth_counts["reviewer"], cur.get("reviewer", 0))
                else:
                    # Additive: max of db vs truth, per role
                    final = {role: max(truth_counts[role], cur.get(role, 0))
                             for role in truth_counts}
                    if any(cur.get(r, 0) > truth_counts[r] for r in truth_counts):
                        preserved_higher += 1
                # Consistency: reuse the helper instead of re-deriving the sum.
                cm = claims_merged_for(final)
                conn.execute(
                    """UPDATE contributors
                       SET sourcer_count = ?, extractor_count = ?, challenger_count = ?,
                           synthesizer_count = ?, reviewer_count = ?, claims_merged = ?,
                           updated_at = datetime('now')
                       WHERE handle = ?""",
                    (final["sourcer"], final["extractor"], final["challenger"],
                     final["synthesizer"], final["reviewer"], cm, handle),
                )
                updated += 1
            else:
                cm = claims_merged_for(truth_counts)
                conn.execute(
                    """INSERT INTO contributors (
                        handle, sourcer_count, extractor_count, challenger_count,
                        synthesizer_count, reviewer_count, claims_merged,
                        first_contribution, last_contribution, tier
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, date('now'), date('now'), 'new')""",
                    (handle, truth_counts["sourcer"], truth_counts["extractor"],
                     truth_counts["challenger"], truth_counts["synthesizer"],
                     truth_counts["reviewer"], cm),
                )
                inserted += 1

        event = ("sourcer_attribution_backfill_reset" if args.reset
                 else "sourcer_attribution_backfill")
        # BUGFIX: build the audit payload with json.dumps rather than nested
        # f-strings with escaped braces — guaranteed-valid JSON, same output
        # for these keys.
        detail = json.dumps({
            "inserted": inserted,
            "updated": updated,
            "preserved_higher": preserved_higher,
            "mode": "reset" if args.reset else "additive",
        })
        conn.execute(
            "INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
            ("contributor", event, detail),
        )
        conn.commit()
        print(f"Done ({'RESET' if args.reset else 'ADDITIVE'}). "
              f"Inserted {inserted} new, updated {updated} existing, "
              f"preserved {preserved_higher} higher-than-truth values.")


if __name__ == "__main__":
    main()