feat: add /api/activity-feed endpoint with hot/recent/important sort
Serves contribution events from pipeline.db. Classifies PRs as create/enrich/challenge, normalizes contributors, derives summaries from branch names when descriptions are empty. Hot sort uses challenge*3 + enrich*2 + signal / hours^1.5 decay from event time. Domain and contributor filters, pagination (limit/offset). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
11e026448a
commit
1b27a2de31
1 changed files with 212 additions and 0 deletions
212
diagnostics/activity_feed_api.py
Normal file
212
diagnostics/activity_feed_api.py
Normal file
|
|
@ -0,0 +1,212 @@
|
|||
"""Activity feed API — serves contribution events from pipeline.db."""
|
||||
import math
import re
import sqlite3
import time
from datetime import datetime

from aiohttp import web
|
||||
|
||||
# Absolute path to the pipeline SQLite database the feed is served from.
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"

# Module-level cache: "data" holds the (events, claim_activity) tuple
# returned by _build_events(); "ts" is the epoch time it was built.
_cache = {"data": None, "ts": 0}

CACHE_TTL = 60  # 1 minute — activity should feel fresh
|
||||
|
||||
|
||||
def _get_conn():
    """Open a connection to the pipeline database with dict-like rows."""
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    # Wait up to 10s on a locked database instead of failing immediately.
    connection.execute("PRAGMA busy_timeout = 10000")
    return connection
|
||||
|
||||
|
||||
def _classify_event(branch, description, commit_type):
|
||||
if commit_type != "knowledge":
|
||||
return None
|
||||
if branch and branch.startswith("extract/"):
|
||||
return "create"
|
||||
if branch and branch.startswith("reweave/"):
|
||||
return "enrich"
|
||||
if branch and branch.startswith("challenge/"):
|
||||
return "challenge"
|
||||
if description and "challenged_by" in description.lower():
|
||||
return "challenge"
|
||||
if branch and branch.startswith("enrich/"):
|
||||
return "enrich"
|
||||
return "create"
|
||||
|
||||
|
||||
def _normalize_contributor(submitted_by, agent):
|
||||
if submitted_by and submitted_by.strip():
|
||||
name = submitted_by.strip().lstrip("@")
|
||||
return name
|
||||
if agent and agent.strip() and agent != "pipeline":
|
||||
return agent.strip()
|
||||
return "pipeline"
|
||||
|
||||
|
||||
def _summary_from_branch(branch):
|
||||
if not branch:
|
||||
return ""
|
||||
parts = branch.split("/", 1)
|
||||
if len(parts) < 2:
|
||||
return ""
|
||||
slug = parts[1]
|
||||
slug = re.sub(r"^[\d-]+-", "", slug) # strip date prefix
|
||||
slug = re.sub(r"-[a-f0-9]{4}$", "", slug) # strip hash suffix
|
||||
return slug.replace("-", " ").strip().capitalize()
|
||||
|
||||
|
||||
def _extract_claim_slugs(description, branch=None):
|
||||
if not description:
|
||||
if branch:
|
||||
parts = branch.split("/", 1)
|
||||
if len(parts) > 1:
|
||||
return [parts[1][:120]]
|
||||
return []
|
||||
titles = [t.strip() for t in description.split("|") if t.strip()]
|
||||
slugs = []
|
||||
for title in titles:
|
||||
slug = title.lower().strip()
|
||||
slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
|
||||
slug = slug.replace(" ", "-").strip("-")
|
||||
if len(slug) > 10:
|
||||
slugs.append(slug[:120])
|
||||
return slugs
|
||||
|
||||
|
||||
def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
|
||||
numerator = challenge_count * 3 + enrich_count * 2 + signal_count
|
||||
denominator = max(hours_since, 0.5) ** 1.5
|
||||
return numerator / denominator
|
||||
|
||||
|
||||
def _build_events():
    """Load merged knowledge PRs and turn them into feed events.

    Returns a tuple (events, claim_activity):
      events         — list of dicts (type, claim_slug, domain, contributor,
                       timestamp, ci_earned, summary, pr_number), one per PR,
                       in merged_at-descending order as queried.
      claim_activity — slug -> {"challenges", "enriches", "signals",
                       "first_seen"}, aggregated across ALL slugs of every
                       event (not just the first one attached to the event).
    """
    conn = _get_conn()
    try:
        rows = conn.execute("""
            SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
                   p.merged_at, p.description, p.commit_type, p.cost_usd
            FROM prs p
            WHERE p.status = 'merged'
              AND p.commit_type = 'knowledge'
              AND p.merged_at IS NOT NULL
            ORDER BY p.merged_at DESC
            LIMIT 2000
        """).fetchall()

        # CI credit awarded per event type — loop-invariant, so built once
        # (the original rebuilt this dict on every row).
        ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}

        events = []
        claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}

        for row in rows:
            event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
            if not event_type:
                continue

            contributor = _normalize_contributor(row["submitted_by"], row["agent"])
            slugs = _extract_claim_slugs(row["description"], row["branch"])
            merged_at = row["merged_at"] or ""
            ci_earned = ci_map.get(event_type, 0)

            # Aggregate per-claim counters over every slug the PR touches.
            for slug in slugs:
                activity = claim_activity.setdefault(slug, {
                    "challenges": 0, "enriches": 0, "signals": 0,
                    "first_seen": merged_at,
                })
                if event_type == "challenge":
                    activity["challenges"] += 1
                elif event_type == "enrich":
                    activity["enriches"] += 1
                else:
                    activity["signals"] += 1

            # Summary: first "|"-separated title (ellipsized at 120 chars),
            # falling back to a slug derived from the branch name.
            summary_text = ""
            if row["description"]:
                first_title = row["description"].split("|")[0].strip()
                if len(first_title) > 120:
                    first_title = first_title[:117] + "..."
                summary_text = first_title
            elif row["branch"]:
                summary_text = _summary_from_branch(row["branch"])

            # Emit one feed event per PR, attributed to its first slug (or "").
            for slug in (slugs[:1] if slugs else [""]):
                events.append({
                    "type": event_type,
                    "claim_slug": slug,
                    "domain": row["domain"] or "unknown",
                    "contributor": contributor,
                    "timestamp": merged_at,
                    "ci_earned": round(ci_earned, 2),
                    "summary": summary_text,
                    "pr_number": row["number"],
                })

        return events, claim_activity
    finally:
        conn.close()
