#!/usr/bin/env python3
"""Pipeline health metrics — GitHub API → stage transitions → throughput → JSON.

Implements Vida's pipeline diagnostics spec (2026-03-11).
Runs on VPS, outputs to /opt/teleo-eval/metrics/pipeline-YYYY-MM-DD.json

Each open PR is placed in one queue:

* awaiting_review      -- opened (or re-pushed after a change request) and
                          waiting for a reviewer verdict;
* awaiting_remediation -- some reviewer's latest verdict is REQUEST_CHANGES
                          and no commit has landed since;
* awaiting_merge       -- approved with no outstanding change request.

Verdicts are read from machine-readable markers in PR comments (see
``VERDICT_RE``).
"""

import argparse
import json
import os
import re
import statistics
import sys
from datetime import datetime, timedelta, timezone
import urllib.request
import urllib.error

BASE_URL = "https://api.github.com/repos/living-ip/decision-engine"
TOKEN_FILE = "/opt/teleo-eval/secrets/github-admin-token"
# NOTE(review): this pattern was an empty string in the source, which makes
# every match.group(1) call raise IndexError ("no such group"). It most likely
# held an HTML-comment marker that was stripped somewhere along the way.
# Reconstructed as ``<!-- VERDICT:<reviewer>:<verdict> -->`` with the reviewer
# in group 1 and the verdict (APPROVE / REQUEST_CHANGES / ...) in group 2.
# TODO confirm against the marker the eval agents actually post.
VERDICT_RE = re.compile(r"<!--\s*VERDICT:([\w-]+):(\w+)\s*-->")
OUTPUT_DIR = "/opt/teleo-eval/metrics"
# Page size used for every list request (GitHub allows at most 100).
PER_PAGE = 50
# Stand-in for a missing timestamp when sorting (sorts before everything).
_MIN_TS = datetime.min.replace(tzinfo=timezone.utc)


def api_get(path, token, page=1, per_page=50):
    """GET one page from the GitHub REST API and return the decoded JSON.

    On an HTTP or network error the problem is logged to stderr and an empty
    value of the expected shape is returned: ``[]`` for list endpoints
    (pulls, comments, commits) and ``{}`` otherwise, so callers can treat a
    failed request like an empty result.
    """
    sep = "&" if "?" in path else "?"
    url = f"{BASE_URL}{path}{sep}page={page}&per_page={per_page}"
    req = urllib.request.Request(url, headers={
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    })
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError/OSError, so it must be caught first.
        print(f"API error {e.code}: {path}", file=sys.stderr)
    except OSError as e:
        # URLError (DNS, refused connection) and socket timeouts.
        print(f"API request failed ({e}): {path}", file=sys.stderr)
    is_list = any(key in path for key in ("pulls", "comments", "commits"))
    return [] if is_list else {}


def _get_paginated(path, token, per_page=PER_PAGE):
    """Fetch every page of a list endpoint and return the concatenated items.

    Stops at the first empty, non-list (error) or short page.
    """
    items = []
    page = 1
    while True:
        batch = api_get(path, token, page=page, per_page=per_page)
        if not isinstance(batch, list) or not batch:
            break
        items.extend(batch)
        if len(batch) < per_page:
            break
        page += 1
    return items


def get_all_pulls(token, state="all", since=None):
    """Paginate through PRs in *state*, newest-created first.

    When *since* is given, pagination stops after the first page whose oldest
    PR was created more than 7 days before *since*. This bounds the number of
    requests, but PRs older than that may be omitted.
    """
    all_prs = []
    page = 1
    while True:
        path = f"/pulls?state={state}&sort=created&direction=desc"
        prs = api_get(path, token, page=page, per_page=PER_PAGE)
        if not prs:
            break
        all_prs.extend(prs)
        if since:
            oldest = parse_ts(prs[-1].get("created_at", ""))
            if oldest and oldest < since - timedelta(days=7):
                break
        # A short page means there is nothing further to fetch.
        if len(prs) < PER_PAGE:
            break
        page += 1
    return all_prs


