fix(gateway): clear zombie agent slot when session_reset races in-flight run
A session_reset (/new, /cc) that bumps the run generation while an agent turn is in flight left the dead agent in the _running_agents slot: the in-flight run's own release is generation-guarded and correctly returns False, and the outer finally's sentinel-only check also missed the leftover real agent. The session then silently dropped every subsequent message as 'agent busy' until a full gateway restart. (#28686)

- _process_message_or_command outer finally now calls the unconditional, idempotent _release_running_agent_state(key) on all exit paths instead of the sentinel-vs-else branch that could strand a dead agent.
- _handle_reset_command evicts the slot right after bumping the generation, so the zombie is cleared at reset time regardless of how the in-flight run unwinds.

Co-authored-by: CryptoByz <cryptobyz.airdrop@gmail.com>
This commit is contained in:
@ -8524,18 +8524,14 @@ class GatewayRunner:
|
||||
logger.debug("goal continuation hook failed: %s", _goal_exc)
|
||||
return _agent_result
|
||||
finally:
|
||||
# If _run_agent replaced the sentinel with a real agent and
|
||||
# then cleaned it up, this is a no-op. If we exited early
|
||||
# (exception, command fallthrough, etc.) the sentinel must
|
||||
# not linger or the session would be permanently locked out.
|
||||
if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL:
|
||||
self._release_running_agent_state(_quick_key)
|
||||
else:
|
||||
# Agent path already cleaned _running_agents; make sure
|
||||
# the paired metadata dicts are gone too.
|
||||
self._running_agents_ts.pop(_quick_key, None)
|
||||
if hasattr(self, "_busy_ack_ts"):
|
||||
self._busy_ack_ts.pop(_quick_key, None)
|
||||
# Unconditional release covers every exit path. _release_running_agent_state
|
||||
# is idempotent (pop-on-absent is harmless) and, called without a
|
||||
# run_generation guard, always clears the slot regardless of which
|
||||
# generation it holds. This evicts the zombie left when session_reset
|
||||
# bumps the generation (N -> N+1) mid-flight: gen-N's guarded release
|
||||
# inside _run_agent returns False, and the old sentinel-only check here
|
||||
# missed the leftover real agent — locking the session out forever (#28686).
|
||||
self._release_running_agent_state(_quick_key)
|
||||
|
||||
async def _prepare_inbound_message_text(
|
||||
self,
|
||||
@ -10032,6 +10028,12 @@ class GatewayRunner:
|
||||
# Get existing session key
|
||||
session_key = self._session_key_for_source(source)
|
||||
self._invalidate_session_run_generation(session_key, reason="session_reset")
|
||||
# Evict the running-agent slot now that the generation is bumped. The
|
||||
# in-flight run's own guarded release (run_generation=old) will return
|
||||
# False and leave its dead agent behind; clearing here keeps the slot
|
||||
# from becoming a zombie that silently drops all later messages (#28686).
|
||||
# Idempotent, so the run's finally calling it again is harmless.
|
||||
self._release_running_agent_state(session_key)
|
||||
|
||||
# Snapshot the old entry so on_session_finalize can report the
|
||||
# expiring session id before reset_session() rotates it.
|
||||
|
||||
@ -228,3 +228,54 @@ class TestSessionDbCloseOnShutdown:
|
||||
|
||||
flaky_db.close.assert_called_once()
|
||||
healthy_db.close.assert_called_once()
|
||||
|
||||
|
||||
class TestSessionResetZombieRace:
    """Regression coverage for #28686.

    When a session_reset invalidates the run generation while a run is
    still in flight, that run's generation-guarded release is refused.
    These tests pin that an unguarded release still empties the slot and
    its paired bookkeeping dicts, so no dead agent is left behind.
    """

    @staticmethod
    def _occupy_slot(runner, session_key, agent):
        """Register *agent* as running for *session_key*.

        Fills the agent slot plus the two timestamp dicts that the tests
        later expect the release call to clear.
        """
        runner._running_agents[session_key] = agent
        runner._running_agents_ts[session_key] = 1.0
        runner._busy_ack_ts[session_key] = 1.0

    def test_generation_guard_blocks_then_unconditional_release_evicts(self):
        """A stale-generation release is refused; an unguarded one evicts."""
        runner = _make_runner()
        runner._session_run_generation = {}
        session_key = "agent:main:telegram:private:1"

        stale_generation = runner._begin_session_run_generation(session_key)
        zombie = MagicMock()
        self._occupy_slot(runner, session_key, zombie)

        # A reset arrives while the stale-generation run is still in flight.
        runner._invalidate_session_run_generation(
            session_key, reason="session_reset"
        )

        # The in-flight run's guarded release must be refused, which is
        # exactly what used to leave the zombie in place before the fix.
        released = runner._release_running_agent_state(
            session_key, run_generation=stale_generation
        )
        assert released is False
        assert runner._running_agents.get(session_key) is zombie

        # Without a run_generation argument the release always goes through.
        assert runner._release_running_agent_state(session_key) is True
        for registry in (
            runner._running_agents,
            runner._running_agents_ts,
            runner._busy_ack_ts,
        ):
            assert session_key not in registry

    def test_normal_completion_is_not_evicted_by_outer_release(self):
        """A current-generation release succeeds, and the unguarded release
        issued afterwards finds nothing left to strand.
        """
        runner = _make_runner()
        runner._session_run_generation = {}
        session_key = "agent:main:telegram:private:2"

        current_generation = runner._begin_session_run_generation(session_key)
        self._occupy_slot(runner, session_key, MagicMock())

        released = runner._release_running_agent_state(
            session_key, run_generation=current_generation
        )
        assert released is True
        assert session_key not in runner._running_agents

        # Mirrors the outer finally: a second, unguarded release follows.
        assert runner._release_running_agent_state(session_key) is True
        assert session_key not in runner._running_agents_ts
        assert session_key not in runner._busy_ack_ts
|
||||
|
||||
Reference in New Issue
Block a user