# Regression tests for the kanban DB: the in-place migration of legacy
# TEXT-primary-key tables to INTEGER AUTOINCREMENT (#35096) and concurrent
# first-time initialization through ``kb.connect``.
from __future__ import annotations

import sqlite3
import threading
from contextlib import closing
from pathlib import Path

from hermes_cli import kanban_db as kb


def _make_legacy_db(path: Path) -> None:
    """Write a kanban DB with the pre-AUTOINCREMENT (TEXT PK) schema.

    Only the four tables #35096 affects are replaced with their legacy
    definitions; every other table is kept current so the additive-column
    migration runs cleanly on top.  One task with a comment, two events, one
    run and one notify subscription (cursor ``'e-1'``) are seeded so the tests
    can check what survives the rebuild.

    Args:
        path: Location of the SQLite file to create.
    """
    # ``closing`` releases the handle even when a statement below raises;
    # the original straight-line ``conn.close()`` was skipped on failure.
    with closing(sqlite3.connect(str(path))) as conn:
        conn.executescript(kb.SCHEMA_SQL)
        conn.executescript(
            """
            DROP TABLE task_events;
            DROP TABLE task_comments;
            DROP TABLE task_runs;
            DROP TABLE kanban_notify_subs;
            CREATE TABLE task_comments (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
                author TEXT NOT NULL, body TEXT NOT NULL, created_at INTEGER NOT NULL);
            CREATE TABLE task_events (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
                kind TEXT NOT NULL, payload TEXT, created_at INTEGER NOT NULL);
            CREATE TABLE task_runs (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
                profile TEXT, status TEXT NOT NULL, started_at INTEGER NOT NULL);
            CREATE TABLE kanban_notify_subs (task_id TEXT NOT NULL, platform TEXT NOT NULL,
                chat_id TEXT NOT NULL, thread_id TEXT NOT NULL DEFAULT '', user_id TEXT,
                created_at INTEGER NOT NULL, last_event_id TEXT,
                PRIMARY KEY (task_id, platform, chat_id, thread_id));
            """
        )
        conn.execute(
            "INSERT INTO tasks (id, title, status, created_at) VALUES ('task-1', 'T', 'done', 1000)"
        )
        conn.execute("INSERT INTO task_comments VALUES ('c-1', 'task-1', 'agent', 'hi', 1500)")
        conn.execute("INSERT INTO task_events VALUES ('e-1', 'task-1', 'completed', NULL, 2000)")
        conn.execute("INSERT INTO task_events VALUES ('e-2', 'task-1', 'blocked', NULL, 2100)")
        conn.execute("INSERT INTO task_runs VALUES ('r-1', 'task-1', 'default', 'done', 1000)")
        conn.execute(
            "INSERT INTO kanban_notify_subs (task_id, platform, chat_id, created_at, last_event_id) "
            "VALUES ('task-1', 'telegram', '123', 1000, 'e-1')"
        )
        conn.commit()


