Compare commits


36 commits

Author SHA1 Message Date
ed5f7ef6cc fix(merge): correct audit-ref comment + add sentinel-drift warning
Two nits from Ganymede line-level review of 7741c1e:

1. The comment at lines 562-565 said --force-with-lease but the code uses
   plain --force. The comment now describes the actual behavior: bot-owned per-PR
   audit ref, intentional overwrite on stale refs from prior aborted
   attempts, no concurrent writer to lease against.

2. Sentinel-regex extraction in _merge_domain_queue dispatch had no
   graceful-failure log. If the _merge_no_ff_external success-message
   contract drifts and any of the three regexes (M, audit_ref, external
   PR #) miss, dispatch silently builds a comment with None values and
   writes audit_log JSON with null fields. Added a warning log when any
   regex misses — signal-only, doesn't gate the close path since the
   merge already succeeded.
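A minimal sketch of the signal-only guard, with field names and regex shapes inferred from the sentinel format described in these commits (the real names in lib/merge.py may differ):

```python
import logging
import re

log = logging.getLogger("merge")

# Assumed sentinel fields: "merged --no-ff (external PR #N, M=<sha>, audit_ref=...)"
_SENTINEL_PATTERNS = {
    "external_pr": re.compile(r"external PR #(\d+)"),
    "merge_sha": re.compile(r"M=([0-9a-f]{40})"),
    "audit_ref": re.compile(r"audit_ref=([^\s)]+)"),
}

def parse_sentinel(message: str) -> dict:
    """Extract sentinel fields; warn (don't fail) when any regex misses."""
    fields = {}
    for name, pattern in _SENTINEL_PATTERNS.items():
        m = pattern.search(message)
        fields[name] = m.group(1) if m else None
        if m is None:
            # Signal-only: the merge already succeeded, so log and continue.
            log.warning("sentinel drift: %r missing from merge message", name)
    return fields
```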

Branch: epimetheus/external-merge-flow-bug1
Parent: 7741c1e (Ship Msg 3 architecture review close)
Diff:   +11/-3, single file lib/merge.py

Ganymede: 3-message protocol Msg 3 (nits applied, ball returned).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 16:19:08 +01:00
7741c1e6de fix(merge): synthetic _merged/* ref + function-owned ff-push (Ship Msg 3)
Phase 2 review fix #1 (architectural pushback): replace the force-push of the
contributor's gh-pr-N/* branch with a four-step synthetic-branch flow:

  1. Worktree on local branch _merged-{slug} from origin/main
  2. git merge --no-ff origin/{branch} into the local branch
  3. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
  4. Function ff-pushes merge_sha → origin/main directly
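The steps above, sketched as git invocations. This is illustrative only: the worktree path, the plain --force on the bot-owned audit ref, and the push refspecs are assumptions from the surrounding commits, not the real lib/merge.py code.

```python
# Hypothetical plan builder: returns the git argv sequence for the flow.
def synthetic_merge_plan(branch, slug):
    local = f"_merged-{slug}"
    return [
        # 1. Worktree on a local branch cut from origin/main
        ["git", "worktree", "add", "-b", local, f"/tmp/{local}", "origin/main"],
        # 2. True merge commit preserving the contributor SHA
        ["git", "merge", "--no-ff", f"origin/{branch}"],
        # 3. Publish merge commit on the bot-owned synthetic audit ref
        #    (plain --force: intentional overwrite of stale refs, no lease)
        ["git", "push", "--force", "origin", f"HEAD:refs/heads/_merged/{branch}"],
        # 4. Fast-forward main to the merge commit (no force needed)
        ["git", "push", "origin", "HEAD:refs/heads/main"],
    ]
```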

Contributor's gh-pr-N/* branch on Forgejo is now never touched.
Force-pushing it would have rewritten the tip with a merge commit the
contributor didn't author — confusing bot force-push in Forgejo PR UI.
Mirrors the _clean/* synthetic branch pattern in cherry-pick.

Function now owns the push to main (was dispatch's job for cherry-pick
and reweave). Returns sentinel "merged --no-ff (external PR #N, M=<sha>,
audit_ref=...)" that dispatch detects to skip its ff-push and route
directly to PR-close + mark_merged + audit. Audit detail JSON now
includes merge_commit_sha + audit_ref + github_pr (Ship review #5).

Smoke-tested in scratch repo end-to-end:
  - contributor branch tip unchanged ✓
  - audit ref _merged/gh-pr-90/... carries merge SHA ✓
  - main tip equals merge SHA (ff-push, no force) ✓
  - contributor SHA ancestor of main → GitHub badge fires ✓

Sentinel return parsed via 3 regexes in dispatch (full 40-char SHA in
return string for durability). Branch comment in dispatch explicitly
notes contributor branch is left in place — sync-mirror keeps the
GitHub PR <-> Forgejo PR link observable through it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:32:52 +01:00
992b4ee36f feat(merge): _merge_no_ff_external for gh-pr-* branches (Phase 2)
External GitHub fork PRs need their contributor commit SHA in main's history
for GitHub's "merged" badge to fire. Cherry-pick rewrites the SHA, breaking
that detection. New _merge_no_ff_external function preserves the SHA via a
true merge commit.

Mechanics (mirrors _cherry_pick_onto_main shape):
1. Fetch origin/main + origin/{branch}
2. Detached worktree at origin/main, git merge --no-ff origin/{branch}
   with verbose message: "Merge external GitHub PR #{N}: {branch_slug}"
3. Force-push merge commit M to origin/{branch}, replacing the branch tip
4. Dispatch's existing ff-push origin/{branch} → main propagates M to main

M has parents [main_sha, contributor_sha]. M is a fast-forward descendant
of main_sha (first-parent chain), so the ff-push to main is valid without
--force. Contributor SHA reachable from main → GitHub recognizes merged.

Conflict handling: same auto-resolve as cherry-pick — entity-only conflicts
take main's version (--ours = current worktree HEAD = main), other conflicts
abort with detail.

Backout: config.EXTERNAL_PR_NO_FF_MERGE = True (default). Set False to fall
back to cherry-pick if no-ff destabilizes throughput one week pre-Accelerate.

Branch dispatch in _merge_domain_queue:
- reweave/* → _merge_reweave_pr (existing)
- gh-pr-N/* AND config.EXTERNAL_PR_NO_FF_MERGE → _merge_no_ff_external (new)
- everything else → _cherry_pick_onto_main (existing default)
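The dispatch above as a routing sketch (gate and target-function names from the commit; the branch-matching regex shape is an assumption):

```python
import re

# Hypothetical stand-in for the _merge_domain_queue branch dispatch.
_GH_PR_BRANCH = re.compile(r"^gh-pr-[0-9]+/")

def route_branch(branch, external_pr_no_ff_merge):
    if branch.startswith("reweave/"):
        return "_merge_reweave_pr"            # existing
    if _GH_PR_BRANCH.match(branch) and external_pr_no_ff_merge:
        return "_merge_no_ff_external"        # new; gated on config backout flag
    return "_cherry_pick_onto_main"           # existing default
```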

Verified end-to-end in scratch repo:
- merge commit M has [main_sha, contributor_sha] as parents
- contributor SHA is ancestor of M
- after ff-push, contributor SHA is in main's history (GitHub badge fires)
- regex parses 8 cases correctly (real fork PR + edge cases reject cleanly)

Architecture per Ship Msg 3 / doc v3 (537cfd5 on epimetheus/external-merge-flow-design).
Phase 1 (sync-mirror self-heal) deployed yesterday. Phase 3 (FwazB PR #90 cleanup)
queued behind this deploy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:18:37 +01:00
de204db539 fix(sync-mirror): tighten gh-pr-* regex + document SQL-integer-safety
Ganymede review nit on commit 1eb259d:

- Regex changed from [0-9]* (zero-or-more) to [0-9][0-9]* (one-or-more,
  portable BRE form of [0-9]+ that works on both GNU and BSD sed).
- Empty/non-numeric branches now fail at parse, not just at the empty-guard
  below — SQL-integer safety is load-bearing on the regex alone.
- Comment above the UPDATE notes the integer-validation invariants
  (INTEGER `number` column + regex-validated gh_pr_num) since bash sqlite3
  has no parametric binding.

Smoke tested: gh-pr-/foo and gh-pr-abc/foo no longer parse to a non-empty number.
gh-pr-90/main, gh-pr-4066/contrib/x, gh-pr-1/x all parse correctly.
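A hypothetical Python mirror of the sed parse (the real code is bash sed using the portable BRE `[0-9][0-9]*`; the gh-pr-<N>/<rest> branch shape is taken from these commits):

```python
import re

# One-or-more digits required, so empty/non-numeric PR segments fail at parse.
_GH_PR_BRANCH = re.compile(r"^gh-pr-([0-9][0-9]*)/")

def parse_gh_pr_num(branch):
    """Return the GitHub PR number encoded in a gh-pr-N/* branch, else None."""
    m = _GH_PR_BRANCH.match(branch)
    return int(m.group(1)) if m else None
```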

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 13:07:50 +01:00
1eb259de8a fix(sync-mirror): self-heal sweep for orphaned gh-pr-* github_pr links
Step 0 (new): runs once per cron tick before per-repo work. Selects PR rows
where branch matches gh-pr-% but github_pr IS NULL, parses the PR number
from the branch name, and updates github_pr + source_channel='github'.

Recovers from races and transient failures in the existing Step 4.5 link
UPDATE, which previously had no retry path. The sweep IS the backfill: the
same SELECT/UPDATE
heals historical orphans (FwazB PR 4066 picked up on first cron tick) AND
future races on subsequent ticks. No separate one-shot script needed.
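A sketch of the sweep under assumed table/column names (prs.branch, prs.github_pr, prs.source_channel, per the commit text); the real Step 0 is shell plus sqlite3, not Python:

```python
import re
import sqlite3

_GH_PR = re.compile(r"^gh-pr-([0-9]+)/")

def heal_orphaned_links(conn):
    """Link gh-pr-* rows whose github_pr is still NULL. Idempotent:
    the SELECT is empty once everything is healed."""
    healed = 0
    rows = conn.execute(
        "SELECT id, branch FROM prs "
        "WHERE branch LIKE 'gh-pr-%' AND github_pr IS NULL"
    ).fetchall()
    for row_id, branch in rows:
        m = _GH_PR.match(branch)
        if not m:
            continue  # non-numeric branch segment: leave it alone
        conn.execute(
            "UPDATE prs SET github_pr = ?, source_channel = 'github' "
            "WHERE id = ?",
            (int(m.group(1)), row_id),
        )
        healed += 1  # the real sweep logs one line per actually-healed row
    conn.commit()
    return healed
```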

Properties:
- Idempotent: SELECT empty when clean, zero work
- No API calls: branch name encodes the GitHub PR number deterministically
- Bounded log volume: one line per actually-healed row
- Runs before any sync_repo work, ahead of branch-mirror loop and the
  auto-create-PR block in Step 4 — same-cycle convergence on fresh races

Closes the Bug #2 path that left FwazB's PR 4066 with github_pr=NULL,
preventing on_merged() from posting comment + closing the GitHub PR.

Verified end-to-end on live DB snapshot:
- before: 4066 had github_pr=NULL
- after sweep: 4066 has github_pr=90, source_channel='github'
- second run: zero output (idempotent)

Phase 1 of docs/external-contributor-merge-flow.md (v2, sweep-only).
Ship architecturally approved Msg 2/2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 13:02:37 +01:00
33f6ca9e3f fix(mirror): setup script pushes main+tags only (consistency with sync-mirror)
Initial setup-infra-mirror.sh did `git push origin --all`, which contradicted
the main_only mode protection landed in b9c4947 — agent review branches
(epimetheus/*, ganymede/*) ended up publicly visible on the new GitHub
teleo-infrastructure mirror until I deleted them.

Initial push now mirrors the recurring sync's main_only path: refs/heads/main
+ tags only. Re-running the setup script is now idempotent at branch level —
won't redo the agent-branch leak.

Cleanup applied to live GitHub teleo-infrastructure: 18 stale agent review
branches deleted via single batched push (epimetheus/* x14, ganymede/* x3,
ship/metadao-scraper). Only main remains. Codex bidirectional mirror unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 23:09:25 +01:00
b9c4947637 fix(mirror): restrict main_only mode to main+tags (Ganymede review)
Finding #1 (recommendation, applied): infra-mode now pushes only main + tags
to GitHub. Agent review branches (epimetheus/*, ganymede/*) stay Forgejo-only.
Public GitHub history reflects merged work, not pre-review WIP with internal
agent context.

Bidirectional mode unchanged — codex still mirrors all branches so external
contributors can fork from any branch.

Nit #4: the setup script's m3taversal username now has a comment explaining it's
a placeholder for fine-grained PAT auth, mirroring the existing teleo-codex remote.

Two pre-existing nits filed for follow-up branch:
- hardcoded `living-ip:` in GH_PR_NUM head filter (line 273)
- spurious CRITICAL log on GH→forgejo→GH cycles (re-fetch forgejo after Step 2.5)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 22:54:18 +01:00
bf647b7abb feat(mirror): refactor sync-mirror.sh for multi-repo, add infra setup script
Wraps the per-repo body in sync_repo() and loops over MIRROR_REPOS at the
bottom. teleo-codex stays bidirectional (full PR roundtrip + pipeline.db
linking). teleo-infrastructure runs main_only: branch+tag sync Forgejo→
GitHub, ff-only GitHub→Forgejo on main, divergence alerting per-repo.
Steps 2.1 (fork PR refs) and 4 (Forgejo PR auto-create + DB link) gated
on MODE=bidirectional.

Setup script (deploy/setup-infra-mirror.sh) initializes the bare repo at
/opt/teleo-eval/mirror/teleo-infrastructure.git, configures remotes,
performs the initial Forgejo→GitHub push. Idempotent. Pre-flight checks that
both the GitHub repo (must be created manually first — fine-grained PAT can't
create repos in the org) and the Forgejo repo are accessible.

Per-repo divergence state file (.divergence-count.<repo>) so each repo
has independent counter + alert state. Also pulls in the source_channel
update from Apr 6 that lived only on VPS (line 215 added 'github').

Not deployed yet — pending Ganymede review and GitHub repo creation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 22:22:33 +01:00
1351db70a9 fix(tests): apply Ganymede review nits + add m3taversal reset script
3 nits from review of d60b6f8 + Q4 ask:

1. test_window_24h_only_today: replace always-true assertion with
   concrete `assert handles == ["carol"]`. Push alice's most-recent
   event from -1 days to -2 days to eliminate fixture-vs-query
   microsecond drift on the 24h boundary.
2. _call helper: asyncio.get_event_loop().run_until_complete →
   asyncio.run (deprecation in 3.12, raises in some 3.14 contexts).
3. test_invalid_limit_falls_to_default: dead first call removed,
   misleading "7 entries" comment now matches assertion.

Q4: scripts/reset-m3taversal-sourcer.py captures the surgical
UPDATE we ran on VPS as a reviewable artifact. Idempotent (no-op
on already-reset rows), audit_log entry per run. Ganymede's point:
DB mutations should leave a code paper trail, not just an audit
row whose origin lives only in the executor's memory.

30/30 tests pass on VPS hermes venv (aiohttp 3.13.5, py 3.11.15).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:35:18 +01:00
d60b6f8bf2 test(leaderboard): cover all four slicings + AND-prefix regression
Adds tests/test_leaderboard.py — 30 cases against
diagnostics/leaderboard_routes.py. Two reasons:

(1) Zero coverage on an endpoint Argus + Oberon are about to consume
    for the May 5 hackathon UI. Two bugs slipped through this morning
    (404 wiring missing in app.py; AND-prefix SQL syntax error on
    rolling-window). Tests prevent regression.

(2) Tests serve as living documentation for Oberon's frontend
    integration — each test names a contract guarantee
    (test_left_join_handles_missing_contributors_row,
    test_composed_window_kind_domain, test_role_breakdown_present).

Coverage:
  - _parse_window unit tests (10): all_time, Nd, Nh, caps, garbage,
    case-normalization, and explicit no-AND-prefix assertion
  - handle_leaderboard integration (18): every kind value, every
    window family, domain filter, composed filters, limit + has_more,
    invalid-input fallback, role breakdown shape, empty-window shape,
    LEFT JOIN COALESCE for handles missing from contributors
  - 2 contract assertions: LEADERBOARD_PUBLIC_PATHS membership +
    KIND_VALUES set

Run: 30/30 pass on VPS hermes venv (aiohttp 3.13.5, pytest 9.0.2).
Skips clean locally without aiohttp via pytest.importorskip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:46:47 +01:00
cd5aac5cc6 fix(activity-feed): remove [:120] slug truncation
Claim slugs were being cut at 120 chars in _extract_claim_slugs, causing
Timeline event clicks to 404 when the on-disk filename exceeded that
length (frontend builds /api/claims/<slug> from the truncated value).

This fix landed Apr 26 but regressed when the file was redeployed —
committing the unmangled version to repo so deploy.sh re-shipping
doesn't reintroduce the cap.

Verified live: max slug now 265 chars, 16 of 30 over the old 120 cap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 15:27:31 +01:00
7c6417d6be test(diagnostics): activity_endpoint classify_pr_operation suite
Move tests from /tmp into the proper test suite. 22 cases covering:

- Leo gotcha: extract/* + commit_type=enrich/challenge classifies by
  commit_type, not branch prefix (same pattern as the contributor-role
  wiring fix)
- Reweave priority: branch.startswith('reweave/') wins over
  _MAINTENANCE_COMMIT_TYPES — nightly reweave PRs classify as enrich,
  not infra. Locks in the bifurcation against future priority refactors
- Full NON_MERGED_STATUS_TO_OPERATION coverage: open, approved, closed,
  conflict, validating, reviewing, merging, zombie
- Knowledge-producing commit_types (research, entity) → new
- Maintenance commit_types (fix, pipeline) → infra
- Defensive: null inputs, unknown status

aiohttp is imported at module load, so the file uses pytest.importorskip: it
runs cleanly in any environment with aiohttp installed and skips gracefully
otherwise. sys.path is injected for diagnostics/ since it isn't packaged.

Reviewed-by: Ganymede

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:39:44 +01:00
42d35d4e15 fix(diagnostics): wire /api/leaderboard into app.py + fix rolling-window SQL
de7e5ec landed leaderboard_routes.py + the route file's register fn but
the import + register_leaderboard_routes(app) call + auth-middleware
allowlist were never added to app.py — endpoint returned 404 in production.

Three minimal edits to app.py mirror the existing register_*_routes pattern
(import at line 28, allowlist OR-clause at line 512, register call at 2365).

Plus a SQL bug in _parse_window: rolling-window clauses prefixed "AND "
but the WHERE composition uses " AND ".join(...), producing
"WHERE 1=1 AND AND ce.timestamp..." → sqlite3.OperationalError on every
window=Nd / window=Nh request. Stripped the prefix and added a comment so
the asymmetry doesn't bite again.
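A minimal reconstruction of the composition bug (clause text illustrative, not the exact leaderboard SQL):

```python
# Clauses are joined with " AND ", so individual clauses must NOT carry
# their own "AND " prefix — otherwise "WHERE 1=1 AND AND ..." is emitted.
def build_where(clauses):
    return "WHERE " + " AND ".join(["1=1"] + clauses)

# Buggy: the rolling-window clause arrived pre-prefixed with "AND "
buggy = build_where(["AND ce.timestamp >= :cutoff"])
# Fixed: prefix stripped at the source
fixed = build_where(["ce.timestamp >= :cutoff"])
```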

Verified on VPS:
  GET /api/leaderboard?window=all_time&kind=person → 200, 11 rows
  GET /api/leaderboard?window=7d&kind=person → 200, 2 rows
  GET /api/leaderboard?window=30d&kind=person → 200, 9 rows
  GET /api/leaderboard?domain=internet-finance → 200, 3 rows
  GET /api/leaderboard?kind=agent → 200, leo/rio/clay/astra/vida

Unblocks: Argus dashboard cutover, Oberon column reorder, Leo's CI
taxonomy broadcast.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:30:26 +01:00
de7e5ec709 feat(diagnostics): /api/leaderboard reads contribution_events directly
New endpoint replaces the legacy /api/contributors *_count read path with
event-sourced reads from the Phase A contribution_events ledger.

- Params: window (all_time | Nd | Nh), kind (person | agent | org | all),
  domain (filter), limit (default 100, max 500)
- Returns per-handle CI, full role breakdown (author/challenger/synthesizer/
  originator/evaluator), events_count, pr_count, first/last contribution
- ORDER BY ci DESC, last_contribution DESC — recent contributors break ties
- Read-only sqlite URI; total/has_more computed for paginated UIs

Wiring (import + register + _PUBLIC_PATHS entry) currently applied to live
app.py on VPS only — repo app.py has drift from Ship's uncommitted /api/search
POST contract. Next deploy.sh round-trip needs both to land together.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 13:16:41 +01:00
369f6c96da fix(attribution): credit research-session sources to agents, not m3taversal (#7)
Forward fix: research-session.sh writes intake_tier: research-task (no proposed_by — extract.py infers agent from branch).

Backfill: 304 PRs reattributed across 30 days (rio 74, clay 70, astra 53, vida 48, theseus 30, leo 29). Already applied to production.

Format reconciliation: normalize_handle strips (self-directed) suffix so both halves canonicalize to the same agent handle.

5 idempotency tests passing. Production replay self-extinguishes (delta 3839→3839).
2026-04-27 11:59:54 +00:00
6aff03ff56 fix(attribution): unify research-session format on "(self-directed)" suffix
Resolves the format inconsistency between the forward fix and the 304-row
backfill. Both halves now produce prs.submitted_by = "rio (self-directed)":

- research-session.sh: drop proposed_by from the frontmatter template.
  extract.py path 1 (proposed_by-driven) no longer fires; path 2 fires
  instead and constructs f"{agent} (self-directed)" — matches backfill.

- attribution.py: normalize_handle now strips "(self-directed)" suffix
  immediately after lowercase+@-strip, before alias lookup. Closes the
  phantom-person-event class on any future replay through
  record_contributor_attribution. Round-trips through alias rules keyed
  on bare agent names.

Test (5 cases) still passes; suffix-strip behavior verified against
hostile inputs (whitespace, casing, mid-string occurrences must NOT
match — only trailing pattern).
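A sketch of the trailing-suffix strip, assuming the lowercase + @-strip order described above (the real normalize_handle also applies alias lookup afterwards):

```python
import re

# Anchored at end-of-string: only a trailing "(self-directed)" is removed,
# so mid-string occurrences survive untouched.
_SELF_DIRECTED = re.compile(r"\s*\(self-directed\)$")

def normalize_handle(handle):
    h = handle.strip().lower().lstrip("@")
    return _SELF_DIRECTED.sub("", h)
```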

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:53:52 +01:00
319e03e2c6 test(attribution): prove research-backfill replay is idempotent
Five tests against the real contribution_events schema (lib/db.py:181-209):
- pr-level dedup with NULL claim_path via idx_ce_unique_pr partial index
- per-claim dedup with non-NULL claim_path via idx_ce_unique_claim partial index
- pr-level and per-claim events coexist on the same pr_number
- backfill (INSERT correct + DELETE wrong) is a true no-op on replay
- replay against already-backfilled state preserves unrelated events

Schema edge case identified (case 2), with the partial-index split solution
already in place: two partial UNIQUE indexes target disjoint row sets
(claim_path IS NULL vs IS NOT NULL), bypassing SQLite's NULL-not-equal-NULL
UNIQUE quirk.
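The partial-index pattern, reconstructed with the index names from the commit and a simplified column set (the real schema lives in lib/db.py:181-209):

```python
import sqlite3

# UNIQUE over disjoint row sets: ordinary UNIQUE treats NULL != NULL, so
# pr-level rows (claim_path IS NULL) would never collide without the split.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE contribution_events (
    pr_number INTEGER, handle TEXT, role TEXT, claim_path TEXT
);
CREATE UNIQUE INDEX idx_ce_unique_pr
    ON contribution_events(pr_number, handle, role)
    WHERE claim_path IS NULL;
CREATE UNIQUE INDEX idx_ce_unique_claim
    ON contribution_events(pr_number, handle, role, claim_path)
    WHERE claim_path IS NOT NULL;
""")
```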

Production replay verified: re-running backfill --apply against the live DB
returns "misattributed PRs found: 0" because the first-run UPDATE flipped the
WHERE predicate. Total contribution_events count: 3839 → 3839.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:50:17 +01:00
2d332c66d4 fix(attribution): credit research-session sources to agents, not m3taversal
Two-part fix for a bug where every claim extracted from agent overnight
research sessions was being credited to m3taversal in contribution_events
(visible in the activity feed as "@m3taversal" on agent-derived claims).

Forward fix (research/research-session.sh):
The frontmatter template the agent prompt instructs Claude to use now
includes `proposed_by: ${AGENT}` and `intake_tier: research-task`. With
those fields present, extract.py path 1 (line 687) takes precedence and
sets prs.submitted_by to the agent handle, which then propagates into
contribution_events as a kind='agent' author event for the agent.

Without the fields, extract.py fell through to the default branch on
line 695 and set submitted_by='@m3taversal'.

Backfill (scripts/backfill-research-session-attribution.py):
Identifies research-session-derived PRs by finding teleo-codex commits
matching `^<agent>: research session YYYY-MM-DD —`, listing the
inbox/queue/*.md files added in each commit's diff, and matching those
filename basenames against prs.source_path. Only PRs currently
submitted_by='@m3taversal' AND merged within the configurable window
are touched. Default --dry-run; --apply to commit.

For each match the script:
  1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
  2. INSERT OR IGNORE the agent author event (kind='agent', weight=0.30)
     with the original PR's domain, channel, merged_at preserved
  3. DELETE the misattributed m3taversal author event

Applied 30-day backfill on VPS:
  - 304 PRs re-attributed (rio 74, clay 70, astra 53, vida 48,
    theseus 30, leo 29)
  - 297 m3taversal author events deleted, 304 agent author events
    inserted (delta of 7 = pre-v24 PRs that never had m3ta events
    in the first place; we still create the new agent event)
  - m3taversal author count: 1368 → 1071 (−22%)
  - Pre-backfill DB snapshot: pipeline.db.bak-pre-research-attribution

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 12:38:53 +01:00
dea1b02aa6 fix(attribution): narrow exception + document gate asymmetry (Ganymede review)
Two follow-up fixes from Ganymede's review of d0fb4c9:

1. is_publisher_handle: narrow `except Exception` to sqlite3.OperationalError.
   Pre-v26 DB fallback only needs to catch the "table doesn't exist" case;
   broader exceptions (programming errors, locks, corruption) should propagate.

2. upsert_contributor gate: add comment documenting the alias-resolution
   asymmetry between insert_contribution_event (alias-resolved via
   normalize_handle) and upsert_contributor (bare lower+lstrip-@). Today this
   is fine because the v26 classifier produced one publisher row per canonical
   handle. Branch 3 will normalize alias→canonical at writer entry points,
   tightening this gate transparently.

Unit tests for the gates (positive + negative + alias resolution) deferred to
Branch 3 alongside the auto-create flow tests.

Smoke-tested:
  - pre-v26 fallback (no publishers table) → None (correct)
  - case-insensitive match (CNBC → id=1) → correct
  - @ prefix strip (@cnbc → id=1) → correct
  - non-publisher handle (alexastrum) → None (correct)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:25:24 +01:00
d0fb4c96e3 fix(attribution): gate writer on publishers table (regression prevention)
Schema v26 (commit 3fe524d) split orgs/citations from contributors into
the publishers table. Without a writer-side gate, every merged PR with
`sourcer: cnbc` (or similar) re-creates CNBC as a contributor and
undoes the v26 classifier cleanup. Once normal pipeline traffic resumes,
the contributors table re-pollutes within hours.

Fix: belt-and-suspenders gate at both writer surfaces.

1. `lib/attribution.py::is_publisher_handle(handle, conn)` — returns
   publisher.id if handle exists in publishers.name, else None. Falls
   back gracefully on pre-v26 DBs (no publishers table → returns None →
   writer behaves like before, no regression).

2. `lib/contributor.py::insert_contribution_event` — checks
   is_publisher_handle on canonical handle before INSERT. If it's a
   publisher, debug-log + return False. Prevents originator events for
   CNBC/SpaceNews/etc.

3. `lib/contributor.py::upsert_contributor` — same gate at top. Prevents
   the contributors table from re-acquiring publisher rows.
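A sketch of gate (1), with the publishers schema simplified and the sqlite3.OperationalError narrowing from the follow-up review (dea1b02) already applied:

```python
import sqlite3

def is_publisher_handle(handle, conn):
    """Return the publisher id if handle names a publisher, else None.
    Pre-v26 DBs have no publishers table: the missing-table error is
    swallowed so the writer behaves exactly as before."""
    name = handle.strip().lstrip("@").lower()
    try:
        row = conn.execute(
            "SELECT id FROM publishers WHERE lower(name) = ?", (name,)
        ).fetchone()
    except sqlite3.OperationalError:
        return None  # no publishers table: pre-v26 fallback, no regression
    return row[0] if row else None
```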

Verified end-to-end against live VPS DB snapshot:
  - CNBC originator event: blocked (insert returns False)
  - CNBC contributors row: blocked (no row created)
  - alexastrum, thesensatore, newhandle_xyz: pass through unchanged
  - is_publisher_handle handles case-insensitive lookup correctly
    (CNBC and cnbc both match publisher_id=3)

Pre-deploy event count was 3705. Post-classifier cleanup: 3623 (82 org
events purged). Going forward, no new org events accumulate.

Branch 2 of the schema-v26 rollout. Branch 3 (auto-create at tier='cited',
extract.py sources.publisher_id wiring) is separate scope and not required
for regression prevention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:21:10 +01:00
926a397839 fix(activity): re-apply source classifier + add date-prefix slug fallback
Regression: aeae712's source/create distinction was lost — VPS reverted to
pre-aeae712 behavior where every extract/* knowledge PR returned type=create
regardless of whether a claim was written. Source archives surfaced as
"New claim" chips with date-prefix slugs that 404 on click.

Root cause: aeae712 was deployed via local file copy and never pushed to
origin; a subsequent rsync from origin/main overwrote it with the older
classifier. This branch ships from origin so deploy.sh's repo-first gate
makes recurrence impossible.

- Restore aeae712: extract/* + empty description -> source, with
  empty claim_slug + source_slug field, ci_earned 0.15
- Add Leo's regex fallback: candidate_slug matching
  ^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$ -> source regardless of branch
  /commit_type/description state. Catches edge cases where description
  leaks but is just a source title (slugified into the inbox filename
  pattern), not a claim insight.
- Add 'challenge' to _FEED_COMMIT_TYPES (latent bug — challenge PRs
  would be filtered out before classification because the filter
  list omitted them; memory says 0 challenges exist so it never
  triggered, but schema support belongs in the filter)
- _build_events: compute candidate slug before classify so the regex
  fallback has a slug to inspect
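Leo's fallback regex, lifted verbatim from the bullet above into a small checker (the function name is illustrative):

```python
import re

# Inbox filenames are date-prefixed and end in a 4-hex suffix, so a
# candidate slug of this shape is a source archive, not a claim, regardless
# of branch / commit_type / description state.
_SOURCE_SLUG = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")

def looks_like_source_slug(slug):
    return bool(_SOURCE_SLUG.match(slug))
```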

Verified locally on Leo's example PRs (#4014, #4016) — both classify
as source. VPS smoke pending deploy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 13:47:00 +01:00
3fe524dd14 fix(classify): Ganymede review fixes — alias cleanup + counter accuracy + handle alignment
1. WARNING — orphan contributor_aliases after publisher/garbage delete:
   Added alias cleanup to the transaction (gated on --delete-events, same
   audit rationale as events). Both garbage and publisher deletion loops
   now DELETE matching contributor_aliases rows. Dry-run adds an orphan
   count diagnostic so the --delete-events decision is informed.

2. NIT — inserted_publishers counter over-reports on replay:
   INSERT OR IGNORE silently skips name collisions, but the counter
   incremented unconditionally. Now uses cur.rowcount so a second apply
   reports 0 inserts instead of falsely claiming 100. moved_to_publisher
   set remains unconditional — publisher rows already present still need
   the matching contributors row deleted.

3. NIT — handle-length gate diverged from writer path:
   Widened from {0,19} (20 chars) to {0,38} (39 chars) to match GitHub's
   handle limit and contributor.py::_HANDLE_RE. Prevents future long-handle
   real contributors from falling through to review_needed and blocking
   --apply. Current data has 0 review_needed either way.
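A minimal reproduction of the counter fix in nit 2 (table shape illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE publishers (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")

def insert_publisher(name):
    """INSERT OR IGNORE silently skips name collisions, so the insert
    counter must read cur.rowcount rather than increment unconditionally."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO publishers (name) VALUES (?)", (name,)
    )
    return cur.rowcount  # 1 on a real insert, 0 when the name already exists
```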

Bonus (Q5): Added audit_log entry inside the transaction. One row in
audit_log.stage='schema_v26', event='classify_contributors' with counter
detail JSON on every --apply run. Cheap audit trail for the destructive op.

Verified end-to-end on VPS DB snapshot:
- First apply: 100/9/9/100/0 (matches pre-fix)
- Second apply: 0/9/0/0/0 (counter fix working)
- With injected aliases + --delete-events: 2 aliases deleted, 1 pre-existing
  orphan correctly left alone (outside script scope), audit_log entry
  written with accurate counters.

Ganymede msg-3. Protocol closed.
2026-04-24 20:47:21 +01:00
45b2f6de20 feat(schema): v26 — publishers + contributor_identities + sources provenance
Separates three concerns currently conflated in contributors table:
  contributors — people + agents we credit (kind in 'person','agent')
  publishers   — news orgs / academic venues / platforms (not credited)
  sources      — gains publisher_id + content_type + original_author columns

Rationale (Cory directive Apr 24): livingip.xyz leaderboard was showing CNBC,
SpaceNews, TechCrunch etc. at the top because the attribution pipeline credited
news org names as if they were contributors. The mechanism-level fix is a
schema split — orgs live in publishers, individuals in contributors, each
table has one semantics.

Migration v26:
  - CREATE TABLE publishers (id PK, name UNIQUE, kind CHECK IN
    news|academic|social_platform|podcast|self|internal|legal|government|
    research_org|commercial|other, url_pattern, created_at)
  - CREATE TABLE contributor_identities (contributor_handle, platform CHECK IN
    x|telegram|github|email|web|internal, platform_handle, verified, created_at)
    Composite PK on (platform, platform_handle) + index on contributor_handle.
    Enables one contributor to unify X + TG + GitHub handles.
  - ALTER TABLE sources ADD COLUMN publisher_id REFERENCES publishers(id)
  - ALTER TABLE sources ADD COLUMN content_type
    (article|paper|tweet|conversation|self_authored|webpage|podcast)
  - ALTER TABLE sources ADD COLUMN original_author TEXT
    (free-text fallback, e.g., "Kim et al." — not credit-bearing)
  - ALTER TABLE sources ADD COLUMN original_author_handle REFERENCES contributors(handle)
    (set only when the author is in our contributor network)
  - ALTER wrapped in try/except on "duplicate column" for replay safety
  - Both SCHEMA_SQL (fresh installs) + migration block (upgrades) updated
  - SCHEMA_VERSION bumped 25 -> 26

Migration is non-breaking. No data moves yet. Existing publishers-polluting-
contributors row state is preserved until the classifier runs. Writer routing
to these tables lands in a separate branch (Phase B writer changes).
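The replay-safety pattern for the ALTERs, sketched against a stand-in table (column names from the migration; the helper name is hypothetical):

```python
import sqlite3

def add_column(conn, table, coldef):
    """SQLite's ALTER TABLE ... ADD COLUMN has no IF NOT EXISTS, so a
    replayed migration raises 'duplicate column name'; catching exactly
    that makes the ALTER idempotent."""
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {coldef}")
    except sqlite3.OperationalError as e:
        if "duplicate column" not in str(e).lower():
            raise  # anything else (typo, locked DB) should still propagate
```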

Classifier (scripts/classify-contributors.py):
  Analyzes existing contributors rows, buckets into:
    keep_agent   — 9 Pentagon agents
    keep_person  — 21 real humans + reachable pseudonymous X/TG handles
    publisher    — 100 news orgs, academic venues, formal-citation names,
                   brand/platform names
    garbage      — 9 parse artifacts (containing /, parens, 3+ hyphens)
    review_needed — 0 (fully covered by current allowlists)

  Hand-curated allowlists for news/academic/social/internal publisher kinds.
  Garbage detection via regex on special chars and length > 50.
  Named pseudonyms without @ prefix (karpathy, simonw, swyx, metaproph3t,
  sjdedic, ceterispar1bus, etc.) classified as keep_person — they're real
  X/TG contributors missing an @ prefix because extraction frontmatter
  didn't normalize. Cory's auto-create rule catches these on first reference.

  Formal-citation names (Firstname-Lastname form — Clayton Christensen, Hayek,
  Ostrom, Friston, Bostrom, Bak, etc.) classified as academic publishers —
  these are cited, not reachable via @ handle. Get promoted to contributors
  if/when they sign up with an @ handle.

  Apply path is transactional (BEGIN / COMMIT / ROLLBACK on error). Publisher
  insert happens before contributor delete, and contributor delete is gated
  on successful insert so we never lose a row by moving it to a failed
  publisher insert.
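The insert-before-delete ordering under an explicit transaction, sketched with the schema trimmed to the relevant columns (function name illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transaction control
conn.execute("CREATE TABLE contributors (handle TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE publishers (handle TEXT PRIMARY KEY, kind TEXT)")
conn.execute("INSERT INTO contributors VALUES ('cnbc')")

def move_to_publishers(conn, handle, kind):
    try:
        conn.execute("BEGIN")
        cur = conn.execute(
            "INSERT INTO publishers (handle, kind) VALUES (?, ?)", (handle, kind))
        if cur.rowcount != 1:
            raise RuntimeError(f"publisher insert failed for {handle}")
        # Delete only runs after the insert succeeded; a failure above
        # rolls back and the contributor row survives.
        conn.execute("DELETE FROM contributors WHERE handle = ?", (handle,))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise

move_to_publishers(conn, "cnbc", "news")
left = conn.execute("SELECT COUNT(*) FROM contributors").fetchone()[0]
moved = conn.execute("SELECT COUNT(*) FROM publishers").fetchone()[0]
```

isolation_level=None puts sqlite3 in autocommit mode so the explicit BEGIN/COMMIT/ROLLBACK actually control the transaction, rather than fighting the driver's implicit one.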

  Flags:
    --delete-events : with --apply, also DELETE contribution_events rows for
                      moved handles (default: keep events for audit trail)
    --show <handle> : inspect a single row's classification

Smoke-tested end-to-end via local copy of VPS DB:
  Before: 139 contributors total (polluted with orgs)
  After:  30 contributors (9 agent + 21 person), 100 publishers, 9 deleted
  contribution_events: 3,705 preserved
  contributors <-> publishers overlap: 0

Named contributors verified present after --apply:
  alexastrum (claims=6)  thesensatore (5)  cameron-s1 (1)  m3taversal (1011)

Pentagon agent 'pipeline' (claims_merged=771) intentionally retained — it's
the process name from old extract.py fallback path, not a real contributor.
Classified as agent (kind='agent') so doesn't appear in person leaderboard.

Deploy sequence after Ganymede review:
  1. Branch ff-merge to main
  2. scp lib/db.py + scripts/classify-contributors.py to VPS
  3. Pipeline already at v26 (migration ran during earlier v26 restart)
  4. Run dry-run: python3 ops/classify-contributors.py
  5. Apply: python3 ops/classify-contributors.py --apply
  6. Verify: livingip.xyz leaderboard stops showing CNBC/SpaceNews
  7. Argus /api/contributors unaffected (reads contributors directly, now clean)

Follow-up branch (not in this commit):
  - Writer routing in lib/contributor.py + extract.py:
    org handles -> publishers table + sources.publisher_id
    person handles with @ prefix -> auto-create contributor, tier='cited'
    formal-citation names -> sources.original_author (free text)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 20:47:21 +01:00
f0f9388c1f feat(diagnostics): add POST /api/search for chat API contract
Wire the search endpoint to accept POST bodies matching the embedded
chat contract (query/limit/min_score/domain/confidence/exclude →
slug/path/title/domain/confidence/score/body_excerpt). GET path retained
for legacy callers and adds a min_score override for hackathon debug.

- _qdrant_hits_to_results() shapes raw hits into chat response format
- handle_api_search() dispatches POST vs GET
- /api/search added to _PUBLIC_PATHS (chat is unauthenticated)
- POST route registered alongside existing GET

Resolves VPS↔repo drift flagged by Argus before next deploy.sh run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 17:58:30 +01:00
0f2b153c92 fix(backfill): Ganymede review — fix tautological guard + origin='human'
Addresses two findings in commit 762fd42 review:

1. BUG: guard query was tautological. `SELECT MAX(number) FROM prs WHERE
   number < 900000` filters out exactly what the `>= 900000` check tests.
   Replaced with a direct check for unexpected rows in the synthetic range
   (excluding our known 900068/900088).

2. WARNING: origin defaults to 'pipeline' via schema default. lib/merge.py
   convention is origin='human' for external contributors. Synthetic rows
   now set origin='human', priority='high' — matches discover_external_prs
   for real GitHub PRs. Prevents Phase B origin-based filtering from
   misclassifying Alex/Cameron as machine-authored.
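The corrected guard from finding #1 can be sketched as a direct probe of the synthetic range (helper name illustrative):

```python
import sqlite3

KNOWN_SYNTHETIC = (900068, 900088)

def unexpected_synthetic_rows(conn):
    # Direct check: anything in the synthetic range that isn't ours is a problem.
    # (The buggy version selected MAX(number) WHERE number < 900000 and then
    # tested it against >= 900000, a condition that can never be true.)
    marks = ",".join("?" for _ in KNOWN_SYNTHETIC)
    return [r[0] for r in conn.execute(
        f"SELECT number FROM prs WHERE number >= 900000 AND number NOT IN ({marks})",
        KNOWN_SYNTHETIC)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO prs VALUES (?)", [(3941,), (900068,), (900088,)])
clean = unexpected_synthetic_rows(conn)
conn.execute("INSERT INTO prs VALUES (900123)")
dirty = unexpected_synthetic_rows(conn)
```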

Also flagged in review: credit projection was optimistic. Author events are
PR-level (not per-claim), so Alex gets 1×0.30 author credit, not 6. Same
for Cameron. Per-claim originator credit goes to the 7 frontmatter sourcers
where applicable. Not a code change — expectation reset for Cory.
2026-04-24 16:49:12 +01:00
762fd4233e feat(backfill): synthetic PR rows for pre-mirror GitHub PRs #68 (Alex) + #88 (Cameron)
Two historical GitHub PRs merged before our sync-mirror.sh tracked github_pr:
  - GitHub PR #68: alexastrum, 6 claims, merged Mar 9 2026 via squash merge
  - GitHub PR #88: Cameron-S1, 1 claim, merged early April

Their claim files were lost during a Forgejo→GitHub mirror overwrite and later
recovered via direct-to-main commits (dba00a79, da64f805). Because the
recovery commits bypassed the pipeline, our 'prs' table has no row to attach
originator events to — all 4 backfill-events.py strategies returned None,
leaving Alex + Cameron at 0 originator credits despite real historical work.

This reconstructs synthetic 'prs' rows so the existing github_pr strategy in
backfill-events.py attaches 7 originator events on re-run:
  - Numbers 900068 / 900088 live in a clearly-synthetic range that cannot
    collide with real Forgejo PRs (current max: 3941)
  - github_pr=68/88 wires up the existing lookup strategy
  - submitted_by=alexastrum / cameron-s1 establishes author attribution
  - merged_at from the recovery commit messages (not recovery-commit time)
  - last_error tags the rows as synthetic for future audits

Idempotent: INSERT OR IGNORE via check on number OR github_pr. Safe to replay.
Reversible: DELETE FROM prs WHERE number IN (900068, 900088).
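The replay guard can be sketched like this (columns trimmed; the real script also sets merged_at, origin, last_error, etc.):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE prs (number INTEGER PRIMARY KEY, github_pr INTEGER, submitted_by TEXT)")

def insert_synthetic(conn, number, github_pr, submitted_by):
    # Skip when any existing row already claims this number OR github_pr,
    # so re-running the backfill script cannot create duplicates.
    if conn.execute("SELECT 1 FROM prs WHERE number = ? OR github_pr = ?",
                    (number, github_pr)).fetchone():
        return False
    conn.execute("INSERT INTO prs VALUES (?, ?, ?)",
                 (number, github_pr, submitted_by))
    return True

first = insert_synthetic(conn, 900068, 68, "alexastrum")
replay = insert_synthetic(conn, 900068, 68, "alexastrum")
rows = conn.execute("SELECT COUNT(*) FROM prs").fetchone()[0]
```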

After applying this script:
  python3 ops/backfill-events.py
will credit Alex with 6 author + 6 originator events (author=1.80, originator=0.90)
and Cameron with 1 author + 1 originator (0.30 + 0.15), all dated to the
historical merge dates — so 7d/30d leaderboard windows show them correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:33:37 +01:00
10d5c275da fix(backfill): normalize commit_date via datetime() in time-proximity query
SQLite datetime comparison fails lexicographically across ISO-T and
space-separator formats: '2026-03-27 18:00:14' < '2026-03-27T17:43:04+00:00'
because space (0x20) < T (0x54). PRs merged same-day but earlier than the
commit hour were silently excluded from the time-proximity cascade.

Shaga's 3 stigmergic-coordination claims resolved to PR #2032 (later, wrong)
instead of #2025 (earlier, correct). Fixed by wrapping both sides in
datetime(), which normalizes to space-separator before comparison.

Verified: all 3 Shaga claims now resolve to #2025 via git_time_proximity.
No change to totals (126 originator events, 5 proximity hits) — the fix
corrects WHICH PR each proximity-matched claim resolves to, not whether.
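Both the lexicographic failure and the datetime() fix can be reproduced directly in sqlite3:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
commit_date = "2026-03-27 18:00:14"        # space separator
merged_at = "2026-03-27T17:43:04+00:00"    # ISO-T separator

# Raw TEXT comparison is lexicographic: at index 10, ' ' (0x20) < 'T' (0x54),
# so the later timestamp wrongly compares as the smaller one.
raw = conn.execute("SELECT ? < ?", (commit_date, merged_at)).fetchone()[0]

# datetime() renders both as 'YYYY-MM-DD HH:MM:SS', restoring correct order.
fixed = conn.execute(
    "SELECT datetime(?) < datetime(?)", (commit_date, merged_at)).fetchone()[0]
```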

Caught by Ganymede review of 1d6b515.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:16:03 +01:00
1d6b51527a feat(backfill): 4-strategy PR recovery for originator events
Rewrite claim-level pass in backfill-events.py to recover the Forgejo PR
that introduced each claim via a cascade of 4 strategies (reliability
order), replacing the single title→description match that missed PRs
with NULL description (Cameron #3377) and bare-subject extracts (Shaga's
Leo research PR).

## Strategies
  1. sourced_from frontmatter → prs.source_path stem match
  2. git log first-add commit → subject pattern → prs.branch
     - "<agent>: extract claims from <slug>"  → extract/<slug>
     - "<agent>: research session YYYY-MM-DD" → <agent>/research-<date>
     - "<agent>: (challenge|contrib|entity|synthesize)" → <agent>/*
     - "Recover X from GitHub PR #N"           → prs.github_pr=N
     - "Extract N claims from X" (no prefix)   → time-proximity on
       agent-owned branches within 24h
  3. Current title_desc fallback for anything the above miss
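The cascade shape, with toy stand-ins for the strategy internals (the real matching logic lives in find_pr_for_claim, described below):

```python
import re

def resolve_pr(claim, strategies):
    """Try each (name, fn) in reliability order; first hit wins."""
    for name, strategy in strategies:
        pr = strategy(claim)
        if pr is not None:
            return pr, name
    return None, "no_pr_match"

def by_sourced_from(claim):
    return claim.get("sourced_from_pr")  # frontmatter stem match, simplified

def by_git_subject(claim):
    m = re.match(r"Recover .+ from GitHub PR #(\d+)", claim.get("subject", ""))
    return int(m.group(1)) if m else None

STRATEGIES = [("sourced_from", by_sourced_from), ("git_subject", by_git_subject)]

hit = resolve_pr({"subject": "Recover claims from GitHub PR #68"}, STRATEGIES)
miss = resolve_pr({"subject": "unrelated"}, STRATEGIES)
```

Returning the strategy name alongside the PR number is what lets the summary output surface per-strategy hit counts.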

## Dry-run projection (1,662 merged PRs)

Before:
  Claims processed: 33
  Originator events: 6
  Breakdown: {no_pr_match: 1608, no_sourcer: 26, invalid_handle: 21, skip_self: 6}

After:
  Claims processed: 505 (+472)
  Originator events: 126 (+120)
  Strategy hits: git_subject=412, sourced_from=88, git_time_proximity=5
  Breakdown: {no_pr_match: 1095, no_sourcer: 67, invalid_handle: 359, skip_self: 20}

## Verified on real VPS data
- @thesensatore claims: 3/5 resolve via git_time_proximity to leo/ PRs
- Cameron-S1, alexastrum: remain None — their recovery commits
  (dba00a79, da64f805) bypassed the pipeline entirely, no Forgejo PR
  record exists. Requires synthetic prs rows — deferred to separate
  commit with its own Ganymede review (write operation, larger blast
  radius than this pure-read backfill change).

## Implementation
- New find_pr_for_claim(conn, repo, md) helper returns (pr_number, strategy)
- Claim-level pass uses it first, falls back to title_desc map
- Strategy counter surfaced in summary output for operator visibility

Idempotent — backfill re-runs skip duplicate events via the partial
UNIQUE index on contribution_events.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:06:52 +01:00
540ba97b9d fix(attribution): Phase A followup — bug #1 + 4 nits + refactor (Ganymede review)
Addresses Apr 24 review of 58fa8c52. All 6 findings landed.

Bug #1 — git log -1 returns latest commit, not first (semantic mismatch
with "original author" comment):
  Drop -1 flag, take last line of default-ordered log output (= oldest).
  Fixes mis-credit on multi-commit PRs where a reviewer rebased/force-pushed.
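The fix in pure form: given `git log --format=%an` output for a PR's commits (newest first in default order), the original author is the last line, where `-1` would have returned the newest commit's author:

```python
def original_author(log_output):
    """git log prints newest-first, so the oldest commit is the final line."""
    lines = [line for line in log_output.splitlines() if line.strip()]
    return lines[-1] if lines else None

# Two-commit PR where a reviewer rebased/force-pushed on top:
log = "ganymede\nalexastrum\n"
author = original_author(log)
```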

Nit #2 — forward writer didn't pass merged_at:
  Fetch merged_at in the prs SELECT, thread pr_merged_at through all 5
  insert_contribution_event call sites. Keeps forward-emitted and backfilled
  event timestamps on the same timeline after merge retries.

Nit #3 — legacy-counts fallback paths emit no events (parity gap):
  git-author and prs.agent fallback paths now emit challenger/synthesizer
  events via the TRAILER_EVENT_ROLE map when refined_type matches. Closes
  the gap where external-contributor challenge/enrich PRs would accumulate
  legacy counts but disappear from event-sourced leaderboards.

Nit #4 — migration v24 agent seed missing 'pipeline':
  Added "pipeline" to the seed list. Plus new migration v25 with idempotent
  corrective UPDATE so existing envs (where v24 already ran) pick up the
  fix on restart without requiring manual SQL. Verified on VPS state:
  pipeline row was kind='person', will flip to 'agent' on redeploy.

Nit #5 — backfill summary prints originator attempted=0 in wrong pass:
  Split the "=== Summary ===" header into "=== PR-level events ===" and
  "=== Claim-level originator pass ===" with originator counts in the
  right block. Operator-facing cosmetic.

Refactor #6 — AGENT_BRANCH_PREFIXES duplicated in 2 sites:
  Extracted to lib/attribution.py as single source of truth. contributor.py
  imports it. backfill-events.py keeps its local copy (runs standalone
  without pipeline package import) with a sync-reference comment.

No behavioral drift for the common case. Backfill re-runs cleanly against
existing forward-written events (UNIQUE-index idempotency).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 14:13:54 +01:00
58fa8c5276 feat(attribution): Phase A — event-sourced contribution ledger (schema v24)
Introduces contribution_events table + non-breaking double-write. Schema
lands today, forward traffic writes events alongside existing count upserts,
backfill script replays history. Phase B will add leaderboard API reading
from events; Phase C switches Argus dashboard over.

## Schema v24 (lib/db.py)

- contribution_events: one row per credit-earning event
  (id, handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
  Partial UNIQUE indexes handle SQLite's NULL != NULL semantics:
    idx_ce_unique_claim on (handle, role, pr_number, claim_path) WHERE claim_path NOT NULL
    idx_ce_unique_pr    on (handle, role, pr_number)             WHERE claim_path IS NULL
  PR-level events (evaluator, author, challenger, synthesizer) dedup on 3-tuple.
  Per-claim events (originator) dedup on 4-tuple. Idempotent on replay.
- contributor_aliases: canonical handle mapping
  Seeded: @thesensatore → thesensatore, cameron → cameron-s1
- contributors.kind TEXT DEFAULT 'person'
  Migration seeds 'agent' for known Pentagon agent handles.
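The partial-index dedup behaviour described above can be verified standalone:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE contribution_events (
  handle TEXT, role TEXT, pr_number INTEGER, claim_path TEXT
);
-- One plain UNIQUE(handle, role, pr_number, claim_path) would never dedup
-- PR-level rows: SQLite treats NULL != NULL, so duplicate NULL claim_paths
-- all pass. Two partial indexes split the NULL and NOT NULL cases.
CREATE UNIQUE INDEX idx_ce_unique_claim
  ON contribution_events (handle, role, pr_number, claim_path)
  WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr
  ON contribution_events (handle, role, pr_number)
  WHERE claim_path IS NULL;
""")

pr_event = ("alexastrum", "author", 3377, None)  # PR-level: claim_path IS NULL
conn.execute("INSERT OR IGNORE INTO contribution_events VALUES (?,?,?,?)", pr_event)
conn.execute("INSERT OR IGNORE INTO contribution_events VALUES (?,?,?,?)", pr_event)
count = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
```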

## Role model (confirmed by Cory Apr 24)

Weights: author 0.30, challenger 0.25, synthesizer 0.20, originator 0.15, evaluator 0.05
- author:     human who submitted the PR (curation + submission work)
- originator: person who authored the underlying content (rewards external creators)
- challenger: agent/person who brought a productive disagreement
- synthesizer: cross-domain work (enrichments, research sessions)
- evaluator:  reviewer who approved (Leo + domain agent)

Humans-are-always-author: agents credit is capped at evaluator/synthesizer/
challenger. Pentagon agents classify as kind='agent' and surface in the
agent-view leaderboard, not the default person view.

## Writer (lib/contributor.py)

- New insert_contribution_event(): idempotent INSERT OR IGNORE with alias
  normalization + kind classification. Falls back silently on pre-v24 DBs.
- record_contributor_attribution double-writes alongside existing
  upsert_contributor calls. Zero risk to current dashboard.
- Author event: emitted once per PR from prs.submitted_by → git author →
  agent-branch-prefix.
- Originator events: emitted per claim from frontmatter sourcer, skipping
  when sourcer == author (avoids self-credit double-count).
- Evaluator events: Leo (always when leo_verdict='approve') + domain_agent
  (when domain_verdict='approve' and not Leo).
- Challenger/Synthesizer: emitted from Pentagon-Agent trailer on
  agent-owned branches (theseus/*, rio/*, etc.) based on commit_type.
  Pipeline-owned branches (extract/*, reweave/*) get no trailer-based event —
  infrastructure work isn't contribution credit.

## Helpers (lib/attribution.py)

- normalize_handle(raw, conn=None): lowercase + strip @ + alias lookup
- classify_kind(handle): returns 'agent' for PENTAGON_AGENTS, else 'person'
  Intentionally narrow. Orgs get classified by operator review, not heuristics.

## Backfill (scripts/backfill-events.py)

Replays all merged PRs into events. Idempotent (safe to re-run). Emits:
- PR-level: author, evaluator, challenger, synthesizer
- Per-claim: originator (walks knowledge tree, matches via description titles)

Known limitation: post-merge PR branches are deleted from Forgejo, so we
can't diff them for granular per-claim events. Claim→PR mapping uses
prs.description (pipe-separated titles). Misses some edge cases but
recovers the bulk of historical originator credit. Forward traffic gets
clean per-claim events via the normal record_contributor_attribution path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 13:59:22 +01:00
93917f9fc2 fix(attribution): --diff-filter=A + handle sanity filter + remove legacy fallback
Ganymede review findings on epimetheus/contributor-attribution-fix branch:

1. BUG: record_contributor_attribution used `git diff --name-only` (all modified
   files), not just added. Enrich/challenge PRs re-credited the sourcer on every
   subsequent modification. Fixed: --diff-filter=A restricts to new files only.
   The synthesizer/challenger/reviewer roles for enrich PRs are still credited
   via the Pentagon-Agent trailer path, so this doesn't lose any correct credit.

2. WARNING: Legacy `source`-field heuristic fabricated garbage handles from
   descriptive strings ("sec-interpretive-release-s7-2026-09-(march-17",
   "governance---meritocratic-voting-+-futarchy"). Removed outright + added
   regex handle sanity filter (`^[a-z0-9][a-z0-9_-]{0,38}$`). Applied before
   every return path in parse_attribution (the nested-block early return was
   previously bypassing the filter).

   Dry-run impact: unique handles 83→70 (13 garbage filtered), NEW contributors
   49→48, EXISTING drift rows 34→22. The filter drops rows where the literal
   garbage string lives in frontmatter (Slotkin case: attribution.sourcer.handle
   was written as "senator-elissa-slotkin-/-the-hill" by the buggy legacy path).

3. NIT: Aligned knowledge_prefixes in the file walker to match is_knowledge_pr
   (removed entities/, convictions/). Widening those requires Cory sign-off
   since is_knowledge_pr currently gates entity-only PRs out of CI.
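The sanity filter quoted in finding #2, applied standalone against the handles named above:

```python
import re

# Regex taken verbatim from the commit message above.
HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")

def is_valid_handle(handle):
    """True only for plausible lowercase handles; fabricated descriptive
    strings from the removed legacy path all fail."""
    return bool(HANDLE_RE.match(handle))

good = is_valid_handle("cameron-s1")
slash = is_valid_handle("senator-elissa-slotkin-/-the-hill")
plus = is_valid_handle("governance---meritocratic-voting-+-futarchy")
```

Note the pattern is case-sensitive by design: normalize_handle lowercases and strips @ before the filter ever sees a handle.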

Tests: 17 pass (added test_bad_handles_filtered, test_valid_handle_with_hyphen_passes,
updated test_legacy_source_fallback → test_legacy_source_fallback_removed).

Ganymede review — 3-message protocol msg 3 pending.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 12:58:55 +01:00
3fe0f4b744 fix(attribution): credit sourcer/extractor from claim frontmatter
Three layers of contributor-attribution bug surfaced by Apr 24 leaderboard
investigation. alexastrum, thesensatore, cameron-s1 all had real merged
contributions but zero credit in the contributors table.

1. lib/attribution.py: parse_attribution() only read `attribution_sourcer:`
   prefix-keyed flat fields. ~42% of claim files (535/1280) use the bare-key
   form `sourcer: alexastrum` written by extract.py. Added bare-key handling
   between the prefixed-flat path and the legacy-source-field fallback.
   Block format (`attribution: { sourcer: [...] }`) still wins when present.

2. lib/contributor.py: record_contributor_attribution() parsed the diff text
   with regex looking for `+- handle: "X"` lines. This matched neither the
   bare-key flat format nor the `attribution: { sourcer: [...] }` block
   format Leo uses for manual extractions. Replaced the regex parser with
   a file walker that calls attribution.parse_attribution_from_file() on
   each changed knowledge file — single source of truth for both formats.

3. scripts/backfill-sourcer-attribution.py: walks all merged knowledge files,
   re-attributes via the canonical parser, upserts contributors. Default
   additive mode preserves existing high counts (e.g. m3taversal.sourcer=1011
   reflects Telegram-curator credit accumulated via a different code path
   that this fix does not touch). --reset flag for the destructive case.

Dry-run preview (additive mode):
  - 670 NEW contributors to insert (mostly source-citation handles)
  - 77 EXISTING contributors with under-counted role columns
  - alexastrum: 0 → 6, thesensatore: 0 → 5, cameron-s1: 0 → 2
  - astra.sourcer: 0 → 96, leo.sourcer: 0 → 44, theseus.sourcer: 0 → 18
  - m3taversal.sourcer: 1011 (preserved, not 22 from file walk)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 12:48:41 +01:00
05d15cea56 feat(activity): Timeline data gaps — type filter + commit_type classifier + source_channel reshape
Three hackathon-critical fixes for Timeline page rendering (Accelerate Solana, May 5):

Gap 1 — /api/activity respects ?type= now:
  - accepts single or comma-separated operation types
    (extract|new|enrich|challenge|infra)
  - over-fetches 5× limit (capped 2000) so post-build filtering still
    fills the requested page size
  - unknown types filter out cleanly

Gap 2 — classify_pr_operation() replaces STATUS_TO_OPERATION for merged PRs:
  - commit_type wins over branch prefix for merged PRs so extract/* branches
    with commit_type='enrich' or 'challenge' surface correctly (same gotcha
    as the contributor-role wiring fix)
  - priority: challenge → enrich (incl. reweave/) → maintenance (infra) → new
  - challenged_by detection carried over from activity_feed_api._classify_event
  - non-merged statuses unchanged (extract/new/infra/challenge as before)
  - SQL now selects commit_type + description alongside existing columns
  - 14 unit tests covering the gotcha matrix

Gap 3 — _CHANNEL_MAP reshape:
  - extract/, ingestion/ default → 'unknown' (was 'telegram'; telegram-origin
    classification now requires explicit tagging at ingestion time)
  - agent/maintenance mappings unchanged
  - github_pr override and gh-pr-* branches continue to return 'github'
  - 'web' registered as the canonical in-app submission channel (matches
    the platform-named pattern established by telegram/github/agent)
  - module docstring enumerates all six valid channels

Deployed to VPS; diagnostics + pipeline restarted clean.
Smoke: type=enrich returns 22 events (was 0), type=challenge returns 0
(matches DB — zero challenge commit_types).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 19:51:58 +01:00
cfcb06a6dc fix(diagnostics): commit claims_api + register routes that were VPS-only
Root cause (per Epi audit):
- /api/claims, /api/contributors/list, /api/contributors/{handle} returned
  404 in prod. The route registrations and claims_api.py module existed only
  on VPS — never committed. Today's auto-deploy of an unrelated app.py change
  rsync'd the repo (registration-less) version over the VPS edits, wiping
  endpoints Vercel depended on.
- Recurrence of the deploy-without-commit pattern (blindspot #2).

Brings repo to parity with the live, working VPS state:
- Add diagnostics/claims_api.py (161 lines, was VPS-only)
- Wire register_claims_routes + register_contributor_routes in app.py
  alongside the existing register_activity_feed call

beliefs_routes.py is also VPS-only and currently unregistered (orphaned by
the same Apr 21 manual edit that dropped its registration). Left out of this
commit pending a decision on whether to revive or delete.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 12:51:17 +01:00
2f6424617b feat: wire Timeline activity endpoint + surface source_channel
/api/activity and /api/activity-feed were never registered in app.py —
both files existed but neither route was reachable (confirmed 404 on VPS).
Register both so Timeline and gamification feeds can consume them.

Adds source_channel to /api/activity payload (both PR rows and audit
events — audit rows return null since they aren't tied to a specific PR).
Migration v22 already populated prs.source_channel on VPS with enum:
telegram=2340, agent=698, maintenance=102, unknown=11, github=1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 12:22:12 +01:00
9a943e8460 feat: expose source_channel on activity feed
Adds p.source_channel to the SELECT and surfaces it on each event.
Migration v22 populated the column with enum values: telegram, agent,
maintenance, unknown, github. Timeline UI needs this to show per-event
provenance (2340 telegram, 698 agent, 102 maintenance, 11 unknown, 1 github).

Nulls fall back to "unknown" — zero rows are currently null, but the fallback
is defensive for future inserts that land before backfill runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 12:20:26 +01:00
23 changed files with 4526 additions and 277 deletions

deploy/setup-infra-mirror.sh Executable file

@@ -0,0 +1,120 @@
#!/bin/bash
# One-time setup: prepare the bare mirror repo for teleo-infrastructure.
#
# Prerequisites (must happen BEFORE running this):
#   1. GitHub repo `living-ip/teleo-infrastructure` created (manual via web or
#      `gh repo create` — the deploy PAT is fine-grained to teleo-codex only
#      and cannot create new repos in the org).
#   2. GitHub PAT updated to include push access on the new repo (or rotate
#      to a classic PAT with `repo` scope covering both).
#
# This script is idempotent — safe to re-run.
set -euo pipefail
MIRROR_BASE="/opt/teleo-eval/mirror"
REPO_DIR="$MIRROR_BASE/teleo-infrastructure.git"
FORGEJO_URL="http://localhost:3000/teleo/teleo-infrastructure.git"
GITHUB_REPO="living-ip/teleo-infrastructure"
FORGEJO_TOKEN_FILE="/opt/teleo-eval/secrets/forgejo-admin-token"
GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
if [ ! -f "$FORGEJO_TOKEN_FILE" ]; then
  echo "ERROR: missing $FORGEJO_TOKEN_FILE" >&2
  exit 1
fi
if [ ! -f "$GITHUB_PAT_FILE" ]; then
  echo "ERROR: missing $GITHUB_PAT_FILE" >&2
  exit 1
fi
FORGEJO_TOKEN=$(cat "$FORGEJO_TOKEN_FILE" | tr -d '[:space:]')
GITHUB_PAT=$(cat "$GITHUB_PAT_FILE" | tr -d '[:space:]')
# Sanity check: GitHub repo must exist before we point a remote at it.
echo "Verifying GitHub repo $GITHUB_REPO exists..."
GH_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $GITHUB_PAT" \
  "https://api.github.com/repos/$GITHUB_REPO")
if [ "$GH_STATUS" != "200" ]; then
  echo "ERROR: GitHub repo $GITHUB_REPO not accessible (HTTP $GH_STATUS)" >&2
  echo "Create it first: gh repo create $GITHUB_REPO --public --description 'Pipeline + diagnostics infra for the LivingIP collective'" >&2
  exit 2
fi
echo " OK — $GITHUB_REPO accessible"
# Sanity check: Forgejo repo must exist.
echo "Verifying Forgejo repo teleo/teleo-infrastructure exists..."
FG_STATUS=$(curl -sS -o /dev/null -w "%{http_code}" \
  -H "Authorization: token $FORGEJO_TOKEN" \
  "http://localhost:3000/api/v1/repos/teleo/teleo-infrastructure")
if [ "$FG_STATUS" != "200" ]; then
  echo "ERROR: Forgejo repo teleo/teleo-infrastructure not accessible (HTTP $FG_STATUS)" >&2
  exit 3
fi
echo " OK — Forgejo repo accessible"
# Init bare mirror if missing
if [ -d "$REPO_DIR" ]; then
  echo "Bare repo already exists at $REPO_DIR — skipping init"
else
  echo "Creating bare repo at $REPO_DIR..."
  mkdir -p "$REPO_DIR"
  cd "$REPO_DIR"
  git init --bare >/dev/null
  chown -R teleo:teleo "$REPO_DIR"
  echo " OK — bare repo initialized"
fi
cd "$REPO_DIR"
# Configure remotes (idempotent: set-url succeeds whether remote exists or not)
# Forgejo remote (origin convention is reversed in this codebase: origin=GitHub,
# forgejo=Forgejo, matching the existing teleo-codex.git layout).
FORGEJO_REMOTE_URL="http://github-mirror:${FORGEJO_TOKEN}@localhost:3000/teleo/teleo-infrastructure.git"
# NOTE: "m3taversal" is a placeholder username — for fine-grained PATs the
# username field is decorative; the token does the auth. Matches the existing
# teleo-codex.git remote for consistency. (Ganymede review nit #4.)
GITHUB_REMOTE_URL="https://m3taversal:${GITHUB_PAT}@github.com/${GITHUB_REPO}.git"
if git remote get-url forgejo >/dev/null 2>&1; then
  git remote set-url forgejo "$FORGEJO_REMOTE_URL"
  echo " Updated forgejo remote URL"
else
  git remote add forgejo "$FORGEJO_REMOTE_URL"
  echo " Added forgejo remote"
fi
if git remote get-url origin >/dev/null 2>&1; then
  git remote set-url origin "$GITHUB_REMOTE_URL"
  echo " Updated origin remote URL"
else
  git remote add origin "$GITHUB_REMOTE_URL"
  echo " Added origin remote"
fi
# Initial fetch from Forgejo
echo "Fetching from Forgejo..."
git fetch forgejo --prune 2>&1 | sed 's/^/ /'
# Initial push to GitHub (will populate the empty repo)
# main_only mode: push ONLY refs/heads/main + tags, mirroring what sync-mirror.sh
# does for this repo on the recurring path. Agent review branches stay Forgejo-only.
echo "Pushing initial main + tags to GitHub..."
git update-ref refs/heads/main refs/remotes/forgejo/main 2>/dev/null || {
  echo "ERROR: forgejo/main ref missing — fetch may have failed" >&2
  exit 1
}
git push origin "refs/heads/main:refs/heads/main" 2>&1 | sed 's/^/ /' || {
  echo "WARN: initial push failed — you may need to authorize the PAT for $GITHUB_REPO" >&2
}
git push origin --tags 2>&1 | sed 's/^/ /' || true
# Final permissions sweep
chown -R teleo:teleo "$REPO_DIR"
echo
echo "Setup complete. Verify with:"
echo " ssh teleo@77.42.65.182 ls -la $REPO_DIR/refs/heads"
echo " /opt/teleo-eval/sync-mirror.sh && tail -50 /opt/teleo-eval/logs/sync.log"

sync-mirror.sh

@@ -2,22 +2,35 @@
 # Bidirectional sync: Forgejo (authoritative) <-> GitHub (public mirror)
 # Forgejo wins on conflict. Runs every 2 minutes via cron.
 #
+# Repos handled (see MIRROR_REPOS below):
+#   - teleo-codex (mode=bidirectional): full PR roundtrip — fork PR refs from
+#     GitHub, auto-create Forgejo PR mirrors, link github_pr in pipeline.db.
+#   - teleo-infrastructure (mode=main_only): one-way sync of branches+tags from
+#     Forgejo to GitHub. No PR roundtrip — pipeline doesn't process infra PRs;
+#     external infra PRs land on GitHub for visibility, get reviewed manually.
+#
 # Security note: GitHub->Forgejo path is for external contributor convenience.
 # Never auto-process branches arriving via this path without a PR.
 # Eval pipeline and extract cron only act on PRs, not raw branches.

 set -euo pipefail

-REPO_DIR="/opt/teleo-eval/mirror/teleo-codex.git"
 LOG="/opt/teleo-eval/logs/sync.log"
 LOCKFILE="/tmp/sync-mirror.lock"
 PIPELINE_DB="/opt/teleo-eval/pipeline/pipeline.db"
 GITHUB_PAT_FILE="/opt/teleo-eval/secrets/github-pat"
-GITHUB_REPO="living-ip/teleo-codex"

-log() { echo "[$(date -Iseconds)] $1" >> "$LOG"; }
+# (forgejo_owner_repo, github_owner_repo, bare_path, mode)
+# mode: bidirectional | main_only
+MIRROR_REPOS=(
+  "teleo/teleo-codex living-ip/teleo-codex /opt/teleo-eval/mirror/teleo-codex.git bidirectional"
+  "teleo/teleo-infrastructure living-ip/teleo-infrastructure /opt/teleo-eval/mirror/teleo-infrastructure.git main_only"
+)
+
+REPO_TAG="main"
+log() { echo "[$(date -Iseconds)] [$REPO_TAG] $1" >> "$LOG"; }

-# Lockfile — prevent concurrent runs
+# Lockfile — prevent concurrent runs (single lock for whole script)
 if [ -f "$LOCKFILE" ]; then
   pid=$(cat "$LOCKFILE" 2>/dev/null)
   if kill -0 "$pid" 2>/dev/null; then
@ -28,38 +41,58 @@ fi
echo $$ > "$LOCKFILE" echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT trap 'rm -f "$LOCKFILE"' EXIT
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true) # ─────────────────────────────────────────────────────────────────────────────
if [ -n "$BAD_PERMS" ]; then # sync_repo: process one mirror entry. Sets module-level FORGEJO_REPO,
# GITHUB_REPO, REPO_DIR, MODE, REPO_TAG used by inner steps.
# ─────────────────────────────────────────────────────────────────────────────
sync_repo() {
FORGEJO_REPO="$1" # e.g. teleo/teleo-codex (path on Forgejo)
GITHUB_REPO="$2" # e.g. living-ip/teleo-codex (path on GitHub)
REPO_DIR="$3" # bare mirror dir
MODE="$4" # bidirectional | main_only
REPO_TAG="${FORGEJO_REPO##*/}" # short name for log prefix
# Pre-flight: bare repo must exist
if [ ! -d "$REPO_DIR" ]; then
log "ERROR: bare repo missing at $REPO_DIR — skipping"
return 0
fi
# Pre-flight: fix permissions if another user touched the mirror dir (Rhea)
BAD_PERMS=$(find "$REPO_DIR" ! -user teleo 2>/dev/null | head -1 || true)
if [ -n "$BAD_PERMS" ]; then
log "Fixing mirror permissions (found: $BAD_PERMS)" log "Fixing mirror permissions (found: $BAD_PERMS)"
chown -R teleo:teleo "$REPO_DIR" 2>/dev/null chown -R teleo:teleo "$REPO_DIR" 2>/dev/null || true
fi fi
cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; exit 1; } cd "$REPO_DIR" || { log "ERROR: cannot cd to $REPO_DIR"; return 0; }
# Step 1: Fetch from Forgejo (must succeed — it's authoritative) # Step 1: Fetch from Forgejo (must succeed — it's authoritative)
log "Fetching from Forgejo..." log "Fetching from Forgejo..."
if ! git fetch forgejo --prune >> "$LOG" 2>&1; then if ! git fetch forgejo --prune >> "$LOG" 2>&1; then
log "ERROR: Forgejo fetch failed — aborting" log "ERROR: Forgejo fetch failed — skipping this repo"
exit 1 return 0
fi fi
# Step 2: Fetch from GitHub (warn on failure, don't abort) # Step 2: Fetch from GitHub (warn on failure, don't abort)
log "Fetching from GitHub..." log "Fetching from GitHub..."
git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed" git fetch origin --prune >> "$LOG" 2>&1 || log "WARN: GitHub fetch failed"
# Step 2.1: Fetch GitHub fork PR refs (bidirectional only)
# Fork-based PRs don't create branches on origin — they create refs/pull/N/head.
# main_only repos don't accept fork PRs through the mirror path.
if [ "$MODE" = "bidirectional" ]; then
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
if [ -n "$PAT" ]; then
local OPEN_PRS
OPEN_PRS=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?state=open&per_page=100" \
-H "Authorization: token $PAT" 2>/dev/null || echo "[]")
echo "$OPEN_PRS" | python3 -c "
import sys, json
prs = json.load(sys.stdin)
for pr in prs:
head = pr.get('head', {})
# Only process fork PRs (repo differs from base repo)
base_repo = pr.get('base', {}).get('repo', {}).get('full_name', '')
head_repo = head.get('repo', {}) or {}
head_full = head_repo.get('full_name', '')
@@ -67,23 +100,24 @@ for pr in prs:
print(f\"{pr['number']} {head.get('ref', '')} {head.get('sha', '')}\")
" 2>/dev/null | while read pr_num branch_name head_sha; do
if [ -z "$pr_num" ] || [ -z "$branch_name" ]; then continue; fi
local PR_BRANCH="gh-pr-${pr_num}/${branch_name}"
local EXISTING
EXISTING=$(git rev-parse "refs/heads/$PR_BRANCH" 2>/dev/null || true)
if [ "$EXISTING" = "$head_sha" ]; then continue; fi
git fetch origin "refs/pull/${pr_num}/head:refs/heads/$PR_BRANCH" >> "$LOG" 2>&1 && \
log "Fetched fork PR #$pr_num -> $PR_BRANCH" || \
log "WARN: Failed to fetch fork PR #$pr_num"
done
fi
fi
fi
# Step 2.5: GitHub main -> Forgejo main (ff-only)
# If a PR was merged on GitHub, GitHub main is ahead of Forgejo main.
# Fast-forward Forgejo main to match — safe because ff-only guarantees no divergence.
local GITHUB_MAIN_FF FORGEJO_MAIN_FF
GITHUB_MAIN_FF=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN_FF=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
if [ "$GITHUB_MAIN_FF" != "$FORGEJO_MAIN_FF" ]; then
if git merge-base --is-ancestor "$FORGEJO_MAIN_FF" "$GITHUB_MAIN_FF"; then
log "GitHub main ($GITHUB_MAIN_FF) ahead of Forgejo main ($FORGEJO_MAIN_FF) — fast-forwarding"
@@ -92,50 +126,83 @@ if [ -n "$GITHUB_MAIN_FF" ] && [ -n "$FORGEJO_MAIN_FF" ]; then
log "WARN: Failed to fast-forward Forgejo main"
fi
fi
fi
# Step 3: Forgejo -> GitHub (primary direction)
log "Syncing Forgejo -> GitHub..."
while read branch; do
[ "$branch" = "HEAD" ] && continue
git update-ref "refs/heads/$branch" "refs/remotes/forgejo/$branch" 2>/dev/null || \
log "WARN: Failed to update ref $branch"
done < <(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/)
# Safety: verify Forgejo main descends from GitHub main before force-pushing
local GITHUB_MAIN FORGEJO_MAIN PUSH_MAIN
GITHUB_MAIN=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FORGEJO_MAIN=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
PUSH_MAIN=true
if [ -n "$GITHUB_MAIN" ] && [ -n "$FORGEJO_MAIN" ]; then
if ! git merge-base --is-ancestor "$GITHUB_MAIN" "$FORGEJO_MAIN"; then
log "CRITICAL: Forgejo main is NOT a descendant of GitHub main — skipping main push"
log "CRITICAL: GitHub main: $GITHUB_MAIN, Forgejo main: $FORGEJO_MAIN"
PUSH_MAIN=false
fi
fi
if [ "$MODE" = "main_only" ]; then
# Infra-style mirror: push main + tags ONLY. Pre-review agent branches
# (epimetheus/*, ganymede/*, etc.) carry internal context — agent UUIDs,
# in-flight discussion, WIP — and must not land in the public GitHub
# history. (Ganymede review, finding #1.)
if [ "$PUSH_MAIN" = true ]; then
git push origin --force "refs/heads/main:refs/heads/main" >> "$LOG" 2>&1 || \
log "WARN: main push to GitHub failed"
fi
else
# Bidirectional mirror (codex): push all branches so external
# contributors can fork from any branch, not just main.
if [ "$PUSH_MAIN" = true ]; then
git push origin --all --force >> "$LOG" 2>&1 || log "WARN: Push to GitHub failed"
else
# Push all branches except main when main is divergent
while read branch; do
[ "$branch" = "main" ] && continue
[ "$branch" = "HEAD" ] && continue
git push origin --force "refs/heads/$branch:refs/heads/$branch" >> "$LOG" 2>&1 || \
log "WARN: Failed to push $branch to GitHub"
done < <(git for-each-ref --format="%(refname:lstrip=2)" refs/heads/)
fi
fi
git push origin --tags --force >> "$LOG" 2>&1 || log "WARN: Tag push to GitHub failed"
# Step 4: GitHub -> Forgejo + Forgejo PR auto-create (bidirectional only)
if [ "$MODE" = "bidirectional" ]; then
sync_github_to_forgejo_with_prs
fi
# Step 6: Divergence alerting (applies to both modes)
check_divergence
}
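The per-mode push policy above (main + tags only for infra mirrors; all branches for the bidirectional codex mirror; main held back when the ancestry check fails) can be sketched as pure selection logic. This is a hypothetical helper for illustration, not part of the script — `refs_to_push` and its arguments are invented names:

```python
def refs_to_push(mode, push_main, branches):
    """Which branch refs Step 3 sends to GitHub for a given mirror mode.

    mode: 'main_only' or 'bidirectional'; push_main: False when the
    ancestry safety check failed (Forgejo main not a descendant of
    GitHub main). Tags are pushed separately in both modes.
    """
    branches = [b for b in branches if b != "HEAD"]
    if mode == "main_only":
        # Infra mirror: agent branches never reach the public history.
        return ["main"] if push_main else []
    if push_main:
        return branches                              # git push --all --force
    return [b for b in branches if b != "main"]      # divergent main held back

print(refs_to_push("main_only", True, ["main", "epimetheus/wip"]))
print(refs_to_push("bidirectional", False, ["main", "gh-pr-7/fix"]))
```

The divergent-main fallback keeps contributor branches flowing even while a human untangles the mismatched mains.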
# ─────────────────────────────────────────────────────────────────────────────
# Step 4 split out: codex-specific GitHub→Forgejo branch push + PR auto-create.
# Reads FORGEJO_REPO, GITHUB_REPO, PIPELINE_DB, REPO_TAG from sync_repo scope.
# ─────────────────────────────────────────────────────────────────────────────
sync_github_to_forgejo_with_prs() {
log "Checking GitHub-only branches..."
local FORGEJO_HOST="http://localhost:3000/api/v1/repos/$FORGEJO_REPO"
local GITHUB_ONLY
GITHUB_ONLY=$(comm -23 \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/origin/ | grep -v HEAD | sort) \
<(git for-each-ref --format="%(refname:lstrip=3)" refs/remotes/forgejo/ | grep -v HEAD | sort))
if [ -z "$GITHUB_ONLY" ]; then
log "No new GitHub-only branches"
return 0
fi
local FORGEJO_TOKEN
FORGEJO_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token 2>/dev/null)
for branch in $GITHUB_ONLY; do
log "New from GitHub: $branch -> Forgejo"
@@ -151,20 +218,21 @@ if [ -n "$GITHUB_ONLY" ]; then
continue
}
fi
# Skip pipeline-internal branch prefixes (no PR creation)
case "$branch" in
extract/*|ingestion/*) continue ;;
esac
if [ -z "$FORGEJO_TOKEN" ]; then continue; fi
# Check if PR already exists for this branch (open or closed)
# NOTE: Forgejo ?head= filter is broken (ignores head value, returns all PRs).
# Workaround: fetch open+closed PRs, pipe to Python, check head.ref.
local HAS_PR
HAS_PR=$( {
curl -sf "$FORGEJO_HOST/pulls?state=open&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
echo ""
curl -sf "$FORGEJO_HOST/pulls?state=closed&sort=created&limit=50" \
-H "Authorization: token $FORGEJO_TOKEN" 2>/dev/null || echo "[]"
} | python3 -c "
import sys, json
@@ -179,83 +247,92 @@ for line in sys.stdin:
except: pass
print('no')
" "$branch" 2>/dev/null || echo "no")
if [ "$HAS_PR" = "yes" ]; then continue; fi
# Build PR title — for fork PRs, use the GitHub PR title
local PR_TITLE PAYLOAD RESULT PR_NUM GH_PR_NUM
if [[ "$branch" == gh-pr-* ]]; then
local FORK_GH_NUM PAT_T
FORK_GH_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
PAT_T=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
PR_TITLE=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls/$FORK_GH_NUM" \
-H "Authorization: token $PAT_T" 2>/dev/null | \
python3 -c "import sys,json; print(json.load(sys.stdin).get('title',''))" 2>/dev/null || true)
[ -z "$PR_TITLE" ] && PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
else
PR_TITLE=$(echo "$branch" | sed 's|/|: |;s/-/ /g')
fi
PAYLOAD=$(python3 -c "import sys,json; print(json.dumps({'title':sys.argv[1],'head':sys.argv[2],'base':'main'}))" "$PR_TITLE" "$branch")
RESULT=$(curl -sf -X POST "$FORGEJO_HOST/pulls" \
-H "Authorization: token $FORGEJO_TOKEN" \
-H "Content-Type: application/json" \
-d "$PAYLOAD" 2>/dev/null || echo "")
PR_NUM=$(echo "$RESULT" | grep -o '"number":[0-9]*' | head -1 | grep -o "[0-9]*" || true)
if [ -z "$PR_NUM" ]; then
log "WARN: Failed to auto-create PR for $branch"
continue
fi
log "Auto-created PR #$PR_NUM on Forgejo for $branch"
# Step 4.5: Link GitHub PR to Forgejo PR in pipeline DB
if [[ "$branch" == gh-pr-* ]]; then
GH_PR_NUM=$(echo "$branch" | sed 's|gh-pr-\([0-9]*\)/.*|\1|')
else
local PAT
PAT=$(cat "$GITHUB_PAT_FILE" 2>/dev/null | tr -d '[:space:]')
GH_PR_NUM=""
if [ -n "$PAT" ]; then
GH_PR_NUM=$(curl -sf "https://api.github.com/repos/$GITHUB_REPO/pulls?head=living-ip:$branch&state=all" \
-H "Authorization: token $PAT" 2>/dev/null | \
python3 -c "import sys,json; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')" 2>/dev/null || true)
fi
fi
if [[ "$GH_PR_NUM" =~ ^[0-9]+$ ]] && [[ "$PR_NUM" =~ ^[0-9]+$ ]]; then
sqlite3 "$PIPELINE_DB" "UPDATE prs SET github_pr = $GH_PR_NUM, source_channel = 'github' WHERE number = $PR_NUM;" 2>/dev/null && \
log "Linked GitHub PR #$GH_PR_NUM -> Forgejo PR #$PR_NUM" || \
log "WARN: Failed to link GitHub PR #$GH_PR_NUM to Forgejo PR #$PR_NUM in DB"
fi
done
}
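The broken-`?head=`-filter workaround above boils down to: fetch both PR pages, parse each line of JSON, and scan `head.ref`. A standalone sketch of that check (the helper name and sample payloads are illustrative, not from the script):

```python
import json

def has_existing_pr(pages, branch):
    """True if any PR in the fetched JSON pages has head.ref == branch.

    `pages` is a list of JSON strings, one per API response (open then
    closed), mirroring the two curl calls piped into the inline python3.
    """
    for page in pages:
        try:
            prs = json.loads(page)
        except ValueError:
            continue  # malformed page: treat as empty, like `|| echo "[]"`
        for pr in prs:
            if pr.get("head", {}).get("ref") == branch:
                return True
    return False

open_page = json.dumps([{"head": {"ref": "gh-pr-12/fix-typo"}}])
closed_page = "[]"
print(has_existing_pr([open_page, closed_page], "gh-pr-12/fix-typo"))  # True
print(has_existing_pr([open_page, closed_page], "feature/new"))        # False
```

Checking both open and closed PRs is what makes the guard idempotent: a closed-and-not-reopened PR still suppresses re-creation on every later sync cycle.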
# ─────────────────────────────────────────────────────────────────────────────
# Step 6 split out: divergence alerting. Per-repo state file so each repo
# has its own divergence counter and alert state.
# ─────────────────────────────────────────────────────────────────────────────
check_divergence() {
local DIVERGENCE_FILE="/opt/teleo-eval/logs/.divergence-count.${REPO_TAG}"
git fetch forgejo main --quiet 2>/dev/null || true
git fetch origin main --quiet 2>/dev/null || true
local GH_MAIN_FINAL FG_MAIN_FINAL
GH_MAIN_FINAL=$(git rev-parse refs/remotes/origin/main 2>/dev/null || true)
FG_MAIN_FINAL=$(git rev-parse refs/remotes/forgejo/main 2>/dev/null || true)
if [ -n "$GH_MAIN_FINAL" ] && [ -n "$FG_MAIN_FINAL" ] && [ "$GH_MAIN_FINAL" != "$FG_MAIN_FINAL" ]; then
local PREV
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" = "alerted" ]; then
log "DIVERGENCE: still diverged (already alerted)"
else
local COUNT=$((PREV + 1))
echo "$COUNT" > "$DIVERGENCE_FILE"
log "DIVERGENCE: cycle $COUNT — GitHub=$GH_MAIN_FINAL Forgejo=$FG_MAIN_FINAL"
if [ "$COUNT" -ge 2 ]; then
local BOT_TOKEN ADMIN_CHAT
BOT_TOKEN=$(cat /opt/teleo-eval/secrets/telegram-bot-token 2>/dev/null || true)
ADMIN_CHAT=$(cat /opt/teleo-eval/secrets/admin-chat-id 2>/dev/null || true)
if [ -n "$BOT_TOKEN" ] && [ -n "$ADMIN_CHAT" ]; then
local ALERT_MSG
ALERT_MSG=$(python3 -c "
import json, sys
msg = '⚠️ Mirror divergence detected (' + sys.argv[5] + ')\\n\\n'
msg += f'GitHub main: {sys.argv[1][:8]}\\n'
msg += f'Forgejo main: {sys.argv[2][:8]}\\n'
msg += f'Diverged for {sys.argv[3]} consecutive cycles ({int(sys.argv[3])*2} min)\\n\\n'
msg += 'Check sync-mirror.sh logs: /opt/teleo-eval/logs/sync.log'
print(json.dumps({'chat_id': sys.argv[4], 'text': msg, 'parse_mode': 'HTML'}))
" "$GH_MAIN_FINAL" "$FG_MAIN_FINAL" "$COUNT" "$ADMIN_CHAT" "$REPO_TAG")
if curl -sf -X POST "https://api.telegram.org/bot${BOT_TOKEN}/sendMessage" \
-H "Content-Type: application/json" \
-d "$ALERT_MSG" >> "$LOG" 2>&1; then
@@ -269,14 +346,60 @@ print(json.dumps({'chat_id': sys.argv[4], 'text': msg, 'parse_mode': 'HTML'}))
fi
fi
fi
else
if [ -f "$DIVERGENCE_FILE" ]; then
local PREV
PREV=$(cat "$DIVERGENCE_FILE" 2>/dev/null || echo "0")
if [ "$PREV" != "0" ]; then
log "DIVERGENCE: resolved — repos back in sync"
fi
rm -f "$DIVERGENCE_FILE"
fi
fi
}
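The counter file implements a small one-shot state machine: count consecutive divergent cycles, alert once at the threshold, reset on resolution. A sketch of one cycle — a hypothetical helper, assuming (the success branch is in an elided hunk) that `alerted` is written to the file after the Telegram send succeeds:

```python
def next_divergence_state(prev, diverged, threshold=2):
    """One sync cycle of the per-repo counter file.

    prev: file contents ('0' when the file is absent); diverged: bool.
    Returns (new_state, should_alert); new_state None means file removed.
    """
    if not diverged:
        return None, False            # resolved: file removed, counter resets
    if prev == "alerted":
        return "alerted", False       # one-shot: never re-alert
    count = int(prev or "0") + 1
    if count >= threshold:
        return "alerted", True        # 2 cycles (~4 min) of divergence
    return str(count), False

print(next_divergence_state("0", True))        # ('1', False)
print(next_divergence_state("1", True))        # ('alerted', True)
print(next_divergence_state("alerted", True))  # ('alerted', False)
print(next_divergence_state("1", False))       # (None, False)
```

Requiring two consecutive cycles filters out transient single-cycle divergence (e.g. a sync tick that lands mid-merge) without delaying real alerts by more than a few minutes.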
# ─────────────────────────────────────────────────────────────────────────────
# Main: process each configured mirror in sequence.
# A failure on one repo doesn't block subsequent repos — sync_repo returns 0
# on most error paths to keep the loop going.
# ─────────────────────────────────────────────────────────────────────────────
REPO_TAG="main"
log "Starting sync cycle"
# Step 0: self-heal any gh-pr-* PR rows missing github_pr.
# Runs FIRST — before per-repo work (branch-mirror loop, auto-create-PR block).
# Recovers from races/transient failures in Step 4.5's one-shot link UPDATE.
# Idempotent: SELECT empty when clean, zero-cost path. Same SELECT/UPDATE
# heals historical orphans (PR 4066 picked up on first cron tick post-deploy)
# and future races on subsequent ticks. The branch name encodes the GitHub PR
# number deterministically (gh-pr-{N}/...) so no API call is required.
if [ -f "$PIPELINE_DB" ]; then
sqlite3 -separator '|' "$PIPELINE_DB" \
"SELECT number, branch FROM prs WHERE branch LIKE 'gh-pr-%' AND github_pr IS NULL;" \
2>/dev/null | while IFS='|' read -r pr_num branch; do
# Regex requires >=1 digit — empty/non-numeric branches fail to parse here,
# not just at the empty-guard below. Keeps SQL-integer-safety load-bearing
# on the regex alone. [0-9][0-9]* is the portable BRE form of [0-9]+,
# works on both GNU sed (VPS) and BSD sed (dev macs).
gh_pr_num=$(echo "$branch" | sed -n 's|^gh-pr-\([0-9][0-9]*\)/.*|\1|p')
[ -z "$gh_pr_num" ] && continue
# Both interpolated values are integer-validated upstream (pr_num from
# INTEGER `number` column, gh_pr_num from regex above). No parametric
# binding available in bash sqlite3 — safety relies on those invariants.
if sqlite3 "$PIPELINE_DB" \
"UPDATE prs SET github_pr = $gh_pr_num, source_channel = 'github' WHERE number = $pr_num;" \
2>/dev/null; then
log "self-heal: linked Forgejo PR #$pr_num -> GitHub PR #$gh_pr_num"
fi
done
fi
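The self-heal relies on the branch name encoding the GitHub PR number deterministically, so the sed extraction never needs an API call. The same extraction in python (a hypothetical equivalent for illustration):

```python
import re

def github_pr_from_branch(branch):
    """Python equivalent of the sed expression: gh-pr-{N}/... -> N.

    Requires at least one digit and a trailing slash, so empty or
    non-numeric branch names fail to parse rather than yield garbage.
    """
    m = re.match(r"^gh-pr-([0-9]+)/", branch or "")
    return int(m.group(1)) if m else None

print(github_pr_from_branch("gh-pr-4066/fix-dates"))   # 4066
print(github_pr_from_branch("epimetheus/merge-flow"))  # None
print(github_pr_from_branch("gh-pr-/missing-number"))  # None
```

Returning an `int` (or `None`) is what keeps the downstream SQL interpolation integer-safe, mirroring the regex-as-validator role described in the comments above.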
for entry in "${MIRROR_REPOS[@]}"; do
# Read the 4 fields. `read` splits on $IFS (whitespace) by default.
read -r forgejo_repo github_repo bare_path mode <<< "$entry"
sync_repo "$forgejo_repo" "$github_repo" "$bare_path" "$mode"
done
REPO_TAG="main"
log "Sync cycle complete"


@@ -28,12 +28,9 @@ import sqlite3
import json
# Non-merged statuses map directly to operation — no semantic classification yet.
NON_MERGED_STATUS_TO_OPERATION = {
'approved': 'new', # about to become knowledge
'open': 'extract', # cyan — new extraction in progress
'validating': 'extract', # cyan — being validated
'reviewing': 'extract', # cyan — under review
@@ -43,6 +40,51 @@ STATUS_TO_OPERATION = {
'conflict': 'challenge', # red-orange — conflict detected
}
# Maintenance commit_types that land on main but don't represent new knowledge.
_MAINTENANCE_COMMIT_TYPES = {'fix', 'pipeline', 'reweave'}
def classify_pr_operation(status, commit_type, branch, description=None):
"""Derive a Timeline operation from a PR row.
Priority order for MERGED PRs (commit_type wins over branch prefix:
extract/* branches with commit_type='enrich' or 'challenge' classify
by commit_type, matching the contributor-role wiring fix):
1. commit_type == 'challenge' OR branch.startswith('challenge/') OR
description contains 'challenged_by' → 'challenge'
2. commit_type == 'enrich' OR branch.startswith('enrich/' | 'reweave/')
→ 'enrich'
3. commit_type in _MAINTENANCE_COMMIT_TYPES → 'infra'
4. default (commit_type='knowledge'|'extract'|'research'|'entity' or
anything else) → 'new'
For non-merged PRs, falls back to NON_MERGED_STATUS_TO_OPERATION.
"""
commit_type = (commit_type or '').lower()
branch = branch or ''
description_lower = (description or '').lower()
if status != 'merged':
return NON_MERGED_STATUS_TO_OPERATION.get(status, 'infra')
# Challenge takes precedence — the signal is inherently more specific.
if (commit_type == 'challenge'
or branch.startswith('challenge/')
or 'challenged_by' in description_lower):
return 'challenge'
if (commit_type == 'enrich'
or branch.startswith('enrich/')
or branch.startswith('reweave/')):
return 'enrich'
if commit_type in _MAINTENANCE_COMMIT_TYPES:
return 'infra'
# Default: legacy 'knowledge', new 'extract', 'research', 'entity',
# unknown/null commit_type → treat as new knowledge.
return 'new'
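The merged-PR priority order above can be exercised in isolation. This is a condensed standalone restatement for illustration (hypothetical names; the real function also handles the non-merged fallback):

```python
_MAINT = {"fix", "pipeline", "reweave"}

def merged_op(commit_type, branch, description=""):
    """Priority order for merged PRs: challenge > enrich > infra > new."""
    ct = (commit_type or "").lower()
    b = branch or ""
    d = (description or "").lower()
    if ct == "challenge" or b.startswith("challenge/") or "challenged_by" in d:
        return "challenge"
    if ct == "enrich" or b.startswith(("enrich/", "reweave/")):
        return "enrich"
    if ct in _MAINT:
        return "infra"
    return "new"

# commit_type wins over the extract/* prefix (contributor-role wiring fix):
print(merged_op("enrich", "extract/solar-yield"))   # enrich
print(merged_op("fix", "teleo/hotfix"))             # infra
print(merged_op("knowledge", "extract/new-claim"))  # new
```

Note the ordering subtlety: a `reweave/*` branch classifies as `enrich` via the branch prefix even though `reweave` also appears in the maintenance set, because the enrich check runs first.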
# Map audit_log stage to operation type
STAGE_TO_OPERATION = {
'ingest': 'extract',
@@ -118,6 +160,8 @@ async def handle_activity(request):
Query params:
limit (int, default 100, max 500): number of events to return
cursor (ISO timestamp): return events older than this timestamp
type (str, optional): comma-separated operation types to include
(extract|new|enrich|challenge|infra). If absent, returns all types.
Derives events from two sources:
1. prs table per-PR events with domain, agent, status
@@ -131,6 +175,13 @@
limit = 100
cursor = request.query.get('cursor')
type_param = request.query.get('type', '').strip()
allowed_ops = None
if type_param:
allowed_ops = {t.strip() for t in type_param.split(',') if t.strip()}
if not allowed_ops:
allowed_ops = None
db_path = request.app['db_path']
try:
@@ -143,22 +194,27 @@
# Each PR generates events at created_at and merged_at timestamps
pr_query = """
SELECT number, status, domain, agent, branch, source_path,
created_at, merged_at, source_channel, commit_type,
description
FROM prs
WHERE {where_clause}
ORDER BY COALESCE(merged_at, created_at) DESC
LIMIT ?
"""
# Over-fetch when filtering by type so we have enough matching rows after
# post-build filtering. Cap at 2000 to avoid runaway queries.
fetch_limit = min(2000, limit * 5) if allowed_ops else limit + 1
if cursor:
rows = conn.execute(
pr_query.format(where_clause="COALESCE(merged_at, created_at) < ?"),
(cursor, fetch_limit)
).fetchall()
else:
rows = conn.execute(
pr_query.format(where_clause="1=1"),
(fetch_limit,)
).fetchall()
# Known knowledge agents for branch-prefix inference
@@ -166,7 +222,14 @@
for row in rows:
row_dict = dict(row)
operation = classify_pr_operation(
row_dict['status'],
row_dict.get('commit_type'),
row_dict.get('branch'),
row_dict.get('description'),
)
if allowed_ops and operation not in allowed_ops:
continue
description = pr_description(row_dict)
# Use merged_at if available (more interesting event), else created_at
@@ -189,6 +252,7 @@
'description': description,
'status': row_dict['status'],
'pr_number': row_dict['number'],
'source_channel': row_dict.get('source_channel') or 'unknown',
})
# Source 2: Audit log events (secondary — pipeline-level)
@@ -217,6 +281,8 @@
for row in audit_rows:
row_dict = dict(row)
operation = STAGE_TO_OPERATION.get(row_dict['stage'], 'infra')
if allowed_ops and operation not in allowed_ops:
continue
description = audit_description(row_dict)
events.append({
@@ -228,6 +294,7 @@
'description': description,
'status': None,
'pr_number': None,
'source_channel': None, # audit events not tied to a PR
})
conn.close()


@@ -9,6 +9,16 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60 # 1 minute — activity should feel fresh
# commit_types we surface in the activity feed. `pipeline` is system
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
# inbox archive filenames, not claim slugs. Used as a fallback signal when
# branch/description heuristics miss (e.g. populated descriptions that
# happen to be source titles, not claim insights).
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
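The slug pattern separates inbox archive filenames from claim slugs by shape alone. A quick demonstration with the same regex (the example filenames are illustrative, not real archive entries):

```python
import re

# Same shape as _SOURCE_SLUG_PATTERN: date prefix, publisher-topic, 4-hex tail.
pattern = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")

print(bool(pattern.match("2026-04-12-reuters-grid-storage-9f3a")))  # True  (archive filename)
print(bool(pattern.match("solar-yield-degradation-claims")))        # False (claim slug)
print(bool(pattern.match("2026-04-12-x-ZZZZ")))                     # False (tail not hex)
```

Because the hash tail is constrained to exactly four lowercase hex characters, ordinary hyphenated claim slugs that happen to start with a date still fail the match unless they also end in a plausible hash.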
def _get_conn():
conn = sqlite3.connect(DB_PATH)
@@ -17,19 +27,52 @@
return conn
def _is_source_slug(slug):
return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
def _classify_event(branch, description, commit_type, candidate_slug=None):
"""Return one of: create | enrich | challenge | source | None.
Source-archive PRs are extract/* branches that filed a source into
inbox/archive/ but didn't produce a claim. Two signals classify them
as 'source' (defense in depth):
1. extract/* branch with empty description (no claim title produced)
2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern)
"""
commit_type_l = (commit_type or "").lower()
branch = branch or ""
description_lower = (description or "").lower()
has_desc = bool(description and description.strip())
if commit_type_l not in _FEED_COMMIT_TYPES:
return None
# Explicit challenge signals win first.
if (commit_type_l == "challenge"
or branch.startswith("challenge/")
or "challenged_by" in description_lower):
return "challenge"
# Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
if (commit_type_l == "enrich"
or branch.startswith("enrich/")
or branch.startswith("reweave/")):
return "enrich"
# Source-only: extract/* with no claim description means inbox archive
# landed but no domain claim was written.
if branch.startswith("extract/") and not has_desc:
return "source"
# Belt-and-suspenders: if the slug we'd surface to the frontend looks
# like an inbox archive filename (date-prefix-hash), treat as source
# regardless of branch/commit_type/description state. Catches cases
# where description leaked but is just a source title, not a claim.
if _is_source_slug(candidate_slug):
return "source"
# Everything else with a description is a new claim.
return "create" return "create"
@ -59,7 +102,7 @@ def _extract_claim_slugs(description, branch=None):
     if branch:
         parts = branch.split("/", 1)
         if len(parts) > 1:
-            return [parts[1][:120]]
+            return [parts[1]]
         return []
     titles = [t.strip() for t in description.split("|") if t.strip()]
     slugs = []
@ -68,7 +111,7 @@ def _extract_claim_slugs(description, branch=None):
         slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
         slug = slug.replace(" ", "-").strip("-")
         if len(slug) > 10:
-            slugs.append(slug[:120])
+            slugs.append(slug)
     return slugs
@ -81,32 +124,60 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):

 def _build_events():
     conn = _get_conn()
     try:
-        rows = conn.execute("""
+        placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
+        rows = conn.execute(f"""
             SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
-                   p.merged_at, p.description, p.commit_type, p.cost_usd
+                   p.merged_at, p.description, p.commit_type, p.cost_usd,
+                   p.source_channel, p.source_path
             FROM prs p
             WHERE p.status = 'merged'
-              AND p.commit_type = 'knowledge'
+              AND p.commit_type IN ({placeholders})
              AND p.merged_at IS NOT NULL
             ORDER BY p.merged_at DESC
             LIMIT 2000
-        """).fetchall()
+        """, _FEED_COMMIT_TYPES).fetchall()
         events = []
         claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}
         for row in rows:
-            event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
+            slugs = _extract_claim_slugs(row["description"], row["branch"])
+            candidate_slug = slugs[0] if slugs else ""
+            event_type = _classify_event(
+                row["branch"], row["description"], row["commit_type"],
+                candidate_slug=candidate_slug,
+            )
             if not event_type:
                 continue
             contributor = _normalize_contributor(row["submitted_by"], row["agent"])
-            slugs = _extract_claim_slugs(row["description"], row["branch"])
             merged_at = row["merged_at"] or ""
-            ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
+            ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
             ci_earned = ci_map.get(event_type, 0)
+            # Source events never carry a claim_slug — no claim was written —
+            # so the frontend can't produce a 404-ing claim link.
+            if event_type == "source":
+                summary_text = _summary_from_branch(row["branch"])
+                source_slug = (
+                    _summary_from_branch(row["branch"]).lower().replace(" ", "-")
+                    or row["branch"]
+                )
+                events.append({
+                    "type": "source",
+                    "claim_slug": "",
+                    "source_slug": source_slug,
+                    "domain": row["domain"] or "unknown",
+                    "contributor": contributor,
+                    "timestamp": merged_at,
+                    "ci_earned": round(ci_earned, 2),
+                    "summary": summary_text,
+                    "pr_number": row["number"],
+                    "source_channel": row["source_channel"] or "unknown",
+                })
+                continue
             for slug in slugs:
                 if slug not in claim_activity:
                     claim_activity[slug] = {
@ -139,6 +210,7 @@ def _build_events():
                     "ci_earned": round(ci_earned, 2),
                     "summary": summary_text,
                     "pr_number": row["number"],
+                    "source_channel": row["source_channel"] or "unknown",
                 })
         return events, claim_activity
@ -162,8 +234,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
             return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
         events.sort(key=hot_key, reverse=True)
     elif sort_mode == "important":
-        type_rank = {"challenge": 0, "enrich": 1, "create": 2}
-        events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
+        type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
+        events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
     return events
@ -173,6 +245,8 @@ async def handle_activity_feed(request):
     sort_mode = "recent"
     domain = request.query.get("domain", "")
     contributor = request.query.get("contributor", "")
+    type_param = request.query.get("type", "")
+    type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
     try:
         limit = min(int(request.query.get("limit", "20")), 100)
     except ValueError:
@ -194,6 +268,8 @@ async def handle_activity_feed(request):
         filtered = [e for e in filtered if e["domain"] == domain]
     if contributor:
         filtered = [e for e in filtered if e["contributor"] == contributor]
+    if type_filter:
+        filtered = [e for e in filtered if e["type"] in type_filter]
     sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
     total = len(sorted_events)
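The classification precedence the feed relies on (challenge beats enrich beats source beats create) can be exercised in a standalone sketch. The constants here are hypothetical stand-ins for the module's `_FEED_COMMIT_TYPES` and `_SOURCE_SLUG_PATTERN`, whose real values aren't shown in this diff:

```python
import re

# Hypothetical stand-ins, not the real module constants.
FEED_COMMIT_TYPES = ("knowledge", "challenge", "enrich")
SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[0-9a-f]{4}$")


def classify(branch, description, commit_type, candidate_slug=""):
    """Mirror of _classify_event's precedence order, simplified."""
    ct = (commit_type or "").lower()
    branch = branch or ""
    desc = (description or "").lower()
    if ct not in FEED_COMMIT_TYPES:
        return None
    # Challenge signals win over everything else.
    if ct == "challenge" or branch.startswith("challenge/") or "challenged_by" in desc:
        return "challenge"
    if ct == "enrich" or branch.startswith(("enrich/", "reweave/")):
        return "enrich"
    # extract/* with no description: source archived, no claim written.
    if branch.startswith("extract/") and not (description or "").strip():
        return "source"
    # Slug shaped like an inbox filename is also a source signal.
    if candidate_slug and SOURCE_SLUG_PATTERN.match(candidate_slug):
        return "source"
    return "create"


print(classify("challenge/foo", "x", "knowledge"))  # challenge
print(classify("extract/bar", "", "knowledge"))     # source
print(classify("extract/baz", "Some claim", "knowledge",
               candidate_slug="2026-04-28-solar-flare-a1b2"))  # source
```

Note the second `source` path fires even with a non-empty description, which is the "belt-and-suspenders" case the comment in the diff describes.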


@ -25,6 +25,7 @@ from aiohttp import web
 from review_queue_routes import register_review_queue_routes
 from daily_digest_routes import register_daily_digest_routes
 from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS
+from leaderboard_routes import register_leaderboard_routes, LEADERBOARD_PUBLIC_PATHS

 from lib.search import search as kb_search, embed_query, search_qdrant

 logger = logging.getLogger("argus")
@ -42,7 +43,7 @@ API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secret

 # Endpoints that skip auth (dashboard is public for now, can lock later)
 _PUBLIC_PATHS = frozenset({"/", "/prs", "/ops", "/health", "/agents", "/epistemic", "/legacy", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
-    "/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest"})
+    "/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest", "/api/search"})

 def _get_db() -> sqlite3.Connection:
@ -508,7 +509,7 @@ def _load_secret(path: Path) -> str | None:
 @web.middleware
 async def auth_middleware(request, handler):
     """API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
-    if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
+    if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path in LEADERBOARD_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
         return await handler(request)
     expected = request.app.get("api_key")
     if not expected:
@ -663,38 +664,115 @@ async def handle_api_domains(request):
     return web.json_response({"domains": breakdown})

-async def handle_api_search(request):
-    """GET /api/search — semantic search over claims via Qdrant + graph expansion.
-
-    Query params:
-      q: search query (required)
-      domain: filter by domain (optional)
-      confidence: filter by confidence level (optional)
-      limit: max results, default 10 (optional)
-      exclude: comma-separated claim paths to exclude (optional)
-      expand: enable graph expansion, default true (optional)
-    """
+def _qdrant_hits_to_results(hits, include_expanded=False):
+    """Shape raw Qdrant hits into Ship's chat-API contract."""
+    results = []
+    for h in hits:
+        payload = h.get("payload", {}) or {}
+        path = payload.get("claim_path", "") or ""
+        slug = path.rsplit("/", 1)[-1]
+        if slug.endswith(".md"):
+            slug = slug[:-3]
+        results.append({
+            "slug": slug,
+            "path": path,
+            "title": payload.get("claim_title", ""),
+            "domain": payload.get("domain"),
+            "confidence": payload.get("confidence"),
+            "score": round(float(h.get("score", 0.0) or 0.0), 4),
+            "body_excerpt": payload.get("snippet", "") or "",
+        })
+    return results
+
+
+async def handle_api_search(request):
+    """Semantic search over claims via Qdrant.
+
+    POST contract (Ship's chat API):
+      body: {"query": str, "limit": int, "min_score": float?, "domain": str?, "confidence": str?, "exclude": [str]?}
+      response: {"query": str, "results": [{"slug","path","title","domain","confidence","score","body_excerpt"}], "total": int}
+
+    GET (legacy + hackathon debug):
+      q: search query (required)
+      limit, domain, confidence, exclude, expand
+      min_score: if set, bypasses two-pass lib threshold (default lib behavior otherwise)
+    """
+    if request.method == "POST":
+        try:
+            body = await request.json()
+        except Exception:
+            return web.json_response({"error": "invalid JSON body"}, status=400)
+        query = (body.get("query") or "").strip()
+        if not query:
+            return web.json_response({"error": "query required"}, status=400)
+        try:
+            limit = min(int(body.get("limit") or 5), 50)
+        except (TypeError, ValueError):
+            return web.json_response({"error": "limit must be int"}, status=400)
+        try:
+            min_score = float(body.get("min_score") if body.get("min_score") is not None else 0.25)
+        except (TypeError, ValueError):
+            return web.json_response({"error": "min_score must be float"}, status=400)
+        domain = body.get("domain")
+        confidence = body.get("confidence")
+        exclude = body.get("exclude") or None
+        vector = embed_query(query)
+        if vector is None:
+            return web.json_response({"error": "embedding failed"}, status=502)
+        hits = search_qdrant(vector, limit=limit, domain=domain,
+                             confidence=confidence, exclude=exclude,
+                             score_threshold=min_score)
+        results = _qdrant_hits_to_results(hits)
+        return web.json_response({"query": query, "results": results, "total": len(results)})
+
+    # GET path
     query = request.query.get("q", "").strip()
     if not query:
         return web.json_response({"error": "q parameter required"}, status=400)
     domain = request.query.get("domain")
     confidence = request.query.get("confidence")
-    limit = min(int(request.query.get("limit", "10")), 50)
+    try:
+        limit = min(int(request.query.get("limit", "10")), 50)
+    except ValueError:
+        return web.json_response({"error": "limit must be int"}, status=400)
     exclude_raw = request.query.get("exclude", "")
     exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
     expand = request.query.get("expand", "true").lower() != "false"
-
-    # Use shared search library (Layer 1 + Layer 2)
+    min_score_raw = request.query.get("min_score")
+    if min_score_raw is not None:
+        try:
+            min_score = float(min_score_raw)
+        except ValueError:
+            return web.json_response({"error": "min_score must be float"}, status=400)
+        vector = embed_query(query)
+        if vector is None:
+            return web.json_response({"error": "embedding failed"}, status=502)
+        hits = search_qdrant(vector, limit=limit, domain=domain,
+                             confidence=confidence, exclude=exclude,
+                             score_threshold=min_score)
+        direct = _qdrant_hits_to_results(hits)
+        return web.json_response({
+            "query": query,
+            "direct_results": direct,
+            "expanded_results": [],
+            "total": len(direct),
+        })
+
+    # Default GET: Layer 1 + Layer 2 via lib
     result = kb_search(query, expand=expand,
                        domain=domain, confidence=confidence, exclude=exclude)
     if "error" in result:
         error = result["error"]
         if error == "embedding_failed":
             return web.json_response({"error": "embedding failed"}, status=502)
         return web.json_response({"error": error}, status=500)
     return web.json_response(result)
@ -2268,6 +2346,7 @@ def create_app() -> web.Application:
     app.router.add_get("/api/contributors", handle_api_contributors)
     app.router.add_get("/api/domains", handle_api_domains)
     app.router.add_get("/api/search", handle_api_search)
+    app.router.add_post("/api/search", handle_api_search)
     app.router.add_get("/api/audit", handle_api_audit)
     app.router.add_get("/audit", handle_audit_page)
     app.router.add_post("/api/usage", handle_api_usage)
@ -2283,6 +2362,20 @@ def create_app() -> web.Application:
     # Response audit - cost tracking + reasoning traces
     app["db_path"] = str(DB_PATH)
     register_response_audit_routes(app)
+    # Event-sourced leaderboard (Phase B — reads contribution_events directly)
+    register_leaderboard_routes(app)
+    # Timeline activity feed (per-PR + audit_log events for dashboard v2)
+    from activity_endpoint import handle_activity
+    app.router.add_get("/api/activity", handle_activity)
+    # Gamification activity feed (hot/recent/important sort)
+    from activity_feed_api import register as register_activity_feed
+    register_activity_feed(app)
+    # Claims browser + detail
+    from claims_api import register_claims_routes
+    register_claims_routes(app)
+    # Contributor profile (handle lookup, leaderboard with action CI)
+    from contributor_profile_api import register_contributor_routes
+    register_contributor_routes(app)
     app.on_cleanup.append(_cleanup)
     return app
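The slug derivation inside `_qdrant_hits_to_results` (take the filename from `claim_path`, strip the `.md` extension, default everything defensively) can be sketched against a hypothetical hit. The payload keys and values below are illustrative, not real index data:

```python
# Hypothetical Qdrant hit; payload keys mirror what the handler expects.
hit = {
    "score": 0.8731,
    "payload": {
        "claim_path": "domains/space/starship-flight-9-reuse.md",
        "claim_title": "Starship Flight 9 reused a booster",
        "snippet": "Flight 9 flew a previously flown Super Heavy...",
    },
}


def hit_to_result(h):
    """Minimal mirror of the path -> slug shaping."""
    payload = h.get("payload", {}) or {}
    path = payload.get("claim_path", "") or ""
    slug = path.rsplit("/", 1)[-1]   # filename component
    if slug.endswith(".md"):
        slug = slug[:-3]             # slug is the filename stem
    return {
        "slug": slug,
        "path": path,
        "title": payload.get("claim_title", ""),
        # `score or 0.0` also guards against an explicit None score.
        "score": round(float(h.get("score", 0.0) or 0.0), 4),
    }


print(hit_to_result(hit)["slug"])  # starship-flight-9-reuse
```

The `or 0.0` guards matter because a hit with a missing payload or a null score would otherwise raise instead of degrading to an empty result.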

diagnostics/claims_api.py (new file, 161 lines)

@ -0,0 +1,161 @@
"""Claims API endpoint — serves claim data from the codex filesystem."""
import re
import time
import yaml
from pathlib import Path
from aiohttp import web

CODEX_ROOT = Path("/opt/teleo-eval/workspaces/main/domains")

_cache = {"data": None, "ts": 0}
CACHE_TTL = 300  # 5 minutes


def _parse_frontmatter(filepath):
    try:
        text = filepath.read_text(encoding="utf-8")
        if not text.startswith("---"):
            return None
        end = text.index("---", 3)
        fm = yaml.safe_load(text[3:end])
        if not fm or fm.get("type") != "claim":
            return None
        body = text[end + 3:].strip()
        # Count wiki-links
        links = re.findall(r"\[\[([^\]]+)\]\]", body)
        # Extract first paragraph as summary
        paragraphs = [p.strip() for p in body.split("\n\n") if p.strip() and not p.strip().startswith("#")]
        summary = paragraphs[0][:300] if paragraphs else ""
        return {
            "slug": filepath.stem,
            "title": fm.get("title", filepath.stem.replace("-", " ")),
            "domain": fm.get("domain", "unknown"),
            "confidence": fm.get("confidence", "unknown"),
            "agent": fm.get("agent"),
            "scope": fm.get("scope"),
            "created": str(fm.get("created", "")),
            "source": fm.get("source", "") if isinstance(fm.get("source"), str) else "",
            "sourcer": fm.get("sourcer", ""),
            "wiki_link_count": len(links),
            "summary": summary,
            "challenged_by": fm.get("challenged_by"),
            "related_claims": fm.get("related_claims", []),
        }
    except Exception:
        return None


def _load_all_claims():
    now = time.time()
    if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
        return _cache["data"]
    claims = []
    for domain_dir in sorted(CODEX_ROOT.iterdir()):
        if not domain_dir.is_dir():
            continue
        for f in sorted(domain_dir.glob("*.md")):
            if f.name == "_map.md":
                continue
            c = _parse_frontmatter(f)
            if c:
                claims.append(c)
    _cache["data"] = claims
    _cache["ts"] = now
    return claims


async def handle_claims(request):
    claims = _load_all_claims()
    # Filters
    domain = request.query.get("domain")
    search = request.query.get("q", "").lower()
    confidence = request.query.get("confidence")
    agent = request.query.get("agent")
    sort = request.query.get("sort", "recent")  # recent, alpha, domain
    filtered = claims
    if domain:
        filtered = [c for c in filtered if c["domain"] == domain]
    if confidence:
        filtered = [c for c in filtered if c["confidence"] == confidence]
    if agent:
        filtered = [c for c in filtered if c["agent"] == agent]
    if search:
        filtered = [c for c in filtered if search in c["title"].lower() or search in c["summary"].lower()]
    # Sort
    if sort == "recent":
        filtered.sort(key=lambda c: c["created"], reverse=True)
    elif sort == "alpha":
        filtered.sort(key=lambda c: c["title"].lower())
    elif sort == "domain":
        filtered.sort(key=lambda c: (c["domain"], c["title"].lower()))
    # Pagination (validated so a bad query string returns 400, not a 500)
    try:
        limit = min(int(request.query.get("limit", "50")), 200)
        offset = int(request.query.get("offset", "0"))
    except ValueError:
        return web.json_response({"error": "limit/offset must be int"}, status=400)
    page = filtered[offset:offset + limit]
    # Domain counts for sidebar
    domain_counts = {}
    for c in claims:
        domain_counts[c["domain"]] = domain_counts.get(c["domain"], 0) + 1
    return web.json_response({
        "claims": page,
        "total": len(filtered),
        "offset": offset,
        "limit": limit,
        "domains": dict(sorted(domain_counts.items(), key=lambda x: -x[1])),
        "confidence_levels": sorted(set(c["confidence"] for c in claims)),
        "agents": sorted(set(c["agent"] for c in claims if c["agent"])),
    }, headers={"Access-Control-Allow-Origin": "*"})


async def handle_claim_detail(request):
    slug = request.match_info["slug"]
    claims = _load_all_claims()
    for c in claims:
        if c["slug"] == slug:
            # Copy before attaching the body so the cached list entry isn't
            # mutated (otherwise later /api/claims responses grow a stale
            # "body" field for every claim ever fetched in detail).
            detail = dict(c)
            # Read full body for detail view
            for domain_dir in CODEX_ROOT.iterdir():
                if not domain_dir.is_dir():
                    continue
                f = domain_dir / f"{slug}.md"
                if f.exists():
                    text = f.read_text(encoding="utf-8")
                    end = text.index("---", 3)
                    detail["body"] = text[end + 3:].strip()
                    break
            return web.json_response(detail, headers={"Access-Control-Allow-Origin": "*"})
    return web.json_response({"error": "claim not found"}, status=404)


async def handle_domains(request):
    claims = _load_all_claims()
    domains = {}
    for c in claims:
        d = c["domain"]
        if d not in domains:
            domains[d] = {"name": d, "count": 0, "agents": set(), "confidence_dist": {}}
        domains[d]["count"] += 1
        if c["agent"]:
            domains[d]["agents"].add(c["agent"])
        conf = c["confidence"]
        domains[d]["confidence_dist"][conf] = domains[d]["confidence_dist"].get(conf, 0) + 1
    result = []
    for d in sorted(domains.values(), key=lambda x: -x["count"]):
        d["agents"] = sorted(d["agents"])
        result.append(d)
    return web.json_response(result, headers={"Access-Control-Allow-Origin": "*"})


def register_claims_routes(app):
    app.router.add_get("/api/claims", handle_claims)
    app.router.add_get("/api/claims/{slug}", handle_claim_detail)
    app.router.add_get("/api/domains", handle_domains)


@ -0,0 +1,166 @@
"""Leaderboard endpoint reading from event-sourced contribution_events.
Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
joins contributors for kind, returns sorted leaderboard with role breakdown.
Roles + weights (Phase A):
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
Endpoints:
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""
import logging
import re
import sqlite3
from aiohttp import web
logger = logging.getLogger("argus.leaderboard_routes")
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")
# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
def _parse_window(raw):
"""Parse window param. Returns (sql_clause, params_tuple, label).
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
Caps N at 365d / 8760h to prevent abuse.
"""
if not raw or raw == "all_time":
return ("", (), "all_time")
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
if not m:
return ("", (), "all_time")
n = int(m.group(1))
unit = m.group(2)
# Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
if unit == "d":
n = min(n, 365)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
n = min(n, 8760)
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
async def handle_leaderboard(request):
"""GET /api/leaderboard.
Query params:
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
domain: filter by domain (optional)
kind: 'person' (default) | 'agent' | 'org' | 'all'
limit: max entries (default 100, max 500)
"""
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
domain = request.query.get("domain")
kind = request.query.get("kind", "person")
if kind not in KIND_VALUES:
kind = "person"
try:
limit = min(int(request.query.get("limit", "100")), 500)
except (ValueError, TypeError):
limit = 100
where = ["1=1", window_clause] if window_clause else ["1=1"]
params = list(window_params)
if domain:
where.append("ce.domain = ?")
params.append(domain)
if kind != "all":
where.append("COALESCE(c.kind, 'person') = ?")
params.append(kind)
where_sql = " AND ".join([w for w in where if w])
conn = _conn(request.app)
try:
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
# LEFT JOIN contributors so handles in events but not in contributors still appear
# (defaults to kind='person' via COALESCE).
rows = conn.execute(f"""
SELECT
ce.handle,
COALESCE(c.kind, 'person') AS kind,
ROUND(SUM(ce.weight), 4) AS ci,
COUNT(*) AS events_count,
MIN(ce.timestamp) AS first_contribution,
MAX(ce.timestamp) AS last_contribution,
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
COUNT(DISTINCT ce.domain) AS domain_count,
COUNT(DISTINCT ce.pr_number) AS pr_count
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
GROUP BY ce.handle, COALESCE(c.kind, 'person')
ORDER BY ci DESC, last_contribution DESC
LIMIT ?
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
has_more = len(rows) > limit
rows = rows[:limit]
# Total count of distinct handles matching filters (without limit)
total_row = conn.execute(f"""
SELECT COUNT(DISTINCT ce.handle) AS total
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
""", params).fetchone()
total = total_row["total"] if total_row else 0
leaderboard = []
for r in rows:
leaderboard.append({
"handle": r["handle"],
"kind": r["kind"],
"ci": r["ci"],
"ci_breakdown": {
"author": round(r["ci_author"] or 0, 4),
"challenger": round(r["ci_challenger"] or 0, 4),
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
"originator": round(r["ci_originator"] or 0, 4),
"evaluator": round(r["ci_evaluator"] or 0, 4),
},
"events_count": r["events_count"],
"domain_count": r["domain_count"],
"pr_count": r["pr_count"],
"first_contribution": r["first_contribution"],
"last_contribution": r["last_contribution"],
})
return web.json_response({
"window": window_label,
"domain": domain,
"kind_filter": kind,
"total": total,
"shown": len(leaderboard),
"has_more": has_more,
"source": "contribution_events", # explicit so consumers know the data origin
"leaderboard": leaderboard,
})
finally:
conn.close()
def register_leaderboard_routes(app: web.Application):
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
app.router.add_get("/api/leaderboard", handle_leaderboard)
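The window grammar (`all_time`, `Nd`, `Nh`, with silent fallback and caps) can be exercised standalone; this mirrors `_parse_window`'s logic directly:

```python
import re


def parse_window(raw):
    """Mirror of _parse_window: 'all_time' | 'Nd' | 'Nh', capped at 365d / 8760h.
    Anything unparseable silently falls back to all_time."""
    if not raw or raw == "all_time":
        return ("", (), "all_time")
    m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
    if not m:
        return ("", (), "all_time")
    n, unit = int(m.group(1)), m.group(2)
    if unit == "d":
        n = min(n, 365)
        return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
    n = min(n, 8760)
    return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")


print(parse_window("7d")[1])      # ('-7 days',)
print(parse_window("99999h")[2])  # 8760h
print(parse_window("bogus")[2])   # all_time
```

The clause deliberately carries no leading `AND ` because the handler composes the WHERE with `" AND ".join(...)`, exactly as the inline comment in the module warns.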


@ -15,12 +15,130 @@ Epimetheus owns this module. Leo reviews changes.
 import logging
 import re
+import sqlite3

 from pathlib import Path

 logger = logging.getLogger("pipeline.attribution")

 VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"})
# Agent-owned branch prefixes — PRs from these branches get Pentagon-Agent trailer
# credit for challenger/synthesizer roles. Pipeline-infra branches (extract/ reweave/
# fix/ ingestion/) are deliberately excluded: they're automation, not contribution.
# Single source of truth; imported by contributor.py and backfill-events.py.
AGENT_BRANCH_PREFIXES = (
    "rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)

# Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches
# GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy"
# or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene
# bugs produce. Apply at parse time so bad handles never reach the contributors table.
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")


def _valid_handle(handle: str) -> bool:
    """Return True if handle matches the handle format (alphanum + _-, ≤39 chars)."""
    if not handle or not isinstance(handle, str):
        return False
    h = handle.strip().lower().lstrip("@")
    if h.endswith("-") or h.endswith("_"):
        return False
    return bool(_HANDLE_RE.match(h))


def _filter_valid_handles(result: dict) -> dict:
    """Drop entries with invalid handles from a parsed attribution dict."""
    filtered: dict[str, list[dict]] = {role: [] for role in VALID_ROLES}
    for role, entries in result.items():
        for entry in entries:
            if _valid_handle(entry.get("handle", "")):
                filtered[role].append(entry)
    return filtered


# ─── Handle normalization + kind classification (schema v24) ──────────────

# Known Pentagon agents. Used to classify contributor kind='agent' so the
# leaderboard can filter them out of the default person view.
PENTAGON_AGENTS = frozenset({
    "rio", "leo", "theseus", "vida", "clay", "astra",
    "oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
    "pipeline",  # pipeline-owned commits (extract/*, reweave/*, fix/*)
})


def normalize_handle(handle: str, conn=None) -> str:
    """Canonicalize a handle: lowercase, strip @, resolve alias if conn provided.

    Examples:
      '@thesensatore' → 'thesensatore'
      'Cameron' → 'cameron' → 'cameron-s1' (via alias if seeded)
      'CNBC' → 'cnbc'

    Always lowercases and strips @ prefix. Alias resolution requires a conn
    argument (not always available at parse time; merge-time writer passes it).
    """
    if not handle:
        return ""
    h = handle.strip().lower().lstrip("@")
    h = re.sub(r"\s*\(self-directed\)\s*$", "", h)
    if conn is None:
        return h
    try:
        row = conn.execute(
            "SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,),
        ).fetchone()
        if row:
            return row["canonical"] if isinstance(row, dict) or hasattr(row, "keys") else row[0]
    except Exception:
        # Alias table might not exist yet on pre-v24 DBs — degrade gracefully.
        logger.debug("normalize_handle: alias lookup failed for %r", h, exc_info=True)
    return h


def classify_kind(handle: str) -> str:
    """Return 'agent' for known Pentagon agents, 'person' otherwise.

    The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review,
    not inferred here. Keeping heuristics narrow: we know our own agents;
    everything else defaults to person until explicitly classified.
    """
    h = handle.strip().lower().lstrip("@")
    if h in PENTAGON_AGENTS:
        return "agent"
    return "person"


def is_publisher_handle(handle: str, conn) -> int | None:
    """Return publisher.id if the handle exists as a publisher name, else None.

    Schema v26 split orgs/citations into the publishers table. Writer code
    (upsert_contributor, insert_contribution_event) calls this to gate creating
    contributor rows or events for handles that belong to publishers.
    Without this gate, every merged PR with `sourcer: cnbc` (for example) would
    re-create CNBC as a contributor and undo the v26 classifier cleanup.

    Falls back gracefully on pre-v26 DBs: returns None if publishers table
    doesn't exist yet (writer behaves like before, no regression).
    """
    if not handle or conn is None:
        return None
    h = handle.strip().lower().lstrip("@")
    try:
        row = conn.execute(
            "SELECT id FROM publishers WHERE name = ?", (h,),
        ).fetchone()
        if row:
            return row["id"] if hasattr(row, "keys") else row[0]
    except sqlite3.OperationalError:
        # Pre-v26 DB: publishers table doesn't exist yet. Fall through to None
        # so writer behaves as before. Any other exception class is real signal
        # (programming error, lock contention, corruption) — let it propagate.
        logger.debug("is_publisher_handle: publishers table not present (pre-v26?)", exc_info=True)
    return None
# ─── Parse attribution from claim content ──────────────────────────────────

@ -51,7 +169,11 @@ def parse_attribution(fm: dict) -> dict[str, list[dict]]:
         elif isinstance(entries, str):
             # Single entry as string
             result[role].append({"handle": entries.strip().lower().lstrip("@"), "agent_id": None, "context": None})
-        return result
+        # Fall through to the filter at the end (don't early-return). The nested
+        # block path was skipping the handle sanity filter, letting garbage like
+        # "senator-elissa-slotkin-/-the-hill" through when it was written into
+        # frontmatter during the legacy-fallback era.
+        return _filter_valid_handles(result)

     # Flat format fallback (attribution_sourcer, attribution_extractor, etc.)
     for role in VALID_ROLES:
@ -64,22 +186,40 @@ def parse_attribution(fm: dict) -> dict[str, list[dict]]:
if isinstance(v, str): if isinstance(v, str):
result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None}) result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None})
# Bare-key flat format: `sourcer: alexastrum`, `extractor: leo`, etc.
# This is what extract.py writes (line 290: f'sourcer: "{sourcer}"') — the most
# common format in practice (~42% of claim files). The Apr 24 incident traced
# missing leaderboard entries to this format being silently dropped because the
# parser only checked the `attribution_*` prefix.
# Only fill if the role wasn't already populated by the prefixed form, to avoid
# double-counting when both formats coexist on the same claim.
for role in VALID_ROLES:
if result[role]:
continue
bare_val = fm.get(role)
if isinstance(bare_val, str) and bare_val.strip():
result[role].append({"handle": bare_val.strip().lower().lstrip("@"), "agent_id": None, "context": None})
elif isinstance(bare_val, list):
for v in bare_val:
if isinstance(v, str) and v.strip():
result[role].append({"handle": v.strip().lower().lstrip("@"), "agent_id": None, "context": None})
elif isinstance(v, dict) and v.get("handle"):
result[role].append({
"handle": v["handle"].strip().lower().lstrip("@"),
"agent_id": v.get("agent_id"),
"context": v.get("context"),
})
# Legacy `source` heuristic REMOVED (Ganymede review, Apr 24). It fabricated
# handles from descriptive source strings — "governance---meritocratic-voting-+-
# futarchy", "cameron-(contributor)", "sec-interpretive-release-s7-2026-09-
# (march-17". Hit rate on real handles was near-zero, false-positive rate was
# high. Claims without explicit attribution now return empty (better surface as
# data hygiene than invent fake contributors).
# Filter to valid handles only. Bad handles (garbage from upstream frontmatter
# bugs) get dropped rather than written to the contributors table.
return _filter_valid_handles(result)
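The bare-key path above is small enough to sketch in isolation. `parse_bare_keys` is a hypothetical helper name (the real parser handles the prefixed `attribution_*` and nested `attribution:` forms first and then falls through); the handle normalization mirrors the `strip().lower().lstrip("@")` chain used throughout:

```python
VALID_ROLES = ("sourcer", "extractor", "challenger", "synthesizer", "reviewer")

def parse_bare_keys(fm: dict) -> dict[str, list[dict]]:
    # Sketch of only the bare-key fallback: fill a role from a
    # `sourcer: X` style key when nothing populated it earlier.
    result = {role: [] for role in VALID_ROLES}
    for role in VALID_ROLES:
        val = fm.get(role)
        if isinstance(val, str) and val.strip():
            result[role].append({"handle": val.strip().lower().lstrip("@"),
                                 "agent_id": None, "context": None})
    return result

parsed = parse_bare_keys({"sourcer": "@Alexastrum", "extractor": "leo"})
# parsed["sourcer"][0]["handle"] == "alexastrum"
```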
def parse_attribution_from_file(filepath: str) -> dict[str, list[dict]]:


@@ -84,6 +84,14 @@ MAX_EXTRACT_WORKERS = int(os.environ.get("MAX_EXTRACT_WORKERS", "5"))
MAX_EVAL_WORKERS = int(os.environ.get("MAX_EVAL_WORKERS", "7"))
MAX_MERGE_WORKERS = 1  # domain-serialized, but one merge at a time per domain
# --- External GitHub PR merge strategy ---
# When True, gh-pr-N/* branches merge with --no-ff (preserves contributor SHA in
# main's history → GitHub recognizes "merged" badge). When False, fall back to
# cherry-pick (the default for all other branches). Default True; flip to False
# as an emergency backout if the no-ff path destabilizes merge throughput.
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
EXTERNAL_PR_NO_FF_MERGE = True
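A sketch of how the flag might gate dispatch. `pick_merge_strategy` is a hypothetical helper (the real dispatch lives in merge.py); the branch regex mirrors `_GH_PR_BRANCH_RE` defined there:

```python
import re

EXTERNAL_PR_NO_FF_MERGE = True  # emergency backout: flip to False
_GH_PR_BRANCH_RE = re.compile(r"^gh-pr-(\d+)/(.+)$")

def pick_merge_strategy(branch: str) -> str:
    # External fork PRs take the no-ff path so the contributor SHA
    # survives into main; everything else keeps the cherry-pick default.
    if EXTERNAL_PR_NO_FF_MERGE and _GH_PR_BRANCH_RE.match(branch):
        return "no_ff"
    return "cherry_pick"
```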
# --- Timeouts (seconds) ---
EXTRACT_TIMEOUT = 600  # 10 min
EVAL_TIMEOUT = 120  # 2 min — routine Sonnet/Gemini Flash calls (was 600, caused 10-min stalls)


@@ -5,6 +5,7 @@ Extracted from merge.py (Phase 5 decomposition). Functions:
- refine_commit_type: extract challenge/enrich refinement from diff content
- record_contributor_attribution: parse trailers + frontmatter, upsert contributors
- upsert_contributor: insert/update contributor record with role counts
- insert_contribution_event: event-sourced credit log (schema v24)
- recalculate_tier: tier promotion based on config rules
"""
@@ -13,11 +14,75 @@ import logging
import re
from . import config, db
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor")
# ─── Event schema (v24) ───────────────────────────────────────────────────
# Role → CI weight, per Cory's confirmed schema (Apr 24 conversation).
# Humans-are-always-author rule: agents never accumulate author credit;
# evaluator (0.05) is the only agent-facing role. Internal agents still earn
# author/challenger/synthesizer on their own autonomous research PRs but
# surface in the kind='agent' leaderboard, not the default person view.
ROLE_WEIGHTS = {
"author": 0.30,
"challenger": 0.25,
"synthesizer": 0.20,
"originator": 0.15,
"evaluator": 0.05,
}
def insert_contribution_event(
conn,
handle: str,
role: str,
pr_number: int,
*,
claim_path: str | None = None,
domain: str | None = None,
channel: str | None = None,
timestamp: str | None = None,
) -> bool:
"""Emit a contribution_events row. Idempotent via UNIQUE constraint.
Returns True if the event was inserted, False if the constraint blocked it
(same handle/role/pr/claim_path combo already recorded; safe to replay).
Canonicalizes handle via alias table. Classifies kind from handle.
Falls back silently if contribution_events table doesn't exist yet (pre-v24).
"""
if role not in ROLE_WEIGHTS:
logger.warning("insert_contribution_event: unknown role %r", role)
return False
weight = ROLE_WEIGHTS[role]
canonical = normalize_handle(handle, conn=conn)
if not canonical:
return False
# Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
# etc.) are provenance metadata, not contributors. Don't credit them. Without
# this gate every merge re-creates org events and undoes the v26 cleanup.
if is_publisher_handle(canonical, conn) is not None:
logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
return False
kind = classify_kind(canonical)
try:
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
)
return cur.rowcount > 0
except Exception:
logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r",
pr_number, canonical, role, exc_info=True)
return False
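The INSERT OR IGNORE + partial-unique-index contract above can be exercised standalone. This is a trimmed schema (not the full v24 table), with an `emit` helper that mirrors the writer's return convention:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE contribution_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        handle TEXT NOT NULL, role TEXT NOT NULL,
        weight REAL NOT NULL, pr_number INTEGER NOT NULL,
        claim_path TEXT
    );
    CREATE UNIQUE INDEX idx_ce_unique_pr
        ON contribution_events(handle, role, pr_number)
        WHERE claim_path IS NULL;
""")

def emit(handle: str, role: str, pr: int) -> bool:
    # True on a fresh insert, False when the unique index swallowed a replay.
    cur = conn.execute(
        "INSERT OR IGNORE INTO contribution_events "
        "(handle, role, weight, pr_number) VALUES (?, ?, 0.05, ?)",
        (handle, role, pr))
    return cur.rowcount > 0

first = emit("leo", "evaluator", 101)    # True: new event
replay = emit("leo", "evaluator", 101)   # False: deduped by the index
```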
def is_knowledge_pr(diff: str) -> bool:
"""Check if a PR touches knowledge files (claims, decisions, core, foundations).
@@ -125,15 +190,98 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
return
# Refine commit_type from diff content (branch prefix may be too broad)
row = conn.execute(
"SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, "
"domain_verdict, domain_agent, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
refined_type = refine_commit_type(diff, branch_type)
if refined_type != branch_type:
conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
logger.info("PR #%d: commit_type refined %s → %s", pr_number, branch_type, refined_type)
# Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites.
pr_domain = row["domain"] if row else None
pr_channel = row["source_channel"] if row else None
pr_submitted_by = row["submitted_by"] if row else None
# Use the PR's merged_at timestamp so event time matches the actual merge.
# If a merge retries after a crash, this keeps forward-emitted and backfilled
# events on the same timeline. Falls back to datetime('now') in the writer.
pr_merged_at = row["merged_at"] if row and row["merged_at"] else None
# ── AUTHOR event (schema v24, double-write) ──
# Humans-are-always-author rule: the human in the loop gets author credit.
# Precedence: prs.submitted_by (set by extract.py from source proposed_by, or
# by discover for human PRs) → git author of first commit → branch-prefix agent.
# Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get
# author events from branch prefix; extract/ PRs carry submitted_by from the
# source's proposed_by field so the human who submitted gets credit via path 1.
author_candidate: str | None = None
if pr_submitted_by:
author_candidate = pr_submitted_by
else:
# External GitHub PRs: git author of the FIRST commit on the branch is
# the real submitter. `git log -1` would return the latest commit, which
# mis-credits multi-commit PRs where a reviewer rebased or force-pushed.
# Take the last line of the unreversed log (= oldest commit, since git
# log defaults to reverse-chronological). Ganymede review, Apr 24.
rc_author_log, author_log = await git_fn(
"log", f"origin/main..origin/{branch}", "--no-merges",
"--format=%an", timeout=5,
)
if rc_author_log == 0 and author_log.strip():
lines = [line for line in author_log.strip().split("\n") if line.strip()]
if lines:
candidate = lines[-1].strip().lower()
if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions"}:
author_candidate = candidate
# Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc.
if not author_candidate and branch.startswith(AGENT_BRANCH_PREFIXES):
# Autonomous agent PR (theseus/research-*, leo/entity-*, etc.) —
# credit goes to the agent as author per Cory's directive.
author_candidate = branch.split("/", 1)[0]
if author_candidate:
insert_contribution_event(
conn, author_candidate, "author", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# ── EVALUATOR events (schema v24) ──
# Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second
# reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when
# verdict is 'request_changes' — failed review isn't contribution credit.
if row:
if row["leo_verdict"] == "approve":
insert_contribution_event(
conn, "leo", "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
if row["domain_verdict"] == "approve" and row["domain_agent"]:
dagent = row["domain_agent"].strip().lower()
if dagent and dagent != "leo": # don't double-credit leo
insert_contribution_event(
conn, dagent, "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Parse Pentagon-Agent trailer from branch commit messages
agents_found: set[str] = set()
# Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent
# challenger/synthesizer credit based on refined commit_type. Pipeline-owned
# branches (extract/*, reweave/*, etc.) don't — those are infra, not work.
is_agent_branch = branch.startswith(AGENT_BRANCH_PREFIXES)
_TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"research": "synthesizer",
"reweave": "synthesizer",
}
rc, log_output = await git_fn(
"log", f"origin/main..origin/{branch}", "--format=%b%n%N",
timeout=10,
@@ -146,29 +294,77 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
upsert_contributor(
conn, agent_name, agent_uuid, role, today,
)
# Event-emit only for agent-owned branches where the trailer's agent
# actually did the substantive work (challenger/synthesizer).
event_role = _TRAILER_EVENT_ROLE.get(refined_type)
if is_agent_branch and event_role:
insert_contribution_event(
conn, agent_name, event_role, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(agent_name)
# Parse attribution from NEWLY ADDED knowledge files via the canonical attribution
# parser (lib/attribution.py). The previous diff-line regex parser dropped
# both the bare-key flat format (`sourcer: alexastrum`) and the nested
# `attribution:` block format because it only matched `- handle: "X"` lines.
# The Apr 24 incident traced missing leaderboard entries (alexastrum=0,
# thesensatore=0, cameron-s1=0) directly to this parser's blind spots.
#
# --diff-filter=A restricts to added files only (Ganymede review): enrich and
# challenge PRs modify existing claims, and re-crediting the existing sourcer on
# every modification would inflate counts. The synthesizer/challenger/reviewer
# roles for those PRs are credited via the Pentagon-Agent trailer path above.
rc_files, files_output = await git_fn(
"diff", "--name-only", "--diff-filter=A",
f"origin/main...origin/{branch}", timeout=10,
)
if rc_files == 0 and files_output:
from pathlib import Path
from . import config
from .attribution import parse_attribution_from_file
main_root = Path(config.MAIN_WORKTREE)
# Match is_knowledge_pr's gate exactly. Entities/convictions are excluded
# here because is_knowledge_pr skips entity-only PRs at line 123 — so a
# broader list here only matters for mixed PRs where the narrower list
# already matches via the claim file. Widening requires Cory sign-off
# since it would change leaderboard accounting (entity-only PRs → CI credit).
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None
for rel_path in files_output.strip().split("\n"):
rel_path = rel_path.strip()
if not rel_path.endswith(".md"):
continue
if not rel_path.startswith(knowledge_prefixes):
continue
full = main_root / rel_path
if not full.exists():
continue  # file removed in this PR
attribution = parse_attribution_from_file(str(full))
for role, entries in attribution.items():
for entry in entries:
handle = entry.get("handle")
if handle:
upsert_contributor(
conn, handle, entry.get("agent_id"), role, today,
)
# Event-emit: only 'sourcer' frontmatter entries become
# originator events. 'extractor' frontmatter = infrastructure
# (the Sonnet extraction agent), no event. challenger/
# synthesizer frontmatter is extremely rare at extract time.
# Skip originator if same as author — avoids double-credit
# when someone submits their own content (self-authored).
if role == "sourcer":
origin_canonical = normalize_handle(handle, conn=conn)
if origin_canonical and origin_canonical != author_canonical:
insert_contribution_event(
conn, handle, "originator", pr_number,
claim_path=rel_path,
domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Fallback: if no Pentagon-Agent trailer found, try git commit authors
_BOT_AUTHORS = frozenset({
@@ -186,13 +382,35 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
if author_name and author_name not in _BOT_AUTHORS:
role = commit_type_to_role(refined_type)
upsert_contributor(conn, author_name, None, role, today)
# Event-model parity: emit challenger/synthesizer event when
# the fallback credits a human/agent for that kind of work.
# Without this, external-contributor challenge/enrich PRs
# accumulate legacy counts but disappear from event-sourced
# leaderboards when Phase B cuts over. (Ganymede review.)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, author_name, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(author_name)
if not agents_found:
fb_row = conn.execute(
"SELECT agent FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if fb_row and fb_row["agent"] and fb_row["agent"] != "external":
pr_agent = fb_row["agent"].lower()
role = commit_type_to_role(refined_type)
upsert_contributor(conn, pr_agent, None, role, today)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, pr_agent, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
def upsert_contributor(
@@ -207,6 +425,21 @@ def upsert_contributor(
logger.warning("Unknown contributor role: %s", role)
return
# Schema v26 gate: orgs/citations live in publishers table, not contributors.
# Skip without writing so the v26 classifier cleanup isn't undone by every
# merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
#
# Note: bare normalization (lower + lstrip @), no alias resolution. This is
# consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
# below — both look up by canonical-form-as-stored. Today's classifier produces
# one publisher row per canonical handle, so bare lookup hits. Branch 3 will
# normalize alias→canonical at writer entry points (extract.py, post_extract);
# at that point this gate auto-tightens because callers pass canonical handles.
canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
return
existing = conn.execute(
"SELECT handle FROM contributors WHERE handle = ?", (handle,)
).fetchone()

lib/db.py

@@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 26
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -35,6 +35,15 @@ CREATE TABLE IF NOT EXISTS sources (
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
-- v26: provenance publisher (news org / venue) + content author.
-- publisher_id references publishers(id) when source is from a known org.
-- original_author_handle references contributors(handle) when author is in our system.
-- original_author is free-text fallback ("Kim et al.", "Robin Hanson") not credit-bearing.
publisher_id INTEGER REFERENCES publishers(id),
content_type TEXT,
-- article | paper | tweet | conversation | self_authored | webpage | podcast
original_author TEXT,
original_author_handle TEXT REFERENCES contributors(handle),
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
@@ -163,6 +172,77 @@ CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
-- Event-sourced contributions (schema v24).
-- One row per credit-earning event. Idempotent via two partial UNIQUE indexes
-- (SQLite treats NULL != NULL in UNIQUE constraints, so a single composite
-- UNIQUE with nullable claim_path would allow evaluator-event duplicates).
-- Leaderboards are SQL aggregations over this table; contributors becomes a materialized cache.
CREATE TABLE IF NOT EXISTS contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
-- person | org | agent
role TEXT NOT NULL,
-- author | originator | challenger | synthesizer | evaluator
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
-- NULL for PR-level events (e.g. evaluator). Set for per-claim events.
domain TEXT,
channel TEXT,
-- telegram | github | agent | web | unknown
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Per-claim events: unique on (handle, role, pr_number, claim_path) when path IS NOT NULL.
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
-- PR-level events (evaluator, author, trailer-based): unique on (handle, role, pr_number) when path IS NULL.
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
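The reason for the two partial indexes can be demonstrated standalone: SQLite's UNIQUE treats NULLs as distinct, so a single composite constraint with a nullable `claim_path` never dedups PR-level rows. A minimal demo (toy tables, not the real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Plain composite UNIQUE: two identical PR-level rows (claim_path NULL)
# both insert, because NULL != NULL under SQLite's UNIQUE semantics.
conn.execute("CREATE TABLE plain (handle TEXT, role TEXT, pr INTEGER, claim_path TEXT, "
             "UNIQUE(handle, role, pr, claim_path))")
conn.execute("INSERT INTO plain VALUES ('leo', 'evaluator', 7, NULL)")
conn.execute("INSERT INTO plain VALUES ('leo', 'evaluator', 7, NULL)")
plain_rows = conn.execute("SELECT COUNT(*) FROM plain").fetchone()[0]  # 2: dupes slipped in

# A partial unique index closes the gap for the NULL case.
conn.execute("CREATE TABLE part (handle TEXT, role TEXT, pr INTEGER, claim_path TEXT)")
conn.execute("CREATE UNIQUE INDEX ux ON part(handle, role, pr) WHERE claim_path IS NULL")
conn.execute("INSERT OR IGNORE INTO part VALUES ('leo', 'evaluator', 7, NULL)")
conn.execute("INSERT OR IGNORE INTO part VALUES ('leo', 'evaluator', 7, NULL)")
partial_rows = conn.execute("SELECT COUNT(*) FROM part").fetchone()[0]  # 1
```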
-- Handle aliasing. @thesensatore → thesensatore. cameron → cameron-s1.
-- Writers call resolve_alias(handle) before inserting events or upserting contributors.
CREATE TABLE IF NOT EXISTS contributor_aliases (
alias TEXT PRIMARY KEY,
canonical TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
-- Publishers: news orgs, academic venues, social platforms. NOT contributors; these
-- provide metadata/provenance for sources, never earn leaderboard credit. Separating
-- these from contributors prevents CNBC/SpaceNews from dominating the leaderboard.
-- (Apr 24 Cory directive: "only credit the original source if its on X or tg")
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
-- Multi-platform identity: one contributor, many handles. Enables the leaderboard to
-- unify @thesensatore (X) + thesensatore (TG) + thesensatore@github into one person.
-- Writers check this table after resolving aliases to find canonical contributor handle.
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
"""
@@ -232,9 +312,20 @@ def classify_branch(branch: str) -> tuple[str, str]:
# Keep in sync with BRANCH_PREFIX_MAP above.
#
# Valid source_channel values: github | telegram | agent | maintenance | web | unknown
# - github: external contributor PR (set via sync-mirror.sh github_pr linking,
# or from gh-pr-* branches, or any time github_pr is provided)
# - telegram: message captured by telegram bot (must be tagged explicitly by
# ingestion — extract/* default is "unknown" because the bare branch prefix
# can no longer distinguish telegram-origin from github-origin extractions)
# - agent: per-agent research branches (rio/, theseus/, etc.)
# - maintenance: pipeline housekeeping (reweave/, epimetheus/, fix/)
# - web: future in-app submissions (chat UI or form posts)
# - unknown: fallback when provenance cannot be determined
_CHANNEL_MAP = {
"extract": "unknown",
"ingestion": "unknown",
"rio": "agent",
"theseus": "agent",
"astra": "agent",
@@ -249,7 +340,12 @@ _CHANNEL_MAP = {
def classify_source_channel(branch: str, *, github_pr: int = None) -> str:
"""Derive source_channel from branch prefix and github_pr flag.
Precedence: github_pr flag > gh-pr- branch prefix > _CHANNEL_MAP lookup.
extract/* defaults to "unknown"; callers with better provenance (telegram
bot, web submission handler) must override at PR-insert time.
"""
if github_pr is not None or branch.startswith("gh-pr-"):
return "github"
prefix = branch.split("/", 1)[0] if "/" in branch else branch
@@ -625,6 +721,130 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v23: added idx_prs_source_path for auto-close dedup lookup")
if current < 24:
# Event-sourced contributions table + alias table + kind column on contributors.
# Non-breaking: contributors table stays; events are written in addition via
# double-write in merge.py. Leaderboards switch to events in Phase B.
conn.executescript("""
CREATE TABLE IF NOT EXISTS contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Partial unique indexes handle SQLite's NULL != NULL UNIQUE semantics.
-- Per-claim events dedup on 4-tuple; PR-level events dedup on 3-tuple.
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
CREATE TABLE IF NOT EXISTS contributor_aliases (
alias TEXT PRIMARY KEY,
canonical TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
""")
try:
conn.execute("ALTER TABLE contributors ADD COLUMN kind TEXT DEFAULT 'person'")
except sqlite3.OperationalError:
pass # column already exists
# Seed known aliases. @thesensatore → thesensatore catches the zombie row Argus flagged.
# cameron → cameron-s1 reconciles the Leo-flagged missing contributor.
conn.executemany(
"INSERT OR IGNORE INTO contributor_aliases (alias, canonical) VALUES (?, ?)",
[
("@thesensatore", "thesensatore"),
("cameron", "cameron-s1"),
],
)
# Seed kind='agent' for known Pentagon agents so the events writer picks it up.
# Must stay in sync with lib/attribution.PENTAGON_AGENTS — drift causes
# contributors.kind to disagree with classify_kind() output for future
# inserts. (Ganymede review: "pipeline" was missing until Apr 24.)
pentagon_agents = [
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
]
for agent in pentagon_agents:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(agent,),
)
conn.commit()
logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column")
if current < 25:
# v24 seeded 13 Pentagon agents but missed "pipeline" — classify_kind()
# treats it as agent so contributors.kind drifted from event-insert output.
# Idempotent corrective UPDATE: fresh installs have no "pipeline" row
# (no-op), upgraded envs flip it if it exists. (Ganymede review Apr 24.)
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = 'pipeline'"
)
conn.commit()
logger.info("Migration v25: patched kind='agent' for pipeline handle")
if current < 26:
# Add publishers + contributor_identities. Non-breaking — new tables only.
# No existing data moved. Classification into publishers happens via a
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
# seed list. CHECK constraint on contributors.kind deferred to v27 after
# classification completes. (Apr 24 Cory directive: "fix schema, don't
# filter output" — separate contributors from publishers at the data layer.)
conn.executescript("""
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
""")
# Extend sources with provenance columns. ALTER TABLE ADD COLUMN is
# idempotent-safe via try/except because SQLite doesn't support IF NOT EXISTS
# on column adds.
for col_sql in (
"ALTER TABLE sources ADD COLUMN publisher_id INTEGER REFERENCES publishers(id)",
"ALTER TABLE sources ADD COLUMN content_type TEXT",
"ALTER TABLE sources ADD COLUMN original_author TEXT",
"ALTER TABLE sources ADD COLUMN original_author_handle TEXT REFERENCES contributors(handle)",
):
try:
conn.execute(col_sql)
except sqlite3.OperationalError as e:
if "duplicate column" not in str(e).lower():
raise
conn.commit()
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
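The duplicate-column guard above can be checked in isolation; a minimal sketch against an in-memory database (table and column names are illustrative, not the pipeline's real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sources (id INTEGER PRIMARY KEY)")

def add_column(conn: sqlite3.Connection, sql: str) -> bool:
    """Apply ALTER TABLE ADD COLUMN, tolerating reruns (SQLite has no
    IF NOT EXISTS for column adds)."""
    try:
        conn.execute(sql)
        return True   # column added
    except sqlite3.OperationalError as e:
        if "duplicate column" not in str(e).lower():
            raise     # unrelated failure: surface it
        return False  # already present, idempotent no-op

first = add_column(conn, "ALTER TABLE sources ADD COLUMN content_type TEXT")
second = add_column(conn, "ALTER TABLE sources ADD COLUMN content_type TEXT")
```

Re-running the migration therefore converges on upgraded installs instead of failing.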
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",


@@ -429,6 +429,171 @@ async def _cherry_pick_onto_main(branch: str) -> tuple[bool, str]:
await _git("branch", "-D", clean_branch)
_GH_PR_BRANCH_RE = re.compile(r"^gh-pr-(\d+)/(.+)$")
async def _merge_no_ff_external(branch: str) -> tuple[bool, str]:
"""Merge an external GitHub fork PR with --no-ff so contributor SHA lands in main.
Why this differs from _cherry_pick_onto_main:
- Cherry-pick rewrites the contributor's commit SHA → GitHub's "is PR head SHA
an ancestor of main?" check returns false → "merged" badge never fires.
- --no-ff preserves the contributor's commit SHA as a parent of the merge
commit. After ff-push to main (the existing dispatch step), GitHub sees
the SHA in ancestry and marks the PR merged.
Mechanics:
1. Fetch origin/main + origin/{branch}
2. Worktree on local branch _merged-{slug} from origin/main
3. git merge --no-ff origin/{branch} with verbose message:
"Merge external GitHub PR #{N}: {branch_slug}"
4. Push merge commit to origin/_merged/{branch} (synthetic audit ref)
5. ff-push merge_sha → origin/main directly (function owns the push, NOT
dispatch; see sentinel return below)
The merge commit M has parents [main_sha, branch_sha]. M is a fast-forward
descendant of main_sha (via first-parent chain), so the push to main
works without --force.
Synthetic branch (Ship review Apr 28): we deliberately do NOT force-push
the contributor's gh-pr-N/* branch. Force-pushing it would rewrite the
branch tip with a merge commit the contributor didn't author, showing as
a confusing bot force-push in Forgejo's PR UI. The synthetic _merged/*
audit ref lets us track the merge commit without touching the contributor's
branch. Mirrors the _clean/* synthetic branch pattern in cherry-pick.
Sentinel return: function pushes merge_sha → main itself (dispatch's ff-push
can't, since origin/{branch} is unchanged and not a descendant of main).
Returns a "merged --no-ff" sentinel string that dispatch detects to skip
its ff-push step and route directly to PR-close + mark_merged + audit.
The full 40-char merge SHA is in the return string for dispatch to extract.
Conflict handling: same auto-resolve pattern as cherry-pick: entity-only
conflicts take main's version (--ours = current worktree HEAD = main);
other conflicts abort and return False with detail.
Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
"""
m = _GH_PR_BRANCH_RE.match(branch)
if not m:
return False, f"branch {branch} doesn't match gh-pr-N/* format"
gh_pr_num = m.group(1)
branch_slug = m.group(2)
slug = branch.replace("/", "-")
worktree_path = f"/tmp/teleo-merge-{slug}"
local_branch = f"_merged-{slug}" # local working branch in worktree
audit_ref = f"_merged/{branch}" # remote synthetic ref (preserves hierarchy)
# Fetch latest state — separate calls (long branch names break combined refspec)
rc, out = await _git("fetch", "origin", "main", timeout=15)
if rc != 0:
return False, f"fetch main failed: {out}"
rc, out = await _git("fetch", "origin", branch, timeout=15)
if rc != 0:
return False, f"fetch branch failed: {out}"
# Up-to-date check (mirrors cherry-pick path semantics)
rc, merge_base = await _git("merge-base", "origin/main", f"origin/{branch}")
rc2, main_sha = await _git("rev-parse", "origin/main")
if rc == 0 and rc2 == 0 and merge_base.strip() == main_sha.strip():
rc_diff, diff_out = await _git(
"diff", "--stat", f"origin/main..origin/{branch}", timeout=10,
)
if rc_diff != 0 or not diff_out.strip():
return True, "already up to date"
logger.info("External PR branch %s is descendant of main but has new content — proceeding", branch)
async with _bare_repo_lock:
# Clean up any stale local branch from a prior failed run
await _git("branch", "-D", local_branch)
rc, out = await _git("worktree", "add", "-b", local_branch, worktree_path, "origin/main")
if rc != 0:
return False, f"worktree add failed: {out}"
try:
merge_msg = f"Merge external GitHub PR #{gh_pr_num}: {branch_slug}"
rc, out = await _git(
"merge", "--no-ff", f"origin/{branch}",
"-m", merge_msg,
cwd=worktree_path, timeout=60,
)
if rc != 0:
# Identify conflicts
rc_ls, conflicting = await _git(
"diff", "--name-only", "--diff-filter=U", cwd=worktree_path,
)
conflict_files = [
f.strip() for f in conflicting.split("\n") if f.strip()
] if rc_ls == 0 else []
if conflict_files and all(f.startswith("entities/") for f in conflict_files):
# Entity-only conflicts: take main's version (entities are recoverable)
# In merge: --ours = branch we're ON (worktree HEAD = main)
# --theirs = branch merging in (origin/{branch})
for cf in conflict_files:
await _git("checkout", "--ours", cf, cwd=worktree_path)
await _git("add", cf, cwd=worktree_path)
# Complete the merge using the prepared MERGE_MSG (no editor)
rc_cont, cont_out = await _git(
"-c", "core.editor=true",
"commit", "--no-edit",
cwd=worktree_path, timeout=60,
)
if rc_cont != 0:
await _git("merge", "--abort", cwd=worktree_path)
return False, f"merge entity resolution failed for PR #{gh_pr_num}: {cont_out}"
logger.info(
"External PR #%s merge: entity conflict auto-resolved (dropped %s)",
gh_pr_num, ", ".join(sorted(conflict_files)),
)
else:
conflict_detail = ", ".join(conflict_files) if conflict_files else out[:200]
await _git("merge", "--abort", cwd=worktree_path)
return False, f"merge conflict on PR #{gh_pr_num}: {conflict_detail}"
# Capture the merge commit SHA before any pushes
rc, merge_sha = await _git("rev-parse", "HEAD", cwd=worktree_path)
if rc != 0:
return False, f"rev-parse merge HEAD failed: {merge_sha}"
merge_sha = merge_sha.strip().split("\n")[0]
# Push to synthetic audit ref _merged/{branch} (does not touch contributor's
# gh-pr-N/* branch). Plain --force: the audit ref is bot-owned and per-PR;
# if a prior aborted attempt left a stale ref, overwriting it is the
# intended behavior, and there's no concurrent writer to lease against.
rc, out = await _git(
"push", "--force", "origin", f"HEAD:refs/heads/{audit_ref}",
cwd=worktree_path, timeout=30,
)
if rc != 0:
return False, f"push to audit ref {audit_ref} failed: {out}"
# ff-push the merge commit to main. This is a true fast-forward (M is a
# descendant of origin/main via its first parent), so no --force needed.
# Forgejo's branch protection allows ff-push to main from authorized users.
rc, out = await _git(
"push", "origin", f"{merge_sha}:main",
cwd=worktree_path, timeout=30,
)
if rc != 0:
# Roll back audit ref if main push failed — keeps state consistent.
await _git("push", "--delete", "origin", f"refs/heads/{audit_ref}",
cwd=worktree_path, timeout=15)
return False, f"ff-push to main failed: {out}"
# Sentinel return: "merged --no-ff" prefix triggers dispatch's external-PR
# close path (skips ff-push, does PR-close + mark_merged + audit).
# Full 40-char merge SHA in the message so dispatch can parse it for audit.
return True, f"merged --no-ff (external PR #{gh_pr_num}, M={merge_sha}, audit_ref={audit_ref})"
finally:
async with _bare_repo_lock:
await _git("worktree", "remove", "--force", worktree_path)
await _git("branch", "-D", local_branch)
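The sentinel contract between this function and dispatch can be exercised round-trip; a sketch with fabricated values (the SHA, PR number, and ref below are made up for illustration):

```python
import re

# Build the sentinel the way _merge_no_ff_external's success path does.
gh_pr_num = "42"
merge_sha = "a" * 40
audit_ref = "_merged/gh-pr-42/fix-typo"
msg = f"merged --no-ff (external PR #{gh_pr_num}, M={merge_sha}, audit_ref={audit_ref})"

# Parse it the way dispatch's close path does.
m = re.search(r"M=([a-f0-9]{40})", msg)
m_ref = re.search(r"audit_ref=(\S+?)\)", msg)
m_pr = re.search(r"external PR #(\d+)", msg)
```

If any of the three searches returns None after a format change, dispatch's sentinel-drift warning fires while the close path still proceeds.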
from .frontmatter import (
REWEAVE_EDGE_FIELDS,
parse_yaml_frontmatter,
@@ -733,6 +898,12 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
# (Ganymede: manifest approach, Theseus: superset assertion + order-preserving dedup)
if branch.startswith("reweave/"):
merge_fn = _merge_reweave_pr(branch)
elif branch.startswith("gh-pr-") and config.EXTERNAL_PR_NO_FF_MERGE:
# External GitHub fork PRs: --no-ff merge so contributor SHA lands
# in main's history → GitHub recognizes "merged" badge.
# Backout via config.EXTERNAL_PR_NO_FF_MERGE = False (falls back to cherry-pick).
# Phase 2 of external contributor merge flow (Ship architecture review Apr 28).
merge_fn = _merge_no_ff_external(branch)
else:
# Extraction commits ADD new files — cherry-pick applies cleanly.
merge_fn = _cherry_pick_onto_main(branch)
@@ -786,6 +957,58 @@ async def _merge_domain_queue(conn, domain: str) -> tuple[int, int]:
succeeded += 1
continue
# External GitHub PR (gh-pr-*): _merge_no_ff_external already pushed
# the merge commit to origin/main + the synthetic _merged/{branch}
# audit ref. Skip dispatch's ff-push (would fail — origin/{branch} is
# the contributor's untouched branch, not a descendant of main).
# Just close PR + mark_merged + audit, parsing merge SHA from sentinel.
if pick_msg.startswith("merged --no-ff"):
m = re.search(r"M=([a-f0-9]{40})", pick_msg)
merge_sha = m.group(1) if m else None
m_ref = re.search(r"audit_ref=(\S+?)\)", pick_msg)
audit_ref = m_ref.group(1) if m_ref else None
m_pr = re.search(r"external PR #(\d+)", pick_msg)
gh_pr_num = m_pr.group(1) if m_pr else None
# Surface drift between dispatch and _merge_no_ff_external if the
# success-message contract changes. Merge already succeeded; this
# is signal-only, not a gate on the close path.
if not (m and m_ref and m_pr):
logger.warning(
"PR #%d sentinel parse incomplete: M=%s, audit_ref=%s, gh_pr=%s, msg=%r",
pr_num, bool(m), bool(m_ref), bool(m_pr), pick_msg,
)
leo_token = get_agent_token("leo")
comment_body = (
f"Merged via --no-ff into main.\n"
f"Merge commit: `{merge_sha}`\n"
f"Audit ref: `{audit_ref}`\n"
f"Branch: `{branch}` (preserved unchanged)"
)
await forgejo_api("POST", repo_path(f"issues/{pr_num}/comments"),
{"body": comment_body})
result = await forgejo_api("PATCH", repo_path(f"pulls/{pr_num}"),
{"state": "closed"}, token=leo_token)
if result is None:
logger.error("PR #%d: Forgejo close failed (no-ff path), skipping DB update", pr_num)
failed += 1
continue
mark_merged(conn, pr_num)
db.audit(conn, "merge", "merged", json.dumps({
"pr": pr_num, "branch": branch, "method": "no-ff",
"merge_commit_sha": merge_sha,
"audit_ref": audit_ref,
"github_pr": gh_pr_num,
}))
# NOTE: do NOT _delete_remote_branch(branch) here. The contributor's
# gh-pr-N/* branch is the mirror of their fork PR head — leaving it
# in place lets sync-mirror keep the GitHub PR <-> Forgejo PR link
# observable. The synthetic _merged/{branch} ref carries the merge.
logger.info("PR #%d merged via --no-ff (M=%s)", pr_num,
merge_sha[:8] if merge_sha else "?")
succeeded += 1
continue
# Local ff-push: cherry-picked branch is a descendant of origin/main.
# Regular push = fast-forward. Non-ff rejected by default (same safety).
# --force-with-lease removed: Forgejo categorically blocks it on protected branches.


@@ -267,6 +267,7 @@ format: tweet | thread
status: unprocessed
priority: high | medium | low
tags: [topic1, topic2]
intake_tier: research-task
---
## Content

scripts/backfill-events.py (new file)

@@ -0,0 +1,618 @@
#!/usr/bin/env python3
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.
For each merged PR:
- Derive author from prs.submitted_by → git author → branch prefix
- Emit author event (role=author, weight=0.30, claim_path=NULL)
- For each claim file under a knowledge prefix, parse frontmatter and emit
originator events for sourcer entries that differ from the author
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
(when domain_verdict='approve' and not Leo)
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type
Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.
Usage:
python3 scripts/backfill-events.py --dry-run # Count events without writing
python3 scripts/backfill-events.py # Apply
Runs read-only against the git worktree; only writes to pipeline.db.
"""
import argparse
import os
import re
import sqlite3
import subprocess
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
ROLE_WEIGHTS = {
"author": 0.30,
"challenger": 0.25,
"synthesizer": 0.20,
"originator": 0.15,
"evaluator": 0.05,
}
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES.
# Duplicated here because this script runs standalone (no pipeline package import).
AGENT_BRANCH_PREFIXES = (
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)
TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"research": "synthesizer",
"reweave": "synthesizer",
}
KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")
BOT_AUTHORS = frozenset({
"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions",
})
def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
if not handle:
return ""
h = handle.strip().lower().lstrip("@")
row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
if row:
return row[0]
return h
def classify_kind(handle: str) -> str:
h = handle.strip().lower().lstrip("@")
return "agent" if h in PENTAGON_AGENTS else "person"
def parse_frontmatter(text: str):
"""Minimal YAML frontmatter parser using PyYAML when available."""
if not text.startswith("---"):
return None
end = text.find("---", 3)
if end == -1:
return None
raw = text[3:end]
try:
import yaml
fm = yaml.safe_load(raw)
return fm if isinstance(fm, dict) else None
except ImportError:
return None
except Exception:
return None
def extract_sourcers_from_file(path: Path) -> list[str]:
"""Return the sourcer handles from a claim file's frontmatter.
Matches three formats, in the order checked:
1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
2. Prefix-keyed: `attribution_sourcer: alexastrum`
3. Bare-key flat: `sourcer: alexastrum`
"""
try:
content = path.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
return []
fm = parse_frontmatter(content)
if not fm:
return []
handles: list[str] = []
attr = fm.get("attribution")
if isinstance(attr, dict):
entries = attr.get("sourcer", [])
if isinstance(entries, list):
for e in entries:
if isinstance(e, dict) and "handle" in e:
handles.append(e["handle"])
elif isinstance(e, str):
handles.append(e)
elif isinstance(entries, str):
handles.append(entries)
return handles
flat = fm.get("attribution_sourcer")
if flat:
if isinstance(flat, str):
handles.append(flat)
elif isinstance(flat, list):
handles.extend(v for v in flat if isinstance(v, str))
if handles:
return handles
bare = fm.get("sourcer")
if bare:
if isinstance(bare, str):
handles.append(bare)
elif isinstance(bare, list):
handles.extend(v for v in bare if isinstance(v, str))
return handles
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
def valid_handle(h: str) -> bool:
if not h:
return False
lower = h.strip().lower().lstrip("@")
if lower.endswith("-") or lower.endswith("_"):
return False
return bool(_HANDLE_RE.match(lower))
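A standalone copy of the handle rules (regex and trailing-separator check reproduced from above; the sample handles in the usage are illustrative):

```python
import re

# First char must be lowercase alnum; up to 38 more of [a-z0-9_-] (39 total).
HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")

def is_valid(handle: str) -> bool:
    """Lowercase, strip a leading @, reject empty or trailing -/_ handles."""
    lower = handle.strip().lower().lstrip("@")
    if not lower or lower.endswith(("-", "_")):
        return False
    return bool(HANDLE_RE.match(lower))
```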
def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
"""Run a git command, return stdout. Returns empty string on failure."""
try:
result = subprocess.run(
["git", *args],
cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
)
return result.stdout
except (subprocess.TimeoutExpired, OSError):
return ""
def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
"""Best-effort: find git author of first non-merge commit on the branch.
PR branches are usually deleted after merge. We fall back to scanning main
commits around merged_at for commits matching the branch slug.
"""
# Post-merge branches are cleaned up. For the backfill, we accept that this
# path rarely yields results and rely on submitted_by + branch prefix.
return ""
def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
"""Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
if pr.get("submitted_by"):
cand = pr["submitted_by"].strip().lower().lstrip("@")
if cand and cand not in BOT_AUTHORS:
return cand
branch = pr.get("branch") or ""
if "/" in branch:
prefix = branch.split("/", 1)[0].lower()
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
return prefix
return None
def find_pr_for_claim(
conn: sqlite3.Connection,
repo: Path,
md: Path,
) -> tuple[int | None, str]:
"""Recover the Forgejo PR number that introduced a claim file.
Returns (pr_number, strategy); strategy is one of:
'sourced_from': frontmatter sourced_from matched prs.source_path
'git_subject': git log first-add commit message matched a branch pattern
'git_time_proximity': bare "Extract N claims" subject matched the nearest agent-branch PR by merged_at
'title_desc': filename stem matched a title in prs.description
'github_pr': recovery commit mentioned GitHub PR # → prs.github_pr
'none': no strategy found a match
Order is chosen by reliability:
1. sourced_from (explicit provenance, most reliable when present)
2. git_subject (covers Leo research, Cameron challenges, Theseus contrib)
3. title_desc (current fallback; brittle when description is NULL)
4. github_pr (recovery commits referencing erased GitHub PRs)
"""
rel = str(md.relative_to(repo))
# Strategy 1: sourced_from frontmatter → prs.source_path
try:
content = md.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
content = ""
fm = parse_frontmatter(content) if content else None
if fm:
sourced = fm.get("sourced_from")
candidate_paths: list[str] = []
if isinstance(sourced, str) and sourced:
candidate_paths.append(sourced)
elif isinstance(sourced, list):
candidate_paths.extend(s for s in sourced if isinstance(s, str))
for sp in candidate_paths:
stem = Path(sp).stem
if not stem:
continue
row = conn.execute(
"""SELECT number FROM prs
WHERE source_path LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"%{stem}.md",),
).fetchone()
if row:
return row["number"], "sourced_from"
# Strategy 2: git log first-add commit → subject pattern → prs.branch
# Default log order is reverse-chronological; take the last line (oldest)
# to get the original addition, not later rewrites.
log_out = git(
"log", "--diff-filter=A", "--follow",
"--format=%H|||%s|||%b", "--", rel,
)
if log_out.strip():
# Split on the delimiter we chose. Each commit produces 3 fields but
# %b can contain blank lines — group by lines that look like a SHA.
blocks: list[tuple[str, str, str]] = []
current: list[str] = []
for line in log_out.splitlines():
if re.match(r"^[a-f0-9]{40}\|\|\|", line):
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
current = [line]
else:
current.append(line)
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
if blocks:
# Oldest addition — git log defaults to reverse-chronological
_oldest_sha, subject, body = blocks[-1]
# Pattern: "<agent>: extract claims from <slug>"
m = re.match(r"^(\w+):\s*extract\s+claims\s+from\s+(\S+)", subject)
if m:
slug = m.group(2).removesuffix(".md").rstrip(".")  # removesuffix: rstrip(".md") strips a char set, not a suffix
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"extract/{slug}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: research session <date>"
m = re.match(r"^(\w+):\s*research\s+session\s+(\d{4}-\d{2}-\d{2})", subject)
if m:
agent = m.group(1).lower()
date = m.group(2)
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/research-{date}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: challenge" / contrib challenges / entity batches
m = re.match(r"^(\w+):\s*(?:challenge|contrib|entity|synthesize)", subject)
if m:
agent = m.group(1).lower()
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Recovery commits referencing erased GitHub PRs (Alex/Cameron).
# Subject: "Recover <who> contribution from GitHub PR #NN (...)".
# Match only when a corresponding prs row exists with github_pr=NN —
# otherwise the claims were direct-to-main without a Forgejo PR
# record, which requires a synthetic PR row (follow-up, not in
# this script's scope).
gh_match = re.search(r"GitHub\s+PR\s+#(\d+)", subject + "\n" + body)
if gh_match:
gh_pr = int(gh_match.group(1))
row = conn.execute(
"SELECT number FROM prs WHERE github_pr = ? AND status='merged' LIMIT 1",
(gh_pr,),
).fetchone()
if row:
return row["number"], "github_pr"
# Pattern: bare "Extract N claims from <source-fragment>" (no
# agent prefix). Used in early research PRs like Shaga's claims
# at PR #2025. Fall back to time-proximity: find the earliest
# agent-branch PR merged within 24h AFTER this commit's date.
m = re.match(r"^Extract\s+\d+\s+claims\s+from\b", subject)
if m:
# Get commit author date
date_out = git(
"log", "-1", "--format=%aI", _oldest_sha, timeout=10,
)
commit_date = date_out.strip() if date_out.strip() else None
if commit_date:
# git %aI returns ISO 8601 with T-separator; prs.merged_at
# uses SQLite's space-separator. Lexicographic comparison
# fails across formats (space<T), so normalize commit_date
# via datetime() before comparing. Without this, PRs merged
# within the same calendar day but earlier than the commit
# hour are silently excluded (caught by Ganymede review —
# Shaga's #2025 was dropped in favor of later #2032).
row = conn.execute(
"""SELECT number FROM prs
WHERE status='merged'
AND merged_at >= datetime(?)
AND merged_at <= datetime(datetime(?), '+24 hours')
AND (branch LIKE 'leo/%' OR branch LIKE 'theseus/%'
OR branch LIKE 'rio/%' OR branch LIKE 'astra/%'
OR branch LIKE 'vida/%' OR branch LIKE 'clay/%')
ORDER BY merged_at ASC LIMIT 1""",
(commit_date, commit_date),
).fetchone()
if row:
return row["number"], "git_time_proximity"
return None, "none"
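The T-versus-space comparison trap the time-proximity comment describes is easy to reproduce; a sketch with made-up timestamps:

```python
import sqlite3

commit_date = "2025-04-24T09:00:00+00:00"  # git %aI style, illustrative
merged_at = "2025-04-24 12:30:00"          # SQLite style, later the same day

conn = sqlite3.connect(":memory:")
# Raw string comparison: ' ' sorts before 'T', so the later merged_at
# compares as EARLIER than the commit timestamp.
raw = conn.execute("SELECT ? >= ?", (merged_at, commit_date)).fetchone()[0]
# datetime() normalizes both sides to 'YYYY-MM-DD HH:MM:SS' first.
normalized = conn.execute(
    "SELECT ? >= datetime(?)", (merged_at, commit_date)
).fetchone()[0]
```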
def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
canonical = normalize_handle(conn, handle)
if not valid_handle(canonical):
return
kind = classify_kind(canonical)
weight = ROLE_WEIGHTS[role]
counts[(role, "attempt")] += 1
if dry_run:
counts[(role, "would_insert")] += 1
return
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
)
if cur.rowcount > 0:
counts[(role, "inserted")] += 1
else:
counts[(role, "skipped_dup")] += 1
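The rowcount-based dedup accounting in emit() can be modeled with a toy table (a plain UNIQUE constraint stands in for the partial unique indexes; names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (handle TEXT, role TEXT, pr_number INTEGER, "
    "UNIQUE(handle, role, pr_number))"
)

def record(handle: str, role: str, pr: int) -> bool:
    """INSERT OR IGNORE; rowcount > 0 distinguishes inserted from skipped."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO events VALUES (?, ?, ?)", (handle, role, pr)
    )
    return cur.rowcount > 0

first = record("leo", "evaluator", 7)
second = record("leo", "evaluator", 7)  # re-run: duplicate is ignored
```

This is what makes the backfill safe to re-run: repeat passes count as skipped_dup rather than inserting twice.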
def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
"""Best-effort: list added .md files in the PR.
Uses prs.source_path as a fallback signal (the claim being added). If the
branch no longer exists post-merge, this will return []; we accept the loss
for historical PRs where the granular per-claim events can't be recovered —
PR-level author/evaluator events still land correctly.
"""
# Post-merge PR branches are deleted from Forgejo so we can't diff them.
# For the backfill we use prs.source_path — for extract/* PRs this points to
# the source inbox file; we can glob the claim files from the extract branch
# commit on main. But main's commits don't track which files a given PR touched.
# Accept the loss: backfill emits only PR-level events (author, evaluator,
# challenger/synthesizer). Originator events come from parsing claim files
# attributed to the branch via the description field, which lists claim titles.
return []
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: contribution_events exists (v24 migration applied)
try:
conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
sys.exit(2)
# Walk all merged knowledge PRs
query = """
SELECT number, branch, domain, source_channel, submitted_by,
leo_verdict, domain_verdict, domain_agent,
commit_type, merged_at
FROM prs
WHERE status = 'merged'
ORDER BY merged_at ASC
"""
if args.limit:
query += f" LIMIT {args.limit}"
prs = conn.execute(query).fetchall()
print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")
counts: Counter = Counter()
repo = Path(REPO_DIR)
for pr in prs:
pr_number = pr["number"]
branch = pr["branch"] or ""
domain = pr["domain"]
channel = pr["source_channel"]
merged_at = pr["merged_at"]
# Skip pipeline-only branches for author credit (extract/*, reweave/*,
# fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But
# evaluator credit for Leo/domain_agent still applies.
is_pipeline_branch = branch.startswith((
"extract/", "reweave/", "fix/", "ingestion/", "epimetheus/",
))
# ── AUTHOR ──
# For pipeline branches, submitted_by carries the real author (the
# human who submitted the source via Telegram/etc). For agent branches,
# the agent is author. For external branches (gh-pr-*), git author is
# in submitted_by from the sync-mirror pipeline.
author = derive_author(conn, dict(pr))
if author:
emit(conn, counts, args.dry_run, author, "author", pr_number,
None, domain, channel, merged_at)
# ── EVALUATOR ──
if pr["leo_verdict"] == "approve":
emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
None, domain, channel, merged_at)
if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
dagent = pr["domain_agent"].strip().lower()
if dagent and dagent != "leo":
emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
None, domain, channel, merged_at)
# ── CHALLENGER / SYNTHESIZER from branch+commit_type ──
# Only fires on agent-owned branches. Pipeline branches aren't creditable
# work (they're machine extraction, evaluator already captures the review).
if branch.startswith(AGENT_BRANCH_PREFIXES):
prefix = branch.split("/", 1)[0].lower()
event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
if event_role:
emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
None, domain, channel, merged_at)
# ── ORIGINATOR per claim ──
# Walk claim files currently on main whose content was added in this PR.
# We can't diff old branches (deleted post-merge), but for extract PRs
# the source_path + description carry claim titles — too lossy to build
# per-claim events reliably. Strategy: walk ALL claim files that have a
# sourcer in their frontmatter and assign them to the PR whose
# source_path matches (via description or filename heuristic).
# DEFERRED: per-claim originator events require branch introspection
# that fails on deleted branches. Backfill emits PR-level events only.
# Forward traffic (post-deploy) gets per-claim originator events via
# record_contributor_attribution's added-files walk.
if not args.dry_run:
conn.commit()
# Originator is emitted in the claim-level pass below, not the PR-level pass.
# Previous summary listed it here with attempted=0 which confused operators.
print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===")
for role in ("author", "challenger", "synthesizer", "evaluator"):
att = counts[(role, "attempt")]
if args.dry_run:
wi = counts[(role, "would_insert")]
print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
else:
ins = counts[(role, "inserted")]
skip = counts[(role, "skipped_dup")]
print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
# ── Per-claim originator pass ──
# Walk the knowledge tree, parse sourcer attribution, and attach each claim
# to its merging PR via find_pr_for_claim's multi-strategy recovery.
# Apr 24 rewrite (Ganymede-approved): replaces the single-strategy
# title→description match with four strategies in reliability order.
# Previous script missed PRs with NULL description (Cameron #3377) and
# cross-context claims (Shaga's Leo research). Fallback title-match is
# preserved to recover anything the git-log path misses.
print("\n=== Claim-level originator pass ===")
# Build title → pr_number map from prs.description (strategy 3 fallback)
title_to_pr: dict[str, int] = {}
for r in conn.execute(
"SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
).fetchall():
desc = r["description"] or ""
for title in desc.split(" | "):
title = title.strip()
if title:
# Last-writer wins. Conflicts are rare (titles unique in practice).
title_to_pr[title.lower()] = r["number"]
claim_counts = Counter()
strategy_counts = Counter()
claim_count = 0
originator_count = 0
for md in sorted(repo.glob("domains/**/*.md")) + \
sorted(repo.glob("core/**/*.md")) + \
sorted(repo.glob("foundations/**/*.md")) + \
sorted(repo.glob("decisions/**/*.md")):
rel = str(md.relative_to(repo))
stem = md.stem
# Strategies 1, 2, 4 via the helper (sourced_from, git_subject, github_pr).
pr_number, strategy = find_pr_for_claim(conn, repo, md)
# Strategy 3 (fallback): title-match against prs.description.
if not pr_number:
pr_number = title_to_pr.get(stem.lower())
if not pr_number:
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
if pr_number:
strategy = "title_desc"
if not pr_number:
claim_counts["no_pr_match"] += 1
continue
sourcers = extract_sourcers_from_file(md)
if not sourcers:
claim_counts["no_sourcer"] += 1
continue
claim_count += 1
strategy_counts[strategy] += 1
# Look up author for this PR to skip self-credit
pr_row = conn.execute(
"SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
if not pr_row:
continue
author = derive_author(conn, dict(pr_row))
author_canonical = normalize_handle(conn, author) if author else None
for src_handle in sourcers:
src_canonical = normalize_handle(conn, src_handle)
if not valid_handle(src_canonical):
claim_counts["invalid_handle"] += 1
continue
if src_canonical == author_canonical:
claim_counts["skip_self"] += 1
continue
emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
originator_count += 1
if not args.dry_run:
conn.commit()
print(f" Claims processed: {claim_count}")
print(f" Originator events emitted: {originator_count}")
print(f" Breakdown: {dict(claim_counts)}")
print(f" Strategy hits: {dict(strategy_counts)}")
att = counts[("originator", "attempt")]
if args.dry_run:
wi = counts[("originator", "would_insert")]
print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}")
else:
ins = counts[("originator", "inserted")]
skip = counts[("originator", "skipped_dup")]
print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
if not args.dry_run:
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
print(f"\nTotal contribution_events rows: {total}")
if __name__ == "__main__":
main()
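The Strategy-3 fallback above (title-match against `prs.description`) can be sketched in isolation; the helper names and sample data here are illustrative, not the pipeline's actual API:

```python
# Hypothetical sketch of the title-match fallback: split each merged PR's
# description on " | ", index titles lowercase (last writer wins, as the
# comment above notes), then try the claim stem verbatim and with hyphens
# expanded to spaces.
def build_title_index(rows):
    title_to_pr = {}
    for number, description in rows:
        for title in (description or "").split(" | "):
            title = title.strip()
            if title:
                title_to_pr[title.lower()] = number
    return title_to_pr

def match_stem(title_to_pr, stem):
    pr = title_to_pr.get(stem.lower())
    if pr is None:
        pr = title_to_pr.get(stem.replace("-", " ").lower())
    return pr

index = build_title_index([(42, "Agent trust ratings | Defense in depth")])
```

A stem like `defense-in-depth` misses on the verbatim lookup but hits once hyphens become spaces, which is why both probes are needed.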


@@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.
Problem: research-session.sh used to write source frontmatter without
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
`contribution_events` as a `handle='m3taversal', role='author'` row per
research-derived claim. Result: agent research credited to the human.
Forward fix is a frontmatter-template patch to research-session.sh.
This script corrects historical records.
Identification:
Research-session source archives are committed to teleo-codex with a
message matching `^<agent>: research session YYYY-MM-DD `. The diff
for that commit lists `inbox/queue/*.md` files the agent created. Any
PR whose `source_path` matches one of those filenames is research-derived.
Touch list (per matched PR):
1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
2. DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author' AND pr_number=?
3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
kind='agent', role='author', weight=0.30, original timestamp/domain/channel.
Defaults to --dry-run. Pass --apply to commit changes.
Usage:
python3 backfill-research-session-attribution.py --dry-run --days 30
python3 backfill-research-session-attribution.py --apply --days 30
"""
import argparse
import logging
import os
import re
import sqlite3
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("backfill-research-attr")
DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
AUTHOR_WEIGHT = 0.30
def git(repo: Path, *args: str) -> str:
"""Run a git command in repo, return stdout. Raises on non-zero."""
result = subprocess.run(
["git", "-C", str(repo), *args],
capture_output=True, text=True, check=True,
)
return result.stdout
def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
"""Return {source_filename_basename: agent_handle} for last N days.
Walks teleo-codex `git log --since`, filters to research-session commits,
parses agent from message header, lists inbox/queue/*.md files added in
that commit's diff. Maps the basename (which becomes source_path on extract)
to the agent who created it.
"""
log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
file_to_agent: dict[str, str] = {}
commits_seen = 0
commits_matched = 0
for line in log.splitlines():
if not line or "|" not in line:
continue
commits_seen += 1
sha, _, subject = line.partition("|")
m = COMMIT_HEADER_RE.match(subject)
if not m:
continue
agent = m.group(1)
if agent not in KNOWN_AGENTS:
logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
continue
commits_matched += 1
# List files added in this commit (inbox/queue/*.md only)
try:
added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
"--diff-filter=A", sha)
except subprocess.CalledProcessError:
logger.warning("diff-tree failed for %s", sha[:8])
continue
for f in added.splitlines():
if f.startswith("inbox/queue/") and f.endswith(".md"):
basename = Path(f).name
if basename in file_to_agent and file_to_agent[basename] != agent:
logger.warning(
"filename collision: %s — was %s, now %s (keeping first)",
basename, file_to_agent[basename], agent,
)
continue
file_to_agent.setdefault(basename, agent)
logger.info(
"scanned %d commits, %d research-session matches, %d unique source files",
commits_seen, commits_matched, len(file_to_agent),
)
return file_to_agent
def find_misattributed_prs(conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int):
"""Return a list of match dicts: pr, current_submitted_by, source_path, basename, agent, domain, channel, merged_at.
Only includes PRs:
- with source_path basename in our research-session map
- currently attributed to '@m3taversal'
- merged within the last N days (cap on temporal scope)
"""
rows = conn.execute(
"""SELECT number, submitted_by, source_path, domain, source_channel, merged_at
FROM prs
WHERE submitted_by = '@m3taversal'
AND source_path IS NOT NULL
AND status = 'merged'
AND merged_at > datetime('now', ?)""",
(f"-{days} days",),
).fetchall()
matches = []
for row in rows:
basename = Path(row["source_path"]).name
agent = file_to_agent.get(basename)
if agent:
matches.append({
"pr": row["number"],
"current_submitted_by": row["submitted_by"],
"source_path": row["source_path"],
"basename": basename,
"agent": agent,
"domain": row["domain"],
"channel": row["source_channel"],
"merged_at": row["merged_at"],
})
return matches
def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
"""Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
return conn.execute(
"""SELECT COUNT(*) FROM contribution_events
WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
(handle, role, pr),
).fetchone()[0]
def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
"""Apply the backfill. Returns counters."""
counters = defaultdict(int)
if not dry_run:
conn.execute("BEGIN")
try:
for m in matches:
pr = m["pr"]
agent = m["agent"]
# Pre-checks for accurate dry-run reporting
old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
new_event_exists = existing_event_count(conn, pr, agent, "author") > 0
if dry_run:
logger.info(
"would update pr=%d submitted_by '%s' → '%s (self-directed)' "
"[m3ta_event=%s, agent_event=%s]",
pr, m["current_submitted_by"], agent,
old_event_exists, new_event_exists,
)
counters["prs"] += 1
if old_event_exists:
counters["events_to_delete"] += 1
if not new_event_exists:
counters["events_to_insert"] += 1
continue
# 1. UPDATE prs.submitted_by
conn.execute(
"UPDATE prs SET submitted_by = ? WHERE number = ?",
(f"{agent} (self-directed)", pr),
)
counters["prs"] += 1
# 2. INSERT new agent author event (idempotent via UNIQUE index)
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
(agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
)
if cur.rowcount > 0:
counters["events_inserted"] += 1
# 3. DELETE old m3taversal author event
cur = conn.execute(
"""DELETE FROM contribution_events
WHERE handle = 'm3taversal' AND role = 'author'
AND pr_number = ? AND claim_path IS NULL""",
(pr,),
)
if cur.rowcount > 0:
counters["events_deleted"] += 1
if not dry_run:
conn.execute("COMMIT")
except Exception:
if not dry_run:
conn.execute("ROLLBACK")
raise
return dict(counters)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
parser.add_argument("--db", type=Path, default=DEFAULT_DB)
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
parser.add_argument("--limit", type=int, default=0,
help="cap PR updates (0 = no cap; useful for testing on a small slice)")
args = parser.parse_args()
dry_run = not args.apply
logger.info("repo=%s db=%s days=%d mode=%s",
args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")
if not args.repo.exists():
logger.error("repo not found: %s", args.repo)
sys.exit(1)
if not args.db.exists():
logger.error("db not found: %s", args.db)
sys.exit(1)
file_to_agent = discover_research_session_archives(args.repo, args.days)
if not file_to_agent:
logger.warning("no research-session source files found in last %d days", args.days)
sys.exit(0)
# Per-agent breakdown
by_agent = defaultdict(int)
for agent in file_to_agent.values():
by_agent[agent] += 1
for agent, count in sorted(by_agent.items()):
logger.info(" research-session sources by %s: %d", agent, count)
conn = sqlite3.connect(args.db)
conn.row_factory = sqlite3.Row
matches = find_misattributed_prs(conn, file_to_agent, args.days)
logger.info("misattributed PRs found: %d", len(matches))
if args.limit and len(matches) > args.limit:
logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
matches = matches[:args.limit]
if not matches:
logger.info("nothing to do")
return
# Per-agent breakdown of misattribution
miss_by_agent = defaultdict(int)
for m in matches:
miss_by_agent[m["agent"]] += 1
logger.info("misattributed PR breakdown:")
for agent, count in sorted(miss_by_agent.items()):
logger.info(" %s: %d", agent, count)
counters = apply_backfill(conn, matches, dry_run)
logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)
if __name__ == "__main__":
main()
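The discovery step's commit-subject filter can be exercised on its own; the sample subjects below are made up for illustration, while the regex and agent set mirror the constants above:

```python
import re

# Same subject-line shape discover_research_session_archives filters on:
# "<agent>: research session YYYY-MM-DD — <topic>", agent in the allowlist.
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})

def agent_for_subject(subject):
    m = COMMIT_HEADER_RE.match(subject)
    if not m or m.group(1) not in KNOWN_AGENTS:
        return None
    return m.group(1)
```

Unknown agents and non-research commits both fall through to `None`, matching the `continue` branches in the loop.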


@@ -0,0 +1,261 @@
#!/usr/bin/env python3
"""Backfill sourcer/extractor/etc. attribution from claim frontmatter.
Walks every merged knowledge file under domains/, entities/, decisions/,
foundations/, convictions/, core/ and re-runs the canonical attribution
parser (lib/attribution.py). For each parsed (handle, role) pair, increments
the corresponding *_count column on the contributors table.
Why this is needed (Apr 24 incident):
- lib/contributor.py used a diff-line regex parser that handled neither
the bare-key flat format (`sourcer: alexastrum`, ~42% of claims) nor
the nested `attribution: { sourcer: [...] }` block format used by Leo's
manual extractions (Shaga's claims).
- Result: alexastrum, thesensatore, cameron-s1, and similar handles were
silently dropped at merge time. Their contributor rows either don't
exist or are stuck at zero counts.
Usage:
python3 backfill-sourcer-attribution.py --dry-run # report deltas, no writes
python3 backfill-sourcer-attribution.py # apply (additive: max(db, truth))
python3 backfill-sourcer-attribution.py --reset # destructive: set absolute truth
Default mode is ADDITIVE for safety: per-role count is set to max(current_db, truth).
This preserves any existing high counts that came from non-frontmatter sources
(e.g., m3taversal.sourcer=1011 reflects Telegram-curator credit accumulated via
a different code path; truncating to the file-walk truth would be destructive).
Use --reset to set absolute truth from the file walk only; this clobbers
all existing role counts including legitimate non-frontmatter credit.
Idempotency: additive mode is safe to re-run. A --reset run is gated by an
audit_log marker; pass --force to override.
"""
import argparse
import os
import sqlite3
import sys
from collections import defaultdict
from pathlib import Path
# Allow running from anywhere — point at pipeline lib
PIPELINE_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PIPELINE_ROOT))
from lib.attribution import parse_attribution_from_file, VALID_ROLES # noqa: E402
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
KNOWLEDGE_PREFIXES = (
"domains", "entities", "decisions", "foundations", "convictions", "core",
)
def collect_attributions(repo_root: Path) -> dict[str, dict[str, int]]:
"""Walk all knowledge files; return {handle: {role: count}}."""
counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
files_scanned = 0
files_with_attribution = 0
for prefix in KNOWLEDGE_PREFIXES:
base = repo_root / prefix
if not base.exists():
continue
for path in base.rglob("*.md"):
if path.name.startswith("_"):
continue
files_scanned += 1
attr = parse_attribution_from_file(str(path))
had_any = False
for role, entries in attr.items():
for entry in entries:
handle = entry.get("handle")
if handle:
counts[handle][role] += 1
had_any = True
if had_any:
files_with_attribution += 1
print(f" Scanned {files_scanned} knowledge files", file=sys.stderr)
print(f" {files_with_attribution} had parseable attribution", file=sys.stderr)
return counts
def existing_contributors(conn) -> dict[str, dict[str, int]]:
"""Return {handle: {role: count}} from the current DB."""
rows = conn.execute(
"SELECT handle, sourcer_count, extractor_count, challenger_count, "
"synthesizer_count, reviewer_count, claims_merged FROM contributors"
).fetchall()
out = {}
for r in rows:
out[r["handle"]] = {
"sourcer": r["sourcer_count"] or 0,
"extractor": r["extractor_count"] or 0,
"challenger": r["challenger_count"] or 0,
"synthesizer": r["synthesizer_count"] or 0,
"reviewer": r["reviewer_count"] or 0,
"claims_merged": r["claims_merged"] or 0,
}
return out
def claims_merged_for(role_counts: dict[str, int]) -> int:
"""Mirror upsert_contributor logic: claims_merged += sourcer + extractor."""
return role_counts.get("sourcer", 0) + role_counts.get("extractor", 0)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true",
help="Report deltas without writing")
parser.add_argument("--reset", action="store_true",
help="Destructive: set absolute truth from file walk "
"(default is additive max(db, truth))")
parser.add_argument("--force", action="store_true",
help="Re-run even if a previous --reset marker exists")
args = parser.parse_args()
if not REPO.exists():
print(f"ERROR: repo not found at {REPO}", file=sys.stderr)
sys.exit(1)
print(f"DB: {DB_PATH}", file=sys.stderr)
print(f"Repo: {REPO}", file=sys.stderr)
print("", file=sys.stderr)
print("Walking knowledge tree...", file=sys.stderr)
truth = collect_attributions(REPO)
print(f" Found attributions for {len(truth)} unique handles", file=sys.stderr)
print("", file=sys.stderr)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
current = existing_contributors(conn)
# Compute deltas: new handles + handles with role-count mismatches
new_handles: list[tuple[str, dict[str, int]]] = []
role_deltas: list[tuple[str, dict[str, int], dict[str, int]]] = []
for handle, roles in truth.items():
if handle not in current:
new_handles.append((handle, dict(roles)))
else:
cur = current[handle]
mismatches = {r: roles.get(r, 0) for r in VALID_ROLES
if roles.get(r, 0) != cur.get(r, 0)}
if mismatches:
role_deltas.append((handle, dict(roles), cur))
print(f"=== {len(new_handles)} NEW contributors to insert ===")
for handle, roles in sorted(new_handles, key=lambda x: -sum(x[1].values()))[:20]:
roles_str = ", ".join(f"{r}={c}" for r, c in roles.items() if c > 0)
print(f" + {handle}: {roles_str} (claims_merged={claims_merged_for(roles)})")
if len(new_handles) > 20:
print(f" ... and {len(new_handles) - 20} more")
print()
print(f"=== {len(role_deltas)} EXISTING contributors with count drift ===")
for handle, truth_roles, cur_roles in sorted(
role_deltas,
key=lambda x: -sum(x[1].values()),
)[:20]:
for role in VALID_ROLES:
t = truth_roles.get(role, 0)
c = cur_roles.get(role, 0)
if t != c:
print(f" ~ {handle}.{role}: db={c} → truth={t} ({t - c:+d})")
if len(role_deltas) > 20:
print(f" ... and {len(role_deltas) - 20} more")
print()
if args.dry_run:
mode = "RESET" if args.reset else "ADDITIVE"
print(f"Dry run ({mode} mode) — no changes written.")
if not args.reset:
print("Default is ADDITIVE: existing high counts (e.g. m3taversal=1011) preserved.")
print("Pass --reset to clobber existing counts with file-walk truth.")
return
# Idempotency: --reset is gated by audit marker. Additive mode is always safe.
if args.reset:
marker = conn.execute(
"SELECT 1 FROM audit_log WHERE event = 'sourcer_attribution_backfill_reset' LIMIT 1"
).fetchone()
if marker and not args.force:
print("ERROR: --reset has already run (audit marker present).")
print("Pass --force to re-run.")
sys.exit(2)
inserted = 0
updated = 0
preserved_higher = 0
for handle, roles in truth.items():
truth_counts = {
"sourcer": roles.get("sourcer", 0),
"extractor": roles.get("extractor", 0),
"challenger": roles.get("challenger", 0),
"synthesizer": roles.get("synthesizer", 0),
"reviewer": roles.get("reviewer", 0),
}
if handle in current:
cur = current[handle]
if args.reset:
# Preserve reviewer_count even on reset (PR-level not file-level)
final = dict(truth_counts)
final["reviewer"] = max(truth_counts["reviewer"], cur.get("reviewer", 0))
else:
# Additive: max of db vs truth, per role
final = {
role: max(truth_counts[role], cur.get(role, 0))
for role in truth_counts
}
if any(cur.get(r, 0) > truth_counts[r] for r in truth_counts):
preserved_higher += 1
cm = final["sourcer"] + final["extractor"]
conn.execute(
"""UPDATE contributors SET
sourcer_count = ?,
extractor_count = ?,
challenger_count = ?,
synthesizer_count = ?,
reviewer_count = ?,
claims_merged = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(final["sourcer"], final["extractor"], final["challenger"],
final["synthesizer"], final["reviewer"], cm, handle),
)
updated += 1
else:
cm = truth_counts["sourcer"] + truth_counts["extractor"]
conn.execute(
"""INSERT INTO contributors (
handle, sourcer_count, extractor_count, challenger_count,
synthesizer_count, reviewer_count, claims_merged,
first_contribution, last_contribution, tier
) VALUES (?, ?, ?, ?, ?, ?, ?, date('now'), date('now'), 'new')""",
(handle, truth_counts["sourcer"], truth_counts["extractor"],
truth_counts["challenger"], truth_counts["synthesizer"],
truth_counts["reviewer"], cm),
)
inserted += 1
event = "sourcer_attribution_backfill_reset" if args.reset else "sourcer_attribution_backfill"
conn.execute(
"INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)",
("contributor", event,
f'{{"inserted": {inserted}, "updated": {updated}, '
f'"preserved_higher": {preserved_higher}, "mode": '
f'"{"reset" if args.reset else "additive"}"}}'),
)
conn.commit()
print(f"Done ({'RESET' if args.reset else 'ADDITIVE'}). "
f"Inserted {inserted} new, updated {updated} existing, "
f"preserved {preserved_higher} higher-than-truth values.")
if __name__ == "__main__":
main()
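The additive merge rule described in the docstring (per-role count becomes max(db, truth), claims_merged mirrors sourcer + extractor) reduces to a few lines; the sample counts are illustrative:

```python
# Sketch of ADDITIVE mode: never lower an existing count, so credit that
# arrived via non-frontmatter paths (e.g. a high Telegram-curator sourcer
# count) survives the backfill.
ROLES = ("sourcer", "extractor", "challenger", "synthesizer", "reviewer")

def merge_counts(db_counts, truth_counts):
    final = {r: max(db_counts.get(r, 0), truth_counts.get(r, 0)) for r in ROLES}
    final["claims_merged"] = final["sourcer"] + final["extractor"]
    return final

merged = merge_counts({"sourcer": 1011, "extractor": 2},
                      {"sourcer": 3, "extractor": 5})
```

Here the file-walk truth raises extractor to 5 but cannot truncate the pre-existing sourcer count of 1011, which is exactly the safety property the default mode promises.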


@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""Reconstruct synthetic `prs` rows for historical GitHub PRs lost pre-mirror-wiring.
Two PRs merged on GitHub before our sync-mirror.sh tracked `github_pr`:
- GitHub PR #68: alexastrum — 6 claims, merged 2026-03-09 via GitHub squash,
recovered to Forgejo via commit dba00a79 (Apr 16, after mirror erased files)
- GitHub PR #88: Cameron-S1 — 1 claim, recovered via commit da64f805
The recovery commits wrote the files directly to main, so our `prs` table has
no row to attach originator events to; the backfill-events.py strategies all
return NULL. We reconstruct one synthetic `prs` row per historical GitHub PR so
the events pipeline (and `github_pr` strategy in backfill-events) can credit
Alex and Cameron properly.
Numbers 900000+ are clearly synthetic and won't collide with real Forgejo PRs.
Idempotent via INSERT OR IGNORE.
Usage:
python3 scripts/backfill-synthetic-recovery-prs.py --dry-run
python3 scripts/backfill-synthetic-recovery-prs.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Historical GitHub PRs recovered via direct-to-main commits.
# Original GitHub merge dates come from the recovery commit messages.
RECOVERY_PRS = [
{
"number": 900068,
"github_pr": 68,
"branch": "gh-pr-68",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "alexastrum",
"source_channel": "github",
# origin='human' matches lib/merge.py convention for external contributors
# (default is 'pipeline' which misclassifies us as machine-authored).
"origin": "human",
"priority": "high",
"description": "Multi-agent git workflows production maturity | Cryptographic agent trust ratings | Defense in depth for AI agent oversight | Deterministic policy engines below LLM layer | Knowledge validation four-layer architecture | Structurally separating proposer and reviewer agents",
"merged_at": "2026-03-09 00:00:00",
"created_at": "2026-03-08 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #68 pre-mirror-wiring reconstruction (commit dba00a79)",
},
{
"number": 900088,
"github_pr": 88,
"branch": "gh-pr-88",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "cameron-s1",
"source_channel": "github",
"origin": "human",
"priority": "high",
"description": "Orthogonality is an artefact of specification architectures not a property of intelligence itself",
"merged_at": "2026-04-01 00:00:00",
"created_at": "2026-04-01 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #88 pre-mirror-wiring reconstruction (commit da64f805)",
},
]
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Guard against synthetic-range colonization (Ganymede review): check for
# any row in the synthetic range that isn't one of ours. INSERT OR IGNORE on
# the specific numbers is the real collision defense; this is belt-and-suspenders.
max_real = conn.execute(
"SELECT MAX(number) FROM prs WHERE number < 900000"
).fetchone()[0] or 0
print(f"Max real Forgejo PR number: {max_real}")
synth_conflict = conn.execute(
"SELECT number FROM prs WHERE number >= 900000 AND number NOT IN (900068, 900088) LIMIT 1"
).fetchone()
if synth_conflict:
print(f"ERROR: PR #{synth_conflict[0]} already exists in synthetic range. "
f"Pick a new range before running.", file=sys.stderr)
sys.exit(2)
inserted = 0
skipped = 0
for row in RECOVERY_PRS:
existing = conn.execute(
"SELECT number FROM prs WHERE number = ? OR github_pr = ?",
(row["number"], row["github_pr"]),
).fetchone()
if existing:
print(f" PR #{row['number']} (github_pr={row['github_pr']}): already exists — skip")
skipped += 1
continue
print(f" {'(dry-run) ' if args.dry_run else ''}INSERT synthetic PR #{row['number']} "
f"(github_pr={row['github_pr']}, submitted_by={row['submitted_by']}, "
f"merged_at={row['merged_at']})")
if not args.dry_run:
conn.execute(
"""INSERT INTO prs (
number, github_pr, branch, status, domain, commit_type, tier,
leo_verdict, domain_verdict, submitted_by, source_channel,
origin, priority,
description, merged_at, created_at, last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
row["number"], row["github_pr"], row["branch"], row["status"],
row["domain"], row["commit_type"], row["tier"],
row["leo_verdict"], row["domain_verdict"],
row["submitted_by"], row["source_channel"],
row["origin"], row["priority"],
row["description"], row["merged_at"], row["created_at"],
row["last_error"],
),
)
inserted += 1
if not args.dry_run:
conn.commit()
print(f"\nInserted {inserted}, skipped {skipped}")
if not args.dry_run and inserted:
print("\nNext step: re-run backfill-events.py to attach originator events")
print(" python3 ops/backfill-events.py")
if __name__ == "__main__":
main()
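The idempotency claim above (INSERT OR IGNORE on fixed synthetic numbers) can be demonstrated against a toy in-memory table; the schema here is a stand-in, not the real `prs` table:

```python
import sqlite3

# Two simulated runs of the backfill: the second INSERT OR IGNORE hits the
# PRIMARY KEY on `number` and is silently dropped, leaving exactly one row.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER PRIMARY KEY, github_pr INTEGER)")
for _ in range(2):
    conn.execute(
        "INSERT OR IGNORE INTO prs (number, github_pr) VALUES (900068, 68)"
    )
count = conn.execute(
    "SELECT COUNT(*) FROM prs WHERE number = 900068"
).fetchone()[0]
```

The script also does an explicit existence check on `number`/`github_pr` before inserting, so a re-run reports a skip rather than relying on the constraint alone.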


@@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""Classify `contributors` rows into {keep_person, keep_agent, move_to_publisher, delete_garbage}.
Reads current contributors table, proposes reclassification per v26 schema design:
- Real humans + Pentagon agents stay in contributors (kind='person'|'agent')
- News orgs, publications, venues move to publishers table (new v26)
- Multi-word hyphenated garbage (parsing artifacts) gets deleted
- Their contribution_events are handled per category:
* Publishers: DELETE events (orgs shouldn't have credit)
* Garbage: DELETE events (bogus data)
* Persons/agents: keep events untouched
Classification is heuristic: explicit allowlists + regex patterns + length gates.
Ambiguous cases default to 'review_needed' (human decision).
Usage:
python3 scripts/classify-contributors.py # dry-run analysis + report
python3 scripts/classify-contributors.py --apply # write changes
python3 scripts/classify-contributors.py --show <handle> # inspect a single row
Writes to pipeline.db only. Does NOT modify claim files.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Pentagon agents: kind='agent'. Authoritative list.
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Publisher/news-org handles seen in current contributors table.
# Grouped by kind for the publishers row. Classified by inspection.
# NOTE: This list is hand-curated — add to it as new orgs appear.
PUBLISHERS_NEWS = {
# News outlets / brands
"cnbc", "al-jazeera", "axios", "bloomberg", "reuters", "bettorsinsider",
"fortune", "techcrunch", "coindesk", "coindesk-staff", "coindesk-research",
"coindesk research", "coindesk staff",
"defense-one", "thedefensepost", "theregister", "the-intercept",
"the-meridiem", "variety", "variety-staff", "variety staff", "spacenews",
"nasaspaceflight", "thedonkey", "insidedefense", "techpolicypress",
"morganlewis", "casinoorg", "deadline", "animationmagazine",
"defensepost", "casino-org", "casino.org",
"air & space forces magazine", "ieee spectrum", "techcrunch-staff",
"blockworks", "blockworks-staff", "decrypt", "ainvest", "banking-dive", "banking dive",
"cset-georgetown", "cset georgetown",
"kff", "kff-health-news", "kff health news", "kff-health-news---cbo",
"kff-health-news-/-cbo", "kff health news / cbo", "kffhealthnews",
"bloomberg-law",
"norton-rose-fulbright", "norton rose fulbright",
"defence-post", "the-defensepost",
"wilmerhale", "mofo", "sciencedirect",
"yogonet", "csr", "aisi-uk", "aisi", "aisi_gov", "rand",
"armscontrol", "eclinmed", "solana-compass", "solana compass",
"pmc11919318", "pmc11780016",
"healthverity", "natrium", "form-energy",
"courtlistener", "curtis-schiff", "curtis-schiff-prediction-markets",
"prophetx", "techpolicypress-staff",
"npr", "venturebeat", "geekwire", "payloadspace", "the-ankler",
"theankler", "tubefilter", "emarketer", "dagster",
"numerai", # fund/project brand, not person
"psl", "multistate",
}
PUBLISHERS_ACADEMIC = {
# Academic orgs, labs, papers, journals, institutions
"arxiv", "metr", "metr_evals", "apollo-research", "apollo research", "apolloresearch",
"jacc-study-authors", "jacc-data-report-authors",
"anthropic-fellows-program", "anthropic-fellows",
"anthropic-fellows-/-alignment-science-team", "anthropic-research",
"jmir-2024", "jmir 2024",
"oettl-et-al.,-journal-of-experimental-orthopaedics",
"oettl et al., journal of experimental orthopaedics",
"jacc", "nct06548490", "pmc",
"conitzer-et-al.-(2024)", "aquino-michaels-2026", "pan-et-al.",
"pan-et-al.-'natural-language-agent-harnesses'",
"stanford", "stanford-meta-harness",
"hendershot", "annals-im",
"nellie-liang,-brookings-institution", "nellie liang, brookings institution",
"penn-state", "american-heart-association", "american heart association",
"molt_cornelius", "molt-cornelius",
# Companies / labs / brand-orgs (not specific humans)
"anthropic", "anthropicai", "openai", "nasa", "icrc", "ecri",
"epochairesearch", "metadao", "iapam", "icer",
"who", "ama", "uspstf", "unknown",
"futard.io", # protocol/platform
"oxford-martin-ai-governance-initiative",
"oxford-martin-ai-governance",
"u.s.-food-and-drug-administration",
"jitse-goutbeek,-european-policy-centre", # cited person+org string → publisher
"adepoju-et-al.", # paper citation
# Formal-citation names (Firstname-Lastname or Lastname-et-al) — classified
# as academic citations, not reachable contributors. They'd need an @ handle
# to get CI credit per Cory's growth-loop design.
"senator-elissa-slotkin",
"bostrom", "hanson", "kaufmann", "noah-smith", "doug-shapiro",
"shayon-sengupta", "shayon sengupta",
"robin-hanson", "robin hanson", "eliezer-yudkowsky",
"leopold-aschenbrenner", "aschenbrenner",
"ramstead", "larsson", "heavey",
"dan-slimmon", "van-leeuwaarden", "ward-whitt", "adams",
"tamim-ansary", "spizzirri",
"dario-amodei", # formal-citation form (real @ is @darioamodei)
"corless", "oxranga", "vlahakis",
# Brand/project/DAO tokens — not individuals
"areal-dao", "areal", "theiaresearch", "futard-io", "dhrumil",
# Classic formal-citation names — famous academics/economists cited by surname.
# Reachable via @ handle if/when they join (e.g. Ostrom has no X, Hayek deceased,
# Friston has an institutional affiliation not an @ handle we'd track).
"clayton-christensen", "hidalgo", "coase", "wiener", "juarrero",
"ostrom", "centola", "hayek", "marshall-mcluhan", "blackmore",
"knuth", "friston", "aquino-michaels", "conitzer", "bak",
}
# NOTE: pseudonymous X handles that MAY be real contributors stay in keep_person:
# karpathy, simonw, swyx, metaproph3t, metanallok, mmdhrumil, sjdedic,
# ceterispar1bus — these are real X accounts and match Cory's growth loop.
# They appear without @ prefix because extraction frontmatter didn't normalize.
# Auto-creating them as contributors tier='cited' is correct (A-path from earlier).
PUBLISHERS_SOCIAL = {
"x", "twitter", "telegram", "x.com",
}
PUBLISHERS_INTERNAL = {
"teleohumanity-manifesto", "strategy-session-journal",
"living-capital-thesis-development", "attractor-state-historical-backtesting",
"web-research-compilation", "architectural-investing",
"governance---meritocratic-voting-+-futarchy", # title artifact
"sec-interpretive-release-s7-2026-09-(march-17", # title artifact
"mindstudio", # tooling/platform, not contributor
}
# Merge into one kind→set map for classification
PUBLISHER_KIND_MAP = {}
for h in PUBLISHERS_NEWS:
PUBLISHER_KIND_MAP[h.lower()] = "news"
for h in PUBLISHERS_ACADEMIC:
PUBLISHER_KIND_MAP[h.lower()] = "academic"
for h in PUBLISHERS_SOCIAL:
PUBLISHER_KIND_MAP[h.lower()] = "social_platform"
for h in PUBLISHERS_INTERNAL:
PUBLISHER_KIND_MAP[h.lower()] = "internal"
# Garbage: handles that are clearly parse artifacts, not real names.
# Pattern: contains parens, special chars, or >50 chars.
def is_garbage(handle: str) -> bool:
h = handle.strip()
if len(h) > 50:
return True
if re.search(r"[()\[\]<>{}\/\\|@#$%^&*=?!:;\"']", h):
# But @ can appear legitimately in handles like @thesensatore — allow if @ is only prefix
if h.startswith("@") and not re.search(r"[()\[\]<>{}\/\\|#$%^&*=?!:;\"']", h):
return False
return True
# Multi-word hyphenated with very specific artifact shape: 3+ hyphens in a row or trailing noise
if "---" in h or h.endswith("(march"):
return True
return False
def classify(handle: str) -> tuple[str, str | None]:
"""Return (category, publisher_kind).
category {'keep_agent', 'keep_person', 'publisher', 'garbage', 'review_needed'}
publisher_kind {'news','academic','social_platform','internal', None}
"""
h = handle.strip().lower().lstrip("@")
if h in PENTAGON_AGENTS:
return ("keep_agent", None)
if h in PUBLISHER_KIND_MAP:
return ("publisher", PUBLISHER_KIND_MAP[h])
if is_garbage(handle):
return ("garbage", None)
# @-prefixed handles or short-slug real-looking names → keep as person
# (Auto-create rule from Cory: @ handles auto-join as tier='cited'.)
if handle.startswith("@"):
return ("keep_person", None)
# Plausible handles (<=39 chars, alphanum + underscore/hyphen): treat as person.
# 39-char ceiling matches GitHub's handle limit and the writer path in
# contributor.py::_HANDLE_RE, so a valid 21-39 char real handle won't fall
# through to review_needed and block --apply.
if re.match(r"^[a-z0-9][a-z0-9_-]{0,38}$", h):
return ("keep_person", None)
# Everything else: needs human review
return ("review_needed", None)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="Write changes to DB")
parser.add_argument("--show", type=str, help="Inspect a single handle")
parser.add_argument("--delete-events", action="store_true",
help="DELETE contribution_events for publishers+garbage (default: keep for audit)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: publishers table must exist (v26 migration applied)
try:
conn.execute("SELECT 1 FROM publishers LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: publishers table missing. Run migration v26 first.", file=sys.stderr)
sys.exit(2)
rows = conn.execute(
"SELECT handle, kind, tier, claims_merged FROM contributors ORDER BY claims_merged DESC"
).fetchall()
if args.show:
target = args.show.strip().lower().lstrip("@")
for r in rows:
if r["handle"].lower().lstrip("@") == target:
category, pkind = classify(r["handle"])
events_count = conn.execute(
"SELECT COUNT(*) FROM contribution_events WHERE handle = ?",
(r["handle"].lower().lstrip("@"),),
).fetchone()[0]
print(f"handle: {r['handle']}")
print(f"current_kind: {r['kind']}")
print(f"current_tier: {r['tier']}")
print(f"claims_merged: {r['claims_merged']}")
print(f"events: {events_count}")
print(f"→ category: {category}")
if pkind:
print(f"→ publisher: kind={pkind}")
return
print(f"No match for '{args.show}'")
return
# Classify all
buckets: dict[str, list[dict]] = {
"keep_agent": [],
"keep_person": [],
"publisher": [],
"garbage": [],
"review_needed": [],
}
for r in rows:
category, pkind = classify(r["handle"])
buckets[category].append({
"handle": r["handle"],
"kind_now": r["kind"],
"tier": r["tier"],
"claims": r["claims_merged"] or 0,
"publisher_kind": pkind,
})
print("=== Classification summary ===")
for cat, items in buckets.items():
print(f" {cat:18s} {len(items):5d}")
print("\n=== Sample of each category ===")
for cat, items in buckets.items():
print(f"\n--- {cat} (showing up to 10) ---")
for item in items[:10]:
tag = f"{item['publisher_kind']}" if item["publisher_kind"] else ""
print(f" {item['handle']:50s} claims={item['claims']:5d}{tag}")
print("\n=== Full review_needed list ===")
for item in buckets["review_needed"]:
print(f" {item['handle']:50s} claims={item['claims']:5d}")
# Diagnostic: orphan alias count for handles we're about to delete.
# contributor_aliases has no FK (SQLite FKs need PRAGMA foreign_keys=ON to enforce anyway),
# so aliases pointing to deleted canonical handles become orphans. Surface
# the count so the --delete-events decision is informed.
doomed = [item["handle"].lower().lstrip("@") for item in buckets["garbage"] + buckets["publisher"]]
if doomed:
placeholders = ",".join("?" * len(doomed))
orphan_count = conn.execute(
f"SELECT COUNT(*) FROM contributor_aliases WHERE canonical IN ({placeholders})",
doomed,
).fetchone()[0]
print(f"\n=== Alias orphan check ===")
print(f" contributor_aliases rows pointing to deletable canonicals: {orphan_count}")
if orphan_count:
print(f" (cleanup requires --delete-events; without it, aliases stay as orphans)")
if not args.apply:
print("\n(dry-run — no writes. Re-run with --apply to execute.)")
return
# ── Apply changes ──
print("\n=== Applying changes ===")
if buckets["review_needed"]:
print(f"ABORT: {len(buckets['review_needed'])} rows need human review. Fix classifier before --apply.")
sys.exit(3)
inserted_publishers = 0
reclassified_agents = 0
deleted_garbage = 0
deleted_publisher_rows = 0
deleted_events = 0
deleted_aliases = 0
# Single transaction — if any step errors, roll back. This prevents the failure
# mode where a publisher insert fails silently and we still delete the contributor
# row, losing data.
try:
conn.execute("BEGIN")
# 1. Insert publishers. Track which ones succeeded so step 4 only deletes those.
# Counter uses cur.rowcount so replay runs (where publishers already exist)
# report accurate inserted=0 instead of falsely claiming the full set.
# moved_to_publisher is unconditional — the contributors row still needs to
# be deleted even when the publishers row was added in a prior run.
moved_to_publisher = set()
for item in buckets["publisher"]:
name = item["handle"].strip().lower().lstrip("@")
cur = conn.execute(
"INSERT OR IGNORE INTO publishers (name, kind) VALUES (?, ?)",
(name, item["publisher_kind"]),
)
if cur.rowcount > 0:
inserted_publishers += 1
moved_to_publisher.add(item["handle"])
# 2. Ensure Pentagon agents have kind='agent' (idempotent after v25 patch)
for item in buckets["keep_agent"]:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(item["handle"].lower().lstrip("@"),),
)
reclassified_agents += 1
# 3. Delete garbage handles from contributors (and their events + aliases)
for item in buckets["garbage"]:
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_garbage += cur.rowcount
# 4. Delete publisher rows from contributors — ONLY for those successfully
# inserted into publishers above. Guards against partial failure.
# Aliases pointing to publisher-classified handles get cleaned under the
# same --delete-events gate: publishers live in their own table now, any
# leftover aliases in contributor_aliases are orphans.
for item in buckets["publisher"]:
if item["handle"] not in moved_to_publisher:
continue
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_publisher_rows += cur.rowcount
# 5. Audit log entry for the destructive operation (Ganymede Q5).
conn.execute(
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
(
"schema_v26",
"classify_contributors",
json.dumps({
"publishers_inserted": inserted_publishers,
"agents_updated": reclassified_agents,
"garbage_deleted": deleted_garbage,
"publisher_rows_deleted": deleted_publisher_rows,
"events_deleted": deleted_events,
"aliases_deleted": deleted_aliases,
"delete_events_flag": bool(args.delete_events),
}),
),
)
conn.commit()
except Exception as e:
conn.rollback()
print(f"ERROR: Transaction failed, rolled back. {e}", file=sys.stderr)
sys.exit(4)
print(f" publishers inserted: {inserted_publishers}")
print(f" agents kind='agent' ensured: {reclassified_agents}")
print(f" garbage rows deleted: {deleted_garbage}")
print(f" publisher rows removed from contributors: {deleted_publisher_rows}")
if args.delete_events:
print(f" contribution_events deleted: {deleted_events}")
print(f" contributor_aliases deleted: {deleted_aliases}")
else:
print(f" (events + aliases kept — re-run with --delete-events to clean them)")
if __name__ == "__main__":
main()

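The two regex gates above (the special-character filter in `is_garbage` and the 39-char handle ceiling in `classify`) can be restated in isolation. A sketch for illustration only: `classify_lite` is a hypothetical name, and the real classifier checks the agent and publisher sets first.

```python
import re

# Special-character gate from is_garbage() and the 39-char handle gate from
# classify(), restated standalone. The agent/publisher set lookups are omitted.
_SPECIALS = re.compile(r"[()\[\]<>{}\/\\|@#$%^&*=?!:;\"']")
_HANDLE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")

def classify_lite(handle: str) -> str:
    h = handle.strip()
    if len(h) > 50 or "---" in h or h.endswith("(march"):
        return "garbage"
    if _SPECIALS.search(h):
        # @ is allowed only as a prefix (e.g. @thesensatore)
        if not (h.startswith("@") and not _SPECIALS.search(h[1:])):
            return "garbage"
    low = h.lower().lstrip("@")
    if h.startswith("@") or _HANDLE.match(low):
        return "keep_person"
    return "review_needed"

assert classify_lite("@thesensatore") == "keep_person"
assert classify_lite("governance---meritocratic-voting-+-futarchy") == "garbage"
```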

@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Reset m3taversal.sourcer_count from inflated legacy value to file-truth count.
Background: pre-Phase-A extract.py had a `submitted_by` fallback that credited
m3taversal as sourcer for every Telegram-ingested source, accumulating to 1011
sourcer_count in the contributors table. The actual file-truth count (sourcer
frontmatter equal to "m3taversal" in claim files) is 21. The 990-row delta is
infrastructure attribution that doesn't reflect content authorship.
The Phase A event-sourced ledger (contribution_events) computed the correct
389.55 CI from author events; /api/leaderboard reads from there directly.
But the legacy /api/contributors endpoint reads contributors.claims_merged
which carries the inflated 1011. Until that endpoint is deprecated, the
divergence shows two different numbers depending on which surface the UI
queries.
This script applies the surgical UPDATE that was run on VPS on 2026-04-27
during the leaderboard cutover. Committed as a script per Ganymede review:
"DB mutations go through reviewable code paths matters more than the
convenience of one-shot SQL. The artifact explains what was done and why."
Idempotent; safe to re-run. If sourcer_count is already 21, no change.
Usage:
python3 scripts/reset-m3taversal-sourcer.py --dry-run
python3 scripts/reset-m3taversal-sourcer.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
TARGET_HANDLE = "m3taversal"
TRUTH_SOURCER_COUNT = 21
TRUTH_CLAIMS_MERGED = 21
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT handle, sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
if not row:
print(f" No contributors row for {TARGET_HANDLE} — nothing to reset.")
return
print(
f" Current: {row['handle']} sourcer_count={row['sourcer_count']} "
f"claims_merged={row['claims_merged']}"
)
print(f" Target: sourcer_count={TRUTH_SOURCER_COUNT} claims_merged={TRUTH_CLAIMS_MERGED}")
if (row["sourcer_count"] == TRUTH_SOURCER_COUNT
and row["claims_merged"] == TRUTH_CLAIMS_MERGED):
print(" Already at target values — no-op.")
return
if args.dry_run:
print(" (dry-run) UPDATE would be applied. Re-run without --dry-run.")
return
conn.execute(
"""UPDATE contributors SET
sourcer_count = ?,
claims_merged = ?,
updated_at = datetime('now')
WHERE handle = ?""",
(TRUTH_SOURCER_COUNT, TRUTH_CLAIMS_MERGED, TARGET_HANDLE),
)
conn.execute(
"""INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)""",
(
"manual",
"m3taversal_sourcer_reset",
(
'{"reason":"Pre-Phase-A submitted_by fallback inflated to 1011; '
'file-truth is 21","sourcer_count_before":1011,'
'"sourcer_count_after":21,"claims_merged_after":21}'
),
),
)
conn.commit()
after = conn.execute(
"SELECT sourcer_count, claims_merged FROM contributors WHERE handle = ?",
(TARGET_HANDLE,),
).fetchone()
print(
f" Applied. Now: sourcer_count={after['sourcer_count']} "
f"claims_merged={after['claims_merged']}"
)
if __name__ == "__main__":
main()

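A side note on the audit entry: the hand-assembled JSON detail string in the script is brittle if the reason text ever gains quotes or backslashes. A sketch of the equivalent payload built with `json.dumps`, which guarantees valid JSON (not part of the script itself):

```python
import json

# Builds the same audit_log detail payload as the hand-assembled string,
# with escaping handled by json.dumps instead of manual quoting.
detail = json.dumps({
    "reason": ("Pre-Phase-A submitted_by fallback inflated to 1011; "
               "file-truth is 21"),
    "sourcer_count_before": 1011,
    "sourcer_count_after": 21,
    "claims_merged_after": 21,
})
# Round-trips cleanly, proving the string is well-formed JSON.
assert json.loads(detail)["sourcer_count_after"] == 21
```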

@ -0,0 +1,152 @@
"""Tests for diagnostics/activity_endpoint.py classify_pr_operation.
Covers the Leo gotcha: extract/* branches with commit_type=enrich or
challenge must classify by commit_type, not branch prefix. Same class of bug
as the contributor-role wiring fix.
"""
import sys
from pathlib import Path
import pytest
# diagnostics/ isn't on sys.path by default; add it for these tests.
_DIAG = Path(__file__).resolve().parents[1] / "diagnostics"
if str(_DIAG) not in sys.path:
sys.path.insert(0, str(_DIAG))
# aiohttp is imported at module load time; skip cleanly if not installed.
pytest.importorskip("aiohttp")
from activity_endpoint import classify_pr_operation # noqa: E402
# ─── Merged PRs: commit_type wins over branch prefix ───────────────────────
def test_extract_branch_legacy_knowledge_classifies_new():
assert classify_pr_operation("merged", "knowledge", "extract/foo", None) == "new"
def test_extract_branch_with_enrich_commit_type_classifies_enrich():
"""Leo gotcha: extract/* + commit_type=enrich → enrich, not new."""
assert classify_pr_operation("merged", "enrich", "extract/foo", None) == "enrich"
def test_extract_branch_with_challenge_commit_type_classifies_challenge():
"""Leo gotcha: extract/* + commit_type=challenge → challenge, not new."""
assert classify_pr_operation("merged", "challenge", "extract/foo", None) == "challenge"
def test_challenged_by_in_description_classifies_challenge():
assert (
classify_pr_operation(
"merged", "knowledge", "extract/foo", "evidence for challenged_by claim"
)
== "challenge"
)
# ─── Branch prefix fallback (when commit_type is generic) ──────────────────
def test_reweave_branch_classifies_enrich():
assert classify_pr_operation("merged", "knowledge", "reweave/batch-1", None) == "enrich"
def test_challenge_branch_classifies_challenge():
assert (
classify_pr_operation("merged", "knowledge", "challenge/nuclear-moloch", None)
== "challenge"
)
# ─── Maintenance commit_types → infra ──────────────────────────────────────
def test_fix_commit_type_classifies_infra():
assert classify_pr_operation("merged", "fix", "fix/deploy-bug", None) == "infra"
def test_pipeline_commit_type_classifies_infra():
assert (
classify_pr_operation("merged", "pipeline", "epimetheus/migration-v14", None)
== "infra"
)
# ─── Knowledge-producing commit_types → new ────────────────────────────────
def test_research_commit_type_classifies_new():
assert (
classify_pr_operation("merged", "research", "theseus/cornelius-batch-2", None)
== "new"
)
def test_entity_commit_type_classifies_new():
assert classify_pr_operation("merged", "entity", "leo/entities-update", None) == "new"
# ─── Non-merged statuses route through NON_MERGED_STATUS_TO_OPERATION ──────
def test_open_pr_classifies_extract():
assert classify_pr_operation("open", None, "extract/foo", None) == "extract"
def test_approved_pr_classifies_new():
assert classify_pr_operation("approved", None, "extract/foo", None) == "new"
def test_closed_pr_classifies_infra():
assert classify_pr_operation("closed", None, "extract/foo", None) == "infra"
def test_conflict_pr_classifies_challenge():
assert classify_pr_operation("conflict", None, "extract/foo", None) == "challenge"
def test_validating_pr_classifies_extract():
assert classify_pr_operation("validating", None, "extract/foo", None) == "extract"
def test_reviewing_pr_classifies_extract():
assert classify_pr_operation("reviewing", None, "extract/foo", None) == "extract"
def test_merging_pr_classifies_new():
assert classify_pr_operation("merging", None, "extract/foo", None) == "new"
def test_zombie_pr_classifies_infra():
assert classify_pr_operation("zombie", None, "extract/foo", None) == "infra"
# ─── Priority order: reweave commit_type vs reweave/ branch ─────────────────
# Reweave commit_type is in _MAINTENANCE_COMMIT_TYPES (→ infra), but
# branch.startswith('reweave/') is checked first (→ enrich). The bifurcation
# is real spec behavior — nightly reweave PRs must classify as enrich, not
# infra. Locking this in prevents a silent flip on future priority refactors.
def test_reweave_commit_type_with_reweave_branch_classifies_enrich():
"""Branch prefix wins over maintenance — reweave PRs are enrich, not infra."""
assert classify_pr_operation("merged", "reweave", "reweave/batch-1", None) == "enrich"
def test_reweave_commit_type_without_reweave_branch_classifies_infra():
"""Without reweave/ prefix, reweave commit_type falls to maintenance → infra."""
assert classify_pr_operation("merged", "reweave", "epimetheus/foo", None) == "infra"
# ─── Defensive cases — null/empty inputs shouldn't crash ───────────────────
def test_null_commit_type_and_branch_classifies_new():
assert classify_pr_operation("merged", None, None, None) == "new"
def test_unknown_status_falls_back_to_infra():
assert classify_pr_operation("nonsense", None, None, None) == "infra"

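The priority order these tests pin down can be restated as a hypothetical minimal implementation. The real `classify_pr_operation` lives in `diagnostics/activity_endpoint.py`; the set contents and status map below are inferred from the assertions above, not copied from the source.

```python
# Hypothetical re-statement of the classification priority the tests lock in:
# non-merged status map first, then challenged_by description, then explicit
# enrich/challenge commit_type, then branch prefix, then maintenance → infra.
MAINTENANCE = {"fix", "pipeline", "reweave"}
NON_MERGED = {"open": "extract", "approved": "new", "closed": "infra",
              "conflict": "challenge", "validating": "extract",
              "reviewing": "extract", "merging": "new", "zombie": "infra"}

def classify_sketch(status, commit_type, branch, description):
    if status != "merged":
        return NON_MERGED.get(status, "infra")
    if description and "challenged_by" in description:
        return "challenge"
    if commit_type in {"enrich", "challenge"}:
        return commit_type
    branch = branch or ""
    if branch.startswith("reweave/"):
        return "enrich"          # branch prefix wins over maintenance
    if branch.startswith("challenge/"):
        return "challenge"
    if commit_type in MAINTENANCE:
        return "infra"
    return "new"

assert classify_sketch("merged", "reweave", "reweave/batch-1", None) == "enrich"
assert classify_sketch("merged", "reweave", "epimetheus/foo", None) == "infra"
```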

@ -34,13 +34,34 @@ class TestParseAttribution:
        assert result["extractor"][0]["handle"] == "rio"
        assert result["sourcer"][0]["handle"] == "theiaresearch"

    def test_legacy_source_fallback_removed(self):
        """Legacy `source` heuristic removed (Ganymede review, Apr 24).
        It fabricated handles from descriptive strings (garbage like
        'sec-interpretive-release-s7-2026-09-(march-17'). Claims without
        explicit attribution now return empty; better to surface as data
        hygiene than invent contributors.
        """
        fm = {
            "type": "claim",
            "source": "@pineanalytics, Q4 2025 report",
        }
        result = parse_attribution(fm)
        assert all(len(v) == 0 for v in result.values())

    def test_bad_handles_filtered(self):
        """Handles with spaces, parens, or garbage chars are dropped."""
        fm = {
            "sourcer": "governance---meritocratic-voting-+-futarchy",
        }
        result = parse_attribution(fm)
        assert len(result["sourcer"]) == 0

    def test_valid_handle_with_hyphen_passes(self):
        """Legitimate handles like 'cameron-s1' survive the filter."""
        fm = {"sourcer": "cameron-s1"}
        result = parse_attribution(fm)
        assert result["sourcer"][0]["handle"] == "cameron-s1"

    def test_empty_attribution(self):
        fm = {"type": "claim"}

tests/test_leaderboard.py (437 lines, new file)

@ -0,0 +1,437 @@
"""Tests for /api/leaderboard endpoint (diagnostics/leaderboard_routes.py).
Locks behavior for the four slicings consumed by Argus + Oberon:
- window: all_time | Nd | Nh
- domain: per-domain filter
- kind: person | agent | org | all
- limit: pagination + has_more flag
Regression coverage includes the AND-prefix SQL bug (commit 42d35d4): _parse_window
returned clauses prefixed with 'AND ' which produced 'WHERE 1=1 AND AND ...' when
joined into the WHERE clause via " AND ".join(...).
"""
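The bug class named in the docstring is easy to reproduce in isolation: clauses handed to an `" AND ".join(...)` composer must be bare predicates.

```python
# Minimal reproduction of the AND-prefix bug: a clause that carries its own
# "AND " produces malformed SQL once the handler joins the clauses together.
base = ["1=1"]
buggy = base + ["AND ce.timestamp >= datetime('now', ?)"]
fixed = base + ["ce.timestamp >= datetime('now', ?)"]
assert "AND AND" in "WHERE " + " AND ".join(buggy)      # malformed
assert "AND AND" not in "WHERE " + " AND ".join(fixed)  # well-formed
```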
import asyncio
import json
import sqlite3
from pathlib import Path
import pytest
# Skip whole file if aiohttp isn't available (matches test_activity_classify.py pattern)
aiohttp = pytest.importorskip("aiohttp")
# Make diagnostics/ importable
import sys
DIAG_ROOT = Path(__file__).parent.parent / "diagnostics"
sys.path.insert(0, str(DIAG_ROOT))
from leaderboard_routes import ( # noqa: E402
_parse_window,
handle_leaderboard,
KIND_VALUES,
LEADERBOARD_PUBLIC_PATHS,
)
from aiohttp.test_utils import make_mocked_request # noqa: E402
# ─── Schema lifted from lib/db.py:138-209 (v25 minimum) ──────────────────────
SCHEMA = """
CREATE TABLE contributors (
handle TEXT PRIMARY KEY,
kind TEXT DEFAULT 'person',
tier TEXT DEFAULT 'new',
claims_merged INTEGER DEFAULT 0,
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
first_contribution TEXT,
last_contribution TEXT
);
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def db_path(tmp_path):
"""Seeded pipeline.db with deterministic events.
Cohort:
- alice (person): 3 author events, 1 originator (recent 3d, internet-finance)
- bob (person): 5 author events (older, 60d ago, ai-alignment)
- carol (person): 1 author + 1 evaluator (today, internet-finance)
- rio (agent): 4 author + 2 evaluator (mixed, internet-finance + grand-strategy)
- leo (agent): 8 evaluator events (today, mixed domains)
- cnbc (org): 2 originator events (legacy, before classifier moved orgs)
- newhandle (no contributors row): 1 author event tests LEFT JOIN COALESCE
"""
p = tmp_path / "pipeline.db"
conn = sqlite3.connect(str(p))
conn.executescript(SCHEMA)
contribs = [
("alice", "person"),
("bob", "person"),
("carol", "person"),
("rio", "agent"),
("leo", "agent"),
("cnbc", "org"),
# newhandle intentionally absent — tests LEFT JOIN
]
for handle, kind in contribs:
conn.execute(
"INSERT INTO contributors (handle, kind) VALUES (?, ?)",
(handle, kind),
)
# (handle, role, weight, pr_number, claim_path, domain, timestamp)
events = [
# alice — 3 author + 1 originator, recent (all >24h ago, all <7d)
# Most-recent event at -2 days (not -1 days) so 24h window exclusion is
# unambiguous and not subject to fixture-vs-query microsecond drift.
("alice", "author", 0.30, 100, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 101, None, "internet-finance", "now,-2 days"),
("alice", "author", 0.30, 102, None, "ai-alignment", "now,-3 days"),
("alice", "originator", 0.15, 103, "domains/internet-finance/x.md", "internet-finance", "now,-2 days"),
# bob — 5 author, all 60d ago (outside 30d, inside all_time)
("bob", "author", 0.30, 200, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 201, None, "ai-alignment", "now,-60 days"),
("bob", "author", 0.30, 202, None, "ai-alignment", "now,-61 days"),
("bob", "author", 0.30, 203, None, "ai-alignment", "now,-62 days"),
("bob", "author", 0.30, 204, None, "ai-alignment", "now,-63 days"),
# carol — 1 author + 1 evaluator, today
("carol", "author", 0.30, 300, None, "internet-finance", "now"),
("carol", "evaluator", 0.05, 301, None, "internet-finance", "now"),
# rio agent — 4 author + 2 evaluator
("rio", "author", 0.30, 400, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 401, None, "grand-strategy", "now,-2 days"),
("rio", "author", 0.30, 402, None, "internet-finance", "now,-2 days"),
("rio", "author", 0.30, 403, None, "internet-finance", "now,-2 days"),
("rio", "evaluator", 0.05, 404, None, "ai-alignment", "now,-2 days"),
("rio", "evaluator", 0.05, 405, None, "ai-alignment", "now,-2 days"),
# leo agent — 8 evaluator
*[
("leo", "evaluator", 0.05, 500 + i, None, "internet-finance" if i % 2 == 0 else "ai-alignment", "now")
for i in range(8)
],
# cnbc org — 2 originator (legacy data, kept by classifier+gate split)
("cnbc", "originator", 0.15, 600, "domains/internet-finance/y.md", "internet-finance", "now,-5 days"),
("cnbc", "originator", 0.15, 601, "domains/internet-finance/z.md", "internet-finance", "now,-5 days"),
# newhandle — handle in events but no contributors row (LEFT JOIN COALESCE → person)
# -2 days so 24h-window test exclusion is unambiguous (matches alice).
("newhandle", "author", 0.30, 700, None, "ai-alignment", "now,-2 days"),
]
for handle, role, weight, pr_num, claim_path, domain, ts_modifier in events:
# Use SQLite datetime() to compute timestamps relative to "now" so tests
# are deterministic across days. Multi-arg form: datetime('now', '-1 days').
ts_args = ts_modifier.split(",")
if len(ts_args) == 1:
ts_sql = f"datetime('{ts_args[0]}')"
else:
ts_sql = f"datetime('{ts_args[0]}', '{ts_args[1].strip()}')"
conn.execute(
f"""INSERT INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, {ts_sql})""",
(handle, "agent" if handle in {"rio", "leo"} else "person",
role, weight, pr_num, claim_path, domain),
)
conn.commit()
conn.close()
return str(p)
def _call(db_path, **query):
"""Build a mocked request, call handle_leaderboard, return parsed JSON."""
qs = "&".join(f"{k}={v}" for k, v in query.items())
req = make_mocked_request("GET", f"/api/leaderboard?{qs}")
# make_mocked_request gives us req.app — write db_path into it.
req.app["db_path"] = db_path
response = asyncio.run(handle_leaderboard(req))
return json.loads(response.body.decode())
# ─── _parse_window unit tests ────────────────────────────────────────────────
class TestParseWindow:
def test_default_is_all_time(self):
clause, params, label = _parse_window(None)
assert clause == ""
assert params == ()
assert label == "all_time"
def test_explicit_all_time(self):
clause, params, label = _parse_window("all_time")
assert clause == ""
assert label == "all_time"
def test_seven_days(self):
clause, params, label = _parse_window("7d")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-7 days",)
assert label == "7d"
# Regression: must NOT begin with "AND " (handle_leaderboard composes via " AND ".join)
assert not clause.startswith("AND")
def test_thirty_days(self):
clause, params, label = _parse_window("30d")
assert params == ("-30 days",)
assert label == "30d"
def test_hours(self):
clause, params, label = _parse_window("24h")
assert clause == "ce.timestamp >= datetime('now', ?)"
assert params == ("-24 hours",)
assert label == "24h"
def test_caps_days_at_365(self):
clause, params, label = _parse_window("9999d")
assert params == ("-365 days",)
def test_caps_hours_at_8760(self):
clause, params, label = _parse_window("99999h")
assert params == ("-8760 hours",)
def test_garbage_falls_to_all_time(self):
clause, params, label = _parse_window("foobar")
assert clause == ""
assert label == "all_time"
def test_uppercase_normalized(self):
clause, params, label = _parse_window("7D")
assert label == "7d"
def test_zero_days_still_emits_clause(self):
# 0d means "now or later" — empty result, but parse should succeed
clause, params, label = _parse_window("0d")
assert "datetime" in clause
assert label == "0d"
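The contract those unit tests pin down can be restated as a hypothetical minimal `_parse_window`. The real implementation lives in `diagnostics/leaderboard_routes.py`; this sketch is derived only from the assertions above.

```python
import re

# Hypothetical minimal _parse_window consistent with the unit tests:
# returns (sql_clause, params, label); bare predicate, no "AND " prefix.
def parse_window_sketch(raw):
    if not raw:
        return ("", (), "all_time")
    w = raw.strip().lower()
    m = re.fullmatch(r"(\d+)([dh])", w)
    if not m:
        return ("", (), "all_time")  # garbage and "all_time" both land here
    n, unit = int(m.group(1)), m.group(2)
    n = min(n, 365 if unit == "d" else 8760)  # cap at 1 year either way
    modifier = f"-{n} {'days' if unit == 'd' else 'hours'}"
    return ("ce.timestamp >= datetime('now', ?)", (modifier,), w)

assert parse_window_sketch("7d")[1] == ("-7 days",)
assert parse_window_sketch("foobar") == ("", (), "all_time")
```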
# ─── handle_leaderboard integration tests ────────────────────────────────────
class TestLeaderboardEndpoint:
def test_all_time_default_kind_person(self, db_path):
"""Default kind is 'person'. Returns all persons, sorted by CI desc."""
body = _call(db_path)
assert body["window"] == "all_time"
assert body["kind_filter"] == "person"
assert body["domain"] is None
assert body["source"] == "contribution_events"
# alice 3*0.30 + 0.15 = 1.05
# bob 5*0.30 = 1.50
# carol 0.30 + 0.05 = 0.35
# newhandle 0.30 (LEFT JOIN COALESCE → 'person')
# cnbc excluded (kind='org')
# rio/leo excluded (kind='agent')
handles = [r["handle"] for r in body["leaderboard"]]
assert "bob" in handles
assert "alice" in handles
assert "newhandle" in handles, "LEFT JOIN COALESCE should default missing contributors to 'person'"
assert "cnbc" not in handles, "kind=person should exclude orgs"
assert "rio" not in handles, "kind=person should exclude agents"
# Descending by CI
cis = [r["ci"] for r in body["leaderboard"]]
assert cis == sorted(cis, reverse=True)
def test_window_7d_excludes_old_events(self, db_path):
"""REGRESSION: 7d window must execute (no AND-prefix SQL error).
Bob's events are all 60d old and must not appear in the 7d window.
Alice's events are 2-3d old and must appear.
"""
body = _call(db_path, window="7d")
assert body["window"] == "7d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "bob" not in handles, "60d-old events must be excluded from 7d window"
assert "carol" in handles # today
def test_window_30d_excludes_60d_events(self, db_path):
"""REGRESSION: 30d window must execute. Bob (60d) excluded; alice/carol included."""
body = _call(db_path, window="30d")
assert body["window"] == "30d"
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
def test_window_24h_only_today(self, db_path):
"""24h window picks up today's events only.
Default kind=person. Within 24h: only carol (events at 'now').
Excluded: alice/newhandle (events at -2 days), bob (-60d), rio/leo (kind),
cnbc (-5d AND kind=org).
"""
body = _call(db_path, window="24h")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["carol"], (
"24h + kind=person should return only carol; got %r" % handles
)
def test_kind_agent(self, db_path):
"""kind=agent returns only agents."""
body = _call(db_path, kind="agent")
handles = [r["handle"] for r in body["leaderboard"]]
assert "rio" in handles
assert "leo" in handles
assert "alice" not in handles
assert "bob" not in handles
def test_kind_org(self, db_path):
"""kind=org returns only orgs (legacy events still queryable)."""
body = _call(db_path, kind="org")
handles = [r["handle"] for r in body["leaderboard"]]
assert handles == ["cnbc"]
assert body["leaderboard"][0]["ci"] == 0.30 # 2 * 0.15
def test_kind_all_returns_everyone(self, db_path):
"""kind=all returns all kinds — persons + agents + orgs."""
body = _call(db_path, kind="all")
handles = {r["handle"] for r in body["leaderboard"]}
assert handles == {"alice", "bob", "carol", "rio", "leo", "cnbc", "newhandle"}
def test_invalid_kind_falls_to_person(self, db_path):
"""Defensive: unknown kind value silently falls back to 'person'."""
body = _call(db_path, kind="bogus")
assert body["kind_filter"] == "person"
def test_domain_filter(self, db_path):
"""domain=internet-finance scopes events; kind filter still applies."""
body = _call(db_path, domain="internet-finance")
assert body["domain"] == "internet-finance"
handles = {r["handle"] for r in body["leaderboard"]}
# alice has 2 internet-finance authors + 1 originator
# carol has 1 internet-finance author + 1 evaluator
# bob has 0 (all ai-alignment)
# newhandle has 0 (ai-alignment only)
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles
assert "newhandle" not in handles
def test_composed_window_kind_domain(self, db_path):
"""REGRESSION: composed filters must build SQL correctly.
7d + person + internet-finance → alice and carol; bob and rio excluded.
"""
body = _call(db_path, window="7d", kind="person", domain="internet-finance")
handles = [r["handle"] for r in body["leaderboard"]]
assert "alice" in handles
assert "carol" in handles
assert "bob" not in handles # excluded by 7d
assert "rio" not in handles # excluded by kind=person
def test_limit_caps_results(self, db_path):
"""limit caps the leaderboard slice; total reflects unfiltered count."""
body = _call(db_path, kind="all", limit=3)
assert body["shown"] == 3
assert body["has_more"] is True
assert body["total"] == 7
def test_no_has_more_when_under_limit(self, db_path):
body = _call(db_path, kind="org")
assert body["shown"] == 1
assert body["has_more"] is False
assert body["total"] == 1
def test_invalid_limit_falls_to_default(self, db_path):
"""Defensive: garbage limit param falls to default 100. 7 entries < 100."""
body = _call(db_path, kind="all", limit="not-a-number")
assert body["shown"] == 7
assert body["has_more"] is False
def test_limit_capped_at_500(self, db_path):
"""Defensive: limit > 500 silently caps at 500."""
body = _call(db_path, limit=99999, kind="all")
# No assertion on the value of the cap from the response — just that
# it doesn't error and shown <= 500.
assert body["shown"] <= 500
def test_role_breakdown_present(self, db_path):
"""Each row includes ci_breakdown with all 5 roles."""
body = _call(db_path)
for entry in body["leaderboard"]:
assert set(entry["ci_breakdown"].keys()) == {
"author", "challenger", "synthesizer", "originator", "evaluator",
}
def test_alice_role_breakdown_correct(self, db_path):
"""Alice has 3 author (0.90) + 1 originator (0.15) = 1.05 total."""
body = _call(db_path)
alice = next(r for r in body["leaderboard"] if r["handle"] == "alice")
assert alice["ci"] == 1.05
assert alice["ci_breakdown"]["author"] == 0.90
assert alice["ci_breakdown"]["originator"] == 0.15
assert alice["ci_breakdown"]["challenger"] == 0
assert alice["ci_breakdown"]["synthesizer"] == 0
assert alice["ci_breakdown"]["evaluator"] == 0
assert alice["events_count"] == 4
assert alice["pr_count"] == 4
assert alice["domain_count"] == 2 # internet-finance + ai-alignment
def test_empty_window_returns_clean_response(self, db_path):
"""Window with no matching events returns shape-correct empty response."""
# 24h window + kind=org → cnbc is 5d ago, so empty
body = _call(db_path, window="24h", kind="org")
assert body["leaderboard"] == []
assert body["total"] == 0
assert body["shown"] == 0
assert body["has_more"] is False
assert body["source"] == "contribution_events"
def test_left_join_handles_missing_contributors_row(self, db_path):
"""REGRESSION: handle in events but missing from contributors must default to kind='person'.
Catches the failure mode where a handle classified as cited (auto-create
deferred to Branch 3) accumulates events but has no contributors row yet.
"""
body = _call(db_path)
newhandle_row = next(
(r for r in body["leaderboard"] if r["handle"] == "newhandle"), None
)
assert newhandle_row is not None
assert newhandle_row["kind"] == "person"
assert newhandle_row["ci"] == 0.30
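The defaulting behavior this regression test guards can be sketched as a LEFT JOIN with a COALESCE fallback. The `contributors(handle, kind)` shape below is an assumption for illustration only, not the service's actual query:

```python
import sqlite3

# Toy schema (assumed shape): events exist for a handle that has no
# contributors row yet — the situation the regression test exercises.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE contribution_events (handle TEXT, weight REAL);
CREATE TABLE contributors (handle TEXT PRIMARY KEY, kind TEXT);
INSERT INTO contribution_events VALUES ('newhandle', 0.30);
""")
row = conn.execute("""
    SELECT e.handle, COALESCE(c.kind, 'person') AS kind, SUM(e.weight) AS ci
    FROM contribution_events e
    LEFT JOIN contributors c ON c.handle = e.handle
    GROUP BY e.handle
""").fetchone()
# LEFT JOIN keeps the handle; COALESCE defaults the missing kind to 'person'
assert row == ("newhandle", "person", 0.3)
```

An INNER JOIN here would silently drop `newhandle` from the leaderboard, which is the failure mode the test pins down.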
# ─── Public path constant (auth middleware bypass) ───────────────────────────
def test_public_paths_includes_leaderboard():
"""Auth middleware needs LEADERBOARD_PUBLIC_PATHS to skip API key for /api/leaderboard."""
assert "/api/leaderboard" in LEADERBOARD_PUBLIC_PATHS
def test_kind_values_matches_contract():
"""API contract: only these 4 kind values are accepted."""
assert set(KIND_VALUES) == {"person", "agent", "org", "all"}

@@ -0,0 +1,167 @@
"""Verify research-attribution backfill is replay-safe against real schema.
Three things to prove:
1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly
(the idx_ce_unique_pr partial index works around SQLite's NULL-not-equal-NULL semantics).
2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE
is a true no-op and does not create a phantom duplicate.
3. The backfill script's specific operation (DELETE then INSERT for same key)
nets zero rows when run twice in sequence.
"""
import sqlite3
import sys
# Schema lifted verbatim from lib/db.py:181-209
SCHEMA = """
CREATE TABLE contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
"""
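A minimal standalone sketch (toy table `t`, not the real schema) of the SQLite behavior the partial indexes work around: a plain UNIQUE index treats NULL as distinct from NULL, so duplicate NULL-claim_path rows would both insert.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE t (handle TEXT, claim_path TEXT);
CREATE UNIQUE INDEX u ON t(handle, claim_path);  -- plain, non-partial index
""")
conn.execute("INSERT OR IGNORE INTO t VALUES ('rio', NULL)")
conn.execute("INSERT OR IGNORE INTO t VALUES ('rio', NULL)")
n = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
# Both rows land: the plain UNIQUE index does not deduplicate NULLs
assert n == 2
```

Hence the split into idx_ce_unique_claim (WHERE claim_path IS NOT NULL) and idx_ce_unique_pr (WHERE claim_path IS NULL), so each row falls under exactly one enforceable uniqueness constraint.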
def setup() -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
return conn
def insert_event(conn, handle, role, pr_number, claim_path=None):
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', ?, 0.30, ?, ?)""",
(handle, role, pr_number, claim_path),
)
return cur.rowcount
def count(conn) -> int:
return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
def test_pr_level_dedup_with_null_claim_path():
"""Two inserts of same (handle, role, pr_number, NULL) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061)
r2 = insert_event(conn, "rio", "author", 4061)
n = count(conn)
assert r1 == 1, f"first insert should write, got rowcount={r1}"
assert r2 == 0, f"second insert should be ignored, got rowcount={r2}"
assert n == 1, f"expected 1 row, got {n}"
print("PASS: pr-level dedup with NULL claim_path")
def test_per_claim_dedup_with_path():
"""Two inserts of same (handle, role, pr_number, path) → 1 row."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 0 and n == 1
print("PASS: per-claim dedup with claim_path")
def test_pr_level_and_per_claim_coexist():
"""A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist
because the partial indexes target different rows."""
conn = setup()
r1 = insert_event(conn, "rio", "author", 4061, claim_path=None)
r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
n = count(conn)
assert r1 == 1 and r2 == 1 and n == 2
print("PASS: pr-level and per-claim events coexist on same pr_number")
def test_backfill_replay_is_noop():
"""Simulate the exact backfill operation: INSERT correct event, DELETE wrong event.
Run twice. Expect identical state: no phantom rows, no double-deletions."""
conn = setup()
# Initial state: m3taversal has the wrong author event for pr=4061
insert_event(conn, "m3taversal", "author", 4061)
assert count(conn) == 1
def backfill_pr_4061():
# Insert the correct event (rio is the real author)
conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""",
("rio (self-directed)",),
)
# Delete the wrong event
conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
conn.commit()
backfill_pr_4061()
state_after_first = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first
# Replay
backfill_pr_4061()
state_after_second = sorted(
(r["handle"], r["role"], r["pr_number"], r["claim_path"])
for r in conn.execute("SELECT * FROM contribution_events")
)
assert state_after_first == state_after_second, "replay should be idempotent"
assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}"
print("PASS: backfill replay is a true no-op")
def test_replay_against_already_backfilled_pr_does_not_double_delete():
"""If m3taversal event was already deleted, running backfill again must not error
or affect anything else."""
conn = setup()
# Already-correct state: rio has the author event, m3taversal does not
insert_event(conn, "rio (self-directed)", "author", 4061)
insert_event(conn, "leo", "evaluator", 4061) # noise — should not be touched
# Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op
# Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected
cur1 = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path)
VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""",
)
cur2 = conn.execute(
"""DELETE FROM contribution_events
WHERE handle='m3taversal' AND role='author'
AND pr_number=4061 AND claim_path IS NULL""",
)
assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}"
assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}"
assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}"
print("PASS: replay against already-backfilled state preserves unrelated events")
if __name__ == "__main__":
test_pr_level_dedup_with_null_claim_path()
test_per_claim_dedup_with_path()
test_pr_level_and_per_claim_coexist()
test_backfill_replay_is_noop()
test_replay_against_already_backfilled_pr_does_not_double_delete()
print("\nAll 5 tests passed against the real schema.")