feat(kanban): add max_in_progress config to cap concurrent running tasks
Salvages #22981 by @SimbaKingjoe. Adds a 'kanban.max_in_progress' config option that caps the number of simultaneously running tasks. When the board already has N tasks running (where N is the configured cap), the dispatcher skips spawning so that slow workers (local LLMs, resource-constrained hosts) don't pile up and time out. The value is threaded through dispatch_once(max_in_progress=) and the gateway dispatcher's config parsing, which validates it (warning on and ignoring invalid or below-1 values).
This commit is contained in:
@ -4776,6 +4776,31 @@ class GatewayRunner:
|
|||||||
if max_spawn is not None:
|
if max_spawn is not None:
|
||||||
logger.info(f"kanban dispatcher: max_spawn={max_spawn}")
|
logger.info(f"kanban dispatcher: max_spawn={max_spawn}")
|
||||||
|
|
||||||
|
# Cap the number of simultaneously running tasks so slow workers
|
||||||
|
# (local LLMs, resource-constrained hosts) don't pile up and time
|
||||||
|
# out. When set, the dispatcher skips spawning when the board
|
||||||
|
# already has this many tasks in 'running' status.
|
||||||
|
raw_max_in_progress = kanban_cfg.get("max_in_progress", None)
|
||||||
|
max_in_progress = None
|
||||||
|
if raw_max_in_progress is not None:
|
||||||
|
try:
|
||||||
|
max_in_progress = int(raw_max_in_progress)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
logger.warning(
|
||||||
|
"kanban dispatcher: invalid kanban.max_in_progress=%r; ignoring",
|
||||||
|
raw_max_in_progress,
|
||||||
|
)
|
||||||
|
max_in_progress = None
|
||||||
|
else:
|
||||||
|
if max_in_progress < 1:
|
||||||
|
logger.warning(
|
||||||
|
"kanban dispatcher: kanban.max_in_progress=%r is below 1; ignoring",
|
||||||
|
raw_max_in_progress,
|
||||||
|
)
|
||||||
|
max_in_progress = None
|
||||||
|
else:
|
||||||
|
logger.info(f"kanban dispatcher: max_in_progress={max_in_progress}")
|
||||||
|
|
||||||
raw_failure_limit = kanban_cfg.get("failure_limit", _kb.DEFAULT_FAILURE_LIMIT)
|
raw_failure_limit = kanban_cfg.get("failure_limit", _kb.DEFAULT_FAILURE_LIMIT)
|
||||||
try:
|
try:
|
||||||
failure_limit = int(raw_failure_limit)
|
failure_limit = int(raw_failure_limit)
|
||||||
@ -4828,6 +4853,7 @@ class GatewayRunner:
|
|||||||
conn,
|
conn,
|
||||||
board=slug,
|
board=slug,
|
||||||
max_spawn=max_spawn,
|
max_spawn=max_spawn,
|
||||||
|
max_in_progress=max_in_progress,
|
||||||
failure_limit=failure_limit,
|
failure_limit=failure_limit,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|||||||
@ -4112,6 +4112,7 @@ def dispatch_once(
|
|||||||
ttl_seconds: Optional[int] = None,
|
ttl_seconds: Optional[int] = None,
|
||||||
dry_run: bool = False,
|
dry_run: bool = False,
|
||||||
max_spawn: Optional[int] = None,
|
max_spawn: Optional[int] = None,
|
||||||
|
max_in_progress: Optional[int] = None,
|
||||||
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
||||||
board: Optional[str] = None,
|
board: Optional[str] = None,
|
||||||
) -> DispatchResult:
|
) -> DispatchResult:
|
||||||
@ -4209,6 +4210,20 @@ def dispatch_once(
|
|||||||
"WHERE status = 'ready' AND claim_lock IS NULL "
|
"WHERE status = 'ready' AND claim_lock IS NULL "
|
||||||
"ORDER BY priority DESC, created_at ASC"
|
"ORDER BY priority DESC, created_at ASC"
|
||||||
).fetchall()
|
).fetchall()
|
||||||
|
# Honour kanban.max_in_progress: if the board already has enough running
|
||||||
|
# tasks, skip spawning this tick so slow workers (local LLMs,
|
||||||
|
# resource-constrained hosts) can finish what they have before more tasks
|
||||||
|
# pile up and time out.
|
||||||
|
if max_in_progress is not None and ready_rows:
|
||||||
|
in_progress = conn.execute(
|
||||||
|
"SELECT COUNT(*) FROM tasks WHERE status = 'running'"
|
||||||
|
).fetchone()[0]
|
||||||
|
if in_progress >= max_in_progress:
|
||||||
|
return result
|
||||||
|
# Only spawn enough to reach the cap, respecting max_spawn too.
|
||||||
|
remaining = max_in_progress - in_progress
|
||||||
|
if max_spawn is None or max_spawn > remaining:
|
||||||
|
max_spawn = remaining
|
||||||
spawned = 0
|
spawned = 0
|
||||||
for row in ready_rows:
|
for row in ready_rows:
|
||||||
if max_spawn is not None and running_count + spawned >= max_spawn:
|
if max_spawn is not None and running_count + spawned >= max_spawn:
|
||||||
|
|||||||
@ -1890,3 +1890,64 @@ def test_create_task_with_explicit_workspace_ignores_board_default(kanban_home):
|
|||||||
assert t is not None
|
assert t is not None
|
||||||
assert t.workspace_path == explicit
|
assert t.workspace_path == explicit
|
||||||
assert t.workspace_path != "/board/default"
|
assert t.workspace_path != "/board/default"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# dispatch_once — max_in_progress
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_max_in_progress_skips_when_at_limit(kanban_home, all_assignees_spawnable):
    """With the cap already met by claimed tasks, dispatch must not spawn.

    Two tasks are created and claimed, two more are left unclaimed, and
    ``dispatch_once`` is called with ``max_in_progress=2``. The recording
    spawn function must never be invoked.
    """
    spawns = []

    def record_spawn(task, workspace):
        # Remember which task the dispatcher tried to start.
        spawns.append(task.id)

    with kb.connect() as conn:
        # Create the two tasks that will occupy the cap, then claim both.
        first_claimed = kb.create_task(conn, title="a", assignee="alice")
        second_claimed = kb.create_task(conn, title="b", assignee="bob")
        for claimed in (first_claimed, second_claimed):
            kb.claim_task(conn, claimed)

        # Two further tasks are waiting; with the cap at 2 neither should fire.
        for title, assignee in (("c", "bob"), ("d", "alice")):
            kb.create_task(conn, title=title, assignee=assignee)

        kb.dispatch_once(conn, spawn_fn=record_spawn, max_in_progress=2)

    assert len(spawns) == 0, f"expected 0 spawns, got {len(spawns)}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_max_in_progress_spawns_up_to_cap(kanban_home, all_assignees_spawnable):
    """With one task claimed and a cap of 3, exactly two more are spawned.

    One task is created and claimed, three more are left unclaimed, and
    ``dispatch_once`` is called with ``max_in_progress=3``. The recording
    spawn function must be invoked exactly twice.
    """
    spawns = []

    def record_spawn(task, workspace):
        # Remember which task the dispatcher tried to start.
        spawns.append(task.id)

    with kb.connect() as conn:
        # A single claimed task takes one of the three slots.
        already_claimed = kb.create_task(conn, title="a", assignee="alice")
        kb.claim_task(conn, already_claimed)

        # Three candidates are waiting, but only two slots remain under the cap.
        for title in ("b", "c", "d"):
            kb.create_task(conn, title=title, assignee="bob")

        kb.dispatch_once(conn, spawn_fn=record_spawn, max_in_progress=3)

    assert len(spawns) == 2, f"expected 2 spawns (cap 3 - 1 running), got {len(spawns)}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_max_in_progress_none_is_unlimited(kanban_home, all_assignees_spawnable):
    """Passing ``max_in_progress=None`` applies no cap.

    Four tasks are created and ``dispatch_once`` is called with
    ``max_in_progress=None``; the recording spawn function must be invoked
    once per task.
    """
    spawns = []

    def record_spawn(task, workspace):
        # Remember which task the dispatcher tried to start.
        spawns.append(task.id)

    with kb.connect() as conn:
        # Iterating the string yields the titles "a", "b", "c", "d" in order.
        for title in "abcd":
            kb.create_task(conn, title=title, assignee="alice")

        kb.dispatch_once(conn, spawn_fn=record_spawn, max_in_progress=None)

    assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"
|
||||||
|
|||||||
Reference in New Issue
Block a user