* fix(desktop): triage 24 GUI quality-of-life fixes across sidebar, composer, tool cards, messaging, and platform plumbing
A grab-bag of high-leverage UX fixes plus a few backend touches that the
GUI needs to behave correctly on Windows.
Sidebar / sessions
- Decrement $sessionsTotal on delete + archive so "Load N more" stops
claiming removed rows are still on the server.
- Hide the "Group by workspace" toggle when no unpinned sessions exist.
- Accept Cmd/Ctrl+N as a "new session" accelerator (in addition to bare
Shift+N), and render the kbd hint per-platform.
- Switch the statusbar to overflow-x-clip so untitled sessions don't
paint a horizontal scrollbar at the bottom of the window.
Messaging + Cron
- Add [-webkit-app-region: no-drag] to the page-search input so clicks
reach the field instead of routing to the OS window-drag handler.
- Replace single-letter PlatformAvatar with brand glyphs from
@icons-pack/react-simple-icons (telegram, discord, matrix, signal,
whatsapp, mattermost, wechat, qq, ...). Letter monogram fallback for
Slack / Dingtalk / Feishu / WeCom (removed from Simple Icons at brand
owner request).
- Drop the duplicate "Create first cron" button in the empty state.
Composer
- Dedupe pasted images by (name, size, lastModified, type) instead of
Blob identity; Chromium hands us the same screenshot via both
clipboard.items and clipboard.files with fresh File instances.
- Enable spellcheck on the contentEditable, configure Chromium's
spellchecker with the system locale on whenReady, and add
replaceMisspelling + "Add to dictionary" entries to the context menu.
- Render user messages through a minimal markdown pipeline (inline
backtick code + fenced ``` blocks) while keeping @file:/@image:
directive chips intact.
- max-h-[60vh] overflow-y-auto + collisionPadding on the prompt-snippet
submenu.
- Bake cursor-pointer into the <Button> primitive (with
disabled:cursor-default) and into titlebarButtonClass.
Dialogs + tabs + version
- Default DialogContent now has max-h-[85vh] overflow-y-auto so long
bodies scroll instead of falling off-screen.
- Right-rail preview tabs close on middle-click (button === 1), with an
onMouseDown swallow to suppress Chromium autoscroll.
- New refreshDesktopVersion() helper called from About mount, after
every update check, and on throttled window focus so About reflects
the just-installed binary.
Keys + Artifacts + Terminal
- Drop the global "Show advanced" toggle in KeysSettings. Provider
groups now default-expand when they have any key set.
- Extend openExternalUrl to handle file:// via shell.openPath, with
showItemInFolder fallback when the OS can't open the file.
- New lib/ansi.ts SGR parser + <AnsiText> component, applied to
terminal/execute_code tool output.
- ToolView gained stdout / stderr / rendersAnsi; tool-fallback renders
the two streams as separate labeled blocks with stderr in a neutral
tone (not destructive — many CLIs log info on stderr).
- Drop 'stderr' from ERROR_MSG_KEYS in tool-result-summary.
Paths + platform
- resolveHermesCwd skips process.cwd() when packaged and prefers a
user-configurable default project directory.
- New hermes:setting:defaultProjectDir:{get,set,pick} IPC handlers +
preload bridge + global.d.ts typing + a "Default project directory"
row in Sessions settings.
- FileOperations.delete_path(path, recursive=True) on the abstract
base; ShellFileOperations.delete_file rewritten to run a cross-
platform python3 -c snippet so deletes work on Windows shells (which
have no rm/rm -rf). Fallback to `python` when `python3` isn't on PATH.
- README troubleshooting block split into macOS/Linux + Windows
PowerShell recipes.
- Tightened renderer favicon links in index.html + added color-scheme
and theme-color meta.
Backend lifecycle (renderer-side mitigation)
- New noteSessionActivity() heartbeat + session.ts watchdog: an
8-minute silence on the stream auto-clears stuck $workingSessionIds
entries so "Session Busy" never gets permanently wedged. Wired into
useSessionStateCache so every state update refreshes the timer.
i18n spike
- docs/desktop-i18n-rfc.md scoping a future language-switcher PR
(recommends react-intl, audits IME/RTL/CJK in the composer +
chat bubbles, 4-PR rollout plan, ~3-4 eng-weeks for the first
non-English locale).
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(desktop): replace native OS scrollbar in portaled dropdown menus
Radix's DropdownMenuPrimitive.Portal renders content under document.body,
outside the `.scrollbar-dt` scope on #root. Whenever a menu's max-height
clipped its content (even by a pixel — common for the composer "+" menu
that opens upward near the bottom of the window), the user saw the OS's
chunky native scrollbar painted across the whole menu.
Bake a thin, slot-styled scrollbar onto DropdownMenuContent and
DropdownMenuSubContent via [scrollbar-width:thin] + WebKit pseudo-element
arbitrary variants. The submenu also gets a max-h tied to
--radix-dropdown-menu-content-available-height so long snippet lists scroll
cleanly instead of running off the bottom of the viewport. Drop the now-
redundant max-h-[60vh] override on the prompt-snippet submenu.
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(desktop): unbork dropdown menu — submenu opens, parent isn't a circle
Two regressions from the previous dropdown-scrollbar fix:
- The parent menu rendered as a rounded oval. Long Tailwind v4 arbitrary-
variant strings like [&::-webkit-scrollbar-thumb]:rounded-full inside a
cn() call were being mis-resolved so the `rounded-full` leaked onto the
menu container itself. Replaced the whole tower of arbitrary variants
with a real `.dt-portal-scrollbar` class in styles.css that mirrors what
`.scrollbar-dt` already does for #root descendants. Plain CSS, no Tailwind
parser ambiguity.
- The Prompt snippets submenu didn't open. Radix publishes
--radix-dropdown-menu-content-available-height on Content but NOT on
SubContent, so the `max-h` bound to that variable computed to 0 and the
submenu collapsed to zero height. Switched SubContent to a fixed
max-h-80 (≈20rem) which is plenty for a snippet list and never collapses.
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(desktop): promote prompt snippets from Radix submenu to a real Dialog
The submenu refused to open when the parent dropdown was anchored at the
bottom of the window (composer "+" button) — Radix's collision detection +
SubContent positioning was fighting us. Rather than keep tuning side /
sideOffset / collisionPadding / max-h until something stuck, replace the
DropdownMenuSub with a clicked DropdownMenuItem that opens a proper
Dialog.
Side benefits over the submenu:
- Each snippet gets a description line, so a glance is enough to pick one.
- Focus management is handled by Dialog automatically.
- Easy to grow (search, custom user snippets, categories) without
another round of Radix positioning bugs.
Also extract types/interfaces to the bottom of the file per workspace
convention.
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(desktop): move cron 'New cron' button off the top bar into the body
Reverses the previous direction on cron empty-state dedup. The body
button is more discoverable for first-time users (it's anchored next to
the "No scheduled jobs yet" copy that explains the feature) and frees
the top bar from a global CTA that wasn't pulling its weight.
- Empty (zero jobs): EmptyState renders the "Create first cron" button
again, like the original design.
- Empty (search filtered out all jobs): no button, just "Try a broader
search query" copy.
- Has jobs: small inline header above the list shows `N/M active` plus
a single "New cron" button (right-aligned). The rows themselves
already cover edit/pause/trigger/delete, so this is the only "create"
affordance.
Also drop the dead `<div className="hidden">…</div>` enabledCount line
the previous patch left behind; the count is now visible in the new
header instead of hidden.
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(desktop): address Copilot review on PR 37536
- sessions-settings: guard the WHOLE bridge call rather than chaining
`?.settings.foo().then(...)` — the latter throws when
`window.hermesDesktop` is undefined (non-Electron / Vitest contexts)
because the chain short-circuits to `undefined.then(...)`.
- file_operations: drop `Path.unlink(missing_ok=True)` (Py>=3.8) so the
generated delete snippet still works on remote backends running
Python 3.7. The existing FileNotFoundError handler covers the same
case and works back to 3.4.
- ansi.test.ts: add focused Vitest coverage for the SGR parser
(basic/bright colors, bold toggles, default-fg reset, coalescing,
256-color / truecolor arg consumption, non-SGR CSI drop, empty SGR
full-reset) so future refactors can't silently regress terminal
rendering.
Co-authored-by: Cursor <cursoragent@cursor.com>
* fix(desktop/updates): swallow refreshDesktopVersion bridge errors
`refreshDesktopVersion()` is called best-effort with `void` from
`checkUpdates()`, `startUpdatePoller()`, and the window focus handler.
If the IPC bridge rejects (main process shutting down during reload,
bridge not yet ready on first paint), the rejection surfaces as an
unhandled promise rejection in the renderer. Wrap the call in try/catch
and return null on failure so callers can keep the existing
fire-and-forget pattern safely.
Co-authored-by: Cursor <cursoragent@cursor.com>
* chore(desktop): drop work duplicated by other in-flight PRs
- composer/text-utils.ts: revert paste-image dedupe — PR #37596
ships the same fix with a cleaner content-key approach and a
Vitest file (text-utils.test.ts). Letting that PR own the change.
- docs/desktop-i18n-rfc.md: delete the i18n scoping RFC — PR #37568
has already shipped a working i18n surface (homegrown nanostores
`t()` helper over en/zh dictionaries), so the RFC's framework
recommendation (`react-intl`) is now obsolete and would just
contradict the implementation that's actually landing.
Co-authored-by: Cursor <cursoragent@cursor.com>
---------
Co-authored-by: Cursor <cursoragent@cursor.com>
2205 lines
93 KiB
Python
2205 lines
93 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
File Operations Module
|
|
|
|
Provides file manipulation capabilities (read, write, patch, search) that work
|
|
across all terminal backends (local, docker, ssh, singularity, modal, daytona).
|
|
|
|
The key insight is that all file operations can be expressed as shell commands,
|
|
so we wrap the terminal backend's execute() interface to provide a unified file API.
|
|
|
|
Usage:
|
|
from tools.file_operations import ShellFileOperations
|
|
from tools.terminal_tool import _active_environments
|
|
|
|
# Get file operations for a terminal environment
|
|
file_ops = ShellFileOperations(terminal_env)
|
|
|
|
# Read a file
|
|
result = file_ops.read_file("/path/to/file.py")
|
|
|
|
# Write a file
|
|
result = file_ops.write_file("/path/to/new.py", "print('hello')")
|
|
|
|
# Search for content
|
|
result = file_ops.search("TODO", path=".", file_glob="*.py")
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import difflib
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional, List, Dict, Any
|
|
from pathlib import Path
|
|
from tools.binary_extensions import BINARY_EXTENSIONS
|
|
|
|
from agent.file_safety import (
|
|
build_write_denied_paths,
|
|
build_write_denied_prefixes,
|
|
is_write_denied as _shared_is_write_denied,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Write-path deny list — blocks writes to sensitive system/credential files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_HOME = str(Path.home())
|
|
|
|
WRITE_DENIED_PATHS = build_write_denied_paths(_HOME)
|
|
|
|
WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME)
|
|
|
|
|
|
_OSC_SEQUENCE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
|
|
_FENCE_MARKER_RE = re.compile(r"'?\x07?__HERMES_FENCE_[A-Za-z0-9]+__\x07?'?")
|
|
|
|
|
|
def _strip_terminal_fence_leaks(text: str) -> str:
|
|
"""Strip leaked terminal fence wrappers from file read output."""
|
|
if not text:
|
|
return text
|
|
|
|
cleaned_lines: List[str] = []
|
|
for line in text.splitlines(keepends=True):
|
|
had_terminal_wrapper = "__HERMES_FENCE_" in line or "\x1b]" in line
|
|
cleaned = _OSC_SEQUENCE_RE.sub("", line)
|
|
cleaned = _FENCE_MARKER_RE.sub("", cleaned)
|
|
cleaned = cleaned.replace("\x07", "")
|
|
if had_terminal_wrapper and cleaned.strip("'\r\n\t ") == "":
|
|
continue
|
|
cleaned_lines.append(cleaned)
|
|
return "".join(cleaned_lines)
|
|
|
|
|
|
def _detect_line_ending(sample: str) -> Optional[str]:
|
|
"""Return the dominant line ending in ``sample`` or None if undetermined.
|
|
|
|
Looks at the first few line breaks and picks ``\\r\\n`` if any are
|
|
present (Windows / DOS), otherwise ``\\n`` (Unix). Returns ``None``
|
|
for empty / single-line content where we can't tell. Used to
|
|
preserve the file's original line endings across write_file and
|
|
patch operations — without this the agent's bare-LF tool args
|
|
silently normalize Windows-line-ending files, and patch produces
|
|
mixed endings when only a substituted region changes.
|
|
"""
|
|
if not sample:
|
|
return None
|
|
# Look at the first chunk — enough to tell, cheap to scan.
|
|
head = sample[:4096]
|
|
if "\r\n" in head:
|
|
return "\r\n"
|
|
if "\n" in head:
|
|
return "\n"
|
|
return None
|
|
|
|
|
|
def _normalize_line_endings(text: str, target: str) -> str:
|
|
"""Convert all line endings in ``text`` to ``target`` (``\\n`` or ``\\r\\n``).
|
|
|
|
Idempotent: ``_normalize_line_endings(_normalize_line_endings(x, "\\r\\n"), "\\r\\n") == _normalize_line_endings(x, "\\r\\n")``.
|
|
Strips lone ``\\r`` characters as well, so mixed-ending content is
|
|
homogenized in a single pass.
|
|
"""
|
|
# First collapse to LF (handle CRLF and lone CR), then expand if target
|
|
# is CRLF. Order matters: doing the replacements separately would
|
|
# double-convert a CRLF -> LFLF.
|
|
lf_normalized = text.replace("\r\n", "\n").replace("\r", "\n")
|
|
if target == "\n":
|
|
return lf_normalized
|
|
if target == "\r\n":
|
|
return lf_normalized.replace("\n", "\r\n")
|
|
return text
|
|
|
|
|
|
# UTF-8 byte order mark. Some Windows editors (Notepad, older Visual Studio,
|
|
# some PowerShell redirects) prepend this invisible 3-byte marker
|
|
# (EF BB BF == U+FEFF) to UTF-8 text files. It renders as nothing but is a
|
|
# real character at the start of the decoded string, so without handling it:
|
|
# - read_file would surface a stray U+FEFF as the first character (the
|
|
# model sees a phantom char before `import ...`), and
|
|
# - patch matches against the true first line would miss, and write_file
|
|
# would silently drop or double the marker on rewrite.
|
|
# We strip it on read so the model sees clean content, and restore it on
|
|
# write when the original file had one — exactly mirroring the line-ending
|
|
# preservation above (detect on disk, preserve across the edit).
|
|
_UTF8_BOM = "\ufeff"
|
|
|
|
|
|
def _strip_bom(text: str) -> tuple[str, bool]:
|
|
"""Return (text-without-leading-BOM, had_bom).
|
|
|
|
Only a single leading BOM is stripped; a BOM appearing mid-content is
|
|
left alone (it's legitimate data there, not a file marker).
|
|
"""
|
|
if text and text.startswith(_UTF8_BOM):
|
|
return text[len(_UTF8_BOM):], True
|
|
return text, False
|
|
|
|
|
|
def _has_bom(text: Optional[str]) -> bool:
|
|
"""True if ``text`` begins with a UTF-8 BOM."""
|
|
return bool(text) and text.startswith(_UTF8_BOM)
|
|
|
|
|
|
def _is_write_denied(path: str) -> bool:
|
|
"""Return True if path is on the write deny list."""
|
|
return _shared_is_write_denied(path)
|
|
|
|
|
|
# =============================================================================
|
|
# Result Data Classes
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class ReadResult:
|
|
"""Result from reading a file."""
|
|
content: str = ""
|
|
total_lines: int = 0
|
|
file_size: int = 0
|
|
truncated: bool = False
|
|
hint: Optional[str] = None
|
|
is_binary: bool = False
|
|
is_image: bool = False
|
|
base64_content: Optional[str] = None
|
|
mime_type: Optional[str] = None
|
|
dimensions: Optional[str] = None # For images: "WIDTHxHEIGHT"
|
|
error: Optional[str] = None
|
|
similar_files: List[str] = field(default_factory=list)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {k: v for k, v in self.__dict__.items() if v is not None and v != []}
|
|
|
|
|
|
@dataclass
|
|
class WriteResult:
|
|
"""Result from writing a file."""
|
|
bytes_written: int = 0
|
|
dirs_created: bool = False
|
|
lint: Optional[Dict[str, Any]] = None
|
|
# Semantic diagnostics from the LSP layer, when applicable. Kept in
|
|
# its own field (not folded into ``lint``) so the model and any
|
|
# downstream parsers can read syntax errors and semantic errors as
|
|
# separate signals. ``None`` when LSP is disabled, when the file
|
|
# isn't in a git workspace, or when no diagnostics were introduced
|
|
# by this edit.
|
|
lsp_diagnostics: Optional[str] = None
|
|
error: Optional[str] = None
|
|
warning: Optional[str] = None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {k: v for k, v in self.__dict__.items() if v is not None}
|
|
|
|
|
|
@dataclass
|
|
class PatchResult:
|
|
"""Result from patching a file."""
|
|
success: bool = False
|
|
diff: str = ""
|
|
files_modified: List[str] = field(default_factory=list)
|
|
files_created: List[str] = field(default_factory=list)
|
|
files_deleted: List[str] = field(default_factory=list)
|
|
lint: Optional[Dict[str, Any]] = None
|
|
# See :class:`WriteResult.lsp_diagnostics`.
|
|
lsp_diagnostics: Optional[str] = None
|
|
error: Optional[str] = None
|
|
|
|
def to_dict(self) -> dict:
|
|
result = {"success": self.success}
|
|
if self.diff:
|
|
result["diff"] = self.diff
|
|
if self.files_modified:
|
|
result["files_modified"] = self.files_modified
|
|
if self.files_created:
|
|
result["files_created"] = self.files_created
|
|
if self.files_deleted:
|
|
result["files_deleted"] = self.files_deleted
|
|
if self.lint:
|
|
result["lint"] = self.lint
|
|
if self.lsp_diagnostics:
|
|
result["lsp_diagnostics"] = self.lsp_diagnostics
|
|
if self.error:
|
|
result["error"] = self.error
|
|
return result
|
|
|
|
|
|
@dataclass
|
|
class SearchMatch:
|
|
"""A single search match."""
|
|
path: str
|
|
line_number: int
|
|
content: str
|
|
mtime: float = 0.0 # Modification time for sorting
|
|
|
|
|
|
@dataclass
|
|
class SearchResult:
|
|
"""Result from searching."""
|
|
matches: List[SearchMatch] = field(default_factory=list)
|
|
files: List[str] = field(default_factory=list)
|
|
counts: Dict[str, int] = field(default_factory=dict)
|
|
total_count: int = 0
|
|
truncated: bool = False
|
|
error: Optional[str] = None
|
|
|
|
def to_dict(self) -> dict:
|
|
result = {"total_count": self.total_count}
|
|
if self.matches:
|
|
result["matches"] = [
|
|
{"path": m.path, "line": m.line_number, "content": m.content}
|
|
for m in self.matches
|
|
]
|
|
if self.files:
|
|
result["files"] = self.files
|
|
if self.counts:
|
|
result["counts"] = self.counts
|
|
if self.truncated:
|
|
result["truncated"] = True
|
|
if self.error:
|
|
result["error"] = self.error
|
|
return result
|
|
|
|
|
|
@dataclass
|
|
class LintResult:
|
|
"""Result from linting a file."""
|
|
success: bool = True
|
|
skipped: bool = False
|
|
output: str = ""
|
|
message: str = ""
|
|
|
|
def to_dict(self) -> dict:
|
|
if self.skipped:
|
|
return {"status": "skipped", "message": self.message}
|
|
result = {"status": "ok" if self.success else "error", "output": self.output}
|
|
if self.message:
|
|
result["message"] = self.message
|
|
return result
|
|
|
|
|
|
@dataclass
|
|
class ExecuteResult:
|
|
"""Result from executing a shell command."""
|
|
stdout: str = ""
|
|
exit_code: int = 0
|
|
|
|
|
|
def _parse_search_context_line(line: str) -> tuple[str, int, str] | None:
|
|
"""Parse grep/rg context output in ``path-line-content`` format.
|
|
|
|
Context lines are ambiguous because filenames may legitimately contain
|
|
``-<digits>-`` segments. Prefer the rightmost numeric separator so a path
|
|
like ``dir/file-12-name.py-8-context`` resolves to
|
|
``dir/file-12-name.py`` line ``8`` instead of truncating at ``file``.
|
|
"""
|
|
if not line or line == "--":
|
|
return None
|
|
|
|
match = None
|
|
for candidate in re.finditer(r'-(\d+)-', line):
|
|
match = candidate
|
|
|
|
if match is None:
|
|
return None
|
|
|
|
path = line[:match.start()]
|
|
if not path:
|
|
return None
|
|
|
|
return path, int(match.group(1)), line[match.end():]
|
|
|
|
|
|
# =============================================================================
|
|
# Abstract Interface
|
|
# =============================================================================
|
|
|
|
class FileOperations(ABC):
|
|
"""Abstract interface for file operations across terminal backends."""
|
|
|
|
@abstractmethod
|
|
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
|
|
"""Read a file with pagination support."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def read_file_raw(self, path: str) -> ReadResult:
|
|
"""Read the complete file content as a plain string.
|
|
|
|
No pagination, no line-number prefixes, no per-line truncation.
|
|
Returns ReadResult with .content = full file text, .error set on
|
|
failure. Always reads to EOF regardless of file size.
|
|
"""
|
|
...
|
|
|
|
@abstractmethod
|
|
def write_file(self, path: str, content: str) -> WriteResult:
|
|
"""Write content to a file, creating directories as needed."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def patch_replace(self, path: str, old_string: str, new_string: str,
|
|
replace_all: bool = False) -> PatchResult:
|
|
"""Replace text in a file using fuzzy matching."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def patch_v4a(self, patch_content: str) -> PatchResult:
|
|
"""Apply a V4A format patch."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def delete_file(self, path: str) -> WriteResult:
|
|
"""Delete a file. Returns WriteResult with .error set on failure."""
|
|
...
|
|
|
|
def delete_path(self, path: str, recursive: bool = False) -> WriteResult:
|
|
"""Cross-platform delete that handles files and (with recursive=True)
|
|
directory trees. Default implementation delegates to ``delete_file``
|
|
for the non-recursive case; backends with native recursive support
|
|
should override.
|
|
"""
|
|
if recursive:
|
|
return WriteResult(error="Recursive delete not implemented for this backend")
|
|
return self.delete_file(path)
|
|
|
|
@abstractmethod
|
|
def move_file(self, src: str, dst: str) -> WriteResult:
|
|
"""Move/rename a file from src to dst. Returns WriteResult with .error set on failure."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def search(self, pattern: str, path: str = ".", target: str = "content",
|
|
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
|
|
output_mode: str = "content", context: int = 0) -> SearchResult:
|
|
"""Search for content or files."""
|
|
...
|
|
|
|
|
|
# =============================================================================
|
|
# Shell-based Implementation
|
|
# =============================================================================
|
|
|
|
# Image extensions (subset of binary that we can return as base64)
|
|
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}
|
|
|
|
# Shell-based linters by file extension. Invoked via _exec() with the
|
|
# filesystem path. Cover languages where a compile/type check needs an
|
|
# external toolchain (py_compile, node, tsc, go vet, rustfmt).
|
|
LINTERS = {
|
|
'.py': 'python -m py_compile {file} 2>&1',
|
|
'.js': 'node --check {file} 2>&1',
|
|
'.ts': 'npx tsc --noEmit {file} 2>&1',
|
|
'.go': 'go vet {file} 2>&1',
|
|
'.rs': 'rustfmt --check {file} 2>&1',
|
|
}
|
|
|
|
# Extensions where the per-file shell linter is structurally weaker than
|
|
# a real LSP server AND produces phantom errors on real-world projects:
|
|
#
|
|
# - ``.ts``: ``tsc --noEmit FILE.ts`` ignores ``tsconfig.json`` and
|
|
# defaults to no-lib / ES5, so every ES2015+ stdlib reference
|
|
# (``Promise``, ``Map``, ``Set``, ``ReadonlySet``, ``Iterable``,
|
|
# ``Math.imul``, ``Number.isFinite``, etc.) reports as missing. This
|
|
# floods the agent's lint field with 20K+ tokens of false positives on
|
|
# every edit. No supported tsc flag fixes the single-file invocation;
|
|
# the canonical replacement is ``tsserver`` via LSP, which respects
|
|
# tsconfig and gives true diagnostics.
|
|
#
|
|
# ``.tsx`` is intentionally NOT in ``LINTERS`` (and therefore not
|
|
# here): it has no shell linter entry, so it falls through to the
|
|
# ``ext not in LINTERS`` skip case unchanged. Pre-PR behavior:
|
|
# ``.tsx`` was implicitly ``skipped``. Keeping it that way means
|
|
# ``.tsx`` edits with LSP disabled get no per-file syntax check
|
|
# (same as before this PR) instead of the broken ``tsc`` invocation
|
|
# that ``.ts`` used to get. When LSP is enabled, ``.tsx`` is covered
|
|
# by the LSP tier via ``_maybe_lsp_diagnostics`` exactly as ``.ts``.
|
|
#
|
|
# - ``.go``: ``go vet FILE.go`` fails outside a module / GOPATH with
|
|
# "cannot find package" — already partially handled by
|
|
# ``_LINTER_UNUSABLE_PATTERNS`` but only when the package error is the
|
|
# ONLY output; mixed real+phantom output still leaks through.
|
|
# ``gopls`` is the canonical replacement.
|
|
#
|
|
# - ``.rs``: ``rustfmt --check FILE.rs`` is style, not type-checking, and
|
|
# rejects non-Cargo project files. ``rust-analyzer`` is the canonical
|
|
# replacement.
|
|
#
|
|
# When the LSP service is configured AND ``enabled_for(path)`` for this
|
|
# extension's file, ``_check_lint`` skips the shell linter for these
|
|
# extensions — the ``lsp_diagnostics`` channel carries the real signal.
|
|
# Everything else in ``LINTERS`` (Python ``py_compile``, ``node --check``)
|
|
# is fast, file-local, and correct, so it runs unconditionally.
|
|
_SHELL_LINTER_LSP_REDUNDANT = frozenset({'.ts', '.go', '.rs'})
|
|
|
|
|
|
# Patterns that indicate the linter base command exists on PATH but
|
|
# couldn't actually run — e.g. ``npx tsc`` when tsc isn't installed in
|
|
# node_modules, or rustfmt complaining there's no Cargo project. When
|
|
# any of these substrings appears in the linter output, ``_check_lint``
|
|
# returns ``skipped`` instead of ``error`` so:
|
|
#
|
|
# 1. The write isn't flagged for a tooling problem the agent can't fix.
|
|
# 2. The LSP semantic tier still runs (it gates on success/skipped).
|
|
#
|
|
# Patterns are matched case-insensitively against linter stdout.
|
|
_LINTER_UNUSABLE_PATTERNS = {
|
|
'npx': (
|
|
# npx prints this banner when the package isn't installed locally
|
|
# AND it can't auto-install (no internet, registry off, etc.) or
|
|
# when the binary it tried to run is the wrong one.
|
|
'this is not the tsc command you are looking for',
|
|
# npx with --no-install resolution failures
|
|
'could not determine executable to run',
|
|
'not found in npm registry',
|
|
),
|
|
'rustfmt': (
|
|
# rustfmt outside a Cargo project
|
|
'no input filename given',
|
|
'error: not a workspace',
|
|
),
|
|
'go': (
|
|
# ``go vet`` on a file outside a module / GOPATH
|
|
'cannot find package',
|
|
'go: cannot find main module',
|
|
),
|
|
}
|
|
|
|
|
|
def _looks_like_linter_unusable(base_cmd: str, output: str) -> bool:
|
|
"""Return True iff ``output`` from ``base_cmd`` indicates the linter
|
|
itself couldn't run (a tooling gap), as opposed to a real lint error
|
|
in the file being checked.
|
|
|
|
``base_cmd`` is the first word of the linter command line (``npx``,
|
|
``rustfmt``, ``go``, ...). ``output`` is the stdout/stderr captured
|
|
from running it.
|
|
"""
|
|
patterns = _LINTER_UNUSABLE_PATTERNS.get(base_cmd)
|
|
if not patterns:
|
|
return False
|
|
lower = output.lower()
|
|
return any(p in lower for p in patterns)
|
|
|
|
|
|
def _lint_json_inproc(content: str) -> tuple[bool, str]:
|
|
"""In-process JSON syntax check. Returns (ok, error_message)."""
|
|
import json as _json
|
|
try:
|
|
_json.loads(content)
|
|
return True, ""
|
|
except _json.JSONDecodeError as e:
|
|
return False, f"JSONDecodeError: {e.msg} (line {e.lineno}, column {e.colno})"
|
|
except Exception as e: # noqa: BLE001 — any parse failure is a lint failure
|
|
return False, f"{type(e).__name__}: {e}"
|
|
|
|
|
|
def _lint_yaml_inproc(content: str) -> tuple[bool, str]:
|
|
"""In-process YAML syntax check. Returns (ok, error_message).
|
|
|
|
Skipped gracefully if PyYAML isn't installed — YAML parsing is optional.
|
|
"""
|
|
try:
|
|
import yaml as _yaml
|
|
except ImportError:
|
|
# PyYAML not available — skip silently, caller treats as no linter.
|
|
return True, "__SKIP__"
|
|
try:
|
|
_yaml.safe_load(content)
|
|
return True, ""
|
|
except _yaml.YAMLError as e:
|
|
return False, f"YAMLError: {e}"
|
|
except Exception as e: # noqa: BLE001
|
|
return False, f"{type(e).__name__}: {e}"
|
|
|
|
|
|
def _lint_toml_inproc(content: str) -> tuple[bool, str]:
|
|
"""In-process TOML syntax check (stdlib tomllib, Python 3.11+)."""
|
|
try:
|
|
import tomllib as _toml
|
|
except ImportError:
|
|
# Pre-3.11 fallback via tomli, if installed.
|
|
try:
|
|
import tomli as _toml # type: ignore[no-redef]
|
|
except ImportError:
|
|
return True, "__SKIP__"
|
|
try:
|
|
_toml.loads(content)
|
|
return True, ""
|
|
except Exception as e: # tomllib raises TOMLDecodeError, a ValueError subclass
|
|
return False, f"{type(e).__name__}: {e}"
|
|
|
|
|
|
def _lint_python_inproc(content: str) -> tuple[bool, str]:
|
|
"""In-process Python syntax check via ast.parse.
|
|
|
|
Catches SyntaxError, IndentationError, and everything else the
|
|
ast module rejects — matching py_compile's scope but with no
|
|
subprocess overhead and no dependency on a ``python`` in PATH.
|
|
"""
|
|
import ast as _ast
|
|
try:
|
|
_ast.parse(content)
|
|
return True, ""
|
|
except SyntaxError as e:
|
|
loc = f" (line {e.lineno}, column {e.offset})" if e.lineno else ""
|
|
return False, f"{type(e).__name__}: {e.msg}{loc}"
|
|
except Exception as e: # noqa: BLE001
|
|
return False, f"{type(e).__name__}: {e}"
|
|
|
|
|
|
# In-process linters by file extension. Preferred over shell linters when
|
|
# present — no subprocess overhead, microseconds per call. Each callable
|
|
# takes file content (str) and returns (ok: bool, error: str). An error
|
|
# string of ``"__SKIP__"`` signals the linter isn't available (missing
|
|
# dependency) and should be treated as "no linter".
|
|
LINTERS_INPROC = {
|
|
'.py': _lint_python_inproc,
|
|
'.json': _lint_json_inproc,
|
|
'.yaml': _lint_yaml_inproc,
|
|
'.yml': _lint_yaml_inproc,
|
|
'.toml': _lint_toml_inproc,
|
|
}
|
|
|
|
# Max limits for read operations
|
|
MAX_LINES = 2000
|
|
MAX_LINE_LENGTH = 2000
|
|
MAX_FILE_SIZE = 50 * 1024 # 50KB
|
|
DEFAULT_READ_OFFSET = 1
|
|
DEFAULT_READ_LIMIT = 500
|
|
DEFAULT_SEARCH_OFFSET = 0
|
|
DEFAULT_SEARCH_LIMIT = 50
|
|
|
|
|
|
def _coerce_int(value: Any, default: int) -> int:
|
|
"""Best-effort integer coercion for tool pagination inputs."""
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def normalize_read_pagination(offset: Any = DEFAULT_READ_OFFSET,
|
|
limit: Any = DEFAULT_READ_LIMIT) -> tuple[int, int]:
|
|
"""Return safe read_file pagination bounds.
|
|
|
|
Tool schemas declare minimum/maximum values, but not every caller or
|
|
provider enforces schemas before dispatch. Clamp here so invalid values
|
|
cannot leak into sed ranges like ``0,-1p``.
|
|
|
|
The upper bound on ``limit`` comes from ``tool_output.max_lines`` in
|
|
config.yaml (defaults to the module-level ``MAX_LINES`` constant).
|
|
"""
|
|
from tools.tool_output_limits import get_max_lines
|
|
max_lines = get_max_lines()
|
|
normalized_offset = max(1, _coerce_int(offset, DEFAULT_READ_OFFSET))
|
|
normalized_limit = _coerce_int(limit, DEFAULT_READ_LIMIT)
|
|
normalized_limit = max(1, min(normalized_limit, max_lines))
|
|
return normalized_offset, normalized_limit
|
|
|
|
|
|
def normalize_search_pagination(offset: Any = DEFAULT_SEARCH_OFFSET,
|
|
limit: Any = DEFAULT_SEARCH_LIMIT) -> tuple[int, int]:
|
|
"""Return safe search pagination bounds for shell head/tail pipelines."""
|
|
normalized_offset = max(0, _coerce_int(offset, DEFAULT_SEARCH_OFFSET))
|
|
normalized_limit = max(1, _coerce_int(limit, DEFAULT_SEARCH_LIMIT))
|
|
return normalized_offset, normalized_limit
|
|
|
|
|
|
class ShellFileOperations(FileOperations):
|
|
"""
|
|
File operations implemented via shell commands.
|
|
|
|
Works with ANY terminal backend that has execute(command, cwd) method.
|
|
This includes local, docker, singularity, ssh, modal, and daytona environments.
|
|
"""
|
|
|
|
def __init__(self, terminal_env, cwd: str = None):
|
|
"""
|
|
Initialize file operations with a terminal environment.
|
|
|
|
Args:
|
|
terminal_env: Any object with execute(command, cwd) method.
|
|
Returns {"output": str, "returncode": int}
|
|
cwd: Optional explicit fallback cwd when the terminal env has
|
|
no cwd attribute (rare — most backends track cwd live).
|
|
|
|
Note:
|
|
Every _exec() call prefers the LIVE ``terminal_env.cwd`` over
|
|
``self.cwd`` so ``cd`` commands run via the terminal tool are
|
|
picked up immediately. ``self.cwd`` is only used as a fallback
|
|
when the env has no cwd at all — it is NOT the authoritative
|
|
cwd, despite being settable at init time.
|
|
|
|
Historical bug (fixed): prior versions of this class used the
|
|
init-time cwd for every _exec() call, which caused relative
|
|
paths passed to patch/read/write to target the wrong directory
|
|
after the user ran ``cd`` in the terminal. Patches would
|
|
claim success and return a plausible diff but land in the
|
|
original directory, producing apparent silent failures.
|
|
"""
|
|
self.env = terminal_env
|
|
# Determine cwd from various possible sources.
|
|
# IMPORTANT: do NOT fall back to os.getcwd() -- that's the HOST's local
|
|
# path which doesn't exist inside container/cloud backends (modal, docker).
|
|
# If nothing provides a cwd, use "/" as a safe universal default.
|
|
self.cwd = cwd or getattr(terminal_env, 'cwd', None) or \
|
|
getattr(getattr(terminal_env, 'config', None), 'cwd', None) or "/"
|
|
|
|
# Cache for command availability checks
|
|
self._command_cache: Dict[str, bool] = {}
|
|
|
|
def _exec(self, command: str, cwd: str = None, timeout: int = None,
|
|
stdin_data: str = None) -> ExecuteResult:
|
|
"""Execute command via terminal backend.
|
|
|
|
Args:
|
|
stdin_data: If provided, piped to the process's stdin instead of
|
|
embedding in the command string. Bypasses ARG_MAX.
|
|
|
|
Cwd resolution order (critical — see class docstring):
|
|
1. Explicit ``cwd`` arg (if provided)
|
|
2. Live ``self.env.cwd`` (tracks ``cd`` commands run via terminal)
|
|
3. Init-time ``self.cwd`` (fallback when env has no cwd attribute)
|
|
|
|
This ordering ensures relative paths in file operations follow the
|
|
terminal's current directory — not the directory this file_ops was
|
|
originally created in. See test_file_ops_cwd_tracking.py.
|
|
"""
|
|
kwargs = {}
|
|
if timeout:
|
|
kwargs['timeout'] = timeout
|
|
if stdin_data is not None:
|
|
kwargs['stdin_data'] = stdin_data
|
|
|
|
# Resolve cwd from the live env so `cd` commands are picked up.
|
|
# Fall through to init-time self.cwd only if the env doesn't track cwd.
|
|
effective_cwd = cwd or getattr(self.env, 'cwd', None) or self.cwd
|
|
result = self.env.execute(command, cwd=effective_cwd, **kwargs)
|
|
return ExecuteResult(
|
|
stdout=result.get("output", ""),
|
|
exit_code=result.get("returncode", 0)
|
|
)
|
|
|
|
def _has_command(self, cmd: str) -> bool:
|
|
"""Check if a command exists in the environment (cached)."""
|
|
if cmd not in self._command_cache:
|
|
result = self._exec(f"command -v {cmd} >/dev/null 2>&1 && echo 'yes'")
|
|
self._command_cache[cmd] = result.stdout.strip() == 'yes'
|
|
return self._command_cache[cmd]
|
|
|
|
def _is_likely_binary(self, path: str, content_sample: str = None) -> bool:
|
|
"""
|
|
Check if a file is likely binary.
|
|
|
|
Uses extension check (fast) + content analysis (fallback).
|
|
"""
|
|
ext = os.path.splitext(path)[1].lower()
|
|
if ext in BINARY_EXTENSIONS:
|
|
return True
|
|
|
|
# Content analysis: >30% non-printable chars = binary
|
|
if content_sample:
|
|
non_printable = sum(1 for c in content_sample[:1000]
|
|
if ord(c) < 32 and c not in '\n\r\t')
|
|
return non_printable / min(len(content_sample), 1000) > 0.30
|
|
|
|
return False
|
|
|
|
def _is_image(self, path: str) -> bool:
|
|
"""Check if file is an image we can return as base64."""
|
|
ext = os.path.splitext(path)[1].lower()
|
|
return ext in IMAGE_EXTENSIONS
|
|
|
|
def _add_line_numbers(self, content: str, start_line: int = 1) -> str:
|
|
"""Add line numbers to content in ``LINE_NUM|CONTENT`` format.
|
|
|
|
The gutter uses a compact ``<n>|`` prefix (e.g. ``34|foo``) rather
|
|
than a fixed-width zero/space-padded one (`` 34|foo``). The
|
|
padding was pure token overhead: on dense source the padded gutter
|
|
cost ~48% more tokens than the bare content and ~16% more than the
|
|
compact form, because the leading spaces + zero-padding tokenize
|
|
into extra tokens on every single line. An A/B (Sonnet 4.6, 2
|
|
passes) showed the compact gutter matches the padded gutter on
|
|
line-reference / patch / value-lookup / structure tasks (4/4 both),
|
|
while dropping line numbers entirely regressed line-referencing
|
|
(the model hand-counted and was off-by-one, 3/4) — so we keep the
|
|
numbers, just not the padding.
|
|
"""
|
|
from tools.tool_output_limits import get_max_line_length
|
|
max_line_length = get_max_line_length()
|
|
lines = content.split('\n')
|
|
numbered = []
|
|
for i, line in enumerate(lines, start=start_line):
|
|
# Truncate long lines
|
|
if len(line) > max_line_length:
|
|
line = line[:max_line_length] + "... [truncated]"
|
|
numbered.append(f"{i}|{line}")
|
|
return '\n'.join(numbered)
|
|
|
|
def _expand_path(self, path: str) -> str:
|
|
"""
|
|
Expand shell-style paths like ~ and ~user to absolute paths.
|
|
|
|
This must be done BEFORE shell escaping, since ~ doesn't expand
|
|
inside single quotes.
|
|
"""
|
|
if not path:
|
|
return path
|
|
|
|
# Handle ~ and ~user
|
|
if path.startswith('~'):
|
|
# Get home directory via the terminal environment
|
|
result = self._exec("echo $HOME")
|
|
if result.exit_code == 0 and result.stdout.strip():
|
|
home = result.stdout.strip()
|
|
if path == '~':
|
|
return home
|
|
elif path.startswith('~/'):
|
|
return home + path[1:] # Replace ~ with home
|
|
# ~username format - extract and validate username before
|
|
# letting shell expand it (prevent shell injection via
|
|
# paths like "~; rm -rf /").
|
|
rest = path[1:] # strip leading ~
|
|
slash_idx = rest.find('/')
|
|
username = rest[:slash_idx] if slash_idx >= 0 else rest
|
|
if username and re.fullmatch(r'[a-zA-Z0-9._-]+', username):
|
|
# Only expand ~username (not the full path) to avoid shell
|
|
# injection via path suffixes like "~user/$(malicious)".
|
|
expand_result = self._exec(f"echo ~{username}")
|
|
if expand_result.exit_code == 0 and expand_result.stdout.strip():
|
|
user_home = expand_result.stdout.strip()
|
|
suffix = path[1 + len(username):] # e.g. "/rest/of/path"
|
|
return user_home + suffix
|
|
|
|
return path
|
|
|
|
def _escape_shell_arg(self, arg: str) -> str:
|
|
"""Escape a string for safe use in shell commands."""
|
|
# Use single quotes and escape any single quotes in the string
|
|
return "'" + arg.replace("'", "'\"'\"'") + "'"
|
|
|
|
def _atomic_write(self, path: str, content: str) -> "ExecuteResult":
|
|
"""Write ``content`` to ``path`` atomically via temp-file + rename.
|
|
|
|
Streams ``content`` over stdin into a temp file in the SAME
|
|
directory as ``path`` (so the final ``mv`` is a real rename on the
|
|
same filesystem, not a non-atomic cross-device copy), preserves the
|
|
existing file's mode if it exists, then renames over the target.
|
|
On any failure the temp file is removed so we never leak a partial
|
|
``.hermes-tmp`` file next to the user's data, and the original file
|
|
is left untouched. Content rides stdin so there is no ARG_MAX limit.
|
|
|
|
Returns an :class:`ExecuteResult`; ``exit_code == 0`` means the file
|
|
was swapped into place atomically. A non-zero exit means nothing was
|
|
renamed and the original (if any) is intact.
|
|
"""
|
|
q_path = self._escape_shell_arg(path)
|
|
parent = os.path.dirname(path) or "."
|
|
q_parent = self._escape_shell_arg(parent)
|
|
# template basename: hidden so it doesn't show up in casual `ls`,
|
|
# carries a marker so an orphaned temp (only possible on a hard
|
|
# crash *between* cat and mv) is identifiable.
|
|
tmpl = self._escape_shell_arg(".hermes-tmp.XXXXXX")
|
|
|
|
# One shell script, fully quoted. Notes:
|
|
# - `mktemp` lands the temp in the target's own dir (-p) so `mv` is
|
|
# same-FS atomic; we fall back to a PID-stamped name if the
|
|
# backend lacks mktemp (rare; busybox/macOS/Linux all ship it).
|
|
# - `chmod --reference` is GNU-only, so we read the octal mode with
|
|
# `stat` (GNU `-c%a` or BSD `-f%Lp`) and `chmod` it explicitly;
|
|
# silent best-effort — a perms-copy failure must not abort the
|
|
# write, the file still lands with default umask perms.
|
|
# - `trap ... EXIT` guarantees the temp is removed on every error
|
|
# path (cat failure, mv failure, signal) but NOT after a
|
|
# successful mv (the temp no longer exists by then).
|
|
# - we `cat >` the temp, then `mv -f` it over the target.
|
|
script = (
|
|
"set -e; "
|
|
f"d={q_parent}; t={q_path}; "
|
|
'tmp="$(mktemp -p "$d" ' + tmpl + ' 2>/dev/null '
|
|
'|| mktemp "$d/.hermes-tmp.$$.XXXXXX" 2>/dev/null '
|
|
'|| { tmp="$d/.hermes-tmp.$$"; : > "$tmp" && echo "$tmp"; })"; '
|
|
'[ -n "$tmp" ] || { echo "atomic write: could not create temp file" >&2; exit 1; }; '
|
|
"trap 'rm -f \"$tmp\"' EXIT; "
|
|
# preserve mode of an existing target (best-effort, never fatal)
|
|
'if [ -e "$t" ]; then '
|
|
'm="$(stat -c%a "$t" 2>/dev/null || stat -f%Lp "$t" 2>/dev/null || true)"; '
|
|
'[ -n "$m" ] && chmod "$m" "$tmp" 2>/dev/null || true; '
|
|
"fi; "
|
|
'cat > "$tmp"; '
|
|
'mv -f "$tmp" "$t"; '
|
|
"trap - EXIT"
|
|
)
|
|
return self._exec(script, stdin_data=content)
|
|
|
|
def _detect_file_line_ending(self, path: str, pre_content: Optional[str] = None) -> Optional[str]:
|
|
"""Detect the dominant line ending of a file on disk.
|
|
|
|
If ``pre_content`` is already available (we just read the file
|
|
for lint/LSP purposes), inspect that — zero extra exec calls.
|
|
Otherwise issue a tiny ``head -c 4096`` to sample the first 4KB.
|
|
|
|
Returns ``"\\r\\n"`` for CRLF (Windows), ``"\\n"`` for LF (Unix),
|
|
or ``None`` if undetermined (new file, empty file, single-line
|
|
file with no line break in the first chunk).
|
|
"""
|
|
if pre_content:
|
|
return _detect_line_ending(pre_content)
|
|
# File may not exist (new write) — `head` exits 0 with empty
|
|
# stdout in that case which yields None below. Cheap probe.
|
|
head_cmd = f"head -c 4096 {self._escape_shell_arg(path)} 2>/dev/null"
|
|
head_result = self._exec(head_cmd)
|
|
if head_result.exit_code != 0 or not head_result.stdout:
|
|
return None
|
|
return _detect_line_ending(head_result.stdout)
|
|
|
|
def _file_has_bom(self, path: str, pre_content: Optional[str] = None) -> bool:
|
|
"""Whether the file on disk starts with a UTF-8 BOM.
|
|
|
|
Uses ``pre_content`` if we already read the file (zero extra exec
|
|
calls); otherwise issues a tiny ``head -c 3`` to sample just the
|
|
marker. A missing/empty file returns False (new writes get no BOM
|
|
unless the caller explicitly includes one).
|
|
"""
|
|
if pre_content is not None:
|
|
return _has_bom(pre_content)
|
|
head_cmd = f"head -c 3 {self._escape_shell_arg(path)} 2>/dev/null"
|
|
head_result = self._exec(head_cmd)
|
|
if head_result.exit_code != 0 or not head_result.stdout:
|
|
return False
|
|
return _has_bom(head_result.stdout)
|
|
|
|
|
|
def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str:
|
|
"""Generate unified diff between old and new content."""
|
|
old_lines = old_content.splitlines(keepends=True)
|
|
new_lines = new_content.splitlines(keepends=True)
|
|
diff = difflib.unified_diff(
|
|
old_lines, new_lines,
|
|
fromfile=f"a/{filename}",
|
|
tofile=f"b/{filename}"
|
|
)
|
|
return ''.join(diff)
|
|
|
|
# =========================================================================
|
|
# READ Implementation
|
|
# =========================================================================
|
|
|
|
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
|
|
"""
|
|
Read a file with pagination, binary detection, and line numbers.
|
|
|
|
Args:
|
|
path: File path (absolute or relative to cwd)
|
|
offset: Line number to start from (1-indexed, default 1)
|
|
limit: Maximum lines to return (default 500, max 2000)
|
|
|
|
Returns:
|
|
ReadResult with content, metadata, or error info
|
|
"""
|
|
# Expand ~ and other shell paths
|
|
path = self._expand_path(path)
|
|
|
|
offset, limit = normalize_read_pagination(offset, limit)
|
|
|
|
# Check if file exists and get size (wc -c is POSIX, works on Linux + macOS)
|
|
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
|
|
stat_result = self._exec(stat_cmd)
|
|
|
|
if stat_result.exit_code != 0:
|
|
# File not found - try to suggest similar files
|
|
return self._suggest_similar_files(path)
|
|
|
|
stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
|
|
try:
|
|
file_size = int(stat_output.strip())
|
|
except ValueError:
|
|
file_size = 0
|
|
|
|
# Check if file is too large
|
|
if file_size > MAX_FILE_SIZE:
|
|
# Still try to read, but warn
|
|
pass
|
|
|
|
# Images are never inlined — redirect to the vision tool
|
|
if self._is_image(path):
|
|
return ReadResult(
|
|
is_image=True,
|
|
is_binary=True,
|
|
file_size=file_size,
|
|
hint=(
|
|
"Image file detected. Automatically redirected to vision_analyze tool. "
|
|
"Use vision_analyze with this file path to inspect the image contents."
|
|
),
|
|
)
|
|
|
|
# Read a sample to check for binary content
|
|
sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null"
|
|
sample_result = self._exec(sample_cmd)
|
|
sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
|
|
|
|
if self._is_likely_binary(path, sample_output):
|
|
return ReadResult(
|
|
is_binary=True,
|
|
file_size=file_size,
|
|
error="Binary file - cannot display as text. Use appropriate tools to handle this file type."
|
|
)
|
|
|
|
# Read with pagination using sed
|
|
end_line = offset + limit - 1
|
|
read_cmd = f"sed -n '{offset},{end_line}p' {self._escape_shell_arg(path)}"
|
|
read_result = self._exec(read_cmd)
|
|
|
|
if read_result.exit_code != 0:
|
|
return ReadResult(error=f"Failed to read file: {read_result.stdout}")
|
|
read_output = _strip_terminal_fence_leaks(read_result.stdout)
|
|
# Strip a leading UTF-8 BOM so the model never sees a phantom U+FEFF
|
|
# before the first real character. Only meaningful on the first
|
|
# chunk (the marker lives at byte 0); later pages can't carry it.
|
|
if offset == 1:
|
|
read_output, _ = _strip_bom(read_output)
|
|
|
|
# Get total line count
|
|
wc_cmd = f"wc -l < {self._escape_shell_arg(path)}"
|
|
wc_result = self._exec(wc_cmd)
|
|
wc_output = _strip_terminal_fence_leaks(wc_result.stdout)
|
|
try:
|
|
total_lines = int(wc_output.strip())
|
|
except ValueError:
|
|
total_lines = 0
|
|
|
|
# Check if truncated
|
|
truncated = total_lines > end_line
|
|
hint = None
|
|
if truncated:
|
|
hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"
|
|
|
|
return ReadResult(
|
|
content=self._add_line_numbers(read_output, offset),
|
|
total_lines=total_lines,
|
|
file_size=file_size,
|
|
truncated=truncated,
|
|
hint=hint
|
|
)
|
|
|
|
def _suggest_similar_files(self, path: str) -> ReadResult:
|
|
"""Suggest similar files when the requested file is not found."""
|
|
dir_path = os.path.dirname(path) or "."
|
|
filename = os.path.basename(path)
|
|
basename_no_ext = os.path.splitext(filename)[0]
|
|
ext = os.path.splitext(filename)[1].lower()
|
|
lower_name = filename.lower()
|
|
|
|
# List files in the target directory
|
|
ls_cmd = f"ls -1 {self._escape_shell_arg(dir_path)} 2>/dev/null | head -50"
|
|
ls_result = self._exec(ls_cmd)
|
|
|
|
scored: list = [] # (score, filepath) — higher is better
|
|
if ls_result.exit_code == 0 and ls_result.stdout.strip():
|
|
for f in ls_result.stdout.strip().split('\n'):
|
|
if not f:
|
|
continue
|
|
lf = f.lower()
|
|
score = 0
|
|
|
|
# Exact match (shouldn't happen, but guard)
|
|
if lf == lower_name:
|
|
score = 100
|
|
# Same base name, different extension (e.g. config.yml vs config.yaml)
|
|
elif os.path.splitext(f)[0].lower() == basename_no_ext.lower():
|
|
score = 90
|
|
# Target is prefix of candidate or vice-versa
|
|
elif lf.startswith(lower_name) or lower_name.startswith(lf):
|
|
score = 70
|
|
# Substring match (candidate contains query)
|
|
elif lower_name in lf:
|
|
score = 60
|
|
# Reverse substring (query contains candidate name)
|
|
elif lf in lower_name and len(lf) > 2:
|
|
score = 40
|
|
# Same extension with some overlap
|
|
elif ext and os.path.splitext(f)[1].lower() == ext:
|
|
common = set(lower_name) & set(lf)
|
|
if len(common) >= max(len(lower_name), len(lf)) * 0.4:
|
|
score = 30
|
|
|
|
if score > 0:
|
|
scored.append((score, os.path.join(dir_path, f)))
|
|
|
|
scored.sort(key=lambda x: -x[0])
|
|
similar = [fp for _, fp in scored[:5]]
|
|
|
|
return ReadResult(
|
|
error=f"File not found: {path}",
|
|
similar_files=similar
|
|
)
|
|
|
|
def read_file_raw(self, path: str) -> ReadResult:
|
|
"""Read the complete file content as a plain string.
|
|
|
|
No pagination, no line-number prefixes, no per-line truncation.
|
|
Uses cat so the full file is returned regardless of size.
|
|
"""
|
|
path = self._expand_path(path)
|
|
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
|
|
stat_result = self._exec(stat_cmd)
|
|
if stat_result.exit_code != 0:
|
|
return self._suggest_similar_files(path)
|
|
stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
|
|
try:
|
|
file_size = int(stat_output.strip())
|
|
except ValueError:
|
|
file_size = 0
|
|
if self._is_image(path):
|
|
return ReadResult(is_image=True, is_binary=True, file_size=file_size)
|
|
sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null")
|
|
sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
|
|
if self._is_likely_binary(path, sample_output):
|
|
return ReadResult(
|
|
is_binary=True, file_size=file_size,
|
|
error="Binary file — cannot display as text."
|
|
)
|
|
cat_result = self._exec(f"cat {self._escape_shell_arg(path)}")
|
|
if cat_result.exit_code != 0:
|
|
return ReadResult(error=f"Failed to read file: {cat_result.stdout}")
|
|
# Strip a leading UTF-8 BOM so patch's fuzzy matcher operates on
|
|
# clean content (a phantom U+FEFF before line 1 would defeat an
|
|
# exact first-line match). write_file restores the BOM on the way
|
|
# back out — it re-probes the on-disk file, which still has the
|
|
# marker — so the round-trip preserves it.
|
|
raw_content, _ = _strip_bom(_strip_terminal_fence_leaks(cat_result.stdout))
|
|
return ReadResult(
|
|
content=raw_content,
|
|
file_size=file_size,
|
|
)
|
|
|
|
def delete_file(self, path: str) -> WriteResult:
|
|
"""Delete a single file.
|
|
|
|
Cross-platform: runs via ``python -c`` against the terminal env's
|
|
Python so it works on Windows shells (``cmd.exe``/PowerShell) that
|
|
don't ship ``rm``. Directories are rejected here — use
|
|
``delete_path(recursive=True)`` for trees.
|
|
"""
|
|
return self._python_delete(path, recursive=False)
|
|
|
|
def delete_path(self, path: str, recursive: bool = False) -> WriteResult:
|
|
"""Cross-platform delete that handles files and (with recursive=True)
|
|
directory trees. Always preferred over emitting ``rm -rf`` /
|
|
``Remove-Item -Recurse`` directly so the same tool call works on
|
|
every backend (local / docker / ssh / Windows).
|
|
"""
|
|
return self._python_delete(path, recursive=recursive)
|
|
|
|
def _python_delete(self, path: str, recursive: bool) -> WriteResult:
|
|
path = self._expand_path(path)
|
|
if _is_write_denied(path):
|
|
return WriteResult(error=f"Delete denied: {path} is a protected path")
|
|
|
|
# We can't shell out to ``rm`` here — it doesn't exist on Windows
|
|
# ``cmd.exe`` or PowerShell, so this code path is what's left when
|
|
# the backend's terminal is a Windows shell. Path is baked into the
|
|
# snippet via ``repr()`` so quoting is correct on every shell.
|
|
snippet = (
|
|
"import shutil, pathlib, sys\n"
|
|
f"p = pathlib.Path({path!r})\n"
|
|
f"recursive = {bool(recursive)!r}\n"
|
|
"try:\n"
|
|
" if p.is_dir() and not p.is_symlink():\n"
|
|
" if recursive:\n"
|
|
" shutil.rmtree(p)\n"
|
|
" else:\n"
|
|
" print('is a directory: ' + str(p), file=sys.stderr); sys.exit(2)\n"
|
|
" else:\n"
|
|
# NOTE: avoid ``unlink(missing_ok=True)`` — that kwarg lands in
|
|
# Python 3.8 and the remote interpreter (docker/ssh) may still
|
|
# be 3.7 on older distros. The FileNotFoundError handler below
|
|
# covers the same case and works back to 3.4.
|
|
" p.unlink()\n"
|
|
"except FileNotFoundError:\n"
|
|
" pass\n"
|
|
"except Exception as exc:\n"
|
|
" print(str(exc), file=sys.stderr); sys.exit(1)\n"
|
|
)
|
|
|
|
result = self._exec(f"python3 -c {self._escape_shell_arg(snippet)}")
|
|
|
|
# Fall back to ``python`` (Windows / older systems where there's no
|
|
# ``python3`` symlink but a ``python`` binary is on PATH).
|
|
if result.exit_code != 0 and "python3" in (result.stdout or ""):
|
|
result = self._exec(f"python -c {self._escape_shell_arg(snippet)}")
|
|
|
|
if result.exit_code != 0:
|
|
return WriteResult(error=f"Failed to delete {path}: {(result.stdout or '').strip() or 'unknown error'}")
|
|
|
|
return WriteResult()
|
|
|
|
def move_file(self, src: str, dst: str) -> WriteResult:
|
|
"""Move a file via mv."""
|
|
src = self._expand_path(src)
|
|
dst = self._expand_path(dst)
|
|
for p in (src, dst):
|
|
if _is_write_denied(p):
|
|
return WriteResult(error=f"Move denied: {p} is a protected path")
|
|
result = self._exec(
|
|
f"mv {self._escape_shell_arg(src)} {self._escape_shell_arg(dst)}"
|
|
)
|
|
if result.exit_code != 0:
|
|
return WriteResult(error=f"Failed to move {src} -> {dst}: {result.stdout}")
|
|
return WriteResult()
|
|
|
|
# =========================================================================
|
|
# WRITE Implementation
|
|
# =========================================================================
|
|
|
|
def write_file(self, path: str, content: str) -> WriteResult:
|
|
"""
|
|
Write content to a file, creating parent directories as needed.
|
|
|
|
Pipes content through stdin to avoid OS ARG_MAX limits on large
|
|
files. The content never appears in the shell command string —
|
|
only the file path does.
|
|
|
|
After the write, runs a post-first / pre-lazy lint check via
|
|
``_check_lint_delta()``. If the new content is clean, the lint
|
|
call is O(one parse). If the new content has errors, the pre-write
|
|
content is linted too and only errors newly introduced by this
|
|
write are surfaced — pre-existing problems are filtered out so
|
|
the agent isn't distracted chasing them.
|
|
|
|
Args:
|
|
path: File path to write
|
|
content: Content to write
|
|
|
|
Returns:
|
|
WriteResult with bytes written, lint summary, or error.
|
|
"""
|
|
# Expand ~ and other shell paths
|
|
path = self._expand_path(path)
|
|
|
|
# Block writes to sensitive paths
|
|
if _is_write_denied(path):
|
|
return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")
|
|
|
|
# Capture pre-write content. Two consumers want it:
|
|
#
|
|
# 1. The lint-delta layer (for in-process linters like ast.parse
|
|
# and json.loads) needs the previous content to compute the
|
|
# set of NEW lint errors introduced by this write.
|
|
# 2. The LSP layer needs pre/post content to build a line-shift
|
|
# map — pre-existing diagnostics below the edit point shift
|
|
# when lines are added/removed, and the shift map remaps
|
|
# baseline diagnostics into post-edit coordinates so the
|
|
# strict (range-aware) delta key matches.
|
|
#
|
|
# The set of extensions we capture pre_content for is therefore
|
|
# the UNION of in-process lint coverage and LSP coverage. For
|
|
# extensions outside both sets (binaries, opaque formats),
|
|
# skipping the read keeps the hot path fast.
|
|
ext = os.path.splitext(path)[1].lower()
|
|
pre_content: Optional[str] = None
|
|
want_pre = ext in LINTERS_INPROC or self._lsp_handles_extension(ext)
|
|
if want_pre:
|
|
# Best-effort read; failure (file missing, permission) leaves
|
|
# pre_content as None which makes both downstream consumers
|
|
# degrade gracefully (lint reports all errors; LSP skips the
|
|
# shift map).
|
|
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
|
|
read_result = self._exec(read_cmd)
|
|
if read_result.exit_code == 0 and read_result.stdout:
|
|
pre_content = read_result.stdout
|
|
|
|
# ── Line-ending preservation (Roo Code pattern) ──────────────
|
|
# If the file existed with CRLF endings and the agent's content
|
|
# has bare LFs, convert to CRLF before writing. Otherwise the
|
|
# write silently normalizes a Windows-line-ending file (and patch
|
|
# produces mixed endings when only a substituted region changes).
|
|
# Detect from a small head sample to avoid reading the full file
|
|
# for line-ending purposes alone.
|
|
original_ending = self._detect_file_line_ending(path, pre_content)
|
|
if original_ending == "\r\n":
|
|
content = _normalize_line_endings(content, "\r\n")
|
|
|
|
# ── BOM preservation ──────────────────────────────────────────
|
|
# If the file on disk started with a UTF-8 BOM, keep it. read_file
|
|
# strips the BOM so the agent never sees it, which means the
|
|
# content it hands back to write_file / patch has no BOM either —
|
|
# without restoring it here a round-trip would silently strip the
|
|
# marker and change the file's byte signature (some Windows
|
|
# toolchains key on it). Only prepend when the original had a BOM
|
|
# and the new content doesn't already carry one (guards against
|
|
# double-BOM if a caller passed raw bytes).
|
|
if self._file_has_bom(path, pre_content) and not _has_bom(content):
|
|
content = _UTF8_BOM + content
|
|
|
|
# Snapshot LSP diagnostics for this file (best-effort) so the
|
|
# post-write LSP layer can return only diagnostics introduced
|
|
# by this specific edit. Mirrors claude-code's
|
|
# ``beforeFileEdited`` pattern but wired to the local LSP
|
|
# rather than an external IDE.
|
|
self._snapshot_lsp_baseline(path)
|
|
|
|
# Create parent directories
|
|
parent = os.path.dirname(path)
|
|
dirs_created = False
|
|
|
|
if parent:
|
|
mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}"
|
|
mkdir_result = self._exec(mkdir_cmd)
|
|
if mkdir_result.exit_code == 0:
|
|
dirs_created = True
|
|
|
|
# Write atomically: stream into a temp file in the SAME directory,
|
|
# then ``mv`` it over the target. The rename is atomic on POSIX
|
|
# (and on every backend FS we run on), so a crash / power loss /
|
|
# truncated pipe mid-write leaves the original file intact instead
|
|
# of a half-written corrupt file. Same-directory is load-bearing —
|
|
# ``mv`` across filesystems degrades to copy+unlink, which is NOT
|
|
# atomic; keeping the temp beside the target guarantees a real
|
|
# rename. Content still rides stdin so there's no ARG_MAX limit.
|
|
#
|
|
# The temp file is created with ``mktemp`` (collision-safe) when the
|
|
# backend has it, falling back to a PID-stamped name otherwise. We
|
|
# then chmod the temp to match the existing file's mode (if any) so
|
|
# the atomic swap doesn't silently widen or narrow permissions, and
|
|
# clean the temp up on any failure so we never leak a ``.hermes-tmp``
|
|
# turd next to the user's file.
|
|
write_result = self._atomic_write(path, content)
|
|
|
|
if write_result.exit_code != 0:
|
|
return WriteResult(error=f"Failed to write file: {write_result.stdout}")
|
|
|
|
# Get bytes written (wc -c is POSIX, works on Linux + macOS)
|
|
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
|
|
stat_result = self._exec(stat_cmd)
|
|
|
|
try:
|
|
bytes_written = int(stat_result.stdout.strip())
|
|
except ValueError:
|
|
bytes_written = len(content.encode('utf-8'))
|
|
|
|
# Post-write lint with delta refinement.
|
|
lint_result = self._check_lint_delta(path, pre_content=pre_content, post_content=content)
|
|
|
|
# Semantic diagnostics from the LSP layer — separate channel.
|
|
# Only fired when the syntax tier reported clean (no point asking
|
|
# an LSP for a file that won't even parse). Pass pre/post
|
|
# content so the LSP layer can build a line-shift map and
|
|
# remap baseline diagnostics into post-edit coordinates.
|
|
# Best-effort: ``""`` is returned for any failure path.
|
|
lsp_diagnostics: Optional[str] = None
|
|
if lint_result.success or lint_result.skipped:
|
|
block = self._maybe_lsp_diagnostics(
|
|
path, pre_content=pre_content, post_content=content
|
|
)
|
|
if block:
|
|
lsp_diagnostics = block
|
|
|
|
return WriteResult(
|
|
bytes_written=bytes_written,
|
|
dirs_created=dirs_created,
|
|
lint=lint_result.to_dict() if lint_result else None,
|
|
lsp_diagnostics=lsp_diagnostics,
|
|
)
|
|
|
|
# =========================================================================
|
|
# PATCH Implementation (Replace Mode)
|
|
# =========================================================================
|
|
|
|
def patch_replace(self, path: str, old_string: str, new_string: str,
|
|
replace_all: bool = False) -> PatchResult:
|
|
"""
|
|
Replace text in a file using fuzzy matching.
|
|
|
|
Args:
|
|
path: File path to modify
|
|
old_string: Text to find (must be unique unless replace_all=True)
|
|
new_string: Replacement text
|
|
replace_all: If True, replace all occurrences
|
|
|
|
Returns:
|
|
PatchResult with diff and lint results
|
|
"""
|
|
# Expand ~ and other shell paths
|
|
path = self._expand_path(path)
|
|
|
|
# Block writes to sensitive paths
|
|
if _is_write_denied(path):
|
|
return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")
|
|
|
|
# Read current content
|
|
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
|
|
read_result = self._exec(read_cmd)
|
|
|
|
if read_result.exit_code != 0:
|
|
return PatchResult(error=f"Failed to read file: {path}")
|
|
|
|
content = read_result.stdout
|
|
# Strip a leading UTF-8 BOM before matching so the fuzzy matcher and
|
|
# the diff operate on clean content (a phantom U+FEFF before line 1
|
|
# defeats an exact first-line match). write_file restores the BOM on
|
|
# the way back out by re-probing the on-disk file, so the round-trip
|
|
# preserves the marker.
|
|
content, _ = _strip_bom(content)
|
|
|
|
# Import and use fuzzy matching
|
|
from tools.fuzzy_match import fuzzy_find_and_replace
|
|
|
|
new_content, match_count, _strategy, error = fuzzy_find_and_replace(
|
|
content, old_string, new_string, replace_all
|
|
)
|
|
|
|
if error or match_count == 0:
|
|
err_msg = error or f"Could not find match for old_string in {path}"
|
|
try:
|
|
from tools.fuzzy_match import format_no_match_hint
|
|
err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
|
|
except Exception:
|
|
pass
|
|
return PatchResult(error=err_msg)
|
|
|
|
# ── Line-ending preservation ──────────────────────────────────
|
|
# Models nearly always send old_string/new_string with bare LF
|
|
# in tool args (JSON-encoded), but the file may have CRLF on
|
|
# disk. After fuzzy_find_and_replace, ``new_content`` is a
|
|
# mixed-ending string: the substituted region is LF, surrounding
|
|
# text keeps the file's CRLF. Normalize the whole thing to the
|
|
# file's detected line ending so the on-disk file is consistent
|
|
# and the unified diff below reflects the actual change.
|
|
file_ending = _detect_line_ending(content)
|
|
if file_ending:
|
|
new_content = _normalize_line_endings(new_content, file_ending)
|
|
|
|
# Write back
|
|
write_result = self.write_file(path, new_content)
|
|
if write_result.error:
|
|
return PatchResult(error=f"Failed to write changes: {write_result.error}")
|
|
|
|
# Post-write verification — re-read the file and confirm the bytes we
|
|
# intended to write actually landed. Catches silent persistence
|
|
# failures (backend FS oddities, race with another task, truncated
|
|
# pipe, etc.) that would otherwise return success-with-diff while the
|
|
# file is unchanged on disk.
|
|
verify_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
|
|
verify_result = self._exec(verify_cmd)
|
|
if verify_result.exit_code != 0:
|
|
return PatchResult(error=f"Post-write verification failed: could not re-read {path}")
|
|
# Normalize line endings before comparing. On Windows, Python's
|
|
# default text-mode ``open()`` translates ``\n`` → ``\r\n`` on
|
|
# write, so the file on disk legitimately holds CRLFs while our
|
|
# ``new_content`` string has bare LFs. Without this normalization
|
|
# every patch on Windows returns a bogus "wrote 39, read 42"
|
|
# false-negative even though the edit landed correctly. POSIX
|
|
# backends don't translate, so this is a no-op there. We also
|
|
# strip a leading BOM from the re-read: write_file restored the
|
|
# marker on disk but ``new_content`` is the BOM-less string we
|
|
# matched against, so the comparison must drop it to stay
|
|
# apples-to-apples.
|
|
_verify_bomless, _ = _strip_bom(verify_result.stdout)
|
|
_verify_stdout_normalized = _verify_bomless.replace("\r\n", "\n").replace("\r", "\n")
|
|
_new_content_normalized = new_content.replace("\r\n", "\n").replace("\r", "\n")
|
|
if _verify_stdout_normalized != _new_content_normalized:
|
|
return PatchResult(error=(
|
|
f"Post-write verification failed for {path}: on-disk content "
|
|
f"differs from intended write "
|
|
f"(wrote {len(_new_content_normalized)} chars, read back "
|
|
f"{len(_verify_stdout_normalized)} chars after normalizing line endings). "
|
|
"The patch did not persist. Re-read the file and try again."
|
|
))
|
|
|
|
# Generate diff
|
|
diff = self._unified_diff(content, new_content, path)
|
|
|
|
# Auto-lint with delta refinement: only surface errors introduced
|
|
# by this patch, filtering out pre-existing lint failures so the
|
|
# agent isn't distracted by problems that were already there.
|
|
lint_result = self._check_lint_delta(path, pre_content=content, post_content=new_content)
|
|
|
|
return PatchResult(
|
|
success=True,
|
|
diff=diff,
|
|
files_modified=[path],
|
|
lint=lint_result.to_dict() if lint_result else None,
|
|
# Propagate the LSP diagnostics already captured by the
|
|
# internal ``write_file`` call. Its baseline was the
|
|
# pre-patch content (taken at the start of write_file via
|
|
# ``_snapshot_lsp_baseline``) so the delta is correct for
|
|
# the patch as a whole. Keep the field separate from the
|
|
# syntax-check ``lint`` so the agent can read both signals.
|
|
lsp_diagnostics=write_result.lsp_diagnostics,
|
|
)
|
|
|
|
def patch_v4a(self, patch_content: str) -> PatchResult:
|
|
"""
|
|
Apply a V4A format patch.
|
|
|
|
V4A format:
|
|
*** Begin Patch
|
|
*** Update File: path/to/file.py
|
|
@@ context hint @@
|
|
context line
|
|
-removed line
|
|
+added line
|
|
*** End Patch
|
|
|
|
Args:
|
|
patch_content: V4A format patch string
|
|
|
|
Returns:
|
|
PatchResult with changes made
|
|
"""
|
|
# Import patch parser
|
|
from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
|
|
|
|
operations, parse_error = parse_v4a_patch(patch_content)
|
|
if parse_error:
|
|
return PatchResult(error=f"Failed to parse patch: {parse_error}")
|
|
|
|
# Apply operations
|
|
result = apply_v4a_operations(operations, self)
|
|
return result
|
|
|
|
def _check_lint(self, path: str, content: Optional[str] = None) -> LintResult:
|
|
"""
|
|
Run syntax check on a file after editing.
|
|
|
|
Prefers the in-process linter for structured formats (JSON, YAML,
|
|
TOML) when possible — those parse via the Python stdlib in
|
|
microseconds and don't require a subprocess. Falls back to the
|
|
shell linter table for compiled/type-checked languages
|
|
(py_compile, node --check, tsc, go vet, rustfmt).
|
|
|
|
Args:
|
|
path: File path (used to select the linter + for shell invocation).
|
|
content: Optional file content. If provided AND an in-process
|
|
linter matches the extension, we lint the content
|
|
directly without re-reading the file from disk. Ignored
|
|
for shell linters.
|
|
|
|
Returns:
|
|
LintResult with status and any errors.
|
|
"""
|
|
ext = os.path.splitext(path)[1].lower()
|
|
|
|
# Prefer in-process linter when available.
|
|
inproc = LINTERS_INPROC.get(ext)
|
|
if inproc is not None:
|
|
# Need content — either passed in or read from disk.
|
|
if content is None:
|
|
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
|
|
read_result = self._exec(read_cmd)
|
|
if read_result.exit_code != 0:
|
|
return LintResult(skipped=True, message=f"Failed to read {path} for lint")
|
|
content = read_result.stdout
|
|
ok, err = inproc(content)
|
|
if err == "__SKIP__":
|
|
return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)")
|
|
return LintResult(success=ok, output="" if ok else err)
|
|
|
|
# Fall back to shell linter.
|
|
if ext not in LINTERS:
|
|
return LintResult(skipped=True, message=f"No linter for {ext} files")
|
|
|
|
# If a real LSP server is active and claims this file, skip the
|
|
# shell linter for extensions whose per-file shell invocation is
|
|
# structurally weaker / floods phantom errors. See
|
|
# ``_SHELL_LINTER_LSP_REDUNDANT`` above for the rationale per ext.
|
|
# The LSP tier runs separately via ``_maybe_lsp_diagnostics`` and
|
|
# carries the real diagnostics in ``lsp_diagnostics`` on the
|
|
# WriteResult / PatchResult.
|
|
if ext in _SHELL_LINTER_LSP_REDUNDANT and self._lsp_will_handle(path):
|
|
return LintResult(
|
|
skipped=True,
|
|
message=f"LSP server handles {ext} — shell linter skipped",
|
|
)
|
|
|
|
linter_cmd = LINTERS[ext]
|
|
# Extract the base command (first word)
|
|
base_cmd = linter_cmd.split()[0]
|
|
|
|
if not self._has_command(base_cmd):
|
|
return LintResult(skipped=True, message=f"{base_cmd} not available")
|
|
|
|
# Run linter
|
|
cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
|
|
result = self._exec(cmd, timeout=30)
|
|
|
|
if result.exit_code != 0 and _looks_like_linter_unusable(base_cmd, result.stdout):
|
|
# The linter command exists on PATH but couldn't actually run
|
|
# (e.g. ``npx tsc`` when tsc isn't in node_modules; ``rustfmt
|
|
# --check`` without a Cargo project). This is a tooling gap,
|
|
# not a real lint failure — surface it as ``skipped`` so the
|
|
# write doesn't get flagged AND so the LSP tier still runs.
|
|
from tools.ansi_strip import strip_ansi
|
|
cleaned = strip_ansi(result.stdout).strip()
|
|
# Collapse to a single line — the npx banner is multi-line ASCII.
|
|
first_line = next(
|
|
(ln.strip() for ln in cleaned.splitlines() if ln.strip()),
|
|
cleaned[:120],
|
|
)
|
|
return LintResult(
|
|
skipped=True,
|
|
message=f"{base_cmd} not usable: {first_line[:200]}",
|
|
)
|
|
|
|
return LintResult(
|
|
success=result.exit_code == 0,
|
|
output=result.stdout.strip() if result.stdout.strip() else ""
|
|
)
|
|
|
|
def _check_lint_delta(self, path: str, pre_content: Optional[str],
|
|
post_content: Optional[str] = None) -> LintResult:
|
|
"""
|
|
Run post-write syntax lint with pre-write baseline comparison.
|
|
|
|
Two-tier strategy:
|
|
|
|
1. **Syntax check** (in-process or shell-based, microseconds).
|
|
Catches the bug class that motivated this layer: corrupt
|
|
writes, mashed quotes, truncated output. Hot path.
|
|
|
|
2. **Delta refinement against pre-write content** when the
|
|
syntax tier reports errors. Filter out errors that already
|
|
existed pre-edit so the agent isn't distracted by inherited
|
|
state.
|
|
|
|
Semantic diagnostics from the LSP layer are fetched separately
|
|
via :meth:`_maybe_lsp_diagnostics` and surfaced in the
|
|
``lsp_diagnostics`` field on :class:`WriteResult` /
|
|
:class:`PatchResult`. Keeping the two channels separate lets
|
|
the agent (and any downstream parsers) read syntax errors and
|
|
semantic errors as independent signals.
|
|
|
|
Args:
|
|
path: File path (for linter selection).
|
|
pre_content: File content BEFORE the write. Pass None for new
|
|
files or when the pre-state isn't available — the
|
|
delta refinement is skipped and all post errors
|
|
are returned.
|
|
post_content: File content AFTER the write. Optional; if None,
|
|
the shell linter reads from disk (same as
|
|
_check_lint).
|
|
|
|
Returns:
|
|
LintResult. ``output`` contains either the full post-lint
|
|
errors (no pre-state) or just the new-error lines (delta
|
|
refinement applied).
|
|
"""
|
|
post = self._check_lint(path, content=post_content)
|
|
|
|
# Hot path: clean post-write syntactically.
|
|
if post.success or post.skipped:
|
|
return post
|
|
|
|
# Post-write has syntax errors. If we have pre-content, run the
|
|
# delta refinement to filter out pre-existing errors.
|
|
if pre_content is None:
|
|
return post
|
|
|
|
pre = self._check_lint(path, content=pre_content)
|
|
if pre.success or pre.skipped or not pre.output:
|
|
# Pre-write was clean (or we couldn't lint it) — post errors
|
|
# are all new. Return the full post output.
|
|
return post
|
|
|
|
# Both pre- and post-write had errors. Compute the set-difference
|
|
# on non-empty stripped lines. Caveat: single-error parsers
|
|
# (ast.parse, json.loads) stop at the first error and don't report
|
|
# later ones — if the pre-existing error blocks parsing before
|
|
# reaching the edit region, we can't prove the edit is clean. So
|
|
# if every post error also appeared pre-edit, we report the file
|
|
# as still broken but annotate that this edit introduced nothing
|
|
# new on top — the agent knows it's inherited state, not fresh
|
|
# damage, without silently dropping the error.
|
|
pre_lines = {ln.strip() for ln in pre.output.splitlines() if ln.strip()}
|
|
post_lines = [ln for ln in post.output.splitlines() if ln.strip() and ln.strip() not in pre_lines]
|
|
|
|
if not post_lines:
|
|
# Every error in post was also in pre — this edit didn't make
|
|
# anything obviously worse, but the file remains broken and
|
|
# the agent should know.
|
|
return LintResult(
|
|
success=False,
|
|
output=post.output,
|
|
message="Pre-existing lint errors — this edit didn't introduce new ones but the file is still broken.",
|
|
)
|
|
|
|
return LintResult(
|
|
success=False,
|
|
output=(
|
|
"New lint errors introduced by this edit "
|
|
"(pre-existing errors filtered out):\n" + "\n".join(post_lines)
|
|
)
|
|
)
|
|
|
|
def _lsp_local_only(self) -> bool:
|
|
"""Return True iff this FileOperations is wired to a local backend.
|
|
|
|
LSP servers run on the host process — they need access to the
|
|
files they're linting. Remote/sandboxed backends (Docker,
|
|
Modal, SSH, Daytona) keep files inside the sandbox where the
|
|
host-side LSP server can't reach them, so we skip the LSP
|
|
path for those entirely.
|
|
"""
|
|
env = getattr(self, "env", None)
|
|
if env is None:
|
|
# Defensive: some tests construct ShellFileOperations via
|
|
# ``__new__`` without going through ``__init__``, so
|
|
# ``self.env`` may be missing. No env = no LSP path.
|
|
return False
|
|
try:
|
|
from tools.environments.local import LocalEnvironment
|
|
except Exception: # noqa: BLE001
|
|
return False
|
|
return isinstance(env, LocalEnvironment)
|
|
|
|
def _lsp_handles_extension(self, ext: str) -> bool:
|
|
"""Return True iff some registered LSP server claims this extension.
|
|
|
|
Used to decide whether to capture pre-write content for the
|
|
line-shift map. Capturing is cheap (one ``cat`` on the host)
|
|
but pointless if no LSP would ever look at the file.
|
|
|
|
Safe to call on remote backends — the registry is purely
|
|
in-process metadata; we still gate the actual LSP path on
|
|
:meth:`_lsp_local_only`.
|
|
"""
|
|
if not ext:
|
|
return False
|
|
try:
|
|
from agent.lsp.servers import SERVERS
|
|
except Exception: # noqa: BLE001
|
|
return False
|
|
ext_lower = ext.lower()
|
|
for srv in SERVERS:
|
|
if ext_lower in srv.extensions:
|
|
return True
|
|
return False
|
|
|
|
def _lsp_will_handle(self, path: str) -> bool:
|
|
"""Return True iff the LSP service is active AND will lint this file.
|
|
|
|
Stronger than :meth:`_lsp_handles_extension` — that one only checks
|
|
the static server registry. This one additionally requires the
|
|
LSP service to be configured/enabled and the file to pass
|
|
:meth:`agent.lsp.manager.LSPService.enabled_for` (which gates on
|
|
workspace detection, disabled-server set, and the broken-pair
|
|
short-circuit).
|
|
|
|
Used by :meth:`_check_lint` to decide whether to skip the per-file
|
|
shell linter for extensions in ``_SHELL_LINTER_LSP_REDUNDANT``.
|
|
|
|
Best-effort: any failure path returns False so the shell linter
|
|
runs as before — never suppress lint based on an LSP probe that
|
|
couldn't actually answer the question.
|
|
"""
|
|
if not self._lsp_local_only():
|
|
return False
|
|
try:
|
|
from agent.lsp import get_service
|
|
except Exception: # noqa: BLE001
|
|
return False
|
|
try:
|
|
svc = get_service()
|
|
except Exception: # noqa: BLE001
|
|
return False
|
|
if svc is None:
|
|
return False
|
|
try:
|
|
return bool(svc.enabled_for(path))
|
|
except Exception: # noqa: BLE001
|
|
return False
|
|
|
|
def _snapshot_lsp_baseline(self, path: str) -> None:
|
|
"""Capture pre-edit LSP diagnostics so the post-write delta is correct.
|
|
|
|
Best-effort. Silent on every failure path — LSP is an
|
|
enrichment layer and must never break a write.
|
|
|
|
Skipped entirely on non-local backends (Docker, Modal, SSH,
|
|
etc.) — the server can't see files inside the sandbox.
|
|
"""
|
|
if not self._lsp_local_only():
|
|
return
|
|
try:
|
|
from agent.lsp import get_service
|
|
svc = get_service()
|
|
except Exception: # noqa: BLE001
|
|
return
|
|
if svc is None:
|
|
return
|
|
try:
|
|
svc.snapshot_baseline(path)
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
|
|
def _maybe_lsp_diagnostics(
|
|
self,
|
|
path: str,
|
|
*,
|
|
pre_content: Optional[str] = None,
|
|
post_content: Optional[str] = None,
|
|
) -> str:
|
|
"""Best-effort LSP semantic diagnostics for ``path``.
|
|
|
|
Returns a formatted ``<diagnostics>`` block, or empty string
|
|
when LSP is unavailable / disabled / produced no errors.
|
|
|
|
When both ``pre_content`` and ``post_content`` are provided,
|
|
a line-shift map is built and passed to the LSPService so
|
|
baseline diagnostics are remapped into post-edit coordinates
|
|
before the set-difference. Without this, edits that delete
|
|
or insert lines surface every pre-existing diagnostic below
|
|
the edit point as "introduced by this edit".
|
|
|
|
Wraps everything in a try/except so a misbehaving LSP server
|
|
can't break a write. This intentionally swallows all errors
|
|
— the calling tier already returned a clean syntax result, so
|
|
``""`` here just means "no extra info to add".
|
|
|
|
Skipped entirely on non-local backends (Docker, Modal, SSH,
|
|
etc.) — same reasoning as ``_snapshot_lsp_baseline``.
|
|
"""
|
|
if not self._lsp_local_only():
|
|
return ""
|
|
try:
|
|
from agent.lsp import get_service
|
|
except Exception: # noqa: BLE001
|
|
return ""
|
|
try:
|
|
svc = get_service()
|
|
except Exception: # noqa: BLE001
|
|
return ""
|
|
if svc is None or not svc.enabled_for(path):
|
|
return ""
|
|
|
|
# Build a line-shift map when we have both pre and post — it
|
|
# remaps baseline diagnostics into post-edit coordinates so
|
|
# the strict (range-aware) delta key matches correctly.
|
|
line_shift = None
|
|
if pre_content is not None and post_content is not None and pre_content != post_content:
|
|
try:
|
|
from agent.lsp.range_shift import build_line_shift
|
|
line_shift = build_line_shift(pre_content, post_content)
|
|
except Exception: # noqa: BLE001
|
|
line_shift = None
|
|
|
|
try:
|
|
diagnostics = svc.get_diagnostics_sync(path, delta=True, line_shift=line_shift)
|
|
except Exception: # noqa: BLE001
|
|
return ""
|
|
if not diagnostics:
|
|
return ""
|
|
try:
|
|
from agent.lsp.reporter import report_for_file, truncate
|
|
block = report_for_file(path, diagnostics)
|
|
if not block:
|
|
return ""
|
|
return truncate("LSP diagnostics introduced by this edit:\n" + block)
|
|
except Exception: # noqa: BLE001
|
|
return ""
|
|
|
|
# =========================================================================
|
|
# SEARCH Implementation
|
|
# =========================================================================
|
|
|
|
def search(self, pattern: str, path: str = ".", target: str = "content",
|
|
file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
|
|
output_mode: str = "content", context: int = 0) -> SearchResult:
|
|
"""
|
|
Search for content or files.
|
|
|
|
Args:
|
|
pattern: Regex (for content) or glob pattern (for files)
|
|
path: Directory/file to search (default: cwd)
|
|
target: "content" (grep) or "files" (glob)
|
|
file_glob: File pattern filter for content search (e.g., "*.py")
|
|
limit: Max results (default 50)
|
|
offset: Skip first N results
|
|
output_mode: "content", "files_only", or "count"
|
|
context: Lines of context around matches
|
|
|
|
Returns:
|
|
SearchResult with matches or file list
|
|
"""
|
|
offset, limit = normalize_search_pagination(offset, limit)
|
|
|
|
# Expand ~ and other shell paths
|
|
path = self._expand_path(path)
|
|
|
|
# Validate that the path exists before searching
|
|
check = self._exec(f"test -e {self._escape_shell_arg(path)} && echo exists || echo not_found")
|
|
if "not_found" in check.stdout:
|
|
# Try to suggest nearby paths
|
|
parent = os.path.dirname(path) or "."
|
|
basename_query = os.path.basename(path)
|
|
hint_parts = [f"Path not found: {path}"]
|
|
# Check if parent directory exists and list similar entries
|
|
parent_check = self._exec(
|
|
f"test -d {self._escape_shell_arg(parent)} && echo yes || echo no"
|
|
)
|
|
if "yes" in parent_check.stdout and basename_query:
|
|
ls_result = self._exec(
|
|
f"ls -1 {self._escape_shell_arg(parent)} 2>/dev/null | head -20"
|
|
)
|
|
if ls_result.exit_code == 0 and ls_result.stdout.strip():
|
|
lower_q = basename_query.lower()
|
|
candidates = []
|
|
for entry in ls_result.stdout.strip().split('\n'):
|
|
if not entry:
|
|
continue
|
|
le = entry.lower()
|
|
if lower_q in le or le in lower_q or le.startswith(lower_q[:3]):
|
|
candidates.append(os.path.join(parent, entry))
|
|
if candidates:
|
|
hint_parts.append(
|
|
"Similar paths: " + ", ".join(candidates[:5])
|
|
)
|
|
return SearchResult(
|
|
error=". ".join(hint_parts),
|
|
total_count=0
|
|
)
|
|
|
|
if target == "files":
|
|
return self._search_files(pattern, path, limit, offset)
|
|
else:
|
|
return self._search_content(pattern, path, file_glob, limit, offset,
|
|
output_mode, context)
|
|
|
|
def _search_files(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
|
|
"""Search for files by name pattern (glob-like)."""
|
|
# Auto-prepend **/ for recursive search if not already present
|
|
if not pattern.startswith('**/') and '/' not in pattern:
|
|
search_pattern = pattern
|
|
else:
|
|
search_pattern = pattern.split('/')[-1]
|
|
|
|
search_root = Path(path)
|
|
has_hidden_path_ancestor = any(
|
|
part not in {".", ".."} and part.startswith(".")
|
|
for part in search_root.parts
|
|
)
|
|
|
|
# Prefer ripgrep: respects .gitignore, excludes hidden dirs by
|
|
# default, and has parallel directory traversal (~200x faster than
|
|
# find on wide trees). Mirrors _search_content which already uses rg.
|
|
if self._has_command('rg'):
|
|
return self._search_files_rg(search_pattern, path, limit, offset)
|
|
|
|
# Fallback: find (slower, no .gitignore awareness)
|
|
if not self._has_command('find'):
|
|
return SearchResult(
|
|
error="File search requires 'rg' (ripgrep) or 'find'. "
|
|
"Install ripgrep for best results: "
|
|
"https://github.com/BurntSushi/ripgrep#installation"
|
|
)
|
|
|
|
# Exclude hidden directories (matching ripgrep's default behavior).
|
|
hidden_exclude = "-not -path '*/.*'" if not has_hidden_path_ancestor else ""
|
|
hidden_filter_expr = f" {hidden_exclude}" if hidden_exclude else ""
|
|
|
|
# Use shell pagination for standard roots. For hidden roots, gather full
|
|
# output so we can re-apply hidden-descendant filtering while allowing
|
|
# explicit hidden-root searches.
|
|
pagination_expr = ""
|
|
if not has_hidden_path_ancestor:
|
|
pagination_expr = f" | tail -n +{offset + 1} | head -n {limit}"
|
|
|
|
cmd = f"find {self._escape_shell_arg(path)}{hidden_filter_expr} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
|
f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn{pagination_expr}"
|
|
|
|
result = self._exec(cmd, timeout=60)
|
|
|
|
if not result.stdout.strip():
|
|
# Try without -printf (BSD find compatibility -- macOS)
|
|
cmd_simple = f"find {self._escape_shell_arg(path)}{hidden_filter_expr} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
|
f"2>/dev/null | sort -rn{pagination_expr}"
|
|
result = self._exec(cmd_simple, timeout=60)
|
|
|
|
files = []
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(' ', 1)
|
|
if len(parts) == 2 and parts[0].replace('.', '').isdigit():
|
|
files.append(parts[1])
|
|
else:
|
|
files.append(line)
|
|
|
|
# For explicit hidden roots, find's path-based filtering excludes every
|
|
# file under the hidden path. Apply descendant filtering after command
|
|
# execution so only the explicit root ancestry is bypassed.
|
|
if has_hidden_path_ancestor:
|
|
normalized_root = search_root.resolve()
|
|
filtered_files = []
|
|
for file_path in files:
|
|
try:
|
|
rel_parts = Path(file_path).resolve().relative_to(normalized_root).parts
|
|
except ValueError:
|
|
rel_parts = Path(file_path).parts
|
|
if any(part not in {".", ".."} and part.startswith(".") for part in rel_parts):
|
|
continue
|
|
filtered_files.append(file_path)
|
|
files = filtered_files[offset:offset + limit]
|
|
# pagination for standard roots is already applied in shell
|
|
|
|
return SearchResult(
|
|
files=files,
|
|
total_count=len(files)
|
|
)
|
|
|
|
def _search_files_rg(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
|
|
"""Search for files by name using ripgrep's --files mode.
|
|
|
|
rg --files respects .gitignore and excludes hidden directories by
|
|
default, and uses parallel directory traversal for ~200x speedup
|
|
over find on wide trees. Results are sorted by modification time
|
|
(most recently edited first) when rg >= 13.0 supports --sortr.
|
|
"""
|
|
# rg --files -g uses glob patterns; wrap bare names so they match
|
|
# at any depth (equivalent to find -name).
|
|
if '/' not in pattern and not pattern.startswith('*'):
|
|
glob_pattern = f"*{pattern}"
|
|
else:
|
|
glob_pattern = pattern
|
|
|
|
fetch_limit = limit + offset
|
|
# Try mtime-sorted first (rg 13+); fall back to unsorted if not supported.
|
|
cmd_sorted = (
|
|
f"rg --files --sortr=modified -g {self._escape_shell_arg(glob_pattern)} "
|
|
f"{self._escape_shell_arg(path)} 2>/dev/null "
|
|
f"| head -n {fetch_limit}"
|
|
)
|
|
result = self._exec(cmd_sorted, timeout=60)
|
|
all_files = [f for f in result.stdout.strip().split('\n') if f]
|
|
|
|
if not all_files:
|
|
# --sortr may have failed on older rg; retry without it.
|
|
cmd_plain = (
|
|
f"rg --files -g {self._escape_shell_arg(glob_pattern)} "
|
|
f"{self._escape_shell_arg(path)} 2>/dev/null "
|
|
f"| head -n {fetch_limit}"
|
|
)
|
|
result = self._exec(cmd_plain, timeout=60)
|
|
all_files = [f for f in result.stdout.strip().split('\n') if f]
|
|
|
|
page = all_files[offset:offset + limit]
|
|
|
|
return SearchResult(
|
|
files=page,
|
|
total_count=len(all_files),
|
|
truncated=len(all_files) >= fetch_limit,
|
|
)
|
|
|
|
def _search_content(self, pattern: str, path: str, file_glob: Optional[str],
|
|
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
|
|
"""Search for content inside files (grep-like)."""
|
|
# Try ripgrep first (fast), fallback to grep (slower but works)
|
|
if self._has_command('rg'):
|
|
return self._search_with_rg(pattern, path, file_glob, limit, offset,
|
|
output_mode, context)
|
|
elif self._has_command('grep'):
|
|
return self._search_with_grep(pattern, path, file_glob, limit, offset,
|
|
output_mode, context)
|
|
else:
|
|
# Neither rg nor grep available (Windows without Git Bash, etc.)
|
|
return SearchResult(
|
|
error="Content search requires ripgrep (rg) or grep. "
|
|
"Install ripgrep: https://github.com/BurntSushi/ripgrep#installation"
|
|
)
|
|
|
|
def _search_with_rg(self, pattern: str, path: str, file_glob: Optional[str],
|
|
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
|
|
"""Search using ripgrep."""
|
|
cmd_parts = ["rg", "--line-number", "--no-heading", "--with-filename"]
|
|
|
|
# Add context if requested
|
|
if context > 0:
|
|
cmd_parts.extend(["-C", str(context)])
|
|
|
|
# Add file glob filter (must be quoted to prevent shell expansion)
|
|
if file_glob:
|
|
cmd_parts.extend(["--glob", self._escape_shell_arg(file_glob)])
|
|
|
|
# Output mode handling
|
|
if output_mode == "files_only":
|
|
cmd_parts.append("-l") # Files only
|
|
elif output_mode == "count":
|
|
cmd_parts.append("-c") # Count per file
|
|
|
|
# Add pattern and path
|
|
cmd_parts.append(self._escape_shell_arg(pattern))
|
|
cmd_parts.append(self._escape_shell_arg(path))
|
|
|
|
# Fetch extra rows so we can report the true total before slicing.
|
|
# For context mode, rg emits separator lines ("--") between groups,
|
|
# so we grab generously and filter in Python.
|
|
fetch_limit = limit + offset + 200 if context > 0 else limit + offset
|
|
cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])
|
|
|
|
cmd = " ".join(cmd_parts)
|
|
result = self._exec(cmd, timeout=60)
|
|
|
|
# rg exit codes: 0=matches found, 1=no matches, 2=error
|
|
if result.exit_code == 2 and not result.stdout.strip():
|
|
error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
|
|
return SearchResult(error=f"Search failed: {error_msg}", total_count=0)
|
|
|
|
# Parse results based on output mode
|
|
if output_mode == "files_only":
|
|
all_files = [f for f in result.stdout.strip().split('\n') if f]
|
|
total = len(all_files)
|
|
page = all_files[offset:offset + limit]
|
|
return SearchResult(files=page, total_count=total)
|
|
|
|
elif output_mode == "count":
|
|
counts = {}
|
|
for line in result.stdout.strip().split('\n'):
|
|
if ':' in line:
|
|
parts = line.rsplit(':', 1)
|
|
if len(parts) == 2:
|
|
try:
|
|
counts[parts[0]] = int(parts[1])
|
|
except ValueError:
|
|
pass
|
|
return SearchResult(counts=counts, total_count=sum(counts.values()))
|
|
|
|
else:
|
|
# Parse content matches and context lines.
|
|
# rg match lines: "file:lineno:content" (colon separator)
|
|
# rg context lines: "file-lineno-content" (dash separator)
|
|
# rg group seps: "--"
|
|
# Note: on Windows, paths contain drive letters (e.g. C:\path),
|
|
# so naive split(":") breaks. Use regex to handle both platforms.
|
|
_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
|
|
matches = []
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line or line == "--":
|
|
continue
|
|
|
|
# Try match line first (colon-separated: file:line:content)
|
|
m = _match_re.match(line)
|
|
if m:
|
|
matches.append(SearchMatch(
|
|
path=(m.group(1) or '') + m.group(2),
|
|
line_number=int(m.group(3)),
|
|
content=m.group(4)[:500]
|
|
))
|
|
continue
|
|
|
|
# Try context line (dash-separated: file-line-content)
|
|
# Only attempt if context was requested to avoid false positives
|
|
if context > 0:
|
|
parsed = _parse_search_context_line(line)
|
|
if parsed:
|
|
matches.append(SearchMatch(
|
|
path=parsed[0],
|
|
line_number=parsed[1],
|
|
content=parsed[2][:500]
|
|
))
|
|
|
|
total = len(matches)
|
|
page = matches[offset:offset + limit]
|
|
return SearchResult(
|
|
matches=page,
|
|
total_count=total,
|
|
truncated=total > offset + limit
|
|
)
|
|
|
|
def _search_with_grep(self, pattern: str, path: str, file_glob: Optional[str],
|
|
limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
|
|
"""Fallback search using grep."""
|
|
cmd_parts = ["grep", "-rnH"] # -H forces filename even for single-file searches
|
|
|
|
# Exclude hidden directories (matching ripgrep's default behavior).
|
|
# This prevents searching inside .hub/index-cache/, .git/, etc.
|
|
cmd_parts.append("--exclude-dir='.*'")
|
|
|
|
# Add context if requested
|
|
if context > 0:
|
|
cmd_parts.extend(["-C", str(context)])
|
|
|
|
# Add file pattern filter (must be quoted to prevent shell expansion)
|
|
if file_glob:
|
|
cmd_parts.extend(["--include", self._escape_shell_arg(file_glob)])
|
|
|
|
# Output mode handling
|
|
if output_mode == "files_only":
|
|
cmd_parts.append("-l")
|
|
elif output_mode == "count":
|
|
cmd_parts.append("-c")
|
|
|
|
# Add pattern and path
|
|
cmd_parts.append(self._escape_shell_arg(pattern))
|
|
cmd_parts.append(self._escape_shell_arg(path))
|
|
|
|
# Fetch generously so we can compute total before slicing
|
|
fetch_limit = limit + offset + (200 if context > 0 else 0)
|
|
cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])
|
|
|
|
cmd = " ".join(cmd_parts)
|
|
result = self._exec(cmd, timeout=60)
|
|
|
|
# grep exit codes: 0=matches found, 1=no matches, 2=error
|
|
if result.exit_code == 2 and not result.stdout.strip():
|
|
error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
|
|
return SearchResult(error=f"Search failed: {error_msg}", total_count=0)
|
|
|
|
if output_mode == "files_only":
|
|
all_files = [f for f in result.stdout.strip().split('\n') if f]
|
|
total = len(all_files)
|
|
page = all_files[offset:offset + limit]
|
|
return SearchResult(files=page, total_count=total)
|
|
|
|
elif output_mode == "count":
|
|
counts = {}
|
|
for line in result.stdout.strip().split('\n'):
|
|
if ':' in line:
|
|
parts = line.rsplit(':', 1)
|
|
if len(parts) == 2:
|
|
try:
|
|
counts[parts[0]] = int(parts[1])
|
|
except ValueError:
|
|
pass
|
|
return SearchResult(counts=counts, total_count=sum(counts.values()))
|
|
|
|
else:
|
|
# grep match lines: "file:lineno:content" (colon)
|
|
# grep context lines: "file-lineno-content" (dash)
|
|
# grep group seps: "--"
|
|
# Note: on Windows, paths contain drive letters (e.g. C:\path),
|
|
# so naive split(":") breaks. Use regex to handle both platforms.
|
|
_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
|
|
matches = []
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line or line == "--":
|
|
continue
|
|
|
|
m = _match_re.match(line)
|
|
if m:
|
|
matches.append(SearchMatch(
|
|
path=(m.group(1) or '') + m.group(2),
|
|
line_number=int(m.group(3)),
|
|
content=m.group(4)[:500]
|
|
))
|
|
continue
|
|
|
|
if context > 0:
|
|
parsed = _parse_search_context_line(line)
|
|
if parsed:
|
|
matches.append(SearchMatch(
|
|
path=parsed[0],
|
|
line_number=parsed[1],
|
|
content=parsed[2][:500]
|
|
))
|
|
|
|
|
|
total = len(matches)
|
|
page = matches[offset:offset + limit]
|
|
return SearchResult(
|
|
matches=page,
|
|
total_count=total,
|
|
truncated=total > offset + limit
|
|
)
|