From ca03486b6a5a86e2be28d83d4cad61770619e7fb Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 31 May 2026 00:19:39 -0700 Subject: [PATCH] fix(streaming): stop duplicating tool-call args from cumulative-resend providers (#35718) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DeepSeek / Baidu Qianfan stream tool-call arguments in cumulative mode: each chunk resends the full arguments-so-far instead of the new fragment. The stream accumulator blindly concatenated arg deltas with +=, turning that into '{...}{...}{...}', which failed json.loads and got nuked to '{}' — a silently corrupted tool call (#35592). Worse on multi-param tools (search_files, session_search, memory replace) because longer args take more chunks, giving more resend opportunities. - Per-slot cumulative latch in the stream accumulator: a delta that is a strict superset of the accumulated buffer marks the slot cumulative and replaces (not appends); exact duplicates are dropped only after latching. Incremental fragments are untouched (default += path). - Backstop _collapse_repeated_json_arguments() in the repair pipeline collapses pure identical-resend buffers (K exact repeats of a valid-JSON unit) for providers that resend the complete object from chunk 1. Only reached after json.loads already failed, so compliant single objects are never touched. Not a gateway or DeepSeek-model bug — any OpenAI-wire provider in cumulative streaming mode is affected. 
--- agent/chat_completion_helpers.py | 45 +++++- agent/message_sanitization.py | 67 +++++++++ .../test_repair_tool_call_arguments.py | 47 ++++++ tests/run_agent/test_streaming.py | 137 ++++++++++++++++++ 4 files changed, 295 insertions(+), 1 deletion(-) diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index cc7427950..0c051cd66 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -1750,6 +1750,12 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= # call starting at the same index and redirect it to a fresh slot. _last_id_at_idx: dict = {} # raw_index -> last seen non-empty id _active_slot_by_idx: dict = {} # raw_index -> current slot in tool_calls_acc + # Per-slot latch: set once a slot is positively identified as a + # cumulative-resend stream (a delta that is a strict superset of the + # accumulated buffer). Until latched, deltas are appended normally; + # after latching, the buffer is replaced and exact-duplicate deltas + # are dropped. See the argument-accumulation block below (#35592). + _cumulative_args_slot: set = set() finish_reason = None model_name = None role = "assistant" @@ -1867,7 +1873,44 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= # Vercel AI patterns) is immune to this. entry["function"]["name"] = tc_delta.function.name if tc_delta.function.arguments: - entry["function"]["arguments"] += tc_delta.function.arguments + # Argument deltas are normally incremental + # fragments (OpenAI spec), so the default is to + # concatenate. But some OpenAI-compatible + # providers (DeepSeek / Baidu Qianfan, #35592) + # operate in *cumulative* mode: each chunk + # resends the full arguments-so-far instead of + # the new fragment. Blind += turns that into + # '{...}{...}{...}', corrupting the tool call. 
+ # + # Detect cumulative mode per-slot: in cumulative + # mode the new delta is a superset that starts + # with everything accumulated so far (monotonic + # growth), and an exact resend equals it. + # Incremental fragments are JSON suffixes that do + # NOT restate the accumulated prefix, so this is + # unambiguous on the full buffer (not a partial + # per-chunk guess). + _new = tc_delta.function.arguments + _prev = entry["function"]["arguments"] + if not _prev: + entry["function"]["arguments"] = _new + elif len(_new) > len(_prev) and _new.startswith(_prev): + # Strict superset of the accumulated buffer — + # the unambiguous cumulative-resend signature. + # Latch the slot and replace (don't append). + _cumulative_args_slot.add(idx) + entry["function"]["arguments"] = _new + elif idx in _cumulative_args_slot and _new == _prev: + # Already a confirmed cumulative slot and this + # is an exact full resend — drop the duplicate. + pass + else: + # Incremental fragment — normal append. Note + # an exact-equal delta on a NON-latched slot is + # treated as a real fragment, never silently + # dropped, so genuine incremental streams are + # untouched. + entry["function"]["arguments"] = _prev + _new extra = getattr(tc_delta, "extra_content", None) if extra is None and hasattr(tc_delta, "model_extra"): extra = (tc_delta.model_extra or {}).get("extra_content") diff --git a/agent/message_sanitization.py b/agent/message_sanitization.py index ff53d247a..ce65cf80d 100644 --- a/agent/message_sanitization.py +++ b/agent/message_sanitization.py @@ -182,6 +182,58 @@ def _escape_invalid_chars_in_json_strings(raw: str) -> str: return "".join(out) +def _collapse_repeated_json_arguments(raw_stripped: str) -> str | None: + """Collapse concatenated-duplicate tool-call argument JSON. + + Some OpenAI-compatible streaming providers (DeepSeek / Baidu Qianfan + among them) resend the *full* cumulative ``arguments`` string in every + stream chunk instead of incremental fragments. 
Our stream accumulator + concatenates argument deltas with ``+=`` (correct for spec-compliant + incremental providers), so a cumulative-resend provider yields the + object repeated once per chunk: + + '{"path":"x"}{"path":"x"}{"path":"x"}...' + + A spec-compliant provider always produces exactly one valid JSON object + after concatenation, so this function is only ever reached for strings + that already fail ``json.loads`` (issue #35592). + + Detection is unambiguous and operates on the fully-accumulated string, + not on partial per-chunk data: the input must be K>=2 *exact* repeats of + a unit substring that itself parses as valid JSON. Returns the single + collapsed unit (re-serialised compactly) when that holds, else ``None`` + so the caller falls through to the generic repair passes. + + Safety: a single object like ``{"command":{"command":"x"}}`` parses on + the first ``json.loads`` attempt in the caller and never reaches here; + even if it did, it is not an exact-repeat string, so this returns + ``None`` and leaves it untouched. + """ + n = len(raw_stripped) + if n < 2: + return None + # The unit must divide the total length evenly for an exact K-repeat. + # Try the smallest plausible unit first (divisors of n, ascending); + # non-divisor lengths are skipped before any string comparison is made. + for unit_len in range(1, n // 2 + 1): + if n % unit_len != 0: + continue + k = n // unit_len + if k < 2: + continue + unit = raw_stripped[:unit_len] + if unit * k != raw_stripped: + continue + # The repeated unit must itself be valid JSON for this to be a + # cumulative-resend collapse and not a coincidental string repeat. + try: + parsed = json.loads(unit, strict=False) + except (json.JSONDecodeError, ValueError): + continue + return json.dumps(parsed, separators=(",", ":")) + return None + + def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str: """Attempt to repair malformed tool_call argument JSON. 
@@ -220,6 +272,20 @@ def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str: except (json.JSONDecodeError, TypeError, ValueError): pass + # Repair pass 1: collapse concatenated-duplicate JSON. Cumulative-resend + # streaming providers (DeepSeek / Baidu Qianfan, #35592) yield the full + # arguments object repeated once per stream chunk; our += accumulator + # turns that into '{...}{...}{...}'. Only reached when pass 0 above + # already failed, so spec-compliant single objects are never touched. + collapsed = _collapse_repeated_json_arguments(raw_stripped) + if collapsed is not None: + logger.warning( + "Collapsed cumulative-resend duplicate tool_call arguments for " + "%s (%d chars -> %d)", + tool_name, len(raw_stripped), len(collapsed), + ) + return collapsed + # Attempt common JSON repairs fixed = raw_stripped # 1. Strip trailing commas before } or ] @@ -436,6 +502,7 @@ __all__ = [ "_sanitize_messages_surrogates", "_escape_invalid_chars_in_json_strings", "_repair_tool_call_arguments", + "_collapse_repeated_json_arguments", "_strip_non_ascii", "_sanitize_messages_non_ascii", "_sanitize_tools_non_ascii", diff --git a/tests/run_agent/test_repair_tool_call_arguments.py b/tests/run_agent/test_repair_tool_call_arguments.py index dcd98b5ac..e588782e4 100644 --- a/tests/run_agent/test_repair_tool_call_arguments.py +++ b/tests/run_agent/test_repair_tool_call_arguments.py @@ -140,3 +140,50 @@ class TestRepairToolCallArguments: parsed = json.loads(result) assert "line" in parsed["msg"] + # -- Stage 1b: cumulative-resend duplicate collapse (#35592) -- + + def test_collapse_duplicate_object_2x(self): + """Two concatenated identical objects collapse to one.""" + raw = '{"path":"x"}{"path":"x"}' + result = _repair_tool_call_arguments(raw, "search_files") + assert json.loads(result) == {"path": "x"} + + def test_collapse_duplicate_object_many(self): + """K concatenated identical objects collapse to one.""" + raw = '{"a":1,"b":2}' * 100 + result = 
_repair_tool_call_arguments(raw, "t") + assert json.loads(result) == {"a": 1, "b": 2} + + def test_collapse_nested_object_repeat(self): + """Repeated nested-key object collapses correctly.""" + raw = '{"command":{"command":"x"}}' * 3 + result = _repair_tool_call_arguments(raw, "t") + assert json.loads(result) == {"command": {"command": "x"}} + + def test_single_object_not_touched_by_collapse(self): + """A clean single object never enters the collapse path.""" + raw = '{"path": "x.py"}' + result = _repair_tool_call_arguments(raw, "t") + assert json.loads(result) == {"path": "x.py"} + + def test_single_nested_object_not_corrupted(self): + """Nested-key single object is NOT mistaken for a repeat (safety).""" + raw = '{"command":{"command":"x"}}' + result = _repair_tool_call_arguments(raw, "t") + assert json.loads(result) == {"command": {"command": "x"}} + + def test_two_different_objects_not_collapsed(self): + """Distinct concatenated objects are not a clean repeat — collapse + declines and the generic repair handles it (returns '{}').""" + from agent.message_sanitization import _collapse_repeated_json_arguments + assert _collapse_repeated_json_arguments('{"a":1}{"b":2}') is None + + def test_collapse_helper_returns_none_for_valid_single(self): + from agent.message_sanitization import _collapse_repeated_json_arguments + assert _collapse_repeated_json_arguments('{"a":1}') is None + + def test_collapse_helper_returns_none_for_repeated_non_json(self): + from agent.message_sanitization import _collapse_repeated_json_arguments + # 'abab' repeats 'ab' but 'ab' is not valid JSON. 
+ assert _collapse_repeated_json_arguments('abab') is None + diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index 79c241adf..cc9b2260f 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -184,6 +184,143 @@ class TestStreamingAccumulator: assert tc[0].function.name == "read_file" assert tc[0].function.arguments == '{"path": "x.py"}' + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_args_not_duplicated_on_cumulative_growing_resend(self, mock_close, mock_create): + """DeepSeek / Baidu Qianfan stream args in cumulative mode (#35592). + + Each chunk resends the full arguments-so-far (monotonic growth) + instead of the new fragment. Blind += produced a duplicated + '{...}{...}' string that failed json.loads and got nuked to '{}'. + The per-slot cumulative latch must replace (not append) so the + final arguments are a single valid object. + """ + import json as _json + from run_agent import AIAgent + + full = '{"pattern": "def handle", "path": "gateway", "output_mode": "content"}' + # Growing prefixes, then identical full resends at the tail. 
+ steps = [full[:12], full[:30], full[:48], full, full, full] + chunks = [_make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_qf", name="search_files") + ])] + for s in steps: + chunks.append(_make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_qf", name="search_files", arguments=s) + ])) + chunks.append(_make_stream_chunk(finish_reason="tool_calls")) + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://qianfan.baidubce.com/v2", + model="deepseek-v4-pro", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + tc = response.choices[0].message.tool_calls + assert tc is not None and len(tc) == 1 + assert _json.loads(tc[0].function.arguments) == _json.loads(full) + # Not flagged as truncated (which would set finish_reason='length'). + assert response.choices[0].finish_reason == "tool_calls" + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_args_identical_full_resend_collapsed_by_backstop(self, mock_close, mock_create): + """Provider resends the COMPLETE object identically from chunk 1. + + There is never a strict superset to latch on, so the per-chunk + guard appends — but the post-loop _collapse_repeated_json_arguments + backstop must collapse the K-repeat to one valid object (#35592). 
+ """ + import json as _json + from run_agent import AIAgent + + full = '{"action": "replace", "old_text": "a", "new_text": "b"}' + chunks = [_make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_id", name="memory") + ])] + for _ in range(5): + chunks.append(_make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_id", name="memory", arguments=full) + ])) + chunks.append(_make_stream_chunk(finish_reason="tool_calls")) + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://qianfan.baidubce.com/v2", + model="deepseek-v4-pro", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + tc = response.choices[0].message.tool_calls + assert tc is not None and len(tc) == 1 + assert _json.loads(tc[0].function.arguments) == _json.loads(full) + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_incremental_args_with_duplicate_leading_fragment(self, mock_close, mock_create): + """Genuine incremental stream whose 2nd fragment equals the 1st. + + Safety guard: an exact-equal delta on a NON-latched slot must be + treated as a real fragment and appended, never silently dropped. + Verifies the cumulative-resend fix does not corrupt nested-key + objects (the false-positive class flagged in review of #35592). + """ + from run_agent import AIAgent + + # Fragments deliberately repeat the leading '{"command":' substring. 
+ frags = ['{"command":', '{"command":', ' "x"}', '}'] + chunks = [_make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="c3", name="t") + ])] + for f in frags: + chunks.append(_make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="c3", name="t", arguments=f) + ])) + chunks.append(_make_stream_chunk(finish_reason="tool_calls")) + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + tc = response.choices[0].message.tool_calls + assert tc is not None and len(tc) == 1 + # All four fragments concatenated — nothing dropped. + assert tc[0].function.arguments == '{"command":{"command": "x"}}' + @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") def test_tool_call_extra_content_preserved(self, mock_close, mock_create):