Defense-in-depth for PRs that substantive_fixer can't make progress on.
Targets two stuck-verdict shapes empirically observed in production:
1. leo:request_changes + domain:approve
   Leo asked for a substantive fix; the fixer either failed silently
   (no_claim_files / no_review_comments / etc.) or the issue tag isn't
   in FIXABLE | CONVERTIBLE | UNFIXABLE.
2. leo:skipped + domain:request_changes
   Eval bypassed Leo (eval_attempts >= MAX) and domain rejected with no
   structured eval_issues, so the fixer can't classify the issue.
92 PRs match this gate today; the oldest dates to 2026-04-24 (13 days stuck).
Behavior:
- Hourly throttle via an audit_log sentinel ('verdict_deadlock_reaper_run').
- REAPER_DRY_RUN=True by default — the first deploy emits
  'verdict_deadlock_would_close' audit events only. No DB writes, no
  Forgejo writes. (Per the Apr 24 ship directive.)
- 24h cooldown, oldest-first, capped at 50 PRs per run.
- The heartbeat audit fires whether dry-run or live, so the throttle works
  in both modes.
- Live mode: posts a comment, closes the Forgejo PR, and calls close_pr()
  in the DB. Audits 'verdict_deadlock_closed' per PR.
- Forgejo PATCH returning None → skip the DB close (avoids drift).
Wired into fix_cycle() in teleo-pipeline.py; it runs after the mechanical
and substantive fixes and never blocks them.
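A minimal sketch of the reaper's gating, for orientation only. The audit
event names, hourly throttle, 24h cooldown, 50-PR cap, dry-run default, and
the PATCH-None guard come from the notes above; every helper below
(last_audit_event, write_audit, deadlocked_prs, post_close_comment,
forgejo_close_pr) is hypothetical, and the real implementation lives in
lib/substantive_fixer.py:

import datetime as dt

REAPER_DRY_RUN = True              # first deploy: audit-only
REAPER_MAX_PER_RUN = 50
REAPER_COOLDOWN = dt.timedelta(hours=24)

async def verdict_deadlock_reaper_cycle(conn):
    now = dt.datetime.now(dt.timezone.utc)
    # Hourly throttle: bail out if the sentinel fired within the last hour.
    last_run = last_audit_event(conn, "verdict_deadlock_reaper_run")   # hypothetical
    if last_run and now - last_run < dt.timedelta(hours=1):
        return 0
    # Heartbeat fires in dry-run and live mode alike, so the throttle holds.
    write_audit(conn, "verdict_deadlock_reaper_run")                   # hypothetical
    closed = 0
    # Oldest first, capped per run; only PRs deadlocked past the 24h cooldown
    # (one reading of the cooldown rule).
    cutoff = now - REAPER_COOLDOWN
    for pr in deadlocked_prs(conn, stuck_before=cutoff, limit=REAPER_MAX_PER_RUN):
        if REAPER_DRY_RUN:
            write_audit(conn, "verdict_deadlock_would_close", pr_id=pr.id)
            continue
        post_close_comment(pr)                                         # hypothetical
        if forgejo_close_pr(pr) is None:                               # hypothetical
            continue  # PATCH failed: skip the DB close to avoid drift
        close_pr(conn, pr.id)
        write_audit(conn, "verdict_deadlock_closed", pr_id=pr.id)
        closed += 1
    return closed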
Followup (after first-run audit verification):
- Operator inspects the 'verdict_deadlock_would_close' audit rows (see the
  spot-check sketch after this list)
- Flips REAPER_DRY_RUN to False and redeploys
- The reaper actually closes PRs on the next hourly tick
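For the inspection step, a spot-check along these lines would do; the
audit_log schema isn't shown here, so the column names are assumptions:

# Hypothetical audit_log columns (event, pr_id, created_at); adjust to the
# real schema before running.
rows = conn.execute(
    "SELECT pr_id, created_at FROM audit_log"
    " WHERE event = 'verdict_deadlock_would_close'"
    " ORDER BY created_at"
).fetchall()
print(f"{len(rows)} PRs would be closed on the next live tick")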
#!/usr/bin/env python3
"""Teleo Pipeline v2 — single async daemon replacing 7 cron scripts.

Four stages: Ingest → Validate → Evaluate → Merge
SQLite WAL state store. systemd-managed. Graceful shutdown.
"""

import asyncio
import logging
import signal
import sys

# Add parent dir to path so lib/ is importable
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from lib import config, db
from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle
from lib.fixer import fix_cycle as mechanical_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle, verdict_deadlock_reaper_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
from lib.analytics import record_snapshot
from lib.entity_batch import entity_batch_cycle
from lib.extract import extract_cycle as source_extract_cycle
from lib.validate import validate_cycle
from lib.watchdog import watchdog_cycle

logger = logging.getLogger("pipeline")

# Global shutdown event — stages check this between iterations
shutdown_event = asyncio.Event()


async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
    """Generic stage loop with interval, shutdown check, and circuit breaker."""
    logger.info("Stage %s started (interval=%ds)", name, interval)
    while not shutdown_event.is_set():
        try:
            if not breaker.allow_request():
                logger.debug("Stage %s: breaker OPEN, skipping cycle", name)
            else:
                workers = breaker.max_workers()
                succeeded, failed = await func(conn, max_workers=workers)
                if failed > 0 and succeeded == 0:
                    try:
                        breaker.record_failure()
                    except Exception:
                        logger.warning("Stage %s: breaker write failed", name)
                elif succeeded > 0:
                    try:
                        breaker.record_success()
                    except Exception:
                        logger.warning("Stage %s: breaker write failed", name)
        except Exception:
            logger.exception("Stage %s: unhandled error in cycle", name)
            try:
                breaker.record_failure()
            except Exception:
                logger.warning("Stage %s: breaker write failed", name)

        # Wait for interval or shutdown, whichever comes first
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
            break  # shutdown_event was set
        except asyncio.TimeoutError:
            pass  # interval elapsed, continue loop

    logger.info("Stage %s stopped", name)


# --- Stage stubs (Phase 1 — replaced in later phases) ---


async def ingest_cycle(conn, max_workers=None):
    """Stage 1: Entity batch + source extraction."""
    # Entity batch first (fast, local-only operations)
    eb_ok, eb_err = await entity_batch_cycle(conn, max_workers=max_workers)
    # Source extraction (slower, LLM calls)
    try:
        ex_ok, ex_err = await source_extract_cycle(conn, max_workers=max_workers)
    except Exception:
        logger.exception("Extract cycle failed (non-fatal)")
        ex_ok, ex_err = 0, 0
    return eb_ok + ex_ok, eb_err + ex_err


async def fix_cycle(conn, max_workers=None):
    """Combined fix stage: mechanical fixes first, then substantive fixes,
    finally the verdict-deadlock reaper.

    Mechanical (fixer.py): wiki link bracket stripping, $0
    Substantive (substantive_fixer.py): confidence/title/scope fixes via LLM, $0.001
    Reaper (substantive_fixer.verdict_deadlock_reaper_cycle): defense-in-depth
    for stuck-verdict PRs that the substantive fixer can't make progress on.
    Hourly throttle, dry-run by default. Cost $0.
    """
    m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
    s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
    r_closed = await verdict_deadlock_reaper_cycle(conn)
    return m_fixed + s_fixed + r_closed, m_errors + s_errors


async def snapshot_cycle(conn, max_workers=None):
    """Record metrics snapshot every cycle (runs on 15-min interval).

    Populates metrics_snapshots table for Argus analytics dashboard.
    Lightweight — just SQL queries, no LLM calls, no git ops.
    """
    try:
        record_snapshot(conn)
        return 1, 0
    except Exception:
        logger.exception("Snapshot recording failed")
        return 0, 1


# validate_cycle imported from lib.validate


# evaluate_cycle imported from lib.evaluate


# merge_cycle imported from lib.merge


# --- Shutdown ---


def handle_signal(sig):
    """Signal handler — sets shutdown event."""
    logger.info("Received %s, initiating graceful shutdown...", sig.name)
    shutdown_event.set()


async def kill_subprocesses():
    """Kill any lingering Claude CLI subprocesses (delegates to lib.llm)."""
    await kill_active_subprocesses()


async def cleanup_orphan_worktrees():
    """Remove any orphan worktrees from previous crashes."""
    import glob
    import shutil

    # Use a specific prefix to avoid colliding with other /tmp users (Ganymede)
    orphans = glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*")
    # Fixer worktrees live under BASE_DIR/workspaces/fix-*
    orphans += glob.glob(str(config.BASE_DIR / "workspaces" / "fix-*"))
    for path in orphans:
        logger.warning("Cleaning orphan worktree: %s", path)
        try:
            proc = await asyncio.create_subprocess_exec(
                "git",
                "worktree",
                "remove",
                "--force",
                path,
                cwd=str(config.REPO_DIR),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await asyncio.wait_for(proc.wait(), timeout=10)
        except Exception:
            shutil.rmtree(path, ignore_errors=True)
    # Prune stale worktree metadata entries from the bare repo (Ganymede)
    try:
        proc = await asyncio.create_subprocess_exec(
            "git",
            "worktree",
            "prune",
            cwd=str(config.REPO_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await asyncio.wait_for(proc.wait(), timeout=10)
    except Exception:
        logger.warning("git worktree prune failed, continuing")


# --- Main ---


async def main():
    logmod.setup_logging()
    logger.info("Teleo Pipeline v2 starting")

    # Clean orphan worktrees from prior crashes (Ganymede's requirement)
    await cleanup_orphan_worktrees()

    # Initialize database
    conn = db.get_connection()
    db.migrate(conn)
    logger.info("Database ready at %s", config.DB_PATH)

    # Initialize circuit breakers
    breakers = {
        "ingest": CircuitBreaker("ingest", conn),
        "validate": CircuitBreaker("validate", conn),
        "evaluate": CircuitBreaker("evaluate", conn),
        "merge": CircuitBreaker("merge", conn),
        "fix": CircuitBreaker("fix", conn),
        "snapshot": CircuitBreaker("snapshot", conn),
        "watchdog": CircuitBreaker("watchdog", conn),
    }

    # Recover interrupted state from crashes
    # Atomic recovery: all four resets in one transaction (Ganymede)
    # Increment transient_retries on recovered sources to prevent infinite cycling (Vida)
    with db.transaction(conn):
        # Sources stuck in 'extracting' — increment retry counter, move to error if exhausted
        c1 = conn.execute(
            """UPDATE sources SET
                   transient_retries = transient_retries + 1,
                   status = CASE
                       WHEN transient_retries + 1 >= ? THEN 'error'
                       ELSE 'unprocessed'
                   END,
                   last_error = CASE
                       WHEN transient_retries + 1 >= ? THEN 'crash recovery: retry budget exhausted'
                       ELSE last_error
                   END,
                   updated_at = datetime('now')
               WHERE status = 'extracting'""",
            (config.TRANSIENT_RETRY_MAX, config.TRANSIENT_RETRY_MAX),
        )
        # PRs stuck in 'merging' → approved (Ganymede's Q4 answer)
        c2 = conn.execute("UPDATE prs SET status = 'approved' WHERE status = 'merging'")
        # PRs stuck in 'reviewing' → open
        c3 = conn.execute("UPDATE prs SET status = 'open', merge_cycled = 0 WHERE status = 'reviewing'")
        # PRs stuck in 'fixing' → open (fixer crashed mid-fix)
        c4 = conn.execute("UPDATE prs SET status = 'open' WHERE status = 'fixing'")
        recovered = c1.rowcount + c2.rowcount + c3.rowcount + c4.rowcount
    if recovered:
        logger.info("Recovered %d interrupted rows from prior crash", recovered)

    # Register signal handlers
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_signal, sig)

    # Start health API
    health_runners = []
    await start_health_server(health_runners)

    # Start stage loops
    stages = [
        asyncio.create_task(
            stage_loop("ingest", config.INGEST_INTERVAL, ingest_cycle, conn, breakers["ingest"]),
            name="ingest",
        ),
        asyncio.create_task(
            stage_loop("validate", config.VALIDATE_INTERVAL, validate_cycle, conn, breakers["validate"]),
            name="validate",
        ),
        asyncio.create_task(
            stage_loop("evaluate", config.EVAL_INTERVAL, evaluate_cycle, conn, breakers["evaluate"]),
            name="evaluate",
        ),
        asyncio.create_task(
            stage_loop("merge", config.MERGE_INTERVAL, merge_cycle, conn, breakers["merge"]),
            name="merge",
        ),
        asyncio.create_task(
            stage_loop("fix", config.FIX_INTERVAL, fix_cycle, conn, breakers["fix"]),
            name="fix",
        ),
        asyncio.create_task(
            stage_loop("snapshot", 900, snapshot_cycle, conn, breakers["snapshot"]),
            name="snapshot",
        ),
        asyncio.create_task(
            stage_loop("watchdog", 60, watchdog_cycle, conn, breakers["watchdog"]),
            name="watchdog",
        ),
    ]

    logger.info("All stages running")

    # Wait for shutdown signal
    await shutdown_event.wait()
    logger.info("Shutdown event received, waiting for stages to finish...")

    # Give stages time to finish current work
    try:
        await asyncio.wait_for(asyncio.gather(*stages, return_exceptions=True), timeout=60)
    except asyncio.TimeoutError:
        logger.warning("Stages did not finish within 60s, force-cancelling")
        for task in stages:
            task.cancel()
        await asyncio.gather(*stages, return_exceptions=True)

    # Kill lingering subprocesses
    await kill_subprocesses()

    # Stop health API
    await stop_health_server(health_runners)

    # Close DB
    conn.close()
    logger.info("Teleo Pipeline v2 shut down cleanly")


if __name__ == "__main__":
    asyncio.run(main())