teleo-infrastructure/lib/llm.py
m3taversal 85b86a918a
Some checks failed
CI / lint-and-test (pull_request) Has been cancelled
ganymede: extract lib/llm.py from evaluate.py (Phase 3c)
- What: LLM transport (OpenRouter, Claude CLI), prompt templates
  (triage/domain/Leo), and review runner functions moved to lib/llm.py.
  evaluate.py retains PR lifecycle orchestration, SQLite state, Forgejo
  posting, rate limit backoff, and evaluate_cycle.
- Why: evaluate.py was 734 lines mixing orchestration with LLM concerns.
  Now 455 lines orchestration + 250 lines LLM transport. Each module has
  a single responsibility.
- Connections: completes Phase 3 structural refactor (forgejo.py + domains.py
  + llm.py). teleo-pipeline.py updated to import kill_active_subprocesses
  from lib.llm.

Pentagon-Agent: Ganymede <F99EBFA6-547B-4096-BEEA-1D59C3E4028A>
2026-03-13 15:40:18 +00:00

299 lines
11 KiB
Python

"""LLM transport and review prompts — shared by all evaluation stages.
Extracted from evaluate.py (Phase 3c refactor). This module owns:
- Prompt templates (triage, domain, Leo)
- OpenRouter API transport
- Claude CLI transport with subprocess tracking
- Review runner functions (triage, domain, Leo)
Orchestration (PR lifecycle, SQLite state, Forgejo posting) stays in evaluate.py.
"""
import asyncio
import logging
import aiohttp
from . import config
logger = logging.getLogger("pipeline.llm")
# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
_active_subprocesses: set = set()
async def kill_active_subprocesses():
"""Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
for proc in list(_active_subprocesses):
if proc.returncode is None:
logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid)
try:
proc.kill()
await proc.wait()
except ProcessLookupError:
pass
_active_subprocesses.clear()
REVIEW_STYLE_GUIDE = (
"Be concise. Only mention what fails or is interesting. "
"Do not summarize what the PR does — the diff speaks for itself. "
"If everything passes, say so in one line and approve."
)
# ─── Prompt templates ──────────────────────────────────────────────────────
TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
DEEP — use when ANY of these apply:
- PR adds or modifies claims rated "likely" or higher confidence
- PR touches agent beliefs or creates cross-domain wiki links
- PR challenges an existing claim (has "challenged_by" or contradicts existing)
- PR modifies axiom-level beliefs
- PR is a cross-domain synthesis claim
STANDARD — use when:
- New claims in established domain areas
- Enrichments to existing claims (confirm/extend)
- New hypothesis-level beliefs
- Source archives with extraction results
LIGHT — use ONLY when ALL changes fit these categories:
- Entity attribute updates (factual corrections, new data points)
- Source archiving without extraction
- Formatting fixes, typo corrections
- Status field changes
IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
--- PR DIFF ---
{diff}"""
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
Review this PR from your domain expertise:
1. Technical accuracy — are the claims factually correct in your domain?
2. Domain duplicates — does your domain already have substantially similar claims?
3. Missing context — is important domain context absent that would change interpretation?
4. Confidence calibration — from your domain expertise, is the confidence level right?
5. Enrichment opportunities — should this connect to existing claims via wiki links?
{style_guide}
If you are requesting changes, tag the specific issues:
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by
End your review with exactly one of:
<!-- VERDICT:{agent_upper}:APPROVE -->
<!-- VERDICT:{agent_upper}:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR against the quality criteria:
1. Schema compliance — YAML frontmatter, prose-as-title, required fields
2. Duplicate check — does this claim already exist?
3. Confidence calibration — appropriate for the evidence?
4. Wiki link validity — references real claims?
5. Source quality — credible for the claim?
6. Domain assignment — correct domain?
7. Epistemic hygiene — specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
1. Cross-domain implications — does this claim affect beliefs in other domains?
2. Confidence calibration — is the confidence level justified by the evidence?
3. Contradiction check — does this contradict any existing claims without explicit argument?
4. Wiki link validity — do all wiki links reference real, existing claims?
5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality — is the source credible for the claim being made?
7. Duplicate check — does a substantially similar claim already exist?
8. Enrichment vs new claim — should this be an enrichment to an existing claim instead?
9. Domain assignment — is the claim in the correct domain?
10. Schema compliance — YAML frontmatter, prose-as-title format, required fields
11. Epistemic hygiene — is the claim specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# ─── API helpers ───────────────────────────────────────────────────────────
async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
"""Call OpenRouter API. Returns response text or None on failure."""
key_file = config.SECRETS_DIR / "openrouter-key"
if not key_file.exists():
logger.error("OpenRouter key file not found")
return None
key = key_file.read_text().strip()
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 4096,
"temperature": 0.2,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
config.OPENROUTER_URL,
headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
json=payload,
timeout=aiohttp.ClientTimeout(total=timeout_sec),
) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("OpenRouter %s%d: %s", model, resp.status, text[:200])
return None
data = await resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content")
except Exception as e:
logger.error("OpenRouter error: %s%s", model, e)
return None
async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None:
"""Call Claude via CLI (Claude Max subscription). Returns response or None."""
proc = await asyncio.create_subprocess_exec(
str(config.CLAUDE_CLI),
"-p",
"--model",
model,
"--output-format",
"text",
cwd=cwd or str(config.REPO_DIR),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=prompt.encode()),
timeout=timeout_sec,
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
logger.error("Claude CLI timed out after %ds", timeout_sec)
return None
finally:
_active_subprocesses.discard(proc)
out_text = (stdout or b"").decode()
err_text = (stderr or b"").decode()
# Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
combined_lower = (out_text + err_text).lower()
if "hit your limit" in combined_lower or "rate limit" in combined_lower:
logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
return "RATE_LIMITED"
if proc.returncode != 0:
logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
return None
return out_text.strip()
# ─── Review execution ─────────────────────────────────────────────────────
async def triage_pr(diff: str) -> str:
"""Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
prompt = TRIAGE_PROMPT.format(diff=diff[:50000]) # Cap diff size for triage
result = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
if not result:
logger.warning("Triage failed, defaulting to STANDARD")
return "STANDARD"
tier = result.split("\n")[0].strip().upper()
if tier in ("DEEP", "STANDARD", "LIGHT"):
reason = result.split("\n")[1].strip() if "\n" in result else ""
logger.info("Triage: %s%s", tier, reason[:100])
return tier
logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
return "STANDARD"
async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
"""Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o."""
prompt = DOMAIN_PROMPT.format(
agent=agent,
agent_upper=agent.upper(),
domain=domain,
style_guide=REVIEW_STYLE_GUIDE,
diff=diff,
files=files,
)
# Try Claude Max Sonnet first
result = await claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
if result == "RATE_LIMITED":
# Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck)
policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
if policy == "overflow":
logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
result = await openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
else:
logger.info("Claude Max rate limited, queuing domain review")
return None
return result
async def run_leo_review(diff: str, files: str, tier: str) -> str | None:
"""Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy)."""
prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
if result == "RATE_LIMITED":
# Leo review queues — don't waste Opus calls (never overflow)
logger.info("Claude Max Opus rate limited, queuing Leo review")
return None
return result