Some checks failed
CI / lint-and-test (push) Has been cancelled
Sources merged: - teleo-codex/ops/pipeline-v2/ (11 newer lib files, 5 new lib modules) - teleo-codex/ops/ (agent-state, diagnostics expansion, systemd units, ops scripts) - VPS /opt/teleo-eval/telegram/ (10 new bot files, agent configs) - VPS /opt/teleo-eval/pipeline/ops/ (vector-gc, backfill-descriptions) - VPS /opt/teleo-eval/sync-mirror.sh (Bug 2 + Step 2.5 fixes) Non-trivial merges: - connect.py: kept codex threshold (0.65) + added infra domain parameter - watchdog.py: kept infra version (stale_pr integration, superset of codex) - deploy.sh: codex rsync version (interim, until VPS git clone migration) - diagnostics/app.py: codex decomposed dashboard (14 new route modules) 81 files changed, +17105/-200 lines Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
262 lines
9 KiB
Python
262 lines
9 KiB
Python
"""
|
|
/api/activity endpoint for diagnostics service.
|
|
|
|
Serves per-operation events for the dashboard v2 timeline hero panel.
|
|
Derives events from the prs table (per-PR granularity) and audit_log
|
|
(pipeline-level ops). Cursor-based pagination via timestamp.
|
|
|
|
Integration: add route and handler to app.py:
|
|
app.router.add_get('/api/activity', handle_activity)
|
|
|
|
Contract (endpoint #7):
|
|
GET /api/activity?limit=100&cursor=<ISO-timestamp>
|
|
Response: {
|
|
events: [{timestamp, agent, operation, target, domain, description, status, pr_number}],
|
|
limit: int,
|
|
cursor: string|null,
|
|
has_more: bool
|
|
}
|
|
|
|
Data sources:
|
|
- prs table: number, status, domain, agent, created_at, merged_at, branch, source_path
|
|
- audit_log table: timestamp, stage, event, detail
|
|
- contributors table: handle, display_name (for agent name resolution)
|
|
"""
|
|
|
|
from aiohttp import web
|
|
import sqlite3
|
|
import json
|
|
|
|
|
|
# Map PR status to Clay's operation color palette
# extract (cyan), new (green), enrich (amber), challenge (red-orange),
# decision (violet), infra (grey)
# Statuses absent from this map fall back to 'infra' via .get() at the
# call site in handle_activity.
STATUS_TO_OPERATION = {
    'merged': 'new',          # green — new knowledge merged
    'approved': 'enrich',     # amber — approved, enriching KB
    'open': 'extract',        # cyan — new extraction in progress
    'validating': 'extract',  # cyan — being validated
    'reviewing': 'extract',   # cyan — under review
    'merging': 'new',         # green — merge in progress
    'closed': 'infra',        # grey — closed/rejected
    'zombie': 'infra',        # grey — stale
    'conflict': 'challenge',  # red-orange — conflict detected
}
|
|
|
|
# Map audit_log stage to operation type.
# Stages absent from this map fall back to 'infra' via .get() at the
# call site in handle_activity.
STAGE_TO_OPERATION = {
    'ingest': 'extract',
    'extract': 'extract',
    'validate': 'infra',
    'evaluate': 'infra',
    'merge': 'new',
    'reject': 'infra',
    'breaker': 'challenge',
}
|
|
|
|
|
|
def pr_description(row):
    """Generate a human-readable description from a PR row.

    Args:
        row: mapping with at least 'status', 'domain', 'branch', 'number'
             (dict or sqlite3.Row converted to dict).

    Returns:
        A one-line string like "Merged [physics]: claims-foo". Unknown
        statuses fall back to "PR #<number>".
    """
    status = row['status']
    domain = row['domain'] or 'unknown'
    branch = row['branch'] or ''

    # Extract a meaningful target from the branch name.
    # Branch format is typically: agent-name/claims-description
    target = branch.split('/')[-1] if '/' in branch else branch

    # Tag with domain context unless it adds no information.
    domain_tag = f" [{domain}]" if domain not in ('unknown', 'general') else ''

    # Verb per status; only the matching description is formatted
    # (the previous version built all nine f-strings on every call and
    # computed an unused branch_agent local).
    verbs = {
        'merged': 'Merged',
        'approved': 'Approved',
        'open': 'Opened',
        'validating': 'Validating',
        'reviewing': 'Reviewing',
        'merging': 'Merging',
        'closed': 'Closed',
        'zombie': 'Stale',
        'conflict': 'Conflict',
    }
    verb = verbs.get(status)
    if verb is None:
        return f"PR #{row['number']}{domain_tag}: {target}"
    return f"{verb}{domain_tag}: {target}"
|
|
|
|
|
|
def audit_description(row):
    """Generate a human-readable description from an audit_log row.

    Prefers a 'message' or 'reason' field from JSON-encoded detail;
    otherwise falls back to "[stage] event — detail" (detail appended
    only when short), then to a generic pipeline-event line. Output is
    capped at 150 characters.
    """
    stage = row['stage'] or ''
    event = row['event'] or ''
    detail = row['detail'] or ''

    # If detail is a JSON object carrying a message/reason, surface that.
    if detail:
        try:
            parsed = json.loads(detail)
        except (json.JSONDecodeError, TypeError):
            parsed = None
        if isinstance(parsed, dict):
            msg = parsed.get('message') or parsed.get('reason', '')
            if msg:
                return f"[{stage}] {msg}"[:150]

    if not event:
        return f"[{stage}] pipeline event"

    # Append raw detail only when it is short enough to stay readable.
    suffix = f" — {detail}" if detail and len(detail) < 80 else ''
    return f"[{stage}] {event}{suffix}"[:150]
