Compare commits

..

13 commits

Author SHA1 Message Date
1351db70a9 fix(tests): apply Ganymede review nits + add m3taversal reset script
Some checks are pending
CI / lint-and-test (push) Waiting to run
3 nits from review of d60b6f8 + Q4 ask:

1. test_window_24h_only_today: replace always-true assertion with
   concrete `assert handles == ["carol"]`. Push alice's most-recent
   event from -1 days to -2 days to eliminate fixture-vs-query
   microsecond drift on the 24h boundary.
2. _call helper: asyncio.get_event_loop().run_until_complete →
   asyncio.run (deprecation in 3.12, raises in some 3.14 contexts).
3. test_invalid_limit_falls_to_default: dead first call removed,
   misleading "7 entries" comment now matches assertion.

Q4: scripts/reset-m3taversal-sourcer.py captures the surgical
UPDATE we ran on VPS as a reviewable artifact. Idempotent (no-op
on already-reset rows), audit_log entry per run. Ganymede's point:
DB mutations should leave a code paper trail, not just an audit
row whose origin lives only in the executor's memory.

30/30 tests pass on VPS hermes venv (aiohttp 3.13.5, py 3.11.15).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:35:18 +01:00
d60b6f8bf2 test(leaderboard): cover all four slicings + AND-prefix regression
Adds tests/test_leaderboard.py — 30 cases against
diagnostics/leaderboard_routes.py. Two reasons:

(1) Zero coverage on an endpoint Argus + Oberon are about to consume
    for the May 5 hackathon UI. Two bugs slipped through this morning
    (404 wiring missing in app.py; AND-prefix SQL syntax error on
    rolling-window). Tests prevent regression.

(2) Tests serve as living documentation for Oberon's frontend
    integration — each test names a contract guarantee
    (test_left_join_handles_missing_contributors_row,
    test_composed_window_kind_domain, test_role_breakdown_present).

Coverage:
  - _parse_window unit tests (10): all_time, Nd, Nh, caps, garbage,
    case-normalization, and explicit no-AND-prefix assertion
  - handle_leaderboard integration (18): every kind value, every
    window family, domain filter, composed filters, limit + has_more,
    invalid-input fallback, role breakdown shape, empty-window shape,
    LEFT JOIN COALESCE for handles missing from contributors
  - 2 contract assertions: LEADERBOARD_PUBLIC_PATHS membership +
    KIND_VALUES set

Run: 30/30 pass on VPS hermes venv (aiohttp 3.13.5, pytest 9.0.2).
Skips clean locally without aiohttp via pytest.importorskip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:46:47 +01:00
cd5aac5cc6 fix(activity-feed): remove [:120] slug truncation
Some checks are pending
CI / lint-and-test (push) Waiting to run
Claim slugs were being cut at 120 chars in _extract_claim_slugs, causing
Timeline event clicks to 404 when the on-disk filename exceeded that
length (frontend builds /api/claims/<slug> from the truncated value).

This fix landed Apr 26 but regressed when the file was redeployed —
committing the unmangled version to repo so deploy.sh re-shipping
doesn't reintroduce the cap.

Verified live: max slug now 265 chars, 16 of 30 over the old 120 cap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:27:31 +01:00
7c6417d6be test(diagnostics): activity_endpoint classify_pr_operation suite
Move tests from /tmp into the proper test suite. 22 cases covering:

