diff --git a/diagnostics/activity_feed_api.py b/diagnostics/activity_feed_api.py
index a33a17c..275c859 100644
--- a/diagnostics/activity_feed_api.py
+++ b/diagnostics/activity_feed_api.py
@@ -9,6 +9,16 @@ DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
 _cache = {"data": None, "ts": 0}
 CACHE_TTL = 60  # 1 minute — activity should feel fresh
 
+# Commit types we surface in the activity feed. `pipeline` is system
+# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
+_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
+
+# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
+# inbox archive filenames, not claim slugs. Used as a fallback signal when
+# the branch/description heuristics miss (e.g. a populated description that
+# is actually a source title, not a claim insight).
+_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
+
 
 def _get_conn():
     conn = sqlite3.connect(DB_PATH)
@@ -17,19 +27,52 @@ def _get_conn():
     return conn
 
 
-def _classify_event(branch, description, commit_type):
-    if commit_type != "knowledge":
+def _is_source_slug(slug):
+    return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
+
+
+def _classify_event(branch, description, commit_type, candidate_slug=None):
+    """Return one of: create | enrich | challenge | source | None.
+
+    Source-archive PRs are extract/* branches that filed a source into
+    inbox/archive/ but didn't produce a claim. Two signals classify them
+    as 'source' (defense in depth):
+      1. an extract/* branch with an empty description (no claim title produced)
+      2. candidate_slug matches YYYY-MM-DD-...-HASH4 (the inbox filename pattern)
+    """
+    commit_type_l = (commit_type or "").lower()
+    branch = branch or ""
+    description_lower = (description or "").lower()
+    has_desc = bool(description and description.strip())
+
+    if commit_type_l not in _FEED_COMMIT_TYPES:
         return None
-    if branch and branch.startswith("extract/"):
-        return "create"
-    if branch and branch.startswith("reweave/"):
-        return "enrich"
-    if branch and branch.startswith("challenge/"):
+
+    # Explicit challenge signals win first.
+    if (commit_type_l == "challenge"
+            or branch.startswith("challenge/")
+            or "challenged_by" in description_lower):
         return "challenge"
-    if description and "challenged_by" in description.lower():
-        return "challenge"
-    if branch and branch.startswith("enrich/"):
+
+    # Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
+    if (commit_type_l == "enrich"
+            or branch.startswith("enrich/")
+            or branch.startswith("reweave/")):
         return "enrich"
+
+    # Source-only: extract/* with no claim description means an inbox archive
+    # landed but no domain claim was written.
+    if branch.startswith("extract/") and not has_desc:
+        return "source"
+
+    # Belt-and-suspenders: if the slug we'd surface to the frontend looks
+    # like an inbox archive filename (date-prefix-hash), treat it as source
+    # regardless of branch/commit_type/description state. Catches cases where
+    # a description leaked through but is just a source title, not a claim.
+    if _is_source_slug(candidate_slug):
+        return "source"
+
+    # Everything else is treated as a new claim.
     return "create"
@@ -81,33 +124,60 @@ def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
 def _build_events():
     conn = _get_conn()
     try:
-        rows = conn.execute("""
+        placeholders = ",".join(["?"] * len(_FEED_COMMIT_TYPES))
+        rows = conn.execute(f"""
             SELECT p.number, p.branch, p.domain, p.agent,
                    p.submitted_by, p.merged_at, p.description,
                    p.commit_type, p.cost_usd,
-                   p.source_channel
+                   p.source_channel, p.source_path
             FROM prs p
             WHERE p.status = 'merged'
-              AND p.commit_type = 'knowledge'
+              AND p.commit_type IN ({placeholders})
              AND p.merged_at IS NOT NULL
            ORDER BY p.merged_at DESC
            LIMIT 2000
-        """).fetchall()
+        """, _FEED_COMMIT_TYPES).fetchall()
 
         events = []
         claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}
 
         for row in rows:
-            event_type = _classify_event(row["branch"], row["description"], row["commit_type"])
+            slugs = _extract_claim_slugs(row["description"], row["branch"])
+            candidate_slug = slugs[0] if slugs else ""
+            event_type = _classify_event(
+                row["branch"], row["description"], row["commit_type"],
+                candidate_slug=candidate_slug,
+            )
             if not event_type:
                 continue
 
             contributor = _normalize_contributor(row["submitted_by"], row["agent"])
-            slugs = _extract_claim_slugs(row["description"], row["branch"])
             merged_at = row["merged_at"] or ""
 
-            ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40}
+            ci_map = {"create": 0.35, "enrich": 0.25, "challenge": 0.40, "source": 0.15}
             ci_earned = ci_map.get(event_type, 0)
 
+            # Source events never carry a claim_slug — no claim was written —
+            # so the frontend can't produce a 404-ing claim link.
+            if event_type == "source":
+                summary_text = _summary_from_branch(row["branch"])
+                source_slug = summary_text.lower().replace(" ", "-") or row["branch"]
+                events.append({
+                    "type": "source",
+                    "claim_slug": "",
+                    "source_slug": source_slug,
+                    "domain": row["domain"] or "unknown",
+                    "contributor": contributor,
+                    "timestamp": merged_at,
+                    "ci_earned": round(ci_earned, 2),
+                    "summary": summary_text,
+                    "pr_number": row["number"],
+                    "source_channel": row["source_channel"] or "unknown",
+                })
+                continue
+
             for slug in slugs:
                 if slug not in claim_activity:
                     claim_activity[slug] = {
@@ -164,8 +234,8 @@ def _sort_events(events, claim_activity, sort_mode, now_ts):
                 return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
             events.sort(key=hot_key, reverse=True)
     elif sort_mode == "important":
-        type_rank = {"challenge": 0, "enrich": 1, "create": 2}
-        events.sort(key=lambda e: (type_rank.get(e["type"], 3), -len(e["summary"])))
+        type_rank = {"challenge": 0, "enrich": 1, "create": 2, "source": 3}
+        events.sort(key=lambda e: (type_rank.get(e["type"], 4), -len(e["summary"])))
     return events
@@ -175,6 +245,8 @@ async def handle_activity_feed(request):
     sort_mode = "recent"
     domain = request.query.get("domain", "")
     contributor = request.query.get("contributor", "")
+    type_param = request.query.get("type", "")
+    type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
     try:
         limit = min(int(request.query.get("limit", "20")), 100)
     except ValueError:
@@ -196,6 +268,8 @@ async def handle_activity_feed(request):
         filtered = [e for e in filtered if e["domain"] == domain]
     if contributor:
         filtered = [e for e in filtered if e["contributor"] == contributor]
+    if type_filter:
+        filtered = [e for e in filtered if e["type"] in type_filter]
 
     sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
     total = len(sorted_events)
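
A quick standalone check of the slug fallback, since the regex carries most of the 'source' heuristic. The pattern below is copied from the patch; the example slugs are made up, and note the hunks assume `re` is already imported at the top of the module (the import section isn't shown in this diff):

import re

_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")

# Inbox archive filename: date prefix plus a 4-char lowercase hex hash.
assert _SOURCE_SLUG_PATTERN.match("2025-01-29-reuters-fomc-9f3a")
# Ordinary claim slugs have no date prefix and don't match.
assert not _SOURCE_SLUG_PATTERN.match("rate-cuts-lag-inflation")
# The hash group is lowercase hex only, so title-cased suffixes don't false-positive.
assert not _SOURCE_SLUG_PATTERN.match("2025-01-29-Reuters-FOMC-9F3A")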
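
On the API side, the new `type` parameter composes with the existing `domain` and `contributor` filters. The route path depends on how the app wires up `handle_activity_feed`; `/api/activity` below is illustrative only:

GET /api/activity?type=source,challenge&domain=macro&limit=50

The value is parsed as a comma-separated set, so an unknown type silently matches nothing rather than erroring, and omitting the parameter (or passing an empty string) leaves the feed unfiltered.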