def get_recent_closed_pulls(token, since, max_pages=20):
    """Fetch closed PRs, most-recently-updated first, until *since* is passed.

    A PR merged or closed at or after *since* was also updated at or after
    *since*. So once a page ends with an ``updated_at`` older than *since*,
    no later page can contain a relevant PR. *max_pages* caps the number of
    requests as a safety net.
    """
    results = []
    for page in range(1, max_pages + 1):
        prs = api_get("/pulls?state=closed&sort=updated&direction=desc",
                      token, page=page, per_page=PER_PAGE)
        if not prs:
            break
        results.extend(prs)
        oldest = parse_ts(prs[-1].get("updated_at"))
        if oldest is None or oldest < since or len(prs) < PER_PAGE:
            break
    return results


def get_comments(token, pr_number):
    """Get all (issue-style) comments on a PR, following pagination."""
    return _get_paginated(f"/issues/{pr_number}/comments", token)


def get_commits(token, pr_number):
    """Get all commits on a PR, following pagination."""
    return _get_paginated(f"/pulls/{pr_number}/commits", token)


def parse_ts(ts_str):
    """Parse an ISO-8601 timestamp into an aware datetime, or return None.

    A trailing ``Z`` is accepted. Timestamps without an offset are assumed
    to be UTC, so all results can be compared with each other. Empty or
    unparseable input yields None.
    """
    if not ts_str:
        return None
    try:
        ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
    except (ValueError, TypeError, AttributeError):
        return None
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts


def hours_between(start, end):
    """Return hours from *start* to *end*, rounded to 2 places; None if either is missing."""
    if not start or not end:
        return None
    delta = (end - start).total_seconds() / 3600
    return round(delta, 2)


def _commit_ts(commit):
    """Timestamp of a PR commit.

    Uses a top-level ``created`` field when present (Forgejo-style).
    Otherwise falls back to ``commit.author.date`` (GitHub).
    """
    author = (commit.get("commit") or {}).get("author") or {}
    return parse_ts(commit.get("created", author.get("date")))


def _commit_times(commits):
    """Sorted list of all parseable commit timestamps."""
    return sorted(ts for ts in map(_commit_ts, commits) if ts is not None)


def _first_after(sorted_times, ts):
    """First element of *sorted_times* strictly later than *ts*, or None."""
    return next((t for t in sorted_times if t > ts), None)


def parse_verdicts(comments):
    """Extract verdict events from PR comments, sorted by comment time.

    Each event is a dict with ``reviewer`` and ``verdict`` (regex groups 1 and
    2 of ``VERDICT_RE``), ``ts`` (comment creation time, may be None) and
    ``user`` (comment author login, ``"unknown"`` if absent). Events without
    a timestamp sort first.
    """
    verdicts = []
    for c in comments:
        # GitHub sends null for deleted ("ghost") users and empty bodies.
        body = c.get("body") or ""
        ts = parse_ts(c.get("created_at"))
        login = (c.get("user") or {}).get("login", "unknown")
        for match in VERDICT_RE.finditer(body):
            verdicts.append({
                "reviewer": match.group(1),
                "verdict": match.group(2),
                "ts": ts,
                "user": login,
            })
    verdicts.sort(key=lambda v: v["ts"] or _MIN_TS)
    return verdicts


_AGENT_PREFIXES = frozenset({"rio", "clay", "theseus", "vida", "astra", "leo"})
_PIPELINE_PREFIXES = frozenset({"extract", "auto-fix"})


def detect_agent(pr):
    """Detect the proposing agent from the head branch prefix (``<prefix>/...``).

    Returns the agent name for known agent prefixes, ``"pipeline"`` for
    automated extract/auto-fix branches, and ``"unknown"`` otherwise.
    """
    ref = (pr.get("head") or {}).get("ref") or ""
    prefix, sep, _ = ref.partition("/")
    if not sep:
        return "unknown"
    if prefix in _AGENT_PREFIXES:
        return prefix
    if prefix in _PIPELINE_PREFIXES:
        return "pipeline"
    return "unknown"


