diff --git a/lib/eval_actions.py b/lib/eval_actions.py
new file mode 100644
index 0000000..30b816d
--- /dev/null
+++ b/lib/eval_actions.py
@@ -0,0 +1,167 @@
+"""PR disposition actions — async Forgejo + DB operations for end-of-eval decisions.
+
+Extracted from evaluate.py to isolate the "do something to this PR" functions
+from orchestration logic. Contains:
+
+- post_formal_approvals: submit Forgejo reviews from 2 agents (not PR author)
+- terminate_pr: close PR, post rejection comment, requeue source
+- dispose_rejected_pr: disposition logic for rejected PRs on attempt 2+
+
+All functions are async (Forgejo API calls). Dependencies: forgejo, db, config,
+pr_state, feedback, eval_parse.
+"""
+
+import json
+import logging
+
+from . import config, db
+from .eval_parse import classify_issues
+from .feedback import format_rejection_comment
+from .forgejo import api as forgejo_api, get_agent_token, repo_path
+from .pr_state import close_pr
+
+logger = logging.getLogger("pipeline.eval_actions")
+
+
+async def post_formal_approvals(pr_number: int, pr_author: str):
+    """Submit formal Forgejo reviews from 2 agents (not the PR author)."""
+    approvals = 0
+    for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
+        if agent_name == pr_author:
+            continue
+        if approvals >= 2:
+            break
+        token = get_agent_token(agent_name)
+        if token:
+            result = await forgejo_api(
+                "POST",
+                repo_path(f"pulls/{pr_number}/reviews"),
+                {"body": "Approved.", "event": "APPROVED"},
+                token=token,
+            )
+            if result is not None:
+                approvals += 1
+                logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
+
+
+async def terminate_pr(conn, pr_number: int, reason: str):
+    """Terminal state: close PR on Forgejo, requeue source for re-extraction."""
+    # Get issue tags for structured feedback
+    row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
+    issues = []
+    if row and row["eval_issues"]:
+        try:
+            issues = json.loads(row["eval_issues"])
+        except (json.JSONDecodeError, TypeError):
+            pass
+
+    # Post structured rejection comment with quality gate guidance
+    if issues:
+        feedback_body = format_rejection_comment(issues, source="eval_terminal")
+        comment_body = (
+            f"**Closed by eval pipeline** — {reason}.\n\n"
+            f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
+            f"Source will be re-queued with feedback.\n\n"
+            f"{feedback_body}"
+        )
+    else:
+        comment_body = (
+            f"**Closed by eval pipeline** — {reason}.\n\n"
+            f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
+            f"Source will be re-queued with feedback."
+ ) + + await forgejo_api( + "POST", + repo_path(f"issues/{pr_number}/comments"), + {"body": comment_body}, + ) + closed = await close_pr(conn, pr_number, last_error=reason) + if not closed: + logger.warning("PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle", pr_number) + return + + # Tag source for re-extraction with feedback + cursor = conn.execute( + """UPDATE sources SET status = 'needs_reextraction', + updated_at = datetime('now') + WHERE path = (SELECT source_path FROM prs WHERE number = ?)""", + (pr_number,), + ) + if cursor.rowcount == 0: + logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number) + + db.audit( + conn, + "evaluate", + "pr_terminated", + json.dumps( + { + "pr": pr_number, + "reason": reason, + } + ), + ) + logger.info("PR #%d: TERMINATED — %s", pr_number, reason) + + +async def dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]): + """Disposition logic for rejected PRs on attempt 2+. + + Attempt 1: normal — back to open, wait for fix. + Attempt 2: check issue classification. + - Mechanical only: keep open for one more attempt (auto-fix future). + - Substantive or mixed: close PR, requeue source. + Attempt 3+: terminal. + """ + if eval_attempts < 2: + # Attempt 1: post structured feedback so agent learns, but don't close + if all_issues: + feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1") + await forgejo_api( + "POST", + repo_path(f"issues/{pr_number}/comments"), + {"body": feedback_body}, + ) + return + + classification = classify_issues(all_issues) + + if eval_attempts >= config.MAX_EVAL_ATTEMPTS: + # Terminal + await terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts") + return + + if classification == "mechanical": + # Mechanical issues only — keep open for one more attempt. + # Future: auto-fix module will push fixes here. + logger.info( + "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt", + pr_number, + eval_attempts, + all_issues, + ) + db.audit( + conn, + "evaluate", + "mechanical_retry", + json.dumps( + { + "pr": pr_number, + "attempt": eval_attempts, + "issues": all_issues, + } + ), + ) + else: + # Substantive, mixed, or unknown — close and requeue + logger.info( + "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source", + pr_number, + eval_attempts, + classification, + all_issues, + ) + await terminate_pr( + conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}" + ) diff --git a/lib/eval_parse.py b/lib/eval_parse.py index cc94753..7222050 100644 --- a/lib/eval_parse.py +++ b/lib/eval_parse.py @@ -10,7 +10,7 @@ Contents: All functions are pure (input → output). The only external dependency is config.MECHANICAL_ISSUE_TAGS / config.SUBSTANTIVE_ISSUE_TAGS for -_classify_issues. +classify_issues. """ import logging @@ -24,7 +24,7 @@ logger = logging.getLogger("pipeline.eval_parse") # ─── Diff helpers ────────────────────────────────────────────────────────── -def _filter_diff(diff: str) -> tuple[str, str]: +def filter_diff(diff: str) -> tuple[str, str]: """Filter diff to only review-relevant files. Returns (review_diff, entity_diff). 
@@ -51,14 +51,14 @@ def _filter_diff(diff: str) -> tuple[str, str]: return "".join(claim_sections), "".join(entity_sections) -def _extract_changed_files(diff: str) -> str: +def extract_changed_files(diff: str) -> str: """Extract changed file paths from diff.""" return "\n".join( line.replace("diff --git a/", "").split(" b/")[0] for line in diff.split("\n") if line.startswith("diff --git") ) -def _is_musings_only(diff: str) -> bool: +def is_musings_only(diff: str) -> bool: """Check if PR only modifies musing files.""" has_musings = False has_other = False @@ -71,7 +71,7 @@ def _is_musings_only(diff: str) -> bool: return has_musings and not has_other -def _diff_contains_claim_type(diff: str) -> bool: +def diff_contains_claim_type(diff: str) -> bool: """Claim-shape detector: check if any file in diff has type: claim in frontmatter. Mechanical check ($0). If YAML declares type: claim, this is a factual claim — @@ -87,7 +87,7 @@ def _diff_contains_claim_type(diff: str) -> bool: return False -def _deterministic_tier(diff: str) -> str | None: +def deterministic_tier(diff: str) -> str | None: """Deterministic tier routing — skip Haiku triage for obvious cases. Checks diff file patterns before calling the LLM. Returns tier string @@ -157,7 +157,7 @@ def _deterministic_tier(diff: str) -> str | None: # ─── Verdict parsing ────────────────────────────────────────────────────── -def _parse_verdict(review_text: str, reviewer: str) -> str: +def parse_verdict(review_text: str, reviewer: str) -> str: """Parse VERDICT tag from review. Returns 'approve' or 'request_changes'.""" upper = reviewer.upper() if f"VERDICT:{upper}:APPROVE" in review_text: @@ -200,7 +200,7 @@ VALID_ISSUE_TAGS = {"broken_wiki_links", "frontmatter_schema", "title_overclaims "near_duplicate", "scope_error"} -def _normalize_tag(tag: str) -> str | None: +def normalize_tag(tag: str) -> str | None: """Normalize a model-generated tag to a valid tag, or None if unrecognizable.""" tag = tag.strip().lower().replace("-", "_") if tag in VALID_ISSUE_TAGS: @@ -262,7 +262,7 @@ _PROSE_TAG_PATTERNS: dict[str, list[re.Pattern]] = { } -def _parse_issues(review_text: str) -> list[str]: +def parse_issues(review_text: str) -> list[str]: """Extract issue tags from review. First tries structured comment with tag normalization. @@ -273,7 +273,7 @@ def _parse_issues(review_text: str) -> list[str]: raw_tags = [tag.strip() for tag in match.group(1).split(",") if tag.strip()] normalized = [] for tag in raw_tags: - norm = _normalize_tag(tag) + norm = normalize_tag(tag) if norm and norm not in normalized: normalized.append(norm) else: @@ -281,10 +281,10 @@ def _parse_issues(review_text: str) -> list[str]: if normalized: return normalized # Fallback: infer tags from review prose - return _infer_issues_from_prose(review_text) + return infer_issues_from_prose(review_text) -def _infer_issues_from_prose(review_text: str) -> list[str]: +def infer_issues_from_prose(review_text: str) -> list[str]: """Infer issue tags from unstructured review text via keyword matching. Fallback for reviews that reject without structured tags. 
@@ -297,7 +297,7 @@ def _infer_issues_from_prose(review_text: str) -> list[str]: return inferred -def _classify_issues(issues: list[str]) -> str: +def classify_issues(issues: list[str]) -> str: """Classify issue tags as 'mechanical', 'substantive', or 'mixed'.""" if not issues: return "unknown" @@ -315,7 +315,7 @@ def _classify_issues(issues: list[str]) -> str: # ─── Batch response parsing ─────────────────────────────────────────────── -def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]: +def parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> dict[int, str]: """Parse batched domain review into per-PR review sections. Returns {pr_number: review_text} for each PR found in the response. @@ -364,7 +364,7 @@ def _parse_batch_response(response: str, pr_numbers: list[int], agent: str) -> d return result -def _validate_batch_fanout( +def validate_batch_fanout( parsed: dict[int, str], pr_diffs: list[dict], agent: str, diff --git a/lib/evaluate.py b/lib/evaluate.py index ad726c2..dc92fbb 100644 --- a/lib/evaluate.py +++ b/lib/evaluate.py @@ -26,25 +26,21 @@ from datetime import datetime, timezone from . import config, db from .domains import agent_for_domain, detect_domain_from_branch, detect_domain_from_diff from .eval_parse import ( - VALID_ISSUE_TAGS, - _classify_issues, - _deterministic_tier, - _diff_contains_claim_type, - _extract_changed_files, - _filter_diff, - _infer_issues_from_prose, - _is_musings_only, - _normalize_tag, - _parse_batch_response, - _parse_issues, - _parse_verdict, - _validate_batch_fanout, + deterministic_tier, + diff_contains_claim_type, + extract_changed_files, + filter_diff, + is_musings_only, + parse_batch_response, + parse_issues, + parse_verdict, + validate_batch_fanout, ) from .forgejo import api as forgejo_api from .forgejo import get_agent_token, get_pr_diff, repo_path from .merge import PIPELINE_OWNED_PREFIXES from .llm import run_batch_domain_review, run_domain_review, run_leo_review, triage_pr -from .feedback import format_rejection_comment +from .eval_actions import dispose_rejected_pr, post_formal_approvals, terminate_pr from .pr_state import approve_pr, close_pr, reopen_pr, start_review from .validate import load_existing_claims @@ -57,153 +53,6 @@ logger = logging.getLogger("pipeline.evaluate") # PRs. Eval trusts that tier0_pass=1 means all mechanical checks passed. 
-async def _post_formal_approvals(pr_number: int, pr_author: str): - """Submit formal Forgejo reviews from 2 agents (not the PR author).""" - approvals = 0 - for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]: - if agent_name == pr_author: - continue - if approvals >= 2: - break - token = get_agent_token(agent_name) - if token: - result = await forgejo_api( - "POST", - repo_path(f"pulls/{pr_number}/reviews"), - {"body": "Approved.", "event": "APPROVED"}, - token=token, - ) - if result is not None: - approvals += 1 - logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals) - - -# ─── Retry budget helpers ───────────────────────────────────────────────── - - -async def _terminate_pr(conn, pr_number: int, reason: str): - """Terminal state: close PR on Forgejo, mark source needs_human.""" - # Get issue tags for structured feedback - row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone() - issues = [] - if row and row["eval_issues"]: - try: - issues = json.loads(row["eval_issues"]) - except (json.JSONDecodeError, TypeError): - pass - - # Post structured rejection comment with quality gate guidance (Epimetheus) - if issues: - feedback_body = format_rejection_comment(issues, source="eval_terminal") - comment_body = ( - f"**Closed by eval pipeline** — {reason}.\n\n" - f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " - f"Source will be re-queued with feedback.\n\n" - f"{feedback_body}" - ) - else: - comment_body = ( - f"**Closed by eval pipeline** — {reason}.\n\n" - f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. " - f"Source will be re-queued with feedback." - ) - - await forgejo_api( - "POST", - repo_path(f"issues/{pr_number}/comments"), - {"body": comment_body}, - ) - closed = await close_pr(conn, pr_number, last_error=reason) - if not closed: - logger.warning("PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle", pr_number) - return - - # Tag source for re-extraction with feedback - cursor = conn.execute( - """UPDATE sources SET status = 'needs_reextraction', - updated_at = datetime('now') - WHERE path = (SELECT source_path FROM prs WHERE number = ?)""", - (pr_number,), - ) - if cursor.rowcount == 0: - logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number) - - db.audit( - conn, - "evaluate", - "pr_terminated", - json.dumps( - { - "pr": pr_number, - "reason": reason, - } - ), - ) - logger.info("PR #%d: TERMINATED — %s", pr_number, reason) - - -async def _dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]): - """Disposition logic for rejected PRs on attempt 2+. - - Attempt 1: normal — back to open, wait for fix. - Attempt 2: check issue classification. - - Mechanical only: keep open for one more attempt (auto-fix future). - - Substantive or mixed: close PR, requeue source. - Attempt 3+: terminal. 
- """ - if eval_attempts < 2: - # Attempt 1: post structured feedback so agent learns, but don't close - if all_issues: - feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1") - await forgejo_api( - "POST", - repo_path(f"issues/{pr_number}/comments"), - {"body": feedback_body}, - ) - return - - classification = _classify_issues(all_issues) - - if eval_attempts >= config.MAX_EVAL_ATTEMPTS: - # Terminal - await _terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts") - return - - if classification == "mechanical": - # Mechanical issues only — keep open for one more attempt. - # Future: auto-fix module will push fixes here. - logger.info( - "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt", - pr_number, - eval_attempts, - all_issues, - ) - db.audit( - conn, - "evaluate", - "mechanical_retry", - json.dumps( - { - "pr": pr_number, - "attempt": eval_attempts, - "issues": all_issues, - } - ), - ) - else: - # Substantive, mixed, or unknown — close and requeue - logger.info( - "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source", - pr_number, - eval_attempts, - classification, - all_issues, - ) - await _terminate_pr( - conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}" - ) - - # ─── Single PR evaluation ───────────────────────────────────────────────── @@ -218,7 +67,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: if eval_attempts >= config.MAX_EVAL_ATTEMPTS: # Terminal — hard cap reached. Close PR, tag source. logger.warning("PR #%d: eval_attempts=%d >= %d, terminal", pr_number, eval_attempts, config.MAX_EVAL_ATTEMPTS) - await _terminate_pr(conn, pr_number, "eval budget exhausted") + await terminate_pr(conn, pr_number, "eval budget exhausted") return {"pr": pr_number, "terminal": True, "reason": "eval_budget_exhausted"} # Atomic claim — prevent concurrent workers from evaluating the same PR (Ganymede #11) @@ -249,7 +98,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: return {"pr": pr_number, "skipped": True, "reason": "no_diff_closed"} # Musings bypass - if _is_musings_only(diff): + if is_musings_only(diff): logger.info("PR #%d is musings-only — auto-approving", pr_number) await forgejo_api( "POST", @@ -285,10 +134,10 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # tier0_pass=1 guarantees all mechanical checks passed. No Tier 0.5 here. # Filter diff - review_diff, _entity_diff = _filter_diff(diff) + review_diff, _entity_diff = filter_diff(diff) if not review_diff: review_diff = diff - files = _extract_changed_files(diff) + files = extract_changed_files(diff) # Detect domain — try diff paths first, then branch prefix, then 'general' domain = detect_domain_from_diff(diff) @@ -309,7 +158,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # Step 1: Triage (if not already triaged) # Try deterministic routing first ($0), fall back to Haiku triage ($0.001) if tier is None: - tier = _deterministic_tier(diff) + tier = deterministic_tier(diff) if tier is not None: db.audit( conn, "evaluate", "deterministic_tier", @@ -328,7 +177,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # Order matters: claim-shape catches obvious cases, random promotion catches the rest. 
# Claim-shape detector: type: claim in YAML → STANDARD minimum (Theseus) - if tier == "LIGHT" and _diff_contains_claim_type(diff): + if tier == "LIGHT" and diff_contains_claim_type(diff): tier = "STANDARD" logger.info("PR #%d: claim-shape detector upgraded LIGHT → STANDARD (type: claim found)", pr_number) db.audit( @@ -384,7 +233,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: conn.execute("UPDATE prs SET cost_usd = cost_usd + ? WHERE number = ?", (pr_cost, pr_number)) return {"pr": pr_number, "skipped": True, "reason": "openrouter_failed"} - domain_verdict = _parse_verdict(domain_review, agent) + domain_verdict = parse_verdict(domain_review, agent) conn.execute( "UPDATE prs SET domain_verdict = ?, domain_model = ? WHERE number = ?", (domain_verdict, config.EVAL_DOMAIN_MODEL, pr_number), @@ -402,7 +251,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # If domain review rejects, skip Leo review (save Opus) if domain_verdict == "request_changes": logger.info("PR #%d: domain rejected, skipping Leo review", pr_number) - domain_issues = _parse_issues(domain_review) if domain_review else [] + domain_issues = parse_issues(domain_review) if domain_review else [] reopen_pr(conn, pr_number, leo_verdict='skipped', last_error='domain review requested changes', eval_issues=json.dumps(domain_issues)) @@ -416,7 +265,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) # Disposition: check if this PR should be terminated or kept open - await _dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) + await dispose_rejected_pr(conn, pr_number, eval_attempts, domain_issues) if domain_verdict != "skipped": pr_cost += costs.record_usage( @@ -456,7 +305,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: reason = "opus_rate_limited" if tier == "DEEP" else "openrouter_failed" return {"pr": pr_number, "skipped": True, "reason": reason} - leo_verdict = _parse_verdict(leo_review, "LEO") + leo_verdict = parse_verdict(leo_review, "LEO") conn.execute("UPDATE prs SET leo_verdict = ? WHERE number = ?", (leo_verdict, pr_number)) # Post Leo review as comment (from Leo's Forgejo account) @@ -484,7 +333,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: pr_author = pr_info.get("user", {}).get("login", "") if pr_info else "" # Submit formal Forgejo reviews (required for merge) - await _post_formal_approvals(pr_number, pr_author) + await post_formal_approvals(pr_number, pr_author) # Auto-merge agent PRs: if branch is NOT pipeline-owned, set auto_merge=1 # so the merge cycle picks it up without manual intervention. 
@@ -513,9 +362,9 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: # Collect all issue tags from both reviews all_issues = [] if domain_verdict == "request_changes" and domain_review is not None: - all_issues.extend(_parse_issues(domain_review)) + all_issues.extend(parse_issues(domain_review)) if leo_verdict == "request_changes" and leo_review is not None: - all_issues.extend(_parse_issues(leo_review)) + all_issues.extend(parse_issues(leo_review)) reopen_pr(conn, pr_number, eval_issues=json.dumps(all_issues)) # Store feedback for re-extraction path @@ -547,7 +396,7 @@ async def evaluate_pr(conn, pr_number: int, tier: str = None) -> dict: ) # Disposition: check if this PR should be terminated or kept open - await _dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues) + await dispose_rejected_pr(conn, pr_number, eval_attempts, all_issues) # Record cost (only for reviews that actually ran) if domain_verdict != "skipped": @@ -647,7 +496,7 @@ async def _run_batch_domain_eval( continue # Musings bypass - if _is_musings_only(diff): + if is_musings_only(diff): await forgejo_api( "POST", repo_path(f"issues/{pr_num}/comments"), @@ -658,10 +507,10 @@ async def _run_batch_domain_eval( succeeded += 1 continue - review_diff, _ = _filter_diff(diff) + review_diff, _ = filter_diff(diff) if not review_diff: review_diff = diff - files = _extract_changed_files(diff) + files = extract_changed_files(diff) # Build label from branch name or first claim filename branch = pr_row.get("branch", "") @@ -731,8 +580,8 @@ async def _run_batch_domain_eval( return 0, len(claimed_prs) # Step 3: Parse + validate fan-out - parsed = _parse_batch_response(batch_response, claimed_prs, agent) - valid_reviews, fallback_prs = _validate_batch_fanout(parsed, pr_diffs, agent) + parsed = parse_batch_response(batch_response, claimed_prs, agent) + valid_reviews, fallback_prs = validate_batch_fanout(parsed, pr_diffs, agent) db.audit( conn, "evaluate", "batch_domain_review", @@ -769,7 +618,7 @@ async def _run_batch_domain_eval( continue review_text = valid_reviews[pr_num] - domain_verdict = _parse_verdict(review_text, agent) + domain_verdict = parse_verdict(review_text, agent) # Post domain review comment agent_tok = get_agent_token(agent) @@ -787,7 +636,7 @@ async def _run_batch_domain_eval( # If domain rejects, handle disposition (same as individual path) if domain_verdict == "request_changes": - domain_issues = _parse_issues(review_text) + domain_issues = parse_issues(review_text) eval_attempts = (conn.execute( "SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,) ).fetchone()["eval_attempts"] or 0) @@ -799,7 +648,7 @@ async def _run_batch_domain_eval( conn, "evaluate", "domain_rejected", json.dumps({"pr": pr_num, "agent": agent, "issues": domain_issues, "batch": True}), ) - await _dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues) + await dispose_rejected_pr(conn, pr_num, eval_attempts, domain_issues) succeeded += 1 continue @@ -821,7 +670,7 @@ async def _run_batch_domain_eval( logger.info("PR #%d: Leo rate limited, will retry next cycle", pr_num) continue - leo_verdict = _parse_verdict(leo_review, "LEO") + leo_verdict = parse_verdict(leo_review, "LEO") conn.execute("UPDATE prs SET leo_verdict = ? 
WHERE number = ?", (leo_verdict, pr_num)) # Post Leo review @@ -846,7 +695,7 @@ async def _run_batch_domain_eval( if both_approve: pr_info = await forgejo_api("GET", repo_path(f"pulls/{pr_num}")) pr_author = pr_info.get("user", {}).get("login", "") if pr_info else "" - await _post_formal_approvals(pr_num, pr_author) + await post_formal_approvals(pr_num, pr_author) approve_pr(conn, pr_num, domain=domain or 'cross-domain', auto_merge=1) db.audit( conn, "evaluate", "approved", @@ -857,7 +706,7 @@ async def _run_batch_domain_eval( else: all_issues = [] if leo_verdict == "request_changes": - all_issues.extend(_parse_issues(leo_review)) + all_issues.extend(parse_issues(leo_review)) reopen_pr(conn, pr_num, eval_issues=json.dumps(all_issues)) feedback = {"leo": leo_verdict, "domain": domain_verdict, "tier": "STANDARD", "issues": all_issues} @@ -873,7 +722,7 @@ async def _run_batch_domain_eval( eval_attempts = (conn.execute( "SELECT eval_attempts FROM prs WHERE number = ?", (pr_num,) ).fetchone()["eval_attempts"] or 0) - await _dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues) + await dispose_rejected_pr(conn, pr_num, eval_attempts, all_issues) succeeded += 1 diff --git a/tests/test_eval_actions.py b/tests/test_eval_actions.py new file mode 100644 index 0000000..1e33671 --- /dev/null +++ b/tests/test_eval_actions.py @@ -0,0 +1,290 @@ +"""Tests for lib/eval_actions.py — async PR disposition actions.""" + +import asyncio +import json +import sqlite3 +import sys +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# Mock heavy dependencies before importing +sys.modules.setdefault("aiohttp", MagicMock()) + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from lib.eval_actions import dispose_rejected_pr, post_formal_approvals, terminate_pr + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +def _make_db(): + """Create a minimal in-memory DB with prs + sources tables.""" + conn = sqlite3.connect(":memory:") + conn.row_factory = sqlite3.Row + conn.execute(""" + CREATE TABLE prs ( + number INTEGER PRIMARY KEY, + source_path TEXT, + branch TEXT, + status TEXT NOT NULL DEFAULT 'open', + domain TEXT, + agent TEXT, + auto_merge INTEGER DEFAULT 0, + leo_verdict TEXT DEFAULT 'pending', + domain_verdict TEXT DEFAULT 'pending', + eval_attempts INTEGER DEFAULT 0, + eval_issues TEXT DEFAULT '[]', + merge_cycled INTEGER DEFAULT 0, + merge_failures INTEGER DEFAULT 0, + conflict_rebase_attempts INTEGER DEFAULT 0, + last_error TEXT, + merged_at TEXT, + last_attempt TEXT, + cost_usd REAL DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')) + ) + """) + conn.execute(""" + CREATE TABLE sources ( + path TEXT PRIMARY KEY, + status TEXT DEFAULT 'extracted', + feedback TEXT, + updated_at TEXT + ) + """) + conn.execute(""" + CREATE TABLE audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + stage TEXT, + event TEXT, + detail TEXT, + ts TEXT DEFAULT (datetime('now')) + ) + """) + return conn + + +def _insert_pr(conn, number=100, status="open", source_path=None, eval_issues=None, **kwargs): + """Insert a test PR row.""" + cols = ["number", "status"] + vals = [number, status] + if source_path is not None: + cols.append("source_path") + vals.append(source_path) + if eval_issues is not None: + cols.append("eval_issues") + vals.append(eval_issues) + for k, v in kwargs.items(): + cols.append(k) + vals.append(v) + placeholders = ", ".join("?" 
* len(cols))
+    conn.execute(f"INSERT INTO prs ({', '.join(cols)}) VALUES ({placeholders})", vals)
+    conn.commit()
+
+
+def _run(coro):
+    """Run async coroutine in sync test."""
+    return asyncio.run(coro)
+
+
+# ---------------------------------------------------------------------------
+# post_formal_approvals
+# ---------------------------------------------------------------------------
+
+class TestPostFormalApprovals:
+    @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
+    @patch("lib.eval_actions.get_agent_token")
+    def test_posts_two_approvals(self, mock_token, mock_api):
+        mock_token.return_value = "fake-token"
+        mock_api.return_value = {"id": 1}
+        _run(post_formal_approvals(100, "epimetheus"))
+        # Should have called forgejo_api exactly 2 times (first 2 agents that aren't author)
+        assert mock_api.call_count == 2
+
+    @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
+    @patch("lib.eval_actions.get_agent_token")
+    def test_skips_pr_author(self, mock_token, mock_api):
+        mock_token.return_value = "fake-token"
+        mock_api.return_value = {"id": 1}
+        _run(post_formal_approvals(100, "leo"))
+        # Leo is first in the list but is the author — his token must never
+        # be requested; the next 2 agents are used instead.
+        requested = [c.args[0] for c in mock_token.call_args_list]
+        assert "leo" not in requested
+        assert mock_api.call_count == 2
+
+    @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
+    @patch("lib.eval_actions.get_agent_token")
+    def test_handles_no_tokens(self, mock_token, mock_api):
+        mock_token.return_value = None
+        _run(post_formal_approvals(100, "someone"))
+        # No tokens available — no API calls
+        mock_api.assert_not_called()
+
+    @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
+    @patch("lib.eval_actions.get_agent_token")
+    def test_continues_on_api_failure(self, mock_token, mock_api):
+        mock_token.return_value = "fake-token"
+        # First call fails (returns None), subsequent succeed
+        mock_api.side_effect = [None, {"id": 1}, {"id": 2}, {"id": 3}]
+        _run(post_formal_approvals(100, "nobody"))
+        # Should keep trying until 2 successes: 1 failure + 2 approvals = 3 calls
+        assert mock_api.call_count == 3
+
+
+# ---------------------------------------------------------------------------
+# terminate_pr
+# ---------------------------------------------------------------------------
+
+class TestTerminatePr:
+    @patch("lib.eval_actions.close_pr", new_callable=AsyncMock)
+    @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
+    def test_closes_pr_and_requeues_source(self, mock_api, mock_close):
+        mock_api.return_value = {"id": 1}
+        mock_close.return_value = True
+        conn = _make_db()
+        _insert_pr(conn, 100, source_path="inbox/queue/test.md",
+                   eval_issues='["factual_discrepancy"]')
+        conn.execute("INSERT INTO sources (path, status) VALUES (?, ?)",
+                     ("inbox/queue/test.md", "extracted"))
+        conn.commit()
+
+        _run(terminate_pr(conn, 100, "test reason"))
+
+        # close_pr called
+        mock_close.assert_called_once()
+        # Source requeued
+        src = conn.execute("SELECT status FROM sources WHERE path = ?",
+                           ("inbox/queue/test.md",)).fetchone()
+        assert src["status"] == "needs_reextraction"
+        # Audit logged
+        audit = conn.execute("SELECT * FROM audit_log WHERE event = 'pr_terminated'").fetchone()
+        assert audit is not None
+
+    @patch("lib.eval_actions.close_pr", new_callable=AsyncMock)
+    @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock)
+    def test_skips_requeue_on_close_failure(self, mock_api, mock_close):
+        mock_api.return_value = {"id": 1}
+        mock_close.return_value = False
+        conn = _make_db()
+        _insert_pr(conn, 100,
source_path="inbox/queue/test.md") + conn.execute("INSERT INTO sources (path, status) VALUES (?, ?)", + ("inbox/queue/test.md", "extracted")) + conn.commit() + + _run(terminate_pr(conn, 100, "test reason")) + + # Source NOT requeued + src = conn.execute("SELECT status FROM sources WHERE path = ?", + ("inbox/queue/test.md",)).fetchone() + assert src["status"] == "extracted" + # No audit for pr_terminated + audit = conn.execute("SELECT * FROM audit_log WHERE event = 'pr_terminated'").fetchone() + assert audit is None + + @patch("lib.eval_actions.close_pr", new_callable=AsyncMock) + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_posts_structured_feedback_with_issues(self, mock_api, mock_close): + mock_api.return_value = {"id": 1} + mock_close.return_value = True + conn = _make_db() + _insert_pr(conn, 100, eval_issues='["frontmatter_schema", "near_duplicate"]') + + _run(terminate_pr(conn, 100, "budget exhausted")) + + # Comment posted with structured feedback + comment_call = mock_api.call_args_list[0] + body = comment_call[0][2]["body"] + assert "Closed by eval pipeline" in body + + @patch("lib.eval_actions.close_pr", new_callable=AsyncMock) + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_handles_no_source_path(self, mock_api, mock_close): + mock_api.return_value = {"id": 1} + mock_close.return_value = True + conn = _make_db() + _insert_pr(conn, 100) # No source_path + + _run(terminate_pr(conn, 100, "no source")) + + # Should not crash, audit should still be logged + audit = conn.execute("SELECT * FROM audit_log WHERE event = 'pr_terminated'").fetchone() + assert audit is not None + + +# --------------------------------------------------------------------------- +# dispose_rejected_pr +# --------------------------------------------------------------------------- + +class TestDisposeRejectedPr: + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_attempt_1_posts_feedback_only(self, mock_api): + mock_api.return_value = {"id": 1} + conn = _make_db() + _insert_pr(conn, 100) + + _run(dispose_rejected_pr(conn, 100, eval_attempts=1, all_issues=["factual_discrepancy"])) + + # Should post feedback comment but NOT close + assert mock_api.call_count == 1 # Just the comment + pr = conn.execute("SELECT status FROM prs WHERE number = 100").fetchone() + assert pr["status"] == "open" # Still open + + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_attempt_1_no_issues_no_action(self, mock_api): + conn = _make_db() + _insert_pr(conn, 100) + + _run(dispose_rejected_pr(conn, 100, eval_attempts=1, all_issues=[])) + + mock_api.assert_not_called() + + @patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock) + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_attempt_max_terminates(self, mock_api, mock_terminate): + conn = _make_db() + _insert_pr(conn, 100) + + _run(dispose_rejected_pr(conn, 100, eval_attempts=3, all_issues=["factual_discrepancy"])) + + mock_terminate.assert_called_once() + assert "eval budget exhausted" in mock_terminate.call_args[0][2] + + @patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock) + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_attempt_2_mechanical_keeps_open(self, mock_api, mock_terminate): + conn = _make_db() + _insert_pr(conn, 100) + + _run(dispose_rejected_pr(conn, 100, eval_attempts=2, all_issues=["frontmatter_schema"])) + + # Should NOT terminate — mechanical issues get one more try + 
mock_terminate.assert_not_called() + # Should log audit + audit = conn.execute("SELECT * FROM audit_log WHERE event = 'mechanical_retry'").fetchone() + assert audit is not None + + @patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock) + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_attempt_2_substantive_terminates(self, mock_api, mock_terminate): + conn = _make_db() + _insert_pr(conn, 100) + + _run(dispose_rejected_pr(conn, 100, eval_attempts=2, all_issues=["factual_discrepancy"])) + + mock_terminate.assert_called_once() + assert "substantive issues" in mock_terminate.call_args[0][2] + + @patch("lib.eval_actions.terminate_pr", new_callable=AsyncMock) + @patch("lib.eval_actions.forgejo_api", new_callable=AsyncMock) + def test_attempt_2_mixed_terminates(self, mock_api, mock_terminate): + conn = _make_db() + _insert_pr(conn, 100) + + _run(dispose_rejected_pr(conn, 100, eval_attempts=2, + all_issues=["frontmatter_schema", "factual_discrepancy"])) + + mock_terminate.assert_called_once() diff --git a/tests/test_eval_parse.py b/tests/test_eval_parse.py index 0c37fa3..786d5a6 100644 --- a/tests/test_eval_parse.py +++ b/tests/test_eval_parse.py @@ -13,23 +13,23 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from lib.eval_parse import ( VALID_ISSUE_TAGS, - _classify_issues, - _deterministic_tier, - _diff_contains_claim_type, - _extract_changed_files, - _filter_diff, - _infer_issues_from_prose, - _is_musings_only, - _normalize_tag, - _parse_batch_response, - _parse_issues, - _parse_verdict, - _validate_batch_fanout, + classify_issues, + deterministic_tier, + diff_contains_claim_type, + extract_changed_files, + filter_diff, + infer_issues_from_prose, + is_musings_only, + normalize_tag, + parse_batch_response, + parse_issues, + parse_verdict, + validate_batch_fanout, ) # --------------------------------------------------------------------------- -# _filter_diff +# filter_diff # --------------------------------------------------------------------------- class TestFilterDiff: @@ -40,7 +40,7 @@ class TestFilterDiff: "diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n" "+real content\n" ) - review_diff, entity_diff = _filter_diff(diff) + review_diff, entity_diff = filter_diff(diff) assert "inbox" not in review_diff assert "claim.md" in review_diff @@ -51,7 +51,7 @@ class TestFilterDiff: "diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n" "+claim content\n" ) - review_diff, entity_diff = _filter_diff(diff) + review_diff, entity_diff = filter_diff(diff) assert "alice.md" in entity_diff assert "claim.md" in review_diff @@ -60,13 +60,13 @@ class TestFilterDiff: "diff --git a/entities/living-agents/agent.md b/entities/living-agents/agent.md\n" "+core entity\n" ) - review_diff, entity_diff = _filter_diff(diff) + review_diff, entity_diff = filter_diff(diff) assert "agent.md" in review_diff assert entity_diff == "" # --------------------------------------------------------------------------- -# _extract_changed_files +# extract_changed_files # --------------------------------------------------------------------------- class TestExtractChangedFiles: @@ -77,189 +77,189 @@ class TestExtractChangedFiles: "diff --git a/entities/bar.md b/entities/bar.md\n" "+more\n" ) - result = _extract_changed_files(diff) + result = extract_changed_files(diff) assert "domains/foo.md" in result assert "entities/bar.md" in result def test_empty_diff(self): - assert _extract_changed_files("") == "" + assert extract_changed_files("") == "" # 
--------------------------------------------------------------------------- -# _is_musings_only +# is_musings_only # --------------------------------------------------------------------------- class TestIsMusingsOnly: def test_musings_only(self): diff = "diff --git a/agents/rio/musings/thought.md b/agents/rio/musings/thought.md\n+musing\n" - assert _is_musings_only(diff) is True + assert is_musings_only(diff) is True def test_mixed_files(self): diff = ( "diff --git a/agents/rio/musings/thought.md b/agents/rio/musings/thought.md\n+musing\n" "diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n+claim\n" ) - assert _is_musings_only(diff) is False + assert is_musings_only(diff) is False def test_no_musings(self): diff = "diff --git a/domains/finance/claim.md b/domains/finance/claim.md\n+claim\n" - assert _is_musings_only(diff) is False + assert is_musings_only(diff) is False # --------------------------------------------------------------------------- -# _diff_contains_claim_type +# diff_contains_claim_type # --------------------------------------------------------------------------- class TestDiffContainsClaimType: def test_detects_claim_type(self): diff = "+type: claim\n+confidence: 0.8\n" - assert _diff_contains_claim_type(diff) is True + assert diff_contains_claim_type(diff) is True def test_ignores_non_claim(self): diff = "+type: entity\n" - assert _diff_contains_claim_type(diff) is False + assert diff_contains_claim_type(diff) is False def test_ignores_file_header(self): diff = "+++ b/type: claim\n" - assert _diff_contains_claim_type(diff) is False + assert diff_contains_claim_type(diff) is False # --------------------------------------------------------------------------- -# _deterministic_tier +# deterministic_tier # --------------------------------------------------------------------------- class TestDeterministicTier: def test_entities_only_is_light(self): diff = "diff --git a/entities/people/alice.md b/entities/people/alice.md\n+data\n" - assert _deterministic_tier(diff) == "LIGHT" + assert deterministic_tier(diff) == "LIGHT" def test_inbox_only_is_light(self): diff = "diff --git a/inbox/archive/src.md b/inbox/archive/src.md\n+data\n" - assert _deterministic_tier(diff) == "LIGHT" + assert deterministic_tier(diff) == "LIGHT" def test_core_is_deep(self): diff = "diff --git a/core/schema.md b/core/schema.md\n+structural change\n" - assert _deterministic_tier(diff) == "DEEP" + assert deterministic_tier(diff) == "DEEP" def test_challenged_by_is_deep(self): diff = "diff --git a/domains/x/claim.md b/domains/x/claim.md\n+challenged_by: some-claim\n" - assert _deterministic_tier(diff) == "DEEP" + assert deterministic_tier(diff) == "DEEP" def test_domain_claim_returns_none(self): diff = "diff --git a/domains/finance/new-claim.md b/domains/finance/new-claim.md\n--- /dev/null\n+++ b/domains/finance/new-claim.md\n+type: claim\n" - assert _deterministic_tier(diff) is None + assert deterministic_tier(diff) is None def test_empty_diff_returns_none(self): - assert _deterministic_tier("") is None + assert deterministic_tier("") is None # --------------------------------------------------------------------------- -# _parse_verdict +# parse_verdict # --------------------------------------------------------------------------- class TestParseVerdict: def test_approve(self): - assert _parse_verdict("VERDICT:LEO:APPROVE", "LEO") == "approve" + assert parse_verdict("VERDICT:LEO:APPROVE", "LEO") == "approve" def test_request_changes(self): - assert _parse_verdict("VERDICT:LEO:REQUEST_CHANGES", 
"LEO") == "request_changes" + assert parse_verdict("VERDICT:LEO:REQUEST_CHANGES", "LEO") == "request_changes" def test_no_verdict_defaults_to_request_changes(self): - assert _parse_verdict("some review text", "LEO") == "request_changes" + assert parse_verdict("some review text", "LEO") == "request_changes" def test_case_insensitive_reviewer(self): - assert _parse_verdict("VERDICT:LEO:APPROVE", "leo") == "approve" + assert parse_verdict("VERDICT:LEO:APPROVE", "leo") == "approve" # --------------------------------------------------------------------------- -# _normalize_tag +# normalize_tag # --------------------------------------------------------------------------- class TestNormalizeTag: def test_valid_tag_passes_through(self): - assert _normalize_tag("broken_wiki_links") == "broken_wiki_links" + assert normalize_tag("broken_wiki_links") == "broken_wiki_links" def test_alias_normalizes(self): - assert _normalize_tag("schema_violation") == "frontmatter_schema" - assert _normalize_tag("duplicate") == "near_duplicate" - assert _normalize_tag("factual_error") == "factual_discrepancy" + assert normalize_tag("schema_violation") == "frontmatter_schema" + assert normalize_tag("duplicate") == "near_duplicate" + assert normalize_tag("factual_error") == "factual_discrepancy" def test_hyphen_to_underscore(self): - assert _normalize_tag("unverified-wiki-links") == "broken_wiki_links" + assert normalize_tag("unverified-wiki-links") == "broken_wiki_links" def test_unknown_returns_none(self): - assert _normalize_tag("totally_made_up") is None + assert normalize_tag("totally_made_up") is None def test_fuzzy_substring_match(self): - assert _normalize_tag("wiki_links") == "broken_wiki_links" + assert normalize_tag("wiki_links") == "broken_wiki_links" # --------------------------------------------------------------------------- -# _parse_issues +# parse_issues # --------------------------------------------------------------------------- class TestParseIssues: def test_structured_comment(self): text = "Review text. " - result = _parse_issues(text) + result = parse_issues(text) assert "frontmatter_schema" in result assert "near_duplicate" in result def test_fallback_to_prose(self): text = "The frontmatter is missing required fields." - result = _parse_issues(text) + result = parse_issues(text) assert "frontmatter_schema" in result def test_empty_structured_falls_back(self): text = "Review. 
" # All tags unrecognizable, falls through to prose - result = _parse_issues(text) + result = parse_issues(text) # With no prose keywords either, should return empty assert isinstance(result, list) # --------------------------------------------------------------------------- -# _infer_issues_from_prose +# infer_issues_from_prose # --------------------------------------------------------------------------- class TestInferIssuesFromProse: def test_detects_frontmatter_issues(self): - result = _infer_issues_from_prose("The YAML frontmatter is invalid.") + result = infer_issues_from_prose("The YAML frontmatter is invalid.") assert "frontmatter_schema" in result def test_detects_wiki_link_issues(self): - result = _infer_issues_from_prose("Found broken wiki links in the claim.") + result = infer_issues_from_prose("Found broken wiki links in the claim.") assert "broken_wiki_links" in result def test_detects_near_duplicate(self): - result = _infer_issues_from_prose("This is a near-duplicate of an existing claim.") + result = infer_issues_from_prose("This is a near-duplicate of an existing claim.") assert "near_duplicate" in result def test_no_matches(self): - result = _infer_issues_from_prose("This claim is well-structured and accurate.") + result = infer_issues_from_prose("This claim is well-structured and accurate.") assert result == [] # --------------------------------------------------------------------------- -# _classify_issues +# classify_issues # --------------------------------------------------------------------------- class TestClassifyIssues: def test_mechanical_only(self): - assert _classify_issues(["frontmatter_schema", "near_duplicate"]) == "mechanical" + assert classify_issues(["frontmatter_schema", "near_duplicate"]) == "mechanical" def test_substantive_only(self): - assert _classify_issues(["factual_discrepancy"]) == "substantive" + assert classify_issues(["factual_discrepancy"]) == "substantive" def test_mixed(self): - assert _classify_issues(["frontmatter_schema", "factual_discrepancy"]) == "mixed" + assert classify_issues(["frontmatter_schema", "factual_discrepancy"]) == "mixed" def test_empty(self): - assert _classify_issues([]) == "unknown" + assert classify_issues([]) == "unknown" # --------------------------------------------------------------------------- -# _parse_batch_response +# parse_batch_response # --------------------------------------------------------------------------- class TestParseBatchResponse: @@ -268,7 +268,7 @@ class TestParseBatchResponse: "=== PR #100\nReview for 100\n\n" "=== PR #200\nReview for 200\n\n" ) - result = _parse_batch_response(response, [100, 200], "agent") + result = parse_batch_response(response, [100, 200], "agent") assert 100 in result assert 200 in result assert "APPROVE" in result[100] @@ -276,17 +276,17 @@ class TestParseBatchResponse: def test_missing_pr(self): response = "=== PR #100\nReview\n\n" - result = _parse_batch_response(response, [100, 200], "agent") + result = parse_batch_response(response, [100, 200], "agent") assert 100 in result assert 200 not in result def test_empty_response(self): - result = _parse_batch_response("no verdicts here", [100], "agent") + result = parse_batch_response("no verdicts here", [100], "agent") assert result == {} # --------------------------------------------------------------------------- -# _validate_batch_fanout +# validate_batch_fanout # --------------------------------------------------------------------------- class TestValidateBatchFanout: @@ -296,7 +296,7 @@ class 
TestValidateBatchFanout: {"number": 100, "diff": "diff --git a/domains/long-enough-path.md b/domains/long-enough-path.md\n"}, {"number": 200, "diff": "diff --git a/domains/other-long-path.md b/domains/other-long-path.md\n"}, ] - valid, fallback = _validate_batch_fanout(parsed, pr_diffs, "agent") + valid, fallback = validate_batch_fanout(parsed, pr_diffs, "agent") assert 200 in fallback assert 200 not in valid @@ -309,7 +309,7 @@ class TestValidateBatchFanout: {"number": 100, "diff": "diff --git a/domains/long-enough-path.md b/domains/long-enough-path.md\n"}, {"number": 200, "diff": "diff --git a/domains/other-long-path.md b/domains/other-long-path.md\n"}, ] - valid, fallback = _validate_batch_fanout(parsed, pr_diffs, "agent") + valid, fallback = validate_batch_fanout(parsed, pr_diffs, "agent") assert 100 in valid assert 200 in valid assert fallback == []
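
Usage sketch — a minimal illustration of how the now-public eval_parse helpers compose in a caller. The `review_text` here is a stand-in, and whether the inferred tags classify as mechanical or substantive depends on config.MECHANICAL_ISSUE_TAGS / config.SUBSTANTIVE_ISSUE_TAGS:

    from lib.eval_parse import classify_issues, parse_issues, parse_verdict

    review_text = (
        "VERDICT:LEO:REQUEST_CHANGES\n"
        "The YAML frontmatter is invalid and there are broken wiki links."
    )

    verdict = parse_verdict(review_text, "LEO")   # "request_changes"
    issues = parse_issues(review_text)            # no structured tag comment — inferred from prose
    classification = classify_issues(issues)      # "mechanical", "substantive", or "mixed"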