#!/usr/bin/env python3
"""X (Twitter) API client for Teleo agents.

Consolidated interface to twitterapi.io.

Used by:
- Telegram bot (research, tweet fetching, link analysis)
- Research sessions (network monitoring, source discovery)
- Any agent that needs X data

Epimetheus owns this module.

## Available Endpoints (twitterapi.io)

| Endpoint | What it does | When to use |
|----------|-------------|-------------|
| GET /tweets?tweet_ids={id} | Fetch specific tweet(s) by ID | User drops a link, need full content |
| GET /article?tweet_id={id} | Fetch X long-form article | User drops an article link |
| GET /tweet/advanced_search?query={q} | Search tweets by keyword | /research command, topic discovery |
| GET /user/last_tweets?userName={u} | Get user's recent tweets | Network monitoring, agent research |

## Cost

All endpoints use the X-API-Key header. Pricing is per-request via twitterapi.io.
Rate limits depend on plan tier. Key at /opt/teleo-eval/secrets/twitterapi-io-key.

## Rate Limiting

Research searches: 3 per user per day (explicit /research).
Haiku autonomous searches: uncapped (don't burn user budget).
Tweet fetches (URL lookups): uncapped (cheap, single tweet).
"""

import logging
import re
import time
from pathlib import Path
from typing import Optional

import aiohttp

logger = logging.getLogger("x-client")

# ─── Config ──────────────────────────────────────────────────────────────

BASE_URL = "https://api.twitterapi.io/twitter"
API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key"
REQUEST_TIMEOUT = 15  # seconds, total per HTTP request

# Rate limiting for user-triggered research.
# Maps Telegram user id → timestamps (time.time()) of explicit /research calls.
_research_usage: dict[int, list[float]] = {}
MAX_RESEARCH_PER_DAY = 3

# ─── API Key ─────────────────────────────────────────────────────────────


def _load_api_key() -> Optional[str]:
    """Load the twitterapi.io API key from secrets.

    Re-read on every call so a rotated key is picked up without restart.
    Returns None (after logging a warning) if the file is missing/unreadable.
    """
    try:
        return Path(API_KEY_FILE).read_text().strip()
    except Exception:
        # Best-effort: any read failure (missing file, perms, bad encoding)
        # degrades to "no key" rather than crashing the caller.
        logger.warning("X API key not found at %s", API_KEY_FILE)
        return None


def _headers() -> dict[str, str]:
    """Build request headers with the API key.

    Returns an empty dict when no key is available — callers treat that
    as "API unavailable" and short-circuit.
    """
    key = _load_api_key()
    if not key:
        return {}
    return {"X-API-Key": key}


# ─── Rate Limiting ───────────────────────────────────────────────────────


def check_research_rate_limit(user_id: int) -> bool:
    """Check if user has research requests remaining. Returns True if allowed.

    Prunes timestamps older than 24h as a side effect, so the per-user
    list stays bounded at MAX_RESEARCH_PER_DAY live entries.
    """
    now = time.time()
    times = _research_usage.get(user_id, [])
    times = [t for t in times if now - t < 86400]  # keep last 24h only
    _research_usage[user_id] = times
    return len(times) < MAX_RESEARCH_PER_DAY


def record_research_usage(user_id: int):
    """Record an explicit research request against user's daily limit."""
    _research_usage.setdefault(user_id, []).append(time.time())


def get_research_remaining(user_id: int) -> int:
    """Get remaining research requests for today (never negative)."""
    now = time.time()
    times = [t for t in _research_usage.get(user_id, []) if now - t < 86400]
    return max(0, MAX_RESEARCH_PER_DAY - len(times))


# ─── Core API Functions ──────────────────────────────────────────────────


async def get_tweet(tweet_id: str) -> Optional[dict]:
    """Fetch a single tweet by ID. Works for any tweet, any age.

    Endpoint: GET /tweets?tweet_ids={id}

    Returns a normalized dict (see _normalize_tweet) or None on any
    failure: missing key, non-200 response, empty result, or network error.
    """
    headers = _headers()
    if not headers:
        return None
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{BASE_URL}/tweets",
                params={"tweet_ids": tweet_id},
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
            ) as resp:
                if resp.status != 200:
                    logger.warning("get_tweet(%s) → %d", tweet_id, resp.status)
                    return None
                data = await resp.json()
                tweets = data.get("tweets", [])
                if not tweets:
                    return None
                return _normalize_tweet(tweets[0])
    except Exception as e:
        # Network/timeout/JSON errors all degrade to None; caller decides
        # whether to surface the failure.
        logger.warning("get_tweet(%s) error: %s", tweet_id, e)
        return None


async def get_article(tweet_id: str) -> Optional[dict]:
    """Fetch an X long-form article by tweet ID.

    Endpoint: GET /article?tweet_id={id}

    Returns structured dict or None if not an article / not found.
    """
    headers = _headers()
    if not headers:
        return None
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{BASE_URL}/article",
                params={"tweet_id": tweet_id},
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
            ) as resp:
                if resp.status != 200:
                    return None
                data = await resp.json()
                article = data.get("article")
                if not article:
                    return None
                return {
                    # Some responses use "text", others "content" — prefer "text".
                    "text": article.get("text", article.get("content", "")),
                    "title": article.get("title", ""),
                    "author": article.get("author", {}).get("userName", ""),
                    "author_name": article.get("author", {}).get("name", ""),
                    "author_followers": article.get("author", {}).get("followers", 0),
                    "tweet_date": article.get("createdAt", ""),
                    "is_article": True,
                    # Articles carry no engagement metrics from this endpoint.
                    "engagement": 0,
                }
    except Exception as e:
        logger.warning("get_article(%s) error: %s", tweet_id, e)
        return None


