Phase 3a: Extract lib/forgejo.py — single Forgejo API client #3

Merged
m3taversal merged 3 commits from ganymede/phase3-forgejo into main 2026-03-13 15:43:11 +00:00
7 changed files with 571 additions and 572 deletions

87
lib/domains.py Normal file
View file

@ -0,0 +1,87 @@
"""Domain→agent mapping and domain detection — single source of truth.
Extracted from evaluate.py and merge.py (Phase 3 refactor).
All domain classification logic goes through this module.
"""
import re
# ─── Domain/agent tables ───────────────────────────────────────────────────
# Canonical domain→agent mapping. Every domain must have exactly one primary agent.
DOMAIN_AGENT_MAP: dict[str, str] = {
    "internet-finance": "Rio",
    "entertainment": "Clay",
    "health": "Vida",
    "ai-alignment": "Theseus",
    "space-development": "Astra",
    "mechanisms": "Rio",
    "living-capital": "Rio",
    "living-agents": "Theseus",
    "teleohumanity": "Leo",
    "grand-strategy": "Leo",
    "critical-systems": "Theseus",
    "collective-intelligence": "Theseus",
    "teleological-economics": "Rio",
    "cultural-dynamics": "Clay",
}

# Valid domain names — derived from the map, not maintained separately.
VALID_DOMAINS: frozenset[str] = frozenset(DOMAIN_AGENT_MAP.keys())

# Inverse mapping: agent name (lowercase) → primary domain (for branch detection).
# NOTE: cannot be mechanically derived from DOMAIN_AGENT_MAP — several agents own
# multiple domains (Rio, Theseus, Leo, Clay), so each primary is chosen explicitly.
_AGENT_PRIMARY_DOMAIN: dict[str, str] = {
    "rio": "internet-finance",
    "clay": "entertainment",
    "theseus": "ai-alignment",
    "vida": "health",
    "astra": "space-development",
    "leo": "grand-strategy",
}
def agent_for_domain(domain: str | None) -> str:
    """Return the reviewing agent responsible for *domain*; Leo when unknown or None."""
    # None is never a key of DOMAIN_AGENT_MAP, so one .get() covers both the
    # missing-domain and unknown-domain cases with the same "Leo" fallback.
    return DOMAIN_AGENT_MAP.get(domain, "Leo")
def detect_domain_from_diff(diff: str) -> str | None:
    """Detect the primary domain from changed file paths in a unified diff.

    Only 'diff --git' header lines are inspected. Paths under domains/ and
    entities/ always carry domain info; core/ and foundations/ subdirectories
    count only when the subdirectory names a known domain. Returns the
    most-referenced domain, or None when no domain-bearing paths are present.
    """
    counts: dict[str, int] = {}
    for raw in diff.split("\n"):
        if not raw.startswith("diff --git"):
            continue
        # domains/ and entities/ both carry domain info — count unconditionally.
        hit = re.search(r"(?:domains|entities)/([^/]+)/", raw)
        if hit:
            counts[hit.group(1)] = counts.get(hit.group(1), 0) + 1
            continue
        # core/ subdirectories: count only known domains. A core/ match ends
        # processing of this line even when the subdir is not a domain.
        hit = re.search(r"core/([^/]+)/", raw)
        if hit:
            if hit.group(1) in DOMAIN_AGENT_MAP:
                counts[hit.group(1)] = counts.get(hit.group(1), 0) + 1
            continue
        # foundations/ subdirectories: same known-domain rule.
        hit = re.search(r"foundations/([^/]+)/", raw)
        if hit and hit.group(1) in DOMAIN_AGENT_MAP:
            counts[hit.group(1)] = counts.get(hit.group(1), 0) + 1
    # Ties resolve to the first domain that reached the maximum count.
    return max(counts, key=counts.get) if counts else None
def detect_domain_from_branch(branch: str) -> str | None:
    """Map a pipeline branch like 'rio/claims-futarchy' to the agent's primary domain.

    Returns None when the branch has no '/' or its prefix is not a known agent.
    """
    if "/" not in branch:
        return None
    return _AGENT_PRIMARY_DOMAIN.get(branch.split("/", 1)[0].lower())

View file

@ -1,4 +1,4 @@
"""Evaluate stage — triage + domain review + Leo review.
"""Evaluate stage — PR lifecycle orchestration.
Ported from eval-worker.sh. Key architectural change: domain-first, Leo-last.
Sonnet (domain review) filters before Opus (Leo review) to maximize value per
@ -13,319 +13,26 @@ Flow per PR:
6. If both approve status = 'approved' (merge module picks it up)
Design reviewed by Ganymede, Rhea, Vida, Theseus.
LLM transport and prompts extracted to lib/llm.py (Phase 3c).
"""
import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from . import config, db
from .domains import agent_for_domain, detect_domain_from_diff
from .forgejo import api as forgejo_api
from .forgejo import get_agent_token, get_pr_diff, repo_path
from .llm import run_domain_review, run_leo_review, triage_pr
logger = logging.getLogger("pipeline.evaluate")
# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
_active_subprocesses: set = set()
# ─── Constants ──────────────────────────────────────────────────────────────
# Domain→agent routing table.
# NOTE(review): duplicates DOMAIN_AGENT_MAP in lib/domains.py — keep the two in
# sync, or import from lib.domains and delete this copy.
DOMAIN_AGENT_MAP = {
    "internet-finance": "Rio",
    "entertainment": "Clay",
    "health": "Vida",
    "ai-alignment": "Theseus",
    "space-development": "Astra",
    "mechanisms": "Rio",
    "living-capital": "Rio",
    "living-agents": "Theseus",
    "teleohumanity": "Leo",
    "grand-strategy": "Leo",
    "critical-systems": "Theseus",
    "collective-intelligence": "Theseus",
    "teleological-economics": "Rio",
    "cultural-dynamics": "Clay",
}
async def kill_active_subprocesses():
    """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
    # Snapshot first: killing mutates process state while we iterate.
    lingering = [p for p in list(_active_subprocesses) if p.returncode is None]
    for proc in lingering:
        logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid)
        try:
            proc.kill()
            await proc.wait()
        except ProcessLookupError:
            # Process exited between the returncode check and kill(); ignore.
            pass
    _active_subprocesses.clear()
