test(weixin): regression suite for _api_post/_api_get timeout migration

This commit is contained in:
firefly
2026-05-29 22:10:37 -04:00
committed by Teknium
parent 566669013f
commit 765790a216

View File

@ -968,3 +968,148 @@ class TestWeixinTextDebounce:
asyncio.run(_drive())
assert dispatched == ["one\ntwo\nthree"]
class _StubResponse:
def __init__(self, *, status=200, body="{}", delay=0.0):
self.status = status
self.ok = 200 <= status < 300
self._body = body
self._delay = delay
async def __aenter__(self):
return self
async def __aexit__(self, *_exc):
return False
async def text(self):
if self._delay:
await asyncio.sleep(self._delay)
return self._body
class _StubSession:
"""Records request kwargs and returns a configurable async-CM response.
Unlike aiohttp.ClientSession it installs no TimerContext, so it cannot
reproduce aiohttp's cross-loop crash directly; these tests instead pin the
observable contract of the asyncio.wait_for migration.
"""
def __init__(self, response):
self._response = response
self.post_calls = []
self.get_calls = []
def post(self, url, **kwargs):
self.post_calls.append((url, kwargs))
return self._response
def get(self, url, **kwargs):
self.get_calls.append((url, kwargs))
return self._response
class TestWeixinApiTimeout:
def test_api_post_does_not_pass_aiohttp_timeout_kwarg(self):
session = _StubSession(_StubResponse(body='{"ret": 0}'))
result = asyncio.run(
weixin._api_post(
session,
base_url="https://weixin.example.com",
endpoint="ep",
payload={"k": "v"},
token="tok",
timeout_ms=5000,
)
)
assert result == {"ret": 0}
# The fix enforces the timeout via asyncio.wait_for, so ClientTimeout is
# gone and `timeout` is no longer forwarded to session.post().
[(_url, kwargs)] = session.post_calls
assert "timeout" not in kwargs
def test_api_get_does_not_pass_aiohttp_timeout_kwarg(self):
session = _StubSession(_StubResponse(body='{"ret": 0}'))
result = asyncio.run(
weixin._api_get(
session,
base_url="https://weixin.example.com",
endpoint="ep",
timeout_ms=5000,
)
)
assert result == {"ret": 0}
[(_url, kwargs)] = session.get_calls
assert "timeout" not in kwargs
def test_api_post_raises_timeout_when_response_is_slow(self):
# 1 ms budget against a 1 s response: wait_for must cancel and raise.
session = _StubSession(_StubResponse(delay=1.0))
with pytest.raises(asyncio.TimeoutError):
asyncio.run(
weixin._api_post(
session,
base_url="https://weixin.example.com",
endpoint="ep",
payload={"k": "v"},
token="tok",
timeout_ms=1,
)
)
def test_api_get_raises_timeout_when_response_is_slow(self):
session = _StubSession(_StubResponse(delay=1.0))
with pytest.raises(asyncio.TimeoutError):
asyncio.run(
weixin._api_get(
session,
base_url="https://weixin.example.com",
endpoint="ep",
timeout_ms=1,
)
)
def test_api_post_raises_runtime_error_on_non_ok_status(self):
# The non-2xx branch now lives inside the wait_for-wrapped inner coro;
# confirm it still raises with the HTTP status and truncated body.
session = _StubSession(_StubResponse(status=500, body="boom"))
with pytest.raises(RuntimeError, match="iLink POST ep HTTP 500: boom"):
asyncio.run(
weixin._api_post(
session,
base_url="https://weixin.example.com",
endpoint="ep",
payload={"k": "v"},
token="tok",
timeout_ms=5000,
)
)
def test_api_get_raises_runtime_error_on_non_ok_status(self):
session = _StubSession(_StubResponse(status=500, body="boom"))
with pytest.raises(RuntimeError, match="iLink GET ep HTTP 500: boom"):
asyncio.run(
weixin._api_get(
session,
base_url="https://weixin.example.com",
endpoint="ep",
timeout_ms=5000,
)
)
def test_get_updates_returns_empty_sentinel_on_timeout(self):
# wait_for raises asyncio.TimeoutError, which _get_updates swallows into
# an empty long-poll batch rather than propagating.
session = _StubSession(_StubResponse(delay=1.0))
result = asyncio.run(
weixin._get_updates(
session,
base_url="https://weixin.example.com",
token="tok",
sync_buf="buf-123",
timeout_ms=1,
)
)
assert result == {"ret": 0, "msgs": [], "get_updates_buf": "buf-123"}