310 lines
12 KiB
Python
310 lines
12 KiB
Python
"""Tests for config-driven platform access policies at the gateway layer.
|
|
|
|
Background (#34515): WeCom, Weixin, Yuanbao, QQBot, and WhatsApp expose a
documented config-driven access surface (``dm_policy`` / ``group_policy`` /
``allow_from`` / ``group_allow_from`` in ``PlatformConfig.extra``) and enforce
it at intake — a message is dropped inside the adapter and never reaches the
gateway unless it already passed that policy.
|
|
|
|
The gateway's env-based allowlist check (``_is_user_authorized``) runs *after*
the adapter. Before the fix it fell through to an env-only default-deny when no
``PLATFORM_ALLOWED_USERS`` env var was set, silently rejecting ``dm_policy:
open`` and config-only allowlists even though the adapter had already
authorized the sender.
|
|
|
|
The fix is a single drift-proof contract: adapters that own their access policy
declare ``enforces_own_access_policy`` (a ``BasePlatformAdapter`` property,
default ``False``). The gateway trusts that flag and skips the env-only
default-deny for those platforms, rather than re-implementing each adapter's
policy logic a second time.
|
|
"""
|
|
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
|
from gateway.session import SessionSource
|
|
|
|
|
|
# Platforms whose adapters enforce their own access policy at intake. The
# gateway-trust tests in this module are parametrized over this list.
_OWN_POLICY_PLATFORMS = [
    Platform.WECOM,
    Platform.WEIXIN,
    Platform.YUANBAO,
    Platform.QQBOT,
    Platform.WHATSAPP,
]
|
|
|
|
|
|
def _clear_auth_env(monkeypatch) -> None:
|
|
for key in (
|
|
"WECOM_ALLOWED_USERS",
|
|
"WEIXIN_ALLOWED_USERS",
|
|
"YUANBAO_ALLOWED_USERS",
|
|
"QQ_ALLOWED_USERS",
|
|
"QQ_GROUP_ALLOWED_USERS",
|
|
"WHATSAPP_ALLOWED_USERS",
|
|
"TELEGRAM_ALLOWED_USERS",
|
|
"GATEWAY_ALLOWED_USERS",
|
|
"GATEWAY_ALLOW_ALL_USERS",
|
|
"WECOM_ALLOW_ALL_USERS",
|
|
"WEIXIN_ALLOW_ALL_USERS",
|
|
"YUANBAO_ALLOW_ALL_USERS",
|
|
"QQ_ALLOW_ALL_USERS",
|
|
"WHATSAPP_ALLOW_ALL_USERS",
|
|
):
|
|
monkeypatch.delenv(key, raising=False)
|
|
|
|
|
|
def _make_runner(platform: Platform, config: GatewayConfig, *, enforces: bool):
    """Assemble a minimal ``GatewayRunner`` wired to a single stub adapter.

    The runner is allocated with ``object.__new__`` so ``__init__`` never
    runs; only the attributes set below exist on it.

    Args:
        platform: Platform key the stub adapter is registered under.
        config: Gateway configuration assigned to ``runner.config``.
        enforces: Value the stub adapter exposes as
            ``enforces_own_access_policy`` — i.e. whether it claims to own
            its access gate.

    Returns:
        A ``(runner, adapter)`` tuple.
    """
    from gateway.run import GatewayRunner

    # Pairing store stub: nobody is approved and nobody is rate limited.
    store = MagicMock()
    store.is_approved.return_value = False
    store._is_rate_limited.return_value = False

    stub_adapter = SimpleNamespace(send=AsyncMock(), enforces_own_access_policy=enforces)

    gateway_runner = object.__new__(GatewayRunner)
    gateway_runner.config = config
    gateway_runner.adapters = {platform: stub_adapter}
    gateway_runner.pairing_store = store
    return gateway_runner, stub_adapter
|
|
|
|
|
|
def _source(platform: Platform, *, chat_type: str = "dm") -> SessionSource:
    """Return a ``SessionSource`` with fixed placeholder identity fields.

    Args:
        platform: Platform the message is attributed to.
        chat_type: Chat kind passed through to the source; defaults to ``"dm"``.
    """
    fields = dict(
        platform=platform,
        user_id="some-user",
        chat_id="some-chat",
        user_name="tester",
        chat_type=chat_type,
    )
    return SessionSource(**fields)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Layer 1: the base-class contract and per-adapter overrides
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_base_adapter_defaults_to_not_owning_access_policy():
    """The base adapter's property reports ``False`` unless overridden."""
    from gateway.platforms.base import BasePlatformAdapter

    # Invoke the property's getter through the class-level descriptor with a
    # placeholder instance, so no real adapter has to be constructed.
    getter = BasePlatformAdapter.enforces_own_access_policy.fget
    assert getter(object()) is False
