teleo-infrastructure/lib/analytics.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

210 lines
7.9 KiB
Python

"""Analytics module — time-series metrics snapshots + chart data endpoints.
Records pipeline metrics every 15 minutes. Serves historical data for
Chart.js dashboard. Tracks source origin (agent/human/scraper) for
pipeline funnel visualization.
Priority 1 from Cory via Ganymede.
Epimetheus owns this module.
"""
import json
import logging
import re
from datetime import datetime, timezone
from . import config, db
logger = logging.getLogger("pipeline.analytics")
# ─── Snapshot recording ────────────────────────────────────────────────────
def record_snapshot(conn) -> dict:
    """Record a metrics snapshot. Called every 15 minutes by the pipeline daemon.

    Computes hourly throughput, 24h approval/eval/fix statistics,
    rejection-tag counts, and source-origin counts; inserts one row into
    ``metrics_snapshots``; and returns the snapshot dict for
    logging/debugging.

    Args:
        conn: DB connection whose rows support name-based indexing
            (e.g. ``sqlite3.Row``).

    Returns:
        The snapshot dict that was written.
    """
    # Throughput: verdict events in the last hour.
    throughput = conn.execute(
        """SELECT COUNT(*) as n FROM audit_log
        WHERE timestamp > datetime('now', '-1 hour')
        AND event IN ('approved', 'changes_requested', 'merged')"""
    ).fetchone()
    # Current PR counts per status.
    statuses = conn.execute("SELECT status, COUNT(*) as n FROM prs GROUP BY status").fetchall()
    status_map = {r["status"]: r["n"] for r in statuses}
    # Approval rate (24h); None when nothing was attempted in the window.
    verdicts = conn.execute(
        """SELECT COUNT(*) as total,
        SUM(CASE WHEN status IN ('merged', 'approved') THEN 1 ELSE 0 END) as passed
        FROM prs WHERE last_attempt > datetime('now', '-24 hours')"""
    ).fetchone()
    total = verdicts["total"] or 0
    passed = verdicts["passed"] or 0
    approval_rate = round(passed / total, 3) if total > 0 else None
    # PRs that received a non-pending domain verdict in the last 24h.
    evaluated = conn.execute(
        """SELECT COUNT(*) as n FROM prs
        WHERE last_attempt > datetime('now', '-24 hours')
        AND domain_verdict != 'pending'"""
    ).fetchone()
    # Fix success rate: of PRs the fixer touched, how many ended up passing.
    fix_stats = conn.execute(
        """SELECT COUNT(*) as attempted,
        SUM(CASE WHEN status IN ('merged', 'approved') THEN 1 ELSE 0 END) as succeeded
        FROM prs WHERE fix_attempts > 0"""
    ).fetchone()
    fix_rate = round((fix_stats["succeeded"] or 0) / fix_stats["attempted"], 3) if fix_stats["attempted"] else None
    # Rejection reasons (24h), tallied from the JSON eval_issues column.
    issue_rows = conn.execute(
        """SELECT eval_issues FROM prs
        WHERE eval_issues IS NOT NULL AND eval_issues != '[]'
        AND last_attempt > datetime('now', '-24 hours')"""
    ).fetchall()
    tag_counts = _tally_rejection_tags(issue_rows)
    # Source origin counts (24h) — agent vs human vs scraper.
    source_origins = _count_source_origins(conn)
    snapshot = {
        "throughput_1h": throughput["n"] if throughput else 0,
        "approval_rate": approval_rate,
        "open_prs": status_map.get("open", 0),
        "merged_total": status_map.get("merged", 0),
        "closed_total": status_map.get("closed", 0),
        "conflict_total": status_map.get("conflict", 0),
        "evaluated_24h": evaluated["n"] if evaluated else 0,
        "fix_success_rate": fix_rate,
        "rejection_broken_wiki_links": tag_counts.get("broken_wiki_links", 0),
        "rejection_frontmatter_schema": tag_counts.get("frontmatter_schema", 0),
        "rejection_near_duplicate": tag_counts.get("near_duplicate", 0),
        "rejection_confidence": tag_counts.get("confidence_miscalibration", 0),
        # Anything not in the four named buckets is folded into "other".
        "rejection_other": sum(v for k, v in tag_counts.items()
                               if k not in ("broken_wiki_links", "frontmatter_schema",
                                            "near_duplicate", "confidence_miscalibration")),
        "extraction_model": config.EXTRACT_MODEL,
        "eval_domain_model": config.EVAL_DOMAIN_MODEL,
        "eval_leo_model": config.EVAL_LEO_STANDARD_MODEL,
        "prompt_version": config.PROMPT_VERSION,
        "pipeline_version": config.PIPELINE_VERSION,
        "source_origin_agent": source_origins.get("agent", 0),
        "source_origin_human": source_origins.get("human", 0),
        "source_origin_scraper": source_origins.get("scraper", 0),
    }
    # Persist the snapshot; named placeholders match the dict keys exactly.
    conn.execute(
        """INSERT INTO metrics_snapshots (
        throughput_1h, approval_rate, open_prs, merged_total, closed_total,
        conflict_total, evaluated_24h, fix_success_rate,
        rejection_broken_wiki_links, rejection_frontmatter_schema,
        rejection_near_duplicate, rejection_confidence, rejection_other,
        extraction_model, eval_domain_model, eval_leo_model,
        prompt_version, pipeline_version,
        source_origin_agent, source_origin_human, source_origin_scraper
        ) VALUES (
        :throughput_1h, :approval_rate, :open_prs, :merged_total, :closed_total,
        :conflict_total, :evaluated_24h, :fix_success_rate,
        :rejection_broken_wiki_links, :rejection_frontmatter_schema,
        :rejection_near_duplicate, :rejection_confidence, :rejection_other,
        :extraction_model, :eval_domain_model, :eval_leo_model,
        :prompt_version, :pipeline_version,
        :source_origin_agent, :source_origin_human, :source_origin_scraper
        )""",
        snapshot,
    )
    logger.debug("Recorded metrics snapshot: approval=%.1f%%, throughput=%d/h",
                 (approval_rate or 0) * 100, snapshot["throughput_1h"])
    return snapshot


