Three hackathon-critical fixes for Timeline page rendering (Accelerate Solana, May 5):
Gap 1 — /api/activity respects ?type= now:
- accepts single or comma-separated operation types
(extract|new|enrich|challenge|infra)
- over-fetches 5× limit (capped 2000) so post-build filtering still
fills the requested page size
- unknown types filter out cleanly
Gap 2 — classify_pr_operation() replaces STATUS_TO_OPERATION for merged PRs:
- commit_type wins over branch prefix for merged PRs so extract/* branches
with commit_type='enrich' or 'challenge' surface correctly (same gotcha
as the contributor-role wiring fix)
- priority: challenge → enrich (incl. reweave/) → maintenance (infra) → new
- challenged_by detection carried over from activity_feed_api._classify_event
- non-merged statuses unchanged (extract/new/infra/challenge as before)
- SQL now selects commit_type + description alongside existing columns
- 14 unit tests covering the gotcha matrix
Gap 3 — _CHANNEL_MAP reshape:
- extract/, ingestion/ default → 'unknown' (was 'telegram'; telegram-origin
classification now requires explicit tagging at ingestion time)
- agent/maintenance mappings unchanged
- github_pr override and gh-pr-* branches continue to return 'github'
- 'web' registered as the canonical in-app submission channel (matches
the platform-named pattern established by telegram/github/agent)
- module docstring enumerates all six valid channels
Deployed to VPS; diagnostics + pipeline restarted clean.
Smoke: type=enrich returns 22 events (was 0), type=challenge returns 0
(matches DB — zero challenge commit_types).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
329 lines
12 KiB
Python
"""
|
|
/api/activity endpoint for diagnostics service.
|
|
|
|
Serves per-operation events for the dashboard v2 timeline hero panel.
|
|
Derives events from the prs table (per-PR granularity) and audit_log
|
|
(pipeline-level ops). Cursor-based pagination via timestamp.
|
|
|
|
Integration: add route and handler to app.py:
|
|
app.router.add_get('/api/activity', handle_activity)
|
|
|
|
Contract (endpoint #7):
|
|
GET /api/activity?limit=100&cursor=<ISO-timestamp>
|
|
Response: {
|
|
events: [{timestamp, agent, operation, target, domain, description, status, pr_number}],
|
|
limit: int,
|
|
cursor: string|null,
|
|
has_more: bool
|
|
}
|
|
|
|
Data sources:
|
|
- prs table: number, status, domain, agent, created_at, merged_at, branch, source_path
|
|
- audit_log table: timestamp, stage, event, detail
|
|
- contributors table: handle, display_name (for agent name resolution)
|
|
"""
|
|
|
|
import json
import sqlite3

from aiohttp import web

# Non-merged statuses map directly to operation — no semantic classification yet.
|
|
NON_MERGED_STATUS_TO_OPERATION = {
|
|
'approved': 'new', # about to become knowledge
|
|
'open': 'extract', # cyan — new extraction in progress
|
|
'validating': 'extract', # cyan — being validated
|
|
'reviewing': 'extract', # cyan — under review
|
|
'merging': 'new', # green — merge in progress
|
|
'closed': 'infra', # grey — closed/rejected
|
|
'zombie': 'infra', # grey — stale
|
|
'conflict': 'challenge', # red-orange — conflict detected
|
|
}
|
|
|
|
# Maintenance commit_types that land on main but don't represent new knowledge.
|
|
_MAINTENANCE_COMMIT_TYPES = {'fix', 'pipeline', 'reweave'}
|
|
|
|
|
|
def classify_pr_operation(status, commit_type, branch, description=None):
|
|
"""Derive a Timeline operation from a PR row.
|
|
|
|
Priority order for MERGED PRs (commit_type wins over branch prefix —
|
|
extract/* branches with commit_type='enrich' or 'challenge' classify
|
|
by commit_type, matching the contributor-role wiring fix):
|
|
1. commit_type == 'challenge' OR branch.startswith('challenge/') OR
|
|
description contains 'challenged_by' → 'challenge'
|
|
2. commit_type == 'enrich' OR branch.startswith('enrich/' | 'reweave/')
|
|
→ 'enrich'
|
|
3. commit_type in _MAINTENANCE_COMMIT_TYPES → 'infra'
|
|
4. default (commit_type='knowledge'|'extract'|'research'|'entity' or
|
|
anything else) → 'new'
|
|
|
|
For non-merged PRs, falls back to NON_MERGED_STATUS_TO_OPERATION.
|
|
"""
|
|
commit_type = (commit_type or '').lower()
|
|
branch = branch or ''
|
|
description_lower = (description or '').lower()
|
|
|
|
if status != 'merged':
|
|
return NON_MERGED_STATUS_TO_OPERATION.get(status, 'infra')
|
|
|
|
# Challenge takes precedence — the signal is inherently more specific.
|
|
if (commit_type == 'challenge'
|
|
or branch.startswith('challenge/')
|
|
or 'challenged_by' in description_lower):
|
|
return 'challenge'
|
|
|
|
if (commit_type == 'enrich'
|
|
or branch.startswith('enrich/')
|
|
or branch.startswith('reweave/')):
|
|
return 'enrich'
|
|
|
|
if commit_type in _MAINTENANCE_COMMIT_TYPES:
|
|
return 'infra'
|
|
|
|
# Default: legacy 'knowledge', new 'extract', 'research', 'entity',
|
|
# unknown/null commit_type → treat as new knowledge.
|
|
return 'new'
|
|
|
|
# Map audit_log stage to operation type
|
|
STAGE_TO_OPERATION = {
|
|
'ingest': 'extract',
|
|
'extract': 'extract',
|
|
'validate': 'infra',
|
|
'evaluate': 'infra',
|
|
'merge': 'new',
|
|
'reject': 'infra',
|
|
'breaker': 'challenge',
|
|
}
|
|
|
|
|
|
def pr_description(row):
|
|
"""Generate human-readable description from a PR row."""
|
|
status = row['status']
|
|
domain = row['domain'] or 'unknown'
|
|
branch = row['branch'] or ''
|
|
|
|
# Extract a meaningful target from the branch name
|
|
# Branch format is typically: agent-name/claims-description
|
|
target = branch.split('/')[-1] if '/' in branch else branch
|
|
|
|
# Infer agent from branch prefix if not in the row
|
|
branch_agent = branch.split('/')[0] if '/' in branch else None
|
|
|
|
# Build a richer description with domain context
|
|
domain_tag = f" [{domain}]" if domain and domain != 'unknown' and domain != 'general' else ''
|
|
|
|
templates = {
|
|
'merged': f"Merged{domain_tag}: {target}",
|
|
'approved': f"Approved{domain_tag}: {target}",
|
|
'open': f"Opened{domain_tag}: {target}",
|
|
'validating': f"Validating{domain_tag}: {target}",
|
|
'reviewing': f"Reviewing{domain_tag}: {target}",
|
|
'merging': f"Merging{domain_tag}: {target}",
|
|
'closed': f"Closed{domain_tag}: {target}",
|
|
'zombie': f"Stale{domain_tag}: {target}",
|
|
'conflict': f"Conflict{domain_tag}: {target}",
|
|
}
|
|
|
|
return templates.get(status, f"PR #{row['number']}{domain_tag}: {target}")
|
|
|
|
|
|
def audit_description(row):
|
|
"""Generate human-readable description from an audit_log row."""
|
|
stage = row['stage'] or ''
|
|
event = row['event'] or ''
|
|
detail = row['detail'] or ''
|
|
|
|
# Try to parse detail as JSON
|
|
if detail:
|
|
try:
|
|
detail_obj = json.loads(detail)
|
|
if isinstance(detail_obj, dict):
|
|
msg = detail_obj.get('message') or detail_obj.get('reason', '')
|
|
if msg:
|
|
return f"[{stage}] {msg}"[:150]
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
|
|
if event:
|
|
desc = f"[{stage}] {event}"
|
|
if detail and len(detail) < 80:
|
|
desc += f" — {detail}"
|
|
return desc[:150]
|
|
|
|
return f"[{stage}] pipeline event"
|
|
|
|
|
|
async def handle_activity(request):
|
|
"""Handler for GET /api/activity.
|
|
|
|
Query params:
|
|
limit (int, default 100, max 500): number of events to return
|
|
cursor (ISO timestamp): return events older than this timestamp
|
|
type (str, optional): comma-separated operation types to include
|
|
(extract|new|enrich|challenge|infra). If absent, returns all types.
|
|
|
|
Derives events from two sources:
|
|
1. prs table — per-PR events with domain, agent, status
|
|
2. audit_log — pipeline-level operational events
|
|
|
|
Events are merged and sorted by timestamp descending (most recent first).
|
|
"""
|
|
try:
|
|
limit = min(int(request.query.get('limit', 100)), 500)
|
|
except (ValueError, TypeError):
|
|
limit = 100
|
|
|
|
cursor = request.query.get('cursor')
|
|
type_param = request.query.get('type', '').strip()
|
|
allowed_ops = None
|
|
if type_param:
|
|
allowed_ops = {t.strip() for t in type_param.split(',') if t.strip()}
|
|
if not allowed_ops:
|
|
allowed_ops = None
|
|
|
|
db_path = request.app['db_path']
|
|
|
|
try:
|
|
conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
events = []
|
|
|
|
# Source 1: PR events (primary — these have the granularity we need)
|
|
# Each PR generates events at created_at and merged_at timestamps
|
|
pr_query = """
|
|
SELECT number, status, domain, agent, branch, source_path,
|
|
created_at, merged_at, source_channel, commit_type,
|
|
description
|
|
FROM prs
|
|
WHERE {where_clause}
|
|
ORDER BY COALESCE(merged_at, created_at) DESC
|
|
LIMIT ?
|
|
"""
|
|
|
|
# Over-fetch when filtering by type so we have enough matching rows after
|
|
# post-build filtering. Cap at 2000 to avoid runaway queries.
|
|
fetch_limit = min(2000, limit * 5) if allowed_ops else limit + 1
|
|
|
|
if cursor:
|
|
rows = conn.execute(
|
|
pr_query.format(where_clause="COALESCE(merged_at, created_at) < ?"),
|
|
(cursor, fetch_limit)
|
|
).fetchall()
|
|
else:
|
|
rows = conn.execute(
|
|
pr_query.format(where_clause="1=1"),
|
|
(fetch_limit,)
|
|
).fetchall()
|
|
|
|
# Known knowledge agents for branch-prefix inference
|
|
knowledge_agents = {'rio', 'clay', 'theseus', 'vida', 'astra', 'leo'}
|
|
|
|
for row in rows:
|
|
row_dict = dict(row)
|
|
operation = classify_pr_operation(
|
|
row_dict['status'],
|
|
row_dict.get('commit_type'),
|
|
row_dict.get('branch'),
|
|
row_dict.get('description'),
|
|
)
|
|
if allowed_ops and operation not in allowed_ops:
|
|
continue
|
|
description = pr_description(row_dict)
|
|
|
|
# Use merged_at if available (more interesting event), else created_at
|
|
timestamp = row_dict['merged_at'] or row_dict['created_at']
|
|
|
|
# Infer agent from branch prefix if DB column is null
|
|
# Branch format: agent-name/claims-description
|
|
agent = row_dict['agent']
|
|
if not agent and row_dict.get('branch'):
|
|
prefix = row_dict['branch'].split('/')[0].lower()
|
|
if prefix in knowledge_agents:
|
|
agent = prefix
|
|
|
|
events.append({
|
|
'timestamp': timestamp,
|
|
'agent': agent,
|
|
'operation': operation,
|
|
'target': (row_dict['branch'] or '').split('/')[-1] if row_dict['branch'] else None,
|
|
'domain': row_dict['domain'],
|
|
'description': description,
|
|
'status': row_dict['status'],
|
|
'pr_number': row_dict['number'],
|
|
'source_channel': row_dict.get('source_channel') or 'unknown',
|
|
})
|
|
|
|
# Source 2: Audit log events (secondary — pipeline-level)
|
|
# Only include if we haven't hit our limit from PRs alone
|
|
if len(events) < limit:
|
|
remaining = limit - len(events) + 1
|
|
audit_query = """
|
|
SELECT timestamp, stage, event, detail
|
|
FROM audit_log
|
|
WHERE {where_clause}
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
"""
|
|
|
|
if cursor:
|
|
audit_rows = conn.execute(
|
|
audit_query.format(where_clause="timestamp < ?"),
|
|
(cursor, remaining)
|
|
).fetchall()
|
|
else:
|
|
audit_rows = conn.execute(
|
|
audit_query.format(where_clause="1=1"),
|
|
(remaining,)
|
|
).fetchall()
|
|
|
|
for row in audit_rows:
|
|
row_dict = dict(row)
|
|
operation = STAGE_TO_OPERATION.get(row_dict['stage'], 'infra')
|
|
if allowed_ops and operation not in allowed_ops:
|
|
continue
|
|
description = audit_description(row_dict)
|
|
|
|
events.append({
|
|
'timestamp': row_dict['timestamp'],
|
|
'agent': None, # audit_log has no agent column
|
|
'operation': operation,
|
|
'target': None,
|
|
'domain': None,
|
|
'description': description,
|
|
'status': None,
|
|
'pr_number': None,
|
|
'source_channel': None, # audit events not tied to a PR
|
|
})
|
|
|
|
conn.close()
|
|
except sqlite3.Error as e:
|
|
return web.json_response({'error': f'Database error: {e}'}, status=500)
|
|
|
|
# Sort all events by timestamp descending
|
|
events.sort(key=lambda e: e['timestamp'] or '', reverse=True)
|
|
|
|
# Apply limit and check for more
|
|
has_more = len(events) > limit
|
|
events = events[:limit]
|
|
|
|
# Cursor is the timestamp of the last event returned
|
|
next_cursor = events[-1]['timestamp'] if events else None
|
|
|
|
return web.json_response({
|
|
'events': events,
|
|
'limit': limit,
|
|
'cursor': next_cursor,
|
|
'has_more': has_more,
|
|
})
|
|
|
|
|
|
# --- Integration snippet for app.py ---
# Add to your route setup:
#
#     from activity_endpoint import handle_activity
#     app.router.add_get('/api/activity', handle_activity)
#
# Requires: app['db_path'] set to the pipeline.db path
# e.g.: app['db_path'] = '/opt/teleo-eval/pipeline/pipeline.db'