fix(gateway): diagnosable MEDIA rejections + canonical cache roots + null-path guard

Operators can now see which MEDIA path was dropped and why, generated
artifacts under the canonical ~/.hermes/cache/{images,...} layout deliver,
and a crafted ~\x00 path no longer aborts the whole attachment batch.

- MEDIA_DELIVERY_SAFE_ROOTS: add canonical cache/{images,audio,videos,
  documents,screenshots} alongside the legacy *_cache dirs (#31733).
- filter_media/local_delivery_paths: log the rejected path (was a blind
  "outside allowed roots") via _log_safe_path, which strips control chars
  and Unicode line separators so a model-emitted path can't forge a log line.
- validate_media_delivery_path + extract_media: guard os.path.expanduser
  so a ~\x00 path returns None / is skipped instead of raising and dropping
  every other attachment in the response.

Salvaged and slimmed from #33251 (780 LOC -> 35): the reason-tag taxonomy,
the parts-eliding redactor, and the extension-partition hoist are dropped in
favor of logging the path directly. All three findings were verified and
reproduced by the contributor.

Co-authored-by: wysie <wysie@users.noreply.github.com>
This commit is contained in:
Teknium
2026-05-29 01:10:48 -07:00
parent 2765b02021
commit e28a668b40
2 changed files with 85 additions and 6 deletions

View File

@ -848,6 +848,13 @@ MEDIA_DELIVERY_SAFE_ROOTS = (
_HERMES_HOME / "video_cache",
_HERMES_HOME / "document_cache",
_HERMES_HOME / "browser_screenshots",
# Canonical cache layout — listed alongside the legacy *_cache dirs so
# generated artifacts deliver on installs that have both (#31733).
_HERMES_HOME / "cache" / "images",
_HERMES_HOME / "cache" / "audio",
_HERMES_HOME / "cache" / "videos",
_HERMES_HOME / "cache" / "documents",
_HERMES_HOME / "cache" / "screenshots",
)
# Default recency window for trusting freshly-produced files (seconds).
@ -1022,7 +1029,11 @@ def validate_media_delivery_path(path: str) -> Optional[str]:
if not candidate:
return None
expanded = Path(os.path.expanduser(candidate))
try:
expanded = Path(os.path.expanduser(candidate))
except (OSError, RuntimeError, ValueError):
# expanduser raises ValueError("embedded null byte") for a ~\x00 path.
return None
if not expanded.is_absolute():
return None
@ -1066,6 +1077,17 @@ def validate_media_delivery_path(path: str) -> Optional[str]:
return None
# Neutralise control chars and the Unicode line separators (NEL, LS, PS) that
# str.splitlines() / log aggregators treat as breaks, so a model-emitted path
# can't forge a second log line. Truncated to keep records bounded.
_LOG_UNSAFE_CHARS = re.compile(r"[\x00-\x1f\x7f\x85\u2028\u2029]")
def _log_safe_path(path: str) -> str:
"""Return a single-line, length-bounded path for log output."""
return _LOG_UNSAFE_CHARS.sub("?", str(path))[:200]
SUPPORTED_DOCUMENT_TYPES = {
".pdf": "application/pdf",
".md": "text/markdown",
@ -2438,11 +2460,12 @@ class BasePlatformAdapter(ABC):
"""Drop unsafe MEDIA paths and normalize accepted paths."""
safe_media: List[Tuple[str, bool]] = []
for media_path, is_voice in media_files or []:
safe_path = validate_media_delivery_path(str(media_path))
raw = str(media_path)
safe_path = validate_media_delivery_path(raw)
if safe_path:
safe_media.append((safe_path, bool(is_voice)))
else:
logger.warning("Skipping unsafe MEDIA directive path outside allowed roots")
logger.warning("Skipping unsafe MEDIA directive path: %s", _log_safe_path(raw))
return safe_media
@staticmethod
@ -2450,11 +2473,12 @@ class BasePlatformAdapter(ABC):
"""Drop unsafe bare local file paths and normalize accepted paths."""
safe_paths: List[str] = []
for file_path in file_paths or []:
safe_path = validate_media_delivery_path(str(file_path))
raw = str(file_path)
safe_path = validate_media_delivery_path(raw)
if safe_path:
safe_paths.append(safe_path)
else:
logger.warning("Skipping unsafe local file path outside allowed roots")
logger.warning("Skipping unsafe local file path: %s", _log_safe_path(raw))
return safe_paths
@staticmethod
@ -2505,7 +2529,12 @@ class BasePlatformAdapter(ABC):
path = path[1:-1].strip()
path = path.lstrip("`\"'").rstrip("`\"',.;:)}]")
if path:
media.append((os.path.expanduser(path), has_voice_tag))
try:
media.append((os.path.expanduser(path), has_voice_tag))
except (OSError, RuntimeError, ValueError):
# Skip a crafted ~\x00 path rather than aborting extraction
# and dropping every other attachment in the response.
continue
# Remove MEDIA tags from content (including surrounding quote/backtick wrappers)
if media:

View File

@ -12,6 +12,7 @@ from gateway.platforms.base import (
MessageEvent,
safe_url_for_log,
utf16_len,
_log_safe_path,
_prefix_within_utf16_limit,
)
@ -1050,3 +1051,52 @@ class TestProxyKwargsForAiohttp:
sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
assert sess_kw == {}
assert req_kw == {"proxy": "http://proxy:8080"}
class TestMediaDeliveryDiagnosability:
"""Diagnosable rejection logging + crafted-path robustness (#33251)."""
def test_rejected_path_appears_in_log(self, tmp_path, caplog):
outside = tmp_path / "outside.ogg"
outside.write_bytes(b"OggS")
with patch.dict(os.environ, {"HERMES_MEDIA_DELIVERY_STRICT": "1",
"HERMES_MEDIA_TRUST_RECENT_FILES": "0"}), \
patch("gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS", ()):
with caplog.at_level("WARNING"):
out = BasePlatformAdapter.filter_media_delivery_paths([(str(outside), False)])
assert out == []
# The dropped path must be in the log so operators can diagnose it.
assert str(outside) in caplog.text
def test_crafted_null_path_does_not_abort_batch(self, tmp_path, monkeypatch):
"""One crafted ~\\x00 path must not drop every other attachment."""
good = tmp_path / "good.png"
good.write_bytes(b"\x89PNG")
monkeypatch.setenv("HERMES_MEDIA_DELIVERY_STRICT", "0")
out = BasePlatformAdapter.filter_media_delivery_paths([
("~\x00evil.png", False),
(str(good), False),
])
assert out == [(str(good.resolve()), False)]
def test_extract_media_tolerates_crafted_null_path(self):
"""extract_media must not raise on a crafted ~\\x00 MEDIA tag."""
content = "here\nMEDIA:`~\x00evil.png`\ntrailing"
# Must not raise ValueError("embedded null byte").
media, cleaned = BasePlatformAdapter.extract_media(content)
assert all("\x00" not in p for p, _ in media)
def test_log_safe_path_neutralises_line_breaks(self):
forged = "/tmp/a.png\nWARNING forged second line"
assert "\n" not in _log_safe_path(forged)
# Unicode separators that split log lines are also neutralised.
for sep in ("\u2028", "\u2029", "\x85"):
assert sep not in _log_safe_path(f"/tmp/a{sep}b.png")
def test_canonical_cache_roots_present(self):
from gateway.platforms.base import MEDIA_DELIVERY_SAFE_ROOTS
roots = {str(r) for r in MEDIA_DELIVERY_SAFE_ROOTS}
assert any(r.endswith("cache/images") for r in roots)
assert any(r.endswith("cache/documents") for r in roots)
# Legacy layout still present.
assert any(r.endswith("image_cache") for r in roots)