diff --git a/lib/config.py b/lib/config.py
index fb09412..b408fd4 100644
--- a/lib/config.py
+++ b/lib/config.py
@@ -211,6 +211,14 @@ HEALTH_CHECK_INTERVAL = 60
 # --- Extraction gates ---
 EXTRACTION_COOLDOWN_HOURS = 4 # Skip sources with any PR activity in this window. Defense-in-depth for DB-status filter.
 
+# --- Verdict-deadlock reaper ---
+# Fail-safe defaults (dry-run, 24h age, hourly throttle). Dry-run stays ON unless REAPER_DRY_RUN is
+# explicitly false/0/no/off (set via systemctl edit teleo-pipeline → restart, no code change required).
+REAPER_DRY_RUN = os.environ.get("REAPER_DRY_RUN", "true").strip().lower() not in ("false", "0", "no", "off")
+REAPER_DEADLOCK_AGE_HOURS = int(os.environ.get("REAPER_DEADLOCK_AGE_HOURS", "24"))
+REAPER_INTERVAL_SECONDS = int(os.environ.get("REAPER_INTERVAL_SECONDS", "3600"))
+REAPER_MAX_PER_RUN = max(0, int(os.environ.get("REAPER_MAX_PER_RUN", "50")))  # negative SQL LIMIT = uncapped
+
 # --- Retrieval (Telegram bot) ---
 RETRIEVAL_RRF_K = 20 # RRF smoothing constant — tuned for 5-10 results per source
 RETRIEVAL_ENTITY_BOOST = 1.5 # RRF score multiplier for claims wiki-linked from matched entities
diff --git a/lib/substantive_fixer.py b/lib/substantive_fixer.py
index 04f0f5d..eabe93e 100644
--- a/lib/substantive_fixer.py
+++ b/lib/substantive_fixer.py
@@ -586,14 +586,14 @@ async def substantive_fix_cycle(conn, max_workers=None) -> tuple[int, int]:
 # structured eval_issues. fixer can't classify → silent skip → forever.
 #
 # Both shapes need a clearance path. Reaper closes them after a 24h cooldown
