Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge
Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors
Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes
Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
720 lines
26 KiB
Python
720 lines
26 KiB
Python
"""Health API — HTTP server on configurable port for monitoring."""
|
||
|
||
import html
import json
import logging
import statistics
from datetime import date, datetime, timezone

from aiohttp import web

from . import config, costs, db
from .analytics import get_snapshot_history, get_version_changes
from .claim_index import build_claim_index, write_claim_index
from .feedback import get_agent_error_patterns, get_all_agent_patterns
|
||
|
||
logger = logging.getLogger("pipeline.health")
|
||
|
||
|
||
def _conn(request):
|
||
"""Get the persistent readonly connection from app state."""
|
||
return request.app["db"]
|
||
|
||
|
||
async def handle_health(request):
    """GET /health — overall pipeline health.

    Aggregates circuit-breaker state, queue depths, per-domain merge
    queue, budget, and 24h metabolic rates into one JSON body.
    Responds 200 only when status is "healthy", 503 otherwise
    (monitoring-friendly).
    """
    conn = _conn(request)

    # Stage status from circuit breakers
    breakers = conn.execute(
        "SELECT name, state, failures, last_success_at, last_update FROM circuit_breakers"
    ).fetchall()

    # Queue depths
    sources_by_status = conn.execute("SELECT status, COUNT(*) as n FROM sources GROUP BY status").fetchall()
    prs_by_status = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()

    # Per-domain merge queue depth (Vida)
    merge_queue = conn.execute(
        "SELECT domain, COUNT(*) as n FROM prs WHERE status = 'approved' GROUP BY domain"
    ).fetchall()

    # Cost
    budget = costs.check_budget(conn)

    # Metabolic metrics (Vida)
    null_rate = conn.execute(
        """SELECT
        CAST(SUM(CASE WHEN status = 'null_result' THEN 1 ELSE 0 END) AS REAL) /
        NULLIF(COUNT(*), 0) as rate
        FROM sources
        WHERE updated_at > datetime('now', '-24 hours')
        AND status IN ('extracted', 'null_result', 'error')"""
    ).fetchone()

    approval_rate = conn.execute(
        """SELECT
        CAST(SUM(CASE WHEN domain_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
        NULLIF(COUNT(*), 0) as domain_rate,
        CAST(SUM(CASE WHEN leo_verdict = 'approve' THEN 1 ELSE 0 END) AS REAL) /
        NULLIF(COUNT(*), 0) as leo_rate
        FROM prs
        WHERE last_attempt > datetime('now', '-24 hours')
        AND domain_verdict != 'pending'"""
    ).fetchone()

    # Recent activity (last hour)
    recent = conn.execute(
        """SELECT stage, event, COUNT(*) as n
        FROM audit_log
        WHERE timestamp > datetime('now', '-1 hour')
        GROUP BY stage, event"""
    ).fetchall()

    body = {
        "status": "healthy",
        "breakers": {},
        "sources": {r["status"]: r["n"] for r in sources_by_status},
        "prs": {r["status"]: r["n"] for r in prs_by_status},
        "merge_queue_by_domain": {r["domain"]: r["n"] for r in merge_queue},
        "budget": budget,
        "metabolic": {
            "null_result_rate_24h": round(null_rate["rate"], 3)
            if null_rate and null_rate["rate"] is not None
            else None,
            "domain_approval_rate_24h": round(approval_rate["domain_rate"], 3)
            if approval_rate and approval_rate["domain_rate"] is not None
            else None,
            "leo_approval_rate_24h": round(approval_rate["leo_rate"], 3)
            if approval_rate and approval_rate["leo_rate"] is not None
            else None,
        },
        "recent_activity": [{"stage": r["stage"], "event": r["event"], "count": r["n"]} for r in recent],
    }

    # Stall thresholds per stage — hoisted out of the loop (was rebuilt on
    # every breaker row).  Unknown stage names fall back to 60s below.
    intervals = {
        "ingest": config.INGEST_INTERVAL,
        "validate": config.VALIDATE_INTERVAL,
        "evaluate": config.EVAL_INTERVAL,
        "merge": config.MERGE_INTERVAL,
    }

    # Breaker state + stall detection (Vida: last_success_at heartbeat)
    for r in breakers:
        breaker_info = {"state": r["state"], "failures": r["failures"]}
        if r["last_success_at"]:
            last = datetime.fromisoformat(r["last_success_at"])
            if last.tzinfo is None:
                # Naive DB timestamps are treated as UTC — assumed convention,
                # consistent with the /activity handler.
                last = last.replace(tzinfo=timezone.utc)
            age_s = (datetime.now(timezone.utc) - last).total_seconds()
            breaker_info["last_success_age_s"] = round(age_s)
            # Stall detection: no success in 2x the stage's interval
            threshold = intervals.get(r["name"], 60) * 2
            if age_s > threshold:
                breaker_info["stalled"] = True
        body["breakers"][r["name"]] = breaker_info

    # Overall status — later checks deliberately override earlier ones,
    # so the most severe condition wins (budget > open breaker > stall).
    if any(b.get("stalled") for b in body["breakers"].values()):
        body["status"] = "stalled"
    if any(b["state"] == "open" for b in body["breakers"].values()):
        body["status"] = "degraded"
    if not budget["ok"]:
        body["status"] = "budget_exhausted"
    # Rubber-stamp warning (Vida)
    if approval_rate and approval_rate["domain_rate"] is not None and approval_rate["domain_rate"] > 0.95:
        body["metabolic"]["warning"] = "domain approval rate >95% — possible rubber-stamping"

    status_code = 200 if body["status"] == "healthy" else 503
    return web.json_response(body, status=status_code)