def _agent_token(agent_name: str) -> str | None:
    """Read Forgejo token for a named agent. Returns token string or None."""
    path = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
    return path.read_text().strip() if path.exists() else None
# Shared style instructions interpolated into every review prompt below.
REVIEW_STYLE_GUIDE = (
    "Be concise. Only mention what fails or is interesting. "
    "Do not summarize what the PR does — the diff speaks for itself. "
    "If everything passes, say so in one line and approve."
)
# ─── Prompt templates ──────────────────────────────────────────────────────
TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
DEEP use when ANY of these apply:
- PR adds or modifies claims rated "likely" or higher confidence
- PR touches agent beliefs or creates cross-domain wiki links
- PR challenges an existing claim (has "challenged_by" or contradicts existing)
- PR modifies axiom-level beliefs
- PR is a cross-domain synthesis claim
STANDARD use when:
- New claims in established domain areas
- Enrichments to existing claims (confirm/extend)
- New hypothesis-level beliefs
- Source archives with extraction results
LIGHT use ONLY when ALL changes fit these categories:
- Entity attribute updates (factual corrections, new data points)
- Source archiving without extraction
- Formatting fixes, typo corrections
- Status field changes
IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
--- PR DIFF ---
{diff}"""
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
Review this PR from your domain expertise:
1. Technical accuracy are the claims factually correct in your domain?
2. Domain duplicates does your domain already have substantially similar claims?
3. Missing context is important domain context absent that would change interpretation?
4. Confidence calibration from your domain expertise, is the confidence level right?
5. Enrichment opportunities should this connect to existing claims via wiki links?
{style_guide}
If you are requesting changes, tag the specific issues:
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by
End your review with exactly one of:
<!-- VERDICT:{agent_upper}:APPROVE -->
<!-- VERDICT:{agent_upper}:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR against the quality criteria:
1. Schema compliance YAML frontmatter, prose-as-title, required fields
2. Duplicate check does this claim already exist?
3. Confidence calibration appropriate for the evidence?
4. Wiki link validity references real claims?
5. Source quality credible for the claim?
6. Domain assignment correct domain?
7. Epistemic hygiene specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
1. Cross-domain implications does this claim affect beliefs in other domains?
2. Confidence calibration is the confidence level justified by the evidence?
3. Contradiction check does this contradict any existing claims without explicit argument?
4. Wiki link validity do all wiki links reference real, existing claims?
5. Axiom integrity if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality is the source credible for the claim being made?
7. Duplicate check does a substantially similar claim already exist?
8. Enrichment vs new claim should this be an enrichment to an existing claim instead?
9. Domain assignment is the claim in the correct domain?
10. Schema compliance YAML frontmatter, prose-as-title format, required fields
11. Epistemic hygiene is the claim specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# ─── API helpers ───────────────────────────────────────────────────────────
async def _forgejo_api(method: str, path: str, body: dict | None = None, token: str | None = None):
    """Call Forgejo API.

    Args:
        method: HTTP verb (GET, POST, DELETE, ...).
        path: API path appended to {FORGEJO_URL}/api/v1.
        body: Optional JSON request body.
        token: Auth token; defaults to the admin token from FORGEJO_TOKEN_FILE.

    Returns parsed JSON on success, {} for 204 No Content, None on any error.
    """
    import aiohttp
    url = f"{config.FORGEJO_URL}/api/v1{path}"
    if token is None:
        # Fall back to the admin token; empty string when the file is absent.
        token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=60)
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
                    return None
                if resp.status == 204:
                    # 204 No Content carries no JSON body; {} signals success.
                    return {}
                return await resp.json()
    except Exception as e:
        logger.error("Forgejo API error: %s %s%s", method, path, e)
        return None
async def _openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
    """Call OpenRouter API. Returns response text or None on failure.

    Sends *prompt* as a single user message; responses are capped at 4096
    tokens, temperature 0.2.
    """
    import aiohttp
    key_file = config.SECRETS_DIR / "openrouter-key"
    if not key_file.exists():
        logger.error("OpenRouter key file not found")
        return None
    key = key_file.read_text().strip()
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 4096,
        "temperature": 0.2,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config.OPENROUTER_URL,
                headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
                json=payload,
                timeout=aiohttp.ClientTimeout(total=timeout_sec),
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    logger.error("OpenRouter %s%d: %s", model, resp.status, text[:200])
                    return None
                data = await resp.json()
                # NOTE(review): a response with "choices": [] raises IndexError
                # here (caught below and logged as a generic error) — the
                # default [{}] only guards a *missing* key, not an empty list.
                return data.get("choices", [{}])[0].get("message", {}).get("content")
    except Exception as e:
        logger.error("OpenRouter error: %s%s", model, e)
        return None
async def _claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str | None = None) -> str | None:
    """Call Claude via CLI (Claude Max subscription). Returns response or None.

    Returns:
        The sentinel string "RATE_LIMITED" when stdout/stderr mention a rate
        limit (checked regardless of exit code), None on timeout or non-zero
        exit, otherwise stripped stdout.
    """
    proc = await asyncio.create_subprocess_exec(
        str(config.CLAUDE_CLI),
        "-p",
        "--model",
        model,
        "--output-format",
        "text",
        cwd=cwd or str(config.REPO_DIR),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _active_subprocesses.add(proc)  # Track for graceful shutdown (Ganymede #8)
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=timeout_sec,
        )
    except asyncio.TimeoutError:
        # Kill the hung CLI and reap it so no zombie is left behind.
        proc.kill()
        await proc.wait()
        logger.error("Claude CLI timed out after %ds", timeout_sec)
        return None
    finally:
        # Untrack on every exit path, including timeout.
        _active_subprocesses.discard(proc)
    out_text = (stdout or b"").decode()
    err_text = (stderr or b"").decode()
    # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
    combined_lower = (out_text + err_text).lower()
    if "hit your limit" in combined_lower or "rate limit" in combined_lower:
        logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
        return "RATE_LIMITED"
    if proc.returncode != 0:
        logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
        return None
    return out_text.strip()
# ─── Diff helpers ──────────────────────────────────────────────────────────
async def _get_pr_diff(pr_number: int) -> str:
    """Fetch PR diff via Forgejo API.

    Returns the raw unified diff, or "" on HTTP error, network failure, or
    when the diff exceeds 2 MB — oversized PRs are skipped, not truncated.
    """
    import aiohttp
    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                headers={"Authorization": f"token {token}", "Accept": "text/plain"},
                timeout=aiohttp.ClientTimeout(total=60),
            ) as resp:
                if resp.status >= 400:
                    return ""
                diff = await resp.text()
                if len(diff) > 2_000_000:
                    # Silently skip oversized diffs; caller treats "" as no diff.
                    return ""
                return diff
    except Exception as e:
        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
        return ""
def _filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
@ -360,38 +67,6 @@ def _extract_changed_files(diff: str) -> str:
)
def _detect_domain_from_diff(diff: str) -> str | None:
    """Detect primary domain from changed file paths.

    Checks domains/, entities/, core/, foundations/ for domain classification.
    Returns the most-referenced domain, or None when no domain files changed.
    (Same logic as detect_domain_from_diff in lib/domains.py.)
    """
    domain_counts: dict[str, int] = {}
    for line in diff.split("\n"):
        if line.startswith("diff --git"):
            # Check domains/ and entities/ (both carry domain info)
            match = re.search(r"(?:domains|entities)/([^/]+)/", line)
            if match:
                d = match.group(1)
                domain_counts[d] = domain_counts.get(d, 0) + 1
                continue
            # Check core/ subdirectories — only known domains are counted
            match = re.search(r"core/([^/]+)/", line)
            if match:
                d = match.group(1)
                if d in DOMAIN_AGENT_MAP:
                    domain_counts[d] = domain_counts.get(d, 0) + 1
                continue
            # Check foundations/ subdirectories — only known domains are counted
            match = re.search(r"foundations/([^/]+)/", line)
            if match:
                d = match.group(1)
                if d in DOMAIN_AGENT_MAP:
                    domain_counts[d] = domain_counts.get(d, 0) + 1
    if domain_counts:
        # Ties resolve to the first domain that reached the maximum count.
        return max(domain_counts, key=domain_counts.get)
    return None
def _is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
@ -428,69 +103,6 @@ def _parse_issues(review_text: str) -> list[str]:
return [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
# ─── Review execution ─────────────────────────────────────────────────────
async def _triage_pr(diff: str) -> str:
    """Triage PR via Haiku → DEEP/STANDARD/LIGHT (defaults to STANDARD on failure)."""
    # Cap diff size for triage — the classifier only needs a sample.
    prompt = TRIAGE_PROMPT.format(diff=diff[:50000])
    result = await _openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
    if not result:
        logger.warning("Triage failed, defaulting to STANDARD")
        return "STANDARD"
    lines = result.split("\n")
    tier = lines[0].strip().upper()
    if tier not in ("DEEP", "STANDARD", "LIGHT"):
        logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
        return "STANDARD"
    reason = lines[1].strip() if len(lines) > 1 else ""
    logger.info("Triage: %s%s", tier, reason[:100])
    return tier
async def _run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
    """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o.

    Returns the review text, or None when rate limited and the 'eval_domain'
    overflow policy is not "overflow" (caller re-queues the PR for retry).
    """
    prompt = DOMAIN_PROMPT.format(
        agent=agent,
        agent_upper=agent.upper(),
        domain=domain,
        style_guide=REVIEW_STYLE_GUIDE,
        diff=diff,
        files=files,
    )
    # Try Claude Max Sonnet first
    result = await _claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
    if result == "RATE_LIMITED":
        # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck)
        policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
        if policy == "overflow":
            logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
            result = await _openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
        else:
            logger.info("Claude Max rate limited, queuing domain review")
            return None
    return result
async def _run_leo_review(diff: str, files: str, tier: str) -> str | None:
    """Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy)."""
    template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
    result = await _claude_cli_call(
        config.EVAL_LEO_MODEL,
        template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files),
        timeout_sec=config.EVAL_TIMEOUT,
    )
    if result != "RATE_LIMITED":
        return result
    # Leo review queues — don't waste Opus calls (never overflow)
    logger.info("Claude Max Opus rate limited, queuing Leo review")
    return None
async def _post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
@ -499,12 +111,11 @@ async def _post_formal_approvals(pr_number: int, pr_author: str):
continue
if approvals >= 2:
break
token_file = config.SECRETS_DIR / f"forgejo-{agent_name}-token"
if token_file.exists():
token = token_file.read_text().strip()
result = await _forgejo_api(
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/reviews",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
@ -528,16 +139,16 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
return {"pr": pr_number, "skipped": True, "reason": "already_claimed"}
# Fetch diff
diff = await _get_pr_diff(pr_number)
diff = await get_pr_diff(pr_number)
if not diff:
return {"pr": pr_number, "skipped": True, "reason": "no_diff"}
# Musings bypass
if _is_musings_only(diff):
logger.info("PR #%d is musings-only — auto-approving", pr_number)
await _forgejo_api(
await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
repo_path(f"issues/{pr_number}/comments"),
{"body": "Auto-approved: musings bypass eval per collective policy."},
)
conn.execute(
@ -554,8 +165,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
files = _extract_changed_files(diff)
# Detect domain
domain = _detect_domain_from_diff(diff)
agent = DOMAIN_AGENT_MAP.get(domain, "Leo") if domain else "Leo"
domain = detect_domain_from_diff(diff)
agent = agent_for_domain(domain)
# Default NULL domain to 'general' (archive-only PRs have no domain files)
if domain is None:
@ -569,7 +180,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Step 1: Triage (if not already triaged)
if tier is None:
tier = await _triage_pr(diff)
tier = await triage_pr(diff)
conn.execute("UPDATE prs SET tier = ? WHERE number = ?", (tier, pr_number))
# Update last_attempt timestamp (status already set to 'reviewing' by atomic claim above)
@ -591,7 +202,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
logger.info("PR #%d: domain review already done (%s), skipping to Leo", pr_number, domain_verdict)
else:
logger.info("PR #%d: domain review (%s/%s, tier=%s)", pr_number, agent, domain, tier)
domain_review = await _run_domain_review(review_diff, files, domain or "general", agent)
domain_review = await run_domain_review(review_diff, files, domain or "general", agent)
if domain_review is None:
# Rate limited, couldn't overflow — revert to open for retry
@ -605,10 +216,10 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
# Post domain review as comment (from agent's Forgejo account)
agent_tok = _agent_token(agent)
await _forgejo_api(
agent_tok = get_agent_token(agent)
await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
repo_path(f"issues/{pr_number}/comments"),
{"body": domain_review},
token=agent_tok,
)
@ -629,7 +240,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
leo_verdict = "skipped"
if tier != "LIGHT":
logger.info("PR #%d: Leo review (tier=%s)", pr_number, tier)
leo_review = await _run_leo_review(review_diff, files, tier)
leo_review = await run_leo_review(review_diff, files, tier)
if leo_review is None:
# Opus rate limited — revert to open for retry (keep domain verdict)
@ -640,10 +251,10 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
# Post Leo review as comment (from Leo's Forgejo account)
leo_tok = _agent_token("Leo")
await _forgejo_api(
leo_tok = get_agent_token("Leo")
await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
repo_path(f"issues/{pr_number}/comments"),
{"body": leo_review},
token=leo_tok,
)
@ -656,9 +267,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
if both_approve:
# Get PR author for formal approvals
pr_info = await _forgejo_api(
pr_info = await forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
repo_path(f"pulls/{pr_number}"),
)
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
@ -733,28 +344,36 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
"""
global _rate_limit_backoff_until
# If we're in rate-limit backoff, skip this cycle entirely
# Check if we're in Opus rate-limit backoff
opus_backoff = False
if _rate_limit_backoff_until is not None:
now = datetime.now(timezone.utc)
if now < _rate_limit_backoff_until:
remaining = int((_rate_limit_backoff_until - now).total_seconds())
logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining)
return 0, 0
logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining)
opus_backoff = True
else:
logger.info("Rate limit backoff expired, resuming eval cycles")
logger.info("Rate limit backoff expired, resuming full eval cycles")
_rate_limit_backoff_until = None
# Find PRs ready for evaluation:
# - status = 'open'
# - tier0_pass = 1 (passed validation)
# - leo_verdict = 'pending' OR domain_verdict = 'pending'
# During Opus backoff: only fetch PRs needing triage or domain review
# (skip PRs already domain-reviewed that are waiting for Leo/Opus)
# Skip PRs attempted within last 10 minutes (backoff during rate limits)
if opus_backoff:
verdict_filter = "AND p.domain_verdict = 'pending'"
else:
verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')"
rows = conn.execute(
"""SELECT p.number, p.tier FROM prs p
f"""SELECT p.number, p.tier FROM prs p
LEFT JOIN sources s ON p.source_path = s.path
WHERE p.status = 'open'
AND p.tier0_pass = 1
AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')
{verdict_filter}
AND (p.last_attempt IS NULL
OR p.last_attempt < datetime('now', '-10 minutes'))
ORDER BY
@ -778,22 +397,51 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]:
for row in rows:
try:
# During Opus backoff, skip PRs that already completed domain review
# (they'd just hit the Opus limit again). Only process PRs still
# needing triage or domain review.
if opus_backoff:
existing = conn.execute(
"SELECT domain_verdict FROM prs WHERE number = ?",
(row["number"],),
).fetchone()
if existing and existing["domain_verdict"] not in ("pending", None):
logger.debug(
"PR #%d: skipping during Opus backoff (domain already %s)",
row["number"],
existing["domain_verdict"],
)
continue
result = await evaluate_pr(conn, row["number"], tier=row["tier"])
if result.get("skipped"):
reason = result.get("reason", "")
logger.debug("PR #%d skipped: %s", row["number"], reason)
# Any rate limit — stop the entire cycle. No point trying more PRs
# when the model is exhausted. The 10-minute backoff on last_attempt
# prevents re-processing the same PR; breaking here prevents
# cycling through OTHER PRs that will also hit the same limit.
if "rate_limited" in reason:
from datetime import timedelta
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
logger.info("Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES)
break
if reason == "opus_rate_limited":
# Opus hit — set backoff but DON'T break. Other PRs
# may still need triage (Haiku) or domain review (Sonnet).
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
opus_backoff = True # Update local flag so in-loop guard kicks in
logger.info(
"Opus rate limited — backing off Opus for %d min, continuing triage+domain",
_RATE_LIMIT_BACKOFF_MINUTES,
)
continue
else:
# Non-Opus rate limit (Sonnet/Haiku) — break the cycle,
# nothing else can proceed either.
_rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta(
minutes=_RATE_LIMIT_BACKOFF_MINUTES
)
logger.info(
"Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES
)
break
else:
succeeded += 1
except Exception:

83
lib/forgejo.py Normal file
View file

@ -0,0 +1,83 @@
"""Forgejo API client — single shared module for all pipeline stages.
Extracted from evaluate.py, merge.py, validate.py (Phase 3 refactor).
All Forgejo HTTP calls go through this module.
"""
import logging
import aiohttp
from . import config
logger = logging.getLogger("pipeline.forgejo")
async def api(method: str, path: str, body: dict | None = None, token: str | None = None):
    """Call Forgejo API. Returns parsed JSON, {} for 204, or None on error.

    Args:
        method: HTTP method (GET, POST, DELETE, etc.)
        path: API path after /api/v1 (e.g. "/repos/teleo/teleo-codex/pulls")
        body: JSON body for POST/PUT/PATCH
        token: Override token. If None, reads from FORGEJO_TOKEN_FILE (admin token).
    """
    url = f"{config.FORGEJO_URL}/api/v1{path}"
    if token is None:
        # Fall back to the admin token; empty string when the file is absent.
        token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=60)
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
                    return None
                if resp.status == 204:
                    # 204 No Content carries no JSON body; {} distinguishes success from None.
                    return {}
                return await resp.json()
    except Exception as e:
        logger.error("Forgejo API error: %s %s%s", method, path, e)
        return None
