#!/usr/bin/env python3
"""Teleo Pipeline v2 — single async daemon replacing 7 cron scripts.

Core flow: Ingest → Validate → Evaluate → Merge, plus supporting loops for
fixing, metrics snapshots, and a watchdog. Each loop runs on its own interval
behind a per-stage circuit breaker. SQLite WAL state store. systemd-managed.
Graceful shutdown on SIGTERM/SIGINT.
"""

import asyncio
import contextlib
import glob
import logging
import shutil
import signal
import sys

# Add this script's own directory to sys.path so the sibling lib/ package is importable
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from lib import config, db
from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle
from lib.fixer import fix_cycle as mechanical_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
from lib.analytics import record_snapshot
from lib.entity_batch import entity_batch_cycle
from lib.extract import extract_cycle as source_extract_cycle
from lib.validate import validate_cycle
from lib.watchdog import watchdog_cycle

logger = logging.getLogger("pipeline")

SNAPSHOT_INTERVAL = 900  # seconds — metrics snapshot every 15 minutes
WATCHDOG_INTERVAL = 60  # seconds
SHUTDOWN_GRACE = 60  # seconds stages get to finish in-flight work before force-cancel
GIT_TIMEOUT = 10  # seconds allowed per git command during orphan-worktree cleanup

# Global shutdown event — stages check this between iterations.
# NOTE: creating this at import time relies on Python 3.10+, where asyncio.Event
# binds to the running loop on first use. On older versions it would bind to a
# different loop than the one asyncio.run() creates.
shutdown_event = asyncio.Event()


# --- Stage loop ---


async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
    """Run ``func`` every ``interval`` seconds until shutdown is requested.

    Each cycle is gated by ``breaker``. While it refuses requests, the cycle is
    skipped. ``func(conn, max_workers=...)`` must return ``(succeeded, failed)``:
    - failures with no successes count as a breaker failure;
    - any success counts as a breaker success;
    - an all-zero cycle leaves the breaker untouched.

    Exceptions from the cycle, and from the breaker bookkeeping done after a
    failed cycle, are logged and never end the loop.
    """
    logger.info("Stage %s started (interval=%ds)", name, interval)
    while not shutdown_event.is_set():
        try:
            await _run_stage_cycle(name, func, conn, breaker)
        except Exception:
            logger.exception("Stage %s: unhandled error in cycle", name)
            _record_breaker_failure(name, breaker)

        # Wait for interval or shutdown, whichever comes first.
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            continue  # interval elapsed, run the next cycle
        break  # shutdown_event was set
    logger.info("Stage %s stopped", name)


async def _run_stage_cycle(name, func, conn, breaker):
    """Run one breaker-gated cycle of a stage and feed its outcome to the breaker."""
    if not breaker.allow_request():
        logger.debug("Stage %s: breaker OPEN, skipping cycle", name)
        return
    succeeded, failed = await func(conn, max_workers=breaker.max_workers())
    if failed > 0 and succeeded == 0:
        breaker.record_failure()
    elif succeeded > 0:
        breaker.record_success()


def _record_breaker_failure(name, breaker):
    """Record a failed cycle without letting a breaker error terminate the stage loop."""
    try:
        breaker.record_failure()
    except Exception:
        # If this propagated, the stage task would end permanently while main() keeps waiting.
        logger.exception("Stage %s: could not record breaker failure", name)


# --- Composite stage cycles ---


async def ingest_cycle(conn, max_workers=None):
    """Stage 1: entity batch, then source extraction.

    Entity-batch errors propagate to the stage loop. Extraction is best-effort:
    if it raises, the failure is logged and contributes neither successes nor
    errors, so the entity-batch counts alone decide the breaker outcome.
    """
    # Entity batch first (fast, local-only operations)
    eb_ok, eb_err = await entity_batch_cycle(conn, max_workers=max_workers)
    # Source extraction (slower, LLM calls)
    try:
        ex_ok, ex_err = await source_extract_cycle(conn, max_workers=max_workers)
    except Exception:
        logger.exception("Extract cycle failed (non-fatal)")
        ex_ok, ex_err = 0, 0
    return eb_ok + ex_ok, eb_err + ex_err


