From 1b89715e153f3daac8f697c95f386a1a4bf0947d Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Wed, 3 Jun 2026 13:13:21 -0500 Subject: [PATCH] fix(desktop): guard reconnect sockets and keep branch search precise Avoid stale WebSocket events from an old reconnect attempt flipping the gateway state after a newer socket opens. Also limit session-search dedupe to compression edges so branch-specific hits still open the branch instead of collapsing to the parent. --- apps/shared/src/json-rpc-gateway.ts | 27 ++++++++++++------ hermes_cli/web_server.py | 43 ++++++++++++++++++++--------- tests/hermes_cli/test_web_server.py | 34 +++++++++++++++++++++++ 3 files changed, 83 insertions(+), 21 deletions(-) diff --git a/apps/shared/src/json-rpc-gateway.ts b/apps/shared/src/json-rpc-gateway.ts index d7d30c200..af48290d7 100644 --- a/apps/shared/src/json-rpc-gateway.ts +++ b/apps/shared/src/json-rpc-gateway.ts @@ -103,10 +103,19 @@ export class JsonRpcGatewayClient { this.socket = socket socket.addEventListener('message', message => { + if (this.socket !== socket) { + return + } + this.handleMessage(message.data) }) socket.addEventListener('close', () => { + if (this.socket !== socket) { + return + } + + this.socket = null this.setState('closed') this.rejectAllPending(new Error(this.options.closedErrorMessage)) }) @@ -125,7 +134,7 @@ export class JsonRpcGatewayClient { } const onOpen = () => { - if (settled) { + if (settled || this.socket !== socket) { return } @@ -136,7 +145,7 @@ export class JsonRpcGatewayClient { } const onError = () => { - if (settled) { + if (settled || this.socket !== socket) { return } @@ -159,13 +168,15 @@ export class JsonRpcGatewayClient { cleanup() // Drop the half-open socket so the next connect() starts clean // instead of short-circuiting on a zombie 'connecting' state. - try { - this.socket?.close() - } catch { - // ignore - } + if (this.socket === socket) { + try { + socket.close() + } catch { + // ignore + } - this.socket = null + this.socket = null + } this.setState('error') reject(new Error(this.options.connectErrorMessage)) }, this.options.connectTimeoutMs) diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index e94385e94..01eab9196 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -1531,15 +1531,12 @@ async def get_sessions( async def search_sessions(q: str = "", limit: int = 20): """Full-text search across session message content using FTS5. - Results are deduped by *conversation lineage*, not by raw ``session_id``. + Results are deduped by compression lineage, not by raw ``session_id``. Auto-compression rotates a conversation onto a fresh session id (and leaves - the old segment's messages in the FTS index), and branches copy the - transcript into a new row — so one logical chat owns many ``sessions`` rows - that all match the same query. Without lineage dedup the sidebar shows the - same conversation several times, which is the "multiple copies / branches" - navigation complaint. We collapse every match to its lineage root and - surface the live compression tip so clicking the result resumes the current - session. + the old segment's messages in the FTS index), so one logical chat can own + many ``sessions`` rows that all match the same query. Branches also use + ``parent_session_id``, but they are real alternate conversations; don't + collapse branch-specific hits back into the parent. """ if not q or not q.strip(): return {"results": []} @@ -1563,11 +1560,13 @@ async def search_sessions(q: str = "", limit: int = 20): fetch_limit = max(limit * 5, 50) matches = db.search_messages(query=prefix_query, limit=fetch_limit) - # Walk parent_session_id to the lineage root, memoized so a chain of - # compression segments only costs one walk. + # Walk parent_session_id to the compression root, memoized so a + # chain of compression segments only costs one walk. We deliberately + # stop at branch/delegate edges: those sessions may diverge from the + # parent and should remain searchable on their own. root_cache: dict = {} - def lineage_root(session_id: str) -> str: + def compression_root(session_id: str) -> str: if not session_id: return session_id if session_id in root_cache: @@ -1593,6 +1592,24 @@ async def search_sessions(q: str = "", limit: int = 20): if not parent: root = cur break + try: + parent_session = db.get_session(parent) + except Exception: + parent_session = None + if not parent_session: + root = cur + break + parent_ended_at = parent_session.get("ended_at") + started_at = s.get("started_at") + is_compression_edge = ( + parent_session.get("end_reason") == "compression" + and parent_ended_at is not None + and started_at is not None + and started_at >= parent_ended_at + ) + if not is_compression_edge: + root = cur + break cur = parent for node in chain: root_cache[node] = root @@ -1613,11 +1630,11 @@ async def search_sessions(q: str = "", limit: int = 20): tip_cache[root_id] = tip return tip - # Keep the best (first / most relevant) hit per lineage root. + # Keep the best (first / most relevant) hit per compression root. seen: dict = {} for m in matches: raw_sid = m["session_id"] - root = lineage_root(raw_sid) + root = compression_root(raw_sid) if root in seen: continue seen[root] = { diff --git a/tests/hermes_cli/test_web_server.py b/tests/hermes_cli/test_web_server.py index 9ff0e1444..7ff246035 100644 --- a/tests/hermes_cli/test_web_server.py +++ b/tests/hermes_cli/test_web_server.py @@ -466,6 +466,40 @@ class TestWebServerEndpoints: assert hit["session_id"] == "search-tip" assert hit["lineage_root"] == "search-root" + def test_search_keeps_branch_specific_hits_on_branch(self): + """Branch sessions share parent_session_id, but they are not compression + continuations. A query that only exists in the branch must open the + branch instead of being collapsed back to the parent/root.""" + import time as _time + + from hermes_state import SessionDB + + db = SessionDB() + try: + now = _time.time() + db.create_session(session_id="branch-parent", source="cli") + db.append_message(session_id="branch-parent", role="user", content="ancestor context") + db.end_session("branch-parent", "branched") + db._conn.execute( + "UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = ?", + (now - 100, now - 90, "branch-parent"), + ) + db.create_session(session_id="branch-child", source="cli", parent_session_id="branch-parent") + db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = ?", (now - 80, "branch-child")) + db.append_message(session_id="branch-child", role="user", content="branchspecificneedle only here") + db._conn.commit() + finally: + db.close() + + resp = self.client.get("/api/sessions/search?q=branchspecificneedle") + assert resp.status_code == 200 + results = resp.json()["results"] + + assert any( + r["session_id"] == "branch-child" and r.get("lineage_root") == "branch-child" + for r in results + ) + def test_get_sessions_archived_is_boolean(self): from hermes_state import SessionDB