fix(cron): make sequential jobs non-blocking too + sweep MCP after jobs finish
Follow-up on the parallel-dispatch decoupling: the sequential pass for workdir/profile jobs still ran inline in the ticker thread, so a long workdir/profile job reintroduced the exact starvation that #37312 describes, only for env-mutating jobs. In addition, the MCP orphan sweep ran immediately after dispatch in sync=False mode — before the jobs had finished — which defeated its own 'runs after every job' contract and raced with jobs that were still spawning MCP children.

- Sequential jobs now queue to a persistent single-thread cron-seq pool (preserves one-at-a-time ordering across ticks, never blocks the tick).
- The same in-flight dedup guard now covers sequential jobs.
- The MCP orphan sweep runs via a done-callback after the LAST dispatched job completes in async mode, and inline after as_completed in sync mode.

Verified E2E: tick(sync=False) returns in ~1ms with a 1.5s sequential job in flight; the sweep fires only after that job ends.
This commit is contained in:
@ -166,6 +166,12 @@ _parallel_pool_max_workers: Optional[int] = None
|
||||
_running_job_ids: set = set()
|
||||
_running_lock = threading.Lock()
|
||||
|
||||
# Sequential (env/context-mutating) cron jobs — workdir/profile jobs that touch
|
||||
# process-global runtime state — must run one at a time, but must NOT block the
|
||||
# ticker thread. A persistent single-thread executor preserves ordering across
|
||||
# ticks while keeping dispatch fire-and-forget, the same as the parallel pool.
|
||||
_sequential_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None
|
||||
|
||||
|
||||
def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadPoolExecutor:
|
||||
"""Return (or create) the persistent parallel pool."""
|
||||
@ -181,13 +187,33 @@ def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadP
|
||||
return _parallel_pool
|
||||
|
||||
|
||||
def _get_sequential_pool() -> concurrent.futures.ThreadPoolExecutor:
    """Lazily build and hand back the shared one-worker executor.

    The executor is created on first use with ``max_workers=1`` and the
    ``cron-seq`` thread-name prefix, then cached in the module-level
    ``_sequential_pool`` so later calls return the same object. Because it
    has a single worker, env/context-mutating jobs submitted to it run one
    at a time in submission order — including jobs queued by different
    ticks — and so cannot overlap while touching os.environ / profile state.
    """
    global _sequential_pool
    existing = _sequential_pool
    if existing is not None:
        return existing
    created = concurrent.futures.ThreadPoolExecutor(
        max_workers=1, thread_name_prefix="cron-seq"
    )
    _sequential_pool = created
    return created
|
||||
|
||||
|
||||
def _shutdown_parallel_pool() -> None:
    """Shut down the persistent pools on process exit.

    Despite its name (kept unchanged for existing callers and the ``atexit``
    registration), this tears down both the parallel pool and the
    single-thread sequential pool. Each shutdown uses
    ``wait=True, cancel_futures=False``, so it blocks until running and
    already-queued jobs have finished. Afterwards the module-level handles
    are reset to ``None``; ``_get_sequential_pool`` then builds a fresh
    executor on its next call.
    """
    global _parallel_pool, _parallel_pool_max_workers, _sequential_pool
    if _parallel_pool is not None:
        _parallel_pool.shutdown(wait=True, cancel_futures=False)
        _parallel_pool = None
        _parallel_pool_max_workers = None
    if _sequential_pool is not None:
        _sequential_pool.shutdown(wait=True, cancel_futures=False)
        _sequential_pool = None
