teleo-infrastructure/lib/evaluate.py
m3taversal f4dc6b39ce leo: warn on NULL source_path in _terminate_pr (Ganymede nit)
If source_path is NULL, the source requeue silently matches nothing.
Log a warning so we catch orphaned terminations in monitoring.

Pentagon-Agent: Leo <294C3CA1-0205-4668-82FA-B984D54F48AD>
2026-03-13 17:17:30 +00:00

601 lines
25 KiB
Python

"""Evaluate stage — PR lifecycle orchestration.
Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last.
Sonnet (domain review) filters before Opus (Leo review) to maximize value per
scarce Opus call.
Flow per PR:
1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT
2. Domain review → Sonnet (Claude Max, overflow: OpenRouter GPT-4o)
3. Leo review → Opus (Claude Max, overflow: queue) — skipped for LIGHT
4. DEEP cross-family → GPT-4o (OpenRouter) — only if domain + Leo both approve
5. Post reviews, submit formal Forgejo approvals, update SQLite
6. If both approve → status = 'approved' (merge module picks it up)
Design reviewed by Ganymede, Rhea, Vida, Theseus.
LLM transport and prompts extracted to lib/llm.py (Phase 3c).
"""
import json
import logging
import re
from datetime import datetime, timezone
from . import config, db
from .domains import agent_for_domain, detect_domain_from_diff
from .forgejo import api as forgejo_api
from .forgejo import get_agent_token, get_pr_diff, repo_path
from .llm import run_domain_review, run_leo_review, triage_pr
logger = logging.getLogger("pipeline.evaluate")
# ─── Diff helpers ──────────────────────────────────────────────────────────
def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
Strips: inbox/archive/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE)
skip_patterns = [r"^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/"]
core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"}
claim_sections = []
entity_sections = []
for section in sections:
if not section.strip():
continue
if any(re.match(p, section) for p in skip_patterns):
continue
entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section)
if entity_match and entity_match.group(1) not in core_domains:
entity_sections.append(section)
continue
claim_sections.append(section)
return "".join(claim_sections), "".join(entity_sections)
def _extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git")
)
def _is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if "agents/" in line and "/musings/" in line:
has_musings = True
else:
has_other = True
return has_musings and not has_other
# ─── Verdict parsing ──────────────────────────────────────────────────────
def _parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
return "approve"
elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
return "request_changes"
else:
logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
return "request_changes"
def _parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review."""
match = re.search(r"<!-- ISSUES: ([^>]+) -->", review_text)
if not match:
return []
return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
async def _post_formal_approvals(pr_number: int, pr_author: str):
    """Submit formal Forgejo APPROVED reviews from 2 agents (not the PR author).

    Walks a fixed agent roster, skipping the author, until two reviews have
    been accepted by Forgejo. Agents without a token, or whose POST fails
    (forgejo_api returns None), are skipped.

    FIX: if fewer than 2 approvals land, warn — formal approvals are
    required for merge, so an under-count previously failed silently.
    """
    approvals = 0
    for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
        if agent_name == pr_author:
            continue
        if approvals >= 2:
            break
        token = get_agent_token(agent_name)
        if not token:
            continue  # no credentials for this agent — try the next one
        result = await forgejo_api(
            "POST",
            repo_path(f"pulls/{pr_number}/reviews"),
            {"body": "Approved.", "event": "APPROVED"},
            token=token,
        )
        if result is not None:
            approvals += 1
            logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
    if approvals < 2:
        # Merge requires 2 formal approvals — surface the shortfall in monitoring.
        logger.warning(
            "PR #%d: only %d/2 formal approvals submitted — merge may be blocked",
            pr_number, approvals,
        )
# ─── Retry budget helpers ─────────────────────────────────────────────────
async def _terminate_pr(conn, pr_number: int, reason: str):
    """Terminal state: close the PR on Forgejo and requeue its source.

    Posts an explanatory comment, closes the PR via PATCH, marks the prs
    row 'closed' (with *reason* stored in last_error), and flips the linked
    source row to 'needs_reextraction' so extraction can retry with the
    review feedback attached. Warns when the source UPDATE matches nothing
    (NULL source_path) so orphaned terminations show up in monitoring.
    """
    # Close PR on Forgejo with explanation
    await forgejo_api(
        "POST",
        repo_path(f"issues/{pr_number}/comments"),
        {"body": f"**Closed by eval pipeline** — {reason}.\n\n"
        f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
        f"Source material will be re-queued for extraction with review feedback attached.\n\n"
        f"See eval_issues for specific problems."},
    )
    await forgejo_api(
        "PATCH",
        repo_path(f"pulls/{pr_number}"),
        {"state": "closed"},
    )
    # Update PR status
    conn.execute(
        "UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
        (reason, pr_number),
    )
    # Tag source for re-extraction with feedback
    cursor = conn.execute(
        """UPDATE sources SET status = 'needs_reextraction',
        updated_at = datetime('now')
        WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
        (pr_number,),
    )
    # rowcount == 0 means the path subquery matched no source row
    # (NULL or dangling source_path) — the requeue silently did nothing.
    if cursor.rowcount == 0:
        logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
    db.audit(conn, "evaluate", "pr_terminated", json.dumps({
        "pr": pr_number, "reason": reason,
    }))
    logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
