diff --git a/lib/domains.py b/lib/domains.py
new file mode 100644
index 0000000..0db6f94
--- /dev/null
+++ b/lib/domains.py
@@ -0,0 +1,95 @@
+"""Domain→agent mapping and domain detection — single source of truth.
+
+Extracted from evaluate.py and merge.py (Phase 3 refactor).
+All domain classification logic goes through this module.
+"""
+
+import re
+
+# Canonical domain→agent mapping. Every domain must have exactly one primary agent.
+DOMAIN_AGENT_MAP: dict[str, str] = {
+    "internet-finance": "Rio",
+    "entertainment": "Clay",
+    "health": "Vida",
+    "ai-alignment": "Theseus",
+    "space-development": "Astra",
+    "mechanisms": "Rio",
+    "living-capital": "Rio",
+    "living-agents": "Theseus",
+    "teleohumanity": "Leo",
+    "grand-strategy": "Leo",
+    "critical-systems": "Theseus",
+    "collective-intelligence": "Theseus",
+    "teleological-economics": "Rio",
+    "cultural-dynamics": "Clay",
+}
+
+# Valid domain names — derived from the map, not maintained separately.
+VALID_DOMAINS: frozenset[str] = frozenset(DOMAIN_AGENT_MAP.keys())
+
+# Inverse mapping: agent name (lowercase) → primary domain (for branch detection).
+_AGENT_PRIMARY_DOMAIN: dict[str, str] = {
+    "rio": "internet-finance",
+    "clay": "entertainment",
+    "theseus": "ai-alignment",
+    "vida": "health",
+    "astra": "space-development",
+    "leo": "grand-strategy",
+}
+
+
+def agent_for_domain(domain: str | None) -> str:
+    """Get the reviewing agent for a domain. Falls back to Leo."""
+    if domain is None:
+        return "Leo"
+    return DOMAIN_AGENT_MAP.get(domain, "Leo")
+
+
+def detect_domain_from_diff(diff: str) -> str | None:
+    """Detect primary domain from changed file paths in a unified diff.
+
+    Checks domains/, entities/, core/, foundations/ for domain classification.
+    Returns the most-referenced domain, or None if no domain files found.
+    """
+    domain_counts: dict[str, int] = {}
+    for line in diff.split("\n"):
+        if line.startswith("diff --git"):
+            # Check domains/ and entities/ (both carry domain info)
+            match = re.search(r"(?:domains|entities)/([^/]+)/", line)
+            if match:
+                d = match.group(1)
+                domain_counts[d] = domain_counts.get(d, 0) + 1
+                continue
+            # Check core/ subdirectories
+            match = re.search(r"core/([^/]+)/", line)
+            if match:
+                d = match.group(1)
+                if d in DOMAIN_AGENT_MAP:
+                    domain_counts[d] = domain_counts.get(d, 0) + 1
+                continue
+            # Check foundations/ subdirectories
+            match = re.search(r"foundations/([^/]+)/", line)
+            if match:
+                d = match.group(1)
+                if d in DOMAIN_AGENT_MAP:
+                    domain_counts[d] = domain_counts.get(d, 0) + 1
+    if domain_counts:
+        return max(domain_counts, key=domain_counts.get)
+    return None
+
+
+def detect_domain_from_branch(branch: str) -> str | None:
+    """Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'.
+
+    Uses agent prefix → primary domain mapping for pipeline branches.
+    """
+    prefix = branch.split("/")[0].lower() if "/" in branch else ""
+    return _AGENT_PRIMARY_DOMAIN.get(prefix)
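+
+
+# Usage sketch (illustrative only; values follow from the maps above):
+#
+#     agent_for_domain("health")                        → "Vida"
+#     agent_for_domain(None)                            → "Leo" (fallback)
+#     detect_domain_from_branch("rio/claims-futarchy")  → "internet-finance"
+#     detect_domain_from_branch("main")                 → None (no agent prefix)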
diff --git a/lib/evaluate.py b/lib/evaluate.py
index 197befd..be855d0 100644
--- a/lib/evaluate.py
+++ b/lib/evaluate.py
@@ -1,4 +1,4 @@
-"""Evaluate stage — triage + domain review + Leo review.
+"""Evaluate stage — PR lifecycle orchestration.
 
 Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last.
 Sonnet (domain review) filters before Opus (Leo review) to maximize value per
@@ -13,319 +13,26 @@ Flow per PR:
 6. If both approve → status = 'approved' (merge module picks it up)
 
 Design reviewed by Ganymede, Rhea, Vida, Theseus.
+LLM transport and prompts extracted to lib/llm.py (Phase 3c).
 """
 
-import asyncio
 import json
 import logging
 import re
 from datetime import datetime, timezone
 
 from . import config, db
+from .domains import agent_for_domain, detect_domain_from_diff
+from .forgejo import api as forgejo_api
+from .forgejo import get_agent_token, get_pr_diff, repo_path
+from .llm import run_domain_review, run_leo_review, triage_pr
 
 logger = logging.getLogger("pipeline.evaluate")
 
-# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
-_active_subprocesses: set = set()
-
-# ─── Constants ──────────────────────────────────────────────────────────────
-
-DOMAIN_AGENT_MAP = {
-    "internet-finance": "Rio",
-    "entertainment": "Clay",
-    "health": "Vida",
-    "ai-alignment": "Theseus",
-    "space-development": "Astra",
-    "mechanisms": "Rio",
-    "living-capital": "Rio",
-    "living-agents": "Theseus",
-    "teleohumanity": "Leo",
-    "grand-strategy": "Leo",
-    "critical-systems": "Theseus",
-    "collective-intelligence": "Theseus",
-    "teleological-economics": "Rio",
-    "cultural-dynamics": "Clay",
-}
-
-
-async def kill_active_subprocesses():
-    """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
-    for proc in list(_active_subprocesses):
-        if proc.returncode is None:
-            logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid)
-            try:
-                proc.kill()
-                await proc.wait()
-            except ProcessLookupError:
-                pass
-    _active_subprocesses.clear()
-
-
-def _agent_token(agent_name: str) -> str | None:
-    """Read Forgejo token for a named agent. Returns token string or None."""
-    token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
-    if token_file.exists():
-        return token_file.read_text().strip()
-    return None
-
-
-REVIEW_STYLE_GUIDE = (
-    "Be concise. Only mention what fails or is interesting. "
-    "Do not summarize what the PR does — the diff speaks for itself. "
-    "If everything passes, say so in one line and approve."
-)
-
-
-# ─── Prompt templates ──────────────────────────────────────────────────────
-
-TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
-
-DEEP — use when ANY of these apply:
-- PR adds or modifies claims rated "likely" or higher confidence
-- PR touches agent beliefs or creates cross-domain wiki links
-- PR challenges an existing claim (has "challenged_by" or contradicts existing)
-- PR modifies axiom-level beliefs
-- PR is a cross-domain synthesis claim
-
-STANDARD — use when:
-- New claims in established domain areas
-- Enrichments to existing claims (confirm/extend)
-- New hypothesis-level beliefs
-- Source archives with extraction results
-
-LIGHT — use ONLY when ALL changes fit these categories:
-- Entity attribute updates (factual corrections, new data points)
-- Source archiving without extraction
-- Formatting fixes, typo corrections
-- Status field changes
-
-IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
-
-Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
-
---- PR DIFF ---
-{diff}"""
-
-DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
-
-Review this PR from your domain expertise:
-1. Technical accuracy — are the claims factually correct in your domain?
-2. Domain duplicates — does your domain already have substantially similar claims?
-3. Missing context — is important domain context absent that would change interpretation?
-4. Confidence calibration — from your domain expertise, is the confidence level right?
-5. Enrichment opportunities — should this connect to existing claims via wiki links?
-
-{style_guide}
-
-If you are requesting changes, tag the specific issues:
-
-
-Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by
-
-End your review with exactly one of:
-
-
-
---- PR DIFF ---
-{diff}
-
---- CHANGED FILES ---
-{files}"""
-
-LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
-
-Review this PR against the quality criteria:
-1. Schema compliance — YAML frontmatter, prose-as-title, required fields
-2. Duplicate check — does this claim already exist?
-3. Confidence calibration — appropriate for the evidence?
-4. Wiki link validity — references real claims?
-5. Source quality — credible for the claim?
-6. Domain assignment — correct domain?
-7. Epistemic hygiene — specific enough to be wrong?
-
-{style_guide}
-
-If requesting changes, tag the issues:
-
-
-End your review with exactly one of:
-
-
-
---- PR DIFF ---
-{diff}
-
---- CHANGED FILES ---
-{files}"""
-
-LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
-
-Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
-1. Cross-domain implications — does this claim affect beliefs in other domains?
-2. Confidence calibration — is the confidence level justified by the evidence?
-3. Contradiction check — does this contradict any existing claims without explicit argument?
-4. Wiki link validity — do all wiki links reference real, existing claims?
-5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary?
-6. Source quality — is the source credible for the claim being made?
-7. Duplicate check — does a substantially similar claim already exist?
-8. Enrichment vs new claim — should this be an enrichment to an existing claim instead?
-9. Domain assignment — is the claim in the correct domain?
-10. Schema compliance — YAML frontmatter, prose-as-title format, required fields
-11. Epistemic hygiene — is the claim specific enough to be wrong?
-
-{style_guide}
-
-If requesting changes, tag the issues:
-
-
-End your review with exactly one of:
-
-
-
---- PR DIFF ---
-{diff}
-
---- CHANGED FILES ---
-{files}"""
-
-
-# ─── API helpers ───────────────────────────────────────────────────────────
-
-
-async def _forgejo_api(method: str, path: str, body: dict = None, token: str = None):
-    """Call Forgejo API."""
-    import aiohttp
-
-    url = f"{config.FORGEJO_URL}/api/v1{path}"
-    if token is None:
-        token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
-    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
-
-    try:
-        async with aiohttp.ClientSession() as session:
-            async with session.request(
-                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=60)
-            ) as resp:
-                if resp.status >= 400:
-                    text = await resp.text()
-                    logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200])
-                    return None
-                if resp.status == 204:
-                    return {}
-                return await resp.json()
-    except Exception as e:
-        logger.error("Forgejo API error: %s %s → %s", method, path, e)
-        return None
-
-
-async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
-    """Call OpenRouter API. Returns response text or None on failure."""
-    import aiohttp
-
-    key_file = config.SECRETS_DIR / "openrouter-key"
-    if not key_file.exists():
-        logger.error("OpenRouter key file not found")
-        return None
-    key = key_file.read_text().strip()
-
-    payload = {
-        "model": model,
-        "messages": [{"role": "user", "content": prompt}],
-        "max_tokens": 4096,
-        "temperature": 0.2,
-    }
-
-    try:
-        async with aiohttp.ClientSession() as session:
-            async with session.post(
-                config.OPENROUTER_URL,
-                headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
-                json=payload,
-                timeout=aiohttp.ClientTimeout(total=timeout_sec),
-            ) as resp:
-                if resp.status >= 400:
-                    text = await resp.text()
-                    logger.error("OpenRouter %s → %d: %s", model, resp.status, text[:200])
-                    return None
-                data = await resp.json()
-                return data.get("choices", [{}])[0].get("message", {}).get("content")
-    except Exception as e:
-        logger.error("OpenRouter error: %s → %s", model, e)
-        return None
-
-
-async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None:
-    """Call Claude via CLI (Claude Max subscription). Returns response or None."""
-    proc = await asyncio.create_subprocess_exec(
-        str(config.CLAUDE_CLI),
-        "-p",
-        "--model",
-        model,
-        "--output-format",
-        "text",
-        cwd=cwd or str(config.REPO_DIR),
-        stdin=asyncio.subprocess.PIPE,
-        stdout=asyncio.subprocess.PIPE,
-        stderr=asyncio.subprocess.PIPE,
-    )
-    _active_subprocesses.add(proc)  # Track for graceful shutdown (Ganymede #8)
-    try:
-        stdout, stderr = await asyncio.wait_for(
-            proc.communicate(input=prompt.encode()),
-            timeout=timeout_sec,
-        )
-    except asyncio.TimeoutError:
-        proc.kill()
-        await proc.wait()
-        logger.error("Claude CLI timed out after %ds", timeout_sec)
-        return None
-    finally:
-        _active_subprocesses.discard(proc)
-
-    out_text = (stdout or b"").decode()
-    err_text = (stderr or b"").decode()
-
-    # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
-    combined_lower = (out_text + err_text).lower()
-    if "hit your limit" in combined_lower or "rate limit" in combined_lower:
-        logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
-        return "RATE_LIMITED"
-
-    if proc.returncode != 0:
-        logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
-        return None
-
-    return out_text.strip()
 
 # ─── Diff helpers ──────────────────────────────────────────────────────────
 
 
-async def _get_pr_diff(pr_number: int) -> str:
-    """Fetch PR diff via Forgejo API."""
-    import aiohttp
-
-    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
-    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
-
-    try:
-        async with aiohttp.ClientSession() as session:
-            async with session.get(
-                url,
-                headers={"Authorization": f"token {token}", "Accept": "text/plain"},
-                timeout=aiohttp.ClientTimeout(total=60),
-            ) as resp:
-                if resp.status >= 400:
-                    return ""
-                diff = await resp.text()
-                if len(diff) > 2_000_000:
-                    return ""
-                return diff
-    except Exception as e:
-        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
-        return ""
-
-
 def _filter_diff(diff: str) -> tuple[str, str]:
     """Filter diff to only review-relevant files.
 
@@ -360,38 +67,6 @@ def _extract_changed_files(diff: str) -> str:
     )
 
 
-def _detect_domain_from_diff(diff: str) -> str | None:
-    """Detect primary domain from changed file paths.
-
-    Checks domains/, entities/, core/, foundations/ for domain classification.
-    """
-    domain_counts: dict[str, int] = {}
-    for line in diff.split("\n"):
-        if line.startswith("diff --git"):
-            # Check domains/ and entities/ (both carry domain info)
-            match = re.search(r"(?:domains|entities)/([^/]+)/", line)
-            if match:
-                d = match.group(1)
-                domain_counts[d] = domain_counts.get(d, 0) + 1
-                continue
-            # Check core/ subdirectories
-            match = re.search(r"core/([^/]+)/", line)
-            if match:
-                d = match.group(1)
-                if d in DOMAIN_AGENT_MAP:
-                    domain_counts[d] = domain_counts.get(d, 0) + 1
-                continue
-            # Check foundations/ subdirectories
-            match = re.search(r"foundations/([^/]+)/", line)
-            if match:
-                d = match.group(1)
-                if d in DOMAIN_AGENT_MAP:
-                    domain_counts[d] = domain_counts.get(d, 0) + 1
-    if domain_counts:
-        return max(domain_counts, key=domain_counts.get)
-    return None
-
-
 def _is_musings_only(diff: str) -> bool:
     """Check if PR only modifies musing files."""
     has_musings = False
@@ -428,69 +103,6 @@ def _parse_issues(review_text: str) -> list[str]:
     return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
 
 
-# ─── Review execution ─────────────────────────────────────────────────────
-
-
-async def _triage_pr(diff: str) -> str:
-    """Triage PR via Haiku → DEEP/STANDARD/LIGHT."""
-    prompt = TRIAGE_PROMPT.format(diff=diff[:50000])  # Cap diff size for triage
-    result = await _openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
-    if not result:
-        logger.warning("Triage failed, defaulting to STANDARD")
-        return "STANDARD"
-
-    tier = result.split("\n")[0].strip().upper()
-    if tier in ("DEEP", "STANDARD", "LIGHT"):
-        reason = result.split("\n")[1].strip() if "\n" in result else ""
-        logger.info("Triage: %s — %s", tier, reason[:100])
-        return tier
-
-    logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
-    return "STANDARD"
-
-
-async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
-    """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o."""
-    prompt = DOMAIN_PROMPT.format(
-        agent=agent,
-        agent_upper=agent.upper(),
-        domain=domain,
-        style_guide=REVIEW_STYLE_GUIDE,
-        diff=diff,
-        files=files,
-    )
-
-    # Try Claude Max Sonnet first
-    result = await _claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
-
-    if result == "RATE_LIMITED":
-        # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck)
-        policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
-        if policy == "overflow":
-            logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
-            result = await _openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
-        else:
-            logger.info("Claude Max rate limited, queuing domain review")
-            return None
-
-    return result
-
-
-async def _run_leo_review(diff: str, files: str, tier: str) -> str | None:
-    """Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy)."""
-    prompt_template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
-    prompt = prompt_template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
-
-    result = await _claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
-
-    if result == "RATE_LIMITED":
-        # Leo review queues — don't waste Opus calls (never overflow)
-        logger.info("Claude Max Opus rate limited, queuing Leo review")
-        return None
-
-    return result
-
-
 async def _post_formal_approvals(pr_number: int, pr_author: str):
     """Submit formal Forgejo reviews from 2 agents (not the PR author)."""
     approvals = 0
@@ -499,12 +111,11 @@
             continue
         if approvals >= 2:
             break
-        token_file = config.SECRETS_DIR / f"forgejo-{agent_name}-token"
-        if token_file.exists():
-            token = token_file.read_text().strip()
-            result = await _forgejo_api(
+        token = get_agent_token(agent_name)
+        if token:
+            result = await forgejo_api(
                 "POST",
-                f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/reviews",
+                repo_path(f"pulls/{pr_number}/reviews"),
                 {"body": "Approved.", "event": "APPROVED"},
                 token=token,
             )
@@ -528,16 +139,16 @@
         return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
 
     # Fetch diff
-    diff = await _get_pr_diff(pr_number)
+    diff = await get_pr_diff(pr_number)
    if not diff:
         return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
 
     # Musings bypass
     if _is_musings_only(diff):
         logger.info("PR #%d is musings-only — auto-approving", pr_number)
-        await _forgejo_api(
+        await forgejo_api(
             "POST",
-            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+            repo_path(f"issues/{pr_number}/comments"),
             {"body": "Auto-approved: musings bypass eval per collective policy."},
         )
         conn.execute(
@@ -554,8 +165,8 @@
     files = _extract_changed_files(diff)
 
     # Detect domain
-    domain = _detect_domain_from_diff(diff)
-    agent = DOMAIN_AGENT_MAP.get(domain, "Leo") if domain else "Leo"
+    domain = detect_domain_from_diff(diff)
+    agent = agent_for_domain(domain)
 
     # Default NULL domain to 'general' (archive-only PRs have no domain files)
     if domain is None:
@@ -569,7 +180,7 @@
 
     # Step 1: Triage (if not already triaged)
     if tier is None:
-        tier = await _triage_pr(diff)
+        tier = await triage_pr(diff)
         conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
 
     # Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
@@ -591,7 +202,7 @@
         logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
     else:
         logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
-        domain_review = await _run_domain_review(review_diff, files, domain or "general", agent)
+        domain_review = await run_domain_review(review_diff, files, domain or "general", agent)
 
         if domain_review is None:
             # Rate limited, couldn't overflow — revert to open for retry
@@ -605,10 +216,10 @@
         )
 
         # Post domain review as comment (from agent's Forgejo account)
-        agent_tok = _agent_token(agent)
-        await _forgejo_api(
+        agent_tok = get_agent_token(agent)
+        await forgejo_api(
             "POST",
-            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+            repo_path(f"issues/{pr_number}/comments"),
             {"body": domain_review},
             token=agent_tok,
         )
@@ -629,7 +240,7 @@
     leo_verdict = "skipped"
     if tier != "LIGHT":
         logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
-        leo_review = await _run_leo_review(review_diff, files, tier)
+        leo_review = await run_leo_review(review_diff, files, tier)
 
         if leo_review is None:
             # Opus rate limited — revert to open for retry (keep domain verdict)
@@ -640,10 +251,10 @@
         conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
 
         # Post Leo review as comment (from Leo's Forgejo account)
-        leo_tok = _agent_token("Leo")
-        await _forgejo_api(
+        leo_tok = get_agent_token("Leo")
+        await forgejo_api(
             "POST",
-            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+            repo_path(f"issues/{pr_number}/comments"),
             {"body": leo_review},
             token=leo_tok,
         )
@@ -656,9 +267,9 @@
 
     if both_approve:
         # Get PR author for formal approvals
-        pr_info = await _forgejo_api(
+        pr_info = await forgejo_api(
             "GET",
-            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
+            repo_path(f"pulls/{pr_number}"),
         )
         pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
 
@@ -733,28 +344,36 @@
     """
     global _rate_limit_backoff_until
 
-    # If we're in rate-limit backoff, skip this cycle entirely
+    # Check if we're in Opus rate-limit backoff
+    opus_backoff = False
     if _rate_limit_backoff_until is not None:
         now = datetime.now(timezone.utc)
         if now < _rate_limit_backoff_until:
             remaining = int((_rate_limit_backoff_until - now).total_seconds())
-            logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining)
-            return 0, 0
+            logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
+            opus_backoff = True
         else:
-            logger.info("Rate limit backoff expired, resuming eval cycles")
+            logger.info("Rate limit backoff expired, resuming full eval cycles")
             _rate_limit_backoff_until = None
 
     # Find PRs ready for evaluation:
     # - status = 'open'
    # - tier0_pass = 1 (passed validation)
     # - leo_verdict = 'pending' OR domain_verdict = 'pending'
+    # During Opus backoff: only fetch PRs needing triage or domain review
+    # (skip PRs already domain-reviewed that are waiting for Leo/Opus)
     # Skip PRs attempted within last 10 minutes (backoff during rate limits)
+    if opus_backoff:
+        verdict_filter = "AND p.domain_verdict = 'pending'"
+    else:
+        verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
+
     rows = conn.execute(
-        """SELECT p.number, p.tier FROM prs p
+        f"""SELECT p.number, p.tier FROM prs p
            LEFT JOIN sources s ON p.source_path = s.path
           WHERE p.status = 'open'
             AND p.tier0_pass = 1
-            AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')
+            {verdict_filter}
            AND (p.last_attempt IS NULL
                 OR p.last_attempt < datetime('now', '-10 minutes'))
           ORDER BY
@@ -778,22 +397,51 @@
 
     for row in rows:
         try:
+            # During Opus backoff, skip PRs that already completed domain review
+            # (they'd just hit the Opus limit again). Only process PRs still
+            # needing triage or domain review.
+            if opus_backoff:
+                existing = conn.execute(
+                    "SELECT domain_verdict FROM prs WHERE number = ?",
+                    (row["number"],),
+                ).fetchone()
+                if existing and existing["domain_verdict"] not in ("pending", None):
+                    logger.debug(
+                        "PR #%d: skipping during Opus backoff (domain already %s)",
+                        row["number"],
+                        existing["domain_verdict"],
+                    )
+                    continue
+
             result = await evaluate_pr(conn, row["number"], tier=row["tier"])
             if result.get("skipped"):
                 reason = result.get("reason", "")
                 logger.debug("PR #%d skipped: %s", row["number"], reason)
-                # Any rate limit — stop the entire cycle. No point trying more PRs
-                # when the model is exhausted. The 10-minute backoff on last_attempt
-                # prevents re-processing the same PR; breaking here prevents
-                # cycling through OTHER PRs that will also hit the same limit.
                 if "rate_limited" in reason:
                     from datetime import timedelta
 
-                    _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
-                        minutes=_RATE_LIMIT_BACKOFF_MINUTES
-                    )
-                    logger.info("Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES)
-                    break
+                    if reason == "opus_rate_limited":
+                        # Opus hit — set backoff but DON'T break. Other PRs
+                        # may still need triage (Haiku) or domain review (Sonnet).
+                        _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
+                            minutes=_RATE_LIMIT_BACKOFF_MINUTES
+                        )
+                        opus_backoff = True  # Update local flag so in-loop guard kicks in
+                        logger.info(
+                            "Opus rate limited — backing off Opus for %d min, continuing triage+domain",
+                            _RATE_LIMIT_BACKOFF_MINUTES,
+                        )
+                        continue
+                    else:
+                        # Non-Opus rate limit (Sonnet/Haiku) — break the cycle,
+                        # nothing else can proceed either.
+                        _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
+                            minutes=_RATE_LIMIT_BACKOFF_MINUTES
+                        )
+                        logger.info(
+                            "Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
+                        )
+                        break
             else:
                 succeeded += 1
         except Exception:
diff --git a/lib/forgejo.py b/lib/forgejo.py
new file mode 100644
index 0000000..7bd024f
--- /dev/null
+++ b/lib/forgejo.py
@@ -0,0 +1,90 @@
+"""Forgejo API client — single shared module for all pipeline stages.
+
+Extracted from evaluate.py, merge.py, validate.py (Phase 3 refactor).
+All Forgejo HTTP calls go through this module.
+"""
+
+import logging
+
+import aiohttp
+
+from . import config
+
+logger = logging.getLogger("pipeline.forgejo")
+
+
+async def api(method: str, path: str, body: dict = None, token: str = None):
+    """Call Forgejo API. Returns parsed JSON, {} for 204, or None on error.
+
+    Args:
+        method: HTTP method (GET, POST, DELETE, etc.)
+        path: API path after /api/v1 (e.g. "/repos/teleo/teleo-codex/pulls")
+        body: JSON body for POST/PUT/PATCH
+        token: Override token. If None, reads from FORGEJO_TOKEN_FILE (admin token).
+    """
+    url = f"{config.FORGEJO_URL}/api/v1{path}"
+    if token is None:
+        token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
+    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.request(
+                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=60)
+            ) as resp:
+                if resp.status >= 400:
+                    text = await resp.text()
+                    logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200])
+                    return None
+                if resp.status == 204:
+                    return {}
+                return await resp.json()
+    except Exception as e:
+        logger.error("Forgejo API error: %s %s → %s", method, path, e)
+        return None
+
+
+async def get_pr_diff(pr_number: int) -> str:
+    """Fetch PR diff via Forgejo API. Returns diff text or empty string."""
+    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
+    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.get(
+                url,
+                headers={"Authorization": f"token {token}", "Accept": "text/plain"},
+                timeout=aiohttp.ClientTimeout(total=60),
+            ) as resp:
+                if resp.status >= 400:
+                    return ""
+                diff = await resp.text()
+                if len(diff) > 2_000_000:
+                    return ""
+                return diff
+    except Exception as e:
+        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
+        return ""
+
+
+def get_agent_token(agent_name: str) -> str | None:
+    """Read Forgejo token for a named agent. Returns token string or None."""
+    token_file = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
+    if token_file.exists():
+        return token_file.read_text().strip()
+    return None
+
+
+def repo_path(subpath: str = "") -> str:
+    """Build standard repo API path: /repos/{owner}/{repo}/{subpath}."""
+    base = f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}"
+    if subpath:
+        return f"{base}/{subpath}"
+    return base
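+
+
+# Usage sketch (illustrative only; pr_number is a placeholder):
+#
+#     pr = await api("GET", repo_path(f"pulls/{pr_number}"))
+#     diff = await get_pr_diff(pr_number)
+#     token = get_agent_token("Leo")  # reads forgejo-leo-token from SECRETS_DIR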
" + "If everything passes, say so in one line and approve." +) + + +# ─── Prompt templates ────────────────────────────────────────────────────── + +TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT. + +DEEP — use when ANY of these apply: +- PR adds or modifies claims rated "likely" or higher confidence +- PR touches agent beliefs or creates cross-domain wiki links +- PR challenges an existing claim (has "challenged_by" or contradicts existing) +- PR modifies axiom-level beliefs +- PR is a cross-domain synthesis claim + +STANDARD — use when: +- New claims in established domain areas +- Enrichments to existing claims (confirm/extend) +- New hypothesis-level beliefs +- Source archives with extraction results + +LIGHT — use ONLY when ALL changes fit these categories: +- Entity attribute updates (factual corrections, new data points) +- Source archiving without extraction +- Formatting fixes, typo corrections +- Status field changes + +IMPORTANT: When uncertain, classify UP, not down. Always err toward more review. + +Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line. + +--- PR DIFF --- +{diff}""" + +DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base. + +Review this PR from your domain expertise: +1. Technical accuracy — are the claims factually correct in your domain? +2. Domain duplicates — does your domain already have substantially similar claims? +3. Missing context — is important domain context absent that would change interpretation? +4. Confidence calibration — from your domain expertise, is the confidence level right? +5. Enrichment opportunities — should this connect to existing claims via wiki links? + +{style_guide} + +If you are requesting changes, tag the specific issues: + + +Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by + +End your review with exactly one of: + + + +--- PR DIFF --- +{diff} + +--- CHANGED FILES --- +{files}""" + +LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. + +Review this PR against the quality criteria: +1. Schema compliance — YAML frontmatter, prose-as-title, required fields +2. Duplicate check — does this claim already exist? +3. Confidence calibration — appropriate for the evidence? +4. Wiki link validity — references real claims? +5. Source quality — credible for the claim? +6. Domain assignment — correct domain? +7. Epistemic hygiene — specific enough to be wrong? + +{style_guide} + +If requesting changes, tag the issues: + + +End your review with exactly one of: + + + +--- PR DIFF --- +{diff} + +--- CHANGED FILES --- +{files}""" + +LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base. + +Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check: +1. Cross-domain implications — does this claim affect beliefs in other domains? +2. Confidence calibration — is the confidence level justified by the evidence? +3. Contradiction check — does this contradict any existing claims without explicit argument? +4. Wiki link validity — do all wiki links reference real, existing claims? +5. Axiom integrity — if touching axiom-level beliefs, is the justification extraordinary? +6. 
Source quality — is the source credible for the claim being made? +7. Duplicate check — does a substantially similar claim already exist? +8. Enrichment vs new claim — should this be an enrichment to an existing claim instead? +9. Domain assignment — is the claim in the correct domain? +10. Schema compliance — YAML frontmatter, prose-as-title format, required fields +11. Epistemic hygiene — is the claim specific enough to be wrong? + +{style_guide} + +If requesting changes, tag the issues: + + +End your review with exactly one of: + + + +--- PR DIFF --- +{diff} + +--- CHANGED FILES --- +{files}""" + + +# ─── API helpers ─────────────────────────────────────────────────────────── + + +async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None: + """Call OpenRouter API. Returns response text or None on failure.""" + key_file = config.SECRETS_DIR / "openrouter-key" + if not key_file.exists(): + logger.error("OpenRouter key file not found") + return None + key = key_file.read_text().strip() + + payload = { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": 4096, + "temperature": 0.2, + } + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + config.OPENROUTER_URL, + headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"}, + json=payload, + timeout=aiohttp.ClientTimeout(total=timeout_sec), + ) as resp: + if resp.status >= 400: + text = await resp.text() + logger.error("OpenRouter %s → %d: %s", model, resp.status, text[:200]) + return None + data = await resp.json() + return data.get("choices", [{}])[0].get("message", {}).get("content") + except Exception as e: + logger.error("OpenRouter error: %s → %s", model, e) + return None + + +async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str = None) -> str | None: + """Call Claude via CLI (Claude Max subscription). 
Returns response or None.""" + proc = await asyncio.create_subprocess_exec( + str(config.CLAUDE_CLI), + "-p", + "--model", + model, + "--output-format", + "text", + cwd=cwd or str(config.REPO_DIR), + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + _active_subprocesses.add(proc) # Track for graceful shutdown (Ganymede #8) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(input=prompt.encode()), + timeout=timeout_sec, + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + logger.error("Claude CLI timed out after %ds", timeout_sec) + return None + finally: + _active_subprocesses.discard(proc) + + out_text = (stdout or b"").decode() + err_text = (stderr or b"").decode() + + # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message + combined_lower = (out_text + err_text).lower() + if "hit your limit" in combined_lower or "rate limit" in combined_lower: + logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200]) + return "RATE_LIMITED" + + if proc.returncode != 0: + logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200]) + return None + + return out_text.strip() + + +# ─── Review execution ───────────────────────────────────────────────────── + + +async def triage_pr(diff: str) -> str: + """Triage PR via Haiku → DEEP/STANDARD/LIGHT.""" + prompt = TRIAGE_PROMPT.format(diff=diff[:50000]) # Cap diff size for triage + result = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30) + if not result: + logger.warning("Triage failed, defaulting to STANDARD") + return "STANDARD" + + tier = result.split("\n")[0].strip().upper() + if tier in ("DEEP", "STANDARD", "LIGHT"): + reason = result.split("\n")[1].strip() if "\n" in result else "" + logger.info("Triage: %s — %s", tier, reason[:100]) + return tier + + logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20]) + return "STANDARD" + + +async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None: + """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o.""" + prompt = DOMAIN_PROMPT.format( + agent=agent, + agent_upper=agent.upper(), + domain=domain, + style_guide=REVIEW_STYLE_GUIDE, + diff=diff, + files=files, + ) + + # Try Claude Max Sonnet first + result = await claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) + + if result == "RATE_LIMITED": + # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck) + policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow") + if policy == "overflow": + logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o") + result = await openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT) + else: + logger.info("Claude Max rate limited, queuing domain review") + return None + + return result + + +async def run_leo_review(diff: str, files: str, tier: str) -> str | None: + """Run Leo review via Claude Max Opus. 
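+
+
+# Usage sketch (illustrative only; mirrors the call sites in evaluate.py):
+#
+#     tier = await triage_pr(diff)                   # "DEEP" | "STANDARD" | "LIGHT"
+#     review = await run_domain_review(diff, files, "health", "Vida")
+#     leo = await run_leo_review(diff, files, tier)  # None → rate limited, queued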
diff --git a/lib/merge.py b/lib/merge.py
index a134e05..40f4f97 100644
--- a/lib/merge.py
+++ b/lib/merge.py
@@ -16,6 +16,9 @@
 import logging
 from collections import defaultdict
 
 from . import config, db
+from .domains import detect_domain_from_branch
+from .forgejo import api as forgejo_api
+from .forgejo import repo_path
 
 logger = logging.getLogger("pipeline.merge")
@@ -50,32 +53,6 @@ async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
     return proc.returncode, output
 
 
-async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list | None:
-    """Call Forgejo API. Returns parsed JSON or None on error."""
-    import aiohttp
-
-    url = f"{config.FORGEJO_URL}/api/v1{path}"
-    token_file = config.FORGEJO_TOKEN_FILE
-    token = token_file.read_text().strip() if token_file.exists() else ""
-    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
-
-    try:
-        async with aiohttp.ClientSession() as session:
-            async with session.request(
-                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30)
-            ) as resp:
-                if resp.status >= 400:
-                    text = await resp.text()
-                    logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200])
-                    return None
-                if resp.status == 204:  # No content (DELETE)
-                    return {}
-                return await resp.json()
-    except Exception as e:
-        logger.error("Forgejo API error: %s %s → %s", method, path, e)
-        return None
-
-
 # --- PR Discovery (Multiplayer v1) ---
 
 
@@ -92,9 +69,9 @@ async def discover_external_prs(conn) -> int:
 
     page = 1
     while True:
-        prs = await _forgejo_api(
+        prs = await forgejo_api(
             "GET",
-            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50&page={page}",
+            repo_path(f"pulls?state=open&limit=50&page={page}"),
         )
         if not prs:
             break
@@ -107,9 +84,7 @@
             is_pipeline = author.lower() in pipeline_users
             origin = "pipeline" if is_pipeline else "human"
             priority = "high" if origin == "human" else None
-            domain = (
-                _detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"])
-            )
+            domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
 
             conn.execute(
                 """INSERT OR IGNORE INTO prs
@@ -146,34 +121,6 @@
     return discovered
 
 
-def _detect_domain_from_branch(branch: str) -> str | None:
-    """Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'.
-
-    Agent-to-domain mapping for pipeline branches.
-    """
-    agent_domain = {
-        "rio": "internet-finance",
-        "clay": "entertainment",
-        "theseus": "ai-alignment",
-        "vida": "health",
-        "astra": "space-development",
-        "leo": "grand-strategy",
-    }
-    prefix = branch.split("/")[0].lower() if "/" in branch else ""
-    return agent_domain.get(prefix)
-
-
-def _detect_domain_from_files(pr: dict) -> str | None:
-    """Detect domain from PR's changed files for human-submitted PRs.
-
-    Humans may not follow agent branch naming. Fall back to inspecting
-    file paths. (Ganymede nit)
-    """
-    # We'd need to fetch files from the API — do it lazily on first eval
-    # For now, return None. Domain gets set during evaluation.
-    return None
-
-
 async def _post_ack_comment(pr_number: int):
     """Post acknowledgment comment on human-submitted PR. (Rhea)
 
@@ -185,9 +132,9 @@
         "(priority: high). Expected review time: ~5 minutes.\n\n"
         "_This is an automated message from the Teleo pipeline._"
     )
-    await _forgejo_api(
+    await forgejo_api(
         "POST",
-        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+        repo_path(f"issues/{pr_number}/comments"),
         {"body": body},
     )
 
@@ -296,9 +243,9 @@
 
 async def _merge_pr(pr_number: int) -> tuple[bool, str]:
     """Merge PR via Forgejo API. Preserves PR metadata and reviewer attribution."""
-    result = await _forgejo_api(
+    result = await forgejo_api(
         "POST",
-        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/merge",
+        repo_path(f"pulls/{pr_number}/merge"),
         {"Do": "merge", "merge_message_field": ""},
     )
     if result is None:
@@ -312,9 +259,9 @@
 
     If DELETE fails, log and move on — stale branch is cosmetic, stale merge is operational.
     """
-    result = await _forgejo_api(
+    result = await forgejo_api(
         "DELETE",
-        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/branches/{branch}",
+        repo_path(f"branches/{branch}"),
     )
     if result is None:
         logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)
diff --git a/lib/validate.py b/lib/validate.py
index 4a3bbec..afccaaf 100644
--- a/lib/validate.py
+++ b/lib/validate.py
@@ -16,30 +16,14 @@ from difflib import SequenceMatcher
 from pathlib import Path
 
 from . import config, db
+from .domains import VALID_DOMAINS
+from .forgejo import api as forgejo_api
+from .forgejo import get_pr_diff, repo_path
 
 logger = logging.getLogger("pipeline.validate")
 
 # ─── Constants ──────────────────────────────────────────────────────────────
 
-VALID_DOMAINS = frozenset(
-    {
-        "internet-finance",
-        "entertainment",
-        "health",
-        "ai-alignment",
-        "space-development",
-        "grand-strategy",
-        "mechanisms",
-        "living-capital",
-        "living-agents",
-        "teleohumanity",
-        "critical-systems",
-        "collective-intelligence",
-        "teleological-economics",
-        "cultural-dynamics",
-    }
-)
-
 VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"})
 VALID_TYPES = frozenset({"claim", "framework"})
 REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created")
@@ -356,61 +340,11 @@ def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
     return files
 
 
-# ─── Forgejo API (using merge module's helper) ─────────────────────────────
-
-
-async def _forgejo_api(method: str, path: str, body: dict = None):
-    """Call Forgejo API. Reuses merge module pattern."""
-    import aiohttp
-
-    url = f"{config.FORGEJO_URL}/api/v1{path}"
-    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
-    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
-
-    try:
-        async with aiohttp.ClientSession() as session:
-            async with session.request(
-                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30)
-            ) as resp:
-                if resp.status >= 400:
-                    text = await resp.text()
-                    logger.error("Forgejo API %s %s → %d: %s", method, path, resp.status, text[:200])
-                    return None
-                if resp.status == 204:
-                    return {}
-                return await resp.json()
-    except Exception as e:
-        logger.error("Forgejo API error: %s %s → %s", method, path, e)
-        return None
-
-
-async def _get_pr_diff(pr_number: int) -> str:
-    """Fetch PR diff via Forgejo API."""
-    import aiohttp
-
-    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
-    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
-    headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
-
-    try:
-        async with aiohttp.ClientSession() as session:
-            async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp:
-                if resp.status >= 400:
-                    return ""
-                diff = await resp.text()
-                if len(diff) > 2_000_000:
-                    return ""  # Too large
-                return diff
-    except Exception as e:
-        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
-        return ""
-
-
 async def _get_pr_head_sha(pr_number: int) -> str:
     """Get HEAD SHA of PR's branch."""
-    pr_info = await _forgejo_api(
+    pr_info = await forgejo_api(
         "GET",
-        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
+        repo_path(f"pulls/{pr_number}"),
     )
     if pr_info:
         return pr_info.get("head", {}).get("sha", "")
@@ -424,9 +358,9 @@
     # Paginate comments (Ganymede standing rule)
     page = 1
     while True:
-        comments = await _forgejo_api(
+        comments = await forgejo_api(
             "GET",
-            f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments?limit=50&page={page}",
+            repo_path(f"issues/{pr_number}/comments?limit=50&page={page}"),
         )
         if not comments:
             break
@@ -469,9 +403,9 @@
 
     lines.append(f"\n*tier0-gate v2 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
 
-    await _forgejo_api(
+    await forgejo_api(
         "POST",
-        f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
+        repo_path(f"issues/{pr_number}/comments"),
         {"body": "\n".join(lines)},
     )
 
@@ -509,7 +443,7 @@
         return {"pr": pr_number, "skipped": True, "reason": "already_validated"}
 
     # Fetch diff
-    diff = await _get_pr_diff(pr_number)
+    diff = await get_pr_diff(pr_number)
     if not diff:
         logger.debug("PR #%d: empty or oversized diff", pr_number)
         return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
diff --git a/teleo-pipeline.py b/teleo-pipeline.py
index 5c3da0e..d602495 100644
--- a/teleo-pipeline.py
+++ b/teleo-pipeline.py
@@ -18,8 +18,9 @@ sys.path.insert(0, str(Path(__file__).parent))
 
 from lib import config, db
 from lib import log as logmod
 from lib.breaker import CircuitBreaker
-from lib.evaluate import evaluate_cycle, kill_active_subprocesses
+from lib.evaluate import evaluate_cycle
 from lib.health import start_health_server, stop_health_server
+from lib.llm import kill_active_subprocesses
 from lib.merge import merge_cycle
 from lib.validate import validate_cycle