async def get_pr_diff(pr_number: int) -> str:
    """Fetch PR diff via Forgejo API. Returns diff text or empty string."""
    url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
    token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
    headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)
            ) as resp:
                if resp.status >= 400:
                    return ""
                diff = await resp.text()
    except Exception as e:
        logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
        return ""
    # Oversized diffs (>2 MB) are treated as unavailable rather than truncated.
    return "" if len(diff) > 2_000_000 else diff
def get_agent_token(agent_name: str) -> str | None:
    """Read Forgejo token for a named agent. Returns token string or None."""
    path = config.SECRETS_DIR / f"forgejo-{agent_name.lower()}-token"
    if not path.exists():
        return None
    return path.read_text().strip()
def repo_path(subpath: str = "") -> str:
    """Build standard repo API path: /repos/{owner}/{repo}/{subpath}."""
    base = f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}"
    return f"{base}/{subpath}" if subpath else base

299
lib/llm.py Normal file
View file

@ -0,0 +1,299 @@
"""LLM transport and review prompts — shared by all evaluation stages.
Extracted from evaluate.py (Phase 3c refactor). This module owns:
- Prompt templates (triage, domain, Leo)
- OpenRouter API transport
- Claude CLI transport with subprocess tracking
- Review runner functions (triage, domain, Leo)
Orchestration (PR lifecycle, SQLite state, Forgejo posting) stays in evaluate.py.
"""
import asyncio
import logging
import aiohttp
from . import config
logger = logging.getLogger("pipeline.llm")
# Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8)
_active_subprocesses: set = set()


