refactor(auth): remove vestigial Nous min_key_ttl/inference_auth_mode params

After the legacy session-key path was removed, two parameters became dead
surface on the Nous runtime-resolution chain:

- min_key_ttl_seconds: del'd inside refresh_nous_oauth_pure and pass-through /
  telemetry-only in refresh_nous_oauth_from_state, _try_import_shared_nous_state,
  _nous_device_code_login, and resolve_nous_runtime_credentials. It controlled the
  now-deleted agent-key mint TTL and drives no behavior.
- inference_auth_mode: with the legacy mode gone, AUTO and FRESH are behaviorally
  identical; the value only fed _normalize_nous_inference_auth_mode validation and
  oauth trace output, never a branch.

Removing inference_auth_mode orphaned its whole supporting cluster
(NOUS_INFERENCE_AUTH_MODE_AUTO/FRESH, NOUS_INFERENCE_AUTH_MODES,
_normalize_nous_inference_auth_mode), and dropping min_key_ttl_seconds orphaned
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS — all deleted here.

Updated every caller (run_agent, auxiliary_client, credential_pool, proxy adapter,
runtime_provider, web_server, main, auth_commands, setup) and pruned the matching
test kwargs. Deleted two tests that exercised the removed surface
(test_legacy_auth_mode_is_rejected, test_try_refresh_..._accepts_explicit_auth_mode).

No behavior change: net -134 LOC of dead code.
This commit is contained in:
kshitijk4poor
2026-05-29 14:48:51 +05:30
committed by kshitij
parent 95cf8f9842
commit a22c250001
13 changed files with 23 additions and 157 deletions

View File

