"""Regression tests for the execute_code approval-bypass cluster. Covers the canonical fix for issues #4146, #27303, #30882, #33057: 1. tools.thread_context.propagate_context_to_thread — propagates the agent turn's ContextVars AND thread-local approval/sudo callbacks into worker threads, and clears the callbacks on teardown. 2. Both execute_code RPC threads are wrapped with that helper (source guard). 3. tools.approval.check_execute_code_guard — the entry-point guard decision matrix (isolated backends, yolo/off, cron-deny, headless-local, gateway approve/deny/timeout/missing-notify, smart mode). 4. tools.code_execution_tool._scrub_child_env — broad HERMES_ prefix dropped, operational allowlist kept, DSN/WEBHOOK blocked, passthrough precedence. """ from __future__ import annotations import concurrent.futures import contextvars import threading import pytest from tools import approval as A from tools.thread_context import propagate_context_to_thread # --------------------------------------------------------------------------- # 1. Context + callback propagation helper # --------------------------------------------------------------------------- def test_helper_propagates_contextvar_and_approval_callback(): from tools import terminal_tool as TT probe: contextvars.ContextVar[str] = contextvars.ContextVar( "cluster_probe", default="unset" ) probe.set("parent-value") sentinel = object() TT.set_approval_callback(sentinel) try: seen: dict = {} def worker(): seen["probe"] = probe.get() seen["cb"] = TT._get_approval_callback() t = threading.Thread(target=propagate_context_to_thread(worker)) t.start() t.join(timeout=5) assert seen["probe"] == "parent-value" # ContextVar propagated assert seen["cb"] is sentinel # thread-local callback propagated finally: TT.set_approval_callback(None) def test_helper_clears_callbacks_on_teardown(): """A recycled worker thread must not retain the propagated callback after the wrapped target finishes (mirrors the GHSA-qg5c-hvr5-hjgr teardown).""" from tools import terminal_tool as TT sentinel = object() TT.set_approval_callback(sentinel) try: seen: dict = {} def first(): seen["during"] = TT._get_approval_callback() def second(): # NOT wrapped — runs on the same recycled worker thread seen["after"] = TT._get_approval_callback() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: ex.submit(propagate_context_to_thread(first)).result(timeout=5) ex.submit(second).result(timeout=5) assert seen["during"] is sentinel # installed for the wrapped target assert seen["after"] is None # cleared on teardown finally: TT.set_approval_callback(None) def test_both_rpc_threads_use_propagation_helper(): """Source guard: both execute_code RPC threads must wrap their target with propagate_context_to_thread, or the gateway approval bypass (#33057) silently returns.""" import inspect import tools.code_execution_tool as cet src = inspect.getsource(cet) assert "propagate_context_to_thread(_rpc_server_loop)" in src, ( "local UDS RPC server thread is not wrapped with " "propagate_context_to_thread — gateway approval routing will be lost." ) assert "propagate_context_to_thread(_rpc_poll_loop)" in src, ( "remote file-RPC poll thread is not wrapped with " "propagate_context_to_thread — gateway approval routing will be lost." ) # --------------------------------------------------------------------------- # 3. check_execute_code_guard decision matrix # --------------------------------------------------------------------------- @pytest.fixture def gw_session(monkeypatch): """A clean gateway session: HERMES_GATEWAY_SESSION set, a bound session key, and isolated gateway queues/callbacks. Yields the session_key.""" monkeypatch.setenv("HERMES_GATEWAY_SESSION", "1") monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) monkeypatch.delenv("HERMES_CRON_SESSION", raising=False) monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) # Force manual mode regardless of host config. monkeypatch.setattr(A, "_get_approval_mode", lambda: "manual") session_key = "cluster-test-session" token = A.set_current_session_key(session_key) with A._lock: A._gateway_queues.pop(session_key, None) A._gateway_notify_cbs.pop(session_key, None) try: yield session_key finally: A.reset_current_session_key(token) with A._lock: A._gateway_queues.pop(session_key, None) A._gateway_notify_cbs.pop(session_key, None) def _register_resolver(session_key: str, result): """Register a gateway notify callback that immediately resolves the most recent queued approval entry with *result* (simulating a user response).""" def cb(_approval_data): with A._lock: entries = A._gateway_queues.get(session_key, []) if entries: entry = entries[-1] entry.result = result entry.event.set() with A._lock: A._gateway_notify_cbs[session_key] = cb def test_guard_isolated_backend_approved(): # Container backends already sandbox the child — no-op approve. assert A.check_execute_code_guard("import os", "docker")["approved"] is True def test_guard_headless_local_approved(monkeypatch): # Documented #30882 limitation: no approval surface → preserve auto-run. monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) monkeypatch.delenv("HERMES_CRON_SESSION", raising=False) monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) monkeypatch.setattr(A, "_get_approval_mode", lambda: "manual") assert A.check_execute_code_guard("import os", "local")["approved"] is True def test_guard_cron_deny_blocks(monkeypatch): monkeypatch.setenv("HERMES_CRON_SESSION", "1") monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) monkeypatch.setattr(A, "_get_approval_mode", lambda: "manual") monkeypatch.setattr(A, "_get_cron_approval_mode", lambda: "deny") res = A.check_execute_code_guard("import os", "local") assert res["approved"] is False assert res["outcome"] == "blocked" def test_guard_gateway_user_approves_is_one_shot(gw_session): _register_resolver(gw_session, "once") res = A.check_execute_code_guard("import os; print(1)", "local") assert res["approved"] is True assert res.get("user_approved") is True # One-shot: approval must NOT persist to future scripts. assert A.is_approved(gw_session, "execute_code") is False def test_guard_gateway_user_approves_session_persists(gw_session): """'Approve session' stores session-level approval (#39275).""" _register_resolver(gw_session, "session") res = A.check_execute_code_guard("import os; print(1)", "local") assert res["approved"] is True assert res.get("user_approved") is True # Session approval should now be stored. assert A.is_approved(gw_session, "execute_code") is True # Subsequent calls should auto-approve without prompting. res2 = A.check_execute_code_guard("import os; print(2)", "local") assert res2["approved"] is True # Cleanup with A._lock: s = A._session_approved.get(gw_session, set()) s.discard("execute_code") def test_guard_gateway_user_approves_always_persists(gw_session): """'Always' stores permanent approval (#39275).""" _register_resolver(gw_session, "always") res = A.check_execute_code_guard("import os; print(1)", "local") assert res["approved"] is True assert res.get("user_approved") is True # Permanent approval should now be stored. assert A.is_approved(gw_session, "execute_code") is True # Cleanup with A._lock: A._permanent_approved.discard("execute_code") s = A._session_approved.get(gw_session, set()) s.discard("execute_code") def test_guard_session_approval_short_circuits_prompt(gw_session): """Once session-approved, execute_code skips the approval prompt (#39275).""" # Manually set session approval. A.approve_session(gw_session, "execute_code") try: # Even with a denier registered, the is_approved check short-circuits. _register_resolver(gw_session, "deny") res = A.check_execute_code_guard("import os", "local") assert res["approved"] is True finally: with A._lock: s = A._session_approved.get(gw_session, set()) s.discard("execute_code") def test_guard_gateway_user_denies_blocks(gw_session): _register_resolver(gw_session, "deny") res = A.check_execute_code_guard("import os", "local") assert res["approved"] is False assert res["outcome"] == "denied" assert res["user_consent"] is False def test_guard_gateway_timeout_blocks(gw_session, monkeypatch): # Register a callback that never resolves; force an immediate timeout. with A._lock: A._gateway_notify_cbs[gw_session] = lambda _d: None monkeypatch.setattr(A, "_get_approval_config", lambda: {"gateway_timeout": 0}) res = A.check_execute_code_guard("import os", "local") assert res["approved"] is False assert res["outcome"] == "timeout" def test_guard_gateway_missing_notify_is_pending(gw_session): # No notify callback registered → backward-compat pending approval. res = A.check_execute_code_guard("import os", "local") assert res["approved"] is False assert res["status"] == "pending_approval" def test_guard_smart_mode(gw_session, monkeypatch): monkeypatch.setattr(A, "_get_approval_mode", lambda: "smart") monkeypatch.setattr(A, "_smart_approve", lambda c, d: "approve") res = A.check_execute_code_guard("import os", "local") assert res["approved"] is True and res.get("smart_approved") is True monkeypatch.setattr(A, "_smart_approve", lambda c, d: "deny") res = A.check_execute_code_guard("import os", "local") assert res["approved"] is False and res.get("smart_denied") is True # escalate → falls through to manual gateway approval monkeypatch.setattr(A, "_smart_approve", lambda c, d: "escalate") _register_resolver(gw_session, "once") res = A.check_execute_code_guard("import os", "local") assert res["approved"] is True def test_guard_session_yolo_bypasses(gw_session): A.enable_session_yolo(gw_session) try: # Even with a denier registered, yolo short-circuits before the prompt. _register_resolver(gw_session, "deny") assert A.check_execute_code_guard("import os", "local")["approved"] is True finally: A.disable_session_yolo(gw_session) # --------------------------------------------------------------------------- # 4. Env scrubbing (#27303) # --------------------------------------------------------------------------- def test_env_scrub_hermes_allowlist_and_secret_blocks(): from tools.code_execution_tool import _scrub_child_env env = { # operational allowlist → kept "HERMES_HOME": "/h", "HERMES_PROFILE": "p", "HERMES_CONFIG": "/c.yaml", "HERMES_ENV": "/e", # other HERMES_* → dropped (broad prefix removed) "HERMES_BASE_URL": "https://x", "HERMES_INTERACTIVE": "1", "HERMES_KANBAN_DB": "postgres://u:p@h/db", # secret substrings (incl. new DSN/WEBHOOK) → dropped "SENTRY_DSN": "https://a@s.io/1", "SLACK_WEBHOOK": "https://h/x", "OPENAI_API_KEY": "sk", "GITHUB_TOKEN": "ghp", # safe prefix → kept; uncategorized → dropped "PATH": "/usr/bin", "RANDOM_X": "y", } out = _scrub_child_env(env, is_passthrough=lambda _: False, is_windows=False) for kept in ("HERMES_HOME", "HERMES_PROFILE", "HERMES_CONFIG", "HERMES_ENV", "PATH"): assert kept in out, f"{kept} should be kept" for dropped in ( "HERMES_BASE_URL", "HERMES_INTERACTIVE", "HERMES_KANBAN_DB", "SENTRY_DSN", "SLACK_WEBHOOK", "OPENAI_API_KEY", "GITHUB_TOKEN", "RANDOM_X", ): assert dropped not in out, f"{dropped} should be dropped" def test_env_scrub_passthrough_overrides_secret_block(): """A skill/config-declared passthrough var is an explicit user opt-in and passes even if it matches a secret substring (precedence is intentional).""" from tools.code_execution_tool import _scrub_child_env env = {"MY_SERVICE_DSN": "value"} out = _scrub_child_env(env, is_passthrough=lambda k: k == "MY_SERVICE_DSN", is_windows=False) assert out.get("MY_SERVICE_DSN") == "value" # --------------------------------------------------------------------------- # 5. File-tool sensitive-path refusal (security B1) # --------------------------------------------------------------------------- def test_execute_code_entry_blocks_before_spawn_when_guard_denies(monkeypatch, tmp_path): """Behavioral wiring test: execute_code() consults the entry guard and, on denial, returns the block message WITHOUT spawning the child — proven by a marker file the script would create that never appears.""" import json import tools.code_execution_tool as cet from tools import terminal_tool as TT marker = tmp_path / "child-ran.marker" monkeypatch.setenv("HERMES_CRON_SESSION", "1") monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) monkeypatch.setattr(A, "_get_approval_mode", lambda: "manual") monkeypatch.setattr(A, "_get_cron_approval_mode", lambda: "deny") monkeypatch.setattr(TT, "_get_env_config", lambda: {"env_type": "local"}) result = json.loads( cet.execute_code(f"open({str(marker)!r}, 'w').close()", task_id="cluster-t") ) assert result["status"] == "error" assert "BLOCKED" in result["error"] assert not marker.exists() # guard denied before the child was spawned # --------------------------------------------------------------------------- # 6. Env-scrub diagnosability mitigation (#27303 follow-up) # --------------------------------------------------------------------------- def test_env_scrub_logs_dropped_hermes_vars(caplog): """Dropping a non-allowlisted, non-secret HERMES_* var must be diagnosable: the scrub emits a one-shot debug log naming the dropped vars and pointing at the env_passthrough opt-in, so the silent behavior change (#27303) doesn't leave users guessing why a sandbox script sees an unset HERMES_* var.""" import logging from tools.code_execution_tool import _scrub_child_env env = { "HERMES_HOME": "/h", # allowlisted → kept, not logged "HERMES_BASE_URL": "https://x", # dropped → logged "HERMES_KANBAN_DB": "postgres://u:p@h/db", # dropped → logged "HERMES_API_KEY": "sk", # secret → dropped silently (not logged) "PATH": "/usr/bin", # safe prefix → kept } with caplog.at_level(logging.DEBUG, logger="tools.code_execution_tool"): out = _scrub_child_env(env, is_passthrough=lambda _: False, is_windows=False) assert "HERMES_HOME" in out and "PATH" in out assert "HERMES_BASE_URL" not in out and "HERMES_KANBAN_DB" not in out msgs = "\n".join(r.getMessage() for r in caplog.records) assert "HERMES_BASE_URL" in msgs and "HERMES_KANBAN_DB" in msgs assert "env_passthrough" in msgs # Secret vars are dropped but must NOT be named in the diagnostic log. assert "HERMES_API_KEY" not in msgs def test_env_scrub_no_log_when_nothing_dropped(caplog): """No diagnostic noise when there are no dropped HERMES_* vars.""" import logging from tools.code_execution_tool import _scrub_child_env with caplog.at_level(logging.DEBUG, logger="tools.code_execution_tool"): _scrub_child_env( {"HERMES_HOME": "/h", "PATH": "/usr/bin"}, is_passthrough=lambda _: False, is_windows=False, ) assert "dropped" not in "\n".join(r.getMessage() for r in caplog.records)