diff --git a/cron/scheduler.py b/cron/scheduler.py
index 4fd1f3059..d0240e9af 100644
--- a/cron/scheduler.py
+++ b/cron/scheduler.py
@@ -166,6 +166,12 @@ _parallel_pool_max_workers: Optional[int] = None
 _running_job_ids: set = set()
 _running_lock = threading.Lock()
 
+# Sequential (env/context-mutating) cron jobs — workdir/profile jobs that touch
+# process-global runtime state — must run one at a time, but must NOT block the
+# ticker thread. A persistent single-thread executor preserves ordering across
+# ticks while keeping dispatch fire-and-forget, the same as the parallel pool.
+_sequential_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None
+
 
 def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadPoolExecutor:
     """Return (or create) the persistent parallel pool."""
@@ -181,13 +187,33 @@ def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadP
     return _parallel_pool
 
 
+def _get_sequential_pool() -> concurrent.futures.ThreadPoolExecutor:
+    """Return (or create) the persistent single-thread sequential pool.
+
+    A single worker guarantees env/context-mutating jobs never overlap, even
+    across ticks: a job queued by a newer tick waits for the previous tick's
+    sequential jobs to finish rather than corrupting their os.environ /
+    profile state.
+    """
+    global _sequential_pool
+    if _sequential_pool is None:
+        _sequential_pool = concurrent.futures.ThreadPoolExecutor(
+            max_workers=1,
+            thread_name_prefix="cron-seq",
+        )
+    return _sequential_pool
+
+
 def _shutdown_parallel_pool() -> None:
-    """Shut down the persistent pool on process exit."""
-    global _parallel_pool, _parallel_pool_max_workers
+    """Shut down the persistent pools on process exit."""
+    global _parallel_pool, _parallel_pool_max_workers, _sequential_pool
     if _parallel_pool is not None:
         _parallel_pool.shutdown(wait=True, cancel_futures=False)
         _parallel_pool = None
         _parallel_pool_max_workers = None
+    if _sequential_pool is not None:
+        _sequential_pool.shutdown(wait=True, cancel_futures=False)
+        _sequential_pool = None
 
 
 atexit.register(_shutdown_parallel_pool)
@@ -2072,11 +2098,59 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
         ]
 
         _results: list = []
+        _all_futures: list = []
 
-        # Sequential pass for env/context-mutating jobs.
-        for job in sequential_jobs:
+        def _submit_with_guard(job: dict, pool: concurrent.futures.ThreadPoolExecutor):
+            """Submit a job fire-and-forget with the in-flight dedup guard.
+
+            Returns the future, or None if the job was skipped because a prior
+            tick's run of the same job is still in flight. The running-set
+            membership is released in the worker's finally block, or right
+            here if the pool refuses the submission.
+            """
+            job_id = job["id"]
+            with _running_lock:
+                if job_id in _running_job_ids:
+                    logger.info("Job '%s' already running — skipping", job.get("name", job_id))
+                    return None
+                _running_job_ids.add(job_id)
             _ctx = contextvars.copy_context()
-            _results.append(_ctx.run(_process_job, job))
+
+            def _run_and_release(j=job, ctx=_ctx):
+                try:
+                    return ctx.run(_process_job, j)
+                finally:
+                    with _running_lock:
+                        _running_job_ids.discard(j["id"])
+
+            try:
+                return pool.submit(_run_and_release)
+            except BaseException:
+                # submit() failed (e.g. RuntimeError after the pool was shut
+                # down): the worker's finally block will never run, so release
+                # the in-flight slot here or the job would be skipped forever.
+                with _running_lock:
+                    _running_job_ids.discard(job_id)
+                raise
+
+        # Sequential pass for env/context-mutating (workdir/profile) jobs.
+        # Queued to a persistent single-thread pool so they run one at a time
+        # WITHOUT blocking the ticker thread — a long workdir/profile job no
+        # longer starves the rest of the schedule (same fix as the parallel
+        # pass, just serialized). The in-flight guard prevents a still-running
+        # job from being re-queued on the next tick.
+        # NOTE(review): these jobs used to finish inline before the parallel
+        # pass was dispatched; they can now overlap same-tick parallel jobs —
+        # confirm parallel jobs never read the env/profile state mutated here.
+        if sequential_jobs:
+            seq_pool = _get_sequential_pool()
+            for job in sequential_jobs:
+                fut = _submit_with_guard(job, seq_pool)
+                if fut is None:
+                    continue
+                _all_futures.append(fut)
+                if not sync:
+                    _results.append(True)  # optimistically counted
 
         # Parallel pass — persistent pool, non-blocking dispatch.
         # Jobs that are already running (from a previous tick) are skipped.
