114 lines
3.4 KiB
Python
114 lines
3.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Recover Teleo Telegram agent services from known runtime faults."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
|
|
|
|
# Agent instance names checked when --agents is not supplied.
DEFAULT_AGENTS = ("leo", "leo-wallet-test")

# Directory the agent units are expected to be able to write transcripts to.
TRANSCRIPT_PATH = "/opt/teleo-eval/transcripts"

# Journal substrings that trigger a restart of an otherwise active unit.
# Both quote characters are listed because the path may be quoted either way
# in the log line.
# NOTE(review): presumably this is the text of a Python "Read-only file
# system" OSError -- confirm against real journal output.
RECOVERABLE_LOG_PATTERNS = (
    "Read-only file system: '/opt/teleo-eval/transcripts",
    'Read-only file system: "/opt/teleo-eval/transcripts',
)
|
|
|
|
|
|
@dataclass(frozen=True)
class CommandResult:
    """Immutable record of one finished subprocess invocation."""

    # Exit status reported by the process.
    returncode: int
    # Captured standard output.
    stdout: str
    # Captured standard error.
    stderr: str
|
|
|
|
|
|
def run_command(args: list[str]) -> CommandResult:
    """Run *args* as a subprocess and capture its outcome.

    The command is executed without a shell, and a non-zero exit status is
    reported through the result rather than raised.

    Args:
        args: Program name followed by its arguments.

    Returns:
        A CommandResult holding the exit status and the text of both
        output streams.
    """
    proc = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    return CommandResult(proc.returncode, proc.stdout, proc.stderr)
|
|
|
|
|
|
def unit_name(agent: str) -> str:
    """Return the systemd unit name of the agent instance *agent*."""
    unit = f"teleo-agent@{agent}.service"
    return unit
|
|
|
|
|
|
def service_is_active(unit: str) -> bool:
    """Report whether ``systemctl is-active`` succeeds for *unit*.

    Only the exit status is consulted: zero counts as active, anything
    else as not active.
    """
    status = run_command(["systemctl", "is-active", "--quiet", unit])
    return status.returncode == 0
|
|
|
|
|
|
def readwrite_paths_include_transcripts(unit: str) -> bool:
    """Report whether *unit* lists the transcript directory in ``ReadWritePaths``.

    The property is read with ``systemctl show`` and split on whitespace.
    Each entry is compared with ``TRANSCRIPT_PATH`` after dropping the
    optional ``-`` / ``+`` prefixes systemd accepts on such paths and any
    trailing slash, so ``-/opt/teleo-eval/transcripts/`` counts as a match.

    Args:
        unit: Full systemd unit name to inspect.

    Returns:
        True when the transcript directory is listed; False when it is not
        or when ``systemctl show`` exits with a non-zero status.
    """
    result = run_command(["systemctl", "show", unit, "-p", "ReadWritePaths", "--value"])
    if result.returncode != 0:
        return False

    wanted = TRANSCRIPT_PATH.rstrip("/")
    for entry in result.stdout.split():
        # A real path starts with "/", so stripping leading "-"/"+" only
        # removes systemd's path-prefix markers.
        if entry.lstrip("-+").rstrip("/") == wanted:
            return True
    return False
|
|
|
|
|
|
def recent_logs(unit: str, since: str) -> str:
    """Fetch journal text for *unit* starting at *since*.

    Args:
        unit: systemd unit whose journal is read.
        since: Value passed to ``journalctl --since``.

    Returns:
        journalctl's standard output. When journalctl exits non-zero, its
        standard error is appended to that output.
    """
    journal = run_command(["journalctl", "-u", unit, "--since", since, "--no-pager"])
    if journal.returncode == 0:
        return journal.stdout
    return journal.stdout + journal.stderr
|
|
|
|
|
|
def should_restart_from_logs(log_text: str) -> bool:
    """Return True when *log_text* contains any recoverable-fault pattern."""
    for needle in RECOVERABLE_LOG_PATTERNS:
        if needle in log_text:
            return True
    return False