- Leo gotcha: extract/* + commit_type=enrich/challenge classifies by
  commit_type, not branch prefix (same pattern as the contributor-role
  wiring fix)
- Reweave priority: branch.startswith('reweave/') wins over
  _MAINTENANCE_COMMIT_TYPES — nightly reweave PRs classify as enrich,
  not infra. Locks in the bifurcation against future priority refactors
- Full NON_MERGED_STATUS_TO_OPERATION coverage: open, approved, closed,
  conflict, validating, reviewing, merging, zombie
- Knowledge-producing commit_types (research, entity) → new
- Maintenance commit_types (fix, pipeline) → infra
- Defensive: null inputs, unknown status

aiohttp imported at module load — file uses pytest.importorskip so it
runs cleanly in any environment with aiohttp installed and skips gracefully
otherwise. sys.path inject for diagnostics/ since it isn't packaged.

Reviewed-by: Ganymede

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:39:44 +01:00
42d35d4e15 fix(diagnostics): wire /api/leaderboard into app.py + fix rolling-window SQL
Some checks are pending
CI / lint-and-test (push) Waiting to run
de7e5ec landed leaderboard_routes.py + the route file's register fn but
the import + register_leaderboard_routes(app) call + auth-middleware
allowlist were never added to app.py — endpoint returned 404 in production.

Three minimal edits to app.py mirror the existing register_*_routes pattern
(import at line 28, allowlist OR-clause at line 512, register call at 2365).

Plus a SQL bug in _parse_window: rolling-window clauses prefixed "AND "
but the WHERE composition uses " AND ".join(...), producing
"WHERE 1=1 AND AND ce.timestamp..." → sqlite3.OperationalError on every
window=Nd / window=Nh request. Stripped the prefix and added a comment so
the asymmetry doesn't bite again.

Verified on VPS:
  GET /api/leaderboard?window=all_time&kind=person → 200, 11 rows
  GET /api/leaderboard?window=7d&kind=person → 200, 2 rows
  GET /api/leaderboard?window=30d&kind=person → 200, 9 rows
  GET /api/leaderboard?domain=internet-finance → 200, 3 rows
  GET /api/leaderboard?kind=agent → 200, leo/rio/clay/astra/vida

Unblocks: Argus dashboard cutover, Oberon column reorder, Leo's CI
taxonomy broadcast.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:30:26 +01:00
de7e5ec709 feat(diagnostics): /api/leaderboard reads contribution_events directly
Some checks are pending
CI / lint-and-test (push) Waiting to run
New endpoint replaces the legacy /api/contributors *_count read path with
event-sourced reads from the Phase A contribution_events ledger.

- Params: window (all_time | Nd | Nh), kind (person | agent | org | all),
  domain (filter), limit (default 100, max 500)
- Returns per-handle CI, full role breakdown (author/challenger/synthesizer/
  originator/evaluator), events_count, pr_count, first/last contribution
- ORDER BY ci DESC, last_contribution DESC — recent contributors break ties
- Read-only sqlite URI; total/has_more computed for paginated UIs

Wiring (import + register + _PUBLIC_PATHS entry) currently applied to live
app.py on VPS only — repo app.py has drift from Ship's uncommitted /api/search
POST contract. Next deploy.sh round-trip needs both to land together.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:16:41 +01:00
369f6c96da fix(attribution): credit research-session sources to agents, not m3taversal (#7)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Forward fix: research-session.sh writes intake_tier: research-task (no proposed_by — extract.py infers agent from branch).

Backfill: 304 PRs reattributed across 30 days (rio 74, clay 70, astra 53, vida 48, theseus 30, leo 29). Already applied to production.

Format reconciliation: normalize_handle strips (self-directed) suffix so both halves canonicalize to the same agent handle.

5 idempotency tests passing. Production replay self-extinguishes (delta 3839→3839).
2026-04-27 11:59:54 +00:00
6aff03ff56 fix(attribution): unify research-session format on "(self-directed)" suffix
Some checks are pending
CI / lint-and-test (pull_request) Waiting to run
Resolves the format inconsistency between the forward fix and the 304-row
backfill. Both halves now produce prs.submitted_by = "rio (self-directed)":

- research-session.sh: drop proposed_by from the frontmatter template.
  extract.py path 1 (proposed_by-driven) no longer fires; path 2 fires
  instead and constructs f"{agent} (self-directed)" — matches backfill.

- attribution.py: normalize_handle now strips "(self-directed)" suffix
  immediately after lowercase+@-strip, before alias lookup. Closes the
  phantom-person-event class on any future replay through
  record_contributor_attribution. Round-trips through alias rules keyed
  on bare agent names.

Test (5 cases) still passes; suffix-strip behavior verified against
hostile inputs (whitespace, casing, mid-string occurrences must NOT
match — only trailing pattern).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:53:52 +01:00
319e03e2c6 test(attribution): prove research-backfill replay is idempotent
Some checks are pending
CI / lint-and-test (pull_request) Waiting to run
Five tests against the real contribution_events schema (lib/db.py:181-209):
- pr-level dedup with NULL claim_path via idx_ce_unique_pr partial index
- per-claim dedup with non-NULL claim_path via idx_ce_unique_claim partial index
- pr-level and per-claim events coexist on the same pr_number
- backfill (INSERT correct + DELETE wrong) is a true no-op on replay
- replay against already-backfilled state preserves unrelated events

Schema case identified: case 2 with partial-index split solution already in
place. Two partial UNIQUE indexes target disjoint row sets (claim_path IS NULL
vs IS NOT NULL), bypassing SQLite's NULL-not-equal-NULL UNIQUE quirk.

Production replay verified: re-running backfill --apply against the live DB
returns "misattributed PRs found: 0" because the first-run UPDATE flipped the
WHERE predicate. Total contribution_events count: 3839 → 3839.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:50:17 +01:00
2d332c66d4 fix(attribution): credit research-session sources to agents, not m3taversal
Some checks are pending
CI / lint-and-test (pull_request) Waiting to run
Two-part fix for a bug where every claim extracted from agent overnight
research sessions was being credited to m3taversal in contribution_events
(visible in the activity feed as "@m3taversal" on agent-derived claims).

Forward fix (research/research-session.sh):
The frontmatter template the agent prompt instructs Claude to use now
includes `proposed_by: ${AGENT}` and `intake_tier: research-task`. With
those fields present, extract.py path 1 (line 687) takes precedence and
sets prs.submitted_by to the agent handle, which then propagates into
contribution_events as a kind='agent' author event for the agent.

Without the fields, extract.py fell through to the default branch on
line 695 and set submitted_by='@m3taversal'.

Backfill (scripts/backfill-research-session-attribution.py):
Identifies research-session-derived PRs by finding teleo-codex commits
matching `^<agent>: research session YYYY-MM-DD —`, listing the
inbox/queue/*.md files added in each commit's diff, and matching those
filename basenames against prs.source_path. Only PRs currently
submitted_by='@m3taversal' AND merged within the configurable window
are touched. Default --dry-run; --apply to commit.

For each match the script:
  1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
  2. INSERT OR IGNORE the agent author event (kind='agent', weight=0.30)
     with the original PR's domain, channel, merged_at preserved
  3. DELETE the misattributed m3taversal author event

Applied 30-day backfill on VPS:
  - 304 PRs re-attributed (rio 74, clay 70, astra 53, vida 48,
    theseus 30, leo 29)
  - 297 m3taversal author events deleted, 304 agent author events
    inserted (delta of 7 = pre-v24 PRs that never had m3ta events
    in the first place; we still create the new agent event)
  - m3taversal author count: 1368 → 1071 (−22%)
  - Pre-backfill DB snapshot: pipeline.db.bak-pre-research-attribution

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:38:53 +01:00
dea1b02aa6 fix(attribution): narrow exception + document gate asymmetry (Ganymede review)
Two follow-up fixes from Ganymede's review of d0fb4c9:

1. is_publisher_handle: narrow `except Exception` to sqlite3.OperationalError.
   Pre-v26 DB fallback only needs to catch the "table doesn't exist" case;
   broader exceptions (programming errors, locks, corruption) should propagate.

2. upsert_contributor gate: add comment documenting the alias-resolution
   asymmetry between insert_contribution_event (alias-resolved via
   normalize_handle) and upsert_contributor (bare lower+lstrip-@). Today this
   is fine because the v26 classifier produced one publisher row per canonical
   handle. Branch 3 will normalize alias→canonical at writer entry points,
   tightening this gate transparently.

Unit tests for the gates (positive + negative + alias resolution) deferred to
Branch 3 alongside the auto-create flow tests.

Smoke-tested:
  - pre-v26 fallback (no publishers table) → None (correct)
  - case-insensitive match (CNBC → id=1) → correct
  - @ prefix strip (@cnbc → id=1) → correct
  - non-publisher handle (alexastrum) → None (correct)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:25:24 +01:00
d0fb4c96e3 fix(attribution): gate writer on publishers table (regression prevention)
Schema v26 (commit 3fe524d) split orgs/citations from contributors into
the publishers table. Without a writer-side gate, every merged PR with
`sourcer: cnbc` (or similar) re-creates CNBC as a contributor and
undoes the v26 classifier cleanup. Once normal pipeline traffic resumes,
the contributors table re-pollutes within hours.

Fix: belt-and-suspenders gate at both writer surfaces.

1. `lib/attribution.py::is_publisher_handle(handle, conn)` — returns
   publisher.id if handle exists in publishers.name, else None. Falls
   back gracefully on pre-v26 DBs (no publishers table → returns None →
   writer behaves like before, no regression).

2. `lib/contributor.py::insert_contribution_event` — checks
   is_publisher_handle on canonical handle before INSERT. If it's a
   publisher, debug-log + return False. Prevents originator events for
   CNBC/SpaceNews/etc.

3. `lib/contributor.py::upsert_contributor` — same gate at top. Prevents
   the contributors table from re-acquiring publisher rows.

Verified end-to-end against live VPS DB snapshot:
  - CNBC originator event: blocked (insert returns False)
  - CNBC contributors row: blocked (no row created)
  - alexastrum, thesensatore, newhandle_xyz: pass through unchanged
  - is_publisher_handle handles case-insensitive lookup correctly
    (CNBC and cnbc both match publisher_id=3)

Pre-deploy event count was 3705. Post-classifier cleanup: 3623 (82 org
events purged). Going forward, no new org events accumulate.

Branch 2 of the schema-v26 rollout. Branch 3 (auto-create at tier='cited',
extract.py sources.publisher_id wiring) is separate scope and not required
for regression prevention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:21:10 +01:00
926a397839 fix(activity): re-apply source classifier + add date-prefix slug fallback
Some checks are pending
CI / lint-and-test (push) Waiting to run
Regression: aeae712's source/create distinction was lost — VPS reverted to
pre-aeae712 behavior where every extract/* knowledge PR returned type=create
regardless of whether a claim was written. Source archives surfaced as
"New claim" chips with date-prefix slugs that 404 on click.

Root cause: aeae712 was deployed via local file copy and never pushed to
origin; a subsequent rsync from origin/main overwrote it with the older
classifier. This branch ships from origin so deploy.sh's repo-first gate
makes recurrence impossible.

- Restore aeae712: extract/* + empty description -> source, with
  empty claim_slug + source_slug field, ci_earned 0.15
- Add Leo's regex fallback: candidate_slug matching
  ^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$ -> source regardless of branch
  /commit_type/description state. Catches edge cases where description
  leaks but is just a source title (slugified into the inbox filename
  pattern), not a claim insight.
- Add 'challenge' to _FEED_COMMIT_TYPES (latent bug — challenge PRs
  would be filtered out before classification because the filter
  list omitted them; memory says 0 challenges exist so it never
  triggered, but schema support belongs in the filter)
- _build_events: compute candidate slug before classify so the regex
  fallback has a slug to inspect

Verified locally on Leo's example PRs (#4014, #4016) — both classify
as source. VPS smoke pending deploy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 13:47:00 +01:00
14 changed files with 1466 additions and 525 deletions

View file

@ -51,7 +51,7 @@ fi
# Syntax check all Python files before copying # Syntax check all Python files before copying
ERRORS=0 ERRORS=0
for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py scripts/*.py; do for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py; do
[ -f "$f" ] || continue [ -f "$f" ] || continue
if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>&1; then if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>&1; then
log "SYNTAX ERROR: $f" log "SYNTAX ERROR: $f"
@ -77,7 +77,6 @@ rsync "${RSYNC_OPTS[@]}" telegram/ "$PIPELINE_DIR/telegram/"
rsync "${RSYNC_OPTS[@]}" diagnostics/ "$DIAGNOSTICS_DIR/" rsync "${RSYNC_OPTS[@]}" diagnostics/ "$DIAGNOSTICS_DIR/"
rsync "${RSYNC_OPTS[@]}" agent-state/ "$AGENT_STATE_DIR/" rsync "${RSYNC_OPTS[@]}" agent-state/ "$AGENT_STATE_DIR/"
rsync "${RSYNC_OPTS[@]}" tests/ "$PIPELINE_DIR/tests/" rsync "${RSYNC_OPTS[@]}" tests/ "$PIPELINE_DIR/tests/"
rsync "${RSYNC_OPTS[@]}" scripts/ "$PIPELINE_DIR/scripts/"
[ -f research/research-session.sh ] && rsync "${RSYNC_OPTS[@]}" research/research-session.sh /opt/teleo-eval/research-session.sh [ -f research/research-session.sh ] && rsync "${RSYNC_OPTS[@]}" research/research-session.sh /opt/teleo-eval/research-session.sh
# Safety net: ensure all .sh files are executable after rsync # Safety net: ensure all .sh files are executable after rsync

View file

@ -41,7 +41,7 @@ echo ""
# Syntax check all Python files before deploying # Syntax check all Python files before deploying
echo "=== Pre-deploy syntax check ===" echo "=== Pre-deploy syntax check ==="
ERRORS=0 ERRORS=0
for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py "$REPO_ROOT/scripts/"*.py; do for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py; do
[ -f "$f" ] || continue [ -f "$f" ] || continue
if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>/dev/null; then if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>/dev/null; then
echo "SYNTAX ERROR: $f" echo "SYNTAX ERROR: $f"
@ -80,10 +80,6 @@ echo "=== Tests ==="
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/tests/" "$VPS_HOST:$VPS_PIPELINE/tests/" rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/tests/" "$VPS_HOST:$VPS_PIPELINE/tests/"
echo "" echo ""
echo "=== Scripts ==="
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/scripts/" "$VPS_HOST:$VPS_PIPELINE/scripts/"
echo ""
echo "=== Diagnostics ===" echo "=== Diagnostics ==="
rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/diagnostics/" "$VPS_HOST:$VPS_DIAGNOSTICS/" rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/diagnostics/" "$VPS_HOST:$VPS_DIAGNOSTICS/"
echo "" echo ""

View file

@ -9,6 +9,16 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
_cache = {"data": None, "ts": 0} _cache = {"data": None, "ts": 0}
CACHE_TTL = 60 # 1 minute — activity should feel fresh CACHE_TTL = 60 # 1 minute — activity should feel fresh
# commit_types we surface in the activity feed. `pipeline` is system
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
# inbox archive filenames, not claim slugs. Used as a fallback signal when
# branch/description heuristics miss (e.g. populated descriptions that
# happen to be source titles, not claim insights).
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
def _get_conn(): def _get_conn():
conn = sqlite3.connect(DB_PATH) conn = sqlite3.connect(DB_PATH)
@ -17,19 +27,52 @@ def _get_conn():
return conn return conn
def _classify_event(branch, description, commit_type): def _is_source_slug(slug):
if commit_type != "knowledge": return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
def _classify_event(branch, description, commit_type, candidate_slug=None):
"""Return one of: create | enrich | challenge | source | None.
Source-archive PRs are extract/* branches that filed a source into
inbox/archive/ but didn't produce a claim. Two signals classify them
as 'source' (defense in depth):
1. extract/* branch with empty description (no claim title produced)
2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern)
"""
commit_type_l = (commit_type or "").lower()
branch = branch or ""
description_lower = (description or "").lower()
has_desc = bool(description and description.strip())
if commit_type_l not in _FEED_COMMIT_TYPES:
return None return None
if branch and branch.startswith("extract/"):
return "create" # Explicit challenge signals win first.
if branch and branch.startswith("reweave/"): if (commit_type_l == "challenge"
return "enrich" or branch.startswith("challenge/")
if branch and branch.startswith("challenge/"): or "challenged_by" in description_lower):
return "challenge" return "challenge"
if description and "challenged_by" in description.lower():
return "challenge" # Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
if branch and branch.startswith("enrich/"): if (commit_type_l == "enrich"
or branch.startswith("enrich/")
or branch.startswith("reweave/")):
return "enrich" return "enrich"
# Source-only: extract/* with no claim description means inbox archive
# landed but no domain claim was written.
if branch.startswith("extract/") and not has_desc:
return "source"
# Belt-and-suspenders: if the slug we'd surface to the frontend looks
# like an inbox archive filename (date-prefix-hash), treat as source
# regardless of branch/commit_type/description state. Catches cases
# where description leaked but is just a source title, not a claim.
if _is_source_slug(candidate_slug):
return "source"
# Everything else with a description is a new claim.
return "create" return "create"
@ -59,7 +102,7 @@ def _extract_claim_slugs(description, branch=None):
if branch: if branch:
parts = branch.split("/", 1) parts = branch.split("/", 1)
if len(parts) > 1: if len(parts) > 1:
return [parts[1][:120]] return [parts[1]]
return [] return []
titles = [t.strip() for t in description.split("|") if t.strip()] titles = [t.strip() for t in description.split("|") if t.strip()]
slugs = [] slugs = []
@ -68,7 +111,7 @@ def _extract_claim_slugs(description, branch=None):
slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug) slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
slug = slug.replace(" ", "-").strip("-") slug = slug.replace(" ", "-").strip("-")
if len(slug) > 10: if len(slug) > 10:
slugs.append(slug[:120]) slugs.append(slug)
return slugs return slugs
@ -81,33 +124,60 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
def _build_events(): def _build_events():
conn = _get_conn() conn = _get_conn()
try: try:
rows = conn.execute(""" placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
rows = conn.execute(f"""
SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by, SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
p.merged_at, p.description, p.commit_type, p.cost_usd, p.merged_at, p.description, p.commit_type, p.cost_usd,
p.source_channel p.source_channel, p.source_path
FROM prs p FROM prs p
WHERE p.status = 'merged' WHERE p.status = 'merged'
AND p.commit_type = 'knowledge' AND p.commit_type IN ({placeholders})
AND p.merged_at IS NOT NULL AND p.merged_at IS NOT NULL
ORDER BY p.merged_at DESC ORDER BY p.merged_at DESC
LIMIT 2000 LIMIT 2000
""").fetchall() """, _FEED_COMMIT_TYPES).fetchall()
events = [] events = []
claim_activity = {} # slug -> {challenges, enriches, signals, first_seen} claim_activity = {} # slug -> {challenges, enriches, signals, first_seen}
for row in rows: for row in rows:
event_type = _classify_event(row["branch"], row["description"], row["commit_type"]) slugs = _extract_claim_slugs(row["description"], row["branch"])
candidate_slug = slugs[0] if slugs else ""
event_type = _classify_event(
row["branch"], row["description"], row["commit_type"],
candidate_slug=candidate_slug,
)
if not event_type: if not event_type:
continue continue
contributor = _normalize_contributor(row["submitted_by"], row["agent"]) contributor = _normalize_contributor(row["submitted_by"], row["agent"])
slugs = _extract_claim_slugs(row["description"], row["branch"])
merged_at = row["merged_at"] or "" merged_at = row["merged_at"] or ""
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40} ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
ci_earned = ci_map.get(event_type, 0) ci_earned = ci_map.get(event_type, 0)
# Source events never carry a claim_slug — no claim was written —
# so the frontend can't produce a 404-ing claim link.
if event_type == "source":
summary_text = _summary_from_branch(row["branch"])
source_slug = (
_summary_from_branch(row["branch"]).lower().replace(" ", "-")
or row["branch"]
)
events.append({
"type": "source",
"claim_slug": "",
"source_slug": source_slug,
"domain": row["domain"] or "unknown",
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"source_channel": row["source_channel"] or "unknown",
})
continue
for slug in slugs: for slug in slugs:
if slug not in claim_activity: if slug not in claim_activity:
claim_activity[slug] = { claim_activity[slug] = {
@ -164,8 +234,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours) return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
events.sort(key=hot_key, reverse=True) events.sort(key=hot_key, reverse=True)
elif sort_mode == "important": elif sort_mode == "important":
type_rank = {"challenge": 0, "enrich": 1, "create": 2} type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"]))) events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
return events return events
@ -175,6 +245,8 @@ async def handle_activity_feed(request):
sort_mode = "recent" sort_mode = "recent"
domain = request.query.get("domain", "") domain = request.query.get("domain", "")
contributor = request.query.get("contributor", "") contributor = request.query.get("contributor", "")
type_param = request.query.get("type", "")
type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
try: try:
limit = min(int(request.query.get("limit", "20")), 100) limit = min(int(request.query.get("limit", "20")), 100)
except ValueError: except ValueError:
@ -196,6 +268,8 @@ async def handle_activity_feed(request):
filtered = [e for e in filtered if e["domain"] == domain] filtered = [e for e in filtered if e["domain"] == domain]
if contributor: if contributor:
filtered = [e for e in filtered if e["contributor"] == contributor] filtered = [e for e in filtered if e["contributor"] == contributor]
if type_filter:
filtered = [e for e in filtered if e["type"] in type_filter]
sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now) sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
total = len(sorted_events) total = len(sorted_events)

View file

@ -25,6 +25,7 @@ from aiohttp import web
from review_queue_routes import register_review_queue_routes from review_queue_routes import register_review_queue_routes
from daily_digest_routes import register_daily_digest_routes from daily_digest_routes import register_daily_digest_routes
from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS
from leaderboard_routes import register_leaderboard_routes, LEADERBOARD_PUBLIC_PATHS
from lib.search import search as kb_search, embed_query, search_qdrant from lib.search import search as kb_search, embed_query, search_qdrant
logger = logging.getLogger("argus") logger = logging.getLogger("argus")
@ -508,7 +509,7 @@ def _load_secret(path: Path) -> str | None:
@web.middleware @web.middleware
async def auth_middleware(request, handler): async def auth_middleware(request, handler):
"""API key check. Public paths skip auth. Protected paths require X-Api-Key header.""" """API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"): if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path in LEADERBOARD_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
return await handler(request) return await handler(request)
expected = request.app.get("api_key") expected = request.app.get("api_key")
if not expected: if not expected:
@ -2361,6 +2362,8 @@ def create_app() -> web.Application:
# Response audit - cost tracking + reasoning traces # Response audit - cost tracking + reasoning traces
app["db_path"] = str(DB_PATH) app["db_path"] = str(DB_PATH)
register_response_audit_routes(app) register_response_audit_routes(app)
# Event-sourced leaderboard (Phase B — reads contribution_events directly)
register_leaderboard_routes(app)
# Timeline activity feed (per-PR + audit_log events for dashboard v2) # Timeline activity feed (per-PR + audit_log events for dashboard v2)
from activity_endpoint import handle_activity from activity_endpoint import handle_activity
app.router.add_get("/api/activity", handle_activity) app.router.add_get("/api/activity", handle_activity)

View file

@ -0,0 +1,166 @@
"""Leaderboard endpoint reading from event-sourced contribution_events.
Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
joins contributors for kind, returns sorted leaderboard with role breakdown.
Roles + weights (Phase A):
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
Endpoints:
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""
import logging
import re
import sqlite3
from aiohttp import web
logger = logging.getLogger("argus.leaderboard_routes")
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")
# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
def _parse_window(raw):
"""Parse window param. Returns (sql_clause, params_tuple, label).
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
Caps N at 365d / 8760h to prevent abuse.
"""
if not raw or raw == "all_time":
return ("", (), "all_time")
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
if not m:
return ("", (), "all_time")
n = int(m.group(1))
unit = m.group(2)
# Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
if unit == "d":
n = min(n, 365)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
n = min(n, 8760)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
async def handle_leaderboard(request):
"""GET /api/leaderboard.
Query params:
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
domain: filter by domain (optional)
kind: 'person' (default) | 'agent' | 'org' | 'all'
limit: max entries (default 100, max 500)
"""
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
domain = request.query.get("domain")
kind = request.query.get("kind", "person")
if kind not in KIND_VALUES:
kind = "person"
try:
limit = min(int(request.query.get("limit", "100")), 500)
except (ValueError, TypeError):
limit = 100
where = ["1=1", window_clause] if window_clause else ["1=1"]
params = list(window_params)
if domain:
where.append("ce.domain = ?")
params.append(domain)
if kind != "all":
where.append("COALESCE(c.kind, 'person') = ?")
params.append(kind)
where_sql = " AND ".join([w for w in where if w])
conn = _conn(request.app)
try:
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
# LEFT JOIN contributors so handles in events but not in contributors still appear
# (defaults to kind='person' via COALESCE).
rows = conn.execute(f"""
SELECT
ce.handle,
COALESCE(c.kind, 'person') AS kind,
ROUND(SUM(ce.weight), 4) AS ci,
COUNT(*) AS events_count,
MIN(ce.timestamp) AS first_contribution,
MAX(ce.timestamp) AS last_contribution,
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
COUNT(DISTINCT ce.domain) AS domain_count,
COUNT(DISTINCT ce.pr_number) AS pr_count
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
GROUP BY ce.handle, COALESCE(c.kind, 'person')
ORDER BY ci DESC, last_contribution DESC
LIMIT ?
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
has_more = len(rows) > limit
rows = rows[:limit]
# Total count of distinct handles matching filters (without limit)
total_row = conn.execute(f"""
SELECT COUNT(DISTINCT ce.handle) AS total
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
""", params).fetchone()
total = total_row["total"] if total_row else 0
leaderboard = []
for r in rows:
leaderboard.append({
"handle": r["handle"],
"kind": r["kind"],
"ci": r["ci"],
"ci_breakdown": {
"author": round(r["ci_author"] or 0, 4),
"challenger": round(r["ci_challenger"] or 0, 4),
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
"originator": round(r["ci_originator"] or 0, 4),
"evaluator": round(r["ci_evaluator"] or 0, 4),
},
"events_count": r["events_count"],
"domain_count": r["domain_count"],
"pr_count": r["pr_count"],
"first_contribution": r["first_contribution"],
"last_contribution": r["last_contribution"],
})
return web.json_response({
"window": window_label,
"domain": domain,
"kind_filter": kind,
"total": total,
"shown": len(leaderboard),
"has_more": has_more,
"source": "contribution_events", # explicit so consumers know the data origin
"leaderboard": leaderboard,
})
finally:
conn.close()
def register_leaderboard_routes(app: web.Application):
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
app.router.add_get("/api/leaderboard", handle_leaderboard)

View file

@ -15,6 +15,7 @@ Epimetheus owns this module. Leo reviews changes.
import logging import logging
import re import re
import sqlite3
from pathlib import Path from pathlib import Path
logger = logging.getLogger("pipeline.attribution") logger = logging.getLogger("pipeline.attribution")
@ -81,6 +82,7 @@ def normalize_handle(handle: str, conn=None) -> str:
if not handle: if not handle:
return "" return ""
h = handle.strip().lower().lstrip("@") h = handle.strip().lower().lstrip("@")
h = re.sub(r"\s*\(self-directed\)\s*$", "", h)
if conn is None: if conn is None:
return h return h
try: try:
@ -108,6 +110,36 @@ def classify_kind(handle: str) -> str:
return "person" return "person"
def is_publisher_handle(handle: str, conn) -> int | None:
"""Return publisher.id if the handle exists as a publisher name, else None.
Schema v26 split orgs/citations into the publishers table. Writer code
(upsert_contributor, insert_contribution_event) calls this to gate creating
contributor rows or events for handles that belong to publishers.
Without this gate, every merged PR with `sourcer: cnbc` (for example) would
re-create CNBC as a contributor and undo the v26 classifier cleanup.
Falls back gracefully on pre-v26 DBs: returns None if publishers table
doesn't exist yet (writer behaves like before, no regression).
"""
if not handle or conn is None:
return None
h = handle.strip().lower().lstrip("@")
try:
row = conn.execute(
"SELECT id FROM publishers WHERE name = ?", (h,),
).fetchone()
if row:
return row["id"] if hasattr(row, "keys") else row[0]
except sqlite3.OperationalError:
# Pre-v26 DB: publishers table doesn't exist yet. Fall through to None
# so writer behaves as before. Any other exception class is real signal
# (programming error, lock contention, corruption) — let it propagate.
logger.debug("is_publisher_handle: publishers table not present (pre-v26?)", exc_info=True)
return None
# ─── Parse attribution from claim content ────────────────────────────────── # ─── Parse attribution from claim content ──────────────────────────────────

View file

@ -14,7 +14,7 @@ import logging
import re import re
from . import config, db from . import config, db
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
from .forgejo import get_pr_diff from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor") logger = logging.getLogger("pipeline.contributor")
@ -62,6 +62,12 @@ def insert_contribution_event(
canonical = normalize_handle(handle, conn=conn) canonical = normalize_handle(handle, conn=conn)
if not canonical: if not canonical:
return False return False
# Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
# etc.) are provenance metadata, not contributors. Don't credit them. Without
# this gate every merge re-creates org events and undoes the v26 cleanup.
if is_publisher_handle(canonical, conn) is not None:
logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
return False
kind = classify_kind(canonical) kind = classify_kind(canonical)
try: try:
cur = conn.execute( cur = conn.execute(
@ -419,6 +425,21 @@ def upsert_contributor(
logger.warning("Unknown contributor role: %s", role) logger.warning("Unknown contributor role: %s", role)
return return
# Schema v26 gate: orgs/citations live in publishers table, not contributors.
# Skip without writing so the v26 classifier cleanup isn't undone by every
# merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
#
# Note: bare normalization (lower + lstrip @), no alias resolution. This is
# consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
# below — both look up by canonical-form-as-stored. Today's classifier produces
# one publisher row per canonical handle, so bare lookup hits. Branch 3 will
# normalize alias→canonical at writer entry points (extract.py, post_extract);
# at that point this gate auto-tightens because callers pass canonical handles.
canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
return
existing = conn.execute( existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,) "SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone() ).fetchone()

View file

@ -267,6 +267,7 @@ format: tweet | thread
status: unprocessed status: unprocessed
priority: high | medium | low priority: high | medium | low
tags: [topic1, topic2] tags: [topic1, topic2]
intake_tier: research-task
--- ---
## Content ## Content

View file

@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.
Problem: research-session.sh used to write source frontmatter without
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
`contribution_events` as a `handle='m3taversal', role='author'` row per
research-derived claim. Result: agent research credited to the human.
Forward fix is a frontmatter-template patch to research-session.sh.
This script corrects historical records.
Identification:
Research-session source archives are committed to teleo-codex with a
message matching `^<agent>: research session YYYY-MM-DD `. The diff
for that commit lists `inbox/queue/*.md` files the agent created. Any
PR whose `source_path` matches one of those filenames is research-derived.
Touch list (per matched PR):
1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
2. DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author' AND pr_number=?
3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
kind='agent', role='author', weight=0.30, original timestamp/domain/channel.
Defaults to --dry-run. Pass --apply to commit changes.
Usage:
python3 backfill-research-session-attribution.py --dry-run --days 30
python3 backfill-research-session-attribution.py --apply --days 30
"""
import argparse
import logging
import os
import re
import sqlite3
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("backfill-research-attr")
DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
AUTHOR_WEIGHT = 0.30
def git(repo: Path, *args: str) -> str:
"""Run a git command in repo, return stdout. Raises on non-zero."""
result = subprocess.run(
["git", "-C", str(repo), *args],
capture_output=True, text=True, check=True,
)
return result.stdout
def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
"""Return {source_filename_basename: agent_handle} for last N days.
Walks teleo-codex `git log --since`, filters to research-session commits,
parses agent from message header, lists inbox/queue/*.md files added in
that commit's diff. Maps the basename (which becomes source_path on extract)
to the agent who created it.
"""
log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
file_to_agent: dict[str, str] = {}
commits_seen = 0
commits_matched = 0
for line in log.splitlines():
if not line or "|" not in line:
continue
commits_seen += 1
sha, _, subject = line.partition("|")
m = COMMIT_HEADER_RE.match(subject)
if not m:
continue
agent = m.group(1)
if agent not in KNOWN_AGENTS:
logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
continue
commits_matched += 1
# List files added in this commit (inbox/queue/*.md only)
try:
added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
"--diff-filter=A", sha)
except subprocess.CalledProcessError:
logger.warning("diff-tree failed for %s", sha[:8])
continue
for f in added.splitlines():
if f.startswith("inbox/queue/") and f.endswith(".md"):
basename = Path(f).name
if basename in file_to_agent and file_to_agent[basename] != agent:
logger.warning(
"filename collision: %s — was %s, now %s (keeping first)",
basename, file_to_agent[basename], agent,
)
continue
file_to_agent.setdefault(basename, agent)
logger.info(
"scanned %d commits, %d research-session matches, %d unique source files",
commits_seen, commits_matched, len(file_to_agent),
)
return file_to_agent
def find_misattributed_prs(conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int):
"""Return list of (pr_number, current_submitted_by, source_path, agent, domain, channel, merged_at).
Only includes PRs:
- with source_path basename in our research-session map
- currently attributed to '@m3taversal'
- merged within the last N days (cap on temporal scope)
"""
rows = conn.execute(
"""SELECT number, submitted_by, source_path, domain, source_channel, merged_at
FROM prs
WHERE submitted_by = '@m3taversal'
AND source_path IS NOT NULL
AND status = 'merged'
AND merged_at > datetime('now', ?)""",
(f"-{days} days",),
).fetchall()
matches = []
for row in rows:
basename = Path(row["source_path"]).name
agent = file_to_agent.get(basename)
if agent:
matches.append({
"pr": row["number"],
"current_submitted_by": row["submitted_by"],
"source_path": row["source_path"],
"basename": basename,
"agent": agent,
"domain": row["domain"],
"channel": row["source_channel"],
"merged_at": row["merged_at"],
})
return matches
def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
"""Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
return conn.execute(
"""SELECT COUNT(*) FROM contribution_events
WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
(handle, role, pr),
).fetchone()[0]
def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
"""Apply the backfill. Returns counters."""
counters = defaultdict(int)
if not dry_run:
conn.execute("BEGIN")
try:
for m in matches:
pr = m["pr"]
agent = m["agent"]
# Pre-checks for accurate dry-run reporting
old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
new_event_exists = existing_event_count(conn, pr, agent, "author") > 0
if dry_run:
logger.info(
"would update pr=%d submitted_by '%s''%s (self-directed)' "
"[m3ta_event=%s, agent_event=%s]",
pr, m["current_submitted_by"], agent,
old_event_exists, new_event_exists,
)
counters["prs"] += 1
if old_event_exists:
counters["events_to_delete"] += 1
if not new_event_exists:
counters["events_to_insert"] += 1
continue
# 1. UPDATE prs.submitted_by
conn.execute(
"UPDATE prs SET submitted_by = ? WHERE number = ?",
(f"{agent} (self-directed)", pr),
)
counters["prs"] += 1
# 2. INSERT new agent author event (idempotent via UNIQUE index)
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
(agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
)
if cur.rowcount > 0:
counters["events_inserted"] += 1
# 3. DELETE old m3taversal author event
cur = conn.execute(
"""DELETE FROM contribution_events
WHERE handle = 'm3taversal' AND role = 'author'
AND pr_number = ? AND claim_path IS NULL""",
(pr,),
)
if cur.rowcount > 0:
counters["events_deleted"] += 1
if not dry_run:
conn.execute("COMMIT")
except Exception:
if not dry_run:
conn.execute("ROLLBACK")
raise
return dict(counters)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
parser.add_argument("--limit", type=int, default=0,
help="cap PR updates (0 = no cap; useful for testing on a small slice)")
args = parser.parse_args()
dry_run = not args.apply
logger.info("repo=%s db=%s days=%d mode=%s",
args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")
if not args.repo.exists():
logger.error("repo not found: %s", args.repo)
sys.exit(1)
if not args.db.exists():
logger.error("db not found: %s", args.db)
sys.exit(1)
file_to_agent = discover_research_session_archives(args.repo, args.days)
if not file_to_agent:
logger.warning("no research-session source files found in last %d days", args.days)
sys.exit(0)
# Per-agent breakdown
by_agent = defaultdict(int)
for agent in file_to_agent.values():
by_agent[agent] += 1
for agent, count in sorted(by_agent.items()):
logger.info(" research-session sources by %s: %d", agent, count)
conn = sqlite3.connect(args.db)
conn.row_factory = sqlite3.Row
matches = find_misattributed_prs(conn, file_to_agent, args.days)
logger.info("misattributed PRs found: %d", len(matches))
if args.limit and len(matches) > args.limit:
logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
matches = matches[:args.limit]
if not matches:
logger.info("nothing to do")
return
# Per-agent breakdown of misattribution
miss_by_agent = defaultdict(int)
for m in matches:
miss_by_agent[m["agent"]] += 1
logger.info("misattributed PR breakdown:")
for agent, count in sorted(miss_by_agent.items()):
logger.info(" %s: %d", agent, count)
counters = apply_backfill(conn, matches, dry_run)
logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)
if __name__ == "__main__":
main()

View file

@ -1,495 +0,0 @@
#!/usr/bin/env python3
"""metadao-scrape.py — pull active/recent proposals from metadao.fi into source markdown.
Replaces the broken futard.io GraphQL ingestion (Cloud Run teleo-api).
metadao.fi is a Vercel-protected Next.js App Router site; direct curl is blocked
by the anti-bot challenge. A real headless browser passes the challenge cleanly,
and once cookies are issued for the context we can call /api/decode-proposal/{addr}
from inside the browser to get structured instruction data.
Discovery flow:
1. visit / to prime Vercel cookies
2. visit /projects, scrape distinct /projects/{slug} hrefs
3. for each project, visit /projects/{slug}, scrape proposal addresses from DOM
4. for each NEW proposal (basename not already in --archive-dir):
a. visit proposal page, capture rendered prose
b. call /api/decode-proposal/{addr} via in-browser fetch for instructions
c. write source markdown to --output-dir
Idempotent. Skips proposals whose basename is already present in archive-dir
or output-dir. Designed to run from a systemd timer or one-shot.
Usage:
python3 metadao-scrape.py --archive-dir /opt/teleo-eval/workspaces/main/inbox/archive \\
--output-dir /opt/teleo-eval/workspaces/main/inbox/queue \\
[--dry-run] [--limit 10] [--project solomon]
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import sys
from datetime import date, datetime
from pathlib import Path
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("metadao-scrape")
BASE = "https://www.metadao.fi"
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
)
def slugify(text: str, max_len: int = 60) -> str:
s = text.lower().strip()
s = re.sub(r"[^a-z0-9\s-]", "", s)
s = re.sub(r"\s+", "-", s)
s = re.sub(r"-+", "-", s)
return s.strip("-")[:max_len].rstrip("-")
def _yaml_str(s: str) -> str:
"""Quote-safe YAML string. JSON strings are valid YAML strings."""
return json.dumps(s, ensure_ascii=False)
def existing_basenames(*dirs: Path) -> set[str]:
"""Collect all .md basenames (without extension) across the given dirs (recursive)."""
seen: set[str] = set()
for d in dirs:
if not d.exists():
continue
for p in d.rglob("*.md"):
seen.add(p.stem)
return seen
PROP_ADDR_RE = re.compile(r"proposal_address:\s*[\"']?([A-Za-z0-9]{32,44})[\"']?")
URL_ADDR_RE = re.compile(r"(?:futard\.io|metadao\.fi)(?:/[^/\s\"']*)*?/proposal/([A-Za-z0-9]{32,44})")
def existing_proposal_addresses(*dirs: Path) -> set[str]:
"""Scan frontmatter / URLs in existing source files to collect known proposal addresses.
Reads only the first 4KB of each file (frontmatter + URL line are at the top)
to keep this fast on large archives.
"""
addrs: set[str] = set()
for d in dirs:
if not d.exists():
continue
for p in d.rglob("*.md"):
try:
head = p.read_text(errors="replace")[:4096]
except Exception:
continue
for m in PROP_ADDR_RE.finditer(head):
addrs.add(m.group(1))
for m in URL_ADDR_RE.finditer(head):
addrs.add(m.group(1))
return addrs
def list_project_slugs(page) -> list[str]:
"""Read /projects and extract distinct project slugs."""
page.goto(f"{BASE}/projects", wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(1500)
hrefs = page.evaluate(
"""() => {
const links = Array.from(document.querySelectorAll('a[href^="/projects/"]'));
const slugs = new Set();
for (const a of links) {
const m = a.getAttribute('href').match(/^\\/projects\\/([a-z0-9-]+)(?:\\/|$)/);
if (m && m[1]) slugs.add(m[1]);
}
return [...slugs];
}"""
)
return list(hrefs)
def get_project_metadata(page, slug: str) -> dict:
"""Visit a project page and return basic metadata + proposal addresses + card text.
Card text typically contains 'SOLO-004 ENDED DP-00003 (MEM): The Gigabus Proposal Pass $0.64...'
so we capture it for downstream title parsing.
"""
url = f"{BASE}/projects/{slug}"
page.goto(url, wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(1500)
proposals = page.evaluate(
"""() => {
const links = Array.from(document.querySelectorAll('a[href*="/proposal/"]'));
const seen = new Set();
const out = [];
const TARGET_ADDR_RE = /\\/proposal\\/([A-Za-z0-9]+)/;
for (const a of links) {
const m = a.getAttribute('href').match(TARGET_ADDR_RE);
if (!m) continue;
if (seen.has(m[1])) continue;
seen.add(m[1]);
const addr = m[1];
// Walk up only while the ancestor contains exactly one proposal link
// (so we get the card, not a parent that contains all cards).
let card = a;
while (card.parentElement) {
const parent = card.parentElement;
const propLinks = parent.querySelectorAll('a[href*="/proposal/"]');
if (propLinks.length > 1) break;
card = parent;
}
out.push({
address: addr,
link_text: (a.innerText || '').trim().slice(0, 600),
card_text: (card.innerText || '').trim().slice(0, 1500),
});
}
return out;
}"""
)
# Try to read project name from h1 / title
project_name = page.evaluate(
"""() => {
const h = document.querySelector('h1');
return h ? h.innerText.trim() : '';
}"""
) or slug.title()
return {"slug": slug, "name": project_name, "url": url, "proposals": proposals}
# Strict pattern: DP-NNNNN (CAT): Title — the canonical proposal heading.
DP_STRICT_RE = re.compile(r"DP-\d+\s*\([A-Z]+\)\s*[:\-]\s*[^\n\r]+", re.MULTILINE)
# Loose pattern: any line starting with DP-NNNNN followed by something.
DP_LOOSE_RE = re.compile(r"DP-\d+\s*(?:\([A-Z]+\))?\s*[:\-]?\s*[^\n\r]+", re.MULTILINE)
STAT_BLEED_RE = re.compile(
# Stat keywords only bleed when followed by a numeric/symbolic stat token,
# so word-only sequences like "Active Capital" or "Live Streaming Service" pass.
r"\s+\b(?:Pass|Fail|Passed|Failed|Active|Pending|Ended|Live|TOTAL|VOLUME|STATUS|MCAP|PRICE|SPOT)\b\s+(?:\$|\+|-|\d)"
r"|\s*(?:\$\d|\+\d{2,}|\d+\.\d+%|\d{5,})",
re.IGNORECASE,
)
def _clean_title_candidate(line: str) -> str:
line = line.strip()
# Find first bleed match past offset 10. re.search returns leftmost, but the
# DP-NNNNN digit sequence always wins first place; we want the first POST-title
# match instead. Walk all matches and trim at the earliest one past the guard.
for bleed in STAT_BLEED_RE.finditer(line):
if bleed.start() > 10:
line = line[: bleed.start()].rstrip(" :-—")
break
return line.strip()[:200]
def extract_dp_title(*texts: str) -> str:
"""Find the canonical 'DP-NNNNN (CAT): Title' line.
Strategy:
1. Try strict pattern (with parenthetical category code) across all sources.
Take the SHORTEST hit prose continuations of an already-correct title
tend to be longer than the title itself.
2. Fall back to loose pattern, longest match.
"""
strict: list[str] = []
loose: list[str] = []
for t in texts:
if not t:
continue
for m in DP_STRICT_RE.finditer(t):
cleaned = _clean_title_candidate(m.group(0))
if cleaned:
strict.append(cleaned)
for m in DP_LOOSE_RE.finditer(t):
cleaned = _clean_title_candidate(m.group(0))
if cleaned:
loose.append(cleaned)
if strict:
return min(strict, key=len)
if loose:
return max(loose, key=len)
return ""
def fetch_proposal(page, project_slug: str, addr: str, card_text: str = "") -> dict | None:
"""Visit proposal page, capture rendered text + decode instructions via in-browser fetch."""
url = f"{BASE}/projects/{project_slug}/proposal/{addr}"
log.info("fetching proposal %s/%s", project_slug, addr[:8])
try:
page.goto(url, wait_until="domcontentloaded", timeout=45000)
except PWTimeout:
log.warning("timeout loading %s — using whatever rendered", url)
page.wait_for_timeout(2500) # let RSC stream finish
body_text = page.evaluate("() => document.body.innerText || ''")
# Title preference: card_text (from project page) → body_text DP-NNNNN match → first h1/h2
title_block = extract_dp_title(card_text, body_text)
if not title_block:
title_block = page.evaluate(
"""() => {
const h = document.querySelector('h1, h2');
return h ? h.innerText.trim() : '';
}"""
) or f"proposal-{addr[:8]}"
# Status: 'Passed' / 'Failed' / 'Active' / 'Pending'
status = page.evaluate(
"""() => {
const text = document.body.innerText || '';
const m = text.match(/\\n(Passed|Failed|Active|Pending|Live|Ended)\\b/);
return m ? m[1] : '';
}"""
)
# Get the structured /api/decode-proposal data
decoded = None
try:
decoded = page.evaluate(
f"""async () => {{
try {{
const r = await fetch('/api/decode-proposal/{addr}');
if (!r.ok) return null;
return await r.json();
}} catch (e) {{ return null; }}
}}"""
)
except Exception as e:
log.debug("decode fetch failed for %s: %s", addr, e)
return {
"address": addr,
"project_slug": project_slug,
"url": url,
"title": title_block,
"status": status,
"body_text": body_text,
"decoded": decoded,
}
def parse_dp_code(title: str) -> tuple[str, str]:
"""Parse 'DP-00003 (MEM): The Gigabus Proposal' → ('dp-00003-mem', 'The Gigabus Proposal').
Falls back gracefully if format doesn't match.
"""
# Match leading DP-NNNNN[space(category)]?[:]?[space]? plus the rest
m = re.match(r"^(DP-\d+(?:\s*\([A-Z]+\))?)\s*[:\-]?\s*(.*)$", title.strip())
if m:
code = re.sub(r"[^a-z0-9]+", "-", m.group(1).lower()).strip("-")
rest = m.group(2).strip()
return code, rest
return "", title.strip()
def build_filename(project_slug: str, proposal: dict, today: str) -> str:
"""YYYY-MM-DD-metadao-{slug}-{title-fragment}-{addr8}.md
Embedding the address fragment makes filenames stable across runs even when
the title isn't unique (e.g. projects that don't use DP-NNNNN naming).
"""
title = proposal.get("title") or ""
code, rest = parse_dp_code(title)
parts: list[str] = []
if code:
parts.append(code)
if rest:
parts.append(slugify(rest, max_len=40))
body_slug = "-".join(p for p in parts if p)[:60].rstrip("-")
addr_frag = proposal["address"][:8].lower()
if body_slug:
return f"{today}-metadao-{project_slug}-{body_slug}-{addr_frag}.md"
return f"{today}-metadao-{project_slug}-{addr_frag}.md"
def build_source_markdown(project: dict, proposal: dict, today: str) -> str:
"""Build the source markdown matching the existing schema."""
title = proposal.get("title") or f"{project['name']} proposal {proposal['address'][:8]}"
body_text = (proposal.get("body_text") or "").strip()
decoded = proposal.get("decoded") or {}
# Build YAML frontmatter — all free-text values escaped via _yaml_str (json.dumps).
# project_slug is constrained to [a-z0-9-] by slugify upstream, but pass through
# the same path for consistency.
full_title = f"MetaDAO: {project['name']}{title}"
fm_lines = [
"---",
"type: source",
f"title: {_yaml_str(full_title)}",
f"author: {_yaml_str('metadao.fi')}",
f"url: {_yaml_str(proposal['url'])}",
f"date: {today}",
"domain: internet-finance",
"format: data",
"status: unprocessed",
f"tags: [futardio, metadao, futarchy, solana, governance, {project['slug']}]",
"event_type: proposal",
f"project_slug: {_yaml_str(project['slug'])}",
f"proposal_address: {_yaml_str(proposal['address'])}",
]
if proposal.get("status"):
fm_lines.append(f"proposal_status: {_yaml_str(proposal['status'])}")
if decoded.get("squadsProposal"):
fm_lines.append(f"squads_proposal: {_yaml_str(decoded['squadsProposal'])}")
if decoded.get("squadsStatus"):
fm_lines.append(f"squads_status: {_yaml_str(decoded['squadsStatus'])}")
fm_lines.append("---")
fm_lines.append("")
# Header section — quick facts
body_md = [
f"# {title}",
"",
"## Proposal Details",
f"- Project: {project['name']} (`{project['slug']}`)",
f"- Proposal: {title}",
f"- Address: `{proposal['address']}`",
]
if proposal.get("status"):
body_md.append(f"- Status: {proposal['status']}")
body_md.append(f"- URL: {proposal['url']}")
# Proposal prose body (rendered text from the page)
body_md.append("")
body_md.append("## Proposal Body")
body_md.append("")
body_md.append(body_text or "_(no body captured)_")
# Decoded on-chain instructions
if decoded:
body_md.append("")
body_md.append("## On-chain Decoded")
if decoded.get("squadsUrl"):
body_md.append(f"- Squads: {decoded['squadsUrl']}")
instrs = decoded.get("instructions") or []
if instrs:
body_md.append("")
body_md.append("### Instructions")
for i, instr in enumerate(instrs, 1):
body_md.append(f"{i}. **{instr.get('description', instr.get('type', 'instruction'))}** ({instr.get('program', '')})")
for f in instr.get("fields", []) or []:
val = f.get("fullValue") or f.get("value") or ""
body_md.append(f" - {f.get('label', '')}: `{val}`")
if instr.get("summary"):
body_md.append(f" - Summary: {instr['summary']}")
return "\n".join(fm_lines + body_md) + "\n"
def main() -> int:
p = argparse.ArgumentParser(description="Scrape MetaDAO proposals into inbox source files")
p.add_argument("--archive-dir", required=True, help="existing archive dir (skip if basename exists here)")
p.add_argument("--output-dir", required=True, help="dir to write new source markdown into")
p.add_argument("--project", help="restrict to a single project slug (default: scan all)")
p.add_argument("--limit", type=int, default=0, help="max number of new proposals to capture (0 = unlimited)")
p.add_argument("--dry-run", action="store_true", help="print intended writes instead of writing")
p.add_argument("--headless", action="store_true", default=True)
args = p.parse_args()
archive_dir = Path(args.archive_dir).resolve()
output_dir = Path(args.output_dir).resolve()
seen_basenames = existing_basenames(archive_dir, output_dir)
seen_addresses = existing_proposal_addresses(archive_dir, output_dir)
log.info("loaded %d existing basenames + %d known proposal addresses from %s + %s",
len(seen_basenames), len(seen_addresses), archive_dir, output_dir)
today = date.today().isoformat()
written: list[str] = []
skipped_existing = 0
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=args.headless)
ctx = browser.new_context(user_agent=USER_AGENT)
page = ctx.new_page()
# Prime cookies
log.info("priming Vercel session via homepage")
page.goto(f"{BASE}/", wait_until="domcontentloaded", timeout=30000)
page.wait_for_timeout(1500)
# Discovery
if args.project:
project_slugs = [args.project]
else:
project_slugs = list_project_slugs(page)
log.info("discovered %d project slugs: %s", len(project_slugs), project_slugs)
for slug in project_slugs:
try:
project = get_project_metadata(page, slug)
except Exception:
log.exception("failed to read project %s", slug)
continue
log.info(" %s%d proposals", slug, len(project["proposals"]))
for prop in project["proposals"]:
addr = prop["address"]
# Pre-check #1: known proposal address (cheapest, no browser visit)
if addr in seen_addresses:
skipped_existing += 1
continue
# Pre-check #2: address fragment in an existing basename
addr_frag = addr[:8].lower()
if any(addr_frag in b.lower() for b in seen_basenames):
skipped_existing += 1
continue
try:
proposal_data = fetch_proposal(page, slug, addr, card_text=prop.get("card_text", ""))
except Exception:
log.exception("failed to fetch proposal %s/%s", slug, addr)
continue
if not proposal_data:
continue
# Minimum-render gate: skip partial renders rather than archiving stubs.
# Successful captures are 20KB+; require either a real body or a DP-N title.
body_len = len(proposal_data.get("body_text") or "")
has_dp_match = bool(re.search(r"DP-\d+", proposal_data.get("title", "") or ""))
if body_len < 500 and not has_dp_match:
log.warning(" skip (insufficient render): %s body=%dB title=%r",
addr, body_len, proposal_data.get("title", ""))
continue
fname = build_filename(slug, proposal_data, today)
if Path(fname).stem in seen_basenames:
skipped_existing += 1
log.info(" skip (already archived by title): %s", fname)
continue
content = build_source_markdown(project, proposal_data, today)
target = output_dir / fname
if args.dry_run:
log.info(" DRY: would write %s (%d bytes)", target, len(content))
else:
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(content)
log.info(" wrote %s (%d bytes)", target, len(content))
written.append(fname)
if args.limit and len(written) >= args.limit:
log.info("hit limit=%d, stopping", args.limit)
browser.close()
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
return 0
browser.close()
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Reset m3taversal.sourcer_count from inflated legacy value to file-truth count.
Background: pre-Phase-A extract.py had a `submitted_by` fallback that credited
m3taversal as sourcer for every Telegram-ingested source, accumulating to 1011
sourcer_count in the contributors table. The actual file-truth count (sourcer
frontmatter equal to "m3taversal" in claim files) is 21. The 990-row delta is
infrastructure attribution that doesn't reflect content authorship.
The Phase A event-sourced ledger (contribution_events) computed the correct
389.55 CI from author events; /api/leaderboard reads from there directly.
But the legacy /api/contributors endpoint reads contributors.claims_merged
which carries the inflated 1011. Until that endpoint is deprecated, the
divergence shows two different numbers depending on which surface the UI
queries.
This script applies the surgical UPDATE that was run on VPS on 2026-04-27
during the leaderboard cutover. Committed as a script per Ganymede review:
"DB mutations go through reviewable code paths matters more than the
convenience of one-shot SQL. The artifact explains what was done and why."
Idempotent safe to re-run. If sourcer_count is already 21, no change.
Usage:
python3 scripts/reset-m3taversal-sourcer.py --dry-run
python3 scripts/reset-m3taversal-sourcer.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
TARGET_HANDLE = "m3taversal"
TRUTH_SOURCER_COUNT = 21
TRUTH_CLAIMS_MERGED = 21
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT handle, sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
if not row:
print(f" No contributors row for {TARGET_HANDLE} — nothing to reset.")
return
print(
f" Current: {row['handle']} sourcer_count={row['sourcer_count']} "
f"claims_merged={row['claims_merged']}"
)
print(f" Target: sourcer_count={TRUTH_SOURCER_COUNT} claims_merged={TRUTH_CLAIMS_MERGED}")
if (row["sourcer_count"] == TRUTH_SOURCER_COUNT
and row["claims_merged"] == TRUTH_CLAIMS_MERGED):
print(" Already at target values — no-op.")
return
if args.dry_run:
print(" (dry-run) UPDATE would be applied. Re-run without --dry-run.")
return
conn.execute(
"""UPDATE contributors SET
sourcer_count = ?,
claims_merged = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(TRUTH_SOURCER_COUNT, TRUTH_CLAIMS_MERGED, TARGET_HANDLE),
)
conn.execute(
"""INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)""",
(
"manual",
"m3taversal_sourcer_reset",
(
'{"reason":"Pre-Phase-A submitted_by fallback inflated to 1011; '
'file-truth is 21","sourcer_count_before":1011,'
'"sourcer_count_after":21,"claims_merged_after":21}'
),
),
)
conn.commit()
after = conn.execute(
"SELECT sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
print(
f" Applied. Now: sourcer_count={after['sourcer_count']} "
f"claims_merged={after['claims_merged']}"
)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,152 @@
"""Tests for diagnostics/activity_endpoint.py classify_pr_operation.
Covers the Leo gotcha extract/* branches with commit_type=enrich or
challenge classify by commit_type, not branch prefix. Same class of bug
as the contributor-role wiring fix.
"""
import sys
from pathlib import Path
import pytest
# diagnostics/ isn't on sys.path by default; add it for these tests.
_DIAG = Path(__file__).resolve().parents[1] / "diagnostics"
if str(_DIAG) not in sys.path:
sys.path.insert(0, str(_DIAG))
# aiohttp is imported at module load time; skip cleanly if not installed.
pytest.importorskip("aiohttp")
from activity_endpoint import classify_pr_operation # noqa: E402
# ─── Merged PRs: commit_type wins over branch prefix ───────────────────────
def test_extract_branch_legacy_knowledge_classifies_new():
assert classify_pr_operation("merged", "knowledge", "extract/foo", None) == "new"
def test_extract_branch_with_enrich_commit_type_classifies_enrich():
"""Leo gotcha: extract/* + commit_type=enrich → enrich, not new."""
assert classify_pr_operation("merged", "enrich", "extract/foo", None) == "enrich"
def test_extract_branch_with_challenge_commit_type_classifies_challenge():
"""Leo gotcha: extract/* + commit_type=challenge → challenge, not new."""
assert classify_pr_operation("merged", "challenge", "extract/foo", None) == "challenge"
def test_challenged_by_in_description_classifies_challenge():
assert (
classify_pr_operation(
"merged", "knowledge", "extract/foo", "evidence for challenged_by claim"
)
== "challenge"
)
# ─── Branch prefix fallback (when commit_type is generic) ──────────────────
def test_reweave_branch_classifies_enrich():
assert classify_pr_operation("merged", "knowledge", "reweave/batch-1", None) == "enrich"
def test_challenge_branch_classifies_challenge():
assert (
classify_pr_operation("merged", "knowledge", "challenge/nuclear-moloch", None)
== "challenge"
)
# ─── Maintenance commit_types → infra ──────────────────────────────────────
def test_fix_commit_type_classifies_infra():
assert classify_pr_operation("merged", "fix", "fix/deploy-bug", None) == "infra"
def test_pipeline_commit_type_classifies_infra():
assert (
classify_pr_operation("merged", "pipeline", "epimetheus/migration-v14", None)
== "infra"
)
# ─── Knowledge-producing commit_types → new ────────────────────────────────
def test_research_commit_type_classifies_new():
assert (
classify_pr_operation("merged", "research", "theseus/cornelius-batch-2", None)
== "new"
)
def test_entity_commit_type_classifies_new():
assert classify_pr_operation("merged", "entity", "leo/entities-update", None) == "new"
# ─── Non-merged statuses route through NON_MERGED_STATUS_TO_OPERATION ──────
def test_open_pr_classifies_extract():
assert classify_pr_operation("open", None, "extract/foo", None) == "extract"
def test_approved_pr_classifies_new():
assert classify_pr_operation("approved", None, "extract/foo", None) == "new"
def test_closed_pr_classifies_infra():
assert classify_pr_operation("closed", None, "extract/foo", None) == "infra"
def test_conflict_pr_classifies_challenge():
assert classify_pr_operation("conflict", None, "extract/foo", None) == "challenge"
def test_validating_pr_classifies_extract():
assert classify_pr_operation("validating", None, "extract/foo", None) == "extract"
def test_reviewing_pr_classifies_extract():
assert classify_pr_operation("reviewing", None, "extract/foo", None) == "extract"
def test_merging_pr_classifies_new():
assert classify_pr_operation("merging", None, "extract/foo", None) == "new"
def test_zombie_pr_classifies_infra():
assert classify_pr_operation("zombie", None, "extract/foo", None) == "infra"
# ─── Priority order: reweave commit_type vs reweave/ branch ─────────────────
# Reweave commit_type is in _MAINTENANCE_COMMIT_TYPES (→ infra), but
# branch.startswith('reweave/') is checked first (→ enrich). The bifurcation
# is real spec behavior — nightly reweave PRs must classify as enrich, not
# infra. Locking this in prevents a silent flip on future priority refactors.
def test_reweave_commit_type_with_reweave_branch_classifies_enrich():
"""Branch prefix wins over maintenance — reweave PRs are enrich, not infra."""
assert classify_pr_operation("merged", "reweave", "reweave/batch-1", None) == "enrich"
def test_reweave_commit_type_without_reweave_branch_classifies_infra():
"""Without reweave/ prefix, reweave commit_type falls to maintenance → infra."""
assert classify_pr_operation("merged", "reweave", "epimetheus/foo", None) == "infra"
# ─── Defensive cases — null/empty inputs shouldn't crash ───────────────────
def test_null_commit_type_and_branch_classifies_new():
assert classify_pr_operation("merged", None, None, None) == "new"
def test_unknown_status_falls_back_to_infra():
assert classify_pr_operation("nonsense", None, None, None) == "infra"

437
tests/test_leaderboard.py Normal file
View file

@ -0,0 +1,437 @@
"""Tests for /api/leaderboard endpoint (diagnostics/leaderboard_routes.py).
Locks behavior for the four slicings consumed by Argus + Oberon:
- window: all_time | Nd | Nh
- domain: per-domain filter
- kind: person | agent | org | all
- limit: pagination + has_more flag
Regression coverage includes the AND-prefix SQL bug (commit 42d35d4): _parse_window
returned clauses prefixed with 'AND ' which produced 'WHERE 1=1 AND AND ...' when
joined into the WHERE clause via " AND ".join(...).
"""
import asyncio
import json
import sqlite3
from pathlib import Path
import pytest
# Skip whole file if aiohttp isn't available (matches test_activity_classify.py pattern)
aiohttp = pytest.importorskip("aiohttp")
# Make diagnostics/ importable
import sys
DIAG_ROOT = Path(__file__).parent.parent / "diagnostics"
sys.path.insert(0, str(DIAG_ROOT))
from leaderboard_routes import ( # noqa: E402
_parse_window,
handle_leaderboard,
KIND_VALUES,
LEADERBOARD_PUBLIC_PATHS,
)
from aiohttp.test_utils import make_mocked_request # noqa: E402
# ─── Schema lifted from lib/db.py:138-209 (v25 minimum) ──────────────────────
SCHEMA = """
CREATE TABLE contributors (
handle TEXT PRIMARY KEY,
kind TEXT DEFAULT 'person',
tier TEXT DEFAULT 'new',
claims_merged INTEGER DEFAULT 0,
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
first_contribution TEXT,
last_contribution TEXT
);
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def db_path(tmp_path):
"""Seeded pipeline.db with deterministic events.
Cohort:
- alice (person): 3 author events, 1 originator (recent 3d, internet-finance)
- bob (person): 5 author events (older, 60d ago, ai-alignment)
- carol (person): 1 author + 1 evaluator (today, internet-finance)
- rio (agent): 4 author + 2 evaluator (mixed, internet-finance + grand-strategy)
- leo (agent): 8 evaluator events (today, mixed domains)
- cnbc (org): 2 originator events (legacy, before classifier moved orgs)
- newhandle (no contributors row): 1 author event tests LEFT JOIN COALESCE
"""
p = tmp_path / "pipeline.db"
conn = sqlite3.connect(str(p))
conn.executescript(SCHEMA)
contribs = [
("alice", "person"),
("bob", "person"),
("carol", "person"),
("rio", "agent"),
("leo", "agent"),
("cnbc", "org"),
# newhandle intentionally absent — tests LEFT JOIN
]
for handle, kind in contribs:
conn.execute(
"INSERT INTO contributors (handle, kind) VALUES (?, ?)",
(handle, kind),
)
# (handle, role, weight, pr_number, claim_path, domain, timestamp)
events = [
# alice — 3 author + 1 originator, recent (all >24h ago, all <7d)
# Most-recent event at -2 days (not -1 days) so 24h window exclusion is
# unambiguous and not subject to fixture-vs-query microsecond drift.
("alice", "author", 0.30, 100, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 101, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 102, None, "ai-alignment", "now,-3 days"),
("alice", "originator", 0.15, 103, "domains/internet-finance/x.md", "internet-finance", "now,-2 days"),
# bob — 5 author, all 60d ago (outside 30d, inside all_time)
("bob", "author", 0.30, 200, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 201, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 202, None, "ai-alignment", "now,-61 days"),
("bob", "author", 0.30, 203, None, "ai-alignment", "now,-62 days"),
("bob", "author", 0.30, 204, None, "ai-alignment", "now,-63 days"),
# carol — 1 author + 1 evaluator, today
("carol", "author", 0.30, 300, None, "internet-finance", "now"),
("carol", "evaluator", 0.05, 301, None, "internet-finance", "now"),
# rio agent — 4 author + 2 evaluator
("rio", "author", 0.30, 400, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 401, None, "grand-strategy", "now,-2 days"),
("rio", "author", 0.30, 402, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 403, None, "internet-finance", "now,-2 days"),
("rio", "evaluator", 0.05, 404, None, "ai-alignment", "now,-2 days"),
("rio", "evaluator", 0.05, 405, None, "ai-alignment", "now,-2 days"),
# leo agent — 8 evaluator
*[
("leo", "evaluator", 0.05, 500 + i, None, "internet-finance" if i % 2 == 0 else "ai-alignment", "now")
for i in range(8)
],
# cnbc org — 2 originator (legacy data, kept by classifier+gate split)
("cnbc", "originator", 0.15, 600, "domains/internet-finance/y.md", "internet-finance", "now,-5 days"),
("cnbc", "originator", 0.15, 601, "domains/internet-finance/z.md", "internet-finance", "now,-5 days"),
# newhandle — handle in events but no contributors row (LEFT JOIN COALESCE → person)
# -2 days so 24h-window test exclusion is unambiguous (matches alice).
("newhandle", "author", 0.30, 700, None, "ai-alignment", "now,-2 days"),
]
for handle, role, weight, pr_num, claim_path, domain, ts_modifier in events:
# Use SQLite datetime() to compute timestamps relative to "now" so tests
# are deterministic across days. Multi-arg form: datetime('now', '-1 days').
ts_args = ts_modifier.split(",")
if len(ts_args) == 1:
ts_sql = f"datetime('{ts_args[0]}')"
else:
ts_sql = f"datetime('{ts_args[0]}', '{ts_args[1].strip()}')"
conn.execute(
f"""INSERT INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, {ts_sql})""",
(handle, "agent" if handle in {"rio", "leo"} else "person",
role, weight, pr_num, claim_path, domain),
)
conn.commit()
conn.close()
return str(p)
def _call(db_path, **query):
"""Build a mocked request, call handle_leaderboard, return parsed JSON."""
qs = "&".join(f"{k}={v}" for k, v in query.items())
req = make_mocked_request("GET", f"/api/leaderboard?{qs}")
# make_mocked_request gives us req.app — write db_path into it.
req.app["db_path"] = db_path
response = asyncio.run(handle_leaderboard(req))
return json.loads(response.body.decode())
# ─── _parse_window unit tests ────────────────────────────────────────────────
class TestParseWindow:
def test_default_is_all_time(self):
clause, params, label = _parse_window(None)
assert clause == ""
assert params == ()
assert label == "all_time"
def test_explicit_all_time(self):
clause, params, label = _parse_window("all_time")
assert clause == ""
assert label == "all_time"
def test_seven_days(self):
clause, params, label = _parse_window("7d")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-7 days",)
assert label == "7d"
# Regression: must NOT begin with "AND " (handle_leaderboard composes via " AND ".join)
assert not clause.startswith("AND")
def test_thirty_days(self):
clause, params, label = _parse_window("30d")
assert params == ("-30 days",)
assert label == "30d"
def test_hours(self):
clause, params, label = _parse_window("24h")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-24 hours",)
assert label == "24h"
def test_caps_days_at_365(self):
clause, params, label = _parse_window("9999d")
assert params == ("-365 days",)
def test_caps_hours_at_8760(self):
clause, params, label = _parse_window("99999h")
assert params == ("-8760 hours",)
def test_garbage_falls_to_all_time(self):
clause, params, label = _parse_window("foobar")
assert clause == ""
assert label == "all_time"
def test_uppercase_normalized(self):
clause, params, label = _parse_window("7D")
assert label == "7d"
def test_zero_days_still_emits_clause(self):
# 0d means "now or later" — empty result, but parse should succeed
clause, params, label = _parse_window("0d")
assert "datetime" in clause
assert label == "0d"
# ─── handle_leaderboard integration tests ────────────────────────────────────
class TestLeaderboardEndpoint:
def test_all_time_default_kind_person(self, db_path):
"""Default kind is 'person'. Returns all persons, sorted by CI desc."""
body = _call(db_path)
assert body["window"] == "all_time"
assert body["kind_filter"] == "person"
assert body["domain"] is None
assert body["source"] == "contribution_events"
# alice 3*0.30 + 0.15 = 1.05
# bob 5*0.30 = 1.50
# carol 0.30 + 0.05 = 0.35
# newhandle 0.30 (LEFT JOIN COALESCE → 'person')
# cnbc excluded (kind='org')
# rio/leo excluded (kind='agent')
handles = [r["handle"] for r in body["leaderboard"]]
assert "bob" in handles
assert "alice" in handles
assert "newhandle" in handles, "LEFT JOIN COALESCE should default missing contributors to 'person'"
assert "cnbc" not in handles, "kind=person should exclude orgs"
assert "rio" not in handles, "kind=person should exclude agents"
# Descending by CI
cis = [r["ci"] for r in body["leaderboard"]]
assert cis == sorted(cis, reverse=True)
def test_window_7d_excludes_old_events(self, db_path):
"""REGRESSION: 7d window must execute (no AND-prefix SQL error).
Bob has all events 60d ago must not appear in 7d window.
Alice has events 1-3d ago must appear.
"""
body = _call(db_path, window="7d")
assert body["window"] == "7d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "bob" not in handles, "60d-old events must be excluded from 7d window"
assert "carol" in handles # today
def test_window_30d_excludes_60d_events(self, db_path):
"""REGRESSION: 30d window must execute. Bob (60d) excluded; alice/carol included."""
body = _call(db_path, window="30d")
assert body["window"] == "30d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
def test_window_24h_only_today(self, db_path):
"""24h window picks up today's events only.
Default kind=person. Within 24h: only carol (events at 'now').
Excluded: alice/newhandle (events at -2 days), bob (-60d), rio/leo (kind),
cnbc (-5d AND kind=org).
"""
body = _call(db_path, window="24h")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["carol"], (
"24h + kind=person should return only carol; got %r" % handles
)
def test_kind_agent(self, db_path):
"""kind=agent returns only agents."""
body = _call(db_path, kind="agent")
handles = [r["handle"] for r in body["leaderboard"]]
assert "rio" in handles
assert "leo" in handles
assert "alice" not in handles
assert "bob" not in handles
def test_kind_org(self, db_path):
"""kind=org returns only orgs (legacy events still queryable)."""
body = _call(db_path, kind="org")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["cnbc"]
assert body["leaderboard"][0]["ci"] == 0.30 # 2 * 0.15
def test_kind_all_returns_everyone(self, db_path):
"""kind=all returns all kinds — persons + agents + orgs."""
body = _call(db_path, kind="all")
handles = {r["handle"] for r in body["leaderboard"]}
assert handles == {"alice", "bob", "carol", "rio", "leo", "cnbc", "newhandle"}
def test_invalid_kind_falls_to_person(self, db_path):
"""Defensive: unknown kind value silently falls back to 'person'."""
body = _call(db_path, kind="bogus")
assert body["kind_filter"] == "person"
def test_domain_filter(self, db_path):
"""domain=internet-finance scopes events; kind filter still applies."""
body = _call(db_path, domain="internet-finance")
assert body["domain"] == "internet-finance"
handles = {r["handle"] for r in body["leaderboard"]}
# alice has 2 internet-finance authors + 1 originator
# carol has 1 internet-finance author + 1 evaluator
# bob has 0 (all ai-alignment)
# newhandle has 0 (ai-alignment only)
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
assert "newhandle" not in handles
def test_composed_window_kind_domain(self, db_path):
"""REGRESSION: composed filters must build SQL correctly.
7d + person + internet-finance alice only.
"""
body = _call(db_path, window="7d", kind="person", domain="internet-finance")
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles # excluded by 7d
assert "rio" not in handles # excluded by kind=person
def test_limit_caps_results(self, db_path):
"""limit caps the leaderboard slice; total reflects unfiltered count."""
body = _call(db_path, kind="all", limit=3)
assert body["shown"] == 3
assert body["has_more"] is True
assert body["total"] == 7
def test_no_has_more_when_under_limit(self, db_path):
body = _call(db_path, kind="org")
assert body["shown"] == 1
assert body["has_more"] is False
assert body["total"] == 1
def test_invalid_limit_falls_to_default(self, db_path):
"""Defensive: garbage limit param falls to default 100. 7 entries < 100."""
body = _call(db_path, kind="all", limit="not-a-number")
assert body["shown"] == 7
assert body["has_more"] is False
def test_limit_capped_at_500(self, db_path):
"""Defensive: limit > 500 silently caps at 500."""
body = _call(db_path, limit=99999, kind="all")
# No assertion on the value of the cap from the response — just that
# it doesn't error and shown <= 500.
assert body["shown"] <= 500
def test_role_breakdown_present(self, db_path):
"""Each row includes ci_breakdown with all 5 roles."""
body = _call(db_path)
for entry in body["leaderboard"]:
assert set(entry["ci_breakdown"].keys()) == {
"author", "challenger", "synthesizer", "originator", "evaluator",
}
def test_alice_role_breakdown_correct(self, db_path):
"""Alice has 3 author (0.90) + 1 originator (0.15) = 1.05 total."""
body = _call(db_path)
alice = next(r for r in body["leaderboard"] if r["handle"] == "alice")
assert alice["ci"] == 1.05
assert alice["ci_breakdown"]["author"] == 0.90
assert alice["ci_breakdown"]["originator"] == 0.15
assert alice["ci_breakdown"]["challenger"] == 0
assert alice["ci_breakdown"]["synthesizer"] == 0
assert alice["ci_breakdown"]["evaluator"] == 0
assert alice["events_count"] == 4
assert alice["pr_count"] == 4
assert alice["domain_count"] == 2 # internet-finance + ai-alignment
def test_empty_window_returns_clean_response(self, db_path):
"""Window with no matching events returns shape-correct empty response."""
# 24h window + kind=org → cnbc is 5d ago, so empty
body = _call(db_path, window="24h", kind="org")
assert body["leaderboard"] == []
assert body["total"] == 0
assert body["shown"] == 0
assert body["has_more"] is False
assert body["source"] == "contribution_events"
def test_left_join_handles_missing_contributors_row(self, db_path):
"""REGRESSION: handle in events but missing from contributors must default to kind='person'.
Catches the failure mode where a handle classified as cited (auto-create
deferred to Branch 3) accumulates events but has no contributors row yet.
"""
body = _call(db_path)
newhandle_row = next(
(r for r in body["leaderboard"] if r["handle"] == "newhandle"), None
)
assert newhandle_row is not None
assert newhandle_row["kind"] == "person"
assert newhandle_row["ci"] == 0.30
# ─── Public path constant (auth middleware bypass) ───────────────────────────
def test_public_paths_includes_leaderboard():
"""Auth middleware needs LEADERBOARD_PUBLIC_PATHS to skip API key for /api/leaderboard."""
assert "/api/leaderboard" in LEADERBOARD_PUBLIC_PATHS
def test_kind_values_matches_contract():
"""API contract: only these 4 kind values are accepted."""
assert set(KIND_VALUES) == {"person", "agent", "org", "all"}

View file

@ -0,0 +1,167 @@
"""Verify research-attribution backfill is replay-safe against real schema.
Three things to prove:
1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly
(idx_ce_unique_pr partial index handles SQLite NULL-not-equal-NULL).
2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE
is a true no-op does not create a phantom duplicate.
3. The backfill script's specific operation (DELETE then INSERT for same key)
nets zero rows when run twice in sequence.
"""
import sqlite3
import sys
# Schema lifted verbatim from lib/db.py:181-209
SCHEMA = """
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
def setup() -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
return conn
def insert_event(conn, handle, role, pr_number, claim_path=None):
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', ?, 0.30, ?, ?)""",
(handle, role, pr_number, claim_path),
)
return cur.rowcount
def count(conn) -> int:
return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
def test_pr_level_dedup_with_null_claim_path():
"""Two inserts of same (handle, role, pr_number, NULL) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061)
r2 = insert_event(conn, "rio", "author", 4061)
n = count(conn)
assert r1 == 1, f"first insert should write, got rowcount={r1}"
assert r2 == 0, f"second insert should be ignored, got rowcount={r2}"
assert n == 1, f"expected 1 row, got {n}"
print("PASS: pr-level dedup with NULL claim_path")
def test_per_claim_dedup_with_path():
"""Two inserts of same (handle, role, pr_number, path) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 0 and n == 1
print("PASS: per-claim dedup with claim_path")
def test_pr_level_and_per_claim_coexist():
"""A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist
because the partial indexes target different rows."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path=None)
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 1 and n == 2
print("PASS: pr-level and per-claim events coexist on same pr_number")
def test_backfill_replay_is_noop():
"""Simulate the exact backfill operation: INSERT correct event, DELETE wrong event.
Run twice. Expect identical state no phantom rows, no double-deletions."""
conn = setup()
# Initial state: m3taversal has the wrong author event for pr=4061
insert_event(conn, "m3taversal", "author", 4061)
assert count(conn) == 1
def backfill_pr_4061():
# Insert the correct event (rio is the real author)
conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""",
("rio (self-directed)",),
)
# Delete the wrong event
conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
conn.commit()
backfill_pr_4061()
state_after_first = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first
# Replay
backfill_pr_4061()
state_after_second = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == state_after_second, "replay should be idempotent"
assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}"
print("PASS: backfill replay is a true no-op")
def test_replay_against_already_backfilled_pr_does_not_double_delete():
"""If m3taversal event was already deleted, running backfill again must not error
or affect anything else."""
conn = setup()
# Already-correct state: rio has the author event, m3taversal does not
insert_event(conn, "rio (self-directed)", "author", 4061)
insert_event(conn, "leo", "evaluator", 4061) # noise — should not be touched
# Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op
# Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected
cur1 = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""",
)
cur2 = conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}"
assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}"
assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}"
print("PASS: replay against already-backfilled state preserves unrelated events")
if __name__ == "__main__":
test_pr_level_dedup_with_null_claim_path()
test_per_claim_dedup_with_path()
test_pr_level_and_per_claim_coexist()
test_backfill_replay_is_noop()
test_replay_against_already_backfilled_pr_does_not_double_delete()
print("\nAll 5 tests passed against real schema.")