fix(cron): deliver MEDIA files as native platform attachments
The cron delivery path sent raw 'MEDIA:/path/to/file' text instead of uploading the file as a native attachment. The standalone path (via _send_to_platform) already extracted MEDIA tags and forwarded them as media_files, but the live adapter path passed the unprocessed delivery_content directly to adapter.send(). Two bugs fixed: 1. Live adapter path now sends cleaned text (MEDIA tags stripped) instead of raw content — prevents 'MEDIA:/path' from appearing as literal text in Discord/Telegram/etc. 2. Live adapter path now sends each extracted media file via the adapter's native method (send_voice for audio, send_image_file for images, send_video for video, send_document as fallback) — files are uploaded as proper platform attachments. The file-type routing mirrors BasePlatformAdapter._process_message_background to ensure consistent behavior between normal gateway responses and cron-delivered responses. Adds 2 tests: - test_live_adapter_sends_media_as_attachments: verifies Discord adapter receives send_voice call for .mp3 file - test_live_adapter_sends_cleaned_text_not_raw: verifies MEDIA tag stripped from text sent via live adapter
This commit is contained in:
@ -158,6 +158,44 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
||||
}
|
||||
|
||||
|
||||
# Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background
|
||||
_AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'})
|
||||
_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'})
|
||||
_IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'})
|
||||
|
||||
|
||||
def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: dict | None, loop, job: dict) -> None:
|
||||
"""Send extracted MEDIA files as native platform attachments via a live adapter.
|
||||
|
||||
Routes each file to the appropriate adapter method (send_voice, send_image_file,
|
||||
send_video, send_document) based on file extension — mirroring the routing logic
|
||||
in ``BasePlatformAdapter._process_message_background``.
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
for media_path, _is_voice in media_files:
|
||||
try:
|
||||
ext = Path(media_path).suffix.lower()
|
||||
if ext in _AUDIO_EXTS:
|
||||
coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
|
||||
elif ext in _VIDEO_EXTS:
|
||||
coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata)
|
||||
elif ext in _IMAGE_EXTS:
|
||||
coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata)
|
||||
else:
|
||||
coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)
|
||||
|
||||
future = asyncio.run_coroutine_threadsafe(coro, loop)
|
||||
result = future.result(timeout=30)
|
||||
if result and not getattr(result, "success", True):
|
||||
logger.warning(
|
||||
"Job '%s': media send failed for %s: %s",
|
||||
job.get("id", "?"), media_path, getattr(result, "error", "unknown"),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
|
||||
|
||||
|
||||
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
|
||||
"""
|
||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||
@ -246,18 +284,28 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
send_metadata = {"thread_id": thread_id} if thread_id else None
|
||||
try:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata),
|
||||
loop,
|
||||
)
|
||||
send_result = future.result(timeout=60)
|
||||
if send_result and not getattr(send_result, "success", True):
|
||||
err = getattr(send_result, "error", "unknown")
|
||||
logger.warning(
|
||||
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
|
||||
job["id"], platform_name, chat_id, err,
|
||||
# Send cleaned text (MEDIA tags stripped) — not the raw content
|
||||
text_to_send = cleaned_delivery_content.strip()
|
||||
adapter_ok = True
|
||||
if text_to_send:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
|
||||
loop,
|
||||
)
|
||||
else:
|
||||
send_result = future.result(timeout=60)
|
||||
if send_result and not getattr(send_result, "success", True):
|
||||
err = getattr(send_result, "error", "unknown")
|
||||
logger.warning(
|
||||
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
|
||||
job["id"], platform_name, chat_id, err,
|
||||
)
|
||||
adapter_ok = False # fall through to standalone path
|
||||
|
||||
# Send extracted media files as native attachments via the live adapter
|
||||
if adapter_ok and media_files:
|
||||
_send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job)
|
||||
|
||||
if adapter_ok:
|
||||
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
|
||||
return
|
||||
except Exception as e:
|
||||
|
||||
@ -277,6 +277,188 @@ class TestDeliverResultWrapping:
|
||||
# Media files should be forwarded separately
|
||||
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
|
||||
|
||||
def test_live_adapter_sends_media_as_attachments(self):
|
||||
"""When a live adapter is available, MEDIA files should be sent as native
|
||||
platform attachments (e.g., Discord voice, Telegram audio) rather than
|
||||
as literal 'MEDIA:/path' text."""
|
||||
from gateway.config import Platform
|
||||
from concurrent.futures import Future
|
||||
|
||||
adapter = AsyncMock()
|
||||
adapter.send.return_value = MagicMock(success=True)
|
||||
adapter.send_voice.return_value = MagicMock(success=True)
|
||||
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.DISCORD: pconfig}
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
# run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg)
|
||||
def fake_run_coro(coro, _loop):
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
return future
|
||||
|
||||
job = {
|
||||
"id": "tts-job",
|
||||
"deliver": "origin",
|
||||
"origin": {"platform": "discord", "chat_id": "9876"},
|
||||
}
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
_deliver_result(
|
||||
job,
|
||||
"Here is TTS\nMEDIA:/tmp/cron-voice.mp3",
|
||||
adapters={Platform.DISCORD: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
# Text should be sent without the MEDIA tag
|
||||
adapter.send.assert_called_once()
|
||||
text_sent = adapter.send.call_args[0][1]
|
||||
assert "MEDIA:" not in text_sent
|
||||
assert "Here is TTS" in text_sent
|
||||
|
||||
# Audio file should be sent as a voice attachment
|
||||
adapter.send_voice.assert_called_once()
|
||||
voice_call = adapter.send_voice.call_args
|
||||
assert voice_call[1]["audio_path"] == "/tmp/cron-voice.mp3"
|
||||
|
||||
def test_live_adapter_routes_image_to_send_image_file(self):
|
||||
"""Image MEDIA files should be routed to send_image_file, not send_voice."""
|
||||
from gateway.config import Platform
|
||||
from concurrent.futures import Future
|
||||
|
||||
adapter = AsyncMock()
|
||||
adapter.send.return_value = MagicMock(success=True)
|
||||
adapter.send_image_file.return_value = MagicMock(success=True)
|
||||
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.DISCORD: pconfig}
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
return future
|
||||
|
||||
job = {
|
||||
"id": "img-job",
|
||||
"deliver": "origin",
|
||||
"origin": {"platform": "discord", "chat_id": "1234"},
|
||||
}
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
_deliver_result(
|
||||
job,
|
||||
"Chart attached\nMEDIA:/tmp/chart.png",
|
||||
adapters={Platform.DISCORD: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
adapter.send_image_file.assert_called_once()
|
||||
assert adapter.send_image_file.call_args[1]["image_path"] == "/tmp/chart.png"
|
||||
adapter.send_voice.assert_not_called()
|
||||
|
||||
def test_live_adapter_media_only_no_text(self):
|
||||
"""When content is ONLY a MEDIA tag with no text, media should still be sent."""
|
||||
from gateway.config import Platform
|
||||
from concurrent.futures import Future
|
||||
|
||||
adapter = AsyncMock()
|
||||
adapter.send_voice.return_value = MagicMock(success=True)
|
||||
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
return future
|
||||
|
||||
job = {
|
||||
"id": "voice-only",
|
||||
"deliver": "origin",
|
||||
"origin": {"platform": "telegram", "chat_id": "999"},
|
||||
}
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
_deliver_result(
|
||||
job,
|
||||
"MEDIA:/tmp/voice.ogg",
|
||||
adapters={Platform.TELEGRAM: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
# Text send should NOT be called (no text after stripping MEDIA tag)
|
||||
adapter.send.assert_not_called()
|
||||
# Audio should still be delivered
|
||||
adapter.send_voice.assert_called_once()
|
||||
|
||||
def test_live_adapter_sends_cleaned_text_not_raw(self):
|
||||
"""The live adapter path must send cleaned text (MEDIA tags stripped),
|
||||
not the raw delivery_content with embedded MEDIA: tags."""
|
||||
from gateway.config import Platform
|
||||
from concurrent.futures import Future
|
||||
|
||||
adapter = AsyncMock()
|
||||
adapter.send.return_value = MagicMock(success=True)
|
||||
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
return future
|
||||
|
||||
job = {
|
||||
"id": "img-job",
|
||||
"deliver": "origin",
|
||||
"origin": {"platform": "telegram", "chat_id": "555"},
|
||||
}
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
_deliver_result(
|
||||
job,
|
||||
"Report\nMEDIA:/tmp/chart.png",
|
||||
adapters={Platform.TELEGRAM: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
text_sent = adapter.send.call_args[0][1]
|
||||
assert "MEDIA:" not in text_sent
|
||||
assert "Report" in text_sent
|
||||
|
||||
def test_no_mirror_to_session_call(self):
|
||||
"""Cron deliveries should NOT mirror into the gateway session."""
|
||||
from gateway.config import Platform
|
||||
|
||||
Reference in New Issue
Block a user