"""Evaluate stage — triage + domain review + Leo review. Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last. Sonnet (domain review) filters before Opus (Leo review) to maximize value per scarce Opus call. Flow per PR: 1. Triage → Haiku (OpenRouter) → DEEP / STANDARD / LIGHT 2. Domain review → Sonnet (Claude Max, overflow: OpenRouter GPT-4o) 3. Leo review → Opus (Claude Max, overflow: queue) — skipped for LIGHT 4. DEEP cross-family → GPT-4o (OpenRouter) — only if domain + Leo both approve 5. Post reviews, submit formal Forgejo approvals, update SQLite 6. If both approve → status = 'approved' (merge module picks it up) Design reviewed by Ganymede, Rhea, Vida, Theseus. """ import asyncio import json import logging import re from datetime import datetime, timezone from . import config, db from .domains import agent_for_domain, detect_domain_from_diff from .forgejo import api as forgejo_api from .forgejo import get_agent_token, get_pr_diff, repo_path logger = logging.getLogger("pipeline.evaluate") # Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8) _active_subprocesses: set = set() async def kill_active_subprocesses(): """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown.""" for proc in list(_active_subprocesses): if proc.returncode is None: logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid) try: proc.kill() await proc.wait() except ProcessLookupError: pass _active_subprocesses.clear() REVIEW_STYLE_GUIDE = ( "Be concise. Only mention what fails or is interesting. " "Do not summarize what the PR does — the diff speaks for itself. " "If everything passes, say so in one line and approve." ) # ─── Prompt templates ────────────────────────────────────────────────────── TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT. DEEP — use when ANY of these apply: - PR adds or modifies claims rated "likely" or higher confidence - PR touches agent beliefs or creates cross-domain wiki links - PR challenges an existing claim (has "challenged_by" or contradicts existing) - PR modifies axiom-level beliefs - PR is a cross-domain synthesis claim STANDARD — use when: - New claims in established domain areas - Enrichments to existing claims (confirm/extend) - New hypothesis-level beliefs - Source archives with extraction results LIGHT — use ONLY when ALL changes fit these categories: - Entity attribute updates (factual corrections, new data points) - Source archiving without extraction - Formatting fixes, typo corrections - Status field changes IMPORTANT: When uncertain, classify UP, not down. Always err toward more review. Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line. --- PR DIFF --- {diff}""" DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base. Review this PR from your domain expertise: 1. Technical accuracy — are the claims factually correct in your domain? 2. Domain duplicates — does your domain already have substantially similar claims? 3. Missing context — is important domain context absent that would change interpretation? 4. Confidence calibration — from your domain expertise, is the confidence level right? 5. Enrichment opportunities — should this connect to existing claims via wiki links? 
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.

Review this PR from your domain expertise:
1. Technical accuracy — are the claims factually correct in your domain?
2. Domain duplicates — does your domain already have substantially similar claims?
3. Missing context — is important domain context absent that would change interpretation?
4. Confidence calibration — from your domain expertise, is the confidence level right?
5. Enrichment opportunities — should this connect to existing claims via wiki links?

{style_guide}

If you are requesting changes, tag the specific issues:
<issues>tag1, tag2</issues>

Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims,
confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate,
scope_error, source_archive, placeholder_url, missing_challenged_by

End your review with exactly one of:
VERDICT:{agent_upper}:APPROVE
VERDICT:{agent_upper}:REQUEST_CHANGES

--- PR DIFF ---
{diff}

--- CHANGED FILES ---
{files}"""

LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.

Review this PR against the quality criteria:
1. Schema compliance — YAML frontmatter, prose-as-title, required fields
2. Duplicate check — does this claim already exist?
3. Confidence calibration — appropriate for the evidence?
4. Wiki link validity — references real claims?
5. Source quality — credible for the claim?
6. Domain assignment — correct domain?
7. Epistemic hygiene — specific enough to be wrong?

{style_guide}

If requesting changes, tag the issues:
<issues>tag1, tag2</issues>

End your review with exactly one of:
VERDICT:LEO:APPROVE
VERDICT:LEO:REQUEST_CHANGES

--- PR DIFF ---
{diff}

--- CHANGED FILES ---
{files}"""

LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.

Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
1. Cross-domain implications — does this claim affect beliefs in other domains?
2. Confidence calibration — is the confidence level justified by the evidence?
3. Contradiction check — does this contradict any existing claims without explicit argument?
4. Wiki link validity — do all wiki links reference real, existing claims?
5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality — is the source credible for the claim being made?
7. Duplicate check — does a substantially similar claim already exist?
8. Enrichment vs new claim — should this be an enrichment to an existing claim instead?
9. Domain assignment — is the claim in the correct domain?
10. Schema compliance — YAML frontmatter, prose-as-title format, required fields
11. Epistemic hygiene — is the claim specific enough to be wrong?

{style_guide}

If requesting changes, tag the issues:
<issues>tag1, tag2</issues>

End your review with exactly one of:
VERDICT:LEO:APPROVE
VERDICT:LEO:REQUEST_CHANGES

--- PR DIFF ---
{diff}

--- CHANGED FILES ---
{files}"""

# ─── API helpers ───────────────────────────────────────────────────────────


async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
    """Call OpenRouter API. Returns response text or None on failure."""
    import aiohttp

    key_file = config.SECRETS_DIR / "openrouter-key"
    if not key_file.exists():
        logger.error("OpenRouter key file not found")
        return None
    key = key_file.read_text().strip()
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 4096,
        "temperature": 0.2,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config.OPENROUTER_URL,
                headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
                json=payload,
                timeout=aiohttp.ClientTimeout(total=timeout_sec),
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    logger.error("OpenRouter %s → %d: %s", model, resp.status, text[:200])
                    return None
                data = await resp.json()
                return data.get("choices", [{}])[0].get("message", {}).get("content")
    except Exception as e:
        logger.error("OpenRouter error: %s → %s", model, e)
        return None
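
# Illustrative only: the response shape the .get() chain above walks.
# OpenRouter follows the OpenAI chat-completions format, roughly:
#
#   {"choices": [{"message": {"role": "assistant", "content": "..."}}], ...}
#
# The defensive .get(..., [{}]) chain returns None instead of raising when the
# payload is malformed, so callers can treat None uniformly as "call failed".
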
async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600,
                           cwd: str | None = None) -> str | None:
    """Call Claude via CLI (Claude Max subscription). Returns response or None."""
    proc = await asyncio.create_subprocess_exec(
        str(config.CLAUDE_CLI), "-p", "--model", model, "--output-format", "text",
        cwd=cwd or str(config.REPO_DIR),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _active_subprocesses.add(proc)  # Track for graceful shutdown (Ganymede #8)
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=timeout_sec,
        )
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        logger.error("Claude CLI timed out after %ds", timeout_sec)
        return None
    finally:
        _active_subprocesses.discard(proc)

    out_text = (stdout or b"").decode()
    err_text = (stderr or b"").decode()

    # Check for rate limit REGARDLESS of exit code — the CLI sometimes exits 0
    # with a limit message
    combined_lower = (out_text + err_text).lower()
    if "hit your limit" in combined_lower or "rate limit" in combined_lower:
        logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
        return "RATE_LIMITED"

    if proc.returncode != 0:
        logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s",
                     proc.returncode, err_text[:200], out_text[:200])
        return None
    return out_text.strip()


# ─── Diff helpers ──────────────────────────────────────────────────────────


def _filter_diff(diff: str) -> tuple[str, str]:
    """Filter diff to only review-relevant files. Returns (review_diff, entity_diff).

    Strips: inbox/archive/, schemas/, skills/, agents/*/musings/
    """
    sections = re.split(r"(?=^diff --git )", diff, flags=re.MULTILINE)
    skip_patterns = [r"^diff --git a/(inbox/archive|schemas|skills|agents/[^/]+/musings)/"]
    core_domains = {"living-agents", "living-capital", "teleohumanity", "mechanisms"}
    claim_sections = []
    entity_sections = []
    for section in sections:
        if not section.strip():
            continue
        if any(re.match(p, section) for p in skip_patterns):
            continue
        entity_match = re.match(r"^diff --git a/entities/([^/]+)/", section)
        if entity_match and entity_match.group(1) not in core_domains:
            entity_sections.append(section)
            continue
        claim_sections.append(section)
    return "".join(claim_sections), "".join(entity_sections)


def _extract_changed_files(diff: str) -> str:
    """Extract changed file paths from diff."""
    return "\n".join(
        line.replace("diff --git a/", "").split(" b/")[0]
        for line in diff.split("\n")
        if line.startswith("diff --git")
    )


def _is_musings_only(diff: str) -> bool:
    """Check if PR only modifies musing files."""
    has_musings = False
    has_other = False
    for line in diff.split("\n"):
        if line.startswith("diff --git"):
            if "agents/" in line and "/musings/" in line:
                has_musings = True
            else:
                has_other = True
    return has_musings and not has_other
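
# Illustrative only: how _filter_diff() routes a hypothetical three-file diff
# (the paths are made up for the example):
#
#   diff --git a/claims/foo.md b/claims/foo.md    ->  claim_sections (reviewed)
#   diff --git a/entities/weather/x.md b/...      ->  entity_sections
#                                                     ("weather" is not a core domain)
#   diff --git a/schemas/claim.json b/...         ->  dropped (skip_patterns)
#
# The lookahead in re.split keeps each "diff --git" header attached to the
# hunk that follows it.
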
# ─── Verdict parsing ───────────────────────────────────────────────────────


def _parse_verdict(review_text: str, reviewer: str) -> str:
    """Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
    upper = reviewer.upper()
    if f"VERDICT:{upper}:APPROVE" in review_text:
        return "approve"
    elif f"VERDICT:{upper}:REQUEST_CHANGES" in review_text:
        return "request_changes"
    else:
        logger.warning("No parseable verdict from %s — treating as request_changes", reviewer)
        return "request_changes"


def _parse_issues(review_text: str) -> list[str]:
    """Extract issue tags from the <issues>tag1, tag2</issues> block of a review."""
    match = re.search(r"<issues>(.*?)</issues>", review_text)
    if not match:
        return []
    return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]


# ─── Review execution ──────────────────────────────────────────────────────


async def _triage_pr(diff: str) -> str:
    """Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
    prompt = TRIAGE_PROMPT.format(diff=diff[:50000])  # Cap diff size for triage
    result = await _openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
    if not result:
        logger.warning("Triage failed, defaulting to STANDARD")
        return "STANDARD"
    tier = result.split("\n")[0].strip().upper()
    if tier in ("DEEP", "STANDARD", "LIGHT"):
        reason = result.split("\n")[1].strip() if "\n" in result else ""
        logger.info("Triage: %s — %s", tier, reason[:100])
        return tier
    logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
    return "STANDARD"


async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
    """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o."""
    prompt = DOMAIN_PROMPT.format(
        agent=agent,
        agent_upper=agent.upper(),
        domain=domain,
        style_guide=REVIEW_STYLE_GUIDE,
        diff=diff,
        files=files,
    )
    # Try Claude Max Sonnet first
    result = await _claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
    if result == "RATE_LIMITED":
        # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume
        # filter, don't bottleneck it)
        policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
        if policy == "overflow":
            logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
            result = await _openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
        else:
            logger.info("Claude Max rate limited, queuing domain review")
            return None
    return result
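
# Illustrative only: the OVERFLOW_POLICY shape _run_domain_review() assumes
# lives in config.py (this literal is a hypothetical sketch, not the real one):
#
#   OVERFLOW_POLICY = {"eval_domain": "overflow"}
#
# Only the value "overflow" triggers the OpenRouter fallback; any other value
# queues the PR for a later cycle. Leo review never consults this policy:
# _run_leo_review() below hard-codes the queue behavior.
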
async def _run_leo_review(diff: str, files: str, tier: str) -> str | None:
    """Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy)."""
    prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
    prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
    result = await _claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
    if result == "RATE_LIMITED":
        # Leo review queues — don't waste Opus calls (never overflow)
        logger.info("Claude Max Opus rate limited, queuing Leo review")
        return None
    return result


async def _post_formal_approvals(pr_number: int, pr_author: str):
    """Submit formal Forgejo reviews from 2 agents (not the PR author)."""
    approvals = 0
    for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
        if agent_name == pr_author:
            continue
        if approvals >= 2:
            break
        token = get_agent_token(agent_name)
        if token:
            result = await forgejo_api(
                "POST",
                repo_path(f"pulls/{pr_number}/reviews"),
                {"body": "Approved.", "event": "APPROVED"},
                token=token,
            )
            if result is not None:
                approvals += 1
                logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
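
# Illustrative only: with pr_author == "vida", the loop above tries leo, skips
# vida, then theseus, and stops after two successful reviews. Each review is
# the standard Forgejo payload posted to pulls/{pr_number}/reviews:
#
#   {"body": "Approved.", "event": "APPROVED"}
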
# ─── Single PR evaluation ──────────────────────────────────────────────────


async def evaluate_pr(conn, pr_number: int, tier: str | None = None) -> dict:
    """Evaluate a single PR. Returns a result dict."""
    # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
    cursor = conn.execute(
        "UPDATE prs SET status = 'reviewing' WHERE number = ? AND status = 'open'",
        (pr_number,),
    )
    if cursor.rowcount == 0:
        logger.debug("PR #%d already claimed by another worker, skipping", pr_number)
        return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}

    # Fetch diff
    diff = await get_pr_diff(pr_number)
    if not diff:
        return {"pr": pr_number, "skipped": True, "reason": "no_diff"}

    # Musings bypass
    if _is_musings_only(diff):
        logger.info("PR #%d is musings-only — auto-approving", pr_number)
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": "Auto-approved: musings bypass eval per collective policy."},
        )
        conn.execute(
            """UPDATE prs SET status = 'approved', leo_verdict = 'skipped',
               domain_verdict = 'skipped' WHERE number = ?""",
            (pr_number,),
        )
        return {"pr": pr_number, "auto_approved": True, "reason": "musings_only"}

    # Filter diff
    review_diff, _entity_diff = _filter_diff(diff)
    if not review_diff:
        review_diff = diff
    files = _extract_changed_files(diff)

    # Detect domain
    domain = detect_domain_from_diff(diff)
    agent = agent_for_domain(domain)

    # Default NULL domain to 'general' (archive-only PRs have no domain files)
    if domain is None:
        domain = "general"

    # Update PR domain if not set
    conn.execute(
        "UPDATE prs SET domain = COALESCE(domain, ?), domain_agent = ? WHERE number = ?",
        (domain, agent, pr_number),
    )

    # Step 1: Triage (if not already triaged)
    if tier is None:
        tier = await _triage_pr(diff)
        conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))

    # Update last_attempt timestamp (status already set to 'reviewing' by the
    # atomic claim above)
    conn.execute(
        "UPDATE prs SET last_attempt = datetime('now') WHERE number = ?",
        (pr_number,),
    )

    # Check if domain review already completed (resuming after a Leo rate limit)
    existing = conn.execute(
        "SELECT domain_verdict, leo_verdict FROM prs WHERE number = ?", (pr_number,)
    ).fetchone()
    existing_domain_verdict = existing["domain_verdict"] if existing else "pending"
    _existing_leo_verdict = existing["leo_verdict"] if existing else "pending"

    # Step 2: Domain review FIRST (Sonnet — high-volume filter)
    # Skip if already completed on a previous attempt
    domain_review = None  # Initialize — used later for feedback extraction (Ganymede #12)
    if existing_domain_verdict not in ("pending", None):
        domain_verdict = existing_domain_verdict
        logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
    else:
        logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
        domain_review = await _run_domain_review(review_diff, files, domain or "general", agent)
        if domain_review is None:
            # Rate limited and couldn't overflow — revert to open for retry
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "rate_limited"}
        domain_verdict = _parse_verdict(domain_review, agent)
        conn.execute(
            "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
            (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
        )
        # Post domain review as a comment (from the agent's Forgejo account)
        agent_tok = get_agent_token(agent)
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": domain_review},
            token=agent_tok,
        )

    # If the domain review rejects, skip Leo review (save Opus)
    if domain_verdict == "request_changes":
        logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
        conn.execute(
            """UPDATE prs SET status = 'open', leo_verdict = 'skipped',
               last_error = 'domain review requested changes' WHERE number = ?""",
            (pr_number,),
        )
        db.audit(conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_number, "agent": agent}))
        return {"pr": pr_number, "domain_verdict": domain_verdict, "leo_verdict": "skipped"}

    # Step 3: Leo review (Opus — only if domain passes; skipped for LIGHT)
    leo_verdict = "skipped"
    if tier != "LIGHT":
        logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
        leo_review = await _run_leo_review(review_diff, files, tier)
        if leo_review is None:
            # Opus rate limited — revert to open for retry (keep the domain verdict)
            conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (pr_number,))
            return {"pr": pr_number, "skipped": True, "reason": "opus_rate_limited"}
        leo_verdict = _parse_verdict(leo_review, "LEO")
        conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
        # Post Leo review as a comment (from Leo's Forgejo account)
        leo_tok = get_agent_token("Leo")
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": leo_review},
            token=leo_tok,
        )
    else:
        # LIGHT tier: Leo is auto-skipped; the domain verdict is the only gate
        conn.execute("UPDATE prs SET leo_verdict = 'skipped' WHERE number = ?", (pr_number,))
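
    # Illustrative only: how the two verdicts combine in Step 4 below.
    #
    #   domain_verdict    leo_verdict      outcome
    #   approve           approve          status 'approved' (merge picks it up)
    #   approve           skipped          status 'approved' (LIGHT: domain is the gate)
    #   approve           request_changes  status 'open', feedback stored
    #   request_changes   (never runs)     early return in Step 2; Leo not invoked
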
    # Step 4: Determine final verdict
    both_approve = leo_verdict in ("approve", "skipped") and domain_verdict == "approve"

    if both_approve:
        # Get the PR author for formal approvals
        pr_info = await forgejo_api(
            "GET",
            repo_path(f"pulls/{pr_number}"),
        )
        pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""

        # Submit formal Forgejo reviews (required for merge)
        await _post_formal_approvals(pr_number, pr_author)

        conn.execute(
            "UPDATE prs SET status = 'approved' WHERE number = ?",
            (pr_number,),
        )
        db.audit(
            conn,
            "evaluate",
            "approved",
            json.dumps({"pr": pr_number, "tier": tier, "domain": domain,
                        "leo": leo_verdict, "domain_agent": agent}),
        )
        logger.info("PR #%d: APPROVED (tier=%s, leo=%s, domain=%s)",
                    pr_number, tier, leo_verdict, domain_verdict)
    else:
        conn.execute(
            "UPDATE prs SET status = 'open' WHERE number = ?",
            (pr_number,),
        )
        # Store feedback for the re-extraction path
        feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": tier}
        if domain_verdict == "request_changes" and domain_review is not None:
            feedback["domain_issues"] = _parse_issues(domain_review)
        conn.execute(
            "UPDATE sources SET feedback = ? WHERE path = (SELECT source_path FROM prs WHERE number = ?)",
            (json.dumps(feedback), pr_number),
        )
        db.audit(
            conn,
            "evaluate",
            "changes_requested",
            json.dumps({"pr": pr_number, "tier": tier, "leo": leo_verdict, "domain": domain_verdict}),
        )
        logger.info("PR #%d: CHANGES REQUESTED (leo=%s, domain=%s)",
                    pr_number, leo_verdict, domain_verdict)

    # Record cost (domain review)
    from . import costs
    costs.record_usage(conn, config.EVAL_DOMAIN_MODEL, "eval_domain", backend="max")
    if tier != "LIGHT":
        costs.record_usage(conn, config.EVAL_LEO_MODEL, "eval_leo", backend="max")

    return {
        "pr": pr_number,
        "tier": tier,
        "domain": domain,
        "leo_verdict": leo_verdict,
        "domain_verdict": domain_verdict,
        "approved": both_approve,
    }


# ─── Rate limit backoff ────────────────────────────────────────────────────
# When rate limited, don't retry for 15 minutes. Prevents ~2700 wasted
# CLI calls overnight when Opus is exhausted.
_rate_limit_backoff_until: datetime | None = None
_RATE_LIMIT_BACKOFF_MINUTES = 15
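
# Illustrative only: backoff timeline with the 15-minute window.
#
#   12:00  Opus returns RATE_LIMITED  ->  _rate_limit_backoff_until = 12:15
#   12:05  evaluate_cycle() runs: triage (Haiku) and domain review (Sonnet)
#          continue; PRs waiting only on Leo are filtered out by the SQL below
#   12:20  backoff expired  ->  full cycles resume, queued Leo reviews retried
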
""" global _rate_limit_backoff_until # Check if we're in Opus rate-limit backoff opus_backoff = False if _rate_limit_backoff_until is not None: now = datetime.now(timezone.utc) if now < _rate_limit_backoff_until: remaining = int((_rate_limit_backoff_until - now).total_seconds()) logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining) opus_backoff = True else: logger.info("Rate limit backoff expired, resuming full eval cycles") _rate_limit_backoff_until = None # Find PRs ready for evaluation: # - status = 'open' # - tier0_pass = 1 (passed validation) # - leo_verdict = 'pending' OR domain_verdict = 'pending' # During Opus backoff: only fetch PRs needing triage or domain review # (skip PRs already domain-reviewed that are waiting for Leo/Opus) # Skip PRs attempted within last 10 minutes (backoff during rate limits) if opus_backoff: verdict_filter = "AND p.domain_verdict = 'pending'" else: verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')" rows = conn.execute( f"""SELECT p.number, p.tier FROM prs p LEFT JOIN sources s ON p.source_path = s.path WHERE p.status = 'open' AND p.tier0_pass = 1 {verdict_filter} AND (p.last_attempt IS NULL OR p.last_attempt < datetime('now', '-10 minutes')) ORDER BY CASE COALESCE(p.priority, s.priority, 'medium') WHEN 'critical' THEN 0 WHEN 'high' THEN 1 WHEN 'medium' THEN 2 WHEN 'low' THEN 3 ELSE 4 END, p.created_at ASC LIMIT ?""", (max_workers or config.MAX_EVAL_WORKERS,), ).fetchall() if not rows: return 0, 0 succeeded = 0 failed = 0 for row in rows: try: result = await evaluate_pr(conn, row["number"], tier=row["tier"]) if result.get("skipped"): reason = result.get("reason", "") logger.debug("PR #%d skipped: %s", row["number"], reason) if "rate_limited" in reason: from datetime import timedelta if reason == "opus_rate_limited": # Opus hit — set backoff but DON'T break. Other PRs # may still need triage (Haiku) or domain review (Sonnet). _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( minutes=_RATE_LIMIT_BACKOFF_MINUTES ) logger.info( "Opus rate limited — backing off Opus for %d min, continuing triage+domain", _RATE_LIMIT_BACKOFF_MINUTES, ) continue else: # Non-Opus rate limit (Sonnet/Haiku) — break the cycle, # nothing else can proceed either. _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( minutes=_RATE_LIMIT_BACKOFF_MINUTES ) logger.info( "Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES ) break else: succeeded += 1 except Exception: logger.exception("Failed to evaluate PR #%d", row["number"]) failed += 1 # Revert to open on unhandled error conn.execute("UPDATE prs SET status = 'open' WHERE number = ?", (row["number"],)) if succeeded or failed: logger.info("Evaluate cycle: %d evaluated, %d errors", succeeded, failed) return succeeded, failed