From 9d4fda9952019fa7026232b76296d9277720457a Mon Sep 17 00:00:00 2001 From: Rohit Sharma Date: Sat, 23 May 2026 21:19:00 +0200 Subject: [PATCH] feat(kanban): add POST /runs/{run_id}/terminate endpoint Closes the termination-control gap left by PR #28432, which shipped the read-only sibling endpoints (/workers/active, /runs/{run_id}, /runs/{run_id}/inspect) but no way to stop a misbehaving worker from the dashboard without dropping to the CLI. The new endpoint resolves run_id -> task_id and delegates to the existing kanban_db.reclaim_task() flow, so the SIGTERM->SIGKILL escalation, run-outcome bookkeeping, and event-log append all match POST /tasks/{task_id}/reclaim exactly. No new termination semantics introduced. Responses: 200 {ok, run_id, task_id} on success 404 unknown run_id 409 run already ended OR task no longer reclaimable Refs: #23762 --- plugins/kanban/dashboard/plugin_api.py | 52 +++++++++ tests/plugins/test_kanban_worker_runs.py | 139 +++++++++++++++++++++++ 2 files changed, 191 insertions(+) diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index 104f666c3..cae4d8723 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -1310,6 +1310,58 @@ def inspect_run_endpoint( return {"run_id": run_id, "alive": True, "pid": pid, "error": "access denied"} +class TerminateRunBody(BaseModel): + reason: Optional[str] = None + + +@router.post("/runs/{run_id}/terminate") +def terminate_run_endpoint( + run_id: int, + payload: TerminateRunBody, + board: Optional[str] = Query(None, description="Kanban board slug (omit for current)"), +): + """Terminate the worker process backing an in-flight run. + + Resolves ``run_id`` to its parent ``task_id`` and routes through + :func:`kanban_db.reclaim_task` so the SIGTERM->SIGKILL flow, + run-outcome bookkeeping, and event-log append all match what the + existing ``POST /tasks/{task_id}/reclaim`` endpoint does. + + Responses: + * 200 ``{"ok": true, "run_id": ..., "task_id": ...}`` on success. + * 404 when ``run_id`` is unknown. + * 409 when the run has already ended, or the task is no longer in + a claimable state. + + Closes the gap left by PR #28432, which shipped the read-only + sibling endpoints (``/workers/active``, ``/runs/{run_id}``, + ``/runs/{run_id}/inspect``) but no termination control surface. + """ + board = _resolve_board(board) + conn = _conn(board=board) + try: + r = kanban_db.get_run(conn, run_id) + if r is None: + raise HTTPException(status_code=404, detail=f"run {run_id} not found") + if r.ended_at is not None: + raise HTTPException( + status_code=409, + detail=f"run {run_id} already ended", + ) + ok = kanban_db.reclaim_task(conn, r.task_id, reason=payload.reason) + if not ok: + raise HTTPException( + status_code=409, + detail=( + f"cannot terminate run {run_id}: task {r.task_id} is no " + "longer in a reclaimable state" + ), + ) + return {"ok": True, "run_id": run_id, "task_id": r.task_id} + finally: + conn.close() + + # --------------------------------------------------------------------------- # Recovery actions — reclaim a running claim, reassign to a new profile # --------------------------------------------------------------------------- diff --git a/tests/plugins/test_kanban_worker_runs.py b/tests/plugins/test_kanban_worker_runs.py index 3e79abbd2..74758ff4e 100644 --- a/tests/plugins/test_kanban_worker_runs.py +++ b/tests/plugins/test_kanban_worker_runs.py @@ -4,6 +4,7 @@ Covers: GET /workers/active GET /runs/{run_id} GET /runs/{run_id}/inspect + POST /runs/{run_id}/terminate """ from __future__ import annotations @@ -299,3 +300,141 @@ def test_inspect_run_live_pid(client, monkeypatch): assert body["memory_rss_bytes"] == fake_mem.rss assert body["num_threads"] == 4 assert body["status"] == "sleeping" + + +# --------------------------------------------------------------------------- +# POST /runs/{run_id}/terminate +# --------------------------------------------------------------------------- + +def _setup_running_task_with_run(conn, *, title, assignee, worker_pid): + """Create a task in 'running' state with a matching open task_runs row. + + Mirrors what dispatcher_claim does: stamps tasks.status='running', + tasks.claim_lock, tasks.worker_pid; inserts task_runs row with the + same claim_lock so reclaim_task's preconditions are satisfied. + """ + task_id = kb.create_task(conn, title=title, assignee=assignee) + lock = secrets.token_hex(8) + future = int(time.time()) + 3600 + conn.execute( + "UPDATE tasks SET status='running', claim_lock=?, " + "claim_expires=?, worker_pid=? WHERE id=?", + (lock, future, worker_pid, task_id), + ) + cur = conn.execute( + "INSERT INTO task_runs " + "(task_id, status, claim_lock, claim_expires, worker_pid, started_at) " + "VALUES (?, 'running', ?, ?, ?, ?)", + (task_id, lock, future, worker_pid, int(time.time())), + ) + conn.commit() + return task_id, cur.lastrowid + + +def test_terminate_run_404_unknown_id(client): + """POST to unknown run_id returns 404.""" + r = client.post( + "/api/plugins/kanban/runs/777777/terminate", + json={"reason": "test"}, + ) + assert r.status_code == 404 + assert "777777" in r.json()["detail"] + + +def test_terminate_run_409_already_ended(client): + """POST against a run with ended_at set returns 409.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="ended-terminate", assignee="ivy") + run_id = _insert_run( + conn, task_id, worker_pid=22222, ended_at=int(time.time()) - 30, + ) + finally: + conn.close() + + r = client.post( + f"/api/plugins/kanban/runs/{run_id}/terminate", + json={"reason": "too late"}, + ) + assert r.status_code == 409 + assert "already ended" in r.json()["detail"] + + +def test_terminate_run_ok(client, monkeypatch): + """Happy path: live run is terminated, signal fn invoked, reason recorded.""" + conn = kb.connect() + try: + task_id, run_id = _setup_running_task_with_run( + conn, title="kill-me", assignee="jane", worker_pid=33333, + ) + finally: + conn.close() + + # Capture signal calls so we don't actually SIGTERM a random PID. + sent = [] + + def _fake_terminate(pid, prev_lock, *, signal_fn=None): + sent.append((pid, prev_lock)) + return {"signal": "SIGTERM", "delivered": True} + + monkeypatch.setattr(kb, "_terminate_reclaimed_worker", _fake_terminate) + + r = client.post( + f"/api/plugins/kanban/runs/{run_id}/terminate", + json={"reason": "operator abort"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body == {"ok": True, "run_id": run_id, "task_id": task_id} + assert sent == [(33333, sent[0][1])] + assert sent[0][1] is not None # claim_lock was non-null + + # Task is back to ready, claim cleared. + conn = kb.connect() + try: + row = conn.execute( + "SELECT status, claim_lock, worker_pid FROM tasks WHERE id=?", + (task_id,), + ).fetchone() + finally: + conn.close() + assert row["status"] == "ready" + assert row["claim_lock"] is None + assert row["worker_pid"] is None + + +def test_terminate_run_409_task_not_reclaimable(client, monkeypatch): + """Open run row whose task is no longer claimable returns 409.""" + conn = kb.connect() + try: + task_id = kb.create_task(conn, title="ghost-run", assignee="ken") + # Task left in default 'ready' state with no claim_lock — task_run + # exists but reclaim_task will refuse because status != 'running' + # and claim_lock is NULL. + run_id = _insert_run(conn, task_id, worker_pid=44444) + finally: + conn.close() + + # Make sure no signal is ever sent on this code path. + def _boom(*a, **k): + raise AssertionError("_terminate_reclaimed_worker should not be called") + + monkeypatch.setattr(kb, "_terminate_reclaimed_worker", _boom) + + r = client.post( + f"/api/plugins/kanban/runs/{run_id}/terminate", + json={"reason": "stale"}, + ) + assert r.status_code == 409 + assert "reclaimable" in r.json()["detail"] + + +def test_terminate_run_accepts_empty_body(client): + """Empty JSON body (no reason) is still accepted; falls through to 404.""" + r = client.post( + "/api/plugins/kanban/runs/666666/terminate", + json={}, + ) + # 404 because run doesn't exist — what we're asserting here is that + # the endpoint doesn't 422 on a missing 'reason' field. + assert r.status_code == 404