fix(reaper): apply Ganymede review — dual-PATCH drift, breaker isolation, env config

Followup to f97dd15. Four fixes from review:

MUST-FIX #1 — Forgejo double-PATCH drift
  The reaper closed the PR via a forgejo_api PATCH at line 689, then
  close_pr() at line 700 issued a second PATCH (default
  close_on_forgejo=True). On transient failure of that second PATCH,
  close_pr returned False without updating the DB → status='open' even
  though Forgejo had already closed the PR. Pass close_on_forgejo=False
  so the DB close is unconditional once the explicit Forgejo PATCH
  succeeds.

MUST-FIX #2 — reaper exception trips fix breaker
  Unhandled exception in verdict_deadlock_reaper_cycle propagated to
  stage_loop, recording fix-stage failures. After 5 reaper failures the
  fix breaker would open and block mechanical+substantive for 15 min.
  Wrap reaper call in try/except in fix_cycle (same exception-isolation
  pattern as ingest_cycle's extract_cycle wrapper). Defense-in-depth
  must never block primary paths.

WARNING #1 — throttle SQL full-scan
  audit_log only has idx_audit_stage. Filtering on event alone caused a
  full-table scan every 60s. Added stage='reaper' so the planner can use
  the existing index — the reaper already writes its audit rows under
  stage='reaper', so the narrower filter is correct.

WARNING #2 — REAPER_DRY_RUN as code constant
  Flipping dry-run → live required edit + commit + push + deploy +
  restart. Moved REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS,
  REAPER_INTERVAL_SECONDS, REAPER_MAX_PER_RUN to lib/config.py with
  os.environ.get() overrides. Operator now flips via systemctl edit
  teleo-pipeline.service (Environment=REAPER_DRY_RUN=false) + restart.
  Defaults remain safe: dry-run, 24h age, hourly throttle, 50/run cap.

NIT — dry-run counter naming
  Renamed the local `closed` counter in the dry-run path to `would_close`
  so the heartbeat audit ("X closed, Y would-close") and the journal log
  are unambiguous. The function now returns closed + would_close, so
  callers still see the total work done.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
m3taversal 2026-05-07 18:18:32 -04:00
parent f97dd15349
commit e14b5f2f05
3 changed files with 53 additions and 23 deletions

View file

@ -211,6 +211,14 @@ HEALTH_CHECK_INTERVAL = 60
# --- Extraction gates --- # --- Extraction gates ---
EXTRACTION_COOLDOWN_HOURS = 4 # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter. EXTRACTION_COOLDOWN_HOURS = 4 # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.
# --- Verdict-deadlock reaper ---
# Defaults safe (dry-run, 24h age, hourly throttle). Operator flips REAPER_DRY_RUN
# to "false" via systemctl edit teleo-pipeline → restart, no code change required.
REAPER_DRY_RUN = os.environ.get("REAPER_DRY_RUN", "true").lower() == "true"
REAPER_DEADLOCK_AGE_HOURS = int(os.environ.get("REAPER_DEADLOCK_AGE_HOURS", "24"))
REAPER_INTERVAL_SECONDS = int(os.environ.get("REAPER_INTERVAL_SECONDS", "3600"))
REAPER_MAX_PER_RUN = int(os.environ.get("REAPER_MAX_PER_RUN", "50"))
# --- Retrieval (Telegram bot) --- # --- Retrieval (Telegram bot) ---
RETRIEVAL_RRF_K = 20 # RRF smoothing constant — tuned for 5-10 results per source RETRIEVAL_RRF_K = 20 # RRF smoothing constant — tuned for 5-10 results per source
RETRIEVAL_ENTITY_BOOST = 1.5 # RRF score multiplier for claims wiki-linked from matched entities RETRIEVAL_ENTITY_BOOST = 1.5 # RRF score multiplier for claims wiki-linked from matched entities

View file

@ -586,14 +586,14 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
# structured eval_issues. fixer can't classify → silent skip → forever. # structured eval_issues. fixer can't classify → silent skip → forever.
# #
# Both shapes need a clearance path. Reaper closes them after a 24h cooldown # Both shapes need a clearance path. Reaper closes them after a 24h cooldown
# with audit_log breadcrumbs for forensics. First deploy runs in DRY_RUN # with audit_log breadcrumbs for forensics. First deploy runs in dry-run mode
# mode (audit "would_close" events only — no Forgejo writes, no DB closes). # (audit "would_close" events only — no Forgejo writes, no DB closes).
# Operator reviews dry-run output, flips REAPER_DRY_RUN to False, redeploys. #
# Reaper config (REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS, REAPER_INTERVAL_SECONDS,
REAPER_DEADLOCK_AGE_HOURS = 24 # REAPER_MAX_PER_RUN) lives in lib/config.py with env-var overrides — operator
REAPER_INTERVAL_SECONDS = 3600 # at most once per hour # flips dry-run to live via `systemctl edit teleo-pipeline.service`
REAPER_MAX_PER_RUN = 50 # safety cap so a single cycle can't close everything # (Environment=REAPER_DRY_RUN=false) + restart. No code change, no commit, no
REAPER_DRY_RUN = True # FIRST-DEPLOY DEFAULT — flip to False after audit verification # redeploy required.
async def verdict_deadlock_reaper_cycle(conn) -> int: async def verdict_deadlock_reaper_cycle(conn) -> int:
@ -604,14 +604,15 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
""" """
# Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS. # Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS.
# Uses audit_log as the rate-limit ledger so no schema/state needed. # Uses audit_log as the rate-limit ledger so no schema/state needed.
# stage='reaper' filter so the planner uses idx_audit_stage (avoids full scan).
last_run = conn.execute( last_run = conn.execute(
"SELECT MAX(timestamp) FROM audit_log " "SELECT MAX(timestamp) FROM audit_log "
"WHERE event = 'verdict_deadlock_reaper_run'" "WHERE stage = 'reaper' AND event = 'verdict_deadlock_reaper_run'"
).fetchone()[0] ).fetchone()[0]
if last_run: if last_run:
cur = conn.execute( cur = conn.execute(
"SELECT (julianday('now') - julianday(?)) * 86400 < ?", "SELECT (julianday('now') - julianday(?)) * 86400 < ?",
(last_run, REAPER_INTERVAL_SECONDS), (last_run, config.REAPER_INTERVAL_SECONDS),
).fetchone()[0] ).fetchone()[0]
if cur: if cur:
return 0 return 0
@ -631,10 +632,10 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
) )
ORDER BY last_attempt ASC ORDER BY last_attempt ASC
LIMIT ?""", LIMIT ?""",
(f"-{REAPER_DEADLOCK_AGE_HOURS}", REAPER_MAX_PER_RUN), (f"-{config.REAPER_DEADLOCK_AGE_HOURS}", config.REAPER_MAX_PER_RUN),
).fetchall() ).fetchall()
mode = "dryrun" if REAPER_DRY_RUN else "live" mode = "dryrun" if config.REAPER_DRY_RUN else "live"
if not rows: if not rows:
# Heartbeat anyway so throttle ticks even when nothing to reap. # Heartbeat anyway so throttle ticks even when nothing to reap.
@ -645,10 +646,11 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
logger.info( logger.info(
"Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh", "Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh",
mode, len(rows), REAPER_DEADLOCK_AGE_HOURS, mode, len(rows), config.REAPER_DEADLOCK_AGE_HOURS,
) )
closed = 0 closed = 0
would_close = 0
errors = 0 errors = 0
for row in rows: for row in rows:
pr = row["number"] pr = row["number"]
@ -662,7 +664,7 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
"fix_attempts": row["fix_attempts"], "fix_attempts": row["fix_attempts"],
} }
if REAPER_DRY_RUN: if config.REAPER_DRY_RUN:
# Audit only — do NOT touch DB row or Forgejo state. # Audit only — do NOT touch DB row or Forgejo state.
db.audit(conn, "reaper", "verdict_deadlock_would_close", db.audit(conn, "reaper", "verdict_deadlock_would_close",
json.dumps(reason_detail)) json.dumps(reason_detail))
@ -670,13 +672,13 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
"Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)", "Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)",
pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"], pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"],
) )
closed += 1 would_close += 1
continue continue
try: try:
comment_body = ( comment_body = (
"Closed by verdict-deadlock reaper.\n\n" "Closed by verdict-deadlock reaper.\n\n"
f"This PR sat for >{REAPER_DEADLOCK_AGE_HOURS}h with conflicting " f"This PR sat for >{config.REAPER_DEADLOCK_AGE_HOURS}h with conflicting "
f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) " f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) "
f"that the substantive fixer couldn't auto-resolve.\n\n" f"that the substantive fixer couldn't auto-resolve.\n\n"
f"Eval issues: `{row['eval_issues']}`\n" f"Eval issues: `{row['eval_issues']}`\n"
@ -697,12 +699,16 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
) )
errors += 1 errors += 1
continue continue
# Forgejo already closed at the PATCH above — pass close_on_forgejo=False
# so close_pr() doesn't issue a redundant PATCH (which on transient
# failure returns False and skips the DB close → status drift).
await close_pr( await close_pr(
conn, pr, conn, pr,
last_error=( last_error=(
f"verdict_deadlock_reaper: leo={row['leo_verdict']} " f"verdict_deadlock_reaper: leo={row['leo_verdict']} "
f"domain={row['domain_verdict']} age>{REAPER_DEADLOCK_AGE_HOURS}h" f"domain={row['domain_verdict']} age>{config.REAPER_DEADLOCK_AGE_HOURS}h"
), ),
close_on_forgejo=False,
) )
db.audit(conn, "reaper", "verdict_deadlock_closed", db.audit(conn, "reaper", "verdict_deadlock_closed",
json.dumps(reason_detail)) json.dumps(reason_detail))
@ -712,11 +718,16 @@ async def verdict_deadlock_reaper_cycle(conn) -> int:
errors += 1 errors += 1
db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({ db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({
"candidates": len(rows), "closed": closed, "errors": errors, "mode": mode, "candidates": len(rows), "closed": closed, "would_close": would_close,
"errors": errors, "mode": mode,
})) }))
if errors: if errors:
logger.warning("Verdict-deadlock reaper [%s]: %d closed, %d errors", logger.warning(
mode, closed, errors) "Verdict-deadlock reaper [%s]: %d closed, %d would-close, %d errors",
mode, closed, would_close, errors,
)
elif config.REAPER_DRY_RUN:
logger.info("Verdict-deadlock reaper [dryrun]: %d would-close", would_close)
else: else:
logger.info("Verdict-deadlock reaper [%s]: %d closed", mode, closed) logger.info("Verdict-deadlock reaper [live]: %d closed", closed)
return closed return closed + would_close

View file

@ -102,7 +102,18 @@ async def fix_cycle(conn, max_workers=None):
""" """
m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers) m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers) s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
r_closed = await verdict_deadlock_reaper_cycle(conn) # Defense-in-depth: reaper exception must never block primary fix paths.
# Same exception-isolation pattern as ingest_cycle's extract_cycle wrapper —
# propagating would trip the fix breaker and lock out mechanical+substantive
# for 15 min after 5 reaper failures.
try:
r_closed = await verdict_deadlock_reaper_cycle(conn)
except Exception:
import logging
logging.getLogger("pipeline").exception(
"Reaper cycle failed (non-fatal)"
)
r_closed = 0
return m_fixed + s_fixed + r_closed, m_errors + s_errors return m_fixed + s_fixed + r_closed, m_errors + s_errors