feat(dashboard-auth): add generic self-hosted OIDC provider

Adds a bundled dashboard-auth provider plugin that authenticates the
web dashboard against any conformant self-hosted OpenID Connect server
(Authentik, Keycloak, Zitadel, Authelia, Auth0, Okta, Google, …) using
standard OIDC — no per-IDP code.

It's a pure drop-in plugin implementing the DashboardAuthProvider
protocol; it touches no core auth/runtime/login paths. Mechanics:

- OIDC discovery from {issuer}/.well-known/openid-configuration
  (cached; issuer pinned; endpoints required HTTPS, loopback http
  allowed for local-dev IDPs)
- authorization-code + PKCE (S256), public client
- verifies the OIDC ID token (RS256/ES256) against the discovered
  jwks_uri with iss/aud pinned to the configured issuer/client_id, and
  maps standard claims (sub/email/name/preferred_username, groups→org)
  onto a Session
- standard refresh_token grant for silent re-auth; RFC 7009 revocation
  on logout when advertised

Verifies the ID token (not the access token) because OIDC guarantees the
ID token is a signed JWT carrying identity, while access-token format is
opaque to the client per spec — the only universally-correct choice
across self-hosted IDPs.

Config via dashboard.oauth.self_hosted.{issuer,client_id,scopes} in
config.yaml or HERMES_DASHBOARD_OIDC_{ISSUER,CLIENT_ID,SCOPES} env vars
(env-wins-config, empty-is-unset — same convention as the nous plugin).
Confidential clients (client_secret) left as a documented TODO seam.

Docs: adds a Self-hosted OIDC section to the web-dashboard guide,
including a copy-paste Keycloak worked example (realm import + docker
run + dashboard wiring + login walkthrough).

Tests: 65 cases covering construction, discovery (incl. issuer
mismatch + https enforcement), start_login/PKCE, complete_login, ID
token verification, refresh/revoke, and env/config precedence.
This commit is contained in:
Ben
2026-06-04 17:02:01 +10:00
committed by Teknium
parent cae6b5486f
commit f57ce341dc
4 changed files with 1752 additions and 0 deletions

View File

