From eb9cde734642cc2a8ecd142ba7c63d3964ca835b Mon Sep 17 00:00:00 2001 From: Vynxe Vainglory Date: Tue, 2 Jun 2026 05:12:13 -0400 Subject: [PATCH] fix(cron): decouple job dispatch from completion in tick() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #13021 fixed serial starvation by adding ThreadPoolExecutor to tick(), but kept as_completed(timeout=600) which still blocks the ticker thread until the slowest job finishes. This causes the same starvation pattern: when one job runs long (15+ min), other jobs' next_run_at expires past the grace window and they get perpetually fast-forwarded instead of running. This PR decouples dispatch from completion: - Persistent ThreadPoolExecutor (reused across ticks, no auto-join) - Fire-and-forget dispatch: tick submits and returns immediately - Running-job guard: prevents re-dispatching active jobs - sync parameter: defaults to True (backward compatible), callers opt into sync=False for non-blocking behavior - atexit shutdown handler for clean pool teardown - gateway/run.py: production ticker opts into sync=False Refs #33315 (complementary — that issue's PRs fix grace handling in jobs.py; this PR prevents the grace from expiring in the first place) --- cron/scheduler.py | 81 +++++++++++++-- gateway/run.py | 2 +- tests/cron/test_parallel_pool.py | 171 +++++++++++++++++++++++++++++++ 3 files changed, 245 insertions(+), 9 deletions(-) create mode 100644 tests/cron/test_parallel_pool.py diff --git a/cron/scheduler.py b/cron/scheduler.py index 401b140d8..4fd1f3059 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -9,6 +9,7 @@ runs at a time if multiple processes overlap. """ import asyncio +import atexit import concurrent.futures import contextvars import json @@ -17,6 +18,7 @@ import os import shutil import subprocess import sys +import threading from contextlib import contextmanager # fcntl is Unix-only; on Windows use msvcrt for file locking @@ -154,6 +156,43 @@ from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_ # locally for audit. SILENT_MARKER = "[SILENT]" +# --------------------------------------------------------------------------- +# Persistent thread pool for parallel cron jobs. +# The tick function submits jobs here and returns immediately so the ticker +# thread is never blocked by long-running jobs (e.g. the fixer running 15+ min). +# --------------------------------------------------------------------------- +_parallel_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None +_parallel_pool_max_workers: Optional[int] = None +_running_job_ids: set = set() +_running_lock = threading.Lock() + + +def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadPoolExecutor: + """Return (or create) the persistent parallel pool.""" + global _parallel_pool, _parallel_pool_max_workers + if _parallel_pool is None or _parallel_pool_max_workers != max_workers: + if _parallel_pool is not None: + _parallel_pool.shutdown(wait=False, cancel_futures=False) + _parallel_pool = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="cron-parallel", + ) + _parallel_pool_max_workers = max_workers + return _parallel_pool + + +def _shutdown_parallel_pool() -> None: + """Shut down the persistent pool on process exit.""" + global _parallel_pool, _parallel_pool_max_workers + if _parallel_pool is not None: + _parallel_pool.shutdown(wait=True, cancel_futures=False) + _parallel_pool = None + _parallel_pool_max_workers = None + + +atexit.register(_shutdown_parallel_pool) + + # Backward-compatible module override used by tests and emergency monkeypatches. _hermes_home: Path | None = None @@ -1895,7 +1934,7 @@ def _run_job_impl(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e) -def tick(verbose: bool = True, adapters=None, loop=None) -> int: +def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> int: """ Check and run all due jobs. @@ -1939,6 +1978,9 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: # Advance next_run_at for all recurring jobs FIRST, under the file lock, # before any execution begins. This preserves at-most-once semantics. + # For parallel jobs that are already running, advance_next_run keeps + # bumping next_run_at forward so the grace window never expires. + # mark_job_run() overwrites next_run_at on completion. for job in due_jobs: advance_next_run(job["id"]) @@ -2036,14 +2078,37 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: _ctx = contextvars.copy_context() _results.append(_ctx.run(_process_job, job)) - # Parallel pass for the rest — same behaviour as before. + # Parallel pass — persistent pool, non-blocking dispatch. + # Jobs that are already running (from a previous tick) are skipped. + # mark_job_run() updates next_run_at on completion, so the next tick + # after completion finds the job due again naturally. No catch-up + # queue needed. if parallel_jobs: - with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as _tick_pool: - _futures = [] - for job in parallel_jobs: - _ctx = contextvars.copy_context() - _futures.append(_tick_pool.submit(_ctx.run, _process_job, job)) - for f in concurrent.futures.as_completed(_futures, timeout=600): + pool = _get_parallel_pool(_max_workers) + _parallel_futures = [] + for job in parallel_jobs: + job_id = job["id"] + with _running_lock: + if job_id in _running_job_ids: + logger.info("Job '%s' already running — skipping", job.get("name", job_id)) + continue + _running_job_ids.add(job_id) + _ctx = contextvars.copy_context() + # Fire-and-forget: remove from running set when done. + def _run_and_release(j=job, ctx=_ctx): + try: + return ctx.run(_process_job, j) + finally: + with _running_lock: + _running_job_ids.discard(j["id"]) + fut = pool.submit(_run_and_release) + if sync: + _parallel_futures.append(fut) + else: + _results.append(True) # optimistically counted + # In sync mode (tests), wait for all parallel jobs to finish. + if sync and _parallel_futures: + for f in concurrent.futures.as_completed(_parallel_futures): try: _results.append(f.result()) except Exception as exc: diff --git a/gateway/run.py b/gateway/run.py index e437958d4..13c192399 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -19215,7 +19215,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in tick_count = 0 while not stop_event.is_set(): try: - cron_tick(verbose=False, adapters=adapters, loop=loop) + cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False) except Exception as e: logger.debug("Cron tick error: %s", e) diff --git a/tests/cron/test_parallel_pool.py b/tests/cron/test_parallel_pool.py new file mode 100644 index 000000000..65d5d43c9 --- /dev/null +++ b/tests/cron/test_parallel_pool.py @@ -0,0 +1,171 @@ +"""Tests for the persistent parallel pool and running-job guard in cron/scheduler.py. + +These verify the fix for the tick-blocking issue where as_completed(timeout=600) +prevented the ticker thread from firing, causing all other jobs to be fast-forwarded. +""" + +import concurrent.futures +import threading +import time +from unittest.mock import patch + +import pytest + + +class TestPersistentPool: + """_get_parallel_pool returns a persistent ThreadPoolExecutor.""" + + def test_pool_is_reused(self, monkeypatch): + """Same pool instance returned when max_workers doesn't change.""" + import cron.scheduler as sched + + # Reset module state. + sched._parallel_pool = None + sched._parallel_pool_max_workers = None + + pool1 = sched._get_parallel_pool(4) + pool2 = sched._get_parallel_pool(4) + assert pool1 is pool2 + + # Cleanup. + sched._shutdown_parallel_pool() + + def test_pool_is_recreated_on_worker_change(self, monkeypatch): + """New pool when max_workers changes.""" + import cron.scheduler as sched + + sched._parallel_pool = None + sched._parallel_pool_max_workers = None + + pool1 = sched._get_parallel_pool(2) + pool2 = sched._get_parallel_pool(4) + assert pool1 is not pool2 + + sched._shutdown_parallel_pool() + + def test_shutdown_clears_pool(self, monkeypatch): + """_shutdown_parallel_pool resets state.""" + import cron.scheduler as sched + + sched._parallel_pool = None + sched._parallel_pool_max_workers = None + sched._get_parallel_pool(2) + + sched._shutdown_parallel_pool() + assert sched._parallel_pool is None + assert sched._parallel_pool_max_workers is None + + +class TestRunningJobGuard: + """_running_job_ids prevents double-dispatch of active jobs.""" + + def test_running_set_prevents_double_dispatch(self, tmp_path, monkeypatch): + """A job already in _running_job_ids is skipped on the next tick.""" + import cron.scheduler as sched + + # Reset state. + sched._parallel_pool = None + sched._parallel_pool_max_workers = None + sched._running_job_ids.clear() + + job = { + "id": "guard-job", + "name": "guard-test", + "prompt": "test", + "schedule": "every 5m", + "enabled": True, + "next_run_at": "2020-01-01T00:00:00", + "deliver": "local", + } + + # Simulate the job already running. + sched._running_job_ids.add("guard-job") + + dispatched = [] + monkeypatch.setattr(sched, "get_due_jobs", lambda: [job]) + monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "run_job", lambda j: dispatched.append(j["id"]) or (True, "out", "resp", None)) + monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None) + + n = sched.tick(verbose=False) + assert n == 0 # skipped, not dispatched + assert dispatched == [] + + sched._running_job_ids.discard("guard-job") + sched._shutdown_parallel_pool() + + +class TestSyncMode: + """tick() blocks by default (sync=True); tick(sync=False) returns immediately.""" + + def test_sync_true_blocks_and_returns_correct_count(self, tmp_path, monkeypatch): + """sync=True waits for jobs and returns actual results.""" + import cron.scheduler as sched + + sched._parallel_pool = None + sched._parallel_pool_max_workers = None + sched._running_job_ids.clear() + + jobs = [ + {"id": f"job-{i}", "name": f"Job {i}", "prompt": "test", + "schedule": "every 5m", "enabled": True, + "next_run_at": "2020-01-01T00:00:00", "deliver": "local"} + for i in range(3) + ] + + monkeypatch.setattr(sched, "get_due_jobs", lambda: jobs) + monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "run_job", lambda j: (True, "out", "resp", None)) + monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: "/tmp/out") + monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None) + + n = sched.tick(verbose=False) + assert n == 3 + + sched._shutdown_parallel_pool() + + def test_sync_false_returns_immediately(self, tmp_path, monkeypatch): + """sync=False returns before parallel jobs finish (optimistic count).""" + import cron.scheduler as sched + + sched._parallel_pool = None + sched._parallel_pool_max_workers = None + sched._running_job_ids.clear() + + job = { + "id": "slow-job", + "name": "slow", + "prompt": "test", + "schedule": "every 5m", + "enabled": True, + "next_run_at": "2020-01-01T00:00:00", + "deliver": "local", + } + + barrier = threading.Barrier(2, timeout=5) + + def slow_run(j): + barrier.wait() # blocks until test thread also waits + return True, "out", "resp", None + + monkeypatch.setattr(sched, "get_due_jobs", lambda: [job]) + monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "run_job", slow_run) + monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: "/tmp/out") + monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None) + monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None) + + start = time.monotonic() + n = sched.tick(verbose=False, sync=False) # opt-in: non-blocking + elapsed = time.monotonic() - start + + assert n == 1 # optimistic count + assert elapsed < 1.0 # returned immediately, didn't wait for slow_run + + # Let the job finish so cleanup works. + barrier.wait() + time.sleep(0.1) + sched._shutdown_parallel_pool()