fix(gateway): clean service restart notifications

This commit is contained in:
helix4u
2026-05-31 20:35:11 -06:00
committed by Teknium
parent 380ce4789b
commit b14e15c48e
9 changed files with 377 additions and 33 deletions

View File

@ -595,7 +595,11 @@ class TelegramAdapter(BasePlatformAdapter):
metadata: Optional[Dict[str, Any]],
) -> bool:
if cls._metadata_direct_messages_topic_id(metadata) is not None:
return False
return bool(
metadata
and metadata.get("telegram_dm_topic_reply_fallback")
and cls._metadata_reply_to_message_id(metadata) is not None
)
if metadata and metadata.get("telegram_dm_topic_created_for_send"):
return False
return bool(

View File

@ -735,6 +735,19 @@ def _restart_notification_pending() -> bool:
return (_hermes_home / ".restart_notify.json").exists()
def _planned_restart_notification_path() -> Path:
return _hermes_home / ".restart_pending.json"
def _planned_restart_notification_pending() -> bool:
"""Return True when a non-chat planned restart should notify home channels."""
return _planned_restart_notification_path().exists()
def _clear_planned_restart_notification() -> None:
_planned_restart_notification_path().unlink(missing_ok=True)
# Mark this process as a gateway so cli.py's module-level load_cli_config()
# knows not to clobber TERMINAL_CWD if lazily imported.
os.environ["_HERMES_GATEWAY"] = "1"
@ -1680,6 +1693,7 @@ class GatewayRunner:
_restart_task_started: bool = False
_restart_detached: bool = False
_restart_via_service: bool = False
_restart_command_source: Optional[SessionSource] = None
_stop_task: Optional[asyncio.Task] = None
_session_model_overrides: Dict[str, Dict[str, str]] = {}
_session_reasoning_overrides: Dict[str, Dict[str, Any]] = {}
@ -1723,6 +1737,7 @@ class GatewayRunner:
self._restart_task_started = False
self._restart_detached = False
self._restart_via_service = False
self._restart_command_source: Optional[SessionSource] = None
self._stop_task: Optional[asyncio.Task] = None
# Track running agents per session for interrupt support
@ -3480,6 +3495,7 @@ class GatewayRunner:
logged and swallowed so they never block the shutdown sequence.
"""
active = self._snapshot_running_agents()
restart_source = self._restart_command_source if self._restart_requested else None
action = "restarting" if self._restart_requested else "shutting down"
hint = (
@ -3543,11 +3559,23 @@ class GatewayRunner:
)
continue
reply_to_message_id = getattr(source, "message_id", None) if source is not None else None
if reply_to_message_id is None and restart_source is not None:
try:
restart_platform = restart_source.platform.value
restart_chat_id = str(restart_source.chat_id)
restart_thread_id = str(restart_source.thread_id) if restart_source.thread_id else None
if (restart_platform, restart_chat_id, restart_thread_id) == dedup_key:
reply_to_message_id = getattr(restart_source, "message_id", None)
except Exception:
pass
metadata = self._thread_metadata_for_target(
platform,
chat_id,
thread_id,
chat_type=getattr(source, "chat_type", None) if source is not None else None,
reply_to_message_id=reply_to_message_id,
adapter=adapter,
)
@ -3572,6 +3600,10 @@ class GatewayRunner:
platform_str, chat_id, e,
)
if self._restart_requested and restart_source is not None:
logger.debug("Skipping home-channel shutdown notifications for in-chat restart")
return
# Snapshot adapters up front: adapter.send() can hit a fatal error
# path that pops the adapter from self.adapters (see _handle_fatal
# elsewhere), which would otherwise trigger
@ -3881,6 +3913,83 @@ class GatewayRunner:
start_new_session=True,
)
def _launch_systemd_restart_shortcut(self) -> None:
"""Best-effort helper to bypass systemd's automatic restart delay.
For planned in-chat restarts, the gateway exits cleanly so systemd does
not record a failure. However, units with RestartSteps still count
automatic restarts and can delay repeated /restart tests. A transient
user service survives our cgroup teardown and explicitly starts the
gateway as soon as this PID exits, while the unit keeps its normal
backoff for real crash loops.
"""
if sys.platform != "linux" or not os.environ.get("INVOCATION_ID"):
return
try:
import shutil
import subprocess
systemd_run = shutil.which("systemd-run")
systemctl = shutil.which("systemctl")
if not systemd_run or not systemctl:
return
try:
from hermes_cli.gateway import get_service_name
service_name = get_service_name()
except Exception:
service_name = "hermes-gateway"
current_pid = os.getpid()
show = subprocess.run(
[
systemctl,
"--user",
"show",
service_name,
"--property=MainPID",
"--value",
],
capture_output=True,
text=True,
timeout=2,
)
if (show.stdout or "").strip() != str(current_pid):
return
systemctl_user = "systemctl --user"
service_arg = shlex.quote(service_name)
shell_cmd = (
f"while kill -0 {current_pid} 2>/dev/null; do sleep 0.2; done; "
f"{systemctl_user} reset-failed {service_arg}; "
f"{systemctl_user} restart {service_arg}"
)
unit_name = f"{service_name}-planned-restart-{current_pid}".replace(".", "-")
subprocess.Popen(
[
systemd_run,
"--user",
"--collect",
"--unit",
unit_name,
"/bin/sh",
"-lc",
shell_cmd,
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
logger.info(
"Launched systemd planned-restart helper for %s (pid=%s)",
service_name,
current_pid,
)
except Exception as e:
logger.debug("Failed to launch systemd planned-restart helper: %s", e)
def request_restart(self, *, detached: bool = False, via_service: bool = False) -> bool:
if self._restart_task_started:
return False
@ -4449,21 +4558,21 @@ class GatewayRunner:
await asyncio.sleep(1.0)
# Notify the chat that initiated /restart that the gateway is back.
restart_notification_pending = _restart_notification_pending()
delivered_restart_target = await self._send_restart_notification()
planned_restart_notification_pending = _planned_restart_notification_pending()
await self._send_restart_notification()
# Broadcast a lightweight "gateway is back" message to configured
# home channels only when this startup is resuming from /restart. If a
# /restart requester already received a direct completion notice in the
# same chat, skip the generic broadcast there to avoid duplicates while
# still allowing a home-channel fallback when the direct send fails.
if restart_notification_pending or delivered_restart_target is not None:
skip_home_targets = (
{delivered_restart_target} if delivered_restart_target else None
)
await self._send_home_channel_startup_notifications(
skip_targets=skip_home_targets,
)
# Broadcast a lightweight "gateway is back" message to configured home
# channels only for non-chat planned restarts (terminal/SIGUSR1/service
# paths). Chat-originated /restart already has a precise reply target
# in .restart_notify.json, so keep that lifecycle in the originating
# chat/topic instead of also leaking it to the configured home channel.
if planned_restart_notification_pending:
try:
await self._send_home_channel_startup_notifications(
skip_targets=None,
)
finally:
_clear_planned_restart_notification()
# Automatically continue fresh sessions that were interrupted by the
# previous gateway restart/shutdown. The resume_pending flag is cleared
@ -6375,8 +6484,33 @@ class GatewayRunner:
if active_agents:
self._increment_restart_failure_counts(set(active_agents.keys()))
if self._restart_requested and self._restart_command_source is None:
try:
atomic_json_write(
_planned_restart_notification_path(),
{
"requested_at": time.time(),
"via_service": bool(self._restart_via_service),
"detached": bool(self._restart_detached),
},
indent=None,
)
except Exception as e:
logger.debug("Failed to write planned restart notification marker: %s", e)
if self._restart_requested and self._restart_via_service:
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
self._launch_systemd_restart_shortcut()
# systemd units use Restart=always, so a planned restart should
# exit cleanly and still be relaunched. Using TEMPFAIL here
# makes systemd treat the operator-requested restart as a
# failure and can trip stepped restart backoff. launchd's
# KeepAlive.SuccessfulExit=false needs a non-zero exit to
# relaunch, so keep the old code on macOS.
self._exit_code = (
GATEWAY_SERVICE_RESTART_EXIT_CODE
if sys.platform == "darwin" or not os.environ.get("INVOCATION_ID")
else 0
)
self._exit_reason = self._exit_reason or "Gateway restart requested"
self._draining = False
@ -10347,6 +10481,18 @@ class GatewayRunner:
}
if event.source.thread_id:
notify_data["thread_id"] = event.source.thread_id
if event.message_id:
notify_data["message_id"] = event.message_id
if event.source is not None:
try:
self._restart_command_source = dataclasses.replace(
event.source,
message_id=str(event.message_id)
if event.message_id is not None
else event.source.message_id,
)
except Exception:
self._restart_command_source = event.source
atomic_json_write(
_hermes_home / ".restart_notify.json",
notify_data,
@ -14476,6 +14622,8 @@ class GatewayRunner:
}
if event.source.thread_id:
pending["thread_id"] = event.source.thread_id
if event.message_id:
pending["message_id"] = event.message_id
_tmp_pending = pending_path.with_suffix(".tmp")
_tmp_pending.write_text(json.dumps(pending))
_tmp_pending.replace(pending_path)
@ -14623,6 +14771,7 @@ class GatewayRunner:
chat_type = pending.get("chat_type")
session_key = pending.get("session_key")
thread_id = pending.get("thread_id")
message_id = pending.get("message_id")
if platform_str and chat_id:
platform = Platform(platform_str)
adapter = self.adapters.get(platform)
@ -14631,6 +14780,7 @@ class GatewayRunner:
chat_id,
thread_id,
chat_type=chat_type,
reply_to_message_id=message_id,
adapter=adapter,
)
# Fallback session key if not stored (old pending files)
@ -14838,6 +14988,7 @@ class GatewayRunner:
chat_id = pending.get("chat_id")
chat_type = pending.get("chat_type")
thread_id = pending.get("thread_id")
message_id = pending.get("message_id")
if not exit_code_path.exists():
logger.info("Update notification deferred: update still running")
@ -14864,6 +15015,7 @@ class GatewayRunner:
chat_id,
thread_id,
chat_type=chat_type,
reply_to_message_id=message_id,
adapter=adapter,
)
# Strip ANSI escape codes for clean display
@ -14909,6 +15061,7 @@ class GatewayRunner:
chat_id = data.get("chat_id")
chat_type = data.get("chat_type")
thread_id = data.get("thread_id")
message_id = data.get("message_id")
if not platform_str or not chat_id:
return None
@ -14935,6 +15088,7 @@ class GatewayRunner:
chat_id,
thread_id,
chat_type=chat_type,
reply_to_message_id=message_id,
adapter=adapter,
)
result = await adapter.send(
@ -19296,16 +19450,12 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
)
return False # → sys.exit(1) in the caller
# When the gateway is restarting via the service manager (SIGUSR1 →
# launchd_restart or /restart / /update commands), exit with code 75 so
# that launchd's ``KeepAlive → SuccessfulExit → false`` policy treats
# the exit as *unsuccessful* and relaunches the service. This mirrors
# the systemd ``RestartForceExitStatus=75`` convention already used by
# the systemd unit template.
# Older restart paths may reach here without ``runner.exit_code`` set.
# Keep the historical non-zero fallback for service-managed restarts.
if runner._restart_via_service:
logger.info(
"Exiting with code 75 (service-restart requested) so "
"launchd KeepAlive relaunches the gateway."
"Exiting with code 75 (service-restart requested) so the service "
"manager relaunches the gateway."
)
raise SystemExit(75)

View File

@ -227,9 +227,9 @@ def _graceful_restart_via_sigusr1(pid: int, drain_timeout: float) -> bool:
SIGUSR1 is wired in gateway/run.py to ``request_restart(via_service=True)``
which drains in-flight agent runs (up to ``agent.restart_drain_timeout``
seconds), then exits with code 75. Both systemd (``Restart=always``
+ ``RestartForceExitStatus=75``) and launchd (``KeepAlive.SuccessfulExit
= false``) relaunch the process after the graceful exit.
seconds), then exits. systemd relaunches clean exits via
``Restart=always``; launchd still uses a non-zero planned-restart exit
because its plist has ``KeepAlive.SuccessfulExit = false``.
This is the drain-aware alternative to ``systemctl restart`` / ``SIGTERM``,
which SIGKILL in-flight agents after a short timeout.

View File

@ -10174,8 +10174,7 @@ def _cmd_update_impl(args, gateway_mode: bool):
# agent runs drain instead of being SIGKILLed.
# The gateway's SIGUSR1 handler calls
# request_restart(via_service=True) → drain →
# exit(75); systemd's Restart=on-failure (and
# RestartForceExitStatus=75) respawns the unit.
# exit; systemd's Restart=always respawns the unit.
_main_pid = 0
try:
_show = subprocess.run(
@ -10209,9 +10208,9 @@ def _cmd_update_impl(args, gateway_mode: bool):
)
if _graceful_ok:
# Gateway exited 75. ``Restart=always`` +
# ``RestartForceExitStatus=75`` means systemd
# WILL respawn the unit — but only after
# Gateway exited after a planned restart.
# ``Restart=always`` means systemd WILL respawn
# the unit — but only after
# ``RestartSec`` (default 60s on our unit
# file). That 60s wait is a crash-loop guard,
# and is the right default when the gateway

View File

@ -69,6 +69,7 @@ def make_restart_runner(
runner._restart_task_started = False
runner._restart_detached = False
runner._restart_via_service = False
runner._restart_command_source = None
runner._restart_drain_timeout = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
runner._stop_task = None
runner._busy_input_mode = "interrupt"

View File

@ -3,6 +3,8 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import gateway.run as gateway_run
from gateway.config import HomeChannel, Platform
from gateway.platforms.base import MessageEvent
from gateway.restart import GATEWAY_SERVICE_RESTART_EXIT_CODE
from gateway.session import build_session_key
@ -132,16 +134,127 @@ async def test_gateway_stop_interrupts_after_drain_timeout():
@pytest.mark.asyncio
async def test_gateway_stop_service_restart_sets_named_exit_code():
async def test_gateway_stop_systemd_service_restart_exits_cleanly(tmp_path, monkeypatch):
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
runner, adapter = make_restart_runner()
adapter.disconnect = AsyncMock()
monkeypatch.setenv("INVOCATION_ID", "systemd-test")
runner._launch_systemd_restart_shortcut = MagicMock()
with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"):
await runner.stop(restart=True, service_restart=True)
runner._launch_systemd_restart_shortcut.assert_called_once_with()
assert runner._exit_code == 0
assert (tmp_path / ".restart_pending.json").exists()
@pytest.mark.asyncio
async def test_gateway_stop_launchd_service_restart_keeps_nonzero_exit(tmp_path, monkeypatch):
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
runner, adapter = make_restart_runner()
adapter.disconnect = AsyncMock()
with patch("gateway.run.sys.platform", "darwin"), patch(
"gateway.status.remove_pid_file"
), patch("gateway.status.write_runtime_status"):
await runner.stop(restart=True, service_restart=True)
assert runner._exit_code == GATEWAY_SERVICE_RESTART_EXIT_CODE
@pytest.mark.asyncio
async def test_restart_shutdown_warning_uses_restart_command_reply_anchor_for_active_session():
runner, adapter = make_restart_runner()
source = make_restart_source(thread_id="42")
session_key = build_session_key(source)
runner._running_agents = {session_key: MagicMock()}
runner._cache_session_source(session_key, source)
restart_source = make_restart_source(thread_id="42")
restart_source.message_id = "restart-command"
runner._restart_requested = True
runner._restart_command_source = restart_source
runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
platform=Platform.TELEGRAM,
chat_id=source.chat_id,
name="Telegram",
thread_id=source.thread_id,
)
await runner._notify_active_sessions_of_shutdown()
assert len(adapter.sent_calls) == 1
chat_id, message, metadata = adapter.sent_calls[0]
assert chat_id == source.chat_id
assert "Gateway restarting" in message
assert metadata["thread_id"] == source.thread_id
assert metadata["telegram_dm_topic_reply_fallback"] is True
assert metadata["direct_messages_topic_id"] == source.thread_id
assert metadata["telegram_reply_to_message_id"] == "restart-command"
@pytest.mark.asyncio
async def test_in_chat_restart_skips_home_shutdown_even_with_active_session():
runner, adapter = make_restart_runner()
source = make_restart_source(thread_id="42")
session_key = build_session_key(source)
runner._running_agents = {session_key: MagicMock()}
runner._cache_session_source(session_key, source)
restart_source = make_restart_source(thread_id="42")
restart_source.message_id = "restart-command"
runner._restart_requested = True
runner._restart_command_source = restart_source
runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
platform=Platform.TELEGRAM,
chat_id="home-chat",
name="Telegram Home",
)
await runner._notify_active_sessions_of_shutdown()
assert len(adapter.sent_calls) == 1
chat_id, message, metadata = adapter.sent_calls[0]
assert chat_id == source.chat_id
assert "Gateway restarting" in message
assert metadata["telegram_reply_to_message_id"] == "restart-command"
@pytest.mark.asyncio
async def test_idle_in_chat_restart_does_not_send_interruption_warning():
runner, adapter = make_restart_runner()
source = make_restart_source(thread_id="42")
source.message_id = "restart-command"
runner._restart_requested = True
runner._restart_command_source = source
runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
platform=Platform.TELEGRAM,
chat_id=source.chat_id,
name="Telegram",
thread_id=source.thread_id,
)
await runner._notify_active_sessions_of_shutdown()
assert adapter.sent_calls == []
@pytest.mark.asyncio
async def test_in_chat_restart_does_not_write_home_startup_marker(tmp_path, monkeypatch):
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
runner, adapter = make_restart_runner()
adapter.disconnect = AsyncMock()
source = make_restart_source(thread_id="42")
source.message_id = "restart-command"
runner._restart_command_source = source
runner._launch_systemd_restart_shortcut = MagicMock()
monkeypatch.setenv("INVOCATION_ID", "systemd-test")
with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"):
await runner.stop(restart=True, service_restart=True)
assert not (tmp_path / ".restart_pending.json").exists()
@pytest.mark.asyncio
async def test_drain_active_agents_throttles_status_updates():
runner, _adapter = make_restart_runner()

View File

@ -32,6 +32,19 @@ def test_restart_notification_pending_true_with_marker(tmp_path, monkeypatch):
assert gateway_run._restart_notification_pending() is True
def test_planned_restart_notification_pending_roundtrip(tmp_path, monkeypatch):
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
marker = tmp_path / ".restart_pending.json"
assert gateway_run._planned_restart_notification_pending() is False
marker.write_text("{}")
assert gateway_run._planned_restart_notification_pending() is True
gateway_run._clear_planned_restart_notification()
assert gateway_run._planned_restart_notification_pending() is False
# ── _handle_restart_command writes .restart_notify.json ──────────────────
@ -60,6 +73,7 @@ async def test_restart_command_writes_notify_file(tmp_path, monkeypatch):
assert data["platform"] == "telegram"
assert data["chat_id"] == "42"
assert data["chat_type"] == "dm"
assert data["message_id"] == "m1"
assert "thread_id" not in data # no thread → omitted
@ -127,6 +141,7 @@ async def test_restart_command_preserves_thread_id(tmp_path, monkeypatch):
data = json.loads((tmp_path / ".restart_notify.json").read_text())
assert data["chat_type"] == "dm"
assert data["thread_id"] == "777"
assert data["message_id"] == "m2"
@pytest.mark.asyncio
@ -390,6 +405,7 @@ async def test_send_restart_notification_with_thread(tmp_path, monkeypatch):
"chat_id": "99",
"chat_type": "dm",
"thread_id": "777",
"message_id": "m2",
}))
runner, adapter = make_restart_runner()
@ -403,6 +419,7 @@ async def test_send_restart_notification_with_thread(tmp_path, monkeypatch):
"thread_id": "777",
"telegram_dm_topic_reply_fallback": True,
"direct_messages_topic_id": "777",
"telegram_reply_to_message_id": "m2",
}
assert not notify_path.exists()
@ -642,3 +659,28 @@ async def test_shutdown_notifications_use_cached_live_thread_source_when_origin_
"⚠️ Gateway shutting down — Your current task will be interrupted.",
metadata={"thread_id": "topic-7"},
)
@pytest.mark.asyncio
async def test_restart_shutdown_notification_anchors_telegram_dm_topic():
runner, adapter = make_restart_runner()
runner._restart_requested = True
source = make_restart_source(chat_id="123456", thread_id="20197")
source.message_id = "462"
session_key = build_session_key(source)
runner._running_agents[session_key] = object()
runner.session_store._entries[session_key] = MagicMock(origin=source)
adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="shutdown"))
await runner._notify_active_sessions_of_shutdown()
call = adapter.send.await_args
assert call.args[0] == "123456"
assert "Gateway restarting" in call.args[1]
assert call.kwargs["metadata"] == {
"thread_id": "20197",
"telegram_dm_topic_reply_fallback": True,
"direct_messages_topic_id": "20197",
"telegram_reply_to_message_id": "462",
}

View File

@ -597,6 +597,35 @@ async def test_send_uses_reply_fallback_for_hermes_dm_topics():
assert "direct_messages_topic_id" not in call_log[0]
@pytest.mark.asyncio
async def test_send_uses_reply_anchor_when_direct_topic_fallback_metadata_exists():
"""Restart/update replay metadata keeps the anchor authoritative when present."""
adapter = _make_adapter()
call_log = []
async def mock_send_message(**kwargs):
call_log.append(kwargs)
return SimpleNamespace(message_id=777)
adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send(
chat_id="123",
content="test message",
metadata={
"thread_id": "20197",
"telegram_dm_topic_reply_fallback": True,
"direct_messages_topic_id": "20197",
"telegram_reply_to_message_id": "462",
},
)
assert result.success is True
assert call_log[0]["reply_to_message_id"] == 462
assert call_log[0]["message_thread_id"] == 20197
assert "direct_messages_topic_id" not in call_log[0]
@pytest.mark.asyncio
async def test_send_created_private_topic_uses_message_thread_without_anchor():
"""Topics created via createForumTopic are addressable by message_thread_id directly."""

View File

@ -189,6 +189,7 @@ class TestHandleUpdateCommand:
"""Writes .update_pending.json with correct platform and chat info."""
runner = _make_runner()
event = _make_event(platform=Platform.TELEGRAM, chat_id="99999")
event.message_id = "m-update"
fake_root = tmp_path / "project"
fake_root.mkdir()
@ -211,6 +212,7 @@ class TestHandleUpdateCommand:
assert data["platform"] == "telegram"
assert data["chat_id"] == "99999"
assert data["chat_type"] == "dm"
assert data["message_id"] == "m-update"
assert "timestamp" in data
assert not (hermes_home / ".update_exit_code").exists()
@ -223,6 +225,7 @@ class TestHandleUpdateCommand:
chat_id="99999",
thread_id="777",
)
event.message_id = "m-update-thread"
fake_root = tmp_path / "project"
fake_root.mkdir()
@ -241,6 +244,7 @@ class TestHandleUpdateCommand:
data = json.loads((hermes_home / ".update_pending.json").read_text())
assert data["thread_id"] == "777"
assert data["message_id"] == "m-update-thread"
@pytest.mark.asyncio
async def test_spawns_setsid(self, tmp_path):
@ -472,6 +476,7 @@ class TestSendUpdateNotification:
"chat_id": "67890",
"chat_type": "dm",
"thread_id": "777",
"message_id": "m-update-thread",
"user_id": "12345",
}
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
@ -488,6 +493,7 @@ class TestSendUpdateNotification:
"thread_id": "777",
"telegram_dm_topic_reply_fallback": True,
"direct_messages_topic_id": "777",
"telegram_reply_to_message_id": "m-update-thread",
}
@pytest.mark.asyncio