User says "@FutAIrdBot /research P2P.me launch" → bot searches X via twitterapi.io → archives all tweets as ONE consolidated source file in inbox/queue/ → batch extract picks up → claims land in KB. Features (Ganymede+Rhea+Leo+Rio consensus): - Regex + natural language intent detection (not CommandHandler) - One source file per research query (not per-tweet) - Full tweet metadata: author, followers, engagement, date - Contributor attribution: proposed_by + contribution_type: research-direction - Rate limit: 3 searches per user per day - Min engagement filter (3 interactions) - Worktree lock on source file write Phase 2 (not built): domain alignment check before searching. Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
159 lines
4.9 KiB
Python
159 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
|
"""X (Twitter) search client for user-triggered research.
|
|
|
|
Searches X via twitterapi.io, filters for relevance, returns structured tweet data.
|
|
Used by the Telegram bot's /research command.
|
|
|
|
Epimetheus owns this module.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import aiohttp
|
|
|
|
# Module logger; the name is a fixed string rather than derived from __name__.
logger = logging.getLogger("x-search")

# twitterapi.io advanced-search endpoint and the file its API key is read from.
API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key"
API_URL = "https://api.twitterapi.io/twitter/tweet/advanced_search"

# Rate limiting: 3 research queries per user per day.
# The usage table is an in-memory, module-level dict.
MAX_RESEARCH_PER_DAY = 3
_research_usage: dict[int, list[float]] = {}  # user_id → [timestamps]
|
|
|
|
|
|
def _load_api_key() -> str | None:
    """Read the twitterapi.io key from API_KEY_FILE.

    Returns the file contents with surrounding whitespace stripped, or
    None (after logging a warning) if the file cannot be read for any
    reason. An empty file yields an empty string, not None.
    """
    try:
        contents = Path(API_KEY_FILE).read_text()
    except Exception:
        # Best effort: callers treat a missing key as "search unavailable".
        logger.warning("Twitter API key not found at %s", API_KEY_FILE)
        return None
    return contents.strip()
|
|
|
|
|
|
def check_research_rate_limit(user_id: int) -> bool:
    """Check if user has research requests remaining. Returns True if allowed.

    Side effect: timestamps older than 24 hours are pruned from this
    user's entry in the module-level usage table. When nothing recent
    remains the entry is removed entirely, so the table does not
    accumulate an empty list for every user id that was ever checked.

    This function only checks; it does not count the current request.
    """
    now = time.time()
    # Keep only requests made within the last 24 hours (86400 seconds).
    recent = [t for t in _research_usage.get(user_id, []) if now - t < 86400]
    if recent:
        _research_usage[user_id] = recent
    else:
        _research_usage.pop(user_id, None)
    return len(recent) < MAX_RESEARCH_PER_DAY
|
|
|
|
|
|
def record_research_usage(user_id: int):
    """Append the current time to the user's list of research requests.

    Creates the user's entry in the usage table if it does not exist yet.
    """
    stamps = _research_usage.get(user_id)
    if stamps is None:
        stamps = []
        _research_usage[user_id] = stamps
    stamps.append(time.time())
|
|
|
|
|
|
def get_research_remaining(user_id: int) -> int:
    """Return how many research requests the user may still make.

    Counts the user's recorded requests from the last 24 hours and
    subtracts them from MAX_RESEARCH_PER_DAY; never returns less than 0.
    Read-only: the usage table is not modified.
    """
    now = time.time()
    used = sum(1 for stamp in _research_usage.get(user_id, []) if now - stamp < 86400)
    remaining = MAX_RESEARCH_PER_DAY - used
    return remaining if remaining > 0 else 0
|
|
|
|
|
|
def _structure_tweet(tweet: dict, min_engagement: int) -> dict | None:
    """Convert one raw API tweet into the result shape, or None if filtered out.

    Every field is read null-safely: a key that is present with a null
    value is treated the same as a missing key.
    """
    text = tweet.get("text") or ""

    # Skip pure retweets (no original text)
    if text.startswith("RT @"):
        return None

    # Engagement filter
    likes = tweet.get("likeCount") or 0
    retweets = tweet.get("retweetCount") or 0
    replies = tweet.get("replyCount") or 0
    engagement = likes + retweets + replies
    if engagement < min_engagement:
        return None

    author = tweet.get("author")
    if not isinstance(author, dict):
        author = {}

    return {
        "text": text,
        "url": tweet.get("twitterUrl") or tweet.get("url") or "",
        "author": author.get("userName") or "unknown",
        "author_name": author.get("name") or "",
        "author_followers": author.get("followers") or 0,
        "engagement": engagement,
        "likes": likes,
        "retweets": retweets,
        "replies": replies,
        "tweet_date": tweet.get("createdAt") or "",
        "is_reply": bool(tweet.get("inReplyToId")),
    }


async def search_x(query: str, max_results: int = 20, min_engagement: int = 3) -> list[dict]:
    """Search X for tweets matching query. Returns structured tweet data.

    Sends one "Latest" advanced-search request to API_URL with a
    15-second total timeout. Only the first ``2 * max_results`` tweets of
    that single response are considered. Pure retweets and tweets whose
    likes + retweets + replies are below ``min_engagement`` are dropped,
    and collection stops once ``max_results`` tweets have been kept.

    Args:
        query: Search string passed to the API unchanged.
        max_results: Maximum number of tweets to return; values <= 0
            return an empty list without calling the API.
        min_engagement: Minimum combined likes + retweets + replies.

    Returns:
        A list of dicts with keys text, url, author, author_name,
        author_followers, engagement, likes, retweets, replies,
        tweet_date and is_reply, sorted by engagement (highest first).
        Returns an empty list when the API key is unavailable, the
        request fails, the API answers with status >= 400, or the
        response has an unexpected shape. This function does not raise
        for those conditions.
    """
    if max_results <= 0:
        return []

    key = _load_api_key()
    if not key:
        return []

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                API_URL,
                params={"query": query, "queryType": "Latest"},
                headers={"X-API-Key": key},
                timeout=aiohttp.ClientTimeout(total=15),
            ) as resp:
                if resp.status >= 400:
                    logger.warning("X search API → %d for query: %s", resp.status, query)
                    return []
                data = await resp.json()
    except Exception as e:
        # Deliberately broad: search is best-effort and must not crash the bot.
        logger.warning("X search error: %s", e)
        return []

    if not isinstance(data, dict):
        logger.warning("X search error: unexpected response type %s", type(data).__name__)
        return []

    # "tweets" may be missing or null; either way there is nothing to filter.
    tweets = data.get("tweets") or []
    if not isinstance(tweets, list):
        logger.warning("X search error: unexpected tweets type %s", type(tweets).__name__)
        return []

    # Filter and structure results
    results = []
    for tweet in tweets[:max_results * 2]:  # Look at up to 2x, filter down
        if not isinstance(tweet, dict):
            continue
        structured = _structure_tweet(tweet, min_engagement)
        if structured is None:
            continue
        results.append(structured)
        if len(results) >= max_results:
            break

    # Sort by engagement (highest first)
    results.sort(key=lambda t: t["engagement"], reverse=True)
    return results
|
|
|
|
|
|
def _frontmatter_text(value: object) -> str:
    """Make *value* safe to place between double quotes on one front-matter line."""
    # Collapse newlines, tabs and runs of spaces so the value stays on one line.
    single_line = " ".join(str(value).split())
    # Backslash starts an escape sequence inside a YAML double-quoted scalar,
    # and a double quote would end the scalar early.
    return single_line.replace("\\", "\\\\").replace('"', "'")


def format_tweet_as_source(tweet: dict, query: str, submitted_by: str) -> str:
    """Format a tweet as a source file for inbox/queue/.

    Produces a markdown document with a YAML front-matter header followed
    by the tweet text and its engagement numbers. Every value written
    inside double quotes in the header (title, url, author, proposed_by,
    research_query, tweet_date) is reduced to a single line, has
    backslashes escaped and has double quotes replaced by single quotes.
    The tweet text in the body is written unmodified.

    Args:
        tweet: Dict that must contain "text", "url" and "author"; the
            keys author_followers, engagement, tweet_date, likes,
            retweets and replies are optional.
        query: The research query that produced this tweet.
        submitted_by: Written to the ``proposed_by`` header field.

    Returns:
        The complete file contents as a string.

    Raises:
        KeyError: If "text", "url" or "author" is missing from *tweet*.
    """
    from datetime import date

    text = tweet["text"]
    author = tweet["author"]

    # The title uses only the first 80 characters of the tweet.
    title = _frontmatter_text(f"X post by @{author}: {text[:80]}")
    header_author = _frontmatter_text(author)
    url = _frontmatter_text(tweet["url"])
    proposed_by = _frontmatter_text(submitted_by)
    research_query = _frontmatter_text(query)
    tweet_date = _frontmatter_text(tweet.get("tweet_date", ""))

    followers = tweet.get("author_followers", 0)
    engagement = tweet.get("engagement", 0)
    likes = tweet.get("likes", 0)
    retweets = tweet.get("retweets", 0)
    replies = tweet.get("replies", 0)

    return f"""---
type: source
source_type: x-post
title: "{title}"
url: "{url}"
author: "@{header_author}"
date: {date.today().isoformat()}
domain: internet-finance
format: social-media
status: unprocessed
proposed_by: "{proposed_by}"
contribution_type: research-direction
research_query: "{research_query}"
tweet_author: "@{header_author}"
tweet_author_followers: {followers}
tweet_engagement: {engagement}
tweet_date: "{tweet_date}"
tags: [x-research, telegram-research]
---

## Tweet by @{author}

{text}

---

Engagement: {likes} likes, {retweets} retweets, {replies} replies
Author followers: {followers}
"""
|