fix(acp): wire HERMES_SESSION_KEY per session so sudo cache scope activates
PR #16858's session-scoped interactive sudo password cache falls back to a thread-identity scope when no HERMES_SESSION_KEY is bound. ACP never set that contextvar, so two ACP sessions landing on the same reused ThreadPoolExecutor thread still shared the cache — the exact scenario the PR headlined. acp_adapter/server.py now: - binds HERMES_SESSION_KEY=<session_id> via gateway.session_context inside _run_agent() (and clears on exit) - wraps the loop.run_in_executor(_executor, _run_agent) call in a fresh contextvars.copy_context() so concurrent ACP sessions don't stomp on each other's ContextVar writes (executor pool threads would otherwise share a context). Adds tests/acp/test_approval_isolation.py:: test_sudo_password_cache_isolated_across_acp_sessions_on_same_pool_thread which drives two back-to-back sessions through a 1-worker ThreadPoolExecutor and asserts B does not observe A's cached password.
This commit is contained in:
@ -3,6 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextvars
|
||||
import logging
|
||||
import os
|
||||
from collections import defaultdict, deque
|
||||
@ -574,6 +575,22 @@ class HermesACPAgent(acp.Agent):
|
||||
|
||||
def _run_agent() -> dict:
|
||||
nonlocal previous_approval_cb, previous_interactive
|
||||
# Bind HERMES_SESSION_KEY for this session so per-session caches
|
||||
# (e.g. the interactive sudo password cache in tools.terminal_tool)
|
||||
# scope to the ACP session rather than leaking across sessions
|
||||
# that land on the same reused executor thread. This call runs
|
||||
# inside a contextvars.copy_context() below, so the ContextVar
|
||||
# write is isolated from other concurrent ACP sessions.
|
||||
try:
|
||||
from gateway.session_context import (
|
||||
clear_session_vars,
|
||||
set_session_vars,
|
||||
)
|
||||
session_tokens = set_session_vars(session_key=session_id)
|
||||
except Exception:
|
||||
session_tokens = None
|
||||
clear_session_vars = None # type: ignore[assignment]
|
||||
logger.debug("Could not set ACP session context", exc_info=True)
|
||||
if approval_cb:
|
||||
try:
|
||||
from tools import terminal_tool as _terminal_tool
|
||||
@ -607,9 +624,19 @@ class HermesACPAgent(acp.Agent):
|
||||
_terminal_tool.set_approval_callback(previous_approval_cb)
|
||||
except Exception:
|
||||
logger.debug("Could not restore approval callback", exc_info=True)
|
||||
if session_tokens is not None and clear_session_vars is not None:
|
||||
try:
|
||||
clear_session_vars(session_tokens)
|
||||
except Exception:
|
||||
logger.debug("Could not clear ACP session context", exc_info=True)
|
||||
|
||||
try:
|
||||
result = await loop.run_in_executor(_executor, _run_agent)
|
||||
# Wrap the executor call in a fresh copy of the current context so
|
||||
# concurrent ACP sessions on the shared ThreadPoolExecutor don't
|
||||
# stomp on each other's ContextVar writes (HERMES_SESSION_KEY in
|
||||
# particular — used by the interactive sudo password cache scope).
|
||||
ctx = contextvars.copy_context()
|
||||
result = await loop.run_in_executor(_executor, ctx.run, _run_agent)
|
||||
except Exception:
|
||||
logger.exception("Executor error for session %s", session_id)
|
||||
return PromptResponse(stop_reason="end_turn")
|
||||
|
||||
@ -141,6 +141,59 @@ class TestThreadLocalApprovalCallback:
|
||||
assert worker_saw == [""]
|
||||
assert _get_cached_sudo_password() == "main-thread-password"
|
||||
|
||||
def test_sudo_password_cache_isolated_across_acp_sessions_on_same_pool_thread(self):
|
||||
"""ACP's ThreadPoolExecutor reuses threads. Two ACP sessions that land
|
||||
on the same reused thread must not share the interactive sudo password
|
||||
cache. The fix wraps each session in contextvars.copy_context() and
|
||||
binds HERMES_SESSION_KEY per session, so the cache scope key differs
|
||||
across sessions even when the underlying thread is identical.
|
||||
"""
|
||||
import contextvars
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from gateway.session_context import (
|
||||
clear_session_vars,
|
||||
set_session_vars,
|
||||
)
|
||||
from tools.terminal_tool import (
|
||||
_get_cached_sudo_password,
|
||||
_reset_cached_sudo_passwords,
|
||||
_set_cached_sudo_password,
|
||||
)
|
||||
|
||||
_reset_cached_sudo_passwords()
|
||||
executor = ThreadPoolExecutor(max_workers=1) # force thread reuse
|
||||
|
||||
runs: list[tuple[str, str, str]] = [] # (session_id, before, after)
|
||||
|
||||
def _simulate_acp_session(session_id: str, write_password: str) -> None:
|
||||
tokens = set_session_vars(session_key=session_id)
|
||||
try:
|
||||
observed_before = _get_cached_sudo_password()
|
||||
_set_cached_sudo_password(write_password)
|
||||
observed_after = _get_cached_sudo_password()
|
||||
runs.append((session_id, observed_before, observed_after))
|
||||
finally:
|
||||
clear_session_vars(tokens)
|
||||
|
||||
def _run_in_fresh_context(session_id: str, pw: str) -> str:
|
||||
ctx = contextvars.copy_context()
|
||||
ctx.run(_simulate_acp_session, session_id, pw)
|
||||
return session_id
|
||||
|
||||
try:
|
||||
executor.submit(_run_in_fresh_context, "acp-session-A", "alpha-secret").result()
|
||||
# Same thread. Without the fix B would see "alpha-secret".
|
||||
executor.submit(_run_in_fresh_context, "acp-session-B", "bravo-secret").result()
|
||||
finally:
|
||||
executor.shutdown(wait=True)
|
||||
_reset_cached_sudo_passwords()
|
||||
|
||||
assert runs[0] == ("acp-session-A", "", "alpha-secret")
|
||||
# Core regression guard: B on the same reused thread must see an empty
|
||||
# cache, not A's password.
|
||||
assert runs[1] == ("acp-session-B", "", "bravo-secret")
|
||||
|
||||
|
||||
class TestAcpExecAskGate:
|
||||
"""GHSA-96vc-wcxf-jjff: ACP's _run_agent must set HERMES_INTERACTIVE so
|
||||
|
||||
Reference in New Issue
Block a user