teleo-infrastructure/lib/agent_routing.py
2026-05-29 14:00:13 +02:00

287 lines
7.9 KiB
Python

"""Phase 1b Hermes agent routing.
Routes knowledge-base PRs to the agent identity that owns the changed domain.
This module is deliberately pure: no network, database, LLM, or filesystem IO.
"""
from __future__ import annotations
import re
from dataclasses import asdict, dataclass
# Canonical agent ordering. An agent's position in this tuple is also its
# rank, which the router uses to break ties between equal scores.
AGENT_ORDER: tuple[str, ...] = (
    "Leo",
    "Theseus",
    "Rio",
    "Vida",
    "Clay",
    "Astra",
)
_AGENT_RANK = {name: position for position, name in enumerate(AGENT_ORDER)}
# Knowledge-base domain directory name -> owning agent.
# Keys are compared against the lowercased path segment / branch domain.
DOMAIN_AGENT_MAP: dict[str, str] = {
    # Leo
    "grand-strategy": "Leo",
    "strategy": "Leo",
    "teleohumanity": "Leo",
    "collective-intelligence": "Leo",
    # Theseus
    "ai-alignment": "Theseus",
    "ai-systems": "Theseus",
    "living-agents": "Theseus",
    "critical-systems": "Theseus",
    # Rio
    "internet-finance": "Rio",
    "mechanisms": "Rio",
    "living-capital": "Rio",
    "teleological-economics": "Rio",
    # Vida
    "health": "Vida",
    # Clay
    "entertainment": "Clay",
    "cultural-dynamics": "Clay",
    # Astra
    "space-development": "Astra",
    "space": "Astra",
    "robotics": "Astra",
    "energy": "Astra",
    "manufacturing": "Astra",
    "advanced-manufacturing": "Astra",
}
# Lowercased branch prefix (the text before the first "/") -> the domain that
# a branch with that prefix is attributed to.
_AGENT_PRIMARY_DOMAIN: dict[str, str] = {
    "leo": "grand-strategy",
    "theseus": "ai-systems",
    "rio": "internet-finance",
    "vida": "health",
    "clay": "entertainment",
    "astra": "space-development",
}

# For "ingestion/..." branches: substring looked for in the rest of the
# branch name -> domain. Entries are tried in insertion order.
_INGESTION_SOURCE_DOMAIN: dict[str, str] = {
    "futardio": "internet-finance",
    "metadao": "internet-finance",
    "x402": "internet-finance",
}
# Captures the segment right after a top-level knowledge-base directory,
# e.g. "domains/health/claim.md" -> "health". A trailing "/" is required, so
# a file sitting directly inside the top-level directory does not match.
_DOMAIN_PATH_RE = re.compile(
    r"^(?:domains|entities|core|foundations)/([^/]+)/"
)
# Captures the agent directory name, e.g. "agents/rio/notes.md" -> "rio".
_AGENT_PATH_RE = re.compile(
    r"^agents/([^/]+)/"
)
# Agent -> lowercase keyword phrases searched for in PR text.
# Tuple order matters: only the first few hits per agent are scored, and hits
# are reported in this order.
_KEYWORDS: dict[str, tuple[str, ...]] = {
    "Leo": (
        "grand strategy",
        "collective ai",
        "collective ais",
        "collective goals",
        "goal of the collective",
        "self-understanding",
        "self understanding",
        "teleohumanity",
        "meta-governance",
    ),
    "Theseus": (
        "ai alignment",
        "ai systems",
        "ai safety",
        "agent alignment",
        "prompt injection",
        "model behavior",
        "llm",
        "hermes runtime",
    ),
    "Rio": (
        "internet finance",
        "x402",
        "wallet",
        "payment",
        "payments",
        "onchain",
        "defi",
        "futarchy",
        "metadao",
        "prediction market",
        "decision market",
        "stablecoin",
    ),
    "Vida": (
        "health",
        "medicine",
        "clinical",
        "patient",
        "doctor",
        "disease",
        "longevity",
        "biotech",
        "glp-1",
    ),
    "Clay": (
        "entertainment",
        "game",
        "games",
        "media",
        "story",
        "film",
        "music",
        "culture",
    ),
    "Astra": (
        "space",
        "robotics",
        "robot",
        "energy",
        "manufacturing",
        "advanced manufacturing",
        "hardware",
        "satellite",
        "rocket",
        "nuclear",
    ),
}
@dataclass(frozen=True)
class RouteEvidence:
    """A single scored signal recorded while routing a PR (immutable)."""

    # Name of the agent the signal is credited to.
    agent: str
    # Category label of the signal (a short string such as "path" or "keyword").
    signal: str
    # Points this signal contributed to the agent's score.
    weight: int
    # The raw text that produced the signal (a path, branch, keyword or note).
    value: str
