Some checks are pending
CI / lint-and-test (push) Waiting to run
de7e5ec landed leaderboard_routes.py + the route file's register fn but
the import + register_leaderboard_routes(app) call + auth-middleware
allowlist were never added to app.py — endpoint returned 404 in production.
Three minimal edits to app.py mirror the existing register_*_routes pattern
(import at line 28, allowlist OR-clause at line 512, register call at 2365).
Plus a SQL bug in _parse_window: rolling-window clauses prefixed "AND "
but the WHERE composition uses " AND ".join(...), producing
"WHERE 1=1 AND AND ce.timestamp..." → sqlite3.OperationalError on every
window=Nd / window=Nh request. Stripped the prefix and added a comment so
the asymmetry doesn't bite again.
Verified on VPS:
GET /api/leaderboard?window=all_time&kind=person → 200, 11 rows
GET /api/leaderboard?window=7d&kind=person → 200, 2 rows
GET /api/leaderboard?window=30d&kind=person → 200, 9 rows
GET /api/leaderboard?domain=internet-finance → 200, 3 rows
GET /api/leaderboard?kind=agent → 200, leo/rio/clay/astra/vida
Unblocks: Argus dashboard cutover, Oberon column reorder, Leo's CI
taxonomy broadcast.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
166 lines
6.2 KiB
Python
166 lines
6.2 KiB
Python
"""Leaderboard endpoint reading from event-sourced contribution_events.
|
|
|
|
Owner: Argus
|
|
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
|
|
|
|
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
|
|
joins contributors for kind, returns sorted leaderboard with role breakdown.
|
|
|
|
Roles + weights (Phase A):
|
|
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
|
|
|
|
Endpoints:
|
|
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import sqlite3
|
|
|
|
from aiohttp import web
|
|
|
|
logger = logging.getLogger("argus.leaderboard_routes")
|
|
|
|
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
|
|
KIND_VALUES = ("person", "agent", "org", "all")
|
|
|
|
# Public path set so auth middleware lets it through
|
|
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
|
|
|
|
|
|
def _conn(app):
|
|
"""Read-only connection to pipeline.db."""
|
|
db_path = app["db_path"]
|
|
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def _parse_window(raw):
|
|
"""Parse window param. Returns (sql_clause, params_tuple, label).
|
|
|
|
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
|
|
Caps N at 365d / 8760h to prevent abuse.
|
|
"""
|
|
if not raw or raw == "all_time":
|
|
return ("", (), "all_time")
|
|
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
|
|
if not m:
|
|
return ("", (), "all_time")
|
|
n = int(m.group(1))
|
|
unit = m.group(2)
|
|
# Note: WHERE clause is composed via " AND ".join(...) — do NOT prefix with "AND ".
|
|
if unit == "d":
|
|
n = min(n, 365)
|
|
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
|
|
n = min(n, 8760)
|
|
return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
|
|
|
|
|
|
async def handle_leaderboard(request):
|
|
"""GET /api/leaderboard.
|
|
|
|
Query params:
|
|
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
|
|
domain: filter by domain (optional)
|
|
kind: 'person' (default) | 'agent' | 'org' | 'all'
|
|
limit: max entries (default 100, max 500)
|
|
"""
|
|
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
|
|
domain = request.query.get("domain")
|
|
kind = request.query.get("kind", "person")
|
|
if kind not in KIND_VALUES:
|
|
kind = "person"
|
|
try:
|
|
limit = min(int(request.query.get("limit", "100")), 500)
|
|
except (ValueError, TypeError):
|
|
limit = 100
|
|
|
|
where = ["1=1", window_clause] if window_clause else ["1=1"]
|
|
params = list(window_params)
|
|
if domain:
|
|
where.append("ce.domain = ?")
|
|
params.append(domain)
|
|
if kind != "all":
|
|
where.append("COALESCE(c.kind, 'person') = ?")
|
|
params.append(kind)
|
|
|
|
where_sql = " AND ".join([w for w in where if w])
|
|
|
|
conn = _conn(request.app)
|
|
try:
|
|
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
|
|
# LEFT JOIN contributors so handles in events but not in contributors still appear
|
|
# (defaults to kind='person' via COALESCE).
|
|
rows = conn.execute(f"""
|
|
SELECT
|
|
ce.handle,
|
|
COALESCE(c.kind, 'person') AS kind,
|
|
ROUND(SUM(ce.weight), 4) AS ci,
|
|
COUNT(*) AS events_count,
|
|
MIN(ce.timestamp) AS first_contribution,
|
|
MAX(ce.timestamp) AS last_contribution,
|
|
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
|
|
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
|
|
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
|
|
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
|
|
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
|
|
COUNT(DISTINCT ce.domain) AS domain_count,
|
|
COUNT(DISTINCT ce.pr_number) AS pr_count
|
|
FROM contribution_events ce
|
|
LEFT JOIN contributors c ON c.handle = ce.handle
|
|
WHERE {where_sql}
|
|
GROUP BY ce.handle, COALESCE(c.kind, 'person')
|
|
ORDER BY ci DESC, last_contribution DESC
|
|
LIMIT ?
|
|
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
|
|
|
|
has_more = len(rows) > limit
|
|
rows = rows[:limit]
|
|
|
|
# Total count of distinct handles matching filters (without limit)
|
|
total_row = conn.execute(f"""
|
|
SELECT COUNT(DISTINCT ce.handle) AS total
|
|
FROM contribution_events ce
|
|
LEFT JOIN contributors c ON c.handle = ce.handle
|
|
WHERE {where_sql}
|
|
""", params).fetchone()
|
|
total = total_row["total"] if total_row else 0
|
|
|
|
leaderboard = []
|
|
for r in rows:
|
|
leaderboard.append({
|
|
"handle": r["handle"],
|
|
"kind": r["kind"],
|
|
"ci": r["ci"],
|
|
"ci_breakdown": {
|
|
"author": round(r["ci_author"] or 0, 4),
|
|
"challenger": round(r["ci_challenger"] or 0, 4),
|
|
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
|
|
"originator": round(r["ci_originator"] or 0, 4),
|
|
"evaluator": round(r["ci_evaluator"] or 0, 4),
|
|
},
|
|
"events_count": r["events_count"],
|
|
"domain_count": r["domain_count"],
|
|
"pr_count": r["pr_count"],
|
|
"first_contribution": r["first_contribution"],
|
|
"last_contribution": r["last_contribution"],
|
|
})
|
|
|
|
return web.json_response({
|
|
"window": window_label,
|
|
"domain": domain,
|
|
"kind_filter": kind,
|
|
"total": total,
|
|
"shown": len(leaderboard),
|
|
"has_more": has_more,
|
|
"source": "contribution_events", # explicit so consumers know the data origin
|
|
"leaderboard": leaderboard,
|
|
})
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def register_leaderboard_routes(app: web.Application):
|
|
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
|
|
app.router.add_get("/api/leaderboard", handle_leaderboard)
|