refactor(telegram): generalize observed-media caching into a reusable primitive

Collapse the per-type observed-media dispatch into one platform-agnostic
cache_media_bytes() helper in gateway/platforms/base.py. Any adapter can now
hand it raw attachment bytes + a filename/MIME hint; it classifies against the
shared MIME registries, routes to the right cache_*_from_bytes helper,
sandbox-translates the path, and returns a CachedMedia with a ready
context_note(). Telegram's observed-group path shrinks to: size-gate, download,
call the helper, annotate. Also dedupes the addressed-media type ladder into
_media_message_type().

Net: contributor's Telegram-only +595 LOC becomes a +210/-32 production change,
with the reusable primitive available to Discord/Slack/Signal/etc.

Co-authored-by: Glucksberg <markuscontasul@gmail.com>
This commit is contained in:
teknium1
2026-06-01 19:52:53 -07:00
committed by Teknium
parent f768e75ecf
commit fa3b06b035
4 changed files with 420 additions and 619 deletions

View File

@ -1265,6 +1265,107 @@ def cleanup_document_cache(max_age_hours: int = 24) -> int:
return removed
# ---------------------------------------------------------------------------
# Unified media caching
#
# One entry point for "I have raw attachment bytes from a platform — cache them
# and tell me what I got." Classifies by extension/MIME against the shared
# registries above, routes to the right cache_*_from_bytes helper, and returns
# a small result the caller can store and/or describe in a transcript. Used by
# both the addressed-message path and the observed-group-context path, on any
# platform — not Telegram-specific.
# ---------------------------------------------------------------------------
@dataclass
class CachedMedia:
    """Outcome of caching the bytes of a single attachment.

    Attributes:
        path: Absolute cache path, agent-visible (sandbox-translated).
        media_type: MIME type recorded on the MessageEvent.
        kind: One of "image", "video", "audio" or "document".
        display_name: Human-readable name used in transcript notes.
    """

    path: str
    media_type: str
    kind: str
    display_name: str

    def context_note(self) -> str:
        """Return the one-line transcript annotation pointing at the file."""
        note = f"[{self.kind} '{self.display_name}' saved at: {self.path}]"
        return note
def _resolve_media_ext(filename: str, mime_type: str) -> str:
    """Return a best-effort, lowercase file extension for an attachment.

    The extension carried by ``filename`` wins whenever there is one.
    Otherwise ``mime_type`` is reverse-looked-up in the shared image, video
    and document registries (in that order) and the first extension whose
    registered MIME type matches is returned.

    Args:
        filename: Original attachment name; may be empty.
        mime_type: Declared MIME type; may be empty or ``None``-like, and may
            carry parameters such as ``"text/plain; charset=utf-8"``.

    Returns:
        The extension including its leading dot (e.g. ``".pdf"``), or ``""``
        when neither hint identifies one.
    """
    if filename:
        ext = os.path.splitext(filename)[1].lower()
        if ext:
            return ext

    # Match on the bare "type/subtype": some platforms append parameters
    # ("; charset=utf-8") or padding that would otherwise defeat the
    # equality check against the registries.
    mime = (mime_type or "").split(";", 1)[0].strip().lower()
    if not mime:
        return ""

    for table in (
        SUPPORTED_IMAGE_DOCUMENT_TYPES,
        SUPPORTED_VIDEO_TYPES,
        SUPPORTED_DOCUMENT_TYPES,
    ):
        for ext, registered_mime in table.items():
            if registered_mime == mime:
                return ext
    return ""
# Audio extensions handed to cache_audio_from_bytes unchanged.
_AUDIO_EXTENSIONS = frozenset({".ogg", ".mp3", ".wav", ".m4a", ".opus", ".flac"})

# Extension to use for audio that arrives with a MIME type but no usable
# file extension, so e.g. an MP3 is not written out as ".ogg".
_AUDIO_MIME_TO_EXT = {
    "audio/mpeg": ".mp3",
    "audio/mp3": ".mp3",
    "audio/wav": ".wav",
    "audio/x-wav": ".wav",
    "audio/wave": ".wav",
    "audio/mp4": ".m4a",
    "audio/x-m4a": ".m4a",
    "audio/ogg": ".ogg",
    "audio/opus": ".opus",
    "audio/flac": ".flac",
    "audio/x-flac": ".flac",
}


