From 66bc742979278474ec6716aebd28cca65958ae8e Mon Sep 17 00:00:00 2001 From: m3taversal Date: Wed, 25 Mar 2026 13:35:10 +0000 Subject: [PATCH] feat: full transcript archival + SOURCE:/CLAIM: inline tags Transcript system: - All messages in all chats captured to chat_transcripts store - 1-hour dump job writes per-chat JSON to /opt/teleo-eval/transcripts/ - Includes internal reasoning (KB matches, searches, learnings) - Transcripts accumulate over session (no clear on dump) - Per-chat directories: transcripts/{chat-slug}/{date-hour}.json Inline contribution tags: - SOURCE: creates inbox source file with verbatim user content - CLAIM: creates draft claim file attributed to contributor - Both strip tag from displayed response - Full user message preserved verbatim (Rio decides context, can't alter) Also: multi-URL processing (up to 5 per message) Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70> --- telegram/bot.py | 241 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 234 insertions(+), 7 deletions(-) diff --git a/telegram/bot.py b/telegram/bot.py index 1ca8c20..f31d6d6 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -104,6 +104,192 @@ MAX_HISTORY_USER = 5 MAX_HISTORY_CHAT = 30 # Group chats: multiple users, longer threads conversation_history: dict[tuple[int, int], list[dict]] = {} # (chat_id, user_id) → [{user, bot}] +# Full transcript store — all messages in all chats, dumped every 6 hours +# Keyed by chat_id. No cap — dumped and cleared on schedule. +chat_transcripts: dict[int, list[dict]] = {} +TRANSCRIPT_DIR = "/opt/teleo-eval/transcripts" + + +# ─── Transcript Management ────────────────────────────────────────────── + + +def _record_transcript(msg, text: str, is_bot: bool = False, + rio_response: str = None, internal: dict = None): + """Record a message to the full transcript for this chat.""" + chat_id = msg.chat_id + transcript = chat_transcripts.setdefault(chat_id, []) + + entry = { + "ts": msg.date.isoformat() if hasattr(msg, "date") and msg.date else datetime.now(timezone.utc).isoformat(), + "chat_id": chat_id, + "chat_title": msg.chat.title if hasattr(msg, "chat") and msg.chat else str(chat_id), + "message_id": msg.message_id if hasattr(msg, "message_id") else None, + } + + if is_bot: + entry["type"] = "bot_response" + entry["rio_response"] = rio_response or text + if internal: + entry["internal"] = internal # KB matches, searches, learnings + else: + user = msg.from_user if hasattr(msg, "from_user") else None + entry["type"] = "user_message" + entry["username"] = f"@{user.username}" if user and user.username else "unknown" + entry["display_name"] = user.full_name if user else "unknown" + entry["user_id"] = user.id if user else None + entry["message"] = text[:2000] + entry["reply_to"] = msg.reply_to_message.message_id if hasattr(msg, "reply_to_message") and msg.reply_to_message else None + + transcript.append(entry) + + +async def _dump_transcripts(context=None): + """Dump all chat transcripts to VPS-local JSON files. Runs every 6 hours.""" + if not chat_transcripts: + return + + os.makedirs(TRANSCRIPT_DIR, exist_ok=True) + now = datetime.now(timezone.utc) + period_end = now.strftime("%Y-%m-%dT%H") + + for chat_id, entries in list(chat_transcripts.items()): + if not entries: + continue + + # Get chat title from first entry + chat_title = entries[0].get("chat_title", str(chat_id)) + chat_slug = re.sub(r"[^a-z0-9]+", "-", chat_title.lower()).strip("-") or str(chat_id) + + # Create per-chat directory + chat_dir = os.path.join(TRANSCRIPT_DIR, chat_slug) + os.makedirs(chat_dir, exist_ok=True) + + # Build transcript document + unique_users = set() + for e in entries: + if e.get("username"): + unique_users.add(e["username"]) + + doc = { + "chat_id": chat_id, + "chat_title": chat_title, + "period": { + "start": entries[0].get("ts", ""), + "end": entries[-1].get("ts", ""), + "dumped_at": now.isoformat(), + }, + "stats": { + "total_messages": len(entries), + "unique_users": len(unique_users), + "users": sorted(unique_users), + "bot_responses": sum(1 for e in entries if e.get("type") == "bot_response"), + }, + "messages": entries, + } + + filename = f"{period_end}.json" + filepath = os.path.join(chat_dir, filename) + + try: + import json as _json + with open(filepath, "w") as f: + _json.dump(doc, f, indent=2, default=str) + logger.info("Transcript dumped: %s (%d messages, %d users)", + filepath, len(entries), len(unique_users)) + except Exception as e: + logger.warning("Failed to dump transcript for %s: %s", chat_slug, e) + + # Don't clear — transcripts accumulate over the session. + # Each dump is the full history since last restart. + + +def _create_inline_source(source_text: str, user_message: str, user, msg): + """Create a source file from Rio's SOURCE: tag. Verbatim user content, attributed.""" + try: + username = user.username if user else "anonymous" + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + slug = re.sub(r"[^a-z0-9]+", "-", source_text[:50].lower()).strip("-") + filename = f"{date_str}-tg-source-{username}-{slug}.md" + source_path = Path(ARCHIVE_DIR) / filename + if source_path.exists(): + return + + content = f"""--- +type: source +source_type: telegram-contribution +title: "Source from @{username} — {source_text[:80]}" +author: "@{username}" +date: {date_str} +domain: internet-finance +format: contribution +status: unprocessed +proposed_by: "@{username}" +contribution_type: source-submission +tags: [telegram-contribution, inline-source] +--- + +# Source: {source_text[:100]} + +Contributed by @{username} in Telegram chat. +Flagged by Rio as relevant source material. + +## Verbatim User Message + +{user_message} + +## Rio's Context + +{source_text} +""" + source_path.write_text(content) + logger.info("Inline source created: %s (by @%s)", filename, username) + except Exception as e: + logger.warning("Failed to create inline source: %s", e) + + +def _create_inline_claim(claim_text: str, user_message: str, user, msg): + """Create a draft claim file from Rio's CLAIM: tag. Attributed to contributor.""" + try: + username = user.username if user else "anonymous" + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + slug = re.sub(r"[^a-z0-9]+", "-", claim_text[:60].lower()).strip("-") + filename = f"{date_str}-tg-claim-{username}-{slug}.md" + source_path = Path(ARCHIVE_DIR) / filename + if source_path.exists(): + return + + content = f"""--- +type: source +source_type: telegram-claim +title: "Claim from @{username} — {claim_text[:80]}" +author: "@{username}" +date: {date_str} +domain: internet-finance +format: claim-draft +status: unprocessed +proposed_by: "@{username}" +contribution_type: claim-proposal +tags: [telegram-claim, inline-claim] +--- + +# Draft Claim: {claim_text} + +Contributed by @{username} in Telegram chat. +Flagged by Rio as a specific, disagreeable assertion worth extracting. + +## Verbatim User Message + +{user_message} + +## Proposed Claim + +{claim_text} +""" + source_path.write_text(content) + logger.info("Inline claim drafted: %s (by @%s)", filename, username) + except Exception as e: + logger.warning("Failed to create inline claim: %s", e) + # ─── Helpers ──────────────────────────────────────────────────────────── @@ -538,6 +724,9 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): conversation_history.pop(key, None) logger.info("Conversation window expired for @%s", user.username or "?") + # Capture to full transcript (all messages, all chats) + _record_transcript(msg, text, is_bot=False) + # Buffer for batch triage message_buffer.append({ "text": sanitize_message(text), @@ -730,15 +919,21 @@ Message: {text} Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you — only respond when tagged or replied to. -IMPORTANT: Two special tags you can append at the end of your response (after your main text): +IMPORTANT: Special tags you can append at the end of your response (after your main text): -1. If you learn something: LEARNING: [category] [what you learned] +1. LEARNING: [category] [what you learned] Categories: factual, communication, structured_data Only when genuinely learned something. Most responses have none. - NEVER save a learning about what data you do or don't have access to. Your knowledge base changes constantly — availability learnings become stale immediately. + NEVER save a learning about what data you do or don't have access to. -2. If the user would benefit from an X search on a topic: RESEARCH: [search query] - This triggers an automatic X search. Use when the user asks about recent sentiment, community takes, or emerging discussions. Only when a search would genuinely help.""" +2. RESEARCH: [search query] + Triggers a live X search and sends results back to the chat. Use when the user asks about recent activity, sentiment, or discussions. + +3. SOURCE: [description of what to ingest] + When a user shares valuable source material (X posts, articles, data). Creates a source file in the ingestion pipeline, attributed to the user. Include the verbatim content — don't alter or summarize the user's contribution. Use this when someone drops a link or shares original analysis worth preserving. + +4. CLAIM: [specific, disagreeable assertion] + When a user makes a specific claim with evidence that could enter the KB. Creates a draft claim file attributed to them. Only for genuine claims — not opinions or questions.""" # Call Opus response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024) @@ -771,7 +966,23 @@ IMPORTANT: Two special tags you can append at the end of your response (after yo _research_and_followup(msg, query.strip(), user)) logger.info("Auto-research triggered (will follow up): %s", query[:80]) - # Post response (without LEARNING lines) + # SOURCE: tag — Rio flags content for pipeline ingestion (verbatim, attributed) + source_lines = re.findall(r'^SOURCE:\s+(.+)$', response, re.MULTILINE) + if source_lines: + display_response = re.sub(r'\nSOURCE:\s+.+$', '', display_response, flags=re.MULTILINE).rstrip() + for source_text in source_lines: + _create_inline_source(source_text.strip(), text, user, msg) + logger.info("Inline SOURCE created: %s", source_text[:80]) + + # CLAIM: tag — Rio flags a specific assertion for claim drafting + claim_lines = re.findall(r'^CLAIM:\s+(.+)$', response, re.MULTILINE) + if claim_lines: + display_response = re.sub(r'\nCLAIM:\s+.+$', '', display_response, flags=re.MULTILINE).rstrip() + for claim_text in claim_lines: + _create_inline_claim(claim_text.strip(), text, user, msg) + logger.info("Inline CLAIM drafted: %s", claim_text[:80]) + + # Post response (without tag lines) # Telegram has a 4096 char limit — split long messages if len(display_response) <= 4096: await msg.reply_text(display_response) @@ -820,6 +1031,15 @@ IMPORTANT: Two special tags you can append at the end of your response (after yo # Log the exchange for audit trail logger.info("Rio responded to @%s (msg_id=%d)", user.username if user else "?", msg.message_id) + # Record bot response to transcript (with internal reasoning) + _record_transcript(msg, display_response, is_bot=True, rio_response=display_response, + internal={ + "entities_matched": [e.name for e in kb_ctx.entities] if kb_ctx else [], + "claims_matched": len(kb_ctx.claims) if kb_ctx else 0, + "search_triggered": bool(research_context), + "learnings_written": bool(learning_lines) if 'learning_lines' in dir() else False, + }) + # Detect and fetch URLs for pipeline ingestion (all URLs, not just first) urls = _extract_urls(text) url_content = None @@ -1249,8 +1469,15 @@ def main(): first=TRIAGE_INTERVAL, ) + # Transcript dump job — every 1 hour + app.job_queue.run_repeating( + _dump_transcripts, + interval=3600, + first=3600, + ) + # Run - logger.info("Bot running. Triage interval: %ds", TRIAGE_INTERVAL) + logger.info("Bot running. Triage interval: %ds, transcript dump: 1h", TRIAGE_INTERVAL) app.run_polling(drop_pending_updates=True)