From 21aeefe5fd1cbed15f6e8c479d3b100b091eae57 Mon Sep 17 00:00:00 2001 From: firefly Date: Thu, 28 May 2026 17:47:09 -0400 Subject: [PATCH] fix(code-exec): propagate agent-turn context into tool worker threads Worker threads that dispatch Hermes tools started with an empty contextvars.Context and no thread-local approval/sudo callbacks. Add tools/thread_context.propagate_context_to_thread factoring that capture/install/clear lifecycle (mirrors the GHSA-qg5c-hvr5-hjgr pattern), and refactor agent/tool_executor onto it so the security-critical logic lives in one audited place. Update the contextvar-propagation source guard for the new call shape. Refs #33057 --- agent/tool_executor.py | 45 ++----- ...st_tool_executor_contextvar_propagation.py | 13 ++ tools/thread_context.py | 120 ++++++++++++++++++ 3 files changed, 143 insertions(+), 35 deletions(-) create mode 100644 tools/thread_context.py diff --git a/agent/tool_executor.py b/agent/tool_executor.py index 003fb9420..358c1a0a8 100644 --- a/agent/tool_executor.py +++ b/agent/tool_executor.py @@ -13,7 +13,6 @@ extracted functions reach back through the ``run_agent`` module via from __future__ import annotations import concurrent.futures -import contextvars import json import logging import os @@ -38,12 +37,9 @@ from agent.tool_dispatch_helpers import ( make_tool_result_message, ) from tools.terminal_tool import ( - _get_approval_callback, - _get_sudo_password_callback, - set_approval_callback as _set_approval_callback, - set_sudo_password_callback as _set_sudo_password_callback, get_active_env, ) +from tools.thread_context import propagate_context_to_thread from tools.tool_result_storage import ( maybe_persist_tool_result, enforce_turn_budget, @@ -274,14 +270,6 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe agent._current_tool = tool_names_str agent._touch_activity(f"executing {num_tools} tools concurrently: {tool_names_str}") - # Capture CLI callbacks from the agent thread so 
worker threads can - # register them locally. Without this, _get_approval_callback() in - # terminal_tool returns None in ThreadPoolExecutor workers, causing - # the dangerous-command prompt to fall back to input() — which - # deadlocks against prompt_toolkit's raw terminal mode (#13617). - _parent_approval_cb = _get_approval_callback() - _parent_sudo_cb = _get_sudo_password_callback() - def _run_tool(index, tool_call, function_name, function_args): """Worker function executed in a thread.""" # Register this worker tid so the agent can fan out an interrupt @@ -308,18 +296,9 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe set_activity_callback(agent._touch_activity) except Exception: pass - # Propagate approval/sudo callbacks to this worker thread. - # Mirrors cli.py run_agent() pattern (GHSA-qg5c-hvr5-hjgr). - if _parent_approval_cb is not None: - try: - _set_approval_callback(_parent_approval_cb) - except Exception: - pass - if _parent_sudo_cb is not None: - try: - _set_sudo_password_callback(_parent_sudo_cb) - except Exception: - pass + # Approval/sudo callbacks (thread-local) and the agent turn's + # ContextVars are propagated by propagate_context_to_thread() at the + # submit site below (GHSA-qg5c-hvr5-hjgr, #13617). start = time.time() try: result = agent._invoke_tool( @@ -349,13 +328,6 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe _ra()._set_interrupt(False, _worker_tid) except Exception: pass - # Clear thread-local callbacks so a recycled worker thread - # doesn't hold stale references to a disposed CLI instance. 
- try: - _set_approval_callback(None) - _set_sudo_password_callback(None) - except Exception: - pass # Start spinner for CLI mode (skip when TUI handles tool progress) spinner = None @@ -375,9 +347,12 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe max_workers = min(len(runnable_calls), _MAX_TOOL_WORKERS) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for i, tc, name, args in runnable_calls: - # Propagate ContextVars (e.g. _approval_session_key); mirrors asyncio.to_thread. - ctx = contextvars.copy_context() - f = executor.submit(ctx.run, _run_tool, i, tc, name, args) + # Propagate the agent turn's ContextVars (e.g. + # _approval_session_key) AND thread-local approval/sudo + # callbacks into the worker thread; clears callbacks on exit. + f = executor.submit( + propagate_context_to_thread(_run_tool), i, tc, name, args + ) futures.append(f) # Wait for all to complete with periodic heartbeats so the diff --git a/tests/run_agent/test_tool_executor_contextvar_propagation.py b/tests/run_agent/test_tool_executor_contextvar_propagation.py index 2e1d54370..0395dcbba 100644 --- a/tests/run_agent/test_tool_executor_contextvar_propagation.py +++ b/tests/run_agent/test_tool_executor_contextvar_propagation.py @@ -197,6 +197,19 @@ def test_run_agent_concurrent_executor_wraps_submit_with_copy_context(): and call.args[1].id == "_run_tool" ): tool_submits.append(("fixed", call)) + # Fixed (shared helper): executor.submit( + # propagate_context_to_thread(_run_tool), ...) — the helper in + # tools/thread_context.py does copy_context().run(...) internally and + # additionally propagates the thread-local approval/sudo callbacks. 
+ elif ( + isinstance(first, ast.Call) + and isinstance(first.func, ast.Name) + and first.func.id == "propagate_context_to_thread" + and first.args + and isinstance(first.args[0], ast.Name) + and first.args[0].id == "_run_tool" + ): + tool_submits.append(("fixed", call)) assert tool_submits, ( "Could not locate `executor.submit(... _run_tool ...)` in " diff --git a/tools/thread_context.py b/tools/thread_context.py new file mode 100644 index 000000000..8d9a27229 --- /dev/null +++ b/tools/thread_context.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +"""Propagate agent-turn context into worker threads that dispatch Hermes tools. + +A bare ``threading.Thread`` / ``ThreadPoolExecutor`` worker starts with an +empty ``contextvars.Context`` and no thread-local approval/sudo callbacks. +Tool dispatch inside such a thread therefore silently loses: + + * the approval *session/platform* ContextVars (``tools.approval`` / + ``gateway.session_context``) — so gateway sessions fall into + ``check_dangerous_command``'s non-interactive auto-approve branch and + dangerous commands run without prompting (#33057, #30882); + * the thread-local CLI approval/sudo callbacks (``tools.terminal_tool``) — + so ``prompt_dangerous_approval`` cannot reach the user + (GHSA-qg5c-hvr5-hjgr, #15216). + +This helper factors out that capture/install/clear lifecycle so the several +places that fan tool dispatch onto worker threads (``agent.tool_executor`` and +the ``execute_code`` RPC threads) share one audited implementation instead of +divergent copies. 
+
+Usage — call :func:`propagate_context_to_thread` **on the parent thread**
+(it snapshots the parent's ContextVars and callbacks at call time) and use the
+returned callable as the worker's target::
+
+    t = threading.Thread(target=propagate_context_to_thread(loop_fn), args=(...))
+    # or
+    executor.submit(propagate_context_to_thread(worker_fn), *args)
+
+Approval/sudo callbacks are installed for the worker's lifetime and **always
+cleared on exit**, so a recycled thread never holds a stale reference to a
+disposed CLI instance.
+"""
+
+from __future__ import annotations
+
+import contextvars
+import logging
+from typing import Callable
+
+logger = logging.getLogger(__name__)
+
+
+def _callback_api():
+    """Resolve the terminal_tool callback getters/setters.
+
+    Imported lazily: ``tools.terminal_tool`` imports ``tools.approval`` at
+    module load, so a top-level import here would risk an import cycle for
+    callers that live in ``tools.approval``.
+    """
+    from tools.terminal_tool import (
+        _get_approval_callback,
+        _get_sudo_password_callback,
+        set_approval_callback,
+        set_sudo_password_callback,
+    )
+    return (
+        _get_approval_callback,
+        _get_sudo_password_callback,
+        set_approval_callback,
+        set_sudo_password_callback,
+    )
+
+
+def propagate_context_to_thread(target: Callable) -> Callable:
+    """Wrap *target* for execution on a worker thread with the *current*
+    thread's ContextVars and approval/sudo callbacks propagated.
+
+    Call this on the parent thread; pass the returned callable as the
+    thread/executor target. It forwards positional and keyword arguments
+    to *target*, returns its result, and may be invoked more than once.
+
+    Fail-closed: if installing a callback raises, that callback is left
+    unset (``None``). That is the safe outcome — ``prompt_dangerous_approval``
+    denies dangerous commands when no callback is registered in an interactive
+    context, and the gateway approval queue blocks when its notify callback is
+    absent.
+    """
+    ctx = contextvars.copy_context()
+    # (setter, parent callback) pairs; left empty if capture fails.
+    callbacks = []
+    try:
+        get_approval, get_sudo, set_approval, set_sudo = _callback_api()
+        callbacks = [(set_approval, get_approval()), (set_sudo, get_sudo())]
+    except Exception:
+        logger.debug("Could not capture parent approval/sudo callbacks", exc_info=True)
+
+    def _runner(*args, **kwargs):
+        def _inner():
+            # Install each callback on its own so one failure cannot block
+            # the other from reaching the worker thread.
+            for setter, callback in callbacks:
+                if callback is None:
+                    continue
+                try:
+                    setter(callback)
+                except Exception:
+                    logger.debug(
+                        "Failed to install propagated approval/sudo callbacks; "
+                        "dangerous-command approval will fail closed",
+                        exc_info=True,
+                    )
+            try:
+                return target(*args, **kwargs)
+            finally:
+                # Always clear both, even if clearing the first one raises, so
+                # a recycled pool thread never keeps a stale callback.
+                for setter, _ in callbacks:
+                    try:
+                        setter(None)
+                    except Exception:
+                        logger.debug(
+                            "Failed to clear propagated approval/sudo callbacks",
+                            exc_info=True,
+                        )
+
+        # Run in a per-call copy of the snapshot: Context.run() raises
+        # RuntimeError if the same Context is entered concurrently or re-entered.
+        return ctx.copy().run(_inner)
+
+    return _runner