def cache_media_bytes(
    data: bytes,
    *,
    filename: str = "",
    mime_type: str = "",
    default_kind: Optional[str] = None,
) -> Optional[CachedMedia]:
    """Classify and cache raw attachment bytes; return a CachedMedia or None.

    Classification uses explicit evidence first: an ``image/``, ``video/`` or
    ``audio/`` MIME prefix, or an extension found in the image/video
    registries or the known audio extensions. ``default_kind``
    ("image"/"video"/"audio"/"document") is only consulted when that evidence
    is missing -- e.g. a Telegram native photo whose file has no usable name.
    Anything else is treated as a document.

    Args:
        data: Raw attachment bytes.
        filename: Original file name, if the platform supplied one.
        mime_type: Declared MIME type; parameters such as ``; charset=utf-8``
            are ignored.
        default_kind: Fallback category for otherwise unclassifiable input.

    Returns:
        A ``CachedMedia`` describing the cached file, or ``None`` when the
        document type is unsupported or the image fails validation
        (``cache_image_from_bytes`` raises ``ValueError``), so the caller can
        record an "unsupported" note.
    """
    from tools.credential_files import to_agent_visible_cache_path

    # Work with the bare "type/subtype" so header parameters cannot defeat
    # the prefix checks or the registry lookups.
    mime = (mime_type or "").split(";", 1)[0].strip().lower()
    ext = _resolve_media_ext(filename, mime)
    display = re.sub(r"[^\w.\- ]", "_", filename) if filename else (ext.lstrip(".") or "file")

    if mime.startswith("image/") or ext in SUPPORTED_IMAGE_DOCUMENT_TYPES:
        kind = "image"
    elif mime.startswith("video/") or ext in SUPPORTED_VIDEO_TYPES:
        kind = "video"
    elif mime.startswith("audio/") or ext in _AUDIO_EXTENSIONS:
        kind = "audio"
    elif default_kind in ("image", "video", "audio"):
        # No explicit evidence: trust the caller's hint.
        kind = default_kind
    else:
        kind = "document"

    if kind == "image":
        img_ext = ext if ext in SUPPORTED_IMAGE_DOCUMENT_TYPES else ".jpg"
        try:
            path = cache_image_from_bytes(data, ext=img_ext)
        except ValueError:
            # Bytes did not validate as an image.
            return None
        out_mime = mime if mime.startswith("image/") else SUPPORTED_IMAGE_DOCUMENT_TYPES.get(img_ext, "image/jpeg")
        return CachedMedia(to_agent_visible_cache_path(path), out_mime, "image", display)

    if kind == "video":
        vid_ext = ext if ext in SUPPORTED_VIDEO_TYPES else ".mp4"
        path = cache_video_from_bytes(data, ext=vid_ext)
        return CachedMedia(
            to_agent_visible_cache_path(path),
            SUPPORTED_VIDEO_TYPES.get(vid_ext, "video/mp4"),
            "video",
            display,
        )

    if kind == "audio":
        # Prefer the real extension, then one implied by the MIME type, and
        # only then the historical ".ogg" default.
        aud_ext = ext if ext in _AUDIO_EXTENSIONS else _AUDIO_MIME_TO_EXT.get(mime, ".ogg")
        path = cache_audio_from_bytes(data, ext=aud_ext)
        out_mime = mime if mime.startswith("audio/") else f"audio/{aud_ext.lstrip('.')}"
        return CachedMedia(to_agent_visible_cache_path(path), out_mime, "audio", display)

    if ext not in SUPPORTED_DOCUMENT_TYPES:
        return None
    path = cache_document_from_bytes(data, filename or f"document{ext}")
    return CachedMedia(
        to_agent_visible_cache_path(path),
        SUPPORTED_DOCUMENT_TYPES[ext],
        "document",
        display or f"document{ext}",
    )
class MessageType(Enum):
"""Types of incoming messages."""
TEXT = "text"

View File

@ -16,7 +16,7 @@ import tempfile
import html as _html
import re
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Set
from typing import Dict, List, Optional, Set, Any
logger = logging.getLogger(__name__)
@ -4877,58 +4877,7 @@ class TelegramAdapter(BasePlatformAdapter):
def _telegram_group_observe_attributed_text(self, event: MessageEvent) -> str:
user_id = event.source.user_id or "unknown"
sender = event.source.user_name or user_id
parts: list[str] = []
if event.text:
parts.append(event.text)
media_note = self._telegram_group_observe_media_note(event)
if media_note:
parts.append(media_note)
body = "\n\n".join(parts)
return f"[{sender}|{user_id}]\n{body}"
@staticmethod
def _append_media_status_text(existing: str, note: str) -> str:
if not note:
return existing
if not existing:
return note
return f"{existing}\n\n{note}"
def _telegram_group_observe_media_note(self, event: MessageEvent) -> str:
"""Describe cached observed media in transcript context."""
media_urls = list(getattr(event, "media_urls", None) or [])
if not media_urls:
return ""
try:
from tools.credential_files import to_agent_visible_cache_path
except Exception:
to_agent_visible_cache_path = lambda path: path # type: ignore[assignment]
lines: list[str] = []
media_types = list(getattr(event, "media_types", None) or [])
for index, path in enumerate(media_urls):
mtype = media_types[index] if index < len(media_types) else ""
basename = os.path.basename(path)
parts = basename.split("_", 2)
display_name = parts[2] if len(parts) >= 3 else basename
display_name = re.sub(r'[^\w.\- ]', '_', display_name)
agent_path = to_agent_visible_cache_path(path)
if mtype.startswith("image/") or event.message_type == MessageType.PHOTO:
label = "image"
elif event.message_type == MessageType.VOICE:
label = "voice message"
elif event.message_type == MessageType.AUDIO or mtype.startswith("audio/"):
label = "audio file"
elif mtype.startswith("video/") or event.message_type == MessageType.VIDEO:
label = "video"
else:
label = "document"
lines.append(
f"[Observed Telegram {label}: '{display_name}' saved at: {agent_path}]"
)
return "\n".join(lines)
return f"[{sender}|{user_id}]\n{event.text or ''}"
def _telegram_group_observe_channel_prompt(self) -> str:
username = getattr(getattr(self, "_bot", None), "username", None) or "unknown"
@ -4969,6 +4918,96 @@ class TelegramAdapter(BasePlatformAdapter):
channel_prompt=channel_prompt,
)
def _media_message_type(self, msg: Message) -> MessageType:
    """Map a Telegram media message onto its MessageType.

    Attachment attributes are probed in a fixed priority order (sticker,
    photo, video, audio, voice); the first truthy one decides the type and
    anything else is reported as a document.
    """
    priority = (
        ("sticker", MessageType.STICKER),
        ("photo", MessageType.PHOTO),
        ("video", MessageType.VIDEO),
        ("audio", MessageType.AUDIO),
        ("voice", MessageType.VOICE),
    )
    for attribute, message_type in priority:
        if getattr(msg, attribute):
            return message_type
    return MessageType.DOCUMENT
