fix(gateway): config.yaml path for WhatsApp/Weixin text-batch delays
Convert the salvaged text-debounce delays from HERMES_* env vars to config.yaml (gateway.platforms.<name>.extra.text_batch_delay_seconds / text_batch_split_delay_seconds), per the '.env is for secrets only' policy. Adds a finite/non-negative guard so bad YAML values fall back to the defaults instead of crashing asyncio.sleep(). - whatsapp.py / weixin.py: read delays via _coerce_float_extra(config.extra) - update Weixin content-dedup regression test for the deferred dispatch path - add text-debounce coverage (whatsapp + weixin): defaults, config override, bad-value fallback, env-var-ignored, burst-collapse, lone-message - docs: WhatsApp + Weixin config keys
This commit is contained in:
@ -1184,9 +1184,16 @@ class WeixinAdapter(BasePlatformAdapter):
|
||||
# iLink delivers messages individually, so rapid multi-message
|
||||
# bursts (forwarded batches, paste-splits) each trigger a
|
||||
# separate agent invocation. Default 3s delay / 5s split delay
|
||||
# are tuned for iLink's typical delivery cadence.
|
||||
self._text_batch_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_DELAY_SECONDS", "3.0"))
|
||||
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_SPLIT_DELAY_SECONDS", "5.0"))
|
||||
# are tuned for iLink's typical delivery cadence. Tunable via
|
||||
# config.yaml under
|
||||
# ``gateway.platforms.weixin.extra.text_batch_delay_seconds`` /
|
||||
# ``text_batch_split_delay_seconds``.
|
||||
self._text_batch_delay_seconds = self._coerce_float_extra(
|
||||
"text_batch_delay_seconds", 3.0
|
||||
)
|
||||
self._text_batch_split_delay_seconds = self._coerce_float_extra(
|
||||
"text_batch_split_delay_seconds", 5.0
|
||||
)
|
||||
self._pending_text_batches: Dict[str, MessageEvent] = {}
|
||||
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
||||
|
||||
@ -1196,6 +1203,25 @@ class WeixinAdapter(BasePlatformAdapter):
|
||||
self._token = str(persisted.get("token") or "").strip()
|
||||
self._base_url = str(persisted.get("base_url") or self._base_url).strip().rstrip("/")
|
||||
|
||||
def _coerce_float_extra(self, key: str, default: float) -> float:
    """Read a float from ``config.extra``, falling back to ``default`` on bad input.

    The result is fed directly to ``asyncio.sleep()``, so a value that is
    missing, unparseable, too large to represent as a float, non-finite
    (NaN/Inf) or negative falls back to ``default``.

    Args:
        key: Name of the entry to look up in ``self.config.extra``.
        default: Value returned when the entry is absent or unusable.

    Returns:
        The configured value as a finite, non-negative float, otherwise
        ``float(default)``.
    """
    import math

    fallback = float(default)
    extra = getattr(self.config, "extra", None)
    value = extra.get(key) if extra else None
    if value is None:
        return fallback
    try:
        parsed = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an integer too large for a float (e.g. a YAML
        # value of 10**400) -- treat it like any other bad value instead
        # of letting it crash adapter construction.
        return fallback
    if not math.isfinite(parsed) or parsed < 0:
        return fallback
    return parsed
|
||||
|
||||
@staticmethod
|
||||
def _coerce_list(value: Any) -> List[str]:
|
||||
if value is None:
|
||||
|
||||
@ -284,11 +284,37 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
# message triggers a separate agent invocation, wasting tokens and
|
||||
# flooding the user with reply fragments. Default 5s delay /
|
||||
# 10s split delay are conservative for WhatsApp's delivery cadence.
|
||||
self._text_batch_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS", "5.0"))
|
||||
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_SPLIT_DELAY_SECONDS", "10.0"))
|
||||
# Tunable via config.yaml under
|
||||
# ``gateway.platforms.whatsapp.extra.text_batch_delay_seconds`` /
|
||||
# ``text_batch_split_delay_seconds``.
|
||||
self._text_batch_delay_seconds = self._coerce_float_extra(
|
||||
"text_batch_delay_seconds", 5.0
|
||||
)
|
||||
self._text_batch_split_delay_seconds = self._coerce_float_extra(
|
||||
"text_batch_split_delay_seconds", 10.0
|
||||
)
|
||||
self._pending_text_batches: Dict[str, MessageEvent] = {}
|
||||
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
||||
|
||||
def _coerce_float_extra(self, key: str, default: float) -> float:
    """Read a float from ``config.extra``, falling back to ``default`` on bad input.

    The result is fed directly to ``asyncio.sleep()``, so a value that is
    missing, unparseable, too large to represent as a float, non-finite
    (NaN/Inf) or negative falls back to ``default``.

    Args:
        key: Name of the entry to look up in ``self.config.extra``.
        default: Value returned when the entry is absent or unusable.

    Returns:
        The configured value as a finite, non-negative float, otherwise
        ``float(default)``.
    """
    import math

    fallback = float(default)
    extra = getattr(self.config, "extra", None)
    value = extra.get(key) if extra else None
    if value is None:
        return fallback
    try:
        parsed = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an integer too large for a float (e.g. a YAML
        # value of 10**400) -- treat it like any other bad value instead
        # of letting it crash adapter construction.
        return fallback
    if not math.isfinite(parsed) or parsed < 0:
        return fallback
    return parsed
|
||||
|
||||
def _effective_reply_prefix(self) -> str:
|
||||
"""Return the prefix the Node bridge will add in self-chat mode."""
|
||||
whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat")
|
||||
|
||||
@ -11,6 +11,7 @@ import pytest
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides
|
||||
from gateway.platforms.base import SendResult
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.platforms import weixin
|
||||
from gateway.platforms.weixin import ContextTokenStore, WeixinAdapter
|
||||
from tools.send_message_tool import _parse_target_ref, _send_to_platform
|
||||
@ -853,15 +854,27 @@ class TestWeixinContentDedup:
|
||||
adapter = _make_adapter()
|
||||
adapter._poll_session = object()
|
||||
adapter.handle_message = AsyncMock()
|
||||
# Tighten the text-debounce delay so the flush completes quickly.
|
||||
adapter._text_batch_delay_seconds = 0.05
|
||||
adapter._text_batch_split_delay_seconds = 0.05
|
||||
|
||||
base_msg = {
|
||||
"from_user_id": "wxid_user1",
|
||||
"item_list": [{"type": 1, "text_item": {"text": "hello world"}}],
|
||||
}
|
||||
|
||||
asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-1"}))
|
||||
asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-2"}))
|
||||
async def _drive():
|
||||
# Both inbound messages share the same event loop so the debounce
|
||||
# task created by the first one survives to be flushed.
|
||||
await adapter._process_message({**base_msg, "message_id": "msg-1"})
|
||||
await adapter._process_message({**base_msg, "message_id": "msg-2"})
|
||||
# Wait out the quiet period so the buffered text batch flushes.
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
asyncio.run(_drive())
|
||||
|
||||
# Content-dedup drops the second (duplicate) message before it is even
|
||||
# enqueued, so only one combined dispatch reaches handle_message.
|
||||
assert adapter.handle_message.await_count == 1
|
||||
event = adapter.handle_message.await_args[0][0]
|
||||
assert event.text == "hello world"
|
||||
@ -882,3 +895,76 @@ class TestWeixinContentDedup:
|
||||
assert adapter.handle_message.await_count == 0
|
||||
# is_duplicate should only be called for message_id, never for content
|
||||
assert all("content:" not in str(call) for call in adapter._dedup.is_duplicate.call_args_list)
|
||||
|
||||
|
||||
class TestWeixinTextDebounce:
    """Text-debounce batching for rapid multi-message bursts (issue #35301).

    Delays are read from ``config.extra`` (config.yaml), not env vars.
    """

    @staticmethod
    def _adapter_with_extra(**extra):
        """Build a WeixinAdapter whose ``config.extra`` carries *extra* overrides."""
        return WeixinAdapter(
            PlatformConfig(
                enabled=True,
                token="test-token",
                extra={"account_id": "test-account", **extra},
            )
        )

    @staticmethod
    def _text_event(adapter, text):
        """Wrap *text* in a TEXT MessageEvent from a fixed DM source."""
        return MessageEvent(
            text=text,
            message_type=MessageType.TEXT,
            source=adapter.build_source(
                chat_id="wxid_user1", chat_type="dm",
                user_id="wxid_user1", user_name="wxid_user1",
            ),
        )

    def test_batch_delays_default_from_config(self):
        adapter = _make_adapter()
        assert adapter._text_batch_delay_seconds == 3.0
        assert adapter._text_batch_split_delay_seconds == 5.0

    def test_batch_delays_overridden_via_config_extra(self):
        # Both a numeric string and a plain number must be accepted.
        adapter = self._adapter_with_extra(
            text_batch_delay_seconds="0.5",
            text_batch_split_delay_seconds=1.5,
        )
        assert adapter._text_batch_delay_seconds == 0.5
        assert adapter._text_batch_split_delay_seconds == 1.5

    def test_invalid_config_value_falls_back_to_default(self):
        adapter = self._adapter_with_extra(
            text_batch_delay_seconds="not-a-number",
            text_batch_split_delay_seconds=-4,
        )
        assert adapter._text_batch_delay_seconds == 3.0
        assert adapter._text_batch_split_delay_seconds == 5.0

    def test_env_var_is_ignored(self, monkeypatch):
        # Config-only path: the legacy HERMES_* env vars must NOT influence delays.
        monkeypatch.setenv("HERMES_WEIXIN_TEXT_BATCH_DELAY_SECONDS", "99")
        monkeypatch.setenv("HERMES_WEIXIN_TEXT_BATCH_SPLIT_DELAY_SECONDS", "99")
        adapter = _make_adapter()
        assert adapter._text_batch_delay_seconds == 3.0
        assert adapter._text_batch_split_delay_seconds == 5.0

    def test_rapid_texts_collapse_into_single_dispatch(self):
        adapter = _make_adapter()
        adapter._text_batch_delay_seconds = 0.05
        adapter._text_batch_split_delay_seconds = 0.05
        dispatched = []

        async def _capture(event):
            dispatched.append(event.text)

        adapter.handle_message = _capture

        async def _drive():
            adapter._enqueue_text_event(self._text_event(adapter, "one"))
            adapter._enqueue_text_event(self._text_event(adapter, "two"))
            adapter._enqueue_text_event(self._text_event(adapter, "three"))
            assert dispatched == []  # nothing flushed during the burst
            await asyncio.sleep(0.2)

        asyncio.run(_drive())
        assert dispatched == ["one\ntwo\nthree"]

    def test_lone_message_dispatched_alone(self):
        adapter = _make_adapter()
        adapter._text_batch_delay_seconds = 0.05
        adapter._text_batch_split_delay_seconds = 0.05
        dispatched = []

        async def _capture(event):
            dispatched.append(event.text)

        adapter.handle_message = _capture

        async def _drive():
            adapter._enqueue_text_event(self._text_event(adapter, "solo"))
            # Wait out the quiet period so the single buffered text flushes.
            await asyncio.sleep(0.2)

        asyncio.run(_drive())
        assert dispatched == ["solo"]
|
||||
|
||||
107
tests/gateway/test_whatsapp_text_batching.py
Normal file
107
tests/gateway/test_whatsapp_text_batching.py
Normal file
@ -0,0 +1,107 @@
|
||||
"""Text-debounce batching for the WhatsApp adapter (issue #35301).
|
||||
|
||||
WhatsApp delivers rapid multi-message bursts (forwarded batches, paste-splits)
|
||||
individually. Without debounce each fragment triggers a separate agent
|
||||
invocation, wasting tokens and flooding the user with reply fragments. This
|
||||
mirrors the Telegram/WeCom/Feishu pattern.
|
||||
|
||||
Batch delays are read from ``config.extra`` (config.yaml), not env vars.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.platforms.whatsapp import WhatsAppAdapter
|
||||
from gateway.session import SessionSource
|
||||
|
||||
|
||||
def _make_adapter(**extra):
    """Build a WhatsAppAdapter whose ``config.extra`` is the test base plus *extra*."""
    merged_extra = {"session_name": "test", **extra}
    config = PlatformConfig(enabled=True, extra=merged_extra)
    return WhatsAppAdapter(config)
|
||||
|
||||
|
||||
def _event(text):
    """Wrap *text* in a TEXT MessageEvent from a fixed WhatsApp DM source."""
    return MessageEvent(
        text=text,
        message_type=MessageType.TEXT,
        source=SessionSource(
            platform=Platform.WHATSAPP,
            chat_id="chat123",
            chat_type="dm",
            user_id="user1",
            user_name="tester",
        ),
    )
|
||||
|
||||
|
||||
def test_batch_delays_default_from_config():
    """With no overrides in ``config.extra`` the built-in 5s / 10s delays apply."""
    adapter = _make_adapter()
    observed = (
        adapter._text_batch_delay_seconds,
        adapter._text_batch_split_delay_seconds,
    )
    assert observed == (5.0, 10.0)
|
||||
|
||||
|
||||
def test_batch_delays_overridden_via_config_extra():
    """A numeric string and an int in ``config.extra`` both become float delays."""
    overrides = {
        "text_batch_delay_seconds": "2.5",
        "text_batch_split_delay_seconds": 7,
    }
    adapter = _make_adapter(**overrides)
    observed = (
        adapter._text_batch_delay_seconds,
        adapter._text_batch_split_delay_seconds,
    )
    assert observed == (2.5, 7.0)
|
||||
|
||||
|
||||
def test_invalid_config_value_falls_back_to_default():
    """Unparseable, negative and non-finite values all fall back to the defaults."""
    adapter = _make_adapter(
        text_batch_delay_seconds="garbage",
        text_batch_split_delay_seconds=-3,
    )
    assert adapter._text_batch_delay_seconds == 5.0
    assert adapter._text_batch_split_delay_seconds == 10.0

    # NaN/Inf parse cleanly via float(), so they exercise the finite-value
    # guard rather than the parse-error path covered above.
    adapter = _make_adapter(
        text_batch_delay_seconds=float("nan"),
        text_batch_split_delay_seconds="inf",
    )
    assert adapter._text_batch_delay_seconds == 5.0
    assert adapter._text_batch_split_delay_seconds == 10.0
|
||||
|
||||
|
||||
def test_env_var_is_ignored(monkeypatch):
    """The legacy HERMES_* env vars must not influence either delay."""
    # Config-only path: set BOTH legacy variables so a regression in either
    # the flush delay or the split delay is caught.
    monkeypatch.setenv("HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS", "99")
    monkeypatch.setenv("HERMES_WHATSAPP_TEXT_BATCH_SPLIT_DELAY_SECONDS", "99")
    adapter = _make_adapter()
    assert adapter._text_batch_delay_seconds == 5.0
    assert adapter._text_batch_split_delay_seconds == 10.0
|
||||
|
||||
|
||||
def test_rapid_texts_collapse_into_single_dispatch():
    """Three texts enqueued back-to-back reach ``handle_message`` as one joined event."""
    adapter = _make_adapter(
        text_batch_delay_seconds=0.05,
        text_batch_split_delay_seconds=0.05,
    )
    received = []

    async def _record(event):
        received.append(event.text)

    adapter.handle_message = _record

    async def _burst():
        for fragment in ("one", "two", "three"):
            adapter._enqueue_text_event(_event(fragment))
        # Nothing may be flushed while the burst is still arriving.
        assert received == []
        await asyncio.sleep(0.2)

    asyncio.run(_burst())
    assert received == ["one\ntwo\nthree"]
|
||||
|
||||
|
||||
def test_lone_message_dispatched_alone():
    """A single enqueued text is delivered unchanged once the delay has passed."""
    adapter = _make_adapter(
        text_batch_delay_seconds=0.05,
        text_batch_split_delay_seconds=0.05,
    )
    received = []

    async def _record(event):
        received.append(event.text)

    adapter.handle_message = _record

    async def _single():
        adapter._enqueue_text_event(_event("solo"))
        await asyncio.sleep(0.2)

    asyncio.run(_single())
    assert received == ["solo"]
|
||||
@ -123,6 +123,8 @@ Set these in `config.yaml` under `platforms.weixin.extra`:
|
||||
| `allow_from` | `[]` | User IDs allowed for DMs (when dm_policy=allowlist) |
|
||||
| `group_allow_from` | `[]` | Group IDs allowed (when group_policy=allowlist) |
|
||||
| `split_multiline_messages` | `false` | When `true`, split multi-line replies into multiple chat messages (legacy behavior). When `false`, keep multi-line replies as one message unless they exceed the length limit. |
|
||||
| `text_batch_delay_seconds` | `3.0` | Quiet period (seconds) before a buffered burst of rapid text messages is flushed as one combined request. iLink delivers messages individually, so this debounce avoids one agent invocation per fragment. Set `0` to dispatch each message immediately. |
|
||||
| `text_batch_split_delay_seconds` | `5.0` | Extended flush delay used when the latest fragment is near the split threshold (long messages iLink may have chunked). |
|
||||
|
||||
## Access Policies
|
||||
|
||||
|
||||
@ -201,6 +201,22 @@ Code blocks and inline code are preserved as-is since WhatsApp supports triple-b
|
||||
|
||||
When the agent calls tools (web search, file operations, etc.), WhatsApp displays real-time progress indicators showing which tool is running. This is enabled by default — no configuration needed.
|
||||
|
||||
### Message Batching (Debounce)
|
||||
|
||||
WhatsApp delivers each message individually, so a rapid burst (forwarded batches, paste-splits, multi-line text) would otherwise trigger a separate agent invocation per fragment — wasting tokens and producing several disjointed replies. The adapter buffers successive text messages from the same chat and dispatches them as one combined request after a short quiet period (default **5s**, extended to **10s** for very long fragments). Tune via `config.yaml`:
|
||||
|
||||
```yaml
|
||||
# ~/.hermes/config.yaml
|
||||
gateway:
|
||||
platforms:
|
||||
whatsapp:
|
||||
extra:
|
||||
text_batch_delay_seconds: 5.0 # quiet period before flushing a batch
|
||||
text_batch_split_delay_seconds: 10.0 # extended delay near the split threshold
|
||||
```
|
||||
|
||||
Set `text_batch_delay_seconds: 0` to dispatch each message immediately (disables batching).
|
||||
|
||||
---
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
Reference in New Issue
Block a user