diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index d601792ab..36bb3dd21 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1184,9 +1184,16 @@ class WeixinAdapter(BasePlatformAdapter): # iLink delivers messages individually, so rapid multi-message # bursts (forwarded batches, paste-splits) each trigger a # separate agent invocation. Default 3s delay / 5s split delay - # are tuned for iLink's typical delivery cadence. - self._text_batch_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_DELAY_SECONDS", "3.0")) - self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_SPLIT_DELAY_SECONDS", "5.0")) + # are tuned for iLink's typical delivery cadence. Tunable via + # config.yaml under + # ``gateway.platforms.weixin.extra.text_batch_delay_seconds`` / + # ``text_batch_split_delay_seconds``. + self._text_batch_delay_seconds = self._coerce_float_extra( + "text_batch_delay_seconds", 3.0 + ) + self._text_batch_split_delay_seconds = self._coerce_float_extra( + "text_batch_split_delay_seconds", 5.0 + ) self._pending_text_batches: Dict[str, MessageEvent] = {} self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} @@ -1196,6 +1203,25 @@ class WeixinAdapter(BasePlatformAdapter): self._token = str(persisted.get("token") or "").strip() self._base_url = str(persisted.get("base_url") or self._base_url).strip().rstrip("/") + def _coerce_float_extra(self, key: str, default: float) -> float: + """Read a float from ``config.extra``, guarding against bad/non-finite values. + + The result is fed directly to ``asyncio.sleep()``, so NaN/Inf and + unparseable values fall back to ``default``. + """ + import math + + value = self.config.extra.get(key) if getattr(self.config, "extra", None) else None + if value is None: + return float(default) + try: + parsed = float(value) + except (TypeError, ValueError): + return float(default) + if not math.isfinite(parsed) or parsed < 0: + return float(default) + return parsed + @staticmethod def _coerce_list(value: Any) -> List[str]: if value is None: diff --git a/gateway/platforms/whatsapp.py b/gateway/platforms/whatsapp.py index 43b6fe664..703f77432 100644 --- a/gateway/platforms/whatsapp.py +++ b/gateway/platforms/whatsapp.py @@ -284,11 +284,37 @@ class WhatsAppAdapter(BasePlatformAdapter): # message triggers a separate agent invocation, wasting tokens and # flooding the user with reply fragments. Default 5s delay / # 10s split delay are conservative for WhatsApp's delivery cadence. - self._text_batch_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS", "5.0")) - self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_SPLIT_DELAY_SECONDS", "10.0")) + # Tunable via config.yaml under + # ``gateway.platforms.whatsapp.extra.text_batch_delay_seconds`` / + # ``text_batch_split_delay_seconds``. + self._text_batch_delay_seconds = self._coerce_float_extra( + "text_batch_delay_seconds", 5.0 + ) + self._text_batch_split_delay_seconds = self._coerce_float_extra( + "text_batch_split_delay_seconds", 10.0 + ) self._pending_text_batches: Dict[str, MessageEvent] = {} self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} + def _coerce_float_extra(self, key: str, default: float) -> float: + """Read a float from ``config.extra``, guarding against bad/non-finite values. + + The result is fed directly to ``asyncio.sleep()``, so NaN/Inf and + unparseable values fall back to ``default``. + """ + import math + + value = self.config.extra.get(key) if getattr(self.config, "extra", None) else None + if value is None: + return float(default) + try: + parsed = float(value) + except (TypeError, ValueError): + return float(default) + if not math.isfinite(parsed) or parsed < 0: + return float(default) + return parsed + def _effective_reply_prefix(self) -> str: """Return the prefix the Node bridge will add in self-chat mode.""" whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat") diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index ac535865d..0482f6624 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -11,6 +11,7 @@ import pytest from gateway.config import PlatformConfig from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides from gateway.platforms.base import SendResult +from gateway.platforms.base import MessageEvent, MessageType from gateway.platforms import weixin from gateway.platforms.weixin import ContextTokenStore, WeixinAdapter from tools.send_message_tool import _parse_target_ref, _send_to_platform @@ -853,15 +854,27 @@ class TestWeixinContentDedup: adapter = _make_adapter() adapter._poll_session = object() adapter.handle_message = AsyncMock() + # Tighten the text-debounce delay so the flush completes quickly. + adapter._text_batch_delay_seconds = 0.05 + adapter._text_batch_split_delay_seconds = 0.05 base_msg = { "from_user_id": "wxid_user1", "item_list": [{"type": 1, "text_item": {"text": "hello world"}}], } - asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-1"})) - asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-2"})) + async def _drive(): + # Both inbound messages share the same event loop so the debounce + # task created by the first one survives to be flushed. + await adapter._process_message({**base_msg, "message_id": "msg-1"}) + await adapter._process_message({**base_msg, "message_id": "msg-2"}) + # Wait out the quiet period so the buffered text batch flushes. + await asyncio.sleep(0.2) + asyncio.run(_drive()) + + # Content-dedup drops the second (duplicate) message before it is even + # enqueued, so only one combined dispatch reaches handle_message. assert adapter.handle_message.await_count == 1 event = adapter.handle_message.await_args[0][0] assert event.text == "hello world" @@ -882,3 +895,76 @@ class TestWeixinContentDedup: assert adapter.handle_message.await_count == 0 # is_duplicate should only be called for message_id, never for content assert all("content:" not in str(call) for call in adapter._dedup.is_duplicate.call_args_list) + + +class TestWeixinTextDebounce: + """Text-debounce batching for rapid multi-message bursts (issue #35301). + + Delays are read from ``config.extra`` (config.yaml), not env vars. + """ + + def test_batch_delays_default_from_config(self): + adapter = _make_adapter() + assert adapter._text_batch_delay_seconds == 3.0 + assert adapter._text_batch_split_delay_seconds == 5.0 + + def test_batch_delays_overridden_via_config_extra(self): + adapter = WeixinAdapter( + PlatformConfig( + enabled=True, + token="test-token", + extra={ + "account_id": "test-account", + "text_batch_delay_seconds": "0.5", + "text_batch_split_delay_seconds": 1.5, + }, + ) + ) + assert adapter._text_batch_delay_seconds == 0.5 + assert adapter._text_batch_split_delay_seconds == 1.5 + + def test_invalid_config_value_falls_back_to_default(self): + adapter = WeixinAdapter( + PlatformConfig( + enabled=True, + token="test-token", + extra={ + "account_id": "test-account", + "text_batch_delay_seconds": "not-a-number", + "text_batch_split_delay_seconds": -4, + }, + ) + ) + assert adapter._text_batch_delay_seconds == 3.0 + assert adapter._text_batch_split_delay_seconds == 5.0 + + def test_rapid_texts_collapse_into_single_dispatch(self): + adapter = _make_adapter() + adapter._text_batch_delay_seconds = 0.05 + adapter._text_batch_split_delay_seconds = 0.05 + dispatched = [] + + async def _capture(event): + dispatched.append(event.text) + + adapter.handle_message = _capture + + def _event(text): + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=adapter.build_source( + chat_id="wxid_user1", chat_type="dm", + user_id="wxid_user1", user_name="wxid_user1", + ), + ) + + async def _drive(): + adapter._enqueue_text_event(_event("one")) + adapter._enqueue_text_event(_event("two")) + adapter._enqueue_text_event(_event("three")) + assert dispatched == [] # nothing flushed during the burst + await asyncio.sleep(0.2) + + asyncio.run(_drive()) + assert dispatched == ["one\ntwo\nthree"] diff --git a/tests/gateway/test_whatsapp_text_batching.py b/tests/gateway/test_whatsapp_text_batching.py new file mode 100644 index 000000000..4258617c6 --- /dev/null +++ b/tests/gateway/test_whatsapp_text_batching.py @@ -0,0 +1,107 @@ +"""Text-debounce batching for the WhatsApp adapter (issue #35301). + +WhatsApp delivers rapid multi-message bursts (forwarded batches, paste-splits) +individually. Without debounce each fragment triggers a separate agent +invocation, wasting tokens and flooding the user with reply fragments. This +mirrors the Telegram/WeCom/Feishu pattern. + +Batch delays are read from ``config.extra`` (config.yaml), not env vars. +""" + +import asyncio + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import MessageEvent, MessageType +from gateway.platforms.whatsapp import WhatsAppAdapter +from gateway.session import SessionSource + + +def _make_adapter(**extra): + base = {"session_name": "test"} + base.update(extra) + return WhatsAppAdapter(PlatformConfig(enabled=True, extra=base)) + + +def _event(text): + src = SessionSource( + platform=Platform.WHATSAPP, + chat_id="chat123", + chat_type="dm", + user_id="user1", + user_name="tester", + ) + return MessageEvent(text=text, message_type=MessageType.TEXT, source=src) + + +def test_batch_delays_default_from_config(): + adapter = _make_adapter() + assert adapter._text_batch_delay_seconds == 5.0 + assert adapter._text_batch_split_delay_seconds == 10.0 + + +def test_batch_delays_overridden_via_config_extra(): + adapter = _make_adapter( + text_batch_delay_seconds="2.5", + text_batch_split_delay_seconds=7, + ) + assert adapter._text_batch_delay_seconds == 2.5 + assert adapter._text_batch_split_delay_seconds == 7.0 + + +def test_invalid_config_value_falls_back_to_default(): + adapter = _make_adapter( + text_batch_delay_seconds="garbage", + text_batch_split_delay_seconds=-3, + ) + assert adapter._text_batch_delay_seconds == 5.0 + assert adapter._text_batch_split_delay_seconds == 10.0 + + +def test_env_var_is_ignored(monkeypatch): + # Config-only path: the legacy HERMES_* env var must NOT influence delays. + monkeypatch.setenv("HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS", "99") + adapter = _make_adapter() + assert adapter._text_batch_delay_seconds == 5.0 + + +def test_rapid_texts_collapse_into_single_dispatch(): + adapter = _make_adapter( + text_batch_delay_seconds=0.05, + text_batch_split_delay_seconds=0.05, + ) + dispatched = [] + + async def _capture(event): + dispatched.append(event.text) + + adapter.handle_message = _capture + + async def _drive(): + adapter._enqueue_text_event(_event("one")) + adapter._enqueue_text_event(_event("two")) + adapter._enqueue_text_event(_event("three")) + assert dispatched == [] # nothing flushed during the burst + await asyncio.sleep(0.2) + + asyncio.run(_drive()) + assert dispatched == ["one\ntwo\nthree"] + + +def test_lone_message_dispatched_alone(): + adapter = _make_adapter( + text_batch_delay_seconds=0.05, + text_batch_split_delay_seconds=0.05, + ) + dispatched = [] + + async def _capture(event): + dispatched.append(event.text) + + adapter.handle_message = _capture + + async def _drive(): + adapter._enqueue_text_event(_event("solo")) + await asyncio.sleep(0.2) + + asyncio.run(_drive()) + assert dispatched == ["solo"] diff --git a/website/docs/user-guide/messaging/weixin.md b/website/docs/user-guide/messaging/weixin.md index a0d25ee8c..30d75dd5b 100644 --- a/website/docs/user-guide/messaging/weixin.md +++ b/website/docs/user-guide/messaging/weixin.md @@ -123,6 +123,8 @@ Set these in `config.yaml` under `platforms.weixin.extra`: | `allow_from` | `[]` | User IDs allowed for DMs (when dm_policy=allowlist) | | `group_allow_from` | `[]` | Group IDs allowed (when group_policy=allowlist) | | `split_multiline_messages` | `false` | When `true`, split multi-line replies into multiple chat messages (legacy behavior). When `false`, keep multi-line replies as one message unless they exceed the length limit. | +| `text_batch_delay_seconds` | `3.0` | Quiet period (seconds) before a buffered burst of rapid text messages is flushed as one combined request. iLink delivers messages individually, so this debounce avoids one agent invocation per fragment. Set `0` to dispatch each message immediately. | +| `text_batch_split_delay_seconds` | `5.0` | Extended flush delay used when the latest fragment is near the split threshold (long messages iLink may have chunked). | ## Access Policies diff --git a/website/docs/user-guide/messaging/whatsapp.md b/website/docs/user-guide/messaging/whatsapp.md index d2bd52a56..5fb5eb2ae 100644 --- a/website/docs/user-guide/messaging/whatsapp.md +++ b/website/docs/user-guide/messaging/whatsapp.md @@ -201,6 +201,22 @@ Code blocks and inline code are preserved as-is since WhatsApp supports triple-b When the agent calls tools (web search, file operations, etc.), WhatsApp displays real-time progress indicators showing which tool is running. This is enabled by default — no configuration needed. +### Message Batching (Debounce) + +WhatsApp delivers each message individually, so a rapid burst (forwarded batches, paste-splits, multi-line text) would otherwise trigger a separate agent invocation per fragment — wasting tokens and producing several disjointed replies. The adapter buffers successive text messages from the same chat and dispatches them as one combined request after a short quiet period (default **5s**, extended to **10s** for very long fragments). Tune via `config.yaml`: + +```yaml +# ~/.hermes/config.yaml +gateway: + platforms: + whatsapp: + extra: + text_batch_delay_seconds: 5.0 # quiet period before flushing a batch + text_batch_split_delay_seconds: 10.0 # extended delay near the split threshold +``` + +Set `text_batch_delay_seconds: 0` to dispatch each message immediately (disables batching). + --- ## Troubleshooting