From 765790a216d75440cce69c17abd2356327712863 Mon Sep 17 00:00:00 2001 From: firefly Date: Fri, 29 May 2026 22:10:37 -0400 Subject: [PATCH] test(weixin): regression suite for _api_post/_api_get timeout migration --- tests/gateway/test_weixin.py | 145 +++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/tests/gateway/test_weixin.py b/tests/gateway/test_weixin.py index 0482f6624..bbfba37d5 100644 --- a/tests/gateway/test_weixin.py +++ b/tests/gateway/test_weixin.py @@ -968,3 +968,148 @@ class TestWeixinTextDebounce: asyncio.run(_drive()) assert dispatched == ["one\ntwo\nthree"] + + +class _StubResponse: + def __init__(self, *, status=200, body="{}", delay=0.0): + self.status = status + self.ok = 200 <= status < 300 + self._body = body + self._delay = delay + + async def __aenter__(self): + return self + + async def __aexit__(self, *_exc): + return False + + async def text(self): + if self._delay: + await asyncio.sleep(self._delay) + return self._body + + +class _StubSession: + """Records request kwargs and returns a configurable async-CM response. + + Unlike aiohttp.ClientSession it installs no TimerContext, so it cannot + reproduce aiohttp's cross-loop crash directly; these tests instead pin the + observable contract of the asyncio.wait_for migration. + """ + + def __init__(self, response): + self._response = response + self.post_calls = [] + self.get_calls = [] + + def post(self, url, **kwargs): + self.post_calls.append((url, kwargs)) + return self._response + + def get(self, url, **kwargs): + self.get_calls.append((url, kwargs)) + return self._response + + +class TestWeixinApiTimeout: + def test_api_post_does_not_pass_aiohttp_timeout_kwarg(self): + session = _StubSession(_StubResponse(body='{"ret": 0}')) + result = asyncio.run( + weixin._api_post( + session, + base_url="https://weixin.example.com", + endpoint="ep", + payload={"k": "v"}, + token="tok", + timeout_ms=5000, + ) + ) + assert result == {"ret": 0} + # The fix enforces the timeout via asyncio.wait_for, so ClientTimeout is + # gone and `timeout` is no longer forwarded to session.post(). + [(_url, kwargs)] = session.post_calls + assert "timeout" not in kwargs + + def test_api_get_does_not_pass_aiohttp_timeout_kwarg(self): + session = _StubSession(_StubResponse(body='{"ret": 0}')) + result = asyncio.run( + weixin._api_get( + session, + base_url="https://weixin.example.com", + endpoint="ep", + timeout_ms=5000, + ) + ) + assert result == {"ret": 0} + [(_url, kwargs)] = session.get_calls + assert "timeout" not in kwargs + + def test_api_post_raises_timeout_when_response_is_slow(self): + # 1 ms budget against a 1 s response: wait_for must cancel and raise. + session = _StubSession(_StubResponse(delay=1.0)) + with pytest.raises(asyncio.TimeoutError): + asyncio.run( + weixin._api_post( + session, + base_url="https://weixin.example.com", + endpoint="ep", + payload={"k": "v"}, + token="tok", + timeout_ms=1, + ) + ) + + def test_api_get_raises_timeout_when_response_is_slow(self): + session = _StubSession(_StubResponse(delay=1.0)) + with pytest.raises(asyncio.TimeoutError): + asyncio.run( + weixin._api_get( + session, + base_url="https://weixin.example.com", + endpoint="ep", + timeout_ms=1, + ) + ) + + def test_api_post_raises_runtime_error_on_non_ok_status(self): + # The non-2xx branch now lives inside the wait_for-wrapped inner coro; + # confirm it still raises with the HTTP status and truncated body. + session = _StubSession(_StubResponse(status=500, body="boom")) + with pytest.raises(RuntimeError, match="iLink POST ep HTTP 500: boom"): + asyncio.run( + weixin._api_post( + session, + base_url="https://weixin.example.com", + endpoint="ep", + payload={"k": "v"}, + token="tok", + timeout_ms=5000, + ) + ) + + def test_api_get_raises_runtime_error_on_non_ok_status(self): + session = _StubSession(_StubResponse(status=500, body="boom")) + with pytest.raises(RuntimeError, match="iLink GET ep HTTP 500: boom"): + asyncio.run( + weixin._api_get( + session, + base_url="https://weixin.example.com", + endpoint="ep", + timeout_ms=5000, + ) + ) + + def test_get_updates_returns_empty_sentinel_on_timeout(self): + # wait_for raises asyncio.TimeoutError, which _get_updates swallows into + # an empty long-poll batch rather than propagating. + session = _StubSession(_StubResponse(delay=1.0)) + result = asyncio.run( + weixin._get_updates( + session, + base_url="https://weixin.example.com", + token="tok", + sync_buf="buf-123", + timeout_ms=1, + ) + ) + assert result == {"ret": 0, "msgs": [], "get_updates_buf": "buf-123"}