teleo-infrastructure/teleo-pipeline.py
m3taversal d79ff60689 epimetheus: sync VPS-deployed code to repo — Mar 18-20 reliability + features
Pipeline reliability (8 fixes, reviewed by Ganymede+Rhea+Leo+Rio):
1. Merge API recovery — pre-flight approval check, transient/permanent distinction, jitter
2. Ghost PR detection — ls-remote branch check in reconciliation, network guard
3. Source status contract — directory IS status, no code change needed
4. Batch-state markers eliminated — two-gate skip (archive-check + batched branch-check)
5. Branch SHA tracking — batched ls-remote, auto-reset verdicts, dismiss stale reviews
6. Mirror pre-flight permissions — chown check in sync-mirror.sh
7. Telegram archive commit-after-write — git add/commit/push with rebase --abort fallback
8. Post-merge source archiving — queue/ → archive/{domain}/ after merge

Pipeline fixes:
- merge_cycled flag — eval attempts preserved during merge-failure cycling (Ganymede+Rhea)
- merge_failures diagnostic counter
- Startup recovery preserves eval_attempts (was incorrectly resetting to 0)
- No-diff PRs auto-closed by eval (root cause of 17 zombie PRs)
- GC threshold aligned with substantive fixer budget (was 2, now 4)
- Conflict retry with 3-attempt budget + permanent conflict handler
- Local ff-merge fallback for Forgejo 405 errors

