teleo-infrastructure/scripts/replay_decision_engine_eval.py
twentyOne2x 71ea7a625c Add decision engine replay harness
- Add source-linked model discovery registry for bakeoff candidates
- Add Rio, Theseus, and KB interop fixtures with deterministic replay proof
- Gate CI on replay output; verify with 424-test suite

`.crabbox.yaml`
`.github/workflows/ci.yml`
`docs/llm-refinement-decision-engine.md`
`docs/model-discovery-registry.md`
`fixtures/decision-engine-eval/kb_interop_propose_only.json`
`fixtures/decision-engine-eval/rio_meteora_lp_incentives.json`
`fixtures/decision-engine-eval/theseus_live_model_switch_reject.json`
`scripts/check_llm_refinement_contract.py`
`scripts/replay_decision_engine_eval.py`
`tests/test_decision_engine_replay.py`
2026-06-01 17:37:38 +02:00

244 lines
9.7 KiB
Python
Executable file

#!/usr/bin/env python3
"""Replay fixture-backed decision-engine evals without live model calls."""
from __future__ import annotations
import argparse
import json
from collections import Counter
from pathlib import Path
from typing import Any
from lib.agent_routing import classify_pr_route
# Directory one level above the directory that contains this (resolved) script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Default directory scanned for fixture JSON files; used as the --fixtures-dir default.
DEFAULT_FIXTURES_DIR = REPO_ROOT / "fixtures" / "decision-engine-eval"
# Default file the replay proof is written to; used as the --output default.
DEFAULT_OUTPUT = REPO_ROOT / ".crabbox-results" / "decision-engine-eval.json"
# Dispositions accepted in fixture expectations and in candidate verdicts.
VALID_DISPOSITIONS = {"approve", "reject", "escalate"}
def _read_json(path: Path) -> dict[str, Any]:
    """Load *path* as JSON and return it, requiring a top-level object.

    Args:
        path: Location of the JSON document to read.

    Returns:
        The decoded JSON object.

    Raises:
        AssertionError: If the document's top-level value is not an object.
    """
    # JSON interchange is UTF-8; do not depend on the platform's locale encoding.
    with path.open(encoding="utf-8") as fh:
        data = json.load(fh)
    if not isinstance(data, dict):
        # relative_to() raises ValueError for paths outside the repository
        # (or relative paths); fall back to the path as given so the real
        # diagnostic is not masked by a secondary error.
        try:
            shown = path.relative_to(REPO_ROOT)
        except ValueError:
            shown = path
        raise AssertionError(f"{shown} must contain a JSON object")
    return data
def _require_dict(data: dict[str, Any], key: str, fixture_id: str) -> dict[str, Any]:
    """Return ``data[key]`` after confirming that it is a dict.

    Args:
        data: Mapping to look the key up in.
        key: Name of the required entry.
        fixture_id: Label used as the prefix of the error message.

    Raises:
        AssertionError: If the entry is absent or is not a dict.
    """
    entry = data.get(key)
    if isinstance(entry, dict):
        return entry
    raise AssertionError(f"{fixture_id}: {key} must be an object")
def _require_list(data: dict[str, Any], key: str, fixture_id: str) -> list[Any]:
    """Return ``data[key]`` after confirming that it is a non-empty list.

    Args:
        data: Mapping to look the key up in.
        key: Name of the required entry.
        fixture_id: Label used as the prefix of the error message.

    Raises:
        AssertionError: If the entry is absent, not a list, or an empty list.
    """
    entry = data.get(key)
    if isinstance(entry, list) and entry:
        return entry
    raise AssertionError(f"{fixture_id}: {key} must be a non-empty list")
def _require_str(data: dict[str, Any], key: str, fixture_id: str) -> str:
    """Return ``data[key]`` after confirming that it is a non-blank string.

    The value is returned unmodified (it is not stripped).

    Args:
        data: Mapping to look the key up in.
        key: Name of the required entry.
        fixture_id: Label used as the prefix of the error message.

    Raises:
        AssertionError: If the entry is absent, not a string, or only whitespace.
    """
    entry = data.get(key)
    if isinstance(entry, str) and entry.strip():
        return entry
    raise AssertionError(f"{fixture_id}: {key} must be a non-empty string")
