teleo-infrastructure/lib/evaluate.py
m3taversal f166db4f62 ganymede: fix 4 critical bugs before pipeline restart
- Fix #12: domain_review undefined on resume path — initialize to None,
  guard _parse_issues() call. Prevents NameError on PRs resuming after
  partial eval (76 PRs in this state right now).
- Fix #11: concurrent eval workers can duplicate reviews — add atomic
  UPDATE SET status='reviewing' WHERE status='open' at top of
  evaluate_pr(). Check rowcount, skip if already claimed.
- Fix #8: subprocess tracking for graceful shutdown — _active_subprocesses
  set in evaluate module, tracked in _claude_cli_call, exposed via
  kill_active_subprocesses(). Replaces dead code in teleo-pipeline.py.
- Fix health.py divide-by-zero — guard all metabolic metric reads against
  None from NULLIF/empty result set. Prevents TypeError on /health when
  no PRs have been evaluated in 24h.

Also includes Leo's existing hot-fixes:
- Rate limit detection checks stdout regardless of exit code
- 15-minute cycle-level backoff on rate limit

Pentagon-Agent: Ganymede <F99EBFA6-547B-4096-BEEA-1D59C3E4028A>
2026-03-13 14:13:25 +00:00

789 lines
32 KiB
Python

"""Evaluate stage — triage + domain review + Leo review.
Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last.
Sonnet (domain review) filters before Opus (Leo review) to maximize value per
scarce Opus call.
Flow per PR:
1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT
2. Domain review → Sonnet (Claude Max, overflow: OpenRouter GPT-4o)
3. Leo review → Opus (Claude Max, overflow: queue) — skipped for LIGHT
4. DEEP cross-family → GPT-4o (OpenRouter) — only if domain + Leo both approve
5. Post reviews, submit formal Forgejo approvals, update SQLite
6. If both approve → status = 'approved' (merge module picks it up)
Design reviewed by Ganymede, Rhea, Vida, Theseus.
"""
import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from . import config, db
# Module-level logger for the evaluate stage.
logger = logging.getLogger("pipeline.evaluate")
# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8).
# Populated/drained by _claude_cli_call; killed by kill_active_subprocesses().
_active_subprocesses: set = set()
# ─── Constants ──────────────────────────────────────────────────────────────
# Maps knowledge-base domain name -> agent responsible for the domain review.
# Domains missing from this map fall back to "Leo" (see evaluate_pr).
DOMAIN_AGENT_MAP = {
    "internet-finance": "Rio",
    "entertainment": "Clay",
    "health": "Vida",
    "ai-alignment": "Theseus",
    "space-development": "Astra",
    "mechanisms": "Rio",
    "living-capital": "Rio",
    "living-agents": "Theseus",
    "teleohumanity": "Leo",
    "grand-strategy": "Leo",
    "critical-systems": "Theseus",
    "collective-intelligence": "Theseus",
    "teleological-economics": "Rio",
    "cultural-dynamics": "Clay",
}
async def kill_active_subprocesses():
    """Terminate every tracked Claude CLI subprocess.

    Invoked from the pipeline's graceful-shutdown path so no orphaned CLI
    processes outlive the worker. Processes that already exited are left
    alone; the tracking set is emptied afterwards either way.
    """
    for child in list(_active_subprocesses):
        if child.returncode is not None:
            continue  # already exited on its own
        logger.warning("Killing lingering Claude CLI subprocess PID %d", child.pid)
        try:
            child.kill()
            await child.wait()
        except ProcessLookupError:
            pass  # process died between the returncode check and the kill
    _active_subprocesses.clear()
def _agent_token(agent_name: str) -> str | None:
    """Look up the Forgejo API token for *agent_name*.

    Tokens live in SECRETS_DIR as ``forgejo-<name>-token`` files (name is
    lowercased). Returns the stripped token, or None when no file exists.
    """
    path = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
    return path.read_text().strip() if path.exists() else None
