* Wire Leo Telegram x402 smart research * Suppress token-bearing Telegram HTTP logs * Keep Telegram typing visible during Leo proxy calls * Allow Leo Telegram social research spend cap * Route contextual Leo research prompts to smart research * Generalize Leo smart research intent routing * Resume Leo smart research from paid work orders
313 lines
9.7 KiB
Python
313 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Market data API client for live token prices.
|
|
|
|
Calls Ben's teleo-ai-api endpoint for ownership coin prices.
|
|
Used by the Telegram bot to give Rio real-time market context.
|
|
|
|
Epimetheus owns this module. Rhea: static API key pattern.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import aiohttp
|
|
|
|
# Module-level logger; every warning emitted by this client goes through it.
logger = logging.getLogger("market-data")

# Primary market-data endpoint. get_token_price() POSTs {"token": <TICKER>} to
# it and sends the key read from API_KEY_FILE in the X-Internal-Key header.
API_URL = "https://teleo-ai-api-257133920458.us-east4.run.app/v0/chat/tool/market-data"
# File read by _load_api_key(); its stripped contents are the API key.
API_KEY_FILE = "/opt/teleo-eval/secrets/market-data-key"
# Public search endpoint queried (?q=<TICKER>) by _get_dexscreener_token_data(),
# both as a fallback and to augment primary responses.
DEXSCREENER_SEARCH_URL = "https://api.dexscreener.com/latest/dex/search"

# Lowercase entity name -> ticker symbol. extract_market_data_tokens() matches
# the keys as whole words against the lowercased message text, and only accepts
# an extra term when it equals one of the values.
ENTITY_TOKEN_MAP = {
    "omnipair": "OMFG",
    "omfg": "OMFG",
    "avici": "AVICI",
    "umbra": "UMBRA",
    "metadao": "META",
    "sanctum": "CLOUD",
    "drift": "DRIFT",
    "ore": "ORE",
    "jupiter": "JUP",
}

# Uppercase words that extract_market_data_tokens() never reports as tickers.
# The check runs on every candidate, including "$"-prefixed ones.
_BARE_TICKER_STOPWORDS = {
    "FDV",
    "TVL",
    "API",
    "USD",
    "USDC",
    "SOL",
    "YES",
    "NO",
    "BUY",
    "SELL",
}

# Cache: avoid hitting the API on every message.
# Keys are uppercased tickers with "$" stripped; each value is
# {"data": <payload dict>, "timestamp": <time.time() when stored>}.
_cache: dict[str, dict] = {}  # token_name → {data, timestamp}
CACHE_TTL = 300  # 5 minutes (seconds; compared against time.time() deltas)
|
|
|
|
|
|
def extract_market_data_tokens(text: str, extra_terms: list[str] | tuple[str, ...] = ()) -> list[str]:
    """Collect the distinct ticker symbols a message is asking about.

    Candidates are gathered from four sources, in this order, and each
    symbol is reported once at the position where it was first accepted:

    1. ``$TICKER`` cashtags, in text order.
    2. Bare ALL-CAPS words, in text order, but only when the text also
       contains a market-related keyword (price, volume, fdv, ...).
    3. Entity names from ``ENTITY_TOKEN_MAP`` found as whole words
       (case-insensitive), in map order.
    4. ``extra_terms`` entries that are themselves known map tickers.

    Symbols are upper-cased with ``$`` stripped; anything shorter than two
    characters or listed in ``_BARE_TICKER_STOPWORDS`` is dropped.
    """
    # A dict doubles as an insertion-ordered set of accepted symbols.
    accepted: dict[str, None] = {}

    def remember(candidate: str | None) -> None:
        if not candidate:
            return
        symbol = candidate.upper().strip("$")
        if len(symbol) < 2 or symbol in _BARE_TICKER_STOPWORDS or symbol in accepted:
            return
        accepted[symbol] = None

    # 1. Explicit cashtags.
    for cashtag in re.findall(r"\$([A-Za-z][A-Za-z0-9]{1,9})\b", text):
        remember(cashtag)

    # 2. Bare upper-case words only count when the message talks about markets.
    market_hint = re.search(
        r"\b(price|volume|fdv|fully\s+diluted|market\s+cap|mcap|liquidity|buy|sell|token|coin)\b",
        text,
        flags=re.IGNORECASE,
    )
    if market_hint is not None:
        for match in re.finditer(r"\b([A-Z][A-Z0-9]{1,9})\b", text):
            remember(match.group(1))

    # 3. Known entity names mentioned anywhere in the text.
    haystack = text.lower()
    for entity, symbol in ENTITY_TOKEN_MAP.items():
        if re.search(rf"\b{re.escape(entity)}\b", haystack) is None:
            continue
        remember(symbol)

    # 4. Caller-supplied terms, accepted only if they are known tickers.
    for term in extra_terms:
        candidate = str(term).upper().strip("$")
        if candidate in ENTITY_TOKEN_MAP.values():
            remember(candidate)

    return list(accepted)
