diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index a06470cbc..16d1cecd3 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -261,6 +261,12 @@ class GatewayStreamConsumer: self._last_sent_text = "" self._fallback_final_send = False self._fallback_prefix = "" + # #29346: a tool/segment boundary means what we delivered was an interim + # preamble, not the final answer — clear the flags so a premature setter + # can't fool the gateway. Safe: got_done returns before any reset, and + # run.py reads these only after the consumer task exits. + self._final_response_sent = False + self._final_content_delivered = False # Native draft streaming: bump the draft_id so the next text segment # animates as a fresh preview below the tool-progress bubbles, not # over the prior segment's already-finalized draft. This is how @@ -549,6 +555,9 @@ class GatewayStreamConsumer: current_update_visible = await self._send_or_edit( display_text, finalize=(got_done or got_segment_break), + # A segment-break finalize closes a preamble, not the + # turn-final answer — only got_done marks delivered (#29346). + is_turn_final=got_done, ) self._last_edit_time = time.monotonic() @@ -1058,12 +1067,17 @@ class GatewayStreamConsumer: age = time.monotonic() - self._message_created_ts return age >= threshold - async def _try_fresh_final(self, text: str) -> bool: + async def _try_fresh_final(self, text: str, *, is_turn_final: bool = True) -> bool: """Send ``text`` as a brand-new message (best-effort delete the old preview) so the platform's visible timestamp reflects completion time. Returns True on successful delivery, False on any failure so the caller falls back to the normal edit path. + ``is_turn_final`` is False when finalizing an interim segment at a tool + boundary (a preamble) rather than the turn-final answer; the + final-delivery flag is then left unset so the gateway still delivers the + real answer from the next API call (#29346). + Ported from openclaw/openclaw#72038. """ old_message_id = self._message_id @@ -1108,10 +1122,13 @@ class GatewayStreamConsumer: self._message_created_ts = None self._already_sent = True self._last_sent_text = text - self._final_response_sent = True + if is_turn_final: + self._final_response_sent = True return True - async def _send_or_edit(self, text: str, *, finalize: bool = False) -> bool: + async def _send_or_edit( + self, text: str, *, finalize: bool = False, is_turn_final: bool = True, + ) -> bool: """Send or edit the streaming message. Returns True if the text was successfully delivered (sent or edited), @@ -1205,7 +1222,9 @@ class GatewayStreamConsumer: if ( finalize and self._should_send_fresh_final() - and await self._try_fresh_final(text) + and await self._try_fresh_final( + text, is_turn_final=is_turn_final, + ) ): return True # Edit existing message diff --git a/tests/gateway/test_stream_consumer_fresh_final.py b/tests/gateway/test_stream_consumer_fresh_final.py index 95f55a211..2ecef4a48 100644 --- a/tests/gateway/test_stream_consumer_fresh_final.py +++ b/tests/gateway/test_stream_consumer_fresh_final.py @@ -10,6 +10,7 @@ time instead of first-token time. from __future__ import annotations +import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock @@ -173,6 +174,179 @@ class TestFreshFinalForLongLivedPreviews: assert consumer._should_send_fresh_final() is False +class TestSegmentBreakDoesNotMarkFinalSent: + """Regression for #29346 — silent response loss after tool calls. + + When ``fresh_final_after_seconds > 0`` and a streamed *preamble* ("Let me + search…") has aged past the threshold, finalizing it at a tool boundary + used to route through ``_try_fresh_final``, which unconditionally set + ``_final_response_sent = True`` even though this is a NON-final segment. + The gateway (run.py:18128) then reads that flag as "final delivered" and + suppresses the genuine final answer (which arrives on a later API call and + does not re-stream), so the user gets nothing. + + The fix scopes the final-delivery flags to the turn-final segment and + clears them at every tool/segment boundary, so a preamble can never mark + the turn as delivered. + """ + + @staticmethod + def _delivered_texts(adapter) -> list[str]: + """Every text the adapter actually put on screen (sends + edits).""" + texts = [c.kwargs.get("content", "") for c in adapter.send.call_args_list] + texts += [c.kwargs.get("content", "") for c in adapter.edit_message.call_args_list] + return texts + + @pytest.mark.asyncio + async def test_preamble_fresh_final_at_tool_boundary_does_not_mark_final(self): + """Real-aging reproduction (exercises the actual _should_send_fresh_final + age gate, not a monkeypatch): a preamble ages past the threshold, then a + tool boundary finalizes it via fresh-final. The genuine final answer is + produced on a later API call and is NOT streamed through this consumer + (the #29346 repro), so the consumer must NOT believe the final was sent.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + fresh_final_after_seconds=0.001, # tiny → real aging fires + ), + ) + consumer.on_delta("Let me search the web for that.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) # preamble sent + aged well past 0.001s + consumer.on_delta(None) # tool boundary → segment-break fresh-final + await asyncio.sleep(0.05) + consumer.finish() + await task + + # Fresh-final actually engaged (preamble preview + a fresh resend), yet + # the turn is NOT marked delivered — no genuine final ever streamed. + assert adapter.send.call_count >= 2 + assert consumer.final_response_sent is False + assert consumer.final_content_delivered is False + + @pytest.mark.asyncio + async def test_final_answer_after_preamble_is_delivered_exactly_once(self): + """P0 user-visible contract: when the real final answer DOES stream in + after the preamble + tool boundary, the user gets it exactly once AND + the consumer marks it delivered (so the gateway correctly suppresses a + redundant send).""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + fresh_final_after_seconds=0.001, + ), + ) + consumer.on_delta("Let me search the web for that.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.on_delta(None) # tool boundary + consumer.on_delta("The answer is 42.") # genuine final answer streams + await asyncio.sleep(0.05) + consumer.finish() + await task + + # The real final answer was delivered → suppression must engage. + assert consumer.final_response_sent is True + # And it reached the user exactly once (no duplicate fresh send). + final_sends = [ + c for c in adapter.send.call_args_list + if "answer is 42" in c.kwargs.get("content", "") + ] + assert len(final_sends) <= 1 + assert any("answer is 42" in t for t in self._delivered_texts(adapter)) + + @pytest.mark.asyncio + async def test_genuine_final_answer_without_tools_marks_delivered(self): + """P1 happy path: a single answer streamed straight to completion (no + tool boundary) still sets final_response_sent so the gateway suppresses + the redundant final send.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + fresh_final_after_seconds=60.0, + ), + ) + consumer.on_delta("Here is the full answer.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.finish() + await task + assert consumer.final_response_sent is True + assert any("Here is the full answer." in t for t in self._delivered_texts(adapter)) + + @pytest.mark.asyncio + async def test_no_edit_adapter_delivers_final_after_preamble(self): + """No-edit adapters (Signal/SMS/webhook → __no_edit__) accumulate and + deliver rather than fresh-final. A preamble before a tool call must not + swallow the genuine final answer — it must reach the user.""" + adapter = _make_adapter() + adapter.send.return_value = SimpleNamespace(success=True, message_id=None) + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + fresh_final_after_seconds=0.001, + ), + ) + consumer.on_delta("Let me search the web for that.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.on_delta(None) # tool boundary + consumer.on_delta("The answer is 42.") # genuine final answer + await asyncio.sleep(0.05) + consumer.finish() + await task + # The final answer reached the user, not swallowed by the preamble. + assert any( + "answer is 42" in c.kwargs.get("content", "") + for c in adapter.send.call_args_list + ) + + @pytest.mark.asyncio + async def test_multi_tool_call_turn_delivers_final_once(self): + """Two tool boundaries before the final answer: flags stay clear across + both boundaries and the genuine final is delivered exactly once and + marked sent.""" + adapter = _make_adapter() + consumer = GatewayStreamConsumer( + adapter=adapter, + chat_id="chat", + config=StreamConsumerConfig( + edit_interval=0.01, buffer_threshold=5, cursor=" ▉", + fresh_final_after_seconds=0.001, + ), + ) + consumer.on_delta("Let me check a couple of things.") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.on_delta(None) # tool boundary 1 + consumer.on_delta("Now cross-referencing.") + await asyncio.sleep(0.05) + consumer.on_delta(None) # tool boundary 2 + consumer.on_delta("The answer is 42.") # genuine final answer + await asyncio.sleep(0.05) + consumer.finish() + await task + + assert consumer.final_response_sent is True + final_sends = [ + c for c in adapter.send.call_args_list + if "answer is 42" in c.kwargs.get("content", "") + ] + assert len(final_sends) <= 1 + assert any("answer is 42" in t for t in self._delivered_texts(adapter)) + + class TestStreamConsumerConfigFreshFinalField: """The dataclass field must exist and default to 0 (disabled)."""