* fix(gateway): only fire planned-stop watcher for markers targeting self Salvaged from #34599 — rebased onto current main. The planned-stop watcher now only fires shutdown for a marker that targets the current process, instead of any marker that exists on disk. Fixes the Windows crash loop (#34597) where a stale marker from a previous Gateway instance kills a freshly booted Gateway ~400ms after start with a false "Received UNKNOWN — initiating shutdown". Co-authored-by: Bartok9 <danielrpike9@gmail.com> * fix(gateway): match planned-stop/takeover markers by PID alone when start_time is unavailable Follow-up to the #34599 salvage. The watcher's non-destructive probe (planned_stop_marker_targets_self) already falls back to PID equality when a process start_time is unavailable, but the authoritative consume it gates (_consume_pid_marker_for_self) still required a non-None start_time match. _get_process_start_time reads /proc/<pid>/stat and returns None on macOS and native Windows — the only platform the planned-stop watcher exists for. So on Windows the probe would fire the shutdown handler (PID matches) but the handler's consume_planned_stop_marker_for_self() would return False, and a legitimate 'hermes gateway stop' was still misclassified as an unexpected UNKNOWN exit (exit 1) and revived by the service manager — a residual half of the #34597 crash loop on the legitimate-stop path. Align the consume with the probe: when both start_times are known they must match (PID-reuse guard preserved on Linux); when either is unavailable, fall back to PID equality alone, bounded by the existing short marker TTL. This also fixes the parallel --replace takeover consume on Windows, which shares the same helper. Adds regression tests for the Windows (None start_time) path, the foreign-PID rejection under that fallback, and confirmation the start_time-mismatch guard still rejects when both are known. --------- Co-authored-by: Bartok9 <danielrpike9@gmail.com>
394 lines
14 KiB
Python
394 lines
14 KiB
Python
"""Tests for the planned-stop marker watcher thread (gateway/run.py).
|
|
|
|
The watcher is the Windows-fallback path for the v0.13.0 session-resume
|
|
feature — on Windows ``asyncio.add_signal_handler`` raises
|
|
NotImplementedError, so the SIGTERM signal handler never runs and the
|
|
shutdown drain (which writes ``resume_pending=True``) is skipped. The
|
|
watcher closes this gap by polling for the planned-stop marker file
|
|
and translating its existence into the same shutdown-handler call a
|
|
real SIGTERM would have produced.
|
|
|
|
See issue #33778 for the original Windows session-loss bug report.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
from unittest.mock import MagicMock
|
|
|
|
|
|
from gateway.run import _run_planned_stop_watcher
|
|
from gateway import status as status_mod
|
|
|
|
|
|
def _write_self_marker(marker, *, stale: bool = False):
|
|
"""Write a planned-stop marker that targets the CURRENT process.
|
|
|
|
The watcher only fires for markers naming our PID + start_time (the
|
|
fix for issue #34597), so tests that expect a fire must write a
|
|
self-targeting marker. Pass ``stale=True`` to backdate ``written_at``
|
|
past the TTL.
|
|
"""
|
|
written_at = "2000-01-01T00:00:00+00:00" if stale else status_mod._utc_now_iso()
|
|
record = {
|
|
"target_pid": os.getpid(),
|
|
"target_start_time": status_mod._get_process_start_time(os.getpid()),
|
|
"stopper_pid": os.getpid(),
|
|
"written_at": written_at,
|
|
}
|
|
marker.write_text(json.dumps(record), encoding="utf-8")
|
|
|
|
|
|
class _FakeRunner:
|
|
"""Stand-in for GatewayRunner — only exposes the two flags the watcher reads."""
|
|
|
|
def __init__(self, *, running: bool = True, draining: bool = False):
|
|
self._running = running
|
|
self._draining = draining
|
|
|
|
|
|
def _make_loop_capturing_calls():
|
|
"""Build a fake asyncio loop whose call_soon_threadsafe records its args."""
|
|
loop = MagicMock(spec=asyncio.AbstractEventLoop)
|
|
loop._captured = []
|
|
|
|
def fake_call_soon_threadsafe(fn, *args):
|
|
loop._captured.append((fn, args))
|
|
|
|
loop.call_soon_threadsafe = fake_call_soon_threadsafe
|
|
return loop
|
|
|
|
|
|
def test_watcher_fires_shutdown_when_marker_appears(tmp_path, monkeypatch):
|
|
"""When a marker targeting THIS process exists, fire the shutdown handler."""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
|
|
# Patch the marker-path resolver so the watcher polls our temp location.
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
shutdown_handler = MagicMock(name="shutdown_signal_handler")
|
|
stop_event = threading.Event()
|
|
|
|
# Drop a self-targeting marker before the thread starts.
|
|
_write_self_marker(marker)
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, shutdown_handler),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert not watcher.is_alive(), "Watcher should exit after firing"
|
|
assert len(loop._captured) == 1, (
|
|
f"Expected exactly one shutdown invocation, got {loop._captured}"
|
|
)
|
|
fn, args = loop._captured[0]
|
|
assert fn is shutdown_handler
|
|
# The handler must be called with signal=None (planned stop sentinel).
|
|
assert args == (None,)
|
|
|
|
|
|
def test_watcher_does_not_fire_when_marker_absent(tmp_path, monkeypatch):
|
|
"""No marker = no shutdown call. Watcher just spins until stop_event."""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
# Deliberately do NOT create the marker.
|
|
|
|
from gateway import status as status_mod
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
shutdown_handler = MagicMock()
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, shutdown_handler),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.3) # let it poll a few times
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert not watcher.is_alive()
|
|
assert loop._captured == [], (
|
|
f"No marker present, but watcher fired shutdown: {loop._captured}"
|
|
)
|
|
shutdown_handler.assert_not_called()
|
|
|
|
|
|
def test_watcher_skips_when_runner_already_draining(tmp_path, monkeypatch):
|
|
"""If shutdown is already in progress, don't re-fire the handler.
|
|
|
|
This prevents a race where the SIGTERM handler is mid-drain and the
|
|
watcher would double-tap the shutdown path. We check ``_draining``
|
|
so the watcher backs off once any shutdown is in flight.
|
|
"""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
_write_self_marker(marker)
|
|
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
# Already draining — watcher should be a no-op.
|
|
runner = _FakeRunner(running=False, draining=True)
|
|
loop = _make_loop_capturing_calls()
|
|
shutdown_handler = MagicMock()
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, shutdown_handler),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.2)
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert loop._captured == [], "Watcher fired while runner was already draining"
|
|
|
|
|
|
def test_watcher_skips_when_runner_not_started(tmp_path, monkeypatch):
|
|
"""If the runner hasn't started, the marker is for a previous instance —
|
|
we shouldn't shutdown a not-yet-running gateway.
|
|
"""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
marker.write_text('{"target_pid": 9999}', encoding="utf-8")
|
|
|
|
from gateway import status as status_mod
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=False, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
shutdown_handler = MagicMock()
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, shutdown_handler),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.2)
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert loop._captured == [], "Watcher fired before runner was running"
|
|
|
|
|
|
def test_watcher_responds_to_stop_event_promptly(tmp_path, monkeypatch):
|
|
"""Setting stop_event must exit the watcher within ~poll_interval seconds."""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
from gateway import status as status_mod
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, MagicMock()),
|
|
kwargs={"poll_interval": 0.1},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.05)
|
|
started_stop = time.monotonic()
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
elapsed = time.monotonic() - started_stop
|
|
|
|
assert not watcher.is_alive()
|
|
assert elapsed < 0.5, f"Watcher took {elapsed:.2f}s to honour stop_event"
|
|
|
|
|
|
def test_watcher_fires_only_once_when_marker_persists(tmp_path, monkeypatch):
|
|
"""Marker file existing for multiple polls must NOT spam the handler.
|
|
|
|
The watcher fires once and exits its loop (the shutdown handler is
|
|
responsible for consuming the marker on its own thread). If we
|
|
re-fired on every tick, the handler would be invoked dozens of
|
|
times before the gateway actually shuts down.
|
|
"""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
_write_self_marker(marker)
|
|
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, MagicMock()),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
# Let the watcher tick several times — but it should exit after the first fire.
|
|
watcher.join(timeout=1.0)
|
|
|
|
assert not watcher.is_alive()
|
|
assert len(loop._captured) == 1, (
|
|
f"Watcher fired {len(loop._captured)} times; should fire once "
|
|
f"and exit (events={loop._captured})"
|
|
)
|
|
|
|
|
|
def test_watcher_tolerates_marker_path_resolution_errors(tmp_path, monkeypatch, caplog):
|
|
"""If _get_planned_stop_marker_path() raises, the watcher logs and continues."""
|
|
from gateway import status as status_mod
|
|
|
|
call_count = [0]
|
|
def explode():
|
|
call_count[0] += 1
|
|
# First call (the one outside the loop, at thread start) is fine —
|
|
# but subsequent .exists() calls on a corrupt Path could explode.
|
|
if call_count[0] == 1:
|
|
return tmp_path / "nonexistent"
|
|
raise OSError("filesystem failed")
|
|
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", explode)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, MagicMock()),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.2)
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert not watcher.is_alive(), "Watcher should still honour stop_event after errors"
|
|
# No shutdown fired because the marker never reported existence.
|
|
assert loop._captured == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Regression coverage for issue #34597:
|
|
# A marker left behind by a PREVIOUS gateway instance (different PID, or
|
|
# past its TTL) must NOT crash the freshly booted gateway. The watcher
|
|
# only fires when the marker targets the current process, and self-heals
|
|
# by cleaning up stale/malformed markers.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_watcher_does_not_fire_for_foreign_pid_marker(tmp_path, monkeypatch):
|
|
"""A marker naming a DIFFERENT process must not trigger our shutdown.
|
|
|
|
This is the core #34597 regression: a stale marker from a prior
|
|
gateway instance was firing the handler, driving the new gateway into
|
|
a false "Received UNKNOWN" shutdown and a watchdog crash loop.
|
|
"""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
# Foreign PID + a start_time that cannot match ours, freshly written
|
|
# so the TTL does NOT remove it — the watcher must still decline.
|
|
record = {
|
|
"target_pid": os.getpid() + 1,
|
|
"target_start_time": -1,
|
|
"stopper_pid": os.getpid() + 1,
|
|
"written_at": status_mod._utc_now_iso(),
|
|
}
|
|
marker.write_text(json.dumps(record), encoding="utf-8")
|
|
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
shutdown_handler = MagicMock(name="shutdown_signal_handler")
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, shutdown_handler),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.3) # several poll cycles
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert not watcher.is_alive()
|
|
assert loop._captured == [], (
|
|
f"Watcher fired on a foreign-PID marker (#34597 regression): {loop._captured}"
|
|
)
|
|
shutdown_handler.assert_not_called()
|
|
# Foreign (but live) marker is left in place — it may still belong to
|
|
# the process it names.
|
|
assert marker.exists()
|
|
|
|
|
|
def test_watcher_cleans_up_stale_marker_and_keeps_running(tmp_path, monkeypatch):
|
|
"""A marker older than the TTL is unlinked and never fires shutdown."""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
# Self-targeting but backdated past the TTL: must be treated as dead.
|
|
_write_self_marker(marker, stale=True)
|
|
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
runner = _FakeRunner(running=True, draining=False)
|
|
loop = _make_loop_capturing_calls()
|
|
shutdown_handler = MagicMock(name="shutdown_signal_handler")
|
|
stop_event = threading.Event()
|
|
|
|
watcher = threading.Thread(
|
|
target=_run_planned_stop_watcher,
|
|
args=(stop_event, runner, loop, shutdown_handler),
|
|
kwargs={"poll_interval": 0.05},
|
|
daemon=True,
|
|
)
|
|
watcher.start()
|
|
time.sleep(0.3)
|
|
stop_event.set()
|
|
watcher.join(timeout=2.0)
|
|
|
|
assert not watcher.is_alive()
|
|
assert loop._captured == [], "Stale marker must not fire shutdown"
|
|
shutdown_handler.assert_not_called()
|
|
assert not marker.exists(), "Stale marker should have been cleaned up"
|
|
|
|
|
|
def test_planned_stop_marker_targets_self_probe_is_non_destructive(tmp_path, monkeypatch):
|
|
"""The probe returns True for a self-marker WITHOUT unlinking it.
|
|
|
|
The shutdown handler performs the authoritative consume on its own
|
|
thread, so the watcher's probe must leave a matching marker intact.
|
|
"""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
_write_self_marker(marker)
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
assert status_mod.planned_stop_marker_targets_self() is True
|
|
assert marker.exists(), "Probe must not consume a matching marker"
|
|
# Idempotent: still True on a second call.
|
|
assert status_mod.planned_stop_marker_targets_self() is True
|
|
|
|
|
|
def test_planned_stop_marker_targets_self_drops_malformed(tmp_path, monkeypatch):
|
|
"""A malformed marker reports False and is cleaned up."""
|
|
marker = tmp_path / ".gateway-planned-stop.json"
|
|
marker.write_text("{not valid json", encoding="utf-8")
|
|
monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker)
|
|
|
|
assert status_mod.planned_stop_marker_targets_self() is False
|