#!/usr/bin/env python3
"""X (Twitter) search client for user-triggered research.

Searches X via twitterapi.io, filters for relevance, returns structured
tweet data. Used by the Telegram bot's /research command.

Epimetheus owns this module.
"""

import logging
import re
import time
from datetime import date
from pathlib import Path

import aiohttp

logger = logging.getLogger("x-search")

API_URL = "https://api.twitterapi.io/twitter/tweet/advanced_search"
API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key"

# Rate limiting: 3 research queries per user per day
_research_usage: dict[int, list[float]] = {}  # user_id → [request timestamps]
MAX_RESEARCH_PER_DAY = 3
# Single rolling 24h window shared by all rate-limit helpers (was the magic
# number 86400 duplicated in two functions).
_WINDOW_SECONDS = 86400

# Matches twitter.com / x.com status URLs; groups: (username, tweet_id).
# Compiled once at import time instead of re-importing/re-building per call.
_TWEET_URL_RE = re.compile(r"(?:twitter\.com|x\.com)/(\w+)/status/(\d+)")


def _load_api_key() -> str | None:
    """Read the twitterapi.io API key from disk; return None if unavailable."""
    try:
        return Path(API_KEY_FILE).read_text().strip()
    except Exception:
        # A missing key is an expected deployment state — warn, don't crash;
        # callers treat None as "feature disabled".
        logger.warning("Twitter API key not found at %s", API_KEY_FILE)
        return None


def check_research_rate_limit(user_id: int) -> bool:
    """Check if user has research requests remaining. Returns True if allowed."""
    now = time.time()
    times = _research_usage.get(user_id, [])
    # Prune entries older than 24h and persist the pruned list so a user's
    # history cannot grow without bound.
    times = [t for t in times if now - t < _WINDOW_SECONDS]
    _research_usage[user_id] = times
    return len(times) < MAX_RESEARCH_PER_DAY


def record_research_usage(user_id: int) -> None:
    """Record a research request for rate limiting."""
    _research_usage.setdefault(user_id, []).append(time.time())


def get_research_remaining(user_id: int) -> int:
    """Get remaining research requests for today (never negative)."""
    now = time.time()
    # Read-only view of the same rolling window used by
    # check_research_rate_limit(); does not mutate stored state.
    times = [t for t in _research_usage.get(user_id, []) if now - t < _WINDOW_SECONDS]
    return max(0, MAX_RESEARCH_PER_DAY - len(times))


async def search_x(query: str, max_results: int = 20, min_engagement: int = 3) -> list[dict]:
    """Search X for tweets matching query. Returns structured tweet data.

    Filters: recent tweets, min engagement threshold, skip pure retweets.
    Returns [] on missing API key, HTTP error, or transport failure
    (best-effort by design — the bot degrades gracefully).
    """
    key = _load_api_key()
    if not key:
        return []

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                API_URL,
                params={"query": query, "queryType": "Latest"},
                headers={"X-API-Key": key},
                timeout=aiohttp.ClientTimeout(total=15),
            ) as resp:
                if resp.status >= 400:
                    logger.warning("X search API → %d for query: %s", resp.status, query)
                    return []
                data = await resp.json()
                tweets = data.get("tweets", [])
    except Exception as e:
        logger.warning("X search error: %s", e)
        return []

    # Filter and structure results. Scan up to 2x the requested count so the
    # engagement/retweet filters still leave enough results.
    results = []
    for tweet in tweets[: max_results * 2]:
        text = tweet.get("text", "")
        author = tweet.get("author", {})

        # Skip pure retweets (no original text)
        if text.startswith("RT @"):
            continue

        # Engagement filter; `or 0` guards against explicit None counts.
        likes = tweet.get("likeCount", 0) or 0
        retweets = tweet.get("retweetCount", 0) or 0
        replies = tweet.get("replyCount", 0) or 0
        engagement = likes + retweets + replies
        if engagement < min_engagement:
            continue

        results.append({
            "text": text,
            "url": tweet.get("twitterUrl", tweet.get("url", "")),
            "author": author.get("userName", "unknown"),
            "author_name": author.get("name", ""),
            "author_followers": author.get("followers", 0),
            "engagement": engagement,
            "likes": likes,
            "retweets": retweets,
            "replies": replies,
            "tweet_date": tweet.get("createdAt", ""),
            "is_reply": bool(tweet.get("inReplyToId")),
        })
        if len(results) >= max_results:
            break

    # Sort by engagement (highest first)
    results.sort(key=lambda t: t["engagement"], reverse=True)
    return results


