chore: remove dead code — 28 unused functions/classes across 16 files

Vulture + per-symbol verification (whole-repo grep incl. tests, string
literals, getattr, decorator/registry/argparse dispatch) confirmed each of
these has zero callers anywhere — not reachable via any dynamic-dispatch path,
not referenced by tests, not re-exported.

Removed:
- acp_adapter/tools.py: _build_patch_mode_content
- agent/anthropic_adapter.py: read_claude_managed_key (diagnostics-only, never called)
- agent/bedrock_adapter.py: get_bedrock_model_ids
- agent/browser_registry.py: get_active_browser_provider
- agent/chat_completion_helpers.py: _take_request_client (x2 nested closures, never invoked)
- gateway/platforms/weixin.py: _rewrite_headers_for_weixin, _rewrite_table_block_for_weixin
- hermes_cli/banner.py: _skin_branding
- hermes_cli/debug.py: _delete_hint
- hermes_cli/gateway.py: _setup_email, _setup_sms, _setup_yuanbao
  (platform keys absent from the _builtin_setup_fn dispatch dict; handled by
  the _setup_standard_platform fallback)
- hermes_cli/kanban_db.py: set_max_runtime, active_run
- hermes_cli/kanban_diagnostics.py: severity_of_highest, _latest_clean_event_ts
- hermes_cli/main.py: _build_provider_choices, cmd_portal
  (portal subcommand is wired via portal_cli.add_parser, not this wrapper)
- hermes_cli/model_switch.py: CustomAutoResult (orphaned by the switch_model() extraction)
- hermes_cli/models.py: format_model_pricing_table, fetch_nous_account_tier
- hermes_cli/portal_cli.py: _nous_portal_base_url
- hermes_cli/proxy/server.py: handle_models_fallback (defined but never registered on the router)
- tools/computer_use/cua_backend.py: _parse_element, _is_arm_mac
- tools/file_operations.py: _get_safe_write_root (prod uses the imported
  agent.file_safety.get_safe_write_root directly)
- tools/skills_tool.py: _load_category_description

Also dropped two imports left unused by the removals:
- tools/file_operations.py: get_safe_write_root alias
- tools/computer_use/cua_backend.py: import platform

Pure deletion: -551 LOC. No behavior change. Test files covering the edited
modules pass (640/640); the broader suite's pre-existing/env-dependent
failures reproduce unchanged on origin/main.
This commit is contained in:
kshitijk4poor
2026-05-29 15:36:58 +05:30
committed by Teknium
parent 0aa9f6acfa
commit dc235e93cb
19 changed files with 0 additions and 551 deletions

View File