@@ -2085,46 +2159,60 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
         # queue needed.
         if parallel_jobs:
             pool = _get_parallel_pool(_max_workers)
-            _parallel_futures = []
             for job in parallel_jobs:
-                job_id = job["id"]
-                with _running_lock:
-                    if job_id in _running_job_ids:
-                        logger.info("Job '%s' already running — skipping", job.get("name", job_id))
-                        continue
-                    _running_job_ids.add(job_id)
-                _ctx = contextvars.copy_context()
-                # Fire-and-forget: remove from running set when done.
-                def _run_and_release(j=job, ctx=_ctx):
-                    try:
-                        return ctx.run(_process_job, j)
-                    finally:
-                        with _running_lock:
-                            _running_job_ids.discard(j["id"])
-                fut = pool.submit(_run_and_release)
-                if sync:
-                    _parallel_futures.append(fut)
-                else:
+                fut = _submit_with_guard(job, pool)
+                if fut is None:
+                    continue
+                _all_futures.append(fut)
+                if not sync:
                     _results.append(True)  # optimistically counted
-            # In sync mode (tests), wait for all parallel jobs to finish.
-            if sync and _parallel_futures:
-                for f in concurrent.futures.as_completed(_parallel_futures):
-                    try:
-                        _results.append(f.result())
-                    except Exception as exc:
-                        logger.error("Parallel cron job future failed: %s", exc)
-                        _results.append(False)
 
         # Best-effort sweep of MCP stdio subprocesses that survived their
-        # session teardown during this tick. Runs AFTER every job has
-        # finished so active sessions (including live user chats) are
-        # never touched — only PIDs explicitly detected as orphans in
-        # tools.mcp_tool._run_stdio's finally block are reaped.
-        try:
-            from tools.mcp_tool import _kill_orphaned_mcp_children
-            _kill_orphaned_mcp_children()
-        except Exception as _e:
-            logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
+        # session teardown. Must run AFTER jobs finish so active sessions
+        # (including live user chats) are never touched — only PIDs explicitly
+        # detected as orphans in tools.mcp_tool._run_stdio's finally block are
+        # reaped.
+        def _sweep_mcp_orphans() -> None:
+            try:
+                from tools.mcp_tool import _kill_orphaned_mcp_children
+                _kill_orphaned_mcp_children()
+            except Exception as _e:
+                logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
+
+        if sync:
+            # Sync mode (tests / manual ticks): wait for all dispatched jobs,
+            # collect results, then sweep once.
+            for f in concurrent.futures.as_completed(_all_futures):
+                try:
+                    _results.append(f.result())
+                except Exception as exc:
+                    logger.error("Cron job future failed: %s", exc)
+                    _results.append(False)
+            _sweep_mcp_orphans()
+            return sum(_results)
+
+        # Async (gateway ticker) mode: don't block. Sweep orphans via a
+        # done-callback fired after the LAST dispatched job completes, so the
+        # sweep still happens after jobs finish without stalling the tick.
+        if _all_futures:
+            _remaining = [len(_all_futures)]
+            _remaining_lock = threading.Lock()
+
+            def _on_done(_f: concurrent.futures.Future) -> None:
+                # Done-callbacks run on whichever pool thread finishes the
+                # future (or inline here if it already finished), so the
+                # countdown must be guarded to fire the sweep exactly once.
+                with _remaining_lock:
+                    _remaining[0] -= 1
+                    _is_last = _remaining[0] == 0
+                if _is_last:
+                    _sweep_mcp_orphans()
+
+            for _f in _all_futures:
+                _f.add_done_callback(_on_done)
+        else:
+            # Nothing dispatched (all skipped / no due jobs) — sweep inline.
+            _sweep_mcp_orphans()
 
         return sum(_results)
     finally:
diff --git a/tests/cron/test_cron_profile.py b/tests/cron/test_cron_profile.py
index 7ed28ba38..677082e24 100644
--- a/tests/cron/test_cron_profile.py
+++ b/tests/cron/test_cron_profile.py
@@ -410,16 +410,20 @@ class TestTickProfilePartition:
         import threading
         import cron.scheduler as sched
 