# Shared style instructions appended to every review prompt — keeps reviews
# terse and verdict-focused instead of restating the diff.
REVIEW_STYLE_GUIDE = (
    "Be concise. Only mention what fails or is interesting. "
    "Do not summarize what the PR does — the diff speaks for itself. "
    "If everything passes, say so in one line and approve."
)
# ─── Prompt templates ──────────────────────────────────────────────────────
# Triage prompt (Haiku via OpenRouter). Placeholder: {diff}.
# Expected output: tier name on line 1, one-line reason on line 2.
TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
DEEP — use when ANY of these apply:
- PR adds or modifies claims rated "likely" or higher confidence
- PR touches agent beliefs or creates cross-domain wiki links
- PR challenges an existing claim (has "challenged_by" or contradicts existing)
- PR modifies axiom-level beliefs
- PR is a cross-domain synthesis claim
STANDARD — use when:
- New claims in established domain areas
- Enrichments to existing claims (confirm/extend)
- New hypothesis-level beliefs
- Source archives with extraction results
LIGHT — use ONLY when ALL changes fit these categories:
- Entity attribute updates (factual corrections, new data points)
- Source archiving without extraction
- Formatting fixes, typo corrections
- Status field changes
IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
--- PR DIFF ---
{diff}"""
# Domain (Sonnet) review prompt. Placeholders: {agent}, {agent_upper},
# {domain}, {style_guide}, {diff}, {files}. The VERDICT comment is parsed
# by _parse_verdict; the ISSUES comment by _parse_issues.
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
Review this PR from your domain expertise:
1. Technical accuracy — are the claims factually correct in your domain?
2. Domain duplicates — does your domain already have substantially similar claims?
3. Missing context — is important domain context absent that would change interpretation?
4. Confidence calibration — from your domain expertise, is the confidence level right?
5. Enrichment opportunities — should this connect to existing claims via wiki links?
{style_guide}
If you are requesting changes, tag the specific issues:
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by
End your review with exactly one of:
<!-- VERDICT:{agent_upper}:APPROVE -->
<!-- VERDICT:{agent_upper}:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# Leo (Opus) review prompt for STANDARD-tier PRs.
# Placeholders: {style_guide}, {diff}, {files}.
LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR against the quality criteria:
1. Schema compliance — YAML frontmatter, prose-as-title, required fields
2. Duplicate check — does this claim already exist?
3. Confidence calibration — appropriate for the evidence?
4. Wiki link validity — references real claims?
5. Source quality — credible for the claim?
6. Domain assignment — correct domain?
7. Epistemic hygiene — specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# Leo (Opus) review prompt for DEEP-tier PRs — maximum-scrutiny variant.
# Placeholders: {style_guide}, {diff}, {files}.
LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
1. Cross-domain implications — does this claim affect beliefs in other domains?
2. Confidence calibration — is the confidence level justified by the evidence?
3. Contradiction check — does this contradict any existing claims without explicit argument?
4. Wiki link validity — do all wiki links reference real, existing claims?
5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality — is the source credible for the claim being made?
7. Duplicate check — does a substantially similar claim already exist?
8. Enrichment vs new claim — should this be an enrichment to an existing claim instead?
9. Domain assignment — is the claim in the correct domain?
10. Schema compliance — YAML frontmatter, prose-as-title format, required fields
11. Epistemic hygiene — is the claim specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# ─── API helpers ───────────────────────────────────────────────────────────
async def _forgejo_api(method: str, path: str, body: dict | None = None, token: str | None = None):
    """Call the Forgejo REST API.

    Args:
        method: HTTP method ("GET", "POST", ...).
        path: API path appended after /api/v1 (must start with '/').
        body: Optional JSON request body.
        token: Forgejo token; falls back to the pipeline's default token file.

    Returns:
        Parsed JSON on success, {} for 204 No Content, or None on any
        HTTP (>=400) or transport error (errors are logged, never raised).
    """
    import aiohttp
    url = f"{config.FORGEJO_URL}/api/v1{path}"
    if token is None:
        token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers,
                                       json=body, timeout=aiohttp.ClientTimeout(total=60)) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    # BUG FIX: format string was "%s %s%d" — path and status fused in logs.
                    logger.error("Forgejo API %s %s -> %d: %s", method, path, resp.status, text[:200])
                    return None
                if resp.status == 204:
                    return {}  # No Content — signal success with an empty dict
                return await resp.json()
    except Exception as e:
        # BUG FIX: format string was "%s %s%s" — path and error fused in logs.
        logger.error("Forgejo API error: %s %s: %s", method, path, e)
        return None
async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
    """Call the OpenRouter chat-completions API.

    Args:
        model: OpenRouter model identifier.
        prompt: Single user message content.
        timeout_sec: Total request timeout.

    Returns:
        The first choice's message content, or None on missing key file,
        HTTP error, transport error, or an empty/malformed response.
    """
    import aiohttp
    key_file = config.SECRETS_DIR / "openrouter-key"
    if not key_file.exists():
        logger.error("OpenRouter key file not found")
        return None
    key = key_file.read_text().strip()
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 4096,
        "temperature": 0.2,  # low temperature — reviews should be near-deterministic
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config.OPENROUTER_URL,
                headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
                json=payload,
                timeout=aiohttp.ClientTimeout(total=timeout_sec),
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    # BUG FIX: format string was "%s%d" — model and status fused in logs.
                    logger.error("OpenRouter %s -> %d: %s", model, resp.status, text[:200])
                    return None
                data = await resp.json()
                # BUG FIX: an empty "choices" list previously raised IndexError.
                choices = data.get("choices") or [{}]
                return choices[0].get("message", {}).get("content")
    except Exception as e:
        # BUG FIX: format string was "%s%s" — model and error fused in logs.
        logger.error("OpenRouter error: %s: %s", model, e)
        return None
async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None:
    """Call Claude via CLI (Claude Max subscription).

    The prompt is written to the CLI's stdin; stdout carries the response.
    The spawned process is registered in _active_subprocesses so a graceful
    shutdown can kill it (Ganymede #8).

    Returns:
        Response text on success; the sentinel string "RATE_LIMITED" when
        the combined stdout+stderr mentions a rate limit (checked BEFORE
        and regardless of exit code — the CLI sometimes exits 0 with a
        limit message); None on timeout or non-zero exit.
    """
    proc = await asyncio.create_subprocess_exec(
        str(config.CLAUDE_CLI), "-p", "--model", model, "--output-format", "text",
        cwd=cwd or str(config.REPO_DIR),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _active_subprocesses.add(proc)  # Track for graceful shutdown (Ganymede #8)
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=timeout_sec,
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        logger.error("Claude CLI timed out after %ds", timeout_sec)
        return None
    finally:
        # Always untrack, on success AND on timeout — the set must not leak procs.
        _active_subprocesses.discard(proc)
    out_text = (stdout or b"").decode()
    err_text = (stderr or b"").decode()
    # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
    combined_lower = (out_text + err_text).lower()
    if "hit your limit" in combined_lower or "rate limit" in combined_lower:
        logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
        return "RATE_LIMITED"
    if proc.returncode != 0:
        logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
        return None
    return out_text.strip()
# ─── Diff helpers ──────────────────────────────────────────────────────────
async def _get_pr_diff(pr_number: int) -> str:
    """Fetch a PR's raw diff text via the Forgejo API.

    Returns "" on HTTP error, transport error, or when the diff exceeds
    the 2 MB safety cap (oversized diffs are not reviewed).
    """
    import aiohttp
    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=60),
            ) as resp:
                if resp.status >= 400:
                    return ""
                body = await resp.text()
                return "" if len(body) > 2_000_000 else body
    except Exception as e:
        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
        return ""
def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
Strips: inbox/archive/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r'(?=^diff --git )', diff, flags=re.MULTILINE)
skip_patterns = [r'^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/']
core_domains = {'living-agents', 'living-capital', 'teleohumanity', 'mechanisms'}
claim_sections = []
entity_sections = []
for section in sections:
if not section.strip():
continue
if any(re.match(p, section) for p in skip_patterns):
continue
entity_match = re.match(r'^diff --git a/entities/([^/]+)/', section)
if entity_match and entity_match.group(1) not in core_domains:
entity_sections.append(section)
continue
claim_sections.append(section)
return ''.join(claim_sections), ''.join(entity_sections)
def _extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0]
for line in diff.split("\n")
if line.startswith("diff --git")
)
def _detect_domain_from_diff(diff: str) -> str | None:
"""Detect primary domain from changed file paths.
Checks domains/, entities/, core/, foundations/ for domain classification.
"""
domain_counts: dict[str, int] = {}
for line in diff.split("\n"):
if line.startswith("diff --git"):
# Check domains/ and entities/ (both carry domain info)
match = re.search(r'(?:domains|entities)/([^/]+)/', line)
if match:
d = match.group(1)
domain_counts[d] = domain_counts.get(d, 0) + 1
continue
# Check core/ subdirectories
match = re.search(r'core/([^/]+)/', line)
if match:
d = match.group(1)
if d in DOMAIN_AGENT_MAP:
domain_counts[d] = domain_counts.get(d, 0) + 1
continue
# Check foundations/ subdirectories
match = re.search(r'foundations/([^/]+)/', line)
if match:
d = match.group(1)
if d in DOMAIN_AGENT_MAP:
domain_counts[d] = domain_counts.get(d, 0) + 1
if domain_counts:
return max(domain_counts, key=domain_counts.get)
return None
def _is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if "agents/" in line and "/musings/" in line:
has_musings = True
else:
has_other = True
return has_musings and not has_other
# ─── Verdict parsing ──────────────────────────────────────────────────────
def _parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
return "approve"
elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
return "request_changes"
else:
logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
return "request_changes"
def _parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review."""
match = re.search(r'<!-- ISSUES: ([^>]+) -->', review_text)
if not match:
return []
return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
# ─── Review execution ─────────────────────────────────────────────────────
async def _triage_pr(diff: str) -> str:
    """Classify a PR as DEEP / STANDARD / LIGHT via the triage model.

    Defaults to STANDARD on API failure or an unparseable response —
    failing toward more review, matching the triage prompt's own policy.

    Returns:
        One of "DEEP", "STANDARD", "LIGHT".
    """
    prompt = TRIAGE_PROMPT.format(diff=diff[:50000])  # Cap diff size for triage
    result = await _openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
    if not result:
        logger.warning("Triage failed, defaulting to STANDARD")
        return "STANDARD"
    lines = result.split("\n")
    tier = lines[0].strip().upper()
    if tier in ("DEEP", "STANDARD", "LIGHT"):
        reason = lines[1].strip() if len(lines) > 1 else ""
        # BUG FIX: format string was "Triage: %s%s" — tier and reason fused in logs.
        logger.info("Triage: %s — %s", tier, reason[:100])
        return tier
    logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
    return "STANDARD"
async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
    """Run the domain (Sonnet) review for a PR.

    Tries Claude Max first. On rate limit, either overflows to the
    OpenRouter deep model or queues (returns None), according to
    OVERFLOW_POLICY["eval_domain"] (default: overflow).
    """
    prompt = DOMAIN_PROMPT.format(
        agent=agent, agent_upper=agent.upper(), domain=domain,
        style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files,
    )
    review = await _claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
    if review != "RATE_LIMITED":
        return review
    # Rate limited. Domain review is the volume filter, so the default
    # policy overflows rather than bottlenecking the pipeline (Rhea).
    if config.OVERFLOW_POLICY.get("eval_domain", "overflow") != "overflow":
        logger.info("Claude Max rate limited, queuing domain review")
        return None
    logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
    return await _openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
async def _run_leo_review(diff: str, files: str, tier: str) -> str | None:
    """Run Leo's (Opus) review for a PR.

    Leo reviews never overflow to another provider — Opus calls are
    scarce, so a rate limit queues the PR instead (returns None).
    """
    template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
    review = await _claude_cli_call(
        config.EVAL_LEO_MODEL,
        template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files),
        timeout_sec=config.EVAL_TIMEOUT,
    )
    if review == "RATE_LIMITED":
        logger.info("Claude Max Opus rate limited, queuing Leo review")
        return None
    return review
