feat(gateway): structured stream-event protocol + Telegram draft formatting parity (#37250)

Introduce a typed agent→gateway delivery contract so the gateway (not the
agent) decides how each streaming event is rendered per platform. Moves toward
smart-agent/smart-gateway separation while reproducing today's behavior exactly
in the base class.

- gateway/stream_events.py: typed event vocabulary (MessageChunk/Stop,
  Commentary, ToolCallChunk/Finished, LongToolHint, GatewayNotice).
- gateway/stream_dispatch.py: GatewayEventDispatcher routes events through the
  adapter; adapters can eat events they can't render (e.g. tool chrome on
  plain-text platforms).
- gateway/platforms/base.py: render_message_event + format_tool_event default
  hooks reproduce the historical emoji/preview tool formatting and consumer
  delegation 1:1; adapters override for native rendering.
- gateway/platforms/telegram.py: send_draft now applies MarkdownV2 (format_message
  + parse_mode) with a plain-text fallback on BadRequest, fixing the jarring
  raw-text→formatted shift when the draft finalizes as a real sendMessage.
- gateway/config.py: default streaming transport edit → auto. Safe globally:
  adapters without draft support report supports_draft_streaming()==False and
  transparently use edit, so only Telegram DMs gain native drafts.

Presentation-only contract — nothing rendered here is persisted to conversation
history, preserving cache/message-flow invariants.
This commit is contained in:
Teknium
2026-06-02 00:33:50 -07:00
committed by GitHub
parent 2c0d648397
commit 787936d133
8 changed files with 740 additions and 29 deletions

View File

@ -361,10 +361,17 @@ class StreamingConfig:
# fall back to edit-based when not.
# "draft" — explicitly request native drafts; falls back to edit when
# the platform/chat doesn't support them.
# "edit" — progressive editMessageText only (legacy/default
# behaviour).
# "edit" — progressive editMessageText only (legacy behaviour).
# "off" — disable streaming entirely.
transport: str = "edit"
#
# Default is "auto": prefer native draft streaming on platforms that
# support it (Telegram DMs via sendMessageDraft, Bot API 9.5+) and fall
# back to edit-based streaming everywhere else. This is safe as a global
# default because adapters without draft support (Discord, Slack, Matrix,
# …) report supports_draft_streaming() == False and transparently use the
# edit path — so "auto" never regresses non-Telegram platforms, it only
# upgrades the chats that can render the smoother native preview.
transport: str = "auto"
edit_interval: float = DEFAULT_STREAMING_EDIT_INTERVAL
buffer_threshold: int = DEFAULT_STREAMING_BUFFER_THRESHOLD
cursor: str = DEFAULT_STREAMING_CURSOR
@ -393,7 +400,7 @@ class StreamingConfig:
return cls()
return cls(
enabled=_coerce_bool(data.get("enabled"), False),
transport=data.get("transport", "edit"),
transport=data.get("transport", "auto"),
edit_interval=_coerce_float(
data.get("edit_interval"), DEFAULT_STREAMING_EDIT_INTERVAL,
),

View File

@ -1916,6 +1916,84 @@ class BasePlatformAdapter(ABC):
f"{type(self).__name__} does not implement send_draft"
)
# ── Structured stream-event rendering ────────────────────────────────
#
# These methods let an adapter decide *how* to present each structured
# streaming event (see gateway/stream_events.py). The default
# implementations reproduce the historical behavior exactly: assistant
# text/commentary/segment events delegate to the stream consumer, and
# tool events render the same "emoji tool_name: preview" chrome the
# gateway has always produced. Adapters override these to be more native
# to their platform (e.g. Telegram streaming a MarkdownV2 ```bash``` block
# as a draft; iMessage eating tool chrome it cannot format).
#
# The contract is presentation-only: nothing rendered here is persisted to
# conversation history. History is owned by the agent; what an adapter
# chooses to "eat" must never change the bytes the agent stored.
def render_message_event(self, event: Any, sink: Any) -> None:
"""Render a MessageChunk / MessageStop / Commentary onto the sink.
Default: map onto the stream consumer's existing primitives, preserving
today's behavior 1:1. ``sink`` is a GatewayStreamConsumer.
"""
from gateway.stream_events import MessageChunk, MessageStop, Commentary
if isinstance(event, MessageChunk):
if event.text:
sink.on_delta(event.text)
elif isinstance(event, MessageStop):
# An intermediate stop (text → tool → text) is a segment break;
# the terminal stop is signalled by the gateway via finish(),
# not here, so we only break segments on non-final stops.
if not event.final:
sink.on_segment_break()
elif isinstance(event, Commentary):
if event.text:
sink.on_commentary(event.text)
def format_tool_event(self, event: Any, *, mode: str = "all",
preview_max_len: int = 40) -> Optional[str]:
"""Return the rendered chrome for a ToolCallChunk, or None to eat it.
Reproduces the gateway's historical tool-progress formatting: an emoji
for the tool, the tool name, and a short argument preview (or the full
args dict in ``verbose`` mode). Adapters that cannot render tool chrome
(no message editing, plain-text only) should override to return None so
the event is dropped rather than spamming separate bubbles.
``mode`` is the resolved tool-progress mode ("all" / "new" / "verbose");
``preview_max_len`` mirrors the ``tool_preview_length`` config (0 means
"no cap" in verbose mode).
"""
from gateway.stream_events import ToolCallChunk
if not isinstance(event, ToolCallChunk):
return None
from agent.display import get_tool_emoji
emoji = get_tool_emoji(event.tool_name, default="⚙️")
if mode == "verbose":
if event.args:
import json
args_str = json.dumps(event.args, ensure_ascii=False, default=str)
if preview_max_len > 0 and len(args_str) > preview_max_len:
args_str = args_str[:preview_max_len - 3] + "..."
return f"{emoji} {event.tool_name}({list(event.args.keys())})\n{args_str}"
if event.preview:
return f"{emoji} {event.tool_name}: \"{event.preview}\""
return f"{emoji} {event.tool_name}..."
# "all" / "new": short preview, capped (default 40 to keep gateway
# progress bubbles compact — they persist as permanent messages).
preview = event.preview
if preview:
cap = preview_max_len if preview_max_len > 0 else 40
if len(preview) > cap:
preview = preview[:cap - 3] + "..."
return f"{emoji} {event.tool_name}: \"{preview}\""
return f"{emoji} {event.tool_name}..."
@property
def has_fatal_error(self) -> bool:
return self._fatal_error_message is not None