@ -905,74 +905,6 @@ def _build_polished_completion_content(
if not text: if not text:
return None return None
return [_text(text)] return [_text(text)]
def _build_patch_mode_content(patch_text: str) -> List[Any]:
"""Parse V4A patch mode input into ACP diff blocks when possible."""
if not patch_text:
return [acp.tool_content(acp.text_block(""))]
try:
from tools.patch_parser import OperationType, parse_v4a_patch
operations, error = parse_v4a_patch(patch_text)
if error or not operations:
return [acp.tool_content(acp.text_block(patch_text))]
content: List[Any] = []
for op in operations:
if op.operation == OperationType.UPDATE:
old_chunks: list[str] = []
new_chunks: list[str] = []
for hunk in op.hunks:
old_lines = [line.content for line in hunk.lines if line.prefix in {" ", "-"}]
new_lines = [line.content for line in hunk.lines if line.prefix in {" ", "+"}]
if old_lines or new_lines:
old_chunks.append("\n".join(old_lines))
new_chunks.append("\n".join(new_lines))
old_text = "\n...\n".join(chunk for chunk in old_chunks if chunk)
new_text = "\n...\n".join(chunk for chunk in new_chunks if chunk)
if old_text or new_text:
content.append(
acp.tool_diff_content(
path=op.file_path,
old_text=old_text or None,
new_text=new_text or "",
)
)
continue
if op.operation == OperationType.ADD:
added_lines = [line.content for hunk in op.hunks for line in hunk.lines if line.prefix == "+"]
content.append(
acp.tool_diff_content(
path=op.file_path,
new_text="\n".join(added_lines),
)
)
continue
if op.operation == OperationType.DELETE:
content.append(
acp.tool_diff_content(
path=op.file_path,
old_text=f"Delete file: {op.file_path}",
new_text="",
)
)
continue
if op.operation == OperationType.MOVE:
content.append(
acp.tool_content(acp.text_block(f"Move file: {op.file_path} -> {op.new_path}"))
)
return content or [acp.tool_content(acp.text_block(patch_text))]
except Exception:
return [acp.tool_content(acp.text_block(patch_text))]
def _strip_diff_prefix(path: str) -> str: def _strip_diff_prefix(path: str) -> str:
raw = str(path or "").strip() raw = str(path or "").strip()
if raw.startswith(("a/", "b/")): if raw.startswith(("a/", "b/")):

View File

@ -892,22 +892,6 @@ def read_claude_code_credentials() -> Optional[Dict[str, Any]]:
logger.debug("Failed to read ~/.claude/.credentials.json: %s", e) logger.debug("Failed to read ~/.claude/.credentials.json: %s", e)
return None return None
def read_claude_managed_key() -> Optional[str]:
"""Read Claude's native managed key from ~/.claude.json for diagnostics only."""
claude_json = Path.home() / ".claude.json"
if claude_json.exists():
try:
data = json.loads(claude_json.read_text(encoding="utf-8"))
primary_key = data.get("primaryApiKey", "")
if isinstance(primary_key, str) and primary_key.strip():
return primary_key.strip()
except (json.JSONDecodeError, OSError, IOError) as e:
logger.debug("Failed to read ~/.claude.json: %s", e)
return None
def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool: def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool:
"""Check if Claude Code credentials have a non-expired access token.""" """Check if Claude Code credentials have a non-expired access token."""
import time import time

View File

@ -1167,18 +1167,6 @@ def _extract_provider_from_arn(arn: str) -> str:
""" """
match = re.search(r"foundation-model/([^.]+)", arn) match = re.search(r"foundation-model/([^.]+)", arn)
return match.group(1) if match else "" return match.group(1) if match else ""
def get_bedrock_model_ids(region: str) -> List[str]:
"""Return a flat list of available Bedrock model IDs for the given region.
Convenience wrapper around ``discover_bedrock_models()`` for use in
the model selection UI.
"""
models = discover_bedrock_models(region)
return [m["id"] for m in models]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Error classification — Bedrock-specific exceptions # Error classification — Bedrock-specific exceptions
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@ -184,39 +184,6 @@ def _resolve(configured: Optional[str]) -> Optional[BrowserProvider]:
return provider return provider
return None return None
def get_active_browser_provider() -> Optional[BrowserProvider]:
"""Resolve the currently-active cloud browser provider.
Reads ``browser.cloud_provider`` from config.yaml; falls back per the
module docstring. Returns None for local mode or when no provider is
available.
"""
try:
from hermes_cli.config import read_raw_config
cfg = read_raw_config()
browser_cfg = cfg.get("browser", {})
except Exception as exc:
logger.debug("Could not read browser config: %s", exc)
browser_cfg = {}
configured: Optional[str] = None
if isinstance(browser_cfg, dict) and "cloud_provider" in browser_cfg:
try:
from tools.tool_backend_helpers import normalize_browser_cloud_provider
configured = normalize_browser_cloud_provider(
browser_cfg.get("cloud_provider")
)
except Exception as exc:
logger.debug("normalize_browser_cloud_provider failed: %s", exc)
configured = None
return _resolve(configured)
def _reset_for_tests() -> None: def _reset_for_tests() -> None:
"""Clear the registry. **Test-only.**""" """Clear the registry. **Test-only.**"""
with _lock: with _lock:

View File

@ -149,13 +149,6 @@ def interruptible_api_call(agent, api_kwargs: dict):
request_client_holder["owner_tid"] = threading.get_ident() request_client_holder["owner_tid"] = threading.get_ident()
return client return client
def _take_request_client():
with request_client_lock:
client = request_client_holder.get("client")
request_client_holder["client"] = None
request_client_holder["owner_tid"] = None
return client
def _close_request_client_once(reason: str) -> None: def _close_request_client_once(reason: str) -> None:
# #29507: dispatch on the calling thread. # #29507: dispatch on the calling thread.
# #
@ -1628,13 +1621,6 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
request_client_holder["owner_tid"] = threading.get_ident() request_client_holder["owner_tid"] = threading.get_ident()
return client return client
def _take_request_client():
with request_client_lock:
client = request_client_holder.get("client")
request_client_holder["client"] = None
request_client_holder["owner_tid"] = None
return client
def _close_request_client_once(reason: str) -> None: def _close_request_client_once(reason: str) -> None:
# See #29507 explanation in the non-streaming variant above. A # See #29507 explanation in the non-streaming variant above. A
# stranger thread (the interrupt-check / stale-stream detector loop) # stranger thread (the interrupt-check / stale-stream detector loop)

View File

@ -656,54 +656,6 @@ def _split_table_row(line: str) -> List[str]:
if row.endswith("|"): if row.endswith("|"):
row = row[:-1] row = row[:-1]
return [cell.strip() for cell in row.split("|")] return [cell.strip() for cell in row.split("|")]
def _rewrite_headers_for_weixin(line: str) -> str:
match = _HEADER_RE.match(line)
if not match:
return line.rstrip()
level = len(match.group(1))
title = match.group(2).strip()
if level == 1:
return f"{title}"
return f"**{title}**"
def _rewrite_table_block_for_weixin(lines: List[str]) -> str:
if len(lines) < 2:
return "\n".join(lines)
headers = _split_table_row(lines[0])
body_rows = [_split_table_row(line) for line in lines[2:] if line.strip()]
if not headers or not body_rows:
return "\n".join(lines)
formatted_rows: List[str] = []
for row in body_rows:
pairs = []
for idx, header in enumerate(headers):
if idx >= len(row):
break
label = header or f"Column {idx + 1}"
value = row[idx].strip()
if value:
pairs.append((label, value))
if not pairs:
continue
if len(pairs) == 1:
label, value = pairs[0]
formatted_rows.append(f"- {label}: {value}")
continue
if len(pairs) == 2:
label, value = pairs[0]
other_label, other_value = pairs[1]
formatted_rows.append(f"- {label}: {value}")
formatted_rows.append(f" {other_label}: {other_value}")
continue
summary = " | ".join(f"{label}: {value}" for label, value in pairs)
formatted_rows.append(f"- {summary}")
return "\n".join(formatted_rows) if formatted_rows else "\n".join(lines)
def _normalize_markdown_blocks(content: str) -> str: def _normalize_markdown_blocks(content: str) -> str:
lines = content.splitlines() lines = content.splitlines()
result: List[str] = [] result: List[str] = []

View File

@ -50,17 +50,6 @@ def _skin_color(key: str, fallback: str) -> str:
return get_active_skin().get_color(key, fallback) return get_active_skin().get_color(key, fallback)
except Exception: except Exception:
return fallback return fallback
def _skin_branding(key: str, fallback: str) -> str:
"""Get a branding string from the active skin, or return fallback."""
try:
from hermes_cli.skin_engine import get_active_skin
return get_active_skin().get_branding(key, fallback)
except Exception:
return fallback
# ========================================================================= # =========================================================================
# ASCII Art & Branding # ASCII Art & Branding
# ========================================================================= # =========================================================================

View File

@ -256,17 +256,6 @@ def _schedule_auto_delete(urls: list[str], delay_seconds: int = _AUTO_DELETE_SEC
policy handles cleanup. policy handles cleanup.
""" """
_record_pending(urls, delay_seconds=delay_seconds) _record_pending(urls, delay_seconds=delay_seconds)
def _delete_hint(url: str) -> str:
"""Return a one-liner delete command for the given paste URL."""
paste_id = _extract_paste_id(url)
if paste_id:
return f"hermes debug delete {url}"
# dpaste.com — no API delete, expires on its own.
return "(auto-expires per dpaste.com policy)"
def _upload_paste_rs(content: str) -> str: def _upload_paste_rs(content: str) -> str:
"""Upload to paste.rs. Returns the paste URL. """Upload to paste.rs. Returns the paste URL.

View File

@ -3958,20 +3958,6 @@ def _setup_whatsapp():
from hermes_cli.main import cmd_whatsapp from hermes_cli.main import cmd_whatsapp
import argparse import argparse
cmd_whatsapp(argparse.Namespace()) cmd_whatsapp(argparse.Namespace())
def _setup_email():
"""Configure Email via the standard platform setup."""
email_platform = next(p for p in _PLATFORMS if p["key"] == "email")
_setup_standard_platform(email_platform)
def _setup_sms():
"""Configure SMS (Twilio) via the standard platform setup."""
sms_platform = next(p for p in _PLATFORMS if p["key"] == "sms")
_setup_standard_platform(sms_platform)
def _setup_dingtalk(): def _setup_dingtalk():
"""Configure DingTalk — QR scan (recommended) or manual credential entry.""" """Configure DingTalk — QR scan (recommended) or manual credential entry."""
from hermes_cli.setup import ( from hermes_cli.setup import (
@ -4142,14 +4128,6 @@ def _setup_wecom():
print() print()
print_success("💬 WeCom configured!") print_success("💬 WeCom configured!")
def _setup_yuanbao():
"""Configure Yuanbao via the standard platform setup."""
yuanbao_platform = next(p for p in _PLATFORMS if p["key"] == "yuanbao")
_setup_standard_platform(yuanbao_platform)
def _is_service_installed() -> bool: def _is_service_installed() -> bool:
"""Check if the gateway is installed as a system service.""" """Check if the gateway is installed as a system service."""
if supports_systemd_services(): if supports_systemd_services():

View File

@ -4863,23 +4863,6 @@ def detect_stale_running(
# spawn_failed / timed_out / crashed counters. # spawn_failed / timed_out / crashed counters.
return reclaimed return reclaimed
def set_max_runtime(
conn: sqlite3.Connection,
task_id: str,
seconds: Optional[int],
) -> bool:
"""Set or clear the per-task max_runtime_seconds. Returns True on
success."""
with write_txn(conn):
cur = conn.execute(
"UPDATE tasks SET max_runtime_seconds = ? WHERE id = ?",
(int(seconds) if seconds is not None else None, task_id),
)
return cur.rowcount == 1
def _error_fingerprint(error_text: str) -> str: def _error_fingerprint(error_text: str) -> str:
"""Normalize an error message for grouping identical failures. """Normalize an error message for grouping identical failures.
@ -6965,18 +6948,6 @@ def get_run(conn: sqlite3.Connection, run_id: int) -> Optional[Run]:
"SELECT * FROM task_runs WHERE id = ?", (int(run_id),), "SELECT * FROM task_runs WHERE id = ?", (int(run_id),),
).fetchone() ).fetchone()
return Run.from_row(row) if row else None return Run.from_row(row) if row else None
def active_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]:
"""Return the currently-open run for ``task_id`` (``ended_at IS NULL``)."""
row = conn.execute(
"SELECT * FROM task_runs WHERE task_id = ? AND ended_at IS NULL "
"ORDER BY started_at DESC LIMIT 1",
(task_id,),
).fetchone()
return Run.from_row(row) if row else None
def latest_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]: def latest_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]:
"""Return the most recent run regardless of outcome (active or closed).""" """Return the most recent run regardless of outcome (active or closed)."""
row = conn.execute( row = conn.execute(

View File

@ -191,23 +191,6 @@ def _active_hallucination_events(
elif k == kind: elif k == kind:
active.append(ev) active.append(ev)
return active return active
def _latest_clean_event_ts(events: Iterable[Any]) -> int:
"""Timestamp of the most recent clean completion / edit event.
Kept for general "has this task ever been successfully completed"
lookups; hallucination rules use ``_active_hallucination_events``
instead because they need strict ordering.
"""
latest = 0
for ev in events:
if _event_kind(ev) in {"completed", "edited"}:
t = _event_ts(ev)
latest = max(latest, t)
return latest
# Standard always-available actions. Every diagnostic can offer these as # Standard always-available actions. Every diagnostic can offer these as
# fallbacks regardless of kind — they're the two baseline recovery # fallbacks regardless of kind — they're the two baseline recovery
# primitives the kernel supports. # primitives the kernel supports.
@ -1122,16 +1105,3 @@ def compute_task_diagnostics(
) )
) )
return out return out
def severity_of_highest(diagnostics: Iterable[Diagnostic]) -> Optional[str]:
"""Highest severity present in the list, or None if empty. Useful
for card badges that need a single color."""
highest_idx = -1
highest = None
for d in diagnostics:
idx = SEVERITY_ORDER.index(d.severity) if d.severity in SEVERITY_ORDER else -1
if idx > highest_idx:
highest_idx = idx
highest = d.severity
return highest

