fix: TG message batching — group by chat_id + time proximity

Root cause: _group_into_windows never checked time gaps or chat_id.
All messages went into one stream, capped at 10 per window. 120 msgs
from one chat → 12 windows → 12 source files → 12 extraction branches.

Fix:
- Group by chat_id first (different chats = different windows always)
- Split on actual time gaps (>window_seconds between messages)
- Cap at 50 messages per window (not 10)
- Consolidate substantive windows from same chat into one source file
  at triage time (one source per chat per triage cycle)

6 tests in tests/test_tg_batching.py.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
m3taversal 2026-03-31 12:58:53 +01:00
parent f25a4093c2
commit 9e42c34271
2 changed files with 213 additions and 19 deletions

View file

@ -386,8 +386,14 @@ def get_db_stats() -> dict:
return {"merged_claims": "?", "contributors": "?"}
async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> str | None:
"""Call OpenRouter API."""
from eval import (
_LLMResponse, estimate_cost, check_url_fabrication, apply_confidence_floor,
CONFIDENCE_FLOOR, COST_ALERT_THRESHOLD,
)
async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> _LLMResponse | None:
"""Call OpenRouter API. Returns _LLMResponse with token counts and cost."""
import aiohttp
key = Path(OPENROUTER_KEY_FILE).read_text().strip()
@ -409,7 +415,16 @@ async def call_openrouter(model: str, prompt: str, max_tokens: int = 2048) -> st
logger.error("OpenRouter %s%d", model, resp.status)
return None
data = await resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content")
content = data.get("choices", [{}])[0].get("message", {}).get("content")
if content is None:
return None
# Extract token usage from OpenRouter response
usage = data.get("usage", {})
pt = usage.get("prompt_tokens", 0)
ct = usage.get("completion_tokens", 0)
cost = _estimate_cost(model, pt, ct)
return _LLMResponse(content, prompt_tokens=pt, completion_tokens=ct,
cost=cost, model=model)
except Exception as e:
logger.error("OpenRouter error: %s", e)
return None
@ -1195,6 +1210,29 @@ IMPORTANT: Special tags you can append at the end of your response (after your m
"rank": len(claims_audit) + 1, "source": "graph",
"edge_type": r.get("edge_type", "")})
# ─── Eval: URL fabrication check ──────────────────────────────
blocked = False
block_reason = None
display_response = _check_url_fabrication(display_response, kb_context_text)
# ─── Eval: confidence floor ────────────────────────────────────
if confidence_score is not None and confidence_score < CONFIDENCE_FLOOR:
blocked = True
block_reason = f"confidence {confidence_score:.2f} < floor {CONFIDENCE_FLOOR}"
# Observation mode: still send response but with caveat prefix
display_response = (
f"⚠️ Low confidence ({confidence_score:.2f}) — treat this response with caution.\n\n"
+ display_response
)
logger.warning("Confidence floor triggered: %.2f for query: %s", confidence_score, text[:100])
# ─── Eval: cost alert ──────────────────────────────────────────
response_cost = getattr(response, 'cost', 0.0) if response else 0.0
response_prompt_tokens = getattr(response, 'prompt_tokens', 0) if response else 0
response_completion_tokens = getattr(response, 'completion_tokens', 0) if response else 0
if response_cost > COST_ALERT_THRESHOLD:
logger.warning("Cost alert: $%.4f for query: %s (model=%s)", response_cost, text[:80], RESPONSE_MODEL)
# Detect retrieval gap (Rio: most valuable signal for KB improvement)
retrieval_gap = None
if not claims_audit and not (kb_ctx and kb_ctx.entities):
@ -1233,10 +1271,18 @@ IMPORTANT: Special tags you can append at the end of your response (after your m
display_response=display_response[:5000],
confidence_score=confidence_score,
response_time_ms=response_time_ms,
# Eval pipeline columns (schema v10)
prompt_tokens=response_prompt_tokens,
completion_tokens=response_completion_tokens,
generation_cost=response_cost,
blocked=1 if blocked else 0,
block_reason=block_reason,
)
_audit_conn.commit()
logger.info("Audit record written (confidence=%.2f, layers=%s, %d claims, %dms)",
confidence_score or 0, retrieval_layers, len(claims_audit), response_time_ms)
logger.info("Audit record written (confidence=%.2f, cost=$%.4f, layers=%s, %d claims, %dms%s)",
confidence_score or 0, response_cost, retrieval_layers,
len(claims_audit), response_time_ms,
", BLOCKED" if blocked else "")
except Exception as e:
logger.warning("Failed to write audit record: %s", e)
@ -1572,7 +1618,8 @@ Respond with ONLY the window numbers and tags, one per line:
logger.warning("Triage LLM call failed — buffered messages dropped")
return
# Parse triage results
# Parse triage results — collect substantive windows per chat
substantive_by_chat: dict[int, list[tuple[list[dict], str]]] = {}
for line in result.strip().split("\n"):
match = re.match(r"(\d+):\s*\[(\w+)\]", line)
if not match:
@ -1584,31 +1631,65 @@ Respond with ONLY the window numbers and tags, one per line:
continue
if tag in ("CLAIM", "ENTITY", "EVIDENCE"):
_archive_window(windows[idx], tag)
chat_id = windows[idx][0].get("chat_id", 0)
substantive_by_chat.setdefault(chat_id, []).append(
(windows[idx], tag))
logger.info("Triage complete: %d windows processed", len(windows))
# Consolidate: one source file per chat (merge all substantive windows)
for chat_id, tagged_windows in substantive_by_chat.items():
merged_msgs = []
tags = set()
for win_msgs, tag in tagged_windows:
merged_msgs.extend(win_msgs)
tags.add(tag)
# Use highest-priority tag: CLAIM > EVIDENCE > ENTITY
best_tag = ("CLAIM" if "CLAIM" in tags
else "EVIDENCE" if "EVIDENCE" in tags
else "ENTITY")
_archive_window(merged_msgs, best_tag)
logger.info("Triage complete: %d windows → %d sources",
len(windows), len(substantive_by_chat))
def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]:
"""Group messages into conversation windows by time proximity."""
"""Group messages into conversation windows by chat_id + time proximity.
Messages from the same chat within window_seconds of each other stay in
one window. Different chats always get separate windows. Windows are
capped at 50 messages (one triage cycle of active chat).
"""
if not messages:
return []
# Sort by timestamp
messages.sort(key=lambda m: m.get("timestamp", ""))
# Group by chat_id first
by_chat: dict[int, list[dict]] = {}
for msg in messages:
cid = msg.get("chat_id", 0)
by_chat.setdefault(cid, []).append(msg)
windows = []
current_window = [messages[0]]
for msg in messages[1:]:
# Simple grouping: if within window_seconds of previous message, same window
current_window.append(msg)
if len(current_window) >= 10: # Cap window size
for chat_msgs in by_chat.values():
current_window = [chat_msgs[0]]
for msg in chat_msgs[1:]:
# Split on time gap
prev_ts = current_window[-1].get("timestamp", "")
curr_ts = msg.get("timestamp", "")
try:
gap = (datetime.fromisoformat(curr_ts) -
datetime.fromisoformat(prev_ts)).total_seconds()
except (ValueError, TypeError):
gap = 0
if gap > window_seconds or len(current_window) >= 50:
windows.append(current_window)
current_window = [msg]
else:
current_window.append(msg)
if current_window:
windows.append(current_window)
current_window = []
if current_window:
windows.append(current_window)
return windows