|
|
|
|
|
|
@pytest.mark.parametrize(
    "module_path, class_name",
    [
        ("gateway.platforms.wecom", "WeComAdapter"),
        ("gateway.platforms.weixin", "WeixinAdapter"),
        ("gateway.platforms.yuanbao", "YuanbaoAdapter"),
        ("gateway.platforms.qqbot.adapter", "QQAdapter"),
        ("gateway.platforms.whatsapp", "WhatsAppAdapter"),
    ],
)
def test_own_policy_adapters_declare_the_flag(module_path, class_name):
    """Each config-policy adapter class overrides the flag to ``True``."""
    from importlib import import_module

    adapter_cls = getattr(import_module(module_path), class_name)

    # The getter is called on an uninitialised instance: the flag is expected
    # to be a static capability that does not depend on instance state.
    getter = adapter_cls.enforces_own_access_policy.fget
    assert getter(object.__new__(adapter_cls)) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Layer 2: gateway trusts the adapter-enforced flag
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS)
def test_own_policy_platform_authorized_without_env_allowlist(monkeypatch, platform):
    """A DM from an own-policy adapter is authorized with no env allowlist.

    With every allowlist env var cleared, the gateway must not default-deny:
    the adapter is taken to have authorized the sender at intake already
    (here, ``dm_policy: open``).
    """
    _clear_auth_env(monkeypatch)
    open_dm = PlatformConfig(enabled=True, extra={"dm_policy": "open"})
    runner, _ = _make_runner(
        platform, GatewayConfig(platforms={platform: open_dm}), enforces=True
    )

    authorized = runner._is_user_authorized(_source(platform))
    assert authorized is True
|
|
|
|
|
|
@pytest.mark.parametrize("platform", _OWN_POLICY_PLATFORMS)
def test_own_policy_platform_authorized_for_group_chat(monkeypatch, platform):
    """Group messages from an own-policy adapter get the same trust as DMs."""
    _clear_auth_env(monkeypatch)
    open_group = PlatformConfig(enabled=True, extra={"group_policy": "open"})
    runner, _ = _make_runner(
        platform, GatewayConfig(platforms={platform: open_group}), enforces=True
    )

    group_source = _source(platform, chat_type="group")
    assert runner._is_user_authorized(group_source) is True
|
|
|
|
|
|
def test_non_owning_platform_still_default_denies(monkeypatch):
    """An adapter that does not own its policy is still denied by default."""
    _clear_auth_env(monkeypatch)
    telegram = Platform.TELEGRAM
    telegram_cfg = PlatformConfig(enabled=True, token="t")
    runner, _ = _make_runner(
        telegram, GatewayConfig(platforms={telegram: telegram_cfg}), enforces=False
    )

    authorized = runner._is_user_authorized(_source(telegram))
    assert authorized is False
|
|
|
|
|
|
def test_env_allowlist_still_takes_precedence_for_own_policy_platform(monkeypatch):
    """An explicit env allowlist governs; adapter trust is only a fallback.

    With ``WECOM_ALLOWED_USERS`` set, the listed user is authorized and an
    unlisted user is denied, even though the adapter owns its policy and the
    config says ``dm_policy: open``.
    """
    _clear_auth_env(monkeypatch)
    monkeypatch.setenv("WECOM_ALLOWED_USERS", "allowed-user")

    wecom_cfg = PlatformConfig(enabled=True, extra={"dm_policy": "open"})
    runner, _ = _make_runner(
        Platform.WECOM, GatewayConfig(platforms={Platform.WECOM: wecom_cfg}), enforces=True
    )

    def wecom_dm(user_id: str) -> SessionSource:
        # Same chat and display name for both senders; only the user id differs.
        return SessionSource(
            platform=Platform.WECOM,
            user_id=user_id,
            chat_id="c",
            user_name="t",
            chat_type="dm",
        )

    listed = wecom_dm("allowed-user")
    stranger = wecom_dm("stranger")
    assert runner._is_user_authorized(listed) is True
    assert runner._is_user_authorized(stranger) is False
|
|
|
|
|
|
def test_unknown_adapter_does_not_crash_trust_check(monkeypatch):
    """With no adapter registered, the trust check answers ``False`` safely."""
    _clear_auth_env(monkeypatch)
    wecom = Platform.WECOM
    config = GatewayConfig(platforms={wecom: PlatformConfig(enabled=True)})
    runner, _ = _make_runner(wecom, config, enforces=True)

    # Drop the stub adapter so the platform has nothing registered.
    runner.adapters = {}

    owns_policy = runner._adapter_enforces_own_access_policy(wecom)
    assert owns_policy is False
    assert runner._is_user_authorized(_source(wecom)) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Layer 2b: `dm_policy: pairing` is NOT blanket-trusted