def _tally_rejection_tags(issue_rows) -> dict[str, int]:
    """Count string tags across rows' ``eval_issues`` JSON arrays.

    Malformed JSON is skipped. Non-list payloads are also skipped: the
    original code iterated whatever ``json.loads`` returned, so a bare
    JSON string would have been counted character by character.
    """
    tag_counts: dict[str, int] = {}
    for row in issue_rows:
        try:
            tags = json.loads(row["eval_issues"])
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(tags, list):
            continue  # guard against JSON strings/objects masquerading as tag lists
        for tag in tags:
            if isinstance(tag, str):
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
    return tag_counts
def _count_source_origins(conn) -> dict[str, int]:
"""Count source origins from recent PRs. Returns {agent: N, human: N, scraper: N}."""
counts = {"agent": 0, "human": 0, "scraper": 0}
rows = conn.execute(
"""SELECT origin, COUNT(*) as n FROM prs
WHERE created_at > datetime('now', '-24 hours')
GROUP BY origin"""
).fetchall()
for row in rows:
origin = row["origin"] or "pipeline"
if origin == "human":
counts["human"] += row["n"]
elif origin == "pipeline":
counts["agent"] += row["n"]
else:
counts["scraper"] += row["n"]
return counts
# ─── Chart data endpoints ─────────────────────────────────────────────────
def get_snapshot_history(conn, days: int = 7) -> list[dict]:
    """Return the last *days* days of snapshots, oldest first, as plain dicts."""
    window = f"-{days}"
    cursor = conn.execute(
        """SELECT * FROM metrics_snapshots
        WHERE ts > datetime('now', ? || ' days')
        ORDER BY ts ASC""",
        (window,),
    )
    return list(map(dict, cursor.fetchall()))
def get_version_changes(conn, days: int = 30) -> list[dict]:
    """Return points where prompt_version or pipeline_version changed.

    Used for chart annotations — vertical lines marking deployments.
    The first row only seeds the trackers; a change is reported from the
    second differing row onward.
    """
    cursor = conn.execute(
        """SELECT ts, prompt_version, pipeline_version
        FROM metrics_snapshots
        WHERE ts > datetime('now', ? || ' days')
        ORDER BY ts ASC""",
        (f"-{days}",),
    )
    annotations: list[dict] = []
    previous = {"prompt": None, "pipeline": None}
    for row in cursor.fetchall():
        # Check prompt first, then pipeline, preserving append order per row.
        for kind, column in (("prompt", "prompt_version"),
                             ("pipeline", "pipeline_version")):
            current = row[column]
            last = previous[kind]
            if last is not None and current != last:
                annotations.append({
                    "ts": row["ts"],
                    "type": kind,
                    "from": last,
                    "to": current,
                })
            previous[kind] = current
    return annotations