#!/usr/bin/env python3
"""Replay fixture-backed decision-engine evals without live model calls."""

from __future__ import annotations

import argparse
import json
from collections import Counter
from pathlib import Path
from typing import Any

from lib.agent_routing import classify_pr_route

REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_FIXTURES_DIR = REPO_ROOT / "fixtures" / "decision-engine-eval"
DEFAULT_OUTPUT = REPO_ROOT / ".crabbox-results" / "decision-engine-eval.json"
VALID_DISPOSITIONS = {"approve", "reject", "escalate"}


def _display_path(path: Path) -> str:
    """Return *path* relative to the repo root when possible, else unchanged.

    ``Path.relative_to`` raises ``ValueError`` for paths outside ``REPO_ROOT``
    (and for relative paths); error messages must not crash on those.
    """
    try:
        return str(path.relative_to(REPO_ROOT))
    except ValueError:
        return str(path)


def _read_json(path: Path) -> dict[str, Any]:
    """Load *path* as UTF-8 JSON and return its top-level object.

    Raises:
        AssertionError: if the document's top-level value is not an object.
    """
    with path.open(encoding="utf-8") as fh:
        data = json.load(fh)
    if not isinstance(data, dict):
        raise AssertionError(f"{_display_path(path)} must contain a JSON object")
    return data


def _require_dict(data: dict[str, Any], key: str, fixture_id: str) -> dict[str, Any]:
    """Return ``data[key]``, raising ``AssertionError`` unless it is a dict."""
    value = data.get(key)
    if not isinstance(value, dict):
        raise AssertionError(f"{fixture_id}: {key} must be an object")
    return value


def _require_list(data: dict[str, Any], key: str, fixture_id: str) -> list[Any]:
    """Return ``data[key]``, raising ``AssertionError`` unless it is a non-empty list."""
    value = data.get(key)
    if not isinstance(value, list) or not value:
        raise AssertionError(f"{fixture_id}: {key} must be a non-empty list")
    return value


def _require_str(data: dict[str, Any], key: str, fixture_id: str) -> str:
    """Return ``data[key]``, raising ``AssertionError`` unless it is a non-blank string."""
    value = data.get(key)
    if not isinstance(value, str) or not value.strip():
        raise AssertionError(f"{fixture_id}: {key} must be a non-empty string")
    return value


def _validate_fixture(fixture: dict[str, Any], path: Path) -> None:
    """Check that *fixture* has every field the replay reads.

    *path* is only used to label the error when the fixture has no usable id.

    Raises:
        AssertionError: on the first missing or malformed field.
    """
    fixture_id = _require_str(fixture, "id", str(path))
    _require_str(fixture, "lane", fixture_id)
    input_data = _require_dict(fixture, "input", fixture_id)
    rubric = _require_dict(fixture, "rubric", fixture_id)
    expected = _require_dict(fixture, "expected", fixture_id)

    _require_str(input_data, "diff", fixture_id)
    _require_list(rubric, "must_check", fixture_id)
    _require_list(rubric, "reject_if", fixture_id)
    _require_str(expected, "primary_agent", fixture_id)
    _require_list(expected, "required_agents", fixture_id)
    _require_str(expected, "route_kind", fixture_id)

    disposition = _require_str(expected, "expected_disposition", fixture_id)
    if disposition not in VALID_DISPOSITIONS:
        raise AssertionError(
            f"{fixture_id}: expected_disposition must be one of {sorted(VALID_DISPOSITIONS)}"
        )
    _require_list(expected, "issue_tags", fixture_id)


def load_fixtures(fixtures_dir: Path = DEFAULT_FIXTURES_DIR) -> list[dict[str, Any]]:
    """Load and validate every ``*.json`` fixture in *fixtures_dir*, sorted by filename.

    Raises:
        AssertionError: if the directory is missing, holds no fixtures, any
            fixture is malformed, or two fixtures share an id.
    """
    if not fixtures_dir.exists():
        raise AssertionError(f"missing fixtures directory: {_display_path(fixtures_dir)}")

    fixtures = []
    for path in sorted(fixtures_dir.glob("*.json")):
        fixture = _read_json(path)
        _validate_fixture(fixture, path)
        fixtures.append(fixture)
    if not fixtures:
        raise AssertionError(f"no fixtures found in {_display_path(fixtures_dir)}")

    ids = [fixture["id"] for fixture in fixtures]
    duplicates = [fixture_id for fixture_id, count in Counter(ids).items() if count > 1]
    if duplicates:
        raise AssertionError(f"duplicate fixture ids: {duplicates}")
    return fixtures