|
|
|
|
|
|
def restart_service(unit: str, *, dry_run: bool) -> bool:
    """Restart *unit* via ``systemctl restart``, or only announce it.

    Args:
        unit: Full systemd unit name to restart.
        dry_run: When true, print what would be done and run nothing.

    Returns:
        True for a dry run or when ``systemctl restart`` exits with status
        zero; False otherwise, after writing the command's stderr to this
        process's stderr.
    """
    if not dry_run:
        outcome = run_command(["systemctl", "restart", unit])
        succeeded = outcome.returncode == 0
        if not succeeded:
            print(f"restart_failed unit={unit} stderr={outcome.stderr.strip()}", file=sys.stderr)
        return succeeded

    print(f"dry_run restart {unit}")
    return True
|
|
|
|
|
|
def check_agent(agent: str, *, since: str, dry_run: bool) -> bool:
    """Check one agent's unit and restart it when a known fault is detected.

    Two independent things are evaluated:

    * configuration -- whether the unit's ``ReadWritePaths`` lists the
      transcript directory; a failure is reported on stderr;
    * runtime state -- the unit is restarted when it is not active, or
      when it is active but its recent journal contains a recoverable
      fault pattern.

    The two results are tracked separately, so a configuration failure
    neither suppresses the post-restart verification nor lets the unit be
    logged as healthy.

    Args:
        agent: Agent instance name (the part after ``teleo-agent@``).
        since: Start of the journal window, passed to ``journalctl --since``.
        dry_run: When true, report the restart instead of performing it.

    Returns:
        True only when the configuration check passed and the unit either
        needed no recovery or was restarted successfully (and, outside a
        dry run, is active afterwards).
    """
    unit = unit_name(agent)

    config_ok = readwrite_paths_include_transcripts(unit)
    if not config_ok:
        print(f"unit_missing_transcript_write_path unit={unit}", file=sys.stderr)

    reasons: list[str] = []
    if not service_is_active(unit):
        reasons.append("inactive")
    elif should_restart_from_logs(recent_logs(unit, since)):
        # The journal is consulted only for units that are currently active.
        reasons.append("recoverable_log_fault")

    if not reasons:
        # Do not claim "healthy" for a unit that is about to be reported as
        # failed because of its configuration.
        if config_ok:
            print(f"healthy unit={unit}")
        return config_ok

    print(f"recovering unit={unit} reasons={','.join(reasons)}")
    recovered = restart_service(unit, dry_run=dry_run)
    if recovered and not dry_run:
        # Verify the restart regardless of the configuration result.
        recovered = service_is_active(unit)
        print(f"post_restart unit={unit} active={recovered}")

    return recovered and config_ok
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments in *argv*.

    Recognised options:
        --agents   one or more agent names (defaults to DEFAULT_AGENTS)
        --since    start of the journal window (defaults to "20 min ago")
        --dry-run  flag; report restarts instead of performing them

    Args:
        argv: Argument strings, without the program name.

    Returns:
        The populated argparse namespace.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # Insertion order is the registration order, which argparse uses for help.
    option_specs = {
        "--agents": {"nargs": "+", "default": list(DEFAULT_AGENTS)},
        "--since": {"default": "20 min ago"},
        "--dry-run": {"action": "store_true"},
    }
    for flag, spec in option_specs.items():
        cli.add_argument(flag, **spec)
    return cli.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Check every requested agent and return a process exit status.

    Args:
        argv: Argument strings without the program name; ``sys.argv[1:]``
            is used when this is None.

    Returns:
        0 when every agent check returned true, otherwise 1.
    """
    if argv is None:
        argv = sys.argv[1:]
    args = parse_args(argv)

    # Build the full list first so every agent is checked even after an
    # earlier one has failed.
    outcomes = [
        check_agent(name, since=args.since, dry_run=args.dry_run)
        for name in args.agents
    ]
    return 0 if all(outcomes) else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|