View File

@ -2521,31 +2521,55 @@ class TelegramAdapter(BasePlatformAdapter):
text = content if len(content) <= self.MAX_MESSAGE_LENGTH else \
self.truncate_message(content, self.MAX_MESSAGE_LENGTH, len_fn=utf16_len)[0]
kwargs: Dict[str, Any] = {
"chat_id": int(chat_id),
"draft_id": int(draft_id),
"text": text,
}
thread_id = self._metadata_thread_id(metadata)
if thread_id is not None:
kwargs["message_thread_id"] = thread_id
try:
ok = await self._bot.send_message_draft(**kwargs)
if ok:
# Drafts have no message_id; we report success without one
# so the caller knows the animation frame landed.
return SendResult(success=True, message_id=None)
return SendResult(success=False, error="draft_rejected")
except Exception as e:
# Most likely: BadRequest because this bot/chat doesn't allow
# drafts, or a transient server hiccup. The caller treats any
# failure as "fall back to edit-based for this response".
logger.debug(
"[%s] sendMessageDraft failed (chat=%s draft_id=%s): %s",
self.name, chat_id, draft_id, e,
)
return SendResult(success=False, error=str(e))
# Apply the same MarkdownV2 conversion the regular ``send`` path uses
# so the animated draft preview renders with identical formatting to
# the final message. Without this, the draft streams as raw text and
# the final ``sendMessage`` (which DOES use MarkdownV2) snaps into
# formatted output, producing a jarring visual shift at the end of the
# response. We try MarkdownV2 first and fall back to plain text if a
# malformed escape would be rejected — mirroring the (True, False)
# retry the streaming send loop uses — so a single bad token never
# kills draft streaming for the whole response.
for use_markdown in (True, False):
kwargs: Dict[str, Any] = {
"chat_id": int(chat_id),
"draft_id": int(draft_id),
"text": self.format_message(text) if use_markdown else text,
}
if use_markdown:
kwargs["parse_mode"] = ParseMode.MARKDOWN_V2
if thread_id is not None:
kwargs["message_thread_id"] = thread_id
try:
ok = await self._bot.send_message_draft(**kwargs)
if ok:
# Drafts have no message_id; we report success without one
# so the caller knows the animation frame landed.
return SendResult(success=True, message_id=None)
return SendResult(success=False, error="draft_rejected")
except Exception as e:
# A MarkdownV2 parse failure (BadRequest "can't parse entities")
# is recoverable: retry once as plain text. Any other failure
# (chat doesn't allow drafts, transient hiccup) — or a failure
# on the plain-text attempt — propagates to the caller, which
# treats it as "fall back to edit-based for this response".
if use_markdown and self._is_bad_request_error(e):
logger.debug(
"[%s] sendMessageDraft MarkdownV2 rejected, retrying "
"as plain text (chat=%s draft_id=%s): %s",
self.name, chat_id, draft_id, e,
)
continue
logger.debug(
"[%s] sendMessageDraft failed (chat=%s draft_id=%s): %s",
self.name, chat_id, draft_id, e,
)
return SendResult(success=False, error=str(e))
return SendResult(success=False, error="draft_rejected")
async def _send_message_with_thread_fallback(self, **kwargs):
"""Send a Telegram message, retrying once without message_thread_id

132
gateway/stream_dispatch.py Normal file
View File

@ -0,0 +1,132 @@
"""Adapter-driven dispatch of structured stream events to a delivery sink.
``GatewayEventDispatcher`` is the seam Tobi asked for: the agent emits typed
events (gateway/stream_events.py), and the *adapter* decides how each one is
delivered. The dispatcher holds an adapter + the stream consumer (sink) + the
resolved per-channel presentation settings (tool-progress mode, preview length)
and routes each event through the adapter's render hooks.
Message/commentary/segment events flow into the consumer (native draft on
Telegram DMs, edit-in-place elsewhere). Tool events are formatted by the
adapter — which may return None to *eat* the event on platforms that can't
render tool chrome — and the rendered line is enqueued onto the same tool
progress queue the gateway already drains, so the two no longer race through
independent code paths.
This module deliberately has no platform knowledge and no asyncio: it is a thin
synchronous router callable from the agent's worker thread, exactly like the
callbacks it replaces.
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Optional
from gateway.stream_events import (
Commentary,
GatewayNotice,
LongToolHint,
MessageChunk,
MessageStop,
StreamEvent,
ToolCallChunk,
ToolCallFinished,
)
logger = logging.getLogger("gateway.stream_events")
class GatewayEventDispatcher:
"""Route typed stream events through an adapter onto a delivery sink.
Parameters
----------
adapter:
The platform adapter. Provides ``render_message_event`` and
``format_tool_event`` (BasePlatformAdapter defaults reproduce today's
behavior; adapters may override for native rendering).
sink:
The GatewayStreamConsumer for assistant-text delivery. May be None
when streaming is disabled, in which case message events are dropped
(the final response still goes out via the normal send path).
enqueue_tool_line:
Callback that places a rendered tool-progress line onto the gateway's
progress queue (the same queue ``send_progress_messages`` drains). May
be None when tool progress is disabled for this channel.
tool_mode:
Resolved tool-progress mode for this channel ("all" / "new" / "verbose"
/ "off").
preview_max_len:
Resolved ``tool_preview_length`` (0 = no cap in verbose mode).
on_long_tool / on_notice:
Optional hooks for LongToolHint / GatewayNotice events, letting the
gateway own the "should I surface this here?" decision.
"""
def __init__(
self,
adapter: Any,
sink: Any = None,
*,
enqueue_tool_line: Optional[Callable[[Any], None]] = None,
tool_mode: str = "all",
preview_max_len: int = 40,
on_long_tool: Optional[Callable[[LongToolHint], None]] = None,
on_notice: Optional[Callable[[GatewayNotice], None]] = None,
) -> None:
self.adapter = adapter
self.sink = sink
self._enqueue_tool_line = enqueue_tool_line
self.tool_mode = tool_mode or "all"
self.preview_max_len = preview_max_len
self._on_long_tool = on_long_tool
self._on_notice = on_notice
# "new" mode dedup — only report when the tool changes.
self._last_tool: Optional[str] = None
def dispatch(self, event: StreamEvent) -> None:
"""Route a single event. Never raises into the agent's worker thread."""
try:
self._dispatch(event)
except Exception: # presentation must never break the agent loop
logger.debug("stream-event dispatch error", exc_info=True)
def _dispatch(self, event: StreamEvent) -> None:
if isinstance(event, (MessageChunk, MessageStop, Commentary)):
if self.sink is not None:
self.adapter.render_message_event(event, self.sink)
return
if isinstance(event, ToolCallChunk):
if self.tool_mode == "off" or self._enqueue_tool_line is None:
return
# "new" mode: only emit when the tool changes.
if self.tool_mode == "new" and event.tool_name == self._last_tool:
return
self._last_tool = event.tool_name
line = self.adapter.format_tool_event(
event, mode=self.tool_mode, preview_max_len=self.preview_max_len,
)
# None == adapter chose to eat this event (can't render tool chrome).
if line:
self._enqueue_tool_line(line)
return
if isinstance(event, ToolCallFinished):
# Default: no chrome on completion (matches today — the gateway only
# rendered "started" events). Completion drives onboarding hints.
return
if isinstance(event, LongToolHint):
if self._on_long_tool is not None:
self._on_long_tool(event)
return
if isinstance(event, GatewayNotice):
if self._on_notice is not None:
self._on_notice(event)
return
__all__ = ["GatewayEventDispatcher"]

