fix(gateway): retry startup auto-resume when a failed platform reconnects
This commit is contained in:
@ -4156,7 +4156,7 @@ class GatewayRunner:
|
||||
{"restart_timeout", "shutdown_timeout", "restart_interrupted"}
|
||||
)
|
||||
|
||||
def _schedule_resume_pending_sessions(self) -> int:
|
||||
def _schedule_resume_pending_sessions(self, platform=None) -> int:
|
||||
"""Auto-continue fresh restart-interrupted sessions after startup.
|
||||
|
||||
``resume_pending`` already preserves the transcript AND the existing
|
||||
@ -4169,7 +4169,15 @@ class GatewayRunner:
|
||||
Adapters that are not yet ready (adapter missing from
|
||||
``self.adapters``) are skipped silently; their sessions stay
|
||||
``resume_pending`` and will auto-resume on the next real user
|
||||
message, or on the next gateway startup.
|
||||
message, or when the platform reconnects — the reconnect watcher
|
||||
calls this again scoped to that ``platform``.
|
||||
|
||||
``platform`` (a ``Platform``) restricts the pass to sessions that
|
||||
originated on that platform. The reconnect path passes it so a
|
||||
platform coming back online retries only its own sessions and never
|
||||
re-touches another platform's in-flight recoveries. Sessions whose
|
||||
agent is already running are skipped regardless, so a session
|
||||
scheduled at startup is never resumed a second time.
|
||||
"""
|
||||
window = _auto_continue_freshness_window()
|
||||
try:
|
||||
@ -4181,6 +4189,7 @@ class GatewayRunner:
|
||||
and not entry.suspended
|
||||
and entry.origin is not None
|
||||
and entry.resume_reason in self._AUTO_RESUME_REASONS
|
||||
and (platform is None or entry.origin.platform == platform)
|
||||
]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to enumerate resume-pending sessions: %s", exc)
|
||||
@ -4193,6 +4202,11 @@ class GatewayRunner:
|
||||
if marker is not None and (now - marker).total_seconds() > window:
|
||||
continue
|
||||
|
||||
# Already being resumed (e.g. scheduled at startup and still
|
||||
# in-flight) — don't synthesize a second continuation turn.
|
||||
if entry.session_key in self._running_agents:
|
||||
continue
|
||||
|
||||
source = entry.origin
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter is None:
|
||||
@ -6272,6 +6286,21 @@ class GatewayRunner:
|
||||
await build_channel_directory(self.adapters)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# A platform that was offline at gateway startup never
|
||||
# got its restart-interrupted sessions auto-resumed —
|
||||
# the startup pass skips sessions whose adapter isn't
|
||||
# connected yet. Now that it's back, retry the
|
||||
# auto-resume scoped to this platform so recovery
|
||||
# doesn't silently wait for a manual user message.
|
||||
try:
|
||||
self._schedule_resume_pending_sessions(platform=platform)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"resume-pending reschedule after %s reconnect failed",
|
||||
platform.value,
|
||||
exc_info=True,
|
||||
)
|
||||
# Check if the failure is non-retryable
|
||||
elif adapter.has_fatal_error and not adapter.fatal_error_retryable:
|
||||
self._update_platform_runtime_status(
|
||||
|
||||
@ -217,6 +217,54 @@ class TestPlatformReconnectWatcher:
|
||||
assert Platform.TELEGRAM not in runner._failed_platforms
|
||||
assert Platform.TELEGRAM in runner.adapters
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_retries_resume_pending_for_platform(self):
|
||||
"""A successful reconnect retries the startup auto-resume scoped to
|
||||
that platform.
|
||||
|
||||
Regression: a platform offline at gateway startup had its
|
||||
restart-interrupted sessions skipped by the one-shot startup pass and
|
||||
never rescheduled, so the documented auto-resume silently dropped
|
||||
until the user sent a fresh message. The watcher now re-runs the
|
||||
platform-scoped auto-resume on reconnect.
|
||||
"""
|
||||
runner = _make_runner()
|
||||
runner._sync_voice_mode_state_to_adapter = MagicMock()
|
||||
runner._schedule_resume_pending_sessions = MagicMock(return_value=1)
|
||||
|
||||
platform_config = PlatformConfig(enabled=True, token="test")
|
||||
runner._failed_platforms[Platform.TELEGRAM] = {
|
||||
"config": platform_config,
|
||||
"attempts": 1,
|
||||
"next_retry": time.monotonic() - 1,
|
||||
}
|
||||
|
||||
succeed_adapter = StubAdapter(succeed=True)
|
||||
real_sleep = asyncio.sleep
|
||||
|
||||
with patch.object(runner, "_create_adapter", return_value=succeed_adapter):
|
||||
with patch("gateway.run.build_channel_directory", create=True):
|
||||
async def run_one_iteration():
|
||||
runner._running = True
|
||||
call_count = 0
|
||||
|
||||
async def fake_sleep(n):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count > 1:
|
||||
runner._running = False
|
||||
await real_sleep(0)
|
||||
|
||||
with patch("asyncio.sleep", side_effect=fake_sleep):
|
||||
await runner._platform_reconnect_watcher()
|
||||
|
||||
await run_one_iteration()
|
||||
|
||||
assert Platform.TELEGRAM in runner.adapters
|
||||
runner._schedule_resume_pending_sessions.assert_called_once_with(
|
||||
platform=Platform.TELEGRAM
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_nonretryable_removed_from_queue(self):
|
||||
"""Non-retryable errors should remove the platform from the retry queue."""
|
||||
|
||||
@ -1060,6 +1060,134 @@ async def test_startup_auto_resume_skips_when_adapter_unavailable():
|
||||
adapter.handle_message.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_reschedules_pending_after_late_platform_connect():
|
||||
"""A platform offline at startup gets its pending sessions auto-resumed
|
||||
once it reconnects.
|
||||
|
||||
Regression: the startup pass skips sessions whose adapter isn't connected
|
||||
yet (see test_startup_auto_resume_skips_when_adapter_unavailable). Before
|
||||
the fix those sessions were never rescheduled and recovered only if the
|
||||
user sent a fresh message — the documented startup auto-resume silently
|
||||
dropped. The reconnect watcher now retries the platform-scoped pass.
|
||||
"""
|
||||
runner, adapter = make_restart_runner()
|
||||
source = make_restart_source(chat_id="late-chat")
|
||||
pending_entry = SessionEntry(
|
||||
session_key="agent:main:telegram:dm:late-chat",
|
||||
session_id="sid",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
origin=source,
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="dm",
|
||||
resume_pending=True,
|
||||
resume_reason="restart_interrupted",
|
||||
last_resume_marked_at=datetime.now(),
|
||||
)
|
||||
runner.session_store._entries = {pending_entry.session_key: pending_entry}
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
# Platform was not connected at gateway startup → session skipped.
|
||||
runner.adapters = {}
|
||||
assert runner._schedule_resume_pending_sessions() == 0
|
||||
adapter.handle_message.assert_not_called()
|
||||
|
||||
# Platform reconnects → its pending session is retried.
|
||||
runner.adapters = {Platform.TELEGRAM: adapter}
|
||||
scheduled = runner._schedule_resume_pending_sessions(platform=Platform.TELEGRAM)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert scheduled == 1
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
event = adapter.handle_message.await_args.args[0]
|
||||
assert isinstance(event, MessageEvent)
|
||||
assert event.internal is True
|
||||
assert event.message_type == MessageType.TEXT
|
||||
assert event.text == ""
|
||||
assert event.source == source
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_reschedule_is_platform_scoped():
|
||||
"""The platform filter limits the pass to that platform's sessions, so
|
||||
reconnecting one platform never resumes another's pending session."""
|
||||
runner, adapter = make_restart_runner()
|
||||
tg_source = make_restart_source(chat_id="tg-chat")
|
||||
discord_source = SessionSource(
|
||||
platform=Platform.DISCORD, chat_id="dc-chat", chat_type="dm", user_id="u1"
|
||||
)
|
||||
tg_entry = SessionEntry(
|
||||
session_key="agent:main:telegram:dm:tg-chat",
|
||||
session_id="sid-tg",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
origin=tg_source,
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="dm",
|
||||
resume_pending=True,
|
||||
resume_reason="restart_interrupted",
|
||||
last_resume_marked_at=datetime.now(),
|
||||
)
|
||||
discord_entry = SessionEntry(
|
||||
session_key="agent:main:discord:dm:dc-chat",
|
||||
session_id="sid-dc",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
origin=discord_source,
|
||||
platform=Platform.DISCORD,
|
||||
chat_type="dm",
|
||||
resume_pending=True,
|
||||
resume_reason="restart_interrupted",
|
||||
last_resume_marked_at=datetime.now(),
|
||||
)
|
||||
runner.session_store._entries = {
|
||||
tg_entry.session_key: tg_entry,
|
||||
discord_entry.session_key: discord_entry,
|
||||
}
|
||||
adapter.handle_message = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: adapter}
|
||||
|
||||
scheduled = runner._schedule_resume_pending_sessions(platform=Platform.TELEGRAM)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
# Only the telegram session is resumed; the discord session waits for its
|
||||
# own reconnect.
|
||||
assert scheduled == 1
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
event = adapter.handle_message.await_args.args[0]
|
||||
assert event.source == tg_source
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_resume_skips_sessions_with_running_agent():
|
||||
"""A session already being resumed (agent in-flight) is not scheduled
|
||||
again — guards against a double resume when a platform reconnects while a
|
||||
startup-scheduled resume is still running."""
|
||||
runner, adapter = make_restart_runner()
|
||||
source = make_restart_source(chat_id="inflight-chat")
|
||||
pending_entry = SessionEntry(
|
||||
session_key="agent:main:telegram:dm:inflight-chat",
|
||||
session_id="sid",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
origin=source,
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="dm",
|
||||
resume_pending=True,
|
||||
resume_reason="restart_interrupted",
|
||||
last_resume_marked_at=datetime.now(),
|
||||
)
|
||||
runner.session_store._entries = {pending_entry.session_key: pending_entry}
|
||||
runner._running_agents = {pending_entry.session_key: object()}
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
scheduled = runner._schedule_resume_pending_sessions(platform=Platform.TELEGRAM)
|
||||
|
||||
assert scheduled == 0
|
||||
adapter.handle_message.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shutdown banner wording
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user