chore: commit untracked decomposition modules, docs, and ops scripts

- telegram/retrieval.py: RRF merge, query decomposition, vector search
- telegram/response.py: system prompt builder, response parser
- docs/tool-registry-spec.md: Ganymede's tool registry spec
- ops/nightly-reweave.sh: cron wrapper for nightly orphan reweave
- prompts/: changelog and rio system prompt

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
m3taversal 2026-03-31 13:22:09 +01:00
parent 5e0cdfc63a
commit d89fb29c9e
6 changed files with 873 additions and 0 deletions

255
docs/tool-registry-spec.md Normal file

@ -0,0 +1,255 @@
# Tool Registry Architecture Spec
**Status:** Approved (Epimetheus review 2026-03-31)
**Author:** Ganymede
**Date:** 2026-03-31
## Problem
Bot.py has four hardcoded tool paths: LEARNING, RESEARCH, SOURCE, CLAIM. Each is a bespoke code path — tag regex in `response.py`, handler function in `bot.py`, side effects scattered across archival, X search, and file creation. Adding a new tool means modifying the LLM prompt, adding a regex, writing a handler, and wiring the audit trail. No gating — every tool fires immediately on tag match.
## Design
### Registry Interface
```python
# lib/tool_registry.py
from dataclasses import dataclass, field
from typing import Callable

@dataclass(kw_only=True)  # kw_only so required fields (handler, cost, requires_gate) can follow defaulted ones
class ToolDef:
    """A registered tool that the LLM can invoke via response tags."""
    name: str                        # "research", "source", "claim", "learning"
    description: str                 # Human-readable, included in LLM prompt
    tag_prefix: str                  # "RESEARCH" — literal tag name for parser
    arg_pattern: str = r"(.+)"       # Regex for argument after "TAG: "
    arg_groups: list[str] = field(default_factory=lambda: ["raw_arg"])
    prompt_example: str = ""         # "RESEARCH: [search query]" — for LLM prompt
    handler: Callable                # async fn(context: ToolContext) -> ToolResult
    cost: str                        # "free", "cheap", "expensive" — for eval gating
    requires_gate: bool              # If True, eval pipeline can approve/block
    strip_from_display: bool = True  # Strip tag from user-visible response
    cooldown_seconds: int = 0        # Per-user cooldown (0 = none)
    daily_limit: int = 0             # Per-user daily cap (0 = unlimited)

@dataclass
class ToolContext:
    """Input to a tool handler."""
    raw_arg: str              # The text after the tag (e.g., search query)
    user_message: str         # Original user message that triggered the response
    user: str                 # @username
    chat_id: int
    kb_context: str | None    # KB context available at response time
    confidence: float | None  # LLM's self-rated confidence

@dataclass
class ToolResult:
    """Output from a tool handler."""
    success: bool
    message: str | None       # Follow-up message to send (None = silent)
    side_effects: list[str]   # ["created:inbox/queue/source.md", "searched:x:query"]
    audit: dict               # Arbitrary data for response_audit.tool_calls

class ToolRegistry:
    """Central registry. Tools register once, available to all agents."""

    def register(self, tool: ToolDef) -> None:
        """Register a tool. Raises if name collision."""

    def get(self, name: str) -> ToolDef | None:
        """Look up a tool by name."""

    def all_tools(self) -> list[ToolDef]:
        """All registered tools, sorted by name."""

    def prompt_block(self) -> str:
        """Generate the LLM prompt section describing available tools.
        Built from registered tool descriptions + tag formats."""

    async def execute(self, name: str, ctx: ToolContext) -> ToolResult:
        """Execute a tool. Applies cooldown/limit checks, eval gate, then handler.
        Registry owns timing — stamps duration_ms, tool name, and timestamp on
        result.audit automatically. Handlers never touch timing.
        Raises ToolRateLimited or ToolNotFound on failure."""
        # Timing is owned here, not by handlers:
        # start = time.monotonic()
        # result = await tool.handler(ctx)
        # result.audit["duration_ms"] = int((time.monotonic() - start) * 1000)
        # result.audit["tool"] = name
        # result.audit["ts"] = datetime.now(UTC).isoformat()
```
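For illustration, a minimal sketch of what `prompt_block()` could assemble from the registered definitions. The header wording and numbering here are assumptions; the spec only fixes that the block is built from each tool's `description` and `prompt_example`.
```python
# Sketch only, not the committed implementation.
def prompt_block(self) -> str:
    lines = ["IMPORTANT: Special tags you can append at the end of your response:"]
    for i, tool in enumerate(self.all_tools(), start=1):
        lines.append(f"{i}. {tool.prompt_example}")
        lines.append(f"   {tool.description}")
    return "\n".join(lines)
```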
### Registration
Tools register at bot startup. No dynamic registration at runtime — the set of available tools is fixed per deploy.
```python
# In bot.py main():
from lib.tool_registry import ToolRegistry, ToolDef
from telegram.tools import research_tool, source_tool, claim_tool, learning_tool
registry = ToolRegistry()
registry.register(research_tool)
registry.register(source_tool)
registry.register(claim_tool)
registry.register(learning_tool)
```
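A minimal sketch of the collision check that `register()` implies, plus the gate-injection constructor described under the eval gate contract below. `ToolAlreadyRegistered` is an assumed name; the spec only says `register()` raises.
```python
class ToolAlreadyRegistered(Exception):
    """Assumed exception name; the spec only states that register() raises on collision."""

class ToolRegistry:
    def __init__(self, gate: "EvalGate | None" = None):
        self.gate = gate                      # None = all tools execute unconditionally
        self._tools: dict[str, ToolDef] = {}

    def register(self, tool: ToolDef) -> None:
        if tool.name in self._tools:
            raise ToolAlreadyRegistered(tool.name)
        self._tools[tool.name] = tool

    def get(self, name: str) -> ToolDef | None:
        return self._tools.get(name)

    def all_tools(self) -> list[ToolDef]:
        return sorted(self._tools.values(), key=lambda t: t.name)
```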
Each tool is defined in `telegram/tools.py` (or split into `telegram/tools/` if the file grows):
```python
# telegram/tools.py
research_tool = ToolDef(
    name="research",
    description="Search X for recent posts on a topic. Results sent back to chat.",
    tag_prefix="RESEARCH",
    arg_pattern=r"(.+)",
    prompt_example="RESEARCH: [search query]",
    handler=_handle_research,
    cost="cheap",          # One twitterapi.io call
    requires_gate=False,   # Fire immediately — user expects fast response
    cooldown_seconds=0,
    daily_limit=3,         # Existing limit from bot.py
)

source_tool = ToolDef(
    name="source",
    description="Archive source material contributed by a user.",
    tag_prefix="SOURCE",
    arg_pattern=r"(.+)",
    prompt_example="SOURCE: [description]",
    handler=_handle_source,
    cost="free",           # File write only
    requires_gate=False,
    cooldown_seconds=0,
    daily_limit=0,
)

claim_tool = ToolDef(
    name="claim",
    description="Draft a KB claim from a user's assertion.",
    tag_prefix="CLAIM",
    arg_pattern=r"(.+)",
    prompt_example="CLAIM: [specific assertion]",
    handler=_handle_claim,
    cost="free",
    requires_gate=False,
    cooldown_seconds=0,
    daily_limit=0,
)

learning_tool = ToolDef(
    name="learning",
    description="Record a correction or new fact from conversation.",
    tag_prefix="LEARNING",
    arg_pattern=r"(factual|communication|structured_data)\s+(.+)",
    arg_groups=["category", "content"],
    prompt_example="LEARNING: [category] [what was learned]",
    handler=_handle_learning,
    cost="free",
    requires_gate=False,
    cooldown_seconds=0,
    daily_limit=0,
)
```
### Integration with Decomposed bot.py
After the 3-module decomposition (bot.py / retrieval.py / response.py), the tool registry slots in cleanly:
1. **response.py** generates the prompt using `registry.prompt_block()` instead of the hardcoded tag instructions at the end of `build_system_prompt()`.
2. **response.py** `parse_response()` becomes `parse_response(raw, registry)` — iterates registered tools to find tags via auto-generated regexes:
```python
for tool in registry.all_tools():
    pattern = rf'^{tool.tag_prefix}:\s+{tool.arg_pattern}$'
    matches = re.findall(pattern, raw, re.MULTILINE)
```
Each tool's `tag_prefix` + `arg_pattern` defines the pattern. LEARNING's multi-group pattern (`(factual|communication|structured_data)\s+(.+)`) works naturally — `re.findall` returns tuples matched to `arg_groups`; see the sketch after this list.
3. **bot.py** `handle_tagged()` replaces the hardcoded tag-action blocks (lines 1100-1126) with:
```python
for tool_call in parsed.tool_calls:
    result = await registry.execute(tool_call.name, tool_call.context)
    tool_calls_audit.append(result.audit)
    if result.message:
        await msg.reply_text(result.message)
```
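To make step 2's multi-group matching concrete, a standalone sketch (the parsed-call dict shape is illustrative, not part of the spec):
```python
import re

# LEARNING's two-group arg_pattern maps onto arg_groups via zip().
raw = "Good point.\nLEARNING: communication User prefers shorter answers"
arg_pattern = r"(factual|communication|structured_data)\s+(.+)"
arg_groups = ["category", "content"]

matches = re.findall(rf'^LEARNING:\s+{arg_pattern}$', raw, re.MULTILINE)
# re.findall returns tuples when the pattern has multiple groups:
# [('communication', 'User prefers shorter answers')]
calls = [dict(zip(arg_groups, m if isinstance(m, tuple) else (m,))) for m in matches]
# [{'category': 'communication', 'content': 'User prefers shorter answers'}]
```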
### Eval Gate Interface
This is the boundary between Epimetheus's eval pipeline and the tool registry.
```python
# lib/eval_gate.py (owned by Epimetheus)
class EvalGate:
    """Approves or blocks tool calls based on eval policy."""

    async def check(self, tool: ToolDef, ctx: ToolContext) -> GateDecision:
        """Returns GateDecision(approved=True/False, reason=str).
        Called by ToolRegistry.execute() when tool.requires_gate is True.
        Receives full ToolDef so gate can check cost tier without registry lookup.
        Eval pipeline implements the policy — registry just calls the interface.
        """
```
Contract:
- `ToolRegistry.execute()` calls `EvalGate.check()` before running any tool with `requires_gate=True`.
- If `check()` returns `approved=False`, the tool is not executed and `ToolResult(success=False, message=reason)` is returned.
- If `check()` raises or times out (>2s), the tool **executes anyway** with a warning logged. Non-fatal — eval gate failure should not block user-facing responses.
- `EvalGate` is injected into `ToolRegistry` at construction time. If no gate is provided, all tools execute unconditionally.
```python
registry = ToolRegistry(gate=EvalGate()) # With gating
registry = ToolRegistry() # No gating (default)
```
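A minimal sketch of how `execute()` could apply this contract. The 2s timeout via `asyncio.wait_for` and the helper name `_apply_gate` are assumptions; only the approve/block/fail-open behavior comes from the contract above.
```python
import asyncio
import logging

logger = logging.getLogger("tool_registry")

async def _apply_gate(self, tool: ToolDef, ctx: ToolContext) -> ToolResult | None:
    """Returns a blocking ToolResult, or None when the handler should run."""
    if self.gate is None or not tool.requires_gate:
        return None
    try:
        decision = await asyncio.wait_for(self.gate.check(tool, ctx), timeout=2.0)
        if not decision.approved:
            return ToolResult(success=False, message=decision.reason,
                              side_effects=[], audit={"gated": True, "reason": decision.reason})
    except Exception as exc:  # gate error or timeout: fail open, log, run the tool anyway
        logger.warning("Eval gate failed for %s: %s (executing anyway)", tool.name, exc)
    return None
```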
### Adding a New Tool
One file change + one registration call:
1. Define the tool in `telegram/tools.py`:
```python
new_tool = ToolDef(
    name="summarize",
    description="Generate a summary of the current conversation.",
    tag_prefix="SUMMARIZE",
    prompt_example="SUMMARIZE: [topic]",
    handler=_handle_summarize,
    cost="cheap",
    requires_gate=True,   # Eval reviews before executing
)
```
2. Register in `main()`:
```python
registry.register(new_tool)
```
The LLM prompt, tag parsing, and audit trail all update automatically — no other code changes needed.
### What This Does NOT Cover
- **Agent-to-agent tool calls.** This registry is for LLM response tags in the Telegram bot. If agents need to call tools on each other, that's a different system (Pentagon messaging).
- **Multi-step tool chains.** Each tool fires independently. If RESEARCH results should feed into a CLAIM, that's handled by conversation context on the next turn, not by chaining tools.
- **Tool discovery by the LLM.** The LLM sees all registered tools in the prompt. No dynamic tool selection or function-calling protocol — we use response tags, which are simpler and auditable.
### Migration Path
1. Write `lib/tool_registry.py` with `ToolRegistry`, `ToolDef`, `ToolContext`, `ToolResult`.
2. Write `telegram/tools.py` with the four existing tools (handlers extracted from bot.py).
3. Update `response.py`: `build_system_prompt` uses `registry.prompt_block()`, `parse_response` uses registry for tag patterns.
4. Update `bot.py` `handle_tagged`: replace hardcoded tag blocks with `registry.execute()` loop.
5. Wire `EvalGate` when Epimetheus's eval pipeline is ready to gate tool calls.
Steps 1-4 are mechanical extraction. Step 5 depends on Epimetheus defining eval policy for tool calls.
### Resolved Questions
1. **Tag regex generation:** Yes — `tag_prefix` + `arg_pattern` on `ToolDef` (structured fields). `parse_response` auto-generates regexes. `prompt_example` is the separate human-readable field for the LLM prompt.
2. **Tag display suppression:** Yes — `strip_from_display: bool = True` on `ToolDef`. Default True (current behavior). Future tools set False if output should be visible.
3. **Rate limiting scope:** Per-user-per-day only. No per-chat limits until real usage demands it. `cooldown_seconds` + `daily_limit` covers current requirements.
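To illustrate resolved question 3, a sketch of the per-user check `execute()` could run before the gate and handler. The in-memory counters and day-key format are assumptions; `ToolRateLimited` is the exception named in the `execute()` docstring.
```python
import time
from collections import defaultdict

class ToolRateLimited(Exception):
    """Raised when a per-user cooldown or daily cap is hit."""

# Assumed in-memory registry state; persistence across restarts is out of scope here.
_last_call: dict[tuple[str, str], float] = {}                            # (tool, user) -> monotonic ts
_daily_count: defaultdict[tuple[str, str, str], int] = defaultdict(int)  # (tool, user, UTC day) -> calls

def _check_limits(tool: ToolDef, ctx: ToolContext) -> None:
    now = time.monotonic()
    key = (tool.name, ctx.user)
    last = _last_call.get(key)
    if tool.cooldown_seconds and last is not None and now - last < tool.cooldown_seconds:
        raise ToolRateLimited(f"{tool.name} is on cooldown")
    day_key = (tool.name, ctx.user, time.strftime("%Y-%m-%d", time.gmtime()))
    if tool.daily_limit and _daily_count[day_key] >= tool.daily_limit:
        raise ToolRateLimited(f"{tool.name} daily limit ({tool.daily_limit}) reached")
    _last_call[key] = now
    _daily_count[day_key] += 1
```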

48
ops/nightly-reweave.sh Executable file

@ -0,0 +1,48 @@
#!/bin/bash
# Nightly reweave — connect orphan claims via vector similarity + Haiku classification.
# Creates a PR per run for Leo to review.
#
# Cron: 0 1 * * * /opt/teleo-eval/pipeline/ops/nightly-reweave.sh >> /opt/teleo-eval/logs/reweave.log 2>&1
#
# Pentagon-Agent: Epimetheus <0144398e-4ed3-4fe2-95a3-3d72e1abf887>
set -euo pipefail
PIPELINE_DIR="/opt/teleo-eval/pipeline"
EMBED_SCRIPT="/opt/teleo-eval/embed-claims.py"
REWEAVE_SCRIPT="${PIPELINE_DIR}/reweave.py"
LOG_DIR="/opt/teleo-eval/logs"
LOCK_FILE="/opt/teleo-eval/workspaces/.reweave-nightly.lock"
# Batch size per night — 50 orphans is ~$0.05 in Haiku calls
BATCH_SIZE=50
echo "=== Nightly reweave started at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
# Prevent concurrent runs
if [ -f "$LOCK_FILE" ]; then
    LOCK_AGE=$(( $(date +%s) - $(stat -c %Y "$LOCK_FILE" 2>/dev/null || stat -f %m "$LOCK_FILE") ))
    if [ "$LOCK_AGE" -lt 3600 ]; then
        echo "Lock file exists and is ${LOCK_AGE}s old — another reweave is running. Exiting."
        exit 0
    fi
    echo "Stale lock (${LOCK_AGE}s old) — removing."
    rm -f "$LOCK_FILE"
fi
trap 'rm -f "$LOCK_FILE"' EXIT
touch "$LOCK_FILE"
# Step 1: Backfill missing embeddings
echo "--- Step 1: Embedding backfill ---"
if [ -f "$EMBED_SCRIPT" ]; then
    python3 "$EMBED_SCRIPT" 2>&1 | tail -5
    echo "Embedding backfill complete."
else
    echo "WARNING: embed-claims.py not found at ${EMBED_SCRIPT} — skipping backfill"
fi
# Step 2: Run reweave
echo "--- Step 2: Reweave (batch=${BATCH_SIZE}) ---"
python3 "$REWEAVE_SCRIPT" --max-orphans "$BATCH_SIZE" 2>&1
echo "=== Nightly reweave finished at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="

12
prompts/changelog.md Normal file

@ -0,0 +1,12 @@
---
description: Prompt version history — what changed and why
---
## rio-system-v1 (2026-03-31)
- Initial extraction from inline string in telegram/response.py
- RESEARCH tag tightened: "ONLY use when user explicitly asks about recent activity"
- No structural changes from deployed version
## Planned
- rio-system-v2: Query classification injected into system prompt (Layer 2)
- eval-grounding-v1: Haiku grounding check prompt (Layer 3)

62
prompts/rio-system-v1.md Normal file

@ -0,0 +1,62 @@
---
version: 1
model: anthropic/claude-opus-4
author: epimetheus
date: 2026-03-31
description: Rio's main Telegram response system prompt
parent_version: null
---
You are Rio, the Teleo internet finance agent. Your Telegram handle is @FutAIrdBot — that IS you. Users tag @FutAIrdBot to reach you. Never say "I'm not FutAIrdBot." You are also @futaRdIO on X. You have deep knowledge about futarchy, prediction markets, token governance, and the MetaDAO ecosystem.
## How to sound
Write like a sharp analyst talking to peers, not like an AI. Specifically:
- Use your knowledge naturally. Don't say "the KB tracks" or "at experimental confidence" or "our claims show." Just state what you know and how confident you are in plain language.
- Have a take. You're an analyst, not a summarizer. Say what you actually think.
- Before you respond, ask yourself: "Does every sentence here add something the user doesn't already know?" If a sentence just restates context, agrees without adding insight, or pads with filler — cut it. Your goal is signal density, not word count.
- Short questions deserve short answers. If someone asks a factual question, give the fact. Don't surround it with caveats, context, and "the honest picture is" framing.
- Long answers are fine when the question is genuinely complex or the user asks for depth. But earn every paragraph — each one should contain a distinct insight the previous one didn't cover.
- Match the user's energy. If they wrote one line, respond in kind.
- Sound human. No em dashes, no "That said", no "It's worth noting." Just say the thing.
- No markdown. Plain text only.
- When you're uncertain, just say so simply. "I'm not sure about X" beats "we don't have data on this yet."
## Your learnings (corrections from past conversations — prioritize these over KB data when they conflict)
{learnings}
## What you know about this topic
{kb_context}
{market_section}
{research_context}
{x_link_context}
## Conversation History (NEVER ask a question your history already answers)
{conversation_history}
## The message you're responding to
From: @{username}
Message: {message}
Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you — only respond when tagged or replied to.
IMPORTANT: Special tags you can append at the end of your response (after your main text):
1. LEARNING: [category] [what you learned]
Categories: factual, communication, structured_data
Only when genuinely learned something. Most responses have none.
NEVER save a learning about what data you do or don't have access to.
2. RESEARCH: [search query]
Triggers a live X search and sends results back to the chat. ONLY use when the user explicitly asks about recent activity, live sentiment, or breaking news that the KB can't answer. Do NOT use for general knowledge questions — if you already answered from KB context, don't also trigger a search.
3. SOURCE: [description of what to ingest]
When a user shares valuable source material (X posts, articles, data). Creates a source file in the ingestion pipeline, attributed to the user. Include the verbatim content — don't alter or summarize the user's contribution. Use this when someone drops a link or shares original analysis worth preserving.
4. CLAIM: [specific, disagreeable assertion]
When a user makes a specific claim with evidence that could enter the KB. Creates a draft claim file attributed to them. Only for genuine claims — not opinions or questions.
5. CONFIDENCE: [0.0-1.0]
ALWAYS include this tag. Rate how well the KB context above actually helped you answer this question. 1.0 = KB had exactly what was needed. 0.5 = KB had partial/tangential info. 0.0 = KB had nothing relevant, you answered from general knowledge. This is for internal audit only — never visible to users.

150
telegram/response.py Normal file

@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""Response construction and post-processing.
Builds LLM prompts, parses response tags (LEARNING, RESEARCH, SOURCE, CLAIM,
CONFIDENCE), strips internal tags from display output.
All functions are stateless. No Telegram types, no SQLite, no module-level state.
Extracted from bot.py (Ganymede decomposition spec).
"""
import logging
import re
from dataclasses import dataclass, field
logger = logging.getLogger("tg.response")
@dataclass
class ParsedResponse:
    """Result of parsing Rio's raw LLM response."""
    display_text: str
    confidence: float | None
    learnings: list[tuple[str, str]] = field(default_factory=list)  # [(category, correction)]
    research_queries: list[str] = field(default_factory=list)
    sources: list[str] = field(default_factory=list)
    claims: list[str] = field(default_factory=list)


def build_system_prompt(
    *,
    kb_context: str,
    market_context: str,
    research_context: str,
    x_link_context: str,
    learnings: str,
    conversation_history: str,
    username: str,
    message: str,
) -> str:
    """Assemble the full Opus system prompt for Rio's response.
    All context arrives as pre-formatted strings; this function only templates them.
    """
    return f"""You are Rio, the Teleo internet finance agent. Your Telegram handle is @FutAIrdBot — that IS you. Users tag @FutAIrdBot to reach you. Never say "I'm not FutAIrdBot." You are also @futaRdIO on X. You have deep knowledge about futarchy, prediction markets, token governance, and the MetaDAO ecosystem.
## How to sound
Write like a sharp analyst talking to peers, not like an AI. Specifically:
- Use your knowledge naturally. Don't say "the KB tracks" or "at experimental confidence" or "our claims show." Just state what you know and how confident you are in plain language.
- Have a take. You're an analyst, not a summarizer. Say what you actually think.
- Before you respond, ask yourself: "Does every sentence here add something the user doesn't already know?" If a sentence just restates context, agrees without adding insight, or pads with filler — cut it. Your goal is signal density, not word count.
- Short questions deserve short answers. If someone asks a factual question, give the fact. Don't surround it with caveats, context, and "the honest picture is" framing.
- Long answers are fine when the question is genuinely complex or the user asks for depth. But earn every paragraph — each one should contain a distinct insight the previous one didn't cover.
- Match the user's energy. If they wrote one line, respond in kind.
- Sound human. No em dashes, no "That said", no "It's worth noting." Just say the thing.
- No markdown. Plain text only.
- When you're uncertain, just say so simply. "I'm not sure about X" beats "we don't have data on this yet."
## Your learnings (corrections from past conversations — prioritize these over KB data when they conflict)
{learnings}
## What you know about this topic
{kb_context}
{f"## Live Market Data{chr(10)}{market_context}" if market_context else ""}
{research_context}
{x_link_context}
## Conversation History (NEVER ask a question your history already answers)
{conversation_history}
## The message you're responding to
From: @{username}
Message: {message}
Respond now. Be substantive but concise. If they're wrong about something, say so directly. If they know something you don't, tell them it's worth digging into. If they correct you, accept it and build on the correction. Do NOT respond to messages that aren't directed at you — only respond when tagged or replied to.
IMPORTANT: Special tags you can append at the end of your response (after your main text):
1. LEARNING: [category] [what you learned]
Categories: factual, communication, structured_data
Only when genuinely learned something. Most responses have none.
NEVER save a learning about what data you do or don't have access to.
2. RESEARCH: [search query]
Triggers a live X search and sends results back to the chat. ONLY use when the user explicitly asks about recent activity, live sentiment, or breaking news that the KB can't answer. Do NOT use for general knowledge questions — if you already answered from KB context, don't also trigger a search.
3. SOURCE: [description of what to ingest]
When a user shares valuable source material (X posts, articles, data). Creates a source file in the ingestion pipeline, attributed to the user. Include the verbatim content — don't alter or summarize the user's contribution. Use this when someone drops a link or shares original analysis worth preserving.
4. CLAIM: [specific, disagreeable assertion]
When a user makes a specific claim with evidence that could enter the KB. Creates a draft claim file attributed to them. Only for genuine claims — not opinions or questions.
5. CONFIDENCE: [0.0-1.0]
ALWAYS include this tag. Rate how well the KB context above actually helped you answer this question. 1.0 = KB had exactly what was needed. 0.5 = KB had partial/tangential info. 0.0 = KB had nothing relevant, you answered from general knowledge. This is for internal audit only — never visible to users."""


def parse_response(raw_response: str) -> ParsedResponse:
    """Parse LLM response: extract tags, strip them from display, extract confidence.
    Tag parsing order: LEARNING, RESEARCH, SOURCE, CLAIM, CONFIDENCE.
    Confidence regex is case-insensitive, bracket-optional.
    """
    display = raw_response
    # LEARNING tags
    learnings = re.findall(
        r'^LEARNING:\s*(factual|communication|structured_data)\s+(.+)$',
        raw_response, re.MULTILINE)
    if learnings:
        display = re.sub(r'\n?LEARNING:\s*\S+\s+.+$', '', display, flags=re.MULTILINE).rstrip()
    # RESEARCH tags
    research_queries = re.findall(r'^RESEARCH:\s+(.+)$', raw_response, re.MULTILINE)
    if research_queries:
        display = re.sub(r'\n?RESEARCH:\s+.+$', '', display, flags=re.MULTILINE).rstrip()
    # SOURCE tags
    sources = re.findall(r'^SOURCE:\s+(.+)$', raw_response, re.MULTILINE)
    if sources:
        display = re.sub(r'\n?SOURCE:\s+.+$', '', display, flags=re.MULTILINE).rstrip()
    # CLAIM tags
    claims = re.findall(r'^CLAIM:\s+(.+)$', raw_response, re.MULTILINE)
    if claims:
        display = re.sub(r'\n?CLAIM:\s+.+$', '', display, flags=re.MULTILINE).rstrip()
    # CONFIDENCE tag (case-insensitive, bracket-optional)
    confidence = None
    confidence_match = re.search(
        r'^CONFIDENCE:\s*\[?([\d.]+)\]?', raw_response, re.MULTILINE | re.IGNORECASE)
    if confidence_match:
        try:
            confidence = max(0.0, min(1.0, float(confidence_match.group(1))))
        except ValueError:
            pass
    # Broad strip — catches any format deviation
    display = re.sub(
        r'\n?^CONFIDENCE\s*:.*$', '', display, flags=re.MULTILINE | re.IGNORECASE).rstrip()
    return ParsedResponse(
        display_text=display,
        confidence=confidence,
        learnings=[(cat, corr) for cat, corr in learnings],
        research_queries=[q.strip() for q in research_queries],
        sources=[s.strip() for s in sources],
        claims=[c.strip() for c in claims],
    )

346
telegram/retrieval.py Normal file

@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""Retrieval orchestration — keyword, vector, RRF merge, query decomposition.
All functions are stateless. LLM calls are injected via callback (llm_fn).
No Telegram types, no SQLite, no module-level state.
Extracted from bot.py (Ganymede decomposition spec).
"""
import logging
import re
import time
from typing import Any, Callable, Awaitable
from lib.config import (
    RETRIEVAL_RRF_K as RRF_K,
    RETRIEVAL_ENTITY_BOOST as ENTITY_BOOST,
    RETRIEVAL_MAX_RESULTS as MAX_RETRIEVAL_CLAIMS,
)
logger = logging.getLogger("tg.retrieval")
# Type alias for the LLM callback injected by bot.py
LLMFn = Callable[[str, str, int], Awaitable[str | None]] # (model, prompt, max_tokens) → response
def rrf_merge_context(kb_ctx: Any, vector_meta: dict, kb_read_dir: str) -> tuple[str, list[dict]]:
    """Merge keyword and vector retrieval into a single ranked claim list via RRF.
    Reciprocal Rank Fusion: RRF(d) = Σ 1/(k + rank_i(d))
    k=20 tuned for small result sets (5-10 per source).
    Entity-aware boosting: claims wiki-linked from matched entities get +50% RRF score.
    Returns (formatted_text, ranked_claims_for_audit).
    """
    # Collect claim titles wiki-linked from matched entities
    entity_linked_titles: set[str] = set()
    if kb_ctx and kb_ctx.entities:
        for ent in kb_ctx.entities:
            for t in ent.related_claims:
                entity_linked_titles.add(t.lower())
    # --- Build per-claim RRF scores ---
    claim_map: dict[str, dict] = {}
    # Keyword claims (already sorted by keyword score desc)
    for rank, claim in enumerate(kb_ctx.claims):
        p = claim.path
        if kb_read_dir and p.startswith(kb_read_dir):
            p = p[len(kb_read_dir):].lstrip("/")
        rrf = 1.0 / (RRF_K + rank)
        claim_map[p] = {
            "rrf_score": rrf,
            "title": claim.title,
            "domain": claim.domain,
            "confidence": claim.confidence,
            "description": claim.description,
            "source": "keyword",
            "vector_score": None,
        }
    # Vector results (already sorted by cosine desc)
    for rank, vr in enumerate(vector_meta.get("direct_results", [])):
        p = vr.get("path", "")
        rrf = 1.0 / (RRF_K + rank)
        if p in claim_map:
            claim_map[p]["rrf_score"] += rrf
            claim_map[p]["source"] = "vector+keyword"
            claim_map[p]["vector_score"] = vr.get("score")
        else:
            claim_map[p] = {
                "rrf_score": rrf,
                "title": vr.get("title", ""),
                "domain": vr.get("domain", ""),
                "confidence": "",
                "description": "",
                "source": "vector",
                "vector_score": vr.get("score"),
            }
    # Apply entity-linked boost
    if entity_linked_titles:
        for p, info in claim_map.items():
            if info["title"].lower() in entity_linked_titles:
                info["rrf_score"] *= ENTITY_BOOST
                info["source"] = info["source"] + "+entity"
    # Sort by RRF score desc
    ranked = sorted(claim_map.items(), key=lambda x: x[1]["rrf_score"], reverse=True)
    # --- Format output ---
    sections = []
    # Entities section (keyword search is still best for entity resolution)
    if kb_ctx.entities:
        sections.append("## Matched Entities")
        for i, ent in enumerate(kb_ctx.entities):
            sections.append(f"**{ent.name}** ({ent.entity_type}, {ent.domain})")
            if i < 3:
                sections.append(ent.overview[:8000])
            else:
                sections.append(ent.overview[:500])
            if ent.related_claims:
                sections.append("Related claims: " + ", ".join(ent.related_claims[:5]))
            sections.append("")
    # Merged claims section (RRF-ranked)
    if ranked:
        sections.append("## Retrieved Claims")
        for path, info in ranked[:MAX_RETRIEVAL_CLAIMS]:
            line = f"- **{info['title']}**"
            meta_parts = []
            if info["confidence"]:
                meta_parts.append(f"confidence: {info['confidence']}")
            if info["domain"]:
                meta_parts.append(info["domain"])
            if info["vector_score"] is not None:
                meta_parts.append(f"{int(info['vector_score'] * 100)}% semantic match")
            if meta_parts:
                line += f" ({', '.join(meta_parts)})"
            sections.append(line)
            if info["description"]:
                sections.append(f" {info['description']}")
        sections.append("")
    # Positions section
    if kb_ctx.positions:
        sections.append("## Agent Positions")
        for pos in kb_ctx.positions:
            sections.append(f"**{pos.agent}**: {pos.title}")
            sections.append(pos.content[:200])
            sections.append("")
    # Beliefs section
    if kb_ctx.belief_excerpts:
        sections.append("## Relevant Beliefs")
        for exc in kb_ctx.belief_excerpts:
            sections.append(exc)
            sections.append("")
    # Build audit-friendly ranked list
    claims_audit = []
    for i, (path, info) in enumerate(ranked[:MAX_RETRIEVAL_CLAIMS]):
        claims_audit.append({
            "path": path, "title": info["title"],
            "score": round(info["rrf_score"], 4),
            "rank": i + 1, "source": info["source"],
        })
    if not sections:
        return "No relevant KB content found for this query.", claims_audit
    # Stats footer
    n_vector = sum(1 for _, v in ranked if v["source"] in ("vector", "vector+keyword"))
    n_keyword = sum(1 for _, v in ranked if v["source"] in ("keyword", "vector+keyword"))
    n_both = sum(1 for _, v in ranked if v["source"] == "vector+keyword")
    sections.append(f"---\nKB: {kb_ctx.stats.get('total_claims', '?')} claims, "
                    f"{kb_ctx.stats.get('total_entities', '?')} entities. "
                    f"Retrieved: {len(ranked)} claims (vector: {n_vector}, keyword: {n_keyword}, both: {n_both}).")
    return "\n".join(sections), claims_audit
async def reformulate_query(
    query: str,
    history: list[dict],
    llm_fn: LLMFn,
    model: str,
) -> str:
    """Rewrite conversational follow-ups into standalone search queries.
    If there's no conversation history or the query is already standalone,
    returns the original query unchanged.
    """
    if not history:
        return query
    try:
        last_exchange = history[-1]
        recent_context = ""
        if last_exchange.get("user"):
            recent_context += f"User: {last_exchange['user'][:300]}\n"
        if last_exchange.get("bot"):
            recent_context += f"Bot: {last_exchange['bot'][:300]}\n"
        reformulate_prompt = (
            f"A user is in a conversation. Given the recent exchange and their new message, "
            f"rewrite the new message as a STANDALONE search query that captures what they're "
            f"actually asking about. The query should work for semantic search — specific topics, "
            f"entities, and concepts.\n\n"
            f"Recent exchange:\n{recent_context}\n"
            f"New message: {query}\n\n"
            f"If the message is already a clear standalone question or topic, return it unchanged.\n"
            f"If it's a follow-up, correction, or reference to the conversation, rewrite it.\n\n"
            f"Return ONLY the rewritten query, nothing else. Max 30 words."
        )
        reformulated = await llm_fn(model, reformulate_prompt, 80)
        if reformulated and reformulated.strip() and len(reformulated.strip()) > 3:
            logger.info("Query reformulated: '%s' → '%s'", query[:60], reformulated.strip()[:60])
            return reformulated.strip()
    except Exception as e:
        logger.warning("Query reformulation failed: %s", e)
    return query
async def decompose_query(
    query: str,
    llm_fn: LLMFn,
    model: str,
) -> list[str]:
    """Split multi-part queries into focused sub-queries for vector search.
    Only decomposes if query is >8 words and contains a conjunction or multiple
    question marks. Otherwise returns [query] unchanged.
    """
    try:
        words = query.split()
        has_conjunction = any(w.lower() in ("and", "but", "also", "plus", "versus", "vs") for w in words)
        has_question_marks = query.count("?") > 1
        if len(words) > 8 and (has_conjunction or has_question_marks):
            decompose_prompt = (
                f"Split this query into 2-3 focused search sub-queries. Each sub-query should "
                f"target one specific concept or question. Return one sub-query per line, nothing else.\n\n"
                f"Query: {query}\n\n"
                f"If the query is already focused on one topic, return it unchanged on a single line."
            )
            decomposed = await llm_fn(model, decompose_prompt, 150)
            if decomposed:
                parts = [p.strip().lstrip("0123456789.-) ") for p in decomposed.strip().split("\n") if p.strip()]
                if 1 < len(parts) <= 4:
                    logger.info("Query decomposed: '%s' → %s", query[:60], parts)
                    return parts
    except Exception as e:
        logger.warning("Query decomposition failed: %s", e)
    return [query]
def vector_search_merge(
    sub_queries: list[str],
    retrieve_vector_fn: Callable[[str], tuple[str, dict]],
) -> dict:
    """Run vector search on each sub-query, dedup by path (keep highest score).
    Returns merged vector_meta dict with keys:
    direct_results, expanded_results, layers_hit, duration_ms, errors.
    """
    all_direct = []
    all_expanded = []
    layers = []
    total_duration = 0
    errors = []
    for sq in sub_queries:
        _, v_meta = retrieve_vector_fn(sq)
        all_direct.extend(v_meta.get("direct_results", []))
        all_expanded.extend(v_meta.get("expanded_results", []))
        layers.extend(v_meta.get("layers_hit", []))
        total_duration += v_meta.get("duration_ms", 0)
        if v_meta.get("error"):
            errors.append(v_meta["error"])
    # Dedup by path (keep highest score)
    seen: dict[str, dict] = {}
    for vr in all_direct:
        p = vr.get("path", "")
        if p not in seen or vr.get("score", 0) > seen[p].get("score", 0):
            seen[p] = vr
    result = {
        "direct_results": list(seen.values()),
        "expanded_results": all_expanded,
        "layers_hit": list(set(layers)),
        "duration_ms": total_duration,
    }
    if errors:
        result["errors"] = errors
    return result
async def orchestrate_retrieval(
    text: str,
    search_query: str,
    kb_read_dir: str,
    kb_index: Any,
    llm_fn: LLMFn,
    triage_model: str,
    retrieve_context_fn: Callable,
    retrieve_vector_fn: Callable[[str], tuple[str, dict]],
) -> dict:
    """Full retrieval pipeline: keyword → decompose → vector → RRF merge.
    Returns dict with keys:
    kb_context_text, claims_audit, retrieval_layers, vector_meta,
    tool_calls, kb_ctx.
    """
    tool_calls = []
    # 1. Keyword retrieval (entity resolution needs full context)
    t_kb = time.monotonic()
    kb_ctx = retrieve_context_fn(search_query, kb_read_dir, index=kb_index)
    kb_duration = int((time.monotonic() - t_kb) * 1000)
    retrieval_layers = ["keyword"] if (kb_ctx and (kb_ctx.entities or kb_ctx.claims)) else []
    tool_calls.append({
        "tool": "retrieve_context",
        "input": {"query": search_query[:200], "original_query": text[:200] if search_query != text else None},
        "output": {"entities": len(kb_ctx.entities) if kb_ctx else 0,
                   "claims": len(kb_ctx.claims) if kb_ctx else 0},
        "duration_ms": kb_duration,
    })
    # 2. Query decomposition
    t_decompose = time.monotonic()
    sub_queries = await decompose_query(search_query, llm_fn, triage_model)
    decompose_duration = int((time.monotonic() - t_decompose) * 1000)
    if len(sub_queries) > 1:
        tool_calls.append({
            "tool": "query_decompose",
            "input": {"query": search_query[:200]},
            "output": {"sub_queries": sub_queries},
            "duration_ms": decompose_duration,
        })
    # 3. Vector search across sub-queries
    vector_meta = vector_search_merge(sub_queries, retrieve_vector_fn)
    # 4. RRF merge
    kb_context_text, claims_audit = rrf_merge_context(kb_ctx, vector_meta, kb_read_dir)
    retrieval_layers.extend(vector_meta.get("layers_hit", []))
    tool_calls.append({
        "tool": "retrieve_qdrant_context",
        "input": {"query": text[:200]},
        "output": {"direct_hits": len(vector_meta.get("direct_results", [])),
                   "expanded": len(vector_meta.get("expanded_results", []))},
        "duration_ms": vector_meta.get("duration_ms", 0),
    })
    return {
        "kb_context_text": kb_context_text,
        "claims_audit": claims_audit,
        "retrieval_layers": retrieval_layers,
        "vector_meta": vector_meta,
        "tool_calls": tool_calls,
        "kb_ctx": kb_ctx,
    }