Merge origin/main into bb/desktop-profile-support

Resolve conflicts in desktop settings/cron/messaging/sidebar: adopt main's
ListRow + actions-menu refactors for credential rows; keep our profileColor
import on the sidebar. Drop the now-orphaned Tip-based helpers.
This commit is contained in:
Brooklyn Nicholson
2026-06-04 20:17:07 -05:00
144 changed files with 5796 additions and 1681 deletions

View File

@ -0,0 +1,201 @@
"""Tests for the update_session_meta fix.
Verifies that:
1. SessionDB.update_session_meta() exists and works correctly via the
public _execute_write path (not db._lock / db._conn directly).
2. session.py _persist() no longer touches db._lock or db._conn.
3. update_session_meta updates the correct columns atomically.
"""
import ast
import json
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
from hermes_state import SessionDB
from acp_adapter.session import SessionManager
def _tmp_db(tmp_path):
return SessionDB(db_path=tmp_path / "state.db")
def _mock_agent():
return MagicMock(name="MockAIAgent")
# ---------------------------------------------------------------------------
# hermes_state.SessionDB.update_session_meta — unit tests
# ---------------------------------------------------------------------------
class TestUpdateSessionMeta:
"""Direct unit tests for the new public method."""
def test_method_exists(self, tmp_path):
db = _tmp_db(tmp_path)
assert hasattr(db, "update_session_meta"), (
"SessionDB must have update_session_meta() public method"
)
assert callable(db.update_session_meta)
def test_updates_model_config(self, tmp_path):
db = _tmp_db(tmp_path)
db.create_session("s1", source="acp", model="gpt-4")
new_meta = json.dumps({"cwd": "/new/path", "provider": "openai"})
db.update_session_meta("s1", new_meta, model=None)
row = db.get_session("s1")
stored = json.loads(row["model_config"])
assert stored["cwd"] == "/new/path"
assert stored["provider"] == "openai"
def test_updates_model_when_provided(self, tmp_path):
db = _tmp_db(tmp_path)
db.create_session("s2", source="acp", model="gpt-3.5")
db.update_session_meta("s2", json.dumps({"cwd": "."}), model="gpt-4o")
row = db.get_session("s2")
assert row["model"] == "gpt-4o"
def test_preserves_existing_model_when_none(self, tmp_path):
"""Passing model=None must leave the stored model unchanged (COALESCE)."""
db = _tmp_db(tmp_path)
db.create_session("s3", source="acp", model="claude-3")
db.update_session_meta("s3", json.dumps({"cwd": "."}), model=None)
row = db.get_session("s3")
assert row["model"] == "claude-3"
def test_uses_execute_write_not_private_api(self, tmp_path):
"""update_session_meta must route through _execute_write, not _conn directly."""
db = _tmp_db(tmp_path)
db.create_session("s4", source="acp")
call_count = [0]
original = db._execute_write
def patched(fn):
call_count[0] += 1
return original(fn)
db._execute_write = patched
db.update_session_meta("s4", json.dumps({"cwd": "."}), model="m")
assert call_count[0] >= 1, (
"update_session_meta must call _execute_write at least once"
)
def test_noop_on_nonexistent_session(self, tmp_path):
"""Updating a non-existent session must not raise."""
db = _tmp_db(tmp_path)
db.update_session_meta("ghost", json.dumps({"cwd": "."}), model=None)
# ---------------------------------------------------------------------------
# AST check: session.py must not access db._lock or db._conn
# ---------------------------------------------------------------------------
class TestNoPrviateDBAccess:
"""_persist() in session.py must not access db._lock or db._conn."""
def test_no_db_private_lock_access(self):
with open("acp_adapter/session.py", encoding="utf-8") as f:
source = f.read()
tree = ast.parse(source)
violations = []
for node in ast.walk(tree):
# Looking for: db._lock or db._conn
if isinstance(node, ast.Attribute):
if isinstance(node.value, ast.Name) and node.value.id == "db":
if node.attr in ("_lock", "_conn"):
violations.append(
f"db.{node.attr} at line {node.lineno}"
)
assert violations == [], (
"session.py accesses private SessionDB internals: "
+ ", ".join(violations)
+ " — use db.update_session_meta() instead"
)
def test_persist_calls_update_session_meta(self):
"""AST check: _persist must call db.update_session_meta()."""
with open("acp_adapter/session.py", encoding="utf-8") as f:
tree = ast.parse(f.read())
found = False
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name == "_persist":
for child in ast.walk(node):
if isinstance(child, ast.Call):
func = child.func
if isinstance(func, ast.Attribute):
if func.attr == "update_session_meta":
found = True
break
break
assert found, (
"_persist() must call db.update_session_meta() "
"instead of db._conn.execute() directly"
)
# ---------------------------------------------------------------------------
# Integration: _persist round-trip via SessionManager
# ---------------------------------------------------------------------------
class TestPersistRoundTrip:
"""End-to-end: save a session and verify DB state is correct."""
def test_cwd_persisted_via_update_session_meta(self, tmp_path):
db = _tmp_db(tmp_path)
manager = SessionManager(agent_factory=_mock_agent, db=db)
state = manager.create_session(cwd="/original")
assert db.get_session(state.session_id) is not None
# Simulate cwd change and save
state.cwd = "/updated"
manager.save_session(state.session_id)
row = db.get_session(state.session_id)
mc = json.loads(row["model_config"])
assert mc["cwd"] == "/updated"
def test_model_persisted_via_update_session_meta(self, tmp_path):
db = _tmp_db(tmp_path)
manager = SessionManager(agent_factory=_mock_agent, db=db)
state = manager.create_session()
state.model = "new-model-xyz"
manager.save_session(state.session_id)
row = db.get_session(state.session_id)
assert row["model"] == "new-model-xyz"
def test_existing_model_not_cleared_on_save(self, tmp_path):
"""If state.model is empty, the DB model column must not be overwritten."""
db = _tmp_db(tmp_path)
manager = SessionManager(agent_factory=_mock_agent, db=db)
state = manager.create_session()
# Manually set a model in DB
db.update_session_meta(state.session_id, json.dumps({"cwd": "."}), model="stored-model")
# Now save with empty model
state.model = ""
manager.save_session(state.session_id)
row = db.get_session(state.session_id)
assert row["model"] == "stored-model", (
"COALESCE must preserve the existing model when new value is NULL"
)

View File

@ -98,6 +98,16 @@ class TestIsLocalEndpoint:
def test_container_dns_names(self, url):
assert is_local_endpoint(url) is True
@pytest.mark.parametrize("url", [
"http://ollama:11434",
"http://litellm:4000/v1",
"http://hermes-litellm:8080",
"http://vllm:8000",
])
def test_unqualified_docker_hostnames(self, url):
"""Unqualified hostnames (no dots) are local — Docker Compose, /etc/hosts, etc."""
assert is_local_endpoint(url) is True
@pytest.mark.parametrize("url", [
"https://api.openai.com",
"https://openrouter.ai/api",

View File

@ -16,7 +16,6 @@ import json
import os
import sys
import types
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -1651,48 +1650,6 @@ class TestUserOAuthHelper:
},
)
def test_client_secret_is_shared_across_profiles(self, tmp_path, monkeypatch):
"""The OAuth client secret is host-wide infra: a secret seeded at the
default root by the documented one-time `--client-secret` host step
must be visible to a gateway running under a named profile.
Regression: `_client_secret_path()` used to scope to the active
HERMES_HOME, so a profile gateway reported 'No client credentials
stored on the host' even after the host setup had been run.
"""
root = tmp_path / ".hermes"
profile_home = root / "profiles" / "bot1"
profile_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Seed the secret at the default root, as the host setup does.
secret = root / "google_chat_user_client_secret.json"
secret.write_text("{}", encoding="utf-8")
# Resolve from inside a named profile.
monkeypatch.setenv("HERMES_HOME", str(profile_home))
from plugins.platforms.google_chat.oauth import _client_secret_path
assert _client_secret_path() == secret
assert _client_secret_path().exists()
def test_profile_local_client_secret_takes_precedence(self, tmp_path, monkeypatch):
"""A profile-local secret (separate OAuth app per bot, or a legacy
profile-scoped seed) overrides the host-wide default when present."""
root = tmp_path / ".hermes"
profile_home = root / "profiles" / "bot1"
profile_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(profile_home))
(root / "google_chat_user_client_secret.json").write_text(
"{}", encoding="utf-8"
)
profile_secret = profile_home / "google_chat_user_client_secret.json"
profile_secret.write_text("{}", encoding="utf-8")
from plugins.platforms.google_chat.oauth import _client_secret_path
assert _client_secret_path() == profile_secret
def test_store_client_secret_writes_private_json(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
src = tmp_path / "client_secret.json"

View File

@ -954,6 +954,120 @@ class TestMediaDeliveryDefaultMode:
out = BasePlatformAdapter.filter_local_delivery_paths([str(notes)])
assert out == [str(notes.resolve())]
def test_root_home_deliverable_is_accepted(self, tmp_path, monkeypatch):
"""The motivating bug (#38106): a root-run gateway has ``$HOME=/root``,
which is on the system-prefix denylist. A plain deliverable the agent
produced in its working dir (``/root/work/proposal.docx``) must still
deliver — the home itself is not a credential location.
"""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "root"
workdir = fake_home / "work"
workdir.mkdir(parents=True)
doc = workdir / "proposal.docx"
doc.write_bytes(b"PK\x03\x04")
monkeypatch.setenv("HOME", str(fake_home))
# $HOME is itself on the denied-prefix list, mirroring /root.
monkeypatch.setattr(
"gateway.platforms.base._MEDIA_DELIVERY_DENIED_PREFIXES",
(str(fake_home),),
)
assert (
BasePlatformAdapter.validate_media_delivery_path(str(doc))
== str(doc.resolve())
)
def test_root_home_credential_subdir_still_blocked(self, tmp_path, monkeypatch):
"""The $HOME exception must NOT un-block credential sub-dirs inside
home. ``/root/.ssh/id_rsa`` stays denied because ``~/.ssh`` is a
separate, more-specific denylist entry — even when $HOME is itself a
denied prefix.
"""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "root"
ssh_dir = fake_home / ".ssh"
ssh_dir.mkdir(parents=True)
key = ssh_dir / "id_rsa"
key.write_bytes(b"-----BEGIN OPENSSH PRIVATE KEY-----")
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setattr(
"gateway.platforms.base._MEDIA_DELIVERY_DENIED_PREFIXES",
(str(fake_home),),
)
assert BasePlatformAdapter.validate_media_delivery_path(str(key)) is None
def test_root_home_hermes_env_still_blocked(self, tmp_path, monkeypatch):
"""``~/.hermes/.env`` stays blocked under the $HOME exception — it is a
more-specific denied path, not reachable just because home is allowed.
"""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "root"
hermes_dir = fake_home / ".hermes"
hermes_dir.mkdir(parents=True)
env_file = hermes_dir / ".env"
env_file.write_text("OPENROUTER_API_KEY=sk-...")
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setattr(
"gateway.platforms.base._MEDIA_DELIVERY_DENIED_PREFIXES",
(str(fake_home),),
)
monkeypatch.setattr("gateway.platforms.base._HERMES_HOME", hermes_dir)
assert BasePlatformAdapter.validate_media_delivery_path(str(env_file)) is None
def test_other_users_home_still_blocked_for_nonroot(self, tmp_path, monkeypatch):
"""The exception only un-blocks the *running user's own* home. A
non-root gateway ($HOME=/home/me) must not deliver another user's home
(``/root/...``) — that prefix stays denied because it isn't $HOME.
"""
self._patch_roots(monkeypatch)
my_home = tmp_path / "home" / "me"
my_home.mkdir(parents=True)
other_home = tmp_path / "root"
other_home.mkdir()
other_file = other_home / "secret.docx"
other_file.write_bytes(b"PK\x03\x04")
monkeypatch.setenv("HOME", str(my_home))
# Both my home and the other home are denied prefixes; only my home is
# the running user's $HOME, so the other home must stay blocked.
monkeypatch.setattr(
"gateway.platforms.base._MEDIA_DELIVERY_DENIED_PREFIXES",
(str(my_home), str(other_home)),
)
assert (
BasePlatformAdapter.validate_media_delivery_path(str(other_file)) is None
)
def test_root_home_workdir_symlink_to_credential_blocked(self, tmp_path, monkeypatch):
"""A symlink in the workdir pointing at a credential is rejected on its
resolved target, even under the $HOME exception.
"""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "root"
ssh_dir = fake_home / ".ssh"
ssh_dir.mkdir(parents=True)
key = ssh_dir / "id_rsa"
key.write_bytes(b"-----BEGIN OPENSSH PRIVATE KEY-----")
workdir = fake_home / "work"
workdir.mkdir()
link = workdir / "innocent.pdf"
link.symlink_to(key)
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setattr(
"gateway.platforms.base._MEDIA_DELIVERY_DENIED_PREFIXES",
(str(fake_home),),
)
assert BasePlatformAdapter.validate_media_delivery_path(str(link)) is None
# ---------------------------------------------------------------------------
# should_send_media_as_audio

View File

@ -228,3 +228,54 @@ class TestSessionDbCloseOnShutdown:
flaky_db.close.assert_called_once()
healthy_db.close.assert_called_once()
class TestSessionResetZombieRace:
"""Regression for #28686 — a session_reset racing the in-flight run's
guarded release must not leave a dead agent locking the slot forever.
"""
def test_generation_guard_blocks_then_unconditional_release_evicts(self):
runner = _make_runner()
runner._session_run_generation = {}
key = "agent:main:telegram:private:1"
gen_n = runner._begin_session_run_generation(key)
dead_agent = MagicMock()
runner._running_agents[key] = dead_agent
runner._running_agents_ts[key] = 1.0
runner._busy_ack_ts[key] = 1.0
# session_reset bumps the generation while gen-N is still in flight.
runner._invalidate_session_run_generation(key, reason="session_reset")
# gen-N's own guarded release is correctly blocked — slot would be a
# zombie if nothing else cleared it (the pre-fix behaviour).
assert runner._release_running_agent_state(key, run_generation=gen_n) is False
assert runner._running_agents.get(key) is dead_agent
# The fix: unconditional release (no run_generation) always clears it.
assert runner._release_running_agent_state(key) is True
assert key not in runner._running_agents
assert key not in runner._running_agents_ts
assert key not in runner._busy_ack_ts
def test_normal_completion_is_not_evicted_by_outer_release(self):
"""Guarded release with the current generation succeeds; the outer
unconditional release that follows is a harmless no-op.
"""
runner = _make_runner()
runner._session_run_generation = {}
key = "agent:main:telegram:private:2"
gen = runner._begin_session_run_generation(key)
runner._running_agents[key] = MagicMock()
runner._running_agents_ts[key] = 1.0
runner._busy_ack_ts[key] = 1.0
assert runner._release_running_agent_state(key, run_generation=gen) is True
assert key not in runner._running_agents
# Outer finally runs the unconditional release after — nothing stranded.
assert runner._release_running_agent_state(key) is True
assert key not in runner._running_agents_ts
assert key not in runner._busy_ack_ts

View File

@ -17,18 +17,12 @@ from pathlib import Path
import pytest
def _make_auth_store(
pool: dict | None = None,
providers: dict | None = None,
active_provider: str | None = None,
) -> dict:
def _make_auth_store(pool: dict | None = None, providers: dict | None = None) -> dict:
store: dict = {"version": 1}
if pool is not None:
store["credential_pool"] = pool
if providers is not None:
store["providers"] = providers
if active_provider is not None:
store["active_provider"] = active_provider
return store
@ -456,101 +450,3 @@ def test_write_credential_pool_targets_profile_not_global(profile_env):
# Subsequent read returns profile (shadows global).
assert [e["id"] for e in read_credential_pool("openrouter")] == ["prof-new"]
# ---------------------------------------------------------------------------
# get_active_provider — global active_provider fallback (issue #18594 follow-up)
#
# The per-provider state/pool fallbacks let a profile *read* a provider that
# was only authenticated at the global root, but ``resolve_provider()`` picks
# the ``auto`` provider from ``active_provider`` — which only ever read the
# profile store. A named profile running ``model.provider: auto`` could see
# the global Nous login (``get_provider_auth_state('nous')`` succeeds) yet
# still fail to select it. These pin the active_provider shadowing so the
# selection mirrors the state/pool fallbacks: profile wins when present, fall
# back to global when the profile never chose its own provider.
# ---------------------------------------------------------------------------
def test_active_provider_falls_back_to_global(profile_env):
"""An empty profile inherits the global-root active_provider selection."""
from hermes_cli.auth import get_active_provider
_write(profile_env["global"] / "auth.json", _make_auth_store(
providers={"nous": {"access_token": "nous-global"}},
active_provider="nous",
))
_write(profile_env["profile"] / "auth.json", _make_auth_store(providers={}))
assert get_active_provider() == "nous"
def test_active_provider_profile_wins_over_global(profile_env):
"""A profile that selected its own provider shadows the global selection."""
from hermes_cli.auth import get_active_provider
_write(profile_env["global"] / "auth.json", _make_auth_store(
providers={"nous": {"access_token": "nous-global"}},
active_provider="nous",
))
_write(profile_env["profile"] / "auth.json", _make_auth_store(
providers={"anthropic": {"access_token": "ant-profile"}},
active_provider="anthropic",
))
assert get_active_provider() == "anthropic"
def test_active_provider_none_when_neither_has_it(profile_env):
"""No selection anywhere stays None — the fallback must not invent one."""
from hermes_cli.auth import get_active_provider
_write(profile_env["global"] / "auth.json", _make_auth_store(providers={}))
_write(profile_env["profile"] / "auth.json", _make_auth_store(providers={}))
assert get_active_provider() is None
def test_active_provider_classic_mode_reads_profile(tmp_path, monkeypatch):
"""In classic mode there is no global to fall back to; behavior is unchanged."""
fake_home = tmp_path / "home"
fake_home.mkdir()
monkeypatch.setattr(Path, "home", lambda: fake_home)
hermes_home = tmp_path / "classic"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
_write(hermes_home / "auth.json", _make_auth_store(
providers={"nous": {"access_token": "classic-token"}},
active_provider="nous",
))
from hermes_cli.auth import get_active_provider
assert get_active_provider() == "nous"
def test_resolve_provider_uses_global_active_provider(profile_env, monkeypatch):
"""resolve_provider('auto') honors the global-root active_provider.
This is the user-visible contract: a named profile with no provider entry
of its own, started with ``model.provider: auto`` while a valid login
exists at the global root, resolves that provider instead of raising
``No inference provider configured``. ``get_auth_status`` is stubbed so the
login check stays offline (no Nous token refresh / network).
"""
import hermes_cli.auth as auth
_write(profile_env["global"] / "auth.json", _make_auth_store(
providers={"nous": {"access_token": "nous-global"}},
active_provider="nous",
))
_write(profile_env["profile"] / "auth.json", _make_auth_store(providers={}))
monkeypatch.setattr(
auth,
"get_auth_status",
lambda provider=None: {"logged_in": True, "provider": provider},
)
assert auth.resolve_provider("auto") == "nous"

View File

@ -8,6 +8,7 @@ from types import SimpleNamespace
import pytest
pwd = pytest.importorskip("pwd")
grp = pytest.importorskip("grp")
import hermes_cli.gateway as gateway_cli
from gateway import status
@ -1331,7 +1332,6 @@ class TestSystemServiceIdentityRootHandling:
def test_explicit_root_is_allowed(self, monkeypatch):
"""When root is explicitly passed via --run-as-user root, allow it."""
import grp
root_info = pwd.getpwnam("root")
root_group = grp.getgrgid(root_info.pw_gid).gr_name

View File

@ -108,121 +108,108 @@ class TestEnsureUv:
# ---------------------------------------------------------------------------
class TestRebuildVenv:
def test_removes_old_venv_and_creates_new(self, tmp_path):
def test_moves_old_venv_aside_and_creates_new(self, tmp_path):
"""The old venv is moved aside to <venv>.old (never rmtree'd in place),
uv is invoked with --clear, the moved-aside backup is removed on
success, and the rebuilt interpreter is reported."""
venv_dir = tmp_path / "venv"
venv_dir.mkdir()
(venv_dir / "old_file").write_text("stale")
uv_bin = str(tmp_path / "bin" / "uv")
call_log: list[list[str]] = []
def fake_run(cmd, **kwargs):
m = MagicMock(returncode=0)
if cmd[1] == "venv":
# Simulate uv creating the venv dir
venv_dir.mkdir(exist_ok=True)
bin_dir = venv_dir / "bin"
call_log.append(list(cmd))
m = MagicMock(returncode=0, stderr="", stdout="")
if len(cmd) >= 2 and cmd[1] == "venv":
# Simulate uv creating the venv dir with a python interpreter
bin_dir = venv_dir / ("Scripts" if os.name == "nt" else "bin")
bin_dir.mkdir(parents=True, exist_ok=True)
(bin_dir / "python").write_text("#!/bin/sh\necho Python 3.11.0")
python_name = "python.exe" if os.name == "nt" else "python"
(bin_dir / python_name).write_text("#!/bin/sh\necho Python 3.11.0")
elif "--version" in cmd:
m.stdout = "Python 3.11.0"
return m
with patch("hermes_cli.managed_uv.subprocess.run", side_effect=fake_run), \
patch("hermes_cli.managed_uv.shutil.rmtree") as mock_rmtree:
with patch("hermes_cli.managed_uv.subprocess.run", side_effect=fake_run):
from hermes_cli.managed_uv import rebuild_venv
result = rebuild_venv(uv_bin, venv_dir)
assert result is True
mock_rmtree.assert_called_once_with(venv_dir, ignore_errors=True)
assert result is True
# uv venv was invoked exactly once, always with --clear.
venv_calls = [c for c in call_log if len(c) >= 2 and c[1] == "venv"]
assert len(venv_calls) == 1, f"expected 1 venv call, got {venv_calls}"
assert "--clear" in venv_calls[0]
# The moved-aside backup is cleaned up after a successful rebuild.
assert not (tmp_path / "venv.old").exists()
def test_aborts_without_deleting_when_venv_in_use(self, tmp_path):
"""If os.replace fails (Windows file lock — venv in use), we must abort
cleanly WITHOUT deleting the venv and WITHOUT invoking uv."""
venv_dir = tmp_path / "venv"
venv_dir.mkdir()
(venv_dir / "locked") .write_text("held open")
uv_bin = str(tmp_path / "bin" / "uv")
call_log: list[list[str]] = []
def fake_run(cmd, **kwargs):
call_log.append(list(cmd))
return MagicMock(returncode=0, stderr="", stdout="")
with patch("hermes_cli.managed_uv.subprocess.run", side_effect=fake_run), \
patch("hermes_cli.managed_uv.os.replace", side_effect=OSError("in use")):
from hermes_cli.managed_uv import rebuild_venv
result = rebuild_venv(uv_bin, venv_dir)
assert result is False
# venv left fully intact, uv never invoked.
assert venv_dir.exists() and (venv_dir / "locked").exists()
assert [c for c in call_log if len(c) >= 2 and c[1] == "venv"] == []
def test_restores_backup_when_rebuild_fails(self, tmp_path):
"""If uv venv exits non-zero, the moved-aside venv is restored so we
never leave Hermes with no venv at all."""
venv_dir = tmp_path / "venv"
venv_dir.mkdir()
(venv_dir / "marker").write_text("original")
uv_bin = str(tmp_path / "bin" / "uv")
def fake_run(cmd, **kwargs):
return MagicMock(returncode=1, stderr="boom", stdout="")
with patch("hermes_cli.managed_uv.subprocess.run", side_effect=fake_run):
from hermes_cli.managed_uv import rebuild_venv
result = rebuild_venv(uv_bin, venv_dir)
assert result is False
# Original venv restored from the .old backup.
assert venv_dir.exists() and (venv_dir / "marker").read_text() == "original"
assert not (tmp_path / "venv.old").exists()
def test_rebuild_failure_returns_false(self, tmp_path):
venv_dir = tmp_path / "venv"
uv_bin = str(tmp_path / "bin" / "uv")
with patch("hermes_cli.managed_uv.subprocess.run") as mock_run, \
patch("hermes_cli.managed_uv.shutil.rmtree"):
with patch("hermes_cli.managed_uv.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stderr="nope")
from hermes_cli.managed_uv import rebuild_venv
result = rebuild_venv(uv_bin, venv_dir)
assert result is False
def test_retries_with_clear_when_dir_already_exists(self, tmp_path):
"""On Windows, rmtree can silently fail when an open handle holds a
file in the venv (running hermes.exe, gateway, AV scanner). uv then
refuses with ``Caused by: A directory already exists at: venv``.
Make sure we don't give up — retry with ``--clear`` to force uv past
the stale directory and rebuild successfully."""
venv_dir = tmp_path / "venv"
venv_dir.mkdir()
(venv_dir / "stale_open_handle").write_text("rmtree couldn't delete me")
uv_bin = str(tmp_path / "bin" / "uv")
call_log: list[list[str]] = []
def fake_run(cmd, **kwargs):
call_log.append(list(cmd))
m = MagicMock()
if cmd[1] == "venv" and "--clear" not in cmd:
# First attempt: uv refuses because dir still exists
m.returncode = 1
m.stderr = (
"error: Failed to create virtual environment\n"
" Caused by: A directory already exists at: venv\n"
"hint: Use the `--clear` flag or set `UV_VENV_CLEAR=1` to replace the existing directory\n"
)
m.stdout = ""
return m
if cmd[1] == "venv" and "--clear" in cmd:
# Retry: succeeds. Simulate uv writing the python shim.
m.returncode = 0
m.stderr = ""
m.stdout = ""
bin_dir = venv_dir / ("Scripts" if os.name == "nt" else "bin")
bin_dir.mkdir(parents=True, exist_ok=True)
python_name = "python.exe" if os.name == "nt" else "python"
(bin_dir / python_name).write_text("#!/bin/sh\necho Python 3.11.0")
return m
if "--version" in cmd:
m.returncode = 0
m.stdout = "Python 3.11.0"
m.stderr = ""
return m
m.returncode = 0
return m
with patch("hermes_cli.managed_uv.subprocess.run", side_effect=fake_run), \
patch("hermes_cli.managed_uv.shutil.rmtree"):
from hermes_cli.managed_uv import rebuild_venv
result = rebuild_venv(uv_bin, venv_dir)
assert result is True, "rebuild should succeed after --clear retry"
# We expect exactly two ``uv venv`` calls: one without --clear, one with.
venv_calls = [c for c in call_log if len(c) >= 2 and c[1] == "venv"]
assert len(venv_calls) == 2, f"expected 2 venv calls, got {venv_calls}"
assert "--clear" not in venv_calls[0], "first call should not pass --clear"
assert "--clear" in venv_calls[1], "retry must pass --clear"
def test_does_not_retry_when_first_failure_is_not_dir_exists(self, tmp_path):
"""If uv venv fails for some other reason (e.g. interpreter download
failed, disk full), we should NOT silently retry with --clear —
that would mask a real problem. Just surface the original failure."""
def test_rebuild_success_without_python_returns_false(self, tmp_path):
"""uv can exit 0 yet leave no interpreter; that must not count as success
(guard adapted from #38511)."""
venv_dir = tmp_path / "venv"
uv_bin = str(tmp_path / "bin" / "uv")
call_log: list[list[str]] = []
def fake_run(cmd, **kwargs):
call_log.append(list(cmd))
m = MagicMock(returncode=1, stderr="error: No space left on device", stdout="")
return m
with patch("hermes_cli.managed_uv.subprocess.run", side_effect=fake_run), \
patch("hermes_cli.managed_uv.shutil.rmtree"):
with patch("hermes_cli.managed_uv.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
from hermes_cli.managed_uv import rebuild_venv
result = rebuild_venv(uv_bin, venv_dir)
assert result is False
venv_calls = [c for c in call_log if len(c) >= 2 and c[1] == "venv"]
assert len(venv_calls) == 1, "should not retry on non-dir-exists failures"
assert "--clear" not in venv_calls[0]
assert result is False
# Returned before the `python --version` probe ran (only the uv venv call).
assert mock_run.call_count == 1
# ---------------------------------------------------------------------------

View File

@ -411,7 +411,7 @@ class TestUnionWithPortalFreeRecommendations:
}
def test_adds_portal_free_model_missing_from_curated(self):
"""A Portal-advertised free model not in curated is prepended + priced free."""
"""A Portal-advertised free model not in curated is appended + priced free."""
curated = ["anthropic/claude-opus-4.6"]
pricing = {"anthropic/claude-opus-4.6": self._PAID}
with patch(
@ -420,8 +420,9 @@ class TestUnionWithPortalFreeRecommendations:
):
ids, p = union_with_portal_free_recommendations(curated, pricing, "")
assert ids[0] == "qwen/qwen3.6-plus" # prepended
assert "anthropic/claude-opus-4.6" in ids
# Curated ("HA") models stay first; Portal-only picks follow.
assert ids[0] == "anthropic/claude-opus-4.6"
assert ids[-1] == "qwen/qwen3.6-plus" # appended
# Synthetic free pricing entry created
assert p["qwen/qwen3.6-plus"] == self._FREE
# Existing pricing untouched
@ -509,7 +510,7 @@ class TestUnionWithPortalFreeRecommendations:
},
):
ids, p = union_with_portal_free_recommendations(curated, pricing, "")
assert ids == ["qwen/qwen3.6-plus", "a"]
assert ids == ["a", "qwen/qwen3.6-plus"]
assert p["qwen/qwen3.6-plus"] == self._FREE
@ -535,7 +536,7 @@ class TestUnionWithPortalPaidRecommendations:
}
def test_adds_portal_paid_model_missing_from_curated(self):
"""A Portal-advertised paid model not in curated is prepended."""
"""A Portal-advertised paid model not in curated is appended."""
curated = ["anthropic/claude-opus-4.6"]
pricing = {"anthropic/claude-opus-4.6": self._PAID}
with patch(
@ -544,8 +545,9 @@ class TestUnionWithPortalPaidRecommendations:
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids[0] == "openai/gpt-5.4" # prepended
assert "anthropic/claude-opus-4.6" in ids
# Curated ("HA") models stay first; Portal-only picks follow.
assert ids[0] == "anthropic/claude-opus-4.6"
assert ids[-1] == "openai/gpt-5.4" # appended
# Existing pricing untouched
assert p["anthropic/claude-opus-4.6"] == self._PAID
@ -634,12 +636,12 @@ class TestUnionWithPortalPaidRecommendations:
},
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == ["openai/gpt-5.4", "a"]
assert ids == ["a", "openai/gpt-5.4"]
# No synthetic entry — pricing is untouched.
assert "openai/gpt-5.4" not in p
def test_preserves_relative_order_of_new_paid_models(self):
"""Multiple new paid models are prepended in payload order."""
"""Multiple new paid models are appended in payload order, after curated."""
curated = ["anthropic/claude-opus-4.6"]
pricing = {"anthropic/claude-opus-4.6": self._PAID}
with patch(
@ -648,9 +650,9 @@ class TestUnionWithPortalPaidRecommendations:
):
ids, _ = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == [
"anthropic/claude-opus-4.6",
"openai/gpt-5.4",
"openai/gpt-5.5",
"anthropic/claude-opus-4.6",
]

View File

@ -21,6 +21,7 @@ from hermes_cli.tools_config import (
_toolset_needs_configuration_prompt,
CONFIGURABLE_TOOLSETS,
TOOL_CATEGORIES,
gui_toolset_label,
_visible_providers,
tools_command,
)
@ -79,6 +80,13 @@ def test_get_platform_tools_uses_default_when_platform_not_configured():
assert enabled.isdisjoint(_DEFAULT_OFF_TOOLSETS)
def test_gui_toolset_label_strips_leading_emoji():
assert gui_toolset_label("🔍 Web Search & Scraping") == "Web Search & Scraping"
assert gui_toolset_label("👁️ Vision / Image Analysis") == "Vision / Image Analysis"
assert gui_toolset_label("🔌 My Plugin") == "My Plugin"
assert gui_toolset_label("Terminal & Processes") == "Terminal & Processes"
def test_configurable_toolsets_include_messaging():
assert any(ts_key == "messaging" for ts_key, _, _ in CONFIGURABLE_TOOLSETS)

View File

@ -423,6 +423,41 @@ def test_cmd_update_succeeds_with_extras(monkeypatch, tmp_path):
assert ".[all]" in install_cmds[0]
def test_cmd_update_aborts_when_fresh_managed_uv_rebuild_fails(monkeypatch, tmp_path):
"""A failed fresh managed-uv venv rebuild must not continue into pip install
(guard adapted from #38511)."""
_setup_update_mocks(monkeypatch, tmp_path)
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/uv" if name == "uv" else None)
monkeypatch.setattr(hermes_main, "_is_termux_env", lambda env=None: False)
recorded = []
def fake_run(cmd, **kwargs):
recorded.append(cmd)
# Tolerant matching: the update flow's exact git invocations vary by
# checkout, so key off the verb. Branch detection must return a real name
# and rev-list a parseable count, or the flow aborts early before it ever
# reaches the venv rebuild this test exercises.
if isinstance(cmd, (list, tuple)) and cmd and cmd[0] == "git":
if "rev-parse" in cmd:
return SimpleNamespace(stdout="main\n", stderr="", returncode=0)
if "rev-list" in cmd:
return SimpleNamespace(stdout="1\n", stderr="", returncode=0)
if "pull" in cmd:
return SimpleNamespace(stdout="Updating\n", stderr="", returncode=0)
return SimpleNamespace(returncode=0, stdout="", stderr="")
monkeypatch.setattr(hermes_main.subprocess, "run", fake_run)
with patch("hermes_cli.managed_uv.ensure_uv", return_value=("/usr/bin/uv", True)), \
patch("hermes_cli.managed_uv.rebuild_venv", return_value=False), \
pytest.raises(RuntimeError, match="venv rebuild failed"):
hermes_main.cmd_update(SimpleNamespace())
install_cmds = [c for c in recorded if "pip" in c and "install" in c]
assert install_cmds == []
def test_install_with_optional_fallback_honors_custom_group(monkeypatch):
"""Termux update path should target .[termux-all] when requested."""
calls = []

View File

@ -815,6 +815,24 @@ class TestWebServerEndpoints:
for key, info in data.items():
assert info["channel_managed"] is (key in channel_keys)
def test_platform_scoped_messaging_env_vars_are_channel_managed(self):
from hermes_cli.web_server import (
_MESSAGING_KEYS_PAGE_KEYS,
_build_catalog_entry,
_channel_managed_env_keys,
)
discord = _build_catalog_entry("discord")
assert "DISCORD_HOME_CHANNEL" in discord["env_vars"]
assert "DISCORD_ALLOW_ALL_USERS" in discord["env_vars"]
managed = _channel_managed_env_keys()
assert "DISCORD_HOME_CHANNEL" in managed
assert "BLUEBUBBLES_ALLOW_ALL_USERS" in managed
assert "MATTERMOST_ALLOW_ALL_USERS" in managed
assert "GATEWAY_PROXY_URL" not in managed
assert "GATEWAY_PROXY_URL" in _MESSAGING_KEYS_PAGE_KEYS
def test_reveal_env_var(self, tmp_path):
"""POST /api/env/reveal should return the real unredacted value."""
from hermes_cli.config import save_env_value
@ -974,6 +992,157 @@ class TestWebServerEndpoints:
assert data["state"] == "not_configured"
assert "DISCORD_BOT_TOKEN" in data["message"]
def test_telegram_onboarding_start_strips_poll_token(self, monkeypatch):
import hermes_cli.web_server as ws
with ws._telegram_onboarding_lock:
ws._telegram_onboarding_pairings.clear()
calls = []
def fake_request(method, path, *, body=None, bearer_token=None):
calls.append((method, path, body, bearer_token))
return {
"pairing_id": "pair123",
"poll_token": "poll-secret",
"suggested_username": "hermes_pair123_bot",
"deep_link": "https://t.me/newbot/HermesSetupBot/hermes_pair123_bot",
"qr_payload": "https://t.me/newbot/HermesSetupBot/hermes_pair123_bot",
"expires_at": "2027-05-18T00:00:00.000Z",
}
monkeypatch.setattr(ws, "_telegram_onboarding_request_sync", fake_request)
resp = self.client.post(
"/api/messaging/telegram/onboarding/start",
json={"bot_name": "Hosted Hermes"},
)
assert resp.status_code == 200
data = resp.json()
assert data["pairing_id"] == "pair123"
assert "poll_token" not in data
assert calls == [
(
"POST",
"/v1/telegram/pairings",
{"bot_name": "Hosted Hermes"},
None,
)
]
def test_telegram_onboarding_ready_and_apply_never_returns_bot_token(self, monkeypatch):
import hermes_cli.web_server as ws
from hermes_cli.config import load_config, load_env
with ws._telegram_onboarding_lock:
ws._telegram_onboarding_pairings.clear()
def fake_request(method, path, *, body=None, bearer_token=None):
if method == "POST":
return {
"pairing_id": "pair-ready",
"poll_token": "poll-secret",
"suggested_username": "hermes_pair_ready_bot",
"deep_link": "https://t.me/newbot/HermesSetupBot/hermes_pair_ready_bot",
"qr_payload": "https://t.me/newbot/HermesSetupBot/hermes_pair_ready_bot",
"expires_at": "2027-05-18T00:00:00.000Z",
}
assert method == "GET"
assert path == "/v1/telegram/pairings/pair-ready"
assert bearer_token == "poll-secret"
return {
"status": "ready",
"bot_username": "hermes_pair_ready_bot",
"owner_user_id": 123456789,
"token": "123456:SECRET",
}
monkeypatch.setattr(ws, "_telegram_onboarding_request_sync", fake_request)
start = self.client.post("/api/messaging/telegram/onboarding/start", json={})
assert start.status_code == 200
ready = self.client.get("/api/messaging/telegram/onboarding/pair-ready")
assert ready.status_code == 200
ready_data = ready.json()
assert ready_data["status"] == "ready"
assert ready_data["owner_user_id"] == "123456789"
assert "token" not in ready_data
applied = self.client.post(
"/api/messaging/telegram/onboarding/pair-ready/apply",
json={"allowed_user_ids": ["123456789", "123456789"]},
)
assert applied.status_code == 200
applied_data = applied.json()
assert applied_data == {
"ok": True,
"platform": "telegram",
"bot_username": "hermes_pair_ready_bot",
"needs_restart": True,
}
env = load_env()
assert env["TELEGRAM_BOT_TOKEN"] == "123456:SECRET"
assert env["TELEGRAM_ALLOWED_USERS"] == "123456789"
assert load_config()["platforms"]["telegram"]["enabled"] is True
def test_telegram_onboarding_apply_requires_ready_pairing(self, monkeypatch):
import hermes_cli.web_server as ws
with ws._telegram_onboarding_lock:
ws._telegram_onboarding_pairings.clear()
def fake_request(method, path, *, body=None, bearer_token=None):
return {
"pairing_id": "pair-waiting",
"poll_token": "poll-secret",
"suggested_username": "hermes_pair_waiting_bot",
"deep_link": "https://t.me/newbot/HermesSetupBot/hermes_pair_waiting_bot",
"qr_payload": "https://t.me/newbot/HermesSetupBot/hermes_pair_waiting_bot",
"expires_at": "2027-05-18T00:00:00.000Z",
}
monkeypatch.setattr(ws, "_telegram_onboarding_request_sync", fake_request)
start = self.client.post("/api/messaging/telegram/onboarding/start", json={})
assert start.status_code == 200
resp = self.client.post(
"/api/messaging/telegram/onboarding/pair-waiting/apply",
json={"allowed_user_ids": ["123456789"]},
)
assert resp.status_code == 409
assert "not ready" in resp.json()["detail"]
def test_telegram_onboarding_cancel_clears_local_session(self, monkeypatch):
import hermes_cli.web_server as ws
with ws._telegram_onboarding_lock:
ws._telegram_onboarding_pairings.clear()
def fake_request(method, path, *, body=None, bearer_token=None):
return {
"pairing_id": "pair-cancel",
"poll_token": "poll-secret",
"suggested_username": "hermes_pair_cancel_bot",
"deep_link": "https://t.me/newbot/HermesSetupBot/hermes_pair_cancel_bot",
"qr_payload": "https://t.me/newbot/HermesSetupBot/hermes_pair_cancel_bot",
"expires_at": "2027-05-18T00:00:00.000Z",
}
monkeypatch.setattr(ws, "_telegram_onboarding_request_sync", fake_request)
start = self.client.post("/api/messaging/telegram/onboarding/start", json={})
assert start.status_code == 200
cancel = self.client.delete("/api/messaging/telegram/onboarding/pair-cancel")
assert cancel.status_code == 200
status = self.client.get("/api/messaging/telegram/onboarding/pair-cancel")
assert status.status_code == 404
def test_session_token_endpoint_removed(self):
"""GET /api/auth/session-token should no longer exist (token injected via HTML)."""
resp = self.client.get("/api/auth/session-token")
@ -1967,7 +2136,7 @@ class TestNewEndpoints:
assert resp.json() == [
{
"name": "web",
"label": "🔍 Web Search & Scraping",
"label": "Web Search & Scraping",
"description": "web_search, web_extract",
"enabled": True,
"available": True,
@ -1976,7 +2145,7 @@ class TestNewEndpoints:
},
{
"name": "skills",
"label": "📚 Skills",
"label": "Skills",
"description": "list, view, manage",
"enabled": True,
"available": True,
@ -1985,7 +2154,7 @@ class TestNewEndpoints:
},
{
"name": "memory",
"label": "💾 Memory",
"label": "Memory",
"description": "persistent memory across sessions",
"enabled": False,
"available": False,
@ -4015,4 +4184,3 @@ class TestValidateProviderCredential:
def test_empty_value_rejected(self):
data = self._post("OPENAI_API_KEY", " ").json()
assert data["ok"] is False

View File

@ -0,0 +1,90 @@
import asyncio
from hermes_cli import web_server
class _FakeSessionDB:
"""Fake backing the /api/sessions/search endpoint.
The endpoint surfaces direct session-id matches first, then FTS message
matches, deduping both by compression lineage root. This fake has no
compression chains (get_session returns no parent), so each session is its
own lineage root.
"""
closed = False
def search_sessions_by_id(self, query, limit=20, include_archived=True):
assert query == "20260603"
assert include_archived is True
return [
{
"id": "20260603_090200_exact",
"preview": "ID match preview",
"source": "cli",
"model": "claude",
"started_at": 100,
}
]
def search_messages(self, query, limit=20):
assert query == "20260603*"
return [
{
"session_id": "20260603_090200_exact",
"snippet": "duplicate content hit should not replace ID hit",
"role": "user",
"source": "cli",
"model": "claude",
"session_started": 100,
},
{
"session_id": "content_session",
"snippet": "content hit",
"role": "assistant",
"source": "desktop",
"model": "gpt",
"session_started": 200,
},
]
def get_session(self, session_id):
# No compression chains in this fixture — every session is its own root.
return {"id": session_id, "parent_session_id": None}
def get_compression_tip(self, session_id):
return session_id
def close(self):
self.closed = True
def test_desktop_session_search_merges_id_matches_before_content_matches(monkeypatch):
monkeypatch.setattr("hermes_state.SessionDB", _FakeSessionDB)
response = asyncio.run(web_server.search_sessions(q="20260603", limit=2))
# ID match surfaces first; the content hit on the SAME session is deduped
# by lineage root (not double-listed); the unrelated content hit follows.
assert response == {
"results": [
{
"session_id": "20260603_090200_exact",
"lineage_root": "20260603_090200_exact",
"snippet": "ID match preview",
"role": None,
"source": "cli",
"model": "claude",
"session_started": 100,
},
{
"session_id": "content_session",
"lineage_root": "content_session",
"snippet": "content hit",
"role": "assistant",
"source": "desktop",
"model": "gpt",
"session_started": 200,
},
]
}

View File

@ -231,3 +231,53 @@ class TestOpenVikingBrowse:
"/api/v1/fs/ls",
{"uri": "viking://user/hermes"},
)]
class TestOpenVikingMemoryUriBuilder:
"""Regression tests for _build_memory_uri — fixes #36969.
Before the fix the URI omitted /agent/{agent}/, causing all agents
under the same user to share the same memory namespace.
"""
def _make_provider(self, user="alice", agent="coder"):
p = OpenVikingMemoryProvider.__new__(OpenVikingMemoryProvider)
p._user = user
p._agent = agent
return p
def test_uri_layout_includes_agent_segment(self):
"""URI must contain /agent/{agent}/ between user and memories."""
p = self._make_provider(user="alice", agent="coder")
uri = p._build_memory_uri("preferences")
assert uri.startswith("viking://user/alice/agent/coder/memories/preferences/mem_")
assert uri.endswith(".md")
def test_uri_uses_configured_agent_not_default(self):
"""_agent value must be interpolated — not hardcoded to 'hermes'."""
p = self._make_provider(user="alice", agent="research-bot")
uri = p._build_memory_uri("entities")
assert "/agent/research-bot/" in uri
assert "/agent/hermes/" not in uri
def test_uri_slug_is_twelve_hex_chars_and_unique(self):
"""Slug must be 12 hex chars and differ between calls."""
import re
p = self._make_provider()
uri1 = p._build_memory_uri("preferences")
uri2 = p._build_memory_uri("preferences")
slug1 = uri1.split("/mem_")[1].replace(".md", "")
slug2 = uri2.split("/mem_")[1].replace(".md", "")
assert re.fullmatch(r"[0-9a-f]{12}", slug1)
assert re.fullmatch(r"[0-9a-f]{12}", slug2)
assert slug1 != slug2
def test_uri_subdir_placed_correctly_for_all_categories(self):
"""All five category subdirs must appear between memories/ and slug."""
p = self._make_provider(user="u", agent="a")
subdirs = ["preferences", "entities", "events", "cases", "patterns"]
for subdir in subdirs:
uri = p._build_memory_uri(subdir)
assert f"/memories/{subdir}/mem_" in uri, (
f"subdir '{subdir}' not placed correctly in URI: {uri}"
)

View File

@ -170,6 +170,135 @@ class TestGuessCategory:
assert dg.guess_category(p) is None
class TestStaleCronEntryMigration:
"""Regression tests for #37721 — stale cron-output entries in tracked.json."""
def test_quick_skips_stale_cron_output_for_jobs_json(self, _isolate_env):
"""A stale tracked.json entry with category="cron-output" for
cron/jobs.json must NOT be deleted by quick().
This is the exact scenario from #37721: an old tracked.json has
{"path": ".../cron/jobs.json", "category": "cron-output"} which
would pass the delete filter but must be skipped because
guess_category() now returns None for non-output cron paths.
"""
dg = _load_lib()
cron_dir = _isolate_env / "cron"
cron_dir.mkdir()
jobs_json = cron_dir / "jobs.json"
jobs_json.write_text('{"jobs": []}')
# Simulate a stale tracked.json entry from before #34840 by
# directly writing the tracked file (track() would reject it).
tracked_file = _isolate_env / "disk-cleanup" / "tracked.json"
tracked_file.parent.mkdir(parents=True, exist_ok=True)
tracked_file.write_text(json.dumps([{
"path": str(jobs_json),
"category": "cron-output",
"timestamp": "2025-01-01T00:00:00+00:00", # very old
"size": 123,
}]))
summary = dg.quick()
assert summary["deleted"] == 0, "cron/jobs.json must not be deleted"
assert jobs_json.exists(), "jobs.json must still exist"
# The stale entry should have been dropped from tracking.
remaining = json.loads(tracked_file.read_text())
assert len(remaining) == 0
def test_quick_skips_stale_cron_output_for_cron_dir(self, _isolate_env):
"""Stale entry for the cron/ directory itself must not be deleted."""
dg = _load_lib()
cron_dir = _isolate_env / "cron"
cron_dir.mkdir()
output_dir = cron_dir / "output"
output_dir.mkdir()
(output_dir / "run.md").write_text("x")
tracked_file = _isolate_env / "disk-cleanup" / "tracked.json"
tracked_file.parent.mkdir(parents=True, exist_ok=True)
tracked_file.write_text(json.dumps([{
"path": str(cron_dir),
"category": "cron-output",
"timestamp": "2025-01-01T00:00:00+00:00",
"size": 0,
}]))
summary = dg.quick()
assert summary["deleted"] == 0, "cron/ dir must not be deleted"
assert cron_dir.exists()
def test_quick_skips_protected_cron_paths_defense_in_depth(self, _isolate_env):
"""Defense-in-depth: even if guess_category returned cron-output
(hypothetically), protected cron paths are never deleted."""
dg = _load_lib()
cron_dir = _isolate_env / "cron"
cron_dir.mkdir()
tick_lock = cron_dir / ".tick.lock"
tick_lock.write_text("")
# Manually inject a stale entry with "test" category (would normally
# be auto-deleted) — the protected path guard must still block it.
tracked_file = _isolate_env / "disk-cleanup" / "tracked.json"
tracked_file.parent.mkdir(parents=True, exist_ok=True)
tracked_file.write_text(json.dumps([{
"path": str(tick_lock),
"category": "test",
"timestamp": "2025-01-01T00:00:00+00:00",
"size": 0,
}]))
summary = dg.quick()
assert summary["deleted"] == 0, ".tick.lock must not be deleted"
assert tick_lock.exists()
def test_dry_run_omits_stale_cron_output(self, _isolate_env):
"""dry_run() should also skip stale cron-output entries."""
dg = _load_lib()
cron_dir = _isolate_env / "cron"
cron_dir.mkdir()
jobs_json = cron_dir / "jobs.json"
jobs_json.write_text("[]")
tracked_file = _isolate_env / "disk-cleanup" / "tracked.json"
tracked_file.parent.mkdir(parents=True, exist_ok=True)
tracked_file.write_text(json.dumps([{
"path": str(jobs_json),
"category": "cron-output",
"timestamp": "2025-01-01T00:00:00+00:00",
"size": 123,
}]))
auto, prompt = dg.dry_run()
assert len(auto) == 0, "stale cron-output for jobs.json must not appear"
assert len(prompt) == 0
def test_legitimate_cron_output_still_deleted(self, _isolate_env):
"""A valid cron-output entry under cron/output/ must still be deleted."""
dg = _load_lib()
output_dir = _isolate_env / "cron" / "output" / "job_1"
output_dir.mkdir(parents=True)
run_md = output_dir / "run.md"
run_md.write_text("x")
# Old enough to be deleted (>14 days)
from datetime import datetime, timezone, timedelta
old_ts = (datetime.now(timezone.utc) - timedelta(days=20)).isoformat()
tracked_file = _isolate_env / "disk-cleanup" / "tracked.json"
tracked_file.parent.mkdir(parents=True, exist_ok=True)
tracked_file.write_text(json.dumps([{
"path": str(run_md),
"category": "cron-output",
"timestamp": old_ts,
"size": 10,
}]))
summary = dg.quick()
assert summary["deleted"] == 1, "valid old cron-output should be deleted"
assert not run_md.exists()
class TestTrackForgetQuick:
def test_track_then_quick_deletes_test(self, _isolate_env):
dg = _load_lib()

View File

@ -2716,6 +2716,44 @@ class TestListSessionsRich:
ids = [s["id"] for s in sessions]
assert "branch" in ids, "Branch session should be visible in default list"
def test_branch_session_visible_after_parent_reopen_and_reend(self, db):
"""Branch sessions stay visible after the parent is reopened and re-ended.
Regression for issue #20856: /branch (aka /fork) sessions vanished from
/resume and /sessions once the parent was reopened (e.g. resumed) and
re-ended with a different end_reason — tui_shutdown overwriting
'branched' — which broke the legacy end_reason heuristic. The stable
_branched_from marker in model_config keeps them visible.
"""
import json as _json
db.create_session("parent", "cli")
db.end_session("parent", "branched")
db.create_session(
"branch",
"cli",
model_config={"_branched_from": "parent"},
parent_session_id="parent",
)
db.append_message("branch", "user", "Exploring the alternative approach")
# Marker is persisted at creation time.
branch_row = db.get_session("branch")
cfg = _json.loads(branch_row["model_config"]) if branch_row["model_config"] else {}
assert cfg.get("_branched_from") == "parent"
# Visible immediately after branching.
assert "branch" in [s["id"] for s in db.list_sessions_rich()]
# Parent reopened + re-ended with a different reason (the bug trigger).
db.reopen_session("parent")
db.end_session("parent", "tui_shutdown")
# Branch must STILL be visible — the marker survives the parent's
# end_reason churn, unlike the legacy 'branched' heuristic.
ids = [s["id"] for s in db.list_sessions_rich()]
assert "branch" in ids, "Branch should stay visible after parent re-end"
def test_subagent_session_still_hidden(self, db):
"""Sub-agent children (parent NOT ended with 'branched') remain hidden."""
db.create_session("root", "cli")
@ -3786,3 +3824,57 @@ class TestSessionArchive:
both = {s["id"] for s in db.list_sessions_rich(include_archived=True)}
assert both == {"live", "hidden"}
assert db.session_count(include_archived=True) == 2
class TestSessionIdSearch:
"""Session id search backs Desktop's Search Sessions UX."""
def _seed(self, db, sid, *, content="ordinary message", archived=False):
db.create_session(session_id=sid, source="cli", model="test-model")
db.append_message(session_id=sid, role="user", content=content)
if archived:
db.set_session_archived(sid, True)
def test_search_sessions_by_id_matches_exact_prefix_and_substring(self, db):
self._seed(db, "20260603_090200_abcd12", content="content without id")
self._seed(db, "20260602_111111_other99", content="other content")
assert [s["id"] for s in db.search_sessions_by_id("20260603_090200_abcd12")] == [
"20260603_090200_abcd12"
]
assert [s["id"] for s in db.search_sessions_by_id("20260603")] == ["20260603_090200_abcd12"]
assert [s["id"] for s in db.search_sessions_by_id("ABCD12")] == ["20260603_090200_abcd12"]
def test_search_sessions_by_id_respects_limit_and_prioritizes_exact_matches(self, db):
self._seed(db, "20260603_090200_abcd12")
self._seed(db, "20260603_090200_abcd12_child")
self._seed(db, "x_20260603_090200_abcd12")
ids = [s["id"] for s in db.search_sessions_by_id("20260603_090200_abcd12", limit=2)]
assert ids == ["20260603_090200_abcd12", "20260603_090200_abcd12_child"]
def test_search_sessions_by_id_can_include_or_exclude_archived(self, db):
self._seed(db, "20260603_090200_live")
self._seed(db, "20260603_090200_archived", archived=True)
included = {s["id"] for s in db.search_sessions_by_id("20260603_090200", include_archived=True)}
excluded = {s["id"] for s in db.search_sessions_by_id("20260603_090200", include_archived=False)}
assert included == {"20260603_090200_live", "20260603_090200_archived"}
assert excluded == {"20260603_090200_live"}
def test_search_sessions_by_id_matches_projected_lineage_root_id(self, db):
root = "20260602_235959_root99"
tip = "20260603_010000_tip01"
db.create_session(session_id=root, source="cli")
db.append_message(root, role="user", content="root conversation")
db.end_session(root, "compression")
db.create_session(session_id=tip, source="cli", parent_session_id=root)
db.append_message(tip, role="user", content="continued conversation")
matches = db.search_sessions_by_id("root99")
assert [s["id"] for s in matches] == [tip]
assert matches[0]["_lineage_root_id"] == root

View File

@ -372,7 +372,8 @@ class TestVisionDispatchLoopSafety:
side_effect=lambda url, dest, **kw: _write_fake_image(dest),
),
patch(
"tools.vision_tools._validate_image_url",
"tools.vision_tools._validate_image_url_async",
new_callable=AsyncMock,
return_value=True,
),
patch(
@ -416,7 +417,8 @@ class TestVisionDispatchLoopSafety:
side_effect=lambda url, dest, **kw: _write_fake_image(dest),
),
patch(
"tools.vision_tools._validate_image_url",
"tools.vision_tools._validate_image_url_async",
new_callable=AsyncMock,
return_value=True,
),
patch(

View File

@ -88,6 +88,17 @@ def test_lazy_installable_extras_excluded_from_all():
)
def test_dev_extra_excluded_from_all():
"""End-user installs should not pull test/lint/debug tooling."""
optional_dependencies = _load_optional_dependencies()
assert "dev" in optional_dependencies
assert not any(
spec == "hermes-agent[dev]"
for spec in optional_dependencies["all"]
)
def test_messaging_extra_includes_qrcode_for_weixin_setup():
optional_dependencies = _load_optional_dependencies()

View File

@ -5474,6 +5474,8 @@ def test_notification_poller_requeues_when_busy(monkeypatch):
assert requeued["session_id"] == "proc_busy_test"
finally:
server._sessions.pop("sid_busy", None)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
def test_session_save_writes_under_hermes_home_with_system_prompt(monkeypatch, tmp_path):
@ -5533,3 +5535,95 @@ def test_session_save_writes_under_hermes_home_with_system_prompt(monkeypatch, t
assert payload["session_start"] == "2026-01-01T12:00:00"
assert payload["system_prompt"] == "You are Hermes."
assert payload["messages"] == history
def test_notification_event_dedup_key_preserves_distinct_watch_matches():
"""Watch-match identity includes match content, not just session/type."""
base = {
"type": "watch_match",
"session_id": "proc_watch",
"command": "tail -f app.log",
"pattern": "READY",
"output": "READY on port 8000",
"suppressed": 0,
}
identical = dict(base)
distinct_output = {**base, "output": "READY on port 9000"}
distinct_pattern = {**base, "pattern": "MIGRATION_DONE"}
base_key = server._notification_event_dedup_key(base)
assert server._notification_event_dedup_key(identical) == base_key
assert server._notification_event_dedup_key(distinct_output) != base_key
assert server._notification_event_dedup_key(distinct_pattern) != base_key
def test_notification_poller_emits_distinct_watch_matches_once(monkeypatch):
"""Distinct watch matches from one process emit; exact replay is deduped."""
from tools.process_registry import process_registry
turns = []
emitted = []
def _fake_run_prompt_submit(rid, sid, session, text):
turns.append(text)
with session["history_lock"]:
session["running"] = False
sess = _session()
server._sessions["sid_watch_dedup"] = sess
monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a))
monkeypatch.setattr(server, "_run_prompt_submit", _fake_run_prompt_submit)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
base = {
"type": "watch_match",
"session_id": "proc_watch_dedup",
"command": "tail -f app.log",
"pattern": "READY",
"output": "READY on port 8000",
"suppressed": 0,
}
process_registry.completion_queue.put(base)
process_registry.completion_queue.put({**base, "output": "READY on port 9000"})
process_registry.completion_queue.put(dict(base))
stop = threading.Event()
stop.set()
try:
server._notification_poller_loop(stop, "sid_watch_dedup", sess)
status_calls = [a for a in emitted if a[0] == "status.update"]
assert len(status_calls) == 2
status_text = "\n".join(call[2]["text"] for call in status_calls)
assert "READY on port 8000" in status_text
assert "READY on port 9000" in status_text
assert len(turns) == 3
finally:
server._sessions.pop("sid_watch_dedup", None)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
def test_notification_event_dedup_key_keeps_completions_one_shot():
"""Completion identity remains process-session scoped to avoid floods."""
first = {
"type": "completion",
"session_id": "proc_done",
"command": "make build",
"exit_code": 0,
"output": "first output",
}
replay = {
"type": "completion",
"session_id": "proc_done",
"command": "make build --again",
"exit_code": 1,
"output": "different output should not change completion key",
}
assert server._notification_event_dedup_key(first) == server._notification_event_dedup_key(
replay
)

View File

@ -44,6 +44,7 @@ def _make_dummy_env(**kwargs):
auto_mount_cwd=kwargs.get("auto_mount_cwd", False),
env=kwargs.get("env"),
run_as_host_user=kwargs.get("run_as_host_user", False),
persist_across_processes=kwargs.get("persist_across_processes", True),
)
@ -786,6 +787,84 @@ def test_reuse_falls_back_to_fresh_run_when_start_fails(monkeypatch):
assert run_invocations, "fallback to fresh docker run must happen on start failure"
def test_failed_docker_run_cleans_up_orphaned_container(monkeypatch):
"""When ``docker run`` fails (e.g. exit 125), the partially-created
container must be removed by name.
Docker can create the container object before failing to start it,
leaving a stale ``Created`` container. The exited-only orphan reaper
(``reap_orphan_containers``, ``status=exited``) never catches a
``Created`` orphan, so without this cleanup it leaks permanently.
Regression for #7439. Salvage of #7440 (@Tranquil-Flow).
"""
monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker")
monkeypatch.setattr(docker_env, "_get_active_profile_name", lambda: "default")
cleanup_calls = []
def _run(cmd, **kwargs):
if isinstance(cmd, list) and len(cmd) >= 2:
sub = cmd[1]
if sub == "version":
return subprocess.CompletedProcess(cmd, 0, stdout="Docker version", stderr="")
if sub == "ps":
# No reusable container -> fall through to a fresh `docker run`.
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if sub == "run":
raise subprocess.CalledProcessError(
125, cmd, output="", stderr="docker: Error response from daemon"
)
if sub == "rm":
cleanup_calls.append(list(cmd))
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
monkeypatch.setattr(docker_env.subprocess, "run", _run)
with pytest.raises(subprocess.CalledProcessError):
_make_dummy_env()
assert len(cleanup_calls) == 1, "docker rm should be called once for the orphaned container"
rm_cmd = cleanup_calls[0]
assert rm_cmd[1] == "rm" and rm_cmd[2] == "-f"
assert rm_cmd[3].startswith("hermes-"), "should remove the container by its generated name"
def test_docker_run_timeout_cleans_up_orphaned_container(monkeypatch):
"""When ``docker run`` times out (e.g. slow image pull), the
partially-created container must be removed. Salvage of #7440
(@Tranquil-Flow); regression for #7439.
"""
monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker")
monkeypatch.setattr(docker_env, "_get_active_profile_name", lambda: "default")
cleanup_calls = []
def _run(cmd, **kwargs):
if isinstance(cmd, list) and len(cmd) >= 2:
sub = cmd[1]
if sub == "version":
return subprocess.CompletedProcess(cmd, 0, stdout="Docker version", stderr="")
if sub == "ps":
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
if sub == "run":
raise subprocess.TimeoutExpired(cmd, 120)
if sub == "rm":
cleanup_calls.append(list(cmd))
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
monkeypatch.setattr(docker_env.subprocess, "run", _run)
with pytest.raises(subprocess.TimeoutExpired):
_make_dummy_env()
assert len(cleanup_calls) == 1, "docker rm should be called once for the orphaned container"
rm_cmd = cleanup_calls[0]
assert rm_cmd[1] == "rm" and rm_cmd[2] == "-f"
assert rm_cmd[3].startswith("hermes-"), "should remove the container by its generated name"
def test_no_reuse_when_persist_across_processes_disabled(monkeypatch):
"""Opt-out path: ``persist_across_processes=False`` skips the ps probe
entirely and always starts a fresh container, matching the pre-fix
@ -1629,3 +1708,128 @@ def test_plain_image_keeps_docker_init_and_run_noexec(monkeypatch):
assert "noexec" in run_mounts[0], (
f"/run must stay noexec for non-s6 images, got: {run_mounts[0]}"
)
# ---------------------------------------------------------------------------
# Out-of-band container removal recovery (issue #36266, PR #36631)
# ---------------------------------------------------------------------------
def test_is_container_gone_matches_removal_errors(monkeypatch):
"""``_is_container_gone`` recognizes the docker errors that mean the
container no longer exists, and does NOT match ordinary command failures.
"""
monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker")
_mock_subprocess_run(monkeypatch)
env = _make_dummy_env()
# Positive: the daemon's "container gone" phrasings.
assert env._is_container_gone(
"Error response from daemon: No such container: hermes-abc123"
)
assert env._is_container_gone("Error: No such container: deadbeef")
assert env._is_container_gone(
"Error response from daemon: Container abc is not running"
)
# Control / negative: a real command failure must NOT be misclassified as
# the container being gone — otherwise every non-zero exit would trigger a
# spurious container recreation.
assert not env._is_container_gone("bash: nonsuch: command not found")
assert not env._is_container_gone("Traceback (most recent call last): ...")
assert not env._is_container_gone("")
assert not env._is_container_gone("permission denied")
def test_execute_recovers_from_out_of_band_removal(monkeypatch):
"""When a persistent container is removed out-of-band, ``execute`` detects
the "No such container" error, recreates the container, and retries once —
returning success transparently.
"""
monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker")
_mock_subprocess_run(monkeypatch)
env = _make_dummy_env(
persistent_filesystem=True,
persist_across_processes=True,
)
# First execute() sees a dead container; second (post-recovery) succeeds.
outputs = iter([
{"output": "Error response from daemon: No such container: hermes-x", "returncode": 1},
{"output": "ok", "returncode": 0},
])
def _fake_super_execute(self, command, cwd="", **kwargs):
return next(outputs)
recreate_calls = []
def _fake_recreate(self):
recreate_calls.append(True)
self._container_id = "recovered-container-id"
return True
monkeypatch.setattr(docker_env.BaseEnvironment, "execute", _fake_super_execute)
monkeypatch.setattr(
docker_env.DockerEnvironment, "_recreate_container", _fake_recreate
)
result = env.execute("echo hi")
assert recreate_calls == [True], "recovery should have been attempted exactly once"
assert result.get("returncode") == 0, f"expected success after recovery, got {result!r}"
assert result.get("output") == "ok"
def test_execute_does_not_recover_when_not_persistent(monkeypatch):
"""A non-persistent session must NOT trigger container recreation on a
"No such container" error — recovery is only meaningful for the persistent,
cross-process container that can be removed out-of-band.
"""
monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker")
_mock_subprocess_run(monkeypatch)
env = _make_dummy_env(
persistent_filesystem=True,
persist_across_processes=False,
)
def _fake_super_execute(self, command, cwd="", **kwargs):
return {"output": "No such container: x", "returncode": 1}
def _fail_recreate(self):
pytest.fail("recreation must not run when persist_across_processes is False")
monkeypatch.setattr(docker_env.BaseEnvironment, "execute", _fake_super_execute)
monkeypatch.setattr(
docker_env.DockerEnvironment, "_recreate_container", _fail_recreate
)
result = env.execute("echo hi")
assert result.get("returncode") == 1, "the original error must pass through unchanged"
def test_execute_does_not_recover_on_ordinary_failure(monkeypatch):
"""A genuine non-zero exit that is NOT a container-gone error must pass
through without triggering recovery (guards against over-eager recreation).
"""
monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker")
_mock_subprocess_run(monkeypatch)
env = _make_dummy_env(
persistent_filesystem=True,
persist_across_processes=True,
)
def _fake_super_execute(self, command, cwd="", **kwargs):
return {"output": "bash: badcmd: command not found", "returncode": 127}
def _fail_recreate(self):
pytest.fail("recreation must not run for an ordinary command failure")
monkeypatch.setattr(docker_env.BaseEnvironment, "execute", _fake_super_execute)
monkeypatch.setattr(
docker_env.DockerEnvironment, "_recreate_container", _fail_recreate
)
result = env.execute("badcmd")
assert result.get("returncode") == 127
assert "command not found" in result.get("output", "")

View File

@ -595,6 +595,7 @@ class TestSendToPlatformChunking:
"***",
"C123",
"*hello* from <https://example.com|Hermes>",
thread_ts=None,
)
def test_slack_bold_italic_formatted_before_send(self, monkeypatch):

View File

@ -90,7 +90,12 @@ class TestSSHBulkUpload:
assert "/home/testuser/.hermes/credentials" in mkdir_str
def test_staging_symlinks_mirror_remote_layout(self, mock_env, tmp_path):
"""Symlinks in staging dir should mirror the .hermes-relative layout."""
"""Staged file in staging dir should mirror the remote path structure.
On platforms where symlinks are available (Linux/macOS) the staged
entry is a symlink; on Windows it may be a regular copy. Either way
the file must exist at the expected path and contain the right data.
"""
f1 = tmp_path / "local_a.txt"
f1.write_text("content a")
@ -105,11 +110,14 @@ class TestSSHBulkUpload:
# Capture the staging dir from -C argument
c_idx = cmd.index("-C")
staging_dir = cmd[c_idx + 1]
# Check the symlink exists
# Check the staged entry exists at the base-relative path
expected = os.path.join(staging_dir, "skills/my_skill.md")
staging_paths.append(expected)
assert os.path.islink(expected), f"Expected symlink at {expected}"
assert os.readlink(expected) == os.path.abspath(str(f1))
# File must exist (either as symlink or copy)
assert os.path.exists(expected), f"Expected staged file at {expected}"
# Content must match the source
with open(expected, "r") as fh:
assert fh.read() == "content a"
mock = MagicMock()
mock.stdout = MagicMock()

View File

@ -156,14 +156,15 @@ def test_cli_and_gateway_env_maps_agree():
def test_save_config_set_supports_critical_bridged_keys():
"""``hermes config set terminal.X true`` must propagate to .env for
known-critical keys. This used to be an all-keys invariant but several
pre-existing terminal keys (ssh_*, docker_forward_env, docker_volumes)
aren't in _config_to_env_sync and are instead handled via the separate
api_keys TERMINAL_SSH_* fallback path or user-edits-yaml-directly.
known-critical keys. This used to be an all-keys invariant but the SSH
terminal keys (ssh_*) aren't in _config_to_env_sync and are instead
handled via the separate api_keys TERMINAL_SSH_* fallback path or
user-edits-yaml-directly.
Until those gaps are audited and fixed, pin the specific keys that are
load-bearing for the docker backend's ownership flag so the bug we just
fixed cannot silently regress.
load-bearing for the docker backend so the bugs we fixed cannot silently
regress. (docker_volumes / docker_forward_env, previously listed here as
gaps, are now bridged — see the dedicated tests below.)
"""
save_keys = _save_config_env_sync_keys()
required = {
@ -260,3 +261,37 @@ def test_docker_orphan_reaper_is_bridged_everywhere():
assert "docker_orphan_reaper" in _gateway_env_map_keys()
assert "docker_orphan_reaper" in _save_config_env_sync_keys()
assert "TERMINAL_DOCKER_ORPHAN_REAPER" in _terminal_tool_env_var_names()
def test_docker_volumes_is_bridged_everywhere():
"""Regression pin for ``terminal.docker_volumes`` being silently dropped by
``hermes config set``.
The JSON list of ``host:container`` bind mounts was bridged by cli.py and
gateway/run.py and consumed by terminal_tool (via json.loads), but was
missing from set_config_value's _config_to_env_sync. So
``hermes config set terminal.docker_volumes '["/host:/workspace"]'`` wrote
config.yaml yet left the running process's TERMINAL_DOCKER_VOLUMES stale —
the mounts didn't apply until a full restart. Same four-site bridge
invariant as docker_env / docker_run_as_host_user.
"""
assert "docker_volumes" in _cli_env_map_keys()
assert "docker_volumes" in _gateway_env_map_keys()
assert "docker_volumes" in _save_config_env_sync_keys()
assert "TERMINAL_DOCKER_VOLUMES" in _terminal_tool_env_var_names()
def test_docker_forward_env_is_bridged_everywhere():
"""Regression pin for ``terminal.docker_forward_env`` — the sibling gap to
docker_volumes.
The JSON list of host env-var names forwarded into the container was
bridged by cli.py and gateway/run.py and consumed by terminal_tool (via
json.loads), but missing from set_config_value's _config_to_env_sync, so
``hermes config set terminal.docker_forward_env '["GITHUB_TOKEN"]'`` had no
effect on the running process until restart.
"""
assert "docker_forward_env" in _cli_env_map_keys()
assert "docker_forward_env" in _gateway_env_map_keys()
assert "docker_forward_env" in _save_config_env_sync_keys()
assert "TERMINAL_DOCKER_FORWARD_ENV" in _terminal_tool_env_var_names()

View File

@ -5,6 +5,7 @@ from unittest.mock import patch
from tools.url_safety import (
is_safe_url,
async_is_safe_url,
is_always_blocked_url,
_is_blocked_ip,
_global_allow_private_urls,
@ -195,6 +196,24 @@ class TestIsSafeUrl:
assert is_safe_url("https://multimedia.nt.qq.com.cn/download?id=123") is False
class TestAsyncIsSafeUrl:
"""async_is_safe_url must match is_safe_url (runs DNS in a thread pool)."""
@pytest.mark.asyncio
async def test_public_url_allowed(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("93.184.216.34", 0)),
]):
assert await async_is_safe_url("https://example.com/x") is True
@pytest.mark.asyncio
async def test_localhost_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("127.0.0.1", 0)),
]):
assert await async_is_safe_url("http://localhost:8080/") is False
class TestIsBlockedIp:
"""Direct tests for the _is_blocked_ip helper."""

View File

@ -297,7 +297,7 @@ class TestErrorLoggingExcInfo:
async def test_analysis_error_logs_exc_info(self, caplog):
"""When vision_analyze_tool encounters an error, it should log with exc_info."""
with (
patch("tools.vision_tools._validate_image_url", return_value=True),
patch("tools.vision_tools._validate_image_url_async", new_callable=AsyncMock, return_value=True),
patch(
"tools.vision_tools._download_image",
new_callable=AsyncMock,
@ -329,7 +329,7 @@ class TestErrorLoggingExcInfo:
return dest
with (
patch("tools.vision_tools._validate_image_url", return_value=True),
patch("tools.vision_tools._validate_image_url_async", new_callable=AsyncMock, return_value=True),
patch("tools.vision_tools._download_image", side_effect=fake_download),
patch(
"tools.vision_tools._image_to_base64_data_url",
@ -451,7 +451,7 @@ class TestVisionSafetyGuards:
with (
patch("tools.vision_tools.check_website_access", return_value=blocked),
patch("tools.vision_tools._validate_image_url", return_value=True),
patch("tools.vision_tools._validate_image_url_async", new_callable=AsyncMock, return_value=True),
patch("tools.vision_tools._download_image", new_callable=AsyncMock) as mock_download,
):
result = json.loads(await vision_analyze_tool("https://blocked.test/cat.png", "describe"))
@ -549,7 +549,9 @@ class TestTildeExpansion:
img = fake_home / "test_image.png"
img.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 8)
# Windows expanduser() prefers USERPROFILE over HOME; POSIX uses HOME.
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setenv("USERPROFILE", str(fake_home))
mock_response = MagicMock()
mock_choice = MagicMock()
@ -580,6 +582,7 @@ class TestTildeExpansion:
fake_home = tmp_path / "fakehome"
fake_home.mkdir()
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setenv("USERPROFILE", str(fake_home))
result = await vision_analyze_tool(
"~/nonexistent.png", "describe this", "test/model"

View File

@ -259,7 +259,10 @@ class TestBraveFreeSearchOnlyErrors:
monkeypatch.setattr(web_tools, "_load_web_config", lambda: {"backend": "brave-free"})
monkeypatch.setenv("BRAVE_SEARCH_API_KEY", "BSAkey123")
monkeypatch.setattr(web_tools, "_is_tool_gateway_ready", lambda: False)
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
async def _allow_ssrf(_url: str) -> bool:
return True
monkeypatch.setattr(web_tools, "async_is_safe_url", _allow_ssrf)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False, raising=False)
result_str = asyncio.get_event_loop().run_until_complete(

View File

@ -229,7 +229,10 @@ class TestDDGSSearchOnlyErrors:
monkeypatch.setattr(web_tools, "_load_web_config", lambda: {"backend": "ddgs"})
monkeypatch.setattr(web_tools, "_ddgs_package_importable", lambda: True)
monkeypatch.setattr(web_tools, "_is_tool_gateway_ready", lambda: False)
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
async def _allow_ssrf(_url: str) -> bool:
return True
monkeypatch.setattr(web_tools, "async_is_safe_url", _allow_ssrf)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False, raising=False)
result_str = asyncio.get_event_loop().run_until_complete(

View File

@ -318,7 +318,10 @@ class TestSearXNGOnlyExtractCrawlErrors:
monkeypatch.setattr(web_tools, "_load_web_config", lambda: {"backend": "searxng"})
monkeypatch.setenv("SEARXNG_URL", "http://localhost:8080")
monkeypatch.setattr(web_tools, "_is_tool_gateway_ready", lambda: False)
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
async def _allow_ssrf(_url: str) -> bool:
return True
monkeypatch.setattr(web_tools, "async_is_safe_url", _allow_ssrf)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False, raising=False)
result_str = asyncio.get_event_loop().run_until_complete(

View File

@ -372,7 +372,10 @@ class TestWebToolPolicy:
from plugins.web.firecrawl import provider as firecrawl_provider
# Allow test URLs past SSRF check so website policy is what gets tested
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
async def _allow_ssrf(_url: str) -> bool:
return True
monkeypatch.setattr(web_tools, "async_is_safe_url", _allow_ssrf)
# The per-URL website-policy gate moved into the firecrawl plugin's
# extract() during the web-provider migration. Patch it at the new
# location.
@ -406,7 +409,10 @@ class TestWebToolPolicy:
from plugins.web.firecrawl import provider as firecrawl_provider
# Allow test URLs past SSRF check so website policy is what gets tested
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
async def _allow_ssrf(_url: str) -> bool:
return True
monkeypatch.setattr(web_tools, "async_is_safe_url", _allow_ssrf)
def fake_check(url):
if url == "https://allowed.test":

View File

@ -394,6 +394,290 @@ def test_session_resume_handles_multimodal_list_content(server, monkeypatch):
]
def test_session_resume_reuses_existing_live_session(server, monkeypatch):
"""Repeated resume must not allocate duplicate live agents."""
target = "20260409_010101_abc123"
created_sids: list[str] = []
closed_sids: list[str] = []
first_agent_started = threading.Event()
agent_can_finish = threading.Event()
class _DB:
def get_session(self, _sid):
return {"id": target}
def get_session_by_title(self, _title):
return None
def reopen_session(self, _sid):
return None
def get_messages_as_conversation(self, _sid, include_ancestors=False):
return [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "yo"},
]
class _Worker:
def close(self):
pass
class _Agent:
def __init__(self, sid, session_id):
self.sid = sid
self.model = "test/model"
self.session_id = session_id
def close(self):
closed_sids.append(self.sid)
def make_agent(sid, key, session_id=None):
created_sids.append(sid)
first_agent_started.set()
assert agent_can_finish.wait(timeout=1)
return _Agent(sid, session_id or key)
monkeypatch.setattr(server, "_get_db", lambda: _DB())
monkeypatch.setattr(server, "_make_agent", make_agent)
monkeypatch.setattr(server, "_SlashWorker", lambda _key, _model: _Worker())
monkeypatch.setattr(
server,
"_start_notification_poller",
lambda _sid, _session: threading.Event(),
)
monkeypatch.setattr(server, "_notify_session_boundary", lambda *_args, **_kwargs: None)
monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
monkeypatch.setattr(server, "_emit", lambda *_args, **_kwargs: None)
monkeypatch.setattr(
server,
"_session_info",
lambda _agent, _session=None: {"model": "test/model"},
)
fake_approval = types.SimpleNamespace(
load_permanent_allowlist=lambda: None,
register_gateway_notify=lambda *_args, **_kwargs: None,
)
with patch.dict(sys.modules, {"tools.approval": fake_approval}):
first_holder = {}
def resume_first():
first_holder["resp"] = server.handle_request(
{
"id": "first",
"method": "session.resume",
"params": {"session_id": target, "cols": 100},
}
)
first_thread = threading.Thread(target=resume_first)
first_thread.start()
assert first_agent_started.wait(timeout=1)
second_holder = {}
def resume_second():
second_holder["resp"] = server.handle_request(
{
"id": "second",
"method": "session.resume",
"params": {"session_id": target, "cols": 120},
}
)
second_thread = threading.Thread(target=resume_second)
second_thread.start()
agent_can_finish.set()
first_thread.join(timeout=1)
second_thread.join(timeout=1)
assert not first_thread.is_alive()
assert not second_thread.is_alive()
first = first_holder["resp"]
second = second_holder["resp"]
assert "error" not in first
assert "error" not in second
# Both resumes resolve to the SAME single live session — the core invariant.
assert second["result"]["session_id"] == first["result"]["session_id"]
assert len(server._sessions) == 1
assert [s.get("session_key") for s in server._sessions.values()].count(target) == 1
winner = first["result"]["session_id"]
# The agent build happens outside the resume lock, so a racing resume may
# build a redundant agent; double-checked locking keeps only one live
# session and closes any loser's agent (no worker/poller is wired for it).
assert winner in created_sids
survivors = [sid for sid in created_sids if sid not in closed_sids]
assert survivors == [winner]
assert all(sid == winner for sid in server._sessions)
def test_session_resume_live_payload_uses_current_history_with_ancestors(server, monkeypatch):
"""Live resume should not reuse a stale ancestor-inclusive snapshot."""
target = "20260409_010101_child"
ancestor_history = [{"role": "user", "content": "ancestor"}]
current_history = [
{"role": "user", "content": "current"},
{"role": "assistant", "content": "current reply"},
]
class _DB:
def get_session(self, _sid):
return {"id": target}
def get_session_by_title(self, _title):
return None
def reopen_session(self, _sid):
return None
def get_messages_as_conversation(self, _sid, include_ancestors=False):
if include_ancestors:
return ancestor_history + current_history
return list(current_history)
class _Worker:
def close(self):
pass
monkeypatch.setattr(server, "_get_db", lambda: _DB())
monkeypatch.setattr(
server,
"_make_agent",
lambda _sid, key, session_id=None: types.SimpleNamespace(
model="test/model", session_id=session_id or key
),
)
monkeypatch.setattr(server, "_SlashWorker", lambda _key, _model: _Worker())
monkeypatch.setattr(
server,
"_start_notification_poller",
lambda _sid, _session: threading.Event(),
)
monkeypatch.setattr(server, "_notify_session_boundary", lambda *_args, **_kwargs: None)
monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
monkeypatch.setattr(server, "_emit", lambda *_args, **_kwargs: None)
monkeypatch.setattr(
server,
"_session_info",
lambda _agent, _session=None: {"model": "test/model"},
)
fake_approval = types.SimpleNamespace(
load_permanent_allowlist=lambda: None,
register_gateway_notify=lambda *_args, **_kwargs: None,
)
with patch.dict(sys.modules, {"tools.approval": fake_approval}):
first = server.handle_request(
{
"id": "first",
"method": "session.resume",
"params": {"session_id": target, "cols": 100},
}
)
assert "error" not in first
sid = first["result"]["session_id"]
assert first["result"]["messages"] == [
{"role": "user", "text": "ancestor"},
{"role": "user", "text": "current"},
{"role": "assistant", "text": "current reply"},
]
with server._sessions[sid]["history_lock"]:
server._sessions[sid]["history"] = current_history + [
{"role": "user", "content": "new live turn"},
{"role": "assistant", "content": "new live reply"},
]
second = server.handle_request(
{
"id": "second",
"method": "session.resume",
"params": {"session_id": target, "cols": 120},
}
)
assert "error" not in second
assert second["result"]["session_id"] == sid
assert second["result"]["messages"] == [
{"role": "user", "text": "ancestor"},
{"role": "user", "text": "current"},
{"role": "assistant", "text": "current reply"},
{"role": "user", "text": "new live turn"},
{"role": "assistant", "text": "new live reply"},
]
def test_session_branch_persists_branched_from_marker(server, monkeypatch):
"""TUI /branch must persist a _branched_from marker so the branch stays
visible in /resume and /sessions.
Regression for issue #20856: the TUI branch leaves the parent live (it
never ends it with end_reason='branched'), so list_sessions_rich's legacy
heuristic never surfaces it — the stable model_config marker is the only
thing that keeps a TUI branch visible.
"""
create_calls = []
class _DB:
def get_session_title(self, _key):
return "parent-title"
def get_next_title_in_lineage(self, base):
return f"{base} 2"
def create_session(self, new_key, **kwargs):
create_calls.append((new_key, kwargs))
return new_key
def append_message(self, **_kwargs):
return None
def set_session_title(self, _key, _title):
return None
monkeypatch.setattr(server, "_get_db", lambda: _DB())
monkeypatch.setattr(server, "_resolve_model", lambda: "test/model")
monkeypatch.setattr(server, "_new_session_key", lambda: "20260101_000001_child0")
monkeypatch.setattr(
server,
"_make_agent",
lambda _sid, key, session_id=None: types.SimpleNamespace(
model="test/model", session_id=session_id or key
),
)
monkeypatch.setattr(server, "_init_session", lambda *_a, **_k: None)
monkeypatch.setattr(server, "_set_session_context", lambda *_a, **_k: [])
monkeypatch.setattr(server, "_clear_session_context", lambda *_a, **_k: None)
monkeypatch.setattr(server, "_session_cwd", lambda _s: "/tmp/branch-cwd")
parent_sid = "parent01"
parent_key = "20260101_000000_parent"
server._sessions[parent_sid] = {
"session_key": parent_key,
"history": [{"role": "user", "content": "hello"}],
"history_lock": threading.Lock(),
"cols": 80,
}
resp = server.handle_request(
{"id": "b1", "method": "session.branch", "params": {"session_id": parent_sid}}
)
assert "error" not in resp, resp
assert len(create_calls) == 1
new_key, kwargs = create_calls[0]
assert new_key == "20260101_000001_child0"
assert kwargs["parent_session_id"] == parent_key
# The marker — without it the branch is invisible in /resume and /sessions.
assert kwargs["model_config"] == {"_branched_from": parent_key}
def test_make_agent_accepts_list_system_prompt(server, monkeypatch):
captured = {}