feat(cli): ranked fuzzy search in the curses model picker

Wires the salvaged search helpers into the shared curses menu driver and
turns on type-to-filter for the CLI model pickers (the 100+ model lists
that previously required scrolling).

- Search lives in the shared `_run_curses_menu` driver behind a
  `searchable` flag + `search_labels`, so both `curses_radiolist` and
  `curses_single_select` get it without per-menu duplication. `/` opens
  the filter, BACKSPACE edits, Ctrl+U clears, ESC clears the filter then
  cancels. Returned values are always original item indices.
- `_filter_indices` RANKS matches (best-first) via a Python port of the
  TS scorer in ui-tui/src/lib/fuzzy.ts and web/src/lib/fuzzy.ts. The port
  is byte-identical in score: same per-char bonuses, prefix (+8) and
  exact (+20) bonuses, camelCase/word-boundary detection (matching on the
  lowercased target, boundary on the original case), and the -len*0.01
  length tiebreak — so the CLI, TUI, and WebUI rank results identically.
  A cross-language parity test pins the exact scores.
- `_prompt_model_selection` (the canonical picker across the model flows)
  and the custom-provider model list pass `searchable=True`.
- Split `_decode_menu_key` out of `read_menu_key` so the search loop can
  peek the raw key (catch `/`) before nav decoding.
- ESC during active search now clears the query (restores the full list)
  so a no-match filter can't strand the user; printable-key capture is
  restricted to ASCII to avoid Latin-1 mojibake.
- Update two setup-menu tests whose mock signatures predate the new
  `searchable` kwarg; add ranked-scorer + parity + state-machine tests.
This commit is contained in:
kshitijk4poor
2026-06-01 23:18:25 +05:30
committed by Teknium
parent 53f598e7a2
commit 0fdab53ef0
5 changed files with 375 additions and 33 deletions

View File

@ -6165,6 +6165,7 @@ def _prompt_model_selection(
selected=default_idx, selected=default_idx,
cancel_returns=-1, cancel_returns=-1,
description=description, description=description,
searchable=True,
) )
if idx < 0: if idx < 0:
return None return None

View File