def _validate_fixture(fixture: dict[str, Any], path: Path) -> None:
    """Check that *fixture* has every field the replay relies on.

    Checks run in a fixed order and stop at the first failure. Until the
    fixture's own ``id`` has been read, *path* is used to label errors.

    Args:
        fixture: Decoded fixture document.
        path: File the fixture was read from.

    Raises:
        AssertionError: If a required field is missing, has the wrong type,
            is empty, or ``expected_disposition`` is not a valid disposition.
    """
    fixture_id = _require_str(fixture, "id", str(path))
    _require_str(fixture, "lane", fixture_id)

    # The three top-level sections are validated before any of their contents.
    sections = {
        name: _require_dict(fixture, name, fixture_id)
        for name in ("input", "rubric", "expected")
    }

    _require_str(sections["input"], "diff", fixture_id)
    for rubric_key in ("must_check", "reject_if"):
        _require_list(sections["rubric"], rubric_key, fixture_id)

    expected = sections["expected"]
    _require_str(expected, "primary_agent", fixture_id)
    _require_list(expected, "required_agents", fixture_id)
    _require_str(expected, "route_kind", fixture_id)
    disposition = _require_str(expected, "expected_disposition", fixture_id)
    if disposition not in VALID_DISPOSITIONS:
        raise AssertionError(f"{fixture_id}: expected_disposition must be one of {sorted(VALID_DISPOSITIONS)}")
    _require_list(expected, "issue_tags", fixture_id)
def load_fixtures(fixtures_dir: Path = DEFAULT_FIXTURES_DIR) -> list[dict[str, Any]]:
    """Read and validate every ``*.json`` fixture directly inside *fixtures_dir*.

    Files are processed in sorted path order.

    Args:
        fixtures_dir: Directory holding the fixture files.

    Returns:
        The validated fixtures, in sorted file order.

    Raises:
        AssertionError: If the directory does not exist, holds no fixtures,
            a fixture fails validation, or two fixtures share an ``id``.
    """
    # relative_to() raises ValueError when the directory is outside the
    # repository or was given as a relative path; fall back to the path as
    # given so the intended diagnostic below is what the caller sees.
    try:
        shown_dir = fixtures_dir.relative_to(REPO_ROOT)
    except ValueError:
        shown_dir = fixtures_dir

    if not fixtures_dir.exists():
        raise AssertionError(f"missing fixtures directory: {shown_dir}")

    fixtures = []
    for path in sorted(fixtures_dir.glob("*.json")):
        fixture = _read_json(path)
        _validate_fixture(fixture, path)
        fixtures.append(fixture)
    if not fixtures:
        raise AssertionError(f"no fixtures found in {shown_dir}")

    id_counts = Counter(fixture["id"] for fixture in fixtures)
    # Sorted so the error message does not depend on file discovery order.
    duplicates = sorted(fixture_id for fixture_id, count in id_counts.items() if count > 1)
    if duplicates:
        raise AssertionError(f"duplicate fixture ids: {duplicates}")
    return fixtures
def _kb_interop_ok(fixture: dict[str, Any]) -> bool | None:
if fixture["lane"] != "kb-interop":
return None
proposed_write = fixture["input"].get("proposed_write")
if not isinstance(proposed_write, dict):
return False
target = str(proposed_write.get("target", "")).lower()
denied_actions = proposed_write.get("denied_actions_invoked")
return (
target not in {"main", "production", "prod"}
and proposed_write.get("mutates_production_db") is False
and isinstance(denied_actions, list)
and not denied_actions
and bool(proposed_write.get("proof_path"))
)
def _fixture_result(fixture: dict[str, Any]) -> dict[str, Any]:
    """Route one fixture through ``classify_pr_route`` and compare with its expectations.

    Args:
        fixture: A fixture with ``id``, ``lane``, ``input``, ``expected`` and
            ``rubric`` entries.

    Returns:
        A result record containing the per-check outcomes, the route's audit
        dict, the fixture's expectations and rubric, and a baseline verdict
        built from the expected disposition/tags plus the computed route.
        ``ok`` is true when every check that is not ``None`` passed.
    """
    payload = fixture["input"]
    expected = fixture["expected"]

    optional_context = {name: payload.get(name) for name in ("branch", "title", "body")}
    route = classify_pr_route(payload["diff"], **optional_context)

    checks = {
        "route_primary_ok": route.primary_agent == expected["primary_agent"],
        "route_required_ok": list(route.required_agents) == expected["required_agents"],
        "route_kind_ok": route.route_kind == expected["route_kind"],
        "kb_interop_ok": _kb_interop_ok(fixture),
    }

    return {
        "id": fixture["id"],
        "lane": fixture["lane"],
        # A None outcome marks a check that does not apply to this fixture.
        "ok": all(outcome for outcome in checks.values() if outcome is not None),
        "expected": expected,
        "actual_route": route.to_audit_dict(),
        "checks": checks,
        "baseline_verdict": {
            "disposition": expected["expected_disposition"],
            "issue_tags": expected["issue_tags"],
            "primary_agent": route.primary_agent,
            "required_agents": list(route.required_agents),
            "reason": "fixture truth with deterministic route evidence",
        },
        "rubric": fixture["rubric"],
    }