-# with audit_log breadcrumbs for forensics. First deploy runs in DRY_RUN
-# mode (audit "would_close" events only — no Forgejo writes, no DB closes).
-# Operator reviews dry-run output, flips REAPER_DRY_RUN to False, redeploys.
-
-REAPER_DEADLOCK_AGE_HOURS = 24
-REAPER_INTERVAL_SECONDS = 3600 # at most once per hour
-REAPER_MAX_PER_RUN = 50 # safety cap so a single cycle can't close everything
-REAPER_DRY_RUN = True # FIRST-DEPLOY DEFAULT — flip to False after audit verification
+# with audit_log breadcrumbs for forensics.
First deploy runs in dry-run mode +# (audit "would_close" events only — no Forgejo writes, no DB closes). +# +# Reaper config (REAPER_DRY_RUN, REAPER_DEADLOCK_AGE_HOURS, REAPER_INTERVAL_SECONDS, +# REAPER_MAX_PER_RUN) lives in lib/config.py with env-var overrides — operator +# flips dry-run to live via `systemctl edit teleo-pipeline.service` +# (Environment=REAPER_DRY_RUN=false) + restart. No code change, no commit, no +# redeploy required. async def verdict_deadlock_reaper_cycle(conn) -> int: @@ -604,14 +604,15 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: """ # Throttle: skip if last reaper run was within REAPER_INTERVAL_SECONDS. # Uses audit_log as the rate-limit ledger so no schema/state needed. + # stage='reaper' filter so the planner uses idx_audit_stage (avoids full scan). last_run = conn.execute( "SELECT MAX(timestamp) FROM audit_log " - "WHERE event = 'verdict_deadlock_reaper_run'" + "WHERE stage = 'reaper' AND event = 'verdict_deadlock_reaper_run'" ).fetchone()[0] if last_run: cur = conn.execute( "SELECT (julianday('now') - julianday(?)) * 86400 < ?", - (last_run, REAPER_INTERVAL_SECONDS), + (last_run, config.REAPER_INTERVAL_SECONDS), ).fetchone()[0] if cur: return 0 @@ -631,10 +632,10 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: ) ORDER BY last_attempt ASC LIMIT ?""", - (f"-{REAPER_DEADLOCK_AGE_HOURS}", REAPER_MAX_PER_RUN), + (f"-{config.REAPER_DEADLOCK_AGE_HOURS}", config.REAPER_MAX_PER_RUN), ).fetchall() - mode = "dryrun" if REAPER_DRY_RUN else "live" + mode = "dryrun" if config.REAPER_DRY_RUN else "live" if not rows: # Heartbeat anyway so throttle ticks even when nothing to reap. 
@@ -645,10 +646,11 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: logger.info( "Verdict-deadlock reaper [%s]: %d candidate(s) in deadlock >%dh", - mode, len(rows), REAPER_DEADLOCK_AGE_HOURS, + mode, len(rows), config.REAPER_DEADLOCK_AGE_HOURS, ) closed = 0 + would_close = 0 errors = 0 for row in rows: pr = row["number"] @@ -662,7 +664,7 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: "fix_attempts": row["fix_attempts"], } - if REAPER_DRY_RUN: + if config.REAPER_DRY_RUN: # Audit only — do NOT touch DB row or Forgejo state. db.audit(conn, "reaper", "verdict_deadlock_would_close", json.dumps(reason_detail)) @@ -670,13 +672,13 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: "Reaper [dryrun]: would close PR #%d (leo=%s domain=%s issues=%s)", pr, row["leo_verdict"], row["domain_verdict"], row["eval_issues"], ) - closed += 1 + would_close += 1 continue try: comment_body = ( "Closed by verdict-deadlock reaper.\n\n" - f"This PR sat for >{REAPER_DEADLOCK_AGE_HOURS}h with conflicting " + f"This PR sat for >{config.REAPER_DEADLOCK_AGE_HOURS}h with conflicting " f"verdicts (leo={row['leo_verdict']}, domain={row['domain_verdict']}) " f"that the substantive fixer couldn't auto-resolve.\n\n" f"Eval issues: `{row['eval_issues']}`\n" @@ -697,12 +699,16 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: ) errors += 1 continue + # Forgejo already closed at the PATCH above — pass close_on_forgejo=False + # so close_pr() doesn't issue a redundant PATCH (which on transient + # failure returns False and skips the DB close → status drift). 
await close_pr( conn, pr, last_error=( f"verdict_deadlock_reaper: leo={row['leo_verdict']} " - f"domain={row['domain_verdict']} age>{REAPER_DEADLOCK_AGE_HOURS}h" + f"domain={row['domain_verdict']} age>{config.REAPER_DEADLOCK_AGE_HOURS}h" ), + close_on_forgejo=False, ) db.audit(conn, "reaper", "verdict_deadlock_closed", json.dumps(reason_detail)) @@ -712,11 +718,16 @@ async def verdict_deadlock_reaper_cycle(conn) -> int: errors += 1 db.audit(conn, "reaper", "verdict_deadlock_reaper_run", json.dumps({ - "candidates": len(rows), "closed": closed, "errors": errors, "mode": mode, + "candidates": len(rows), "closed": closed, "would_close": would_close, + "errors": errors, "mode": mode, })) if errors: - logger.warning("Verdict-deadlock reaper [%s]: %d closed, %d errors", - mode, closed, errors) + logger.warning( + "Verdict-deadlock reaper [%s]: %d closed, %d would-close, %d errors", + mode, closed, would_close, errors, + ) + elif config.REAPER_DRY_RUN: + logger.info("Verdict-deadlock reaper [dryrun]: %d would-close", would_close) else: - logger.info("Verdict-deadlock reaper [%s]: %d closed", mode, closed) - return closed + logger.info("Verdict-deadlock reaper [live]: %d closed", closed) + return closed + would_close diff --git a/teleo-pipeline.py b/teleo-pipeline.py index 58e0bf5..9b78edf 100644 --- a/teleo-pipeline.py +++ b/teleo-pipeline.py @@ -102,7 +102,18 @@ async def fix_cycle(conn, max_workers=None): """ m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers) s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers) - r_closed = await verdict_deadlock_reaper_cycle(conn) + # Defense-in-depth: reaper exception must never block primary fix paths. + # Same exception-isolation pattern as ingest_cycle's extract_cycle wrapper — + # propagating would trip the fix breaker and lock out mechanical+substantive + # for 15 min after 5 reaper failures. 
+ try: + r_closed = await verdict_deadlock_reaper_cycle(conn) + except Exception: + import logging + logging.getLogger("pipeline").exception( + "Reaper cycle failed (non-fatal)" + ) + r_closed = 0 return m_fixed + s_fixed + r_closed, m_errors + s_errors