fix(gateway): preserve session guard across in-band drain handoff
When the in-band pending-message drain spawns a fresh task and transfers ownership via _session_tasks[session_key] = drain_task, the original task still unwinds through the finally block. The drain task picks up the same interrupt_event in its own _process_message_background entry, so an unconditional _release_session_guard(session_key, guard=interrupt_event) at the end of the finally matches and deletes _active_sessions[session_key] while the drain task is still pending its first await. A concurrent inbound message arriving in that handoff window passes the Level-1 guard (no entry exists) and spawns a second _process_message_background for the same session — two agents on one session_key, duplicate responses, duplicate tool calls. Fix: only call _release_session_guard when the current task still owns _session_tasks[session_key]. When ownership has been transferred to a drain task, leave _active_sessions populated; the drain task's own lifecycle releases it. This mirrors the late-arrival drain path in the same finally block, which already leaves both entries alone after handing off. Also reorder stdlib imports in the new regression test file to match the gateway test convention (stdlib before third-party). Regression test: capture _active_sessions[sk] identity at every handler entry across a 2-step in-band drain chain and assert the guard Event identity stays the same. Pre-fix, the original task's finally deletes the entry, the drain task falls through to the `or asyncio.Event()` branch, and a fresh Event is installed — identity diverges. Post-fix, the entry is preserved and the drain task reuses the original Event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@ -2832,10 +2832,23 @@ class BasePlatformAdapter(ABC):
|
||||
# reset-like command that already swapped in its own
|
||||
# command_guard (and cancelled us) can't be accidentally
|
||||
# cleared by our unwind. The command owns the session now.
|
||||
#
|
||||
# The owner-check also covers the in-band drain handoff
|
||||
# above: when we spawned a drain_task and transferred
|
||||
# ownership via ``_session_tasks[session_key] = drain_task``,
|
||||
# ``_session_tasks.get(session_key) is current_task`` is
|
||||
# False, so we leave _active_sessions populated. Without
|
||||
# this guard, the drain task picks up the same
|
||||
# interrupt_event in its own _process_message_background
|
||||
# entry, _release_session_guard's guard-match succeeds,
|
||||
# and we'd delete the entry while the drain task is still
|
||||
# running — letting a concurrent inbound message pass
|
||||
# the Level-1 guard and spawn a second handler for the
|
||||
# same session.
|
||||
current_task = asyncio.current_task()
|
||||
if current_task is not None and self._session_tasks.get(session_key) is current_task:
|
||||
del self._session_tasks[session_key]
|
||||
self._release_session_guard(session_key, guard=interrupt_event)
|
||||
self._release_session_guard(session_key, guard=interrupt_event)
|
||||
|
||||
async def cancel_background_tasks(self) -> None:
|
||||
"""Cancel any in-flight background message-processing tasks.
|
||||
|
||||
@ -21,9 +21,9 @@ task spawning keeps it constant (1 every time).
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import (
|
||||
@ -127,3 +127,62 @@ async def test_in_band_drain_does_not_grow_stack():
|
||||
f"in-band drain is recursing instead of spawning a fresh task — "
|
||||
f"stack depth grew with chain length: {depths!r}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_in_band_drain_preserves_active_session_guard():
|
||||
"""The original task must NOT release ``_active_sessions[session_key]``
|
||||
after handing off to the drain task.
|
||||
|
||||
When the in-band drain spawns ``drain_task`` and transfers ownership
|
||||
via ``_session_tasks[session_key] = drain_task``, the original task
|
||||
still unwinds through the ``finally`` block. The drain task picks
|
||||
up the same ``interrupt_event`` in its own
|
||||
``_process_message_background`` entry, so a naive
|
||||
``_release_session_guard(session_key, guard=interrupt_event)`` in
|
||||
the unwind matches and deletes ``_active_sessions[session_key]``.
|
||||
That briefly reopens the Level-1 guard between the original task's
|
||||
finally and the drain task's first await — a concurrent inbound
|
||||
arriving in that window passes the guard and spawns a second
|
||||
handler for the same session.
|
||||
|
||||
Invariant: ``_active_sessions[sk]`` must hold the SAME interrupt
|
||||
Event identity at every handler entry across an in-band drain
|
||||
chain. Pre-fix, the original task's finally deletes the entry, so
|
||||
the drain task falls through to the ``or asyncio.Event()`` branch
|
||||
in ``_process_message_background`` and installs a *new* Event —
|
||||
the identity diverges. Post-fix, the entry is preserved across
|
||||
handoff and the drain task reuses the original Event.
|
||||
"""
|
||||
adapter = _make_adapter()
|
||||
sk = _sk()
|
||||
|
||||
seen_guards: list = []
|
||||
|
||||
async def handler(event):
|
||||
seen_guards.append(adapter._active_sessions.get(sk))
|
||||
if len(seen_guards) == 1:
|
||||
adapter._pending_messages[sk] = _make_event(text="M1")
|
||||
return "ok"
|
||||
|
||||
adapter._message_handler = handler
|
||||
|
||||
await adapter.handle_message(_make_event(text="M0"))
|
||||
|
||||
for _ in range(400):
|
||||
if len(seen_guards) >= 2 and sk not in adapter._active_sessions:
|
||||
break
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
await adapter.cancel_background_tasks()
|
||||
|
||||
assert len(seen_guards) == 2, f"expected 2 handler runs, got {len(seen_guards)}"
|
||||
assert seen_guards[0] is not None, "M0 saw no active-session guard"
|
||||
assert seen_guards[1] is not None, "M1 saw no active-session guard"
|
||||
assert seen_guards[0] is seen_guards[1], (
|
||||
"in-band drain handoff replaced the active-session guard — the "
|
||||
"original task's finally deleted _active_sessions[sk] and the "
|
||||
"drain task installed a new Event. Concurrent inbounds during "
|
||||
"the handoff window would bypass the Level-1 guard and spawn a "
|
||||
"second handler for the same session."
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user