|
|
|
|
|
|
def _format_usd(value) -> str | None:
|
|
try:
|
|
number = float(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
if number >= 1_000_000_000:
|
|
return f"${number / 1_000_000_000:.2f}B"
|
|
if number >= 1_000_000:
|
|
return f"${number / 1_000_000:.2f}M"
|
|
if number >= 1_000:
|
|
return f"${number / 1_000:.2f}K"
|
|
return f"${number:,.2f}"
|
|
|
|
|
|
def _needs_public_market_augmentation(data: dict) -> bool:
|
|
result = str(data.get("result") or "").lower()
|
|
if not result:
|
|
return True
|
|
return "fdv" not in result or "volume" not in result
|
|
|
|
|
|
def _merge_market_data(primary: dict, public: dict) -> dict:
|
|
merged = dict(primary)
|
|
primary_result = str(primary.get("result") or "").strip()
|
|
public_result = str(public.get("result") or "").strip()
|
|
if primary_result and public_result:
|
|
merged["result"] = f"{primary_result}\n{public_result}"
|
|
elif public_result:
|
|
merged["result"] = public_result
|
|
merged["public_market_data"] = {
|
|
k: v for k, v in public.items() if k != "pair"
|
|
}
|
|
merged["public_market_pair"] = public.get("pair")
|
|
return merged
|
|
|
|
|
|
def _dex_pair_score(pair: dict, token: str) -> tuple[int, float]:
|
|
token_lower = token.lower()
|
|
base = pair.get("baseToken") or {}
|
|
symbol = str(base.get("symbol") or "").lower()
|
|
name = str(base.get("name") or "").lower()
|
|
score = 0
|
|
if symbol == token_lower:
|
|
score += 100
|
|
elif token_lower in symbol:
|
|
score += 50
|
|
if token_lower in name:
|
|
score += 25
|
|
liquidity = ((pair.get("liquidity") or {}).get("usd") or 0)
|
|
try:
|
|
liquidity_value = float(liquidity)
|
|
except (TypeError, ValueError):
|
|
liquidity_value = 0
|
|
return score, liquidity_value
|
|
|
|
|
|
def _summarize_dex_pair(best: dict, token_upper: str) -> dict:
    """Build the market-data payload (text summary + raw fields) for one pair."""
    volume_24h = (best.get("volume") or {}).get("h24")
    liquidity_usd = (best.get("liquidity") or {}).get("usd")
    price_change_24h = (best.get("priceChange") or {}).get("h24")
    base = best.get("baseToken") or {}
    fdv = best.get("fdv")
    market_cap = best.get("marketCap")
    price = best.get("priceUsd")

    parts = [
        f"Live market data for {token_upper}",
        "source: DexScreener",
        f"pair: {base.get('symbol') or token_upper} on {best.get('chainId', 'unknown')}/{best.get('dexId', 'unknown')}",
    ]
    if price:
        # Price is passed through verbatim: small-cap prices need more
        # precision than the two decimals _format_usd would keep.
        parts.append(f"price: ${price}")
    for label, raw in (
        ("FDV", fdv),
        ("market cap", market_cap),
        ("24h volume", volume_24h),
        ("liquidity", liquidity_usd),
    ):
        formatted = _format_usd(raw)
        if formatted:
            parts.append(f"{label}: {formatted}")
    if price_change_24h is not None:
        parts.append(f"24h change: {price_change_24h}%")
    if best.get("url"):
        parts.append(f"url: {best['url']}")

    return {
        "provider": "dexscreener",
        "token": token_upper,
        "result": " | ".join(parts),
        "price": price,
        "fdv": fdv,
        "market_cap": market_cap,
        "volume_24h": volume_24h,
        "liquidity_usd": liquidity_usd,
        "price_change_24h": price_change_24h,
        "pair": best,
    }


async def _get_dexscreener_token_data(token_name: str) -> dict | None:
    """Look up a token on the public DexScreener search endpoint.

    The pair with the highest ``_dex_pair_score`` (match quality, then USD
    liquidity) is selected; pairs that do not match the token at all
    (score 0) are rejected.

    Args:
        token_name: Ticker to search for; case and a ``$`` prefix are ignored.

    Returns:
        A dict with ``provider``, ``token``, a pipe-separated ``result``
        summary, the raw ``price``/``fdv``/``market_cap``/``volume_24h``/
        ``liquidity_usd``/``price_change_24h`` values and the chosen ``pair``;
        or ``None`` when the request fails, the HTTP status is >= 400, the
        payload is malformed, or no pair matches.
    """
    token_upper = token_name.upper().strip("$")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                DEXSCREENER_SEARCH_URL,
                params={"q": token_upper},
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status >= 400:
                    logger.warning("DexScreener %s -> %d", token_upper, resp.status)
                    return None
                body = await resp.json()
    except Exception as e:
        # Best-effort lookup: any transport/decoding failure means "no data".
        logger.warning("DexScreener error for %s: %s", token_upper, e)
        return None

    # The response is third-party data: tolerate a non-object body and skip
    # entries that are not objects instead of raising on them.
    raw_pairs = body.get("pairs") if isinstance(body, dict) else None
    if not isinstance(raw_pairs, list):
        return None
    pairs = [pair for pair in raw_pairs if isinstance(pair, dict)]
    if not pairs:
        return None

    try:
        # Score each pair once; max() keeps the first of equally-ranked pairs.
        scored = [(_dex_pair_score(pair, token_upper), pair) for pair in pairs]
        (score, _liquidity), best = max(scored, key=lambda item: item[0])
        if score <= 0:
            return None
        return _summarize_dex_pair(best, token_upper)
    except (AttributeError, TypeError, ValueError) as e:
        # Nested fields of an unexpected type (e.g. "liquidity" not an object).
        logger.warning("DexScreener malformed payload for %s: %s", token_upper, e)
        return None
|
|
|
|
|
|
def _load_api_key() -> str | None:
    """Load the market-data API key from secrets.

    Returns:
        The whitespace-stripped contents of ``API_KEY_FILE``, or ``None``
        (after logging a warning) when the file is missing, cannot be read
        or decoded, or is empty.
    """
    try:
        key = Path(API_KEY_FILE).read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        logger.warning("Market data API key not found at %s", API_KEY_FILE)
        return None
    except (OSError, UnicodeDecodeError) as e:
        # Permission problems, a directory at that path, bad bytes, ... are
        # not "not found"; report what actually went wrong.
        logger.warning("Market data API key unreadable at %s: %s", API_KEY_FILE, e)
        return None
    if not key:
        # An empty file would otherwise be returned as "" rather than None.
        logger.warning("Market data API key file %s is empty", API_KEY_FILE)
        return None
    return key
|
|
|
|
|
|
async def get_token_price(token_name: str) -> dict | None:
    """Fetch live market data for a token.

    Lookup order:

    1. A cached payload younger than ``CACHE_TTL`` seconds.
    2. The primary market-data API (only when an API key is available).
       If its ``result`` text lacks FDV or volume, DexScreener data is
       merged in when available.
    3. DexScreener alone, when the primary API is unavailable, fails, or
       returns something other than a JSON object.

    Args:
        token_name: Ticker to look up; case and a ``$`` prefix are ignored.

    Returns:
        Dict with price, market_cap, volume, etc. or None on failure.
        Successful results are cached for CACHE_TTL seconds.
    """
    import time

    token_upper = token_name.upper().strip("$")

    # Check cache
    cached = _cache.get(token_upper)
    if cached and time.time() - cached["timestamp"] < CACHE_TTL:
        return cached["data"]

    async def public_lookup() -> dict | None:
        # Public data is optional: a failure here must neither raise nor
        # discard a primary response that was already fetched.
        try:
            return await _get_dexscreener_token_data(token_upper)
        except Exception as e:
            logger.warning("DexScreener error for %s: %s", token_upper, e)
            return None

    data = None
    key = _load_api_key()
    if key:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    API_URL,
                    headers={
                        "X-Internal-Key": key,
                        "Content-Type": "application/json",
                    },
                    json={"token": token_upper},
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as resp:
                    if resp.status < 400:
                        data = await resp.json()
                    else:
                        logger.warning("Market data API %s -> %d", token_upper, resp.status)
        except Exception as e:
            logger.warning("Market data API error for %s: %s", token_upper, e)
            data = None

    # The primary connection is closed by now; augmentation runs separately so
    # its errors are not mistaken for primary-API errors.
    if isinstance(data, dict):
        if _needs_public_market_augmentation(data):
            public_data = await public_lookup()
            if public_data:
                data = _merge_market_data(data, public_data)
    else:
        if data is not None:
            logger.warning(
                "Market data API error for %s: unexpected %s payload",
                token_upper,
                type(data).__name__,
            )
        data = await public_lookup()

    if data is not None:
        _cache[token_upper] = {
            "data": data,
            "timestamp": time.time(),
        }
    return data
|
|
|
|
|
|
def format_price_context(data: dict, token_name: str) -> str:
    """Format market data into a concise string for the LLM prompt.

    Args:
        data: Payload as returned by ``get_token_price`` (may be empty/None).
        token_name: Name used in the heading of the structured fallback.

    Returns:
        The payload's pre-formatted ``result`` text when present. Otherwise a
        pipe-separated summary built from the structured ``price``,
        ``market_cap``, ``volume`` and 24h-change fields, or ``""`` when
        none of them is available.
    """
    if not data:
        return ""

    # API returns a "result" text field with pre-formatted data
    result_text = data.get("result", "")
    if result_text:
        # Coerce so the declared str return type holds for odd payloads.
        return str(result_text)

    def first_present(*keys):
        # First value that is actually set; unlike an `or` chain this keeps
        # a legitimate 0 (e.g. a flat 24h change).
        for field in keys:
            value = data.get(field)
            if value is not None and value != "":
                return value
        return None

    # Fallback for structured JSON responses
    parts = [f"Live market data for {token_name}:"]

    price = data.get("price") or data.get("current_price")
    if price:
        # Verbatim: token prices often need more than two decimals.
        parts.append(f"Price: ${price}")

    mcap = data.get("market_cap") or data.get("marketCap")
    if mcap:
        # Same K/M/B rendering as the DexScreener summary; values that are
        # not numeric (e.g. already formatted text) pass through unchanged.
        parts.append(f"Market cap: {_format_usd(mcap) or mcap}")

    volume = data.get("volume") or data.get("volume_24h")
    if volume:
        formatted_volume = _format_usd(volume)
        if formatted_volume is None:
            formatted_volume = f"${volume}"
        parts.append(f"24h volume: {formatted_volume}")

    change = first_present("price_change_24h", "change_24h")
    if change is not None:
        parts.append(f"24h change: {change}")

    return " | ".join(parts) if len(parts) > 1 else ""
|