Merge pull request #10 from living-ip/codex/leo-wallet-test-token-installer-20260622
Add safe Telegram agent token installer
This commit is contained in:
commit
2433ed2d8a
2 changed files with 252 additions and 0 deletions
184
scripts/install_telegram_agent_token.py
Executable file
184
scripts/install_telegram_agent_token.py
Executable file
|
|
@ -0,0 +1,184 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Install a Telegram agent bot token without printing the secret value."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import getpass
|
||||
import grp
|
||||
import json
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$")
|
||||
|
||||
|
||||
def repo_root_from_script() -> Path:
|
||||
return Path(__file__).resolve().parents[1]
|
||||
|
||||
|
||||
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Install a Telegram agent token from stdin or a hidden prompt.",
|
||||
allow_abbrev=False,
|
||||
)
|
||||
parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/<agent>.yaml")
|
||||
parser.add_argument("--repo-root", default=str(repo_root_from_script()))
|
||||
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
|
||||
parser.add_argument("--from-stdin", action="store_true", help="Read the token from stdin instead of a prompt")
|
||||
parser.add_argument("--owner", default="teleo", help="Token file owner after write")
|
||||
parser.add_argument("--group", default="teleo", help="Token file group after write")
|
||||
parser.add_argument("--no-chown", action="store_true", help="Skip chown; useful for local tests")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Validate paths and input without writing")
|
||||
parser.add_argument("--start-service", action="store_true", help="Run systemctl start teleo-agent@<agent>.service")
|
||||
parser.add_argument("--enable-service", action="store_true", help="Run systemctl enable teleo-agent@<agent>.service")
|
||||
parser.add_argument("--skip-validate", action="store_true", help="Skip agent_runner.py --validate")
|
||||
parser.add_argument("--output", help="Optional sanitized JSON proof path")
|
||||
|
||||
namespace, unknown = parser.parse_known_args(argv)
|
||||
if unknown:
|
||||
print(
|
||||
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
raise SystemExit(2)
|
||||
return namespace
|
||||
|
||||
|
||||
def read_token(*, from_stdin: bool) -> str:
|
||||
token = sys.stdin.read() if from_stdin else getpass.getpass("Telegram bot token: ")
|
||||
return token.strip()
|
||||
|
||||
|
||||
def validate_token(token: str) -> None:
|
||||
if not TOKEN_RE.match(token):
|
||||
raise ValueError("Telegram bot token shape is invalid")
|
||||
|
||||
|
||||
def load_agent_config(repo_root: Path, agent: str):
|
||||
telegram_dir = repo_root / "telegram"
|
||||
sys.path.insert(0, str(telegram_dir))
|
||||
from agent_config import load_agent_config as load_config
|
||||
|
||||
config_path = telegram_dir / "agents" / f"{agent}.yaml"
|
||||
config = load_config(str(config_path))
|
||||
return config_path, config
|
||||
|
||||
|
||||
def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
|
||||
token_file = Path(bot_token_file)
|
||||
if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
|
||||
raise ValueError("bot_token_file must be a plain filename")
|
||||
return secrets_dir / token_file.name
|
||||
|
||||
|
||||
def resolve_owner_group(owner: str, group: str, *, no_chown: bool) -> tuple[int | None, int | None]:
|
||||
if no_chown:
|
||||
return None, None
|
||||
return pwd.getpwnam(owner).pw_uid, grp.getgrnam(group).gr_gid
|
||||
|
||||
|
||||
def write_token_file(token: str, token_path: Path, *, uid: int | None, gid: int | None, dry_run: bool) -> None:
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
token_path.parent.mkdir(parents=True, mode=0o700, exist_ok=True)
|
||||
tmp_path = token_path.with_name(f".{token_path.name}.tmp-{os.getpid()}")
|
||||
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
|
||||
fd = os.open(tmp_path, flags, 0o600)
|
||||
try:
|
||||
with os.fdopen(fd, "w") as handle:
|
||||
handle.write(token)
|
||||
handle.write("\n")
|
||||
handle.flush()
|
||||
os.fsync(handle.fileno())
|
||||
os.chmod(tmp_path, 0o600)
|
||||
if uid is not None or gid is not None:
|
||||
os.chown(tmp_path, -1 if uid is None else uid, -1 if gid is None else gid)
|
||||
os.replace(tmp_path, token_path)
|
||||
os.chmod(token_path, 0o600)
|
||||
finally:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
|
||||
|
||||
def run_command(command: list[str], *, dry_run: bool) -> dict:
|
||||
if dry_run:
|
||||
return {"command": command, "skipped": "dry_run"}
|
||||
proc = subprocess.run(command, check=False, text=True, capture_output=True)
|
||||
return {
|
||||
"command": command,
|
||||
"returncode": proc.returncode,
|
||||
"stdout": proc.stdout.strip(),
|
||||
"stderr": proc.stderr.strip(),
|
||||
}
|
||||
|
||||
|
||||
def validate_agent(repo_root: Path, agent: str, *, dry_run: bool, skip_validate: bool) -> dict | None:
|
||||
if skip_validate:
|
||||
return None
|
||||
runner = repo_root / "telegram" / "agent_runner.py"
|
||||
return run_command([sys.executable, str(runner), "--agent", agent, "--validate"], dry_run=dry_run)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = parse_args(sys.argv[1:] if argv is None else argv)
|
||||
repo_root = Path(args.repo_root).resolve()
|
||||
secrets_dir = Path(args.secrets_dir)
|
||||
token = read_token(from_stdin=args.from_stdin)
|
||||
|
||||
config_path, config = load_agent_config(repo_root, args.agent)
|
||||
token_path = token_path_for(secrets_dir, config.bot_token_file)
|
||||
validate_token(token)
|
||||
uid, gid = resolve_owner_group(args.owner, args.group, no_chown=args.no_chown)
|
||||
|
||||
write_token_file(token, token_path, uid=uid, gid=gid, dry_run=args.dry_run)
|
||||
validate_result = validate_agent(repo_root, args.agent, dry_run=args.dry_run, skip_validate=args.skip_validate)
|
||||
|
||||
unit = f"teleo-agent@{args.agent}.service"
|
||||
enable_result = None
|
||||
start_result = None
|
||||
if args.enable_service:
|
||||
enable_result = run_command(["systemctl", "enable", unit], dry_run=args.dry_run)
|
||||
if args.start_service:
|
||||
start_result = run_command(["systemctl", "start", unit], dry_run=args.dry_run)
|
||||
|
||||
proof = {
|
||||
"schema": "livingip.telegramAgentTokenInstall.v1",
|
||||
"generatedAt": datetime.now(timezone.utc).isoformat(),
|
||||
"ok": True,
|
||||
"agent": args.agent,
|
||||
"configPath": str(config_path),
|
||||
"tokenPath": str(token_path),
|
||||
"tokenFileWritten": not args.dry_run,
|
||||
"tokenMode": "0600",
|
||||
"owner": None if args.no_chown else args.owner,
|
||||
"group": None if args.no_chown else args.group,
|
||||
"dryRun": args.dry_run,
|
||||
"validation": validate_result,
|
||||
"serviceUnit": unit,
|
||||
"serviceEnabled": enable_result,
|
||||
"serviceStarted": start_result,
|
||||
"secretValuesIncluded": False,
|
||||
}
|
||||
|
||||
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
|
||||
if args.output:
|
||||
output_path = Path(args.output)
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_text(output)
|
||||
print(output, end="")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
raise SystemExit(main())
|
||||
except ValueError as exc:
|
||||
print(f"ERROR: {exc}", file=sys.stderr)
|
||||
raise SystemExit(1) from None
|
||||
68
tests/test_install_telegram_agent_token.py
Normal file
68
tests/test_install_telegram_agent_token.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
"""Tests for safe Telegram agent token installation."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
SCRIPT = REPO_ROOT / "scripts" / "install_telegram_agent_token.py"
|
||||
|
||||
|
||||
def run_installer(args: list[str], *, token: str = "123456789:abcdefghijklmnopqrstuvwxyzABC") -> subprocess.CompletedProcess:
|
||||
return subprocess.run(
|
||||
[sys.executable, str(SCRIPT), *args],
|
||||
input=token,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
)
|
||||
|
||||
|
||||
def test_installs_leo_wallet_test_token_from_stdin_without_echoing_secret(tmp_path):
|
||||
token = "123456789:abcdefghijklmnopqrstuvwxyzABC"
|
||||
proof_path = tmp_path / "proof.json"
|
||||
proc = run_installer(
|
||||
[
|
||||
"--agent",
|
||||
"leo-wallet-test",
|
||||
"--repo-root",
|
||||
str(REPO_ROOT),
|
||||
"--secrets-dir",
|
||||
str(tmp_path / "secrets"),
|
||||
"--from-stdin",
|
||||
"--no-chown",
|
||||
"--skip-validate",
|
||||
"--output",
|
||||
str(proof_path),
|
||||
],
|
||||
token=token,
|
||||
)
|
||||
|
||||
assert proc.returncode == 0, proc.stderr
|
||||
assert token not in proc.stdout
|
||||
assert token not in proc.stderr
|
||||
|
||||
proof = json.loads(proof_path.read_text())
|
||||
token_path = Path(proof["tokenPath"])
|
||||
assert proof["ok"] is True
|
||||
assert proof["agent"] == "leo-wallet-test"
|
||||
assert proof["secretValuesIncluded"] is False
|
||||
assert proof["tokenFileWritten"] is True
|
||||
assert token not in proof_path.read_text()
|
||||
|
||||
assert token_path.read_text().strip() == token
|
||||
mode = stat.S_IMODE(os.stat(token_path).st_mode)
|
||||
assert mode == 0o600
|
||||
|
||||
|
||||
def test_refuses_cli_token_argument_without_echoing_secret():
|
||||
token = "123456789:abcdefghijklmnopqrstuvwxyzABC"
|
||||
proc = run_installer(["--token", token], token="")
|
||||
|
||||
combined_output = proc.stdout + proc.stderr
|
||||
assert proc.returncode == 2
|
||||
assert token not in combined_output
|
||||
assert "Secret-bearing CLI args are not accepted" in combined_output
|
||||
Loading…
Reference in a new issue