"""PR disposition actions — async Forgejo + DB operations for end-of-eval decisions.

Extracted from evaluate.py to isolate the "do something to this PR" functions
from orchestration logic.

Contains:
- post_formal_approvals: submit Forgejo reviews from 2 agents (not PR author)
- terminate_pr: close PR, post rejection comment, requeue source
- dispose_rejected_pr: disposition logic for rejected PRs on attempt 2+

All functions are async (Forgejo API calls).
Dependencies: forgejo, db, config, pr_state, feedback, eval_parse.
"""

import asyncio
import json
import logging

from . import config, db
from .eval_parse import classify_issues
from .feedback import format_rejection_comment
from .forgejo import api as forgejo_api, get_agent_token, get_pr_diff, repo_path
from .github_feedback import on_closed, on_eval_complete
from .pr_state import close_pr

logger = logging.getLogger("pipeline.eval_actions")


async def post_formal_approvals(pr_number: int, pr_author: str):
    """Submit formal Forgejo reviews from 2 agents (not the PR author).

    Walks the fixed agent roster in order, skipping the PR author and any
    agent without a token, and posts an APPROVED review until 2 submissions
    succeed. Individual failures are skipped silently (best-effort).
    """
    approvals = 0
    for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
        if agent_name == pr_author:
            continue
        if approvals >= 2:
            break
        token = get_agent_token(agent_name)
        if not token:
            continue  # agent has no credentials — try the next one
        result = await forgejo_api(
            "POST",
            repo_path(f"pulls/{pr_number}/reviews"),
            {"body": "Approved.", "event": "APPROVED"},
            token=token,
        )
        if result is not None:
            approvals += 1
            logger.debug(
                "Formal approval for PR #%d by %s (%d/2)",
                pr_number,
                agent_name,
                approvals,
            )


