From f768e75ecfd2a440d45cb813397469a805d5d148 Mon Sep 17 00:00:00 2001 From: Glucksberg Date: Mon, 1 Jun 2026 14:33:36 -0400 Subject: [PATCH] fix(telegram): cache observed group media --- gateway/platforms/telegram.py | 347 ++++++++++++++++++-- tests/gateway/test_telegram_group_gating.py | 314 +++++++++++++++++- 2 files changed, 628 insertions(+), 33 deletions(-) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 14820c0fe..27a69ae41 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -16,7 +16,7 @@ import tempfile import html as _html import re from datetime import datetime, timezone -from typing import Dict, List, Optional, Set, Any +from typing import Any, Callable, Dict, List, Optional, Set logger = logging.getLogger(__name__) @@ -4877,7 +4877,58 @@ class TelegramAdapter(BasePlatformAdapter): def _telegram_group_observe_attributed_text(self, event: MessageEvent) -> str: user_id = event.source.user_id or "unknown" sender = event.source.user_name or user_id - return f"[{sender}|{user_id}]\n{event.text or ''}" + parts: list[str] = [] + if event.text: + parts.append(event.text) + media_note = self._telegram_group_observe_media_note(event) + if media_note: + parts.append(media_note) + body = "\n\n".join(parts) + return f"[{sender}|{user_id}]\n{body}" + + @staticmethod + def _append_media_status_text(existing: str, note: str) -> str: + if not note: + return existing + if not existing: + return note + return f"{existing}\n\n{note}" + + def _telegram_group_observe_media_note(self, event: MessageEvent) -> str: + """Describe cached observed media in transcript context.""" + media_urls = list(getattr(event, "media_urls", None) or []) + if not media_urls: + return "" + + try: + from tools.credential_files import to_agent_visible_cache_path + except Exception: + to_agent_visible_cache_path = lambda path: path # type: ignore[assignment] + + lines: list[str] = [] + media_types = list(getattr(event, "media_types", None) or []) + for index, path in enumerate(media_urls): + mtype = media_types[index] if index < len(media_types) else "" + basename = os.path.basename(path) + parts = basename.split("_", 2) + display_name = parts[2] if len(parts) >= 3 else basename + display_name = re.sub(r'[^\w.\- ]', '_', display_name) + agent_path = to_agent_visible_cache_path(path) + + if mtype.startswith("image/") or event.message_type == MessageType.PHOTO: + label = "image" + elif event.message_type == MessageType.VOICE: + label = "voice message" + elif event.message_type == MessageType.AUDIO or mtype.startswith("audio/"): + label = "audio file" + elif mtype.startswith("video/") or event.message_type == MessageType.VIDEO: + label = "video" + else: + label = "document" + lines.append( + f"[Observed Telegram {label}: '{display_name}' saved at: {agent_path}]" + ) + return "\n".join(lines) def _telegram_group_observe_channel_prompt(self) -> str: username = getattr(getattr(self, "_bot", None), "username", None) or "unknown" @@ -4918,13 +4969,19 @@ class TelegramAdapter(BasePlatformAdapter): channel_prompt=channel_prompt, ) - def _observe_unmentioned_group_message(self, message: Message, msg_type: MessageType, update_id: Optional[int] = None) -> None: + def _observe_unmentioned_group_message( + self, + message: Message, + msg_type: MessageType, + update_id: Optional[int] = None, + event: Optional[MessageEvent] = None, + ) -> None: """Append skipped group chatter to the target session without dispatching.""" store = getattr(self, "_session_store", None) if not store: return try: - event = self._build_message_event(message, msg_type, update_id=update_id) + event = event or self._build_message_event(message, msg_type, update_id=update_id) shared_source = self._telegram_group_observe_shared_source(event.source) session_entry = store.get_or_create_session(shared_source) entry = { @@ -5278,45 +5335,271 @@ class TelegramAdapter(BasePlatformAdapter): self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key)) + def _media_message_type(self, msg: Message) -> MessageType: + if msg.sticker: + return MessageType.STICKER + if msg.photo: + return MessageType.PHOTO + if msg.video: + return MessageType.VIDEO + if msg.audio: + return MessageType.AUDIO + if msg.voice: + return MessageType.VOICE + return MessageType.DOCUMENT + + def _observed_media_exceeds_cache_limit(self, event: MessageEvent, label: str, file_size: Any) -> bool: + max_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024) + try: + size = int(file_size or 0) + except (TypeError, ValueError): + size = 0 + if 0 < size <= max_bytes: + return False + + limit_mb = max_bytes // (1024 * 1024) + event.text = self._append_media_status_text( + event.text, + ( + f"Observed Telegram {label} was too large or its size could not be " + f"verified. Maximum: {limit_mb} MB." + ), + ) + logger.info( + "[Telegram] Observed group %s too large or unknown size: %s bytes", + label, + file_size, + ) + return True + + async def _cache_observed_simple_media( + self, + event: MessageEvent, + *, + source: Any, + label: str, + cache_fn: Callable[..., str], + default_ext: str, + media_type_for_ext: Callable[[str], str], + ext_candidates: Optional[Any] = None, + ) -> None: + try: + if self._observed_media_exceeds_cache_limit(event, label, getattr(source, "file_size", None)): + return + + file_obj = await source.get_file() + media_bytes = await file_obj.download_as_bytearray() + ext = default_ext + file_path = getattr(file_obj, "file_path", None) + if file_path and ext_candidates: + file_path_lower = file_path.lower() + for candidate in ext_candidates: + if file_path_lower.endswith(candidate): + ext = candidate + break + + cached_path = cache_fn(bytes(media_bytes), ext=ext) + event.media_urls = [cached_path] + event.media_types = [media_type_for_ext(ext)] + logger.info("[Telegram] Cached observed group %s at %s", label, cached_path) + except Exception as e: + logger.warning("[Telegram] Failed to cache observed group %s: %s", label, e, exc_info=True) + + async def _build_observed_media_event( + self, + msg: Message, + msg_type: MessageType, + update_id: Optional[int] = None, + ) -> MessageEvent: + """Build and cache media for an observed, unmentioned group message.""" + event = self._build_message_event(msg, msg_type, update_id=update_id) + if msg.caption: + event.text = self._clean_bot_trigger_text(msg.caption) + + if msg.photo: + await self._cache_observed_simple_media( + event, + source=msg.photo[-1], + label="image", + cache_fn=cache_image_from_bytes, + default_ext=".jpg", + ext_candidates=(".png", ".webp", ".gif", ".jpeg", ".jpg"), + media_type_for_ext=lambda ext: f"image/{ext.lstrip('.')}", + ) + + elif msg.voice: + await self._cache_observed_simple_media( + event, + source=msg.voice, + label="voice message", + cache_fn=cache_audio_from_bytes, + default_ext=".ogg", + media_type_for_ext=lambda _ext: "audio/ogg", + ) + + elif msg.audio: + await self._cache_observed_simple_media( + event, + source=msg.audio, + label="audio file", + cache_fn=cache_audio_from_bytes, + default_ext=".mp3", + media_type_for_ext=lambda _ext: "audio/mp3", + ) + + elif msg.video: + await self._cache_observed_simple_media( + event, + source=msg.video, + label="video", + cache_fn=cache_video_from_bytes, + default_ext=".mp4", + ext_candidates=SUPPORTED_VIDEO_TYPES, + media_type_for_ext=lambda ext: SUPPORTED_VIDEO_TYPES.get(ext, "video/mp4"), + ) + + elif msg.document: + await self._cache_observed_document_event(msg, event) + + return event + + async def _cache_observed_document_event(self, msg: Message, event: MessageEvent) -> None: + doc = msg.document + try: + ext = "" + original_filename = doc.file_name or "" + if original_filename: + _, ext = os.path.splitext(original_filename) + ext = ext.lower() + + doc_mime = (doc.mime_type or "").lower() + if not ext and doc_mime: + ext = _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, "") + if not ext: + mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()} + ext = mime_to_ext.get(doc_mime, "") + + max_doc_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024) + if not doc.file_size or doc.file_size > max_doc_bytes: + limit_mb = max_doc_bytes // (1024 * 1024) + event.text = self._append_media_status_text( + event.text, + ( + "Observed Telegram document was too large or its size could not " + f"be verified. Maximum: {limit_mb} MB." + ), + ) + logger.info("[Telegram] Observed group document too large: %s bytes", doc.file_size) + return + + if ext in _TELEGRAM_IMAGE_EXTENSIONS or doc_mime.startswith("image/"): + file_obj = await doc.get_file() + image_bytes = await file_obj.download_as_bytearray() + image_ext = ( + ext + if ext in _TELEGRAM_IMAGE_EXTENSIONS + else _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, ".jpg") + ) + try: + cached_path = cache_image_from_bytes(bytes(image_bytes), ext=image_ext) + except ValueError as e: + logger.warning("[Telegram] Failed to cache observed image document: %s", e, exc_info=True) + event.text = self._append_media_status_text( + event.text, + ( + f"Image document '{original_filename or doc_mime or ext or 'unknown'}' " + "could not be read as an image." + ), + ) + return + event.message_type = MessageType.PHOTO + event.media_urls = [cached_path] + event.media_types = [ + doc_mime + if doc_mime.startswith("image/") + else _TELEGRAM_IMAGE_EXT_TO_MIME.get(image_ext, "image/jpeg") + ] + logger.info("[Telegram] Cached observed group image-document at %s", cached_path) + return + + if not ext and doc.mime_type: + video_mime_to_ext = {v: k for k, v in SUPPORTED_VIDEO_TYPES.items()} + ext = video_mime_to_ext.get(doc.mime_type, "") + + if not ext and doc.mime_type: + image_mime_to_ext: dict[str, str] = {} + for _ext, _mime in SUPPORTED_IMAGE_DOCUMENT_TYPES.items(): + image_mime_to_ext.setdefault(_mime, _ext) + ext = image_mime_to_ext.get(doc.mime_type, "") + + if ext in SUPPORTED_VIDEO_TYPES: + file_obj = await doc.get_file() + video_bytes = await file_obj.download_as_bytearray() + cached_path = cache_video_from_bytes(bytes(video_bytes), ext=ext) + event.media_urls = [cached_path] + event.media_types = [SUPPORTED_VIDEO_TYPES[ext]] + event.message_type = MessageType.VIDEO + logger.info("[Telegram] Cached observed group video document at %s", cached_path) + return + + if ext not in SUPPORTED_DOCUMENT_TYPES: + supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys())) + event.text = self._append_media_status_text( + event.text, + f"Unsupported document type '{ext or 'unknown'}'. Supported types: {supported_list}", + ) + logger.info("[Telegram] Unsupported observed group document type: %s", ext or "unknown") + return + + file_obj = await doc.get_file() + doc_bytes = await file_obj.download_as_bytearray() + raw_bytes = bytes(doc_bytes) + cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}") + mime_type = SUPPORTED_DOCUMENT_TYPES[ext] + event.media_urls = [cached_path] + event.media_types = [mime_type] + logger.info("[Telegram] Cached observed group document at %s", cached_path) + + max_text_inject_bytes = 100 * 1024 + if ext in {".md", ".txt"} and len(raw_bytes) <= max_text_inject_bytes: + try: + text_content = raw_bytes.decode("utf-8") + display_name = original_filename or f"document{ext}" + display_name = re.sub(r'[^\w.\- ]', '_', display_name) + injection = f"[Content of {display_name}]:\n{text_content}" + event.text = f"{injection}\n\n{event.text}" if event.text else injection + except UnicodeDecodeError: + logger.warning( + "[Telegram] Could not decode observed text file as UTF-8, skipping content injection", + exc_info=True, + ) + except Exception as e: + logger.warning("[Telegram] Failed to cache observed group document: %s", e, exc_info=True) + async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming media messages, downloading images to local cache.""" if not update.message: return if not self._should_process_message(update.message): if self._should_observe_unmentioned_group_message(update.message): - _m = update.message - if _m.sticker: - _observe_type = MessageType.STICKER - elif _m.photo: - _observe_type = MessageType.PHOTO - elif _m.video: - _observe_type = MessageType.VIDEO - elif _m.audio: - _observe_type = MessageType.AUDIO - elif _m.voice: - _observe_type = MessageType.VOICE - else: - _observe_type = MessageType.DOCUMENT - self._observe_unmentioned_group_message(_m, _observe_type, update_id=update.update_id) + msg_type = self._media_message_type(update.message) + event = await self._build_observed_media_event( + update.message, + msg_type, + update_id=update.update_id, + ) + self._observe_unmentioned_group_message( + update.message, + event.message_type, + update_id=update.update_id, + event=event, + ) return msg = update.message # Determine media type - if msg.sticker: - msg_type = MessageType.STICKER - elif msg.photo: - msg_type = MessageType.PHOTO - elif msg.video: - msg_type = MessageType.VIDEO - elif msg.audio: - msg_type = MessageType.AUDIO - elif msg.voice: - msg_type = MessageType.VOICE - elif msg.document: - msg_type = MessageType.DOCUMENT - else: - msg_type = MessageType.DOCUMENT + msg_type = self._media_message_type(msg) event = self._build_message_event(msg, msg_type, update_id=update.update_id) diff --git a/tests/gateway/test_telegram_group_gating.py b/tests/gateway/test_telegram_group_gating.py index c3814a7fb..a96d634bb 100644 --- a/tests/gateway/test_telegram_group_gating.py +++ b/tests/gateway/test_telegram_group_gating.py @@ -1,7 +1,7 @@ import asyncio import json from types import SimpleNamespace -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, Mock from gateway.config import Platform, PlatformConfig, load_gateway_config from gateway.platforms.base import MessageType @@ -897,6 +897,134 @@ def _group_voice_message( ) +def _group_photo_message( + *, + chat_id=-100, + from_user_id=111, + from_user_name="Alice Example", + caption="Veja esta foto", + file_size=1024, +): + file_obj = SimpleNamespace( + file_path="photos/observed.png", + download_as_bytearray=AsyncMock(return_value=bytearray(b"\x89PNG\r\n\x1a\n observed")), + ) + photo = SimpleNamespace( + file_size=file_size, + get_file=AsyncMock(return_value=file_obj), + ) + return SimpleNamespace( + message_id=52, + text=None, + caption=caption, + entities=[], + caption_entities=[], + message_thread_id=None, + is_topic_message=False, + chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), + from_user=SimpleNamespace( + id=from_user_id, full_name=from_user_name, + first_name=from_user_name.split()[0], + ), + reply_to_message=None, + date=None, + location=None, + venue=None, + sticker=None, + photo=[photo], + video=None, + audio=None, + voice=None, + document=None, + ) + + +def _group_video_message( + *, + chat_id=-100, + from_user_id=111, + from_user_name="Alice Example", + caption="Veja este video", + file_size=1024, +): + file_obj = SimpleNamespace( + file_path="videos/observed.mp4", + download_as_bytearray=AsyncMock(return_value=bytearray(b"observed video")), + ) + video = SimpleNamespace( + file_size=file_size, + get_file=AsyncMock(return_value=file_obj), + ) + return SimpleNamespace( + message_id=53, + text=None, + caption=caption, + entities=[], + caption_entities=[], + message_thread_id=None, + is_topic_message=False, + chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), + from_user=SimpleNamespace( + id=from_user_id, full_name=from_user_name, + first_name=from_user_name.split()[0], + ), + reply_to_message=None, + date=None, + location=None, + venue=None, + sticker=None, + photo=None, + video=video, + audio=None, + voice=None, + document=None, + ) + + +def _group_document_message( + *, + chat_id=-100, + from_user_id=111, + from_user_name="Alice Example", + caption="Este arquivo", + document=None, +): + file_obj = SimpleNamespace( + file_path="documents/RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf", + download_as_bytearray=AsyncMock(return_value=bytearray(b"%PDF observed bytes")), + ) + document = document or SimpleNamespace( + file_name="RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf", + mime_type="application/pdf", + file_size=1024, + get_file=AsyncMock(return_value=file_obj), + ) + return SimpleNamespace( + message_id=52, + text=None, + caption=caption, + entities=[], + caption_entities=[], + message_thread_id=None, + is_topic_message=False, + chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), + from_user=SimpleNamespace( + id=from_user_id, full_name=from_user_name, + first_name=from_user_name.split()[0], + ), + reply_to_message=None, + date=None, + location=None, + venue=None, + sticker=None, + photo=None, + video=None, + audio=None, + voice=None, + document=document, + ) + + # --------------------------------------------------------------------------- # Observe + attribution parity: location messages # --------------------------------------------------------------------------- @@ -983,6 +1111,190 @@ def test_unmentioned_voice_message_observed_in_group(): asyncio.run(_run()) +def test_unmentioned_photo_message_observed_with_cached_path(monkeypatch, tmp_path): + async def _run(): + adapter = _make_adapter( + require_mention=True, + allowed_chats=["-100"], + group_allowed_chats=["-100"], + observe_unmentioned_group_messages=True, + ) + store = _FakeSessionStore() + adapter._session_store = store + cached_path = tmp_path / "img_abc_observed.png" + monkeypatch.setattr( + "gateway.platforms.telegram.cache_image_from_bytes", + lambda _data, ext=".jpg": str(cached_path), + ) + update = SimpleNamespace( + update_id=3003, + message=_group_photo_message(), + effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Veja esta foto" in message["content"] + assert "Observed Telegram image" in message["content"] + assert str(cached_path) in message["content"] + assert store.sources[0].user_id is None + + asyncio.run(_run()) + + +def test_unmentioned_video_too_large_observed_without_download(monkeypatch): + async def _run(): + adapter = _make_adapter( + require_mention=True, + allowed_chats=["-100"], + group_allowed_chats=["-100"], + observe_unmentioned_group_messages=True, + ) + adapter._max_doc_bytes = 100 + store = _FakeSessionStore() + adapter._session_store = store + cache_video = Mock(return_value="/tmp/observed.mp4") + monkeypatch.setattr("gateway.platforms.telegram.cache_video_from_bytes", cache_video) + message_obj = _group_video_message(file_size=101) + update = SimpleNamespace( + update_id=3004, + message=message_obj, + effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + cache_video.assert_not_called() + message_obj.video.get_file.assert_not_called() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Veja este video" in message["content"] + assert "Observed Telegram video was too large" in message["content"] + assert "/tmp/observed.mp4" not in message["content"] + + asyncio.run(_run()) + + +def test_unmentioned_document_message_observed_with_cached_path(monkeypatch, tmp_path): + async def _run(): + adapter = _make_adapter( + require_mention=True, + allowed_chats=["-100"], + group_allowed_chats=["-100"], + observe_unmentioned_group_messages=True, + ) + store = _FakeSessionStore() + adapter._session_store = store + cached_path = tmp_path / "doc_abc_RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf" + monkeypatch.setattr( + "gateway.platforms.telegram.cache_document_from_bytes", + lambda _data, _filename: str(cached_path), + ) + update = SimpleNamespace( + update_id=3003, + message=_group_document_message(), + effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Este arquivo" in message["content"] + assert "RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf" in message["content"] + assert str(cached_path) in message["content"] + assert store.sources[0].user_id is None + + asyncio.run(_run()) + + +def test_unmentioned_large_document_observed_without_download(monkeypatch): + async def _run(): + adapter = _make_adapter( + require_mention=True, + allowed_chats=["-100"], + group_allowed_chats=["-100"], + observe_unmentioned_group_messages=True, + ) + adapter._max_doc_bytes = 100 + store = _FakeSessionStore() + adapter._session_store = store + cache_document = Mock(return_value="/tmp/huge.pdf") + monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document) + document = SimpleNamespace( + file_name="huge.pdf", + mime_type="application/pdf", + file_size=101, + get_file=AsyncMock(), + ) + update = SimpleNamespace( + update_id=3005, + message=_group_document_message(document=document), + effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + cache_document.assert_not_called() + document.get_file.assert_not_called() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Este arquivo" in message["content"] + assert "Observed Telegram document was too large" in message["content"] + assert "/tmp/huge.pdf" not in message["content"] + + asyncio.run(_run()) + + +def test_unmentioned_unsupported_document_observed_without_download(monkeypatch): + async def _run(): + adapter = _make_adapter( + require_mention=True, + allowed_chats=["-100"], + group_allowed_chats=["-100"], + observe_unmentioned_group_messages=True, + ) + store = _FakeSessionStore() + adapter._session_store = store + cache_document = Mock(return_value="/tmp/malware.exe") + monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document) + document = SimpleNamespace( + file_name="malware.exe", + mime_type="application/x-msdownload", + file_size=100, + get_file=AsyncMock(), + ) + update = SimpleNamespace( + update_id=3006, + message=_group_document_message(document=document), + effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + cache_document.assert_not_called() + document.get_file.assert_not_called() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Este arquivo" in message["content"] + assert "Unsupported document type '.exe'" in message["content"] + assert "/tmp/malware.exe" not in message["content"] + + asyncio.run(_run()) + + def test_triggered_voice_message_uses_shared_session_in_observe_mode(): async def _run(): adapter = _make_adapter(