diff --git a/scripts/install_telegram_smart_research_gates.py b/scripts/install_telegram_smart_research_gates.py
index 164e789..10c891f 100644
--- a/scripts/install_telegram_smart_research_gates.py
+++ b/scripts/install_telegram_smart_research_gates.py
@@ -17,6 +17,7 @@ from pathlib import Path
APPROVAL_REF_RE = re.compile(r"^[A-Za-z0-9._:@/-]{8,256}$")
CHAT_ID_RE = re.compile(r"^-?\d+$")
+MAX_SMART_RESEARCH_USD = 0.06
def parse_args(argv: list[str]) -> argparse.Namespace:
@@ -66,8 +67,8 @@ def validate_max_usd(value: str) -> str:
parsed = float(value)
except ValueError:
raise ValueError("max-usd must be numeric") from None
- if parsed <= 0 or parsed > 0.01:
- raise ValueError("max-usd must be greater than 0 and no more than 0.01")
+ if parsed <= 0 or parsed > MAX_SMART_RESEARCH_USD:
+ raise ValueError(f"max-usd must be greater than 0 and no more than {MAX_SMART_RESEARCH_USD:.2f}")
return f"{parsed:.2f}"
diff --git a/telegram/agent_config.py b/telegram/agent_config.py
index b49813e..22e4238 100644
--- a/telegram/agent_config.py
+++ b/telegram/agent_config.py
@@ -48,6 +48,7 @@ class AgentConfig:
respond_to_private_chats: bool = False
mention_aliases: list[str] = field(default_factory=list)
smart_research_command_prefixes: list[str] = field(default_factory=list)
+ auto_smart_research_from_chat: bool = False
def to_dict(self) -> dict:
"""Convert to dict for passing to build_system_prompt."""
@@ -65,6 +66,7 @@ class AgentConfig:
"respond_to_private_chats": self.respond_to_private_chats,
"mention_aliases": self.mention_aliases,
"smart_research_command_prefixes": self.smart_research_command_prefixes,
+ "auto_smart_research_from_chat": self.auto_smart_research_from_chat,
}
@property
@@ -161,6 +163,7 @@ def load_agent_config(config_path: str) -> AgentConfig:
respond_to_private_chats=bool(raw.get("respond_to_private_chats", False)),
mention_aliases=mention_aliases,
smart_research_command_prefixes=smart_research_command_prefixes,
+ auto_smart_research_from_chat=bool(raw.get("auto_smart_research_from_chat", False)),
)
diff --git a/telegram/agents/leo-wallet-test.yaml b/telegram/agents/leo-wallet-test.yaml
index 8dc7fb7..910acf8 100644
--- a/telegram/agents/leo-wallet-test.yaml
+++ b/telegram/agents/leo-wallet-test.yaml
@@ -20,6 +20,7 @@ http_research_proxy_url: "https://leo.livingip.xyz/api/agents/leo/research"
smart_research_command_prefixes:
- "/smart_research"
- "/paid_research"
+auto_smart_research_from_chat: true
respond_to_private_chats: true
kb_scope:
@@ -40,9 +41,11 @@ voice_definition: |
Report current public x402 receive capability, AgentCash paid-readback status,
and exact blockers. Do not claim new Telegram-triggered payment execution
unless the hosted Leo HTTP route returns retained payment/readback evidence.
- Use /smart_research for no-spend smart research planning/readback. Paid
- smart research remains fail-closed unless the server-side allow flag, allowed
- chat id, cap, and retained approval-ref file are all present.
+ Ordinary addressed/private chat may be routed into smart research when the
+ request clearly asks for sourced, current, market, or evidence-backed work.
+ Explicit /smart_research remains available for narrow canaries. Paid smart
+ research remains fail-closed unless the server-side allow flag, allowed chat
+ id, cap, and retained approval-ref file are all present.
Do not request or expose private keys, bot tokens, wallet exports, seed phrases,
or raw secret values.
diff --git a/telegram/bot.py b/telegram/bot.py
index 5fc7d09..98e3ad4 100644
--- a/telegram/bot.py
+++ b/telegram/bot.py
@@ -29,6 +29,7 @@ import time
import yaml
from collections import defaultdict
from datetime import datetime, timezone
+from html import escape
from pathlib import Path
# Add pipeline lib to path for shared modules
@@ -47,13 +48,16 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import json as _json
from kb_retrieval import KBIndex, retrieve_context, retrieve_vector_context
from retrieval import orchestrate_retrieval
-from market_data import get_token_price, format_price_context
+from market_data import get_token_price, format_price_context, extract_market_data_tokens
from worktree_lock import main_worktree_lock
from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining
from http_chat_proxy import (
DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES,
build_chat_proxy_payload,
build_smart_research_proxy_payload,
+ extract_auto_smart_research_followup_goal,
+ extract_auto_smart_research_goal,
+ extract_paid_work_order_id,
extract_smart_research_goal,
post_chat_proxy,
smart_research_command_names,
@@ -94,6 +98,7 @@ AGENT_HTTP_RESEARCH_PROXY_URL: str | None = None
AGENT_RESPOND_TO_PRIVATE_CHATS = False
AGENT_MENTION_ALIASES = ["@teleo", "@FutAIrdBot"]
AGENT_SMART_RESEARCH_COMMAND_PREFIXES = list(DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES)
+AGENT_AUTO_SMART_RESEARCH_FROM_CHAT = False
# Rate limits
MAX_RESPONSE_PER_USER_PER_HOUR = 30
@@ -109,6 +114,8 @@ logging.basicConfig(
logging.StreamHandler(),
],
)
+logging.getLogger("httpx").setLevel(logging.WARNING)
+logging.getLogger("httpcore").setLevel(logging.WARNING)
logger = logging.getLogger("telegram-bot")
# ─── State ──────────────────────────────────────────────────────────────
@@ -122,6 +129,91 @@ user_response_times: dict[int, list[float]] = defaultdict(list)
# Allowed group IDs (set after first message received, or configure)
allowed_groups: set[int] = set()
+TELEGRAM_REPLY_CHUNK_LIMIT = 3500
+
+
+def _telegram_native_html(text: str) -> str:
+ """Render common LLM Markdown as Telegram HTML without trusting raw HTML."""
+ rendered = escape(text, quote=False)
+ rendered = re.sub(r"(?m)^#{1,6}\s+(.+)$", r"\1", rendered)
+ rendered = re.sub(r"\*\*([^*\n]{1,240})\*\*", r"\1", rendered)
+ rendered = re.sub(r"`([^`\n]{1,240})`", r"\1", rendered)
+ return rendered
+
+
+def _plain_telegram_fallback(text: str) -> str:
+ text = re.sub(r"(?m)^#{1,6}\s+", "", text)
+ text = re.sub(r"\*\*([^*\n]{1,240})\*\*", r"\1", text)
+ text = re.sub(r"`([^`\n]{1,240})`", r"\1", text)
+ return text
+
+
+def _telegram_reply_chunks(text: str) -> list[str]:
+ chunks: list[str] = []
+ current = ""
+ for part in re.split(r"(\n\n+)", text):
+ if len(part) > TELEGRAM_REPLY_CHUNK_LIMIT:
+ if current:
+ chunks.append(current)
+ current = ""
+ chunks.extend(
+ part[i : i + TELEGRAM_REPLY_CHUNK_LIMIT]
+ for i in range(0, len(part), TELEGRAM_REPLY_CHUNK_LIMIT)
+ )
+ continue
+ if len(current) + len(part) > TELEGRAM_REPLY_CHUNK_LIMIT and current:
+ chunks.append(current)
+ current = part
+ else:
+ current += part
+ if current:
+ chunks.append(current)
+ return chunks or [""]
+
+
+async def _reply_text_native(msg, text: str, *, do_quote: bool = True):
+ first = True
+ for chunk in _telegram_reply_chunks(text):
+ try:
+ await msg.reply_text(
+ _telegram_native_html(chunk),
+ parse_mode="HTML",
+ do_quote=do_quote and first,
+ )
+ except Exception as e:
+ logger.warning("Telegram native-format reply failed; using plain fallback: %s", e)
+ await msg.reply_text(
+ _plain_telegram_fallback(chunk),
+ do_quote=do_quote and first,
+ )
+ first = False
+
+
+async def _typing_keepalive(chat, stop_event: asyncio.Event, interval_seconds: float = 4.0) -> None:
+ while not stop_event.is_set():
+ try:
+ await chat.send_action("typing")
+ except Exception as exc:
+ logger.debug("typing keepalive failed: %s", exc)
+ try:
+ await asyncio.wait_for(stop_event.wait(), timeout=interval_seconds)
+ except asyncio.TimeoutError:
+ continue
+
+
+async def _post_chat_proxy_with_typing(chat, **kwargs):
+ stop_event = asyncio.Event()
+ keepalive = asyncio.create_task(_typing_keepalive(chat, stop_event))
+ try:
+ return await post_chat_proxy(**kwargs)
+ finally:
+ stop_event.set()
+ keepalive.cancel()
+ try:
+ await keepalive
+ except asyncio.CancelledError:
+ pass
+
# Shared KB index (built once, refreshed on mtime change)
kb_index = KBIndex(KB_READ_DIR)
@@ -621,6 +713,7 @@ def sanitize_message(text: str) -> str:
def _smart_research_payment_gate(chat_id: int) -> dict:
"""Return paid smart-research fields only when all server-side gates pass."""
+ max_allowed_usd = 0.06
if os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID") != "1":
return {"allow_paid_execution": False}
@@ -632,7 +725,7 @@ def _smart_research_payment_gate(chat_id: int) -> dict:
max_amount_usd = float(os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD", "0.01"))
except ValueError:
return {"allow_paid_execution": False}
- if max_amount_usd <= 0 or max_amount_usd > 0.01:
+ if max_amount_usd <= 0 or max_amount_usd > max_allowed_usd:
return {"allow_paid_execution": False}
approval_ref_file = os.getenv("LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE", "").strip()
@@ -653,6 +746,29 @@ def _smart_research_payment_gate(chat_id: int) -> dict:
}
+async def _market_context_for_message(
+ text: str,
+ extra_terms: list[str] | tuple[str, ...] = (),
+) -> tuple[str, dict, int, list[str]]:
+ """Fetch structured market data for token questions without blocking the answer path."""
+ token_mentions = extract_market_data_tokens(text, extra_terms=extra_terms)
+ market_context = ""
+ market_data_audit = {}
+ t_market = time.monotonic()
+ for token in token_mentions:
+ try:
+ data = await get_token_price(token)
+ if data:
+ price_str = format_price_context(data, token)
+ if price_str:
+ market_context += price_str + "\n"
+ market_data_audit[token] = data
+ except Exception as e:
+ logger.warning("Market context lookup failed for %s: %s", token, e)
+ market_duration = int((time.monotonic() - t_market) * 1000)
+ return market_context.strip(), market_data_audit, market_duration, token_mentions
+
+
def _git_commit_archive(archive_path, filename: str):
"""Commit archived source to git so it survives git clean. (Rio review: data loss bug)"""
import subprocess
@@ -988,6 +1104,17 @@ async def handle_smart_research_command(update: Update, context: ContextTypes.DE
await handle_tagged(update, context)
+def _previous_user_message(chat_id: int, user_id: int | None) -> str | None:
+ if user_id is not None:
+ history = conversation_history.get((chat_id, user_id), [])
+ if history:
+ return history[-1].get("user")
+ chat_history = conversation_history.get((chat_id, 0), [])
+ if chat_history:
+ return chat_history[-1].get("user")
+ return None
+
+
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle ALL incoming group messages — buffer for triage."""
if not update.message or not update.message.text:
@@ -1058,26 +1185,64 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])
smart_research_goal = None
+ previous_user_goal = _previous_user_message(msg.chat_id, user.id if user else None)
+ paid_work_order_id = extract_paid_work_order_id(text) if AGENT_HTTP_RESEARCH_PROXY_URL else None
if AGENT_HTTP_RESEARCH_PROXY_URL:
smart_research_goal = extract_smart_research_goal(
text,
tuple(AGENT_SMART_RESEARCH_COMMAND_PREFIXES),
)
+ if paid_work_order_id and not smart_research_goal:
+ smart_research_goal = previous_user_goal or text
+ if not smart_research_goal and AGENT_AUTO_SMART_RESEARCH_FROM_CHAT:
+ smart_research_goal = extract_auto_smart_research_goal(
+ text,
+ tuple(AGENT_MENTION_ALIASES),
+ )
+ if not smart_research_goal and AGENT_AUTO_SMART_RESEARCH_FROM_CHAT:
+ smart_research_goal = extract_auto_smart_research_followup_goal(
+ text,
+ previous_user_goal,
+ tuple(AGENT_MENTION_ALIASES),
+ )
if AGENT_HTTP_RESEARCH_PROXY_URL and smart_research_goal:
- await msg.chat.send_action("typing")
payment_gate = _smart_research_payment_gate(msg.chat_id)
+ proxy_research_goal = smart_research_goal
+ market_context, market_data_audit, market_duration, market_tokens = await _market_context_for_message(
+ smart_research_goal
+ )
+ if market_context:
+ logger.info(
+ "%s smart research added structured market context for %s in %dms",
+ AGENT_NAME,
+ ",".join(market_tokens),
+ market_duration,
+ )
+ proxy_research_goal = (
+ f"{smart_research_goal}\n\n"
+ "Structured live market-data context available before web research:\n"
+ f"{market_context}\n\n"
+ "Use the structured market-data context as primary evidence for price, volume, FDV, "
+ "market cap, and liquidity. Do not say you cannot check these metrics when values "
+ "are present above. For buy/sell wording, do not provide personalized financial advice; "
+ "give market data, risks, and a concise non-advice thesis. Do not mention payment "
+ "execution status unless the user asked about payments."
+ )
payload = build_smart_research_proxy_payload(
- research_goal=smart_research_goal,
+ research_goal=proxy_research_goal,
source="telegram",
agent=AGENT_NAME.lower(),
chat_id=msg.chat_id,
message_id=msg.message_id,
username=user.username if user else None,
include_synthesis=True,
+ work_order_id=paid_work_order_id,
+ original_research_goal=previous_user_goal if paid_work_order_id else None,
**payment_gate,
)
try:
- status, proxy_body, proxy_reply = await post_chat_proxy(
+ status, proxy_body, proxy_reply = await _post_chat_proxy_with_typing(
+ msg.chat,
url=AGENT_HTTP_RESEARCH_PROXY_URL,
payload=payload,
timeout_seconds=90,
@@ -1100,16 +1265,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
)
return
- if len(proxy_reply) <= 4096:
- await msg.reply_text(proxy_reply, do_quote=True)
- else:
- first = True
- remaining = proxy_reply
- while remaining:
- chunk = remaining[:4096]
- await msg.reply_text(chunk, quote=first)
- first = False
- remaining = remaining[4096:]
+ await _reply_text_native(msg, proxy_reply, do_quote=True)
if user:
username = user.username or "anonymous"
@@ -1129,7 +1285,6 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
return
if AGENT_HTTP_CHAT_PROXY_URL:
- await msg.chat.send_action("typing")
payload = build_chat_proxy_payload(
message=text,
source="telegram",
@@ -1139,7 +1294,8 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
username=user.username if user else None,
)
try:
- status, proxy_body, proxy_reply = await post_chat_proxy(
+ status, proxy_body, proxy_reply = await _post_chat_proxy_with_typing(
+ msg.chat,
url=AGENT_HTTP_CHAT_PROXY_URL,
payload=payload,
)
@@ -1161,16 +1317,7 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
)
return
- if len(proxy_reply) <= 4096:
- await msg.reply_text(proxy_reply, do_quote=True)
- else:
- first = True
- remaining = proxy_reply
- while remaining:
- chunk = remaining[:4096]
- await msg.reply_text(chunk, quote=first)
- first = False
- remaining = remaining[4096:]
+ await _reply_text_native(msg, proxy_reply, do_quote=True)
if user:
username = user.username or "anonymous"
@@ -1370,38 +1517,14 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
stats = get_db_stats()
# Fetch live market data for any tokens mentioned (Rhea: market-data API)
- market_context = ""
- market_data_audit = {}
- token_mentions = re.findall(r"\$([A-Z]{2,10})", text.upper())
- # Entity name → token mapping for natural language mentions
- ENTITY_TOKEN_MAP = {
- "omnipair": "OMFG", "metadao": "META", "sanctum": "CLOUD",
- "drift": "DRIFT", "ore": "ORE", "jupiter": "JUP",
- }
- text_lower = text.lower()
- for name, ticker in ENTITY_TOKEN_MAP.items():
- if name in text_lower:
- token_mentions.append(ticker)
- # Also check entity matches from KB retrieval
- for ent in kb_ctx.entities:
- for tag in ent.tags:
- if tag.upper() in ENTITY_TOKEN_MAP.values():
- token_mentions.append(tag.upper())
- t_market = time.monotonic()
- for token in set(token_mentions):
- try:
- data = await get_token_price(token)
- if data:
- price_str = format_price_context(data, token)
- if price_str:
- market_context += price_str + "\n"
- market_data_audit[token] = data
- except Exception:
- pass # Market data is supplementary — never block on failure
- market_duration = int((time.monotonic() - t_market) * 1000)
+ entity_terms = [tag for ent in kb_ctx.entities for tag in ent.tags]
+ market_context, market_data_audit, market_duration, token_mentions = await _market_context_for_message(
+ text,
+ extra_terms=entity_terms,
+ )
if token_mentions:
tool_calls.append({
- "tool": "market_data", "input": {"tickers": list(set(token_mentions))},
+ "tool": "market_data", "input": {"tickers": token_mentions},
"output": market_data_audit,
"duration_ms": market_duration,
})
@@ -2145,6 +2268,7 @@ def _load_agent_config(config_path: str):
global AGENT_NAME, AGENT_HANDLE, AGENT_X_HANDLE, AGENT_DOMAIN_EXPERTISE
global AGENT_HTTP_CHAT_PROXY_URL, AGENT_HTTP_RESEARCH_PROXY_URL
global AGENT_RESPOND_TO_PRIVATE_CHATS, AGENT_MENTION_ALIASES, AGENT_SMART_RESEARCH_COMMAND_PREFIXES
+ global AGENT_AUTO_SMART_RESEARCH_FROM_CHAT
with open(config_path) as f:
cfg = yaml.safe_load(f)
@@ -2162,6 +2286,7 @@ def _load_agent_config(config_path: str):
"smart_research_command_prefixes",
list(DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES),
)
+ AGENT_AUTO_SMART_RESEARCH_FROM_CHAT = bool(cfg.get("auto_smart_research_from_chat", False))
if cfg.get("bot_token_file"):
BOT_TOKEN_FILE = f"/opt/teleo-eval/secrets/{cfg['bot_token_file']}"
diff --git a/telegram/http_chat_proxy.py b/telegram/http_chat_proxy.py
index 829da1a..88aaaaf 100644
--- a/telegram/http_chat_proxy.py
+++ b/telegram/http_chat_proxy.py
@@ -7,6 +7,49 @@ from typing import Any
DEFAULT_SMART_RESEARCH_COMMAND_PREFIXES = ("/smart_research", "/paid_research")
_TELEGRAM_COMMAND_NAME_RE = re.compile(r"^[A-Za-z0-9_]+$")
+_AUTO_SMART_RESEARCH_RE = re.compile(
+ r"\b("
+ r"research|source|sources|citation|citations|evidence|"
+ r"search|find|lookup|look\s+up|web|"
+ r"latest|current|today|recent|as\s+of|this\s+week|this\s+month|"
+ r"twitter|x\.com|tweet|tweets|social|social\s+media|trend|trends|"
+ r"discussion|discussions|sentiment|narrative|narratives|"
+ r"revenue|revenues|fees|tvl|volume|fdv|fully\s+diluted|"
+ r"market\s+cap|mcap|valuation|funding|liquidity|price|chart|"
+ r"token|coin|pair|pool|dex|dexscreener|birdeye|jupiter|"
+ r"buy|sell|should\s+i|yes\s+or\s+no|"
+ r"estimate|compare|benchmark"
+ r")\b",
+ re.IGNORECASE,
+)
+_AUTO_CONTEXTUAL_RESEARCH_RE = re.compile(
+ r"("
+ r"\b(what\s+are\s+your\s+thoughts|thoughts\s+on|take\s+on|opinion\s+on|"
+ r"how\s+did|how\s+has|how\s+is|assess|evaluate)\b"
+ r".*\b(managed|manage|handled|handle|handling|responded|response|situation|incident|"
+ r"controversy|fallout|fault|blame|position|stance|fair|valuation|valued|growth|metrics|peers?)\b"
+ r"|"
+ r"\b(who|what|why)\s+(was\s+)?(at\s+)?fault\b"
+ r"|"
+ r"\b(position|stance)\s+(on|about|towards?)\b"
+ r"|"
+ r"\b(compare|benchmark)\b.*\b(metrics|growth|valuation|peers?|fintech|web2|products?)\b"
+ r")",
+ re.IGNORECASE,
+)
+_AUTO_SMART_RESEARCH_FOLLOWUP_RE = re.compile(
+ r"\b("
+ r"check\s+it\s+yourself|check\s+yourself|actually\s+check|"
+ r"look\s+it\s+up|look\s+that\s+up|search\s+it|search\s+that|"
+ r"use\s+(the\s+)?web|use\s+sources|find\s+sources|"
+ r"do\s+the\s+research|go\s+research|verify\s+it"
+ r")\b",
+ re.IGNORECASE,
+)
+_PAID_WORK_ORDER_ID_RE = re.compile(
+ r"\b((?:sponsored_work_orders|payment_receipts):[a-f0-9]{16,64})\b",
+ re.IGNORECASE,
+)
def smart_research_command_names(
@@ -62,6 +105,59 @@ def extract_smart_research_goal(
return None
+def extract_auto_smart_research_goal(
+ message: str,
+ mention_aliases: tuple[str, ...] | list[str] = (),
+) -> str | None:
+ """Return a research goal when ordinary chat clearly asks for sourced/current research."""
+ text = message.strip()
+ for alias in mention_aliases:
+ clean_alias = str(alias).strip()
+ if not clean_alias:
+ continue
+ text = re.sub(rf"(^|\s){re.escape(clean_alias)}(?:@\w+)?\b", " ", text, flags=re.IGNORECASE).strip()
+ text = re.sub(r"\s+", " ", text).strip()
+ if not text or text.startswith("/"):
+ return None
+ if _AUTO_SMART_RESEARCH_RE.search(text) or _AUTO_CONTEXTUAL_RESEARCH_RE.search(text):
+ return text
+ return None
+
+
+def extract_auto_smart_research_followup_goal(
+ message: str,
+ previous_user_message: str | None,
+ mention_aliases: tuple[str, ...] | list[str] = (),
+) -> str | None:
+ """Turn short follow-ups like 'check it yourself' into the prior research goal."""
+ text = message.strip()
+ for alias in mention_aliases:
+ clean_alias = str(alias).strip()
+ if not clean_alias:
+ continue
+ text = re.sub(rf"(^|\s){re.escape(clean_alias)}(?:@\w+)?\b", " ", text, flags=re.IGNORECASE).strip()
+ text = re.sub(r"\s+", " ", text).strip()
+ if not text or text.startswith("/") or not _AUTO_SMART_RESEARCH_FOLLOWUP_RE.search(text):
+ return None
+ previous_goal = extract_auto_smart_research_goal(previous_user_message or "", mention_aliases)
+ if not previous_goal:
+ return None
+ return (
+ f"{previous_goal}\n\n"
+ f"Follow-up instruction: {text}. Use current public sources and cite assumptions. "
+ "For buy/sell wording, do not provide personalized financial advice; provide market data, risks, "
+ "and a concise non-advice thesis."
+ )
+
+
+def extract_paid_work_order_id(message: str) -> str | None:
+ """Return a paid x402 work-order/receipt id from ordinary Telegram text."""
+ match = _PAID_WORK_ORDER_ID_RE.search(message.strip())
+ if not match:
+ return None
+ return match.group(1)
+
+
def build_smart_research_proxy_payload(
*,
research_goal: str,
@@ -74,6 +170,8 @@ def build_smart_research_proxy_payload(
approval_ref: str | None = None,
max_amount_usd: float | None = None,
include_synthesis: bool = True,
+ work_order_id: str | None = None,
+ original_research_goal: str | None = None,
) -> dict[str, Any]:
"""Build the no-secret Telegram payload for Leo smart research."""
payload = build_chat_proxy_payload(
@@ -95,11 +193,15 @@ def build_smart_research_proxy_payload(
payload["max_amount_usd"] = max_amount_usd
if allow_paid_execution and approval_ref:
payload["approval_ref"] = approval_ref
+ if work_order_id:
+ payload["work_order_id"] = work_order_id
+ if original_research_goal:
+ payload["original_research_goal"] = original_research_goal
return payload
def extract_chat_proxy_reply(payload: dict[str, Any]) -> str | None:
- """Extract a reply from supported Living IP Leo chat response shapes."""
+ """Extract only user-facing replies from supported Living IP Leo response shapes."""
if not isinstance(payload, dict):
return None
@@ -135,10 +237,6 @@ def extract_chat_proxy_reply(payload: dict[str, Any]) -> str | None:
if isinstance(synthesis_decision_reply, str) and synthesis_decision_reply.strip():
return synthesis_decision_reply.strip()
- strongest_claim = payload.get("strongestClaimAllowed")
- if isinstance(strongest_claim, str) and strongest_claim.strip():
- return strongest_claim.strip()
-
return None
diff --git a/telegram/market_data.py b/telegram/market_data.py
index 0afa5b0..96b55b7 100644
--- a/telegram/market_data.py
+++ b/telegram/market_data.py
@@ -8,6 +8,7 @@ Epimetheus owns this module. Rhea: static API key pattern.
"""
import logging
+import re
from pathlib import Path
import aiohttp
@@ -16,12 +17,205 @@ logger = logging.getLogger("market-data")
API_URL = "https://teleo-ai-api-257133920458.us-east4.run.app/v0/chat/tool/market-data"
API_KEY_FILE = "/opt/teleo-eval/secrets/market-data-key"
+DEXSCREENER_SEARCH_URL = "https://api.dexscreener.com/latest/dex/search"
+
+ENTITY_TOKEN_MAP = {
+ "omnipair": "OMFG",
+ "omfg": "OMFG",
+ "avici": "AVICI",
+ "umbra": "UMBRA",
+ "metadao": "META",
+ "sanctum": "CLOUD",
+ "drift": "DRIFT",
+ "ore": "ORE",
+ "jupiter": "JUP",
+}
+
+_BARE_TICKER_STOPWORDS = {
+ "FDV",
+ "TVL",
+ "API",
+ "USD",
+ "USDC",
+ "SOL",
+ "YES",
+ "NO",
+ "BUY",
+ "SELL",
+}
# Cache: avoid hitting the API on every message
_cache: dict[str, dict] = {} # token_name → {data, timestamp}
CACHE_TTL = 300 # 5 minutes
+def extract_market_data_tokens(text: str, extra_terms: list[str] | tuple[str, ...] = ()) -> list[str]:
+ """Extract token tickers from market-data questions while preserving order."""
+ seen: set[str] = set()
+ tokens: list[str] = []
+
+ def add(token: str | None) -> None:
+ if not token:
+ return
+ normalized = token.upper().strip("$")
+ if len(normalized) < 2 or normalized in _BARE_TICKER_STOPWORDS or normalized in seen:
+ return
+ seen.add(normalized)
+ tokens.append(normalized)
+
+ for ticker in re.findall(r"\$([A-Za-z][A-Za-z0-9]{1,9})\b", text):
+ add(ticker)
+
+ marketish = re.search(
+ r"\b(price|volume|fdv|fully\s+diluted|market\s+cap|mcap|liquidity|buy|sell|token|coin)\b",
+ text,
+ flags=re.IGNORECASE,
+ )
+ if marketish:
+ for ticker in re.findall(r"\b([A-Z][A-Z0-9]{1,9})\b", text):
+ add(ticker)
+
+ lowered = text.lower()
+ for name, ticker in ENTITY_TOKEN_MAP.items():
+ if re.search(rf"\b{re.escape(name)}\b", lowered):
+ add(ticker)
+
+ for term in extra_terms:
+ term_upper = str(term).upper().strip("$")
+ if term_upper in ENTITY_TOKEN_MAP.values():
+ add(term_upper)
+
+ return tokens
+
+
+def _format_usd(value) -> str | None:
+ try:
+ number = float(value)
+ except (TypeError, ValueError):
+ return None
+ if number >= 1_000_000_000:
+ return f"${number / 1_000_000_000:.2f}B"
+ if number >= 1_000_000:
+ return f"${number / 1_000_000:.2f}M"
+ if number >= 1_000:
+ return f"${number / 1_000:.2f}K"
+ return f"${number:,.2f}"
+
+
+def _needs_public_market_augmentation(data: dict) -> bool:
+ result = str(data.get("result") or "").lower()
+ if not result:
+ return True
+ return "fdv" not in result or "volume" not in result
+
+
+def _merge_market_data(primary: dict, public: dict) -> dict:
+ merged = dict(primary)
+ primary_result = str(primary.get("result") or "").strip()
+ public_result = str(public.get("result") or "").strip()
+ if primary_result and public_result:
+ merged["result"] = f"{primary_result}\n{public_result}"
+ elif public_result:
+ merged["result"] = public_result
+ merged["public_market_data"] = {
+ k: v for k, v in public.items() if k != "pair"
+ }
+ merged["public_market_pair"] = public.get("pair")
+ return merged
+
+
+def _dex_pair_score(pair: dict, token: str) -> tuple[int, float]:
+ token_lower = token.lower()
+ base = pair.get("baseToken") or {}
+ symbol = str(base.get("symbol") or "").lower()
+ name = str(base.get("name") or "").lower()
+ score = 0
+ if symbol == token_lower:
+ score += 100
+ elif token_lower in symbol:
+ score += 50
+ if token_lower in name:
+ score += 25
+ liquidity = ((pair.get("liquidity") or {}).get("usd") or 0)
+ try:
+ liquidity_value = float(liquidity)
+ except (TypeError, ValueError):
+ liquidity_value = 0
+ return score, liquidity_value
+
+
+async def _get_dexscreener_token_data(token_name: str) -> dict | None:
+ token_upper = token_name.upper().strip("$")
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ DEXSCREENER_SEARCH_URL,
+ params={"q": token_upper},
+ timeout=aiohttp.ClientTimeout(total=10),
+ ) as resp:
+ if resp.status >= 400:
+ logger.warning("DexScreener %s -> %d", token_upper, resp.status)
+ return None
+ body = await resp.json()
+ except Exception as e:
+ logger.warning("DexScreener error for %s: %s", token_upper, e)
+ return None
+
+ pairs = body.get("pairs") or []
+ if not pairs:
+ return None
+
+ best = max(pairs, key=lambda pair: _dex_pair_score(pair, token_upper))
+ score, _ = _dex_pair_score(best, token_upper)
+ if score <= 0:
+ return None
+
+ volume_24h = (best.get("volume") or {}).get("h24")
+ liquidity_usd = (best.get("liquidity") or {}).get("usd")
+ price_change_24h = (best.get("priceChange") or {}).get("h24")
+ base = best.get("baseToken") or {}
+ fdv = best.get("fdv")
+ market_cap = best.get("marketCap")
+ price = best.get("priceUsd")
+
+ parts = [
+ f"Live market data for {token_upper}",
+ f"source: DexScreener",
+ f"pair: {base.get('symbol') or token_upper} on {best.get('chainId', 'unknown')}/{best.get('dexId', 'unknown')}",
+ ]
+ if price:
+ parts.append(f"price: ${price}")
+ formatted_fdv = _format_usd(fdv)
+ if formatted_fdv:
+ parts.append(f"FDV: {formatted_fdv}")
+ formatted_mcap = _format_usd(market_cap)
+ if formatted_mcap:
+ parts.append(f"market cap: {formatted_mcap}")
+ formatted_volume = _format_usd(volume_24h)
+ if formatted_volume:
+ parts.append(f"24h volume: {formatted_volume}")
+ formatted_liquidity = _format_usd(liquidity_usd)
+ if formatted_liquidity:
+ parts.append(f"liquidity: {formatted_liquidity}")
+ if price_change_24h is not None:
+ parts.append(f"24h change: {price_change_24h}%")
+ if best.get("url"):
+ parts.append(f"url: {best['url']}")
+
+ return {
+ "provider": "dexscreener",
+ "token": token_upper,
+ "result": " | ".join(parts),
+ "price": price,
+ "fdv": fdv,
+ "market_cap": market_cap,
+ "volume_24h": volume_24h,
+ "liquidity_usd": liquidity_usd,
+ "price_change_24h": price_change_24h,
+ "pair": best,
+ }
+
+
def _load_api_key() -> str | None:
"""Load the market-data API key from secrets."""
try:
@@ -47,34 +241,41 @@ async def get_token_price(token_name: str) -> dict | None:
return cached["data"]
key = _load_api_key()
- if not key:
- return None
- try:
- async with aiohttp.ClientSession() as session:
- async with session.post(
- API_URL,
- headers={
- "X-Internal-Key": key,
- "Content-Type": "application/json",
- },
- json={"token": token_upper},
- timeout=aiohttp.ClientTimeout(total=10),
- ) as resp:
- if resp.status >= 400:
- logger.warning("Market data API %s → %d", token_upper, resp.status)
- return None
- data = await resp.json()
+ if key:
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ API_URL,
+ headers={
+ "X-Internal-Key": key,
+ "Content-Type": "application/json",
+ },
+ json={"token": token_upper},
+ timeout=aiohttp.ClientTimeout(total=10),
+ ) as resp:
+ if resp.status < 400:
+ data = await resp.json()
+ if _needs_public_market_augmentation(data):
+ public_data = await _get_dexscreener_token_data(token_upper)
+ if public_data:
+ data = _merge_market_data(data, public_data)
+ _cache[token_upper] = {
+ "data": data,
+ "timestamp": time.time(),
+ }
+ return data
+ logger.warning("Market data API %s -> %d", token_upper, resp.status)
+ except Exception as e:
+ logger.warning("Market data API error for %s: %s", token_upper, e)
- # Cache the result
- _cache[token_upper] = {
- "data": data,
- "timestamp": time.time(),
- }
- return data
- except Exception as e:
- logger.warning("Market data API error for %s: %s", token_upper, e)
- return None
+ data = await _get_dexscreener_token_data(token_upper)
+ if data:
+ _cache[token_upper] = {
+ "data": data,
+ "timestamp": time.time(),
+ }
+ return data
def format_price_context(data: dict, token_name: str) -> str:
diff --git a/tests/test_install_telegram_smart_research_gates.py b/tests/test_install_telegram_smart_research_gates.py
index 2ac8a61..1fe317b 100644
--- a/tests/test_install_telegram_smart_research_gates.py
+++ b/tests/test_install_telegram_smart_research_gates.py
@@ -38,6 +38,8 @@ def test_installs_paid_gate_from_stdin_without_echoing_secret_or_chat_id(tmp_pat
"--allow-paid",
"--allowed-chat-id",
chat_id,
+ "--max-usd",
+ "0.06",
"--approval-ref-from-stdin",
"--no-chown",
"--output",
@@ -59,10 +61,12 @@ def test_installs_paid_gate_from_stdin_without_echoing_secret_or_chat_id(tmp_pat
assert proof["paidEnabled"] is True
assert proof["approvalRefPresent"] is True
assert proof["allowedChatIdPresent"] is True
+ assert proof["maxUsd"] == "0.06"
assert proof["secretValuesIncluded"] is False
env_content = env_path.read_text()
assert "LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOW_PAID=1" in env_content
+ assert "LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_MAX_USD=0.06" in env_content
assert f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_ALLOWED_CHAT_ID={chat_id}" in env_content
assert f"LIVINGIP_LEO_TELEGRAM_SMART_RESEARCH_APPROVAL_REF_FILE={approval_path}" in env_content
assert approval_path.read_text().strip() == approval_ref
diff --git a/tests/test_telegram_leo_x402_bridge.py b/tests/test_telegram_leo_x402_bridge.py
index 97e39d9..ec08a90 100644
--- a/tests/test_telegram_leo_x402_bridge.py
+++ b/tests/test_telegram_leo_x402_bridge.py
@@ -13,10 +13,14 @@ from agent_config import load_agent_config # noqa: E402
from http_chat_proxy import ( # noqa: E402
build_chat_proxy_payload,
build_smart_research_proxy_payload,
+ extract_auto_smart_research_followup_goal,
+ extract_auto_smart_research_goal,
+ extract_paid_work_order_id,
extract_chat_proxy_reply,
extract_smart_research_goal,
smart_research_command_names,
)
+from market_data import extract_market_data_tokens, format_price_context # noqa: E402
def test_leo_config_opts_into_http_chat_proxy_without_changing_default_agents():
@@ -32,11 +36,13 @@ def test_leo_config_opts_into_http_chat_proxy_without_changing_default_agents():
assert leo_wallet_test.http_chat_proxy_url == "https://leo.livingip.xyz/api/agents/leo/chat"
assert leo_wallet_test.http_research_proxy_url == "https://leo.livingip.xyz/api/agents/leo/research"
assert "/smart_research" in leo_wallet_test.smart_research_command_prefixes
+ assert leo_wallet_test.auto_smart_research_from_chat is True
assert leo_wallet_test.respond_to_private_chats is True
assert leo_wallet_test.bot_token_file == "leo-test-telegram-bot-token"
assert "@lipleowallet0622183538bot" in leo_wallet_test.mention_aliases
assert rio.http_chat_proxy_url is None
assert rio.respond_to_private_chats is False
+ assert leo.auto_smart_research_from_chat is False
def test_invalid_http_chat_proxy_url_fails_closed(tmp_path):
@@ -105,6 +111,29 @@ def test_smart_research_payload_is_no_spend_by_default():
assert "secret" not in str(payload).lower()
+def test_smart_research_payload_can_resume_paid_work_order_without_secret_material():
+ payload = build_smart_research_proxy_payload(
+ research_goal="what are the current discussions about MetaDAO Ranger Finance on Twitter?",
+ source="telegram",
+ agent="leo",
+ chat_id=123,
+ message_id=456,
+ username="tester",
+ work_order_id="sponsored_work_orders:f951ccc6c7762ecba6f76cf6",
+ original_research_goal="what are the current discussions about MetaDAO Ranger Finance on Twitter?",
+ )
+
+ assert payload["work_order_id"] == "sponsored_work_orders:f951ccc6c7762ecba6f76cf6"
+ assert (
+ payload["original_research_goal"]
+ == "what are the current discussions about MetaDAO Ranger Finance on Twitter?"
+ )
+ assert payload["allow_paid_execution"] is False
+ assert "approval_ref" not in payload
+ assert "token" not in str(payload).lower()
+ assert "secret" not in str(payload).lower()
+
+
@pytest.mark.parametrize(
("message", "expected"),
[
@@ -119,6 +148,112 @@ def test_extract_smart_research_goal(message, expected):
assert extract_smart_research_goal(message) == expected
+@pytest.mark.parametrize(
+ ("message", "expected"),
+ [
+ (
+ "work_order_id: sponsored_work_orders:f951ccc6c7762ecba6f76cf6",
+ "sponsored_work_orders:f951ccc6c7762ecba6f76cf6",
+ ),
+ (
+ "paid receipt payment_receipts:f951ccc6c7762ecba6f76cf6 thanks",
+ "payment_receipts:f951ccc6c7762ecba6f76cf6",
+ ),
+ ("no paid id here", None),
+ ],
+)
+def test_extract_paid_work_order_id(message, expected):
+ assert extract_paid_work_order_id(message) == expected
+
+
+@pytest.mark.parametrize(
+ ("message", "expected"),
+ [
+ (
+ "@lipleowallet0622183538bot how much revenue does MetaDAO make today?",
+ "how much revenue does MetaDAO make today?",
+ ),
+ (
+ "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no",
+ "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no",
+ ),
+ ("Can you find current sources on x402 usage?", "Can you find current sources on x402 usage?"),
+ (
+ "what is the latest trend of internet finance on Twitter",
+ "what is the latest trend of internet finance on Twitter",
+ ),
+ (
+ "what are your thoughts on how metadao managed the ranger finance situation",
+ "what are your thoughts on how metadao managed the ranger finance situation",
+ ),
+ (
+ "who was at fault for Ranger according to Twitter?",
+ "who was at fault for Ranger according to Twitter?",
+ ),
+ (
+ "what is MetaDAO's position on Ranger Finance?",
+ "what is MetaDAO's position on Ranger Finance?",
+ ),
+ (
+ "how did Jupiter handle the outage situation?",
+ "how did Jupiter handle the outage situation?",
+ ),
+ (
+ "assess whether the protocol valuation is fair versus fintech peers",
+ "assess whether the protocol valuation is fair versus fintech peers",
+ ),
+ (
+ "compare growth metrics for these products against web2 companies",
+ "compare growth metrics for these products against web2 companies",
+ ),
+ ("thanks, that makes sense", None),
+ ("what are your thoughts on lunch", None),
+ ("how did you sleep", None),
+ ("/paid_research find x402 evidence", None),
+ ],
+)
+def test_extract_auto_smart_research_goal(message, expected):
+ assert extract_auto_smart_research_goal(
+ message,
+ mention_aliases=["@lipleowallet0622183538bot", "@leo"],
+ ) == expected
+
+
+def test_extract_auto_smart_research_followup_goal_uses_previous_market_question():
+ previous = "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no"
+ goal = extract_auto_smart_research_followup_goal("check it yourself", previous)
+
+ assert previous in goal
+ assert "Use current public sources" in goal
+ assert "do not provide personalized financial advice" in goal
+
+
+def test_extract_auto_smart_research_followup_goal_ignores_context_without_research_intent():
+ assert extract_auto_smart_research_followup_goal("check it yourself", "thanks, that makes sense") is None
+
+
+def test_market_data_token_extraction_maps_natural_market_question():
+ message = "what is the volume and fdv of omnipair avici umbra? should i buy them yes or no"
+
+ assert extract_market_data_tokens(message) == ["OMFG", "AVICI", "UMBRA"]
+
+
+def test_market_data_context_formats_dexscreener_fdv_volume_liquidity():
+ context = format_price_context(
+ {
+ "provider": "dexscreener",
+ "result": (
+ "Live market data for OMFG | source: DexScreener | price: $0.10 | "
+ "FDV: $1.20M | 24h volume: $52.00K | liquidity: $101.00K"
+ ),
+ },
+ "OMFG",
+ )
+
+ assert "FDV: $1.20M" in context
+ assert "24h volume: $52.00K" in context
+
+
def test_smart_research_command_names_are_safe_for_telegram_handlers():
assert smart_research_command_names(
[
@@ -139,7 +274,6 @@ def test_smart_research_command_names_are_safe_for_telegram_handlers():
({"decision": {"reply": "analysis route reply"}}, "analysis route reply"),
({"llm": {"decision": {"reply": "nested decision reply"}}}, "nested decision reply"),
({"synthesis": {"decision": {"reply": "smart research reply"}}}, "smart research reply"),
- ({"strongestClaimAllowed": "fail-closed smart research readback"}, "fail-closed smart research readback"),
],
)
def test_extract_chat_proxy_reply_accepts_retained_leo_shapes(payload, expected):
@@ -148,3 +282,32 @@ def test_extract_chat_proxy_reply_accepts_retained_leo_shapes(payload, expected)
def test_extract_chat_proxy_reply_fails_closed_on_missing_reply():
assert extract_chat_proxy_reply({"schema": "livingip.x402.leoChatResponse.v1"}) is None
+
+
+def test_extract_chat_proxy_reply_never_displays_operator_claim_fields():
+ payload = {
+ "ok": False,
+ "status": "payment_authorization_required",
+ "strongestClaimAllowed": (
+ "Leo smart research can select sponsor-agent-research on Devnet, "
+ "but did not attempt payment because the call was not fully authorized."
+ ),
+ "exactBlocker": "smart_research_paid_execution_not_allowed",
+ "nextExactAction": "POST again with allow_paid_execution=true.",
+ }
+
+ assert extract_chat_proxy_reply(payload) is None
+
+
+def test_extract_chat_proxy_reply_uses_clean_route_reply_for_tool_failures():
+ payload = {
+ "ok": False,
+ "status": "payment_authorization_required",
+ "reply": "I tried to use my paid research tool, but it was not available. No funds moved.",
+ "strongestClaimAllowed": "Internal status text that must stay out of Telegram.",
+ }
+
+ assert (
+ extract_chat_proxy_reply(payload)
+ == "I tried to use my paid research tool, but it was not available. No funds moved."
+ )