From 2b5268f716c2a69ad451de0baed57138191ebebb Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 31 May 2026 06:14:32 -0700 Subject: [PATCH] revert: drop cumulative-resend tool-arg heuristic from shared streaming path (#35718) (#35860) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #35718 added a per-slot "cumulative-resend" latch to the universal streaming tool-call accumulator to fix DeepSeek / Baidu Qianfan (#35592). The latch fires when a delta is a strict superset of the accumulated buffer (len(_new) > len(_prev) and _new.startswith(_prev)) and then REPLACES the buffer instead of appending. That superset test is not an unambiguous cumulative signature. A normal incremental stream can emit a single fragment that restates an already-accumulated prefix — trivially common in large code-patch arguments with repeated lines / indentation — which trips the latch and clobbers the accumulated buffer, corrupting the tool call. Observed in the wild on Anthropic Opus (the primary model) building a large patch: corrupted / short arguments → finish_reason='length' dead-end → session killed. A guessing heuristic that can silently clobber a tool-call buffer has no place on the path every provider and model shares. Reverting restores the known-good plain `+=` accumulator. The #35592 narrow provider bug should be re-addressed provider-gated so it is structurally impossible to touch Anthropic / OpenAI incremental streams, rather than via a heuristic on the shared path. Reverts ca03486b6. 
--- agent/chat_completion_helpers.py | 45 +----- agent/message_sanitization.py | 67 --------- .../test_repair_tool_call_arguments.py | 47 ------ tests/run_agent/test_streaming.py | 137 ------------------ 4 files changed, 1 insertion(+), 295 deletions(-) diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index 0c051cd66..cc7427950 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -1750,12 +1750,6 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= # call starting at the same index and redirect it to a fresh slot. _last_id_at_idx: dict = {} # raw_index -> last seen non-empty id _active_slot_by_idx: dict = {} # raw_index -> current slot in tool_calls_acc - # Per-slot latch: set once a slot is positively identified as a - # cumulative-resend stream (a delta that is a strict superset of the - # accumulated buffer). Until latched, deltas are appended normally; - # after latching, the buffer is replaced and exact-duplicate deltas - # are dropped. See the argument-accumulation block below (#35592). - _cumulative_args_slot: set = set() finish_reason = None model_name = None role = "assistant" @@ -1873,44 +1867,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= # Vercel AI patterns) is immune to this. entry["function"]["name"] = tc_delta.function.name if tc_delta.function.arguments: - # Argument deltas are normally incremental - # fragments (OpenAI spec), so the default is to - # concatenate. But some OpenAI-compatible - # providers (DeepSeek / Baidu Qianfan, #35592) - # operate in *cumulative* mode: each chunk - # resends the full arguments-so-far instead of - # the new fragment. Blind += turns that into - # '{...}{...}{...}', corrupting the tool call. 
- # - # Detect cumulative mode per-slot: in cumulative - # mode the new delta is a superset that starts - # with everything accumulated so far (monotonic - # growth), and an exact resend equals it. - # Incremental fragments are JSON suffixes that do - # NOT restate the accumulated prefix, so this is - # unambiguous on the full buffer (not a partial - # per-chunk guess). - _new = tc_delta.function.arguments - _prev = entry["function"]["arguments"] - if not _prev: - entry["function"]["arguments"] = _new - elif len(_new) > len(_prev) and _new.startswith(_prev): - # Strict superset of the accumulated buffer — - # the unambiguous cumulative-resend signature. - # Latch the slot and replace (don't append). - _cumulative_args_slot.add(idx) - entry["function"]["arguments"] = _new - elif idx in _cumulative_args_slot and _new == _prev: - # Already a confirmed cumulative slot and this - # is an exact full resend — drop the duplicate. - pass - else: - # Incremental fragment — normal append. Note - # an exact-equal delta on a NON-latched slot is - # treated as a real fragment, never silently - # dropped, so genuine incremental streams are - # untouched. - entry["function"]["arguments"] = _prev + _new + entry["function"]["arguments"] += tc_delta.function.arguments extra = getattr(tc_delta, "extra_content", None) if extra is None and hasattr(tc_delta, "model_extra"): extra = (tc_delta.model_extra or {}).get("extra_content") diff --git a/agent/message_sanitization.py b/agent/message_sanitization.py index ce65cf80d..ff53d247a 100644 --- a/agent/message_sanitization.py +++ b/agent/message_sanitization.py @@ -182,58 +182,6 @@ def _escape_invalid_chars_in_json_strings(raw: str) -> str: return "".join(out) -def _collapse_repeated_json_arguments(raw_stripped: str) -> str | None: - """Collapse concatenated-duplicate tool-call argument JSON. 
- - Some OpenAI-compatible streaming providers (DeepSeek / Baidu Qianfan - among them) resend the *full* cumulative ``arguments`` string in every - stream chunk instead of incremental fragments. Our stream accumulator - concatenates argument deltas with ``+=`` (correct for spec-compliant - incremental providers), so a cumulative-resend provider yields the - object repeated once per chunk: - - '{"path":"x"}{"path":"x"}{"path":"x"}...' - - A spec-compliant provider always produces exactly one valid JSON object - after concatenation, so this function is only ever reached for strings - that already fail ``json.loads`` (issue #35592). - - Detection is unambiguous and operates on the fully-accumulated string, - not on partial per-chunk data: the input must be K>=2 *exact* repeats of - a unit substring that itself parses as valid JSON. Returns the single - collapsed unit (re-serialised compactly) when that holds, else ``None`` - so the caller falls through to the generic repair passes. - - Safety: a single object like ``{"command":{"command":"x"}}`` parses on - the first ``json.loads`` attempt in the caller and never reaches here; - even if it did, it is not an exact-repeat string, so this returns - ``None`` and leaves it untouched. - """ - n = len(raw_stripped) - if n < 2: - return None - # The unit must divide the total length evenly for an exact K-repeat. - # Try the smallest plausible unit first (divisors of n, ascending), - # capping the repeat count so a pathological input can't blow up. - for unit_len in range(1, n // 2 + 1): - if n % unit_len != 0: - continue - k = n // unit_len - if k < 2: - continue - unit = raw_stripped[:unit_len] - if unit * k != raw_stripped: - continue - # The repeated unit must itself be valid JSON for this to be a - # cumulative-resend collapse and not a coincidental string repeat. 
- try: - parsed = json.loads(unit, strict=False) - except (json.JSONDecodeError, ValueError): - continue - return json.dumps(parsed, separators=(",", ":")) - return None - - def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str: """Attempt to repair malformed tool_call argument JSON. @@ -272,20 +220,6 @@ def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str: except (json.JSONDecodeError, TypeError, ValueError): pass - # Repair pass 1: collapse concatenated-duplicate JSON. Cumulative-resend - # streaming providers (DeepSeek / Baidu Qianfan, #35592) yield the full - # arguments object repeated once per stream chunk; our += accumulator - # turns that into '{...}{...}{...}'. Only reached when pass 0 above - # already failed, so spec-compliant single objects are never touched. - collapsed = _collapse_repeated_json_arguments(raw_stripped) - if collapsed is not None: - logger.warning( - "Collapsed cumulative-resend duplicate tool_call arguments for " - "%s (%d chars -> %d)", - tool_name, len(raw_stripped), len(collapsed), - ) - return collapsed - # Attempt common JSON repairs fixed = raw_stripped # 1. 
Strip trailing commas before } or ] @@ -502,7 +436,6 @@ __all__ = [ "_sanitize_messages_surrogates", "_escape_invalid_chars_in_json_strings", "_repair_tool_call_arguments", - "_collapse_repeated_json_arguments", "_strip_non_ascii", "_sanitize_messages_non_ascii", "_sanitize_tools_non_ascii", diff --git a/tests/run_agent/test_repair_tool_call_arguments.py b/tests/run_agent/test_repair_tool_call_arguments.py index e588782e4..dcd98b5ac 100644 --- a/tests/run_agent/test_repair_tool_call_arguments.py +++ b/tests/run_agent/test_repair_tool_call_arguments.py @@ -140,50 +140,3 @@ class TestRepairToolCallArguments: parsed = json.loads(result) assert "line" in parsed["msg"] - # -- Stage 1b: cumulative-resend duplicate collapse (#35592) -- - - def test_collapse_duplicate_object_2x(self): - """Two concatenated identical objects collapse to one.""" - raw = '{"path":"x"}{"path":"x"}' - result = _repair_tool_call_arguments(raw, "search_files") - assert json.loads(result) == {"path": "x"} - - def test_collapse_duplicate_object_many(self): - """K concatenated identical objects collapse to one.""" - raw = '{"a":1,"b":2}' * 100 - result = _repair_tool_call_arguments(raw, "t") - assert json.loads(result) == {"a": 1, "b": 2} - - def test_collapse_nested_object_repeat(self): - """Repeated nested-key object collapses correctly.""" - raw = '{"command":{"command":"x"}}' * 3 - result = _repair_tool_call_arguments(raw, "t") - assert json.loads(result) == {"command": {"command": "x"}} - - def test_single_object_not_touched_by_collapse(self): - """A clean single object never enters the collapse path.""" - raw = '{"path": "x.py"}' - result = _repair_tool_call_arguments(raw, "t") - assert json.loads(result) == {"path": "x.py"} - - def test_single_nested_object_not_corrupted(self): - """Nested-key single object is NOT mistaken for a repeat (safety).""" - raw = '{"command":{"command":"x"}}' - result = _repair_tool_call_arguments(raw, "t") - assert json.loads(result) == {"command": {"command": 
"x"}} - - def test_two_different_objects_not_collapsed(self): - """Distinct concatenated objects are not a clean repeat — collapse - declines and the generic repair handles it (returns '{}').""" - from agent.message_sanitization import _collapse_repeated_json_arguments - assert _collapse_repeated_json_arguments('{"a":1}{"b":2}') is None - - def test_collapse_helper_returns_none_for_valid_single(self): - from agent.message_sanitization import _collapse_repeated_json_arguments - assert _collapse_repeated_json_arguments('{"a":1}') is None - - def test_collapse_helper_returns_none_for_repeated_non_json(self): - from agent.message_sanitization import _collapse_repeated_json_arguments - # 'abab' repeats 'ab' but 'ab' is not valid JSON. - assert _collapse_repeated_json_arguments('abab') is None - diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index cc9b2260f..79c241adf 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -184,143 +184,6 @@ class TestStreamingAccumulator: assert tc[0].function.name == "read_file" assert tc[0].function.arguments == '{"path": "x.py"}' - @patch("run_agent.AIAgent._create_request_openai_client") - @patch("run_agent.AIAgent._close_request_openai_client") - def test_args_not_duplicated_on_cumulative_growing_resend(self, mock_close, mock_create): - """DeepSeek / Baidu Qianfan stream args in cumulative mode (#35592). - - Each chunk resends the full arguments-so-far (monotonic growth) - instead of the new fragment. Blind += produced a duplicated - '{...}{...}' string that failed json.loads and got nuked to '{}'. - The per-slot cumulative latch must replace (not append) so the - final arguments are a single valid object. - """ - import json as _json - from run_agent import AIAgent - - full = '{"pattern": "def handle", "path": "gateway", "output_mode": "content"}' - # Growing prefixes, then identical full resends at the tail. 
- steps = [full[:12], full[:30], full[:48], full, full, full] - chunks = [_make_stream_chunk(tool_calls=[ - _make_tool_call_delta(index=0, tc_id="call_qf", name="search_files") - ])] - for s in steps: - chunks.append(_make_stream_chunk(tool_calls=[ - _make_tool_call_delta(index=0, tc_id="call_qf", name="search_files", arguments=s) - ])) - chunks.append(_make_stream_chunk(finish_reason="tool_calls")) - - mock_client = MagicMock() - mock_client.chat.completions.create.return_value = iter(chunks) - mock_create.return_value = mock_client - - agent = AIAgent( - api_key="test-key", - base_url="https://qianfan.baidubce.com/v2", - model="deepseek-v4-pro", - quiet_mode=True, - skip_context_files=True, - skip_memory=True, - ) - agent.api_mode = "chat_completions" - agent._interrupt_requested = False - - response = agent._interruptible_streaming_api_call({}) - - tc = response.choices[0].message.tool_calls - assert tc is not None and len(tc) == 1 - assert _json.loads(tc[0].function.arguments) == _json.loads(full) - # Not flagged as truncated (which would set finish_reason='length'). - assert response.choices[0].finish_reason == "tool_calls" - - @patch("run_agent.AIAgent._create_request_openai_client") - @patch("run_agent.AIAgent._close_request_openai_client") - def test_args_identical_full_resend_collapsed_by_backstop(self, mock_close, mock_create): - """Provider resends the COMPLETE object identically from chunk 1. - - There is never a strict superset to latch on, so the per-chunk - guard appends — but the post-loop _collapse_repeated_json_arguments - backstop must collapse the K-repeat to one valid object (#35592). 
- """ - import json as _json - from run_agent import AIAgent - - full = '{"action": "replace", "old_text": "a", "new_text": "b"}' - chunks = [_make_stream_chunk(tool_calls=[ - _make_tool_call_delta(index=0, tc_id="call_id", name="memory") - ])] - for _ in range(5): - chunks.append(_make_stream_chunk(tool_calls=[ - _make_tool_call_delta(index=0, tc_id="call_id", name="memory", arguments=full) - ])) - chunks.append(_make_stream_chunk(finish_reason="tool_calls")) - - mock_client = MagicMock() - mock_client.chat.completions.create.return_value = iter(chunks) - mock_create.return_value = mock_client - - agent = AIAgent( - api_key="test-key", - base_url="https://qianfan.baidubce.com/v2", - model="deepseek-v4-pro", - quiet_mode=True, - skip_context_files=True, - skip_memory=True, - ) - agent.api_mode = "chat_completions" - agent._interrupt_requested = False - - response = agent._interruptible_streaming_api_call({}) - - tc = response.choices[0].message.tool_calls - assert tc is not None and len(tc) == 1 - assert _json.loads(tc[0].function.arguments) == _json.loads(full) - - @patch("run_agent.AIAgent._create_request_openai_client") - @patch("run_agent.AIAgent._close_request_openai_client") - def test_incremental_args_with_duplicate_leading_fragment(self, mock_close, mock_create): - """Genuine incremental stream whose 2nd fragment equals the 1st. - - Safety guard: an exact-equal delta on a NON-latched slot must be - treated as a real fragment and appended, never silently dropped. - Verifies the cumulative-resend fix does not corrupt nested-key - objects (the false-positive class flagged in review of #35592). - """ - from run_agent import AIAgent - - # Fragments deliberately repeat the leading '{"command":' substring. 
- frags = ['{"command":', '{"command":', ' "x"}', '}'] - chunks = [_make_stream_chunk(tool_calls=[ - _make_tool_call_delta(index=0, tc_id="c3", name="t") - ])] - for f in frags: - chunks.append(_make_stream_chunk(tool_calls=[ - _make_tool_call_delta(index=0, tc_id="c3", name="t", arguments=f) - ])) - chunks.append(_make_stream_chunk(finish_reason="tool_calls")) - - mock_client = MagicMock() - mock_client.chat.completions.create.return_value = iter(chunks) - mock_create.return_value = mock_client - - agent = AIAgent( - api_key="test-key", - base_url="https://openrouter.ai/api/v1", - model="test/model", - quiet_mode=True, - skip_context_files=True, - skip_memory=True, - ) - agent.api_mode = "chat_completions" - agent._interrupt_requested = False - - response = agent._interruptible_streaming_api_call({}) - - tc = response.choices[0].message.tool_calls - assert tc is not None and len(tc) == 1 - # All four fragments concatenated — nothing dropped. - assert tc[0].function.arguments == '{"command":{"command": "x"}}' - @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") def test_tool_call_extra_content_preserved(self, mock_close, mock_create):