def _classify_issues(issues: list[str]) -> str:
    """Bucket issue tags as 'mechanical', 'substantive', 'mixed', or 'unknown'.

    Tags are matched against config.MECHANICAL_ISSUE_TAGS and
    config.SUBSTANTIVE_ISSUE_TAGS; tags in neither set yield 'unknown'.
    """
    if not issues:
        return "unknown"
    tags = set(issues)
    has_mechanical = bool(tags & config.MECHANICAL_ISSUE_TAGS)
    has_substantive = bool(tags & config.SUBSTANTIVE_ISSUE_TAGS)
    if has_mechanical and has_substantive:
        return "mixed"
    if has_substantive:
        return "substantive"
    if has_mechanical:
        return "mechanical"
    return "unknown"  # tags not in either set
async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
    """Disposition logic for rejected PRs on attempt 2+.

    Attempt 1: normal — back to open, wait for fix.
    Attempt 2: check issue classification.
      - Mechanical only: keep open for one more attempt (auto-fix future).
      - Substantive or mixed: close PR, requeue source.
    Attempt 3+ (>= config.MAX_EVAL_ATTEMPTS): terminal.
    """
    if eval_attempts < 2:
        # First attempt: plain retry, no disposition needed.
        return

    if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
        # Budget exhausted — terminal regardless of issue type.
        await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
        return

    classification = _classify_issues(all_issues)
    if classification == "mechanical":
        # Mechanical-only problems are worth one more pass; a future
        # auto-fix module will push fixes here.
        logger.info(
            "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
            pr_number, eval_attempts, all_issues,
        )
        db.audit(conn, "evaluate", "mechanical_retry", json.dumps({
            "pr": pr_number, "attempt": eval_attempts, "issues": all_issues,
        }))
        return

    # Substantive, mixed, or unknown — close the PR and requeue its source.
    logger.info(
        "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
        pr_number, eval_attempts, classification, all_issues,
    )
    await _terminate_pr(conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}")
