fix(session): survive missing FTS5 runtimes

This commit is contained in:
helix4u
2026-05-30 11:13:30 -06:00
committed by Teknium
parent ec67def5bf
commit 355af2c20f
2 changed files with 322 additions and 91 deletions

View File

@ -72,6 +72,15 @@ _last_init_error_lock = threading.Lock()
_wal_fallback_warned_paths: set[str] = set()
_wal_fallback_warned_lock = threading.Lock()
_FTS_TRIGGERS = (
"messages_fts_insert",
"messages_fts_delete",
"messages_fts_update",
"messages_fts_trigram_insert",
"messages_fts_trigram_delete",
"messages_fts_trigram_update",
)
def _set_last_init_error(msg: Optional[str]) -> None:
"""Record (or clear) the most recent state.db init failure.
@ -381,6 +390,7 @@ class SessionDB:
self._lock = threading.Lock()
self._write_count = 0
self._fts_enabled = False
self._fts_unavailable_warned = False
try:
self._conn = sqlite3.connect(
str(self.db_path),
@ -417,6 +427,111 @@ class SessionDB:
# ── Core write helper ──
@staticmethod
def _is_fts5_unavailable_error(exc: sqlite3.OperationalError) -> bool:
err = str(exc).lower()
return "no such module" in err and "fts5" in err
def _warn_fts5_unavailable(self, exc: sqlite3.OperationalError) -> None:
self._fts_enabled = False
if self._fts_unavailable_warned:
return
self._fts_unavailable_warned = True
logger.warning(
"SQLite FTS5 unavailable for %s; full-text session search "
"disabled. This usually means Hermes is running on an "
"unsupported install (e.g. a pip-installed or pip-managed "
"Python whose bundled SQLite lacks FTS5) rather than a "
"mainline install. Some features may be missing or behave "
"differently. Install the supported way: "
"https://hermes-agent.nousresearch.com (underlying error: %s)",
self.db_path,
exc,
)
def _sqlite_supports_fts5(self, cursor: sqlite3.Cursor) -> bool:
try:
cursor.execute("CREATE VIRTUAL TABLE temp._hermes_fts5_probe USING fts5(x)")
cursor.execute("DROP TABLE temp._hermes_fts5_probe")
return True
except sqlite3.OperationalError as exc:
if not self._is_fts5_unavailable_error(exc):
raise
self._warn_fts5_unavailable(exc)
return False
@staticmethod
def _drop_fts_triggers(cursor: sqlite3.Cursor) -> None:
for trigger in _FTS_TRIGGERS:
try:
cursor.execute(f"DROP TRIGGER IF EXISTS {trigger}")
except sqlite3.OperationalError:
pass
@staticmethod
def _fts_trigger_count(cursor: sqlite3.Cursor) -> int:
placeholders = ",".join("?" for _ in _FTS_TRIGGERS)
row = cursor.execute(
f"SELECT COUNT(*) FROM sqlite_master "
f"WHERE type = 'trigger' AND name IN ({placeholders})",
_FTS_TRIGGERS,
).fetchone()
return int(row[0] if not isinstance(row, sqlite3.Row) else row[0])
@staticmethod
def _rebuild_fts_indexes(cursor: sqlite3.Cursor) -> None:
for table_name in ("messages_fts", "messages_fts_trigram"):
cursor.execute(f"DELETE FROM {table_name}")
cursor.execute(
"INSERT INTO messages_fts(rowid, content) "
"SELECT id, "
"COALESCE(content, '') || ' ' || "
"COALESCE(tool_name, '') || ' ' || "
"COALESCE(tool_calls, '') "
"FROM messages"
)
cursor.execute(
"INSERT INTO messages_fts_trigram(rowid, content) "
"SELECT id, "
"COALESCE(content, '') || ' ' || "
"COALESCE(tool_name, '') || ' ' || "
"COALESCE(tool_calls, '') "
"FROM messages"
)
def _fts_table_probe(self, cursor: sqlite3.Cursor, table_name: str) -> Optional[bool]:
try:
cursor.execute(f"SELECT * FROM {table_name} LIMIT 0")
return True
except sqlite3.OperationalError as exc:
if self._is_fts5_unavailable_error(exc):
self._warn_fts5_unavailable(exc)
return None
if "no such table" in str(exc).lower():
return False
raise
def _ensure_fts_schema(
self,
cursor: sqlite3.Cursor,
table_name: str,
ddl: str,
) -> bool:
status = self._fts_table_probe(cursor, table_name)
if status is None:
return False
try:
# Run even when the virtual table exists so any dropped or missing
# triggers are recreated after a previous no-FTS5 runtime disabled
# them to keep message writes working.
cursor.executescript(ddl)
return True
except sqlite3.OperationalError as exc:
if not self._is_fts5_unavailable_error(exc):
raise
self._warn_fts5_unavailable(exc)
return False
def _execute_write(self, fn: Callable[[sqlite3.Connection], T]) -> T:
"""Execute a write transaction with BEGIN IMMEDIATE and jitter retry.
@ -629,6 +744,16 @@ class SessionDB:
except sqlite3.OperationalError as exc:
logger.debug("idx_messages_platform_msg_id create skipped: %s", exc)
fts5_available = self._sqlite_supports_fts5(cursor)
fts_migrations_complete = True
if not fts5_available:
# Existing FTS triggers can still fire on messages INSERT/UPDATE
# even though the current sqlite runtime cannot read the virtual
# tables they target. Drop only the triggers so core persistence
# continues; if a future runtime has FTS5, _ensure_fts_schema()
# recreates them.
self._drop_fts_triggers(cursor)
# ── Schema version bookkeeping ─────────────────────────────────
# Bump to current so future data migrations (if any) can gate on
# version. No version-gated column additions remain.
@ -650,17 +775,24 @@ class SessionDB:
# virtual table + triggers are created unconditionally via
# FTS_TRIGRAM_SQL below, but existing rows need a one-time
# backfill into the FTS index.
try:
cursor.execute("SELECT * FROM messages_fts_trigram LIMIT 0")
_fts_trigram_exists = True
except sqlite3.OperationalError:
_fts_trigram_exists = False
if not _fts_trigram_exists:
cursor.executescript(FTS_TRIGRAM_SQL)
cursor.execute(
"INSERT INTO messages_fts_trigram(rowid, content) "
"SELECT id, content FROM messages WHERE content IS NOT NULL"
if fts5_available:
_fts_trigram_exists = self._fts_table_probe(
cursor, "messages_fts_trigram"
)
if _fts_trigram_exists is False:
if self._ensure_fts_schema(
cursor, "messages_fts_trigram", FTS_TRIGRAM_SQL
):
cursor.execute(
"INSERT INTO messages_fts_trigram(rowid, content) "
"SELECT id, content FROM messages WHERE content IS NOT NULL"
)
else:
fts_migrations_complete = False
elif _fts_trigram_exists is None:
fts_migrations_complete = False
else:
fts_migrations_complete = False
if current_version < 11:
# v11: re-index FTS5 tables to cover tool_name + tool_calls and
# switch from external-content to inline mode. Existing DBs have
@ -668,45 +800,50 @@ class SessionDB:
# overwrite, so we drop them explicitly and let the post-migration
# existence checks (below) recreate them from FTS_SQL /
# FTS_TRIGRAM_SQL, then backfill every message row. Fixes #16751.
for _trig in (
"messages_fts_insert",
"messages_fts_delete",
"messages_fts_update",
"messages_fts_trigram_insert",
"messages_fts_trigram_delete",
"messages_fts_trigram_update",
):
try:
cursor.execute(f"DROP TRIGGER IF EXISTS {_trig}")
except sqlite3.OperationalError:
pass
for _tbl in ("messages_fts", "messages_fts_trigram"):
try:
cursor.execute(f"DROP TABLE IF EXISTS {_tbl}")
except sqlite3.OperationalError:
pass
# Recreate virtual tables + triggers with the new inline-mode
# schema that indexes content || tool_name || tool_calls.
cursor.executescript(FTS_SQL)
cursor.executescript(FTS_TRIGRAM_SQL)
# Backfill both indexes from every existing messages row.
cursor.execute(
"INSERT INTO messages_fts(rowid, content) "
"SELECT id, "
"COALESCE(content, '') || ' ' || "
"COALESCE(tool_name, '') || ' ' || "
"COALESCE(tool_calls, '') "
"FROM messages"
)
cursor.execute(
"INSERT INTO messages_fts_trigram(rowid, content) "
"SELECT id, "
"COALESCE(content, '') || ' ' || "
"COALESCE(tool_name, '') || ' ' || "
"COALESCE(tool_calls, '') "
"FROM messages"
)
if current_version < SCHEMA_VERSION:
if fts5_available:
self._drop_fts_triggers(cursor)
for _tbl in ("messages_fts", "messages_fts_trigram"):
try:
cursor.execute(f"DROP TABLE IF EXISTS {_tbl}")
except sqlite3.OperationalError as exc:
if not self._is_fts5_unavailable_error(exc):
raise
self._warn_fts5_unavailable(exc)
fts5_available = False
fts_migrations_complete = False
break
if fts5_available:
# Recreate virtual tables + triggers with the new inline-mode
# schema that indexes content || tool_name || tool_calls.
if (
self._ensure_fts_schema(cursor, "messages_fts", FTS_SQL)
and self._ensure_fts_schema(
cursor, "messages_fts_trigram", FTS_TRIGRAM_SQL
)
):
# Backfill both indexes from every existing messages row.
cursor.execute(
"INSERT INTO messages_fts(rowid, content) "
"SELECT id, "
"COALESCE(content, '') || ' ' || "
"COALESCE(tool_name, '') || ' ' || "
"COALESCE(tool_calls, '') "
"FROM messages"
)
cursor.execute(
"INSERT INTO messages_fts_trigram(rowid, content) "
"SELECT id, "
"COALESCE(content, '') || ' ' || "
"COALESCE(tool_name, '') || ' ' || "
"COALESCE(tool_calls, '') "
"FROM messages"
)
else:
fts_migrations_complete = False
else:
fts_migrations_complete = False
if current_version < SCHEMA_VERSION and fts_migrations_complete:
cursor.execute(
"UPDATE schema_version SET version = ?",
(SCHEMA_VERSION,),
@ -721,47 +858,22 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Index already exists
# FTS5 setup (separate because CREATE VIRTUAL TABLE can't be in executescript with IF NOT EXISTS reliably)
try:
cursor.execute("SELECT * FROM messages_fts LIMIT 0")
self._fts_enabled = True
except sqlite3.OperationalError as exc:
if "no such table" not in str(exc).lower():
raise
try:
cursor.executescript(FTS_SQL)
self._fts_enabled = True
except sqlite3.OperationalError as fts_exc:
err = str(fts_exc).lower()
if "fts5" not in err and "no such module" not in err:
raise
logger.warning(
"SQLite FTS5 unavailable for %s; full-text session search "
"disabled. This usually means Hermes is running on an "
"unsupported install (e.g. a pip-installed or pip-managed "
"Python whose bundled SQLite lacks FTS5) rather than a "
"mainline install. Some features may be missing or behave "
"differently. Install the supported way: "
"https://hermes-agent.nousresearch.com (underlying error: %s)",
self.db_path,
fts_exc,
)
if fts5_available:
# FTS5 setup. Run the DDL even when the virtual table exists so
# CREATE TRIGGER IF NOT EXISTS repairs trigger-only degradation from
# an earlier no-FTS5 runtime.
triggers_need_repair = self._fts_trigger_count(cursor) < len(_FTS_TRIGGERS)
self._fts_enabled = self._ensure_fts_schema(cursor, "messages_fts", FTS_SQL)
# Trigram FTS5 for CJK/substring search
try:
cursor.execute("SELECT * FROM messages_fts_trigram LIMIT 0")
except sqlite3.OperationalError as exc:
if "no such table" not in str(exc).lower():
raise
try:
cursor.executescript(FTS_TRIGRAM_SQL)
except sqlite3.OperationalError as fts_exc:
err = str(fts_exc).lower()
if "fts5" not in err and "no such module" not in err:
raise
# Same FTS5-unavailable cause already warned about above for
# messages_fts; the trigram table is an additional CJK index,
# so just degrade silently here. CJK search falls back to LIKE.
# Trigram FTS5 for CJK/substring search. This is optional relative
# to the main FTS table; if it cannot be created, CJK search falls
# back to LIKE.
if self._fts_enabled:
trigram_enabled = self._ensure_fts_schema(
cursor, "messages_fts_trigram", FTS_TRIGRAM_SQL
)
if trigram_enabled and triggers_need_repair:
self._rebuild_fts_indexes(cursor)
self._conn.commit()
@ -3560,4 +3672,3 @@ class SessionDB:
(error[:500], session_id),
)
self._execute_write(_do)

View File

@ -4,7 +4,7 @@ import sqlite3
import time
import pytest
from hermes_state import SessionDB
from hermes_state import SCHEMA_SQL, SessionDB
class _NoFtsCursor(sqlite3.Cursor):
@ -12,6 +12,8 @@ class _NoFtsCursor(sqlite3.Cursor):
def execute(self, sql, parameters=()):
probe = sql.strip()
if "USING fts5" in probe:
raise sqlite3.OperationalError("no such module: fts5")
if probe in (
"SELECT * FROM messages_fts LIMIT 0",
"SELECT * FROM messages_fts_trigram LIMIT 0",
@ -30,6 +32,24 @@ class _NoFtsConnection(sqlite3.Connection):
return super().cursor(factory or _NoFtsCursor)
class _NoFtsExistingTableCursor(_NoFtsCursor):
"""Simulate existing FTS virtual tables under a runtime without FTS5."""
def execute(self, sql, parameters=()):
probe = sql.strip()
if probe in (
"SELECT * FROM messages_fts LIMIT 0",
"SELECT * FROM messages_fts_trigram LIMIT 0",
):
raise sqlite3.OperationalError("no such module: fts5")
return super().execute(sql, parameters)
class _NoFtsExistingTableConnection(sqlite3.Connection):
def cursor(self, factory=None):
return super().cursor(factory or _NoFtsExistingTableCursor)
@pytest.fixture()
def db(tmp_path):
"""Create a SessionDB with a temp database file."""
@ -210,6 +230,106 @@ class TestSessionLifecycle:
finally:
db.close()
def test_existing_fts_tables_do_not_break_without_fts5(
self, tmp_path, monkeypatch
):
db_path = tmp_path / "state.db"
seeded = SessionDB(db_path=db_path)
try:
seeded.create_session(session_id="s1", source="cli")
seeded.append_message("s1", role="user", content="before runtime change")
finally:
seeded.close()
real_connect = sqlite3.connect
def connect_without_fts(*args, **kwargs):
kwargs["factory"] = _NoFtsExistingTableConnection
return real_connect(*args, **kwargs)
monkeypatch.setattr("hermes_state.sqlite3.connect", connect_without_fts)
db = SessionDB(db_path=db_path)
try:
assert db._fts_enabled is False
assert db.get_session("s1") is not None
assert len(db.get_messages("s1")) == 1
# Existing FTS triggers must be disabled too; otherwise this write
# would try to insert into an unusable FTS virtual table.
db.append_message("s1", role="assistant", content="after runtime change")
messages = db.get_messages("s1")
assert len(messages) == 2
assert messages[1]["content"] == "after runtime change"
finally:
db.close()
def test_old_schema_without_fts5_does_not_crash(self, tmp_path, monkeypatch):
db_path = tmp_path / "legacy.db"
conn = sqlite3.connect(str(db_path))
conn.executescript(SCHEMA_SQL)
conn.execute("DELETE FROM schema_version")
conn.execute("INSERT INTO schema_version (version) VALUES (?)", (9,))
conn.commit()
conn.close()
real_connect = sqlite3.connect
def connect_without_fts(*args, **kwargs):
kwargs["factory"] = _NoFtsConnection
return real_connect(*args, **kwargs)
monkeypatch.setattr("hermes_state.sqlite3.connect", connect_without_fts)
db = SessionDB(db_path=db_path)
try:
assert db._fts_enabled is False
db.create_session(session_id="s1", source="cli")
db.append_message("s1", role="user", content="legacy no fts")
assert db.get_messages("s1")[0]["content"] == "legacy no fts"
assert db.search_messages("legacy") == []
# Leave the FTS migration version in place so a future FTS-capable
# runtime can still rebuild and backfill the indexes.
row = db._conn.execute("SELECT version FROM schema_version").fetchone()
assert row["version"] == 9
finally:
db.close()
def test_fts_runtime_restores_triggers_after_no_fts_open(
self, tmp_path, monkeypatch
):
db_path = tmp_path / "state.db"
seeded = SessionDB(db_path=db_path)
try:
seeded.create_session(session_id="s1", source="cli")
seeded.append_message("s1", role="user", content="first searchable")
finally:
seeded.close()
real_connect = sqlite3.connect
def connect_without_fts(*args, **kwargs):
kwargs["factory"] = _NoFtsExistingTableConnection
return real_connect(*args, **kwargs)
monkeypatch.setattr("hermes_state.sqlite3.connect", connect_without_fts)
no_fts = SessionDB(db_path=db_path)
try:
no_fts.append_message("s1", role="assistant", content="not indexed yet")
finally:
no_fts.close()
monkeypatch.setattr("hermes_state.sqlite3.connect", real_connect)
restored = SessionDB(db_path=db_path)
try:
assert restored._fts_enabled is True
restored.append_message("s1", role="assistant", content="indexed again")
assert len(restored.search_messages("not indexed yet")) == 1
assert len(restored.search_messages("indexed")) == 2
finally:
restored.close()
# =========================================================================
# Message storage