async def search_tweets(query: str, max_results: int = 20, min_engagement: int = 0) -> list[dict]:
    """Search X for tweets matching a query. Returns most recent, sorted by engagement.

    Endpoint: GET /tweet/advanced_search?query={q}&queryType=Latest

    Use short queries (2-3 words). Long queries return nothing.

    Args:
        query: Search keywords.
        max_results: Cap on returned tweets (after filtering).
        min_engagement: Drop tweets whose likes+retweets+replies fall below this.

    Returns an empty list on any failure (no key, HTTP error, network error).
    """
    headers = _headers()
    if not headers:
        return []
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{BASE_URL}/tweet/advanced_search",
                params={"query": query, "queryType": "Latest"},
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
            ) as resp:
                if resp.status >= 400:
                    logger.warning("search_tweets('%s') → %d", query, resp.status)
                    return []
                data = await resp.json()
                raw_tweets = data.get("tweets", [])
    except Exception as e:
        logger.warning("search_tweets('%s') error: %s", query, e)
        return []

    results = []
    # Scan up to 2x the requested count so filtering (retweets, low
    # engagement) still has a chance to fill max_results slots.
    for tweet in raw_tweets[:max_results * 2]:
        normalized = _normalize_tweet(tweet)
        if not normalized:
            continue
        if normalized["text"].startswith("RT @"):  # skip plain retweets
            continue
        if normalized["engagement"] < min_engagement:
            continue
        results.append(normalized)
        if len(results) >= max_results:
            break

    results.sort(key=lambda t: t["engagement"], reverse=True)
    return results


async def get_user_tweets(username: str, max_results: int = 20) -> list[dict]:
    """Get a user's most recent tweets.

    Endpoint: GET /user/last_tweets?userName={username}

    Used by research sessions for network monitoring.
    Returns an empty list on any failure.
    """
    headers = _headers()
    if not headers:
        return []
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{BASE_URL}/user/last_tweets",
                params={"userName": username},
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
            ) as resp:
                if resp.status >= 400:
                    logger.warning("get_user_tweets('%s') → %d", username, resp.status)
                    return []
                data = await resp.json()
                raw_tweets = data.get("tweets", [])
    except Exception as e:
        logger.warning("get_user_tweets('%s') error: %s", username, e)
        return []

    # Normalize each tweet exactly once (the previous version called
    # _normalize_tweet twice per item — once to filter, once to keep).
    return [
        normalized
        for t in raw_tweets[:max_results]
        if (normalized := _normalize_tweet(t))
    ]


# ─── High-Level Functions ────────────────────────────────────────────────


async def fetch_from_url(url: str) -> Optional[dict]:
    """Fetch tweet or article content from an X URL.

    Tries tweet lookup first (most common), then article endpoint.

    Returns:
        - None if the URL is not an x.com/twitter.com status URL.
        - A structured dict with text, author, engagement on success.
        - A placeholder dict (not None) when both lookups fail, so the
          caller can tell the user "couldn't fetch" instead of silently
          ignoring the link.
    """
    match = re.search(r'(?:twitter\.com|x\.com)/(\w+)/status/(\d+)', url)
    if not match:
        return None
    username = match.group(1)
    tweet_id = match.group(2)

    # Try tweet first (most X URLs are tweets)
    result = await get_tweet(tweet_id)
    if result:
        result["url"] = url
        return result

    # Try article (X long-form posts)
    result = await get_article(tweet_id)
    if result:
        result["url"] = url
        # Article author may be empty; fall back to the handle in the URL.
        result["author"] = result.get("author") or username
        return result

    # Both failed — return placeholder so caller can surface the failure
    return {
        "text": f"[Could not fetch content from @{username}]",
        "url": url,
        "author": username,
        "author_name": "",
        "author_followers": 0,
        "engagement": 0,
        "tweet_date": "",
        "is_article": False,
    }


# ─── Internal ────────────────────────────────────────────────────────────


def _normalize_tweet(raw: dict) -> Optional[dict]:
    """Normalize a raw API tweet into a consistent structure.

    Returns None for tweets with no text (drops them from results).
    "engagement" is likes + retweets + replies (views intentionally
    excluded — they dwarf the other counts).
    """
    text = raw.get("text", "")
    if not text:
        return None

    author = raw.get("author", {})
    # `or 0` guards against explicit nulls in the API response.
    likes = raw.get("likeCount", 0) or 0
    retweets = raw.get("retweetCount", 0) or 0
    replies = raw.get("replyCount", 0) or 0
    views = raw.get("viewCount", 0) or 0

    return {
        "id": raw.get("id", ""),
        "text": text,
        "url": raw.get("twitterUrl", raw.get("url", "")),
        "author": author.get("userName", "unknown"),
        "author_name": author.get("name", ""),
        "author_followers": author.get("followers", 0),
        "engagement": likes + retweets + replies,
        "likes": likes,
        "retweets": retweets,
        "replies": replies,
        "views": views,
        "tweet_date": raw.get("createdAt", ""),
        "is_reply": bool(raw.get("inReplyToId")),
        "is_article": False,
    }