View File

@ -6158,15 +6158,6 @@ def cmd_webhook(args):
from hermes_cli.webhook import webhook_command from hermes_cli.webhook import webhook_command
webhook_command(args) webhook_command(args)
def cmd_portal(args):
"""Nous Portal status and Tool Gateway routing surface."""
from hermes_cli.portal_cli import portal_command
return portal_command(args)
def cmd_slack(args): def cmd_slack(args):
"""Slack integration helpers. """Slack integration helpers.
@ -10975,24 +10966,6 @@ def cmd_logs(args):
since=getattr(args, "since", None), since=getattr(args, "since", None),
component=getattr(args, "component", None), component=getattr(args, "component", None),
) )
def _build_provider_choices() -> list[str]:
"""Build the --provider choices list from CANONICAL_PROVIDERS + 'auto'."""
try:
from hermes_cli.models import CANONICAL_PROVIDERS as _cp
return ["auto"] + [p.slug for p in _cp]
except Exception:
# Fallback: static list guarantees the CLI always works
return [
"auto", "openrouter", "nous", "openai-codex", "xai-oauth", "copilot-acp", "copilot",
"anthropic", "gemini", "google-gemini-cli", "xai", "bedrock", "azure-foundry",
"ollama-cloud", "huggingface", "zai", "kimi-coding", "kimi-coding-cn",
"stepfun", "minimax", "minimax-cn", "kilocode", "novita", "xiaomi", "arcee",
"nvidia", "deepseek", "alibaba", "qwen-oauth", "opencode-zen", "opencode-go",
]
# Top-level subcommands that argparse knows about WITHOUT running plugin # Top-level subcommands that argparse knows about WITHOUT running plugin
# discovery. Used to short-circuit eager plugin imports (which can take # discovery. Used to short-circuit eager plugin imports (which can take
# 500ms+ pulling in google.cloud.pubsub_v1, aiohttp, grpc, etc.) when the # 500ms+ pulling in google.cloud.pubsub_v1, aiohttp, grpc, etc.) when the

