fix(cron): decouple job dispatch from completion in tick()
PR #13021 fixed serial starvation by adding a ThreadPoolExecutor to tick(), but it kept as_completed(timeout=600), which still blocks the ticker thread until the slowest job finishes. This causes the same starvation pattern: when one job runs long (15+ min), the other jobs' next_run_at values expire past the grace window, and those jobs get perpetually fast-forwarded instead of running.

This PR decouples dispatch from completion:
- Persistent ThreadPoolExecutor (reused across ticks, no auto-join)
- Fire-and-forget dispatch: tick submits and returns immediately
- Running-job guard: prevents re-dispatching active jobs
- sync parameter: defaults to True (backward compatible); callers opt into sync=False for non-blocking behavior
- atexit shutdown handler for clean pool teardown
- gateway/run.py: the production ticker opts into sync=False

Refs #33315 (complementary — that issue's PRs fix grace handling in jobs.py; this PR prevents the grace window from expiring in the first place)
This commit is contained in:
@ -9,6 +9,7 @@ runs at a time if multiple processes overlap.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import atexit
|
||||
import concurrent.futures
|
||||
import contextvars
|
||||
import json
|
||||
@ -17,6 +18,7 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
|
||||
# fcntl is Unix-only; on Windows use msvcrt for file locking
|
||||
@ -154,6 +156,43 @@ from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_
|
||||
# locally for audit.
|
||||
SILENT_MARKER = "[SILENT]"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Persistent thread pool for parallel cron jobs.
|
||||
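# When called with sync=False, the tick function submits jobs here and returns
# immediately, so the ticker thread is never blocked by long-running jobs
# (e.g. the fixer running 15+ min). With the default sync=True, tick still
# waits for the jobs it dispatched before returning.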
|
||||
# ---------------------------------------------------------------------------
|
||||
# Executor shared across ticks; stays None until _get_parallel_pool() builds it.
_parallel_pool: Optional[concurrent.futures.ThreadPoolExecutor] = None
|
||||
# max_workers value the current pool was built with; a different requested
# value makes _get_parallel_pool() replace the pool.
_parallel_pool_max_workers: Optional[int] = None
|
||||
# IDs of jobs currently dispatched to the pool: tick() adds an ID before
# submitting the job and the worker discards it when the job finishes.
_running_job_ids: set = set()
|
||||
# Guards every read and write of _running_job_ids.
_running_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_parallel_pool(max_workers: Optional[int]) -> concurrent.futures.ThreadPoolExecutor:
    """Return the persistent parallel pool, building or replacing it as needed.

    The pool is created on first use and reused by later calls that ask for
    the same ``max_workers``. Asking for a different ``max_workers`` swaps in
    a new pool and retires the old one.

    Args:
        max_workers: Worker count handed to ``ThreadPoolExecutor``; ``None``
            lets the executor pick its own default.

    Returns:
        The executor that new jobs should be submitted to.

    Raises:
        ValueError: Propagated from ``ThreadPoolExecutor`` when
            ``max_workers`` is not a positive number. The previously active
            pool, if any, is left untouched and usable in that case.
    """
    global _parallel_pool, _parallel_pool_max_workers

    if _parallel_pool is not None and _parallel_pool_max_workers == max_workers:
        return _parallel_pool

    # Build the replacement BEFORE retiring the current pool. If construction
    # fails, the module state must not be left pointing at a shut-down
    # executor that would reject every later submit().
    new_pool = concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers,
        thread_name_prefix="cron-parallel",
    )

    old_pool = _parallel_pool
    _parallel_pool = new_pool
    _parallel_pool_max_workers = max_workers

    if old_pool is not None:
        # Do not wait and do not cancel: work already submitted to the old
        # pool is allowed to finish on the old pool's own threads.
        old_pool.shutdown(wait=False, cancel_futures=False)

    return new_pool
|
||||
|
||||
|
||||
def _shutdown_parallel_pool() -> None:
    """Shut down the persistent pool and reset the module-level pool state.

    Registered with ``atexit`` and also called directly by tests. Blocks
    until jobs already submitted to the pool have finished. Calling it when
    no pool exists is a no-op.
    """
    global _parallel_pool, _parallel_pool_max_workers

    # Detach the pool from module state first, so the state is reset even if
    # the blocking shutdown below is interrupted or raises. Otherwise a later
    # _get_parallel_pool() call could hand out an executor that is already
    # shut down.
    pool = _parallel_pool
    _parallel_pool = None
    _parallel_pool_max_workers = None

    if pool is not None:
        pool.shutdown(wait=True, cancel_futures=False)
|
||||
|
||||
|
||||
atexit.register(_shutdown_parallel_pool)
|
||||
|
||||
|
||||
# Backward-compatible module override used by tests and emergency monkeypatches.
|
||||
_hermes_home: Path | None = None
|
||||
|
||||
@ -1895,7 +1934,7 @@ def _run_job_impl(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)
|
||||
|
||||
|
||||
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> int:
|
||||
"""
|
||||
Check and run all due jobs.
|
||||
|
||||
@ -1939,6 +1978,9 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
|
||||
# Advance next_run_at for all recurring jobs FIRST, under the file lock,
|
||||
# before any execution begins. This preserves at-most-once semantics.
|
||||
# For parallel jobs that are already running, advance_next_run keeps
|
||||
# bumping next_run_at forward so the grace window never expires.
|
||||
# mark_job_run() overwrites next_run_at on completion.
|
||||
for job in due_jobs:
|
||||
advance_next_run(job["id"])
|
||||
|
||||
@ -2036,14 +2078,37 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
_ctx = contextvars.copy_context()
|
||||
_results.append(_ctx.run(_process_job, job))
|
||||
|
||||
# Parallel pass for the rest — same behaviour as before.
|
||||
# Parallel pass — persistent pool, non-blocking dispatch.
|
||||
# Jobs that are already running (from a previous tick) are skipped.
|
||||
# mark_job_run() updates next_run_at on completion, so the next tick
|
||||
# after completion finds the job due again naturally. No catch-up
|
||||
# queue needed.
|
||||
if parallel_jobs:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as _tick_pool:
|
||||
_futures = []
|
||||
for job in parallel_jobs:
|
||||
_ctx = contextvars.copy_context()
|
||||
_futures.append(_tick_pool.submit(_ctx.run, _process_job, job))
|
||||
for f in concurrent.futures.as_completed(_futures, timeout=600):
|
||||
pool = _get_parallel_pool(_max_workers)
|
||||
_parallel_futures = []
|
||||
for job in parallel_jobs:
|
||||
job_id = job["id"]
|
||||
with _running_lock:
|
||||
if job_id in _running_job_ids:
|
||||
logger.info("Job '%s' already running — skipping", job.get("name", job_id))
|
||||
continue
|
||||
_running_job_ids.add(job_id)
|
||||
_ctx = contextvars.copy_context()
|
||||
# Fire-and-forget: remove from running set when done.
|
||||
def _run_and_release(j=job, ctx=_ctx):
|
||||
try:
|
||||
return ctx.run(_process_job, j)
|
||||
finally:
|
||||
with _running_lock:
|
||||
_running_job_ids.discard(j["id"])
|
||||
fut = pool.submit(_run_and_release)
|
||||
if sync:
|
||||
_parallel_futures.append(fut)
|
||||
else:
|
||||
_results.append(True) # optimistically counted
|
||||
# In sync mode (tests), wait for all parallel jobs to finish.
|
||||
if sync and _parallel_futures:
|
||||
for f in concurrent.futures.as_completed(_parallel_futures):
|
||||
try:
|
||||
_results.append(f.result())
|
||||
except Exception as exc:
|
||||
|
||||
@ -19215,7 +19215,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
||||
tick_count = 0
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
cron_tick(verbose=False, adapters=adapters, loop=loop)
|
||||
cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
|
||||
except Exception as e:
|
||||
logger.debug("Cron tick error: %s", e)
|
||||
|
||||
|
||||
171
tests/cron/test_parallel_pool.py
Normal file
171
tests/cron/test_parallel_pool.py
Normal file
@ -0,0 +1,171 @@
|
||||
"""Tests for the persistent parallel pool and running-job guard in cron/scheduler.py.
|
||||
|
||||
These verify the fix for the tick-blocking issue where as_completed(timeout=600)
|
||||
prevented the ticker thread from firing, causing all other jobs to be fast-forwarded.
|
||||
"""
|
||||
|
||||
import concurrent.futures
|
||||
import threading
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestPersistentPool:
    """_get_parallel_pool returns a persistent ThreadPoolExecutor."""

    def setup_method(self):
        """Start each test with no pool in the scheduler module."""
        import cron.scheduler as sched

        # Shut down (rather than merely drop the reference to) any pool left
        # behind by an earlier test, so its worker threads do not linger.
        sched._shutdown_parallel_pool()

    def teardown_method(self):
        """Tear the pool down even when the test body failed an assertion."""
        import cron.scheduler as sched

        sched._shutdown_parallel_pool()

    def test_pool_is_reused(self):
        """Same pool instance returned when max_workers doesn't change."""
        import cron.scheduler as sched

        pool1 = sched._get_parallel_pool(4)
        pool2 = sched._get_parallel_pool(4)
        assert pool1 is pool2

    def test_pool_is_recreated_on_worker_change(self):
        """New pool when max_workers changes."""
        import cron.scheduler as sched

        pool1 = sched._get_parallel_pool(2)
        pool2 = sched._get_parallel_pool(4)
        assert pool1 is not pool2

    def test_shutdown_clears_pool(self):
        """_shutdown_parallel_pool resets state."""
        import cron.scheduler as sched

        sched._get_parallel_pool(2)

        sched._shutdown_parallel_pool()
        assert sched._parallel_pool is None
        assert sched._parallel_pool_max_workers is None
|
||||
|
||||
|
||||
class TestRunningJobGuard:
    """_running_job_ids prevents double-dispatch of active jobs."""

    def setup_method(self):
        """Start each test with no pool and an empty running-job set."""
        import cron.scheduler as sched

        sched._shutdown_parallel_pool()
        sched._running_job_ids.clear()

    def teardown_method(self):
        """Undo the module-state changes even when an assertion failed."""
        import cron.scheduler as sched

        # Wait for any dispatched work first, then drop the simulated entry
        # so it cannot make later tests skip their jobs.
        sched._shutdown_parallel_pool()
        sched._running_job_ids.clear()

    def test_running_set_prevents_double_dispatch(self, monkeypatch):
        """A job already in _running_job_ids is skipped on the next tick."""
        import cron.scheduler as sched

        job = {
            "id": "guard-job",
            "name": "guard-test",
            "prompt": "test",
            "schedule": "every 5m",
            "enabled": True,
            "next_run_at": "2020-01-01T00:00:00",
            "deliver": "local",
        }

        # Simulate the job already running.
        sched._running_job_ids.add("guard-job")

        dispatched = []

        def fake_run_job(j):
            # Record the dispatch so the test can prove none happened.
            dispatched.append(j["id"])
            return True, "out", "resp", None

        monkeypatch.setattr(sched, "get_due_jobs", lambda: [job])
        monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "run_job", fake_run_job)
        monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None)

        n = sched.tick(verbose=False)

        assert n == 0  # skipped, not dispatched
        assert dispatched == []
|
||||
|
||||
|
||||
class TestSyncMode:
    """tick() blocks by default (sync=True); tick(sync=False) returns immediately."""

    def setup_method(self):
        """Start each test with no pool and an empty running-job set."""
        import cron.scheduler as sched

        sched._shutdown_parallel_pool()
        sched._running_job_ids.clear()

    def teardown_method(self):
        """Wait for dispatched jobs and reset state, even after a failure."""
        import cron.scheduler as sched

        sched._shutdown_parallel_pool()
        sched._running_job_ids.clear()

    @staticmethod
    def _stub_scheduler_io(monkeypatch, sched, due_jobs, run_job):
        """Replace the scheduler's job source, runner and persistence with stubs."""
        monkeypatch.setattr(sched, "get_due_jobs", lambda: due_jobs)
        monkeypatch.setattr(sched, "advance_next_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "run_job", run_job)
        monkeypatch.setattr(sched, "save_job_output", lambda *_a, **_kw: "/tmp/out")
        monkeypatch.setattr(sched, "mark_job_run", lambda *_a, **_kw: None)
        monkeypatch.setattr(sched, "_deliver_result", lambda *_a, **_kw: None)

    def test_sync_true_blocks_and_returns_correct_count(self, monkeypatch):
        """sync=True waits for jobs and returns actual results."""
        import cron.scheduler as sched

        jobs = [
            {
                "id": f"job-{i}",
                "name": f"Job {i}",
                "prompt": "test",
                "schedule": "every 5m",
                "enabled": True,
                "next_run_at": "2020-01-01T00:00:00",
                "deliver": "local",
            }
            for i in range(3)
        ]

        self._stub_scheduler_io(
            monkeypatch, sched, jobs, lambda j: (True, "out", "resp", None)
        )

        n = sched.tick(verbose=False)
        assert n == 3

    def test_sync_false_returns_immediately(self, monkeypatch):
        """sync=False returns before parallel jobs finish (optimistic count)."""
        import cron.scheduler as sched

        job = {
            "id": "slow-job",
            "name": "slow",
            "prompt": "test",
            "schedule": "every 5m",
            "enabled": True,
            "next_run_at": "2020-01-01T00:00:00",
            "deliver": "local",
        }

        barrier = threading.Barrier(2, timeout=5)

        def slow_run(j):
            barrier.wait()  # blocks until the test thread also waits
            return True, "out", "resp", None

        self._stub_scheduler_io(monkeypatch, sched, [job], slow_run)

        start = time.monotonic()
        n = sched.tick(verbose=False, sync=False)  # opt-in: non-blocking
        elapsed = time.monotonic() - start

        # Release the worker BEFORE asserting, so a failed assertion cannot
        # leave it parked on the barrier. The measurements above are already
        # taken, so this does not affect what is being checked. No sleep is
        # needed afterwards: teardown's pool shutdown waits for the job.
        barrier.wait()

        assert n == 1  # optimistic count
        assert elapsed < 1.0  # returned immediately, didn't wait for slow_run
|
||||
Reference in New Issue
Block a user