Compare commits

..

8 commits

Author SHA1 Message Date
3fe524dd14 fix(classify): Ganymede review fixes — alias cleanup + counter accuracy + handle alignment
Some checks failed
CI / lint-and-test (push) Has been cancelled
1. WARNING — orphan contributor_aliases after publisher/garbage delete:
   Added alias cleanup to the transaction (gated on --delete-events, same
   audit rationale as events). Both garbage and publisher deletion loops
   now DELETE matching contributor_aliases rows. Dry-run adds an orphan
   count diagnostic so the --delete-events decision is informed.

2. NIT — inserted_publishers counter over-reports on replay:
   INSERT OR IGNORE silently skips name collisions, but the counter
   incremented unconditionally. Now uses cur.rowcount so a second apply
   reports 0 inserts instead of falsely claiming 100. moved_to_publisher
   set remains unconditional — publisher rows already present still need
   the matching contributors row deleted.

3. NIT — handle-length gate diverged from writer path:
   Widened from {0,19} (20 chars) to {0,38} (39 chars) to match GitHub's
   handle limit and contributor.py::_HANDLE_RE. Prevents future long-handle
   real contributors from falling through to review_needed and blocking
   --apply. Current data has 0 review_needed either way.

Bonus (Q5): Added audit_log entry inside the transaction. One row in
audit_log.stage='schema_v26', event='classify_contributors' with counter
detail JSON on every --apply run. Cheap audit trail for the destructive op.

Verified end-to-end on VPS DB snapshot:
- First apply: 100/9/9/100/0 (matches pre-fix)
- Second apply: 0/9/0/0/0 (counter fix working)
- With injected aliases + --delete-events: 2 aliases deleted, 1 pre-existing
  orphan correctly left alone (outside script scope), audit_log entry
  written with accurate counters.

Ganymede msg-3. Protocol closed.
2026-04-24 20:47:21 +01:00
45b2f6de20 feat(schema): v26 — publishers + contributor_identities + sources provenance
Separates three concerns currently conflated in contributors table:
  contributors — people + agents we credit (kind in 'person','agent')
  publishers   — news orgs / academic venues / platforms (not credited)
  sources      — gains publisher_id + content_type + original_author columns

Rationale (Cory directive Apr 24): livingip.xyz leaderboard was showing CNBC,
SpaceNews, TechCrunch etc. at the top because the attribution pipeline credited
news org names as if they were contributors. The mechanism-level fix is a
schema split — orgs live in publishers, individuals in contributors, each
table has one semantics.

Migration v26:
  - CREATE TABLE publishers (id PK, name UNIQUE, kind CHECK IN
    news|academic|social_platform|podcast|self|internal|legal|government|
    research_org|commercial|other, url_pattern, created_at)
  - CREATE TABLE contributor_identities (contributor_handle, platform CHECK IN
    x|telegram|github|email|web|internal, platform_handle, verified, created_at)
    Composite PK on (platform, platform_handle) + index on contributor_handle.
    Enables one contributor to unify X + TG + GitHub handles.
  - ALTER TABLE sources ADD COLUMN publisher_id REFERENCES publishers(id)
  - ALTER TABLE sources ADD COLUMN content_type
    (article|paper|tweet|conversation|self_authored|webpage|podcast)
  - ALTER TABLE sources ADD COLUMN original_author TEXT
    (free-text fallback, e.g., "Kim et al." — not credit-bearing)
  - ALTER TABLE sources ADD COLUMN original_author_handle REFERENCES contributors(handle)
    (set only when the author is in our contributor network)
  - ALTER wrapped in try/except on "duplicate column" for replay safety
  - Both SCHEMA_SQL (fresh installs) + migration block (upgrades) updated
  - SCHEMA_VERSION bumped 25 -> 26

Migration is non-breaking. No data moves yet. Existing publishers-polluting-
contributors row state is preserved until the classifier runs. Writer routing
to these tables lands in a separate branch (Phase B writer changes).

Classifier (scripts/classify-contributors.py):
  Analyzes existing contributors rows, buckets into:
    keep_agent   — 9 Pentagon agents
    keep_person  — 21 real humans + reachable pseudonymous X/TG handles
    publisher    — 100 news orgs, academic venues, formal-citation names,
                   brand/platform names
    garbage      — 9 parse artifacts (containing /, parens, 3+ hyphens)
    review_needed — 0 (fully covered by current allowlists)

  Hand-curated allowlists for news/academic/social/internal publisher kinds.
  Garbage detection via regex on special chars and length > 50.
  Named pseudonyms without @ prefix (karpathy, simonw, swyx, metaproph3t,
  sjdedic, ceterispar1bus, etc.) classified as keep_person — they're real
  X/TG contributors missing an @ prefix because extraction frontmatter
  didn't normalize. Cory's auto-create rule catches these on first reference.

  Formal-citation names (Firstname-Lastname form — Clayton Christensen, Hayek,
  Ostrom, Friston, Bostrom, Bak, etc.) classified as academic publishers —
  these are cited, not reachable via @ handle. Get promoted to contributors
  if/when they sign up with an @ handle.

  Apply path is transactional (BEGIN / COMMIT / ROLLBACK on error). Publisher
  insert happens before contributor delete, and contributor delete is gated
  on successful insert so we never lose a row by moving it to a failed
  publisher insert.

  --apply path flags:
    --delete-events  : also DELETE contribution_events rows for moved handles
                       (default: keep events for audit trail)
  --show <handle>   : inspect a single row's classification

Smoke-tested end-to-end via local copy of VPS DB:
  Before: 139 contributors total (polluted with orgs)
  After:  30 contributors (9 agent + 21 person), 100 publishers, 9 deleted
  contribution_events: 3,705 preserved
  contributors <-> publishers overlap: 0

Named contributors verified present after --apply:
  alexastrum (claims=6)  thesensatore (5)  cameron-s1 (1)  m3taversal (1011)

Pentagon agent 'pipeline' (claims_merged=771) intentionally retained — it's
the process name from old extract.py fallback path, not a real contributor.
Classified as agent (kind='agent') so doesn't appear in person leaderboard.

Deploy sequence after Ganymede review:
  1. Branch ff-merge to main
  2. scp lib/db.py + scripts/classify-contributors.py to VPS
  3. Pipeline already at v26 (migration ran during earlier v26 restart)
  4. Run dry-run: python3 ops/classify-contributors.py
  5. Apply: python3 ops/classify-contributors.py --apply
  6. Verify: livingip.xyz leaderboard stops showing CNBC/SpaceNews
  7. Argus /api/contributors unaffected (reads contributors directly, now clean)

Follow-up branch (not in this commit):
  - Writer routing in lib/contributor.py + extract.py:
    org handles -> publishers table + sources.publisher_id
    person handles with @ prefix -> auto-create contributor, tier='cited'
    formal-citation names -> sources.original_author (free text)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 20:47:21 +01:00
f0f9388c1f feat(diagnostics): add POST /api/search for chat API contract
Some checks are pending
CI / lint-and-test (push) Waiting to run
Wire the search endpoint to accept POST bodies matching the embedded
chat contract (query/limit/min_score/domain/confidence/exclude →
slug/path/title/domain/confidence/score/body_excerpt). GET path retained
for legacy callers and adds a min_score override for hackathon debug.

- _qdrant_hits_to_results() shapes raw hits into chat response format
- handle_api_search() dispatches POST vs GET
- /api/search added to _PUBLIC_PATHS (chat is unauthenticated)
- POST route registered alongside existing GET

Resolves VPS↔repo drift flagged by Argus before next deploy.sh run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 17:58:30 +01:00
0f2b153c92 fix(backfill): Ganymede review — fix tautological guard + origin='human'
Some checks are pending
CI / lint-and-test (push) Waiting to run
Addresses two findings in commit 762fd42 review:

