Compare commits

...

9 commits

Author SHA1 Message Date
3fe524dd14 fix(classify): Ganymede review fixes — alias cleanup + counter accuracy + handle alignment
Some checks failed
CI / lint-and-test (push) Has been cancelled
1. WARNING — orphan contributor_aliases after publisher/garbage delete:
   Added alias cleanup to the transaction (gated on --delete-events, same
   audit rationale as events). Both garbage and publisher deletion loops
   now DELETE matching contributor_aliases rows. Dry-run adds an orphan
   count diagnostic so the --delete-events decision is informed.

2. NIT — inserted_publishers counter over-reports on replay:
   INSERT OR IGNORE silently skips name collisions, but the counter
   incremented unconditionally. Now uses cur.rowcount so a second apply
   reports 0 inserts instead of falsely claiming 100. moved_to_publisher
   set remains unconditional — publisher rows already present still need
   the matching contributors row deleted.

3. NIT — handle-length gate diverged from writer path:
   Widened from {0,19} (20 chars) to {0,38} (39 chars) to match GitHub's
   handle limit and contributor.py::_HANDLE_RE. Prevents future long-handle
   real contributors from falling through to review_needed and blocking
   --apply. Current data has 0 review_needed either way.

Bonus (Q5): Added audit_log entry inside the transaction. One row in
audit_log.stage='schema_v26', event='classify_contributors' with counter
detail JSON on every --apply run. Cheap audit trail for the destructive op.

Verified end-to-end on VPS DB snapshot:
- First apply: 100/9/9/100/0 (matches pre-fix)
- Second apply: 0/9/0/0/0 (counter fix working)
- With injected aliases + --delete-events: 2 aliases deleted, 1 pre-existing
  orphan correctly left alone (outside script scope), audit_log entry
  written with accurate counters.

Ganymede msg-3. Protocol closed.
2026-04-24 20:47:21 +01:00
45b2f6de20 feat(schema): v26 — publishers + contributor_identities + sources provenance
Separates three concerns currently conflated in contributors table:
  contributors — people + agents we credit (kind in 'person','agent')
  publishers   — news orgs / academic venues / platforms (not credited)
  sources      — gains publisher_id + content_type + original_author columns

Rationale (Cory directive Apr 24): livingip.xyz leaderboard was showing CNBC,
SpaceNews, TechCrunch etc. at the top because the attribution pipeline credited
news org names as if they were contributors. The mechanism-level fix is a
schema split — orgs live in publishers, individuals in contributors, each
table has one semantics.

Migration v26:
  - CREATE TABLE publishers (id PK, name UNIQUE, kind CHECK IN
    news|academic|social_platform|podcast|self|internal|legal|government|
    research_org|commercial|other, url_pattern, created_at)
  - CREATE TABLE contributor_identities (contributor_handle, platform CHECK IN
    x|telegram|github|email|web|internal, platform_handle, verified, created_at)
    Composite PK on (platform, platform_handle) + index on contributor_handle.
    Enables one contributor to unify X + TG + GitHub handles.
  - ALTER TABLE sources ADD COLUMN publisher_id REFERENCES publishers(id)
  - ALTER TABLE sources ADD COLUMN content_type
    (article|paper|tweet|conversation|self_authored|webpage|podcast)
  - ALTER TABLE sources ADD COLUMN original_author TEXT
    (free-text fallback, e.g., "Kim et al." — not credit-bearing)
  - ALTER TABLE sources ADD COLUMN original_author_handle REFERENCES contributors(handle)
    (set only when the author is in our contributor network)
  - ALTER wrapped in try/except on "duplicate column" for replay safety
  - Both SCHEMA_SQL (fresh installs) + migration block (upgrades) updated
  - SCHEMA_VERSION bumped 25 -> 26

Migration is non-breaking. No data moves yet. Existing publishers-polluting-
contributors row state is preserved until the classifier runs. Writer routing
to these tables lands in a separate branch (Phase B writer changes).

Classifier (scripts/classify-contributors.py):
  Analyzes existing contributors rows, buckets into:
    keep_agent   — 9 Pentagon agents
    keep_person  — 21 real humans + reachable pseudonymous X/TG handles
    publisher    — 100 news orgs, academic venues, formal-citation names,
                   brand/platform names
    garbage      — 9 parse artifacts (containing /, parens, 3+ hyphens)
    review_needed — 0 (fully covered by current allowlists)

  Hand-curated allowlists for news/academic/social/internal publisher kinds.
  Garbage detection via regex on special chars and length > 50.
  Named pseudonyms without @ prefix (karpathy, simonw, swyx, metaproph3t,
  sjdedic, ceterispar1bus, etc.) classified as keep_person — they're real
  X/TG contributors missing an @ prefix because extraction frontmatter
  didn't normalize. Cory's auto-create rule catches these on first reference.

  Formal-citation names (Firstname-Lastname form — Clayton Christensen, Hayek,
  Ostrom, Friston, Bostrom, Bak, etc.) classified as academic publishers —
  these are cited, not reachable via @ handle. Get promoted to contributors
  if/when they sign up with an @ handle.

  Apply path is transactional (BEGIN / COMMIT / ROLLBACK on error). Publisher
  insert happens before contributor delete, and contributor delete is gated
  on successful insert so we never lose a row by moving it to a failed
  publisher insert.

  --apply path flags:
    --delete-events  : also DELETE contribution_events rows for moved handles
                       (default: keep events for audit trail)
  --show <handle>   : inspect a single row's classification

Smoke-tested end-to-end via local copy of VPS DB:
  Before: 139 contributors total (polluted with orgs)
  After:  30 contributors (9 agent + 21 person), 100 publishers, 9 deleted
  contribution_events: 3,705 preserved
  contributors <-> publishers overlap: 0

Named contributors verified present after --apply:
  alexastrum (claims=6)  thesensatore (5)  cameron-s1 (1)  m3taversal (1011)

Pentagon agent 'pipeline' (claims_merged=771) intentionally retained — it's
the process name from old extract.py fallback path, not a real contributor.
Classified as agent (kind='agent') so doesn't appear in person leaderboard.

Deploy sequence after Ganymede review:
  1. Branch ff-merge to main
  2. scp lib/db.py + scripts/classify-contributors.py to VPS
  3. Pipeline already at v26 (migration ran during earlier v26 restart)
  4. Run dry-run: python3 ops/classify-contributors.py
  5. Apply: python3 ops/classify-contributors.py --apply
  6. Verify: livingip.xyz leaderboard stops showing CNBC/SpaceNews
  7. Argus /api/contributors unaffected (reads contributors directly, now clean)

Follow-up branch (not in this commit):
  - Writer routing in lib/contributor.py + extract.py:
    org handles -> publishers table + sources.publisher_id
    person handles with @ prefix -> auto-create contributor, tier='cited'
    formal-citation names -> sources.original_author (free text)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 20:47:21 +01:00
f0f9388c1f feat(diagnostics): add POST /api/search for chat API contract
Some checks are pending
CI / lint-and-test (push) Waiting to run
Wire the search endpoint to accept POST bodies matching the embedded
chat contract (query/limit/min_score/domain/confidence/exclude →
slug/path/title/domain/confidence/score/body_excerpt). GET path retained
for legacy callers and adds a min_score override for hackathon debug.

- _qdrant_hits_to_results() shapes raw hits into chat response format
- handle_api_search() dispatches POST vs GET
- /api/search added to _PUBLIC_PATHS (chat is unauthenticated)
- POST route registered alongside existing GET

Resolves VPS↔repo drift flagged by Argus before next deploy.sh run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 17:58:30 +01:00
0f2b153c92 fix(backfill): Ganymede review — fix tautological guard + origin='human'
Some checks are pending
CI / lint-and-test (push) Waiting to run
Addresses two findings in commit 762fd42 review:

1. BUG: guard query was tautological. `SELECT MAX(number) FROM prs WHERE
   number < 900000` filters out exactly what the `>= 900000` check tests.
   Replaced with a direct check for unexpected rows in the synthetic range
   (excluding our known 900068/900088).

2. WARNING: origin defaults to 'pipeline' via schema default. lib/merge.py
   convention is origin='human' for external contributors. Synthetic rows
   now set origin='human', priority='high' — matches discover_external_prs
   for real GitHub PRs. Prevents Phase B origin-based filtering from
   misclassifying Alex/Cameron as machine-authored.

Also flagged in review: credit projection was optimistic. Author events are
PR-level (not per-claim), so Alex gets 1×0.30 author credit, not 6. Same
for Cameron. Per-claim originator credit goes to the 7 frontmatter sourcers
where applicable. Not a code change — expectation reset for Cory.
2026-04-24 16:49:12 +01:00
762fd4233e feat(backfill): synthetic PR rows for pre-mirror GitHub PRs #68 (Alex) + #88 (Cameron)
Two historical GitHub PRs merged before our sync-mirror.sh tracked github_pr:
  - GitHub PR #68: alexastrum, 6 claims, merged Mar 9 2026 via squash merge
  - GitHub PR #88: Cameron-S1, 1 claim, merged early April

Their claim files were lost during a Forgejo→GitHub mirror overwrite and later
recovered via direct-to-main commits (dba00a79, da64f805). Because the
recovery commits bypassed the pipeline, our 'prs' table has no row to attach
originator events to — all 4 backfill-events.py strategies returned None,
leaving Alex + Cameron at 0 originator credits despite real historical work.

This reconstructs synthetic 'prs' rows so the existing github_pr strategy in
backfill-events.py attaches 7 originator events on re-run:
  - Numbers 900068 / 900088 live in a clearly-synthetic range that cannot
    collide with real Forgejo PRs (current max: 3941)
  - github_pr=68/88 wires up the existing lookup strategy
  - submitted_by=alexastrum / cameron-s1 establishes author attribution
  - merged_at from the recovery commit messages (not recovery-commit time)
  - last_error tags the rows as synthetic for future audits