async def fix_cycle(conn, max_workers=None):
    """Combined fix stage: mechanical fixes first, then substantive fixes.

    Mechanical (fixer.py): wiki link bracket stripping, $0
    Substantive (substantive_fixer.py): confidence/title/scope fixes via LLM, $0.001

    Returns the summed ``(fixed, errors)`` of both passes. An exception from the
    mechanical pass propagates and skips the substantive pass for this cycle.
    """
    m_fixed, m_errors = await mechanical_fix_cycle(conn, max_workers=max_workers)
    s_fixed, s_errors = await substantive_fix_cycle(conn, max_workers=max_workers)
    return m_fixed + s_fixed, m_errors + s_errors


async def snapshot_cycle(conn, max_workers=None):
    """Record a metrics snapshot (scheduled every SNAPSHOT_INTERVAL = 15 min).

    Populates metrics_snapshots table for Argus analytics dashboard.
    Lightweight — just SQL queries, no LLM calls, no git ops.
    Returns ``(1, 0)`` on success and ``(0, 1)`` if recording raised.
    """
    try:
        record_snapshot(conn)
        return 1, 0
    except Exception:
        logger.exception("Snapshot recording failed")
        return 0, 1


# validate_cycle imported from lib.validate
# evaluate_cycle imported from lib.evaluate
# merge_cycle imported from lib.merge
# watchdog_cycle imported from lib.watchdog


# --- Shutdown ---


def handle_signal(sig):
    """Signal handler — sets shutdown event."""
    logger.info("Received %s, initiating graceful shutdown...", sig.name)
    shutdown_event.set()


async def kill_subprocesses():
    """Kill any lingering Claude CLI subprocesses (delegates to lib.llm.kill_active_subprocesses)."""
    await kill_active_subprocesses()


def _log_stage_exit(task):
    """Task done-callback: log a stage task that crashed or stopped before shutdown.

    stage_loop only returns once shutdown is requested, so any other exit means
    the stage is no longer running. Retrieving the exception here also keeps it
    from being silently discarded during shutdown.
    """
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Stage task %s crashed", task.get_name(), exc_info=exc)
    elif not shutdown_event.is_set():
        logger.error("Stage task %s exited before shutdown was requested", task.get_name())


async def _drain_stages(stages):
    """Stop the stage tasks: signal shutdown, allow SHUTDOWN_GRACE seconds, then force-cancel."""
    # Already set on the normal path; makes the loops exit if startup aborted part-way.
    shutdown_event.set()
    if not stages:
        return
    # Give stages time to finish current work.
    _done, pending = await asyncio.wait(stages, timeout=SHUTDOWN_GRACE)
    if pending:
        logger.warning("Stages did not finish within %ds, force-cancelling", SHUTDOWN_GRACE)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


# --- Startup helpers ---