|
||||
|
||||
|
||||
def _sort_events(events, claim_activity, sort_mode, now_ts):
|
||||
if sort_mode == "recent":
|
||||
events.sort(key=lambda e: e["timestamp"], reverse=True)
|
||||
elif sort_mode == "hot":
|
||||
def hot_key(e):
|
||||
slug = e["claim_slug"]
|
||||
ca = claim_activity.get(slug, {"challenges": 0, "enriches": 0, "signals": 0})
|
||||
try:
|
||||
from datetime import datetime
|
||||
evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00"))
|
||||
hours = (now_ts - evt_time.timestamp()) / 3600
|
||||
except (ValueError, AttributeError):
|
||||
hours = 9999
|
||||
return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
|
||||
events.sort(key=hot_key, reverse=True)
|
||||
elif sort_mode == "important":
|
||||
type_rank = {"challenge": 0, "enrich": 1, "create": 2}
|
||||
events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
|
||||
return events
|
||||
|
||||
|
||||
async def handle_activity_feed(request):
    """GET /api/activity-feed — paginated, filterable contribution feed.

    Query params:
      sort        — "hot" | "recent" | "important" (default "recent").
      domain      — exact-match domain filter (optional).
      contributor — exact-match contributor filter (optional).
      limit       — page size, clamped to 0..100 (default 20).
      offset      — page start index, clamped to >= 0 (default 0).
    """
    sort_mode = request.query.get("sort", "recent")
    if sort_mode not in ("hot", "recent", "important"):
        sort_mode = "recent"
    domain = request.query.get("domain", "")
    contributor = request.query.get("contributor", "")
    try:
        # Clamp to 0..100: the original only capped the upper bound, so a
        # negative ?limit= produced a nonsense negative slice below.
        limit = min(max(int(request.query.get("limit", "20")), 0), 100)
    except ValueError:
        limit = 20
    try:
        offset = max(int(request.query.get("offset", "0")), 0)
    except ValueError:
        offset = 0

    # Rebuild the event cache at most once per CACHE_TTL seconds.
    now = time.time()
    if _cache["data"] is None or (now - _cache["ts"]) > CACHE_TTL:
        _cache["data"] = _build_events()
        _cache["ts"] = now

    events, claim_activity = _cache["data"]

    filtered = events
    if domain:
        filtered = [e for e in filtered if e["domain"] == domain]
    if contributor:
        filtered = [e for e in filtered if e["contributor"] == contributor]

    # Sort a copy so the cached list's order is never mutated.
    sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
    total = len(sorted_events)
    page = sorted_events[offset:offset + limit]

    return web.json_response({
        "events": page,
        "total": total,
        "sort": sort_mode,
        "offset": offset,
        "limit": limit,
    }, headers={"Access-Control-Allow-Origin": "*"})
|
||||
|
||||
|
||||
def register(app):
    """Attach the activity-feed route to an aiohttp application."""
    router = app.router
    router.add_get("/api/activity-feed", handle_activity_feed)
|
||||
Loading…
Reference in a new issue