fix(kanban): isolate board override per concurrent call
This commit is contained in:
@ -15,6 +15,7 @@ Exposes the full Kanban command surface documented in the design spec
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import shlex
|
||||
@ -884,16 +885,7 @@ def kanban_command(args: argparse.Namespace) -> int:
|
||||
# keeps the patch small and inherits the exact same resolution the
|
||||
# dispatcher uses for workers — consistency is a feature here.
|
||||
board_override = getattr(args, "board", None)
|
||||
prev_board_env = os.environ.get("HERMES_KANBAN_BOARD")
|
||||
restore_board_env = False
|
||||
|
||||
def _restore_board_env() -> None:
|
||||
if not restore_board_env:
|
||||
return
|
||||
if prev_board_env is None:
|
||||
os.environ.pop("HERMES_KANBAN_BOARD", None)
|
||||
else:
|
||||
os.environ["HERMES_KANBAN_BOARD"] = prev_board_env
|
||||
board_scope = contextlib.nullcontext()
|
||||
if board_override:
|
||||
try:
|
||||
normed = kb._normalize_board_slug(board_override)
|
||||
@ -912,8 +904,7 @@ def kanban_command(args: argparse.Namespace) -> int:
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 1
|
||||
os.environ["HERMES_KANBAN_BOARD"] = normed
|
||||
restore_board_env = True
|
||||
board_scope = kb.scoped_current_board(normed)
|
||||
|
||||
# Auto-initialize the DB before dispatching any subcommand. init_db
|
||||
# is idempotent, so running it every invocation is cheap (one
|
||||
@ -922,66 +913,62 @@ def kanban_command(args: argparse.Namespace) -> int:
|
||||
# HERMES_HOME. Previously only `init` and `daemon` triggered
|
||||
# schema creation; `create` / `list` / every other command would
|
||||
# error out on a fresh install.
|
||||
try:
|
||||
kb.init_db()
|
||||
except Exception as exc:
|
||||
print(f"kanban: could not initialize database: {exc}", file=sys.stderr)
|
||||
_restore_board_env()
|
||||
return 1
|
||||
with board_scope:
|
||||
try:
|
||||
kb.init_db()
|
||||
except Exception as exc:
|
||||
print(f"kanban: could not initialize database: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
handlers = {
|
||||
"init": _cmd_init,
|
||||
"create": _cmd_create,
|
||||
"swarm": _cmd_swarm,
|
||||
"list": _cmd_list,
|
||||
"ls": _cmd_list,
|
||||
"show": _cmd_show,
|
||||
"assign": _cmd_assign,
|
||||
"reclaim": _cmd_reclaim,
|
||||
"reassign": _cmd_reassign,
|
||||
"diagnostics": _cmd_diagnostics,
|
||||
"diag": _cmd_diagnostics,
|
||||
"link": _cmd_link,
|
||||
"unlink": _cmd_unlink,
|
||||
"claim": _cmd_claim,
|
||||
"comment": _cmd_comment,
|
||||
"complete": _cmd_complete,
|
||||
"edit": _cmd_edit,
|
||||
"block": _cmd_block,
|
||||
"schedule": _cmd_schedule,
|
||||
"unblock": _cmd_unblock,
|
||||
"promote": _cmd_promote,
|
||||
"archive": _cmd_archive,
|
||||
"tail": _cmd_tail,
|
||||
"dispatch": _cmd_dispatch,
|
||||
"daemon": _cmd_daemon,
|
||||
"watch": _cmd_watch,
|
||||
"stats": _cmd_stats,
|
||||
"log": _cmd_log,
|
||||
"runs": _cmd_runs,
|
||||
"heartbeat": _cmd_heartbeat,
|
||||
"assignees": _cmd_assignees,
|
||||
"notify-subscribe": _cmd_notify_subscribe,
|
||||
"notify-list": _cmd_notify_list,
|
||||
"notify-unsubscribe": _cmd_notify_unsubscribe,
|
||||
"context": _cmd_context,
|
||||
"specify": _cmd_specify,
|
||||
"decompose": _cmd_decompose,
|
||||
"gc": _cmd_gc,
|
||||
}
|
||||
handler = handlers.get(action)
|
||||
if not handler:
|
||||
print(f"kanban: unknown action {action!r}", file=sys.stderr)
|
||||
_restore_board_env()
|
||||
return 2
|
||||
try:
|
||||
return int(handler(args) or 0)
|
||||
except (ValueError, RuntimeError) as exc:
|
||||
print(f"kanban: {exc}", file=sys.stderr)
|
||||
_restore_board_env()
|
||||
return 1
|
||||
finally:
|
||||
_restore_board_env()
|
||||
handlers = {
|
||||
"init": _cmd_init,
|
||||
"create": _cmd_create,
|
||||
"swarm": _cmd_swarm,
|
||||
"list": _cmd_list,
|
||||
"ls": _cmd_list,
|
||||
"show": _cmd_show,
|
||||
"assign": _cmd_assign,
|
||||
"reclaim": _cmd_reclaim,
|
||||
"reassign": _cmd_reassign,
|
||||
"diagnostics": _cmd_diagnostics,
|
||||
"diag": _cmd_diagnostics,
|
||||
"link": _cmd_link,
|
||||
"unlink": _cmd_unlink,
|
||||
"claim": _cmd_claim,
|
||||
"comment": _cmd_comment,
|
||||
"complete": _cmd_complete,
|
||||
"edit": _cmd_edit,
|
||||
"block": _cmd_block,
|
||||
"schedule": _cmd_schedule,
|
||||
"unblock": _cmd_unblock,
|
||||
"promote": _cmd_promote,
|
||||
"archive": _cmd_archive,
|
||||
"tail": _cmd_tail,
|
||||
"dispatch": _cmd_dispatch,
|
||||
"daemon": _cmd_daemon,
|
||||
"watch": _cmd_watch,
|
||||
"stats": _cmd_stats,
|
||||
"log": _cmd_log,
|
||||
"runs": _cmd_runs,
|
||||
"heartbeat": _cmd_heartbeat,
|
||||
"assignees": _cmd_assignees,
|
||||
"notify-subscribe": _cmd_notify_subscribe,
|
||||
"notify-list": _cmd_notify_list,
|
||||
"notify-unsubscribe": _cmd_notify_unsubscribe,
|
||||
"context": _cmd_context,
|
||||
"specify": _cmd_specify,
|
||||
"decompose": _cmd_decompose,
|
||||
"gc": _cmd_gc,
|
||||
}
|
||||
handler = handlers.get(action)
|
||||
if not handler:
|
||||
print(f"kanban: unknown action {action!r}", file=sys.stderr)
|
||||
return 2
|
||||
try:
|
||||
return int(handler(args) or 0)
|
||||
except (ValueError, RuntimeError) as exc:
|
||||
print(f"kanban: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@ -83,6 +83,7 @@ import sys
|
||||
import threading
|
||||
import logging
|
||||
import time
|
||||
from contextvars import ContextVar, Token
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable, Optional
|
||||
@ -222,6 +223,20 @@ _CTX_MAX_COMMENT_BYTES = 2 * 1024 # 2 KB per comment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DEFAULT_BOARD = "default"
|
||||
_CURRENT_BOARD_OVERRIDE: ContextVar[str | None] = ContextVar(
|
||||
"hermes_kanban_current_board_override",
|
||||
default=None,
|
||||
)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def scoped_current_board(slug: str):
|
||||
"""Temporarily pin the active board for the current context only."""
|
||||
token: Token[str | None] = _CURRENT_BOARD_OVERRIDE.set(slug)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_CURRENT_BOARD_OVERRIDE.reset(token)
|
||||
|
||||
# Slug validator: lowercase alphanumerics, digits, hyphens; 1–64 chars.
|
||||
# Strict enough to stop traversal (`..`) and embedded path separators, loose
|
||||
@ -305,6 +320,15 @@ def get_current_board() -> str:
|
||||
with a best-effort warning — the dispatcher must never crash because a
|
||||
user hand-edited a file or removed a board directory.
|
||||
"""
|
||||
scoped = (_CURRENT_BOARD_OVERRIDE.get() or "").strip()
|
||||
if scoped:
|
||||
try:
|
||||
normed = _normalize_board_slug(scoped)
|
||||
if normed and board_exists(normed):
|
||||
return normed
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
env = os.environ.get("HERMES_KANBAN_BOARD", "").strip()
|
||||
if env:
|
||||
try:
|
||||
|
||||
@ -49,6 +49,7 @@ AUTHOR_MAP = {
|
||||
"kyssta-exe@users.noreply.github.com": "kyssta-exe",
|
||||
"copii.list@gmail.com": "stremtec",
|
||||
"solaiagent@gmail.com": "solaitken",
|
||||
"cryptoworlldz@gmail.com": "worlldz",
|
||||
"prostoandrei9@gmail.com": "vladkvlchk",
|
||||
"116314616+ThyFriendlyFox@users.noreply.github.com": "ThyFriendlyFox",
|
||||
"liliangjya@gmail.com": "truenorth-lj",
|
||||
|
||||
@ -5,6 +5,7 @@ from __future__ import annotations
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
@ -263,6 +264,52 @@ def test_run_slash_link_unlink(kanban_home):
|
||||
assert "Unlinked" in kc.run_slash(f"unlink {ta} {tb}")
|
||||
|
||||
|
||||
def test_board_override_is_isolated_per_concurrent_call(kanban_home, monkeypatch):
|
||||
kb.create_board("alpha")
|
||||
kb.create_board("beta")
|
||||
|
||||
parser = argparse.ArgumentParser(prog="hermes", add_help=False)
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
kc.build_parser(sub)
|
||||
|
||||
barrier = threading.Barrier(2)
|
||||
original_init_db = kb.init_db
|
||||
|
||||
def slow_init_db(*args, **kwargs):
|
||||
try:
|
||||
barrier.wait(timeout=5)
|
||||
except threading.BrokenBarrierError:
|
||||
pass
|
||||
return original_init_db(*args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(kb, "init_db", slow_init_db)
|
||||
|
||||
failures: list[str] = []
|
||||
|
||||
def worker(board: str, title: str) -> None:
|
||||
args = parser.parse_args(["kanban", "--board", board, "create", title])
|
||||
rc = kc.kanban_command(args)
|
||||
if rc != 0:
|
||||
failures.append(f"{board}:{rc}")
|
||||
|
||||
t1 = threading.Thread(target=worker, args=("alpha", "alpha-task"))
|
||||
t2 = threading.Thread(target=worker, args=("beta", "beta-task"))
|
||||
t1.start()
|
||||
t2.start()
|
||||
t1.join()
|
||||
t2.join()
|
||||
|
||||
assert failures == []
|
||||
|
||||
with kb.connect_closing(board="alpha") as conn:
|
||||
alpha_titles = [row.title for row in kb.list_tasks(conn, limit=100)]
|
||||
with kb.connect_closing(board="beta") as conn:
|
||||
beta_titles = [row.title for row in kb.list_tasks(conn, limit=100)]
|
||||
|
||||
assert alpha_titles == ["alpha-task"]
|
||||
assert beta_titles == ["beta-task"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration with the COMMAND_REGISTRY
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user