async def _run_git(*args, timeout=GIT_TIMEOUT):
    """Run ``git <args>`` in ``config.REPO_DIR`` with all output discarded.

    Returns git's exit code. If git does not exit within ``timeout`` seconds it
    is killed and reaped before ``asyncio.TimeoutError`` is re-raised, so a hung
    git process is never left running.
    """
    proc = await asyncio.create_subprocess_exec(
        "git",
        *args,
        cwd=str(config.REPO_DIR),
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        return await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        with contextlib.suppress(ProcessLookupError):  # it may have exited meanwhile
            proc.kill()
        await proc.wait()
        raise


async def cleanup_orphan_worktrees():
    """Remove worktrees left behind by a previous crash, then prune worktree metadata.

    Each orphan is first removed with ``git worktree remove --force``. The
    directory is deleted with ``shutil.rmtree`` instead when git:
    - cannot be started,
    - times out, or
    - exits non-zero (e.g. because it does not recognise the path as a worktree).

    This way orphans do not survive across restarts.
    """
    # Use specific prefix to avoid colliding with other /tmp users (Ganymede)
    orphans = glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*")
    # Fixer worktrees live under BASE_DIR/workspaces/fix-*
    orphans += glob.glob(str(config.BASE_DIR / "workspaces" / "fix-*"))

    for path in orphans:
        logger.warning("Cleaning orphan worktree: %s", path)
        try:
            returncode = await _run_git("worktree", "remove", "--force", path)
        except Exception:
            returncode = None
        if returncode != 0:
            shutil.rmtree(path, ignore_errors=True)

    # Prune stale worktree metadata entries from bare repo (Ganymede)
    try:
        returncode = await _run_git("worktree", "prune")
    except Exception:
        returncode = None
    if returncode != 0:
        logger.warning("git worktree prune failed, continuing")


def _recover_interrupted_state(conn):
    """Reset rows left mid-flight by a crash so the stages pick them up again.

    Returns the number of rows recovered.
    """
    # Atomic recovery: all four resets in one transaction (Ganymede)
    # Increment transient_retries on recovered sources to prevent infinite cycling (Vida)
    with db.transaction(conn):
        # Sources stuck in 'extracting' — increment retry counter, move to error if exhausted.
        # SET expressions see the pre-update row, so "transient_retries + 1" is the new count.
        c1 = conn.execute(
            """UPDATE sources
               SET transient_retries = transient_retries + 1,
                   status = CASE WHEN transient_retries + 1 >= ? THEN 'error' ELSE 'unprocessed' END,
                   last_error = CASE WHEN transient_retries + 1 >= ? THEN 'crash recovery: retry budget exhausted' ELSE last_error END,
                   updated_at = datetime('now')
               WHERE status = 'extracting'""",
            (config.TRANSIENT_RETRY_MAX, config.TRANSIENT_RETRY_MAX),
        )
        # PRs stuck in 'merging' → approved (Ganymede's Q4 answer)
        c2 = conn.execute("UPDATE prs SET status = 'approved' WHERE status = 'merging'")
        # PRs stuck in 'reviewing' → open
        c3 = conn.execute("UPDATE prs SET status = 'open', merge_cycled = 0 WHERE status = 'reviewing'")
        # PRs stuck in 'fixing' → open (fixer crashed mid-fix)
        c4 = conn.execute("UPDATE prs SET status = 'open' WHERE status = 'fixing'")
    recovered = c1.rowcount + c2.rowcount + c3.rowcount + c4.rowcount
    if recovered:
        logger.info("Recovered %d interrupted rows from prior crash", recovered)
    return recovered


# --- Main ---


async def main():
    """Run the pipeline daemon until SIGTERM/SIGINT, then shut down gracefully.

    Startup order:
    1. Orphan-worktree cleanup.
    2. DB connect and migrate.
    3. Crash recovery.
    4. Signal handlers.
    5. Health API.
    6. One task per stage.

    Teardown runs in reverse: drain stages, kill lingering subprocesses, stop
    the health API, close the DB. It also runs when startup fails part-way.
    """
    logmod.setup_logging()
    logger.info("Teleo Pipeline v2 starting")

    # Clean orphan worktrees from prior crashes (Ganymede's requirement)
    await cleanup_orphan_worktrees()

    # Callbacks pushed onto ``teardown`` run in reverse (LIFO) order when the
    # block exits, including when an exception escapes it.
    async with contextlib.AsyncExitStack() as teardown:
        conn = db.get_connection()
        teardown.callback(conn.close)
        db.migrate(conn)
        logger.info("Database ready at %s", config.DB_PATH)

        stage_specs = [
            ("ingest", config.INGEST_INTERVAL, ingest_cycle),
            ("validate", config.VALIDATE_INTERVAL, validate_cycle),
            ("evaluate", config.EVAL_INTERVAL, evaluate_cycle),
            ("merge", config.MERGE_INTERVAL, merge_cycle),
            ("fix", config.FIX_INTERVAL, fix_cycle),
            ("snapshot", SNAPSHOT_INTERVAL, snapshot_cycle),
            ("watchdog", WATCHDOG_INTERVAL, watchdog_cycle),
        ]
        # One circuit breaker per stage, created before crash recovery.
        breakers = {name: CircuitBreaker(name, conn) for name, _interval, _func in stage_specs}

        # Recover interrupted state from crashes
        _recover_interrupted_state(conn)

        # Register signal handlers
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, handle_signal, sig)

        # Start health API
        health_runners = []
        await start_health_server(health_runners)
        teardown.push_async_callback(stop_health_server, health_runners)

        # Stage subprocesses are killed only after the stages themselves are drained.
        teardown.push_async_callback(kill_subprocesses)

        # Registered before the tasks exist; the callback sees the list as filled below.
        stages = []
        teardown.push_async_callback(_drain_stages, stages)
        for name, interval, func in stage_specs:
            task = asyncio.create_task(
                stage_loop(name, interval, func, conn, breakers[name]),
                name=name,
            )
            task.add_done_callback(_log_stage_exit)
            stages.append(task)
        logger.info("All stages running")

        # Wait for shutdown signal
        await shutdown_event.wait()
        logger.info("Shutdown event received, waiting for stages to finish...")

    logger.info("Teleo Pipeline v2 shut down cleanly")


if __name__ == "__main__":
    asyncio.run(main())