@dataclass(frozen=True)
class AgentRoute:
    """Immutable result of routing one PR to its reviewer agent(s).

    Attributes are documented inline. ``scores`` is a plain ``dict``, so the
    instance is frozen against attribute rebinding but is not hashable.
    """

    # Highest-ranked agent for the PR.
    primary_agent: str
    # Agents whose review is required, best first.
    required_agents: tuple[str, ...]
    # Short label describing how the route was decided.
    route_kind: str
    # Per-agent score totals.
    scores: dict[str, int]
    # Every signal that contributed to the scores, in the order recorded.
    evidence: tuple[RouteEvidence, ...]
    # True when no signal was found and a default route was used.
    fallback: bool = False
    # Domains that contributed a signal.
    touched_domains: tuple[str, ...] = ()

    def to_audit_dict(self) -> dict:
        """Return a detached, plain-container snapshot of the route.

        Tuples become lists and each evidence item becomes a dict. ``scores``
        is copied as well, so mutating the returned payload can never alter
        this (frozen) route.

        Returns:
            A dict with the keys ``primary_agent``, ``required_agents``,
            ``route_kind``, ``scores``, ``evidence``, ``fallback`` and
            ``touched_domains``.
        """
        return {
            "primary_agent": self.primary_agent,
            "required_agents": list(self.required_agents),
            "route_kind": self.route_kind,
            # Copy: the original handed out the live internal dict.
            "scores": dict(self.scores),
            "evidence": [asdict(item) for item in self.evidence],
            "fallback": self.fallback,
            "touched_domains": list(self.touched_domains),
        }
# Header line git emits once per changed file: "diff --git a/<old> b/<new>".
_DIFF_HEADER_PREFIX = "diff --git a/"
_DIFF_HEADER_RE = re.compile(r"diff --git a/(.*?) b/(.*)$")


def _new_path_from_header(line: str) -> str | None:
    """Return the post-change path named by one ``diff --git a/...`` line.

    ``line`` must start with ``"diff --git a/"``. Returns ``None`` when no
    `` b/`` separator is present.
    """
    rest = line[len(_DIFF_HEADER_PREFIX):]  # "<old> b/<new>"
    # Without a rename both sides are the same path P, so rest is exactly
    # P + " b/" + P. Checking that split first keeps paths that themselves
    # contain " b/" intact; a first-match split would cut them short.
    half, remainder = divmod(len(rest) - 3, 2)
    if half >= 0 and remainder == 0:
        if rest[half:half + 3] == " b/" and rest[:half] == rest[half + 3:]:
            return rest[:half]
    # Rename/copy (sides differ): split at the first " b/".
    match = _DIFF_HEADER_RE.match(line)
    return match.group(2) if match else None


def _changed_paths(diff: str) -> tuple[str, ...]:
    """Extract the post-change path of every file in a unified git diff.

    Only ``diff --git a/<old> b/<new>`` header lines are consulted; the
    ``<new>`` side is reported, in order of appearance.

    Args:
        diff: Text of a git diff. May be empty.

    Returns:
        Tuple of paths, one per recognised header line.

    NOTE(review): headers whose paths git wraps in double quotes
    (``diff --git "a/..." "b/..."``) are not recognised and are skipped,
    as before.
    """
    paths: list[str] = []
    for line in diff.splitlines():
        if not line.startswith(_DIFF_HEADER_PREFIX):
            continue
        path = _new_path_from_header(line)
        if path is not None:
            paths.append(path)
    return tuple(paths)
def _add_score(
    scores: dict[str, int],
    evidence: list[RouteEvidence],
    agent: str,
    signal: str,
    weight: int,
    value: str,
) -> None:
    """Credit ``weight`` points to ``agent`` and log why.

    Both ``scores`` and ``evidence`` are updated in place. An ``agent`` that
    has no entry in ``scores`` is ignored entirely: nothing is added and no
    evidence is recorded.
    """
    if agent in scores:
        scores[agent] = scores[agent] + weight
        record = RouteEvidence(
            agent=agent,
            signal=signal,
            weight=weight,
            value=value,
        )
        evidence.append(record)
def _domain_for_branch(branch: str) -> str | None:
    """Infer a domain from a branch name of the form ``<prefix>/<rest>``.

    The prefix is matched case-insensitively:

    * an agent prefix resolves to that agent's primary domain;
    * ``ingestion`` resolves to the domain of the first known ingestion
      source whose key occurs anywhere in ``<rest>``.

    Returns ``None`` for a branch without a ``/`` or with no recognised
    prefix or source.
    """
    head, slash, tail = branch.partition("/")
    prefix = head.lower() if slash else ""

    if prefix in _AGENT_PRIMARY_DOMAIN:
        return _AGENT_PRIMARY_DOMAIN[prefix]
    if prefix != "ingestion":
        return None

    remainder = tail.lower()
    return next(
        (
            domain
            for marker, domain in _INGESTION_SOURCE_DOMAIN.items()
            if marker in remainder
        ),
        None,
    )