def _load_candidate_output(path: Path | None) -> dict[str, Any] | None:
if path is None:
return None
candidate = _read_json(path)
_require_str(candidate, "candidate_name", str(path))
verdicts = candidate.get("verdicts")
if not isinstance(verdicts, list):
raise AssertionError(f"{path.relative_to(REPO_ROOT)}: verdicts must be a list")
return candidate
def _score_candidate(results: list[dict[str, Any]], candidate: dict[str, Any] | None) -> dict[str, Any] | None:
if candidate is None:
return None
verdicts_by_id = {}
for verdict in candidate["verdicts"]:
if not isinstance(verdict, dict):
raise AssertionError("candidate verdicts must be JSON objects")
fixture_id = _require_str(verdict, "fixture_id", candidate["candidate_name"])
disposition = _require_str(verdict, "disposition", fixture_id)
if disposition not in VALID_DISPOSITIONS:
raise AssertionError(f"{fixture_id}: candidate disposition must be one of {sorted(VALID_DISPOSITIONS)}")
verdicts_by_id[fixture_id] = verdict
missing_verdicts: list[str] = []
false_approves: list[str] = []
false_rejects: list[str] = []
route_mismatches: list[str] = []
missing_required_tags: dict[str, list[str]] = {}
for result in results:
fixture_id = result["id"]
expected = result["expected"]
verdict = verdicts_by_id.get(fixture_id)
if verdict is None:
missing_verdicts.append(fixture_id)
continue
if verdict["disposition"] == "approve" and expected["expected_disposition"] != "approve":
false_approves.append(fixture_id)
if verdict["disposition"] == "reject" and expected["expected_disposition"] == "approve":
false_rejects.append(fixture_id)
if verdict.get("primary_agent") and verdict.get("primary_agent") != expected["primary_agent"]:
route_mismatches.append(fixture_id)
if verdict.get("required_agents") and verdict.get("required_agents") != expected["required_agents"]:
route_mismatches.append(fixture_id)
expected_tags = set(expected["issue_tags"])
actual_tags = set(verdict.get("issue_tags", []))
missing = sorted(expected_tags - actual_tags)
if missing and expected["expected_disposition"] != "approve":
missing_required_tags[fixture_id] = missing
return {
"candidate_name": candidate["candidate_name"],
"ok": not (missing_verdicts or false_approves or false_rejects or route_mismatches or missing_required_tags),
"missing_verdicts": missing_verdicts,
"false_approve_count": len(false_approves),
"false_approves": false_approves,
"false_reject_count": len(false_rejects),
"false_rejects": false_rejects,
"route_mismatches": sorted(set(route_mismatches)),
"missing_required_tags": missing_required_tags,
}
def evaluate_fixtures(
fixtures: list[dict[str, Any]],
*,
candidate: dict[str, Any] | None = None,
) -> dict[str, Any]:
results = [_fixture_result(fixture) for fixture in fixtures]
fixture_count = len(results)
route_ok_count = sum(1 for result in results if result["ok"])
candidate_score = _score_candidate(results, candidate)
proof_ok = route_ok_count == fixture_count and (candidate_score is None or candidate_score["ok"])
return {
"ok": proof_ok,
"scope": "decision_engine_replay",
"fixture_count": fixture_count,
"metrics": {
"route_accuracy": route_ok_count / fixture_count,
"route_ok_count": route_ok_count,
"lanes": dict(sorted(Counter(result["lane"] for result in results).items())),
},
"results": results,
"candidate": candidate_score,
}
def main() -> int:
    """Run the replay from the command line and persist the proof.

    Parses ``--fixtures-dir``, ``--candidate-output`` and ``--output``,
    evaluates the fixtures, writes the proof as sorted, indented JSON to the
    output file (a relative ``--output`` is resolved against ``REPO_ROOT``),
    and prints the same JSON to stdout.

    Returns:
        ``0`` when the proof's ``ok`` is truthy, otherwise ``1``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--fixtures-dir", default=str(DEFAULT_FIXTURES_DIR))
    parser.add_argument("--candidate-output")
    parser.add_argument("--output", default=str(DEFAULT_OUTPUT))
    args = parser.parse_args()

    fixtures = load_fixtures(Path(args.fixtures_dir))
    candidate_path = Path(args.candidate_output) if args.candidate_output else None
    candidate = _load_candidate_output(candidate_path)
    proof = evaluate_fixtures(fixtures, candidate=candidate)

    destination = Path(args.output)
    if not destination.is_absolute():
        destination = REPO_ROOT / destination
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Serialize once; the file copy differs from stdout only by its trailing newline.
    rendered = json.dumps(proof, indent=2, sort_keys=True)
    destination.write_text(rendered + "\n")
    print(rendered)

    exit_code = 0 if proof["ok"] else 1
    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())