228 lines
8.6 KiB
Python
228 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Verify the disposable Leo wallet-test Telegram runtime without leaking tokens."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$")
|
|
|
|
|
|
def repo_root_from_script() -> Path:
|
|
return Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Check the Leo wallet-test Telegram bot token, getMe identity, and service state.",
|
|
allow_abbrev=False,
|
|
)
|
|
parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/<agent>.yaml")
|
|
parser.add_argument("--repo-root", default=str(repo_root_from_script()))
|
|
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
|
|
parser.add_argument("--skip-getme", action="store_true", help="Do not call Telegram getMe")
|
|
parser.add_argument("--require-token", action="store_true", help="Exit nonzero when token file is missing")
|
|
parser.add_argument("--require-service-active", action="store_true", help="Exit nonzero unless systemd says active")
|
|
parser.add_argument("--output", default="docs/reports/telegram-leo-wallet-test-runtime-proof.json")
|
|
|
|
namespace, unknown = parser.parse_known_args(argv)
|
|
if unknown:
|
|
print(
|
|
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
return namespace
|
|
|
|
|
|
def load_agent_config(repo_root: Path, agent: str):
|
|
telegram_dir = repo_root / "telegram"
|
|
sys.path.insert(0, str(telegram_dir))
|
|
from agent_config import load_agent_config as load_config
|
|
|
|
config_path = telegram_dir / "agents" / f"{agent}.yaml"
|
|
config = load_config(str(config_path))
|
|
return config_path, config
|
|
|
|
|
|
def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
|
|
token_file = Path(bot_token_file)
|
|
if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
|
|
raise ValueError("bot_token_file must be a plain filename")
|
|
return secrets_dir / token_file.name
|
|
|
|
|
|
def run_command(command: list[str]) -> dict:
|
|
try:
|
|
proc = subprocess.run(command, check=False, text=True, capture_output=True)
|
|
except FileNotFoundError:
|
|
return {
|
|
"command": command,
|
|
"returncode": 127,
|
|
"stdout": "",
|
|
"stderr": "command_unavailable",
|
|
}
|
|
return {
|
|
"command": command,
|
|
"returncode": proc.returncode,
|
|
"stdout": proc.stdout.strip(),
|
|
"stderr": proc.stderr.strip(),
|
|
}
|
|
|
|
|
|
def systemd_state(unit: str) -> dict:
|
|
active = run_command(["systemctl", "is-active", unit])
|
|
enabled = run_command(["systemctl", "is-enabled", unit])
|
|
return {
|
|
"unit": unit,
|
|
"active": active["stdout"] or "unknown",
|
|
"activeReturncode": active["returncode"],
|
|
"enabled": enabled["stdout"] or "unknown",
|
|
"enabledReturncode": enabled["returncode"],
|
|
}
|
|
|
|
|
|
def telegram_get_me(token: str, *, timeout_seconds: int = 20) -> dict:
|
|
url = f"https://api.telegram.org/bot{token}/getMe"
|
|
request = urllib.request.Request(url, headers={"Accept": "application/json"})
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
|
|
status = response.status
|
|
body = json.loads(response.read().decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
status = exc.code
|
|
try:
|
|
body = json.loads(exc.read().decode("utf-8"))
|
|
except Exception:
|
|
body = {"ok": False, "error_code": exc.code, "description": "non_json_http_error"}
|
|
except Exception as exc:
|
|
return {
|
|
"attempted": True,
|
|
"httpStatus": None,
|
|
"ok": False,
|
|
"errorType": type(exc).__name__,
|
|
"secretValuesIncluded": False,
|
|
}
|
|
|
|
result = body.get("result") if isinstance(body, dict) else None
|
|
return {
|
|
"attempted": True,
|
|
"httpStatus": status,
|
|
"ok": bool(body.get("ok")) if isinstance(body, dict) else False,
|
|
"botIdPresent": isinstance(result, dict) and bool(result.get("id")),
|
|
"isBot": result.get("is_bot") if isinstance(result, dict) else None,
|
|
"username": result.get("username") if isinstance(result, dict) else None,
|
|
"firstName": result.get("first_name") if isinstance(result, dict) else None,
|
|
"canJoinGroups": result.get("can_join_groups") if isinstance(result, dict) else None,
|
|
"canReadAllGroupMessages": result.get("can_read_all_group_messages") if isinstance(result, dict) else None,
|
|
"supportsInlineQueries": result.get("supports_inline_queries") if isinstance(result, dict) else None,
|
|
"secretValuesIncluded": False,
|
|
}
|
|
|
|
|
|
def build_proof(args: argparse.Namespace) -> tuple[dict, int]:
|
|
repo_root = Path(args.repo_root).resolve()
|
|
secrets_dir = Path(args.secrets_dir)
|
|
config_path, config = load_agent_config(repo_root, args.agent)
|
|
token_path = token_path_for(secrets_dir, config.bot_token_file)
|
|
unit = f"teleo-agent@{args.agent}.service"
|
|
|
|
token_file_present = token_path.exists()
|
|
token_shape_valid = False
|
|
get_me = {"attempted": False, "ok": False, "secretValuesIncluded": False}
|
|
exact_blocker = None
|
|
token = None
|
|
|
|
if token_file_present:
|
|
token = token_path.read_text().strip()
|
|
token_shape_valid = bool(TOKEN_RE.match(token))
|
|
if not token_shape_valid:
|
|
exact_blocker = "telegram_token_shape_invalid"
|
|
else:
|
|
exact_blocker = "telegram_token_file_missing"
|
|
|
|
if token_file_present and token_shape_valid and not args.skip_getme:
|
|
get_me = telegram_get_me(token)
|
|
if not get_me.get("ok"):
|
|
exact_blocker = "telegram_getme_failed"
|
|
|
|
expected_username = config.handle.lstrip("@")
|
|
username_matches = (
|
|
bool(get_me.get("username"))
|
|
and get_me.get("username", "").lower() == expected_username.lower()
|
|
)
|
|
if get_me.get("attempted") and get_me.get("ok") and not username_matches:
|
|
exact_blocker = "telegram_getme_username_mismatch"
|
|
|
|
service = systemd_state(unit)
|
|
service_active = service["active"] == "active"
|
|
if args.require_service_active and not service_active:
|
|
exact_blocker = exact_blocker or "telegram_service_inactive"
|
|
|
|
ok = (
|
|
token_file_present
|
|
and token_shape_valid
|
|
and (args.skip_getme or (get_me.get("ok") and username_matches))
|
|
and (service_active or not args.require_service_active)
|
|
)
|
|
if args.require_token and not token_file_present:
|
|
ok = False
|
|
|
|
proof = {
|
|
"schema": "livingip.telegramLeoWalletTestRuntimeProof.v1",
|
|
"generatedAt": datetime.now(timezone.utc).isoformat(),
|
|
"ok": ok,
|
|
"requiredTier": "T3_live_readonly",
|
|
"currentTier": "T3_live_readonly" if ok else "T2_runtime",
|
|
"agent": args.agent,
|
|
"configPath": str(config_path),
|
|
"expectedHandle": config.handle,
|
|
"expectedUsername": expected_username,
|
|
"tokenPath": str(token_path),
|
|
"tokenFilePresent": token_file_present,
|
|
"tokenShapeValid": token_shape_valid,
|
|
"getMe": get_me,
|
|
"usernameMatchesExpected": username_matches if get_me.get("attempted") else None,
|
|
"service": service,
|
|
"secretValuesIncluded": False,
|
|
"exactBlocker": exact_blocker,
|
|
"notProven": [
|
|
"Telegram message delivery",
|
|
"Telegram reply delivery",
|
|
"Telegram-triggered x402 readback",
|
|
"Telegram-triggered paid execution",
|
|
],
|
|
"strongestClaimAllowed": (
|
|
"This verifier proves the disposable Leo wallet-test Telegram token identity and service state "
|
|
"after BotFather token installation. It does not send Telegram messages or prove x402 payment execution."
|
|
),
|
|
}
|
|
exit_code = 0 if ok or (exact_blocker == "telegram_token_file_missing" and not args.require_token) else 1
|
|
return proof, exit_code
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
args = parse_args(sys.argv[1:] if argv is None else argv)
|
|
proof, exit_code = build_proof(args)
|
|
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
|
|
output_path = Path(args.output)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
output_path.write_text(output)
|
|
print(output, end="")
|
|
return exit_code
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
raise SystemExit(main())
|
|
except ValueError as exc:
|
|
print(f"ERROR: {exc}", file=sys.stderr)
|
|
raise SystemExit(1) from None
|