teleo-infrastructure/lib/evaluate.py
fix: close stale PRs on Forgejo when pipeline DB marks them closed
Two code paths set status='closed' in the pipeline DB without calling
the Forgejo API to close the PR. This caused 50 ghost PRs to accumulate
on Forgejo (inflating the dashboard's review backlog) while the pipeline considered
them done.

- evaluate.py: no-diff stale branch close now calls Forgejo PATCH
- merge.py: permanent conflict close now calls Forgejo PATCH

"""Evaluate stage — PR lifecycle orchestration.
Tier-based review routing. Model diversity: GPT-4o (domain) + Sonnet (Leo STANDARD)
+ Opus (Leo DEEP) = two model families, no correlated blind spots.
Flow per PR:
1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT
2. Tier overrides:
a. Claim-shape detector: type: claim in YAML → STANDARD min (Theseus)
b. Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio)
3. Domain review → GPT-4o (OpenRouter) — skipped for LIGHT when LIGHT_SKIP_LLM=True
4. Leo review → Opus DEEP / Sonnet STANDARD (OpenRouter) — skipped for LIGHT
5. Post reviews, submit formal Forgejo approvals, update SQLite
6. If both approve → status = 'approved' (merge module picks it up)
7. Retry budget: 3 attempts max, disposition on attempt 2+
Design reviewed by Ganymede, Rio, Theseus, Rhea, Leo.
LLM transport and prompts extracted to lib/llm.py (Phase 3c).
"""
import json
import logging
import random
import re
from datetime import datetime, timezone
from . import config, db
from .domains import agent_for_domain, detect_domain_from_branch, detect_domain_from_diff
from .forgejo import api as forgejo_api
from .forgejo import get_agent_token, get_pr_diff, repo_path
from .merge import PIPELINE_OWNED_PREFIXES
from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr
from .feedback import format_rejection_comment
from .validate import load_existing_claims
logger = logging.getLogger("pipeline.evaluate")
# ─── Diff helpers ──────────────────────────────────────────────────────────
def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
    Strips: inbox/{archive,queue,null-result}/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE)
skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"]
core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"}
claim_sections = []
entity_sections = []
for section in sections:
if not section.strip():
continue
if any(re.match(p, section) for p in skip_patterns):
continue
entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section)
if entity_match and entity_match.group(1) not in core_domains:
entity_sections.append(section)
continue
claim_sections.append(section)
return "".join(claim_sections), "".join(entity_sections)
def _extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git")
)
def _is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if "agents/" in line and "/musings/" in line:
has_musings = True
else:
has_other = True
return has_musings and not has_other
# ─── NOTE: Tier 0.5 mechanical pre-check moved to validate.py ────────────
# Tier 0.5 now runs as part of the validate stage (before eval), not inside
# evaluate_pr(). This prevents wasting eval_attempts on mechanically fixable
# PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed.
# ─── Tier overrides ───────────────────────────────────────────────────────
def _diff_contains_claim_type(diff: str) -> bool:
"""Claim-shape detector: check if any file in diff has type: claim in frontmatter.
Mechanical check ($0). If YAML declares type: claim, this is a factual claim —
not an entity update or formatting fix. Must be classified STANDARD minimum
regardless of Haiku triage. Catches factual claims disguised as LIGHT content.
(Theseus: converts semantic problem to mechanical check)
"""
for line in diff.split("\n"):
if line.startswith("+") and not line.startswith("+++"):
stripped = line[1:].strip()
if stripped in ("type: claim", 'type: "claim"', "type: 'claim'"):
return True
return False
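# Example: an added line `+type: claim` (bare or quoted) triggers the upgrade;
# removed lines and `+++` file headers never match.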
def _deterministic_tier(diff: str) -> str | None:
"""Deterministic tier routing — skip Haiku triage for obvious cases.
Checks diff file patterns before calling the LLM. Returns tier string
if deterministic, None if Haiku triage is needed.
Rules (Leo-calibrated):
- All files in entities/ only → LIGHT
- All files in inbox/ only (queue, archive, null-result) → LIGHT
- Any file in core/ or foundations/ → DEEP (structural KB changes)
- Has challenged_by field → DEEP (challenges existing claims)
- Modifies existing file (not new) in domains/ → DEEP (enrichment/change)
- Otherwise → None (needs Haiku triage)
NOTE: Cross-domain wiki links are NOT a DEEP signal — most claims link
across domains, that's the whole point of the knowledge graph (Leo).
"""
changed_files = []
for line in diff.split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
changed_files.append(path)
if not changed_files:
return None
# All entities/ only → LIGHT
if all(f.startswith("entities/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in entities/)")
return "LIGHT"
# All inbox/ only (queue, archive, null-result) → LIGHT
if all(f.startswith("inbox/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in inbox/)")
return "LIGHT"
# Any file in core/ or foundations/ → DEEP (structural KB changes)
if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files):
logger.info("Deterministic tier: DEEP (touches core/ or foundations/)")
return "DEEP"
    # Check diff content for DEEP signals: an added challenged_by field
    for line in diff.split("\n"):
        if line.startswith("+") and not line.startswith("+++"):
            if line[1:].strip().startswith("challenged_by:"):
                logger.info("Deterministic tier: DEEP (has challenged_by field)")
                return "DEEP"
# NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments
# (appending evidence) are common and should be STANDARD. Let Haiku triage
# distinguish enrichments from structural changes.
return None
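# Examples (hypothetical paths): a diff touching only entities/people/jane-doe.md
# routes LIGHT; one touching core/axioms.md routes DEEP; a brand-new domains/ claim
# returns None and falls through to Haiku triage.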
# ─── Verdict parsing ──────────────────────────────────────────────────────
def _parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
return "approve"
elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
return "request_changes"
else:
logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
return "request_changes"
# Map model-invented tags to valid tags. Models consistently ignore the valid
# tag list and invent their own. This normalizes them. (Ganymede, Mar 14)
_TAG_ALIASES: dict[str, str] = {
"schema_violation": "frontmatter_schema",
"missing_schema_fields": "frontmatter_schema",
"missing_schema": "frontmatter_schema",
"schema": "frontmatter_schema",
"missing_frontmatter": "frontmatter_schema",
"redundancy": "near_duplicate",
"duplicate": "near_duplicate",
"missing_confidence": "confidence_miscalibration",
"confidence_error": "confidence_miscalibration",
"vague_claims": "scope_error",
"unfalsifiable": "scope_error",
"unverified_wiki_links": "broken_wiki_links",
"unverified-wiki-links": "broken_wiki_links",
"missing_wiki_links": "broken_wiki_links",
"invalid_wiki_links": "broken_wiki_links",
"wiki_link_errors": "broken_wiki_links",
"overclaiming": "title_overclaims",
"title_overclaim": "title_overclaims",
"date_error": "date_errors",
"factual_error": "factual_discrepancy",
"factual_inaccuracy": "factual_discrepancy",
}
VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims",
"confidence_miscalibration", "date_errors", "factual_discrepancy",
"near_duplicate", "scope_error"}
def _normalize_tag(tag: str) -> str | None:
"""Normalize a model-generated tag to a valid tag, or None if unrecognizable."""
tag = tag.strip().lower().replace("-", "_")
if tag in VALID_ISSUE_TAGS:
return tag
if tag in _TAG_ALIASES:
return _TAG_ALIASES[tag]
# Fuzzy: check if any valid tag is a substring or vice versa
for valid in VALID_ISSUE_TAGS:
if valid in tag or tag in valid:
return valid
return None
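# Examples: "Schema-Violation" normalizes to "frontmatter_schema" (alias),
# "wiki_links" to "broken_wiki_links" (substring match), and "hallucination"
# to None (dropped).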
def _parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review.
First tries structured <!-- ISSUES: tag1, tag2 --> comment with tag normalization.
Falls back to keyword inference from prose.
"""
match = re.search(r"<!-- ISSUES: ([^>]+) -->", review_text)
if match:
raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
        normalized = []
        for tag in raw_tags:
            norm = _normalize_tag(tag)
            if norm is None:
                logger.debug("Unrecognized issue tag '%s' — dropped", tag)
            elif norm not in normalized:
                normalized.append(norm)
if normalized:
return normalized
# Fallback: infer tags from review prose
return _infer_issues_from_prose(review_text)
# Keyword patterns for inferring issue tags from unstructured review prose.
# Conservative: only match unambiguous indicators. Order doesn't matter.
_PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = {
"frontmatter_schema": [
re.compile(r"frontmatter", re.IGNORECASE),
re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE),
re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE),
re.compile(r"required field", re.IGNORECASE),
re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE),
re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE),
re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE),
],
"broken_wiki_links": [
re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE),
re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE),
re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE),
re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE),
],
"factual_discrepancy": [
re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE),
re.compile(r"misrepresent", re.IGNORECASE),
],
"confidence_miscalibration": [
re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE),
re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE),
],
"scope_error": [
re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE),
re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE),
re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE),
re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE),
],
"title_overclaims": [
re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE),
re.compile(r"overclaim", re.IGNORECASE),
],
"near_duplicate": [
re.compile(r"near.?duplicate", re.IGNORECASE),
re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE),
re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE),
re.compile(r"redundan", re.IGNORECASE),
],
}
def _infer_issues_from_prose(review_text: str) -> list[str]:
"""Infer issue tags from unstructured review text via keyword matching.
Fallback for reviews that reject without structured <!-- ISSUES: --> tags.
Conservative: requires at least one unambiguous keyword match per tag.
"""
inferred = []
for tag, patterns in _PROSE_TAG_PATTERNS.items():
if any(p.search(review_text) for p in patterns):
inferred.append(tag)
return inferred
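# Example: a review saying "the title overclaims and the frontmatter is missing
# the confidence field" infers ["frontmatter_schema", "title_overclaims"].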
async def _post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
        if agent_name == pr_author.lower():  # Forgejo logins may be capitalized
            continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
# ─── Retry budget helpers ─────────────────────────────────────────────────
async def _terminate_pr(conn, pr_number: int, reason: str):
"""Terminal state: close PR on Forgejo, mark source needs_human."""
# Get issue tags for structured feedback
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
issues = []
if row and row["eval_issues"]:
try:
issues = json.loads(row["eval_issues"])
except (json.JSONDecodeError, TypeError):
pass
# Post structured rejection comment with quality gate guidance (Epimetheus)
if issues:
feedback_body = format_rejection_comment(issues, source="eval_terminal")
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback.\n\n"
f"{feedback_body}"
)
else:
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback."
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": comment_body},
)
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_number}"),
{"state": "closed"},
)
# Update PR status
conn.execute(
"UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
(reason, pr_number),
)
# Tag source for re-extraction with feedback
cursor = conn.execute(
"""UPDATE sources SET status = 'needs_reextraction',
updated_at = datetime('now')
WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
(pr_number,),
)
if cursor.rowcount == 0:
logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
db.audit(
conn,
"evaluate",
"pr_terminated",
json.dumps(
{
"pr": pr_number,
"reason": reason,
}
),
)
logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
def _classify_issues(issues: list[str]) -> str:
"""Classify issue tags as 'mechanical', 'substantive', or 'mixed'."""
if not issues:
return "unknown"
mechanical = set(issues) & config.MECHANICAL_ISSUE_TAGS
substantive = set(issues) & config.SUBSTANTIVE_ISSUE_TAGS
if substantive and not mechanical:
return "substantive"
if mechanical and not substantive:
return "mechanical"
if mechanical and substantive:
return "mixed"
return "unknown" # tags not in either set
async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
"""Disposition logic for rejected PRs on attempt 2+.
Attempt 1: normal — back to open, wait for fix.
Attempt 2: check issue classification.
- Mechanical only: keep open for one more attempt (auto-fix future).
- Substantive or mixed: close PR, requeue source.
Attempt 3+: terminal.
"""
if eval_attempts < 2:
# Attempt 1: post structured feedback so agent learns, but don't close
if all_issues:
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": feedback_body},
)
return
classification = _classify_issues(all_issues)
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal
await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
return
if classification == "mechanical":
# Mechanical issues only — keep open for one more attempt.
# Future: auto-fix module will push fixes here.
logger.info(
"PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
pr_number,
eval_attempts,
all_issues,
)
db.audit(
conn,
"evaluate",
"mechanical_retry",
json.dumps(
{
"pr": pr_number,
"attempt": eval_attempts,
"issues": all_issues,
}
),
)
else:
# Substantive, mixed, or unknown — close and requeue
logger.info(
"PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
pr_number,
eval_attempts,
classification,
all_issues,
)
await _terminate_pr(
conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}"
)
# ─── Single PR evaluation ─────────────────────────────────────────────────
async def evaluate_pr(conn, pr_number: int, tier: str | None = None) -> dict:
"""Evaluate a single PR. Returns result dict."""
from . import costs
pr_cost = 0.0
# Check eval attempt budget before claiming
row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone()
eval_attempts = (row["eval_attempts"] or 0) if row else 0
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal — hard cap reached. Close PR, tag source.
logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS)
await _terminate_pr(conn, pr_number, "eval budget exhausted")
return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"}
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_number,),
)
if cursor.rowcount == 0:
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
# Increment eval_attempts — but not if this is a merge-failure re-entry (Ganymede+Rhea)
merge_cycled = conn.execute(
"SELECT merge_cycled FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if merge_cycled and merge_cycled["merge_cycled"]:
# Merge cycling — don't burn eval budget, clear flag
conn.execute("UPDATE prs SET merge_cycled = 0 WHERE number = ?", (pr_number,))
logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_number)
else:
conn.execute(
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?",
(pr_number,),
)
eval_attempts += 1
# Fetch diff
diff = await get_pr_diff(pr_number)
if not diff:
# Close PRs with no diff — stale branch, nothing to evaluate
await forgejo_api("PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"})
conn.execute("UPDATE prs SET status='closed', last_error='closed: no diff against main (stale branch)' WHERE number = ?", (pr_number,))
return {"pr": pr_number, "skipped": True, "reason": "no_diff_closed"}
# Musings bypass
if _is_musings_only(diff):
logger.info("PR #%d is musings-only — auto-approving", pr_number)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
"""UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
domain_verdict = 'skipped', domain = COALESCE(domain, 'cross-domain'),
auto_merge = 1 WHERE number = ?""",
(pr_number,),
)
return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
# Reweave bypass — reweave PRs only add frontmatter edges (supports/challenges/
# related/depends_on/challenged_by). The eval LLM has no context for judging
# edge correctness and consistently flags factual_discrepancy on valid edges.
# Leo's manual PR review is the real quality gate for reweave.
branch_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
branch_name = branch_row["branch"] if branch_row else ""
if branch_name.startswith("reweave/"):
logger.info("PR #%d is reweave (branch=%s) — auto-approving, Leo reviews manually", pr_number, branch_name)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": "Auto-approved: reweave structural update (frontmatter edges only). Leo reviews manually."},
)
conn.execute(
"""UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
domain_verdict = 'skipped', auto_merge = 1,
domain = COALESCE(domain, 'cross-domain') WHERE number = ?""",
(pr_number,),
)
db.audit(
conn, "evaluate", "reweave_bypass",
json.dumps({"pr": pr_number, "branch": branch_name}),
)
return {"pr": pr_number, "auto_approved": True, "reason": "reweave_bypass"}
# NOTE: Tier 0.5 mechanical checks now run in validate stage (before eval).
# tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here.
# Filter diff
review_diff, _entity_diff = _filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
# Detect domain — try diff paths first, then branch prefix, then 'general'
domain = detect_domain_from_diff(diff)
if domain is None:
pr_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
if pr_row and pr_row["branch"]:
domain = detect_domain_from_branch(pr_row["branch"])
if domain is None:
domain = "general"
agent = agent_for_domain(domain)
# Update PR domain if not set
conn.execute(
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
(domain, agent, pr_number),
)
# Step 1: Triage (if not already triaged)
# Try deterministic routing first ($0), fall back to Haiku triage ($0.001)
if tier is None:
tier = _deterministic_tier(diff)
if tier is not None:
db.audit(
conn, "evaluate", "deterministic_tier",
json.dumps({"pr": pr_number, "tier": tier}),
)
else:
tier, triage_usage, _triage_reason = await triage_pr(diff)
pr_cost += costs.record_usage(
conn, config.TRIAGE_MODEL, "eval_triage",
input_tokens=triage_usage.get("prompt_tokens", 0),
output_tokens=triage_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Tier overrides (claim-shape detector + random promotion)
# Order matters: claim-shape catches obvious cases, random promotion catches the rest.
# Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus)
if tier == "LIGHT" and _diff_contains_claim_type(diff):
tier = "STANDARD"
logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number)
db.audit(
conn, "evaluate", "claim_shape_upgrade", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"})
)
# Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio)
if tier == "LIGHT" and random.random() < config.LIGHT_PROMOTION_RATE:
tier = "STANDARD"
logger.info(
"PR #%d: random promotion LIGHT → STANDARD (%.0f%% rate)", pr_number, config.LIGHT_PROMOTION_RATE * 100
)
db.audit(conn, "evaluate", "random_promotion", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"}))
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
# Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
conn.execute(
"UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
(pr_number,),
)
# Check if domain review already completed (resuming after Leo rate limit)
existing = conn.execute("SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)).fetchone()
existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
_existing_leo_verdict = existing["leo_verdict"] if existing else "pending"
# Step 2: Domain review (GPT-4o via OpenRouter)
# LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback)
# Skip if already completed from a previous attempt
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
domain_usage = {"prompt_tokens": 0, "completion_tokens": 0}
leo_usage = {"prompt_tokens": 0, "completion_tokens": 0}
if tier == "LIGHT" and config.LIGHT_SKIP_LLM:
domain_verdict = "skipped"
logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number)
conn.execute(
"UPDATE prs SET domain_verdict = 'skipped', domain_model = 'none' WHERE number = ?",
(pr_number,),
)
elif existing_domain_verdict not in ("pending", None):
domain_verdict = existing_domain_verdict
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
else:
logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
domain_review, domain_usage = await run_domain_review(review_diff, files, domain or "general", agent)
if domain_review is None:
# OpenRouter failure (timeout, error) — revert to open for retry.
# NOT a rate limit — don't trigger 15-min backoff, just skip this PR.
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"}
domain_verdict = _parse_verdict(domain_review, agent)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
)
# Post domain review as comment (from agent's Forgejo account)
agent_tok = get_agent_token(agent)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": domain_review},
token=agent_tok,
)
# If domain review rejects, skip Leo review (save Opus)
if domain_verdict == "request_changes":
logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
domain_issues = _parse_issues(domain_review) if domain_review else []
conn.execute(
"""UPDATE prs SET status = 'open', leo_verdict = 'skipped',
last_error = 'domain review requested changes',
eval_issues = ?
WHERE number = ?""",
(json.dumps(domain_issues), pr_number),
)
db.audit(
conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues})
)
db.record_review(
conn, pr_number, "rejected",
domain=domain, agent=agent, reviewer=agent, reviewer_model="gpt-4o",
notes=(domain_review or "")[:4000],
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
if domain_verdict != "skipped":
pr_cost += costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {
"pr": pr_number,
"domain_verdict": domain_verdict,
"leo_verdict": "skipped",
"eval_attempts": eval_attempts,
}
# Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
leo_verdict = "skipped"
leo_review = None # Initialize — used later for issue extraction
if tier != "LIGHT":
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
leo_review, leo_usage = await run_leo_review(review_diff, files, tier)
if leo_review is None:
# DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
if domain_verdict != "skipped":
pr_cost += costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed"
return {"pr": pr_number, "skipped": True, "reason": reason}
leo_verdict = _parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
# Post Leo review as comment (from Leo's Forgejo account)
leo_tok = get_agent_token("Leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": leo_review},
token=leo_tok,
)
else:
# LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
# Step 4: Determine final verdict
# "skipped" counts as approve (LIGHT skips both reviews deliberately)
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
if both_approve:
# Get PR author for formal approvals
pr_info = await forgejo_api(
"GET",
repo_path(f"pulls/{pr_number}"),
)
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
# Submit formal Forgejo reviews (required for merge)
await _post_formal_approvals(pr_number, pr_author)
# Auto-merge agent PRs: if branch is NOT pipeline-owned, set auto_merge=1
# so the merge cycle picks it up without manual intervention.
branch_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
branch_name = branch_row["branch"] if branch_row else ""
is_agent_pr = not branch_name.startswith(PIPELINE_OWNED_PREFIXES)
conn.execute(
"UPDATE prs SET status = 'approved', domain = COALESCE(domain, ?), auto_merge = ? WHERE number = ?",
(domain, 1 if is_agent_pr else 0, pr_number),
)
db.audit(
conn,
"evaluate",
"approved",
json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent,
"auto_merge": is_agent_pr}),
)
db.record_review(
conn, pr_number, "approved",
domain=domain, agent=agent, reviewer="leo", reviewer_model="sonnet" if tier == "STANDARD" else "opus",
notes=(leo_review or "")[:4000] if leo_review else None,
)
if is_agent_pr:
logger.info("PR #%d: APPROVED + auto_merge (agent branch %s)", pr_number, branch_name)
else:
logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict)
else:
# Collect all issue tags from both reviews
all_issues = []
if domain_verdict == "request_changes" and domain_review is not None:
all_issues.extend(_parse_issues(domain_review))
if leo_verdict == "request_changes" and leo_review is not None:
all_issues.extend(_parse_issues(leo_review))
conn.execute(
"UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
(json.dumps(all_issues), pr_number),
)
# Store feedback for re-extraction path
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues}
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_number),
)
db.audit(
conn,
"evaluate",
"changes_requested",
json.dumps(
{"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}
),
)
db.record_review(
conn, pr_number, "approved-with-changes",
domain=domain, agent=agent, reviewer="leo",
reviewer_model="sonnet" if tier == "STANDARD" else "opus",
notes=(leo_review or domain_review or "")[:4000],
)
logger.info(
"PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)",
pr_number,
leo_verdict,
domain_verdict,
all_issues,
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
# Record cost (only for reviews that actually ran)
if domain_verdict != "skipped":
pr_cost += costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
    if leo_verdict != "skipped":
if tier == "DEEP":
pr_cost += costs.record_usage(
conn, config.EVAL_LEO_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="max",
)
else:
pr_cost += costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
if pr_cost > 0:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {
"pr": pr_number,
"tier": tier,
"domain": domain,
"leo_verdict": leo_verdict,
"domain_verdict": domain_verdict,
"approved": both_approve,
}
# ─── Rate limit backoff ───────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
_rate_limit_backoff_until: datetime | None = None
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Batch domain review ─────────────────────────────────────────────────
def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
"""Parse batched domain review into per-PR review sections.
Returns {pr_number: review_text} for each PR found in the response.
Missing PRs are omitted — caller handles fallback.
"""
agent_upper = agent.upper()
result: dict[int, str] = {}
# Split by PR verdict markers: <!-- PR:NNN VERDICT:AGENT:... -->
# Each marker terminates the previous PR's section
pattern = re.compile(
r"<!-- PR:(\d+) VERDICT:" + re.escape(agent_upper) + r":(APPROVE|REQUEST_CHANGES) -->"
)
matches = list(pattern.finditer(response))
if not matches:
return result
for i, match in enumerate(matches):
pr_num = int(match.group(1))
verdict = match.group(2)
marker_end = match.end()
# Find the start of this PR's section by looking for the section header
# or the end of the previous verdict
section_header = f"=== PR #{pr_num}"
header_pos = response.rfind(section_header, 0, match.start())
if header_pos >= 0:
# Extract from header to end of verdict marker
section_text = response[header_pos:marker_end].strip()
else:
# No header found — extract from previous marker end to this marker end
prev_end = matches[i - 1].end() if i > 0 else 0
section_text = response[prev_end:marker_end].strip()
# Re-format as individual review comment
# Strip the batch section header, keep just the review content
# Add batch label for traceability
pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers)
review_text = (
f"*(batch review with PRs {pr_nums_str})*\n\n"
f"{section_text}\n"
)
result[pr_num] = review_text
return result
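# Expected response shape (illustrative; VIDA stands in for agent.upper()):
#   === PR #101 ...
#   ...review prose...
#   <!-- PR:101 VERDICT:VIDA:APPROVE -->
#   === PR #102 ...
#   ...review prose...
#   <!-- PR:102 VERDICT:VIDA:REQUEST_CHANGES -->
# Each section is recovered by scanning back from its marker to the "=== PR #N" header.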
def _validate_batch_fanout(
parsed: dict[int, str],
pr_diffs: list[dict],
agent: str,
) -> tuple[dict[int, str], list[int]]:
"""Validate batch fan-out for completeness and cross-contamination.
Returns (valid_reviews, fallback_pr_numbers).
- valid_reviews: reviews that passed validation
- fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated)
"""
valid: dict[int, str] = {}
fallback: list[int] = []
# Build file map: pr_number → set of path segments for matching.
# Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames
# to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3).
pr_files: dict[int, set[str]] = {}
for pr in pr_diffs:
files = set()
for line in pr["diff"].split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
files.add(path)
# Also add the last 2 path segments (e.g., "internet-finance/dao.md")
# for models that abbreviate paths
parts = path.split("/")
if len(parts) >= 2:
files.add("/".join(parts[-2:]))
pr_files[pr["number"]] = files
for pr in pr_diffs:
pr_num = pr["number"]
# Completeness check: is there a review for this PR?
if pr_num not in parsed:
logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num)
fallback.append(pr_num)
continue
review = parsed[pr_num]
# Cross-contamination check: does review mention at least one file from this PR?
# Use path segments (min 10 chars) to avoid false substring matches on short names.
my_files = pr_files.get(pr_num, set())
mentions_own_file = any(f in review for f in my_files if len(f) >= 10)
if not mentions_own_file and my_files:
# Check if it references files from OTHER PRs (cross-contamination signal)
other_files = set()
for other_pr in pr_diffs:
if other_pr["number"] != pr_num:
other_files.update(pr_files.get(other_pr["number"], set()))
mentions_other = any(f in review for f in other_files if len(f) >= 10)
if mentions_other:
logger.warning(
"Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback",
pr_num,
)
fallback.append(pr_num)
continue
# If it doesn't mention any files at all, could be a generic review — accept it
# (some PRs have short diffs where the model doesn't reference filenames)
valid[pr_num] = review
return valid, fallback
async def _run_batch_domain_eval(
conn, batch_prs: list[dict], domain: str, agent: str,
) -> tuple[int, int]:
"""Execute batch domain review for a group of same-domain STANDARD PRs.
1. Claim all PRs atomically
2. Run single batch domain review
3. Parse + validate fan-out
4. Post per-PR comments
5. Continue to individual Leo review for each
6. Fall back to individual review for any validation failures
Returns (succeeded, failed).
"""
from .forgejo import get_pr_diff as _get_pr_diff
succeeded = 0
failed = 0
# Step 1: Fetch diffs and build batch
pr_diffs = []
claimed_prs = []
for pr_row in batch_prs:
pr_num = pr_row["number"]
# Atomic claim
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_num,),
)
if cursor.rowcount == 0:
continue
# Increment eval_attempts — skip if merge-cycled (Ganymede+Rhea)
mc_row = conn.execute("SELECT merge_cycled FROM prs WHERE number = ?", (pr_num,)).fetchone()
if mc_row and mc_row["merge_cycled"]:
conn.execute(
"UPDATE prs SET merge_cycled = 0, last_attempt = datetime('now') WHERE number = ?",
(pr_num,),
)
logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_num)
else:
conn.execute(
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1, "
"last_attempt = datetime('now') WHERE number = ?",
(pr_num,),
)
diff = await _get_pr_diff(pr_num)
if not diff:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
continue
# Musings bypass
if _is_musings_only(diff):
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
"UPDATE prs SET status = 'approved', leo_verdict = 'skipped', "
"domain_verdict = 'skipped', domain = COALESCE(domain, 'cross-domain'), "
"auto_merge = 1 WHERE number = ?",
(pr_num,),
)
succeeded += 1
continue
review_diff, _ = _filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
# Build label from branch name or first claim filename
branch = pr_row.get("branch", "")
label = branch.split("/")[-1][:60] if branch else f"pr-{pr_num}"
pr_diffs.append({
"number": pr_num,
"label": label,
"diff": review_diff,
"files": files,
"full_diff": diff, # kept for Leo review
"file_count": len([l for l in files.split("\n") if l.strip()]),
})
claimed_prs.append(pr_num)
if not pr_diffs:
return 0, 0
# Enforce BATCH_EVAL_MAX_DIFF_BYTES — split if total diff is too large.
# We only know diff sizes after fetching, so enforce here not in _build_domain_batches.
total_bytes = sum(len(p["diff"].encode()) for p in pr_diffs)
if total_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and len(pr_diffs) > 1:
# Keep PRs up to the byte cap, revert the rest to open for next cycle
kept = []
running_bytes = 0
for p in pr_diffs:
p_bytes = len(p["diff"].encode())
if running_bytes + p_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and kept:
break
kept.append(p)
running_bytes += p_bytes
overflow = [p for p in pr_diffs if p not in kept]
for p in overflow:
conn.execute(
"UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
"WHERE number = ?",
(p["number"],),
)
claimed_prs.remove(p["number"])
logger.info(
"PR #%d: diff too large for batch (%d bytes total), deferring to next cycle",
p["number"], total_bytes,
)
pr_diffs = kept
if not pr_diffs:
return 0, 0
# Detect domain for all PRs (should be same domain)
conn.execute(
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number IN ({})".format(
",".join("?" * len(claimed_prs))
),
[domain, agent] + claimed_prs,
)
# Step 2: Run batch domain review
logger.info(
"Batch domain review: %d PRs in %s domain (PRs: %s)",
len(pr_diffs),
domain,
", ".join(f"#{p['number']}" for p in pr_diffs),
)
batch_response, batch_domain_usage = await run_batch_domain_review(pr_diffs, domain, agent)
if batch_response is None:
# Complete failure — revert all to open
logger.warning("Batch domain review failed — reverting all PRs to open")
for pr_num in claimed_prs:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
return 0, len(claimed_prs)
# Step 3: Parse + validate fan-out
parsed = _parse_batch_response(batch_response, claimed_prs, agent)
valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent)
db.audit(
conn, "evaluate", "batch_domain_review",
json.dumps({
"domain": domain,
"batch_size": len(pr_diffs),
"valid": len(valid_reviews),
"fallback": fallback_prs,
}),
)
# Record batch domain review cost ONCE for the whole batch (not per-PR)
from . import costs
costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=batch_domain_usage.get("prompt_tokens", 0),
output_tokens=batch_domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Step 4: Process valid reviews — post comments + continue to Leo
for pr_data in pr_diffs:
pr_num = pr_data["number"]
if pr_num in fallback_prs:
# Revert — will be picked up by individual eval next cycle
conn.execute(
"UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
"WHERE number = ?",
(pr_num,),
)
logger.info("PR #%d: batch fallback — will retry individually", pr_num)
continue
if pr_num not in valid_reviews:
# Should not happen, but safety
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
continue
review_text = valid_reviews[pr_num]
domain_verdict = _parse_verdict(review_text, agent)
# Post domain review comment
agent_tok = get_agent_token(agent)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": review_text},
token=agent_tok,
)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
)
# If domain rejects, handle disposition (same as individual path)
if domain_verdict == "request_changes":
domain_issues = _parse_issues(review_text)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
conn.execute(
"UPDATE prs SET status = 'open', leo_verdict = 'skipped', "
"last_error = 'domain review requested changes', eval_issues = ? WHERE number = ?",
(json.dumps(domain_issues), pr_num),
)
db.audit(
conn, "evaluate", "domain_rejected",
json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}),
)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
succeeded += 1
continue
# Domain approved — continue to individual Leo review
logger.info("PR #%d: batch domain approved, proceeding to individual Leo review", pr_num)
review_diff = pr_data["diff"]
files = pr_data["files"]
leo_review, leo_usage = await run_leo_review(review_diff, files, "STANDARD")
if leo_review is None:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
logger.debug("PR #%d: Leo review failed, will retry next cycle", pr_num)
continue
if leo_review == "RATE_LIMITED":
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num)
continue
leo_verdict = _parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num))
# Post Leo review
leo_tok = get_agent_token("Leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": leo_review},
token=leo_tok,
)
costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Final verdict
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
if both_approve:
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}"))
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
await _post_formal_approvals(pr_num, pr_author)
conn.execute("UPDATE prs SET status = 'approved', domain = COALESCE(domain, ?), auto_merge = 1 WHERE number = ?", (domain or "cross-domain", pr_num,))
db.audit(
conn, "evaluate", "approved",
json.dumps({"pr": pr_num, "tier": "STANDARD", "domain": domain,
"leo": leo_verdict, "domain_agent": agent, "batch": True}),
)
logger.info("PR #%d: APPROVED (batch domain + individual Leo)", pr_num)
else:
all_issues = []
if leo_verdict == "request_changes":
all_issues.extend(_parse_issues(leo_review))
conn.execute(
"UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
(json.dumps(all_issues), pr_num),
)
feedback = {"leo": leo_verdict, "domain": domain_verdict,
"tier": "STANDARD", "issues": all_issues}
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_num),
)
db.audit(
conn, "evaluate", "changes_requested",
json.dumps({"pr": pr_num, "tier": "STANDARD", "leo": leo_verdict,
"domain": domain_verdict, "issues": all_issues, "batch": True}),
)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
succeeded += 1
return succeeded, failed
def _build_domain_batches(
rows: list, conn,
) -> tuple[dict[str, list[dict]], list[dict]]:
"""Group STANDARD PRs by domain for batch eval. DEEP and LIGHT stay individual.
Returns (batches_by_domain, individual_prs).
Respects BATCH_EVAL_MAX_PRS and BATCH_EVAL_MAX_DIFF_BYTES.
"""
domain_candidates: dict[str, list[dict]] = {}
individual: list[dict] = []
for row in rows:
pr_num = row["number"]
tier = row["tier"]
# Only batch STANDARD PRs with pending domain review
if tier != "STANDARD":
individual.append(row)
continue
# Check if domain review already done (resuming after Leo rate limit)
existing = conn.execute(
"SELECT domain_verdict, domain FROM prs WHERE number = ?", (pr_num,)
).fetchone()
if existing and existing["domain_verdict"] not in ("pending", None):
individual.append(row)
continue
        domain = existing["domain"] if existing and existing["domain"] else "general"
domain_candidates.setdefault(domain, []).append(row)
# Build sized batches per domain
batches: dict[str, list[dict]] = {}
for domain, prs in domain_candidates.items():
if len(prs) == 1:
# Single PR — no batching benefit, process individually
individual.extend(prs)
continue
# Cap at BATCH_EVAL_MAX_PRS
batch = prs[: config.BATCH_EVAL_MAX_PRS]
batches[domain] = batch
# Overflow goes individual
individual.extend(prs[config.BATCH_EVAL_MAX_PRS :])
return batches, individual
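# Example: six open STANDARD PRs, four in "living-capital" plus two lone domains,
# yield one living-capital batch (capped at BATCH_EVAL_MAX_PRS) and two individual
# rows; DEEP and LIGHT rows are always individual.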
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one evaluation cycle.
Groups eligible STANDARD PRs by domain for batch domain review.
    DEEP PRs get individual eval. LIGHT PRs skip LLM review (when LIGHT_SKIP_LLM)
    and approve on the skipped verdicts.
Leo review always individual (safety net for batch cross-contamination).
"""
global _rate_limit_backoff_until
# Check if we're in Opus rate-limit backoff
opus_backoff = False
if _rate_limit_backoff_until is not None:
now = datetime.now(timezone.utc)
if now < _rate_limit_backoff_until:
remaining = int((_rate_limit_backoff_until - now).total_seconds())
logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
opus_backoff = True
else:
logger.info("Rate limit backoff expired, resuming full eval cycles")
_rate_limit_backoff_until = None
# Find PRs ready for evaluation
if opus_backoff:
verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))"
else:
verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
# Stagger removed — migration protection no longer needed. Merge is domain-serialized
# and entity conflicts auto-resolve. Safe to let all eligible PRs enter eval. (Cory, Mar 14)
rows = conn.execute(
f"""SELECT p.number, p.tier, p.branch, p.domain FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass = 1
AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS}
{verdict_filter}
AND (p.last_attempt IS NULL
OR p.last_attempt < datetime('now', '-10 minutes'))
ORDER BY
CASE WHEN COALESCE(p.eval_attempts, 0) = 0 THEN 0 ELSE 1 END,
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
p.created_at ASC
LIMIT ?""",
(max_workers or config.MAX_EVAL_WORKERS,),
).fetchall()
if not rows:
return 0, 0
succeeded = 0
failed = 0
# Group STANDARD PRs by domain for batch eval
domain_batches, individual_prs = _build_domain_batches(rows, conn)
# Process batch domain reviews first
for domain, batch_prs in domain_batches.items():
try:
agent = agent_for_domain(domain)
b_succeeded, b_failed = await _run_batch_domain_eval(
conn, batch_prs, domain, agent,
)
succeeded += b_succeeded
failed += b_failed
except Exception:
logger.exception("Batch eval failed for domain %s", domain)
# Revert all to open
for pr_row in batch_prs:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_row["number"],))
failed += len(batch_prs)
# Process individual PRs (DEEP, LIGHT, single-domain, fallback)
for row in individual_prs:
try:
if opus_backoff and row["tier"] == "DEEP":
existing = conn.execute(
"SELECT domain_verdict FROM prs WHERE number = ?",
(row["number"],),
).fetchone()
if existing and existing["domain_verdict"] not in ("pending", None):
logger.debug(
"PR #%d: skipping DEEP during Opus backoff (domain already %s)",
row["number"],
existing["domain_verdict"],
)
continue
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
if result.get("skipped"):
reason = result.get("reason", "")
logger.debug("PR #%d skipped: %s", row["number"], reason)
if "rate_limited" in reason:
from datetime import timedelta
if reason == "opus_rate_limited":
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
opus_backoff = True
logger.info(
"Opus rate limited — backing off Opus for %d min, continuing triage+domain",
_RATE_LIMIT_BACKOFF_MINUTES,
)
continue
else:
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
logger.info(
"Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
)
break
else:
succeeded += 1
except Exception:
logger.exception("Failed to evaluate PR #%d", row["number"])
failed += 1
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
if succeeded or failed:
logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
return succeeded, failed
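# Minimal driver sketch (illustrative; the real scheduler lives outside this module,
# and `db.connect()` is an assumed helper name):
#
#     import asyncio
#     from lib import db
#     from lib.evaluate import evaluate_cycle
#
#     async def main():
#         conn = db.connect()
#         evaluated, errors = await evaluate_cycle(conn)
#         print(f"evaluated={evaluated} errors={errors}")
#
#     asyncio.run(main())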