def _keyword_hits(agent: str, text: str) -> list[str]:
    """List the keywords of ``agent`` that occur in ``text``, in table order.

    A keyword only counts when it is not directly preceded or followed by a
    lowercase ASCII letter or a digit, so ``"robot"`` does not match inside
    ``"robots"``. Matching is case-sensitive and the keyword table is
    lowercase, so ``text`` should already be lowercased.

    Raises:
        KeyError: if ``agent`` has no keyword entry.
    """

    def _appears(phrase: str) -> bool:
        bounded = rf"(?<![a-z0-9]){re.escape(phrase)}(?![a-z0-9])"
        return re.search(bounded, text) is not None

    return [phrase for phrase in _KEYWORDS[agent] if _appears(phrase)]
def classify_pr_route(
    diff: str,
    *,
    branch: str | None = None,
    title: str | None = None,
    body: str | None = None,
    max_required_agents: int = 2,
) -> AgentRoute:
    """Choose the Hermes reviewer agent(s) for a PR.

    Signals and their weights:

    * ``path`` (+8): a changed file under a mapped domain directory.
    * ``agent_path`` (+8): a changed file under ``agents/<agent>/``.
    * ``branch`` (+4): the branch prefix, consulted only when no changed
      path produced a signal.
    * ``keyword`` (+2 each, at most four per agent): keyword found in the
      lowercased title, body, branch and diff.

    Agents with a positive score are ranked by score, ties broken by
    ``AGENT_ORDER``. If nobody scores, the route falls back to Leo.

    Args:
        diff: Git diff text of the PR.
        branch: Source branch name, if known.
        title: PR title, if known.
        body: PR description, if known.
        max_required_agents: Upper bound on required reviewers; clamped to
            the range 1..2.

    Returns:
        An ``AgentRoute`` whose ``route_kind`` is ``"fallback"``,
        ``"single"``, ``"multi"``, or ``"escalated"`` (more agents scored
        than may be required).
    """
    limit = max(1, min(max_required_agents, 2))
    totals = dict.fromkeys(AGENT_ORDER, 0)
    trail: list[RouteEvidence] = []
    domains_seen: list[str] = []
    saw_path_signal = False

    # 1) Path signals.
    for changed in _changed_paths(diff):
        by_domain = _DOMAIN_PATH_RE.match(changed)
        if by_domain is not None:
            domain = by_domain.group(1).lower()
            if domain in DOMAIN_AGENT_MAP:
                _add_score(totals, trail, DOMAIN_AGENT_MAP[domain], "path", 8, changed)
                domains_seen.append(domain)
                saw_path_signal = True
            continue

        by_agent = _AGENT_PATH_RE.match(changed)
        if by_agent is None:
            continue
        wanted = by_agent.group(1).lower()
        owner = next((name for name in AGENT_ORDER if name.lower() == wanted), None)
        if owner is not None:
            _add_score(totals, trail, owner, "agent_path", 8, changed)
            saw_path_signal = True

    # 2) Branch signal, only as a substitute for missing path signals.
    if branch and not saw_path_signal:
        branch_domain = _domain_for_branch(branch)
        if branch_domain:
            _add_score(
                totals, trail, DOMAIN_AGENT_MAP[branch_domain], "branch", 4, branch
            )
            domains_seen.append(branch_domain)

    # 3) Keyword signals over all available text.
    fragments = (title or "", body or "", branch or "", diff)
    haystack = "\n".join(piece for piece in fragments if piece).lower()
    for name in AGENT_ORDER:
        for phrase in _keyword_hits(name, haystack)[:4]:
            _add_score(totals, trail, name, "keyword", 2, phrase)

    # 4) Rank: highest score first, AGENT_ORDER position breaks ties.
    ranked = [name for name, total in totals.items() if total > 0]
    ranked.sort(key=lambda name: (-totals[name], _AGENT_RANK[name]))

    if not ranked:
        fallback_note = RouteEvidence(
            agent="Leo", signal="fallback", weight=0, value="no route signal"
        )
        return AgentRoute(
            primary_agent="Leo",
            required_agents=("Leo",),
            route_kind="fallback",
            scores=totals,
            evidence=(*trail, fallback_note),
            fallback=True,
            touched_domains=(),
        )

    required = tuple(ranked[:limit])
    if len(ranked) > limit:
        kind = "escalated"
    else:
        kind = "multi" if len(required) > 1 else "single"

    return AgentRoute(
        primary_agent=ranked[0],
        required_agents=required,
        route_kind=kind,
        scores=totals,
        evidence=tuple(trail),
        fallback=False,
        # dict.fromkeys de-duplicates while keeping first-seen order.
        touched_domains=tuple(dict.fromkeys(domains_seen)),
    )