feat: full transcript archival + SOURCE:/CLAIM: inline tags

Transcript system:
- All messages in all chats captured to chat_transcripts store
- 1-hour dump job writes per-chat JSON to /opt/teleo-eval/transcripts/
- Includes internal reasoning (KB matches, searches, learnings)
- Transcripts accumulate over session (no clear on dump)
- Per-chat directories: transcripts/{chat-slug}/{date-hour}.json

Inline contribution tags:
- SOURCE: creates inbox source file with verbatim user content
- CLAIM: creates draft claim file attributed to contributor
- Both strip tag from displayed response
- Full user message preserved verbatim (Rio decides context, can't alter)

Also: multi-URL processing (up to 5 per message)

Pentagon-Agent: Epimetheus <3D35839A-7722-4740-B93D-51157F7D5E70>
This commit is contained in:
m3taversal 2026-03-25 13:35:10 +00:00
parent 0759655688
commit 66bc742979

View file

@ -104,6 +104,192 @@ MAX_HISTORY_USER = 5
MAX_HISTORY_CHAT = 30 # Group chats: multiple users, longer threads
conversation_history: dict[tuple[int, int], list[dict]] = {} # (chat_id, user_id) → [{user, bot}]
# Full transcript store — all messages in all chats, dumped every hour
# Keyed by chat_id. No cap — transcripts accumulate over the session and are never cleared on dump.
chat_transcripts: dict[int, list[dict]] = {}
TRANSCRIPT_DIR = "/opt/teleo-eval/transcripts"
# ─── Transcript Management ──────────────────────────────────────────────
def _record_transcript(msg, text: str, is_bot: bool = False,
                       rio_response: str | None = None, internal: dict | None = None):
    """Record a message to the full transcript for this chat.

    Appends one entry to the module-level ``chat_transcripts`` store,
    keyed by chat_id. User messages are truncated to 2000 chars; bot
    responses are stored in full, optionally with internal reasoning.

    Args:
        msg: Incoming Telegram message. Duck-typed — missing attributes
            are tolerated so synthetic/partial messages can be recorded.
        text: Raw message text (user) or response text (bot).
        is_bot: True when recording the bot's own response.
        rio_response: Displayed bot response; falls back to ``text``.
        internal: Optional internal reasoning (KB matches, searches,
            learnings) attached to bot entries.
    """
    chat_id = msg.chat_id
    transcript = chat_transcripts.setdefault(chat_id, [])
    # Prefer the message's own timestamp; fall back to "now" (UTC) when absent.
    msg_date = getattr(msg, "date", None)
    chat = getattr(msg, "chat", None)
    entry = {
        "ts": msg_date.isoformat() if msg_date else datetime.now(timezone.utc).isoformat(),
        "chat_id": chat_id,
        "chat_title": chat.title if chat else str(chat_id),
        "message_id": getattr(msg, "message_id", None),
    }
    if is_bot:
        entry["type"] = "bot_response"
        entry["rio_response"] = rio_response or text
        if internal:
            entry["internal"] = internal  # KB matches, searches, learnings
    else:
        user = getattr(msg, "from_user", None)
        entry["type"] = "user_message"
        entry["username"] = f"@{user.username}" if user and user.username else "unknown"
        entry["display_name"] = user.full_name if user else "unknown"
        entry["user_id"] = user.id if user else None
        entry["message"] = text[:2000]  # cap stored user text
        reply = getattr(msg, "reply_to_message", None)
        entry["reply_to"] = reply.message_id if reply else None
    transcript.append(entry)
async def _dump_transcripts(context=None):
    """Dump all chat transcripts to VPS-local JSON files.

    Scheduled hourly via the job queue (see main()). Writes one file per
    chat to TRANSCRIPT_DIR/{chat-slug}/{YYYY-MM-DDTHH}.json. Transcripts
    are NOT cleared afterwards — each dump is the full history since the
    last restart, so later dumps supersede earlier ones.

    Args:
        context: Unused; present so the signature matches the job-queue
            callback convention.
    """
    if not chat_transcripts:
        return
    import json as _json  # hoisted out of the per-chat loop

    os.makedirs(TRANSCRIPT_DIR, exist_ok=True)
    now = datetime.now(timezone.utc)
    period_end = now.strftime("%Y-%m-%dT%H")  # one filename per hour
    # Snapshot items() so concurrent appends can't break iteration.
    for chat_id, entries in list(chat_transcripts.items()):
        if not entries:
            continue
        # Derive a filesystem-safe slug from the first entry's chat title.
        chat_title = entries[0].get("chat_title", str(chat_id))
        chat_slug = re.sub(r"[^a-z0-9]+", "-", chat_title.lower()).strip("-") or str(chat_id)
        chat_dir = os.path.join(TRANSCRIPT_DIR, chat_slug)
        os.makedirs(chat_dir, exist_ok=True)
        # Aggregate stats for the document header.
        unique_users = {e["username"] for e in entries if e.get("username")}
        doc = {
            "chat_id": chat_id,
            "chat_title": chat_title,
            "period": {
                "start": entries[0].get("ts", ""),
                "end": entries[-1].get("ts", ""),
                "dumped_at": now.isoformat(),
            },
            "stats": {
                "total_messages": len(entries),
                "unique_users": len(unique_users),
                "users": sorted(unique_users),
                "bot_responses": sum(1 for e in entries if e.get("type") == "bot_response"),
            },
            "messages": entries,
        }
        filepath = os.path.join(chat_dir, f"{period_end}.json")
        try:
            with open(filepath, "w") as f:
                # default=str stringifies any non-JSON value (e.g. datetimes).
                _json.dump(doc, f, indent=2, default=str)
            logger.info("Transcript dumped: %s (%d messages, %d users)",
                        filepath, len(entries), len(unique_users))
        except Exception as e:
            logger.warning("Failed to dump transcript for %s: %s", chat_slug, e)
    # Deliberately no clear — transcripts accumulate over the session.
    # Each dump is the full history since last restart.
def _create_inline_source(source_text: str, user_message: str, user, msg):
    """Create a source file from Rio's SOURCE: tag.

    Writes an attributed markdown file (YAML frontmatter + verbatim user
    content) into ARCHIVE_DIR for the ingestion pipeline. Best-effort:
    failures are logged, never raised.

    Args:
        source_text: Rio's description of what to ingest (the tag payload).
        user_message: The contributor's full message, preserved verbatim.
        user: Telegram user object; None attributes as "anonymous".
        msg: Originating message (unused; kept for signature parity with
            _create_inline_claim).
    """
    try:
        username = user.username if user else "anonymous"
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        slug = re.sub(r"[^a-z0-9]+", "-", source_text[:50].lower()).strip("-")
        filename = f"{date_str}-tg-source-{username}-{slug}.md"
        source_path = Path(ARCHIVE_DIR) / filename
        if source_path.exists():
            return  # dedupe: same day + same slug already captured
        # Escape double quotes so the quoted YAML title scalar can't be
        # broken by user-supplied text; also separate username from text
        # (previously they were fused with no delimiter).
        title_snippet = source_text[:80].replace('"', "'")
        content = f"""---
type: source
source_type: telegram-contribution
title: "Source from @{username}: {title_snippet}"
author: "@{username}"
date: {date_str}
domain: internet-finance
format: contribution
status: unprocessed
proposed_by: "@{username}"
contribution_type: source-submission
tags: [telegram-contribution, inline-source]
---
# Source: {source_text[:100]}
Contributed by @{username} in Telegram chat.
Flagged by Rio as relevant source material.
## Verbatim User Message
{user_message}
## Rio's Context
{source_text}
"""
        source_path.write_text(content)
        logger.info("Inline source created: %s (by @%s)", filename, username)
    except Exception as e:
        logger.warning("Failed to create inline source: %s", e)
def _create_inline_claim(claim_text: str, user_message: str, user, msg):
    """Create a draft claim file from Rio's CLAIM: tag.

    Writes an attributed markdown draft (YAML frontmatter + verbatim user
    content) into ARCHIVE_DIR for claim review. Best-effort: failures are
    logged, never raised.

    Args:
        claim_text: The specific assertion Rio flagged (the tag payload).
        user_message: The contributor's full message, preserved verbatim.
        user: Telegram user object; None attributes as "anonymous".
        msg: Originating message (unused; kept for signature parity with
            _create_inline_source).
    """
    try:
        username = user.username if user else "anonymous"
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        slug = re.sub(r"[^a-z0-9]+", "-", claim_text[:60].lower()).strip("-")
        filename = f"{date_str}-tg-claim-{username}-{slug}.md"
        source_path = Path(ARCHIVE_DIR) / filename
        if source_path.exists():
            return  # dedupe: same day + same slug already drafted
        # Escape double quotes so the quoted YAML title scalar can't be
        # broken by user-supplied text; also separate username from text
        # (previously they were fused with no delimiter).
        title_snippet = claim_text[:80].replace('"', "'")
        content = f"""---
type: source
source_type: telegram-claim
title: "Claim from @{username}: {title_snippet}"
author: "@{username}"
date: {date_str}
domain: internet-finance
format: claim-draft
status: unprocessed
proposed_by: "@{username}"
contribution_type: claim-proposal
tags: [telegram-claim, inline-claim]
---
# Draft Claim: {claim_text}
Contributed by @{username} in Telegram chat.
Flagged by Rio as a specific, disagreeable assertion worth extracting.
## Verbatim User Message
{user_message}
## Proposed Claim
{claim_text}
"""
        source_path.write_text(content)
        logger.info("Inline claim drafted: %s (by @%s)", filename, username)
    except Exception as e:
        logger.warning("Failed to create inline claim: %s", e)
# ─── Helpers ────────────────────────────────────────────────────────────
@ -538,6 +724,9 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
conversation_history.pop(key, None)
logger.info("Conversation window expired for @%s", user.username or "?")
# Capture to full transcript (all messages, all chats)
_record_transcript(msg, text, is_bot=False)
# Buffer for batch triage
message_buffer.append({
"text": sanitize_message(text),
@ -730,15 +919,21 @@ Message: {text}
Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you only respond when tagged or replied to.
IMPORTANT: Two special tags you can append at the end of your response (after your main text):
IMPORTANT: Special tags you can append at the end of your response (after your main text):
1. If you learn something: LEARNING: [category] [what you learned]
1. LEARNING: [category] [what you learned]
Categories: factual, communication, structured_data
Only when genuinely learned something. Most responses have none.
NEVER save a learning about what data you do or don't have access to. Your knowledge base changes constantly — availability learnings become stale immediately.
NEVER save a learning about what data you do or don't have access to.
2. If the user would benefit from an X search on a topic: RESEARCH: [search query]
This triggers an automatic X search. Use when the user asks about recent sentiment, community takes, or emerging discussions. Only when a search would genuinely help."""
2. RESEARCH: [search query]
Triggers a live X search and sends results back to the chat. Use when the user asks about recent activity, sentiment, or discussions.
3. SOURCE: [description of what to ingest]
When a user shares valuable source material (X posts, articles, data). Creates a source file in the ingestion pipeline, attributed to the user. Include the verbatim content don't alter or summarize the user's contribution. Use this when someone drops a link or shares original analysis worth preserving.
4. CLAIM: [specific, disagreeable assertion]
When a user makes a specific claim with evidence that could enter the KB. Creates a draft claim file attributed to them. Only for genuine claims not opinions or questions."""
# Call Opus
response = await call_openrouter(RESPONSE_MODEL, prompt, max_tokens=1024)
@ -771,7 +966,23 @@ IMPORTANT: Two special tags you can append at the end of your response (after yo
_research_and_followup(msg, query.strip(), user))
logger.info("Auto-research triggered (will follow up): %s", query[:80])
# Post response (without LEARNING lines)
# SOURCE: tag — Rio flags content for pipeline ingestion (verbatim, attributed)
source_lines = re.findall(r'^SOURCE:\s+(.+)$', response, re.MULTILINE)
if source_lines:
display_response = re.sub(r'\nSOURCE:\s+.+$', '', display_response, flags=re.MULTILINE).rstrip()
for source_text in source_lines:
_create_inline_source(source_text.strip(), text, user, msg)
logger.info("Inline SOURCE created: %s", source_text[:80])
# CLAIM: tag — Rio flags a specific assertion for claim drafting
claim_lines = re.findall(r'^CLAIM:\s+(.+)$', response, re.MULTILINE)
if claim_lines:
display_response = re.sub(r'\nCLAIM:\s+.+$', '', display_response, flags=re.MULTILINE).rstrip()
for claim_text in claim_lines:
_create_inline_claim(claim_text.strip(), text, user, msg)
logger.info("Inline CLAIM drafted: %s", claim_text[:80])
# Post response (without tag lines)
# Telegram has a 4096 char limit — split long messages
if len(display_response) <= 4096:
await msg.reply_text(display_response)
@ -820,6 +1031,15 @@ IMPORTANT: Two special tags you can append at the end of your response (after yo
# Log the exchange for audit trail
logger.info("Rio responded to @%s (msg_id=%d)", user.username if user else "?", msg.message_id)
# Record bot response to transcript (with internal reasoning)
_record_transcript(msg, display_response, is_bot=True, rio_response=display_response,
internal={
"entities_matched": [e.name for e in kb_ctx.entities] if kb_ctx else [],
"claims_matched": len(kb_ctx.claims) if kb_ctx else 0,
"search_triggered": bool(research_context),
"learnings_written": bool(learning_lines) if 'learning_lines' in dir() else False,
})
# Detect and fetch URLs for pipeline ingestion (all URLs, not just first)
urls = _extract_urls(text)
url_content = None
@ -1249,8 +1469,15 @@ def main():
first=TRIAGE_INTERVAL,
)
# Transcript dump job — every 1 hour
app.job_queue.run_repeating(
_dump_transcripts,
interval=3600,
first=3600,
)
# Run
logger.info("Bot running. Triage interval: %ds", TRIAGE_INTERVAL)
logger.info("Bot running. Triage interval: %ds, transcript dump: 1h", TRIAGE_INTERVAL)
app.run_polling(drop_pending_updates=True)