||
async def handle_costs(request):
    """GET /costs — daily cost breakdown (optional ?date=YYYY-MM-DD)."""
    conn = _conn(request)
    default_day = date.today().isoformat()
    day = request.query.get("date", default_day)
    budget = costs.check_budget(conn)
    breakdown = costs.get_daily_breakdown(conn, day)
    return web.json_response({"date": day, "budget": budget, "breakdown": breakdown})
||
async def handle_sources(request):
    """GET /sources — source pipeline status (optional ?status= filter, 50 newest)."""
    conn = _conn(request)
    base = "SELECT path, status, priority, claims_count, transient_retries, substantive_retries, updated_at FROM sources"
    status_filter = request.query.get("status")
    if status_filter:
        rows = conn.execute(
            base + " WHERE status = ? ORDER BY updated_at DESC LIMIT 50",
            (status_filter,),
        ).fetchall()
    else:
        rows = conn.execute(base + " ORDER BY updated_at DESC LIMIT 50").fetchall()
    return web.json_response({"sources": [dict(r) for r in rows]})
||
async def handle_prs(request):
    """GET /prs — PR pipeline status (optional ?status= filter, 50 newest)."""
    conn = _conn(request)
    base = "SELECT number, source_path, status, domain, tier, leo_verdict, domain_verdict, transient_retries, substantive_retries FROM prs"
    status_filter = request.query.get("status")
    if status_filter:
        rows = conn.execute(
            base + " WHERE status = ? ORDER BY number DESC LIMIT 50",
            (status_filter,),
        ).fetchall()
    else:
        rows = conn.execute(base + " ORDER BY number DESC LIMIT 50").fetchall()
    return web.json_response({"prs": [dict(r) for r in rows]})
||
async def handle_breakers(request):
    """GET /breakers — dump every circuit-breaker row as JSON."""
    rows = _conn(request).execute("SELECT * FROM circuit_breakers").fetchall()
    return web.json_response({"breakers": [dict(r) for r in rows]})
