"""Phase 1b Hermes agent routing. Routes knowledge-base PRs to the agent identity that owns the changed domain. This module is deliberately pure: no network, database, LLM, or filesystem IO. """ from __future__ import annotations import re from dataclasses import asdict, dataclass AGENT_ORDER: tuple[str, ...] = ("Leo", "Theseus", "Rio", "Vida", "Clay", "Astra") _AGENT_RANK = {agent: idx for idx, agent in enumerate(AGENT_ORDER)} DOMAIN_AGENT_MAP: dict[str, str] = { "grand-strategy": "Leo", "strategy": "Leo", "teleohumanity": "Leo", "collective-intelligence": "Leo", "ai-alignment": "Theseus", "ai-systems": "Theseus", "living-agents": "Theseus", "critical-systems": "Theseus", "internet-finance": "Rio", "mechanisms": "Rio", "living-capital": "Rio", "teleological-economics": "Rio", "health": "Vida", "entertainment": "Clay", "cultural-dynamics": "Clay", "space-development": "Astra", "space": "Astra", "robotics": "Astra", "energy": "Astra", "manufacturing": "Astra", "advanced-manufacturing": "Astra", } _AGENT_PRIMARY_DOMAIN: dict[str, str] = { "leo": "grand-strategy", "theseus": "ai-systems", "rio": "internet-finance", "vida": "health", "clay": "entertainment", "astra": "space-development", } _INGESTION_SOURCE_DOMAIN: dict[str, str] = { "futardio": "internet-finance", "metadao": "internet-finance", "x402": "internet-finance", } _DOMAIN_PATH_RE = re.compile(r"^(?:domains|entities|core|foundations)/([^/]+)/") _AGENT_PATH_RE = re.compile(r"^agents/([^/]+)/") _KEYWORDS: dict[str, tuple[str, ...]] = { "Leo": ( "grand strategy", "collective ai", "collective ais", "collective goals", "goal of the collective", "self-understanding", "self understanding", "teleohumanity", "meta-governance", ), "Theseus": ( "ai alignment", "ai systems", "ai safety", "agent alignment", "prompt injection", "model behavior", "llm", "hermes runtime", ), "Rio": ( "internet finance", "x402", "wallet", "payment", "payments", "onchain", "defi", "futarchy", "metadao", "prediction market", "decision market", "stablecoin", ), "Vida": ( "health", "medicine", "clinical", "patient", "doctor", "disease", "longevity", "biotech", "glp-1", ), "Clay": ( "entertainment", "game", "games", "media", "story", "film", "music", "culture", ), "Astra": ( "space", "robotics", "robot", "energy", "manufacturing", "advanced manufacturing", "hardware", "satellite", "rocket", "nuclear", ), } @dataclass(frozen=True) class RouteEvidence: agent: str signal: str weight: int value: str @dataclass(frozen=True) class AgentRoute: primary_agent: str required_agents: tuple[str, ...] route_kind: str scores: dict[str, int] evidence: tuple[RouteEvidence, ...] fallback: bool = False touched_domains: tuple[str, ...] = () def to_audit_dict(self) -> dict: return { "primary_agent": self.primary_agent, "required_agents": list(self.required_agents), "route_kind": self.route_kind, "scores": self.scores, "evidence": [asdict(item) for item in self.evidence], "fallback": self.fallback, "touched_domains": list(self.touched_domains), } def _changed_paths(diff: str) -> tuple[str, ...]: paths: list[str] = [] for line in diff.splitlines(): if not line.startswith("diff --git "): continue match = re.match(r"diff --git a/(.*?) b/(.*)$", line) if match: paths.append(match.group(2)) return tuple(paths) def _add_score( scores: dict[str, int], evidence: list[RouteEvidence], agent: str, signal: str, weight: int, value: str, ) -> None: if agent not in scores: return scores[agent] += weight evidence.append(RouteEvidence(agent=agent, signal=signal, weight=weight, value=value)) def _domain_for_branch(branch: str) -> str | None: prefix = branch.split("/")[0].lower() if "/" in branch else "" if prefix in _AGENT_PRIMARY_DOMAIN: return _AGENT_PRIMARY_DOMAIN[prefix] if prefix == "ingestion": rest = branch.split("/", 1)[1].lower() if "/" in branch else "" for source_key, domain in _INGESTION_SOURCE_DOMAIN.items(): if source_key in rest: return domain return None def _keyword_hits(agent: str, text: str) -> list[str]: hits = [] for keyword in _KEYWORDS[agent]: pattern = rf"(? AgentRoute: """Classify a PR into one or two required Hermes reviewer agents.""" max_required_agents = max(1, min(max_required_agents, 2)) scores = {agent: 0 for agent in AGENT_ORDER} evidence: list[RouteEvidence] = [] touched_domains: list[str] = [] path_signal_found = False for path in _changed_paths(diff): domain_match = _DOMAIN_PATH_RE.match(path) if domain_match: domain = domain_match.group(1).lower() if domain in DOMAIN_AGENT_MAP: agent = DOMAIN_AGENT_MAP[domain] _add_score(scores, evidence, agent, "path", 8, path) touched_domains.append(domain) path_signal_found = True continue agent_match = _AGENT_PATH_RE.match(path) if agent_match: agent_key = agent_match.group(1).lower() for agent in AGENT_ORDER: if agent.lower() == agent_key: _add_score(scores, evidence, agent, "agent_path", 8, path) path_signal_found = True break if branch and not path_signal_found: branch_domain = _domain_for_branch(branch) if branch_domain: agent = DOMAIN_AGENT_MAP[branch_domain] _add_score(scores, evidence, agent, "branch", 4, branch) touched_domains.append(branch_domain) keyword_text = "\n".join(part for part in (title or "", body or "", branch or "", diff) if part).lower() for agent in AGENT_ORDER: hits = _keyword_hits(agent, keyword_text) for keyword in hits[:4]: _add_score(scores, evidence, agent, "keyword", 2, keyword) ranked = sorted( (agent for agent, score in scores.items() if score > 0), key=lambda agent: (-scores[agent], _AGENT_RANK[agent]), ) if not ranked: evidence.append(RouteEvidence(agent="Leo", signal="fallback", weight=0, value="no route signal")) return AgentRoute( primary_agent="Leo", required_agents=("Leo",), route_kind="fallback", scores=scores, evidence=tuple(evidence), fallback=True, touched_domains=(), ) primary = ranked[0] required = tuple(ranked[:max_required_agents]) if len(ranked) > max_required_agents: route_kind = "escalated" elif len(required) > 1: route_kind = "multi" else: route_kind = "single" return AgentRoute( primary_agent=primary, required_agents=required, route_kind=route_kind, scores=scores, evidence=tuple(evidence), fallback=False, touched_domains=tuple(dict.fromkeys(touched_domains)), )