Merge pull request #38393 from NousResearch/bb/desktop-session-fixes
fix(desktop): persist pins, reconnect after sleep, dedupe session search
This commit is contained in:
@ -9,6 +9,7 @@ const {
|
||||
nativeImage,
|
||||
nativeTheme,
|
||||
net: electronNet,
|
||||
powerMonitor,
|
||||
protocol,
|
||||
safeStorage,
|
||||
session,
|
||||
@ -2693,6 +2694,32 @@ function sendClosePreviewRequested() {
|
||||
webContents.send('hermes:close-preview-requested')
|
||||
}
|
||||
|
||||
// Tell the renderer the machine just woke. Sleep silently drops the
|
||||
// renderer's WebSocket to the local backend; the renderer reconnects on this
|
||||
// signal so the chat composer doesn't stay stuck on "Starting Hermes...".
|
||||
function sendPowerResume() {
|
||||
if (!mainWindow || mainWindow.isDestroyed()) return
|
||||
const { webContents } = mainWindow
|
||||
if (!webContents || webContents.isDestroyed()) return
|
||||
webContents.send('hermes:power-resume')
|
||||
}
|
||||
|
||||
let powerResumeRegistered = false
|
||||
|
||||
function registerPowerResumeListeners() {
|
||||
if (powerResumeRegistered) return
|
||||
powerResumeRegistered = true
|
||||
try {
|
||||
// 'resume' covers sleep/wake; 'unlock-screen' covers lock/unlock without a
|
||||
// full suspend. Either can drop an idle socket.
|
||||
powerMonitor.on('resume', sendPowerResume)
|
||||
powerMonitor.on('unlock-screen', sendPowerResume)
|
||||
} catch {
|
||||
// powerMonitor is unavailable before app 'ready' on some platforms; the
|
||||
// caller registers after 'ready', so this should not normally throw.
|
||||
}
|
||||
}
|
||||
|
||||
function getAppIconPath() {
|
||||
return APP_ICON_PATHS.find(fileExists)
|
||||
}
|
||||
@ -4205,6 +4232,7 @@ app.whenReady().then(() => {
|
||||
registerMediaProtocol()
|
||||
ensureWslWindowsFonts()
|
||||
configureSpellChecker()
|
||||
registerPowerResumeListeners()
|
||||
createWindow()
|
||||
|
||||
app.on('activate', () => {
|
||||
|
||||
@ -83,6 +83,11 @@ contextBridge.exposeInMainWorld('hermesDesktop', {
|
||||
ipcRenderer.on('hermes:backend-exit', listener)
|
||||
return () => ipcRenderer.removeListener('hermes:backend-exit', listener)
|
||||
},
|
||||
onPowerResume: callback => {
|
||||
const listener = () => callback()
|
||||
ipcRenderer.on('hermes:power-resume', listener)
|
||||
return () => ipcRenderer.removeListener('hermes:power-resume', listener)
|
||||
},
|
||||
onBootProgress: callback => {
|
||||
const listener = (_event, payload) => callback(payload)
|
||||
ipcRenderer.on('hermes:boot-progress', listener)
|
||||
|
||||
@ -34,7 +34,7 @@ import {
|
||||
shouldAutoDrainOnSettle,
|
||||
updateQueuedPrompt
|
||||
} from '@/store/composer-queue'
|
||||
import { $messages } from '@/store/session'
|
||||
import { $gatewayState, $messages } from '@/store/session'
|
||||
import { $threadScrolledUp } from '@/store/thread-scroll'
|
||||
|
||||
import { extractDroppedFiles, HERMES_PATHS_MIME } from '../hooks/use-composer-actions'
|
||||
@ -156,7 +156,15 @@ export function ChatBar({
|
||||
const busyAction = busy && hasComposerPayload ? 'queue' : 'stop'
|
||||
const showHelpHint = draft === '?'
|
||||
|
||||
const placeholder = disabled ? 'Starting Hermes...' : 'Send follow-up'
|
||||
const gatewayState = useStore($gatewayState)
|
||||
// When the bar is disabled it's because the gateway isn't open. Distinguish a
|
||||
// cold start ("Starting Hermes...") from a dropped connection we're trying to
|
||||
// restore (e.g. after the Mac slept) so the stuck state reads as recoverable.
|
||||
const placeholder = disabled
|
||||
? gatewayState === 'closed' || gatewayState === 'error'
|
||||
? 'Reconnecting to Hermes…'
|
||||
: 'Starting Hermes...'
|
||||
: 'Send follow-up'
|
||||
|
||||
const focusInput = useCallback(() => {
|
||||
focusComposerInput(editorRef.current)
|
||||
|
||||
@ -36,7 +36,8 @@ import {
|
||||
$introSeed,
|
||||
$messages,
|
||||
$selectedStoredSessionId,
|
||||
$sessions
|
||||
$sessions,
|
||||
sessionPinId
|
||||
} from '@/store/session'
|
||||
import type { ModelOptionsResponse } from '@/types/hermes'
|
||||
|
||||
@ -96,9 +97,17 @@ function ChatHeader({
|
||||
}: ChatHeaderProps) {
|
||||
const sessions = useStore($sessions)
|
||||
const pinnedSessionIds = useStore($pinnedSessionIds)
|
||||
const activeStoredSession = sessions.find(session => session.id === selectedSessionId) || null
|
||||
const activeStoredSession =
|
||||
sessions.find(session => session.id === selectedSessionId || session._lineage_root_id === selectedSessionId) || null
|
||||
const title = activeStoredSession ? sessionTitle(activeStoredSession) : 'New session'
|
||||
const selectedIsPinned = selectedSessionId ? pinnedSessionIds.includes(selectedSessionId) : false
|
||||
// Pins live on the durable lineage-root id, but selectedSessionId is the live
|
||||
// (tip) id — resolve through the loaded row so the menu reflects the pin
|
||||
// state after auto-compression rotates the id.
|
||||
const selectedIsPinned = activeStoredSession
|
||||
? pinnedSessionIds.includes(sessionPinId(activeStoredSession))
|
||||
: selectedSessionId
|
||||
? pinnedSessionIds.includes(selectedSessionId)
|
||||
: false
|
||||
|
||||
return (
|
||||
<header className={cn(titlebarHeaderBaseClass, isRoutedSessionView && titlebarHeaderShadowClass)}>
|
||||
|
||||
@ -143,6 +143,7 @@ function searchResultToSession(result: SessionSearchResult): SessionInfo {
|
||||
cwd: null,
|
||||
ended_at: null,
|
||||
id: result.session_id,
|
||||
_lineage_root_id: result.lineage_root ?? null,
|
||||
input_tokens: 0,
|
||||
is_active: false,
|
||||
last_active: ts,
|
||||
|
||||
@ -30,7 +30,7 @@ import { exportSession } from '@/lib/session-export'
|
||||
import { cn } from '@/lib/utils'
|
||||
import { upsertDesktopActionTask } from '@/store/activity'
|
||||
import { $pinnedSessionIds, pinSession, unpinSession } from '@/store/layout'
|
||||
import { $sessions } from '@/store/session'
|
||||
import { $sessions, sessionPinId } from '@/store/session'
|
||||
|
||||
import { useRouteEnumParam } from '../hooks/use-route-enum-param'
|
||||
import { OverlayActionButton, OverlayCard, overlayCardClass, OverlayIconButton } from '../overlays/overlay-chrome'
|
||||
@ -102,6 +102,8 @@ const SECTION_SEARCH_ENTRIES: readonly SectionSearchEntry[] = [
|
||||
interface SessionSearchHit {
|
||||
detail?: string
|
||||
kind: 'session'
|
||||
/** Durable lineage-root id used for pinning so the pin survives compression. */
|
||||
pinId: string
|
||||
sessionId: string
|
||||
snippet: string
|
||||
title: string
|
||||
@ -260,6 +262,7 @@ export function CommandCenterView({
|
||||
return {
|
||||
detail,
|
||||
kind: 'session',
|
||||
pinId: result.lineage_root || result.session_id,
|
||||
sessionId: result.session_id,
|
||||
snippet: result.snippet || '',
|
||||
title
|
||||
@ -491,7 +494,7 @@ export function CommandCenterView({
|
||||
</h3>
|
||||
{group.results.map(result => {
|
||||
if (result.kind === 'session') {
|
||||
const pinned = pinnedSessionIds.includes(result.sessionId)
|
||||
const pinned = pinnedSessionIds.includes(result.pinId)
|
||||
|
||||
return (
|
||||
<OverlayCard className="p-2.5" key={`${group.id}:${result.sessionId}:${result.snippet}`}>
|
||||
@ -515,7 +518,7 @@ export function CommandCenterView({
|
||||
onClick={event => {
|
||||
event.preventDefault()
|
||||
event.stopPropagation()
|
||||
pinned ? unpinSession(result.sessionId) : pinSession(result.sessionId)
|
||||
pinned ? unpinSession(result.pinId) : pinSession(result.pinId)
|
||||
}}
|
||||
title={pinned ? 'Unpin session' : 'Pin session'}
|
||||
>
|
||||
@ -580,7 +583,8 @@ export function CommandCenterView({
|
||||
) : (
|
||||
<div className="grid gap-1.5">
|
||||
{filteredSessions.map(session => {
|
||||
const pinned = pinnedSessionIds.includes(session.id)
|
||||
const pinId = sessionPinId(session)
|
||||
const pinned = pinnedSessionIds.includes(pinId)
|
||||
|
||||
return (
|
||||
<OverlayCard className="flex items-center gap-2 px-2.5 py-2" key={session.id}>
|
||||
@ -595,7 +599,7 @@ export function CommandCenterView({
|
||||
</div>
|
||||
</button>
|
||||
<OverlayIconButton
|
||||
onClick={() => (pinned ? unpinSession(session.id) : pinSession(session.id))}
|
||||
onClick={() => (pinned ? unpinSession(pinId) : pinSession(pinId))}
|
||||
title={pinned ? 'Unpin session' : 'Pin session'}
|
||||
>
|
||||
{pinned ? <IconBookmarkFilled className="size-3.5" /> : <IconBookmark className="size-3.5" />}
|
||||
|
||||
@ -34,7 +34,7 @@ import {
|
||||
$selectedStoredSessionId,
|
||||
$sessions,
|
||||
$workingSessionIds,
|
||||
mergeWorkingSessions,
|
||||
mergeSessionPage,
|
||||
sessionPinId,
|
||||
setAwaitingResponse,
|
||||
setBusy,
|
||||
@ -208,12 +208,13 @@ export function DesktopController() {
|
||||
const result = await listSessions(limit, 1)
|
||||
|
||||
if (refreshSessionsRequestRef.current === requestId) {
|
||||
// Don't hard-replace: a session whose first turn is still in flight has
|
||||
// message_count 0 in the DB, so min_messages=1 omits it. Since every
|
||||
// message.complete refreshes the list, a plain replace would drop the
|
||||
// other still-running new chats the moment one of them finishes. Keep
|
||||
// any working session the server hasn't surfaced yet.
|
||||
setSessions(prev => mergeWorkingSessions(prev, result.sessions, $workingSessionIds.get()))
|
||||
// Don't hard-replace. Two kinds of rows must survive a refresh the
|
||||
// server didn't return: (1) sessions whose first turn is still in
|
||||
// flight (message_count 0, so min_messages=1 omits them) and (2)
|
||||
// pinned sessions that have aged off the most-recent page — otherwise
|
||||
// the pin "disappears until you refresh". mergeSessionPage keeps both.
|
||||
const keepIds = new Set<string>([...$workingSessionIds.get(), ...$pinnedSessionIds.get()])
|
||||
setSessions(prev => mergeSessionPage(prev, result.sessions, keepIds))
|
||||
setSessionsTotal(typeof result.total === 'number' ? result.total : result.sessions.length)
|
||||
}
|
||||
} finally {
|
||||
|
||||
@ -63,6 +63,94 @@ export function useGatewayBoot({
|
||||
return () => void (cancelled = true)
|
||||
}
|
||||
|
||||
// --- Reconnect-after-sleep machinery -------------------------------------
|
||||
// macOS sleep silently drops the renderer's WebSocket. The backend Python
|
||||
// process keeps running, but nothing re-opened the socket on wake, so the
|
||||
// composer stayed disabled forever on "Starting Hermes...". Once the
|
||||
// initial boot succeeds we treat any non-open state as recoverable and
|
||||
// reconnect with backoff, and we nudge a reconnect on the OS/browser
|
||||
// signals that fire around wake (power resume, network online, the window
|
||||
// becoming visible).
|
||||
let bootCompleted = false
|
||||
let reconnecting = false
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null
|
||||
let reconnectAttempt = 0
|
||||
|
||||
// Wrap the live getter in a call so TS control-flow analysis doesn't narrow
|
||||
// `connectionState` to a constant across the early-return guards (the state
|
||||
// genuinely changes between reads).
|
||||
const gatewayOpen = () => gateway.connectionState === 'open'
|
||||
|
||||
const clearReconnectTimer = () => {
|
||||
if (reconnectTimer !== null) {
|
||||
clearTimeout(reconnectTimer)
|
||||
reconnectTimer = null
|
||||
}
|
||||
}
|
||||
|
||||
const attemptReconnect = async () => {
|
||||
if (cancelled || reconnecting || gatewayOpen()) {
|
||||
return
|
||||
}
|
||||
|
||||
reconnecting = true
|
||||
|
||||
try {
|
||||
const conn = await desktop.getConnection()
|
||||
|
||||
if (cancelled) {
|
||||
return
|
||||
}
|
||||
|
||||
publish(conn)
|
||||
await gateway.connect(conn.wsUrl)
|
||||
|
||||
if (cancelled) {
|
||||
return
|
||||
}
|
||||
|
||||
reconnectAttempt = 0
|
||||
// Resync state that may have moved on the backend while we were asleep.
|
||||
await callbacksRef.current.refreshHermesConfig().catch(() => undefined)
|
||||
await callbacksRef.current.refreshSessions().catch(() => undefined)
|
||||
} catch {
|
||||
// Fall through to scheduleReconnect's backoff below.
|
||||
} finally {
|
||||
reconnecting = false
|
||||
|
||||
if (!cancelled && !gatewayOpen()) {
|
||||
scheduleReconnect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleReconnect() {
|
||||
if (cancelled || reconnecting || reconnectTimer !== null || gatewayOpen()) {
|
||||
return
|
||||
}
|
||||
|
||||
// 1s, 2s, 4s … capped at 15s.
|
||||
const delay = Math.min(15_000, 1_000 * 2 ** Math.min(reconnectAttempt, 4))
|
||||
reconnectAttempt += 1
|
||||
reconnectTimer = setTimeout(() => {
|
||||
reconnectTimer = null
|
||||
void attemptReconnect()
|
||||
}, delay)
|
||||
}
|
||||
|
||||
const reconnectNow = () => {
|
||||
if (cancelled || !bootCompleted) {
|
||||
return
|
||||
}
|
||||
|
||||
clearReconnectTimer()
|
||||
reconnectAttempt = 0
|
||||
|
||||
if (!gatewayOpen()) {
|
||||
void attemptReconnect()
|
||||
}
|
||||
}
|
||||
|
||||
const offBootProgress = desktop.onBootProgress(payload => applyDesktopBootProgress(payload))
|
||||
void desktop
|
||||
.getBootProgress()
|
||||
@ -79,9 +167,34 @@ export function useGatewayBoot({
|
||||
callbacksRef.current.onGatewayReady(gateway)
|
||||
setGateway(gateway)
|
||||
|
||||
const offState = gateway.onState(st => void setGatewayState(st))
|
||||
const offState = gateway.onState(st => {
|
||||
setGatewayState(st)
|
||||
|
||||
if (st === 'open') {
|
||||
reconnectAttempt = 0
|
||||
clearReconnectTimer()
|
||||
} else if (bootCompleted && (st === 'closed' || st === 'error')) {
|
||||
// The socket dropped after a healthy boot (typically sleep/wake). Try
|
||||
// to bring it back instead of leaving the composer stuck disabled.
|
||||
scheduleReconnect()
|
||||
}
|
||||
})
|
||||
const offEvent = gateway.onEvent(event => callbacksRef.current.handleGatewayEvent(event))
|
||||
|
||||
// Wake signals: power resume (macOS/Windows), network coming back, and the
|
||||
// window regaining focus/visibility. Each nudges an immediate reconnect.
|
||||
const offPowerResume = desktop.onPowerResume?.(() => reconnectNow())
|
||||
|
||||
const onOnline = () => reconnectNow()
|
||||
const onVisible = () => {
|
||||
if (document.visibilityState === 'visible') {
|
||||
reconnectNow()
|
||||
}
|
||||
}
|
||||
|
||||
window.addEventListener('online', onOnline)
|
||||
document.addEventListener('visibilitychange', onVisible)
|
||||
|
||||
const offWindowState = desktop.onWindowStateChanged?.(payload => {
|
||||
const current = $connection.get()
|
||||
|
||||
@ -141,6 +254,7 @@ export function useGatewayBoot({
|
||||
})
|
||||
await callbacksRef.current.refreshSessions()
|
||||
completeDesktopBoot()
|
||||
bootCompleted = true
|
||||
} catch (err) {
|
||||
if (!cancelled) {
|
||||
const message = err instanceof Error ? err.message : String(err)
|
||||
@ -155,6 +269,10 @@ export function useGatewayBoot({
|
||||
|
||||
return () => {
|
||||
cancelled = true
|
||||
clearReconnectTimer()
|
||||
window.removeEventListener('online', onOnline)
|
||||
document.removeEventListener('visibilitychange', onVisible)
|
||||
offPowerResume?.()
|
||||
offState()
|
||||
offEvent()
|
||||
offExit()
|
||||
|
||||
@ -16,6 +16,7 @@ import {
|
||||
$messages,
|
||||
$sessions,
|
||||
getRememberedWorkspaceCwd,
|
||||
sessionPinId,
|
||||
setActiveSessionId,
|
||||
setAwaitingResponse,
|
||||
setBusy,
|
||||
@ -692,12 +693,15 @@ export function useSessionActions({
|
||||
const closingRuntimeId = wasSelected ? activeSessionId : null
|
||||
const previousMessages = $messages.get()
|
||||
const previousPinned = $pinnedSessionIds.get()
|
||||
// Pins are keyed on the durable lineage-root id; the stored id may be the
|
||||
// live tip after compression. Drop both so the pin can't linger.
|
||||
const removedPinId = removed ? sessionPinId(removed) : storedSessionId
|
||||
|
||||
setSessions(prev => prev.filter(s => s.id !== storedSessionId))
|
||||
// Keep $sessionsTotal in sync so the sidebar's "Load N more" footer
|
||||
// doesn't keep claiming the removed row is still on the server.
|
||||
setSessionsTotal(prev => Math.max(0, prev - 1))
|
||||
$pinnedSessionIds.set(previousPinned.filter(id => id !== storedSessionId))
|
||||
$pinnedSessionIds.set(previousPinned.filter(id => id !== storedSessionId && id !== removedPinId))
|
||||
|
||||
// Tear down before awaiting so the route effect can't resume the
|
||||
// doomed session via the stale /<sid> URL.
|
||||
@ -769,6 +773,9 @@ export function useSessionActions({
|
||||
const archived = $sessions.get().find(s => s.id === storedSessionId)
|
||||
const wasSelected = selectedStoredSessionId === storedSessionId
|
||||
const previousPinned = $pinnedSessionIds.get()
|
||||
// Pins are keyed on the durable lineage-root id; the stored id may be the
|
||||
// live tip after compression. Drop both so the pin can't linger.
|
||||
const archivedPinId = archived ? sessionPinId(archived) : storedSessionId
|
||||
|
||||
// Soft-hide: drop from the sidebar immediately, keep the data.
|
||||
setSessions(prev => prev.filter(s => s.id !== storedSessionId))
|
||||
@ -776,7 +783,7 @@ export function useSessionActions({
|
||||
// on the next refresh, so they count as "removed" for the load-more
|
||||
// footer math.
|
||||
setSessionsTotal(prev => Math.max(0, prev - 1))
|
||||
$pinnedSessionIds.set(previousPinned.filter(id => id !== storedSessionId))
|
||||
$pinnedSessionIds.set(previousPinned.filter(id => id !== storedSessionId && id !== archivedPinId))
|
||||
|
||||
if (wasSelected) {
|
||||
startFreshSessionDraft(true)
|
||||
|
||||
1
apps/desktop/src/global.d.ts
vendored
1
apps/desktop/src/global.d.ts
vendored
@ -49,6 +49,7 @@ declare global {
|
||||
onWindowStateChanged?: (callback: (payload: HermesWindowState) => void) => () => void
|
||||
onPreviewFileChanged: (callback: (payload: HermesPreviewFileChanged) => void) => () => void
|
||||
onBackendExit: (callback: (payload: BackendExit) => void) => () => void
|
||||
onPowerResume?: (callback: () => void) => () => void
|
||||
onBootProgress: (callback: (payload: DesktopBootProgress) => void) => () => void
|
||||
getBootstrapState: () => Promise<DesktopBootstrapState>
|
||||
resetBootstrap: () => Promise<{ ok: boolean }>
|
||||
|
||||
@ -2,7 +2,7 @@ import { describe, expect, it } from 'vitest'
|
||||
|
||||
import type { SessionInfo } from '@/types/hermes'
|
||||
|
||||
import { mergeWorkingSessions, sessionPinId } from './session'
|
||||
import { mergeSessionPage, sessionPinId } from './session'
|
||||
|
||||
const session = (over: Partial<SessionInfo>): SessionInfo => ({
|
||||
archived: false,
|
||||
@ -35,12 +35,12 @@ describe('sessionPinId', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('mergeWorkingSessions', () => {
|
||||
it('returns the server page untouched when nothing is working', () => {
|
||||
describe('mergeSessionPage', () => {
|
||||
it('returns the server page untouched when there is nothing to keep', () => {
|
||||
const previous = [session({ id: 'a' }), session({ id: 'b' })]
|
||||
const incoming = [session({ id: 'a' })]
|
||||
|
||||
expect(mergeWorkingSessions(previous, incoming, [])).toBe(incoming)
|
||||
expect(mergeSessionPage(previous, incoming, [])).toBe(incoming)
|
||||
})
|
||||
|
||||
it('keeps a still-working session the server omitted', () => {
|
||||
@ -50,7 +50,7 @@ describe('mergeWorkingSessions', () => {
|
||||
const previous = [session({ id: 'c' }), session({ id: 'b' }), session({ id: 'a' })]
|
||||
const incoming = [session({ id: 'a', message_count: 2 })]
|
||||
|
||||
const merged = mergeWorkingSessions(previous, incoming, ['b', 'c'])
|
||||
const merged = mergeSessionPage(previous, incoming, ['b', 'c'])
|
||||
|
||||
expect(merged.map(s => s.id)).toEqual(['c', 'b', 'a'])
|
||||
// The finished session comes from the fresh server payload, not the stale
|
||||
@ -62,18 +62,42 @@ describe('mergeWorkingSessions', () => {
|
||||
const previous = [session({ id: 'b' }), session({ id: 'a' })]
|
||||
const incoming = [session({ id: 'b', message_count: 4 }), session({ id: 'a' })]
|
||||
|
||||
const merged = mergeWorkingSessions(previous, incoming, ['b'])
|
||||
const merged = mergeSessionPage(previous, incoming, ['b'])
|
||||
|
||||
expect(merged.map(s => s.id)).toEqual(['b', 'a'])
|
||||
expect(merged.find(s => s.id === 'b')?.message_count).toBe(4)
|
||||
})
|
||||
|
||||
it('never resurrects a non-working session the server dropped', () => {
|
||||
it('never resurrects a session the server dropped that is not in the keep set', () => {
|
||||
// A deleted/archived session is removed from `previous` optimistically and
|
||||
// is not in the working set, so it must stay gone after a refresh.
|
||||
// is not in the keep set, so it must stay gone after a refresh.
|
||||
const previous = [session({ id: 'b' }), session({ id: 'gone' })]
|
||||
const incoming = [session({ id: 'b' })]
|
||||
|
||||
expect(mergeWorkingSessions(previous, incoming, ['b']).map(s => s.id)).toEqual(['b'])
|
||||
expect(mergeSessionPage(previous, incoming, ['b']).map(s => s.id)).toEqual(['b'])
|
||||
})
|
||||
|
||||
it('keeps a pinned session that has aged off the recent page', () => {
|
||||
// Repro of "loses pins until you refresh": a pinned chat falls off the
|
||||
// most-recent page, so the server stops returning it. A hard replace would
|
||||
// evict it and the Pinned section would go empty. The keep set (which
|
||||
// carries pinned ids) must hold it in memory.
|
||||
const previous = [session({ id: 'recent' }), session({ id: 'pinned' })]
|
||||
const incoming = [session({ id: 'recent' })]
|
||||
|
||||
const merged = mergeSessionPage(previous, incoming, ['pinned'])
|
||||
|
||||
expect(merged.map(s => s.id)).toEqual(['pinned', 'recent'])
|
||||
})
|
||||
|
||||
it('keeps a pinned session matched by its lineage root after compression', () => {
|
||||
// The pin is stored on the lineage-root id, but the loaded row surfaces
|
||||
// under its live compression tip. Matching on _lineage_root_id keeps it.
|
||||
const previous = [session({ id: 'tip', _lineage_root_id: 'root' })]
|
||||
const incoming = [session({ id: 'other' })]
|
||||
|
||||
const merged = mergeSessionPage(previous, incoming, ['root'])
|
||||
|
||||
expect(merged.map(s => s.id)).toEqual(['tip', 'other'])
|
||||
})
|
||||
})
|
||||
|
||||
@ -28,28 +28,45 @@ export const sessionPinId = (session: Pick<SessionInfo, '_lineage_root_id' | 'id
|
||||
session._lineage_root_id ?? session.id
|
||||
|
||||
/** Merge a fresh server session page into the in-memory list, keeping any
|
||||
* still-"working" session the server omitted.
|
||||
* row the server omitted that we still want visible — both still-"working"
|
||||
* sessions and pinned sessions.
|
||||
*
|
||||
* A brand-new session's first user message isn't flushed to the SessionDB
|
||||
* until its turn is persisted, so `listSessions(min_messages=1)` skips
|
||||
* sessions that are mid-first-response. Because every `message.complete`
|
||||
* triggers a full refresh, a hard replace makes concurrent new chats vanish
|
||||
* the instant any one of them finishes. Preserving the working-but-absent
|
||||
* rows keeps them visible until their own turn persists and the server
|
||||
* starts returning them. Optimistic deletes/archives already drop the row
|
||||
* from `previous`, so a removed session can't be resurrected here. */
|
||||
export function mergeWorkingSessions(
|
||||
* Two reasons the server drops a row we must keep:
|
||||
*
|
||||
* 1. A brand-new session's first user message isn't flushed to the SessionDB
|
||||
* until its turn is persisted, so `listSessions(min_messages=1)` skips
|
||||
* sessions that are mid-first-response. Because every `message.complete`
|
||||
* triggers a full refresh, a hard replace makes concurrent new chats vanish
|
||||
* the instant any one of them finishes.
|
||||
* 2. The sidebar lists only the most-recent page (`SIDEBAR_SESSIONS_PAGE_SIZE`)
|
||||
* ordered by activity. A pinned conversation that hasn't been touched in a
|
||||
* while falls off that page, so a hard replace silently evicts it from the
|
||||
* in-memory list — and because the Pinned section resolves pins against
|
||||
* that list, the pin "disappears until you refresh".
|
||||
*
|
||||
* `keepIds` carries both the working set and the pinned set. Pins are stored
|
||||
* on the durable lineage-root id (see {@link sessionPinId}), while the loaded
|
||||
* row surfaces under its live compression tip, so we match a survivor by
|
||||
* either its live `id` or its `_lineage_root_id`. Optimistic deletes/archives
|
||||
* drop the row from `previous` (and unpin it), so a removed session can't be
|
||||
* resurrected here. */
|
||||
export function mergeSessionPage(
|
||||
previous: SessionInfo[],
|
||||
incoming: SessionInfo[],
|
||||
workingIds: readonly string[]
|
||||
keepIds: Iterable<string>
|
||||
): SessionInfo[] {
|
||||
if (workingIds.length === 0) {
|
||||
const keep = keepIds instanceof Set ? keepIds : new Set(keepIds)
|
||||
|
||||
if (keep.size === 0) {
|
||||
return incoming
|
||||
}
|
||||
|
||||
const working = new Set(workingIds)
|
||||
const incomingIds = new Set(incoming.map(session => session.id))
|
||||
const survivors = previous.filter(session => working.has(session.id) && !incomingIds.has(session.id))
|
||||
const survivors = previous.filter(
|
||||
session =>
|
||||
!incomingIds.has(session.id) &&
|
||||
(keep.has(session.id) || (session._lineage_root_id != null && keep.has(session._lineage_root_id)))
|
||||
)
|
||||
|
||||
return survivors.length ? [...survivors, ...incoming] : incoming
|
||||
}
|
||||
|
||||
@ -503,8 +503,12 @@ export interface ToolsetConfig {
|
||||
}
|
||||
|
||||
export interface SessionSearchResult {
|
||||
/** Lineage root of the matched conversation. Stable across compression and
|
||||
* used as the durable pin id; falls back to session_id when absent. */
|
||||
lineage_root?: string | null
|
||||
model: string | null
|
||||
role: string | null
|
||||
/** Live compression tip of the matched conversation — resume by this id. */
|
||||
session_id: string
|
||||
session_started: number | null
|
||||
snippet: string
|
||||
|
||||
@ -49,6 +49,7 @@ type PendingCall = {
|
||||
export interface GatewayClientOptions {
|
||||
closedErrorMessage?: string
|
||||
connectErrorMessage?: string
|
||||
connectTimeoutMs?: number
|
||||
createRequestId?: (nextId: number) => GatewayRequestId
|
||||
requestIdPrefix?: string
|
||||
requestTimeoutMs?: number
|
||||
@ -58,6 +59,10 @@ export interface GatewayClientOptions {
|
||||
|
||||
const ANY = '*'
|
||||
const DEFAULT_REQUEST_TIMEOUT_MS = 120_000
|
||||
// A reconnect after sleep/wake must not hang forever in 'connecting' (which
|
||||
// keeps the composer disabled and stuck on "Starting Hermes..."). If the open
|
||||
// handshake doesn't land in this window, fail to 'error' so callers can retry.
|
||||
const DEFAULT_CONNECT_TIMEOUT_MS = 15_000
|
||||
|
||||
export class JsonRpcGatewayClient {
|
||||
private nextId = 0
|
||||
@ -73,6 +78,7 @@ export class JsonRpcGatewayClient {
|
||||
this.options = {
|
||||
closedErrorMessage: options.closedErrorMessage ?? 'WebSocket closed',
|
||||
connectErrorMessage: options.connectErrorMessage ?? 'WebSocket connection failed',
|
||||
connectTimeoutMs: options.connectTimeoutMs ?? DEFAULT_CONNECT_TIMEOUT_MS,
|
||||
createRequestId:
|
||||
options.createRequestId ?? ((nextId: number) => `${options.requestIdPrefix ?? 'r'}${nextId}`),
|
||||
notConnectedErrorMessage: options.notConnectedErrorMessage ?? 'gateway not connected',
|
||||
@ -97,29 +103,84 @@ export class JsonRpcGatewayClient {
|
||||
this.socket = socket
|
||||
|
||||
socket.addEventListener('message', message => {
|
||||
if (this.socket !== socket) {
|
||||
return
|
||||
}
|
||||
|
||||
this.handleMessage(message.data)
|
||||
})
|
||||
|
||||
socket.addEventListener('close', () => {
|
||||
if (this.socket !== socket) {
|
||||
return
|
||||
}
|
||||
|
||||
this.socket = null
|
||||
this.setState('closed')
|
||||
this.rejectAllPending(new Error(this.options.closedErrorMessage))
|
||||
})
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const onOpen = () => {
|
||||
let settled = false
|
||||
let timer: ReturnType<typeof setTimeout> | undefined
|
||||
|
||||
const cleanup = () => {
|
||||
if (timer !== undefined) {
|
||||
clearTimeout(timer)
|
||||
}
|
||||
|
||||
socket.removeEventListener('open', onOpen)
|
||||
socket.removeEventListener('error', onError)
|
||||
}
|
||||
|
||||
const onOpen = () => {
|
||||
if (settled || this.socket !== socket) {
|
||||
return
|
||||
}
|
||||
|
||||
settled = true
|
||||
cleanup()
|
||||
this.setState('open')
|
||||
resolve()
|
||||
}
|
||||
|
||||
const onError = () => {
|
||||
socket.removeEventListener('open', onOpen)
|
||||
if (settled || this.socket !== socket) {
|
||||
return
|
||||
}
|
||||
|
||||
settled = true
|
||||
cleanup()
|
||||
this.setState('error')
|
||||
reject(new Error(this.options.connectErrorMessage))
|
||||
}
|
||||
|
||||
socket.addEventListener('open', onOpen, { once: true })
|
||||
socket.addEventListener('error', onError, { once: true })
|
||||
|
||||
if (this.options.connectTimeoutMs > 0) {
|
||||
timer = setTimeout(() => {
|
||||
if (settled) {
|
||||
return
|
||||
}
|
||||
|
||||
settled = true
|
||||
cleanup()
|
||||
// Drop the half-open socket so the next connect() starts clean
|
||||
// instead of short-circuiting on a zombie 'connecting' state.
|
||||
if (this.socket === socket) {
|
||||
try {
|
||||
socket.close()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
this.socket = null
|
||||
}
|
||||
this.setState('error')
|
||||
reject(new Error(this.options.connectErrorMessage))
|
||||
}, this.options.connectTimeoutMs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -1529,7 +1529,15 @@ async def get_sessions(
|
||||
|
||||
@app.get("/api/sessions/search")
|
||||
async def search_sessions(q: str = "", limit: int = 20):
|
||||
"""Full-text search across session message content using FTS5."""
|
||||
"""Full-text search across session message content using FTS5.
|
||||
|
||||
Results are deduped by compression lineage, not by raw ``session_id``.
|
||||
Auto-compression rotates a conversation onto a fresh session id (and leaves
|
||||
the old segment's messages in the FTS index), so one logical chat can own
|
||||
many ``sessions`` rows that all match the same query. Branches also use
|
||||
``parent_session_id``, but they are real alternate conversations; don't
|
||||
collapse branch-specific hits back into the parent.
|
||||
"""
|
||||
if not q or not q.strip():
|
||||
return {"results": []}
|
||||
try:
|
||||
@ -1547,20 +1555,99 @@ async def search_sessions(q: str = "", limit: int = 20):
|
||||
else:
|
||||
terms.append(token + "*")
|
||||
prefix_query = " ".join(terms)
|
||||
matches = db.search_messages(query=prefix_query, limit=limit)
|
||||
# Group by session_id — return unique sessions with their best snippet
|
||||
# Over-fetch so lineage dedup can still surface `limit` distinct
|
||||
# conversations even when several hits collapse onto one root.
|
||||
fetch_limit = max(limit * 5, 50)
|
||||
matches = db.search_messages(query=prefix_query, limit=fetch_limit)
|
||||
|
||||
# Walk parent_session_id to the compression root, memoized so a
|
||||
# chain of compression segments only costs one walk. We deliberately
|
||||
# stop at branch/delegate edges: those sessions may diverge from the
|
||||
# parent and should remain searchable on their own.
|
||||
root_cache: dict = {}
|
||||
|
||||
def compression_root(session_id: str) -> str:
|
||||
if not session_id:
|
||||
return session_id
|
||||
if session_id in root_cache:
|
||||
return root_cache[session_id]
|
||||
chain = []
|
||||
cur = session_id
|
||||
visited = set()
|
||||
root = session_id
|
||||
while cur and cur not in visited:
|
||||
visited.add(cur)
|
||||
chain.append(cur)
|
||||
if cur in root_cache:
|
||||
root = root_cache[cur]
|
||||
break
|
||||
try:
|
||||
s = db.get_session(cur)
|
||||
except Exception:
|
||||
s = None
|
||||
if not s:
|
||||
root = cur
|
||||
break
|
||||
parent = s.get("parent_session_id") if isinstance(s, dict) else None
|
||||
if not parent:
|
||||
root = cur
|
||||
break
|
||||
try:
|
||||
parent_session = db.get_session(parent)
|
||||
except Exception:
|
||||
parent_session = None
|
||||
if not parent_session:
|
||||
root = cur
|
||||
break
|
||||
parent_ended_at = parent_session.get("ended_at")
|
||||
started_at = s.get("started_at")
|
||||
is_compression_edge = (
|
||||
parent_session.get("end_reason") == "compression"
|
||||
and parent_ended_at is not None
|
||||
and started_at is not None
|
||||
and started_at >= parent_ended_at
|
||||
)
|
||||
if not is_compression_edge:
|
||||
root = cur
|
||||
break
|
||||
cur = parent
|
||||
for node in chain:
|
||||
root_cache[node] = root
|
||||
return root
|
||||
|
||||
tip_cache: dict = {}
|
||||
|
||||
def lineage_tip(root_id: str) -> str:
|
||||
if root_id in tip_cache:
|
||||
return tip_cache[root_id]
|
||||
tip = root_id
|
||||
try:
|
||||
resolved = db.get_compression_tip(root_id)
|
||||
if resolved:
|
||||
tip = resolved
|
||||
except Exception:
|
||||
pass
|
||||
tip_cache[root_id] = tip
|
||||
return tip
|
||||
|
||||
# Keep the best (first / most relevant) hit per compression root.
|
||||
seen: dict = {}
|
||||
for m in matches:
|
||||
sid = m["session_id"]
|
||||
if sid not in seen:
|
||||
seen[sid] = {
|
||||
"session_id": sid,
|
||||
"snippet": m.get("snippet", ""),
|
||||
"role": m.get("role"),
|
||||
"source": m.get("source"),
|
||||
"model": m.get("model"),
|
||||
"session_started": m.get("session_started"),
|
||||
}
|
||||
raw_sid = m["session_id"]
|
||||
root = compression_root(raw_sid)
|
||||
if root in seen:
|
||||
continue
|
||||
seen[root] = {
|
||||
"session_id": lineage_tip(root),
|
||||
"lineage_root": root,
|
||||
"snippet": m.get("snippet", ""),
|
||||
"role": m.get("role"),
|
||||
"source": m.get("source"),
|
||||
"model": m.get("model"),
|
||||
"session_started": m.get("session_started"),
|
||||
}
|
||||
if len(seen) >= limit:
|
||||
break
|
||||
return {"results": list(seen.values())}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
@ -428,6 +428,78 @@ class TestWebServerEndpoints:
|
||||
tip = next(r for r in rows if r["id"] == "tip-new")
|
||||
assert tip.get("_lineage_root_id") == "root-old"
|
||||
|
||||
def test_search_dedupes_compression_lineage_to_tip(self):
|
||||
"""A conversation that auto-compresses leaves the matched term in both
|
||||
the root segment and the continuation. Search must collapse them to a
|
||||
single result keyed by the lineage root and pointing at the live tip,
|
||||
so the sidebar stops showing the same chat several times."""
|
||||
import time as _time
|
||||
|
||||
from hermes_state import SessionDB
|
||||
|
||||
db = SessionDB()
|
||||
try:
|
||||
db.create_session(session_id="search-root", source="cli")
|
||||
db.append_message(session_id="search-root", role="user", content="distinctneedle in the root")
|
||||
db.end_session("search-root", "compression")
|
||||
now = _time.time()
|
||||
db._conn.execute(
|
||||
"UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = ?",
|
||||
(now - 100, now - 90, "search-root"),
|
||||
)
|
||||
db.create_session(session_id="search-tip", source="cli", parent_session_id="search-root")
|
||||
db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = ?", (now - 90, "search-tip"))
|
||||
db.append_message(session_id="search-tip", role="user", content="distinctneedle again in the tip")
|
||||
db._conn.commit()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
resp = self.client.get("/api/sessions/search?q=distinctneedle")
|
||||
assert resp.status_code == 200
|
||||
results = resp.json()["results"]
|
||||
|
||||
lineage_hits = [r for r in results if r.get("lineage_root") == "search-root"]
|
||||
# One conversation -> exactly one result despite two FTS hits.
|
||||
assert len(lineage_hits) == 1
|
||||
hit = lineage_hits[0]
|
||||
# Surfaced under the live tip so clicking resumes the current session.
|
||||
assert hit["session_id"] == "search-tip"
|
||||
assert hit["lineage_root"] == "search-root"
|
||||
|
||||
def test_search_keeps_branch_specific_hits_on_branch(self):
|
||||
"""Branch sessions share parent_session_id, but they are not compression
|
||||
continuations. A query that only exists in the branch must open the
|
||||
branch instead of being collapsed back to the parent/root."""
|
||||
import time as _time
|
||||
|
||||
from hermes_state import SessionDB
|
||||
|
||||
db = SessionDB()
|
||||
try:
|
||||
now = _time.time()
|
||||
db.create_session(session_id="branch-parent", source="cli")
|
||||
db.append_message(session_id="branch-parent", role="user", content="ancestor context")
|
||||
db.end_session("branch-parent", "branched")
|
||||
db._conn.execute(
|
||||
"UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = ?",
|
||||
(now - 100, now - 90, "branch-parent"),
|
||||
)
|
||||
db.create_session(session_id="branch-child", source="cli", parent_session_id="branch-parent")
|
||||
db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = ?", (now - 80, "branch-child"))
|
||||
db.append_message(session_id="branch-child", role="user", content="branchspecificneedle only here")
|
||||
db._conn.commit()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
resp = self.client.get("/api/sessions/search?q=branchspecificneedle")
|
||||
assert resp.status_code == 200
|
||||
results = resp.json()["results"]
|
||||
|
||||
assert any(
|
||||
r["session_id"] == "branch-child" and r.get("lineage_root") == "branch-child"
|
||||
for r in results
|
||||
)
|
||||
|
||||
def test_get_sessions_archived_is_boolean(self):
|
||||
from hermes_state import SessionDB
|
||||
|
||||
|
||||
Reference in New Issue
Block a user