"""Verify research-attribution backfill is replay-safe against real schema. Three things to prove: 1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly (idx_ce_unique_pr partial index handles SQLite NULL-not-equal-NULL). 2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE is a true no-op — does not create a phantom duplicate. 3. The backfill script's specific operation (DELETE then INSERT for same key) nets zero rows when run twice in sequence. """ import sqlite3 import sys # Schema lifted verbatim from lib/db.py:181-209 SCHEMA = """ CREATE TABLE contribution_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, handle TEXT NOT NULL, kind TEXT NOT NULL DEFAULT 'person', role TEXT NOT NULL, weight REAL NOT NULL, pr_number INTEGER NOT NULL, claim_path TEXT, domain TEXT, channel TEXT, timestamp TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events( handle, role, pr_number, claim_path ) WHERE claim_path IS NOT NULL; CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events( handle, role, pr_number ) WHERE claim_path IS NULL; """ def setup() -> sqlite3.Connection: conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row conn.executescript(SCHEMA) return conn def insert_event(conn, handle, role, pr_number, claim_path=None): cur = conn.execute( """INSERT OR IGNORE INTO contribution_events (handle, kind, role, weight, pr_number, claim_path) VALUES (?, 'agent', ?, 0.30, ?, ?)""", (handle, role, pr_number, claim_path), ) return cur.rowcount def count(conn) -> int: return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0] def test_pr_level_dedup_with_null_claim_path(): """Two inserts of same (handle, role, pr_number, NULL) → 1 row.""" conn = setup() r1 = insert_event(conn, "rio", "author", 4061) r2 = insert_event(conn, "rio", "author", 4061) n = count(conn) assert r1 == 1, f"first insert should write, got rowcount={r1}" assert r2 == 0, f"second insert should be ignored, got rowcount={r2}" assert n == 1, f"expected 1 row, got {n}" print("PASS: pr-level dedup with NULL claim_path") def test_per_claim_dedup_with_path(): """Two inserts of same (handle, role, pr_number, path) → 1 row.""" conn = setup() r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md") r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md") n = count(conn) assert r1 == 1 and r2 == 0 and n == 1 print("PASS: per-claim dedup with claim_path") def test_pr_level_and_per_claim_coexist(): """A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist because the partial indexes target different rows.""" conn = setup() r1 = insert_event(conn, "rio", "author", 4061, claim_path=None) r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md") n = count(conn) assert r1 == 1 and r2 == 1 and n == 2 print("PASS: pr-level and per-claim events coexist on same pr_number") def test_backfill_replay_is_noop(): """Simulate the exact backfill operation: INSERT correct event, DELETE wrong event. Run twice. Expect identical state — no phantom rows, no double-deletions.""" conn = setup() # Initial state: m3taversal has the wrong author event for pr=4061 insert_event(conn, "m3taversal", "author", 4061) assert count(conn) == 1 def backfill_pr_4061(): # Insert the correct event (rio is the real author) conn.execute( """INSERT OR IGNORE INTO contribution_events (handle, kind, role, weight, pr_number, claim_path) VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""", ("rio (self-directed)",), ) # Delete the wrong event conn.execute( """DELETE FROM contribution_events WHERE handle='m3taversal' AND role='author' AND pr_number=4061 AND claim_path IS NULL""", ) conn.commit() backfill_pr_4061() state_after_first = sorted( (r["handle"], r["role"], r["pr_number"], r["claim_path"]) for r in conn.execute("SELECT * FROM contribution_events") ) assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first # Replay backfill_pr_4061() state_after_second = sorted( (r["handle"], r["role"], r["pr_number"], r["claim_path"]) for r in conn.execute("SELECT * FROM contribution_events") ) assert state_after_first == state_after_second, "replay should be idempotent" assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}" print("PASS: backfill replay is a true no-op") def test_replay_against_already_backfilled_pr_does_not_double_delete(): """If m3taversal event was already deleted, running backfill again must not error or affect anything else.""" conn = setup() # Already-correct state: rio has the author event, m3taversal does not insert_event(conn, "rio (self-directed)", "author", 4061) insert_event(conn, "leo", "evaluator", 4061) # noise — should not be touched # Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op # Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected cur1 = conn.execute( """INSERT OR IGNORE INTO contribution_events (handle, kind, role, weight, pr_number, claim_path) VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""", ) cur2 = conn.execute( """DELETE FROM contribution_events WHERE handle='m3taversal' AND role='author' AND pr_number=4061 AND claim_path IS NULL""", ) assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}" assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}" assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}" print("PASS: replay against already-backfilled state preserves unrelated events") if __name__ == "__main__": test_pr_level_dedup_with_null_claim_path() test_per_claim_dedup_with_path() test_pr_level_and_per_claim_coexist() test_backfill_replay_is_noop() test_replay_against_already_backfilled_pr_does_not_double_delete() print("\nAll 5 tests passed against real schema.")