-        profile_job = {"id": "a", "name": "A", "profile": "default"}
-        parallel_job = {"id": "b", "name": "B", "profile": None}
+        # Two profile jobs (both sequential) + one parallel job.
+        profile_a = {"id": "a", "name": "A", "profile": "default"}
+        profile_b = {"id": "b", "name": "B", "profile": "default"}
+        parallel_job = {"id": "c", "name": "C", "profile": None}
 
-        monkeypatch.setattr(sched, "get_due_jobs", lambda: [profile_job, parallel_job])
+        monkeypatch.setattr(sched, "get_due_jobs", lambda: [profile_a, profile_b, parallel_job])
         monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
 
         calls: list[tuple[str, str]] = []
+        order_lock = threading.Lock()
 
         def fake_run_job(job):
-            calls.append((job["id"], threading.current_thread().name))
+            with order_lock:
+                calls.append((job["id"], threading.current_thread().name))
             return True, "output", "response", None
 
         monkeypatch.setattr(sched, "run_job", fake_run_job)
@@ -429,9 +433,17 @@ class TestTickProfilePartition:
 
         n = sched.tick(verbose=False)
 
-        assert n == 2
+        assert n == 3
         ids = [job_id for job_id, _thread_name in calls]
+        # Sequential profile jobs preserve submission order relative to each
+        # other (single-thread pool).
         assert ids.index("a") < ids.index("b")
-        main_thread_name = threading.current_thread().name
-        profile_thread_name = next(thread for job_id, thread in calls if job_id == "a")
-        assert profile_thread_name == main_thread_name
+        # Sequential (profile) jobs run on the persistent single-thread
+        # cron-seq pool — NOT the main thread — so a long profile job never
+        # blocks the ticker. Parallel jobs run on the cron-parallel pool.
+        for jid in ("a", "b"):
+            seq_thread = next(t for job_id, t in calls if job_id == jid)
+            assert seq_thread != threading.current_thread().name
+            assert seq_thread.startswith("cron-seq"), seq_thread
+        par_thread = next(t for job_id, t in calls if job_id == "c")
+        assert par_thread.startswith("cron-parallel"), par_thread
diff --git a/tests/cron/test_cron_workdir.py b/tests/cron/test_cron_workdir.py
index 678038cb5..d8efdfb48 100644
--- a/tests/cron/test_cron_workdir.py
+++ b/tests/cron/test_cron_workdir.py
@@ -207,20 +207,23 @@ class TestTickWorkdirPartition:
     def test_workdir_jobs_run_sequentially(self, tmp_path, monkeypatch):
         import cron.scheduler as sched
 
-        # Two "jobs" — one with workdir, one without. get_due_jobs returns both.
-        workdir_job = {"id": "a", "name": "A", "workdir": str(tmp_path)}
-        parallel_job = {"id": "b", "name": "B", "workdir": None}
+        # Two workdir jobs (both sequential) + one parallel job.
+        workdir_a = {"id": "a", "name": "A", "workdir": str(tmp_path)}
+        workdir_b = {"id": "b", "name": "B", "workdir": str(tmp_path)}
+        parallel_job = {"id": "c", "name": "C", "workdir": None}
 
-        monkeypatch.setattr(sched, "get_due_jobs", lambda: [workdir_job, parallel_job])
+        monkeypatch.setattr(sched, "get_due_jobs", lambda: [workdir_a, workdir_b, parallel_job])
         monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
 
         # Record call order / thread context.
         import threading
-        calls: list[tuple[str, bool]] = []
+        calls: list[tuple[str, str]] = []
+        order_lock = threading.Lock()
 
         def fake_run_job(job):
             # Return a minimal tuple matching run_job's signature.
-            calls.append((job["id"], threading.current_thread().name))
+            with order_lock:
+                calls.append((job["id"], threading.current_thread().name))
             return True, "output", "response", None
 
         monkeypatch.setattr(sched, "run_job", fake_run_job)
@@ -231,16 +234,22 @@ class TestTickWorkdirPartition:
         )
 
         n = sched.tick(verbose=False)
-        assert n == 2
+        assert n == 3
         ids = [c[0] for c in calls]
 
-        # Workdir jobs always come before parallel jobs.
+        # Sequential workdir jobs preserve submission order relative to each
+        # other (single-thread pool).
         assert ids.index("a") < ids.index("b")
 
-        # The workdir job must run on the main thread (sequential pass).
+        # Workdir jobs run on the persistent single-thread cron-seq pool —
+        # NOT the main thread — so a long workdir job never blocks the ticker.
         main_thread_name = threading.current_thread().name
