From b14e15c48e5226d9fabd356a76e9b0f6d15f7816 Mon Sep 17 00:00:00 2001 From: helix4u <4317663+helix4u@users.noreply.github.com> Date: Sun, 31 May 2026 20:35:11 -0600 Subject: [PATCH] fix(gateway): clean service restart notifications --- gateway/platforms/telegram.py | 6 +- gateway/run.py | 196 ++++++++++++++++-- hermes_cli/gateway.py | 6 +- hermes_cli/main.py | 9 +- tests/gateway/restart_test_helpers.py | 1 + tests/gateway/test_gateway_shutdown.py | 115 +++++++++- tests/gateway/test_restart_notification.py | 42 ++++ .../gateway/test_telegram_thread_fallback.py | 29 +++ tests/gateway/test_update_command.py | 6 + 9 files changed, 377 insertions(+), 33 deletions(-) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 8e31fee1f..14820c0fe 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -595,7 +595,11 @@ class TelegramAdapter(BasePlatformAdapter): metadata: Optional[Dict[str, Any]], ) -> bool: if cls._metadata_direct_messages_topic_id(metadata) is not None: - return False + return bool( + metadata + and metadata.get("telegram_dm_topic_reply_fallback") + and cls._metadata_reply_to_message_id(metadata) is not None + ) if metadata and metadata.get("telegram_dm_topic_created_for_send"): return False return bool( diff --git a/gateway/run.py b/gateway/run.py index 933e88af3..c12c635e4 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -735,6 +735,19 @@ def _restart_notification_pending() -> bool: return (_hermes_home / ".restart_notify.json").exists() +def _planned_restart_notification_path() -> Path: + return _hermes_home / ".restart_pending.json" + + +def _planned_restart_notification_pending() -> bool: + """Return True when a non-chat planned restart should notify home channels.""" + return _planned_restart_notification_path().exists() + + +def _clear_planned_restart_notification() -> None: + _planned_restart_notification_path().unlink(missing_ok=True) + + # Mark this process as a gateway so cli.py's module-level load_cli_config() # knows not to clobber TERMINAL_CWD if lazily imported. os.environ["_HERMES_GATEWAY"] = "1" @@ -1680,6 +1693,7 @@ class GatewayRunner: _restart_task_started: bool = False _restart_detached: bool = False _restart_via_service: bool = False + _restart_command_source: Optional[SessionSource] = None _stop_task: Optional[asyncio.Task] = None _session_model_overrides: Dict[str, Dict[str, str]] = {} _session_reasoning_overrides: Dict[str, Dict[str, Any]] = {} @@ -1723,6 +1737,7 @@ class GatewayRunner: self._restart_task_started = False self._restart_detached = False self._restart_via_service = False + self._restart_command_source: Optional[SessionSource] = None self._stop_task: Optional[asyncio.Task] = None # Track running agents per session for interrupt support @@ -3480,6 +3495,7 @@ class GatewayRunner: logged and swallowed so they never block the shutdown sequence. """ active = self._snapshot_running_agents() + restart_source = self._restart_command_source if self._restart_requested else None action = "restarting" if self._restart_requested else "shutting down" hint = ( @@ -3543,11 +3559,23 @@ class GatewayRunner: ) continue + reply_to_message_id = getattr(source, "message_id", None) if source is not None else None + if reply_to_message_id is None and restart_source is not None: + try: + restart_platform = restart_source.platform.value + restart_chat_id = str(restart_source.chat_id) + restart_thread_id = str(restart_source.thread_id) if restart_source.thread_id else None + if (restart_platform, restart_chat_id, restart_thread_id) == dedup_key: + reply_to_message_id = getattr(restart_source, "message_id", None) + except Exception: + pass + metadata = self._thread_metadata_for_target( platform, chat_id, thread_id, chat_type=getattr(source, "chat_type", None) if source is not None else None, + reply_to_message_id=reply_to_message_id, adapter=adapter, ) @@ -3572,6 +3600,10 @@ class GatewayRunner: platform_str, chat_id, e, ) + if self._restart_requested and restart_source is not None: + logger.debug("Skipping home-channel shutdown notifications for in-chat restart") + return + # Snapshot adapters up front: adapter.send() can hit a fatal error # path that pops the adapter from self.adapters (see _handle_fatal # elsewhere), which would otherwise trigger @@ -3881,6 +3913,83 @@ class GatewayRunner: start_new_session=True, ) + def _launch_systemd_restart_shortcut(self) -> None: + """Best-effort helper to bypass systemd's automatic restart delay. + + For planned in-chat restarts, the gateway exits cleanly so systemd does + not record a failure. However, units with RestartSteps still count + automatic restarts and can delay repeated /restart tests. A transient + user service survives our cgroup teardown and explicitly starts the + gateway as soon as this PID exits, while the unit keeps its normal + backoff for real crash loops. + """ + if sys.platform != "linux" or not os.environ.get("INVOCATION_ID"): + return + + try: + import shutil + import subprocess + + systemd_run = shutil.which("systemd-run") + systemctl = shutil.which("systemctl") + if not systemd_run or not systemctl: + return + + try: + from hermes_cli.gateway import get_service_name + + service_name = get_service_name() + except Exception: + service_name = "hermes-gateway" + + current_pid = os.getpid() + show = subprocess.run( + [ + systemctl, + "--user", + "show", + service_name, + "--property=MainPID", + "--value", + ], + capture_output=True, + text=True, + timeout=2, + ) + if (show.stdout or "").strip() != str(current_pid): + return + + systemctl_user = "systemctl --user" + service_arg = shlex.quote(service_name) + shell_cmd = ( + f"while kill -0 {current_pid} 2>/dev/null; do sleep 0.2; done; " + f"{systemctl_user} reset-failed {service_arg}; " + f"{systemctl_user} restart {service_arg}" + ) + unit_name = f"{service_name}-planned-restart-{current_pid}".replace(".", "-") + subprocess.Popen( + [ + systemd_run, + "--user", + "--collect", + "--unit", + unit_name, + "/bin/sh", + "-lc", + shell_cmd, + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + logger.info( + "Launched systemd planned-restart helper for %s (pid=%s)", + service_name, + current_pid, + ) + except Exception as e: + logger.debug("Failed to launch systemd planned-restart helper: %s", e) + def request_restart(self, *, detached: bool = False, via_service: bool = False) -> bool: if self._restart_task_started: return False @@ -4449,21 +4558,21 @@ class GatewayRunner: await asyncio.sleep(1.0) # Notify the chat that initiated /restart that the gateway is back. - restart_notification_pending = _restart_notification_pending() - delivered_restart_target = await self._send_restart_notification() + planned_restart_notification_pending = _planned_restart_notification_pending() + await self._send_restart_notification() - # Broadcast a lightweight "gateway is back" message to configured - # home channels only when this startup is resuming from /restart. If a - # /restart requester already received a direct completion notice in the - # same chat, skip the generic broadcast there to avoid duplicates while - # still allowing a home-channel fallback when the direct send fails. - if restart_notification_pending or delivered_restart_target is not None: - skip_home_targets = ( - {delivered_restart_target} if delivered_restart_target else None - ) - await self._send_home_channel_startup_notifications( - skip_targets=skip_home_targets, - ) + # Broadcast a lightweight "gateway is back" message to configured home + # channels only for non-chat planned restarts (terminal/SIGUSR1/service + # paths). Chat-originated /restart already has a precise reply target + # in .restart_notify.json, so keep that lifecycle in the originating + # chat/topic instead of also leaking it to the configured home channel. + if planned_restart_notification_pending: + try: + await self._send_home_channel_startup_notifications( + skip_targets=None, + ) + finally: + _clear_planned_restart_notification() # Automatically continue fresh sessions that were interrupted by the # previous gateway restart/shutdown. The resume_pending flag is cleared @@ -6375,8 +6484,33 @@ class GatewayRunner: if active_agents: self._increment_restart_failure_counts(set(active_agents.keys())) + if self._restart_requested and self._restart_command_source is None: + try: + atomic_json_write( + _planned_restart_notification_path(), + { + "requested_at": time.time(), + "via_service": bool(self._restart_via_service), + "detached": bool(self._restart_detached), + }, + indent=None, + ) + except Exception as e: + logger.debug("Failed to write planned restart notification marker: %s", e) + if self._restart_requested and self._restart_via_service: - self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE + self._launch_systemd_restart_shortcut() + # systemd units use Restart=always, so a planned restart should + # exit cleanly and still be relaunched. Using TEMPFAIL here + # makes systemd treat the operator-requested restart as a + # failure and can trip stepped restart backoff. launchd's + # KeepAlive.SuccessfulExit=false needs a non-zero exit to + # relaunch, so keep the old code on macOS. + self._exit_code = ( + GATEWAY_SERVICE_RESTART_EXIT_CODE + if sys.platform == "darwin" or not os.environ.get("INVOCATION_ID") + else 0 + ) self._exit_reason = self._exit_reason or "Gateway restart requested" self._draining = False @@ -10347,6 +10481,18 @@ class GatewayRunner: } if event.source.thread_id: notify_data["thread_id"] = event.source.thread_id + if event.message_id: + notify_data["message_id"] = event.message_id + if event.source is not None: + try: + self._restart_command_source = dataclasses.replace( + event.source, + message_id=str(event.message_id) + if event.message_id is not None + else event.source.message_id, + ) + except Exception: + self._restart_command_source = event.source atomic_json_write( _hermes_home / ".restart_notify.json", notify_data, @@ -14476,6 +14622,8 @@ class GatewayRunner: } if event.source.thread_id: pending["thread_id"] = event.source.thread_id + if event.message_id: + pending["message_id"] = event.message_id _tmp_pending = pending_path.with_suffix(".tmp") _tmp_pending.write_text(json.dumps(pending)) _tmp_pending.replace(pending_path) @@ -14623,6 +14771,7 @@ class GatewayRunner: chat_type = pending.get("chat_type") session_key = pending.get("session_key") thread_id = pending.get("thread_id") + message_id = pending.get("message_id") if platform_str and chat_id: platform = Platform(platform_str) adapter = self.adapters.get(platform) @@ -14631,6 +14780,7 @@ class GatewayRunner: chat_id, thread_id, chat_type=chat_type, + reply_to_message_id=message_id, adapter=adapter, ) # Fallback session key if not stored (old pending files) @@ -14838,6 +14988,7 @@ class GatewayRunner: chat_id = pending.get("chat_id") chat_type = pending.get("chat_type") thread_id = pending.get("thread_id") + message_id = pending.get("message_id") if not exit_code_path.exists(): logger.info("Update notification deferred: update still running") @@ -14864,6 +15015,7 @@ class GatewayRunner: chat_id, thread_id, chat_type=chat_type, + reply_to_message_id=message_id, adapter=adapter, ) # Strip ANSI escape codes for clean display @@ -14909,6 +15061,7 @@ class GatewayRunner: chat_id = data.get("chat_id") chat_type = data.get("chat_type") thread_id = data.get("thread_id") + message_id = data.get("message_id") if not platform_str or not chat_id: return None @@ -14935,6 +15088,7 @@ class GatewayRunner: chat_id, thread_id, chat_type=chat_type, + reply_to_message_id=message_id, adapter=adapter, ) result = await adapter.send( @@ -19296,16 +19450,12 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = ) return False # → sys.exit(1) in the caller - # When the gateway is restarting via the service manager (SIGUSR1 → - # launchd_restart or /restart / /update commands), exit with code 75 so - # that launchd's ``KeepAlive → SuccessfulExit → false`` policy treats - # the exit as *unsuccessful* and relaunches the service. This mirrors - # the systemd ``RestartForceExitStatus=75`` convention already used by - # the systemd unit template. + # Older restart paths may reach here without ``runner.exit_code`` set. + # Keep the historical non-zero fallback for service-managed restarts. if runner._restart_via_service: logger.info( - "Exiting with code 75 (service-restart requested) so " - "launchd KeepAlive relaunches the gateway." + "Exiting with code 75 (service-restart requested) so the service " + "manager relaunches the gateway." ) raise SystemExit(75) diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index ec3c433ce..0b1f97046 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -227,9 +227,9 @@ def _graceful_restart_via_sigusr1(pid: int, drain_timeout: float) -> bool: SIGUSR1 is wired in gateway/run.py to ``request_restart(via_service=True)`` which drains in-flight agent runs (up to ``agent.restart_drain_timeout`` - seconds), then exits with code 75. Both systemd (``Restart=always`` - + ``RestartForceExitStatus=75``) and launchd (``KeepAlive.SuccessfulExit - = false``) relaunch the process after the graceful exit. + seconds), then exits. systemd relaunches clean exits via + ``Restart=always``; launchd still uses a non-zero planned-restart exit + because its plist has ``KeepAlive.SuccessfulExit = false``. This is the drain-aware alternative to ``systemctl restart`` / ``SIGTERM``, which SIGKILL in-flight agents after a short timeout. diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 3c02c9818..fa4bbb9e4 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -10174,8 +10174,7 @@ def _cmd_update_impl(args, gateway_mode: bool): # agent runs drain instead of being SIGKILLed. # The gateway's SIGUSR1 handler calls # request_restart(via_service=True) → drain → - # exit(75); systemd's Restart=on-failure (and - # RestartForceExitStatus=75) respawns the unit. + # exit; systemd's Restart=always respawns the unit. _main_pid = 0 try: _show = subprocess.run( @@ -10209,9 +10208,9 @@ def _cmd_update_impl(args, gateway_mode: bool): ) if _graceful_ok: - # Gateway exited 75. ``Restart=always`` + - # ``RestartForceExitStatus=75`` means systemd - # WILL respawn the unit — but only after + # Gateway exited after a planned restart. + # ``Restart=always`` means systemd WILL respawn + # the unit — but only after # ``RestartSec`` (default 60s on our unit # file). That 60s wait is a crash-loop guard, # and is the right default when the gateway diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py index a91816c4e..01be2b4cc 100644 --- a/tests/gateway/restart_test_helpers.py +++ b/tests/gateway/restart_test_helpers.py @@ -69,6 +69,7 @@ def make_restart_runner( runner._restart_task_started = False runner._restart_detached = False runner._restart_via_service = False + runner._restart_command_source = None runner._restart_drain_timeout = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT runner._stop_task = None runner._busy_input_mode = "interrupt" diff --git a/tests/gateway/test_gateway_shutdown.py b/tests/gateway/test_gateway_shutdown.py index d12fac14b..eae7d0377 100644 --- a/tests/gateway/test_gateway_shutdown.py +++ b/tests/gateway/test_gateway_shutdown.py @@ -3,6 +3,8 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest +import gateway.run as gateway_run +from gateway.config import HomeChannel, Platform from gateway.platforms.base import MessageEvent from gateway.restart import GATEWAY_SERVICE_RESTART_EXIT_CODE from gateway.session import build_session_key @@ -132,16 +134,127 @@ async def test_gateway_stop_interrupts_after_drain_timeout(): @pytest.mark.asyncio -async def test_gateway_stop_service_restart_sets_named_exit_code(): +async def test_gateway_stop_systemd_service_restart_exits_cleanly(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) runner, adapter = make_restart_runner() adapter.disconnect = AsyncMock() + monkeypatch.setenv("INVOCATION_ID", "systemd-test") + runner._launch_systemd_restart_shortcut = MagicMock() with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"): await runner.stop(restart=True, service_restart=True) + runner._launch_systemd_restart_shortcut.assert_called_once_with() + assert runner._exit_code == 0 + assert (tmp_path / ".restart_pending.json").exists() + + +@pytest.mark.asyncio +async def test_gateway_stop_launchd_service_restart_keeps_nonzero_exit(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + runner, adapter = make_restart_runner() + adapter.disconnect = AsyncMock() + + with patch("gateway.run.sys.platform", "darwin"), patch( + "gateway.status.remove_pid_file" + ), patch("gateway.status.write_runtime_status"): + await runner.stop(restart=True, service_restart=True) + assert runner._exit_code == GATEWAY_SERVICE_RESTART_EXIT_CODE +@pytest.mark.asyncio +async def test_restart_shutdown_warning_uses_restart_command_reply_anchor_for_active_session(): + runner, adapter = make_restart_runner() + source = make_restart_source(thread_id="42") + session_key = build_session_key(source) + runner._running_agents = {session_key: MagicMock()} + runner._cache_session_source(session_key, source) + restart_source = make_restart_source(thread_id="42") + restart_source.message_id = "restart-command" + runner._restart_requested = True + runner._restart_command_source = restart_source + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id=source.chat_id, + name="Telegram", + thread_id=source.thread_id, + ) + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent_calls) == 1 + chat_id, message, metadata = adapter.sent_calls[0] + assert chat_id == source.chat_id + assert "Gateway restarting" in message + assert metadata["thread_id"] == source.thread_id + assert metadata["telegram_dm_topic_reply_fallback"] is True + assert metadata["direct_messages_topic_id"] == source.thread_id + assert metadata["telegram_reply_to_message_id"] == "restart-command" + + +@pytest.mark.asyncio +async def test_in_chat_restart_skips_home_shutdown_even_with_active_session(): + runner, adapter = make_restart_runner() + source = make_restart_source(thread_id="42") + session_key = build_session_key(source) + runner._running_agents = {session_key: MagicMock()} + runner._cache_session_source(session_key, source) + restart_source = make_restart_source(thread_id="42") + restart_source.message_id = "restart-command" + runner._restart_requested = True + runner._restart_command_source = restart_source + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-chat", + name="Telegram Home", + ) + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent_calls) == 1 + chat_id, message, metadata = adapter.sent_calls[0] + assert chat_id == source.chat_id + assert "Gateway restarting" in message + assert metadata["telegram_reply_to_message_id"] == "restart-command" + + +@pytest.mark.asyncio +async def test_idle_in_chat_restart_does_not_send_interruption_warning(): + runner, adapter = make_restart_runner() + source = make_restart_source(thread_id="42") + source.message_id = "restart-command" + runner._restart_requested = True + runner._restart_command_source = source + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id=source.chat_id, + name="Telegram", + thread_id=source.thread_id, + ) + + await runner._notify_active_sessions_of_shutdown() + + assert adapter.sent_calls == [] + + +@pytest.mark.asyncio +async def test_in_chat_restart_does_not_write_home_startup_marker(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + runner, adapter = make_restart_runner() + adapter.disconnect = AsyncMock() + source = make_restart_source(thread_id="42") + source.message_id = "restart-command" + runner._restart_command_source = source + runner._launch_systemd_restart_shortcut = MagicMock() + monkeypatch.setenv("INVOCATION_ID", "systemd-test") + + with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"): + await runner.stop(restart=True, service_restart=True) + + assert not (tmp_path / ".restart_pending.json").exists() + + @pytest.mark.asyncio async def test_drain_active_agents_throttles_status_updates(): runner, _adapter = make_restart_runner() diff --git a/tests/gateway/test_restart_notification.py b/tests/gateway/test_restart_notification.py index 56be23370..0f0dadc42 100644 --- a/tests/gateway/test_restart_notification.py +++ b/tests/gateway/test_restart_notification.py @@ -32,6 +32,19 @@ def test_restart_notification_pending_true_with_marker(tmp_path, monkeypatch): assert gateway_run._restart_notification_pending() is True +def test_planned_restart_notification_pending_roundtrip(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + marker = tmp_path / ".restart_pending.json" + + assert gateway_run._planned_restart_notification_pending() is False + marker.write_text("{}") + assert gateway_run._planned_restart_notification_pending() is True + + gateway_run._clear_planned_restart_notification() + + assert gateway_run._planned_restart_notification_pending() is False + + # ── _handle_restart_command writes .restart_notify.json ────────────────── @@ -60,6 +73,7 @@ async def test_restart_command_writes_notify_file(tmp_path, monkeypatch): assert data["platform"] == "telegram" assert data["chat_id"] == "42" assert data["chat_type"] == "dm" + assert data["message_id"] == "m1" assert "thread_id" not in data # no thread → omitted @@ -127,6 +141,7 @@ async def test_restart_command_preserves_thread_id(tmp_path, monkeypatch): data = json.loads((tmp_path / ".restart_notify.json").read_text()) assert data["chat_type"] == "dm" assert data["thread_id"] == "777" + assert data["message_id"] == "m2" @pytest.mark.asyncio @@ -390,6 +405,7 @@ async def test_send_restart_notification_with_thread(tmp_path, monkeypatch): "chat_id": "99", "chat_type": "dm", "thread_id": "777", + "message_id": "m2", })) runner, adapter = make_restart_runner() @@ -403,6 +419,7 @@ async def test_send_restart_notification_with_thread(tmp_path, monkeypatch): "thread_id": "777", "telegram_dm_topic_reply_fallback": True, "direct_messages_topic_id": "777", + "telegram_reply_to_message_id": "m2", } assert not notify_path.exists() @@ -642,3 +659,28 @@ async def test_shutdown_notifications_use_cached_live_thread_source_when_origin_ "⚠️ Gateway shutting down — Your current task will be interrupted.", metadata={"thread_id": "topic-7"}, ) + + +@pytest.mark.asyncio +async def test_restart_shutdown_notification_anchors_telegram_dm_topic(): + runner, adapter = make_restart_runner() + runner._restart_requested = True + source = make_restart_source(chat_id="123456", thread_id="20197") + source.message_id = "462" + session_key = build_session_key(source) + + runner._running_agents[session_key] = object() + runner.session_store._entries[session_key] = MagicMock(origin=source) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="shutdown")) + + await runner._notify_active_sessions_of_shutdown() + + call = adapter.send.await_args + assert call.args[0] == "123456" + assert "Gateway restarting" in call.args[1] + assert call.kwargs["metadata"] == { + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "direct_messages_topic_id": "20197", + "telegram_reply_to_message_id": "462", + } diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index 5f56baebc..036d27e77 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -597,6 +597,35 @@ async def test_send_uses_reply_fallback_for_hermes_dm_topics(): assert "direct_messages_topic_id" not in call_log[0] +@pytest.mark.asyncio +async def test_send_uses_reply_anchor_when_direct_topic_fallback_metadata_exists(): + """Restart/update replay metadata keeps the anchor authoritative when present.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(kwargs) + return SimpleNamespace(message_id=777) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="test message", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "direct_messages_topic_id": "20197", + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert "direct_messages_topic_id" not in call_log[0] + + @pytest.mark.asyncio async def test_send_created_private_topic_uses_message_thread_without_anchor(): """Topics created via createForumTopic are addressable by message_thread_id directly.""" diff --git a/tests/gateway/test_update_command.py b/tests/gateway/test_update_command.py index e3f74694b..6ff37c0fb 100644 --- a/tests/gateway/test_update_command.py +++ b/tests/gateway/test_update_command.py @@ -189,6 +189,7 @@ class TestHandleUpdateCommand: """Writes .update_pending.json with correct platform and chat info.""" runner = _make_runner() event = _make_event(platform=Platform.TELEGRAM, chat_id="99999") + event.message_id = "m-update" fake_root = tmp_path / "project" fake_root.mkdir() @@ -211,6 +212,7 @@ class TestHandleUpdateCommand: assert data["platform"] == "telegram" assert data["chat_id"] == "99999" assert data["chat_type"] == "dm" + assert data["message_id"] == "m-update" assert "timestamp" in data assert not (hermes_home / ".update_exit_code").exists() @@ -223,6 +225,7 @@ class TestHandleUpdateCommand: chat_id="99999", thread_id="777", ) + event.message_id = "m-update-thread" fake_root = tmp_path / "project" fake_root.mkdir() @@ -241,6 +244,7 @@ class TestHandleUpdateCommand: data = json.loads((hermes_home / ".update_pending.json").read_text()) assert data["thread_id"] == "777" + assert data["message_id"] == "m-update-thread" @pytest.mark.asyncio async def test_spawns_setsid(self, tmp_path): @@ -472,6 +476,7 @@ class TestSendUpdateNotification: "chat_id": "67890", "chat_type": "dm", "thread_id": "777", + "message_id": "m-update-thread", "user_id": "12345", } (hermes_home / ".update_pending.json").write_text(json.dumps(pending)) @@ -488,6 +493,7 @@ class TestSendUpdateNotification: "thread_id": "777", "telegram_dm_topic_reply_fallback": True, "direct_messages_topic_id": "777", + "telegram_reply_to_message_id": "m-update-thread", } @pytest.mark.asyncio