async def _cache_observed_media(self, msg: Message, event: MessageEvent) -> None:
    """Cache an unmentioned group attachment and annotate the observed text.

    This is passive group traffic, so the download is gated by the same
    ``_max_doc_bytes`` limit as the addressed document path. Attachments whose
    declared size is missing, non-positive or over the limit, and attachments
    of an unsupported type, are only noted in the transcript. A failed
    download or cache write is logged and leaves the event untouched.
    """
    from gateway.platforms.base import cache_media_bytes

    source, filename, mime, kind = self._observed_media_source(msg)
    if source is None:
        return

    byte_limit = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024)
    declared_size = getattr(source, "file_size", None)
    try:
        size = int(declared_size or 0)
    except (TypeError, ValueError):
        size = 0

    # Refuse before downloading when the size is unknown or over the limit.
    if size <= 0 or size > byte_limit:
        limit_mb = byte_limit // (1024 * 1024)
        too_large_note = f"[Observed Telegram attachment too large or unverifiable. Maximum: {limit_mb} MB.]"
        event.text = self._append_observed_note(event.text, too_large_note)
        logger.info("[Telegram] Observed group attachment skipped (size=%s)", declared_size)
        return

    try:
        telegram_file = await source.get_file()
        payload = bytes(await telegram_file.download_as_bytearray())
        if not filename:
            # Fall back to the server-side path for a usable name/extension.
            filename = os.path.basename(getattr(telegram_file, "file_path", "") or "")
        cached = cache_media_bytes(payload, filename=filename, mime_type=mime, default_kind=kind)
    except Exception as exc:
        logger.warning("[Telegram] Failed to cache observed group media: %s", exc, exc_info=True)
        return

    if cached is None:
        event.text = self._append_observed_note(
            event.text, "[Observed Telegram attachment: unsupported type, not cached.]"
        )
        return

    event.media_urls = [cached.path]
    event.media_types = [cached.media_type]
    # Images and videos sent as documents are promoted to their media type.
    promoted_type = {"image": MessageType.PHOTO, "video": MessageType.VIDEO}.get(cached.kind)
    if promoted_type is not None:
        event.message_type = promoted_type
    event.text = self._append_observed_note(event.text, cached.context_note())
    logger.info("[Telegram] Cached observed group %s at %s", cached.kind, cached.path)
def _observed_media_source(self, msg: Message):
    """Return (telegram_file_source, filename, mime, default_kind) or Nones.

    Picks the downloadable object from the message (largest photo size,
    video, voice, audio or document, in that order) together with the
    filename/MIME hints to pass to the caching helper. Video and audio
    attachments use their own ``file_name``/``mime_type`` when present;
    video falls back to ``"video/mp4"`` when no MIME type is supplied.
    When the message carries none of these, the source is ``None``.
    """

    def _text_attr(obj, attr: str) -> str:
        # Optional Telegram fields may be missing or None; only accept real
        # strings so downstream string handling never sees another type.
        value = getattr(obj, attr, None)
        return value if isinstance(value, str) else ""

    if msg.photo:
        # Photo sizes are listed smallest first; take the last entry.
        return msg.photo[-1], "", "", "image"
    if msg.video:
        video = msg.video
        video_mime = _text_attr(video, "mime_type").lower() or "video/mp4"
        return video, _text_attr(video, "file_name"), video_mime, "video"
    if msg.voice:
        return msg.voice, "voice.ogg", "audio/ogg", "audio"
    if msg.audio:
        audio = msg.audio
        return audio, _text_attr(audio, "file_name"), _text_attr(audio, "mime_type").lower(), "audio"
    if msg.document:
        doc = msg.document
        return doc, doc.file_name or "", (doc.mime_type or "").lower(), None
    return None, "", "", None
