epimetheus: /research command — user-triggered X search from Telegram

User says "@FutAIrdBot /research P2P.me launch" → bot searches X via twitterapi.io
→ archives all tweets as ONE consolidated source file in inbox/queue/ → batch extract
picks up → claims land in KB.

Features (Ganymede+Rhea+Leo+Rio consensus):
- Regex + natural language intent detection (not CommandHandler)
- One source file per research query (not per-tweet)
- Full tweet metadata: author, followers, engagement, date
- Contributor attribution: proposed_by + contribution_type: research-direction
- Rate limit: 3 searches per user per day
- Min engagement filter (3 interactions)
- Worktree lock on source file write

Phase 2 (not built): domain alignment check before searching.

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
This commit is contained in:
m3taversal 2026-03-21 16:32:43 +00:00
parent e921eda0a0
commit f7d30ced1a
2 changed files with 255 additions and 0 deletions

View file

@ -44,6 +44,7 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context
from market_data import get_token_price, format_price_context
from worktree_lock import main_worktree_lock
from x_search import search_x, format_tweet_as_source, check_research_rate_limit, record_research_usage, get_research_remaining
# ─── Config ─────────────────────────────────────────────────────────────
@ -282,6 +283,93 @@ def _format_conversation_history(chat_id: int, user_id: int) -> str:
return "\n".join(lines)
# Research intent patterns (Rhea: explicit /research + natural language fallback)
RESEARCH_PATTERN = re.compile(r'/research\s+(.+)', re.IGNORECASE)
RESEARCH_NATURAL = re.compile(r'(?:research|search\s+(?:x\s+)?(?:for\s+)?|look\s+up)\s+(.+)', re.IGNORECASE)
async def handle_research(msg, query: str, user):
"""Handle a research request — search X and archive results as sources."""
username = user.username if user else "unknown"
if not check_research_rate_limit(user.id if user else 0):
remaining = get_research_remaining(user.id if user else 0)
await msg.reply_text(f"Research limit reached (3/day). Resets at midnight UTC. {remaining} remaining.")
return
await msg.chat.send_action("typing")
tweets = await search_x(query, max_results=15, min_engagement=3)
if not tweets:
await msg.reply_text(f"No recent tweets found for '{query}'.")
return
# Archive all tweets as ONE source file per research query
# (not per-tweet — one extraction PR produces claims from the best material)
try:
with main_worktree_lock(timeout=10):
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
filename = f"{date_str}-x-research-{slug}.md"
source_path = Path(ARCHIVE_DIR) / "inbox" / "queue" / filename
source_path.parent.mkdir(parents=True, exist_ok=True)
# Build consolidated source file
tweets_body = ""
for i, tweet in enumerate(tweets, 1):
tweets_body += f"\n### Tweet {i} — @{tweet['author']} ({tweet.get('engagement', 0)} engagement)\n"
tweets_body += f"**URL:** {tweet.get('url', '')}\n"
tweets_body += f"**Followers:** {tweet.get('author_followers', 0)} | "
tweets_body += f"**Likes:** {tweet.get('likes', 0)} | **RT:** {tweet.get('retweets', 0)}\n\n"
tweets_body += f"{tweet['text']}\n"
source_content = f"""---
type: source
source_type: x-research
title: "X research: {query}"
url: ""
author: "multiple"
date: {date_str}
domain: internet-finance
format: social-media-collection
status: unprocessed
proposed_by: "@{username}"
contribution_type: research-direction
research_query: "{query.replace('"', "'")}"
tweet_count: {len(tweets)}
tags: [x-research, telegram-research]
---
# X Research: {query}
Submitted by @{username} via Telegram /research command.
{len(tweets)} tweets found, sorted by engagement.
{tweets_body}
"""
source_path.write_text(source_content)
archived = len(tweets)
_git_commit_archive(source_path, filename)
except TimeoutError:
logger.warning("Research archive failed: worktree lock timeout")
except Exception as e:
logger.warning("Research archive failed: %s", e)
record_research_usage(user.id if user else 0)
remaining = get_research_remaining(user.id if user else 0)
# Summary of what was found
top_authors = list(set(t["author"] for t in tweets[:5]))
await msg.reply_text(
f"Queued {archived} tweets about '{query}' for extraction. "
f"Top voices: @{', @'.join(top_authors[:3])}. "
f"Results will appear in the KB within ~30 minutes. "
f"({remaining} research requests remaining today.)"
)
logger.info("Research: @%s queried '%s', archived %d tweets", username, query, archived)
# ─── Message Handlers ───────────────────────────────────────────────────
@ -358,6 +446,14 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])
# Check for explicit /research command (Rhea: fast path before LLM)
research_match = RESEARCH_PATTERN.search(text) or RESEARCH_NATURAL.search(text)
if research_match:
query = research_match.group(1).strip()
# Respond with KB knowledge first, then trigger research async (Ganymede: don't block)
await handle_research(msg, query, user)
# Don't return — continue to normal response so user gets immediate KB answer too
# Send typing indicator
await msg.chat.send_action("typing")

