From e7a7872a874837ca36105ca3e90db464cf7c125a Mon Sep 17 00:00:00 2001 From: flooryyyy <67979730+flooryyyy@users.noreply.github.com> Date: Wed, 27 May 2026 17:48:49 +0100 Subject: [PATCH] fix(tui_gateway): dedup re-queued process notifications flooding TUI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _ notification_poller_loop_ re-emits status.update every cycle when a background process completes while the session is busy. The same completion event gets re-queued and re-emitted to the TUI every few ms, flooding the transcript with duplicate lines. Add _notification_event_dedup_key(evt) that returns a tuple identity for each notification event. Only emit status.update on first sight per identity: - completions: (sid, type) — one-shot per process session - watch_match: (sid, type, command, pattern, output, ...) - watch_overflow/disabled: (sid, type, command, message, ...) The dedup key design was refined from an initial sid:type approach after @lordbuffcloud identified that distinct watch_match events (READY vs DONE) for the same process would be incorrectly collapsed. Tests from @tymrtn cover distinct watch matches, exact replay dedup, and completion one-shot behavior. Co-authored-by: tymrtn --- tests/test_tui_gateway_server.py | 94 ++++++++++++++++++++++++++++++++ tui_gateway/server.py | 47 +++++++++++++++- 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index ef94dc27a..7899a6de4 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -5474,6 +5474,8 @@ def test_notification_poller_requeues_when_busy(monkeypatch): assert requeued["session_id"] == "proc_busy_test" finally: server._sessions.pop("sid_busy", None) + while not process_registry.completion_queue.empty(): + process_registry.completion_queue.get_nowait() def test_session_save_writes_under_hermes_home_with_system_prompt(monkeypatch, tmp_path): @@ -5533,3 +5535,95 @@ def test_session_save_writes_under_hermes_home_with_system_prompt(monkeypatch, t assert payload["session_start"] == "2026-01-01T12:00:00" assert payload["system_prompt"] == "You are Hermes." assert payload["messages"] == history + + +def test_notification_event_dedup_key_preserves_distinct_watch_matches(): + """Watch-match identity includes match content, not just session/type.""" + base = { + "type": "watch_match", + "session_id": "proc_watch", + "command": "tail -f app.log", + "pattern": "READY", + "output": "READY on port 8000", + "suppressed": 0, + } + + identical = dict(base) + distinct_output = {**base, "output": "READY on port 9000"} + distinct_pattern = {**base, "pattern": "MIGRATION_DONE"} + + base_key = server._notification_event_dedup_key(base) + assert server._notification_event_dedup_key(identical) == base_key + assert server._notification_event_dedup_key(distinct_output) != base_key + assert server._notification_event_dedup_key(distinct_pattern) != base_key + + +def test_notification_poller_emits_distinct_watch_matches_once(monkeypatch): + """Distinct watch matches from one process emit; exact replay is deduped.""" + from tools.process_registry import process_registry + + turns = [] + emitted = [] + + def _fake_run_prompt_submit(rid, sid, session, text): + turns.append(text) + with session["history_lock"]: + session["running"] = False + + sess = _session() + server._sessions["sid_watch_dedup"] = sess + monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a)) + monkeypatch.setattr(server, "_run_prompt_submit", _fake_run_prompt_submit) + + while not process_registry.completion_queue.empty(): + process_registry.completion_queue.get_nowait() + + base = { + "type": "watch_match", + "session_id": "proc_watch_dedup", + "command": "tail -f app.log", + "pattern": "READY", + "output": "READY on port 8000", + "suppressed": 0, + } + process_registry.completion_queue.put(base) + process_registry.completion_queue.put({**base, "output": "READY on port 9000"}) + process_registry.completion_queue.put(dict(base)) + + stop = threading.Event() + stop.set() + + try: + server._notification_poller_loop(stop, "sid_watch_dedup", sess) + status_calls = [a for a in emitted if a[0] == "status.update"] + assert len(status_calls) == 2 + status_text = "\n".join(call[2]["text"] for call in status_calls) + assert "READY on port 8000" in status_text + assert "READY on port 9000" in status_text + assert len(turns) == 3 + finally: + server._sessions.pop("sid_watch_dedup", None) + while not process_registry.completion_queue.empty(): + process_registry.completion_queue.get_nowait() + + +def test_notification_event_dedup_key_keeps_completions_one_shot(): + """Completion identity remains process-session scoped to avoid floods.""" + first = { + "type": "completion", + "session_id": "proc_done", + "command": "make build", + "exit_code": 0, + "output": "first output", + } + replay = { + "type": "completion", + "session_id": "proc_done", + "command": "make build --again", + "exit_code": 1, + "output": "different output should not change completion key", + } + + assert server._notification_event_dedup_key(first) == server._notification_event_dedup_key( + replay + ) diff --git a/tui_gateway/server.py b/tui_gateway/server.py index ace784135..61822b6da 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -4109,6 +4109,38 @@ def _notification_event_belongs_elsewhere(session: dict, evt: dict) -> bool: ) +def _notification_event_dedup_key(evt: dict) -> tuple: + """Return the UI-emission identity for a process notification event. + + Completion events are terminal notifications for a background process, so + they remain one-shot per process session. Watch-match events are not + terminal: a single background process can legitimately match the same or + different patterns many times, so include event-specific content to avoid + suppressing later distinct matches from the same process. + """ + evt_type = evt.get("type", "completion") + evt_sid = evt.get("session_id", "") + if evt_type == "watch_match": + return ( + evt_sid, + evt_type, + evt.get("command", ""), + evt.get("pattern", ""), + evt.get("output", ""), + evt.get("suppressed", 0), + evt.get("message_id", ""), + ) + if evt_type.startswith("watch_overflow_") or evt_type == "watch_disabled": + return ( + evt_sid, + evt_type, + evt.get("command", ""), + evt.get("message", ""), + evt.get("suppressed", 0), + ) + return (evt_sid, evt_type) + + def _notification_poller_loop( stop_event: threading.Event, sid: str, session: dict ) -> None: @@ -4125,6 +4157,7 @@ def _notification_poller_loop( """ from tools.process_registry import process_registry, format_process_notification + _emitted = set() # dedup re-queued events so same completion isn't emitted 50 times while session is busy while not stop_event.is_set() and not session.get("_finalized"): try: evt = process_registry.completion_queue.get(timeout=0.5) @@ -4149,7 +4182,14 @@ def _notification_poller_loop( if not text: continue - _emit("status.update", sid, {"kind": "process", "text": text}) + # Only emit the same notification identity to TUI once — re-queued + # completions get re-emitted every 0.5s otherwise when session is busy, + # while distinct watch_match events from the same process must remain + # visible independently. + _dedup_key = _notification_event_dedup_key(evt) + if _dedup_key not in _emitted: + _emit("status.update", sid, {"kind": "process", "text": text}) + _emitted.add(_dedup_key) with session["history_lock"]: if session.get("running"): @@ -4189,7 +4229,10 @@ def _notification_poller_loop( if not text: continue - _emit("status.update", sid, {"kind": "process", "text": text}) + _dedup_key = _notification_event_dedup_key(evt) + if _dedup_key not in _emitted: + _emit("status.update", sid, {"kind": "process", "text": text}) + _emitted.add(_dedup_key) with session["history_lock"]: if session.get("running"):