@ -0,0 +1,736 @@
"""SelfHostedOIDCProvider — generic self-hosted OpenID Connect dashboard auth.
A standards-compliant OpenID Connect Relying Party for the ``hermes dashboard``
OAuth gate. Unlike the bundled ``nous`` provider (which encodes Nous Portal's
bespoke contract — ``agent:{instance_id}`` client ids, a custom access-token
JWT, the ``x-nous-refresh-token`` header, an ``oauth_contract_version`` claim),
this provider speaks **plain OIDC** so it works against any conformant
self-hosted identity provider:
Authentik · Keycloak · Zitadel · Authelia · Auth0 · Okta · Google · …
It is a pure drop-in plugin: it implements the five
:class:`~hermes_cli.dashboard_auth.DashboardAuthProvider` methods and touches
nothing in core auth/runtime/login. The HTTP round trip, cookies, CSRF
``state`` check and ``redirect_uri`` reconstruction are all owned by
``hermes_cli/dashboard_auth/routes.py``; this provider only:
1. discovers the IDP's endpoints from ``{issuer}/.well-known/openid-configuration``,
2. builds the ``/authorize`` URL with PKCE (S256),
3. exchanges the authorization code for tokens at the discovered
``token_endpoint``,
4. verifies the **ID token** (RS256/ES256) against the discovered
``jwks_uri`` with ``iss`` / ``aud`` pinned to the configured issuer /
client id, and maps standard OIDC claims (``sub``, ``email``, ``name``)
onto a :class:`~hermes_cli.dashboard_auth.Session`.
Why the ID token (not the access token)? OIDC guarantees the ID token is a
signed JWT carrying identity claims — that is its entire purpose. The access
token's format is opaque to the client per the spec; many IDPs issue random
opaque strings the client cannot verify locally. Verifying the ID token is the
only choice that is universally correct across self-hosted IDPs. (The ``nous``
provider verifies its *access* token because Nous Portal mints a custom JWT
access token with the dashboard claims baked in — a non-OIDC shortcut.)
Public PKCE clients only. Confidential clients (with a ``client_secret``) are
not yet supported — see the ``# TODO(confidential-client)`` seam in
``complete_login`` / ``refresh_session``. Self-hosters configuring a CLI/SPA
client almost always register a public + PKCE client, which is the smaller,
simpler surface.
Configuration surfaces (env wins over config.yaml when set non-empty, so a
provisioned-but-not-populated secret can't shadow a valid config.yaml entry —
same precedence convention as the ``nous`` plugin)::
# config.yaml — canonical surface
dashboard:
oauth:
provider: self-hosted
self_hosted:
issuer: https://auth.example.com/application/o/hermes/ # required
client_id: hermes-dashboard # required
scopes: "openid profile email" # optional
# Environment overrides (Docker/Fly secret injection)
HERMES_DASHBOARD_OIDC_ISSUER
HERMES_DASHBOARD_OIDC_CLIENT_ID
HERMES_DASHBOARD_OIDC_SCOPES # optional; defaults to "openid profile email"
Skip reasons: when the plugin loads but can't register (missing issuer /
client_id), it writes a human-readable reason to the module-level
:data:`LAST_SKIP_REASON` so the gate's fail-closed branch can surface a useful
operator error instead of the bare "no providers registered".
"""
from __future__ import annotations
import base64
import hashlib
import logging
import os
import secrets
import threading
import time
import urllib.parse
from typing import Any, Dict, Optional
import httpx
from hermes_cli.dashboard_auth import (
DashboardAuthProvider,
InvalidCodeError,
LoginStart,
ProviderError,
RefreshExpiredError,
Session,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Defaults / constants
# ---------------------------------------------------------------------------
# OIDC core scopes. ``openid`` is mandatory (without it the IDP won't issue an
# ID token); ``profile``/``email`` populate the Session's display_name/email.
_DEFAULT_SCOPES = "openid profile email"
# Signing algorithms we accept on the ID token. RS256 is the OIDC default;
# ES256 is common on modern self-hosted IDPs (Zitadel, newer Keycloak realms).
# HS256 is deliberately excluded — it implies a shared secret we don't have in
# the public-client model and is a well-known JWT confusion footgun.
_ALLOWED_ID_TOKEN_ALGS = ("RS256", "ES256", "RS384", "RS512", "ES384", "ES512")
# httpx timeouts.
_DISCOVERY_TIMEOUT_SEC = 10.0
_TOKEN_ENDPOINT_TIMEOUT_SEC = 10.0
# OIDC discovery is low-frequency and the document is effectively static;
# cache it for the process lifetime with a soft TTL so a long-running
# dashboard picks up an IDP endpoint migration within the hour.
_DISCOVERY_CACHE_TTL_SEC = 3600
# JWKS cache (PyJWKClient handles its own caching; this mirrors the nous
# provider's 5-minute lifespan so key rotation is picked up promptly).
_JWKS_CACHE_SECONDS = 300
# ---------------------------------------------------------------------------
# Skip-reason channel (mirrors the nous plugin)
# ---------------------------------------------------------------------------
LAST_SKIP_REASON: str = ""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _b64url_no_pad(raw: bytes) -> str:
"""Base64url-encode without ``=`` padding (RFC 7636 §4)."""
return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()
def _require_https_or_loopback(url: str, *, field: str) -> str:
"""Reject an endpoint URL that isn't HTTPS (loopback http is allowed).
OAuth credentials (codes, tokens) flow over these URLs. We require HTTPS
for everything except an explicit loopback host so a misconfigured issuer
can't ship the authorization code / refresh token in cleartext. Returns
the URL unchanged on success; raises :class:`ProviderError` otherwise.
"""
parsed = urllib.parse.urlparse(url)
if parsed.scheme == "https":
return url
if parsed.scheme == "http" and (parsed.hostname or "") in (
"localhost",
"127.0.0.1",
"::1",
):
return url
raise ProviderError(
f"OIDC {field} must be https:// (or http on localhost), got {url!r}"
)
# ---------------------------------------------------------------------------
# Provider
# ---------------------------------------------------------------------------
class SelfHostedOIDCProvider(DashboardAuthProvider):
"""Generic self-hosted OpenID Connect provider (authorization-code + PKCE)."""
name = "self-hosted"
display_name = "Self-Hosted OIDC"
def __init__(
self,
*,
issuer: str,
client_id: str,
scopes: str = _DEFAULT_SCOPES,
) -> None:
if not issuer:
raise ValueError("issuer is required")
if not client_id:
raise ValueError("client_id is required")
# ``issuer`` is the OIDC issuer identifier. Normalise the trailing
# slash for stable string compares (the ``iss`` claim must match the
# issuer the IDP advertises in discovery — we pin against the
# discovered value, not this normalised one, to be tolerant of a
# trailing-slash mismatch between config and the IDP).
self._issuer = issuer.rstrip("/")
_require_https_or_loopback(self._issuer, field="issuer")
self._client_id = client_id
self._scopes = scopes.strip() or _DEFAULT_SCOPES
# Discovery + JWKS are lazily resolved on first use so plugin
# registration never makes a network call (the IDP may be down at
# boot; the gate should still come up and fail per-request).
self._discovery: Dict[str, Any] | None = None
self._discovery_fetched_at: float = 0.0
self._discovery_lock = threading.Lock()
self._jwks_client: Any = None
# ---- public API (DashboardAuthProvider) -------------------------------
def start_login(self, *, redirect_uri: str) -> LoginStart:
self._validate_redirect_uri(redirect_uri)
disco = self._get_discovery()
code_verifier = _b64url_no_pad(secrets.token_bytes(64)) # ~86 chars
code_challenge = _b64url_no_pad(
hashlib.sha256(code_verifier.encode("ascii")).digest()
)
state = _b64url_no_pad(secrets.token_bytes(32))
params = {
"response_type": "code",
"client_id": self._client_id,
"redirect_uri": redirect_uri,
"scope": self._scopes,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
redirect_url = (
f"{disco['authorization_endpoint']}?{urllib.parse.urlencode(params)}"
)
# Same flat ``state=…;verifier=…`` cookie shape every provider uses;
# the auth-route layer prepends ``provider=`` and parses it back out.
cookie_payload = {
"hermes_session_pkce": f"state={state};verifier={code_verifier}",
}
return LoginStart(redirect_url=redirect_url, cookie_payload=cookie_payload)
def complete_login(
self,
*,
code: str,
state: str,
code_verifier: str,
redirect_uri: str,
) -> Session:
# ``state`` is verified by the auth-route layer before this call.
_ = state
disco = self._get_discovery()
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": self._client_id,
"code_verifier": code_verifier,
}
# TODO(confidential-client): when client_secret support lands, add it
# here (and switch to HTTP Basic auth if the IDP's
# token_endpoint_auth_methods_supported prefers client_secret_basic).
return self._exchange(
disco["token_endpoint"], data, bad_request_exc=InvalidCodeError
)
def refresh_session(self, *, refresh_token: str) -> Session:
if not refresh_token:
raise RefreshExpiredError("no refresh token present in session")
disco = self._get_discovery()
data = {
"grant_type": "refresh_token",
"client_id": self._client_id,
"refresh_token": refresh_token,
# Re-request the same scopes so the rotated ID token keeps the
# identity claims (some IDPs narrow scope on refresh otherwise).
"scope": self._scopes,
}
# TODO(confidential-client): add client_secret here when supported.
return self._exchange(
disco["token_endpoint"],
data,
bad_request_exc=RefreshExpiredError,
previous_refresh_token=refresh_token,
)
def verify_session(self, *, access_token: str) -> Optional[Session]:
# The session cookie stores the ID token in the access-token slot (see
# ``_session_from_tokens``) precisely so this per-request check can
# verify a real JWT. Returns None on expiry/invalidity (middleware
# then refreshes or logs out); raises ProviderError if the IDP/JWKS is
# unreachable.
try:
claims = self._verify_id_token(access_token)
except InvalidCodeError:
# Expired / invalid token — protocol says return None, not raise.
return None
except ProviderError:
raise
# No refresh token available on this path; "" is fine — the middleware
# re-reads the refresh-token cookie separately for refresh_session.
return self._session_from_tokens(
id_token=access_token, refresh_token="", claims=claims
)
def revoke_session(self, *, refresh_token: str) -> None:
# Best-effort RFC 7009 revocation if the IDP advertised an endpoint.
# Must never raise — logout is client-side cookie clearing regardless.
if not refresh_token:
return None
try:
disco = self._get_discovery()
except ProviderError:
return None
endpoint = str(disco.get("revocation_endpoint") or "").strip()
if not endpoint:
return None
try:
httpx.post(
endpoint,
data={
"token": refresh_token,
"token_type_hint": "refresh_token",
"client_id": self._client_id,
},
headers={"Accept": "application/json"},
timeout=_TOKEN_ENDPOINT_TIMEOUT_SEC,
)
except Exception as exc: # noqa: BLE001 — best-effort
logger.debug("self-hosted OIDC: revoke failed (ignored): %s", exc)
return None
# ---- internals: token exchange ----------------------------------------
def _exchange(
self,
token_endpoint: str,
data: Dict[str, str],
*,
bad_request_exc: type[Exception],
previous_refresh_token: str = "",
) -> Session:
"""POST the token endpoint and turn the response into a Session.
Shared by ``complete_login`` (auth-code grant) and ``refresh_session``
(refresh grant). ``bad_request_exc`` is raised on a 400 —
``InvalidCodeError`` for the auth-code path, ``RefreshExpiredError``
for the refresh path — preserving the middleware's distinct handling.
"""
try:
response = httpx.post(
token_endpoint,
data=data,
headers={"Accept": "application/json"},
timeout=_TOKEN_ENDPOINT_TIMEOUT_SEC,
)
except httpx.RequestError as exc:
raise ProviderError(
f"OIDC token endpoint unreachable: {exc}"
) from exc
if response.status_code == 400:
body = self._parse_json_body(response)
error_code = body.get("error", "invalid_request")
raise bad_request_exc(
f"IDP rejected token request: {error_code}"
)
if response.status_code != 200:
raise ProviderError(
f"OIDC token endpoint returned {response.status_code}: "
f"{response.text[:200]!r}"
)
payload = self._parse_json_body(response)
id_token = payload.get("id_token")
if not id_token or not isinstance(id_token, str):
raise ProviderError(
"OIDC token response missing id_token — ensure the 'openid' "
"scope is configured and the client is allowed to receive an "
"ID token."
)
token_type = str(payload.get("token_type", "")).lower()
if token_type and token_type != "bearer":
raise ProviderError(f"unexpected token_type={token_type!r}")
claims = self._verify_id_token(id_token)
# Refresh-token rotation: prefer a freshly-issued one, else keep the
# previous (some IDPs don't rotate). Empty string if neither — the
# session then behaves as ID-token-only until expiry.
refresh_token = payload.get("refresh_token")
if not isinstance(refresh_token, str) or not refresh_token:
refresh_token = previous_refresh_token or ""
return self._session_from_tokens(
id_token=id_token, refresh_token=refresh_token, claims=claims
)
# ---- internals: discovery ---------------------------------------------
def _get_discovery(self) -> Dict[str, Any]:
"""Return the cached OIDC discovery document, fetching if stale."""
now = time.time()
if (
self._discovery is not None
and (now - self._discovery_fetched_at) < _DISCOVERY_CACHE_TTL_SEC
):
return self._discovery
with self._discovery_lock:
now = time.time()
if (
self._discovery is not None
and (now - self._discovery_fetched_at) < _DISCOVERY_CACHE_TTL_SEC
):
return self._discovery
disco = self._fetch_discovery()
self._discovery = disco
self._discovery_fetched_at = now
# New issuer/keys → drop the JWKS client so it re-binds to the
# freshly-discovered jwks_uri.
self._jwks_client = None
return disco
def _discovery_url(self) -> str:
# RFC 8414 / OIDC Discovery: ``{issuer}/.well-known/openid-configuration``.
return f"{self._issuer}/.well-known/openid-configuration"
def _fetch_discovery(self) -> Dict[str, Any]:
url = self._discovery_url()
try:
response = httpx.get(
url,
headers={"Accept": "application/json"},
timeout=_DISCOVERY_TIMEOUT_SEC,
)
except httpx.RequestError as exc:
raise ProviderError(f"OIDC discovery unreachable: {exc}") from exc
if response.status_code != 200:
raise ProviderError(
f"OIDC discovery returned {response.status_code} for {url!r}"
)
payload = self._parse_json_body(response)
if not payload:
raise ProviderError("OIDC discovery returned a non-JSON body")
authorization_endpoint = str(
payload.get("authorization_endpoint", "") or ""
).strip()
token_endpoint = str(payload.get("token_endpoint", "") or "").strip()
jwks_uri = str(payload.get("jwks_uri", "") or "").strip()
if not authorization_endpoint or not token_endpoint or not jwks_uri:
raise ProviderError(
"OIDC discovery missing one of authorization_endpoint / "
"token_endpoint / jwks_uri"
)
# Pin the discovered issuer: a mismatch between the configured issuer
# and the ``issuer`` the IDP advertises means the discovery document
# was served from the wrong place (proxy/MITM/misconfig). We tolerate
# only a trailing-slash difference.
advertised_issuer = str(payload.get("issuer", "") or "").strip()
if advertised_issuer and advertised_issuer.rstrip("/") != self._issuer:
raise ProviderError(
f"OIDC discovery issuer mismatch: document advertises "
f"{advertised_issuer!r} but configured issuer is "
f"{self._issuer!r}"
)
_require_https_or_loopback(
authorization_endpoint, field="authorization_endpoint"
)
_require_https_or_loopback(token_endpoint, field="token_endpoint")
_require_https_or_loopback(jwks_uri, field="jwks_uri")
revocation_endpoint = str(
payload.get("revocation_endpoint", "") or ""
).strip()
return {
"issuer": advertised_issuer or self._issuer,
"authorization_endpoint": authorization_endpoint,
"token_endpoint": token_endpoint,
"jwks_uri": jwks_uri,
"revocation_endpoint": revocation_endpoint,
}
# ---- internals: JWT verification --------------------------------------
def _get_jwks_client(self) -> Any:
if self._jwks_client is None:
from jwt import PyJWKClient # lazy import
disco = self._get_discovery()
self._jwks_client = PyJWKClient(
disco["jwks_uri"],
cache_keys=True,
lifespan=_JWKS_CACHE_SECONDS,
)
return self._jwks_client
def _verify_id_token(self, id_token: str) -> Dict[str, Any]:
import jwt # lazy import — keeps startup fast for the ungated path
disco = self._get_discovery()
try:
signing_key = self._get_jwks_client().get_signing_key_from_jwt(
id_token
)
except jwt.PyJWKClientError as exc:
raise ProviderError(f"JWKS lookup failed: {exc}") from exc
except Exception as exc: # pragma: no cover - defensive
raise ProviderError(f"JWKS lookup failed: {exc!r}") from exc
try:
claims = jwt.decode(
id_token,
signing_key.key,
algorithms=list(_ALLOWED_ID_TOKEN_ALGS),
audience=self._client_id,
issuer=disco["issuer"],
options={"require": ["exp", "iat", "aud", "iss", "sub"]},
)
except jwt.ExpiredSignatureError as exc:
# verify_session() catches this and returns None per protocol.
raise InvalidCodeError(f"ID token expired: {exc}") from exc
except jwt.InvalidTokenError as exc:
# Surface the actual iss/aud the token carried so operators can
# debug config drift between the configured issuer/client_id and
# what the IDP emits. Decoding-without-verification is safe here:
# we already failed verification and never trust these values.
details = ""
try:
unverified = jwt.decode(
id_token,
options={"verify_signature": False, "verify_exp": False},
)
details = (
f" [token iss={unverified.get('iss')!r} "
f"aud={unverified.get('aud')!r}; "
f"expected iss={disco['issuer']!r} "
f"aud={self._client_id!r}]"
)
except Exception:
pass
raise ProviderError(
f"ID token verification failed: {exc}{details}"
) from exc
return claims
# ---- internals: mapping + misc ----------------------------------------
def _session_from_tokens(
self,
*,
id_token: str,
refresh_token: str,
claims: Dict[str, Any],
) -> Session:
"""Map verified OIDC claims onto a Session.
The verified ID token is stored in ``Session.access_token`` so the
per-request ``verify_session`` re-verifies a real JWT. The opaque
OAuth access token is intentionally NOT stored — Hermes does not call
any resource API with it; the dashboard only needs identity.
"""
user_id = str(claims.get("sub", ""))
if not user_id:
raise ProviderError("ID token missing 'sub' (user_id) claim")
email = str(claims.get("email", "") or "")
# Standard OIDC display claims, in preference order.
display_name = str(
claims.get("name")
or claims.get("preferred_username")
or claims.get("nickname")
or email
or ""
)
# Org/tenant is non-standard; accept the common spellings. Groups, if
# present as a list, are joined so multi-tenant IDPs surface *something*
# rather than dropping the info — org_id is a free-form string.
org_id = claims.get("org_id") or claims.get("organization") or ""
if not org_id:
groups = claims.get("groups")
if isinstance(groups, list) and groups:
org_id = ",".join(str(g) for g in groups)
org_id = str(org_id or "")
return Session(
user_id=user_id,
email=email,
display_name=display_name,
org_id=org_id,
provider=self.name,
expires_at=int(claims["exp"]),
access_token=id_token,
refresh_token=refresh_token,
)
def _validate_redirect_uri(self, redirect_uri: str) -> None:
"""Fast-fail obviously-broken redirect_uris before bouncing to the IDP.
The IDP's own allowlist is authoritative; this just catches the common
operator-error case with a clear message. Mirrors the nous provider.
"""
parsed = urllib.parse.urlparse(redirect_uri)
if parsed.scheme not in ("https", "http"):
raise ProviderError(
f"redirect_uri must be http(s), got {redirect_uri!r}"
)
if parsed.scheme == "http" and parsed.hostname not in (
"localhost",
"127.0.0.1",
):
raise ProviderError(
"redirect_uri may only use http:// for localhost/127.0.0.1, "
f"got {redirect_uri!r}"
)
if not parsed.path or not parsed.path.endswith("/auth/callback"):
raise ProviderError(
"redirect_uri path must end with '/auth/callback', "
f"got {redirect_uri!r}"
)
def _parse_json_body(self, response: httpx.Response) -> Dict[str, Any]:
ctype = response.headers.get("content-type", "")
if not ctype.startswith("application/json"):
return {}
try:
body = response.json()
except ValueError:
return {}
return body if isinstance(body, dict) else {}
# ---------------------------------------------------------------------------
# Plugin entry point
# ---------------------------------------------------------------------------
def _load_config_oauth_section() -> dict:
"""Return the ``dashboard.oauth`` block from config.yaml, or ``{}``.
Robust to load_config() raising, the ``dashboard`` key being absent or
non-dict, and ``oauth`` being present but not a dict — each falls through
to ``{}`` so callers can rely on ``.get(...)``.
"""
try:
from hermes_cli.config import cfg_get, load_config
cfg = load_config()
except Exception as exc: # noqa: BLE001 — broad catch is intentional
logger.debug(
"dashboard-auth-self-hosted: load_config() raised %s; "
"falling back to env-only configuration",
exc,
)
return {}
section = cfg_get(cfg, "dashboard", "oauth", default=None)
return section if isinstance(section, dict) else {}
def _oidc_subsection(oauth_section: dict) -> dict:
"""Return the ``dashboard.oauth.self_hosted`` sub-block, or ``{}``."""
sub = oauth_section.get("self_hosted")
return sub if isinstance(sub, dict) else {}
def _resolve_setting(env_var: str, cfg_value: Any) -> str:
"""env-wins-config with empty-is-unset precedence.
1. ``env_var`` when non-empty after strip (an empty provisioned secret
must not shadow a valid config.yaml entry).
2. ``cfg_value`` from config.yaml.
3. Empty string.
"""
env = os.environ.get(env_var, "").strip()
if env:
return env
return str(cfg_value or "").strip()
def register(ctx) -> None:
"""Plugin entry — called by the plugin loader at startup.
Registers :class:`SelfHostedOIDCProvider` only when both an issuer and a
client_id are configured (via ``HERMES_DASHBOARD_OIDC_*`` env vars or the
``dashboard.oauth.self_hosted`` block in config.yaml). Operator-owned
loopback / ``--insecure`` dashboards leave these unset, so the plugin is a
no-op for them.
On skip, writes a reason to :data:`LAST_SKIP_REASON` that names BOTH
configuration surfaces so operators don't guess wrong about which to set.
"""
global LAST_SKIP_REASON
LAST_SKIP_REASON = ""
oauth_section = _load_config_oauth_section()
oidc_cfg = _oidc_subsection(oauth_section)
issuer = _resolve_setting(
"HERMES_DASHBOARD_OIDC_ISSUER", oidc_cfg.get("issuer")
)
client_id = _resolve_setting(
"HERMES_DASHBOARD_OIDC_CLIENT_ID", oidc_cfg.get("client_id")
)
scopes = (
_resolve_setting("HERMES_DASHBOARD_OIDC_SCOPES", oidc_cfg.get("scopes"))
or _DEFAULT_SCOPES
)
if not issuer or not client_id:
LAST_SKIP_REASON = (
"Self-hosted OIDC dashboard auth is not configured. Set both an "
"issuer and a client_id — either as env vars "
"(HERMES_DASHBOARD_OIDC_ISSUER + HERMES_DASHBOARD_OIDC_CLIENT_ID) "
"or under dashboard.oauth.self_hosted.{issuer,client_id} in "
"config.yaml — or pass --insecure to skip the OAuth gate "
"entirely. (issuer set: %s; client_id set: %s)"
% (bool(issuer), bool(client_id))
)
logger.debug("dashboard-auth-self-hosted: %s", LAST_SKIP_REASON)
return
try:
provider = SelfHostedOIDCProvider(
issuer=issuer, client_id=client_id, scopes=scopes
)
except (ValueError, ProviderError) as exc:
LAST_SKIP_REASON = (
f"SelfHostedOIDCProvider construction failed: {exc}"
)
logger.warning("dashboard-auth-self-hosted: %s", LAST_SKIP_REASON)
return
ctx.register_dashboard_auth_provider(provider)
logger.info(
"dashboard-auth-self-hosted: registered provider "
"(issuer=%s, client_id=%s, scopes=%r)",
issuer,
client_id,
scopes,
)

View File

@ -0,0 +1,8 @@
name: self-hosted
version: 1.0.0
description: "Dashboard auth provider — generic self-hosted OpenID Connect (authorization-code + PKCE, public client). Works against any conformant OIDC identity provider (Authentik, Keycloak, Zitadel, Authelia, Auth0, Okta, Google, …) via OIDC discovery. Auto-activates when an issuer + client_id are configured, either under dashboard.oauth.self_hosted.{issuer,client_id} in config.yaml (canonical surface) or via the HERMES_DASHBOARD_OIDC_ISSUER + HERMES_DASHBOARD_OIDC_CLIENT_ID env vars (operator override / secret injection). Scopes default to 'openid profile email'. Verifies the OIDC ID token (RS256/ES256) against the discovered jwks_uri."
author: NousResearch
kind: backend
requires_env:
- HERMES_DASHBOARD_OIDC_ISSUER
- HERMES_DASHBOARD_OIDC_CLIENT_ID

View File

@ -0,0 +1,882 @@
"""Tests for the bundled self-hosted OIDC dashboard-auth plugin.
Covers, by analogy with ``test_nous_provider.py``:
1. Plugin entry-point registration gating (env + config.yaml precedence).
2. ``start_login`` shape (PKCE/state, authorize URL parameters, OIDC discovery).
3. ``complete_login`` httpx-mocked happy path + error mapping (ID-token grant).
4. ``verify_session`` ID-token verification — RSA keypair, audience/issuer
pinning, standard OIDC claim mapping (sub/email/name/groups).
5. ``refresh_session`` rotation + error mapping, ``revoke_session`` (RFC 7009).
6. OIDC discovery: endpoint extraction, issuer pinning, https enforcement.
All HTTP is mocked: nothing here talks to a real IDP.
"""
from __future__ import annotations
import base64
import hashlib
import json
import time
import urllib.parse
from typing import Any, Dict
from unittest.mock import MagicMock, patch
import httpx
import jwt
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import plugins.dashboard_auth.self_hosted as oidc_plugin
from hermes_cli.dashboard_auth import (
InvalidCodeError,
LoginStart,
ProviderError,
RefreshExpiredError,
Session,
assert_protocol_compliance,
)
_ISSUER = "https://auth.example.com/application/o/hermes"
_CLIENT_ID = "hermes-dashboard"
_DISCOVERY_DOC = {
"issuer": _ISSUER,
"authorization_endpoint": f"{_ISSUER}/authorize",
"token_endpoint": f"{_ISSUER}/token",
"jwks_uri": f"{_ISSUER}/jwks",
"revocation_endpoint": f"{_ISSUER}/revoke",
}
# ---------------------------------------------------------------------------
# RSA keypair fixture (module-scope — keygen is slow)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def rsa_keypair() -> Dict[str, Any]:
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode()
public_numbers = key.public_key().public_numbers()
def _b64url_uint(n: int) -> str:
length = (n.bit_length() + 7) // 8
return (
base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
)
jwk = {
"kty": "RSA",
"use": "sig",
"alg": "RS256",
"kid": "test-key-1",
"n": _b64url_uint(public_numbers.n),
"e": _b64url_uint(public_numbers.e),
}
return {"private_pem": private_pem, "jwk": jwk, "kid": jwk["kid"]}
# ---------------------------------------------------------------------------
# Token-mint helper — standard OIDC ID-token claims
# ---------------------------------------------------------------------------
def _mint_id_token(
rsa_keypair: Dict[str, Any],
*,
iss: str = _ISSUER,
aud: str = _CLIENT_ID,
sub: str = "usr_abc",
email: str | None = "alice@example.com",
name: str | None = "Alice Example",
groups: Any = None,
org_id: str | None = None,
ttl_seconds: int = 900,
extra_claims: Dict[str, Any] | None = None,
) -> str:
now = int(time.time())
claims: Dict[str, Any] = {
"iss": iss,
"aud": aud,
"sub": sub,
"iat": now,
"exp": now + ttl_seconds,
}
if email is not None:
claims["email"] = email
if name is not None:
claims["name"] = name
if groups is not None:
claims["groups"] = groups
if org_id is not None:
claims["org_id"] = org_id
if extra_claims:
claims.update(extra_claims)
return jwt.encode(
claims,
rsa_keypair["private_pem"],
algorithm="RS256",
headers={"kid": rsa_keypair["kid"]},
)
def _make_provider(rsa_keypair, *, scopes: str | None = None):
"""Construct a provider with discovery + JWKS stubbed (no network)."""
kwargs: Dict[str, Any] = {"issuer": _ISSUER, "client_id": _CLIENT_ID}
if scopes is not None:
kwargs["scopes"] = scopes
p = oidc_plugin.SelfHostedOIDCProvider(**kwargs)
# Pre-seed discovery so nothing hits the network.
p._discovery = dict(_DISCOVERY_DOC)
p._discovery_fetched_at = time.time()
# Patch the JWKS client to return our fixture key.
fake_key = MagicMock()
fake_key.key = serialization.load_pem_private_key(
rsa_keypair["private_pem"].encode(), password=None
).public_key()
fake_client = MagicMock()
fake_client.get_signing_key_from_jwt.return_value = fake_key
p._jwks_client = fake_client
return p
def _mock_post(status_code: int, body: Any, *, ctype: str = "application/json"):
resp = MagicMock(spec=httpx.Response)
resp.status_code = status_code
if isinstance(body, dict):
resp.text = json.dumps(body)
resp.json = MagicMock(return_value=body)
else:
resp.text = body
resp.json = MagicMock(side_effect=ValueError("not json"))
resp.headers = {"content-type": ctype}
return resp
# ---------------------------------------------------------------------------
# Construction
# ---------------------------------------------------------------------------
class TestConstruction:
def test_protocol_compliance(self):
assert_protocol_compliance(oidc_plugin.SelfHostedOIDCProvider)
def test_name_and_display(self):
p = oidc_plugin.SelfHostedOIDCProvider(issuer=_ISSUER, client_id=_CLIENT_ID)
assert p.name == "self-hosted"
assert p.display_name == "Self-Hosted OIDC"
def test_strips_trailing_slash_from_issuer(self):
p = oidc_plugin.SelfHostedOIDCProvider(
issuer=_ISSUER + "/", client_id=_CLIENT_ID
)
assert p._issuer == _ISSUER
def test_requires_issuer(self):
with pytest.raises(ValueError, match="issuer"):
oidc_plugin.SelfHostedOIDCProvider(issuer="", client_id=_CLIENT_ID)
def test_requires_client_id(self):
with pytest.raises(ValueError, match="client_id"):
oidc_plugin.SelfHostedOIDCProvider(issuer=_ISSUER, client_id="")
def test_rejects_non_https_issuer(self):
with pytest.raises(ProviderError, match="https"):
oidc_plugin.SelfHostedOIDCProvider(
issuer="http://auth.example.com", client_id=_CLIENT_ID
)
def test_allows_http_localhost_issuer(self):
# Local dev against a loopback IDP is allowed.
p = oidc_plugin.SelfHostedOIDCProvider(
issuer="http://localhost:9000", client_id=_CLIENT_ID
)
assert p._issuer == "http://localhost:9000"
def test_default_scopes(self):
p = oidc_plugin.SelfHostedOIDCProvider(issuer=_ISSUER, client_id=_CLIENT_ID)
assert p._scopes == "openid profile email"
def test_empty_scopes_falls_back_to_default(self):
p = oidc_plugin.SelfHostedOIDCProvider(
issuer=_ISSUER, client_id=_CLIENT_ID, scopes=" "
)
assert p._scopes == "openid profile email"
# ---------------------------------------------------------------------------
# OIDC discovery
# ---------------------------------------------------------------------------
class TestDiscovery:
def _provider(self):
return oidc_plugin.SelfHostedOIDCProvider(
issuer=_ISSUER, client_id=_CLIENT_ID
)
def _mock_get(self, status_code, body, *, ctype="application/json"):
resp = MagicMock(spec=httpx.Response)
resp.status_code = status_code
resp.json = MagicMock(return_value=body)
resp.text = json.dumps(body) if isinstance(body, dict) else str(body)
resp.headers = {"content-type": ctype}
return resp
def test_discovery_url(self):
p = self._provider()
assert p._discovery_url() == (
f"{_ISSUER}/.well-known/openid-configuration"
)
def test_fetches_and_caches(self):
p = self._provider()
mock_resp = self._mock_get(200, dict(_DISCOVERY_DOC))
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
) as mock_get:
disco1 = p._get_discovery()
disco2 = p._get_discovery()
assert disco1["token_endpoint"] == f"{_ISSUER}/token"
assert disco1["authorization_endpoint"] == f"{_ISSUER}/authorize"
assert disco1["jwks_uri"] == f"{_ISSUER}/jwks"
assert disco1["revocation_endpoint"] == f"{_ISSUER}/revoke"
# Cached — only one network call.
assert mock_get.call_count == 1
assert disco2 is disco1
def test_discovery_404_raises(self):
p = self._provider()
mock_resp = self._mock_get(404, {})
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
):
with pytest.raises(ProviderError, match="404"):
p._get_discovery()
def test_discovery_unreachable_raises(self):
p = self._provider()
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get",
side_effect=httpx.ConnectError("no route"),
):
with pytest.raises(ProviderError, match="unreachable"):
p._get_discovery()
def test_discovery_missing_endpoint_raises(self):
p = self._provider()
doc = dict(_DISCOVERY_DOC)
del doc["token_endpoint"]
mock_resp = self._mock_get(200, doc)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
):
with pytest.raises(ProviderError, match="token_endpoint"):
p._get_discovery()
def test_discovery_issuer_mismatch_raises(self):
p = self._provider()
doc = dict(_DISCOVERY_DOC)
doc["issuer"] = "https://evil.example"
mock_resp = self._mock_get(200, doc)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
):
with pytest.raises(ProviderError, match="issuer mismatch"):
p._get_discovery()
def test_discovery_issuer_trailing_slash_tolerated(self):
p = self._provider()
doc = dict(_DISCOVERY_DOC)
doc["issuer"] = _ISSUER + "/" # only a trailing-slash difference
mock_resp = self._mock_get(200, doc)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
):
disco = p._get_discovery()
assert disco["token_endpoint"] == f"{_ISSUER}/token"
def test_discovery_rejects_non_https_endpoint(self):
p = self._provider()
doc = dict(_DISCOVERY_DOC)
doc["token_endpoint"] = "http://auth.example.com/token" # not loopback
mock_resp = self._mock_get(200, doc)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.get", return_value=mock_resp
):
with pytest.raises(ProviderError, match="https"):
p._get_discovery()
# ---------------------------------------------------------------------------
# start_login
# ---------------------------------------------------------------------------
class TestStartLogin:
@pytest.fixture
def provider(self, rsa_keypair):
return _make_provider(rsa_keypair)
def test_returns_login_start(self, provider):
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
assert isinstance(result, LoginStart)
def test_redirect_url_targets_authorize_endpoint(self, provider):
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
assert result.redirect_url.startswith(f"{_ISSUER}/authorize?")
def test_authorize_url_has_required_params(self, provider):
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
parsed = urllib.parse.urlparse(result.redirect_url)
params = dict(urllib.parse.parse_qsl(parsed.query))
assert params["response_type"] == "code"
assert params["client_id"] == _CLIENT_ID
assert params["redirect_uri"] == "https://hermes.example/auth/callback"
assert params["scope"] == "openid profile email"
assert params["code_challenge_method"] == "S256"
assert "state" in params
assert "code_challenge" in params
def test_custom_scopes_used(self, rsa_keypair):
provider = _make_provider(rsa_keypair, scopes="openid email groups")
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
parsed = urllib.parse.urlparse(result.redirect_url)
params = dict(urllib.parse.parse_qsl(parsed.query))
assert params["scope"] == "openid email groups"
def test_code_verifier_length(self, provider):
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
pkce = result.cookie_payload["hermes_session_pkce"]
parts = dict(seg.split("=", 1) for seg in pkce.split(";") if "=" in seg)
assert 43 <= len(parts["verifier"]) <= 128 # RFC 7636 §4.1
def test_state_in_cookie_matches_url(self, provider):
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
parsed = urllib.parse.urlparse(result.redirect_url)
params = dict(urllib.parse.parse_qsl(parsed.query))
pkce = result.cookie_payload["hermes_session_pkce"]
parts = dict(seg.split("=", 1) for seg in pkce.split(";") if "=" in seg)
assert parts["state"] == params["state"]
def test_code_challenge_is_s256_of_verifier(self, provider):
result = provider.start_login(
redirect_uri="https://hermes.example/auth/callback"
)
parsed = urllib.parse.urlparse(result.redirect_url)
params = dict(urllib.parse.parse_qsl(parsed.query))
pkce = result.cookie_payload["hermes_session_pkce"]
parts = dict(seg.split("=", 1) for seg in pkce.split(";") if "=" in seg)
expected = (
base64.urlsafe_b64encode(
hashlib.sha256(parts["verifier"].encode("ascii")).digest()
)
.rstrip(b"=")
.decode()
)
assert params["code_challenge"] == expected
def test_two_calls_differ(self, provider):
a = provider.start_login(redirect_uri="https://hermes.example/auth/callback")
b = provider.start_login(redirect_uri="https://hermes.example/auth/callback")
assert (
a.cookie_payload["hermes_session_pkce"]
!= b.cookie_payload["hermes_session_pkce"]
)
def test_rejects_wrong_callback_path(self, provider):
with pytest.raises(ProviderError, match="/auth/callback"):
provider.start_login(redirect_uri="https://x.example/oauth/cb")
def test_allows_http_localhost_redirect(self, provider):
provider.start_login(redirect_uri="http://localhost:8080/auth/callback")
provider.start_login(redirect_uri="http://127.0.0.1:8080/auth/callback")
# ---------------------------------------------------------------------------
# complete_login
# ---------------------------------------------------------------------------
class TestCompleteLogin:
@pytest.fixture
def provider(self, rsa_keypair):
return _make_provider(rsa_keypair)
def test_happy_path_returns_session(self, provider, rsa_keypair):
id_token = _mint_id_token(rsa_keypair)
mock_resp = _mock_post(
200,
{
"access_token": "opaque-at",
"id_token": id_token,
"token_type": "Bearer",
"refresh_token": "rt_initial",
},
)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
session = provider.complete_login(
code="abc",
state="s",
code_verifier="vfy",
redirect_uri="https://hermes.example/auth/callback",
)
assert isinstance(session, Session)
assert session.user_id == "usr_abc"
assert session.provider == "self-hosted"
assert session.email == "alice@example.com"
assert session.display_name == "Alice Example"
# The verified ID token is stored in the access_token slot.
assert session.access_token == id_token
assert session.refresh_token == "rt_initial"
def test_tolerates_missing_refresh_token(self, provider, rsa_keypair):
id_token = _mint_id_token(rsa_keypair)
mock_resp = _mock_post(
200, {"id_token": id_token, "token_type": "Bearer"}
)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
session = provider.complete_login(
code="abc",
state="s",
code_verifier="vfy",
redirect_uri="https://hermes.example/auth/callback",
)
assert session.refresh_token == ""
def test_missing_id_token_raises(self, provider):
mock_resp = _mock_post(
200, {"access_token": "opaque", "token_type": "Bearer"}
)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
with pytest.raises(ProviderError, match="id_token"):
provider.complete_login(
code="x",
state="s",
code_verifier="v",
redirect_uri="https://hermes.example/auth/callback",
)
def test_400_raises_invalid_code(self, provider):
mock_resp = _mock_post(400, {"error": "invalid_grant"})
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
with pytest.raises(InvalidCodeError, match="invalid_grant"):
provider.complete_login(
code="bad",
state="s",
code_verifier="v",
redirect_uri="https://hermes.example/auth/callback",
)
def test_500_raises_provider_error(self, provider):
mock_resp = _mock_post(500, "boom", ctype="text/plain")
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
with pytest.raises(ProviderError, match="500"):
provider.complete_login(
code="x",
state="s",
code_verifier="v",
redirect_uri="https://hermes.example/auth/callback",
)
def test_network_error_raises_provider_error(self, provider):
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post",
side_effect=httpx.ConnectError("conn refused"),
):
with pytest.raises(ProviderError, match="unreachable"):
provider.complete_login(
code="x",
state="s",
code_verifier="v",
redirect_uri="https://hermes.example/auth/callback",
)
def test_unexpected_token_type_raises(self, provider, rsa_keypair):
id_token = _mint_id_token(rsa_keypair)
mock_resp = _mock_post(
200, {"id_token": id_token, "token_type": "DPoP"}
)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
with pytest.raises(ProviderError, match="token_type"):
provider.complete_login(
code="x",
state="s",
code_verifier="v",
redirect_uri="https://hermes.example/auth/callback",
)
def test_posts_authorization_code_grant(self, provider, rsa_keypair):
id_token = _mint_id_token(rsa_keypair)
mock_resp = _mock_post(200, {"id_token": id_token, "token_type": "Bearer"})
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
) as mock_post:
provider.complete_login(
code="the-code",
state="s",
code_verifier="the-verifier",
redirect_uri="https://hermes.example/auth/callback",
)
_, kwargs = mock_post.call_args
assert kwargs["data"]["grant_type"] == "authorization_code"
assert kwargs["data"]["code"] == "the-code"
assert kwargs["data"]["code_verifier"] == "the-verifier"
assert kwargs["data"]["client_id"] == _CLIENT_ID
# ---------------------------------------------------------------------------
# verify_session
# ---------------------------------------------------------------------------
class TestVerifySession:
@pytest.fixture
def provider(self, rsa_keypair):
return _make_provider(rsa_keypair)
def test_happy_path(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair)
session = provider.verify_session(access_token=token)
assert session is not None
assert session.user_id == "usr_abc"
assert session.email == "alice@example.com"
assert session.display_name == "Alice Example"
def test_expired_returns_none(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, ttl_seconds=-1)
assert provider.verify_session(access_token=token) is None
def test_wrong_audience_raises(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, aud="some-other-client")
with pytest.raises(ProviderError, match="verification failed"):
provider.verify_session(access_token=token)
def test_wrong_issuer_raises(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, iss="https://evil.example")
with pytest.raises(ProviderError, match="verification failed"):
provider.verify_session(access_token=token)
def test_failure_message_surfaces_claims(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, iss="https://evil.example")
with pytest.raises(ProviderError) as excinfo:
provider.verify_session(access_token=token)
msg = str(excinfo.value)
assert "'https://evil.example'" in msg
assert f"'{_ISSUER}'" in msg
def test_missing_sub_raises(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, sub="")
with pytest.raises(ProviderError, match="sub"):
provider.verify_session(access_token=token)
def test_display_name_falls_back_to_preferred_username(
self, provider, rsa_keypair
):
token = _mint_id_token(
rsa_keypair,
name=None,
email=None,
extra_claims={"preferred_username": "alice42"},
)
session = provider.verify_session(access_token=token)
assert session is not None
assert session.display_name == "alice42"
def test_org_id_from_org_claim(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, org_id="acme-corp")
session = provider.verify_session(access_token=token)
assert session is not None
assert session.org_id == "acme-corp"
def test_org_id_from_groups_when_no_org_claim(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair, groups=["admins", "users"])
session = provider.verify_session(access_token=token)
assert session is not None
assert session.org_id == "admins,users"
def test_org_id_empty_when_neither_present(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair)
session = provider.verify_session(access_token=token)
assert session is not None
assert session.org_id == ""
def test_jwks_unreachable_raises(self, provider, rsa_keypair):
token = _mint_id_token(rsa_keypair)
bad_client = MagicMock()
bad_client.get_signing_key_from_jwt.side_effect = jwt.PyJWKClientError(
"fetch failed"
)
provider._jwks_client = bad_client
with pytest.raises(ProviderError, match="JWKS"):
provider.verify_session(access_token=token)
# ---------------------------------------------------------------------------
# refresh_session + revoke_session
# ---------------------------------------------------------------------------
class TestRefreshAndRevoke:
@pytest.fixture
def provider(self, rsa_keypair):
return _make_provider(rsa_keypair)
def test_refresh_happy_path_rotates(self, provider, rsa_keypair):
id_token = _mint_id_token(rsa_keypair)
mock_resp = _mock_post(
200,
{
"id_token": id_token,
"token_type": "Bearer",
"refresh_token": "rt_rotated",
},
)
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
) as mock_post:
session = provider.refresh_session(refresh_token="rt_old")
assert isinstance(session, Session)
assert session.access_token == id_token
assert session.refresh_token == "rt_rotated"
assert session.provider == "self-hosted"
_, kwargs = mock_post.call_args
assert kwargs["data"]["grant_type"] == "refresh_token"
assert kwargs["data"]["refresh_token"] == "rt_old"
assert kwargs["data"]["client_id"] == _CLIENT_ID
def test_refresh_keeps_previous_rt_when_idp_omits(self, provider, rsa_keypair):
# Some IDPs don't rotate; keep the caller's existing RT alive.
id_token = _mint_id_token(rsa_keypair)
mock_resp = _mock_post(200, {"id_token": id_token, "token_type": "Bearer"})
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
session = provider.refresh_session(refresh_token="rt_kept")
assert session.refresh_token == "rt_kept"
def test_refresh_400_raises_refresh_expired(self, provider):
mock_resp = _mock_post(400, {"error": "invalid_grant"})
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post", return_value=mock_resp
):
with pytest.raises(RefreshExpiredError, match="invalid_grant"):
provider.refresh_session(refresh_token="rt_dead")
def test_refresh_empty_token_no_network(self, provider):
with patch("plugins.dashboard_auth.self_hosted.httpx.post") as mock_post:
with pytest.raises(RefreshExpiredError):
provider.refresh_session(refresh_token="")
mock_post.assert_not_called()
def test_refresh_network_error_raises_provider_error(self, provider):
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post",
side_effect=httpx.RequestError("boom"),
):
with pytest.raises(ProviderError, match="unreachable"):
provider.refresh_session(refresh_token="rt_x")
def test_revoke_posts_to_revocation_endpoint(self, provider):
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post"
) as mock_post:
provider.revoke_session(refresh_token="rt_x")
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
assert args[0] == f"{_ISSUER}/revoke"
assert kwargs["data"]["token"] == "rt_x"
def test_revoke_empty_token_noop(self, provider):
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post"
) as mock_post:
assert provider.revoke_session(refresh_token="") is None
mock_post.assert_not_called()
def test_revoke_swallows_errors(self, provider):
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post",
side_effect=httpx.RequestError("down"),
):
# Must not raise.
assert provider.revoke_session(refresh_token="rt_x") is None
def test_revoke_noop_when_no_revocation_endpoint(self, provider):
provider._discovery["revocation_endpoint"] = ""
with patch(
"plugins.dashboard_auth.self_hosted.httpx.post"
) as mock_post:
assert provider.revoke_session(refresh_token="rt_x") is None
mock_post.assert_not_called()
# ---------------------------------------------------------------------------
# Plugin entry point: env + config.yaml precedence
# ---------------------------------------------------------------------------
class TestPluginRegister:
@pytest.fixture(autouse=True)
def clear_env(self, monkeypatch):
for var in (
"HERMES_DASHBOARD_OIDC_ISSUER",
"HERMES_DASHBOARD_OIDC_CLIENT_ID",
"HERMES_DASHBOARD_OIDC_SCOPES",
):
monkeypatch.delenv(var, raising=False)
@pytest.fixture
def patch_config(self, monkeypatch):
def _set(oauth_block):
cfg = {}
if oauth_block is not None:
cfg = {"dashboard": {"oauth": oauth_block}}
monkeypatch.setattr("hermes_cli.config.load_config", lambda: cfg)
return _set
def test_skips_when_unconfigured(self, patch_config):
patch_config(None)
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "HERMES_DASHBOARD_OIDC_ISSUER" in oidc_plugin.LAST_SKIP_REASON
assert "self_hosted" in oidc_plugin.LAST_SKIP_REASON
def test_skips_when_only_issuer_set(self, patch_config, monkeypatch):
patch_config(None)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", _ISSUER)
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
def test_registers_from_env(self, patch_config, monkeypatch):
patch_config(None)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", _ISSUER)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", _CLIENT_ID)
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_called_once()
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
assert isinstance(registered, oidc_plugin.SelfHostedOIDCProvider)
assert registered._issuer == _ISSUER
assert registered._client_id == _CLIENT_ID
assert registered._scopes == "openid profile email"
assert oidc_plugin.LAST_SKIP_REASON == ""
def test_registers_from_config_yaml(self, patch_config):
patch_config(
{"self_hosted": {"issuer": _ISSUER, "client_id": _CLIENT_ID}}
)
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_called_once()
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
assert registered._issuer == _ISSUER
assert registered._client_id == _CLIENT_ID
def test_env_overrides_config(self, patch_config, monkeypatch):
patch_config(
{
"self_hosted": {
"issuer": "https://config.example",
"client_id": "config-client",
}
}
)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", _ISSUER)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", _CLIENT_ID)
ctx = MagicMock()
oidc_plugin.register(ctx)
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
assert registered._issuer == _ISSUER
assert registered._client_id == _CLIENT_ID
def test_empty_env_does_not_shadow_config(self, patch_config, monkeypatch):
patch_config(
{"self_hosted": {"issuer": _ISSUER, "client_id": _CLIENT_ID}}
)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_ISSUER", "")
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", "")
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_called_once()
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
assert registered._issuer == _ISSUER
def test_custom_scopes_from_config(self, patch_config):
patch_config(
{
"self_hosted": {
"issuer": _ISSUER,
"client_id": _CLIENT_ID,
"scopes": "openid email",
}
}
)
ctx = MagicMock()
oidc_plugin.register(ctx)
registered = ctx.register_dashboard_auth_provider.call_args.args[0]
assert registered._scopes == "openid email"
def test_config_load_failure_falls_through(self, monkeypatch):
def _broken():
raise OSError("unreadable")
monkeypatch.setattr("hermes_cli.config.load_config", _broken)
ctx = MagicMock()
oidc_plugin.register(ctx) # must not raise
ctx.register_dashboard_auth_provider.assert_not_called()
def test_non_dict_oauth_section_tolerated(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.config.load_config",
lambda: {"dashboard": {"oauth": "wrong type"}},
)
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
def test_non_https_issuer_skips_with_reason(self, patch_config, monkeypatch):
patch_config(None)
monkeypatch.setenv(
"HERMES_DASHBOARD_OIDC_ISSUER", "http://insecure.example"
)
monkeypatch.setenv("HERMES_DASHBOARD_OIDC_CLIENT_ID", _CLIENT_ID)
ctx = MagicMock()
oidc_plugin.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "construction failed" in oidc_plugin.LAST_SKIP_REASON