159
telegram/x_search.py Normal file
View file

@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""X (Twitter) search client for user-triggered research.
Searches X via twitterapi.io, filters for relevance, returns structured tweet data.
Used by the Telegram bot's /research command.
Epimetheus owns this module.
"""
import logging
import time
from pathlib import Path
import aiohttp
logger = logging.getLogger("x-search")
API_URL = "https://api.twitterapi.io/twitter/tweet/advanced_search"
API_KEY_FILE = "/opt/teleo-eval/secrets/twitterapi-io-key"
# Rate limiting: 3 research queries per user per day
_research_usage: dict[int, list[float]] = {} # user_id → [timestamps]
MAX_RESEARCH_PER_DAY = 3
def _load_api_key() -> str | None:
try:
return Path(API_KEY_FILE).read_text().strip()
except Exception:
logger.warning("Twitter API key not found at %s", API_KEY_FILE)
return None
def check_research_rate_limit(user_id: int) -> bool:
"""Check if user has research requests remaining. Returns True if allowed."""
now = time.time()
times = _research_usage.get(user_id, [])
# Prune entries older than 24h
times = [t for t in times if now - t < 86400]
_research_usage[user_id] = times
return len(times) < MAX_RESEARCH_PER_DAY
def record_research_usage(user_id: int):
"""Record a research request for rate limiting."""
_research_usage.setdefault(user_id, []).append(time.time())
def get_research_remaining(user_id: int) -> int:
"""Get remaining research requests for today."""
now = time.time()
times = [t for t in _research_usage.get(user_id, []) if now - t < 86400]
return max(0, MAX_RESEARCH_PER_DAY - len(times))
async def search_x(query: str, max_results: int = 20, min_engagement: int = 3) -> list[dict]:
"""Search X for tweets matching query. Returns structured tweet data.
Filters: recent tweets, min engagement threshold, skip pure retweets.
"""
key = _load_api_key()
if not key:
return []
try:
async with aiohttp.ClientSession() as session:
async with session.get(
API_URL,
params={"query": query, "queryType": "Latest"},
headers={"X-API-Key": key},
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status >= 400:
logger.warning("X search API → %d for query: %s", resp.status, query)
return []
data = await resp.json()
tweets = data.get("tweets", [])
except Exception as e:
logger.warning("X search error: %s", e)
return []
# Filter and structure results
results = []
for tweet in tweets[:max_results * 2]: # Fetch more, filter down
text = tweet.get("text", "")
author = tweet.get("author", {})
# Skip pure retweets (no original text)
if text.startswith("RT @"):
continue
# Engagement filter
likes = tweet.get("likeCount", 0) or 0
retweets = tweet.get("retweetCount", 0) or 0
replies = tweet.get("replyCount", 0) or 0
engagement = likes + retweets + replies
if engagement < min_engagement:
continue
results.append({
"text": text,
"url": tweet.get("twitterUrl", tweet.get("url", "")),
"author": author.get("userName", "unknown"),
"author_name": author.get("name", ""),
"author_followers": author.get("followers", 0),
"engagement": engagement,
"likes": likes,
"retweets": retweets,
"replies": replies,
"tweet_date": tweet.get("createdAt", ""),
"is_reply": bool(tweet.get("inReplyToId")),
})
if len(results) >= max_results:
break
# Sort by engagement (highest first)
results.sort(key=lambda t: t["engagement"], reverse=True)
return results
def format_tweet_as_source(tweet: dict, query: str, submitted_by: str) -> str:
"""Format a tweet as a source file for inbox/queue/."""
import re
from datetime import date
slug = re.sub(r"[^a-z0-9]+", "-", tweet["text"][:50].lower()).strip("-")
author = tweet["author"]
return f"""---
type: source
source_type: x-post
title: "X post by @{author}: {tweet['text'][:80].replace('"', "'")}"
url: "{tweet['url']}"
author: "@{author}"
date: {date.today().isoformat()}
domain: internet-finance
format: social-media
status: unprocessed
proposed_by: "{submitted_by}"
contribution_type: research-direction
research_query: "{query.replace('"', "'")}"
tweet_author: "@{author}"
tweet_author_followers: {tweet.get('author_followers', 0)}
tweet_engagement: {tweet.get('engagement', 0)}
tweet_date: "{tweet.get('tweet_date', '')}"
tags: [x-research, telegram-research]
---
## Tweet by @{author}
{tweet['text']}
---
Engagement: {tweet.get('likes', 0)} likes, {tweet.get('retweets', 0)} retweets, {tweet.get('replies', 0)} replies
Author followers: {tweet.get('author_followers', 0)}
"""