||
async def handle_calibration(request):
    """GET /calibration — priority calibration analysis (Vida).

    Compares the first and last entries of each source's priority_log.
    Upgrades mean ingest under-prioritized — the learnable signal
    (Theseus); downgrades are expected (downstream has more context).
    """
    conn = _conn(request)
    # Find sources where eval disagreed with ingest priority
    rows = conn.execute(
        """SELECT path, priority, priority_log FROM sources
        WHERE json_array_length(priority_log) >= 2"""
    ).fetchall()

    # Ordinal scale for priority labels; unknown labels default to
    # "medium" (2).  Hoisted out of the loop, along with the redundant
    # per-row `import json` (json is imported at module level).
    levels = {"critical": 4, "high": 3, "medium": 2, "low": 1, "skip": 0}

    upgrades = []
    downgrades = []
    for r in rows:
        log = json.loads(r["priority_log"] or "[]")
        if len(log) < 2:
            continue
        first = log[0]["priority"]
        last = log[-1]["priority"]
        if levels.get(last, 2) > levels.get(first, 2):
            upgrades.append({"path": r["path"], "from": first, "to": last})
        elif levels.get(last, 2) < levels.get(first, 2):
            downgrades.append({"path": r["path"], "from": first, "to": last})

    return web.json_response(
        {
            "upgrades": upgrades[:20],
            "downgrades_count": len(downgrades),
            "upgrades_count": len(upgrades),
            "note": "Focus on upgrades — downgrades are expected (downstream has more context)",
        }
    )
||
async def handle_metrics(request):
    """GET /metrics — operational health metrics (Rhea).

    Leo's three numbers plus rejection reasons, time-to-merge, and fix
    effectiveness.  Data from audit_log + prs tables.  Curl-friendly JSON.
    """
    conn = _conn(request)

    # --- 1. Throughput: PRs processed in last hour ---
    hourly = conn.execute(
        """SELECT COUNT(*) as n FROM audit_log
        WHERE timestamp > datetime('now', '-1 hour')
        AND event IN ('approved', 'changes_requested', 'merged')"""
    ).fetchone()
    prs_per_hour = hourly["n"] if hourly else 0

    # --- 2. Approval rate (24h) ---
    verdict_row = conn.execute(
        """SELECT
        COUNT(*) as total,
        SUM(CASE WHEN status = 'merged' THEN 1 ELSE 0 END) as merged,
        SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END) as approved,
        SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed
        FROM prs
        WHERE last_attempt > datetime('now', '-24 hours')"""
    ).fetchone()
    total_24h = verdict_row["total"] if verdict_row else 0
    passed_24h = (verdict_row["merged"] or 0) + (verdict_row["approved"] or 0)
    approval_rate_24h = round(passed_24h / total_24h, 3) if total_24h > 0 else None

    # --- 3. Backlog depth by status ---
    backlog = {
        r["status"]: r["n"]
        for r in conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
    }

    # --- 4. Rejection reasons (top 10) ---
    issue_rows = conn.execute(
        """SELECT eval_issues FROM prs
        WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
        AND last_attempt > datetime('now', '-24 hours')"""
    ).fetchall()
    counts_by_tag: dict[str, int] = {}
    for issue_row in issue_rows:
        try:
            parsed = json.loads(issue_row["eval_issues"])
        except (json.JSONDecodeError, TypeError):
            continue
        for tag in parsed:
            if isinstance(tag, str):
                counts_by_tag[tag] = counts_by_tag.get(tag, 0) + 1
    rejection_reasons = sorted(counts_by_tag.items(), key=lambda kv: kv[1], reverse=True)[:10]

    # --- 5. Median time-to-merge (24h, in minutes) ---
    merge_rows = conn.execute(
        """SELECT
        (julianday(merged_at) - julianday(created_at)) * 24 * 60 as minutes
        FROM prs
        WHERE merged_at IS NOT NULL
        AND merged_at > datetime('now', '-24 hours')"""
    ).fetchall()
    durations = [m["minutes"] for m in merge_rows if m["minutes"] is not None and m["minutes"] > 0]
    median_ttm_minutes = round(statistics.median(durations), 1) if durations else None

    # --- 6. Fix cycle effectiveness ---
    fix_row = conn.execute(
        """SELECT
        COUNT(*) as attempted,
        SUM(CASE WHEN status IN ('merged', 'approved') THEN 1 ELSE 0 END) as succeeded
        FROM prs
        WHERE fix_attempts > 0"""
    ).fetchone()
    if fix_row:
        fix_attempted = fix_row["attempted"]
        fix_succeeded = fix_row["succeeded"] or 0
    else:
        fix_attempted = 0
        fix_succeeded = 0
    fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted > 0 else None

    # --- 7. Cost summary (today) ---
    budget = costs.check_budget(conn)

    return web.json_response({
        "throughput_prs_per_hour": prs_per_hour,
        "approval_rate_24h": approval_rate_24h,
        "backlog": backlog,
        "rejection_reasons_24h": [{"tag": t, "count": c} for t, c in rejection_reasons],
        "median_time_to_merge_minutes_24h": median_ttm_minutes,
        "fix_cycle": {
            "attempted": fix_attempted,
            "succeeded": fix_succeeded,
            "success_rate": fix_rate,
        },
        "cost_today": budget,
        "prs_with_merge_times_24h": len(durations),
        "prs_evaluated_24h": total_24h,
    })
