Some checks are pending
CI / lint-and-test (push) Waiting to run
Critical bug fix: close_pr now checks forgejo_api return value and skips DB update on Forgejo failure, preventing ghost PRs (DB closed, Forgejo open). Returns bool so callers can handle failures. _terminate_pr checks return value — skips source requeue on failure. stale_pr.py migrated from raw Forgejo+DB to close_pr (last raw close transition eliminated). eval_parse.py: 15 pure parsing functions extracted from evaluate.py (~370 lines removed). Zero I/O, zero async, independently testable. evaluate.py drops from ~1510 to ~1140 lines. Tests: 295 passed (42 new eval_parse + 2 new close_pr), zero regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1062 lines
43 KiB
Python
1062 lines
43 KiB
Python
"""Evaluate stage — PR lifecycle orchestration.
|
|
|
|
Tier-based review routing. Model diversity: GPT-4o (domain) + Sonnet (Leo STANDARD)
|
|
+ Opus (Leo DEEP) = two model families, no correlated blind spots.
|
|
|
|
Flow per PR:
|
|
1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT
|
|
2. Tier overrides:
|
|
a. Claim-shape detector: type: claim in YAML → STANDARD min (Theseus)
|
|
b. Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio)
|
|
3. Domain review → GPT-4o (OpenRouter) — skipped for LIGHT when LIGHT_SKIP_LLM=True
|
|
4. Leo review → Opus DEEP / Sonnet STANDARD (OpenRouter) — skipped for LIGHT
|
|
5. Post reviews, submit formal Forgejo approvals, update SQLite
|
|
6. If both approve → status = 'approved' (merge module picks it up)
|
|
7. Retry budget: 3 attempts max, disposition on attempt 2+
|
|
|
|
Design reviewed by Ganymede, Rio, Theseus, Rhea, Leo.
|
|
LLM transport and prompts extracted to lib/llm.py (Phase 3c).
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import random
|
|
from datetime import datetime, timezone
|
|
|
|
from . import config, db
|
|
from .domains import agent_for_domain, detect_domain_from_branch, detect_domain_from_diff
|
|
from .eval_parse import (
|
|
VALID_ISSUE_TAGS,
|
|
_classify_issues,
|
|
_deterministic_tier,
|
|
_diff_contains_claim_type,
|
|
_extract_changed_files,
|
|
_filter_diff,
|
|
_infer_issues_from_prose,
|
|
_is_musings_only,
|
|
_normalize_tag,
|
|
_parse_batch_response,
|
|
_parse_issues,
|
|
_parse_verdict,
|
|
_validate_batch_fanout,
|
|
)
|
|
from .forgejo import api as forgejo_api
|
|
from .forgejo import get_agent_token, get_pr_diff, repo_path
|
|
from .merge import PIPELINE_OWNED_PREFIXES
|
|
from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr
|
|
from .feedback import format_rejection_comment
|
|
from .pr_state import approve_pr, close_pr, reopen_pr, start_review
|
|
from .validate import load_existing_claims
|
|
|
|
logger = logging.getLogger("pipeline.evaluate")
|
|
|
|
|
|
# ─── NOTE: Tier 0.5 mechanical pre-check moved to validate.py ────────────
|
|
# Tier 0.5 now runs as part of the validate stage (before eval), not inside
|
|
# evaluate_pr(). This prevents wasting eval_attempts on mechanically fixable
|
|
# PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed.
|
|
|
|
|
|
async def _post_formal_approvals(pr_number: int, pr_author: str):
|
|
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
|
|
approvals = 0
|
|
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
|
|
if agent_name == pr_author:
|
|
continue
|
|
if approvals >= 2:
|
|
break
|
|
token = get_agent_token(agent_name)
|
|
if token:
|
|
result = await forgejo_api(
|
|
"POST",
|
|
repo_path(f"pulls/{pr_number}/reviews"),
|
|
{"body": "Approved.", "event": "APPROVED"},
|
|
token=token,
|
|
)
|
|
if result is not None:
|
|
approvals += 1
|
|
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
|
|
|
|
|
|
# ─── Retry budget helpers ─────────────────────────────────────────────────
|
|
|
|
|
|
async def _terminate_pr(conn, pr_number: int, reason: str):
|
|
"""Terminal state: close PR on Forgejo, mark source needs_human."""
|
|
# Get issue tags for structured feedback
|
|
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
|
issues = []
|
|
if row and row["eval_issues"]:
|
|
try:
|
|
issues = json.loads(row["eval_issues"])
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
|
|
# Post structured rejection comment with quality gate guidance (Epimetheus)
|
|
if issues:
|
|
feedback_body = format_rejection_comment(issues, source="eval_terminal")
|
|
comment_body = (
|
|
f"**Closed by eval pipeline** — {reason}.\n\n"
|
|
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
|
|
f"Source will be re-queued with feedback.\n\n"
|
|
f"{feedback_body}"
|
|
)
|
|
else:
|
|
comment_body = (
|
|
f"**Closed by eval pipeline** — {reason}.\n\n"
|
|
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
|
|
f"Source will be re-queued with feedback."
|
|
)
|
|
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_number}/comments"),
|
|
{"body": comment_body},
|
|
)
|
|
closed = await close_pr(conn, pr_number, last_error=reason)
|
|
if not closed:
|
|
logger.warning("PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle", pr_number)
|
|
return
|
|
|
|
# Tag source for re-extraction with feedback
|
|
cursor = conn.execute(
|
|
"""UPDATE sources SET status = 'needs_reextraction',
|
|
updated_at = datetime('now')
|
|
WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
|
|
(pr_number,),
|
|
)
|
|
if cursor.rowcount == 0:
|
|
logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
|
|
|
|
db.audit(
|
|
conn,
|
|
"evaluate",
|
|
"pr_terminated",
|
|
json.dumps(
|
|
{
|
|
"pr": pr_number,
|
|
"reason": reason,
|
|
}
|
|
),
|
|
)
|
|
logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
|
|
|
|
|
|
async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
|
|
"""Disposition logic for rejected PRs on attempt 2+.
|
|
|
|
Attempt 1: normal — back to open, wait for fix.
|
|
Attempt 2: check issue classification.
|
|
- Mechanical only: keep open for one more attempt (auto-fix future).
|
|
- Substantive or mixed: close PR, requeue source.
|
|
Attempt 3+: terminal.
|
|
"""
|
|
if eval_attempts < 2:
|
|
# Attempt 1: post structured feedback so agent learns, but don't close
|
|
if all_issues:
|
|
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_number}/comments"),
|
|
{"body": feedback_body},
|
|
)
|
|
return
|
|
|
|
classification = _classify_issues(all_issues)
|
|
|
|
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
|
|
# Terminal
|
|
await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
|
|
return
|
|
|
|
if classification == "mechanical":
|
|
# Mechanical issues only — keep open for one more attempt.
|
|
# Future: auto-fix module will push fixes here.
|
|
logger.info(
|
|
"PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
|
|
pr_number,
|
|
eval_attempts,
|
|
all_issues,
|
|
)
|
|
db.audit(
|
|
conn,
|
|
"evaluate",
|
|
"mechanical_retry",
|
|
json.dumps(
|
|
{
|
|
"pr": pr_number,
|
|
"attempt": eval_attempts,
|
|
"issues": all_issues,
|
|
}
|
|
),
|
|
)
|
|
else:
|
|
# Substantive, mixed, or unknown — close and requeue
|
|
logger.info(
|
|
"PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
|
|
pr_number,
|
|
eval_attempts,
|
|
classification,
|
|
all_issues,
|
|
)
|
|
await _terminate_pr(
|
|
conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}"
|
|
)
|
|
|
|
|
|
# ─── Single PR evaluation ─────────────────────────────────────────────────
|
|
|
|
|
|
async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
|
|
"""Evaluate a single PR. Returns result dict."""
|
|
from . import costs
|
|
pr_cost = 0.0
|
|
|
|
# Check eval attempt budget before claiming
|
|
row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
|
eval_attempts = (row["eval_attempts"] or 0) if row else 0
|
|
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
|
|
# Terminal — hard cap reached. Close PR, tag source.
|
|
logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS)
|
|
await _terminate_pr(conn, pr_number, "eval budget exhausted")
|
|
return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"}
|
|
|
|
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
|
|
if not start_review(conn, pr_number):
|
|
logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
|
|
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
|
|
|
|
# Increment eval_attempts — but not if this is a merge-failure re-entry (Ganymede+Rhea)
|
|
merge_cycled = conn.execute(
|
|
"SELECT merge_cycled FROM prs WHERE number = ?", (pr_number,)
|
|
).fetchone()
|
|
if merge_cycled and merge_cycled["merge_cycled"]:
|
|
# Merge cycling — don't burn eval budget, clear flag
|
|
conn.execute("UPDATE prs SET merge_cycled = 0 WHERE number = ?", (pr_number,))
|
|
logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_number)
|
|
else:
|
|
conn.execute(
|
|
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?",
|
|
(pr_number,),
|
|
)
|
|
eval_attempts += 1
|
|
|
|
# Fetch diff
|
|
diff = await get_pr_diff(pr_number)
|
|
if not diff:
|
|
# Close PRs with no diff — stale branch, nothing to evaluate
|
|
await close_pr(conn, pr_number, last_error='closed: no diff against main (stale branch)')
|
|
return {"pr": pr_number, "skipped": True, "reason": "no_diff_closed"}
|
|
|
|
# Musings bypass
|
|
if _is_musings_only(diff):
|
|
logger.info("PR #%d is musings-only — auto-approving", pr_number)
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_number}/comments"),
|
|
{"body": "Auto-approved: musings bypass eval per collective policy."},
|
|
)
|
|
approve_pr(conn, pr_number, domain='cross-domain', auto_merge=1,
|
|
leo_verdict='skipped', domain_verdict='skipped')
|
|
return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}
|
|
|
|
# Reweave bypass — reweave PRs only add frontmatter edges (supports/challenges/
|
|
# related/depends_on/challenged_by). The eval LLM has no context for judging
|
|
# edge correctness and consistently flags factual_discrepancy on valid edges.
|
|
# Leo's manual PR review is the real quality gate for reweave.
|
|
branch_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
|
branch_name = branch_row["branch"] if branch_row else ""
|
|
if branch_name.startswith("reweave/"):
|
|
logger.info("PR #%d is reweave (branch=%s) — auto-approving, Leo reviews manually", pr_number, branch_name)
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_number}/comments"),
|
|
{"body": "Auto-approved: reweave structural update (frontmatter edges only). Leo reviews manually."},
|
|
)
|
|
approve_pr(conn, pr_number, domain='cross-domain', auto_merge=1,
|
|
leo_verdict='skipped', domain_verdict='skipped')
|
|
db.audit(
|
|
conn, "evaluate", "reweave_bypass",
|
|
json.dumps({"pr": pr_number, "branch": branch_name}),
|
|
)
|
|
return {"pr": pr_number, "auto_approved": True, "reason": "reweave_bypass"}
|
|
|
|
# NOTE: Tier 0.5 mechanical checks now run in validate stage (before eval).
|
|
# tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here.
|
|
|
|
# Filter diff
|
|
review_diff, _entity_diff = _filter_diff(diff)
|
|
if not review_diff:
|
|
review_diff = diff
|
|
files = _extract_changed_files(diff)
|
|
|
|
# Detect domain — try diff paths first, then branch prefix, then 'general'
|
|
domain = detect_domain_from_diff(diff)
|
|
if domain is None:
|
|
pr_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
|
if pr_row and pr_row["branch"]:
|
|
domain = detect_domain_from_branch(pr_row["branch"])
|
|
if domain is None:
|
|
domain = "general"
|
|
agent = agent_for_domain(domain)
|
|
|
|
# Update PR domain if not set
|
|
conn.execute(
|
|
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
|
|
(domain, agent, pr_number),
|
|
)
|
|
|
|
# Step 1: Triage (if not already triaged)
|
|
# Try deterministic routing first ($0), fall back to Haiku triage ($0.001)
|
|
if tier is None:
|
|
tier = _deterministic_tier(diff)
|
|
if tier is not None:
|
|
db.audit(
|
|
conn, "evaluate", "deterministic_tier",
|
|
json.dumps({"pr": pr_number, "tier": tier}),
|
|
)
|
|
else:
|
|
tier, triage_usage, _triage_reason = await triage_pr(diff)
|
|
pr_cost += costs.record_usage(
|
|
conn, config.TRIAGE_MODEL, "eval_triage",
|
|
input_tokens=triage_usage.get("prompt_tokens", 0),
|
|
output_tokens=triage_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
|
|
# Tier overrides (claim-shape detector + random promotion)
|
|
# Order matters: claim-shape catches obvious cases, random promotion catches the rest.
|
|
|
|
# Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus)
|
|
if tier == "LIGHT" and _diff_contains_claim_type(diff):
|
|
tier = "STANDARD"
|
|
logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number)
|
|
db.audit(
|
|
conn, "evaluate", "claim_shape_upgrade", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"})
|
|
)
|
|
|
|
# Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio)
|
|
if tier == "LIGHT" and random.random() < config.LIGHT_PROMOTION_RATE:
|
|
tier = "STANDARD"
|
|
logger.info(
|
|
"PR #%d: random promotion LIGHT → STANDARD (%.0f%% rate)", pr_number, config.LIGHT_PROMOTION_RATE * 100
|
|
)
|
|
db.audit(conn, "evaluate", "random_promotion", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"}))
|
|
|
|
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
|
|
|
|
# Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
|
|
conn.execute(
|
|
"UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
|
|
(pr_number,),
|
|
)
|
|
|
|
# Check if domain review already completed (resuming after Leo rate limit)
|
|
existing = conn.execute("SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
|
existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
|
|
_existing_leo_verdict = existing["leo_verdict"] if existing else "pending"
|
|
|
|
# Step 2: Domain review (GPT-4o via OpenRouter)
|
|
# LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback)
|
|
# Skip if already completed from a previous attempt
|
|
domain_review = None # Initialize — used later for feedback extraction (Ganymede #12)
|
|
domain_usage = {"prompt_tokens": 0, "completion_tokens": 0}
|
|
leo_usage = {"prompt_tokens": 0, "completion_tokens": 0}
|
|
if tier == "LIGHT" and config.LIGHT_SKIP_LLM:
|
|
domain_verdict = "skipped"
|
|
logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number)
|
|
conn.execute(
|
|
"UPDATE prs SET domain_verdict = 'skipped', domain_model = 'none' WHERE number = ?",
|
|
(pr_number,),
|
|
)
|
|
elif existing_domain_verdict not in ("pending", None):
|
|
domain_verdict = existing_domain_verdict
|
|
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
|
|
else:
|
|
logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
|
|
domain_review, domain_usage = await run_domain_review(review_diff, files, domain or "general", agent)
|
|
|
|
if domain_review is None:
|
|
# OpenRouter failure (timeout, error) — revert to open for retry.
|
|
# NOT a rate limit — don't trigger 15-min backoff, just skip this PR.
|
|
reopen_pr(conn, pr_number)
|
|
if pr_cost > 0:
|
|
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
|
|
return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"}
|
|
|
|
domain_verdict = _parse_verdict(domain_review, agent)
|
|
conn.execute(
|
|
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
|
|
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
|
|
)
|
|
|
|
# Post domain review as comment (from agent's Forgejo account)
|
|
agent_tok = get_agent_token(agent)
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_number}/comments"),
|
|
{"body": domain_review},
|
|
token=agent_tok,
|
|
)
|
|
|
|
# If domain review rejects, skip Leo review (save Opus)
|
|
if domain_verdict == "request_changes":
|
|
logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
|
|
domain_issues = _parse_issues(domain_review) if domain_review else []
|
|
reopen_pr(conn, pr_number, leo_verdict='skipped',
|
|
last_error='domain review requested changes',
|
|
eval_issues=json.dumps(domain_issues))
|
|
db.audit(
|
|
conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues})
|
|
)
|
|
db.record_review(
|
|
conn, pr_number, "rejected",
|
|
domain=domain, agent=agent, reviewer=agent, reviewer_model="gpt-4o",
|
|
notes=(domain_review or "")[:4000],
|
|
)
|
|
|
|
# Disposition: check if this PR should be terminated or kept open
|
|
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
|
|
|
|
if domain_verdict != "skipped":
|
|
pr_cost += costs.record_usage(
|
|
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
|
|
input_tokens=domain_usage.get("prompt_tokens", 0),
|
|
output_tokens=domain_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
if pr_cost > 0:
|
|
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
|
|
return {
|
|
"pr": pr_number,
|
|
"domain_verdict": domain_verdict,
|
|
"leo_verdict": "skipped",
|
|
"eval_attempts": eval_attempts,
|
|
}
|
|
|
|
# Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT)
|
|
leo_verdict = "skipped"
|
|
leo_review = None # Initialize — used later for issue extraction
|
|
if tier != "LIGHT":
|
|
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
|
|
leo_review, leo_usage = await run_leo_review(review_diff, files, tier)
|
|
|
|
if leo_review is None:
|
|
# DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle).
|
|
reopen_pr(conn, pr_number)
|
|
if domain_verdict != "skipped":
|
|
pr_cost += costs.record_usage(
|
|
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
|
|
input_tokens=domain_usage.get("prompt_tokens", 0),
|
|
output_tokens=domain_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
if pr_cost > 0:
|
|
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
|
|
reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed"
|
|
return {"pr": pr_number, "skipped": True, "reason": reason}
|
|
|
|
leo_verdict = _parse_verdict(leo_review, "LEO")
|
|
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
|
|
|
|
# Post Leo review as comment (from Leo's Forgejo account)
|
|
leo_tok = get_agent_token("Leo")
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_number}/comments"),
|
|
{"body": leo_review},
|
|
token=leo_tok,
|
|
)
|
|
else:
|
|
# LIGHT tier: Leo is auto-skipped, domain verdict is the only gate
|
|
conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
|
|
|
|
# Step 4: Determine final verdict
|
|
# "skipped" counts as approve (LIGHT skips both reviews deliberately)
|
|
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
|
|
|
|
if both_approve:
|
|
# Get PR author for formal approvals
|
|
pr_info = await forgejo_api(
|
|
"GET",
|
|
repo_path(f"pulls/{pr_number}"),
|
|
)
|
|
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
|
|
|
|
# Submit formal Forgejo reviews (required for merge)
|
|
await _post_formal_approvals(pr_number, pr_author)
|
|
|
|
# Auto-merge agent PRs: if branch is NOT pipeline-owned, set auto_merge=1
|
|
# so the merge cycle picks it up without manual intervention.
|
|
branch_row = conn.execute("SELECT branch FROM prs WHERE number = ?", (pr_number,)).fetchone()
|
|
branch_name = branch_row["branch"] if branch_row else ""
|
|
is_agent_pr = not branch_name.startswith(PIPELINE_OWNED_PREFIXES)
|
|
|
|
approve_pr(conn, pr_number, domain=domain, auto_merge=1 if is_agent_pr else 0)
|
|
db.audit(
|
|
conn,
|
|
"evaluate",
|
|
"approved",
|
|
json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent,
|
|
"auto_merge": is_agent_pr}),
|
|
)
|
|
db.record_review(
|
|
conn, pr_number, "approved",
|
|
domain=domain, agent=agent, reviewer="leo", reviewer_model="sonnet" if tier == "STANDARD" else "opus",
|
|
notes=(leo_review or "")[:4000] if leo_review else None,
|
|
)
|
|
if is_agent_pr:
|
|
logger.info("PR #%d: APPROVED + auto_merge (agent branch %s)", pr_number, branch_name)
|
|
else:
|
|
logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict)
|
|
else:
|
|
# Collect all issue tags from both reviews
|
|
all_issues = []
|
|
if domain_verdict == "request_changes" and domain_review is not None:
|
|
all_issues.extend(_parse_issues(domain_review))
|
|
if leo_verdict == "request_changes" and leo_review is not None:
|
|
all_issues.extend(_parse_issues(leo_review))
|
|
|
|
reopen_pr(conn, pr_number, eval_issues=json.dumps(all_issues))
|
|
# Store feedback for re-extraction path
|
|
feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues}
|
|
conn.execute(
|
|
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
|
|
(json.dumps(feedback), pr_number),
|
|
)
|
|
db.audit(
|
|
conn,
|
|
"evaluate",
|
|
"changes_requested",
|
|
json.dumps(
|
|
{"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}
|
|
),
|
|
)
|
|
db.record_review(
|
|
conn, pr_number, "approved-with-changes",
|
|
domain=domain, agent=agent, reviewer="leo",
|
|
reviewer_model="sonnet" if tier == "STANDARD" else "opus",
|
|
notes=(leo_review or domain_review or "")[:4000],
|
|
)
|
|
logger.info(
|
|
"PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)",
|
|
pr_number,
|
|
leo_verdict,
|
|
domain_verdict,
|
|
all_issues,
|
|
)
|
|
|
|
# Disposition: check if this PR should be terminated or kept open
|
|
await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
|
|
|
|
# Record cost (only for reviews that actually ran)
|
|
if domain_verdict != "skipped":
|
|
pr_cost += costs.record_usage(
|
|
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
|
|
input_tokens=domain_usage.get("prompt_tokens", 0),
|
|
output_tokens=domain_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
if leo_verdict not in ("skipped",):
|
|
if tier == "DEEP":
|
|
pr_cost += costs.record_usage(
|
|
conn, config.EVAL_LEO_MODEL, "eval_leo",
|
|
input_tokens=leo_usage.get("prompt_tokens", 0),
|
|
output_tokens=leo_usage.get("completion_tokens", 0),
|
|
backend="max",
|
|
)
|
|
else:
|
|
pr_cost += costs.record_usage(
|
|
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
|
|
input_tokens=leo_usage.get("prompt_tokens", 0),
|
|
output_tokens=leo_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
|
|
if pr_cost > 0:
|
|
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
|
|
|
|
return {
|
|
"pr": pr_number,
|
|
"tier": tier,
|
|
"domain": domain,
|
|
"leo_verdict": leo_verdict,
|
|
"domain_verdict": domain_verdict,
|
|
"approved": both_approve,
|
|
}
|
|
|
|
|
|
# ─── Rate limit backoff ───────────────────────────────────────────────────
|
|
|
|
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
|
|
# CLI calls overnight when Opus is exhausted.
|
|
_rate_limit_backoff_until: datetime | None = None
|
|
_RATE_LIMIT_BACKOFF_MINUTES = 15
|
|
|
|
|
|
# ─── Batch domain review ─────────────────────────────────────────────────
|
|
|
|
|
|
async def _run_batch_domain_eval(
|
|
conn, batch_prs: list[dict], domain: str, agent: str,
|
|
) -> tuple[int, int]:
|
|
"""Execute batch domain review for a group of same-domain STANDARD PRs.
|
|
|
|
1. Claim all PRs atomically
|
|
2. Run single batch domain review
|
|
3. Parse + validate fan-out
|
|
4. Post per-PR comments
|
|
5. Continue to individual Leo review for each
|
|
6. Fall back to individual review for any validation failures
|
|
|
|
Returns (succeeded, failed).
|
|
"""
|
|
from .forgejo import get_pr_diff as _get_pr_diff
|
|
|
|
succeeded = 0
|
|
failed = 0
|
|
|
|
# Step 1: Fetch diffs and build batch
|
|
pr_diffs = []
|
|
claimed_prs = []
|
|
for pr_row in batch_prs:
|
|
pr_num = pr_row["number"]
|
|
|
|
# Atomic claim
|
|
if not start_review(conn, pr_num):
|
|
continue
|
|
|
|
# Increment eval_attempts — skip if merge-cycled (Ganymede+Rhea)
|
|
mc_row = conn.execute("SELECT merge_cycled FROM prs WHERE number = ?", (pr_num,)).fetchone()
|
|
if mc_row and mc_row["merge_cycled"]:
|
|
conn.execute(
|
|
"UPDATE prs SET merge_cycled = 0, last_attempt = datetime('now') WHERE number = ?",
|
|
(pr_num,),
|
|
)
|
|
logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_num)
|
|
else:
|
|
conn.execute(
|
|
"UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1, "
|
|
"last_attempt = datetime('now') WHERE number = ?",
|
|
(pr_num,),
|
|
)
|
|
|
|
diff = await _get_pr_diff(pr_num)
|
|
if not diff:
|
|
reopen_pr(conn, pr_num)
|
|
continue
|
|
|
|
# Musings bypass
|
|
if _is_musings_only(diff):
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_num}/comments"),
|
|
{"body": "Auto-approved: musings bypass eval per collective policy."},
|
|
)
|
|
approve_pr(conn, pr_num, domain='cross-domain', auto_merge=1,
|
|
leo_verdict='skipped', domain_verdict='skipped')
|
|
succeeded += 1
|
|
continue
|
|
|
|
review_diff, _ = _filter_diff(diff)
|
|
if not review_diff:
|
|
review_diff = diff
|
|
files = _extract_changed_files(diff)
|
|
|
|
# Build label from branch name or first claim filename
|
|
branch = pr_row.get("branch", "")
|
|
label = branch.split("/")[-1][:60] if branch else f"pr-{pr_num}"
|
|
|
|
pr_diffs.append({
|
|
"number": pr_num,
|
|
"label": label,
|
|
"diff": review_diff,
|
|
"files": files,
|
|
"full_diff": diff, # kept for Leo review
|
|
"file_count": len([l for l in files.split("\n") if l.strip()]),
|
|
})
|
|
claimed_prs.append(pr_num)
|
|
|
|
if not pr_diffs:
|
|
return 0, 0
|
|
|
|
# Enforce BATCH_EVAL_MAX_DIFF_BYTES — split if total diff is too large.
|
|
# We only know diff sizes after fetching, so enforce here not in _build_domain_batches.
|
|
total_bytes = sum(len(p["diff"].encode()) for p in pr_diffs)
|
|
if total_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and len(pr_diffs) > 1:
|
|
# Keep PRs up to the byte cap, revert the rest to open for next cycle
|
|
kept = []
|
|
running_bytes = 0
|
|
for p in pr_diffs:
|
|
p_bytes = len(p["diff"].encode())
|
|
if running_bytes + p_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and kept:
|
|
break
|
|
kept.append(p)
|
|
running_bytes += p_bytes
|
|
overflow = [p for p in pr_diffs if p not in kept]
|
|
for p in overflow:
|
|
reopen_pr(conn, p["number"], dec_eval_attempts=True)
|
|
claimed_prs.remove(p["number"])
|
|
logger.info(
|
|
"PR #%d: diff too large for batch (%d bytes total), deferring to next cycle",
|
|
p["number"], total_bytes,
|
|
)
|
|
pr_diffs = kept
|
|
|
|
if not pr_diffs:
|
|
return 0, 0
|
|
|
|
# Detect domain for all PRs (should be same domain)
|
|
conn.execute(
|
|
"UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number IN ({})".format(
|
|
",".join("?" * len(claimed_prs))
|
|
),
|
|
[domain, agent] + claimed_prs,
|
|
)
|
|
|
|
# Step 2: Run batch domain review
|
|
logger.info(
|
|
"Batch domain review: %d PRs in %s domain (PRs: %s)",
|
|
len(pr_diffs),
|
|
domain,
|
|
", ".join(f"#{p['number']}" for p in pr_diffs),
|
|
)
|
|
batch_response, batch_domain_usage = await run_batch_domain_review(pr_diffs, domain, agent)
|
|
|
|
if batch_response is None:
|
|
# Complete failure — revert all to open
|
|
logger.warning("Batch domain review failed — reverting all PRs to open")
|
|
for pr_num in claimed_prs:
|
|
reopen_pr(conn, pr_num)
|
|
return 0, len(claimed_prs)
|
|
|
|
# Step 3: Parse + validate fan-out
|
|
parsed = _parse_batch_response(batch_response, claimed_prs, agent)
|
|
valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent)
|
|
|
|
db.audit(
|
|
conn, "evaluate", "batch_domain_review",
|
|
json.dumps({
|
|
"domain": domain,
|
|
"batch_size": len(pr_diffs),
|
|
"valid": len(valid_reviews),
|
|
"fallback": fallback_prs,
|
|
}),
|
|
)
|
|
|
|
# Record batch domain review cost ONCE for the whole batch (not per-PR)
|
|
from . import costs
|
|
costs.record_usage(
|
|
conn, config.EVAL_DOMAIN_MODEL, "eval_domain",
|
|
input_tokens=batch_domain_usage.get("prompt_tokens", 0),
|
|
output_tokens=batch_domain_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
|
|
# Step 4: Process valid reviews — post comments + continue to Leo
|
|
for pr_data in pr_diffs:
|
|
pr_num = pr_data["number"]
|
|
|
|
if pr_num in fallback_prs:
|
|
# Revert — will be picked up by individual eval next cycle
|
|
reopen_pr(conn, pr_num, dec_eval_attempts=True)
|
|
logger.info("PR #%d: batch fallback — will retry individually", pr_num)
|
|
continue
|
|
|
|
if pr_num not in valid_reviews:
|
|
# Should not happen, but safety
|
|
reopen_pr(conn, pr_num)
|
|
continue
|
|
|
|
review_text = valid_reviews[pr_num]
|
|
domain_verdict = _parse_verdict(review_text, agent)
|
|
|
|
# Post domain review comment
|
|
agent_tok = get_agent_token(agent)
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_num}/comments"),
|
|
{"body": review_text},
|
|
token=agent_tok,
|
|
)
|
|
|
|
conn.execute(
|
|
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
|
|
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
|
|
)
|
|
|
|
# If domain rejects, handle disposition (same as individual path)
|
|
if domain_verdict == "request_changes":
|
|
domain_issues = _parse_issues(review_text)
|
|
eval_attempts = (conn.execute(
|
|
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
|
|
).fetchone()["eval_attempts"] or 0)
|
|
|
|
reopen_pr(conn, pr_num, leo_verdict='skipped',
|
|
last_error='domain review requested changes',
|
|
eval_issues=json.dumps(domain_issues))
|
|
db.audit(
|
|
conn, "evaluate", "domain_rejected",
|
|
json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}),
|
|
)
|
|
await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
|
|
succeeded += 1
|
|
continue
|
|
|
|
# Domain approved — continue to individual Leo review
|
|
logger.info("PR #%d: batch domain approved, proceeding to individual Leo review", pr_num)
|
|
|
|
review_diff = pr_data["diff"]
|
|
files = pr_data["files"]
|
|
|
|
leo_review, leo_usage = await run_leo_review(review_diff, files, "STANDARD")
|
|
|
|
if leo_review is None:
|
|
reopen_pr(conn, pr_num)
|
|
logger.debug("PR #%d: Leo review failed, will retry next cycle", pr_num)
|
|
continue
|
|
|
|
if leo_review == "RATE_LIMITED":
|
|
reopen_pr(conn, pr_num)
|
|
logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num)
|
|
continue
|
|
|
|
leo_verdict = _parse_verdict(leo_review, "LEO")
|
|
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num))
|
|
|
|
# Post Leo review
|
|
leo_tok = get_agent_token("Leo")
|
|
await forgejo_api(
|
|
"POST",
|
|
repo_path(f"issues/{pr_num}/comments"),
|
|
{"body": leo_review},
|
|
token=leo_tok,
|
|
)
|
|
|
|
costs.record_usage(
|
|
conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo",
|
|
input_tokens=leo_usage.get("prompt_tokens", 0),
|
|
output_tokens=leo_usage.get("completion_tokens", 0),
|
|
backend="openrouter",
|
|
)
|
|
|
|
# Final verdict
|
|
both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
|
|
|
|
if both_approve:
|
|
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}"))
|
|
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
|
|
await _post_formal_approvals(pr_num, pr_author)
|
|
approve_pr(conn, pr_num, domain=domain or 'cross-domain', auto_merge=1)
|
|
db.audit(
|
|
conn, "evaluate", "approved",
|
|
json.dumps({"pr": pr_num, "tier": "STANDARD", "domain": domain,
|
|
"leo": leo_verdict, "domain_agent": agent, "batch": True}),
|
|
)
|
|
logger.info("PR #%d: APPROVED (batch domain + individual Leo)", pr_num)
|
|
else:
|
|
all_issues = []
|
|
if leo_verdict == "request_changes":
|
|
all_issues.extend(_parse_issues(leo_review))
|
|
reopen_pr(conn, pr_num, eval_issues=json.dumps(all_issues))
|
|
feedback = {"leo": leo_verdict, "domain": domain_verdict,
|
|
"tier": "STANDARD", "issues": all_issues}
|
|
conn.execute(
|
|
"UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
|
|
(json.dumps(feedback), pr_num),
|
|
)
|
|
db.audit(
|
|
conn, "evaluate", "changes_requested",
|
|
json.dumps({"pr": pr_num, "tier": "STANDARD", "leo": leo_verdict,
|
|
"domain": domain_verdict, "issues": all_issues, "batch": True}),
|
|
)
|
|
eval_attempts = (conn.execute(
|
|
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
|
|
).fetchone()["eval_attempts"] or 0)
|
|
await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
|
|
|
|
succeeded += 1
|
|
|
|
return succeeded, failed
|
|
|
|
|
|
def _build_domain_batches(
|
|
rows: list, conn,
|
|
) -> tuple[dict[str, list[dict]], list[dict]]:
|
|
"""Group STANDARD PRs by domain for batch eval. DEEP and LIGHT stay individual.
|
|
|
|
Returns (batches_by_domain, individual_prs).
|
|
Respects BATCH_EVAL_MAX_PRS and BATCH_EVAL_MAX_DIFF_BYTES.
|
|
"""
|
|
domain_candidates: dict[str, list[dict]] = {}
|
|
individual: list[dict] = []
|
|
|
|
for row in rows:
|
|
pr_num = row["number"]
|
|
tier = row["tier"]
|
|
|
|
# Only batch STANDARD PRs with pending domain review
|
|
if tier != "STANDARD":
|
|
individual.append(row)
|
|
continue
|
|
|
|
# Check if domain review already done (resuming after Leo rate limit)
|
|
existing = conn.execute(
|
|
"SELECT domain_verdict, domain FROM prs WHERE number = ?", (pr_num,)
|
|
).fetchone()
|
|
if existing and existing["domain_verdict"] not in ("pending", None):
|
|
individual.append(row)
|
|
continue
|
|
|
|
domain = existing["domain"] if existing and existing["domain"] and existing["domain"] != "general" else "general"
|
|
domain_candidates.setdefault(domain, []).append(row)
|
|
|
|
# Build sized batches per domain
|
|
batches: dict[str, list[dict]] = {}
|
|
for domain, prs in domain_candidates.items():
|
|
if len(prs) == 1:
|
|
# Single PR — no batching benefit, process individually
|
|
individual.extend(prs)
|
|
continue
|
|
# Cap at BATCH_EVAL_MAX_PRS
|
|
batch = prs[: config.BATCH_EVAL_MAX_PRS]
|
|
batches[domain] = batch
|
|
# Overflow goes individual
|
|
individual.extend(prs[config.BATCH_EVAL_MAX_PRS :])
|
|
|
|
return batches, individual
|
|
|
|
|
|
# ─── Main entry point ──────────────────────────────────────────────────────
|
|
|
|
|
|
async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
|
|
"""Run one evaluation cycle.
|
|
|
|
Groups eligible STANDARD PRs by domain for batch domain review.
|
|
DEEP PRs get individual eval. LIGHT PRs get auto-approved.
|
|
Leo review always individual (safety net for batch cross-contamination).
|
|
"""
|
|
global _rate_limit_backoff_until
|
|
|
|
# Check if we're in Opus rate-limit backoff
|
|
opus_backoff = False
|
|
if _rate_limit_backoff_until is not None:
|
|
now = datetime.now(timezone.utc)
|
|
if now < _rate_limit_backoff_until:
|
|
remaining = int((_rate_limit_backoff_until - now).total_seconds())
|
|
logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
|
|
opus_backoff = True
|
|
else:
|
|
logger.info("Rate limit backoff expired, resuming full eval cycles")
|
|
_rate_limit_backoff_until = None
|
|
|
|
# Find PRs ready for evaluation
|
|
if opus_backoff:
|
|
verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))"
|
|
else:
|
|
verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
|
|
|
|
# Stagger removed — migration protection no longer needed. Merge is domain-serialized
|
|
# and entity conflicts auto-resolve. Safe to let all eligible PRs enter eval. (Cory, Mar 14)
|
|
|
|
rows = conn.execute(
|
|
f"""SELECT p.number, p.tier, p.branch, p.domain FROM prs p
|
|
LEFT JOIN sources s ON p.source_path = s.path
|
|
WHERE p.status = 'open'
|
|
AND p.tier0_pass = 1
|
|
AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS}
|
|
{verdict_filter}
|
|
AND (p.last_attempt IS NULL
|
|
OR p.last_attempt < datetime('now', '-10 minutes'))
|
|
ORDER BY
|
|
CASE WHEN COALESCE(p.eval_attempts, 0) = 0 THEN 0 ELSE 1 END,
|
|
CASE COALESCE(p.priority, s.priority, 'medium')
|
|
WHEN 'critical' THEN 0
|
|
WHEN 'high' THEN 1
|
|
WHEN 'medium' THEN 2
|
|
WHEN 'low' THEN 3
|
|
ELSE 4
|
|
END,
|
|
p.created_at ASC
|
|
LIMIT ?""",
|
|
(max_workers or config.MAX_EVAL_WORKERS,),
|
|
).fetchall()
|
|
|
|
if not rows:
|
|
return 0, 0
|
|
|
|
succeeded = 0
|
|
failed = 0
|
|
|
|
# Group STANDARD PRs by domain for batch eval
|
|
domain_batches, individual_prs = _build_domain_batches(rows, conn)
|
|
|
|
# Process batch domain reviews first
|
|
for domain, batch_prs in domain_batches.items():
|
|
try:
|
|
agent = agent_for_domain(domain)
|
|
b_succeeded, b_failed = await _run_batch_domain_eval(
|
|
conn, batch_prs, domain, agent,
|
|
)
|
|
succeeded += b_succeeded
|
|
failed += b_failed
|
|
except Exception:
|
|
logger.exception("Batch eval failed for domain %s", domain)
|
|
# Revert all to open
|
|
for pr_row in batch_prs:
|
|
reopen_pr(conn, pr_row["number"])
|
|
failed += len(batch_prs)
|
|
|
|
# Process individual PRs (DEEP, LIGHT, single-domain, fallback)
|
|
for row in individual_prs:
|
|
try:
|
|
if opus_backoff and row["tier"] == "DEEP":
|
|
existing = conn.execute(
|
|
"SELECT domain_verdict FROM prs WHERE number = ?",
|
|
(row["number"],),
|
|
).fetchone()
|
|
if existing and existing["domain_verdict"] not in ("pending", None):
|
|
logger.debug(
|
|
"PR #%d: skipping DEEP during Opus backoff (domain already %s)",
|
|
row["number"],
|
|
existing["domain_verdict"],
|
|
)
|
|
continue
|
|
|
|
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
|
|
if result.get("skipped"):
|
|
reason = result.get("reason", "")
|
|
logger.debug("PR #%d skipped: %s", row["number"], reason)
|
|
if "rate_limited" in reason:
|
|
from datetime import timedelta
|
|
|
|
if reason == "opus_rate_limited":
|
|
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
|
|
minutes=_RATE_LIMIT_BACKOFF_MINUTES
|
|
)
|
|
opus_backoff = True
|
|
logger.info(
|
|
"Opus rate limited — backing off Opus for %d min, continuing triage+domain",
|
|
_RATE_LIMIT_BACKOFF_MINUTES,
|
|
)
|
|
continue
|
|
else:
|
|
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
|
|
minutes=_RATE_LIMIT_BACKOFF_MINUTES
|
|
)
|
|
logger.info(
|
|
"Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
|
|
)
|
|
break
|
|
else:
|
|
succeeded += 1
|
|
except Exception:
|
|
logger.exception("Failed to evaluate PR #%d", row["number"])
|
|
failed += 1
|
|
reopen_pr(conn, row["number"])
|
|
|
|
if succeeded or failed:
|
|
logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
|
|
|
|
return succeeded, failed
|