Legacy kanban boards (pre-AUTOINCREMENT schema) crashed the gateway notifier on every tick — int(None) on a NULL id in unseen_events_for_sub — silently losing all kanban notifications. CREATE TABLE IF NOT EXISTS skips existing tables regardless of schema and _add_column_if_missing only adds columns, so neither could fix a drifted primary-key type. _rebuild_drifted_tables() detects the legacy shape via PRAGMA table_info and rebuilds task_events/task_comments/task_runs (TEXT PK -> INTEGER AUTOINCREMENT) and kanban_notify_subs.last_event_id (TEXT/NULL -> INTEGER NOT NULL DEFAULT 0), preserving data. The whole pass is one transaction so an interruption can't leave a table half-renamed, and recreates every index DROP TABLE would otherwise take down (including idx_events_run). Co-authored-by: liuhao1024 <liuhao1024@users.noreply.github.com>
178 lines
7.6 KiB
Python
178 lines
7.6 KiB
Python
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from hermes_cli import kanban_db as kb
|
|
|
|
|
|
def _make_legacy_db(path: Path) -> None:
|
|
"""Write a kanban DB with the pre-AUTOINCREMENT (TEXT PK) schema for the
|
|
four tables #35096 affects, keeping every other table current so the
|
|
additive-column migration runs cleanly on top.
|
|
"""
|
|
conn = sqlite3.connect(str(path))
|
|
conn.executescript(kb.SCHEMA_SQL)
|
|
conn.executescript(
|
|
"""
|
|
DROP TABLE task_events;
|
|
DROP TABLE task_comments;
|
|
DROP TABLE task_runs;
|
|
DROP TABLE kanban_notify_subs;
|
|
CREATE TABLE task_comments (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
|
|
author TEXT NOT NULL, body TEXT NOT NULL, created_at INTEGER NOT NULL);
|
|
CREATE TABLE task_events (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
|
|
kind TEXT NOT NULL, payload TEXT, created_at INTEGER NOT NULL);
|
|
CREATE TABLE task_runs (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
|
|
profile TEXT, status TEXT NOT NULL, started_at INTEGER NOT NULL);
|
|
CREATE TABLE kanban_notify_subs (task_id TEXT NOT NULL, platform TEXT NOT NULL,
|
|
chat_id TEXT NOT NULL, thread_id TEXT NOT NULL DEFAULT '', user_id TEXT,
|
|
created_at INTEGER NOT NULL, last_event_id TEXT,
|
|
PRIMARY KEY (task_id, platform, chat_id, thread_id));
|
|
"""
|
|
)
|
|
conn.execute("INSERT INTO tasks (id, title, status, created_at) VALUES ('task-1', 'T', 'done', 1000)")
|
|
conn.execute("INSERT INTO task_comments VALUES ('c-1', 'task-1', 'agent', 'hi', 1500)")
|
|
conn.execute("INSERT INTO task_events VALUES ('e-1', 'task-1', 'completed', NULL, 2000)")
|
|
conn.execute("INSERT INTO task_events VALUES ('e-2', 'task-1', 'blocked', NULL, 2100)")
|
|
conn.execute("INSERT INTO task_runs VALUES ('r-1', 'task-1', 'default', 'done', 1000)")
|
|
conn.execute(
|
|
"INSERT INTO kanban_notify_subs (task_id, platform, chat_id, created_at, last_event_id) "
|
|
"VALUES ('task-1', 'telegram', '123', 1000, 'e-1')"
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _setup_home(tmp_path, monkeypatch) -> Path:
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
db_path = kb.kanban_db_path(board="legacy")
|
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
|
return db_path
|
|
|
|
|
|
def _table_struct(conn: sqlite3.Connection, table: str):
|
|
cols = [
|
|
(r["name"], (r["type"] or "").upper(), r["notnull"], r["pk"])
|
|
for r in conn.execute(f"PRAGMA table_info({table})")
|
|
]
|
|
idx = sorted(
|
|
r["name"]
|
|
for r in conn.execute(f"PRAGMA index_list({table})")
|
|
if not r["name"].startswith("sqlite_")
|
|
)
|
|
return cols, idx
|
|
|
|
|
|
def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch):
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
|
|
db_path = kb.kanban_db_path(board="default")
|
|
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
|
|
|
errors: list[BaseException] = []
|
|
barrier = threading.Barrier(8)
|
|
|
|
def worker() -> None:
|
|
try:
|
|
barrier.wait(timeout=5)
|
|
conn = kb.connect(board="default")
|
|
conn.close()
|
|
except BaseException as exc: # pragma: no cover - surfaced below
|
|
errors.append(exc)
|
|
|
|
threads = [threading.Thread(target=worker) for _ in range(8)]
|
|
for thread in threads:
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join(timeout=10)
|
|
|
|
assert errors == []
|
|
with kb.connect(board="default") as conn:
|
|
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
|
|
assert "max_retries" in cols
|
|
|
|
|
|
def test_legacy_text_pk_tables_rebuilt_to_integer_autoincrement(tmp_path, monkeypatch):
|
|
"""A pre-AUTOINCREMENT DB is migrated in place: id columns become INTEGER
|
|
PKs, ``last_event_id`` becomes INTEGER, data is preserved, and indexes
|
|
are recreated (DROP TABLE would otherwise take them down)."""
|
|
db_path = _setup_home(tmp_path, monkeypatch)
|
|
_make_legacy_db(db_path)
|
|
|
|
with kb.connect(db_path) as conn:
|
|
for table in ("task_events", "task_comments", "task_runs"):
|
|
id_col = {r["name"]: r for r in conn.execute(f"PRAGMA table_info({table})")}["id"]
|
|
assert id_col["type"].upper() == "INTEGER" and id_col["pk"] == 1
|
|
|
|
lei = {r["name"]: r for r in conn.execute("PRAGMA table_info(kanban_notify_subs)")}
|
|
assert lei["last_event_id"]["type"].upper() == "INTEGER"
|
|
|
|
# Data preserved across the rebuild.
|
|
assert len(conn.execute("SELECT * FROM task_events").fetchall()) == 2
|
|
assert conn.execute("SELECT body FROM task_comments").fetchone()["body"] == "hi"
|
|
assert len(conn.execute("SELECT * FROM task_runs").fetchall()) == 1
|
|
# Non-numeric legacy cursor ("e-1") casts to 0.
|
|
assert conn.execute("SELECT last_event_id FROM kanban_notify_subs").fetchone()["last_event_id"] == 0
|
|
|
|
# Indexes restored, including idx_events_run (added by the additive pass).
|
|
indexes = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='index'")}
|
|
for name in ("idx_events_task", "idx_events_run", "idx_comments_task",
|
|
"idx_runs_task", "idx_runs_status", "idx_notify_task"):
|
|
assert name in indexes
|
|
|
|
# AUTOINCREMENT actually works after the rebuild.
|
|
conn.execute("INSERT INTO task_events (task_id, kind, created_at) VALUES ('task-1', 'completed', 3000)")
|
|
new_id = conn.execute("SELECT id FROM task_events ORDER BY id DESC LIMIT 1").fetchone()["id"]
|
|
assert isinstance(new_id, int) and new_id >= 1
|
|
|
|
|
|
def test_rebuilt_schema_matches_fresh_db(tmp_path, monkeypatch):
|
|
"""The rebuilt tables must be structurally identical to a fresh DB, so the
|
|
hand-written DDL in ``_REBUILD_SPECS`` can't silently drift from SCHEMA_SQL."""
|
|
legacy_path = _setup_home(tmp_path, monkeypatch)
|
|
_make_legacy_db(legacy_path)
|
|
fresh_path = kb.kanban_db_path(board="fresh")
|
|
fresh_path.parent.mkdir(parents=True, exist_ok=True)
|
|
kb._INITIALIZED_PATHS.discard(str(fresh_path.resolve()))
|
|
|
|
with kb.connect(legacy_path) as migrated, kb.connect(fresh_path) as fresh:
|
|
for table in ("task_events", "task_comments", "task_runs", "kanban_notify_subs"):
|
|
assert _table_struct(migrated, table) == _table_struct(fresh, table)
|
|
|
|
|
|
def test_migration_is_idempotent(tmp_path, monkeypatch):
|
|
"""Re-opening an already-migrated DB is a no-op and leaves data intact."""
|
|
db_path = _setup_home(tmp_path, monkeypatch)
|
|
_make_legacy_db(db_path)
|
|
|
|
with kb.connect(db_path):
|
|
pass
|
|
kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))
|
|
with kb.connect(db_path) as conn:
|
|
id_col = {r["name"]: r for r in conn.execute("PRAGMA table_info(task_events)")}["id"]
|
|
assert id_col["type"].upper() == "INTEGER"
|
|
assert len(conn.execute("SELECT * FROM task_events").fetchall()) == 2
|
|
|
|
|
|
def test_unseen_events_for_sub_survives_migrated_db(tmp_path, monkeypatch):
|
|
"""The crash that motivated #35096 — ``int(None)`` on a NULL cursor — is
|
|
gone after migration; the notifier query returns an integer cursor."""
|
|
db_path = _setup_home(tmp_path, monkeypatch)
|
|
_make_legacy_db(db_path)
|
|
|
|
with kb.connect(db_path) as conn:
|
|
cursor, events = kb.unseen_events_for_sub(
|
|
conn, task_id="task-1", platform="telegram", chat_id="123"
|
|
)
|
|
assert isinstance(cursor, int)
|
|
assert isinstance(events, list)
|