diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 543e6177a..f868dbf28 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1265,6 +1265,107 @@ def cleanup_document_cache(max_age_hours: int = 24) -> int: return removed +# --------------------------------------------------------------------------- +# Unified media caching +# +# One entry point for "I have raw attachment bytes from a platform — cache them +# and tell me what I got." Classifies by extension/MIME against the shared +# registries above, routes to the right cache_*_from_bytes helper, and returns +# a small result the caller can store and/or describe in a transcript. Used by +# both the addressed-message path and the observed-group-context path, on any +# platform — not Telegram-specific. +# --------------------------------------------------------------------------- + +@dataclass +class CachedMedia: + """Result of caching one attachment's bytes.""" + + path: str # absolute cache path, agent-visible (sandbox-translated) + media_type: str # MIME type recorded on the MessageEvent + kind: str # "image" | "video" | "audio" | "document" + display_name: str # human-readable name for transcript notes + + def context_note(self) -> str: + """One-line transcript annotation pointing the agent at the file.""" + return f"[{self.kind} '{self.display_name}' saved at: {self.path}]" + + +def _resolve_media_ext(filename: str, mime_type: str) -> str: + """Best-effort file extension from filename, then MIME fallback.""" + if filename: + ext = os.path.splitext(filename)[1].lower() + if ext: + return ext + mime = (mime_type or "").lower() + if not mime: + return "" + for table in ( + SUPPORTED_IMAGE_DOCUMENT_TYPES, + SUPPORTED_VIDEO_TYPES, + SUPPORTED_DOCUMENT_TYPES, + ): + for ext, m in table.items(): + if m == mime: + return ext + return "" + + +def cache_media_bytes( + data: bytes, + *, + filename: str = "", + mime_type: str = "", + default_kind: Optional[str] = None, +) -> 
Optional[CachedMedia]: + """Classify and cache raw attachment bytes; return a CachedMedia or None. + + ``default_kind`` ("image"/"video"/"audio"/"document") biases classification + when the extension/MIME are ambiguous — e.g. a Telegram native photo whose + file has no usable name. Unsupported document types return None so the + caller can record an "unsupported" note. Images that fail validation + (``cache_image_from_bytes`` raises ValueError) also return None. + """ + from tools.credential_files import to_agent_visible_cache_path + + ext = _resolve_media_ext(filename, mime_type) + mime = (mime_type or "").lower() + display = re.sub(r"[^\w.\- ]", "_", filename) if filename else (ext.lstrip(".") or "file") + + is_image = ( + mime.startswith("image/") + or ext in SUPPORTED_IMAGE_DOCUMENT_TYPES + or default_kind == "image" + ) + is_video = mime.startswith("video/") or ext in SUPPORTED_VIDEO_TYPES or default_kind == "video" + is_audio = mime.startswith("audio/") or default_kind == "audio" + + if is_image: + img_ext = ext if ext in SUPPORTED_IMAGE_DOCUMENT_TYPES else ".jpg" + try: + path = cache_image_from_bytes(data, ext=img_ext) + except ValueError: + return None + out_mime = mime if mime.startswith("image/") else SUPPORTED_IMAGE_DOCUMENT_TYPES.get(img_ext, "image/jpeg") + return CachedMedia(to_agent_visible_cache_path(path), out_mime, "image", display) + + if is_video: + vid_ext = ext if ext in SUPPORTED_VIDEO_TYPES else ".mp4" + path = cache_video_from_bytes(data, ext=vid_ext) + return CachedMedia(to_agent_visible_cache_path(path), SUPPORTED_VIDEO_TYPES.get(vid_ext, "video/mp4"), "video", display) + + if is_audio: + aud_ext = ext if ext in {".ogg", ".mp3", ".wav", ".m4a", ".opus", ".flac"} else ".ogg" + path = cache_audio_from_bytes(data, ext=aud_ext) + out_mime = mime if mime.startswith("audio/") else f"audio/{aud_ext.lstrip('.')}" + return CachedMedia(to_agent_visible_cache_path(path), out_mime, "audio", display) + + if ext not in SUPPORTED_DOCUMENT_TYPES: + 
return None + + path = cache_document_from_bytes(data, filename or f"document{ext}") + return CachedMedia(to_agent_visible_cache_path(path), SUPPORTED_DOCUMENT_TYPES[ext], "document", display or f"document{ext}") + + class MessageType(Enum): """Types of incoming messages.""" TEXT = "text" diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 27a69ae41..23f699f48 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -16,7 +16,7 @@ import tempfile import html as _html import re from datetime import datetime, timezone -from typing import Any, Callable, Dict, List, Optional, Set +from typing import Dict, List, Optional, Set, Any logger = logging.getLogger(__name__) @@ -4877,58 +4877,7 @@ class TelegramAdapter(BasePlatformAdapter): def _telegram_group_observe_attributed_text(self, event: MessageEvent) -> str: user_id = event.source.user_id or "unknown" sender = event.source.user_name or user_id - parts: list[str] = [] - if event.text: - parts.append(event.text) - media_note = self._telegram_group_observe_media_note(event) - if media_note: - parts.append(media_note) - body = "\n\n".join(parts) - return f"[{sender}|{user_id}]\n{body}" - - @staticmethod - def _append_media_status_text(existing: str, note: str) -> str: - if not note: - return existing - if not existing: - return note - return f"{existing}\n\n{note}" - - def _telegram_group_observe_media_note(self, event: MessageEvent) -> str: - """Describe cached observed media in transcript context.""" - media_urls = list(getattr(event, "media_urls", None) or []) - if not media_urls: - return "" - - try: - from tools.credential_files import to_agent_visible_cache_path - except Exception: - to_agent_visible_cache_path = lambda path: path # type: ignore[assignment] - - lines: list[str] = [] - media_types = list(getattr(event, "media_types", None) or []) - for index, path in enumerate(media_urls): - mtype = media_types[index] if index < len(media_types) else "" - basename 
= os.path.basename(path) - parts = basename.split("_", 2) - display_name = parts[2] if len(parts) >= 3 else basename - display_name = re.sub(r'[^\w.\- ]', '_', display_name) - agent_path = to_agent_visible_cache_path(path) - - if mtype.startswith("image/") or event.message_type == MessageType.PHOTO: - label = "image" - elif event.message_type == MessageType.VOICE: - label = "voice message" - elif event.message_type == MessageType.AUDIO or mtype.startswith("audio/"): - label = "audio file" - elif mtype.startswith("video/") or event.message_type == MessageType.VIDEO: - label = "video" - else: - label = "document" - lines.append( - f"[Observed Telegram {label}: '{display_name}' saved at: {agent_path}]" - ) - return "\n".join(lines) + return f"[{sender}|{user_id}]\n{event.text or ''}" def _telegram_group_observe_channel_prompt(self) -> str: username = getattr(getattr(self, "_bot", None), "username", None) or "unknown" @@ -4969,6 +4918,96 @@ class TelegramAdapter(BasePlatformAdapter): channel_prompt=channel_prompt, ) + def _media_message_type(self, msg: Message) -> MessageType: + """Classify a Telegram media message into a MessageType.""" + if msg.sticker: + return MessageType.STICKER + if msg.photo: + return MessageType.PHOTO + if msg.video: + return MessageType.VIDEO + if msg.audio: + return MessageType.AUDIO + if msg.voice: + return MessageType.VOICE + return MessageType.DOCUMENT + + async def _cache_observed_media(self, msg: Message, event: MessageEvent) -> None: + """Cache an unmentioned group attachment and annotate the observed text. + + Passive group traffic, so downloads are bounded by the same ``_max_doc_bytes`` + limit as the addressed document path. Oversized or unknown-size attachments are + noted without downloading; unsupported types are noted after the download.
+ """ + from gateway.platforms.base import cache_media_bytes + + source, filename, mime, kind = self._observed_media_source(msg) + if source is None: + return + + max_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024) + file_size = getattr(source, "file_size", None) + try: + size = int(file_size or 0) + except (TypeError, ValueError): + size = 0 + if not (0 < size <= max_bytes): + limit_mb = max_bytes // (1024 * 1024) + event.text = self._append_observed_note( + event.text, + f"[Observed Telegram attachment too large or unverifiable. Maximum: {limit_mb} MB.]", + ) + logger.info("[Telegram] Observed group attachment skipped (size=%s)", file_size) + return + + try: + file_obj = await source.get_file() + data = bytes(await file_obj.download_as_bytearray()) + if not filename: + filename = os.path.basename(getattr(file_obj, "file_path", "") or "") + cached = cache_media_bytes(data, filename=filename, mime_type=mime, default_kind=kind) + except Exception as exc: + logger.warning("[Telegram] Failed to cache observed group media: %s", exc, exc_info=True) + return + + if cached is None: + event.text = self._append_observed_note( + event.text, "[Observed Telegram attachment: unsupported type, not cached.]" + ) + return + + event.media_urls = [cached.path] + event.media_types = [cached.media_type] + if cached.kind == "image": + event.message_type = MessageType.PHOTO + elif cached.kind == "video": + event.message_type = MessageType.VIDEO + event.text = self._append_observed_note(event.text, cached.context_note()) + logger.info("[Telegram] Cached observed group %s at %s", cached.kind, cached.path) + + def _observed_media_source(self, msg: Message): + """Return (telegram_file_source, filename, mime, default_kind) or Nones.""" + if msg.photo: + return msg.photo[-1], "", "", "image" + if msg.video: + return msg.video, "", "video/mp4", "video" + if msg.voice: + return msg.voice, "voice.ogg", "audio/ogg", "audio" + if msg.audio: + return msg.audio, getattr(msg.audio, 
"file_name", "") or "", "", "audio" + if msg.document: + doc = msg.document + return doc, doc.file_name or "", (doc.mime_type or "").lower(), None + return None, "", "", None + + @staticmethod + def _append_observed_note(existing: Optional[str], note: str) -> str: + if not note: + return existing or "" + if not existing: + return note + return f"{existing}\n\n{note}" + def _observe_unmentioned_group_message( self, message: Message, @@ -5335,272 +5374,27 @@ class TelegramAdapter(BasePlatformAdapter): self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key)) - def _media_message_type(self, msg: Message) -> MessageType: - if msg.sticker: - return MessageType.STICKER - if msg.photo: - return MessageType.PHOTO - if msg.video: - return MessageType.VIDEO - if msg.audio: - return MessageType.AUDIO - if msg.voice: - return MessageType.VOICE - return MessageType.DOCUMENT - - def _observed_media_exceeds_cache_limit(self, event: MessageEvent, label: str, file_size: Any) -> bool: - max_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024) - try: - size = int(file_size or 0) - except (TypeError, ValueError): - size = 0 - if 0 < size <= max_bytes: - return False - - limit_mb = max_bytes // (1024 * 1024) - event.text = self._append_media_status_text( - event.text, - ( - f"Observed Telegram {label} was too large or its size could not be " - f"verified. Maximum: {limit_mb} MB." 
- ), - ) - logger.info( - "[Telegram] Observed group %s too large or unknown size: %s bytes", - label, - file_size, - ) - return True - - async def _cache_observed_simple_media( - self, - event: MessageEvent, - *, - source: Any, - label: str, - cache_fn: Callable[..., str], - default_ext: str, - media_type_for_ext: Callable[[str], str], - ext_candidates: Optional[Any] = None, - ) -> None: - try: - if self._observed_media_exceeds_cache_limit(event, label, getattr(source, "file_size", None)): - return - - file_obj = await source.get_file() - media_bytes = await file_obj.download_as_bytearray() - ext = default_ext - file_path = getattr(file_obj, "file_path", None) - if file_path and ext_candidates: - file_path_lower = file_path.lower() - for candidate in ext_candidates: - if file_path_lower.endswith(candidate): - ext = candidate - break - - cached_path = cache_fn(bytes(media_bytes), ext=ext) - event.media_urls = [cached_path] - event.media_types = [media_type_for_ext(ext)] - logger.info("[Telegram] Cached observed group %s at %s", label, cached_path) - except Exception as e: - logger.warning("[Telegram] Failed to cache observed group %s: %s", label, e, exc_info=True) - - async def _build_observed_media_event( - self, - msg: Message, - msg_type: MessageType, - update_id: Optional[int] = None, - ) -> MessageEvent: - """Build and cache media for an observed, unmentioned group message.""" - event = self._build_message_event(msg, msg_type, update_id=update_id) - if msg.caption: - event.text = self._clean_bot_trigger_text(msg.caption) - - if msg.photo: - await self._cache_observed_simple_media( - event, - source=msg.photo[-1], - label="image", - cache_fn=cache_image_from_bytes, - default_ext=".jpg", - ext_candidates=(".png", ".webp", ".gif", ".jpeg", ".jpg"), - media_type_for_ext=lambda ext: f"image/{ext.lstrip('.')}", - ) - - elif msg.voice: - await self._cache_observed_simple_media( - event, - source=msg.voice, - label="voice message", - cache_fn=cache_audio_from_bytes, - 
default_ext=".ogg", - media_type_for_ext=lambda _ext: "audio/ogg", - ) - - elif msg.audio: - await self._cache_observed_simple_media( - event, - source=msg.audio, - label="audio file", - cache_fn=cache_audio_from_bytes, - default_ext=".mp3", - media_type_for_ext=lambda _ext: "audio/mp3", - ) - - elif msg.video: - await self._cache_observed_simple_media( - event, - source=msg.video, - label="video", - cache_fn=cache_video_from_bytes, - default_ext=".mp4", - ext_candidates=SUPPORTED_VIDEO_TYPES, - media_type_for_ext=lambda ext: SUPPORTED_VIDEO_TYPES.get(ext, "video/mp4"), - ) - - elif msg.document: - await self._cache_observed_document_event(msg, event) - - return event - - async def _cache_observed_document_event(self, msg: Message, event: MessageEvent) -> None: - doc = msg.document - try: - ext = "" - original_filename = doc.file_name or "" - if original_filename: - _, ext = os.path.splitext(original_filename) - ext = ext.lower() - - doc_mime = (doc.mime_type or "").lower() - if not ext and doc_mime: - ext = _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, "") - if not ext: - mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()} - ext = mime_to_ext.get(doc_mime, "") - - max_doc_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024) - if not doc.file_size or doc.file_size > max_doc_bytes: - limit_mb = max_doc_bytes // (1024 * 1024) - event.text = self._append_media_status_text( - event.text, - ( - "Observed Telegram document was too large or its size could not " - f"be verified. Maximum: {limit_mb} MB." 
- ), - ) - logger.info("[Telegram] Observed group document too large: %s bytes", doc.file_size) - return - - if ext in _TELEGRAM_IMAGE_EXTENSIONS or doc_mime.startswith("image/"): - file_obj = await doc.get_file() - image_bytes = await file_obj.download_as_bytearray() - image_ext = ( - ext - if ext in _TELEGRAM_IMAGE_EXTENSIONS - else _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, ".jpg") - ) - try: - cached_path = cache_image_from_bytes(bytes(image_bytes), ext=image_ext) - except ValueError as e: - logger.warning("[Telegram] Failed to cache observed image document: %s", e, exc_info=True) - event.text = self._append_media_status_text( - event.text, - ( - f"Image document '{original_filename or doc_mime or ext or 'unknown'}' " - "could not be read as an image." - ), - ) - return - event.message_type = MessageType.PHOTO - event.media_urls = [cached_path] - event.media_types = [ - doc_mime - if doc_mime.startswith("image/") - else _TELEGRAM_IMAGE_EXT_TO_MIME.get(image_ext, "image/jpeg") - ] - logger.info("[Telegram] Cached observed group image-document at %s", cached_path) - return - - if not ext and doc.mime_type: - video_mime_to_ext = {v: k for k, v in SUPPORTED_VIDEO_TYPES.items()} - ext = video_mime_to_ext.get(doc.mime_type, "") - - if not ext and doc.mime_type: - image_mime_to_ext: dict[str, str] = {} - for _ext, _mime in SUPPORTED_IMAGE_DOCUMENT_TYPES.items(): - image_mime_to_ext.setdefault(_mime, _ext) - ext = image_mime_to_ext.get(doc.mime_type, "") - - if ext in SUPPORTED_VIDEO_TYPES: - file_obj = await doc.get_file() - video_bytes = await file_obj.download_as_bytearray() - cached_path = cache_video_from_bytes(bytes(video_bytes), ext=ext) - event.media_urls = [cached_path] - event.media_types = [SUPPORTED_VIDEO_TYPES[ext]] - event.message_type = MessageType.VIDEO - logger.info("[Telegram] Cached observed group video document at %s", cached_path) - return - - if ext not in SUPPORTED_DOCUMENT_TYPES: - supported_list = ", 
".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys())) - event.text = self._append_media_status_text( - event.text, - f"Unsupported document type '{ext or 'unknown'}'. Supported types: {supported_list}", - ) - logger.info("[Telegram] Unsupported observed group document type: %s", ext or "unknown") - return - - file_obj = await doc.get_file() - doc_bytes = await file_obj.download_as_bytearray() - raw_bytes = bytes(doc_bytes) - cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}") - mime_type = SUPPORTED_DOCUMENT_TYPES[ext] - event.media_urls = [cached_path] - event.media_types = [mime_type] - logger.info("[Telegram] Cached observed group document at %s", cached_path) - - max_text_inject_bytes = 100 * 1024 - if ext in {".md", ".txt"} and len(raw_bytes) <= max_text_inject_bytes: - try: - text_content = raw_bytes.decode("utf-8") - display_name = original_filename or f"document{ext}" - display_name = re.sub(r'[^\w.\- ]', '_', display_name) - injection = f"[Content of {display_name}]:\n{text_content}" - event.text = f"{injection}\n\n{event.text}" if event.text else injection - except UnicodeDecodeError: - logger.warning( - "[Telegram] Could not decode observed text file as UTF-8, skipping content injection", - exc_info=True, - ) - except Exception as e: - logger.warning("[Telegram] Failed to cache observed group document: %s", e, exc_info=True) - async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming media messages, downloading images to local cache.""" if not update.message: return if not self._should_process_message(update.message): if self._should_observe_unmentioned_group_message(update.message): - msg_type = self._media_message_type(update.message) - event = await self._build_observed_media_event( - update.message, - msg_type, - update_id=update.update_id, - ) + _m = update.message + _observe_type = self._media_message_type(_m) + _event = self._build_message_event(_m, 
_observe_type, update_id=update.update_id) + if _m.caption: + _event.text = self._clean_bot_trigger_text(_m.caption) + await self._cache_observed_media(_m, _event) self._observe_unmentioned_group_message( - update.message, - event.message_type, - update_id=update.update_id, - event=event, + _m, _event.message_type, update_id=update.update_id, event=_event ) return msg = update.message - - # Determine media type + msg_type = self._media_message_type(msg) - + event = self._build_message_event(msg, msg_type, update_id=update.update_id) # Add caption as text diff --git a/tests/gateway/test_document_cache.py b/tests/gateway/test_document_cache.py index cc756cea8..9043bf24d 100644 --- a/tests/gateway/test_document_cache.py +++ b/tests/gateway/test_document_cache.py @@ -155,3 +155,64 @@ class TestSupportedDocumentTypes: ) def test_expected_extensions_present(self, ext): assert ext in SUPPORTED_DOCUMENT_TYPES + + +# --------------------------------------------------------------------------- +# TestCacheMediaBytes — the unified, platform-agnostic caching primitive +# --------------------------------------------------------------------------- + +# 1x1 transparent PNG (passes cache_image_from_bytes validation) +_PNG_1PX = bytes.fromhex( + "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4" + "890000000d49444154789c6360000002000154a24f5f0000000049454e44ae426082" +) + + +class TestCacheMediaBytes: + def test_pdf_routes_to_document(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(b"%PDF-1.4 body", filename="report.pdf", mime_type="application/pdf") + assert result is not None + assert result.kind == "document" + assert result.media_type == "application/pdf" + assert "report.pdf" in result.display_name + assert os.path.exists(result.path) + assert "report.pdf" in result.context_note() + + def test_png_routes_to_image(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(_PNG_1PX, 
filename="photo.png", mime_type="image/png") + assert result is not None + assert result.kind == "image" + assert result.media_type == "image/png" + assert os.path.exists(result.path) + + def test_native_photo_without_filename_uses_default_kind(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(_PNG_1PX, filename="", mime_type="", default_kind="image") + assert result is not None + assert result.kind == "image" + + def test_mp4_routes_to_video(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(b"\x00\x00\x00\x18ftypmp42", filename="clip.mp4", mime_type="video/mp4") + assert result is not None + assert result.kind == "video" + assert result.media_type == "video/mp4" + + def test_mime_only_resolves_extension(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(b"col1,col2\n1,2", filename="", mime_type="text/csv") + assert result is not None + assert result.kind == "document" + assert result.media_type == "text/csv" + + def test_unsupported_document_returns_none(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(b"MZ", filename="malware.exe", mime_type="application/x-msdownload") + assert result is None + + def test_invalid_image_returns_none(self): + from gateway.platforms.base import cache_media_bytes + result = cache_media_bytes(b"not an image", filename="x.png", mime_type="image/png") + assert result is None diff --git a/tests/gateway/test_telegram_group_gating.py b/tests/gateway/test_telegram_group_gating.py index a96d634bb..f5fb112f1 100644 --- a/tests/gateway/test_telegram_group_gating.py +++ b/tests/gateway/test_telegram_group_gating.py @@ -897,134 +897,6 @@ def _group_voice_message( ) -def _group_photo_message( - *, - chat_id=-100, - from_user_id=111, - from_user_name="Alice Example", - caption="Veja esta foto", - file_size=1024, -): - file_obj = SimpleNamespace( - file_path="photos/observed.png", - 
download_as_bytearray=AsyncMock(return_value=bytearray(b"\x89PNG\r\n\x1a\n observed")), - ) - photo = SimpleNamespace( - file_size=file_size, - get_file=AsyncMock(return_value=file_obj), - ) - return SimpleNamespace( - message_id=52, - text=None, - caption=caption, - entities=[], - caption_entities=[], - message_thread_id=None, - is_topic_message=False, - chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), - from_user=SimpleNamespace( - id=from_user_id, full_name=from_user_name, - first_name=from_user_name.split()[0], - ), - reply_to_message=None, - date=None, - location=None, - venue=None, - sticker=None, - photo=[photo], - video=None, - audio=None, - voice=None, - document=None, - ) - - -def _group_video_message( - *, - chat_id=-100, - from_user_id=111, - from_user_name="Alice Example", - caption="Veja este video", - file_size=1024, -): - file_obj = SimpleNamespace( - file_path="videos/observed.mp4", - download_as_bytearray=AsyncMock(return_value=bytearray(b"observed video")), - ) - video = SimpleNamespace( - file_size=file_size, - get_file=AsyncMock(return_value=file_obj), - ) - return SimpleNamespace( - message_id=53, - text=None, - caption=caption, - entities=[], - caption_entities=[], - message_thread_id=None, - is_topic_message=False, - chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), - from_user=SimpleNamespace( - id=from_user_id, full_name=from_user_name, - first_name=from_user_name.split()[0], - ), - reply_to_message=None, - date=None, - location=None, - venue=None, - sticker=None, - photo=None, - video=video, - audio=None, - voice=None, - document=None, - ) - - -def _group_document_message( - *, - chat_id=-100, - from_user_id=111, - from_user_name="Alice Example", - caption="Este arquivo", - document=None, -): - file_obj = SimpleNamespace( - file_path="documents/RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf", - download_as_bytearray=AsyncMock(return_value=bytearray(b"%PDF observed 
bytes")), - ) - document = document or SimpleNamespace( - file_name="RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf", - mime_type="application/pdf", - file_size=1024, - get_file=AsyncMock(return_value=file_obj), - ) - return SimpleNamespace( - message_id=52, - text=None, - caption=caption, - entities=[], - caption_entities=[], - message_thread_id=None, - is_topic_message=False, - chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), - from_user=SimpleNamespace( - id=from_user_id, full_name=from_user_name, - first_name=from_user_name.split()[0], - ), - reply_to_message=None, - date=None, - location=None, - venue=None, - sticker=None, - photo=None, - video=None, - audio=None, - voice=None, - document=document, - ) - - # --------------------------------------------------------------------------- # Observe + attribution parity: location messages # --------------------------------------------------------------------------- @@ -1111,190 +983,6 @@ def test_unmentioned_voice_message_observed_in_group(): asyncio.run(_run()) -def test_unmentioned_photo_message_observed_with_cached_path(monkeypatch, tmp_path): - async def _run(): - adapter = _make_adapter( - require_mention=True, - allowed_chats=["-100"], - group_allowed_chats=["-100"], - observe_unmentioned_group_messages=True, - ) - store = _FakeSessionStore() - adapter._session_store = store - cached_path = tmp_path / "img_abc_observed.png" - monkeypatch.setattr( - "gateway.platforms.telegram.cache_image_from_bytes", - lambda _data, ext=".jpg": str(cached_path), - ) - update = SimpleNamespace( - update_id=3003, - message=_group_photo_message(), - effective_message=None, - ) - - await adapter._handle_media_message(update, SimpleNamespace()) - - adapter._message_handler.assert_not_awaited() - assert len(store.messages) == 1 - _, message, _ = store.messages[0] - assert message["observed"] is True - assert "Veja esta foto" in message["content"] - assert "Observed Telegram image" in 
message["content"] - assert str(cached_path) in message["content"] - assert store.sources[0].user_id is None - - asyncio.run(_run()) - - -def test_unmentioned_video_too_large_observed_without_download(monkeypatch): - async def _run(): - adapter = _make_adapter( - require_mention=True, - allowed_chats=["-100"], - group_allowed_chats=["-100"], - observe_unmentioned_group_messages=True, - ) - adapter._max_doc_bytes = 100 - store = _FakeSessionStore() - adapter._session_store = store - cache_video = Mock(return_value="/tmp/observed.mp4") - monkeypatch.setattr("gateway.platforms.telegram.cache_video_from_bytes", cache_video) - message_obj = _group_video_message(file_size=101) - update = SimpleNamespace( - update_id=3004, - message=message_obj, - effective_message=None, - ) - - await adapter._handle_media_message(update, SimpleNamespace()) - - adapter._message_handler.assert_not_awaited() - cache_video.assert_not_called() - message_obj.video.get_file.assert_not_called() - assert len(store.messages) == 1 - _, message, _ = store.messages[0] - assert message["observed"] is True - assert "Veja este video" in message["content"] - assert "Observed Telegram video was too large" in message["content"] - assert "/tmp/observed.mp4" not in message["content"] - - asyncio.run(_run()) - - -def test_unmentioned_document_message_observed_with_cached_path(monkeypatch, tmp_path): - async def _run(): - adapter = _make_adapter( - require_mention=True, - allowed_chats=["-100"], - group_allowed_chats=["-100"], - observe_unmentioned_group_messages=True, - ) - store = _FakeSessionStore() - adapter._session_store = store - cached_path = tmp_path / "doc_abc_RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf" - monkeypatch.setattr( - "gateway.platforms.telegram.cache_document_from_bytes", - lambda _data, _filename: str(cached_path), - ) - update = SimpleNamespace( - update_id=3003, - message=_group_document_message(), - effective_message=None, - ) - - await adapter._handle_media_message(update, 
SimpleNamespace()) - - adapter._message_handler.assert_not_awaited() - assert len(store.messages) == 1 - _, message, _ = store.messages[0] - assert message["observed"] is True - assert "Este arquivo" in message["content"] - assert "RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf" in message["content"] - assert str(cached_path) in message["content"] - assert store.sources[0].user_id is None - - asyncio.run(_run()) - - -def test_unmentioned_large_document_observed_without_download(monkeypatch): - async def _run(): - adapter = _make_adapter( - require_mention=True, - allowed_chats=["-100"], - group_allowed_chats=["-100"], - observe_unmentioned_group_messages=True, - ) - adapter._max_doc_bytes = 100 - store = _FakeSessionStore() - adapter._session_store = store - cache_document = Mock(return_value="/tmp/huge.pdf") - monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document) - document = SimpleNamespace( - file_name="huge.pdf", - mime_type="application/pdf", - file_size=101, - get_file=AsyncMock(), - ) - update = SimpleNamespace( - update_id=3005, - message=_group_document_message(document=document), - effective_message=None, - ) - - await adapter._handle_media_message(update, SimpleNamespace()) - - adapter._message_handler.assert_not_awaited() - cache_document.assert_not_called() - document.get_file.assert_not_called() - assert len(store.messages) == 1 - _, message, _ = store.messages[0] - assert message["observed"] is True - assert "Este arquivo" in message["content"] - assert "Observed Telegram document was too large" in message["content"] - assert "/tmp/huge.pdf" not in message["content"] - - asyncio.run(_run()) - - -def test_unmentioned_unsupported_document_observed_without_download(monkeypatch): - async def _run(): - adapter = _make_adapter( - require_mention=True, - allowed_chats=["-100"], - group_allowed_chats=["-100"], - observe_unmentioned_group_messages=True, - ) - store = _FakeSessionStore() - adapter._session_store = store - 
cache_document = Mock(return_value="/tmp/malware.exe") - monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document) - document = SimpleNamespace( - file_name="malware.exe", - mime_type="application/x-msdownload", - file_size=100, - get_file=AsyncMock(), - ) - update = SimpleNamespace( - update_id=3006, - message=_group_document_message(document=document), - effective_message=None, - ) - - await adapter._handle_media_message(update, SimpleNamespace()) - - adapter._message_handler.assert_not_awaited() - cache_document.assert_not_called() - document.get_file.assert_not_called() - assert len(store.messages) == 1 - _, message, _ = store.messages[0] - assert message["observed"] is True - assert "Este arquivo" in message["content"] - assert "Unsupported document type '.exe'" in message["content"] - assert "/tmp/malware.exe" not in message["content"] - - asyncio.run(_run()) - - def test_triggered_voice_message_uses_shared_session_in_observe_mode(): async def _run(): adapter = _make_adapter( @@ -1317,3 +1005,160 @@ def test_triggered_voice_message_uses_shared_session_in_observe_mode(): assert "[Alice Example|111]" in event.text asyncio.run(_run()) + + +# --------------------------------------------------------------------------- +# Observed-media caching (unmentioned group attachments) +# --------------------------------------------------------------------------- + +def _group_photo_message(*, chat_id=-100, caption="Veja esta foto", file_size=1024): + file_obj = SimpleNamespace( + file_path="photos/observed.png", + download_as_bytearray=AsyncMock(return_value=bytearray(b"\x89PNG\r\n\x1a\n observed")), + ) + photo = SimpleNamespace(file_size=file_size, get_file=AsyncMock(return_value=file_obj)) + return SimpleNamespace( + message_id=52, text=None, caption=caption, entities=[], caption_entities=[], + message_thread_id=None, is_topic_message=False, + chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), + 
from_user=SimpleNamespace(id=111, full_name="Alice Example", first_name="Alice"), + reply_to_message=None, date=None, location=None, venue=None, + sticker=None, photo=[photo], video=None, audio=None, voice=None, document=None, + ) + + +def _group_document_message(*, chat_id=-100, caption="Este arquivo", document=None): + file_obj = SimpleNamespace( + file_path="documents/report.pdf", + download_as_bytearray=AsyncMock(return_value=bytearray(b"%PDF observed bytes")), + ) + document = document or SimpleNamespace( + file_name="RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf", + mime_type="application/pdf", file_size=1024, + get_file=AsyncMock(return_value=file_obj), + ) + return SimpleNamespace( + message_id=53, text=None, caption=caption, entities=[], caption_entities=[], + message_thread_id=None, is_topic_message=False, + chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False), + from_user=SimpleNamespace(id=111, full_name="Alice Example", first_name="Alice"), + reply_to_message=None, date=None, location=None, venue=None, + sticker=None, photo=None, video=None, audio=None, voice=None, document=document, + ) + + +def test_unmentioned_photo_observed_with_cached_path(monkeypatch, tmp_path): + async def _run(): + adapter = _make_adapter( + require_mention=True, allowed_chats=["-100"], + group_allowed_chats=["-100"], observe_unmentioned_group_messages=True, + ) + store = _FakeSessionStore() + adapter._session_store = store + cached_path = tmp_path / "img_abc_observed.png" + monkeypatch.setattr( + "gateway.platforms.base.cache_image_from_bytes", + lambda _data, ext=".jpg": str(cached_path), + ) + update = SimpleNamespace(update_id=3003, message=_group_photo_message(), effective_message=None) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Veja esta foto" in 
message["content"] + assert "image" in message["content"] + assert str(cached_path) in message["content"] + assert store.sources[0].user_id is None + + asyncio.run(_run()) + + +def test_unmentioned_document_observed_with_cached_path(monkeypatch, tmp_path): + async def _run(): + adapter = _make_adapter( + require_mention=True, allowed_chats=["-100"], + group_allowed_chats=["-100"], observe_unmentioned_group_messages=True, + ) + store = _FakeSessionStore() + adapter._session_store = store + cached_path = tmp_path / "doc_abc_report.pdf" + monkeypatch.setattr( + "gateway.platforms.base.cache_document_from_bytes", + lambda _data, _filename: str(cached_path), + ) + update = SimpleNamespace(update_id=3004, message=_group_document_message(), effective_message=None) + + await adapter._handle_media_message(update, SimpleNamespace()) + + adapter._message_handler.assert_not_awaited() + assert len(store.messages) == 1 + _, message, _ = store.messages[0] + assert message["observed"] is True + assert "Este arquivo" in message["content"] + assert str(cached_path) in message["content"] + + asyncio.run(_run()) + + +def test_unmentioned_large_document_observed_without_download(monkeypatch): + async def _run(): + adapter = _make_adapter( + require_mention=True, allowed_chats=["-100"], + group_allowed_chats=["-100"], observe_unmentioned_group_messages=True, + ) + adapter._max_doc_bytes = 100 + store = _FakeSessionStore() + adapter._session_store = store + cache_doc = Mock(return_value="/tmp/huge.pdf") + monkeypatch.setattr("gateway.platforms.base.cache_document_from_bytes", cache_doc) + document = SimpleNamespace( + file_name="huge.pdf", mime_type="application/pdf", + file_size=101, get_file=AsyncMock(), + ) + update = SimpleNamespace( + update_id=3005, message=_group_document_message(document=document), effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + cache_doc.assert_not_called() + document.get_file.assert_not_called() + _, message, _ 
= store.messages[0] + assert "too large" in message["content"] + assert "/tmp/huge.pdf" not in message["content"] + + asyncio.run(_run()) + + +def test_unmentioned_unsupported_document_observed_without_caching(monkeypatch): + async def _run(): + adapter = _make_adapter( + require_mention=True, allowed_chats=["-100"], + group_allowed_chats=["-100"], observe_unmentioned_group_messages=True, + ) + store = _FakeSessionStore() + adapter._session_store = store + cache_doc = Mock(return_value="/tmp/malware.exe") + monkeypatch.setattr("gateway.platforms.base.cache_document_from_bytes", cache_doc) + file_obj = SimpleNamespace( + file_path="documents/malware.exe", + download_as_bytearray=AsyncMock(return_value=bytearray(b"MZ")), + ) + document = SimpleNamespace( + file_name="malware.exe", mime_type="application/x-msdownload", + file_size=2, get_file=AsyncMock(return_value=file_obj), + ) + update = SimpleNamespace( + update_id=3006, message=_group_document_message(document=document), effective_message=None, + ) + + await adapter._handle_media_message(update, SimpleNamespace()) + + cache_doc.assert_not_called() + _, message, _ = store.messages[0] + assert "unsupported" in message["content"].lower() + + asyncio.run(_run())