teleo-infrastructure/diagnostics/activity_feed_api.py
Teleo Agents e78308862a
fix(activity-feed): emit Forgejo pr_url fallback so every event has a clickthrough
Previously _github_pr_url() only returned a URL when prs.github_pr was
populated. That field is set on only 3 of 4094 merged PRs (the rare cases
mirrored to the public GitHub repo), so pr_url was null for ~100% of the
feed. The frontend whole-row PR overlay (livingip-web PR #30) renders
only when pr_url is non-null, so until now no rows had the overlay.

Pipeline-attributed events (reweave/*, ingestion/*) are hit hardest: their
/contributors/pipeline link lands on a sparse stub, and without a PR link
there is no way to reach the actual commit/PR they refer to.

Fix: rename _github_pr_url -> _pr_url and fall back to the canonical
Forgejo URL (git.livingip.xyz/teleo/teleo-codex/pulls/{number}) when no
GitHub mirror exists. Verified 200 OK against a sample (#10568). GitHub
URL still wins when available.

Result: 1972/1972 events in _build_events now carry a pr_url, and the
whole-row overlay works for every row, including pipeline events.
2026-05-13 04:29:54 +00:00


"""Activity feed API — serves contribution events from pipeline.db."""
import re
import sqlite3
import math
import time
from aiohttp import web
DB_PATH = "/opt/teleo-eval/pipeline/pipeline.db"
_cache = {"data": None, "ts": 0}
CACHE_TTL = 60 # 1 minute — activity should feel fresh
# commit_types we surface in the activity feed. `pipeline` is system
# maintenance (reweave/fix auto-runs, zombie cleanup) and stays hidden.
_FEED_COMMIT_TYPES = ("knowledge", "enrich", "challenge", "research", "entity", "extract", "reweave")
# Source-archive slugs follow YYYY-MM-DD-publisher-topic-HASH4 — they're
# inbox archive filenames, not claim slugs. Used as a fallback signal when
# branch/description heuristics miss (e.g. populated descriptions that
# happen to be source titles, not claim insights).
_SOURCE_SLUG_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}-.+-[a-f0-9]{4}$")
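# e.g. a hypothetical archive slug "2026-05-11-reuters-grid-storage-9f3a" matches
# the pattern; a claim slug like "grid-storage-costs-fall" does not.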
def _get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA busy_timeout = 10000")
    return conn
def _is_source_slug(slug):
    return bool(slug and _SOURCE_SLUG_PATTERN.match(slug))
def _classify_event(branch, description, commit_type, candidate_slug=None):
    """Return one of: create | enrich | challenge | source | session_digest | None.
    Source-archive PRs are extract/* branches that filed a source into
    inbox/archive/ but didn't produce a claim. Session-digest PRs are
    agent research/entity commits with no per-claim description — they
    represent session-level rollups, not specific knowledge artifacts.
    """
    commit_type_l = (commit_type or "").lower()
    branch = branch or ""
    description_lower = (description or "").lower()
    has_desc = bool(description and description.strip())
    if commit_type_l not in _FEED_COMMIT_TYPES:
        return None
    # Explicit challenge signals win first.
    if (commit_type_l == "challenge"
            or branch.startswith("challenge/")
            or "challenged_by" in description_lower):
        return "challenge"
    # Enrichment: reweave edge-connects, enrich/ branches, or commit_type=enrich.
    if (commit_type_l == "enrich"
            or branch.startswith("enrich/")
            or branch.startswith("reweave/")):
        return "enrich"
    # Research and entity commits with no description are session-level
    # rollups (e.g. astra/research-2026-05-11). They have no claim to
    # link to — surface as session_digest, not as a phantom create.
    if commit_type_l in ("research", "entity") and not has_desc:
        return "session_digest"
    # Source-only: extract/* with no claim description means inbox archive
    # landed but no domain claim was written.
    if branch.startswith("extract/") and not has_desc:
        return "source"
    # Belt-and-suspenders: if the slug we'd surface to the frontend looks
    # like an inbox archive filename (date-prefix-hash), treat as source
    # regardless of branch/commit_type/description state. Catches cases
    # where description leaked but is just a source title, not a claim.
    if _is_source_slug(candidate_slug):
        return "source"
    # Everything else with a description is a new claim.
    return "create"
# Internal classifier value -> canonical `kind` enum returned to frontend.
_KIND_MAP = {
    "create": "claim_merged",
    "enrich": "claim_enriched",
    "challenge": "claim_challenged",
    "source": "source_archived",
    "session_digest": "session_digest",
}
def _archive_slug_from_branch(branch):
    """For extract/YYYY-MM-DD-...-HASH4, return YYYY-MM-DD-... (keep date,
    drop the 4-hex hash suffix). Matches inbox/archive filename convention.
    """
    if not branch or "/" not in branch:
        return ""
    slug = branch.split("/", 1)[1]
    return re.sub(r"-[a-f0-9]{4}$", "", slug)
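# e.g. (hypothetical branch) "extract/2026-05-01-acme-battery-density-1a2b"
# -> "2026-05-01-acme-battery-density"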
def _source_target_url(domain, archive_slug):
    """Forgejo blob URL for an archived source file. Falls back to the
    repo-wide inbox/archive directory when domain is unknown so the link
    still resolves to something useful instead of a 404.
    """
    if not archive_slug:
        return None
    domain = (domain or "").strip()
    if not domain or domain == "unknown":
        return "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive"
    return (
        "https://git.livingip.xyz/teleo/teleo-codex/src/branch/main/inbox/archive/"
        f"{domain}/{archive_slug}.md"
    )
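# e.g. (hypothetical inputs) _source_target_url("energy", "2026-05-01-acme-battery-density")
# -> ".../inbox/archive/energy/2026-05-01-acme-battery-density.md"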
def _claim_target_url(claim_slug):
    if not claim_slug:
        return None
    return f"/claims/{claim_slug}"
# Canonical clickthrough URL for an activity-feed event.
#
# Every merged PR in the pipeline.db `prs` table lives on Forgejo at
# git.livingip.xyz/teleo/teleo-codex/pulls/{number}. A small subset (3 of
# 4094 as of 2026-05-13) was additionally mirrored to GitHub and has
# prs.github_pr populated. Prefer GitHub when available (more public-facing
# surface), fall back to Forgejo so every row has a real destination
# instead of None (which makes the frontend whole-row overlay no-op and
# leaves pipeline-attributed events looking dead-on-click).
def _pr_url(pr_number, github_pr):
    if github_pr:
        return f"https://github.com/living-ip/teleo-codex/pull/{github_pr}"
    if pr_number:
        return f"https://git.livingip.xyz/teleo/teleo-codex/pulls/{pr_number}"
    return None
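# Illustrative _pr_url outcomes (PR numbers here are placeholders):
#   _pr_url(10568, None) -> "https://git.livingip.xyz/teleo/teleo-codex/pulls/10568"
#   _pr_url(10568, 42)   -> "https://github.com/living-ip/teleo-codex/pull/42"
#   _pr_url(None, None)  -> None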
# Canonicalize contributor labels so frontend links resolve to real
# /contributors/{handle} pages. Pipeline writers (extract.py, manual edits,
# the old backfill_submitted_by.py) historically wrote mixed-case agent
# names with a trailing decorator into prs.submitted_by — e.g.
# "Vida (self-directed)", "pipeline (reweave)", or "@m3taversal".
# These decorated strings do not exist as contributors and 404 the profile
# page. Strip the trailing parenthetical wholesale: valid handles match
# ^[a-z0-9][a-z0-9_-]{0,38}$ (see pipeline/lib/attribution._HANDLE_RE) and
# cannot contain parens, so this is lossless.
_TRAILING_PAREN_RE = re.compile(r"\s*\([^)]*\)\s*$")
def _canonicalize(raw):
    if not raw:
        return ""
    h = raw.strip().lower().lstrip("@")
    h = _TRAILING_PAREN_RE.sub("", h).strip()
    return h
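# Illustrative mappings (inputs are the decorated forms described above):
#   "Vida (self-directed)" -> "vida"
#   "pipeline (reweave)"   -> "pipeline"
#   "@m3taversal"          -> "m3taversal"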
def _normalize_contributor(submitted_by, agent):
    name = _canonicalize(submitted_by)
    if name:
        return name
    name = _canonicalize(agent)
    if name and name != "pipeline":
        return name
    return "pipeline"
def _summary_from_branch(branch):
    if not branch:
        return ""
    parts = branch.split("/", 1)
    if len(parts) < 2:
        return ""
    slug = parts[1]
    slug = re.sub(r"^[\d-]+-", "", slug)  # strip date prefix
    slug = re.sub(r"-[a-f0-9]{4}$", "", slug)  # strip hash suffix
    return slug.replace("-", " ").strip().capitalize()
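# e.g. (hypothetical branch) "extract/2026-05-01-acme-battery-density-1a2b"
# -> "Acme battery density"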
def _extract_claim_slugs(description, branch=None):
    if not description:
        if branch:
            parts = branch.split("/", 1)
            if len(parts) > 1:
                return [parts[1]]
        return []
    titles = [t.strip() for t in description.split("|") if t.strip()]
    slugs = []
    for title in titles:
        slug = title.lower().strip()
        slug = "".join(c if c.isalnum() or c in (" ", "-") else "" for c in slug)
        slug = slug.replace(" ", "-").strip("-")
        if len(slug) > 10:
            slugs.append(slug)
    return slugs
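# e.g. (hypothetical description) "Quantum error correction scales sublinearly | Fusion net gain replicated"
# -> ["quantum-error-correction-scales-sublinearly", "fusion-net-gain-replicated"]
# Slugs of 10 characters or fewer are dropped as too short to be claim slugs.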
def _hot_score(challenge_count, enrich_count, signal_count, hours_since):
    numerator = challenge_count * 3 + enrich_count * 2 + signal_count
    denominator = max(hours_since, 0.5) ** 1.5
    return numerator / denominator
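# Challenges weigh 3x, enriches 2x, other signals 1x, divided by event age
# raised to the 1.5 power (floored at half an hour so brand-new events don't
# divide by ~zero). Worked example with made-up counts: one challenge plus
# one enrich at 2 hours old scores (3 + 2) / 2**1.5, which is about 1.77.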
def _build_events():
    conn = _get_conn()
    try:
        placeholders = ",".join("?" * len(_FEED_COMMIT_TYPES))
        rows = conn.execute(f"""
            SELECT p.number, p.branch, p.domain, p.agent, p.submitted_by,
                   p.merged_at, p.description, p.commit_type, p.cost_usd,
                   p.source_channel, p.source_path, p.github_pr
            FROM prs p
            WHERE p.status = 'merged'
              AND p.commit_type IN ({placeholders})
              AND p.merged_at IS NOT NULL
            ORDER BY p.merged_at DESC
            LIMIT 2000
        """, _FEED_COMMIT_TYPES).fetchall()
        events = []
        claim_activity = {}  # slug -> {challenges, enriches, signals, first_seen}
        for row in rows:
            slugs = _extract_claim_slugs(row["description"], row["branch"])
            candidate_slug = slugs[0] if slugs else ""
            event_type = _classify_event(
                row["branch"], row["description"], row["commit_type"],
                candidate_slug=candidate_slug,
            )
            if not event_type:
                continue
            contributor = _normalize_contributor(row["submitted_by"], row["agent"])
            # Hide pipeline-attributed events (reweave/*, ingestion/*) from the
            # public activity feed. They're automation maintenance, not
            # contributions — the daemon re-knits the graph nightly and ingests
            # external sources. Internal diagnostics + CI math still see these
            # rows in prs / contribution_events; only the public timeline drops
            # them. Mirrors the existing _FEED_COMMIT_TYPES filter (which hides
            # commit_type='pipeline') along the contributor axis.
            if contributor == "pipeline":
                continue
            merged_at = row["merged_at"] or ""
            domain = row["domain"] or "unknown"
            kind = _KIND_MAP.get(event_type, event_type)
            ci_map = {
                "create": 0.35, "enrich": 0.25, "challenge": 0.40,
                "source": 0.15, "session_digest": 0.05,
            }
            ci_earned = ci_map.get(event_type, 0)
            # Source events never carry a claim_slug — no claim was written.
            # target_url points at the archived file on Forgejo instead.
            if event_type == "source":
                archive_slug = _archive_slug_from_branch(row["branch"])
                summary_text = _summary_from_branch(row["branch"])
                source_display_slug = (
                    summary_text.lower().replace(" ", "-") or row["branch"]
                )
                events.append({
                    "kind": kind,
                    "type": "source",
                    "target_url": _source_target_url(domain, archive_slug),
                    "claim_slug": "",
                    "source_slug": source_display_slug,
                    "domain": domain,
                    "contributor": contributor,
                    "timestamp": merged_at,
                    "ci_earned": round(ci_earned, 2),
                    "summary": summary_text,
                    "pr_number": row["number"],
                    "pr_url": _pr_url(row["number"], row["github_pr"]),
                    "source_channel": row["source_channel"] or "unknown",
                })
                continue
            # Session digests have no clickthrough surface yet (per-agent
            # session pages not built). target_url=null so frontend renders
            # plain text instead of a broken /claims/research-... link.
            if event_type == "session_digest":
                summary_text = _summary_from_branch(row["branch"]) or "Research session"
                events.append({
                    "kind": kind,
                    "type": "session_digest",
                    "target_url": None,
                    "claim_slug": "",
                    "domain": domain,
                    "contributor": contributor,
                    "timestamp": merged_at,
                    "ci_earned": round(ci_earned, 2),
                    "summary": summary_text,
                    "pr_number": row["number"],
                    "pr_url": _pr_url(row["number"], row["github_pr"]),
                    "source_channel": row["source_channel"] or "unknown",
                })
                continue
            for slug in slugs:
                if slug not in claim_activity:
                    claim_activity[slug] = {
                        "challenges": 0, "enriches": 0, "signals": 0,
                        "first_seen": merged_at,
                    }
                if event_type == "challenge":
                    claim_activity[slug]["challenges"] += 1
                elif event_type == "enrich":
                    claim_activity[slug]["enriches"] += 1
                else:
                    claim_activity[slug]["signals"] += 1
            summary_text = ""
            if row["description"]:
                first_title = row["description"].split("|")[0].strip()
                if len(first_title) > 120:
                    first_title = first_title[:117] + "..."
                summary_text = first_title
            elif row["branch"]:
                summary_text = _summary_from_branch(row["branch"])
            for slug in (slugs[:1] if slugs else [""]):
                events.append({
                    "kind": kind,
                    "type": event_type,
                    "target_url": _claim_target_url(slug),
                    "claim_slug": slug,
                    "domain": domain,
                    "contributor": contributor,
                    "timestamp": merged_at,
                    "ci_earned": round(ci_earned, 2),
                    "summary": summary_text,
                    "pr_number": row["number"],
                    "pr_url": _pr_url(row["number"], row["github_pr"]),
                    "source_channel": row["source_channel"] or "unknown",
                })
        return events, claim_activity
    finally:
        conn.close()
def _sort_events(events, claim_activity, sort_mode, now_ts):
    if sort_mode == "recent":
        events.sort(key=lambda e: e["timestamp"], reverse=True)
    elif sort_mode == "hot":
        def hot_key(e):
            slug = e["claim_slug"]
            ca = claim_activity.get(slug, {"challenges": 0, "enriches": 0, "signals": 0})
            try:
                from datetime import datetime
                evt_time = datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00"))
                hours = (now_ts - evt_time.timestamp()) / 3600
            except (ValueError, AttributeError):
                hours = 9999
            return _hot_score(ca["challenges"], ca["enriches"], ca["signals"], hours)
        events.sort(key=hot_key, reverse=True)
    elif sort_mode == "important":
        type_rank = {
            "challenge": 0, "enrich": 1, "create": 2,
            "source": 3, "session_digest": 4,
        }
        events.sort(key=lambda e: (type_rank.get(e["type"], 5), -len(e["summary"])))
    return events
async def handle_activity_feed(request):
    sort_mode = request.query.get("sort", "recent")
    if sort_mode not in ("hot", "recent", "important"):
        sort_mode = "recent"
    domain = request.query.get("domain", "")
    contributor = request.query.get("contributor", "")
    type_param = request.query.get("type", "")
    type_filter = {t.strip() for t in type_param.split(",") if t.strip()} if type_param else None
    try:
        limit = min(int(request.query.get("limit", "20")), 100)
    except ValueError:
        limit = 20
    try:
        offset = max(int(request.query.get("offset", "0")), 0)
    except ValueError:
        offset = 0
    now = time.time()
    if _cache["data"] is None or (now - _cache["ts"]) > CACHE_TTL:
        _cache["data"] = _build_events()
        _cache["ts"] = now
    events, claim_activity = _cache["data"]
    filtered = events
    if domain:
        filtered = [e for e in filtered if e["domain"] == domain]
    if contributor:
        filtered = [e for e in filtered if e["contributor"] == contributor]
    if type_filter:
        # Accept both legacy `type` values (create/enrich/challenge/source/
        # session_digest) and canonical `kind` values (claim_merged/etc.) so
        # callers can migrate at their own pace.
        filtered = [
            e for e in filtered
            if e["type"] in type_filter or e.get("kind") in type_filter
        ]
    sorted_events = _sort_events(list(filtered), claim_activity, sort_mode, now)
    total = len(sorted_events)
    page = sorted_events[offset:offset + limit]
    return web.json_response({
        "events": page,
        "total": total,
        "sort": sort_mode,
        "offset": offset,
        "limit": limit,
    }, headers={"Access-Control-Allow-Origin": "*"})
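# Example request (hypothetical values; parameters mirror the handler above):
#   GET /api/activity-feed?sort=hot&domain=energy&contributor=vida&type=claim_merged,claim_enriched&limit=50
# -> {"events": [...], "total": <int>, "sort": "hot", "offset": 0, "limit": 50}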
def register(app):
    app.router.add_get("/api/activity-feed", handle_activity_feed)
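# Minimal wiring sketch (assumes a standalone aiohttp app; the real diagnostics
# service presumably registers additional routes alongside this one):
#   from aiohttp import web
#   app = web.Application()
#   register(app)
#   web.run_app(app)  # defaults to port 8080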