||
async def handle_activity(request):
    """GET /activity — condensed PR activity feed (Rhea).

    Recent PR outcomes at a glance.  Optional ?hours=N (default 1);
    non-integer values return 400 instead of an unhandled 500, and
    values below 1 are clamped to 1.  Summary counts first, then
    individual PRs sorted most-recent-first.
    """
    conn = _conn(request)
    try:
        hours = int(request.query.get("hours", "1"))
    except ValueError:
        return web.json_response({"error": "hours must be an integer"}, status=400)
    if hours < 1:
        hours = 1

    # Recent PRs with activity
    rows = conn.execute(
        """SELECT number, source_path, domain, status, tier,
        domain_verdict, leo_verdict, eval_issues,
        eval_attempts, fix_attempts, last_attempt, merged_at
        FROM prs
        WHERE last_attempt > datetime('now', ? || ' hours')
        ORDER BY last_attempt DESC
        LIMIT 50""",
        (f"-{hours}",),
    ).fetchall()

    # Summary counts
    counts: dict[str, int] = {}
    prs = []
    for r in rows:
        s = r["status"]
        counts[s] = counts.get(s, 0) + 1

        # eval_issues is JSON text; tolerate NULL/garbage rows
        issues = []
        try:
            issues = json.loads(r["eval_issues"] or "[]")
        except (json.JSONDecodeError, TypeError):
            pass

        # Reviewer verdicts, e.g. "domain:approve, leo:reject"
        reviewers = []
        if r["domain_verdict"] and r["domain_verdict"] != "pending":
            reviewers.append(f"domain:{r['domain_verdict']}")
        if r["leo_verdict"] and r["leo_verdict"] != "pending":
            reviewers.append(f"leo:{r['leo_verdict']}")

        # Human-friendly age since last activity ("42m", "3h15m")
        age = ""
        if r["last_attempt"]:
            try:
                last = datetime.fromisoformat(r["last_attempt"])
                if last.tzinfo is None:
                    # Naive DB timestamps treated as UTC (matches /health)
                    last = last.replace(tzinfo=timezone.utc)
                delta = datetime.now(timezone.utc) - last
                mins = int(delta.total_seconds() / 60)
                age = f"{mins}m" if mins < 60 else f"{mins // 60}h{mins % 60}m"
            except ValueError:
                pass

        # Source name — basename without the .md extension
        source = r["source_path"] or ""
        if "/" in source:
            source = source.rsplit("/", 1)[-1]
        if source.endswith(".md"):
            source = source[:-3]

        prs.append({
            "pr": r["number"],
            "source": source,
            "domain": r["domain"],
            "status": r["status"],
            "tier": r["tier"],
            "issues": issues if issues else None,
            "reviewers": ", ".join(reviewers) if reviewers else None,
            "fixes": r["fix_attempts"] if r["fix_attempts"] else None,
            "age": age,
        })

    return web.json_response({
        "window": f"{hours}h",
        "summary": counts,
        "prs": prs,
    })
