#!/usr/bin/env python3
"""Teleo Telegram Bot — Rio as analytical agent in community groups.

Architecture:
- Always-on ingestion: captures all messages, batch triage every N minutes
- Tag-based response: Opus-quality KB-grounded responses when @tagged
- Conversation-window triage: identifies coherent claims across message threads
- Full eval tracing: Rio's responses are logged as KB claims, accountable

Two paths (Ganymede architecture):
- Fast path (read): tag → KB query → Opus response → post to group
- Slow path (write): batch triage → archive to inbox/ → pipeline extracts

Separate systemd service: teleo-telegram.service
Does NOT integrate with pipeline daemon.

Epimetheus owns this module.
"""

import asyncio
import logging
import os
import re
import sqlite3
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

# Add pipeline lib to path for shared modules
sys.path.insert(0, "/opt/teleo-eval/pipeline")

from telegram import Update
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from kb_retrieval import KBIndex, format_context_for_prompt, retrieve_context
from market_data import get_token_price, format_price_context
from worktree_lock import main_worktree_lock
from x_search import search_x, format_tweet_as_source, check_research_rate_limit, record_research_usage, get_research_remaining

# ─── Config ─────────────────────────────────────────────────────────────

BOT_TOKEN_FILE = "/opt/teleo-eval/secrets/telegram-bot-token"
OPENROUTER_KEY_FILE = "/opt/teleo-eval/secrets/openrouter-key"
PIPELINE_DB = "/opt/teleo-eval/pipeline/pipeline.db"
KB_READ_DIR = "/opt/teleo-eval/workspaces/main"  # For KB retrieval (clean main branch)
ARCHIVE_DIR = "/opt/teleo-eval/telegram-archives"  # Write outside worktree to avoid read-only errors
MAIN_WORKTREE = "/opt/teleo-eval/workspaces/main"  # For git operations only
LEARNINGS_FILE = "/opt/teleo-eval/workspaces/main/agents/rio/learnings.md"  # Agent memory (Option D)
LOG_FILE = "/opt/teleo-eval/logs/telegram-bot.log"

# Triage interval (seconds)
TRIAGE_INTERVAL = 900  # 15 minutes

# Models
RESPONSE_MODEL = "anthropic/claude-opus-4-6"  # Opus for tagged responses
TRIAGE_MODEL = "anthropic/claude-haiku-4.5"  # Haiku for batch triage

# Rate limits
MAX_RESPONSE_PER_USER_PER_HOUR = 30
MIN_MESSAGE_LENGTH = 20  # Skip very short messages

# ─── Logging ────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("telegram-bot")

# ─── State ──────────────────────────────────────────────────────────────

# Message buffer for batch triage
message_buffer: list[dict] = []

# Rate limiting
user_response_times: dict[int, list[float]] = defaultdict(list)

# Allowed group IDs (set after first message received, or configure)
allowed_groups: set[int] = set()

# Shared KB index (built once, refreshed on mtime change)
kb_index = KBIndex(KB_READ_DIR)

# Conversation windows — track active conversations per (chat_id, user_id)
# Rhea's model: count unanswered messages, reset on bot response, expire at threshold
CONVERSATION_WINDOW = 5  # expire after 5 unanswered messages
unanswered_count: dict[tuple[int, int], int] = {}  # (chat_id, user_id) → unanswered count

# Conversation history — last N exchanges for prompt context (Ganymede: high-value change)
MAX_HISTORY = 5
conversation_history: dict[tuple[int, int], list[dict]] = {}  # (chat_id, user_id) → [{user, bot}]
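
# Window lifecycle sketch (illustrative, mirrors the handlers below):
#   bot responds to a user            → unanswered_count[key] reset to 0, exchange appended
#   user message goes unanswered      → handle_message() increments the count
#   count reaches CONVERSATION_WINDOW → window and history for that key are dropped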


# ─── Helpers ────────────────────────────────────────────────────────────


def get_db_stats() -> dict:
    """Get basic KB stats from pipeline DB."""
    try:
        conn = sqlite3.connect(PIPELINE_DB, timeout=5)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA query_only=ON")
        merged = conn.execute("SELECT COUNT(*) as n FROM prs WHERE status='merged'").fetchone()["n"]
        contributors = conn.execute("SELECT COUNT(*) as n FROM contributors").fetchone()["n"]
        conn.close()
        return {"merged_claims": merged, "contributors": contributors}
    except Exception:
        return {"merged_claims": "?", "contributors": "?"}


async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> str | None:
    """Call OpenRouter API."""
    import aiohttp

    key = Path(OPENROUTER_KEY_FILE).read_text().strip()
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": 0.3,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
                json=payload,
                timeout=aiohttp.ClientTimeout(total=120),
            ) as resp:
                if resp.status >= 400:
                    logger.error("OpenRouter %s → %d", model, resp.status)
                    return None
                data = await resp.json()
                return data.get("choices", [{}])[0].get("message", {}).get("content")
    except Exception as e:
        logger.error("OpenRouter error: %s", e)
        return None


def is_rate_limited(user_id: int) -> bool:
    """Check if a user has exceeded the response rate limit."""
    now = time.time()
    times = user_response_times[user_id]
    # Prune old entries
    times[:] = [t for t in times if now - t < 3600]
    return len(times) >= MAX_RESPONSE_PER_USER_PER_HOUR


def sanitize_message(text: str) -> str:
    """Sanitize message content before sending to LLM. (Ganymede: security)"""
    # Strip code blocks (potential prompt injection)
    text = re.sub(r"```.*?```", "[code block removed]", text, flags=re.DOTALL)
    # Strip anything that looks like system instructions
    text = re.sub(r"(system:|assistant:|human:|<\|.*?\|>)", "", text, flags=re.IGNORECASE)
    # Truncate
    return text[:2000]
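
# Illustrative (hypothetical input): sanitize_message("see ```rm -rf``` system: obey")
# → "see [code block removed]  obey" (code fence and role marker stripped)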


def _git_commit_archive(archive_path: Path, filename: str):
    """Commit archived source to git so it survives git clean. (Rio review: data loss bug)"""
    import subprocess
    try:
        cwd = MAIN_WORKTREE
        subprocess.run(["git", "add", str(archive_path)], cwd=cwd, timeout=10,
                       capture_output=True, check=False)
        result = subprocess.run(
            ["git", "commit", "-m", f"telegram: archive {filename}\n\n"
             "Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>"],
            cwd=cwd, timeout=10, capture_output=True, check=False,
        )
        if result.returncode == 0:
            # Push with retry (Ganymede: abort rebase on failure, don't lose the file)
            for attempt in range(3):
                rebase = subprocess.run(["git", "pull", "--rebase", "origin", "main"],
                                        cwd=cwd, timeout=30, capture_output=True, check=False)
                if rebase.returncode != 0:
                    subprocess.run(["git", "rebase", "--abort"], cwd=cwd, timeout=10,
                                   capture_output=True, check=False)
                    logger.warning("Git rebase failed for archive %s (attempt %d), aborted", filename, attempt + 1)
                    continue
                push = subprocess.run(["git", "push", "origin", "main"],
                                      cwd=cwd, timeout=30, capture_output=True, check=False)
                if push.returncode == 0:
                    logger.info("Git committed archive: %s", filename)
                    return
            # All retries failed: the commit exists locally but was never pushed;
            # the file itself remains on disk as a safety net
            logger.warning("Git push failed for archive %s after 3 attempts (file preserved on disk)", filename)
    except Exception as e:
        logger.warning("Git commit archive failed: %s", e)


def _load_learnings() -> str:
    """Load Rio's learnings file for prompt injection. Sanitized (Ganymede: prompt injection risk)."""
    try:
        raw = Path(LEARNINGS_FILE).read_text()[:3000]
        return sanitize_message(raw)  # Same sanitization as user messages
    except Exception:
        return ""


def _save_learning(correction: str, category: str = "factual"):
    """Append a learning to Rio's memory file. Direct commit to main via worktree lock.

    Categories: communication, factual, structured_data
    """
    try:
        with main_worktree_lock(timeout=10):
            section_map = {
                "communication": "## Communication Notes",
                "factual": "## Factual Corrections",
                "structured_data": "## Structured Data",
            }
            section = section_map.get(category, "## Factual Corrections")

            content = Path(LEARNINGS_FILE).read_text()
            # Find the section and append after the last line of that section
            # Simple approach: append before the next ## header or at end
            lines = content.split("\n")
            insert_idx = len(lines)  # default: end of file
            in_section = False
            for i, line in enumerate(lines):
                if line.strip() == section:
                    in_section = True
                    continue
                if in_section and line.startswith("## ") and line.strip() != section:
                    insert_idx = i
                    break

            date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            new_line = f"- [{date_str}] {correction}"
            lines.insert(insert_idx, new_line)

            Path(LEARNINGS_FILE).write_text("\n".join(lines))

            # Commit + push
            import subprocess
            cwd = MAIN_WORKTREE
            subprocess.run(["git", "add", LEARNINGS_FILE], cwd=cwd, timeout=10,
                           capture_output=True, check=False)
            subprocess.run(
                ["git", "commit", "-m", f"rio: learn — {correction[:60]}\n\n"
                 "Pentagon-Agent: Rio <5551F5AF-0C5C-429F-8915-1FE74A00E019>"],
                cwd=cwd, timeout=10, capture_output=True, check=False)
            for _ in range(3):
                subprocess.run(["git", "pull", "--rebase", "origin", "main"],
                               cwd=cwd, timeout=30, capture_output=True, check=False)
                push = subprocess.run(["git", "push", "origin", "main"],
                                      cwd=cwd, timeout=30, capture_output=True, check=False)
                if push.returncode == 0:
                    logger.info("Learning saved: %s", correction[:80])
                    return
            logger.warning("Failed to push learning (file preserved on disk)")
    except TimeoutError:
        logger.warning("Learning save failed: worktree lock timeout")
    except Exception as e:
        logger.warning("Learning save failed: %s", e)


def _compress_history(history: list[dict]) -> str:
    """Extract key context from conversation history — ~20 tokens, unmissable (Ganymede)."""
    if not history:
        return ""
    # Combine all text for entity/number extraction
    all_text = " ".join(h.get("user", "") + " " + h.get("bot", "") for h in history)
    tickers = sorted(set(re.findall(r"\$[A-Z]{2,10}", all_text)))
    numbers = re.findall(r"\$[\d,.]+[KMB]?|\d+\.?\d*%", all_text)
    parts = []
    if tickers:
        parts.append(f"Discussing: {', '.join(tickers)}")
    if numbers:
        parts.append(f"Key figures: {', '.join(numbers[:5])}")
    parts.append(f"Exchanges: {len(history)}")
    return " | ".join(parts)


def _format_conversation_history(chat_id: int, user_id: int) -> str:
    """Format conversation history with compressed context summary (Ganymede: Option C+A)."""
    key = (chat_id, user_id)
    history = conversation_history.get(key, [])
    if not history:
        return "(No prior conversation)"

    # Compressed context first — hard for the model to miss
    summary = _compress_history(history)
    lines = [summary, ""]

    # Full exchange log for reference
    for exchange in history:
        if exchange.get("user"):
            lines.append(f"User: {exchange['user']}")
        if exchange.get("bot"):
            lines.append(f"Rio: {exchange['bot']}")
        lines.append("")
    return "\n".join(lines)


# Research intent patterns (Rhea: explicit /research + natural language fallback)
RESEARCH_PATTERN = re.compile(r'/research\s+(.+)', re.IGNORECASE)
# Whitespace lives inside each alternative: a trailing \s+ after the group would
# demand a second space and reject single-spaced phrases like "search x for foo"
RESEARCH_NATURAL = re.compile(r'(?:research\s+|search\s+(?:x\s+)?(?:for\s+)?|look\s+up\s+)(.+)', re.IGNORECASE)


async def handle_research(msg, query: str, user):
    """Handle a research request — search X and archive results as sources."""
    username = user.username if user else "unknown"

    if not check_research_rate_limit(user.id if user else 0):
        await msg.reply_text("Research limit reached (3/day). Resets at midnight UTC.")
        return

    await msg.chat.send_action("typing")

    tweets = await search_x(query, max_results=15, min_engagement=3)
    if not tweets:
        await msg.reply_text(f"No recent tweets found for '{query}'.")
        return

    # Archive all tweets as ONE source file per research query
    # (not per-tweet — one extraction PR produces claims from the best material)
    archived = 0  # stays 0 if archiving fails, so the reply below can't hit a NameError
    try:
        with main_worktree_lock(timeout=10):
            date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            slug = re.sub(r"[^a-z0-9]+", "-", query[:60].lower()).strip("-")
            filename = f"{date_str}-x-research-{slug}.md"
            source_path = Path(ARCHIVE_DIR) / filename
            source_path.parent.mkdir(parents=True, exist_ok=True)

            # Build consolidated source file
            tweets_body = ""
            for i, tweet in enumerate(tweets, 1):
                tweets_body += f"\n### Tweet {i} — @{tweet['author']} ({tweet.get('engagement', 0)} engagement)\n"
                tweets_body += f"**URL:** {tweet.get('url', '')}\n"
                tweets_body += f"**Followers:** {tweet.get('author_followers', 0)} | "
                tweets_body += f"**Likes:** {tweet.get('likes', 0)} | **RT:** {tweet.get('retweets', 0)}\n\n"
                tweets_body += f"{tweet['text']}\n"

            source_content = f"""---
type: source
source_type: x-research
title: "X research: {query}"
url: ""
author: "multiple"
date: {date_str}
domain: internet-finance
format: social-media-collection
status: unprocessed
proposed_by: "@{username}"
contribution_type: research-direction
research_query: "{query.replace('"', "'")}"
tweet_count: {len(tweets)}
tags: [x-research, telegram-research]
---

# X Research: {query}

Submitted by @{username} via Telegram /research command.
{len(tweets)} tweets found, sorted by engagement.

{tweets_body}
"""
            source_path.write_text(source_content)
            archived = len(tweets)

            _git_commit_archive(source_path, filename)
    except TimeoutError:
        logger.warning("Research archive failed: worktree lock timeout")
    except Exception as e:
        logger.warning("Research archive failed: %s", e)

    record_research_usage(user.id if user else 0)
    remaining = get_research_remaining(user.id if user else 0)

    # Summary of what was found
    top_authors = list(set(t["author"] for t in tweets[:5]))
    await msg.reply_text(
        f"Queued {archived} tweets about '{query}' for extraction. "
        f"Top voices: @{', @'.join(top_authors[:3])}. "
        f"Results will appear in the KB within ~30 minutes. "
        f"({remaining} research requests remaining today.)"
    )
    logger.info("Research: @%s queried '%s', archived %d tweets", username, query, archived)


# ─── Message Handlers ───────────────────────────────────────────────────


def _is_reply_to_bot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Check if a message is a reply to one of the bot's own messages."""
    msg = update.message
    if not msg or not msg.reply_to_message:
        return False
    replied = msg.reply_to_message
    return replied.from_user is not None and replied.from_user.id == context.bot.id


async def handle_reply_to_bot(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle replies to the bot's messages — treat as tagged conversation."""
    if not _is_reply_to_bot(update, context):
        # Not a reply to us — fall through to buffer handler
        await handle_message(update, context)
        return
    logger.info("Reply to bot from @%s",
                update.message.from_user.username if update.message.from_user else "unknown")
    await handle_tagged(update, context)


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ALL incoming group messages — buffer for triage."""
    if not update.message or not update.message.text:
        return

    msg = update.message
    text = msg.text.strip()

    # Skip very short messages
    if len(text) < MIN_MESSAGE_LENGTH:
        return

    # Conversation window behavior depends on chat type (Rio: DMs vs groups)
    # DMs: auto-respond (always 1-on-1, no false positives)
    # Groups: silent context only (reply-to is the only follow-up trigger)
    user = msg.from_user
    is_dm = msg.chat.type == "private"

    if user:
        key = (msg.chat_id, user.id)
        if key in unanswered_count:
            unanswered_count[key] += 1

            if is_dm and unanswered_count[key] < CONVERSATION_WINDOW:
                # DM: auto-respond — conversation window fires
                logger.info("DM conversation window: @%s msg %d/%d",
                            user.username or "?", unanswered_count[key], CONVERSATION_WINDOW)
                await handle_tagged(update, context)
                return
            # Group: don't track silent messages in history (Ganymede: Option A)
            # History should be the actual conversation, not a log of everything said in the group
            # Expire window after CONVERSATION_WINDOW unanswered messages
            if unanswered_count[key] >= CONVERSATION_WINDOW:
                del unanswered_count[key]
                conversation_history.pop(key, None)
                logger.info("Conversation window expired for @%s", user.username or "?")

    # Buffer for batch triage
    message_buffer.append({
        "text": sanitize_message(text),
        "user_id": msg.from_user.id if msg.from_user else None,
        "username": msg.from_user.username if msg.from_user else None,
        "display_name": msg.from_user.full_name if msg.from_user else None,
        "chat_id": msg.chat_id,
        "message_id": msg.message_id,
        "timestamp": msg.date.isoformat() if msg.date else datetime.now(timezone.utc).isoformat(),
        "reply_to": msg.reply_to_message.message_id if msg.reply_to_message else None,
    })


async def handle_tagged(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle messages that tag the bot — Rio responds with Opus."""
    if not update.message or not update.message.text:
        return

    msg = update.message
    user = msg.from_user
    text = sanitize_message(msg.text)

    # Rate limit check
    if user and is_rate_limited(user.id):
        await msg.reply_text("I'm processing other requests — try again in a few minutes.")
        return

    logger.info("Tagged by @%s: %s", user.username if user else "unknown", text[:100])

    # Check for explicit /research command (Rhea: fast path before LLM)
    research_match = RESEARCH_PATTERN.search(text) or RESEARCH_NATURAL.search(text)
    if research_match:
        query = research_match.group(1).strip()
        # Run the research request first (awaited), then fall through
        await handle_research(msg, query, user)
        # Don't return — continue to normal response so user gets immediate KB answer too

    # Send typing indicator
    await msg.chat.send_action("typing")

    # Retrieve full KB context (entity resolution + claim search + agent positions)
    kb_ctx = retrieve_context(text, KB_READ_DIR, index=kb_index)
    kb_context_text = format_context_for_prompt(kb_ctx)
    stats = get_db_stats()

    # Fetch live market data for any tokens mentioned (Rhea: market-data API)
    market_context = ""
    token_mentions = re.findall(r"\$([A-Z]{2,10})", text.upper())
    # Entity name → token mapping for natural language mentions
    ENTITY_TOKEN_MAP = {
        "omnipair": "OMFG", "metadao": "META", "sanctum": "CLOUD",
        "drift": "DRIFT", "ore": "ORE", "jupiter": "JUP",
    }
    text_lower = text.lower()
    for name, ticker in ENTITY_TOKEN_MAP.items():
        if name in text_lower:
            token_mentions.append(ticker)
    # Also check entity matches from KB retrieval
    for ent in kb_ctx.entities:
        for tag in ent.tags:
            if tag.upper() in ENTITY_TOKEN_MAP.values():
                token_mentions.append(tag.upper())
    for token in set(token_mentions):
        try:
            data = await get_token_price(token)
            if data:
                price_str = format_price_context(data, token)
                if price_str:
                    market_context += price_str + "\n"
        except Exception:
            pass  # Market data is supplementary — never block on failure
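
    # e.g. the hypothetical message "is omnipair overvalued? $META looks cheap"
    # yields token_mentions == ["META", "OMFG"], and both prices land in market_context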

    # Build Opus prompt — Rio's voice
    prompt = f"""You are Rio, the Teleo internet finance agent. Your Telegram handle is @FutAIrdBot — that IS you. Users tag @FutAIrdBot to reach you. Never say "I'm not FutAIrdBot." You are also @futaRdIO on X. You have deep knowledge about futarchy, prediction markets, token governance, and the MetaDAO ecosystem.

## How to sound
Write like a sharp analyst talking to peers, not like an AI. Specifically:
- Use your knowledge naturally. Don't say "the KB tracks" or "at experimental confidence" or "our claims show." Just state what you know and how confident you are in plain language.
- Have a take. You're an analyst, not a summarizer. Say what you actually think.
- Default SHORT. 1-2 sentences unless the question genuinely requires depth. Match the length and energy of the user's message — if they wrote one line, you write one line. Every word has to add value.
- Sound human. Avoid em dashes, avoid starting sentences with "That said" or "The honest X is." Vary your sentence structure. Be direct.
- No markdown. Plain text only, no asterisks or formatting. Use line breaks between paragraphs.
- When you're uncertain, just say so simply. "I'm not sure about X" beats "we don't have data on this yet."

## Your learnings (corrections from past conversations — prioritize these over KB data when they conflict)
{_load_learnings()}

## What you know about this topic
{kb_context_text}

{f"## Live Market Data{chr(10)}{market_context}" if market_context else ""}

## Conversation History (NEVER ask a question your history already answers)
{_format_conversation_history(msg.chat_id, user.id if user else 0)}

## The message you're responding to
From: @{user.username if user else 'unknown'}
Message: {text}

Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you — only respond when tagged or replied to.

IMPORTANT: If you learn something from this exchange — a correction, a new rule, new data — append a LEARNING line at the very end of your response. Format:
LEARNING: [category] [what you learned]
Categories: factual, communication, structured_data
Only when you genuinely learned something. Most responses have NO learning line."""

    # Call Opus
    response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024)

    if not response:
        await msg.reply_text("Processing error — I'll get back to you.")
        return

    # Parse LEARNING lines before posting (Rhea: zero-cost self-write trigger)
    display_response = response
    # Accept both "LEARNING: factual ..." and the bracketed "LEARNING: [factual] ..."
    # that the prompt's placeholder format invites
    learning_lines = re.findall(r'^LEARNING:\s*\[?(factual|communication|structured_data)\]?\s+(.+)$',
                                response, re.MULTILINE)
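    # e.g. a response ending "...good catch.\nLEARNING: factual OMFG fees are 0.3%"
    # (hypothetical) yields [("factual", "OMFG fees are 0.3%")]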
    if learning_lines:
        # Strip LEARNING lines from displayed response
        display_response = re.sub(r'\n\s*LEARNING:.*$', '', response, flags=re.MULTILINE).rstrip()
        # Save each learning
        for category, correction in learning_lines:
            _save_learning(correction.strip(), category.strip())
            logger.info("Auto-learned [%s]: %s", category, correction[:80])

    # Post response (without LEARNING lines)
    await msg.reply_text(display_response)

    # Update conversation state: reset window, store history (Ganymede+Rhea)
    if user:
        key = (msg.chat_id, user.id)
        unanswered_count[key] = 0  # reset — conversation alive
        history = conversation_history.setdefault(key, [])
        # Store the cleaned response so LEARNING lines don't leak back into future prompts
        history.append({"user": text[:500], "bot": display_response[:500]})
        if len(history) > MAX_HISTORY:
            history.pop(0)

    # Record rate limit
    if user:
        user_response_times[user.id].append(time.time())

    # Log the exchange for audit trail
    logger.info("Rio responded to @%s (msg_id=%d)", user.username if user else "?", msg.message_id)

    # Detect and fetch URLs for pipeline ingestion
    urls = _extract_urls(text)
    url_content = None
    if urls:
        logger.info("Fetching URL: %s", urls[0])
        url_content = await _fetch_url_content(urls[0])
        if url_content:
            logger.info("Fetched %d chars from %s", len(url_content), urls[0])

    # Archive the exchange as a source for pipeline (slow path)
    _archive_exchange(text, response, user, msg, url_content=url_content, urls=urls)


async def _fetch_url_content(url: str) -> str | None:
    """Fetch article/page content from a URL for pipeline ingestion."""
    import aiohttp
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                if resp.status >= 400:
                    return None
                html = await resp.text()
        # Strip HTML tags for plain text (basic — upgrade to readability later)
        text = re.sub(r"<script.*?</script>", "", html, flags=re.DOTALL)
        text = re.sub(r"<style.*?</style>", "", text, flags=re.DOTALL)
        text = re.sub(r"<[^>]+>", " ", text)
        text = re.sub(r"\s+", " ", text).strip()
        return text[:10000]  # Cap at 10K chars
    except Exception as e:
        logger.warning("Failed to fetch URL %s: %s", url, e)
        return None


def _extract_urls(text: str) -> list[str]:
    """Extract URLs from message text."""
    return re.findall(r"https?://[^\s<>\"']+", text)


def _archive_exchange(user_text: str, rio_response: str, user, msg,
                      url_content: str | None = None, urls: list[str] | None = None):
    """Archive a tagged exchange for pipeline processing (a cron later moves it into inbox/queue/)."""
    try:
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        username = user.username if user else "anonymous"
        slug = re.sub(r"[^a-z0-9]+", "-", user_text[:50].lower()).strip("-")
        filename = f"{date_str}-telegram-{username}-{slug}.md"

        archive_path = Path(ARCHIVE_DIR) / filename
        archive_path.parent.mkdir(parents=True, exist_ok=True)

        # Extract rationale (the user's text minus the @mention and URL)
        rationale = re.sub(r"@\w+", "", user_text).strip()
        for url in (urls or []):
            rationale = rationale.replace(url, "").strip()

        # Determine priority — directed contribution with rationale gets high priority
        priority = "high" if rationale and len(rationale) > 20 else "medium"
        intake_tier = "directed" if rationale and len(rationale) > 20 else "undirected"

        url_section = ""
        if url_content:
            url_section = f"\n## Article Content (fetched)\n\n{url_content[:8000]}\n"

        content = f"""---
type: source
source_type: telegram
title: "Telegram: @{username} — {slug}"
author: "@{username}"
url: "{urls[0] if urls else ''}"
date: {date_str}
domain: internet-finance
format: conversation
status: unprocessed
priority: {priority}
intake_tier: {intake_tier}
rationale: "{rationale[:200]}"
proposed_by: "@{username}"
tags: [telegram, ownership-community]
---

## Conversation

**@{username}:**
{user_text}

**Rio (response):**
{rio_response}
{url_section}
## Agent Notes
**Why archived:** Tagged exchange in ownership community.
**Rationale from contributor:** {rationale if rationale else 'No rationale provided (bare link or question)'}
**Intake tier:** {intake_tier} — {'fast-tracked, contributor provided reasoning' if intake_tier == 'directed' else 'standard processing'}
**Triage:** Conversation may contain [CLAIM], [ENTITY], or [EVIDENCE] for extraction.
"""
        # Write to telegram-archives/ (outside worktree — no read-only errors)
        # A cron moves files into inbox/queue/ and commits them
        archive_path.write_text(content)
        logger.info("Archived exchange to %s (tier: %s, urls: %d)",
                    filename, intake_tier, len(urls or []))
    except Exception as e:
        logger.error("Failed to archive exchange: %s", e)


# ─── Batch Triage ───────────────────────────────────────────────────────


async def run_batch_triage(context: ContextTypes.DEFAULT_TYPE):
    """Batch triage of buffered messages every TRIAGE_INTERVAL seconds.

    Groups messages into conversation windows, sends to Haiku for classification,
    archives substantive findings.
    """
    global message_buffer

    if not message_buffer:
        return

    # Grab and clear buffer
    messages = message_buffer[:]
    message_buffer = []

    logger.info("Batch triage: %d messages to process", len(messages))

    # Group into conversation windows (messages within 5 min of each other)
    windows = _group_into_windows(messages, window_seconds=300)

    if not windows:
        return

    # Build triage prompt
    windows_text = ""
    for i, window in enumerate(windows):
        window_msgs = "\n".join(
            f"  @{m.get('username', '?')}: {m['text'][:200]}"
            for m in window
        )
        windows_text += f"\n--- Window {i+1} ({len(window)} messages) ---\n{window_msgs}\n"

    prompt = f"""Classify each conversation window. For each, respond with ONE tag:

[CLAIM] — Contains a specific proposition about how something works that someone could disagree with
[ENTITY] — Contains factual data about a company, protocol, person, or market
[EVIDENCE] — Contains data or argument that supports or challenges an existing claim about internet finance, futarchy, prediction markets, or token governance
[SKIP] — Casual conversation, not relevant to the knowledge base

Be generous with EVIDENCE — even confirming evidence strengthens the KB.

{windows_text}

Respond with ONLY the window numbers and tags, one per line:
1: [TAG]
2: [TAG]
..."""

    result = await call_openrouter(TRIAGE_MODEL, prompt, max_tokens=500)

    if not result:
        logger.warning("Triage LLM call failed — buffered messages dropped")
        return

    # Parse triage results
    for line in result.strip().split("\n"):
        match = re.match(r"(\d+):\s*\[(\w+)\]", line)
        if not match:
            continue
        idx = int(match.group(1)) - 1
        tag = match.group(2).upper()

        if idx < 0 or idx >= len(windows):
            continue

        if tag in ("CLAIM", "ENTITY", "EVIDENCE"):
            _archive_window(windows[idx], tag)

    logger.info("Triage complete: %d windows processed", len(windows))


def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]:
    """Group messages into conversation windows by time proximity."""
    if not messages:
        return []

    def _ts(m: dict) -> float:
        # Timestamps are ISO-8601 strings (see handle_message); treat unparsable as epoch 0
        try:
            return datetime.fromisoformat(m.get("timestamp", "")).timestamp()
        except ValueError:
            return 0.0

    # Sort by timestamp
    messages.sort(key=lambda m: m.get("timestamp", ""))

    windows = []
    current_window = [messages[0]]

    for msg in messages[1:]:
        # Same window if within window_seconds of the previous message; cap window size at 10
        if _ts(msg) - _ts(current_window[-1]) > window_seconds or len(current_window) >= 10:
            windows.append(current_window)
            current_window = [msg]
        else:
            current_window.append(msg)

    if current_window:
        windows.append(current_window)

    return windows
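
# Sketch (hypothetical timestamps): messages at 12:00, 12:02 and 12:30 with
# window_seconds=300 yield two windows, [[12:00, 12:02], [12:30]]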


def _archive_window(window: list[dict], tag: str):
    """Archive a triaged conversation window for the pipeline (a cron moves it into inbox/queue/)."""
    try:
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        first_user = window[0].get("username", "group")
        slug = re.sub(r"[^a-z0-9]+", "-", window[0]["text"][:40].lower()).strip("-")
        filename = f"{date_str}-telegram-{first_user}-{slug}.md"

        archive_path = Path(ARCHIVE_DIR) / filename
        archive_path.parent.mkdir(parents=True, exist_ok=True)

        # Build conversation content
        conversation = ""
        contributors = set()
        for msg in window:
            username = msg.get("username", "anonymous")
            contributors.add(username)
            conversation += f"**@{username}:** {msg['text']}\n\n"

        content = f"""---
type: source
source_type: telegram
title: "Telegram conversation: {slug}"
author: "{', '.join(contributors)}"
date: {date_str}
domain: internet-finance
format: conversation
status: unprocessed
priority: medium
triage_tag: {tag.lower()}
tags: [telegram, ownership-community]
---

## Conversation ({len(window)} messages, {len(contributors)} participants)

{conversation}

## Agent Notes
**Triage:** [{tag}] — classified by batch triage
**Participants:** {', '.join(f'@{u}' for u in contributors)}
"""
        # Write to telegram-archives/ (outside worktree)
        archive_path.write_text(content)
        logger.info("Archived window [%s]: %s (%d msgs, %d participants)",
                    tag, filename, len(window), len(contributors))
    except TimeoutError:
        logger.warning("Failed to archive window: worktree lock timeout")
    except Exception as e:
        logger.error("Failed to archive window: %s", e)


# ─── Bot Setup ──────────────────────────────────────────────────────────


async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start command."""
    await update.message.reply_text(
        "I'm Rio, the internet finance agent for TeleoHumanity's collective knowledge base. "
        "Tag me with @teleo to ask about futarchy, prediction markets, token governance, "
        "or anything in our domain. I'll ground my response in our KB's evidence."
    )


async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /stats command — show KB stats."""
    kb_index.ensure_fresh()
    stats = get_db_stats()
    await update.message.reply_text(
        f"📊 KB Stats:\n"
        f"• {len(kb_index._claims)} claims indexed\n"
        f"• {len(kb_index._entities)} entities tracked\n"
        f"• {len(kb_index._positions)} agent positions\n"
        f"• {stats['merged_claims']} PRs merged\n"
        f"• {stats['contributors']} contributors"
    )


def main():
    """Start the bot."""
    # Load token
    token_path = Path(BOT_TOKEN_FILE)
    if not token_path.exists():
        logger.error("Bot token not found at %s", BOT_TOKEN_FILE)
        sys.exit(1)
    token = token_path.read_text().strip()

    logger.info("Starting Teleo Telegram bot (Rio)...")

    # Build application
    app = Application.builder().token(token).build()

    # Command handlers
    app.add_handler(CommandHandler("start", start_command))
    app.add_handler(CommandHandler("stats", stats_command))

    # Tag handler — messages mentioning the bot
    # python-telegram-bot filters.Mention doesn't work for bot mentions in groups
    # Use a regex filter for the bot username
    app.add_handler(MessageHandler(
        filters.TEXT & filters.Regex(r"(?i)(@teleo|@futairdbot)"),
        handle_tagged,
    ))
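
    # Handler order matters: within a handler group, python-telegram-bot dispatches
    # an update to the first matching handler only, so tagged messages never fall
    # through to the reply or triage handlers below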

    # Reply handler — replies to the bot's own messages continue the conversation
    reply_to_bot_filter = filters.TEXT & filters.REPLY & ~filters.COMMAND
    app.add_handler(MessageHandler(
        reply_to_bot_filter,
        handle_reply_to_bot,
    ))

    # All other text messages — buffer for triage
    app.add_handler(MessageHandler(
        filters.TEXT & ~filters.COMMAND,
        handle_message,
    ))

    # Batch triage job
    app.job_queue.run_repeating(
        run_batch_triage,
        interval=TRIAGE_INTERVAL,
        first=TRIAGE_INTERVAL,
    )

    # Run
    logger.info("Bot running. Triage interval: %ds", TRIAGE_INTERVAL)
    app.run_polling(drop_pending_updates=True)


if __name__ == "__main__":
    main()