feat(kanban): add POST /runs/{run_id}/terminate endpoint

Closes the termination-control gap left by PR #28432, which shipped the
read-only sibling endpoints (/workers/active, /runs/{run_id},
/runs/{run_id}/inspect) but no way to stop a misbehaving worker from
the dashboard without dropping to the CLI.

The new endpoint resolves run_id -> task_id and delegates to the
existing kanban_db.reclaim_task() flow, so the SIGTERM->SIGKILL
escalation, run-outcome bookkeeping, and event-log append all match
POST /tasks/{task_id}/reclaim exactly. No new termination semantics
introduced.

Responses:
  200 {ok, run_id, task_id} on success
  404 unknown run_id
  409 run already ended OR task no longer reclaimable

Refs: #23762
This commit is contained in:
Rohit Sharma
2026-05-23 21:19:00 +02:00
committed by Teknium
parent 7d10105918
commit 9d4fda9952
2 changed files with 191 additions and 0 deletions

View File

@ -1310,6 +1310,58 @@ def inspect_run_endpoint(
return {"run_id": run_id, "alive": True, "pid": pid, "error": "access denied"}
class TerminateRunBody(BaseModel):
reason: Optional[str] = None
@router.post("/runs/{run_id}/terminate")
def terminate_run_endpoint(
run_id: int,
payload: TerminateRunBody,
board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"),
):
"""Terminate the worker process backing an in-flight run.
Resolves ``run_id`` to its parent ``task_id`` and routes through
:func:`kanban_db.reclaim_task` so the SIGTERM->SIGKILL flow,
run-outcome bookkeeping, and event-log append all match what the
existing ``POST /tasks/{task_id}/reclaim`` endpoint does.
Responses:
* 200 ``{"ok": true, "run_id": ..., "task_id": ...}`` on success.
* 404 when ``run_id`` is unknown.
* 409 when the run has already ended, or the task is no longer in
a claimable state.
Closes the gap left by PR #28432, which shipped the read-only
sibling endpoints (``/workers/active``, ``/runs/{run_id}``,
``/runs/{run_id}/inspect``) but no termination control surface.
"""
board = _resolve_board(board)
conn = _conn(board=board)
try:
r = kanban_db.get_run(conn, run_id)
if r is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
if r.ended_at is not None:
raise HTTPException(
status_code=409,
detail=f"run {run_id} already ended",
)
ok = kanban_db.reclaim_task(conn, r.task_id, reason=payload.reason)
if not ok:
raise HTTPException(
status_code=409,
detail=(
f"cannot terminate run {run_id}: task {r.task_id} is no "
"longer in a reclaimable state"
),
)
return {"ok": True, "run_id": run_id, "task_id": r.task_id}
finally:
conn.close()
# ---------------------------------------------------------------------------
# Recovery actions — reclaim a running claim, reassign to a new profile
# ---------------------------------------------------------------------------

View File

@ -4,6 +4,7 @@ Covers:
GET /workers/active
GET /runs/{run_id}
GET /runs/{run_id}/inspect
POST /runs/{run_id}/terminate
"""
from __future__ import annotations
@ -299,3 +300,141 @@ def test_inspect_run_live_pid(client, monkeypatch):
assert body["memory_rss_bytes"] == fake_mem.rss
assert body["num_threads"] == 4
assert body["status"] == "sleeping"
# ---------------------------------------------------------------------------
# POST /runs/{run_id}/terminate
# ---------------------------------------------------------------------------
def _setup_running_task_with_run(conn, *, title, assignee, worker_pid):
"""Create a task in 'running' state with a matching open task_runs row.
Mirrors what dispatcher_claim does: stamps tasks.status='running',
tasks.claim_lock, tasks.worker_pid; inserts task_runs row with the
same claim_lock so reclaim_task's preconditions are satisfied.
"""
task_id = kb.create_task(conn, title=title, assignee=assignee)
lock = secrets.token_hex(8)
future = int(time.time()) + 3600
conn.execute(
"UPDATE tasks SET status='running', claim_lock=?, "
"claim_expires=?, worker_pid=? WHERE id=?",
(lock, future, worker_pid, task_id),
)
cur = conn.execute(
"INSERT INTO task_runs "
"(task_id, status, claim_lock, claim_expires, worker_pid, started_at) "
"VALUES (?, 'running', ?, ?, ?, ?)",
(task_id, lock, future, worker_pid, int(time.time())),
)
conn.commit()
return task_id, cur.lastrowid
def test_terminate_run_404_unknown_id(client):
"""POST to unknown run_id returns 404."""
r = client.post(
"/api/plugins/kanban/runs/777777/terminate",
json={"reason": "test"},
)
assert r.status_code == 404
assert "777777" in r.json()["detail"]
def test_terminate_run_409_already_ended(client):
"""POST against a run with ended_at set returns 409."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="ended-terminate", assignee="ivy")
run_id = _insert_run(
conn, task_id, worker_pid=22222, ended_at=int(time.time()) - 30,
)
finally:
conn.close()
r = client.post(
f"/api/plugins/kanban/runs/{run_id}/terminate",
json={"reason": "too late"},
)
assert r.status_code == 409
assert "already ended" in r.json()["detail"]
def test_terminate_run_ok(client, monkeypatch):
"""Happy path: live run is terminated, signal fn invoked, reason recorded."""
conn = kb.connect()
try:
task_id, run_id = _setup_running_task_with_run(
conn, title="kill-me", assignee="jane", worker_pid=33333,
)
finally:
conn.close()
# Capture signal calls so we don't actually SIGTERM a random PID.
sent = []
def _fake_terminate(pid, prev_lock, *, signal_fn=None):
sent.append((pid, prev_lock))
return {"signal": "SIGTERM", "delivered": True}
monkeypatch.setattr(kb, "_terminate_reclaimed_worker", _fake_terminate)
r = client.post(
f"/api/plugins/kanban/runs/{run_id}/terminate",
json={"reason": "operator abort"},
)
assert r.status_code == 200, r.text
body = r.json()
assert body == {"ok": True, "run_id": run_id, "task_id": task_id}
assert sent == [(33333, sent[0][1])]
assert sent[0][1] is not None # claim_lock was non-null
# Task is back to ready, claim cleared.
conn = kb.connect()
try:
row = conn.execute(
"SELECT status, claim_lock, worker_pid FROM tasks WHERE id=?",
(task_id,),
).fetchone()
finally:
conn.close()
assert row["status"] == "ready"
assert row["claim_lock"] is None
assert row["worker_pid"] is None
def test_terminate_run_409_task_not_reclaimable(client, monkeypatch):
"""Open run row whose task is no longer claimable returns 409."""
conn = kb.connect()
try:
task_id = kb.create_task(conn, title="ghost-run", assignee="ken")
# Task left in default 'ready' state with no claim_lock — task_run
# exists but reclaim_task will refuse because status != 'running'
# and claim_lock is NULL.
run_id = _insert_run(conn, task_id, worker_pid=44444)
finally:
conn.close()
# Make sure no signal is ever sent on this code path.
def _boom(*a, **k):
raise AssertionError("_terminate_reclaimed_worker should not be called")
monkeypatch.setattr(kb, "_terminate_reclaimed_worker", _boom)
r = client.post(
f"/api/plugins/kanban/runs/{run_id}/terminate",
json={"reason": "stale"},
)
assert r.status_code == 409
assert "reclaimable" in r.json()["detail"]
def test_terminate_run_accepts_empty_body(client):
"""Empty JSON body (no reason) is still accepted; falls through to 404."""
r = client.post(
"/api/plugins/kanban/runs/666666/terminate",
json={},
)
# 404 because run doesn't exist — what we're asserting here is that
# the endpoint doesn't 422 on a missing 'reason' field.
assert r.status_code == 404