Compare commits

13 commits: ship/metad... → main

| SHA1 |
|---|
| 1351db70a9 |
| d60b6f8bf2 |
| cd5aac5cc6 |
| 7c6417d6be |
| 42d35d4e15 |
| de7e5ec709 |
| 369f6c96da |
| 6aff03ff56 |
| 319e03e2c6 |
| 2d332c66d4 |
| dea1b02aa6 |
| d0fb4c96e3 |
| 926a397839 |

14 changed files with 1466 additions and 525 deletions
@@ -51,7 +51,7 @@ fi

 # Syntax check all Python files before copying
 ERRORS=0
-for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py scripts/*.py; do
+for f in lib/*.py *.py diagnostics/*.py telegram/*.py tests/*.py; do
   [ -f "$f" ] || continue
   if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>&1; then
     log "SYNTAX ERROR: $f"

@@ -77,7 +77,6 @@ rsync "${RSYNC_OPTS[@]}" telegram/ "$PIPELINE_DIR/telegram/"
 rsync "${RSYNC_OPTS[@]}" diagnostics/ "$DIAGNOSTICS_DIR/"
 rsync "${RSYNC_OPTS[@]}" agent-state/ "$AGENT_STATE_DIR/"
 rsync "${RSYNC_OPTS[@]}" tests/ "$PIPELINE_DIR/tests/"
-rsync "${RSYNC_OPTS[@]}" scripts/ "$PIPELINE_DIR/scripts/"
 [ -f research/research-session.sh ] && rsync "${RSYNC_OPTS[@]}" research/research-session.sh /opt/teleo-eval/research-session.sh

 # Safety net: ensure all .sh files are executable after rsync
@@ -41,7 +41,7 @@ echo ""
 # Syntax check all Python files before deploying
 echo "=== Pre-deploy syntax check ==="
 ERRORS=0
-for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py "$REPO_ROOT/scripts/"*.py; do
+for f in "$REPO_ROOT/lib/"*.py "$REPO_ROOT/"*.py "$REPO_ROOT/diagnostics/"*.py "$REPO_ROOT/telegram/"*.py; do
   [ -f "$f" ] || continue
   if ! python3 -c "import ast, sys; ast.parse(open(sys.argv[1]).read())" "$f" 2>/dev/null; then
     echo "SYNTAX ERROR: $f"

@@ -80,10 +80,6 @@ echo "=== Tests ==="
 rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/tests/" "$VPS_HOST:$VPS_PIPELINE/tests/"
 echo ""

-echo "=== Scripts ==="
-rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/scripts/" "$VPS_HOST:$VPS_PIPELINE/scripts/"
-echo ""
-
 echo "=== Diagnostics ==="
 rsync "${RSYNC_OPTS[@]}" "$REPO_ROOT/diagnostics/" "$VPS_HOST:$VPS_DIAGNOSTICS/"
 echo ""
@@ -9,6 +9,16 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
 _cache = {"data": None, "ts": 0}
 CACHE_TTL = 60  # 1 minute — activity should feel fresh

+# commit_types we surface in the activity feed. `pipeline` is system
+# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
+_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
+
+# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
+# inbox archive filenames, not claim slugs. Used as a fallback signal when
+# branch/description heuristics miss (e.g. populated descriptions that
+# happen to be source titles, not claim insights).
+_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
+

 def _get_conn():
     conn = sqlite3.connect(DB_PATH)

@@ -17,19 +27,52 @@ def _get_conn():
     return conn


-def _classify_event(branch, description, commit_type):
-    if commit_type != "knowledge":
-        return None
-    if branch and branch.startswith("extract/"):
-        return "create"
-    if branch and branch.startswith("reweave/"):
-        return "enrich"
-    if branch and branch.startswith("challenge/"):
-        return "challenge"
-    if description and "challenged_by" in description.lower():
-        return "challenge"
-    if branch and branch.startswith("enrich/"):
-        return "enrich"
-    return "create"
+def _is_source_slug(slug):
+    return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
+
+
+def _classify_event(branch, description, commit_type, candidate_slug=None):
+    """Return one of: create | enrich | challenge | source | None.
+
+    Source-archive PRs are extract/* branches that filed a source into
+    inbox/archive/ but didn't produce a claim. Two signals classify them
+    as 'source' (defense in depth):
+      1. extract/* branch with empty description (no claim title produced)
+      2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern)
+    """
+    commit_type_l = (commit_type or "").lower()
+    branch = branch or ""
+    description_lower = (description or "").lower()
+    has_desc = bool(description and description.strip())
+
+    if commit_type_l not in _FEED_COMMIT_TYPES:
+        return None
+
+    # Explicit challenge signals win first.
+    if (commit_type_l == "challenge"
+            or branch.startswith("challenge/")
+            or "challenged_by" in description_lower):
+        return "challenge"
+
+    # Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
+    if (commit_type_l == "enrich"
+            or branch.startswith("enrich/")
+            or branch.startswith("reweave/")):
+        return "enrich"
+
+    # Source-only: extract/* with no claim description means inbox archive
+    # landed but no domain claim was written.
+    if branch.startswith("extract/") and not has_desc:
+        return "source"
+
+    # Belt-and-suspenders: if the slug we'd surface to the frontend looks
+    # like an inbox archive filename (date-prefix-hash), treat as source
+    # regardless of branch/commit_type/description state. Catches cases
+    # where description leaked but is just a source title, not a claim.
+    if _is_source_slug(candidate_slug):
+        return "source"
+
+    # Everything else with a description is a new claim.
+    return "create"
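To make the reworked routing concrete, here is a small illustrative sketch (not part of the diff) of how `_classify_event` above resolves a few hypothetical rows; the branch names, descriptions, and slug below are made up.

```python
# Hypothetical inputs; _classify_event refers to the function in the hunk above.

# A challenge/ branch (or commit_type='challenge', or a challenged_by marker) wins first.
assert _classify_event("challenge/ai-timelines", "refuted by new data", "knowledge") == "challenge"

# reweave/ and enrich/ branches both count as enrichment now.
assert _classify_event("reweave/ai-timelines", "edge connect", "knowledge") == "enrich"

# extract/* with an empty description: a source was archived but no claim was written.
assert _classify_event("extract/2025-01-02-somepub-topic-ab12", "", "extract") == "source"

# Even with a description, a date-prefix-hash candidate slug is treated as source.
assert _classify_event("extract/foo", "Some source title", "knowledge",
                       candidate_slug="2025-01-02-somepub-topic-ab12") == "source"

# Everything else in the allow-list with a description is a new claim.
assert _classify_event("extract/new-claim-slug", "A new claim insight", "knowledge") == "create"

# commit_type='pipeline' is not in _FEED_COMMIT_TYPES, so it stays hidden.
assert _classify_event("fix/zombie-cleanup", "auto-run", "pipeline") is None
```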
@@ -59,7 +102,7 @@ def _extract_claim_slugs(description, branch=None):
     if branch:
         parts = branch.split("/", 1)
         if len(parts) > 1:
-            return [parts[1][:120]]
+            return [parts[1]]
         return []
     titles = [t.strip() for t in description.split("|") if t.strip()]
     slugs = []

@@ -68,7 +111,7 @@ def _extract_claim_slugs(description, branch=None):
         slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
         slug = slug.replace(" ", "-").strip("-")
         if len(slug) > 10:
-            slugs.append(slug[:120])
+            slugs.append(slug)
     return slugs

@@ -81,33 +124,60 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
 def _build_events():
     conn = _get_conn()
     try:
-        rows = conn.execute("""
+        placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
+        rows = conn.execute(f"""
             SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
                    p.merged_at, p.description, p.commit_type, p.cost_usd,
-                   p.source_channel
+                   p.source_channel, p.source_path
             FROM prs p
             WHERE p.status = 'merged'
-              AND p.commit_type = 'knowledge'
+              AND p.commit_type IN ({placeholders})
              AND p.merged_at IS NOT NULL
            ORDER BY p.merged_at DESC
            LIMIT 2000
-        """).fetchall()
+        """, _FEED_COMMIT_TYPES).fetchall()

        events = []
        claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}

        for row in rows:
-            event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
+            slugs = _extract_claim_slugs(row["description"], row["branch"])
+            candidate_slug = slugs[0] if slugs else ""
+            event_type = _classify_event(
+                row["branch"], row["description"], row["commit_type"],
+                candidate_slug=candidate_slug,
+            )
            if not event_type:
                continue

            contributor = _normalize_contributor(row["submitted_by"], row["agent"])
-            slugs = _extract_claim_slugs(row["description"], row["branch"])
            merged_at = row["merged_at"] or ""

-            ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
+            ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
            ci_earned = ci_map.get(event_type, 0)

+            # Source events never carry a claim_slug — no claim was written —
+            # so the frontend can't produce a 404-ing claim link.
+            if event_type == "source":
+                summary_text = _summary_from_branch(row["branch"])
+                source_slug = (
+                    _summary_from_branch(row["branch"]).lower().replace(" ", "-")
+                    or row["branch"]
+                )
+                events.append({
+                    "type": "source",
+                    "claim_slug": "",
+                    "source_slug": source_slug,
+                    "domain": row["domain"] or "unknown",
+                    "contributor": contributor,
+                    "timestamp": merged_at,
+                    "ci_earned": round(ci_earned, 2),
+                    "summary": summary_text,
+                    "pr_number": row["number"],
+                    "source_channel": row["source_channel"] or "unknown",
+                })
+                continue
+
            for slug in slugs:
                if slug not in claim_activity:
                    claim_activity[slug] = {

@@ -164,8 +234,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
             return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
         events.sort(key=hot_key, reverse=True)
     elif sort_mode == "important":
-        type_rank = {"challenge": 0, "enrich": 1, "create": 2}
-        events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
+        type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
+        events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
     return events

@@ -175,6 +245,8 @@ async def handle_activity_feed(request):
     sort_mode = "recent"
     domain = request.query.get("domain", "")
     contributor = request.query.get("contributor", "")
+    type_param = request.query.get("type", "")
+    type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
     try:
         limit = min(int(request.query.get("limit", "20")), 100)
     except ValueError:

@@ -196,6 +268,8 @@ async def handle_activity_feed(request):
         filtered = [e for e in filtered if e["domain"] == domain]
     if contributor:
         filtered = [e for e in filtered if e["contributor"] == contributor]
+    if type_filter:
+        filtered = [e for e in filtered if e["type"] in type_filter]

     sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
     total = len(sorted_events)
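For reference, a tiny sketch (not part of the diff) of what the new `type` query parameter does to the in-memory events; the event dicts below are hypothetical stand-ins for the ones built in `_build_events` above.

```python
# Hypothetical events, shapes mirroring the dicts built in _build_events.
events = [
    {"type": "create", "domain": "internet-finance"},
    {"type": "source", "domain": "internet-finance"},
    {"type": "challenge", "domain": "space"},
]

type_param = "challenge,source"   # e.g. ?type=challenge,source
type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None

filtered = [e for e in events if e["type"] in type_filter] if type_filter else events
assert [e["type"] for e in filtered] == ["source", "challenge"]
```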
@@ -25,6 +25,7 @@ from aiohttp import web
 from review_queue_routes import register_review_queue_routes
 from daily_digest_routes import register_daily_digest_routes
 from response_audit_routes import register_response_audit_routes, RESPONSE_AUDIT_PUBLIC_PATHS
+from leaderboard_routes import register_leaderboard_routes, LEADERBOARD_PUBLIC_PATHS
 from lib.search import search as kb_search, embed_query, search_qdrant

 logger = logging.getLogger("argus")

@@ -508,7 +509,7 @@ def _load_secret(path: Path) -> str | None:
 @web.middleware
 async def auth_middleware(request, handler):
     """API key check. Public paths skip auth. Protected paths require X-Api-Key header."""
-    if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
+    if request.path in _PUBLIC_PATHS or request.path in RESPONSE_AUDIT_PUBLIC_PATHS or request.path in LEADERBOARD_PUBLIC_PATHS or request.path.startswith("/api/response-audit/"):
         return await handler(request)
     expected = request.app.get("api_key")
     if not expected:

@@ -2361,6 +2362,8 @@ def create_app() -> web.Application:
     # Response audit - cost tracking + reasoning traces
     app["db_path"] = str(DB_PATH)
     register_response_audit_routes(app)
+    # Event-sourced leaderboard (Phase B — reads contribution_events directly)
+    register_leaderboard_routes(app)
     # Timeline activity feed (per-PR + audit_log events for dashboard v2)
     from activity_endpoint import handle_activity
     app.router.add_get("/api/activity", handle_activity)
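A hedged sketch of the middleware effect of this change: the other public-path sets are not shown in this compare, so they are stubbed with hypothetical values below; only `LEADERBOARD_PUBLIC_PATHS` matches the new module that follows.

```python
# Sketch only: which paths skip the X-Api-Key check after this change.
_PUBLIC_PATHS = frozenset({"/api/health"})                  # hypothetical stand-in
RESPONSE_AUDIT_PUBLIC_PATHS = frozenset()                   # hypothetical stand-in
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})  # matches the new module below

def is_public(path: str) -> bool:
    # Mirrors the condition in auth_middleware above.
    return (path in _PUBLIC_PATHS
            or path in RESPONSE_AUDIT_PUBLIC_PATHS
            or path in LEADERBOARD_PUBLIC_PATHS
            or path.startswith("/api/response-audit/"))

assert is_public("/api/leaderboard")       # newly public
assert not is_public("/api/review-queue")  # hypothetical protected path, still gated
```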
diagnostics/leaderboard_routes.py (new file, 166 lines)

@@ -0,0 +1,166 @@
"""Leaderboard endpoint reading from event-sourced contribution_events.

Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)

Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
joins contributors for kind, returns sorted leaderboard with role breakdown.

Roles + weights (Phase A):
    author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05

Endpoints:
    GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""

import logging
import re
import sqlite3

from aiohttp import web

logger = logging.getLogger("argus.leaderboard_routes")

ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")

# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})


def _conn(app):
    """Read-only connection to pipeline.db."""
    db_path = app["db_path"]
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    return conn


def _parse_window(raw):
    """Parse window param. Returns (sql_clause, params_tuple, label).

    Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
    Caps N at 365d / 8760h to prevent abuse.
    """
    if not raw or raw == "all_time":
        return ("", (), "all_time")
    m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
    if not m:
        return ("", (), "all_time")
    n = int(m.group(1))
    unit = m.group(2)
    # Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
    if unit == "d":
        n = min(n, 365)
        return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
    n = min(n, 8760)
    return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")


async def handle_leaderboard(request):
    """GET /api/leaderboard.

    Query params:
        window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
        domain: filter by domain (optional)
        kind: 'person' (default) | 'agent' | 'org' | 'all'
        limit: max entries (default 100, max 500)
    """
    window_clause, window_params, window_label = _parse_window(request.query.get("window"))
    domain = request.query.get("domain")
    kind = request.query.get("kind", "person")
    if kind not in KIND_VALUES:
        kind = "person"
    try:
        limit = min(int(request.query.get("limit", "100")), 500)
    except (ValueError, TypeError):
        limit = 100

    where = ["1=1", window_clause] if window_clause else ["1=1"]
    params = list(window_params)
    if domain:
        where.append("ce.domain = ?")
        params.append(domain)
    if kind != "all":
        where.append("COALESCE(c.kind, 'person') = ?")
        params.append(kind)

    where_sql = " AND ".join([w for w in where if w])

    conn = _conn(request.app)
    try:
        # Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
        # LEFT JOIN contributors so handles in events but not in contributors still appear
        # (defaults to kind='person' via COALESCE).
        rows = conn.execute(f"""
            SELECT
                ce.handle,
                COALESCE(c.kind, 'person') AS kind,
                ROUND(SUM(ce.weight), 4) AS ci,
                COUNT(*) AS events_count,
                MIN(ce.timestamp) AS first_contribution,
                MAX(ce.timestamp) AS last_contribution,
                SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
                SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
                SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
                SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
                SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
                COUNT(DISTINCT ce.domain) AS domain_count,
                COUNT(DISTINCT ce.pr_number) AS pr_count
            FROM contribution_events ce
            LEFT JOIN contributors c ON c.handle = ce.handle
            WHERE {where_sql}
            GROUP BY ce.handle, COALESCE(c.kind, 'person')
            ORDER BY ci DESC, last_contribution DESC
            LIMIT ?
        """, (*params, limit + 1)).fetchall()  # +1 to detect overflow

        has_more = len(rows) > limit
        rows = rows[:limit]

        # Total count of distinct handles matching filters (without limit)
        total_row = conn.execute(f"""
            SELECT COUNT(DISTINCT ce.handle) AS total
            FROM contribution_events ce
            LEFT JOIN contributors c ON c.handle = ce.handle
            WHERE {where_sql}
        """, params).fetchone()
        total = total_row["total"] if total_row else 0

        leaderboard = []
        for r in rows:
            leaderboard.append({
                "handle": r["handle"],
                "kind": r["kind"],
                "ci": r["ci"],
                "ci_breakdown": {
                    "author": round(r["ci_author"] or 0, 4),
                    "challenger": round(r["ci_challenger"] or 0, 4),
                    "synthesizer": round(r["ci_synthesizer"] or 0, 4),
                    "originator": round(r["ci_originator"] or 0, 4),
                    "evaluator": round(r["ci_evaluator"] or 0, 4),
                },
                "events_count": r["events_count"],
                "domain_count": r["domain_count"],
                "pr_count": r["pr_count"],
                "first_contribution": r["first_contribution"],
                "last_contribution": r["last_contribution"],
            })

        return web.json_response({
            "window": window_label,
            "domain": domain,
            "kind_filter": kind,
            "total": total,
            "shown": len(leaderboard),
            "has_more": has_more,
            "source": "contribution_events",  # explicit so consumers know the data origin
            "leaderboard": leaderboard,
        })
    finally:
        conn.close()


def register_leaderboard_routes(app: web.Application):
    """Register /api/leaderboard. Requires app['db_path'] to be set."""
    app.router.add_get("/api/leaderboard", handle_leaderboard)
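A minimal sketch (not from the diff) of the window parsing in the new module, assuming it is importable as `leaderboard_routes`, the same name server.py imports it under; the expected values follow directly from `_parse_window` above.

```python
from leaderboard_routes import _parse_window  # diagnostics/leaderboard_routes.py, as added above

assert _parse_window(None) == ("", (), "all_time")
assert _parse_window("7d") == ("ce.timestamp >= datetime('now', ?)", ("-7 days",), "7d")
assert _parse_window("24H") == ("ce.timestamp >= datetime('now', ?)", ("-24 hours",), "24h")
# Caps: anything over 365 days / 8760 hours is clamped.
assert _parse_window("9999d")[1] == ("-365 days",)
# Unparseable windows fall back to all_time rather than erroring.
assert _parse_window("next-tuesday") == ("", (), "all_time")
```

A typical request, per the handler docstring, would then be `GET /api/leaderboard?window=7d&kind=agent&limit=10`.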
@@ -15,6 +15,7 @@ Epimetheus owns this module. Leo reviews changes.

 import logging
 import re
+import sqlite3
 from pathlib import Path

 logger = logging.getLogger("pipeline.attribution")

@@ -81,6 +82,7 @@ def normalize_handle(handle: str, conn=None) -> str:
     if not handle:
         return ""
     h = handle.strip().lower().lstrip("@")
+    h = re.sub(r"\s*\(self-directed\)\s*$", "", h)
     if conn is None:
         return h
     try:

@@ -108,6 +110,36 @@ def classify_kind(handle: str) -> str:
     return "person"


+def is_publisher_handle(handle: str, conn) -> int | None:
+    """Return publisher.id if the handle exists as a publisher name, else None.
+
+    Schema v26 split orgs/citations into the publishers table. Writer code
+    (upsert_contributor, insert_contribution_event) calls this to gate creating
+    contributor rows or events for handles that belong to publishers.
+
+    Without this gate, every merged PR with `sourcer: cnbc` (for example) would
+    re-create CNBC as a contributor and undo the v26 classifier cleanup.
+
+    Falls back gracefully on pre-v26 DBs: returns None if publishers table
+    doesn't exist yet (writer behaves like before, no regression).
+    """
+    if not handle or conn is None:
+        return None
+    h = handle.strip().lower().lstrip("@")
+    try:
+        row = conn.execute(
+            "SELECT id FROM publishers WHERE name = ?", (h,),
+        ).fetchone()
+        if row:
+            return row["id"] if hasattr(row, "keys") else row[0]
+    except sqlite3.OperationalError:
+        # Pre-v26 DB: publishers table doesn't exist yet. Fall through to None
+        # so writer behaves as before. Any other exception class is real signal
+        # (programming error, lock contention, corruption) — let it propagate.
+        logger.debug("is_publisher_handle: publishers table not present (pre-v26?)", exc_info=True)
+    return None
+
+
 # ─── Parse attribution from claim content ──────────────────────────────────
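To illustrate the two attribution changes above, a small sketch against an in-memory SQLite database; the flat import path is assumed for illustration only (the real code imports this module package-relatively as `.attribution`), and the table contents are hypothetical.

```python
import sqlite3
from attribution import normalize_handle, is_publisher_handle  # assumed import path

# New suffix-stripping step: agent "self-directed" handles normalize cleanly.
assert normalize_handle("@Rio (self-directed)") == "rio"

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row

# Pre-v26 DB: no publishers table, so the gate is a no-op (returns None).
assert is_publisher_handle("cnbc", conn) is None

# v26 DB: once a publisher row exists, the handle is gated.
conn.execute("CREATE TABLE publishers (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO publishers (name) VALUES ('cnbc')")
assert is_publisher_handle("@CNBC", conn) == 1
```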
@@ -14,7 +14,7 @@ import logging
 import re

 from . import config, db
-from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, normalize_handle
+from .attribution import AGENT_BRANCH_PREFIXES, classify_kind, is_publisher_handle, normalize_handle
 from .forgejo import get_pr_diff

 logger = logging.getLogger("pipeline.contributor")

@@ -62,6 +62,12 @@ def insert_contribution_event(
     canonical = normalize_handle(handle, conn=conn)
     if not canonical:
         return False
+    # Schema v26 gate: handles classified as publishers (CNBC, SpaceNews, arxiv,
+    # etc.) are provenance metadata, not contributors. Don't credit them. Without
+    # this gate every merge re-creates org events and undoes the v26 cleanup.
+    if is_publisher_handle(canonical, conn) is not None:
+        logger.debug("insert_contribution_event: %r is a publisher — skipping event", canonical)
+        return False
     kind = classify_kind(canonical)
     try:
         cur = conn.execute(

@@ -419,6 +425,21 @@ def upsert_contributor(
         logger.warning("Unknown contributor role: %s", role)
         return

+    # Schema v26 gate: orgs/citations live in publishers table, not contributors.
+    # Skip without writing so the v26 classifier cleanup isn't undone by every
+    # merge that has `sourcer: cnbc` (or similar) in claim frontmatter.
+    #
+    # Note: bare normalization (lower + lstrip @), no alias resolution. This is
+    # consistent with the existing `SELECT handle FROM contributors WHERE handle = ?`
+    # below — both look up by canonical-form-as-stored. Today's classifier produces
+    # one publisher row per canonical handle, so bare lookup hits. Branch 3 will
+    # normalize alias→canonical at writer entry points (extract.py, post_extract);
+    # at that point this gate auto-tightens because callers pass canonical handles.
+    canonical_handle = handle.strip().lower().lstrip("@") if handle else ""
+    if canonical_handle and is_publisher_handle(canonical_handle, conn) is not None:
+        logger.debug("upsert_contributor: %r is a publisher — skipping contributor row", canonical_handle)
+        return
+
     existing = conn.execute(
         "SELECT handle FROM contributors WHERE handle = ?", (handle,)
     ).fetchone()
@@ -267,6 +267,7 @@ format: tweet | thread
 status: unprocessed
 priority: high | medium | low
 tags: [topic1, topic2]
+intake_tier: research-task
 ---

 ## Content
scripts/backfill-research-session-attribution.py (new file, 280 lines)

@@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""Backfill: re-attribute research-session-derived PRs from m3taversal to agent.

Problem: research-session.sh used to write source frontmatter without
`proposed_by` / `intake_tier`, so extract.py's contributor-classification
fallback set `prs.submitted_by = '@m3taversal'`, which propagated into
`contribution_events` as a `handle='m3taversal', role='author'` row per
research-derived claim. Result: agent research credited to the human.

Forward fix is a frontmatter-template patch to research-session.sh.
This script corrects historical records.

Identification:
    Research-session source archives are committed to teleo-codex with a
    message matching `^<agent>: research session YYYY-MM-DD —`. The diff
    for that commit lists `inbox/queue/*.md` files the agent created. Any
    PR whose `source_path` matches one of those filenames is research-derived.

Touch list (per matched PR):
    1. UPDATE prs SET submitted_by = '<agent> (self-directed)'
    2. DELETE FROM contribution_events
       WHERE handle='m3taversal' AND role='author' AND pr_number=?
    3. INSERT OR IGNORE INTO contribution_events with handle=<agent>,
       kind='agent', role='author', weight=0.30, original timestamp/domain/channel.

Defaults to --dry-run. Pass --apply to commit changes.

Usage:
    python3 backfill-research-session-attribution.py --dry-run --days 30
    python3 backfill-research-session-attribution.py --apply --days 30
"""

import argparse
import logging
import os
import re
import sqlite3
import subprocess
import sys
from collections import defaultdict
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("backfill-research-attr")

DEFAULT_REPO = Path(os.environ.get("REPO_DIR", "/opt/teleo-eval/workspaces/main"))
DEFAULT_DB = Path(os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db"))

KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")
AUTHOR_WEIGHT = 0.30


def git(repo: Path, *args: str) -> str:
    """Run a git command in repo, return stdout. Raises on non-zero."""
    result = subprocess.run(
        ["git", "-C", str(repo), *args],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def discover_research_session_archives(repo: Path, days: int) -> dict[str, str]:
    """Return {source_filename_basename: agent_handle} for last N days.

    Walks teleo-codex `git log --since`, filters to research-session commits,
    parses agent from message header, lists inbox/queue/*.md files added in
    that commit's diff. Maps the basename (which becomes source_path on extract)
    to the agent who created it.
    """
    log = git(repo, "log", f"--since={days} days ago", "--pretty=%H|%s", "--no-merges")
    file_to_agent: dict[str, str] = {}
    commits_seen = 0
    commits_matched = 0
    for line in log.splitlines():
        if not line or "|" not in line:
            continue
        commits_seen += 1
        sha, _, subject = line.partition("|")
        m = COMMIT_HEADER_RE.match(subject)
        if not m:
            continue
        agent = m.group(1)
        if agent not in KNOWN_AGENTS:
            logger.debug("skipping commit %s — unknown agent %r", sha[:8], agent)
            continue
        commits_matched += 1
        # List files added in this commit (inbox/queue/*.md only)
        try:
            added = git(repo, "diff-tree", "--no-commit-id", "--name-only", "-r",
                        "--diff-filter=A", sha)
        except subprocess.CalledProcessError:
            logger.warning("diff-tree failed for %s", sha[:8])
            continue
        for f in added.splitlines():
            if f.startswith("inbox/queue/") and f.endswith(".md"):
                basename = Path(f).name
                if basename in file_to_agent and file_to_agent[basename] != agent:
                    logger.warning(
                        "filename collision: %s — was %s, now %s (keeping first)",
                        basename, file_to_agent[basename], agent,
                    )
                    continue
                file_to_agent.setdefault(basename, agent)
    logger.info(
        "scanned %d commits, %d research-session matches, %d unique source files",
        commits_seen, commits_matched, len(file_to_agent),
    )
    return file_to_agent


def find_misattributed_prs(conn: sqlite3.Connection, file_to_agent: dict[str, str], days: int):
    """Return list of (pr_number, current_submitted_by, source_path, agent, domain, channel, merged_at).

    Only includes PRs:
      - with source_path basename in our research-session map
      - currently attributed to '@m3taversal'
      - merged within the last N days (cap on temporal scope)
    """
    rows = conn.execute(
        """SELECT number, submitted_by, source_path, domain, source_channel, merged_at
           FROM prs
           WHERE submitted_by = '@m3taversal'
             AND source_path IS NOT NULL
             AND status = 'merged'
             AND merged_at > datetime('now', ?)""",
        (f"-{days} days",),
    ).fetchall()
    matches = []
    for row in rows:
        basename = Path(row["source_path"]).name
        agent = file_to_agent.get(basename)
        if agent:
            matches.append({
                "pr": row["number"],
                "current_submitted_by": row["submitted_by"],
                "source_path": row["source_path"],
                "basename": basename,
                "agent": agent,
                "domain": row["domain"],
                "channel": row["source_channel"],
                "merged_at": row["merged_at"],
            })
    return matches


def existing_event_count(conn: sqlite3.Connection, pr: int, handle: str, role: str) -> int:
    """Return count of contribution_events rows matching (handle, role, pr_number, claim_path IS NULL)."""
    return conn.execute(
        """SELECT COUNT(*) FROM contribution_events
           WHERE handle = ? AND role = ? AND pr_number = ? AND claim_path IS NULL""",
        (handle, role, pr),
    ).fetchone()[0]


def apply_backfill(conn: sqlite3.Connection, matches: list[dict], dry_run: bool) -> dict:
    """Apply the backfill. Returns counters."""
    counters = defaultdict(int)
    if not dry_run:
        conn.execute("BEGIN")
    try:
        for m in matches:
            pr = m["pr"]
            agent = m["agent"]

            # Pre-checks for accurate dry-run reporting
            old_event_exists = existing_event_count(conn, pr, "m3taversal", "author") > 0
            new_event_exists = existing_event_count(conn, pr, agent, "author") > 0

            if dry_run:
                logger.info(
                    "would update pr=%d submitted_by '%s' → '%s (self-directed)' "
                    "[m3ta_event=%s, agent_event=%s]",
                    pr, m["current_submitted_by"], agent,
                    old_event_exists, new_event_exists,
                )
                counters["prs"] += 1
                if old_event_exists:
                    counters["events_to_delete"] += 1
                if not new_event_exists:
                    counters["events_to_insert"] += 1
                continue

            # 1. UPDATE prs.submitted_by
            conn.execute(
                "UPDATE prs SET submitted_by = ? WHERE number = ?",
                (f"{agent} (self-directed)", pr),
            )
            counters["prs"] += 1

            # 2. INSERT new agent author event (idempotent via UNIQUE index)
            cur = conn.execute(
                """INSERT OR IGNORE INTO contribution_events
                   (handle, kind, role, weight, pr_number, claim_path, domain, channel, timestamp)
                   VALUES (?, 'agent', 'author', ?, ?, NULL, ?, ?, COALESCE(?, datetime('now')))""",
                (agent, AUTHOR_WEIGHT, pr, m["domain"], m["channel"], m["merged_at"]),
            )
            if cur.rowcount > 0:
                counters["events_inserted"] += 1

            # 3. DELETE old m3taversal author event
            cur = conn.execute(
                """DELETE FROM contribution_events
                   WHERE handle = 'm3taversal' AND role = 'author'
                     AND pr_number = ? AND claim_path IS NULL""",
                (pr,),
            )
            if cur.rowcount > 0:
                counters["events_deleted"] += 1

        if not dry_run:
            conn.execute("COMMIT")
    except Exception:
        if not dry_run:
            conn.execute("ROLLBACK")
        raise

    return dict(counters)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--repo", type=Path, default=DEFAULT_REPO)
    parser.add_argument("--db", type=Path, default=DEFAULT_DB)
    parser.add_argument("--days", type=int, default=30)
    parser.add_argument("--apply", action="store_true", help="commit changes (default: dry-run)")
    parser.add_argument("--limit", type=int, default=0,
                        help="cap PR updates (0 = no cap; useful for testing on a small slice)")
    args = parser.parse_args()
    dry_run = not args.apply

    logger.info("repo=%s db=%s days=%d mode=%s",
                args.repo, args.db, args.days, "DRY-RUN" if dry_run else "APPLY")

    if not args.repo.exists():
        logger.error("repo not found: %s", args.repo)
        sys.exit(1)
    if not args.db.exists():
        logger.error("db not found: %s", args.db)
        sys.exit(1)

    file_to_agent = discover_research_session_archives(args.repo, args.days)
    if not file_to_agent:
        logger.warning("no research-session source files found in last %d days", args.days)
        sys.exit(0)

    # Per-agent breakdown
    by_agent = defaultdict(int)
    for agent in file_to_agent.values():
        by_agent[agent] += 1
    for agent, count in sorted(by_agent.items()):
        logger.info("  research-session sources by %s: %d", agent, count)

    conn = sqlite3.connect(args.db)
    conn.row_factory = sqlite3.Row
    matches = find_misattributed_prs(conn, file_to_agent, args.days)
    logger.info("misattributed PRs found: %d", len(matches))

    if args.limit and len(matches) > args.limit:
        logger.info("--limit=%d — truncating from %d", args.limit, len(matches))
        matches = matches[:args.limit]

    if not matches:
        logger.info("nothing to do")
        return

    # Per-agent breakdown of misattribution
    miss_by_agent = defaultdict(int)
    for m in matches:
        miss_by_agent[m["agent"]] += 1
    logger.info("misattributed PR breakdown:")
    for agent, count in sorted(miss_by_agent.items()):
        logger.info("  %s: %d", agent, count)

    counters = apply_backfill(conn, matches, dry_run)
    logger.info("RESULT (%s): %s", "DRY-RUN" if dry_run else "APPLIED", counters)


if __name__ == "__main__":
    main()
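An illustrative check (not part of the new script) of how the commit-subject regex and the agent allow-list interact; the commit subjects below are hypothetical.

```python
import re

# Copied from the script above.
KNOWN_AGENTS = frozenset({"rio", "leo", "theseus", "vida", "clay", "astra"})
COMMIT_HEADER_RE = re.compile(r"^([a-z]+):\s+research session\s+\d{4}-\d{2}-\d{2}\s+—")

subjects = [
    "rio: research session 2025-06-01 — futarchy deep dive",  # hypothetical research-session commit
    "leo: fix pipeline cron",                                  # not a research session
    "bob: research session 2025-06-01 — something",            # matches the regex but unknown agent
]

matched = []
for s in subjects:
    m = COMMIT_HEADER_RE.match(s)
    if m and m.group(1) in KNOWN_AGENTS:
        matched.append(m.group(1))

assert matched == ["rio"]
```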
@@ -1,495 +0,0 @@ (file deleted)
#!/usr/bin/env python3
"""metadao-scrape.py — pull active/recent proposals from metadao.fi into source markdown.

Replaces the broken futard.io GraphQL ingestion (Cloud Run → teleo-api).
metadao.fi is a Vercel-protected Next.js App Router site; direct curl is blocked
by the anti-bot challenge. A real headless browser passes the challenge cleanly,
and once cookies are issued for the context we can call /api/decode-proposal/{addr}
from inside the browser to get structured instruction data.

Discovery flow:
    1. visit / to prime Vercel cookies
    2. visit /projects, scrape distinct /projects/{slug} hrefs
    3. for each project, visit /projects/{slug}, scrape proposal addresses from DOM
    4. for each NEW proposal (basename not already in --archive-dir):
        a. visit proposal page, capture rendered prose
        b. call /api/decode-proposal/{addr} via in-browser fetch for instructions
        c. write source markdown to --output-dir

Idempotent. Skips proposals whose basename is already present in archive-dir
or output-dir. Designed to run from a systemd timer or one-shot.

Usage:
    python3 metadao-scrape.py --archive-dir /opt/teleo-eval/workspaces/main/inbox/archive \\
        --output-dir /opt/teleo-eval/workspaces/main/inbox/queue \\
        [--dry-run] [--limit 10] [--project solomon]
"""
from __future__ import annotations

import argparse
import json
import logging
import re
import sys
from datetime import date, datetime
from pathlib import Path

from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("metadao-scrape")

BASE = "https://www.metadao.fi"
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
)


def slugify(text: str, max_len: int = 60) -> str:
    s = text.lower().strip()
    s = re.sub(r"[^a-z0-9\s-]", "", s)
    s = re.sub(r"\s+", "-", s)
    s = re.sub(r"-+", "-", s)
    return s.strip("-")[:max_len].rstrip("-")


def _yaml_str(s: str) -> str:
    """Quote-safe YAML string. JSON strings are valid YAML strings."""
    return json.dumps(s, ensure_ascii=False)


def existing_basenames(*dirs: Path) -> set[str]:
    """Collect all .md basenames (without extension) across the given dirs (recursive)."""
    seen: set[str] = set()
    for d in dirs:
        if not d.exists():
            continue
        for p in d.rglob("*.md"):
            seen.add(p.stem)
    return seen


PROP_ADDR_RE = re.compile(r"proposal_address:\s*[\"']?([A-Za-z0-9]{32,44})[\"']?")
URL_ADDR_RE = re.compile(r"(?:futard\.io|metadao\.fi)(?:/[^/\s\"']*)*?/proposal/([A-Za-z0-9]{32,44})")


def existing_proposal_addresses(*dirs: Path) -> set[str]:
    """Scan frontmatter / URLs in existing source files to collect known proposal addresses.

    Reads only the first 4KB of each file (frontmatter + URL line are at the top)
    to keep this fast on large archives.
    """
    addrs: set[str] = set()
    for d in dirs:
        if not d.exists():
            continue
        for p in d.rglob("*.md"):
            try:
                head = p.read_text(errors="replace")[:4096]
            except Exception:
                continue
            for m in PROP_ADDR_RE.finditer(head):
                addrs.add(m.group(1))
            for m in URL_ADDR_RE.finditer(head):
                addrs.add(m.group(1))
    return addrs


def list_project_slugs(page) -> list[str]:
    """Read /projects and extract distinct project slugs."""
    page.goto(f"{BASE}/projects", wait_until="domcontentloaded", timeout=30000)
    page.wait_for_timeout(1500)
    hrefs = page.evaluate(
        """() => {
            const links = Array.from(document.querySelectorAll('a[href^="/projects/"]'));
            const slugs = new Set();
            for (const a of links) {
                const m = a.getAttribute('href').match(/^\\/projects\\/([a-z0-9-]+)(?:\\/|$)/);
                if (m && m[1]) slugs.add(m[1]);
            }
            return [...slugs];
        }"""
    )
    return list(hrefs)


def get_project_metadata(page, slug: str) -> dict:
    """Visit a project page and return basic metadata + proposal addresses + card text.

    Card text typically contains 'SOLO-004 ENDED DP-00003 (MEM): The Gigabus Proposal Pass $0.64...'
    so we capture it for downstream title parsing.
    """
    url = f"{BASE}/projects/{slug}"
    page.goto(url, wait_until="domcontentloaded", timeout=30000)
    page.wait_for_timeout(1500)

    proposals = page.evaluate(
        """() => {
            const links = Array.from(document.querySelectorAll('a[href*="/proposal/"]'));
            const seen = new Set();
            const out = [];
            const TARGET_ADDR_RE = /\\/proposal\\/([A-Za-z0-9]+)/;
            for (const a of links) {
                const m = a.getAttribute('href').match(TARGET_ADDR_RE);
                if (!m) continue;
                if (seen.has(m[1])) continue;
                seen.add(m[1]);
                const addr = m[1];
                // Walk up only while the ancestor contains exactly one proposal link
                // (so we get the card, not a parent that contains all cards).
                let card = a;
                while (card.parentElement) {
                    const parent = card.parentElement;
                    const propLinks = parent.querySelectorAll('a[href*="/proposal/"]');
                    if (propLinks.length > 1) break;
                    card = parent;
                }
                out.push({
                    address: addr,
                    link_text: (a.innerText || '').trim().slice(0, 600),
                    card_text: (card.innerText || '').trim().slice(0, 1500),
                });
            }
            return out;
        }"""
    )

    # Try to read project name from h1 / title
    project_name = page.evaluate(
        """() => {
            const h = document.querySelector('h1');
            return h ? h.innerText.trim() : '';
        }"""
    ) or slug.title()

    return {"slug": slug, "name": project_name, "url": url, "proposals": proposals}


# Strict pattern: DP-NNNNN (CAT): Title — the canonical proposal heading.
DP_STRICT_RE = re.compile(r"DP-\d+\s*\([A-Z]+\)\s*[:\-]\s*[^\n\r]+", re.MULTILINE)
# Loose pattern: any line starting with DP-NNNNN followed by something.
DP_LOOSE_RE = re.compile(r"DP-\d+\s*(?:\([A-Z]+\))?\s*[:\-]?\s*[^\n\r]+", re.MULTILINE)
STAT_BLEED_RE = re.compile(
    # Stat keywords only bleed when followed by a numeric/symbolic stat token,
    # so word-only sequences like "Active Capital" or "Live Streaming Service" pass.
    r"\s+\b(?:Pass|Fail|Passed|Failed|Active|Pending|Ended|Live|TOTAL|VOLUME|STATUS|MCAP|PRICE|SPOT)\b\s+(?:\$|\+|-|\d)"
    r"|\s*(?:\$\d|\+\d{2,}|\d+\.\d+%|\d{5,})",
    re.IGNORECASE,
)


def _clean_title_candidate(line: str) -> str:
    line = line.strip()
    # Find first bleed match past offset 10. re.search returns leftmost, but the
    # DP-NNNNN digit sequence always wins first place; we want the first POST-title
    # match instead. Walk all matches and trim at the earliest one past the guard.
    for bleed in STAT_BLEED_RE.finditer(line):
        if bleed.start() > 10:
            line = line[: bleed.start()].rstrip(" :-—")
            break
    return line.strip()[:200]


def extract_dp_title(*texts: str) -> str:
    """Find the canonical 'DP-NNNNN (CAT): Title' line.

    Strategy:
        1. Try strict pattern (with parenthetical category code) across all sources.
           Take the SHORTEST hit — prose continuations of an already-correct title
           tend to be longer than the title itself.
        2. Fall back to loose pattern, longest match.
    """
    strict: list[str] = []
    loose: list[str] = []
    for t in texts:
        if not t:
            continue
        for m in DP_STRICT_RE.finditer(t):
            cleaned = _clean_title_candidate(m.group(0))
            if cleaned:
                strict.append(cleaned)
        for m in DP_LOOSE_RE.finditer(t):
            cleaned = _clean_title_candidate(m.group(0))
            if cleaned:
                loose.append(cleaned)
    if strict:
        return min(strict, key=len)
    if loose:
        return max(loose, key=len)
    return ""
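Aside, for context on the title heuristics being deleted: a sketch of `extract_dp_title` applied to card text shaped like the example quoted in `get_project_metadata`'s docstring (it assumes the regexes and helpers above are in scope; the exact stats are invented).

```python
# Card text echoes the docstring example; body text is a hypothetical rendered page.
card_text = "SOLO-004 ENDED DP-00003 (MEM): The Gigabus Proposal Pass $0.64 TOTAL VOLUME $1,234"
body_text = "DP-00003 (MEM): The Gigabus Proposal\nLonger prose continuation of the proposal body..."

title = extract_dp_title(card_text, body_text)
# The strict pattern hits in both sources; the shortest candidate wins, and the
# stat-bleed trimmer cuts " Pass $0.64 TOTAL VOLUME ..." off the card-text candidate.
assert title == "DP-00003 (MEM): The Gigabus Proposal"
```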
def fetch_proposal(page, project_slug: str, addr: str, card_text: str = "") -> dict | None:
    """Visit proposal page, capture rendered text + decode instructions via in-browser fetch."""
    url = f"{BASE}/projects/{project_slug}/proposal/{addr}"
    log.info("fetching proposal %s/%s", project_slug, addr[:8])
    try:
        page.goto(url, wait_until="domcontentloaded", timeout=45000)
    except PWTimeout:
        log.warning("timeout loading %s — using whatever rendered", url)
    page.wait_for_timeout(2500)  # let RSC stream finish

    body_text = page.evaluate("() => document.body.innerText || ''")

    # Title preference: card_text (from project page) → body_text DP-NNNNN match → first h1/h2
    title_block = extract_dp_title(card_text, body_text)
    if not title_block:
        title_block = page.evaluate(
            """() => {
                const h = document.querySelector('h1, h2');
                return h ? h.innerText.trim() : '';
            }"""
        ) or f"proposal-{addr[:8]}"

    # Status: 'Passed' / 'Failed' / 'Active' / 'Pending'
    status = page.evaluate(
        """() => {
            const text = document.body.innerText || '';
            const m = text.match(/\\n(Passed|Failed|Active|Pending|Live|Ended)\\b/);
            return m ? m[1] : '';
        }"""
    )

    # Get the structured /api/decode-proposal data
    decoded = None
    try:
        decoded = page.evaluate(
            f"""async () => {{
                try {{
                    const r = await fetch('/api/decode-proposal/{addr}');
                    if (!r.ok) return null;
                    return await r.json();
                }} catch (e) {{ return null; }}
            }}"""
        )
    except Exception as e:
        log.debug("decode fetch failed for %s: %s", addr, e)

    return {
        "address": addr,
        "project_slug": project_slug,
        "url": url,
        "title": title_block,
        "status": status,
        "body_text": body_text,
        "decoded": decoded,
    }


def parse_dp_code(title: str) -> tuple[str, str]:
    """Parse 'DP-00003 (MEM): The Gigabus Proposal' → ('dp-00003-mem', 'The Gigabus Proposal').

    Falls back gracefully if format doesn't match.
    """
    # Match leading DP-NNNNN[space(category)]?[:]?[space]? plus the rest
    m = re.match(r"^(DP-\d+(?:\s*\([A-Z]+\))?)\s*[:\-]?\s*(.*)$", title.strip())
    if m:
        code = re.sub(r"[^a-z0-9]+", "-", m.group(1).lower()).strip("-")
        rest = m.group(2).strip()
        return code, rest
    return "", title.strip()


def build_filename(project_slug: str, proposal: dict, today: str) -> str:
    """YYYY-MM-DD-metadao-{slug}-{title-fragment}-{addr8}.md

    Embedding the address fragment makes filenames stable across runs even when
    the title isn't unique (e.g. projects that don't use DP-NNNNN naming).
    """
    title = proposal.get("title") or ""
    code, rest = parse_dp_code(title)
    parts: list[str] = []
    if code:
        parts.append(code)
    if rest:
        parts.append(slugify(rest, max_len=40))
    body_slug = "-".join(p for p in parts if p)[:60].rstrip("-")
    addr_frag = proposal["address"][:8].lower()
    if body_slug:
        return f"{today}-metadao-{project_slug}-{body_slug}-{addr_frag}.md"
    return f"{today}-metadao-{project_slug}-{addr_frag}.md"


def build_source_markdown(project: dict, proposal: dict, today: str) -> str:
    """Build the source markdown matching the existing schema."""
    title = proposal.get("title") or f"{project['name']} proposal {proposal['address'][:8]}"
    body_text = (proposal.get("body_text") or "").strip()
    decoded = proposal.get("decoded") or {}

    # Build YAML frontmatter — all free-text values escaped via _yaml_str (json.dumps).
    # project_slug is constrained to [a-z0-9-] by slugify upstream, but pass through
    # the same path for consistency.
    full_title = f"MetaDAO: {project['name']} — {title}"
    fm_lines = [
        "---",
        "type: source",
        f"title: {_yaml_str(full_title)}",
        f"author: {_yaml_str('metadao.fi')}",
        f"url: {_yaml_str(proposal['url'])}",
        f"date: {today}",
        "domain: internet-finance",
        "format: data",
        "status: unprocessed",
        f"tags: [futardio, metadao, futarchy, solana, governance, {project['slug']}]",
        "event_type: proposal",
        f"project_slug: {_yaml_str(project['slug'])}",
        f"proposal_address: {_yaml_str(proposal['address'])}",
    ]
    if proposal.get("status"):
        fm_lines.append(f"proposal_status: {_yaml_str(proposal['status'])}")
    if decoded.get("squadsProposal"):
        fm_lines.append(f"squads_proposal: {_yaml_str(decoded['squadsProposal'])}")
    if decoded.get("squadsStatus"):
        fm_lines.append(f"squads_status: {_yaml_str(decoded['squadsStatus'])}")
    fm_lines.append("---")
    fm_lines.append("")

    # Header section — quick facts
    body_md = [
        f"# {title}",
        "",
        "## Proposal Details",
        f"- Project: {project['name']} (`{project['slug']}`)",
        f"- Proposal: {title}",
        f"- Address: `{proposal['address']}`",
    ]
    if proposal.get("status"):
        body_md.append(f"- Status: {proposal['status']}")
    body_md.append(f"- URL: {proposal['url']}")

    # Proposal prose body (rendered text from the page)
    body_md.append("")
    body_md.append("## Proposal Body")
    body_md.append("")
    body_md.append(body_text or "_(no body captured)_")

    # Decoded on-chain instructions
    if decoded:
        body_md.append("")
        body_md.append("## On-chain Decoded")
        if decoded.get("squadsUrl"):
            body_md.append(f"- Squads: {decoded['squadsUrl']}")
        instrs = decoded.get("instructions") or []
        if instrs:
            body_md.append("")
            body_md.append("### Instructions")
            for i, instr in enumerate(instrs, 1):
                body_md.append(f"{i}. **{instr.get('description', instr.get('type', 'instruction'))}** ({instr.get('program', '')})")
                for f in instr.get("fields", []) or []:
                    val = f.get("fullValue") or f.get("value") or ""
                    body_md.append(f"    - {f.get('label', '')}: `{val}`")
                if instr.get("summary"):
                    body_md.append(f"    - Summary: {instr['summary']}")

    return "\n".join(fm_lines + body_md) + "\n"


def main() -> int:
    p = argparse.ArgumentParser(description="Scrape MetaDAO proposals into inbox source files")
    p.add_argument("--archive-dir", required=True, help="existing archive dir (skip if basename exists here)")
    p.add_argument("--output-dir", required=True, help="dir to write new source markdown into")
    p.add_argument("--project", help="restrict to a single project slug (default: scan all)")
    p.add_argument("--limit", type=int, default=0, help="max number of new proposals to capture (0 = unlimited)")
    p.add_argument("--dry-run", action="store_true", help="print intended writes instead of writing")
    p.add_argument("--headless", action="store_true", default=True)
    args = p.parse_args()

    archive_dir = Path(args.archive_dir).resolve()
    output_dir = Path(args.output_dir).resolve()
    seen_basenames = existing_basenames(archive_dir, output_dir)
    seen_addresses = existing_proposal_addresses(archive_dir, output_dir)
    log.info("loaded %d existing basenames + %d known proposal addresses from %s + %s",
             len(seen_basenames), len(seen_addresses), archive_dir, output_dir)

    today = date.today().isoformat()

    written: list[str] = []
    skipped_existing = 0

    with sync_playwright() as pw:
        browser = pw.chromium.launch(headless=args.headless)
        ctx = browser.new_context(user_agent=USER_AGENT)
        page = ctx.new_page()

        # Prime cookies
        log.info("priming Vercel session via homepage")
        page.goto(f"{BASE}/", wait_until="domcontentloaded", timeout=30000)
|
|
||||||
page.wait_for_timeout(1500)
|
|
||||||
|
|
||||||
# Discovery
|
|
||||||
if args.project:
|
|
||||||
project_slugs = [args.project]
|
|
||||||
else:
|
|
||||||
project_slugs = list_project_slugs(page)
|
|
||||||
log.info("discovered %d project slugs: %s", len(project_slugs), project_slugs)
|
|
||||||
|
|
||||||
for slug in project_slugs:
|
|
||||||
try:
|
|
||||||
project = get_project_metadata(page, slug)
|
|
||||||
except Exception:
|
|
||||||
log.exception("failed to read project %s", slug)
|
|
||||||
continue
|
|
||||||
log.info(" %s — %d proposals", slug, len(project["proposals"]))
|
|
||||||
|
|
||||||
for prop in project["proposals"]:
|
|
||||||
addr = prop["address"]
|
|
||||||
# Pre-check #1: known proposal address (cheapest, no browser visit)
|
|
||||||
if addr in seen_addresses:
|
|
||||||
skipped_existing += 1
|
|
||||||
continue
|
|
||||||
# Pre-check #2: address fragment in an existing basename
|
|
||||||
addr_frag = addr[:8].lower()
|
|
||||||
if any(addr_frag in b.lower() for b in seen_basenames):
|
|
||||||
skipped_existing += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
proposal_data = fetch_proposal(page, slug, addr, card_text=prop.get("card_text", ""))
|
|
||||||
except Exception:
|
|
||||||
log.exception("failed to fetch proposal %s/%s", slug, addr)
|
|
||||||
continue
|
|
||||||
if not proposal_data:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Minimum-render gate: skip partial renders rather than archiving stubs.
|
|
||||||
# Successful captures are 20KB+; require either a real body or a DP-N title.
|
|
||||||
body_len = len(proposal_data.get("body_text") or "")
|
|
||||||
has_dp_match = bool(re.search(r"DP-\d+", proposal_data.get("title", "") or ""))
|
|
||||||
if body_len < 500 and not has_dp_match:
|
|
||||||
log.warning(" skip (insufficient render): %s body=%dB title=%r",
|
|
||||||
addr, body_len, proposal_data.get("title", ""))
|
|
||||||
continue
|
|
||||||
|
|
||||||
fname = build_filename(slug, proposal_data, today)
|
|
||||||
|
|
||||||
if Path(fname).stem in seen_basenames:
|
|
||||||
skipped_existing += 1
|
|
||||||
log.info(" skip (already archived by title): %s", fname)
|
|
||||||
continue
|
|
||||||
|
|
||||||
content = build_source_markdown(project, proposal_data, today)
|
|
||||||
target = output_dir / fname
|
|
||||||
if args.dry_run:
|
|
||||||
log.info(" DRY: would write %s (%d bytes)", target, len(content))
|
|
||||||
else:
|
|
||||||
target.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
target.write_text(content)
|
|
||||||
log.info(" wrote %s (%d bytes)", target, len(content))
|
|
||||||
written.append(fname)
|
|
||||||
|
|
||||||
if args.limit and len(written) >= args.limit:
|
|
||||||
log.info("hit limit=%d, stopping", args.limit)
|
|
||||||
browser.close()
|
|
||||||
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
|
|
||||||
return 0
|
|
||||||
|
|
||||||
browser.close()
|
|
||||||
|
|
||||||
print(json.dumps({"written": written, "skipped_existing": skipped_existing, "dry_run": args.dry_run}))
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
sys.exit(main())
|
|
||||||
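To make the filename-stability claim in build_filename concrete, here is what it produces for the DP-title example from the parse_dp_code docstring. The project slug and address below are placeholders, and slugify is assumed to lowercase and hyphenate; this is an illustration, not output from a real run:

# Hypothetical values only; "umbra-research" and the address are made up.
proposal = {
    "title": "DP-00003 (MEM): The Gigabus Proposal",
    "address": "9xQeWvG816bUx9EPjHmaT23yvVM2ZWGrrfZ5tK1wXYZa",
}
build_filename("umbra-research", proposal, "2026-04-27")
# -> "2026-04-27-metadao-umbra-research-dp-00003-mem-the-gigabus-proposal-9xqewvg8.md"
# A later run gets a new date prefix, but the embedded address fragment "9xqewvg8"
# still matches the archived basename, so main()'s pre-check #2 skips the proposal.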
scripts/reset-m3taversal-sourcer.py (new file, 108 lines)
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Reset m3taversal.sourcer_count from inflated legacy value to file-truth count.

Background: pre-Phase-A extract.py had a `submitted_by` fallback that credited
m3taversal as sourcer for every Telegram-ingested source, accumulating to 1011
sourcer_count in the contributors table. The actual file-truth count (sourcer
frontmatter equal to "m3taversal" in claim files) is 21. The 990-row delta is
infrastructure attribution that doesn't reflect content authorship.

The Phase A event-sourced ledger (contribution_events) computed the correct
389.55 CI from author events; /api/leaderboard reads from there directly.
But the legacy /api/contributors endpoint reads contributors.claims_merged
which carries the inflated 1011. Until that endpoint is deprecated, the
divergence shows two different numbers depending on which surface the UI
queries.

This script applies the surgical UPDATE that was run on VPS on 2026-04-27
during the leaderboard cutover. Committed as a script per Ganymede review:
"DB mutations go through reviewable code paths matters more than the
convenience of one-shot SQL. The artifact explains what was done and why."

Idempotent — safe to re-run. If sourcer_count is already 21, no change.

Usage:
    python3 scripts/reset-m3taversal-sourcer.py --dry-run
    python3 scripts/reset-m3taversal-sourcer.py
"""
import argparse
import os
import sqlite3
import sys
from pathlib import Path

DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
TARGET_HANDLE = "m3taversal"
TRUTH_SOURCER_COUNT = 21
TRUTH_CLAIMS_MERGED = 21


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if not Path(DB_PATH).exists():
        print(f"ERROR: DB not found at {DB_PATH}", file=sys.stderr)
        sys.exit(1)

    conn = sqlite3.connect(DB_PATH, timeout=30)
    conn.row_factory = sqlite3.Row

    row = conn.execute(
        "SELECT handle, sourcer_count, claims_merged FROM contributors WHERE handle = ?",
        (TARGET_HANDLE,),
    ).fetchone()
    if not row:
        print(f" No contributors row for {TARGET_HANDLE} — nothing to reset.")
        return

    print(
        f" Current: {row['handle']} sourcer_count={row['sourcer_count']} "
        f"claims_merged={row['claims_merged']}"
    )
    print(f" Target: sourcer_count={TRUTH_SOURCER_COUNT} claims_merged={TRUTH_CLAIMS_MERGED}")

    if (row["sourcer_count"] == TRUTH_SOURCER_COUNT
            and row["claims_merged"] == TRUTH_CLAIMS_MERGED):
        print(" Already at target values — no-op.")
        return

    if args.dry_run:
        print(" (dry-run) UPDATE would be applied. Re-run without --dry-run.")
        return

    conn.execute(
        """UPDATE contributors SET
               sourcer_count = ?,
               claims_merged = ?,
               updated_at = datetime('now')
           WHERE handle = ?""",
        (TRUTH_SOURCER_COUNT, TRUTH_CLAIMS_MERGED, TARGET_HANDLE),
    )
    conn.execute(
        """INSERT INTO audit_log (stage, event, detail) VALUES (?, ?, ?)""",
        (
            "manual",
            "m3taversal_sourcer_reset",
            (
                '{"reason":"Pre-Phase-A submitted_by fallback inflated to 1011; '
                'file-truth is 21","sourcer_count_before":1011,'
                '"sourcer_count_after":21,"claims_merged_after":21}'
            ),
        ),
    )
    conn.commit()

    after = conn.execute(
        "SELECT sourcer_count, claims_merged FROM contributors WHERE handle = ?",
        (TARGET_HANDLE,),
    ).fetchone()
    print(
        f" Applied. Now: sourcer_count={after['sourcer_count']} "
        f"claims_merged={after['claims_merged']}"
    )


if __name__ == "__main__":
    main()
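The "file-truth count of 21" in the docstring refers to counting claim files whose sourcer frontmatter names m3taversal. A rough, hedged sketch of that recount follows; the domains/ layout and the exact "sourcer:" key are assumptions for illustration, not something this diff confirms:

# Hedged sketch: assumes claims are markdown files under domains/ with a
# "sourcer: <handle>" line in their YAML frontmatter.
import re
from pathlib import Path

def count_sourced_by(handle: str, root: str = "domains") -> int:
    pat = re.compile(rf"^sourcer:\s*['\"]?{re.escape(handle)}['\"]?\s*$", re.MULTILINE)
    return sum(
        1
        for p in Path(root).rglob("*.md")
        if pat.search(p.read_text(encoding="utf-8", errors="ignore"))
    )

# count_sourced_by("m3taversal") is expected to come back 21 per the docstring above.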
tests/test_activity_classify.py (new file, 152 lines)
@@ -0,0 +1,152 @@
"""Tests for diagnostics/activity_endpoint.py classify_pr_operation.

Covers the Leo gotcha — extract/* branches with commit_type=enrich or
challenge classify by commit_type, not branch prefix. Same class of bug
as the contributor-role wiring fix.
"""

import sys
from pathlib import Path

import pytest

# diagnostics/ isn't on sys.path by default; add it for these tests.
_DIAG = Path(__file__).resolve().parents[1] / "diagnostics"
if str(_DIAG) not in sys.path:
    sys.path.insert(0, str(_DIAG))

# aiohttp is imported at module load time; skip cleanly if not installed.
pytest.importorskip("aiohttp")

from activity_endpoint import classify_pr_operation  # noqa: E402


# ─── Merged PRs: commit_type wins over branch prefix ───────────────────────


def test_extract_branch_legacy_knowledge_classifies_new():
    assert classify_pr_operation("merged", "knowledge", "extract/foo", None) == "new"


def test_extract_branch_with_enrich_commit_type_classifies_enrich():
    """Leo gotcha: extract/* + commit_type=enrich → enrich, not new."""
    assert classify_pr_operation("merged", "enrich", "extract/foo", None) == "enrich"


def test_extract_branch_with_challenge_commit_type_classifies_challenge():
    """Leo gotcha: extract/* + commit_type=challenge → challenge, not new."""
    assert classify_pr_operation("merged", "challenge", "extract/foo", None) == "challenge"


def test_challenged_by_in_description_classifies_challenge():
    assert (
        classify_pr_operation(
            "merged", "knowledge", "extract/foo", "evidence for challenged_by claim"
        )
        == "challenge"
    )


# ─── Branch prefix fallback (when commit_type is generic) ──────────────────


def test_reweave_branch_classifies_enrich():
    assert classify_pr_operation("merged", "knowledge", "reweave/batch-1", None) == "enrich"


def test_challenge_branch_classifies_challenge():
    assert (
        classify_pr_operation("merged", "knowledge", "challenge/nuclear-moloch", None)
        == "challenge"
    )


# ─── Maintenance commit_types → infra ──────────────────────────────────────


def test_fix_commit_type_classifies_infra():
    assert classify_pr_operation("merged", "fix", "fix/deploy-bug", None) == "infra"


def test_pipeline_commit_type_classifies_infra():
    assert (
        classify_pr_operation("merged", "pipeline", "epimetheus/migration-v14", None)
        == "infra"
    )


# ─── Knowledge-producing commit_types → new ────────────────────────────────


def test_research_commit_type_classifies_new():
    assert (
        classify_pr_operation("merged", "research", "theseus/cornelius-batch-2", None)
        == "new"
    )


def test_entity_commit_type_classifies_new():
    assert classify_pr_operation("merged", "entity", "leo/entities-update", None) == "new"


# ─── Non-merged statuses route through NON_MERGED_STATUS_TO_OPERATION ──────


def test_open_pr_classifies_extract():
    assert classify_pr_operation("open", None, "extract/foo", None) == "extract"


def test_approved_pr_classifies_new():
    assert classify_pr_operation("approved", None, "extract/foo", None) == "new"


def test_closed_pr_classifies_infra():
    assert classify_pr_operation("closed", None, "extract/foo", None) == "infra"


def test_conflict_pr_classifies_challenge():
    assert classify_pr_operation("conflict", None, "extract/foo", None) == "challenge"


def test_validating_pr_classifies_extract():
    assert classify_pr_operation("validating", None, "extract/foo", None) == "extract"


def test_reviewing_pr_classifies_extract():
    assert classify_pr_operation("reviewing", None, "extract/foo", None) == "extract"


def test_merging_pr_classifies_new():
    assert classify_pr_operation("merging", None, "extract/foo", None) == "new"


def test_zombie_pr_classifies_infra():
    assert classify_pr_operation("zombie", None, "extract/foo", None) == "infra"


# ─── Priority order: reweave commit_type vs reweave/ branch ─────────────────
# Reweave commit_type is in _MAINTENANCE_COMMIT_TYPES (→ infra), but
# branch.startswith('reweave/') is checked first (→ enrich). The bifurcation
# is real spec behavior — nightly reweave PRs must classify as enrich, not
# infra. Locking this in prevents a silent flip on future priority refactors.


def test_reweave_commit_type_with_reweave_branch_classifies_enrich():
    """Branch prefix wins over maintenance — reweave PRs are enrich, not infra."""
    assert classify_pr_operation("merged", "reweave", "reweave/batch-1", None) == "enrich"


def test_reweave_commit_type_without_reweave_branch_classifies_infra():
    """Without reweave/ prefix, reweave commit_type falls to maintenance → infra."""
    assert classify_pr_operation("merged", "reweave", "epimetheus/foo", None) == "infra"


# ─── Defensive cases — null/empty inputs shouldn't crash ───────────────────


def test_null_commit_type_and_branch_classifies_new():
    assert classify_pr_operation("merged", None, None, None) == "new"


def test_unknown_status_falls_back_to_infra():
    assert classify_pr_operation("nonsense", None, None, None) == "infra"
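Read together, these assertions pin down a priority order roughly like the sketch below. It is inferred from the tests only, not copied from diagnostics/activity_endpoint.py, so the real function may order or name its checks differently:

# Inferred-behavior sketch; names are local to this sketch, not the module's.
NON_MERGED = {
    "open": "extract", "validating": "extract", "reviewing": "extract",
    "approved": "new", "merging": "new",
    "conflict": "challenge",
    "closed": "infra", "zombie": "infra",
}

def classify_sketch(status, commit_type, branch, description):
    if status != "merged":
        return NON_MERGED.get(status, "infra")           # unknown status falls to infra
    if description and "challenged_by" in description:
        return "challenge"
    if branch and branch.startswith("reweave/"):
        return "enrich"                                   # wins even when commit_type == "reweave"
    if branch and branch.startswith("challenge/"):
        return "challenge"
    if commit_type == "enrich":
        return "enrich"
    if commit_type == "challenge":
        return "challenge"
    if commit_type in ("fix", "pipeline", "reweave"):     # maintenance commit types
        return "infra"
    return "new"                                          # knowledge, research, entity, None, ...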
tests/test_leaderboard.py (new file, 437 lines)
@@ -0,0 +1,437 @@
"""Tests for /api/leaderboard endpoint (diagnostics/leaderboard_routes.py).

Locks behavior for the four slicings consumed by Argus + Oberon:
- window: all_time | Nd | Nh
- domain: per-domain filter
- kind: person | agent | org | all
- limit: pagination + has_more flag

Regression coverage includes the AND-prefix SQL bug (commit 42d35d4): _parse_window
returned clauses prefixed with 'AND ' which produced 'WHERE 1=1 AND AND ...' when
joined into the WHERE clause via " AND ".join(...).
"""

import asyncio
import json
import sqlite3
from pathlib import Path

import pytest

# Skip whole file if aiohttp isn't available (matches test_activity_classify.py pattern)
aiohttp = pytest.importorskip("aiohttp")

# Make diagnostics/ importable
import sys
DIAG_ROOT = Path(__file__).parent.parent / "diagnostics"
sys.path.insert(0, str(DIAG_ROOT))

from leaderboard_routes import (  # noqa: E402
    _parse_window,
    handle_leaderboard,
    KIND_VALUES,
    LEADERBOARD_PUBLIC_PATHS,
)
from aiohttp.test_utils import make_mocked_request  # noqa: E402


# ─── Schema lifted from lib/db.py:138-209 (v25 minimum) ──────────────────────

SCHEMA = """
CREATE TABLE contributors (
    handle TEXT PRIMARY KEY,
    kind TEXT DEFAULT 'person',
    tier TEXT DEFAULT 'new',
    claims_merged INTEGER DEFAULT 0,
    sourcer_count INTEGER DEFAULT 0,
    extractor_count INTEGER DEFAULT 0,
    challenger_count INTEGER DEFAULT 0,
    synthesizer_count INTEGER DEFAULT 0,
    reviewer_count INTEGER DEFAULT 0,
    challenges_survived INTEGER DEFAULT 0,
    domains TEXT DEFAULT '[]',
    first_contribution TEXT,
    last_contribution TEXT
);

CREATE TABLE contribution_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    handle TEXT NOT NULL,
    kind TEXT NOT NULL DEFAULT 'person',
    role TEXT NOT NULL,
    weight REAL NOT NULL,
    pr_number INTEGER NOT NULL,
    claim_path TEXT,
    domain TEXT,
    channel TEXT,
    timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
    handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
    handle, role, pr_number
) WHERE claim_path IS NULL;
"""


# ─── Fixtures ────────────────────────────────────────────────────────────────


@pytest.fixture
def db_path(tmp_path):
    """Seeded pipeline.db with deterministic events.

    Cohort:
    - alice (person): 3 author events, 1 originator (recent 3d, internet-finance)
    - bob (person): 5 author events (older, 60d ago, ai-alignment)
    - carol (person): 1 author + 1 evaluator (today, internet-finance)
    - rio (agent): 4 author + 2 evaluator (mixed, internet-finance + grand-strategy)
    - leo (agent): 8 evaluator events (today, mixed domains)
    - cnbc (org): 2 originator events (legacy, before classifier moved orgs)
    - newhandle (no contributors row): 1 author event — tests LEFT JOIN COALESCE
    """
    p = tmp_path / "pipeline.db"
    conn = sqlite3.connect(str(p))
    conn.executescript(SCHEMA)

    contribs = [
        ("alice", "person"),
        ("bob", "person"),
        ("carol", "person"),
        ("rio", "agent"),
        ("leo", "agent"),
        ("cnbc", "org"),
        # newhandle intentionally absent — tests LEFT JOIN
    ]
    for handle, kind in contribs:
        conn.execute(
            "INSERT INTO contributors (handle, kind) VALUES (?, ?)",
            (handle, kind),
        )

    # (handle, role, weight, pr_number, claim_path, domain, timestamp)
    events = [
        # alice — 3 author + 1 originator, recent (all >24h ago, all <7d)
        # Most-recent event at -2 days (not -1 days) so 24h window exclusion is
        # unambiguous and not subject to fixture-vs-query microsecond drift.
        ("alice", "author", 0.30, 100, None, "internet-finance", "now,-2 days"),
        ("alice", "author", 0.30, 101, None, "internet-finance", "now,-2 days"),
        ("alice", "author", 0.30, 102, None, "ai-alignment", "now,-3 days"),
        ("alice", "originator", 0.15, 103, "domains/internet-finance/x.md", "internet-finance", "now,-2 days"),
        # bob — 5 author, all 60d ago (outside 30d, inside all_time)
        ("bob", "author", 0.30, 200, None, "ai-alignment", "now,-60 days"),
        ("bob", "author", 0.30, 201, None, "ai-alignment", "now,-60 days"),
        ("bob", "author", 0.30, 202, None, "ai-alignment", "now,-61 days"),
        ("bob", "author", 0.30, 203, None, "ai-alignment", "now,-62 days"),
        ("bob", "author", 0.30, 204, None, "ai-alignment", "now,-63 days"),
        # carol — 1 author + 1 evaluator, today
        ("carol", "author", 0.30, 300, None, "internet-finance", "now"),
        ("carol", "evaluator", 0.05, 301, None, "internet-finance", "now"),
        # rio agent — 4 author + 2 evaluator
        ("rio", "author", 0.30, 400, None, "internet-finance", "now,-2 days"),
        ("rio", "author", 0.30, 401, None, "grand-strategy", "now,-2 days"),
        ("rio", "author", 0.30, 402, None, "internet-finance", "now,-2 days"),
        ("rio", "author", 0.30, 403, None, "internet-finance", "now,-2 days"),
        ("rio", "evaluator", 0.05, 404, None, "ai-alignment", "now,-2 days"),
        ("rio", "evaluator", 0.05, 405, None, "ai-alignment", "now,-2 days"),
        # leo agent — 8 evaluator
        *[
            ("leo", "evaluator", 0.05, 500 + i, None, "internet-finance" if i % 2 == 0 else "ai-alignment", "now")
            for i in range(8)
        ],
        # cnbc org — 2 originator (legacy data, kept by classifier+gate split)
        ("cnbc", "originator", 0.15, 600, "domains/internet-finance/y.md", "internet-finance", "now,-5 days"),
        ("cnbc", "originator", 0.15, 601, "domains/internet-finance/z.md", "internet-finance", "now,-5 days"),
        # newhandle — handle in events but no contributors row (LEFT JOIN COALESCE → person)
        # -2 days so 24h-window test exclusion is unambiguous (matches alice).
        ("newhandle", "author", 0.30, 700, None, "ai-alignment", "now,-2 days"),
    ]
    for handle, role, weight, pr_num, claim_path, domain, ts_modifier in events:
        # Use SQLite datetime() to compute timestamps relative to "now" so tests
        # are deterministic across days. Multi-arg form: datetime('now', '-1 days').
        ts_args = ts_modifier.split(",")
        if len(ts_args) == 1:
            ts_sql = f"datetime('{ts_args[0]}')"
        else:
            ts_sql = f"datetime('{ts_args[0]}', '{ts_args[1].strip()}')"
        conn.execute(
            f"""INSERT INTO contribution_events
                (handle, kind, role, weight, pr_number, claim_path, domain, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, {ts_sql})""",
            (handle, "agent" if handle in {"rio", "leo"} else "person",
             role, weight, pr_num, claim_path, domain),
        )

    conn.commit()
    conn.close()
    return str(p)


def _call(db_path, **query):
    """Build a mocked request, call handle_leaderboard, return parsed JSON."""
    qs = "&".join(f"{k}={v}" for k, v in query.items())
    req = make_mocked_request("GET", f"/api/leaderboard?{qs}")
    # make_mocked_request gives us req.app — write db_path into it.
    req.app["db_path"] = db_path
    response = asyncio.run(handle_leaderboard(req))
    return json.loads(response.body.decode())


# ─── _parse_window unit tests ────────────────────────────────────────────────


class TestParseWindow:
    def test_default_is_all_time(self):
        clause, params, label = _parse_window(None)
        assert clause == ""
        assert params == ()
        assert label == "all_time"

    def test_explicit_all_time(self):
        clause, params, label = _parse_window("all_time")
        assert clause == ""
        assert label == "all_time"

    def test_seven_days(self):
        clause, params, label = _parse_window("7d")
        assert clause == "ce.timestamp >= datetime('now', ?)"
        assert params == ("-7 days",)
        assert label == "7d"
        # Regression: must NOT begin with "AND " (handle_leaderboard composes via " AND ".join)
        assert not clause.startswith("AND")

    def test_thirty_days(self):
        clause, params, label = _parse_window("30d")
        assert params == ("-30 days",)
        assert label == "30d"

    def test_hours(self):
        clause, params, label = _parse_window("24h")
        assert clause == "ce.timestamp >= datetime('now', ?)"
        assert params == ("-24 hours",)
        assert label == "24h"

    def test_caps_days_at_365(self):
        clause, params, label = _parse_window("9999d")
        assert params == ("-365 days",)

    def test_caps_hours_at_8760(self):
        clause, params, label = _parse_window("99999h")
        assert params == ("-8760 hours",)

    def test_garbage_falls_to_all_time(self):
        clause, params, label = _parse_window("foobar")
        assert clause == ""
        assert label == "all_time"

    def test_uppercase_normalized(self):
        clause, params, label = _parse_window("7D")
        assert label == "7d"

    def test_zero_days_still_emits_clause(self):
        # 0d means "now or later" — empty result, but parse should succeed
        clause, params, label = _parse_window("0d")
        assert "datetime" in clause
        assert label == "0d"


# ─── handle_leaderboard integration tests ────────────────────────────────────


class TestLeaderboardEndpoint:
    def test_all_time_default_kind_person(self, db_path):
        """Default kind is 'person'. Returns all persons, sorted by CI desc."""
        body = _call(db_path)
        assert body["window"] == "all_time"
        assert body["kind_filter"] == "person"
        assert body["domain"] is None
        assert body["source"] == "contribution_events"
        # alice     3*0.30 + 0.15 = 1.05
        # bob       5*0.30        = 1.50
        # carol     0.30 + 0.05   = 0.35
        # newhandle 0.30 (LEFT JOIN COALESCE → 'person')
        # cnbc excluded (kind='org')
        # rio/leo excluded (kind='agent')
        handles = [r["handle"] for r in body["leaderboard"]]
        assert "bob" in handles
        assert "alice" in handles
        assert "newhandle" in handles, "LEFT JOIN COALESCE should default missing contributors to 'person'"
        assert "cnbc" not in handles, "kind=person should exclude orgs"
        assert "rio" not in handles, "kind=person should exclude agents"
        # Descending by CI
        cis = [r["ci"] for r in body["leaderboard"]]
        assert cis == sorted(cis, reverse=True)

    def test_window_7d_excludes_old_events(self, db_path):
        """REGRESSION: 7d window must execute (no AND-prefix SQL error).

        Bob has all events 60d ago → must not appear in 7d window.
        Alice has events 2-3d ago → must appear.
        """
        body = _call(db_path, window="7d")
        assert body["window"] == "7d"
        handles = [r["handle"] for r in body["leaderboard"]]
        assert "alice" in handles
        assert "bob" not in handles, "60d-old events must be excluded from 7d window"
        assert "carol" in handles  # today

    def test_window_30d_excludes_60d_events(self, db_path):
        """REGRESSION: 30d window must execute. Bob (60d) excluded; alice/carol included."""
        body = _call(db_path, window="30d")
        assert body["window"] == "30d"
        handles = [r["handle"] for r in body["leaderboard"]]
        assert "alice" in handles
        assert "carol" in handles
        assert "bob" not in handles

    def test_window_24h_only_today(self, db_path):
        """24h window picks up today's events only.

        Default kind=person. Within 24h: only carol (events at 'now').
        Excluded: alice/newhandle (events at -2 days), bob (-60d), rio/leo (kind),
        cnbc (-5d AND kind=org).
        """
        body = _call(db_path, window="24h")
        handles = [r["handle"] for r in body["leaderboard"]]
        assert handles == ["carol"], (
            "24h + kind=person should return only carol; got %r" % handles
        )

    def test_kind_agent(self, db_path):
        """kind=agent returns only agents."""
        body = _call(db_path, kind="agent")
        handles = [r["handle"] for r in body["leaderboard"]]
        assert "rio" in handles
        assert "leo" in handles
        assert "alice" not in handles
        assert "bob" not in handles

    def test_kind_org(self, db_path):
        """kind=org returns only orgs (legacy events still queryable)."""
        body = _call(db_path, kind="org")
        handles = [r["handle"] for r in body["leaderboard"]]
        assert handles == ["cnbc"]
        assert body["leaderboard"][0]["ci"] == 0.30  # 2 * 0.15

    def test_kind_all_returns_everyone(self, db_path):
        """kind=all returns all kinds — persons + agents + orgs."""
        body = _call(db_path, kind="all")
        handles = {r["handle"] for r in body["leaderboard"]}
        assert handles == {"alice", "bob", "carol", "rio", "leo", "cnbc", "newhandle"}

    def test_invalid_kind_falls_to_person(self, db_path):
        """Defensive: unknown kind value silently falls back to 'person'."""
        body = _call(db_path, kind="bogus")
        assert body["kind_filter"] == "person"

    def test_domain_filter(self, db_path):
        """domain=internet-finance scopes events; kind filter still applies."""
        body = _call(db_path, domain="internet-finance")
        assert body["domain"] == "internet-finance"
        handles = {r["handle"] for r in body["leaderboard"]}
        # alice has 2 internet-finance authors + 1 originator
        # carol has 1 internet-finance author + 1 evaluator
        # bob has 0 (all ai-alignment)
        # newhandle has 0 (ai-alignment only)
        assert "alice" in handles
        assert "carol" in handles
        assert "bob" not in handles
        assert "newhandle" not in handles

    def test_composed_window_kind_domain(self, db_path):
        """REGRESSION: composed filters must build SQL correctly.

        7d + person + internet-finance — alice and carol pass; bob (window)
        and rio (kind) are excluded.
        """
        body = _call(db_path, window="7d", kind="person", domain="internet-finance")
        handles = [r["handle"] for r in body["leaderboard"]]
        assert "alice" in handles
        assert "carol" in handles
        assert "bob" not in handles  # excluded by 7d
        assert "rio" not in handles  # excluded by kind=person

    def test_limit_caps_results(self, db_path):
        """limit caps the leaderboard slice; total reflects unfiltered count."""
        body = _call(db_path, kind="all", limit=3)
        assert body["shown"] == 3
        assert body["has_more"] is True
        assert body["total"] == 7

    def test_no_has_more_when_under_limit(self, db_path):
        body = _call(db_path, kind="org")
        assert body["shown"] == 1
        assert body["has_more"] is False
        assert body["total"] == 1

    def test_invalid_limit_falls_to_default(self, db_path):
        """Defensive: garbage limit param falls to default 100. 7 entries < 100."""
        body = _call(db_path, kind="all", limit="not-a-number")
        assert body["shown"] == 7
        assert body["has_more"] is False

    def test_limit_capped_at_500(self, db_path):
        """Defensive: limit > 500 silently caps at 500."""
        body = _call(db_path, limit=99999, kind="all")
        # No assertion on the value of the cap from the response — just that
        # it doesn't error and shown <= 500.
        assert body["shown"] <= 500

    def test_role_breakdown_present(self, db_path):
        """Each row includes ci_breakdown with all 5 roles."""
        body = _call(db_path)
        for entry in body["leaderboard"]:
            assert set(entry["ci_breakdown"].keys()) == {
                "author", "challenger", "synthesizer", "originator", "evaluator",
            }

    def test_alice_role_breakdown_correct(self, db_path):
        """Alice has 3 author (0.90) + 1 originator (0.15) = 1.05 total."""
        body = _call(db_path)
        alice = next(r for r in body["leaderboard"] if r["handle"] == "alice")
        assert alice["ci"] == 1.05
        assert alice["ci_breakdown"]["author"] == 0.90
        assert alice["ci_breakdown"]["originator"] == 0.15
        assert alice["ci_breakdown"]["challenger"] == 0
        assert alice["ci_breakdown"]["synthesizer"] == 0
        assert alice["ci_breakdown"]["evaluator"] == 0
        assert alice["events_count"] == 4
        assert alice["pr_count"] == 4
        assert alice["domain_count"] == 2  # internet-finance + ai-alignment

    def test_empty_window_returns_clean_response(self, db_path):
        """Window with no matching events returns shape-correct empty response."""
        # 24h window + kind=org → cnbc is 5d ago, so empty
        body = _call(db_path, window="24h", kind="org")
        assert body["leaderboard"] == []
        assert body["total"] == 0
        assert body["shown"] == 0
        assert body["has_more"] is False
        assert body["source"] == "contribution_events"

    def test_left_join_handles_missing_contributors_row(self, db_path):
        """REGRESSION: handle in events but missing from contributors must default to kind='person'.

        Catches the failure mode where a handle classified as cited (auto-create
        deferred to Branch 3) accumulates events but has no contributors row yet.
        """
        body = _call(db_path)
        newhandle_row = next(
            (r for r in body["leaderboard"] if r["handle"] == "newhandle"), None
        )
        assert newhandle_row is not None
        assert newhandle_row["kind"] == "person"
        assert newhandle_row["ci"] == 0.30


# ─── Public path constant (auth middleware bypass) ───────────────────────────


def test_public_paths_includes_leaderboard():
    """Auth middleware needs LEADERBOARD_PUBLIC_PATHS to skip API key for /api/leaderboard."""
    assert "/api/leaderboard" in LEADERBOARD_PUBLIC_PATHS


def test_kind_values_matches_contract():
    """API contract: only these 4 kind values are accepted."""
    assert set(KIND_VALUES) == {"person", "agent", "org", "all"}
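The AND-prefix regression described in the module docstring is easy to reproduce in isolation. A minimal standalone sketch; the actual clause composition in leaderboard_routes.py may differ in detail:

# Standalone repro of the pre-fix bug, not the endpoint's real code.
filters = ["1=1"]
buggy = "AND ce.timestamp >= datetime('now', ?)"   # what _parse_window used to return
fixed = "ce.timestamp >= datetime('now', ?)"       # what it returns after commit 42d35d4

print("WHERE " + " AND ".join(filters + [buggy]))
# WHERE 1=1 AND AND ce.timestamp >= datetime('now', ?)   <- invalid SQL at query time
print("WHERE " + " AND ".join(filters + [fixed]))
# WHERE 1=1 AND ce.timestamp >= datetime('now', ?)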
tests/test_research_backfill_idempotent.py (new file, 167 lines)
@@ -0,0 +1,167 @@
"""Verify research-attribution backfill is replay-safe against real schema.

Three things to prove:
1. (handle, role, pr_number) with claim_path=NULL deduplicates correctly
   (idx_ce_unique_pr partial index handles SQLite NULL-not-equal-NULL).
2. Re-inserting an existing (handle, role, pr_number, NULL) row via INSERT OR IGNORE
   is a true no-op — does not create a phantom duplicate.
3. The backfill script's specific operation (DELETE then INSERT for same key)
   nets zero rows when run twice in sequence.
"""

import sqlite3
import sys

# Schema lifted verbatim from lib/db.py:181-209
SCHEMA = """
CREATE TABLE contribution_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    handle TEXT NOT NULL,
    kind TEXT NOT NULL DEFAULT 'person',
    role TEXT NOT NULL,
    weight REAL NOT NULL,
    pr_number INTEGER NOT NULL,
    claim_path TEXT,
    domain TEXT,
    channel TEXT,
    timestamp TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX idx_ce_unique_claim ON contribution_events(
    handle, role, pr_number, claim_path
) WHERE claim_path IS NOT NULL;
CREATE UNIQUE INDEX idx_ce_unique_pr ON contribution_events(
    handle, role, pr_number
) WHERE claim_path IS NULL;
"""


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)
    return conn


def insert_event(conn, handle, role, pr_number, claim_path=None):
    cur = conn.execute(
        """INSERT OR IGNORE INTO contribution_events
           (handle, kind, role, weight, pr_number, claim_path)
           VALUES (?, 'agent', ?, 0.30, ?, ?)""",
        (handle, role, pr_number, claim_path),
    )
    return cur.rowcount


def count(conn) -> int:
    return conn.execute("SELECT COUNT(*) FROM contribution_events").fetchone()[0]


def test_pr_level_dedup_with_null_claim_path():
    """Two inserts of same (handle, role, pr_number, NULL) → 1 row."""
    conn = setup()
    r1 = insert_event(conn, "rio", "author", 4061)
    r2 = insert_event(conn, "rio", "author", 4061)
    n = count(conn)
    assert r1 == 1, f"first insert should write, got rowcount={r1}"
    assert r2 == 0, f"second insert should be ignored, got rowcount={r2}"
    assert n == 1, f"expected 1 row, got {n}"
    print("PASS: pr-level dedup with NULL claim_path")


def test_per_claim_dedup_with_path():
    """Two inserts of same (handle, role, pr_number, path) → 1 row."""
    conn = setup()
    r1 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
    r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
    n = count(conn)
    assert r1 == 1 and r2 == 0 and n == 1
    print("PASS: per-claim dedup with claim_path")


def test_pr_level_and_per_claim_coexist():
    """A (handle, role, pr_number, NULL) and (handle, role, pr_number, 'x.md') coexist
    because the partial indexes target different rows."""
    conn = setup()
    r1 = insert_event(conn, "rio", "author", 4061, claim_path=None)
    r2 = insert_event(conn, "rio", "author", 4061, claim_path="domains/x.md")
    n = count(conn)
    assert r1 == 1 and r2 == 1 and n == 2
    print("PASS: pr-level and per-claim events coexist on same pr_number")


def test_backfill_replay_is_noop():
    """Simulate the exact backfill operation: INSERT correct event, DELETE wrong event.
    Run twice. Expect identical state — no phantom rows, no double-deletions."""
    conn = setup()

    # Initial state: m3taversal has the wrong author event for pr=4061
    insert_event(conn, "m3taversal", "author", 4061)
    assert count(conn) == 1

    def backfill_pr_4061():
        # Insert the correct event (rio is the real author)
        conn.execute(
            """INSERT OR IGNORE INTO contribution_events
               (handle, kind, role, weight, pr_number, claim_path)
               VALUES (?, 'agent', 'author', 0.30, 4061, NULL)""",
            ("rio (self-directed)",),
        )
        # Delete the wrong event
        conn.execute(
            """DELETE FROM contribution_events
               WHERE handle='m3taversal' AND role='author'
                 AND pr_number=4061 AND claim_path IS NULL""",
        )
        conn.commit()

    backfill_pr_4061()
    state_after_first = sorted(
        (r["handle"], r["role"], r["pr_number"], r["claim_path"])
        for r in conn.execute("SELECT * FROM contribution_events")
    )
    assert state_after_first == [("rio (self-directed)", "author", 4061, None)], state_after_first

    # Replay
    backfill_pr_4061()
    state_after_second = sorted(
        (r["handle"], r["role"], r["pr_number"], r["claim_path"])
        for r in conn.execute("SELECT * FROM contribution_events")
    )
    assert state_after_first == state_after_second, "replay should be idempotent"
    assert count(conn) == 1, f"expected 1 row after replay, got {count(conn)}"
    print("PASS: backfill replay is a true no-op")


def test_replay_against_already_backfilled_pr_does_not_double_delete():
    """If m3taversal event was already deleted, running backfill again must not error
    or affect anything else."""
    conn = setup()
    # Already-correct state: rio has the author event, m3taversal does not
    insert_event(conn, "rio (self-directed)", "author", 4061)
    insert_event(conn, "leo", "evaluator", 4061)  # noise — should not be touched

    # Run backfill: tries to INSERT (rio, author, 4061) — already exists, no-op
    # Tries to DELETE (m3taversal, author, 4061) — already absent, 0 rows affected
    cur1 = conn.execute(
        """INSERT OR IGNORE INTO contribution_events
           (handle, kind, role, weight, pr_number, claim_path)
           VALUES ('rio (self-directed)', 'agent', 'author', 0.30, 4061, NULL)""",
    )
    cur2 = conn.execute(
        """DELETE FROM contribution_events
           WHERE handle='m3taversal' AND role='author'
             AND pr_number=4061 AND claim_path IS NULL""",
    )
    assert cur1.rowcount == 0, f"insert should be no-op, got {cur1.rowcount}"
    assert cur2.rowcount == 0, f"delete should be no-op, got {cur2.rowcount}"
    assert count(conn) == 2, f"expected 2 rows preserved, got {count(conn)}"
    print("PASS: replay against already-backfilled state preserves unrelated events")


if __name__ == "__main__":
    test_pr_level_dedup_with_null_claim_path()
    test_per_claim_dedup_with_path()
    test_pr_level_and_per_claim_coexist()
    test_backfill_replay_is_noop()
    test_replay_against_already_backfilled_pr_does_not_double_delete()
    print("\nAll 5 tests passed against real schema.")
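For context on why idx_ce_unique_pr has to be a partial index at all: in SQLite a plain UNIQUE index treats NULLs as distinct, so two pr-level rows that differ only by a NULL claim_path would both be accepted. A minimal standalone illustration, separate from the test file above:

# Shows the failure mode the partial index guards against.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ev (handle TEXT, role TEXT, pr INTEGER, claim_path TEXT)")
conn.execute("CREATE UNIQUE INDEX naive ON ev(handle, role, pr, claim_path)")

conn.execute("INSERT OR IGNORE INTO ev VALUES ('rio', 'author', 4061, NULL)")
conn.execute("INSERT OR IGNORE INTO ev VALUES ('rio', 'author', 4061, NULL)")
print(conn.execute("SELECT COUNT(*) FROM ev").fetchone()[0])
# 2 -> the naive four-column index did not deduplicate the NULL rows;
#      the partial index "WHERE claim_path IS NULL" over (handle, role, pr_number) does.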