|
||||
|
||||
|
||||
atexit.register(_shutdown_parallel_pool)
|
||||
@ -2072,11 +2098,47 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
|
||||
]
|
||||
|
||||
_results: list = []
|
||||
_all_futures: list = []
|
||||
|
||||
# Sequential pass for env/context-mutating jobs.
|
||||
for job in sequential_jobs:
|
||||
def _submit_with_guard(job: dict, pool: concurrent.futures.ThreadPoolExecutor):
    """Submit a job fire-and-forget with the in-flight dedup guard.

    Args:
        job: Cron job dict. ``job["id"]`` keys the dedup guard and
            ``job.get("name")`` is used in the skip log message.
        pool: Executor the job is queued on.

    Returns:
        The submitted future, or ``None`` if the job was skipped because its
        id is already in ``_running_job_ids`` (an earlier run is still in
        flight).

    Raises:
        Whatever ``pool.submit`` raises (e.g. ``RuntimeError`` once the
        executor has been shut down). The job id is removed from the running
        set before re-raising, so a failed submit cannot leave the job
        permanently marked as in flight and skipped on every later tick.
    """
    job_id = job["id"]
    with _running_lock:
        if job_id in _running_job_ids:
            logger.info("Job '%s' already running — skipping", job.get("name", job_id))
            return None
        _running_job_ids.add(job_id)

    def _release() -> None:
        # Drop the in-flight marker so the job may be dispatched again.
        with _running_lock:
            _running_job_ids.discard(job_id)

    try:
        # Snapshot the submitting thread's context; the worker runs the job
        # inside this copy. The job itself is never executed inline here.
        ctx = contextvars.copy_context()

        def _run_and_release():
            try:
                return ctx.run(_process_job, job)
            finally:
                _release()

        return pool.submit(_run_and_release)
    except BaseException:
        # Nothing was queued, so the worker's finally block will never run.
        _release()
        raise
|
||||
|
||||
# Sequential pass for env/context-mutating (workdir/profile) jobs.
|
||||
# Queued to a persistent single-thread pool so they run one at a time
|
||||
# WITHOUT blocking the ticker thread — a long workdir/profile job no
|
||||
# longer starves the rest of the schedule (same fix as the parallel
|
||||
# pass, just serialized). The in-flight guard prevents a still-running
|
||||
# job from being re-queued on the next tick.
|
||||
if sequential_jobs:
|
||||
seq_pool = _get_sequential_pool()
|
||||
for job in sequential_jobs:
|
||||
fut = _submit_with_guard(job, seq_pool)
|
||||
if fut is None:
|
||||
continue
|
||||
_all_futures.append(fut)
|
||||
if not sync:
|
||||
_results.append(True) # optimistically counted
|
||||
|
||||
# Parallel pass — persistent pool, non-blocking dispatch.
|
||||
# Jobs that are already running (from a previous tick) are skipped.
|
||||
@ -2085,46 +2147,54 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
|
||||
# queue needed.
|
||||
if parallel_jobs:
|
||||
pool = _get_parallel_pool(_max_workers)
|
||||
_parallel_futures = []
|
||||
for job in parallel_jobs:
|
||||
job_id = job["id"]
|
||||
with _running_lock:
|
||||
if job_id in _running_job_ids:
|
||||
logger.info("Job '%s' already running — skipping", job.get("name", job_id))
|
||||
continue
|
||||
_running_job_ids.add(job_id)
|
||||
_ctx = contextvars.copy_context()
|
||||
# Fire-and-forget: remove from running set when done.
|
||||
def _run_and_release(j=job, ctx=_ctx):
|
||||
try:
|
||||
return ctx.run(_process_job, j)
|
||||
finally:
|
||||
with _running_lock:
|
||||
_running_job_ids.discard(j["id"])
|
||||
fut = pool.submit(_run_and_release)
|
||||
if sync:
|
||||
_parallel_futures.append(fut)
|
||||
else:
|
||||
fut = _submit_with_guard(job, pool)
|
||||
if fut is None:
|
||||
continue
|
||||
_all_futures.append(fut)
|
||||
if not sync:
|
||||
_results.append(True) # optimistically counted
|
||||
# In sync mode (tests), wait for all parallel jobs to finish.
|
||||
if sync and _parallel_futures:
|
||||
for f in concurrent.futures.as_completed(_parallel_futures):
|
||||
try:
|
||||
_results.append(f.result())
|
||||
except Exception as exc:
|
||||
logger.error("Parallel cron job future failed: %s", exc)
|
||||
_results.append(False)
|
||||
|
||||
# Best-effort sweep of MCP stdio subprocesses that survived their
|
||||
# session teardown during this tick. Runs AFTER every job has
|
||||
# finished so active sessions (including live user chats) are
|
||||
# never touched — only PIDs explicitly detected as orphans in
|
||||
# tools.mcp_tool._run_stdio's finally block are reaped.
|
||||
try:
|
||||
from tools.mcp_tool import _kill_orphaned_mcp_children
|
||||
_kill_orphaned_mcp_children()
|
||||
except Exception as _e:
|
||||
logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
|
||||
# session teardown. Must run AFTER jobs finish so active sessions
|
||||
# (including live user chats) are never touched — only PIDs explicitly
|
||||
# detected as orphans in tools.mcp_tool._run_stdio's finally block are
|
||||
# reaped.
|
||||
def _sweep_mcp_orphans() -> None:
    """Run the MCP orphan cleanup once, swallowing any ordinary failure.

    Imports ``_kill_orphaned_mcp_children`` from ``tools.mcp_tool`` at call
    time and invokes it. Any ``Exception`` raised by the import or the call
    is logged at debug level and not propagated, so the sweep stays
    best-effort and cannot fail the tick.
    """
    try:
        # Imported inside the try so an import failure is handled the same
        # way as a failure of the cleanup call itself.
        from tools.mcp_tool import _kill_orphaned_mcp_children as reap_orphans

        reap_orphans()
    except Exception as sweep_error:
        logger.debug("Post-tick MCP orphan cleanup failed: %s", sweep_error)