1. BUG: guard query was tautological. `SELECT MAX(number) FROM prs WHERE
   number < 900000` filters out exactly what the `>= 900000` check tests.
   Replaced with a direct check for unexpected rows in the synthetic range
   (excluding our known 900068/900088).

2. WARNING: origin defaults to 'pipeline' via schema default. lib/merge.py
   convention is origin='human' for external contributors. Synthetic rows
   now set origin='human', priority='high' — matches discover_external_prs
   for real GitHub PRs. Prevents Phase B origin-based filtering from
   misclassifying Alex/Cameron as machine-authored.

Also flagged in review: credit projection was optimistic. Author events are
PR-level (not per-claim), so Alex gets 1×0.30 author credit, not 6. Same
for Cameron. Per-claim originator credit goes to the 7 frontmatter sourcers
where applicable. Not a code change — expectation reset for Cory.
2026-04-24 16:49:12 +01:00
762fd4233e feat(backfill): synthetic PR rows for pre-mirror GitHub PRs #68 (Alex) + #88 (Cameron)
Two historical GitHub PRs merged before our sync-mirror.sh tracked github_pr:
  - GitHub PR #68: alexastrum, 6 claims, merged Mar 9 2026 via squash merge
  - GitHub PR #88: Cameron-S1, 1 claim, merged early April

Their claim files were lost during a Forgejo→GitHub mirror overwrite and later
recovered via direct-to-main commits (dba00a79, da64f805). Because the
recovery commits bypassed the pipeline, our 'prs' table has no row to attach
originator events to — all 4 backfill-events.py strategies returned None,
leaving Alex + Cameron at 0 originator credits despite real historical work.

This reconstructs synthetic 'prs' rows so the existing github_pr strategy in
backfill-events.py attaches 7 originator events on re-run:
  - Numbers 900068 / 900088 live in a clearly-synthetic range that cannot
    collide with real Forgejo PRs (current max: 3941)
  - github_pr=68/88 wires up the existing lookup strategy
  - submitted_by=alexastrum / cameron-s1 establishes author attribution
  - merged_at from the recovery commit messages (not recovery-commit time)
  - last_error tags the rows as synthetic for future audits

Idempotent: INSERT OR IGNORE via check on number OR github_pr. Safe to replay.
Reversible: DELETE FROM prs WHERE number IN (900068, 900088).

After applying this script:
  python3 ops/backfill-events.py
will credit Alex with 6 author + 6 originator events (author=1.80, originator=0.90)
and Cameron with 1 author + 1 originator (0.30 + 0.15), all dated to the
historical merge dates — so 7d/30d leaderboard windows show them correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:33:37 +01:00
10d5c275da fix(backfill): normalize commit_date via datetime() in time-proximity query
Some checks are pending
CI / lint-and-test (push) Waiting to run
SQLite datetime comparison fails lexicographically across ISO-T and
space-separator formats: '2026-03-27 18:00:14' < '2026-03-27T17:43:04+00:00'
because space (0x20) < T (0x54). PRs merged same-day but earlier than the
commit hour were silently excluded from the time-proximity cascade.

Shaga's 3 stigmergic-coordination claims resolved to PR #2032 (later, wrong)
instead of #2025 (earlier, correct). Fixed by wrapping both sides in
datetime(), which normalizes to space-separator before comparison.

Verified: all 3 Shaga claims now resolve to #2025 via git_time_proximity.
No change to totals (126 originator events, 5 proximity hits) — the fix
corrects WHICH PR each proximity-matched claim resolves to, not whether.

Caught by Ganymede review of 1d6b515.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:16:03 +01:00
1d6b51527a feat(backfill): 4-strategy PR recovery for originator events
Rewrite claim-level pass in backfill-events.py to recover the Forgejo PR
that introduced each claim via a cascade of 4 strategies (reliability
order), replacing the single title→description match that missed PRs
with NULL description (Cameron #3377) and bare-subject extracts (Shaga's
Leo research PR).

