From d0a4f518d5b7b6ee554c746f1aefd73e59b66992 Mon Sep 17 00:00:00 2001 From: twentyOne2x Date: Mon, 22 Jun 2026 21:24:00 +0200 Subject: [PATCH] Add Leo Telegram smart research bridge (#11) --- ...-leo-x402-smart-research-bridge-proof.json | 23 ++++ ...telegram_leo_x402_smart_research_bridge.py | 91 +++++++++++++ telegram/agent_config.py | 19 +++ telegram/agents/leo-wallet-test.yaml | 7 + telegram/bot.py | 123 +++++++++++++++++- telegram/http_chat_proxy.py | 69 ++++++++++ tests/test_telegram_leo_x402_bridge.py | 45 ++++++- 7 files changed, 374 insertions(+), 3 deletions(-) create mode 100644 docs/reports/telegram-leo-x402-smart-research-bridge-proof.json create mode 100644 scripts/check_telegram_leo_x402_smart_research_bridge.py diff --git a/docs/reports/telegram-leo-x402-smart-research-bridge-proof.json b/docs/reports/telegram-leo-x402-smart-research-bridge-proof.json new file mode 100644 index 0000000..0a6549c --- /dev/null +++ b/docs/reports/telegram-leo-x402-smart-research-bridge-proof.json @@ -0,0 +1,23 @@ +{ + "currentTier": "T3_live_readonly", + "exactBlocker": "smart_research_paid_execution_not_allowed", + "fundsMoved": false, + "generatedAt": "2026-06-22T19:21:49.939563+00:00", + "httpStatus": 402, + "notProven": [ + "teleo-agent@leo-wallet-test.service active", + "Telegram message delivery", + "Telegram reply delivery", + "Telegram-triggered paid execution" + ], + "ok": true, + "paidPostAttempted": false, + "reply": "Leo smart research can select the retained AgentCash x402 research provider and query, but did not attempt payment because the call was not fully authorized.", + "requiredTier": "T3_live_readonly", + "routeSchema": "livingip.x402.leoSmartResearchResponse.v1", + "schema": "livingip.telegramLeoX402SmartResearchBridgeProof.v1", + "secretValuesIncluded": false, + "selectedProvider": "agentcash-stableenrich-exa-search", + "strongestClaimAllowed": "Telegram bridge helper can POST a no-secret smart-research payload to the public Leo research route and extract a usable fail-closed reply. This proves route shape and readback only; it does not prove a Telegram bot service is deployed or a paid Telegram message executed.", + "url": "https://leo.livingip.xyz/api/agents/leo/research" +} diff --git a/scripts/check_telegram_leo_x402_smart_research_bridge.py b/scripts/check_telegram_leo_x402_smart_research_bridge.py new file mode 100644 index 0000000..f6f73b9 --- /dev/null +++ b/scripts/check_telegram_leo_x402_smart_research_bridge.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +"""Prove the Telegram Leo bridge can consume the public smart-research route.""" + +# ruff: noqa: E402, I001 + +from __future__ import annotations + +import argparse +import asyncio +import json +import sys +from datetime import datetime, timezone +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +TELEGRAM_DIR = REPO_ROOT / "telegram" +sys.path.insert(0, str(TELEGRAM_DIR)) + +from http_chat_proxy import build_smart_research_proxy_payload, post_chat_proxy + + +DEFAULT_URL = "https://leo.livingip.xyz/api/agents/leo/research" +DEFAULT_OUTPUT = "docs/reports/telegram-leo-x402-smart-research-bridge-proof.json" + + +async def run_check(url: str, research_goal: str) -> dict: + payload = build_smart_research_proxy_payload( + research_goal=research_goal, + source="telegram-proof", + agent="leo", + chat_id=0, + message_id=0, + username="codex-proof", + allow_paid_execution=False, + max_amount_usd=0.01, + include_synthesis=True, + ) + status, body, reply = await post_chat_proxy(url=url, payload=payload, timeout_seconds=90) + funds_moved = bool(body.get("fundsMoved")) if isinstance(body, dict) else False + selected_provider = body.get("selectedProvider") if isinstance(body, dict) else None + exact_blocker = body.get("exactBlocker") if isinstance(body, dict) else None + return { + "schema": "livingip.telegramLeoX402SmartResearchBridgeProof.v1", + "generatedAt": datetime.now(timezone.utc).isoformat(), + "ok": bool(reply) and status in {200, 402} and not funds_moved, + "requiredTier": "T3_live_readonly", + "currentTier": body.get("currentTier", "T2_runtime") if isinstance(body, dict) else "T2_runtime", + "url": url, + "httpStatus": status, + "routeSchema": body.get("schema") if isinstance(body, dict) else None, + "selectedProvider": selected_provider, + "exactBlocker": exact_blocker, + "reply": reply, + "paidPostAttempted": bool(body.get("paidPostAttempted")) if isinstance(body, dict) else False, + "fundsMoved": funds_moved, + "secretValuesIncluded": False, + "strongestClaimAllowed": ( + "Telegram bridge helper can POST a no-secret smart-research payload to the public Leo " + "research route and extract a usable fail-closed reply. This proves route shape and " + "readback only; it does not prove a Telegram bot service is deployed or a paid Telegram " + "message executed." + ), + "notProven": [ + "teleo-agent@leo-wallet-test.service active", + "Telegram message delivery", + "Telegram reply delivery", + "Telegram-triggered paid execution", + ], + } + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--url", default=DEFAULT_URL) + parser.add_argument( + "--research-goal", + default="Find current public evidence on x402 agent payments and recommend what Living IP Leo should test next.", + ) + parser.add_argument("--output", default=DEFAULT_OUTPUT) + args = parser.parse_args() + + proof = asyncio.run(run_check(args.url, args.research_goal)) + output_path = REPO_ROOT / args.output + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(proof, indent=2, sort_keys=True) + "\n") + print(json.dumps(proof, indent=2, sort_keys=True)) + return 0 if proof["ok"] else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/telegram/agent_config.py b/telegram/agent_config.py index 51cf510..b49813e 100644 --- a/telegram/agent_config.py +++ b/telegram/agent_config.py @@ -44,8 +44,10 @@ class AgentConfig: max_tokens: int = 1024 max_response_per_user_per_hour: int = 30 http_chat_proxy_url: Optional[str] = None + http_research_proxy_url: Optional[str] = None respond_to_private_chats: bool = False mention_aliases: list[str] = field(default_factory=list) + smart_research_command_prefixes: list[str] = field(default_factory=list) def to_dict(self) -> dict: """Convert to dict for passing to build_system_prompt.""" @@ -59,8 +61,10 @@ class AgentConfig: "domain_expertise": self.domain_expertise, "pentagon_agent_id": self.pentagon_agent_id, "http_chat_proxy_url": self.http_chat_proxy_url, + "http_research_proxy_url": self.http_research_proxy_url, "respond_to_private_chats": self.respond_to_private_chats, "mention_aliases": self.mention_aliases, + "smart_research_command_prefixes": self.smart_research_command_prefixes, } @property @@ -112,10 +116,23 @@ def load_agent_config(config_path: str) -> AgentConfig: if parsed_proxy.scheme not in {"http", "https"} or not parsed_proxy.netloc: errors.append("http_chat_proxy_url must be an absolute http(s) URL") + research_proxy_url = raw.get("http_research_proxy_url") + if research_proxy_url: + parsed_research_proxy = urlparse(research_proxy_url) + if parsed_research_proxy.scheme not in {"http", "https"} or not parsed_research_proxy.netloc: + errors.append("http_research_proxy_url must be an absolute http(s) URL") + mention_aliases = raw.get("mention_aliases", []) if mention_aliases and not isinstance(mention_aliases, list): errors.append("mention_aliases must be a list") + smart_research_command_prefixes = raw.get("smart_research_command_prefixes", []) + if smart_research_command_prefixes and not isinstance(smart_research_command_prefixes, list): + errors.append("smart_research_command_prefixes must be a list") + for prefix in smart_research_command_prefixes or []: + if not isinstance(prefix, str) or not prefix.startswith("/"): + errors.append("smart_research_command_prefixes entries must start with /") + if errors: raise ValueError( f"Agent config validation failed ({config_path}):\n" @@ -140,8 +157,10 @@ def load_agent_config(config_path: str) -> AgentConfig: max_tokens=raw.get("max_tokens", 1024), max_response_per_user_per_hour=raw.get("max_response_per_user_per_hour", 30), http_chat_proxy_url=proxy_url, + http_research_proxy_url=research_proxy_url, respond_to_private_chats=bool(raw.get("respond_to_private_chats", False)), mention_aliases=mention_aliases, + smart_research_command_prefixes=smart_research_command_prefixes, ) diff --git a/telegram/agents/leo-wallet-test.yaml b/telegram/agents/leo-wallet-test.yaml index 62d9994..9fda504 100644 --- a/telegram/agents/leo-wallet-test.yaml +++ b/telegram/agents/leo-wallet-test.yaml @@ -16,6 +16,10 @@ domain_expertise: > paid research readbacks, and Telegram transport testing http_chat_proxy_url: "https://leo.livingip.xyz/api/agents/leo/chat" +http_research_proxy_url: "https://leo.livingip.xyz/api/agents/leo/research" +smart_research_command_prefixes: + - "/smart_research" + - "/paid_research" respond_to_private_chats: true kb_scope: @@ -36,6 +40,9 @@ voice_definition: | Report current public x402 receive capability, AgentCash paid-readback status, and exact blockers. Do not claim new Telegram-triggered payment execution unless the hosted Leo HTTP route returns retained payment/readback evidence. + Use /smart_research for no-spend smart research planning/readback. Paid + smart research remains fail-closed unless the server-side allow flag, allowed + chat id, cap, and retained approval-ref file are all present. Do not request or expose private keys, bot tokens, wallet exports, seed phrases, or raw secret values. diff --git a/telegram/bot.py b/telegram/bot.py index b7e9c86..09c8711 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -50,7 +50,13 @@ from retrieval import orchestrate_retrieval from market_data import get_token_price, format_price_context from worktree_lock import main_worktree_lock from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining -from http_chat_proxy import build_chat_proxy_payload, post_chat_proxy +from http_chat_proxy import ( + DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES, + build_chat_proxy_payload, + build_smart_research_proxy_payload, + extract_smart_research_goal, + post_chat_proxy, +) # ─── Config ───────────────────────────────────────────────────────────── @@ -83,8 +89,10 @@ AGENT_DOMAIN_EXPERTISE = ( "futarchy, prediction markets, token governance, and the MetaDAO ecosystem" ) AGENT_HTTP_CHAT_PROXY_URL: str | None = None +AGENT_HTTP_RESEARCH_PROXY_URL: str | None = None AGENT_RESPOND_TO_PRIVATE_CHATS = False AGENT_MENTION_ALIASES = ["@teleo", "@FutAIrdBot"] +AGENT_SMART_RESEARCH_COMMAND_PREFIXES = list(DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES) # Rate limits MAX_RESPONSE_PER_USER_PER_HOUR = 30 @@ -610,6 +618,40 @@ def sanitize_message(text: str) -> str: return text[:2000] +def _smart_research_payment_gate(chat_id: int) -> dict: + """Return paid smart-research fields only when all server-side gates pass.""" + if os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID") != "1": + return {"allow_paid_execution": False} + + allowed_chat_id = os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID", "").strip() + if not allowed_chat_id or allowed_chat_id != str(chat_id): + return {"allow_paid_execution": False} + + try: + max_amount_usd = float(os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD", "0.01")) + except ValueError: + return {"allow_paid_execution": False} + if max_amount_usd <= 0 or max_amount_usd > 0.01: + return {"allow_paid_execution": False} + + approval_ref_file = os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE", "").strip() + if not approval_ref_file: + return {"allow_paid_execution": False} + + try: + approval_ref = Path(approval_ref_file).read_text().strip() + except OSError: + return {"allow_paid_execution": False} + if not approval_ref: + return {"allow_paid_execution": False} + + return { + "allow_paid_execution": True, + "approval_ref": approval_ref, + "max_amount_usd": max_amount_usd, + } + + def _git_commit_archive(archive_path, filename: str): """Commit archived source to git so it survives git clean. (Rio review: data loss bug)""" import subprocess @@ -1009,6 +1051,77 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100]) + smart_research_goal = None + if AGENT_HTTP_RESEARCH_PROXY_URL: + smart_research_goal = extract_smart_research_goal( + text, + tuple(AGENT_SMART_RESEARCH_COMMAND_PREFIXES), + ) + if AGENT_HTTP_RESEARCH_PROXY_URL and smart_research_goal: + await msg.chat.send_action("typing") + payment_gate = _smart_research_payment_gate(msg.chat_id) + payload = build_smart_research_proxy_payload( + research_goal=smart_research_goal, + source="telegram", + agent=AGENT_NAME.lower(), + chat_id=msg.chat_id, + message_id=msg.message_id, + username=user.username if user else None, + include_synthesis=True, + **payment_gate, + ) + try: + status, proxy_body, proxy_reply = await post_chat_proxy( + url=AGENT_HTTP_RESEARCH_PROXY_URL, + payload=payload, + timeout_seconds=90, + ) + except Exception as e: + logger.warning("%s HTTP smart research proxy failed: %s", AGENT_NAME, e) + await msg.reply_text( + f"{AGENT_NAME}'s smart research route is temporarily unavailable. " + "Try again after the service recovers.", + do_quote=True, + ) + return + + if not proxy_reply: + logger.warning("%s HTTP smart research proxy returned no reply (status=%s)", AGENT_NAME, status) + await msg.reply_text( + f"{AGENT_NAME}'s smart research route returned no usable reply. " + "The Telegram bridge is fail-closed.", + do_quote=True, + ) + return + + if len(proxy_reply) <= 4096: + await msg.reply_text(proxy_reply, do_quote=True) + else: + first = True + remaining = proxy_reply + while remaining: + chunk = remaining[:4096] + await msg.reply_text(chunk, quote=first) + first = False + remaining = remaining[4096:] + + if user: + username = user.username or "anonymous" + key = (msg.chat_id, user.id) + unanswered_count[key] = 0 + entry = {"user": text[:500], "bot": proxy_reply[:500], "username": username} + history = conversation_history.setdefault(key, []) + history.append(entry) + if len(history) > MAX_HISTORY_USER: + history.pop(0) + chat_key = (msg.chat_id, 0) + chat_history = conversation_history.setdefault(chat_key, []) + chat_history.append(entry) + if len(chat_history) > MAX_HISTORY_CHAT: + chat_history.pop(0) + user_response_times[user.id].append(time.time()) + return + if AGENT_HTTP_CHAT_PROXY_URL: await msg.chat.send_action("typing") payload = build_chat_proxy_payload( @@ -2024,7 +2137,8 @@ def _load_agent_config(config_path: str): global BOT_TOKEN_FILE, RESPONSE_MODEL, TRIAGE_MODEL, AGENT_KB_SCOPE global LEARNINGS_FILE, MAX_RESPONSE_PER_USER_PER_HOUR global AGENT_NAME, AGENT_HANDLE, AGENT_X_HANDLE, AGENT_DOMAIN_EXPERTISE - global AGENT_HTTP_CHAT_PROXY_URL, AGENT_RESPOND_TO_PRIVATE_CHATS, AGENT_MENTION_ALIASES + global AGENT_HTTP_CHAT_PROXY_URL, AGENT_HTTP_RESEARCH_PROXY_URL + global AGENT_RESPOND_TO_PRIVATE_CHATS, AGENT_MENTION_ALIASES, AGENT_SMART_RESEARCH_COMMAND_PREFIXES with open(config_path) as f: cfg = yaml.safe_load(f) @@ -2034,9 +2148,14 @@ def _load_agent_config(config_path: str): AGENT_X_HANDLE = cfg.get("x_handle", AGENT_X_HANDLE) AGENT_DOMAIN_EXPERTISE = cfg.get("domain_expertise", AGENT_DOMAIN_EXPERTISE) AGENT_HTTP_CHAT_PROXY_URL = cfg.get("http_chat_proxy_url") + AGENT_HTTP_RESEARCH_PROXY_URL = cfg.get("http_research_proxy_url") AGENT_RESPOND_TO_PRIVATE_CHATS = bool(cfg.get("respond_to_private_chats", False)) aliases = [AGENT_HANDLE, *cfg.get("mention_aliases", [])] AGENT_MENTION_ALIASES = sorted({alias for alias in aliases if alias}) + AGENT_SMART_RESEARCH_COMMAND_PREFIXES = cfg.get( + "smart_research_command_prefixes", + list(DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES), + ) if cfg.get("bot_token_file"): BOT_TOKEN_FILE = f"/opt/teleo-eval/secrets/{cfg['bot_token_file']}" diff --git a/telegram/http_chat_proxy.py b/telegram/http_chat_proxy.py index 4bb17e2..2d90d33 100644 --- a/telegram/http_chat_proxy.py +++ b/telegram/http_chat_proxy.py @@ -2,8 +2,11 @@ from __future__ import annotations +import re from typing import Any +DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES = ("/smart_research", "/paid_research") + def build_chat_proxy_payload( *, @@ -28,6 +31,57 @@ def build_chat_proxy_payload( } +def extract_smart_research_goal( + message: str, + command_prefixes: tuple[str, ...] | list[str] = DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES, +) -> str | None: + """Return the research goal when a Telegram message opts into smart research.""" + text = message.strip() + for prefix in command_prefixes: + command = re.escape(prefix.lstrip("/")) + match = re.match(rf"^(?:@\w+\s+)?/{command}(?:@\w+)?(?:\s+(?P.+))?$", text, re.IGNORECASE) + if match: + goal = (match.group("goal") or "").strip() + return goal or None + return None + + +def build_smart_research_proxy_payload( + *, + research_goal: str, + source: str, + agent: str, + chat_id: int | None = None, + message_id: int | None = None, + username: str | None = None, + allow_paid_execution: bool = False, + approval_ref: str | None = None, + max_amount_usd: float | None = None, + include_synthesis: bool = True, +) -> dict[str, Any]: + """Build the no-secret Telegram payload for Leo smart research.""" + payload = build_chat_proxy_payload( + message=research_goal, + source=source, + agent=agent, + chat_id=chat_id, + message_id=message_id, + username=username, + ) + payload.update( + { + "research_goal": research_goal, + "allow_paid_execution": bool(allow_paid_execution), + "include_synthesis": bool(include_synthesis), + } + ) + if max_amount_usd is not None: + payload["max_amount_usd"] = max_amount_usd + if allow_paid_execution and approval_ref: + payload["approval_ref"] = approval_ref + return payload + + def extract_chat_proxy_reply(payload: dict[str, Any]) -> str | None: """Extract a reply from supported Living IP Leo chat response shapes.""" if not isinstance(payload, dict): @@ -54,6 +108,21 @@ def extract_chat_proxy_reply(payload: dict[str, Any]) -> str | None: if isinstance(llm_decision_reply, str) and llm_decision_reply.strip(): return llm_decision_reply.strip() + synthesis = payload.get("synthesis") + if isinstance(synthesis, dict): + synthesis_reply = synthesis.get("reply") + if isinstance(synthesis_reply, str) and synthesis_reply.strip(): + return synthesis_reply.strip() + synthesis_decision = synthesis.get("decision") + if isinstance(synthesis_decision, dict): + synthesis_decision_reply = synthesis_decision.get("reply") + if isinstance(synthesis_decision_reply, str) and synthesis_decision_reply.strip(): + return synthesis_decision_reply.strip() + + strongest_claim = payload.get("strongestClaimAllowed") + if isinstance(strongest_claim, str) and strongest_claim.strip(): + return strongest_claim.strip() + return None diff --git a/tests/test_telegram_leo_x402_bridge.py b/tests/test_telegram_leo_x402_bridge.py index 6560ea3..fb63df7 100644 --- a/tests/test_telegram_leo_x402_bridge.py +++ b/tests/test_telegram_leo_x402_bridge.py @@ -10,7 +10,12 @@ TELEGRAM_DIR = REPO_ROOT / "telegram" sys.path.insert(0, str(TELEGRAM_DIR)) from agent_config import load_agent_config # noqa: E402 -from http_chat_proxy import build_chat_proxy_payload, extract_chat_proxy_reply # noqa: E402 +from http_chat_proxy import ( # noqa: E402 + build_chat_proxy_payload, + build_smart_research_proxy_payload, + extract_chat_proxy_reply, + extract_smart_research_goal, +) def test_leo_config_opts_into_http_chat_proxy_without_changing_default_agents(): @@ -24,6 +29,8 @@ def test_leo_config_opts_into_http_chat_proxy_without_changing_default_agents(): assert "@teLEOhuman" in leo.mention_aliases assert leo_wallet_test.name == "Leo Wallet Test" assert leo_wallet_test.http_chat_proxy_url == "https://leo.livingip.xyz/api/agents/leo/chat" + assert leo_wallet_test.http_research_proxy_url == "https://leo.livingip.xyz/api/agents/leo/research" + assert "/smart_research" in leo_wallet_test.smart_research_command_prefixes assert leo_wallet_test.respond_to_private_chats is True assert leo_wallet_test.bot_token_file == "leo-wallet-test-telegram-bot-token" assert "@lipleowallet06222026bot" in leo_wallet_test.mention_aliases @@ -77,12 +84,48 @@ def test_proxy_payload_contains_no_secret_material(): assert "secret" not in str(payload).lower() +def test_smart_research_payload_is_no_spend_by_default(): + payload = build_smart_research_proxy_payload( + research_goal="Find x402 evidence", + source="telegram", + agent="leo", + chat_id=123, + message_id=456, + username="tester", + max_amount_usd=0.01, + ) + + assert payload["message"] == "Find x402 evidence" + assert payload["research_goal"] == "Find x402 evidence" + assert payload["allow_paid_execution"] is False + assert payload["max_amount_usd"] == 0.01 + assert "approval_ref" not in payload + assert "token" not in str(payload).lower() + assert "secret" not in str(payload).lower() + + +@pytest.mark.parametrize( + ("message", "expected"), + [ + ("/smart_research find x402 evidence", "find x402 evidence"), + ("@lipleowallet06222026bot /smart_research find x402 evidence", "find x402 evidence"), + ("/paid_research@lipleowallet06222026bot find x402 evidence", "find x402 evidence"), + ("can Leo use x402 paid research now?", None), + ("/smart_research", None), + ], +) +def test_extract_smart_research_goal(message, expected): + assert extract_smart_research_goal(message) == expected + + @pytest.mark.parametrize( ("payload", "expected"), [ ({"reply": "public route reply"}, "public route reply"), ({"decision": {"reply": "analysis route reply"}}, "analysis route reply"), ({"llm": {"decision": {"reply": "nested decision reply"}}}, "nested decision reply"), + ({"synthesis": {"decision": {"reply": "smart research reply"}}}, "smart research reply"), + ({"strongestClaimAllowed": "fail-closed smart research readback"}, "fail-closed smart research readback"), ], ) def test_extract_chat_proxy_reply_accepts_retained_leo_shapes(payload, expected):