diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index 2ad341fa1..9a93ba4a4 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -350,10 +350,48 @@ def compress_context( _lock_db = getattr(agent, "_session_db", None) _lock_sid = agent.session_id or "" _lock_holder: Optional[str] = None + # Probe whether the lock subsystem is actually available on this + # SessionDB instance. A process running mismatched module versions + # (e.g. ``conversation_compression.py`` reloaded after a pull but the + # long-lived ``hermes_state.SessionDB`` class still bound to the + # pre-#34351 version in memory) has the call site but not the method. + # In that case ``try_acquire_compression_lock`` raises AttributeError — + # NOT a ``sqlite3.Error`` — so the method's own fail-open guard never + # runs and the exception propagates to the outer agent loop, which + # prints the error and retries. Because compression never succeeds, + # the token count never drops and the loop re-triggers compaction + # forever (the "API call #47/#48/#49 ... has no attribute + # try_acquire_compression_lock" spin). Fail OPEN here: if the lock + # subsystem is missing or broken in any unexpected way, skip locking + # and proceed with compression. Skipping the lock risks a rare + # concurrent-compression session fork; an infinite no-progress loop + # that never compresses at all is strictly worse. if _lock_db is not None and _lock_sid: _lock_holder = _compression_lock_holder(agent) - if not _lock_db.try_acquire_compression_lock(_lock_sid, _lock_holder): - existing = _lock_db.get_compression_lock_holder(_lock_sid) + try: + _lock_acquired = _lock_db.try_acquire_compression_lock( + _lock_sid, _lock_holder + ) + except Exception as _lock_err: + # Broken/absent lock subsystem (version skew, etc.). Log once + # per session and proceed WITHOUT the lock rather than letting + # the exception spin the outer loop. + _lock_holder = None # we don't own anything to release + if getattr(agent, "_last_compression_lock_error_sid", None) != _lock_sid: + agent._last_compression_lock_error_sid = _lock_sid + logger.warning( + "compression lock subsystem unavailable for session=%s " + "(%s: %s) — proceeding without lock. This usually means a " + "stale in-memory module after an update; restart the " + "process (or `hermes update`) to resync.", + _lock_sid, type(_lock_err).__name__, _lock_err, + ) + _lock_acquired = True # treat as acquired-but-unlocked; proceed + if not _lock_acquired: + try: + existing = _lock_db.get_compression_lock_holder(_lock_sid) + except Exception: + existing = None logger.warning( "compression skipped: another path is compressing session=%s " "(holder=%s) — returning messages unchanged to avoid session fork", diff --git a/tests/agent/test_compression_concurrent_fork.py b/tests/agent/test_compression_concurrent_fork.py index 5147f7146..76e8a4592 100644 --- a/tests/agent/test_compression_concurrent_fork.py +++ b/tests/agent/test_compression_concurrent_fork.py @@ -171,3 +171,70 @@ def test_skipped_compression_returns_messages_unchanged(tmp_path: Path) -> None: assert agent.session_id == parent_sid # Compressor was never called (the skip happens before .compress()) agent.context_compressor.compress.assert_not_called() + + +class _NoLockSubsystemDB: + """Wraps a real SessionDB but simulates a pre-#34351 version skew. + + A long-lived process can hold ``hermes_state.SessionDB`` bound to the + OLD class in memory (no compression-lock methods) while a lazily + re-imported ``conversation_compression.py`` calls the NEW lock code. + ``try_acquire_compression_lock`` then raises ``AttributeError`` — which + is NOT a ``sqlite3.Error``, so the method's own fail-open guard never + runs. Before the fix the exception propagated to the outer agent loop, + which printed the error and retried; compression never succeeded, the + token count never dropped, and the loop re-triggered compaction forever. + """ + + def __init__(self, real_db: SessionDB) -> None: + self._real = real_db + + def try_acquire_compression_lock(self, *_a, **_k): # noqa: D401 + raise AttributeError( + "'SessionDB' object has no attribute 'try_acquire_compression_lock'" + ) + + def get_compression_lock_holder(self, *_a, **_k): + raise AttributeError("'SessionDB' object has no attribute 'get_compression_lock_holder'") + + def release_compression_lock(self, *_a, **_k): + raise AttributeError("'SessionDB' object has no attribute 'release_compression_lock'") + + def __getattr__(self, name): + # Everything else (create_session, append, rotation helpers) goes to + # the real db so the post-lock compression + rotation path runs. + return getattr(self._real, name) + + +def test_missing_lock_subsystem_fails_open_not_infinite_loop(tmp_path: Path) -> None: + """Version skew (no lock methods) must fail OPEN, not raise into the loop. + + Reproduces the "API call #47/#48/#49 ... has no attribute + try_acquire_compression_lock" infinite-compaction spin: when the lock + subsystem is absent, ``_compress_context`` must skip locking and proceed + with compression (so the loop makes progress and terminates) instead of + letting the ``AttributeError`` escape to the retry loop. + """ + db = SessionDB(db_path=tmp_path / "state.db") + parent_sid = "SKEW_TEST_SESSION" + db.create_session(parent_sid, source="discord") + + agent = _build_agent_with_db(db, parent_sid) + # Swap in the lock-less wrapper AFTER construction (the agent already + # holds a normal db reference; we only break the lock methods). + agent._session_db = _NoLockSubsystemDB(db) + + messages = [{"role": "user", "content": f"m{i}"} for i in range(20)] + + # MUST NOT raise AttributeError. Before the fix this raised and the + # outer loop would retry forever. + compressed, _sp = agent._compress_context(messages, "sys", approx_tokens=120_000) + + # Compression actually ran (proceeded past the broken lock) and made + # progress, so the auto-compress loop would terminate. + agent.context_compressor.compress.assert_called_once() + assert len(compressed) < len(messages), ( + "Compression made no progress despite failing open — loop would still spin." + ) + # Session rotated (compression succeeded end-to-end). + assert agent.session_id != parent_sid