View File

@ -277,19 +277,6 @@ class ModelSwitchResult:
capabilities: Optional[ModelCapabilities] = None capabilities: Optional[ModelCapabilities] = None
model_info: Optional[ModelInfo] = None model_info: Optional[ModelInfo] = None
is_global: bool = False is_global: bool = False
@dataclass
class CustomAutoResult:
"""Result of switching to bare 'custom' provider with auto-detect."""
success: bool
model: str = ""
base_url: str = ""
api_key: str = ""
error_message: str = ""
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Flag parsing # Flag parsing
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@ -484,41 +484,6 @@ def _is_model_free(model_id: str, pricing: dict[str, dict[str, str]]) -> bool:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Nous Portal account tier detection # Nous Portal account tier detection
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def fetch_nous_account_tier(access_token: str, portal_base_url: str = "") -> dict[str, Any]:
"""Fetch the user's Nous Portal account/subscription info.
Calls ``<portal>/api/oauth/account`` with the OAuth access token.
Returns the parsed JSON dict on success, e.g.::
{
"subscription": {
"plan": "Plus",
"tier": 2,
"monthly_charge": 20,
"credits_remaining": 1686.60,
...
},
...
}
Returns an empty dict on any failure (network, auth, parse).
"""
base = (portal_base_url or "https://portal.nousresearch.com").rstrip("/")
url = f"{base}/api/oauth/account"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
}
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=8) as resp:
return json.loads(resp.read().decode())
except Exception:
return {}
def is_nous_free_tier(account_info: dict[str, Any]) -> bool: def is_nous_free_tier(account_info: dict[str, Any]) -> bool:
"""Return True if the account info indicates a free (unpaid) tier. """Return True if the account info indicates a free (unpaid) tier.
@ -1221,70 +1186,6 @@ def _format_price_per_mtok(per_token_str: str) -> str:
return "free" return "free"
per_m = val * 1_000_000 per_m = val * 1_000_000
return f"${per_m:.2f}" return f"${per_m:.2f}"
def format_model_pricing_table(
models: list[tuple[str, str]],
pricing_map: dict[str, dict[str, str]],
current_model: str = "",
indent: str = " ",
) -> list[str]:
"""Build a column-aligned model+pricing table for terminal display.
Returns a list of pre-formatted lines ready to print.
*models* is ``[(model_id, description), ...]``.
"""
if not models:
return []
# Build rows: (model_id, input_price, output_price, cache_price, is_current)
rows: list[tuple[str, str, str, str, bool]] = []
has_cache = False
for mid, _desc in models:
is_cur = mid == current_model
p = pricing_map.get(mid)
if p:
inp = _format_price_per_mtok(p.get("prompt", ""))
out = _format_price_per_mtok(p.get("completion", ""))
cache_read = p.get("input_cache_read", "")
cache = _format_price_per_mtok(cache_read) if cache_read else ""
if cache:
has_cache = True
else:
inp, out, cache = "", "", ""
rows.append((mid, inp, out, cache, is_cur))
name_col = max(len(r[0]) for r in rows) + 2
# Compute price column widths from the actual data so decimals align
price_col = max(
max((len(r[1]) for r in rows if r[1]), default=4),
max((len(r[2]) for r in rows if r[2]), default=4),
3, # minimum: "In" / "Out" header
)
cache_col = max(
max((len(r[3]) for r in rows if r[3]), default=4),
5, # minimum: "Cache" header
) if has_cache else 0
lines: list[str] = []
# Header
if has_cache:
lines.append(f"{indent}{'Model':<{name_col}} {'In':>{price_col}} {'Out':>{price_col}} {'Cache':>{cache_col}} /Mtok")
lines.append(f"{indent}{'-' * name_col} {'-' * price_col} {'-' * price_col} {'-' * cache_col}")
else:
lines.append(f"{indent}{'Model':<{name_col}} {'In':>{price_col}} {'Out':>{price_col}} /Mtok")
lines.append(f"{indent}{'-' * name_col} {'-' * price_col} {'-' * price_col}")
for mid, inp, out, cache, is_cur in rows:
marker = " ← current" if is_cur else ""
if has_cache:
lines.append(f"{indent}{mid:<{name_col}} {inp:>{price_col}} {out:>{price_col}} {cache:>{cache_col}}{marker}")
else:
lines.append(f"{indent}{mid:<{name_col}} {inp:>{price_col}} {out:>{price_col}}{marker}")
return lines
def fetch_models_with_pricing( def fetch_models_with_pricing(
api_key: str | None = None, api_key: str | None = None,
base_url: str = "https://openrouter.ai/api", base_url: str = "https://openrouter.ai/api",

View File

@ -20,21 +20,6 @@ from hermes_cli.config import load_config
DEFAULT_PORTAL_URL = "https://portal.nousresearch.com" DEFAULT_PORTAL_URL = "https://portal.nousresearch.com"
SUBSCRIPTION_URL = "https://portal.nousresearch.com/manage-subscription" SUBSCRIPTION_URL = "https://portal.nousresearch.com/manage-subscription"
DOCS_URL = "https://hermes-agent.nousresearch.com/docs/user-guide/features/tool-gateway" DOCS_URL = "https://hermes-agent.nousresearch.com/docs/user-guide/features/tool-gateway"
def _nous_portal_base_url() -> str:
"""Resolve the Portal base URL from auth state or default."""
try:
from hermes_cli.auth import get_nous_auth_status
status = get_nous_auth_status() or {}
url = status.get("portal_base_url")
if isinstance(url, str) and url.strip():
return url.rstrip("/")
except Exception:
pass
return DEFAULT_PORTAL_URL
def _cmd_status(args) -> int: def _cmd_status(args) -> int:
"""Show Portal auth + Tool Gateway routing summary.""" """Show Portal auth + Tool Gateway routing summary."""
from hermes_cli.auth import get_nous_auth_status from hermes_cli.auth import get_nous_auth_status

View File

@ -104,17 +104,6 @@ def create_app(adapter: UpstreamAdapter) -> "web.Application":
} }
) )
async def handle_models_fallback(request: "web.Request") -> "web.Response":
# Most clients hit /v1/models on startup. If the upstream doesn't
# serve /models, synthesize a minimal response so clients don't
# crash. The actual forwarding path handles /models when allowed.
return web.json_response(
{
"object": "list",
"data": [],
}
)
async def handle_proxy(request: "web.Request") -> "web.StreamResponse": async def handle_proxy(request: "web.Request") -> "web.StreamResponse":
# Extract the path *after* /v1 # Extract the path *after* /v1
rel_path = request.match_info.get("tail", "") rel_path = request.match_info.get("tail", "")