def format_tweet_as_source(tweet: dict, query: str, submitted_by: str) -> str:
    """Format a tweet as a source file for inbox/queue/.

    Produces a markdown document with YAML front matter. Double quotes in
    quoted scalars are softened to single quotes.
    NOTE(review): a newline inside the first 80 chars of tweet text would
    still break the front matter — confirm upstream text is single-line.
    """
    # (A slug was previously computed here via re.sub but never used — removed.)
    author = tweet["author"]
    return f"""---
type: source
source_type: x-post
title: "X post by @{author}: {tweet['text'][:80].replace('"', "'")}"
url: "{tweet['url']}"
author: "@{author}"
date: {date.today().isoformat()}
domain: internet-finance
format: social-media
status: unprocessed
proposed_by: "{submitted_by}"
contribution_type: research-direction
research_query: "{query.replace('"', "'")}"
tweet_author: "@{author}"
tweet_author_followers: {tweet.get('author_followers', 0)}
tweet_engagement: {tweet.get('engagement', 0)}
tweet_date: "{tweet.get('tweet_date', '')}"
tags: [x-research, telegram-research]
---

## Tweet by @{author}

{tweet['text']}

---
Engagement: {tweet.get('likes', 0)} likes, {tweet.get('retweets', 0)} retweets, {tweet.get('replies', 0)} replies
Author followers: {tweet.get('author_followers', 0)}
"""


async def fetch_tweet_by_url(url: str) -> dict | None:
    """Fetch a specific tweet/article by X URL.

    Extracts username and tweet ID, tries the direct tweet-lookup endpoint,
    then falls back to the long-form article endpoint (tweet/detail doesn't
    work with this API provider). Returns a structured dict, a placeholder
    dict when both endpoints fail (Ganymede convention: surface failure),
    or None for an unparseable URL, missing API key, or transport error.
    """
    match = _TWEET_URL_RE.search(url)
    if not match:
        return None
    username = match.group(1)
    tweet_id = match.group(2)

    key = _load_api_key()
    if not key:
        return None

    try:
        async with aiohttp.ClientSession() as session:
            # Primary: direct tweet lookup by ID (works for any tweet, any age)
            async with session.get(
                "https://api.twitterapi.io/twitter/tweets",
                params={"tweet_ids": tweet_id},
                headers={"X-API-Key": key},
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    tweets = data.get("tweets", [])
                    if tweets:
                        tweet = tweets[0]
                        author_data = tweet.get("author", {})
                        likes = tweet.get("likeCount", 0) or 0
                        retweets = tweet.get("retweetCount", 0) or 0
                        replies = tweet.get("replyCount", 0) or 0
                        return {
                            "text": tweet.get("text", ""),
                            "url": url,
                            "author": author_data.get("userName", username),
                            "author_name": author_data.get("name", ""),
                            "author_followers": author_data.get("followers", 0),
                            # Count replies too, for parity with search_x()'s
                            # engagement metric (was likes + retweets only).
                            "engagement": likes + retweets + replies,
                            "likes": tweet.get("likeCount", 0),
                            "retweets": tweet.get("retweetCount", 0),
                            "views": tweet.get("viewCount", 0),
                            "tweet_date": tweet.get("createdAt", ""),
                            "is_article": False,
                        }

            # Fallback: try article endpoint (for X long-form articles)
            async with session.get(
                "https://api.twitterapi.io/twitter/article",
                params={"tweet_id": tweet_id},
                headers={"X-API-Key": key},
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    article = data.get("article")
                    if article:
                        return {
                            "text": article.get("text", article.get("content", "")),
                            "url": url,
                            "author": username,
                            "author_name": article.get("author", {}).get("name", ""),
                            "author_followers": article.get("author", {}).get("followers", 0),
                            "engagement": 0,
                            "tweet_date": article.get("createdAt", ""),
                            "is_article": True,
                            "title": article.get("title", ""),
                        }

            # Both failed — return placeholder (Ganymede: surface failure)
            return {
                "text": f"[Could not fetch tweet content from @{username}]",
                "url": url,
                "author": username,
                "author_name": "",
                "author_followers": 0,
                "engagement": 0,
                "tweet_date": "",
                "is_article": False,
            }
    except Exception as e:
        logger.warning("Tweet fetch error for %s: %s", url, e)
        return None