diff --git a/scripts/check_telegram_leo_wallet_test_runtime.py b/scripts/check_telegram_leo_wallet_test_runtime.py new file mode 100644 index 0000000..d725a7a --- /dev/null +++ b/scripts/check_telegram_leo_wallet_test_runtime.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +"""Verify the disposable Leo wallet-test Telegram runtime without leaking tokens.""" + +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import sys +import urllib.error +import urllib.request +from datetime import datetime, timezone +from pathlib import Path + +TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$") + + +def repo_root_from_script() -> Path: + return Path(__file__).resolve().parents[1] + + +def parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Check the Leo wallet-test Telegram bot token, getMe identity, and service state.", + allow_abbrev=False, + ) + parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/.yaml") + parser.add_argument("--repo-root", default=str(repo_root_from_script())) + parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets") + parser.add_argument("--skip-getme", action="store_true", help="Do not call Telegram getMe") + parser.add_argument("--require-token", action="store_true", help="Exit nonzero when token file is missing") + parser.add_argument("--require-service-active", action="store_true", help="Exit nonzero unless systemd says active") + parser.add_argument("--output", default="docs/reports/telegram-leo-wallet-test-runtime-proof.json") + + namespace, unknown = parser.parse_known_args(argv) + if unknown: + print( + "ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.", + file=sys.stderr, + ) + raise SystemExit(2) + return namespace + + +def load_agent_config(repo_root: Path, agent: str): + telegram_dir = repo_root / "telegram" + sys.path.insert(0, str(telegram_dir)) + from agent_config import load_agent_config as load_config + + config_path = telegram_dir / "agents" / f"{agent}.yaml" + config = load_config(str(config_path)) + return config_path, config + + +def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path: + token_file = Path(bot_token_file) + if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}: + raise ValueError("bot_token_file must be a plain filename") + return secrets_dir / token_file.name + + +def run_command(command: list[str]) -> dict: + try: + proc = subprocess.run(command, check=False, text=True, capture_output=True) + except FileNotFoundError: + return { + "command": command, + "returncode": 127, + "stdout": "", + "stderr": "command_unavailable", + } + return { + "command": command, + "returncode": proc.returncode, + "stdout": proc.stdout.strip(), + "stderr": proc.stderr.strip(), + } + + +def systemd_state(unit: str) -> dict: + active = run_command(["systemctl", "is-active", unit]) + enabled = run_command(["systemctl", "is-enabled", unit]) + return { + "unit": unit, + "active": active["stdout"] or "unknown", + "activeReturncode": active["returncode"], + "enabled": enabled["stdout"] or "unknown", + "enabledReturncode": enabled["returncode"], + } + + +def telegram_get_me(token: str, *, timeout_seconds: int = 20) -> dict: + url = f"https://api.telegram.org/bot{token}/getMe" + request = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + status = response.status + body = json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + status = exc.code + try: + body = json.loads(exc.read().decode("utf-8")) + except Exception: + body = {"ok": False, "error_code": exc.code, "description": "non_json_http_error"} + except Exception as exc: + return { + "attempted": True, + "httpStatus": None, + "ok": False, + "errorType": type(exc).__name__, + "secretValuesIncluded": False, + } + + result = body.get("result") if isinstance(body, dict) else None + return { + "attempted": True, + "httpStatus": status, + "ok": bool(body.get("ok")) if isinstance(body, dict) else False, + "botIdPresent": isinstance(result, dict) and bool(result.get("id")), + "isBot": result.get("is_bot") if isinstance(result, dict) else None, + "username": result.get("username") if isinstance(result, dict) else None, + "firstName": result.get("first_name") if isinstance(result, dict) else None, + "canJoinGroups": result.get("can_join_groups") if isinstance(result, dict) else None, + "canReadAllGroupMessages": result.get("can_read_all_group_messages") if isinstance(result, dict) else None, + "supportsInlineQueries": result.get("supports_inline_queries") if isinstance(result, dict) else None, + "secretValuesIncluded": False, + } + + +def build_proof(args: argparse.Namespace) -> tuple[dict, int]: + repo_root = Path(args.repo_root).resolve() + secrets_dir = Path(args.secrets_dir) + config_path, config = load_agent_config(repo_root, args.agent) + token_path = token_path_for(secrets_dir, config.bot_token_file) + unit = f"teleo-agent@{args.agent}.service" + + token_file_present = token_path.exists() + token_shape_valid = False + get_me = {"attempted": False, "ok": False, "secretValuesIncluded": False} + exact_blocker = None + token = None + + if token_file_present: + token = token_path.read_text().strip() + token_shape_valid = bool(TOKEN_RE.match(token)) + if not token_shape_valid: + exact_blocker = "telegram_token_shape_invalid" + else: + exact_blocker = "telegram_token_file_missing" + + if token_file_present and token_shape_valid and not args.skip_getme: + get_me = telegram_get_me(token) + if not get_me.get("ok"): + exact_blocker = "telegram_getme_failed" + + expected_username = config.handle.lstrip("@") + username_matches = ( + bool(get_me.get("username")) + and get_me.get("username", "").lower() == expected_username.lower() + ) + if get_me.get("attempted") and get_me.get("ok") and not username_matches: + exact_blocker = "telegram_getme_username_mismatch" + + service = systemd_state(unit) + service_active = service["active"] == "active" + if args.require_service_active and not service_active: + exact_blocker = exact_blocker or "telegram_service_inactive" + + ok = ( + token_file_present + and token_shape_valid + and (args.skip_getme or (get_me.get("ok") and username_matches)) + and (service_active or not args.require_service_active) + ) + if args.require_token and not token_file_present: + ok = False + + proof = { + "schema": "livingip.telegramLeoWalletTestRuntimeProof.v1", + "generatedAt": datetime.now(timezone.utc).isoformat(), + "ok": ok, + "requiredTier": "T3_live_readonly", + "currentTier": "T3_live_readonly" if ok else "T2_runtime", + "agent": args.agent, + "configPath": str(config_path), + "expectedHandle": config.handle, + "expectedUsername": expected_username, + "tokenPath": str(token_path), + "tokenFilePresent": token_file_present, + "tokenShapeValid": token_shape_valid, + "getMe": get_me, + "usernameMatchesExpected": username_matches if get_me.get("attempted") else None, + "service": service, + "secretValuesIncluded": False, + "exactBlocker": exact_blocker, + "notProven": [ + "Telegram message delivery", + "Telegram reply delivery", + "Telegram-triggered x402 readback", + "Telegram-triggered paid execution", + ], + "strongestClaimAllowed": ( + "This verifier proves the disposable Leo wallet-test Telegram token identity and service state " + "after BotFather token installation. It does not send Telegram messages or prove x402 payment execution." + ), + } + exit_code = 0 if ok or (exact_blocker == "telegram_token_file_missing" and not args.require_token) else 1 + return proof, exit_code + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(sys.argv[1:] if argv is None else argv) + proof, exit_code = build_proof(args) + output = json.dumps(proof, indent=2, sort_keys=True) + "\n" + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(output) + print(output, end="") + return exit_code + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except ValueError as exc: + print(f"ERROR: {exc}", file=sys.stderr) + raise SystemExit(1) from None diff --git a/tests/test_telegram_leo_wallet_test_runtime.py b/tests/test_telegram_leo_wallet_test_runtime.py new file mode 100644 index 0000000..3bfb3e6 --- /dev/null +++ b/tests/test_telegram_leo_wallet_test_runtime.py @@ -0,0 +1,110 @@ +"""Tests for the Leo wallet-test Telegram runtime verifier.""" + +import json +import subprocess +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPT = REPO_ROOT / "scripts" / "check_telegram_leo_wallet_test_runtime.py" + + +def run_checker(args: list[str]) -> subprocess.CompletedProcess: + return subprocess.run( + [sys.executable, str(SCRIPT), *args], + text=True, + capture_output=True, + check=False, + ) + + +def test_missing_token_writes_sanitized_blocker(tmp_path): + proof_path = tmp_path / "proof.json" + proc = run_checker( + [ + "--agent", + "leo-wallet-test", + "--repo-root", + str(REPO_ROOT), + "--secrets-dir", + str(tmp_path / "secrets"), + "--skip-getme", + "--output", + str(proof_path), + ] + ) + + assert proc.returncode == 0, proc.stderr + proof = json.loads(proof_path.read_text()) + assert proof["ok"] is False + assert proof["exactBlocker"] == "telegram_token_file_missing" + assert proof["tokenFilePresent"] is False + assert proof["secretValuesIncluded"] is False + assert "secretValuesIncluded" in proc.stdout + + +def test_invalid_token_shape_fails_without_printing_token(tmp_path): + secrets_dir = tmp_path / "secrets" + secrets_dir.mkdir() + token_path = secrets_dir / "leo-wallet-test-telegram-bot-token" + token = "not-a-valid-token" + token_path.write_text(token) + proof_path = tmp_path / "proof.json" + proc = run_checker( + [ + "--agent", + "leo-wallet-test", + "--repo-root", + str(REPO_ROOT), + "--secrets-dir", + str(secrets_dir), + "--skip-getme", + "--require-token", + "--output", + str(proof_path), + ] + ) + + assert proc.returncode == 1 + assert token not in proc.stdout + assert token not in proc.stderr + proof = json.loads(proof_path.read_text()) + assert proof["exactBlocker"] == "telegram_token_shape_invalid" + assert proof["tokenFilePresent"] is True + assert proof["tokenShapeValid"] is False + assert proof["secretValuesIncluded"] is False + + +def test_getme_result_is_sanitized_and_matches_expected_username(): + module_dir = str(SCRIPT.parent) + sys.path.insert(0, module_dir) + import check_telegram_leo_wallet_test_runtime as checker + + token = "dummy-token-value" + response_body = { + "ok": True, + "result": { + "id": 123456789, + "is_bot": True, + "first_name": "Living IP Leo Wallet Test", + "username": "lipleowallet06222026bot", + "can_join_groups": True, + "can_read_all_group_messages": False, + "supports_inline_queries": False, + }, + } + response = MagicMock() + response.status = 200 + response.read.return_value = json.dumps(response_body).encode("utf-8") + response.__enter__.return_value = response + + with patch("urllib.request.urlopen", return_value=response): + result = checker.telegram_get_me(token) + + serialized = json.dumps(result) + assert result["ok"] is True + assert result["username"] == "lipleowallet06222026bot" + assert result["botIdPresent"] is True + assert result["secretValuesIncluded"] is False + assert token not in serialized