fix(tui_gateway): dedup re-queued process notifications flooding TUI
_ notification_poller_loop_ re-emits status.update every cycle when a background process completes while the session is busy. The same completion event gets re-queued and re-emitted to the TUI every few ms, flooding the transcript with duplicate lines. Add _notification_event_dedup_key(evt) that returns a tuple identity for each notification event. Only emit status.update on first sight per identity: - completions: (sid, type) — one-shot per process session - watch_match: (sid, type, command, pattern, output, ...) - watch_overflow/disabled: (sid, type, command, message, ...) The dedup key design was refined from an initial sid:type approach after @lordbuffcloud identified that distinct watch_match events (READY vs DONE) for the same process would be incorrectly collapsed. Tests from @tymrtn cover distinct watch matches, exact replay dedup, and completion one-shot behavior. Co-authored-by: tymrtn <ty@tmrtn.com>
This commit is contained in:
@ -5474,6 +5474,8 @@ def test_notification_poller_requeues_when_busy(monkeypatch):
|
||||
assert requeued["session_id"] == "proc_busy_test"
|
||||
finally:
|
||||
server._sessions.pop("sid_busy", None)
|
||||
while not process_registry.completion_queue.empty():
|
||||
process_registry.completion_queue.get_nowait()
|
||||
|
||||
|
||||
def test_session_save_writes_under_hermes_home_with_system_prompt(monkeypatch, tmp_path):
|
||||
@ -5533,3 +5535,95 @@ def test_session_save_writes_under_hermes_home_with_system_prompt(monkeypatch, t
|
||||
assert payload["session_start"] == "2026-01-01T12:00:00"
|
||||
assert payload["system_prompt"] == "You are Hermes."
|
||||
assert payload["messages"] == history
|
||||
|
||||
|
||||
def test_notification_event_dedup_key_preserves_distinct_watch_matches():
|
||||
"""Watch-match identity includes match content, not just session/type."""
|
||||
base = {
|
||||
"type": "watch_match",
|
||||
"session_id": "proc_watch",
|
||||
"command": "tail -f app.log",
|
||||
"pattern": "READY",
|
||||
"output": "READY on port 8000",
|
||||
"suppressed": 0,
|
||||
}
|
||||
|
||||
identical = dict(base)
|
||||
distinct_output = {**base, "output": "READY on port 9000"}
|
||||
distinct_pattern = {**base, "pattern": "MIGRATION_DONE"}
|
||||
|
||||
base_key = server._notification_event_dedup_key(base)
|
||||
assert server._notification_event_dedup_key(identical) == base_key
|
||||
assert server._notification_event_dedup_key(distinct_output) != base_key
|
||||
assert server._notification_event_dedup_key(distinct_pattern) != base_key
|
||||
|
||||
|
||||
def test_notification_poller_emits_distinct_watch_matches_once(monkeypatch):
|
||||
"""Distinct watch matches from one process emit; exact replay is deduped."""
|
||||
from tools.process_registry import process_registry
|
||||
|
||||
turns = []
|
||||
emitted = []
|
||||
|
||||
def _fake_run_prompt_submit(rid, sid, session, text):
|
||||
turns.append(text)
|
||||
with session["history_lock"]:
|
||||
session["running"] = False
|
||||
|
||||
sess = _session()
|
||||
server._sessions["sid_watch_dedup"] = sess
|
||||
monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a))
|
||||
monkeypatch.setattr(server, "_run_prompt_submit", _fake_run_prompt_submit)
|
||||
|
||||
while not process_registry.completion_queue.empty():
|
||||
process_registry.completion_queue.get_nowait()
|
||||
|
||||
base = {
|
||||
"type": "watch_match",
|
||||
"session_id": "proc_watch_dedup",
|
||||
"command": "tail -f app.log",
|
||||
"pattern": "READY",
|
||||
"output": "READY on port 8000",
|
||||
"suppressed": 0,
|
||||
}
|
||||
process_registry.completion_queue.put(base)
|
||||
process_registry.completion_queue.put({**base, "output": "READY on port 9000"})
|
||||
process_registry.completion_queue.put(dict(base))
|
||||
|
||||
stop = threading.Event()
|
||||
stop.set()
|
||||
|
||||
try:
|
||||
server._notification_poller_loop(stop, "sid_watch_dedup", sess)
|
||||
status_calls = [a for a in emitted if a[0] == "status.update"]
|
||||
assert len(status_calls) == 2
|
||||
status_text = "\n".join(call[2]["text"] for call in status_calls)
|
||||
assert "READY on port 8000" in status_text
|
||||
assert "READY on port 9000" in status_text
|
||||
assert len(turns) == 3
|
||||
finally:
|
||||
server._sessions.pop("sid_watch_dedup", None)
|
||||
while not process_registry.completion_queue.empty():
|
||||
process_registry.completion_queue.get_nowait()
|
||||
|
||||
|
||||
def test_notification_event_dedup_key_keeps_completions_one_shot():
|
||||
"""Completion identity remains process-session scoped to avoid floods."""
|
||||
first = {
|
||||
"type": "completion",
|
||||
"session_id": "proc_done",
|
||||
"command": "make build",
|
||||
"exit_code": 0,
|
||||
"output": "first output",
|
||||
}
|
||||
replay = {
|
||||
"type": "completion",
|
||||
"session_id": "proc_done",
|
||||
"command": "make build --again",
|
||||
"exit_code": 1,
|
||||
"output": "different output should not change completion key",
|
||||
}
|
||||
|
||||
assert server._notification_event_dedup_key(first) == server._notification_event_dedup_key(
|
||||
replay
|
||||
)
|
||||
|
||||
@ -4109,6 +4109,38 @@ def _notification_event_belongs_elsewhere(session: dict, evt: dict) -> bool:
|
||||
)
|
||||
|
||||
|
||||
def _notification_event_dedup_key(evt: dict) -> tuple:
|
||||
"""Return the UI-emission identity for a process notification event.
|
||||
|
||||
Completion events are terminal notifications for a background process, so
|
||||
they remain one-shot per process session. Watch-match events are not
|
||||
terminal: a single background process can legitimately match the same or
|
||||
different patterns many times, so include event-specific content to avoid
|
||||
suppressing later distinct matches from the same process.
|
||||
"""
|
||||
evt_type = evt.get("type", "completion")
|
||||
evt_sid = evt.get("session_id", "")
|
||||
if evt_type == "watch_match":
|
||||
return (
|
||||
evt_sid,
|
||||
evt_type,
|
||||
evt.get("command", ""),
|
||||
evt.get("pattern", ""),
|
||||
evt.get("output", ""),
|
||||
evt.get("suppressed", 0),
|
||||
evt.get("message_id", ""),
|
||||
)
|
||||
if evt_type.startswith("watch_overflow_") or evt_type == "watch_disabled":
|
||||
return (
|
||||
evt_sid,
|
||||
evt_type,
|
||||
evt.get("command", ""),
|
||||
evt.get("message", ""),
|
||||
evt.get("suppressed", 0),
|
||||
)
|
||||
return (evt_sid, evt_type)
|
||||
|
||||
|
||||
def _notification_poller_loop(
|
||||
stop_event: threading.Event, sid: str, session: dict
|
||||
) -> None:
|
||||
@ -4125,6 +4157,7 @@ def _notification_poller_loop(
|
||||
"""
|
||||
from tools.process_registry import process_registry, format_process_notification
|
||||
|
||||
_emitted = set() # dedup re-queued events so same completion isn't emitted 50 times while session is busy
|
||||
while not stop_event.is_set() and not session.get("_finalized"):
|
||||
try:
|
||||
evt = process_registry.completion_queue.get(timeout=0.5)
|
||||
@ -4149,7 +4182,14 @@ def _notification_poller_loop(
|
||||
if not text:
|
||||
continue
|
||||
|
||||
_emit("status.update", sid, {"kind": "process", "text": text})
|
||||
# Only emit the same notification identity to TUI once — re-queued
|
||||
# completions get re-emitted every 0.5s otherwise when session is busy,
|
||||
# while distinct watch_match events from the same process must remain
|
||||
# visible independently.
|
||||
_dedup_key = _notification_event_dedup_key(evt)
|
||||
if _dedup_key not in _emitted:
|
||||
_emit("status.update", sid, {"kind": "process", "text": text})
|
||||
_emitted.add(_dedup_key)
|
||||
|
||||
with session["history_lock"]:
|
||||
if session.get("running"):
|
||||
@ -4189,7 +4229,10 @@ def _notification_poller_loop(
|
||||
if not text:
|
||||
continue
|
||||
|
||||
_emit("status.update", sid, {"kind": "process", "text": text})
|
||||
_dedup_key = _notification_event_dedup_key(evt)
|
||||
if _dedup_key not in _emitted:
|
||||
_emit("status.update", sid, {"kind": "process", "text": text})
|
||||
_emitted.add(_dedup_key)
|
||||
|
||||
with session["history_lock"]:
|
||||
if session.get("running"):
|
||||
|
||||
Reference in New Issue
Block a user