Files
hermes-agent/tests/gateway/test_planned_stop_watcher.py
kshitij 7379f17556 fix(gateway): only fire planned-stop watcher for self-targeting markers + fix Windows consume (#34749)
* fix(gateway): only fire planned-stop watcher for markers targeting self

Salvaged from #34599 — rebased onto current main.

The planned-stop watcher now only fires shutdown for a marker that targets
the current process, instead of any marker that exists on disk. Fixes the
Windows crash loop (#34597) where a stale marker from a previous Gateway
instance kills a freshly booted Gateway ~400ms after start with a false
"Received UNKNOWN — initiating shutdown".

Co-authored-by: Bartok9 <danielrpike9@gmail.com>

* fix(gateway): match planned-stop/takeover markers by PID alone when start_time is unavailable

Follow-up to the #34599 salvage. The watcher's non-destructive probe
(planned_stop_marker_targets_self) already falls back to PID equality when
a process start_time is unavailable, but the authoritative consume it gates
(_consume_pid_marker_for_self) still required a non-None start_time match.

_get_process_start_time reads /proc/<pid>/stat and returns None on macOS and
native Windows — the only platform the planned-stop watcher exists for. So on
Windows the probe would fire the shutdown handler (PID matches) but the
handler's consume_planned_stop_marker_for_self() would return False, and a
legitimate 'hermes gateway stop' was still misclassified as an unexpected
UNKNOWN exit (exit 1) and revived by the service manager — a residual half of
the #34597 crash loop on the legitimate-stop path.

Align the consume with the probe: when both start_times are known they must
match (PID-reuse guard preserved on Linux); when either is unavailable, fall
back to PID equality alone, bounded by the existing short marker TTL. This
also fixes the parallel --replace takeover consume on Windows, which shares
the same helper.

Adds regression tests for the Windows (None start_time) path, the foreign-PID
rejection under that fallback, and confirmation the start_time-mismatch guard
still rejects when both are known.

---------

Co-authored-by: Bartok9 <danielrpike9@gmail.com>
2026-05-29 17:36:58 +00:00

394 lines
14 KiB
Python

"""Tests for the planned-stop marker watcher thread (gateway/run.py).
The watcher is the Windows-fallback path for the v0.13.0 session-resume
feature — on Windows ``asyncio.add_signal_handler`` raises
NotImplementedError, so the SIGTERM signal handler never runs and the
shutdown drain (which writes ``resume_pending=True``) is skipped. The
watcher closes this gap by polling for the planned-stop marker file
and translating its existence into the same shutdown-handler call a
real SIGTERM would have produced.
See issue #33778 for the original Windows session-loss bug report.
"""
import asyncio
import json
import os
import threading
import time
from unittest.mock import MagicMock
from gateway.run import _run_planned_stop_watcher
from gateway import status as status_mod
def _write_self_marker(marker, *, stale: bool = False):
    """Drop a planned-stop marker file aimed at the CURRENT process.

    The watcher only fires for markers naming our PID + start_time (the fix
    for issue #34597), so any test that expects a fire needs a marker built
    this way.

    Args:
        marker: Path-like destination; the JSON record is written with
            ``write_text`` as UTF-8.
        stale: When true, ``written_at`` is pinned to 2000-01-01 so the
            record is backdated past the TTL; otherwise the timestamp comes
            from ``status_mod._utc_now_iso()``.
    """
    own_pid = os.getpid()
    if stale:
        written_at = "2000-01-01T00:00:00+00:00"
    else:
        written_at = status_mod._utc_now_iso()
    payload = json.dumps(
        {
            # This process is both the target and the stopper.
            "target_pid": own_pid,
            "target_start_time": status_mod._get_process_start_time(own_pid),
            "stopper_pid": own_pid,
            "written_at": written_at,
        }
    )
    marker.write_text(payload, encoding="utf-8")
class _FakeRunner:
    """Minimal GatewayRunner double carrying only the two flags the watcher reads."""

    def __init__(self, *, running: bool = True, draining: bool = False) -> None:
        # Keyword-only so each call site spells out which flag it sets.
        self._running, self._draining = running, draining
def _make_loop_capturing_calls():
    """Return a mock event loop that records ``call_soon_threadsafe`` calls.

    Nothing is actually scheduled: each call is appended to
    ``loop._captured`` as a ``(fn, args)`` tuple so a test can inspect what
    was requested of the loop.
    """
    fake_loop = MagicMock(spec=asyncio.AbstractEventLoop)
    fake_loop._captured = []
    # Look up ``_captured`` at call time, so the recorder always appends to
    # whatever list is currently attached to the mock.
    fake_loop.call_soon_threadsafe = (
        lambda fn, *args: fake_loop._captured.append((fn, args))
    )
    return fake_loop
def test_watcher_fires_shutdown_when_marker_appears(tmp_path, monkeypatch):
    """A marker targeting THIS process makes the watcher fire the shutdown handler."""
    marker_path = tmp_path / ".gateway-planned-stop.json"
    # The self-targeting marker is on disk before the thread is started.
    _write_self_marker(marker_path)
    # Point the marker-path resolver at the temp location the watcher polls.
    monkeypatch.setattr(
        status_mod, "_get_planned_stop_marker_path", lambda: marker_path
    )

    fake_loop = _make_loop_capturing_calls()
    handler = MagicMock(name="shutdown_signal_handler")
    halt = threading.Event()
    thread = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(halt, _FakeRunner(running=True, draining=False), fake_loop, handler),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    thread.start()
    thread.join(timeout=2.0)

    assert not thread.is_alive(), "Watcher should exit after firing"
    calls = fake_loop._captured
    assert len(calls) == 1, (
        f"Expected exactly one shutdown invocation, got {calls}"
    )
    scheduled_fn, scheduled_args = calls[0]
    assert scheduled_fn is handler
    # signal=None is the planned-stop sentinel the handler must receive.
    assert scheduled_args == (None,)
def test_watcher_does_not_fire_when_marker_absent(tmp_path, monkeypatch):
    """No marker = no shutdown call. The watcher just polls until stop_event."""
    # The marker path is resolvable but the file is deliberately never created.
    marker = tmp_path / ".gateway-planned-stop.json"
    # Uses the module-level ``status_mod`` import; no local re-import needed.
    monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)

    runner = _FakeRunner(running=True, draining=False)
    loop = _make_loop_capturing_calls()
    shutdown_handler = MagicMock()
    stop_event = threading.Event()
    watcher = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(stop_event, runner, loop, shutdown_handler),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    watcher.start()
    time.sleep(0.3)  # let it poll a few times
    stop_event.set()
    watcher.join(timeout=2.0)

    assert not watcher.is_alive()
    assert loop._captured == [], (
        f"No marker present, but watcher fired shutdown: {loop._captured}"
    )
    shutdown_handler.assert_not_called()
def test_watcher_skips_when_runner_already_draining(tmp_path, monkeypatch):
    """A shutdown already in progress must not be re-fired by the watcher.

    This guards the race where the SIGTERM handler is mid-drain and the
    watcher would double-tap the shutdown path; the ``_draining`` flag is
    what makes the watcher back off once any shutdown is in flight.
    """
    marker_path = tmp_path / ".gateway-planned-stop.json"
    _write_self_marker(marker_path)
    monkeypatch.setattr(
        status_mod, "_get_planned_stop_marker_path", lambda: marker_path
    )

    fake_loop = _make_loop_capturing_calls()
    halt = threading.Event()
    # Runner state for "already draining": the watcher should be a no-op.
    draining_runner = _FakeRunner(running=False, draining=True)
    thread = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(halt, draining_runner, fake_loop, MagicMock()),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    thread.start()
    time.sleep(0.2)
    halt.set()
    thread.join(timeout=2.0)

    assert fake_loop._captured == [], "Watcher fired while runner was already draining"
def test_watcher_skips_when_runner_not_started(tmp_path, monkeypatch):
    """A gateway that is not yet running must not be shut down by the watcher.

    The marker here targets THIS process and is fresh, so the only thing
    standing between the watcher and a shutdown call is the runner's
    ``_running=False`` flag. A foreign-PID marker would be declined for
    targeting another process and would leave the not-started guard untested.
    """
    marker = tmp_path / ".gateway-planned-stop.json"
    _write_self_marker(marker)
    monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)

    # Neither running nor draining: the runner has simply not started yet.
    runner = _FakeRunner(running=False, draining=False)
    loop = _make_loop_capturing_calls()
    shutdown_handler = MagicMock()
    stop_event = threading.Event()
    watcher = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(stop_event, runner, loop, shutdown_handler),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    watcher.start()
    time.sleep(0.2)
    stop_event.set()
    watcher.join(timeout=2.0)

    assert not watcher.is_alive()
    assert loop._captured == [], "Watcher fired before runner was running"
    shutdown_handler.assert_not_called()
