teleo-codex/ops/pipeline-v2/lib/evaluate.py
m3taversal 2c0d428dc0 Add Phase 1+2 instrumentation: review records, cascade automation, cross-domain index, agent state
Phase 1 — Audit logging infrastructure:
- review_records table (migration v12) capturing every eval verdict with outcome, rejection reason, disagreement type
- Cascade automation: auto-flag dependent beliefs/positions when merged claims change
- Merge frontmatter stamps: last_review metadata on merged claim files

Phase 2 — Cross-domain and state tracking:
- Cross-domain citation index: entity overlap detection across domains on every merge
- Agent-state schema v1: file-backed state for VPS agents (memory, tasks, inbox, metrics)
- Cascade completion tracking: process-cascade-inbox.py logs review outcomes
- research-session.sh: state hooks + cascade processing integration

All changes are live on VPS. This commit brings the code under version control for review.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 10:50:49 +00:00

1465 lines
59 KiB
Python

"""Evaluate stage — PR lifecycle orchestration.
Tier-based review routing. Model diversity: GPT-4o (domain) + Sonnet (Leo STANDARD)
+ Opus (Leo DEEP) = two model families, no correlated blind spots.
Flow per PR:
1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT
2. Tier overrides:
a. Claim-shape detector: type: claim in YAML → STANDARD min (Theseus)
b. Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio)
3. Domain review → GPT-4o (OpenRouter) — skipped for LIGHT when LIGHT_SKIP_LLM=True
4. Leo review → Opus DEEP / Sonnet STANDARD (OpenRouter) — skipped for LIGHT
5. Post reviews, submit formal Forgejo approvals, update SQLite
6. If both approve → status = 'approved' (merge module picks it up)
7. Retry budget: 3 attempts max, disposition on attempt 2+
Design reviewed by Ganymede, Rio, Theseus, Rhea, Leo.
LLM transport and prompts extracted to lib/llm.py (Phase 3c).
"""
import json
import logging
import random
import re
from datetime import datetime, timezone
from . import config, db
from .domains import agent_for_domain, detect_domain_from_diff
from .forgejo import api as forgejo_api
from .forgejo import get_agent_token, get_pr_diff, repo_path
from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr
from .feedback import format_rejection_comment
from .validate import load_existing_claims
logger = logging.getLogger("pipeline.evaluate")
# ─── Diff helpers ──────────────────────────────────────────────────────────
def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
Strips: inbox/, schemas/, skills/, agents/*/musings/
"""
sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE)
skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"]
core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"}
claim_sections = []
entity_sections = []
for section in sections:
if not section.strip():
continue
if any(re.match(p, section) for p in skip_patterns):
continue
entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section)
if entity_match and entity_match.group(1) not in core_domains:
entity_sections.append(section)
continue
claim_sections.append(section)
return "".join(claim_sections), "".join(entity_sections)
def _extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git")
)
def _is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
for line in diff.split("\n"):
if line.startswith("diff --git"):
if "agents/" in line and "/musings/" in line:
has_musings = True
else:
has_other = True
return has_musings and not has_other
# ─── NOTE: Tier 0.5 mechanical pre-check moved to validate.py ────────────
# Tier 0.5 now runs as part of the validate stage (before eval), not inside
# evaluate_pr(). This prevents wasting eval_attempts on mechanically fixable
# PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed.
# ─── Tier overrides ───────────────────────────────────────────────────────
def _diff_contains_claim_type(diff: str) -> bool:
"""Claim-shape detector: check if any file in diff has type: claim in frontmatter.
Mechanical check ($0). If YAML declares type: claim, this is a factual claim —
not an entity update or formatting fix. Must be classified STANDARD minimum
regardless of Haiku triage. Catches factual claims disguised as LIGHT content.
(Theseus: converts semantic problem to mechanical check)
"""
for line in diff.split("\n"):
if line.startswith("+") and not line.startswith("+++"):
stripped = line[1:].strip()
if stripped in ("type: claim", 'type: "claim"', "type: 'claim'"):
return True
return False
def _deterministic_tier(diff: str) -> str | None:
"""Deterministic tier routing — skip Haiku triage for obvious cases.
Checks diff file patterns before calling the LLM. Returns tier string
if deterministic, None if Haiku triage is needed.
Rules (Leo-calibrated):
- All files in entities/ only → LIGHT
- All files in inbox/ only (queue, archive, null-result) → LIGHT
- Any file in core/ or foundations/ → DEEP (structural KB changes)
- Has challenged_by field → DEEP (challenges existing claims)
- Modifies existing file (not new) in domains/ → DEEP (enrichment/change)
- Otherwise → None (needs Haiku triage)
NOTE: Cross-domain wiki links are NOT a DEEP signal — most claims link
across domains, that's the whole point of the knowledge graph (Leo).
"""
changed_files = []
for line in diff.split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
changed_files.append(path)
if not changed_files:
return None
# All entities/ only → LIGHT
if all(f.startswith("entities/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in entities/)")
return "LIGHT"
# All inbox/ only (queue, archive, null-result) → LIGHT
if all(f.startswith("inbox/") for f in changed_files):
logger.info("Deterministic tier: LIGHT (all files in inbox/)")
return "LIGHT"
# Any file in core/ or foundations/ → DEEP (structural KB changes)
if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files):
logger.info("Deterministic tier: DEEP (touches core/ or foundations/)")
return "DEEP"
# Check diff content for DEEP signals
has_challenged_by = False
has_modified_claim = False
new_files: set[str] = set()
lines = diff.split("\n")
for i, line in enumerate(lines):
# Detect new files
if line.startswith("--- /dev/null") and i + 1 < len(lines) and lines[i + 1].startswith("+++ b/"):
new_files.add(lines[i + 1][6:])
# Check for challenged_by field
if line.startswith("+") and not line.startswith("+++"):
stripped = line[1:].strip()
if stripped.startswith("challenged_by:"):
has_challenged_by = True
if has_challenged_by:
logger.info("Deterministic tier: DEEP (has challenged_by field)")
return "DEEP"
# NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments
# (appending evidence) are common and should be STANDARD. Let Haiku triage
# distinguish enrichments from structural changes.
return None
# ─── Verdict parsing ──────────────────────────────────────────────────────
def _parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
return "approve"
elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
return "request_changes"
else:
logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
return "request_changes"
# Map model-invented tags to valid tags. Models consistently ignore the valid
# tag list and invent their own. This normalizes them. (Ganymede, Mar 14)
_TAG_ALIASES: dict[str, str] = {
"schema_violation": "frontmatter_schema",
"missing_schema_fields": "frontmatter_schema",
"missing_schema": "frontmatter_schema",
"schema": "frontmatter_schema",
"missing_frontmatter": "frontmatter_schema",
"redundancy": "near_duplicate",
"duplicate": "near_duplicate",
"missing_confidence": "confidence_miscalibration",
"confidence_error": "confidence_miscalibration",
"vague_claims": "scope_error",
"unfalsifiable": "scope_error",
"unverified_wiki_links": "broken_wiki_links",
"unverified-wiki-links": "broken_wiki_links",
"missing_wiki_links": "broken_wiki_links",
"invalid_wiki_links": "broken_wiki_links",
"wiki_link_errors": "broken_wiki_links",
"overclaiming": "title_overclaims",
"title_overclaim": "title_overclaims",
"date_error": "date_errors",
"factual_error": "factual_discrepancy",
"factual_inaccuracy": "factual_discrepancy",
}
VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims",
"confidence_miscalibration", "date_errors", "factual_discrepancy",
"near_duplicate", "scope_error"}
def _normalize_tag(tag: str) -> str | None:
"""Normalize a model-generated tag to a valid tag, or None if unrecognizable."""
tag = tag.strip().lower().replace("-", "_")
if tag in VALID_ISSUE_TAGS:
return tag
if tag in _TAG_ALIASES:
return _TAG_ALIASES[tag]
# Fuzzy: check if any valid tag is a substring or vice versa
for valid in VALID_ISSUE_TAGS:
if valid in tag or tag in valid:
return valid
return None
def _parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review.
First tries structured <!-- ISSUES: tag1, tag2 --> comment with tag normalization.
Falls back to keyword inference from prose.
"""
match = re.search(r"<!-- ISSUES: ([^>]+) -->", review_text)
if match:
raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
normalized = []
for tag in raw_tags:
norm = _normalize_tag(tag)
if norm and norm not in normalized:
normalized.append(norm)
else:
logger.debug("Unrecognized issue tag '%s' — dropped", tag)
if normalized:
return normalized
# Fallback: infer tags from review prose
return _infer_issues_from_prose(review_text)
# Keyword patterns for inferring issue tags from unstructured review prose.
# Conservative: only match unambiguous indicators. Order doesn't matter.
_PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = {
"frontmatter_schema": [
re.compile(r"frontmatter", re.IGNORECASE),
re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE),
re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE),
re.compile(r"required field", re.IGNORECASE),
re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE),
re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE),
re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE),
],
"broken_wiki_links": [
re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE),
re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE),
re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE),
re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE),
],
"factual_discrepancy": [
re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE),
re.compile(r"misrepresent", re.IGNORECASE),
],
"confidence_miscalibration": [
re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE),
re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE),
],
"scope_error": [
re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE),
re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE),
re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE),
re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE),
],
"title_overclaims": [
re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE),
re.compile(r"overclaim", re.IGNORECASE),
],
"near_duplicate": [
re.compile(r"near.?duplicate", re.IGNORECASE),
re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE),
re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE),
re.compile(r"redundan", re.IGNORECASE),
],
}
def _infer_issues_from_prose(review_text: str) -> list[str]:
"""Infer issue tags from unstructured review text via keyword matching.
Fallback for reviews that reject without structured <!-- ISSUES: --> tags.
Conservative: requires at least one unambiguous keyword match per tag.
"""
inferred = []
for tag, patterns in _PROSE_TAG_PATTERNS.items():
if any(p.search(review_text) for p in patterns):
inferred.append(tag)
return inferred
async def _post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
# ─── Retry budget helpers ─────────────────────────────────────────────────
async def _terminate_pr(conn, pr_number: int, reason: str):
"""Terminal state: close PR on Forgejo, mark source needs_human."""
# Get issue tags for structured feedback
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
issues = []
if row and row["eval_issues"]:
try:
issues = json.loads(row["eval_issues"])
except (json.JSONDecodeError, TypeError):
pass
# Post structured rejection comment with quality gate guidance (Epimetheus)
if issues:
feedback_body = format_rejection_comment(issues, source="eval_terminal")
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback.\n\n"
f"{feedback_body}"
)
else:
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback."
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": comment_body},
)
await forgejo_api(
"PATCH",
repo_path(f"pulls/{pr_number}"),
{"state": "closed"},
)
# Update PR status
conn.execute(
"UPDATE prs SET status = 'closed', last_error = ? WHERE number = ?",
(reason, pr_number),
)
# Tag source for re-extraction with feedback
cursor = conn.execute(
"""UPDATE sources SET status = 'needs_reextraction',
updated_at = datetime('now')
WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
(pr_number,),
)
if cursor.rowcount == 0:
logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
db.audit(
conn,
"evaluate",
"pr_terminated",
json.dumps(
{
"pr": pr_number,
"reason": reason,
}
),
)
logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
def _classify_issues(issues: list[str]) -> str:
"""Classify issue tags as 'mechanical', 'substantive', or 'mixed'."""
if not issues:
return "unknown"
mechanical = set(issues) & config.MECHANICAL_ISSUE_TAGS
substantive = set(issues) & config.SUBSTANTIVE_ISSUE_TAGS
if substantive and not mechanical:
return "substantive"
if mechanical and not substantive:
return "mechanical"
if mechanical and substantive:
return "mixed"
return "unknown" # tags not in either set
async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
"""Disposition logic for rejected PRs on attempt 2+.
Attempt 1: normal — back to open, wait for fix.
Attempt 2: check issue classification.
- Mechanical only: keep open for one more attempt (auto-fix future).
- Substantive or mixed: close PR, requeue source.
Attempt 3+: terminal.
"""
if eval_attempts < 2:
# Attempt 1: post structured feedback so agent learns, but don't close
if all_issues:
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": feedback_body},
)
return
classification = _classify_issues(all_issues)
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal
await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
return
if classification == "mechanical":
# Mechanical issues only — keep open for one more attempt.
# Future: auto-fix module will push fixes here.
logger.info(
"PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
pr_number,
eval_attempts,
all_issues,
)
db.audit(
conn,
"evaluate",
"mechanical_retry",
json.dumps(
{
"pr": pr_number,
"attempt": eval_attempts,
"issues": all_issues,
}
),
)
else:
# Substantive, mixed, or unknown — close and requeue
logger.info(
"PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
pr_number,
eval_attempts,
classification,
all_issues,
)
await _terminate_pr(
conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}"
)
# ─── Single PR evaluation ─────────────────────────────────────────────────
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
"""Evaluate a single PR. Returns result dict."""
# Check eval attempt budget before claiming
row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone()
eval_attempts = (row["eval_attempts"] or 0) if row else 0
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal — hard cap reached. Close PR, tag source.
logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS)
await _terminate_pr(conn, pr_number, "eval budget exhausted")
return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"}
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_number,),
)
if cursor.rowcount == 0:
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
# Increment eval_attempts — but not if this is a merge-failure re-entry (Ganymede+Rhea)
merge_cycled = conn.execute(
"SELECT merge_cycled FROM prs WHERE number = ?", (pr_number,)
).fetchone()
if merge_cycled and merge_cycled["merge_cycled"]:
# Merge cycling — don't burn eval budget, clear flag
conn.execute("UPDATE prs SET merge_cycled = 0 WHERE number = ?", (pr_number,))
logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_number)
else:
conn.execute(
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?",
(pr_number,),
)
eval_attempts += 1
# Fetch diff
diff = await get_pr_diff(pr_number)
if not diff:
# Close PRs with no diff — stale branch, nothing to evaluate
conn.execute("UPDATE prs SET status='closed', last_error='closed: no diff against main (stale branch)' WHERE number = ?", (pr_number,))
return {"pr": pr_number, "skipped": True, "reason": "no_diff_closed"}
# Musings bypass
if _is_musings_only(diff):
logger.info("PR #%d is musings-only — auto-approving", pr_number)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
"""UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
domain_verdict = 'skipped' WHERE number = ?""",
(pr_number,),
)
return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
# NOTE: Tier 0.5 mechanical checks now run in validate stage (before eval).
# tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here.
# Filter diff
review_diff, _entity_diff = _filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
# Detect domain
domain = detect_domain_from_diff(diff)
agent = agent_for_domain(domain)
# Default NULL domain to 'general' (archive-only PRs have no domain files)
if domain is None:
domain = "general"
# Update PR domain if not set
conn.execute(
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
(domain, agent, pr_number),
)
# Step 1: Triage (if not already triaged)
# Try deterministic routing first ($0), fall back to Haiku triage ($0.001)
if tier is None:
tier = _deterministic_tier(diff)
if tier is not None:
db.audit(
conn, "evaluate", "deterministic_tier",
json.dumps({"pr": pr_number, "tier": tier}),
)
else:
tier, triage_usage = await triage_pr(diff)
# Record triage cost
from . import costs
costs.record_usage(
conn, config.TRIAGE_MODEL, "eval_triage",
input_tokens=triage_usage.get("prompt_tokens", 0),
output_tokens=triage_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Tier overrides (claim-shape detector + random promotion)
# Order matters: claim-shape catches obvious cases, random promotion catches the rest.
# Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus)
if tier == "LIGHT" and _diff_contains_claim_type(diff):
tier = "STANDARD"
logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number)
db.audit(
conn, "evaluate", "claim_shape_upgrade", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"})
)
# Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio)
if tier == "LIGHT" and random.random() < config.LIGHT_PROMOTION_RATE:
tier = "STANDARD"
logger.info(
"PR #%d: random promotion LIGHT → STANDARD (%.0f%% rate)", pr_number, config.LIGHT_PROMOTION_RATE * 100
)
db.audit(conn, "evaluate", "random_promotion", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"}))
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
# Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
conn.execute(
"UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
(pr_number,),
)
# Check if domain review already completed (resuming after Leo rate limit)
existing = conn.execute("SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)).fetchone()
existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
_existing_leo_verdict = existing["leo_verdict"] if existing else "pending"
# Step 2: Domain review (GPT-4o via OpenRouter)
# LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback)
# Skip if already completed from a previous attempt
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
domain_usage = {"prompt_tokens": 0, "completion_tokens": 0}
leo_usage = {"prompt_tokens": 0, "completion_tokens": 0}
if tier == "LIGHT" and config.LIGHT_SKIP_LLM:
domain_verdict = "skipped"
logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number)
conn.execute(
"UPDATE prs SET domain_verdict = 'skipped', domain_model = 'none' WHERE number = ?",
(pr_number,),
)
elif existing_domain_verdict not in ("pending", None):
domain_verdict = existing_domain_verdict
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
else:
logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
domain_review, domain_usage = await run_domain_review(review_diff, files, domain or "general", agent)
if domain_review is None:
# OpenRouter failure (timeout, error) — revert to open for retry.
# NOT a rate limit — don't trigger 15-min backoff, just skip this PR.
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"}
domain_verdict = _parse_verdict(domain_review, agent)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
)
# Post domain review as comment (from agent's Forgejo account)
agent_tok = get_agent_token(agent)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": domain_review},
token=agent_tok,
)
# If domain review rejects, skip Leo review (save Opus)
if domain_verdict == "request_changes":
logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
domain_issues = _parse_issues(domain_review) if domain_review else []
conn.execute(
"""UPDATE prs SET status = 'open', leo_verdict = 'skipped',
last_error = 'domain review requested changes',
eval_issues = ?
WHERE number = ?""",
(json.dumps(domain_issues), pr_number),
)
db.audit(
conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues})
)
# Record structured review outcome
claim_files = [f for f in files if any(f.startswith(d) for d in ("domains/", "core/", "foundations/", "decisions/"))]
db.record_review(
conn, pr_number, reviewer=agent, outcome="rejected",
domain=domain, agent=agent, reviewer_model=config.EVAL_DOMAIN_MODEL,
rejection_reason=None, # TODO: parse from domain_issues when Leo starts tagging
notes=json.dumps(domain_issues) if domain_issues else None,
claims_in_batch=max(len(claim_files), 1),
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
return {
"pr": pr_number,
"domain_verdict": domain_verdict,
"leo_verdict": "skipped",
"eval_attempts": eval_attempts,
}
# Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
leo_verdict = "skipped"
leo_review = None # Initialize — used later for issue extraction
if tier != "LIGHT":
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
leo_review, leo_usage = await run_leo_review(review_diff, files, tier)
if leo_review is None:
# DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed"
return {"pr": pr_number, "skipped": True, "reason": reason}
leo_verdict = _parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
# Post Leo review as comment (from Leo's Forgejo account)
leo_tok = get_agent_token("Leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": leo_review},
token=leo_tok,
)
else:
# LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
# Step 4: Determine final verdict
# "skipped" counts as approve (LIGHT skips both reviews deliberately)
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
if both_approve:
# Get PR author for formal approvals
pr_info = await forgejo_api(
"GET",
repo_path(f"pulls/{pr_number}"),
)
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
# Submit formal Forgejo reviews (required for merge)
await _post_formal_approvals(pr_number, pr_author)
conn.execute(
"UPDATE prs SET status = 'approved' WHERE number = ?",
(pr_number,),
)
db.audit(
conn,
"evaluate",
"approved",
json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent}),
)
logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict)
# Record structured review outcome
claim_files = [f for f in files if any(f.startswith(d) for d in ("domains/", "core/", "foundations/", "decisions/"))]
db.record_review(
conn, pr_number, reviewer="leo", outcome="approved",
domain=domain, agent=agent,
reviewer_model=config.MODEL_SONNET if tier == "STANDARD" else "opus",
claims_in_batch=max(len(claim_files), 1),
)
else:
# Collect all issue tags from both reviews
all_issues = []
if domain_verdict == "request_changes" and domain_review is not None:
all_issues.extend(_parse_issues(domain_review))
if leo_verdict == "request_changes" and leo_review is not None:
all_issues.extend(_parse_issues(leo_review))
conn.execute(
"UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
(json.dumps(all_issues), pr_number),
)
# Store feedback for re-extraction path
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues}
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_number),
)
db.audit(
conn,
"evaluate",
"changes_requested",
json.dumps(
{"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}
),
)
# Record structured review outcome for Leo rejection
claim_files = [f for f in files if any(f.startswith(d) for d in ("domains/", "core/", "foundations/", "decisions/"))]
reviewer = "leo" if leo_verdict == "request_changes" else agent
db.record_review(
conn, pr_number, reviewer=reviewer, outcome="rejected",
domain=domain, agent=agent,
reviewer_model=config.MODEL_SONNET if tier == "STANDARD" else "opus",
notes=json.dumps(all_issues) if all_issues else None,
claims_in_batch=max(len(claim_files), 1),
)
logger.info(
"PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)",
pr_number,
leo_verdict,
domain_verdict,
all_issues,
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
# Record cost (only for reviews that actually ran)
from . import costs
if domain_verdict != "skipped":
costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=domain_usage.get("prompt_tokens", 0),
output_tokens=domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
if leo_verdict not in ("skipped",):
if tier == "DEEP":
costs.record_usage(
conn, config.EVAL_LEO_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="max",
duration_ms=leo_usage.get("duration_ms", 0),
cache_read_tokens=leo_usage.get("cache_read_tokens", 0),
cache_write_tokens=leo_usage.get("cache_write_tokens", 0),
cost_estimate_usd=leo_usage.get("cost_estimate_usd", 0.0),
)
else:
costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
return {
"pr": pr_number,
"tier": tier,
"domain": domain,
"leo_verdict": leo_verdict,
"domain_verdict": domain_verdict,
"approved": both_approve,
}
# ─── Rate limit backoff ───────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
_rate_limit_backoff_until: datetime | None = None
_RATE_LIMIT_BACKOFF_MINUTES = 15
# ─── Batch domain review ─────────────────────────────────────────────────
def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
"""Parse batched domain review into per-PR review sections.
Returns {pr_number: review_text} for each PR found in the response.
Missing PRs are omitted — caller handles fallback.
"""
agent_upper = agent.upper()
result: dict[int, str] = {}
# Split by PR verdict markers: <!-- PR:NNN VERDICT:AGENT:... -->
# Each marker terminates the previous PR's section
pattern = re.compile(
r"<!-- PR:(\d+) VERDICT:" + re.escape(agent_upper) + r":(APPROVE|REQUEST_CHANGES) -->"
)
matches = list(pattern.finditer(response))
if not matches:
return result
for i, match in enumerate(matches):
pr_num = int(match.group(1))
verdict = match.group(2)
marker_end = match.end()
# Find the start of this PR's section by looking for the section header
# or the end of the previous verdict
section_header = f"=== PR #{pr_num}"
header_pos = response.rfind(section_header, 0, match.start())
if header_pos >= 0:
# Extract from header to end of verdict marker
section_text = response[header_pos:marker_end].strip()
else:
# No header found — extract from previous marker end to this marker end
prev_end = matches[i - 1].end() if i > 0 else 0
section_text = response[prev_end:marker_end].strip()
# Re-format as individual review comment
# Strip the batch section header, keep just the review content
# Add batch label for traceability
pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers)
review_text = (
f"*(batch review with PRs {pr_nums_str})*\n\n"
f"{section_text}\n"
)
result[pr_num] = review_text
return result
def _validate_batch_fanout(
parsed: dict[int, str],
pr_diffs: list[dict],
agent: str,
) -> tuple[dict[int, str], list[int]]:
"""Validate batch fan-out for completeness and cross-contamination.
Returns (valid_reviews, fallback_pr_numbers).
- valid_reviews: reviews that passed validation
- fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated)
"""
valid: dict[int, str] = {}
fallback: list[int] = []
# Build file map: pr_number → set of path segments for matching.
# Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames
# to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3).
pr_files: dict[int, set[str]] = {}
for pr in pr_diffs:
files = set()
for line in pr["diff"].split("\n"):
if line.startswith("diff --git a/"):
path = line.replace("diff --git a/", "").split(" b/")[0]
files.add(path)
# Also add the last 2 path segments (e.g., "internet-finance/dao.md")
# for models that abbreviate paths
parts = path.split("/")
if len(parts) >= 2:
files.add("/".join(parts[-2:]))
pr_files[pr["number"]] = files
for pr in pr_diffs:
pr_num = pr["number"]
# Completeness check: is there a review for this PR?
if pr_num not in parsed:
logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num)
fallback.append(pr_num)
continue
review = parsed[pr_num]
# Cross-contamination check: does review mention at least one file from this PR?
# Use path segments (min 10 chars) to avoid false substring matches on short names.
my_files = pr_files.get(pr_num, set())
mentions_own_file = any(f in review for f in my_files if len(f) >= 10)
if not mentions_own_file and my_files:
# Check if it references files from OTHER PRs (cross-contamination signal)
other_files = set()
for other_pr in pr_diffs:
if other_pr["number"] != pr_num:
other_files.update(pr_files.get(other_pr["number"], set()))
mentions_other = any(f in review for f in other_files if len(f) >= 10)
if mentions_other:
logger.warning(
"Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback",
pr_num,
)
fallback.append(pr_num)
continue
# If it doesn't mention any files at all, could be a generic review — accept it
# (some PRs have short diffs where the model doesn't reference filenames)
valid[pr_num] = review
return valid, fallback
async def _run_batch_domain_eval(
conn, batch_prs: list[dict], domain: str, agent: str,
) -> tuple[int, int]:
"""Execute batch domain review for a group of same-domain STANDARD PRs.
1. Claim all PRs atomically
2. Run single batch domain review
3. Parse + validate fan-out
4. Post per-PR comments
5. Continue to individual Leo review for each
6. Fall back to individual review for any validation failures
Returns (succeeded, failed).
"""
from .forgejo import get_pr_diff as _get_pr_diff
succeeded = 0
failed = 0
# Step 1: Fetch diffs and build batch
pr_diffs = []
claimed_prs = []
for pr_row in batch_prs:
pr_num = pr_row["number"]
# Atomic claim
cursor = conn.execute(
"UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
(pr_num,),
)
if cursor.rowcount == 0:
continue
# Increment eval_attempts — skip if merge-cycled (Ganymede+Rhea)
mc_row = conn.execute("SELECT merge_cycled FROM prs WHERE number = ?", (pr_num,)).fetchone()
if mc_row and mc_row["merge_cycled"]:
conn.execute(
"UPDATE prs SET merge_cycled = 0, last_attempt = datetime('now') WHERE number = ?",
(pr_num,),
)
logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_num)
else:
conn.execute(
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1, "
"last_attempt = datetime('now') WHERE number = ?",
(pr_num,),
)
diff = await _get_pr_diff(pr_num)
if not diff:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
continue
# Musings bypass
if _is_musings_only(diff):
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
"UPDATE prs SET status = 'approved', leo_verdict = 'skipped', "
"domain_verdict = 'skipped' WHERE number = ?",
(pr_num,),
)
succeeded += 1
continue
review_diff, _ = _filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
# Build label from branch name or first claim filename
branch = pr_row.get("branch", "")
label = branch.split("/")[-1][:60] if branch else f"pr-{pr_num}"
pr_diffs.append({
"number": pr_num,
"label": label,
"diff": review_diff,
"files": files,
"full_diff": diff, # kept for Leo review
"file_count": len([l for l in files.split("\n") if l.strip()]),
})
claimed_prs.append(pr_num)
if not pr_diffs:
return 0, 0
# Enforce BATCH_EVAL_MAX_DIFF_BYTES — split if total diff is too large.
# We only know diff sizes after fetching, so enforce here not in _build_domain_batches.
total_bytes = sum(len(p["diff"].encode()) for p in pr_diffs)
if total_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and len(pr_diffs) > 1:
# Keep PRs up to the byte cap, revert the rest to open for next cycle
kept = []
running_bytes = 0
for p in pr_diffs:
p_bytes = len(p["diff"].encode())
if running_bytes + p_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and kept:
break
kept.append(p)
running_bytes += p_bytes
overflow = [p for p in pr_diffs if p not in kept]
for p in overflow:
conn.execute(
"UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
"WHERE number = ?",
(p["number"],),
)
claimed_prs.remove(p["number"])
logger.info(
"PR #%d: diff too large for batch (%d bytes total), deferring to next cycle",
p["number"], total_bytes,
)
pr_diffs = kept
if not pr_diffs:
return 0, 0
# Detect domain for all PRs (should be same domain)
conn.execute(
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number IN ({})".format(
",".join("?" * len(claimed_prs))
),
[domain, agent] + claimed_prs,
)
# Step 2: Run batch domain review
logger.info(
"Batch domain review: %d PRs in %s domain (PRs: %s)",
len(pr_diffs),
domain,
", ".join(f"#{p['number']}" for p in pr_diffs),
)
batch_response, batch_domain_usage = await run_batch_domain_review(pr_diffs, domain, agent)
if batch_response is None:
# Complete failure — revert all to open
logger.warning("Batch domain review failed — reverting all PRs to open")
for pr_num in claimed_prs:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
return 0, len(claimed_prs)
# Step 3: Parse + validate fan-out
parsed = _parse_batch_response(batch_response, claimed_prs, agent)
valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent)
db.audit(
conn, "evaluate", "batch_domain_review",
json.dumps({
"domain": domain,
"batch_size": len(pr_diffs),
"valid": len(valid_reviews),
"fallback": fallback_prs,
}),
)
# Record batch domain review cost ONCE for the whole batch (not per-PR)
from . import costs
costs.record_usage(
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
input_tokens=batch_domain_usage.get("prompt_tokens", 0),
output_tokens=batch_domain_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Step 4: Process valid reviews — post comments + continue to Leo
for pr_data in pr_diffs:
pr_num = pr_data["number"]
if pr_num in fallback_prs:
# Revert — will be picked up by individual eval next cycle
conn.execute(
"UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
"WHERE number = ?",
(pr_num,),
)
logger.info("PR #%d: batch fallback — will retry individually", pr_num)
continue
if pr_num not in valid_reviews:
# Should not happen, but safety
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
continue
review_text = valid_reviews[pr_num]
domain_verdict = _parse_verdict(review_text, agent)
# Post domain review comment
agent_tok = get_agent_token(agent)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": review_text},
token=agent_tok,
)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
)
# If domain rejects, handle disposition (same as individual path)
if domain_verdict == "request_changes":
domain_issues = _parse_issues(review_text)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
conn.execute(
"UPDATE prs SET status = 'open', leo_verdict = 'skipped', "
"last_error = 'domain review requested changes', eval_issues = ? WHERE number = ?",
(json.dumps(domain_issues), pr_num),
)
db.audit(
conn, "evaluate", "domain_rejected",
json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}),
)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
succeeded += 1
continue
# Domain approved — continue to individual Leo review
logger.info("PR #%d: batch domain approved, proceeding to individual Leo review", pr_num)
review_diff = pr_data["diff"]
files = pr_data["files"]
leo_review, leo_usage = await run_leo_review(review_diff, files, "STANDARD")
if leo_review is None:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
logger.debug("PR #%d: Leo review failed, will retry next cycle", pr_num)
continue
if leo_review == "RATE_LIMITED":
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num)
continue
leo_verdict = _parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num))
# Post Leo review
leo_tok = get_agent_token("Leo")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
{"body": leo_review},
token=leo_tok,
)
costs.record_usage(
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
input_tokens=leo_usage.get("prompt_tokens", 0),
output_tokens=leo_usage.get("completion_tokens", 0),
backend="openrouter",
)
# Final verdict
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
if both_approve:
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}"))
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
await _post_formal_approvals(pr_num, pr_author)
conn.execute("UPDATE prs SET status = 'approved' WHERE number = ?", (pr_num,))
db.audit(
conn, "evaluate", "approved",
json.dumps({"pr": pr_num, "tier": "STANDARD", "domain": domain,
"leo": leo_verdict, "domain_agent": agent, "batch": True}),
)
logger.info("PR #%d: APPROVED (batch domain + individual Leo)", pr_num)
else:
all_issues = []
if leo_verdict == "request_changes":
all_issues.extend(_parse_issues(leo_review))
conn.execute(
"UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
(json.dumps(all_issues), pr_num),
)
feedback = {"leo": leo_verdict, "domain": domain_verdict,
"tier": "STANDARD", "issues": all_issues}
conn.execute(
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
(json.dumps(feedback), pr_num),
)
db.audit(
conn, "evaluate", "changes_requested",
json.dumps({"pr": pr_num, "tier": "STANDARD", "leo": leo_verdict,
"domain": domain_verdict, "issues": all_issues, "batch": True}),
)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
succeeded += 1
return succeeded, failed
def _build_domain_batches(
rows: list, conn,
) -> tuple[dict[str, list[dict]], list[dict]]:
"""Group STANDARD PRs by domain for batch eval. DEEP and LIGHT stay individual.
Returns (batches_by_domain, individual_prs).
Respects BATCH_EVAL_MAX_PRS and BATCH_EVAL_MAX_DIFF_BYTES.
"""
domain_candidates: dict[str, list[dict]] = {}
individual: list[dict] = []
for row in rows:
pr_num = row["number"]
tier = row["tier"]
# Only batch STANDARD PRs with pending domain review
if tier != "STANDARD":
individual.append(row)
continue
# Check if domain review already done (resuming after Leo rate limit)
existing = conn.execute(
"SELECT domain_verdict, domain FROM prs WHERE number = ?", (pr_num,)
).fetchone()
if existing and existing["domain_verdict"] not in ("pending", None):
individual.append(row)
continue
domain = existing["domain"] if existing and existing["domain"] else "general"
domain_candidates.setdefault(domain, []).append(row)
# Build sized batches per domain
batches: dict[str, list[dict]] = {}
for domain, prs in domain_candidates.items():
if len(prs) == 1:
# Single PR — no batching benefit, process individually
individual.extend(prs)
continue
# Cap at BATCH_EVAL_MAX_PRS
batch = prs[: config.BATCH_EVAL_MAX_PRS]
batches[domain] = batch
# Overflow goes individual
individual.extend(prs[config.BATCH_EVAL_MAX_PRS :])
return batches, individual
# ─── Main entry point ──────────────────────────────────────────────────────
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
"""Run one evaluation cycle.
Groups eligible STANDARD PRs by domain for batch domain review.
DEEP PRs get individual eval. LIGHT PRs get auto-approved.
Leo review always individual (safety net for batch cross-contamination).
"""
global _rate_limit_backoff_until
# Check if we're in Opus rate-limit backoff
opus_backoff = False
if _rate_limit_backoff_until is not None:
now = datetime.now(timezone.utc)
if now < _rate_limit_backoff_until:
remaining = int((_rate_limit_backoff_until - now).total_seconds())
logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
opus_backoff = True
else:
logger.info("Rate limit backoff expired, resuming full eval cycles")
_rate_limit_backoff_until = None
# Find PRs ready for evaluation
if opus_backoff:
verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))"
else:
verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
# Stagger removed — migration protection no longer needed. Merge is domain-serialized
# and entity conflicts auto-resolve. Safe to let all eligible PRs enter eval. (Cory, Mar 14)
rows = conn.execute(
f"""SELECT p.number, p.tier, p.branch, p.domain FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass = 1
AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS}
{verdict_filter}
AND (p.last_attempt IS NULL
OR p.last_attempt < datetime('now', '-10 minutes'))
ORDER BY
CASE WHEN COALESCE(p.eval_attempts, 0) = 0 THEN 0 ELSE 1 END,
CASE COALESCE(p.priority, s.priority, 'medium')
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END,
p.created_at ASC
LIMIT ?""",
(max_workers or config.MAX_EVAL_WORKERS,),
).fetchall()
if not rows:
return 0, 0
succeeded = 0
failed = 0
# Group STANDARD PRs by domain for batch eval
domain_batches, individual_prs = _build_domain_batches(rows, conn)
# Process batch domain reviews first
for domain, batch_prs in domain_batches.items():
try:
agent = agent_for_domain(domain)
b_succeeded, b_failed = await _run_batch_domain_eval(
conn, batch_prs, domain, agent,
)
succeeded += b_succeeded
failed += b_failed
except Exception:
logger.exception("Batch eval failed for domain %s", domain)
# Revert all to open
for pr_row in batch_prs:
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_row["number"],))
failed += len(batch_prs)
# Process individual PRs (DEEP, LIGHT, single-domain, fallback)
for row in individual_prs:
try:
if opus_backoff and row["tier"] == "DEEP":
existing = conn.execute(
"SELECT domain_verdict FROM prs WHERE number = ?",
(row["number"],),
).fetchone()
if existing and existing["domain_verdict"] not in ("pending", None):
logger.debug(
"PR #%d: skipping DEEP during Opus backoff (domain already %s)",
row["number"],
existing["domain_verdict"],
)
continue
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
if result.get("skipped"):
reason = result.get("reason", "")
logger.debug("PR #%d skipped: %s", row["number"], reason)
if "rate_limited" in reason:
from datetime import timedelta
if reason == "opus_rate_limited":
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
opus_backoff = True
logger.info(
"Opus rate limited — backing off Opus for %d min, continuing triage+domain",
_RATE_LIMIT_BACKOFF_MINUTES,
)
continue
else:
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
logger.info(
"Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
)
break
else:
succeeded += 1
except Exception:
logger.exception("Failed to evaluate PR #%d", row["number"])
failed += 1
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
if succeeded or failed:
logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
return succeeded, failed