View File

@ -598,6 +598,132 @@ The `/auth/password-login` endpoint is rate-limited per client IP (default 10 at
`basic` is just one implementation of an extension point. Any plugin can register a password provider: set `supports_password = True` on your `DashboardAuthProvider` subclass and implement `complete_password_login(*, username, password) -> Session` (raise `InvalidCredentialsError` on rejection, `ProviderError` if your backing store is down). The OAuth `start_login` / `complete_login` methods can be left as `NotImplementedError` stubs for a pure-password provider. This is the path for LDAP-bind, a credentials database, or any other non-redirect auth scheme — the framework handles the form, the route, the cookies, and refresh for you.
### Self-hosted OIDC provider
If you run your own identity provider, the bundled `plugins/dashboard_auth/self_hosted` plugin authenticates the dashboard against it using **standard OpenID Connect** — no per-IDP code, no Nous Portal involved. It's verified against and works with any conformant OIDC server:
> **Authentik · Keycloak · Zitadel · Authelia · Auth0 · Okta · Google · …**
Like the Nous provider, it auto-loads and only registers itself once it's configured, so it's a no-op for loopback / `--insecure` dashboards.
#### Configuration
Configure an **issuer** and a **client_id** (a public PKCE client — no client secret). The plugin fetches the IDP's `authorization_endpoint`, `token_endpoint`, and `jwks_uri` from `{issuer}/.well-known/openid-configuration`, so you never hardcode endpoint URLs.
**`config.yaml`** — the canonical surface:
```yaml
dashboard:
oauth:
provider: self-hosted
self_hosted:
issuer: https://auth.example.com/application/o/hermes/ # required
client_id: hermes-dashboard # required
scopes: "openid profile email" # optional (this is the default)
```
**Environment variables** — operator overrides (env wins over `config.yaml` when set non-empty; an empty value is treated as unset):
| Env var | Overrides | Notes |
|---------|-----------|-------|
| `HERMES_DASHBOARD_OIDC_ISSUER` | `dashboard.oauth.self_hosted.issuer` | OIDC issuer URL — required |
| `HERMES_DASHBOARD_OIDC_CLIENT_ID` | `dashboard.oauth.self_hosted.client_id` | Public client id — required |
| `HERMES_DASHBOARD_OIDC_SCOPES` | `dashboard.oauth.self_hosted.scopes` | Defaults to `openid profile email` |
In your IDP, register a **public** application/client with the authorization-code + PKCE (S256) grant and add the dashboard's callback as an allowed redirect URI. The callback is `<dashboard public URL>/auth/callback` (see [Public URL override](#public-url-override) for how the dashboard derives its public URL behind a proxy).
#### What it verifies
The provider verifies the OpenID Connect **ID token** (RS256/ES256) against the discovered `jwks_uri`, with the `iss` and `aud` claims pinned to your configured `issuer` and `client_id`. Standard OIDC claims map onto the dashboard session:
| Session field | Claim(s) |
|---------------|----------|
| `user_id` | `sub` (required) |
| `email` | `email` |
| `display_name` | `name``preferred_username``nickname``email` |
| `org_id` | `org_id` / `organization`, else joined `groups` |
The ID token is what establishes identity — the access token is treated as opaque (the OIDC spec does not require it to be a JWT). Endpoint URLs are required to be HTTPS (loopback `http://` is allowed for local-dev IDPs), and the discovery document's advertised `issuer` must match your configured one (a trailing-slash difference is tolerated). Refresh tokens, when the IDP issues them, are used for silent re-auth via the standard `refresh_token` grant; logout calls the IDP's RFC 7009 `revocation_endpoint` when advertised.
> **Confidential clients** (those with a `client_secret`) are not supported yet — configure a public + PKCE client, which is the typical choice for a browser-facing dashboard.
#### Worked example: Keycloak
[Keycloak](https://www.keycloak.org/) is one of the easiest self-hosted OIDC servers to stand up for a local test — it runs as a single container in dev mode (in-memory DB) and exposes textbook OIDC discovery. This walkthrough gets you from nothing to a working dashboard login in a few minutes.
**1. Run Keycloak with a pre-configured realm.** Save this realm export as `realm-hermes.json` — it defines a `hermes` realm, a **public PKCE client** (`hermes-dashboard`), and a test user, all imported on boot so there's nothing to click in the admin UI:
```json
{
"realm": "hermes",
"enabled": true,
"clients": [
{
"clientId": "hermes-dashboard",
"name": "Hermes Agent Dashboard",
"enabled": true,
"publicClient": true,
"standardFlowEnabled": true,
"protocol": "openid-connect",
"redirectUris": ["http://localhost:9119/auth/callback"],
"webOrigins": ["http://localhost:9119"],
"attributes": { "pkce.code.challenge.method": "S256" }
}
],
"users": [
{
"username": "testuser",
"enabled": true,
"emailVerified": true,
"email": "testuser@example.com",
"firstName": "Test",
"lastName": "User",
"credentials": [
{ "type": "password", "value": "testpassword", "temporary": false }
]
}
]
}
```
Start it (Keycloak 26+), mounting that file into the import directory:
```bash
docker run --rm -p 8080:8080 \
-e KC_BOOTSTRAP_ADMIN_USERNAME=admin \
-e KC_BOOTSTRAP_ADMIN_PASSWORD=admin \
-v "$PWD/realm-hermes.json:/opt/keycloak/data/import/realm-hermes.json:ro" \
quay.io/keycloak/keycloak:26.0 \
start-dev --import-realm
```
Once it's up, the realm advertises standard OIDC discovery at
`http://localhost:8080/realms/hermes/.well-known/openid-configuration` (issuer
`http://localhost:8080/realms/hermes`). The admin console is at
`http://localhost:8080/` (`admin` / `admin`).
**2. Point the dashboard at it.** The self-hosted plugin permits a loopback `http://` issuer (HTTPS is required for any non-loopback issuer), so the local Keycloak works as-is:
```bash
export HERMES_DASHBOARD_OIDC_ISSUER="http://localhost:8080/realms/hermes"
export HERMES_DASHBOARD_OIDC_CLIENT_ID="hermes-dashboard"
export HERMES_DASHBOARD_PUBLIC_URL="http://localhost:9119"
hermes dashboard --host 0.0.0.0 --port 9119 --no-open
```
`HERMES_DASHBOARD_PUBLIC_URL` tells the dashboard its OAuth callback is
`http://localhost:9119/auth/callback` — the redirect URI the realm registered
above. Binding to `0.0.0.0` (a non-loopback bind) without `--insecure` is what
engages the OAuth gate.
**3. Log in.** Open `http://localhost:9119/`, you'll be bounced to `/login`. Click **Sign in with Self-Hosted OIDC** → authenticate at Keycloak as `testuser` / `testpassword` → land back on the authenticated dashboard. The sidebar shows `Logged in as Test User via self-hosted`, and `GET /api/auth/me` returns the verified session (`provider: self-hosted`, `email: testuser@example.com`).
> If you bind or browse on a different host/port, add that origin's
> `…/auth/callback` to the client's **Valid redirect URIs** in the Keycloak
> admin console (Clients → hermes-dashboard → Settings). The same pattern works
> for Authentik, Zitadel, Authelia, and other OIDC servers — only the issuer
> URL and client registration UI differ.
### Public URL override
By default, the dashboard reconstructs the OAuth callback URL from the request — `X-Forwarded-Host` + `X-Forwarded-Proto` + `X-Forwarded-Prefix` (when uvicorn is configured with `proxy_headers=True`, which `start_server` enables under the gate). This works out of the box on Fly.io, which sets all three headers correctly.