teleo-infrastructure/scripts/check_telegram_leo_x402_bridge.py
2026-06-19 19:28:04 +02:00

88 lines
2.9 KiB
Python

#!/usr/bin/env python3
"""Prove the Telegram Leo bridge can consume the public Leo HTTP chat route."""
# ruff: noqa: E402, I001
from __future__ import annotations
import argparse
import asyncio
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
# Parent of the directory that contains this script (resolved to an absolute path).
REPO_ROOT = Path(__file__).resolve().parents[1]
TELEGRAM_DIR = REPO_ROOT / "telegram"
# Put TELEGRAM_DIR first on sys.path so the import below is looked up there
# first; presumably http_chat_proxy lives in that directory (verify against
# the repository layout). This is why the import cannot sit with the others.
sys.path.insert(0, str(TELEGRAM_DIR))
from http_chat_proxy import build_chat_proxy_payload, post_chat_proxy
# Defaults for the --url and --output command-line options; main() joins the
# output path onto REPO_ROOT.
DEFAULT_URL = "https://leo.livingip.xyz/api/agents/leo/chat"
DEFAULT_OUTPUT = "docs/reports/telegram-leo-x402-bridge-proof.json"
async def run_check(url: str, message: str) -> dict:
    """Post one proof message to the chat route and build the proof record.

    Args:
        url: Endpoint the payload is POSTed to.
        message: Text placed in the proof payload.

    Returns:
        A dict describing the outcome. ``ok`` is true only when a non-empty
        reply came back; ``currentTier`` is ``T3_live_readonly`` in that case
        and ``T2_runtime`` otherwise. ``routeSchema``, ``agent`` and ``llmOk``
        are ``None`` whenever the response body is not a dict.
    """
    payload = build_chat_proxy_payload(
        message=message,
        source="telegram-proof",
        agent="leo",
        chat_id=0,
        message_id=0,
        username="codex-proof",
    )
    status, body, reply = await post_chat_proxy(url=url, payload=payload)

    # Only a dict body is inspected. Falling back to an empty mapping keeps
    # every lookup below safe; previously the ``llmOk`` fallback called
    # ``body.get`` even when ``body`` was not a dict and raised
    # AttributeError instead of reporting a failed proof.
    fields = body if isinstance(body, dict) else {}

    # Prefer a top-level ``llmOk`` key; otherwise fall back to ``llm.ok``
    # when ``llm`` is itself a dict.
    nested_llm = fields.get("llm")
    if "llmOk" in fields:
        llm_ok = fields["llmOk"]
    elif isinstance(nested_llm, dict):
        llm_ok = nested_llm.get("ok")
    else:
        llm_ok = None

    return {
        "schema": "livingip.telegramLeoX402BridgeProof.v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "ok": bool(reply),
        "requiredTier": "T3_live_readonly",
        "currentTier": "T3_live_readonly" if reply else "T2_runtime",
        "url": url,
        "httpStatus": status,
        "routeSchema": fields.get("schema"),
        "agent": fields.get("agent"),
        "llmOk": llm_ok,
        "reply": reply,
        "secretValuesIncluded": False,
        "strongestClaimAllowed": (
            "Telegram bridge helper can POST a no-secret payload to the public Leo HTTP chat route "
            "and extract a usable Leo reply. This proves the bridge parser/readback only; it does "
            "not prove the Telegram bot service is deployed or active."
        ),
        "notProven": [
            "teleo-agent@leo.service active",
            "Telegram message delivery",
            "Telegram reply delivery",
            "new payment execution",
        ],
    }
def main() -> int:
    """Run the bridge check from the command line and persist the proof.

    Reads ``--url``, ``--message`` and ``--output``, runs ``run_check``,
    writes the proof as indented, key-sorted JSON to the output path joined
    onto ``REPO_ROOT`` (creating parent directories as needed), and prints
    the same JSON to stdout.

    Returns:
        ``0`` when the proof's ``ok`` field is truthy, otherwise ``1``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--url", default=DEFAULT_URL)
    cli.add_argument(
        "--message",
        default="Telegram bridge proof: reply with one sentence confirming this reached Leo HTTP.",
    )
    cli.add_argument("--output", default=DEFAULT_OUTPUT)
    options = cli.parse_args()

    result = asyncio.run(run_check(options.url, options.message))

    destination = REPO_ROOT / options.output
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Serialize once; the file and stdout receive the same text (the file
    # gets a trailing newline, print supplies its own).
    rendered = json.dumps(result, indent=2, sort_keys=True)
    destination.write_text(rendered + "\n")
    print(rendered)

    if result["ok"]:
        return 0
    return 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())