teleo-infrastructure/scripts/check_telegram_leo_wallet_test_runtime.py

228 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""Verify the disposable Leo wallet-test Telegram runtime without leaking tokens."""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Shape check applied to the whole token string: 6-12 digits, a colon, then at
# least 25 characters drawn from letters, digits, "_" and "-".
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$")
def repo_root_from_script() -> Path:
    """Return the directory two levels above this file's resolved location."""
    script_location = Path(__file__).resolve()
    # parents[0] is the directory holding this file; parents[1] is its parent.
    return script_location.parents[1]
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the verifier's command-line options.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        The populated namespace when every argument is recognised.

    Raises:
        SystemExit: With status 2 when any unrecognised argument is present;
            an error line is written to stderr first.
    """
    cli = argparse.ArgumentParser(
        description="Check the Leo wallet-test Telegram bot token, getMe identity, and service state.",
        allow_abbrev=False,
    )
    cli.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/<agent>.yaml")
    cli.add_argument("--repo-root", default=str(repo_root_from_script()))
    cli.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
    cli.add_argument("--skip-getme", action="store_true", help="Do not call Telegram getMe")
    cli.add_argument("--require-token", action="store_true", help="Exit nonzero when token file is missing")
    cli.add_argument(
        "--require-service-active",
        action="store_true",
        help="Exit nonzero unless systemd says active",
    )
    cli.add_argument("--output", default="docs/reports/telegram-leo-wallet-test-runtime-proof.json")

    # parse_known_args collects leftovers instead of letting argparse print
    # them back, so the rejection message below never echoes their values.
    parsed, leftovers = cli.parse_known_args(argv)
    if not leftovers:
        return parsed

    print(
        "ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
        file=sys.stderr,
    )
    raise SystemExit(2)
def load_agent_config(repo_root: Path, agent: str):
    """Load ``telegram/agents/<agent>.yaml`` through the repo's ``agent_config`` module.

    Args:
        repo_root: Repository root that contains the ``telegram`` directory.
        agent: Agent name; ``.yaml`` is appended to form the config filename.

    Returns:
        A ``(config_path, config)`` pair, where ``config`` is whatever
        ``agent_config.load_agent_config`` returns for that path.
    """
    bot_package_dir = repo_root / "telegram"
    # agent_config lives inside <repo_root>/telegram, so that directory is put
    # at the front of sys.path before the import below is attempted.
    sys.path.insert(0, str(bot_package_dir))
    from agent_config import load_agent_config as load_config

    agent_yaml = bot_package_dir / "agents" / f"{agent}.yaml"
    return agent_yaml, load_config(str(agent_yaml))
def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
    """Resolve the configured token filename inside the secrets directory.

    Only a bare filename is accepted: anything containing a directory
    component, or equal to ``""``, ``"."`` or ``".."``, is rejected so the
    resulting path cannot point outside ``secrets_dir``.

    Args:
        secrets_dir: Directory expected to hold the token file.
        bot_token_file: Filename taken from the agent configuration.

    Returns:
        ``secrets_dir / bot_token_file``.

    Raises:
        ValueError: If ``bot_token_file`` is not a string or is not a plain
            filename.
    """
    # A non-string value (for instance None, if the config omitted the key;
    # confirm against agent_config) would otherwise surface as a TypeError
    # from Path(), which the script's ValueError handler does not catch.
    if not isinstance(bot_token_file, str):
        raise ValueError("bot_token_file must be a plain filename")
    token_file = Path(bot_token_file)
    # Path.name drops any directory part, so a mismatch means one was present.
    if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
        raise ValueError("bot_token_file must be a plain filename")
    return secrets_dir / token_file.name
def run_command(command: list[str]) -> dict:
    """Run ``command`` and return its exit status and stripped output.

    A non-zero exit status is reported, not raised. When the executable
    cannot be found, the result carries return code 127 and the stderr
    marker ``command_unavailable``.

    Args:
        command: Program and arguments, passed to ``subprocess.run`` as a list.

    Returns:
        A dict with keys ``command``, ``returncode``, ``stdout`` and ``stderr``.
    """
    try:
        completed = subprocess.run(command, check=False, text=True, capture_output=True)
    except FileNotFoundError:
        code, out, err = 127, "", "command_unavailable"
    else:
        code = completed.returncode
        out = completed.stdout.strip()
        err = completed.stderr.strip()
    return {
        "command": command,
        "returncode": code,
        "stdout": out,
        "stderr": err,
    }
def systemd_state(unit: str) -> dict:
    """Report ``systemctl is-active`` and ``systemctl is-enabled`` for ``unit``.

    Args:
        unit: Unit name handed to ``systemctl``.

    Returns:
        A dict with the unit name, the stdout of each query (``"unknown"``
        when a query printed nothing), and each query's return code.
    """
    # Queries run in this order: is-active first, then is-enabled.
    probes = {verb: run_command(["systemctl", verb, unit]) for verb in ("is-active", "is-enabled")}
    active_probe = probes["is-active"]
    enabled_probe = probes["is-enabled"]
    return {
        "unit": unit,
        "active": active_probe["stdout"] or "unknown",
        "activeReturncode": active_probe["returncode"],
        "enabled": enabled_probe["stdout"] or "unknown",
        "enabledReturncode": enabled_probe["returncode"],
    }
def telegram_get_me(token: str, *, timeout_seconds: int = 20) -> dict:
    """Call Telegram ``getMe`` and summarise the reply without the token.

    Args:
        token: Bot token, interpolated into the request URL.
        timeout_seconds: Timeout passed to ``urlopen``.

    Returns:
        A dict that always has ``attempted``, ``httpStatus``, ``ok`` and
        ``secretValuesIncluded``. When an HTTP reply was received (including
        an HTTP error status) it also carries selected fields from the
        reply's ``result`` object, each ``None`` when absent. For any other
        failure it carries ``errorType`` and a ``None`` status.
    """
    endpoint = f"https://api.telegram.org/bot{token}/getMe"
    req = urllib.request.Request(endpoint, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_seconds) as resp:
            http_status = resp.status
            payload = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as http_err:
        http_status = http_err.code
        try:
            payload = json.loads(http_err.read().decode("utf-8"))
        except Exception:
            # The error body was not JSON; substitute a fixed placeholder.
            payload = {"ok": False, "error_code": http_err.code, "description": "non_json_http_error"}
    except Exception as err:
        # Only the exception class name is recorded, presumably so that a
        # message mentioning the token-bearing URL can never reach the proof.
        return {
            "attempted": True,
            "httpStatus": None,
            "ok": False,
            "errorType": type(err).__name__,
            "secretValuesIncluded": False,
        }

    payload_is_dict = isinstance(payload, dict)
    result = payload.get("result") if payload_is_dict else None
    # An empty mapping stands in for a missing or non-object result, so every
    # lookup below yields None in that case.
    bot = result if isinstance(result, dict) else {}
    return {
        "attempted": True,
        "httpStatus": http_status,
        "ok": bool(payload.get("ok")) if payload_is_dict else False,
        "botIdPresent": bool(bot.get("id")),
        "isBot": bot.get("is_bot"),
        "username": bot.get("username"),
        "firstName": bot.get("first_name"),
        "canJoinGroups": bot.get("can_join_groups"),
        "canReadAllGroupMessages": bot.get("can_read_all_group_messages"),
        "supportsInlineQueries": bot.get("supports_inline_queries"),
        "secretValuesIncluded": False,
    }
def build_proof(args: argparse.Namespace) -> tuple[dict, int]:
    """Assemble the runtime proof document and the process exit code.

    Loads the agent configuration, checks that the token file exists and has
    the expected shape, optionally calls Telegram ``getMe`` and compares the
    returned username with the configured handle, and queries systemd for
    the agent's unit. The token value itself is never placed in the proof.

    Args:
        args: Namespace providing ``repo_root``, ``secrets_dir``, ``agent``,
            ``skip_getme``, ``require_token`` and ``require_service_active``.

    Returns:
        ``(proof, exit_code)``. ``exit_code`` is 0 when every check passed.
        It is also 0 when the only failure is a missing token file and
        ``require_token`` is not set, provided that any requested
        ``require_service_active`` check is satisfied. Otherwise it is 1.

    Raises:
        ValueError: Propagated from ``token_path_for`` when the configured
            token filename is not a plain filename.
    """
    repo_root = Path(args.repo_root).resolve()
    secrets_dir = Path(args.secrets_dir)
    config_path, config = load_agent_config(repo_root, args.agent)
    token_path = token_path_for(secrets_dir, config.bot_token_file)
    unit = f"teleo-agent@{args.agent}.service"

    token_file_present = token_path.exists()
    token_shape_valid = False
    get_me = {"attempted": False, "ok": False, "secretValuesIncluded": False}
    # exact_blocker names the first failing check; a later getMe failure or
    # username mismatch replaces it, while the service check only fills it
    # in when nothing else has been recorded.
    exact_blocker = None
    token = None
    if token_file_present:
        token = token_path.read_text().strip()
        token_shape_valid = bool(TOKEN_RE.match(token))
        if not token_shape_valid:
            exact_blocker = "telegram_token_shape_invalid"
    else:
        exact_blocker = "telegram_token_file_missing"

    if token_file_present and token_shape_valid and not args.skip_getme:
        get_me = telegram_get_me(token)
        if not get_me.get("ok"):
            exact_blocker = "telegram_getme_failed"

    expected_username = config.handle.lstrip("@")
    # Case-insensitive comparison; an absent username never matches.
    username_matches = (
        bool(get_me.get("username"))
        and get_me.get("username", "").lower() == expected_username.lower()
    )
    if get_me.get("attempted") and get_me.get("ok") and not username_matches:
        exact_blocker = "telegram_getme_username_mismatch"

    service = systemd_state(unit)
    service_active = service["active"] == "active"
    service_requirement_met = service_active or not args.require_service_active
    if not service_requirement_met:
        exact_blocker = exact_blocker or "telegram_service_inactive"

    # A missing token file already forces ok to False, so require_token needs
    # no separate override here; it only affects the exit code below.
    ok = (
        token_file_present
        and token_shape_valid
        and (args.skip_getme or (get_me.get("ok") and username_matches))
        and service_requirement_met
    )

    proof = {
        "schema": "livingip.telegramLeoWalletTestRuntimeProof.v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "ok": ok,
        "requiredTier": "T3_live_readonly",
        "currentTier": "T3_live_readonly" if ok else "T2_runtime",
        "agent": args.agent,
        "configPath": str(config_path),
        "expectedHandle": config.handle,
        "expectedUsername": expected_username,
        "tokenPath": str(token_path),
        "tokenFilePresent": token_file_present,
        "tokenShapeValid": token_shape_valid,
        "getMe": get_me,
        "usernameMatchesExpected": username_matches if get_me.get("attempted") else None,
        "service": service,
        "secretValuesIncluded": False,
        "exactBlocker": exact_blocker,
        "notProven": [
            "Telegram message delivery",
            "Telegram reply delivery",
            "Telegram-triggered x402 readback",
            "Telegram-triggered paid execution",
        ],
        "strongestClaimAllowed": (
            "This verifier proves the disposable Leo wallet-test Telegram token identity and service state "
            "after BotFather token installation. It does not send Telegram messages or prove x402 payment execution."
        ),
    }

    # A missing token file is tolerated unless --require-token was given, but
    # that tolerance must not mask --require-service-active: the flag
    # promises a nonzero exit whenever systemd does not report the unit
    # as active.
    missing_token_tolerated = not token_file_present and not args.require_token
    exit_code = 0 if ok or (missing_token_tolerated and service_requirement_met) else 1
    return proof, exit_code
def main(argv: list[str] | None = None) -> int:
    """Run the verifier: build the proof, write it to disk, and print it.

    Args:
        argv: Argument strings without the program name; ``sys.argv[1:]`` is
            used when this is ``None``.

    Returns:
        The exit code computed by ``build_proof``.
    """
    cli_args = argv
    if cli_args is None:
        cli_args = sys.argv[1:]
    options = parse_args(cli_args)
    proof, exit_code = build_proof(options)

    rendered = json.dumps(proof, indent=2, sort_keys=True) + "\n"
    report_path = Path(options.output)
    # Create the report's directory tree on demand before writing.
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(rendered)
    # The same text goes to stdout; it already ends with a newline.
    print(rendered, end="")
    return exit_code
if __name__ == "__main__":
    # A ValueError from the run is reported as a single stderr line with
    # exit status 1 and no chained traceback; otherwise the process exits
    # with main()'s return value.
    try:
        status = main()
    except ValueError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        raise SystemExit(1) from None
    raise SystemExit(status)