fix(gateway,cron): reuse existing _HERMES_GATEWAY marker; tighten cron regex

Follow-up to the salvaged #30728:
- Gateway already exports _HERMES_GATEWAY=1 at startup (gateway/run.py) and
  cli.py already keys off it. Drop the redundant new HERMES_IN_GATEWAY var;
  guard stop/restart on _HERMES_GATEWAY instead. One marker for one fact.
- Drop the greedy \bgateway.*restart alternation from the cron lifecycle
  filter — it false-positived on legit prompts that merely mention an
  unrelated gateway + a restart (API/payment gateway monitoring). The
  specific 'hermes gateway (restart|stop|start)' pattern already covers the
  real command.
- Rework the two negative guard tests to short-circuit the first downstream
  call with a sentinel, so they no longer drive real signal delivery (which
  tripped the live-system guard).
- Add false-positive regression cases to test_safe_commands.
This commit is contained in:
teknium1
2026-05-30 21:02:53 -07:00
committed by Teknium
parent 5cd6c1717d
commit bd72d333dc
5 changed files with 49 additions and 28 deletions

View File

@ -1005,10 +1005,6 @@ except Exception as _bootstrap_exc:
# Gateway runs in quiet mode - suppress debug output and use cwd directly (no temp dirs)
os.environ["HERMES_QUIET"] = "1"
# Mark that we are inside the gateway process — used by `hermes gateway stop/restart`
# to refuse self-targeting calls that would kill the agent's own runtime.
os.environ["HERMES_IN_GATEWAY"] = "1"
# Enable interactive exec approval for dangerous commands on messaging platforms
os.environ["HERMES_EXEC_ASK"] = "1"

View File

@ -18,13 +18,15 @@ from hermes_cli.colors import Colors, color
# Patterns that indicate a cron job targets the gateway lifecycle.
# Matches commands that start, stop, or restart the gateway — directly, via its
# service manager (launchctl/systemctl), or by killing its process.
# Deliberately specific — a bare "gateway ... restart" catch-all would block
# legitimate prompts that merely mention an unrelated gateway (e.g. "summarize
# the API gateway logs and report restart events").
_GATEWAY_LIFECYCLE_PATTERNS = re.compile(
r"(?i)"
r"(hermes\s+gateway\s+(restart|stop|start))"
r"|(launchctl\s+(kickstart|unload|load|stop|restart)\s+.*hermes)"
r"|(systemctl\s+(restart|stop|start)\s+.*hermes)"
r"|(p?kill\s+.*hermes.*gateway)"
r"|(\bgateway.*restart)"
)
@ -191,7 +193,7 @@ def cron_create(args):
combined = prompt
if script:
try:
script_text = Path(script).read_text()
script_text = Path(script).read_text(encoding="utf-8")
combined = f"{combined}\n{script_text}"
except (OSError, UnicodeDecodeError):
pass

View File