113
tests/test_tg_batching.py Normal file
View file

@ -0,0 +1,113 @@
"""Tests for Telegram message batching — chat_id grouping + time proximity.
Cannot import bot.py directly (python-telegram-bot dependency), so we
replicate _group_into_windows here. The canonical copy lives in
telegram/bot.py; any changes there must be mirrored.
"""
from datetime import datetime
def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]:
"""Mirror of telegram/bot.py::_group_into_windows for testing."""
if not messages:
return []
messages.sort(key=lambda m: m.get("timestamp", ""))
by_chat: dict[int, list[dict]] = {}
for msg in messages:
cid = msg.get("chat_id", 0)
by_chat.setdefault(cid, []).append(msg)
windows = []
for chat_msgs in by_chat.values():
current_window = [chat_msgs[0]]
for msg in chat_msgs[1:]:
prev_ts = current_window[-1].get("timestamp", "")
curr_ts = msg.get("timestamp", "")
try:
gap = (datetime.fromisoformat(curr_ts) -
datetime.fromisoformat(prev_ts)).total_seconds()
except (ValueError, TypeError):
gap = 0
if gap > window_seconds or len(current_window) >= 50:
windows.append(current_window)
current_window = [msg]
else:
current_window.append(msg)
if current_window:
windows.append(current_window)
return windows
def _msg(chat_id: int, ts: str, text: str = "test", username: str = "user1"):
return {"chat_id": chat_id, "timestamp": ts, "text": text, "username": username}
def test_separate_chats_get_separate_windows():
    """Messages from different chats should never be in the same window."""
    interleaved = [
        _msg(100, "2026-03-31T10:00:00"),
        _msg(200, "2026-03-31T10:00:01"),
        _msg(100, "2026-03-31T10:00:02"),
        _msg(200, "2026-03-31T10:00:03"),
    ]
    windows = _group_into_windows(interleaved, window_seconds=300)
    assert len(windows) == 2
    # Every window must contain messages from exactly one chat.
    for window in windows:
        chat_ids = {m["chat_id"] for m in window}
        assert len(chat_ids) == 1, f"Window has mixed chats: {chat_ids}"
