From 32032e1e2d9bf909f26862373c8e3dbbb6929460 Mon Sep 17 00:00:00 2001 From: maxcz79 Date: Sat, 16 May 2026 22:54:28 +0200 Subject: [PATCH] fix(simplex): avoid reconnecting healthy idle websocket Do not treat lack of application-level SimpleX events as a stale WebSocket. The websockets client already uses protocol ping/pong for connection liveness, so quiet but healthy connections should not be closed by the health monitor. --- plugins/platforms/simplex/adapter.py | 44 +++++++++++++-------- tests/gateway/test_simplex_plugin.py | 59 ++++++++++++++++++++++------ 2 files changed, 73 insertions(+), 30 deletions(-) diff --git a/plugins/platforms/simplex/adapter.py b/plugins/platforms/simplex/adapter.py index 9c3d22a42..52f93dedc 100644 --- a/plugins/platforms/simplex/adapter.py +++ b/plugins/platforms/simplex/adapter.py @@ -269,7 +269,13 @@ class SimplexAdapter(BasePlatformAdapter): # ------------------------------------------------------------------ async def _health_monitor(self) -> None: - """Force reconnect if the WebSocket has been idle too long.""" + """Observe WebSocket idleness without reconnecting healthy quiet links. + + simplex-chat can legitimately stay application-silent for long periods + when no messages arrive. The websockets client already sends protocol + pings (see _ws_listener ping_interval/ping_timeout), so treating lack of + chat events as a stale connection causes needless reconnect churn. + """ while self._running: await asyncio.sleep(HEALTH_CHECK_INTERVAL) if not self._running: @@ -277,15 +283,7 @@ class SimplexAdapter(BasePlatformAdapter): elapsed = time.time() - self._last_ws_activity if elapsed > HEALTH_CHECK_STALE_THRESHOLD: - logger.warning( - "SimpleX: WS idle for %.0fs, forcing reconnect", elapsed - ) - self._last_ws_activity = time.time() - if self._ws: - try: - await self._ws.close() - except Exception: - pass + logger.debug("SimpleX: WS application-idle for %.0fs", elapsed) # ------------------------------------------------------------------ # Inbound event handling @@ -293,7 +291,12 @@ class SimplexAdapter(BasePlatformAdapter): async def _handle_event(self, event: dict) -> None: """Dispatch a daemon event to the appropriate handler.""" - resp_type = event.get("type") or event.get("resp", {}).get("type", "") + # simplex-chat WebSocket messages are usually shaped as: + # {"corrId": "...", "resp": {"type": "newChatItems", ...}} + # Older/examples may put the response fields at top-level. Normalize + # both forms before dispatching, otherwise inbound chatItems are lost. + resp = event.get("resp") if isinstance(event.get("resp"), dict) else event + resp_type = event.get("type") or resp.get("type", "") # Filter responses to our own commands (echoes) corr_id = event.get("corrId", "") @@ -302,10 +305,10 @@ class SimplexAdapter(BasePlatformAdapter): return if resp_type == "newChatItem": - await self._handle_new_chat_item(event) + await self._handle_new_chat_item(resp) elif resp_type == "newChatItems": # Batch variant — process each item - items = event.get("chatItems") or [] + items = resp.get("chatItems") or [] for item_wrapper in items: await self._handle_new_chat_item(item_wrapper) # Ignore all other event types (delivery receipts, contact updates, etc.) @@ -347,7 +350,9 @@ class SimplexAdapter(BasePlatformAdapter): or contact_info.get("localDisplayName") or contact_id ) - chat_id = contact_id + # Replies must be routed by SimpleX CLI display name, while + # authorization should still use the stable numeric contactId. + chat_id = contact_name or contact_id chat_name = contact_name if not chat_id: @@ -364,7 +369,7 @@ class SimplexAdapter(BasePlatformAdapter): or sender_id ) else: - sender_id = chat_id + sender_id = contact_id if not is_group else chat_id sender_name = chat_name # Extract text @@ -508,7 +513,11 @@ class SimplexAdapter(BasePlatformAdapter): group_id = chat_id[6:] cmd_str = f"#[{group_id}] {content}" else: - cmd_str = f"@[{chat_id}] {content}" + # SimpleX CLI addresses direct contacts by display name, e.g. + # `@Alice hello`. `@[Alice]` is interpreted literally as a contact + # named "[Alice]" and `@[4]` as "[4]", so do not wrap direct + # chat IDs / display names in brackets. + cmd_str = f"@{chat_id} {content}" payload = { "corrId": corr_id, @@ -643,7 +652,8 @@ async def _standalone_send( group_id = chat_id[6:] cmd_str = f"#[{group_id}] {message}" else: - cmd_str = f"@[{chat_id}] {message}" + # Direct contacts are addressed by display name without brackets. + cmd_str = f"@{chat_id} {message}" payload = { "corrId": f"hermes-snd-{int(time.time() * 1000)}", diff --git a/tests/gateway/test_simplex_plugin.py b/tests/gateway/test_simplex_plugin.py index 1048168aa..535f3d909 100644 --- a/tests/gateway/test_simplex_plugin.py +++ b/tests/gateway/test_simplex_plugin.py @@ -7,6 +7,7 @@ sibling platform-plugin tests on the same xdist worker. from __future__ import annotations +import asyncio import json from unittest.mock import AsyncMock, MagicMock @@ -214,7 +215,7 @@ async def test_send_dm(): result = await adapter.send("contact-42", "Hello, SimpleX!") mock_ws.send.assert_called_once() payload = json.loads(mock_ws.send.call_args[0][0]) - assert payload["cmd"] == "@[contact-42] Hello, SimpleX!" + assert payload["cmd"] == "@contact-42 Hello, SimpleX!" assert payload["corrId"].startswith(_CORR_PREFIX) assert result.success is True @@ -301,23 +302,55 @@ async def test_standalone_send_missing_websockets(monkeypatch): @pytest.mark.asyncio -async def test_standalone_send_missing_url(monkeypatch): +async def test_standalone_send_defaults_to_local_daemon(monkeypatch): monkeypatch.delenv("SIMPLEX_WS_URL", raising=False) pconfig = MagicMock() pconfig.extra = {} - # We expect the URL fallback (extra+env both empty) to be empty string, - # producing an error. We also need websockets to be importable for the - # url-check branch to be reached, so skip when it's not. - try: - import websockets.client # noqa: F401 - except ImportError: - pytest.skip("websockets not installed") + + sent_payloads = [] + + class DummyWs: + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + async def send(self, payload): + sent_payloads.append(json.loads(payload)) + + def fake_connect(url, **kwargs): + assert url == "ws://127.0.0.1:5225" + assert kwargs["open_timeout"] == 10 + assert kwargs["close_timeout"] == 5 + return DummyWs() + + import websockets + monkeypatch.setattr(websockets, "connect", fake_connect) result = await _standalone_send(pconfig, "contact-42", "hi") - assert isinstance(result, dict) - # Either error about URL or a connection attempt failure — both are valid - # signals that the standalone path requires configuration. - assert "error" in result + assert result == {"success": True, "platform": "simplex", "chat_id": "contact-42"} + assert sent_payloads[0]["cmd"] == "@contact-42 hi" + + +@pytest.mark.asyncio +async def test_health_monitor_does_not_reconnect_quiet_healthy_ws(monkeypatch): + from gateway.config import PlatformConfig + cfg = PlatformConfig(enabled=True, extra={"ws_url": "ws://localhost:5225"}) + adapter = SimplexAdapter(cfg) + adapter._running = True + adapter._last_ws_activity = 0 + adapter._ws = AsyncMock() + + monkeypatch.setattr(_simplex, "HEALTH_CHECK_INTERVAL", 0.01) + monkeypatch.setattr(_simplex, "HEALTH_CHECK_STALE_THRESHOLD", 0.01) + + task = asyncio.create_task(adapter._health_monitor()) + await asyncio.sleep(0.03) + adapter._running = False + await asyncio.wait_for(task, timeout=1) + + adapter._ws.close.assert_not_called() # ---------------------------------------------------------------------------