feat(diagnostics): /api/leaderboard reads contribution_events directly
Some checks are pending
CI / lint-and-test (push) Waiting to run

New endpoint replaces the legacy /api/contributors *_count read path with
event-sourced reads from the Phase A contribution_events ledger.

- Params: window (all_time | Nd | Nh), kind (person | agent | org | all),
  domain (filter), limit (default 100, max 500)
- Returns per-handle CI, full role breakdown (author/challenger/synthesizer/
  originator/evaluator), events_count, pr_count, first/last contribution
- ORDER BY ci DESC, last_contribution DESC — recent contributors break ties
- Read-only sqlite URI; total/has_more computed for paginated UIs

Wiring (import + register + _PUBLIC_PATHS entry) currently applied to live
app.py on VPS only — repo app.py has drift from Ship's uncommitted /api/search
POST contract. Next deploy.sh round-trip needs both to land together.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
m3taversal 2026-04-27 13:16:41 +01:00
parent 369f6c96da
commit de7e5ec709

View file

@ -0,0 +1,165 @@
"""Leaderboard endpoint reading from event-sourced contribution_events.
Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
joins contributors for kind, returns sorted leaderboard with role breakdown.
Roles + weights (Phase A):
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
Endpoints:
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""
import logging
import re
import sqlite3
from aiohttp import web
logger = logging.getLogger("argus.leaderboard_routes")
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")
# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
def _parse_window(raw):
"""Parse window param. Returns (sql_clause, params_tuple, label).
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
Caps N at 365d / 8760h to prevent abuse.
"""
if not raw or raw == "all_time":
return ("", (), "all_time")
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
if not m:
return ("", (), "all_time")
n = int(m.group(1))
unit = m.group(2)
if unit == "d":
n = min(n, 365)
return ("AND ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
n = min(n, 8760)
return ("AND ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
async def handle_leaderboard(request):
"""GET /api/leaderboard.
Query params:
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
domain: filter by domain (optional)
kind: 'person' (default) | 'agent' | 'org' | 'all'
limit: max entries (default 100, max 500)
"""
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
domain = request.query.get("domain")
kind = request.query.get("kind", "person")
if kind not in KIND_VALUES:
kind = "person"
try:
limit = min(int(request.query.get("limit", "100")), 500)
except (ValueError, TypeError):
limit = 100
where = ["1=1", window_clause] if window_clause else ["1=1"]
params = list(window_params)
if domain:
where.append("ce.domain = ?")
params.append(domain)
if kind != "all":
where.append("COALESCE(c.kind, 'person') = ?")
params.append(kind)
where_sql = " AND ".join([w for w in where if w])
conn = _conn(request.app)
try:
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
# LEFT JOIN contributors so handles in events but not in contributors still appear
# (defaults to kind='person' via COALESCE).
rows = conn.execute(f"""
SELECT
ce.handle,
COALESCE(c.kind, 'person') AS kind,
ROUND(SUM(ce.weight), 4) AS ci,
COUNT(*) AS events_count,
MIN(ce.timestamp) AS first_contribution,
MAX(ce.timestamp) AS last_contribution,
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
COUNT(DISTINCT ce.domain) AS domain_count,
COUNT(DISTINCT ce.pr_number) AS pr_count
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
GROUP BY ce.handle, COALESCE(c.kind, 'person')
ORDER BY ci DESC, last_contribution DESC
LIMIT ?
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
has_more = len(rows) > limit
rows = rows[:limit]
# Total count of distinct handles matching filters (without limit)
total_row = conn.execute(f"""
SELECT COUNT(DISTINCT ce.handle) AS total
FROM contribution_events ce
LEFT JOIN contributors c ON c.handle = ce.handle
WHERE {where_sql}
""", params).fetchone()
total = total_row["total"] if total_row else 0
leaderboard = []
for r in rows:
leaderboard.append({
"handle": r["handle"],
"kind": r["kind"],
"ci": r["ci"],
"ci_breakdown": {
"author": round(r["ci_author"] or 0, 4),
"challenger": round(r["ci_challenger"] or 0, 4),
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
"originator": round(r["ci_originator"] or 0, 4),
"evaluator": round(r["ci_evaluator"] or 0, 4),
},
"events_count": r["events_count"],
"domain_count": r["domain_count"],
"pr_count": r["pr_count"],
"first_contribution": r["first_contribution"],
"last_contribution": r["last_contribution"],
})
return web.json_response({
"window": window_label,
"domain": domain,
"kind_filter": kind,
"total": total,
"shown": len(leaderboard),
"has_more": has_more,
"source": "contribution_events", # explicit so consumers know the data origin
"leaderboard": leaderboard,
})
finally:
conn.close()
def register_leaderboard_routes(app: web.Application):
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
app.router.add_get("/api/leaderboard", handle_leaderboard)