def test_watcher_responds_to_stop_event_promptly(tmp_path, monkeypatch):
    """Setting stop_event must exit the watcher within ~poll_interval seconds."""
    # The marker file is never created, so the only exit path is stop_event.
    marker = tmp_path / ".gateway-planned-stop.json"
    # Uses the module-level ``status_mod`` import; no local re-import needed.
    monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)

    runner = _FakeRunner(running=True, draining=False)
    loop = _make_loop_capturing_calls()
    stop_event = threading.Event()
    watcher = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(stop_event, runner, loop, MagicMock()),
        kwargs={"poll_interval": 0.1},
        daemon=True,
    )
    watcher.start()
    time.sleep(0.05)

    # Measure only the time between requesting the stop and the thread exiting.
    started_stop = time.monotonic()
    stop_event.set()
    watcher.join(timeout=2.0)
    elapsed = time.monotonic() - started_stop

    assert not watcher.is_alive()
    assert elapsed < 0.5, f"Watcher took {elapsed:.2f}s to honour stop_event"
def test_watcher_fires_only_once_when_marker_persists(tmp_path, monkeypatch):
    """A marker that stays on disk across polls must NOT spam the handler.

    The watcher is expected to fire once and leave its loop; consuming the
    marker is the shutdown handler's job on its own thread. Re-firing on
    every tick would invoke the handler dozens of times before the gateway
    actually shuts down.
    """
    marker_path = tmp_path / ".gateway-planned-stop.json"
    _write_self_marker(marker_path)
    monkeypatch.setattr(
        status_mod, "_get_planned_stop_marker_path", lambda: marker_path
    )

    fake_loop = _make_loop_capturing_calls()
    halt = threading.Event()
    thread = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(halt, _FakeRunner(running=True, draining=False), fake_loop, MagicMock()),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    thread.start()
    # The join window spans many poll intervals, but the thread should be
    # gone right after its first fire.
    thread.join(timeout=1.0)

    assert not thread.is_alive()
    fire_count = len(fake_loop._captured)
    assert fire_count == 1, (
        f"Watcher fired {fire_count} times; should fire once "
        f"and exit (events={fake_loop._captured})"
    )
