diff --git a/scripts/install_telegram_agent_token.py b/scripts/install_telegram_agent_token.py new file mode 100755 index 0000000..0b2b973 --- /dev/null +++ b/scripts/install_telegram_agent_token.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +"""Install a Telegram agent bot token without printing the secret value.""" + +from __future__ import annotations + +import argparse +import getpass +import grp +import json +import os +import pwd +import re +import subprocess +import sys +from datetime import datetime, timezone +from pathlib import Path + +TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$") + + +def repo_root_from_script() -> Path: + return Path(__file__).resolve().parents[1] + + +def parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Install a Telegram agent token from stdin or a hidden prompt.", + allow_abbrev=False, + ) + parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/.yaml") + parser.add_argument("--repo-root", default=str(repo_root_from_script())) + parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets") + parser.add_argument("--from-stdin", action="store_true", help="Read the token from stdin instead of a prompt") + parser.add_argument("--owner", default="teleo", help="Token file owner after write") + parser.add_argument("--group", default="teleo", help="Token file group after write") + parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests") + parser.add_argument("--dry-run", action="store_true", help="Validate paths and input without writing") + parser.add_argument("--start-service", action="store_true", help="Run systemctl start teleo-agent@.service") + parser.add_argument("--enable-service", action="store_true", help="Run systemctl enable teleo-agent@.service") + parser.add_argument("--skip-validate", action="store_true", help="Skip agent_runner.py --validate") + parser.add_argument("--output", help="Optional sanitized JSON proof path") + + namespace, unknown = parser.parse_known_args(argv) + if unknown: + print( + "ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.", + file=sys.stderr, + ) + raise SystemExit(2) + return namespace + + +def read_token(*, from_stdin: bool) -> str: + token = sys.stdin.read() if from_stdin else getpass.getpass("Telegram bot token: ") + return token.strip() + + +def validate_token(token: str) -> None: + if not TOKEN_RE.match(token): + raise ValueError("Telegram bot token shape is invalid") + + +def load_agent_config(repo_root: Path, agent: str): + telegram_dir = repo_root / "telegram" + sys.path.insert(0, str(telegram_dir)) + from agent_config import load_agent_config as load_config + + config_path = telegram_dir / "agents" / f"{agent}.yaml" + config = load_config(str(config_path)) + return config_path, config + + +def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path: + token_file = Path(bot_token_file) + if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}: + raise ValueError("bot_token_file must be a plain filename") + return secrets_dir / token_file.name + + +def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]: + if no_chown: + return None, None + return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid + + +def write_token_file(token: str, token_path: Path, *, uid: int | None, gid: int | None, dry_run: bool) -> None: + if dry_run: + return + + token_path.parent.mkdir(parents=True, mode=0o700, exist_ok=True) + tmp_path = token_path.with_name(f".{token_path.name}.tmp-{os.getpid()}") + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + fd = os.open(tmp_path, flags, 0o600) + try: + with os.fdopen(fd, "w") as handle: + handle.write(token) + handle.write("\n") + handle.flush() + os.fsync(handle.fileno()) + os.chmod(tmp_path, 0o600) + if uid is not None or gid is not None: + os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid) + os.replace(tmp_path, token_path) + os.chmod(token_path, 0o600) + finally: + if tmp_path.exists(): + tmp_path.unlink() + + +def run_command(command: list[str], *, dry_run: bool) -> dict: + if dry_run: + return {"command": command, "skipped": "dry_run"} + proc = subprocess.run(command, check=False, text=True, capture_output=True) + return { + "command": command, + "returncode": proc.returncode, + "stdout": proc.stdout.strip(), + "stderr": proc.stderr.strip(), + } + + +def validate_agent(repo_root: Path, agent: str, *, dry_run: bool, skip_validate: bool) -> dict | None: + if skip_validate: + return None + runner = repo_root / "telegram" / "agent_runner.py" + return run_command([sys.executable, str(runner), "--agent", agent, "--validate"], dry_run=dry_run) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(sys.argv[1:] if argv is None else argv) + repo_root = Path(args.repo_root).resolve() + secrets_dir = Path(args.secrets_dir) + token = read_token(from_stdin=args.from_stdin) + + config_path, config = load_agent_config(repo_root, args.agent) + token_path = token_path_for(secrets_dir, config.bot_token_file) + validate_token(token) + uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown) + + write_token_file(token, token_path, uid=uid, gid=gid, dry_run=args.dry_run) + validate_result = validate_agent(repo_root, args.agent, dry_run=args.dry_run, skip_validate=args.skip_validate) + + unit = f"teleo-agent@{args.agent}.service" + enable_result = None + start_result = None + if args.enable_service: + enable_result = run_command(["systemctl", "enable", unit], dry_run=args.dry_run) + if args.start_service: + start_result = run_command(["systemctl", "start", unit], dry_run=args.dry_run) + + proof = { + "schema": "livingip.telegramAgentTokenInstall.v1", + "generatedAt": datetime.now(timezone.utc).isoformat(), + "ok": True, + "agent": args.agent, + "configPath": str(config_path), + "tokenPath": str(token_path), + "tokenFileWritten": not args.dry_run, + "tokenMode": "0600", + "owner": None if args.no_chown else args.owner, + "group": None if args.no_chown else args.group, + "dryRun": args.dry_run, + "validation": validate_result, + "serviceUnit": unit, + "serviceEnabled": enable_result, + "serviceStarted": start_result, + "secretValuesIncluded": False, + } + + output = json.dumps(proof, indent=2, sort_keys=True) + "\n" + if args.output: + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(output) + print(output, end="") + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except ValueError as exc: + print(f"ERROR: {exc}", file=sys.stderr) + raise SystemExit(1) from None diff --git a/tests/test_install_telegram_agent_token.py b/tests/test_install_telegram_agent_token.py new file mode 100644 index 0000000..eed2162 --- /dev/null +++ b/tests/test_install_telegram_agent_token.py @@ -0,0 +1,68 @@ +"""Tests for safe Telegram agent token installation.""" + +import json +import os +import stat +import subprocess +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPT = REPO_ROOT / "scripts" / "install_telegram_agent_token.py" + + +def run_installer(args: list[str], *, token: str = "123456789:abcdefghijklmnopqrstuvwxyzABC") -> subprocess.CompletedProcess: + return subprocess.run( + [sys.executable, str(SCRIPT), *args], + input=token, + text=True, + capture_output=True, + check=False, + ) + + +def test_installs_leo_wallet_test_token_from_stdin_without_echoing_secret(tmp_path): + token = "123456789:abcdefghijklmnopqrstuvwxyzABC" + proof_path = tmp_path / "proof.json" + proc = run_installer( + [ + "--agent", + "leo-wallet-test", + "--repo-root", + str(REPO_ROOT), + "--secrets-dir", + str(tmp_path / "secrets"), + "--from-stdin", + "--no-chown", + "--skip-validate", + "--output", + str(proof_path), + ], + token=token, + ) + + assert proc.returncode == 0, proc.stderr + assert token not in proc.stdout + assert token not in proc.stderr + + proof = json.loads(proof_path.read_text()) + token_path = Path(proof["tokenPath"]) + assert proof["ok"] is True + assert proof["agent"] == "leo-wallet-test" + assert proof["secretValuesIncluded"] is False + assert proof["tokenFileWritten"] is True + assert token not in proof_path.read_text() + + assert token_path.read_text().strip() == token + mode = stat.S_IMODE(os.stat(token_path).st_mode) + assert mode == 0o600 + + +def test_refuses_cli_token_argument_without_echoing_secret(): + token = "123456789:abcdefghijklmnopqrstuvwxyzABC" + proc = run_installer(["--token", token], token="") + + combined_output = proc.stdout + proc.stderr + assert proc.returncode == 2 + assert token not in combined_output + assert "Secret-bearing CLI args are not accepted" in combined_output