Adds a bundled dashboard-auth provider plugin that authenticates the
web dashboard against any conformant self-hosted OpenID Connect server
(Authentik, Keycloak, Zitadel, Authelia, Auth0, Okta, Google, …) using
standard OIDC — no per-IDP code.
It's a pure drop-in plugin implementing the DashboardAuthProvider
protocol; it touches no core auth/runtime/login paths. Mechanics:
- OIDC discovery from {issuer}/.well-known/openid-configuration
(cached; issuer pinned; endpoints required HTTPS, loopback http
allowed for local-dev IDPs)
- authorization-code + PKCE (S256), public client
- verifies the OIDC ID token (RS256/ES256) against the discovered
jwks_uri with iss/aud pinned to the configured issuer/client_id, and
maps standard claims (sub/email/name/preferred_username, groups→org)
onto a Session
- standard refresh_token grant for silent re-auth; RFC 7009 revocation
on logout when advertised
Verifies the ID token (not the access token) because OIDC guarantees the
ID token is a signed JWT carrying identity, while access-token format is
opaque to the client per spec — the only universally-correct choice
across self-hosted IDPs.
Config via dashboard.oauth.self_hosted.{issuer,client_id,scopes} in
config.yaml or HERMES_DASHBOARD_OIDC_{ISSUER,CLIENT_ID,SCOPES} env vars
(env-wins-config, empty-is-unset — same convention as the nous plugin).
Confidential clients (client_secret) left as a documented TODO seam.
Docs: adds a Self-hosted OIDC section to the web-dashboard guide,
including a copy-paste Keycloak worked example (realm import + docker
run + dashboard wiring + login walkthrough).
Tests: 65 cases covering construction, discovery (incl. issuer
mismatch + https enforcement), start_login/PKCE, complete_login, ID
token verification, refresh/revoke, and env/config precedence.
883 lines
34 KiB
Python
883 lines
34 KiB
Python
"""Tests for the bundled self-hosted OIDC dashboard-auth plugin.
|
|
|
|
Covers, by analogy with ``test_nous_provider.py``:
|
|
|
|
1. Plugin entry-point registration gating (env + config.yaml precedence).
|
|
2. ``start_login`` shape (PKCE/state, authorize URL parameters, OIDC discovery).
|
|
3. ``complete_login`` httpx-mocked happy path + error mapping (ID-token grant).
|
|
4. ``verify_session`` ID-token verification — RSA keypair, audience/issuer
|
|
pinning, standard OIDC claim mapping (sub/email/name/groups).
|
|
5. ``refresh_session`` rotation + error mapping, ``revoke_session`` (RFC 7009).
|
|
6. OIDC discovery: endpoint extraction, issuer pinning, https enforcement.
|
|
|
|
All HTTP is mocked: nothing here talks to a real IDP.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import time
|
|
import urllib.parse
|
|
from typing import Any, Dict
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import httpx
|
|
import jwt
|
|
import pytest
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
|
|
import plugins.dashboard_auth.self_hosted as oidc_plugin
|
|
from hermes_cli.dashboard_auth import (
|
|
InvalidCodeError,
|
|
LoginStart,
|
|
ProviderError,
|
|
RefreshExpiredError,
|
|
Session,
|
|
assert_protocol_compliance,
|
|
)
|
|
|
|
_ISSUER = "https://auth.example.com/application/o/hermes"
|
|
_CLIENT_ID = "hermes-dashboard"
|
|
|
|
_DISCOVERY_DOC = {
|
|
"issuer": _ISSUER,
|
|
"authorization_endpoint": f"{_ISSUER}/authorize",
|
|
"token_endpoint": f"{_ISSUER}/token",
|
|
"jwks_uri": f"{_ISSUER}/jwks",
|
|
"revocation_endpoint": f"{_ISSUER}/revoke",
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RSA keypair fixture (module-scope — keygen is slow)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def rsa_keypair() -> Dict[str, Any]:
|
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
private_pem = key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
).decode()
|
|
public_numbers = key.public_key().public_numbers()
|
|
|
|
def _b64url_uint(n: int) -> str:
|
|
length = (n.bit_length() + 7) // 8
|
|
return (
|
|
base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
|
|
)
|
|
|
|
jwk = {
|
|
"kty": "RSA",
|
|
"use": "sig",
|
|
"alg": "RS256",
|
|
"kid": "test-key-1",
|
|
"n": _b64url_uint(public_numbers.n),
|
|
"e": _b64url_uint(public_numbers.e),
|
|
}
|
|
return {"private_pem": private_pem, "jwk": jwk, "kid": jwk["kid"]}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token-mint helper — standard OIDC ID-token claims
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _mint_id_token(
|
|
rsa_keypair: Dict[str, Any],
|
|
*,
|
|
iss: str = _ISSUER,
|
|
aud: str = _CLIENT_ID,
|
|
sub: str = "usr_abc",
|
|
email: str | None = "alice@example.com",
|
|
name: str | None = "Alice Example",
|
|
groups: Any = None,
|
|
org_id: str | None = None,
|
|
ttl_seconds: int = 900,
|
|
extra_claims: Dict[str, Any] | None = None,
|
|
) -> str:
|
|
now = int(time.time())
|
|
claims: Dict[str, Any] = {
|
|
"iss": iss,
|
|
"aud": aud,
|
|
"sub": sub,
|
|
"iat": now,
|
|
"exp": now + ttl_seconds,
|
|
}
|
|
if email is not None:
|
|
claims["email"] = email
|
|
if name is not None:
|
|
claims["name"] = name
|
|
if groups is not None:
|
|
claims["groups"] = groups
|
|
if org_id is not None:
|
|
claims["org_id"] = org_id
|
|
if extra_claims:
|
|
claims.update(extra_claims)
|
|
return jwt.encode(
|
|
claims,
|
|
rsa_keypair["private_pem"],
|
|
algorithm="RS256",
|
|
headers={"kid": rsa_keypair["kid"]},
|
|
)
|
|
|
|
|
|
def _make_provider(rsa_keypair, *, scopes: str | None = None):
|
|
"""Construct a provider with discovery + JWKS stubbed (no network)."""
|
|
kwargs: Dict[str, Any] = {"issuer": _ISSUER, "client_id": _CLIENT_ID}
|
|
if scopes is not None:
|
|
kwargs["scopes"] = scopes
|
|
p = oidc_plugin.SelfHostedOIDCProvider(**kwargs)
|
|
# Pre-seed discovery so nothing hits the network.
|
|
p._discovery = dict(_DISCOVERY_DOC)
|
|
p._discovery_fetched_at = time.time()
|
|
# Patch the JWKS client to return our fixture key.
|
|
fake_key = MagicMock()
|
|
fake_key.key = serialization.load_pem_private_key(
|
|
rsa_keypair["private_pem"].encode(), password=None
|
|
).public_key()
|
|
fake_client = MagicMock()
|
|
fake_client.get_signing_key_from_jwt.return_value = fake_key
|
|
p._jwks_client = fake_client
|
|
return p
|
|
|
|
|
|
def _mock_post(status_code: int, body: Any, *, ctype: str = "application/json"):
|
|
resp = MagicMock(spec=httpx.Response)
|
|
resp.status_code = status_code
|
|
if isinstance(body, dict):
|
|
resp.text = json.dumps(body)
|
|
resp.json = MagicMock(return_value=body)
|
|
else:
|
|
resp.text = body
|
|
resp.json = MagicMock(side_effect=ValueError("not json"))
|
|
resp.headers = {"content-type": ctype}
|
|
return resp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestConstruction:
|
|
def test_protocol_compliance(self):
|
|
assert_protocol_compliance(oidc_plugin.SelfHostedOIDCProvider)
|
|
|
|
def test_name_and_display(self):
|
|
p = oidc_plugin.SelfHostedOIDCProvider(issuer=_ISSUER, client_id=_CLIENT_ID)
|
|
assert p.name == "self-hosted"
|
|
assert p.display_name == "Self-Hosted OIDC"
|
|
|
|
def test_strips_trailing_slash_from_issuer(self):
|
|
p = oidc_plugin.SelfHostedOIDCProvider(
|
|
issuer=_ISSUER + "/", client_id=_CLIENT_ID
|
|
)
|
|
assert p._issuer == _ISSUER
|
|
|
|
def test_requires_issuer(self):
|
|
with pytest.raises(ValueError, match="issuer"):
|
|
oidc_plugin.SelfHostedOIDCProvider(issuer="", client_id=_CLIENT_ID)
|
|
|
|
def test_requires_client_id(self):
|
|
with pytest.raises(ValueError, match="client_id"):
|
|
oidc_plugin.SelfHostedOIDCProvider(issuer=_ISSUER, client_id="")
|
|
|
|
def test_rejects_non_https_issuer(self):
|
|
with pytest.raises(ProviderError, match="https"):
|
|
oidc_plugin.SelfHostedOIDCProvider(
|
|
issuer="http://auth.example.com", client_id=_CLIENT_ID
|
|
)
|
|
|
|
def test_allows_http_localhost_issuer(self):
|
|
# Local dev against a loopback IDP is allowed.
|
|
p = oidc_plugin.SelfHostedOIDCProvider(
|
|
issuer="http://localhost:9000", client_id=_CLIENT_ID
|
|
)
|
|
assert p._issuer == "http://localhost:9000"
|
|
|
|
def test_default_scopes(self):
|
|
p = oidc_plugin.SelfHostedOIDCProvider(issuer=_ISSUER, client_id=_CLIENT_ID)
|
|
assert p._scopes == "openid profile email"
|
|
|
|
def test_empty_scopes_falls_back_to_default(self):
|
|
p = oidc_plugin.SelfHostedOIDCProvider(
|
|
issuer=_ISSUER, client_id=_CLIENT_ID, scopes=" "
|
|
)
|
|
assert p._scopes == "openid profile email"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OIDC discovery
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDiscovery:
|
|
def _provider(self):
|
|
return oidc_plugin.SelfHostedOIDCProvider(
|
|
issuer=_ISSUER, client_id=_CLIENT_ID
|
|
)
|
|
|
|
def _mock_get(self, status_code, body, *, ctype="application/json"):
|
|
resp = MagicMock(spec=httpx.Response)
|
|
resp.status_code = status_code
|
|
resp.json = MagicMock(return_value=body)
|
|
resp.text = json.dumps(body) if isinstance(body, dict) else str(body)
|
|
resp.headers = {"content-type": ctype}
|
|
return resp
|
|
|
|
def test_discovery_url(self):
|
|
p = self._provider()
|
|
assert p._discovery_url() == (
|
|
f"{_ISSUER}/.well-known/openid-configuration"
|
|
)
|
|
|
|
def test_fetches_and_caches(self):
|
|
p = self._provider()
|
|
mock_resp = self._mock_get(200, dict(_DISCOVERY_DOC))
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
|
|
) as mock_get:
|
|
disco1 = p._get_discovery()
|
|
disco2 = p._get_discovery()
|
|
assert disco1["token_endpoint"] == f"{_ISSUER}/token"
|
|
assert disco1["authorization_endpoint"] == f"{_ISSUER}/authorize"
|
|
assert disco1["jwks_uri"] == f"{_ISSUER}/jwks"
|
|
assert disco1["revocation_endpoint"] == f"{_ISSUER}/revoke"
|
|
# Cached — only one network call.
|
|
assert mock_get.call_count == 1
|
|
assert disco2 is disco1
|
|
|
|
def test_discovery_404_raises(self):
|
|
p = self._provider()
|
|
mock_resp = self._mock_get(404, {})
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="404"):
|
|
p._get_discovery()
|
|
|
|
def test_discovery_unreachable_raises(self):
|
|
p = self._provider()
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get",
|
|
side_effect=httpx.ConnectError("no route"),
|
|
):
|
|
with pytest.raises(ProviderError, match="unreachable"):
|
|
p._get_discovery()
|
|
|
|
def test_discovery_missing_endpoint_raises(self):
|
|
p = self._provider()
|
|
doc = dict(_DISCOVERY_DOC)
|
|
del doc["token_endpoint"]
|
|
mock_resp = self._mock_get(200, doc)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="token_endpoint"):
|
|
p._get_discovery()
|
|
|
|
def test_discovery_issuer_mismatch_raises(self):
|
|
p = self._provider()
|
|
doc = dict(_DISCOVERY_DOC)
|
|
doc["issuer"] = "https://evil.example"
|
|
mock_resp = self._mock_get(200, doc)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="issuer mismatch"):
|
|
p._get_discovery()
|
|
|
|
def test_discovery_issuer_trailing_slash_tolerated(self):
|
|
p = self._provider()
|
|
doc = dict(_DISCOVERY_DOC)
|
|
doc["issuer"] = _ISSUER + "/" # only a trailing-slash difference
|
|
mock_resp = self._mock_get(200, doc)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
|
|
):
|
|
disco = p._get_discovery()
|
|
assert disco["token_endpoint"] == f"{_ISSUER}/token"
|
|
|
|
def test_discovery_rejects_non_https_endpoint(self):
|
|
p = self._provider()
|
|
doc = dict(_DISCOVERY_DOC)
|
|
doc["token_endpoint"] = "http://auth.example.com/token" # not loopback
|
|
mock_resp = self._mock_get(200, doc)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="https"):
|
|
p._get_discovery()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# start_login
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStartLogin:
|
|
@pytest.fixture
|
|
def provider(self, rsa_keypair):
|
|
return _make_provider(rsa_keypair)
|
|
|
|
def test_returns_login_start(self, provider):
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
assert isinstance(result, LoginStart)
|
|
|
|
def test_redirect_url_targets_authorize_endpoint(self, provider):
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
assert result.redirect_url.startswith(f"{_ISSUER}/authorize?")
|
|
|
|
def test_authorize_url_has_required_params(self, provider):
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
parsed = urllib.parse.urlparse(result.redirect_url)
|
|
params = dict(urllib.parse.parse_qsl(parsed.query))
|
|
assert params["response_type"] == "code"
|
|
assert params["client_id"] == _CLIENT_ID
|
|
assert params["redirect_uri"] == "https://hermes.example/auth/callback"
|
|
assert params["scope"] == "openid profile email"
|
|
assert params["code_challenge_method"] == "S256"
|
|
assert "state" in params
|
|
assert "code_challenge" in params
|
|
|
|
def test_custom_scopes_used(self, rsa_keypair):
|
|
provider = _make_provider(rsa_keypair, scopes="openid email groups")
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
parsed = urllib.parse.urlparse(result.redirect_url)
|
|
params = dict(urllib.parse.parse_qsl(parsed.query))
|
|
assert params["scope"] == "openid email groups"
|
|
|
|
def test_code_verifier_length(self, provider):
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
pkce = result.cookie_payload["hermes_session_pkce"]
|
|
parts = dict(seg.split("=", 1) for seg in pkce.split(";") if "=" in seg)
|
|
assert 43 <= len(parts["verifier"]) <= 128 # RFC 7636 §4.1
|
|
|
|
def test_state_in_cookie_matches_url(self, provider):
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
parsed = urllib.parse.urlparse(result.redirect_url)
|
|
params = dict(urllib.parse.parse_qsl(parsed.query))
|
|
pkce = result.cookie_payload["hermes_session_pkce"]
|
|
parts = dict(seg.split("=", 1) for seg in pkce.split(";") if "=" in seg)
|
|
assert parts["state"] == params["state"]
|
|
|
|
def test_code_challenge_is_s256_of_verifier(self, provider):
|
|
result = provider.start_login(
|
|
redirect_uri="https://hermes.example/auth/callback"
|
|
)
|
|
parsed = urllib.parse.urlparse(result.redirect_url)
|
|
params = dict(urllib.parse.parse_qsl(parsed.query))
|
|
pkce = result.cookie_payload["hermes_session_pkce"]
|
|
parts = dict(seg.split("=", 1) for seg in pkce.split(";") if "=" in seg)
|
|
expected = (
|
|
base64.urlsafe_b64encode(
|
|
hashlib.sha256(parts["verifier"].encode("ascii")).digest()
|
|
)
|
|
.rstrip(b"=")
|
|
.decode()
|
|
)
|
|
assert params["code_challenge"] == expected
|
|
|
|
def test_two_calls_differ(self, provider):
|
|
a = provider.start_login(redirect_uri="https://hermes.example/auth/callback")
|
|
b = provider.start_login(redirect_uri="https://hermes.example/auth/callback")
|
|
assert (
|
|
a.cookie_payload["hermes_session_pkce"]
|
|
!= b.cookie_payload["hermes_session_pkce"]
|
|
)
|
|
|
|
def test_rejects_wrong_callback_path(self, provider):
|
|
with pytest.raises(ProviderError, match="/auth/callback"):
|
|
provider.start_login(redirect_uri="https://x.example/oauth/cb")
|
|
|
|
def test_allows_http_localhost_redirect(self, provider):
|
|
provider.start_login(redirect_uri="http://localhost:8080/auth/callback")
|
|
provider.start_login(redirect_uri="http://127.0.0.1:8080/auth/callback")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# complete_login
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCompleteLogin:
|
|
@pytest.fixture
|
|
def provider(self, rsa_keypair):
|
|
return _make_provider(rsa_keypair)
|
|
|
|
def test_happy_path_returns_session(self, provider, rsa_keypair):
|
|
id_token = _mint_id_token(rsa_keypair)
|
|
mock_resp = _mock_post(
|
|
200,
|
|
{
|
|
"access_token": "opaque-at",
|
|
"id_token": id_token,
|
|
"token_type": "Bearer",
|
|
"refresh_token": "rt_initial",
|
|
},
|
|
)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
session = provider.complete_login(
|
|
code="abc",
|
|
state="s",
|
|
code_verifier="vfy",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
assert isinstance(session, Session)
|
|
assert session.user_id == "usr_abc"
|
|
assert session.provider == "self-hosted"
|
|
assert session.email == "alice@example.com"
|
|
assert session.display_name == "Alice Example"
|
|
# The verified ID token is stored in the access_token slot.
|
|
assert session.access_token == id_token
|
|
assert session.refresh_token == "rt_initial"
|
|
|
|
def test_tolerates_missing_refresh_token(self, provider, rsa_keypair):
|
|
id_token = _mint_id_token(rsa_keypair)
|
|
mock_resp = _mock_post(
|
|
200, {"id_token": id_token, "token_type": "Bearer"}
|
|
)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
session = provider.complete_login(
|
|
code="abc",
|
|
state="s",
|
|
code_verifier="vfy",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
assert session.refresh_token == ""
|
|
|
|
def test_missing_id_token_raises(self, provider):
|
|
mock_resp = _mock_post(
|
|
200, {"access_token": "opaque", "token_type": "Bearer"}
|
|
)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="id_token"):
|
|
provider.complete_login(
|
|
code="x",
|
|
state="s",
|
|
code_verifier="v",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
|
|
def test_400_raises_invalid_code(self, provider):
|
|
mock_resp = _mock_post(400, {"error": "invalid_grant"})
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
with pytest.raises(InvalidCodeError, match="invalid_grant"):
|
|
provider.complete_login(
|
|
code="bad",
|
|
state="s",
|
|
code_verifier="v",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
|
|
def test_500_raises_provider_error(self, provider):
|
|
mock_resp = _mock_post(500, "boom", ctype="text/plain")
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="500"):
|
|
provider.complete_login(
|
|
code="x",
|
|
state="s",
|
|
code_verifier="v",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
|
|
def test_network_error_raises_provider_error(self, provider):
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post",
|
|
side_effect=httpx.ConnectError("conn refused"),
|
|
):
|
|
with pytest.raises(ProviderError, match="unreachable"):
|
|
provider.complete_login(
|
|
code="x",
|
|
state="s",
|
|
code_verifier="v",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
|
|
def test_unexpected_token_type_raises(self, provider, rsa_keypair):
|
|
id_token = _mint_id_token(rsa_keypair)
|
|
mock_resp = _mock_post(
|
|
200, {"id_token": id_token, "token_type": "DPoP"}
|
|
)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
with pytest.raises(ProviderError, match="token_type"):
|
|
provider.complete_login(
|
|
code="x",
|
|
state="s",
|
|
code_verifier="v",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
|
|
def test_posts_authorization_code_grant(self, provider, rsa_keypair):
|
|
id_token = _mint_id_token(rsa_keypair)
|
|
mock_resp = _mock_post(200, {"id_token": id_token, "token_type": "Bearer"})
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
) as mock_post:
|
|
provider.complete_login(
|
|
code="the-code",
|
|
state="s",
|
|
code_verifier="the-verifier",
|
|
redirect_uri="https://hermes.example/auth/callback",
|
|
)
|
|
_, kwargs = mock_post.call_args
|
|
assert kwargs["data"]["grant_type"] == "authorization_code"
|
|
assert kwargs["data"]["code"] == "the-code"
|
|
assert kwargs["data"]["code_verifier"] == "the-verifier"
|
|
assert kwargs["data"]["client_id"] == _CLIENT_ID
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# verify_session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVerifySession:
|
|
@pytest.fixture
|
|
def provider(self, rsa_keypair):
|
|
return _make_provider(rsa_keypair)
|
|
|
|
def test_happy_path(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair)
|
|
session = provider.verify_session(access_token=token)
|
|
assert session is not None
|
|
assert session.user_id == "usr_abc"
|
|
assert session.email == "alice@example.com"
|
|
assert session.display_name == "Alice Example"
|
|
|
|
def test_expired_returns_none(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, ttl_seconds=-1)
|
|
assert provider.verify_session(access_token=token) is None
|
|
|
|
def test_wrong_audience_raises(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, aud="some-other-client")
|
|
with pytest.raises(ProviderError, match="verification failed"):
|
|
provider.verify_session(access_token=token)
|
|
|
|
def test_wrong_issuer_raises(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, iss="https://evil.example")
|
|
with pytest.raises(ProviderError, match="verification failed"):
|
|
provider.verify_session(access_token=token)
|
|
|
|
def test_failure_message_surfaces_claims(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, iss="https://evil.example")
|
|
with pytest.raises(ProviderError) as excinfo:
|
|
provider.verify_session(access_token=token)
|
|
msg = str(excinfo.value)
|
|
assert "'https://evil.example'" in msg
|
|
assert f"'{_ISSUER}'" in msg
|
|
|
|
def test_missing_sub_raises(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, sub="")
|
|
with pytest.raises(ProviderError, match="sub"):
|
|
provider.verify_session(access_token=token)
|
|
|
|
def test_display_name_falls_back_to_preferred_username(
|
|
self, provider, rsa_keypair
|
|
):
|
|
token = _mint_id_token(
|
|
rsa_keypair,
|
|
name=None,
|
|
email=None,
|
|
extra_claims={"preferred_username": "alice42"},
|
|
)
|
|
session = provider.verify_session(access_token=token)
|
|
assert session is not None
|
|
assert session.display_name == "alice42"
|
|
|
|
def test_org_id_from_org_claim(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, org_id="acme-corp")
|
|
session = provider.verify_session(access_token=token)
|
|
assert session is not None
|
|
assert session.org_id == "acme-corp"
|
|
|
|
def test_org_id_from_groups_when_no_org_claim(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair, groups=["admins", "users"])
|
|
session = provider.verify_session(access_token=token)
|
|
assert session is not None
|
|
assert session.org_id == "admins,users"
|
|
|
|
def test_org_id_empty_when_neither_present(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair)
|
|
session = provider.verify_session(access_token=token)
|
|
assert session is not None
|
|
assert session.org_id == ""
|
|
|
|
def test_jwks_unreachable_raises(self, provider, rsa_keypair):
|
|
token = _mint_id_token(rsa_keypair)
|
|
bad_client = MagicMock()
|
|
bad_client.get_signing_key_from_jwt.side_effect = jwt.PyJWKClientError(
|
|
"fetch failed"
|
|
)
|
|
provider._jwks_client = bad_client
|
|
with pytest.raises(ProviderError, match="JWKS"):
|
|
provider.verify_session(access_token=token)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# refresh_session + revoke_session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRefreshAndRevoke:
|
|
@pytest.fixture
|
|
def provider(self, rsa_keypair):
|
|
return _make_provider(rsa_keypair)
|
|
|
|
def test_refresh_happy_path_rotates(self, provider, rsa_keypair):
|
|
id_token = _mint_id_token(rsa_keypair)
|
|
mock_resp = _mock_post(
|
|
200,
|
|
{
|
|
"id_token": id_token,
|
|
"token_type": "Bearer",
|
|
"refresh_token": "rt_rotated",
|
|
},
|
|
)
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
) as mock_post:
|
|
session = provider.refresh_session(refresh_token="rt_old")
|
|
assert isinstance(session, Session)
|
|
assert session.access_token == id_token
|
|
assert session.refresh_token == "rt_rotated"
|
|
assert session.provider == "self-hosted"
|
|
_, kwargs = mock_post.call_args
|
|
assert kwargs["data"]["grant_type"] == "refresh_token"
|
|
assert kwargs["data"]["refresh_token"] == "rt_old"
|
|
assert kwargs["data"]["client_id"] == _CLIENT_ID
|
|
|
|
def test_refresh_keeps_previous_rt_when_idp_omits(self, provider, rsa_keypair):
|
|
# Some IDPs don't rotate; keep the caller's existing RT alive.
|
|
id_token = _mint_id_token(rsa_keypair)
|
|
mock_resp = _mock_post(200, {"id_token": id_token, "token_type": "Bearer"})
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
session = provider.refresh_session(refresh_token="rt_kept")
|
|
assert session.refresh_token == "rt_kept"
|
|
|
|
def test_refresh_400_raises_refresh_expired(self, provider):
|
|
mock_resp = _mock_post(400, {"error": "invalid_grant"})
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
|
|
):
|
|
with pytest.raises(RefreshExpiredError, match="invalid_grant"):
|
|
provider.refresh_session(refresh_token="rt_dead")
|
|
|
|
def test_refresh_empty_token_no_network(self, provider):
|
|
with patch("plugins.dashboard_auth.self_hosted.httpx.post") as mock_post:
|
|
with pytest.raises(RefreshExpiredError):
|
|
provider.refresh_session(refresh_token="")
|
|
mock_post.assert_not_called()
|
|
|
|
def test_refresh_network_error_raises_provider_error(self, provider):
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post",
|
|
side_effect=httpx.RequestError("boom"),
|
|
):
|
|
with pytest.raises(ProviderError, match="unreachable"):
|
|
provider.refresh_session(refresh_token="rt_x")
|
|
|
|
def test_revoke_posts_to_revocation_endpoint(self, provider):
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post"
|
|
) as mock_post:
|
|
provider.revoke_session(refresh_token="rt_x")
|
|
mock_post.assert_called_once()
|
|
args, kwargs = mock_post.call_args
|
|
assert args[0] == f"{_ISSUER}/revoke"
|
|
assert kwargs["data"]["token"] == "rt_x"
|
|
|
|
def test_revoke_empty_token_noop(self, provider):
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post"
|
|
) as mock_post:
|
|
assert provider.revoke_session(refresh_token="") is None
|
|
mock_post.assert_not_called()
|
|
|
|
def test_revoke_swallows_errors(self, provider):
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post",
|
|
side_effect=httpx.RequestError("down"),
|
|
):
|
|
# Must not raise.
|
|
assert provider.revoke_session(refresh_token="rt_x") is None
|
|
|
|
def test_revoke_noop_when_no_revocation_endpoint(self, provider):
|
|
provider._discovery["revocation_endpoint"] = ""
|
|
with patch(
|
|
"plugins.dashboard_auth.self_hosted.httpx.post"
|
|
) as mock_post:
|
|
assert provider.revoke_session(refresh_token="rt_x") is None
|
|
mock_post.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin entry point: env + config.yaml precedence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPluginRegister:
|
|
@pytest.fixture(autouse=True)
|
|
def clear_env(self, monkeypatch):
|
|
for var in (
|
|
"HERMES_DASHBOARD_OIDC_ISSUER",
|
|
"HERMES_DASHBOARD_OIDC_CLIENT_ID",
|
|
"HERMES_DASHBOARD_OIDC_SCOPES",
|
|
):
|
|
monkeypatch.delenv(var, raising=False)
|
|
|
|
@pytest.fixture
|
|
def patch_config(self, monkeypatch):
|
|
def _set(oauth_block):
|
|
cfg = {}
|
|
if oauth_block is not None:
|
|
cfg = {"dashboard": {"oauth": oauth_block}}
|
|
monkeypatch.setattr("hermes_cli.config.load_config", lambda: cfg)
|
|
|
|
return _set
|
|
|
|
def test_skips_when_unconfigured(self, patch_config):
|
|
patch_config(None)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
assert "HERMES_DASHBOARD_OIDC_ISSUER" in oidc_plugin.LAST_SKIP_REASON
|
|
assert "self_hosted" in oidc_plugin.LAST_SKIP_REASON
|
|
|
|
def test_skips_when_only_issuer_set(self, patch_config, monkeypatch):
|
|
patch_config(None)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", _ISSUER)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
|
|
def test_registers_from_env(self, patch_config, monkeypatch):
|
|
patch_config(None)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", _ISSUER)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", _CLIENT_ID)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_called_once()
|
|
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert isinstance(registered, oidc_plugin.SelfHostedOIDCProvider)
|
|
assert registered._issuer == _ISSUER
|
|
assert registered._client_id == _CLIENT_ID
|
|
assert registered._scopes == "openid profile email"
|
|
assert oidc_plugin.LAST_SKIP_REASON == ""
|
|
|
|
def test_registers_from_config_yaml(self, patch_config):
|
|
patch_config(
|
|
{"self_hosted": {"issuer": _ISSUER, "client_id": _CLIENT_ID}}
|
|
)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_called_once()
|
|
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert registered._issuer == _ISSUER
|
|
assert registered._client_id == _CLIENT_ID
|
|
|
|
def test_env_overrides_config(self, patch_config, monkeypatch):
|
|
patch_config(
|
|
{
|
|
"self_hosted": {
|
|
"issuer": "https://config.example",
|
|
"client_id": "config-client",
|
|
}
|
|
}
|
|
)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", _ISSUER)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", _CLIENT_ID)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert registered._issuer == _ISSUER
|
|
assert registered._client_id == _CLIENT_ID
|
|
|
|
def test_empty_env_does_not_shadow_config(self, patch_config, monkeypatch):
|
|
patch_config(
|
|
{"self_hosted": {"issuer": _ISSUER, "client_id": _CLIENT_ID}}
|
|
)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", "")
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", "")
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_called_once()
|
|
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert registered._issuer == _ISSUER
|
|
|
|
def test_custom_scopes_from_config(self, patch_config):
|
|
patch_config(
|
|
{
|
|
"self_hosted": {
|
|
"issuer": _ISSUER,
|
|
"client_id": _CLIENT_ID,
|
|
"scopes": "openid email",
|
|
}
|
|
}
|
|
)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
|
|
assert registered._scopes == "openid email"
|
|
|
|
def test_config_load_failure_falls_through(self, monkeypatch):
|
|
def _broken():
|
|
raise OSError("unreadable")
|
|
|
|
monkeypatch.setattr("hermes_cli.config.load_config", _broken)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx) # must not raise
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
|
|
def test_non_dict_oauth_section_tolerated(self, monkeypatch):
|
|
monkeypatch.setattr(
|
|
"hermes_cli.config.load_config",
|
|
lambda: {"dashboard": {"oauth": "wrong type"}},
|
|
)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
|
|
def test_non_https_issuer_skips_with_reason(self, patch_config, monkeypatch):
|
|
patch_config(None)
|
|
monkeypatch.setenv(
|
|
"HERMES_DASHBOARD_OIDC_ISSUER", "http://insecure.example"
|
|
)
|
|
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", _CLIENT_ID)
|
|
ctx = MagicMock()
|
|
oidc_plugin.register(ctx)
|
|
ctx.register_dashboard_auth_provider.assert_not_called()
|
|
assert "construction failed" in oidc_plugin.LAST_SKIP_REASON
|