171
gateway/stream_events.py Normal file
View File

@ -0,0 +1,171 @@
"""Structured streaming events — the agent→gateway delivery contract.
Historically the agent drove gateway delivery through a fan of loosely-typed
callbacks (``stream_delta_callback(text)``, ``tool_progress_callback(event_type,
tool_name, preview, args)``, ``interim_assistant_callback(text)`` …) and each
gateway callback decided *both* what to render and how to send it. That
coupling is why tool-progress bubbles and the streaming draft raced each other
on Telegram, and why tool-call formatting lived agent-side even though only the
gateway knows what a given platform can render.
This module defines a small, typed event vocabulary that names *what happened*
without prescribing *how it is delivered*. The gateway's stream consumer
(``GatewayStreamConsumer``) is the single sink; the platform adapter decides how
to render each event (Telegram can stream a MarkdownV2 ```bash``` block as a
native draft; iMessage has no rich formatting and may collapse or drop tool
chrome). Separation of concerns: smart agent emits structured data, smart
gateway decides delivery.
These are intentionally plain frozen dataclasses — no behavior, no platform
knowledge, no I/O. They are cheap to construct on the agent's worker thread and
safe to hand across the thread/async boundary into the consumer queue.
Design constraints (see hermes-agent-dev skill — message-flow + cache
invariants):
* Events describe *transport*, never *context*. Nothing here is persisted to
conversation history; what the gateway chooses to "eat" (e.g. tool chrome on
a platform that can't render it) must never diverge from the bytes stored in
the agent's message history. History is owned by the agent; these events are
a presentation-layer stream only.
* Backward compatible by construction. The gateway adapts its existing
callbacks into these events at the boundary; adapters that don't opt into
event-native rendering get identical behavior via the base-class default.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union
# ── Message (assistant text) events ──────────────────────────────────────────
@dataclass(frozen=True)
class MessageChunk:
"""A delta of streamed assistant text.
``text`` is the incremental content as it arrives from the model. The
consumer accumulates chunks and progressively renders them (native draft on
Telegram DMs, edit-in-place elsewhere). Reasoning/think-block content is
filtered upstream and never arrives as a MessageChunk.
"""
text: str
@dataclass(frozen=True)
class MessageStop:
"""The current assistant message segment is complete.
Emitted when a contiguous run of assistant text ends — either the whole
response finished, or a tool boundary interrupts the text so the next
segment should render as a fresh message *below* any tool chrome.
``final`` is True only for the terminal stop of the whole turn; an
intermediate stop (text → tool call → more text) carries ``final=False`` so
the consumer finalizes the current bubble and prepares a new segment without
treating the turn as done.
"""
final: bool = False
@dataclass(frozen=True)
class Commentary:
"""A complete interim assistant message emitted between tool iterations.
Example: the model says "I'll inspect the repo first." before issuing a tool
call. Unlike a MessageChunk this is already-complete text (not a delta); the
consumer renders it as its own message so it reads as a distinct beat.
"""
text: str
# ── Tool-call events ─────────────────────────────────────────────────────────
@dataclass(frozen=True)
class ToolCallChunk:
"""A tool invocation has started (or its in-progress state changed).
Carries the raw facts about the call — name, a short argument ``preview``,
and the full ``args`` dict — and lets the *gateway* decide presentation
(emoji, truncation, verbose vs compact, or eat it entirely on platforms that
don't show tool chrome). Previously the agent's gateway callback baked the
emoji + preview formatting in; that decision now belongs to the adapter.
"""
tool_name: str
preview: Optional[str] = None
args: Optional[Dict[str, Any]] = None
# Monotonic per-turn index, so the consumer can correlate a finish with its
# start and so "new"-mode dedup (only report when the tool changes) works
# without the consumer tracking call order itself.
index: int = 0
@dataclass(frozen=True)
class ToolCallFinished:
"""A tool invocation completed.
``duration`` is wall-clock seconds. ``ok`` reflects whether the tool
returned without raising. The gateway uses this to clear/settle a progress
bubble and to drive one-time onboarding hints (e.g. suggest /verbose after a
long tool run). No tool *output* travels here — output is the agent's
concern and is persisted to history, not streamed as presentation.
"""
tool_name: str
duration: float = 0.0
ok: bool = True
index: int = 0
# ── Gateway control / lifecycle events ───────────────────────────────────────
@dataclass(frozen=True)
class LongToolHint:
"""One-shot onboarding nudge when a tool runs longer than the threshold.
The gateway gates this on platform capability (the /verbose command must be
usable) and on the user not having seen the hint before. Modeled as an
event so the *gateway* owns the "should I surface this here?" decision rather
than the agent.
"""
tool_name: str = ""
duration: float = 0.0
@dataclass(frozen=True)
class GatewayNotice:
"""A gateway-originated control message (restart, online, long-run notice).
``kind`` is a stable string the adapter can switch on
(``"restart"`` / ``"online"`` / ``"long_run"`` / …). ``text`` is the
human-readable default the base class renders when an adapter has no
platform-specific treatment.
"""
kind: str
text: str = ""
extra: Dict[str, Any] = field(default_factory=dict)
# Union of every event the consumer's dispatcher accepts. Kept explicit (rather
# than a marker base class) so a missing ``case`` in an exhaustive match is a
# visible type error rather than a silent fall-through.
StreamEvent = Union[
MessageChunk,
MessageStop,
Commentary,
ToolCallChunk,
ToolCallFinished,
LongToolHint,
GatewayNotice,
]
__all__ = [
"MessageChunk",
"MessageStop",
"Commentary",
"ToolCallChunk",
"ToolCallFinished",
"LongToolHint",
"GatewayNotice",
"StreamEvent",
]

View File

@ -164,9 +164,12 @@ class TestSessionResetPolicy:
class TestStreamingConfig:
def test_defaults_to_edit_transport(self):
def test_defaults_to_auto_transport(self):
# "auto" prefers native draft streaming where the platform supports
# it (Telegram DMs) and falls back to edit-based everywhere else, so
# it is safe as the global out-of-the-box default.
restored = StreamingConfig.from_dict({"enabled": "true"})
assert restored.transport == "edit"
assert restored.transport == "auto"
def test_from_dict_coerces_quoted_false_enabled(self):
restored = StreamingConfig.from_dict({"enabled": "false"})

View File

@ -0,0 +1,182 @@
"""Structured stream-event protocol + dispatcher behavior.
Covers the agent→gateway delivery contract introduced to decouple *what
happened* (typed events) from *how it's delivered* (adapter decides). The
default BasePlatformAdapter rendering must reproduce today's behavior exactly;
an adapter may override format_tool_event to eat tool chrome on platforms that
can't render it.
"""
from __future__ import annotations
from unittest.mock import MagicMock
from gateway.stream_dispatch import GatewayEventDispatcher
from gateway.stream_events import (
Commentary,
GatewayNotice,
LongToolHint,
MessageChunk,
MessageStop,
ToolCallChunk,
ToolCallFinished,
)
def _base_adapter():
"""A real BasePlatformAdapter instance (abstractmethods cleared) so we
exercise the genuine default render hooks, not a mock."""
from gateway.platforms.base import BasePlatformAdapter
Concrete = type("Concrete", (BasePlatformAdapter,), {})
Concrete.__abstractmethods__ = frozenset()
return Concrete.__new__(Concrete)
class _FakeSink:
def __init__(self):
self.deltas = []
self.commentary = []
self.segment_breaks = 0
def on_delta(self, text):
self.deltas.append(text)
def on_commentary(self, text):
self.commentary.append(text)
def on_segment_break(self):
self.segment_breaks += 1
# ── Message events → sink ────────────────────────────────────────────────────
def test_message_chunk_flows_to_sink_on_delta():
sink = _FakeSink()
d = GatewayEventDispatcher(_base_adapter(), sink)
d.dispatch(MessageChunk("hello "))
d.dispatch(MessageChunk("world"))
assert sink.deltas == ["hello ", "world"]
def test_intermediate_message_stop_breaks_segment_but_final_does_not():
sink = _FakeSink()
d = GatewayEventDispatcher(_base_adapter(), sink)
d.dispatch(MessageStop(final=False))
d.dispatch(MessageStop(final=True))
assert sink.segment_breaks == 1 # only the non-final stop breaks
def test_commentary_flows_to_sink():
sink = _FakeSink()
d = GatewayEventDispatcher(_base_adapter(), sink)
d.dispatch(Commentary("I'll inspect the repo first."))
assert sink.commentary == ["I'll inspect the repo first."]
def test_message_events_dropped_when_no_sink():
# streaming disabled → no sink → message events are no-ops, no crash.
d = GatewayEventDispatcher(_base_adapter(), sink=None)
d.dispatch(MessageChunk("x")) # must not raise
# ── Tool events → progress queue, formatted by adapter ───────────────────────
def test_tool_call_chunk_renders_default_chrome():
lines = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(),
enqueue_tool_line=lines.append, tool_mode="all",
)
d.dispatch(ToolCallChunk(tool_name="terminal", preview="ls -la"))
assert len(lines) == 1
assert "terminal" in lines[0]
assert "ls -la" in lines[0]
def test_tool_preview_truncated_to_cap():
lines = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(),
enqueue_tool_line=lines.append, tool_mode="all", preview_max_len=10,
)
d.dispatch(ToolCallChunk(tool_name="x", preview="0123456789ABCDEF"))
# capped at 10 → 7 chars + "..." (then wrapped in quotes by the renderer)
assert '"0123456..."' in lines[0]
assert "89ABCDEF" not in lines[0]
def test_new_mode_dedups_same_tool():
lines = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(),
enqueue_tool_line=lines.append, tool_mode="new",
)
d.dispatch(ToolCallChunk(tool_name="terminal", preview="a"))
d.dispatch(ToolCallChunk(tool_name="terminal", preview="b")) # deduped
d.dispatch(ToolCallChunk(tool_name="read_file", preview="c"))
assert len(lines) == 2 # terminal once, read_file once
def test_off_mode_emits_nothing():
lines = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(),
enqueue_tool_line=lines.append, tool_mode="off",
)
d.dispatch(ToolCallChunk(tool_name="terminal", preview="ls"))
assert lines == []
def test_adapter_can_eat_tool_chrome():
"""An adapter that returns None from format_tool_event drops the event —
the 'iMessage can't render tool chrome' case."""
adapter = _base_adapter()
adapter.format_tool_event = lambda event, **kw: None # eat everything
lines = []
d = GatewayEventDispatcher(
adapter, _FakeSink(), enqueue_tool_line=lines.append, tool_mode="all",
)
d.dispatch(ToolCallChunk(tool_name="terminal", preview="ls"))
assert lines == [] # eaten
def test_tool_finished_emits_no_chrome():
lines = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(),
enqueue_tool_line=lines.append, tool_mode="all",
)
d.dispatch(ToolCallFinished(tool_name="terminal", duration=2.0, ok=True))
assert lines == []
# ── Control events → gateway-owned hooks ─────────────────────────────────────
def test_long_tool_hint_routes_to_hook():
seen = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(), on_long_tool=seen.append,
)
d.dispatch(LongToolHint(tool_name="terminal", duration=45.0))
assert len(seen) == 1
assert seen[0].tool_name == "terminal"
def test_gateway_notice_routes_to_hook():
seen = []
d = GatewayEventDispatcher(
_base_adapter(), _FakeSink(), on_notice=seen.append,
)
d.dispatch(GatewayNotice(kind="restart", text="Gateway restarted"))
assert seen[0].kind == "restart"
def test_dispatch_swallows_render_errors():
"""A render error must never propagate into the agent worker thread."""
adapter = _base_adapter()
def _boom(event, sink):
raise RuntimeError("render blew up")
adapter.render_message_event = _boom
d = GatewayEventDispatcher(adapter, _FakeSink())
d.dispatch(MessageChunk("x")) # must not raise

