Add safe Telegram agent token installer

This commit is contained in:
twentyOne2x 2026-06-22 20:47:44 +02:00
parent 9c29322972
commit 84e1269900
2 changed files with 252 additions and 0 deletions

View file

@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""Install a Telegram agent bot token without printing the secret value."""
from __future__ import annotations
import argparse
import getpass
import grp
import json
import os
import pwd
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$")
def repo_root_from_script() -> Path:
return Path(__file__).resolve().parents[1]
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Install a Telegram agent token from stdin or a hidden prompt.",
allow_abbrev=False,
)
parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/<agent>.yaml")
parser.add_argument("--repo-root", default=str(repo_root_from_script()))
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
parser.add_argument("--from-stdin", action="store_true", help="Read the token from stdin instead of a prompt")
parser.add_argument("--owner", default="teleo", help="Token file owner after write")
parser.add_argument("--group", default="teleo", help="Token file group after write")
parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests")
parser.add_argument("--dry-run", action="store_true", help="Validate paths and input without writing")
parser.add_argument("--start-service", action="store_true", help="Run systemctl start teleo-agent@<agent>.service")
parser.add_argument("--enable-service", action="store_true", help="Run systemctl enable teleo-agent@<agent>.service")
parser.add_argument("--skip-validate", action="store_true", help="Skip agent_runner.py --validate")
parser.add_argument("--output", help="Optional sanitized JSON proof path")
namespace, unknown = parser.parse_known_args(argv)
if unknown:
print(
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
file=sys.stderr,
)
raise SystemExit(2)
return namespace
def read_token(*, from_stdin: bool) -> str:
token = sys.stdin.read() if from_stdin else getpass.getpass("Telegram bot token: ")
return token.strip()
def validate_token(token: str) -> None:
if not TOKEN_RE.match(token):
raise ValueError("Telegram bot token shape is invalid")
def load_agent_config(repo_root: Path, agent: str):
telegram_dir = repo_root / "telegram"
sys.path.insert(0, str(telegram_dir))
from agent_config import load_agent_config as load_config
config_path = telegram_dir / "agents" / f"{agent}.yaml"
config = load_config(str(config_path))
return config_path, config
def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
token_file = Path(bot_token_file)
if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
raise ValueError("bot_token_file must be a plain filename")
return secrets_dir / token_file.name
def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
if no_chown:
return None, None
return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid
def write_token_file(token: str, token_path: Path, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
if dry_run:
return
token_path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
tmp_path = token_path.with_name(f".{token_path.name}.tmp-{os.getpid()}")
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
fd = os.open(tmp_path, flags, 0o600)
try:
with os.fdopen(fd, "w") as handle:
handle.write(token)
handle.write("\n")
handle.flush()
os.fsync(handle.fileno())
os.chmod(tmp_path, 0o600)
if uid is not None or gid is not None:
os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
os.replace(tmp_path, token_path)
os.chmod(token_path, 0o600)
finally:
if tmp_path.exists():
tmp_path.unlink()
def run_command(command: list[str], *, dry_run: bool) -> dict:
if dry_run:
return {"command": command, "skipped": "dry_run"}
proc = subprocess.run(command, check=False, text=True, capture_output=True)
return {
"command": command,
"returncode": proc.returncode,
"stdout": proc.stdout.strip(),
"stderr": proc.stderr.strip(),
}
def validate_agent(repo_root: Path, agent: str, *, dry_run: bool, skip_validate: bool) -> dict | None:
if skip_validate:
return None
runner = repo_root / "telegram" / "agent_runner.py"
return run_command([sys.executable, str(runner), "--agent", agent, "--validate"], dry_run=dry_run)
def main(argv: list[str] | None = None) -> int:
args = parse_args(sys.argv[1:] if argv is None else argv)
repo_root = Path(args.repo_root).resolve()
secrets_dir = Path(args.secrets_dir)
token = read_token(from_stdin=args.from_stdin)
config_path, config = load_agent_config(repo_root, args.agent)
token_path = token_path_for(secrets_dir, config.bot_token_file)
validate_token(token)
uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)
write_token_file(token, token_path, uid=uid, gid=gid, dry_run=args.dry_run)
validate_result = validate_agent(repo_root, args.agent, dry_run=args.dry_run, skip_validate=args.skip_validate)
unit = f"teleo-agent@{args.agent}.service"
enable_result = None
start_result = None
if args.enable_service:
enable_result = run_command(["systemctl", "enable", unit], dry_run=args.dry_run)
if args.start_service:
start_result = run_command(["systemctl", "start", unit], dry_run=args.dry_run)
proof = {
"schema": "livingip.telegramAgentTokenInstall.v1",
"generatedAt": datetime.now(timezone.utc).isoformat(),
"ok": True,
"agent": args.agent,
"configPath": str(config_path),
"tokenPath": str(token_path),
"tokenFileWritten": not args.dry_run,
"tokenMode": "0600",
"owner": None if args.no_chown else args.owner,
"group": None if args.no_chown else args.group,
"dryRun": args.dry_run,
"validation": validate_result,
"serviceUnit": unit,
"serviceEnabled": enable_result,
"serviceStarted": start_result,
"secretValuesIncluded": False,
}
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
if args.output:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(output)
print(output, end="")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except ValueError as exc:
print(f"ERROR: {exc}", file=sys.stderr)
raise SystemExit(1) from None

View file

@ -0,0 +1,68 @@
"""Tests for safe Telegram agent token installation."""
import json
import os
import stat
import subprocess
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPT = REPO_ROOT / "scripts" / "install_telegram_agent_token.py"
def run_installer(args: list[str], *, token: str = "123456789:abcdefghijklmnopqrstuvwxyzABC") -> subprocess.CompletedProcess:
return subprocess.run(
[sys.executable, str(SCRIPT), *args],
input=token,
text=True,
capture_output=True,
check=False,
)
def test_installs_leo_wallet_test_token_from_stdin_without_echoing_secret(tmp_path):
token = "123456789:abcdefghijklmnopqrstuvwxyzABC"
proof_path = tmp_path / "proof.json"
proc = run_installer(
[
"--agent",
"leo-wallet-test",
"--repo-root",
str(REPO_ROOT),
"--secrets-dir",
str(tmp_path / "secrets"),
"--from-stdin",
"--no-chown",
"--skip-validate",
"--output",
str(proof_path),
],
token=token,
)
assert proc.returncode == 0, proc.stderr
assert token not in proc.stdout
assert token not in proc.stderr
proof = json.loads(proof_path.read_text())
token_path = Path(proof["tokenPath"])
assert proof["ok"] is True
assert proof["agent"] == "leo-wallet-test"
assert proof["secretValuesIncluded"] is False
assert proof["tokenFileWritten"] is True
assert token not in proof_path.read_text()
assert token_path.read_text().strip() == token
mode = stat.S_IMODE(os.stat(token_path).st_mode)
assert mode == 0o600
def test_refuses_cli_token_argument_without_echoing_secret():
token = "123456789:abcdefghijklmnopqrstuvwxyzABC"
proc = run_installer(["--token", token], token="")
combined_output = proc.stdout + proc.stderr
assert proc.returncode == 2
assert token not in combined_output
assert "Secret-bearing CLI args are not accepted" in combined_output