"""Contributor profile API — GET /api/contributors/{handle}"""

import sqlite3
import json
import os
import re
import subprocess
from datetime import datetime

DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
SYSTEM_ACCOUNTS = {"pipeline", "unknown", "teleo-agents", "teleo pipeline"}
CODEX_PATH = "/opt/teleo-eval/workspaces/main"

# Weight applied to each "<role>_count" column when computing the role-based CI.
CI_WEIGHTS = {
    "sourcer": 0.15,
    "extractor": 0.05,
    "challenger": 0.35,
    "synthesizer": 0.25,
    "reviewer": 0.20,
}

# Inclusive calendar date (YYYY-MM-DD) for the FOUNDING CONTRIBUTOR badge.
FOUNDING_CUTOFF = "2026-03-15"

BADGE_DEFS = {
    "FOUNDING CONTRIBUTOR": {"rarity": "limited", "desc": "Contributed during pre-launch phase"},
    "BELIEF MOVER": {"rarity": "rare", "desc": "Challenge that led to a claim revision"},
    "KNOWLEDGE SOURCER": {"rarity": "uncommon", "desc": "Source that generated 3+ claims"},
    "DOMAIN SPECIALIST": {"rarity": "rare", "desc": "Top 3 CI contributor in a domain"},
    "VETERAN": {"rarity": "uncommon", "desc": "10+ accepted contributions"},
    "FIRST BLOOD": {"rarity": "common", "desc": "First contribution of any kind"},
    "CONTRIBUTOR": {"rarity": "common", "desc": "Account created + first accepted contribution"},
}

# Rarest first; used to pick the single "hero" badge shown on a profile.
_RARITY_ORDER = ("limited", "rare", "uncommon", "common")