def compute_stage_durations(pr, verdicts, commits):
    """Compute wait times (hours) for each pipeline stage of a PR.

    Returns a dict with these keys:

    * ``review_wait_hrs`` -- PR creation to first verdict.
    * ``remediation_cycles`` -- one entry per REQUEST_CHANGES: hours until
      the next commit, or None if no commit followed.
    * ``merge_wait_hrs`` -- last APPROVE to merge. It is ``"in_flight"``
      when approved but not merged.

    Values are None where a stage has not been reached.
    """
    created = parse_ts(pr.get("created_at"))
    merged = parse_ts(pr.get("merged_at"))
    result = {
        "review_wait_hrs": None,
        "remediation_cycles": [],
        "merge_wait_hrs": None,
    }
    if not verdicts:
        # No review yet: still in stage 1.
        return result

    # Stage 1: created -> first verdict.
    result["review_wait_hrs"] = hours_between(created, verdicts[0]["ts"])

    # Stage 2: REQUEST_CHANGES -> next push (may repeat).
    commit_times = _commit_times(commits)
    for v in verdicts:
        if v["verdict"] != "REQUEST_CHANGES" or not v["ts"]:
            continue
        next_push = _first_after(commit_times, v["ts"])
        result["remediation_cycles"].append(hours_between(v["ts"], next_push))

    # Stage 3: last APPROVE -> merged.
    approvals = [v for v in verdicts if v["verdict"] == "APPROVE"]
    if approvals:
        last_approve = approvals[-1]
        if merged:
            result["merge_wait_hrs"] = hours_between(last_approve["ts"], merged)
        else:
            result["merge_wait_hrs"] = "in_flight"
    return result


def classify_pr_stage(pr, verdicts, commits):
    """Classify which queue a PR is currently in.

    Returns one of ``merged``, ``closed``, ``awaiting_review``,
    ``awaiting_remediation`` or ``awaiting_merge``. Only each reviewer's
    latest verdict counts: a reviewer who requested changes and later
    approved no longer blocks the PR.
    """
    if pr.get("merged_at"):
        return "merged"
    if pr.get("state") == "closed":
        return "closed"
    if not verdicts:
        return "awaiting_review"

    # Verdicts are time-sorted, so the last one seen per reviewer is current.
    latest_by_reviewer = {}
    for v in verdicts:
        latest_by_reviewer[v["reviewer"]] = v
    current = list(latest_by_reviewer.values())

    outstanding_rc = [v for v in current if v["verdict"] == "REQUEST_CHANGES"]
    if outstanding_rc:
        rc_times = [v["ts"] for v in outstanding_rc if v["ts"]]
        if not rc_times:
            # Cannot tell whether a later push addressed it; assume not.
            return "awaiting_remediation"
        latest_rc_ts = max(rc_times)
        # A push after the newest outstanding request puts it back in review.
        if any(ct > latest_rc_ts for ct in _commit_times(commits)):
            return "awaiting_review"
        return "awaiting_remediation"

    if any(v["verdict"] == "APPROVE" for v in current):
        # Approved and nothing outstanding: awaiting merge.
        return "awaiting_merge"
    return "awaiting_review"


def percentile(data, p):
    """Return the *p*-th percentile (0-100) of *data*, or None if empty.

    Uses linear interpolation between the two closest ranks, rounded to 2
    decimal places.
    """
    if not data:
        return None
    sorted_data = sorted(data)
    k = (len(sorted_data) - 1) * (p / 100)
    f = int(k)
    c = f + 1 if f + 1 < len(sorted_data) else f
    d = k - f
    return round(sorted_data[f] + d * (sorted_data[c] - sorted_data[f]), 2)


