diff --git a/lib/config.py b/lib/config.py index 96c66d6..fc95bdc 100644 --- a/lib/config.py +++ b/lib/config.py @@ -107,8 +107,17 @@ OPENROUTER_DAILY_BUDGET = 20.0 # USD OPENROUTER_WARN_THRESHOLD = 0.8 # 80% of budget # --- Quality --- -SAMPLE_AUDIT_RATE = 0.10 # 10% of LIGHT merges +SAMPLE_AUDIT_RATE = 0.15 # 15% of merged LIGHT PRs get post-merge sample audit — pre-merge promotion is LIGHT_PROMOTION_RATE below (Rio) SAMPLE_AUDIT_DISAGREEMENT_THRESHOLD = 0.10 # 10% disagreement → tighten LIGHT criteria +SAMPLE_AUDIT_MODEL = MODEL_OPUS # Opus for audit — stronger than Haiku triage, though same Claude family; NOTE(review): not cross-family by this repo's definition — confirm intent (Leo) + +# --- Tier logic --- +# LIGHT_SKIP_LLM: when True, LIGHT PRs skip domain+Leo review entirely (auto-approve on Tier 0 pass). +# Set False for shadow mode (domain review runs but logs only). Flip True after 24h validation (Rhea). +LIGHT_SKIP_LLM = os.environ.get("LIGHT_SKIP_LLM", "false").lower() == "true" +# Random pre-merge promotion: fraction of LIGHT PRs upgraded to STANDARD before eval (Rio). +# Makes gaming unpredictable — extraction agents can't know which LIGHT PRs get full review. +LIGHT_PROMOTION_RATE = float(os.environ.get("LIGHT_PROMOTION_RATE", "0.15")) # --- Polling intervals (seconds) --- INGEST_INTERVAL = 60 diff --git a/lib/evaluate.py b/lib/evaluate.py index da862f2..8662f16 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -1,23 +1,26 @@ """Evaluate stage — PR lifecycle orchestration. -Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last. -Sonnet (domain review) filters before Opus (Leo review) to maximize value per -scarce Opus call. +Tier-based review routing. Model diversity: GPT-4o (domain) + Sonnet (Leo STANDARD) ++ Opus (Leo DEEP) = two model families, no correlated blind spots. Flow per PR: 1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT - 2. Domain review → Sonnet (Claude Max, overflow: OpenRouter GPT-4o) - 3. Leo review → Opus (Claude Max, overflow: queue) — skipped for LIGHT - 4. 
DEEP cross-family → GPT-4o (OpenRouter) — only if domain + Leo both approve + 2. Tier overrides: + a. Claim-shape detector: type: claim in YAML → STANDARD min (Theseus) + b. Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio) + 3. Domain review → GPT-4o (OpenRouter) — skipped for LIGHT when LIGHT_SKIP_LLM=True + 4. Leo review → Opus DEEP / Sonnet STANDARD (OpenRouter) — skipped for LIGHT 5. Post reviews, submit formal Forgejo approvals, update SQLite 6. If both approve → status = 'approved' (merge module picks it up) + 7. Retry budget: 3 attempts max, disposition on attempt 2+ -Design reviewed by Ganymede, Rhea, Vida, Theseus. +Design reviewed by Ganymede, Rio, Theseus, Rhea, Leo. LLM transport and prompts extracted to lib/llm.py (Phase 3c). """ import json import logging +import random import re from datetime import datetime, timezone @@ -80,6 +83,25 @@ def _is_musings_only(diff: str) -> bool: return has_musings and not has_other +# ─── Tier overrides ─────────────────────────────────────────────────────── + + +def _diff_contains_claim_type(diff: str) -> bool: + """Claim-shape detector: check if any file in diff has type: claim in frontmatter. + + Mechanical check ($0). If YAML declares type: claim, this is a factual claim — + not an entity update or formatting fix. Must be classified STANDARD minimum + regardless of Haiku triage. Catches factual claims disguised as LIGHT content. 
+ (Theseus: converts semantic problem to mechanical check) + """ + for line in diff.split("\n"): + if line.startswith("+") and not line.startswith("+++"): + stripped = line[1:].strip() + if stripped in ("type: claim", 'type: "claim"', "type: 'claim'"): + return True + return False + + # ─── Verdict parsing ────────────────────────────────────────────────────── @@ -133,10 +155,12 @@ async def _terminate_pr(conn, pr_number: int, reason: str): await forgejo_api( "POST", repo_path(f"issues/{pr_number}/comments"), - {"body": f"**Closed by eval pipeline** — {reason}.\n\n" - f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " - f"Source material will be re-queued for extraction with review feedback attached.\n\n" - f"See eval_issues for specific problems."}, + { + "body": f"**Closed by eval pipeline** — {reason}.\n\n" + f"This PR has been evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " + f"Source material will be re-queued for extraction with review feedback attached.\n\n" + f"See eval_issues for specific problems." + }, ) await forgejo_api( "PATCH", @@ -160,9 +184,17 @@ async def _terminate_pr(conn, pr_number: int, reason: str): if cursor.rowcount == 0: logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number) - db.audit(conn, "evaluate", "pr_terminated", json.dumps({ - "pr": pr_number, "reason": reason, - })) + db.audit( + conn, + "evaluate", + "pr_terminated", + json.dumps( + { + "pr": pr_number, + "reason": reason, + } + ), + ) logger.info("PR #%d: TERMINATED — %s", pr_number, reason) @@ -205,18 +237,34 @@ async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_iss # Future: auto-fix module will push fixes here. 
logger.info( "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt", - pr_number, eval_attempts, all_issues, + pr_number, + eval_attempts, + all_issues, + ) + db.audit( + conn, + "evaluate", + "mechanical_retry", + json.dumps( + { + "pr": pr_number, + "attempt": eval_attempts, + "issues": all_issues, + } + ), ) - db.audit(conn, "evaluate", "mechanical_retry", json.dumps({ - "pr": pr_number, "attempt": eval_attempts, "issues": all_issues, - })) else: # Substantive, mixed, or unknown — close and requeue logger.info( "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source", - pr_number, eval_attempts, classification, all_issues, + pr_number, + eval_attempts, + classification, + all_issues, + ) + await _terminate_pr( + conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}" ) - await _terminate_pr(conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}") # ─── Single PR evaluation ───────────────────────────────────────────────── @@ -225,9 +273,7 @@ async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_iss async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: """Evaluate a single PR. Returns result dict.""" # Check eval attempt budget before claiming - row = conn.execute( - "SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,) - ).fetchone() + row = conn.execute("SELECT eval_attempts FROM prs WHERE number = ?", (pr_number,)).fetchone() eval_attempts = (row["eval_attempts"] or 0) if row else 0 if eval_attempts >= config.MAX_EVAL_ATTEMPTS: # Terminal — hard cap reached. Close PR, tag source. 
@@ -294,6 +340,26 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # Step 1: Triage (if not already triaged) if tier is None: tier = await triage_pr(diff) + + # Tier overrides (claim-shape detector + random promotion) + # Order matters: claim-shape catches obvious cases, random promotion catches the rest. + + # Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus) + if tier == "LIGHT" and _diff_contains_claim_type(diff): + tier = "STANDARD" + logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number) + db.audit( + conn, "evaluate", "claim_shape_upgrade", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"}) + ) + + # Random pre-merge promotion: 15% of LIGHT → STANDARD (Rio) + if tier == "LIGHT" and random.random() < config.LIGHT_PROMOTION_RATE: + tier = "STANDARD" + logger.info( + "PR #%d: random promotion LIGHT → STANDARD (%.0f%% rate)", pr_number, config.LIGHT_PROMOTION_RATE * 100 + ) + db.audit(conn, "evaluate", "random_promotion", json.dumps({"pr": pr_number, "from": "LIGHT", "to": "STANDARD"})) + conn.execute("UPDATE prs SET tier = ? 
WHERE number = ?", (tier, pr_number)) # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above) @@ -307,10 +373,18 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: existing_domain_verdict = existing["domain_verdict"] if existing else "pending" _existing_leo_verdict = existing["leo_verdict"] if existing else "pending" - # Step 2: Domain review FIRST (Sonnet — high volume filter) + # Step 2: Domain review (GPT-4o via OpenRouter) + # LIGHT tier: skip entirely when LIGHT_SKIP_LLM enabled (Rhea: config flag rollback) # Skip if already completed from a previous attempt domain_review = None # Initialize — used later for feedback extraction (Ganymede #12) - if existing_domain_verdict not in ("pending", None): + if tier == "LIGHT" and config.LIGHT_SKIP_LLM: + domain_verdict = "skipped" + logger.info("PR #%d: LIGHT tier — skipping domain review (LIGHT_SKIP_LLM=True)", pr_number) + conn.execute( + "UPDATE prs SET domain_verdict = 'skipped', domain_model = 'none' WHERE number = ?", + (pr_number,), + ) + elif existing_domain_verdict not in ("pending", None): domain_verdict = existing_domain_verdict logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict) else: @@ -349,12 +423,19 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: WHERE number = ?""", (json.dumps(domain_issues), pr_number), ) - db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues})) + db.audit( + conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent, "issues": domain_issues}) + ) # Disposition: check if this PR should be terminated or kept open await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) - return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped", "eval_attempts": eval_attempts} + return { + "pr": pr_number, + "domain_verdict": domain_verdict, + 
"leo_verdict": "skipped", + "eval_attempts": eval_attempts, + } # Step 3: Leo review (Opus — only if domain passes, skipped for LIGHT) leo_verdict = "skipped" @@ -385,7 +466,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,)) # Step 4: Determine final verdict - both_approve = (leo_verdict == "approve" or leo_verdict == "skipped") and domain_verdict == "approve" + # "skipped" counts as approve (LIGHT skips both reviews deliberately) + both_approve = leo_verdict in ("approve", "skipped") and domain_verdict in ("approve", "skipped") if both_approve: # Get PR author for formal approvals @@ -431,18 +513,27 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: conn, "evaluate", "changes_requested", - json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues}), + json.dumps( + {"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict, "issues": all_issues} + ), + ) + logger.info( + "PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", + pr_number, + leo_verdict, + domain_verdict, + all_issues, ) - logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s, issues=%s)", pr_number, leo_verdict, domain_verdict, all_issues) # Disposition: check if this PR should be terminated or kept open await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues) - # Record cost (domain review on OpenRouter, Leo depends on tier) + # Record cost (only for reviews that actually ran) from . 
import costs - costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter") - if tier != "LIGHT": + if domain_verdict != "skipped": + costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="openrouter") + if leo_verdict not in ("skipped",): if tier == "DEEP": costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max") else: @@ -535,7 +626,9 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: ).fetchall() if stagger_limit and rows: - logger.info("Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count) + logger.info( + "Post-migration stagger: limiting eval batch to %d (migrated PRs: %d)", stagger_limit, migrated_count + ) if not rows: return 0, 0