From bfc28e084b00d78502eda37ee92816b2703ff8a2 Mon Sep 17 00:00:00 2001 From: twentyOne2x Date: Tue, 23 Jun 2026 18:37:33 +0200 Subject: [PATCH] Wire Leo Telegram x402 smart research (#17) * Wire Leo Telegram x402 smart research * Suppress token-bearing Telegram HTTP logs * Keep Telegram typing visible during Leo proxy calls * Allow Leo Telegram social research spend cap * Route contextual Leo research prompts to smart research * Generalize Leo smart research intent routing * Resume Leo smart research from paid work orders --- .../install_telegram_smart_research_gates.py | 5 +- telegram/agent_config.py | 3 + telegram/agents/leo-wallet-test.yaml | 9 +- telegram/bot.py | 239 +++++++++++++---- telegram/http_chat_proxy.py | 108 +++++++- telegram/market_data.py | 253 ++++++++++++++++-- ...t_install_telegram_smart_research_gates.py | 4 + tests/test_telegram_leo_x402_bridge.py | 165 +++++++++++- 8 files changed, 692 insertions(+), 94 deletions(-) diff --git a/scripts/install_telegram_smart_research_gates.py b/scripts/install_telegram_smart_research_gates.py index 164e789..10c891f 100644 --- a/scripts/install_telegram_smart_research_gates.py +++ b/scripts/install_telegram_smart_research_gates.py @@ -17,6 +17,7 @@ from pathlib import Path APPROVAL_REF_RE = re.compile(r"^[A-Za-z0-9._:@/-]{8,256}$") CHAT_ID_RE = re.compile(r"^-?\d+$") +MAX_SMART_RESEARCH_USD = 0.06 def parse_args(argv: list[str]) -> argparse.Namespace: @@ -66,8 +67,8 @@ def validate_max_usd(value: str) -> str: parsed = float(value) except ValueError: raise ValueError("max-usd must be numeric") from None - if parsed <= 0 or parsed > 0.01: - raise ValueError("max-usd must be greater than 0 and no more than 0.01") + if parsed <= 0 or parsed > MAX_SMART_RESEARCH_USD: + raise ValueError(f"max-usd must be greater than 0 and no more than {MAX_SMART_RESEARCH_USD:.2f}") return f"{parsed:.2f}" diff --git a/telegram/agent_config.py b/telegram/agent_config.py index b49813e..22e4238 100644 --- a/telegram/agent_config.py +++ b/telegram/agent_config.py @@ -48,6 +48,7 @@ class AgentConfig: respond_to_private_chats: bool = False mention_aliases: list[str] = field(default_factory=list) smart_research_command_prefixes: list[str] = field(default_factory=list) + auto_smart_research_from_chat: bool = False def to_dict(self) -> dict: """Convert to dict for passing to build_system_prompt.""" @@ -65,6 +66,7 @@ class AgentConfig: "respond_to_private_chats": self.respond_to_private_chats, "mention_aliases": self.mention_aliases, "smart_research_command_prefixes": self.smart_research_command_prefixes, + "auto_smart_research_from_chat": self.auto_smart_research_from_chat, } @property @@ -161,6 +163,7 @@ def load_agent_config(config_path: str) -> AgentConfig: respond_to_private_chats=bool(raw.get("respond_to_private_chats", False)), mention_aliases=mention_aliases, smart_research_command_prefixes=smart_research_command_prefixes, + auto_smart_research_from_chat=bool(raw.get("auto_smart_research_from_chat", False)), ) diff --git a/telegram/agents/leo-wallet-test.yaml b/telegram/agents/leo-wallet-test.yaml index 8dc7fb7..910acf8 100644 --- a/telegram/agents/leo-wallet-test.yaml +++ b/telegram/agents/leo-wallet-test.yaml @@ -20,6 +20,7 @@ http_research_proxy_url: "https://leo.livingip.xyz/api/agents/leo/research" smart_research_command_prefixes: - "/smart_research" - "/paid_research" +auto_smart_research_from_chat: true respond_to_private_chats: true kb_scope: @@ -40,9 +41,11 @@ voice_definition: | Report current public x402 receive capability, AgentCash paid-readback status, and exact blockers. Do not claim new Telegram-triggered payment execution unless the hosted Leo HTTP route returns retained payment/readback evidence. - Use /smart_research for no-spend smart research planning/readback. Paid - smart research remains fail-closed unless the server-side allow flag, allowed - chat id, cap, and retained approval-ref file are all present. + Ordinary addressed/private chat may be routed into smart research when the + request clearly asks for sourced, current, market, or evidence-backed work. + Explicit /smart_research remains available for narrow canaries. Paid smart + research remains fail-closed unless the server-side allow flag, allowed chat + id, cap, and retained approval-ref file are all present. Do not request or expose private keys, bot tokens, wallet exports, seed phrases, or raw secret values. diff --git a/telegram/bot.py b/telegram/bot.py index 5fc7d09..98e3ad4 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -29,6 +29,7 @@ import time import yaml from collections import defaultdict from datetime import datetime, timezone +from html import escape from pathlib import Path # Add pipeline lib to path for shared modules @@ -47,13 +48,16 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import json as _json from kb_retrieval import KBIndex, retrieve_context, retrieve_vector_context from retrieval import orchestrate_retrieval -from market_data import get_token_price, format_price_context +from market_data import get_token_price, format_price_context, extract_market_data_tokens from worktree_lock import main_worktree_lock from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining from http_chat_proxy import ( DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES, build_chat_proxy_payload, build_smart_research_proxy_payload, + extract_auto_smart_research_followup_goal, + extract_auto_smart_research_goal, + extract_paid_work_order_id, extract_smart_research_goal, post_chat_proxy, smart_research_command_names, @@ -94,6 +98,7 @@ AGENT_HTTP_RESEARCH_PROXY_URL: str | None = None AGENT_RESPOND_TO_PRIVATE_CHATS = False AGENT_MENTION_ALIASES = ["@teleo", "@FutAIrdBot"] AGENT_SMART_RESEARCH_COMMAND_PREFIXES = list(DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES) +AGENT_AUTO_SMART_RESEARCH_FROM_CHAT = False # Rate limits MAX_RESPONSE_PER_USER_PER_HOUR = 30 @@ -109,6 +114,8 @@ logging.basicConfig( logging.StreamHandler(), ], ) +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) logger = logging.getLogger("telegram-bot") # ─── State ────────────────────────────────────────────────────────────── @@ -122,6 +129,91 @@ user_response_times: dict[int, list[float]] = defaultdict(list) # Allowed group IDs (set after first message received, or configure) allowed_groups: set[int] = set() +TELEGRAM_REPLY_CHUNK_LIMIT = 3500 + + +def _telegram_native_html(text: str) -> str: + """Render common LLM Markdown as Telegram HTML without trusting raw HTML.""" + rendered = escape(text, quote=False) + rendered = re.sub(r"(?m)^#{1,6}\s+(.+)$", r"\1", rendered) + rendered = re.sub(r"\*\*([^*\n]{1,240})\*\*", r"\1", rendered) + rendered = re.sub(r"`([^`\n]{1,240})`", r"\1", rendered) + return rendered + + +def _plain_telegram_fallback(text: str) -> str: + text = re.sub(r"(?m)^#{1,6}\s+", "", text) + text = re.sub(r"\*\*([^*\n]{1,240})\*\*", r"\1", text) + text = re.sub(r"`([^`\n]{1,240})`", r"\1", text) + return text + + +def _telegram_reply_chunks(text: str) -> list[str]: + chunks: list[str] = [] + current = "" + for part in re.split(r"(\n\n+)", text): + if len(part) > TELEGRAM_REPLY_CHUNK_LIMIT: + if current: + chunks.append(current) + current = "" + chunks.extend( + part[i : i + TELEGRAM_REPLY_CHUNK_LIMIT] + for i in range(0, len(part), TELEGRAM_REPLY_CHUNK_LIMIT) + ) + continue + if len(current) + len(part) > TELEGRAM_REPLY_CHUNK_LIMIT and current: + chunks.append(current) + current = part + else: + current += part + if current: + chunks.append(current) + return chunks or [""] + + +async def _reply_text_native(msg, text: str, *, do_quote: bool = True): + first = True + for chunk in _telegram_reply_chunks(text): + try: + await msg.reply_text( + _telegram_native_html(chunk), + parse_mode="HTML", + do_quote=do_quote and first, + ) + except Exception as e: + logger.warning("Telegram native-format reply failed; using plain fallback: %s", e) + await msg.reply_text( + _plain_telegram_fallback(chunk), + do_quote=do_quote and first, + ) + first = False + + +async def _typing_keepalive(chat, stop_event: asyncio.Event, interval_seconds: float = 4.0) -> None: + while not stop_event.is_set(): + try: + await chat.send_action("typing") + except Exception as exc: + logger.debug("typing keepalive failed: %s", exc) + try: + await asyncio.wait_for(stop_event.wait(), timeout=interval_seconds) + except asyncio.TimeoutError: + continue + + +async def _post_chat_proxy_with_typing(chat, **kwargs): + stop_event = asyncio.Event() + keepalive = asyncio.create_task(_typing_keepalive(chat, stop_event)) + try: + return await post_chat_proxy(**kwargs) + finally: + stop_event.set() + keepalive.cancel() + try: + await keepalive + except asyncio.CancelledError: + pass + # Shared KB index (built once, refreshed on mtime change) kb_index = KBIndex(KB_READ_DIR) @@ -621,6 +713,7 @@ def sanitize_message(text: str) -> str: def _smart_research_payment_gate(chat_id: int) -> dict: """Return paid smart-research fields only when all server-side gates pass.""" + max_allowed_usd = 0.06 if os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID") != "1": return {"allow_paid_execution": False} @@ -632,7 +725,7 @@ def _smart_research_payment_gate(chat_id: int) -> dict: max_amount_usd = float(os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD", "0.01")) except ValueError: return {"allow_paid_execution": False} - if max_amount_usd <= 0 or max_amount_usd > 0.01: + if max_amount_usd <= 0 or max_amount_usd > max_allowed_usd: return {"allow_paid_execution": False} approval_ref_file = os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE", "").strip() @@ -653,6 +746,29 @@ def _smart_research_payment_gate(chat_id: int) -> dict: } +async def _market_context_for_message( + text: str, + extra_terms: list[str] | tuple[str, ...] = (), +) -> tuple[str, dict, int, list[str]]: + """Fetch structured market data for token questions without blocking the answer path.""" + token_mentions = extract_market_data_tokens(text, extra_terms=extra_terms) + market_context = "" + market_data_audit = {} + t_market = time.monotonic() + for token in token_mentions: + try: + data = await get_token_price(token) + if data: + price_str = format_price_context(data, token) + if price_str: + market_context += price_str + "\n" + market_data_audit[token] = data + except Exception as e: + logger.warning("Market context lookup failed for %s: %s", token, e) + market_duration = int((time.monotonic() - t_market) * 1000) + return market_context.strip(), market_data_audit, market_duration, token_mentions + + def _git_commit_archive(archive_path, filename: str): """Commit archived source to git so it survives git clean. (Rio review: data loss bug)""" import subprocess @@ -988,6 +1104,17 @@ async def handle_smart_research_command(update: Update, context: ContextTypes.DE await handle_tagged(update, context) +def _previous_user_message(chat_id: int, user_id: int | None) -> str | None: + if user_id is not None: + history = conversation_history.get((chat_id, user_id), []) + if history: + return history[-1].get("user") + chat_history = conversation_history.get((chat_id, 0), []) + if chat_history: + return chat_history[-1].get("user") + return None + + async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle ALL incoming group messages — buffer for triage.""" if not update.message or not update.message.text: @@ -1058,26 +1185,64 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100]) smart_research_goal = None + previous_user_goal = _previous_user_message(msg.chat_id, user.id if user else None) + paid_work_order_id = extract_paid_work_order_id(text) if AGENT_HTTP_RESEARCH_PROXY_URL else None if AGENT_HTTP_RESEARCH_PROXY_URL: smart_research_goal = extract_smart_research_goal( text, tuple(AGENT_SMART_RESEARCH_COMMAND_PREFIXES), ) + if paid_work_order_id and not smart_research_goal: + smart_research_goal = previous_user_goal or text + if not smart_research_goal and AGENT_AUTO_SMART_RESEARCH_FROM_CHAT: + smart_research_goal = extract_auto_smart_research_goal( + text, + tuple(AGENT_MENTION_ALIASES), + ) + if not smart_research_goal and AGENT_AUTO_SMART_RESEARCH_FROM_CHAT: + smart_research_goal = extract_auto_smart_research_followup_goal( + text, + previous_user_goal, + tuple(AGENT_MENTION_ALIASES), + ) if AGENT_HTTP_RESEARCH_PROXY_URL and smart_research_goal: - await msg.chat.send_action("typing") payment_gate = _smart_research_payment_gate(msg.chat_id) + proxy_research_goal = smart_research_goal + market_context, market_data_audit, market_duration, market_tokens = await _market_context_for_message( + smart_research_goal + ) + if market_context: + logger.info( + "%s smart research added structured market context for %s in %dms", + AGENT_NAME, + ",".join(market_tokens), + market_duration, + ) + proxy_research_goal = ( + f"{smart_research_goal}\n\n" + "Structured live market-data context available before web research:\n" + f"{market_context}\n\n" + "Use the structured market-data context as primary evidence for price, volume, FDV, " + "market cap, and liquidity. Do not say you cannot check these metrics when values " + "are present above. For buy/sell wording, do not provide personalized financial advice; " + "give market data, risks, and a concise non-advice thesis. Do not mention payment " + "execution status unless the user asked about payments." + ) payload = build_smart_research_proxy_payload( - research_goal=smart_research_goal, + research_goal=proxy_research_goal, source="telegram", agent=AGENT_NAME.lower(), chat_id=msg.chat_id, message_id=msg.message_id, username=user.username if user else None, include_synthesis=True, + work_order_id=paid_work_order_id, + original_research_goal=previous_user_goal if paid_work_order_id else None, **payment_gate, ) try: - status, proxy_body, proxy_reply = await post_chat_proxy( + status, proxy_body, proxy_reply = await _post_chat_proxy_with_typing( + msg.chat, url=AGENT_HTTP_RESEARCH_PROXY_URL, payload=payload, timeout_seconds=90, @@ -1100,16 +1265,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): ) return - if len(proxy_reply) <= 4096: - await msg.reply_text(proxy_reply, do_quote=True) - else: - first = True - remaining = proxy_reply - while remaining: - chunk = remaining[:4096] - await msg.reply_text(chunk, quote=first) - first = False - remaining = remaining[4096:] + await _reply_text_native(msg, proxy_reply, do_quote=True) if user: username = user.username or "anonymous" @@ -1129,7 +1285,6 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): return if AGENT_HTTP_CHAT_PROXY_URL: - await msg.chat.send_action("typing") payload = build_chat_proxy_payload( message=text, source="telegram", @@ -1139,7 +1294,8 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): username=user.username if user else None, ) try: - status, proxy_body, proxy_reply = await post_chat_proxy( + status, proxy_body, proxy_reply = await _post_chat_proxy_with_typing( + msg.chat, url=AGENT_HTTP_CHAT_PROXY_URL, payload=payload, ) @@ -1161,16 +1317,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): ) return - if len(proxy_reply) <= 4096: - await msg.reply_text(proxy_reply, do_quote=True) - else: - first = True - remaining = proxy_reply - while remaining: - chunk = remaining[:4096] - await msg.reply_text(chunk, quote=first) - first = False - remaining = remaining[4096:] + await _reply_text_native(msg, proxy_reply, do_quote=True) if user: username = user.username or "anonymous" @@ -1370,38 +1517,14 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): stats = get_db_stats() # Fetch live market data for any tokens mentioned (Rhea: market-data API) - market_context = "" - market_data_audit = {} - token_mentions = re.findall(r"\$([A-Z]{2,10})", text.upper()) - # Entity name → token mapping for natural language mentions - ENTITY_TOKEN_MAP = { - "omnipair": "OMFG", "metadao": "META", "sanctum": "CLOUD", - "drift": "DRIFT", "ore": "ORE", "jupiter": "JUP", - } - text_lower = text.lower() - for name, ticker in ENTITY_TOKEN_MAP.items(): - if name in text_lower: - token_mentions.append(ticker) - # Also check entity matches from KB retrieval - for ent in kb_ctx.entities: - for tag in ent.tags: - if tag.upper() in ENTITY_TOKEN_MAP.values(): - token_mentions.append(tag.upper()) - t_market = time.monotonic() - for token in set(token_mentions): - try: - data = await get_token_price(token) - if data: - price_str = format_price_context(data, token) - if price_str: - market_context += price_str + "\n" - market_data_audit[token] = data - except Exception: - pass # Market data is supplementary — never block on failure - market_duration = int((time.monotonic() - t_market) * 1000) + entity_terms = [tag for ent in kb_ctx.entities for tag in ent.tags] + market_context, market_data_audit, market_duration, token_mentions = await _market_context_for_message( + text, + extra_terms=entity_terms, + ) if token_mentions: tool_calls.append({ - "tool": "market_data", "input": {"tickers": list(set(token_mentions))}, + "tool": "market_data", "input": {"tickers": token_mentions}, "output": market_data_audit, "duration_ms": market_duration, }) @@ -2145,6 +2268,7 @@ def _load_agent_config(config_path: str): global AGENT_NAME, AGENT_HANDLE, AGENT_X_HANDLE, AGENT_DOMAIN_EXPERTISE global AGENT_HTTP_CHAT_PROXY_URL, AGENT_HTTP_RESEARCH_PROXY_URL global AGENT_RESPOND_TO_PRIVATE_CHATS, AGENT_MENTION_ALIASES, AGENT_SMART_RESEARCH_COMMAND_PREFIXES + global AGENT_AUTO_SMART_RESEARCH_FROM_CHAT with open(config_path) as f: cfg = yaml.safe_load(f) @@ -2162,6 +2286,7 @@ def _load_agent_config(config_path: str): "smart_research_command_prefixes", list(DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES), ) + AGENT_AUTO_SMART_RESEARCH_FROM_CHAT = bool(cfg.get("auto_smart_research_from_chat", False)) if cfg.get("bot_token_file"): BOT_TOKEN_FILE = f"/opt/teleo-eval/secrets/{cfg['bot_token_file']}" diff --git a/telegram/http_chat_proxy.py b/telegram/http_chat_proxy.py index 829da1a..88aaaaf 100644 --- a/telegram/http_chat_proxy.py +++ b/telegram/http_chat_proxy.py @@ -7,6 +7,49 @@ from typing import Any DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES = ("/smart_research", "/paid_research") _TELEGRAM_COMMAND_NAME_RE = re.compile(r"^[A-Za-z0-9_]+$") +_AUTO_SMART_RESEARCH_RE = re.compile( + r"\b(" + r"research|source|sources|citation|citations|evidence|" + r"search|find|lookup|look\s+up|web|" + r"latest|current|today|recent|as\s+of|this\s+week|this\s+month|" + r"twitter|x\.com|tweet|tweets|social|social\s+media|trend|trends|" + r"discussion|discussions|sentiment|narrative|narratives|" + r"revenue|revenues|fees|tvl|volume|fdv|fully\s+diluted|" + r"market\s+cap|mcap|valuation|funding|liquidity|price|chart|" + r"token|coin|pair|pool|dex|dexscreener|birdeye|jupiter|" + r"buy|sell|should\s+i|yes\s+or\s+no|" + r"estimate|compare|benchmark" + r")\b", + re.IGNORECASE, +) +_AUTO_CONTEXTUAL_RESEARCH_RE = re.compile( + r"(" + r"\b(what\s+are\s+your\s+thoughts|thoughts\s+on|take\s+on|opinion\s+on|" + r"how\s+did|how\s+has|how\s+is|assess|evaluate)\b" + r".*\b(managed|manage|handled|handle|handling|responded|response|situation|incident|" + r"controversy|fallout|fault|blame|position|stance|fair|valuation|valued|growth|metrics|peers?)\b" + r"|" + r"\b(who|what|why)\s+(was\s+)?(at\s+)?fault\b" + r"|" + r"\b(position|stance)\s+(on|about|towards?)\b" + r"|" + r"\b(compare|benchmark)\b.*\b(metrics|growth|valuation|peers?|fintech|web2|products?)\b" + r")", + re.IGNORECASE, +) +_AUTO_SMART_RESEARCH_FOLLOWUP_RE = re.compile( + r"\b(" + r"check\s+it\s+yourself|check\s+yourself|actually\s+check|" + r"look\s+it\s+up|look\s+that\s+up|search\s+it|search\s+that|" + r"use\s+(the\s+)?web|use\s+sources|find\s+sources|" + r"do\s+the\s+research|go\s+research|verify\s+it" + r")\b", + re.IGNORECASE, +) +_PAID_WORK_ORDER_ID_RE = re.compile( + r"\b((?:sponsored_work_orders|payment_receipts):[a-f0-9]{16,64})\b", + re.IGNORECASE, +) def smart_research_command_names( @@ -62,6 +105,59 @@ def extract_smart_research_goal( return None +def extract_auto_smart_research_goal( + message: str, + mention_aliases: tuple[str, ...] | list[str] = (), +) -> str | None: + """Return a research goal when ordinary chat clearly asks for sourced/current research.""" + text = message.strip() + for alias in mention_aliases: + clean_alias = str(alias).strip() + if not clean_alias: + continue + text = re.sub(rf"(^|\s){re.escape(clean_alias)}(?:@\w+)?\b", " ", text, flags=re.IGNORECASE).strip() + text = re.sub(r"\s+", " ", text).strip() + if not text or text.startswith("/"): + return None + if _AUTO_SMART_RESEARCH_RE.search(text) or _AUTO_CONTEXTUAL_RESEARCH_RE.search(text): + return text + return None + + +def extract_auto_smart_research_followup_goal( + message: str, + previous_user_message: str | None, + mention_aliases: tuple[str, ...] | list[str] = (), +) -> str | None: + """Turn short follow-ups like 'check it yourself' into the prior research goal.""" + text = message.strip() + for alias in mention_aliases: + clean_alias = str(alias).strip() + if not clean_alias: + continue + text = re.sub(rf"(^|\s){re.escape(clean_alias)}(?:@\w+)?\b", " ", text, flags=re.IGNORECASE).strip() + text = re.sub(r"\s+", " ", text).strip() + if not text or text.startswith("/") or not _AUTO_SMART_RESEARCH_FOLLOWUP_RE.search(text): + return None + previous_goal = extract_auto_smart_research_goal(previous_user_message or "", mention_aliases) + if not previous_goal: + return None + return ( + f"{previous_goal}\n\n" + f"Follow-up instruction: {text}. Use current public sources and cite assumptions. " + "For buy/sell wording, do not provide personalized financial advice; provide market data, risks, " + "and a concise non-advice thesis." + ) + + +def extract_paid_work_order_id(message: str) -> str | None: + """Return a paid x402 work-order/receipt id from ordinary Telegram text.""" + match = _PAID_WORK_ORDER_ID_RE.search(message.strip()) + if not match: + return None + return match.group(1) + + def build_smart_research_proxy_payload( *, research_goal: str, @@ -74,6 +170,8 @@ def build_smart_research_proxy_payload( approval_ref: str | None = None, max_amount_usd: float | None = None, include_synthesis: bool = True, + work_order_id: str | None = None, + original_research_goal: str | None = None, ) -> dict[str, Any]: """Build the no-secret Telegram payload for Leo smart research.""" payload = build_chat_proxy_payload( @@ -95,11 +193,15 @@ def build_smart_research_proxy_payload( payload["max_amount_usd"] = max_amount_usd if allow_paid_execution and approval_ref: payload["approval_ref"] = approval_ref + if work_order_id: + payload["work_order_id"] = work_order_id + if original_research_goal: + payload["original_research_goal"] = original_research_goal return payload def extract_chat_proxy_reply(payload: dict[str, Any]) -> str | None: - """Extract a reply from supported Living IP Leo chat response shapes.""" + """Extract only user-facing replies from supported Living IP Leo response shapes.""" if not isinstance(payload, dict): return None @@ -135,10 +237,6 @@ def extract_chat_proxy_reply(payload: dict[str, Any]) -> str | None: if isinstance(synthesis_decision_reply, str) and synthesis_decision_reply.strip(): return synthesis_decision_reply.strip() - strongest_claim = payload.get("strongestClaimAllowed") - if isinstance(strongest_claim, str) and strongest_claim.strip(): - return strongest_claim.strip() - return None diff --git a/telegram/market_data.py b/telegram/market_data.py index 0afa5b0..96b55b7 100644 --- a/telegram/market_data.py +++ b/telegram/market_data.py @@ -8,6 +8,7 @@ Epimetheus owns this module. Rhea: static API key pattern. """ import logging +import re from pathlib import Path import aiohttp @@ -16,12 +17,205 @@ logger = logging.getLogger("market-data") API_URL = "https://teleo-ai-api-257133920458.us-east4.run.app/v0/chat/tool/market-data" API_KEY_FILE = "/opt/teleo-eval/secrets/market-data-key" +DEXSCREENER_SEARCH_URL = "https://api.dexscreener.com/latest/dex/search" + +ENTITY_TOKEN_MAP = { + "omnipair": "OMFG", + "omfg": "OMFG", + "avici": "AVICI", + "umbra": "UMBRA", + "metadao": "META", + "sanctum": "CLOUD", + "drift": "DRIFT", + "ore": "ORE", + "jupiter": "JUP", +} + +_BARE_TICKER_STOPWORDS = { + "FDV", + "TVL", + "API", + "USD", + "USDC", + "SOL", + "YES", + "NO", + "BUY", + "SELL", +} # Cache: avoid hitting the API on every message _cache: dict[str, dict] = {} # token_name → {data, timestamp} CACHE_TTL = 300 # 5 minutes +def extract_market_data_tokens(text: str, extra_terms: list[str] | tuple[str, ...] = ()) -> list[str]: + """Extract token tickers from market-data questions while preserving order.""" + seen: set[str] = set() + tokens: list[str] = [] + + def add(token: str | None) -> None: + if not token: + return + normalized = token.upper().strip("$") + if len(normalized) < 2 or normalized in _BARE_TICKER_STOPWORDS or normalized in seen: + return + seen.add(normalized) + tokens.append(normalized) + + for ticker in re.findall(r"\$([A-Za-z][A-Za-z0-9]{1,9})\b", text): + add(ticker) + + marketish = re.search( + r"\b(price|volume|fdv|fully\s+diluted|market\s+cap|mcap|liquidity|buy|sell|token|coin)\b", + text, + flags=re.IGNORECASE, + ) + if marketish: + for ticker in re.findall(r"\b([A-Z][A-Z0-9]{1,9})\b", text): + add(ticker) + + lowered = text.lower() + for name, ticker in ENTITY_TOKEN_MAP.items(): + if re.search(rf"\b{re.escape(name)}\b", lowered): + add(ticker) + + for term in extra_terms: + term_upper = str(term).upper().strip("$") + if term_upper in ENTITY_TOKEN_MAP.values(): + add(term_upper) + + return tokens + + +def _format_usd(value) -> str | None: + try: + number = float(value) + except (TypeError, ValueError): + return None + if number >= 1_000_000_000: + return f"${number / 1_000_000_000:.2f}B" + if number >= 1_000_000: + return f"${number / 1_000_000:.2f}M" + if number >= 1_000: + return f"${number / 1_000:.2f}K" + return f"${number:,.2f}" + + +def _needs_public_market_augmentation(data: dict) -> bool: + result = str(data.get("result") or "").lower() + if not result: + return True + return "fdv" not in result or "volume" not in result + + +def _merge_market_data(primary: dict, public: dict) -> dict: + merged = dict(primary) + primary_result = str(primary.get("result") or "").strip() + public_result = str(public.get("result") or "").strip() + if primary_result and public_result: + merged["result"] = f"{primary_result}\n{public_result}" + elif public_result: + merged["result"] = public_result + merged["public_market_data"] = { + k: v for k, v in public.items() if k != "pair" + } + merged["public_market_pair"] = public.get("pair") + return merged + + +def _dex_pair_score(pair: dict, token: str) -> tuple[int, float]: + token_lower = token.lower() + base = pair.get("baseToken") or {} + symbol = str(base.get("symbol") or "").lower() + name = str(base.get("name") or "").lower() + score = 0 + if symbol == token_lower: + score += 100 + elif token_lower in symbol: + score += 50 + if token_lower in name: + score += 25 + liquidity = ((pair.get("liquidity") or {}).get("usd") or 0) + try: + liquidity_value = float(liquidity) + except (TypeError, ValueError): + liquidity_value = 0 + return score, liquidity_value + + +async def _get_dexscreener_token_data(token_name: str) -> dict | None: + token_upper = token_name.upper().strip("$") + try: + async with aiohttp.ClientSession() as session: + async with session.get( + DEXSCREENER_SEARCH_URL, + params={"q": token_upper}, + timeout=aiohttp.ClientTimeout(total=10), + ) as resp: + if resp.status >= 400: + logger.warning("DexScreener %s -> %d", token_upper, resp.status) + return None + body = await resp.json() + except Exception as e: + logger.warning("DexScreener error for %s: %s", token_upper, e) + return None + + pairs = body.get("pairs") or [] + if not pairs: + return None + + best = max(pairs, key=lambda pair: _dex_pair_score(pair, token_upper)) + score, _ = _dex_pair_score(best, token_upper) + if score <= 0: + return None + + volume_24h = (best.get("volume") or {}).get("h24") + liquidity_usd = (best.get("liquidity") or {}).get("usd") + price_change_24h = (best.get("priceChange") or {}).get("h24") + base = best.get("baseToken") or {} + fdv = best.get("fdv") + market_cap = best.get("marketCap") + price = best.get("priceUsd") + + parts = [ + f"Live market data for {token_upper}", + f"source: DexScreener", + f"pair: {base.get('symbol') or token_upper} on {best.get('chainId', 'unknown')}/{best.get('dexId', 'unknown')}", + ] + if price: + parts.append(f"price: ${price}") + formatted_fdv = _format_usd(fdv) + if formatted_fdv: + parts.append(f"FDV: {formatted_fdv}") + formatted_mcap = _format_usd(market_cap) + if formatted_mcap: + parts.append(f"market cap: {formatted_mcap}") + formatted_volume = _format_usd(volume_24h) + if formatted_volume: + parts.append(f"24h volume: {formatted_volume}") + formatted_liquidity = _format_usd(liquidity_usd) + if formatted_liquidity: + parts.append(f"liquidity: {formatted_liquidity}") + if price_change_24h is not None: + parts.append(f"24h change: {price_change_24h}%") + if best.get("url"): + parts.append(f"url: {best['url']}") + + return { + "provider": "dexscreener", + "token": token_upper, + "result": " | ".join(parts), + "price": price, + "fdv": fdv, + "market_cap": market_cap, + "volume_24h": volume_24h, + "liquidity_usd": liquidity_usd, + "price_change_24h": price_change_24h, + "pair": best, + } + + def _load_api_key() -> str | None: """Load the market-data API key from secrets.""" try: @@ -47,34 +241,41 @@ async def get_token_price(token_name: str) -> dict | None: return cached["data"] key = _load_api_key() - if not key: - return None - try: - async with aiohttp.ClientSession() as session: - async with session.post( - API_URL, - headers={ - "X-Internal-Key": key, - "Content-Type": "application/json", - }, - json={"token": token_upper}, - timeout=aiohttp.ClientTimeout(total=10), - ) as resp: - if resp.status >= 400: - logger.warning("Market data API %s → %d", token_upper, resp.status) - return None - data = await resp.json() + if key: + try: + async with aiohttp.ClientSession() as session: + async with session.post( + API_URL, + headers={ + "X-Internal-Key": key, + "Content-Type": "application/json", + }, + json={"token": token_upper}, + timeout=aiohttp.ClientTimeout(total=10), + ) as resp: + if resp.status < 400: + data = await resp.json() + if _needs_public_market_augmentation(data): + public_data = await _get_dexscreener_token_data(token_upper) + if public_data: + data = _merge_market_data(data, public_data) + _cache[token_upper] = { + "data": data, + "timestamp": time.time(), + } + return data + logger.warning("Market data API %s -> %d", token_upper, resp.status) + except Exception as e: + logger.warning("Market data API error for %s: %s", token_upper, e) - # Cache the result - _cache[token_upper] = { - "data": data, - "timestamp": time.time(), - } - return data - except Exception as e: - logger.warning("Market data API error for %s: %s", token_upper, e) - return None + data = await _get_dexscreener_token_data(token_upper) + if data: + _cache[token_upper] = { + "data": data, + "timestamp": time.time(), + } + return data def format_price_context(data: dict, token_name: str) -> str: diff --git a/tests/test_install_telegram_smart_research_gates.py b/tests/test_install_telegram_smart_research_gates.py index 2ac8a61..1fe317b 100644 --- a/tests/test_install_telegram_smart_research_gates.py +++ b/tests/test_install_telegram_smart_research_gates.py @@ -38,6 +38,8 @@ def test_installs_paid_gate_from_stdin_without_echoing_secret_or_chat_id(tmp_pat "--allow-paid", "--allowed-chat-id", chat_id, + "--max-usd", + "0.06", "--approval-ref-from-stdin", "--no-chown", "--output", @@ -59,10 +61,12 @@ def test_installs_paid_gate_from_stdin_without_echoing_secret_or_chat_id(tmp_pat assert proof["paidEnabled"] is True assert proof["approvalRefPresent"] is True assert proof["allowedChatIdPresent"] is True + assert proof["maxUsd"] == "0.06" assert proof["secretValuesIncluded"] is False env_content = env_path.read_text() assert "LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID=1" in env_content + assert "LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD=0.06" in env_content assert f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={chat_id}" in env_content assert f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_path}" in env_content assert approval_path.read_text().strip() == approval_ref diff --git a/tests/test_telegram_leo_x402_bridge.py b/tests/test_telegram_leo_x402_bridge.py index 97e39d9..ec08a90 100644 --- a/tests/test_telegram_leo_x402_bridge.py +++ b/tests/test_telegram_leo_x402_bridge.py @@ -13,10 +13,14 @@ from agent_config import load_agent_config # noqa: E402 from http_chat_proxy import ( # noqa: E402 build_chat_proxy_payload, build_smart_research_proxy_payload, + extract_auto_smart_research_followup_goal, + extract_auto_smart_research_goal, + extract_paid_work_order_id, extract_chat_proxy_reply, extract_smart_research_goal, smart_research_command_names, ) +from market_data import extract_market_data_tokens, format_price_context # noqa: E402 def test_leo_config_opts_into_http_chat_proxy_without_changing_default_agents(): @@ -32,11 +36,13 @@ def test_leo_config_opts_into_http_chat_proxy_without_changing_default_agents(): assert leo_wallet_test.http_chat_proxy_url == "https://leo.livingip.xyz/api/agents/leo/chat" assert leo_wallet_test.http_research_proxy_url == "https://leo.livingip.xyz/api/agents/leo/research" assert "/smart_research" in leo_wallet_test.smart_research_command_prefixes + assert leo_wallet_test.auto_smart_research_from_chat is True assert leo_wallet_test.respond_to_private_chats is True assert leo_wallet_test.bot_token_file == "leo-test-telegram-bot-token" assert "@lipleowallet0622183538bot" in leo_wallet_test.mention_aliases assert rio.http_chat_proxy_url is None assert rio.respond_to_private_chats is False + assert leo.auto_smart_research_from_chat is False def test_invalid_http_chat_proxy_url_fails_closed(tmp_path): @@ -105,6 +111,29 @@ def test_smart_research_payload_is_no_spend_by_default(): assert "secret" not in str(payload).lower() +def test_smart_research_payload_can_resume_paid_work_order_without_secret_material(): + payload = build_smart_research_proxy_payload( + research_goal="what are the current discussions about MetaDAO Ranger Finance on Twitter?", + source="telegram", + agent="leo", + chat_id=123, + message_id=456, + username="tester", + work_order_id="sponsored_work_orders:f951ccc6c7762ecba6f76cf6", + original_research_goal="what are the current discussions about MetaDAO Ranger Finance on Twitter?", + ) + + assert payload["work_order_id"] == "sponsored_work_orders:f951ccc6c7762ecba6f76cf6" + assert ( + payload["original_research_goal"] + == "what are the current discussions about MetaDAO Ranger Finance on Twitter?" + ) + assert payload["allow_paid_execution"] is False + assert "approval_ref" not in payload + assert "token" not in str(payload).lower() + assert "secret" not in str(payload).lower() + + @pytest.mark.parametrize( ("message", "expected"), [ @@ -119,6 +148,112 @@ def test_extract_smart_research_goal(message, expected): assert extract_smart_research_goal(message) == expected +@pytest.mark.parametrize( + ("message", "expected"), + [ + ( + "work_order_id: sponsored_work_orders:f951ccc6c7762ecba6f76cf6", + "sponsored_work_orders:f951ccc6c7762ecba6f76cf6", + ), + ( + "paid receipt payment_receipts:f951ccc6c7762ecba6f76cf6 thanks", + "payment_receipts:f951ccc6c7762ecba6f76cf6", + ), + ("no paid id here", None), + ], +) +def test_extract_paid_work_order_id(message, expected): + assert extract_paid_work_order_id(message) == expected + + +@pytest.mark.parametrize( + ("message", "expected"), + [ + ( + "@lipleowallet0622183538bot how much revenue does MetaDAO make today?", + "how much revenue does MetaDAO make today?", + ), + ( + "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no", + "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no", + ), + ("Can you find current sources on x402 usage?", "Can you find current sources on x402 usage?"), + ( + "what is the latest trend of internet finance on Twitter", + "what is the latest trend of internet finance on Twitter", + ), + ( + "what are your thoughts on how metadao managed the ranger finance situation", + "what are your thoughts on how metadao managed the ranger finance situation", + ), + ( + "who was at fault for Ranger according to Twitter?", + "who was at fault for Ranger according to Twitter?", + ), + ( + "what is MetaDAO's position on Ranger Finance?", + "what is MetaDAO's position on Ranger Finance?", + ), + ( + "how did Jupiter handle the outage situation?", + "how did Jupiter handle the outage situation?", + ), + ( + "assess whether the protocol valuation is fair versus fintech peers", + "assess whether the protocol valuation is fair versus fintech peers", + ), + ( + "compare growth metrics for these products against web2 companies", + "compare growth metrics for these products against web2 companies", + ), + ("thanks, that makes sense", None), + ("what are your thoughts on lunch", None), + ("how did you sleep", None), + ("/paid_research find x402 evidence", None), + ], +) +def test_extract_auto_smart_research_goal(message, expected): + assert extract_auto_smart_research_goal( + message, + mention_aliases=["@lipleowallet0622183538bot", "@leo"], + ) == expected + + +def test_extract_auto_smart_research_followup_goal_uses_previous_market_question(): + previous = "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no" + goal = extract_auto_smart_research_followup_goal("check it yourself", previous) + + assert previous in goal + assert "Use current public sources" in goal + assert "do not provide personalized financial advice" in goal + + +def test_extract_auto_smart_research_followup_goal_ignores_context_without_research_intent(): + assert extract_auto_smart_research_followup_goal("check it yourself", "thanks, that makes sense") is None + + +def test_market_data_token_extraction_maps_natural_market_question(): + message = "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no" + + assert extract_market_data_tokens(message) == ["OMFG", "AVICI", "UMBRA"] + + +def test_market_data_context_formats_dexscreener_fdv_volume_liquidity(): + context = format_price_context( + { + "provider": "dexscreener", + "result": ( + "Live market data for OMFG | source: DexScreener | price: $0.10 | " + "FDV: $1.20M | 24h volume: $52.00K | liquidity: $101.00K" + ), + }, + "OMFG", + ) + + assert "FDV: $1.20M" in context + assert "24h volume: $52.00K" in context + + def test_smart_research_command_names_are_safe_for_telegram_handlers(): assert smart_research_command_names( [ @@ -139,7 +274,6 @@ def test_smart_research_command_names_are_safe_for_telegram_handlers(): ({"decision": {"reply": "analysis route reply"}}, "analysis route reply"), ({"llm": {"decision": {"reply": "nested decision reply"}}}, "nested decision reply"), ({"synthesis": {"decision": {"reply": "smart research reply"}}}, "smart research reply"), - ({"strongestClaimAllowed": "fail-closed smart research readback"}, "fail-closed smart research readback"), ], ) def test_extract_chat_proxy_reply_accepts_retained_leo_shapes(payload, expected): @@ -148,3 +282,32 @@ def test_extract_chat_proxy_reply_accepts_retained_leo_shapes(payload, expected) def test_extract_chat_proxy_reply_fails_closed_on_missing_reply(): assert extract_chat_proxy_reply({"schema": "livingip.x402.leoChatResponse.v1"}) is None + + +def test_extract_chat_proxy_reply_never_displays_operator_claim_fields(): + payload = { + "ok": False, + "status": "payment_authorization_required", + "strongestClaimAllowed": ( + "Leo smart research can select sponsor-agent-research on Devnet, " + "but did not attempt payment because the call was not fully authorized." + ), + "exactBlocker": "smart_research_paid_execution_not_allowed", + "nextExactAction": "POST again with allow_paid_execution=true.", + } + + assert extract_chat_proxy_reply(payload) is None + + +def test_extract_chat_proxy_reply_uses_clean_route_reply_for_tool_failures(): + payload = { + "ok": False, + "status": "payment_authorization_required", + "reply": "I tried to use my paid research tool, but it was not available. No funds moved.", + "strongestClaimAllowed": "Internal status text that must stay out of Telegram.", + } + + assert ( + extract_chat_proxy_reply(payload) + == "I tried to use my paid research tool, but it was not available. No funds moved." + )