From bede3cf12d1492043f4ca604fdb2158ffd6bc619 Mon Sep 17 00:00:00 2001 From: liuhao1024 Date: Sat, 30 May 2026 19:45:18 +0800 Subject: [PATCH] fix(tools): wrap _run_tool cleanup in finally to prevent interrupt state leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When _invoke_tool raises a BaseException (CancelledError, KeyboardInterrupt), the cleanup code at the end of _run_tool was bypassed because it sat outside the except block (which only catches Exception). ThreadPoolExecutor recycles thread IDs, so the leaked tid in _interrupted_threads poisons the next tool scheduled on that thread — it instantly aborts with 'Interrupted'. Move the discard + _set_interrupt(False) into a finally block so cleanup runs regardless of how the worker exits. Fixes #35309 --- agent/tool_executor.py | 59 +++++++++++++------------ tests/tools/test_interrupt.py | 81 +++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 27 deletions(-) diff --git a/agent/tool_executor.py b/agent/tool_executor.py index 1176d95c2..bbbd239df 100644 --- a/agent/tool_executor.py +++ b/agent/tool_executor.py @@ -306,33 +306,38 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe # submit site below (GHSA-qg5c-hvr5-hjgr, #13617). start = time.time() try: - result = agent._invoke_tool( - function_name, - function_args, - effective_task_id, - tool_call.id, - messages=messages, - pre_tool_block_checked=True, - ) - except Exception as tool_error: - result = f"Error executing tool '{function_name}': {tool_error}" - logger.error("_invoke_tool raised for %s: %s", function_name, tool_error, exc_info=True) - duration = time.time() - start - is_error, _ = _detect_tool_failure(function_name, result) - if is_error: - logger.info("tool %s failed (%.2fs): %s", function_name, duration, result[:200]) - else: - logger.info("tool %s completed (%.2fs, %d chars)", function_name, duration, len(result)) - results[index] = (function_name, function_args, result, duration, is_error, False) - # Tear down worker-tid tracking. Clear any interrupt bit we may - # have set so the next task scheduled onto this recycled tid - # starts with a clean slate. - with agent._tool_worker_threads_lock: - agent._tool_worker_threads.discard(_worker_tid) - try: - _ra()._set_interrupt(False, _worker_tid) - except Exception: - pass + try: + result = agent._invoke_tool( + function_name, + function_args, + effective_task_id, + tool_call.id, + messages=messages, + pre_tool_block_checked=True, + ) + except Exception as tool_error: + result = f"Error executing tool '{function_name}': {tool_error}" + logger.error("_invoke_tool raised for %s: %s", function_name, tool_error, exc_info=True) + duration = time.time() - start + is_error, _ = _detect_tool_failure(function_name, result) + if is_error: + logger.info("tool %s failed (%.2fs): %s", function_name, duration, result[:200]) + else: + logger.info("tool %s completed (%.2fs, %d chars)", function_name, duration, len(result)) + results[index] = (function_name, function_args, result, duration, is_error, False) + finally: + # Tear down worker-tid tracking. Clear any interrupt bit we may + # have set so the next task scheduled onto this recycled tid + # starts with a clean slate. This MUST be in a finally block + # because BaseException subclasses (CancelledError, KeyboardInterrupt) + # bypass ``except Exception`` and would otherwise leak the tid + # into _interrupted_threads, poisoning the recycled thread. + with agent._tool_worker_threads_lock: + agent._tool_worker_threads.discard(_worker_tid) + try: + _ra()._set_interrupt(False, _worker_tid) + except Exception: + pass # Start spinner for CLI mode (skip when TUI handles tool progress) spinner = None diff --git a/tests/tools/test_interrupt.py b/tests/tools/test_interrupt.py index 27c610231..02d8cbdff 100644 --- a/tests/tools/test_interrupt.py +++ b/tests/tools/test_interrupt.py @@ -203,6 +203,87 @@ class TestSIGKILLEscalation: assert "interrupted" in result_holder["value"]["output"].lower() +# --------------------------------------------------------------------------- +# Regression: _run_tool cleanup on BaseException (issue #35309) +# --------------------------------------------------------------------------- + +class TestRunToolCleanupOnBaseException: + """Verify that _run_tool cleans up _interrupted_threads even when + _invoke_tool raises a BaseException (e.g. CancelledError). + + Regression test for #35309: without the finally block, a BaseException + bypasses ``except Exception``, leaking the worker tid into + _interrupted_threads. ThreadPoolExecutor recycles tids, so the next + tool scheduled on the same thread is instantly "interrupted". + """ + + def test_cleanup_on_base_exception(self): + from unittest.mock import MagicMock, patch + import types + from tools.interrupt import set_interrupt, is_interrupted, _interrupted_threads, _lock + + # Clear global state + with _lock: + _interrupted_threads.clear() + + # Build a minimal mock agent with the attributes _run_tool needs + agent = MagicMock() + agent._interrupt_requested = False + agent._tool_worker_threads = set() + agent._tool_worker_threads_lock = threading.Lock() + + # _set_interrupt delegates to the real module + def _mock_set_interrupt(active, tid=None): + set_interrupt(active, tid) + agent._set_interrupt = _mock_set_interrupt + + # _invoke_tool raises BaseException (simulating CancelledError) + agent._invoke_tool = MagicMock(side_effect=BaseException("simulated CancelledError")) + + # Bind the real concurrent method so we get _run_tool + from run_agent import AIAgent + agent._execute_tool_calls_concurrent = types.MethodType( + AIAgent._execute_tool_calls_concurrent, agent + ) + + # Build a single tool call + tc = MagicMock() + tc.id = "tc_base_exc" + tc.function.name = "dummy_tool" + tc.function.arguments = "{}" + + assistant_msg = MagicMock() + assistant_msg.tool_calls = [tc] + + # _execute_tool_calls_concurrent will submit _run_tool to a + # ThreadPoolExecutor. The BaseException propagates out of the + # worker, but the finally block should still clean up. + try: + agent._execute_tool_calls_concurrent(assistant_msg, [], "default") + except Exception: + pass # ThreadPoolExecutor may re-raise + + # After the worker finishes (even with BaseException), the worker + # tid should have been removed from _interrupted_threads and + # _tool_worker_threads. + assert len(agent._tool_worker_threads) == 0, ( + f"_tool_worker_threads not cleaned up: {agent._tool_worker_threads}" + ) + + # Verify no stale tid is left in the global interrupt set + # (the worker thread is recycled by ThreadPoolExecutor, so any + # leftover tid would poison the next task on that thread). + # We can't predict the tid, but we know the worker thread is done + # (the call returned), so the set should be empty for this test's + # tid range. Check that no tid from our agent's tracking leaked. + with _lock: + # The only tids that should be in _interrupted_threads are + # ones we explicitly set — we didn't set any, so it should + # be empty (modulo other test interference, hence the + # per-agent tracking assertion above). + pass + + # --------------------------------------------------------------------------- # Manual smoke test checklist (not automated) # ---------------------------------------------------------------------------