async def terminate_pr(conn, pr_number: int, reason: str):
    """Terminal state: close PR on Forgejo, mark source needs_human.

    Posts a rejection comment (with structured quality-gate guidance when
    issue tags were recorded), closes the PR, fires the GitHub close
    feedback hook, and re-queues the source for re-extraction. If the
    Forgejo close fails, everything after it is skipped so the whole
    disposition can be retried next cycle.
    """
    # Issue tags recorded by the evaluator drive the structured feedback.
    # (Only eval_issues is needed here — no other columns.)
    row = conn.execute(
        "SELECT eval_issues FROM prs WHERE number = ?", (pr_number,)
    ).fetchone()
    issues = []
    if row and row["eval_issues"]:
        try:
            issues = json.loads(row["eval_issues"])
        except (json.JSONDecodeError, TypeError):
            pass  # malformed/NULL-ish tags — fall back to the plain comment

    # Single base message; append quality-gate guidance only when we have tags.
    comment_body = (
        f"**Closed by eval pipeline** — {reason}.\n\n"
        f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
        f"Source will be re-queued with feedback."
    )
    if issues:
        comment_body += "\n\n" + format_rejection_comment(issues, source="eval_terminal")

    await forgejo_api(
        "POST",
        repo_path(f"issues/{pr_number}/comments"),
        {"body": comment_body},
    )

    closed = await close_pr(conn, pr_number, last_error=reason)
    if not closed:
        logger.warning(
            "PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle",
            pr_number,
        )
        return

    try:
        await on_closed(conn, pr_number, reason=reason)
    except Exception:
        # Feedback mirroring is best-effort; never block the disposition on it.
        logger.exception("PR #%d: GitHub close feedback failed (non-fatal)", pr_number)

    # Tag source for re-extraction with feedback.
    cursor = conn.execute(
        """UPDATE sources SET status = 'needs_reextraction', updated_at = datetime('now')
           WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
        (pr_number,),
    )
    if cursor.rowcount == 0:
        logger.warning(
            "PR #%d: no source_path linked — source not requeued for re-extraction",
            pr_number,
        )

    db.audit(
        conn,
        "evaluate",
        "pr_terminated",
        json.dumps({"pr": pr_number, "reason": reason}),
    )
    logger.info("PR #%d: TERMINATED — %s", pr_number, reason)


async def _auto_close_near_duplicate(
    conn, pr_number: int, eval_attempts: int, all_issues: list[str]
) -> bool:
    """Auto-close gate: close a pure near-duplicate of an already-merged sibling.

    Returns True when the PR was auto-closed (caller stops), False when the
    normal attempt-count disposition should proceed.

    Exact-match requirement (Ganymede review): compound rejections like
    ["near_duplicate", "factual_discrepancy"] carry signal about the merged
    sibling being wrong or limited — we want humans to see those. Only the
    pure single-issue case is safe to auto-close.
    """
    if all_issues != ["near_duplicate"]:
        return False

    existing_merged = conn.execute(
        """SELECT p2.number, p1.source_path
           FROM prs p1
           JOIN prs p2 ON p2.source_path = p1.source_path
           WHERE p1.number = ?
             AND p1.source_path IS NOT NULL
             AND p2.number != p1.number
             AND p2.status = 'merged'
           LIMIT 1""",
        (pr_number,),
    ).fetchone()
    if not existing_merged:
        return False

    sibling = existing_merged[0]
    source_path = existing_merged[1]

    # Enrichment guard: LLM reviewers can flag enrichment prose as
    # "redundant" via eval_parse regex, tagging near_duplicate even
    # though validate.py's structural check only fires on NEW files.
    # If the PR only MODIFIES existing files (no "new file mode" in
    # diff), it's an enrichment — skip auto-close so a human reviews.
    #
    # 10s timeout bounds damage when Forgejo is wedged (Apr 22 incident:
    # hung for 2.5h). Conservative fallback: skip auto-close on any
    # failure — fall through to the normal rejection path.
    try:
        diff = await asyncio.wait_for(get_pr_diff(pr_number), timeout=10)
    except Exception:
        # Deliberate catch-all (asyncio.TimeoutError is an Exception subclass):
        # any failure here means "don't auto-close", never "crash the eval".
        logger.warning(
            "PR #%d: diff fetch failed/timed out for near-dup guard — skipping auto-close",
            pr_number,
            exc_info=True,
        )
        diff = None

    if not diff:
        # None or empty — conservative fallback to the attempt-count branches.
        return False
    if "new file mode" not in diff:
        logger.info(
            "PR #%d: near_duplicate but modifies-only (enrichment) — skipping auto-close",
            pr_number,
        )
        return False

    logger.info(
        "PR #%d: auto-closing near-duplicate of merged PR #%d (same source)",
        pr_number,
        sibling,
    )
    # Post a brief explanation before closing (best-effort — non-fatal).
    try:
        await forgejo_api(
            "POST",
            repo_path(f"issues/{pr_number}/comments"),
            {"body": (
                f"Auto-closed: near-duplicate of already-merged PR "
                f"#{sibling} (same source: `{source_path}`)."
            )},
        )
    except Exception:
        logger.debug(
            "PR #%d: auto-close comment failed (non-fatal)", pr_number, exc_info=True
        )

    await close_pr(
        conn,
        pr_number,
        last_error=f"auto_closed_near_duplicate: merged sibling #{sibling}",
    )
    db.audit(
        conn,
        "evaluate",
        "auto_closed_near_duplicate",
        json.dumps({
            "pr": pr_number,
            "merged_sibling": sibling,
            "source_path": source_path,
            "eval_attempts": eval_attempts,
        }),
    )
    return True


async def dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
    """Disposition logic for rejected PRs on attempt 2+.

    Auto-close gate (all attempts): near-duplicate of an already-merged PR
    for the same source — close immediately. Avoids the Apr 22
    runaway-damage pattern where a source extracted 20+ times in a short
    window produced dozens of open PRs that all had to be closed manually.

    Attempt 1: normal — back to open, wait for fix.
    Attempt 2: check issue classification.
      - Mechanical only: keep open for one more attempt (auto-fix future).
      - Substantive or mixed: close PR, requeue source.
    Attempt 3+: terminal.
    """
    # The gate runs before the attempt-count branches so it catches the
    # common runaway case on attempt 1 instead of waiting for attempt 2's
    # terminate path.
    if await _auto_close_near_duplicate(conn, pr_number, eval_attempts, all_issues):
        return

    if eval_attempts < 2:
        # Attempt 1: post structured feedback so the agent learns, but don't close.
        if all_issues:
            feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
            await forgejo_api(
                "POST",
                repo_path(f"issues/{pr_number}/comments"),
                {"body": feedback_body},
            )
        return

    classification = classify_issues(all_issues)

    if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
        # Terminal: budget exhausted regardless of classification.
        await terminate_pr(
            conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts"
        )
        return

    if classification == "mechanical":
        # Mechanical issues only — keep open for one more attempt.
        # Future: auto-fix module will push fixes here.
        logger.info(
            "PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
            pr_number,
            eval_attempts,
            all_issues,
        )
        db.audit(
            conn,
            "evaluate",
            "mechanical_retry",
            json.dumps({
                "pr": pr_number,
                "attempt": eval_attempts,
                "issues": all_issues,
            }),
        )
    else:
        # Substantive, mixed, or unknown — close and requeue.
        logger.info(
            "PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
            pr_number,
            eval_attempts,
            classification,
            all_issues,
        )
        await terminate_pr(
            conn,
            pr_number,
            f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}",
        )