fix(feishu): bound _chat_locks with LRU eviction (#34836)
The Feishu adapter stored one asyncio.Lock per chat_id in a plain dict with no upper bound, so a long-running gateway that saw many distinct chats grew _chat_locks without limit. Port the LRU-eviction pattern already used by the yuanbao adapter: OrderedDict + move_to_end on access, CHAT_LOCK_MAX_SIZE cap (1000), and eviction that skips currently-held locks (falling back to dropping the LRU entry only if all are held).
This commit is contained in:
@ -48,6 +48,7 @@ user is seen through different apps in the future.
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import hashlib
|
||||
import hmac
|
||||
import itertools
|
||||
@ -1408,6 +1409,8 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
"""Feishu/Lark bot adapter."""
|
||||
|
||||
MAX_MESSAGE_LENGTH = 8000
|
||||
# Max distinct chat IDs retained in _chat_locks before LRU eviction kicks in.
|
||||
CHAT_LOCK_MAX_SIZE: int = 1000
|
||||
# Threshold for detecting Feishu client-side message splits.
|
||||
# When a chunk is near the ~4096-char practical limit, a continuation
|
||||
# is almost certain.
|
||||
@ -1445,7 +1448,7 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
self._pending_inbound_lock = threading.Lock()
|
||||
self._pending_drain_scheduled = False
|
||||
self._pending_inbound_max_depth = 1000 # cap queue; drop oldest beyond
|
||||
self._chat_locks: Dict[str, asyncio.Lock] = {} # chat_id → lock (per-chat serial processing)
|
||||
self._chat_locks: "collections.OrderedDict[str, asyncio.Lock]" = collections.OrderedDict() # chat_id → lock (per-chat serial processing, LRU-bounded)
|
||||
self._sent_message_ids_to_chat: Dict[str, str] = {} # message_id → chat_id (for reaction routing)
|
||||
self._sent_message_id_order: List[str] = [] # LRU order for _sent_message_ids_to_chat
|
||||
self._chat_info_cache: Dict[str, Dict[str, Any]] = {}
|
||||
@ -2835,11 +2838,28 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
# =========================================================================
|
||||
|
||||
def _get_chat_lock(self, chat_id: str) -> asyncio.Lock:
|
||||
"""Return (creating if needed) the per-chat asyncio.Lock for serial message processing."""
|
||||
"""Return (creating if needed) the per-chat asyncio.Lock for serial message processing.
|
||||
|
||||
Bounded with LRU eviction so a long-running gateway that sees many
|
||||
distinct chats does not grow ``_chat_locks`` without limit. Locks that
|
||||
are currently held are never evicted; if every entry is locked we fall
|
||||
back to dropping the least-recently-used one.
|
||||
"""
|
||||
lock = self._chat_locks.get(chat_id)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
self._chat_locks[chat_id] = lock
|
||||
if lock is not None:
|
||||
self._chat_locks.move_to_end(chat_id)
|
||||
return lock
|
||||
if len(self._chat_locks) >= self.CHAT_LOCK_MAX_SIZE:
|
||||
evicted = False
|
||||
for key in list(self._chat_locks):
|
||||
if not self._chat_locks[key].locked():
|
||||
self._chat_locks.pop(key)
|
||||
evicted = True
|
||||
break
|
||||
if not evicted:
|
||||
self._chat_locks.pop(next(iter(self._chat_locks)))
|
||||
lock = asyncio.Lock()
|
||||
self._chat_locks[chat_id] = lock
|
||||
return lock
|
||||
|
||||
async def _handle_message_with_guards(self, event: MessageEvent) -> None:
|
||||
|
||||
@ -4883,3 +4883,62 @@ class TestFeishuMentionEndToEnd(unittest.TestCase):
|
||||
# Body: leading @Hermes stripped, Alice preserved, trailing text intact.
|
||||
self.assertIn("@Alice review the spec with Alice", event.text)
|
||||
self.assertNotIn("@Hermes @Alice", event.text)
|
||||
|
||||
|
||||
class TestChatLockEviction(unittest.TestCase):
|
||||
"""_get_chat_lock is LRU-bounded so _chat_locks cannot grow unbounded."""
|
||||
|
||||
def _make_adapter(self, max_size=5):
|
||||
import collections as _collections
|
||||
|
||||
from gateway.platforms.feishu import FeishuAdapter
|
||||
|
||||
adapter = object.__new__(FeishuAdapter)
|
||||
adapter._chat_locks = _collections.OrderedDict()
|
||||
adapter.CHAT_LOCK_MAX_SIZE = max_size
|
||||
return adapter
|
||||
|
||||
def test_chat_locks_is_ordered_dict(self):
|
||||
import collections as _collections
|
||||
|
||||
adapter = self._make_adapter()
|
||||
self.assertIsInstance(adapter._chat_locks, _collections.OrderedDict)
|
||||
|
||||
def test_same_id_returns_same_lock_and_stays_bounded(self):
|
||||
adapter = self._make_adapter(max_size=5)
|
||||
locks = [adapter._get_chat_lock(f"c{i}") for i in range(5)]
|
||||
self.assertEqual(len(adapter._chat_locks), 5)
|
||||
# Re-requesting an existing id returns the identical lock, no growth.
|
||||
self.assertIs(adapter._get_chat_lock("c2"), locks[2])
|
||||
self.assertEqual(len(adapter._chat_locks), 5)
|
||||
|
||||
def test_lru_eviction_respects_recent_access(self):
|
||||
adapter = self._make_adapter(max_size=5)
|
||||
for i in range(5):
|
||||
adapter._get_chat_lock(f"c{i}")
|
||||
# Touch c0 so it is no longer the LRU entry, then add a new chat.
|
||||
adapter._get_chat_lock("c0")
|
||||
adapter._get_chat_lock("c_new")
|
||||
self.assertEqual(len(adapter._chat_locks), 5)
|
||||
self.assertNotIn("c1", adapter._chat_locks) # c1 was the true LRU
|
||||
self.assertIn("c0", adapter._chat_locks)
|
||||
self.assertIn("c_new", adapter._chat_locks)
|
||||
|
||||
def test_eviction_skips_held_locks(self):
|
||||
adapter = self._make_adapter(max_size=3)
|
||||
|
||||
async def _run():
|
||||
held = adapter._get_chat_lock("held")
|
||||
await held.acquire()
|
||||
try:
|
||||
adapter._get_chat_lock("x")
|
||||
adapter._get_chat_lock("y")
|
||||
# At capacity; "held" is LRU but locked, so "x" should go instead.
|
||||
adapter._get_chat_lock("z")
|
||||
self.assertIn("held", adapter._chat_locks)
|
||||
self.assertNotIn("x", adapter._chat_locks)
|
||||
self.assertEqual(len(adapter._chat_locks), 3)
|
||||
finally:
|
||||
held.release()
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
Reference in New Issue
Block a user