def compute_throughput(prs_with_data, window_start, window_end):
    """Compute per-hour throughput rates within [window_start, window_end].

    Each item in *prs_with_data* is a dict with ``pr``, ``verdicts`` and
    ``commits`` keys. The rates are:

    * extraction -- PRs created in the window;
    * eval -- verdicts posted in the window;
    * feedback -- REQUEST_CHANGES whose first follow-up commit landed in the
      window;
    * merge -- PRs merged in the window.

    The divisor is the window length in hours, clamped to at least 1.
    """
    hours = max((window_end - window_start).total_seconds() / 3600, 1)

    def in_window(ts):
        return ts is not None and window_start <= ts <= window_end

    extraction_count = eval_count = feedback_count = merge_count = 0
    for item in prs_with_data:
        pr = item["pr"]
        verdicts = item["verdicts"]

        if in_window(parse_ts(pr.get("created_at"))):
            extraction_count += 1
        if in_window(parse_ts(pr.get("merged_at"))):
            merge_count += 1
        eval_count += sum(1 for v in verdicts if in_window(v["ts"]))

        # Feedback pushes: the first commit answering each change request,
        # counted when that commit falls inside the window. This matches the
        # remediation definition in compute_stage_durations.
        commit_times = _commit_times(item["commits"])
        for v in verdicts:
            if v["verdict"] != "REQUEST_CHANGES" or not v["ts"]:
                continue
            if in_window(_first_after(commit_times, v["ts"])):
                feedback_count += 1

    ext_rate = round(extraction_count / hours, 2)
    eval_rate = round(eval_count / hours, 2)
    fb_rate = round(feedback_count / hours, 2)
    merge_rate = round(merge_count / hours, 2)

    # Bottleneck: walk the chain and report the first stage that is slower
    # than its upstream. Eval is checked first, so ext > 0 with no evals at
    # all is reported as "eval" rather than as a downstream stage.
    if ext_rate > eval_rate:
        bottleneck = "eval"
    elif eval_rate > fb_rate and fb_rate > 0:
        bottleneck = "feedback"
    elif fb_rate > merge_rate and merge_rate > 0:
        bottleneck = "merge"
    else:
        bottleneck = "none"

    return {
        "extraction_per_hr": ext_rate,
        "eval_per_hr": eval_rate,
        "feedback_per_hr": fb_rate,
        "merge_per_hr": merge_rate,
        "bottleneck": bottleneck,
        "queue_growth_rate": round(ext_rate - merge_rate, 2),
    }


def _ended_since(pr, since):
    """True if *pr* was merged or closed at or after *since*."""
    for key in ("merged_at", "closed_at"):
        ts = parse_ts(pr.get(key))
        if ts and ts >= since:
            return True
    return False


def _wait_stats(values):
    """Median / p90 / max (hours, 2 dp) and count for a list of wait times."""
    return {
        "median_hrs": round(statistics.median(values), 2) if values else None,
        "p90_hrs": percentile(values, 90),
        "max_hrs": round(max(values), 2) if values else None,
        "n": len(values),
    }


def _print_summary(now, hours, throughput, wait_times, queue, output_path):
    """Print the human-readable health summary to stdout."""
    print(f"\n{'='*60}")
    print(f" PIPELINE HEALTH — {now.strftime('%Y-%m-%d %H:%M')} UTC")
    print(f" Window: {hours}h")
    print(f"{'='*60}")
    print(" Throughput (per hour):")
    print(f" Extraction: {throughput['extraction_per_hr']}")
    print(f" Eval: {throughput['eval_per_hr']}")
    print(f" Feedback: {throughput['feedback_per_hr']}")
    print(f" Merge: {throughput['merge_per_hr']}")
    print(f" Bottleneck: {throughput['bottleneck']}")
    print(f" Queue growth: {throughput['queue_growth_rate']}/hr")
    print(" Wait times (hours):")
    for label, key in (("Review", "review"), ("Remediation", "remediation"), ("Merge", "merge")):
        w = wait_times[key]
        print(f" {label}: median={w['median_hrs']} p90={w['p90_hrs']} n={w['n']}")
    print(" Queue:")
    print(f" Awaiting review: {queue['awaiting_review']}")
    print(f" Awaiting remediation: {queue['awaiting_remediation']}")
    print(f" Awaiting merge: {queue['awaiting_merge']}")
    print(f" Total open: {queue['total_open']}")
    print(f"{'='*60}")
    print(f" Output: {output_path}")


