feat(kanban): gate notifier watcher on dispatch_in_gateway
Non-dispatch gateways no longer open per-board kanban DBs for notifier polling. Mirrors the existing dispatcher gate (config kanban.dispatch_in_gateway, default True; env override HERMES_KANBAN_DISPATCH_IN_GATEWAY) so multi-gateway setups collapse to a single process holding kanban.db file descriptors. Salvaged from PR #31964 by @steveonjava; tests and docs trimmed during salvage.
This commit is contained in:
39
docs/kanban/multi-gateway.md
Normal file
39
docs/kanban/multi-gateway.md
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
# Multi-gateway deployment
|
||||||
|
|
||||||
|
Hermes supports multiple gateway processes running concurrently — one per profile
|
||||||
|
(default, writer, admin, coder, researcher). Each gateway opens its own connection
|
||||||
|
to platform APIs and delivers messages for its profile's subscribers.
|
||||||
|
|
||||||
|
## Single-dispatcher posture
|
||||||
|
|
||||||
|
Only one gateway owns the kanban dispatcher. The owning gateway keeps
|
||||||
|
`kanban.dispatch_in_gateway: true` (the default); every other gateway sets it
|
||||||
|
to `false`.
|
||||||
|
|
||||||
|
**Why this matters:** a gateway with `dispatch_in_gateway: true` opens per-board
|
||||||
|
SQLite connections for both the dispatcher and the notifier watcher. Multiple
|
||||||
|
gateways doing this concurrently multiplies the open file descriptors on each
|
||||||
|
`kanban.db` and amplifies WAL `-shm` reader contention. Gating both paths on the
|
||||||
|
same flag means exactly one process touches the kanban DBs.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
On the dispatch-owning gateway (typically the `default` profile), no change is
|
||||||
|
needed. On every other profile gateway, add to `~/.hermes/config.yaml`:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
kanban:
|
||||||
|
dispatch_in_gateway: false
|
||||||
|
```
|
||||||
|
|
||||||
|
Or set the environment variable instead: `HERMES_KANBAN_DISPATCH_IN_GATEWAY=false` (`0`, `no`, and `off` are accepted as well, case-insensitively).
|
||||||
|
|
||||||
|
## What each gateway does
|
||||||
|
|
||||||
|
| Gateway role | dispatch_in_gateway | Opens per-board DBs? | Runs dispatcher + notifier? |
|
||||||
|
|---|---|---|---|
|
||||||
|
| default (dispatch owner) | true (default) | yes | yes |
|
||||||
|
| writer, admin, coder, etc. | false | no | no |
|
||||||
|
|
||||||
|
Non-dispatch gateways still deliver messages for their own platform adapters
|
||||||
|
(Telegram, Discord, etc.) — they just don't poll kanban boards.
|
||||||
@ -5121,6 +5121,30 @@ class GatewayRunner:
|
|||||||
cross boards, so delivery semantics are unchanged — this is
|
cross boards, so delivery semantics are unchanged — this is
|
||||||
purely a fan-out of the single-DB poll.
|
purely a fan-out of the single-DB poll.
|
||||||
"""
|
"""
|
||||||
|
# Gate: only the dispatch-owning gateway opens kanban DBs for notifier polling.
|
||||||
|
# Non-dispatch gateways have no subscriptions to deliver — all kanban state lives
|
||||||
|
# in the dispatch owner's per-board DBs. This prevents N-gateway -shm contention.
|
||||||
|
# TODO: gate per-board when per-board dispatcher_owner tracking lands.
|
||||||
|
try:
|
||||||
|
from hermes_cli.config import load_config as _load_config
|
||||||
|
except Exception:
|
||||||
|
logger.warning("kanban notifier: config loader unavailable; disabled")
|
||||||
|
return
|
||||||
|
env_override = os.environ.get("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "").strip().lower()
|
||||||
|
if env_override in {"0", "false", "no", "off"}:
|
||||||
|
logger.info("kanban notifier: disabled via HERMES_KANBAN_DISPATCH_IN_GATEWAY env")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
cfg = _load_config()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("kanban notifier: cannot load config (%s); disabled", exc)
|
||||||
|
return
|
||||||
|
kanban_cfg = cfg.get("kanban", {}) if isinstance(cfg, dict) else {}
|
||||||
|
if not kanban_cfg.get("dispatch_in_gateway", True):
|
||||||
|
logger.info(
|
||||||
|
"kanban notifier: disabled via config kanban.dispatch_in_gateway=false"
|
||||||
|
)
|
||||||
|
return
|
||||||
from gateway.config import Platform as _Platform
|
from gateway.config import Platform as _Platform
|
||||||
try:
|
try:
|
||||||
from hermes_cli import kanban_db as _kb
|
from hermes_cli import kanban_db as _kb
|
||||||
|
|||||||
74
tests/gateway/test_kanban_notifier_watcher_dispatch_gate.py
Normal file
74
tests/gateway/test_kanban_notifier_watcher_dispatch_gate.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
"""Tests for the dispatch_in_gateway gate on _kanban_notifier_watcher.
|
||||||
|
|
||||||
|
- Non-dispatch gateways (dispatch_in_gateway=false) exit before opening any DB.
|
||||||
|
- HERMES_KANBAN_DISPATCH_IN_GATEWAY env var disables without loading config.
|
||||||
|
- Dispatch-owning gateways (dispatch_in_gateway=true) proceed past the gate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from gateway.config import Platform
|
||||||
|
from gateway.run import GatewayRunner
|
||||||
|
|
||||||
|
|
||||||
|
def _make_runner(with_adapter=False):
    """Build a bare ``GatewayRunner`` for exercising the notifier watcher.

    ``__init__`` is bypassed via ``__new__``; only the attributes assigned
    below are populated on the instance.

    Args:
        with_adapter: When true, register a single mocked Telegram adapter;
            otherwise the runner has no adapters.

    Returns:
        The partially initialised ``GatewayRunner`` instance.
    """
    runner = GatewayRunner.__new__(GatewayRunner)
    runner._running = True
    runner.adapters = {Platform.TELEGRAM: MagicMock()} if with_adapter else {}
    runner._kanban_sub_fail_counts = {}
    return runner
|
||||||
|
|
||||||
|
|
||||||
|
def _fake_config(dispatch_in_gateway):
|
||||||
|
return {"kanban": {"dispatch_in_gateway": dispatch_in_gateway}}
|
||||||
|
|
||||||
|
|
||||||
|
def test_notifier_watcher_skips_when_dispatch_disabled(monkeypatch):
    """dispatch_in_gateway=false returns before opening any board DB."""
    # Clear any inherited override so the *config* gate is what stops the
    # watcher; otherwise a disabling env value would make this test pass
    # without ever consulting the patched config.
    monkeypatch.delenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", raising=False)
    runner = _make_runner()
    with patch("hermes_cli.config.load_config", return_value=_fake_config(False)):
        with patch("hermes_cli.kanban_db.connect") as mock_connect:
            asyncio.run(runner._kanban_notifier_watcher())
    mock_connect.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_notifier_watcher_env_override_disables(monkeypatch):
    """HERMES_KANBAN_DISPATCH_IN_GATEWAY=false skips config load entirely."""
    runner = _make_runner()
    monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "false")
    with patch("hermes_cli.config.load_config") as mock_load_config:
        with patch("hermes_cli.kanban_db.connect") as mock_connect:
            asyncio.run(runner._kanban_notifier_watcher())
    # The env gate must short-circuit before both the config read and any
    # kanban DB connection.
    mock_load_config.assert_not_called()
    mock_connect.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_notifier_watcher_runs_when_dispatch_enabled(monkeypatch):
    """dispatch_in_gateway=true proceeds past the gate to the board fan-out."""
    # An inherited disabling override (0/false/no/off) would stop the watcher
    # at the env gate and fail this test spuriously, so remove it first.
    monkeypatch.delenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", raising=False)
    runner = _make_runner(with_adapter=True)
    past_gate = []
    sleep_calls = []

    async def fake_sleep(delay):
        sleep_calls.append(delay)
        # Stop after the initial delay + first per-interval sleep so the loop
        # body runs exactly once.
        if len(sleep_calls) >= 2:
            runner._running = False

    async def fake_to_thread(fn, *args, **kwargs):
        # Run the callable inline so the test stays single-threaded.
        return fn(*args, **kwargs)

    def fake_list_boards(*args, **kwargs):
        # Record that execution got past the gate; report no boards.
        past_gate.append(True)
        return []

    import hermes_cli.kanban_db as _kb

    with patch("hermes_cli.config.load_config", return_value=_fake_config(True)):
        with patch.object(_kb, "list_boards", side_effect=fake_list_boards):
            with patch("asyncio.sleep", side_effect=fake_sleep):
                with patch("asyncio.to_thread", side_effect=fake_to_thread):
                    asyncio.run(runner._kanban_notifier_watcher())

    assert past_gate, "list_boards should be called when dispatch_in_gateway=true"
|
||||||
Reference in New Issue
Block a user