Idempotent: INSERT OR IGNORE via check on number OR github_pr. Safe to replay.
Reversible: DELETE FROM prs WHERE number IN (900068, 900088).

After applying this script:
  python3 ops/backfill-events.py
will credit Alex with 6 author + 6 originator events (author=1.80, originator=0.90)
and Cameron with 1 author + 1 originator (0.30 + 0.15), all dated to the
historical merge dates — so 7d/30d leaderboard windows show them correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:33:37 +01:00
10d5c275da fix(backfill): normalize commit_date via datetime() in time-proximity query
Some checks are pending
CI / lint-and-test (push) Waiting to run
SQLite datetime comparison fails lexicographically across ISO-T and
space-separator formats: '2026-03-27 18:00:14' < '2026-03-27T17:43:04+00:00'
because space (0x20) < T (0x54). PRs merged same-day but earlier than the
commit hour were silently excluded from the time-proximity cascade.

Shaga's 3 stigmergic-coordination claims resolved to PR #2032 (later, wrong)
instead of #2025 (earlier, correct). Fixed by wrapping both sides in
datetime(), which normalizes to space-separator before comparison.

Verified: all 3 Shaga claims now resolve to #2025 via git_time_proximity.
No change to totals (126 originator events, 5 proximity hits) — the fix
corrects WHICH PR each proximity-matched claim resolves to, not whether.

Caught by Ganymede review of 1d6b515.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:16:03 +01:00
1d6b51527a feat(backfill): 4-strategy PR recovery for originator events
Rewrite claim-level pass in backfill-events.py to recover the Forgejo PR
that introduced each claim via a cascade of 4 strategies (reliability
order), replacing the single title→description match that missed PRs
with NULL description (Cameron #3377) and bare-subject extracts (Shaga's
Leo research PR).