async def _post_formal_approvals(pr_number: int, pr_author: str):
    """Submit formal Forgejo APPROVED reviews from two agents.

    Skips the PR author (Forgejo rejects/ignores self-approval) and stops
    after two successful approvals. Agents without a token file are skipped.

    Args:
        pr_number: PR to approve.
        pr_author: Forgejo login of the PR author (any case).
    """
    # BUG FIX: the author comparison is now case-insensitive. Forgejo logins
    # may be capitalized (e.g. "Leo") while this candidate list is lowercase,
    # which previously let the author's own account submit an approval.
    author = (pr_author or "").lower()
    approvals = 0
    for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
        if agent_name == author:
            continue
        if approvals >= 2:
            break
        # Consistency: use the shared _agent_token helper instead of
        # re-implementing the token-file read inline.
        token = _agent_token(agent_name)
        if token is None:
            continue
        result = await _forgejo_api(
            "POST",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/reviews",
            {"body": "Approved.", "event": "APPROVED"},
            token=token,
        )
        if result is not None:
            approvals += 1
            logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
# ─── Single PR evaluation ─────────────────────────────────────────────────
async def evaluate_pr(conn, pr_number: int, tier: str | None = None) -> dict:
    """Evaluate a single PR end-to-end and record verdicts in SQLite.

    Flow: atomic claim → diff fetch → musings bypass → triage →
    domain review (Sonnet) → Leo review (Opus; skipped for LIGHT) →
    final verdict + formal Forgejo approvals.

    Args:
        conn: SQLite connection; rows must support mapping access.
        pr_number: Forgejo PR number.
        tier: Pre-computed triage tier (DEEP/STANDARD/LIGHT), or None to
            triage here.

    Returns:
        Result dict: {"skipped": True, "reason": ...} on early exits,
        {"auto_approved": True, ...} for the musings bypass, otherwise
        tier, both verdicts and the final "approved" flag.
    """
    # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
    cursor = conn.execute(
        "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
        (pr_number,),
    )
    if cursor.rowcount == 0:
        logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
    # Fetch diff
    diff = await _get_pr_diff(pr_number)
    if not diff:
        # BUG FIX: release the claim taken above — previously the PR stayed
        # in 'reviewing' forever and the cycle query (status='open') never
        # re-selected it. Bump last_attempt so the 10-minute backoff applies.
        conn.execute(
            "UPDATE prs SET status = 'open', last_attempt = datetime('now') WHERE number = ?",
            (pr_number,),
        )
        return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
    # Musings bypass — musings skip eval entirely per collective policy
    if _is_musings_only(diff):
        logger.info("PR #%d is musings-only — auto-approving", pr_number)
        await _forgejo_api(
            "POST",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
            {"body": "Auto-approved: musings bypass eval per collective policy."},
        )
        conn.execute(
            """UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
               domain_verdict = 'skipped' WHERE number = ?""",
            (pr_number,),
        )
        return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
    # Filter diff to review-relevant sections; fall back to the full diff
    # when filtering strips everything (e.g. archive-only PRs).
    review_diff, entity_diff = _filter_diff(diff)
    if not review_diff:
        review_diff = diff
    files = _extract_changed_files(diff)
    # Detect domain and pick the responsible reviewer agent
    domain = _detect_domain_from_diff(diff)
    agent = DOMAIN_AGENT_MAP.get(domain, "Leo") if domain else "Leo"
    # Default NULL domain to 'general' (archive-only PRs have no domain files)
    if domain is None:
        domain = "general"
    # Update PR domain if not set
    conn.execute(
        "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
        (domain, agent, pr_number),
    )
    # Step 1: Triage (if not already triaged)
    if tier is None:
        tier = await _triage_pr(diff)
        conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
    # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
    conn.execute(
        "UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
        (pr_number,),
    )
    # Check if domain review already completed (resuming after Leo rate limit)
    existing = conn.execute(
        "SELECT domain_verdict FROM prs WHERE number = ?", (pr_number,)
    ).fetchone()
    existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
    # Step 2: Domain review FIRST (Sonnet — high-volume filter before Opus).
    # Skip if already completed from a previous attempt.
    domain_review = None  # Initialize — used later for feedback extraction (Ganymede #12)
    if existing_domain_verdict not in ("pending", None):
        domain_verdict = existing_domain_verdict
        logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
    else:
        logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
        domain_review = await _run_domain_review(review_diff, files, domain or "general", agent)
        if domain_review is None:
            # Rate limited, couldn't overflow — revert to open for retry
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "rate_limited"}
        domain_verdict = _parse_verdict(domain_review, agent)
        conn.execute(
            "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
            (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
        )
        # Post domain review as comment (from agent's Forgejo account)
        agent_tok = _agent_token(agent)
        await _forgejo_api(
            "POST",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
            {"body": domain_review},
            token=agent_tok,
        )
    # If domain review rejects, skip Leo review (save Opus)
    if domain_verdict == "request_changes":
        logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
        conn.execute(
            """UPDATE prs SET status = 'open', leo_verdict = 'skipped',
               last_error = 'domain review requested changes'
               WHERE number = ?""",
            (pr_number,),
        )
        db.audit(conn, "evaluate", "domain_rejected",
                 json.dumps({"pr": pr_number, "agent": agent}))
        return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"}
    # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
    leo_verdict = "skipped"
    if tier != "LIGHT":
        logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
        leo_review = await _run_leo_review(review_diff, files, tier)
        if leo_review is None:
            # Opus rate limited — revert to open for retry (keep domain verdict)
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "opus_rate_limited"}
        leo_verdict = _parse_verdict(leo_review, "LEO")
        conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
        # Post Leo review as comment (from Leo's Forgejo account)
        leo_tok = _agent_token("Leo")
        await _forgejo_api(
            "POST",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
            {"body": leo_review},
            token=leo_tok,
        )
    else:
        # LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
        conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
    # Step 4: Final verdict — approved iff domain approves and Leo approves
    # (or Leo was legitimately skipped for LIGHT tier).
    both_approve = (
        (leo_verdict == "approve" or leo_verdict == "skipped")
        and domain_verdict == "approve"
    )
    if both_approve:
        # Get PR author so formal approvals skip self-approval
        pr_info = await _forgejo_api(
            "GET",
            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
        )
        pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
        # Submit formal Forgejo reviews (required for merge)
        await _post_formal_approvals(pr_number, pr_author)
        conn.execute(
            "UPDATE prs SET status = 'approved' WHERE number = ?",
            (pr_number,),
        )
        db.audit(conn, "evaluate", "approved",
                 json.dumps({"pr": pr_number, "tier": tier, "domain": domain,
                             "leo": leo_verdict, "domain_agent": agent}))
        logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)",
                    pr_number, tier, leo_verdict, domain_verdict)
    else:
        conn.execute(
            "UPDATE prs SET status = 'open' WHERE number = ?",
            (pr_number,),
        )
        # Store feedback for the re-extraction path
        feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
        if domain_verdict == "request_changes" and domain_review is not None:
            feedback["domain_issues"] = _parse_issues(domain_review)
        conn.execute(
            "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
            (json.dumps(feedback), pr_number),
        )
        db.audit(conn, "evaluate", "changes_requested",
                 json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict,
                             "domain": domain_verdict}))
        logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)",
                    pr_number, leo_verdict, domain_verdict)
    # Record model usage for cost accounting (domain always; Leo unless LIGHT)
    from . import costs
    costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max")
    if tier != "LIGHT":
        costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")
    return {
        "pr": pr_number, "tier": tier, "domain": domain,
        "leo_verdict": leo_verdict, "domain_verdict": domain_verdict,
        "approved": both_approve,
    }
