"""Activity feed API — serves contribution events from pipeline.db.""" import re import sqlite3 import math import time from aiohttp import web DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db" _cache = {"data": None, "ts": 0} CACHE_TTL = 60 # 1 minute — activity should feel fresh # commit_types we surface in the activity feed. `pipeline` is system # maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden. _FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave") # Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're # inbox archive filenames, not claim slugs. Used as a fallback signal when # branch/description heuristics miss (e.g. populated descriptions that # happen to be source titles, not claim insights). _SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$") def _get_conn(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA busy_timeout = 10000") return conn def _is_source_slug(slug): return bool(slug and _SOURCE_SLUG_PATTERN.match(slug)) def _classify_event(branch, description, commit_type, candidate_slug=None): """Return one of: create | enrich | challenge | source | session_digest | None. Source-archive PRs are extract/* branches that filed a source into inbox/archive/ but didn't produce a claim. Session-digest PRs are agent research/entity commits with no per-claim description — they represent session-level rollups, not specific knowledge artifacts. """ commit_type_l = (commit_type or "").lower() branch = branch or "" description_lower = (description or "").lower() has_desc = bool(description and description.strip()) if commit_type_l not in _FEED_COMMIT_TYPES: return None # Explicit challenge signals win first. if (commit_type_l == "challenge" or branch.startswith("challenge/") or "challenged_by" in description_lower): return "challenge" # Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich. if (commit_type_l == "enrich" or branch.startswith("enrich/") or branch.startswith("reweave/")): return "enrich" # Research and entity commits with no description are session-level # rollups (e.g. astra/research-2026-05-11). They have no claim to # link to — surface as session_digest, not as a phantom create. if commit_type_l in ("research", "entity") and not has_desc: return "session_digest" # Source-only: extract/* with no claim description means inbox archive # landed but no domain claim was written. if branch.startswith("extract/") and not has_desc: return "source" # Belt-and-suspenders: if the slug we'd surface to the frontend looks # like an inbox archive filename (date-prefix-hash), treat as source # regardless of branch/commit_type/description state. Catches cases # where description leaked but is just a source title, not a claim. if _is_source_slug(candidate_slug): return "source" # Everything else with a description is a new claim. return "create" # Internal classifier value -> canonical `kind` enum returned to frontend. _KIND_MAP = { "create": "claim_merged", "enrich": "claim_enriched", "challenge": "claim_challenged", "source": "source_archived", "session_digest": "session_digest", } def _archive_slug_from_branch(branch): """For extract/YYYY-MM-DD-...-HASH4, return YYYY-MM-DD-... (keep date, drop the 4-hex hash suffix). Matches inbox/archive filename convention. 
""" if not branch or "/" not in branch: return "" slug = branch.split("/", 1)[1] return re.sub(r"-[a-f0-9]{4}$", "", slug) def _source_target_url(domain, archive_slug): """Forgejo blob URL for an archived source file. Falls back to the repo-wide inbox/archive directory when domain is unknown so the link still resolves to something useful instead of a 404. """ if not archive_slug: return None domain = (domain or "").strip() if not domain or domain == "unknown": return "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive" return ( "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive/" f"{domain}/{archive_slug}.md" ) def _claim_target_url(claim_slug): if not claim_slug: return None return f"/claims/{claim_slug}" def _normalize_contributor(submitted_by, agent): if submitted_by and submitted_by.strip(): name = submitted_by.strip().lstrip("@") return name if agent and agent.strip() and agent != "pipeline": return agent.strip() return "pipeline" def _summary_from_branch(branch): if not branch: return "" parts = branch.split("/", 1) if len(parts) < 2: return "" slug = parts[1] slug = re.sub(r"^[\d-]+-", "", slug) # strip date prefix slug = re.sub(r"-[a-f0-9]{4}$", "", slug) # strip hash suffix return slug.replace("-", " ").strip().capitalize() def _extract_claim_slugs(description, branch=None): if not description: if branch: parts = branch.split("/", 1) if len(parts) > 1: return [parts[1]] return [] titles = [t.strip() for t in description.split("|") if t.strip()] slugs = [] for title in titles: slug = title.lower().strip() slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug) slug = slug.replace(" ", "-").strip("-") if len(slug) > 10: slugs.append(slug) return slugs def _hot_score(challenge_count, enrich_count, signal_count, hours_since): numerator = challenge_count * 3 + enrich_count * 2 + signal_count denominator = max(hours_since, 0.5) ** 1.5 return numerator / denominator def _build_events(): conn = _get_conn() try: placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES)) rows = conn.execute(f""" SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by, p.merged_at, p.description, p.commit_type, p.cost_usd, p.source_channel, p.source_path FROM prs p WHERE p.status = 'merged' AND p.commit_type IN ({placeholders}) AND p.merged_at IS NOT NULL ORDER BY p.merged_at DESC LIMIT 2000 """, _FEED_COMMIT_TYPES).fetchall() events = [] claim_activity = {} # slug -> {challenges, enriches, signals, first_seen} for row in rows: slugs = _extract_claim_slugs(row["description"], row["branch"]) candidate_slug = slugs[0] if slugs else "" event_type = _classify_event( row["branch"], row["description"], row["commit_type"], candidate_slug=candidate_slug, ) if not event_type: continue contributor = _normalize_contributor(row["submitted_by"], row["agent"]) merged_at = row["merged_at"] or "" domain = row["domain"] or "unknown" kind = _KIND_MAP.get(event_type, event_type) ci_map = { "create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15, "session_digest": 0.05, } ci_earned = ci_map.get(event_type, 0) # Source events never carry a claim_slug — no claim was written. # target_url points at the archived file on Forgejo instead. 
if event_type == "source": archive_slug = _archive_slug_from_branch(row["branch"]) summary_text = _summary_from_branch(row["branch"]) source_display_slug = ( summary_text.lower().replace(" ", "-") or row["branch"] ) events.append({ "kind": kind, "type": "source", "target_url": _source_target_url(domain, archive_slug), "claim_slug": "", "source_slug": source_display_slug, "domain": domain, "contributor": contributor, "timestamp": merged_at, "ci_earned": round(ci_earned, 2), "summary": summary_text, "pr_number": row["number"], "source_channel": row["source_channel"] or "unknown", }) continue # Session digests have no clickthrough surface yet (per-agent # session pages not built). target_url=null so frontend renders # plain text instead of a broken /claims/research-... link. if event_type == "session_digest": summary_text = _summary_from_branch(row["branch"]) or "Research session" events.append({ "kind": kind, "type": "session_digest", "target_url": None, "claim_slug": "", "domain": domain, "contributor": contributor, "timestamp": merged_at, "ci_earned": round(ci_earned, 2), "summary": summary_text, "pr_number": row["number"], "source_channel": row["source_channel"] or "unknown", }) continue for slug in slugs: if slug not in claim_activity: claim_activity[slug] = { "challenges": 0, "enriches": 0, "signals": 0, "first_seen": merged_at, } if event_type == "challenge": claim_activity[slug]["challenges"] += 1 elif event_type == "enrich": claim_activity[slug]["enriches"] += 1 else: claim_activity[slug]["signals"] += 1 summary_text = "" if row["description"]: first_title = row["description"].split("|")[0].strip() if len(first_title) > 120: first_title = first_title[:117] + "..." summary_text = first_title elif row["branch"]: summary_text = _summary_from_branch(row["branch"]) for slug in (slugs[:1] if slugs else [""]): events.append({ "kind": kind, "type": event_type, "target_url": _claim_target_url(slug), "claim_slug": slug, "domain": domain, "contributor": contributor, "timestamp": merged_at, "ci_earned": round(ci_earned, 2), "summary": summary_text, "pr_number": row["number"], "source_channel": row["source_channel"] or "unknown", }) return events, claim_activity finally: conn.close() def _sort_events(events, claim_activity, sort_mode, now_ts): if sort_mode == "recent": events.sort(key=lambda e: e["timestamp"], reverse=True) elif sort_mode == "hot": def hot_key(e): slug = e["claim_slug"] ca = claim_activity.get(slug, {"challenges": 0, "enriches": 0, "signals": 0}) try: from datetime import datetime evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")) hours = (now_ts - evt_time.timestamp()) / 3600 except (ValueError, AttributeError): hours = 9999 return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours) events.sort(key=hot_key, reverse=True) elif sort_mode == "important": type_rank = { "challenge": 0, "enrich": 1, "create": 2, "source": 3, "session_digest": 4, } events.sort(key=lambda e: (type_rank.get(e["type"], 5), -len(e["summary"]))) return events async def handle_activity_feed(request): sort_mode = request.query.get("sort", "recent") if sort_mode not in ("hot", "recent", "important"): sort_mode = "recent" domain = request.query.get("domain", "") contributor = request.query.get("contributor", "") type_param = request.query.get("type", "") type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None try: limit = min(int(request.query.get("limit", "20")), 100) except ValueError: limit = 20 try: offset = 


def _sort_events(events, claim_activity, sort_mode, now_ts):
    if sort_mode == "recent":
        events.sort(key=lambda e: e["timestamp"], reverse=True)
    elif sort_mode == "hot":
        def hot_key(e):
            slug = e["claim_slug"]
            ca = claim_activity.get(slug, {"challenges": 0, "enriches": 0, "signals": 0})
            try:
                evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00"))
                hours = (now_ts - evt_time.timestamp()) / 3600
            except (ValueError, AttributeError):
                hours = 9999
            return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)

        events.sort(key=hot_key, reverse=True)
    elif sort_mode == "important":
        type_rank = {
            "challenge": 0,
            "enrich": 1,
            "create": 2,
            "source": 3,
            "session_digest": 4,
        }
        events.sort(key=lambda e: (type_rank.get(e["type"], 5), -len(e["summary"])))
    return events


async def handle_activity_feed(request):
    sort_mode = request.query.get("sort", "recent")
    if sort_mode not in ("hot", "recent", "important"):
        sort_mode = "recent"

    domain = request.query.get("domain", "")
    contributor = request.query.get("contributor", "")
    type_param = request.query.get("type", "")
    type_filter = (
        {t.strip() for t in type_param.split(",") if t.strip()}
        if type_param else None
    )

    try:
        limit = min(int(request.query.get("limit", "20")), 100)
    except ValueError:
        limit = 20
    try:
        offset = max(int(request.query.get("offset", "0")), 0)
    except ValueError:
        offset = 0

    now = time.time()
    if _cache["data"] is None or (now - _cache["ts"]) > CACHE_TTL:
        _cache["data"] = _build_events()
        _cache["ts"] = now
    events, claim_activity = _cache["data"]

    filtered = events
    if domain:
        filtered = [e for e in filtered if e["domain"] == domain]
    if contributor:
        filtered = [e for e in filtered if e["contributor"] == contributor]
    if type_filter:
        # Accept both legacy `type` values (create/enrich/challenge/source/
        # session_digest) and canonical `kind` values (claim_merged/etc.) so
        # callers can migrate at their own pace.
        filtered = [
            e for e in filtered
            if e["type"] in type_filter or e.get("kind") in type_filter
        ]

    sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
    total = len(sorted_events)
    page = sorted_events[offset:offset + limit]

    return web.json_response({
        "events": page,
        "total": total,
        "sort": sort_mode,
        "offset": offset,
        "limit": limit,
    }, headers={"Access-Control-Allow-Origin": "*"})


def register(app):
    app.router.add_get("/api/activity-feed", handle_activity_feed)
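

if __name__ == "__main__":
    # Minimal local-run sketch, not part of the deployed service: serves
    # GET /api/activity-feed on localhost so the feed can be poked manually.
    # Assumes pipeline.db exists at DB_PATH; the port below is illustrative.
    app = web.Application()
    register(app)
    web.run_app(app, port=8080)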