Add Leo wallet-test Telegram runtime verifier (#14)
This commit is contained in:
parent
595f977a94
commit
754f5aeee7
2 changed files with 338 additions and 0 deletions
228
scripts/check_telegram_leo_wallet_test_runtime.py
Normal file
228
scripts/check_telegram_leo_wallet_test_runtime.py
Normal file
|
|
@ -0,0 +1,228 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Verify the disposable Leo wallet-test Telegram runtime without leaking tokens."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
TOKEN_RE = re.compile(r"^\d{6,12}:[A-Za-z0-9_-]{25,}$")
|
||||||
|
|
||||||
|
|
||||||
|
def repo_root_from_script() -> Path:
|
||||||
|
return Path(__file__).resolve().parents[1]
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Check the Leo wallet-test Telegram bot token, getMe identity, and service state.",
|
||||||
|
allow_abbrev=False,
|
||||||
|
)
|
||||||
|
parser.add_argument("--agent", default="leo-wallet-test", help="telegram/agents/<agent>.yaml")
|
||||||
|
parser.add_argument("--repo-root", default=str(repo_root_from_script()))
|
||||||
|
parser.add_argument("--secrets-dir", default="/opt/teleo-eval/secrets")
|
||||||
|
parser.add_argument("--skip-getme", action="store_true", help="Do not call Telegram getMe")
|
||||||
|
parser.add_argument("--require-token", action="store_true", help="Exit nonzero when token file is missing")
|
||||||
|
parser.add_argument("--require-service-active", action="store_true", help="Exit nonzero unless systemd says active")
|
||||||
|
parser.add_argument("--output", default="docs/reports/telegram-leo-wallet-test-runtime-proof.json")
|
||||||
|
|
||||||
|
namespace, unknown = parser.parse_known_args(argv)
|
||||||
|
if unknown:
|
||||||
|
print(
|
||||||
|
"ERROR: Unsupported arguments were provided. Secret-bearing CLI args are not accepted.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
raise SystemExit(2)
|
||||||
|
return namespace
|
||||||
|
|
||||||
|
|
||||||
|
def load_agent_config(repo_root: Path, agent: str):
|
||||||
|
telegram_dir = repo_root / "telegram"
|
||||||
|
sys.path.insert(0, str(telegram_dir))
|
||||||
|
from agent_config import load_agent_config as load_config
|
||||||
|
|
||||||
|
config_path = telegram_dir / "agents" / f"{agent}.yaml"
|
||||||
|
config = load_config(str(config_path))
|
||||||
|
return config_path, config
|
||||||
|
|
||||||
|
|
||||||
|
def token_path_for(secrets_dir: Path, bot_token_file: str) -> Path:
|
||||||
|
token_file = Path(bot_token_file)
|
||||||
|
if token_file.name != bot_token_file or token_file.name in {"", ".", ".."}:
|
||||||
|
raise ValueError("bot_token_file must be a plain filename")
|
||||||
|
return secrets_dir / token_file.name
|
||||||
|
|
||||||
|
|
||||||
|
def run_command(command: list[str]) -> dict:
|
||||||
|
try:
|
||||||
|
proc = subprocess.run(command, check=False, text=True, capture_output=True)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return {
|
||||||
|
"command": command,
|
||||||
|
"returncode": 127,
|
||||||
|
"stdout": "",
|
||||||
|
"stderr": "command_unavailable",
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"command": command,
|
||||||
|
"returncode": proc.returncode,
|
||||||
|
"stdout": proc.stdout.strip(),
|
||||||
|
"stderr": proc.stderr.strip(),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def systemd_state(unit: str) -> dict:
|
||||||
|
active = run_command(["systemctl", "is-active", unit])
|
||||||
|
enabled = run_command(["systemctl", "is-enabled", unit])
|
||||||
|
return {
|
||||||
|
"unit": unit,
|
||||||
|
"active": active["stdout"] or "unknown",
|
||||||
|
"activeReturncode": active["returncode"],
|
||||||
|
"enabled": enabled["stdout"] or "unknown",
|
||||||
|
"enabledReturncode": enabled["returncode"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def telegram_get_me(token: str, *, timeout_seconds: int = 20) -> dict:
|
||||||
|
url = f"https://api.telegram.org/bot{token}/getMe"
|
||||||
|
request = urllib.request.Request(url, headers={"Accept": "application/json"})
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
|
||||||
|
status = response.status
|
||||||
|
body = json.loads(response.read().decode("utf-8"))
|
||||||
|
except urllib.error.HTTPError as exc:
|
||||||
|
status = exc.code
|
||||||
|
try:
|
||||||
|
body = json.loads(exc.read().decode("utf-8"))
|
||||||
|
except Exception:
|
||||||
|
body = {"ok": False, "error_code": exc.code, "description": "non_json_http_error"}
|
||||||
|
except Exception as exc:
|
||||||
|
return {
|
||||||
|
"attempted": True,
|
||||||
|
"httpStatus": None,
|
||||||
|
"ok": False,
|
||||||
|
"errorType": type(exc).__name__,
|
||||||
|
"secretValuesIncluded": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
result = body.get("result") if isinstance(body, dict) else None
|
||||||
|
return {
|
||||||
|
"attempted": True,
|
||||||
|
"httpStatus": status,
|
||||||
|
"ok": bool(body.get("ok")) if isinstance(body, dict) else False,
|
||||||
|
"botIdPresent": isinstance(result, dict) and bool(result.get("id")),
|
||||||
|
"isBot": result.get("is_bot") if isinstance(result, dict) else None,
|
||||||
|
"username": result.get("username") if isinstance(result, dict) else None,
|
||||||
|
"firstName": result.get("first_name") if isinstance(result, dict) else None,
|
||||||
|
"canJoinGroups": result.get("can_join_groups") if isinstance(result, dict) else None,
|
||||||
|
"canReadAllGroupMessages": result.get("can_read_all_group_messages") if isinstance(result, dict) else None,
|
||||||
|
"supportsInlineQueries": result.get("supports_inline_queries") if isinstance(result, dict) else None,
|
||||||
|
"secretValuesIncluded": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def build_proof(args: argparse.Namespace) -> tuple[dict, int]:
|
||||||
|
repo_root = Path(args.repo_root).resolve()
|
||||||
|
secrets_dir = Path(args.secrets_dir)
|
||||||
|
config_path, config = load_agent_config(repo_root, args.agent)
|
||||||
|
token_path = token_path_for(secrets_dir, config.bot_token_file)
|
||||||
|
unit = f"teleo-agent@{args.agent}.service"
|
||||||
|
|
||||||
|
token_file_present = token_path.exists()
|
||||||
|
token_shape_valid = False
|
||||||
|
get_me = {"attempted": False, "ok": False, "secretValuesIncluded": False}
|
||||||
|
exact_blocker = None
|
||||||
|
token = None
|
||||||
|
|
||||||
|
if token_file_present:
|
||||||
|
token = token_path.read_text().strip()
|
||||||
|
token_shape_valid = bool(TOKEN_RE.match(token))
|
||||||
|
if not token_shape_valid:
|
||||||
|
exact_blocker = "telegram_token_shape_invalid"
|
||||||
|
else:
|
||||||
|
exact_blocker = "telegram_token_file_missing"
|
||||||
|
|
||||||
|
if token_file_present and token_shape_valid and not args.skip_getme:
|
||||||
|
get_me = telegram_get_me(token)
|
||||||
|
if not get_me.get("ok"):
|
||||||
|
exact_blocker = "telegram_getme_failed"
|
||||||
|
|
||||||
|
expected_username = config.handle.lstrip("@")
|
||||||
|
username_matches = (
|
||||||
|
bool(get_me.get("username"))
|
||||||
|
and get_me.get("username", "").lower() == expected_username.lower()
|
||||||
|
)
|
||||||
|
if get_me.get("attempted") and get_me.get("ok") and not username_matches:
|
||||||
|
exact_blocker = "telegram_getme_username_mismatch"
|
||||||
|
|
||||||
|
service = systemd_state(unit)
|
||||||
|
service_active = service["active"] == "active"
|
||||||
|
if args.require_service_active and not service_active:
|
||||||
|
exact_blocker = exact_blocker or "telegram_service_inactive"
|
||||||
|
|
||||||
|
ok = (
|
||||||
|
token_file_present
|
||||||
|
and token_shape_valid
|
||||||
|
and (args.skip_getme or (get_me.get("ok") and username_matches))
|
||||||
|
and (service_active or not args.require_service_active)
|
||||||
|
)
|
||||||
|
if args.require_token and not token_file_present:
|
||||||
|
ok = False
|
||||||
|
|
||||||
|
proof = {
|
||||||
|
"schema": "livingip.telegramLeoWalletTestRuntimeProof.v1",
|
||||||
|
"generatedAt": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"ok": ok,
|
||||||
|
"requiredTier": "T3_live_readonly",
|
||||||
|
"currentTier": "T3_live_readonly" if ok else "T2_runtime",
|
||||||
|
"agent": args.agent,
|
||||||
|
"configPath": str(config_path),
|
||||||
|
"expectedHandle": config.handle,
|
||||||
|
"expectedUsername": expected_username,
|
||||||
|
"tokenPath": str(token_path),
|
||||||
|
"tokenFilePresent": token_file_present,
|
||||||
|
"tokenShapeValid": token_shape_valid,
|
||||||
|
"getMe": get_me,
|
||||||
|
"usernameMatchesExpected": username_matches if get_me.get("attempted") else None,
|
||||||
|
"service": service,
|
||||||
|
"secretValuesIncluded": False,
|
||||||
|
"exactBlocker": exact_blocker,
|
||||||
|
"notProven": [
|
||||||
|
"Telegram message delivery",
|
||||||
|
"Telegram reply delivery",
|
||||||
|
"Telegram-triggered x402 readback",
|
||||||
|
"Telegram-triggered paid execution",
|
||||||
|
],
|
||||||
|
"strongestClaimAllowed": (
|
||||||
|
"This verifier proves the disposable Leo wallet-test Telegram token identity and service state "
|
||||||
|
"after BotFather token installation. It does not send Telegram messages or prove x402 payment execution."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
exit_code = 0 if ok or (exact_blocker == "telegram_token_file_missing" and not args.require_token) else 1
|
||||||
|
return proof, exit_code
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> int:
|
||||||
|
args = parse_args(sys.argv[1:] if argv is None else argv)
|
||||||
|
proof, exit_code = build_proof(args)
|
||||||
|
output = json.dumps(proof, indent=2, sort_keys=True) + "\n"
|
||||||
|
output_path = Path(args.output)
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
output_path.write_text(output)
|
||||||
|
print(output, end="")
|
||||||
|
return exit_code
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
raise SystemExit(main())
|
||||||
|
except ValueError as exc:
|
||||||
|
print(f"ERROR: {exc}", file=sys.stderr)
|
||||||
|
raise SystemExit(1) from None
|
||||||
110
tests/test_telegram_leo_wallet_test_runtime.py
Normal file
110
tests/test_telegram_leo_wallet_test_runtime.py
Normal file
|
|
@ -0,0 +1,110 @@
|
||||||
|
"""Tests for the Leo wallet-test Telegram runtime verifier."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
SCRIPT = REPO_ROOT / "scripts" / "check_telegram_leo_wallet_test_runtime.py"
|
||||||
|
|
||||||
|
|
||||||
|
def run_checker(args: list[str]) -> subprocess.CompletedProcess:
|
||||||
|
return subprocess.run(
|
||||||
|
[sys.executable, str(SCRIPT), *args],
|
||||||
|
text=True,
|
||||||
|
capture_output=True,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_missing_token_writes_sanitized_blocker(tmp_path):
|
||||||
|
proof_path = tmp_path / "proof.json"
|
||||||
|
proc = run_checker(
|
||||||
|
[
|
||||||
|
"--agent",
|
||||||
|
"leo-wallet-test",
|
||||||
|
"--repo-root",
|
||||||
|
str(REPO_ROOT),
|
||||||
|
"--secrets-dir",
|
||||||
|
str(tmp_path / "secrets"),
|
||||||
|
"--skip-getme",
|
||||||
|
"--output",
|
||||||
|
str(proof_path),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
assert proc.returncode == 0, proc.stderr
|
||||||
|
proof = json.loads(proof_path.read_text())
|
||||||
|
assert proof["ok"] is False
|
||||||
|
assert proof["exactBlocker"] == "telegram_token_file_missing"
|
||||||
|
assert proof["tokenFilePresent"] is False
|
||||||
|
assert proof["secretValuesIncluded"] is False
|
||||||
|
assert "secretValuesIncluded" in proc.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_token_shape_fails_without_printing_token(tmp_path):
|
||||||
|
secrets_dir = tmp_path / "secrets"
|
||||||
|
secrets_dir.mkdir()
|
||||||
|
token_path = secrets_dir / "leo-wallet-test-telegram-bot-token"
|
||||||
|
token = "not-a-valid-token"
|
||||||
|
token_path.write_text(token)
|
||||||
|
proof_path = tmp_path / "proof.json"
|
||||||
|
proc = run_checker(
|
||||||
|
[
|
||||||
|
"--agent",
|
||||||
|
"leo-wallet-test",
|
||||||
|
"--repo-root",
|
||||||
|
str(REPO_ROOT),
|
||||||
|
"--secrets-dir",
|
||||||
|
str(secrets_dir),
|
||||||
|
"--skip-getme",
|
||||||
|
"--require-token",
|
||||||
|
"--output",
|
||||||
|
str(proof_path),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
assert proc.returncode == 1
|
||||||
|
assert token not in proc.stdout
|
||||||
|
assert token not in proc.stderr
|
||||||
|
proof = json.loads(proof_path.read_text())
|
||||||
|
assert proof["exactBlocker"] == "telegram_token_shape_invalid"
|
||||||
|
assert proof["tokenFilePresent"] is True
|
||||||
|
assert proof["tokenShapeValid"] is False
|
||||||
|
assert proof["secretValuesIncluded"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_getme_result_is_sanitized_and_matches_expected_username():
|
||||||
|
module_dir = str(SCRIPT.parent)
|
||||||
|
sys.path.insert(0, module_dir)
|
||||||
|
import check_telegram_leo_wallet_test_runtime as checker
|
||||||
|
|
||||||
|
token = "dummy-token-value"
|
||||||
|
response_body = {
|
||||||
|
"ok": True,
|
||||||
|
"result": {
|
||||||
|
"id": 123456789,
|
||||||
|
"is_bot": True,
|
||||||
|
"first_name": "Living IP Leo Wallet Test",
|
||||||
|
"username": "lipleowallet06222026bot",
|
||||||
|
"can_join_groups": True,
|
||||||
|
"can_read_all_group_messages": False,
|
||||||
|
"supports_inline_queries": False,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
response = MagicMock()
|
||||||
|
response.status = 200
|
||||||
|
response.read.return_value = json.dumps(response_body).encode("utf-8")
|
||||||
|
response.__enter__.return_value = response
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen", return_value=response):
|
||||||
|
result = checker.telegram_get_me(token)
|
||||||
|
|
||||||
|
serialized = json.dumps(result)
|
||||||
|
assert result["ok"] is True
|
||||||
|
assert result["username"] == "lipleowallet06222026bot"
|
||||||
|
assert result["botIdPresent"] is True
|
||||||
|
assert result["secretValuesIncluded"] is False
|
||||||
|
assert token not in serialized
|
||||||
Loading…
Reference in a new issue