@ -33,14 +33,131 @@ def _query_matches(label: str, query: str) -> bool:
return True return True
_WORD_BOUNDARY = frozenset("-_/. ")
def _is_boundary(target: str, index: int) -> bool:
"""True if position ``index`` in ``target`` starts a word.
Mirrors ``isBoundary`` in the TS scorer: start-of-string, after a
separator char, or a lower->upper camelCase transition.
"""
if index == 0:
return True
prev = target[index - 1]
if prev in _WORD_BOUNDARY:
return True
# camelCase / lower->upper transition (e.g. the `O` in `gptO`).
cur = target[index]
return prev == prev.lower() and cur != cur.lower() and cur == cur.upper()
def _token_score(orig: str, lower: str, token: str) -> float | None:
"""Score one token against a target. None if the token isn't a subsequence.
A faithful port of ``fuzzyScore`` in ui-tui/src/lib/fuzzy.ts and
web/src/lib/fuzzy.ts so all three surfaces rank model ids identically:
contiguous runs, word-boundary / first-char starts, prefix matches, and
exact matches all score higher than scattered subsequence hits.
``lower`` is ``orig`` lowercased; matching is done against ``lower`` while
boundary detection uses ``orig`` (so the camelCase rule works), exactly as
in the TS scorer.
"""
score = 0.0
prev = -1
search_from = 0
positions: list[int] = []
for ch in token:
idx = lower.find(ch, search_from)
if idx < 0:
return None
positions.append(idx)
score += 1
if prev >= 0 and idx == prev + 1:
score += 5
elif prev >= 0:
score -= min(idx - prev - 1, 3)
if _is_boundary(orig, idx):
score += 3
if idx == 0:
score += 5
prev = idx
search_from = idx + 1
# Prefix bonus: the token matched a contiguous prefix of the target.
if positions and positions[0] == 0 and positions[-1] == len(positions) - 1:
score += 8
# Exact full match dominates everything else.
if lower == token:
score += 20
# Slightly prefer shorter targets when scores are otherwise close.
score -= len(lower) * 0.01
return score
def _fuzzy_score(label: str, query: str) -> float | None:
"""Aggregate score for a multi-token query (AND). None if any token fails.
Mirrors ``fuzzyScoreMulti`` in the TS scorer: every whitespace-separated
token must match; per-token scores are summed.
"""
lower = label.lower()
tokens = query.lower().split()
if not tokens:
return 0.0
total = 0.0
for token in tokens:
token_score = _token_score(label, lower, token)
if token_score is None:
return None
total += token_score
return total
def _filter_indices(items: List[str], query: str) -> List[int]: def _filter_indices(items: List[str], query: str) -> List[int]:
"""Return original item indices matching *query*, preserving list order.""" """Return item indices matching *query*, ranked best-first.
An empty query keeps every item in original order. Otherwise items are
filtered to fuzzy matches and sorted by score descending, ties broken by
original index so equal-scoring rows keep their catalog order.
"""
q = query.strip() q = query.strip()
if not q: if not q:
return list(range(len(items))) return list(range(len(items)))
return [i for i, label in enumerate(items) if _query_matches(label, q)] scored = []
for i, label in enumerate(items):
score = _fuzzy_score(label, q)
if score is not None:
scored.append((i, score))
scored.sort(key=lambda pair: (-pair[1], pair[0]))
return [i for i, _ in scored]
@dataclass @dataclass
@ -98,8 +215,13 @@ def _handle_active_search_key(
return False, False, False return False, False, False
if key == 27: if key == 27:
# Esc stops search AND clears the query, restoring the full list (so a
# no-match filter can't strand the user on an empty list). Signals
# `changed` when there was a query so the driver resets scroll/cursor.
had_query = bool(search.query)
search.active = False search.active = False
return True, False, False search.query = ""
return True, False, had_query
if key in (curses_mod.KEY_BACKSPACE, 127, 8): if key in (curses_mod.KEY_BACKSPACE, 127, 8):
search.query = search.query[:-1] search.query = search.query[:-1]
@ -112,7 +234,7 @@ def _handle_active_search_key(
if key in (curses_mod.KEY_ENTER, 10, 13): if key in (curses_mod.KEY_ENTER, 10, 13):
return True, True, False return True, True, False
if 0 <= key < 256 and chr(key).isprintable(): if 32 <= key < 127: # printable ASCII; avoids Latin-1 mojibake from 128-255
search.query += chr(key) search.query += chr(key)
return True, False, True return True, False, True
@ -167,9 +289,16 @@ def read_menu_key(stdscr) -> str:
the escape path; ``q`` also cancels. Unknown sequences map to the escape path; ``q`` also cancels. Unknown sequences map to
``NAV_NONE`` so the caller simply ignores them rather than misfiring. ``NAV_NONE`` so the caller simply ignores them rather than misfiring.
""" """
import curses return _decode_menu_key(stdscr, stdscr.getch())
key = stdscr.getch()
def _decode_menu_key(stdscr, key: int) -> str:
"""Normalize an already-read keypress to a menu action.
Split out from ``read_menu_key`` so search-aware loops can peek the raw
key (e.g. to catch ``/``) before falling back to nav decoding.
"""
import curses
if key in (curses.KEY_UP, ord("k")): if key in (curses.KEY_UP, ord("k")):
return NAV_UP return NAV_UP
@ -230,6 +359,8 @@ def _run_curses_menu(
extra_color_pairs=False, extra_color_pairs=False,
fallback, fallback,
cancel_value, cancel_value,
searchable=False,
search_labels=None,
): ):
"""Shared curses single-/multi-select event loop. """Shared curses single-/multi-select event loop.
@ -244,9 +375,12 @@ def _run_curses_menu(
Callbacks / params: Callbacks / params:
draw_header(stdscr, max_y, max_x) -> int draw_header(stdscr, max_y, max_x) -> int
Draw the title/hint/description rows. Returns the first screen row Draw the title/hint/description rows. Returns the first screen row
index where the scrollable item list should start. index where the scrollable item list should start. When search is
active it receives the live ``_SearchState`` via the optional
``search`` keyword (drawn by the menu so the hint line can show it).
draw_row(stdscr, y, idx, is_cursor, max_x) -> None draw_row(stdscr, y, idx, is_cursor, max_x) -> None
Draw one item row. Draw one item row. ``idx`` is always the ORIGINAL item index, so
per-menu rendering is unchanged whether or not a filter is active.
on_action(action, cursor) -> value on_action(action, cursor) -> value
Reducer for SELECT/TOGGLE/CANCEL. Return ``_KEEP`` to continue the Reducer for SELECT/TOGGLE/CANCEL. Return ``_KEEP`` to continue the
loop; return anything else to resolve the menu with that value. loop; return anything else to resolve the menu with that value.
@ -260,6 +394,10 @@ def _run_curses_menu(
fallback() -> value fallback() -> value
Called when curses errors out on a real TTY (curses unavailable). Called when curses errors out on a real TTY (curses unavailable).
cancel_value: returned on non-TTY stdin, ESC/cancel, or KeyboardInterrupt. cancel_value: returned on non-TTY stdin, ESC/cancel, or KeyboardInterrupt.
searchable: when true, ``/`` opens a type-to-filter prompt over
``search_labels``. Returned values are always ORIGINAL item indices.
search_labels: per-item text used for filtering (required when
``searchable`` is true; length must equal ``item_count``).
""" """
# Non-TTY (piped/redirected stdin): curses and input() both hang or spin, # Non-TTY (piped/redirected stdin): curses and input() both hang or spin,
# so return the cancel value directly — matching the pre-refactor guard in # so return the cancel value directly — matching the pre-refactor guard in
@ -267,6 +405,8 @@ def _run_curses_menu(
if not sys.stdin.isatty(): if not sys.stdin.isatty():
return cancel_value return cancel_value
use_search = searchable and search_labels is not None and len(search_labels) == item_count
try: try:
import curses import curses
result_holder = [_KEEP] result_holder = [_KEEP]
@ -284,22 +424,46 @@ def _run_curses_menu(
) )
cursor = initial_cursor cursor = initial_cursor
scroll_offset = 0 scroll_offset = 0
search = _SearchState()
# Non-None labels for filtering; empty when search is disabled so
# _filter_indices stays a cheap identity range.
labels: List[str] = (
search_labels if (use_search and search_labels is not None) else []
)
while True: while True:
stdscr.clear() stdscr.clear()
max_y, max_x = stdscr.getmaxyx() max_y, max_x = stdscr.getmaxyx()
items_start = draw_header(stdscr, max_y, max_x) filtered = (
_filter_indices(labels, search.query)
if use_search
else list(range(item_count))
)
cursor, cursor_pos = _reconcile_cursor(filtered, cursor)
visible_rows = max_y - items_start - reserve_bottom # draw_header accepts an optional `search` kwarg when the menu
if cursor < scroll_offset: # wants to render the live filter; tolerate headers that don't.
scroll_offset = cursor try:
elif cursor >= scroll_offset + visible_rows: items_start = draw_header(stdscr, max_y, max_x, search=search)
scroll_offset = cursor - visible_rows + 1 except TypeError:
items_start = draw_header(stdscr, max_y, max_x)
for draw_i, i in enumerate( visible_rows = max(1, max_y - items_start - reserve_bottom)
range(scroll_offset, min(item_count, scroll_offset + visible_rows)) scroll_offset = _scroll_for_cursor(
scroll_offset, cursor_pos, visible_rows, len(filtered)
)
if use_search and search.query and not filtered:
try:
stdscr.addnstr(items_start, 0, " No matches", max_x - 1, curses.A_DIM)
except curses.error:
pass
for draw_i, filtered_pos in enumerate(
range(scroll_offset, min(len(filtered), scroll_offset + visible_rows))
): ):
i = filtered[filtered_pos]
y = draw_i + items_start y = draw_i + items_start
if y >= max_y - reserve_bottom: if y >= max_y - reserve_bottom:
break break
@ -309,13 +473,46 @@ def _run_curses_menu(
draw_footer(stdscr, max_y, max_x) draw_footer(stdscr, max_y, max_x)
stdscr.refresh() stdscr.refresh()
action = read_menu_key(stdscr)
if use_search:
key = stdscr.getch()
if search.active:
# Active search consumes query-editing keys; nav keys
# fall through to be decoded below.
handled, confirm, changed = _handle_active_search_key(
curses, key, search
)
if changed:
scroll_offset = 0
cursor, cursor_pos = _reconcile_cursor(
_filter_indices(search_labels, search.query), cursor
)
if confirm:
if filtered:
outcome = on_action(NAV_SELECT, cursor)
if outcome is not _KEEP:
result_holder[0] = outcome
return
continue
if handled:
continue
action = _decode_menu_key(stdscr, key)
elif key == ord("/"):
search.active = True
continue
else:
action = _decode_menu_key(stdscr, key)
else:
action = read_menu_key(stdscr)
if action == NAV_UP: if action == NAV_UP:
cursor = (cursor - 1) % item_count cursor = _move_filtered_cursor(filtered, cursor, cursor_pos, -1)
elif action == NAV_DOWN: elif action == NAV_DOWN:
cursor = (cursor + 1) % item_count cursor = _move_filtered_cursor(filtered, cursor, cursor_pos, 1)
elif action in (NAV_SELECT, NAV_TOGGLE, NAV_CANCEL): elif action in (NAV_SELECT, NAV_TOGGLE, NAV_CANCEL):
if action == NAV_SELECT and use_search and not filtered:
continue
outcome = on_action(action, cursor) outcome = on_action(action, cursor)
if outcome is not _KEEP: if outcome is not _KEEP:
result_holder[0] = outcome result_holder[0] = outcome
@ -429,6 +626,7 @@ def curses_radiolist(
*, *,
cancel_returns: int | None = None, cancel_returns: int | None = None,
description: str | None = None, description: str | None = None,
searchable: bool = False,
) -> int: ) -> int:
"""Curses single-select radio list. Returns the selected index. """Curses single-select radio list. Returns the selected index.
@ -440,6 +638,9 @@ def curses_radiolist(
description: Optional multi-line text shown between the title and description: Optional multi-line text shown between the title and
the item list. Useful for context that should survive the the item list. Useful for context that should survive the
curses screen clear. curses screen clear.
searchable: When true, ``/`` opens a type-to-filter prompt. The
returned value is always the original item index, not a filtered
row position.
""" """
if cancel_returns is None: if cancel_returns is None:
cancel_returns = selected cancel_returns = selected
@ -448,7 +649,7 @@ def curses_radiolist(
if description: if description:
desc_lines = description.splitlines() desc_lines = description.splitlines()
def _draw_header(stdscr, max_y, max_x): def _draw_header(stdscr, max_y, max_x, search=None):
import curses import curses
row = 0 row = 0
try: try:
@ -465,11 +666,13 @@ def curses_radiolist(
stdscr.addnstr(row, 0, dline, max_x - 1, curses.A_NORMAL) stdscr.addnstr(row, 0, dline, max_x - 1, curses.A_NORMAL)
row += 1 row += 1
stdscr.addnstr( if searchable and search is not None and search.active:
row, 0, hint = f" Search: {search.query}\u258e BACKSPACE edit Ctrl+U clear ESC stop"
" \u2191\u2193 navigate ENTER/SPACE select ESC cancel", elif searchable:
max_x - 1, curses.A_DIM, hint = " \u2191\u2193 navigate ENTER/SPACE select / search ESC cancel"
) else:
hint = " \u2191\u2193 navigate ENTER/SPACE select ESC cancel"
stdscr.addnstr(row, 0, hint, max_x - 1, curses.A_DIM)
row += 1 row += 1
except curses.error: except curses.error:
pass pass
@ -505,6 +708,8 @@ def curses_radiolist(
reserve_bottom=1, reserve_bottom=1,
fallback=lambda: _radio_numbered_fallback(title, items, selected, cancel_returns), fallback=lambda: _radio_numbered_fallback(title, items, selected, cancel_returns),
cancel_value=cancel_returns, cancel_value=cancel_returns,
searchable=searchable,
search_labels=list(items) if searchable else None,
) )
@ -540,27 +745,33 @@ def curses_single_select(
default_index: int = 0, default_index: int = 0,
*, *,
cancel_label: str = "Cancel", cancel_label: str = "Cancel",
searchable: bool = False,
) -> int | None: ) -> int | None:
"""Curses single-select menu. Returns selected index or None on cancel. """Curses single-select menu. Returns selected index or None on cancel.
Works inside prompt_toolkit because curses.wrapper() restores the terminal Works inside prompt_toolkit because curses.wrapper() restores the terminal
safely, unlike simple_term_menu which conflicts with /dev/tty. safely, unlike simple_term_menu which conflicts with /dev/tty.
When ``searchable`` is true, ``/`` opens a type-to-filter prompt; the
returned value is always the original item index (or None for cancel).
""" """
all_items = list(items) + [cancel_label] all_items = list(items) + [cancel_label]
cancel_idx = len(items) cancel_idx = len(items)
def _draw_header(stdscr, max_y, max_x): def _draw_header(stdscr, max_y, max_x, search=None):
import curses import curses
try: try:
hattr = curses.A_BOLD hattr = curses.A_BOLD
if curses.has_colors(): if curses.has_colors():
hattr |= curses.color_pair(2) hattr |= curses.color_pair(2)
stdscr.addnstr(0, 0, title, max_x - 1, hattr) stdscr.addnstr(0, 0, title, max_x - 1, hattr)
stdscr.addnstr( if searchable and search is not None and search.active:
1, 0, hint = f" Search: {search.query}\u258e BACKSPACE edit Ctrl+U clear ESC stop"
" ↑↓ navigate ENTER confirm ESC/q cancel", elif searchable:
max_x - 1, curses.A_DIM, hint = " ↑↓ navigate ENTER confirm / search ESC/q cancel"
) else:
hint = " ↑↓ navigate ENTER confirm ESC/q cancel"
stdscr.addnstr(1, 0, hint, max_x - 1, curses.A_DIM)
except curses.error: except curses.error:
pass pass
return 3 return 3
@ -597,6 +808,8 @@ def curses_single_select(
reserve_bottom=1, reserve_bottom=1,
fallback=lambda: _numbered_single_fallback(title, all_items, cancel_idx), fallback=lambda: _numbered_single_fallback(title, all_items, cancel_idx),
cancel_value=None, cancel_value=None,
searchable=searchable,
search_labels=list(all_items) if searchable else None,
) )

View File

@ -4575,6 +4575,7 @@ def _model_flow_named_custom(config, provider_info):
menu_items, menu_items,
selected=default_idx, selected=default_idx,
cancel_returns=-1, cancel_returns=-1,
searchable=True,
) )
print() print()
if idx < 0 or idx >= len(models): if idx < 0 or idx >= len(models):

View File

@ -0,0 +1,127 @@
"""Tests for the ranked fuzzy scorer used by the searchable curses pickers."""
from hermes_cli.curses_ui import (
_SearchState,
_filter_indices,
_fuzzy_score,
_handle_active_search_key,
_is_boundary,
_token_score,
)
class _FakeCurses:
KEY_BACKSPACE = 263
KEY_DOWN = 258
KEY_ENTER = 343
def test_fuzzy_score_matches_subsequence():
assert _fuzzy_score("gpt-4o", "g4o") is not None
assert _fuzzy_score("gpt-4o", "4o") is not None
assert _fuzzy_score("gpt-4o", "o4g") is None
assert _fuzzy_score("gpt-4o", "xyz") is None
def test_scorer_matches_typescript_reference():
"""Score parity with ui-tui/web fuzzy.ts. These exact values are produced
by the TS fuzzyScoreMulti for the same inputs (verified via a cross-language
harness); keep the Python port byte-identical so all three surfaces rank
consistently. If you change the scoring constants, update the TS copies too.
"""
cases = {
("gpt-4o", "g4o"): 15.94,
("gpt-4o", "gpt"): 28.94,
("claude-sonnet-4", "sonnet"): 33.85,
("claude-sonnet-4", "clad snnt"): 30.70,
("GptO", "gpto"): 57.96, # camelCase boundary on the original-case 'O'
}
for (label, query), expected in cases.items():
score = _fuzzy_score(label, query)
assert score is not None
assert round(score, 2) == expected, f"{label!r}/{query!r}: {score} != {expected}"
def test_is_boundary_camelcase_and_separators():
assert _is_boundary("gpt-4o", 0) is True # start
assert _is_boundary("gpt-4o", 4) is True # after '-'
assert _is_boundary("gpt-4o", 2) is False # mid-word
assert _is_boundary("GptO", 3) is True # lower->upper transition
def test_token_score_takes_orig_and_lower():
# Exact match (lower == token) earns the +20 bonus over a prefix.
exact = _token_score("sonnet", "sonnet", "sonnet")
prefix = _token_score("sonnet-x", "sonnet-x", "sonnet")
assert exact is not None and prefix is not None
assert exact > prefix
def test_esc_clears_query_and_signals_changed():
# Esc during active search clears the filter (restores full list) and
# signals `changed` so the driver resets scroll/cursor.
search = _SearchState(active=True, query="gpt")
handled, confirm, changed = _handle_active_search_key(_FakeCurses, 27, search)
assert (handled, confirm, changed) == (True, False, True)
assert search.active is False
assert search.query == ""
# Esc with no query: still stops search, but nothing changed.
search2 = _SearchState(active=True, query="")
assert _handle_active_search_key(_FakeCurses, 27, search2) == (True, False, False)
def test_high_byte_keys_ignored():
# Bytes 128-255 must NOT append Latin-1 mojibake to the query.
search = _SearchState(active=True, query="ab")
handled, _, changed = _handle_active_search_key(_FakeCurses, 200, search)
assert (handled, changed) == (False, False)
assert search.query == "ab"
def test_fuzzy_score_empty_query_is_zero():
assert _fuzzy_score("anything", "") == 0
assert _fuzzy_score("anything", " ") == 0
def test_fuzzy_score_prefix_beats_scattered():
prefix = _fuzzy_score("gpt-4o-mini", "gpt")
scattered = _fuzzy_score("a-g-p-t", "gpt")
assert prefix is not None and scattered is not None
assert prefix > scattered
def test_fuzzy_score_exact_and_shorter_rank_higher():
exact = _fuzzy_score("sonnet", "sonnet")
longer = _fuzzy_score("sonnet-extended", "sonnet")
assert exact is not None and longer is not None
# Same prefix match, but the shorter id wins on the length tiebreak.
assert exact > longer
def test_filter_indices_ranks_best_first():
models = ["gpt-4o", "gpt-4o-mini", "claude-sonnet-4", "claude-haiku", "o1-preview"]
# g4o matches both gpt-4o variants; the shorter exact-ish one ranks first.
ranked = _filter_indices(models, "g4o")
assert [models[i] for i in ranked] == ["gpt-4o", "gpt-4o-mini"]
# son4 surfaces the sonnet model.
assert [models[i] for i in _filter_indices(models, "son4")] == ["claude-sonnet-4"]
# Multi-token AND.
assert [models[i] for i in _filter_indices(models, "clad snnt")] == ["claude-sonnet-4"]
# No match drops everything.
assert _filter_indices(models, "zzz") == []
def test_filter_indices_blank_query_preserves_order():
models = ["b", "a", "c"]
assert _filter_indices(models, "") == [0, 1, 2]
assert _filter_indices(models, " ") == [0, 1, 2]
def test_filter_indices_stable_for_equal_scores():
# Identical labels score identically; original order is the tiebreak.
items = ["ab", "ab", "ab"]
assert _filter_indices(items, "ab") == [0, 1, 2]

View File

@ -13,7 +13,7 @@ def test_prompt_model_selection_uses_curses_radiolist():
seen = {} seen = {}
def _fake(title, items, *, selected=0, cancel_returns=None, description=None): def _fake(title, items, *, selected=0, cancel_returns=None, description=None, searchable=False):
seen["title"] = title seen["title"] = title
seen["items"] = items seen["items"] = items
return 1 # pick second model return 1 # pick second model
@ -67,7 +67,7 @@ def test_model_selection_with_pricing_passes_description():
seen = {} seen = {}
def _fake(title, items, *, selected=0, cancel_returns=None, description=None): def _fake(title, items, *, selected=0, cancel_returns=None, description=None, searchable=False):
seen["description"] = description seen["description"] = description
return len(items) - 1 # Skip return len(items) - 1 # Skip