epimetheus: Haiku pre-pass for auto-research (Option A)
Before Opus responds, Haiku evaluates: "Does this message need an X search?"
If YES, searches X, injects results into Opus prompt, archives as source.
Opus responds with KB knowledge + fresh tweet data combined.
Flow: user asks naturally ("what are people saying about P2P?") → Haiku
decides search needed → X search → results in Opus context → unified response.
~1s latency, ~$0.001 cost per message. Only fires when Haiku says YES.
Explicit /research command still works as direct path.
Also: fixed systemd ProtectSystem paths (Ganymede: root cause of all
write failures). Fixed research regex for Telegram group commands.
Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
This commit is contained in:
parent
5388f701bd
commit
7086bcacb1
1 changed files with 112 additions and 35 deletions
147
telegram/bot.py
147
telegram/bot.py
|
|
@ -272,8 +272,8 @@ def _format_conversation_history(chat_id: int, user_id: int) -> str:
|
|||
|
||||
|
||||
# Research intent patterns (Rhea: explicit /research + natural language fallback)
|
||||
# Explicit /research command only — no natural language detection (too many false positives)
|
||||
RESEARCH_PATTERN = re.compile(r'/research\s+(.+)', re.IGNORECASE)
|
||||
# Telegram appends @botname to commands in groups (Ganymede: /research@FutAIrdBot query)
|
||||
RESEARCH_PATTERN = re.compile(r'/research(?:@\w+)?\s+(.+)', re.IGNORECASE)
|
||||
|
||||
|
||||
async def handle_research(msg, query: str, user):
|
||||
|
|
@ -287,7 +287,9 @@ async def handle_research(msg, query: str, user):
|
|||
|
||||
await msg.chat.send_action("typing")
|
||||
|
||||
logger.info("Research: searching X for '%s'", query)
|
||||
tweets = await search_x(query, max_results=15, min_engagement=3)
|
||||
logger.info("Research: got %d tweets for '%s'", len(tweets), query)
|
||||
if not tweets:
|
||||
await msg.reply_text(f"No recent tweets found for '{query}'.")
|
||||
return
|
||||
|
|
@ -295,23 +297,23 @@ async def handle_research(msg, query: str, user):
|
|||
# Archive all tweets as ONE source file per research query
|
||||
# (not per-tweet — one extraction PR produces claims from the best material)
|
||||
try:
|
||||
with main_worktree_lock(timeout=10):
|
||||
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
|
||||
filename = f"{date_str}-x-research-{slug}.md"
|
||||
source_path = Path(ARCHIVE_DIR) / filename
|
||||
source_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Write to staging dir (outside worktree — no read-only errors)
|
||||
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
|
||||
filename = f"{date_str}-x-research-{slug}.md"
|
||||
source_path = Path(ARCHIVE_DIR) / filename
|
||||
source_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Build consolidated source file
|
||||
tweets_body = ""
|
||||
for i, tweet in enumerate(tweets, 1):
|
||||
tweets_body += f"\n### Tweet {i} — @{tweet['author']} ({tweet.get('engagement', 0)} engagement)\n"
|
||||
tweets_body += f"**URL:** {tweet.get('url', '')}\n"
|
||||
tweets_body += f"**Followers:** {tweet.get('author_followers', 0)} | "
|
||||
tweets_body += f"**Likes:** {tweet.get('likes', 0)} | **RT:** {tweet.get('retweets', 0)}\n\n"
|
||||
tweets_body += f"{tweet['text']}\n"
|
||||
# Build consolidated source file
|
||||
tweets_body = ""
|
||||
for i, tweet in enumerate(tweets, 1):
|
||||
tweets_body += f"\n### Tweet {i} — @{tweet['author']} ({tweet.get('engagement', 0)} engagement)\n"
|
||||
tweets_body += f"**URL:** {tweet.get('url', '')}\n"
|
||||
tweets_body += f"**Followers:** {tweet.get('author_followers', 0)} | "
|
||||
tweets_body += f"**Likes:** {tweet.get('likes', 0)} | **RT:** {tweet.get('retweets', 0)}\n\n"
|
||||
tweets_body += f"{tweet['text']}\n"
|
||||
|
||||
source_content = f"""---
|
||||
source_content = f"""---
|
||||
type: source
|
||||
source_type: x-research
|
||||
title: "X research: {query}"
|
||||
|
|
@ -335,12 +337,9 @@ Submitted by @{username} via Telegram /research command.
|
|||
|
||||
{tweets_body}
|
||||
"""
|
||||
source_path.write_text(source_content)
|
||||
archived = len(tweets)
|
||||
|
||||
_git_commit_archive(source_path, filename)
|
||||
except TimeoutError:
|
||||
logger.warning("Research archive failed: worktree lock timeout")
|
||||
source_path.write_text(source_content)
|
||||
archived = len(tweets)
|
||||
logger.info("Research archived: %s (%d tweets)", filename, archived)
|
||||
except Exception as e:
|
||||
logger.warning("Research archive failed: %s", e)
|
||||
|
||||
|
|
@ -447,17 +446,81 @@ async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|||
|
||||
logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])
|
||||
|
||||
# Check for explicit /research command (Rhea: fast path before LLM)
|
||||
# Check for /research command — run search BEFORE Opus so results are in context
|
||||
research_context = ""
|
||||
research_match = RESEARCH_PATTERN.search(text)
|
||||
if research_match:
|
||||
query = research_match.group(1).strip()
|
||||
# Respond with KB knowledge first, then trigger research async (Ganymede: don't block)
|
||||
await handle_research(msg, query, user)
|
||||
# Don't return — continue to normal response so user gets immediate KB answer too
|
||||
logger.info("Research: searching X for '%s'", query)
|
||||
from x_search import search_x, format_tweet_as_source, check_research_rate_limit, record_research_usage
|
||||
if check_research_rate_limit(user.id if user else 0):
|
||||
tweets = await search_x(query, max_results=10, min_engagement=0)
|
||||
logger.info("Research: got %d tweets for '%s'", len(tweets), query)
|
||||
if tweets:
|
||||
# Archive as source file (staging dir)
|
||||
try:
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
|
||||
filename = f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}-x-research-{slug}.md"
|
||||
source_path = Path(ARCHIVE_DIR) / filename
|
||||
tweets_body = "\n".join(
|
||||
f"@{t['author']} ({t.get('engagement',0)} eng): {t['text'][:200]}"
|
||||
for t in tweets[:10]
|
||||
)
|
||||
source_path.write_text(f"---\ntype: source\nsource_type: x-research\ntitle: \"X research: {query}\"\ndate: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}\ndomain: internet-finance\nstatus: unprocessed\nproposed_by: \"@{user.username if user else 'unknown'}\"\ncontribution_type: research-direction\n---\n\n{tweets_body}\n")
|
||||
logger.info("Research archived: %s", filename)
|
||||
except Exception as e:
|
||||
logger.warning("Research archive failed: %s", e)
|
||||
|
||||
# Build context for Opus prompt
|
||||
research_context = f"\n## Fresh X Research Results for '{query}'\n"
|
||||
for t in tweets[:7]:
|
||||
research_context += f"- @{t['author']}: {t['text'][:150]}\n"
|
||||
record_research_usage(user.id if user else 0)
|
||||
# Strip the /research command from text so Opus responds to the topic, not the command
|
||||
text = re.sub(r'/research(?:@\w+)?\s+', '', text).strip()
|
||||
if not text:
|
||||
text = query
|
||||
|
||||
# Send typing indicator
|
||||
await msg.chat.send_action("typing")
|
||||
|
||||
# Haiku pre-pass: does this message need an X search? (Option A: two-pass)
|
||||
if not research_context: # Skip if /research already ran
|
||||
try:
|
||||
haiku_prompt = (
|
||||
f"Does this Telegram message need a live X/Twitter search to answer well? "
|
||||
f"Only say YES if the user is asking about recent sentiment, community takes, "
|
||||
f"what people are saying, or emerging discussions that wouldn't be in a knowledge base.\n\n"
|
||||
f"Message: {text}\n\n"
|
||||
f"Respond with ONLY one of:\n"
|
||||
f"YES: [search query]\n"
|
||||
f"NO"
|
||||
)
|
||||
haiku_result = await call_openrouter("anthropic/claude-haiku-4.5", haiku_prompt, max_tokens=50)
|
||||
if haiku_result and haiku_result.strip().upper().startswith("YES:"):
|
||||
search_query = haiku_result.strip()[4:].strip()
|
||||
logger.info("Haiku pre-pass: research needed — '%s'", search_query)
|
||||
from x_search import search_x, check_research_rate_limit, record_research_usage
|
||||
if check_research_rate_limit(user.id if user else 0):
|
||||
tweets = await search_x(search_query, max_results=10, min_engagement=0)
|
||||
logger.info("Haiku research: got %d tweets", len(tweets))
|
||||
if tweets:
|
||||
research_context = f"\n## Fresh X Research Results for '{search_query}'\n"
|
||||
for t in tweets[:7]:
|
||||
research_context += f"- @{t['author']}: {t['text'][:150]}\n"
|
||||
record_research_usage(user.id if user else 0)
|
||||
# Archive as source
|
||||
try:
|
||||
slug = re.sub(r"[^a-z0-9]+", "-", search_query[:60].lower()).strip("-")
|
||||
filename = f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}-x-research-{slug}.md"
|
||||
source_path = Path(ARCHIVE_DIR) / filename
|
||||
tweets_body = "\n".join(f"@{t['author']}: {t['text'][:200]}" for t in tweets[:10])
|
||||
source_path.write_text(f"---\ntype: source\nsource_type: x-research\ntitle: \"X research: {search_query}\"\ndate: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}\ndomain: internet-finance\nstatus: unprocessed\nproposed_by: \"@{user.username if user else 'unknown'}\"\ncontribution_type: research-direction\n---\n\n{tweets_body}\n")
|
||||
except Exception as e:
|
||||
logger.warning("Haiku research archive failed: %s", e)
|
||||
except Exception as e:
|
||||
logger.warning("Haiku pre-pass failed: %s", e)
|
||||
|
||||
# Retrieve full KB context (entity resolution + claim search + agent positions)
|
||||
kb_ctx = retrieve_context(text, KB_READ_DIR, index=kb_index)
|
||||
kb_context_text = format_context_for_prompt(kb_ctx)
|
||||
|
|
@ -513,6 +576,8 @@ Write like a sharp analyst talking to peers, not like an AI. Specifically:
|
|||
|
||||
{f"## Live Market Data{chr(10)}{market_context}" if market_context else ""}
|
||||
|
||||
{research_context}
|
||||
|
||||
## Conversation History (NEVER ask a question your history already answers)
|
||||
{_format_conversation_history(msg.chat_id, user.id if user else 0)}
|
||||
|
||||
|
|
@ -522,10 +587,14 @@ Message: {text}
|
|||
|
||||
Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you — only respond when tagged or replied to.
|
||||
|
||||
IMPORTANT: If you learn something from this exchange — a correction, a new rule, new data — append a LEARNING line at the very end of your response. Format:
|
||||
LEARNING: [category] [what you learned]
|
||||
Categories: factual, communication, structured_data
|
||||
Only when you genuinely learned something. Most responses have NO learning line."""
|
||||
IMPORTANT: Two special tags you can append at the end of your response (after your main text):
|
||||
|
||||
1. If you learn something: LEARNING: [category] [what you learned]
|
||||
Categories: factual, communication, structured_data
|
||||
Only when genuinely learned something. Most responses have none.
|
||||
|
||||
2. If the user would benefit from an X search on a topic: RESEARCH: [search query]
|
||||
This triggers an automatic X search. Use when the user asks about recent sentiment, community takes, or emerging discussions. Only when a search would genuinely help."""
|
||||
|
||||
# Call Opus
|
||||
response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024)
|
||||
|
|
@ -534,18 +603,26 @@ Only when you genuinely learned something. Most responses have NO learning line.
|
|||
await msg.reply_text("Processing error — I'll get back to you.")
|
||||
return
|
||||
|
||||
# Parse LEARNING lines before posting (Rhea: zero-cost self-write trigger)
|
||||
# Parse LEARNING and RESEARCH tags before posting
|
||||
display_response = response
|
||||
|
||||
# Auto-learning (Rhea: zero-cost self-write trigger)
|
||||
learning_lines = re.findall(r'^LEARNING:\s*(factual|communication|structured_data)\s+(.+)$',
|
||||
response, re.MULTILINE)
|
||||
if learning_lines:
|
||||
# Strip LEARNING lines from displayed response
|
||||
display_response = re.sub(r'\nLEARNING:\s*\S+\s+.+$', '', response, flags=re.MULTILINE).rstrip()
|
||||
# Save each learning
|
||||
display_response = re.sub(r'\nLEARNING:\s*\S+\s+.+$', '', display_response, flags=re.MULTILINE).rstrip()
|
||||
for category, correction in learning_lines:
|
||||
_save_learning(correction.strip(), category.strip())
|
||||
logger.info("Auto-learned [%s]: %s", category, correction[:80])
|
||||
|
||||
# Auto-research (Ganymede: LLM-driven research trigger)
|
||||
research_lines = re.findall(r'^RESEARCH:\s+(.+)$', response, re.MULTILINE)
|
||||
if research_lines:
|
||||
display_response = re.sub(r'\nRESEARCH:\s+.+$', '', display_response, flags=re.MULTILINE).rstrip()
|
||||
for query in research_lines:
|
||||
asyncio.get_event_loop().create_task(handle_research(msg, query.strip(), user))
|
||||
logger.info("Auto-research triggered: %s", query[:80])
|
||||
|
||||
# Post response (without LEARNING lines)
|
||||
await msg.reply_text(display_response)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue