teleo-infrastructure/diagnostics/activity_endpoint.py
m3taversal 05d15cea56 feat(activity): Timeline data gaps — type filter + commit_type classifier + source_channel reshape
Three hackathon-critical fixes for Timeline page rendering (Accelerate Solana, May 5):

Gap 1 — /api/activity respects ?type= now:
  - accepts single or comma-separated operation types
    (extract|new|enrich|challenge|infra)
  - over-fetches 5× limit (capped 2000) so post-build filtering still
    fills the requested page size
  - unknown types filter out cleanly
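  The over-fetch arithmetic can be sketched as follows (mirrors the one-liner
  in handle_activity below; the helper name is illustrative, not in the codebase):

```python
# Illustrative helper (not actual code): Gap 1 over-fetch arithmetic.
def compute_fetch_limit(limit, filtering):
    # 5x over-fetch when a ?type= filter is active, capped at 2000;
    # otherwise fetch one extra row so has_more can be detected.
    return min(2000, limit * 5) if filtering else limit + 1

compute_fetch_limit(100, True)   # 500
compute_fetch_limit(500, True)   # 2000 (cap)
compute_fetch_limit(100, False)  # 101
```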

Gap 2 — classify_pr_operation() replaces STATUS_TO_OPERATION for merged PRs:
  - commit_type wins over branch prefix for merged PRs so extract/* branches
    with commit_type='enrich' or 'challenge' surface correctly (same gotcha
    as the contributor-role wiring fix)
  - priority: challenge → enrich (incl. reweave/) → maintenance (infra) → new
  - challenged_by detection carried over from activity_feed_api._classify_event
  - non-merged statuses unchanged (extract/new/infra/challenge as before)
  - SQL now selects commit_type + description alongside existing columns
  - 14 unit tests covering the gotcha matrix
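  The gotcha matrix in one sketch (a trimmed, merged-PRs-only copy of
  classify_pr_operation from the file below, for illustration; branch names
  are made up):

```python
# Trimmed illustration of the merged-PR priority order in classify_pr_operation.
_MAINTENANCE_COMMIT_TYPES = {'fix', 'pipeline', 'reweave'}

def classify_merged(commit_type, branch, description=''):
    commit_type = (commit_type or '').lower()
    if (commit_type == 'challenge' or branch.startswith('challenge/')
            or 'challenged_by' in (description or '').lower()):
        return 'challenge'
    if commit_type == 'enrich' or branch.startswith(('enrich/', 'reweave/')):
        return 'enrich'
    if commit_type in _MAINTENANCE_COMMIT_TYPES:
        return 'infra'
    return 'new'

# commit_type wins over the extract/ branch prefix:
classify_merged('enrich', 'extract/solana-fees')     # 'enrich'
classify_merged('challenge', 'extract/solana-fees')  # 'challenge'
classify_merged('knowledge', 'extract/solana-fees')  # 'new'
```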

Gap 3 — _CHANNEL_MAP reshape:
  - extract/, ingestion/ default → 'unknown' (was 'telegram'; telegram-origin
    classification now requires explicit tagging at ingestion time)
  - agent/maintenance mappings unchanged
  - github_pr override and gh-pr-* branches continue to return 'github'
  - 'web' registered as the canonical in-app submission channel (matches
    the platform-named pattern established by telegram/github/agent)
  - module docstring enumerates all six valid channels
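  A hypothetical helper mirroring the rules above (the real _CHANNEL_MAP lives
  in another module; the function name, shape, and example branches here are
  assumptions based on this description, not actual code):

```python
# Hypothetical sketch of the Gap 3 resolution rules; not the actual _CHANNEL_MAP.
def resolve_channel(source_channel, branch):
    # github_pr override and gh-pr-* branches still resolve to 'github'
    if source_channel == 'github_pr' or branch.startswith('gh-pr-'):
        return 'github'
    # extract/ and ingestion/ branches no longer imply 'telegram'; without an
    # explicit tag set at ingestion time they fall through to 'unknown'
    return source_channel or 'unknown'

resolve_channel('github_pr', 'enrich/foo')    # 'github'
resolve_channel(None, 'gh-pr-branch')         # 'github'
resolve_channel(None, 'extract/foo')          # 'unknown' (was 'telegram')
resolve_channel('telegram', 'extract/foo')    # 'telegram' (explicit tag survives)
```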

Deployed to VPS; diagnostics + pipeline restarted clean.
Smoke: type=enrich returns 22 events (was 0), type=challenge returns 0
(matches DB — zero challenge commit_types).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 19:51:58 +01:00

"""
/api/activity endpoint for diagnostics service.
Serves per-operation events for the dashboard v2 timeline hero panel.
Derives events from the prs table (per-PR granularity) and audit_log
(pipeline-level ops). Cursor-based pagination via timestamp.
Integration: add route and handler to app.py:
app.router.add_get('/api/activity', handle_activity)
Contract (endpoint #7):
GET /api/activity?limit=100&cursor=<ISO-timestamp>
Response: {
events: [{timestamp, agent, operation, target, domain, description, status, pr_number}],
limit: int,
cursor: string|null,
has_more: bool
}
Data sources:
- prs table: number, status, domain, agent, created_at, merged_at, branch, source_path
- audit_log table: timestamp, stage, event, detail
- contributors table: handle, display_name (for agent name resolution)
"""
import json
import sqlite3

from aiohttp import web

# Non-merged statuses map directly to operation — no semantic classification yet.
NON_MERGED_STATUS_TO_OPERATION = {
    'approved': 'new',         # about to become knowledge
    'open': 'extract',         # cyan — new extraction in progress
    'validating': 'extract',   # cyan — being validated
    'reviewing': 'extract',    # cyan — under review
    'merging': 'new',          # green — merge in progress
    'closed': 'infra',         # grey — closed/rejected
    'zombie': 'infra',         # grey — stale
    'conflict': 'challenge',   # red-orange — conflict detected
}
# Maintenance commit_types that land on main but don't represent new knowledge.
_MAINTENANCE_COMMIT_TYPES = {'fix', 'pipeline', 'reweave'}

def classify_pr_operation(status, commit_type, branch, description=None):
    """Derive a Timeline operation from a PR row.

    Priority order for MERGED PRs (commit_type wins over branch prefix —
    extract/* branches with commit_type='enrich' or 'challenge' classify
    by commit_type, matching the contributor-role wiring fix):
        1. commit_type == 'challenge' OR branch.startswith('challenge/') OR
           description contains 'challenged_by' → 'challenge'
        2. commit_type == 'enrich' OR branch.startswith('enrich/' | 'reweave/')
           → 'enrich'
        3. commit_type in _MAINTENANCE_COMMIT_TYPES → 'infra'
        4. default (commit_type='knowledge'|'extract'|'research'|'entity' or
           anything else) → 'new'

    For non-merged PRs, falls back to NON_MERGED_STATUS_TO_OPERATION.
    """
    commit_type = (commit_type or '').lower()
    branch = branch or ''
    description_lower = (description or '').lower()
    if status != 'merged':
        return NON_MERGED_STATUS_TO_OPERATION.get(status, 'infra')
    # Challenge takes precedence — the signal is inherently more specific.
    if (commit_type == 'challenge'
            or branch.startswith('challenge/')
            or 'challenged_by' in description_lower):
        return 'challenge'
    if (commit_type == 'enrich'
            or branch.startswith('enrich/')
            or branch.startswith('reweave/')):
        return 'enrich'
    if commit_type in _MAINTENANCE_COMMIT_TYPES:
        return 'infra'
    # Default: legacy 'knowledge', new 'extract', 'research', 'entity',
    # unknown/null commit_type → treat as new knowledge.
    return 'new'
# Map audit_log stage to operation type
STAGE_TO_OPERATION = {
    'ingest': 'extract',
    'extract': 'extract',
    'validate': 'infra',
    'evaluate': 'infra',
    'merge': 'new',
    'reject': 'infra',
    'breaker': 'challenge',
}

def pr_description(row):
    """Generate human-readable description from a PR row."""
    status = row['status']
    domain = row['domain'] or 'unknown'
    branch = row['branch'] or ''
    # Extract a meaningful target from the branch name
    # Branch format is typically: agent-name/claims-description
    target = branch.split('/')[-1] if '/' in branch else branch
    # Infer agent from branch prefix if not in the row
    branch_agent = branch.split('/')[0] if '/' in branch else None
    # Build a richer description with domain context
    domain_tag = f" [{domain}]" if domain and domain != 'unknown' and domain != 'general' else ''
    templates = {
        'merged': f"Merged{domain_tag}: {target}",
        'approved': f"Approved{domain_tag}: {target}",
        'open': f"Opened{domain_tag}: {target}",
        'validating': f"Validating{domain_tag}: {target}",
        'reviewing': f"Reviewing{domain_tag}: {target}",
        'merging': f"Merging{domain_tag}: {target}",
        'closed': f"Closed{domain_tag}: {target}",
        'zombie': f"Stale{domain_tag}: {target}",
        'conflict': f"Conflict{domain_tag}: {target}",
    }
    return templates.get(status, f"PR #{row['number']}{domain_tag}: {target}")

def audit_description(row):
    """Generate human-readable description from an audit_log row."""
    stage = row['stage'] or ''
    event = row['event'] or ''
    detail = row['detail'] or ''
    # Try to parse detail as JSON
    if detail:
        try:
            detail_obj = json.loads(detail)
            if isinstance(detail_obj, dict):
                msg = detail_obj.get('message') or detail_obj.get('reason', '')
                if msg:
                    return f"[{stage}] {msg}"[:150]
        except (json.JSONDecodeError, TypeError):
            pass
    if event:
        desc = f"[{stage}] {event}"
        if detail and len(detail) < 80:
            desc += f": {detail}"
        return desc[:150]
    return f"[{stage}] pipeline event"

async def handle_activity(request):
    """Handler for GET /api/activity.

    Query params:
        limit (int, default 100, max 500): number of events to return
        cursor (ISO timestamp): return events older than this timestamp
        type (str, optional): comma-separated operation types to include
            (extract|new|enrich|challenge|infra). If absent, returns all types.

    Derives events from two sources:
        1. prs table — per-PR events with domain, agent, status
        2. audit_log — pipeline-level operational events

    Events are merged and sorted by timestamp descending (most recent first).
    """
    try:
        limit = min(int(request.query.get('limit', 100)), 500)
    except (ValueError, TypeError):
        limit = 100
    cursor = request.query.get('cursor')
    type_param = request.query.get('type', '').strip()
    allowed_ops = None
    if type_param:
        allowed_ops = {t.strip() for t in type_param.split(',') if t.strip()}
        if not allowed_ops:
            allowed_ops = None
    db_path = request.app['db_path']
    try:
        conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
        conn.row_factory = sqlite3.Row
        events = []
        # Source 1: PR events (primary — these have the granularity we need)
        # Each PR generates events at created_at and merged_at timestamps
        pr_query = """
            SELECT number, status, domain, agent, branch, source_path,
                   created_at, merged_at, source_channel, commit_type,
                   description
            FROM prs
            WHERE {where_clause}
            ORDER BY COALESCE(merged_at, created_at) DESC
            LIMIT ?
        """
        # Over-fetch when filtering by type so we have enough matching rows
        # after post-build filtering. Cap at 2000 to avoid runaway queries.
        fetch_limit = min(2000, limit * 5) if allowed_ops else limit + 1
        if cursor:
            rows = conn.execute(
                pr_query.format(where_clause="COALESCE(merged_at, created_at) < ?"),
                (cursor, fetch_limit)
            ).fetchall()
        else:
            rows = conn.execute(
                pr_query.format(where_clause="1=1"),
                (fetch_limit,)
            ).fetchall()
        # Known knowledge agents for branch-prefix inference
        knowledge_agents = {'rio', 'clay', 'theseus', 'vida', 'astra', 'leo'}
        for row in rows:
            row_dict = dict(row)
            operation = classify_pr_operation(
                row_dict['status'],
                row_dict.get('commit_type'),
                row_dict.get('branch'),
                row_dict.get('description'),
            )
            if allowed_ops and operation not in allowed_ops:
                continue
            description = pr_description(row_dict)
            # Use merged_at if available (more interesting event), else created_at
            timestamp = row_dict['merged_at'] or row_dict['created_at']
            # Infer agent from branch prefix if DB column is null
            # Branch format: agent-name/claims-description
            agent = row_dict['agent']
            if not agent and row_dict.get('branch'):
                prefix = row_dict['branch'].split('/')[0].lower()
                if prefix in knowledge_agents:
                    agent = prefix
            events.append({
                'timestamp': timestamp,
                'agent': agent,
                'operation': operation,
                'target': (row_dict['branch'] or '').split('/')[-1] if row_dict['branch'] else None,
                'domain': row_dict['domain'],
                'description': description,
                'status': row_dict['status'],
                'pr_number': row_dict['number'],
                'source_channel': row_dict.get('source_channel') or 'unknown',
            })
        # Source 2: Audit log events (secondary — pipeline-level)
        # Only include if we haven't hit our limit from PRs alone
        if len(events) < limit:
            remaining = limit - len(events) + 1
            audit_query = """
                SELECT timestamp, stage, event, detail
                FROM audit_log
                WHERE {where_clause}
                ORDER BY timestamp DESC
                LIMIT ?
            """
            if cursor:
                audit_rows = conn.execute(
                    audit_query.format(where_clause="timestamp < ?"),
                    (cursor, remaining)
                ).fetchall()
            else:
                audit_rows = conn.execute(
                    audit_query.format(where_clause="1=1"),
                    (remaining,)
                ).fetchall()
            for row in audit_rows:
                row_dict = dict(row)
                operation = STAGE_TO_OPERATION.get(row_dict['stage'], 'infra')
                if allowed_ops and operation not in allowed_ops:
                    continue
                description = audit_description(row_dict)
                events.append({
                    'timestamp': row_dict['timestamp'],
                    'agent': None,  # audit_log has no agent column
                    'operation': operation,
                    'target': None,
                    'domain': None,
                    'description': description,
                    'status': None,
                    'pr_number': None,
                    'source_channel': None,  # audit events not tied to a PR
                })
        conn.close()
    except sqlite3.Error as e:
        return web.json_response({'error': f'Database error: {e}'}, status=500)
    # Sort all events by timestamp descending
    events.sort(key=lambda e: e['timestamp'] or '', reverse=True)
    # Apply limit and check for more
    has_more = len(events) > limit
    events = events[:limit]
    # Cursor is the timestamp of the last event returned
    next_cursor = events[-1]['timestamp'] if events else None
    return web.json_response({
        'events': events,
        'limit': limit,
        'cursor': next_cursor,
        'has_more': has_more,
    })
# --- Integration snippet for app.py ---
# Add to your route setup:
#
# from activity_endpoint import handle_activity
# app.router.add_get('/api/activity', handle_activity)
#
# Requires: app['db_path'] set to the pipeline.db path
# e.g.: app['db_path'] = '/opt/teleo-eval/pipeline/pipeline.db'
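
# A hedged sketch of a cursor-pagination client for the contract above.
# fetch_page is injected (it should return one parsed response dict), so
# nothing here names a real host or transport; the function name is illustrative.
#
# def iter_activity(fetch_page, limit=100):
#     cursor = None
#     while True:
#         page = fetch_page(limit=limit, cursor=cursor)
#         yield from page['events']
#         if not page['has_more']:
#             break
#         # Next request asks for events older than the last one returned
#         cursor = page['cursor']

```python
# Hedged sketch: cursor-pagination client for the /api/activity contract.
# fetch_page is injected so this works against any transport (and in tests).
def iter_activity(fetch_page, limit=100):
    cursor = None
    while True:
        page = fetch_page(limit=limit, cursor=cursor)
        yield from page['events']
        if not page['has_more']:
            break
        # Next request asks for events older than the last one returned
        cursor = page['cursor']
```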