fix: sync all code from VPS — repo is now authoritative source of truth

24 files: 8 pipeline lib modules, 6 diagnostics updates, 4 new diagnostics
modules, 1 telegram bot fix, 5 active operational scripts. Key changes:
- Security: SQL injection prevention (alerting.py), SSL verification
  (review_queue.py), path traversal guard (extract.py)
- Cost tracking: per-PR cost accumulation in evaluate.py
- Auto-recovery: watchdog tier0 reset with retry cap + cooldown
- Extraction: structured edge fields, post-write vector connection
- New modules: vitality, research_tracking, research_routes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
m3taversal 2026-04-15 13:18:01 +01:00
parent d2aec7fee3
commit 81afcd319f
24 changed files with 3544 additions and 1111 deletions


@ -67,6 +67,8 @@ def check_agent_health(conn: sqlite3.Connection) -> list[dict]:
now = datetime.now(timezone.utc)
for r in rows:
agent = r["agent"]
if agent in ("unknown", None):
continue
latest = r["latest"]
if not latest:
continue
@ -157,8 +159,17 @@ def check_quality_regression(conn: sqlite3.Connection) -> list[dict]:
return alerts
_ALLOWED_DIM_EXPRS = frozenset({
"json_extract(detail, '$.agent')",
"json_extract(detail, '$.domain')",
"COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent'))",
})
def _check_approval_by_dimension(conn, alerts, dim_name, dim_expr):
"""Check approval rate regression grouped by a dimension (agent or domain)."""
"""Check approval rate regression grouped by a dimension. dim_expr must be in _ALLOWED_DIM_EXPRS."""
if dim_expr not in _ALLOWED_DIM_EXPRS:
raise ValueError(f"untrusted dim_expr: {dim_expr}")
# 7-day baseline per dimension
baseline_rows = conn.execute(
f"""SELECT {dim_expr} as dim_val,
@ -257,24 +268,22 @@ def check_rejection_spike(conn: sqlite3.Connection) -> list[dict]:
"""Detect single rejection reason exceeding REJECTION_SPIKE_RATIO of recent rejections."""
alerts = []
# Total rejections in 24h
# Total rejected PRs in 24h (prs.eval_issues is the canonical source — Epimetheus 2026-04-02)
total = conn.execute(
"""SELECT COUNT(*) as n FROM audit_log
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')"""
"""SELECT COUNT(*) as n FROM prs
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND created_at > datetime('now', '-24 hours')"""
).fetchone()["n"]
if total < 10:
return alerts # Not enough data
# Count by rejection tag
# Count by rejection tag from prs.eval_issues
tags = conn.execute(
"""SELECT value as tag, COUNT(*) as cnt
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND created_at > datetime('now', '-24 hours')
GROUP BY tag ORDER BY cnt DESC"""
).fetchall()
@ -306,16 +315,13 @@ def check_stuck_loops(conn: sqlite3.Connection) -> list[dict]:
"""Detect agents repeatedly failing on the same rejection reason."""
alerts = []
# COALESCE: rejection events use $.agent, eval events use $.domain_agent (Epimetheus 2026-03-28)
# Agent + rejection reason from prs table directly (Epimetheus correction 2026-04-02)
rows = conn.execute(
"""SELECT COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) as agent,
value as tag,
COUNT(*) as cnt
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-6 hours')
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) IS NOT NULL
"""SELECT agent, value as tag, COUNT(*) as cnt
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND agent IS NOT NULL
AND created_at > datetime('now', '-6 hours')
GROUP BY agent, tag
HAVING cnt > ?""",
(STUCK_LOOP_THRESHOLD,),
@ -403,16 +409,13 @@ def check_domain_rejection_patterns(conn: sqlite3.Connection) -> list[dict]:
"""Track rejection reason shift per domain — surfaces domain maturity issues."""
alerts = []
# Per-domain rejection breakdown in 24h
# Per-domain rejection breakdown in 24h from prs table (Epimetheus correction 2026-04-02)
rows = conn.execute(
"""SELECT json_extract(detail, '$.domain') as domain,
value as tag,
COUNT(*) as cnt
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND timestamp > datetime('now', '-24 hours')
AND json_extract(detail, '$.domain') IS NOT NULL
"""SELECT domain, value as tag, COUNT(*) as cnt
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND domain IS NOT NULL
AND created_at > datetime('now', '-24 hours')
GROUP BY domain, tag
ORDER BY domain, cnt DESC"""
).fetchall()
@ -464,12 +467,11 @@ def generate_failure_report(conn: sqlite3.Connection, agent: str, hours: int = 2
hours = int(hours) # defensive — callers should pass int, but enforce it
rows = conn.execute(
"""SELECT value as tag, COUNT(*) as cnt,
GROUP_CONCAT(DISTINCT json_extract(detail, '$.pr')) as pr_numbers
FROM audit_log, json_each(json_extract(detail, '$.issues'))
WHERE stage='evaluate'
AND event IN ('changes_requested','domain_rejected','tier05_rejected')
AND COALESCE(json_extract(detail, '$.agent'), json_extract(detail, '$.domain_agent')) = ?
AND timestamp > datetime('now', ? || ' hours')
GROUP_CONCAT(DISTINCT number) as pr_numbers
FROM prs, json_each(prs.eval_issues)
WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
AND agent = ?
AND created_at > datetime('now', ? || ' hours')
GROUP BY tag ORDER BY cnt DESC
LIMIT 5""",
(agent, f"-{hours}"),
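All of the rewritten queries above lean on the same assumption about prs.eval_issues: it holds a JSON array of rejection tags, and SQLite's json_each() unpacks that array into one row per tag, which is what the GROUP BY tag aggregations rely on. A minimal, self-contained sketch of that pattern (table contents are illustrative; it assumes the JSON1 functions bundled with any recent Python sqlite3 build):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prs (number INTEGER, agent TEXT, eval_issues TEXT)")
conn.execute("""INSERT INTO prs VALUES
    (1, 'rio', '["missing_citation", "overclaimed_confidence"]'),
    (2, 'rio', '["missing_citation"]')""")
rows = conn.execute(
    """SELECT agent, value AS tag, COUNT(*) AS cnt
       FROM prs, json_each(prs.eval_issues)
       WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
       GROUP BY agent, tag"""
).fetchall()
# rows contains ('rio', 'missing_citation', 2) and ('rio', 'overclaimed_confidence', 1)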


@ -26,22 +26,24 @@ async def handle_check(request):
conn = request.app["_alerting_conn_func"]()
try:
alerts = run_all_checks(conn)
# Generate failure reports for agents with stuck loops
failure_reports = {}
stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]}
for agent in stuck_agents:
report = generate_failure_report(conn, agent)
if report:
failure_reports[agent] = report
except Exception as e:
logger.error("Check failed: %s", e)
return web.json_response({"error": str(e)}, status=500)
finally:
conn.close()
global _active_alerts, _last_check
_active_alerts = alerts
_last_check = datetime.now(timezone.utc).isoformat()
# Generate failure reports for agents with stuck loops
failure_reports = {}
stuck_agents = {a["agent"] for a in alerts if a["category"] == "health" and "stuck" in a["id"] and a["agent"]}
for agent in stuck_agents:
report = generate_failure_report(conn, agent)
if report:
failure_reports[agent] = report
result = {
"checked_at": _last_check,
"alert_count": len(alerts),
@ -104,10 +106,15 @@ async def handle_api_failure_report(request):
hours: lookback window (default 24)
"""
agent = request.match_info["agent"]
hours = int(request.query.get("hours", "24"))
try:
hours = min(int(request.query.get("hours", "24")), 168)
except ValueError:
hours = 24
conn = request.app["_alerting_conn_func"]()
report = generate_failure_report(conn, agent, hours)
try:
report = generate_failure_report(conn, agent, hours)
finally:
conn.close()
if not report:
return web.json_response({"agent": agent, "status": "no_rejections", "period_hours": hours})


@ -74,7 +74,7 @@ def render_epistemic_page(vital_signs: dict, now: datetime) -> str:
<div style="font-size:40px;margin-bottom:12px;opacity:0.3">&#9881;</div>
<div style="color:#8b949e">
Multi-model agreement rate requires the <code>model_evals</code> table.<br>
<span style="font-size:12px">Blocked on: model_evals table creation (Theseus 2 Phase 3)</span>
<span style="font-size:12px">Blocked on: model_evals table creation (Ship Phase 3)</span>
</div>
<div style="margin-top:16px;font-size:12px;color:#8b949e">
Current eval models: Haiku (triage), GPT-4o (domain), Sonnet/Opus (Leo).<br>
@ -194,12 +194,6 @@ fetch('/api/review-summary?days=30')
reasonRows += '<tr><td><code>' + esc(r.reason) + '</code></td><td>' + r.count + '</td></tr>';
}}
// Disagreement types
let disagreeRows = '';
for (const d of (data.disagreement_types || [])) {{
disagreeRows += '<tr><td>' + esc(d.type) + '</td><td>' + d.count + '</td></tr>';
}}
el.innerHTML = `
<div class="grid">
<div class="card"><div class="label">Total Reviews</div><div class="hero-value">${{data.total}}</div></div>
@ -215,13 +209,6 @@ fetch('/api/review-summary?days=30')
${{reasonRows || '<tr><td colspan="2" style="color:#8b949e">No rejections</td></tr>'}}
</table>
</div>
<div class="card">
<div style="font-weight:600;margin-bottom:8px">Disagreement Types</div>
<table>
<tr><th>Type</th><th>Count</th></tr>
${{disagreeRows || '<tr><td colspan="2" style="color:#8b949e">No disagreements</td></tr>'}}
</table>
</div>
</div>`;
}}).catch(() => {{
document.getElementById('review-container').innerHTML =


@ -1,8 +1,8 @@
"""PR Lifecycle dashboard — single-page view of every PR through the pipeline.
Sortable table: PR#, summary, claims, domain, contributor, outcome, evals, evaluator, cost, date.
Click any row to expand: claim titles, eval chain, timeline, reviews, issues.
Hero cards: total PRs, merge rate, total claims, est. cost.
Sortable table: PR#, summary, claims, domain, outcome, evals, evaluator, cost, date.
Click any row to expand: timeline, claim list, issues summary.
Hero cards: total PRs, merge rate, median eval rounds, total claims, total cost.
Data sources: prs table, audit_log (eval rounds), review_records.
Owner: Ship
@ -14,7 +14,7 @@ from shared_ui import render_page
EXTRA_CSS = """
.content-wrapper { max-width: 1600px !important; }
.page-content { max-width: 1600px !important; }
.filters { display: flex; gap: 12px; flex-wrap: wrap; margin-bottom: 16px; }
.filters select, .filters input {
background: #161b22; color: #c9d1d9; border: 1px solid #30363d;
@ -22,15 +22,14 @@ EXTRA_CSS = """
.filters select:focus, .filters input:focus { border-color: #58a6ff; outline: none; }
.pr-table { width: 100%; border-collapse: collapse; font-size: 13px; table-layout: fixed; }
.pr-table th:nth-child(1) { width: 50px; } /* PR# */
.pr-table th:nth-child(2) { width: 28%; } /* Summary */
.pr-table th:nth-child(2) { width: 30%; } /* Summary */
.pr-table th:nth-child(3) { width: 50px; } /* Claims */
.pr-table th:nth-child(4) { width: 11%; } /* Domain */
.pr-table th:nth-child(5) { width: 10%; } /* Contributor */
.pr-table th:nth-child(6) { width: 10%; } /* Outcome */
.pr-table th:nth-child(7) { width: 44px; } /* Evals */
.pr-table th:nth-child(8) { width: 12%; } /* Evaluator */
.pr-table th:nth-child(9) { width: 60px; } /* Cost */
.pr-table th:nth-child(10) { width: 80px; } /* Date */
.pr-table th:nth-child(4) { width: 12%; } /* Domain */
.pr-table th:nth-child(5) { width: 10%; } /* Outcome */
.pr-table th:nth-child(6) { width: 50px; } /* Evals */
.pr-table th:nth-child(7) { width: 16%; } /* Evaluator */
.pr-table th:nth-child(8) { width: 70px; } /* Cost */
.pr-table th:nth-child(9) { width: 90px; } /* Date */
.pr-table td { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; padding: 8px 6px; }
.pr-table td:nth-child(2) { white-space: normal; overflow: visible; line-height: 1.4; }
.pr-table th { cursor: pointer; user-select: none; position: relative; padding: 8px 18px 8px 6px; }
@ -49,24 +48,22 @@ EXTRA_CSS = """
.pr-table .pr-link:hover { text-decoration: underline; }
.pr-table td .summary-text { font-size: 12px; color: #c9d1d9; }
.pr-table td .review-snippet { font-size: 11px; color: #f85149; margin-top: 2px; opacity: 0.8; }
.pr-table td .model-tag { font-size: 10px; color: #6e7681; background: #161b22; border-radius: 3px; padding: 1px 4px; }
.pr-table td .contributor-tag { font-size: 11px; color: #d2a8ff; }
.pr-table td .contributor-self { font-size: 11px; color: #6e7681; font-style: italic; }
.pr-table td .model-tag { font-size: 9px; color: #6e7681; background: #21262d; border-radius: 3px; padding: 1px 4px; display: inline-block; margin: 1px 0; }
.pr-table td .expand-chevron { display: inline-block; width: 12px; color: #484f58; font-size: 10px; transition: transform 0.2s; }
.pr-table tr.expanded .expand-chevron { transform: rotate(90deg); color: #58a6ff; }
.pr-table td .cost-val { font-size: 12px; color: #8b949e; }
.pr-table td .claims-count { font-size: 13px; color: #c9d1d9; text-align: center; }
.pr-table td .evals-count { font-size: 13px; text-align: center; }
.trace-panel { background: #0d1117; border: 1px solid #30363d; border-radius: 8px;
padding: 16px; margin: 4px 0 8px 0; font-size: 12px; display: none; }
.trace-panel.open { display: block; }
.trace-panel h4 { color: #58a6ff; font-size: 12px; margin: 12px 0 6px 0; }
.trace-panel h4:first-child { margin-top: 0; }
.claim-list { list-style: none; padding: 0; margin: 0; }
.claim-list li { padding: 4px 0 4px 16px; border-left: 2px solid #238636; color: #c9d1d9; font-size: 12px; line-height: 1.5; }
.claim-list li .claim-confidence { font-size: 10px; color: #8b949e; margin-left: 6px; }
.issues-box { background: #1c1210; border: 1px solid #f8514933; border-radius: 6px;
.trace-panel .section-title { color: #58a6ff; font-size: 12px; font-weight: 600; margin: 12px 0 6px; }
.trace-panel .section-title:first-child { margin-top: 0; }
.trace-panel .claim-list { list-style: none; padding: 0; margin: 0; }
.trace-panel .claim-list li { padding: 4px 0; border-bottom: 1px solid #21262d; color: #c9d1d9; font-size: 12px; }
.trace-panel .claim-list li:last-child { border-bottom: none; }
.trace-panel .issues-box { background: #1c1017; border: 1px solid #f8514930; border-radius: 6px;
padding: 8px 12px; margin: 4px 0; font-size: 12px; color: #f85149; }
.eval-chain { background: #161b22; border-radius: 6px; padding: 8px 12px; margin: 4px 0; font-size: 12px; }
.eval-chain .chain-step { display: inline-block; margin-right: 6px; }
.eval-chain .chain-arrow { color: #484f58; margin: 0 4px; }
.trace-timeline { list-style: none; padding: 0; }
.trace-timeline li { padding: 4px 0; border-left: 2px solid #30363d; padding-left: 12px; margin-left: 8px; }
.trace-timeline li .ts { color: #484f58; font-size: 11px; }
@ -76,6 +73,12 @@ EXTRA_CSS = """
.trace-timeline li.ev-changes .ev { color: #d29922; }
.review-text { background: #161b22; padding: 8px 12px; border-radius: 4px;
margin: 4px 0; white-space: pre-wrap; font-size: 11px; color: #8b949e; max-height: 200px; overflow-y: auto; }
.eval-chain { background: #161b22; border-radius: 6px; padding: 8px 12px; margin: 4px 0 8px;
font-size: 12px; display: flex; gap: 12px; flex-wrap: wrap; align-items: center; }
.eval-chain .step { display: flex; align-items: center; gap: 4px; }
.eval-chain .step-label { color: #8b949e; font-size: 11px; }
.eval-chain .step-model { color: #c9d1d9; font-size: 11px; font-weight: 600; }
.eval-chain .arrow { color: #484f58; }
.pagination { display: flex; gap: 8px; align-items: center; justify-content: center; margin-top: 16px; }
.pagination button { background: #161b22; color: #c9d1d9; border: 1px solid #30363d;
border-radius: 4px; padding: 4px 12px; cursor: pointer; font-size: 12px; }
@ -93,6 +96,7 @@ def render_prs_page(now: datetime) -> str:
<div class="grid" id="hero-cards">
<div class="card"><div class="label">Total PRs</div><div class="value blue" id="kpi-total">--</div><div class="detail" id="kpi-total-detail"></div></div>
<div class="card"><div class="label">Merge Rate</div><div class="value green" id="kpi-merge-rate">--</div><div class="detail" id="kpi-merge-detail"></div></div>
<div class="card"><div class="label">Median Eval Rounds</div><div class="value" id="kpi-rounds">--</div><div class="detail" id="kpi-rounds-detail"></div></div>
<div class="card"><div class="label">Total Claims</div><div class="value blue" id="kpi-claims">--</div><div class="detail" id="kpi-claims-detail"></div></div>
<div class="card"><div class="label">Est. Cost</div><div class="value" id="kpi-cost">--</div><div class="detail" id="kpi-cost-detail"></div></div>
</div>
@ -100,7 +104,6 @@ def render_prs_page(now: datetime) -> str:
<!-- Filters -->
<div class="filters">
<select id="filter-domain"><option value="">All Domains</option></select>
<select id="filter-contributor"><option value="">All Contributors</option></select>
<select id="filter-outcome">
<option value="">All Outcomes</option>
<option value="merged">Merged</option>
@ -130,10 +133,9 @@ def render_prs_page(now: datetime) -> str:
<th data-col="summary">Summary <span class="sort-arrow">&#9650;</span></th>
<th data-col="claims_count">Claims <span class="sort-arrow">&#9650;</span></th>
<th data-col="domain">Domain <span class="sort-arrow">&#9650;</span></th>
<th data-col="submitted_by">Contributor <span class="sort-arrow">&#9650;</span></th>
<th data-col="status">Outcome <span class="sort-arrow">&#9650;</span></th>
<th data-col="eval_rounds">Evals <span class="sort-arrow">&#9650;</span></th>
<th data-col="evaluator_label">Evaluator <span class="sort-arrow">&#9650;</span></th>
<th data-col="evaluator">Evaluator <span class="sort-arrow">&#9650;</span></th>
<th data-col="est_cost">Cost <span class="sort-arrow">&#9650;</span></th>
<th data-col="created_at">Date <span class="sort-arrow">&#9650;</span></th>
</tr>
@ -150,71 +152,42 @@ def render_prs_page(now: datetime) -> str:
</div>
"""
# Use single-quoted JS strings throughout to avoid Python/HTML escaping issues
scripts = """<script>
var PAGE_SIZE = 50;
var FORGEJO = 'https://git.livingip.xyz/teleo/teleo-codex/pulls/';
var allData = [];
var filtered = [];
var sortCol = 'number';
var sortAsc = false;
var page = 0;
var expandedPr = null;
// Tier-based cost estimates (per eval round)
var TIER_COSTS = {
'DEEP': 0.145, // Haiku triage + Gemini Flash domain + Opus Leo
'STANDARD': 0.043, // Haiku triage + Gemini Flash domain + Sonnet Leo
'LIGHT': 0.027 // Haiku triage + Gemini Flash domain only
};
function estimateCost(pr) {
var tier = pr.tier || 'STANDARD';
var rounds = pr.eval_rounds || 1;
var baseCost = TIER_COSTS[tier] || TIER_COSTS['STANDARD'];
return baseCost * rounds;
}
function fmtCost(val) {
if (val == null || val === 0) return '--';
return '$' + val.toFixed(3);
}
const PAGE_SIZE = 50;
const FORGEJO = 'https://git.livingip.xyz/teleo/teleo-codex/pulls/';
let allData = [];
let filtered = [];
let sortCol = 'number';
let sortAsc = false;
let page = 0;
let expandedPr = null;
function loadData() {
var days = document.getElementById('filter-days').value;
var url = '/api/pr-lifecycle' + (days !== '0' ? '?days=' + days : '?days=9999');
fetch(url).then(function(r) { return r.json(); }).then(function(data) {
allData = data.prs || [];
// Compute derived fields
allData.forEach(function(p) {
p.est_cost = estimateCost(p);
// Evaluator label for sorting
p.evaluator_label = p.domain_agent || p.agent || '--';
});
populateFilters(allData);
updateKPIs(data);
applyFilters();
}).catch(function() {
document.getElementById('pr-tbody').innerHTML =
'<tr><td colspan="10" style="text-align:center;color:#f85149;">Failed to load data</td></tr>';
'<tr><td colspan="9" style="text-align:center;color:#f85149;">Failed to load data</td></tr>';
});
}
function populateFilters(prs) {
var domains = [], contribs = [], seenD = {}, seenC = {};
var domains = [], seenD = {};
prs.forEach(function(p) {
if (p.domain && !seenD[p.domain]) { seenD[p.domain] = 1; domains.push(p.domain); }
var c = p.submitted_by || 'unknown';
if (!seenC[c]) { seenC[c] = 1; contribs.push(c); }
});
domains.sort(); contribs.sort();
domains.sort();
var domSel = document.getElementById('filter-domain');
var conSel = document.getElementById('filter-contributor');
var curDom = domSel.value, curCon = conSel.value;
var curDom = domSel.value;
domSel.innerHTML = '<option value="">All Domains</option>' +
domains.map(function(d) { return '<option value="' + esc(d) + '">' + esc(d) + '</option>'; }).join('');
conSel.innerHTML = '<option value="">All Contributors</option>' +
contribs.map(function(c) { return '<option value="' + esc(c) + '">' + esc(c) + '</option>'; }).join('');
domSel.value = curDom; conSel.value = curCon;
domSel.value = curDom;
}
function updateKPIs(data) {
@ -226,29 +199,47 @@ def render_prs_page(now: datetime) -> str:
document.getElementById('kpi-merge-rate').textContent = fmtPct(rate);
document.getElementById('kpi-merge-detail').textContent = fmtNum(data.open) + ' open';
var totalClaims = 0, mergedClaims = 0, totalCost = 0;
document.getElementById('kpi-rounds').textContent =
data.median_rounds != null ? data.median_rounds.toFixed(1) : '--';
document.getElementById('kpi-rounds-detail').textContent =
data.max_rounds != null ? 'max: ' + data.max_rounds : '';
var totalClaims = 0, mergedClaims = 0;
var totalCost = 0;
var actualCount = 0, estCount = 0;
(data.prs || []).forEach(function(p) {
totalClaims += (p.claims_count || 1);
if (p.status === 'merged') mergedClaims += (p.claims_count || 1);
totalCost += estimateCost(p);
totalCost += (p.cost || 0);
if (p.cost_is_actual) actualCount++; else estCount++;
});
document.getElementById('kpi-claims').textContent = fmtNum(totalClaims);
document.getElementById('kpi-claims-detail').textContent = fmtNum(mergedClaims) + ' merged';
document.getElementById('kpi-cost').textContent = '$' + totalCost.toFixed(2);
var perClaim = totalClaims > 0 ? totalCost / totalClaims : 0;
document.getElementById('kpi-cost-detail').textContent = '$' + perClaim.toFixed(3) + '/claim';
// Show actual DB total if available, otherwise sum from PRs
var costLabel = '';
if (data.actual_total_cost > 0) {
document.getElementById('kpi-cost').textContent = '$' + data.actual_total_cost.toFixed(2);
costLabel = 'from costs table';
} else if (actualCount > 0) {
document.getElementById('kpi-cost').textContent = '$' + totalCost.toFixed(2);
costLabel = actualCount + ' actual, ' + estCount + ' est.';
} else {
document.getElementById('kpi-cost').textContent = '$' + totalCost.toFixed(2);
costLabel = 'ALL ESTIMATED';
}
var costPerClaim = totalClaims > 0 ? totalCost / totalClaims : 0;
document.getElementById('kpi-cost-detail').textContent =
'$' + costPerClaim.toFixed(3) + '/claim \u00b7 ' + costLabel;
}
function applyFilters() {
var dom = document.getElementById('filter-domain').value;
var con = document.getElementById('filter-contributor').value;
var out = document.getElementById('filter-outcome').value;
var tier = document.getElementById('filter-tier').value;
filtered = allData.filter(function(p) {
if (dom && p.domain !== dom) return false;
if (con && (p.submitted_by || 'unknown') !== con) return false;
if (out && p.status !== out) return false;
if (tier && p.tier !== tier) return false;
return true;
@ -278,6 +269,19 @@ def render_prs_page(now: datetime) -> str:
return s.length > n ? s.substring(0, n) + '...' : s;
}
function shortModel(m) {
if (!m) return '';
// Shorten model names for display
if (m.indexOf('gemini-2.5-flash') !== -1) return 'Gemini Flash';
if (m.indexOf('claude-sonnet') !== -1 || m.indexOf('sonnet-4') !== -1) return 'Sonnet';
if (m.indexOf('claude-opus') !== -1 || m.indexOf('opus') !== -1) return 'Opus';
if (m.indexOf('haiku') !== -1) return 'Haiku';
if (m.indexOf('gpt-4o') !== -1) return 'GPT-4o';
// fallback: strip provider prefix
var parts = m.split('/');
return parts[parts.length - 1];
}
function renderTable() {
var tbody = document.getElementById('pr-tbody');
var start = page * PAGE_SIZE;
@ -285,7 +289,7 @@ def render_prs_page(now: datetime) -> str:
var totalPages = Math.ceil(filtered.length / PAGE_SIZE);
if (slice.length === 0) {
tbody.innerHTML = '<tr><td colspan="10" style="text-align:center;color:#8b949e;">No PRs match filters</td></tr>';
tbody.innerHTML = '<tr><td colspan="9" style="text-align:center;color:#8b949e;">No PRs match filters</td></tr>';
return;
}
@ -297,37 +301,40 @@ def render_prs_page(now: datetime) -> str:
(p.tier || '').toLowerCase() === 'standard' ? 'tier-standard' : 'tier-light';
var date = p.created_at ? p.created_at.substring(0, 10) : '--';
// Summary: first claim title
// Summary
var summary = p.summary || '--';
var reviewSnippet = '';
if (p.status === 'closed' && p.review_snippet) {
reviewSnippet = '<div class="review-snippet">' + esc(truncate(p.review_snippet, 120)) + '</div>';
}
// Outcome with tier badge
var outcomeLabel = esc(p.status || '--');
var tierBadge = p.tier ? ' <span class="' + tierClass + '" style="font-size:10px;">' + esc(p.tier) + '</span>' : '';
// Review snippet for issues
var reviewSnippet = '';
if (p.review_snippet) {
reviewSnippet = '<div class="review-snippet">' + esc(truncate(p.review_snippet, 100)) + '</div>';
}
// Contributor display
var contributor = p.submitted_by || '--';
var contribClass = 'contributor-tag';
if (contributor.indexOf('self-directed') >= 0 || contributor === 'unknown') {
contribClass = 'contributor-self';
}
// Evaluator: domain agent + model tag
// Evaluator column: domain agent + model
var evaluator = '';
if (p.domain_agent) {
var modelShort = '';
if (p.domain_model) {
var m = p.domain_model;
if (m.indexOf('gemini') >= 0) modelShort = 'Gemini Flash';
else if (m.indexOf('gpt-4o') >= 0) modelShort = 'GPT-4o';
else if (m.indexOf('sonnet') >= 0) modelShort = 'Sonnet';
else modelShort = m.split('/').pop();
evaluator = '<div style="font-size:12px;color:#c9d1d9;">' + esc(p.domain_agent) + '</div>';
}
if (p.domain_model) {
evaluator += '<div class="model-tag">' + esc(shortModel(p.domain_model)) + '</div>';
}
if (p.leo_model) {
evaluator += '<div class="model-tag">' + esc(shortModel(p.leo_model)) + '</div>';
}
if (!evaluator) evaluator = '<span style="color:#484f58;">--</span>';
// Cost actual from DB or estimated (flagged)
var costStr;
if (p.cost != null && p.cost > 0) {
if (p.cost_is_actual) {
costStr = '<span class="cost-val">$' + p.cost.toFixed(3) + '</span>';
} else {
costStr = '<span class="cost-val" style="opacity:0.5;" title="Estimated — no actual cost tracked">~$' + p.cost.toFixed(3) + '</span>';
}
evaluator = esc(p.domain_agent) + (modelShort ? ' <span class="model-tag">' + esc(modelShort) + '</span>' : '');
} else {
costStr = '<span style="color:#484f58;">--</span>';
}
rows.push(
@ -335,17 +342,16 @@ def render_prs_page(now: datetime) -> str:
'<td><span class="expand-chevron">&#9654;</span> ' +
'<a class="pr-link" href="' + FORGEJO + p.number + '" target="_blank" rel="noopener" onclick="event.stopPropagation();">#' + p.number + '</a></td>' +
'<td style="white-space:normal;"><span class="summary-text">' + esc(summary) + '</span>' + reviewSnippet + '</td>' +
'<td style="text-align:center;">' + (p.claims_count || 1) + '</td>' +
'<td style="text-align:center;">' + (p.claims_count || '--') + '</td>' +
'<td>' + esc(p.domain || '--') + '</td>' +
'<td><span class="' + contribClass + '">' + esc(truncate(contributor, 20)) + '</span></td>' +
'<td class="' + outClass + '">' + esc(p.status || '--') + tierBadge + '</td>' +
'<td class="' + outClass + '">' + outcomeLabel + tierBadge + '</td>' +
'<td style="text-align:center;">' + (p.eval_rounds || '--') + '</td>' +
'<td>' + evaluator + '</td>' +
'<td>' + fmtCost(p.est_cost) + '</td>' +
'<td>' + costStr + '</td>' +
'<td>' + date + '</td>' +
'</tr>' +
'<tr id="trace-' + p.number + '" style="display:none;"><td colspan="10" style="padding:0;">' +
'<div class="trace-panel" id="panel-' + p.number + '">Loading...</div>' +
'<tr id="trace-' + p.number + '" style="display:none;"><td colspan="9" style="padding:0;">' +
'<div class="trace-panel" id="panel-' + p.number + '">Loading trace...</div>' +
'</td></tr>'
);
});
@ -408,34 +414,46 @@ def render_prs_page(now: datetime) -> str:
});
function loadTrace(pr, panel) {
// Find the PR data for claim titles
// Also find this PR in allData for claim list
var prData = null;
for (var i = 0; i < allData.length; i++) {
if (allData[i].number == pr) { prData = allData[i]; break; }
}
allData.forEach(function(p) { if (p.number == pr) prData = p; });
fetch('/api/trace/' + pr).then(function(r) { return r.json(); }).then(function(data) {
var html = '';
// Claims contained in this PR
if (prData && prData.description) {
var titles = prData.description.split('|').map(function(t) { return t.trim(); }).filter(Boolean);
if (titles.length > 0) {
html += '<h4>Claims (' + titles.length + ')</h4>';
html += '<ul class="claim-list">';
titles.forEach(function(t) {
html += '<li>' + esc(t) + '</li>';
});
html += '</ul>';
}
// --- Claims contained in this PR ---
if (prData && prData.claim_titles && prData.claim_titles.length > 0) {
html += '<div class="section-title">Claims (' + prData.claim_titles.length + ')</div>';
html += '<ul class="claim-list">';
prData.claim_titles.forEach(function(t) {
html += '<li>' + esc(t) + '</li>';
});
html += '</ul>';
}
// Issues (if any)
// --- Issues summary ---
var issues = [];
if (data.timeline) {
data.timeline.forEach(function(ev) {
if (ev.detail && ev.detail.issues) {
var iss = ev.detail.issues;
if (typeof iss === 'string') { try { iss = JSON.parse(iss); } catch(e) { iss = [iss]; } }
if (Array.isArray(iss)) {
iss.forEach(function(i) {
var label = String(i).replace(/_/g, ' ');
if (issues.indexOf(label) === -1) issues.push(label);
});
}
}
});
}
if (prData && prData.review_snippet) {
html += '<div class="issues-box">' + esc(prData.review_snippet) + '</div>';
} else if (issues.length > 0) {
html += '<div class="issues-box">Issues: ' + issues.map(esc).join(', ') + '</div>';
}
// Eval chain with models
// --- Eval chain (who reviewed with what model) ---
var models = {};
if (data.timeline) {
data.timeline.forEach(function(ev) {
@ -446,38 +464,23 @@ def render_prs_page(now: datetime) -> str:
}
});
}
html += '<div class="eval-chain"><strong style="color:#58a6ff;">Eval Chain:</strong> ';
var chain = [];
if (models['triage.haiku_triage'] || models['triage.deterministic_triage']) {
chain.push('<span class="chain-step">Triage <span class="model-tag">' +
esc(models['triage.haiku_triage'] || 'deterministic') + '</span></span>');
}
if (models['domain_review']) {
chain.push('<span class="chain-step">Domain <span class="model-tag">' +
esc(models['domain_review']) + '</span></span>');
}
if (models['leo_review']) {
chain.push('<span class="chain-step">Leo <span class="model-tag">' +
esc(models['leo_review']) + '</span></span>');
}
html += chain.length > 0 ? chain.join('<span class="chain-arrow">&#8594;</span>') :
'<span style="color:#484f58;">No model data</span>';
html += '</div>';
// Source + contributor metadata
if (data.pr) {
html += '<div style="margin:8px 0;font-size:12px;color:#8b949e;">';
if (data.pr.source_path) html += 'Source: <span style="color:#c9d1d9;">' + esc(data.pr.source_path) + '</span> &middot; ';
if (prData && prData.submitted_by) html += 'Contributor: <span style="color:#d2a8ff;">' + esc(prData.submitted_by) + '</span> &middot; ';
if (data.pr.tier) html += 'Tier: <span style="color:#c9d1d9;">' + esc(data.pr.tier) + '</span> &middot; ';
html += '<a class="pr-link" href="' + FORGEJO + pr + '" target="_blank">View on Forgejo</a>';
if (Object.keys(models).length > 0) {
html += '<div class="eval-chain">';
html += '<strong style="color:#58a6ff;">Eval chain:</strong> ';
var parts = [];
if (models['triage.haiku_triage'] || models['triage.deterministic_triage'])
parts.push('<span class="step"><span class="step-label">Triage</span> <span class="step-model">' + shortModel(models['triage.haiku_triage'] || 'deterministic') + '</span></span>');
if (models['domain_review'])
parts.push('<span class="step"><span class="step-label">Domain</span> <span class="step-model">' + shortModel(models['domain_review']) + '</span></span>');
if (models['leo_review'])
parts.push('<span class="step"><span class="step-label">Leo</span> <span class="step-model">' + shortModel(models['leo_review']) + '</span></span>');
html += parts.length > 0 ? parts.join(' <span class="arrow">&#8594;</span> ') : '<span style="color:#484f58;">No model data</span>';
html += '</div>';
}
// Timeline
// --- Timeline ---
if (data.timeline && data.timeline.length > 0) {
html += '<h4>Timeline</h4>';
html += '<div class="section-title">Timeline</div>';
html += '<ul class="trace-timeline">';
data.timeline.forEach(function(ev) {
var cls = ev.event === 'approved' ? 'ev-approved' :
@ -488,7 +491,7 @@ def render_prs_page(now: datetime) -> str:
if (ev.detail) {
if (ev.detail.tier) detail += ' tier=' + ev.detail.tier;
if (ev.detail.reason) detail += ' &#8212; ' + esc(ev.detail.reason);
if (ev.detail.model) detail += ' [' + esc(ev.detail.model) + ']';
if (ev.detail.model) detail += ' [' + esc(shortModel(ev.detail.model)) + ']';
if (ev.detail.review_text) {
detail += '<div class="review-text">' + esc(ev.detail.review_text).substring(0, 2000) + '</div>';
}
@ -506,19 +509,19 @@ def render_prs_page(now: datetime) -> str:
});
html += '</ul>';
} else {
html += '<div style="color:#484f58;font-size:12px;margin:8px 0;">No timeline events</div>';
html += '<div style="color:#484f58;font-size:12px;margin-top:8px;">No timeline events</div>';
}
// Reviews
// --- Reviews ---
if (data.reviews && data.reviews.length > 0) {
html += '<h4>Reviews</h4>';
html += '<div class="section-title">Reviews</div>';
data.reviews.forEach(function(r) {
var cls = r.outcome === 'approved' ? 'badge-green' :
r.outcome === 'rejected' ? 'badge-red' : 'badge-yellow';
html += '<div style="margin:4px 0;">' +
'<span class="badge ' + cls + '">' + esc(r.outcome) + '</span> ' +
'<span style="color:#8b949e;font-size:11px;">' + esc(r.reviewer || '') + ' ' +
(r.model ? '[' + esc(r.model) + ']' : '') + ' ' +
(r.model ? '[' + esc(shortModel(r.model)) + ']' : '') + ' ' +
(r.reviewed_at || '').substring(0, 19) + '</span>';
if (r.rejection_reason) {
html += ' <code>' + esc(r.rejection_reason) + '</code>';
@ -537,7 +540,7 @@ def render_prs_page(now: datetime) -> str:
}
// Filter listeners
['filter-domain', 'filter-contributor', 'filter-outcome', 'filter-tier'].forEach(function(id) {
['filter-domain', 'filter-outcome', 'filter-tier'].forEach(function(id) {
document.getElementById(id).addEventListener('change', applyFilters);
});
document.getElementById('filter-days').addEventListener('change', loadData);

File diff suppressed because it is too large


@ -0,0 +1,279 @@
"""Dashboard API routes for research session + cost tracking.
Argus-side read-only endpoints. These query the data that
research_tracking.py writes to pipeline.db.
Add to app.py after alerting_routes setup.
"""
import json
import sqlite3
from aiohttp import web
def _conn(app):
"""Read-only connection to pipeline.db."""
db_path = app["db_path"]
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
return conn
async def handle_api_research_sessions(request):
"""GET /api/research-sessions?agent=&domain=&days=7
Returns research sessions with linked sources and cost data.
"""
agent = request.query.get("agent")
domain = request.query.get("domain")
try:
days = int(request.query.get("days", 7))
except (ValueError, TypeError):
days = 7
conn = _conn(request.app)
try:
where = ["rs.started_at >= datetime('now', ?)"]
params = [f"-{days} days"]
if agent:
where.append("rs.agent = ?")
params.append(agent)
if domain:
where.append("rs.domain = ?")
params.append(domain)
where_clause = " AND ".join(where)
sessions = conn.execute(f"""
SELECT rs.*,
GROUP_CONCAT(s.path, '||') as source_paths,
GROUP_CONCAT(s.status, '||') as source_statuses,
GROUP_CONCAT(s.claims_count, '||') as source_claims,
GROUP_CONCAT(COALESCE(s.cost_usd, 0), '||') as source_costs
FROM research_sessions rs
LEFT JOIN sources s ON s.session_id = rs.id
WHERE {where_clause}
GROUP BY rs.id
ORDER BY rs.started_at DESC
""", params).fetchall()
result = []
for s in sessions:
sources = []
if s["source_paths"]:
paths = s["source_paths"].split("||")
statuses = (s["source_statuses"] or "").split("||")
claims = (s["source_claims"] or "").split("||")
costs = (s["source_costs"] or "").split("||")
for i, p in enumerate(paths):
sources.append({
"path": p,
"status": statuses[i] if i < len(statuses) else None,
"claims_count": int(claims[i]) if i < len(claims) and claims[i] else 0,
"extraction_cost": float(costs[i]) if i < len(costs) and costs[i] else 0,
})
result.append({
"id": s["id"],
"agent": s["agent"],
"domain": s["domain"],
"topic": s["topic"],
"reasoning": s["reasoning"],
"summary": s["summary"],
"sources_planned": s["sources_planned"],
"sources_produced": s["sources_produced"],
"model": s["model"],
"input_tokens": s["input_tokens"],
"output_tokens": s["output_tokens"],
"research_cost": s["cost_usd"],
"extraction_cost": sum(src["extraction_cost"] for src in sources),
"total_cost": s["cost_usd"] + sum(src["extraction_cost"] for src in sources),
"total_claims": sum(src["claims_count"] for src in sources),
"status": s["status"],
"started_at": s["started_at"],
"completed_at": s["completed_at"],
"sources": sources,
})
# Summary stats
total_sessions = len(result)
total_cost = sum(r["total_cost"] for r in result)
total_claims = sum(r["total_claims"] for r in result)
total_sources = sum(r["sources_produced"] for r in result)
return web.json_response({
"summary": {
"sessions": total_sessions,
"total_cost": round(total_cost, 2),
"total_claims": total_claims,
"total_sources": total_sources,
"avg_cost_per_claim": round(total_cost / total_claims, 4) if total_claims else 0,
"avg_cost_per_session": round(total_cost / total_sessions, 4) if total_sessions else 0,
},
"sessions": result,
})
finally:
conn.close()
async def handle_api_costs(request):
"""GET /api/costs?days=14&by=stage|model|date
Comprehensive cost breakdown. Works with EXISTING data in costs table
plus the new extraction costs once backfilled.
"""
try:
days = int(request.query.get("days", 14))
except (ValueError, TypeError):
days = 14
group_by = request.query.get("by", "stage")
conn = _conn(request.app)
try:
valid_groups = {"stage", "model", "date"}
if group_by not in valid_groups:
group_by = "stage"
rows = conn.execute(f"""
SELECT {group_by},
SUM(calls) as total_calls,
SUM(input_tokens) as total_input,
SUM(output_tokens) as total_output,
SUM(cost_usd) as total_cost
FROM costs
WHERE date >= date('now', ?)
GROUP BY {group_by}
ORDER BY total_cost DESC
""", (f"-{days} days",)).fetchall()
result = []
for r in rows:
result.append({
group_by: r[group_by],
"calls": r["total_calls"],
"input_tokens": r["total_input"],
"output_tokens": r["total_output"],
"cost_usd": round(r["total_cost"], 4),
})
grand_total = sum(r["cost_usd"] for r in result)
# Also get per-agent cost from sources table (extraction costs)
agent_costs = conn.execute("""
SELECT p.agent,
COUNT(DISTINCT s.path) as sources,
SUM(s.cost_usd) as extraction_cost,
SUM(s.claims_count) as claims
FROM sources s
LEFT JOIN prs p ON p.source_path = s.path
WHERE s.cost_usd > 0
GROUP BY p.agent
ORDER BY extraction_cost DESC
""").fetchall()
agent_breakdown = []
for r in agent_costs:
agent_breakdown.append({
"agent": r["agent"] or "unlinked",
"sources": r["sources"],
"extraction_cost": round(r["extraction_cost"], 2),
"claims": r["claims"],
"cost_per_claim": round(r["extraction_cost"] / r["claims"], 4) if r["claims"] else 0,
})
return web.json_response({
"period_days": days,
"grand_total": round(grand_total, 2),
"by_" + group_by: result,
"by_agent": agent_breakdown,
})
finally:
conn.close()
async def handle_api_source_detail(request):
"""GET /api/source/{path}
Full lifecycle of a single source: research session → extraction → claims → eval outcomes.
"""
source_path = request.match_info["path"]
conn = _conn(request.app)
try:
# Try exact match first, fall back to suffix match (anchored)
source = conn.execute(
"SELECT * FROM sources WHERE path = ?",
(source_path,),
).fetchone()
if not source:
# Suffix match — anchor with / prefix to avoid substring hits
source = conn.execute(
"SELECT * FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_path}",),
).fetchone()
if not source:
return web.json_response({"error": "Source not found"}, status=404)
result = dict(source)
# Get research session if linked
if source["session_id"]:
session = conn.execute(
"SELECT * FROM research_sessions WHERE id = ?",
(source["session_id"],),
).fetchone()
result["research_session"] = dict(session) if session else None
else:
result["research_session"] = None
# Get PRs from this source
prs = conn.execute(
"SELECT number, status, domain, agent, tier, leo_verdict, domain_verdict, "
"cost_usd, created_at, merged_at, commit_type, transient_retries, substantive_retries, last_error "
"FROM prs WHERE source_path = ?",
(source["path"],),
).fetchall()
result["prs"] = [dict(p) for p in prs]
# Get eval events from audit_log for those PRs
# NOTE: audit_log.detail is mixed — some rows are JSON (evaluate events),
# some are plain text. Use json_valid() to filter safely.
pr_numbers = [p["number"] for p in prs]
if pr_numbers:
placeholders = ",".join("?" * len(pr_numbers))
evals = conn.execute(f"""
SELECT * FROM audit_log
WHERE stage = 'evaluate'
AND json_valid(detail)
AND json_extract(detail, '$.pr') IN ({placeholders})
ORDER BY timestamp
""", pr_numbers).fetchall()
result["eval_history"] = [
{"timestamp": e["timestamp"], "event": e["event"],
"detail": json.loads(e["detail"]) if e["detail"] else None}
for e in evals
]
else:
result["eval_history"] = []
return web.json_response(result)
finally:
conn.close()
def setup_research_routes(app):
"""Register research tracking routes. Call from create_app()."""
app.router.add_get("/api/research-sessions", handle_api_research_sessions)
app.router.add_get("/api/costs", handle_api_costs)
app.router.add_get("/api/source/{path:.+}", handle_api_source_detail)
# Public paths to add to auth middleware
RESEARCH_PUBLIC_PATHS = frozenset({
"/api/research-sessions",
"/api/costs",
})
# /api/source/{path} needs prefix matching — add to auth middleware:
# if path.startswith("/api/source/"): allow
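A minimal wiring sketch for the "Add to app.py" step described in the module docstring. The create_app shape, the auth-middleware structure, and the handler names are assumptions for illustration; only setup_research_routes, RESEARCH_PUBLIC_PATHS, and the app["db_path"] key come from the code above.

from aiohttp import web
from research_routes import setup_research_routes, RESEARCH_PUBLIC_PATHS

def create_app(db_path: str) -> web.Application:
    app = web.Application()
    app["db_path"] = db_path          # _conn() above reads this key
    # ... existing alerting_routes / dashboard setup goes here ...
    setup_research_routes(app)
    return app

# Inside the existing auth middleware, next to its current public-path check:
#     if path in RESEARCH_PUBLIC_PATHS or path.startswith("/api/source/"):
#         return await handler(request)   # serve without auth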


@ -0,0 +1,419 @@
"""Research session tracking + cost attribution for the Teleo pipeline.
This module adds three capabilities:
1. research_sessions table tracks WHY agents researched, what they found interesting,
session cost, and links to generated sources
2. Extraction cost attribution writes per-source cost to sources.cost_usd after extraction
3. Source → claim linkage ensures prs.source_path is always populated
Designed for Epimetheus to integrate into the pipeline. Argus built the spec;
Ganymede reviews; Epimetheus wires it in.
Data flow:
Agent research session → research_sessions row (with reasoning + summary)
→ sources created (with session_id FK)
→ extraction runs (cost written to sources.cost_usd + costs table)
→ PRs created (source_path populated)
→ claims merged (traceable back to session)
"""
import json
import logging
import sqlite3
from datetime import datetime
from typing import Optional
logger = logging.getLogger("research_tracking")
# ---------------------------------------------------------------------------
# Migration v11: research_sessions table + sources.session_id FK
# (v9 is current; v10 is Epimetheus's eval pipeline migration)
# ---------------------------------------------------------------------------
MIGRATION_V11_SQL = """
-- Research session tracking table
CREATE TABLE IF NOT EXISTS research_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
-- Which agent ran the research (leo, rio, astra, etc.)
domain TEXT,
-- Primary domain of the research
topic TEXT NOT NULL,
-- What they researched (short description)
reasoning TEXT,
-- WHY they chose this topic (agent's own explanation)
summary TEXT,
-- What they found most interesting/relevant
sources_planned INTEGER DEFAULT 0,
-- How many sources they intended to produce
sources_produced INTEGER DEFAULT 0,
-- How many actually materialized
model TEXT,
-- Model used for research (e.g. claude-opus-4-6)
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cost_usd REAL DEFAULT 0,
-- Total research session cost (LLM calls for discovery + writing)
status TEXT DEFAULT 'running',
-- running, completed, failed, partial
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
metadata TEXT DEFAULT '{}'
-- JSON: any extra context (prompt version, search queries used, etc.)
);
CREATE INDEX IF NOT EXISTS idx_rs_agent ON research_sessions(agent);
CREATE INDEX IF NOT EXISTS idx_rs_domain ON research_sessions(domain);
CREATE INDEX IF NOT EXISTS idx_rs_started ON research_sessions(started_at);
-- Add session_id FK to sources table
ALTER TABLE sources ADD COLUMN session_id INTEGER REFERENCES research_sessions(id);
CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
-- Record migration
INSERT INTO schema_version (version) VALUES (11);
"""
# ---------------------------------------------------------------------------
# Cost attribution: write extraction cost to sources.cost_usd
# ---------------------------------------------------------------------------
# Pricing per million tokens (as of March 2026)
MODEL_PRICING = {
"anthropic/claude-sonnet-4.5": {"input": 3.00, "output": 15.00},
"anthropic/claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
"anthropic/claude-haiku-4.5": {"input": 0.80, "output": 4.00},
"anthropic/claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
"minimax/minimax-m2.5": {"input": 0.14, "output": 0.56},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate USD cost from model name and token counts."""
pricing = MODEL_PRICING.get(model)
if not pricing:
# Default to Sonnet 4.5 pricing as conservative estimate
logger.warning("Unknown model %s — using Sonnet 4.5 pricing", model)
pricing = {"input": 3.00, "output": 15.00}
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
def record_extraction_cost(
conn: sqlite3.Connection,
source_path: str,
model: str,
input_tokens: int,
output_tokens: int,
):
"""Write extraction cost to both sources.cost_usd and costs table.
Call this after each successful extraction call in openrouter-extract-v2.py.
This is the missing link: the CSV logger records tokens but never writes
cost back to the DB.
"""
cost = calculate_cost(model, input_tokens, output_tokens)
# Update source row
conn.execute(
"UPDATE sources SET cost_usd = cost_usd + ?, extraction_model = ? WHERE path = ?",
(cost, model, source_path),
)
# Also record in costs table for dashboard aggregation
date = datetime.utcnow().strftime("%Y-%m-%d")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, input_tokens, output_tokens, cost),
)
conn.commit()
logger.info(
"Recorded extraction cost for %s: $%.4f (%d in, %d out, %s)",
source_path, cost, input_tokens, output_tokens, model,
)
return cost
# ---------------------------------------------------------------------------
# Research session lifecycle
# ---------------------------------------------------------------------------
def start_session(
conn: sqlite3.Connection,
agent: str,
topic: str,
domain: Optional[str] = None,
reasoning: Optional[str] = None,
sources_planned: int = 0,
model: Optional[str] = None,
metadata: Optional[dict] = None,
) -> int:
"""Call at the START of a research session. Returns session_id.
The agent should call this before it begins producing sources,
explaining what it plans to research and why.
"""
cur = conn.execute(
"""INSERT INTO research_sessions
(agent, domain, topic, reasoning, sources_planned, model, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
agent,
domain,
topic,
reasoning,
sources_planned,
model,
json.dumps(metadata or {}),
),
)
conn.commit()
session_id = cur.lastrowid
logger.info("Started research session #%d: %s / %s", session_id, agent, topic)
return session_id
def link_source_to_session(
conn: sqlite3.Connection,
source_path: str,
session_id: int,
):
"""Link a source file to its research session.
Call this when a source is written to inbox/ during a research session.
"""
conn.execute(
"UPDATE sources SET session_id = ? WHERE path = ?",
(session_id, source_path),
)
conn.execute(
"""UPDATE research_sessions
SET sources_produced = sources_produced + 1
WHERE id = ?""",
(session_id,),
)
conn.commit()
def complete_session(
conn: sqlite3.Connection,
session_id: int,
summary: str,
input_tokens: int = 0,
output_tokens: int = 0,
cost_usd: float = 0,
status: str = "completed",
):
"""Call at the END of a research session.
The agent should summarize what it found most interesting/relevant.
Cost should include ALL LLM calls made during the session (web search,
analysis, source writing, everything).
"""
conn.execute(
"""UPDATE research_sessions
SET summary = ?, input_tokens = ?, output_tokens = ?,
cost_usd = ?, status = ?, completed_at = datetime('now')
WHERE id = ?""",
(summary, input_tokens, output_tokens, cost_usd, status, session_id),
)
conn.commit()
logger.info("Completed research session #%d: %s", session_id, status)
# ---------------------------------------------------------------------------
# Source → PR linkage fix
# ---------------------------------------------------------------------------
def ensure_source_path_on_pr(
conn: sqlite3.Connection,
pr_number: int,
source_path: str,
):
"""Ensure prs.source_path is populated. Call during PR creation.
Currently 0/1451 PRs have source_path set. This is the fix.
"""
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ? AND (source_path IS NULL OR source_path = '')",
(source_path, pr_number),
)
conn.commit()
# ---------------------------------------------------------------------------
# Backfill: attribute extraction costs from existing CSV log
# ---------------------------------------------------------------------------
def backfill_extraction_costs(conn: sqlite3.Connection, csv_path: str):
"""One-time backfill: read openrouter-usage.csv and write costs to sources + costs tables.
Run once to fill in the ~$338 of extraction costs that were logged to CSV
but never written to the database.
Safe to re-run: only updates sources where cost_usd = 0, so partial
runs can be resumed without double-counting.
"""
import csv
count = 0
total_cost = 0.0
with open(csv_path) as f:
reader = csv.DictReader(f)
for row in reader:
source_file = row.get("source_file", "")
model = row.get("model", "")
try:
in_tok = int(row.get("input_tokens", 0) or 0)
out_tok = int(row.get("output_tokens", 0) or 0)
except (ValueError, TypeError):
continue
cost = calculate_cost(model, in_tok, out_tok)
if cost <= 0:
continue
# Try to match source_file to sources.path
# CSV has filename, DB has full path — match on exact suffix
# Use ORDER BY length(path) to prefer shortest (most specific) match
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? AND cost_usd = 0 ORDER BY length(path) LIMIT 1",
(f"%/{source_file}" if "/" not in source_file else f"%{source_file}",),
).fetchone()
if matched:
conn.execute(
"UPDATE sources SET cost_usd = ?, extraction_model = ? WHERE path = ?",
(cost, model, matched[0]),
)
# Always record in costs table
date = row.get("date", "unknown")
conn.execute(
"""INSERT INTO costs (date, model, stage, calls, input_tokens, output_tokens, cost_usd)
VALUES (?, ?, 'extraction', 1, ?, ?, ?)
ON CONFLICT(date, model, stage)
DO UPDATE SET calls = calls + 1,
input_tokens = input_tokens + excluded.input_tokens,
output_tokens = output_tokens + excluded.output_tokens,
cost_usd = cost_usd + excluded.cost_usd""",
(date, model, in_tok, out_tok, cost),
)
count += 1
total_cost += cost
conn.commit()
logger.info("Backfilled %d extraction cost records, total $%.2f", count, total_cost)
return count, total_cost
# ---------------------------------------------------------------------------
# Backfill: populate prs.source_path from branch naming convention
# ---------------------------------------------------------------------------
def backfill_source_paths(conn: sqlite3.Connection):
"""One-time backfill: derive source_path for existing PRs from branch names.
Branch format: extract/YYYY-MM-DD-source-name or similar patterns.
Source path format: inbox/queue/YYYY-MM-DD-source-name.md
"""
rows = conn.execute(
"SELECT number, branch FROM prs WHERE source_path IS NULL AND branch IS NOT NULL"
).fetchall()
count = 0
for number, branch in rows:
# Try to extract source name from branch
# Common patterns: extract/source-name, claims/source-name
parts = branch.split("/", 1)
if len(parts) < 2:
continue
source_stem = parts[1]
# Try to find matching source in DB — exact suffix match, shortest path wins
matched = conn.execute(
"SELECT path FROM sources WHERE path LIKE ? ORDER BY length(path) LIMIT 1",
(f"%/{source_stem}%" if source_stem else "",),
).fetchone()
if matched:
conn.execute(
"UPDATE prs SET source_path = ? WHERE number = ?",
(matched[0], number),
)
count += 1
conn.commit()
logger.info("Backfilled source_path for %d PRs", count)
return count
# ---------------------------------------------------------------------------
# Integration points (for Epimetheus to wire in)
# ---------------------------------------------------------------------------
INTEGRATION_GUIDE = """
## Where to wire this in
### 1. openrouter-extract-v2.py — after successful extraction call
from research_tracking import record_extraction_cost
# After line 430 (content, usage = call_openrouter(...))
# After line 672 (log_usage(...))
record_extraction_cost(
conn, args.source_file, args.model,
usage.get("prompt_tokens", 0),
usage.get("completion_tokens", 0),
)
### 2. Agent research scripts — wrap research sessions
from research_tracking import start_session, link_source_to_session, complete_session
# At start of research:
session_id = start_session(conn, agent="leo", topic="weapons stigmatization campaigns",
domain="grand-strategy",
reasoning="Following up on EU AI Act national security exclusion — exploring how stigmatization
campaigns have historically driven arms control policy",
sources_planned=6, model="claude-opus-4-6")
# As each source is written:
link_source_to_session(conn, source_path, session_id)
# At end of research:
complete_session(conn, session_id,
summary="Ottawa Treaty mine ban model is the strongest parallel to AI weapons — same
3-condition framework (humanitarian harm + low military utility + civil society
coalition). Ukraine Shahed case is a near-miss triggering event.",
input_tokens=total_in, output_tokens=total_out, cost_usd=total_cost)
### 3. PR creation in lib/merge.py or lib/validate.py — ensure source_path
from research_tracking import ensure_source_path_on_pr
# When creating a PR, pass the source:
ensure_source_path_on_pr(conn, pr_number, source_path)
### 4. One-time backfills (run manually after migration)
from research_tracking import backfill_extraction_costs, backfill_source_paths
backfill_extraction_costs(conn, "/opt/teleo-eval/logs/openrouter-usage.csv")
backfill_source_paths(conn)
### 5. Migration
Run MIGRATION_V11_SQL against pipeline.db after backing up.
"""


@ -140,7 +140,7 @@ async def fetch_review_queue(
if forgejo_token:
headers["Authorization"] = f"token {forgejo_token}"
connector = aiohttp.TCPConnector(ssl=False)
connector = aiohttp.TCPConnector() # Default SSL verification — Forgejo token must not be exposed to MITM
async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
# Fetch open PRs
url = f"{FORGEJO_BASE}/repos/{REPO}/pulls?state=open&limit=50&sort=oldest"

diagnostics/vitality.py (new file, 629 lines)

@ -0,0 +1,629 @@
"""Agent Vitality Diagnostics — data collection and schema.
Records daily vitality snapshots per agent across 10 dimensions.
Designed as the objective function for agent "aliveness" ranking.
Owner: Ship (data collection) + Argus (storage, API, dashboard)
Data sources: pipeline.db (read-only), claim-index API, agent-state filesystem, review_records
Dimension keys (agreed with Leo 2026-04-08):
knowledge_output, knowledge_quality, contributor_engagement,
review_performance, spend_efficiency, autonomy,
infrastructure_health, social_reach, capital, external_impact
"""
import json
import logging
import os
import sqlite3
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger("vitality")
# Known domain agents and their primary domains
AGENT_DOMAINS = {
"rio": ["internet-finance"],
"theseus": ["collective-intelligence", "living-agents"],
"astra": ["space-development", "energy", "manufacturing", "robotics"],
"vida": ["health"],
"clay": ["entertainment", "cultural-dynamics"],
"leo": ["grand-strategy", "teleohumanity"],
"hermes": [], # communications, no domain
"rhea": [], # infrastructure ops, no domain
"ganymede": [], # code review, no domain
"epimetheus": [], # pipeline, no domain
"oberon": [], # dashboard, no domain
"argus": [], # diagnostics, no domain
"ship": [], # engineering, no domain
}
# Agent file path prefixes — for matching claims by location, not just domain field.
# Handles claims in core/ and foundations/ that may not have a standard domain field
# in the claim-index (domain derived from directory path).
AGENT_PATHS = {
"rio": ["domains/internet-finance/"],
"theseus": ["domains/ai-alignment/", "core/living-agents/", "core/collective-intelligence/",
"foundations/collective-intelligence/"],
"astra": ["domains/space-development/", "domains/energy/",
"domains/manufacturing/", "domains/robotics/"],
"vida": ["domains/health/"],
"clay": ["domains/entertainment/", "foundations/cultural-dynamics/"],
"leo": ["core/grand-strategy/", "core/teleohumanity/", "core/mechanisms/",
"core/living-capital/", "foundations/teleological-economics/",
"foundations/critical-systems/"],
}
ALL_AGENTS = list(AGENT_DOMAINS.keys())
# Agent-state directory (VPS filesystem)
AGENT_STATE_DIR = Path(os.environ.get(
"AGENT_STATE_DIR", "/opt/teleo-eval/agent-state"
))
MIGRATION_SQL = """
CREATE TABLE IF NOT EXISTS vitality_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT NOT NULL,
dimension TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL DEFAULT 0,
unit TEXT NOT NULL DEFAULT '',
source TEXT,
recorded_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(agent_name, dimension, metric, recorded_at)
);
CREATE INDEX IF NOT EXISTS idx_vitality_agent_time
ON vitality_snapshots(agent_name, recorded_at);
CREATE INDEX IF NOT EXISTS idx_vitality_dimension
ON vitality_snapshots(dimension, recorded_at);
"""
# Add source column if missing (idempotent upgrade from v1 schema)
UPGRADE_SQL = """
ALTER TABLE vitality_snapshots ADD COLUMN source TEXT;
"""
def ensure_schema(db_path: str):
"""Create vitality_snapshots table if it doesn't exist."""
conn = sqlite3.connect(db_path, timeout=30)
try:
conn.executescript(MIGRATION_SQL)
try:
conn.execute(UPGRADE_SQL)
except sqlite3.OperationalError:
pass # column already exists
conn.commit()
logger.info("vitality_snapshots schema ensured")
finally:
conn.close()
def _fetch_claim_index(url: str = "http://localhost:8080/claim-index") -> dict | None:
"""Fetch claim-index from pipeline health API."""
try:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
except Exception as e:
logger.warning("claim-index fetch failed: %s", e)
return None
def _ro_conn(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30)
conn.row_factory = sqlite3.Row
return conn
# ---------------------------------------------------------------------------
# Dimension 1: knowledge_output — "How much has this agent produced?"
# ---------------------------------------------------------------------------
def collect_knowledge_output(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Claims merged, domain count, PRs submitted."""
metrics = []
row = conn.execute(
"SELECT COUNT(*) as cnt FROM prs WHERE agent = ? AND status = 'merged'",
(agent,),
).fetchone()
metrics.append({"metric": "claims_merged", "value": row["cnt"], "unit": "claims"})
row = conn.execute(
"SELECT COUNT(DISTINCT domain) as cnt FROM prs "
"WHERE agent = ? AND domain IS NOT NULL AND status = 'merged'",
(agent,),
).fetchone()
metrics.append({"metric": "domains_contributed", "value": row["cnt"], "unit": "domains"})
row = conn.execute(
"SELECT COUNT(*) as cnt FROM prs WHERE agent = ? AND created_at > datetime('now', '-7 days')",
(agent,),
).fetchone()
metrics.append({"metric": "prs_7d", "value": row["cnt"], "unit": "PRs"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 2: knowledge_quality — "How good is the output?"
# ---------------------------------------------------------------------------
def collect_knowledge_quality(
conn: sqlite3.Connection, claim_index: dict | None, agent: str
) -> list[dict]:
"""Evidence density, challenge rate, cross-domain links, domain coverage."""
metrics = []
agent_domains = AGENT_DOMAINS.get(agent, [])
# Challenge rate = challenge PRs / total PRs
rows = conn.execute(
"SELECT commit_type, COUNT(*) as cnt FROM prs "
"WHERE agent = ? AND commit_type IS NOT NULL GROUP BY commit_type",
(agent,),
).fetchall()
total = sum(r["cnt"] for r in rows)
type_counts = {r["commit_type"]: r["cnt"] for r in rows}
challenge_rate = type_counts.get("challenge", 0) / total if total > 0 else 0
metrics.append({"metric": "challenge_rate", "value": round(challenge_rate, 4), "unit": "ratio"})
# Activity breadth (distinct commit types)
metrics.append({"metric": "activity_breadth", "value": len(type_counts), "unit": "types"})
# Evidence density + cross-domain links from claim-index
# Match by domain field OR file path prefix (catches core/, foundations/ claims)
agent_paths = AGENT_PATHS.get(agent, [])
if claim_index and (agent_domains or agent_paths):
claims = claim_index.get("claims", [])
agent_claims = [
c for c in claims
if c.get("domain") in agent_domains
or any(c.get("file", "").startswith(p) for p in agent_paths)
]
total_claims = len(agent_claims)
# Evidence density: claims with incoming links / total claims
linked = sum(1 for c in agent_claims if c.get("incoming_count", 0) > 0)
density = linked / total_claims if total_claims > 0 else 0
metrics.append({"metric": "evidence_density", "value": round(density, 4), "unit": "ratio"})
# Cross-domain links
cross_domain = sum(
1 for c in agent_claims
for link in c.get("outgoing_links", [])
if any(d in link for d in claim_index.get("domains", {}).keys()
if d not in agent_domains)
)
metrics.append({"metric": "cross_domain_links", "value": cross_domain, "unit": "links"})
# Domain coverage: agent's claims / average domain size
domains_data = claim_index.get("domains", {})
agent_claim_count = sum(domains_data.get(d, 0) for d in agent_domains)
avg_domain_size = (sum(domains_data.values()) / len(domains_data)) if domains_data else 1
coverage = min(agent_claim_count / avg_domain_size, 1.0) if avg_domain_size > 0 else 0
metrics.append({"metric": "domain_coverage", "value": round(coverage, 4), "unit": "ratio"})
else:
metrics.append({"metric": "evidence_density", "value": 0, "unit": "ratio"})
metrics.append({"metric": "cross_domain_links", "value": 0, "unit": "links"})
metrics.append({"metric": "domain_coverage", "value": 0, "unit": "ratio"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 3: contributor_engagement — "Who contributes to this agent's domain?"
# ---------------------------------------------------------------------------
def collect_contributor_engagement(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Unique submitters to this agent's domain."""
row = conn.execute(
"SELECT COUNT(DISTINCT submitted_by) as cnt FROM prs "
"WHERE agent = ? AND submitted_by IS NOT NULL AND submitted_by != ''",
(agent,),
).fetchone()
return [
{"metric": "unique_submitters", "value": row["cnt"], "unit": "contributors"},
]
# ---------------------------------------------------------------------------
# Dimension 4: review_performance — "How good is the evaluator feedback loop?"
# ---------------------------------------------------------------------------
def collect_review_performance(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Approval rate, rejection reasons from review_records."""
metrics = []
# Check if review_records table exists
table_check = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='review_records'"
).fetchone()
if not table_check:
return [
{"metric": "approval_rate", "value": 0, "unit": "ratio"},
{"metric": "total_reviews", "value": 0, "unit": "reviews"},
]
# Overall approval rate for this agent's claims (join through prs table)
row = conn.execute(
"SELECT COUNT(*) as total, "
"SUM(CASE WHEN r.outcome = 'approved' THEN 1 ELSE 0 END) as approved, "
"SUM(CASE WHEN r.outcome = 'approved-with-changes' THEN 1 ELSE 0 END) as with_changes, "
"SUM(CASE WHEN r.outcome = 'rejected' THEN 1 ELSE 0 END) as rejected "
"FROM review_records r "
"JOIN prs p ON r.pr_number = p.pr_number "
"WHERE LOWER(p.agent) = LOWER(?)",
(agent,),
).fetchone()
total = row["total"] or 0
approved = (row["approved"] or 0) + (row["with_changes"] or 0)
rejected = row["rejected"] or 0
approval_rate = approved / total if total > 0 else 0
metrics.append({"metric": "total_reviews", "value": total, "unit": "reviews"})
metrics.append({"metric": "approval_rate", "value": round(approval_rate, 4), "unit": "ratio"})
metrics.append({"metric": "approved", "value": row["approved"] or 0, "unit": "reviews"})
metrics.append({"metric": "approved_with_changes", "value": row["with_changes"] or 0, "unit": "reviews"})
metrics.append({"metric": "rejected", "value": rejected, "unit": "reviews"})
# Top rejection reasons (last 30 days)
reasons = conn.execute(
"SELECT r.rejection_reason, COUNT(*) as cnt FROM review_records r "
"JOIN prs p ON r.pr_number = p.pr_number "
"WHERE LOWER(p.agent) = LOWER(?) AND r.outcome = 'rejected' "
"AND r.rejection_reason IS NOT NULL "
"AND r.review_date > datetime('now', '-30 days') "
"GROUP BY r.rejection_reason ORDER BY cnt DESC",
(agent,),
).fetchall()
for r in reasons:
metrics.append({
"metric": f"rejection_{r['rejection_reason']}",
"value": r["cnt"],
"unit": "rejections",
})
return metrics
# ---------------------------------------------------------------------------
# Dimension 5: spend_efficiency — "What does it cost per merged claim?"
# ---------------------------------------------------------------------------
def collect_spend_efficiency(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Cost per merged claim, total spend, response costs."""
metrics = []
# Pipeline cost attributed to this agent (from prs.cost_usd)
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as merged "
"FROM prs WHERE agent = ? AND status = 'merged'",
(agent,),
).fetchone()
total_cost = row["cost"] or 0
merged = row["merged"] or 0
cost_per_claim = total_cost / merged if merged > 0 else 0
metrics.append({"metric": "total_pipeline_cost", "value": round(total_cost, 4), "unit": "USD"})
metrics.append({"metric": "cost_per_merged_claim", "value": round(cost_per_claim, 4), "unit": "USD"})
# Response audit costs (Telegram bot) — per-agent
row = conn.execute(
"SELECT COALESCE(SUM(generation_cost), 0) as cost, COUNT(*) as cnt "
"FROM response_audit WHERE agent = ?",
(agent,),
).fetchone()
metrics.append({"metric": "response_cost_total", "value": round(row["cost"], 4), "unit": "USD"})
metrics.append({"metric": "total_responses", "value": row["cnt"], "unit": "responses"})
# 24h spend snapshot
row = conn.execute(
"SELECT COALESCE(SUM(generation_cost), 0) as cost "
"FROM response_audit WHERE agent = ? AND timestamp > datetime('now', '-24 hours')",
(agent,),
).fetchone()
metrics.append({"metric": "response_cost_24h", "value": round(row["cost"], 4), "unit": "USD"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 6: autonomy — "How independently does this agent act?"
# ---------------------------------------------------------------------------
def collect_autonomy(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Self-directed actions, active days."""
metrics = []
# Autonomous responses in last 24h
row = conn.execute(
"SELECT COUNT(*) as cnt FROM response_audit "
"WHERE agent = ? AND timestamp > datetime('now', '-24 hours')",
(agent,),
).fetchone()
metrics.append({"metric": "autonomous_responses_24h", "value": row["cnt"], "unit": "actions"})
# Active days in last 7
row = conn.execute(
"SELECT COUNT(DISTINCT date(created_at)) as days FROM prs "
"WHERE agent = ? AND created_at > datetime('now', '-7 days')",
(agent,),
).fetchone()
metrics.append({"metric": "active_days_7d", "value": row["days"], "unit": "days"})
return metrics
# ---------------------------------------------------------------------------
# Dimension 7: infrastructure_health — "Is the agent's machinery working?"
# ---------------------------------------------------------------------------
def collect_infrastructure_health(conn: sqlite3.Connection, agent: str) -> list[dict]:
"""Circuit breakers, PR success rate, agent-state liveness."""
metrics = []
# Circuit breakers
rows = conn.execute(
"SELECT name, state FROM circuit_breakers WHERE name LIKE ?",
(f"%{agent}%",),
).fetchall()
open_breakers = sum(1 for r in rows if r["state"] != "closed")
metrics.append({"metric": "open_circuit_breakers", "value": open_breakers, "unit": "breakers"})
# PR success rate last 7 days
row = conn.execute(
"SELECT COUNT(*) as total, "
"SUM(CASE WHEN status='merged' THEN 1 ELSE 0 END) as merged "
"FROM prs WHERE agent = ? AND created_at > datetime('now', '-7 days')",
(agent,),
).fetchone()
total = row["total"]
rate = row["merged"] / total if total > 0 else 0
metrics.append({"metric": "merge_rate_7d", "value": round(rate, 4), "unit": "ratio"})
# Agent-state liveness (read metrics.json from filesystem)
state_file = AGENT_STATE_DIR / agent / "metrics.json"
if state_file.exists():
try:
with open(state_file) as f:
state = json.load(f)
lifetime = state.get("lifetime", {})
metrics.append({
"metric": "sessions_total",
"value": lifetime.get("sessions_total", 0),
"unit": "sessions",
})
metrics.append({
"metric": "sessions_timeout",
"value": lifetime.get("sessions_timeout", 0),
"unit": "sessions",
})
metrics.append({
"metric": "sessions_error",
"value": lifetime.get("sessions_error", 0),
"unit": "sessions",
})
except (json.JSONDecodeError, OSError) as e:
logger.warning("Failed to read agent-state for %s: %s", agent, e)
return metrics
# ---------------------------------------------------------------------------
# Dimensions 8-10: Stubs (no data sources yet)
# ---------------------------------------------------------------------------
def collect_social_reach(agent: str) -> list[dict]:
"""Social dimension: stub zeros until X API accounts are active."""
return [
{"metric": "followers", "value": 0, "unit": "followers"},
{"metric": "impressions_7d", "value": 0, "unit": "impressions"},
{"metric": "engagement_rate", "value": 0, "unit": "ratio"},
]
def collect_capital(agent: str) -> list[dict]:
"""Capital dimension: stub zeros until treasury/revenue tracking exists."""
return [
{"metric": "aum", "value": 0, "unit": "USD"},
{"metric": "treasury", "value": 0, "unit": "USD"},
]
def collect_external_impact(agent: str) -> list[dict]:
"""External impact dimension: stub zeros until manual tracking exists."""
return [
{"metric": "decisions_informed", "value": 0, "unit": "decisions"},
{"metric": "deals_sourced", "value": 0, "unit": "deals"},
]
# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
DIMENSION_MAP = {
"knowledge_output": lambda conn, ci, agent: collect_knowledge_output(conn, agent),
"knowledge_quality": collect_knowledge_quality,
"contributor_engagement": lambda conn, ci, agent: collect_contributor_engagement(conn, agent),
"review_performance": lambda conn, ci, agent: collect_review_performance(conn, agent),
"spend_efficiency": lambda conn, ci, agent: collect_spend_efficiency(conn, agent),
"autonomy": lambda conn, ci, agent: collect_autonomy(conn, agent),
"infrastructure_health": lambda conn, ci, agent: collect_infrastructure_health(conn, agent),
"social_reach": lambda conn, ci, agent: collect_social_reach(agent),
"capital": lambda conn, ci, agent: collect_capital(agent),
"external_impact": lambda conn, ci, agent: collect_external_impact(agent),
}
def collect_all_for_agent(
db_path: str,
agent: str,
claim_index_url: str = "http://localhost:8080/claim-index",
) -> dict:
"""Collect all 10 vitality dimensions for a single agent.
Returns {dimension: [metrics]}.
"""
claim_index = _fetch_claim_index(claim_index_url)
conn = _ro_conn(db_path)
try:
result = {}
for dim_key, collector in DIMENSION_MAP.items():
try:
result[dim_key] = collector(conn, claim_index, agent)
except Exception as e:
logger.error("collector %s failed for %s: %s", dim_key, agent, e)
result[dim_key] = []
return result
finally:
conn.close()
def collect_system_aggregate(
db_path: str,
claim_index_url: str = "http://localhost:8080/claim-index",
) -> dict:
"""System-level aggregate vitality metrics."""
claim_index = _fetch_claim_index(claim_index_url)
conn = _ro_conn(db_path)
try:
metrics = {}
# Knowledge totals
total_claims = claim_index["total_claims"] if claim_index else 0
orphan_ratio = claim_index.get("orphan_ratio", 0) if claim_index else 0
domain_count = len(claim_index.get("domains", {})) if claim_index else 0
metrics["knowledge_output"] = [
{"metric": "total_claims", "value": total_claims, "unit": "claims"},
{"metric": "total_domains", "value": domain_count, "unit": "domains"},
{"metric": "orphan_ratio", "value": round(orphan_ratio, 4), "unit": "ratio"},
]
# Cross-domain citation rate
if claim_index:
claims = claim_index.get("claims", [])
total_links = sum(c.get("outgoing_count", 0) for c in claims)
cross_domain = 0
for c in claims:
src_domain = c.get("domain")
for link in c.get("outgoing_links", []):
linked_claims = [
x for x in claims
if x.get("stem") in link or x.get("file", "").endswith(link + ".md")
]
for lc in linked_claims:
if lc.get("domain") != src_domain:
cross_domain += 1
metrics["knowledge_quality"] = [
{"metric": "cross_domain_citation_rate",
"value": round(cross_domain / max(total_links, 1), 4),
"unit": "ratio"},
]
# Pipeline throughput
row = conn.execute(
"SELECT COUNT(*) as merged FROM prs "
"WHERE status='merged' AND merged_at > datetime('now', '-24 hours')"
).fetchone()
row2 = conn.execute("SELECT COUNT(*) as total FROM sources").fetchone()
row3 = conn.execute(
"SELECT COUNT(*) as pending FROM prs "
"WHERE status NOT IN ('merged','rejected','closed')"
).fetchone()
metrics["infrastructure_health"] = [
{"metric": "prs_merged_24h", "value": row["merged"], "unit": "PRs/day"},
{"metric": "total_sources", "value": row2["total"], "unit": "sources"},
{"metric": "queue_depth", "value": row3["pending"], "unit": "PRs"},
]
# Total spend
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd), 0) as cost "
"FROM costs WHERE date > date('now', '-1 day')"
).fetchone()
row2 = conn.execute(
"SELECT COALESCE(SUM(generation_cost), 0) as cost FROM response_audit "
"WHERE timestamp > datetime('now', '-24 hours')"
).fetchone()
metrics["spend_efficiency"] = [
{"metric": "pipeline_cost_24h", "value": round(row["cost"], 4), "unit": "USD"},
{"metric": "response_cost_24h", "value": round(row2["cost"], 4), "unit": "USD"},
{"metric": "total_cost_24h",
"value": round(row["cost"] + row2["cost"], 4), "unit": "USD"},
]
# Stubs
metrics["social_reach"] = [{"metric": "total_followers", "value": 0, "unit": "followers"}]
metrics["capital"] = [{"metric": "total_aum", "value": 0, "unit": "USD"}]
return metrics
finally:
conn.close()
def record_snapshot(
db_path: str,
claim_index_url: str = "http://localhost:8080/claim-index",
):
"""Run a full vitality snapshot — one row per agent per dimension per metric."""
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
rows = []
# Per-agent snapshots
for agent in ALL_AGENTS:
try:
dimensions = collect_all_for_agent(db_path, agent, claim_index_url)
for dim_name, metrics in dimensions.items():
collector_name = f"{dim_name}_collector"
for m in metrics:
rows.append((
agent, dim_name, m["metric"], m["value"],
m["unit"], collector_name, now,
))
except Exception as e:
logger.error("vitality collection failed for %s: %s", agent, e)
# System aggregate
try:
system = collect_system_aggregate(db_path, claim_index_url)
for dim_name, metrics in system.items():
for m in metrics:
rows.append((
"_system", dim_name, m["metric"], m["value"],
m["unit"], "system_aggregate", now,
))
except Exception as e:
logger.error("vitality system aggregate failed: %s", e)
# Write all rows
ensure_schema(db_path)
conn = sqlite3.connect(db_path, timeout=30)
try:
conn.executemany(
"INSERT OR REPLACE INTO vitality_snapshots "
"(agent_name, dimension, metric, value, unit, source, recorded_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
logger.info(
"vitality snapshot recorded: %d rows for %d agents + system",
len(rows), len(ALL_AGENTS),
)
return {"rows_written": len(rows), "agents": len(ALL_AGENTS), "recorded_at": now}
finally:
conn.close()
if __name__ == "__main__":
"""CLI: python3 vitality.py [db_path] — runs a snapshot."""
import sys
logging.basicConfig(level=logging.INFO)
db = sys.argv[1] if len(sys.argv) > 1 else "/opt/teleo-eval/pipeline/pipeline.db"
result = record_snapshot(db)
print(json.dumps(result, indent=2))

View file

@ -0,0 +1,293 @@
"""Vitality API routes for Argus diagnostics dashboard.
Endpoints:
GET /api/vitality             - latest snapshot + time-series for all agents or one
GET /api/vitality/snapshot    - trigger a new snapshot (POST-like via GET for cron curl)
GET /api/vitality/leaderboard - agents ranked by composite vitality score
Owner: Argus
"""
import json
import logging
import sqlite3
from pathlib import Path
from aiohttp import web
from vitality import (
ALL_AGENTS,
MIGRATION_SQL,
collect_all_for_agent,
collect_system_aggregate,
record_snapshot,
)
logger = logging.getLogger("argus.vitality")
# Composite vitality weights — Leo-approved 2026-04-08
# Dimension keys match Ship's refactored vitality.py DIMENSION_MAP
VITALITY_WEIGHTS = {
"knowledge_output": 0.30, # primary output — highest weight
"knowledge_quality": 0.20, # was "diversity" — quality of output
"contributor_engagement": 0.15, # attracting external contributors
"review_performance": 0.00, # new dim, zero until review_records populated
"autonomy": 0.15, # independent action
"infrastructure_health": 0.05, # machinery working
"spend_efficiency": 0.05, # cost discipline
"social_reach": 0.00, # zero until accounts active
"capital": 0.00, # zero until treasury exists
"external_impact": 0.00, # zero until measurable
}
# Public paths (no auth required)
VITALITY_PUBLIC_PATHS = frozenset({
"/api/vitality",
"/api/vitality/snapshot",
"/api/vitality/leaderboard",
})
def _ro_conn(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=30)
conn.row_factory = sqlite3.Row
return conn
async def handle_vitality(request: web.Request) -> web.Response:
"""GET /api/vitality?agent=<name>&days=7
Returns latest snapshot and time-series data.
If agent is specified, returns that agent only. Otherwise returns all.
"""
db_path = request.app["db_path"]
agent = request.query.get("agent")
try:
days = min(int(request.query.get("days", "7")), 90)
except ValueError:
days = 7
conn = _ro_conn(db_path)
try:
# Check if table exists
table_check = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'"
).fetchone()
if not table_check:
return web.json_response({
"error": "No vitality data yet. Trigger a snapshot first via /api/vitality/snapshot",
"has_data": False
})
# Latest snapshot timestamp
latest = conn.execute(
"SELECT MAX(recorded_at) as ts FROM vitality_snapshots"
).fetchone()
latest_ts = latest["ts"] if latest else None
if not latest_ts:
return web.json_response({"has_data": False})
# Latest snapshot data
if agent:
agents_filter = [agent]
else:
agents_filter = ALL_AGENTS + ["_system"]
result = {"latest_snapshot": latest_ts, "agents": {}}
for a in agents_filter:
rows = conn.execute(
"SELECT dimension, metric, value, unit FROM vitality_snapshots "
"WHERE agent_name = ? AND recorded_at = ?",
(a, latest_ts)
).fetchall()
if not rows:
continue
dimensions = {}
for r in rows:
dim = r["dimension"]
if dim not in dimensions:
dimensions[dim] = []
dimensions[dim].append({
"metric": r["metric"],
"value": r["value"],
"unit": r["unit"],
})
result["agents"][a] = dimensions
# Time-series for trend charts (one data point per snapshot)
ts_query_agent = agent if agent else "_system"
ts_rows = conn.execute(
"SELECT recorded_at, dimension, metric, value "
"FROM vitality_snapshots "
"WHERE agent_name = ? AND recorded_at > datetime('now', ?)"
"ORDER BY recorded_at",
(ts_query_agent, f"-{days} days")
).fetchall()
time_series = {}
for r in ts_rows:
key = f"{r['dimension']}.{r['metric']}"
if key not in time_series:
time_series[key] = []
time_series[key].append({
"t": r["recorded_at"],
"v": r["value"],
})
result["time_series"] = time_series
result["has_data"] = True
return web.json_response(result)
finally:
conn.close()
async def handle_vitality_snapshot(request: web.Request) -> web.Response:
"""GET /api/vitality/snapshot — trigger a new snapshot collection.
Used by cron: curl 'http://localhost:8081/api/vitality/snapshot?confirm=1'
Requires ?confirm=1 to prevent accidental triggers from crawlers/prefetch.
"""
if request.query.get("confirm") != "1":
return web.json_response(
{"status": "noop", "error": "Add ?confirm=1 to trigger a snapshot write"},
status=400,
)
db_path = request.app["db_path"]
claim_index_url = request.app.get("claim_index_url", "http://localhost:8080/claim-index")
try:
result = record_snapshot(db_path, claim_index_url)
return web.json_response({"status": "ok", **result})
except Exception as e:
logger.error("vitality snapshot failed: %s", e)
return web.json_response({"status": "error", "error": str(e)}, status=500)
async def handle_vitality_leaderboard(request: web.Request) -> web.Response:
"""GET /api/vitality/leaderboard — agents ranked by composite vitality score.
Scoring approach:
- Each dimension gets a 0-1 normalized score based on the metric values
- Weighted sum produces composite score
- Agents ranked by composite score descending
"""
db_path = request.app["db_path"]
conn = _ro_conn(db_path)
try:
table_check = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='vitality_snapshots'"
).fetchone()
if not table_check:
return web.json_response({"error": "No vitality data yet", "has_data": False})
latest = conn.execute(
"SELECT MAX(recorded_at) as ts FROM vitality_snapshots"
).fetchone()
if not latest or not latest["ts"]:
return web.json_response({"has_data": False})
latest_ts = latest["ts"]
# Collect all agents' latest data
agent_scores = []
for agent in ALL_AGENTS:
rows = conn.execute(
"SELECT dimension, metric, value FROM vitality_snapshots "
"WHERE agent_name = ? AND recorded_at = ?",
(agent, latest_ts)
).fetchall()
if not rows:
continue
dims = {}
for r in rows:
dim = r["dimension"]
if dim not in dims:
dims[dim] = {}
dims[dim][r["metric"]] = r["value"]
# Normalize each dimension to 0-1
# Dimension keys match Ship's refactored vitality.py DIMENSION_MAP
dim_scores = {}
# knowledge_output: claims_merged (cap at 100 = 1.0)
ko = dims.get("knowledge_output", {})
claims = ko.get("claims_merged", 0)
dim_scores["knowledge_output"] = min(claims / 100, 1.0)
# knowledge_quality: challenge_rate + breadth + evidence_density + domain_coverage
kq = dims.get("knowledge_quality", {})
cr = kq.get("challenge_rate", 0)
breadth = kq.get("activity_breadth", 0)
evidence = kq.get("evidence_density", 0)
coverage = kq.get("domain_coverage", 0)
dim_scores["knowledge_quality"] = min(
(cr / 0.1 * 0.2 + breadth / 4 * 0.2 + evidence * 0.3 + coverage * 0.3), 1.0
)
# contributor_engagement: unique_submitters (cap at 5 = 1.0)
ce = dims.get("contributor_engagement", {})
dim_scores["contributor_engagement"] = min(ce.get("unique_submitters", 0) / 5, 1.0)
# review_performance: approval_rate from review_records (0 until populated)
rp = dims.get("review_performance", {})
dim_scores["review_performance"] = rp.get("approval_rate", 0)
# autonomy: active_days_7d (7 = 1.0)
am = dims.get("autonomy", {})
dim_scores["autonomy"] = min(am.get("active_days_7d", 0) / 7, 1.0)
# infrastructure_health: merge_rate_7d directly (already 0-1)
ih = dims.get("infrastructure_health", {})
dim_scores["infrastructure_health"] = ih.get("merge_rate_7d", 0)
# spend_efficiency: inverted — lower cost per claim is better
se = dims.get("spend_efficiency", {})
daily_cost = se.get("response_cost_24h", 0)
dim_scores["spend_efficiency"] = max(1.0 - daily_cost / 10.0, 0)
# Social/Capital/External: stubbed at 0
dim_scores["social_reach"] = 0
dim_scores["capital"] = 0
dim_scores["external_impact"] = 0
# Composite weighted score
composite = sum(
dim_scores.get(dim, 0) * weight
for dim, weight in VITALITY_WEIGHTS.items()
)
agent_scores.append({
"agent": agent,
"composite_score": round(composite, 4),
"dimension_scores": {k: round(v, 4) for k, v in dim_scores.items()},
"raw_highlights": {
"claims_merged": int(claims),
"merge_rate": round(ih.get("merge_rate_7d", 0) * 100, 1),
"active_days": int(am.get("active_days_7d", 0)),
"challenge_rate": round(cr * 100, 1),
"evidence_density": round(evidence * 100, 1),
},
})
# Sort by composite score descending
agent_scores.sort(key=lambda x: x["composite_score"], reverse=True)
return web.json_response({
"has_data": True,
"snapshot_at": latest_ts,
"leaderboard": agent_scores,
})
finally:
conn.close()
def register_vitality_routes(app: web.Application):
"""Register vitality endpoints on the aiohttp app."""
app.router.add_get("/api/vitality", handle_vitality)
app.router.add_get("/api/vitality/snapshot", handle_vitality_snapshot)
app.router.add_get("/api/vitality/leaderboard", handle_vitality_leaderboard)

View file

@ -9,7 +9,7 @@ the same atomic-write pattern as lib-state.sh.
"""
import asyncio
import hashlib
import secrets
import json
import logging
import os
@ -116,8 +116,8 @@ def _write_inbox_message(agent: str, subject: str, body: str) -> bool:
return False
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
file_hash = hashlib.md5(f"{agent}-{subject}-{body[:200]}".encode()).hexdigest()[:8]
filename = f"cascade-{ts}-{subject[:60]}-{file_hash}.md"
nonce = secrets.token_hex(3)
filename = f"cascade-{ts}-{nonce}-{subject[:60]}.md"
final_path = inbox_dir / filename
try:

View file

@ -63,7 +63,7 @@ def _build_search_text(content: str) -> str:
return " ".join(parts)
def _add_related_edges(claim_path: str, neighbor_titles: list[str]) -> bool:
def _add_related_edges(claim_path: str, neighbor_slugs: list[str]) -> bool:
"""Add related edges to a claim's frontmatter. Returns True if modified."""
try:
with open(claim_path) as f:
@ -87,10 +87,10 @@ def _add_related_edges(claim_path: str, neighbor_titles: list[str]) -> bool:
# Add new edges
added = []
for title in neighbor_titles:
if title.strip().lower() not in existing_lower:
added.append(title)
existing_lower.add(title.strip().lower())
for slug in neighbor_slugs:
if slug.strip().lower() not in existing_lower:
added.append(slug)
existing_lower.add(slug.strip().lower())
if not added:
return False
@ -107,7 +107,6 @@ def _add_related_edges(claim_path: str, neighbor_titles: list[str]) -> bool:
def connect_new_claims(
claim_paths: list[str],
domain: str | None = None,
threshold: float = CONNECT_THRESHOLD,
max_neighbors: int = CONNECT_MAX_NEIGHBORS,
) -> dict:
@ -115,7 +114,6 @@ def connect_new_claims(
Args:
claim_paths: List of file paths to newly-written claim files.
domain: Optional domain filter for Qdrant search.
threshold: Minimum cosine similarity for connection.
max_neighbors: Maximum edges to add per claim.
@ -169,27 +167,28 @@ def connect_new_claims(
stats["skipped_no_neighbors"] += 1
continue
# Extract neighbor titles
neighbor_titles = []
# Extract neighbor slugs (filename stems, not titles — reciprocal edges need resolvable names)
neighbor_slugs = []
for hit in hits:
payload = hit.get("payload", {})
title = payload.get("claim_title", "")
if title:
neighbor_titles.append(title)
claim_path_qdrant = payload.get("claim_path", "")
if claim_path_qdrant:
slug = claim_path_qdrant.rsplit("/", 1)[-1].replace(".md", "")
neighbor_slugs.append(slug)
if not neighbor_titles:
if not neighbor_slugs:
stats["skipped_no_neighbors"] += 1
continue
# Add edges to the new claim's frontmatter
if _add_related_edges(claim_path, neighbor_titles):
if _add_related_edges(claim_path, neighbor_slugs):
stats["connected"] += 1
stats["edges_added"] += len(neighbor_titles)
stats["edges_added"] += len(neighbor_slugs)
stats["connections"].append({
"claim": os.path.basename(claim_path),
"neighbors": neighbor_titles,
"neighbors": neighbor_slugs,
})
logger.info("Connected %s%d neighbors", os.path.basename(claim_path), len(neighbor_titles))
logger.info("Connected %s%d neighbors", os.path.basename(claim_path), len(neighbor_slugs))
else:
stats["skipped_no_neighbors"] += 1

View file

@ -479,6 +479,9 @@ def migrate(conn: sqlite3.Connection):
logger.info("Migration v11: added auto_merge column to prs table")
# v12-v16 ran manually on VPS before code was version-controlled.
# Their changes are consolidated into v17+ migrations below.
if current < 17:
# Add prompt/pipeline version tracking per PR
for col, default in [

View file

@ -493,6 +493,9 @@ async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_iss
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
"""Evaluate a single PR. Returns result dict."""
from . import costs
pr_cost = 0.0
# Check eval attempt budget before claiming
row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone()
eval_attempts = (row["eval_attempts"] or 0) if row else 0
@ -608,10 +611,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
json.dumps({"pr": pr_number, "tier": tier}),
)
else:
tier, triage_usage = await triage_pr(diff)
# Record triage cost
from . import costs
costs.record_usage(
tier, triage_usage, _triage_reason = await triage_pr(diff)
pr_cost += costs.record_usage(
conn, config.TRIAGE_MODEL, "eval_triage",
input_tokens=triage_usage.get("prompt_tokens", 0),
output_tokens=triage_usage.get("completion_tokens", 0),
@ -674,6 +675,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# OpenRouter failure (timeout, error) — revert to open for retry.
# NOT a rate limit — don't trigger 15-min backoff, just skip this PR.
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"}
domain_verdict = _parse_verdict(domain_review, agent)
@ -714,6 +717,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
if domain_verdict != "skipped":
pr_cost += costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {
"pr": pr_number,
"domain_verdict": domain_verdict,
@ -731,6 +743,15 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
if leo_review is None:
# DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
if domain_verdict != "skipped":
pr_cost += costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed"
return {"pr": pr_number, "skipped": True, "reason": reason}
@ -834,10 +855,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
# Record cost (only for reviews that actually ran)
from . import costs
if domain_verdict != "skipped":
costs.record_usage(
pr_cost += costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
@ -845,15 +864,23 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
if leo_verdict not in ("skipped",):
if tier == "DEEP":
costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
pr_cost += costs.record_usage(
conn, config.EVAL_LEO_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="max",
)
else:
costs.record_usage(
pr_cost += costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {
"pr": pr_number,
"tier": tier,

View file

@ -37,6 +37,7 @@ from .domains import agent_for_domain
from .extraction_prompt import build_extraction_prompt
from .forgejo import api as forgejo_api
from .llm import openrouter_call
from .connect import connect_new_claims
from .post_extract import load_existing_claims_from_repo, validate_and_fix_claims
from .worktree_lock import async_main_worktree_lock
@ -225,7 +226,29 @@ def _build_claim_content(claim: dict, agent: str) -> str:
body = claim.get("body", "")
scope = claim.get("scope", "")
sourcer = claim.get("sourcer", "")
related = claim.get("related_claims", [])
related_claims = claim.get("related_claims", [])
connections = claim.get("connections", [])
edge_fields = {"supports": [], "challenges": [], "related": []}
for conn in connections:
target = conn.get("target", "")
rel = conn.get("relationship", "related")
if target and rel in edge_fields:
target = target.replace(".md", "")
if target not in edge_fields[rel]:
edge_fields[rel].append(target)
for r in related_claims[:5]:
r_clean = r.replace(".md", "")
if r_clean not in edge_fields["related"]:
edge_fields["related"].append(r_clean)
edge_lines = []
for edge_type in ("supports", "challenges", "related"):
targets = edge_fields[edge_type]
if targets:
edge_lines.append(f"{edge_type}:")
for t in targets:
edge_lines.append(f" - {t}")
lines = [
"---",
@ -242,10 +265,7 @@ def _build_claim_content(claim: dict, agent: str) -> str:
lines.append(f"scope: {scope}")
if sourcer:
lines.append(f'sourcer: "{sourcer}"')
if related:
lines.append("related_claims:")
for r in related:
lines.append(f' - "[[{r}]]"')
lines.extend(edge_lines)
lines.append("---")
lines.append("")
lines.append(f"# {title}")
@ -376,6 +396,7 @@ async def _extract_one_source(
filename = c.get("filename", "")
if not filename:
continue
filename = Path(filename).name # Strip directory components — LLM output may contain path traversal
if not filename.endswith(".md"):
filename += ".md"
content = _build_claim_content(c, agent_lower)
@ -387,6 +408,7 @@ async def _extract_one_source(
filename = e.get("filename", "")
if not filename:
continue
filename = Path(filename).name # Strip directory components — LLM output may contain path traversal
if not filename.endswith(".md"):
filename += ".md"
action = e.get("action", "create")
@ -454,6 +476,19 @@ async def _extract_one_source(
await _archive_source(source_path, domain, "null-result")
return 0, 0
# Post-write: connect new claims to existing KB via vector search (non-fatal)
claim_paths = [str(worktree / f) for f in files_written if f.startswith("domains/")]
if claim_paths:
try:
connect_stats = connect_new_claims(claim_paths)
if connect_stats["connected"] > 0:
logger.info(
"Extract-connect: %d/%d claims → %d edges",
connect_stats["connected"], len(claim_paths), connect_stats["edges_added"],
)
except Exception:
logger.warning("Extract-connect failed (non-fatal)", exc_info=True)
# Stage and commit
for f in files_written:
await _git("add", f, cwd=str(EXTRACT_WORKTREE))

View file

@ -1,220 +1,94 @@
"""Stale PR monitor — auto-close extraction PRs that produced no claims.
"""Stale extraction PR cleanup — closes extraction PRs that produce no claims.
Catches the failure mode where batch-extract creates a PR but extraction
produces only source-file updates (no actual claims). These PRs sit open
indefinitely, consuming merge queue bandwidth and confusing metrics.
When an extraction PR sits open >30 min with claims_count=0, it indicates:
- Extraction failed (model couldn't extract anything useful)
- Batch job stalled (no claims written)
- Source material is empty/junk
Rules:
- PR branch starts with "extract/"
- PR is open for >30 minutes
- PR diff contains 0 files in domains/*/ or decisions/*/
Auto-close with comment, log to audit_log as stale_extraction_closed
Auto-closing prevents zombie PRs from blocking the pipeline.
Logs each close for root cause analysis (model failures, bad sources, etc.).
- If same source branch has been stale-closed 2+ times
Mark source as extraction_failed in pipeline.db sources table
Called from the pipeline daemon (piggyback on validate_cycle interval)
or standalone via: python3 -m lib.stale_pr
Owner: Epimetheus
Epimetheus owns this module.
"""
import logging
import json
import os
import re
import sqlite3
import urllib.request
from datetime import datetime, timedelta, timezone
import logging
from datetime import datetime, timezone
from . import config
from . import config, db
from .forgejo import api, repo_path
logger = logging.getLogger("pipeline.stale_pr")
STALE_THRESHOLD_MINUTES = 30
MAX_STALE_FAILURES = 2 # After this many stale closures, mark source as failed
STALE_THRESHOLD_MINUTES = 45
def _forgejo_api(method: str, path: str, body: dict | None = None) -> dict | list | None:
"""Call Forgejo API. Returns parsed JSON or None on failure."""
token_file = config.FORGEJO_TOKEN_FILE
if not token_file.exists():
logger.error("No Forgejo token at %s", token_file)
return None
token = token_file.read_text().strip()
async def check_stale_prs(conn) -> tuple[int, int]:
"""Auto-close extraction PRs open >30 min with zero claims.
url = f"{config.FORGEJO_URL}/api/v1/{path}"
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(
url,
data=data,
headers={
"Authorization": f"token {token}",
"Content-Type": "application/json",
},
method=method,
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read())
except Exception as e:
logger.warning("Forgejo API %s %s failed: %s", method, path, e)
return None
def _pr_has_claim_files(pr_number: int) -> bool:
"""Check if a PR's diff contains any files in domains/ or decisions/."""
diff_data = _forgejo_api("GET", f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/files")
if not diff_data or not isinstance(diff_data, list):
return False
for file_entry in diff_data:
filename = file_entry.get("filename", "")
if filename.startswith("domains/") or filename.startswith("decisions/"):
# Check it's a .md file, not a directory marker
if filename.endswith(".md"):
return True
return False
def _close_pr(pr_number: int, reason: str) -> bool:
"""Close a PR with a comment explaining why."""
# Add comment
_forgejo_api("POST",
f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
{"body": f"Auto-closed by stale PR monitor: {reason}\n\nPentagon-Agent: Epimetheus"},
)
# Close PR
result = _forgejo_api("PATCH",
f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
{"state": "closed"},
)
return result is not None
def _log_audit(conn: sqlite3.Connection, pr_number: int, branch: str):
"""Log stale closure to audit_log."""
try:
conn.execute(
"INSERT INTO audit_log (timestamp, stage, event, detail) VALUES (datetime('now'), ?, ?, ?)",
("monitor", "stale_extraction_closed", json.dumps({"pr": pr_number, "branch": branch})),
)
conn.commit()
except Exception as e:
logger.warning("Audit log write failed: %s", e)
def _count_stale_closures(conn: sqlite3.Connection, branch: str) -> int:
"""Count how many times this branch has been stale-closed."""
try:
row = conn.execute(
"SELECT COUNT(*) FROM audit_log WHERE event = 'stale_extraction_closed' AND detail LIKE ?",
(f'%"branch": "{branch}"%',),
).fetchone()
return row[0] if row else 0
except Exception:
return 0
def _mark_source_failed(conn: sqlite3.Connection, branch: str):
"""Mark the source as extraction_failed after repeated stale closures."""
# Extract source name from branch: extract/source-name → source-name
source_name = branch.removeprefix("extract/")
try:
conn.execute(
"UPDATE sources SET status = 'extraction_failed', last_error = 'repeated_stale_extraction', updated_at = datetime('now') WHERE path LIKE ?",
(f"%{source_name}%",),
)
conn.commit()
logger.info("Marked source %s as extraction_failed (repeated stale closures)", source_name)
except Exception as e:
logger.warning("Failed to mark source as failed: %s", e)
def check_stale_prs(conn: sqlite3.Connection) -> tuple[int, int]:
"""Check for and close stale extraction PRs.
Returns (closed_count, error_count).
Returns (stale_closed, stale_errors) count of closed PRs and close failures.
"""
closed = 0
errors = 0
stale_closed = 0
stale_errors = 0
# Fetch all open PRs (paginated)
page = 1
all_prs = []
while True:
prs = _forgejo_api("GET",
f"repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50&page={page}")
if not prs:
break
all_prs.extend(prs)
if len(prs) < 50:
break
page += 1
# Find extraction PRs: open >30 min, source has 0 claims
stale_prs = conn.execute(
"""SELECT p.number, p.branch, p.source_path, p.created_at
FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.commit_type = 'extract'
AND datetime(p.created_at) < datetime('now', '-' || ? || ' minutes')
AND COALESCE(s.claims_count, 0) = 0""",
(STALE_THRESHOLD_MINUTES,),
).fetchall()
now = datetime.now(timezone.utc)
for pr in stale_prs:
pr_num = pr["number"]
source_path = pr["source_path"] or "unknown"
for pr in all_prs:
branch = pr.get("head", {}).get("ref", "")
if not branch.startswith("extract/"):
continue
# Check age
created_str = pr.get("created_at", "")
if not created_str:
continue
try:
# Forgejo returns ISO format with Z suffix
created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
except ValueError:
continue
# Close the PR via Forgejo
result = await api(
"PATCH",
repo_path(f"pulls/{pr_num}"),
body={"state": "closed"},
)
if result is None:
stale_errors += 1
logger.warning(
"Failed to close stale extraction PR #%d (%s, %s)",
pr_num, source_path, pr["branch"],
)
continue
age_minutes = (now - created).total_seconds() / 60
if age_minutes < STALE_THRESHOLD_MINUTES:
continue
# Update local DB status
conn.execute(
"UPDATE prs SET status = 'closed' WHERE number = ?",
(pr_num,),
)
db.audit(
conn,
"watchdog",
"stale_pr_closed",
json.dumps({
"pr": pr_num,
"branch": pr["branch"],
"source": source_path,
"open_minutes": STALE_THRESHOLD_MINUTES,
}),
)
stale_closed += 1
logger.info(
"WATCHDOG: closed stale extraction PR #%d (no claims after %d min): %s",
pr_num, STALE_THRESHOLD_MINUTES, source_path,
)
pr_number = pr["number"]
except Exception as e:
stale_errors += 1
logger.warning(
"Stale PR close exception for #%d: %s",
pr_num, e,
)
# Check if PR has claim files
if _pr_has_claim_files(pr_number):
continue # PR has claims — not stale
# PR is stale — close it
logger.info("Stale PR #%d: branch=%s, age=%.0f min, no claim files — closing",
pr_number, branch, age_minutes)
if _close_pr(pr_number, f"No claim files after {int(age_minutes)} minutes. Branch: {branch}"):
closed += 1
_log_audit(conn, pr_number, branch)
# Check for repeated failures
failure_count = _count_stale_closures(conn, branch)
if failure_count >= MAX_STALE_FAILURES:
_mark_source_failed(conn, branch)
logger.warning("Source %s marked as extraction_failed after %d stale closures",
branch, failure_count)
else:
errors += 1
logger.warning("Failed to close stale PR #%d", pr_number)
if closed:
logger.info("Stale PR monitor: closed %d PRs", closed)
return closed, errors
# Allow standalone execution
if __name__ == "__main__":
import sys
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
db_path = config.DB_PATH
if not db_path.exists():
print(f"ERROR: Database not found at {db_path}", file=sys.stderr)
sys.exit(1)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
closed, errs = check_stale_prs(conn)
print(f"Stale PR monitor: {closed} closed, {errs} errors")
conn.close()
return stale_closed, stale_errors

View file

@ -620,6 +620,27 @@ async def validate_pr(conn, pr_number: int) -> dict:
# Extract claim files (domains/, core/, foundations/)
claim_files = extract_claim_files_from_diff(diff)
# ── Backfill description (claim titles) if missing ──
# discover_external_prs creates rows without description. Extract H1 titles
# from the diff so the dashboard shows what the PR actually contains.
existing_desc = conn.execute(
"SELECT description FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if existing_desc and not (existing_desc["description"] or "").strip() and claim_files:
titles = []
for _fp, content in claim_files.items():
for line in content.split("\n"):
if line.startswith("# ") and len(line) > 3:
titles.append(line[2:].strip())
break
if titles:
desc = " | ".join(titles)
conn.execute(
"UPDATE prs SET description = ? WHERE number = ? AND (description IS NULL OR description = '')",
(desc, pr_number),
)
logger.info("PR #%d: backfilled description with %d claim titles", pr_number, len(titles))
# ── Tier 0: per-claim validation ──
# Only validates NEW files (not modified). Modified files have partial content
# from diffs (only + lines) — frontmatter parsing fails on partial content,

View file

@ -104,26 +104,83 @@ async def watchdog_check(conn) -> dict:
"action": "GC should auto-close these — check fixer.py GC logic",
})
# 5. Tier0 blockage: many PRs with tier0_pass=0 (potential validation bug)
# 5. Tier0 blockage: auto-reset stuck PRs with retry cap
MAX_TIER0_RESETS = 3
TIER0_RESET_COOLDOWN_S = 3600
tier0_blocked = conn.execute(
"SELECT COUNT(*) as n FROM prs WHERE status = 'open' AND tier0_pass = 0"
).fetchone()["n"]
if tier0_blocked >= 5:
issues.append({
"type": "tier0_blockage",
"severity": "warning",
"detail": f"{tier0_blocked} PRs blocked at tier0_pass=0",
"action": "Check validate.py — may be the modified-file or wiki-link bug recurring",
})
"SELECT number, branch FROM prs WHERE status = 'open' AND tier0_pass = 0"
).fetchall()
if tier0_blocked:
reset_count = 0
permanent_count = 0
for pr in tier0_blocked:
row = conn.execute(
"""SELECT COUNT(*) as n, MAX(timestamp) as last_ts FROM audit_log
WHERE stage = 'watchdog' AND event = 'tier0_reset'
AND json_extract(detail, '$.pr') = ?""",
(pr["number"],),
).fetchone()
prior_resets = row["n"]
if prior_resets >= MAX_TIER0_RESETS:
permanent_count += 1
continue
last_reset = row["last_ts"]
if last_reset:
try:
last_ts = datetime.fromisoformat(last_reset).replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - last_ts).total_seconds()
if age < TIER0_RESET_COOLDOWN_S:
continue
except (ValueError, TypeError):
pass
conn.execute(
"UPDATE prs SET tier0_pass = NULL WHERE number = ?",
(pr["number"],),
)
db.audit(
conn, "watchdog", "tier0_reset",
json.dumps({
"pr": pr["number"],
"branch": pr["branch"],
"attempt": prior_resets + 1,
"max": MAX_TIER0_RESETS,
}),
)
reset_count += 1
logger.info(
"WATCHDOG: auto-reset tier0 for PR #%d (attempt %d/%d)",
pr["number"], prior_resets + 1, MAX_TIER0_RESETS,
)
if reset_count:
issues.append({
"type": "tier0_reset",
"severity": "info",
"detail": f"Auto-reset {reset_count} PRs stuck at tier0_pass=0 for re-validation",
"action": "Monitor — if same PRs fail again, check validate.py",
})
if permanent_count:
issues.append({
"type": "tier0_permanent_failure",
"severity": "warning",
"detail": f"{permanent_count} PRs exhausted {MAX_TIER0_RESETS} tier0 retries — manual intervention needed",
"action": "Inspect PR content or close stale PRs",
})
# 6. Stale extraction PRs: open >30 min with no claim files
try:
stale_closed, stale_errors = check_stale_prs(conn)
stale_closed, stale_errors = await check_stale_prs(conn)
if stale_closed > 0:
issues.append({
"type": "stale_prs_closed",
"severity": "info",
"detail": f"Auto-closed {stale_closed} stale extraction PRs (no claims after {30} min)",
"detail": f"Auto-closed {stale_closed} stale extraction PRs (no claims after 30 min)",
"action": "Check batch-extract logs for extraction failures",
})
if stale_errors > 0:

92
research/entity-session.sh Executable file
View file

@ -0,0 +1,92 @@
#!/bin/bash
set -e
AGENT="rio"
BRANCH="${AGENT}/entity-population-$(date +%Y-%m-%d)"
WORKSPACE="/opt/teleo-eval/workspaces/entity-${AGENT}"
LOG="/opt/teleo-eval/logs/entity-${AGENT}.log"
BRIEF="/opt/teleo-eval/entity-research-brief.md"
SCHEMA="/opt/teleo-eval/entity-schema.md"
log() { echo "[$(date -Iseconds)] $1" | tee -a "$LOG"; }
# Setup workspace
if [ ! -d "$WORKSPACE" ]; then
log "Cloning fresh workspace..."
git clone http://localhost:3000/teleo/teleo-codex.git "$WORKSPACE"
fi
cd "$WORKSPACE"
git checkout main
git pull origin main
git checkout -b "$BRANCH"
# Copy schema into workspace
cp "$SCHEMA" schemas/entity.md
# Create entities directory
mkdir -p entities/internet-finance
log "On branch $BRANCH"
log "Starting Claude entity population session..."
# Build the prompt
PROMPT="You are Rio, the internet finance domain agent for the Teleo Codex knowledge base.
Your task: populate the first entity files for the knowledge base, focusing on the futarchic ecosystem.
RESEARCH BRIEF:
$(cat "$BRIEF")
ENTITY SCHEMA:
$(cat "$SCHEMA")
INSTRUCTIONS:
1. Read the research brief carefully
2. Read the entity schema at schemas/entity.md
3. Read existing claims in domains/internet-finance/ for context
4. Read relevant source archives in inbox/archive/
5. Use web search to find current data for each entity (market caps, metrics, recent events)
6. Create entity files in entities/internet-finance/ following the schema exactly
7. Start with the companies and people listed in the brief
8. Create the market entity for futarchic markets
9. Make sure all wiki links point to real existing files
10. Add timeline events with dates
11. Include competitive positioning for companies
12. Include known positions and credibility basis for people
Create all 12 entities listed in the brief. Quality over speed."
# Run Claude
timeout 5400 /home/teleo/.local/bin/claude -p "$PROMPT" \
--model opus \
--allowedTools Read,Write,Edit,Glob,Grep,WebSearch,WebFetch \
2>&1 | tee -a "$LOG" || true
# Commit and push
log "Session complete. Committing..."
git add entities/ schemas/entity.md
ENTITY_COUNT=$(find entities/ -name "*.md" | wc -l)
git commit -m "rio: populate ${ENTITY_COUNT} entity files — futarchic ecosystem
- What: First entity population using new entity schema
- Why: Cory directive — agents need industry analysis, not just claims
- Schema: entities track companies, people, markets with temporal data
Pentagon-Agent: Rio <CE7B8202-2877-4C70-8AAB-B05F832F50EA>" || log "Nothing to commit"
git push -u origin "$BRANCH" || log "Push failed"
# Create PR
PR_URL=$(curl -s -X POST "http://localhost:3000/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token $(cat /opt/teleo-eval/secrets/forgejo-admin-token)" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"rio: entity schema + ${ENTITY_COUNT} entity files — futarchic ecosystem\",
\"body\": \"## Summary\n\nNew entity schema + first population of entity files for the futarchic ecosystem.\n\nEntities track companies, people, and markets as dynamic objects with temporal attributes — a parallel input to beliefs alongside claims.\n\n### Entities created:\n- Companies: MetaDAO, Solomon, Ranger Finance, MycoRealms, Futardio, Aave, Polymarket\n- People: Stani Kulechov, Proph3t, Gabriel Shapiro, Felipe Montealegre\n- Markets: Futarchic Markets ecosystem\n\nDesigned by Leo, populated by Rio.\",
\"head\": \"${BRANCH}\",
\"base\": \"main\"
}" | python3 -c "import sys,json; print(json.load(sys.stdin).get(html_url,no url))")
log "PR opened: $PR_URL"
log "=== Entity session complete for ${AGENT} ==="

212
research/vida-directed-session.sh Executable file
View file

@ -0,0 +1,212 @@
#!/bin/bash
# Directed research session for Vida — MA/Senior Care/International
# Wraps research-session.sh with a custom brief injected into the prompt
set -euo pipefail
AGENT="vida"
MODEL="opus"
REPO_DIR="/opt/teleo-eval/workspaces/research-${AGENT}"
FORGEJO_URL="http://localhost:3000"
FORGEJO_ADMIN_TOKEN=$(cat /opt/teleo-eval/secrets/forgejo-admin-token)
AGENT_TOKEN=$(cat "/opt/teleo-eval/secrets/forgejo-${AGENT}-token")
CLAUDE_BIN="/home/teleo/.local/bin/claude"
LOG="/opt/teleo-eval/logs/research-${AGENT}.log"
LOCKFILE="/tmp/research-${AGENT}.lock"
DATE=$(date +%Y-%m-%d)
BRANCH="${AGENT}/research-ma-senior-care-${DATE}"
BRIEF_FILE="/opt/teleo-eval/vida-research-brief.md"
DOMAIN="health"
log() { echo "[$(date -Iseconds)] $*" >> "$LOG"; }
# Lock
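# PID lockfile: skip if the recorded process is still alive; a dead PID means a stale lock, so clear it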
if [ -f "$LOCKFILE" ]; then
pid=$(cat "$LOCKFILE" 2>/dev/null)
if kill -0 "$pid" 2>/dev/null; then
log "SKIP: research session already running for $AGENT (pid $pid)"
exit 0
fi
rm -f "$LOCKFILE"
fi
echo $$ > "$LOCKFILE"
trap 'rm -f "$LOCKFILE"' EXIT
log "=== Starting DIRECTED research session for $AGENT (model: $MODEL) ==="
log "Topic: Medicare Advantage, Senior Care, International Comparisons"
# Ensure repo
if [ ! -d "$REPO_DIR/.git" ]; then
git -c http.extraHeader="Authorization: token $FORGEJO_ADMIN_TOKEN" \
clone "${FORGEJO_URL}/teleo/teleo-codex.git" "$REPO_DIR" >> "$LOG" 2>&1
fi
cd "$REPO_DIR"
git config credential.helper "!f() { echo username=m3taversal; echo password=$FORGEJO_ADMIN_TOKEN; }; f"
git remote set-url origin "${FORGEJO_URL}/teleo/teleo-codex.git" 2>/dev/null || true
git checkout main >> "$LOG" 2>&1
git pull --rebase >> "$LOG" 2>&1 || { git rebase --abort 2>/dev/null; git reset --hard origin/main >> "$LOG" 2>&1; }
# Create branch
git branch -D "$BRANCH" 2>/dev/null || true
git checkout -b "$BRANCH" >> "$LOG" 2>&1
# Read the brief
BRIEF=$(cat "$BRIEF_FILE")
RESEARCH_PROMPT="You are Vida, a Teleo knowledge base agent specializing in health and human flourishing.
## Your Task: Directed Research Session
You have a SPECIFIC research brief from the collective. This is not self-directed — follow the brief.
### Step 1: Orient (5 min)
Read these files:
- agents/vida/identity.md
- agents/vida/beliefs.md
- agents/vida/reasoning.md
- domains/health/_map.md
### Step 2: Read Your Research Brief
${BRIEF}
### Step 3: Research via Web (75 min)
For each track, use the WebSearch and WebFetch tools to find the specific sources listed in the brief. Archive everything substantive.
**Search strategy:**
- Start with the named sources (MedPAC, KFF, Commonwealth Fund, etc.)
- Follow citations to primary data
- Look for recent (2024-2026) analysis that synthesizes historical data
- Don't just find one article per question — find the BEST source per question
For each source found, create an archive file at:
inbox/archive/YYYY-MM-DD-{author-or-org}-{brief-slug}.md
Use this frontmatter:
---
type: source
title: \"Descriptive title\"
author: \"Author or Organization\"
url: https://original-url
date: YYYY-MM-DD
domain: health
secondary_domains: []
format: report | paper | article | data
status: unprocessed
priority: high | medium | low
tags: [topic1, topic2]
---
## Content
[Key excerpts, data points, findings — enough for an extractor to work with]
## Agent Notes
**Why this matters:** [1-2 sentences connecting to beliefs]
**What surprised me:** [Anything unexpected]
**KB connections:** [Which existing health claims relate?]
**Extraction hints:** [What claims should the extractor focus on?]
## Curator Notes
PRIMARY CONNECTION: [existing claim this most relates to]
WHY ARCHIVED: [what gap this fills]
EXTRACTION HINT: [scope the extractor's attention]
### Step 3 Rules:
- Archive EVERYTHING substantive — do NOT extract claims yourself
- Set all sources to status: unprocessed
- Aim for 15-25 source archives across the three tracks
- Prioritize Track 1 (MA history) — that's the anchor
- Check inbox/archive/ for existing sources before creating duplicates
### Step 4: Write Research Musing (5 min)
Write to agents/vida/musings/research-ma-senior-care-${DATE}.md:
- What you found across the three tracks
- Key surprises or gaps
- Follow-up directions for next session
- Which of your beliefs got stronger or weaker
### Step 5: Update Research Journal (3 min)
Append to agents/vida/research-journal.md (create if needed):
## Session ${DATE} — Medicare Advantage & Senior Care
**Question:** [primary research question]
**Key finding:** [most important thing learned]
**Confidence shift:** [belief updates]
### Step 6: Stop
When done archiving and writing notes, STOP. Do not commit or push."
log "Starting Claude Opus session..."
timeout 5400 "$CLAUDE_BIN" -p "$RESEARCH_PROMPT" \
--allowedTools 'Read,Write,Edit,Glob,Grep,WebSearch,WebFetch' \
--model "$MODEL" \
--permission-mode bypassPermissions \
>> "$LOG" 2>&1 || {
log "WARN: Research session failed or timed out"
# Still try to commit whatever was produced
}
log "Claude session complete"
# Check for changes
CHANGED_FILES=$(git status --porcelain)
if [ -z "$CHANGED_FILES" ]; then
log "No sources archived"
git checkout main >> "$LOG" 2>&1
exit 0
fi
# Stage and commit
git add inbox/archive/ agents/vida/musings/ agents/vida/research-journal.md 2>/dev/null || true
if git diff --cached --quiet; then
log "No valid changes to commit"
git checkout main >> "$LOG" 2>&1
exit 0
fi
SOURCE_COUNT=$(git diff --cached --name-only | grep -c "^inbox/archive/" || true)  # grep -c already prints 0 on no match; avoid echoing a second "0"
git commit -m "vida: directed research — MA, senior care, international comparisons
- ${SOURCE_COUNT} sources archived across 3 tracks
- Track 1: Medicare Advantage history & structure
- Track 2: Senior care infrastructure
- Track 3: International health system comparisons
Pentagon-Agent: Vida <HEADLESS>" >> "$LOG" 2>&1
git push -u origin "$BRANCH" --force >> "$LOG" 2>&1
log "Pushed $BRANCH"
# Open PR
EXISTING_PR=$(curl -s "${FORGEJO_URL}/api/v1/repos/teleo/teleo-codex/pulls?state=open" \
-H "Authorization: token $AGENT_TOKEN" \
| jq -r ".[] | select(.head.ref == \"$BRANCH\") | .number" 2>/dev/null)
if [ -n "$EXISTING_PR" ]; then
log "PR already exists (#$EXISTING_PR)"
else
PR_JSON=$(jq -n \
--arg title "vida: directed research — Medicare Advantage, senior care, international comparisons" \
--arg body "## Directed Research Session
Three-track investigation commissioned by Cory:
**Track 1:** Medicare Advantage — full history from 1965 to present, risk adjustment, market structure, vertical integration
**Track 2:** Senior care infrastructure — home health, PACE, caregiver crisis, aging demographics
**Track 3:** International comparisons — Commonwealth Fund, Singapore, Costa Rica, NHS, Japan LTCI
Sources archived for extraction by the claim pipeline." \
--arg base "main" \
--arg head "$BRANCH" \
'{title: $title, body: $body, base: $base, head: $head}')
curl -s -X POST "${FORGEJO_URL}/api/v1/repos/teleo/teleo-codex/pulls" \
-H "Authorization: token $AGENT_TOKEN" \
-H "Content-Type: application/json" \
-d "$PR_JSON" >> "$LOG" 2>&1
log "PR opened"
fi
git checkout main >> "$LOG" 2>&1
log "=== Directed research session complete ==="

View file

@@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""Backfill reviewer_count in contributors table from prs review data.
Sources of review data:
1. leo_verdict in prs table (approve/request_changes = Leo reviewed)
2. domain_verdict + domain_agent in prs table (domain agent reviewed)
3. Forgejo API reviews (agents that submitted reviews via Forgejo)
Deduplication: If the same agent is both leo_verdict reviewer and domain_agent
on the same PR, count it once per PR.
"""
import sqlite3
import json
import os
import sys
import urllib.request
DB_PATH = os.environ.get("PIPELINE_DB", "/opt/teleo-eval/pipeline/pipeline.db")
FORGEJO_URL = "http://localhost:3000/api/v1"
REPO = "teleo/teleo-codex"
def get_forgejo_token():
token_path = "/opt/teleo-eval/secrets/forgejo-admin-token"
if os.path.exists(token_path):
return open(token_path).read().strip()
return os.environ.get("FORGEJO_TOKEN", "")
def fetch_forgejo_reviews(pr_number, token):
"""Fetch reviews from Forgejo API for a single PR."""
url = f"{FORGEJO_URL}/repos/{REPO}/pulls/{pr_number}/reviews"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
try:
with urllib.request.urlopen(req, timeout=5) as resp:
return json.loads(resp.read())
except Exception:
return []
def main():
dry_run = "--dry-run" in sys.argv
skip_forgejo = "--skip-forgejo" in sys.argv
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
# Step 1: Collect review events from prs table
# reviewer -> set of PR numbers they reviewed
reviewer_prs = {}
# Leo reviews (leo_verdict = approve or request_changes)
rows = conn.execute("""
SELECT number FROM prs
WHERE status='merged' AND leo_verdict IN ('approve', 'request_changes')
""").fetchall()
leo_prs = {r["number"] for r in rows}
if leo_prs:
reviewer_prs["leo"] = leo_prs
print(f"Leo reviews from leo_verdict: {len(leo_prs)}")
# Domain agent reviews
rows = conn.execute("""
SELECT number, domain_agent FROM prs
WHERE status='merged' AND domain_verdict IN ('approve', 'request_changes')
AND domain_agent IS NOT NULL AND domain_agent != ''
""").fetchall()
for r in rows:
agent = r["domain_agent"].lower()
if agent not in reviewer_prs:
reviewer_prs[agent] = set()
reviewer_prs[agent].add(r["number"])
# Print domain agent counts (before dedup with Leo)
for agent in sorted(reviewer_prs):
if agent != "leo":
print(f" {agent} domain reviews: {len(reviewer_prs[agent])}")
# Leo as domain_agent overlaps with leo_verdict — already deduped by using sets
leo_domain = conn.execute("""
SELECT COUNT(*) as cnt FROM prs
WHERE status='merged' AND domain_agent='Leo'
AND domain_verdict IN ('approve', 'request_changes')
""").fetchone()["cnt"]
print(f" Leo as domain_agent: {leo_domain} (deduplicated into Leo's total)")
# Step 2: Optionally fetch Forgejo API reviews
if not skip_forgejo:
token = get_forgejo_token()
if token:
# Get all merged PR numbers
merged = conn.execute(
"SELECT number FROM prs WHERE status='merged'"
).fetchall()
merged_numbers = [r["number"] for r in merged]
print(f"\nFetching Forgejo reviews for {len(merged_numbers)} merged PRs...")
forgejo_count = 0
for i, pr_num in enumerate(merged_numbers):
if i % 100 == 0 and i > 0:
print(f" ...{i}/{len(merged_numbers)}")
reviews = fetch_forgejo_reviews(pr_num, token)
for review in reviews:
if review.get("state") in ("APPROVED", "REQUEST_CHANGES"):
login = review["user"]["login"].lower()
if login not in reviewer_prs:
reviewer_prs[login] = set()
reviewer_prs[login].add(pr_num)
forgejo_count += 1
print(f" Forgejo API reviews found: {forgejo_count}")
else:
print("\nNo Forgejo token found, skipping API reviews")
else:
print("\nSkipping Forgejo API reviews (--skip-forgejo)")
# Step 3: Compute final counts
print("\n--- Final reviewer counts ---")
existing = {r["handle"]: r["reviewer_count"] for r in
conn.execute("SELECT handle, reviewer_count FROM contributors").fetchall()}
updates = {}
for reviewer, prs in sorted(reviewer_prs.items()):
count = len(prs)
current = existing.get(reviewer, None)
if current is not None:
updates[reviewer] = count
print(f" {reviewer}: {current} -> {count} ({count - current:+d})")
else:
print(f" {reviewer}: {count} reviews (no contributor record, skipping)")
# Step 4: Apply updates
if dry_run:
print(f"\n[DRY RUN] Would update {len(updates)} contributors")
else:
for handle, count in updates.items():
conn.execute(
"UPDATE contributors SET reviewer_count = ?, updated_at = datetime('now') WHERE handle = ?",
(count, handle)
)
conn.commit()
print(f"\nUpdated {len(updates)} contributors")
conn.close()
if __name__ == "__main__":
main()

View file

@@ -994,7 +994,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Rate limit check
if user and is_rate_limited(user.id):
await msg.reply_text("I'm processing other requests — try again in a few minutes.", quote=True)
await msg.reply_text("I'm processing other requests — try again in a few minutes.", do_quote=True)
return
logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])
@@ -1295,7 +1295,7 @@ IMPORTANT: Special tags you can append at the end of your response (after your m
tool_calls.append({"tool": f"kb:{t.get('tool', 'unknown')}", **{k: v for k, v in t.items() if k != "tool"}})
if not response:
await msg.reply_text("Processing error — I'll get back to you.", quote=True)
await msg.reply_text("Processing error — I'll get back to you.", do_quote=True)
return
# Parse LEARNING and RESEARCH tags before posting
@@ -1445,7 +1445,7 @@ IMPORTANT: Special tags you can append at the end of your response (after your m
# Post response (without tag lines)
# Telegram has a 4096 char limit — split long messages
if len(display_response) <= 4096:
await msg.reply_text(display_response, quote=True)
await msg.reply_text(display_response, do_quote=True)
else:
# Split on paragraph boundaries where possible
chunks = []

View file

@@ -0,0 +1,72 @@
#!/bin/bash
# Move telegram archives + apply pending learnings. Runs every 5 min via cron.
set -euo pipefail
STAGING="/opt/teleo-eval/telegram-archives"
MAIN="/opt/teleo-eval/workspaces/main"
LOCKFILE="/opt/teleo-eval/workspaces/.main-worktree.lock"
LEARNINGS="$MAIN/agents/rio/learnings.md"
PENDING="$STAGING/pending-learnings.jsonl"
# Check if there's anything to do
HAS_ARCHIVES=$(ls "$STAGING"/*.md 2>/dev/null | head -1) || true
HAS_LEARNINGS=""
[ -s "$PENDING" ] && HAS_LEARNINGS="yes"
[ -z "$HAS_ARCHIVES" ] && [ -z "$HAS_LEARNINGS" ] && exit 0
# Acquire worktree lock
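# flock on FD 9: holding the open descriptor holds the lock; it is released when the
# script exits (or at the explicit flock -u at the end)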
exec 9>"$LOCKFILE"
if ! flock -n 9; then
exit 0 # Lock held — skip this cycle
fi
CHANGED=0
# Move archive files
for f in "$STAGING"/*.md; do
[ -f "$f" ] || continue
mv "$f" "$MAIN/inbox/queue/"
CHANGED=$((CHANGED + 1))
done
# Apply pending learnings to learnings.md
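# Each line of pending-learnings.jsonl is a JSON object; a hypothetical example:
#   {"category": "communication", "correction": "Keep group-chat replies short"}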
if [ -s "$PENDING" ]; then
while IFS= read -r line; do
category=$(echo "$line" | python3 -c "import sys,json; print(json.load(sys.stdin).get('category','factual'))" 2>/dev/null || echo "factual")
correction=$(echo "$line" | python3 -c "import sys,json; print(json.load(sys.stdin).get('correction',''))" 2>/dev/null || echo "")
date_str=$(date +%Y-%m-%d)
[ -z "$correction" ] && continue
# Append to the right section
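# sed "/^## Heading/a TEXT" puts TEXT on a new line directly after the matching header,
# so the newest correction lands at the top of its section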
case "$category" in
communication)
# Find ## Communication Notes and append after it
sed -i "/^## Communication Notes/a - [$date_str] $correction" "$LEARNINGS"
;;
structured_data)
sed -i "/^## Structured Data/a - [$date_str] $correction" "$LEARNINGS"
;;
*)
sed -i "/^## Factual Corrections/a - [$date_str] $correction" "$LEARNINGS"
;;
esac
CHANGED=$((CHANGED + 1))
done < "$PENDING"
rm -f "$PENDING"
fi
if [ "$CHANGED" -gt 0 ]; then
cd "$MAIN"
git add -A inbox/queue/ agents/rio/learnings.md
git commit -m "rio: sync $CHANGED item(s) from telegram staging
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>" 2>/dev/null || true
for attempt in 1 2 3; do
git pull --rebase origin main 2>/dev/null || { git rebase --abort 2>/dev/null; continue; }
git push origin main 2>/dev/null && break
sleep 2
done
fi
flock -u 9

86
telegram/x-ingest.py Normal file
View file

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""Pull all tweets from specified X accounts and save as JSON archives."""
import json
import sys
import time
import urllib.request
API_KEY = "new1_280dafc879374475a86a64f6f388ac22"
BASE = "https://api.twitterapi.io/twitter/user/last_tweets"
OUT_DIR = "/opt/teleo-eval/x-archives"
ACCOUNTS = [
"m3taversal",
"Living_IP",
"teLEOhuman",
"aiCLAYno",
"futaRdIO_ai",
]
import os
os.makedirs(OUT_DIR, exist_ok=True)
def fetch_page(username, cursor=None):
url = f"{BASE}?userName={username}"
if cursor:
url += f"&cursor={cursor}"
req = urllib.request.Request(url, headers={"X-API-Key": API_KEY})
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
except Exception as e:
print(f" ERROR fetching {username}: {e}")
return None
def pull_all_tweets(username):
all_tweets = []
cursor = None
page = 0
while True:
page += 1
print(f" Page {page} (cursor: {'yes' if cursor else 'start'})...", end=" ")
data = fetch_page(username, cursor)
if not data or data.get("status") != "success":
print(f"FAILED: {data}")
break
tweets = data.get("data", {}).get("tweets", [])
next_cursor = data.get("data", {}).get("next_cursor")
# Deduplicate
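# The same tweet id can appear on more than one page; skipping repeats also lets an
# all-duplicate page serve as the stop condition below.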
seen_ids = {t["id"] for t in all_tweets}
new_tweets = [t for t in tweets if t["id"] not in seen_ids]
all_tweets.extend(new_tweets)
print(f"{len(new_tweets)} new tweets (total: {len(all_tweets)})")
if not next_cursor or not new_tweets:
break
cursor = next_cursor
time.sleep(1) # Rate limit courtesy
return all_tweets
for account in ACCOUNTS:
print(f"\n=== @{account} ===")
tweets = pull_all_tweets(account)
# Save raw
outfile = os.path.join(OUT_DIR, f"{account}-tweets.json")
with open(outfile, "w") as f:
json.dump({"account": account, "tweet_count": len(tweets), "tweets": tweets}, f, indent=2)
print(f" Saved {len(tweets)} tweets to {outfile}")
# Quick stats
originals = [t for t in tweets if not t.get("text", "").startswith("RT @") and not t.get("isReply")]
replies = [t for t in tweets if t.get("isReply")]
rts = [t for t in tweets if t.get("text", "").startswith("RT @")]
print(f" Breakdown: {len(originals)} original, {len(replies)} replies, {len(rts)} RTs")
if originals:
top = sorted(originals, key=lambda t: int(t.get("viewCount", 0) or 0), reverse=True)[:5]
print(f" Top 5 by views:")
for t in top:
text = t["text"][:80].replace("\n", " ")
print(f" {t.get('viewCount', '?')} views | {t.get('likeCount', '?')} likes | {text}...")
print("\n=== DONE ===")