teleo-infrastructure/telegram/bot.py
m3taversal 0759655688 fix: process all URLs in a message, not just the first
When a user shared two X links in one message (sjdedic + knimkar),
only the first got a standalone source. Now processes up to 5 URLs
per message, each getting its own standalone source file.

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
2026-03-25 13:21:26 +00:00

1258 lines
53 KiB
Python

#!/usr/bin/env python3
"""Teleo Telegram Bot — Rio as analytical agent in community groups.
Architecture:
- Always-on ingestion: captures all messages, batch triage every N minutes
- Tag-based response: Opus-quality KB-grounded responses when @tagged
- Conversation-window triage: identifies coherent claims across message threads
- Full eval tracing: Rio's responses are logged as KB claims, accountable
Two paths (Ganymede architecture):
- Fast path (read): tag → KB query → Opus response → post to group
- Slow path (write): batch triage → archive to inbox/ → pipeline extracts
Separate systemd service: teleo-telegram.service
Does NOT integrate with pipeline daemon.
Epimetheus owns this module.
"""
import asyncio
import logging
import os
import re
import sqlite3
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
# Add pipeline lib to path for shared modules
sys.path.insert(0, "/opt/teleo-eval/pipeline")
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context
from market_data import get_token_price, format_price_context
from worktree_lock import main_worktree_lock
from x_client import search_tweets, fetch_from_url, check_research_rate_limit, record_research_usage, get_research_remaining
# ─── Config ─────────────────────────────────────────────────────────────
BOT_TOKEN_FILE = "/opt/teleo-eval/secrets/telegram-bot-token"
OPENROUTER_KEY_FILE = "/opt/teleo-eval/secrets/openrouter-key"
PIPELINE_DB = "/opt/teleo-eval/pipeline/pipeline.db"
KB_READ_DIR = "/opt/teleo-eval/workspaces/main" # For KB retrieval (clean main branch)
ARCHIVE_DIR = "/opt/teleo-eval/telegram-archives" # Write outside worktree to avoid read-only errors
MAIN_WORKTREE = "/opt/teleo-eval/workspaces/main" # For git operations only
LEARNINGS_FILE = "/opt/teleo-eval/workspaces/main/agents/rio/learnings.md" # Agent memory (Option D)
LOG_FILE = "/opt/teleo-eval/logs/telegram-bot.log"
# Triage interval (seconds)
TRIAGE_INTERVAL = 900 # 15 minutes
# Models
RESPONSE_MODEL = "anthropic/claude-opus-4-6" # Opus for tagged responses
TRIAGE_MODEL = "anthropic/claude-haiku-4.5" # Haiku for batch triage
# Rate limits
MAX_RESPONSE_PER_USER_PER_HOUR = 30
MIN_MESSAGE_LENGTH = 20 # Skip very short messages
# ─── Logging ────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(),
],
)
logger = logging.getLogger("telegram-bot")
# ─── State ──────────────────────────────────────────────────────────────
# Message buffer for batch triage
message_buffer: list[dict] = []
# Rate limiting
user_response_times: dict[int, list[float]] = defaultdict(list)
# Allowed group IDs (set after first message received, or configure)
allowed_groups: set[int] = set()
# Shared KB index (built once, refreshed on mtime change)
kb_index = KBIndex(KB_READ_DIR)
# Conversation windows — track active conversations per (chat_id, user_id)
# Rhea's model: count unanswered messages, reset on bot response, expire at threshold
CONVERSATION_WINDOW = 5 # expire after 5 unanswered messages
unanswered_count: dict[tuple[int, int], int] = {} # (chat_id, user_id) → unanswered count
# Conversation history — last N exchanges for prompt context (Ganymede: high-value change)
MAX_HISTORY_USER = 5
MAX_HISTORY_CHAT = 30 # Group chats: multiple users, longer threads
conversation_history: dict[tuple[int, int], list[dict]] = {} # (chat_id, user_id) → [{user, bot}]
# ─── Helpers ────────────────────────────────────────────────────────────
def get_db_stats() -> dict:
"""Get basic KB stats from pipeline DB."""
try:
conn = sqlite3.connect(PIPELINE_DB, timeout=5)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA query_only=ON")
merged = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
contributors = conn.execute("SELECT COUNT(*) as n FROM contributors").fetchone()["n"]
conn.close()
return {"merged_claims": merged, "contributors": contributors}
except Exception:
return {"merged_claims": "?", "contributors": "?"}
async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> str | None:
"""Call OpenRouter API."""
import aiohttp
key = Path(OPENROUTER_KEY_FILE).read_text().strip()
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": 0.3,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
json=payload,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
if resp.status >= 400:
logger.error("OpenRouter %s%d", model, resp.status)
return None
data = await resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content")
except Exception as e:
logger.error("OpenRouter error: %s", e)
return None
def is_rate_limited(user_id: int) -> bool:
"""Check if a user has exceeded the response rate limit."""
now = time.time()
times = user_response_times[user_id]
# Prune old entries
times[:] = [t for t in times if now - t < 3600]
return len(times) >= MAX_RESPONSE_PER_USER_PER_HOUR
def sanitize_message(text: str) -> str:
"""Sanitize message content before sending to LLM. (Ganymede: security)"""
# Strip code blocks (potential prompt injection)
text = re.sub(r"```.*?```", "[code block removed]", text, flags=re.DOTALL)
# Strip anything that looks like system instructions
text = re.sub(r"(system:|assistant:|human:|<\|.*?\|>)", "", text, flags=re.IGNORECASE)
# Truncate
return text[:2000]
def _git_commit_archive(archive_path, filename: str):
"""Commit archived source to git so it survives git clean. (Rio review: data loss bug)"""
import subprocess
try:
cwd = MAIN_WORKTREE
subprocess.run(["git", "add", str(archive_path)], cwd=cwd, timeout=10,
capture_output=True, check=False)
result = subprocess.run(
["git", "commit", "-m", f"telegram: archive {filename}\n\n"
"Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>"],
cwd=cwd, timeout=10, capture_output=True, check=False,
)
if result.returncode == 0:
# Push with retry (Ganymede: abort rebase on failure, don't lose the file)
for attempt in range(3):
rebase = subprocess.run(["git", "pull", "--rebase", "origin", "main"],
cwd=cwd, timeout=30, capture_output=True, check=False)
if rebase.returncode != 0:
subprocess.run(["git", "rebase", "--abort"], cwd=cwd, timeout=10,
capture_output=True, check=False)
logger.warning("Git rebase failed for archive %s (attempt %d), aborted", filename, attempt + 1)
continue
push = subprocess.run(["git", "push", "origin", "main"],
cwd=cwd, timeout=30, capture_output=True, check=False)
if push.returncode == 0:
logger.info("Git committed archive: %s", filename)
return
# All retries failed — file is still on filesystem (safety net), commit is uncommitted
logger.warning("Git push failed for archive %s after 3 attempts (file preserved on disk)", filename)
except Exception as e:
logger.warning("Git commit archive failed: %s", e)
def _load_learnings() -> str:
"""Load Rio's learnings file for prompt injection. Sanitized (Ganymede: prompt injection risk).
Dated entries older than 7 days are filtered out (Ganymede: stale learning TTL).
Permanent entries (undated) always included.
"""
try:
raw = Path(LEARNINGS_FILE).read_text()[:4000]
today = datetime.now(timezone.utc).date()
lines = []
for line in raw.split("\n"):
# Check for dated entries [YYYY-MM-DD]
date_match = re.search(r"\[(\d{4}-\d{2}-\d{2})\]", line)
if date_match:
try:
entry_date = datetime.strptime(date_match.group(1), "%Y-%m-%d").date()
if (today - entry_date).days > 7:
continue # stale, skip
except ValueError:
pass
lines.append(line)
return sanitize_message("\n".join(lines))
except Exception:
return ""
def _save_learning(correction: str, category: str = "factual"):
"""Append a learning to staging file. Cron syncs to git (same as archives).
Categories: communication, factual, structured_data
"""
try:
# Write to staging file outside worktree (avoids read-only errors)
staging_file = Path(ARCHIVE_DIR) / "pending-learnings.jsonl"
import json as _json
entry = _json.dumps({"category": category, "correction": correction,
"ts": datetime.now(timezone.utc).isoformat()})
with open(staging_file, "a") as f:
f.write(entry + "\n")
logger.info("Learning staged: [%s] %s", category, correction[:80])
return
except Exception as e:
logger.warning("Learning staging failed: %s", e)
# No fallback — staging is the only write path. Cron syncs to git.
def _compress_history(history: list[dict]) -> str:
"""Extract key context from conversation history — 20 tokens, unmissable (Ganymede)."""
if not history:
return ""
# Combine all text for entity/number extraction
all_text = " ".join(h.get("user", "") + " " + h.get("bot", "") for h in history)
tickers = sorted(set(re.findall(r"\$[A-Z]{2,10}", all_text)))
numbers = re.findall(r"\$[\d,.]+[KMB]?|\d+\.?\d*%", all_text)
parts = []
if tickers:
parts.append(f"Discussing: {', '.join(tickers)}")
if numbers:
parts.append(f"Key figures: {', '.join(numbers[:5])}")
parts.append(f"Exchanges: {len(history)}")
return " | ".join(parts)
def _format_conversation_history(chat_id: int, user_id: int) -> str:
"""Format conversation history with compressed context summary (Ganymede: Option C+A).
In group chats, merges user-specific history with chat-level history
so the bot sees exchanges from other users in the same chat.
"""
user_key = (chat_id, user_id)
chat_key = (chat_id, 0) # chat-level history (all users)
# Merge: chat-level history gives full group context
chat_history = conversation_history.get(chat_key, [])
user_history = conversation_history.get(user_key, [])
# Use chat-level if available (group chats), otherwise user-level (DMs)
history = chat_history if chat_history else user_history
if not history:
return "(No prior conversation)"
# Compressed context first — hard for the model to miss
summary = _compress_history(history)
lines = [summary, ""]
# Full exchange log for reference
for exchange in history:
who = exchange.get("username", "User")
if exchange.get("user"):
lines.append(f"@{who}: {exchange['user']}")
if exchange.get("bot"):
lines.append(f"Rio: {exchange['bot']}")
lines.append("")
return "\n".join(lines)
# Research intent patterns (Rhea: explicit /research + natural language fallback)
# Telegram appends @botname to commands in groups (Ganymede: /research@FutAIrdBot query)
RESEARCH_PATTERN = re.compile(r'/research(?:@\w+)?\s+(.+)', re.IGNORECASE)
async def _research_and_followup(msg, query: str, user):
"""Run X search and send a follow-up message with findings.
Used when Opus triggers RESEARCH: tag — the user expects results back,
not silent archival.
"""
from x_client import search_tweets as _search
logger.info("Research follow-up: searching X for '%s'", query)
tweets = await _search(query, max_results=10, min_engagement=0)
if not tweets:
await msg.reply_text(f"Searched X for '{query}' — nothing recent found.")
return
# Build concise summary of findings
lines = [f"Found {len(tweets)} recent posts about '{query}':\n"]
for t in tweets[:5]:
author = t.get("author", "?")
text = t.get("text", "")[:200]
url = t.get("url", "")
lines.append(f"@{author}: {text}")
if url:
lines.append(f" {url}")
lines.append("")
followup = "\n".join(lines)
# Split if needed
if len(followup) <= 4096:
await msg.reply_text(followup)
else:
chunks = []
remaining = followup
while remaining:
if len(remaining) <= 4096:
chunks.append(remaining)
break
split_at = remaining.rfind("\n\n", 0, 4000)
if split_at == -1:
split_at = remaining.rfind("\n", 0, 4096)
if split_at == -1:
split_at = 4096
chunks.append(remaining[:split_at])
remaining = remaining[split_at:].lstrip("\n")
for chunk in chunks:
if chunk.strip():
await msg.reply_text(chunk)
# Also archive for pipeline
await handle_research(msg, query, user, silent=True)
async def handle_research(msg, query: str, user, silent: bool = False):
"""Handle a research request — search X and archive results as sources.
If silent=True, archive only — no messages posted. Used when triggered
by RESEARCH: tag after Opus already responded.
"""
username = user.username if user else "unknown"
if not silent and not check_research_rate_limit(user.id if user else 0):
remaining = get_research_remaining(user.id if user else 0)
await msg.reply_text(f"Research limit reached (3/day). Resets at midnight UTC. {remaining} remaining.")
return
if not silent:
await msg.chat.send_action("typing")
logger.info("Research: searching X for '%s'", query)
tweets = await search_tweets(query, max_results=15, min_engagement=0)
logger.info("Research: got %d tweets for '%s'", len(tweets), query)
if not tweets:
if not silent:
await msg.reply_text(f"No recent tweets found for '{query}'.")
return
# Fetch full content for top tweets (not just search snippets)
from x_client import fetch_from_url
for i, tweet in enumerate(tweets[:5]): # Top 5 by engagement
if i > 0:
await asyncio.sleep(0.5) # Ganymede: 500ms between calls, polite to Ben's API
url = tweet.get("url", "")
if url:
try:
full_data = await fetch_from_url(url)
if full_data:
# Replace snippet with full text
full_text = full_data.get("text", "")
if full_text and len(full_text) > len(tweet.get("text", "")):
tweet["text"] = full_text
# Include article content if available
contents = full_data.get("contents", [])
if contents:
article_parts = []
for block in contents:
block_text = block.get("text", "")
if not block_text:
continue
block_type = block.get("type", "unstyled")
if block_type in ("header-one", "header-two", "header-three"):
article_parts.append(f"\n## {block_text}\n")
elif block_type == "blockquote":
article_parts.append(f"> {block_text}")
elif block_type == "list-item":
article_parts.append(f"- {block_text}")
else:
article_parts.append(block_text)
if article_parts:
tweet["text"] += "\n\n--- Article Content ---\n" + "\n".join(article_parts)
except Exception as e:
logger.warning("Failed to fetch full content for %s: %s", url, e)
# Archive all tweets as ONE source file per research query
# (not per-tweet — one extraction PR produces claims from the best material)
try:
# Write to staging dir (outside worktree — no read-only errors)
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
filename = f"{date_str}-x-research-{slug}.md"
source_path = Path(ARCHIVE_DIR) / filename
source_path.parent.mkdir(parents=True, exist_ok=True)
# Build consolidated source file
tweets_body = ""
for i, tweet in enumerate(tweets, 1):
tweets_body += f"\n### Tweet {i} — @{tweet['author']} ({tweet.get('engagement', 0)} engagement)\n"
tweets_body += f"**URL:** {tweet.get('url', '')}\n"
tweets_body += f"**Followers:** {tweet.get('author_followers', 0)} | "
tweets_body += f"**Likes:** {tweet.get('likes', 0)} | **RT:** {tweet.get('retweets', 0)}\n\n"
tweets_body += f"{tweet['text']}\n"
source_content = f"""---
type: source
source_type: x-research
title: "X research: {query}"
url: ""
author: "multiple"
date: {date_str}
domain: internet-finance
format: social-media-collection
status: unprocessed
proposed_by: "@{username}"
contribution_type: research-direction
research_query: "{query.replace('"', "'")}"
tweet_count: {len(tweets)}
tags: [x-research, telegram-research]
---
# X Research: {query}
Submitted by @{username} via Telegram /research command.
{len(tweets)} tweets found, sorted by engagement.
{tweets_body}
"""
source_path.write_text(source_content)
archived = len(tweets)
logger.info("Research archived: %s (%d tweets)", filename, archived)
except Exception as e:
logger.warning("Research archive failed: %s", e)
if not silent:
record_research_usage(user.id if user else 0)
remaining = get_research_remaining(user.id if user else 0)
top_authors = list(set(t["author"] for t in tweets[:5]))
await msg.reply_text(
f"Queued {archived} tweets about '{query}' for extraction. "
f"Top voices: @{', @'.join(top_authors[:3])}. "
f"Results will appear in the KB within ~30 minutes. "
f"({remaining} research requests remaining today.)"
)
logger.info("Research: @%s queried '%s', archived %d tweets (silent=%s)", username, query, archived, silent)
# ─── Message Handlers ───────────────────────────────────────────────────
def _is_reply_to_bot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""Check if a message is a reply to one of the bot's own messages."""
msg = update.message
if not msg or not msg.reply_to_message:
return False
replied = msg.reply_to_message
return replied.from_user is not None and replied.from_user.id == context.bot.id
async def handle_reply_to_bot(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle replies to the bot's messages — treat as tagged conversation."""
if not _is_reply_to_bot(update, context):
# Not a reply to us — fall through to buffer handler
await handle_message(update, context)
return
logger.info("Reply to bot from @%s",
update.message.from_user.username if update.message.from_user else "unknown")
await handle_tagged(update, context)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle ALL incoming group messages — buffer for triage."""
if not update.message or not update.message.text:
return
msg = update.message
text = msg.text.strip()
# Skip very short messages
if len(text) < MIN_MESSAGE_LENGTH:
return
# Conversation window behavior depends on chat type (Rio: DMs vs groups)
# DMs: auto-respond (always 1-on-1, no false positives)
# Groups: silent context only (reply-to is the only follow-up trigger)
user = msg.from_user
is_dm = msg.chat.type == "private"
if user:
key = (msg.chat_id, user.id)
if key in unanswered_count:
unanswered_count[key] += 1
if is_dm and unanswered_count[key] < CONVERSATION_WINDOW:
# DM: auto-respond — conversation window fires
logger.info("DM conversation window: @%s msg %d/%d",
user.username or "?", unanswered_count[key], CONVERSATION_WINDOW)
await handle_tagged(update, context)
return
# Group: don't track silent messages in history (Ganymede: Option A)
# History should be the actual conversation, not a log of everything said in the group
# Expire window after CONVERSATION_WINDOW unanswered messages
if unanswered_count[key] >= CONVERSATION_WINDOW:
del unanswered_count[key]
conversation_history.pop(key, None)
logger.info("Conversation window expired for @%s", user.username or "?")
# Buffer for batch triage
message_buffer.append({
"text": sanitize_message(text),
"user_id": msg.from_user.id if msg.from_user else None,
"username": msg.from_user.username if msg.from_user else None,
"display_name": msg.from_user.full_name if msg.from_user else None,
"chat_id": msg.chat_id,
"message_id": msg.message_id,
"timestamp": msg.date.isoformat() if msg.date else datetime.now(timezone.utc).isoformat(),
"reply_to": msg.reply_to_message.message_id if msg.reply_to_message else None,
})
async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle messages that tag the bot — Rio responds with Opus."""
if not update.message or not update.message.text:
return
msg = update.message
user = msg.from_user
text = sanitize_message(msg.text)
# Rate limit check
if user and is_rate_limited(user.id):
await msg.reply_text("I'm processing other requests — try again in a few minutes.")
return
logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])
# Check for /research command — run search BEFORE Opus so results are in context
research_context = ""
research_match = RESEARCH_PATTERN.search(text)
if research_match:
query = research_match.group(1).strip()
logger.info("Research: searching X for '%s'", query)
from x_client import search_tweets, check_research_rate_limit, record_research_usage
if check_research_rate_limit(user.id if user else 0):
tweets = await search_tweets(query, max_results=10, min_engagement=0)
logger.info("Research: got %d tweets for '%s'", len(tweets), query)
if tweets:
# Archive as source file (staging dir)
try:
slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
filename = f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}-x-research-{slug}.md"
source_path = Path(ARCHIVE_DIR) / filename
tweets_body = "\n".join(
f"@{t['author']} ({t.get('engagement',0)} eng): {t['text'][:200]}"
for t in tweets[:10]
)
source_path.write_text(f"---\ntype: source\nsource_type: x-research\ntitle: \"X research: {query}\"\ndate: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}\ndomain: internet-finance\nstatus: unprocessed\nproposed_by: \"@{user.username if user else 'unknown'}\"\ncontribution_type: research-direction\n---\n\n{tweets_body}\n")
logger.info("Research archived: %s", filename)
except Exception as e:
logger.warning("Research archive failed: %s", e)
# Build context for Opus prompt
research_context = f"\n## Fresh X Research Results for '{query}'\n"
for t in tweets[:7]:
research_context += f"- @{t['author']}: {t['text'][:150]}\n"
record_research_usage(user.id if user else 0)
# Strip the /research command from text so Opus responds to the topic, not the command
text = re.sub(r'/research(?:@\w+)?\s+', '', text).strip()
if not text:
text = query
# Send typing indicator
await msg.chat.send_action("typing")
# Fetch any X/Twitter links in the message (tweet or article)
x_link_context = ""
x_urls = re.findall(r'https?://(?:twitter\.com|x\.com)/\w+/status/\d+', text)
if x_urls:
from x_client import fetch_from_url
for url in x_urls[:3]: # Cap at 3 links
try:
tweet_data = await fetch_from_url(url)
if tweet_data:
x_link_context += f"\n## Linked Tweet by @{tweet_data['author']}\n"
if tweet_data.get("title"):
x_link_context += f"Title: {tweet_data['title']}\n"
x_link_context += f"{tweet_data['text'][:500]}\n"
x_link_context += f"Engagement: {tweet_data.get('engagement', 0)} | URL: {url}\n"
logger.info("Fetched X link: @%s%s", tweet_data['author'], tweet_data['text'][:60])
except Exception as e:
logger.warning("Failed to fetch X link %s: %s", url, e)
# Haiku pre-pass: does this message need an X search? (Option A: two-pass)
if not research_context: # Skip if /research already ran
try:
haiku_prompt = (
f"Does this Telegram message need a live X/Twitter search to answer well? "
f"Only say YES if the user is asking about recent sentiment, community takes, "
f"what people are saying, or emerging discussions.\n\n"
f"Message: {text}\n\n"
f"If YES, provide a SHORT search query (2-3 words max, like 'P2P.me' or 'MetaDAO buyback'). "
f"Twitter search works best with simple queries — too many words returns nothing.\n\n"
f"Respond with ONLY one of:\n"
f"YES: [2-3 word query]\n"
f"NO"
)
haiku_result = await call_openrouter("anthropic/claude-haiku-4.5", haiku_prompt, max_tokens=50)
if haiku_result and haiku_result.strip().upper().startswith("YES:"):
search_query = haiku_result.strip()[4:].strip()
logger.info("Haiku pre-pass: research needed — '%s'", search_query)
from x_client import search_tweets, check_research_rate_limit, record_research_usage
if check_research_rate_limit(user.id if user else 0):
tweets = await search_tweets(search_query, max_results=10, min_engagement=0)
logger.info("Haiku research: got %d tweets", len(tweets))
if tweets:
research_context = f"\n## LIVE X Search Results (you just searched for '{search_query}' — cite these directly)\n"
for t in tweets[:7]:
research_context += f"- @{t['author']}: {t['text'][:200]}\n"
# Don't burn user's rate limit on autonomous searches (Ganymede)
# Archive as source
try:
slug = re.sub(r"[^a-z0-9]+", "-", search_query[:60].lower()).strip("-")
filename = f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}-x-research-{slug}.md"
source_path = Path(ARCHIVE_DIR) / filename
tweets_body = "\n".join(f"@{t['author']}: {t['text'][:200]}" for t in tweets[:10])
source_path.write_text(f"---\ntype: source\nsource_type: x-research\ntitle: \"X research: {search_query}\"\ndate: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}\ndomain: internet-finance\nstatus: unprocessed\nproposed_by: \"@{user.username if user else 'unknown'}\"\ncontribution_type: research-direction\n---\n\n{tweets_body}\n")
except Exception as e:
logger.warning("Haiku research archive failed: %s", e)
except Exception as e:
logger.warning("Haiku pre-pass failed: %s", e)
# Retrieve full KB context (entity resolution + claim search + agent positions)
kb_ctx = retrieve_context(text, KB_READ_DIR, index=kb_index)
kb_context_text = format_context_for_prompt(kb_ctx)
stats = get_db_stats()
# Fetch live market data for any tokens mentioned (Rhea: market-data API)
market_context = ""
token_mentions = re.findall(r"\$([A-Z]{2,10})", text.upper())
# Entity name → token mapping for natural language mentions
ENTITY_TOKEN_MAP = {
"omnipair": "OMFG", "metadao": "META", "sanctum": "CLOUD",
"drift": "DRIFT", "ore": "ORE", "jupiter": "JUP",
}
text_lower = text.lower()
for name, ticker in ENTITY_TOKEN_MAP.items():
if name in text_lower:
token_mentions.append(ticker)
# Also check entity matches from KB retrieval
for ent in kb_ctx.entities:
for tag in ent.tags:
if tag.upper() in ENTITY_TOKEN_MAP.values():
token_mentions.append(tag.upper())
for token in set(token_mentions):
try:
data = await get_token_price(token)
if data:
price_str = format_price_context(data, token)
if price_str:
market_context += price_str + "\n"
except Exception:
pass # Market data is supplementary — never block on failure
# Build Opus prompt — Rio's voice
prompt = f"""You are Rio, the Teleo internet finance agent. Your Telegram handle is @FutAIrdBot — that IS you. Users tag @FutAIrdBot to reach you. Never say "I'm not FutAIrdBot." You are also @futaRdIO on X. You have deep knowledge about futarchy, prediction markets, token governance, and the MetaDAO ecosystem.
## How to sound
Write like a sharp analyst talking to peers, not like an AI. Specifically:
- Use your knowledge naturally. Don't say "the KB tracks" or "at experimental confidence" or "our claims show." Just state what you know and how confident you are in plain language.
- Have a take. You're an analyst, not a summarizer. Say what you actually think.
- Before you respond, ask yourself: "Does every sentence here add something the user doesn't already know?" If a sentence just restates context, agrees without adding insight, or pads with filler — cut it. Your goal is signal density, not word count.
- Short questions deserve short answers. If someone asks a factual question, give the fact. Don't surround it with caveats, context, and "the honest picture is" framing.
- Long answers are fine when the question is genuinely complex or the user asks for depth. But earn every paragraph — each one should contain a distinct insight the previous one didn't cover.
- Match the user's energy. If they wrote one line, respond in kind.
- Sound human. No em dashes, no "That said", no "It's worth noting." Just say the thing.
- No markdown. Plain text only.
- When you're uncertain, just say so simply. "I'm not sure about X" beats "we don't have data on this yet."
## Your learnings (corrections from past conversations — prioritize these over KB data when they conflict)
{_load_learnings()}
## What you know about this topic
{kb_context_text}
{f"## Live Market Data{chr(10)}{market_context}" if market_context else ""}
{research_context}
{x_link_context}
## Conversation History (NEVER ask a question your history already answers)
{_format_conversation_history(msg.chat_id, user.id if user else 0)}
## The message you're responding to
From: @{user.username if user else 'unknown'}
Message: {text}
Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you — only respond when tagged or replied to.
IMPORTANT: Two special tags you can append at the end of your response (after your main text):
1. If you learn something: LEARNING: [category] [what you learned]
Categories: factual, communication, structured_data
Only when genuinely learned something. Most responses have none.
NEVER save a learning about what data you do or don't have access to. Your knowledge base changes constantly — availability learnings become stale immediately.
2. If the user would benefit from an X search on a topic: RESEARCH: [search query]
This triggers an automatic X search. Use when the user asks about recent sentiment, community takes, or emerging discussions. Only when a search would genuinely help."""
# Call Opus
response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024)
if not response:
await msg.reply_text("Processing error — I'll get back to you.")
return
# Parse LEARNING and RESEARCH tags before posting
display_response = response
# Auto-learning (Rhea: zero-cost self-write trigger)
learning_lines = re.findall(r'^LEARNING:\s*(factual|communication|structured_data)\s+(.+)$',
response, re.MULTILINE)
if learning_lines:
display_response = re.sub(r'\nLEARNING:\s*\S+\s+.+$', '', display_response, flags=re.MULTILINE).rstrip()
for category, correction in learning_lines:
_save_learning(correction.strip(), category.strip())
logger.info("Auto-learned [%s]: %s", category, correction[:80])
# Auto-research (Ganymede: LLM-driven research trigger)
# Skip if Haiku pre-pass already searched (prevents double-fire + duplicate "No tweets found" messages)
research_lines = re.findall(r'^RESEARCH:\s+(.+)$', response, re.MULTILINE)
if research_lines:
display_response = re.sub(r'\nRESEARCH:\s+.+$', '', display_response, flags=re.MULTILINE).rstrip()
if not research_context: # Only fire if Haiku didn't already search
for query in research_lines:
# Send follow-up with findings (not silent — user expects results)
asyncio.get_event_loop().create_task(
_research_and_followup(msg, query.strip(), user))
logger.info("Auto-research triggered (will follow up): %s", query[:80])
# Post response (without LEARNING lines)
# Telegram has a 4096 char limit — split long messages
if len(display_response) <= 4096:
await msg.reply_text(display_response)
else:
# Split on paragraph boundaries where possible
chunks = []
remaining = display_response
while remaining:
if len(remaining) <= 4096:
chunks.append(remaining)
break
# Find a good split point (paragraph break near 4000 chars)
split_at = remaining.rfind("\n\n", 0, 4000)
if split_at == -1:
split_at = remaining.rfind("\n", 0, 4096)
if split_at == -1:
split_at = 4096
chunks.append(remaining[:split_at])
remaining = remaining[split_at:].lstrip("\n")
for chunk in chunks:
if chunk.strip():
await msg.reply_text(chunk)
# Update conversation state: reset window, store history (Ganymede+Rhea)
if user:
username = user.username or "anonymous"
key = (msg.chat_id, user.id)
unanswered_count[key] = 0 # reset — conversation alive
entry = {"user": text[:500], "bot": response[:500], "username": username}
# Per-user history
history = conversation_history.setdefault(key, [])
history.append(entry)
if len(history) > MAX_HISTORY_USER:
history.pop(0)
# Chat-level history (group context — all users visible)
chat_key = (msg.chat_id, 0)
chat_history = conversation_history.setdefault(chat_key, [])
chat_history.append(entry)
if len(chat_history) > MAX_HISTORY_CHAT:
chat_history.pop(0)
# Record rate limit
if user:
user_response_times[user.id].append(time.time())
# Log the exchange for audit trail
logger.info("Rio responded to @%s (msg_id=%d)", user.username if user else "?", msg.message_id)
# Detect and fetch URLs for pipeline ingestion (all URLs, not just first)
urls = _extract_urls(text)
url_content = None
for url in urls[:5]: # Cap at 5 URLs per message
logger.info("Fetching URL: %s", url)
content = await _fetch_url_content(url)
if content:
logger.info("Fetched %d chars from %s", len(content), url)
if url_content is None:
url_content = content # First URL's content for conversation archive
_archive_standalone_source(url, content, user)
# Archive the exchange as a source for pipeline (slow path)
_archive_exchange(text, response, user, msg, url_content=url_content, urls=urls)
def _archive_standalone_source(url: str, content: str, user):
"""Create a standalone source file for a URL shared in Telegram.
Separate from the conversation archive — this is the actual article/tweet
entering the extraction pipeline as a proper source, attributed to the
contributor who shared it. Ganymede: keep pure (no Rio analysis), two
source_types (x-tweet vs x-article).
"""
try:
username = user.username if user else "anonymous"
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
# Extract author from URL or content
author = "unknown"
author_match = re.search(r"x\.com/(\w+)/", url) or re.search(r"twitter\.com/(\w+)/", url)
if author_match:
author = f"@{author_match.group(1)}"
# Distinguish tweet vs article (Ganymede: different extraction behavior)
is_article = "--- Article Content ---" in content and len(content) > 1000
source_type = "x-article" if is_article else "x-tweet"
fmt = "article" if is_article else "social-media"
slug = re.sub(r"[^a-z0-9]+", "-", f"{author}-{url.split('/')[-1][:30]}".lower()).strip("-")
filename = f"{date_str}-tg-shared-{slug}.md"
source_path = Path(ARCHIVE_DIR) / filename
# Don't overwrite if already archived
if source_path.exists():
return
source_content = f"""---
type: source
source_type: {source_type}
title: "{author} — shared via Telegram by @{username}"
author: "{author}"
url: "{url}"
date: {date_str}
domain: internet-finance
format: {fmt}
status: unprocessed
proposed_by: "@{username}"
contribution_type: source-submission
tags: [telegram-shared, {source_type}]
---
# {author}{'Article' if is_article else 'Tweet/Thread'}
Shared by @{username} via Telegram.
Source URL: {url}
## Content
{content}
"""
source_path.write_text(source_content)
logger.info("Standalone source archived: %s (shared by @%s)", filename, username)
except Exception as e:
logger.warning("Failed to archive standalone source %s: %s", url, e)
async def _fetch_url_content(url: str) -> str | None:
"""Fetch article/page content from a URL for pipeline ingestion.
For X/Twitter URLs, uses Ben's API (x_client.fetch_from_url) which returns
structured article content. For other URLs, falls back to raw HTTP fetch.
"""
# X/Twitter URLs → use x_client for structured content
if "x.com/" in url or "twitter.com/" in url:
try:
from x_client import fetch_from_url
data = await fetch_from_url(url)
if not data:
logger.warning("x_client returned no data for %s", url)
return None
# Format structured content
parts = []
# Tweet text
tweet_text = data.get("text", "")
if tweet_text:
parts.append(tweet_text)
# Article content (contents[] array with typed blocks)
contents = data.get("contents", [])
if contents:
parts.append("\n--- Article Content ---\n")
for block in contents:
block_type = block.get("type", "unstyled")
block_text = block.get("text", "")
if not block_text:
continue
if block_type in ("header-one", "header-two", "header-three"):
parts.append(f"\n## {block_text}\n")
elif block_type == "blockquote":
parts.append(f"> {block_text}")
elif block_type == "list-item":
parts.append(f"- {block_text}")
else:
parts.append(block_text)
result = "\n".join(parts)
return result[:10000] if result else None
except Exception as e:
logger.warning("x_client fetch failed for %s: %s", url, e)
return None
# Non-X URLs → raw HTTP fetch with HTML stripping
import aiohttp
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status >= 400:
return None
html = await resp.text()
text = re.sub(r"<script.*?</script>", "", html, flags=re.DOTALL)
text = re.sub(r"<style.*?</style>", "", text, flags=re.DOTALL)
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text).strip()
return text[:10000]
except Exception as e:
logger.warning("Failed to fetch URL %s: %s", url, e)
return None
def _extract_urls(text: str) -> list[str]:
"""Extract URLs from message text."""
return re.findall(r"https?://[^\s<>\"']+", text)
def _archive_exchange(user_text: str, rio_response: str, user, msg,
url_content: str | None = None, urls: list[str] | None = None):
"""Archive a tagged exchange to inbox/queue/ for pipeline processing."""
try:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
username = user.username if user else "anonymous"
slug = re.sub(r"[^a-z0-9]+", "-", user_text[:50].lower()).strip("-")
filename = f"{date_str}-telegram-{username}-{slug}.md"
archive_path = Path(ARCHIVE_DIR) / filename
archive_path.parent.mkdir(parents=True, exist_ok=True)
# Extract rationale (the user's text minus the @mention and URL)
rationale = re.sub(r"@\w+", "", user_text).strip()
for url in (urls or []):
rationale = rationale.replace(url, "").strip()
# Determine priority — directed contribution with rationale gets high priority
priority = "high" if rationale and len(rationale) > 20 else "medium"
intake_tier = "directed" if rationale and len(rationale) > 20 else "undirected"
url_section = ""
if url_content:
url_section = f"\n## Article Content (fetched)\n\n{url_content[:8000]}\n"
content = f"""---
type: source
source_type: telegram
title: "Telegram: @{username}{slug}"
author: "@{username}"
url: "{urls[0] if urls else ''}"
date: {date_str}
domain: internet-finance
format: conversation
status: unprocessed
priority: {priority}
intake_tier: {intake_tier}
rationale: "{rationale[:200]}"
proposed_by: "@{username}"
tags: [telegram, ownership-community]
---
## Conversation
**@{username}:**
{user_text}
**Rio (response):**
{rio_response}
{url_section}
## Agent Notes
**Why archived:** Tagged exchange in ownership community.
**Rationale from contributor:** {rationale if rationale else 'No rationale provided (bare link or question)'}
**Intake tier:** {intake_tier}{'fast-tracked, contributor provided reasoning' if intake_tier == 'directed' else 'standard processing'}
**Triage:** Conversation may contain [CLAIM], [ENTITY], or [EVIDENCE] for extraction.
"""
# Write to telegram-archives/ (outside worktree — no read-only errors)
# A cron moves files into inbox/queue/ and commits them
archive_path.write_text(content)
logger.info("Archived exchange to %s (tier: %s, urls: %d)",
filename, intake_tier, len(urls or []))
except Exception as e:
logger.error("Failed to archive exchange: %s", e)
# ─── Batch Triage ───────────────────────────────────────────────────────
async def run_batch_triage(context: ContextTypes.DEFAULT_TYPE):
"""Batch triage of buffered messages every TRIAGE_INTERVAL seconds.
Groups messages into conversation windows, sends to Haiku for classification,
archives substantive findings.
"""
global message_buffer
if not message_buffer:
return
# Grab and clear buffer
messages = message_buffer[:]
message_buffer = []
logger.info("Batch triage: %d messages to process", len(messages))
# Group into conversation windows (messages within 5 min of each other)
windows = _group_into_windows(messages, window_seconds=300)
if not windows:
return
# Build triage prompt
windows_text = ""
for i, window in enumerate(windows):
window_msgs = "\n".join(
f" @{m.get('username', '?')}: {m['text'][:200]}"
for m in window
)
windows_text += f"\n--- Window {i+1} ({len(window)} messages) ---\n{window_msgs}\n"
prompt = f"""Classify each conversation window. For each, respond with ONE tag:
[CLAIM] — Contains a specific, disagreeable proposition about how something works
[ENTITY] — Contains factual data about a company, protocol, person, or market
[EVIDENCE] — Contains data or argument that supports or challenges an existing claim about internet finance, futarchy, prediction markets, or token governance
[SKIP] — Casual conversation, not relevant to the knowledge base
Be generous with EVIDENCE — even confirming evidence strengthens the KB.
{windows_text}
Respond with ONLY the window numbers and tags, one per line:
1: [TAG]
2: [TAG]
..."""
result = await call_openrouter(TRIAGE_MODEL, prompt, max_tokens=500)
if not result:
logger.warning("Triage LLM call failed — buffered messages dropped")
return
# Parse triage results
for line in result.strip().split("\n"):
match = re.match(r"(\d+):\s*\[(\w+)\]", line)
if not match:
continue
idx = int(match.group(1)) - 1
tag = match.group(2).upper()
if idx < 0 or idx >= len(windows):
continue
if tag in ("CLAIM", "ENTITY", "EVIDENCE"):
_archive_window(windows[idx], tag)
logger.info("Triage complete: %d windows processed", len(windows))
def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]:
"""Group messages into conversation windows by time proximity."""
if not messages:
return []
# Sort by timestamp
messages.sort(key=lambda m: m.get("timestamp", ""))
windows = []
current_window = [messages[0]]
for msg in messages[1:]:
# Simple grouping: if within window_seconds of previous message, same window
current_window.append(msg)
if len(current_window) >= 10: # Cap window size
windows.append(current_window)
current_window = []
if current_window:
windows.append(current_window)
return windows
def _archive_window(window: list[dict], tag: str):
"""Archive a triaged conversation window to inbox/queue/."""
try:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
first_user = window[0].get("username", "group")
slug = re.sub(r"[^a-z0-9]+", "-", window[0]["text"][:40].lower()).strip("-")
filename = f"{date_str}-telegram-{first_user}-{slug}.md"
archive_path = Path(ARCHIVE_DIR) / filename
archive_path.parent.mkdir(parents=True, exist_ok=True)
# Build conversation content
conversation = ""
contributors = set()
for msg in window:
username = msg.get("username", "anonymous")
contributors.add(username)
conversation += f"**@{username}:** {msg['text']}\n\n"
content = f"""---
type: source
source_type: telegram
title: "Telegram conversation: {slug}"
author: "{', '.join(contributors)}"
date: {date_str}
domain: internet-finance
format: conversation
status: unprocessed
priority: medium
triage_tag: {tag.lower()}
tags: [telegram, ownership-community]
---
## Conversation ({len(window)} messages, {len(contributors)} participants)
{conversation}
## Agent Notes
**Triage:** [{tag}] — classified by batch triage
**Participants:** {', '.join(f'@{u}' for u in contributors)}
"""
# Write to telegram-archives/ (outside worktree)
archive_path.write_text(content)
logger.info("Archived window [%s]: %s (%d msgs, %d participants)",
tag, filename, len(window), len(contributors))
except TimeoutError:
logger.warning("Failed to archive window: worktree lock timeout")
except Exception as e:
logger.error("Failed to archive window: %s", e)
# ─── Bot Setup ──────────────────────────────────────────────────────────
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /start command."""
await update.message.reply_text(
"I'm Rio, the internet finance agent for TeleoHumanity's collective knowledge base. "
"Tag me with @teleo to ask about futarchy, prediction markets, token governance, "
"or anything in our domain. I'll ground my response in our KB's evidence."
)
async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /stats command — show KB stats."""
kb_index.ensure_fresh()
stats = get_db_stats()
await update.message.reply_text(
f"📊 KB Stats:\n"
f"{len(kb_index._claims)} claims indexed\n"
f"{len(kb_index._entities)} entities tracked\n"
f"{len(kb_index._positions)} agent positions\n"
f"{stats['merged_claims']} PRs merged\n"
f"{stats['contributors']} contributors"
)
def main():
"""Start the bot."""
# Load token
token_path = Path(BOT_TOKEN_FILE)
if not token_path.exists():
logger.error("Bot token not found at %s", BOT_TOKEN_FILE)
sys.exit(1)
token = token_path.read_text().strip()
logger.info("Starting Teleo Telegram bot (Rio)...")
# Build application
app = Application.builder().token(token).build()
# Command handlers
app.add_handler(CommandHandler("start", start_command))
app.add_handler(CommandHandler("stats", stats_command))
# Tag handler — messages mentioning the bot
# python-telegram-bot filters.Mention doesn't work for bot mentions in groups
# Use a regex filter for the bot username
app.add_handler(MessageHandler(
filters.TEXT & filters.Regex(r"(?i)(@teleo|@futairdbot)"),
handle_tagged,
))
# Reply handler — replies to the bot's own messages continue the conversation
reply_to_bot_filter = filters.TEXT & filters.REPLY & ~filters.COMMAND
app.add_handler(MessageHandler(
reply_to_bot_filter,
handle_reply_to_bot,
))
# All other text messages — buffer for triage
app.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
handle_message,
))
# Batch triage job
app.job_queue.run_repeating(
run_batch_triage,
interval=TRIAGE_INTERVAL,
first=TRIAGE_INTERVAL,
)
# Run
logger.info("Bot running. Triage interval: %ds", TRIAGE_INTERVAL)
app.run_polling(drop_pending_updates=True)
if __name__ == "__main__":
main()