teleo-infrastructure/scripts/install_telegram_agent_token.py
2026-06-22 20:47:44 +02:00

184 lines
6.8 KiB
Python
Executable file

#!/usr/bin/env python3
"""Install a Telegram agent bot token without printing the secret value."""
from __future__ import annotations
import argparse
import getpass
import grp
import json
import os
import pwd
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$")
def repo_root_from_script() -> Path:
return Path(__file__).resolve().parents[1]
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Install a Telegram agent token from stdin or a hidden prompt.",
allow_abbrev=False,
)
parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/<agent>.yaml")
parser.add_argument("--repo-root", default=str(repo_root_from_script()))
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
parser.add_argument("--from-stdin", action="store_true", help="Read the token from stdin instead of a prompt")
parser.add_argument("--owner", default="teleo", help="Token file owner after write")
parser.add_argument("--group", default="teleo", help="Token file group after write")
parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests")
parser.add_argument("--dry-run", action="store_true", help="Validate paths and input without writing")
parser.add_argument("--start-service", action="store_true", help="Run systemctl start teleo-agent@<agent>.service")
parser.add_argument("--enable-service", action="store_true", help="Run systemctl enable teleo-agent@<agent>.service")
parser.add_argument("--skip-validate", action="store_true", help="Skip agent_runner.py --validate")
parser.add_argument("--output", help="Optional sanitized JSON proof path")
namespace, unknown = parser.parse_known_args(argv)
if unknown:
print(
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
file=sys.stderr,
)
raise SystemExit(2)
return namespace
def read_token(*, from_stdin: bool) -> str:
token = sys.stdin.read() if from_stdin else getpass.getpass("Telegram bot token: ")
return token.strip()
def validate_token(token: str) -> None:
if not TOKEN_RE.match(token):
raise ValueError("Telegram bot token shape is invalid")
def load_agent_config(repo_root: Path, agent: str):
telegram_dir = repo_root / "telegram"
sys.path.insert(0, str(telegram_dir))
from agent_config import load_agent_config as load_config
config_path = telegram_dir / "agents" / f"{agent}.yaml"
config = load_config(str(config_path))
return config_path, config
def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
token_file = Path(bot_token_file)
if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
raise ValueError("bot_token_file must be a plain filename")
return secrets_dir / token_file.name
def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
if no_chown:
return None, None
return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid
def write_token_file(token: str, token_path: Path, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
if dry_run:
return
token_path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
tmp_path = token_path.with_name(f".{token_path.name}.tmp-{os.getpid()}")
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
fd = os.open(tmp_path, flags, 0o600)
try:
with os.fdopen(fd, "w") as handle:
handle.write(token)
handle.write("\n")
handle.flush()
os.fsync(handle.fileno())
os.chmod(tmp_path, 0o600)
if uid is not None or gid is not None:
os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
os.replace(tmp_path, token_path)
os.chmod(token_path, 0o600)
finally:
if tmp_path.exists():
tmp_path.unlink()
def run_command(command: list[str], *, dry_run: bool) -> dict:
if dry_run:
return {"command": command, "skipped": "dry_run"}
proc = subprocess.run(command, check=False, text=True, capture_output=True)
return {
"command": command,
"returncode": proc.returncode,
"stdout": proc.stdout.strip(),
"stderr": proc.stderr.strip(),
}
def validate_agent(repo_root: Path, agent: str, *, dry_run: bool, skip_validate: bool) -> dict | None:
if skip_validate:
return None
runner = repo_root / "telegram" / "agent_runner.py"
return run_command([sys.executable, str(runner), "--agent", agent, "--validate"], dry_run=dry_run)
def main(argv: list[str] | None = None) -> int:
args = parse_args(sys.argv[1:] if argv is None else argv)
repo_root = Path(args.repo_root).resolve()
secrets_dir = Path(args.secrets_dir)
token = read_token(from_stdin=args.from_stdin)
config_path, config = load_agent_config(repo_root, args.agent)
token_path = token_path_for(secrets_dir, config.bot_token_file)
validate_token(token)
uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)
write_token_file(token, token_path, uid=uid, gid=gid, dry_run=args.dry_run)
validate_result = validate_agent(repo_root, args.agent, dry_run=args.dry_run, skip_validate=args.skip_validate)
unit = f"teleo-agent@{args.agent}.service"
enable_result = None
start_result = None
if args.enable_service:
enable_result = run_command(["systemctl", "enable", unit], dry_run=args.dry_run)
if args.start_service:
start_result = run_command(["systemctl", "start", unit], dry_run=args.dry_run)
proof = {
"schema": "livingip.telegramAgentTokenInstall.v1",
"generatedAt": datetime.now(timezone.utc).isoformat(),
"ok": True,
"agent": args.agent,
"configPath": str(config_path),
"tokenPath": str(token_path),
"tokenFileWritten": not args.dry_run,
"tokenMode": "0600",
"owner": None if args.no_chown else args.owner,
"group": None if args.no_chown else args.group,
"dryRun": args.dry_run,
"validation": validate_result,
"serviceUnit": unit,
"serviceEnabled": enable_result,
"serviceStarted": start_result,
"secretValuesIncluded": False,
}
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
if args.output:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(output)
print(output, end="")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except ValueError as exc:
print(f"ERROR: {exc}", file=sys.stderr)
raise SystemExit(1) from None