fix(tui): narrow resume lock to avoid blocking session.close
The salvaged fix held _session_resume_lock across _make_agent (MCP discovery + AIAgent construction, seconds), serializing it against session.close. Since session.close runs on the main RPC dispatch thread (not a _LONG_HANDLER), a close racing a mid-build resume would stall all fast-path RPCs (approval.respond, session.interrupt). Restructure to double-checked locking: build the agent outside the lock, then re-check _find_live_session_by_key under the lock before _init_session. A losing concurrent resume discards its just-built agent (no worker/poller wired yet) and reuses the winner. Updated the concurrent-resume regression test to assert the real invariant (one surviving live session + loser agent closed) rather than the implementation detail of a single _make_agent call.
This commit is contained in:
@ -399,6 +399,7 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch):
|
||||
|
||||
target = "20260409_010101_abc123"
|
||||
created_sids: list[str] = []
|
||||
closed_sids: list[str] = []
|
||||
first_agent_started = threading.Event()
|
||||
agent_can_finish = threading.Event()
|
||||
|
||||
@ -422,11 +423,20 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch):
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
class _Agent:
|
||||
def __init__(self, sid, session_id):
|
||||
self.sid = sid
|
||||
self.model = "test/model"
|
||||
self.session_id = session_id
|
||||
|
||||
def close(self):
|
||||
closed_sids.append(self.sid)
|
||||
|
||||
def make_agent(sid, key, session_id=None):
|
||||
created_sids.append(sid)
|
||||
first_agent_started.set()
|
||||
assert agent_can_finish.wait(timeout=1)
|
||||
return types.SimpleNamespace(model="test/model", session_id=session_id or key)
|
||||
return _Agent(sid, session_id or key)
|
||||
|
||||
monkeypatch.setattr(server, "_get_db", lambda: _DB())
|
||||
monkeypatch.setattr(server, "_make_agent", make_agent)
|
||||
@ -490,10 +500,18 @@ def test_session_resume_reuses_existing_live_session(server, monkeypatch):
|
||||
|
||||
assert "error" not in first
|
||||
assert "error" not in second
|
||||
# Both resumes resolve to the SAME single live session — the core invariant.
|
||||
assert second["result"]["session_id"] == first["result"]["session_id"]
|
||||
assert len(server._sessions) == 1
|
||||
assert [s.get("session_key") for s in server._sessions.values()].count(target) == 1
|
||||
assert created_sids == [first["result"]["session_id"]]
|
||||
winner = first["result"]["session_id"]
|
||||
# The agent build happens outside the resume lock, so a racing resume may
|
||||
# build a redundant agent; double-checked locking keeps only one live
|
||||
# session and closes any loser's agent (no worker/poller is wired for it).
|
||||
assert winner in created_sids
|
||||
survivors = [sid for sid in created_sids if sid not in closed_sids]
|
||||
assert survivors == [winner]
|
||||
assert all(sid == winner for sid in server._sessions)
|
||||
|
||||
|
||||
def test_session_resume_live_payload_uses_current_history_with_ancestors(server, monkeypatch):
|
||||
|
||||
@ -2994,6 +2994,7 @@ def _(rid, params: dict) -> dict:
|
||||
target = found["id"]
|
||||
else:
|
||||
return _err(rid, 4007, "session not found")
|
||||
# Fast path: if the session is already live, reuse it under the lock.
|
||||
with _session_resume_lock:
|
||||
live = _find_live_session_by_key(target)
|
||||
if live is not None:
|
||||
@ -3008,44 +3009,73 @@ def _(rid, params: dict) -> dict:
|
||||
payload["resumed"] = target
|
||||
return _ok(rid, payload)
|
||||
|
||||
sid = uuid.uuid4().hex[:8]
|
||||
_enable_gateway_prompts()
|
||||
# Build the agent OUTSIDE the lock — _make_agent can block for seconds
|
||||
# (MCP discovery, prompt/skill build, AIAgent construction). Holding
|
||||
# _session_resume_lock across it would stall session.close on the main
|
||||
# dispatch thread (it's not a _LONG_HANDLER), blocking fast-path RPCs.
|
||||
sid = uuid.uuid4().hex[:8]
|
||||
_enable_gateway_prompts()
|
||||
try:
|
||||
db.reopen_session(target)
|
||||
history = db.get_messages_as_conversation(target)
|
||||
display_history = db.get_messages_as_conversation(
|
||||
target, include_ancestors=True
|
||||
)
|
||||
display_history_prefix = display_history[
|
||||
: max(0, len(display_history) - len(history))
|
||||
]
|
||||
messages = _history_to_messages(display_history)
|
||||
tokens = _set_session_context(target)
|
||||
try:
|
||||
db.reopen_session(target)
|
||||
history = db.get_messages_as_conversation(target)
|
||||
display_history = db.get_messages_as_conversation(
|
||||
target, include_ancestors=True
|
||||
)
|
||||
display_history_prefix = display_history[
|
||||
: max(0, len(display_history) - len(history))
|
||||
]
|
||||
messages = _history_to_messages(display_history)
|
||||
tokens = _set_session_context(target)
|
||||
agent = _make_agent(sid, target, session_id=target)
|
||||
finally:
|
||||
_clear_session_context(tokens)
|
||||
except Exception as e:
|
||||
return _err(rid, 5000, f"resume failed: {e}")
|
||||
|
||||
# Double-checked locking: another concurrent resume may have created the
|
||||
# live session while we were building. Re-check under the lock; if it won,
|
||||
# discard our just-built agent and reuse theirs (no worker/poller wired yet).
|
||||
with _session_resume_lock:
|
||||
live = _find_live_session_by_key(target)
|
||||
if live is not None:
|
||||
try:
|
||||
agent = _make_agent(sid, target, session_id=target)
|
||||
finally:
|
||||
_clear_session_context(tokens)
|
||||
if hasattr(agent, "close"):
|
||||
agent.close()
|
||||
except Exception:
|
||||
pass
|
||||
other_sid, other_session = live
|
||||
payload = _live_session_payload(
|
||||
other_sid,
|
||||
other_session,
|
||||
cols=cols,
|
||||
touch=True,
|
||||
transport=current_transport() or _stdio_transport,
|
||||
)
|
||||
payload["resumed"] = target
|
||||
return _ok(rid, payload)
|
||||
try:
|
||||
_init_session(sid, target, agent, history, cols=cols)
|
||||
if sid in _sessions:
|
||||
_sessions[sid]["display_history_prefix"] = display_history_prefix
|
||||
except Exception as e:
|
||||
return _err(rid, 5000, f"resume failed: {e}")
|
||||
session = _sessions.get(sid) or {}
|
||||
return _ok(
|
||||
rid,
|
||||
{
|
||||
"session_id": sid,
|
||||
"resumed": target,
|
||||
"message_count": len(messages),
|
||||
"messages": messages,
|
||||
"info": _session_info(agent, session),
|
||||
"inflight": None,
|
||||
"running": False,
|
||||
"session_key": target,
|
||||
"started_at": float(session.get("created_at") or time.time()),
|
||||
"status": "idle",
|
||||
},
|
||||
)
|
||||
return _ok(
|
||||
rid,
|
||||
{
|
||||
"session_id": sid,
|
||||
"resumed": target,
|
||||
"message_count": len(messages),
|
||||
"messages": messages,
|
||||
"info": _session_info(agent, session),
|
||||
"inflight": None,
|
||||
"running": False,
|
||||
"session_key": target,
|
||||
"started_at": float(session.get("created_at") or time.time()),
|
||||
"status": "idle",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@method("session.cwd.set")
|
||||
|
||||
Reference in New Issue
Block a user