consolidate diagnostics: copy newer/unique files from root /diagnostics/ into teleo-codex/ops/diagnostics/

Files consolidated:
- dashboard_routes.py: root copy (39K) overwrites teleo-codex (34K) — has cost fix + connection leak fix
- dashboard_prs.py: root copy overwrites — has cost display rewrite
- dashboard_epistemic.py: root copy overwrites — has Ship rename
- research_tracking.py: new file, existed only in root /diagnostics/ (reviewed by Ganymede, never committed here)
- research_routes.py: new file, same situation
- ops/db.py: new file, unique to root /diagnostics/ops/

After this commit, root /diagnostics/ contains only stale copies and patch files — safe to delete.
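
For reference, the copy step amounts to this sketch (file list from above; the exact commands used are not preserved here):

import shutil
from pathlib import Path

SRC = Path("/diagnostics")
DST = Path("teleo-codex/ops/diagnostics")

for name in ("dashboard_routes.py", "dashboard_prs.py", "dashboard_epistemic.py",
             "research_tracking.py", "research_routes.py", "ops/db.py"):
    target = DST / name
    target.parent.mkdir(parents=True, exist_ok=True)  # creates the ops/ subdir for db.py
    shutil.copy2(SRC / name, target)  # newer root copy overwrites the stale one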

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
m3taversal 2026-04-13 10:14:40 +02:00
parent 5514e04498
commit bf3af00d5d
6 changed files with 2230 additions and 805 deletions

ops/diagnostics/dashboard_epistemic.py

@ -74,7 +74,7 @@ def render_epistemic_page(vital_signs: dict, now: datetime) -> str:
<div style="font-size:40px;margin-bottom:12px;opacity:0.3">&#9881;</div>
<div style="color:#8b949e">
Multi-model agreement rate requires the <code>model_evals</code> table.<br>
<span style="font-size:12px">Blocked on: model_evals table creation (Theseus 2 Phase 3)</span>
<span style="font-size:12px">Blocked on: model_evals table creation (Ship Phase 3)</span>
</div>
<div style="margin-top:16px;font-size:12px;color:#8b949e">
Current eval models: Haiku (triage), GPT-4o (domain), Sonnet/Opus (Leo).<br>

ops/diagnostics/dashboard_prs.py

@ -1,8 +1,8 @@
"""PR Lifecycle dashboard — single-page view of every PR through the pipeline.
Sortable table: PR#, summary, claims, domain, contributor, outcome, evals, evaluator, cost, date.
Click any row to expand: claim titles, eval chain, timeline, reviews, issues.
Hero cards: total PRs, merge rate, total claims, est. cost.
Sortable table: PR#, summary, claims, domain, outcome, evals, evaluator, cost, date.
Click any row to expand: timeline, claim list, issues summary.
Hero cards: total PRs, merge rate, median eval rounds, total claims, total cost.
Data sources: prs table, audit_log (eval rounds), review_records.
Owner: Ship
@ -14,7 +14,7 @@ from shared_ui import render_page
EXTRA_CSS = """
.content-wrapper { max-width: 1600px !important; }
.page-content { max-width: 1600px !important; }
.filters { display: flex; gap: 12px; flex-wrap: wrap; margin-bottom: 16px; }
.filters select, .filters input {
background: #161b22; color: #c9d1d9; border: 1px solid #30363d;
@ -22,15 +22,14 @@ EXTRA_CSS = """
.filters select:focus, .filters input:focus { border-color: #58a6ff; outline: none; }
.pr-table { width: 100%; border-collapse: collapse; font-size: 13px; table-layout: fixed; }
.pr-table th:nth-child(1) { width: 50px; } /* PR# */
.pr-table th:nth-child(2) { width: 28%; } /* Summary */
.pr-table th:nth-child(2) { width: 30%; } /* Summary */
.pr-table th:nth-child(3) { width: 50px; } /* Claims */
.pr-table th:nth-child(4) { width: 11%; } /* Domain */
.pr-table th:nth-child(5) { width: 10%; } /* Contributor */
.pr-table th:nth-child(6) { width: 10%; } /* Outcome */
.pr-table th:nth-child(7) { width: 44px; } /* Evals */
.pr-table th:nth-child(8) { width: 12%; } /* Evaluator */
.pr-table th:nth-child(9) { width: 60px; } /* Cost */
.pr-table th:nth-child(10) { width: 80px; } /* Date */
.pr-table th:nth-child(4) { width: 12%; } /* Domain */
.pr-table th:nth-child(5) { width: 10%; } /* Outcome */
.pr-table th:nth-child(6) { width: 50px; } /* Evals */
.pr-table th:nth-child(7) { width: 16%; } /* Evaluator */
.pr-table th:nth-child(8) { width: 70px; } /* Cost */
.pr-table th:nth-child(9) { width: 90px; } /* Date */
.pr-table td { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; padding: 8px 6px; }
.pr-table td:nth-child(2) { white-space: normal; overflow: visible; line-height: 1.4; }
.pr-table th { cursor: pointer; user-select: none; position: relative; padding: 8px 18px 8px 6px; }
@ -49,24 +48,22 @@ EXTRA_CSS = """
.pr-table .pr-link:hover { text-decoration: underline; }
.pr-table td .summary-text { font-size: 12px; color: #c9d1d9; }
.pr-table td .review-snippet { font-size: 11px; color: #f85149; margin-top: 2px; opacity: 0.8; }
.pr-table td .model-tag { font-size: 10px; color: #6e7681; background: #161b22; border-radius: 3px; padding: 1px 4px; }
.pr-table td .contributor-tag { font-size: 11px; color: #d2a8ff; }
.pr-table td .contributor-self { font-size: 11px; color: #6e7681; font-style: italic; }
.pr-table td .model-tag { font-size: 9px; color: #6e7681; background: #21262d; border-radius: 3px; padding: 1px 4px; display: inline-block; margin: 1px 0; }
.pr-table td .expand-chevron { display: inline-block; width: 12px; color: #484f58; font-size: 10px; transition: transform 0.2s; }
.pr-table tr.expanded .expand-chevron { transform: rotate(90deg); color: #58a6ff; }
.pr-table td .cost-val { font-size: 12px; color: #8b949e; }
.pr-table td .claims-count { font-size: 13px; color: #c9d1d9; text-align: center; }
.pr-table td .evals-count { font-size: 13px; text-align: center; }
.trace-panel { background: #0d1117; border: 1px solid #30363d; border-radius: 8px;
padding: 16px; margin: 4px 0 8px 0; font-size: 12px; display: none; }
.trace-panel.open { display: block; }
.trace-panel h4 { color: #58a6ff; font-size: 12px; margin: 12px 0 6px 0; }
.trace-panel h4:first-child { margin-top: 0; }
.claim-list { list-style: none; padding: 0; margin: 0; }
.claim-list li { padding: 4px 0 4px 16px; border-left: 2px solid #238636; color: #c9d1d9; font-size: 12px; line-height: 1.5; }
.claim-list li .claim-confidence { font-size: 10px; color: #8b949e; margin-left: 6px; }
.issues-box { background: #1c1210; border: 1px solid #f8514933; border-radius: 6px;
.trace-panel .section-title { color: #58a6ff; font-size: 12px; font-weight: 600; margin: 12px 0 6px; }
.trace-panel .section-title:first-child { margin-top: 0; }
.trace-panel .claim-list { list-style: none; padding: 0; margin: 0; }
.trace-panel .claim-list li { padding: 4px 0; border-bottom: 1px solid #21262d; color: #c9d1d9; font-size: 12px; }
.trace-panel .claim-list li:last-child { border-bottom: none; }
.trace-panel .issues-box { background: #1c1017; border: 1px solid #f8514930; border-radius: 6px;
padding: 8px 12px; margin: 4px 0; font-size: 12px; color: #f85149; }
.eval-chain { background: #161b22; border-radius: 6px; padding: 8px 12px; margin: 4px 0; font-size: 12px; }
.eval-chain .chain-step { display: inline-block; margin-right: 6px; }
.eval-chain .chain-arrow { color: #484f58; margin: 0 4px; }
.trace-timeline { list-style: none; padding: 0; }
.trace-timeline li { padding: 4px 0; border-left: 2px solid #30363d; padding-left: 12px; margin-left: 8px; }
.trace-timeline li .ts { color: #484f58; font-size: 11px; }
@ -76,6 +73,12 @@ EXTRA_CSS = """
.trace-timeline li.ev-changes .ev { color: #d29922; }
.review-text { background: #161b22; padding: 8px 12px; border-radius: 4px;
margin: 4px 0; white-space: pre-wrap; font-size: 11px; color: #8b949e; max-height: 200px; overflow-y: auto; }
.eval-chain { background: #161b22; border-radius: 6px; padding: 8px 12px; margin: 4px 0 8px;
font-size: 12px; display: flex; gap: 12px; flex-wrap: wrap; align-items: center; }
.eval-chain .step { display: flex; align-items: center; gap: 4px; }
.eval-chain .step-label { color: #8b949e; font-size: 11px; }
.eval-chain .step-model { color: #c9d1d9; font-size: 11px; font-weight: 600; }
.eval-chain .arrow { color: #484f58; }
.pagination { display: flex; gap: 8px; align-items: center; justify-content: center; margin-top: 16px; }
.pagination button { background: #161b22; color: #c9d1d9; border: 1px solid #30363d;
border-radius: 4px; padding: 4px 12px; cursor: pointer; font-size: 12px; }
@ -93,6 +96,7 @@ def render_prs_page(now: datetime) -> str:
<div class="grid" id="hero-cards">
<div class="card"><div class="label">Total PRs</div><div class="value blue" id="kpi-total">--</div><div class="detail" id="kpi-total-detail"></div></div>
<div class="card"><div class="label">Merge Rate</div><div class="value green" id="kpi-merge-rate">--</div><div class="detail" id="kpi-merge-detail"></div></div>
<div class="card"><div class="label">Median Eval Rounds</div><div class="value" id="kpi-rounds">--</div><div class="detail" id="kpi-rounds-detail"></div></div>
<div class="card"><div class="label">Total Claims</div><div class="value blue" id="kpi-claims">--</div><div class="detail" id="kpi-claims-detail"></div></div>
<div class="card"><div class="label">Est. Cost</div><div class="value" id="kpi-cost">--</div><div class="detail" id="kpi-cost-detail"></div></div>
</div>
@ -100,7 +104,6 @@ def render_prs_page(now: datetime) -> str:
<!-- Filters -->
<div class="filters">
<select id="filter-domain"><option value="">All Domains</option></select>
<select id="filter-contributor"><option value="">All Contributors</option></select>
<select id="filter-outcome">
<option value="">All Outcomes</option>
<option value="merged">Merged</option>
@ -130,10 +133,9 @@ def render_prs_page(now: datetime) -> str:
<th data-col="summary">Summary <span class="sort-arrow">&#9650;</span></th>
<th data-col="claims_count">Claims <span class="sort-arrow">&#9650;</span></th>
<th data-col="domain">Domain <span class="sort-arrow">&#9650;</span></th>
<th data-col="submitted_by">Contributor <span class="sort-arrow">&#9650;</span></th>
<th data-col="status">Outcome <span class="sort-arrow">&#9650;</span></th>
<th data-col="eval_rounds">Evals <span class="sort-arrow">&#9650;</span></th>
<th data-col="evaluator_label">Evaluator <span class="sort-arrow">&#9650;</span></th>
<th data-col="evaluator">Evaluator <span class="sort-arrow">&#9650;</span></th>
<th data-col="est_cost">Cost <span class="sort-arrow">&#9650;</span></th>
<th data-col="created_at">Date <span class="sort-arrow">&#9650;</span></th>
</tr>
@ -150,71 +152,42 @@ def render_prs_page(now: datetime) -> str:
</div>
"""
# Use single-quoted JS strings throughout to avoid Python/HTML escaping issues
scripts = """<script>
var PAGE_SIZE = 50;
var FORGEJO = 'https://git.livingip.xyz/teleo/teleo-codex/pulls/';
var allData = [];
var filtered = [];
var sortCol = 'number';
var sortAsc = false;
var page = 0;
var expandedPr = null;
// Tier-based cost estimates (per eval round)
var TIER_COSTS = {
'DEEP': 0.145, // Haiku triage + Gemini Flash domain + Opus Leo
'STANDARD': 0.043, // Haiku triage + Gemini Flash domain + Sonnet Leo
'LIGHT': 0.027 // Haiku triage + Gemini Flash domain only
};
function estimateCost(pr) {
var tier = pr.tier || 'STANDARD';
var rounds = pr.eval_rounds || 1;
var baseCost = TIER_COSTS[tier] || TIER_COSTS['STANDARD'];
return baseCost * rounds;
}
function fmtCost(val) {
if (val == null || val === 0) return '--';
return '$' + val.toFixed(3);
}
const PAGE_SIZE = 50;
const FORGEJO = 'https://git.livingip.xyz/teleo/teleo-codex/pulls/';
let allData = [];
let filtered = [];
let sortCol = 'number';
let sortAsc = false;
let page = 0;
let expandedPr = null;
function loadData() {
var days = document.getElementById('filter-days').value;
var url = '/api/pr-lifecycle' + (days !== '0' ? '?days=' + days : '?days=9999');
fetch(url).then(function(r) { return r.json(); }).then(function(data) {
allData = data.prs || [];
// Compute derived fields
allData.forEach(function(p) {
p.est_cost = estimateCost(p);
// Evaluator label for sorting
p.evaluator_label = p.domain_agent || p.agent || '--';
});
populateFilters(allData);
updateKPIs(data);
applyFilters();
}).catch(function() {
document.getElementById('pr-tbody').innerHTML =
'<tr><td colspan="10" style="text-align:center;color:#f85149;">Failed to load data</td></tr>';
'<tr><td colspan="9" style="text-align:center;color:#f85149;">Failed to load data</td></tr>';
});
}
function populateFilters(prs) {
var domains = [], contribs = [], seenD = {}, seenC = {};
var domains = [], seenD = {};
prs.forEach(function(p) {
if (p.domain && !seenD[p.domain]) { seenD[p.domain] = 1; domains.push(p.domain); }
var c = p.submitted_by || 'unknown';
if (!seenC[c]) { seenC[c] = 1; contribs.push(c); }
});
domains.sort(); contribs.sort();
domains.sort();
var domSel = document.getElementById('filter-domain');
var conSel = document.getElementById('filter-contributor');
var curDom = domSel.value, curCon = conSel.value;
var curDom = domSel.value;
domSel.innerHTML = '<option value="">All Domains</option>' +
domains.map(function(d) { return '<option value="' + esc(d) + '">' + esc(d) + '</option>'; }).join('');
conSel.innerHTML = '<option value="">All Contributors</option>' +
contribs.map(function(c) { return '<option value="' + esc(c) + '">' + esc(c) + '</option>'; }).join('');
domSel.value = curDom; conSel.value = curCon;
domSel.value = curDom;
}
function updateKPIs(data) {
@ -226,29 +199,47 @@ def render_prs_page(now: datetime) -> str:
document.getElementById('kpi-merge-rate').textContent = fmtPct(rate);
document.getElementById('kpi-merge-detail').textContent = fmtNum(data.open) + ' open';
var totalClaims = 0, mergedClaims = 0, totalCost = 0;
document.getElementById('kpi-rounds').textContent =
data.median_rounds != null ? data.median_rounds.toFixed(1) : '--';
document.getElementById('kpi-rounds-detail').textContent =
data.max_rounds != null ? 'max: ' + data.max_rounds : '';
var totalClaims = 0, mergedClaims = 0;
var totalCost = 0;
var actualCount = 0, estCount = 0;
(data.prs || []).forEach(function(p) {
totalClaims += (p.claims_count || 1);
if (p.status === 'merged') mergedClaims += (p.claims_count || 1);
totalCost += estimateCost(p);
totalCost += (p.cost || 0);
if (p.cost_is_actual) actualCount++; else estCount++;
});
document.getElementById('kpi-claims').textContent = fmtNum(totalClaims);
document.getElementById('kpi-claims-detail').textContent = fmtNum(mergedClaims) + ' merged';
// Show actual DB total if available, otherwise sum from PRs
var costLabel = '';
if (data.actual_total_cost > 0) {
document.getElementById('kpi-cost').textContent = '$' + data.actual_total_cost.toFixed(2);
costLabel = 'from costs table';
} else if (actualCount > 0) {
document.getElementById('kpi-cost').textContent = '$' + totalCost.toFixed(2);
var perClaim = totalClaims > 0 ? totalCost / totalClaims : 0;
document.getElementById('kpi-cost-detail').textContent = '$' + perClaim.toFixed(3) + '/claim';
costLabel = actualCount + ' actual, ' + estCount + ' est.';
} else {
document.getElementById('kpi-cost').textContent = '$' + totalCost.toFixed(2);
costLabel = 'ALL ESTIMATED';
}
var costPerClaim = totalClaims > 0 ? totalCost / totalClaims : 0;
document.getElementById('kpi-cost-detail').textContent =
'$' + costPerClaim.toFixed(3) + '/claim \u00b7 ' + costLabel;
}
function applyFilters() {
var dom = document.getElementById('filter-domain').value;
var con = document.getElementById('filter-contributor').value;
var out = document.getElementById('filter-outcome').value;
var tier = document.getElementById('filter-tier').value;
filtered = allData.filter(function(p) {
if (dom && p.domain !== dom) return false;
if (con && (p.submitted_by || 'unknown') !== con) return false;
if (out && p.status !== out) return false;
if (tier && p.tier !== tier) return false;
return true;
@ -278,6 +269,19 @@ def render_prs_page(now: datetime) -> str:
return s.length > n ? s.substring(0, n) + '...' : s;
}
function shortModel(m) {
if (!m) return '';
// Shorten model names for display
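// e.g. 'anthropic/claude-sonnet-4.5' -> 'Sonnet'; 'google/gemini-2.5-flash' -> 'Gemini Flash'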
if (m.indexOf('gemini-2.5-flash') !== -1) return 'Gemini Flash';
if (m.indexOf('claude-sonnet') !== -1 || m.indexOf('sonnet-4') !== -1) return 'Sonnet';
if (m.indexOf('claude-opus') !== -1 || m.indexOf('opus') !== -1) return 'Opus';
if (m.indexOf('haiku') !== -1) return 'Haiku';
if (m.indexOf('gpt-4o') !== -1) return 'GPT-4o';
// fallback: strip provider prefix
var parts = m.split('/');
return parts[parts.length - 1];
}
function renderTable() {
var tbody = document.getElementById('pr-tbody');
var start = page * PAGE_SIZE;
@ -285,7 +289,7 @@ def render_prs_page(now: datetime) -> str:
var totalPages = Math.ceil(filtered.length / PAGE_SIZE);
if (slice.length === 0) {
tbody.innerHTML = '<tr><td colspan="10" style="text-align:center;color:#8b949e;">No PRs match filters</td></tr>';
tbody.innerHTML = '<tr><td colspan="9" style="text-align:center;color:#8b949e;">No PRs match filters</td></tr>';
return;
}
@ -297,37 +301,40 @@ def render_prs_page(now: datetime) -> str:
(p.tier || '').toLowerCase() === 'standard' ? 'tier-standard' : 'tier-light';
var date = p.created_at ? p.created_at.substring(0, 10) : '--';
// Summary: first claim title
// Summary
var summary = p.summary || '--';
var reviewSnippet = '';
if (p.status === 'closed' && p.review_snippet) {
reviewSnippet = '<div class="review-snippet">' + esc(truncate(p.review_snippet, 120)) + '</div>';
}
// Outcome with tier badge
var outcomeLabel = esc(p.status || '--');
var tierBadge = p.tier ? ' <span class="' + tierClass + '" style="font-size:10px;">' + esc(p.tier) + '</span>' : '';
// Review snippet for issues
var reviewSnippet = '';
if (p.review_snippet) {
reviewSnippet = '<div class="review-snippet">' + esc(truncate(p.review_snippet, 100)) + '</div>';
}
// Contributor display
var contributor = p.submitted_by || '--';
var contribClass = 'contributor-tag';
if (contributor.indexOf('self-directed') >= 0 || contributor === 'unknown') {
contribClass = 'contributor-self';
}
// Evaluator: domain agent + model tag
// Evaluator column: domain agent + model
var evaluator = '';
if (p.domain_agent) {
var modelShort = '';
if (p.domain_model) {
var m = p.domain_model;
if (m.indexOf('gemini') >= 0) modelShort = 'Gemini Flash';
else if (m.indexOf('gpt-4o') >= 0) modelShort = 'GPT-4o';
else if (m.indexOf('sonnet') >= 0) modelShort = 'Sonnet';
else modelShort = m.split('/').pop();
evaluator = '<div style="font-size:12px;color:#c9d1d9;">' + esc(p.domain_agent) + '</div>';
}
evaluator = esc(p.domain_agent) + (modelShort ? ' <span class="model-tag">' + esc(modelShort) + '</span>' : '');
if (p.domain_model) {
evaluator += '<div class="model-tag">' + esc(shortModel(p.domain_model)) + '</div>';
}
if (p.leo_model) {
evaluator += '<div class="model-tag">' + esc(shortModel(p.leo_model)) + '</div>';
}
if (!evaluator) evaluator = '<span style="color:#484f58;">--</span>';
// Cost: actual from DB, or estimated (flagged)
var costStr;
if (p.cost != null && p.cost > 0) {
if (p.cost_is_actual) {
costStr = '<span class="cost-val">$' + p.cost.toFixed(3) + '</span>';
} else {
costStr = '<span class="cost-val" style="opacity:0.5;" title="Estimated — no actual cost tracked">~$' + p.cost.toFixed(3) + '</span>';
}
} else {
costStr = '<span style="color:#484f58;">--</span>';
}
rows.push(
@ -335,17 +342,16 @@ def render_prs_page(now: datetime) -> str:
'<td><span class="expand-chevron">&#9654;</span> ' +
'<a class="pr-link" href="' + FORGEJO + p.number + '" target="_blank" rel="noopener" onclick="event.stopPropagation();">#' + p.number + '</a></td>' +
'<td style="white-space:normal;"><span class="summary-text">' + esc(summary) + '</span>' + reviewSnippet + '</td>' +
'<td style="text-align:center;">' + (p.claims_count || 1) + '</td>' +
'<td style="text-align:center;">' + (p.claims_count || '--') + '</td>' +
'<td>' + esc(p.domain || '--') + '</td>' +
'<td><span class="' + contribClass + '">' + esc(truncate(contributor, 20)) + '</span></td>' +
'<td class="' + outClass + '">' + esc(p.status || '--') + tierBadge + '</td>' +
'<td class="' + outClass + '">' + outcomeLabel + tierBadge + '</td>' +
'<td style="text-align:center;">' + (p.eval_rounds || '--') + '</td>' +
'<td>' + evaluator + '</td>' +
'<td>' + fmtCost(p.est_cost) + '</td>' +
'<td>' + costStr + '</td>' +
'<td>' + date + '</td>' +
'</tr>' +
'<tr id="trace-' + p.number + '" style="display:none;"><td colspan="10" style="padding:0;">' +
'<div class="trace-panel" id="panel-' + p.number + '">Loading...</div>' +
'<tr id="trace-' + p.number + '" style="display:none;"><td colspan="9" style="padding:0;">' +
'<div class="trace-panel" id="panel-' + p.number + '">Loading trace...</div>' +
'</td></tr>'
);
});
@ -408,34 +414,46 @@ def render_prs_page(now: datetime) -> str:
});
function loadTrace(pr, panel) {
// Find the PR data for claim titles
// Also find this PR in allData for claim list
var prData = null;
for (var i = 0; i < allData.length; i++) {
if (allData[i].number == pr) { prData = allData[i]; break; }
}
allData.forEach(function(p) { if (p.number == pr) prData = p; });
fetch('/api/trace/' + pr).then(function(r) { return r.json(); }).then(function(data) {
var html = '';
// Claims contained in this PR
if (prData && prData.description) {
var titles = prData.description.split('|').map(function(t) { return t.trim(); }).filter(Boolean);
if (titles.length > 0) {
html += '<h4>Claims (' + titles.length + ')</h4>';
// --- Claims contained in this PR ---
if (prData && prData.claim_titles && prData.claim_titles.length > 0) {
html += '<div class="section-title">Claims (' + prData.claim_titles.length + ')</div>';
html += '<ul class="claim-list">';
titles.forEach(function(t) {
prData.claim_titles.forEach(function(t) {
html += '<li>' + esc(t) + '</li>';
});
html += '</ul>';
}
}
// Issues (if any)
// --- Issues summary ---
var issues = [];
if (data.timeline) {
data.timeline.forEach(function(ev) {
if (ev.detail && ev.detail.issues) {
var iss = ev.detail.issues;
if (typeof iss === 'string') { try { iss = JSON.parse(iss); } catch(e) { iss = [iss]; } }
if (Array.isArray(iss)) {
iss.forEach(function(i) {
var label = String(i).replace(/_/g, ' ');
if (issues.indexOf(label) === -1) issues.push(label);
});
}
}
});
}
if (prData && prData.review_snippet) {
html += '<div class="issues-box">' + esc(prData.review_snippet) + '</div>';
} else if (issues.length > 0) {
html += '<div class="issues-box">Issues: ' + issues.map(esc).join(', ') + '</div>';
}
// Eval chain with models
// --- Eval chain (who reviewed with what model) ---
var models = {};
if (data.timeline) {
data.timeline.forEach(function(ev) {
@ -446,38 +464,23 @@ def render_prs_page(now: datetime) -> str:
}
});
}
html += '<div class="eval-chain"><strong style="color:#58a6ff;">Eval Chain:</strong> ';
var chain = [];
if (models['triage.haiku_triage'] || models['triage.deterministic_triage']) {
chain.push('<span class="chain-step">Triage <span class="model-tag">' +
esc(models['triage.haiku_triage'] || 'deterministic') + '</span></span>');
}
if (models['domain_review']) {
chain.push('<span class="chain-step">Domain <span class="model-tag">' +
esc(models['domain_review']) + '</span></span>');
}
if (models['leo_review']) {
chain.push('<span class="chain-step">Leo <span class="model-tag">' +
esc(models['leo_review']) + '</span></span>');
}
html += chain.length > 0 ? chain.join('<span class="chain-arrow">&#8594;</span>') :
'<span style="color:#484f58;">No model data</span>';
html += '</div>';
// Source + contributor metadata
if (data.pr) {
html += '<div style="margin:8px 0;font-size:12px;color:#8b949e;">';
if (data.pr.source_path) html += 'Source: <span style="color:#c9d1d9;">' + esc(data.pr.source_path) + '</span> &middot; ';
if (prData && prData.submitted_by) html += 'Contributor: <span style="color:#d2a8ff;">' + esc(prData.submitted_by) + '</span> &middot; ';
if (data.pr.tier) html += 'Tier: <span style="color:#c9d1d9;">' + esc(data.pr.tier) + '</span> &middot; ';
html += '<a class="pr-link" href="' + FORGEJO + pr + '" target="_blank">View on Forgejo</a>';
if (Object.keys(models).length > 0) {
html += '<div class="eval-chain">';
html += '<strong style="color:#58a6ff;">Eval chain:</strong> ';
var parts = [];
if (models['triage.haiku_triage'] || models['triage.deterministic_triage'])
parts.push('<span class="step"><span class="step-label">Triage</span> <span class="step-model">' + shortModel(models['triage.haiku_triage'] || 'deterministic') + '</span></span>');
if (models['domain_review'])
parts.push('<span class="step"><span class="step-label">Domain</span> <span class="step-model">' + shortModel(models['domain_review']) + '</span></span>');
if (models['leo_review'])
parts.push('<span class="step"><span class="step-label">Leo</span> <span class="step-model">' + shortModel(models['leo_review']) + '</span></span>');
html += parts.length > 0 ? parts.join(' <span class="arrow">&#8594;</span> ') : '<span style="color:#484f58;">No model data</span>';
html += '</div>';
}
// Timeline
// --- Timeline ---
if (data.timeline && data.timeline.length > 0) {
html += '<h4>Timeline</h4>';
html += '<div class="section-title">Timeline</div>';
html += '<ul class="trace-timeline">';
data.timeline.forEach(function(ev) {
var cls = ev.event === 'approved' ? 'ev-approved' :
@ -488,7 +491,7 @@ def render_prs_page(now: datetime) -> str:
if (ev.detail) {
if (ev.detail.tier) detail += ' tier=' + ev.detail.tier;
if (ev.detail.reason) detail += ' &#8212; ' + esc(ev.detail.reason);
if (ev.detail.model) detail += ' [' + esc(ev.detail.model) + ']';
if (ev.detail.model) detail += ' [' + esc(shortModel(ev.detail.model)) + ']';
if (ev.detail.review_text) {
detail += '<div class="review-text">' + esc(ev.detail.review_text).substring(0, 2000) + '</div>';
}
@ -506,19 +509,19 @@ def render_prs_page(now: datetime) -> str:
});
html += '</ul>';
} else {
html += '<div style="color:#484f58;font-size:12px;margin:8px 0;">No timeline events</div>';
html += '<div style="color:#484f58;font-size:12px;margin-top:8px;">No timeline events</div>';
}
// Reviews
// --- Reviews ---
if (data.reviews && data.reviews.length > 0) {
html += '<h4>Reviews</h4>';
html += '<div class="section-title">Reviews</div>';
data.reviews.forEach(function(r) {
var cls = r.outcome === 'approved' ? 'badge-green' :
r.outcome === 'rejected' ? 'badge-red' : 'badge-yellow';
html += '<div style="margin:4px 0;">' +
'<span class="badge ' + cls + '">' + esc(r.outcome) + '</span> ' +
'<span style="color:#8b949e;font-size:11px;">' + esc(r.reviewer || '') + ' ' +
(r.model ? '[' + esc(r.model) + ']' : '') + ' ' +
(r.model ? '[' + esc(shortModel(r.model)) + ']' : '') + ' ' +
(r.reviewed_at || '').substring(0, 19) + '</span>';
if (r.rejection_reason) {
html += ' <code>' + esc(r.rejection_reason) + '</code>';
@ -537,7 +540,7 @@ def render_prs_page(now: datetime) -> str:
}
// Filter listeners
['filter-domain', 'filter-contributor', 'filter-outcome', 'filter-tier'].forEach(function(id) {
['filter-domain', 'filter-outcome', 'filter-tier'].forEach(function(id) {
document.getElementById(id).addEventListener('change', applyFilters);
});
document.getElementById('filter-days').addEventListener('change', loadData);

ops/diagnostics/dashboard_routes.py

@ -61,6 +61,7 @@ async def handle_stage_times(request):
Returns median minutes between consecutive stages.
"""
conn = request.app["_get_conn"]()
try:
hours = int(request.query.get("hours", "24"))
# Get per-PR event timestamps
@ -117,6 +118,8 @@ async def handle_stage_times(request):
}
return web.json_response({"hours": hours, "stages": stage_times})
finally:
conn.close()
# ─── GET /api/herfindahl ──────────────────────────────────────────────────
@ -127,6 +130,7 @@ async def handle_herfindahl(request):
HHI = sum of (domain_share^2). 1.0 = single domain, lower = more diverse.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
rows = conn.execute(
@ -164,6 +168,8 @@ async def handle_herfindahl(request):
"total_merged": total,
"days": days,
})
finally:
conn.close()
# ─── GET /api/agent-state ─────────────────────────────────────────────────
@ -226,6 +232,7 @@ async def handle_agent_state(request):
async def handle_extraction_yield_by_domain(request):
"""Sources → claims conversion rate per domain."""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
# Sources per domain (approximate from PR source_path domain)
@ -269,6 +276,8 @@ async def handle_extraction_yield_by_domain(request):
domains.sort(key=lambda x: x["merged"], reverse=True)
return web.json_response({"days": days, "domains": domains})
finally:
conn.close()
# ─── GET /api/agents-dashboard ─────────────────────────────────────────────
@ -281,6 +290,7 @@ async def handle_agents_dashboard(request):
All in one response to avoid N client-side fetches.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
# Per-agent merged + rejected counts
@ -380,6 +390,8 @@ async def handle_agents_dashboard(request):
pass
return web.json_response({"days": days, "agents": agents})
finally:
conn.close()
# ─── GET /api/cascade-coverage ────────────────────────────────────────────
@ -390,6 +402,7 @@ async def handle_cascade_coverage(request):
Returns: triggered count, by-agent breakdown, claims affected.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
triggered = conn.execute(
@ -440,6 +453,8 @@ async def handle_cascade_coverage(request):
"merges_with_cascade": summaries["total_merges_with_cascade"] if summaries else 0,
"by_agent": by_agent,
})
finally:
conn.close()
# ─── GET /api/review-summary ─────────────────────────────────────────────
@ -451,6 +466,7 @@ async def handle_review_summary(request):
disagreement_type columns.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
# Check if table exists and has data
@ -537,6 +553,8 @@ async def handle_review_summary(request):
for r in domains
],
})
finally:
conn.close()
# ─── Trace endpoint ────────────────────────────────────────────────────────
@ -549,11 +567,8 @@ async def handle_trace(request: web.Request) -> web.Response:
One thread, every stage, chronological.
"""
trace_id = request.match_info["trace_id"]
get_conn = request.app["_get_conn"]
conn = get_conn()
# Audit log events (the backbone)
# Try trace_id first, fall back to PR number in detail JSON
conn = request.app["_get_conn"]()
try:
events = conn.execute(
"""SELECT timestamp, stage, event, detail
FROM audit_log
@ -563,7 +578,6 @@ async def handle_trace(request: web.Request) -> web.Response:
).fetchall()
if not events:
# Fallback: match by PR number in detail JSON (for rows without trace_id)
events = conn.execute(
"""SELECT timestamp, stage, event, detail
FROM audit_log
@ -572,7 +586,6 @@ async def handle_trace(request: web.Request) -> web.Response:
(trace_id,),
).fetchall()
# Review records for this PR
reviews = conn.execute(
"""SELECT reviewed_at, reviewer, reviewer_model, outcome,
rejection_reason, disagreement_type, notes, claim_path
@ -582,7 +595,6 @@ async def handle_trace(request: web.Request) -> web.Response:
(trace_id,),
).fetchall()
# PR metadata
pr = conn.execute(
"""SELECT number, source_path, domain, agent, tier, status,
origin, created_at, merged_at
@ -608,6 +620,8 @@ async def handle_trace(request: web.Request) -> web.Response:
}
return web.json_response(result)
finally:
conn.close()
# ─── GET /api/growth ──────────────────────────────────────────────────────
@ -618,6 +632,7 @@ async def handle_growth(request):
Returns daily data points with running totals for each series.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "90"))
# Daily new sources
@ -709,6 +724,8 @@ async def handle_growth(request):
"merged": m_total,
},
})
finally:
conn.close()
import re
@ -723,23 +740,36 @@ async def handle_pr_lifecycle(request):
Joins prs + audit_log (eval rounds) + review_records.
"""
conn = request.app["_get_conn"]()
try:
days = int(request.query.get("days", "30"))
day_clause = "AND p.created_at > datetime('now', ? || ' days')" if days < 9999 else ""
params = (f"-{days}",) if days < 9999 else ()
# Base PR data
# Base PR data (include cost_usd for actual cost tracking)
pr_rows = conn.execute(
f"""SELECT p.number, p.agent, p.domain, p.tier, p.status,
p.created_at, p.merged_at, p.leo_verdict, p.description,
p.domain_agent, p.domain_model, p.branch, p.submitted_by,
p.source_path
p.domain_agent, p.domain_model, p.branch, p.cost_usd
FROM prs p
WHERE 1=1 {day_clause}
ORDER BY p.number DESC""",
params,
).fetchall()
# Actual costs from costs table (aggregated, same date window as PRs)
cost_day_clause = "AND date > date('now', ? || ' days')" if days < 9999 else ""
actual_cost_rows = conn.execute(
f"""SELECT SUM(cost_usd) as total_actual_cost,
SUM(calls) as total_calls,
SUM(input_tokens) as total_input_tokens,
SUM(output_tokens) as total_output_tokens
FROM costs
WHERE cost_usd > 0 {cost_day_clause}""",
params,
).fetchone()
actual_total_cost = actual_cost_rows["total_actual_cost"] if actual_cost_rows and actual_cost_rows["total_actual_cost"] else 0
# Eval round counts per PR (from audit_log)
eval_rows = conn.execute(
f"""SELECT CAST(json_extract(detail, '$.pr') AS INTEGER) as pr,
@ -802,6 +832,19 @@ async def handle_pr_lifecycle(request):
except (json.JSONDecodeError, TypeError):
pass
TIER_COST_EST = {
"LIGHT": 0.002,
"STANDARD": 0.018,
"DEEP": 0.12,
}
EXTRACT_COST_EST = 0.025
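# e.g. a STANDARD-tier PR with 2 eval rounds estimates to 0.025 + 2 * 0.018 = $0.061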
LEO_MODEL_BY_TIER = {
"DEEP": "claude-opus-4-20250514",
"STANDARD": "anthropic/claude-sonnet-4.5",
"LIGHT": None,
}
# Build PR list
prs = []
ttm_values = []
@ -839,38 +882,46 @@ async def handle_pr_lifecycle(request):
elif status == "open":
open_count += 1
# Claims count from pipe-separated description titles
desc = r["description"] or ""
claims_count = desc.count("|") + 1 if desc.strip() else 1
claim_titles = [t.strip() for t in desc.split("|") if t.strip()] if desc.strip() else []
claims_count = len(claim_titles) if claim_titles else 1
# Summary: first claim title from description, fallback to branch name
summary = None
if desc.strip():
first_title = desc.split("|")[0].strip()
summary = first_title[:120] if first_title else None
if claim_titles:
summary = claim_titles[0][:120]
if not summary:
branch = r["branch"] or ""
# Use prefix as category if present: "extract/...", "reweave/...", etc.
prefix = ""
if "/" in branch:
prefix = branch.split("/", 1)[0]
branch = branch.split("/", 1)[1]
# Strip date prefix like "2026-04-06-" or "2026-02-00-"
branch = _DATE_PREFIX_RE.sub("", branch)
# Strip trailing hash suffix like "-116d" or "-2cb1"
branch = re.sub(r"-[0-9a-f]{4}$", "", branch)
if branch:
summary = branch.replace("-", " ").replace("_", " ").strip()[:120]
elif prefix:
summary = prefix # "reweave", "ingestion", etc.
summary = prefix
tier = r["tier"] or "STANDARD"
actual_cost = r["cost_usd"] if r["cost_usd"] and r["cost_usd"] > 0 else None
if actual_cost is not None:
cost = round(actual_cost, 4)
cost_is_actual = True
else:
eval_cost = TIER_COST_EST.get(tier, 0.018) * max(rounds, 1)
cost = round(EXTRACT_COST_EST + eval_cost, 4)
cost_is_actual = False
leo_model = LEO_MODEL_BY_TIER.get(tier)
prs.append({
"number": pr_num,
"agent": r["agent"],
"domain": r["domain"],
"tier": r["tier"],
"tier": tier,
"status": status,
"claims_count": claims_count,
"claim_titles": claim_titles,
"eval_rounds": rounds,
"ttm_minutes": round(ttm, 1) if ttm is not None else None,
"created_at": r["created_at"],
@ -880,10 +931,11 @@ async def handle_pr_lifecycle(request):
"summary": summary,
"description": desc if desc.strip() else None,
"review_snippet": snippet_map.get(pr_num),
"submitted_by": r["submitted_by"],
"source_path": r["source_path"],
"domain_agent": r["domain_agent"],
"domain_model": r["domain_model"],
"leo_model": leo_model,
"cost": cost,
"cost_is_actual": cost_is_actual,
})
# Summary KPIs
@ -903,18 +955,35 @@ async def handle_pr_lifecycle(request):
return None
return vals[int(len(vals) * 0.9)]
# Compute cost summary: actual where available, estimated where not
total_actual = sum(p["cost"] for p in prs if p["cost_is_actual"])
total_estimated = sum(p["cost"] for p in prs if not p["cost_is_actual"])
prs_with_actual_cost = sum(1 for p in prs if p["cost_is_actual"])
med_ttm = median(ttm_values)
med_rounds = median(round_values)
return web.json_response({
"days": days,
"total": len(prs),
"merged": merged_count,
"closed": closed_count,
"open": open_count,
"median_ttm": round(median(ttm_values), 1) if median(ttm_values) is not None else None,
"median_ttm": round(med_ttm, 1) if med_ttm is not None else None,
"p90_ttm": round(p90(ttm_values), 1) if p90(ttm_values) is not None else None,
"median_rounds": round(median(round_values), 1) if median(round_values) is not None else None,
"median_rounds": round(med_rounds, 1) if med_rounds is not None else None,
"max_rounds": max(round_values) if round_values else None,
"actual_total_cost": round(actual_total_cost, 2),
"cost_summary": {
"total_actual": round(total_actual, 2),
"total_estimated": round(total_estimated, 2),
"prs_with_actual_cost": prs_with_actual_cost,
"prs_with_estimated_cost": len(prs) - prs_with_actual_cost,
},
"prs": prs,
})
finally:
conn.close()
# ─── Registration ──────────────────────────────────────────────────────────
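
The connection-leak fix applied in every handler above follows one pattern: acquire the connection, do all query work inside try, and close in finally so early returns and exceptions can no longer leak handles. A minimal sketch of the pattern (handler name and query are illustrative, not from this file):

async def handle_example(request):
    conn = request.app["_get_conn"]()
    try:
        rows = conn.execute("SELECT number, status FROM prs").fetchall()
        return web.json_response({"prs": [dict(r) for r in rows]})
    finally:
        conn.close()  # runs on success, early return, and exception alike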

ops/diagnostics/ops/db.py (new file, 655 lines)

@ -0,0 +1,655 @@
"""SQLite database — schema, migrations, connection management."""
import json
import logging
import sqlite3
from contextlib import contextmanager
from . import config
logger = logging.getLogger("pipeline.db")
SCHEMA_VERSION = 16
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sources (
path TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'unprocessed',
-- unprocessed, triaging, extracting, extracted, null_result,
-- needs_reextraction, error
priority TEXT DEFAULT 'medium',
-- critical, high, medium, low, skip
priority_log TEXT DEFAULT '[]',
-- JSON array: [{stage, priority, reasoning, ts}]
extraction_model TEXT,
claims_count INTEGER DEFAULT 0,
pr_number INTEGER,
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
feedback TEXT,
-- eval feedback for re-extraction (JSON)
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS prs (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
-- validating, open, reviewing, approved, merging, merged, closed, zombie, conflict
-- conflict: rebase failed or merge timed out needs human intervention
domain TEXT,
agent TEXT,
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'challenge', 'enrich', 'synthesize', 'unknown')),
tier TEXT,
-- LIGHT, STANDARD, DEEP
tier0_pass INTEGER,
-- 0/1
leo_verdict TEXT DEFAULT 'pending',
-- pending, approve, request_changes, skipped, failed
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
-- NULL = inherit from source. Set explicitly for human-submitted PRs.
-- Pipeline PRs: COALESCE(p.priority, s.priority, 'medium')
-- Human PRs: 'critical' (detected via missing source_path or non-agent author)
origin TEXT DEFAULT 'pipeline',
-- pipeline | human | external
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
CREATE TABLE IF NOT EXISTS costs (
date TEXT,
model TEXT,
stage TEXT,
calls INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
PRIMARY KEY (date, model, stage)
);
CREATE TABLE IF NOT EXISTS circuit_breakers (
name TEXT PRIMARY KEY,
state TEXT DEFAULT 'closed',
-- closed, open, halfopen
failures INTEGER DEFAULT 0,
successes INTEGER DEFAULT 0,
tripped_at TEXT,
last_success_at TEXT,
-- heartbeat: if now() - last_success_at > 2*interval, stage is stalled (Vida)
last_update TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
stage TEXT,
event TEXT,
detail TEXT
);
CREATE TABLE IF NOT EXISTS response_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
chat_id INTEGER,
user TEXT,
agent TEXT DEFAULT 'rio',
model TEXT,
query TEXT,
conversation_window TEXT,
-- JSON: prior N messages for context
-- NOTE: intentional duplication of transcript data for audit self-containment.
-- Transcripts live in /opt/teleo-eval/transcripts/ but audit rows need prompt
-- context inline for retrieval-quality diagnosis. Primary driver of row size;
-- target for cleanup when 90-day retention policy lands.
entities_matched TEXT,
-- JSON: [{name, path, score, used_in_response}]
claims_matched TEXT,
-- JSON: [{path, title, score, source, used_in_response}]
retrieval_layers_hit TEXT,
-- JSON: ["keyword","qdrant","graph"]
retrieval_gap TEXT,
-- What the KB was missing (if anything)
market_data TEXT,
-- JSON: injected token prices
research_context TEXT,
-- Haiku pre-pass results if any
kb_context_text TEXT,
-- Full context string sent to model
tool_calls TEXT,
-- JSON: ordered array [{tool, input, output, duration_ms, ts}]
raw_response TEXT,
display_response TEXT,
confidence_score REAL,
-- Model self-rated retrieval quality 0.0-1.0
response_time_ms INTEGER,
-- Eval pipeline columns (v10)
prompt_tokens INTEGER,
completion_tokens INTEGER,
generation_cost REAL,
embedding_cost REAL,
total_cost REAL,
blocked INTEGER DEFAULT 0,
block_reason TEXT,
query_type TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
CREATE INDEX IF NOT EXISTS idx_prs_status ON prs(status);
CREATE INDEX IF NOT EXISTS idx_prs_domain ON prs(domain);
CREATE INDEX IF NOT EXISTS idx_costs_date ON costs(date);
CREATE INDEX IF NOT EXISTS idx_audit_stage ON audit_log(stage);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
"""
def get_connection(readonly: bool = False) -> sqlite3.Connection:
"""Create a SQLite connection with WAL mode and proper settings."""
config.DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(
str(config.DB_PATH),
timeout=30,
isolation_level=None, # autocommit — we manage transactions explicitly
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=10000")
conn.execute("PRAGMA foreign_keys=ON")
if readonly:
conn.execute("PRAGMA query_only=ON")
return conn
@contextmanager
def transaction(conn: sqlite3.Connection):
"""Context manager for explicit transactions."""
conn.execute("BEGIN")
try:
yield conn
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
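# Usage: `with transaction(conn): ...` commits on success, rolls back on any exception.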
# Branch prefix → (agent, commit_type) mapping.
# Single source of truth — used by merge.py at INSERT time and migration v7 backfill.
# Unknown prefixes → ('unknown', 'unknown') + warning log.
BRANCH_PREFIX_MAP = {
"extract": ("pipeline", "extract"),
"ingestion": ("pipeline", "extract"),
"epimetheus": ("epimetheus", "extract"),
"rio": ("rio", "research"),
"theseus": ("theseus", "research"),
"astra": ("astra", "research"),
"vida": ("vida", "research"),
"clay": ("clay", "research"),
"leo": ("leo", "entity"),
"reweave": ("pipeline", "reweave"),
"fix": ("pipeline", "fix"),
}
def classify_branch(branch: str) -> tuple[str, str]:
"""Derive (agent, commit_type) from branch prefix.
Returns ('unknown', 'unknown') and logs a warning for unrecognized prefixes.
"""
prefix = branch.split("/", 1)[0] if "/" in branch else branch
result = BRANCH_PREFIX_MAP.get(prefix)
if result is None:
logger.warning("Unknown branch prefix %r in branch %r — defaulting to ('unknown', 'unknown')", prefix, branch)
return ("unknown", "unknown")
return result
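# e.g. classify_branch("extract/2026-04-06-foo") -> ("pipeline", "extract")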
def migrate(conn: sqlite3.Connection):
"""Run schema migrations."""
conn.executescript(SCHEMA_SQL)
# Check current version
try:
row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
current = row["v"] if row and row["v"] else 0
except sqlite3.OperationalError:
current = 0
# --- Incremental migrations ---
if current < 2:
# Phase 2: add multiplayer columns to prs table
for stmt in [
"ALTER TABLE prs ADD COLUMN priority TEXT",
"ALTER TABLE prs ADD COLUMN origin TEXT DEFAULT 'pipeline'",
"ALTER TABLE prs ADD COLUMN last_error TEXT",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v2: added priority, origin, last_error to prs")
if current < 3:
# Phase 3: retry budget — track eval attempts and issue tags per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN eval_attempts INTEGER DEFAULT 0",
"ALTER TABLE prs ADD COLUMN eval_issues TEXT DEFAULT '[]'",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v3: added eval_attempts, eval_issues to prs")
if current < 4:
# Phase 4: auto-fixer — track fix attempts per PR
for stmt in [
"ALTER TABLE prs ADD COLUMN fix_attempts INTEGER DEFAULT 0",
]:
try:
conn.execute(stmt)
except sqlite3.OperationalError:
pass # Column already exists (idempotent)
logger.info("Migration v4: added fix_attempts to prs")
if current < 5:
# Phase 5: contributor identity system — tracks who contributed what
# Aligned with schemas/attribution.md (5 roles) + Leo's tier system.
# CI is COMPUTED from raw counts × weights, never stored.
conn.executescript("""
CREATE TABLE IF NOT EXISTS contributors (
handle TEXT PRIMARY KEY,
display_name TEXT,
agent_id TEXT,
first_contribution TEXT,
last_contribution TEXT,
tier TEXT DEFAULT 'new',
-- new, contributor, veteran
sourcer_count INTEGER DEFAULT 0,
extractor_count INTEGER DEFAULT 0,
challenger_count INTEGER DEFAULT 0,
synthesizer_count INTEGER DEFAULT 0,
reviewer_count INTEGER DEFAULT 0,
claims_merged INTEGER DEFAULT 0,
challenges_survived INTEGER DEFAULT 0,
domains TEXT DEFAULT '[]',
highlights TEXT DEFAULT '[]',
identities TEXT DEFAULT '{}',
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_contributors_tier ON contributors(tier);
""")
logger.info("Migration v5: added contributors table")
if current < 6:
# Phase 6: analytics — time-series metrics snapshots for trending dashboard
conn.executescript("""
CREATE TABLE IF NOT EXISTS metrics_snapshots (
ts TEXT DEFAULT (datetime('now')),
throughput_1h INTEGER,
approval_rate REAL,
open_prs INTEGER,
merged_total INTEGER,
closed_total INTEGER,
conflict_total INTEGER,
evaluated_24h INTEGER,
fix_success_rate REAL,
rejection_broken_wiki_links INTEGER DEFAULT 0,
rejection_frontmatter_schema INTEGER DEFAULT 0,
rejection_near_duplicate INTEGER DEFAULT 0,
rejection_confidence INTEGER DEFAULT 0,
rejection_other INTEGER DEFAULT 0,
extraction_model TEXT,
eval_domain_model TEXT,
eval_leo_model TEXT,
prompt_version TEXT,
pipeline_version TEXT,
source_origin_agent INTEGER DEFAULT 0,
source_origin_human INTEGER DEFAULT 0,
source_origin_scraper INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON metrics_snapshots(ts);
""")
logger.info("Migration v6: added metrics_snapshots table for analytics dashboard")
if current < 7:
# Phase 7: agent attribution + commit_type for dashboard
# commit_type column + backfill agent/commit_type from branch prefix
try:
conn.execute("ALTER TABLE prs ADD COLUMN commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract', 'research', 'entity', 'decision', 'reweave', 'fix', 'unknown'))")
except sqlite3.OperationalError:
pass # column already exists from CREATE TABLE
# Backfill agent and commit_type from branch prefix
rows = conn.execute("SELECT number, branch FROM prs WHERE branch IS NOT NULL").fetchall()
for row in rows:
agent, commit_type = classify_branch(row["branch"])
conn.execute(
"UPDATE prs SET agent = ?, commit_type = ? WHERE number = ? AND (agent IS NULL OR commit_type IS NULL)",
(agent, commit_type, row["number"]),
)
backfilled = len(rows)
logger.info("Migration v7: added commit_type column, backfilled %d PRs with agent/commit_type", backfilled)
if current < 8:
# Phase 8: response audit — full-chain visibility for agent response quality
# Captures: query → tool calls → retrieval → context → response → confidence
# Approved by Ganymede (architecture), Rio (agent needs), Rhea (ops)
conn.executescript("""
CREATE TABLE IF NOT EXISTS response_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
chat_id INTEGER,
user TEXT,
agent TEXT DEFAULT 'rio',
model TEXT,
query TEXT,
conversation_window TEXT, -- intentional transcript duplication for audit self-containment
entities_matched TEXT,
claims_matched TEXT,
retrieval_layers_hit TEXT,
retrieval_gap TEXT,
market_data TEXT,
research_context TEXT,
kb_context_text TEXT,
tool_calls TEXT,
raw_response TEXT,
display_response TEXT,
confidence_score REAL,
response_time_ms INTEGER,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_response_audit_ts ON response_audit(timestamp);
CREATE INDEX IF NOT EXISTS idx_response_audit_agent ON response_audit(agent);
CREATE INDEX IF NOT EXISTS idx_response_audit_chat_ts ON response_audit(chat_id, timestamp);
""")
logger.info("Migration v8: added response_audit table for agent response auditing")
if current < 9:
# Phase 9: rebuild prs table to expand CHECK constraint on commit_type.
# SQLite cannot ALTER CHECK constraints in-place — must rebuild table.
# Old constraint (v7): extract,research,entity,decision,reweave,fix,unknown
# New constraint: adds challenge,enrich,synthesize
# Also re-derive commit_type from branch prefix for rows with invalid/NULL values.
# Step 1: Get all column names from existing table
cols_info = conn.execute("PRAGMA table_info(prs)").fetchall()
col_names = [c["name"] for c in cols_info]
col_list = ", ".join(col_names)
# Step 2: Create new table with expanded CHECK constraint
conn.executescript(f"""
CREATE TABLE prs_new (
number INTEGER PRIMARY KEY,
source_path TEXT REFERENCES sources(path),
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
domain TEXT,
agent TEXT,
commit_type TEXT CHECK(commit_type IS NULL OR commit_type IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown')),
tier TEXT,
tier0_pass INTEGER,
leo_verdict TEXT DEFAULT 'pending',
domain_verdict TEXT DEFAULT 'pending',
domain_agent TEXT,
domain_model TEXT,
priority TEXT,
origin TEXT DEFAULT 'pipeline',
transient_retries INTEGER DEFAULT 0,
substantive_retries INTEGER DEFAULT 0,
last_error TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
merged_at TEXT
);
INSERT INTO prs_new ({col_list}) SELECT {col_list} FROM prs;
DROP TABLE prs;
ALTER TABLE prs_new RENAME TO prs;
""")
logger.info("Migration v9: rebuilt prs table with expanded commit_type CHECK constraint")
# Step 3: Re-derive commit_type from branch prefix for invalid/NULL values
rows = conn.execute(
"""SELECT number, branch FROM prs
WHERE branch IS NOT NULL
AND (commit_type IS NULL
OR commit_type NOT IN ('extract','research','entity','decision','reweave','fix','challenge','enrich','synthesize','unknown'))"""
).fetchall()
fixed = 0
for row in rows:
agent, commit_type = classify_branch(row["branch"])
conn.execute(
"UPDATE prs SET agent = COALESCE(agent, ?), commit_type = ? WHERE number = ?",
(agent, commit_type, row["number"]),
)
fixed += 1
conn.commit()
logger.info("Migration v9: re-derived commit_type for %d PRs with invalid/NULL values", fixed)
if current < 10:
# Add eval pipeline columns to response_audit
# VPS may already be at v10/v11 from prior (incomplete) deploys — idempotent ALTERs via try/except
for col_def in [
("prompt_tokens", "INTEGER"),
("completion_tokens", "INTEGER"),
("generation_cost", "REAL"),
("embedding_cost", "REAL"),
("total_cost", "REAL"),
("blocked", "INTEGER DEFAULT 0"),
("block_reason", "TEXT"),
("query_type", "TEXT"),
]:
try:
conn.execute(f"ALTER TABLE response_audit ADD COLUMN {col_def[0]} {col_def[1]}")
except sqlite3.OperationalError:
pass # Column already exists
conn.commit()
logger.info("Migration v10: added eval pipeline columns to response_audit")
if current < 11:
# Phase 11: compute tracking — extended costs table columns
# (May already exist on VPS from manual deploy — idempotent ALTERs)
for col_def in [
("duration_ms", "INTEGER DEFAULT 0"),
("cache_read_tokens", "INTEGER DEFAULT 0"),
("cache_write_tokens", "INTEGER DEFAULT 0"),
("cost_estimate_usd", "REAL DEFAULT 0"),
]:
try:
conn.execute(f"ALTER TABLE costs ADD COLUMN {col_def[0]} {col_def[1]}")
except sqlite3.OperationalError:
pass # Column already exists
conn.commit()
logger.info("Migration v11: added compute tracking columns to costs")
if current < 12:
# Phase 12: structured review records — captures all evaluation outcomes
# including rejections, disagreements, and approved-with-changes.
# Schema locked with Leo (2026-04-01).
conn.executescript("""
CREATE TABLE IF NOT EXISTS review_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pr_number INTEGER NOT NULL,
claim_path TEXT,
domain TEXT,
agent TEXT,
reviewer TEXT NOT NULL,
reviewer_model TEXT,
outcome TEXT NOT NULL
CHECK (outcome IN ('approved', 'approved-with-changes', 'rejected')),
rejection_reason TEXT
CHECK (rejection_reason IS NULL OR rejection_reason IN (
'fails-standalone-test', 'duplicate', 'scope-mismatch',
'evidence-insufficient', 'framing-poor', 'other'
)),
disagreement_type TEXT
CHECK (disagreement_type IS NULL OR disagreement_type IN (
'factual', 'scope', 'framing', 'evidence'
)),
notes TEXT,
batch_id TEXT,
claims_in_batch INTEGER DEFAULT 1,
reviewed_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_review_records_pr ON review_records(pr_number);
CREATE INDEX IF NOT EXISTS idx_review_records_outcome ON review_records(outcome);
CREATE INDEX IF NOT EXISTS idx_review_records_domain ON review_records(domain);
CREATE INDEX IF NOT EXISTS idx_review_records_reviewer ON review_records(reviewer);
""")
logger.info("Migration v12: created review_records table")
if current < 16:
# Phase 16: trace_id on audit_log — queryable provenance chain.
# Auto-extracted from detail JSON's "pr" field by audit().
# Backfill existing rows from their detail JSON.
try:
conn.execute("ALTER TABLE audit_log ADD COLUMN trace_id TEXT")
except sqlite3.OperationalError:
pass # Column already exists
conn.execute("""
UPDATE audit_log
SET trace_id = json_extract(detail, '$.pr')
WHERE trace_id IS NULL
AND detail IS NOT NULL
AND json_extract(detail, '$.pr') IS NOT NULL
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_log_trace ON audit_log(trace_id)")
conn.commit()
logger.info("Migration v16: added trace_id to audit_log + backfilled from detail JSON")
if current < SCHEMA_VERSION:
conn.execute(
"INSERT OR REPLACE INTO schema_version (version) VALUES (?)",
(SCHEMA_VERSION,),
)
conn.commit() # Explicit commit — executescript auto-commits DDL but not subsequent DML
logger.info("Database migrated to schema version %d", SCHEMA_VERSION)
else:
logger.debug("Database at schema version %d", current)
def audit(conn: sqlite3.Connection, stage: str, event: str, detail: str = None, *, trace_id: str = None):
"""Write an audit log entry.
trace_id is auto-extracted from detail JSON's "pr" field if not provided.
This gives every audit row a queryable trace without changing any call site.
"""
if trace_id is None and detail:
try:
trace_id = str(json.loads(detail).get("pr", ""))
except (json.JSONDecodeError, TypeError, AttributeError):
pass
if trace_id == "":
trace_id = None
conn.execute(
"INSERT INTO audit_log (stage, event, detail, trace_id) VALUES (?, ?, ?, ?)",
(stage, event, detail, trace_id),
)
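# e.g. audit(conn, "evaluate", "approved", '{"pr": 123}') stores trace_id "123"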
def record_review(conn, pr_number: int, reviewer: str, outcome: str, *,
claim_path: str = None, domain: str = None, agent: str = None,
reviewer_model: str = None, rejection_reason: str = None,
disagreement_type: str = None, notes: str = None,
claims_in_batch: int = 1):
"""Record a structured review outcome.
Called from evaluate stage after Leo/domain reviewer returns a verdict.
outcome must be: approved, approved-with-changes, or rejected.
"""
batch_id = str(pr_number)
conn.execute(
"""INSERT INTO review_records
(pr_number, claim_path, domain, agent, reviewer, reviewer_model,
outcome, rejection_reason, disagreement_type, notes,
batch_id, claims_in_batch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(pr_number, claim_path, domain, agent, reviewer, reviewer_model,
outcome, rejection_reason, disagreement_type, notes,
batch_id, claims_in_batch),
)
def append_priority_log(conn: sqlite3.Connection, path: str, stage: str, priority: str, reasoning: str):
"""Append a priority assessment to a source's priority_log.
NOTE: This does NOT update the source's priority column. The priority column
is the authoritative priority, set only by initial triage or human override.
The priority_log records each stage's opinion for offline calibration analysis.
(Bug caught by Theseus: original version overwrote priority with each stage's opinion.)
(Race condition fix per Vida: read-then-write wrapped in a transaction.)
"""
conn.execute("BEGIN")
try:
row = conn.execute("SELECT priority_log FROM sources WHERE path = ?", (path,)).fetchone()
if not row:
conn.execute("ROLLBACK")
return
log = json.loads(row["priority_log"] or "[]")
log.append({"stage": stage, "priority": priority, "reasoning": reasoning})
conn.execute(
"UPDATE sources SET priority_log = ?, updated_at = datetime('now') WHERE path = ?",
(json.dumps(log), path),
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
def insert_response_audit(conn: sqlite3.Connection, **kwargs):
"""Insert a response audit record. All fields optional except query."""
cols = [
"timestamp", "chat_id", "user", "agent", "model", "query",
"conversation_window", "entities_matched", "claims_matched",
"retrieval_layers_hit", "retrieval_gap", "market_data",
"research_context", "kb_context_text", "tool_calls",
"raw_response", "display_response", "confidence_score",
"response_time_ms",
# Eval pipeline columns (v10)
"prompt_tokens", "completion_tokens", "generation_cost",
"embedding_cost", "total_cost", "blocked", "block_reason",
"query_type",
]
present = {k: v for k, v in kwargs.items() if k in cols and v is not None}
if not present:
return
col_names = ", ".join(present.keys())
placeholders = ", ".join("?" for _ in present)
conn.execute(
f"INSERT INTO response_audit ({col_names}) VALUES ({placeholders})",
tuple(present.values()),
)
def set_priority(conn: sqlite3.Connection, path: str, priority: str, reason: str = "human override"):
"""Set a source's authoritative priority. Used for human overrides and initial triage."""
conn.execute(
"UPDATE sources SET priority = ?, updated_at = datetime('now') WHERE path = ?",
(priority, path),
)
append_priority_log(conn, path, "override", priority, reason)

View file

@@ -0,0 +1,279 @@
"""Dashboard API routes for research session + cost tracking.
Argus-side read-only endpoints. These query the data that
research_tracking.py writes to pipeline.db.
Add to app.py after alerting_routes setup.
"""
import json
import sqlite3
from aiohttp import web
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
async def handle_api_research_sessions(request):
"""GET /api/research-sessions?agent=&domain=&days=7
Returns research sessions with linked sources and cost data.
"""
agent = request.query.get("agent")
domain = request.query.get("domain")
try:
days = int(request.query.get("days", 7))
except (ValueError, TypeError):
days = 7
conn = _conn(request.app)
try:
where = ["rs.started_at >= datetime('now', ?)"]
params = [f"-{days} days"]
if agent:
where.append("rs.agent = ?")
params.append(agent)
if domain:
where.append("rs.domain = ?")
params.append(domain)
where_clause = " AND ".join(where)
sessions = conn.execute(f"""
SELECT rs.*,
GROUP_CONCAT(s.path, '||') as source_paths,
GROUP_CONCAT(s.status, '||') as source_statuses,
GROUP_CONCAT(s.claims_count, '||') as source_claims,
GROUP_CONCAT(COALESCE(s.cost_usd, 0), '||') as source_costs
FROM research_sessions rs
LEFT JOIN sources s ON s.session_id = rs.id
WHERE {where_clause}
GROUP BY rs.id
ORDER BY rs.started_at DESC
""", params).fetchall()
result = []
for s in sessions:
sources = []
if s["source_paths"]:
paths = s["source_paths"].split("||")
statuses = (s["source_statuses"] or "").split("||")
claims = (s["source_claims"] or "").split("||")
costs = (s["source_costs"] or "").split("||")
for i, p in enumerate(paths):
sources.append({
"path": p,
"status": statuses[i] if i < len(statuses) else None,
"claims_count": int(claims[i]) if i < len(claims) and claims[i] else 0,
"extraction_cost": float(costs[i]) if i < len(costs) and costs[i] else 0,
})
result.append({
"id": s["id"],
"agent": s["agent"],
"domain": s["domain"],
"topic": s["topic"],
"reasoning": s["reasoning"],
"summary": s["summary"],
"sources_planned": s["sources_planned"],
"sources_produced": s["sources_produced"],
"model": s["model"],
"input_tokens": s["input_tokens"],
"output_tokens": s["output_tokens"],
"research_cost": s["cost_usd"],
"extraction_cost": sum(src["extraction_cost"] for src in sources),
"total_cost": s["cost_usd"] + sum(src["extraction_cost"] for src in sources),
"total_claims": sum(src["claims_count"] for src in sources),
"status": s["status"],
"started_at": s["started_at"],
"completed_at": s["completed_at"],
"sources": sources,
})
# Summary stats
total_sessions = len(result)
total_cost = sum(r["total_cost"] for r in result)
total_claims = sum(r["total_claims"] for r in result)
total_sources = sum(r["sources_produced"] for r in result)
return web.json_response({
"summary": {
"sessions": total_sessions,
"total_cost": round(total_cost, 2),
"total_claims": total_claims,
"total_sources": total_sources,
"avg_cost_per_claim": round(total_cost / total_claims, 4) if total_claims else 0,
"avg_cost_per_session": round(total_cost / total_sessions, 4) if total_sessions else 0,
},
"sessions": result,
})
finally:
conn.close()
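# Example request/response shape (values hypothetical):
#   GET /api/research-sessions?agent=leo&days=30
#   -> {"summary": {"sessions": 3, "total_cost": 4.92, "total_claims": 87, ...},
#       "sessions": [{"id": 12, "agent": "leo", "sources": [...], ...}]}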
async def handle_api_costs(request):
"""GET /api/costs?days=14&by=stage|model|date
Comprehensive cost breakdown. Works with EXISTING data in costs table
plus the new extraction costs once backfilled.
"""
try:
days = int(request.query.get("days", 14))
except (ValueError, TypeError):
days = 14
group_by = request.query.get("by", "stage")
conn = _conn(request.app)
try:
valid_groups = {"stage", "model", "date"}
if group_by not in valid_groups:
group_by = "stage"
rows = conn.execute(f"""
SELECT {group_by},
SUM(calls) as total_calls,
SUM(input_tokens) as total_input,
SUM(output_tokens) as total_output,
SUM(cost_usd) as total_cost
FROM costs
WHERE date >= date('now', ?)
GROUP BY {group_by}
ORDER BY total_cost DESC
""", (f"-{days} days",)).fetchall()
result = []
for r in rows:
result.append({
group_by: r[group_by],
"calls": r["total_calls"],
"input_tokens": r["total_input"],
"output_tokens": r["total_output"],
"cost_usd": round(r["total_cost"], 4),
})
grand_total = sum(r["cost_usd"] for r in result)
# Also get per-agent cost from sources table (extraction costs; all-time, not limited by ?days)
agent_costs = conn.execute("""
SELECT p.agent,
COUNT(DISTINCT s.path) as sources,
SUM(s.cost_usd) as extraction_cost,
SUM(s.claims_count) as claims
FROM sources s
LEFT JOIN prs p ON p.source_path = s.path
WHERE s.cost_usd > 0
GROUP BY p.agent
ORDER BY extraction_cost DESC
""").fetchall()
agent_breakdown = []
for r in agent_costs:
agent_breakdown.append({
"agent": r["agent"] or "unlinked",
"sources": r["sources"],
"extraction_cost": round(r["extraction_cost"], 2),
"claims": r["claims"],
"cost_per_claim": round(r["extraction_cost"] / r["claims"], 4) if r["claims"] else 0,
})
return web.json_response({
"period_days": days,
"grand_total": round(grand_total, 2),
"by_" + group_by: result,
"by_agent": agent_breakdown,
})
finally:
conn.close()
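# Example (values hypothetical):
#   GET /api/costs?days=7&by=model
#   -> {"period_days": 7, "grand_total": 18.40,
#       "by_model": [{"model": "anthropic/claude-sonnet-4.5", "cost_usd": 12.10, ...}],
#       "by_agent": [...]}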
async def handle_api_source_detail(request):
"""GET /api/source/{path}
Full lifecycle of a single source: research session → extraction → claims → eval outcomes.
"""
source_path = request.match_info["path"]
conn = _conn(request.app)
try:
# Try exact match first, fall back to suffix match (anchored)
source = conn.execute(
"SELECT * FROM sources WHERE path = ?",
(source_path,),
).fetchone()
if not source:
# Suffix match — anchor with / prefix to avoid substring hits
source = conn.execute(
"SELECT * FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_path}",),
).fetchone()
if not source:
return web.json_response({"error": "Source not found"}, status=404)
result = dict(source)
# Get research session if linked
if source["session_id"]:
session = conn.execute(
"SELECT * FROM research_sessions WHERE id = ?",
(source["session_id"],),
).fetchone()
result["research_session"] = dict(session) if session else None
else:
result["research_session"] = None
# Get PRs from this source
prs = conn.execute(
"SELECT number, status, domain, agent, tier, leo_verdict, domain_verdict, "
"cost_usd, created_at, merged_at, commit_type, transient_retries, substantive_retries, last_error "
"FROM prs WHERE source_path = ?",
(source["path"],),
).fetchall()
result["prs"] = [dict(p) for p in prs]
# Get eval events from audit_log for those PRs
# NOTE: audit_log.detail is mixed — some rows are JSON (evaluate events),
# some are plain text. Use json_valid() to filter safely.
pr_numbers = [p["number"] for p in prs]
if pr_numbers:
placeholders = ",".join("?" * len(pr_numbers))
evals = conn.execute(f"""
SELECT * FROM audit_log
WHERE stage = 'evaluate'
AND json_valid(detail)
AND json_extract(detail, '$.pr') IN ({placeholders})
ORDER BY timestamp
""", pr_numbers).fetchall()
result["eval_history"] = [
{"timestamp": e["timestamp"], "event": e["event"],
"detail": json.loads(e["detail"]) if e["detail"] else None}
for e in evals
]
else:
result["eval_history"] = []
return web.json_response(result)
finally:
conn.close()
def setup_research_routes(app):
"""Register research tracking routes. Call from create_app()."""
app.router.add_get("/api/research-sessions", handle_api_research_sessions)
app.router.add_get("/api/costs", handle_api_costs)
app.router.add_get("/api/source/{path:.+}", handle_api_source_detail)
# Public paths to add to auth middleware
RESEARCH_PUBLIC_PATHS = frozenset({
"/api/research-sessions",
"/api/costs",
})
# /api/source/{path} needs prefix matching — add to auth middleware:
# if path.startswith("/api/source/"): allow
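# A minimal sketch of that middleware check (aiohttp-style; require_auth is a
# hypothetical stand-in for the existing auth path):
#
#   @web.middleware
#   async def auth_middleware(request, handler):
#       path = request.path
#       if path in RESEARCH_PUBLIC_PATHS or path.startswith("/api/source/"):
#           return await handler(request)
#       return await require_auth(request, handler)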

View file

@@ -0,0 +1,419 @@
"""Research session tracking + cost attribution for the Teleo pipeline.
This module adds three capabilities:
1. research_sessions table: tracks WHY agents researched, what they found interesting,
session cost, and links to generated sources
2. Extraction cost attribution: writes per-source cost to sources.cost_usd after extraction
3. Source → claim linkage: ensures prs.source_path is always populated
Designed for Epimetheus to integrate into the pipeline. Argus built the spec;
Ganymede reviews; Epimetheus wires it in.
Data flow:
Agent research session → research_sessions row (with reasoning + summary)
→ sources created (with session_id FK)
→ extraction runs (cost written to sources.cost_usd + costs table)
→ PRs created (source_path populated)
→ claims merged (traceable back to session)
"""
import json
import logging
import sqlite3
from datetime import datetime
from typing import Optional
logger = logging.getLogger("research_tracking")
# ---------------------------------------------------------------------------
# Migration v11: research_sessions table + sources.session_id FK
# (v9 is current; v10 is Epimetheus's eval pipeline migration)
# ---------------------------------------------------------------------------
MIGRATION_V11_SQL = """
-- Research session tracking table
CREATE TABLE IF NOT EXISTS research_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
-- Which agent ran the research (leo, rio, astra, etc.)
domain TEXT,
-- Primary domain of the research
topic TEXT NOT NULL,
-- What they researched (short description)
reasoning TEXT,
-- WHY they chose this topic (agent's own explanation)
summary TEXT,
-- What they found most interesting/relevant
sources_planned INTEGER DEFAULT 0,
-- How many sources they intended to produce
sources_produced INTEGER DEFAULT 0,
-- How many actually materialized
model TEXT,
-- Model used for research (e.g. claude-opus-4-6)
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
-- Total research session cost (LLM calls for discovery + writing)
status TEXT DEFAULT 'running',
-- running, completed, failed, partial
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
metadata TEXT DEFAULT '{}'
-- JSON: any extra context (prompt version, search queries used, etc.)
);
CREATE INDEX IF NOT EXISTS idx_rs_agent ON research_sessions(agent);
CREATE INDEX IF NOT EXISTS idx_rs_domain ON research_sessions(domain);
CREATE INDEX IF NOT EXISTS idx_rs_started ON research_sessions(started_at);
-- Add session_id FK to sources table
ALTER TABLE sources ADD COLUMN session_id INTEGER REFERENCES research_sessions(id);
CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
-- Record migration
INSERT INTO schema_version (version) VALUES (11);
"""
# ---------------------------------------------------------------------------
# Cost attribution: write extraction cost to sources.cost_usd
# ---------------------------------------------------------------------------
# Pricing per million tokens (as of March 2026)
MODEL_PRICING = {
"anthropic/claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
"anthropic/claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
"anthropic/claude-haiku-4.5": {"input": 0.80, "output": 4.00},
"anthropic/claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
"minimax/minimax-m2.5": {"input": 0.14, "output": 0.56},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate USD cost from model name and token counts."""
pricing = MODEL_PRICING.get(model)
if not pricing:
# Default to Sonnet 4.5 pricing as a conservative estimate
logger.warning("Unknown model %s — using Sonnet 4.5 pricing", model)
pricing = {"input": 3.00, "output": 15.00}
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
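# Worked example (hypothetical counts): 100k input + 5k output on Haiku 4.5:
#   (100_000 * 0.80 + 5_000 * 4.00) / 1_000_000 = $0.10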
def record_extraction_cost(
conn: sqlite3.Connection,
source_path: str,
model: str,
input_tokens: int,
output_tokens: int,
):
"""Write extraction cost to both sources.cost_usd and costs table.
Call this after each successful extraction call in openrouter-extract-v2.py.
This is the missing link: the CSV logger records tokens but never writes
cost back to the DB.
"""
cost = calculate_cost(model, input_tokens, output_tokens)
# Update source row
conn.execute(
"UPDATE sources SET cost_usd = cost_usd + ?, extraction_model = ? WHERE path = ?",
(cost, model, source_path),
)
# Also record in costs table for dashboard aggregation
date = datetime.utcnow().strftime("%Y-%m-%d")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, input_tokens, output_tokens, cost),
)
conn.commit()
logger.info(
"Recorded extraction cost for %s: $%.4f (%d in, %d out, %s)",
source_path, cost, input_tokens, output_tokens, model,
)
return cost
# ---------------------------------------------------------------------------
# Research session lifecycle
# ---------------------------------------------------------------------------
def start_session(
conn: sqlite3.Connection,
agent: str,
topic: str,
domain: Optional[str] = None,
reasoning: Optional[str] = None,
sources_planned: int = 0,
model: Optional[str] = None,
metadata: Optional[dict] = None,
) -> int:
"""Call at the START of a research session. Returns session_id.
The agent should call this before it begins producing sources,
explaining what it plans to research and why.
"""
cur = conn.execute(
"""INSERT INTO research_sessions
(agent, domain, topic, reasoning, sources_planned, model, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
agent,
domain,
topic,
reasoning,
sources_planned,
model,
json.dumps(metadata or {}),
),
)
conn.commit()
session_id = cur.lastrowid
logger.info("Started research session #%d: %s / %s", session_id, agent, topic)
return session_id
def link_source_to_session(
conn: sqlite3.Connection,
source_path: str,
session_id: int,
):
"""Link a source file to its research session.
Call this when a source is written to inbox/ during a research session.
"""
conn.execute(
"UPDATE sources SET session_id = ? WHERE path = ?",
(session_id, source_path),
)
conn.execute(
"""UPDATE research_sessions
SET sources_produced = sources_produced + 1
WHERE id = ?""",
(session_id,),
)
conn.commit()
def complete_session(
conn: sqlite3.Connection,
session_id: int,
summary: str,
input_tokens: int = 0,
output_tokens: int = 0,
cost_usd: float = 0,
status: str = "completed",
):
"""Call at the END of a research session.
The agent should summarize what it found most interesting/relevant.
Cost should include ALL LLM calls made during the session (web search,
analysis, source writing, everything).
"""
conn.execute(
"""UPDATE research_sessions
SET summary = ?, input_tokens = ?, output_tokens = ?,
cost_usd = ?, status = ?, completed_at = datetime('now')
WHERE id = ?""",
(summary, input_tokens, output_tokens, cost_usd, status, session_id),
)
conn.commit()
logger.info("Completed research session #%d: %s", session_id, status)
# ---------------------------------------------------------------------------
# Source → PR linkage fix
# ---------------------------------------------------------------------------
def ensure_source_path_on_pr(
conn: sqlite3.Connection,
pr_number: int,
source_path: str,
):
"""Ensure prs.source_path is populated. Call during PR creation.
Currently 0/1451 PRs have source_path set. This is the fix.
"""
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ? AND (source_path IS NULL OR source_path = '')",
(source_path, pr_number),
)
conn.commit()
# ---------------------------------------------------------------------------
# Backfill: attribute extraction costs from existing CSV log
# ---------------------------------------------------------------------------
def backfill_extraction_costs(conn: sqlite3.Connection, csv_path: str):
"""One-time backfill: read openrouter-usage.csv and write costs to sources + costs tables.
Run once to fill in the ~$338 of extraction costs that were logged to CSV
but never written to the database.
Safe to resume: only sources where cost_usd = 0 are updated, so partial runs
don't double-count source costs. The costs row is inserted for every CSV line
regardless of match, so avoid a full re-run after a complete pass.
"""
import csv
count = 0
total_cost = 0.0
with open(csv_path) as f:
reader = csv.DictReader(f)
for row in reader:
source_file = row.get("source_file", "")
model = row.get("model", "")
try:
in_tok = int(row.get("input_tokens", 0) or 0)
out_tok = int(row.get("output_tokens", 0) or 0)
except (ValueError, TypeError):
continue
cost = calculate_cost(model, in_tok, out_tok)
if cost <= 0:
continue
# Try to match source_file to sources.path
# CSV has filename, DB has full path — match on exact suffix
# Use ORDER BY length(path) to prefer shortest (most specific) match
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? AND cost_usd = 0 ORDER BY length(path) LIMIT 1",
(f"%/{source_file}" if "/" not in source_file else f"%{source_file}",),
).fetchone()
if matched:
conn.execute(
"UPDATE sources SET cost_usd = ?, extraction_model = ? WHERE path = ?",
(cost, model, matched[0]),
)
# Always record in costs table
date = row.get("date", "unknown")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, in_tok, out_tok, cost),
)
count += 1
total_cost += cost
conn.commit()
logger.info("Backfilled %d extraction cost records, total $%.2f", count, total_cost)
return count, total_cost
# ---------------------------------------------------------------------------
# Backfill: populate prs.source_path from branch naming convention
# ---------------------------------------------------------------------------
def backfill_source_paths(conn: sqlite3.Connection):
"""One-time backfill: derive source_path for existing PRs from branch names.
Branch format: extract/YYYY-MM-DD-source-name or similar patterns.
Source path format: inbox/queue/YYYY-MM-DD-source-name.md
"""
rows = conn.execute(
"SELECT number, branch FROM prs WHERE source_path IS NULL AND branch IS NOT NULL"
).fetchall()
count = 0
for number, branch in rows:
# Try to extract source name from branch
# Common patterns: extract/source-name, claims/source-name
parts = branch.split("/", 1)
if len(parts) < 2:
continue
source_stem = parts[1]
# Try to find matching source in DB: anchored /stem with a trailing wildcard
# (not an exact suffix; tolerates extensions), shortest path wins
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_stem}%" if source_stem else "",),
).fetchone()
if matched:
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ?",
(matched[0], number),
)
count += 1
conn.commit()
logger.info("Backfilled source_path for %d PRs", count)
return count
# ---------------------------------------------------------------------------
# Integration points (for Epimetheus to wire in)
# ---------------------------------------------------------------------------
INTEGRATION_GUIDE = """
## Where to wire this in
### 1. openrouter-extract-v2.py — after successful extraction call
from research_tracking import record_extraction_cost
# After line 430 (content, usage = call_openrouter(...))
# After line 672 (log_usage(...))
record_extraction_cost(
conn, args.source_file, args.model,
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
)
### 2. Agent research scripts — wrap research sessions
from research_tracking import start_session, link_source_to_session, complete_session
# At start of research:
session_id = start_session(conn, agent="leo", topic="weapons stigmatization campaigns",
domain="grand-strategy",
reasoning="Following up on EU AI Act national security exclusion — exploring how "
"stigmatization campaigns have historically driven arms control policy",
sources_planned=6, model="claude-opus-4-6")
# As each source is written:
link_source_to_session(conn, source_path, session_id)
# At end of research:
complete_session(conn, session_id,
summary="Ottawa Treaty mine ban model is the strongest parallel to AI weapons — same "
"3-condition framework (humanitarian harm + low military utility + civil society "
"coalition). Ukraine Shahed case is a near-miss triggering event.",
input_tokens=total_in, output_tokens=total_out, cost_usd=total_cost)
### 3. PR creation in lib/merge.py or lib/validate.py — ensure source_path
from research_tracking import ensure_source_path_on_pr
# When creating a PR, pass the source:
ensure_source_path_on_pr(conn, pr_number, source_path)
### 4. One-time backfills (run manually after migration)
from research_tracking import backfill_extraction_costs, backfill_source_paths
backfill_extraction_costs(conn, "/opt/teleo-eval/logs/openrouter-usage.csv")
backfill_source_paths(conn)
### 5. Migration
Run MIGRATION_V11_SQL against pipeline.db after backing up.
"""