From bcbe54a0a359efbb84c38d25e5278d0cc9b96529 Mon Sep 17 00:00:00 2001
From: m3taversal
Date: Mon, 23 Mar 2026 15:26:10 +0000
Subject: [PATCH] epimetheus: consolidated X API client (x_client.py replaces x_search.py)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Clean, documented interface to twitterapi.io for all agents:

- get_tweet(id) — fetch any tweet by ID, any age
- get_article(id) — fetch X long-form articles
- search_tweets(query) — keyword search for research
- get_user_tweets(username) — user's recent tweets (research sessions)
- fetch_from_url(url) — smart dispatcher: tweet → article → placeholder

Shared by the Telegram bot and research sessions. Endpoints, costs, and
rate limits are documented in the module docstring. Replaces the ad-hoc
x_search.py.

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
---
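
A minimal usage sketch (a note after the fold, not part of the commit;
the query and username below are made-up examples, the function names are
the ones added in x_client.py):

    import asyncio
    from x_client import search_tweets, get_user_tweets

    async def demo():
        # Keyword search; short queries work best (see search_tweets docstring)
        for t in await search_tweets("teleo agents", max_results=5):
            print(t["author"], t["engagement"], t["text"][:60])

        # One account's recent tweets, as research sessions use for monitoring
        recent = await get_user_tweets("example_user", max_results=5)
        print(len(recent), "tweets fetched")

    asyncio.run(demo())
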
 telegram/bot.py      |  16 +--
 telegram/x_client.py | 317 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 325 insertions(+), 8 deletions(-)
 create mode 100644 telegram/x_client.py

diff --git a/telegram/bot.py b/telegram/bot.py
index 141e7b7..5a4a2eb 100644
--- a/telegram/bot.py
+++ b/telegram/bot.py
@@ -44,7 +44,7 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
 from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context
 from market_data import get_token_price, format_price_context
 from worktree_lock import main_worktree_lock
-from x_search import search_x, format_tweet_as_source, check_research_rate_limit, record_research_usage, get_research_remaining
+from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining
 
 
 # ─── Config ─────────────────────────────────────────────────────────────
@@ -288,7 +288,7 @@ async def handle_research(msg, query: str, user):
     await msg.chat.send_action("typing")
 
     logger.info("Research: searching X for '%s'", query)
-    tweets = await search_x(query, max_results=15, min_engagement=0)
+    tweets = await search_tweets(query, max_results=15, min_engagement=0)
     logger.info("Research: got %d tweets for '%s'", len(tweets), query)
     if not tweets:
         await msg.reply_text(f"No recent tweets found for '{query}'.")
@@ -452,9 +452,9 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
     if research_match:
         query = research_match.group(1).strip()
         logger.info("Research: searching X for '%s'", query)
-        from x_search import search_x, format_tweet_as_source, check_research_rate_limit, record_research_usage
+        from x_client import search_tweets, check_research_rate_limit, record_research_usage
         if check_research_rate_limit(user.id if user else 0):
-            tweets = await search_x(query, max_results=10, min_engagement=0)
+            tweets = await search_tweets(query, max_results=10, min_engagement=0)
             logger.info("Research: got %d tweets for '%s'", len(tweets), query)
             if tweets:
                 # Archive as source file (staging dir)
@@ -488,10 +488,10 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
     x_link_context = ""
     x_urls = re.findall(r'https?://(?:twitter\.com|x\.com)/\w+/status/\d+', text)
     if x_urls:
-        from x_search import fetch_tweet_by_url
+        from x_client import fetch_from_url
         for url in x_urls[:3]:  # Cap at 3 links
             try:
-                tweet_data = await fetch_tweet_by_url(url)
+                tweet_data = await fetch_from_url(url)
                 if tweet_data:
                     x_link_context += f"\n## Linked Tweet by @{tweet_data['author']}\n"
                     if tweet_data.get("title"):
@@ -520,9 +520,9 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
     if haiku_result and haiku_result.strip().upper().startswith("YES:"):
         search_query = haiku_result.strip()[4:].strip()
         logger.info("Haiku pre-pass: research needed — '%s'", search_query)
-        from x_search import search_x, check_research_rate_limit, record_research_usage
+        from x_client import search_tweets, check_research_rate_limit, record_research_usage
         if check_research_rate_limit(user.id if user else 0):
-            tweets = await search_x(search_query, max_results=10, min_engagement=0)
+            tweets = await search_tweets(search_query, max_results=10, min_engagement=0)
             logger.info("Haiku research: got %d tweets", len(tweets))
             if tweets:
                 research_context = f"\n## Fresh X Research Results for '{search_query}'\n"
diff --git a/telegram/x_client.py b/telegram/x_client.py
new file mode 100644
index 0000000..6051c2a
--- /dev/null
+++ b/telegram/x_client.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python3
+"""X (Twitter) API client for Teleo agents.
+
+Consolidated interface to twitterapi.io. Used by:
+- Telegram bot (research, tweet fetching, link analysis)
+- Research sessions (network monitoring, source discovery)
+- Any agent that needs X data
+
+Epimetheus owns this module.
+
+## Available Endpoints (twitterapi.io)
+
+| Endpoint                              | What it does                   | When to use                          |
+|---------------------------------------|--------------------------------|--------------------------------------|
+| GET /tweets?tweet_ids={id}            | Fetch specific tweet(s) by ID  | User drops a link, need full content |
+| GET /article?tweet_id={id}            | Fetch X long-form article      | User drops an article link           |
+| GET /tweet/advanced_search?query={q}  | Search tweets by keyword       | /research command, topic discovery   |
+| GET /user/last_tweets?userName={u}    | Get user's recent tweets       | Network monitoring, agent research   |
+
+## Cost
+
+All endpoints use the X-API-Key header. Pricing is per-request via twitterapi.io.
+Rate limits depend on plan tier. Key at /opt/teleo-eval/secrets/twitterapi-io-key.
+
+## Rate Limiting
+
+Research searches: 3 per user per day (explicit /research).
+Haiku autonomous searches: uncapped (don't burn user budget).
+Tweet fetches (URL lookups): uncapped (cheap, single tweet).
+"""
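+
+# Example raw call (illustrative; get_tweet() below wraps this pattern):
+#
+#   GET https://api.twitterapi.io/twitter/tweets?tweet_ids=1234567890
+#   X-API-Key: <contents of API_KEY_FILE>
+#
+# Tweet-bearing endpoints return JSON with tweet objects under a "tweets" key.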
+
+import logging
+import re
+import time
+from pathlib import Path
+from typing import Optional
+
+import aiohttp
+
+logger = logging.getLogger("x-client")
+
+# ─── Config ──────────────────────────────────────────────────────────────
+
+BASE_URL = "https://api.twitterapi.io/twitter"
+API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key"
+REQUEST_TIMEOUT = 15  # seconds
+
+# Rate limiting for user-triggered research
+_research_usage: dict[int, list[float]] = {}
+MAX_RESEARCH_PER_DAY = 3
+
+
+# ─── API Key ─────────────────────────────────────────────────────────────
+
+def _load_api_key() -> Optional[str]:
+    """Load the twitterapi.io API key from secrets."""
+    try:
+        return Path(API_KEY_FILE).read_text().strip()
+    except Exception:
+        logger.warning("X API key not found at %s", API_KEY_FILE)
+        return None
+
+
+def _headers() -> dict:
+    """Build request headers with the API key; empty dict if the key is missing."""
+    key = _load_api_key()
+    if not key:
+        return {}
+    return {"X-API-Key": key}
+
+
+# ─── Rate Limiting ───────────────────────────────────────────────────────
+
+def check_research_rate_limit(user_id: int) -> bool:
+    """Check if the user has research requests remaining. Returns True if allowed."""
+    now = time.time()
+    times = _research_usage.get(user_id, [])
+    times = [t for t in times if now - t < 86400]  # keep only the last 24h
+    _research_usage[user_id] = times
+    return len(times) < MAX_RESEARCH_PER_DAY
+
+
+def record_research_usage(user_id: int):
+    """Record an explicit research request against the user's daily limit."""
+    _research_usage.setdefault(user_id, []).append(time.time())
+
+
+def get_research_remaining(user_id: int) -> int:
+    """Get remaining research requests for today."""
+    now = time.time()
+    times = [t for t in _research_usage.get(user_id, []) if now - t < 86400]
+    return max(0, MAX_RESEARCH_PER_DAY - len(times))
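+
+
+# Illustrative caller flow (assumed pattern for the explicit /research path;
+# cf. the bot.py changes in this patch):
+#
+#   if check_research_rate_limit(user_id):
+#       tweets = await search_tweets(query, max_results=10)
+#       record_research_usage(user_id)
+#   else:
+#       remaining = get_research_remaining(user_id)  # 0 once the cap is hit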
+ """ + headers = _headers() + if not headers: + return [] + + try: + async with aiohttp.ClientSession() as session: + async with session.get( + f"{BASE_URL}/tweet/advanced_search", + params={"query": query, "queryType": "Latest"}, + headers=headers, + timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT), + ) as resp: + if resp.status >= 400: + logger.warning("search_tweets('%s') → %d", query, resp.status) + return [] + data = await resp.json() + raw_tweets = data.get("tweets", []) + except Exception as e: + logger.warning("search_tweets('%s') error: %s", query, e) + return [] + + results = [] + for tweet in raw_tweets[:max_results * 2]: + normalized = _normalize_tweet(tweet) + if not normalized: + continue + if normalized["text"].startswith("RT @"): + continue + if normalized["engagement"] < min_engagement: + continue + results.append(normalized) + if len(results) >= max_results: + break + + results.sort(key=lambda t: t["engagement"], reverse=True) + return results + + +async def get_user_tweets(username: str, max_results: int = 20) -> list[dict]: + """Get a user's most recent tweets. + + Endpoint: GET /user/last_tweets?userName={username} + + Used by research sessions for network monitoring. + """ + headers = _headers() + if not headers: + return [] + + try: + async with aiohttp.ClientSession() as session: + async with session.get( + f"{BASE_URL}/user/last_tweets", + params={"userName": username}, + headers=headers, + timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT), + ) as resp: + if resp.status >= 400: + logger.warning("get_user_tweets('%s') → %d", username, resp.status) + return [] + data = await resp.json() + raw_tweets = data.get("tweets", []) + except Exception as e: + logger.warning("get_user_tweets('%s') error: %s", username, e) + return [] + + return [_normalize_tweet(t) for t in raw_tweets[:max_results] if _normalize_tweet(t)] + + +# ─── High-Level Functions ──────────────────────────────────────────────── + +async def fetch_from_url(url: str) -> Optional[dict]: + """Fetch tweet or article content from an X URL. + + Tries tweet lookup first (most common), then article endpoint. + Returns structured dict with text, author, engagement. + Returns placeholder dict (not None) on failure so the caller can tell + the user "couldn't fetch" instead of silently ignoring. 
+ """ + match = re.search(r'(?:twitter\.com|x\.com)/(\w+)/status/(\d+)', url) + if not match: + return None + + username = match.group(1) + tweet_id = match.group(2) + + # Try tweet first (most X URLs are tweets) + result = await get_tweet(tweet_id) + if result: + result["url"] = url + return result + + # Try article (X long-form posts) + result = await get_article(tweet_id) + if result: + result["url"] = url + result["author"] = result.get("author") or username + return result + + # Both failed — return placeholder so caller can surface the failure + return { + "text": f"[Could not fetch content from @{username}]", + "url": url, + "author": username, + "author_name": "", + "author_followers": 0, + "engagement": 0, + "tweet_date": "", + "is_article": False, + } + + +# ─── Internal ──────────────────────────────────────────────────────────── + +def _normalize_tweet(raw: dict) -> Optional[dict]: + """Normalize a raw API tweet into a consistent structure.""" + text = raw.get("text", "") + if not text: + return None + + author = raw.get("author", {}) + likes = raw.get("likeCount", 0) or 0 + retweets = raw.get("retweetCount", 0) or 0 + replies = raw.get("replyCount", 0) or 0 + views = raw.get("viewCount", 0) or 0 + + return { + "id": raw.get("id", ""), + "text": text, + "url": raw.get("twitterUrl", raw.get("url", "")), + "author": author.get("userName", "unknown"), + "author_name": author.get("name", ""), + "author_followers": author.get("followers", 0), + "engagement": likes + retweets + replies, + "likes": likes, + "retweets": retweets, + "replies": replies, + "views": views, + "tweet_date": raw.get("createdAt", ""), + "is_reply": bool(raw.get("inReplyToId")), + "is_article": False, + }