@staticmethod
def _append_observed_note(existing: Optional[str], note: str) -> str:
if not note:
return existing or ""
if not existing:
return note
return f"{existing}\n\n{note}"
def _observe_unmentioned_group_message(
self,
message: Message,
@ -5335,272 +5374,27 @@ class TelegramAdapter(BasePlatformAdapter):
self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key))
def _media_message_type(self, msg: Message) -> MessageType:
if msg.sticker:
return MessageType.STICKER
if msg.photo:
return MessageType.PHOTO
if msg.video:
return MessageType.VIDEO
if msg.audio:
return MessageType.AUDIO
if msg.voice:
return MessageType.VOICE
return MessageType.DOCUMENT
def _observed_media_exceeds_cache_limit(self, event: MessageEvent, label: str, file_size: Any) -> bool:
max_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024)
try:
size = int(file_size or 0)
except (TypeError, ValueError):
size = 0
if 0 < size <= max_bytes:
return False
limit_mb = max_bytes // (1024 * 1024)
event.text = self._append_media_status_text(
event.text,
(
f"Observed Telegram {label} was too large or its size could not be "
f"verified. Maximum: {limit_mb} MB."
),
)
logger.info(
"[Telegram] Observed group %s too large or unknown size: %s bytes",
label,
file_size,
)
return True
async def _cache_observed_simple_media(
self,
event: MessageEvent,
*,
source: Any,
label: str,
cache_fn: Callable[..., str],
default_ext: str,
media_type_for_ext: Callable[[str], str],
ext_candidates: Optional[Any] = None,
) -> None:
try:
if self._observed_media_exceeds_cache_limit(event, label, getattr(source, "file_size", None)):
return
file_obj = await source.get_file()
media_bytes = await file_obj.download_as_bytearray()
ext = default_ext
file_path = getattr(file_obj, "file_path", None)
if file_path and ext_candidates:
file_path_lower = file_path.lower()
for candidate in ext_candidates:
if file_path_lower.endswith(candidate):
ext = candidate
break
cached_path = cache_fn(bytes(media_bytes), ext=ext)
event.media_urls = [cached_path]
event.media_types = [media_type_for_ext(ext)]
logger.info("[Telegram] Cached observed group %s at %s", label, cached_path)
except Exception as e:
logger.warning("[Telegram] Failed to cache observed group %s: %s", label, e, exc_info=True)
async def _build_observed_media_event(
self,
msg: Message,
msg_type: MessageType,
update_id: Optional[int] = None,
) -> MessageEvent:
"""Build and cache media for an observed, unmentioned group message."""
event = self._build_message_event(msg, msg_type, update_id=update_id)
if msg.caption:
event.text = self._clean_bot_trigger_text(msg.caption)
if msg.photo:
await self._cache_observed_simple_media(
event,
source=msg.photo[-1],
label="image",
cache_fn=cache_image_from_bytes,
default_ext=".jpg",
ext_candidates=(".png", ".webp", ".gif", ".jpeg", ".jpg"),
media_type_for_ext=lambda ext: f"image/{ext.lstrip('.')}",
)
elif msg.voice:
await self._cache_observed_simple_media(
event,
source=msg.voice,
label="voice message",
cache_fn=cache_audio_from_bytes,
default_ext=".ogg",
media_type_for_ext=lambda _ext: "audio/ogg",
)
elif msg.audio:
await self._cache_observed_simple_media(
event,
source=msg.audio,
label="audio file",
cache_fn=cache_audio_from_bytes,
default_ext=".mp3",
media_type_for_ext=lambda _ext: "audio/mp3",
)
elif msg.video:
await self._cache_observed_simple_media(
event,
source=msg.video,
label="video",
cache_fn=cache_video_from_bytes,
default_ext=".mp4",
ext_candidates=SUPPORTED_VIDEO_TYPES,
media_type_for_ext=lambda ext: SUPPORTED_VIDEO_TYPES.get(ext, "video/mp4"),
)
elif msg.document:
await self._cache_observed_document_event(msg, event)
return event
async def _cache_observed_document_event(self, msg: Message, event: MessageEvent) -> None:
doc = msg.document
try:
ext = ""
original_filename = doc.file_name or ""
if original_filename:
_, ext = os.path.splitext(original_filename)
ext = ext.lower()
doc_mime = (doc.mime_type or "").lower()
if not ext and doc_mime:
ext = _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, "")
if not ext:
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
ext = mime_to_ext.get(doc_mime, "")
max_doc_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024)
if not doc.file_size or doc.file_size > max_doc_bytes:
limit_mb = max_doc_bytes // (1024 * 1024)
event.text = self._append_media_status_text(
event.text,
(
"Observed Telegram document was too large or its size could not "
f"be verified. Maximum: {limit_mb} MB."
),
)
logger.info("[Telegram] Observed group document too large: %s bytes", doc.file_size)
return
if ext in _TELEGRAM_IMAGE_EXTENSIONS or doc_mime.startswith("image/"):
file_obj = await doc.get_file()
image_bytes = await file_obj.download_as_bytearray()
image_ext = (
ext
if ext in _TELEGRAM_IMAGE_EXTENSIONS
else _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, ".jpg")
)
try:
cached_path = cache_image_from_bytes(bytes(image_bytes), ext=image_ext)
except ValueError as e:
logger.warning("[Telegram] Failed to cache observed image document: %s", e, exc_info=True)
event.text = self._append_media_status_text(
event.text,
(
f"Image document '{original_filename or doc_mime or ext or 'unknown'}' "
"could not be read as an image."
),
)
return
event.message_type = MessageType.PHOTO
event.media_urls = [cached_path]
event.media_types = [
doc_mime
if doc_mime.startswith("image/")
else _TELEGRAM_IMAGE_EXT_TO_MIME.get(image_ext, "image/jpeg")
]
logger.info("[Telegram] Cached observed group image-document at %s", cached_path)
return
if not ext and doc.mime_type:
video_mime_to_ext = {v: k for k, v in SUPPORTED_VIDEO_TYPES.items()}
ext = video_mime_to_ext.get(doc.mime_type, "")
if not ext and doc.mime_type:
image_mime_to_ext: dict[str, str] = {}
for _ext, _mime in SUPPORTED_IMAGE_DOCUMENT_TYPES.items():
image_mime_to_ext.setdefault(_mime, _ext)
ext = image_mime_to_ext.get(doc.mime_type, "")
if ext in SUPPORTED_VIDEO_TYPES:
file_obj = await doc.get_file()
video_bytes = await file_obj.download_as_bytearray()
cached_path = cache_video_from_bytes(bytes(video_bytes), ext=ext)
event.media_urls = [cached_path]
event.media_types = [SUPPORTED_VIDEO_TYPES[ext]]
event.message_type = MessageType.VIDEO
logger.info("[Telegram] Cached observed group video document at %s", cached_path)
return
if ext not in SUPPORTED_DOCUMENT_TYPES:
supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys()))
event.text = self._append_media_status_text(
event.text,
f"Unsupported document type '{ext or 'unknown'}'. Supported types: {supported_list}",
)
logger.info("[Telegram] Unsupported observed group document type: %s", ext or "unknown")
return
file_obj = await doc.get_file()
doc_bytes = await file_obj.download_as_bytearray()
raw_bytes = bytes(doc_bytes)
cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}")
mime_type = SUPPORTED_DOCUMENT_TYPES[ext]
event.media_urls = [cached_path]
event.media_types = [mime_type]
logger.info("[Telegram] Cached observed group document at %s", cached_path)
max_text_inject_bytes = 100 * 1024
if ext in {".md", ".txt"} and len(raw_bytes) <= max_text_inject_bytes:
try:
text_content = raw_bytes.decode("utf-8")
display_name = original_filename or f"document{ext}"
display_name = re.sub(r'[^\w.\- ]', '_', display_name)
injection = f"[Content of {display_name}]:\n{text_content}"
event.text = f"{injection}\n\n{event.text}" if event.text else injection
except UnicodeDecodeError:
logger.warning(
"[Telegram] Could not decode observed text file as UTF-8, skipping content injection",
exc_info=True,
)
except Exception as e:
logger.warning("[Telegram] Failed to cache observed group document: %s", e, exc_info=True)
async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming media messages, downloading images to local cache."""
if not update.message:
return
if not self._should_process_message(update.message):
if self._should_observe_unmentioned_group_message(update.message):
msg_type = self._media_message_type(update.message)
event = await self._build_observed_media_event(
update.message,
msg_type,
update_id=update.update_id,
)
_m = update.message
_observe_type = self._media_message_type(_m)
_event = self._build_message_event(_m, _observe_type, update_id=update.update_id)
if _m.caption:
_event.text = self._clean_bot_trigger_text(_m.caption)
await self._cache_observed_media(_m, _event)
self._observe_unmentioned_group_message(
update.message,
event.message_type,
update_id=update.update_id,
event=event,
_m, _event.message_type, update_id=update.update_id, event=_event
)
return
msg = update.message
# Determine media type
msg_type = self._media_message_type(msg)
event = self._build_message_event(msg, msg_type, update_id=update.update_id)
# Add caption as text

View File

@ -155,3 +155,64 @@ class TestSupportedDocumentTypes:
)
def test_expected_extensions_present(self, ext):
assert ext in SUPPORTED_DOCUMENT_TYPES
# ---------------------------------------------------------------------------
# TestCacheMediaBytes — the unified, platform-agnostic caching primitive
# ---------------------------------------------------------------------------
# 1x1 transparent PNG (passes cache_image_from_bytes validation)
_PNG_1PX = bytes.fromhex(
"89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4"
"890000000d49444154789c6360000002000154a24f5f0000000049454e44ae426082"
)
class TestCacheMediaBytes:
    """Routing checks for cache_media_bytes, one per attachment kind."""

    @staticmethod
    def _cache(payload, **hints):
        """Import the helper lazily and run it on ``payload``."""
        from gateway.platforms.base import cache_media_bytes

        return cache_media_bytes(payload, **hints)

    def test_pdf_routes_to_document(self):
        cached = self._cache(b"%PDF-1.4 body", filename="report.pdf", mime_type="application/pdf")
        assert cached is not None
        assert cached.kind == "document"
        assert cached.media_type == "application/pdf"
        assert "report.pdf" in cached.display_name
        assert os.path.exists(cached.path)
        assert "report.pdf" in cached.context_note()

    def test_png_routes_to_image(self):
        cached = self._cache(_PNG_1PX, filename="photo.png", mime_type="image/png")
        assert cached is not None
        assert cached.kind == "image"
        assert cached.media_type == "image/png"
        assert os.path.exists(cached.path)

    def test_native_photo_without_filename_uses_default_kind(self):
        # No name and no MIME: only the caller's hint identifies an image.
        cached = self._cache(_PNG_1PX, filename="", mime_type="", default_kind="image")
        assert cached is not None
        assert cached.kind == "image"

    def test_mp4_routes_to_video(self):
        cached = self._cache(b"\x00\x00\x00\x18ftypmp42", filename="clip.mp4", mime_type="video/mp4")
        assert cached is not None
        assert cached.kind == "video"
        assert cached.media_type == "video/mp4"

    def test_mime_only_resolves_extension(self):
        # The extension has to be recovered from the MIME type alone.
        cached = self._cache(b"col1,col2\n1,2", filename="", mime_type="text/csv")
        assert cached is not None
        assert cached.kind == "document"
        assert cached.media_type == "text/csv"

    def test_unsupported_document_returns_none(self):
        cached = self._cache(b"MZ", filename="malware.exe", mime_type="application/x-msdownload")
        assert cached is None

    def test_invalid_image_returns_none(self):
        cached = self._cache(b"<html>not an image</html>", filename="x.png", mime_type="image/png")
        assert cached is None

View File

@ -897,134 +897,6 @@ def _group_voice_message(
)
def _group_photo_message(
*,
chat_id=-100,
from_user_id=111,
from_user_name="Alice Example",
caption="Veja esta foto",
file_size=1024,
):
file_obj = SimpleNamespace(
file_path="photos/observed.png",
download_as_bytearray=AsyncMock(return_value=bytearray(b"\x89PNG\r\n\x1a\n observed")),
)
photo = SimpleNamespace(
file_size=file_size,
get_file=AsyncMock(return_value=file_obj),
)
return SimpleNamespace(
message_id=52,
text=None,
caption=caption,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False),
from_user=SimpleNamespace(
id=from_user_id, full_name=from_user_name,
first_name=from_user_name.split()[0],
),
reply_to_message=None,
date=None,
location=None,
venue=None,
sticker=None,
photo=[photo],
video=None,
audio=None,
voice=None,
document=None,
)
def _group_video_message(
*,
chat_id=-100,
from_user_id=111,
from_user_name="Alice Example",
caption="Veja este video",
file_size=1024,
):
file_obj = SimpleNamespace(
file_path="videos/observed.mp4",
download_as_bytearray=AsyncMock(return_value=bytearray(b"observed video")),
)
video = SimpleNamespace(
file_size=file_size,
get_file=AsyncMock(return_value=file_obj),
)
return SimpleNamespace(
message_id=53,
text=None,
caption=caption,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False),
from_user=SimpleNamespace(
id=from_user_id, full_name=from_user_name,
first_name=from_user_name.split()[0],
),
reply_to_message=None,
date=None,
location=None,
venue=None,
sticker=None,
photo=None,
video=video,
audio=None,
voice=None,
document=None,
)
def _group_document_message(
*,
chat_id=-100,
from_user_id=111,
from_user_name="Alice Example",
caption="Este arquivo",
document=None,
):
file_obj = SimpleNamespace(
file_path="documents/RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf",
download_as_bytearray=AsyncMock(return_value=bytearray(b"%PDF observed bytes")),
)
document = document or SimpleNamespace(
file_name="RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf",
mime_type="application/pdf",
file_size=1024,
get_file=AsyncMock(return_value=file_obj),
)
return SimpleNamespace(
message_id=52,
text=None,
caption=caption,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False),
from_user=SimpleNamespace(
id=from_user_id, full_name=from_user_name,
first_name=from_user_name.split()[0],
),
reply_to_message=None,
date=None,
location=None,
venue=None,
sticker=None,
photo=None,
video=None,
audio=None,
voice=None,
document=document,
)
# ---------------------------------------------------------------------------
# Observe + attribution parity: location messages
# ---------------------------------------------------------------------------
@ -1111,190 +983,6 @@ def test_unmentioned_voice_message_observed_in_group():
asyncio.run(_run())
def test_unmentioned_photo_message_observed_with_cached_path(monkeypatch, tmp_path):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
store = _FakeSessionStore()
adapter._session_store = store
cached_path = tmp_path / "img_abc_observed.png"
monkeypatch.setattr(
"gateway.platforms.telegram.cache_image_from_bytes",
lambda _data, ext=".jpg": str(cached_path),
)
update = SimpleNamespace(
update_id=3003,
message=_group_photo_message(),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Veja esta foto" in message["content"]
assert "Observed Telegram image" in message["content"]
assert str(cached_path) in message["content"]
assert store.sources[0].user_id is None
asyncio.run(_run())
def test_unmentioned_video_too_large_observed_without_download(monkeypatch):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
adapter._max_doc_bytes = 100
store = _FakeSessionStore()
adapter._session_store = store
cache_video = Mock(return_value="/tmp/observed.mp4")
monkeypatch.setattr("gateway.platforms.telegram.cache_video_from_bytes", cache_video)
message_obj = _group_video_message(file_size=101)
update = SimpleNamespace(
update_id=3004,
message=message_obj,
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
cache_video.assert_not_called()
message_obj.video.get_file.assert_not_called()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Veja este video" in message["content"]
assert "Observed Telegram video was too large" in message["content"]
assert "/tmp/observed.mp4" not in message["content"]
asyncio.run(_run())
def test_unmentioned_document_message_observed_with_cached_path(monkeypatch, tmp_path):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
store = _FakeSessionStore()
adapter._session_store = store
cached_path = tmp_path / "doc_abc_RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf"
monkeypatch.setattr(
"gateway.platforms.telegram.cache_document_from_bytes",
lambda _data, _filename: str(cached_path),
)
update = SimpleNamespace(
update_id=3003,
message=_group_document_message(),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Este arquivo" in message["content"]
assert "RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf" in message["content"]
assert str(cached_path) in message["content"]
assert store.sources[0].user_id is None
asyncio.run(_run())
def test_unmentioned_large_document_observed_without_download(monkeypatch):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
adapter._max_doc_bytes = 100
store = _FakeSessionStore()
adapter._session_store = store
cache_document = Mock(return_value="/tmp/huge.pdf")
monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document)
document = SimpleNamespace(
file_name="huge.pdf",
mime_type="application/pdf",
file_size=101,
get_file=AsyncMock(),
)
update = SimpleNamespace(
update_id=3005,
message=_group_document_message(document=document),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
cache_document.assert_not_called()
document.get_file.assert_not_called()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Este arquivo" in message["content"]
assert "Observed Telegram document was too large" in message["content"]
assert "/tmp/huge.pdf" not in message["content"]
asyncio.run(_run())
def test_unmentioned_unsupported_document_observed_without_download(monkeypatch):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
store = _FakeSessionStore()
adapter._session_store = store
cache_document = Mock(return_value="/tmp/malware.exe")
monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document)
document = SimpleNamespace(
file_name="malware.exe",
mime_type="application/x-msdownload",
file_size=100,
get_file=AsyncMock(),
)
update = SimpleNamespace(
update_id=3006,
message=_group_document_message(document=document),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
cache_document.assert_not_called()
document.get_file.assert_not_called()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Este arquivo" in message["content"]
assert "Unsupported document type '.exe'" in message["content"]
assert "/tmp/malware.exe" not in message["content"]
asyncio.run(_run())
def test_triggered_voice_message_uses_shared_session_in_observe_mode():
async def _run():
adapter = _make_adapter(
@ -1317,3 +1005,160 @@ def test_triggered_voice_message_uses_shared_session_in_observe_mode():
assert "[Alice Example|111]" in event.text
asyncio.run(_run())
# ---------------------------------------------------------------------------
# Observed-media caching (unmentioned group attachments)
# ---------------------------------------------------------------------------
def _group_photo_message(*, chat_id=-100, caption="Veja esta foto", file_size=1024):
    """Build a fake group message from Alice carrying a single photo size."""
    downloaded = bytearray(b"\x89PNG\r\n\x1a\n observed")
    telegram_file = SimpleNamespace(
        file_path="photos/observed.png",
        download_as_bytearray=AsyncMock(return_value=downloaded),
    )
    photo_size = SimpleNamespace(
        file_size=file_size,
        get_file=AsyncMock(return_value=telegram_file),
    )
    group_chat = SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False)
    sender = SimpleNamespace(id=111, full_name="Alice Example", first_name="Alice")
    return SimpleNamespace(
        message_id=52,
        text=None,
        caption=caption,
        entities=[],
        caption_entities=[],
        message_thread_id=None,
        is_topic_message=False,
        chat=group_chat,
        from_user=sender,
        reply_to_message=None,
        date=None,
        location=None,
        venue=None,
        sticker=None,
        photo=[photo_size],
        video=None,
        audio=None,
        voice=None,
        document=None,
    )
def _group_document_message(*, chat_id=-100, caption="Este arquivo", document=None):
    """Build a fake group message from Alice carrying a document.

    A small PDF stand-in is attached unless ``document`` is supplied.
    """
    telegram_file = SimpleNamespace(
        file_path="documents/report.pdf",
        download_as_bytearray=AsyncMock(return_value=bytearray(b"%PDF observed bytes")),
    )
    if not document:
        document = SimpleNamespace(
            file_name="RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf",
            mime_type="application/pdf",
            file_size=1024,
            get_file=AsyncMock(return_value=telegram_file),
        )
    group_chat = SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False)
    sender = SimpleNamespace(id=111, full_name="Alice Example", first_name="Alice")
    return SimpleNamespace(
        message_id=53,
        text=None,
        caption=caption,
        entities=[],
        caption_entities=[],
        message_thread_id=None,
        is_topic_message=False,
        chat=group_chat,
        from_user=sender,
        reply_to_message=None,
        date=None,
        location=None,
        venue=None,
        sticker=None,
        photo=None,
        video=None,
        audio=None,
        voice=None,
        document=document,
    )
def test_unmentioned_photo_observed_with_cached_path(monkeypatch, tmp_path):
    """An unmentioned group photo is stored as observed context, including its cache path."""

    async def _scenario():
        expected_path = tmp_path / "img_abc_observed.png"

        def _fake_cache_image(_data, ext=".jpg"):
            # Replaces the real image cache helper with one returning a fixed path.
            return str(expected_path)

        adapter = _make_adapter(
            require_mention=True,
            allowed_chats=["-100"],
            group_allowed_chats=["-100"],
            observe_unmentioned_group_messages=True,
        )
        session_store = _FakeSessionStore()
        adapter._session_store = session_store
        monkeypatch.setattr(
            "gateway.platforms.base.cache_image_from_bytes",
            _fake_cache_image,
        )

        incoming = SimpleNamespace(
            update_id=3003,
            message=_group_photo_message(),
            effective_message=None,
        )
        await adapter._handle_media_message(incoming, SimpleNamespace())

        # The message must not be dispatched to the handler; it is only recorded.
        adapter._message_handler.assert_not_awaited()
        assert len(session_store.messages) == 1
        _, recorded, _ = session_store.messages[0]
        assert recorded["observed"] is True
        content = recorded["content"]
        assert "Veja esta foto" in content
        assert "image" in content
        assert str(expected_path) in content
        assert session_store.sources[0].user_id is None

    asyncio.run(_scenario())
def test_unmentioned_document_observed_with_cached_path(monkeypatch, tmp_path):
    """An unmentioned group document is stored as observed context, including its cache path."""

    async def _scenario():
        expected_path = tmp_path / "doc_abc_report.pdf"

        def _fake_cache_document(_data, _filename):
            # Replaces the real document cache helper with one returning a fixed path.
            return str(expected_path)

        adapter = _make_adapter(
            require_mention=True,
            allowed_chats=["-100"],
            group_allowed_chats=["-100"],
            observe_unmentioned_group_messages=True,
        )
        session_store = _FakeSessionStore()
        adapter._session_store = session_store
        monkeypatch.setattr(
            "gateway.platforms.base.cache_document_from_bytes",
            _fake_cache_document,
        )

        incoming = SimpleNamespace(
            update_id=3004,
            message=_group_document_message(),
            effective_message=None,
        )
        await adapter._handle_media_message(incoming, SimpleNamespace())

        # The message must not be dispatched to the handler; it is only recorded.
        adapter._message_handler.assert_not_awaited()
        assert len(session_store.messages) == 1
        _, recorded, _ = session_store.messages[0]
        assert recorded["observed"] is True
        content = recorded["content"]
        assert "Este arquivo" in content
        assert str(expected_path) in content

    asyncio.run(_scenario())
def test_unmentioned_large_document_observed_without_download(monkeypatch):
    """An oversized unmentioned document is noted as too large without download or caching."""

    async def _run():
        adapter = _make_adapter(
            require_mention=True,
            allowed_chats=["-100"],
            group_allowed_chats=["-100"],
            observe_unmentioned_group_messages=True,
        )
        # Limit sits one byte below the document's advertised file_size (101).
        adapter._max_doc_bytes = 100
        store = _FakeSessionStore()
        adapter._session_store = store

        cache_doc = Mock(return_value="/tmp/huge.pdf")
        monkeypatch.setattr("gateway.platforms.base.cache_document_from_bytes", cache_doc)

        document = SimpleNamespace(
            file_name="huge.pdf",
            mime_type="application/pdf",
            file_size=101,
            get_file=AsyncMock(),
        )
        update = SimpleNamespace(
            update_id=3005,
            message=_group_document_message(document=document),
            effective_message=None,
        )
        await adapter._handle_media_message(update, SimpleNamespace())

        # Same observed-context checks as the sibling observed-media tests:
        # nothing is dispatched and exactly one observed record is stored.
        adapter._message_handler.assert_not_awaited()
        cache_doc.assert_not_called()
        document.get_file.assert_not_called()
        # Checked before indexing so a missing record fails as an assertion,
        # not as an IndexError.
        assert len(store.messages) == 1
        _, message, _ = store.messages[0]
        assert message["observed"] is True
        assert "too large" in message["content"]
        assert "/tmp/huge.pdf" not in message["content"]

    asyncio.run(_run())
def test_unmentioned_unsupported_document_observed_without_caching(monkeypatch):
    """An unmentioned document of an unsupported type is noted as such and never cached."""

    async def _run():
        adapter = _make_adapter(
            require_mention=True,
            allowed_chats=["-100"],
            group_allowed_chats=["-100"],
            observe_unmentioned_group_messages=True,
        )
        store = _FakeSessionStore()
        adapter._session_store = store

        cache_doc = Mock(return_value="/tmp/malware.exe")
        monkeypatch.setattr("gateway.platforms.base.cache_document_from_bytes", cache_doc)

        file_obj = SimpleNamespace(
            file_path="documents/malware.exe",
            download_as_bytearray=AsyncMock(return_value=bytearray(b"MZ")),
        )
        document = SimpleNamespace(
            file_name="malware.exe",
            mime_type="application/x-msdownload",
            file_size=2,
            get_file=AsyncMock(return_value=file_obj),
        )
        update = SimpleNamespace(
            update_id=3006,
            message=_group_document_message(document=document),
            effective_message=None,
        )
        await adapter._handle_media_message(update, SimpleNamespace())

        # Same observed-context checks as the sibling observed-media tests:
        # nothing is dispatched and exactly one observed record is stored.
        adapter._message_handler.assert_not_awaited()
        cache_doc.assert_not_called()
        # Checked before indexing so a missing record fails as an assertion,
        # not as an IndexError.
        assert len(store.messages) == 1
        _, message, _ = store.messages[0]
        assert message["observed"] is True
        assert "unsupported" in message["content"].lower()
        # The path the cache helper would have returned must not be advertised.
        assert "/tmp/malware.exe" not in message["content"]

    asyncio.run(_run())