refactor: Phase 4 — extract eval_actions.py, drop underscore prefixes in eval_parse

Three changes:

1. Drop underscore prefixes in eval_parse.py — functions are now the public
   API of the module (filter_diff, parse_verdict, classify_issues, etc.).
   All 12 functions renamed, imports updated in evaluate.py and tests.

2. Extract eval_actions.py from evaluate.py — 3 async PR disposition functions:
   - post_formal_approvals: submit Forgejo reviews from 2 agents
   - terminate_pr: close PR, post rejection comment, requeue source
   - dispose_rejected_pr: disposition logic for rejected PRs on attempt 2+
   evaluate.py drops from ~1140 to 911 lines.

3. 14 new tests in test_eval_actions.py covering all three functions.
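
A minimal sketch of the new import surface (the call site below is
illustrative, not lifted from evaluate.py):

    from lib.eval_actions import dispose_rejected_pr, post_formal_approvals, terminate_pr
    from lib.eval_parse import parse_issues, parse_verdict

    # after a rejecting review: collect issue tags, then route the PR
    # through the disposition logic (async, awaited from the eval cycle)
    issues = parse_issues(review_text)
    await dispose_rejected_pr(conn, pr_number, eval_attempts, issues)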

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
m3taversal · 2026-04-16 12:57:51 +01:00
commit f46e14dfae (parent 376b77999f)
5 changed files with 576 additions and 270 deletions

lib/eval_actions.py (new file, +167 lines)

@@ -0,0 +1,167 @@
"""PR disposition actions — async Forgejo + DB operations for end-of-eval decisions.
Extracted from evaluate.py to isolate the "do something to this PR" functions
from orchestration logic. Contains:
- post_formal_approvals: submit Forgejo reviews from 2 agents (not PR author)
- terminate_pr: close PR, post rejection comment, requeue source
- dispose_rejected_pr: disposition logic for rejected PRs on attempt 2+
All functions are async (Forgejo API calls). Dependencies: forgejo, db, config,
pr_state, feedback, eval_parse.
"""
import json
import logging
from . import config, db
from .eval_parse import classify_issues
from .feedback import format_rejection_comment
from .forgejo import api as forgejo_api, get_agent_token, repo_path
from .pr_state import close_pr
logger = logging.getLogger("pipeline.eval_actions")
async def post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
async def terminate_pr(conn, pr_number: int, reason: str):
"""Terminal state: close PR on Forgejo, mark source needs_human."""
# Get issue tags for structured feedback
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
issues = []
if row and row["eval_issues"]:
try:
issues = json.loads(row["eval_issues"])
except (json.JSONDecodeError, TypeError):
pass
# Post structured rejection comment with quality gate guidance
if issues:
feedback_body = format_rejection_comment(issues, source="eval_terminal")
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback.\n\n"
f"{feedback_body}"
)
else:
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback."
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": comment_body},
)
closed = await close_pr(conn, pr_number, last_error=reason)
if not closed:
logger.warning("PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle", pr_number)
return
# Tag source for re-extraction with feedback
cursor = conn.execute(
"""UPDATE sources SET status = 'needs_reextraction',
updated_at = datetime('now')
WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
(pr_number,),
)
if cursor.rowcount == 0:
logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
db.audit(
conn,
"evaluate",
"pr_terminated",
json.dumps(
{
"pr": pr_number,
"reason": reason,
}
),
)
logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
async def dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
"""Disposition logic for rejected PRs on attempt 2+.
Attempt 1: PR goes back to open as normal; wait for a fix.
Attempt 2: check issue classification.
- Mechanical only: keep open for one more attempt (auto-fix future).
- Substantive or mixed: close PR, requeue source.
Attempt 3+: terminal.
"""
if eval_attempts < 2:
# Attempt 1: post structured feedback so agent learns, but don't close
if all_issues:
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": feedback_body},
)
return
classification = classify_issues(all_issues)
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal
await terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
return
if classification == "mechanical":
# Mechanical issues only — keep open for one more attempt.
# Future: auto-fix module will push fixes here.
logger.info(
"PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
pr_number,
eval_attempts,
all_issues,
)
db.audit(
conn,
"evaluate",
"mechanical_retry",
json.dumps(
{
"pr": pr_number,
"attempt": eval_attempts,
"issues": all_issues,
}
),
)
else:
# Substantive, mixed, or unknown — close and requeue
logger.info(
"PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
pr_number,
eval_attempts,
classification,
all_issues,
)
await terminate_pr(
conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}"
)

lib/eval_parse.py

@@ -10,7 +10,7 @@ Contents:
All functions are pure (input → output). The only external dependency
is config.MECHANICAL_ISSUE_TAGS / config.SUBSTANTIVE_ISSUE_TAGS for
_classify_issues.
classify_issues.
"""
import logging
@@ -24,7 +24,7 @@ logger = logging.getLogger("pipeline.eval_parse")
# ─── Diff helpers ──────────────────────────────────────────────────────────
def _filter_diff(diff: str) -> tuple[str, str]:
def filter_diff(diff: str) -> tuple[str, str]:
"""Filter diff to only review-relevant files.
Returns (review_diff, entity_diff).
@@ -51,14 +51,14 @@ def _filter_diff(diff: str) -> tuple[str, str]:
return "".join(claim_sections), "".join(entity_sections)
def _extract_changed_files(diff: str) -> str:
def extract_changed_files(diff: str) -> str:
"""Extract changed file paths from diff."""
return "\n".join(
line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git")
)
def _is_musings_only(diff: str) -> bool:
def is_musings_only(diff: str) -> bool:
"""Check if PR only modifies musing files."""
has_musings = False
has_other = False
@@ -71,7 +71,7 @@ def _is_musings_only(diff: str) -> bool:
return has_musings and not has_other
def _diff_contains_claim_type(diff: str) -> bool:
def diff_contains_claim_type(diff: str) -> bool:
"""Claim-shape detector: check if any file in diff has type: claim in frontmatter.
Mechanical check ($0). If YAML declares type: claim, this is a factual claim
@@ -87,7 +87,7 @@ def _diff_contains_claim_type(diff: str) -> bool:
return False
def _deterministic_tier(diff: str) -> str | None:
def deterministic_tier(diff: str) -> str | None:
"""Deterministic tier routing — skip Haiku triage for obvious cases.
Checks diff file patterns before calling the LLM. Returns tier string
@@ -157,7 +157,7 @@ def _deterministic_tier(diff: str) -> str | None:
# ─── Verdict parsing ──────────────────────────────────────────────────────
def _parse_verdict(review_text: str, reviewer: str) -> str:
def parse_verdict(review_text: str, reviewer: str) -> str:
"""Parse VERDICT tag from review. Returns 'approve' or 'request_changes'."""
upper = reviewer.upper()
if f"VERDICT:{upper}:APPROVE" in review_text:
@@ -200,7 +200,7 @@ VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims",
"near_duplicate", "scope_error"}
def _normalize_tag(tag: str) -> str | None:
def normalize_tag(tag: str) -> str | None:
"""Normalize a model-generated tag to a valid tag, or None if unrecognizable."""
tag = tag.strip().lower().replace("-", "_")
if tag in VALID_ISSUE_TAGS:
@@ -262,7 +262,7 @@ _PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = {
}
def _parse_issues(review_text: str) -> list[str]:
def parse_issues(review_text: str) -> list[str]:
"""Extract issue tags from review.
First tries structured <!-- ISSUES: tag1, tag2 --> comment with tag normalization.
@@ -273,7 +273,7 @@ def _parse_issues(review_text: str) -> list[str]:
raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()]
normalized = []
for tag in raw_tags:
norm = _normalize_tag(tag)
norm = normalize_tag(tag)
if norm and norm not in normalized:
normalized.append(norm)
else:
@@ -281,10 +281,10 @@ def _parse_issues(review_text: str) -> list[str]:
if normalized:
return normalized
# Fallback: infer tags from review prose
return _infer_issues_from_prose(review_text)
return infer_issues_from_prose(review_text)
def _infer_issues_from_prose(review_text: str) -> list[str]:
def infer_issues_from_prose(review_text: str) -> list[str]:
"""Infer issue tags from unstructured review text via keyword matching.
Fallback for reviews that reject without structured <!-- ISSUES: --> tags.
@@ -297,7 +297,7 @@ def _infer_issues_from_prose(review_text: str) -> list[str]:
return inferred
def _classify_issues(issues: list[str]) -> str:
def classify_issues(issues: list[str]) -> str:
"""Classify issue tags as 'mechanical', 'substantive', or 'mixed'."""
if not issues:
return "unknown"
@@ -315,7 +315,7 @@ def _classify_issues(issues: list[str]) -> str:
# ─── Batch response parsing ───────────────────────────────────────────────
def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
def parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
"""Parse batched domain review into per-PR review sections.
Returns {pr_number: review_text} for each PR found in the response.
@@ -364,7 +364,7 @@ def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]:
return result
def _validate_batch_fanout(
def validate_batch_fanout(
parsed: dict[int, str],
pr_diffs: list[dict],
agent: str,

lib/evaluate.py

@@ -26,25 +26,21 @@ from datetime import datetime, timezone
from . import config, db
from .domains import agent_for_domain, detect_domain_from_branch, detect_domain_from_diff
from .eval_parse import (
VALID_ISSUE_TAGS,
_classify_issues,
_deterministic_tier,
_diff_contains_claim_type,
_extract_changed_files,
_filter_diff,
_infer_issues_from_prose,
_is_musings_only,
_normalize_tag,
_parse_batch_response,
_parse_issues,
_parse_verdict,
_validate_batch_fanout,
deterministic_tier,
diff_contains_claim_type,
extract_changed_files,
filter_diff,
is_musings_only,
parse_batch_response,
parse_issues,
parse_verdict,
validate_batch_fanout,
)
from .forgejo import api as forgejo_api
from .forgejo import get_agent_token, get_pr_diff, repo_path
from .merge import PIPELINE_OWNED_PREFIXES
from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr
from .feedback import format_rejection_comment
from .eval_actions import dispose_rejected_pr, post_formal_approvals, terminate_pr
from .pr_state import approve_pr, close_pr, reopen_pr, start_review
from .validate import load_existing_claims
@@ -57,153 +53,6 @@ logger = logging.getLogger("pipeline.evaluate")
# PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed.
async def _post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
# ─── Retry budget helpers ─────────────────────────────────────────────────
async def _terminate_pr(conn, pr_number: int, reason: str):
"""Terminal state: close PR on Forgejo, mark source needs_human."""
# Get issue tags for structured feedback
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
issues = []
if row and row["eval_issues"]:
try:
issues = json.loads(row["eval_issues"])
except (json.JSONDecodeError, TypeError):
pass
# Post structured rejection comment with quality gate guidance (Epimetheus)
if issues:
feedback_body = format_rejection_comment(issues, source="eval_terminal")
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback.\n\n"
f"{feedback_body}"
)
else:
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback."
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": comment_body},
)
closed = await close_pr(conn, pr_number, last_error=reason)
if not closed:
logger.warning("PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle", pr_number)
return
# Tag source for re-extraction with feedback
cursor = conn.execute(
"""UPDATE sources SET status = 'needs_reextraction',
updated_at = datetime('now')
WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
(pr_number,),
)
if cursor.rowcount == 0:
logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
db.audit(
conn,
"evaluate",
"pr_terminated",
json.dumps(
{
"pr": pr_number,
"reason": reason,
}
),
)
logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
"""Disposition logic for rejected PRs on attempt 2+.
Attempt 1: normal back to open, wait for fix.
Attempt 2: check issue classification.
- Mechanical only: keep open for one more attempt (auto-fix future).
- Substantive or mixed: close PR, requeue source.
Attempt 3+: terminal.
"""
if eval_attempts < 2:
# Attempt 1: post structured feedback so agent learns, but don't close
if all_issues:
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": feedback_body},
)
return
classification = _classify_issues(all_issues)
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal
await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
return
if classification == "mechanical":
# Mechanical issues only — keep open for one more attempt.
# Future: auto-fix module will push fixes here.
logger.info(
"PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
pr_number,
eval_attempts,
all_issues,
)
db.audit(
conn,
"evaluate",
"mechanical_retry",
json.dumps(
{
"pr": pr_number,
"attempt": eval_attempts,
"issues": all_issues,
}
),
)
else:
# Substantive, mixed, or unknown — close and requeue
logger.info(
"PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
pr_number,
eval_attempts,
classification,
all_issues,
)
await _terminate_pr(
conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}"
)
# ─── Single PR evaluation ─────────────────────────────────────────────────
@@ -218,7 +67,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal — hard cap reached. Close PR, tag source.
logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS)
await _terminate_pr(conn, pr_number, "eval budget exhausted")
await terminate_pr(conn, pr_number, "eval budget exhausted")
return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"}
# Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11)
@@ -249,7 +98,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
return {"pr": pr_number, "skipped": True, "reason": "no_diff_closed"}
# Musings bypass
if _is_musings_only(diff):
if is_musings_only(diff):
logger.info("PR #%d is musings-only — auto-approving", pr_number)
await forgejo_api(
"POST",
@@ -285,10 +134,10 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here.
# Filter diff
review_diff, _entity_diff = _filter_diff(diff)
review_diff, _entity_diff = filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
files = extract_changed_files(diff)
# Detect domain — try diff paths first, then branch prefix, then 'general'
domain = detect_domain_from_diff(diff)
@@ -309,7 +158,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Step 1: Triage (if not already triaged)
# Try deterministic routing first ($0), fall back to Haiku triage ($0.001)
if tier is None:
tier = _deterministic_tier(diff)
tier = deterministic_tier(diff)
if tier is not None:
db.audit(
conn, "evaluate", "deterministic_tier",
@@ -328,7 +177,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Order matters: claim-shape catches obvious cases, random promotion catches the rest.
# Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus)
if tier == "LIGHT" and _diff_contains_claim_type(diff):
if tier == "LIGHT" and diff_contains_claim_type(diff):
tier = "STANDARD"
logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number)
db.audit(
@@ -384,7 +233,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number))
return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"}
domain_verdict = _parse_verdict(domain_review, agent)
domain_verdict = parse_verdict(domain_review, agent)
conn.execute(
"UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?",
(domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number),
@@ -402,7 +251,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# If domain review rejects, skip Leo review (save Opus)
if domain_verdict == "request_changes":
logger.info("PR #%d: domain rejected, skipping Leo review", pr_number)
domain_issues = _parse_issues(domain_review) if domain_review else []
domain_issues = parse_issues(domain_review) if domain_review else []
reopen_pr(conn, pr_number, leo_verdict='skipped',
last_error='domain review requested changes',
eval_issues=json.dumps(domain_issues))
@@ -416,7 +265,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
await dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues)
if domain_verdict != "skipped":
pr_cost += costs.record_usage(
@@ -456,7 +305,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed"
return {"pr": pr_number, "skipped": True, "reason": reason}
leo_verdict = _parse_verdict(leo_review, "LEO")
leo_verdict = parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number))
# Post Leo review as comment (from Leo's Forgejo account)
@@ -484,7 +333,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
# Submit formal Forgejo reviews (required for merge)
await _post_formal_approvals(pr_number, pr_author)
await post_formal_approvals(pr_number, pr_author)
# Auto-merge agent PRs: if branch is NOT pipeline-owned, set auto_merge=1
# so the merge cycle picks it up without manual intervention.
@@ -513,9 +362,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
# Collect all issue tags from both reviews
all_issues = []
if domain_verdict == "request_changes" and domain_review is not None:
all_issues.extend(_parse_issues(domain_review))
all_issues.extend(parse_issues(domain_review))
if leo_verdict == "request_changes" and leo_review is not None:
all_issues.extend(_parse_issues(leo_review))
all_issues.extend(parse_issues(leo_review))
reopen_pr(conn, pr_number, eval_issues=json.dumps(all_issues))
# Store feedback for re-extraction path
@@ -547,7 +396,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict:
)
# Disposition: check if this PR should be terminated or kept open
await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
await dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues)
# Record cost (only for reviews that actually ran)
if domain_verdict != "skipped":
@@ -647,7 +496,7 @@ async def _run_batch_domain_eval(
continue
# Musings bypass
if _is_musings_only(diff):
if is_musings_only(diff):
await forgejo_api(
"POST",
repo_path(f"issues/{pr_num}/comments"),
@@ -658,10 +507,10 @@ async def _run_batch_domain_eval(
succeeded += 1
continue
review_diff, _ = _filter_diff(diff)
review_diff, _ = filter_diff(diff)
if not review_diff:
review_diff = diff
files = _extract_changed_files(diff)
files = extract_changed_files(diff)
# Build label from branch name or first claim filename
branch = pr_row.get("branch", "")
@@ -731,8 +580,8 @@ async def _run_batch_domain_eval(
return 0, len(claimed_prs)
# Step 3: Parse + validate fan-out
parsed = _parse_batch_response(batch_response, claimed_prs, agent)
valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent)
parsed = parse_batch_response(batch_response, claimed_prs, agent)
valid_reviews, fallback_prs = validate_batch_fanout(parsed, pr_diffs, agent)
db.audit(
conn, "evaluate", "batch_domain_review",
@@ -769,7 +618,7 @@ async def _run_batch_domain_eval(
continue
review_text = valid_reviews[pr_num]
domain_verdict = _parse_verdict(review_text, agent)
domain_verdict = parse_verdict(review_text, agent)
# Post domain review comment
agent_tok = get_agent_token(agent)
@@ -787,7 +636,7 @@ async def _run_batch_domain_eval(
# If domain rejects, handle disposition (same as individual path)
if domain_verdict == "request_changes":
domain_issues = _parse_issues(review_text)
domain_issues = parse_issues(review_text)
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
@@ -799,7 +648,7 @@ async def _run_batch_domain_eval(
conn, "evaluate", "domain_rejected",
json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}),
)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
await dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues)
succeeded += 1
continue
@@ -821,7 +670,7 @@ async def _run_batch_domain_eval(
logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num)
continue
leo_verdict = _parse_verdict(leo_review, "LEO")
leo_verdict = parse_verdict(leo_review, "LEO")
conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_num))
# Post Leo review
@@ -846,7 +695,7 @@ async def _run_batch_domain_eval(
if both_approve:
pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}"))
pr_author = pr_info.get("user", {}).get("login", "") if pr_info else ""
await _post_formal_approvals(pr_num, pr_author)
await post_formal_approvals(pr_num, pr_author)
approve_pr(conn, pr_num, domain=domain or 'cross-domain', auto_merge=1)
db.audit(
conn, "evaluate", "approved",
@@ -857,7 +706,7 @@ async def _run_batch_domain_eval(
else:
all_issues = []
if leo_verdict == "request_changes":
all_issues.extend(_parse_issues(leo_review))
all_issues.extend(parse_issues(leo_review))
reopen_pr(conn, pr_num, eval_issues=json.dumps(all_issues))
feedback = {"leo": leo_verdict, "domain": domain_verdict,
"tier": "STANDARD", "issues": all_issues}
@@ -873,7 +722,7 @@ async def _run_batch_domain_eval(
eval_attempts = (conn.execute(
"SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,)
).fetchone()["eval_attempts"] or 0)
await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
await dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues)
succeeded += 1

tests/test_eval_actions.py (new file, +290 lines)

@@ -0,0 +1,290 @@
"""Tests for lib/eval_actions.py — async PR disposition actions."""
import asyncio
import json
import sqlite3
import sys
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# Mock heavy dependencies before importing
sys.modules.setdefault("aiohttp", MagicMock())
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.eval_actions import dispose_rejected_pr, post_formal_approvals, terminate_pr
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_db():
"""Create a minimal in-memory DB with prs + sources tables."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE prs (
number INTEGER PRIMARY KEY,
source_path TEXT,
branch TEXT,
status TEXT NOT NULL DEFAULT 'open',
domain TEXT,
agent TEXT,
auto_merge INTEGER DEFAULT 0,
leo_verdict TEXT DEFAULT 'pending',
domain_verdict TEXT DEFAULT 'pending',
eval_attempts INTEGER DEFAULT 0,
eval_issues TEXT DEFAULT '[]',
merge_cycled INTEGER DEFAULT 0,
merge_failures INTEGER DEFAULT 0,
conflict_rebase_attempts INTEGER DEFAULT 0,
last_error TEXT,
merged_at TEXT,
last_attempt TEXT,
cost_usd REAL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
)
""")
conn.execute("""
CREATE TABLE sources (
path TEXT PRIMARY KEY,
status TEXT DEFAULT 'extracted',
feedback TEXT,
updated_at TEXT
)
""")
conn.execute("""
CREATE TABLE audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stage TEXT,
event TEXT,
detail TEXT,
ts TEXT DEFAULT (datetime('now'))
)
""")
return conn
def _insert_pr(conn, number=100, status="open", source_path=None, eval_issues=None, **kwargs):
"""Insert a test PR row."""
cols = ["number", "status"]
vals = [number, status]
if source_path is not None:
cols.append("source_path")
vals.append(source_path)
if eval_issues is not None:
cols.append("eval_issues")
vals.append(eval_issues)
for k, v in kwargs.items():
cols.append(k)
vals.append(v)
placeholders = ", ".join("?" * len(cols))
conn.execute(f"INSERT INTO prs ({', '.join(cols)}) VALUES ({placeholders})", vals)
conn.commit()
def _run(coro):
"""Run async coroutine in sync test."""
return asyncio.run(coro)
# ---------------------------------------------------------------------------
# post_formal_approvals
# ---------------------------------------------------------------------------
class TestPostFormalApprovals:
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
@patch("lib.eval_actions.get_agent_token")
def test_posts_two_approvals(self, mock_token, mock_api):
mock_token.return_value = "fake-token"
mock_api.return_value = {"id": 1}
_run(post_formal_approvals(100, "epimetheus"))
# Should have called forgejo_api exactly 2 times (first 2 agents that aren't author)
assert mock_api.call_count == 2
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
@patch("lib.eval_actions.get_agent_token")
def test_skips_pr_author(self, mock_token, mock_api):
mock_token.return_value = "fake-token"
mock_api.return_value = {"id": 1}
_run(post_formal_approvals(100, "leo"))
# Leo is first in the list, should be skipped, next 2 agents used
assert mock_api.call_count == 2
for call in mock_token.call_args_list:
    assert call.args[0] != "leo"
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
@patch("lib.eval_actions.get_agent_token")
def test_handles_no_tokens(self, mock_token, mock_api):
mock_token.return_value = None
_run(post_formal_approvals(100, "someone"))
# No tokens available — no API calls
mock_api.assert_not_called()
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
@patch("lib.eval_actions.get_agent_token")
def test_continues_on_api_failure(self, mock_token, mock_api):
mock_token.return_value = "fake-token"
# First call fails (returns None), subsequent succeed
mock_api.side_effect = [None, {"id": 1}, {"id": 2}, {"id": 3}]
_run(post_formal_approvals(100, "nobody"))
# Should keep trying until 2 successes
assert mock_api.call_count >= 3
# ---------------------------------------------------------------------------
# terminate_pr
# ---------------------------------------------------------------------------
class TestTerminatePr:
@patch("lib.eval_actions.close_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_closes_pr_and_requeues_source(self, mock_api, mock_close):
mock_api.return_value = {"id": 1}
mock_close.return_value = True
conn = _make_db()
_insert_pr(conn, 100, source_path="inbox/queue/test.md",
eval_issues='["factual_discrepancy"]')
conn.execute("INSERT INTO sources (path, status) VALUES (?, ?)",
("inbox/queue/test.md", "extracted"))
conn.commit()
_run(terminate_pr(conn, 100, "test reason"))
# close_pr called
mock_close.assert_called_once()
# Source requeued
src = conn.execute("SELECT status FROM sources WHERE path = ?",
("inbox/queue/test.md",)).fetchone()
assert src["status"] == "needs_reextraction"
# Audit logged
audit = conn.execute("SELECT * FROM audit_log WHERE event = 'pr_terminated'").fetchone()
assert audit is not None
@patch("lib.eval_actions.close_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_skips_requeue_on_close_failure(self, mock_api, mock_close):
mock_api.return_value = {"id": 1}
mock_close.return_value = False
conn = _make_db()
_insert_pr(conn, 100, source_path="inbox/queue/test.md")
conn.execute("INSERT INTO sources (path, status) VALUES (?, ?)",
("inbox/queue/test.md", "extracted"))
conn.commit()
_run(terminate_pr(conn, 100, "test reason"))
# Source NOT requeued
src = conn.execute("SELECT status FROM sources WHERE path = ?",
("inbox/queue/test.md",)).fetchone()
assert src["status"] == "extracted"
# No audit for pr_terminated
audit = conn.execute("SELECT * FROM audit_log WHERE event = 'pr_terminated'").fetchone()
assert audit is None
@patch("lib.eval_actions.close_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_posts_structured_feedback_with_issues(self, mock_api, mock_close):
mock_api.return_value = {"id": 1}
mock_close.return_value = True
conn = _make_db()
_insert_pr(conn, 100, eval_issues='["frontmatter_schema", "near_duplicate"]')
_run(terminate_pr(conn, 100, "budget exhausted"))
# Comment posted with structured feedback
comment_call = mock_api.call_args_list[0]
body = comment_call[0][2]["body"]
assert "Closed by eval pipeline" in body
@patch("lib.eval_actions.close_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_handles_no_source_path(self, mock_api, mock_close):
mock_api.return_value = {"id": 1}
mock_close.return_value = True
conn = _make_db()
_insert_pr(conn, 100) # No source_path
_run(terminate_pr(conn, 100, "no source"))
# Should not crash, audit should still be logged
audit = conn.execute("SELECT * FROM audit_log WHERE event = 'pr_terminated'").fetchone()
assert audit is not None
# ---------------------------------------------------------------------------
# dispose_rejected_pr
# ---------------------------------------------------------------------------
class TestDisposeRejectedPr:
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_attempt_1_posts_feedback_only(self, mock_api):
mock_api.return_value = {"id": 1}
conn = _make_db()
_insert_pr(conn, 100)
_run(dispose_rejected_pr(conn, 100, eval_attempts=1, all_issues=["factual_discrepancy"]))
# Should post feedback comment but NOT close
assert mock_api.call_count == 1 # Just the comment
pr = conn.execute("SELECT status FROM prs WHERE number = 100").fetchone()
assert pr["status"] == "open" # Still open
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_attempt_1_no_issues_no_action(self, mock_api):
conn = _make_db()
_insert_pr(conn, 100)
_run(dispose_rejected_pr(conn, 100, eval_attempts=1, all_issues=[]))
mock_api.assert_not_called()
@patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_attempt_max_terminates(self, mock_api, mock_terminate):
conn = _make_db()
_insert_pr(conn, 100)
_run(dispose_rejected_pr(conn, 100, eval_attempts=3, all_issues=["factual_discrepancy"]))
mock_terminate.assert_called_once()
assert "eval budget exhausted" in mock_terminate.call_args[0][2]
@patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_attempt_2_mechanical_keeps_open(self, mock_api, mock_terminate):
conn = _make_db()
_insert_pr(conn, 100)
_run(dispose_rejected_pr(conn, 100, eval_attempts=2, all_issues=["frontmatter_schema"]))
# Should NOT terminate — mechanical issues get one more try
mock_terminate.assert_not_called()
# Should log audit
audit = conn.execute("SELECT * FROM audit_log WHERE event = 'mechanical_retry'").fetchone()
assert audit is not None
@patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_attempt_2_substantive_terminates(self, mock_api, mock_terminate):
conn = _make_db()
_insert_pr(conn, 100)
_run(dispose_rejected_pr(conn, 100, eval_attempts=2, all_issues=["factual_discrepancy"]))
mock_terminate.assert_called_once()
assert "substantive issues" in mock_terminate.call_args[0][2]
@patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock)
@patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
def test_attempt_2_mixed_terminates(self, mock_api, mock_terminate):
conn = _make_db()
_insert_pr(conn, 100)
_run(dispose_rejected_pr(conn, 100, eval_attempts=2,
all_issues=["frontmatter_schema", "factual_discrepancy"]))
mock_terminate.assert_called_once()

tests/test_eval_parse.py

@@ -13,23 +13,23 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from lib.eval_parse import (
VALID_ISSUE_TAGS,
_classify_issues,
_deterministic_tier,
_diff_contains_claim_type,
_extract_changed_files,
_filter_diff,
_infer_issues_from_prose,
_is_musings_only,
_normalize_tag,
_parse_batch_response,
_parse_issues,
_parse_verdict,
_validate_batch_fanout,
classify_issues,
deterministic_tier,
diff_contains_claim_type,
extract_changed_files,
filter_diff,
infer_issues_from_prose,
is_musings_only,
normalize_tag,
parse_batch_response,
parse_issues,
parse_verdict,
validate_batch_fanout,
)
# ---------------------------------------------------------------------------
# _filter_diff
# filter_diff
# ---------------------------------------------------------------------------
class TestFilterDiff:
@@ -40,7 +40,7 @@ class TestFilterDiff:
"diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n"
"+real content\n"
)
review_diff, entity_diff = _filter_diff(diff)
review_diff, entity_diff = filter_diff(diff)
assert "inbox" not in review_diff
assert "claim.md" in review_diff
@@ -51,7 +51,7 @@ class TestFilterDiff:
"diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n"
"+claim content\n"
)
review_diff, entity_diff = _filter_diff(diff)
review_diff, entity_diff = filter_diff(diff)
assert "alice.md" in entity_diff
assert "claim.md" in review_diff
@@ -60,13 +60,13 @@ class TestFilterDiff:
"diff --git a/entities/living-agents/agent.md b/entities/living-agents/agent.md\n"
"+core entity\n"
)
review_diff, entity_diff = _filter_diff(diff)
review_diff, entity_diff = filter_diff(diff)
assert "agent.md" in review_diff
assert entity_diff == ""
# ---------------------------------------------------------------------------
# _extract_changed_files
# extract_changed_files
# ---------------------------------------------------------------------------
class TestExtractChangedFiles:
@@ -77,189 +77,189 @@ class TestExtractChangedFiles:
"diff --git a/entities/bar.md b/entities/bar.md\n"
"+more\n"
)
result = _extract_changed_files(diff)
result = extract_changed_files(diff)
assert "domains/foo.md" in result
assert "entities/bar.md" in result
def test_empty_diff(self):
assert _extract_changed_files("") == ""
assert extract_changed_files("") == ""
# ---------------------------------------------------------------------------
# _is_musings_only
# is_musings_only
# ---------------------------------------------------------------------------
class TestIsMusingsOnly:
def test_musings_only(self):
diff = "diff --git a/agents/rio/musings/thought.md b/agents/rio/musings/thought.md\n+musing\n"
assert _is_musings_only(diff) is True
assert is_musings_only(diff) is True
def test_mixed_files(self):
diff = (
"diff --git a/agents/rio/musings/thought.md b/agents/rio/musings/thought.md\n+musing\n"
"diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n+claim\n"
)
assert _is_musings_only(diff) is False
assert is_musings_only(diff) is False
def test_no_musings(self):
diff = "diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n+claim\n"
assert _is_musings_only(diff) is False
assert is_musings_only(diff) is False
# ---------------------------------------------------------------------------
# _diff_contains_claim_type
# diff_contains_claim_type
# ---------------------------------------------------------------------------
class TestDiffContainsClaimType:
def test_detects_claim_type(self):
diff = "+type: claim\n+confidence: 0.8\n"
assert _diff_contains_claim_type(diff) is True
assert diff_contains_claim_type(diff) is True
def test_ignores_non_claim(self):
diff = "+type: entity\n"
assert _diff_contains_claim_type(diff) is False
assert diff_contains_claim_type(diff) is False
def test_ignores_file_header(self):
diff = "+++ b/type: claim\n"
assert _diff_contains_claim_type(diff) is False
assert diff_contains_claim_type(diff) is False
# ---------------------------------------------------------------------------
# _deterministic_tier
# deterministic_tier
# ---------------------------------------------------------------------------
class TestDeterministicTier:
def test_entities_only_is_light(self):
diff = "diff --git a/entities/people/alice.md b/entities/people/alice.md\n+data\n"
assert _deterministic_tier(diff) == "LIGHT"
assert deterministic_tier(diff) == "LIGHT"
def test_inbox_only_is_light(self):
diff = "diff --git a/inbox/archive/src.md b/inbox/archive/src.md\n+data\n"
assert _deterministic_tier(diff) == "LIGHT"
assert deterministic_tier(diff) == "LIGHT"
def test_core_is_deep(self):
diff = "diff --git a/core/schema.md b/core/schema.md\n+structural change\n"
assert _deterministic_tier(diff) == "DEEP"
assert deterministic_tier(diff) == "DEEP"
def test_challenged_by_is_deep(self):
diff = "diff --git a/domains/x/claim.md b/domains/x/claim.md\n+challenged_by: some-claim\n"
assert _deterministic_tier(diff) == "DEEP"
assert deterministic_tier(diff) == "DEEP"
def test_domain_claim_returns_none(self):
diff = "diff --git a/domains/finance/new-claim.md b/domains/finance/new-claim.md\n--- /dev/null\n+++ b/domains/finance/new-claim.md\n+type: claim\n"
assert _deterministic_tier(diff) is None
assert deterministic_tier(diff) is None
def test_empty_diff_returns_none(self):
assert _deterministic_tier("") is None
assert deterministic_tier("") is None
# ---------------------------------------------------------------------------
# _parse_verdict
# parse_verdict
# ---------------------------------------------------------------------------
class TestParseVerdict:
def test_approve(self):
assert _parse_verdict("VERDICT:LEO:APPROVE", "LEO") == "approve"
assert parse_verdict("VERDICT:LEO:APPROVE", "LEO") == "approve"
def test_request_changes(self):
assert _parse_verdict("VERDICT:LEO:REQUEST_CHANGES", "LEO") == "request_changes"
assert parse_verdict("VERDICT:LEO:REQUEST_CHANGES", "LEO") == "request_changes"
def test_no_verdict_defaults_to_request_changes(self):
assert _parse_verdict("some review text", "LEO") == "request_changes"
assert parse_verdict("some review text", "LEO") == "request_changes"
def test_case_insensitive_reviewer(self):
assert _parse_verdict("VERDICT:LEO:APPROVE", "leo") == "approve"
assert parse_verdict("VERDICT:LEO:APPROVE", "leo") == "approve"
# ---------------------------------------------------------------------------
# _normalize_tag
# normalize_tag
# ---------------------------------------------------------------------------
class TestNormalizeTag:
def test_valid_tag_passes_through(self):
assert _normalize_tag("broken_wiki_links") == "broken_wiki_links"
assert normalize_tag("broken_wiki_links") == "broken_wiki_links"
def test_alias_normalizes(self):
assert _normalize_tag("schema_violation") == "frontmatter_schema"
assert _normalize_tag("duplicate") == "near_duplicate"
assert _normalize_tag("factual_error") == "factual_discrepancy"
assert normalize_tag("schema_violation") == "frontmatter_schema"
assert normalize_tag("duplicate") == "near_duplicate"
assert normalize_tag("factual_error") == "factual_discrepancy"
def test_hyphen_to_underscore(self):
assert _normalize_tag("unverified-wiki-links") == "broken_wiki_links"
assert normalize_tag("unverified-wiki-links") == "broken_wiki_links"
def test_unknown_returns_none(self):
assert _normalize_tag("totally_made_up") is None
assert normalize_tag("totally_made_up") is None
def test_fuzzy_substring_match(self):
assert _normalize_tag("wiki_links") == "broken_wiki_links"
assert normalize_tag("wiki_links") == "broken_wiki_links"
# ---------------------------------------------------------------------------
# _parse_issues
# parse_issues
# ---------------------------------------------------------------------------
class TestParseIssues:
def test_structured_comment(self):
text = "Review text. <!-- ISSUES: schema_violation, duplicate -->"
result = _parse_issues(text)
result = parse_issues(text)
assert "frontmatter_schema" in result
assert "near_duplicate" in result
def test_fallback_to_prose(self):
text = "The frontmatter is missing required fields."
result = _parse_issues(text)
result = parse_issues(text)
assert "frontmatter_schema" in result
def test_empty_structured_falls_back(self):
text = "Review. <!-- ISSUES: totally_unknown_tag -->"
# All tags unrecognizable, falls through to prose
result = _parse_issues(text)
result = parse_issues(text)
# With no prose keywords either, should return empty
assert isinstance(result, list)
# ---------------------------------------------------------------------------
# _infer_issues_from_prose
# infer_issues_from_prose
# ---------------------------------------------------------------------------
class TestInferIssuesFromProse:
def test_detects_frontmatter_issues(self):
result = _infer_issues_from_prose("The YAML frontmatter is invalid.")
result = infer_issues_from_prose("The YAML frontmatter is invalid.")
assert "frontmatter_schema" in result
def test_detects_wiki_link_issues(self):
result = _infer_issues_from_prose("Found broken wiki links in the claim.")
result = infer_issues_from_prose("Found broken wiki links in the claim.")
assert "broken_wiki_links" in result
def test_detects_near_duplicate(self):
result = _infer_issues_from_prose("This is a near-duplicate of an existing claim.")
result = infer_issues_from_prose("This is a near-duplicate of an existing claim.")
assert "near_duplicate" in result
def test_no_matches(self):
result = _infer_issues_from_prose("This claim is well-structured and accurate.")
result = infer_issues_from_prose("This claim is well-structured and accurate.")
assert result == []
# ---------------------------------------------------------------------------
# _classify_issues
# classify_issues
# ---------------------------------------------------------------------------
class TestClassifyIssues:
def test_mechanical_only(self):
assert _classify_issues(["frontmatter_schema", "near_duplicate"]) == "mechanical"
assert classify_issues(["frontmatter_schema", "near_duplicate"]) == "mechanical"
def test_substantive_only(self):
assert _classify_issues(["factual_discrepancy"]) == "substantive"
assert classify_issues(["factual_discrepancy"]) == "substantive"
def test_mixed(self):
assert _classify_issues(["frontmatter_schema", "factual_discrepancy"]) == "mixed"
assert classify_issues(["frontmatter_schema", "factual_discrepancy"]) == "mixed"
def test_empty(self):
assert _classify_issues([]) == "unknown"
assert classify_issues([]) == "unknown"
# ---------------------------------------------------------------------------
# _parse_batch_response
# parse_batch_response
# ---------------------------------------------------------------------------
class TestParseBatchResponse:
@@ -268,7 +268,7 @@ class TestParseBatchResponse:
"=== PR #100\nReview for 100\n<!-- PR:100 VERDICT:AGENT:APPROVE -->\n"
"=== PR #200\nReview for 200\n<!-- PR:200 VERDICT:AGENT:REQUEST_CHANGES -->\n"
)
result = _parse_batch_response(response, [100, 200], "agent")
result = parse_batch_response(response, [100, 200], "agent")
assert 100 in result
assert 200 in result
assert "APPROVE" in result[100]
@@ -276,17 +276,17 @@ class TestParseBatchResponse:
def test_missing_pr(self):
response = "=== PR #100\nReview\n<!-- PR:100 VERDICT:AGENT:APPROVE -->\n"
result = _parse_batch_response(response, [100, 200], "agent")
result = parse_batch_response(response, [100, 200], "agent")
assert 100 in result
assert 200 not in result
def test_empty_response(self):
result = _parse_batch_response("no verdicts here", [100], "agent")
result = parse_batch_response("no verdicts here", [100], "agent")
assert result == {}
# ---------------------------------------------------------------------------
# _validate_batch_fanout
# validate_batch_fanout
# ---------------------------------------------------------------------------
class TestValidateBatchFanout:
@@ -296,7 +296,7 @@ class TestValidateBatchFanout:
{"number": 100, "diff": "diff --git a/domains/long-enough-path.md b/domains/long-enough-path.md\n"},
{"number": 200, "diff": "diff --git a/domains/other-long-path.md b/domains/other-long-path.md\n"},
]
valid, fallback = _validate_batch_fanout(parsed, pr_diffs, "agent")
valid, fallback = validate_batch_fanout(parsed, pr_diffs, "agent")
assert 200 in fallback
assert 200 not in valid
@@ -309,7 +309,7 @@ class TestValidateBatchFanout:
{"number": 100, "diff": "diff --git a/domains/long-enough-path.md b/domains/long-enough-path.md\n"},
{"number": 200, "diff": "diff --git a/domains/other-long-path.md b/domains/other-long-path.md\n"},
]
valid, fallback = _validate_batch_fanout(parsed, pr_diffs, "agent")
valid, fallback = validate_batch_fanout(parsed, pr_diffs, "agent")
assert 100 in valid
assert 200 in valid
assert fallback == []