fix(desktop): guard reconnect sockets and keep branch search precise

Avoid stale WebSocket events from an old reconnect attempt flipping the gateway state after a newer socket opens. Also limit session-search dedupe to compression edges so branch-specific hits still open the branch instead of collapsing to the parent.
This commit is contained in:
Brooklyn Nicholson
2026-06-03 13:13:21 -05:00
parent 93228d5299
commit 1b89715e15
3 changed files with 83 additions and 21 deletions

View File

@ -103,10 +103,19 @@ export class JsonRpcGatewayClient {
this.socket = socket
socket.addEventListener('message', message => {
if (this.socket !== socket) {
return
}
this.handleMessage(message.data)
})
socket.addEventListener('close', () => {
if (this.socket !== socket) {
return
}
this.socket = null
this.setState('closed')
this.rejectAllPending(new Error(this.options.closedErrorMessage))
})
@ -125,7 +134,7 @@ export class JsonRpcGatewayClient {
}
const onOpen = () => {
if (settled) {
if (settled || this.socket !== socket) {
return
}
@ -136,7 +145,7 @@ export class JsonRpcGatewayClient {
}
const onError = () => {
if (settled) {
if (settled || this.socket !== socket) {
return
}
@ -159,13 +168,15 @@ export class JsonRpcGatewayClient {
cleanup()
// Drop the half-open socket so the next connect() starts clean
// instead of short-circuiting on a zombie 'connecting' state.
try {
this.socket?.close()
} catch {
// ignore
}
if (this.socket === socket) {
try {
socket.close()
} catch {
// ignore
}
this.socket = null
this.socket = null
}
this.setState('error')
reject(new Error(this.options.connectErrorMessage))
}, this.options.connectTimeoutMs)

View File

@ -1531,15 +1531,12 @@ async def get_sessions(
async def search_sessions(q: str = "", limit: int = 20):
"""Full-text search across session message content using FTS5.
Results are deduped by *conversation lineage*, not by raw ``session_id``.
Results are deduped by compression lineage, not by raw ``session_id``.
Auto-compression rotates a conversation onto a fresh session id (and leaves
the old segment's messages in the FTS index), and branches copy the
transcript into a new row — so one logical chat owns many ``sessions`` rows
that all match the same query. Without lineage dedup the sidebar shows the
same conversation several times, which is the "multiple copies / branches"
navigation complaint. We collapse every match to its lineage root and
surface the live compression tip so clicking the result resumes the current
session.
the old segment's messages in the FTS index), so one logical chat can own
many ``sessions`` rows that all match the same query. Branches also use
``parent_session_id``, but they are real alternate conversations; don't
collapse branch-specific hits back into the parent.
"""
if not q or not q.strip():
return {"results": []}
@ -1563,11 +1560,13 @@ async def search_sessions(q: str = "", limit: int = 20):
fetch_limit = max(limit * 5, 50)
matches = db.search_messages(query=prefix_query, limit=fetch_limit)
# Walk parent_session_id to the lineage root, memoized so a chain of
# compression segments only costs one walk.
# Walk parent_session_id to the compression root, memoized so a
# chain of compression segments only costs one walk. We deliberately
# stop at branch/delegate edges: those sessions may diverge from the
# parent and should remain searchable on their own.
root_cache: dict = {}
def lineage_root(session_id: str) -> str:
def compression_root(session_id: str) -> str:
if not session_id:
return session_id
if session_id in root_cache:
@ -1593,6 +1592,24 @@ async def search_sessions(q: str = "", limit: int = 20):
if not parent:
root = cur
break
try:
parent_session = db.get_session(parent)
except Exception:
parent_session = None
if not parent_session:
root = cur
break
parent_ended_at = parent_session.get("ended_at")
started_at = s.get("started_at")
is_compression_edge = (
parent_session.get("end_reason") == "compression"
and parent_ended_at is not None
and started_at is not None
and started_at >= parent_ended_at
)
if not is_compression_edge:
root = cur
break
cur = parent
for node in chain:
root_cache[node] = root
@ -1613,11 +1630,11 @@ async def search_sessions(q: str = "", limit: int = 20):
tip_cache[root_id] = tip
return tip
# Keep the best (first / most relevant) hit per lineage root.
# Keep the best (first / most relevant) hit per compression root.
seen: dict = {}
for m in matches:
raw_sid = m["session_id"]
root = lineage_root(raw_sid)
root = compression_root(raw_sid)
if root in seen:
continue
seen[root] = {

View File

@ -466,6 +466,40 @@ class TestWebServerEndpoints:
assert hit["session_id"] == "search-tip"
assert hit["lineage_root"] == "search-root"
def test_search_keeps_branch_specific_hits_on_branch(self):
"""Branch sessions share parent_session_id, but they are not compression
continuations. A query that only exists in the branch must open the
branch instead of being collapsed back to the parent/root."""
import time as _time
from hermes_state import SessionDB
db = SessionDB()
try:
now = _time.time()
db.create_session(session_id="branch-parent", source="cli")
db.append_message(session_id="branch-parent", role="user", content="ancestor context")
db.end_session("branch-parent", "branched")
db._conn.execute(
"UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = ?",
(now - 100, now - 90, "branch-parent"),
)
db.create_session(session_id="branch-child", source="cli", parent_session_id="branch-parent")
db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = ?", (now - 80, "branch-child"))
db.append_message(session_id="branch-child", role="user", content="branchspecificneedle only here")
db._conn.commit()
finally:
db.close()
resp = self.client.get("/api/sessions/search?q=branchspecificneedle")
assert resp.status_code == 200
results = resp.json()["results"]
assert any(
r["session_id"] == "branch-child" and r.get("lineage_root") == "branch-child"
for r in results
)
def test_get_sessions_archived_is_boolean(self):
from hermes_state import SessionDB