def main():
    """Collect PR data, compute pipeline metrics, write JSON and print a summary.

    Returns the process exit code (0 on success).
    """
    parser = argparse.ArgumentParser(description="Pipeline health metrics")
    parser.add_argument("--hours", type=int, default=24,
                        help="Time window in hours (default: 24)")
    parser.add_argument("--output",
                        help="Output file path (default: /opt/teleo-eval/metrics/pipeline-YYYY-MM-DD.json)")
    parser.add_argument("--max-prs", type=int, default=200,
                        help="Max PRs to analyze")
    args = parser.parse_args()

    with open(TOKEN_FILE) as token_file:
        token = token_file.read().strip()

    now = datetime.now(timezone.utc)
    window_start = now - timedelta(hours=args.hours)
    window_end = now

    print(f"Pipeline health check: {window_start.isoformat()} → {window_end.isoformat()}")

    print("Fetching open PRs...", file=sys.stderr)
    open_prs = get_all_pulls(token, state="open", since=window_start)
    print(f" {len(open_prs)} open PRs fetched", file=sys.stderr)

    # Closed/merged PRs, paged by most-recent update until past the window.
    print("Fetching recently closed PRs...", file=sys.stderr)
    closed_prs = get_recent_closed_pulls(token, since=window_start)
    recent_closed = [p for p in closed_prs if _ended_since(p, window_start)]
    print(f" {len(recent_closed)} recently closed/merged PRs", file=sys.stderr)

    # All open + recently closed, deduplicated by PR number.
    analyze_prs = list({p["number"]: p for p in open_prs + recent_closed}.values())
    analyze_prs = analyze_prs[:args.max_prs]
    print(f" Analyzing {len(analyze_prs)} PRs ({len(open_prs)} open, {len(recent_closed)} recently merged/closed)", file=sys.stderr)

    # Fetch comments + commits for each PR.
    prs_with_data = []
    for i, pr in enumerate(analyze_prs, start=1):
        if i % 20 == 0:
            print(f" Processing PR {i}/{len(analyze_prs)}...", file=sys.stderr)
        comments = get_comments(token, pr["number"])
        commits = get_commits(token, pr["number"])
        verdicts = parse_verdicts(comments)
        prs_with_data.append({
            "pr": pr,
            "verdicts": verdicts,
            "commits": commits,
            "stage": classify_pr_stage(pr, verdicts, commits),
            "durations": compute_stage_durations(pr, verdicts, commits),
        })

    throughput = compute_throughput(prs_with_data, window_start, window_end)

    all_durations = [d["durations"] for d in prs_with_data]
    review_waits = [x["review_wait_hrs"] for x in all_durations
                    if x["review_wait_hrs"] is not None]
    remediation_waits = [c for x in all_durations
                         for c in x["remediation_cycles"] if c is not None]
    merge_waits = [x["merge_wait_hrs"] for x in all_durations
                   if x["merge_wait_hrs"] is not None
                   and x["merge_wait_hrs"] != "in_flight"]
    wait_times = {
        "review": _wait_stats(review_waits),
        "remediation": _wait_stats(remediation_waits),
        "merge": _wait_stats(merge_waits),
    }

    # Queue snapshot (stage counts cover analyzed PRs only).
    stages = [d["stage"] for d in prs_with_data]
    queue = {
        "awaiting_review": stages.count("awaiting_review"),
        "awaiting_remediation": stages.count("awaiting_remediation"),
        "awaiting_merge": stages.count("awaiting_merge"),
        "total_open": len(open_prs),
    }

    # Per-PR detail for open PRs.
    per_pr = []
    for d in prs_with_data:
        pr = d["pr"]
        if pr.get("state") != "open":
            continue
        per_pr.append({
            "number": pr["number"],
            "title": (pr.get("title") or "")[:100],
            "branch": (pr.get("head") or {}).get("ref", ""),
            "agent": detect_agent(pr),
            "current_stage": d["stage"],
            "created_at": pr.get("created_at"),
            "stage_durations": d["durations"],
        })
    # Longest review wait first; PRs with no review yet sort last.
    per_pr.sort(key=lambda p: p["stage_durations"].get("review_wait_hrs") or 0,
                reverse=True)

    output = {
        "generated": now.isoformat(),
        "window": {
            "start": window_start.isoformat(),
            "end": window_end.isoformat(),
            "hours": args.hours,
        },
        "throughput": throughput,
        "wait_times": wait_times,
        "queue_snapshot": queue,
        "per_pr": per_pr,
    }

    output_path = args.output or os.path.join(
        OUTPUT_DIR, f"pipeline-{now.strftime('%Y-%m-%d')}.json")
    # Create the directory of the path actually written, not always OUTPUT_DIR.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(output, f, indent=2, default=str)

    _print_summary(now, args.hours, throughput, wait_times, queue, output_path)
    return 0


if __name__ == "__main__":
    sys.exit(main())