Telegram bot:
- KB retrieval: 3-layer (entity resolution → claim search → agent context)
- Reply-to-bot handler (context.bot.id check)
- Tag regex: @teleo|@futairdbot
- Prompt rewrite for natural analyst voice
- Market data API integration (Ben's token price endpoint)
- Conversation windows (5-message unanswered counter, per-user-per-chat)
- Conversation history in prompt (last 5 exchanges)
- Worktree file lock for archive writes

Infrastructure:
- worktree_lock.py — file-based lock (flock) for main worktree coordination
- backfill-sources.py — source DB registration for Argus funnel
- batch-extract-50.sh v3 — two-gate skip, batched ls-remote, network guard
- sync-mirror.sh — auto-PR creation for mirrored GitHub branches, permission pre-flight
- Argus dashboard — conflicts + reviewing in backlog, queue count in funnel
- Enrichment-inside-frontmatter bug fix (regex anchor, not --- split)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-20 20:17:27 +00:00

286 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""Teleo Pipeline v2 — single async daemon replacing 7 cron scripts.
Four stages: Ingest → Validate → Evaluate → Merge
SQLite WAL state store. systemd-managed. Graceful shutdown.
"""
import asyncio
import logging
import signal
import sys
# Add parent dir to path so lib/ is importable
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from lib import config, db
from lib import log as logmod
from lib.breaker import CircuitBreaker
from lib.evaluate import evaluate_cycle
from lib.fixer import fix_cycle as mechanical_fix_cycle
from lib.substantive_fixer import substantive_fix_cycle
from lib.health import start_health_server, stop_health_server
from lib.llm import kill_active_subprocesses
from lib.merge import merge_cycle
from lib.analytics import record_snapshot
from lib.entity_batch import entity_batch_cycle
from lib.validate import validate_cycle
from lib.watchdog import watchdog_cycle
# Module-level logger shared by every stage loop and helper below.
logger = logging.getLogger("pipeline")
# Global shutdown event — stages check this between iterations.
# NOTE(review): created at import time, before any event loop runs; fine on
# Python 3.10+ where asyncio.Event no longer binds a loop at construction —
# confirm the deployment interpreter if ever run on 3.9 or older.
shutdown_event = asyncio.Event()
async def stage_loop(name: str, interval: int, func, conn, breaker: CircuitBreaker):
    """Run one pipeline stage forever: execute a cycle, then sleep.

    Each iteration consults the circuit breaker before invoking *func*,
    feeds the (succeeded, failed) result back into the breaker, then sleeps
    for *interval* seconds — waking early when shutdown is requested.
    An unhandled exception in a cycle is logged and counted as a failure;
    the loop itself never dies from it.
    """
    logger.info("Stage %s started (interval=%ds)", name, interval)
    while not shutdown_event.is_set():
        try:
            if breaker.allow_request():
                succeeded, failed = await func(conn, max_workers=breaker.max_workers())
                # Any success keeps the breaker healthy; an all-failure
                # cycle counts against it. A 0/0 (idle) cycle is neutral.
                if succeeded > 0:
                    breaker.record_success()
                elif failed > 0:
                    breaker.record_failure()
            else:
                logger.debug("Stage %s: breaker OPEN, skipping cycle", name)
        except Exception:
            logger.exception("Stage %s: unhandled error in cycle", name)
            breaker.record_failure()
        # Sleep until either the interval elapses or shutdown is signalled.
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            continue  # interval elapsed — run another cycle
        break  # shutdown_event was set
    logger.info("Stage %s stopped", name)
# --- Stage stubs (Phase 1 — replaced in later phases) ---
async def ingest_cycle(conn, max_workers=None):
    """Stage 1 (ingest): run one entity-batch cycle.

    Thin adapter kept so the generic stage loop has a uniform entry point;
    forwards straight to entity_batch_cycle and returns its
    (succeeded, failed) tuple unchanged.
    """
    result = await entity_batch_cycle(conn, max_workers=max_workers)
    return result
async def fix_cycle(conn, max_workers=None):
    """Combined fix stage: cheap mechanical pass first, then the LLM pass.

    Mechanical (fixer.py) strips wiki-link brackets at $0 cost;
    substantive (substantive_fixer.py) applies confidence/title/scope
    fixes via LLM at ~$0.001 each. Returns combined (fixed, errors) totals.
    """
    total_fixed = 0
    total_errors = 0
    # Order matters: run the free mechanical pass before spending on LLM calls.
    for cycle_fn in (mechanical_fix_cycle, substantive_fix_cycle):
        fixed, errors = await cycle_fn(conn, max_workers=max_workers)
        total_fixed += fixed
        total_errors += errors
    return total_fixed, total_errors
async def snapshot_cycle(conn, max_workers=None):
    """Record a metrics snapshot (runs on the 15-min stage interval).

    Populates metrics_snapshots for the Argus analytics dashboard.
    Lightweight — pure SQL, no LLM calls, no git ops. Returns (1, 0) on
    success and (0, 1) on failure so the breaker sees it like any stage.
    """
    try:
        record_snapshot(conn)
    except Exception:
        logger.exception("Snapshot recording failed")
        return 0, 1
    return 1, 0
# validate_cycle imported from lib.validate
# evaluate_cycle imported from lib.evaluate
# merge_cycle imported from lib.merge
# --- Shutdown ---
def handle_signal(sig: signal.Signals):
    """Signal handler — sets the global shutdown event.

    Registered in main() for SIGTERM/SIGINT via loop.add_signal_handler;
    the stage loops poll shutdown_event between cycles and wind down on
    their own, so this handler only needs to flip the event.
    """
    logger.info("Received %s, initiating graceful shutdown...", sig.name)
    shutdown_event.set()
async def kill_subprocesses():
    """Kill any lingering Claude CLI subprocesses.

    Delegates to kill_active_subprocesses from lib.llm (imported at module
    top); called during shutdown after the stage tasks have stopped so no
    orphaned LLM subprocess outlives the daemon.
    """
    await kill_active_subprocesses()
async def cleanup_orphan_worktrees():
    """Remove any orphan worktrees left behind by previous crashes.

    Scans the known worktree locations (/tmp/teleo-extract-*,
    /tmp/teleo-merge-* and BASE_DIR/workspaces/fix-*), asks git to remove
    each one, and falls back to deleting the directory directly whenever
    git cannot (error, nonzero exit, or timeout). Finally prunes stale
    worktree metadata so git forgets the dead entries.
    """
    import glob
    import shutil
    # Use specific prefix to avoid colliding with other /tmp users (Ganymede)
    orphans = glob.glob("/tmp/teleo-extract-*") + glob.glob("/tmp/teleo-merge-*")
    # Fixer worktrees live under BASE_DIR/workspaces/fix-*
    orphans += glob.glob(str(config.BASE_DIR / "workspaces" / "fix-*"))
    for path in orphans:
        logger.warning("Cleaning orphan worktree: %s", path)
        try:
            proc = await asyncio.create_subprocess_exec(
                "git",
                "worktree",
                "remove",
                "--force",
                path,
                cwd=str(config.REPO_DIR),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            try:
                await asyncio.wait_for(proc.wait(), timeout=10)
            except asyncio.TimeoutError:
                # Don't leave a hung git process behind; the outer except
                # then falls back to a direct directory delete.
                proc.kill()
                raise
            if proc.returncode != 0:
                # git refused (e.g. path not a registered worktree) —
                # previously the directory was silently left in place.
                shutil.rmtree(path, ignore_errors=True)
        except Exception:
            shutil.rmtree(path, ignore_errors=True)
    # Prune stale worktree metadata entries from bare repo (Ganymede)
    try:
        proc = await asyncio.create_subprocess_exec(
            "git",
            "worktree",
            "prune",
            cwd=str(config.REPO_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await asyncio.wait_for(proc.wait(), timeout=10)
    except Exception:
        # Best-effort: a failed prune only leaves stale metadata behind.
        logger.warning("git worktree prune failed, continuing")
# --- Main ---
async def main():
    """Daemon entry point: recover crash state, run stage loops, shut down cleanly.

    Startup order matters: orphan worktrees are cleaned before any stage can
    create new ones, and the crash-recovery transaction runs before the stage
    loops start so no stage ever observes a row stuck in an in-flight status
    ('extracting', 'merging', 'reviewing', 'fixing').
    """
    logmod.setup_logging()
    logger.info("Teleo Pipeline v2 starting")
    # Clean orphan worktrees from prior crashes (Ganymede's requirement)
    await cleanup_orphan_worktrees()
    # Initialize database (SQLite WAL state store per module docstring)
    conn = db.get_connection()
    db.migrate(conn)
    logger.info("Database ready at %s", config.DB_PATH)
    # One circuit breaker per stage — a failing stage trips only itself.
    breakers = {
        "ingest": CircuitBreaker("ingest", conn),
        "validate": CircuitBreaker("validate", conn),
        "evaluate": CircuitBreaker("evaluate", conn),
        "merge": CircuitBreaker("merge", conn),
        "fix": CircuitBreaker("fix", conn),
        "snapshot": CircuitBreaker("snapshot", conn),
        "watchdog": CircuitBreaker("watchdog", conn),
    }
    # Recover interrupted state from crashes.
    # Atomic recovery: all resets in one transaction (Ganymede).
    # Increment transient_retries on recovered sources to prevent infinite cycling (Vida).
    with db.transaction(conn):
        # Sources stuck in 'extracting' — increment retry counter, move to
        # 'error' once the retry budget is exhausted, else back to 'unprocessed'.
        c1 = conn.execute(
            """UPDATE sources SET
                transient_retries = transient_retries + 1,
                status = CASE
                    WHEN transient_retries + 1 >= ? THEN 'error'
                    ELSE 'unprocessed'
                END,
                last_error = CASE
                    WHEN transient_retries + 1 >= ? THEN 'crash recovery: retry budget exhausted'
                    ELSE last_error
                END,
                updated_at = datetime('now')
            WHERE status = 'extracting'""",
            (config.TRANSIENT_RETRY_MAX, config.TRANSIENT_RETRY_MAX),
        )
        # PRs stuck in 'merging' → approved (Ganymede's Q4 answer)
        c2 = conn.execute("UPDATE prs SET status = 'approved' WHERE status = 'merging'")
        # PRs stuck in 'reviewing' → open; merge_cycled reset so eval
        # attempts are preserved on the next cycle (see commit notes).
        c3 = conn.execute("UPDATE prs SET status = 'open', merge_cycled = 0 WHERE status = 'reviewing'")
        # PRs stuck in 'fixing' → open (fixer crashed mid-fix)
        c4 = conn.execute("UPDATE prs SET status = 'open' WHERE status = 'fixing'")
    recovered = c1.rowcount + c2.rowcount + c3.rowcount + c4.rowcount
    if recovered:
        logger.info("Recovered %d interrupted rows from prior crash", recovered)
    # Register signal handlers so SIGTERM/SIGINT trigger graceful shutdown.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_signal, sig)
    # Start health API (runners collected so shutdown can stop them).
    health_runners = []
    await start_health_server(health_runners)
    # Start one independent task per stage; intervals come from config
    # except snapshot (fixed 900s) and watchdog (fixed 60s).
    stages = [
        asyncio.create_task(
            stage_loop("ingest", config.INGEST_INTERVAL, ingest_cycle, conn, breakers["ingest"]),
            name="ingest",
        ),
        asyncio.create_task(
            stage_loop("validate", config.VALIDATE_INTERVAL, validate_cycle, conn, breakers["validate"]),
            name="validate",
        ),
        asyncio.create_task(
            stage_loop("evaluate", config.EVAL_INTERVAL, evaluate_cycle, conn, breakers["evaluate"]),
            name="evaluate",
        ),
        asyncio.create_task(
            stage_loop("merge", config.MERGE_INTERVAL, merge_cycle, conn, breakers["merge"]),
            name="merge",
        ),
        asyncio.create_task(
            stage_loop("fix", config.FIX_INTERVAL, fix_cycle, conn, breakers["fix"]),
            name="fix",
        ),
        asyncio.create_task(
            stage_loop("snapshot", 900, snapshot_cycle, conn, breakers["snapshot"]),
            name="snapshot",
        ),
        asyncio.create_task(
            stage_loop("watchdog", 60, watchdog_cycle, conn, breakers["watchdog"]),
            name="watchdog",
        ),
    ]
    logger.info("All stages running")
    # Block here until a signal (or anything else) sets the shutdown event.
    await shutdown_event.wait()
    logger.info("Shutdown event received, waiting for stages to finish...")
    # Give stages up to 60s to finish current work; the stage loops exit on
    # their own once they observe shutdown_event between cycles.
    try:
        await asyncio.wait_for(asyncio.gather(*stages, return_exceptions=True), timeout=60)
    except asyncio.TimeoutError:
        logger.warning("Stages did not finish within 60s, force-cancelling")
        for task in stages:
            task.cancel()
        # Drain the cancelled tasks so cancellation errors aren't left unhandled.
        await asyncio.gather(*stages, return_exceptions=True)
    # Kill lingering subprocesses (Claude CLI workers) after the stages stop.
    await kill_subprocesses()
    # Stop health API
    await stop_health_server(health_runners)
    # Close DB last — every stage above shares this connection.
    conn.close()
    logger.info("Teleo Pipeline v2 shut down cleanly")
if __name__ == "__main__":
    # Entry point: asyncio.run owns event-loop creation, main(), and teardown.
    asyncio.run(main())