fix(gateway): scope final-delivery flags to turn-final segment (#29346)

A streamed preamble ("Let me search...") finalized at a tool boundary
routed through _try_fresh_final, which unconditionally set
_final_response_sent=True even though it is a NON-final segment. The
gateway then reads that flag as "final delivered" and suppresses the
genuine final answer produced on the next API call, so the user silently
gets nothing. Only reproduces with fresh_final_after_seconds > 0.

- _try_fresh_final / _send_or_edit take is_turn_final; the segment-break
  call site passes is_turn_final=got_done so only the turn-final answer
  marks final-delivered.
- _reset_segment_state clears the final-delivery flags at every tool
  boundary as defense-in-depth against any future premature setter.
- Failing-first regression + happy-path no-duplicate test.
This commit is contained in:
firefly
2026-05-28 23:23:31 -04:00
committed by Teknium
parent 92273e4f57
commit 8bf498c21d
2 changed files with 197 additions and 4 deletions

View File

@ -261,6 +261,12 @@ class GatewayStreamConsumer:
self._last_sent_text = ""
self._fallback_final_send = False
self._fallback_prefix = ""
# #29346: a tool/segment boundary means what we delivered was an interim
# preamble, not the final answer — clear the flags so a premature setter
# can't fool the gateway. Safe: got_done returns before any reset, and
# run.py reads these only after the consumer task exits.
self._final_response_sent = False
self._final_content_delivered = False
# Native draft streaming: bump the draft_id so the next text segment
# animates as a fresh preview below the tool-progress bubbles, not
# over the prior segment's already-finalized draft. This is how
@ -549,6 +555,9 @@ class GatewayStreamConsumer:
current_update_visible = await self._send_or_edit(
display_text,
finalize=(got_done or got_segment_break),
# A segment-break finalize closes a preamble, not the
# turn-final answer — only got_done marks delivered (#29346).
is_turn_final=got_done,
)
self._last_edit_time = time.monotonic()
@ -1058,12 +1067,17 @@ class GatewayStreamConsumer:
age = time.monotonic() - self._message_created_ts
return age >= threshold
async def _try_fresh_final(self, text: str) -> bool:
async def _try_fresh_final(self, text: str, *, is_turn_final: bool = True) -> bool:
"""Send ``text`` as a brand-new message (best-effort delete the old
preview) so the platform's visible timestamp reflects completion
time. Returns True on successful delivery, False on any failure so
the caller falls back to the normal edit path.
``is_turn_final`` is False when finalizing an interim segment at a tool
boundary (a preamble) rather than the turn-final answer; the
final-delivery flag is then left unset so the gateway still delivers the
real answer from the next API call (#29346).
Ported from openclaw/openclaw#72038.
"""
old_message_id = self._message_id
@ -1108,10 +1122,13 @@ class GatewayStreamConsumer:
self._message_created_ts = None
self._already_sent = True
self._last_sent_text = text
self._final_response_sent = True
if is_turn_final:
self._final_response_sent = True
return True
async def _send_or_edit(self, text: str, *, finalize: bool = False) -> bool:
async def _send_or_edit(
self, text: str, *, finalize: bool = False, is_turn_final: bool = True,
) -> bool:
"""Send or edit the streaming message.
Returns True if the text was successfully delivered (sent or edited),
@ -1205,7 +1222,9 @@ class GatewayStreamConsumer:
if (
finalize
and self._should_send_fresh_final()
and await self._try_fresh_final(text)
and await self._try_fresh_final(
text, is_turn_final=is_turn_final,
)
):
return True
# Edit existing message

View File

@ -10,6 +10,7 @@ time instead of first-token time.
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
@ -173,6 +174,179 @@ class TestFreshFinalForLongLivedPreviews:
assert consumer._should_send_fresh_final() is False
class TestSegmentBreakDoesNotMarkFinalSent:
"""Regression for #29346 — silent response loss after tool calls.
When ``fresh_final_after_seconds > 0`` and a streamed *preamble* ("Let me
search…") has aged past the threshold, finalizing it at a tool boundary
used to route through ``_try_fresh_final``, which unconditionally set
``_final_response_sent = True`` even though this is a NON-final segment.
The gateway (run.py:18128) then reads that flag as "final delivered" and
suppresses the genuine final answer (which arrives on a later API call and
does not re-stream), so the user gets nothing.
The fix scopes the final-delivery flags to the turn-final segment and
clears them at every tool/segment boundary, so a preamble can never mark
the turn as delivered.
"""
@staticmethod
def _delivered_texts(adapter) -> list[str]:
"""Every text the adapter actually put on screen (sends + edits)."""
texts = [c.kwargs.get("content", "") for c in adapter.send.call_args_list]
texts += [c.kwargs.get("content", "") for c in adapter.edit_message.call_args_list]
return texts
@pytest.mark.asyncio
async def test_preamble_fresh_final_at_tool_boundary_does_not_mark_final(self):
"""Real-aging reproduction (exercises the actual _should_send_fresh_final
age gate, not a monkeypatch): a preamble ages past the threshold, then a
tool boundary finalizes it via fresh-final. The genuine final answer is
produced on a later API call and is NOT streamed through this consumer
(the #29346 repro), so the consumer must NOT believe the final was sent."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.001, # tiny → real aging fires
),
)
consumer.on_delta("Let me search the web for that.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05) # preamble sent + aged well past 0.001s
consumer.on_delta(None) # tool boundary → segment-break fresh-final
await asyncio.sleep(0.05)
consumer.finish()
await task
# Fresh-final actually engaged (preamble preview + a fresh resend), yet
# the turn is NOT marked delivered — no genuine final ever streamed.
assert adapter.send.call_count >= 2
assert consumer.final_response_sent is False
assert consumer.final_content_delivered is False
@pytest.mark.asyncio
async def test_final_answer_after_preamble_is_delivered_exactly_once(self):
"""P0 user-visible contract: when the real final answer DOES stream in
after the preamble + tool boundary, the user gets it exactly once AND
the consumer marks it delivered (so the gateway correctly suppresses a
redundant send)."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.001,
),
)
consumer.on_delta("Let me search the web for that.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta(None) # tool boundary
consumer.on_delta("The answer is 42.") # genuine final answer streams
await asyncio.sleep(0.05)
consumer.finish()
await task
# The real final answer was delivered → suppression must engage.
assert consumer.final_response_sent is True
# And it reached the user exactly once (no duplicate fresh send).
final_sends = [
c for c in adapter.send.call_args_list
if "answer is 42" in c.kwargs.get("content", "")
]
assert len(final_sends) <= 1
assert any("answer is 42" in t for t in self._delivered_texts(adapter))
@pytest.mark.asyncio
async def test_genuine_final_answer_without_tools_marks_delivered(self):
"""P1 happy path: a single answer streamed straight to completion (no
tool boundary) still sets final_response_sent so the gateway suppresses
the redundant final send."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=60.0,
),
)
consumer.on_delta("Here is the full answer.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.finish()
await task
assert consumer.final_response_sent is True
assert any("Here is the full answer." in t for t in self._delivered_texts(adapter))
@pytest.mark.asyncio
async def test_no_edit_adapter_delivers_final_after_preamble(self):
"""No-edit adapters (Signal/SMS/webhook → __no_edit__) accumulate and
deliver rather than fresh-final. A preamble before a tool call must not
swallow the genuine final answer — it must reach the user."""
adapter = _make_adapter()
adapter.send.return_value = SimpleNamespace(success=True, message_id=None)
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.001,
),
)
consumer.on_delta("Let me search the web for that.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta(None) # tool boundary
consumer.on_delta("The answer is 42.") # genuine final answer
await asyncio.sleep(0.05)
consumer.finish()
await task
# The final answer reached the user, not swallowed by the preamble.
assert any(
"answer is 42" in c.kwargs.get("content", "")
for c in adapter.send.call_args_list
)
@pytest.mark.asyncio
async def test_multi_tool_call_turn_delivers_final_once(self):
"""Two tool boundaries before the final answer: flags stay clear across
both boundaries and the genuine final is delivered exactly once and
marked sent."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(
edit_interval=0.01, buffer_threshold=5, cursor="",
fresh_final_after_seconds=0.001,
),
)
consumer.on_delta("Let me check a couple of things.")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta(None) # tool boundary 1
consumer.on_delta("Now cross-referencing.")
await asyncio.sleep(0.05)
consumer.on_delta(None) # tool boundary 2
consumer.on_delta("The answer is 42.") # genuine final answer
await asyncio.sleep(0.05)
consumer.finish()
await task
assert consumer.final_response_sent is True
final_sends = [
c for c in adapter.send.call_args_list
if "answer is 42" in c.kwargs.get("content", "")
]
assert len(final_sends) <= 1
assert any("answer is 42" in t for t in self._delivered_texts(adapter))
class TestStreamConsumerConfigFreshFinalField:
"""The dataclass field must exist and default to 0 (disabled)."""