From de7e5ec709e0acd33199339d4524e5c33465e49c Mon Sep 17 00:00:00 2001 From: m3taversal Date: Mon, 27 Apr 2026 13:16:41 +0100 Subject: [PATCH] feat(diagnostics): /api/leaderboard reads contribution_events directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New endpoint replaces the legacy /api/contributors *_count read path with event-sourced reads from the Phase A contribution_events ledger. - Params: window (all_time | Nd | Nh), kind (person | agent | org | all), domain (filter), limit (default 100, max 500) - Returns per-handle CI, full role breakdown (author/challenger/synthesizer/ originator/evaluator), events_count, pr_count, first/last contribution - ORDER BY ci DESC, last_contribution DESC — recent contributors break ties - Read-only sqlite URI; total/has_more computed for paginated UIs Wiring (import + register + _PUBLIC_PATHS entry) currently applied to live app.py on VPS only — repo app.py has drift from Ship's uncommitted /api/search POST contract. Next deploy.sh round-trip needs both to land together. Co-Authored-By: Claude Opus 4.7 (1M context) --- diagnostics/leaderboard_routes.py | 165 ++++++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 diagnostics/leaderboard_routes.py diff --git a/diagnostics/leaderboard_routes.py b/diagnostics/leaderboard_routes.py new file mode 100644 index 0000000..433b4a6 --- /dev/null +++ b/diagnostics/leaderboard_routes.py @@ -0,0 +1,165 @@ +"""Leaderboard endpoint reading from event-sourced contribution_events. + +Owner: Argus +Source of truth: pipeline.db contribution_events (Epimetheus, schema v25) + +Reads contribution_events GROUP BY handle, computes CI as SUM(weight), +joins contributors for kind, returns sorted leaderboard with role breakdown. + +Roles + weights (Phase A): + author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05 + +Endpoints: + GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100 +""" + +import logging +import re +import sqlite3 + +from aiohttp import web + +logger = logging.getLogger("argus.leaderboard_routes") + +ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator") +KIND_VALUES = ("person", "agent", "org", "all") + +# Public path set so auth middleware lets it through +LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"}) + + +def _conn(app): + """Read-only connection to pipeline.db.""" + db_path = app["db_path"] + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + return conn + + +def _parse_window(raw): + """Parse window param. Returns (sql_clause, params_tuple, label). + + Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours). + Caps N at 365d / 8760h to prevent abuse. + """ + if not raw or raw == "all_time": + return ("", (), "all_time") + m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower()) + if not m: + return ("", (), "all_time") + n = int(m.group(1)) + unit = m.group(2) + if unit == "d": + n = min(n, 365) + return ("AND ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d") + n = min(n, 8760) + return ("AND ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h") + + +async def handle_leaderboard(request): + """GET /api/leaderboard. + + Query params: + window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h') + domain: filter by domain (optional) + kind: 'person' (default) | 'agent' | 'org' | 'all' + limit: max entries (default 100, max 500) + """ + window_clause, window_params, window_label = _parse_window(request.query.get("window")) + domain = request.query.get("domain") + kind = request.query.get("kind", "person") + if kind not in KIND_VALUES: + kind = "person" + try: + limit = min(int(request.query.get("limit", "100")), 500) + except (ValueError, TypeError): + limit = 100 + + where = ["1=1", window_clause] if window_clause else ["1=1"] + params = list(window_params) + if domain: + where.append("ce.domain = ?") + params.append(domain) + if kind != "all": + where.append("COALESCE(c.kind, 'person') = ?") + params.append(kind) + + where_sql = " AND ".join([w for w in where if w]) + + conn = _conn(request.app) + try: + # Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp + # LEFT JOIN contributors so handles in events but not in contributors still appear + # (defaults to kind='person' via COALESCE). + rows = conn.execute(f""" + SELECT + ce.handle, + COALESCE(c.kind, 'person') AS kind, + ROUND(SUM(ce.weight), 4) AS ci, + COUNT(*) AS events_count, + MIN(ce.timestamp) AS first_contribution, + MAX(ce.timestamp) AS last_contribution, + SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author, + SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger, + SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer, + SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator, + SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator, + COUNT(DISTINCT ce.domain) AS domain_count, + COUNT(DISTINCT ce.pr_number) AS pr_count + FROM contribution_events ce + LEFT JOIN contributors c ON c.handle = ce.handle + WHERE {where_sql} + GROUP BY ce.handle, COALESCE(c.kind, 'person') + ORDER BY ci DESC, last_contribution DESC + LIMIT ? + """, (*params, limit + 1)).fetchall() # +1 to detect overflow + + has_more = len(rows) > limit + rows = rows[:limit] + + # Total count of distinct handles matching filters (without limit) + total_row = conn.execute(f""" + SELECT COUNT(DISTINCT ce.handle) AS total + FROM contribution_events ce + LEFT JOIN contributors c ON c.handle = ce.handle + WHERE {where_sql} + """, params).fetchone() + total = total_row["total"] if total_row else 0 + + leaderboard = [] + for r in rows: + leaderboard.append({ + "handle": r["handle"], + "kind": r["kind"], + "ci": r["ci"], + "ci_breakdown": { + "author": round(r["ci_author"] or 0, 4), + "challenger": round(r["ci_challenger"] or 0, 4), + "synthesizer": round(r["ci_synthesizer"] or 0, 4), + "originator": round(r["ci_originator"] or 0, 4), + "evaluator": round(r["ci_evaluator"] or 0, 4), + }, + "events_count": r["events_count"], + "domain_count": r["domain_count"], + "pr_count": r["pr_count"], + "first_contribution": r["first_contribution"], + "last_contribution": r["last_contribution"], + }) + + return web.json_response({ + "window": window_label, + "domain": domain, + "kind_filter": kind, + "total": total, + "shown": len(leaderboard), + "has_more": has_more, + "source": "contribution_events", # explicit so consumers know the data origin + "leaderboard": leaderboard, + }) + finally: + conn.close() + + +def register_leaderboard_routes(app: web.Application): + """Register /api/leaderboard. Requires app['db_path'] to be set.""" + app.router.add_get("/api/leaderboard", handle_leaderboard)