@ -1276,15 +1276,10 @@ def _resolve_nous_runtime_api(*, force_refresh: bool = False) -> Optional[tuple[
or the credential pool.
"""
try:
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_AUTO,
resolve_nous_runtime_credentials,
)
from hermes_cli.auth import resolve_nous_runtime_credentials
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh=force_refresh,
)
except Exception as exc:
@ -2742,7 +2737,6 @@ def _refresh_provider_credentials(provider: str) -> bool:
from hermes_cli.auth import resolve_nous_runtime_credentials
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
force_refresh=True,
)

View File

@ -22,7 +22,6 @@ from agent.credential_persistence import (
import hermes_cli.auth as auth_mod
from hermes_cli.auth import (
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
PROVIDER_REGISTRY,
_auth_store_lock,
_codex_access_token_is_expiring,
@ -932,8 +931,6 @@ class CredentialPool:
if synced is not entry:
entry = synced
auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
inference_auth_mode=auth_mod.NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh=force,
)
updated = self._sync_nous_entry_from_auth_store(entry)

View File

@ -73,14 +73,7 @@ DEFAULT_NOUS_CLIENT_ID = "hermes-cli"
NOUS_INFERENCE_INVOKE_SCOPE = "inference:invoke"
DEFAULT_NOUS_SCOPE = NOUS_INFERENCE_INVOKE_SCOPE
NOUS_DEVICE_CODE_SOURCE = "device_code"
NOUS_INFERENCE_AUTH_MODE_AUTO = "auto"
NOUS_INFERENCE_AUTH_MODE_FRESH = "fresh"
NOUS_INFERENCE_AUTH_MODES = frozenset({
NOUS_INFERENCE_AUTH_MODE_AUTO,
NOUS_INFERENCE_AUTH_MODE_FRESH,
})
NOUS_AUTH_PATH_INVOKE_JWT = "invoke_jwt"
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
NOUS_INVOKE_JWT_MIN_TTL_SECONDS = ACCESS_TOKEN_REFRESH_SKEW_SECONDS
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
@ -1733,17 +1726,6 @@ def _scope_values(raw_scope: Any) -> set[str]:
return scopes
def _normalize_nous_inference_auth_mode(inference_auth_mode: Optional[str]) -> str:
mode = str(inference_auth_mode or NOUS_INFERENCE_AUTH_MODE_AUTO).strip().lower()
if mode not in NOUS_INFERENCE_AUTH_MODES:
allowed = ", ".join(sorted(NOUS_INFERENCE_AUTH_MODES))
raise ValueError(
"Invalid Nous inference auth mode "
f"{inference_auth_mode!r}; expected one of: {allowed}"
)
return mode
def _nous_invoke_jwt_status(
token: Any,
*,
@ -4649,7 +4631,6 @@ def _quarantine_nous_pool_entries(
def _try_import_shared_nous_state(
*,
timeout_seconds: float = 15.0,
min_key_ttl_seconds: int = 5 * 60,
) -> Optional[Dict[str, Any]]:
"""Attempt to rehydrate Nous OAuth state from the shared store.
@ -4692,10 +4673,8 @@ def _try_import_shared_nous_state(
refreshed = refresh_nous_oauth_from_state(
state,
min_key_ttl_seconds=min_key_ttl_seconds,
timeout_seconds=timeout_seconds,
force_refresh=True,
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_FRESH,
on_state_update=_persist_shared_refresh,
)
_write_shared_nous_state(refreshed)
@ -4965,12 +4944,10 @@ def refresh_nous_oauth_pure(
expires_at: Optional[str] = None,
agent_key: Optional[str] = None,
agent_key_expires_at: Optional[str] = None,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
insecure: Optional[bool] = None,
ca_bundle: Optional[str] = None,
force_refresh: bool = False,
inference_auth_mode: str = NOUS_INFERENCE_AUTH_MODE_AUTO,
on_state_update: Optional[Callable[[Dict[str, Any], str], None]] = None,
) -> Dict[str, Any]:
"""Refresh Nous OAuth state without mutating auth.json directly.
@ -4979,7 +4956,6 @@ def refresh_nous_oauth_pure(
Callers that own persistent state can use it to save the newly rotated
refresh token before later validation can fail.
"""
_normalize_nous_inference_auth_mode(inference_auth_mode)
state: Dict[str, Any] = {
"access_token": access_token,
"refresh_token": refresh_token,
@ -5001,7 +4977,6 @@ def refresh_nous_oauth_pure(
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
del min_key_ttl_seconds
current_invoke_jwt_status = _nous_invoke_jwt_status(
state.get("access_token"),
scope=state.get("scope"),
@ -5056,10 +5031,8 @@ def refresh_nous_oauth_pure(
def refresh_nous_oauth_from_state(
state: Dict[str, Any],
*,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
force_refresh: bool = False,
inference_auth_mode: str = NOUS_INFERENCE_AUTH_MODE_AUTO,
on_state_update: Optional[Callable[[Dict[str, Any], str], None]] = None,
) -> Dict[str, Any]:
"""Refresh Nous OAuth from a state dict. Thin wrapper around refresh_nous_oauth_pure."""
@ -5076,12 +5049,10 @@ def refresh_nous_oauth_from_state(
expires_at=state.get("expires_at"),
agent_key=state.get("agent_key"),
agent_key_expires_at=state.get("agent_key_expires_at"),
min_key_ttl_seconds=min_key_ttl_seconds,
timeout_seconds=timeout_seconds,
insecure=tls.get("insecure"),
ca_bundle=tls.get("ca_bundle"),
force_refresh=force_refresh,
inference_auth_mode=inference_auth_mode,
on_state_update=on_state_update,
)
@ -5157,11 +5128,9 @@ def _sync_nous_pool_from_auth_store() -> None:
def resolve_nous_runtime_credentials(
*,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
insecure: Optional[bool] = None,
ca_bundle: Optional[str] = None,
inference_auth_mode: str = NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh: bool = False,
) -> Dict[str, Any]:
"""
@ -5173,8 +5142,6 @@ def resolve_nous_runtime_credentials(
Returns dict with: provider, base_url, api_key, key_id, expires_at,
expires_in, source ("invoke_jwt"), and auth_path.
"""
inference_auth_mode = _normalize_nous_inference_auth_mode(inference_auth_mode)
min_key_ttl_seconds = max(60, int(min_key_ttl_seconds))
sequence_id = uuid.uuid4().hex[:12]
with _auth_store_lock():
@ -5246,8 +5213,6 @@ def resolve_nous_runtime_credentials(
_oauth_trace(
"nous_runtime_credentials_start",
sequence_id=sequence_id,
inference_auth_mode=inference_auth_mode,
min_key_ttl_seconds=min_key_ttl_seconds,
refresh_token_fp=_token_fingerprint(state.get("refresh_token")),
)
@ -5550,7 +5515,7 @@ def _compute_nous_auth_status() -> Dict[str, Any]:
"source": "auth_store",
}
try:
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=60)
creds = resolve_nous_runtime_credentials()
refreshed_state = get_provider_auth_state("nous") or state
base_status.update(
{
@ -7348,7 +7313,6 @@ def _nous_device_code_login(
timeout_seconds: float = 15.0,
insecure: bool = False,
ca_bundle: Optional[str] = None,
min_key_ttl_seconds: int = 5 * 60,
) -> Dict[str, Any]:
"""Run the Nous device-code flow and return full OAuth state without persisting."""
pconfig = PROVIDER_REGISTRY["nous"]
@ -7450,10 +7414,8 @@ def _nous_device_code_login(
try:
return refresh_nous_oauth_from_state(
auth_state,
min_key_ttl_seconds=min_key_ttl_seconds,
timeout_seconds=timeout_seconds,
force_refresh=False,
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_FRESH,
)
except AuthError as exc:
if exc.code == "subscription_required":
@ -7505,7 +7467,6 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
print("Rehydrating Nous session from shared credentials...")
auth_state = _try_import_shared_nous_state(
timeout_seconds=timeout_seconds,
min_key_ttl_seconds=5 * 60,
)
if auth_state is None:
print("Could not refresh shared credentials — falling back to device-code login.")
@ -7520,7 +7481,6 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
timeout_seconds=timeout_seconds,
insecure=insecure,
ca_bundle=ca_bundle,
min_key_ttl_seconds=5 * 60,
)
inference_base_url = auth_state["inference_base_url"]

View File

@ -272,9 +272,6 @@ def auth_add_command(args) -> None:
print("Rehydrating Nous session from shared credentials...")
rehydrated = auth_mod._try_import_shared_nous_state(
timeout_seconds=getattr(args, "timeout", None) or 15.0,
min_key_ttl_seconds=max(
60, int(getattr(args, "min_key_ttl_seconds", 5 * 60))
),
)
if rehydrated is not None:
custom_label = (getattr(args, "label", None) or "").strip() or None
@ -297,7 +294,6 @@ def auth_add_command(args) -> None:
timeout_seconds=getattr(args, "timeout", None) or 15.0,
insecure=bool(getattr(args, "insecure", False)),
ca_bundle=getattr(args, "ca_bundle", None),
min_key_ttl_seconds=max(60, int(getattr(args, "min_key_ttl_seconds", 5 * 60))),
)
# Honor `--label <name>` so nous matches other providers' UX. The
# helper embeds this into providers.nous so that label_from_token

View File

@ -3071,7 +3071,7 @@ def _model_flow_nous(config, current_model="", args=None):
# Verify credentials are still valid (catches expired sessions early)
try:
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
creds = resolve_nous_runtime_credentials()
except Exception as exc:
relogin = isinstance(exc, AuthError) and exc.relogin_required
msg = format_auth_error(exc) if isinstance(exc, AuthError) else str(exc)
@ -3105,7 +3105,6 @@ def _model_flow_nous(config, current_model="", args=None):
if not free_tier:
try:
refreshed_creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=5 * 60,
force_refresh=True,
)
if refreshed_creds:

View File

@ -14,7 +14,6 @@ from typing import Any, Dict, FrozenSet, Optional
from hermes_cli.auth import (
AuthError,
DEFAULT_NOUS_INFERENCE_URL,
NOUS_INFERENCE_AUTH_MODE_AUTO,
_load_auth_store,
_auth_store_lock,
_is_terminal_nous_refresh_error,
@ -74,9 +73,7 @@ class NousPortalAdapter(UpstreamAdapter):
)
def get_credential(self) -> UpstreamCredential:
return self._get_credential(
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_AUTO,
)
return self._get_credential()
def get_retry_credential(
self,
@ -89,14 +86,12 @@ class NousPortalAdapter(UpstreamAdapter):
return None
logger.info("proxy: Nous upstream rejected bearer; force-refreshing invoke JWT")
return self._get_credential(
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_AUTO,
force_refresh=True,
)
def _get_credential(
self,
*,
inference_auth_mode: str,
force_refresh: bool = False,
) -> UpstreamCredential:
with self._lock:
@ -108,7 +103,6 @@ class NousPortalAdapter(UpstreamAdapter):
try:
refreshed = resolve_nous_runtime_credentials(
inference_auth_mode=inference_auth_mode,
force_refresh=force_refresh,
)
except AuthError as exc:

View File

@ -1129,7 +1129,6 @@ def _resolve_explicit_runtime(
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
if not api_key:
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
api_key = creds.get("api_key", "")
@ -1344,7 +1343,6 @@ def resolve_runtime_provider(
if provider == "nous":
try:
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
return {

View File

@ -781,7 +781,6 @@ def setup_model_provider(config: dict, *, quick: bool = False):
timeout=15.0,
insecure=False,
ca_bundle=None,
min_key_ttl_seconds=5 * 60,
)
)
pool = load_pool(selected_provider)
@ -2975,7 +2974,6 @@ def _run_portal_one_shot(config: dict) -> None:
timeout=None,
insecure=False,
ca_bundle=None,
min_key_ttl_seconds=5 * 60,
)
try:
auth_add_command(ns)

View File

@ -2065,7 +2065,6 @@ async def _start_device_code_flow(provider_id: str) -> Dict[str, Any]:
def _nous_poller(session_id: str) -> None:
"""Background poller that drives a Nous device-code flow to completion."""
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_FRESH,
_poll_for_token,
refresh_nous_oauth_from_state,
)
@ -2111,10 +2110,8 @@ def _nous_poller(session_id: str) -> None:
}
full_state = refresh_nous_oauth_from_state(
auth_state,
min_key_ttl_seconds=300,
timeout_seconds=15.0,
force_refresh=False,
inference_auth_mode=NOUS_INFERENCE_AUTH_MODE_FRESH,
)
from hermes_cli.auth import persist_nous_credentials
persist_nous_credentials(full_state)

View File

@ -2999,22 +2999,15 @@ class AIAgent:
self,
*,
force: bool = True,
inference_auth_mode: str | None = None,
) -> bool:
if self.api_mode != "chat_completions" or self.provider != "nous":
return False
try:
from hermes_cli.auth import (
NOUS_INFERENCE_AUTH_MODE_AUTO,
resolve_nous_runtime_credentials,
)
from hermes_cli.auth import resolve_nous_runtime_credentials
selected_auth_mode = inference_auth_mode or NOUS_INFERENCE_AUTH_MODE_AUTO
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
inference_auth_mode=selected_auth_mode,
force_refresh=force,
)
except Exception as exc:

View File

@ -200,7 +200,7 @@ def test_resolve_nous_runtime_credentials_prefers_invoke_jwt_and_mirrors(
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = auth_mod.resolve_nous_runtime_credentials()
assert creds["api_key"] == token
assert creds["source"] == auth_mod.NOUS_AUTH_PATH_INVOKE_JWT
@ -276,7 +276,7 @@ def test_resolve_nous_runtime_credentials_invoke_jwt_is_idempotent(
lambda: sync_calls.append(True),
)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = auth_mod.resolve_nous_runtime_credentials()
assert creds["api_key"] == token
assert creds["source"] == auth_mod.NOUS_AUTH_PATH_INVOKE_JWT
@ -314,7 +314,7 @@ def test_resolve_nous_runtime_credentials_trusts_invoke_jwt_exp_over_stale_metad
monkeypatch.setattr(auth_mod, "_refresh_access_token", _unexpected_refresh)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = auth_mod.resolve_nous_runtime_credentials()
assert creds["api_key"] == token
assert creds["source"] == auth_mod.NOUS_AUTH_PATH_INVOKE_JWT
@ -342,7 +342,7 @@ def test_resolve_nous_runtime_credentials_does_not_apply_agent_key_ttl_to_invoke
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=1800)
creds = auth_mod.resolve_nous_runtime_credentials()
assert creds["api_key"] == token
assert creds["source"] == auth_mod.NOUS_AUTH_PATH_INVOKE_JWT
@ -386,7 +386,7 @@ def test_resolve_nous_runtime_credentials_refreshes_legacy_agent_key_to_invoke_j
monkeypatch.setattr(auth_mod, "_refresh_access_token", _fake_refresh_access_token)
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = auth_mod.resolve_nous_runtime_credentials()
assert refresh_calls == ["refresh-old"]
assert creds["api_key"] == refreshed_token
@ -400,27 +400,6 @@ def test_resolve_nous_runtime_credentials_refreshes_legacy_agent_key_to_invoke_j
assert payload["credential_pool"]["nous"][0]["agent_key"] == refreshed_token
def test_legacy_auth_mode_is_rejected(tmp_path, monkeypatch):
import hermes_cli.auth as auth_mod
hermes_home = tmp_path / "hermes"
token = _invoke_jwt(seconds=3600)
_setup_nous_auth(
hermes_home,
access_token=token,
scope=auth_mod.DEFAULT_NOUS_SCOPE,
expires_at=_future_iso(3600),
expires_in=3600,
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
with pytest.raises(ValueError, match="Invalid Nous inference auth mode"):
auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
inference_auth_mode="legacy",
)
def test_resolve_nous_runtime_credentials_reauths_when_invoke_scope_missing(
tmp_path,
monkeypatch,
@ -444,7 +423,7 @@ def test_resolve_nous_runtime_credentials_reauths_when_invoke_scope_missing(
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
with pytest.raises(AuthError) as exc:
auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
auth_mod.resolve_nous_runtime_credentials()
assert exc.value.code == "missing_inference_invoke_scope"
assert exc.value.relogin_required is True
@ -500,7 +479,7 @@ def test_removed_legacy_session_env_var_does_not_change_jwt_auth(tmp_path, monke
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HERMES_AGENT_USE_LEGACY_SESSION_KEYS", "true")
creds = auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = auth_mod.resolve_nous_runtime_credentials()
assert creds["api_key"] == token
payload = json.loads((hermes_home / "auth.json").read_text())
@ -579,7 +558,6 @@ def test_nous_inference_auth_logs_do_not_include_secret_values(
caplog.set_level(logging.INFO, logger="hermes_cli.auth")
auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
force_refresh=True,
)
@ -678,7 +656,7 @@ def test_get_nous_auth_status_auth_store_fallback(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials",
lambda min_key_ttl_seconds=60: {
lambda **kwargs: {
"base_url": "https://inference.example.com/v1",
"expires_at": "2099-01-01T00:00:00+00:00",
"key_id": "key-1",
@ -718,7 +696,7 @@ def test_get_nous_auth_status_prefers_runtime_auth_store_over_stale_pool(tmp_pat
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials",
lambda min_key_ttl_seconds=60: {
lambda **kwargs: {
"base_url": "https://inference.example.com/v1",
"expires_at": "2099-01-01T00:00:00+00:00",
"key_id": "key-fresh",
@ -740,7 +718,7 @@ def test_get_nous_auth_status_reports_revoked_refresh_session(tmp_path, monkeypa
_setup_nous_auth(hermes_home, access_token="at-123")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
def _boom(min_key_ttl_seconds=60):
def _boom(**kwargs):
raise AuthError("Refresh session has been revoked", provider="nous", relogin_required=True)
monkeypatch.setattr("hermes_cli.auth.resolve_nous_runtime_credentials", _boom)
@ -803,7 +781,7 @@ def test_refresh_token_persisted_when_refreshed_jwt_lacks_invoke_scope(tmp_path,
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
resolve_nous_runtime_credentials()
assert exc.value.code == "missing_inference_invoke_scope"
state_after_failure = get_provider_auth_state("nous")
@ -811,7 +789,7 @@ def test_refresh_token_persisted_when_refreshed_jwt_lacks_invoke_scope(tmp_path,
assert state_after_failure["refresh_token"] == "refresh-1"
assert state_after_failure["access_token"] == bad_jwt
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = resolve_nous_runtime_credentials()
assert creds["api_key"] == good_jwt
assert refresh_calls == ["refresh-old", "refresh-1"]
@ -836,7 +814,7 @@ def test_refresh_token_persisted_when_refreshed_token_is_not_jwt(tmp_path, monke
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
resolve_nous_runtime_credentials()
assert exc.value.code == "access_token_not_jwt"
state_after_failure = get_provider_auth_state("nous")
@ -882,7 +860,7 @@ def test_terminal_refresh_failure_quarantines_tokens(
monkeypatch.setattr(auth_mod, "_refresh_access_token", _terminal_refresh_failure)
with pytest.raises(AuthError, match="Refresh session has been revoked"):
auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
auth_mod.resolve_nous_runtime_credentials()
state_after_failure = auth_mod.get_provider_auth_state("nous")
assert state_after_failure is not None
@ -895,7 +873,7 @@ def test_terminal_refresh_failure_quarantines_tokens(
assert payload.get("credential_pool", {}).get("nous") == []
with pytest.raises(AuthError, match="No access token found"):
auth_mod.resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
auth_mod.resolve_nous_runtime_credentials()
assert refresh_calls == ["refresh-old"]
@ -968,9 +946,9 @@ def test_unusable_access_token_refresh_uses_latest_rotated_refresh_token(tmp_pat
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
with pytest.raises(AuthError) as exc:
resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
resolve_nous_runtime_credentials()
assert exc.value.code == "access_token_not_jwt"
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=300)
creds = resolve_nous_runtime_credentials()
assert creds["api_key"] == good_jwt
assert refresh_calls == ["refresh-old", "refresh-1"]
@ -1251,7 +1229,6 @@ def test_persist_nous_credentials_allows_recovery_from_401(tmp_path, monkeypatch
monkeypatch.setattr("hermes_cli.auth._refresh_access_token", _fake_refresh_access_token)
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
force_refresh=True,
)
assert creds["api_key"] == new_jwt
@ -1788,10 +1765,6 @@ def test_try_import_shared_rehydrates_on_success(shared_store_env, monkeypatch):
def _fake_refresh(state, **kwargs):
# Simulate portal returning a fresh inference JWT.
assert kwargs.get("force_refresh") is True
assert (
kwargs.get("inference_auth_mode")
== auth_mod.NOUS_INFERENCE_AUTH_MODE_FRESH
)
return {
**state,
"access_token": fresh_jwt,
@ -1914,10 +1887,7 @@ def test_runtime_refresh_uses_newer_shared_token_before_local_stale_token(
monkeypatch.setattr(auth_mod, "_refresh_access_token", _refresh_should_not_happen)
creds = auth_mod.resolve_nous_runtime_credentials(
min_key_ttl_seconds=300,
inference_auth_mode=auth_mod.NOUS_INFERENCE_AUTH_MODE_FRESH,
)
creds = auth_mod.resolve_nous_runtime_credentials()
assert creds["api_key"] == shared_token

View File

@ -176,7 +176,6 @@ def test_nous_adapter_retry_credential_force_refreshes_on_jwt_401(tmp_path, monk
assert cred is not None
assert cred.bearer == "fresh-jwt-bearer"
assert mock_resolve.call_args.kwargs["force_refresh"] is True
assert mock_resolve.call_args.kwargs["inference_auth_mode"] == "auto"
def test_nous_adapter_retry_credential_skips_non_401(tmp_path, monkeypatch):

View File

@ -4063,7 +4063,6 @@ class TestNousCredentialRefresh:
assert ok is True
assert closed["value"] is True
assert captured["inference_auth_mode"] == "auto"
assert captured["force_refresh"] is True
assert rebuilt["kwargs"]["api_key"] == "new-nous-key"
assert (
@ -4072,34 +4071,6 @@ class TestNousCredentialRefresh:
assert "default_headers" not in rebuilt["kwargs"]
assert isinstance(agent.client, _RebuiltClient)
def test_try_refresh_nous_client_credentials_accepts_explicit_auth_mode(
self, agent, monkeypatch
):
agent.provider = "nous"
agent.api_mode = "chat_completions"
captured = {}
def _fake_resolve(**kwargs):
captured.update(kwargs)
return {
"api_key": "new-nous-key",
"base_url": "https://inference-api.nousresearch.com/v1",
}
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials", _fake_resolve
)
with patch("run_agent.OpenAI", return_value=MagicMock()):
ok = agent._try_refresh_nous_client_credentials(
force=False,
inference_auth_mode="fresh",
)
assert ok is True
assert captured["inference_auth_mode"] == "fresh"
assert captured["force_refresh"] is False
class TestCredentialPoolRecovery:
def test_recover_with_pool_rotates_on_402(self, agent):