@ -5424,7 +5424,7 @@ def _gateway_command_inner(args):
elif subcmd == "stop":
# Defense: refuse self-targeting gateway stop from inside the gateway.
# Prevents agent-initiated kill loops when combined with supervisor KeepAlive.
if os.getenv("HERMES_IN_GATEWAY") == "1":
if os.getenv("_HERMES_GATEWAY") == "1":
print_error(
"Refusing to stop the gateway from inside the gateway process.\n"
"This command was blocked to prevent restart loops.\n"
@ -5509,7 +5509,7 @@ def _gateway_command_inner(args):
elif subcmd == "restart":
# Defense: refuse self-targeting gateway restart from inside the gateway.
# Prevents agent-initiated kill loops when combined with supervisor KeepAlive.
if os.getenv("HERMES_IN_GATEWAY") == "1":
if os.getenv("_HERMES_GATEWAY") == "1":
print_error(
"Refusing to restart the gateway from inside the gateway process.\n"
"This command was blocked to prevent restart loops.\n"

View File

@ -182,6 +182,7 @@ _HERMES_BEHAVIORAL_VARS = frozenset({
"HERMES_SESSION_SOURCE",
"HERMES_SESSION_KEY",
"HERMES_GATEWAY_SESSION",
"_HERMES_GATEWAY",
"HERMES_PLATFORM",
"HERMES_MODEL",
"HERMES_INFERENCE_MODEL",

View File

@ -1,7 +1,7 @@
"""Tests for gateway restart-loop defenses (#30719).
Covers:
- Defense 1: gateway stop/restart refuse when HERMES_IN_GATEWAY=1
- Defense 1: gateway stop/restart refuse when _HERMES_GATEWAY=1
- Defense 2: cron create rejects prompts containing gateway lifecycle commands
- _contains_gateway_lifecycle_command pattern matching
"""
@ -61,6 +61,11 @@ class TestGatewayLifecyclePattern:
"echo 'just a normal cron job'",
"run the backup script",
"gateway is running fine",
# Regression (#30728 follow-up): legit prompts that merely mention an
# unrelated gateway + a restart must NOT be blocked.
"Summarize the API gateway logs and report any restart events from last night",
"Check if the payment gateway needs a restart after the deploy",
"Monitor the gateway and tell me if a restart is recommended",
])
def test_safe_commands(self, text):
assert not _contains_gateway_lifecycle_command(text), f"Should NOT match: {text!r}"
@ -189,10 +194,10 @@ class TestCronCreateLifecycleBlock:
# ---------------------------------------------------------------------------
class TestGatewaySelfTargetingGuard:
"""Verify hermes gateway stop/restart refuse when HERMES_IN_GATEWAY=1."""
"""Verify hermes gateway stop/restart refuse when _HERMES_GATEWAY=1."""
def test_stop_refuses_inside_gateway(self, monkeypatch):
monkeypatch.setenv("HERMES_IN_GATEWAY", "1")
monkeypatch.setenv("_HERMES_GATEWAY", "1")
from hermes_cli.gateway import gateway_command
args = Namespace(gateway_command="stop", all=False, system=False)
with pytest.raises(SystemExit) as exc_info:
@ -200,7 +205,7 @@ class TestGatewaySelfTargetingGuard:
assert exc_info.value.code == 1
def test_restart_refuses_inside_gateway(self, monkeypatch):
monkeypatch.setenv("HERMES_IN_GATEWAY", "1")
monkeypatch.setenv("_HERMES_GATEWAY", "1")
from hermes_cli.gateway import gateway_command
args = Namespace(gateway_command="restart", all=False, system=False)
with pytest.raises(SystemExit) as exc_info:
@ -208,23 +213,40 @@ class TestGatewaySelfTargetingGuard:
assert exc_info.value.code == 1
def test_stop_allows_outside_gateway(self, monkeypatch):
monkeypatch.delenv("HERMES_IN_GATEWAY", raising=False)
from hermes_cli.gateway import gateway_command
# With the gateway marker unset, the self-targeting guard must NOT
# fire. Prove control reaches the real stop path (rather than driving
# real signal delivery, which would trip the live-system guard) by
# short-circuiting the first downstream call with a sentinel.
monkeypatch.delenv("_HERMES_GATEWAY", raising=False)
import hermes_cli.gateway as gw
class _Reached(Exception):
pass
def _sentinel(*a, **k):
raise _Reached()
monkeypatch.setattr(gw, "_dispatch_via_service_manager_if_s6", _sentinel)
monkeypatch.setattr(gw, "_dispatch_all_via_service_manager_if_s6", _sentinel)
args = Namespace(gateway_command="stop", all=False, system=False)
# Should not raise SystemExit(1) — it may fail for other reasons
# (no gateway running) but it won't exit with code 1 from the guard.
try:
gateway_command(args)
except SystemExit as e:
# The guard exit code is 1 and prints "Refusing" — make sure
# that's NOT what we hit.
assert e.code != 1 or "Refusing" not in str(e)
with pytest.raises(_Reached):
gw.gateway_command(args)
def test_restart_allows_outside_gateway(self, monkeypatch):
monkeypatch.delenv("HERMES_IN_GATEWAY", raising=False)
from hermes_cli.gateway import gateway_command
# Same as above for restart: guard must not fire when the marker is
# unset. The first thing restart does after the guard is the s6
# dispatch check — sentinel it so we never reach real signal delivery.
monkeypatch.delenv("_HERMES_GATEWAY", raising=False)
import hermes_cli.gateway as gw
class _Reached(Exception):
pass
def _sentinel(*a, **k):
raise _Reached()
monkeypatch.setattr(gw, "_dispatch_via_service_manager_if_s6", _sentinel)
monkeypatch.setattr(gw, "_dispatch_all_via_service_manager_if_s6", _sentinel)
args = Namespace(gateway_command="restart", all=False, system=False)
try:
gateway_command(args)
except SystemExit as e:
assert e.code != 1 or "Refusing" not in str(e)
with pytest.raises(_Reached):
gw.gateway_command(args)