251 lines
8.1 KiB
Python
Executable file
251 lines
8.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Validate the repo-owned Crabbox and Leo CI contract.
|
|
|
|
This is intentionally no-network and dependency-free. It checks the local
|
|
Crabbox config for bounded jobs/secret hygiene and exercises a small Leo route
|
|
contract through the real Phase 1b router.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Repository root: the directory one level above the one containing this file.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Put the repository root at the front of sys.path (only if it is not already
# listed) so the repo-local import that follows can be resolved.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
|
|
|
|
from lib.agent_routing import classify_pr_route # noqa: E402
|
|
|
|
# Files whose contents the Crabbox contract check reads, all relative to REPO_ROOT.
CRABBOX_CONFIG = REPO_ROOT.joinpath(".crabbox.yaml")
CRABBOX_DOC = REPO_ROOT.joinpath("docs", "crabbox.md")
CRABBOX_SKILL = REPO_ROOT.joinpath(".agents", "skills", "crabbox", "SKILL.md")
CRABBOX_WORKFLOW = REPO_ROOT.joinpath(".github", "workflows", "crabbox.yml")
CI_WORKFLOW = REPO_ROOT.joinpath(".github", "workflows", "ci.yml")
|
|
|
|
# Job names that must all be present in the Crabbox config and named in the
# Crabbox skill document.
REQUIRED_JOBS = {
    "ci-contract",
    "lint-phase1b",
    "phase1b-local-proof",
    "sync-smoke",
    "unit",
}

# Entries that must all be listed under the config's sync/exclude list.
REQUIRED_SYNC_EXCLUDES = {
    "*.db",
    "*.db-shm",
    "*.db-wal",
    "*.log",
    ".cache",
    ".env",
    ".pytest_cache",
    ".ruff_cache",
    ".venv",
    "__pycache__",
    "logs",
    "node_modules",
    "secrets",
}

# The config's env/allow list must equal this set exactly.
ALLOWED_ENV = {"CI", "PHASE1B_AGENT_ROUTING_ENABLED", "PYTHONWARNINGS"}

# Substrings that must not occur anywhere in the upper-cased config text.
FORBIDDEN_CONFIG_TOKENS = {
    "BITWARDEN",
    "BW_SESSION",
    "CRABBOX_COORDINATOR_TOKEN",
    "FORGEJO",
    "GH_TOKEN",
    "GITHUB_TOKEN",
    "HCLOUD_TOKEN",
    "HETZNER_TOKEN",
    "OPENROUTER",
    "SSH_PRIVATE",
}
|
|
|
|
|
|
def _read(path: Path) -> str:
    """Return the text of a required repository file, decoded as UTF-8.

    Args:
        path: File to read; expected to live under ``REPO_ROOT`` so the error
            message can name it relative to the repository.

    Raises:
        AssertionError: If ``path`` does not exist.
    """
    if not path.exists():
        raise AssertionError(f"missing required file: {path.relative_to(REPO_ROOT)}")
    # Pin the encoding so the result does not depend on the runner's locale.
    return path.read_text(encoding="utf-8")
|
|
|
|
|
|
def _list_values_under(text: str, parent: str, child: str) -> list[str]:
    """Collect the list items found under ``parent:`` -> ``child:`` in YAML-like text.

    This is a small line-based scanner, not a YAML parser. The parent key must
    be a whole line at column 0; the child key and the list items are matched
    against fixed literal prefixes. Scanning stops at the next top-level key,
    or at the first line after the child key that is not a list item.

    Args:
        text: Full config text.
        parent: Top-level key to look for (without the trailing colon).
        child: Key nested under ``parent`` whose list items are wanted.

    Returns:
        The item values in file order, with surrounding whitespace and
        surrounding double quotes or one pair of balanced single quotes
        removed. Empty when the parent or child key is not found.
    """
    values: list[str] = []
    in_parent = False
    in_child = False

    for line in text.splitlines():
        if not in_parent:
            # Skip everything until the parent key line is seen.
            in_parent = line == f"{parent}:"
            continue

        # A non-empty line that is not indented starts the next top-level key.
        if line and not line.startswith(" "):
            break

        if not in_child:
            # Still inside the parent mapping; wait for the child key line.
            in_child = line == f" {child}:"
            continue

        if not line.startswith(" - "):
            # First non-item line after the child key ends the list.
            break

        item = line.removeprefix(" - ").strip()
        if len(item) >= 2 and item[0] == item[-1] == "'":
            # Single-quoted scalar: drop exactly one pair of quotes.
            item = item[1:-1]
        else:
            item = item.strip('"')
        values.append(item)

    return values
|
|
|
|
|
|
def _top_level_job_names(text: str) -> set[str]:
    """Return the key names found directly under the top-level ``jobs:`` section.

    The section runs from the ``jobs:`` line to the next line that starts with
    a non-whitespace character, or to the end of the text. Only indented keys
    that have nothing but whitespace after the colon are counted. Returns an
    empty set when there is no ``jobs:`` line.
    """
    section = re.search(r"(?ms)^jobs:\n(?P<body>.*?)(?:\n\S|\Z)", text)
    if section is None:
        return set()
    key_line = re.compile(r"^ ([A-Za-z0-9_-]+):\s*$", re.MULTILINE)
    return {found.group(1) for found in key_line.finditer(section.group("body"))}
|
|
|
|
|
|
def _diff_for(*paths: str, line: str = "+type: claim") -> str:
    """Build a minimal synthetic diff for the given paths.

    Each path contributes a ``diff --git a/<path> b/<path>`` header followed by
    ``line``; the per-path stanzas are joined with newlines. With no paths the
    result is the empty string.
    """
    stanzas = [f"diff --git a/{path} b/{path}\n{line}" for path in paths]
    return "\n".join(stanzas)
|
|
|
|
|
|
def _assert_equal(name: str, actual: Any, expected: Any) -> None:
    """Raise ``AssertionError`` labelled with ``name`` when ``actual != expected``.

    The message includes the ``repr`` of both the expected and actual values.
    Returns ``None`` when the values compare equal.
    """
    differs = actual != expected
    if differs:
        raise AssertionError(f"{name}: expected {expected!r}, got {actual!r}")
|
|
|
|
|
|
def _validate_leo_route_contract() -> dict[str, Any]:
    """Run four synthetic diffs through ``classify_pr_route`` and check the routes.

    For every scenario the route's audit dict must match the expected
    ``required_agents``, ``route_kind`` and ``fallback`` values.

    Returns:
        A dict with ``ok``, the per-scenario audit dicts under ``cases``, and a
        static ``contract`` description.

    Raises:
        AssertionError: On the first audit field that differs from expectation.
    """
    cross_domain_diff = _diff_for(
        "foundations/collective-intelligence/collective-ai-goals.md",
        line="+Collective AI goals and AI systems self-understanding need review.",
    )

    # Each scenario: (case name, classified route, expected audit fields).
    # All routes are classified up front, before any expectation is checked.
    scenarios = [
        (
            "leo_owned_domain",
            classify_pr_route(_diff_for("domains/grand-strategy/strategy.md")),
            {"required_agents": ["Leo"], "route_kind": "single", "fallback": False},
        ),
        (
            "leo_fallback",
            classify_pr_route(_diff_for("docs/readme.md"), branch="misc/update"),
            {"required_agents": ["Leo"], "route_kind": "fallback", "fallback": True},
        ),
        (
            "leo_cross_domain",
            classify_pr_route(cross_domain_diff),
            {"required_agents": ["Leo", "Theseus"], "route_kind": "multi", "fallback": False},
        ),
        (
            "non_leo_single_domain",
            classify_pr_route(_diff_for("domains/internet-finance/x402.md")),
            {"required_agents": ["Rio"], "route_kind": "single", "fallback": False},
        ),
    ]

    checked_fields = ("required_agents", "route_kind", "fallback")
    audited = []
    for case_name, route, expected in scenarios:
        audit = route.to_audit_dict()
        for field in checked_fields:
            _assert_equal(f"{case_name} {field}", audit[field], expected[field])
        audited.append({"name": case_name, "route": audit})

    contract = {
        "leo_required_when": [
            "grand-strategy or Leo-owned domain route",
            "no confident route fallback",
            "top-2 cross-domain route where Leo is one of the top owners",
        ],
        "leo_not_universal_second_review": True,
    }
    return {"ok": True, "cases": audited, "contract": contract}
|
|
|
|
|
|
def _validate_crabbox_contract() -> dict[str, Any]:
    """Check the local Crabbox config, docs, skill and workflows against the contract.

    Checks run in a fixed order and stop at the first failure: required jobs,
    required sync excludes, the exact env allow-list, forbidden secret-like
    token names, required references in the config and workflows, job names
    in the skill document, and the production-deploy boundary statement in
    the docs.

    Returns:
        A summary dict with ``ok``, the jobs found, the required jobs, the
        sync excludes checked, the env allow-list, and the forbidden token
        names confirmed absent.

    Raises:
        AssertionError: If a required file is missing or any check fails.
    """
    config = _read(CRABBOX_CONFIG)
    doc = _read(CRABBOX_DOC)
    skill = _read(CRABBOX_SKILL)
    crabbox_workflow = _read(CRABBOX_WORKFLOW)
    ci_workflow = _read(CI_WORKFLOW)

    jobs = _top_level_job_names(config)
    missing_jobs = sorted(REQUIRED_JOBS - jobs)
    if missing_jobs:
        raise AssertionError(f"missing Crabbox jobs: {missing_jobs}")

    sync_excludes = set(_list_values_under(config, "sync", "exclude"))
    missing_excludes = sorted(REQUIRED_SYNC_EXCLUDES - sync_excludes)
    if missing_excludes:
        raise AssertionError(f"missing sync excludes: {missing_excludes}")

    allowed_env = set(_list_values_under(config, "env", "allow"))
    if allowed_env != ALLOWED_ENV:
        raise AssertionError(f"env allowlist must be {sorted(ALLOWED_ENV)}, got {sorted(allowed_env)}")

    # Compare upper-cased so differently-cased spellings of a token name are caught.
    upper_config = config.upper()
    leaked_tokens = sorted(token for token in FORBIDDEN_CONFIG_TOKENS if token in upper_config)
    if leaked_tokens:
        raise AssertionError(f"secret-like token names must not appear in .crabbox.yaml: {leaked_tokens}")

    # (required substring, text it must occur in, failure message), checked in order.
    required_snippets = (
        (
            "scripts/check_crabbox_ci_contract.py",
            ci_workflow,
            "ci.yml must run scripts/check_crabbox_ci_contract.py",
        ),
        (
            "scripts/crabbox_phase1b_proof.sh",
            ci_workflow,
            "ci.yml must run scripts/crabbox_phase1b_proof.sh",
        ),
        (
            "crabbox_phase1b_proof.sh",
            config,
            ".crabbox.yaml must run the Phase 1B proof wrapper",
        ),
        (
            "crabbox-ci-contract.json",
            config,
            ".crabbox.yaml must download the CI contract proof",
        ),
        (
            "runs-on: [self-hosted",
            crabbox_workflow,
            "crabbox hydration workflow must target the dynamic self-hosted runner label",
        ),
    )
    for needle, haystack, message in required_snippets:
        if needle not in haystack:
            raise AssertionError(message)

    # Iterate in sorted order: REQUIRED_JOBS is a set, so unsorted iteration
    # would make the first reported missing job vary between runs.
    for job in sorted(REQUIRED_JOBS):
        if f"crabbox job run {job}" not in skill and f"`{job}`" not in skill:
            raise AssertionError(f"Crabbox skill must name allowed job {job}")

    doc_lower = doc.lower()
    if "production deploy" not in doc_lower or "not the production deploy system" not in doc_lower:
        raise AssertionError("docs/crabbox.md must preserve the production deploy boundary")

    return {
        "ok": True,
        "jobs": sorted(jobs),
        "required_jobs": sorted(REQUIRED_JOBS),
        "sync_excludes_checked": sorted(REQUIRED_SYNC_EXCLUDES),
        "env_allowlist": sorted(allowed_env),
        "secret_token_names_absent": sorted(FORBIDDEN_CONFIG_TOKENS),
    }
|
|
|
|
|
|
def main() -> int:
    """Run both contract validations, write the proof as JSON, and print it.

    The ``--output`` option is joined onto ``REPO_ROOT``; parent directories
    are created as needed. Returns 0 when every check passes; a failed check
    propagates as ``AssertionError`` from the validators.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", default=".crabbox-results/crabbox-ci-contract.json")
    options = parser.parse_args()

    # Crabbox checks run first, then the Leo route checks.
    crabbox_report = _validate_crabbox_contract()
    leo_report = _validate_leo_route_contract()
    proof = {
        "ok": True,
        "scope": "crabbox_ci_leo_contract",
        "crabbox": crabbox_report,
        "leo_route_contract": leo_report,
    }

    destination = REPO_ROOT / options.output
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Serialize once; the file gets a trailing newline, stdout gets one from print().
    rendered = json.dumps(proof, indent=2, sort_keys=True)
    destination.write_text(rendered + "\n")
    print(rendered)
    return 0
|
|
|
|
|
|
# When run as a script, use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|