PR #35718 added a per-slot "cumulative-resend" latch to the universal
streaming tool-call accumulator to fix DeepSeek / Baidu Qianfan (#35592).
The latch fires when a delta is a strict superset of the accumulated
buffer (len(_new) > len(_prev) and _new.startswith(_prev)) and then
REPLACES the buffer instead of appending.
That superset test is not an unambiguous cumulative signature. A normal
incremental stream can emit a single fragment that restates an already-
accumulated prefix — trivially common in large code-patch arguments with
repeated lines / indentation — which trips the latch and clobbers the
accumulated buffer, corrupting the tool call. Observed in the wild on
Anthropic Opus (the primary model) building a large patch: corrupted /
short arguments → finish_reason='length' dead-end → session killed.
A guessing heuristic that can silently clobber a tool-call buffer has no
place on the path every provider and model shares. Reverting restores the
known-good plain `+=` accumulator. The narrow provider bug from #35592
should be re-fixed behind a provider gate, so that the fix is structurally
unable to touch Anthropic / OpenAI incremental streams, rather than via a
heuristic on the shared path.
Reverts ca03486b6.
This commit is contained in:
@ -1750,12 +1750,6 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||
# call starting at the same index and redirect it to a fresh slot.
|
||||
_last_id_at_idx: dict = {} # raw_index -> last seen non-empty id
|
||||
_active_slot_by_idx: dict = {} # raw_index -> current slot in tool_calls_acc
|
||||
# Per-slot latch: set once a slot is positively identified as a
|
||||
# cumulative-resend stream (a delta that is a strict superset of the
|
||||
# accumulated buffer). Until latched, deltas are appended normally;
|
||||
# after latching, the buffer is replaced and exact-duplicate deltas
|
||||
# are dropped. See the argument-accumulation block below (#35592).
|
||||
_cumulative_args_slot: set = set()
|
||||
finish_reason = None
|
||||
model_name = None
|
||||
role = "assistant"
|
||||
@ -1873,44 +1867,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||
# Vercel AI patterns) is immune to this.
|
||||
entry["function"]["name"] = tc_delta.function.name
|
||||
if tc_delta.function.arguments:
|
||||
# Argument deltas are normally incremental
|
||||
# fragments (OpenAI spec), so the default is to
|
||||
# concatenate. But some OpenAI-compatible
|
||||
# providers (DeepSeek / Baidu Qianfan, #35592)
|
||||
# operate in *cumulative* mode: each chunk
|
||||
# resends the full arguments-so-far instead of
|
||||
# the new fragment. Blind += turns that into
|
||||
# '{...}{...}{...}', corrupting the tool call.
|
||||
#
|
||||
# Detect cumulative mode per-slot: in cumulative
|
||||
# mode the new delta is a superset that starts
|
||||
# with everything accumulated so far (monotonic
|
||||
# growth), and an exact resend equals it.
|
||||
# Incremental fragments are JSON suffixes that do
|
||||
# NOT restate the accumulated prefix, so this is
|
||||
# unambiguous on the full buffer (not a partial
|
||||
# per-chunk guess).
|
||||
_new = tc_delta.function.arguments
|
||||
_prev = entry["function"]["arguments"]
|
||||
if not _prev:
|
||||
entry["function"]["arguments"] = _new
|
||||
elif len(_new) > len(_prev) and _new.startswith(_prev):
|
||||
# Strict superset of the accumulated buffer —
|
||||
# the unambiguous cumulative-resend signature.
|
||||
# Latch the slot and replace (don't append).
|
||||
_cumulative_args_slot.add(idx)
|
||||
entry["function"]["arguments"] = _new
|
||||
elif idx in _cumulative_args_slot and _new == _prev:
|
||||
# Already a confirmed cumulative slot and this
|
||||
# is an exact full resend — drop the duplicate.
|
||||
pass
|
||||
else:
|
||||
# Incremental fragment — normal append. Note
|
||||
# an exact-equal delta on a NON-latched slot is
|
||||
# treated as a real fragment, never silently
|
||||
# dropped, so genuine incremental streams are
|
||||
# untouched.
|
||||
entry["function"]["arguments"] = _prev + _new
|
||||
entry["function"]["arguments"] += tc_delta.function.arguments
|
||||
extra = getattr(tc_delta, "extra_content", None)
|
||||
if extra is None and hasattr(tc_delta, "model_extra"):
|
||||
extra = (tc_delta.model_extra or {}).get("extra_content")
|
||||
|
||||
@ -182,58 +182,6 @@ def _escape_invalid_chars_in_json_strings(raw: str) -> str:
|
||||
return "".join(out)
|
||||
|
||||
|
||||
def _collapse_repeated_json_arguments(raw_stripped: str) -> str | None:
|
||||
"""Collapse concatenated-duplicate tool-call argument JSON.
|
||||
|
||||
Some OpenAI-compatible streaming providers (DeepSeek / Baidu Qianfan
|
||||
among them) resend the *full* cumulative ``arguments`` string in every
|
||||
stream chunk instead of incremental fragments. Our stream accumulator
|
||||
concatenates argument deltas with ``+=`` (correct for spec-compliant
|
||||
incremental providers), so a cumulative-resend provider yields the
|
||||
object repeated once per chunk:
|
||||
|
||||
'{"path":"x"}{"path":"x"}{"path":"x"}...'
|
||||
|
||||
A spec-compliant provider always produces exactly one valid JSON object
|
||||
after concatenation, so this function is only ever reached for strings
|
||||
that already fail ``json.loads`` (issue #35592).
|
||||
|
||||
Detection is unambiguous and operates on the fully-accumulated string,
|
||||
not on partial per-chunk data: the input must be K>=2 *exact* repeats of
|
||||
a unit substring that itself parses as valid JSON. Returns the single
|
||||
collapsed unit (re-serialised compactly) when that holds, else ``None``
|
||||
so the caller falls through to the generic repair passes.
|
||||
|
||||
Safety: a single object like ``{"command":{"command":"x"}}`` parses on
|
||||
the first ``json.loads`` attempt in the caller and never reaches here;
|
||||
even if it did, it is not an exact-repeat string, so this returns
|
||||
``None`` and leaves it untouched.
|
||||
"""
|
||||
n = len(raw_stripped)
|
||||
if n < 2:
|
||||
return None
|
||||
# The unit must divide the total length evenly for an exact K-repeat.
|
||||
# Try the smallest plausible unit first (divisors of n, ascending),
|
||||
# capping the repeat count so a pathological input can't blow up.
|
||||
for unit_len in range(1, n // 2 + 1):
|
||||
if n % unit_len != 0:
|
||||
continue
|
||||
k = n // unit_len
|
||||
if k < 2:
|
||||
continue
|
||||
unit = raw_stripped[:unit_len]
|
||||
if unit * k != raw_stripped:
|
||||
continue
|
||||
# The repeated unit must itself be valid JSON for this to be a
|
||||
# cumulative-resend collapse and not a coincidental string repeat.
|
||||
try:
|
||||
parsed = json.loads(unit, strict=False)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
return json.dumps(parsed, separators=(",", ":"))
|
||||
return None
|
||||
|
||||
|
||||
def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str:
|
||||
"""Attempt to repair malformed tool_call argument JSON.
|
||||
|
||||
@ -272,20 +220,6 @@ def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str:
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
|
||||
# Repair pass 1: collapse concatenated-duplicate JSON. Cumulative-resend
|
||||
# streaming providers (DeepSeek / Baidu Qianfan, #35592) yield the full
|
||||
# arguments object repeated once per stream chunk; our += accumulator
|
||||
# turns that into '{...}{...}{...}'. Only reached when pass 0 above
|
||||
# already failed, so spec-compliant single objects are never touched.
|
||||
collapsed = _collapse_repeated_json_arguments(raw_stripped)
|
||||
if collapsed is not None:
|
||||
logger.warning(
|
||||
"Collapsed cumulative-resend duplicate tool_call arguments for "
|
||||
"%s (%d chars -> %d)",
|
||||
tool_name, len(raw_stripped), len(collapsed),
|
||||
)
|
||||
return collapsed
|
||||
|
||||
# Attempt common JSON repairs
|
||||
fixed = raw_stripped
|
||||
# 1. Strip trailing commas before } or ]
|
||||
@ -502,7 +436,6 @@ __all__ = [
|
||||
"_sanitize_messages_surrogates",
|
||||
"_escape_invalid_chars_in_json_strings",
|
||||
"_repair_tool_call_arguments",
|
||||
"_collapse_repeated_json_arguments",
|
||||
"_strip_non_ascii",
|
||||
"_sanitize_messages_non_ascii",
|
||||
"_sanitize_tools_non_ascii",
|
||||
|
||||
@ -140,50 +140,3 @@ class TestRepairToolCallArguments:
|
||||
parsed = json.loads(result)
|
||||
assert "line" in parsed["msg"]
|
||||
|
||||
# -- Stage 1b: cumulative-resend duplicate collapse (#35592) --
|
||||
|
||||
def test_collapse_duplicate_object_2x(self):
    """An object concatenated with an identical copy repairs to one object."""
    doubled = '{"path":"x"}{"path":"x"}'
    repaired = _repair_tool_call_arguments(doubled, "search_files")
    assert json.loads(repaired) == {"path": "x"}
|
||||
|
||||
def test_collapse_duplicate_object_many(self):
    """One hundred identical concatenated objects repair to a single one."""
    repeated = '{"a":1,"b":2}' * 100
    repaired = _repair_tool_call_arguments(repeated, "t")
    assert json.loads(repaired) == {"a": 1, "b": 2}
|
||||
|
||||
def test_collapse_nested_object_repeat(self):
    """A thrice-repeated object with a nested same-name key repairs to one."""
    repeated = '{"command":{"command":"x"}}' * 3
    repaired = _repair_tool_call_arguments(repeated, "t")
    assert json.loads(repaired) == {"command": {"command": "x"}}
|
||||
|
||||
def test_single_object_not_touched_by_collapse(self):
    """A clean single object comes back from repair with the same content."""
    single = '{"path": "x.py"}'
    repaired = _repair_tool_call_arguments(single, "t")
    assert json.loads(repaired) == {"path": "x.py"}
|
||||
|
||||
def test_single_nested_object_not_corrupted(self):
    """A single object with a nested same-name key is not treated as a repeat."""
    single = '{"command":{"command":"x"}}'
    repaired = _repair_tool_call_arguments(single, "t")
    assert json.loads(repaired) == {"command": {"command": "x"}}
|
||||
|
||||
def test_two_different_objects_not_collapsed(self):
    """The collapse helper declines two distinct concatenated objects."""
    from agent.message_sanitization import _collapse_repeated_json_arguments

    outcome = _collapse_repeated_json_arguments('{"a":1}{"b":2}')
    assert outcome is None
|
||||
|
||||
def test_collapse_helper_returns_none_for_valid_single(self):
    """The collapse helper returns None for one valid, unrepeated object."""
    from agent.message_sanitization import _collapse_repeated_json_arguments

    outcome = _collapse_repeated_json_arguments('{"a":1}')
    assert outcome is None
|
||||
|
||||
def test_collapse_helper_returns_none_for_repeated_non_json(self):
    """A repeated unit that is not JSON is not collapsed."""
    from agent.message_sanitization import _collapse_repeated_json_arguments

    # 'abab' is two copies of 'ab', but 'ab' does not parse as JSON.
    outcome = _collapse_repeated_json_arguments('abab')
    assert outcome is None
|
||||
|
||||
|
||||
@ -184,143 +184,6 @@ class TestStreamingAccumulator:
|
||||
assert tc[0].function.name == "read_file"
|
||||
assert tc[0].function.arguments == '{"path": "x.py"}'
|
||||
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_args_not_duplicated_on_cumulative_growing_resend(self, mock_close, mock_create):
    """Cumulative-mode argument streams must end as one valid object (#35592).

    The mocked stream resends the arguments-so-far in every chunk: three
    growing prefixes followed by three identical copies of the complete
    object. The accumulated arguments must decode to exactly that object,
    and the response must keep finish_reason 'tool_calls'.
    """
    import json as _json
    from run_agent import AIAgent

    full = '{"pattern": "def handle", "path": "gateway", "output_mode": "content"}'
    # Three growing prefixes, then the complete object resent three times.
    resends = [full[:cut] for cut in (12, 30, 48)] + [full] * 3

    def _delta_chunk(**delta_kwargs):
        return _make_stream_chunk(tool_calls=[
            _make_tool_call_delta(index=0, tc_id="call_qf", name="search_files", **delta_kwargs)
        ])

    stream = [_delta_chunk()]
    for payload in resends:
        stream.append(_delta_chunk(arguments=payload))
    stream.append(_make_stream_chunk(finish_reason="tool_calls"))

    fake_client = MagicMock()
    fake_client.chat.completions.create.return_value = iter(stream)
    mock_create.return_value = fake_client

    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://qianfan.baidubce.com/v2",
        "model": "deepseek-v4-pro",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
    }
    agent = AIAgent(**agent_kwargs)
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    response = agent._interruptible_streaming_api_call({})

    first_choice = response.choices[0]
    tool_calls = first_choice.message.tool_calls
    assert tool_calls is not None and len(tool_calls) == 1
    assert _json.loads(tool_calls[0].function.arguments) == _json.loads(full)
    # A truncation flag would have surfaced as finish_reason='length'.
    assert response.choices[0].finish_reason == "tool_calls"
|
||||
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_args_identical_full_resend_collapsed_by_backstop(self, mock_close, mock_create):
    """Identical full resends from the first chunk must end as one object.

    The mocked stream sends the COMPLETE arguments object five times in a
    row, so no chunk is ever a strict superset of what came before. The
    final tool-call arguments must still decode to the single object
    (#35592).
    """
    import json as _json
    from run_agent import AIAgent

    full = '{"action": "replace", "old_text": "a", "new_text": "b"}'

    def _delta_chunk(**delta_kwargs):
        return _make_stream_chunk(tool_calls=[
            _make_tool_call_delta(index=0, tc_id="call_id", name="memory", **delta_kwargs)
        ])

    stream = [_delta_chunk()]
    resend_count = 5
    while resend_count:
        stream.append(_delta_chunk(arguments=full))
        resend_count -= 1
    stream.append(_make_stream_chunk(finish_reason="tool_calls"))

    fake_client = MagicMock()
    fake_client.chat.completions.create.return_value = iter(stream)
    mock_create.return_value = fake_client

    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://qianfan.baidubce.com/v2",
        "model": "deepseek-v4-pro",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
    }
    agent = AIAgent(**agent_kwargs)
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    response = agent._interruptible_streaming_api_call({})

    tool_calls = response.choices[0].message.tool_calls
    assert tool_calls is not None and len(tool_calls) == 1
    assert _json.loads(tool_calls[0].function.arguments) == _json.loads(full)
|
||||
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_incremental_args_with_duplicate_leading_fragment(self, mock_close, mock_create):
    """Incremental stream whose second fragment is identical to the first.

    Every fragment of a genuine incremental stream must be appended, even
    when one exactly equals what has been accumulated so far. The expected
    result is the plain concatenation of all four fragments, which forms a
    nested-key object (the false-positive class raised in review of #35592).
    """
    from run_agent import AIAgent

    # The first two fragments are deliberately the same '{"command":' text.
    frags = ['{"command":', '{"command":', ' "x"}', '}']

    def _delta_chunk(**delta_kwargs):
        return _make_stream_chunk(tool_calls=[
            _make_tool_call_delta(index=0, tc_id="c3", name="t", **delta_kwargs)
        ])

    stream = [_delta_chunk()]
    for fragment in frags:
        stream.append(_delta_chunk(arguments=fragment))
    stream.append(_make_stream_chunk(finish_reason="tool_calls"))

    fake_client = MagicMock()
    fake_client.chat.completions.create.return_value = iter(stream)
    mock_create.return_value = fake_client

    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://openrouter.ai/api/v1",
        "model": "test/model",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
    }
    agent = AIAgent(**agent_kwargs)
    agent.api_mode = "chat_completions"
    agent._interrupt_requested = False

    response = agent._interruptible_streaming_api_call({})

    tool_calls = response.choices[0].message.tool_calls
    assert tool_calls is not None and len(tool_calls) == 1
    # Concatenation of all four fragments, with none of them dropped.
    assert tool_calls[0].function.arguments == '{"command":{"command": "x"}}'
|
||||
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_tool_call_extra_content_preserved(self, mock_close, mock_create):
|
||||
|
||||
Reference in New Issue
Block a user