def _kb_interop_ok(fixture: dict[str, Any]) -> bool | None:
    """Check the proposed write of a ``kb-interop`` fixture.

    Returns ``None`` for any other lane (check not applicable) and ``False``
    when ``input.proposed_write`` is not an object. Otherwise returns ``True``
    only if the target is not main/production/prod, ``mutates_production_db``
    is exactly ``False``, ``denied_actions_invoked`` is an empty list, and a
    ``proof_path`` is present.
    """
    if fixture["lane"] != "kb-interop":
        return None
    proposed_write = fixture["input"].get("proposed_write")
    if not isinstance(proposed_write, dict):
        return False
    target = str(proposed_write.get("target", "")).lower()
    denied_actions = proposed_write.get("denied_actions_invoked")
    return (
        target not in {"main", "production", "prod"}
        and proposed_write.get("mutates_production_db") is False
        and isinstance(denied_actions, list)
        and not denied_actions
        and bool(proposed_write.get("proof_path"))
    )


def _fixture_result(fixture: dict[str, Any]) -> dict[str, Any]:
    """Route one fixture through ``classify_pr_route`` and compare with its expectations.

    The result's ``ok`` is true when every applicable check passed; checks
    whose value is ``None`` (not applicable to the fixture's lane) are ignored.
    """
    input_data = fixture["input"]
    expected = fixture["expected"]
    route = classify_pr_route(
        input_data["diff"],
        branch=input_data.get("branch"),
        title=input_data.get("title"),
        body=input_data.get("body"),
    )
    checks = {
        "route_primary_ok": route.primary_agent == expected["primary_agent"],
        "route_required_ok": list(route.required_agents) == expected["required_agents"],
        "route_kind_ok": route.route_kind == expected["route_kind"],
        "kb_interop_ok": _kb_interop_ok(fixture),
    }
    applicable_checks = [value for value in checks.values() if value is not None]
    return {
        "id": fixture["id"],
        "lane": fixture["lane"],
        "ok": all(applicable_checks),
        "expected": expected,
        "actual_route": route.to_audit_dict(),
        "checks": checks,
        "baseline_verdict": {
            "disposition": expected["expected_disposition"],
            "issue_tags": expected["issue_tags"],
            "primary_agent": route.primary_agent,
            "required_agents": list(route.required_agents),
            "reason": "fixture truth with deterministic route evidence",
        },
        "rubric": fixture["rubric"],
    }


def _load_candidate_output(path: Path | None) -> dict[str, Any] | None:
    """Load a candidate verdict file, or return ``None`` when *path* is ``None``.

    Raises:
        AssertionError: if ``candidate_name`` is missing/blank or ``verdicts``
            is not a list.
    """
    if path is None:
        return None
    candidate = _read_json(path)
    _require_str(candidate, "candidate_name", str(path))
    verdicts = candidate.get("verdicts")
    if not isinstance(verdicts, list):
        raise AssertionError(f"{_display_path(path)}: verdicts must be a list")
    return candidate


def _score_candidate(
    results: list[dict[str, Any]],
    candidate: dict[str, Any] | None,
) -> dict[str, Any] | None:
    """Score candidate verdicts against the fixture expectations in *results*.

    Returns ``None`` when no candidate was supplied. Otherwise reports missing
    verdicts, false approves/rejects, route mismatches, and expected issue
    tags the candidate omitted on non-approve fixtures.

    Raises:
        AssertionError: if a verdict is not an object, lacks a valid
            ``fixture_id``/``disposition``, or has a non-list ``issue_tags``.
    """
    if candidate is None:
        return None

    # Index verdicts by fixture id; a later verdict for the same id replaces
    # an earlier one.
    verdicts_by_id = {}
    for verdict in candidate["verdicts"]:
        if not isinstance(verdict, dict):
            raise AssertionError("candidate verdicts must be JSON objects")
        fixture_id = _require_str(verdict, "fixture_id", candidate["candidate_name"])
        disposition = _require_str(verdict, "disposition", fixture_id)
        if disposition not in VALID_DISPOSITIONS:
            raise AssertionError(
                f"{fixture_id}: candidate disposition must be one of {sorted(VALID_DISPOSITIONS)}"
            )
        verdicts_by_id[fixture_id] = verdict

    missing_verdicts: list[str] = []
    false_approves: list[str] = []
    false_rejects: list[str] = []
    route_mismatches: list[str] = []
    missing_required_tags: dict[str, list[str]] = {}

    for result in results:
        fixture_id = result["id"]
        expected = result["expected"]
        verdict = verdicts_by_id.get(fixture_id)
        if verdict is None:
            missing_verdicts.append(fixture_id)
            continue

        if verdict["disposition"] == "approve" and expected["expected_disposition"] != "approve":
            false_approves.append(fixture_id)
        if verdict["disposition"] == "reject" and expected["expected_disposition"] == "approve":
            false_rejects.append(fixture_id)

        # Routing fields are optional in a verdict; only compare when present.
        if verdict.get("primary_agent") and verdict.get("primary_agent") != expected["primary_agent"]:
            route_mismatches.append(fixture_id)
        if verdict.get("required_agents") and verdict.get("required_agents") != expected["required_agents"]:
            route_mismatches.append(fixture_id)

        # Reject non-list tags explicitly: set(None) would raise TypeError and
        # set("tag") would silently split the string into characters.
        candidate_tags = verdict.get("issue_tags", [])
        if not isinstance(candidate_tags, list):
            raise AssertionError(f"{fixture_id}: candidate issue_tags must be a list")
        expected_tags = set(expected["issue_tags"])
        actual_tags = set(candidate_tags)
        missing = sorted(expected_tags - actual_tags)
        if missing and expected["expected_disposition"] != "approve":
            missing_required_tags[fixture_id] = missing

    return {
        "candidate_name": candidate["candidate_name"],
        "ok": not (
            missing_verdicts
            or false_approves
            or false_rejects
            or route_mismatches
            or missing_required_tags
        ),
        "missing_verdicts": missing_verdicts,
        "false_approve_count": len(false_approves),
        "false_approves": false_approves,
        "false_reject_count": len(false_rejects),
        "false_rejects": false_rejects,
        # A fixture can mismatch on both routing fields; report it once.
        "route_mismatches": sorted(set(route_mismatches)),
        "missing_required_tags": missing_required_tags,
    }


def evaluate_fixtures(
    fixtures: list[dict[str, Any]],
    *,
    candidate: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Replay *fixtures* and return the proof document.

    The proof is ``ok`` when at least one fixture was replayed, every fixture's
    checks passed, and the candidate (if any) scored ``ok``. An empty fixture
    list yields ``route_accuracy`` 0.0 and ``ok`` False rather than dividing by
    zero, since nothing was proven.
    """
    results = [_fixture_result(fixture) for fixture in fixtures]
    fixture_count = len(results)
    route_ok_count = sum(1 for result in results if result["ok"])
    candidate_score = _score_candidate(results, candidate)
    route_accuracy = route_ok_count / fixture_count if fixture_count else 0.0
    proof_ok = (
        fixture_count > 0
        and route_ok_count == fixture_count
        and (candidate_score is None or candidate_score["ok"])
    )
    return {
        "ok": proof_ok,
        "scope": "decision_engine_replay",
        "fixture_count": fixture_count,
        "metrics": {
            "route_accuracy": route_accuracy,
            "route_ok_count": route_ok_count,
            "lanes": dict(sorted(Counter(result["lane"] for result in results).items())),
        },
        "results": results,
        "candidate": candidate_score,
    }


def main() -> int:
    """Run the replay from the command line; return 0 if the proof is ok, else 1.

    The proof is written to ``--output`` (relative paths are resolved against
    the repo root) and echoed to stdout.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--fixtures-dir", default=str(DEFAULT_FIXTURES_DIR))
    parser.add_argument("--candidate-output")
    parser.add_argument("--output", default=str(DEFAULT_OUTPUT))
    args = parser.parse_args()

    fixtures = load_fixtures(Path(args.fixtures_dir))
    candidate = _load_candidate_output(
        Path(args.candidate_output) if args.candidate_output else None
    )
    proof = evaluate_fixtures(fixtures, candidate=candidate)

    output = Path(args.output)
    if not output.is_absolute():
        output = REPO_ROOT / output
    output.parent.mkdir(parents=True, exist_ok=True)

    # Serialize once so the file and stdout are guaranteed to match.
    serialized = json.dumps(proof, indent=2, sort_keys=True)
    output.write_text(serialized + "\n", encoding="utf-8")
    print(serialized)
    return 0 if proof["ok"] else 1


if __name__ == "__main__":
    raise SystemExit(main())