"""Activity feed API — serves contribution events from pipeline.db.

Reads merged knowledge PRs out of the pipeline SQLite database, classifies
each as a create/enrich/challenge event, and serves them as a paginated,
filterable JSON feed with three sort modes (recent / hot / important).
Results are cached process-wide for CACHE_TTL seconds.
"""
import asyncio
import math
import re
import sqlite3
import time
from datetime import datetime, timezone

from aiohttp import web

DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
# Single-entry cache holding the (events, claim_activity) pair from _build_events.
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60  # 1 minute — activity should feel fresh

# CI (contribution index) earned per event type; unknown types earn 0.
_CI_MAP = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}


def _get_conn():
    """Open a sqlite connection with Row factory and a generous busy timeout."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    # The pipeline writer may hold the DB lock; wait up to 10 s instead of failing.
    conn.execute("PRAGMA busy_timeout = 10000")
    return conn


def _classify_event(branch, description, commit_type):
    """Map a PR row to 'create', 'enrich', or 'challenge'; None for non-knowledge.

    Branch prefixes take precedence over the description heuristic.  Note that
    'challenged_by' in the description is deliberately checked BEFORE the
    'enrich/' prefix, so a challenged enrich PR is counted as a challenge.
    """
    if commit_type != "knowledge":
        return None
    if branch and branch.startswith("extract/"):
        return "create"
    if branch and branch.startswith("reweave/"):
        return "enrich"
    if branch and branch.startswith("challenge/"):
        return "challenge"
    if description and "challenged_by" in description.lower():
        return "challenge"
    if branch and branch.startswith("enrich/"):
        return "enrich"
    return "create"


def _normalize_contributor(submitted_by, agent):
    """Return a display name: submitted_by (sans '@'), then agent, then 'pipeline'."""
    if submitted_by and submitted_by.strip():
        return submitted_by.strip().lstrip("@")
    if agent and agent.strip() and agent != "pipeline":
        return agent.strip()
    return "pipeline"


def _summary_from_branch(branch):
    """Turn 'prefix/2024-01-02-some-claim-ab12' into 'Some claim'.

    Strips the leading date/number prefix and a trailing 4-hex-char suffix,
    then de-hyphenates and capitalizes.  Returns '' for unparseable branches.
    """
    if not branch:
        return ""
    parts = branch.split("/", 1)
    if len(parts) < 2:
        return ""
    slug = parts[1]
    slug = re.sub(r"^[\d-]+-", "", slug)  # strip date prefix
    slug = re.sub(r"-[a-f0-9]{4}$", "", slug)  # strip hash suffix
    return slug.replace("-", " ").strip().capitalize()


def _extract_claim_slugs(description, branch=None):
    """Derive claim slugs from a pipe-separated description.

    Falls back to the branch suffix (capped at 120 chars) when the description
    is empty.  Slugs of 10 characters or fewer are dropped as too ambiguous.
    """
    if not description:
        if branch:
            parts = branch.split("/", 1)
            if len(parts) > 1:
                return [parts[1][:120]]
        return []
    titles = [t.strip() for t in description.split("|") if t.strip()]
    slugs = []
    for title in titles:
        slug = title.lower().strip()
        # Keep only alphanumerics, spaces, and hyphens, then hyphenate.
        slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
        slug = slug.replace(" ", "-").strip("-")
        if len(slug) > 10:
            slugs.append(slug[:120])
    return slugs


def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
    """Time-decayed activity score: weighted counts over age**1.5.

    Age is floored at half an hour so brand-new items don't divide by ~zero.
    Challenges weigh 3x, enriches 2x, plain signals 1x.
    """
    numerator = challenge_count * 3 + enrich_count * 2 + signal_count
    denominator = max(hours_since, 0.5) ** 1.5
    return numerator / denominator


def _build_events():
    """Query merged knowledge PRs and build the feed.

    Returns (events, claim_activity): a list of event dicts (newest first; only
    the FIRST claim slug of each PR produces an event) and a per-slug tally of
    challenges/enriches/signals used by the 'hot' sort.  Blocking — callers on
    the event loop should run this in an executor.
    """
    conn = _get_conn()
    try:
        rows = conn.execute("""
            SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
                   p.merged_at, p.description, p.commit_type, p.cost_usd
            FROM prs p
            WHERE p.status = 'merged'
              AND p.commit_type = 'knowledge'
              AND p.merged_at IS NOT NULL
            ORDER BY p.merged_at DESC
            LIMIT 2000
        """).fetchall()

        events = []
        claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}

        for row in rows:
            event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
            if not event_type:
                continue

            contributor = _normalize_contributor(row["submitted_by"], row["agent"])
            slugs = _extract_claim_slugs(row["description"], row["branch"])
            merged_at = row["merged_at"] or ""

            ci_earned = _CI_MAP.get(event_type, 0)

            # Every slug on the PR contributes to the per-claim activity tally.
            for slug in slugs:
                if slug not in claim_activity:
                    claim_activity[slug] = {
                        "challenges": 0, "enriches": 0, "signals": 0,
                        "first_seen": merged_at,
                    }
                if event_type == "challenge":
                    claim_activity[slug]["challenges"] += 1
                elif event_type == "enrich":
                    claim_activity[slug]["enriches"] += 1
                else:
                    claim_activity[slug]["signals"] += 1

            # Prefer the first description title for the summary; fall back
            # to a humanized branch name.
            summary_text = ""
            if row["description"]:
                first_title = row["description"].split("|")[0].strip()
                if len(first_title) > 120:
                    first_title = first_title[:117] + "..."
                summary_text = first_title
            elif row["branch"]:
                summary_text = _summary_from_branch(row["branch"])

            # One event per PR, keyed to its first slug ('' when none parsed).
            for slug in (slugs[:1] if slugs else [""]):
                events.append({
                    "type": event_type,
                    "claim_slug": slug,
                    "domain": row["domain"] or "unknown",
                    "contributor": contributor,
                    "timestamp": merged_at,
                    "ci_earned": round(ci_earned, 2),
                    "summary": summary_text,
                    "pr_number": row["number"],
                })

        return events, claim_activity
    finally:
        conn.close()


def _sort_events(events, claim_activity, sort_mode, now_ts):
    """Sort *events* in place per *sort_mode* and return the list.

    - recent:    newest first (ISO-8601 strings sort lexicographically)
    - hot:       per-claim weighted activity with time decay (_hot_score)
    - important: challenges before enriches before creates; longer summaries first
    Any other mode leaves the order untouched.
    """
    if sort_mode == "recent":
        events.sort(key=lambda e: e["timestamp"], reverse=True)
    elif sort_mode == "hot":
        def hot_key(e):
            ca = claim_activity.get(
                e["claim_slug"], {"challenges": 0, "enriches": 0, "signals": 0}
            )
            try:
                evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00"))
                # NOTE(review): naive timestamps are assumed UTC here.  The
                # previous code let .timestamp() interpret them in the server's
                # local zone, skewing the decay — confirm merged_at is UTC.
                if evt_time.tzinfo is None:
                    evt_time = evt_time.replace(tzinfo=timezone.utc)
                hours = (now_ts - evt_time.timestamp()) / 3600
            except (ValueError, AttributeError):
                hours = 9999  # unparseable timestamp -> effectively cold
            return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
        events.sort(key=hot_key, reverse=True)
    elif sort_mode == "important":
        type_rank = {"challenge": 0, "enrich": 1, "create": 2}
        events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
    return events


async def handle_activity_feed(request):
    """GET /api/activity-feed — paginated, filterable contribution events.

    Query params: sort (hot|recent|important, default recent), domain,
    contributor, limit (clamped to 1..100, default 20), offset (>= 0).
    Responds with JSON: {events, total, sort, offset, limit}.
    """
    sort_mode = request.query.get("sort", "recent")
    if sort_mode not in ("hot", "recent", "important"):
        sort_mode = "recent"
    domain = request.query.get("domain", "")
    contributor = request.query.get("contributor", "")
    try:
        # Clamp to [1, 100]; a negative limit previously produced broken slices.
        limit = min(max(int(request.query.get("limit", "20")), 1), 100)
    except ValueError:
        limit = 20
    try:
        offset = max(int(request.query.get("offset", "0")), 0)
    except ValueError:
        offset = 0

    now = time.time()
    if _cache["data"] is None or (now - _cache["ts"]) > CACHE_TTL:
        # The sqlite query blocks; run it off the event loop so a cold cache
        # doesn't stall every other request.  Concurrent cold-cache requests
        # may rebuild twice — harmless, last writer wins.
        _cache["data"] = await asyncio.get_running_loop().run_in_executor(
            None, _build_events
        )
        _cache["ts"] = now

    events, claim_activity = _cache["data"]

    filtered = events
    if domain:
        filtered = [e for e in filtered if e["domain"] == domain]
    if contributor:
        filtered = [e for e in filtered if e["contributor"] == contributor]

    # Sort a copy so the cached list's order is never mutated.
    sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
    total = len(sorted_events)
    page = sorted_events[offset:offset + limit]

    return web.json_response({
        "events": page,
        "total": total,
        "sort": sort_mode,
        "offset": offset,
        "limit": limit,
    }, headers={"Access-Control-Allow-Origin": "*"})


def register(app):
    """Attach the activity-feed route to an aiohttp application."""
    app.router.add_get("/api/activity-feed", handle_activity_feed)