feat: add text debounce batching for WhatsApp and WeChat platforms

WhatsApp and WeChat (Weixin/iLink) both deliver messages individually
without any client-side batching, so rapid multi-message bursts (forwarded
batches, paste-splits, etc.) each trigger a separate agent invocation.

This wastes tokens (redundant system prompts / context for each fragment)
and degrades UX (the user receives reply fragments instead of a single
coherent response).

Both adapters now mirror the Telegram adapter's proven text-debounce
pattern:

- _text_batch_delay_seconds / _text_batch_split_delay_seconds
  (configurable via env vars)
- _pending_text_batches dict for per-session aggregation
- _enqueue_text_event() concatenates successive TEXT messages and
  resets the flush timer
- _flush_text_batch() dispatches after the quiet period expires

Configurable via env vars:
  HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS (default 5.0)
  HERMES_WHATSAPP_TEXT_BATCH_SPLIT_DELAY_SECONDS (default 10.0)
  HERMES_WEIXIN_TEXT_BATCH_DELAY_SECONDS (default 3.0)
  HERMES_WEIXIN_TEXT_BATCH_SPLIT_DELAY_SECONDS (default 5.0)
This commit is contained in:
RedPiggy
2026-05-30 19:02:36 +08:00
committed by Teknium
parent 234ac00937
commit b0ce47daac
2 changed files with 165 additions and 8 deletions

View File

@ -1180,6 +1180,16 @@ class WeixinAdapter(BasePlatformAdapter):
default=False,
)
# Text debounce batching (mirrors Telegram adapter pattern).
# iLink delivers messages individually, so rapid multi-message
# bursts (forwarded batches, paste-splits) each trigger a
# separate agent invocation. Default 3s delay / 5s split delay
# are tuned for iLink's typical delivery cadence.
self._text_batch_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_DELAY_SECONDS", "3.0"))
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_SPLIT_DELAY_SECONDS", "5.0"))
self._pending_text_batches: Dict[str, MessageEvent] = {}
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
if self._account_id and not self._token:
persisted = load_weixin_account(hermes_home, self._account_id)
if persisted:
@ -1247,6 +1257,11 @@ class WeixinAdapter(BasePlatformAdapter):
async def disconnect(self) -> None:
_LIVE_ADAPTERS.pop(self._token, None)
self._running = False
for task in self._pending_text_batch_tasks.values():
if not task.done():
task.cancel()
self._pending_text_batches.clear()
self._pending_text_batch_tasks.clear()
if self._poll_task and not self._poll_task.done():
self._poll_task.cancel()
try:
@ -1395,12 +1410,10 @@ class WeixinAdapter(BasePlatformAdapter):
timestamp=datetime.now(),
)
logger.info("[%s] inbound from=%s type=%s media=%d", self.name, _safe_id(sender_id), source.chat_type, len(media_paths))
await self.handle_message(event)
@property
def enforces_own_access_policy(self) -> bool:
"""Weixin gates DM/group access at intake via dm_policy/group_policy."""
return True
if event.message_type == MessageType.TEXT:
self._enqueue_text_event(event)
else:
await self.handle_message(event)
def _is_dm_allowed(self, sender_id: str) -> bool:
if self._dm_policy == "disabled":
@ -1409,6 +1422,76 @@ class WeixinAdapter(BasePlatformAdapter):
return sender_id in self._allow_from
return True
@property
def enforces_own_access_policy(self) -> bool:
"""Weixin gates DM/group access at intake via dm_policy/group_policy."""
return True
# ------------------------------------------------------------------
# Text debounce batching
# ------------------------------------------------------------------
_SPLIT_THRESHOLD = 1800 # iLink chunks at ~2048 chars
def _text_batch_key(self, event: MessageEvent) -> str:
"""Session-scoped key for text message batching."""
from gateway.session import build_session_key
return build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
def _enqueue_text_event(self, event: MessageEvent) -> None:
"""Buffer a text event and reset the flush timer.
When users forward multiple messages or send rapid-fire texts
via WeChat, each arrives as a separate iLink message. This
concatenates them and waits for a short quiet period before
dispatching the combined message.
"""
key = self._text_batch_key(event)
existing = self._pending_text_batches.get(key)
chunk_len = len(event.text or "")
if existing is None:
event._last_chunk_len = chunk_len # type: ignore[attr-defined]
self._pending_text_batches[key] = event
else:
if event.text:
existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
existing._last_chunk_len = chunk_len # type: ignore[attr-defined]
if event.media_urls:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
prior_task = self._pending_text_batch_tasks.get(key)
if prior_task and not prior_task.done():
prior_task.cancel()
self._pending_text_batch_tasks[key] = asyncio.create_task(
self._flush_text_batch(key)
)
async def _flush_text_batch(self, key: str) -> None:
"""Wait for quiet period then dispatch aggregated text."""
current_task = asyncio.current_task()
try:
pending = self._pending_text_batches.get(key)
last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
if last_len >= self._SPLIT_THRESHOLD:
delay = self._text_batch_split_delay_seconds
else:
delay = self._text_batch_delay_seconds
await asyncio.sleep(delay)
if self._pending_text_batch_tasks.get(key) is not current_task:
return
event = self._pending_text_batches.pop(key, None)
if not event:
return
await self.handle_message(event)
finally:
if self._pending_text_batch_tasks.get(key) is current_task:
self._pending_text_batch_tasks.pop(key, None)
async def _collect_media(self, item: Dict[str, Any], media_paths: List[str], media_types: List[str]) -> None:
item_type = item.get("type")
if item_type == ITEM_IMAGE:

