Compare commits


No commits in common. "main" and "epimetheus/originator-backfill-recovery" have entirely different histories.

5 changed files with 34 additions and 841 deletions

View file

@@ -9,16 +9,6 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60 # 1 minute — activity should feel fresh
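A minimal sketch of how this module-level cache would be consulted on the read path (helper name hypothetical; the real call site isn't shown in this diff):
import time
def _cached_events():
    # Serve the cached payload while it is fresher than CACHE_TTL seconds.
    if _cache["data"] is not None and time.time() - _cache["ts"] < CACHE_TTL:
        return _cache["data"]
    _cache["data"] = _build_events()
    _cache["ts"] = time.time()
    return _cache["data"]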
# commit_types we surface in the activity feed. `pipeline` is system
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
# inbox archive filenames, not claim slugs. Used as a fallback signal when
# branch/description heuristics miss (e.g. populated descriptions that
# happen to be source titles, not claim insights).
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
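As a quick illustration of the pattern's intent (both slugs invented):
# Date-prefixed inbox filename with a 4-char hex suffix: matches.
assert _SOURCE_SLUG_PATTERN.match("2026-04-18-spacenews-starship-a3f9")
# Ordinary claim slug, no date prefix or hex suffix: no match.
assert not _SOURCE_SLUG_PATTERN.match("orthogonality-is-an-artefact")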
def _get_conn():
conn = sqlite3.connect(DB_PATH)
@@ -27,52 +17,19 @@ def _get_conn():
return conn
def _is_source_slug(slug):
return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
def _classify_event(branch, description, commit_type, candidate_slug=None):
"""Return one of: create | enrich | challenge | source | None.
Source-archive PRs are extract/* branches that filed a source into
inbox/archive/ but didn't produce a claim. Two signals classify them
as 'source' (defense in depth):
1. extract/* branch with empty description (no claim title produced)
2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern)
"""
commit_type_l = (commit_type or "").lower()
branch = branch or ""
description_lower = (description or "").lower()
has_desc = bool(description and description.strip())
if commit_type_l not in _FEED_COMMIT_TYPES:
def _classify_event(branch, description, commit_type):
if commit_type != "knowledge":
return None
# Explicit challenge signals win first.
if (commit_type_l == "challenge"
or branch.startswith("challenge/")
or "challenged_by" in description_lower):
return "challenge"
# Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
if (commit_type_l == "enrich"
or branch.startswith("enrich/")
or branch.startswith("reweave/")):
if branch and branch.startswith("extract/"):
return "create"
if branch and branch.startswith("reweave/"):
return "enrich"
if branch and branch.startswith("challenge/"):
return "challenge"
if description and "challenged_by" in description.lower():
return "challenge"
if branch and branch.startswith("enrich/"):
return "enrich"
# Source-only: extract/* with no claim description means inbox archive
# landed but no domain claim was written.
if branch.startswith("extract/") and not has_desc:
return "source"
# Belt-and-suspenders: if the slug we'd surface to the frontend looks
# like an inbox archive filename (date-prefix-hash), treat as source
# regardless of branch/commit_type/description state. Catches cases
# where description leaked but is just a source title, not a claim.
if _is_source_slug(candidate_slug):
return "source"
# Everything else with a description is a new claim.
return "create"
@@ -124,60 +81,33 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
def _build_events():
conn = _get_conn()
try:
placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
rows = conn.execute(f"""
rows = conn.execute("""
SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
p.merged_at, p.description, p.commit_type, p.cost_usd,
p.source_channel, p.source_path
p.source_channel
FROM prs p
WHERE p.status = 'merged'
AND p.commit_type IN ({placeholders})
AND p.commit_type = 'knowledge'
AND p.merged_at IS NOT NULL
ORDER BY p.merged_at DESC
LIMIT 2000
""", _FEED_COMMIT_TYPES).fetchall()
""").fetchall()
events = []
claim_activity = {} # slug -> {challenges, enriches, signals, first_seen}
for row in rows:
slugs = _extract_claim_slugs(row["description"], row["branch"])
candidate_slug = slugs[0] if slugs else ""
event_type = _classify_event(
row["branch"], row["description"], row["commit_type"],
candidate_slug=candidate_slug,
)
event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
if not event_type:
continue
contributor = _normalize_contributor(row["submitted_by"], row["agent"])
slugs = _extract_claim_slugs(row["description"], row["branch"])
merged_at = row["merged_at"] or ""
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
ci_earned = ci_map.get(event_type, 0)
# Source events never carry a claim_slug — no claim was written —
# so the frontend can't produce a 404-ing claim link.
if event_type == "source":
summary_text = _summary_from_branch(row["branch"])
source_slug = (
_summary_from_branch(row["branch"]).lower().replace(" ", "-")
or row["branch"]
)
events.append({
"type": "source",
"claim_slug": "",
"source_slug": source_slug,
"domain": row["domain"] or "unknown",
"contributor": contributor,
"timestamp": merged_at,
"ci_earned": round(ci_earned, 2),
"summary": summary_text,
"pr_number": row["number"],
"source_channel": row["source_channel"] or "unknown",
})
continue
for slug in slugs:
if slug not in claim_activity:
claim_activity[slug] = {
@@ -234,8 +164,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
events.sort(key=hot_key, reverse=True)
elif sort_mode == "important":
type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
type_rank = {"challenge": 0, "enrich": 1, "create": 2}
events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
return events
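A hedged illustration of the 'important' ordering with invented events:
# Hypothetical events: challenges sort first, then enrich, then create;
# ties within a type break toward the longer summary.
rank = {"challenge": 0, "enrich": 1, "create": 2}
events = [
    {"type": "create", "summary": "new claim"},
    {"type": "enrich", "summary": "edge-connects two claims"},
    {"type": "challenge", "summary": "contests confidence"},
]
events.sort(key=lambda e: (rank.get(e["type"], 3), -len(e["summary"])))
# -> challenge, enrich, create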
@@ -245,8 +175,6 @@ async def handle_activity_feed(request):
sort_mode = "recent"
domain = request.query.get("domain", "")
contributor = request.query.get("contributor", "")
type_param = request.query.get("type", "")
type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
try:
limit = min(int(request.query.get("limit", "20")), 100)
except ValueError:
@@ -268,8 +196,6 @@ async def handle_activity_feed(request):
filtered = [e for e in filtered if e["domain"] == domain]
if contributor:
filtered = [e for e in filtered if e["contributor"] == contributor]
if type_filter:
filtered = [e for e in filtered if e["type"] in type_filter]
sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
total = len(sorted_events)

View file

@@ -42,7 +42,7 @@ API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secret
# Endpoints that skip auth (dashboard is public for now, can lock later)
_PUBLIC_PATHS = frozenset({"/", "/prs", "/ops", "/health", "/agents", "/epistemic", "/legacy", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest", "/api/search"})
"/api/contributors", "/api/domains", "/api/audit", "/api/yield", "/api/cost-per-claim", "/api/fix-rates", "/api/compute-profile", "/api/review-queue", "/api/daily-digest"})
def _get_db() -> sqlite3.Connection:
@@ -663,115 +663,38 @@ async def handle_api_domains(request):
return web.json_response({"domains": breakdown})
def _qdrant_hits_to_results(hits, include_expanded=False):
"""Shape raw Qdrant hits into Ship's chat-API contract."""
results = []
for h in hits:
payload = h.get("payload", {}) or {}
path = payload.get("claim_path", "") or ""
slug = path.rsplit("/", 1)[-1]
if slug.endswith(".md"):
slug = slug[:-3]
results.append({
"slug": slug,
"path": path,
"title": payload.get("claim_title", ""),
"domain": payload.get("domain"),
"confidence": payload.get("confidence"),
"score": round(float(h.get("score", 0.0) or 0.0), 4),
"body_excerpt": payload.get("snippet", "") or "",
})
return results
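A hedged sketch of the hit shape this helper consumes (payload keys per the code above; values invented):
# Hypothetical raw Qdrant hit; payload keys mirror what the helper reads.
hit = {
    "score": 0.8731,
    "payload": {
        "claim_path": "domains/ai-alignment/claims/some-claim.md",
        "claim_title": "Some claim",
        "domain": "ai-alignment",
        "confidence": "medium",
        "snippet": "first lines of the claim body",
    },
}
# _qdrant_hits_to_results([hit])[0] -> {"slug": "some-claim", "score": 0.8731, ...}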
async def handle_api_search(request):
"""Semantic search over claims via Qdrant.
"""GET /api/search — semantic search over claims via Qdrant + graph expansion.
POST contract (Ship's chat API):
body: {"query": str, "limit": int, "min_score": float?, "domain": str?, "confidence": str?, "exclude": [str]?}
response: {"query": str, "results": [{"slug","path","title","domain","confidence","score","body_excerpt"}], "total": int}
GET (legacy + hackathon debug):
q: search query (required)
limit, domain, confidence, exclude, expand
min_score: if set, bypasses two-pass lib threshold (default lib behavior otherwise)
Query params:
q: search query (required)
domain: filter by domain (optional)
confidence: filter by confidence level (optional)
limit: max results, default 10 (optional)
exclude: comma-separated claim paths to exclude (optional)
expand: enable graph expansion, default true (optional)
"""
if request.method == "POST":
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON body"}, status=400)
query = (body.get("query") or "").strip()
if not query:
return web.json_response({"error": "query required"}, status=400)
try:
limit = min(int(body.get("limit") or 5), 50)
except (TypeError, ValueError):
return web.json_response({"error": "limit must be int"}, status=400)
try:
min_score = float(body.get("min_score") if body.get("min_score") is not None else 0.25)
except (TypeError, ValueError):
return web.json_response({"error": "min_score must be float"}, status=400)
domain = body.get("domain")
confidence = body.get("confidence")
exclude = body.get("exclude") or None
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
results = _qdrant_hits_to_results(hits)
return web.json_response({"query": query, "results": results, "total": len(results)})
# GET path
query = request.query.get("q", "").strip()
if not query:
return web.json_response({"error": "q parameter required"}, status=400)
domain = request.query.get("domain")
confidence = request.query.get("confidence")
try:
limit = min(int(request.query.get("limit", "10")), 50)
except ValueError:
return web.json_response({"error": "limit must be int"}, status=400)
limit = min(int(request.query.get("limit", "10")), 50)
exclude_raw = request.query.get("exclude", "")
exclude = [p.strip() for p in exclude_raw.split(",") if p.strip()] if exclude_raw else None
expand = request.query.get("expand", "true").lower() != "false"
min_score_raw = request.query.get("min_score")
if min_score_raw is not None:
try:
min_score = float(min_score_raw)
except ValueError:
return web.json_response({"error": "min_score must be float"}, status=400)
vector = embed_query(query)
if vector is None:
return web.json_response({"error": "embedding failed"}, status=502)
hits = search_qdrant(vector, limit=limit, domain=domain,
confidence=confidence, exclude=exclude,
score_threshold=min_score)
direct = _qdrant_hits_to_results(hits)
return web.json_response({
"query": query,
"direct_results": direct,
"expanded_results": [],
"total": len(direct),
})
# Default GET: Layer 1 + Layer 2 via lib
# Use shared search library (Layer 1 + Layer 2)
result = kb_search(query, expand=expand,
domain=domain, confidence=confidence, exclude=exclude)
if "error" in result:
error = result["error"]
if error == "embedding_failed":
return web.json_response({"error": "embedding failed"}, status=502)
return web.json_response({"error": error}, status=500)
return web.json_response(result)
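A hedged usage sketch of the removed POST contract (host and query invented; body fields follow the docstring above):
import json
import urllib.request

req = urllib.request.Request(
    "http://localhost:8080/api/search",  # hypothetical host/port
    data=json.dumps({"query": "agent oversight", "limit": 5, "min_score": 0.25}).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    payload = json.load(resp)
print(payload["total"], [r["slug"] for r in payload["results"]])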
@@ -2345,7 +2268,6 @@ def create_app() -> web.Application:
app.router.add_get("/api/contributors", handle_api_contributors)
app.router.add_get("/api/domains", handle_api_domains)
app.router.add_get("/api/search", handle_api_search)
app.router.add_post("/api/search", handle_api_search)
app.router.add_get("/api/audit", handle_api_audit)
app.router.add_get("/audit", handle_audit_page)
app.router.add_post("/api/usage", handle_api_usage)

View file

@@ -9,7 +9,7 @@ from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 26
SCHEMA_VERSION = 25
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -35,15 +35,6 @@ CREATE TABLE IF NOT EXISTS sources (
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
-- v26: provenance publisher (news org / venue) + content author.
-- publisher_id references publishers(id) when source is from a known org.
-- original_author_handle references contributors(handle) when author is in our system.
-- original_author is free-text fallback ("Kim et al.", "Robin Hanson") not credit-bearing.
publisher_id INTEGER REFERENCES publishers(id),
content_type TEXT,
-- article | paper | tweet | conversation | self_authored | webpage | podcast
original_author TEXT,
original_author_handle TEXT REFERENCES contributors(handle),
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
@@ -216,33 +207,6 @@ CREATE TABLE IF NOT EXISTS contributor_aliases (
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_canonical ON contributor_aliases(canonical);
-- Publishers: news orgs, academic venues, social platforms. NOT contributors; these
-- provide metadata/provenance for sources and never earn leaderboard credit. Separating
-- these from contributors prevents CNBC/SpaceNews from dominating the leaderboard.
-- (Apr 24 Cory directive: "only credit the original source if its on X or tg")
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
-- Multi-platform identity: one contributor, many handles. Enables the leaderboard to
-- unify @thesensatore (X) + thesensatore (TG) + thesensatore@github into one person.
-- Writers check this table after resolving aliases to find canonical contributor handle.
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
"""
@@ -800,51 +764,6 @@ def migrate(conn: sqlite3.Connection):
conn.commit()
logger.info("Migration v25: patched kind='agent' for pipeline handle")
if current < 26:
# Add publishers + contributor_identities. Non-breaking — new tables only.
# No existing data moved. Classification into publishers happens via a
# separate script (scripts/reclassify-contributors.py) with Cory-reviewed
# seed list. CHECK constraint on contributors.kind deferred to v27 after
# classification completes. (Apr 24 Cory directive: "fix schema, don't
# filter output" — separate contributors from publishers at the data layer.)
conn.executescript("""
CREATE TABLE IF NOT EXISTS publishers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
kind TEXT CHECK(kind IN ('news', 'academic', 'social_platform', 'podcast', 'self', 'internal', 'legal', 'government', 'research_org', 'commercial', 'other')),
url_pattern TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_publishers_name ON publishers(name);
CREATE INDEX IF NOT EXISTS idx_publishers_kind ON publishers(kind);
CREATE TABLE IF NOT EXISTS contributor_identities (
contributor_handle TEXT NOT NULL,
platform TEXT NOT NULL CHECK(platform IN ('x', 'telegram', 'github', 'email', 'web', 'internal')),
platform_handle TEXT NOT NULL,
verified INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (platform, platform_handle)
);
CREATE INDEX IF NOT EXISTS idx_identities_contributor ON contributor_identities(contributor_handle);
""")
# Extend sources with provenance columns. ALTER TABLE ADD COLUMN is
# idempotent-safe via try/except because SQLite doesn't support IF NOT EXISTS
# on column adds.
for col_sql in (
"ALTER TABLE sources ADD COLUMN publisher_id INTEGER REFERENCES publishers(id)",
"ALTER TABLE sources ADD COLUMN content_type TEXT",
"ALTER TABLE sources ADD COLUMN original_author TEXT",
"ALTER TABLE sources ADD COLUMN original_author_handle TEXT REFERENCES contributors(handle)",
):
try:
conn.execute(col_sql)
except sqlite3.OperationalError as e:
if "duplicate column" not in str(e).lower():
raise
conn.commit()
logger.info("Migration v26: added publishers + contributor_identities tables + sources provenance columns")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",

View file

@@ -1,148 +0,0 @@
#!/usr/bin/env python3
"""Reconstruct synthetic `prs` rows for historical GitHub PRs lost pre-mirror-wiring.
Two PRs merged on GitHub before our sync-mirror.sh tracked `github_pr`:
- GitHub PR #68: alexastrum — 6 claims, merged 2026-03-09 via GitHub squash,
recovered to Forgejo via commit dba00a79 (Apr 16, after mirror erased files)
- GitHub PR #88: Cameron-S1 — 1 claim, recovered via commit da64f805
The recovery commits wrote the files directly to main, so our `prs` table has
no row to attach originator events to; the backfill-events.py strategies all
return NULL. We reconstruct one synthetic `prs` row per historical GitHub PR so
the events pipeline (and `github_pr` strategy in backfill-events) can credit
Alex and Cameron properly.
Numbers 900000+ are clearly synthetic and won't collide with real Forgejo PRs.
Idempotent via INSERT OR IGNORE.
Usage:
python3 scripts/backfill-synthetic-recovery-prs.py --dry-run
python3 scripts/backfill-synthetic-recovery-prs.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Historical GitHub PRs recovered via direct-to-main commits.
# Original GitHub merge dates come from the recovery commit messages.
RECOVERY_PRS = [
{
"number": 900068,
"github_pr": 68,
"branch": "gh-pr-68",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "alexastrum",
"source_channel": "github",
# origin='human' matches lib/merge.py convention for external contributors
# (default is 'pipeline', which would misclassify these PRs as machine-authored).
"origin": "human",
"priority": "high",
"description": "Multi-agent git workflows production maturity | Cryptographic agent trust ratings | Defense in depth for AI agent oversight | Deterministic policy engines below LLM layer | Knowledge validation four-layer architecture | Structurally separating proposer and reviewer agents",
"merged_at": "2026-03-09 00:00:00",
"created_at": "2026-03-08 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #68 pre-mirror-wiring reconstruction (commit dba00a79)",
},
{
"number": 900088,
"github_pr": 88,
"branch": "gh-pr-88",
"status": "merged",
"domain": "ai-alignment",
"commit_type": "knowledge",
"tier": "STANDARD",
"leo_verdict": "approve",
"domain_verdict": "approve",
"submitted_by": "cameron-s1",
"source_channel": "github",
"origin": "human",
"priority": "high",
"description": "Orthogonality is an artefact of specification architectures not a property of intelligence itself",
"merged_at": "2026-04-01 00:00:00",
"created_at": "2026-04-01 00:00:00",
"last_error": "synthetic_recovery: GitHub PR #88 pre-mirror-wiring reconstruction (commit da64f805)",
},
]
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Guard against synthetic-range colonization (Ganymede review): check for
# any row in the synthetic range that isn't one of ours. INSERT OR IGNORE on
# the specific numbers is the real collision defense; this is belt-and-suspenders.
max_real = conn.execute(
"SELECT MAX(number) FROM prs WHERE number < 900000"
).fetchone()[0] or 0
print(f"Max real Forgejo PR number: {max_real}")
synth_conflict = conn.execute(
"SELECT number FROM prs WHERE number >= 900000 AND number NOT IN (900068, 900088) LIMIT 1"
).fetchone()
if synth_conflict:
print(f"ERROR: PR #{synth_conflict[0]} already exists in synthetic range. "
f"Pick a new range before running.", file=sys.stderr)
sys.exit(2)
inserted = 0
skipped = 0
for row in RECOVERY_PRS:
existing = conn.execute(
"SELECT number FROM prs WHERE number = ? OR github_pr = ?",
(row["number"], row["github_pr"]),
).fetchone()
if existing:
print(f" PR #{row['number']} (github_pr={row['github_pr']}): already exists — skip")
skipped += 1
continue
print(f" {'(dry-run) ' if args.dry_run else ''}INSERT synthetic PR #{row['number']} "
f"(github_pr={row['github_pr']}, submitted_by={row['submitted_by']}, "
f"merged_at={row['merged_at']})")
if not args.dry_run:
conn.execute(
"""INSERT INTO prs (
number, github_pr, branch, status, domain, commit_type, tier,
leo_verdict, domain_verdict, submitted_by, source_channel,
origin, priority,
description, merged_at, created_at, last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
row["number"], row["github_pr"], row["branch"], row["status"],
row["domain"], row["commit_type"], row["tier"],
row["leo_verdict"], row["domain_verdict"],
row["submitted_by"], row["source_channel"],
row["origin"], row["priority"],
row["description"], row["merged_at"], row["created_at"],
row["last_error"],
),
)
inserted += 1
if not args.dry_run:
conn.commit()
print(f"\nInserted {inserted}, skipped {skipped}")
if not args.dry_run and inserted:
print("\nNext step: re-run backfill-events.py to attach originator events")
print(" python3 ops/backfill-events.py")
if __name__ == "__main__":
main()

View file

@@ -1,426 +0,0 @@
#!/usr/bin/env python3
"""Classify `contributors` rows into {keep_person, keep_agent, move_to_publisher, delete_garbage}.
Reads current contributors table, proposes reclassification per v26 schema design:
- Real humans + Pentagon agents stay in contributors (kind='person'|'agent')
- News orgs, publications, venues move to publishers table (new v26)
- Multi-word hyphenated garbage (parsing artifacts) gets deleted
- Their contribution_events are handled per category:
* Publishers: DELETE events (orgs shouldn't have credit)
* Garbage: DELETE events (bogus data)
* Persons/agents: keep events untouched
Classification is heuristic: explicit allowlists + regex patterns + length gates.
Ambiguous cases default to 'review_needed' (human decision).
Usage:
python3 scripts/classify-contributors.py # dry-run analysis + report
python3 scripts/classify-contributors.py --apply # write changes
python3 scripts/classify-contributors.py --show <handle> # inspect a single row
Writes to pipeline.db only. Does NOT modify claim files.
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter
from pathlib import Path
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
# Pentagon agents: kind='agent'. Authoritative list.
PENTAGON_AGENTS = frozenset({
"rio", "leo", "theseus", "vida", "clay", "astra",
"oberon", "argus", "rhea", "ganymede", "epimetheus", "hermes", "ship",
"pipeline",
})
# Publisher/news-org handles seen in current contributors table.
# Grouped by kind for the publishers row. Classified by inspection.
# NOTE: This list is hand-curated — add to it as new orgs appear.
PUBLISHERS_NEWS = {
# News outlets / brands
"cnbc", "al-jazeera", "axios", "bloomberg", "reuters", "bettorsinsider",
"fortune", "techcrunch", "coindesk", "coindesk-staff", "coindesk-research",
"coindesk research", "coindesk staff",
"defense-one", "thedefensepost", "theregister", "the-intercept",
"the-meridiem", "variety", "variety-staff", "variety staff", "spacenews",
"nasaspaceflight", "thedonkey", "insidedefense", "techpolicypress",
"morganlewis", "casinoorg", "deadline", "animationmagazine",
"defensepost", "casino-org", "casino.org",
"air & space forces magazine", "ieee spectrum", "techcrunch-staff",
"blockworks", "blockworks-staff", "decrypt", "ainvest", "banking-dive", "banking dive",
"cset-georgetown", "cset georgetown",
"kff", "kff-health-news", "kff health news", "kff-health-news---cbo",
"kff-health-news-/-cbo", "kff health news / cbo", "kffhealthnews",
"bloomberg-law",
"norton-rose-fulbright", "norton rose fulbright",
"defence-post", "the-defensepost",
"wilmerhale", "mofo", "sciencedirect",
"yogonet", "csr", "aisi-uk", "aisi", "aisi_gov", "rand",
"armscontrol", "eclinmed", "solana-compass", "solana compass",
"pmc11919318", "pmc11780016",
"healthverity", "natrium", "form-energy",
"courtlistener", "curtis-schiff", "curtis-schiff-prediction-markets",
"prophetx", "techpolicypress-staff",
"npr", "venturebeat", "geekwire", "payloadspace", "the-ankler",
"theankler", "tubefilter", "emarketer", "dagster",
"numerai", # fund/project brand, not person
"psl", "multistate",
}
PUBLISHERS_ACADEMIC = {
# Academic orgs, labs, papers, journals, institutions
"arxiv", "metr", "metr_evals", "apollo-research", "apollo research", "apolloresearch",
"jacc-study-authors", "jacc-data-report-authors",
"anthropic-fellows-program", "anthropic-fellows",
"anthropic-fellows-/-alignment-science-team", "anthropic-research",
"jmir-2024", "jmir 2024",
"oettl-et-al.,-journal-of-experimental-orthopaedics",
"oettl et al., journal of experimental orthopaedics",
"jacc", "nct06548490", "pmc",
"conitzer-et-al.-(2024)", "aquino-michaels-2026", "pan-et-al.",
"pan-et-al.-'natural-language-agent-harnesses'",
"stanford", "stanford-meta-harness",
"hendershot", "annals-im",
"nellie-liang,-brookings-institution", "nellie liang, brookings institution",
"penn-state", "american-heart-association", "american heart association",
"molt_cornelius", "molt-cornelius",
# Companies / labs / brand-orgs (not specific humans)
"anthropic", "anthropicai", "openai", "nasa", "icrc", "ecri",
"epochairesearch", "metadao", "iapam", "icer",
"who", "ama", "uspstf", "unknown",
"futard.io", # protocol/platform
"oxford-martin-ai-governance-initiative",
"oxford-martin-ai-governance",
"u.s.-food-and-drug-administration",
"jitse-goutbeek,-european-policy-centre", # cited person+org string → publisher
"adepoju-et-al.", # paper citation
# Formal-citation names (Firstname-Lastname or Lastname-et-al) — classified
# as academic citations, not reachable contributors. They'd need an @ handle
# to get CI credit per Cory's growth-loop design.
"senator-elissa-slotkin",
"bostrom", "hanson", "kaufmann", "noah-smith", "doug-shapiro",
"shayon-sengupta", "shayon sengupta",
"robin-hanson", "robin hanson", "eliezer-yudkowsky",
"leopold-aschenbrenner", "aschenbrenner",
"ramstead", "larsson", "heavey",
"dan-slimmon", "van-leeuwaarden", "ward-whitt", "adams",
"tamim-ansary", "spizzirri",
"dario-amodei", # formal-citation form (real @ is @darioamodei)
"corless", "oxranga", "vlahakis",
# Brand/project/DAO tokens — not individuals
"areal-dao", "areal", "theiaresearch", "futard-io", "dhrumil",
# Classic formal-citation names — famous academics/economists cited by surname.
# Reachable via @ handle if/when they join (e.g. Ostrom has no X, Hayek deceased,
# Friston has an institutional affiliation, not an @ handle we'd track).
"clayton-christensen", "hidalgo", "coase", "wiener", "juarrero",
"ostrom", "centola", "hayek", "marshall-mcluhan", "blackmore",
"knuth", "friston", "aquino-michaels", "conitzer", "bak",
}
# NOTE: pseudonymous X handles that MAY be real contributors stay in keep_person:
# karpathy, simonw, swyx, metaproph3t, metanallok, mmdhrumil, sjdedic,
# ceterispar1bus — these are real X accounts and match Cory's growth loop.
# They appear without @ prefix because extraction frontmatter didn't normalize.
# Auto-creating them as contributors tier='cited' is correct (A-path from earlier).
PUBLISHERS_SOCIAL = {
"x", "twitter", "telegram", "x.com",
}
PUBLISHERS_INTERNAL = {
"teleohumanity-manifesto", "strategy-session-journal",
"living-capital-thesis-development", "attractor-state-historical-backtesting",
"web-research-compilation", "architectural-investing",
"governance---meritocratic-voting-+-futarchy", # title artifact
"sec-interpretive-release-s7-2026-09-(march-17", # title artifact
"mindstudio", # tooling/platform, not contributor
}
# Merge into one kind→set map for classification
PUBLISHER_KIND_MAP = {}
for h in PUBLISHERS_NEWS:
PUBLISHER_KIND_MAP[h.lower()] = "news"
for h in PUBLISHERS_ACADEMIC:
PUBLISHER_KIND_MAP[h.lower()] = "academic"
for h in PUBLISHERS_SOCIAL:
PUBLISHER_KIND_MAP[h.lower()] = "social_platform"
for h in PUBLISHERS_INTERNAL:
PUBLISHER_KIND_MAP[h.lower()] = "internal"
# Garbage: handles that are clearly parse artifacts, not real names.
# Pattern: contains parens, special chars, or >50 chars.
def is_garbage(handle: str) -> bool:
h = handle.strip()
if len(h) > 50:
return True
if re.search(r"[()\[\]<>{}\/\\|@#$%^&*=?!:;\"']", h):
# But @ can appear legitimately in handles like @thesensatore — allow if @ is only prefix
if h.startswith("@") and not re.search(r"[()\[\]<>{}\/\\|#$%^&*=?!:;\"']", h):
return False
return True
# Multi-word hyphenated with very specific artifact shape: 3+ hyphens in a row or trailing noise
if "---" in h or "---meritocratic" in h or h.endswith("(march") or h.endswith("-(march"):
return True
return False
def classify(handle: str) -> tuple[str, str | None]:
"""Return (category, publisher_kind).
category {'keep_agent', 'keep_person', 'publisher', 'garbage', 'review_needed'}
publisher_kind {'news','academic','social_platform','internal', None}
"""
h = handle.strip().lower().lstrip("@")
if h in PENTAGON_AGENTS:
return ("keep_agent", None)
if h in PUBLISHER_KIND_MAP:
return ("publisher", PUBLISHER_KIND_MAP[h])
if is_garbage(handle):
return ("garbage", None)
# @-prefixed handles or short-slug real-looking names → keep as person
# (Auto-create rule from Cory: @ handles auto-join as tier='cited'.)
if handle.startswith("@"):
return ("keep_person", None)
# Plausible handles (<=39 chars, alphanum + underscore/hyphen): treat as person.
# 39-char ceiling matches GitHub's handle limit and the writer path in
# contributor.py::_HANDLE_RE, so a valid 21-39 char real handle won't fall
# through to review_needed and block --apply.
if re.match(r"^[a-z0-9][a-z0-9_-]{0,38}$", h):
return ("keep_person", None)
# Everything else: needs human review
return ("review_needed", None)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="Write changes to DB")
parser.add_argument("--show", type=str, help="Inspect a single handle")
parser.add_argument("--delete-events", action="store_true",
help="DELETE contribution_events for publishers+garbage (default: keep for audit)")
args = parser.parse_args()
if not Path(DB_PATH).exists():
print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
# Sanity: publishers table must exist (v26 migration applied)
try:
conn.execute("SELECT 1 FROM publishers LIMIT 1")
except sqlite3.OperationalError:
print("ERROR: publishers table missing. Run migration v26 first.", file=sys.stderr)
sys.exit(2)
rows = conn.execute(
"SELECT handle, kind, tier, claims_merged FROM contributors ORDER BY claims_merged DESC"
).fetchall()
if args.show:
target = args.show.strip().lower().lstrip("@")
for r in rows:
if r["handle"].lower().lstrip("@") == target:
category, pkind = classify(r["handle"])
events_count = conn.execute(
"SELECT COUNT(*) FROM contribution_events WHERE handle = ?",
(r["handle"].lower().lstrip("@"),),
).fetchone()[0]
print(f"handle: {r['handle']}")
print(f"current_kind: {r['kind']}")
print(f"current_tier: {r['tier']}")
print(f"claims_merged: {r['claims_merged']}")
print(f"events: {events_count}")
print(f"→ category: {category}")
if pkind:
print(f"→ publisher: kind={pkind}")
return
print(f"No match for '{args.show}'")
return
# Classify all
buckets: dict[str, list[dict]] = {
"keep_agent": [],
"keep_person": [],
"publisher": [],
"garbage": [],
"review_needed": [],
}
for r in rows:
category, pkind = classify(r["handle"])
buckets[category].append({
"handle": r["handle"],
"kind_now": r["kind"],
"tier": r["tier"],
"claims": r["claims_merged"] or 0,
"publisher_kind": pkind,
})
print("=== Classification summary ===")
for cat, items in buckets.items():
print(f" {cat:18s} {len(items):5d}")
print("\n=== Sample of each category ===")
for cat, items in buckets.items():
print(f"\n--- {cat} (showing up to 10) ---")
for item in items[:10]:
tag = f"{item['publisher_kind']}" if item["publisher_kind"] else ""
print(f" {item['handle']:50s} claims={item['claims']:5d}{tag}")
print("\n=== Full review_needed list ===")
for item in buckets["review_needed"]:
print(f" {item['handle']:50s} claims={item['claims']:5d}")
# Diagnostic: orphan alias count for handles we're about to delete.
# Contributor_aliases has no FK (SQLite FKs require PRAGMA to enforce anyway),
# so aliases pointing to deleted canonical handles become orphans. Surface
# the count so the --delete-events decision is informed.
doomed = [item["handle"].lower().lstrip("@") for item in buckets["garbage"] + buckets["publisher"]]
if doomed:
placeholders = ",".join("?" * len(doomed))
orphan_count = conn.execute(
f"SELECT COUNT(*) FROM contributor_aliases WHERE canonical IN ({placeholders})",
doomed,
).fetchone()[0]
print(f"\n=== Alias orphan check ===")
print(f" contributor_aliases rows pointing to deletable canonicals: {orphan_count}")
if orphan_count:
print(f" (cleanup requires --delete-events; without it, aliases stay as orphans)")
if not args.apply:
print("\n(dry-run — no writes. Re-run with --apply to execute.)")
return
# ── Apply changes ──
print("\n=== Applying changes ===")
if buckets["review_needed"]:
print(f"ABORT: {len(buckets['review_needed'])} rows need human review. Fix classifier before --apply.")
sys.exit(3)
inserted_publishers = 0
reclassified_agents = 0
deleted_garbage = 0
deleted_publisher_rows = 0
deleted_events = 0
deleted_aliases = 0
# Single transaction — if any step errors, roll back. This prevents the failure
# mode where a publisher insert fails silently and we still delete the contributor
# row, losing data.
try:
conn.execute("BEGIN")
# 1. Insert publishers. Track which ones succeeded so step 4 only deletes those.
# Counter uses cur.rowcount so replay runs (where publishers already exist)
# report accurate inserted=0 instead of falsely claiming the full set.
# moved_to_publisher is unconditional — the contributors row still needs to
# be deleted even when the publishers row was added in a prior run.
moved_to_publisher = set()
for item in buckets["publisher"]:
name = item["handle"].strip().lower().lstrip("@")
cur = conn.execute(
"INSERT OR IGNORE INTO publishers (name, kind) VALUES (?, ?)",
(name, item["publisher_kind"]),
)
if cur.rowcount > 0:
inserted_publishers += 1
moved_to_publisher.add(item["handle"])
# 2. Ensure Pentagon agents have kind='agent' (idempotent after v25 patch)
for item in buckets["keep_agent"]:
conn.execute(
"UPDATE contributors SET kind = 'agent' WHERE handle = ?",
(item["handle"].lower().lstrip("@"),),
)
reclassified_agents += 1
# 3. Delete garbage handles from contributors (and their events + aliases)
for item in buckets["garbage"]:
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_garbage += cur.rowcount
# 4. Delete publisher rows from contributors — ONLY for those successfully
# inserted into publishers above. Guards against partial failure.
# Aliases pointing to publisher-classified handles get cleaned under the
# same --delete-events gate: publishers live in their own table now, any
# leftover aliases in contributor_aliases are orphans.
for item in buckets["publisher"]:
if item["handle"] not in moved_to_publisher:
continue
canonical_lower = item["handle"].lower().lstrip("@")
if args.delete_events:
cur = conn.execute(
"DELETE FROM contribution_events WHERE handle = ?",
(canonical_lower,),
)
deleted_events += cur.rowcount
cur = conn.execute(
"DELETE FROM contributor_aliases WHERE canonical = ?",
(canonical_lower,),
)
deleted_aliases += cur.rowcount
cur = conn.execute(
"DELETE FROM contributors WHERE handle = ?",
(item["handle"],),
)
deleted_publisher_rows += cur.rowcount
# 5. Audit log entry for the destructive operation (Ganymede Q5).
conn.execute(
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
(
"schema_v26",
"classify_contributors",
json.dumps({
"publishers_inserted": inserted_publishers,
"agents_updated": reclassified_agents,
"garbage_deleted": deleted_garbage,
"publisher_rows_deleted": deleted_publisher_rows,
"events_deleted": deleted_events,
"aliases_deleted": deleted_aliases,
"delete_events_flag": bool(args.delete_events),
}),
),
)
conn.commit()
except Exception as e:
conn.rollback()
print(f"ERROR: Transaction failed, rolled back. {e}", file=sys.stderr)
sys.exit(4)
print(f" publishers inserted: {inserted_publishers}")
print(f" agents kind='agent' ensured: {reclassified_agents}")
print(f" garbage rows deleted: {deleted_garbage}")
print(f" publisher rows removed from contributors: {deleted_publisher_rows}")
if args.delete_events:
print(f" contribution_events deleted: {deleted_events}")
print(f" contributor_aliases deleted: {deleted_aliases}")
else:
print(f" (events + aliases kept — re-run with --delete-events to clean them)")
if __name__ == "__main__":
main()