async def kill_active_subprocesses():
    """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown."""
    for proc in tuple(_active_subprocesses):
        if proc.returncode is not None:
            continue  # already exited; nothing to kill
        logger.warning("Killing lingering Claude CLI subprocess PID %d", proc.pid)
        try:
            proc.kill()
            await proc.wait()
        except ProcessLookupError:
            # Process died between the returncode check and kill(); ignore.
            pass
    _active_subprocesses.clear()
REVIEW_STYLE_GUIDE = (
"Be concise. Only mention what fails or is interesting. "
"Do not summarize what the PR does — the diff speaks for itself. "
"If everything passes, say so in one line and approve."
)
# ─── Prompt templates ──────────────────────────────────────────────────────
TRIAGE_PROMPT = """Classify this pull request diff into exactly one tier: DEEP, STANDARD, or LIGHT.
DEEP use when ANY of these apply:
- PR adds or modifies claims rated "likely" or higher confidence
- PR touches agent beliefs or creates cross-domain wiki links
- PR challenges an existing claim (has "challenged_by" or contradicts existing)
- PR modifies axiom-level beliefs
- PR is a cross-domain synthesis claim
STANDARD use when:
- New claims in established domain areas
- Enrichments to existing claims (confirm/extend)
- New hypothesis-level beliefs
- Source archives with extraction results
LIGHT use ONLY when ALL changes fit these categories:
- Entity attribute updates (factual corrections, new data points)
- Source archiving without extraction
- Formatting fixes, typo corrections
- Status field changes
IMPORTANT: When uncertain, classify UP, not down. Always err toward more review.
Respond with ONLY the tier name (DEEP, STANDARD, or LIGHT) on the first line, followed by a one-line reason on the second line.
--- PR DIFF ---
{diff}"""
DOMAIN_PROMPT = """You are {agent}, the {domain} domain expert for TeleoHumanity's knowledge base.
Review this PR from your domain expertise:
1. Technical accuracy are the claims factually correct in your domain?
2. Domain duplicates does your domain already have substantially similar claims?
3. Missing context is important domain context absent that would change interpretation?
4. Confidence calibration from your domain expertise, is the confidence level right?
5. Enrichment opportunities should this connect to existing claims via wiki links?
{style_guide}
If you are requesting changes, tag the specific issues:
<!-- ISSUES: tag1, tag2 -->
Valid tags: broken_wiki_links, frontmatter_schema, title_overclaims, confidence_miscalibration, date_errors, factual_discrepancy, near_duplicate, scope_error, source_archive, placeholder_url, missing_challenged_by
End your review with exactly one of:
<!-- VERDICT:{agent_upper}:APPROVE -->
<!-- VERDICT:{agent_upper}:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_STANDARD = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR against the quality criteria:
1. Schema compliance YAML frontmatter, prose-as-title, required fields
2. Duplicate check does this claim already exist?
3. Confidence calibration appropriate for the evidence?
4. Wiki link validity references real claims?
5. Source quality credible for the claim?
6. Domain assignment correct domain?
7. Epistemic hygiene specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
LEO_PROMPT_DEEP = """You are Leo, the lead evaluator for TeleoHumanity's knowledge base.
Review this PR with MAXIMUM scrutiny. This PR may trigger belief cascades. Check:
1. Cross-domain implications does this claim affect beliefs in other domains?
2. Confidence calibration is the confidence level justified by the evidence?
3. Contradiction check does this contradict any existing claims without explicit argument?
4. Wiki link validity do all wiki links reference real, existing claims?
5. Axiom integrity if touching axiom-level beliefs, is the justification extraordinary?
6. Source quality is the source credible for the claim being made?
7. Duplicate check does a substantially similar claim already exist?
8. Enrichment vs new claim should this be an enrichment to an existing claim instead?
9. Domain assignment is the claim in the correct domain?
10. Schema compliance YAML frontmatter, prose-as-title format, required fields
11. Epistemic hygiene is the claim specific enough to be wrong?
{style_guide}
If requesting changes, tag the issues:
<!-- ISSUES: tag1, tag2 -->
End your review with exactly one of:
<!-- VERDICT:LEO:APPROVE -->
<!-- VERDICT:LEO:REQUEST_CHANGES -->
--- PR DIFF ---
{diff}
--- CHANGED FILES ---
{files}"""
# ─── API helpers ───────────────────────────────────────────────────────────
async def openrouter_call(model: str, prompt: str, timeout_sec: int = 120) -> str | None:
    """Call the OpenRouter chat-completions API as a single-turn conversation.

    Args:
        model: OpenRouter model identifier.
        prompt: User message content.
        timeout_sec: Total request timeout in seconds.

    Returns:
        The assistant message content (may be None if the response lacked one),
        or None on any failure: missing key file, HTTP >= 400, or network error.
    """
    key_file = config.SECRETS_DIR / "openrouter-key"
    if not key_file.exists():
        logger.error("OpenRouter key file not found")
        return None
    key = key_file.read_text().strip()
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 4096,
        "temperature": 0.2,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                config.OPENROUTER_URL,
                headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
                json=payload,
                timeout=aiohttp.ClientTimeout(total=timeout_sec),
            ) as resp:
                if resp.status >= 400:
                    text = await resp.text()
                    # Truncate body — error payloads can be large.
                    # (Separator restored: format was garbled "%s%d".)
                    logger.error("OpenRouter %s %d: %s", model, resp.status, text[:200])
                    return None
                data = await resp.json()
                # Guard against an empty "choices" list, not just a missing key.
                choices = data.get("choices") or [{}]
                return choices[0].get("message", {}).get("content")
    except Exception as e:
        logger.error("OpenRouter error (%s): %s", model, e)
        return None
async def claude_cli_call(model: str, prompt: str, timeout_sec: int = 600, cwd: str | None = None) -> str | None:
    """Call Claude via CLI (Claude Max subscription). Returns response or None.

    The prompt is piped over stdin; the CLI is run with ``--output-format text``.

    Args:
        model: Model identifier passed to the CLI via ``--model``.
        prompt: Prompt text (encoded to bytes for stdin).
        timeout_sec: Seconds to wait before killing the subprocess.
        cwd: Working directory for the CLI; defaults to config.REPO_DIR.

    Returns:
        The stripped response text on success, the sentinel string
        "RATE_LIMITED" when the Claude Max limit is detected, or None on
        timeout / nonzero exit.
    """
    proc = await asyncio.create_subprocess_exec(
        str(config.CLAUDE_CLI),
        "-p",
        "--model",
        model,
        "--output-format",
        "text",
        cwd=cwd or str(config.REPO_DIR),
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _active_subprocesses.add(proc)  # Track for graceful shutdown (Ganymede #8)
    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=prompt.encode()),
            timeout=timeout_sec,
        )
    except asyncio.TimeoutError:
        # Kill and reap the process so it doesn't linger as a zombie.
        proc.kill()
        await proc.wait()
        logger.error("Claude CLI timed out after %ds", timeout_sec)
        return None
    finally:
        _active_subprocesses.discard(proc)
    out_text = (stdout or b"").decode()
    err_text = (stderr or b"").decode()
    # Check for rate limit REGARDLESS of exit code — CLI sometimes exits 0 with limit message
    combined_lower = (out_text + err_text).lower()
    if "hit your limit" in combined_lower or "rate limit" in combined_lower:
        logger.warning("Claude Max rate limited (rc=%d, stdout: %s)", proc.returncode, out_text[:200])
        return "RATE_LIMITED"
    if proc.returncode != 0:
        logger.error("Claude CLI failed (rc=%d): stderr=%s stdout=%s", proc.returncode, err_text[:200], out_text[:200])
        return None
    return out_text.strip()
# ─── Review execution ─────────────────────────────────────────────────────
async def triage_pr(diff: str) -> str:
    """Triage a PR diff via the cheap triage model.

    Returns:
        One of "DEEP", "STANDARD", "LIGHT". Defaults to "STANDARD" when the
        triage call fails or the model returns an unparseable tier.
    """
    prompt = TRIAGE_PROMPT.format(diff=diff[:50000])  # Cap diff size for triage
    result = await openrouter_call(config.TRIAGE_MODEL, prompt, timeout_sec=30)
    if not result:
        logger.warning("Triage failed, defaulting to STANDARD")
        return "STANDARD"
    # First line carries the tier; the optional second line is a short reason.
    first, _, rest = result.partition("\n")
    tier = first.strip().upper()
    if tier in ("DEEP", "STANDARD", "LIGHT"):
        reason = rest.split("\n")[0].strip() if rest else ""
        # Separator restored: format was garbled "Triage: %s%s".
        logger.info("Triage: %s %s", tier, reason[:100])
        return tier
    logger.warning("Triage returned unparseable '%s', defaulting to STANDARD", tier[:20])
    return "STANDARD"
async def run_domain_review(diff: str, files: str, domain: str, agent: str) -> str | None:
    """Run domain review. Tries Claude Max Sonnet first, overflows to OpenRouter GPT-4o."""
    prompt = DOMAIN_PROMPT.format(
        agent=agent,
        agent_upper=agent.upper(),
        domain=domain,
        style_guide=REVIEW_STYLE_GUIDE,
        diff=diff,
        files=files,
    )
    # Primary path: Claude Max Sonnet.
    result = await claude_cli_call(config.EVAL_DOMAIN_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
    if result != "RATE_LIMITED":
        return result
    # Rate limited — behavior depends on the configured overflow policy.
    policy = config.OVERFLOW_POLICY.get("eval_domain", "overflow")
    if policy != "overflow":
        logger.info("Claude Max rate limited, queuing domain review")
        return None
    # Overflow to OpenRouter GPT-4o (Rhea: domain review is the volume filter, don't bottleneck)
    logger.info("Claude Max rate limited, overflowing domain review to OpenRouter GPT-4o")
    return await openrouter_call(config.EVAL_DEEP_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
async def run_leo_review(diff: str, files: str, tier: str) -> str | None:
    """Run Leo review via Claude Max Opus. Returns None if rate limited (queue policy)."""
    template = LEO_PROMPT_DEEP if tier == "DEEP" else LEO_PROMPT_STANDARD
    prompt = template.format(style_guide=REVIEW_STYLE_GUIDE, diff=diff, files=files)
    result = await claude_cli_call(config.EVAL_LEO_MODEL, prompt, timeout_sec=config.EVAL_TIMEOUT)
    if result != "RATE_LIMITED":
        return result
    # Leo review queues — don't waste Opus calls (never overflow)
    logger.info("Claude Max Opus rate limited, queuing Leo review")
    return None

View file

@ -16,6 +16,9 @@ import logging
from collections import defaultdict
from . import config, db
from .domains import detect_domain_from_branch
from .forgejo import api as forgejo_api
from .forgejo import repo_path
logger = logging.getLogger("pipeline.merge")
@ -50,32 +53,6 @@ async def _git(*args, cwd: str = None, timeout: int = 60) -> tuple[int, str]:
return proc.returncode, output
async def _forgejo_api(method: str, path: str, body: dict = None) -> dict | list | None:
"""Call Forgejo API. Returns parsed JSON or None on error."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1{path}"
token_file = config.FORGEJO_TOKEN_FILE
token = token_file.read_text().strip() if token_file.exists() else ""
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30)
) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
return None
if resp.status == 204: # No content (DELETE)
return {}
return await resp.json()
except Exception as e:
logger.error("Forgejo API error: %s %s%s", method, path, e)
return None
# --- PR Discovery (Multiplayer v1) ---
@ -92,9 +69,9 @@ async def discover_external_prs(conn) -> int:
page = 1
while True:
prs = await _forgejo_api(
prs = await forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls?state=open&limit=50&page={page}",
repo_path(f"pulls?state=open&limit=50&page={page}"),
)
if not prs:
break
@ -107,9 +84,7 @@ async def discover_external_prs(conn) -> int:
is_pipeline = author.lower() in pipeline_users
origin = "pipeline" if is_pipeline else "human"
priority = "high" if origin == "human" else None
domain = (
_detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"])
)
domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"])
conn.execute(
"""INSERT OR IGNORE INTO prs
@ -146,34 +121,6 @@ async def discover_external_prs(conn) -> int:
return discovered
def _detect_domain_from_branch(branch: str) -> str | None:
"""Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'.
Agent-to-domain mapping for pipeline branches.
"""
agent_domain = {
"rio": "internet-finance",
"clay": "entertainment",
"theseus": "ai-alignment",
"vida": "health",
"astra": "space-development",
"leo": "grand-strategy",
}
prefix = branch.split("/")[0].lower() if "/" in branch else ""
return agent_domain.get(prefix)
def _detect_domain_from_files(pr: dict) -> str | None:
"""Detect domain from PR's changed files for human-submitted PRs.
Humans may not follow agent branch naming. Fall back to inspecting
file paths. (Ganymede nit)
"""
# We'd need to fetch files from the API — do it lazily on first eval
# For now, return None. Domain gets set during evaluation.
return None
async def _post_ack_comment(pr_number: int):
"""Post acknowledgment comment on human-submitted PR. (Rhea)
@ -185,9 +132,9 @@ async def _post_ack_comment(pr_number: int):
"(priority: high). Expected review time: ~5 minutes.\n\n"
"_This is an automated message from the Teleo pipeline._"
)
await _forgejo_api(
await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
repo_path(f"issues/{pr_number}/comments"),
{"body": body},
)
@ -296,9 +243,9 @@ async def _rebase_and_push(branch: str) -> tuple[bool, str]:
async def _merge_pr(pr_number: int) -> tuple[bool, str]:
"""Merge PR via Forgejo API. Preserves PR metadata and reviewer attribution."""
result = await _forgejo_api(
result = await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}/merge",
repo_path(f"pulls/{pr_number}/merge"),
{"Do": "merge", "merge_message_field": ""},
)
if result is None:
@ -312,9 +259,9 @@ async def _delete_remote_branch(branch: str):
If DELETE fails, log and move on — stale branch is cosmetic,
stale merge is operational.
"""
result = await _forgejo_api(
result = await forgejo_api(
"DELETE",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/branches/{branch}",
repo_path(f"branches/{branch}"),
)
if result is None:
logger.warning("Failed to delete remote branch %s — cosmetic, continuing", branch)

View file

@ -16,30 +16,14 @@ from difflib import SequenceMatcher
from pathlib import Path
from . import config, db
from .domains import VALID_DOMAINS
from .forgejo import api as forgejo_api
from .forgejo import get_pr_diff, repo_path
logger = logging.getLogger("pipeline.validate")
# ─── Constants ──────────────────────────────────────────────────────────────
VALID_DOMAINS = frozenset(
{
"internet-finance",
"entertainment",
"health",
"ai-alignment",
"space-development",
"grand-strategy",
"mechanisms",
"living-capital",
"living-agents",
"teleohumanity",
"critical-systems",
"collective-intelligence",
"teleological-economics",
"cultural-dynamics",
}
)
VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"})
VALID_TYPES = frozenset({"claim", "framework"})
REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created")
@ -356,61 +340,11 @@ def extract_claim_files_from_diff(diff: str) -> dict[str, str]:
return files
# ─── Forgejo API (using merge module's helper) ─────────────────────────────
async def _forgejo_api(method: str, path: str, body: dict = None):
"""Call Forgejo API. Reuses merge module pattern."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1{path}"
token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=headers, json=body, timeout=aiohttp.ClientTimeout(total=30)
) as resp:
if resp.status >= 400:
text = await resp.text()
logger.error("Forgejo API %s %s%d: %s", method, path, resp.status, text[:200])
return None
if resp.status == 204:
return {}
return await resp.json()
except Exception as e:
logger.error("Forgejo API error: %s %s%s", method, path, e)
return None
async def _get_pr_diff(pr_number: int) -> str:
"""Fetch PR diff via Forgejo API."""
import aiohttp
url = f"{config.FORGEJO_URL}/api/v1/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}.diff"
token = config.FORGEJO_TOKEN_FILE.read_text().strip() if config.FORGEJO_TOKEN_FILE.exists() else ""
headers = {"Authorization": f"token {token}", "Accept": "text/plain"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status >= 400:
return ""
diff = await resp.text()
if len(diff) > 2_000_000:
return "" # Too large
return diff
except Exception as e:
logger.error("Failed to fetch diff for PR #%d: %s", pr_number, e)
return ""
async def _get_pr_head_sha(pr_number: int) -> str:
"""Get HEAD SHA of PR's branch."""
pr_info = await _forgejo_api(
pr_info = await forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/pulls/{pr_number}",
repo_path(f"pulls/{pr_number}"),
)
if pr_info:
return pr_info.get("head", {}).get("sha", "")
@ -424,9 +358,9 @@ async def _has_tier0_comment(pr_number: int, head_sha: str) -> bool:
# Paginate comments (Ganymede standing rule)
page = 1
while True:
comments = await _forgejo_api(
comments = await forgejo_api(
"GET",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments?limit=50&page={page}",
repo_path(f"issues/{pr_number}/comments?limit=50&page={page}"),
)
if not comments:
break
@ -469,9 +403,9 @@ async def _post_validation_comment(pr_number: int, results: list[dict], head_sha
lines.append(f"\n*tier0-gate v2 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
await _forgejo_api(
await forgejo_api(
"POST",
f"/repos/{config.FORGEJO_OWNER}/{config.FORGEJO_REPO}/issues/{pr_number}/comments",
repo_path(f"issues/{pr_number}/comments"),
{"body": "\n".join(lines)},
)
@ -509,7 +443,7 @@ async def validate_pr(conn, pr_number: int) -> dict:
return {"pr": pr_number, "skipped": True, "reason": "already_validated"}
# Fetch diff
diff = await _get_pr_diff(pr_number)
diff = await get_pr_diff(pr_number)
if not diff:
logger.debug("PR #%d: empty or oversized diff", pr_number)
return {"pr": pr_number, "skipped": True, "reason": "no_diff"}

View file

@ -18,8 +18,9 @@ sys.path.insert(0, str(Path(__file__).parent))
from lib import config, db
from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle, kill_active_subprocesses
from lib.evaluate import evaluate_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
from lib.validate import validate_cycle