teleo-infrastructure/lib/eval_actions.py
m3taversal 33c17f87a8 feat(eval): auto-close near-duplicate PRs when merged sibling exists
Prevents Apr 22 runaway-damage pattern (44 open PRs manually bulk-closed)
where a source extracted 20+ times before the cooldown gate landed, each
leaving an orphan 'open' PR after eval correctly rejected as near-duplicate.

Gate fires in dispose_rejected_pr before attempt-count branches:
  all_issues == ["near_duplicate"]  (exact match — compound carries signal)
  AND sibling PR exists with same source_path in status='merged'
  AND diff contains "new file mode"  (not enrichment-only)
  → close on Forgejo + DB with audit, post explanation comment.

Ganymede review — 5 must-fix/warnings applied + 1 must-add:
- Exact match on single-issue near_duplicate (compound rejections preserved)
- Enrichment guard via diff scan (eval_parse regex can flag enrichment prose)
- 10s timeout on get_pr_diff — conservative fallback on Forgejo wedge
- Forgejo comment with canned explanation (best-effort, try/except)
- Partial index idx_prs_source_path + migration v23
- Explicit p1.source_path IS NOT NULL in WHERE

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 11:17:29 +01:00

260 lines
10 KiB
Python

"""PR disposition actions — async Forgejo + DB operations for end-of-eval decisions.
Extracted from evaluate.py to isolate the "do something to this PR" functions
from orchestration logic. Contains:
- post_formal_approvals: submit Forgejo reviews from 2 agents (not PR author)
- terminate_pr: close PR, post rejection comment, requeue source
- dispose_rejected_pr: disposition logic for rejected PRs on attempt 2+
All functions are async (Forgejo API calls). Dependencies: forgejo, db, config,
pr_state, feedback, eval_parse.
"""
import asyncio
import json
import logging
from . import config, db
from .eval_parse import classify_issues
from .feedback import format_rejection_comment
from .forgejo import api as forgejo_api, get_agent_token, get_pr_diff, repo_path
from .github_feedback import on_closed, on_eval_complete
from .pr_state import close_pr
logger = logging.getLogger("pipeline.eval_actions")
async def post_formal_approvals(pr_number: int, pr_author: str):
"""Submit formal Forgejo reviews from 2 agents (not the PR author)."""
approvals = 0
for agent_name in ["leo", "vida", "theseus", "clay", "astra", "rio"]:
if agent_name == pr_author:
continue
if approvals >= 2:
break
token = get_agent_token(agent_name)
if token:
result = await forgejo_api(
"POST",
repo_path(f"pulls/{pr_number}/reviews"),
{"body": "Approved.", "event": "APPROVED"},
token=token,
)
if result is not None:
approvals += 1
logger.debug("Formal approval for PR #%d by %s (%d/2)", pr_number, agent_name, approvals)
async def terminate_pr(conn, pr_number: int, reason: str):
"""Terminal state: close PR on Forgejo, mark source needs_human."""
# Get issue tags for structured feedback
row = conn.execute("SELECT eval_issues, agent FROM prs WHERE number = ?", (pr_number,)).fetchone()
issues = []
if row and row["eval_issues"]:
try:
issues = json.loads(row["eval_issues"])
except (json.JSONDecodeError, TypeError):
pass
# Post structured rejection comment with quality gate guidance
if issues:
feedback_body = format_rejection_comment(issues, source="eval_terminal")
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback.\n\n"
f"{feedback_body}"
)
else:
comment_body = (
f"**Closed by eval pipeline** — {reason}.\n\n"
f"Evaluated {config.MAX_EVAL_ATTEMPTS} times without passing. "
f"Source will be re-queued with feedback."
)
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": comment_body},
)
closed = await close_pr(conn, pr_number, last_error=reason)
if not closed:
logger.warning("PR #%d: Forgejo close failed — skipping source requeue, will retry next cycle", pr_number)
return
try:
await on_closed(conn, pr_number, reason=reason)
except Exception:
logger.exception("PR #%d: GitHub close feedback failed (non-fatal)", pr_number)
# Tag source for re-extraction with feedback
cursor = conn.execute(
"""UPDATE sources SET status = 'needs_reextraction',
updated_at = datetime('now')
WHERE path = (SELECT source_path FROM prs WHERE number = ?)""",
(pr_number,),
)
if cursor.rowcount == 0:
logger.warning("PR #%d: no source_path linked — source not requeued for re-extraction", pr_number)
db.audit(
conn,
"evaluate",
"pr_terminated",
json.dumps(
{
"pr": pr_number,
"reason": reason,
}
),
)
logger.info("PR #%d: TERMINATED — %s", pr_number, reason)
async def dispose_rejected_pr(conn, pr_number: int, eval_attempts: int, all_issues: list[str]):
"""Disposition logic for rejected PRs on attempt 2+.
Auto-close gate (all attempts): near-duplicate of an already-merged PR for
the same source — close immediately. Avoids the Apr 22 runaway-damage
pattern where a source extracted 20+ times in a short window produced
dozens of open PRs that all had to be closed manually.
Attempt 1: normal — back to open, wait for fix.
Attempt 2: check issue classification.
- Mechanical only: keep open for one more attempt (auto-fix future).
- Substantive or mixed: close PR, requeue source.
Attempt 3+: terminal.
"""
# Auto-close near-duplicate when a merged sibling for the same source exists.
# Runs before the attempt-count branches so it catches the common runaway
# case on attempt 1 instead of waiting for attempt 2's terminate path.
#
# Exact-match requirement (Ganymede review): compound rejections like
# ["near_duplicate", "factual_discrepancy"] carry signal about the merged
# sibling being wrong or limited — we want humans to see those. Only the
# pure single-issue case is safe to auto-close.
if all_issues == ["near_duplicate"]:
existing_merged = conn.execute(
"""SELECT p2.number, p1.source_path FROM prs p1
JOIN prs p2 ON p2.source_path = p1.source_path
WHERE p1.number = ?
AND p1.source_path IS NOT NULL
AND p2.number != p1.number
AND p2.status = 'merged'
LIMIT 1""",
(pr_number,),
).fetchone()
if existing_merged:
sibling = existing_merged[0]
source_path = existing_merged[1]
# Enrichment guard: LLM reviewers can flag enrichment prose as
# "redundant" via eval_parse regex, tagging near_duplicate even
# though validate.py's structural check only fires on NEW files.
# If the PR only MODIFIES existing files (no "new file mode" in
# diff), it's an enrichment — skip auto-close so a human reviews.
#
# 10s timeout bounds damage when Forgejo is wedged (Apr 22 incident:
# hung for 2.5h). Conservative fallback: skip auto-close on any
# failure — fall through to normal rejection path.
try:
diff = await asyncio.wait_for(get_pr_diff(pr_number), timeout=10)
except (asyncio.TimeoutError, Exception):
logger.warning(
"PR #%d: diff fetch failed/timed out for near-dup guard — skipping auto-close",
pr_number, exc_info=True,
)
diff = None
if diff is None:
# Conservative: fall through to attempt-count branches below
pass
elif diff and "new file mode" not in diff:
logger.info(
"PR #%d: near_duplicate but modifies-only (enrichment) — skipping auto-close",
pr_number,
)
else:
logger.info(
"PR #%d: auto-closing near-duplicate of merged PR #%d (same source)",
pr_number, sibling,
)
# Post a brief explanation before closing (best-effort — non-fatal)
try:
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": (
f"Auto-closed: near-duplicate of already-merged PR "
f"#{sibling} (same source: `{source_path}`)."
)},
)
except Exception:
logger.debug("PR #%d: auto-close comment failed (non-fatal)", pr_number, exc_info=True)
await close_pr(
conn, pr_number,
last_error=f"auto_closed_near_duplicate: merged sibling #{sibling}",
)
db.audit(
conn, "evaluate", "auto_closed_near_duplicate",
json.dumps({
"pr": pr_number,
"merged_sibling": sibling,
"source_path": source_path,
"eval_attempts": eval_attempts,
}),
)
return
if eval_attempts < 2:
# Attempt 1: post structured feedback so agent learns, but don't close
if all_issues:
feedback_body = format_rejection_comment(all_issues, source="eval_attempt_1")
await forgejo_api(
"POST",
repo_path(f"issues/{pr_number}/comments"),
{"body": feedback_body},
)
return
classification = classify_issues(all_issues)
if eval_attempts >= config.MAX_EVAL_ATTEMPTS:
# Terminal
await terminate_pr(conn, pr_number, f"eval budget exhausted after {eval_attempts} attempts")
return
if classification == "mechanical":
# Mechanical issues only — keep open for one more attempt.
# Future: auto-fix module will push fixes here.
logger.info(
"PR #%d: attempt %d, mechanical issues only (%s) — keeping open for fix attempt",
pr_number,
eval_attempts,
all_issues,
)
db.audit(
conn,
"evaluate",
"mechanical_retry",
json.dumps(
{
"pr": pr_number,
"attempt": eval_attempts,
"issues": all_issues,
}
),
)
else:
# Substantive, mixed, or unknown — close and requeue
logger.info(
"PR #%d: attempt %d, %s issues (%s) — closing and requeuing source",
pr_number,
eval_attempts,
classification,
all_issues,
)
await terminate_pr(
conn, pr_number, f"substantive issues after {eval_attempts} attempts: {', '.join(all_issues)}"
)