feat(diagnostics): /api/leaderboard reads contribution_events directly
Some checks are pending
CI / lint-and-test (push) Waiting to run
Some checks are pending
CI / lint-and-test (push) Waiting to run
New endpoint replaces the legacy /api/contributors *_count read path with event-sourced reads from the Phase A contribution_events ledger. - Params: window (all_time | Nd | Nh), kind (person | agent | org | all), domain (filter), limit (default 100, max 500) - Returns per-handle CI, full role breakdown (author/challenger/synthesizer/ originator/evaluator), events_count, pr_count, first/last contribution - ORDER BY ci DESC, last_contribution DESC — recent contributors break ties - Read-only sqlite URI; total/has_more computed for paginated UIs Wiring (import + register + _PUBLIC_PATHS entry) currently applied to live app.py on VPS only — repo app.py has drift from Ship's uncommitted /api/search POST contract. Next deploy.sh round-trip needs both to land together. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
369f6c96da
commit
de7e5ec709
1 changed files with 165 additions and 0 deletions
165
diagnostics/leaderboard_routes.py
Normal file
165
diagnostics/leaderboard_routes.py
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
"""Leaderboard endpoint reading from event-sourced contribution_events.
|
||||
|
||||
Owner: Argus
|
||||
Source of truth: pipeline.db contribution_events (Epimetheus, schema v25)
|
||||
|
||||
Reads contribution_events GROUP BY handle, computes CI as SUM(weight),
|
||||
joins contributors for kind, returns sorted leaderboard with role breakdown.
|
||||
|
||||
Roles + weights (Phase A):
|
||||
author 0.30 | challenger 0.25 | synthesizer 0.20 | originator 0.15 | evaluator 0.05
|
||||
|
||||
Endpoints:
|
||||
GET /api/leaderboard?window=all_time|Nd|Nh&domain=&kind=person|agent|org|all&limit=100
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
logger = logging.getLogger("argus.leaderboard_routes")
|
||||
|
||||
ROLE_KEYS = ("author", "challenger", "synthesizer", "originator", "evaluator")
|
||||
KIND_VALUES = ("person", "agent", "org", "all")
|
||||
|
||||
# Public path set so auth middleware lets it through
|
||||
LEADERBOARD_PUBLIC_PATHS = frozenset({"/api/leaderboard"})
|
||||
|
||||
|
||||
def _conn(app):
|
||||
"""Read-only connection to pipeline.db."""
|
||||
db_path = app["db_path"]
|
||||
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def _parse_window(raw):
|
||||
"""Parse window param. Returns (sql_clause, params_tuple, label).
|
||||
|
||||
Accepts: 'all_time' (default), 'Nd' (last N days), 'Nh' (last N hours).
|
||||
Caps N at 365d / 8760h to prevent abuse.
|
||||
"""
|
||||
if not raw or raw == "all_time":
|
||||
return ("", (), "all_time")
|
||||
m = re.fullmatch(r"(\d+)([dh])", raw.strip().lower())
|
||||
if not m:
|
||||
return ("", (), "all_time")
|
||||
n = int(m.group(1))
|
||||
unit = m.group(2)
|
||||
if unit == "d":
|
||||
n = min(n, 365)
|
||||
return ("AND ce.timestamp >= datetime('now', ?)", (f"-{n} days",), f"{n}d")
|
||||
n = min(n, 8760)
|
||||
return ("AND ce.timestamp >= datetime('now', ?)", (f"-{n} hours",), f"{n}h")
|
||||
|
||||
|
||||
async def handle_leaderboard(request):
|
||||
"""GET /api/leaderboard.
|
||||
|
||||
Query params:
|
||||
window: 'all_time' (default) | 'Nd' (e.g. '7d') | 'Nh' (e.g. '24h')
|
||||
domain: filter by domain (optional)
|
||||
kind: 'person' (default) | 'agent' | 'org' | 'all'
|
||||
limit: max entries (default 100, max 500)
|
||||
"""
|
||||
window_clause, window_params, window_label = _parse_window(request.query.get("window"))
|
||||
domain = request.query.get("domain")
|
||||
kind = request.query.get("kind", "person")
|
||||
if kind not in KIND_VALUES:
|
||||
kind = "person"
|
||||
try:
|
||||
limit = min(int(request.query.get("limit", "100")), 500)
|
||||
except (ValueError, TypeError):
|
||||
limit = 100
|
||||
|
||||
where = ["1=1", window_clause] if window_clause else ["1=1"]
|
||||
params = list(window_params)
|
||||
if domain:
|
||||
where.append("ce.domain = ?")
|
||||
params.append(domain)
|
||||
if kind != "all":
|
||||
where.append("COALESCE(c.kind, 'person') = ?")
|
||||
params.append(kind)
|
||||
|
||||
where_sql = " AND ".join([w for w in where if w])
|
||||
|
||||
conn = _conn(request.app)
|
||||
try:
|
||||
# Aggregate per handle: total CI, per-role breakdown, event count, first/last timestamp
|
||||
# LEFT JOIN contributors so handles in events but not in contributors still appear
|
||||
# (defaults to kind='person' via COALESCE).
|
||||
rows = conn.execute(f"""
|
||||
SELECT
|
||||
ce.handle,
|
||||
COALESCE(c.kind, 'person') AS kind,
|
||||
ROUND(SUM(ce.weight), 4) AS ci,
|
||||
COUNT(*) AS events_count,
|
||||
MIN(ce.timestamp) AS first_contribution,
|
||||
MAX(ce.timestamp) AS last_contribution,
|
||||
SUM(CASE WHEN ce.role='author' THEN ce.weight ELSE 0 END) AS ci_author,
|
||||
SUM(CASE WHEN ce.role='challenger' THEN ce.weight ELSE 0 END) AS ci_challenger,
|
||||
SUM(CASE WHEN ce.role='synthesizer' THEN ce.weight ELSE 0 END) AS ci_synthesizer,
|
||||
SUM(CASE WHEN ce.role='originator' THEN ce.weight ELSE 0 END) AS ci_originator,
|
||||
SUM(CASE WHEN ce.role='evaluator' THEN ce.weight ELSE 0 END) AS ci_evaluator,
|
||||
COUNT(DISTINCT ce.domain) AS domain_count,
|
||||
COUNT(DISTINCT ce.pr_number) AS pr_count
|
||||
FROM contribution_events ce
|
||||
LEFT JOIN contributors c ON c.handle = ce.handle
|
||||
WHERE {where_sql}
|
||||
GROUP BY ce.handle, COALESCE(c.kind, 'person')
|
||||
ORDER BY ci DESC, last_contribution DESC
|
||||
LIMIT ?
|
||||
""", (*params, limit + 1)).fetchall() # +1 to detect overflow
|
||||
|
||||
has_more = len(rows) > limit
|
||||
rows = rows[:limit]
|
||||
|
||||
# Total count of distinct handles matching filters (without limit)
|
||||
total_row = conn.execute(f"""
|
||||
SELECT COUNT(DISTINCT ce.handle) AS total
|
||||
FROM contribution_events ce
|
||||
LEFT JOIN contributors c ON c.handle = ce.handle
|
||||
WHERE {where_sql}
|
||||
""", params).fetchone()
|
||||
total = total_row["total"] if total_row else 0
|
||||
|
||||
leaderboard = []
|
||||
for r in rows:
|
||||
leaderboard.append({
|
||||
"handle": r["handle"],
|
||||
"kind": r["kind"],
|
||||
"ci": r["ci"],
|
||||
"ci_breakdown": {
|
||||
"author": round(r["ci_author"] or 0, 4),
|
||||
"challenger": round(r["ci_challenger"] or 0, 4),
|
||||
"synthesizer": round(r["ci_synthesizer"] or 0, 4),
|
||||
"originator": round(r["ci_originator"] or 0, 4),
|
||||
"evaluator": round(r["ci_evaluator"] or 0, 4),
|
||||
},
|
||||
"events_count": r["events_count"],
|
||||
"domain_count": r["domain_count"],
|
||||
"pr_count": r["pr_count"],
|
||||
"first_contribution": r["first_contribution"],
|
||||
"last_contribution": r["last_contribution"],
|
||||
})
|
||||
|
||||
return web.json_response({
|
||||
"window": window_label,
|
||||
"domain": domain,
|
||||
"kind_filter": kind,
|
||||
"total": total,
|
||||
"shown": len(leaderboard),
|
||||
"has_more": has_more,
|
||||
"source": "contribution_events", # explicit so consumers know the data origin
|
||||
"leaderboard": leaderboard,
|
||||
})
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def register_leaderboard_routes(app: web.Application):
|
||||
"""Register /api/leaderboard. Requires app['db_path'] to be set."""
|
||||
app.router.add_get("/api/leaderboard", handle_leaderboard)
|
||||
Loading…
Reference in a new issue