diff --git a/telegram/bot.py b/telegram/bot.py index 0a34dad..7584e93 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -44,6 +44,7 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context from market_data import get_token_price, format_price_context from worktree_lock import main_worktree_lock +from x_search import search_x, format_tweet_as_source, check_research_rate_limit, record_research_usage, get_research_remaining # ─── Config ───────────────────────────────────────────────────────────── @@ -282,6 +283,93 @@ def _format_conversation_history(chat_id: int, user_id: int) -> str: return "\n".join(lines) +# Research intent patterns (Rhea: explicit /research + natural language fallback) +RESEARCH_PATTERN = re.compile(r'/research\s+(.+)', re.IGNORECASE) +RESEARCH_NATURAL = re.compile(r'(?:research|search\s+(?:x\s+)?(?:for\s+)?|look\s+up)\s+(.+)', re.IGNORECASE) + + +async def handle_research(msg, query: str, user): + """Handle a research request — search X and archive results as sources.""" + username = user.username if user else "unknown" + + if not check_research_rate_limit(user.id if user else 0): + remaining = get_research_remaining(user.id if user else 0) + await msg.reply_text(f"Research limit reached (3/day). Resets at midnight UTC. {remaining} remaining.") + return + + await msg.chat.send_action("typing") + + tweets = await search_x(query, max_results=15, min_engagement=3) + if not tweets: + await msg.reply_text(f"No recent tweets found for '{query}'.") + return + + # Archive all tweets as ONE source file per research query + # (not per-tweet — one extraction PR produces claims from the best material) + try: + with main_worktree_lock(timeout=10): + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-") + filename = f"{date_str}-x-research-{slug}.md" + source_path = Path(ARCHIVE_DIR) / "inbox" / "queue" / filename + source_path.parent.mkdir(parents=True, exist_ok=True) + + # Build consolidated source file + tweets_body = "" + for i, tweet in enumerate(tweets, 1): + tweets_body += f"\n### Tweet {i} — @{tweet['author']} ({tweet.get('engagement', 0)} engagement)\n" + tweets_body += f"**URL:** {tweet.get('url', '')}\n" + tweets_body += f"**Followers:** {tweet.get('author_followers', 0)} | " + tweets_body += f"**Likes:** {tweet.get('likes', 0)} | **RT:** {tweet.get('retweets', 0)}\n\n" + tweets_body += f"{tweet['text']}\n" + + source_content = f"""--- +type: source +source_type: x-research +title: "X research: {query}" +url: "" +author: "multiple" +date: {date_str} +domain: internet-finance +format: social-media-collection +status: unprocessed +proposed_by: "@{username}" +contribution_type: research-direction +research_query: "{query.replace('"', "'")}" +tweet_count: {len(tweets)} +tags: [x-research, telegram-research] +--- + +# X Research: {query} + +Submitted by @{username} via Telegram /research command. +{len(tweets)} tweets found, sorted by engagement. + +{tweets_body} +""" + source_path.write_text(source_content) + archived = len(tweets) + + _git_commit_archive(source_path, filename) + except TimeoutError: + logger.warning("Research archive failed: worktree lock timeout") + except Exception as e: + logger.warning("Research archive failed: %s", e) + + record_research_usage(user.id if user else 0) + remaining = get_research_remaining(user.id if user else 0) + + # Summary of what was found + top_authors = list(set(t["author"] for t in tweets[:5])) + await msg.reply_text( + f"Queued {archived} tweets about '{query}' for extraction. " + f"Top voices: @{', @'.join(top_authors[:3])}. " + f"Results will appear in the KB within ~30 minutes. " + f"({remaining} research requests remaining today.)" + ) + logger.info("Research: @%s queried '%s', archived %d tweets", username, query, archived) + + # ─── Message Handlers ─────────────────────────────────────────────────── @@ -358,6 +446,14 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100]) + # Check for explicit /research command (Rhea: fast path before LLM) + research_match = RESEARCH_PATTERN.search(text) or RESEARCH_NATURAL.search(text) + if research_match: + query = research_match.group(1).strip() + # Respond with KB knowledge first, then trigger research async (Ganymede: don't block) + await handle_research(msg, query, user) + # Don't return — continue to normal response so user gets immediate KB answer too + # Send typing indicator await msg.chat.send_action("typing") diff --git a/telegram/x_search.py b/telegram/x_search.py new file mode 100644 index 0000000..2efa246 --- /dev/null +++ b/telegram/x_search.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +"""X (Twitter) search client for user-triggered research. + +Searches X via twitterapi.io, filters for relevance, returns structured tweet data. +Used by the Telegram bot's /research command. + +Epimetheus owns this module. +""" + +import logging +import time +from pathlib import Path + +import aiohttp + +logger = logging.getLogger("x-search") + +API_URL = "https://api.twitterapi.io/twitter/tweet/advanced_search" +API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key" + +# Rate limiting: 3 research queries per user per day +_research_usage: dict[int, list[float]] = {} # user_id → [timestamps] +MAX_RESEARCH_PER_DAY = 3 + + +def _load_api_key() -> str | None: + try: + return Path(API_KEY_FILE).read_text().strip() + except Exception: + logger.warning("Twitter API key not found at %s", API_KEY_FILE) + return None + + +def check_research_rate_limit(user_id: int) -> bool: + """Check if user has research requests remaining. Returns True if allowed.""" + now = time.time() + times = _research_usage.get(user_id, []) + # Prune entries older than 24h + times = [t for t in times if now - t < 86400] + _research_usage[user_id] = times + return len(times) < MAX_RESEARCH_PER_DAY + + +def record_research_usage(user_id: int): + """Record a research request for rate limiting.""" + _research_usage.setdefault(user_id, []).append(time.time()) + + +def get_research_remaining(user_id: int) -> int: + """Get remaining research requests for today.""" + now = time.time() + times = [t for t in _research_usage.get(user_id, []) if now - t < 86400] + return max(0, MAX_RESEARCH_PER_DAY - len(times)) + + +async def search_x(query: str, max_results: int = 20, min_engagement: int = 3) -> list[dict]: + """Search X for tweets matching query. Returns structured tweet data. + + Filters: recent tweets, min engagement threshold, skip pure retweets. + """ + key = _load_api_key() + if not key: + return [] + + try: + async with aiohttp.ClientSession() as session: + async with session.get( + API_URL, + params={"query": query, "queryType": "Latest"}, + headers={"X-API-Key": key}, + timeout=aiohttp.ClientTimeout(total=15), + ) as resp: + if resp.status >= 400: + logger.warning("X search API → %d for query: %s", resp.status, query) + return [] + data = await resp.json() + tweets = data.get("tweets", []) + except Exception as e: + logger.warning("X search error: %s", e) + return [] + + # Filter and structure results + results = [] + for tweet in tweets[:max_results * 2]: # Fetch more, filter down + text = tweet.get("text", "") + author = tweet.get("author", {}) + + # Skip pure retweets (no original text) + if text.startswith("RT @"): + continue + + # Engagement filter + likes = tweet.get("likeCount", 0) or 0 + retweets = tweet.get("retweetCount", 0) or 0 + replies = tweet.get("replyCount", 0) or 0 + engagement = likes + retweets + replies + + if engagement < min_engagement: + continue + + results.append({ + "text": text, + "url": tweet.get("twitterUrl", tweet.get("url", "")), + "author": author.get("userName", "unknown"), + "author_name": author.get("name", ""), + "author_followers": author.get("followers", 0), + "engagement": engagement, + "likes": likes, + "retweets": retweets, + "replies": replies, + "tweet_date": tweet.get("createdAt", ""), + "is_reply": bool(tweet.get("inReplyToId")), + }) + + if len(results) >= max_results: + break + + # Sort by engagement (highest first) + results.sort(key=lambda t: t["engagement"], reverse=True) + return results + + +def format_tweet_as_source(tweet: dict, query: str, submitted_by: str) -> str: + """Format a tweet as a source file for inbox/queue/.""" + import re + from datetime import date + + slug = re.sub(r"[^a-z0-9]+", "-", tweet["text"][:50].lower()).strip("-") + author = tweet["author"] + + return f"""--- +type: source +source_type: x-post +title: "X post by @{author}: {tweet['text'][:80].replace('"', "'")}" +url: "{tweet['url']}" +author: "@{author}" +date: {date.today().isoformat()} +domain: internet-finance +format: social-media +status: unprocessed +proposed_by: "{submitted_by}" +contribution_type: research-direction +research_query: "{query.replace('"', "'")}" +tweet_author: "@{author}" +tweet_author_followers: {tweet.get('author_followers', 0)} +tweet_engagement: {tweet.get('engagement', 0)} +tweet_date: "{tweet.get('tweet_date', '')}" +tags: [x-research, telegram-research] +--- + +## Tweet by @{author} + +{tweet['text']} + +--- + +Engagement: {tweet.get('likes', 0)} likes, {tweet.get('retweets', 0)} retweets, {tweet.get('replies', 0)} replies +Author followers: {tweet.get('author_followers', 0)} +"""