"""Activity feed API — serves contribution events from pipeline.db."""
import re
import sqlite3
import math
import time
from datetime import datetime

from aiohttp import web

DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"

# In-process cache of (events, claim_activity) from _build_events.
# Safe without a lock only because aiohttp handlers run on a single
# event loop thread — TODO confirm no multi-worker deployment.
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60  # 1 minute — activity should feel fresh

# CI credit awarded per event type (hoisted out of the per-row loop
# in _build_events so the dict is built once, not per row).
_CI_MAP = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}


def _get_conn():
    """Open a connection to the pipeline DB with Row access and a 10s busy timeout."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA busy_timeout = 10000")
    return conn


def _classify_event(branch, description, commit_type):
    """Map a merged PR to an event type: 'create', 'enrich', or 'challenge'.

    Returns None for non-knowledge commits. Branch prefixes take priority
    over the description keyword check; anything unrecognized defaults to
    'create'.
    """
    if commit_type != "knowledge":
        return None
    if branch and branch.startswith("extract/"):
        return "create"
    if branch and branch.startswith("reweave/"):
        return "enrich"
    if branch and branch.startswith("challenge/"):
        return "challenge"
    if description and "challenged_by" in description.lower():
        return "challenge"
    if branch and branch.startswith("enrich/"):
        return "enrich"
    return "create"


def _normalize_contributor(submitted_by, agent):
    """Pick a display name: submitted_by (sans leading '@'), else a
    non-'pipeline' agent name, else the literal 'pipeline'."""
    if submitted_by and submitted_by.strip():
        return submitted_by.strip().lstrip("@")
    if agent and agent.strip() and agent != "pipeline":
        return agent.strip()
    return "pipeline"


def _summary_from_branch(branch):
    """Derive a human-readable summary from a branch name like
    'extract/2024-01-02-some-topic-ab12' -> 'Some topic'.

    Strips the prefix before the first '/', a leading date/number run,
    and a trailing 4-hex-char hash suffix.
    """
    if not branch:
        return ""
    parts = branch.split("/", 1)
    if len(parts) < 2:
        return ""
    slug = parts[1]
    slug = re.sub(r"^[\d-]+-", "", slug)      # strip date prefix
    slug = re.sub(r"-[a-f0-9]{4}$", "", slug)  # strip hash suffix
    return slug.replace("-", " ").strip().capitalize()


def _extract_claim_slugs(description, branch=None):
    """Extract claim slugs from a '|'-separated description.

    Each title is lowercased, stripped of non-alphanumeric characters
    (keeping spaces/hyphens), hyphenated, and truncated to 120 chars;
    slugs of 10 chars or fewer are dropped. When the description is
    empty, falls back to the part of the branch after the first '/'.
    """
    if not description:
        if branch:
            parts = branch.split("/", 1)
            if len(parts) > 1:
                return [parts[1][:120]]
        return []
    titles = [t.strip() for t in description.split("|") if t.strip()]
    slugs = []
    for title in titles:
        slug = title.lower().strip()
        slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
        slug = slug.replace(" ", "-").strip("-")
        if len(slug) > 10:
            slugs.append(slug[:120])
    return slugs


def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
    """Time-decayed hotness: weighted activity over age^1.5.

    Challenges weigh 3x, enriches 2x, signals 1x; the age denominator is
    floored at 0.5h so very fresh events don't divide by ~zero.
    """
    numerator = challenge_count * 3 + enrich_count * 2 + signal_count
    denominator = max(hours_since, 0.5) ** 1.5
    return numerator / denominator


def _build_events():
    """Read merged knowledge PRs and build the feed.

    Returns (events, claim_activity) where events is a list of event
    dicts (one per PR, attached to its first slug) and claim_activity
    maps slug -> per-type activity counts used for 'hot' sorting.
    """
    conn = _get_conn()
    try:
        rows = conn.execute("""
            SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
                   p.merged_at, p.description, p.commit_type, p.cost_usd
            FROM prs p
            WHERE p.status = 'merged'
              AND p.commit_type = 'knowledge'
              AND p.merged_at IS NOT NULL
            ORDER BY p.merged_at DESC
            LIMIT 2000
        """).fetchall()
        events = []
        claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}
        for row in rows:
            event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
            if not event_type:
                continue
            contributor = _normalize_contributor(row["submitted_by"], row["agent"])
            slugs = _extract_claim_slugs(row["description"], row["branch"])
            merged_at = row["merged_at"] or ""
            ci_earned = _CI_MAP.get(event_type, 0)
            # Every slug on the PR contributes to per-claim activity counts...
            for slug in slugs:
                if slug not in claim_activity:
                    claim_activity[slug] = {
                        "challenges": 0,
                        "enriches": 0,
                        "signals": 0,
                        "first_seen": merged_at,
                    }
                if event_type == "challenge":
                    claim_activity[slug]["challenges"] += 1
                elif event_type == "enrich":
                    claim_activity[slug]["enriches"] += 1
                else:
                    claim_activity[slug]["signals"] += 1
            summary_text = ""
            if row["description"]:
                first_title = row["description"].split("|")[0].strip()
                if len(first_title) > 120:
                    first_title = first_title[:117] + "..."
                summary_text = first_title
            elif row["branch"]:
                summary_text = _summary_from_branch(row["branch"])
            # ...but only one feed event is emitted per PR, keyed to the
            # first slug (or "" when the PR yielded no slugs).
            for slug in (slugs[:1] if slugs else [""]):
                events.append({
                    "type": event_type,
                    "claim_slug": slug,
                    "domain": row["domain"] or "unknown",
                    "contributor": contributor,
                    "timestamp": merged_at,
                    "ci_earned": round(ci_earned, 2),
                    "summary": summary_text,
                    "pr_number": row["number"],
                })
        return events, claim_activity
    finally:
        conn.close()


def _sort_events(events, claim_activity, sort_mode, now_ts):
    """Sort events in place by sort_mode and return the list.

    'recent' -> newest first by ISO timestamp; 'hot' -> _hot_score of the
    event's claim, with unparseable timestamps treated as 9999h old;
    'important' -> challenge < enrich < create, longer summaries first
    within a type. Unknown modes leave the order unchanged.
    """
    if sort_mode == "recent":
        events.sort(key=lambda e: e["timestamp"], reverse=True)
    elif sort_mode == "hot":
        def hot_key(e):
            slug = e["claim_slug"]
            ca = claim_activity.get(slug, {"challenges": 0, "enriches": 0, "signals": 0})
            try:
                # fromisoformat (pre-3.11) does not accept a trailing 'Z',
                # so normalize it to an explicit UTC offset first.
                evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00"))
                hours = (now_ts - evt_time.timestamp()) / 3600
            except (ValueError, AttributeError):
                hours = 9999
            return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
        events.sort(key=hot_key, reverse=True)
    elif sort_mode == "important":
        type_rank = {"challenge": 0, "enrich": 1, "create": 2}
        events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
    return events


async def handle_activity_feed(request):
    """GET /api/activity-feed

    Query params: sort (hot|recent|important, default recent), domain,
    contributor (exact-match filters), limit (0-100, default 20),
    offset (>=0, default 0). Returns a JSON page of events plus the
    total count after filtering, with CORS allowed for any origin.
    """
    sort_mode = request.query.get("sort", "recent")
    if sort_mode not in ("hot", "recent", "important"):
        sort_mode = "recent"
    domain = request.query.get("domain", "")
    contributor = request.query.get("contributor", "")
    try:
        # Clamp to [0, 100]: without the floor, a negative limit produced a
        # negative slice end below and returned nearly the whole list.
        limit = max(min(int(request.query.get("limit", "20")), 100), 0)
    except ValueError:
        limit = 20
    try:
        offset = max(int(request.query.get("offset", "0")), 0)
    except ValueError:
        offset = 0

    now = time.time()
    # NOTE(review): _build_events runs blocking sqlite I/O on the event
    # loop thread; consider run_in_executor if feed builds get slow.
    if _cache["data"] is None or (now - _cache["ts"]) > CACHE_TTL:
        _cache["data"] = _build_events()
        _cache["ts"] = now
    events, claim_activity = _cache["data"]

    filtered = events
    if domain:
        filtered = [e for e in filtered if e["domain"] == domain]
    if contributor:
        filtered = [e for e in filtered if e["contributor"] == contributor]

    # Sort a copy so the cached list keeps its original order.
    sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
    total = len(sorted_events)
    page = sorted_events[offset:offset + limit]
    return web.json_response({
        "events": page,
        "total": total,
        "sort": sort_mode,
        "offset": offset,
        "limit": limit,
    }, headers={"Access-Control-Allow-Origin": "*"})


def register(app):
    """Attach the activity-feed route to an aiohttp application."""
    app.router.add_get("/api/activity-feed", handle_activity_feed)