"""Activity feed API — serves contribution events from pipeline.db.""" import re import sqlite3 import math import time from aiohttp import web DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db" _cache = {"data": None, "ts": 0} CACHE_TTL = 60 # 1 minute — activity should feel fresh # commit_types we surface in the activity feed. `pipeline` is system # maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden. _FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave") # Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're # inbox archive filenames, not claim slugs. Used as a fallback signal when # branch/description heuristics miss (e.g. populated descriptions that # happen to be source titles, not claim insights). _SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$") def _get_conn(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA busy_timeout = 10000") return conn def _is_source_slug(slug): return bool(slug and _SOURCE_SLUG_PATTERN.match(slug)) def _classify_event(branch, description, commit_type, candidate_slug=None): """Return one of: create | enrich | challenge | source | None. Source-archive PRs are extract/* branches that filed a source into inbox/archive/ but didn't produce a claim. Two signals classify them as 'source' (defense in depth): 1. extract/* branch with empty description (no claim title produced) 2. candidate_slug matches YYYY-MM-DD-...-HASH4 (inbox filename pattern) """ commit_type_l = (commit_type or "").lower() branch = branch or "" description_lower = (description or "").lower() has_desc = bool(description and description.strip()) if commit_type_l not in _FEED_COMMIT_TYPES: return None # Explicit challenge signals win first. if (commit_type_l == "challenge" or branch.startswith("challenge/") or "challenged_by" in description_lower): return "challenge" # Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich. if (commit_type_l == "enrich" or branch.startswith("enrich/") or branch.startswith("reweave/")): return "enrich" # Source-only: extract/* with no claim description means inbox archive # landed but no domain claim was written. if branch.startswith("extract/") and not has_desc: return "source" # Belt-and-suspenders: if the slug we'd surface to the frontend looks # like an inbox archive filename (date-prefix-hash), treat as source # regardless of branch/commit_type/description state. Catches cases # where description leaked but is just a source title, not a claim. if _is_source_slug(candidate_slug): return "source" # Everything else with a description is a new claim. return "create" def _normalize_contributor(submitted_by, agent): if submitted_by and submitted_by.strip(): name = submitted_by.strip().lstrip("@") return name if agent and agent.strip() and agent != "pipeline": return agent.strip() return "pipeline" def _summary_from_branch(branch): if not branch: return "" parts = branch.split("/", 1) if len(parts) < 2: return "" slug = parts[1] slug = re.sub(r"^[\d-]+-", "", slug) # strip date prefix slug = re.sub(r"-[a-f0-9]{4}$", "", slug) # strip hash suffix return slug.replace("-", " ").strip().capitalize() def _extract_claim_slugs(description, branch=None): if not description: if branch: parts = branch.split("/", 1) if len(parts) > 1: return [parts[1][:120]] return [] titles = [t.strip() for t in description.split("|") if t.strip()] slugs = [] for title in titles: slug = title.lower().strip() slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug) slug = slug.replace(" ", "-").strip("-") if len(slug) > 10: slugs.append(slug[:120]) return slugs def _hot_score(challenge_count, enrich_count, signal_count, hours_since): numerator = challenge_count * 3 + enrich_count * 2 + signal_count denominator = max(hours_since, 0.5) ** 1.5 return numerator / denominator def _build_events(): conn = _get_conn() try: placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES)) rows = conn.execute(f""" SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by, p.merged_at, p.description, p.commit_type, p.cost_usd, p.source_channel, p.source_path FROM prs p WHERE p.status = 'merged' AND p.commit_type IN ({placeholders}) AND p.merged_at IS NOT NULL ORDER BY p.merged_at DESC LIMIT 2000 """, _FEED_COMMIT_TYPES).fetchall() events = [] claim_activity = {} # slug -> {challenges, enriches, signals, first_seen} for row in rows: slugs = _extract_claim_slugs(row["description"], row["branch"]) candidate_slug = slugs[0] if slugs else "" event_type = _classify_event( row["branch"], row["description"], row["commit_type"], candidate_slug=candidate_slug, ) if not event_type: continue contributor = _normalize_contributor(row["submitted_by"], row["agent"]) merged_at = row["merged_at"] or "" ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15} ci_earned = ci_map.get(event_type, 0) # Source events never carry a claim_slug — no claim was written — # so the frontend can't produce a 404-ing claim link. if event_type == "source": summary_text = _summary_from_branch(row["branch"]) source_slug = ( _summary_from_branch(row["branch"]).lower().replace(" ", "-") or row["branch"] ) events.append({ "type": "source", "claim_slug": "", "source_slug": source_slug, "domain": row["domain"] or "unknown", "contributor": contributor, "timestamp": merged_at, "ci_earned": round(ci_earned, 2), "summary": summary_text, "pr_number": row["number"], "source_channel": row["source_channel"] or "unknown", }) continue for slug in slugs: if slug not in claim_activity: claim_activity[slug] = { "challenges": 0, "enriches": 0, "signals": 0, "first_seen": merged_at, } if event_type == "challenge": claim_activity[slug]["challenges"] += 1 elif event_type == "enrich": claim_activity[slug]["enriches"] += 1 else: claim_activity[slug]["signals"] += 1 summary_text = "" if row["description"]: first_title = row["description"].split("|")[0].strip() if len(first_title) > 120: first_title = first_title[:117] + "..." summary_text = first_title elif row["branch"]: summary_text = _summary_from_branch(row["branch"]) for slug in (slugs[:1] if slugs else [""]): events.append({ "type": event_type, "claim_slug": slug, "domain": row["domain"] or "unknown", "contributor": contributor, "timestamp": merged_at, "ci_earned": round(ci_earned, 2), "summary": summary_text, "pr_number": row["number"], "source_channel": row["source_channel"] or "unknown", }) return events, claim_activity finally: conn.close() def _sort_events(events, claim_activity, sort_mode, now_ts): if sort_mode == "recent": events.sort(key=lambda e: e["timestamp"], reverse=True) elif sort_mode == "hot": def hot_key(e): slug = e["claim_slug"] ca = claim_activity.get(slug, {"challenges": 0, "enriches": 0, "signals": 0}) try: from datetime import datetime evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")) hours = (now_ts - evt_time.timestamp()) / 3600 except (ValueError, AttributeError): hours = 9999 return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours) events.sort(key=hot_key, reverse=True) elif sort_mode == "important": type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3} events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"]))) return events async def handle_activity_feed(request): sort_mode = request.query.get("sort", "recent") if sort_mode not in ("hot", "recent", "important"): sort_mode = "recent" domain = request.query.get("domain", "") contributor = request.query.get("contributor", "") type_param = request.query.get("type", "") type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None try: limit = min(int(request.query.get("limit", "20")), 100) except ValueError: limit = 20 try: offset = max(int(request.query.get("offset", "0")), 0) except ValueError: offset = 0 now = time.time() if _cache["data"] is None or (now - _cache["ts"]) > CACHE_TTL: _cache["data"] = _build_events() _cache["ts"] = now events, claim_activity = _cache["data"] filtered = events if domain: filtered = [e for e in filtered if e["domain"] == domain] if contributor: filtered = [e for e in filtered if e["contributor"] == contributor] if type_filter: filtered = [e for e in filtered if e["type"] in type_filter] sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now) total = len(sorted_events) page = sorted_events[offset:offset + limit] return web.json_response({ "events": page, "total": total, "sort": sort_mode, "offset": offset, "limit": limit, }, headers={"Access-Control-Allow-Origin": "*"}) def register(app): app.router.add_get("/api/activity-feed", handle_activity_feed)