"""Tests for Telegram message batching — chat_id grouping + time proximity. Cannot import bot.py directly (python-telegram-bot dependency), so we replicate _group_into_windows here. The canonical copy lives in telegram/bot.py — any changes there must be mirrored. """ from datetime import datetime def _group_into_windows(messages: list[dict], window_seconds: int = 300) -> list[list[dict]]: """Mirror of telegram/bot.py::_group_into_windows for testing.""" if not messages: return [] messages.sort(key=lambda m: m.get("timestamp", "")) by_chat: dict[int, list[dict]] = {} for msg in messages: cid = msg.get("chat_id", 0) by_chat.setdefault(cid, []).append(msg) windows = [] for chat_msgs in by_chat.values(): current_window = [chat_msgs[0]] for msg in chat_msgs[1:]: prev_ts = current_window[-1].get("timestamp", "") curr_ts = msg.get("timestamp", "") try: gap = (datetime.fromisoformat(curr_ts) - datetime.fromisoformat(prev_ts)).total_seconds() except (ValueError, TypeError): gap = 0 if gap > window_seconds or len(current_window) >= 50: windows.append(current_window) current_window = [msg] else: current_window.append(msg) if current_window: windows.append(current_window) return windows def _msg(chat_id: int, ts: str, text: str = "test", username: str = "user1"): return {"chat_id": chat_id, "timestamp": ts, "text": text, "username": username} def test_separate_chats_get_separate_windows(): """Messages from different chats should never be in the same window.""" msgs = [ _msg(100, "2026-03-31T10:00:00"), _msg(200, "2026-03-31T10:00:01"), _msg(100, "2026-03-31T10:00:02"), _msg(200, "2026-03-31T10:00:03"), ] windows = _group_into_windows(msgs, window_seconds=300) assert len(windows) == 2 # Each window should have messages from only one chat for w in windows: chat_ids = {m["chat_id"] for m in w} assert len(chat_ids) == 1, f"Window has mixed chats: {chat_ids}" def test_time_gap_splits_window(): """Messages >window_seconds apart should be in different windows.""" msgs = [ _msg(100, "2026-03-31T10:00:00"), _msg(100, "2026-03-31T10:01:00"), # 60s gap — same window _msg(100, "2026-03-31T10:10:00"), # 540s gap — new window _msg(100, "2026-03-31T10:11:00"), # 60s gap — same as previous ] windows = _group_into_windows(msgs, window_seconds=300) assert len(windows) == 2 assert len(windows[0]) == 2 assert len(windows[1]) == 2 def test_single_chat_continuous_conversation(): """120 messages from one chat within 5 min should produce few windows.""" msgs = [ _msg(100, f"2026-03-31T10:{i // 60:02d}:{i % 60:02d}") for i in range(120) # 120 messages over 2 minutes ] windows = _group_into_windows(msgs, window_seconds=300) # Should be 2-3 windows (capped at 50 each), NOT 12 assert len(windows) <= 3 total_msgs = sum(len(w) for w in windows) assert total_msgs == 120 def test_cap_at_50(): """Windows should cap at 50 messages.""" msgs = [ _msg(100, f"2026-03-31T10:00:{i:02d}") for i in range(55) ] windows = _group_into_windows(msgs, window_seconds=300) assert len(windows) == 2 assert len(windows[0]) == 50 assert len(windows[1]) == 5 def test_empty_input(): assert _group_into_windows([]) == [] def test_mixed_chats_and_gaps(): """Complex scenario: 2 chats, one with a time gap.""" msgs = [ _msg(100, "2026-03-31T10:00:00"), _msg(100, "2026-03-31T10:01:00"), _msg(200, "2026-03-31T10:00:30"), _msg(200, "2026-03-31T10:01:30"), _msg(100, "2026-03-31T10:20:00"), # 19 min gap — new window for chat 100 ] windows = _group_into_windows(msgs, window_seconds=300) assert len(windows) == 3 # chat 100 early, chat 200, chat 100 late