# ─── Rate limit backoff ───────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
# UTC timestamp until which eval cycles are skipped; None = no active backoff.
_rate_limit_backoff_until: datetime | None = None
# Backoff window armed when any PR in a cycle reports a rate limit.
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
    """Run one evaluation cycle.

    Selects PRs with status='open', tier0_pass=1 and at least one pending
    verdict (excluding PRs attempted in the last 10 minutes), in priority
    order, and evaluates up to max_workers of them. A rate-limited result
    aborts the cycle and arms the module-level 15-minute backoff.

    Args:
        conn: SQLite connection; rows must support mapping access.
        max_workers: Batch size cap; defaults to config.MAX_EVAL_WORKERS.

    Returns:
        (succeeded, failed): fully evaluated PRs vs. unhandled errors.
    """
    from datetime import timedelta  # hoisted — was re-imported inside the loop body
    global _rate_limit_backoff_until
    # If we're in rate-limit backoff, skip this cycle entirely
    if _rate_limit_backoff_until is not None:
        now = datetime.now(timezone.utc)
        if now < _rate_limit_backoff_until:
            remaining = int((_rate_limit_backoff_until - now).total_seconds())
            logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining)
            return 0, 0
        logger.info("Rate limit backoff expired, resuming eval cycles")
        _rate_limit_backoff_until = None
    # Find PRs ready for evaluation:
    # - status = 'open'
    # - tier0_pass = 1 (passed validation)
    # - leo_verdict = 'pending' OR domain_verdict = 'pending'
    # Skip PRs attempted within last 10 minutes (backoff during rate limits)
    rows = conn.execute(
        """SELECT p.number, p.tier FROM prs p
           LEFT JOIN sources s ON p.source_path = s.path
           WHERE p.status = 'open'
             AND p.tier0_pass = 1
             AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')
             AND (p.last_attempt IS NULL
                  OR p.last_attempt < datetime('now', '-10 minutes'))
           ORDER BY
             CASE COALESCE(p.priority, s.priority, 'medium')
               WHEN 'critical' THEN 0
               WHEN 'high' THEN 1
               WHEN 'medium' THEN 2
               WHEN 'low' THEN 3
               ELSE 4
             END,
             p.created_at ASC
           LIMIT ?""",
        (max_workers or config.MAX_EVAL_WORKERS,),
    ).fetchall()
    if not rows:
        return 0, 0
    succeeded = 0
    failed = 0
    for row in rows:
        try:
            result = await evaluate_pr(conn, row["number"], tier=row["tier"])
            if result.get("skipped"):
                reason = result.get("reason", "")
                logger.debug("PR #%d skipped: %s", row["number"], reason)
                # Any rate limit — stop the entire cycle. No point trying more PRs
                # when the model is exhausted. The 10-minute backoff on last_attempt
                # prevents re-processing the same PR; breaking here prevents
                # cycling through OTHER PRs that will also hit the same limit.
                if "rate_limited" in reason:
                    _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
                        minutes=_RATE_LIMIT_BACKOFF_MINUTES
                    )
                    logger.info("Rate limited (%s) — backing off for %d minutes",
                                reason, _RATE_LIMIT_BACKOFF_MINUTES)
                    break
            else:
                succeeded += 1
        except Exception:
            logger.exception("Failed to evaluate PR #%d", row["number"])
            failed += 1
            # Revert to open on unhandled error so the PR is retried later
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
    if succeeded or failed:
        logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
    return succeeded, failed