#!/usr/bin/env python3
"""Install a Telegram agent bot token without printing the secret value.

The token is read from stdin or a hidden prompt (never from argv), written to
``SECRETS_DIR/BOT_TOKEN_FILE`` with mode 0600 via a temporary file and
``os.replace``, and a sanitized JSON proof of what was done is printed to
stdout (and optionally written to ``--output``).
"""

from __future__ import annotations

import argparse
import getpass
import grp
import json
import os
import pwd
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

# re.ASCII keeps ``\d`` from matching non-ASCII Unicode decimal digits.
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$", re.ASCII)

# Placeholder substituted for secrets found in captured subprocess output.
_REDACTED = "[REDACTED]"


def repo_root_from_script() -> Path:
    """Return the directory one level above the directory holding this script."""
    return Path(__file__).resolve().parents[1]


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options and reject anything unrecognized.

    Args:
        argv: Argument list without the program name.

    Returns:
        The parsed namespace.

    Raises:
        SystemExit: With code 2 when unsupported arguments are present.
    """
    parser = argparse.ArgumentParser(
        description="Install a Telegram agent token from stdin or a hidden prompt.",
        allow_abbrev=False,
    )
    parser.add_argument(
        "--agent",
        default="leo-wallet-test",
        help="Agent name; config is telegram/agents/AGENT.yaml",
    )
    parser.add_argument("--repo-root", default=str(repo_root_from_script()))
    parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
    parser.add_argument(
        "--from-stdin",
        action="store_true",
        help="Read the token from stdin instead of a prompt",
    )
    parser.add_argument("--owner", default="teleo", help="Token file owner after write")
    parser.add_argument("--group", default="teleo", help="Token file group after write")
    parser.add_argument(
        "--no-chown",
        action="store_true",
        help="Skip chown; useful for local tests",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate paths and input without writing",
    )
    parser.add_argument(
        "--start-service",
        action="store_true",
        help="Run systemctl start teleo-agent@AGENT.service",
    )
    parser.add_argument(
        "--enable-service",
        action="store_true",
        help="Run systemctl enable teleo-agent@AGENT.service",
    )
    parser.add_argument(
        "--skip-validate",
        action="store_true",
        help="Skip agent_runner.py --validate",
    )
    parser.add_argument("--output", help="Optional sanitized JSON proof path")

    # parse_known_args (rather than parse_args) lets us reject unknown
    # arguments with a fixed message, so a secret passed on the command line
    # by mistake is not echoed back in an argparse error.
    namespace, unknown = parser.parse_known_args(argv)
    if unknown:
        print(
            "ERROR: Unsupported arguments were provided. "
            "Secret-bearing CLI args are not accepted.",
            file=sys.stderr,
        )
        raise SystemExit(2)
    return namespace


def read_token(*, from_stdin: bool) -> str:
    """Read the token from stdin or a hidden prompt and strip whitespace."""
    if from_stdin:
        raw = sys.stdin.read()
    else:
        raw = getpass.getpass("Telegram bot token: ")
    return raw.strip()


def validate_token(token: str) -> None:
    """Check that *token* matches ``TOKEN_RE`` exactly.

    Raises:
        ValueError: If the token does not have the expected shape. The
            message never contains the token itself.
    """
    # fullmatch: with match(), ``$`` would also accept a trailing newline.
    if TOKEN_RE.fullmatch(token) is None:
        raise ValueError("Telegram bot token shape is invalid")


def load_agent_config(repo_root: Path, agent: str):
    """Load ``telegram/agents/AGENT.yaml`` using the repo's ``agent_config`` module.

    The ``telegram`` directory under *repo_root* is put at the front of
    ``sys.path`` so that ``agent_config`` can be imported from it.

    Returns:
        A ``(config_path, config)`` tuple; ``config`` is whatever
        ``agent_config.load_agent_config`` returns.
    """
    telegram_dir = repo_root / "telegram"
    sys.path.insert(0, str(telegram_dir))
    from agent_config import load_agent_config as load_config

    config_path = telegram_dir / "agents" / f"{agent}.yaml"
    config = load_config(str(config_path))
    return config_path, config


def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
    """Return the token path inside *secrets_dir* for a plain filename.

    Raises:
        ValueError: If *bot_token_file* is empty, ``.``/``..``, or contains
            any directory component (which could escape *secrets_dir*).
    """
    token_file = Path(bot_token_file)
    if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
        raise ValueError("bot_token_file must be a plain filename")
    return secrets_dir / token_file.name


def resolve_owner_group(
    owner: str, group: str, *, no_chown: bool
) -> tuple[int | None, int | None]:
    """Translate owner/group names to numeric ids.

    Returns:
        ``(uid, gid)``, or ``(None, None)`` when *no_chown* is set.

    Raises:
        KeyError: If the user or group name is unknown to the system.
    """
    if no_chown:
        return None, None
    return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid


def write_token_file(
    token: str,
    token_path: Path,
    *,
    uid: int | None,
    gid: int | None,
    dry_run: bool,
) -> None:
    """Write *token* plus a newline to *token_path* with mode 0600.

    The data goes to a sibling temporary file first, is flushed and fsynced,
    has its mode/ownership set, and is then moved into place with
    ``os.replace`` so the final path never holds a partially written token.
    Does nothing when *dry_run* is true.
    """
    if dry_run:
        return

    token_path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
    tmp_path = token_path.with_name(f".{token_path.name}.tmp-{os.getpid()}")
    # O_EXCL: never write the secret into a pre-existing file.
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(token)
            handle.write("\n")
            handle.flush()
            os.fsync(handle.fileno())
        os.chmod(tmp_path, 0o600)
        if uid is not None or gid is not None:
            # -1 tells os.chown to leave that id unchanged.
            os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
        os.replace(tmp_path, token_path)
        os.chmod(token_path, 0o600)
    finally:
        # After a successful replace the temp file is already gone.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass


def _redact(text: str, secrets: tuple[str, ...]) -> str:
    """Replace every non-empty string in *secrets* found in *text*."""
    for secret in secrets:
        if secret:
            text = text.replace(secret, _REDACTED)
    return text


def _command_ok(result: dict | None) -> bool:
    """Return True for a skipped/absent command or one that exited with 0."""
    return result is None or result.get("returncode", 0) == 0


def run_command(
    command: list[str], *, dry_run: bool, redact: tuple[str, ...] = ()
) -> dict:
    """Run *command* and return a JSON-serializable summary.

    Args:
        command: Argument vector, executed without a shell.
        dry_run: When true, nothing is executed.
        redact: Strings to scrub from the captured stdout/stderr before they
            are returned.

    Returns:
        ``{"command", "skipped"}`` for a dry run, otherwise
        ``{"command", "returncode", "stdout", "stderr"}``.
    """
    if dry_run:
        return {"command": command, "skipped": "dry_run"}
    proc = subprocess.run(command, check=False, text=True, capture_output=True)
    return {
        "command": command,
        "returncode": proc.returncode,
        "stdout": _redact(proc.stdout.strip(), redact),
        "stderr": _redact(proc.stderr.strip(), redact),
    }


def validate_agent(
    repo_root: Path,
    agent: str,
    *,
    dry_run: bool,
    skip_validate: bool,
    redact: tuple[str, ...] = (),
) -> dict | None:
    """Run ``telegram/agent_runner.py --agent AGENT --validate``.

    Returns:
        ``None`` when *skip_validate* is set, otherwise the ``run_command``
        summary.
    """
    if skip_validate:
        return None
    runner = repo_root / "telegram" / "agent_runner.py"
    return run_command(
        [sys.executable, str(runner), "--agent", agent, "--validate"],
        dry_run=dry_run,
        redact=redact,
    )


def main(argv: list[str] | None = None) -> int:
    """Install the token, optionally validate/enable/start, and emit a proof.

    Args:
        argv: Arguments without the program name; defaults to ``sys.argv[1:]``.

    Returns:
        0 when every executed command exited with 0 (or was skipped), else 1.

    Raises:
        ValueError: For an invalid token, an invalid ``bot_token_file`` or an
            unknown owner/group.
        SystemExit: From argument parsing.
    """
    args = parse_args(sys.argv[1:] if argv is None else argv)
    repo_root = Path(args.repo_root).resolve()
    secrets_dir = Path(args.secrets_dir)

    token = read_token(from_stdin=args.from_stdin)
    config_path, config = load_agent_config(repo_root, args.agent)
    token_path = token_path_for(secrets_dir, config.bot_token_file)
    validate_token(token)

    try:
        uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)
    except KeyError as exc:
        # Surface as ValueError so the entry point prints a one-line error
        # instead of a traceback.
        detail = exc.args[0] if exc.args else exc
        raise ValueError(f"Unknown owner or group for token file: {detail}") from None

    write_token_file(token, token_path, uid=uid, gid=gid, dry_run=args.dry_run)

    # Captured subprocess output ends up in the proof, so scrub the token.
    secrets = (token,)
    validate_result = validate_agent(
        repo_root,
        args.agent,
        dry_run=args.dry_run,
        skip_validate=args.skip_validate,
        redact=secrets,
    )

    unit = f"teleo-agent@{args.agent}.service"
    enable_result = None
    start_result = None
    if args.enable_service:
        enable_result = run_command(
            ["systemctl", "enable", unit], dry_run=args.dry_run, redact=secrets
        )
    if args.start_service:
        start_result = run_command(
            ["systemctl", "start", unit], dry_run=args.dry_run, redact=secrets
        )

    ok = all(
        _command_ok(result)
        for result in (validate_result, enable_result, start_result)
    )

    proof = {
        "schema": "livingip.telegramAgentTokenInstall.v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "ok": ok,
        "agent": args.agent,
        "configPath": str(config_path),
        "tokenPath": str(token_path),
        "tokenFileWritten": not args.dry_run,
        "tokenMode": "0600",
        "owner": None if args.no_chown else args.owner,
        "group": None if args.no_chown else args.group,
        "dryRun": args.dry_run,
        "validation": validate_result,
        "serviceUnit": unit,
        "serviceEnabled": enable_result,
        "serviceStarted": start_result,
        "secretValuesIncluded": False,
    }
    output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(output, encoding="utf-8")
    print(output, end="")
    return 0 if ok else 1


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except ValueError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        raise SystemExit(1) from None