def _isolate_home(tmp_path: Path, monkeypatch) -> None:
    """Redirect ``HERMES_HOME`` and ``Path.home()`` into ``tmp_path``.

    Creates ``tmp_path/.hermes`` and patches the environment so board paths
    resolved afterwards land inside the test's temporary directory.
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)


def _forget_initialized(db_path: Path) -> None:
    """Remove ``db_path`` from ``kb._INITIALIZED_PATHS``.

    NOTE(review): presumably this makes the next ``kb.connect`` re-run schema
    initialization/migration for the path -- confirm against ``kanban_db``.
    """
    kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))


def _prepare_board_path(board: str) -> Path:
    """Resolve the DB path for ``board``, create its parent dir, reset its cache entry."""
    db_path = kb.kanban_db_path(board=board)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    _forget_initialized(db_path)
    return db_path


def _setup_home(tmp_path, monkeypatch) -> Path:
    """Isolate the home directory and return a ready-to-use ``legacy`` board path.

    Args:
        tmp_path: pytest's per-test temporary directory.
        monkeypatch: pytest's ``monkeypatch`` fixture.

    Returns:
        Path of the (not yet created) ``legacy`` board database.
    """
    _isolate_home(tmp_path, monkeypatch)
    return _prepare_board_path("legacy")


def _table_struct(conn: sqlite3.Connection, table: str):
    """Return a comparable structural summary of ``table``.

    Returns:
        ``(columns, indexes)`` where ``columns`` is a list of
        ``(name, upper-cased type, notnull, pk)`` tuples in ``PRAGMA
        table_info`` order and ``indexes`` is the sorted list of index names,
        excluding the automatic ``sqlite_*`` ones.
    """
    columns = []
    for row in conn.execute(f"PRAGMA table_info({table})"):
        declared_type = (row["type"] or "").upper()
        columns.append((row["name"], declared_type, row["notnull"], row["pk"]))

    index_names = sorted(
        row["name"]
        for row in conn.execute(f"PRAGMA index_list({table})")
        if not row["name"].startswith("sqlite_")
    )
    return columns, index_names


def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch):
    """Eight threads racing through first-time ``kb.connect`` must all succeed.

    Afterwards the ``tasks`` table must expose the ``max_retries`` column.
    """
    _isolate_home(tmp_path, monkeypatch)
    # Deliberately no parent mkdir here, matching the original setup.
    db_path = kb.kanban_db_path(board="default")
    _forget_initialized(db_path)

    worker_count = 8
    errors: list[BaseException] = []
    barrier = threading.Barrier(worker_count)

    def worker() -> None:
        try:
            # Line every worker up so the connects start together.
            barrier.wait(timeout=5)
            conn = kb.connect(board="default")
            conn.close()
        except BaseException as exc:  # pragma: no cover - surfaced below
            errors.append(exc)

    # Daemon threads: a worker stuck in ``connect`` must not keep the
    # interpreter alive after the test has already reported the failure.
    threads = [
        threading.Thread(target=worker, daemon=True) for _ in range(worker_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join(timeout=10)

    # ``join(timeout=...)`` returns silently on timeout, so a deadlocked
    # worker would otherwise leave ``errors`` empty and the test green.
    hung = [thread.name for thread in threads if thread.is_alive()]
    assert hung == [], f"workers still running after join timeout: {hung}"
    assert errors == []

    with kb.connect(board="default") as conn:
        cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
    assert "max_retries" in cols


def test_legacy_text_pk_tables_rebuilt_to_integer_autoincrement(tmp_path, monkeypatch):
    """A pre-AUTOINCREMENT DB is migrated in place: id columns become INTEGER
    PKs, ``last_event_id`` becomes INTEGER, data is preserved, and indexes are
    recreated (DROP TABLE would otherwise take them down)."""
    db_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(db_path)

    with kb.connect(db_path) as conn:
        for table in ("task_events", "task_comments", "task_runs"):
            info = {r["name"]: r for r in conn.execute(f"PRAGMA table_info({table})")}
            id_col = info["id"]
            assert id_col["type"].upper() == "INTEGER", table
            assert id_col["pk"] == 1, table

        lei = {r["name"]: r for r in conn.execute("PRAGMA table_info(kanban_notify_subs)")}
        assert lei["last_event_id"]["type"].upper() == "INTEGER"

        # Data preserved across the rebuild.
        assert len(conn.execute("SELECT * FROM task_events").fetchall()) == 2
        assert conn.execute("SELECT body FROM task_comments").fetchone()["body"] == "hi"
        assert len(conn.execute("SELECT * FROM task_runs").fetchall()) == 1
        # Non-numeric legacy cursor ("e-1") casts to 0.
        sub = conn.execute("SELECT last_event_id FROM kanban_notify_subs").fetchone()
        assert sub["last_event_id"] == 0

        # Indexes restored, including idx_events_run (added by the additive pass).
        indexes = {
            r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='index'")
        }
        expected_indexes = {
            "idx_events_task",
            "idx_events_run",
            "idx_comments_task",
            "idx_runs_task",
            "idx_runs_status",
            "idx_notify_task",
        }
        missing = expected_indexes - indexes
        assert not missing, f"indexes missing after rebuild: {sorted(missing)}"

        # AUTOINCREMENT actually works after the rebuild.
        conn.execute(
            "INSERT INTO task_events (task_id, kind, created_at) VALUES ('task-1', 'completed', 3000)"
        )
        new_id = conn.execute(
            "SELECT id FROM task_events ORDER BY id DESC LIMIT 1"
        ).fetchone()["id"]
        assert isinstance(new_id, int)
        assert new_id >= 1


def test_rebuilt_schema_matches_fresh_db(tmp_path, monkeypatch):
    """The rebuilt tables must be structurally identical to a fresh DB, so the
    hand-written DDL in ``_REBUILD_SPECS`` can't silently drift from SCHEMA_SQL."""
    legacy_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(legacy_path)
    fresh_path = _prepare_board_path("fresh")

    with kb.connect(legacy_path) as migrated, kb.connect(fresh_path) as fresh:
        for table in ("task_events", "task_comments", "task_runs", "kanban_notify_subs"):
            assert _table_struct(migrated, table) == _table_struct(fresh, table), table


def test_migration_is_idempotent(tmp_path, monkeypatch):
    """Re-opening an already-migrated DB is a no-op and leaves data intact."""
    db_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(db_path)

    # First open performs the migration.
    with kb.connect(db_path):
        pass

    # Clear the cache entry so the second open goes through initialization again.
    _forget_initialized(db_path)
    with kb.connect(db_path) as conn:
        info = {r["name"]: r for r in conn.execute("PRAGMA table_info(task_events)")}
        assert info["id"]["type"].upper() == "INTEGER"
        assert len(conn.execute("SELECT * FROM task_events").fetchall()) == 2


def test_unseen_events_for_sub_survives_migrated_db(tmp_path, monkeypatch):
    """The crash that motivated #35096 — ``int(None)`` on a NULL cursor — is
    gone after migration; the notifier query returns an integer cursor."""
    db_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(db_path)

    with kb.connect(db_path) as conn:
        cursor, events = kb.unseen_events_for_sub(
            conn, task_id="task-1", platform="telegram", chat_id="123"
        )
        assert isinstance(cursor, int)
        assert isinstance(events, list)