# ─── Single PR evaluation ─────────────────────────────────────────────────
async def evaluate_pr(conn, pr_number: int, tier: str | None = None) -> dict:
    """Evaluate a single PR. Returns a result dict (keys vary by outcome).

    Flow: budget check → atomic claim → diff fetch → musings bypass →
    triage → domain review (Sonnet) → Leo review (Opus/GPT-4o, skipped for
    LIGHT) → final verdict + disposition + cost recording.

    Args:
        conn: SQLite connection (rows accessed by column name).
        pr_number: Forgejo PR number.
        tier: pre-computed triage tier (DEEP/STANDARD/LIGHT); triaged
            via Haiku when None.

    Every early return that does not reach a terminal/approved state must
    release the 'reviewing' claim back to 'open', or the PR is orphaned
    (the cycle query only selects status = 'open').
    """
    # Check eval attempt budget before claiming
    row = conn.execute(
        "SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)
    ).fetchone()
    eval_attempts = (row["eval_attempts"] or 0) if row else 0
    if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
        # Terminal — hard cap reached. Close PR, tag source.
        logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS)
        await _terminate_pr(conn, pr_number, "eval budget exhausted")
        return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"}
    # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
    cursor = conn.execute(
        "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
        (pr_number,),
    )
    if cursor.rowcount == 0:
        logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
    # Increment eval_attempts
    conn.execute(
        "UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?",
        (pr_number,),
    )
    eval_attempts += 1
    # Fetch diff
    diff = await get_pr_diff(pr_number)
    if not diff:
        # FIX: release the atomic claim. Previously this path returned with
        # status still 'reviewing', so the PR was never re-selected by the
        # cycle query and was stuck forever.
        conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
        return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
    # Musings bypass
    if _is_musings_only(diff):
        logger.info("PR #%d is musings-only — auto-approving", pr_number)
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": "Auto-approved: musings bypass eval per collective policy."},
        )
        conn.execute(
            """UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
            domain_verdict = 'skipped' WHERE number = ?""",
            (pr_number,),
        )
        return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
    # Filter diff; if everything was filtered away, fall back to the full diff
    review_diff, _entity_diff = _filter_diff(diff)
    if not review_diff:
        review_diff = diff
    files = _extract_changed_files(diff)
    # Detect domain
    domain = detect_domain_from_diff(diff)
    agent = agent_for_domain(domain)
    # Default NULL domain to 'general' (archive-only PRs have no domain files)
    if domain is None:
        domain = "general"
    # Update PR domain if not set
    conn.execute(
        "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
        (domain, agent, pr_number),
    )
    # Step 1: Triage (if not already triaged)
    if tier is None:
        tier = await triage_pr(diff)
        conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
    # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
    conn.execute(
        "UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
        (pr_number,),
    )
    # Check if domain review already completed (resuming after Leo rate limit)
    existing = conn.execute("SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)).fetchone()
    existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
    _existing_leo_verdict = existing["leo_verdict"] if existing else "pending"
    # Step 2: Domain review FIRST (Sonnet — high volume filter)
    # Skip if already completed from a previous attempt
    domain_review = None  # Initialize — used later for feedback extraction (Ganymede #12)
    if existing_domain_verdict not in ("pending", None):
        domain_verdict = existing_domain_verdict
        logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
    else:
        logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
        domain_review = await run_domain_review(review_diff, files, domain or "general", agent)
        if domain_review is None:
            # OpenRouter failure (timeout, error) — revert to open for retry.
            # NOT a rate limit — don't trigger 15-min backoff, just skip this PR.
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"}
        domain_verdict = _parse_verdict(domain_review, agent)
        conn.execute(
            "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
            (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
        )
        # Post domain review as comment (from agent's Forgejo account)
        agent_tok = get_agent_token(agent)
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": domain_review},
            token=agent_tok,
        )
    # If domain review rejects, skip Leo review (save Opus)
    if domain_verdict == "request_changes":
        logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
        domain_issues = _parse_issues(domain_review) if domain_review else []
        conn.execute(
            """UPDATE prs SET status = 'open', leo_verdict = 'skipped',
            last_error = 'domain review requested changes',
            eval_issues = ?
            WHERE number = ?""",
            (json.dumps(domain_issues), pr_number),
        )
        db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues}))
        # Disposition: check if this PR should be terminated or kept open
        await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
        return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped", "eval_attempts": eval_attempts}
    # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
    leo_verdict = "skipped"
    leo_review = None  # Initialize — used later for issue extraction
    if tier != "LIGHT":
        logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
        leo_review = await run_leo_review(review_diff, files, tier)
        if leo_review is None:
            # DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed"
            return {"pr": pr_number, "skipped": True, "reason": reason}
        leo_verdict = _parse_verdict(leo_review, "LEO")
        conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
        # Post Leo review as comment (from Leo's Forgejo account)
        leo_tok = get_agent_token("Leo")
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": leo_review},
            token=leo_tok,
        )
    else:
        # LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
        conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
    # Step 4: Determine final verdict
    both_approve = (leo_verdict == "approve" or leo_verdict == "skipped") and domain_verdict == "approve"
    if both_approve:
        # Get PR author for formal approvals
        pr_info = await forgejo_api(
            "GET",
            repo_path(f"pulls/{pr_number}"),
        )
        # FIX: "user" may be present but null in the API payload; the old
        # pr_info.get("user", {}) crashed on that shape.
        pr_author = ((pr_info or {}).get("user") or {}).get("login", "")
        # Submit formal Forgejo reviews (required for merge)
        await _post_formal_approvals(pr_number, pr_author)
        conn.execute(
            "UPDATE prs SET status = 'approved' WHERE number = ?",
            (pr_number,),
        )
        db.audit(
            conn,
            "evaluate",
            "approved",
            json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent}),
        )
        logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict)
    else:
        # Collect all issue tags from both reviews
        all_issues = []
        if domain_verdict == "request_changes" and domain_review is not None:
            all_issues.extend(_parse_issues(domain_review))
        if leo_verdict == "request_changes" and leo_review is not None:
            all_issues.extend(_parse_issues(leo_review))
        conn.execute(
            "UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
            (json.dumps(all_issues), pr_number),
        )
        # Store feedback for re-extraction path
        feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues}
        conn.execute(
            "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
            (json.dumps(feedback), pr_number),
        )
        db.audit(
            conn,
            "evaluate",
            "changes_requested",
            json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}),
        )
        logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", pr_number, leo_verdict, domain_verdict, all_issues)
        # Disposition: check if this PR should be terminated or kept open
        await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
    # Record cost (domain review on OpenRouter, Leo depends on tier)
    from . import costs
    costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter")
    if tier != "LIGHT":
        if tier == "DEEP":
            costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
        else:
            costs.record_usage(conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", backend="openrouter")
    return {
        "pr": pr_number,
        "tier": tier,
        "domain": domain,
        "leo_verdict": leo_verdict,
        "domain_verdict": domain_verdict,
        "approved": both_approve,
    }
# ─── Rate limit backoff ───────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
# UTC timestamp until which rate-limited work is deferred; None = no backoff active.
_rate_limit_backoff_until: datetime | None = None
# Backoff window (minutes) applied after any rate-limit hit.
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one evaluation cycle.

    Selects PRs with status='open', tier0_pass=1, remaining eval budget,
    and at least one pending verdict, then evaluates them in priority
    order. (Earlier docstring said "no pending verdicts" — the query
    selects PRs that still HAVE a pending verdict.)

    Returns (succeeded, failed) counts for this cycle.
    """
    global _rate_limit_backoff_until
    # Check if we're in Opus rate-limit backoff
    opus_backoff = False
    if _rate_limit_backoff_until is not None:
        now = datetime.now(timezone.utc)
        if now < _rate_limit_backoff_until:
            remaining = int((_rate_limit_backoff_until - now).total_seconds())
            logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
            opus_backoff = True
        else:
            logger.info("Rate limit backoff expired, resuming full eval cycles")
            _rate_limit_backoff_until = None
    # Find PRs ready for evaluation:
    # - status = 'open'
    # - tier0_pass = 1 (passed validation)
    # - leo_verdict = 'pending' OR domain_verdict = 'pending'
    # During Opus backoff: skip DEEP PRs waiting for Leo (they need Opus).
    # STANDARD PRs can overflow Leo review to GPT-4o, so let them through.
    # Skip PRs attempted within last 10 minutes (backoff during rate limits)
    if opus_backoff:
        verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))"
    else:
        verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
    # Stagger first pass after migration: if there are previously-rejected PRs
    # with eval_attempts=0 (freshly migrated), limit batch to avoid OpenRouter spike.
    # NOTE(review): NOT IN against a NULL verdict evaluates to NULL (row not
    # counted) — presumably intended; confirm against the migration schema.
    migrated_count = conn.execute(
        """SELECT COUNT(*) as c FROM prs
        WHERE status = 'open' AND eval_attempts = 0
        AND (domain_verdict NOT IN ('pending') OR leo_verdict NOT IN ('pending'))"""
    ).fetchone()["c"]
    stagger_limit = 5 if migrated_count > 5 else None
    # f-string interpolation is safe here: MAX_EVAL_ATTEMPTS and
    # verdict_filter are pipeline-controlled, not user input.
    rows = conn.execute(
        f"""SELECT p.number, p.tier FROM prs p
        LEFT JOIN sources s ON p.source_path = s.path
        WHERE p.status = 'open'
        AND p.tier0_pass = 1
        AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS}
        {verdict_filter}
        AND (p.last_attempt IS NULL
        OR p.last_attempt < datetime('now', '-10 minutes'))
        ORDER BY
        CASE COALESCE(p.priority, s.priority, 'medium')
        WHEN 'critical' THEN 0
        WHEN 'high' THEN 1
        WHEN 'medium' THEN 2
        WHEN 'low' THEN 3
        ELSE 4
        END,
        p.created_at ASC
        LIMIT ?""",
        (stagger_limit or max_workers or config.MAX_EVAL_WORKERS,),
    ).fetchall()
    if stagger_limit and rows:
        logger.info("Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count)
    if not rows:
        return 0, 0
    succeeded = 0
    failed = 0
    for row in rows:
        try:
            # During Opus backoff, skip DEEP PRs that already completed domain review
            # (they need Opus which is rate limited). STANDARD PRs can overflow to GPT-4o.
            if opus_backoff and row["tier"] == "DEEP":
                existing = conn.execute(
                    "SELECT domain_verdict FROM prs WHERE number = ?",
                    (row["number"],),
                ).fetchone()
                if existing and existing["domain_verdict"] not in ("pending", None):
                    logger.debug(
                        "PR #%d: skipping DEEP during Opus backoff (domain already %s)",
                        row["number"],
                        existing["domain_verdict"],
                    )
                    continue
            result = await evaluate_pr(conn, row["number"], tier=row["tier"])
            if result.get("skipped"):
                reason = result.get("reason", "")
                logger.debug("PR #%d skipped: %s", row["number"], reason)
                # "rate_limited" substring matches both opus_rate_limited and
                # any future *_rate_limited reason.
                if "rate_limited" in reason:
                    from datetime import timedelta  # local import kept near sole use
                    if reason == "opus_rate_limited":
                        # Opus hit — set backoff but DON'T break. Other PRs
                        # may still need triage (Haiku) or domain review (Sonnet).
                        _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
                            minutes=_RATE_LIMIT_BACKOFF_MINUTES
                        )
                        opus_backoff = True  # Update local flag so in-loop guard kicks in
                        logger.info(
                            "Opus rate limited — backing off Opus for %d min, continuing triage+domain",
                            _RATE_LIMIT_BACKOFF_MINUTES,
                        )
                        continue
                    else:
                        # Non-Opus rate limit (Sonnet/Haiku) — break the cycle,
                        # nothing else can proceed either.
                        _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
                            minutes=_RATE_LIMIT_BACKOFF_MINUTES
                        )
                        logger.info(
                            "Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
                        )
                        break
            else:
                succeeded += 1
            # NOTE: non-rate-limit skips count as neither succeeded nor failed.
        except Exception:
            logger.exception("Failed to evaluate PR #%d", row["number"])
            failed += 1
            # Revert to open on unhandled error
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
    if succeeded or failed:
        logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
    return succeeded, failed