diff --git a/lib/domains.py b/lib/domains.py new file mode 100644 index 0000000..0db6f94 --- /dev/null +++ b/lib/domains.py @@ -0,0 +1,87 @@ +"""Domain→agent mapping and domain detection — single source of truth. + +Extracted from evaluate.py and merge.py (Phase 3 refactor). +All domain classification logic goes through this module. +""" + +import re + +# Canonical domain→agent mapping. Every domain must have exactly one primary agent. +DOMAIN_AGENT_MAP: dict[str, str] = { + "internet-finance": "Rio", + "entertainment": "Clay", + "health": "Vida", + "ai-alignment": "Theseus", + "space-development": "Astra", + "mechanisms": "Rio", + "living-capital": "Rio", + "living-agents": "Theseus", + "teleohumanity": "Leo", + "grand-strategy": "Leo", + "critical-systems": "Theseus", + "collective-intelligence": "Theseus", + "teleological-economics": "Rio", + "cultural-dynamics": "Clay", +} + +# Valid domain names — derived from the map, not maintained separately. +VALID_DOMAINS: frozenset[str] = frozenset(DOMAIN_AGENT_MAP.keys()) + +# Inverse mapping: agent name (lowercase) → primary domain (for branch detection). +_AGENT_PRIMARY_DOMAIN: dict[str, str] = { + "rio": "internet-finance", + "clay": "entertainment", + "theseus": "ai-alignment", + "vida": "health", + "astra": "space-development", + "leo": "grand-strategy", +} + + +def agent_for_domain(domain: str | None) -> str: + """Get the reviewing agent for a domain. Falls back to Leo.""" + if domain is None: + return "Leo" + return DOMAIN_AGENT_MAP.get(domain, "Leo") + + +def detect_domain_from_diff(diff: str) -> str | None: + """Detect primary domain from changed file paths in a unified diff. + + Checks domains/, entities/, core/, foundations/ for domain classification. + Returns the most-referenced domain, or None if no domain files found. + """ + domain_counts: dict[str, int] = {} + for line in diff.split("\n"): + if line.startswith("diff --git"): + # Check domains/ and entities/ (both carry domain info) + match = re.search(r"(?:domains|entities)/([^/]+)/", line) + if match: + d = match.group(1) + domain_counts[d] = domain_counts.get(d, 0) + 1 + continue + # Check core/ subdirectories + match = re.search(r"core/([^/]+)/", line) + if match: + d = match.group(1) + if d in DOMAIN_AGENT_MAP: + domain_counts[d] = domain_counts.get(d, 0) + 1 + continue + # Check foundations/ subdirectories + match = re.search(r"foundations/([^/]+)/", line) + if match: + d = match.group(1) + if d in DOMAIN_AGENT_MAP: + domain_counts[d] = domain_counts.get(d, 0) + 1 + if domain_counts: + return max(domain_counts, key=domain_counts.get) + return None + + +def detect_domain_from_branch(branch: str) -> str | None: + """Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'. + + Uses agent prefix → primary domain mapping for pipeline branches. + """ + prefix = branch.split("/")[0].lower() if "/" in branch else "" + return _AGENT_PRIMARY_DOMAIN.get(prefix) diff --git a/lib/evaluate.py b/lib/evaluate.py index 7420d72..b9c45e4 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -22,6 +22,7 @@ import re from datetime import datetime, timezone from . import config, db +from .domains import agent_for_domain, detect_domain_from_diff from .forgejo import api as forgejo_api from .forgejo import get_agent_token, get_pr_diff, repo_path @@ -30,25 +31,6 @@ logger = logging.getLogger("pipeline.evaluate") # Track active Claude CLI subprocesses for graceful shutdown (Ganymede #8) _active_subprocesses: set = set() -# ─── Constants ────────────────────────────────────────────────────────────── - -DOMAIN_AGENT_MAP = { - "internet-finance": "Rio", - "entertainment": "Clay", - "health": "Vida", - "ai-alignment": "Theseus", - "space-development": "Astra", - "mechanisms": "Rio", - "living-capital": "Rio", - "living-agents": "Theseus", - "teleohumanity": "Leo", - "grand-strategy": "Leo", - "critical-systems": "Theseus", - "collective-intelligence": "Theseus", - "teleological-economics": "Rio", - "cultural-dynamics": "Clay", -} - async def kill_active_subprocesses(): """Kill all tracked Claude CLI subprocesses. Called during graceful shutdown.""" @@ -303,38 +285,6 @@ def _extract_changed_files(diff: str) -> str: ) -def _detect_domain_from_diff(diff: str) -> str | None: - """Detect primary domain from changed file paths. - - Checks domains/, entities/, core/, foundations/ for domain classification. - """ - domain_counts: dict[str, int] = {} - for line in diff.split("\n"): - if line.startswith("diff --git"): - # Check domains/ and entities/ (both carry domain info) - match = re.search(r"(?:domains|entities)/([^/]+)/", line) - if match: - d = match.group(1) - domain_counts[d] = domain_counts.get(d, 0) + 1 - continue - # Check core/ subdirectories - match = re.search(r"core/([^/]+)/", line) - if match: - d = match.group(1) - if d in DOMAIN_AGENT_MAP: - domain_counts[d] = domain_counts.get(d, 0) + 1 - continue - # Check foundations/ subdirectories - match = re.search(r"foundations/([^/]+)/", line) - if match: - d = match.group(1) - if d in DOMAIN_AGENT_MAP: - domain_counts[d] = domain_counts.get(d, 0) + 1 - if domain_counts: - return max(domain_counts, key=domain_counts.get) - return None - - def _is_musings_only(diff: str) -> bool: """Check if PR only modifies musing files.""" has_musings = False @@ -496,8 +446,8 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: files = _extract_changed_files(diff) # Detect domain - domain = _detect_domain_from_diff(diff) - agent = DOMAIN_AGENT_MAP.get(domain, "Leo") if domain else "Leo" + domain = detect_domain_from_diff(diff) + agent = agent_for_domain(domain) # Default NULL domain to 'general' (archive-only PRs have no domain files) if domain is None: @@ -675,28 +625,36 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: """ global _rate_limit_backoff_until - # If we're in rate-limit backoff, skip this cycle entirely + # Check if we're in Opus rate-limit backoff + opus_backoff = False if _rate_limit_backoff_until is not None: now = datetime.now(timezone.utc) if now < _rate_limit_backoff_until: remaining = int((_rate_limit_backoff_until - now).total_seconds()) - logger.debug("Rate limit backoff: %d seconds remaining, skipping cycle", remaining) - return 0, 0 + logger.debug("Opus rate limit backoff: %d seconds remaining — triage + domain review continue", remaining) + opus_backoff = True else: - logger.info("Rate limit backoff expired, resuming eval cycles") + logger.info("Rate limit backoff expired, resuming full eval cycles") _rate_limit_backoff_until = None # Find PRs ready for evaluation: # - status = 'open' # - tier0_pass = 1 (passed validation) # - leo_verdict = 'pending' OR domain_verdict = 'pending' + # During Opus backoff: only fetch PRs needing triage or domain review + # (skip PRs already domain-reviewed that are waiting for Leo/Opus) # Skip PRs attempted within last 10 minutes (backoff during rate limits) + if opus_backoff: + verdict_filter = "AND p.domain_verdict = 'pending'" + else: + verdict_filter = "AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending')" + rows = conn.execute( - """SELECT p.number, p.tier FROM prs p + f"""SELECT p.number, p.tier FROM prs p LEFT JOIN sources s ON p.source_path = s.path WHERE p.status = 'open' AND p.tier0_pass = 1 - AND (p.leo_verdict = 'pending' OR p.domain_verdict = 'pending') + {verdict_filter} AND (p.last_attempt IS NULL OR p.last_attempt < datetime('now', '-10 minutes')) ORDER BY @@ -724,18 +682,30 @@ async def evaluate_cycle(conn, max_workers=None) -> tuple[int, int]: if result.get("skipped"): reason = result.get("reason", "") logger.debug("PR #%d skipped: %s", row["number"], reason) - # Any rate limit — stop the entire cycle. No point trying more PRs - # when the model is exhausted. The 10-minute backoff on last_attempt - # prevents re-processing the same PR; breaking here prevents - # cycling through OTHER PRs that will also hit the same limit. if "rate_limited" in reason: from datetime import timedelta - _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( - minutes=_RATE_LIMIT_BACKOFF_MINUTES - ) - logger.info("Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES) - break + if reason == "opus_rate_limited": + # Opus hit — set backoff but DON'T break. Other PRs + # may still need triage (Haiku) or domain review (Sonnet). + _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( + minutes=_RATE_LIMIT_BACKOFF_MINUTES + ) + logger.info( + "Opus rate limited — backing off Opus for %d min, continuing triage+domain", + _RATE_LIMIT_BACKOFF_MINUTES, + ) + continue + else: + # Non-Opus rate limit (Sonnet/Haiku) — break the cycle, + # nothing else can proceed either. + _rate_limit_backoff_until = datetime.now(timezone.utc) + timedelta( + minutes=_RATE_LIMIT_BACKOFF_MINUTES + ) + logger.info( + "Rate limited (%s) — backing off for %d minutes", reason, _RATE_LIMIT_BACKOFF_MINUTES + ) + break else: succeeded += 1 except Exception: diff --git a/lib/merge.py b/lib/merge.py index 49e6180..40f4f97 100644 --- a/lib/merge.py +++ b/lib/merge.py @@ -16,6 +16,7 @@ import logging from collections import defaultdict from . import config, db +from .domains import detect_domain_from_branch from .forgejo import api as forgejo_api from .forgejo import repo_path @@ -83,9 +84,7 @@ async def discover_external_prs(conn) -> int: is_pipeline = author.lower() in pipeline_users origin = "pipeline" if is_pipeline else "human" priority = "high" if origin == "human" else None - domain = ( - _detect_domain_from_files(pr) if not is_pipeline else _detect_domain_from_branch(pr["head"]["ref"]) - ) + domain = None if not is_pipeline else detect_domain_from_branch(pr["head"]["ref"]) conn.execute( """INSERT OR IGNORE INTO prs @@ -122,34 +121,6 @@ async def discover_external_prs(conn) -> int: return discovered -def _detect_domain_from_branch(branch: str) -> str | None: - """Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'. - - Agent-to-domain mapping for pipeline branches. - """ - agent_domain = { - "rio": "internet-finance", - "clay": "entertainment", - "theseus": "ai-alignment", - "vida": "health", - "astra": "space-development", - "leo": "grand-strategy", - } - prefix = branch.split("/")[0].lower() if "/" in branch else "" - return agent_domain.get(prefix) - - -def _detect_domain_from_files(pr: dict) -> str | None: - """Detect domain from PR's changed files for human-submitted PRs. - - Humans may not follow agent branch naming. Fall back to inspecting - file paths. (Ganymede nit) - """ - # We'd need to fetch files from the API — do it lazily on first eval - # For now, return None. Domain gets set during evaluation. - return None - - async def _post_ack_comment(pr_number: int): """Post acknowledgment comment on human-submitted PR. (Rhea) diff --git a/lib/validate.py b/lib/validate.py index 988a813..afccaaf 100644 --- a/lib/validate.py +++ b/lib/validate.py @@ -16,6 +16,7 @@ from difflib import SequenceMatcher from pathlib import Path from . import config, db +from .domains import VALID_DOMAINS from .forgejo import api as forgejo_api from .forgejo import get_pr_diff, repo_path @@ -23,25 +24,6 @@ logger = logging.getLogger("pipeline.validate") # ─── Constants ────────────────────────────────────────────────────────────── -VALID_DOMAINS = frozenset( - { - "internet-finance", - "entertainment", - "health", - "ai-alignment", - "space-development", - "grand-strategy", - "mechanisms", - "living-capital", - "living-agents", - "teleohumanity", - "critical-systems", - "collective-intelligence", - "teleological-economics", - "cultural-dynamics", - } -) - VALID_CONFIDENCE = frozenset({"proven", "likely", "experimental", "speculative"}) VALID_TYPES = frozenset({"claim", "framework"}) REQUIRED_FIELDS = ("type", "domain", "description", "confidence", "source", "created")