||
async def handle_contributor(request):
    """GET /contributor/{handle} — contributor profile.

    ?detail=card|summary|full selects payload size (~50/~200/all tokens).
    Returns 404 for an unknown handle.
    """
    conn = _conn(request)
    handle = request.match_info["handle"].lower().lstrip("@")
    detail = request.query.get("detail", "card")

    row = conn.execute(
        "SELECT * FROM contributors WHERE handle = ?", (handle,)
    ).fetchone()
    if row is None:
        return web.json_response({"error": f"contributor '{handle}' not found"}, status=404)

    # Card (~50 tokens)
    card = {
        "handle": row["handle"],
        "tier": row["tier"],
        "claims_merged": row["claims_merged"] or 0,
        "domains": json.loads(row["domains"]) if row["domains"] else [],
        "last_contribution": row["last_contribution"],
    }
    if detail == "card":
        return web.json_response(card)

    # Summary (~200 tokens) — role counts plus computed CI
    role_names = ("sourcer", "extractor", "challenger", "synthesizer", "reviewer")
    roles = {name: row[f"{name}_count"] or 0 for name in role_names}

    # CI = sum over roles of (count × configured weight)
    ci_components = {}
    ci_total = 0.0
    for name, count in roles.items():
        component = round(count * config.CONTRIBUTION_ROLE_WEIGHTS.get(name, 0), 2)
        ci_components[name] = component
        ci_total += component

    summary = {
        **card,
        "first_contribution": row["first_contribution"],
        "agent_id": row["agent_id"],
        "roles": roles,
        "challenges_survived": row["challenges_survived"] or 0,
        "highlights": json.loads(row["highlights"]) if row["highlights"] else [],
        "ci": {
            **ci_components,
            "total": round(ci_total, 2),
        },
    }
    if detail == "summary":
        return web.json_response(summary)

    # Full — everything else on the row
    return web.json_response({
        **summary,
        "identities": json.loads(row["identities"]) if row["identities"] else {},
        "display_name": row["display_name"],
        "created_at": row["created_at"],
        "updated_at": row["updated_at"],
    })
||
async def handle_contributors_list(request):
    """GET /contributors — all contributors with computed CI, sorted by claims merged."""
    conn = _conn(request)
    rows = conn.execute(
        "SELECT handle, tier, claims_merged, sourcer_count, extractor_count, "
        "challenger_count, synthesizer_count, reviewer_count, last_contribution "
        "FROM contributors ORDER BY claims_merged DESC"
    ).fetchall()

    listing = []
    for row in rows:
        # CI = sum over roles of (count × configured weight)
        total_ci = 0.0
        for role in ("sourcer", "extractor", "challenger", "synthesizer", "reviewer"):
            total_ci += (row[f"{role}_count"] or 0) * config.CONTRIBUTION_ROLE_WEIGHTS.get(role, 0)
        listing.append({
            "handle": row["handle"],
            "tier": row["tier"],
            "claims_merged": row["claims_merged"] or 0,
            "ci": round(total_ci, 2),
            "last_contribution": row["last_contribution"],
        })

    return web.json_response({"contributors": listing, "total": len(listing)})
