"""Leaderboard endpoint reading from event-sourced contribution_events.

Owner: Argus
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)

Reads contribution_events GROUP BY handle, computes CI as SUM(weight), joins
contributors for kind, returns sorted leaderboard with role breakdown.

Roles + weights (Phase A):
    author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05

Endpoints:
    GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
"""

import logging
import re
import sqlite3

from aiohttp import web

logger = logging.getLogger("argus.leaderboard_routes")

ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
KIND_VALUES = ("person", "agent", "org", "all")

# Public path set so auth middleware lets it through
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})

_DEFAULT_LIMIT = 100
_MAX_LIMIT = 500

_WINDOW_RE = re.compile(r"(\d+)([dh])")

# Per-role SUM(CASE ...) columns, generated from ROLE_KEYS so the SQL columns
# and the ci_breakdown keys in the response cannot drift apart. Role names are
# module constants (not user input), so interpolating them into SQL is safe.
_ROLE_SUM_SQL = ",\n       ".join(
    f"SUM(CASE WHEN ce.role='{role}' THEN ce.weight ELSE 0 END) AS ci_{role}"
    for role in ROLE_KEYS
)


def _conn(app):
    """Open a read-only connection to pipeline.db.

    The path is read from ``app["db_path"]``. ``mode=ro`` in the URI makes
    SQLite reject writes on this connection, and rows come back as
    ``sqlite3.Row`` so columns can be read by name.
    """
    db_path = app["db_path"]
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    return conn


def _parse_window(raw):
    """Parse the ``window`` query param.

    Accepts 'all_time' (the default), 'Nd' (last N days) or 'Nh' (last N
    hours). N is capped at 365d / 8760h to prevent abuse. Unparseable values
    fall back to 'all_time'.

    Returns:
        (sql_clause, params_tuple, label). sql_clause is "" for all_time.
        The caller composes the WHERE clause with " AND ".join(...), so the
        clause must NOT be prefixed with "AND ".
    """
    if not raw or raw == "all_time":
        return ("", (), "all_time")
    m = _WINDOW_RE.fullmatch(raw.strip().lower())
    if not m:
        return ("", (), "all_time")
    n = int(m.group(1))
    unit = m.group(2)
    # NOTE(review): this is a textual comparison against datetime('now', ...),
    # which yields 'YYYY-MM-DD HH:MM:SS' in UTC. It assumes ce.timestamp is
    # stored in that same format — confirm against the writer.
    if unit == "d":
        n = min(n, 365)
        return ("ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
    n = min(n, 8760)
    return ("ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")


def _parse_limit(raw):
    """Parse the ``limit`` query param into an int in [0, _MAX_LIMIT].

    A missing or non-integer value falls back to _DEFAULT_LIMIT. Negative
    values are clamped to 0. SQLite treats a negative LIMIT as "no limit", so
    an unclamped negative value would return every handle. limit=0 still
    returns the totals with an empty leaderboard.
    """
    if raw is None:
        return _DEFAULT_LIMIT
    try:
        n = int(raw)
    except (ValueError, TypeError):
        return _DEFAULT_LIMIT
    return max(0, min(n, _MAX_LIMIT))


def _build_where(window_clause, window_params, domain, kind):
    """Compose the shared WHERE clause and its bind params.

    Both the leaderboard query and the total-count query use this, so they
    always filter identically. Every user-supplied value is bound via ``?``.

    Returns:
        (where_sql, params_list). Params are in the same order as the clauses.
    """
    clauses = ["1=1"]  # keeps where_sql non-empty when no filter applies
    params = list(window_params)
    if window_clause:
        clauses.append(window_clause)
    if domain:
        clauses.append("ce.domain = ?")
        params.append(domain)
    if kind != "all":
        # Handles without a contributors row count as 'person'.
        clauses.append("COALESCE(c.kind, 'person') = ?")
        params.append(kind)
    return " AND ".join(clauses), params


def _row_to_entry(row):
    """Convert one aggregated SQL row into a JSON leaderboard entry."""
    return {
        "handle": row["handle"],
        "kind": row["kind"],
        "ci": row["ci"],
        "ci_breakdown": {
            role: round(row[f"ci_{role}"] or 0, 4) for role in ROLE_KEYS
        },
        "events_count": row["events_count"],
        "domain_count": row["domain_count"],
        "pr_count": row["pr_count"],
        "first_contribution": row["first_contribution"],
        "last_contribution": row["last_contribution"],
    }


async def handle_leaderboard(request):
    """GET /api/leaderboard.

    Query params:
        window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
        domain: filter by domain (optional)
        kind:   'person' (default) | 'agent' | 'org' | 'all'
        limit:  max entries (default 100, max 500; 0 returns totals only)
    """
    window_clause, window_params, window_label = _parse_window(request.query.get("window"))
    domain = request.query.get("domain")
    kind = request.query.get("kind", "person")
    if kind not in KIND_VALUES:
        kind = "person"
    limit = _parse_limit(request.query.get("limit"))

    where_sql, params = _build_where(window_clause, window_params, domain, kind)

    # NOTE(review): sqlite3 calls are synchronous and run on the event-loop
    # thread; consider offloading to an executor if this aggregation gets slow.
    conn = _conn(request.app)
    try:
        # Aggregate per handle: total CI, per-role breakdown, event count and
        # first/last timestamp. The LEFT JOIN keeps handles that appear in
        # events but not in contributors; their kind defaults to 'person'.
        # ce.handle is the final ORDER BY key so ties sort deterministically.
        rows = conn.execute(
            f"""
            SELECT ce.handle,
                   COALESCE(c.kind, 'person') AS kind,
                   ROUND(SUM(ce.weight), 4) AS ci,
                   COUNT(*) AS events_count,
                   MIN(ce.timestamp) AS first_contribution,
                   MAX(ce.timestamp) AS last_contribution,
                   {_ROLE_SUM_SQL},
                   COUNT(DISTINCT ce.domain) AS domain_count,
                   COUNT(DISTINCT ce.pr_number) AS pr_count
            FROM contribution_events ce
            LEFT JOIN contributors c ON c.handle = ce.handle
            WHERE {where_sql}
            GROUP BY ce.handle, COALESCE(c.kind, 'person')
            ORDER BY ci DESC, last_contribution DESC, ce.handle ASC
            LIMIT ?
            """,
            (*params, limit + 1),  # fetch one extra row to detect overflow
        ).fetchall()
        has_more = len(rows) > limit
        rows = rows[:limit]

        # Total number of distinct handles matching the filters (ignores limit).
        total_row = conn.execute(
            f"""
            SELECT COUNT(DISTINCT ce.handle) AS total
            FROM contribution_events ce
            LEFT JOIN contributors c ON c.handle = ce.handle
            WHERE {where_sql}
            """,
            params,
        ).fetchone()
        total = total_row["total"] if total_row else 0

        leaderboard = [_row_to_entry(r) for r in rows]

        return web.json_response({
            "window": window_label,
            "domain": domain,
            "kind_filter": kind,
            "total": total,
            "shown": len(leaderboard),
            "has_more": has_more,
            "source": "contribution_events",  # explicit so consumers know the data origin
            "leaderboard": leaderboard,
        })
    finally:
        conn.close()


def register_leaderboard_routes(app: web.Application):
    """Register /api/leaderboard. Requires app['db_path'] to be set."""
    app.router.add_get("/api/leaderboard", handle_leaderboard)