Some checks are pending
CI / lint-and-test (push) Waiting to run
rejection_reason was always NULL in review_records — now populated with comma-joined issue tags (near_duplicate, frontmatter_schema, etc.) at both rejection call sites. Also fixes stale reviewer_model="gpt-4o" hardcoding to use config.EVAL_DOMAIN_MODEL (currently Gemini Flash). Ingestion branches (ingestion/futardio-*, ingestion/metadao-*) now resolve to internet-finance domain instead of falling through to "general". Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
100 lines
3.5 KiB
Python
100 lines
3.5 KiB
Python
"""Domain→agent mapping and domain detection — single source of truth.

Extracted from evaluate.py and merge.py (Phase 3 refactor).
All domain classification logic goes through this module.
"""
|
|
|
|
import re
|
|
|
|
# Canonical domain→agent mapping. Every domain must have exactly one primary agent.
# An agent may own several domains; a domain never maps to more than one agent.
DOMAIN_AGENT_MAP: dict[str, str] = {
    "internet-finance": "Rio",
    "entertainment": "Clay",
    "health": "Vida",
    "ai-alignment": "Theseus",
    "space-development": "Astra",
    "mechanisms": "Rio",
    "living-capital": "Rio",
    "living-agents": "Theseus",
    "teleohumanity": "Leo",
    "grand-strategy": "Leo",
    "critical-systems": "Theseus",
    "collective-intelligence": "Theseus",
    "teleological-economics": "Rio",
    "cultural-dynamics": "Clay",
}

# Valid domain names — derived from the map, not maintained separately.
VALID_DOMAINS: frozenset[str] = frozenset(DOMAIN_AGENT_MAP)

# Inverse mapping: agent name (lowercase) → primary domain (for branch detection).
# Not derivable from DOMAIN_AGENT_MAP because several agents own more than one
# domain; this table records which of them is the primary one.
_AGENT_PRIMARY_DOMAIN: dict[str, str] = {
    "rio": "internet-finance",
    "clay": "entertainment",
    "theseus": "ai-alignment",
    "vida": "health",
    "astra": "space-development",
    "leo": "grand-strategy",
}

# Ingestion source keyword (lowercase) → domain. detect_domain_from_branch
# looks for these keywords as substrings of the branch name that follows the
# "ingestion/" prefix.
_INGESTION_SOURCE_DOMAIN: dict[str, str] = {
    "futardio": "internet-finance",
    "metadao": "internet-finance",
}
|
|
|
|
|
|
def agent_for_domain(domain: str | None) -> str:
|
|
"""Get the reviewing agent for a domain. Falls back to Leo."""
|
|
if domain is None:
|
|
return "Leo"
|
|
return DOMAIN_AGENT_MAP.get(domain, "Leo")
|
|
|
|
|
|
def detect_domain_from_diff(diff: str) -> str | None:
|
|
"""Detect primary domain from changed file paths in a unified diff.
|
|
|
|
Checks domains/, entities/, core/, foundations/ for domain classification.
|
|
Returns the most-referenced domain, or None if no domain files found.
|
|
"""
|
|
domain_counts: dict[str, int] = {}
|
|
for line in diff.split("\n"):
|
|
if line.startswith("diff --git"):
|
|
# Check domains/ and entities/ (both carry domain info)
|
|
match = re.search(r"(?:domains|entities)/([^/]+)/", line)
|
|
if match:
|
|
d = match.group(1)
|
|
domain_counts[d] = domain_counts.get(d, 0) + 1
|
|
continue
|
|
# Check core/ subdirectories
|
|
match = re.search(r"core/([^/]+)/", line)
|
|
if match:
|
|
d = match.group(1)
|
|
if d in DOMAIN_AGENT_MAP:
|
|
domain_counts[d] = domain_counts.get(d, 0) + 1
|
|
continue
|
|
# Check foundations/ subdirectories
|
|
match = re.search(r"foundations/([^/]+)/", line)
|
|
if match:
|
|
d = match.group(1)
|
|
if d in DOMAIN_AGENT_MAP:
|
|
domain_counts[d] = domain_counts.get(d, 0) + 1
|
|
if domain_counts:
|
|
return max(domain_counts, key=domain_counts.get)
|
|
return None
|
|
|
|
|
|
def detect_domain_from_branch(branch: str) -> str | None:
|
|
"""Extract domain from branch name like 'rio/claims-futarchy' → 'internet-finance'.
|
|
|
|
Uses agent prefix → primary domain mapping for pipeline branches.
|
|
For ingestion branches, checks the rest of the name for source-type hints.
|
|
"""
|
|
prefix = branch.split("/")[0].lower() if "/" in branch else ""
|
|
if prefix in _AGENT_PRIMARY_DOMAIN:
|
|
return _AGENT_PRIMARY_DOMAIN[prefix]
|
|
if prefix == "ingestion":
|
|
rest = branch.split("/", 1)[1].lower() if "/" in branch else ""
|
|
for source_key, domain in _INGESTION_SOURCE_DOMAIN.items():
|
|
if source_key in rest:
|
|
return domain
|
|
return None
|