fix(telegram): cache observed group media

This commit is contained in:
Glucksberg
2026-06-01 14:33:36 -04:00
committed by Teknium
parent 34468ed0d4
commit f768e75ecf
2 changed files with 628 additions and 33 deletions

View File

@ -16,7 +16,7 @@ import tempfile
import html as _html
import re
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Any
from typing import Any, Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
@ -4877,7 +4877,58 @@ class TelegramAdapter(BasePlatformAdapter):
def _telegram_group_observe_attributed_text(self, event: MessageEvent) -> str:
user_id = event.source.user_id or "unknown"
sender = event.source.user_name or user_id
return f"[{sender}|{user_id}]\n{event.text or ''}"
parts: list[str] = []
if event.text:
parts.append(event.text)
media_note = self._telegram_group_observe_media_note(event)
if media_note:
parts.append(media_note)
body = "\n\n".join(parts)
return f"[{sender}|{user_id}]\n{body}"
@staticmethod
def _append_media_status_text(existing: str, note: str) -> str:
if not note:
return existing
if not existing:
return note
return f"{existing}\n\n{note}"
def _telegram_group_observe_media_note(self, event: MessageEvent) -> str:
"""Describe cached observed media in transcript context."""
media_urls = list(getattr(event, "media_urls", None) or [])
if not media_urls:
return ""
try:
from tools.credential_files import to_agent_visible_cache_path
except Exception:
to_agent_visible_cache_path = lambda path: path # type: ignore[assignment]
lines: list[str] = []
media_types = list(getattr(event, "media_types", None) or [])
for index, path in enumerate(media_urls):
mtype = media_types[index] if index < len(media_types) else ""
basename = os.path.basename(path)
parts = basename.split("_", 2)
display_name = parts[2] if len(parts) >= 3 else basename
display_name = re.sub(r'[^\w.\- ]', '_', display_name)
agent_path = to_agent_visible_cache_path(path)
if mtype.startswith("image/") or event.message_type == MessageType.PHOTO:
label = "image"
elif event.message_type == MessageType.VOICE:
label = "voice message"
elif event.message_type == MessageType.AUDIO or mtype.startswith("audio/"):
label = "audio file"
elif mtype.startswith("video/") or event.message_type == MessageType.VIDEO:
label = "video"
else:
label = "document"
lines.append(
f"[Observed Telegram {label}: '{display_name}' saved at: {agent_path}]"
)
return "\n".join(lines)
def _telegram_group_observe_channel_prompt(self) -> str:
username = getattr(getattr(self, "_bot", None), "username", None) or "unknown"
@ -4918,13 +4969,19 @@ class TelegramAdapter(BasePlatformAdapter):
channel_prompt=channel_prompt,
)
def _observe_unmentioned_group_message(self, message: Message, msg_type: MessageType, update_id: Optional[int] = None) -> None:
def _observe_unmentioned_group_message(
self,
message: Message,
msg_type: MessageType,
update_id: Optional[int] = None,
event: Optional[MessageEvent] = None,
) -> None:
"""Append skipped group chatter to the target session without dispatching."""
store = getattr(self, "_session_store", None)
if not store:
return
try:
event = self._build_message_event(message, msg_type, update_id=update_id)
event = event or self._build_message_event(message, msg_type, update_id=update_id)
shared_source = self._telegram_group_observe_shared_source(event.source)
session_entry = store.get_or_create_session(shared_source)
entry = {
@ -5278,45 +5335,271 @@ class TelegramAdapter(BasePlatformAdapter):
self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key))
def _media_message_type(self, msg: Message) -> MessageType:
if msg.sticker:
return MessageType.STICKER
if msg.photo:
return MessageType.PHOTO
if msg.video:
return MessageType.VIDEO
if msg.audio:
return MessageType.AUDIO
if msg.voice:
return MessageType.VOICE
return MessageType.DOCUMENT
def _observed_media_exceeds_cache_limit(self, event: MessageEvent, label: str, file_size: Any) -> bool:
max_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024)
try:
size = int(file_size or 0)
except (TypeError, ValueError):
size = 0
if 0 < size <= max_bytes:
return False
limit_mb = max_bytes // (1024 * 1024)
event.text = self._append_media_status_text(
event.text,
(
f"Observed Telegram {label} was too large or its size could not be "
f"verified. Maximum: {limit_mb} MB."
),
)
logger.info(
"[Telegram] Observed group %s too large or unknown size: %s bytes",
label,
file_size,
)
return True
async def _cache_observed_simple_media(
self,
event: MessageEvent,
*,
source: Any,
label: str,
cache_fn: Callable[..., str],
default_ext: str,
media_type_for_ext: Callable[[str], str],
ext_candidates: Optional[Any] = None,
) -> None:
try:
if self._observed_media_exceeds_cache_limit(event, label, getattr(source, "file_size", None)):
return
file_obj = await source.get_file()
media_bytes = await file_obj.download_as_bytearray()
ext = default_ext
file_path = getattr(file_obj, "file_path", None)
if file_path and ext_candidates:
file_path_lower = file_path.lower()
for candidate in ext_candidates:
if file_path_lower.endswith(candidate):
ext = candidate
break
cached_path = cache_fn(bytes(media_bytes), ext=ext)
event.media_urls = [cached_path]
event.media_types = [media_type_for_ext(ext)]
logger.info("[Telegram] Cached observed group %s at %s", label, cached_path)
except Exception as e:
logger.warning("[Telegram] Failed to cache observed group %s: %s", label, e, exc_info=True)
async def _build_observed_media_event(
self,
msg: Message,
msg_type: MessageType,
update_id: Optional[int] = None,
) -> MessageEvent:
"""Build and cache media for an observed, unmentioned group message."""
event = self._build_message_event(msg, msg_type, update_id=update_id)
if msg.caption:
event.text = self._clean_bot_trigger_text(msg.caption)
if msg.photo:
await self._cache_observed_simple_media(
event,
source=msg.photo[-1],
label="image",
cache_fn=cache_image_from_bytes,
default_ext=".jpg",
ext_candidates=(".png", ".webp", ".gif", ".jpeg", ".jpg"),
media_type_for_ext=lambda ext: f"image/{ext.lstrip('.')}",
)
elif msg.voice:
await self._cache_observed_simple_media(
event,
source=msg.voice,
label="voice message",
cache_fn=cache_audio_from_bytes,
default_ext=".ogg",
media_type_for_ext=lambda _ext: "audio/ogg",
)
elif msg.audio:
await self._cache_observed_simple_media(
event,
source=msg.audio,
label="audio file",
cache_fn=cache_audio_from_bytes,
default_ext=".mp3",
media_type_for_ext=lambda _ext: "audio/mp3",
)
elif msg.video:
await self._cache_observed_simple_media(
event,
source=msg.video,
label="video",
cache_fn=cache_video_from_bytes,
default_ext=".mp4",
ext_candidates=SUPPORTED_VIDEO_TYPES,
media_type_for_ext=lambda ext: SUPPORTED_VIDEO_TYPES.get(ext, "video/mp4"),
)
elif msg.document:
await self._cache_observed_document_event(msg, event)
return event
async def _cache_observed_document_event(self, msg: Message, event: MessageEvent) -> None:
doc = msg.document
try:
ext = ""
original_filename = doc.file_name or ""
if original_filename:
_, ext = os.path.splitext(original_filename)
ext = ext.lower()
doc_mime = (doc.mime_type or "").lower()
if not ext and doc_mime:
ext = _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, "")
if not ext:
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
ext = mime_to_ext.get(doc_mime, "")
max_doc_bytes = getattr(self, "_max_doc_bytes", 20 * 1024 * 1024)
if not doc.file_size or doc.file_size > max_doc_bytes:
limit_mb = max_doc_bytes // (1024 * 1024)
event.text = self._append_media_status_text(
event.text,
(
"Observed Telegram document was too large or its size could not "
f"be verified. Maximum: {limit_mb} MB."
),
)
logger.info("[Telegram] Observed group document too large: %s bytes", doc.file_size)
return
if ext in _TELEGRAM_IMAGE_EXTENSIONS or doc_mime.startswith("image/"):
file_obj = await doc.get_file()
image_bytes = await file_obj.download_as_bytearray()
image_ext = (
ext
if ext in _TELEGRAM_IMAGE_EXTENSIONS
else _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, ".jpg")
)
try:
cached_path = cache_image_from_bytes(bytes(image_bytes), ext=image_ext)
except ValueError as e:
logger.warning("[Telegram] Failed to cache observed image document: %s", e, exc_info=True)
event.text = self._append_media_status_text(
event.text,
(
f"Image document '{original_filename or doc_mime or ext or 'unknown'}' "
"could not be read as an image."
),
)
return
event.message_type = MessageType.PHOTO
event.media_urls = [cached_path]
event.media_types = [
doc_mime
if doc_mime.startswith("image/")
else _TELEGRAM_IMAGE_EXT_TO_MIME.get(image_ext, "image/jpeg")
]
logger.info("[Telegram] Cached observed group image-document at %s", cached_path)
return
if not ext and doc.mime_type:
video_mime_to_ext = {v: k for k, v in SUPPORTED_VIDEO_TYPES.items()}
ext = video_mime_to_ext.get(doc.mime_type, "")
if not ext and doc.mime_type:
image_mime_to_ext: dict[str, str] = {}
for _ext, _mime in SUPPORTED_IMAGE_DOCUMENT_TYPES.items():
image_mime_to_ext.setdefault(_mime, _ext)
ext = image_mime_to_ext.get(doc.mime_type, "")
if ext in SUPPORTED_VIDEO_TYPES:
file_obj = await doc.get_file()
video_bytes = await file_obj.download_as_bytearray()
cached_path = cache_video_from_bytes(bytes(video_bytes), ext=ext)
event.media_urls = [cached_path]
event.media_types = [SUPPORTED_VIDEO_TYPES[ext]]
event.message_type = MessageType.VIDEO
logger.info("[Telegram] Cached observed group video document at %s", cached_path)
return
if ext not in SUPPORTED_DOCUMENT_TYPES:
supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys()))
event.text = self._append_media_status_text(
event.text,
f"Unsupported document type '{ext or 'unknown'}'. Supported types: {supported_list}",
)
logger.info("[Telegram] Unsupported observed group document type: %s", ext or "unknown")
return
file_obj = await doc.get_file()
doc_bytes = await file_obj.download_as_bytearray()
raw_bytes = bytes(doc_bytes)
cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}")
mime_type = SUPPORTED_DOCUMENT_TYPES[ext]
event.media_urls = [cached_path]
event.media_types = [mime_type]
logger.info("[Telegram] Cached observed group document at %s", cached_path)
max_text_inject_bytes = 100 * 1024
if ext in {".md", ".txt"} and len(raw_bytes) <= max_text_inject_bytes:
try:
text_content = raw_bytes.decode("utf-8")
display_name = original_filename or f"document{ext}"
display_name = re.sub(r'[^\w.\- ]', '_', display_name)
injection = f"[Content of {display_name}]:\n{text_content}"
event.text = f"{injection}\n\n{event.text}" if event.text else injection
except UnicodeDecodeError:
logger.warning(
"[Telegram] Could not decode observed text file as UTF-8, skipping content injection",
exc_info=True,
)
except Exception as e:
logger.warning("[Telegram] Failed to cache observed group document: %s", e, exc_info=True)
async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming media messages, downloading images to local cache."""
if not update.message:
return
if not self._should_process_message(update.message):
if self._should_observe_unmentioned_group_message(update.message):
_m = update.message
if _m.sticker:
_observe_type = MessageType.STICKER
elif _m.photo:
_observe_type = MessageType.PHOTO
elif _m.video:
_observe_type = MessageType.VIDEO
elif _m.audio:
_observe_type = MessageType.AUDIO
elif _m.voice:
_observe_type = MessageType.VOICE
else:
_observe_type = MessageType.DOCUMENT
self._observe_unmentioned_group_message(_m, _observe_type, update_id=update.update_id)
msg_type = self._media_message_type(update.message)
event = await self._build_observed_media_event(
update.message,
msg_type,
update_id=update.update_id,
)
self._observe_unmentioned_group_message(
update.message,
event.message_type,
update_id=update.update_id,
event=event,
)
return
msg = update.message
# Determine media type
if msg.sticker:
msg_type = MessageType.STICKER
elif msg.photo:
msg_type = MessageType.PHOTO
elif msg.video:
msg_type = MessageType.VIDEO
elif msg.audio:
msg_type = MessageType.AUDIO
elif msg.voice:
msg_type = MessageType.VOICE
elif msg.document:
msg_type = MessageType.DOCUMENT
else:
msg_type = MessageType.DOCUMENT
msg_type = self._media_message_type(msg)
event = self._build_message_event(msg, msg_type, update_id=update.update_id)

View File

@ -1,7 +1,7 @@
import asyncio
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, Mock
from gateway.config import Platform, PlatformConfig, load_gateway_config
from gateway.platforms.base import MessageType
@ -897,6 +897,134 @@ def _group_voice_message(
)
def _group_photo_message(
*,
chat_id=-100,
from_user_id=111,
from_user_name="Alice Example",
caption="Veja esta foto",
file_size=1024,
):
file_obj = SimpleNamespace(
file_path="photos/observed.png",
download_as_bytearray=AsyncMock(return_value=bytearray(b"\x89PNG\r\n\x1a\n observed")),
)
photo = SimpleNamespace(
file_size=file_size,
get_file=AsyncMock(return_value=file_obj),
)
return SimpleNamespace(
message_id=52,
text=None,
caption=caption,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False),
from_user=SimpleNamespace(
id=from_user_id, full_name=from_user_name,
first_name=from_user_name.split()[0],
),
reply_to_message=None,
date=None,
location=None,
venue=None,
sticker=None,
photo=[photo],
video=None,
audio=None,
voice=None,
document=None,
)
def _group_video_message(
*,
chat_id=-100,
from_user_id=111,
from_user_name="Alice Example",
caption="Veja este video",
file_size=1024,
):
file_obj = SimpleNamespace(
file_path="videos/observed.mp4",
download_as_bytearray=AsyncMock(return_value=bytearray(b"observed video")),
)
video = SimpleNamespace(
file_size=file_size,
get_file=AsyncMock(return_value=file_obj),
)
return SimpleNamespace(
message_id=53,
text=None,
caption=caption,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False),
from_user=SimpleNamespace(
id=from_user_id, full_name=from_user_name,
first_name=from_user_name.split()[0],
),
reply_to_message=None,
date=None,
location=None,
venue=None,
sticker=None,
photo=None,
video=video,
audio=None,
voice=None,
document=None,
)
def _group_document_message(
*,
chat_id=-100,
from_user_id=111,
from_user_name="Alice Example",
caption="Este arquivo",
document=None,
):
file_obj = SimpleNamespace(
file_path="documents/RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf",
download_as_bytearray=AsyncMock(return_value=bytearray(b"%PDF observed bytes")),
)
document = document or SimpleNamespace(
file_name="RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf",
mime_type="application/pdf",
file_size=1024,
get_file=AsyncMock(return_value=file_obj),
)
return SimpleNamespace(
message_id=52,
text=None,
caption=caption,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type="group", title="Test Group", is_forum=False),
from_user=SimpleNamespace(
id=from_user_id, full_name=from_user_name,
first_name=from_user_name.split()[0],
),
reply_to_message=None,
date=None,
location=None,
venue=None,
sticker=None,
photo=None,
video=None,
audio=None,
voice=None,
document=document,
)
# ---------------------------------------------------------------------------
# Observe + attribution parity: location messages
# ---------------------------------------------------------------------------
@ -983,6 +1111,190 @@ def test_unmentioned_voice_message_observed_in_group():
asyncio.run(_run())
def test_unmentioned_photo_message_observed_with_cached_path(monkeypatch, tmp_path):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
store = _FakeSessionStore()
adapter._session_store = store
cached_path = tmp_path / "img_abc_observed.png"
monkeypatch.setattr(
"gateway.platforms.telegram.cache_image_from_bytes",
lambda _data, ext=".jpg": str(cached_path),
)
update = SimpleNamespace(
update_id=3003,
message=_group_photo_message(),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Veja esta foto" in message["content"]
assert "Observed Telegram image" in message["content"]
assert str(cached_path) in message["content"]
assert store.sources[0].user_id is None
asyncio.run(_run())
def test_unmentioned_video_too_large_observed_without_download(monkeypatch):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
adapter._max_doc_bytes = 100
store = _FakeSessionStore()
adapter._session_store = store
cache_video = Mock(return_value="/tmp/observed.mp4")
monkeypatch.setattr("gateway.platforms.telegram.cache_video_from_bytes", cache_video)
message_obj = _group_video_message(file_size=101)
update = SimpleNamespace(
update_id=3004,
message=message_obj,
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
cache_video.assert_not_called()
message_obj.video.get_file.assert_not_called()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Veja este video" in message["content"]
assert "Observed Telegram video was too large" in message["content"]
assert "/tmp/observed.mp4" not in message["content"]
asyncio.run(_run())
def test_unmentioned_document_message_observed_with_cached_path(monkeypatch, tmp_path):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
store = _FakeSessionStore()
adapter._session_store = store
cached_path = tmp_path / "doc_abc_RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf"
monkeypatch.setattr(
"gateway.platforms.telegram.cache_document_from_bytes",
lambda _data, _filename: str(cached_path),
)
update = SimpleNamespace(
update_id=3003,
message=_group_document_message(),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Este arquivo" in message["content"]
assert "RESULTADO BIOLOGICO - PROTOCOLO 103- URBAN.pdf" in message["content"]
assert str(cached_path) in message["content"]
assert store.sources[0].user_id is None
asyncio.run(_run())
def test_unmentioned_large_document_observed_without_download(monkeypatch):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
adapter._max_doc_bytes = 100
store = _FakeSessionStore()
adapter._session_store = store
cache_document = Mock(return_value="/tmp/huge.pdf")
monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document)
document = SimpleNamespace(
file_name="huge.pdf",
mime_type="application/pdf",
file_size=101,
get_file=AsyncMock(),
)
update = SimpleNamespace(
update_id=3005,
message=_group_document_message(document=document),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
cache_document.assert_not_called()
document.get_file.assert_not_called()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Este arquivo" in message["content"]
assert "Observed Telegram document was too large" in message["content"]
assert "/tmp/huge.pdf" not in message["content"]
asyncio.run(_run())
def test_unmentioned_unsupported_document_observed_without_download(monkeypatch):
async def _run():
adapter = _make_adapter(
require_mention=True,
allowed_chats=["-100"],
group_allowed_chats=["-100"],
observe_unmentioned_group_messages=True,
)
store = _FakeSessionStore()
adapter._session_store = store
cache_document = Mock(return_value="/tmp/malware.exe")
monkeypatch.setattr("gateway.platforms.telegram.cache_document_from_bytes", cache_document)
document = SimpleNamespace(
file_name="malware.exe",
mime_type="application/x-msdownload",
file_size=100,
get_file=AsyncMock(),
)
update = SimpleNamespace(
update_id=3006,
message=_group_document_message(document=document),
effective_message=None,
)
await adapter._handle_media_message(update, SimpleNamespace())
adapter._message_handler.assert_not_awaited()
cache_document.assert_not_called()
document.get_file.assert_not_called()
assert len(store.messages) == 1
_, message, _ = store.messages[0]
assert message["observed"] is True
assert "Este arquivo" in message["content"]
assert "Unsupported document type '.exe'" in message["content"]
assert "/tmp/malware.exe" not in message["content"]
asyncio.run(_run())
def test_triggered_voice_message_uses_shared_session_in_observe_mode():
async def _run():
adapter = _make_adapter(