diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index 025bf052c..d601792ab 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -1180,6 +1180,16 @@ class WeixinAdapter(BasePlatformAdapter): default=False, ) + # Text debounce batching (mirrors Telegram adapter pattern). + # iLink delivers messages individually, so rapid multi-message + # bursts (forwarded batches, paste-splits) each trigger a + # separate agent invocation. Default 3s delay / 5s split delay + # are tuned for iLink's typical delivery cadence. + self._text_batch_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_DELAY_SECONDS", "3.0")) + self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WEIXIN_TEXT_BATCH_SPLIT_DELAY_SECONDS", "5.0")) + self._pending_text_batches: Dict[str, MessageEvent] = {} + self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} + if self._account_id and not self._token: persisted = load_weixin_account(hermes_home, self._account_id) if persisted: @@ -1247,6 +1257,11 @@ class WeixinAdapter(BasePlatformAdapter): async def disconnect(self) -> None: _LIVE_ADAPTERS.pop(self._token, None) self._running = False + for task in self._pending_text_batch_tasks.values(): + if not task.done(): + task.cancel() + self._pending_text_batches.clear() + self._pending_text_batch_tasks.clear() if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() try: @@ -1395,12 +1410,10 @@ class WeixinAdapter(BasePlatformAdapter): timestamp=datetime.now(), ) logger.info("[%s] inbound from=%s type=%s media=%d", self.name, _safe_id(sender_id), source.chat_type, len(media_paths)) - await self.handle_message(event) - - @property - def enforces_own_access_policy(self) -> bool: - """Weixin gates DM/group access at intake via dm_policy/group_policy.""" - return True + if event.message_type == MessageType.TEXT: + self._enqueue_text_event(event) + else: + await self.handle_message(event) def _is_dm_allowed(self, sender_id: str) -> bool: if self._dm_policy == "disabled": @@ -1409,6 +1422,76 @@ class WeixinAdapter(BasePlatformAdapter): return sender_id in self._allow_from return True + @property + def enforces_own_access_policy(self) -> bool: + """Weixin gates DM/group access at intake via dm_policy/group_policy.""" + return True + + # ------------------------------------------------------------------ + # Text debounce batching + # ------------------------------------------------------------------ + + _SPLIT_THRESHOLD = 1800 # iLink chunks at ~2048 chars + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), + ) + + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer. + + When users forward multiple messages or send rapid-fire texts + via WeChat, each arrives as a separate iLink message. This + concatenates them and waits for a short quiet period before + dispatching the combined message. + """ + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + chunk_len = len(event.text or "") + if existing is None: + event._last_chunk_len = chunk_len # type: ignore[attr-defined] + self._pending_text_batches[key] = event + else: + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + existing._last_chunk_len = chunk_len # type: ignore[attr-defined] + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) + ) + + async def _flush_text_batch(self, key: str) -> None: + """Wait for quiet period then dispatch aggregated text.""" + current_task = asyncio.current_task() + try: + pending = self._pending_text_batches.get(key) + last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + if last_len >= self._SPLIT_THRESHOLD: + delay = self._text_batch_split_delay_seconds + else: + delay = self._text_batch_delay_seconds + await asyncio.sleep(delay) + if self._pending_text_batch_tasks.get(key) is not current_task: + return + event = self._pending_text_batches.pop(key, None) + if not event: + return + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) + async def _collect_media(self, item: Dict[str, Any], media_paths: List[str], media_types: List[str]) -> None: item_type = item.get("type") if item_type == ITEM_IMAGE: diff --git a/gateway/platforms/whatsapp.py b/gateway/platforms/whatsapp.py index 0ca3d41fa..43b6fe664 100644 --- a/gateway/platforms/whatsapp.py +++ b/gateway/platforms/whatsapp.py @@ -278,6 +278,17 @@ class WhatsAppAdapter(BasePlatformAdapter): # notification before the normal "✓ whatsapp disconnected" fires. self._shutting_down: bool = False + # Text debounce batching (mirrors Telegram adapter pattern). + # WhatsApp often delivers multiple messages in rapid succession + # (e.g. forwarded batches, paste-splits) — without debounce each + # message triggers a separate agent invocation, wasting tokens and + # flooding the user with reply fragments. Default 5s delay / + # 10s split delay are conservative for WhatsApp's delivery cadence. + self._text_batch_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_DELAY_SECONDS", "5.0")) + self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WHATSAPP_TEXT_BATCH_SPLIT_DELAY_SECONDS", "10.0")) + self._pending_text_batches: Dict[str, MessageEvent] = {} + self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {} + def _effective_reply_prefix(self) -> str: """Return the prefix the Node bridge will add in self-chat mode.""" whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat") @@ -1139,7 +1150,10 @@ class WhatsAppAdapter(BasePlatformAdapter): for msg_data in messages: event = await self._build_message_event(msg_data) if event: - await self.handle_message(event) + if event.message_type == MessageType.TEXT: + self._enqueue_text_event(event) + else: + await self.handle_message(event) except asyncio.CancelledError: break except Exception as e: @@ -1151,7 +1165,67 @@ class WhatsAppAdapter(BasePlatformAdapter): await asyncio.sleep(5) await asyncio.sleep(1) # Poll interval - + + # ── Text debounce batching ────────────────────────────────────── + + _SPLIT_THRESHOLD = 6000 # WhatsApp supports ~65K chars; generous threshold + + def _text_batch_key(self, event: MessageEvent) -> str: + """Session-scoped key for text message batching.""" + from gateway.session import build_session_key + return build_session_key( + event.source, + group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), + ) + + def _enqueue_text_event(self, event: MessageEvent) -> None: + """Buffer a text event and reset the flush timer. + + When WhatsApp delivers rapid-fire messages (e.g. forwarded + batches), this concatenates them and waits for a short quiet + period before dispatching the combined message. + """ + key = self._text_batch_key(event) + existing = self._pending_text_batches.get(key) + chunk_len = len(event.text or "") + if existing is None: + event._last_chunk_len = chunk_len # type: ignore[attr-defined] + self._pending_text_batches[key] = event + else: + if event.text: + existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text + existing._last_chunk_len = chunk_len # type: ignore[attr-defined] + if event.media_urls: + existing.media_urls.extend(event.media_urls) + existing.media_types.extend(event.media_types) + + prior_task = self._pending_text_batch_tasks.get(key) + if prior_task and not prior_task.done(): + prior_task.cancel() + self._pending_text_batch_tasks[key] = asyncio.create_task( + self._flush_text_batch(key) + ) + + async def _flush_text_batch(self, key: str) -> None: + """Wait for quiet period then dispatch aggregated text.""" + current_task = asyncio.current_task() + try: + pending = self._pending_text_batches.get(key) + last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0 + if last_len >= self._SPLIT_THRESHOLD: + delay = self._text_batch_split_delay_seconds + else: + delay = self._text_batch_delay_seconds + await asyncio.sleep(delay) + event = self._pending_text_batches.pop(key, None) + if not event: + return + await self.handle_message(event) + finally: + if self._pending_text_batch_tasks.get(key) is current_task: + self._pending_text_batch_tasks.pop(key, None) + async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]: """Build a MessageEvent from bridge message data, downloading images to cache.""" try: