Files
hermes-agent/tools/tool_search.py
teknium1 7427b9d581 fix(tool-search): scope bridge catalog + dispatch to the session's toolsets
Tool Search read its catalog from the global registry (get_tool_definitions
with no toolset scope = 'start with everything'), so a restricted-toolset
session — subagent, kanban worker, curated gateway session — could:

  1. tool_search the entire process registry, not just its granted tools, and
  2. tool_call any registered plugin/MCP tool it was never given, because
     registry.dispatch() has no enabled_tools gate for non-execute_code tools.

A scoped session (enabled_toolsets=['mcp-github']) reported total_available=26
and successfully invoked an out-of-scope plugin tool via tool_call.

Fix:
- handle_function_call gains enabled_toolsets/disabled_toolsets; the bridge
  dispatch scopes get_tool_definitions to them (also stops polluting the
  process-global _last_resolved_tool_names with out-of-scope tools, which
  leaked into execute_code's sandbox-tool fallback).
- A defense-in-depth gate rejects any tool_call'd name not in the scoped
  deferrable catalog.
- tool_executor's unwrap (both concurrent + sequential paths) enforces the
  same scope before dispatch, since it unwraps tool_call -> underlying name
  and bypasses the bridge branch. New _tool_search_scoped_names() helper,
  cached per-agent on registry generation + toolset scope.
- New scoped_deferrable_names() helper in tool_search.py shared by both sites.

Tests: 4 new regression tests in TestRegression_ToolsetScoping (scoped
catalog, out-of-scope tool_call rejection, no global pollution, helper).
2026-05-29 02:04:12 -07:00

736 lines
27 KiB
Python

"""Progressive tool disclosure ("tool search") for Hermes Agent.
When enabled, MCP and non-core plugin tools are replaced in the model-visible
tools array by three bridge tools — ``tool_search``, ``tool_describe``,
``tool_call`` — and surfaced on demand. Core Hermes tools never defer.
Design constraints this module is built around (see ``openclaw-tool-search-report``
for the full rationale):
* Core tools defined in ``toolsets._HERMES_CORE_TOOLS`` are *never* deferred.
Always-load means always-load. No exceptions.
* The threshold gate runs every assembly: when deferrable tools would consume
less than ``threshold_pct`` of the model's context window (default 10%),
tool search is a no-op and the tools array passes through unchanged.
* The catalog is stateless across turns and tools-array assemblies. It is
rebuilt from the current tool-defs list every time. This is the lesson
from OpenClaw's cron regression (openclaw/openclaw#84141): a session-keyed
catalog that drifts out of sync with the live tool registry produces
silent tool dropouts.
* Bridge tools route through ``model_tools.handle_function_call`` exactly
like a direct call, so guardrails, plugin pre/post hooks, approval flows,
and tool-result truncation all fire identically.
* Display and trajectory unwrap is implemented here so the user (CLI activity
feed, gateway, saved trajectories) always sees the underlying tool, not
the bridge.
"""
from __future__ import annotations
import json
import logging
import math
import re
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Tuple
logger = logging.getLogger("tools.tool_search")
# Bridge tool names. These names are reserved and may not collide with a
# user/plugin/MCP tool — registration of any tool with these names is
# rejected by the registry's existing override-protection logic.
TOOL_SEARCH_NAME = "tool_search"
TOOL_DESCRIBE_NAME = "tool_describe"
TOOL_CALL_NAME = "tool_call"
BRIDGE_TOOL_NAMES = frozenset({TOOL_SEARCH_NAME, TOOL_DESCRIBE_NAME, TOOL_CALL_NAME})
# When estimating tokens from char count without a real tokenizer, this is
# the cheap rule of thumb that's stable across providers. Roughly 4 chars
# per token for English+JSON. Underestimating leads to false negatives
# (tool search not activated when it should); overestimating leads to false
# positives (activated when not needed). 4.0 errs slightly toward
# underestimating, which is the safer default.
CHARS_PER_TOKEN = 4.0
# ---------------------------------------------------------------------------
# Configuration plumbing
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ToolSearchConfig:
"""Resolved, validated tool-search configuration for a single assembly."""
enabled: str # "auto" | "on" | "off"
threshold_pct: float # 0..100 — only used when enabled == "auto"
search_default_limit: int
max_search_limit: int
@classmethod
def from_raw(cls, raw: Any) -> "ToolSearchConfig":
"""Build a config from a raw dict / bool / None.
Accepts the legacy bool shape (``tools.tool_search: true``) and the
dict shape (``tools.tool_search: {enabled: auto, ...}``). Validates
and clamps every numeric field; unknown values fall back to safe
defaults rather than raising, so a typo in user config does not
break the agent.
"""
if raw is True:
return cls(enabled="auto", threshold_pct=10.0,
search_default_limit=5, max_search_limit=20)
if raw is False:
return cls(enabled="off", threshold_pct=10.0,
search_default_limit=5, max_search_limit=20)
if not isinstance(raw, dict):
return cls(enabled="auto", threshold_pct=10.0,
search_default_limit=5, max_search_limit=20)
enabled_raw = str(raw.get("enabled", "auto")).strip().lower()
if enabled_raw in ("true", "1", "yes"):
enabled = "on"
elif enabled_raw in ("false", "0", "no"):
enabled = "off"
elif enabled_raw in ("auto", "on", "off"):
enabled = enabled_raw
else:
enabled = "auto"
threshold_pct = _safe_float(raw.get("threshold_pct"), 10.0)
threshold_pct = max(0.0, min(100.0, threshold_pct))
max_search_limit = max(1, min(50, _safe_int(raw.get("max_search_limit"), 20)))
search_default_limit = max(1, min(max_search_limit,
_safe_int(raw.get("search_default_limit"), 5)))
return cls(
enabled=enabled,
threshold_pct=threshold_pct,
search_default_limit=search_default_limit,
max_search_limit=max_search_limit,
)
def _safe_int(value: Any, fallback: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return fallback
def _safe_float(value: Any, fallback: float) -> float:
try:
return float(value)
except (TypeError, ValueError):
return fallback
def load_config() -> ToolSearchConfig:
"""Load tool-search config from the user config file."""
try:
from hermes_cli.config import load_config as _load
cfg = _load() or {}
tools_cfg = cfg.get("tools") if isinstance(cfg.get("tools"), dict) else {}
if not isinstance(tools_cfg, dict):
tools_cfg = {}
return ToolSearchConfig.from_raw(tools_cfg.get("tool_search"))
except Exception as e:
logger.debug("Failed to load tool-search config: %s", e)
return ToolSearchConfig.from_raw(None)
# ---------------------------------------------------------------------------
# Tool classification
# ---------------------------------------------------------------------------
def _core_tool_names() -> frozenset[str]:
"""Return the set of tool names that must NEVER be deferred.
Imported lazily because ``toolsets`` imports from ``tools.registry``
and we don't want a hard cycle.
"""
try:
from toolsets import _HERMES_CORE_TOOLS
return frozenset(_HERMES_CORE_TOOLS)
except Exception:
return frozenset()
def is_deferrable_tool_name(name: str) -> bool:
"""Return True if a tool with this name is *eligible* for deferral.
A tool is deferrable iff it is registered with an MCP toolset prefix
OR it is not in ``_HERMES_CORE_TOOLS``. Core tools are never deferred
even when their toolset is technically plugin-provided (this protects
against accidental shadowing).
"""
if name in BRIDGE_TOOL_NAMES:
return False
if name in _core_tool_names():
return False
# Check registry toolset for MCP prefix.
try:
from tools.registry import registry
entry = registry.get_entry(name)
if entry is None:
return False
if entry.toolset.startswith("mcp-"):
return True
# Non-MCP, non-core → plugin tool, eligible.
return True
except Exception:
return False
def classify_tools(tool_defs: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Split a tool-defs list into (visible, deferrable).
``visible`` retains every tool that must stay in the model-facing array:
every core tool, plus any tool we can't classify. ``deferrable`` is the
candidate set for catalog entry.
"""
visible: List[Dict[str, Any]] = []
deferrable: List[Dict[str, Any]] = []
for td in tool_defs:
fn = td.get("function") or {}
name = fn.get("name", "")
if name in BRIDGE_TOOL_NAMES:
# Should never happen — bridge tools are added after classification —
# but be defensive.
continue
if is_deferrable_tool_name(name):
deferrable.append(td)
else:
visible.append(td)
return visible, deferrable
# ---------------------------------------------------------------------------
# Token estimation and threshold gate
# ---------------------------------------------------------------------------
def estimate_tokens_from_schemas(tool_defs: Iterable[Dict[str, Any]]) -> int:
"""Estimate the token cost of a tool-defs list via the chars/4 rule.
Cheap and stable across providers. The number doesn't need to be exact —
it gates the activate/skip decision, and a typical 200K context with a
10% threshold means the decision flips around 20K tokens of schema.
Order-of-magnitude precision is fine.
"""
total_chars = 0
for td in tool_defs:
try:
total_chars += len(json.dumps(td, ensure_ascii=False, separators=(",", ":")))
except (TypeError, ValueError):
total_chars += len(str(td))
return int(math.ceil(total_chars / CHARS_PER_TOKEN))
def should_activate(
config: ToolSearchConfig,
deferrable_tokens: int,
context_length: Optional[int],
) -> bool:
"""Decide whether tool search should activate for the current assembly.
``"off"`` skips unconditionally. ``"on"`` activates unconditionally
(as long as there is at least one deferrable tool — there's no point
swapping a no-op). ``"auto"`` activates when the deferrable schemas
would consume ``threshold_pct`` of context or more.
"""
if config.enabled == "off":
return False
if deferrable_tokens <= 0:
return False
if config.enabled == "on":
return True
# auto
if not context_length or context_length <= 0:
# Without a known context size, fall back to a fixed 20K-token cutoff
# — the cliff above which Anthropic and OpenAI both saw quality drops.
return deferrable_tokens >= 20_000
threshold_tokens = int(context_length * (config.threshold_pct / 100.0))
return deferrable_tokens >= threshold_tokens
# ---------------------------------------------------------------------------
# Catalog + BM25 retrieval
# ---------------------------------------------------------------------------
@dataclass
class CatalogEntry:
"""One deferrable tool, in a form the bridge tools can search and serve."""
name: str
description: str
schema: Dict[str, Any] # The full {"type":"function", "function": {...}} entry.
source: str # "mcp" | "plugin" | "other"
source_name: str # Toolset name, e.g. "mcp-github" or "kanban"
# Pre-tokenized fields for BM25.
_tokens: List[str] = field(default_factory=list)
_TOKEN_RE = re.compile(r"[A-Za-z0-9]+")
def _tokenize(text: str) -> List[str]:
if not text:
return []
return [t.lower() for t in _TOKEN_RE.findall(text)]
def _entry_search_text(td: Dict[str, Any]) -> str:
"""Build the search-text blob for a deferrable tool.
Includes the tool name (with underscores broken into words so BM25 can
match against query terms), the description, and the names of the
top-level parameters. Schema bodies are deliberately excluded —
indexing them adds noise without improving recall in our measurement.
"""
fn = td.get("function") or {}
name = fn.get("name", "")
desc = fn.get("description", "") or ""
params = ((fn.get("parameters") or {}).get("properties") or {})
param_names = " ".join(params.keys())
# Break snake_case and dotted names into words for BM25.
name_words = name.replace("_", " ").replace(".", " ").replace("-", " ").replace(":", " ")
return f"{name_words} {desc} {param_names}"
def _classify_source(name: str) -> Tuple[str, str]:
"""Return (source_kind, source_name) for a registered tool name."""
try:
from tools.registry import registry
entry = registry.get_entry(name)
if entry is None:
return ("other", "")
if entry.toolset.startswith("mcp-"):
return ("mcp", entry.toolset)
return ("plugin", entry.toolset)
except Exception:
return ("other", "")
def build_catalog(tool_defs: List[Dict[str, Any]]) -> List[CatalogEntry]:
"""Build the deferred-tool catalog from a tool-defs list.
Caller is expected to pass only the deferrable subset (``classify_tools``
returns it as the second element).
"""
catalog: List[CatalogEntry] = []
for td in tool_defs:
fn = td.get("function") or {}
name = fn.get("name", "")
if not name:
continue
desc = fn.get("description", "") or ""
source, source_name = _classify_source(name)
entry = CatalogEntry(
name=name,
description=desc,
schema=td,
source=source,
source_name=source_name,
_tokens=_tokenize(_entry_search_text(td)),
)
catalog.append(entry)
return catalog
def _bm25_score(query_tokens: List[str], doc_tokens: List[str],
doc_lengths: List[int], avg_dl: float,
doc_freq: Dict[str, int], n_docs: int,
k1: float = 1.5, b: float = 0.75) -> float:
"""Standard BM25 score for one query against one document.
Inlined small implementation rather than adding a dependency. Performance
is fine — the catalog is bounded by N (tools) typically < 500, and we
score against the in-memory tokens list.
"""
if not doc_tokens:
return 0.0
score = 0.0
dl = len(doc_tokens)
# Pre-count tokens in the doc.
doc_tf: Dict[str, int] = {}
for t in doc_tokens:
doc_tf[t] = doc_tf.get(t, 0) + 1
for q in query_tokens:
df = doc_freq.get(q, 0)
if df == 0:
continue
idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
tf = doc_tf.get(q, 0)
if tf == 0:
continue
norm = tf * (k1 + 1) / (tf + k1 * (1 - b + b * dl / max(avg_dl, 1.0)))
score += idf * norm
return score
def search_catalog(catalog: List[CatalogEntry], query: str, limit: int = 5) -> List[CatalogEntry]:
"""Return the top-``limit`` catalog entries for ``query`` by BM25.
Falls back to a stable name-substring match when BM25 yields no hits
above zero. That ensures a query like ``"github"`` against a catalog
where every tool is named ``github_*`` still returns results — BM25
can underperform when query and document share only one token that
appears in every document (zero IDF).
"""
if not catalog or limit <= 0:
return []
query_tokens = _tokenize(query)
if not query_tokens:
return []
# Precompute doc statistics.
doc_lengths = [len(e._tokens) for e in catalog]
avg_dl = sum(doc_lengths) / max(len(doc_lengths), 1)
doc_freq: Dict[str, int] = {}
for e in catalog:
seen = set(e._tokens)
for t in seen:
doc_freq[t] = doc_freq.get(t, 0) + 1
n_docs = len(catalog)
scored: List[Tuple[float, CatalogEntry]] = []
for entry in catalog:
s = _bm25_score(query_tokens, entry._tokens, doc_lengths, avg_dl,
doc_freq, n_docs)
if s > 0:
scored.append((s, entry))
if not scored:
# Substring fallback against the original tool name.
ql = query.lower()
for entry in catalog:
if ql in entry.name.lower():
scored.append((0.1, entry))
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:limit]]
# ---------------------------------------------------------------------------
# Bridge tool schemas
# ---------------------------------------------------------------------------
def bridge_tool_schemas(deferred_count: int) -> List[Dict[str, Any]]:
"""Build the bridge tool schemas to inject in place of deferred tools.
The schemas are intentionally short — every byte added here is a byte
the user pays on every turn. Descriptions are tuned to be unambiguous
about the call sequence the model should follow.
"""
desc_search = (
f"Search {deferred_count} additional tools that are loaded on demand. "
"Returns up to ``limit`` matches with name and description. Follow "
f"with `{TOOL_DESCRIBE_NAME}` to load a tool's full parameter schema, "
f"then `{TOOL_CALL_NAME}` to invoke it. Tools listed at the top of this "
"system prompt are already available and do not need to be searched."
)
desc_describe = (
f"Load the full JSON schema for one tool returned by `{TOOL_SEARCH_NAME}`. "
f"Required before `{TOOL_CALL_NAME}` if the tool's parameters are unknown."
)
desc_call = (
"Invoke a deferred tool by name with the given arguments. Argument shape "
f"matches the tool's schema (see `{TOOL_DESCRIBE_NAME}`). Policy, hooks, "
"and approvals run exactly as for any directly-listed tool."
)
return [
{
"type": "function",
"function": {
"name": TOOL_SEARCH_NAME,
"description": desc_search,
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Keywords describing the capability you need (e.g. 'create github issue').",
},
"limit": {
"type": "integer",
"description": "Maximum number of results to return. Default 5.",
},
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": TOOL_DESCRIBE_NAME,
"description": desc_describe,
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Exact tool name (as returned by tool_search).",
},
},
"required": ["name"],
},
},
},
{
"type": "function",
"function": {
"name": TOOL_CALL_NAME,
"description": desc_call,
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Exact tool name to invoke.",
},
"arguments": {
"type": "object",
"description": "Arguments for the tool, matching its schema.",
},
},
"required": ["name", "arguments"],
},
},
},
]
# ---------------------------------------------------------------------------
# Public entry point: assemble tool-defs with optional tool search
# ---------------------------------------------------------------------------
@dataclass
class AssemblyResult:
"""Outcome of one assembly. Useful for tests and observability."""
tool_defs: List[Dict[str, Any]]
activated: bool
deferred_count: int = 0
deferred_tokens: int = 0
threshold_tokens: int = 0
def assemble_tool_defs(
tool_defs: List[Dict[str, Any]],
*,
context_length: Optional[int] = None,
config: Optional[ToolSearchConfig] = None,
) -> AssemblyResult:
"""Return the tool-defs list the model should actually see.
When tool search is inactive (off, no deferrable tools, or below
threshold), this is a passthrough. When active, MCP and plugin tools
are stripped from the visible list and replaced with the three bridge
tools. Core tools are *never* deferred regardless of config.
Idempotent: calling with bridge tools already in the input is a no-op
(they classify as non-core/non-deferrable but their names are reserved,
so they are filtered out of the deferrable set).
"""
if config is None:
config = load_config()
# Defensive: strip any bridge tools that may already be in the list
# (e.g. someone called assemble twice).
incoming = [td for td in tool_defs
if (td.get("function") or {}).get("name") not in BRIDGE_TOOL_NAMES]
visible, deferrable = classify_tools(incoming)
if not deferrable:
return AssemblyResult(tool_defs=incoming, activated=False)
deferrable_tokens = estimate_tokens_from_schemas(deferrable)
if not should_activate(config, deferrable_tokens, context_length):
return AssemblyResult(
tool_defs=incoming,
activated=False,
deferred_count=len(deferrable),
deferred_tokens=deferrable_tokens,
threshold_tokens=int((context_length or 0) * (config.threshold_pct / 100.0)),
)
bridge = bridge_tool_schemas(len(deferrable))
result = visible + bridge
threshold_tokens = int((context_length or 0) * (config.threshold_pct / 100.0))
logger.info(
"tool_search activated: %d core/visible tools kept, %d deferred (~%d tokens, threshold ~%d)",
len(visible), len(deferrable), deferrable_tokens, threshold_tokens,
)
return AssemblyResult(
tool_defs=result,
activated=True,
deferred_count=len(deferrable),
deferred_tokens=deferrable_tokens,
threshold_tokens=threshold_tokens,
)
# ---------------------------------------------------------------------------
# Bridge tool dispatch
# ---------------------------------------------------------------------------
def is_bridge_tool(name: str) -> bool:
return name in BRIDGE_TOOL_NAMES
def _format_search_hit(entry: CatalogEntry) -> Dict[str, Any]:
return {
"name": entry.name,
"source": entry.source,
"source_name": entry.source_name,
# Cap description so a chatty MCP server doesn't blow up the result.
"description": (entry.description or "")[:400],
}
def dispatch_tool_search(args: Dict[str, Any],
*,
current_tool_defs: List[Dict[str, Any]],
config: Optional[ToolSearchConfig] = None) -> str:
"""Execute the ``tool_search`` bridge tool. Returns a JSON string."""
if config is None:
config = load_config()
query = str(args.get("query") or "").strip()
if not query:
return json.dumps({"error": "query is required"}, ensure_ascii=False)
raw_limit = args.get("limit")
if raw_limit is None:
limit = config.search_default_limit
else:
limit = max(1, min(config.max_search_limit, _safe_int(raw_limit, config.search_default_limit)))
_, deferrable = classify_tools(current_tool_defs)
catalog = build_catalog(deferrable)
hits = search_catalog(catalog, query, limit=limit)
return json.dumps({
"query": query,
"total_available": len(catalog),
"matches": [_format_search_hit(h) for h in hits],
}, ensure_ascii=False)
def dispatch_tool_describe(args: Dict[str, Any],
*,
current_tool_defs: List[Dict[str, Any]]) -> str:
"""Execute the ``tool_describe`` bridge tool. Returns a JSON string."""
name = str(args.get("name") or "").strip()
if not name:
return json.dumps({"error": "name is required"}, ensure_ascii=False)
if not is_deferrable_tool_name(name):
return json.dumps({
"error": (
f"'{name}' is not a deferrable tool. If you see it in the tools list "
"already, call it directly; otherwise check the spelling against tool_search."
),
}, ensure_ascii=False)
_, deferrable = classify_tools(current_tool_defs)
for td in deferrable:
fn = td.get("function") or {}
if fn.get("name") == name:
return json.dumps({
"name": name,
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
}, ensure_ascii=False)
return json.dumps({
"error": f"'{name}' is not currently available. Re-run tool_search to refresh.",
}, ensure_ascii=False)
def scoped_deferrable_names(tool_defs: List[Dict[str, Any]]) -> frozenset[str]:
"""Return the set of deferrable tool names present in ``tool_defs``.
``tool_defs`` is expected to be the *pre-assembly* tool list for the
current session's toolset scope (i.e. what
``get_tool_definitions(skip_tool_search_assembly=True)`` returns for the
session's enabled/disabled toolsets). The resulting set is the universe of
tools the session may legitimately reach through ``tool_call``. Used as a
scoping gate by both the ``model_tools`` bridge dispatch and the
``tool_executor`` unwrap so a restricted-toolset session can never invoke
an out-of-scope tool via the bridge.
"""
names: set[str] = set()
for td in tool_defs:
name = (td.get("function") or {}).get("name", "")
if name and is_deferrable_tool_name(name):
names.add(name)
return frozenset(names)
def resolve_underlying_call(args: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any], Optional[str]]:
"""Parse a ``tool_call`` invocation into (underlying_name, args, error_msg).
Used by:
* the dispatcher in ``model_tools.handle_function_call``,
* the display layer (so the activity feed shows the underlying tool),
* the trajectory recorder.
On parse error, returns ``(None, {}, error_message)``.
"""
name = str(args.get("name") or "").strip()
if not name:
return None, {}, "tool_call requires a 'name' argument"
if name in BRIDGE_TOOL_NAMES:
return None, {}, f"tool_call cannot invoke '{name}' (it is itself a bridge tool)"
raw_args = args.get("arguments")
if raw_args is None:
raw_args = {}
if isinstance(raw_args, str):
try:
raw_args = json.loads(raw_args)
except json.JSONDecodeError as e:
return None, {}, f"tool_call 'arguments' is not valid JSON: {e}"
if not isinstance(raw_args, dict):
return None, {}, "tool_call 'arguments' must be an object"
if not is_deferrable_tool_name(name):
return None, {}, (
f"'{name}' is not a deferrable tool. If it appears in the model-facing tools "
"list already, call it directly instead of via tool_call."
)
return name, raw_args, None
__all__ = [
"TOOL_SEARCH_NAME",
"TOOL_DESCRIBE_NAME",
"TOOL_CALL_NAME",
"BRIDGE_TOOL_NAMES",
"ToolSearchConfig",
"CatalogEntry",
"AssemblyResult",
"load_config",
"is_deferrable_tool_name",
"classify_tools",
"estimate_tokens_from_schemas",
"should_activate",
"build_catalog",
"search_catalog",
"bridge_tool_schemas",
"assemble_tool_defs",
"is_bridge_tool",
"dispatch_tool_search",
"dispatch_tool_describe",
"resolve_underlying_call",
"scoped_deferrable_names",
]