def test_watcher_tolerates_marker_path_resolution_errors(tmp_path, monkeypatch, caplog):
    """If _get_planned_stop_marker_path() raises, the watcher survives and stays stoppable.

    NOTE(review): ``caplog`` is requested but never asserted on, so this test
    does not verify that the error is logged; it only checks that the thread
    still honours ``stop_event`` and never fires shutdown.
    """
    resolver_calls = 0

    def flaky_resolver():
        nonlocal resolver_calls
        resolver_calls += 1
        # The first resolution succeeds (pointing at a file that does not
        # exist); every later resolution raises.
        if resolver_calls == 1:
            return tmp_path / "nonexistent"
        raise OSError("filesystem failed")

    # Uses the module-level ``status_mod`` import; no local re-import needed.
    monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", flaky_resolver)

    runner = _FakeRunner(running=True, draining=False)
    loop = _make_loop_capturing_calls()
    stop_event = threading.Event()
    watcher = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(stop_event, runner, loop, MagicMock()),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    watcher.start()
    time.sleep(0.2)
    stop_event.set()
    watcher.join(timeout=2.0)

    assert not watcher.is_alive(), "Watcher should still honour stop_event after errors"
    # No shutdown fired because the marker never reported existence.
    assert loop._captured == []
# ---------------------------------------------------------------------------
# Regression coverage for issue #34597:
# A marker left behind by a PREVIOUS gateway instance (different PID, or
# past its TTL) must NOT crash the freshly booted gateway. The watcher
# only fires when the marker targets the current process, and self-heals
# by cleaning up stale/malformed markers.
# ---------------------------------------------------------------------------
def test_watcher_does_not_fire_for_foreign_pid_marker(tmp_path, monkeypatch):
    """A marker that names a DIFFERENT process must not shut this one down.

    Core #34597 regression: a stale marker from a prior gateway instance
    used to fire the handler, driving the new gateway into a false
    "Received UNKNOWN" shutdown and a watchdog crash loop.
    """
    marker_path = tmp_path / ".gateway-planned-stop.json"
    foreign_pid = os.getpid() + 1
    # Foreign PID plus a start_time that cannot match ours. The timestamp is
    # fresh, so the TTL does NOT remove the marker; the watcher itself has
    # to decline it.
    foreign_record = {
        "target_pid": foreign_pid,
        "target_start_time": -1,
        "stopper_pid": foreign_pid,
        "written_at": status_mod._utc_now_iso(),
    }
    marker_path.write_text(json.dumps(foreign_record), encoding="utf-8")
    monkeypatch.setattr(
        status_mod, "_get_planned_stop_marker_path", lambda: marker_path
    )

    fake_loop = _make_loop_capturing_calls()
    handler = MagicMock(name="shutdown_signal_handler")
    halt = threading.Event()
    thread = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(halt, _FakeRunner(running=True, draining=False), fake_loop, handler),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    thread.start()
    time.sleep(0.3)  # several poll cycles
    halt.set()
    thread.join(timeout=2.0)

    assert not thread.is_alive()
    assert fake_loop._captured == [], (
        f"Watcher fired on a foreign-PID marker (#34597 regression): {fake_loop._captured}"
    )
    handler.assert_not_called()
    # A foreign (but live) marker stays in place: it may still belong to the
    # process it names.
    assert marker_path.exists()
