fix(dashboard): reap orphaned embedded-chat sessions to stop slash_worker leak
Since #38591 made the dashboard's embedded chat unconditional, every browser refresh of /chat spins up a fresh session.create (new sid + a fresh _SlashWorker via _deferred_build) over /api/ws, but the old tab's WS disconnect only DETACHES the transport (ws.py) — it never closes the old session or its slash_worker. The dashboard's in-process gateway is long-lived, so the detached _SlashWorker subprocess's stdin pipe stays open forever and the worker never reaches EOF: one leaked python process per refresh. Fix at the session-lifecycle layer (not PTY signal timing — verified that a process whose owning gateway dies is always reaped via stdin-EOF; the leak is specifically the long-lived dashboard process keeping detached sessions parked). On WS disconnect, schedule a grace-delayed reap of any session left orphaned (transport detached to stdio, not mid-turn). A quick reconnect / session.resume / prompt.submit rebinds a live transport and cancels the reap, preserving the intentional detach-for-reconnect window. - server.py: extract _teardown_session() (shared with session.close), add _ws_session_is_orphaned() + _schedule_ws_orphan_reap(), gated by HERMES_TUI_WS_ORPHAN_REAP_GRACE_S (default 20s, 0 disables). - ws.py: schedule the reap for each detached session on disconnect. - tests: reap-closes-worker, spares-reattached/mid-turn/finalized, disabled-when-grace-zero.
This commit is contained in:
@ -876,6 +876,73 @@ def test_session_close_commits_memory_and_fires_finalize_hook(monkeypatch):
|
||||
server._sessions.pop("sid", None)
|
||||
|
||||
|
||||
def test_ws_orphan_reap_closes_worker_when_session_stays_detached(monkeypatch):
|
||||
"""A detached WS session past its grace window has its slash_worker closed.
|
||||
|
||||
Regression for #38591 fallout: every dashboard refresh spawned a fresh
|
||||
session + _SlashWorker but never reaped the previous one, leaking one
|
||||
python subprocess per refresh.
|
||||
"""
|
||||
closed = {"worker": False}
|
||||
|
||||
class _FakeWorker:
|
||||
def close(self):
|
||||
closed["worker"] = True
|
||||
|
||||
server._sessions["orphan-sid"] = _session(
|
||||
transport=server._stdio_transport,
|
||||
slash_worker=_FakeWorker(),
|
||||
running=False,
|
||||
)
|
||||
# Run the reap body synchronously (no real timer/grace) to assert behaviour.
|
||||
monkeypatch.setattr(server, "_WS_ORPHAN_REAP_GRACE_S", 0.01)
|
||||
try:
|
||||
# Directly invoke the orphaned-check + teardown the timer would run.
|
||||
assert server._ws_session_is_orphaned(server._sessions["orphan-sid"]) is True
|
||||
session = server._sessions.pop("orphan-sid")
|
||||
server._teardown_session(session)
|
||||
assert closed["worker"] is True
|
||||
finally:
|
||||
server._sessions.pop("orphan-sid", None)
|
||||
|
||||
|
||||
def test_ws_orphan_reap_spares_reattached_session(monkeypatch):
|
||||
"""A session that rebinds a live transport is NOT considered orphaned."""
|
||||
|
||||
class _LiveTransport:
|
||||
def write(self, *a, **k):
|
||||
return True
|
||||
|
||||
# Reattached: transport is a live (non-stdio) transport.
|
||||
reattached = _session(transport=_LiveTransport(), running=False)
|
||||
assert server._ws_session_is_orphaned(reattached) is False
|
||||
|
||||
# Mid-turn sessions are also spared even if detached.
|
||||
mid_turn = _session(transport=server._stdio_transport, running=True)
|
||||
assert server._ws_session_is_orphaned(mid_turn) is False
|
||||
|
||||
# Already finalized sessions are spared (idempotency).
|
||||
done = _session(transport=server._stdio_transport, running=False, _finalized=True)
|
||||
assert server._ws_session_is_orphaned(done) is False
|
||||
|
||||
|
||||
def test_ws_orphan_reap_disabled_when_grace_zero(monkeypatch):
|
||||
"""Grace=0 disables the reaper entirely (pre-fix park-forever behaviour)."""
|
||||
fired = {"timer": False}
|
||||
|
||||
class _Timer:
|
||||
def __init__(self, *a, **k):
|
||||
fired["timer"] = True
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(server, "_WS_ORPHAN_REAP_GRACE_S", 0.0)
|
||||
monkeypatch.setattr(server.threading, "Timer", _Timer)
|
||||
server._schedule_ws_orphan_reap("any-sid")
|
||||
assert fired["timer"] is False
|
||||
|
||||
|
||||
def test_init_session_fires_reset_hook(monkeypatch):
|
||||
hooks = []
|
||||
|
||||
|
||||
@ -136,6 +136,24 @@ try:
|
||||
except (ValueError, TypeError):
|
||||
_slash_timeout = 45.0
|
||||
_SLASH_WORKER_TIMEOUT_S = max(5.0, _slash_timeout)
|
||||
|
||||
# When a WebSocket client (the dashboard's embedded-chat tab / desktop app)
|
||||
# disconnects, ``tui_gateway.ws`` detaches the transport but intentionally
|
||||
# leaves the session parked so a quick reconnect can reattach it (see ws.py).
|
||||
# That park is unbounded, though: a browser refresh spins up a brand-new
|
||||
# ``session.create`` (new sid + a fresh _SlashWorker via _deferred_build) and
|
||||
# never reattaches the OLD sid, so the old session's slash-worker subprocess
|
||||
# lingers forever — one leaked python process per refresh (#38591 fallout).
|
||||
# After this grace window, an orphaned (transport-detached, not-running) WS
|
||||
# session is reaped: its _SlashWorker is closed and the session finalized.
|
||||
# Set to 0 to disable (park forever, pre-fix behaviour).
|
||||
try:
|
||||
_ws_orphan_reap_grace = float(
|
||||
os.environ.get("HERMES_TUI_WS_ORPHAN_REAP_GRACE_S") or "20"
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
_ws_orphan_reap_grace = 20.0
|
||||
_WS_ORPHAN_REAP_GRACE_S = max(0.0, _ws_orphan_reap_grace)
|
||||
_DETAIL_SECTION_NAMES = ("thinking", "tools", "subagents", "activity")
|
||||
_DETAIL_MODES = frozenset({"hidden", "collapsed", "expanded"})
|
||||
|
||||
@ -326,6 +344,78 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No
|
||||
pass
|
||||
|
||||
|
||||
def _teardown_session(session: dict | None) -> None:
|
||||
"""Fully tear down a session: finalize, unregister, close agent + worker.
|
||||
|
||||
Shared by ``session.close`` and the orphaned-WS-session reaper so the
|
||||
slash-worker subprocess is always closed exactly once via the same path.
|
||||
Idempotent: the ``_finalized`` guard in ``_finalize_session`` and the
|
||||
``poll()`` guard in ``_SlashWorker.close`` make repeat calls harmless.
|
||||
"""
|
||||
if not session:
|
||||
return
|
||||
_finalize_session(session)
|
||||
try:
|
||||
from tools.approval import unregister_gateway_notify
|
||||
|
||||
unregister_gateway_notify(session["session_key"])
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
agent = session.get("agent")
|
||||
if agent and hasattr(agent, "close"):
|
||||
agent.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
worker = session.get("slash_worker")
|
||||
if worker:
|
||||
worker.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _ws_session_is_orphaned(session: dict | None) -> bool:
|
||||
"""True if a WS session has no live transport and no in-flight turn.
|
||||
|
||||
After ``handle_ws`` detaches a disconnected client it points the session
|
||||
at ``_stdio_transport``. In the dashboard's in-process gateway there is no
|
||||
real stdio peer reading those frames, so a session left on the stdio
|
||||
transport (and not mid-turn) is genuinely orphaned and safe to reap.
|
||||
"""
|
||||
if not session or session.get("_finalized"):
|
||||
return False
|
||||
if session.get("running"):
|
||||
return False
|
||||
return session.get("transport") is _stdio_transport
|
||||
|
||||
|
||||
def _schedule_ws_orphan_reap(sid: str) -> None:
|
||||
"""After a grace window, reap session ``sid`` iff it's still orphaned.
|
||||
|
||||
Called from the WS-disconnect path. The grace window lets a transient
|
||||
reconnect (or a ``session.resume`` that reattaches the transport) cancel
|
||||
the reap by re-binding a live transport. Disabled when the grace is 0.
|
||||
"""
|
||||
if _WS_ORPHAN_REAP_GRACE_S <= 0:
|
||||
return
|
||||
|
||||
def _reap() -> None:
|
||||
with _session_resume_lock:
|
||||
session = _sessions.get(sid)
|
||||
if not _ws_session_is_orphaned(session):
|
||||
return
|
||||
_sessions.pop(sid, None)
|
||||
try:
|
||||
_teardown_session(session)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
timer = threading.Timer(_WS_ORPHAN_REAP_GRACE_S, _reap)
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
|
||||
|
||||
def _shutdown_sessions() -> None:
|
||||
with _sessions_lock:
|
||||
snapshot = list(_sessions.values())
|
||||
@ -3675,25 +3765,7 @@ def _(rid, params: dict) -> dict:
|
||||
session = _sessions.pop(sid, None)
|
||||
if not session:
|
||||
return _ok(rid, {"closed": False})
|
||||
_finalize_session(session)
|
||||
try:
|
||||
from tools.approval import unregister_gateway_notify
|
||||
|
||||
unregister_gateway_notify(session["session_key"])
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
agent = session.get("agent")
|
||||
if agent and hasattr(agent, "close"):
|
||||
agent.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
worker = session.get("slash_worker")
|
||||
if worker:
|
||||
worker.close()
|
||||
except Exception:
|
||||
pass
|
||||
_teardown_session(session)
|
||||
return _ok(rid, {"closed": True})
|
||||
|
||||
|
||||
|
||||
@ -262,15 +262,32 @@ async def handle_ws(ws: Any) -> None:
|
||||
break
|
||||
finally:
|
||||
detached_sessions = 0
|
||||
reaped_scheduled = 0
|
||||
if transport is not None:
|
||||
transport.close()
|
||||
|
||||
# Detach the transport from any sessions it owned so later emits
|
||||
# fall back to stdio instead of crashing into a closed socket.
|
||||
for _, sess in list(server._sessions.items()):
|
||||
#
|
||||
# In the dashboard's in-process gateway that stdio fallback has no
|
||||
# real reader, so a detached session would otherwise sit forever
|
||||
# holding its _SlashWorker subprocess open (one leaked python proc
|
||||
# per browser refresh — #38591 fallout). Schedule a grace-delayed
|
||||
# reap; a quick reconnect / session.resume re-binds a live
|
||||
# transport and cancels it (see _ws_session_is_orphaned).
|
||||
for _sid, sess in list(server._sessions.items()):
|
||||
if sess.get("transport") is transport:
|
||||
sess["transport"] = server._stdio_transport
|
||||
detached_sessions += 1
|
||||
try:
|
||||
server._schedule_ws_orphan_reap(_sid)
|
||||
reaped_scheduled += 1
|
||||
except Exception:
|
||||
_log.exception(
|
||||
"ws orphan-reap schedule failed peer=%s sid=%s",
|
||||
peer,
|
||||
_sid,
|
||||
)
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception as exc:
|
||||
|
||||
Reference in New Issue
Block a user