# ---------------------------------------------------------------------------
#
# Regression: WeCom/Weixin document ``dm_policy: pairing`` and declare
# ``enforces_own_access_policy=True``, but their intake helper only special-cases
# ``disabled`` / ``allowlist`` — ``pairing`` falls through and forwards the DM so
# the gateway can run its pairing handshake. With no env allowlist, the
# adapter-trust shortcut above then authorized *every* unpaired sender, silently
# degrading pairing mode to open access. The shortcut must skip pairing-mode DMs
# so an unpaired sender falls through to default-deny (and gets a pairing code).
|
|
|
|
|
|
@pytest.mark.parametrize("platform", [Platform.WECOM, Platform.WEIXIN])
def test_pairing_dm_policy_not_blanket_authorized(monkeypatch, platform):
    """Under ``dm_policy: pairing`` an unpaired sender is not authorized."""
    _clear_auth_env(monkeypatch)
    pairing_cfg = PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})
    # _make_runner stubs pairing_store.is_approved to return False, so the
    # sender below counts as unpaired.
    runner, _ = _make_runner(
        platform, GatewayConfig(platforms={platform: pairing_cfg}), enforces=True
    )

    authorized = runner._is_user_authorized(_source(platform))
    assert authorized is False
|
|
|
|
|
|
def test_pairing_dm_policy_authorizes_paired_user(monkeypatch):
    """A sender the pairing store reports as approved is authorized."""
    _clear_auth_env(monkeypatch)
    wecom = Platform.WECOM
    pairing_cfg = PlatformConfig(enabled=True, extra={"dm_policy": "pairing"})
    runner, _ = _make_runner(
        wecom, GatewayConfig(platforms={wecom: pairing_cfg}), enforces=True
    )

    # Flip the stubbed pairing store from its default "not approved" answer.
    runner.pairing_store.is_approved.return_value = True

    assert runner._is_user_authorized(_source(wecom)) is True
|
|
|
|
|
|
def test_pairing_carveout_reads_adapter_when_env_set(monkeypatch):
    """A pairing policy known only to the adapter is still honored.

    Models the env-only ``WECOM_DM_POLICY=pairing`` case: ``config.extra`` is
    empty and the policy is visible solely as the adapter's ``_dm_policy``.
    The pairing carve-out must consult the adapter rather than config alone,
    so the unpaired sender is denied.
    """
    _clear_auth_env(monkeypatch)
    wecom = Platform.WECOM
    empty_cfg = PlatformConfig(enabled=True, extra={})
    runner, adapter = _make_runner(
        wecom, GatewayConfig(platforms={wecom: empty_cfg}), enforces=True
    )

    # Stand-in for the value the adapter would resolve from the env var.
    adapter._dm_policy = "pairing"

    authorized = runner._is_user_authorized(_source(wecom))
    assert authorized is False
|
|
|
|
|
|
def test_pairing_dm_policy_group_chat_still_trusted(monkeypatch):
    """Pairing applies to DMs only; group traffic stays adapter-trusted."""
    _clear_auth_env(monkeypatch)
    extra = {"dm_policy": "pairing", "group_policy": "open"}
    wecom_cfg = PlatformConfig(enabled=True, extra=extra)
    runner, _ = _make_runner(
        Platform.WECOM, GatewayConfig(platforms={Platform.WECOM: wecom_cfg}), enforces=True
    )

    group_source = _source(Platform.WECOM, chat_type="group")
    assert runner._is_user_authorized(group_source) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Layer 3: unauthorized-DM behavior reads config dm_policy
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.parametrize(
    "dm_policy, expected",
    [
        ("allowlist", "ignore"),
        ("disabled", "ignore"),
        ("pairing", "pair"),
    ],
)
def test_unauthorized_dm_behavior_follows_config_dm_policy(monkeypatch, dm_policy, expected):
    """Restrictive policies map to ``"ignore"``; ``pairing`` maps to ``"pair"``."""
    _clear_auth_env(monkeypatch)
    wecom = Platform.WECOM
    policy_cfg = PlatformConfig(enabled=True, extra={"dm_policy": dm_policy})
    runner, _ = _make_runner(
        wecom, GatewayConfig(platforms={wecom: policy_cfg}), enforces=True
    )

    behavior = runner._get_unauthorized_dm_behavior(wecom)
    assert behavior == expected
|
|
|
|
|
|
def test_unauthorized_dm_behavior_open_policy_keeps_default(monkeypatch):
    """``dm_policy: open`` is not restrictive, so the default behavior applies."""
    _clear_auth_env(monkeypatch)
    wecom = Platform.WECOM
    open_cfg = PlatformConfig(enabled=True, extra={"dm_policy": "open"})
    runner, _ = _make_runner(
        wecom, GatewayConfig(platforms={wecom: open_cfg}), enforces=True
    )

    # No allowlist and no restrictive policy: the open-gateway default of
    # offering pairing is expected.
    behavior = runner._get_unauthorized_dm_behavior(wecom)
    assert behavior == "pair"
|