diff --git a/gateway/config.py b/gateway/config.py index 62818b076..a1b61fed5 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -361,10 +361,17 @@ class StreamingConfig: # fall back to edit-based when not. # "draft" — explicitly request native drafts; falls back to edit when # the platform/chat doesn't support them. - # "edit" — progressive editMessageText only (legacy/default - # behaviour). + # "edit" — progressive editMessageText only (legacy behaviour). # "off" — disable streaming entirely. - transport: str = "edit" + # + # Default is "auto": prefer native draft streaming on platforms that + # support it (Telegram DMs via sendMessageDraft, Bot API 9.5+) and fall + # back to edit-based streaming everywhere else. This is safe as a global + # default because adapters without draft support (Discord, Slack, Matrix, + # …) report supports_draft_streaming() == False and transparently use the + # edit path — so "auto" never regresses non-Telegram platforms, it only + # upgrades the chats that can render the smoother native preview. 
+ transport: str = "auto" edit_interval: float = DEFAULT_STREAMING_EDIT_INTERVAL buffer_threshold: int = DEFAULT_STREAMING_BUFFER_THRESHOLD cursor: str = DEFAULT_STREAMING_CURSOR @@ -393,7 +400,7 @@ class StreamingConfig: return cls() return cls( enabled=_coerce_bool(data.get("enabled"), False), - transport=data.get("transport", "edit"), + transport=data.get("transport", "auto"), edit_interval=_coerce_float( data.get("edit_interval"), DEFAULT_STREAMING_EDIT_INTERVAL, ), diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index f868dbf28..89806a739 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1916,6 +1916,84 @@ class BasePlatformAdapter(ABC): f"{type(self).__name__} does not implement send_draft" ) + # ── Structured stream-event rendering ──────────────────────────────── + # + # These methods let an adapter decide *how* to present each structured + # streaming event (see gateway/stream_events.py). The default + # implementations reproduce the historical behavior exactly: assistant + # text/commentary/segment events delegate to the stream consumer, and + # tool events render the same "emoji tool_name: preview" chrome the + # gateway has always produced. Adapters override these to be more native + # to their platform (e.g. Telegram streaming a MarkdownV2 ```bash``` block + # as a draft; iMessage eating tool chrome it cannot format). + # + # The contract is presentation-only: nothing rendered here is persisted to + # conversation history. History is owned by the agent; what an adapter + # chooses to "eat" must never change the bytes the agent stored. + + def render_message_event(self, event: Any, sink: Any) -> None: + """Render a MessageChunk / MessageStop / Commentary onto the sink. + + Default: map onto the stream consumer's existing primitives, preserving + today's behavior 1:1. ``sink`` is a GatewayStreamConsumer. 
+ """ + from gateway.stream_events import MessageChunk, MessageStop, Commentary + + if isinstance(event, MessageChunk): + if event.text: + sink.on_delta(event.text) + elif isinstance(event, MessageStop): + # An intermediate stop (text → tool → text) is a segment break; + # the terminal stop is signalled by the gateway via finish(), + # not here, so we only break segments on non-final stops. + if not event.final: + sink.on_segment_break() + elif isinstance(event, Commentary): + if event.text: + sink.on_commentary(event.text) + + def format_tool_event(self, event: Any, *, mode: str = "all", + preview_max_len: int = 40) -> Optional[str]: + """Return the rendered chrome for a ToolCallChunk, or None to eat it. + + Reproduces the gateway's historical tool-progress formatting: an emoji + for the tool, the tool name, and a short argument preview (or the full + args dict in ``verbose`` mode). Adapters that cannot render tool chrome + (no message editing, plain-text only) should override to return None so + the event is dropped rather than spamming separate bubbles. + + ``mode`` is the resolved tool-progress mode ("all" / "new" / "verbose"); + ``preview_max_len`` mirrors the ``tool_preview_length`` config (0 means + "no cap" in verbose mode). + """ + from gateway.stream_events import ToolCallChunk + if not isinstance(event, ToolCallChunk): + return None + + from agent.display import get_tool_emoji + emoji = get_tool_emoji(event.tool_name, default="⚙️") + + if mode == "verbose": + if event.args: + import json + args_str = json.dumps(event.args, ensure_ascii=False, default=str) + if preview_max_len > 0 and len(args_str) > preview_max_len: + args_str = args_str[:preview_max_len - 3] + "..." + return f"{emoji} {event.tool_name}({list(event.args.keys())})\n{args_str}" + if event.preview: + return f"{emoji} {event.tool_name}: \"{event.preview}\"" + return f"{emoji} {event.tool_name}..." 
+ + # "all" / "new": short preview, capped (default 40 to keep gateway + # progress bubbles compact — they persist as permanent messages). + preview = event.preview + if preview: + cap = preview_max_len if preview_max_len > 0 else 40 + if len(preview) > cap: + preview = preview[:cap - 3] + "..." + return f"{emoji} {event.tool_name}: \"{preview}\"" + return f"{emoji} {event.tool_name}..." + @property def has_fatal_error(self) -> bool: return self._fatal_error_message is not None diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 23f699f48..fa55f2db0 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -2521,31 +2521,55 @@ class TelegramAdapter(BasePlatformAdapter): text = content if len(content) <= self.MAX_MESSAGE_LENGTH else \ self.truncate_message(content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len)[0] - kwargs: Dict[str, Any] = { - "chat_id": int(chat_id), - "draft_id": int(draft_id), - "text": text, - } thread_id = self._metadata_thread_id(metadata) - if thread_id is not None: - kwargs["message_thread_id"] = thread_id - try: - ok = await self._bot.send_message_draft(**kwargs) - if ok: - # Drafts have no message_id; we report success without one - # so the caller knows the animation frame landed. - return SendResult(success=True, message_id=None) - return SendResult(success=False, error="draft_rejected") - except Exception as e: - # Most likely: BadRequest because this bot/chat doesn't allow - # drafts, or a transient server hiccup. The caller treats any - # failure as "fall back to edit-based for this response". - logger.debug( - "[%s] sendMessageDraft failed (chat=%s draft_id=%s): %s", - self.name, chat_id, draft_id, e, - ) - return SendResult(success=False, error=str(e)) + # Apply the same MarkdownV2 conversion the regular ``send`` path uses + # so the animated draft preview renders with identical formatting to + # the final message. 
Without this, the draft streams as raw text and + # the final ``sendMessage`` (which DOES use MarkdownV2) snaps into + # formatted output, producing a jarring visual shift at the end of the + # response. We try MarkdownV2 first and fall back to plain text if a + # malformed escape would be rejected — mirroring the (True, False) + # retry the streaming send loop uses — so a single bad token never + # kills draft streaming for the whole response. + for use_markdown in (True, False): + kwargs: Dict[str, Any] = { + "chat_id": int(chat_id), + "draft_id": int(draft_id), + "text": self.format_message(text) if use_markdown else text, + } + if use_markdown: + kwargs["parse_mode"] = ParseMode.MARKDOWN_V2 + if thread_id is not None: + kwargs["message_thread_id"] = thread_id + + try: + ok = await self._bot.send_message_draft(**kwargs) + if ok: + # Drafts have no message_id; we report success without one + # so the caller knows the animation frame landed. + return SendResult(success=True, message_id=None) + return SendResult(success=False, error="draft_rejected") + except Exception as e: + # A MarkdownV2 parse failure (BadRequest "can't parse entities") + # is recoverable: retry once as plain text. Any other failure + # (chat doesn't allow drafts, transient hiccup) — or a failure + # on the plain-text attempt — is returned as a failed SendResult, + # which the caller treats as "fall back to edit-based for this response". 
+ if use_markdown and self._is_bad_request_error(e): + logger.debug( + "[%s] sendMessageDraft MarkdownV2 rejected, retrying " + "as plain text (chat=%s draft_id=%s): %s", + self.name, chat_id, draft_id, e, + ) + continue + logger.debug( + "[%s] sendMessageDraft failed (chat=%s draft_id=%s): %s", + self.name, chat_id, draft_id, e, + ) + return SendResult(success=False, error=str(e)) + + return SendResult(success=False, error="draft_rejected") async def _send_message_with_thread_fallback(self, **kwargs): """Send a Telegram message, retrying once without message_thread_id diff --git a/gateway/stream_dispatch.py b/gateway/stream_dispatch.py new file mode 100644 index 000000000..94587149b --- /dev/null +++ b/gateway/stream_dispatch.py @@ -0,0 +1,132 @@ +"""Adapter-driven dispatch of structured stream events to a delivery sink. + +``GatewayEventDispatcher`` is the seam between the agent and the platform adapter: the agent emits typed +events (gateway/stream_events.py), and the *adapter* decides how each one is +delivered. The dispatcher holds an adapter + the stream consumer (sink) + the +resolved per-channel presentation settings (tool-progress mode, preview length) +and routes each event through the adapter's render hooks. + +Message/commentary/segment events flow into the consumer (native draft on +Telegram DMs, edit-in-place elsewhere). Tool events are formatted by the +adapter — which may return None to *eat* the event on platforms that can't +render tool chrome — and the rendered line is enqueued onto the same tool +progress queue the gateway already drains, so the two no longer race through +independent code paths. + +This module deliberately has no platform knowledge and no asyncio: it is a thin +synchronous router callable from the agent's worker thread, exactly like the +callbacks it replaces. 
+""" + +from __future__ import annotations + +import logging +from typing import Any, Callable, Optional + +from gateway.stream_events import ( + Commentary, + GatewayNotice, + LongToolHint, + MessageChunk, + MessageStop, + StreamEvent, + ToolCallChunk, + ToolCallFinished, +) + +logger = logging.getLogger("gateway.stream_events") + + +class GatewayEventDispatcher: + """Route typed stream events through an adapter onto a delivery sink. + + Parameters + ---------- + adapter: + The platform adapter. Provides ``render_message_event`` and + ``format_tool_event`` (BasePlatformAdapter defaults reproduce today's + behavior; adapters may override for native rendering). + sink: + The GatewayStreamConsumer for assistant-text delivery. May be None + when streaming is disabled, in which case message events are dropped + (the final response still goes out via the normal send path). + enqueue_tool_line: + Callback that places a rendered tool-progress line onto the gateway's + progress queue (the same queue ``send_progress_messages`` drains). May + be None when tool progress is disabled for this channel. + tool_mode: + Resolved tool-progress mode for this channel ("all" / "new" / "verbose" + / "off"). + preview_max_len: + Resolved ``tool_preview_length`` (0 = no cap in verbose mode). + on_long_tool / on_notice: + Optional hooks for LongToolHint / GatewayNotice events, letting the + gateway own the "should I surface this here?" decision. 
+ """ + + def __init__( + self, + adapter: Any, + sink: Any = None, + *, + enqueue_tool_line: Optional[Callable[[Any], None]] = None, + tool_mode: str = "all", + preview_max_len: int = 40, + on_long_tool: Optional[Callable[[LongToolHint], None]] = None, + on_notice: Optional[Callable[[GatewayNotice], None]] = None, + ) -> None: + self.adapter = adapter + self.sink = sink + self._enqueue_tool_line = enqueue_tool_line + self.tool_mode = tool_mode or "all" + self.preview_max_len = preview_max_len + self._on_long_tool = on_long_tool + self._on_notice = on_notice + # "new" mode dedup — only report when the tool changes. + self._last_tool: Optional[str] = None + + def dispatch(self, event: StreamEvent) -> None: + """Route a single event. Never raises into the agent's worker thread.""" + try: + self._dispatch(event) + except Exception: # presentation must never break the agent loop + logger.debug("stream-event dispatch error", exc_info=True) + + def _dispatch(self, event: StreamEvent) -> None: + if isinstance(event, (MessageChunk, MessageStop, Commentary)): + if self.sink is not None: + self.adapter.render_message_event(event, self.sink) + return + + if isinstance(event, ToolCallChunk): + if self.tool_mode == "off" or self._enqueue_tool_line is None: + return + # "new" mode: only emit when the tool changes. + if self.tool_mode == "new" and event.tool_name == self._last_tool: + return + self._last_tool = event.tool_name + line = self.adapter.format_tool_event( + event, mode=self.tool_mode, preview_max_len=self.preview_max_len, + ) + # None == adapter chose to eat this event (can't render tool chrome). + if line: + self._enqueue_tool_line(line) + return + + if isinstance(event, ToolCallFinished): + # Default: no chrome on completion (matches today — the gateway only + # rendered "started" events). Completion drives onboarding hints. 
+ return + + if isinstance(event, LongToolHint): + if self._on_long_tool is not None: + self._on_long_tool(event) + return + + if isinstance(event, GatewayNotice): + if self._on_notice is not None: + self._on_notice(event) + return + + +__all__ = ["GatewayEventDispatcher"] diff --git a/gateway/stream_events.py b/gateway/stream_events.py new file mode 100644 index 000000000..206d2d787 --- /dev/null +++ b/gateway/stream_events.py @@ -0,0 +1,171 @@ +"""Structured streaming events — the agent→gateway delivery contract. + +Historically the agent drove gateway delivery through a fan of loosely-typed +callbacks (``stream_delta_callback(text)``, ``tool_progress_callback(event_type, +tool_name, preview, args)``, ``interim_assistant_callback(text)`` …) and each +gateway callback decided *both* what to render and how to send it. That +coupling is why tool-progress bubbles and the streaming draft raced each other +on Telegram, and why tool-call formatting lived agent-side even though only the +gateway knows what a given platform can render. + +This module defines a small, typed event vocabulary that names *what happened* +without prescribing *how it is delivered*. The gateway's stream consumer +(``GatewayStreamConsumer``) is the single sink; the platform adapter decides how +to render each event (Telegram can stream a MarkdownV2 ```bash``` block as a +native draft; iMessage has no rich formatting and may collapse or drop tool +chrome). Separation of concerns: smart agent emits structured data, smart +gateway decides delivery. + +These are intentionally plain frozen dataclasses — no behavior, no platform +knowledge, no I/O. They are cheap to construct on the agent's worker thread and +safe to hand across the thread/async boundary into the consumer queue. + +Design constraints (see hermes-agent-dev skill — message-flow + cache +invariants): + * Events describe *transport*, never *context*. 
Nothing here is persisted to + conversation history; what the gateway chooses to "eat" (e.g. tool chrome on + a platform that can't render it) must never diverge from the bytes stored in + the agent's message history. History is owned by the agent; these events are + a presentation-layer stream only. + * Backward compatible by construction. The gateway adapts its existing + callbacks into these events at the boundary; adapters that don't opt into + event-native rendering get identical behavior via the base-class default. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, Optional, Union + + +# ── Message (assistant text) events ────────────────────────────────────────── + +@dataclass(frozen=True) +class MessageChunk: + """A delta of streamed assistant text. + + ``text`` is the incremental content as it arrives from the model. The + consumer accumulates chunks and progressively renders them (native draft on + Telegram DMs, edit-in-place elsewhere). Reasoning/think-block content is + filtered upstream and never arrives as a MessageChunk. + """ + text: str + + +@dataclass(frozen=True) +class MessageStop: + """The current assistant message segment is complete. + + Emitted when a contiguous run of assistant text ends — either the whole + response finished, or a tool boundary interrupts the text so the next + segment should render as a fresh message *below* any tool chrome. + + ``final`` is True only for the terminal stop of the whole turn; an + intermediate stop (text → tool call → more text) carries ``final=False`` so + the consumer finalizes the current bubble and prepares a new segment without + treating the turn as done. + """ + final: bool = False + + +@dataclass(frozen=True) +class Commentary: + """A complete interim assistant message emitted between tool iterations. + + Example: the model says "I'll inspect the repo first." before issuing a tool + call. 
Unlike a MessageChunk this is already-complete text (not a delta); the + consumer renders it as its own message so it reads as a distinct beat. + """ + text: str + + +# ── Tool-call events ───────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class ToolCallChunk: + """A tool invocation has started (or its in-progress state changed). + + Carries the raw facts about the call — name, a short argument ``preview``, + and the full ``args`` dict — and lets the *gateway* decide presentation + (emoji, truncation, verbose vs compact, or eat it entirely on platforms that + don't show tool chrome). Previously the agent's gateway callback baked the + emoji + preview formatting in; that decision now belongs to the adapter. + """ + tool_name: str + preview: Optional[str] = None + args: Optional[Dict[str, Any]] = None + # Monotonic per-turn index, so the consumer can correlate a finish with its + # start and so "new"-mode dedup (only report when the tool changes) works + # without the consumer tracking call order itself. + index: int = 0 + + +@dataclass(frozen=True) +class ToolCallFinished: + """A tool invocation completed. + + ``duration`` is wall-clock seconds. ``ok`` reflects whether the tool + returned without raising. The gateway uses this to clear/settle a progress + bubble and to drive one-time onboarding hints (e.g. suggest /verbose after a + long tool run). No tool *output* travels here — output is the agent's + concern and is persisted to history, not streamed as presentation. + """ + tool_name: str + duration: float = 0.0 + ok: bool = True + index: int = 0 + + +# ── Gateway control / lifecycle events ─────────────────────────────────────── + +@dataclass(frozen=True) +class LongToolHint: + """One-shot onboarding nudge when a tool runs longer than the threshold. + + The gateway gates this on platform capability (the /verbose command must be + usable) and on the user not having seen the hint before. 
Modeled as an + event so the *gateway* owns the "should I surface this here?" decision rather + than the agent. + """ + tool_name: str = "" + duration: float = 0.0 + + +@dataclass(frozen=True) +class GatewayNotice: + """A gateway-originated control message (restart, online, long-run notice). + + ``kind`` is a stable string the adapter can switch on + (``"restart"`` / ``"online"`` / ``"long_run"`` / …). ``text`` is the + human-readable default the base class renders when an adapter has no + platform-specific treatment. + """ + kind: str + text: str = "" + extra: Dict[str, Any] = field(default_factory=dict) + + +# Union of every event the consumer's dispatcher accepts. Kept explicit (rather +# than a marker base class) so a missing ``case`` in an exhaustive match is a +# visible type error rather than a silent fall-through. +StreamEvent = Union[ + MessageChunk, + MessageStop, + Commentary, + ToolCallChunk, + ToolCallFinished, + LongToolHint, + GatewayNotice, +] + + +__all__ = [ + "MessageChunk", + "MessageStop", + "Commentary", + "ToolCallChunk", + "ToolCallFinished", + "LongToolHint", + "GatewayNotice", + "StreamEvent", +] diff --git a/tests/gateway/test_config.py b/tests/gateway/test_config.py index da970eccf..9950f967f 100644 --- a/tests/gateway/test_config.py +++ b/tests/gateway/test_config.py @@ -164,9 +164,12 @@ class TestSessionResetPolicy: class TestStreamingConfig: - def test_defaults_to_edit_transport(self): + def test_defaults_to_auto_transport(self): + # "auto" prefers native draft streaming where the platform supports + # it (Telegram DMs) and falls back to edit-based everywhere else, so + # it is safe as the global out-of-the-box default. 
restored = StreamingConfig.from_dict({"enabled": "true"}) - assert restored.transport == "edit" + assert restored.transport == "auto" def test_from_dict_coerces_quoted_false_enabled(self): restored = StreamingConfig.from_dict({"enabled": "false"}) diff --git a/tests/gateway/test_stream_events.py b/tests/gateway/test_stream_events.py new file mode 100644 index 000000000..7ba0d79c4 --- /dev/null +++ b/tests/gateway/test_stream_events.py @@ -0,0 +1,182 @@ +"""Structured stream-event protocol + dispatcher behavior. + +Covers the agent→gateway delivery contract introduced to decouple *what +happened* (typed events) from *how it's delivered* (adapter decides). The +default BasePlatformAdapter rendering must reproduce today's behavior exactly; +an adapter may override format_tool_event to eat tool chrome on platforms that +can't render it. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +from gateway.stream_dispatch import GatewayEventDispatcher +from gateway.stream_events import ( + Commentary, + GatewayNotice, + LongToolHint, + MessageChunk, + MessageStop, + ToolCallChunk, + ToolCallFinished, +) + + +def _base_adapter(): + """A real BasePlatformAdapter instance (abstractmethods cleared) so we + exercise the genuine default render hooks, not a mock.""" + from gateway.platforms.base import BasePlatformAdapter + + Concrete = type("Concrete", (BasePlatformAdapter,), {}) + Concrete.__abstractmethods__ = frozenset() + return Concrete.__new__(Concrete) + + +class _FakeSink: + def __init__(self): + self.deltas = [] + self.commentary = [] + self.segment_breaks = 0 + + def on_delta(self, text): + self.deltas.append(text) + + def on_commentary(self, text): + self.commentary.append(text) + + def on_segment_break(self): + self.segment_breaks += 1 + + +# ── Message events → sink ──────────────────────────────────────────────────── + +def test_message_chunk_flows_to_sink_on_delta(): + sink = _FakeSink() + d = GatewayEventDispatcher(_base_adapter(), 
sink) + d.dispatch(MessageChunk("hello ")) + d.dispatch(MessageChunk("world")) + assert sink.deltas == ["hello ", "world"] + + +def test_intermediate_message_stop_breaks_segment_but_final_does_not(): + sink = _FakeSink() + d = GatewayEventDispatcher(_base_adapter(), sink) + d.dispatch(MessageStop(final=False)) + d.dispatch(MessageStop(final=True)) + assert sink.segment_breaks == 1 # only the non-final stop breaks + + +def test_commentary_flows_to_sink(): + sink = _FakeSink() + d = GatewayEventDispatcher(_base_adapter(), sink) + d.dispatch(Commentary("I'll inspect the repo first.")) + assert sink.commentary == ["I'll inspect the repo first."] + + +def test_message_events_dropped_when_no_sink(): + # streaming disabled → no sink → message events are no-ops, no crash. + d = GatewayEventDispatcher(_base_adapter(), sink=None) + d.dispatch(MessageChunk("x")) # must not raise + + +# ── Tool events → progress queue, formatted by adapter ─────────────────────── + +def test_tool_call_chunk_renders_default_chrome(): + lines = [] + d = GatewayEventDispatcher( + _base_adapter(), _FakeSink(), + enqueue_tool_line=lines.append, tool_mode="all", + ) + d.dispatch(ToolCallChunk(tool_name="terminal", preview="ls -la")) + assert len(lines) == 1 + assert "terminal" in lines[0] + assert "ls -la" in lines[0] + + +def test_tool_preview_truncated_to_cap(): + lines = [] + d = GatewayEventDispatcher( + _base_adapter(), _FakeSink(), + enqueue_tool_line=lines.append, tool_mode="all", preview_max_len=10, + ) + d.dispatch(ToolCallChunk(tool_name="x", preview="0123456789ABCDEF")) + # capped at 10 → 7 chars + "..." 
(then wrapped in quotes by the renderer) + assert '"0123456..."' in lines[0] + assert "89ABCDEF" not in lines[0] + + +def test_new_mode_dedups_same_tool(): + lines = [] + d = GatewayEventDispatcher( + _base_adapter(), _FakeSink(), + enqueue_tool_line=lines.append, tool_mode="new", + ) + d.dispatch(ToolCallChunk(tool_name="terminal", preview="a")) + d.dispatch(ToolCallChunk(tool_name="terminal", preview="b")) # deduped + d.dispatch(ToolCallChunk(tool_name="read_file", preview="c")) + assert len(lines) == 2 # terminal once, read_file once + + +def test_off_mode_emits_nothing(): + lines = [] + d = GatewayEventDispatcher( + _base_adapter(), _FakeSink(), + enqueue_tool_line=lines.append, tool_mode="off", + ) + d.dispatch(ToolCallChunk(tool_name="terminal", preview="ls")) + assert lines == [] + + +def test_adapter_can_eat_tool_chrome(): + """An adapter that returns None from format_tool_event drops the event — + the 'iMessage can't render tool chrome' case.""" + adapter = _base_adapter() + adapter.format_tool_event = lambda event, **kw: None # eat everything + lines = [] + d = GatewayEventDispatcher( + adapter, _FakeSink(), enqueue_tool_line=lines.append, tool_mode="all", + ) + d.dispatch(ToolCallChunk(tool_name="terminal", preview="ls")) + assert lines == [] # eaten + + +def test_tool_finished_emits_no_chrome(): + lines = [] + d = GatewayEventDispatcher( + _base_adapter(), _FakeSink(), + enqueue_tool_line=lines.append, tool_mode="all", + ) + d.dispatch(ToolCallFinished(tool_name="terminal", duration=2.0, ok=True)) + assert lines == [] + + +# ── Control events → gateway-owned hooks ───────────────────────────────────── + +def test_long_tool_hint_routes_to_hook(): + seen = [] + d = GatewayEventDispatcher( + _base_adapter(), _FakeSink(), on_long_tool=seen.append, + ) + d.dispatch(LongToolHint(tool_name="terminal", duration=45.0)) + assert len(seen) == 1 + assert seen[0].tool_name == "terminal" + + +def test_gateway_notice_routes_to_hook(): + seen = [] + d = 
GatewayEventDispatcher( + _base_adapter(), _FakeSink(), on_notice=seen.append, + ) + d.dispatch(GatewayNotice(kind="restart", text="Gateway restarted")) + assert seen[0].kind == "restart" + + +def test_dispatch_swallows_render_errors(): + """A render error must never propagate into the agent worker thread.""" + adapter = _base_adapter() + def _boom(event, sink): + raise RuntimeError("render blew up") + adapter.render_message_event = _boom + d = GatewayEventDispatcher(adapter, _FakeSink()) + d.dispatch(MessageChunk("x")) # must not raise diff --git a/tests/gateway/test_telegram_send_draft_format.py b/tests/gateway/test_telegram_send_draft_format.py new file mode 100644 index 000000000..a84a42852 --- /dev/null +++ b/tests/gateway/test_telegram_send_draft_format.py @@ -0,0 +1,114 @@ +"""TelegramAdapter.send_draft MarkdownV2 formatting parity. + +Bot API 9.5 ``sendMessageDraft`` powers the animated streaming preview in +DMs. The regular ``send`` path renders with MarkdownV2, so the draft must +too — otherwise the live preview streams as raw text and the final +``sendMessage`` snaps into formatted output, producing a jarring visual +shift at the end of the response (reported by an external user, May 2026). + +These tests pin: + 1. The happy path passes ``parse_mode=MARKDOWN_V2`` with format_message'd + text (formatting parity with the final message). + 2. A MarkdownV2 BadRequest triggers a single plain-text retry rather than + killing draft streaming for the whole response. + 3. A non-BadRequest failure propagates so the caller falls back to edit. 
+""" +import sys +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import PlatformConfig + + +def _ensure_telegram_mock(): + if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"): + return + mod = MagicMock() + mod.error.NetworkError = type("NetworkError", (OSError,), {}) + mod.error.TimedOut = type("TimedOut", (OSError,), {}) + mod.error.BadRequest = type("BadRequest", (Exception,), {}) + for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"): + sys.modules.setdefault(name, mod) + sys.modules.setdefault("telegram.error", mod.error) + + +_ensure_telegram_mock() + +from gateway.platforms import telegram as tg_mod # noqa: E402 +from gateway.platforms.telegram import TelegramAdapter # noqa: E402 + + +def _make_adapter() -> TelegramAdapter: + adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***")) + adapter._bot = MagicMock() + adapter._bot.send_message_draft = AsyncMock(return_value=True) + return adapter + + +@pytest.mark.asyncio +async def test_send_draft_passes_markdownv2_parse_mode(): + """Happy path: draft is sent with parse_mode set and format_message'd text.""" + adapter = _make_adapter() + # Make format_message observable and deterministic. 
+ adapter.format_message = lambda c: f"FMT::{c}" + + result = await adapter.send_draft("123", 7, "**bold** body") + + assert result.success is True + adapter._bot.send_message_draft.assert_awaited_once() + kwargs = adapter._bot.send_message_draft.await_args.kwargs + assert kwargs["text"] == "FMT::**bold** body" + assert kwargs["parse_mode"] is tg_mod.ParseMode.MARKDOWN_V2 + assert kwargs["chat_id"] == 123 + assert kwargs["draft_id"] == 7 + + +@pytest.mark.asyncio +async def test_send_draft_falls_back_to_plain_text_on_markdownv2_error(): + """A MarkdownV2 BadRequest retries once as plain text (no parse_mode), + instead of aborting draft streaming for the whole response.""" + adapter = _make_adapter() + adapter.format_message = lambda content: f"FMT::{content}" + + # Resolve the BadRequest type the adapter checks via _is_bad_request_error. + from telegram.error import BadRequest # type: ignore + calls = [] + + async def _draft(**kwargs): + calls.append(kwargs) + if "parse_mode" in kwargs: + raise BadRequest("can't parse entities") + return True + + adapter._bot.send_message_draft = AsyncMock(side_effect=_draft) + + result = await adapter.send_draft("123", 9, "weird _text") + + assert result.success is True + # First attempt: MarkdownV2; second attempt: plain text, no parse_mode. + assert len(calls) == 2 + assert "parse_mode" in calls[0] + assert "parse_mode" not in calls[1] + assert calls[1]["text"] == "weird _text" # raw, unformatted + + +@pytest.mark.asyncio +async def test_send_draft_non_badrequest_propagates_without_retry(): + """A non-BadRequest failure (e.g. 
drafts not allowed) returns failure + immediately so the caller falls back to the edit transport.""" + adapter = _make_adapter() + adapter.format_message = lambda c: f"FMT::{c}" + + calls = [] + + async def _draft(**kwargs): + calls.append(kwargs) + raise RuntimeError("drafts disabled for this chat") + + adapter._bot.send_message_draft = AsyncMock(side_effect=_draft) + + result = await adapter.send_draft("123", 11, "hi") + + assert result.success is False + assert len(calls) == 1 # no plain-text retry on non-BadRequest