View File

@ -278,6 +278,17 @@ class WhatsAppAdapter(BasePlatformAdapter):
# notification before the normal "✓ whatsapp disconnected" fires.
self._shutting_down: bool = False
# Text debounce batching (mirrors Telegram adapter pattern).
# WhatsApp often delivers multiple messages in rapid succession
# (e.g. forwarded batches, paste-splits) — without debounce each
# message triggers a separate agent invocation, wasting tokens and
# flooding the user with reply fragments. Default 5s delay /
# 10s split delay are conservative for WhatsApp's delivery cadence.
self._text_batch_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS", "5.0"))
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_SPLIT_DELAY_SECONDS", "10.0"))
self._pending_text_batches: Dict[str, MessageEvent] = {}
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
def _effective_reply_prefix(self) -> str:
"""Return the prefix the Node bridge will add in self-chat mode."""
whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat")
@ -1139,7 +1150,10 @@ class WhatsAppAdapter(BasePlatformAdapter):
for msg_data in messages:
event = await self._build_message_event(msg_data)
if event:
await self.handle_message(event)
if event.message_type == MessageType.TEXT:
self._enqueue_text_event(event)
else:
await self.handle_message(event)
except asyncio.CancelledError:
break
except Exception as e:
@ -1151,7 +1165,67 @@ class WhatsAppAdapter(BasePlatformAdapter):
await asyncio.sleep(5)
await asyncio.sleep(1) # Poll interval
# ── Text debounce batching ──────────────────────────────────────
_SPLIT_THRESHOLD = 6000 # WhatsApp supports ~65K chars; generous threshold
def _text_batch_key(self, event: MessageEvent) -> str:
"""Session-scoped key for text message batching."""
from gateway.session import build_session_key
return build_session_key(
event.source,
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
)
def _enqueue_text_event(self, event: MessageEvent) -> None:
"""Buffer a text event and reset the flush timer.
When WhatsApp delivers rapid-fire messages (e.g. forwarded
batches), this concatenates them and waits for a short quiet
period before dispatching the combined message.
"""
key = self._text_batch_key(event)
existing = self._pending_text_batches.get(key)
chunk_len = len(event.text or "")
if existing is None:
event._last_chunk_len = chunk_len # type: ignore[attr-defined]
self._pending_text_batches[key] = event
else:
if event.text:
existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
existing._last_chunk_len = chunk_len # type: ignore[attr-defined]
if event.media_urls:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
prior_task = self._pending_text_batch_tasks.get(key)
if prior_task and not prior_task.done():
prior_task.cancel()
self._pending_text_batch_tasks[key] = asyncio.create_task(
self._flush_text_batch(key)
)
async def _flush_text_batch(self, key: str) -> None:
"""Wait for quiet period then dispatch aggregated text."""
current_task = asyncio.current_task()
try:
pending = self._pending_text_batches.get(key)
last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
if last_len >= self._SPLIT_THRESHOLD:
delay = self._text_batch_split_delay_seconds
else:
delay = self._text_batch_delay_seconds
await asyncio.sleep(delay)
event = self._pending_text_batches.pop(key, None)
if not event:
return
await self.handle_message(event)
finally:
if self._pending_text_batch_tasks.get(key) is current_task:
self._pending_text_batch_tasks.pop(key, None)
async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]:
"""Build a MessageEvent from bridge message data, downloading images to cache."""
try: