From 693f4c7e9ce583ed740daaa5aa8eb738390c643b Mon Sep 17 00:00:00 2001 From: CryptoByz Date: Thu, 4 Jun 2026 06:03:49 -0700 Subject: [PATCH] fix(gateway): clear zombie agent slot when session_reset races in-flight run A session_reset (/new, /cc) that bumps the run generation while an agent turn is in flight left the dead agent in the _running_agents slot: the in-flight run's own release is generation-guarded and correctly returns False, and the outer finally's sentinel-only check also missed the leftover real agent. The session then silently dropped every subsequent message as 'agent busy' until a full gateway restart. (#28686) - _process_message_or_command outer finally now calls the unconditional, idempotent _release_running_agent_state(key) on all exit paths instead of the sentinel-vs-else branch that could strand a dead agent. - _handle_reset_command evicts the slot right after bumping the generation, so the zombie is cleared at reset time regardless of how the in-flight run unwinds. Co-authored-by: CryptoByz --- gateway/run.py | 26 ++++++----- tests/gateway/test_session_state_cleanup.py | 51 +++++++++++++++++++++ 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 049b07a80..6d2f65987 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -8524,18 +8524,14 @@ class GatewayRunner: logger.debug("goal continuation hook failed: %s", _goal_exc) return _agent_result finally: - # If _run_agent replaced the sentinel with a real agent and - # then cleaned it up, this is a no-op. If we exited early - # (exception, command fallthrough, etc.) the sentinel must - # not linger or the session would be permanently locked out. - if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL: - self._release_running_agent_state(_quick_key) - else: - # Agent path already cleaned _running_agents; make sure - # the paired metadata dicts are gone too. - self._running_agents_ts.pop(_quick_key, None) - if hasattr(self, "_busy_ack_ts"): - self._busy_ack_ts.pop(_quick_key, None) + # Unconditional release covers every exit path. _release_running_agent_state + # is idempotent (pop-on-absent is harmless) and, called without a + # run_generation guard, always clears the slot regardless of which + # generation it holds. This evicts the zombie left when session_reset + # bumps the generation (N -> N+1) mid-flight: gen-N's guarded release + # inside _run_agent returns False, and the old sentinel-only check here + # missed the leftover real agent — locking the session out forever (#28686). + self._release_running_agent_state(_quick_key) async def _prepare_inbound_message_text( self, @@ -10032,6 +10028,12 @@ class GatewayRunner: # Get existing session key session_key = self._session_key_for_source(source) self._invalidate_session_run_generation(session_key, reason="session_reset") + # Evict the running-agent slot now that the generation is bumped. The + # in-flight run's own guarded release (run_generation=old) will return + # False and leave its dead agent behind; clearing here keeps the slot + # from becoming a zombie that silently drops all later messages (#28686). + # Idempotent, so the run's finally calling it again is harmless. + self._release_running_agent_state(session_key) # Snapshot the old entry so on_session_finalize can report the # expiring session id before reset_session() rotates it. diff --git a/tests/gateway/test_session_state_cleanup.py b/tests/gateway/test_session_state_cleanup.py index ffbb465b7..dfde65eb3 100644 --- a/tests/gateway/test_session_state_cleanup.py +++ b/tests/gateway/test_session_state_cleanup.py @@ -228,3 +228,54 @@ class TestSessionDbCloseOnShutdown: flaky_db.close.assert_called_once() healthy_db.close.assert_called_once() + + +class TestSessionResetZombieRace: + """Regression for #28686 — a session_reset racing the in-flight run's + guarded release must not leave a dead agent locking the slot forever. + """ + + def test_generation_guard_blocks_then_unconditional_release_evicts(self): + runner = _make_runner() + runner._session_run_generation = {} + key = "agent:main:telegram:private:1" + + gen_n = runner._begin_session_run_generation(key) + dead_agent = MagicMock() + runner._running_agents[key] = dead_agent + runner._running_agents_ts[key] = 1.0 + runner._busy_ack_ts[key] = 1.0 + + # session_reset bumps the generation while gen-N is still in flight. + runner._invalidate_session_run_generation(key, reason="session_reset") + + # gen-N's own guarded release is correctly blocked — slot would be a + # zombie if nothing else cleared it (the pre-fix behaviour). + assert runner._release_running_agent_state(key, run_generation=gen_n) is False + assert runner._running_agents.get(key) is dead_agent + + # The fix: unconditional release (no run_generation) always clears it. + assert runner._release_running_agent_state(key) is True + assert key not in runner._running_agents + assert key not in runner._running_agents_ts + assert key not in runner._busy_ack_ts + + def test_normal_completion_is_not_evicted_by_outer_release(self): + """Guarded release with the current generation succeeds; the outer + unconditional release that follows is a harmless no-op. + """ + runner = _make_runner() + runner._session_run_generation = {} + key = "agent:main:telegram:private:2" + + gen = runner._begin_session_run_generation(key) + runner._running_agents[key] = MagicMock() + runner._running_agents_ts[key] = 1.0 + runner._busy_ack_ts[key] = 1.0 + + assert runner._release_running_agent_state(key, run_generation=gen) is True + assert key not in runner._running_agents + # Outer finally runs the unconditional release after — nothing stranded. + assert runner._release_running_agent_state(key) is True + assert key not in runner._running_agents_ts + assert key not in runner._busy_ack_ts