## Strategies
  1. sourced_from frontmatter → prs.source_path stem match
  2. git log first-add commit → subject pattern → prs.branch
     - "<agent>: extract claims from <slug>"  → extract/<slug>
     - "<agent>: research session YYYY-MM-DD" → <agent>/research-<date>
     - "<agent>: (challenge|contrib|entity|synthesize)" → <agent>/*
     - "Recover X from GitHub PR #N"           → prs.github_pr=N
     - "Extract N claims from X" (no prefix)   → time-proximity on
       agent-owned branches within 24h
  3. Current title_desc fallback for anything the above miss

## Dry-run projection (1,662 merged PRs)

Before:
  Claims processed: 33
  Originator events: 6
  Breakdown: {no_pr_match: 1608, no_sourcer: 26, invalid_handle: 21, skip_self: 6}

After:
  Claims processed: 505 (+472)
  Originator events: 126 (+120)
  Strategy hits: git_subject=412, sourced_from=88, git_time_proximity=5
  Breakdown: {no_pr_match: 1095, no_sourcer: 67, invalid_handle: 359, skip_self: 20}

## Verified on real VPS data
- @thesensatore claims: 3/5 resolve via git_time_proximity to leo/ PRs
- Cameron-S1, alexastrum: remain None — their recovery commits
  (dba00a79, da64f805) bypassed the pipeline entirely, no Forgejo PR
  record exists. Requires synthetic prs rows — deferred to separate
  commit with its own Ganymede review (write operation, larger blast
  radius than this pure-read backfill change).

## Implementation
- New find_pr_for_claim(conn, repo, md) helper returns (pr_number, strategy)
- Claim-level pass uses it first, falls back to title_desc map
- Strategy counter surfaced in summary output for operator visibility

Idempotent — backfill re-runs skip duplicate events via the partial
UNIQUE index on contribution_events.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:06:52 +01:00
540ba97b9d fix(attribution): Phase A followup — bug #1 + 4 nits + refactor (Ganymede review)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Addresses Apr 24 review of 58fa8c52. All 6 findings landed.

Bug #1 — git log -1 returns latest commit, not first (semantic mismatch
with "original author" comment):
  Drop -1 flag, take last line of default-ordered log output (= oldest).
  Fixes mis-credit on multi-commit PRs where a reviewer rebased/force-pushed.

Nit #2 — forward writer didn't pass merged_at:
  Fetch merged_at in the prs SELECT, thread pr_merged_at through all 5
  insert_contribution_event call sites. Keeps forward-emitted and backfilled
  event timestamps on the same timeline after merge retries.

Nit #3 — legacy-counts fallback paths emit no events (parity gap):
  git-author and prs.agent fallback paths now emit challenger/synthesizer
  events via the TRAILER_EVENT_ROLE map when refined_type matches. Closes
  the gap where external-contributor challenge/enrich PRs would accumulate
  legacy counts but disappear from event-sourced leaderboards.

Nit #4 — migration v24 agent seed missing 'pipeline':
  Added "pipeline" to the seed list. Plus new migration v25 with idempotent
  corrective UPDATE so existing envs (where v24 already ran) pick up the
  fix on restart without requiring manual SQL. Verified on VPS state:
  pipeline row was kind='person', will flip to 'agent' on redeploy.

Nit #5 — backfill summary prints originator attempted=0 in wrong pass:
  Split the "=== Summary ===" header into "=== PR-level events ===" and
  "=== Claim-level originator pass ===" with originator counts in the
  right block. Operator-facing cosmetic.

Refactor #6 — AGENT_BRANCH_PREFIXES duplicated in 2 sites:
  Extracted to lib/attribution.py as single source of truth. contributor.py
  imports it. backfill-events.py keeps its local copy (runs standalone
  without pipeline package import) with a sync-reference comment.

No behavioral drift for the common case. Backfill re-runs cleanly against
existing forward-written events (UNIQUE-index idempotency).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 14:13:54 +01:00
58fa8c5276 feat(attribution): Phase A — event-sourced contribution ledger (schema v24)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Introduces contribution_events table + non-breaking double-write. Schema
lands today, forward traffic writes events alongside existing count upserts,
backfill script replays history. Phase B will add leaderboard API reading
from events; Phase C switches Argus dashboard over.

## Schema v24 (lib/db.py)

- contribution_events: one row per credit-earning event
  (id, handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
  Partial UNIQUE indexes handle SQLite's NULL != NULL semantics:
    idx_ce_unique_claim on (handle, role, pr_number, claim_path) WHERE claim_path NOT NULL
    idx_ce_unique_pr    on (handle, role, pr_number)             WHERE claim_path IS NULL
  PR-level events (evaluator, author, challenger, synthesizer) dedup on 3-tuple.
  Per-claim events (originator) dedup on 4-tuple. Idempotent on replay.
- contributor_aliases: canonical handle mapping
  Seeded: @thesensatore → thesensatore, cameron → cameron-s1
- contributors.kind TEXT DEFAULT 'person'
  Migration seeds 'agent' for known Pentagon agent handles.

## Role model (confirmed by Cory Apr 24)

Weights: author 0.30, challenger 0.25, synthesizer 0.20, originator 0.15, evaluator 0.05
- author:     human who submitted the PR (curation + submission work)
- originator: person who authored the underlying content (rewards external creators)
- challenger: agent/person who brought a productive disagreement
- synthesizer: cross-domain work (enrichments, research sessions)
- evaluator:  reviewer who approved (Leo + domain agent)

Humans-are-always-author: agents credit is capped at evaluator/synthesizer/
challenger. Pentagon agents classify as kind='agent' and surface in the
agent-view leaderboard, not the default person view.

## Writer (lib/contributor.py)

- New insert_contribution_event(): idempotent INSERT OR IGNORE with alias
  normalization + kind classification. Falls back silently on pre-v24 DBs.
- record_contributor_attribution double-writes alongside existing
  upsert_contributor calls. Zero risk to current dashboard.
- Author event: emitted once per PR from prs.submitted_by → git author →
  agent-branch-prefix.
- Originator events: emitted per claim from frontmatter sourcer, skipping
  when sourcer == author (avoids self-credit double-count).
- Evaluator events: Leo (always when leo_verdict='approve') + domain_agent
  (when domain_verdict='approve' and not Leo).
- Challenger/Synthesizer: emitted from Pentagon-Agent trailer on
  agent-owned branches (theseus/*, rio/*, etc.) based on commit_type.
  Pipeline-owned branches (extract/*, reweave/*) get no trailer-based event —
  infrastructure work isn't contribution credit.

## Helpers (lib/attribution.py)

- normalize_handle(raw, conn=None): lowercase + strip @ + alias lookup
- classify_kind(handle): returns 'agent' for PENTAGON_AGENTS, else 'person'
  Intentionally narrow. Orgs get classified by operator review, not heuristics.

## Backfill (scripts/backfill-events.py)

Replays all merged PRs into events. Idempotent (safe to re-run). Emits:
- PR-level: author, evaluator, challenger, synthesizer
- Per-claim: originator (walks knowledge tree, matches via description titles)

Known limitation: post-merge PR branches are deleted from Forgejo, so we
can't diff them for granular per-claim events. Claim→PR mapping uses
prs.description (pipe-separated titles). Misses some edge cases but
recovers the bulk of historical originator credit. Forward traffic gets
clean per-claim events via the normal record_contributor_attribution path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 13:59:22 +01:00
7 changed files with 1742 additions and 19 deletions

View file

@ -42,7 +42,7 @@ API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secret
# Endpoints that skip auth (dashboard is public for now, can lock later)
_PUBLIC_PATHS = frozenset({"/", "/prs", "/ops", "/health", "/agents", "/epistemic", "/legacy", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest"})
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest", "/api/search"})
def _get_db() -> sqlite3.Connection:
@ -663,38 +663,115 @@ async def handle_api_domains(request):
return web.json_response({"domains": breakdown})
async def handle_api_search(request):
"""GET /api/search — semantic search over claims via Qdrant + graph expansion.
def _qdrant_hits_to_results(hits, include_expanded=False):
"""Shape raw Qdrant hits into Ship's chat-API contract."""
results = []
for h in hits:
payload = h.get("payload", {}) or {}
path = payload.get("claim_path", "") or ""
slug = path.rsplit("/", 1)[-1]
if slug.endswith(".md"):
slug = slug[:-3]
results.append({
"slug": slug,
"path": path,
"title": payload.get("claim_title", ""),
"domain": payload.get("domain"),
"confidence": payload.get("confidence"),
"score": round(float(h.get("score", 0.0) or 0.0), 4),
"body_excerpt": payload.get("snippet", "") or "",
})
return results
Query params:
q: search query (required)
domain: filter by domain (optional)
confidence: filter by confidence level (optional)
limit: max results, default 10 (optional)
exclude: comma-separated claim paths to exclude (optional)
expand: enable graph expansion, default true (optional)
async def handle_api_search(request):
"""Semantic search over claims via Qdrant.
POST contract (Ship's chat API):
body: {"query": str, "limit": int, "min_score": float?, "domain": str?, "confidence": str?, "exclude": [str]?}
response: {"query": str, "results": [{"slug","path","title","domain","confidence","score","body_excerpt"}], "total": int}
GET (legacy + hackathon debug):
q: search query (required)
limit, domain, confidence, exclude, expand
min_score: if set, bypasses two-pass lib threshold (default lib behavior otherwise)
"""
if request.method == "POST":
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON body"}, status=400)
query = (body.get("query") or "").strip()
if not query:
return web.json_response({"error": "query required"}, status=400)
try:
limit = min(int(body.get("limit") or 5), 50)
except (TypeError, ValueError):
return web.json_response({"error": "limit must be int"}, status=400)
try:
min_score = float(body.get("min_score") if body.get("min_score") is not None else 0.25)
except (TypeError, ValueError):
return web.json_response({"error": "min_score must be float"}, status=400)
domain = body.get("domain")
confidence = body.get("confidence")
exclude = body.get("exclude") or None
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
results = _qdrant_hits_to_results(hits)
return web.json_response({"query": query, "results": results, "total": len(results)})
# GET path
query = request.query.get("q", "").strip()
if not query:
return web.json_response({"error": "q parameter required"}, status=400)
domain = request.query.get("domain")
confidence = request.query.get("confidence")
limit = min(int(request.query.get("limit", "10")), 50)
try:
limit = min(int(request.query.get("limit", "10")), 50)
except ValueError:
return web.json_response({"error": "limit must be int"}, status=400)
exclude_raw = request.query.get("exclude", "")
exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
expand = request.query.get("expand", "true").lower() != "false"
min_score_raw = request.query.get("min_score")
# Use shared search library (Layer 1 + Layer 2)
if min_score_raw is not None:
try:
min_score = float(min_score_raw)
except ValueError:
return web.json_response({"error": "min_score must be float"}, status=400)
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
direct = _qdrant_hits_to_results(hits)
return web.json_response({
"query": query,
"direct_results": direct,
"expanded_results": [],
"total": len(direct),
})
# Default GET: Layer 1 + Layer 2 via lib
result = kb_search(query, expand=expand,
domain=domain, confidence=confidence, exclude=exclude)
if "error" in result:
error = result["error"]
if error == "embedding_failed":
return web.json_response({"error": "embedding failed"}, status=502)
return web.json_response({"error": error}, status=500)
return web.json_response(result)
@ -2268,6 +2345,7 @@ def create_app() -> web.Application:
app.router.add_get("/api/contributors", handle_api_contributors)
app.router.add_get("/api/domains", handle_api_domains)
app.router.add_get("/api/search", handle_api_search)
app.router.add_post("/api/search", handle_api_search)
app.router.add_get("/api/audit", handle_api_audit)
app.router.add_get("/audit", handle_audit_page)
app.router.add_post("/api/usage", handle_api_usage)

View file

@ -21,6 +21,14 @@ logger = logging.getLogger("pipeline.attribution")
VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"})
# Agent-owned branch prefixes — PRs from these branches get Pentagon-Agent trailer
# credit for challenger/synthesizer roles. Pipeline-infra branches (extract/ reweave/
# fix/ ingestion/) are deliberately excluded: they're automation, not contribution.
# Single source of truth; imported by contributor.py and backfill-events.py.
AGENT_BRANCH_PREFIXES = (
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)
# Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches
# GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy"
# or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene
@ -48,6 +56,58 @@ def _filter_valid_handles(result: dict) -> dict:
return filtered
# ─── Handle normalization + kind classification (schema v24) ──────────────
# Known Pentagon agents. Used to classify contributor kind='agent' so the
# leaderboard can filter them out of the default person view.
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline", # pipeline-owned commits (extract/*, reweave/*, fix/*)
})
def normalize_handle(handle: str, conn=None) -> str:
"""Canonicalize a handle: lowercase, strip @, resolve alias if conn provided.
Examples:
'@thesensatore' 'thesensatore'
'Cameron' 'cameron' 'cameron-s1' (via alias if seeded)
'CNBC' 'cnbc'
Always lowercases and strips @ prefix. Alias resolution requires a conn
argument (not always available at parse time; merge-time writer passes it).
"""
if not handle:
return ""
h = handle.strip().lower().lstrip("@")
if conn is None:
return h
try:
row = conn.execute(
"SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,),
).fetchone()
if row:
return row["canonical"] if isinstance(row, dict) or hasattr(row, "keys") else row[0]
except Exception:
# Alias table might not exist yet on pre-v24 DBs — degrade gracefully.
logger.debug("normalize_handle: alias lookup failed for %r", h, exc_info=True)
return h
def classify_kind(handle: str) -> str:
"""Return 'agent' for known Pentagon agents, 'person' otherwise.
The 'org' kind (CNBC, SpaceNews, etc.) is assigned by operator review,
not inferred here. Keeping heuristics narrow: we know our own agents;
everything else defaults to person until explicitly classified.
"""
h = handle.strip().lower().lstrip("@")
if h in PENTAGON_AGENTS:
return "agent"
return "person"
# ─── Parse attribution from claim content ──────────────────────────────────

View file

@ -5,6 +5,7 @@ Extracted from merge.py (Phase 5 decomposition). Functions:
- refine_commit_type: extract challenge/enrich refinement from diff content
- record_contributor_attribution: parse trailers + frontmatter, upsert contributors
- upsert_contributor: insert/update contributor record with role counts
- insert_contribution_event: event-sourced credit log (schema v24)
- recalculate_tier: tier promotion based on config rules
"""
@ -13,11 +14,69 @@ import logging
import re
from . import config, db
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle
from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor")
# ─── Event schema (v24) ───────────────────────────────────────────────────
# Role → CI weight, per Cory's confirmed schema (Apr 24 conversation).
# Humans-are-always-author rule: agents never accumulate author credit;
# evaluator (0.05) is the only agent-facing role. Internal agents still earn
# author/challenger/synthesizer on their own autonomous research PRs but
# surface in the kind='agent' leaderboard, not the default person view.
ROLE_WEIGHTS = {
"author": 0.30,
"challenger": 0.25,
"synthesizer": 0.20,
"originator": 0.15,
"evaluator": 0.05,
}
def insert_contribution_event(
conn,
handle: str,
role: str,
pr_number: int,
*,
claim_path: str | None = None,
domain: str | None = None,
channel: str | None = None,
timestamp: str | None = None,
) -> bool:
"""Emit a contribution_events row. Idempotent via UNIQUE constraint.
Returns True if the event was inserted, False if the constraint blocked it
(same handle/role/pr/claim_path combo already recorded safe to replay).
Canonicalizes handle via alias table. Classifies kind from handle.
Falls back silently if contribution_events table doesn't exist yet (pre-v24).
"""
if role not in ROLE_WEIGHTS:
logger.warning("insert_contribution_event: unknown role %r", role)
return False
weight = ROLE_WEIGHTS[role]
canonical = normalize_handle(handle, conn=conn)
if not canonical:
return False
kind = classify_kind(canonical)
try:
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
)
return cur.rowcount > 0
except Exception:
logger.debug("insert_contribution_event failed for pr=%d handle=%r role=%r",
pr_number, canonical, role, exc_info=True)
return False
def is_knowledge_pr(diff: str) -> bool:
"""Check if a PR touches knowledge files (claims, decisions, core, foundations).
@ -125,15 +184,98 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
return
# Refine commit_type from diff content (branch prefix may be too broad)
row = conn.execute("SELECT commit_type FROM prs WHERE number = ?", (pr_number,)).fetchone()
row = conn.execute(
"SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, "
"domain_verdict, domain_agent, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
refined_type = refine_commit_type(diff, branch_type)
if refined_type != branch_type:
conn.execute("UPDATE prs SET commit_type = ? WHERE number = ?", (refined_type, pr_number))
logger.info("PR #%d: commit_type refined %s%s", pr_number, branch_type, refined_type)
# Schema v24 event-sourcing context. Fetched once per PR, reused across emit sites.
pr_domain = row["domain"] if row else None
pr_channel = row["source_channel"] if row else None
pr_submitted_by = row["submitted_by"] if row else None
# Use the PR's merged_at timestamp so event time matches the actual merge.
# If a merge retries after a crash, this keeps forward-emitted and backfilled
# events on the same timeline. Falls back to datetime('now') in the writer.
pr_merged_at = row["merged_at"] if row and row["merged_at"] else None
# ── AUTHOR event (schema v24, double-write) ──
# Humans-are-always-author rule: the human in the loop gets author credit.
# Precedence: prs.submitted_by (set by extract.py from source proposed_by, or
# by discover for human PRs) → git author of first commit → branch-prefix agent.
# Pentagon-owned infra branches (extract/ reweave/ fix/ ingestion/) don't get
# author events from branch prefix; extract/ PRs carry submitted_by from the
# source's proposed_by field so the human who submitted gets credit via path 1.
author_candidate: str | None = None
if pr_submitted_by:
author_candidate = pr_submitted_by
else:
# External GitHub PRs: git author of the FIRST commit on the branch is
# the real submitter. `git log -1` would return the latest commit, which
# mis-credits multi-commit PRs where a reviewer rebased or force-pushed.
# Take the last line of the unreversed log (= oldest commit, since git
# log defaults to reverse-chronological). Ganymede review, Apr 24.
rc_author_log, author_log = await git_fn(
"log", f"origin/main..origin/{branch}", "--no-merges",
"--format=%an", timeout=5,
)
if rc_author_log == 0 and author_log.strip():
lines = [line for line in author_log.strip().split("\n") if line.strip()]
if lines:
candidate = lines[-1].strip().lower()
if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions"}:
author_candidate = candidate
# Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc.
if not author_candidate and branch.startswith(AGENT_BRANCH_PREFIXES):
# Autonomous agent PR (theseus/research-*, leo/entity-*, etc.) —
# credit goes to the agent as author per Cory's directive.
author_candidate = branch.split("/", 1)[0]
if author_candidate:
insert_contribution_event(
conn, author_candidate, "author", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# ── EVALUATOR events (schema v24) ──
# Leo reviews every PR (STANDARD/DEEP tiers). domain_agent is the second
# reviewer. Both earn evaluator credit (0.05) per approved PR. Skip when
# verdict is 'request_changes' — failed review isn't contribution credit.
if row:
if row["leo_verdict"] == "approve":
insert_contribution_event(
conn, "leo", "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
if row["domain_verdict"] == "approve" and row["domain_agent"]:
dagent = row["domain_agent"].strip().lower()
if dagent and dagent != "leo": # don't double-credit leo
insert_contribution_event(
conn, dagent, "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Parse Pentagon-Agent trailer from branch commit messages
agents_found: set[str] = set()
# Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent
# challenger/synthesizer credit based on refined commit_type. Pipeline-owned
# branches (extract/*, reweave/*, etc.) don't — those are infra, not work.
is_agent_branch = branch.startswith(AGENT_BRANCH_PREFIXES)
_TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"research": "synthesizer",
"reweave": "synthesizer",
}
rc, log_output = await git_fn(
"log", f"origin/main..origin/{branch}", "--format=%b%n%N",
timeout=10,
@ -146,6 +288,15 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
upsert_contributor(
conn, agent_name, agent_uuid, role, today,
)
# Event-emit only for agent-owned branches where the trailer's agent
# actually did the substantive work (challenger/synthesizer).
event_role = _TRAILER_EVENT_ROLE.get(refined_type)
if is_agent_branch and event_role:
insert_contribution_event(
conn, agent_name, event_role, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(agent_name)
# Parse attribution from NEWLY ADDED knowledge files via the canonical attribution
@ -175,6 +326,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
# already matches via the claim file. Widening requires Cory sign-off
# since it would change leaderboard accounting (entity-only PRs → CI credit).
knowledge_prefixes = ("domains/", "core/", "foundations/", "decisions/")
author_canonical = normalize_handle(author_candidate, conn=conn) if author_candidate else None
for rel_path in files_output.strip().split("\n"):
rel_path = rel_path.strip()
if not rel_path.endswith(".md"):
@ -192,6 +344,21 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
upsert_contributor(
conn, handle, entry.get("agent_id"), role, today,
)
# Event-emit: only 'sourcer' frontmatter entries become
# originator events. 'extractor' frontmatter = infrastructure
# (the Sonnet extraction agent), no event. challenger/
# synthesizer frontmatter is extremely rare at extract time.
# Skip originator if same as author — avoids double-credit
# when someone submits their own content (self-authored).
if role == "sourcer":
origin_canonical = normalize_handle(handle, conn=conn)
if origin_canonical and origin_canonical != author_canonical:
insert_contribution_event(
conn, handle, "originator", pr_number,
claim_path=rel_path,
domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Fallback: if no Pentagon-Agent trailer found, try git commit authors
_BOT_AUTHORS = frozenset({
@ -209,13 +376,35 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
if author_name and author_name not in _BOT_AUTHORS:
role = commit_type_to_role(refined_type)
upsert_contributor(conn, author_name, None, role, today)
# Event-model parity: emit challenger/synthesizer event when
# the fallback credits a human/agent for that kind of work.
# Without this, external-contributor challenge/enrich PRs
# accumulate legacy counts but disappear from event-sourced
# leaderboards when Phase B cuts over. (Ganymede review.)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, author_name, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(author_name)
if not agents_found:
row = conn.execute("SELECT agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
if row and row["agent"] and row["agent"] != "external":
fb_row = conn.execute(
"SELECT agent FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if fb_row and fb_row["agent"] and fb_row["agent"] != "external":
pr_agent = fb_row["agent"].lower()
role = commit_type_to_role(refined_type)
upsert_contributor(conn, row["agent"].lower(), None, role, today)
upsert_contributor(conn, pr_agent, None, role, today)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, pr_agent, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
def upsert_contributor(

206
lib/db.py
View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 23
SCHEMA_VERSION = 26
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -35,6 +35,15 @@ CREATE TABLE IF NOT EXISTS sources (
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
-- v26: provenance publisher (news org / venue) + content author.
-- publisher_id references publishers(id) when source is from a known org.
-- original_author_handle references contributors(handle) when author is in our system.
-- original_author is free-text fallback ("Kim et al.", "Robin Hanson") not credit-bearing.
publisher_id INTEGER REFERENCES publishers(id),
content_type TEXT,
-- article | paper | tweet | conversation | self_authored | webpage | podcast
original_author TEXT,
original_author_handle TEXT REFERENCES contributors(handle),
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
@ -163,6 +172,77 @@ CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
-- Event-sourced contributions (schema v24).
-- One row per credit-earning event. Idempotent via two partial UNIQUE indexes
-- (SQLite treats NULL != NULL in UNIQUE constraints, so a single composite
-- UNIQUE with nullable claim_path would allow evaluator-event duplicates).
-- Leaderboards are SQL aggregations over this table; contributors becomes a materialized cache.
CREATE TABLE IF NOT EXISTS contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
-- person | org | agent
role TEXT NOT NULL,
-- author | originator | challenger | synthesizer | evaluator
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
-- NULL for PR-level events (e.g. evaluator). Set for per-claim events.
domain TEXT,
channel TEXT,
-- telegram | github | agent | web | unknown
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Per-claim events: unique on (handle, role, pr_number, claim_path) when path IS NOT NULL.
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
-- PR-level events (evaluator, author, trailer-based): unique on (handle, role, pr_number) when path IS NULL.
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
-- Handle aliasing. @thesensatore thesensatore. cameron cameron-s1.
-- Writers call resolve_alias(handle) before inserting events or upserting contributors.
CREATE TABLE IF NOT EXISTS contributor_aliases (
alias TEXT PRIMARY KEY,
canonical TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
-- Publishers: news orgs, academic venues, social platforms. NOT contributors these
-- provide metadata/provenance for sources, never earn leaderboard credit. Separating
-- these from contributors prevents CNBC/SpaceNews from dominating the leaderboard.
-- (Apr 24 Cory directive: "only credit the original source if its on X or tg")
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
-- Multi-platform identity: one contributor, many handles. Enables the leaderboard to
-- unify @thesensatore (X) + thesensatore (TG) + thesensatore@github into one person.
-- Writers check this table after resolving aliases to find canonical contributor handle.
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
"""
@ -641,6 +721,130 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v23: added idx_prs_source_path for auto-close dedup lookup")
if current < 24:
# Event-sourced contributions table + alias table + kind column on contributors.
# Non-breaking: contributors table stays; events are written in addition via
# double-write in merge.py. Leaderboards switch to events in Phase B.
conn.executescript("""
CREATE TABLE IF NOT EXISTS contribution_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
handle TEXT NOT NULL,
kind TEXT NOT NULL DEFAULT 'person',
role TEXT NOT NULL,
weight REAL NOT NULL,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
channel TEXT,
timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Partial unique indexes handle SQLite's NULL != NULL UNIQUE semantics.
-- Per-claim events dedup on 4-tuple; PR-level events dedup on 3-tuple.
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_claim ON contribution_events(
handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_ce_unique_pr ON contribution_events(
handle, role, pr_number
) WHERE claim_path IS NULL;
CREATE INDEX IF NOT EXISTS idx_ce_handle_ts ON contribution_events(handle, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_domain_ts ON contribution_events(domain, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_pr ON contribution_events(pr_number);
CREATE INDEX IF NOT EXISTS idx_ce_role_ts ON contribution_events(role, timestamp);
CREATE INDEX IF NOT EXISTS idx_ce_kind_ts ON contribution_events(kind, timestamp);
CREATE TABLE IF NOT EXISTS contributor_aliases (
alias TEXT PRIMARY KEY,
canonical TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
""")
try:
conn.execute("ALTER TABLE contributors ADD COLUMN kind TEXT DEFAULT 'person'")
except sqlite3.OperationalError:
pass # column already exists
# Seed known aliases. @thesensatore → thesensatore catches the zombie row Argus flagged.
# cameron → cameron-s1 reconciles the Leo-flagged missing contributor.
conn.executemany(
"INSERT OR IGNORE INTO contributor_aliases (alias, canonical) VALUES (?, ?)",
[
("@thesensatore", "thesensatore"),
("cameron", "cameron-s1"),
],
)
# Seed kind='agent' for known Pentagon agents so the events writer picks it up.
# Must stay in sync with lib/attribution.PENTAGON_AGENTS — drift causes
# contributors.kind to disagree with classify_kind() output for future
# inserts. (Ganymede review: "pipeline" was missing until Apr 24.)
pentagon_agents = [
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
]
for agent in pentagon_agents:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(agent,),
)
conn.commit()
logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column")
if current < 25:
# v24 seeded 13 Pentagon agents but missed "pipeline" — classify_kind()
# treats it as agent so contributors.kind drifted from event-insert output.
# Idempotent corrective UPDATE: fresh installs have no "pipeline" row
# (no-op), upgraded envs flip it if it exists. (Ganymede review Apr 24.)
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = 'pipeline'"
)
conn.commit()
logger.info("Migration v25: patched kind='agent' for pipeline handle")
if current < 26:
# Add publishers + contributor_identities. Non-breaking — new tables only.
# No existing data moved. Classification into publishers happens via a
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
# seed list. CHECK constraint on contributors.kind deferred to v27 after
# classification completes. (Apr 24 Cory directive: "fix schema, don't
# filter output" — separate contributors from publishers at the data layer.)
conn.executescript("""
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
""")
# Extend sources with provenance columns. ALTER TABLE ADD COLUMN is
# idempotent-safe via try/except because SQLite doesn't support IF NOT EXISTS
# on column adds.
for col_sql in (
"ALTER TABLE sources ADD COLUMN publisher_id INTEGER REFERENCES publishers(id)",
"ALTER TABLE sources ADD COLUMN content_type TEXT",
"ALTER TABLE sources ADD COLUMN original_author TEXT",
"ALTER TABLE sources ADD COLUMN original_author_handle TEXT REFERENCES contributors(handle)",
):
try:
conn.execute(col_sql)
except sqlite3.OperationalError as e:
if "duplicate column" not in str(e).lower():
raise
conn.commit()
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

618
scripts/backfill-events.py Normal file
View file

@ -0,0 +1,618 @@
#!/usr/bin/env python3
"""Backfill contribution_events by replaying merged PRs from pipeline.db + worktree.
For each merged PR:
- Derive author from prs.submitted_by git author branch prefix
- Emit author event (role=author, weight=0.30, claim_path=NULL)
- For each claim file under a knowledge prefix, parse frontmatter and emit
originator events for sourcer entries that differ from the author
- Emit evaluator events for Leo (when leo_verdict='approve') and domain_agent
(when domain_verdict='approve' and not Leo)
- Emit challenger/synthesizer events for Pentagon-Agent trailers on
agent-owned branches (theseus/*, rio/*, etc.) based on commit_type
Idempotent via the partial UNIQUE indexes on contribution_events. Safe to re-run.
Usage:
python3 scripts/backfill-events.py --dry-run # Count events without writing
python3 scripts/backfill-events.py # Apply
Runs read-only against the git worktree; only writes to pipeline.db.
"""
import argparse
import os
import re
import sqlite3
import subprocess
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
REPO_DIR = os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main")
# Role weights — must match lib/contributor.py ROLE_WEIGHTS.
ROLE_WEIGHTS = {
"author": 0.30,
"challenger": 0.25,
"synthesizer": 0.20,
"originator": 0.15,
"evaluator": 0.05,
}
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES.
# Duplicated here because this script runs standalone (no pipeline package import).
AGENT_BRANCH_PREFIXES = (
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)
TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
"research": "synthesizer",
"reweave": "synthesizer",
}
KNOWLEDGE_PREFIXES = ("domains/", "core/", "foundations/", "decisions/")
BOT_AUTHORS = frozenset({
"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions",
})
def normalize_handle(conn: sqlite3.Connection, handle: str) -> str:
if not handle:
return ""
h = handle.strip().lower().lstrip("@")
row = conn.execute("SELECT canonical FROM contributor_aliases WHERE alias = ?", (h,)).fetchone()
if row:
return row[0]
return h
def classify_kind(handle: str) -> str:
h = handle.strip().lower().lstrip("@")
return "agent" if h in PENTAGON_AGENTS else "person"
def parse_frontmatter(text: str):
"""Minimal YAML frontmatter parser using PyYAML when available."""
if not text.startswith("---"):
return None
end = text.find("---", 3)
if end == -1:
return None
raw = text[3:end]
try:
import yaml
fm = yaml.safe_load(raw)
return fm if isinstance(fm, dict) else None
except ImportError:
return None
except Exception:
return None
def extract_sourcers_from_file(path: Path) -> list[str]:
"""Return the sourcer handles from a claim file's frontmatter.
Matches three formats:
1. Block: `attribution: { sourcer: [{handle: "x"}, ...] }`
2. Bare-key flat: `sourcer: alexastrum`
3. Prefix-keyed: `attribution_sourcer: alexastrum`
"""
try:
content = path.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
return []
fm = parse_frontmatter(content)
if not fm:
return []
handles: list[str] = []
attr = fm.get("attribution")
if isinstance(attr, dict):
entries = attr.get("sourcer", [])
if isinstance(entries, list):
for e in entries:
if isinstance(e, dict) and "handle" in e:
handles.append(e["handle"])
elif isinstance(e, str):
handles.append(e)
elif isinstance(entries, str):
handles.append(entries)
return handles
flat = fm.get("attribution_sourcer")
if flat:
if isinstance(flat, str):
handles.append(flat)
elif isinstance(flat, list):
handles.extend(v for v in flat if isinstance(v, str))
if handles:
return handles
bare = fm.get("sourcer")
if bare:
if isinstance(bare, str):
handles.append(bare)
elif isinstance(bare, list):
handles.extend(v for v in bare if isinstance(v, str))
return handles
_HANDLE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,38}$")
def valid_handle(h: str) -> bool:
if not h:
return False
lower = h.strip().lower().lstrip("@")
if lower.endswith("-") or lower.endswith("_"):
return False
return bool(_HANDLE_RE.match(lower))
def git(*args, cwd: str = REPO_DIR, timeout: int = 30) -> str:
"""Run a git command, return stdout. Returns empty string on failure."""
try:
result = subprocess.run(
["git", *args],
cwd=cwd, capture_output=True, text=True, timeout=timeout, check=False,
)
return result.stdout
except (subprocess.TimeoutExpired, OSError):
return ""
def git_first_commit_author(pr_branch: str, merged_at: str) -> str:
"""Best-effort: find git author of first non-merge commit on the branch.
PR branches are usually deleted after merge. We fall back to scanning main
commits around merged_at for commits matching the branch slug.
"""
# Post-merge branches are cleaned up. For the backfill, we accept that this
# path rarely yields results and rely on submitted_by + branch prefix.
return ""
def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
"""Author precedence: submitted_by → branch-prefix agent for agent-owned branches."""
if pr.get("submitted_by"):
cand = pr["submitted_by"].strip().lower().lstrip("@")
if cand and cand not in BOT_AUTHORS:
return cand
branch = pr.get("branch") or ""
if "/" in branch:
prefix = branch.split("/", 1)[0].lower()
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
return prefix
return None
def find_pr_for_claim(
conn: sqlite3.Connection,
repo: Path,
md: Path,
) -> tuple[int | None, str]:
"""Recover the Forgejo PR number that introduced a claim file.
Returns (pr_number, strategy) strategy is one of:
'sourced_from' frontmatter sourced_from matched prs.source_path
'git_subject' git log first-add commit message matched a branch pattern
'title_desc' filename stem matched a title in prs.description
'github_pr' recovery commit mentioned GitHub PR # → prs.github_pr
'none' no strategy found a match
Order is chosen by reliability:
1. sourced_from (explicit provenance, most reliable when present)
2. git_subject (covers Leo research, Cameron challenges, Theseus contrib)
3. title_desc (current fallback brittle when description is NULL)
4. github_pr (recovery commits referencing erased GitHub PRs)
"""
rel = str(md.relative_to(repo))
# Strategy 1: sourced_from frontmatter → prs.source_path
try:
content = md.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
content = ""
fm = parse_frontmatter(content) if content else None
if fm:
sourced = fm.get("sourced_from")
candidate_paths: list[str] = []
if isinstance(sourced, str) and sourced:
candidate_paths.append(sourced)
elif isinstance(sourced, list):
candidate_paths.extend(s for s in sourced if isinstance(s, str))
for sp in candidate_paths:
stem = Path(sp).stem
if not stem:
continue
row = conn.execute(
"""SELECT number FROM prs
WHERE source_path LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"%{stem}.md",),
).fetchone()
if row:
return row["number"], "sourced_from"
# Strategy 2: git log first-add commit → subject pattern → prs.branch
# Default log order is reverse-chronological; take the last line (oldest)
# to get the original addition, not later rewrites.
log_out = git(
"log", "--diff-filter=A", "--follow",
"--format=%H|||%s|||%b", "--", rel,
)
if log_out.strip():
# Split on the delimiter we chose. Each commit produces 3 fields but
# %b can contain blank lines — group by lines that look like a SHA.
blocks: list[tuple[str, str, str]] = []
current: list[str] = []
for line in log_out.splitlines():
if re.match(r"^[a-f0-9]{40}\|\|\|", line):
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
current = [line]
else:
current.append(line)
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
if blocks:
# Oldest addition — git log defaults to reverse-chronological
_oldest_sha, subject, body = blocks[-1]
# Pattern: "<agent>: extract claims from <slug>"
m = re.match(r"^(\w+):\s*extract\s+claims\s+from\s+(\S+)", subject)
if m:
slug = m.group(2).rstrip(".md").rstrip(".")
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"extract/{slug}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: research session <date>"
m = re.match(r"^(\w+):\s*research\s+session\s+(\d{4}-\d{2}-\d{2})", subject)
if m:
agent = m.group(1).lower()
date = m.group(2)
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/research-{date}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: challenge" / contrib challenges / entity batches
m = re.match(r"^(\w+):\s*(?:challenge|contrib|entity|synthesize)", subject)
if m:
agent = m.group(1).lower()
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Recovery commits referencing erased GitHub PRs (Alex/Cameron).
# Subject: "Recover <who> contribution from GitHub PR #NN (...)".
# Match only when a corresponding prs row exists with github_pr=NN —
# otherwise the claims were direct-to-main without a Forgejo PR
# record, which requires a synthetic PR row (follow-up, not in
# this script's scope).
gh_match = re.search(r"GitHub\s+PR\s+#(\d+)", subject + "\n" + body)
if gh_match:
gh_pr = int(gh_match.group(1))
row = conn.execute(
"SELECT number FROM prs WHERE github_pr = ? AND status='merged' LIMIT 1",
(gh_pr,),
).fetchone()
if row:
return row["number"], "github_pr"
# Pattern: bare "Extract N claims from <source-fragment>" (no
# agent prefix). Used in early research PRs like Shaga's claims
# at PR #2025. Fall back to time-proximity: find the earliest
# agent-branch PR merged within 24h AFTER this commit's date.
m = re.match(r"^Extract\s+\d+\s+claims\s+from\b", subject)
if m:
# Get commit author date
date_out = git(
"log", "-1", "--format=%aI", _oldest_sha, timeout=10,
)
commit_date = date_out.strip() if date_out.strip() else None
if commit_date:
# git %aI returns ISO 8601 with T-separator; prs.merged_at
# uses SQLite's space-separator. Lexicographic comparison
# fails across formats (space<T), so normalize commit_date
# via datetime() before comparing. Without this, PRs merged
# within the same calendar day but earlier than the commit
# hour are silently excluded (caught by Ganymede review —
# Shaga's #2025 was dropped in favor of later #2032).
row = conn.execute(
"""SELECT number FROM prs
WHERE status='merged'
AND merged_at >= datetime(?)
AND merged_at <= datetime(datetime(?), '+24 hours')
AND (branch LIKE 'leo/%' OR branch LIKE 'theseus/%'
OR branch LIKE 'rio/%' OR branch LIKE 'astra/%'
OR branch LIKE 'vida/%' OR branch LIKE 'clay/%')
ORDER BY merged_at ASC LIMIT 1""",
(commit_date, commit_date),
).fetchone()
if row:
return row["number"], "git_time_proximity"
return None, "none"
def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
canonical = normalize_handle(conn, handle)
if not valid_handle(canonical):
return
kind = classify_kind(canonical)
weight = ROLE_WEIGHTS[role]
counts[(role, "attempt")] += 1
if dry_run:
counts[(role, "would_insert")] += 1
return
cur = conn.execute(
"""INSERT OR IGNORE INTO contribution_events
(handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, datetime('now')))""",
(canonical, kind, role, weight, pr_number, claim_path, domain, channel, timestamp),
)
if cur.rowcount > 0:
counts[(role, "inserted")] += 1
else:
counts[(role, "skipped_dup")] += 1
def files_added_in_pr(pr_number: int, branch: str) -> list[str]:
"""Best-effort: list added .md files in the PR.
Uses prs.source_path as a fallback signal (the claim being added). If the
branch no longer exists post-merge, this will return []; we accept the loss
for historical PRs where the granular per-claim events can't be recovered —
PR-level author/evaluator events still land correctly.
"""
# Post-merge PR branches are deleted from Forgejo so we can't diff them.
# For the backfill we use prs.source_path — for extract/* PRs this points to
# the source inbox file; we can glob the claim files from the extract branch
# commit on main. But main's commits don't track which files a given PR touched.
# Accept the loss: backfill emits only PR-level events (author, evaluator,
# challenger/synthesizer). Originator events come from parsing claim files
# attributed to the branch via description field which lists claim titles.
return []
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--limit", type=int, default=0, help="Process at most N PRs (0 = all)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: contribution_events exists (v24 migration applied)
try:
conn.execute("SELECT 1 FROM contribution_events LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: contribution_events table missing. Run migration v24 first.", file=sys.stderr)
sys.exit(2)
# Walk all merged knowledge PRs
query = """
SELECT number, branch, domain, source_channel, submitted_by,
leo_verdict, domain_verdict, domain_agent,
commit_type, merged_at
FROM prs
WHERE status = 'merged'
ORDER BY merged_at ASC
"""
if args.limit:
query += f" LIMIT {args.limit}"
prs = conn.execute(query).fetchall()
print(f"Replaying {len(prs)} merged PRs (dry_run={args.dry_run})...")
counts: Counter = Counter()
repo = Path(REPO_DIR)
for pr in prs:
pr_number = pr["number"]
branch = pr["branch"] or ""
domain = pr["domain"]
channel = pr["source_channel"]
merged_at = pr["merged_at"]
# Skip pipeline-only branches for author credit (extract/*, reweave/*,
# fix/*, ingestion/*, epimetheus/*) — those are infrastructure. But
# evaluator credit for Leo/domain_agent still applies.
is_pipeline_branch = branch.startswith((
"extract/", "reweave/", "fix/", "ingestion/", "epimetheus/",
))
# ── AUTHOR ──
# For pipeline branches, submitted_by carries the real author (the
# human who submitted the source via Telegram/etc). For agent branches,
# the agent is author. For external branches (gh-pr-*), git author is
# in submitted_by from the sync-mirror pipeline.
author = derive_author(conn, dict(pr))
if author:
emit(conn, counts, args.dry_run, author, "author", pr_number,
None, domain, channel, merged_at)
# ── EVALUATOR ──
if pr["leo_verdict"] == "approve":
emit(conn, counts, args.dry_run, "leo", "evaluator", pr_number,
None, domain, channel, merged_at)
if pr["domain_verdict"] == "approve" and pr["domain_agent"]:
dagent = pr["domain_agent"].strip().lower()
if dagent and dagent != "leo":
emit(conn, counts, args.dry_run, dagent, "evaluator", pr_number,
None, domain, channel, merged_at)
# ── CHALLENGER / SYNTHESIZER from branch+commit_type ──
# Only fires on agent-owned branches. Pipeline branches aren't creditable
# work (they're machine extraction, evaluator already captures the review).
if branch.startswith(AGENT_BRANCH_PREFIXES):
prefix = branch.split("/", 1)[0].lower()
event_role = TRAILER_EVENT_ROLE.get(pr["commit_type"] or "")
if event_role:
emit(conn, counts, args.dry_run, prefix, event_role, pr_number,
None, domain, channel, merged_at)
# ── ORIGINATOR per claim ──
# Walk claim files currently on main whose content was added in this PR.
# We can't diff old branches (deleted post-merge), but for extract PRs
# the source_path + description carry claim titles — too lossy to build
# per-claim events reliably. Strategy: walk ALL claim files that have a
# sourcer in their frontmatter and assign them to the PR whose
# source_path matches (via description or filename heuristic).
# DEFERRED: per-claim originator events require branch introspection
# that fails on deleted branches. Backfill emits PR-level events only.
# Forward traffic (post-deploy) gets per-claim originator events via
# record_contributor_attribution's added-files walk.
if not args.dry_run:
conn.commit()
# Originator is emitted in the claim-level pass below, not the PR-level pass.
# Previous summary listed it here with attempted=0 which confused operators.
print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===")
for role in ("author", "challenger", "synthesizer", "evaluator"):
att = counts[(role, "attempt")]
if args.dry_run:
wi = counts[(role, "would_insert")]
print(f" {role:12s} attempted={att:5d} would_insert={wi:5d}")
else:
ins = counts[(role, "inserted")]
skip = counts[(role, "skipped_dup")]
print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
# ── Per-claim originator pass ──
# Walk the knowledge tree, parse sourcer attribution, and attach each claim
# to its merging PR via find_pr_for_claim's multi-strategy recovery.
# Apr 24 rewrite (Ganymede-approved): replaces the single-strategy
# title→description match with four strategies in reliability order.
# Previous script missed PRs with NULL description (Cameron #3377) and
# cross-context claims (Shaga's Leo research). Fallback title-match is
# preserved to recover anything the git-log path misses.
print("\n=== Claim-level originator pass ===")
# Build title → pr_number map from prs.description (strategy 3 fallback)
title_to_pr: dict[str, int] = {}
for r in conn.execute(
"SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
).fetchall():
desc = r["description"] or ""
for title in desc.split(" | "):
title = title.strip()
if title:
# Last-writer wins. Conflicts are rare (titles unique in practice).
title_to_pr[title.lower()] = r["number"]
claim_counts = Counter()
strategy_counts = Counter()
claim_count = 0
originator_count = 0
for md in sorted(repo.glob("domains/**/*.md")) + \
sorted(repo.glob("core/**/*.md")) + \
sorted(repo.glob("foundations/**/*.md")) + \
sorted(repo.glob("decisions/**/*.md")):
rel = str(md.relative_to(repo))
stem = md.stem
# Strategies 1, 2, 4 via the helper (sourced_from, git_subject, github_pr).
pr_number, strategy = find_pr_for_claim(conn, repo, md)
# Strategy 3 (fallback): title-match against prs.description.
if not pr_number:
pr_number = title_to_pr.get(stem.lower())
if not pr_number:
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
if pr_number:
strategy = "title_desc"
if not pr_number:
claim_counts["no_pr_match"] += 1
continue
sourcers = extract_sourcers_from_file(md)
if not sourcers:
claim_counts["no_sourcer"] += 1
continue
claim_count += 1
strategy_counts[strategy] += 1
# Look up author for this PR to skip self-credit
pr_row = conn.execute(
"SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
if not pr_row:
continue
author = derive_author(conn, dict(pr_row))
author_canonical = normalize_handle(conn, author) if author else None
for src_handle in sourcers:
src_canonical = normalize_handle(conn, src_handle)
if not valid_handle(src_canonical):
claim_counts["invalid_handle"] += 1
continue
if src_canonical == author_canonical:
claim_counts["skip_self"] += 1
continue
emit(conn, counts, args.dry_run, src_handle, "originator", pr_number,
rel, pr_row["domain"], pr_row["source_channel"], pr_row["merged_at"])
originator_count += 1
if not args.dry_run:
conn.commit()
print(f" Claims processed: {claim_count}")
print(f" Originator events emitted: {originator_count}")
print(f" Breakdown: {dict(claim_counts)}")
print(f" Strategy hits: {dict(strategy_counts)}")
att = counts[("originator", "attempt")]
if args.dry_run:
wi = counts[("originator", "would_insert")]
print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}")
else:
ins = counts[("originator", "inserted")]
skip = counts[("originator", "skipped_dup")]
print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
if not args.dry_run:
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
print(f"\nTotal contribution_events rows: {total}")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""Reconstruct synthetic `prs` rows for historical GitHub PRs lost pre-mirror-wiring.
Two PRs merged on GitHub before our sync-mirror.sh tracked `github_pr`:
- GitHub PR #68: alexastrum — 6 claims, merged 2026-03-09 via GitHub squash,
recovered to Forgejo via commit dba00a79 (Apr 16, after mirror erased files)
- GitHub PR #88: Cameron-S1 — 1 claim, recovered via commit da64f805
The recovery commits wrote the files directly to main, so our `prs` table has
no row to attach originator events to the backfill-events.py strategies all
return NULL. We reconstruct one synthetic `prs` row per historical GitHub PR so
the events pipeline (and `github_pr` strategy in backfill-events) can credit
Alex and Cameron properly.
Numbers 900000+ are clearly synthetic and won't collide with real Forgejo PRs.
Idempotent via INSERT OR IGNORE.
Usage:
python3 scripts/backfill-synthetic-recovery-prs.py --dry-run
python3 scripts/backfill-synthetic-recovery-prs.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Historical GitHub PRs recovered via direct-to-main commits.
# Original GitHub merge dates come from the recovery commit messages.
RECOVERY_PRS = [
{
"number": 900068,
"github_pr": 68,
"branch": "gh-pr-68",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "alexastrum",
"source_channel": "github",
# origin='human' matches lib/merge.py convention for external contributors
# (default is 'pipeline' which misclassifies us as machine-authored).
"origin": "human",
"priority": "high",
"description": "Multi-agent git workflows production maturity | Cryptographic agent trust ratings | Defense in depth for AI agent oversight | Deterministic policy engines below LLM layer | Knowledge validation four-layer architecture | Structurally separating proposer and reviewer agents",
"merged_at": "2026-03-09 00:00:00",
"created_at": "2026-03-08 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #68 pre-mirror-wiring reconstruction (commit dba00a79)",
},
{
"number": 900088,
"github_pr": 88,
"branch": "gh-pr-88",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "cameron-s1",
"source_channel": "github",
"origin": "human",
"priority": "high",
"description": "Orthogonality is an artefact of specification architectures not a property of intelligence itself",
"merged_at": "2026-04-01 00:00:00",
"created_at": "2026-04-01 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #88 pre-mirror-wiring reconstruction (commit da64f805)",
},
]
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Guard against synthetic-range colonization (Ganymede review): check for
# any row in the synthetic range that isn't one of ours. INSERT OR IGNORE on
# the specific numbers is the real collision defense; this is belt-and-suspenders.
max_real = conn.execute(
"SELECT MAX(number) FROM prs WHERE number < 900000"
).fetchone()[0] or 0
print(f"Max real Forgejo PR number: {max_real}")
synth_conflict = conn.execute(
"SELECT number FROM prs WHERE number >= 900000 AND number NOT IN (900068, 900088) LIMIT 1"
).fetchone()
if synth_conflict:
print(f"ERROR: PR #{synth_conflict[0]} already exists in synthetic range. "
f"Pick a new range before running.", file=sys.stderr)
sys.exit(2)
inserted = 0
skipped = 0
for row in RECOVERY_PRS:
existing = conn.execute(
"SELECT number FROM prs WHERE number = ? OR github_pr = ?",
(row["number"], row["github_pr"]),
).fetchone()
if existing:
print(f" PR #{row['number']} (github_pr={row['github_pr']}): already exists — skip")
skipped += 1
continue
print(f" {'(dry-run) ' if args.dry_run else ''}INSERT synthetic PR #{row['number']} "
f"(github_pr={row['github_pr']}, submitted_by={row['submitted_by']}, "
f"merged_at={row['merged_at']})")
if not args.dry_run:
conn.execute(
"""INSERT INTO prs (
number, github_pr, branch, status, domain, commit_type, tier,
leo_verdict, domain_verdict, submitted_by, source_channel,
origin, priority,
description, merged_at, created_at, last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
row["number"], row["github_pr"], row["branch"], row["status"],
row["domain"], row["commit_type"], row["tier"],
row["leo_verdict"], row["domain_verdict"],
row["submitted_by"], row["source_channel"],
row["origin"], row["priority"],
row["description"], row["merged_at"], row["created_at"],
row["last_error"],
),
)
inserted += 1
if not args.dry_run:
conn.commit()
print(f"\nInserted {inserted}, skipped {skipped}")
if not args.dry_run and inserted:
print("\nNext step: re-run backfill-events.py to attach originator events")
print(" python3 ops/backfill-events.py")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""Classify `contributors` rows into {keep_person, keep_agent, move_to_publisher, delete_garbage}.
Reads current contributors table, proposes reclassification per v26 schema design:
- Real humans + Pentagon agents stay in contributors (kind='person'|'agent')
- News orgs, publications, venues move to publishers table (new v26)
- Multi-word hyphenated garbage (parsing artifacts) gets deleted
- Their contribution_events are handled per category:
* Publishers: DELETE events (orgs shouldn't have credit)
* Garbage: DELETE events (bogus data)
* Persons/agents: keep events untouched
Classification is heuristic uses explicit allowlists + regex patterns + length gates.
Ambiguous cases default to 'review_needed' (human decision).
Usage:
python3 scripts/classify-contributors.py # dry-run analysis + report
python3 scripts/classify-contributors.py --apply # write changes
python3 scripts/classify-contributors.py --show <handle> # inspect a single row
Writes to pipeline.db only. Does NOT modify claim files.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Pentagon agents: kind='agent'. Authoritative list.
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Publisher/news-org handles seen in current contributors table.
# Grouped by kind for the publishers row. Classified by inspection.
# NOTE: This list is hand-curated — add to it as new orgs appear.
PUBLISHERS_NEWS = {
# News outlets / brands
"cnbc", "al-jazeera", "axios", "bloomberg", "reuters", "bettorsinsider",
"fortune", "techcrunch", "coindesk", "coindesk-staff", "coindesk-research",
"coindesk research", "coindesk staff",
"defense-one", "thedefensepost", "theregister", "the-intercept",
"the-meridiem", "variety", "variety-staff", "variety staff", "spacenews",
"nasaspaceflight", "thedonkey", "insidedefense", "techpolicypress",
"morganlewis", "casinoorg", "deadline", "animationmagazine",
"defensepost", "casino-org", "casino.org",
"air & space forces magazine", "ieee spectrum", "techcrunch-staff",
"blockworks", "blockworks-staff", "decrypt", "ainvest", "banking-dive", "banking dive",
"cset-georgetown", "cset georgetown",
"kff", "kff-health-news", "kff health news", "kff-health-news---cbo",
"kff-health-news-/-cbo", "kff health news / cbo", "kffhealthnews",
"bloomberg-law",
"norton-rose-fulbright", "norton rose fulbright",
"defence-post", "the-defensepost",
"wilmerhale", "mofo", "sciencedirect",
"yogonet", "csr", "aisi-uk", "aisi", "aisi_gov", "rand",
"armscontrol", "eclinmed", "solana-compass", "solana compass",
"pmc11919318", "pmc11780016",
"healthverity", "natrium", "form-energy",
"courtlistener", "curtis-schiff", "curtis-schiff-prediction-markets",
"prophetx", "techpolicypress-staff",
"npr", "venturebeat", "geekwire", "payloadspace", "the-ankler",
"theankler", "tubefilter", "emarketer", "dagster",
"numerai", # fund/project brand, not person
"psl", "multistate",
}
PUBLISHERS_ACADEMIC = {
# Academic orgs, labs, papers, journals, institutions
"arxiv", "metr", "metr_evals", "apollo-research", "apollo research", "apolloresearch",
"jacc-study-authors", "jacc-data-report-authors",
"anthropic-fellows-program", "anthropic-fellows",
"anthropic-fellows-/-alignment-science-team", "anthropic-research",
"jmir-2024", "jmir 2024",
"oettl-et-al.,-journal-of-experimental-orthopaedics",
"oettl et al., journal of experimental orthopaedics",
"jacc", "nct06548490", "pmc",
"conitzer-et-al.-(2024)", "aquino-michaels-2026", "pan-et-al.",
"pan-et-al.-'natural-language-agent-harnesses'",
"stanford", "stanford-meta-harness",
"hendershot", "annals-im",
"nellie-liang,-brookings-institution", "nellie liang, brookings institution",
"penn-state", "american-heart-association", "american heart association",
"molt_cornelius", "molt-cornelius",
# Companies / labs / brand-orgs (not specific humans)
"anthropic", "anthropicai", "openai", "nasa", "icrc", "ecri",
"epochairesearch", "metadao", "iapam", "icer",
"who", "ama", "uspstf", "unknown",
"futard.io", # protocol/platform
"oxford-martin-ai-governance-initiative",
"oxford-martin-ai-governance",
"u.s.-food-and-drug-administration",
"jitse-goutbeek,-european-policy-centre", # cited person+org string → publisher
"adepoju-et-al.", # paper citation
# Formal-citation names (Firstname-Lastname or Lastname-et-al) — classified
# as academic citations, not reachable contributors. They'd need an @ handle
# to get CI credit per Cory's growth-loop design.
"senator-elissa-slotkin",
"bostrom", "hanson", "kaufmann", "noah-smith", "doug-shapiro",
"shayon-sengupta", "shayon sengupta",
"robin-hanson", "robin hanson", "eliezer-yudkowsky",
"leopold-aschenbrenner", "aschenbrenner",
"ramstead", "larsson", "heavey",
"dan-slimmon", "van-leeuwaarden", "ward-whitt", "adams",
"tamim-ansary", "spizzirri",
"dario-amodei", # formal-citation form (real @ is @darioamodei)
"corless", "oxranga", "vlahakis",
# Brand/project/DAO tokens — not individuals
"areal-dao", "areal", "theiaresearch", "futard-io", "dhrumil",
# Classic formal-citation names — famous academics/economists cited by surname.
# Reachable via @ handle if/when they join (e.g. Ostrom has no X, Hayek deceased,
# Friston has an institutional affiliation not an @ handle we'd track).
"clayton-christensen", "hidalgo", "coase", "wiener", "juarrero",
"ostrom", "centola", "hayek", "marshall-mcluhan", "blackmore",
"knuth", "friston", "aquino-michaels", "conitzer", "bak",
}
# NOTE: pseudonymous X handles that MAY be real contributors stay in keep_person:
# karpathy, simonw, swyx, metaproph3t, metanallok, mmdhrumil, sjdedic,
# ceterispar1bus — these are real X accounts and match Cory's growth loop.
# They appear without @ prefix because extraction frontmatter didn't normalize.
# Auto-creating them as contributors tier='cited' is correct (A-path from earlier).
PUBLISHERS_SOCIAL = {
"x", "twitter", "telegram", "x.com",
}
PUBLISHERS_INTERNAL = {
"teleohumanity-manifesto", "strategy-session-journal",
"living-capital-thesis-development", "attractor-state-historical-backtesting",
"web-research-compilation", "architectural-investing",
"governance---meritocratic-voting-+-futarchy", # title artifact
"sec-interpretive-release-s7-2026-09-(march-17", # title artifact
"mindstudio", # tooling/platform, not contributor
}
# Merge into one kind→set map for classification
PUBLISHER_KIND_MAP = {}
for h in PUBLISHERS_NEWS:
PUBLISHER_KIND_MAP[h.lower()] = "news"
for h in PUBLISHERS_ACADEMIC:
PUBLISHER_KIND_MAP[h.lower()] = "academic"
for h in PUBLISHERS_SOCIAL:
PUBLISHER_KIND_MAP[h.lower()] = "social_platform"
for h in PUBLISHERS_INTERNAL:
PUBLISHER_KIND_MAP[h.lower()] = "internal"
# Garbage: handles that are clearly parse artifacts, not real names.
# Pattern: contains parens, special chars, or >50 chars.
def is_garbage(handle: str) -> bool:
h = handle.strip()
if len(h) > 50:
return True
if re.search(r"[()\[\]<>{}\/\\|@#$%^&*=?!:;\"']", h):
# But @ can appear legitimately in handles like @thesensatore — allow if @ is only prefix
if h.startswith("@") and not re.search(r"[()\[\]<>{}\/\\|#$%^&*=?!:;\"']", h):
return False
return True
# Multi-word hyphenated with very specific artifact shape: 3+ hyphens in a row or trailing noise
if "---" in h or "---meritocratic" in h or h.endswith("(march") or h.endswith("-(march"):
return True
return False
def classify(handle: str) -> tuple[str, str | None]:
"""Return (category, publisher_kind).
category {'keep_agent', 'keep_person', 'publisher', 'garbage', 'review_needed'}
publisher_kind {'news','academic','social_platform','internal', None}
"""
h = handle.strip().lower().lstrip("@")
if h in PENTAGON_AGENTS:
return ("keep_agent", None)
if h in PUBLISHER_KIND_MAP:
return ("publisher", PUBLISHER_KIND_MAP[h])
if is_garbage(handle):
return ("garbage", None)
# @-prefixed handles or short-slug real-looking names → keep as person
# (Auto-create rule from Cory: @ handles auto-join as tier='cited'.)
if handle.startswith("@"):
return ("keep_person", None)
# Plausible handles (<=39 chars, alphanum + underscore/hyphen): treat as person.
# 39-char ceiling matches GitHub's handle limit and the writer path in
# contributor.py::_HANDLE_RE, so a valid 21-39 char real handle won't fall
# through to review_needed and block --apply.
if re.match(r"^[a-z0-9][a-z0-9_-]{0,38}$", h):
return ("keep_person", None)
# Everything else: needs human review
return ("review_needed", None)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="Write changes to DB")
parser.add_argument("--show", type=str, help="Inspect a single handle")
parser.add_argument("--delete-events", action="store_true",
help="DELETE contribution_events for publishers+garbage (default: keep for audit)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: publishers table must exist (v26 migration applied)
try:
conn.execute("SELECT 1 FROM publishers LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: publishers table missing. Run migration v26 first.", file=sys.stderr)
sys.exit(2)
rows = conn.execute(
"SELECT handle, kind, tier, claims_merged FROM contributors ORDER BY claims_merged DESC"
).fetchall()
if args.show:
target = args.show.strip().lower().lstrip("@")
for r in rows:
if r["handle"].lower().lstrip("@") == target:
category, pkind = classify(r["handle"])
events_count = conn.execute(
"SELECT COUNT(*) FROM contribution_events WHERE handle = ?",
(r["handle"].lower().lstrip("@"),),
).fetchone()[0]
print(f"handle: {r['handle']}")
print(f"current_kind: {r['kind']}")
print(f"current_tier: {r['tier']}")
print(f"claims_merged: {r['claims_merged']}")
print(f"events: {events_count}")
print(f"→ category: {category}")
if pkind:
print(f"→ publisher: kind={pkind}")
return
print(f"No match for '{args.show}'")
return
# Classify all
buckets: dict[str, list[dict]] = {
"keep_agent": [],
"keep_person": [],
"publisher": [],
"garbage": [],
"review_needed": [],
}
for r in rows:
category, pkind = classify(r["handle"])
buckets[category].append({
"handle": r["handle"],
"kind_now": r["kind"],
"tier": r["tier"],
"claims": r["claims_merged"] or 0,
"publisher_kind": pkind,
})
print("=== Classification summary ===")
for cat, items in buckets.items():
print(f" {cat:18s} {len(items):5d}")
print("\n=== Sample of each category ===")
for cat, items in buckets.items():
print(f"\n--- {cat} (showing up to 10) ---")
for item in items[:10]:
tag = f"{item['publisher_kind']}" if item["publisher_kind"] else ""
print(f" {item['handle']:50s} claims={item['claims']:5d}{tag}")
print("\n=== Full review_needed list ===")
for item in buckets["review_needed"]:
print(f" {item['handle']:50s} claims={item['claims']:5d}")
# Diagnostic: orphan alias count for handles we're about to delete.
# Contributor_aliases has no FK (SQLite FKs require PRAGMA to enforce anyway),
# so aliases pointing to deleted canonical handles become orphans. Surface
# the count so the --delete-events decision is informed.
doomed = [item["handle"].lower().lstrip("@") for item in buckets["garbage"] + buckets["publisher"]]
if doomed:
placeholders = ",".join("?" * len(doomed))
orphan_count = conn.execute(
f"SELECT COUNT(*) FROM contributor_aliases WHERE canonical IN ({placeholders})",
doomed,
).fetchone()[0]
print(f"\n=== Alias orphan check ===")
print(f" contributor_aliases rows pointing to deletable canonicals: {orphan_count}")
if orphan_count:
print(f" (cleanup requires --delete-events; without it, aliases stay as orphans)")
if not args.apply:
print("\n(dry-run — no writes. Re-run with --apply to execute.)")
return
# ── Apply changes ──
print("\n=== Applying changes ===")
if buckets["review_needed"]:
print(f"ABORT: {len(buckets['review_needed'])} rows need human review. Fix classifier before --apply.")
sys.exit(3)
inserted_publishers = 0
reclassified_agents = 0
deleted_garbage = 0
deleted_publisher_rows = 0
deleted_events = 0
deleted_aliases = 0
# Single transaction — if any step errors, roll back. This prevents the failure
# mode where a publisher insert fails silently and we still delete the contributor
# row, losing data.
try:
conn.execute("BEGIN")
# 1. Insert publishers. Track which ones succeeded so step 4 only deletes those.
# Counter uses cur.rowcount so replay runs (where publishers already exist)
# report accurate inserted=0 instead of falsely claiming the full set.
# moved_to_publisher is unconditional — the contributors row still needs to
# be deleted even when the publishers row was added in a prior run.
moved_to_publisher = set()
for item in buckets["publisher"]:
name = item["handle"].strip().lower().lstrip("@")
cur = conn.execute(
"INSERT OR IGNORE INTO publishers (name, kind) VALUES (?, ?)",
(name, item["publisher_kind"]),
)
if cur.rowcount > 0:
inserted_publishers += 1
moved_to_publisher.add(item["handle"])
# 2. Ensure Pentagon agents have kind='agent' (idempotent after v25 patch)
for item in buckets["keep_agent"]:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(item["handle"].lower().lstrip("@"),),
)
reclassified_agents += 1
# 3. Delete garbage handles from contributors (and their events + aliases)
for item in buckets["garbage"]:
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_garbage += cur.rowcount
# 4. Delete publisher rows from contributors — ONLY for those successfully
# inserted into publishers above. Guards against partial failure.
# Aliases pointing to publisher-classified handles get cleaned under the
# same --delete-events gate: publishers live in their own table now, any
# leftover aliases in contributor_aliases are orphans.
for item in buckets["publisher"]:
if item["handle"] not in moved_to_publisher:
continue
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_publisher_rows += cur.rowcount
# 5. Audit log entry for the destructive operation (Ganymede Q5).
conn.execute(
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
(
"schema_v26",
"classify_contributors",
json.dumps({
"publishers_inserted": inserted_publishers,
"agents_updated": reclassified_agents,
"garbage_deleted": deleted_garbage,
"publisher_rows_deleted": deleted_publisher_rows,
"events_deleted": deleted_events,
"aliases_deleted": deleted_aliases,
"delete_events_flag": bool(args.delete_events),
}),
),
)
conn.commit()
except Exception as e:
conn.rollback()
print(f"ERROR: Transaction failed, rolled back. {e}", file=sys.stderr)
sys.exit(4)
print(f" publishers inserted: {inserted_publishers}")
print(f" agents kind='agent' ensured: {reclassified_agents}")
print(f" garbage rows deleted: {deleted_garbage}")
print(f" publisher rows removed from contributors: {deleted_publisher_rows}")
if args.delete_events:
print(f" contribution_events deleted: {deleted_events}")
print(f" contributor_aliases deleted: {deleted_aliases}")
else:
print(f" (events + aliases kept — re-run with --delete-events to clean them)")
if __name__ == "__main__":
main()