def test_time_gap_splits_window():
    """Messages >window_seconds apart should be in different windows."""
    timestamps = [
        "2026-03-31T10:00:00",
        "2026-03-31T10:01:00",  # 60s gap — same window
        "2026-03-31T10:10:00",  # 540s gap — new window
        "2026-03-31T10:11:00",  # 60s gap — same as previous
    ]
    windows = _group_into_windows(
        [_msg(100, ts) for ts in timestamps], window_seconds=300)
    assert len(windows) == 2
    assert [len(w) for w in windows] == [2, 2]
def test_single_chat_continuous_conversation():
    """120 messages from one chat within 5 min should produce few windows."""
    stream = [
        _msg(100, f"2026-03-31T10:{i // 60:02d}:{i % 60:02d}")
        for i in range(120)  # 120 messages over 2 minutes
    ]
    windows = _group_into_windows(stream, window_seconds=300)
    # Should be 2-3 windows (capped at 50 each), NOT 12
    assert len(windows) <= 3
    assert sum(len(w) for w in windows) == 120
def test_cap_at_50():
    """Windows should cap at 50 messages."""
    burst = [_msg(100, f"2026-03-31T10:00:{i:02d}") for i in range(55)]
    windows = _group_into_windows(burst, window_seconds=300)
    assert len(windows) == 2
    assert [len(w) for w in windows] == [50, 5]
def test_empty_input():
    """No messages in yields no windows out."""
    windows = _group_into_windows([])
    assert windows == []
def test_mixed_chats_and_gaps():
    """Complex scenario: 2 chats, one with a time gap."""
    fixture = [
        (100, "2026-03-31T10:00:00"),
        (100, "2026-03-31T10:01:00"),
        (200, "2026-03-31T10:00:30"),
        (200, "2026-03-31T10:01:30"),
        (100, "2026-03-31T10:20:00"),  # 19 min gap — new window for chat 100
    ]
    windows = _group_into_windows(
        [_msg(cid, ts) for cid, ts in fixture], window_seconds=300)
    assert len(windows) == 3  # chat 100 early, chat 200, chat 100 late