-        workdir_thread_name = next(t for jid, t in calls if jid == "a")
-        assert workdir_thread_name == main_thread_name
+        for jid in ("a", "b"):
+            workdir_thread_name = next(t for j, t in calls if j == jid)
+            assert workdir_thread_name != main_thread_name
+            assert workdir_thread_name.startswith("cron-seq"), workdir_thread_name
+        par_thread_name = next(t for j, t in calls if j == "c")
+        assert par_thread_name.startswith("cron-parallel"), par_thread_name
 
 
 # ---------------------------------------------------------------------------
diff --git a/tests/cron/test_parallel_pool.py b/tests/cron/test_parallel_pool.py
index 65d5d43c9..146853c4a 100644
--- a/tests/cron/test_parallel_pool.py
+++ b/tests/cron/test_parallel_pool.py
@@ -169,3 +169,106 @@ class TestSyncMode:
         barrier.wait()
         time.sleep(0.1)
         sched._shutdown_parallel_pool()
+
+
+class TestSequentialPool:
+    """Sequential (workdir/profile) jobs use the persistent cron-seq pool.
+
+    Verifies the follow-up fix: env/context-mutating jobs no longer run inline
+    in the ticker thread, so a long workdir/profile job can't starve the
+    schedule the same way the parallel path used to.
+    """
+
+    def test_sequential_job_does_not_block_ticker(self, tmp_path, monkeypatch):
+        """sync=False returns immediately even when a workdir job is slow."""
+        import cron.scheduler as sched
+
+        sched._parallel_pool = None
+        sched._parallel_pool_max_workers = None
+        sched._sequential_pool = None
+        sched._running_job_ids.clear()
+
+        job = {
+            "id": "slow-workdir",
+            "name": "slow-workdir",
+            "prompt": "test",
+            "schedule": "every 5m",
+            "enabled": True,
+            "next_run_at": "2020-01-01T00:00:00",
+            "deliver": "local",
+            "workdir": str(tmp_path),  # makes it sequential
+        }
+
+        barrier = threading.Barrier(2, timeout=5)
+
+        def slow_run(j):
+            barrier.wait()
+            return True, "out", "resp", None
+
+        monkeypatch.setattr(sched, "get_due_jobs", lambda: [job])
+        monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
+        monkeypatch.setattr(sched, "run_job", slow_run)
+        monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: "/tmp/out")
+        monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None)
+        monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None)
+
+        start = time.monotonic()
+        n = sched.tick(verbose=False, sync=False)
+        elapsed = time.monotonic() - start
+
+        assert n == 1  # optimistic count
+        assert elapsed < 1.0  # did NOT block on the slow workdir job
+
+        barrier.wait()
+        time.sleep(0.1)
+        sched._shutdown_parallel_pool()
+
+    def test_sequential_running_guard_prevents_double_dispatch(self, tmp_path, monkeypatch):
+        """A workdir job already in _running_job_ids is skipped on next tick."""
+        import cron.scheduler as sched
+
+        sched._parallel_pool = None
+        sched._parallel_pool_max_workers = None
+        sched._sequential_pool = None
+        sched._running_job_ids.clear()
+
+        job = {
+            "id": "guard-seq",
+            "name": "guard-seq",
+            "prompt": "test",
+            "schedule": "every 5m",
+            "enabled": True,
+            "next_run_at": "2020-01-01T00:00:00",
+            "deliver": "local",
+            "workdir": str(tmp_path),
+        }
+
+        # Simulate the job already running.
+        sched._running_job_ids.add("guard-seq")
+
+        dispatched = []
+        monkeypatch.setattr(sched, "get_due_jobs", lambda: [job])
+        monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
+        monkeypatch.setattr(sched, "run_job", lambda j: dispatched.append(j["id"]) or (True, "out", "resp", None))
+        monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: None)
+        monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None)
+        monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None)
+
+        n = sched.tick(verbose=False)
+        assert n == 0  # skipped, not dispatched
+        assert dispatched == []
+
+        sched._running_job_ids.discard("guard-seq")
+        sched._shutdown_parallel_pool()
+
+    def test_get_sequential_pool_is_persistent(self):
+        """_get_sequential_pool returns the same single-thread pool."""
+        import cron.scheduler as sched
+
+        sched._sequential_pool = None
+        pool1 = sched._get_sequential_pool()
+        pool2 = sched._get_sequential_pool()
+        assert pool1 is pool2
+
+        sched._shutdown_parallel_pool()
+        assert sched._sequential_pool is None