"""LLM transport and review prompts — shared by all evaluation stages. Extracted from evaluate.py (Phase 3c refactor). This module owns: - Prompt templates (triage, domain, Leo) - OpenRouter API transport - Claude CLI transport with subprocess tracking - Review runner functions (triage, domain, Leo) Orchestration (PR lifecycle, SQLite state, Forgejo posting) stays in evaluate.py. """ import asyncio import logging import aiohttp from . import config logger = logging.getLogger("pipeline.llm") # Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8) _active_subprocesses: set = set() async def kill_active_subprocesses(): """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown.""" for proc in list(_active_subprocesses): if proc.returncode is None: logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid) try: proc.kill() await proc.wait() except ProcessLookupError: pass _active_subprocesses.clear() REVIEW_STYLE_GUIDE = ( "Be concise. Only mention what fails or is interesting. " "Do not summarize what the PR does — the diff speaks for itself. " "If everything passes, say so in one line and approve." ) # ─── Prompt templates ────────────────────────────────────────────────────── TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT. DEEP — use when ANY of these apply: - PR adds or modifies claims rated "likely" or higher confidence - PR touches agent beliefs or creates cross-domain wiki links - PR challenges an existing claim (has "challenged_by" or contradicts existing) - PR modifies axiom-level beliefs - PR is a cross-domain synthesis claim STANDARD — use when: - New claims in established domain areas - Enrichments to existing claims (confirm/extend) - New hypothesis-level beliefs - Source archives with extraction results LIGHT — use ONLY when ALL changes fit these categories: - Entity attribute updates (factual corrections, new data points) - Source archiving without extraction - Formatting fixes, typo corrections - Status field changes IMPORTANT: When uncertain, classify UP, not down. Always err toward more review. Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line. --- PR DIFF --- {diff}""" DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base. Review this PR from your domain expertise: 1. Technical accuracy — are the claims factually correct in your domain? 2. Domain duplicates — does your domain already have substantially similar claims? 3. Missing context — is important domain context absent that would change interpretation? 4. Confidence calibration — from your domain expertise, is the confidence level right? 5. Enrichment opportunities — should this connect to existing claims via wiki links? {style_guide} If you are requesting changes, tag the specific issues: Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by End your review with exactly one of: --- PR DIFF --- {diff} --- CHANGED FILES --- {files}""" LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. Review this PR against the quality criteria: 1. Schema compliance — YAML frontmatter, prose-as-title, required fields 2. Duplicate check — does this claim already exist? 3. Confidence calibration — appropriate for the evidence? 4. Wiki link validity — references real claims? 5. Source quality — credible for the claim? 6. Domain assignment — correct domain? 7. Epistemic hygiene — specific enough to be wrong? {style_guide} If requesting changes, tag the issues: End your review with exactly one of: --- PR DIFF --- {diff} --- CHANGED FILES --- {files}""" LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check: 1. Cross-domain implications — does this claim affect beliefs in other domains? 2. Confidence calibration — is the confidence level justified by the evidence? 3. Contradiction check — does this contradict any existing claims without explicit argument? 4. Wiki link validity — do all wiki links reference real, existing claims? 5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary? 6. Source quality — is the source credible for the claim being made? 7. Duplicate check — does a substantially similar claim already exist? 8. Enrichment vs new claim — should this be an enrichment to an existing claim instead? 9. Domain assignment — is the claim in the correct domain? 10. Schema compliance — YAML frontmatter, prose-as-title format, required fields 11. Epistemic hygiene — is the claim specific enough to be wrong? {style_guide} If requesting changes, tag the issues: End your review with exactly one of: --- PR DIFF --- {diff} --- CHANGED FILES --- {files}""" # ─── API helpers ─────────────────────────────────────────────────────────── async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None: """Call OpenRouter API. Returns response text or None on failure.""" key_file = config.SECRETS_DIR / "openrouter-key" if not key_file.exists(): logger.error("OpenRouter key file not found") return None key = key_file.read_text().strip() payload = { "model": model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 4096, "temperature": 0.2, } try: async with aiohttp.ClientSession() as session: async with session.post( config.OPENROUTER_URL, headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"}, json=payload, timeout=aiohttp.ClientTimeout(total=timeout_sec), ) as resp: if resp.status >= 400: text = await resp.text() logger.error("OpenRouter %s → %d: %s", model, resp.status, text[:200]) return None data = await resp.json() return data.get("choices", [{}])[0].get("message", {}).get("content") except Exception as e: logger.error("OpenRouter error: %s → %s", model, e) return None async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None: """Call Claude via CLI (Claude Max subscription). Returns response or None.""" proc = await asyncio.create_subprocess_exec( str(config.CLAUDE_CLI), "-p", "--model", model, "--output-format", "text", cwd=cwd or str(config.REPO_DIR), stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) _active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8) try: stdout, stderr = await asyncio.wait_for( proc.communicate(input=prompt.encode()), timeout=timeout_sec, ) except asyncio.TimeoutError: proc.kill() await proc.wait() logger.error("Claude CLI timed out after %ds", timeout_sec) return None finally: _active_subprocesses.discard(proc) out_text = (stdout or b"").decode() err_text = (stderr or b"").decode() # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message combined_lower = (out_text + err_text).lower() if "hit your limit" in combined_lower or "rate limit" in combined_lower: logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200]) return "RATE_LIMITED" if proc.returncode != 0: logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200]) return None return out_text.strip() # ─── Review execution ───────────────────────────────────────────────────── async def triage_pr(diff: str) -> str: """Triage PR via Haiku → DEEP/STANDARD/LIGHT.""" prompt = TRIAGE_PROMPT.format(diff=diff[:50000]) # Cap diff size for triage result = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30) if not result: logger.warning("Triage failed, defaulting to STANDARD") return "STANDARD" tier = result.split("\n")[0].strip().upper() if tier in ("DEEP", "STANDARD", "LIGHT"): reason = result.split("\n")[1].strip() if "\n" in result else "" logger.info("Triage: %s — %s", tier, reason[:100]) return tier logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20]) return "STANDARD" async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None: """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o.""" prompt = DOMAIN_PROMPT.format( agent=agent, agent_upper=agent.upper(), domain=domain, style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files, ) # Try Claude Max Sonnet first result = await claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) if result == "RATE_LIMITED": # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck) policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow") if policy == "overflow": logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o") result = await openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) else: logger.info("Claude Max rate limited, queuing domain review") return None return result async def run_leo_review(diff: str, files: str, tier: str) -> str | None: """Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy).""" prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files) result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) if result == "RATE_LIMITED": # Leo review queues — don't waste Opus calls (never overflow) logger.info("Claude Max Opus rate limited, queuing Leo review") return None return result