diff --git a/tests/tui_gateway/test_protocol.py b/tests/tui_gateway/test_protocol.py index 1d31dd3c9..daa3a9145 100644 --- a/tests/tui_gateway/test_protocol.py +++ b/tests/tui_gateway/test_protocol.py @@ -399,6 +399,7 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch): target = "20260409_010101_abc123" created_sids: list[str] = [] + closed_sids: list[str] = [] first_agent_started = threading.Event() agent_can_finish = threading.Event() @@ -422,11 +423,20 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch): def close(self): pass + class _Agent: + def __init__(self, sid, session_id): + self.sid = sid + self.model = "test/model" + self.session_id = session_id + + def close(self): + closed_sids.append(self.sid) + def make_agent(sid, key, session_id=None): created_sids.append(sid) first_agent_started.set() assert agent_can_finish.wait(timeout=1) - return types.SimpleNamespace(model="test/model", session_id=session_id or key) + return _Agent(sid, session_id or key) monkeypatch.setattr(server, "_get_db", lambda: _DB()) monkeypatch.setattr(server, "_make_agent", make_agent) @@ -490,10 +500,18 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch): assert "error" not in first assert "error" not in second + # Both resumes resolve to the SAME single live session — the core invariant. assert second["result"]["session_id"] == first["result"]["session_id"] assert len(server._sessions) == 1 assert [s.get("session_key") for s in server._sessions.values()].count(target) == 1 - assert created_sids == [first["result"]["session_id"]] + winner = first["result"]["session_id"] + # The agent build happens outside the resume lock, so a racing resume may + # build a redundant agent; double-checked locking keeps only one live + # session and closes any loser's agent (no worker/poller is wired for it). + assert winner in created_sids + survivors = [sid for sid in created_sids if sid not in closed_sids] + assert survivors == [winner] + assert all(sid == winner for sid in server._sessions) def test_session_resume_live_payload_uses_current_history_with_ancestors(server, monkeypatch): diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 113e29a1a..5ac7ccf5d 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -2994,6 +2994,7 @@ def _(rid, params: dict) -> dict: target = found["id"] else: return _err(rid, 4007, "session not found") + # Fast path: if the session is already live, reuse it under the lock. with _session_resume_lock: live = _find_live_session_by_key(target) if live is not None: @@ -3008,44 +3009,73 @@ def _(rid, params: dict) -> dict: payload["resumed"] = target return _ok(rid, payload) - sid = uuid.uuid4().hex[:8] - _enable_gateway_prompts() + # Build the agent OUTSIDE the lock — _make_agent can block for seconds + # (MCP discovery, prompt/skill build, AIAgent construction). Holding + # _session_resume_lock across it would stall session.close on the main + # dispatch thread (it's not a _LONG_HANDLER), blocking fast-path RPCs. + sid = uuid.uuid4().hex[:8] + _enable_gateway_prompts() + try: + db.reopen_session(target) + history = db.get_messages_as_conversation(target) + display_history = db.get_messages_as_conversation( + target, include_ancestors=True + ) + display_history_prefix = display_history[ + : max(0, len(display_history) - len(history)) + ] + messages = _history_to_messages(display_history) + tokens = _set_session_context(target) try: - db.reopen_session(target) - history = db.get_messages_as_conversation(target) - display_history = db.get_messages_as_conversation( - target, include_ancestors=True - ) - display_history_prefix = display_history[ - : max(0, len(display_history) - len(history)) - ] - messages = _history_to_messages(display_history) - tokens = _set_session_context(target) + agent = _make_agent(sid, target, session_id=target) + finally: + _clear_session_context(tokens) + except Exception as e: + return _err(rid, 5000, f"resume failed: {e}") + + # Double-checked locking: another concurrent resume may have created the + # live session while we were building. Re-check under the lock; if it won, + # discard our just-built agent and reuse theirs (no worker/poller wired yet). + with _session_resume_lock: + live = _find_live_session_by_key(target) + if live is not None: try: - agent = _make_agent(sid, target, session_id=target) - finally: - _clear_session_context(tokens) + if hasattr(agent, "close"): + agent.close() + except Exception: + pass + other_sid, other_session = live + payload = _live_session_payload( + other_sid, + other_session, + cols=cols, + touch=True, + transport=current_transport() or _stdio_transport, + ) + payload["resumed"] = target + return _ok(rid, payload) + try: _init_session(sid, target, agent, history, cols=cols) if sid in _sessions: _sessions[sid]["display_history_prefix"] = display_history_prefix except Exception as e: return _err(rid, 5000, f"resume failed: {e}") session = _sessions.get(sid) or {} - return _ok( - rid, - { - "session_id": sid, - "resumed": target, - "message_count": len(messages), - "messages": messages, - "info": _session_info(agent, session), - "inflight": None, - "running": False, - "session_key": target, - "started_at": float(session.get("created_at") or time.time()), - "status": "idle", - }, - ) + return _ok( + rid, + { + "session_id": sid, + "resumed": target, + "message_count": len(messages), + "messages": messages, + "info": _session_info(agent, session), + "inflight": None, + "running": False, + "session_key": target, + "started_at": float(session.get("created_at") or time.time()), + "status": "idle", + }, + ) @method("session.cwd.set")