View File

@ -0,0 +1,114 @@
"""TelegramAdapter.send_draft MarkdownV2 formatting parity.
Bot API 9.5 ``sendMessageDraft`` powers the animated streaming preview in
DMs. The regular ``send`` path renders with MarkdownV2, so the draft must
too — otherwise the live preview streams as raw text and the final
``sendMessage`` snaps into formatted output, producing a jarring visual
shift at the end of the response (reported by an external user, May 2026).
These tests pin:
1. The happy path passes ``parse_mode=MARKDOWN_V2`` with format_message'd
text (formatting parity with the final message).
2. A MarkdownV2 BadRequest triggers a single plain-text retry rather than
killing draft streaming for the whole response.
3. A non-BadRequest failure propagates so the caller falls back to edit.
"""
import sys
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
mod = MagicMock()
mod.error.NetworkError = type("NetworkError", (OSError,), {})
mod.error.TimedOut = type("TimedOut", (OSError,), {})
mod.error.BadRequest = type("BadRequest", (Exception,), {})
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)
sys.modules.setdefault("telegram.error", mod.error)
_ensure_telegram_mock()
from gateway.platforms import telegram as tg_mod # noqa: E402
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
def _make_adapter() -> TelegramAdapter:
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
adapter._bot = MagicMock()
adapter._bot.send_message_draft = AsyncMock(return_value=True)
return adapter
@pytest.mark.asyncio
async def test_send_draft_passes_markdownv2_parse_mode():
"""Happy path: draft is sent with parse_mode set and format_message'd text."""
adapter = _make_adapter()
# Make format_message observable and deterministic.
adapter.format_message = lambda c: f"FMT::{c}"
result = await adapter.send_draft("123", 7, "**bold** body")
assert result.success is True
adapter._bot.send_message_draft.assert_awaited_once()
kwargs = adapter._bot.send_message_draft.await_args.kwargs
assert kwargs["text"] == "FMT::**bold** body"
assert kwargs["parse_mode"] is tg_mod.ParseMode.MARKDOWN_V2
assert kwargs["chat_id"] == 123
assert kwargs["draft_id"] == 7
@pytest.mark.asyncio
async def test_send_draft_falls_back_to_plain_text_on_markdownv2_error():
"""A MarkdownV2 BadRequest retries once as plain text (no parse_mode),
instead of aborting draft streaming for the whole response."""
adapter = _make_adapter()
adapter.format_message = lambda content: f"FMT::{content}"
# Resolve the BadRequest type the adapter checks via _is_bad_request_error.
from telegram.error import BadRequest # type: ignore
calls = []
async def _draft(**kwargs):
calls.append(kwargs)
if "parse_mode" in kwargs:
raise BadRequest("can't parse entities")
return True
adapter._bot.send_message_draft = AsyncMock(side_effect=_draft)
result = await adapter.send_draft("123", 9, "weird _text")
assert result.success is True
# First attempt: MarkdownV2; second attempt: plain text, no parse_mode.
assert len(calls) == 2
assert "parse_mode" in calls[0]
assert "parse_mode" not in calls[1]
assert calls[1]["text"] == "weird _text" # raw, unformatted
@pytest.mark.asyncio
async def test_send_draft_non_badrequest_propagates_without_retry():
"""A non-BadRequest failure (e.g. drafts not allowed) returns failure
immediately so the caller falls back to the edit transport."""
adapter = _make_adapter()
adapter.format_message = lambda c: f"FMT::{c}"
calls = []
async def _draft(**kwargs):
calls.append(kwargs)
raise RuntimeError("drafts disabled for this chat")
adapter._bot.send_message_draft = AsyncMock(side_effect=_draft)
result = await adapter.send_draft("123", 11, "hi")
assert result.success is False
assert len(calls) == 1 # no plain-text retry on non-BadRequest