|
||||
|
||||
if sync:
|
||||
# Sync mode (tests / manual ticks): wait for all dispatched jobs,
|
||||
# collect results, then sweep once.
|
||||
for f in concurrent.futures.as_completed(_all_futures):
|
||||
try:
|
||||
_results.append(f.result())
|
||||
except Exception as exc:
|
||||
logger.error("Cron job future failed: %s", exc)
|
||||
_results.append(False)
|
||||
_sweep_mcp_orphans()
|
||||
return sum(_results)
|
||||
|
||||
# Async (gateway ticker) mode: don't block. Sweep orphans via a
|
||||
# done-callback fired after the LAST dispatched job completes, so the
|
||||
# sweep still happens after jobs finish without stalling the tick.
|
||||
if _all_futures:
    # Done-callbacks run on whichever pool worker thread completes a future
    # (or inline here if the future has already finished), so several may
    # fire concurrently. The countdown is a read-modify-write and is guarded
    # by a lock: a lost decrement would mean the sweep never runs.
    _remaining_lock = threading.Lock()
    _remaining = [len(_all_futures)]

    def _on_done(_f: concurrent.futures.Future) -> None:
        """Count one finished job; sweep once when the last one is done."""
        with _remaining_lock:
            _remaining[0] -= 1
            _is_last = _remaining[0] == 0
        # Sweep outside the lock so other callbacks are not held up by it.
        if _is_last:
            _sweep_mcp_orphans()

    for _f in _all_futures:
        _f.add_done_callback(_on_done)
else:
    # Nothing dispatched (all skipped / no due jobs) — sweep inline.
    _sweep_mcp_orphans()
