def _has_column(conn, table: str, column: str) -> bool:
    """Return True if ``table`` has a column named ``column``.

    Supports graceful schema migration: callers probe for a column before
    selecting it.

    Args:
        conn: Open SQLite connection whose rows support lookup by column
            name (rows are read as ``row["name"]``).
        table: Name of the table to inspect.
        column: Column name to look for.

    Returns:
        True if the column exists, False otherwise (including when the table
        itself does not exist, since ``PRAGMA table_info`` then yields no rows).
    """
    # PRAGMA arguments cannot be bound with "?" placeholders, so the table name
    # has to be interpolated. Quote it as an identifier so an unusual name can
    # neither break the statement nor inject SQL.
    quoted_table = '"' + table.replace('"', '""') + '"'
    cols = conn.execute(f"PRAGMA table_info({quoted_table})").fetchall()
    return any(c["name"] == column for c in cols)


def _rollup_by_principal(rows, weights: dict[str, float]) -> list[dict]:
    """Aggregate contributor rows into one leaderboard entry per principal.

    Rows with a non-empty ``principal`` are merged under that principal; rows
    without one are keyed by their own ``handle``.

    Args:
        rows: Contributor rows exposing ``handle``, ``tier``, ``claims_merged``,
            ``domains``, ``last_contribution``, ``principal`` and one
            ``<role>_count`` column per key of ``weights``.
        weights: CI weight per role name.

    Returns:
        Unsorted list of entry dicts with keys ``handle``, ``tier``,
        ``claims_merged``, ``ci``, ``domains`` (sorted, at most 5),
        ``last_contribution`` and ``agents``.
    """
    # Higher rank wins when merging (veteran > contributor > new). Missing or
    # unrecognised tiers rank below "new" so that any known tier replaces them.
    tier_rank = {"veteran": 2, "contributor": 1, "new": 0}

    buckets: dict[str, dict] = {}
    for r in rows:
        principal = r["principal"]
        key = principal if principal else r["handle"]
        b = buckets.get(key)
        if b is None:
            b = buckets[key] = {
                "handle": key,
                "tier": r["tier"],
                "claims_merged": 0,
                "domains": set(),
                "last_contribution": None,
                "agents": [],
                "counts": dict.fromkeys(weights, 0),
            }

        b["claims_merged"] += r["claims_merged"] or 0
        for role in weights:
            b["counts"][role] += r[f"{role}_count"] or 0
        if r["domains"]:
            b["domains"].update(json.loads(r["domains"]))

        # Keep the most recent contribution timestamp seen for this bucket.
        latest = r["last_contribution"]
        if latest and (not b["last_contribution"] or latest > b["last_contribution"]):
            b["last_contribution"] = latest

        if tier_rank.get(r["tier"], -1) > tier_rank.get(b["tier"], -1):
            b["tier"] = r["tier"]

        # Only rows that actually rolled up to a principal are listed as agents.
        if principal:
            b["agents"].append(r["handle"])

    return [
        {
            "handle": b["handle"],
            "tier": b["tier"],
            "claims_merged": b["claims_merged"],
            "ci": round(sum(b["counts"][role] * w for role, w in weights.items()), 2),
            "domains": sorted(b["domains"])[:5],
            "last_contribution": b["last_contribution"],
            "agents": b["agents"],
        }
        for b in buckets.values()
    ]


def _contributor_leaderboard(conn, limit: int = 20, view: str = "principal") -> list[dict]:
    """Top contributors by CI score.

    view="agent"     — one row per contributor handle (original behavior)
    view="principal" — rolls up agent contributions to their principal (human)

    If the ``contributors`` table has no ``principal`` column, or ``view`` is
    anything other than "principal", the per-agent view is returned.

    Args:
        conn: Open SQLite connection whose rows support lookup by column name.
        limit: Maximum number of entries to return. Values below zero are
            treated as zero instead of slicing entries off the end.
        view: "principal" or "agent".

    Returns:
        Entries sorted by ``ci`` descending, truncated to ``limit``. All rows
        are scored before truncation, so the cut is by CI, not by
        ``claims_merged``.
    """
    has_principal = _has_column(conn, "contributors", "principal")

    columns = (
        "handle, tier, claims_merged, sourcer_count, extractor_count, "
        "challenger_count, synthesizer_count, reviewer_count, domains, last_contribution"
    )
    if has_principal:
        columns += ", principal"
    rows = conn.execute(
        f"SELECT {columns} FROM contributors ORDER BY claims_merged DESC"
    ).fetchall()

    # Weights reward quality over volume (Cory-approved)
    weights = {"sourcer": 0.15, "extractor": 0.05, "challenger": 0.35, "synthesizer": 0.25, "reviewer": 0.20}

    if view == "principal" and has_principal:
        result = _rollup_by_principal(rows, weights)
    else:
        # By-agent view (original behavior)
        result = []
        for r in rows:
            ci = sum((r[f"{role}_count"] or 0) * w for role, w in weights.items())
            entry = {
                "handle": r["handle"],
                "tier": r["tier"],
                "claims_merged": r["claims_merged"] or 0,
                "ci": round(ci, 2),
                "domains": json.loads(r["domains"]) if r["domains"] else [],
                "last_contribution": r["last_contribution"],
            }
            if has_principal:
                entry["principal"] = r["principal"]
            result.append(entry)

    # Stable sort: ties keep the claims_merged ordering from the query.
    result.sort(key=lambda x: x["ci"], reverse=True)
    return result[:max(limit, 0)]


# NOTE(review): the same change set also updates handle_dashboard (only a
# fragment of it is visible here) to call
# _contributor_leaderboard(conn, limit=10, view="principal") and
# _contributor_leaderboard(conn, limit=10, view="agent"), and to pass both
# lists to _render_dashboard in place of the single `contributors` argument.


async def handle_api_contributors(request):
    """GET /api/contributors — contributor leaderboard.

    Query params:
        limit: max entries (default 50). Non-numeric values fall back to the
            default; negative values are clamped to 0.
        view: "principal" (default, rolls up agents) or "agent" (one row per
            handle). Unrecognised values fall back to "principal".
    """
    conn = _conn(request)

    # Query parameters are untrusted input: a non-numeric limit must not
    # surface as an unhandled ValueError (HTTP 500). Fall back to the default,
    # mirroring how an invalid `view` is handled below.
    try:
        limit = int(request.query.get("limit", "50"))
    except ValueError:
        limit = 50
    limit = max(limit, 0)

    view = request.query.get("view", "principal")
    if view not in ("principal", "agent"):
        view = "principal"

    contributors = _contributor_leaderboard(conn, limit, view=view)
    return web.json_response({"contributors": contributors, "view": view})
{message}
Check if teleo-pipeline.service is running and pipeline.db exists.