View File

@ -22,7 +22,6 @@ import base64
import json import json
import logging import logging
import os import os
import platform
import re import re
import shutil import shutil
import sys import sys
@ -77,12 +76,6 @@ _ELEMENT_LINE_RE = re.compile(
def _is_macos() -> bool: def _is_macos() -> bool:
return sys.platform == "darwin" return sys.platform == "darwin"
def _is_arm_mac() -> bool:
return _is_macos() and platform.machine() == "arm64"
def cua_driver_binary_available() -> bool: def cua_driver_binary_available() -> bool:
"""True if `cua-driver` is on $PATH or HERMES_CUA_DRIVER_CMD resolves.""" """True if `cua-driver` is on $PATH or HERMES_CUA_DRIVER_CMD resolves."""
return bool(shutil.which(_CUA_DRIVER_CMD)) return bool(shutil.which(_CUA_DRIVER_CMD))
@ -705,29 +698,3 @@ class CuaDriverBackend(ComputerUseBackend):
message = data message = data
return ActionResult(ok=ok, action=name, message=message, return ActionResult(ok=ok, action=name, message=message,
meta=data if isinstance(data, dict) else {}) meta=data if isinstance(data, dict) else {})
def _parse_element(d: Dict[str, Any]) -> UIElement:
bounds = d.get("bounds") or (0, 0, 0, 0)
if isinstance(bounds, dict):
bounds = (
int(bounds.get("x", 0)),
int(bounds.get("y", 0)),
int(bounds.get("w", bounds.get("width", 0))),
int(bounds.get("h", bounds.get("height", 0))),
)
elif isinstance(bounds, (list, tuple)) and len(bounds) == 4:
bounds = tuple(int(v) for v in bounds)
else:
bounds = (0, 0, 0, 0)
return UIElement(
index=int(d.get("index", 0)),
role=str(d.get("role", "") or ""),
label=str(d.get("label", "") or ""),
bounds=bounds, # type: ignore[arg-type]
app=str(d.get("app", "") or ""),
pid=int(d.get("pid", 0) or 0),
window_id=int(d.get("windowId", 0) or 0),
attributes={k: v for k, v in d.items()
if k not in {"index", "role", "label", "bounds", "app", "pid", "windowId"}},
)

View File

@ -37,7 +37,6 @@ from tools.binary_extensions import BINARY_EXTENSIONS
from agent.file_safety import ( from agent.file_safety import (
build_write_denied_paths, build_write_denied_paths,
build_write_denied_prefixes, build_write_denied_prefixes,
get_safe_write_root as _shared_get_safe_write_root,
is_write_denied as _shared_is_write_denied, is_write_denied as _shared_is_write_denied,
) )
@ -112,19 +111,6 @@ def _normalize_line_endings(text: str, target: str) -> str:
if target == "\r\n": if target == "\r\n":
return lf_normalized.replace("\n", "\r\n") return lf_normalized.replace("\n", "\r\n")
return text return text
def _get_safe_write_root() -> Optional[str]:
"""Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.
When set, all write_file/patch operations are constrained to this
directory tree. Writes outside it are denied even if the target is
not on the static deny list. Opt-in hardening for gateway/messaging
deployments that should only touch a workspace checkout.
"""
return _shared_get_safe_write_root()
def _is_write_denied(path: str) -> bool: def _is_write_denied(path: str) -> bool:
"""Return True if path is on the write deny list.""" """Return True if path is on the write deny list."""
return _shared_is_write_denied(path) return _shared_is_write_denied(path)

View File

@ -627,51 +627,6 @@ def _find_all_skills(*, skip_disabled: bool = False) -> List[Dict[str, Any]]:
def _sort_skills(skills: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _sort_skills(skills: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Keep every skill listing path ordered the same way.""" """Keep every skill listing path ordered the same way."""
return sorted(skills, key=lambda s: (s.get("category") or "", s["name"])) return sorted(skills, key=lambda s: (s.get("category") or "", s["name"]))
def _load_category_description(category_dir: Path) -> Optional[str]:
"""
Load category description from DESCRIPTION.md if it exists.
Args:
category_dir: Path to the category directory
Returns:
Description string or None if not found
"""
desc_file = category_dir / "DESCRIPTION.md"
if not desc_file.exists():
return None
try:
content = desc_file.read_text(encoding="utf-8")
# Parse frontmatter if present
frontmatter, body = _parse_frontmatter(content)
# Prefer frontmatter description, fall back to first non-header line
description = frontmatter.get("description", "")
if not description:
for line in body.strip().split("\n"):
line = line.strip()
if line and not line.startswith("#"):
description = line
break
# Truncate to reasonable length
if len(description) > MAX_DESCRIPTION_LENGTH:
description = description[: MAX_DESCRIPTION_LENGTH - 3] + "..."
return description if description else None
except (UnicodeDecodeError, PermissionError) as e:
logger.debug("Failed to read category description %s: %s", desc_file, e)
return None
except Exception as e:
logger.warning(
"Error parsing category description %s: %s", desc_file, e, exc_info=True
)
return None
def skills_list(category: str = None, task_id: str = None) -> str: def skills_list(category: str = None, task_id: str = None) -> str:
""" """
List all available skills (progressive disclosure tier 1 - minimal metadata). List all available skills (progressive disclosure tier 1 - minimal metadata).