fix(code-exec): propagate agent-turn context into tool worker threads

Worker threads that dispatch Hermes tools started with an empty contextvars.Context and no thread-local approval/sudo callbacks. Add tools/thread_context.propagate_context_to_thread factoring that capture/install/clear lifecycle (mirrors the GHSA-qg5c-hvr5-hjgr pattern), and refactor agent/tool_executor onto it so the security-critical logic lives in one audited place. Update the contextvar-propagation source guard for the new call shape.

Refs #33057
This commit is contained in:
firefly
2026-05-28 17:47:09 -04:00
committed by Teknium
parent a22c250001
commit 21aeefe5fd
3 changed files with 143 additions and 35 deletions

View File

@ -13,7 +13,6 @@ extracted functions reach back through the ``run_agent`` module via
from __future__ import annotations
import concurrent.futures
import contextvars
import json
import logging
import os
@ -38,12 +37,9 @@ from agent.tool_dispatch_helpers import (
make_tool_result_message,
)
from tools.terminal_tool import (
_get_approval_callback,
_get_sudo_password_callback,
set_approval_callback as _set_approval_callback,
set_sudo_password_callback as _set_sudo_password_callback,
get_active_env,
)
from tools.thread_context import propagate_context_to_thread
from tools.tool_result_storage import (
maybe_persist_tool_result,
enforce_turn_budget,
@ -274,14 +270,6 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe
agent._current_tool = tool_names_str
agent._touch_activity(f"executing {num_tools} tools concurrently: {tool_names_str}")
# Capture CLI callbacks from the agent thread so worker threads can
# register them locally. Without this, _get_approval_callback() in
# terminal_tool returns None in ThreadPoolExecutor workers, causing
# the dangerous-command prompt to fall back to input() — which
# deadlocks against prompt_toolkit's raw terminal mode (#13617).
_parent_approval_cb = _get_approval_callback()
_parent_sudo_cb = _get_sudo_password_callback()
def _run_tool(index, tool_call, function_name, function_args):
"""Worker function executed in a thread."""
# Register this worker tid so the agent can fan out an interrupt
@ -308,18 +296,9 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe
set_activity_callback(agent._touch_activity)
except Exception:
pass
# Propagate approval/sudo callbacks to this worker thread.
# Mirrors cli.py run_agent() pattern (GHSA-qg5c-hvr5-hjgr).
if _parent_approval_cb is not None:
try:
_set_approval_callback(_parent_approval_cb)
except Exception:
pass
if _parent_sudo_cb is not None:
try:
_set_sudo_password_callback(_parent_sudo_cb)
except Exception:
pass
# Approval/sudo callbacks (thread-local) and the agent turn's
# ContextVars are propagated by propagate_context_to_thread() at the
# submit site below (GHSA-qg5c-hvr5-hjgr, #13617).
start = time.time()
try:
result = agent._invoke_tool(
@ -349,13 +328,6 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe
_ra()._set_interrupt(False, _worker_tid)
except Exception:
pass
# Clear thread-local callbacks so a recycled worker thread
# doesn't hold stale references to a disposed CLI instance.
try:
_set_approval_callback(None)
_set_sudo_password_callback(None)
except Exception:
pass
# Start spinner for CLI mode (skip when TUI handles tool progress)
spinner = None
@ -375,9 +347,12 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe
max_workers = min(len(runnable_calls), _MAX_TOOL_WORKERS)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
for i, tc, name, args in runnable_calls:
# Propagate ContextVars (e.g. _approval_session_key); mirrors asyncio.to_thread.
ctx = contextvars.copy_context()
f = executor.submit(ctx.run, _run_tool, i, tc, name, args)
# Propagate the agent turn's ContextVars (e.g.
# _approval_session_key) AND thread-local approval/sudo
# callbacks into the worker thread; clears callbacks on exit.
f = executor.submit(
propagate_context_to_thread(_run_tool), i, tc, name, args
)
futures.append(f)
# Wait for all to complete with periodic heartbeats so the

View File

@ -197,6 +197,19 @@ def test_run_agent_concurrent_executor_wraps_submit_with_copy_context():
and call.args[1].id == "_run_tool"
):
tool_submits.append(("fixed", call))
# Fixed (shared helper): executor.submit(
# propagate_context_to_thread(_run_tool), ...) — the helper in
# tools/thread_context.py does copy_context().run(...) internally and
# additionally propagates the thread-local approval/sudo callbacks.
elif (
isinstance(first, ast.Call)
and isinstance(first.func, ast.Name)
and first.func.id == "propagate_context_to_thread"
and first.args
and isinstance(first.args[0], ast.Name)
and first.args[0].id == "_run_tool"
):
tool_submits.append(("fixed", call))
assert tool_submits, (
"Could not locate `executor.submit(... _run_tool ...)` in "

120
tools/thread_context.py Normal file
View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""Propagate agent-turn context into worker threads that dispatch Hermes tools.
A bare ``threading.Thread`` / ``ThreadPoolExecutor`` worker starts with an
empty ``contextvars.Context`` and no thread-local approval/sudo callbacks.
Tool dispatch inside such a thread therefore silently loses:
* the approval *session/platform* ContextVars (``tools.approval`` /
``gateway.session_context``) — so gateway sessions fall into
``check_dangerous_command``'s non-interactive auto-approve branch and
dangerous commands run without prompting (#33057, #30882);
* the thread-local CLI approval/sudo callbacks (``tools.terminal_tool``) —
so ``prompt_dangerous_approval`` cannot reach the user
(GHSA-qg5c-hvr5-hjgr, #15216).
This helper factors out that capture/install/clear lifecycle so the several
places that fan tool dispatch onto worker threads (``agent.tool_executor`` and
the ``execute_code`` RPC threads) share one audited implementation instead of
divergent copies.
Usage — call :func:`propagate_context_to_thread` **on the parent thread**
(it snapshots the parent's ContextVars and callbacks at call time) and use the
returned callable as the worker's target::
t = threading.Thread(target=propagate_context_to_thread(loop_fn), args=(...))
# or
executor.submit(propagate_context_to_thread(worker_fn), *args)
Approval/sudo callbacks are installed for the worker's lifetime and **always
cleared on exit**, so a recycled thread never holds a stale reference to a
disposed CLI instance.
"""
from __future__ import annotations
import contextvars
import logging
from typing import Callable
logger = logging.getLogger(__name__)
def _callback_api():
"""Resolve the terminal_tool callback getters/setters.
Imported lazily: ``tools.terminal_tool`` imports ``tools.approval`` at
module load, so a top-level import here would risk an import cycle for
callers that live in ``tools.approval``.
"""
from tools.terminal_tool import (
_get_approval_callback,
_get_sudo_password_callback,
set_approval_callback,
set_sudo_password_callback,
)
return (
_get_approval_callback,
_get_sudo_password_callback,
set_approval_callback,
set_sudo_password_callback,
)
def propagate_context_to_thread(target: Callable) -> Callable:
"""Wrap *target* for execution on a worker thread with the *current*
thread's ContextVars and approval/sudo callbacks propagated.
Call this on the parent thread; pass the returned callable as the
thread/executor target. The returned callable forwards its positional
and keyword arguments to *target* and returns its result.
Fail-closed: if callback installation raises, the callbacks are left
unset (``None``). That is the safe outcome — ``prompt_dangerous_approval``
denies dangerous commands when no callback is registered in an interactive
context, and the gateway approval queue blocks when its notify callback is
absent.
"""
ctx = contextvars.copy_context()
parent_approval_cb = parent_sudo_cb = None
setters = None
try:
get_approval, get_sudo, set_approval, set_sudo = _callback_api()
parent_approval_cb = get_approval()
parent_sudo_cb = get_sudo()
setters = (set_approval, set_sudo)
except Exception:
logger.debug("Could not capture parent approval/sudo callbacks", exc_info=True)
def _runner(*args, **kwargs):
def _inner():
if setters is not None:
set_approval, set_sudo = setters
try:
if parent_approval_cb is not None:
set_approval(parent_approval_cb)
if parent_sudo_cb is not None:
set_sudo(parent_sudo_cb)
except Exception:
logger.debug(
"Failed to install propagated approval/sudo callbacks; "
"dangerous-command approval will fail closed",
exc_info=True,
)
try:
return target(*args, **kwargs)
finally:
if setters is not None:
set_approval, set_sudo = setters
try:
set_approval(None)
set_sudo(None)
except Exception:
logger.debug(
"Failed to clear propagated approval/sudo callbacks",
exc_info=True,
)
return ctx.run(_inner)
return _runner