||
async def handle_dashboard(request):
    """GET /dashboard — human-readable HTML metrics page.

    Mirrors the /metrics data in a dark-theme page that auto-refreshes
    every 30s.  Rejection tags come from evaluator output stored in the
    DB, so they are HTML-escaped before interpolation.
    """
    conn = _conn(request)

    now = datetime.now(timezone.utc)

    statuses = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
    status_map = {r["status"]: r["n"] for r in statuses}

    # Approval rate (24h)
    evaluated = conn.execute(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' AND event IN ('approved','changes_requested','domain_rejected') AND timestamp > datetime('now','-24 hours')"
    ).fetchone()["n"]
    approved = conn.execute(
        "SELECT COUNT(*) as n FROM audit_log WHERE stage='evaluate' AND event='approved' AND timestamp > datetime('now','-24 hours')"
    ).fetchone()["n"]
    approval_rate = round(approved / evaluated, 3) if evaluated else 0

    # Throughput (merged in the last hour)
    merged_1h = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE merged_at > datetime('now','-1 hour')"
    ).fetchone()["n"]

    # Rejection reasons
    reasons = conn.execute(
        """SELECT value as tag, COUNT(*) as cnt
        FROM audit_log, json_each(json_extract(detail, '$.issues'))
        WHERE stage='evaluate' AND event IN ('changes_requested','domain_rejected','tier05_rejected')
        AND timestamp > datetime('now','-24 hours')
        GROUP BY tag ORDER BY cnt DESC LIMIT 10"""
    ).fetchall()

    # Fix cycle
    fix_attempted = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE fix_attempts > 0"
    ).fetchone()["n"]
    fix_succeeded = conn.execute(
        "SELECT COUNT(*) as n FROM prs WHERE fix_attempts > 0 AND status = 'merged'"
    ).fetchone()["n"]
    fix_rate = round(fix_succeeded / fix_attempted, 3) if fix_attempted else 0

    # Build HTML
    status_rows = "".join(
        f"<tr><td>{s}</td><td><strong>{status_map.get(s, 0)}</strong></td></tr>"
        for s in ["open", "merged", "closed", "approved", "conflict", "reviewing"]
        if status_map.get(s, 0) > 0
    )

    # Escape tags — evaluator-produced text must not inject markup.
    reason_rows = "".join(
        f"<tr><td>{html.escape(str(r['tag']))}</td><td>{r['cnt']}</td></tr>"
        for r in reasons
    )

    # Fix-success card now has a green tier (was only red/yellow, so a
    # healthy fix rate could never render green like the approval card).
    fix_color = "green" if fix_rate >= 0.3 else "yellow" if fix_rate >= 0.1 else "red"

    page = f"""<!DOCTYPE html>
<html><head>
<meta charset="utf-8"><title>Pipeline Dashboard</title>
<meta http-equiv="refresh" content="30">
<style>
body {{ font-family: -apple-system, system-ui, sans-serif; max-width: 900px; margin: 40px auto; padding: 0 20px; background: #0d1117; color: #c9d1d9; }}
h1 {{ color: #58a6ff; margin-bottom: 5px; }}
.subtitle {{ color: #8b949e; margin-bottom: 30px; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 16px; margin-bottom: 30px; }}
.card {{ background: #161b22; border: 1px solid #30363d; border-radius: 8px; padding: 20px; }}
.card .label {{ color: #8b949e; font-size: 13px; text-transform: uppercase; letter-spacing: 0.5px; }}
.card .value {{ font-size: 32px; font-weight: 700; margin-top: 4px; }}
.green {{ color: #3fb950; }}
.yellow {{ color: #d29922; }}
.red {{ color: #f85149; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 10px; }}
th, td {{ text-align: left; padding: 8px 12px; border-bottom: 1px solid #21262d; }}
th {{ color: #8b949e; font-size: 12px; text-transform: uppercase; }}
h2 {{ color: #58a6ff; margin-top: 30px; font-size: 16px; }}
</style>
</head><body>
<h1>Teleo Pipeline</h1>
<p class="subtitle">Auto-refreshes every 30s · {now.strftime("%Y-%m-%d %H:%M UTC")}</p>

<div class="grid">
<div class="card">
<div class="label">Throughput</div>
<div class="value">{merged_1h}<span style="font-size:16px;color:#8b949e">/hr</span></div>
</div>
<div class="card">
<div class="label">Approval Rate (24h)</div>
<div class="value {'green' if approval_rate > 0.3 else 'yellow' if approval_rate > 0.15 else 'red'}">{approval_rate:.1%}</div>
</div>
<div class="card">
<div class="label">Open PRs</div>
<div class="value">{status_map.get('open', 0)}</div>
</div>
<div class="card">
<div class="label">Merged</div>
<div class="value green">{status_map.get('merged', 0)}</div>
</div>
<div class="card">
<div class="label">Fix Success</div>
<div class="value {fix_color}">{fix_rate:.1%}</div>
</div>
<div class="card">
<div class="label">Evaluated (24h)</div>
<div class="value">{evaluated}</div>
</div>
</div>

<h2>Backlog</h2>
<table>{status_rows}</table>

<h2>Top Rejection Reasons (24h)</h2>
<table><tr><th>Issue</th><th>Count</th></tr>{reason_rows}</table>

<p style="margin-top:40px;color:#484f58;font-size:12px;">
<a href="/metrics" style="color:#484f58;">JSON API</a> ·
<a href="/health" style="color:#484f58;">Health</a> ·
<a href="/activity" style="color:#484f58;">Activity</a>
</p>
</body></html>"""

    return web.Response(text=page, content_type="text/html")
