"""Evaluate stage — PR lifecycle orchestration. Tier-based review routing. Model diversity: GPT-4o (domain) + Sonnet (Leo STANDARD) + Opus (Leo DEEP) = two model families, no correlated blind spots. Flow per PR: 1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT 2. Tier overrides: a. Claim-shape detector: type: claim in YAML → STANDARD min (Theseus) b. Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio) 3. Domain review → GPT-4o (OpenRouter) — skipped for LIGHT when LIGHT_SKIP_LLM=True 4. Leo review → Opus DEEP / Sonnet STANDARD (OpenRouter) — skipped for LIGHT 5. Post reviews, submit formal Forgejo approvals, update SQLite 6. If both approve → status = 'approved' (merge module picks it up) 7. Retry budget: 3 attempts max, disposition on attempt 2+ Design reviewed by Ganymede, Rio, Theseus, Rhea, Leo. LLM transport and prompts extracted to lib/llm.py (Phase 3c). """ import json import logging import random import re from datetime import datetime, timezone from . import config, db from .domains import agent_for_domain, detect_domain_from_diff from .forgejo import api as forgejo_api from .forgejo import get_agent_token, get_pr_diff, repo_path from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr from .feedback import format_rejection_comment from .validate import load_existing_claims logger = logging.getLogger("pipeline.evaluate") # ─── Diff helpers ────────────────────────────────────────────────────────── def _filter_diff(diff: str) -> tuple[str, str]: """Filter diff to only review-relevant files. Returns (review_diff, entity_diff). 
Strips: inbox/, schemas/, skills/, agents/*/musings/ """ sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE) skip_patterns = [r"^diff --git a/(inbox/(archive|queue|null-result)|schemas|skills|agents/[^/]+/musings)/"] core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"} claim_sections = [] entity_sections = [] for section in sections: if not section.strip(): continue if any(re.match(p, section) for p in skip_patterns): continue entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section) if entity_match and entity_match.group(1) not in core_domains: entity_sections.append(section) continue claim_sections.append(section) return "".join(claim_sections), "".join(entity_sections) def _extract_changed_files(diff: str) -> str: """Extract changed file paths from diff.""" return "\n".join( line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git") ) def _is_musings_only(diff: str) -> bool: """Check if PR only modifies musing files.""" has_musings = False has_other = False for line in diff.split("\n"): if line.startswith("diff --git"): if "agents/" in line and "/musings/" in line: has_musings = True else: has_other = True return has_musings and not has_other # ─── NOTE: Tier 0.5 mechanical pre-check moved to validate.py ──────────── # Tier 0.5 now runs as part of the validate stage (before eval), not inside # evaluate_pr(). This prevents wasting eval_attempts on mechanically fixable # PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed. # ─── Tier overrides ─────────────────────────────────────────────────────── def _diff_contains_claim_type(diff: str) -> bool: """Claim-shape detector: check if any file in diff has type: claim in frontmatter. Mechanical check ($0). If YAML declares type: claim, this is a factual claim — not an entity update or formatting fix. Must be classified STANDARD minimum regardless of Haiku triage. 
Catches factual claims disguised as LIGHT content. (Theseus: converts semantic problem to mechanical check) """ for line in diff.split("\n"): if line.startswith("+") and not line.startswith("+++"): stripped = line[1:].strip() if stripped in ("type: claim", 'type: "claim"', "type: 'claim'"): return True return False def _deterministic_tier(diff: str) -> str | None: """Deterministic tier routing — skip Haiku triage for obvious cases. Checks diff file patterns before calling the LLM. Returns tier string if deterministic, None if Haiku triage is needed. Rules (Leo-calibrated): - All files in entities/ only → LIGHT - All files in inbox/ only (queue, archive, null-result) → LIGHT - Any file in core/ or foundations/ → DEEP (structural KB changes) - Has challenged_by field → DEEP (challenges existing claims) - Modifies existing file (not new) in domains/ → DEEP (enrichment/change) - Otherwise → None (needs Haiku triage) NOTE: Cross-domain wiki links are NOT a DEEP signal — most claims link across domains, that's the whole point of the knowledge graph (Leo). 
""" changed_files = [] for line in diff.split("\n"): if line.startswith("diff --git a/"): path = line.replace("diff --git a/", "").split(" b/")[0] changed_files.append(path) if not changed_files: return None # All entities/ only → LIGHT if all(f.startswith("entities/") for f in changed_files): logger.info("Deterministic tier: LIGHT (all files in entities/)") return "LIGHT" # All inbox/ only (queue, archive, null-result) → LIGHT if all(f.startswith("inbox/") for f in changed_files): logger.info("Deterministic tier: LIGHT (all files in inbox/)") return "LIGHT" # Any file in core/ or foundations/ → DEEP (structural KB changes) if any(f.startswith("core/") or f.startswith("foundations/") for f in changed_files): logger.info("Deterministic tier: DEEP (touches core/ or foundations/)") return "DEEP" # Check diff content for DEEP signals has_challenged_by = False has_modified_claim = False new_files: set[str] = set() lines = diff.split("\n") for i, line in enumerate(lines): # Detect new files if line.startswith("--- /dev/null") and i + 1 < len(lines) and lines[i + 1].startswith("+++ b/"): new_files.add(lines[i + 1][6:]) # Check for challenged_by field if line.startswith("+") and not line.startswith("+++"): stripped = line[1:].strip() if stripped.startswith("challenged_by:"): has_challenged_by = True if has_challenged_by: logger.info("Deterministic tier: DEEP (has challenged_by field)") return "DEEP" # NOTE: Modified existing domain claims are NOT auto-DEEP — enrichments # (appending evidence) are common and should be STANDARD. Let Haiku triage # distinguish enrichments from structural changes. return None # ─── Verdict parsing ────────────────────────────────────────────────────── def _parse_verdict(review_text: str, reviewer: str) -> str: """Parse VERDICT tag from review. 
Returns 'approve' or 'request_changes'.""" upper = reviewer.upper() if f"VERDICT:{upper}:APPROVE" in review_text: return "approve" elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text: return "request_changes" else: logger.warning("No parseable verdict from %s — treating as request_changes", reviewer) return "request_changes" # Map model-invented tags to valid tags. Models consistently ignore the valid # tag list and invent their own. This normalizes them. (Ganymede, Mar 14) _TAG_ALIASES: dict[str, str] = { "schema_violation": "frontmatter_schema", "missing_schema_fields": "frontmatter_schema", "missing_schema": "frontmatter_schema", "schema": "frontmatter_schema", "missing_frontmatter": "frontmatter_schema", "redundancy": "near_duplicate", "duplicate": "near_duplicate", "missing_confidence": "confidence_miscalibration", "confidence_error": "confidence_miscalibration", "vague_claims": "scope_error", "unfalsifiable": "scope_error", "unverified_wiki_links": "broken_wiki_links", "unverified-wiki-links": "broken_wiki_links", "missing_wiki_links": "broken_wiki_links", "invalid_wiki_links": "broken_wiki_links", "wiki_link_errors": "broken_wiki_links", "overclaiming": "title_overclaims", "title_overclaim": "title_overclaims", "date_error": "date_errors", "factual_error": "factual_discrepancy", "factual_inaccuracy": "factual_discrepancy", } VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims", "confidence_miscalibration", "date_errors", "factual_discrepancy", "near_duplicate", "scope_error"} def _normalize_tag(tag: str) -> str | None: """Normalize a model-generated tag to a valid tag, or None if unrecognizable.""" tag = tag.strip().lower().replace("-", "_") if tag in VALID_ISSUE_TAGS: return tag if tag in _TAG_ALIASES: return _TAG_ALIASES[tag] # Fuzzy: check if any valid tag is a substring or vice versa for valid in VALID_ISSUE_TAGS: if valid in tag or tag in valid: return valid return None def _parse_issues(review_text: str) -> list[str]: 
"""Extract issue tags from review. First tries structured comment with tag normalization. Falls back to keyword inference from prose. """ match = re.search(r"", review_text) if match: raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()] normalized = [] for tag in raw_tags: norm = _normalize_tag(tag) if norm and norm not in normalized: normalized.append(norm) else: logger.debug("Unrecognized issue tag '%s' — dropped", tag) if normalized: return normalized # Fallback: infer tags from review prose return _infer_issues_from_prose(review_text) # Keyword patterns for inferring issue tags from unstructured review prose. # Conservative: only match unambiguous indicators. Order doesn't matter. _PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = { "frontmatter_schema": [ re.compile(r"frontmatter", re.IGNORECASE), re.compile(r"missing.{0,20}(type|domain|confidence|source|created)\b", re.IGNORECASE), re.compile(r"yaml.{0,10}(invalid|missing|error|schema)", re.IGNORECASE), re.compile(r"required field", re.IGNORECASE), re.compile(r"lacks?.{0,15}(required|yaml|schema|fields)", re.IGNORECASE), re.compile(r"missing.{0,15}(schema|fields|frontmatter)", re.IGNORECASE), re.compile(r"schema.{0,10}(compliance|violation|missing|invalid)", re.IGNORECASE), ], "broken_wiki_links": [ re.compile(r"(broken|dead|invalid).{0,10}(wiki.?)?link", re.IGNORECASE), re.compile(r"wiki.?link.{0,20}(not found|missing|broken|invalid|resolv|unverif)", re.IGNORECASE), re.compile(r"\[\[.{1,80}\]\].{0,20}(not found|doesn.t exist|missing)", re.IGNORECASE), re.compile(r"unverified.{0,10}(wiki|link)", re.IGNORECASE), ], "factual_discrepancy": [ re.compile(r"factual.{0,10}(error|inaccura|discrepanc|incorrect)", re.IGNORECASE), re.compile(r"misrepresent", re.IGNORECASE), ], "confidence_miscalibration": [ re.compile(r"confidence.{0,20}(too high|too low|miscalibrat|overstat|should be)", re.IGNORECASE), re.compile(r"(overstat|understat).{0,20}confidence", re.IGNORECASE), ], "scope_error": [ 
re.compile(r"scope.{0,10}(error|too broad|overscop|unscoped)", re.IGNORECASE), re.compile(r"unscoped.{0,10}(universal|claim)", re.IGNORECASE), re.compile(r"(vague|unfalsifiable).{0,15}(claim|assertion)", re.IGNORECASE), re.compile(r"not.{0,10}(specific|falsifiable|disagreeable).{0,10}enough", re.IGNORECASE), ], "title_overclaims": [ re.compile(r"title.{0,20}(overclaim|overstat|too broad)", re.IGNORECASE), re.compile(r"overclaim", re.IGNORECASE), ], "near_duplicate": [ re.compile(r"near.?duplicate", re.IGNORECASE), re.compile(r"(very|too) similar.{0,20}(claim|title|existing)", re.IGNORECASE), re.compile(r"duplicate.{0,20}(of|claim|title|existing|information)", re.IGNORECASE), re.compile(r"redundan", re.IGNORECASE), ], } def _infer_issues_from_prose(review_text: str) -> list[str]: """Infer issue tags from unstructured review text via keyword matching. Fallback for reviews that reject without structured tags. Conservative: requires at least one unambiguous keyword match per tag. """ inferred = [] for tag, patterns in _PROSE_TAG_PATTERNS.items(): if any(p.search(review_text) for p in patterns): inferred.append(tag) return inferred async def _post_formal_approvals(pr_number: int, pr_author: str): """Submit formal Forgejo reviews from 2 agents (not the PR author).""" approvals = 0 for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]: if agent_name == pr_author: continue if approvals >= 2: break token = get_agent_token(agent_name) if token: result = await forgejo_api( "POST", repo_path(f"pulls/{pr_number}/reviews"), {"body": "Approved.", "event": "APPROVED"}, token=token, ) if result is not None: approvals += 1 logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals) # ─── Retry budget helpers ───────────────────────────────────────────────── async def _terminate_pr(conn, pr_number: int, reason: str): """Terminal state: close PR on Forgejo, mark source needs_human.""" # Get issue tags for structured feedback row = 
conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone() issues = [] if row and row["eval_issues"]: try: issues = json.loads(row["eval_issues"]) except (json.JSONDecodeError, TypeError): pass # Post structured rejection comment with quality gate guidance (Epimetheus) if issues: feedback_body = format_rejection_comment(issues, source="eval_terminal") comment_body = ( f"**Closed by eval pipeline** — {reason}.\n\n" f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " f"Source will be re-queued with feedback.\n\n" f"{feedback_body}" ) else: comment_body = ( f"**Closed by eval pipeline** — {reason}.\n\n" f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " f"Source will be re-queued with feedback." ) await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), {"body": comment_body}, ) await forgejo_api( "PATCH", repo_path(f"pulls/{pr_number}"), {"state": "closed"}, ) # Update PR status conn.execute( "UPDATE prs SET status = 'closed', last_error = ? 
WHERE number = ?", (reason, pr_number), ) # Tag source for re-extraction with feedback cursor = conn.execute( """UPDATE sources SET status = 'needs_reextraction', updated_at = datetime('now') WHERE path = (SELECT source_path FROM prs WHERE number = ?)""", (pr_number,), ) if cursor.rowcount == 0: logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number) db.audit( conn, "evaluate", "pr_terminated", json.dumps( { "pr": pr_number, "reason": reason, } ), ) logger.info("PR #%d: TERMINATED — %s", pr_number, reason) def _classify_issues(issues: list[str]) -> str: """Classify issue tags as 'mechanical', 'substantive', or 'mixed'.""" if not issues: return "unknown" mechanical = set(issues) & config.MECHANICAL_ISSUE_TAGS substantive = set(issues) & config.SUBSTANTIVE_ISSUE_TAGS if substantive and not mechanical: return "substantive" if mechanical and not substantive: return "mechanical" if mechanical and substantive: return "mixed" return "unknown" # tags not in either set async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]): """Disposition logic for rejected PRs on attempt 2+. Attempt 1: normal — back to open, wait for fix. Attempt 2: check issue classification. - Mechanical only: keep open for one more attempt (auto-fix future). - Substantive or mixed: close PR, requeue source. Attempt 3+: terminal. """ if eval_attempts < 2: # Attempt 1: post structured feedback so agent learns, but don't close if all_issues: feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1") await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), {"body": feedback_body}, ) return classification = _classify_issues(all_issues) if eval_attempts >= config.MAX_EVAL_ATTEMPTS: # Terminal await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts") return if classification == "mechanical": # Mechanical issues only — keep open for one more attempt. 
# Future: auto-fix module will push fixes here. logger.info( "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt", pr_number, eval_attempts, all_issues, ) db.audit( conn, "evaluate", "mechanical_retry", json.dumps( { "pr": pr_number, "attempt": eval_attempts, "issues": all_issues, } ), ) else: # Substantive, mixed, or unknown — close and requeue logger.info( "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source", pr_number, eval_attempts, classification, all_issues, ) await _terminate_pr( conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}" ) # ─── Single PR evaluation ───────────────────────────────────────────────── async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: """Evaluate a single PR. Returns result dict.""" # Check eval attempt budget before claiming row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone() eval_attempts = (row["eval_attempts"] or 0) if row else 0 if eval_attempts >= config.MAX_EVAL_ATTEMPTS: # Terminal — hard cap reached. Close PR, tag source. logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS) await _terminate_pr(conn, pr_number, "eval budget exhausted") return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"} # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11) cursor = conn.execute( "UPDATE prs SET status = 'reviewing' WHERE number = ? 
AND status = 'open'", (pr_number,), ) if cursor.rowcount == 0: logger.debug("PR #%d already claimed by another worker, skipping", pr_number) return {"pr": pr_number, "skipped": True, "reason": "already_claimed"} # Increment eval_attempts — but not if this is a merge-failure re-entry (Ganymede+Rhea) merge_cycled = conn.execute( "SELECT merge_cycled FROM prs WHERE number = ?", (pr_number,) ).fetchone() if merge_cycled and merge_cycled["merge_cycled"]: # Merge cycling — don't burn eval budget, clear flag conn.execute("UPDATE prs SET merge_cycled = 0 WHERE number = ?", (pr_number,)) logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_number) else: conn.execute( "UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1 WHERE number = ?", (pr_number,), ) eval_attempts += 1 # Fetch diff diff = await get_pr_diff(pr_number) if not diff: # Close PRs with no diff — stale branch, nothing to evaluate conn.execute("UPDATE prs SET status='closed', last_error='closed: no diff against main (stale branch)' WHERE number = ?", (pr_number,)) return {"pr": pr_number, "skipped": True, "reason": "no_diff_closed"} # Musings bypass if _is_musings_only(diff): logger.info("PR #%d is musings-only — auto-approving", pr_number) await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), {"body": "Auto-approved: musings bypass eval per collective policy."}, ) conn.execute( """UPDATE prs SET status = 'approved', leo_verdict = 'skipped', domain_verdict = 'skipped' WHERE number = ?""", (pr_number,), ) return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"} # NOTE: Tier 0.5 mechanical checks now run in validate stage (before eval). # tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here. 
# Filter diff review_diff, _entity_diff = _filter_diff(diff) if not review_diff: review_diff = diff files = _extract_changed_files(diff) # Detect domain domain = detect_domain_from_diff(diff) agent = agent_for_domain(domain) # Default NULL domain to 'general' (archive-only PRs have no domain files) if domain is None: domain = "general" # Update PR domain if not set conn.execute( "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?", (domain, agent, pr_number), ) # Step 1: Triage (if not already triaged) # Try deterministic routing first ($0), fall back to Haiku triage ($0.001) if tier is None: tier = _deterministic_tier(diff) if tier is not None: db.audit( conn, "evaluate", "deterministic_tier", json.dumps({"pr": pr_number, "tier": tier}), ) else: tier, triage_usage = await triage_pr(diff) # Record triage cost from . import costs costs.record_usage( conn, config.TRIAGE_MODEL, "eval_triage", input_tokens=triage_usage.get("prompt_tokens", 0), output_tokens=triage_usage.get("completion_tokens", 0), backend="openrouter", ) # Tier overrides (claim-shape detector + random promotion) # Order matters: claim-shape catches obvious cases, random promotion catches the rest. # Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus) if tier == "LIGHT" and _diff_contains_claim_type(diff): tier = "STANDARD" logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number) db.audit( conn, "evaluate", "claim_shape_upgrade", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"}) ) # Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio) if tier == "LIGHT" and random.random() < config.LIGHT_PROMOTION_RATE: tier = "STANDARD" logger.info( "PR #%d: random promotion LIGHT → STANDARD (%.0f%% rate)", pr_number, config.LIGHT_PROMOTION_RATE * 100 ) db.audit(conn, "evaluate", "random_promotion", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"})) conn.execute("UPDATE prs SET tier = ? 
WHERE number = ?", (tier, pr_number)) # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above) conn.execute( "UPDATE prs SET last_attempt = datetime('now') WHERE number = ?", (pr_number,), ) # Check if domain review already completed (resuming after Leo rate limit) existing = conn.execute("SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)).fetchone() existing_domain_verdict = existing["domain_verdict"] if existing else "pending" _existing_leo_verdict = existing["leo_verdict"] if existing else "pending" # Step 2: Domain review (GPT-4o via OpenRouter) # LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback) # Skip if already completed from a previous attempt domain_review = None # Initialize — used later for feedback extraction (Ganymede #12) domain_usage = {"prompt_tokens": 0, "completion_tokens": 0} leo_usage = {"prompt_tokens": 0, "completion_tokens": 0} if tier == "LIGHT" and config.LIGHT_SKIP_LLM: domain_verdict = "skipped" logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number) conn.execute( "UPDATE prs SET domain_verdict = 'skipped', domain_model = 'none' WHERE number = ?", (pr_number,), ) elif existing_domain_verdict not in ("pending", None): domain_verdict = existing_domain_verdict logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict) else: logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier) domain_review, domain_usage = await run_domain_review(review_diff, files, domain or "general", agent) if domain_review is None: # OpenRouter failure (timeout, error) — revert to open for retry. # NOT a rate limit — don't trigger 15-min backoff, just skip this PR. 
conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,)) return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"} domain_verdict = _parse_verdict(domain_review, agent) conn.execute( "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?", (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number), ) # Post domain review as comment (from agent's Forgejo account) agent_tok = get_agent_token(agent) await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), {"body": domain_review}, token=agent_tok, ) # If domain review rejects, skip Leo review (save Opus) if domain_verdict == "request_changes": logger.info("PR #%d: domain rejected, skipping Leo review", pr_number) domain_issues = _parse_issues(domain_review) if domain_review else [] conn.execute( """UPDATE prs SET status = 'open', leo_verdict = 'skipped', last_error = 'domain review requested changes', eval_issues = ? WHERE number = ?""", (json.dumps(domain_issues), pr_number), ) db.audit( conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues}) ) # Record structured review outcome claim_files = [f for f in files if any(f.startswith(d) for d in ("domains/", "core/", "foundations/", "decisions/"))] db.record_review( conn, pr_number, reviewer=agent, outcome="rejected", domain=domain, agent=agent, reviewer_model=config.EVAL_DOMAIN_MODEL, rejection_reason=None, # TODO: parse from domain_issues when Leo starts tagging notes=json.dumps(domain_issues) if domain_issues else None, claims_in_batch=max(len(claim_files), 1), ) # Disposition: check if this PR should be terminated or kept open await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) return { "pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped", "eval_attempts": eval_attempts, } # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT) leo_verdict = "skipped" leo_review = None # Initialize — used later for 
issue extraction if tier != "LIGHT": logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier) leo_review, leo_usage = await run_leo_review(review_diff, files, tier) if leo_review is None: # DEEP: Opus rate limited (queue for later). STANDARD: OpenRouter failed (skip, retry next cycle). conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,)) reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed" return {"pr": pr_number, "skipped": True, "reason": reason} leo_verdict = _parse_verdict(leo_review, "LEO") conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number)) # Post Leo review as comment (from Leo's Forgejo account) leo_tok = get_agent_token("Leo") await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), {"body": leo_review}, token=leo_tok, ) else: # LIGHT tier: Leo is auto-skipped, domain verdict is the only gate conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,)) # Step 4: Determine final verdict # "skipped" counts as approve (LIGHT skips both reviews deliberately) both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped") if both_approve: # Get PR author for formal approvals pr_info = await forgejo_api( "GET", repo_path(f"pulls/{pr_number}"), ) pr_author = pr_info.get("user", {}).get("login", "") if pr_info else "" # Submit formal Forgejo reviews (required for merge) await _post_formal_approvals(pr_number, pr_author) conn.execute( "UPDATE prs SET status = 'approved' WHERE number = ?", (pr_number,), ) db.audit( conn, "evaluate", "approved", json.dumps({"pr": pr_number, "tier": tier, "domain": domain, "leo": leo_verdict, "domain_agent": agent}), ) logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)", pr_number, tier, leo_verdict, domain_verdict) # Record structured review outcome claim_files = [f for f in files if any(f.startswith(d) for d in ("domains/", "core/", "foundations/", "decisions/"))] 
db.record_review( conn, pr_number, reviewer="leo", outcome="approved", domain=domain, agent=agent, reviewer_model=config.MODEL_SONNET if tier == "STANDARD" else "opus", claims_in_batch=max(len(claim_files), 1), ) else: # Collect all issue tags from both reviews all_issues = [] if domain_verdict == "request_changes" and domain_review is not None: all_issues.extend(_parse_issues(domain_review)) if leo_verdict == "request_changes" and leo_review is not None: all_issues.extend(_parse_issues(leo_review)) conn.execute( "UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?", (json.dumps(all_issues), pr_number), ) # Store feedback for re-extraction path feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier, "issues": all_issues} conn.execute( "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)", (json.dumps(feedback), pr_number), ) db.audit( conn, "evaluate", "changes_requested", json.dumps( {"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues} ), ) # Record structured review outcome for Leo rejection claim_files = [f for f in files if any(f.startswith(d) for d in ("domains/", "core/", "foundations/", "decisions/"))] reviewer = "leo" if leo_verdict == "request_changes" else agent db.record_review( conn, pr_number, reviewer=reviewer, outcome="rejected", domain=domain, agent=agent, reviewer_model=config.MODEL_SONNET if tier == "STANDARD" else "opus", notes=json.dumps(all_issues) if all_issues else None, claims_in_batch=max(len(claim_files), 1), ) logger.info( "PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", pr_number, leo_verdict, domain_verdict, all_issues, ) # Disposition: check if this PR should be terminated or kept open await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues) # Record cost (only for reviews that actually ran) from . 
import costs if domain_verdict != "skipped": costs.record_usage( conn, config.EVAL_DOMAIN_MODEL, "eval_domain", input_tokens=domain_usage.get("prompt_tokens", 0), output_tokens=domain_usage.get("completion_tokens", 0), backend="openrouter", ) if leo_verdict not in ("skipped",): if tier == "DEEP": costs.record_usage( conn, config.EVAL_LEO_MODEL, "eval_leo", input_tokens=leo_usage.get("prompt_tokens", 0), output_tokens=leo_usage.get("completion_tokens", 0), backend="max", duration_ms=leo_usage.get("duration_ms", 0), cache_read_tokens=leo_usage.get("cache_read_tokens", 0), cache_write_tokens=leo_usage.get("cache_write_tokens", 0), cost_estimate_usd=leo_usage.get("cost_estimate_usd", 0.0), ) else: costs.record_usage( conn, config.EVAL_LEO_STANDARD_MODEL, "eval_leo", input_tokens=leo_usage.get("prompt_tokens", 0), output_tokens=leo_usage.get("completion_tokens", 0), backend="openrouter", ) return { "pr": pr_number, "tier": tier, "domain": domain, "leo_verdict": leo_verdict, "domain_verdict": domain_verdict, "approved": both_approve, } # ─── Rate limit backoff ─────────────────────────────────────────────────── # When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted # CLI calls overnight when Opus is exhausted. _rate_limit_backoff_until: datetime | None = None _RATE_LIMIT_BACKOFF_MINUTES = 15 # ─── Batch domain review ───────────────────────────────────────────────── def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]: """Parse batched domain review into per-PR review sections. Returns {pr_number: review_text} for each PR found in the response. Missing PRs are omitted — caller handles fallback. 
""" agent_upper = agent.upper() result: dict[int, str] = {} # Split by PR verdict markers: # Each marker terminates the previous PR's section pattern = re.compile( r"" ) matches = list(pattern.finditer(response)) if not matches: return result for i, match in enumerate(matches): pr_num = int(match.group(1)) verdict = match.group(2) marker_end = match.end() # Find the start of this PR's section by looking for the section header # or the end of the previous verdict section_header = f"=== PR #{pr_num}" header_pos = response.rfind(section_header, 0, match.start()) if header_pos >= 0: # Extract from header to end of verdict marker section_text = response[header_pos:marker_end].strip() else: # No header found — extract from previous marker end to this marker end prev_end = matches[i - 1].end() if i > 0 else 0 section_text = response[prev_end:marker_end].strip() # Re-format as individual review comment # Strip the batch section header, keep just the review content # Add batch label for traceability pr_nums_str = ", ".join(f"#{n}" for n in pr_numbers) review_text = ( f"*(batch review with PRs {pr_nums_str})*\n\n" f"{section_text}\n" ) result[pr_num] = review_text return result def _validate_batch_fanout( parsed: dict[int, str], pr_diffs: list[dict], agent: str, ) -> tuple[dict[int, str], list[int]]: """Validate batch fan-out for completeness and cross-contamination. Returns (valid_reviews, fallback_pr_numbers). - valid_reviews: reviews that passed validation - fallback_pr_numbers: PRs that need individual review (missing or cross-contaminated) """ valid: dict[int, str] = {} fallback: list[int] = [] # Build file map: pr_number → set of path segments for matching. # Use full paths (e.g., "domains/internet-finance/dao.md") not bare filenames # to avoid false matches on short names like "dao.md" or "space.md" (Leo note #3). 
    # pr_number → path tokens harvested from that PR's diff headers.
    pr_files: dict[int, set[str]] = {}
    for pr in pr_diffs:
        files = set()
        for line in pr["diff"].split("\n"):
            if line.startswith("diff --git a/"):
                # "diff --git a/<path> b/<path>" → <path>
                path = line.replace("diff --git a/", "").split(" b/")[0]
                files.add(path)
                # Also add the last 2 path segments (e.g., "internet-finance/dao.md")
                # for models that abbreviate paths
                parts = path.split("/")
                if len(parts) >= 2:
                    files.add("/".join(parts[-2:]))
        pr_files[pr["number"]] = files
    for pr in pr_diffs:
        pr_num = pr["number"]
        # Completeness check: is there a review for this PR?
        if pr_num not in parsed:
            logger.warning("Batch fan-out: PR #%d missing from response — fallback to individual", pr_num)
            fallback.append(pr_num)
            continue
        review = parsed[pr_num]
        # Cross-contamination check: does review mention at least one file from this PR?
        # Use path segments (min 10 chars) to avoid false substring matches on short names.
        my_files = pr_files.get(pr_num, set())
        mentions_own_file = any(f in review for f in my_files if len(f) >= 10)
        if not mentions_own_file and my_files:
            # Check if it references files from OTHER PRs (cross-contamination signal)
            other_files = set()
            for other_pr in pr_diffs:
                if other_pr["number"] != pr_num:
                    other_files.update(pr_files.get(other_pr["number"], set()))
            mentions_other = any(f in review for f in other_files if len(f) >= 10)
            if mentions_other:
                logger.warning(
                    "Batch fan-out: PR #%d review references files from another PR — cross-contamination, fallback",
                    pr_num,
                )
                fallback.append(pr_num)
                continue
            # If it doesn't mention any files at all, could be a generic review — accept it
            # (some PRs have short diffs where the model doesn't reference filenames)
        valid[pr_num] = review
    return valid, fallback


async def _run_batch_domain_eval(
    conn,
    batch_prs: list[dict],
    domain: str,
    agent: str,
) -> tuple[int, int]:
    """Execute batch domain review for a group of same-domain STANDARD PRs.

    1. Claim all PRs atomically
    2. Run single batch domain review
    3. Parse + validate fan-out
    4. Post per-PR comments
    5.
    Continue to individual Leo review for each
    6. Fall back to individual review for any validation failures

    Returns (succeeded, failed).
    """
    from .forgejo import get_pr_diff as _get_pr_diff

    succeeded = 0
    failed = 0
    # Step 1: Fetch diffs and build batch
    pr_diffs = []
    claimed_prs = []
    for pr_row in batch_prs:
        pr_num = pr_row["number"]
        # Atomic claim: only one worker flips open → reviewing; losers skip.
        cursor = conn.execute(
            "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
            (pr_num,),
        )
        if cursor.rowcount == 0:
            continue
        # Increment eval_attempts — skip if merge-cycled (Ganymede+Rhea)
        mc_row = conn.execute("SELECT merge_cycled FROM prs WHERE number = ?", (pr_num,)).fetchone()
        if mc_row and mc_row["merge_cycled"]:
            conn.execute(
                "UPDATE prs SET merge_cycled = 0, last_attempt = datetime('now') WHERE number = ?",
                (pr_num,),
            )
            logger.info("PR #%d: merge-cycled re-eval, not incrementing eval_attempts", pr_num)
        else:
            conn.execute(
                "UPDATE prs SET eval_attempts = COALESCE(eval_attempts, 0) + 1, "
                "last_attempt = datetime('now') WHERE number = ?",
                (pr_num,),
            )
        diff = await _get_pr_diff(pr_num)
        if not diff:
            # Diff fetch failed — release the claim and retry next cycle.
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
            continue
        # Musings bypass: musings-only PRs skip both reviews entirely.
        if _is_musings_only(diff):
            await forgejo_api(
                "POST",
                repo_path(f"issues/{pr_num}/comments"),
                {"body": "Auto-approved: musings bypass eval per collective policy."},
            )
            conn.execute(
                "UPDATE prs SET status = 'approved', leo_verdict = 'skipped', "
                "domain_verdict = 'skipped' WHERE number = ?",
                (pr_num,),
            )
            succeeded += 1
            continue
        review_diff, _ = _filter_diff(diff)
        if not review_diff:
            # Everything was filtered out — fall back to the raw diff.
            review_diff = diff
        files = _extract_changed_files(diff)
        # Build label from branch name or first claim filename
        # NOTE(review): pr_row.get() assumes dict-like rows; sqlite3.Row has
        # no .get() — confirm the connection's row_factory.
        branch = pr_row.get("branch", "")
        label = branch.split("/")[-1][:60] if branch else f"pr-{pr_num}"
        pr_diffs.append({
            "number": pr_num,
            "label": label,
            "diff": review_diff,
            "files": files,
            "full_diff": diff,  # kept for Leo review
            "file_count": len([l for l in files.split("\n") if l.strip()]),
        })
        claimed_prs.append(pr_num)
    if not pr_diffs:
        return 0, 0
    # Enforce BATCH_EVAL_MAX_DIFF_BYTES — split if total diff is too large.
    # We only know diff sizes after fetching, so enforce here not in _build_domain_batches.
    total_bytes = sum(len(p["diff"].encode()) for p in pr_diffs)
    if total_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and len(pr_diffs) > 1:
        # Keep PRs up to the byte cap, revert the rest to open for next cycle
        kept = []
        running_bytes = 0
        for p in pr_diffs:
            p_bytes = len(p["diff"].encode())
            if running_bytes + p_bytes > config.BATCH_EVAL_MAX_DIFF_BYTES and kept:
                break
            kept.append(p)
            running_bytes += p_bytes
        overflow = [p for p in pr_diffs if p not in kept]
        for p in overflow:
            # Undo the attempt increment — the PR was never actually reviewed.
            conn.execute(
                "UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
                "WHERE number = ?",
                (p["number"],),
            )
            claimed_prs.remove(p["number"])
            logger.info(
                "PR #%d: diff too large for batch (%d bytes total), deferring to next cycle",
                p["number"],
                total_bytes,
            )
        pr_diffs = kept
        if not pr_diffs:
            return 0, 0
    # Detect domain for all PRs (should be same domain)
    conn.execute(
        "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number IN ({})".format(
            ",".join("?" * len(claimed_prs))
        ),
        [domain, agent] + claimed_prs,
    )
    # Step 2: Run batch domain review
    logger.info(
        "Batch domain review: %d PRs in %s domain (PRs: %s)",
        len(pr_diffs),
        domain,
        ", ".join(f"#{p['number']}" for p in pr_diffs),
    )
    batch_response, batch_domain_usage = await run_batch_domain_review(pr_diffs, domain, agent)
    if batch_response is None:
        # Complete failure — revert all to open
        logger.warning("Batch domain review failed — reverting all PRs to open")
        for pr_num in claimed_prs:
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
        return 0, len(claimed_prs)
    # Step 3: Parse + validate fan-out
    parsed = _parse_batch_response(batch_response, claimed_prs, agent)
    valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent)
    db.audit(
        conn,
        "evaluate",
        "batch_domain_review",
        json.dumps({
            "domain": domain,
            "batch_size": len(pr_diffs),
            "valid": len(valid_reviews),
            "fallback": fallback_prs,
        }),
    )
    # Record batch domain review cost ONCE for the whole batch (not per-PR)
    from . import costs
    costs.record_usage(
        conn,
        config.EVAL_DOMAIN_MODEL,
        "eval_domain",
        input_tokens=batch_domain_usage.get("prompt_tokens", 0),
        output_tokens=batch_domain_usage.get("completion_tokens", 0),
        backend="openrouter",
    )
    # Step 4: Process valid reviews — post comments + continue to Leo
    for pr_data in pr_diffs:
        pr_num = pr_data["number"]
        if pr_num in fallback_prs:
            # Revert — will be picked up by individual eval next cycle.
            # Note: fallback PRs are not counted in `failed`.
            conn.execute(
                "UPDATE prs SET status = 'open', eval_attempts = COALESCE(eval_attempts, 1) - 1 "
                "WHERE number = ?",
                (pr_num,),
            )
            logger.info("PR #%d: batch fallback — will retry individually", pr_num)
            continue
        if pr_num not in valid_reviews:
            # Should not happen, but safety
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
            continue
        review_text = valid_reviews[pr_num]
        domain_verdict = _parse_verdict(review_text, agent)
        # Post domain review comment (from the domain agent's Forgejo account)
        agent_tok = get_agent_token(agent)
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_num}/comments"),
            {"body": review_text},
            token=agent_tok,
        )
        conn.execute(
            "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
            (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_num),
        )
        # If domain rejects, handle disposition (same as individual path)
        if domain_verdict == "request_changes":
            domain_issues = _parse_issues(review_text)
            eval_attempts = (conn.execute(
                "SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
            ).fetchone()["eval_attempts"] or 0)
            conn.execute(
                "UPDATE prs SET status = 'open', leo_verdict = 'skipped', "
                "last_error = 'domain review requested changes', eval_issues = ? WHERE number = ?",
                (json.dumps(domain_issues), pr_num),
            )
            db.audit(
                conn,
                "evaluate",
                "domain_rejected",
                json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}),
            )
            await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
            succeeded += 1
            continue
        # Domain approved — continue to individual Leo review
        logger.info("PR #%d: batch domain approved, proceeding to individual Leo review", pr_num)
        review_diff = pr_data["diff"]
        files = pr_data["files"]
        leo_review, leo_usage = await run_leo_review(review_diff, files, "STANDARD")
        if leo_review is None:
            # Transport failure — release and retry next cycle.
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
            logger.debug("PR #%d: Leo review failed, will retry next cycle", pr_num)
            continue
        if leo_review == "RATE_LIMITED":
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_num,))
            logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num)
            continue
        leo_verdict = _parse_verdict(leo_review, "LEO")
        conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num))
        # Post Leo review (from Leo's Forgejo account)
        leo_tok = get_agent_token("Leo")
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_num}/comments"),
            {"body": leo_review},
            token=leo_tok,
        )
        costs.record_usage(
            conn,
            config.EVAL_LEO_STANDARD_MODEL,
            "eval_leo",
            input_tokens=leo_usage.get("prompt_tokens", 0),
            output_tokens=leo_usage.get("completion_tokens", 0),
            backend="openrouter",
        )
        # Final verdict — "skipped" counts as approve (deliberate skip).
        both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped")
        if both_approve:
            pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}"))
            pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
            await _post_formal_approvals(pr_num, pr_author)
            conn.execute("UPDATE prs SET status = 'approved' WHERE number = ?", (pr_num,))
            db.audit(
                conn,
                "evaluate",
                "approved",
                json.dumps({"pr": pr_num, "tier": "STANDARD", "domain": domain, "leo": leo_verdict, "domain_agent": agent, "batch": True}),
            )
            logger.info("PR #%d: APPROVED (batch domain + individual Leo)", pr_num)
        else:
            all_issues = []
            if leo_verdict == "request_changes":
                all_issues.extend(_parse_issues(leo_review))
            conn.execute(
                "UPDATE prs SET status = 'open', eval_issues = ? WHERE number = ?",
                (json.dumps(all_issues), pr_num),
            )
            # Store feedback for the re-extraction path.
            feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": "STANDARD", "issues": all_issues}
            conn.execute(
                "UPDATE sources SET feedback = ?
 WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
                (json.dumps(feedback), pr_num),
            )
            db.audit(
                conn,
                "evaluate",
                "changes_requested",
                json.dumps({"pr": pr_num, "tier": "STANDARD", "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues, "batch": True}),
            )
            eval_attempts = (conn.execute(
                "SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
            ).fetchone()["eval_attempts"] or 0)
            await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
        # NOTE(review): placed at loop level so both approved and
        # changes-requested PRs count as processed (matches the earlier
        # domain-reject path, which also increments before `continue`) —
        # original indentation lost in transit, confirm.
        succeeded += 1
    return succeeded, failed


def _build_domain_batches(
    rows: list,
    conn,
) -> tuple[dict[str, list[dict]], list[dict]]:
    """Group STANDARD PRs by domain for batch eval. DEEP and LIGHT stay individual.

    Returns (batches_by_domain, individual_prs).
    Respects BATCH_EVAL_MAX_PRS and BATCH_EVAL_MAX_DIFF_BYTES.
    """
    domain_candidates: dict[str, list[dict]] = {}
    individual: list[dict] = []
    for row in rows:
        pr_num = row["number"]
        tier = row["tier"]
        # Only batch STANDARD PRs with pending domain review
        if tier != "STANDARD":
            individual.append(row)
            continue
        # Check if domain review already done (resuming after Leo rate limit)
        existing = conn.execute(
            "SELECT domain_verdict, domain FROM prs WHERE number = ?", (pr_num,)
        ).fetchone()
        if existing and existing["domain_verdict"] not in ("pending", None):
            individual.append(row)
            continue
        domain = existing["domain"] if existing and existing["domain"] else "general"
        domain_candidates.setdefault(domain, []).append(row)
    # Build sized batches per domain
    batches: dict[str, list[dict]] = {}
    for domain, prs in domain_candidates.items():
        if len(prs) == 1:
            # Single PR — no batching benefit, process individually
            individual.extend(prs)
            continue
        # Cap at BATCH_EVAL_MAX_PRS
        batch = prs[: config.BATCH_EVAL_MAX_PRS]
        batches[domain] = batch
        # Overflow goes individual
        individual.extend(prs[config.BATCH_EVAL_MAX_PRS :])
    return batches, individual


# ─── Main entry point ──────────────────────────────────────────────────────


async def evaluate_cycle(conn, max_workers=None) ->
tuple[int, int]:
    """Run one evaluation cycle.

    Groups eligible STANDARD PRs by domain for batch domain review.
    DEEP PRs get individual eval. LIGHT PRs get auto-approved.
    Leo review always individual (safety net for batch cross-contamination).
    """
    global _rate_limit_backoff_until

    # Check if we're in Opus rate-limit backoff
    opus_backoff = False
    if _rate_limit_backoff_until is not None:
        now = datetime.now(timezone.utc)
        if now < _rate_limit_backoff_until:
            remaining = int((_rate_limit_backoff_until - now).total_seconds())
            logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
            opus_backoff = True
        else:
            logger.info("Rate limit backoff expired, resuming full eval cycles")
            _rate_limit_backoff_until = None
    # Find PRs ready for evaluation. During Opus backoff, only pick up work
    # that doesn't need Opus (pending domain reviews, or non-DEEP Leo reviews).
    if opus_backoff:
        verdict_filter = "AND (p.domain_verdict = 'pending' OR (p.leo_verdict = 'pending' AND p.tier != 'DEEP'))"
    else:
        verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
    # Stagger removed — migration protection no longer needed. Merge is domain-serialized
    # and entity conflicts auto-resolve. Safe to let all eligible PRs enter eval. (Cory, Mar 14)
    rows = conn.execute(
        f"""SELECT p.number, p.tier, p.branch, p.domain
            FROM prs p
            LEFT JOIN sources s ON p.source_path = s.path
            WHERE p.status = 'open'
              AND p.tier0_pass = 1
              AND COALESCE(p.eval_attempts, 0) < {config.MAX_EVAL_ATTEMPTS}
              {verdict_filter}
              AND (p.last_attempt IS NULL OR p.last_attempt < datetime('now', '-10 minutes'))
            ORDER BY
              CASE WHEN COALESCE(p.eval_attempts, 0) = 0 THEN 0 ELSE 1 END,
              CASE COALESCE(p.priority, s.priority, 'medium')
                WHEN 'critical' THEN 0 WHEN 'high' THEN 1
                WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END,
              p.created_at ASC
            LIMIT ?""",
        (max_workers or config.MAX_EVAL_WORKERS,),
    ).fetchall()
    if not rows:
        return 0, 0

    succeeded = 0
    failed = 0
    # Group STANDARD PRs by domain for batch eval
    domain_batches, individual_prs = _build_domain_batches(rows, conn)
    # Process batch domain reviews first
    for domain, batch_prs in domain_batches.items():
        try:
            agent = agent_for_domain(domain)
            b_succeeded, b_failed = await _run_batch_domain_eval(
                conn, batch_prs, domain, agent,
            )
            succeeded += b_succeeded
            failed += b_failed
        except Exception:
            logger.exception("Batch eval failed for domain %s", domain)
            # Revert all to open
            for pr_row in batch_prs:
                conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_row["number"],))
            failed += len(batch_prs)
    # Process individual PRs (DEEP, LIGHT, single-domain, fallback)
    for row in individual_prs:
        try:
            # During Opus backoff, skip DEEP PRs whose domain review is
            # already done — the only remaining work would need Opus.
            if opus_backoff and row["tier"] == "DEEP":
                existing = conn.execute(
                    "SELECT domain_verdict FROM prs WHERE number = ?",
                    (row["number"],),
                ).fetchone()
                if existing and existing["domain_verdict"] not in ("pending", None):
                    logger.debug(
                        "PR #%d: skipping DEEP during Opus backoff (domain already %s)",
                        row["number"],
                        existing["domain_verdict"],
                    )
                    continue
            result = await evaluate_pr(conn, row["number"], tier=row["tier"])
            if result.get("skipped"):
                reason = result.get("reason", "")
                logger.debug("PR #%d skipped: %s", row["number"], reason)
                if "rate_limited" in reason:
                    from datetime import timedelta

                    if reason == "opus_rate_limited":
                        # Opus-only exhaustion: keep cycling (triage + domain
                        # still run), just stop asking Opus for a while.
                        _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
                            minutes=_RATE_LIMIT_BACKOFF_MINUTES
                        )
                        opus_backoff = True
                        logger.info(
                            "Opus rate limited — backing off Opus for %d min, continuing triage+domain",
                            _RATE_LIMIT_BACKOFF_MINUTES,
                        )
                        continue
                    else:
                        # Generic rate limit: abandon the rest of this cycle.
                        _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
                            minutes=_RATE_LIMIT_BACKOFF_MINUTES
                        )
                        logger.info(
                            "Rated limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
                        ) if False else logger.info(
                            "Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
                        )
                        break
            else:
                succeeded += 1
        except Exception:
            logger.exception("Failed to evaluate PR #%d", row["number"])
            failed += 1
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],))
    if succeeded or failed:
        logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed)
    return succeeded, failed