def _get_conn():
    """Open a new SQLite connection to DB_PATH with name-addressable rows."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def _compute_ci(row):
    """Return the role-weighted CI score for a contributor mapping.

    ``row`` must support ``.get`` (callers pass a plain dict). Missing or
    NULL ``<role>_count`` values count as 0. The result is rounded to 2 places.
    """
    total = sum(
        (row.get(f"{role}_count", 0) or 0) * weight
        for role, weight in CI_WEIGHTS.items()
    )
    return round(total, 2)


def _compute_badges(handle, row, domain_breakdown, conn):
    """Return the list of badge names earned by the contributor in ``row``.

    ``handle``, ``domain_breakdown`` and ``conn`` are accepted for interface
    compatibility but are not consulted by the current rules.
    """
    badges = []

    first = row.get("first_contribution", "")
    # Compare only the date portion: a full timestamp such as
    # "2026-03-15T09:00:00" sorts *after* the bare cutoff string and would
    # otherwise be rejected even though it falls on the cutoff day.
    if first and str(first)[:10] <= FOUNDING_CUTOFF:
        badges.append("FOUNDING CONTRIBUTOR")

    claims = row.get("claims_merged", 0) or 0
    if claims > 0:
        badges.append("CONTRIBUTOR")
        badges.append("FIRST BLOOD")
    if claims >= 10:
        badges.append("VETERAN")

    challenger = row.get("challenger_count", 0) or 0
    # Injected by get_contributor_profile from contribution_scores when present.
    challenge_ci = row.get("_challenge_count_from_scores", 0) or 0
    if challenger > 0 or challenge_ci > 0:
        badges.append("BELIEF MOVER")

    sourcer = row.get("sourcer_count", 0) or 0
    if sourcer >= 3:
        badges.append("KNOWLEDGE SOURCER")

    return badges


def _pick_hero_badge(badges):
    """Return the first badge of the rarest rarity present, or None."""
    for rarity in _RARITY_ORDER:
        for badge in badges:
            if BADGE_DEFS.get(badge, {}).get("rarity") == rarity:
                return badge
    return None


def _get_domain_breakdown(handle, conn):
    """Return {domain: merged PR count} for ``handle``, matched case-insensitively
    against the ``agent`` and ``submitted_by`` columns of ``prs``."""
    rows = conn.execute("""
        SELECT domain, COUNT(*) as cnt
        FROM prs
        WHERE status='merged'
          AND (LOWER(agent)=LOWER(?) OR LOWER(submitted_by)=LOWER(?))
          AND domain IS NOT NULL
        GROUP BY domain
        ORDER BY cnt DESC
    """, (handle, handle)).fetchall()
    return {r["domain"]: r["cnt"] for r in rows}


def _get_contribution_timeline(handle, conn, limit=20):
    """Return up to ``limit`` most recent merged PRs for ``handle`` as dicts.

    When a PR has no description, the summary falls back to the source file's
    base name with dashes turned into spaces and ".md" removed.
    """
    rows = conn.execute("""
        SELECT number, domain, status, created_at, description, commit_type, source_path
        FROM prs
        WHERE status='merged'
          AND (LOWER(agent)=LOWER(?) OR LOWER(submitted_by)=LOWER(?))
        ORDER BY created_at DESC
        LIMIT ?
    """, (handle, handle, limit)).fetchall()

    timeline = []
    for r in rows:
        desc = r["description"] or ""
        if not desc and r["source_path"]:
            desc = os.path.basename(r["source_path"]).replace("-", " ").replace(".md", "")
        timeline.append({
            "pr_number": r["number"],
            "domain": r["domain"],
            "date": r["created_at"][:10] if r["created_at"] else None,
            "type": _classify_commit(r["commit_type"]),
            "summary": desc[:200] if desc else None,
        })
    return timeline


def _classify_commit(commit_type):
    """Map a free-form commit type to "challenge", "enrich" or "create".

    Empty/None and unrecognised values are classified as "create".
    """
    if not commit_type:
        return "create"
    ct = commit_type.lower()
    if "challenge" in ct:
        return "challenge"
    if "enrich" in ct or "update" in ct or "reweave" in ct:
        return "enrich"
    return "create"


def _get_review_stats(handle, conn):
    """Return {outcome: count} from ``review_records`` for ``handle``."""
    rows = conn.execute("""
        SELECT outcome, COUNT(*) as cnt
        FROM review_records
        WHERE LOWER(agent) = LOWER(?)
        GROUP BY outcome
    """, (handle,)).fetchall()
    return {r["outcome"]: r["cnt"] for r in rows}


def _get_action_ci(handle, conn):
    """Get action-type CI from contribution_scores table.

    Checks both exact handle and common variants (with/without suffix).

    Returns ``{"total": float, "breakdown": {event_type: {"count", "ci"}}}``,
    or None when the query fails (e.g. the table does not exist) or no rows
    match.
    """
    h = handle.lower()
    base = re.sub(r"[-_]\w+\d+$", "", h)
    variants = [h, base] if base and base != h else [h]

    placeholders = ",".join("?" for _ in variants)
    try:
        rows = conn.execute(f"""
            SELECT event_type, SUM(ci_earned) as total, COUNT(*) as cnt
            FROM contribution_scores
            WHERE LOWER(contributor) IN ({placeholders})
            GROUP BY event_type
        """, variants).fetchall()
    except sqlite3.Error:
        # Best effort: treat a failed lookup as "no data".
        return None

    if not rows:
        return None

    breakdown = {}
    total = 0.0
    for r in rows:
        # SUM() yields NULL when every ci_earned in the group is NULL.
        earned = r["total"] or 0.0
        breakdown[r["event_type"]] = {
            "count": r["cnt"],
            "ci": round(earned, 4),
        }
        total += earned

    return {
        "total": round(total, 4),
        "breakdown": breakdown,
    }


def _run_git_log(extra_args):
    """Run ``git log --all <extra_args> --diff-filter=A -- domains/`` in CODEX_PATH.

    Returns stdout as text, or None if git cannot be run, times out (30s),
    produces undecodable output, or exits non-zero.
    """
    try:
        result = subprocess.run(
            ["git", "log", "--all", *extra_args, "--diff-filter=A", "--", "domains/"],
            capture_output=True, text=True, cwd=CODEX_PATH, timeout=30,
        )
    except (OSError, subprocess.SubprocessError, UnicodeError):
        return None
    if result.returncode != 0:
        return None
    return result.stdout


def _parse_git_claims(log_output, handle):
    """Parse ``sha|name|email|iso-date`` lines and keep those matching ``handle``.

    A line matches when ``handle`` is a case-insensitive substring of the
    author name or email. The sha is taken from the left and email/date from
    the right, so an author name that itself contains "|" is kept intact.
    """
    needle = handle.lower()
    claims = []
    for line in log_output.strip().split("\n"):
        if not line:
            continue
        sha, sep, rest = line.partition("|")
        fields = rest.rsplit("|", 2)
        if not sep or len(fields) < 3:
            continue
        name, email, date = fields
        if needle in name.lower() or needle in email.lower():
            claims.append({"sha": sha, "author": name, "email": email, "date": date[:10]})
    return claims


def _get_git_contributor(handle):
    """Fallback: check git log for contributors not in pipeline.db.

    Returns a contributor-shaped dict (all role counts 0, ``claims_merged``
    equal to the number of matching log entries), or None when nothing
    matches or git is unavailable.
    """
    if not handle:
        # An empty needle is a substring of everything and would match all commits.
        return None

    log_output = _run_git_log(["--format=%H|%an|%ae|%aI"])
    if log_output is None:
        return None

    claims = _parse_git_claims(log_output, handle)
    if not claims:
        return None

    return {
        "handle": handle,
        "display_name": claims[0]["author"],
        "email": claims[0]["email"],
        "first_contribution": min(c["date"] for c in claims),
        "last_contribution": max(c["date"] for c in claims),
        "claims_merged": len(claims),
        "sourcer_count": 0,
        "extractor_count": 0,
        "challenger_count": 0,
        "synthesizer_count": 0,
        "reviewer_count": 0,
    }


def get_contributor_profile(handle):
    """Build the full profile dict for ``handle``, or return None if unknown.

    The ``contributors`` table is consulted first (case-insensitive); if the
    handle is absent there, the git history under CODEX_PATH is used as a
    fallback. The connection opened here is always closed before returning.
    """
    conn = _get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM contributors WHERE LOWER(handle) = LOWER(?)",
            (handle,),
        ).fetchone()

        if row:
            data = dict(row)
        else:
            data = _get_git_contributor(handle)
            if not data:
                return None

        ci_score = _compute_ci(data)
        action_ci = _get_action_ci(handle, conn)
        domain_breakdown = _get_domain_breakdown(handle, conn)
        timeline = _get_contribution_timeline(handle, conn)
        review_stats = _get_review_stats(handle, conn)

        # Let badge rules see challenges recorded only in contribution_scores.
        if action_ci and "challenge" in action_ci.get("breakdown", {}):
            data["_challenge_count_from_scores"] = action_ci["breakdown"]["challenge"]["count"]

        badges = _compute_badges(handle, data, domain_breakdown, conn)

        # For git-only contributors, build domain breakdown from git
        if not domain_breakdown and not row:
            domain_breakdown = _git_domain_breakdown(handle)

        hero_badge = _pick_hero_badge(badges)

        role_breakdown = {
            "sourcer": data.get("sourcer_count", 0) or 0,
            "extractor": data.get("extractor_count", 0) or 0,
            "challenger": data.get("challenger_count", 0) or 0,
            "synthesizer": data.get("synthesizer_count", 0) or 0,
            "reviewer": data.get("reviewer_count", 0) or 0,
        }
        total_roles = sum(role_breakdown.values())
        role_pct = {
            role: round(count / total_roles * 100) if total_roles > 0 else 0
            for role, count in role_breakdown.items()
        }

        return {
            "handle": data.get("handle", handle),
            "display_name": data.get("display_name"),
            "ci_score": ci_score,
            "action_ci": action_ci,
            "primary_ci": action_ci["total"] if action_ci else ci_score,
            "hero_badge": hero_badge,
            "badges": [{"name": b, **BADGE_DEFS.get(b, {})} for b in badges],
            "joined": data.get("first_contribution"),
            "last_active": data.get("last_contribution"),
            "claims_merged": data.get("claims_merged", 0) or 0,
            "principal": data.get("principal"),
            "role_breakdown": role_breakdown,
            "role_percentages": role_pct,
            "domain_breakdown": domain_breakdown,
            "review_stats": review_stats,
            "contribution_timeline": timeline,
            "active_domains": list(domain_breakdown.keys()),
        }
    finally:
        conn.close()


def _count_git_domains(log_output, handle):
    """Count added files per domain directory for commits authored by ``handle``.

    ``log_output`` is ``git log --name-only`` text whose commit headers look
    like ``COMMIT|<name>|<email>``. A commit matches when ``handle`` is a
    case-insensitive substring of the header's name/email part.
    """
    needle = handle.lower()
    domains = {}
    current_match = False
    for line in log_output.strip().split("\n"):
        if line.startswith("COMMIT|"):
            current_match = needle in line.split("|", 1)[1].lower()
        elif current_match and line.startswith("domains/"):
            parts = line.split("/")
            # Require domains/<domain>/<file>; a file sitting directly in
            # domains/ has no domain directory to attribute it to.
            if len(parts) >= 3 and parts[1]:
                domains[parts[1]] = domains.get(parts[1], 0) + 1
    return domains


def _git_domain_breakdown(handle):
    """For git-only contributors, count claims by domain from file paths.

    Matches on author name or email, the same criteria as
    _get_git_contributor. Returns {} when git is unavailable or nothing matches.
    """
    if not handle:
        return {}
    log_output = _run_git_log(["--name-only", "--format=COMMIT|%an|%ae"])
    if log_output is None:
        return {}
    return _count_git_domains(log_output, handle)


async def handle_contributor_profile(request):
    """GET /api/contributors/{handle} — JSON profile, or 404 if not found."""
    from aiohttp import web

    handle = request.match_info["handle"]
    profile = get_contributor_profile(handle)
    if profile is None:
        return web.json_response({"error": f"Contributor '{handle}' not found"}, status=404)
    return web.json_response(profile)


async def handle_contributors_list(request):
    """GET /api/contributors/list — contributors with at least ``min_claims``
    merged claims (query parameter, default 1), excluding SYSTEM_ACCOUNTS.

    Responds 400 when ``min_claims`` is not an integer.
    """
    from aiohttp import web

    # Validate input before touching the database.
    try:
        min_claims = int(request.query.get("min_claims", "1"))
    except ValueError:
        return web.json_response({"error": "min_claims must be an integer"}, status=400)

    conn = _get_conn()
    try:
        rows = conn.execute("""
            SELECT handle, display_name, first_contribution, last_contribution,
                   sourcer_count, extractor_count, challenger_count,
                   synthesizer_count, reviewer_count, claims_merged, principal
            FROM contributors
            WHERE claims_merged >= ?
            ORDER BY claims_merged DESC
        """, (min_claims,)).fetchall()

        contributors = []
        for r in rows:
            data = dict(r)
            if data["handle"].lower() in SYSTEM_ACCOUNTS:
                continue
            ci = _compute_ci(data)
            action_ci = _get_action_ci(data["handle"], conn)
            action_total = action_ci["total"] if action_ci else 0.0
            contributors.append({
                "handle": data["handle"],
                "display_name": data["display_name"],
                "ci_score": ci,
                "action_ci": action_total,
                "primary_ci": action_total if action_total > 0 else ci,
                "claims_merged": data["claims_merged"],
                "first_contribution": data["first_contribution"],
                "last_contribution": data["last_contribution"],
                "principal": data["principal"],
            })

        return web.json_response({
            "contributors": contributors,
            "total": len(contributors),
        })
    finally:
        conn.close()


def register_contributor_routes(app):
    """Register the contributor endpoints on an aiohttp application.

    The fixed "/list" path is added before the "{handle}" pattern.
    """
    app.router.add_get("/api/contributors/list", handle_contributors_list)
    app.router.add_get("/api/contributors/{handle}", handle_contributor_profile)