||
async def handle_feedback(request):
    """GET /feedback/{agent} — per-agent rejection patterns with actionable guidance.

    Returns top rejection reasons, approval rate, and fix instructions.
    Agents query this to learn from their mistakes. (Epimetheus)

    Optional ?hours=N (default 168 = 7 days); a non-integer value now
    returns 400 instead of an unhandled 500.
    """
    conn = _conn(request)
    agent = request.match_info["agent"]
    try:
        hours = int(request.query.get("hours", "168"))
    except ValueError:
        return web.json_response({"error": "hours must be an integer"}, status=400)
    result = get_agent_error_patterns(conn, agent, hours)
    return web.json_response(result)
||
async def handle_feedback_all(request):
    """GET /feedback — rejection patterns for all agents.

    Optional ?hours=N (default 168 = 7 days); a non-integer value now
    returns 400 instead of an unhandled 500.
    """
    conn = _conn(request)
    try:
        hours = int(request.query.get("hours", "168"))
    except ValueError:
        return web.json_response({"error": "hours must be an integer"}, status=400)
    result = get_all_agent_patterns(conn, hours)
    return web.json_response(result)
||
async def handle_claim_index(request):
    """GET /claim-index — structured index of all KB claims.

    Returns full claim index with titles, domains, confidence, wiki links,
    incoming/outgoing counts, orphan ratio, cross-domain link count.
    Consumed by Argus (dashboard), Vida (vital signs).

    Also writes the index to disk for file-based consumers.  The disk
    write is best-effort: failures are logged, not raised, since the
    API response is the primary output.
    """
    repo_root = str(config.MAIN_WORKTREE)
    index = build_claim_index(repo_root)

    # Best-effort atomic disk write — log instead of silently swallowing,
    # so repeated failures are visible in the service log.
    try:
        write_claim_index(repo_root)
    except Exception:
        logger.exception("claim-index disk write failed (non-fatal)")

    return web.json_response(index)
||
async def handle_analytics_data(request):
    """GET /analytics/data — time-series snapshot history for Chart.js.

    Returns snapshot array + version change annotations.
    Optional ?days=N (default 7); a non-integer value now returns 400
    instead of an unhandled 500.
    """
    conn = _conn(request)
    try:
        days = int(request.query.get("days", "7"))
    except ValueError:
        return web.json_response({"error": "days must be an integer"}, status=400)
    snapshots = get_snapshot_history(conn, days)
    changes = get_version_changes(conn, days)

    return web.json_response({
        "snapshots": snapshots,
        "version_changes": changes,
        "days": days,
        "count": len(snapshots),
    })
||
def create_app() -> web.Application:
    """Create and wire up the health API application."""
    app = web.Application()
    # Single persistent readonly connection — one connection, no churn (Ganymede)
    app["db"] = db.get_connection(readonly=True)

    # All routes are GET; table-driven registration keeps them in one place.
    routes = [
        ("/", handle_dashboard),
        ("/health", handle_health),
        ("/costs", handle_costs),
        ("/sources", handle_sources),
        ("/prs", handle_prs),
        ("/breakers", handle_breakers),
        ("/metrics", handle_metrics),
        ("/dashboard", handle_dashboard),
        ("/activity", handle_activity),
        ("/calibration", handle_calibration),
        ("/contributor/{handle}", handle_contributor),
        ("/contributors", handle_contributors_list),
        ("/feedback/{agent}", handle_feedback),
        ("/feedback", handle_feedback_all),
        ("/analytics/data", handle_analytics_data),
        ("/claim-index", handle_claim_index),
    ]
    for path, handler in routes:
        app.router.add_get(path, handler)

    app.on_cleanup.append(_cleanup)
    return app
||
async def _cleanup(app):
    """Close the persistent readonly DB connection on app shutdown."""
    app["db"].close()
||
async def start_health_server(runner_ref: list):
    """Start the health HTTP server; appends the runner to runner_ref for shutdown."""
    runner = web.AppRunner(create_app())
    await runner.setup()
    # Bind to all interfaces — metrics are read-only, no sensitive data (Cory, Mar 14)
    site = web.TCPSite(runner, "0.0.0.0", config.HEALTH_PORT)
    await site.start()
    runner_ref.append(runner)
    logger.info("Health API listening on 0.0.0.0:%d", config.HEALTH_PORT)
||
async def stop_health_server(runner_ref: list):
    """Stop every runner previously registered by start_health_server."""
    for active_runner in runner_ref:
        await active_runner.cleanup()
    logger.info("Health API stopped")