## Strategies
  1. sourced_from frontmatter → prs.source_path stem match
  2. git log first-add commit → subject pattern → prs.branch
     - "<agent>: extract claims from <slug>"  → extract/<slug>
     - "<agent>: research session YYYY-MM-DD" → <agent>/research-<date>
     - "<agent>: (challenge|contrib|entity|synthesize)" → <agent>/*
     - "Recover X from GitHub PR #N"           → prs.github_pr=N
     - "Extract N claims from X" (no prefix)   → time-proximity on
       agent-owned branches within 24h
  3. Current title_desc fallback for anything the above miss

## Dry-run projection (1,662 merged PRs)

Before:
  Claims processed: 33
  Originator events: 6
  Breakdown: {no_pr_match: 1608, no_sourcer: 26, invalid_handle: 21, skip_self: 6}

After:
  Claims processed: 505 (+472)
  Originator events: 126 (+120)
  Strategy hits: git_subject=412, sourced_from=88, git_time_proximity=5
  Breakdown: {no_pr_match: 1095, no_sourcer: 67, invalid_handle: 359, skip_self: 20}

## Verified on real VPS data
- @thesensatore claims: 3/5 resolve via git_time_proximity to leo/ PRs
- Cameron-S1, alexastrum: remain None — their recovery commits
  (dba00a79, da64f805) bypassed the pipeline entirely, no Forgejo PR
  record exists. Requires synthetic prs rows — deferred to separate
  commit with its own Ganymede review (write operation, larger blast
  radius than this pure-read backfill change).

## Implementation
- New find_pr_for_claim(conn, repo, md) helper returns (pr_number, strategy)
- Claim-level pass uses it first, falls back to title_desc map
- Strategy counter surfaced in summary output for operator visibility

Idempotent — backfill re-runs skip duplicate events via the partial
UNIQUE index on contribution_events.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:06:52 +01:00
540ba97b9d fix(attribution): Phase A followup — bug #1 + 4 nits + refactor (Ganymede review)
Some checks are pending
CI / lint-and-test (push) Waiting to run
Addresses Apr 24 review of 58fa8c52. All 6 findings landed.

Bug #1 — git log -1 returns latest commit, not first (semantic mismatch
with "original author" comment):
  Drop -1 flag, take last line of default-ordered log output (= oldest).
  Fixes mis-credit on multi-commit PRs where a reviewer rebased/force-pushed.

Nit #2 — forward writer didn't pass merged_at:
  Fetch merged_at in the prs SELECT, thread pr_merged_at through all 5
  insert_contribution_event call sites. Keeps forward-emitted and backfilled
  event timestamps on the same timeline after merge retries.

Nit #3 — legacy-counts fallback paths emit no events (parity gap):
  git-author and prs.agent fallback paths now emit challenger/synthesizer
  events via the TRAILER_EVENT_ROLE map when refined_type matches. Closes
  the gap where external-contributor challenge/enrich PRs would accumulate
  legacy counts but disappear from event-sourced leaderboards.

Nit #4 — migration v24 agent seed missing 'pipeline':
  Added "pipeline" to the seed list. Plus new migration v25 with idempotent
  corrective UPDATE so existing envs (where v24 already ran) pick up the
  fix on restart without requiring manual SQL. Verified on VPS state:
  pipeline row was kind='person', will flip to 'agent' on redeploy.

Nit #5 — backfill summary prints originator attempted=0 in wrong pass:
  Split the "=== Summary ===" header into "=== PR-level events ===" and
  "=== Claim-level originator pass ===" with originator counts in the
  right block. Operator-facing cosmetic.

Refactor #6 — AGENT_BRANCH_PREFIXES duplicated in 2 sites:
  Extracted to lib/attribution.py as single source of truth. contributor.py
  imports it. backfill-events.py keeps its local copy (runs standalone
  without pipeline package import) with a sync-reference comment.

No behavioral drift for the common case. Backfill re-runs cleanly against
existing forward-written events (UNIQUE-index idempotency).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 14:13:54 +01:00
7 changed files with 1036 additions and 59 deletions

View file

@ -42,7 +42,7 @@ API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secret
# Endpoints that skip auth (dashboard is public for now, can lock later)
_PUBLIC_PATHS = frozenset({"/", "/prs", "/ops", "/health", "/agents", "/epistemic", "/legacy", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest"})
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest", "/api/search"})
def _get_db() -> sqlite3.Connection:
@ -663,38 +663,115 @@ async def handle_api_domains(request):
return web.json_response({"domains": breakdown})
async def handle_api_search(request):
"""GET /api/search — semantic search over claims via Qdrant + graph expansion.
def _qdrant_hits_to_results(hits, include_expanded=False):
"""Shape raw Qdrant hits into Ship's chat-API contract."""
results = []
for h in hits:
payload = h.get("payload", {}) or {}
path = payload.get("claim_path", "") or ""
slug = path.rsplit("/", 1)[-1]
if slug.endswith(".md"):
slug = slug[:-3]
results.append({
"slug": slug,
"path": path,
"title": payload.get("claim_title", ""),
"domain": payload.get("domain"),
"confidence": payload.get("confidence"),
"score": round(float(h.get("score", 0.0) or 0.0), 4),
"body_excerpt": payload.get("snippet", "") or "",
})
return results
Query params:
q: search query (required)
domain: filter by domain (optional)
confidence: filter by confidence level (optional)
limit: max results, default 10 (optional)
exclude: comma-separated claim paths to exclude (optional)
expand: enable graph expansion, default true (optional)
async def handle_api_search(request):
"""Semantic search over claims via Qdrant.
POST contract (Ship's chat API):
body: {"query": str, "limit": int, "min_score": float?, "domain": str?, "confidence": str?, "exclude": [str]?}
response: {"query": str, "results": [{"slug","path","title","domain","confidence","score","body_excerpt"}], "total": int}
GET (legacy + hackathon debug):
q: search query (required)
limit, domain, confidence, exclude, expand
min_score: if set, bypasses two-pass lib threshold (default lib behavior otherwise)
"""
if request.method == "POST":
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON body"}, status=400)
query = (body.get("query") or "").strip()
if not query:
return web.json_response({"error": "query required"}, status=400)
try:
limit = min(int(body.get("limit") or 5), 50)
except (TypeError, ValueError):
return web.json_response({"error": "limit must be int"}, status=400)
try:
min_score = float(body.get("min_score") if body.get("min_score") is not None else 0.25)
except (TypeError, ValueError):
return web.json_response({"error": "min_score must be float"}, status=400)
domain = body.get("domain")
confidence = body.get("confidence")
exclude = body.get("exclude") or None
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
results = _qdrant_hits_to_results(hits)
return web.json_response({"query": query, "results": results, "total": len(results)})
# GET path
query = request.query.get("q", "").strip()
if not query:
return web.json_response({"error": "q parameter required"}, status=400)
domain = request.query.get("domain")
confidence = request.query.get("confidence")
limit = min(int(request.query.get("limit", "10")), 50)
try:
limit = min(int(request.query.get("limit", "10")), 50)
except ValueError:
return web.json_response({"error": "limit must be int"}, status=400)
exclude_raw = request.query.get("exclude", "")
exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
expand = request.query.get("expand", "true").lower() != "false"
min_score_raw = request.query.get("min_score")
# Use shared search library (Layer 1 + Layer 2)
if min_score_raw is not None:
try:
min_score = float(min_score_raw)
except ValueError:
return web.json_response({"error": "min_score must be float"}, status=400)
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
direct = _qdrant_hits_to_results(hits)
return web.json_response({
"query": query,
"direct_results": direct,
"expanded_results": [],
"total": len(direct),
})
# Default GET: Layer 1 + Layer 2 via lib
result = kb_search(query, expand=expand,
domain=domain, confidence=confidence, exclude=exclude)
if "error" in result:
error = result["error"]
if error == "embedding_failed":
return web.json_response({"error": "embedding failed"}, status=502)
return web.json_response({"error": error}, status=500)
return web.json_response(result)
@ -2268,6 +2345,7 @@ def create_app() -> web.Application:
app.router.add_get("/api/contributors", handle_api_contributors)
app.router.add_get("/api/domains", handle_api_domains)
app.router.add_get("/api/search", handle_api_search)
app.router.add_post("/api/search", handle_api_search)
app.router.add_get("/api/audit", handle_api_audit)
app.router.add_get("/audit", handle_audit_page)
app.router.add_post("/api/usage", handle_api_usage)

View file

@ -21,6 +21,14 @@ logger = logging.getLogger("pipeline.attribution")
VALID_ROLES = frozenset({"sourcer", "extractor", "challenger", "synthesizer", "reviewer"})
# Agent-owned branch prefixes — PRs from these branches get Pentagon-Agent trailer
# credit for challenger/synthesizer roles. Pipeline-infra branches (extract/ reweave/
# fix/ ingestion/) are deliberately excluded: they're automation, not contribution.
# Single source of truth; imported by contributor.py and backfill-events.py.
AGENT_BRANCH_PREFIXES = (
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)
# Handle sanity: lowercase alphanumerics, hyphens, underscores. 1-39 chars (matches
# GitHub's handle rules). Rejects garbage like "governance---meritocratic-voting-+-futarchy"
# or "sec-interpretive-release-s7-2026-09-(march-17" that upstream frontmatter hygiene

View file

@ -14,7 +14,7 @@ import logging
import re
from . import config, db
from .attribution import classify_kind, normalize_handle
from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle
from .forgejo import get_pr_diff
logger = logging.getLogger("pipeline.contributor")
@ -186,7 +186,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
# Refine commit_type from diff content (branch prefix may be too broad)
row = conn.execute(
"SELECT commit_type, submitted_by, domain, source_channel, leo_verdict, "
"domain_verdict, domain_agent FROM prs WHERE number = ?",
"domain_verdict, domain_agent, merged_at FROM prs WHERE number = ?",
(pr_number,),
).fetchone()
branch_type = row["commit_type"] if row and row["commit_type"] else "extract"
@ -199,6 +199,10 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
pr_domain = row["domain"] if row else None
pr_channel = row["source_channel"] if row else None
pr_submitted_by = row["submitted_by"] if row else None
# Use the PR's merged_at timestamp so event time matches the actual merge.
# If a merge retries after a crash, this keeps forward-emitted and backfilled
# events on the same timeline. Falls back to datetime('now') in the writer.
pr_merged_at = row["merged_at"] if row and row["merged_at"] else None
# ── AUTHOR event (schema v24, double-write) ──
# Humans-are-always-author rule: the human in the loop gets author credit.
@ -211,27 +215,33 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
if pr_submitted_by:
author_candidate = pr_submitted_by
else:
# External GitHub PRs: git author is the real submitter.
rc_author_head, author_head = await git_fn(
# External GitHub PRs: git author of the FIRST commit on the branch is
# the real submitter. `git log -1` would return the latest commit, which
# mis-credits multi-commit PRs where a reviewer rebased or force-pushed.
# Take the last line of the unreversed log (= oldest commit, since git
# log defaults to reverse-chronological). Ganymede review, Apr 24.
rc_author_log, author_log = await git_fn(
"log", f"origin/main..origin/{branch}", "--no-merges",
"--format=%an", "-1", timeout=5,
"--format=%an", timeout=5,
)
if rc_author_head == 0 and author_head.strip():
candidate = author_head.strip().lower()
if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions"}:
author_candidate = candidate
if rc_author_log == 0 and author_log.strip():
lines = [line for line in author_log.strip().split("\n") if line.strip()]
if lines:
candidate = lines[-1].strip().lower()
if candidate and candidate not in {"teleo", "teleo-bot", "pipeline",
"github-actions[bot]", "forgejo-actions"}:
author_candidate = candidate
# Agent-owned branches with no submitted_by: theseus/research-*, leo/*, etc.
if not author_candidate and "/" in branch:
prefix = branch.split("/", 1)[0]
# Exclude pipeline-infrastructure prefixes — those are not authors.
if prefix in ("rio", "theseus", "leo", "vida", "clay", "astra", "oberon"):
author_candidate = prefix
if not author_candidate and branch.startswith(AGENT_BRANCH_PREFIXES):
# Autonomous agent PR (theseus/research-*, leo/entity-*, etc.) —
# credit goes to the agent as author per Cory's directive.
author_candidate = branch.split("/", 1)[0]
if author_candidate:
insert_contribution_event(
conn, author_candidate, "author", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# ── EVALUATOR events (schema v24) ──
@ -243,6 +253,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
insert_contribution_event(
conn, "leo", "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
if row["domain_verdict"] == "approve" and row["domain_agent"]:
dagent = row["domain_agent"].strip().lower()
@ -250,6 +261,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
insert_contribution_event(
conn, dagent, "evaluator", pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Parse Pentagon-Agent trailer from branch commit messages
@ -257,9 +269,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
# Agent-owned branches (theseus/*, rio/*, etc.) give the trailer-named agent
# challenger/synthesizer credit based on refined commit_type. Pipeline-owned
# branches (extract/*, reweave/*, etc.) don't — those are infra, not work.
_AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
"astra/", "oberon/")
is_agent_branch = branch.startswith(_AGENT_BRANCH_PREFIXES)
is_agent_branch = branch.startswith(AGENT_BRANCH_PREFIXES)
_TRAILER_EVENT_ROLE = {
"challenge": "challenger",
"enrich": "synthesizer",
@ -285,6 +295,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
insert_contribution_event(
conn, agent_name, event_role, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(agent_name)
@ -346,6 +357,7 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
conn, handle, "originator", pr_number,
claim_path=rel_path,
domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
# Fallback: if no Pentagon-Agent trailer found, try git commit authors
@ -364,13 +376,35 @@ async def record_contributor_attribution(conn, pr_number: int, branch: str, git_
if author_name and author_name not in _BOT_AUTHORS:
role = commit_type_to_role(refined_type)
upsert_contributor(conn, author_name, None, role, today)
# Event-model parity: emit challenger/synthesizer event when
# the fallback credits a human/agent for that kind of work.
# Without this, external-contributor challenge/enrich PRs
# accumulate legacy counts but disappear from event-sourced
# leaderboards when Phase B cuts over. (Ganymede review.)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, author_name, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
agents_found.add(author_name)
if not agents_found:
row = conn.execute("SELECT agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
if row and row["agent"] and row["agent"] != "external":
fb_row = conn.execute(
"SELECT agent FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if fb_row and fb_row["agent"] and fb_row["agent"] != "external":
pr_agent = fb_row["agent"].lower()
role = commit_type_to_role(refined_type)
upsert_contributor(conn, row["agent"].lower(), None, role, today)
upsert_contributor(conn, pr_agent, None, role, today)
event_role_fb = _TRAILER_EVENT_ROLE.get(refined_type)
if event_role_fb:
insert_contribution_event(
conn, pr_agent, event_role_fb, pr_number,
claim_path=None, domain=pr_domain, channel=pr_channel,
timestamp=pr_merged_at,
)
def upsert_contributor(

View file

@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 24
SCHEMA_VERSION = 26
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -35,6 +35,15 @@ CREATE TABLE IF NOT EXISTS sources (
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
-- v26: provenance publisher (news org / venue) + content author.
-- publisher_id references publishers(id) when source is from a known org.
-- original_author_handle references contributors(handle) when author is in our system.
-- original_author is free-text fallback ("Kim et al.", "Robin Hanson") not credit-bearing.
publisher_id INTEGER REFERENCES publishers(id),
content_type TEXT,
-- article | paper | tweet | conversation | self_authored | webpage | podcast
original_author TEXT,
original_author_handle TEXT REFERENCES contributors(handle),
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
@ -207,6 +216,33 @@ CREATE TABLE IF NOT EXISTS contributor_aliases (
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
-- Publishers: news orgs, academic venues, social platforms. NOT contributors these
-- provide metadata/provenance for sources, never earn leaderboard credit. Separating
-- these from contributors prevents CNBC/SpaceNews from dominating the leaderboard.
-- (Apr 24 Cory directive: "only credit the original source if its on X or tg")
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
-- Multi-platform identity: one contributor, many handles. Enables the leaderboard to
-- unify @thesensatore (X) + thesensatore (TG) + thesensatore@github into one person.
-- Writers check this table after resolving aliases to find canonical contributor handle.
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
"""
@ -737,9 +773,13 @@ def migrate(conn: sqlite3.Connection):
],
)
# Seed kind='agent' for known Pentagon agents so the events writer picks it up.
# Must stay in sync with lib/attribution.PENTAGON_AGENTS — drift causes
# contributors.kind to disagree with classify_kind() output for future
# inserts. (Ganymede review: "pipeline" was missing until Apr 24.)
pentagon_agents = [
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
]
for agent in pentagon_agents:
conn.execute(
@ -749,6 +789,62 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v24: added contribution_events + contributor_aliases tables, kind column")
if current < 25:
# v24 seeded 13 Pentagon agents but missed "pipeline" — classify_kind()
# treats it as agent so contributors.kind drifted from event-insert output.
# Idempotent corrective UPDATE: fresh installs have no "pipeline" row
# (no-op), upgraded envs flip it if it exists. (Ganymede review Apr 24.)
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = 'pipeline'"
)
conn.commit()
logger.info("Migration v25: patched kind='agent' for pipeline handle")
if current < 26:
# Add publishers + contributor_identities. Non-breaking — new tables only.
# No existing data moved. Classification into publishers happens via a
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
# seed list. CHECK constraint on contributors.kind deferred to v27 after
# classification completes. (Apr 24 Cory directive: "fix schema, don't
# filter output" — separate contributors from publishers at the data layer.)
conn.executescript("""
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
""")
# Extend sources with provenance columns. ALTER TABLE ADD COLUMN is
# idempotent-safe via try/except because SQLite doesn't support IF NOT EXISTS
# on column adds.
for col_sql in (
"ALTER TABLE sources ADD COLUMN publisher_id INTEGER REFERENCES publishers(id)",
"ALTER TABLE sources ADD COLUMN content_type TEXT",
"ALTER TABLE sources ADD COLUMN original_author TEXT",
"ALTER TABLE sources ADD COLUMN original_author_handle TEXT REFERENCES contributors(handle)",
):
try:
conn.execute(col_sql)
except sqlite3.OperationalError as e:
if "duplicate column" not in str(e).lower():
raise
conn.commit()
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

View file

@ -46,8 +46,11 @@ PENTAGON_AGENTS = frozenset({
"pipeline",
})
AGENT_BRANCH_PREFIXES = ("rio/", "theseus/", "leo/", "vida/", "clay/",
"astra/", "oberon/")
# Keep in sync with lib/attribution.AGENT_BRANCH_PREFIXES.
# Duplicated here because this script runs standalone (no pipeline package import).
AGENT_BRANCH_PREFIXES = (
"rio/", "theseus/", "leo/", "vida/", "astra/", "clay/", "oberon/",
)
TRAILER_EVENT_ROLE = {
"challenge": "challenger",
@ -196,6 +199,175 @@ def derive_author(conn: sqlite3.Connection, pr: dict) -> str | None:
return None
def find_pr_for_claim(
conn: sqlite3.Connection,
repo: Path,
md: Path,
) -> tuple[int | None, str]:
"""Recover the Forgejo PR number that introduced a claim file.
Returns (pr_number, strategy) strategy is one of:
'sourced_from' frontmatter sourced_from matched prs.source_path
'git_subject' git log first-add commit message matched a branch pattern
'title_desc' filename stem matched a title in prs.description
'github_pr' recovery commit mentioned GitHub PR # → prs.github_pr
'none' no strategy found a match
Order is chosen by reliability:
1. sourced_from (explicit provenance, most reliable when present)
2. git_subject (covers Leo research, Cameron challenges, Theseus contrib)
3. title_desc (current fallback brittle when description is NULL)
4. github_pr (recovery commits referencing erased GitHub PRs)
"""
rel = str(md.relative_to(repo))
# Strategy 1: sourced_from frontmatter → prs.source_path
try:
content = md.read_text(encoding="utf-8")
except (FileNotFoundError, PermissionError, UnicodeDecodeError):
content = ""
fm = parse_frontmatter(content) if content else None
if fm:
sourced = fm.get("sourced_from")
candidate_paths: list[str] = []
if isinstance(sourced, str) and sourced:
candidate_paths.append(sourced)
elif isinstance(sourced, list):
candidate_paths.extend(s for s in sourced if isinstance(s, str))
for sp in candidate_paths:
stem = Path(sp).stem
if not stem:
continue
row = conn.execute(
"""SELECT number FROM prs
WHERE source_path LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"%{stem}.md",),
).fetchone()
if row:
return row["number"], "sourced_from"
# Strategy 2: git log first-add commit → subject pattern → prs.branch
# Default log order is reverse-chronological; take the last line (oldest)
# to get the original addition, not later rewrites.
log_out = git(
"log", "--diff-filter=A", "--follow",
"--format=%H|||%s|||%b", "--", rel,
)
if log_out.strip():
# Split on the delimiter we chose. Each commit produces 3 fields but
# %b can contain blank lines — group by lines that look like a SHA.
blocks: list[tuple[str, str, str]] = []
current: list[str] = []
for line in log_out.splitlines():
if re.match(r"^[a-f0-9]{40}\|\|\|", line):
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
current = [line]
else:
current.append(line)
if current:
parts = "\n".join(current).split("|||", 2)
if len(parts) == 3:
blocks.append((parts[0], parts[1], parts[2]))
if blocks:
# Oldest addition — git log defaults to reverse-chronological
_oldest_sha, subject, body = blocks[-1]
# Pattern: "<agent>: extract claims from <slug>"
m = re.match(r"^(\w+):\s*extract\s+claims\s+from\s+(\S+)", subject)
if m:
slug = m.group(2).rstrip(".md").rstrip(".")
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"extract/{slug}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: research session <date>"
m = re.match(r"^(\w+):\s*research\s+session\s+(\d{4}-\d{2}-\d{2})", subject)
if m:
agent = m.group(1).lower()
date = m.group(2)
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/research-{date}%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Pattern: "<agent>: challenge" / contrib challenges / entity batches
m = re.match(r"^(\w+):\s*(?:challenge|contrib|entity|synthesize)", subject)
if m:
agent = m.group(1).lower()
row = conn.execute(
"""SELECT number FROM prs
WHERE branch LIKE ? AND status='merged'
ORDER BY merged_at ASC LIMIT 1""",
(f"{agent}/%",),
).fetchone()
if row:
return row["number"], "git_subject"
# Recovery commits referencing erased GitHub PRs (Alex/Cameron).
# Subject: "Recover <who> contribution from GitHub PR #NN (...)".
# Match only when a corresponding prs row exists with github_pr=NN —
# otherwise the claims were direct-to-main without a Forgejo PR
# record, which requires a synthetic PR row (follow-up, not in
# this script's scope).
gh_match = re.search(r"GitHub\s+PR\s+#(\d+)", subject + "\n" + body)
if gh_match:
gh_pr = int(gh_match.group(1))
row = conn.execute(
"SELECT number FROM prs WHERE github_pr = ? AND status='merged' LIMIT 1",
(gh_pr,),
).fetchone()
if row:
return row["number"], "github_pr"
# Pattern: bare "Extract N claims from <source-fragment>" (no
# agent prefix). Used in early research PRs like Shaga's claims
# at PR #2025. Fall back to time-proximity: find the earliest
# agent-branch PR merged within 24h AFTER this commit's date.
m = re.match(r"^Extract\s+\d+\s+claims\s+from\b", subject)
if m:
# Get commit author date
date_out = git(
"log", "-1", "--format=%aI", _oldest_sha, timeout=10,
)
commit_date = date_out.strip() if date_out.strip() else None
if commit_date:
# git %aI returns ISO 8601 with T-separator; prs.merged_at
# uses SQLite's space-separator. Lexicographic comparison
# fails across formats (space<T), so normalize commit_date
# via datetime() before comparing. Without this, PRs merged
# within the same calendar day but earlier than the commit
# hour are silently excluded (caught by Ganymede review —
# Shaga's #2025 was dropped in favor of later #2032).
row = conn.execute(
"""SELECT number FROM prs
WHERE status='merged'
AND merged_at >= datetime(?)
AND merged_at <= datetime(datetime(?), '+24 hours')
AND (branch LIKE 'leo/%' OR branch LIKE 'theseus/%'
OR branch LIKE 'rio/%' OR branch LIKE 'astra/%'
OR branch LIKE 'vida/%' OR branch LIKE 'clay/%')
ORDER BY merged_at ASC LIMIT 1""",
(commit_date, commit_date),
).fetchone()
if row:
return row["number"], "git_time_proximity"
return None, "none"
def emit(conn, counts, dry_run, handle, role, pr_number, claim_path, domain, channel, timestamp):
canonical = normalize_handle(conn, handle)
if not valid_handle(canonical):
@ -332,8 +504,10 @@ def main():
if not args.dry_run:
conn.commit()
print("\n=== Summary ===")
for role in ("author", "originator", "challenger", "synthesizer", "evaluator"):
# Originator is emitted in the claim-level pass below, not the PR-level pass.
# Previous summary listed it here with attempted=0 which confused operators.
print("\n=== PR-level events (author, evaluator, challenger, synthesizer) ===")
for role in ("author", "challenger", "synthesizer", "evaluator"):
att = counts[(role, "attempt")]
if args.dry_run:
wi = counts[(role, "would_insert")]
@ -343,18 +517,16 @@ def main():
skip = counts[(role, "skipped_dup")]
print(f" {role:12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
if not args.dry_run:
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
print(f"\nTotal contribution_events rows: {total}")
# ── Per-claim originator pass ──
# Separate pass: walk the current knowledge tree, parse sourcer frontmatter,
# and attach each claim to the merging PR via a claim_path → pr_number map
# built from prs.description (pipe-separated claim titles). Imperfect — some
# PRs have NULL description or mismatched titles — but recovers the bulk of
# historical originator credit.
# Walk the knowledge tree, parse sourcer attribution, and attach each claim
# to its merging PR via find_pr_for_claim's multi-strategy recovery.
# Apr 24 rewrite (Ganymede-approved): replaces the single-strategy
# title→description match with four strategies in reliability order.
# Previous script missed PRs with NULL description (Cameron #3377) and
# cross-context claims (Shaga's Leo research). Fallback title-match is
# preserved to recover anything the git-log path misses.
print("\n=== Claim-level originator pass ===")
# Build title → pr_number map from prs.description
# Build title → pr_number map from prs.description (strategy 3 fallback)
title_to_pr: dict[str, int] = {}
for r in conn.execute(
"SELECT number, description FROM prs WHERE status='merged' AND description IS NOT NULL AND description != ''"
@ -367,6 +539,7 @@ def main():
title_to_pr[title.lower()] = r["number"]
claim_counts = Counter()
strategy_counts = Counter()
claim_count = 0
originator_count = 0
for md in sorted(repo.glob("domains/**/*.md")) + \
@ -374,13 +547,19 @@ def main():
sorted(repo.glob("foundations/**/*.md")) + \
sorted(repo.glob("decisions/**/*.md")):
rel = str(md.relative_to(repo))
# Match via filename stem (with spaces and hyphens) against description titles
stem = md.stem
# Multiple matching strategies
pr_number = title_to_pr.get(stem.lower())
# Strategies 1, 2, 4 via the helper (sourced_from, git_subject, github_pr).
pr_number, strategy = find_pr_for_claim(conn, repo, md)
# Strategy 3 (fallback): title-match against prs.description.
if not pr_number:
# Hyphenated slug → space variant
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
pr_number = title_to_pr.get(stem.lower())
if not pr_number:
pr_number = title_to_pr.get(stem.replace("-", " ").lower())
if pr_number:
strategy = "title_desc"
if not pr_number:
claim_counts["no_pr_match"] += 1
continue
@ -391,6 +570,7 @@ def main():
continue
claim_count += 1
strategy_counts[strategy] += 1
# Look up author for this PR to skip self-credit
pr_row = conn.execute(
"SELECT submitted_by, branch, domain, source_channel, merged_at FROM prs WHERE number = ?",
@ -419,12 +599,19 @@ def main():
print(f" Claims processed: {claim_count}")
print(f" Originator events emitted: {originator_count}")
print(f" Breakdown: {dict(claim_counts)}")
final_origin_attempted = counts[("originator", "attempt")]
print(f" Strategy hits: {dict(strategy_counts)}")
att = counts[("originator", "attempt")]
if args.dry_run:
print(f" (dry-run) originator would_insert={counts[('originator', 'would_insert')]}")
wi = counts[("originator", "would_insert")]
print(f" {'originator':12s} attempted={att:5d} would_insert={wi:5d}")
else:
print(f" originator inserted={counts[('originator', 'inserted')]} skipped_dup={counts[('originator', 'skipped_dup')]}")
ins = counts[("originator", "inserted")]
skip = counts[("originator", "skipped_dup")]
print(f" {'originator':12s} attempted={att:5d} inserted={ins:5d} skipped_dup={skip:5d}")
if not args.dry_run:
total = conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]
print(f"\nTotal contribution_events rows: {total}")
if __name__ == "__main__":

