teleo-infrastructure/scripts/check_crabbox_ci_contract.py
2026-06-01 15:36:03 +02:00

251 lines
8.1 KiB
Python
Executable file

#!/usr/bin/env python3
"""Validate the repo-owned Crabbox and Leo CI contract.
This is intentionally no-network and dependency-free. It checks the local
Crabbox config for bounded jobs/secret hygiene and exercises a small Leo route
contract through the real Phase 1b router.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
# Root of the repository: parents[0] of the resolved script path is the
# directory holding this file, parents[1] is that directory's parent.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Make the repository root importable so the repo-local import below resolves
# when this file is executed directly (presumably `lib/` sits at the root --
# confirm against the repo layout). The import must follow the path tweak,
# hence the E402 suppression.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
from lib.agent_routing import classify_pr_route  # noqa: E402
# Files inspected by _validate_crabbox_contract(); all are resolved relative to
# REPO_ROOT, and each one must exist for the contract check to pass.
CRABBOX_CONFIG = REPO_ROOT / ".crabbox.yaml"
CRABBOX_DOC = REPO_ROOT / "docs" / "crabbox.md"
CRABBOX_SKILL = REPO_ROOT / ".agents" / "skills" / "crabbox" / "SKILL.md"
CRABBOX_WORKFLOW = REPO_ROOT / ".github" / "workflows" / "crabbox.yml"
CI_WORKFLOW = REPO_ROOT / ".github" / "workflows" / "ci.yml"
# Job names that must be declared under `jobs:` in .crabbox.yaml and must also
# be named in the Crabbox skill document.
REQUIRED_JOBS = {
    "unit",
    "lint-phase1b",
    "phase1b-local-proof",
    "sync-smoke",
    "ci-contract",
}
# Entries that must all be present in the `sync` -> `exclude` list of
# .crabbox.yaml (extra entries in the config are allowed).
REQUIRED_SYNC_EXCLUDES = {
    ".cache",
    ".venv",
    ".pytest_cache",
    ".ruff_cache",
    "__pycache__",
    "*.db",
    "*.db-wal",
    "*.db-shm",
    "*.log",
    "logs",
    "secrets",
    ".env",
    "node_modules",
}
# The `env` -> `allow` list of .crabbox.yaml must equal this set exactly.
ALLOWED_ENV = {"CI", "PYTHONWARNINGS", "PHASE1B_AGENT_ROUTING_ENABLED"}
# Substrings that must not occur anywhere in .crabbox.yaml. The config text is
# upper-cased before the comparison, so matching is case-insensitive and also
# triggers on longer names that merely contain one of these tokens.
FORBIDDEN_CONFIG_TOKENS = {
    "HCLOUD_TOKEN",
    "HETZNER_TOKEN",
    "CRABBOX_COORDINATOR_TOKEN",
    "GITHUB_TOKEN",
    "GH_TOKEN",
    "OPENROUTER",
    "FORGEJO",
    "BITWARDEN",
    "BW_SESSION",
    "SSH_PRIVATE",
}
def _read(path: Path) -> str:
if not path.exists():
raise AssertionError(f"missing required file: {path.relative_to(REPO_ROOT)}")
return path.read_text()
def _list_values_under(text: str, parent: str, child: str) -> list[str]:
lines = text.splitlines()
in_parent = False
in_child = False
values: list[str] = []
for line in lines:
if not in_parent:
if line == f"{parent}:":
in_parent = True
continue
if line and not line.startswith(" "):
break
if not in_child:
if line == f" {child}:":
in_child = True
continue
if line.startswith(" - "):
values.append(line.removeprefix(" - ").strip().strip('"'))
continue
break
return values
def _top_level_job_names(text: str) -> set[str]:
jobs_match = re.search(r"(?ms)^jobs:\n(?P<body>.*?)(?:\n\S|\Z)", text)
if not jobs_match:
return set()
return set(re.findall(r"^ ([A-Za-z0-9_-]+):\s*$", jobs_match.group("body"), flags=re.MULTILINE))
def _diff_for(*paths: str, line: str = "+type: claim") -> str:
return "\n".join(f"diff --git a/{path} b/{path}\n{line}" for path in paths)
def _assert_equal(name: str, actual: Any, expected: Any) -> None:
if actual != expected:
raise AssertionError(f"{name}: expected {expected!r}, got {actual!r}")
def _validate_leo_route_contract() -> dict[str, Any]:
    """Route fixed synthetic diffs and check the audit fields of each result.

    Every scenario is passed through ``classify_pr_route``; the resulting
    route's ``to_audit_dict()`` output must match the expected
    ``required_agents``, ``route_kind`` and ``fallback`` values.

    Returns:
        A proof dict holding ``ok``, the audit dict of every scenario under
        ``cases``, and a static description of the contract under
        ``contract``.

    Raises:
        AssertionError: If any audit field differs from its expected value.
    """
    # Each scenario: (case name, computed route, expected audit fields).
    # The expected fields are checked in the order they are written here.
    scenarios = [
        (
            "leo_owned_domain",
            classify_pr_route(_diff_for("domains/grand-strategy/strategy.md")),
            {"required_agents": ["Leo"], "route_kind": "single", "fallback": False},
        ),
        (
            "leo_fallback",
            classify_pr_route(_diff_for("docs/readme.md"), branch="misc/update"),
            {"required_agents": ["Leo"], "route_kind": "fallback", "fallback": True},
        ),
        (
            "leo_cross_domain",
            classify_pr_route(
                _diff_for(
                    "foundations/collective-intelligence/collective-ai-goals.md",
                    line="+Collective AI goals and AI systems self-understanding need review.",
                )
            ),
            {"required_agents": ["Leo", "Theseus"], "route_kind": "multi", "fallback": False},
        ),
        (
            "non_leo_single_domain",
            classify_pr_route(_diff_for("domains/internet-finance/x402.md")),
            {"required_agents": ["Rio"], "route_kind": "single", "fallback": False},
        ),
    ]

    audited_cases = []
    for case_name, route, expected_fields in scenarios:
        audit = route.to_audit_dict()
        for field, wanted in expected_fields.items():
            _assert_equal(f"{case_name} {field}", audit[field], wanted)
        audited_cases.append({"name": case_name, "route": audit})

    contract = {
        "leo_required_when": [
            "grand-strategy or Leo-owned domain route",
            "no confident route fallback",
            "top-2 cross-domain route where Leo is one of the top owners",
        ],
        "leo_not_universal_second_review": True,
    }
    return {"ok": True, "cases": audited_cases, "contract": contract}
def _validate_crabbox_contract() -> dict[str, Any]:
    """Check the Crabbox config, docs, skill file and workflows against the contract.

    All five required files are read first; the checks then run in a fixed
    order and the first violation aborts the run.

    Returns:
        A proof dict with ``ok``, the job names found in the config, and the
        sorted required jobs, sync excludes, env allowlist and forbidden
        token names that were checked.

    Raises:
        AssertionError: If a required file is missing or any contract rule is
            violated; the message names the rule.
    """
    config = _read(CRABBOX_CONFIG)
    doc = _read(CRABBOX_DOC)
    skill = _read(CRABBOX_SKILL)
    crabbox_workflow = _read(CRABBOX_WORKFLOW)
    ci_workflow = _read(CI_WORKFLOW)

    jobs = _top_level_job_names(config)
    missing_jobs = sorted(REQUIRED_JOBS - jobs)
    if missing_jobs:
        raise AssertionError(f"missing Crabbox jobs: {missing_jobs}")

    sync_excludes = set(_list_values_under(config, "sync", "exclude"))
    missing_excludes = sorted(REQUIRED_SYNC_EXCLUDES - sync_excludes)
    if missing_excludes:
        raise AssertionError(f"missing sync excludes: {missing_excludes}")

    # The allowlist must match exactly: extra variables are as much a
    # violation as missing ones.
    allowed_env = set(_list_values_under(config, "env", "allow"))
    if allowed_env != ALLOWED_ENV:
        raise AssertionError(f"env allowlist must be {sorted(ALLOWED_ENV)}, got {sorted(allowed_env)}")

    # Upper-casing makes the substring search case-insensitive.
    upper_config = config.upper()
    leaked_tokens = sorted(token for token in FORBIDDEN_CONFIG_TOKENS if token in upper_config)
    if leaked_tokens:
        raise AssertionError(f"secret-like token names must not appear in .crabbox.yaml: {leaked_tokens}")

    if "scripts/check_crabbox_ci_contract.py" not in ci_workflow:
        raise AssertionError("ci.yml must run scripts/check_crabbox_ci_contract.py")
    if "scripts/crabbox_phase1b_proof.sh" not in ci_workflow:
        raise AssertionError("ci.yml must run scripts/crabbox_phase1b_proof.sh")
    if "crabbox_phase1b_proof.sh" not in config:
        raise AssertionError(".crabbox.yaml must run the Phase 1B proof wrapper")
    if "crabbox-ci-contract.json" not in config:
        raise AssertionError(".crabbox.yaml must download the CI contract proof")
    if "runs-on: [self-hosted" not in crabbox_workflow:
        raise AssertionError("crabbox hydration workflow must target the dynamic self-hosted runner label")

    # Iterate in sorted order so the job named in the error is the same on
    # every run; plain set iteration order varies with string hash
    # randomization.
    for job in sorted(REQUIRED_JOBS):
        if f"crabbox job run {job}" not in skill and f"`{job}`" not in skill:
            raise AssertionError(f"Crabbox skill must name allowed job {job}")

    # "production deploy" is a substring of the phrase below, so this single
    # test covers both of the original conditions.
    if "not the production deploy system" not in doc.lower():
        raise AssertionError("docs/crabbox.md must preserve the production deploy boundary")

    return {
        "ok": True,
        "jobs": sorted(jobs),
        "required_jobs": sorted(REQUIRED_JOBS),
        "sync_excludes_checked": sorted(REQUIRED_SYNC_EXCLUDES),
        "env_allowlist": sorted(allowed_env),
        "secret_token_names_absent": sorted(FORBIDDEN_CONFIG_TOKENS),
    }
def main() -> int:
    """Run both contract checks, then write and print the JSON proof.

    The ``--output`` option is joined onto ``REPO_ROOT``; missing parent
    directories are created. The same JSON document is written to that file
    (with a trailing newline) and printed to stdout.

    Returns:
        ``0`` once both checks have passed and the proof has been written.

    Raises:
        AssertionError: Propagated from either contract check.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--output", default=".crabbox-results/crabbox-ci-contract.json")
    options = cli.parse_args()

    # The Crabbox check runs before the Leo route check.
    crabbox_report = _validate_crabbox_contract()
    leo_report = _validate_leo_route_contract()
    proof = {
        "ok": True,
        "scope": "crabbox_ci_leo_contract",
        "crabbox": crabbox_report,
        "leo_route_contract": leo_report,
    }
    rendered = json.dumps(proof, indent=2, sort_keys=True)

    destination = REPO_ROOT / options.output
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(rendered + "\n")
    print(rendered)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())