|
|
|
|
|
|
# Known knowledge agents for branch-prefix agent inference
# (branch format: agent-name/claims-description).
_KNOWLEDGE_AGENTS = {'rio', 'clay', 'theseus', 'vida', 'astra', 'leo'}


def _pr_events(conn, cursor, limit):
    """Return up to limit+1 events derived from the prs table, newest first.

    The extra row lets the caller detect has_more without a COUNT query.
    """
    query = """
        SELECT number, status, domain, agent, branch, source_path,
               created_at, merged_at
        FROM prs
        WHERE {where_clause}
        ORDER BY COALESCE(merged_at, created_at) DESC
        LIMIT ?
    """
    if cursor:
        rows = conn.execute(
            query.format(where_clause="COALESCE(merged_at, created_at) < ?"),
            (cursor, limit + 1),
        ).fetchall()
    else:
        rows = conn.execute(
            query.format(where_clause="1=1"),
            (limit + 1,),
        ).fetchall()

    events = []
    for row in rows:
        row_dict = dict(row)
        branch = row_dict.get('branch')

        # Use merged_at if available (the more interesting event), else created_at.
        timestamp = row_dict['merged_at'] or row_dict['created_at']

        # Infer agent from the branch prefix if the DB column is null.
        agent = row_dict['agent']
        if not agent and branch:
            prefix = branch.split('/')[0].lower()
            if prefix in _KNOWLEDGE_AGENTS:
                agent = prefix

        events.append({
            'timestamp': timestamp,
            'agent': agent,
            'operation': STATUS_TO_OPERATION.get(row_dict['status'], 'infra'),
            'target': branch.split('/')[-1] if branch else None,
            'domain': row_dict['domain'],
            'description': pr_description(row_dict),
            'status': row_dict['status'],
            'pr_number': row_dict['number'],
        })
    return events


def _audit_events(conn, cursor, remaining):
    """Return up to `remaining` pipeline-level events from audit_log, newest first."""
    query = """
        SELECT timestamp, stage, event, detail
        FROM audit_log
        WHERE {where_clause}
        ORDER BY timestamp DESC
        LIMIT ?
    """
    if cursor:
        rows = conn.execute(
            query.format(where_clause="timestamp < ?"),
            (cursor, remaining),
        ).fetchall()
    else:
        rows = conn.execute(
            query.format(where_clause="1=1"),
            (remaining,),
        ).fetchall()

    events = []
    for row in rows:
        row_dict = dict(row)
        events.append({
            'timestamp': row_dict['timestamp'],
            'agent': None,  # audit_log has no agent column
            'operation': STAGE_TO_OPERATION.get(row_dict['stage'], 'infra'),
            'target': None,
            'domain': None,
            'description': audit_description(row_dict),
            'status': None,
            'pr_number': None,
        })
    return events


async def handle_activity(request):
    """Handler for GET /api/activity.

    Query params:
        limit (int, default 100, clamped to 1..500): number of events to return
        cursor (ISO timestamp): return events strictly older than this timestamp

    Derives events from two sources:
      1. prs table — per-PR events with domain, agent, status
      2. audit_log — pipeline-level operational events (only queried when
         the PR rows alone do not fill the limit)

    Events are merged and sorted by timestamp descending (most recent first).

    Returns:
        200 with {events, limit, cursor, has_more}; 500 on sqlite3.Error.
    """
    try:
        # Clamp to [1, 500]: a negative value passed to SQLite's LIMIT
        # means "no limit" and would defeat the 500-row cap.
        limit = max(1, min(int(request.query.get('limit', 100)), 500))
    except (ValueError, TypeError):
        limit = 100

    cursor = request.query.get('cursor')
    db_path = request.app['db_path']

    try:
        conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
    except sqlite3.Error as e:
        return web.json_response({'error': f'Database error: {e}'}, status=500)

    events = []
    try:
        conn.row_factory = sqlite3.Row

        # Source 1: PR events (primary — these have the granularity we need).
        events.extend(_pr_events(conn, cursor, limit))

        # Source 2: audit log (secondary — pipeline-level), only if the
        # PR rows alone did not fill the page.
        if len(events) < limit:
            events.extend(_audit_events(conn, cursor, limit - len(events) + 1))
    except sqlite3.Error as e:
        return web.json_response({'error': f'Database error: {e}'}, status=500)
    finally:
        # Always release the read-only handle, even when a query fails
        # (the previous version leaked the connection on the error path).
        conn.close()

    # Sort all events by timestamp descending.
    events.sort(key=lambda e: e['timestamp'] or '', reverse=True)

    # Apply limit and check for more.
    has_more = len(events) > limit
    events = events[:limit]

    # Cursor is the timestamp of the last event returned.
    next_cursor = events[-1]['timestamp'] if events else None

    return web.json_response({
        'events': events,
        'limit': limit,
        'cursor': next_cursor,
        'has_more': has_more,
    })
|
|
|
|
|
|
# --- Integration snippet for app.py ---
# Add to your route setup:
#
# from activity_endpoint import handle_activity
# app.router.add_get('/api/activity', handle_activity)
#
# Requires: app['db_path'] set to the pipeline.db path
# e.g.: app['db_path'] = '/opt/teleo-eval/pipeline/pipeline.db'