def test_watcher_cleans_up_stale_marker_and_keeps_running(tmp_path, monkeypatch):
    """A marker past the TTL gets unlinked and never triggers shutdown."""
    marker_path = tmp_path / ".gateway-planned-stop.json"
    # The marker targets this process but is backdated beyond the TTL, so it
    # must be treated as dead.
    _write_self_marker(marker_path, stale=True)
    monkeypatch.setattr(
        status_mod, "_get_planned_stop_marker_path", lambda: marker_path
    )

    fake_loop = _make_loop_capturing_calls()
    handler = MagicMock(name="shutdown_signal_handler")
    halt = threading.Event()
    thread = threading.Thread(
        target=_run_planned_stop_watcher,
        args=(halt, _FakeRunner(running=True, draining=False), fake_loop, handler),
        kwargs={"poll_interval": 0.05},
        daemon=True,
    )
    thread.start()
    time.sleep(0.3)
    halt.set()
    thread.join(timeout=2.0)

    assert not thread.is_alive()
    assert fake_loop._captured == [], "Stale marker must not fire shutdown"
    handler.assert_not_called()
    assert not marker_path.exists(), "Stale marker should have been cleaned up"
def test_planned_stop_marker_targets_self_probe_is_non_destructive(tmp_path, monkeypatch):
    """The probe reports True for a self-marker and leaves the file on disk.

    The authoritative consume belongs to the shutdown handler on its own
    thread, so the watcher's probe must not remove a matching marker.
    """
    marker_path = tmp_path / ".gateway-planned-stop.json"
    _write_self_marker(marker_path)
    monkeypatch.setattr(
        status_mod, "_get_planned_stop_marker_path", lambda: marker_path
    )

    first_probe = status_mod.planned_stop_marker_targets_self()
    assert first_probe is True
    assert marker_path.exists(), "Probe must not consume a matching marker"
    # Idempotent: a second probe gives the same answer.
    second_probe = status_mod.planned_stop_marker_targets_self()
    assert second_probe is True
def test_planned_stop_marker_targets_self_drops_malformed(tmp_path, monkeypatch):
"""A malformed marker reports False and is cleaned up."""
marker = tmp_path / ".gateway-planned-stop.json"
marker.write_text("{not valid json", encoding="utf-8")
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
assert status_mod.planned_stop_marker_targets_self() is False