test(gateway): regression tests for #30170 subagent interrupt protection
17 new tests in tests/gateway/test_subagent_protection_30170.py pin
down both the detection helper and the demotion behaviour:
* TestAgentHasActiveSubagents — 11 cases covering the precision and
defensiveness of _agent_has_active_subagents:
- returns False for None, _AGENT_PENDING_SENTINEL, and stub
agents that lack the _active_children attribute;
- returns False for an empty list (the steady state of an idle
AIAgent);
- returns True for one or many children;
- works when _active_children_lock is None (test stubs);
- rejects truthy MagicMock auto-attributes — this is the
regression-guard for "every MagicMock-based gateway test
suddenly demotes to queue mode" (which is how this was
originally found);
- accepts list/tuple/set as the children container.
* TestBusyHandlerDemotesInterruptForSubagents — 6 cases driving
_handle_active_session_busy_message directly:
- parent.interrupt is NOT called when subagents are active,
message is still merged into the pending queue;
- ack copy mentions "Subagent working", "queued", and the
/stop escape hatch — and does NOT mention "Interrupting";
- with no subagents, behaviour is byte-identical to the
pre-#30170 interrupt path (parent.interrupt called with the
user text, ack says "Interrupting");
- configured queue mode keeps its vanilla "Queued for the next
turn" ack (the #30170 demotion-specific copy must NOT fire);
- configured steer mode still routes to running_agent.steer()
even when subagents are active (the guard is interrupt-only);
- _AGENT_PENDING_SENTINEL does not trigger demotion.
Refs #30170.
This commit is contained in:
348
tests/gateway/test_subagent_protection_30170.py
Normal file
348
tests/gateway/test_subagent_protection_30170.py
Normal file
@ -0,0 +1,348 @@
|
||||
"""Regression tests for #30170.
|
||||
|
||||
#30170: Sending a message while ``delegate_task`` is running killed the
|
||||
subagent because the gateway always called ``running_agent.interrupt()``
|
||||
on the parent, which then cascaded synchronously through
|
||||
``AIAgent._active_children`` and aborted every in-flight subagent. The
|
||||
reporter (and the linked Phase-1 spec) asked for the gateway to demote
|
||||
``busy_input_mode='interrupt'`` to ``queue`` semantics whenever the
|
||||
parent is currently driving subagents, while leaving explicit ``/stop``
|
||||
and ``/new`` slash commands untouched.
|
||||
|
||||
These tests pin down the gateway-side guard introduced for #30170:
|
||||
|
||||
* ``GatewayRunner._agent_has_active_subagents`` correctly recognises
|
||||
parents that own real children, without false-positives from a
|
||||
``MagicMock()._active_children`` auto-attribute, missing locks, or
|
||||
the ``_AGENT_PENDING_SENTINEL`` placeholder.
|
||||
* ``_handle_active_session_busy_message`` demotes the interrupt mode to
|
||||
queue semantics (no ``interrupt()`` call, message merged into the
|
||||
pending queue, ack reflects the demotion) when the parent has active
|
||||
subagents.
|
||||
* The ``queue`` and ``steer`` configured modes still behave exactly as
|
||||
before — the guard is interrupt-only.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
# Minimal stubs so gateway imports cleanly (mirrors test_busy_session_ack)
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
_tg = types.ModuleType("telegram")
|
||||
_tg.constants = types.ModuleType("telegram.constants")
|
||||
_ct = MagicMock()
|
||||
_ct.SUPERGROUP = "supergroup"
|
||||
_ct.GROUP = "group"
|
||||
_ct.PRIVATE = "private"
|
||||
_tg.constants.ChatType = _ct
|
||||
sys.modules.setdefault("telegram", _tg)
|
||||
sys.modules.setdefault("telegram.constants", _tg.constants)
|
||||
sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext"))
|
||||
|
||||
from gateway.platforms.base import ( # noqa: E402
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
SessionSource,
|
||||
build_session_key,
|
||||
)
|
||||
from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL # noqa: E402
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
# Builders (parallel to tests/gateway/test_busy_session_ack.py)
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
def _make_event(text: str = "hello", chat_id: str = "123") -> MessageEvent:
|
||||
source = SessionSource(
|
||||
platform=MagicMock(value="telegram"),
|
||||
chat_id=chat_id,
|
||||
chat_type="private",
|
||||
user_id="user1",
|
||||
)
|
||||
return MessageEvent(
|
||||
text=text,
|
||||
message_type=MessageType.TEXT,
|
||||
source=source,
|
||||
message_id="msg1",
|
||||
)
|
||||
|
||||
|
||||
def _make_runner() -> GatewayRunner:
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner._running_agents = {}
|
||||
runner._running_agents_ts = {}
|
||||
runner._pending_messages = {}
|
||||
runner._busy_ack_ts = {}
|
||||
runner._draining = False
|
||||
runner.adapters = {}
|
||||
runner.config = MagicMock()
|
||||
runner.session_store = None
|
||||
runner.hooks = MagicMock()
|
||||
runner.hooks.emit = AsyncMock()
|
||||
runner.pairing_store = MagicMock()
|
||||
runner.pairing_store.is_approved.return_value = True
|
||||
runner._is_user_authorized = lambda _source: True
|
||||
return runner
|
||||
|
||||
|
||||
def _make_adapter() -> MagicMock:
|
||||
adapter = MagicMock()
|
||||
adapter._pending_messages = {}
|
||||
adapter._send_with_retry = AsyncMock()
|
||||
adapter.config = MagicMock()
|
||||
adapter.config.extra = {}
|
||||
adapter.platform = MagicMock(value="telegram")
|
||||
return adapter
|
||||
|
||||
|
||||
def _make_parent_with_subagents(
|
||||
*, children: int = 1, with_lock: bool = True
|
||||
) -> MagicMock:
|
||||
"""A MagicMock shaped like an AIAgent that currently owns *children* subagents."""
|
||||
parent = MagicMock()
|
||||
parent._active_children = [MagicMock() for _ in range(children)]
|
||||
parent._active_children_lock = threading.Lock() if with_lock else None
|
||||
parent.get_activity_summary.return_value = {
|
||||
"api_call_count": 7,
|
||||
"max_iterations": 60,
|
||||
"current_tool": "delegate_task",
|
||||
}
|
||||
return parent
|
||||
|
||||
|
||||
def _make_parent_no_subagents() -> MagicMock:
|
||||
"""A MagicMock shaped like an AIAgent that is NOT delegating."""
|
||||
parent = MagicMock()
|
||||
parent._active_children = []
|
||||
parent._active_children_lock = threading.Lock()
|
||||
parent.get_activity_summary.return_value = {
|
||||
"api_call_count": 3,
|
||||
"max_iterations": 60,
|
||||
"current_tool": "terminal",
|
||||
}
|
||||
return parent
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
# _agent_has_active_subagents
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
class TestAgentHasActiveSubagents:
|
||||
"""The detection helper must be both precise and defensive."""
|
||||
|
||||
def test_returns_false_for_none(self) -> None:
|
||||
assert GatewayRunner._agent_has_active_subagents(None) is False
|
||||
|
||||
def test_returns_false_for_pending_sentinel(self) -> None:
|
||||
assert (
|
||||
GatewayRunner._agent_has_active_subagents(_AGENT_PENDING_SENTINEL)
|
||||
is False
|
||||
)
|
||||
|
||||
def test_returns_false_when_attribute_missing(self) -> None:
|
||||
"""Production AIAgents always have _active_children, but the helper
|
||||
must not blow up on test stubs or partial mocks."""
|
||||
|
||||
class StubAgent:
|
||||
pass
|
||||
|
||||
assert GatewayRunner._agent_has_active_subagents(StubAgent()) is False
|
||||
|
||||
def test_returns_false_for_empty_list(self) -> None:
|
||||
assert (
|
||||
GatewayRunner._agent_has_active_subagents(_make_parent_no_subagents())
|
||||
is False
|
||||
)
|
||||
|
||||
def test_returns_true_for_single_child(self) -> None:
|
||||
assert (
|
||||
GatewayRunner._agent_has_active_subagents(_make_parent_with_subagents())
|
||||
is True
|
||||
)
|
||||
|
||||
def test_returns_true_for_many_children(self) -> None:
|
||||
assert (
|
||||
GatewayRunner._agent_has_active_subagents(
|
||||
_make_parent_with_subagents(children=5)
|
||||
)
|
||||
is True
|
||||
)
|
||||
|
||||
def test_works_without_lock(self) -> None:
|
||||
"""``_active_children_lock`` is optional in test stubs."""
|
||||
assert (
|
||||
GatewayRunner._agent_has_active_subagents(
|
||||
_make_parent_with_subagents(with_lock=False)
|
||||
)
|
||||
is True
|
||||
)
|
||||
|
||||
def test_rejects_truthy_non_collection_attribute(self) -> None:
|
||||
"""The MagicMock auto-attribute regression. ``MagicMock()._active_children``
|
||||
is itself a truthy MagicMock — without the isinstance guard, the
|
||||
helper would falsely report subagents on every test mock."""
|
||||
parent = MagicMock() # no explicit _active_children setup
|
||||
assert GatewayRunner._agent_has_active_subagents(parent) is False
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"container",
|
||||
[(MagicMock(),), {MagicMock()}, [MagicMock()]],
|
||||
ids=["tuple", "set", "list"],
|
||||
)
|
||||
def test_accepts_list_tuple_set(self, container: Any) -> None:
|
||||
parent = MagicMock()
|
||||
parent._active_children = container
|
||||
parent._active_children_lock = threading.Lock()
|
||||
assert GatewayRunner._agent_has_active_subagents(parent) is True
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
# _handle_active_session_busy_message — interrupt demotion
|
||||
# ──────────────────────────────────────────────────────────────────────
|
||||
class TestBusyHandlerDemotesInterruptForSubagents:
|
||||
"""The Phase-1 fix from #30170: parent.interrupt() must NOT fire when
|
||||
the parent is currently driving subagents."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_does_not_call_interrupt_when_subagents_active(self) -> None:
|
||||
runner = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
event = _make_event(text="follow up while subagent runs")
|
||||
sk = build_session_key(event.source)
|
||||
parent = _make_parent_with_subagents()
|
||||
runner._running_agents[sk] = parent
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event") as merge_mock:
|
||||
handled = await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
assert handled is True
|
||||
parent.interrupt.assert_not_called()
|
||||
# Message must still be queued so it gets picked up on the next turn.
|
||||
merge_mock.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ack_explains_the_demotion(self) -> None:
|
||||
"""The user-visible ack must mention the subagent context AND
|
||||
the `/stop` escape hatch so the operator can self-correct."""
|
||||
runner = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
event = _make_event(text="hi mid-delegation")
|
||||
sk = build_session_key(event.source)
|
||||
parent = _make_parent_with_subagents()
|
||||
runner._running_agents[sk] = parent
|
||||
runner._running_agents_ts[sk] = time.time() - 120
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event"):
|
||||
await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
adapter._send_with_retry.assert_called_once()
|
||||
content = adapter._send_with_retry.call_args.kwargs.get("content", "")
|
||||
assert "Subagent working" in content
|
||||
assert "queued" in content.lower()
|
||||
assert "/stop" in content
|
||||
assert "Interrupting" not in content
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_interrupt_still_fires_when_no_subagents(self) -> None:
|
||||
"""Regression-guard the other direction: with no subagents the
|
||||
demotion must NOT trigger and behaviour must be byte-identical
|
||||
to the pre-#30170 interrupt path."""
|
||||
runner = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
event = _make_event(text="please stop")
|
||||
sk = build_session_key(event.source)
|
||||
parent = _make_parent_no_subagents()
|
||||
runner._running_agents[sk] = parent
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event"):
|
||||
await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
parent.interrupt.assert_called_once_with("please stop")
|
||||
content = adapter._send_with_retry.call_args.kwargs.get("content", "")
|
||||
assert "Interrupting" in content
|
||||
assert "Subagent" not in content
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_queue_mode_unchanged_with_subagents(self) -> None:
|
||||
"""Configured ``queue`` mode is already subagent-safe; the new
|
||||
guard must not change its behaviour or its ack text."""
|
||||
runner = _make_runner()
|
||||
runner._busy_input_mode = "queue"
|
||||
adapter = _make_adapter()
|
||||
event = _make_event(text="queued during delegate")
|
||||
sk = build_session_key(event.source)
|
||||
parent = _make_parent_with_subagents()
|
||||
runner._running_agents[sk] = parent
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event"):
|
||||
await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
parent.interrupt.assert_not_called()
|
||||
content = adapter._send_with_retry.call_args.kwargs.get("content", "")
|
||||
# The vanilla queue copy — NOT the #30170 "Subagent working" copy,
|
||||
# because the user explicitly asked for queue mode.
|
||||
assert "Queued for the next turn" in content
|
||||
assert "respond once the current task finishes" in content
|
||||
assert "Subagent working" not in content
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_steer_mode_still_routes_through_running_agent_steer(
|
||||
self,
|
||||
) -> None:
|
||||
"""Configured ``steer`` mode must reach ``running_agent.steer()``
|
||||
even when subagents are active — the #30170 demotion is
|
||||
interrupt-specific so it doesn't accidentally disable steer."""
|
||||
runner = _make_runner()
|
||||
runner._busy_input_mode = "steer"
|
||||
adapter = _make_adapter()
|
||||
event = _make_event(text="course-correct")
|
||||
sk = build_session_key(event.source)
|
||||
parent = _make_parent_with_subagents()
|
||||
parent.steer = MagicMock(return_value=True)
|
||||
runner._running_agents[sk] = parent
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event"):
|
||||
await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
parent.steer.assert_called_once_with("course-correct")
|
||||
parent.interrupt.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pending_sentinel_does_not_demote(self) -> None:
|
||||
"""The placeholder ``_AGENT_PENDING_SENTINEL`` is not a real
|
||||
agent — the guard must not treat it as having subagents.
|
||||
Otherwise we'd permanently queue messages for sessions that
|
||||
haven't actually started running yet."""
|
||||
runner = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
event = _make_event(text="follow up before start")
|
||||
sk = build_session_key(event.source)
|
||||
runner._running_agents[sk] = _AGENT_PENDING_SENTINEL
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event"):
|
||||
handled = await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
assert handled is True
|
||||
# Sentinel can't be interrupted (no .interrupt to call) — verify
|
||||
# that the helper still returns the "interrupting" copy because
|
||||
# demotion did NOT fire (and the sentinel branch in the real
|
||||
# handler just skips the interrupt call silently).
|
||||
content = adapter._send_with_retry.call_args.kwargs.get("content", "")
|
||||
assert "Subagent working" not in content
|
||||
Reference in New Issue
Block a user