diff --git a/diagnostics/app.py b/diagnostics/app.py
index 96beb2e..f1e060b 100644
--- a/diagnostics/app.py
+++ b/diagnostics/app.py
@@ -38,8 +38,8 @@ CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim
 API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secrets/argus-api-key"))
 
 # Endpoints that skip auth (dashboard is public for now, can lock later)
-_PUBLIC_PATHS = frozenset({"/", "/api/metrics", "/api/snapshots", "/api/vital-signs",
-                           "/api/contributors", "/api/domains"})
+_PUBLIC_PATHS = frozenset({"/", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
+                           "/api/contributors", "/api/domains", "/api/audit"})
 
 
 def _get_db() -> sqlite3.Connection:
@@ -426,6 +426,40 @@ def _compute_vital_signs(conn) -> dict:
         "conversion_rate": round(merged_prs / total_prs, 3) if total_prs else 0,
     }
 
+    # Queue staleness — sources unprocessed for >7 days
+    stale_buckets = conn.execute("""
+        SELECT
+            CASE
+                WHEN created_at < datetime('now', '-30 days') THEN '30d+'
+                WHEN created_at < datetime('now', '-14 days') THEN '14-30d'
+                WHEN created_at < datetime('now', '-7 days') THEN '7-14d'
+                ELSE 'fresh'
+            END as age_bucket,
+            COUNT(*) as cnt
+        FROM sources
+        WHERE status = 'unprocessed'
+        GROUP BY age_bucket
+    """).fetchall()
+    stale_map = {r["age_bucket"]: r["cnt"] for r in stale_buckets}
+    stale_total = sum(v for k, v in stale_map.items() if k != "fresh")
+
+    oldest_unprocessed = conn.execute(
+        "SELECT MIN(created_at) as oldest FROM sources WHERE status='unprocessed'"
+    ).fetchone()
+    oldest_age_days = None
+    if oldest_unprocessed and oldest_unprocessed["oldest"]:
+        oldest_dt = datetime.fromisoformat(oldest_unprocessed["oldest"])
+        if oldest_dt.tzinfo is None:
+            oldest_dt = oldest_dt.replace(tzinfo=timezone.utc)
+        oldest_age_days = round((datetime.now(timezone.utc) - oldest_dt).total_seconds() / 86400, 1)
+
+    queue_staleness = {
+        "stale_count": stale_total,
+        "buckets": stale_map,
+        "oldest_age_days": oldest_age_days,
+        "status": "healthy" if stale_total == 0 else ("warning" if stale_total <= 10 else "critical"),
+    }
+
     return {
         "claim_index_status": claim_index_status,
         "review_throughput": {
@@ -453,6 +487,7 @@ def _compute_vital_signs(conn) -> dict:
             "status": "healthy" if not stagnant_domains else "warning",
         },
         "funnel": funnel,
+        "queue_staleness": queue_staleness,
     }
 
 
@@ -660,6 +695,80 @@ async def handle_api_search(request):
     return web.json_response(result)
 
 
+async def handle_api_audit(request):
+    """GET /api/audit — query response_audit table for agent response diagnostics.
+
+    Query params:
+        agent: filter by agent name (optional)
+        query: search in query text (optional)
+        limit: max results, default 50, max 200 (optional)
+        offset: pagination offset (optional)
+        days: how many days back, default 7 (optional)
+    """
+    conn = _conn(request)
+
+    # Check if response_audit table exists
+    table_check = conn.execute(
+        "SELECT name FROM sqlite_master WHERE type='table' AND name='response_audit'"
+    ).fetchone()
+    if not table_check:
+        return web.json_response({"error": "response_audit table not found"}, status=404)
+
+    agent = request.query.get("agent")
+    query_filter = request.query.get("query", "").strip()
+    limit = min(int(request.query.get("limit", "50")), 200)
+    offset = int(request.query.get("offset", "0"))
+    days = int(request.query.get("days", "7"))
+
+    where_clauses = ["timestamp > datetime('now', ?||' days')"]
+    params: list = [f"-{days}"]
+
+    if agent:
+        where_clauses.append("agent = ?")
+        params.append(agent)
+    if query_filter:
+        where_clauses.append("query LIKE ?")
+        params.append(f"%{query_filter}%")
+
+    where_sql = " AND ".join(where_clauses)
+
+    rows = conn.execute(
+        f"""SELECT id, timestamp, agent, chat_id, query, reformulated_query,
+                   claims_matched, confidence_score, response_length,
+                   retrieval_method, vector_scores, tool_calls,
+                   pass_used, total_candidates
+            FROM response_audit
+            WHERE {where_sql}
+            ORDER BY timestamp DESC
+            LIMIT ? OFFSET ?""",
+        params + [limit, offset],
+    ).fetchall()
+
+    total = conn.execute(
+        f"SELECT COUNT(*) as n FROM response_audit WHERE {where_sql}",
+        params,
+    ).fetchone()["n"]
+
+    results = []
+    for r in rows:
+        row_dict = dict(r)
+        # Parse JSON fields for the response
+        for json_field in ("claims_matched", "vector_scores", "tool_calls"):
+            if row_dict.get(json_field):
+                try:
+                    row_dict[json_field] = json.loads(row_dict[json_field])
+                except (json.JSONDecodeError, TypeError):
+                    pass
+        results.append(row_dict)
+
+    return web.json_response({"total": total, "results": results})
+
+
+async def handle_audit_page(request):
+    """GET /audit — HTML page for browsing response audit data."""
+    return web.Response(content_type="text/html", text=_render_audit_page())
+
+
 async def handle_api_usage(request):
     """POST /api/usage — log claim usage for analytics.
 
@@ -706,6 +815,191 @@ def _render_error(message: str) -> str:
 [context lines garbled in extraction: tail of the _render_error() HTML template, an error page titled "Argus" that renders {message} with the hint "Check if teleo-pipeline.service is running and pipeline.db exists."]
 """
 
 
+def _render_audit_page() -> str:
+    """Render the response audit browser page."""
+    return """[template markup garbled in extraction: a page titled "Argus — Response Audit" with a "Response Audit" heading, the tagline "Browse agent responses, retrieved claims, and search quality metrics", filter controls, a results area, and the page's inline styles and script]"""
+
+
 def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_principal, contributors_agent, domain_breakdown, now) -> str:
     """Render the full operational dashboard as HTML with Chart.js."""
@@ -1063,7 +1357,8 @@ def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_pri
         Snapshots ·
         Vital Signs ·
         Contributors ·
-        Domains
+        Domains ·
+        Response Audit
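
Reviewer note: the staleness bucketing in _compute_vital_signs can be exercised in isolation. Below is a minimal sketch against an in-memory SQLite database; the sources table here carries only the columns the query touches, and the seed rows are illustrative, not real pipeline data.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE sources (id INTEGER PRIMARY KEY, status TEXT, created_at TEXT)")
conn.executemany(
    "INSERT INTO sources (status, created_at) VALUES (?, datetime('now', ?))",
    [
        ("unprocessed", "-2 days"),   # fresh: under the 7-day threshold
        ("unprocessed", "-10 days"),  # lands in the 7-14d bucket
        ("unprocessed", "-45 days"),  # lands in the 30d+ bucket
        ("processed", "-90 days"),    # excluded: status != 'unprocessed'
    ],
)

# Same CASE bucketing as the diff's vital-signs query
rows = conn.execute("""
    SELECT
        CASE
            WHEN created_at < datetime('now', '-30 days') THEN '30d+'
            WHEN created_at < datetime('now', '-14 days') THEN '14-30d'
            WHEN created_at < datetime('now', '-7 days') THEN '7-14d'
            ELSE 'fresh'
        END as age_bucket,
        COUNT(*) as cnt
    FROM sources
    WHERE status = 'unprocessed'
    GROUP BY age_bucket
""").fetchall()

stale_map = {r["age_bucket"]: r["cnt"] for r in rows}
stale_total = sum(v for k, v in stale_map.items() if k != "fresh")
status = "healthy" if stale_total == 0 else ("warning" if stale_total <= 10 else "critical")
print(stale_map, stale_total, status)  # e.g. {'30d+': 1, '7-14d': 1, 'fresh': 1} 2 warning

Two stale sources put the status at "warning"; it only flips to "critical" past ten.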
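
Route registration for the two new handlers isn't visible in these hunks. Assuming app.py wires routes the usual aiohttp way, the addition would look roughly like this (hypothetical; the actual registration site and style aren't shown in this diff):

app.router.add_get("/audit", handle_audit_page)
app.router.add_get("/api/audit", handle_api_audit)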
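
Finally, a minimal client sketch for smoke-testing /api/audit once the service is up. The base URL and the agent name are assumptions (the dashboard's bind address is not part of this diff); no API key is sent because /api/audit is in _PUBLIC_PATHS.

import json
import urllib.parse
import urllib.request

BASE = "http://localhost:8090"  # assumed dashboard address; adjust per deployment
query = urllib.parse.urlencode({"agent": "researcher", "days": 3, "limit": 20})  # hypothetical agent name
with urllib.request.urlopen(f"{BASE}/api/audit?{query}") as resp:
    payload = json.load(resp)

print(f"{payload['total']} rows in the last 3 days")
for row in payload["results"]:
    # claims_matched / vector_scores / tool_calls arrive JSON-decoded where parseable
    print(row["timestamp"], row["agent"], row["confidence_score"])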
+ + + +""" + + def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_principal, contributors_agent, domain_breakdown, now) -> str: """Render the full operational dashboard as HTML with Chart.js.""" @@ -1063,7 +1357,8 @@ def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_pri Snapshots · Vital Signs · Contributors · - Domains + Domains · + Response Audit