fix(gateway,cron): prevent agent restart loops via self-targeting gateway commands (#30719)

Two defenses, plus regression tests, against SIGTERM-respawn loops when an
agent schedules its own gateway restart under launchd/systemd KeepAlive:

1. HERMES_IN_GATEWAY env var: gateway sets it at startup; stop/restart
   subcommands refuse to run when set (exit 1 with clear message).

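2. Cron create payload filter: regex pre-flight rejects prompts/scripts
   containing hermes gateway restart/stop/start, launchctl
   kickstart/unload/load/stop/restart, systemctl restart/stop/start, and
   kill/pkill patterns that target the hermes gateway, as well as any line
   where "gateway" is later followed by "restart".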

3. 30 new tests: pattern matching (14), cron block (5), gateway guard (4),
   safe command negatives (7).
This commit is contained in:
simokiihamaki
2026-05-23 06:49:01 +03:00
committed by Teknium
parent 9b78f411c8
commit 5cd6c1717d
4 changed files with 293 additions and 0 deletions

View File

@ -1005,6 +1005,10 @@ except Exception as _bootstrap_exc:
# Gateway runs in quiet mode - suppress debug output and use cwd directly (no temp dirs)
os.environ["HERMES_QUIET"] = "1"
# Mark that we are inside the gateway process — used by `hermes gateway stop/restart`
# to refuse self-targeting calls that would kill the agent's own runtime.
os.environ["HERMES_IN_GATEWAY"] = "1"
# Enable interactive exec approval for dangerous commands on messaging platforms
os.environ["HERMES_EXEC_ASK"] = "1"

View File

@ -6,6 +6,7 @@ pause/resume/run/remove, status, and tick.
"""
import json
import re
import sys
from pathlib import Path
from typing import Iterable, List, Optional
@ -15,6 +16,22 @@ sys.path.insert(0, str(PROJECT_ROOT))
from hermes_cli.colors import Colors, color
# Patterns that indicate a cron job targets the gateway lifecycle.
# Matches commands that restart/stop the gateway or its service manager.
_GATEWAY_LIFECYCLE_PATTERNS = re.compile(
r"(?i)"
r"(hermes\s+gateway\s+(restart|stop|start))"
r"|(launchctl\s+(kickstart|unload|load|stop|restart)\s+.*hermes)"
r"|(systemctl\s+(restart|stop|start)\s+.*hermes)"
r"|(p?kill\s+.*hermes.*gateway)"
r"|(\bgateway.*restart)"
)
def _contains_gateway_lifecycle_command(text: str) -> bool:
"""Return True if *text* contains a gateway lifecycle command pattern."""
return bool(_GATEWAY_LIFECYCLE_PATTERNS.search(text))
def _normalize_skills(single_skill=None, skills: Optional[Iterable[str]] = None) -> Optional[List[str]]:
if skills is None:
@ -166,6 +183,28 @@ def cron_status():
def cron_create(args):
# Defense: reject cron jobs that contain gateway lifecycle commands.
# Prevents agents from scheduling their own restart/stop, which creates
# SIGTERM-respawn loops under launchd/systemd KeepAlive (#30719).
prompt = getattr(args, "prompt", None) or ""
script = getattr(args, "script", None)
combined = prompt
if script:
try:
script_text = Path(script).read_text()
combined = f"{combined}\n{script_text}"
except (OSError, UnicodeDecodeError):
pass
if _contains_gateway_lifecycle_command(combined):
print(color(
"Blocked: cron job contains a gateway lifecycle command "
"(restart/stop/kill).\n"
"This is blocked to prevent restart loops (#30719).\n"
"Use `hermes gateway restart` from a shell outside the gateway.",
Colors.RED,
))
return 1
result = _cron_api(
action="create",
schedule=args.schedule,

View File

@ -5422,6 +5422,16 @@ def _gateway_command_inner(args):
sys.exit(1)
elif subcmd == "stop":
# Defense: refuse self-targeting gateway stop from inside the gateway.
# Prevents agent-initiated kill loops when combined with supervisor KeepAlive.
if os.getenv("HERMES_IN_GATEWAY") == "1":
print_error(
"Refusing to stop the gateway from inside the gateway process.\n"
"This command was blocked to prevent restart loops.\n"
"Use `hermes gateway stop` from a shell outside the running gateway."
)
sys.exit(1)
stop_all = getattr(args, 'all', False)
system = getattr(args, 'system', False)
@ -5497,6 +5507,16 @@ def _gateway_command_inner(args):
print(f"✓ Stopped {get_service_name()} service")
elif subcmd == "restart":
# Defense: refuse self-targeting gateway restart from inside the gateway.
# Prevents agent-initiated kill loops when combined with supervisor KeepAlive.
if os.getenv("HERMES_IN_GATEWAY") == "1":
print_error(
"Refusing to restart the gateway from inside the gateway process.\n"
"This command was blocked to prevent restart loops.\n"
"Use `hermes gateway restart` from a shell outside the running gateway."
)
sys.exit(1)
# Try service first, fall back to killing and restarting
service_available = False
system = getattr(args, 'system', False)

View File

@ -0,0 +1,230 @@
"""Tests for gateway restart-loop defenses (#30719).
Covers:
- Defense 1: gateway stop/restart refuse when HERMES_IN_GATEWAY=1
- Defense 2: cron create rejects prompts containing gateway lifecycle commands
- _contains_gateway_lifecycle_command pattern matching
"""
import os
from argparse import Namespace
import pytest
from hermes_cli.cron import (
_contains_gateway_lifecycle_command,
cron_command,
)
# ---------------------------------------------------------------------------
# Defense 2: _contains_gateway_lifecycle_command pattern tests
# ---------------------------------------------------------------------------
class TestGatewayLifecyclePattern:
    """Verify the regex catches gateway lifecycle commands.

    Each parametrized case is fed directly to
    ``_contains_gateway_lifecycle_command``; the positive groups must match
    and the "safe" group must not.
    """

    @pytest.mark.parametrize("text", [
        "hermes gateway restart",
        "hermes gateway stop",
        "hermes gateway start",
        "hermes  gateway  restart",  # double spaces between words
        "Hermes Gateway Restart",  # mixed case
        "HERMES GATEWAY RESTART",  # uppercase
    ])
    def test_hermes_gateway_commands(self, text):
        assert _contains_gateway_lifecycle_command(text), f"Should match: {text!r}"

    @pytest.mark.parametrize("text", [
        "launchctl kickstart gui/501/ai.hermes.gateway",
        "launchctl unload ~/Library/LaunchAgents/ai.hermes.gateway.plist",
        "launchctl stop ai.hermes.gateway",
        "systemctl restart hermes-gateway",
        "systemctl stop hermes-gateway.service",
        "systemctl start hermes-gateway",
    ])
    def test_service_manager_commands(self, text):
        assert _contains_gateway_lifecycle_command(text), f"Should match: {text!r}"

    @pytest.mark.parametrize("text", [
        "kill hermes gateway process",
        "pkill -f hermes.*gateway",
    ])
    def test_kill_commands(self, text):
        assert _contains_gateway_lifecycle_command(text), f"Should match: {text!r}"

    @pytest.mark.parametrize("text", [
        "restart the server application",
        "hermes cron list",
        "hermes update",
        "hermes config set model claude",
        "echo 'just a normal cron job'",
        "run the backup script",
        "gateway is running fine",
    ])
    def test_safe_commands(self, text):
        assert not _contains_gateway_lifecycle_command(text), f"Should NOT match: {text!r}"
class TestCronCreateLifecycleBlock:
    """Check that ``cron create`` turns away prompts and scripts carrying
    gateway lifecycle commands, and lets harmless input through."""

    @staticmethod
    def _create_args(schedule, prompt=None, script=None):
        """Build the ``cron create`` namespace; every field that is not under
        test keeps the same unset value in all cases."""
        return Namespace(
            cron_command="create",
            schedule=schedule,
            prompt=prompt,
            name=None,
            deliver=None,
            repeat=None,
            skill=None,
            skills=None,
            script=script,
            workdir=None,
            profile=None,
            no_agent=False,
        )

    @pytest.fixture(autouse=True)
    def _setup_cron_dir(self, tmp_path, monkeypatch):
        # Point the cron storage locations at a per-test temporary directory.
        cron_root = tmp_path / "cron"
        monkeypatch.setattr("cron.jobs.CRON_DIR", cron_root)
        monkeypatch.setattr("cron.jobs.JOBS_FILE", cron_root / "jobs.json")
        monkeypatch.setattr("cron.jobs.OUTPUT_DIR", cron_root / "output")

    def test_block_hermes_gateway_restart(self, capsys):
        exit_code = cron_command(self._create_args(
            "30m",
            prompt="Upgrade hermes then run hermes gateway restart",
        ))
        printed = capsys.readouterr().out
        assert exit_code == 1
        assert "Blocked" in printed
        assert "#30719" in printed

    def test_block_launchctl_kickstart(self, capsys):
        exit_code = cron_command(self._create_args(
            "0 9 * * *",
            prompt="Run launchctl kickstart -k gui/501/ai.hermes.gateway",
        ))
        printed = capsys.readouterr().out
        assert exit_code == 1
        assert "Blocked" in printed

    def test_block_script_with_lifecycle_command(self, tmp_path, capsys):
        # The lifecycle command lives only in the script file, not the prompt.
        script_path = tmp_path / "restart.sh"
        script_path.write_text("#!/bin/bash\nhermes gateway restart\n")
        exit_code = cron_command(self._create_args("1h", script=str(script_path)))
        printed = capsys.readouterr().out
        assert exit_code == 1
        assert "Blocked" in printed

    def test_allow_safe_prompt(self, capsys):
        exit_code = cron_command(self._create_args(
            "30m",
            prompt="Check server health and report status",
        ))
        printed = capsys.readouterr().out
        assert exit_code == 0
        assert "Created job" in printed

    def test_allow_empty_prompt(self, capsys):
        """A missing prompt carries no lifecycle content, so the guard must
        stay silent.  The API is expected to reject the job anyway for lacking
        prompt+skill, but that is a separate validation and its exit code is
        deliberately not asserted here."""
        cron_command(self._create_args("30m"))
        printed = capsys.readouterr().out
        # Whatever message the API prints, it must not be the guard's "Blocked".
        assert "Blocked" not in printed
# ---------------------------------------------------------------------------
# Defense 1: gateway stop/restart refuse inside gateway
# ---------------------------------------------------------------------------
class TestGatewaySelfTargetingGuard:
    """Verify hermes gateway stop/restart refuse when HERMES_IN_GATEWAY=1.

    The "refuses" tests pin the guard's exit code.  The "allows" tests check
    that the guard's "Refusing ..." message is absent from captured output
    when the variable is unset.
    """

    @staticmethod
    def _run_outside_gateway(subcmd, monkeypatch, capsys):
        """Run ``gateway <subcmd>`` with HERMES_IN_GATEWAY unset and return
        everything written to stdout and stderr.

        ``SystemExit`` is tolerated because the command may exit for unrelated
        reasons (e.g. no gateway running); any other exception propagates.
        """
        monkeypatch.delenv("HERMES_IN_GATEWAY", raising=False)
        from hermes_cli.gateway import gateway_command

        args = Namespace(gateway_command=subcmd, all=False, system=False)
        # NOTE(review): this runs the real stop/restart code path; confirm it
        # cannot touch a gateway running on the developer's machine.
        try:
            gateway_command(args)
        except SystemExit:
            pass
        captured = capsys.readouterr()
        return captured.out + captured.err

    def test_stop_refuses_inside_gateway(self, monkeypatch):
        monkeypatch.setenv("HERMES_IN_GATEWAY", "1")
        from hermes_cli.gateway import gateway_command

        args = Namespace(gateway_command="stop", all=False, system=False)
        with pytest.raises(SystemExit) as exc_info:
            gateway_command(args)
        assert exc_info.value.code == 1

    def test_restart_refuses_inside_gateway(self, monkeypatch):
        monkeypatch.setenv("HERMES_IN_GATEWAY", "1")
        from hermes_cli.gateway import gateway_command

        args = Namespace(gateway_command="restart", all=False, system=False)
        with pytest.raises(SystemExit) as exc_info:
            gateway_command(args)
        assert exc_info.value.code == 1

    def test_stop_allows_outside_gateway(self, monkeypatch, capsys):
        # str(SystemExit(1)) is just "1", so the guard can only be detected
        # through what it prints, not through the exception itself.
        # Assumes print_error writes to stdout or stderr; confirm.
        output = self._run_outside_gateway("stop", monkeypatch, capsys)
        assert "Refusing" not in output

    def test_restart_allows_outside_gateway(self, monkeypatch, capsys):
        output = self._run_outside_gateway("restart", monkeypatch, capsys)
        assert "Refusing" not in output