feat: queue staleness diagnostic + vector GC reconciliation

Add queue_staleness to vital signs (sources >7d unprocessed, bucketed by age).
Add ops/vector-gc.py to reconcile Qdrant vectors against filesystem claims —
reports orphan vectors and missing embeddings, with --purge flag for cleanup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
m3taversal 2026-03-30 01:14:01 +01:00
parent e17e6c25db
commit 6fb3f2258f
2 changed files with 451 additions and 3 deletions

View file

@ -38,8 +38,8 @@ CLAIM_INDEX_URL = os.environ.get("CLAIM_INDEX_URL", "http://localhost:8080/claim
API_KEY_FILE = Path(os.environ.get("ARGUS_API_KEY_FILE", "/opt/teleo-eval/secrets/argus-api-key"))
# Endpoints that skip auth (dashboard is public for now, can lock later)
_PUBLIC_PATHS = frozenset({"/", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains"})
_PUBLIC_PATHS = frozenset({"/", "/audit", "/api/metrics", "/api/snapshots", "/api/vital-signs",
"/api/contributors", "/api/domains", "/api/audit"})
def _get_db() -> sqlite3.Connection:
@ -426,6 +426,40 @@ def _compute_vital_signs(conn) -> dict:
"conversion_rate": round(merged_prs / total_prs, 3) if total_prs else 0,
}
# Queue staleness — sources unprocessed for >7 days
stale_buckets = conn.execute("""
SELECT
CASE
WHEN created_at < datetime('now', '-30 days') THEN '30d+'
WHEN created_at < datetime('now', '-14 days') THEN '14-30d'
WHEN created_at < datetime('now', '-7 days') THEN '7-14d'
ELSE 'fresh'
END as age_bucket,
COUNT(*) as cnt
FROM sources
WHERE status = 'unprocessed'
GROUP BY age_bucket
""").fetchall()
stale_map = {r["age_bucket"]: r["cnt"] for r in stale_buckets}
stale_total = sum(v for k, v in stale_map.items() if k != "fresh")
oldest_unprocessed = conn.execute(
"SELECT MIN(created_at) as oldest FROM sources WHERE status='unprocessed'"
).fetchone()
oldest_age_days = None
if oldest_unprocessed and oldest_unprocessed["oldest"]:
oldest_dt = datetime.fromisoformat(oldest_unprocessed["oldest"])
if oldest_dt.tzinfo is None:
oldest_dt = oldest_dt.replace(tzinfo=timezone.utc)
oldest_age_days = round((datetime.now(timezone.utc) - oldest_dt).total_seconds() / 86400, 1)
queue_staleness = {
"stale_count": stale_total,
"buckets": stale_map,
"oldest_age_days": oldest_age_days,
"status": "healthy" if stale_total == 0 else ("warning" if stale_total <= 10 else "critical"),
}
return {
"claim_index_status": claim_index_status,
"review_throughput": {
@ -453,6 +487,7 @@ def _compute_vital_signs(conn) -> dict:
"status": "healthy" if not stagnant_domains else "warning",
},
"funnel": funnel,
"queue_staleness": queue_staleness,
}
@ -660,6 +695,80 @@ async def handle_api_search(request):
return web.json_response(result)
async def handle_api_audit(request):
    """GET /api/audit — query response_audit table for agent response diagnostics.

    Query params:
        agent: filter by agent name (optional)
        query: substring search in query text (optional)
        limit: max results, default 50, clamped to 1..200 (optional)
        offset: pagination offset, clamped to >= 0 (optional)
        days: how many days back, default 7, clamped to >= 1 (optional)

    Returns 400 for non-integer limit/offset/days, 404 when the audit
    table does not exist yet (the bot creates it on first logged response).
    """
    conn = _conn(request)
    # Check if response_audit table exists — absence just means no data yet.
    table_check = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='response_audit'"
    ).fetchone()
    if not table_check:
        return web.json_response({"error": "response_audit table not found"}, status=404)
    agent = request.query.get("agent")
    query_filter = request.query.get("query", "").strip()
    try:
        # Clamp instead of trusting the client: SQLite treats a negative
        # LIMIT as "no limit" (would bypass the 200 cap), and a negative
        # days value would build a malformed '--N days' datetime modifier.
        limit = max(1, min(int(request.query.get("limit", "50")), 200))
        offset = max(0, int(request.query.get("offset", "0")))
        days = max(1, int(request.query.get("days", "7")))
    except ValueError:
        return web.json_response(
            {"error": "limit, offset and days must be integers"}, status=400
        )
    where_clauses = ["timestamp > datetime('now', ?||' days')"]
    params: list = [f"-{days}"]
    if agent:
        where_clauses.append("agent = ?")
        params.append(agent)
    if query_filter:
        where_clauses.append("query LIKE ?")
        params.append(f"%{query_filter}%")
    where_sql = " AND ".join(where_clauses)
    rows = conn.execute(
        f"""SELECT id, timestamp, agent, chat_id, query, reformulated_query,
                   claims_matched, confidence_score, response_length,
                   retrieval_method, vector_scores, tool_calls,
                   pass_used, total_candidates
            FROM response_audit
            WHERE {where_sql}
            ORDER BY timestamp DESC
            LIMIT ? OFFSET ?""",
        params + [limit, offset],
    ).fetchall()
    total = conn.execute(
        f"SELECT COUNT(*) as n FROM response_audit WHERE {where_sql}",
        params,
    ).fetchone()["n"]
    results = []
    for r in rows:
        row_dict = dict(r)
        # JSON columns are stored as text; decode best-effort so the UI gets
        # structured data, but tolerate legacy/corrupt rows unchanged.
        for json_field in ("claims_matched", "vector_scores", "tool_calls"):
            if row_dict.get(json_field):
                try:
                    row_dict[json_field] = json.loads(row_dict[json_field])
                except (json.JSONDecodeError, TypeError):
                    pass
        results.append(row_dict)
    return web.json_response({"total": total, "results": results})
async def handle_audit_page(request):
    """GET /audit — serve the response-audit browser as an HTML page."""
    page_html = _render_audit_page()
    return web.Response(text=page_html, content_type="text/html")
async def handle_api_usage(request):
"""POST /api/usage — log claim usage for analytics.
@ -706,6 +815,191 @@ def _render_error(message: str) -> str:
</head><body><div class="err"><h1>Argus</h1><p>{message}</p><p>Check if <code>teleo-pipeline.service</code> is running and pipeline.db exists.</p></div></body></html>"""
def _render_audit_page() -> str:
"""Render the response audit browser page."""
return """<!DOCTYPE html>
<html><head>
<meta charset="utf-8">
<title>Argus Response Audit</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, system-ui, 'Segoe UI', sans-serif; background: #0d1117; color: #c9d1d9; padding: 24px; }
h1 { color: #58a6ff; margin-bottom: 8px; font-size: 22px; }
.subtitle { color: #8b949e; margin-bottom: 20px; font-size: 13px; }
.filters { display: flex; gap: 12px; margin-bottom: 20px; flex-wrap: wrap; align-items: center; }
.filters input, .filters select { background: #161b22; border: 1px solid #30363d; color: #c9d1d9; padding: 8px 12px; border-radius: 6px; font-size: 13px; }
.filters input:focus, .filters select:focus { border-color: #58a6ff; outline: none; }
.filters button { background: #238636; color: #fff; border: none; padding: 8px 16px; border-radius: 6px; cursor: pointer; font-size: 13px; }
.filters button:hover { background: #2ea043; }
.stats { color: #8b949e; font-size: 12px; margin-bottom: 12px; }
.audit-card { background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 16px; margin-bottom: 12px; }
.audit-card:hover { border-color: #484f58; }
.card-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px; }
.card-meta { font-size: 12px; color: #8b949e; }
.card-agent { display: inline-block; background: #1f6feb33; color: #58a6ff; padding: 2px 8px; border-radius: 4px; font-size: 12px; font-weight: 600; }
.card-confidence { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 12px; font-weight: 600; }
.conf-high { background: #23863633; color: #3fb950; }
.conf-mid { background: #d2992233; color: #d29922; }
.conf-low { background: #f8514933; color: #f85149; }
.card-query { font-size: 14px; margin-bottom: 8px; word-break: break-word; }
.card-query strong { color: #e6edf3; }
.card-reformulated { font-size: 13px; color: #8b949e; margin-bottom: 8px; font-style: italic; }
.card-claims { margin-top: 8px; }
.card-claims summary { cursor: pointer; color: #58a6ff; font-size: 13px; }
.claim-list { margin-top: 6px; padding-left: 16px; font-size: 12px; color: #8b949e; }
.claim-list li { margin-bottom: 4px; }
.score-badge { display: inline-block; background: #30363d; padding: 1px 6px; border-radius: 3px; font-size: 11px; margin-left: 4px; }
.card-tools { margin-top: 8px; }
.card-tools summary { cursor: pointer; color: #58a6ff; font-size: 13px; }
.tool-json { margin-top: 6px; font-size: 11px; color: #8b949e; white-space: pre-wrap; word-break: break-all; max-height: 200px; overflow-y: auto; background: #0d1117; padding: 8px; border-radius: 4px; }
.pagination { display: flex; gap: 8px; margin-top: 16px; justify-content: center; }
.pagination button { background: #21262d; color: #c9d1d9; border: 1px solid #30363d; padding: 6px 14px; border-radius: 6px; cursor: pointer; }
.pagination button:hover { background: #30363d; }
.pagination button:disabled { opacity: 0.4; cursor: not-allowed; }
.empty { text-align: center; color: #484f58; padding: 40px; font-size: 14px; }
.nav { margin-bottom: 20px; font-size: 13px; }
.nav a { color: #58a6ff; text-decoration: none; }
.nav a:hover { text-decoration: underline; }
.pass-badge { display: inline-block; background: #30363d; padding: 1px 6px; border-radius: 3px; font-size: 11px; margin-left: 4px; }
</style>
</head>
<body>
<div class="nav"><a href="/">&larr; Dashboard</a></div>
<h1>Response Audit</h1>
<p class="subtitle">Browse agent responses, retrieved claims, and search quality metrics</p>
<div class="filters">
<input type="text" id="query-filter" placeholder="Search queries..." style="width:250px">
<select id="agent-filter">
<option value="">All agents</option>
<option value="rio">rio</option>
<option value="leo">leo</option>
<option value="theseus">theseus</option>
</select>
<select id="days-filter">
<option value="1">Last 24h</option>
<option value="7" selected>Last 7 days</option>
<option value="30">Last 30 days</option>
</select>
<button onclick="loadAudit()">Search</button>
</div>
<div class="stats" id="stats"></div>
<div id="results"></div>
<div class="pagination" id="pagination"></div>
<script>
let currentOffset = 0;
const PAGE_SIZE = 25;
async function loadAudit(offset = 0) {
currentOffset = offset;
const query = document.getElementById('query-filter').value;
const agent = document.getElementById('agent-filter').value;
const days = document.getElementById('days-filter').value;
const params = new URLSearchParams({limit: PAGE_SIZE, offset, days});
if (query) params.set('query', query);
if (agent) params.set('agent', agent);
const resp = await fetch('/api/audit?' + params);
const data = await resp.json();
if (data.error) {
document.getElementById('results').innerHTML = '<div class="empty">' + data.error + '</div>';
return;
}
document.getElementById('stats').textContent =
`${data.total} response${data.total !== 1 ? 's' : ''} found showing ${offset + 1}${Math.min(offset + PAGE_SIZE, data.total)}`;
if (data.results.length === 0) {
document.getElementById('results').innerHTML = '<div class="empty">No audit entries found. The bot logs responses as they happen.</div>';
document.getElementById('pagination').innerHTML = '';
return;
}
let html = '';
for (const r of data.results) {
const confClass = r.confidence_score >= 0.7 ? 'conf-high' : r.confidence_score >= 0.4 ? 'conf-mid' : 'conf-low';
const confLabel = r.confidence_score !== null ? (r.confidence_score * 100).toFixed(0) + '%' : '';
// Claims matched
let claimsHtml = '';
if (r.claims_matched && Array.isArray(r.claims_matched) && r.claims_matched.length > 0) {
const items = r.claims_matched.map(c => {
if (typeof c === 'string') return '<li>' + esc(c) + '</li>';
return '<li>' + esc(c.path || c.title || JSON.stringify(c)) + '</li>';
}).join('');
claimsHtml = '<details class="card-claims"><summary>' + r.claims_matched.length + ' claims retrieved</summary><ul class="claim-list">' + items + '</ul></details>';
} else {
claimsHtml = '<div style="font-size:12px;color:#484f58">No claims matched</div>';
}
// Vector scores
let scoresHtml = '';
if (r.vector_scores && Array.isArray(r.vector_scores) && r.vector_scores.length > 0) {
const scoreItems = r.vector_scores.map(s => {
if (typeof s === 'object') return '<li>' + esc(s.claim || s.path || '?') + ' <span class="score-badge">' + (s.score || s.similarity || '?') + '</span></li>';
return '<li>' + esc(String(s)) + '</li>';
}).join('');
scoresHtml = '<details class="card-claims"><summary>Vector scores</summary><ul class="claim-list">' + scoreItems + '</ul></details>';
}
// Tool calls
let toolsHtml = '';
if (r.tool_calls && Array.isArray(r.tool_calls) && r.tool_calls.length > 0) {
toolsHtml = '<details class="card-tools"><summary>' + r.tool_calls.length + ' retrieval steps</summary><div class="tool-json">' + esc(JSON.stringify(r.tool_calls, null, 2)) + '</div></details>';
}
// Reformulated query
let reformHtml = '';
if (r.reformulated_query && r.reformulated_query !== r.query) {
reformHtml = '<div class="card-reformulated">Reformulated: ' + esc(r.reformulated_query) + '</div>';
}
// Pass info
let passHtml = '';
if (r.pass_used) {
passHtml = ' <span class="pass-badge">Pass ' + esc(String(r.pass_used)) + '</span>';
}
if (r.total_candidates) {
passHtml += ' <span class="pass-badge">' + r.total_candidates + ' candidates</span>';
}
html += '<div class="audit-card">' +
'<div class="card-header">' +
'<div><span class="card-agent">' + esc(r.agent || '?') + '</span>' + passHtml + '</div>' +
'<div class="card-meta">' + esc(r.timestamp) + ' <span class="card-confidence ' + confClass + '">' + confLabel + '</span></div>' +
'</div>' +
'<div class="card-query"><strong>Q:</strong> ' + esc(r.query || '') + '</div>' +
reformHtml +
claimsHtml +
scoresHtml +
toolsHtml +
'</div>';
}
document.getElementById('results').innerHTML = html;
// Pagination
const totalPages = Math.ceil(data.total / PAGE_SIZE);
const currentPage = Math.floor(offset / PAGE_SIZE);
let pagHtml = '';
if (currentPage > 0) pagHtml += '<button onclick="loadAudit(' + ((currentPage - 1) * PAGE_SIZE) + ')">&larr; Prev</button>';
pagHtml += '<span style="color:#8b949e;padding:6px">Page ' + (currentPage + 1) + ' / ' + totalPages + '</span>';
if (currentPage < totalPages - 1) pagHtml += '<button onclick="loadAudit(' + ((currentPage + 1) * PAGE_SIZE) + ')">Next &rarr;</button>';
document.getElementById('pagination').innerHTML = pagHtml;
}
function esc(s) { const d = document.createElement('div'); d.textContent = s; return d.innerHTML; }
// Load on page open
loadAudit();
</script>
</body></html>"""
def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_principal, contributors_agent, domain_breakdown, now) -> str:
"""Render the full operational dashboard as HTML with Chart.js."""
@ -1063,7 +1357,8 @@ def _render_dashboard(metrics, snapshots, changes, vital_signs, contributors_pri
<a href="/api/snapshots">Snapshots</a> &middot;
<a href="/api/vital-signs">Vital Signs</a> &middot;
<a href="/api/contributors">Contributors</a> &middot;
<a href="/api/domains">Domains</a>
<a href="/api/domains">Domains</a> &middot;
<a href="/audit">Response Audit</a>
</div>
<script>
@ -1250,6 +1545,8 @@ def create_app() -> web.Application:
app.router.add_get("/api/contributors", handle_api_contributors)
app.router.add_get("/api/domains", handle_api_domains)
app.router.add_get("/api/search", handle_api_search)
app.router.add_get("/api/audit", handle_api_audit)
app.router.add_get("/audit", handle_audit_page)
app.router.add_post("/api/usage", handle_api_usage)
app.on_cleanup.append(_cleanup)
return app

151
ops/vector-gc.py Normal file
View file

@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""Vector GC — reconcile Qdrant vectors against filesystem claims.
Scrolls all Qdrant points, cross-references against current claim files
in the worktree, and reports (or purges) orphan vectors whose source files
no longer exist.
Usage:
python3 vector-gc.py # Dry run — report only
python3 vector-gc.py --purge # Delete orphan vectors from Qdrant
Pentagon-Agent: Epimetheus <0144398E-4ED3-4FE2-95A3-3D72E1ABF887>
"""
import argparse
import hashlib
import json
import sys
import urllib.request
from pathlib import Path
# Worktree whose claim files are the source of truth for what should be embedded.
REPO_DIR = Path("/opt/teleo-eval/workspaces/main")
# Local Qdrant instance holding the claim embeddings.
QDRANT_URL = "http://localhost:6333"
COLLECTION = "teleo-claims"
# Top-level directories whose *.md files get embedded — NOTE(review): assumed
# to match the directory list in embed-claims.py; verify when either changes.
EMBED_DIRS = ["domains", "core", "foundations", "decisions", "entities"]
def make_point_id(path: str) -> str:
    """Derive the stable Qdrant point ID for a claim path.

    Returns the md5 hex digest of the repo-relative path. Must stay in
    sync with embed-claims.py, which uses the identical scheme on insert.
    """
    digest = hashlib.md5(path.encode())
    return digest.hexdigest()
def scroll_all_points() -> list[dict]:
    """Fetch every point in the Qdrant collection via the scroll API.

    Pages through /points/scroll in batches of 100 (payload only, vectors
    omitted) until Qdrant stops returning a next_page_offset. Exits the
    process on any HTTP/JSON error, since a partial listing would make the
    GC report bogus.
    """
    collected: list[dict] = []
    page_offset = None
    while True:
        request_body: dict = {"limit": 100, "with_payload": True, "with_vector": False}
        if page_offset is not None:
            request_body["offset"] = page_offset
        req = urllib.request.Request(
            f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll",
            data=json.dumps(request_body).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                result = json.loads(resp.read())["result"]
        except Exception as e:
            print(f"ERROR scrolling Qdrant: {e}", file=sys.stderr)
            sys.exit(1)
        batch = result.get("points", [])
        collected.extend(batch)
        page_offset = result.get("next_page_offset")
        if not page_offset or not batch:
            break
    return collected
def get_expected_ids() -> dict[str, Path]:
    """Map every expected point ID to the claim file that should back it.

    Walks each embed directory under REPO_DIR for *.md files and keys each
    file by the deterministic ID derived from its repo-relative path.
    Directories that don't exist are silently skipped.
    """
    id_to_file: dict[str, Path] = {}
    for dirname in EMBED_DIRS:
        root = REPO_DIR / dirname
        if not root.exists():
            continue
        for claim_file in root.rglob("*.md"):
            rel_path = str(claim_file.relative_to(REPO_DIR))
            id_to_file[make_point_id(rel_path)] = claim_file
    return id_to_file
def delete_points(point_ids: list[str]):
    """Delete the given point IDs from the Qdrant collection.

    Returns Qdrant's decoded JSON response for logging by the caller.
    """
    payload = json.dumps({"points": point_ids}).encode()
    req = urllib.request.Request(
        f"{QDRANT_URL}/collections/{COLLECTION}/points/delete",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
def main():
    """Entry point: compare Qdrant contents against the filesystem.

    Prints a human-readable drift report (orphans = vector without file,
    missing = file without vector), purges orphans when --purge is given,
    and ends with a one-line JSON summary for cron log scraping.
    """
    parser = argparse.ArgumentParser(description="Vector GC — reconcile Qdrant vs filesystem")
    parser.add_argument("--purge", action="store_true", help="Delete orphan vectors")
    args = parser.parse_args()
    print("Scrolling all Qdrant points...")
    points = scroll_all_points()
    print(f" Qdrant vectors: {len(points)}")
    print("Scanning filesystem for expected claims...")
    expected = get_expected_ids()
    print(f" Filesystem files: {len(expected)}")
    qdrant_ids = {p["id"] for p in points}
    expected_ids = set(expected.keys())
    orphan_ids = qdrant_ids - expected_ids
    missing_ids = expected_ids - qdrant_ids
    # Categorize orphans by their payload path so the report names files,
    # not just opaque point IDs.
    orphan_details = []
    for p in points:
        if p["id"] in orphan_ids:
            path = p.get("payload", {}).get("path", "unknown")
            orphan_details.append({"id": p["id"], "path": path})
    print("\n=== Vector GC Report ===")
    print(f"Qdrant vectors: {len(qdrant_ids)}")
    print(f"Filesystem claims: {len(expected_ids)}")
    print(f"Orphan vectors: {len(orphan_ids)} (in Qdrant, no file)")
    print(f"Missing vectors: {len(missing_ids)} (file exists, not in Qdrant)")
    if orphan_details:
        print("\nOrphan vectors (source file deleted):")
        for o in sorted(orphan_details, key=lambda x: x["path"]):
            print(f" {o['id'][:12]} {o['path']}")
    if missing_ids:
        print("\nMissing from Qdrant (need re-embed):")
        # missing_ids is a subset of expected's keys by construction, so
        # every id resolves to a file.
        for mid in sorted(missing_ids):
            print(f" {mid[:12]} {expected[mid].relative_to(REPO_DIR)}")
    if args.purge and orphan_ids:
        print(f"\nPurging {len(orphan_ids)} orphan vectors...")
        result = delete_points(list(orphan_ids))
        print(f" Done: {result}")
    elif orphan_ids and not args.purge:
        print("\nRun with --purge to delete orphan vectors.")
    # Summary JSON for cron output
    summary = {
        "qdrant_count": len(qdrant_ids),
        "filesystem_count": len(expected_ids),
        "orphans": len(orphan_ids),
        "missing": len(missing_ids),
        "orphan_paths": [o["path"] for o in orphan_details],
    }
    print(f"\n{json.dumps(summary)}")
if __name__ == "__main__":
main()