View file

@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""Reconstruct synthetic `prs` rows for historical GitHub PRs lost pre-mirror-wiring.
Two PRs merged on GitHub before our sync-mirror.sh tracked `github_pr`:
- GitHub PR #68: alexastrum — 6 claims, merged 2026-03-09 via GitHub squash,
recovered to Forgejo via commit dba00a79 (Apr 16, after mirror erased files)
- GitHub PR #88: Cameron-S1 — 1 claim, recovered via commit da64f805
The recovery commits wrote the files directly to main, so our `prs` table has
no row to attach originator events to the backfill-events.py strategies all
return NULL. We reconstruct one synthetic `prs` row per historical GitHub PR so
the events pipeline (and `github_pr` strategy in backfill-events) can credit
Alex and Cameron properly.
Numbers 900000+ are clearly synthetic and won't collide with real Forgejo PRs.
Idempotent via INSERT OR IGNORE.
Usage:
python3 scripts/backfill-synthetic-recovery-prs.py --dry-run
python3 scripts/backfill-synthetic-recovery-prs.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Historical GitHub PRs recovered via direct-to-main commits.
# Original GitHub merge dates come from the recovery commit messages.
RECOVERY_PRS = [
{
"number": 900068,
"github_pr": 68,
"branch": "gh-pr-68",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "alexastrum",
"source_channel": "github",
# origin='human' matches lib/merge.py convention for external contributors
# (default is 'pipeline' which misclassifies us as machine-authored).
"origin": "human",
"priority": "high",
"description": "Multi-agent git workflows production maturity | Cryptographic agent trust ratings | Defense in depth for AI agent oversight | Deterministic policy engines below LLM layer | Knowledge validation four-layer architecture | Structurally separating proposer and reviewer agents",
"merged_at": "2026-03-09 00:00:00",
"created_at": "2026-03-08 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #68 pre-mirror-wiring reconstruction (commit dba00a79)",
},
{
"number": 900088,
"github_pr": 88,
"branch": "gh-pr-88",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "cameron-s1",
"source_channel": "github",
"origin": "human",
"priority": "high",
"description": "Orthogonality is an artefact of specification architectures not a property of intelligence itself",
"merged_at": "2026-04-01 00:00:00",
"created_at": "2026-04-01 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #88 pre-mirror-wiring reconstruction (commit da64f805)",
},
]
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Guard against synthetic-range colonization (Ganymede review): check for
# any row in the synthetic range that isn't one of ours. INSERT OR IGNORE on
# the specific numbers is the real collision defense; this is belt-and-suspenders.
max_real = conn.execute(
"SELECT MAX(number) FROM prs WHERE number < 900000"
).fetchone()[0] or 0
print(f"Max real Forgejo PR number: {max_real}")
synth_conflict = conn.execute(
"SELECT number FROM prs WHERE number >= 900000 AND number NOT IN (900068, 900088) LIMIT 1"
).fetchone()
if synth_conflict:
print(f"ERROR: PR #{synth_conflict[0]} already exists in synthetic range. "
f"Pick a new range before running.", file=sys.stderr)
sys.exit(2)
inserted = 0
skipped = 0
for row in RECOVERY_PRS:
existing = conn.execute(
"SELECT number FROM prs WHERE number = ? OR github_pr = ?",
(row["number"], row["github_pr"]),
).fetchone()
if existing:
print(f" PR #{row['number']} (github_pr={row['github_pr']}): already exists — skip")
skipped += 1
continue
print(f" {'(dry-run) ' if args.dry_run else ''}INSERT synthetic PR #{row['number']} "
f"(github_pr={row['github_pr']}, submitted_by={row['submitted_by']}, "
f"merged_at={row['merged_at']})")
if not args.dry_run:
conn.execute(
"""INSERT INTO prs (
number, github_pr, branch, status, domain, commit_type, tier,
leo_verdict, domain_verdict, submitted_by, source_channel,
origin, priority,
description, merged_at, created_at, last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
row["number"], row["github_pr"], row["branch"], row["status"],
row["domain"], row["commit_type"], row["tier"],
row["leo_verdict"], row["domain_verdict"],
row["submitted_by"], row["source_channel"],
row["origin"], row["priority"],
row["description"], row["merged_at"], row["created_at"],
row["last_error"],
),
)
inserted += 1
if not args.dry_run:
conn.commit()
print(f"\nInserted {inserted}, skipped {skipped}")
if not args.dry_run and inserted:
print("\nNext step: re-run backfill-events.py to attach originator events")
print(" python3 ops/backfill-events.py")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""Classify `contributors` rows into {keep_person, keep_agent, move_to_publisher, delete_garbage}.
Reads current contributors table, proposes reclassification per v26 schema design:
- Real humans + Pentagon agents stay in contributors (kind='person'|'agent')
- News orgs, publications, venues move to publishers table (new v26)
- Multi-word hyphenated garbage (parsing artifacts) gets deleted
- Their contribution_events are handled per category:
* Publishers: DELETE events (orgs shouldn't have credit)
* Garbage: DELETE events (bogus data)
* Persons/agents: keep events untouched
Classification is heuristic uses explicit allowlists + regex patterns + length gates.
Ambiguous cases default to 'review_needed' (human decision).
Usage:
python3 scripts/classify-contributors.py # dry-run analysis + report
python3 scripts/classify-contributors.py --apply # write changes
python3 scripts/classify-contributors.py --show <handle> # inspect a single row
Writes to pipeline.db only. Does NOT modify claim files.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Pentagon agents: kind='agent'. Authoritative list.
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Publisher/news-org handles seen in current contributors table.
# Grouped by kind for the publishers row. Classified by inspection.
# NOTE: This list is hand-curated — add to it as new orgs appear.
PUBLISHERS_NEWS = {
# News outlets / brands
"cnbc", "al-jazeera", "axios", "bloomberg", "reuters", "bettorsinsider",
"fortune", "techcrunch", "coindesk", "coindesk-staff", "coindesk-research",
"coindesk research", "coindesk staff",
"defense-one", "thedefensepost", "theregister", "the-intercept",
"the-meridiem", "variety", "variety-staff", "variety staff", "spacenews",
"nasaspaceflight", "thedonkey", "insidedefense", "techpolicypress",
"morganlewis", "casinoorg", "deadline", "animationmagazine",
"defensepost", "casino-org", "casino.org",
"air & space forces magazine", "ieee spectrum", "techcrunch-staff",
"blockworks", "blockworks-staff", "decrypt", "ainvest", "banking-dive", "banking dive",
"cset-georgetown", "cset georgetown",
"kff", "kff-health-news", "kff health news", "kff-health-news---cbo",
"kff-health-news-/-cbo", "kff health news / cbo", "kffhealthnews",
"bloomberg-law",
"norton-rose-fulbright", "norton rose fulbright",
"defence-post", "the-defensepost",
"wilmerhale", "mofo", "sciencedirect",
"yogonet", "csr", "aisi-uk", "aisi", "aisi_gov", "rand",
"armscontrol", "eclinmed", "solana-compass", "solana compass",
"pmc11919318", "pmc11780016",
"healthverity", "natrium", "form-energy",
"courtlistener", "curtis-schiff", "curtis-schiff-prediction-markets",
"prophetx", "techpolicypress-staff",
"npr", "venturebeat", "geekwire", "payloadspace", "the-ankler",
"theankler", "tubefilter", "emarketer", "dagster",
"numerai", # fund/project brand, not person
"psl", "multistate",
}
PUBLISHERS_ACADEMIC = {
# Academic orgs, labs, papers, journals, institutions
"arxiv", "metr", "metr_evals", "apollo-research", "apollo research", "apolloresearch",
"jacc-study-authors", "jacc-data-report-authors",
"anthropic-fellows-program", "anthropic-fellows",
"anthropic-fellows-/-alignment-science-team", "anthropic-research",
"jmir-2024", "jmir 2024",
"oettl-et-al.,-journal-of-experimental-orthopaedics",
"oettl et al., journal of experimental orthopaedics",
"jacc", "nct06548490", "pmc",
"conitzer-et-al.-(2024)", "aquino-michaels-2026", "pan-et-al.",
"pan-et-al.-'natural-language-agent-harnesses'",
"stanford", "stanford-meta-harness",
"hendershot", "annals-im",
"nellie-liang,-brookings-institution", "nellie liang, brookings institution",
"penn-state", "american-heart-association", "american heart association",
"molt_cornelius", "molt-cornelius",
# Companies / labs / brand-orgs (not specific humans)
"anthropic", "anthropicai", "openai", "nasa", "icrc", "ecri",
"epochairesearch", "metadao", "iapam", "icer",
"who", "ama", "uspstf", "unknown",
"futard.io", # protocol/platform
"oxford-martin-ai-governance-initiative",
"oxford-martin-ai-governance",
"u.s.-food-and-drug-administration",
"jitse-goutbeek,-european-policy-centre", # cited person+org string → publisher
"adepoju-et-al.", # paper citation
# Formal-citation names (Firstname-Lastname or Lastname-et-al) — classified
# as academic citations, not reachable contributors. They'd need an @ handle
# to get CI credit per Cory's growth-loop design.
"senator-elissa-slotkin",
"bostrom", "hanson", "kaufmann", "noah-smith", "doug-shapiro",
"shayon-sengupta", "shayon sengupta",
"robin-hanson", "robin hanson", "eliezer-yudkowsky",
"leopold-aschenbrenner", "aschenbrenner",
"ramstead", "larsson", "heavey",
"dan-slimmon", "van-leeuwaarden", "ward-whitt", "adams",
"tamim-ansary", "spizzirri",
"dario-amodei", # formal-citation form (real @ is @darioamodei)
"corless", "oxranga", "vlahakis",
# Brand/project/DAO tokens — not individuals
"areal-dao", "areal", "theiaresearch", "futard-io", "dhrumil",
# Classic formal-citation names — famous academics/economists cited by surname.
# Reachable via @ handle if/when they join (e.g. Ostrom has no X, Hayek deceased,
# Friston has an institutional affiliation not an @ handle we'd track).
"clayton-christensen", "hidalgo", "coase", "wiener", "juarrero",
"ostrom", "centola", "hayek", "marshall-mcluhan", "blackmore",
"knuth", "friston", "aquino-michaels", "conitzer", "bak",
}
# NOTE: pseudonymous X handles that MAY be real contributors stay in keep_person:
# karpathy, simonw, swyx, metaproph3t, metanallok, mmdhrumil, sjdedic,
# ceterispar1bus — these are real X accounts and match Cory's growth loop.
# They appear without @ prefix because extraction frontmatter didn't normalize.
# Auto-creating them as contributors tier='cited' is correct (A-path from earlier).
PUBLISHERS_SOCIAL = {
"x", "twitter", "telegram", "x.com",
}
PUBLISHERS_INTERNAL = {
"teleohumanity-manifesto", "strategy-session-journal",
"living-capital-thesis-development", "attractor-state-historical-backtesting",
"web-research-compilation", "architectural-investing",
"governance---meritocratic-voting-+-futarchy", # title artifact
"sec-interpretive-release-s7-2026-09-(march-17", # title artifact
"mindstudio", # tooling/platform, not contributor
}
# Merge into one kind→set map for classification
PUBLISHER_KIND_MAP = {}
for h in PUBLISHERS_NEWS:
PUBLISHER_KIND_MAP[h.lower()] = "news"
for h in PUBLISHERS_ACADEMIC:
PUBLISHER_KIND_MAP[h.lower()] = "academic"
for h in PUBLISHERS_SOCIAL:
PUBLISHER_KIND_MAP[h.lower()] = "social_platform"
for h in PUBLISHERS_INTERNAL:
PUBLISHER_KIND_MAP[h.lower()] = "internal"
# Garbage: handles that are clearly parse artifacts, not real names.
# Pattern: contains parens, special chars, or >50 chars.
def is_garbage(handle: str) -> bool:
h = handle.strip()
if len(h) > 50:
return True
if re.search(r"[()\[\]<>{}\/\\|@#$%^&*=?!:;\"']", h):
# But @ can appear legitimately in handles like @thesensatore — allow if @ is only prefix
if h.startswith("@") and not re.search(r"[()\[\]<>{}\/\\|#$%^&*=?!:;\"']", h):
return False
return True
# Multi-word hyphenated with very specific artifact shape: 3+ hyphens in a row or trailing noise
if "---" in h or "---meritocratic" in h or h.endswith("(march") or h.endswith("-(march"):
return True
return False
def classify(handle: str) -> tuple[str, str | None]:
"""Return (category, publisher_kind).
category {'keep_agent', 'keep_person', 'publisher', 'garbage', 'review_needed'}
publisher_kind {'news','academic','social_platform','internal', None}
"""
h = handle.strip().lower().lstrip("@")
if h in PENTAGON_AGENTS:
return ("keep_agent", None)
if h in PUBLISHER_KIND_MAP:
return ("publisher", PUBLISHER_KIND_MAP[h])
if is_garbage(handle):
return ("garbage", None)
# @-prefixed handles or short-slug real-looking names → keep as person
# (Auto-create rule from Cory: @ handles auto-join as tier='cited'.)
if handle.startswith("@"):
return ("keep_person", None)
# Plausible handles (<=39 chars, alphanum + underscore/hyphen): treat as person.
# 39-char ceiling matches GitHub's handle limit and the writer path in
# contributor.py::_HANDLE_RE, so a valid 21-39 char real handle won't fall
# through to review_needed and block --apply.
if re.match(r"^[a-z0-9][a-z0-9_-]{0,38}$", h):
return ("keep_person", None)
# Everything else: needs human review
return ("review_needed", None)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="Write changes to DB")
parser.add_argument("--show", type=str, help="Inspect a single handle")
parser.add_argument("--delete-events", action="store_true",
help="DELETE contribution_events for publishers+garbage (default: keep for audit)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: publishers table must exist (v26 migration applied)
try:
conn.execute("SELECT 1 FROM publishers LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: publishers table missing. Run migration v26 first.", file=sys.stderr)
sys.exit(2)
rows = conn.execute(
"SELECT handle, kind, tier, claims_merged FROM contributors ORDER BY claims_merged DESC"
).fetchall()
if args.show:
target = args.show.strip().lower().lstrip("@")
for r in rows:
if r["handle"].lower().lstrip("@") == target:
category, pkind = classify(r["handle"])
events_count = conn.execute(
"SELECT COUNT(*) FROM contribution_events WHERE handle = ?",
(r["handle"].lower().lstrip("@"),),
).fetchone()[0]
print(f"handle: {r['handle']}")
print(f"current_kind: {r['kind']}")
print(f"current_tier: {r['tier']}")
print(f"claims_merged: {r['claims_merged']}")
print(f"events: {events_count}")
print(f"→ category: {category}")
if pkind:
print(f"→ publisher: kind={pkind}")
return
print(f"No match for '{args.show}'")
return
# Classify all
buckets: dict[str, list[dict]] = {
"keep_agent": [],
"keep_person": [],
"publisher": [],
"garbage": [],
"review_needed": [],
}
for r in rows:
category, pkind = classify(r["handle"])
buckets[category].append({
"handle": r["handle"],
"kind_now": r["kind"],
"tier": r["tier"],
"claims": r["claims_merged"] or 0,
"publisher_kind": pkind,
})
print("=== Classification summary ===")
for cat, items in buckets.items():
print(f" {cat:18s} {len(items):5d}")
print("\n=== Sample of each category ===")
for cat, items in buckets.items():
print(f"\n--- {cat} (showing up to 10) ---")
for item in items[:10]:
tag = f"{item['publisher_kind']}" if item["publisher_kind"] else ""
print(f" {item['handle']:50s} claims={item['claims']:5d}{tag}")
print("\n=== Full review_needed list ===")
for item in buckets["review_needed"]:
print(f" {item['handle']:50s} claims={item['claims']:5d}")
# Diagnostic: orphan alias count for handles we're about to delete.
# Contributor_aliases has no FK (SQLite FKs require PRAGMA to enforce anyway),
# so aliases pointing to deleted canonical handles become orphans. Surface
# the count so the --delete-events decision is informed.
doomed = [item["handle"].lower().lstrip("@") for item in buckets["garbage"] + buckets["publisher"]]
if doomed:
placeholders = ",".join("?" * len(doomed))
orphan_count = conn.execute(
f"SELECT COUNT(*) FROM contributor_aliases WHERE canonical IN ({placeholders})",
doomed,
).fetchone()[0]
print(f"\n=== Alias orphan check ===")
print(f" contributor_aliases rows pointing to deletable canonicals: {orphan_count}")
if orphan_count:
print(f" (cleanup requires --delete-events; without it, aliases stay as orphans)")
if not args.apply:
print("\n(dry-run — no writes. Re-run with --apply to execute.)")
return
# ── Apply changes ──
print("\n=== Applying changes ===")
if buckets["review_needed"]:
print(f"ABORT: {len(buckets['review_needed'])} rows need human review. Fix classifier before --apply.")
sys.exit(3)
inserted_publishers = 0
reclassified_agents = 0
deleted_garbage = 0
deleted_publisher_rows = 0
deleted_events = 0
deleted_aliases = 0
# Single transaction — if any step errors, roll back. This prevents the failure
# mode where a publisher insert fails silently and we still delete the contributor
# row, losing data.
try:
conn.execute("BEGIN")
# 1. Insert publishers. Track which ones succeeded so step 4 only deletes those.
# Counter uses cur.rowcount so replay runs (where publishers already exist)
# report accurate inserted=0 instead of falsely claiming the full set.
# moved_to_publisher is unconditional — the contributors row still needs to
# be deleted even when the publishers row was added in a prior run.
moved_to_publisher = set()
for item in buckets["publisher"]:
name = item["handle"].strip().lower().lstrip("@")
cur = conn.execute(
"INSERT OR IGNORE INTO publishers (name, kind) VALUES (?, ?)",
(name, item["publisher_kind"]),
)
if cur.rowcount > 0:
inserted_publishers += 1
moved_to_publisher.add(item["handle"])
# 2. Ensure Pentagon agents have kind='agent' (idempotent after v25 patch)
for item in buckets["keep_agent"]:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(item["handle"].lower().lstrip("@"),),
)
reclassified_agents += 1
# 3. Delete garbage handles from contributors (and their events + aliases)
for item in buckets["garbage"]:
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_garbage += cur.rowcount
# 4. Delete publisher rows from contributors — ONLY for those successfully
# inserted into publishers above. Guards against partial failure.
# Aliases pointing to publisher-classified handles get cleaned under the
# same --delete-events gate: publishers live in their own table now, any
# leftover aliases in contributor_aliases are orphans.
for item in buckets["publisher"]:
if item["handle"] not in moved_to_publisher:
continue
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_publisher_rows += cur.rowcount
# 5. Audit log entry for the destructive operation (Ganymede Q5).
conn.execute(
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
(
"schema_v26",
"classify_contributors",
json.dumps({
"publishers_inserted": inserted_publishers,
"agents_updated": reclassified_agents,
"garbage_deleted": deleted_garbage,
"publisher_rows_deleted": deleted_publisher_rows,
"events_deleted": deleted_events,
"aliases_deleted": deleted_aliases,
"delete_events_flag": bool(args.delete_events),
}),
),
)
conn.commit()
except Exception as e:
conn.rollback()
print(f"ERROR: Transaction failed, rolled back. {e}", file=sys.stderr)
sys.exit(4)
print(f" publishers inserted: {inserted_publishers}")
print(f" agents kind='agent' ensured: {reclassified_agents}")
print(f" garbage rows deleted: {deleted_garbage}")
print(f" publisher rows removed from contributors: {deleted_publisher_rows}")
if args.delete_events:
print(f" contribution_events deleted: {deleted_events}")
print(f" contributor_aliases deleted: {deleted_aliases}")
else:
print(f" (events + aliases kept — re-run with --delete-events to clean them)")
if __name__ == "__main__":
main()