|
||||
|
||||
return sum(_results)
|
||||
finally:
|
||||
|
||||
@ -410,16 +410,20 @@ class TestTickProfilePartition:
|
||||
import threading
|
||||
import cron.scheduler as sched
|
||||
|
||||
profile_job = {"id": "a", "name": "A", "profile": "default"}
|
||||
parallel_job = {"id": "b", "name": "B", "profile": None}
|
||||
# Two profile jobs (both sequential) + one parallel job.
|
||||
profile_a = {"id": "a", "name": "A", "profile": "default"}
|
||||
profile_b = {"id": "b", "name": "B", "profile": "default"}
|
||||
parallel_job = {"id": "c", "name": "C", "profile": None}
|
||||
|
||||
monkeypatch.setattr(sched, "get_due_jobs", lambda: [profile_job, parallel_job])
|
||||
monkeypatch.setattr(sched, "get_due_jobs", lambda: [profile_a, profile_b, parallel_job])
|
||||
monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
|
||||
|
||||
calls: list[tuple[str, str]] = []
|
||||
order_lock = threading.Lock()
|
||||
|
||||
def fake_run_job(job):
|
||||
calls.append((job["id"], threading.current_thread().name))
|
||||
with order_lock:
|
||||
calls.append((job["id"], threading.current_thread().name))
|
||||
return True, "output", "response", None
|
||||
|
||||
monkeypatch.setattr(sched, "run_job", fake_run_job)
|
||||
@ -429,9 +433,17 @@ class TestTickProfilePartition:
|
||||
|
||||
n = sched.tick(verbose=False)
|
||||
|
||||
assert n == 2
|
||||
assert n == 3
|
||||
ids = [job_id for job_id, _thread_name in calls]
|
||||
# Sequential profile jobs preserve submission order relative to each
|
||||
# other (single-thread pool).
|
||||
assert ids.index("a") < ids.index("b")
|
||||
main_thread_name = threading.current_thread().name
|
||||
profile_thread_name = next(thread for job_id, thread in calls if job_id == "a")
|
||||
assert profile_thread_name == main_thread_name
|
||||
# Sequential (profile) jobs run on the persistent single-thread
|
||||
# cron-seq pool — NOT the main thread — so a long profile job never
|
||||
# blocks the ticker. Parallel jobs run on the cron-parallel pool.
|
||||
for jid in ("a", "b"):
|
||||
seq_thread = next(t for job_id, t in calls if job_id == jid)
|
||||
assert seq_thread != threading.current_thread().name
|
||||
assert seq_thread.startswith("cron-seq"), seq_thread
|
||||
par_thread = next(t for job_id, t in calls if job_id == "c")
|
||||
assert par_thread.startswith("cron-parallel"), par_thread
|
||||
|
||||
@ -207,20 +207,23 @@ class TestTickWorkdirPartition:
|
||||
def test_workdir_jobs_run_sequentially(self, tmp_path, monkeypatch):
|
||||
import cron.scheduler as sched
|
||||
|
||||
# Two "jobs" — one with workdir, one without. get_due_jobs returns both.
|
||||
workdir_job = {"id": "a", "name": "A", "workdir": str(tmp_path)}
|
||||
parallel_job = {"id": "b", "name": "B", "workdir": None}
|
||||
# Two workdir jobs (both sequential) + one parallel job.
|
||||
workdir_a = {"id": "a", "name": "A", "workdir": str(tmp_path)}
|
||||
workdir_b = {"id": "b", "name": "B", "workdir": str(tmp_path)}
|
||||
parallel_job = {"id": "c", "name": "C", "workdir": None}
|
||||
|
||||
monkeypatch.setattr(sched, "get_due_jobs", lambda: [workdir_job, parallel_job])
|
||||
monkeypatch.setattr(sched, "get_due_jobs", lambda: [workdir_a, workdir_b, parallel_job])
|
||||
monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
|
||||
|
||||
# Record call order / thread context.
|
||||
import threading
|
||||
calls: list[tuple[str, bool]] = []
|
||||
calls: list[tuple[str, str]] = []
|
||||
order_lock = threading.Lock()
|
||||
|
||||
def fake_run_job(job):
|
||||
# Return a minimal tuple matching run_job's signature.
|
||||
calls.append((job["id"], threading.current_thread().name))
|
||||
with order_lock:
|
||||
calls.append((job["id"], threading.current_thread().name))
|
||||
return True, "output", "response", None
|
||||
|
||||
monkeypatch.setattr(sched, "run_job", fake_run_job)
|
||||
@ -231,16 +234,22 @@ class TestTickWorkdirPartition:
|
||||
)
|
||||
|
||||
n = sched.tick(verbose=False)
|
||||
assert n == 2
|
||||
assert n == 3
|
||||
|
||||
ids = [c[0] for c in calls]
|
||||
# Workdir jobs always come before parallel jobs.
|
||||
# Sequential workdir jobs preserve submission order relative to each
|
||||
# other (single-thread pool).
|
||||
assert ids.index("a") < ids.index("b")
|
||||
|
||||
# The workdir job must run on the main thread (sequential pass).
|
||||
# Workdir jobs run on the persistent single-thread cron-seq pool —
|
||||
# NOT the main thread — so a long workdir job never blocks the ticker.
|
||||
main_thread_name = threading.current_thread().name
|
||||
workdir_thread_name = next(t for jid, t in calls if jid == "a")
|
||||
assert workdir_thread_name == main_thread_name
|
||||
for jid in ("a", "b"):
|
||||
workdir_thread_name = next(t for j, t in calls if j == jid)
|
||||
assert workdir_thread_name != main_thread_name
|
||||
assert workdir_thread_name.startswith("cron-seq"), workdir_thread_name
|
||||
par_thread_name = next(t for j, t in calls if j == "c")
|
||||
assert par_thread_name.startswith("cron-parallel"), par_thread_name
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@ -169,3 +169,106 @@ class TestSyncMode:
|
||||
barrier.wait()
|
||||
time.sleep(0.1)
|
||||
sched._shutdown_parallel_pool()
|
||||
|
||||
|
||||
class TestSequentialPool:
    """Sequential (workdir/profile) jobs use the persistent cron-seq pool.

    Verifies the follow-up fix: env/context-mutating jobs no longer run inline
    in the ticker thread, so a long workdir/profile job can't starve the
    schedule the same way the parallel path used to.

    Every test releases its resources in a ``finally`` block so a failing
    assertion cannot leave a worker thread blocked or leak scheduler globals
    into later tests.
    """

    @staticmethod
    def _reset_scheduler_state(sched) -> None:
        """Forget any cached pools and in-flight job ids on the scheduler module."""
        sched._parallel_pool = None
        sched._parallel_pool_max_workers = None
        sched._sequential_pool = None
        sched._running_job_ids.clear()

    @staticmethod
    def _workdir_job(job_id: str, workdir) -> dict:
        """Build a due job dict; the ``workdir`` key makes it sequential."""
        return {
            "id": job_id,
            "name": job_id,
            "prompt": "test",
            "schedule": "every 5m",
            "enabled": True,
            "next_run_at": "2020-01-01T00:00:00",
            "deliver": "local",
            "workdir": str(workdir),
        }

    def test_sequential_job_does_not_block_ticker(self, tmp_path, monkeypatch):
        """sync=False returns immediately even when a workdir job is slow."""
        import cron.scheduler as sched

        self._reset_scheduler_state(sched)
        job = self._workdir_job("slow-workdir", tmp_path)

        barrier = threading.Barrier(2, timeout=5)

        def slow_run(j):
            # Blocks the worker until the test thread reaches the barrier.
            barrier.wait()
            return True, "out", "resp", None

        monkeypatch.setattr(sched, "get_due_jobs", lambda: [job])
        monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "run_job", slow_run)
        monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: "/tmp/out")
        monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None)

        released = False
        try:
            start = time.monotonic()
            n = sched.tick(verbose=False, sync=False)
            elapsed = time.monotonic() - start

            assert n == 1  # optimistic count
            assert elapsed < 1.0  # did NOT block on the slow workdir job

            barrier.wait()
            released = True
        finally:
            if not released:
                # Unblock the worker so the shutdown below cannot hang. Only
                # done on the failure path: aborting after a successful
                # release could race with the worker leaving the barrier.
                barrier.abort()
            # wait=True shutdown joins the worker, so no extra sleep is needed.
            sched._shutdown_parallel_pool()

    def test_sequential_running_guard_prevents_double_dispatch(self, tmp_path, monkeypatch):
        """A workdir job already in _running_job_ids is skipped on next tick."""
        import cron.scheduler as sched

        self._reset_scheduler_state(sched)
        job = self._workdir_job("guard-seq", tmp_path)

        dispatched = []
        monkeypatch.setattr(sched, "get_due_jobs", lambda: [job])
        monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "run_job", lambda j: dispatched.append(j["id"]) or (True, "out", "resp", None))
        monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None)

        # Simulate the job already running.
        sched._running_job_ids.add("guard-seq")
        try:
            n = sched.tick(verbose=False)
            assert n == 0  # skipped, not dispatched
            assert dispatched == []
        finally:
            sched._running_job_ids.discard("guard-seq")
            sched._shutdown_parallel_pool()

    def test_get_sequential_pool_is_persistent(self):
        """_get_sequential_pool returns the same pool until it is shut down."""
        import cron.scheduler as sched

        sched._sequential_pool = None
        try:
            pool1 = sched._get_sequential_pool()
            pool2 = sched._get_sequential_pool()
            assert pool1 is pool2
        finally:
            sched._shutdown_parallel_pool()

        assert sched._sequential_pool is None
|
||||
|
||||
Reference in New Issue
Block a user