diff --git a/agent/agent_runtime_helpers.py b/agent/agent_runtime_helpers.py index 73f3cba43..685272368 100644 --- a/agent/agent_runtime_helpers.py +++ b/agent/agent_runtime_helpers.py @@ -47,6 +47,20 @@ def _ra(): return run_agent +AGENT_RUNTIME_POST_HOOK_TOOL_NAMES = frozenset( + {"todo", "session_search", "memory", "clarify", "delegate_task"} +) + + +def agent_runtime_owns_post_tool_hook(agent: Any, function_name: str) -> bool: + """Return True when an agent-level tool path emits its own post hook.""" + if function_name in AGENT_RUNTIME_POST_HOOK_TOOL_NAMES: + return True + if getattr(agent, "_context_engine_tool_names", None) and function_name in agent._context_engine_tool_names: + return True + memory_manager = getattr(agent, "_memory_manager", None) + return bool(memory_manager and memory_manager.has_tool(function_name)) + def convert_to_trajectory_format(agent, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]: """ @@ -1618,36 +1632,88 @@ def invoke_tool(agent, function_name: str, function_args: dict, effective_task_i try: from hermes_cli.plugins import get_pre_tool_call_block_message block_message = get_pre_tool_call_block_message( - function_name, function_args, task_id=effective_task_id or "", + function_name, + function_args, + task_id=effective_task_id or "", + session_id=getattr(agent, "session_id", "") or "", + tool_call_id=tool_call_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", ) except Exception: pass if block_message is not None: - return json.dumps({"error": block_message}, ensure_ascii=False) + result = json.dumps({"error": block_message}, ensure_ascii=False) + try: + from model_tools import _emit_post_tool_call_hook + _emit_post_tool_call_hook( + function_name=function_name, + function_args=function_args, + result=result, + task_id=effective_task_id or "", + session_id=getattr(agent, "session_id", "") or "", + tool_call_id=tool_call_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", + status="blocked", + error_type="plugin_block", + error_message=block_message, + ) + except Exception: + pass + return result + + tool_start_time = time.monotonic() + + def _finish_agent_tool(result: Any) -> Any: + try: + from model_tools import _emit_post_tool_call_hook, _tool_result_observer_fields + status, error_type, error_message = _tool_result_observer_fields(result) + _emit_post_tool_call_hook( + function_name=function_name, + function_args=function_args, + result=result, + task_id=effective_task_id or "", + session_id=getattr(agent, "session_id", "") or "", + tool_call_id=tool_call_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", + duration_ms=int((time.monotonic() - tool_start_time) * 1000), + status=status, + error_type=error_type, + error_message=error_message, + ) + except Exception: + pass + return result if function_name == "todo": from tools.todo_tool import todo_tool as _todo_tool - return _todo_tool( - todos=function_args.get("todos"), - merge=function_args.get("merge", False), - store=agent._todo_store, + return _finish_agent_tool( + _todo_tool( + todos=function_args.get("todos"), + merge=function_args.get("merge", False), + store=agent._todo_store, + ) ) elif function_name == "session_search": session_db = agent._get_session_db_for_recall() if not session_db: from hermes_state import format_session_db_unavailable - return json.dumps({"success": False, "error": format_session_db_unavailable()}) + return _finish_agent_tool(json.dumps({"success": False, "error": format_session_db_unavailable()})) from tools.session_search_tool import session_search as _session_search - return _session_search( - query=function_args.get("query", ""), - role_filter=function_args.get("role_filter"), - limit=function_args.get("limit", 3), - session_id=function_args.get("session_id"), - around_message_id=function_args.get("around_message_id"), - window=function_args.get("window", 5), - sort=function_args.get("sort"), - db=session_db, - current_session_id=agent.session_id, + return _finish_agent_tool( + _session_search( + query=function_args.get("query", ""), + role_filter=function_args.get("role_filter"), + limit=function_args.get("limit", 3), + session_id=function_args.get("session_id"), + around_message_id=function_args.get("around_message_id"), + window=function_args.get("window", 5), + sort=function_args.get("sort"), + db=session_db, + current_session_id=agent.session_id, + ) ) elif function_name == "memory": target = function_args.get("target", "memory") @@ -1673,23 +1739,27 @@ def invoke_tool(agent, function_name: str, function_args: dict, effective_task_i ) except Exception: pass - return result + return _finish_agent_tool(result) elif agent._memory_manager and agent._memory_manager.has_tool(function_name): - return agent._memory_manager.handle_tool_call(function_name, function_args) + return _finish_agent_tool(agent._memory_manager.handle_tool_call(function_name, function_args)) elif function_name == "clarify": from tools.clarify_tool import clarify_tool as _clarify_tool - return _clarify_tool( - question=function_args.get("question", ""), - choices=function_args.get("choices"), - callback=agent.clarify_callback, + return _finish_agent_tool( + _clarify_tool( + question=function_args.get("question", ""), + choices=function_args.get("choices"), + callback=agent.clarify_callback, + ) ) elif function_name == "delegate_task": - return agent._dispatch_delegate_task(function_args) + return _finish_agent_tool(agent._dispatch_delegate_task(function_args)) else: return _ra().handle_function_call( function_name, function_args, effective_task_id, tool_call_id=tool_call_id, session_id=agent.session_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", enabled_tools=list(agent.valid_tool_names) if agent.valid_tool_names else None, skip_pre_tool_call_hook=True, enabled_toolsets=getattr(agent, "enabled_toolsets", None), diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index f1ea6740b..c52b9b72d 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -435,6 +435,9 @@ def run_conversation( # state registry. Set BEFORE any tool dispatch so snapshots taken at # child-launch time see the parent's real id, not None. agent._current_task_id = effective_task_id + turn_id = f"{agent.session_id or 'session'}:{effective_task_id}:{uuid.uuid4().hex[:8]}" + agent._current_turn_id = turn_id + agent._current_api_request_id = "" # Reset retry counters and iteration budget at the start of each turn # so subagent usage from a previous turn doesn't eat into the next one. @@ -702,6 +705,8 @@ def run_conversation( _pre_results = _invoke_hook( "pre_llm_call", session_id=agent.session_id, + task_id=effective_task_id, + turn_id=turn_id, user_message=original_user_message, conversation_history=list(messages), is_first_turn=(not bool(conversation_history)), @@ -1153,6 +1158,8 @@ def run_conversation( finish_reason = "stop" response = None # Guard against UnboundLocalError if all retries fail api_kwargs = None # Guard against UnboundLocalError in except handler + api_request_id = f"{turn_id}:api:{api_call_count}" + agent._current_api_request_id = api_request_id while retry_count < max_retries: # ── Nous Portal rate limit guard ────────────────────── @@ -1220,37 +1227,58 @@ def run_conversation( api_kwargs = agent._get_transport().preflight_kwargs(api_kwargs, allow_stream=False) try: - from hermes_cli.plugins import invoke_hook as _invoke_hook - request_messages = api_kwargs.get("messages") - if not isinstance(request_messages, list): - request_messages = api_kwargs.get("input") - if not isinstance(request_messages, list): - request_messages = api_messages - # Shallow-copy the outer list so plugins that retain the - # reference for async snapshotting don't observe later - # mutations of api_messages. The inner dicts are not - # mutated by the agent loop, so a shallow copy is - # sufficient; a deepcopy would walk every tool result - # and base64 image on every API call. - _invoke_hook( - "pre_api_request", - task_id=effective_task_id, - session_id=agent.session_id or "", - user_message=original_user_message, - conversation_history=list(messages), - platform=agent.platform or "", - model=agent.model, - provider=agent.provider, - base_url=agent.base_url, - api_mode=agent.api_mode, - api_call_count=api_call_count, - request_messages=list(request_messages) if isinstance(request_messages, list) else [], - message_count=len(api_messages), - tool_count=len(agent.tools or []), - approx_input_tokens=approx_tokens, - request_char_count=total_chars, - max_tokens=agent.max_tokens, + from hermes_cli.plugins import ( + has_hook, + invoke_hook as _invoke_hook, ) + if has_hook("pre_api_request"): + request_messages = api_kwargs.get("messages") + if not isinstance(request_messages, list): + request_messages = api_kwargs.get("input") + if not isinstance(request_messages, list): + request_messages = api_messages + # Shallow-copy the outer list so plugins that retain the + # reference for async snapshotting don't observe later + # mutations of api_messages. The inner dicts are not + # mutated by the agent loop, so a shallow copy is + # sufficient; a deepcopy would walk every tool result + # and base64 image on every API call. + # + # The ``request_messages`` and ``conversation_history`` + # kwargs below are pre-existing raw passthroughs + # consumed by the bundled langfuse plugin + # (``plugins/observability/langfuse/__init__.py:_coerce_request_messages``). + # They predate ``request`` and are intentionally NOT + # sanitised — secrets are not expected here because + # ``api_kwargs`` is the same object passed to the + # provider client. New consumers should read the + # sanitised view from ``request["body"]["messages"]``. + _request_payload = agent._api_request_payload_for_hook(api_kwargs) + _invoke_hook( + "pre_api_request", + task_id=effective_task_id, + turn_id=turn_id, + api_request_id=api_request_id, + session_id=agent.session_id or "", + user_message=original_user_message, + conversation_history=list(messages), + platform=agent.platform or "", + model=agent.model, + provider=agent.provider, + base_url=agent.base_url, + api_mode=agent.api_mode, + api_call_count=api_call_count, + request_messages=list(request_messages) + if isinstance(request_messages, list) + else [], + message_count=len(api_messages), + tool_count=len(agent.tools or []), + approx_input_tokens=approx_tokens, + request_char_count=total_chars, + max_tokens=agent.max_tokens, + started_at=api_start_time, + request=_request_payload, + ) except Exception: pass @@ -1300,12 +1328,14 @@ def run_conversation( if isinstance(getattr(agent, "client", None), Mock): _use_streaming = False - if _use_streaming: - response = agent._interruptible_streaming_api_call( - api_kwargs, on_first_delta=_stop_spinner - ) - else: - response = agent._interruptible_api_call(api_kwargs) + def _perform_api_call(next_api_kwargs): + if _use_streaming: + return agent._interruptible_streaming_api_call( + next_api_kwargs, on_first_delta=_stop_spinner + ) + return agent._interruptible_api_call(next_api_kwargs) + + response = _perform_api_call(api_kwargs) api_duration = time.time() - api_start_time @@ -1406,6 +1436,21 @@ def run_conversation( error_details.append("response.choices is empty") if response_invalid: + agent._invoke_api_request_error_hook( + task_id=effective_task_id, + turn_id=turn_id, + api_request_id=api_request_id, + api_call_count=api_call_count, + api_start_time=api_start_time, + api_kwargs=api_kwargs, + error_type="InvalidAPIResponse", + error_message=", ".join(error_details) or "Invalid API response", + status_code=getattr(getattr(response, "error", None), "code", None), + retry_count=retry_count, + max_retries=max_retries, + retryable=True, + reason="invalid_response", + ) # Stop spinner silently — retry status is now buffered # and only surfaced if every retry+fallback exhausts. if thinking_spinner: @@ -2278,6 +2323,21 @@ def run_conversation( classified.retryable, classified.should_compress, classified.should_rotate_credential, classified.should_fallback, ) + agent._invoke_api_request_error_hook( + task_id=effective_task_id, + turn_id=turn_id, + api_request_id=api_request_id, + api_call_count=api_call_count, + api_start_time=api_start_time, + api_kwargs=api_kwargs, + error_type=type(api_error).__name__, + error_message=str(api_error), + status_code=status_code, + retry_count=retry_count, + max_retries=max_retries, + retryable=classified.retryable, + reason=classified.reason.value, + ) if ( classified.reason == FailoverReason.billing @@ -3507,29 +3567,44 @@ def run_conversation( assistant_message.content = str(raw) try: - from hermes_cli.plugins import invoke_hook as _invoke_hook - _assistant_tool_calls = getattr(assistant_message, "tool_calls", None) or [] - _assistant_text = assistant_message.content or "" - _invoke_hook( - "post_api_request", - task_id=effective_task_id, - session_id=agent.session_id or "", - platform=agent.platform or "", - model=agent.model, - provider=agent.provider, - base_url=agent.base_url, - api_mode=agent.api_mode, - api_call_count=api_call_count, - api_duration=api_duration, - finish_reason=finish_reason, - message_count=len(api_messages), - response_model=getattr(response, "model", None), - response=response, - usage=agent._usage_summary_for_api_request_hook(response), - assistant_message=assistant_message, - assistant_content_chars=len(_assistant_text), - assistant_tool_call_count=len(_assistant_tool_calls), + from hermes_cli.plugins import ( + has_hook, + invoke_hook as _invoke_hook, ) + if has_hook("post_api_request"): + _assistant_tool_calls = ( + getattr(assistant_message, "tool_calls", None) or [] + ) + _assistant_text = assistant_message.content or "" + _api_ended_at = api_start_time + api_duration + _invoke_hook( + "post_api_request", + task_id=effective_task_id, + turn_id=turn_id, + api_request_id=api_request_id, + session_id=agent.session_id or "", + platform=agent.platform or "", + model=agent.model, + provider=agent.provider, + base_url=agent.base_url, + api_mode=agent.api_mode, + api_call_count=api_call_count, + api_duration=api_duration, + started_at=api_start_time, + ended_at=_api_ended_at, + finish_reason=finish_reason, + message_count=len(api_messages), + response_model=getattr(response, "model", None), + response=agent._api_response_payload_for_hook( + response, + assistant_message, + finish_reason=finish_reason, + ), + usage=agent._usage_summary_for_api_request_hook(response), + assistant_message=assistant_message, + assistant_content_chars=len(_assistant_text), + assistant_tool_call_count=len(_assistant_tool_calls), + ) except Exception: pass @@ -4623,6 +4698,8 @@ def run_conversation( _invoke_hook( "post_llm_call", session_id=agent.session_id, + task_id=effective_task_id, + turn_id=turn_id, user_message=original_user_message, assistant_response=final_response, conversation_history=list(messages), @@ -4742,6 +4819,8 @@ def run_conversation( _invoke_hook( "on_session_end", session_id=agent.session_id, + task_id=effective_task_id, + turn_id=turn_id, completed=completed, interrupted=interrupted, model=agent.model, diff --git a/agent/tool_executor.py b/agent/tool_executor.py index bbbd239df..e99dce3fb 100644 --- a/agent/tool_executor.py +++ b/agent/tool_executor.py @@ -19,7 +19,7 @@ import os import random import threading import time -from typing import Optional +from typing import Any, Optional from agent.display import ( KawaiiSpinner, @@ -58,6 +58,78 @@ def _ra(): return run_agent +def _emit_terminal_post_tool_call( + agent, + *, + function_name: str, + function_args: dict, + result: Any, + effective_task_id: str, + tool_call_id: str, + duration_ms: int = 0, + status: str | None = None, + error_type: str | None = None, + error_message: str | None = None, +) -> None: + try: + from model_tools import _emit_post_tool_call_hook, _tool_result_observer_fields + if status is None: + status, error_type, error_message = _tool_result_observer_fields(result) + _emit_post_tool_call_hook( + function_name=function_name, + function_args=function_args, + result=result, + task_id=effective_task_id or "", + session_id=getattr(agent, "session_id", "") or "", + tool_call_id=tool_call_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", + duration_ms=duration_ms, + status=status, + error_type=error_type, + error_message=error_message, + ) + except Exception: + pass + + +def _cancelled_tool_result(reason: str = "user interrupt") -> str: + return json.dumps( + { + "error": f"Tool execution cancelled by {reason}", + "status": "cancelled", + }, + ensure_ascii=False, + ) + + +def _emit_cancelled_terminal_post_tool_call( + agent, + *, + function_name: str, + function_args: dict, + effective_task_id: str, + tool_call_id: str, + start_time: float, + reason: str = "user interrupt", + error_type: str = "keyboard_interrupt", +) -> str: + result = _cancelled_tool_result(reason) + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=result, + effective_task_id=effective_task_id, + tool_call_id=tool_call_id, + duration_ms=int((time.time() - start_time) * 1000), + status="cancelled", + error_type=error_type, + error_message=f"Tool execution cancelled by {reason}", + ) + return result + + def _tool_search_scoped_names(agent) -> frozenset: """Return the deferrable tool names the session may invoke via tool_call. @@ -188,22 +260,61 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe if _ts_scope_block is not None: # Out-of-scope tool_call: reject before hooks/guardrails/dispatch. block_result = _ts_scope_block + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=block_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + status="blocked", + error_type="tool_scope_block", + error_message=_ts_scope_block, + ) else: try: from hermes_cli.plugins import get_pre_tool_call_block_message block_message = get_pre_tool_call_block_message( - function_name, function_args, task_id=effective_task_id or "", + function_name, + function_args, + task_id=effective_task_id or "", + session_id=getattr(agent, "session_id", "") or "", + tool_call_id=getattr(tool_call, "id", "") or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", ) except Exception: block_message = None if block_message is not None: block_result = json.dumps({"error": block_message}, ensure_ascii=False) + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=block_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + status="blocked", + error_type="plugin_block", + error_message=block_message, + ) else: guardrail_decision = agent._tool_guardrails.before_call(function_name, function_args) if not guardrail_decision.allows_execution: block_result = agent._guardrail_block_result(guardrail_decision) blocked_by_guardrail = True + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=block_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + status="blocked", + error_type="guardrail_block", + error_message=getattr(guardrail_decision, "message", None) or "Tool blocked by guardrail policy", + ) # ── Checkpoint preflight (only for tools that will execute) ── if block_result is None: @@ -315,6 +426,23 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe messages=messages, pre_tool_block_checked=True, ) + except KeyboardInterrupt: + try: + agent.interrupt("keyboard interrupt") + except Exception: + pass + result = _emit_cancelled_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + start_time=start, + ) + duration = time.time() - start + logger.info("tool %s cancelled (%.2fs)", function_name, duration) + results[index] = (function_name, function_args, result, duration, True, False) + return except Exception as tool_error: result = f"Error executing tool '{function_name}': {tool_error}" logger.error("_invoke_tool raised for %s: %s", function_name, tool_error, exc_info=True) @@ -426,8 +554,30 @@ def execute_tool_calls_concurrent(agent, assistant_message, messages: list, effe # Tool was cancelled (interrupt) or thread didn't return if agent._interrupt_requested: function_result = f"[Tool execution cancelled — {name} was skipped due to user interrupt]" + _emit_terminal_post_tool_call( + agent, + function_name=name, + function_args=args, + result=function_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tc, "id", "") or "", + status="cancelled", + error_type="keyboard_interrupt", + error_message="Tool execution cancelled by user interrupt", + ) else: function_result = f"Error executing tool '{name}': thread did not return a result" + _emit_terminal_post_tool_call( + agent, + function_name=name, + function_args=args, + result=function_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tc, "id", "") or "", + status="error", + error_type="thread_missing_result", + error_message=function_result, + ) tool_duration = 0.0 else: function_name, function_args, function_result, tool_duration, is_error, blocked = r @@ -592,13 +742,21 @@ def execute_tool_calls_sequential(agent, assistant_message, messages: list, effe # Check plugin hooks for a block directive before executing. _block_msg: Optional[str] = None + _block_error_type = "plugin_block" if _ts_scope_block is not None: _block_msg = _ts_scope_block + _block_error_type = "tool_scope_block" else: try: from hermes_cli.plugins import get_pre_tool_call_block_message _block_msg = get_pre_tool_call_block_message( - function_name, function_args, task_id=effective_task_id or "", + function_name, + function_args, + task_id=effective_task_id or "", + session_id=getattr(agent, "session_id", "") or "", + tool_call_id=getattr(tool_call, "id", "") or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", ) except Exception: pass @@ -687,11 +845,33 @@ def execute_tool_calls_sequential(agent, assistant_message, messages: list, effe # Tool blocked by plugin policy — return error without executing. function_result = json.dumps({"error": _block_msg}, ensure_ascii=False) tool_duration = 0.0 + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=function_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + status="blocked", + error_type=_block_error_type, + error_message=_block_msg, + ) elif _guardrail_block_decision is not None: # Tool blocked by tool-loop guardrail — synthesize exactly one # tool result for the original tool_call_id without executing. function_result = agent._guardrail_block_result(_guardrail_block_decision) tool_duration = 0.0 + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=function_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + status="blocked", + error_type="guardrail_block", + error_message=getattr(_guardrail_block_decision, "message", None) or "Tool blocked by guardrail policy", + ) elif function_name == "todo": from tools.todo_tool import todo_tool as _todo_tool function_result = _todo_tool( @@ -850,12 +1030,29 @@ def execute_tool_calls_sequential(agent, assistant_message, messages: list, effe function_name, function_args, effective_task_id, tool_call_id=tool_call.id, session_id=agent.session_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", enabled_tools=list(agent.valid_tool_names) if agent.valid_tool_names else None, skip_pre_tool_call_hook=True, enabled_toolsets=getattr(agent, "enabled_toolsets", None), disabled_toolsets=getattr(agent, "disabled_toolsets", None), ) _spinner_result = function_result + except KeyboardInterrupt: + function_result = _emit_cancelled_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + start_time=tool_start_time, + ) + _spinner_result = function_result + try: + agent.interrupt("keyboard interrupt") + except Exception: + pass + raise except Exception as tool_error: function_result = f"Error executing tool '{function_name}': {tool_error}" logger.error("handle_function_call raised for %s: %s", function_name, tool_error, exc_info=True) @@ -872,11 +1069,27 @@ def execute_tool_calls_sequential(agent, assistant_message, messages: list, effe function_name, function_args, effective_task_id, tool_call_id=tool_call.id, session_id=agent.session_id or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", enabled_tools=list(agent.valid_tool_names) if agent.valid_tool_names else None, skip_pre_tool_call_hook=True, enabled_toolsets=getattr(agent, "enabled_toolsets", None), disabled_toolsets=getattr(agent, "disabled_toolsets", None), ) + except KeyboardInterrupt: + _emit_cancelled_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + start_time=tool_start_time, + ) + try: + agent.interrupt("keyboard interrupt") + except Exception: + pass + raise except Exception as tool_error: function_result = f"Error executing tool '{function_name}': {tool_error}" logger.error("handle_function_call raised for %s: %s", function_name, tool_error, exc_info=True) @@ -895,6 +1108,27 @@ def execute_tool_calls_sequential(agent, assistant_message, messages: list, effe # Log tool errors to the persistent error log so [error] tags # in the UI always have a corresponding detailed entry on disk. _is_error_result, _ = _detect_tool_failure(function_name, function_result) + # The agent-runtime tools above (todo, session_search, memory, + # context-engine, memory-manager, clarify, delegate_task) are + # dispatched inline — they never reach handle_function_call, so the + # executor is the one that has to fire post_tool_call. For + # registry-dispatched tools the else-branch above invoked + # handle_function_call, which already fires the hook. + from agent.agent_runtime_helpers import agent_runtime_owns_post_tool_hook + _executor_must_emit_post_hook = ( + not _execution_blocked + and agent_runtime_owns_post_tool_hook(agent, function_name) + ) + if _executor_must_emit_post_hook: + _emit_terminal_post_tool_call( + agent, + function_name=function_name, + function_args=function_args, + result=function_result, + effective_task_id=effective_task_id, + tool_call_id=getattr(tool_call, "id", "") or "", + duration_ms=int(tool_duration * 1000), + ) if not _execution_blocked: function_result = agent._append_guardrail_observation( function_name, diff --git a/cli.py b/cli.py index 4120a7191..598e6d3b5 100644 --- a/cli.py +++ b/cli.py @@ -969,7 +969,12 @@ def _run_cleanup(): # session boundary — NOT per-turn inside run_conversation(). try: from hermes_cli.plugins import invoke_hook as _invoke_hook - _invoke_hook("on_session_finalize", session_id=_active_agent_ref.session_id if _active_agent_ref else None, platform="cli") + _invoke_hook( + "on_session_finalize", + session_id=_active_agent_ref.session_id if _active_agent_ref else None, + platform="cli", + reason="shutdown", + ) except Exception: pass try: @@ -989,6 +994,42 @@ def _run_cleanup(): pass +def _emit_interrupted_session_end(cli, *, reason: str = "keyboard_interrupt") -> None: + """Best-effort on_session_end hook for interrupted non-interactive runs.""" + agent = getattr(cli, "agent", None) + if agent is None: + return + + try: + agent.interrupt(reason.replace("_", " ")) + except Exception: + pass + + session_id = getattr(agent, "session_id", None) or getattr(cli, "session_id", None) + if session_id: + try: + cli.session_id = session_id + except Exception: + pass + + try: + from hermes_cli.plugins import invoke_hook as _invoke_hook + _invoke_hook( + "on_session_end", + session_id=session_id, + task_id=getattr(agent, "_current_task_id", "") or "", + turn_id=getattr(agent, "_current_turn_id", "") or "", + api_request_id=getattr(agent, "_current_api_request_id", "") or "", + completed=False, + interrupted=True, + model=getattr(agent, "model", None), + platform=getattr(agent, "platform", None) or "cli", + reason=reason, + ) + except Exception: + pass + + def _reset_terminal_input_modes_on_exit() -> None: """Best-effort: disable focus reporting + mouse tracking on TUI exit so they don't leak into the next shell session sharing the tab. @@ -6530,6 +6571,7 @@ class HermesCLI: event_type, session_id=self.agent.session_id if self.agent else None, platform=getattr(self, "platform", None) or "cli", + reason="new_session" if event_type == "on_session_reset" else "session_boundary", ) except Exception: pass @@ -15284,6 +15326,7 @@ class HermesCLI: interrupted=True, model=getattr(self.agent, 'model', None), platform=getattr(self.agent, 'platform', None) or "cli", + reason="shutdown", ) except Exception: pass @@ -15643,6 +15686,7 @@ def main( raise KeyboardInterrupt() try: import signal as _signal + _signal.signal(_signal.SIGINT, _signal_handler_q) _signal.signal(_signal.SIGTERM, _signal_handler_q) if hasattr(_signal, "SIGHUP"): _signal.signal(_signal.SIGHUP, _signal_handler_q) @@ -15764,10 +15808,15 @@ def main( # status lines). The response is printed once below. cli.agent.stream_delta_callback = None cli.agent.tool_gen_callback = None - result = cli.agent.run_conversation( - user_message=effective_query, - conversation_history=cli.conversation_history, - ) + try: + result = cli.agent.run_conversation( + user_message=effective_query, + conversation_history=cli.conversation_history, + ) + except KeyboardInterrupt: + _emit_interrupted_session_end(cli, reason="keyboard_interrupt") + print(f"\nsession_id: {cli.session_id}", file=sys.stderr) + sys.exit(130) # Sync session_id if mid-run compression created a # continuation session. The exit line below reports # session_id to stderr for automation wrappers; without diff --git a/docs/observability/README.md b/docs/observability/README.md new file mode 100644 index 000000000..9040929ca --- /dev/null +++ b/docs/observability/README.md @@ -0,0 +1,316 @@ +# Hermes Observer Hooks + +Hermes observer hooks are the read-only telemetry contract for plugins that +need to reconstruct agent execution without changing runtime behavior. This +contract supports trace, metrics, audit, replay, and export integrations such +as Langfuse, OpenTelemetry-style collectors, and NeMo Relay. + +Observer hooks are intentionally backend-neutral. They expose stable lifecycle +events, correlation IDs, sanitized payloads, timing, status, and error fields. +They do not replace Hermes' planner, model providers, memory, tool registry, +approval UX, CLI, gateway behavior, or execution semantics. + +Behavior-changing request or execution wrappers are outside this observer +contract. Observer hooks should report what happened; they should not replace +provider requests, tool arguments, or execution callbacks. + +## Contract + +Plugins register observer callbacks from `register(ctx)`: + +```python +def register(ctx): + ctx.register_hook("pre_api_request", on_pre_api_request) + ctx.register_hook("post_api_request", on_post_api_request) + ctx.register_hook("pre_tool_call", on_pre_tool_call) + ctx.register_hook("post_tool_call", on_post_tool_call) +``` + +Every hook callback receives keyword arguments. Plugins should accept +`**kwargs` so additive fields remain backward-compatible: + +```python +def on_post_tool_call(**kwargs): + tool_name = kwargs.get("tool_name") + status = kwargs.get("status") + result = kwargs.get("result") +``` + +The plugin manager injects this field into every hook payload: + +```text +telemetry_schema_version = "hermes.observer.v1" +``` + +Hook callbacks are fail-open. Hermes catches callback exceptions, logs a +warning, and keeps the agent loop running. + +Most observer hook return values are ignored. The exceptions are older +behavior-affecting hooks: + +| Hook | Return behavior | +| --- | --- | +| `pre_llm_call` | May return a string or `{"context": "..."}` to inject ephemeral context into the current user message. | +| `pre_tool_call` | May return `{"action": "block", "message": "..."}` to block a tool before execution. | +| `transform_tool_result` | May return a replacement tool result string after `post_tool_call`. | +| `transform_llm_output` | May return a replacement final assistant text string. | + +Telemetry plugins should treat these behavior-affecting returns as optional +compatibility features, not as observability requirements. + +## Correlation IDs + +Observer payloads use stable IDs so plugins can join events without relying on +callback order alone. + +| Field | Meaning | +| --- | --- | +| `session_id` | Conversation/session identity. | +| `task_id` | Task identity, especially useful for subagents and isolated execution. | +| `turn_id` | User-turn identity shared by API attempts and tool calls in a turn. | +| `api_request_id` | Opaque provider-attempt identity. Do not parse its string format. | +| `api_call_count` | Numeric API attempt count within the agent loop. | +| `tool_call_id` | Provider-supplied tool call ID when available. | +| `parent_session_id` / `child_session_id` | Session link for delegated subagents. | +| `parent_subagent_id` / `child_subagent_id` | Subagent link when available. | +| `parent_turn_id` | Parent turn that spawned delegated work. | + +Consumers should prefer explicit fields over parsing compound IDs. In +particular, `api_request_id` is an opaque correlation value. + +## Event Families + +### Session Lifecycle + +Session hooks describe conversation boundaries and resets: + +| Hook | When it fires | +| --- | --- | +| `on_session_start` | A brand-new session starts after the system prompt is built. | +| `on_session_end` | A `run_conversation` call ends, including interrupted or incomplete turns. | +| `on_session_finalize` | CLI or gateway tears down an active session identity. | +| `on_session_reset` | CLI or gateway moves from an old session identity to a new one. | + +Common fields include `session_id`, `completed`, `interrupted`, `reason`, +`old_session_id`, and `new_session_id` where available. + +`on_session_end` is turn/run scoped. It is not necessarily the final lifetime +boundary for a chat identity. Use `on_session_finalize` and `on_session_reset` +for lifecycle cleanup that must happen once per session identity. + +### Turn-Scoped LLM Hooks + +These hooks frame the user turn, not individual provider API attempts: + +| Hook | When it fires | +| --- | --- | +| `pre_llm_call` | Before the tool loop begins for a user turn. | +| `post_llm_call` | After the turn completes with final assistant output. | + +Common `pre_llm_call` fields include `session_id`, `turn_id`, +`user_message`, `conversation_history`, `is_first_turn`, `model`, `platform`, +and `sender_id`. + +Common `post_llm_call` fields include `session_id`, `turn_id`, +`user_message`, `assistant_response`, `conversation_history`, `model`, and +`platform`. + +Use request-scoped API hooks for LLM span telemetry. Use `pre_llm_call` and +`post_llm_call` for turn-level context, compatibility, and final turn summary. + +### Request-Scoped API Hooks + +API hooks describe provider attempts inside the agent loop: + +| Hook | When it fires | +| --- | --- | +| `pre_api_request` | Immediately before a provider API request. | +| `post_api_request` | After a successful provider response. | +| `api_request_error` | After a failed provider request or retryable error path. | + +`pre_api_request` includes: + +- identity: `session_id`, `task_id`, `turn_id`, `api_request_id` +- runtime: `platform`, `model`, `provider`, `base_url`, `api_mode` +- attempt metadata: `api_call_count`, `message_count`, `tool_count`, + `approx_input_tokens`, `request_char_count`, `max_tokens` +- timing: `started_at` +- sanitized request payload: `request` + +`post_api_request` includes the same identity/runtime fields plus: + +- `api_duration`, `started_at`, `ended_at` +- `finish_reason`, `message_count`, `response_model` +- `usage` +- `assistant_content_chars`, `assistant_tool_call_count` +- sanitized response payload: `response` +- compatibility object: `assistant_message` + +`api_request_error` includes the same identity/runtime fields plus: + +- `api_duration`, `started_at`, `ended_at` +- `status_code`, `retry_count`, `max_retries`, `retryable`, `reason` +- structured `error = {"type": ..., "message": ...}` +- sanitized failed request payload: `request` + +The sanitized `request`, `response`, and `error` fields are the canonical +observer inputs for new consumers. + +### Tool Lifecycle + +Tool hooks describe individual tool calls: + +| Hook | When it fires | +| --- | --- | +| `pre_tool_call` | Before guardrail-approved tool dispatch. | +| `post_tool_call` | After tool dispatch, cancellation, block, or error completion. | +| `transform_tool_result` | After `post_tool_call`, before the result is appended to model context. | + +`pre_tool_call` includes `tool_name`, `args`, `task_id`, `session_id`, +`tool_call_id`, `turn_id`, and `api_request_id`. + +`post_tool_call` includes the same identity fields plus `result`, +`duration_ms`, `status`, `error_type`, and `error_message`. + +`status` is the observer-grade lifecycle outcome. Common values include: + +| Status | Meaning | +| --- | --- | +| `ok` | Tool completed normally. | +| `error` | Tool ran and returned or raised an error outcome. | +| `blocked` | A `pre_tool_call` hook blocked execution. | +| `cancelled` | Execution was cancelled before normal completion. | + +`post_tool_call` is emitted for blocked and cancelled paths so telemetry +plugins can close spans cleanly. + +### Approval Lifecycle + +Approval hooks describe dangerous-command approval prompts: + +| Hook | When it fires | +| --- | --- | +| `pre_approval_request` | Before the approval request is shown or sent. | +| `post_approval_response` | After the user responds or the request times out. | + +Common fields include `command`, `description`, `pattern_key`, +`pattern_keys`, `session_key`, and `surface`. + +`post_approval_response` also includes `choice`, with values such as `once`, +`session`, `always`, `deny`, and `timeout`. + +Approval hooks are observer-only. Plugins cannot pre-answer or veto approvals +from these hooks. To prevent a tool from reaching approval, use +`pre_tool_call` blocking. + +### Subagent Lifecycle + +Subagent hooks describe delegated child-agent work: + +| Hook | When it fires | +| --- | --- | +| `subagent_start` | A delegated child agent is created. | +| `subagent_stop` | A delegated child agent returns or fails. | + +`subagent_start` fields include `parent_session_id`, `parent_turn_id`, +`parent_subagent_id`, `child_session_id`, `child_subagent_id`, `child_role`, +and `child_goal`. + +`subagent_stop` fields include parent/child session IDs, role/status fields, +`child_summary`, and `duration_ms`. + +Observers can use these hooks to model nested trajectories while keeping child +agent execution linked to the parent turn that spawned it. + +## Payload Safety + +Observer payloads are designed for telemetry consumers, not raw object access. +New consumers should use the sanitized API payloads: + +- `pre_api_request.request` +- `post_api_request.response` +- `api_request_error.request` +- `api_request_error.error` + +Sanitization converts provider objects to JSON-compatible structures, bounds +large payloads, redacts sensitive keys, and avoids exposing raw response +objects in sanitized fields. + +Legacy compatibility fields such as `request_messages`, `conversation_history`, +and `assistant_message` may still be present for existing plugins. New +observability consumers should prefer the sanitized payloads. + +## Performance + +The default uninstrumented path should stay cheap. Expensive request/response +payload construction is gated behind `has_hook(...)`, so Hermes only builds +sanitized API telemetry payloads when at least one plugin registered the +relevant hook. + +Plugin authors should preserve this property: + +- Register only hooks the plugin actually consumes. +- Avoid deep-copying or re-sanitizing already sanitized payloads. +- Keep hook callbacks fast and fail-open. +- Offload network export or batch writes when practical. + +## Writing An Observer Plugin + +Minimal observer plugin: + +```python +def register(ctx): + ctx.register_hook("pre_api_request", on_pre_api_request) + ctx.register_hook("post_api_request", on_post_api_request) + ctx.register_hook("pre_tool_call", on_pre_tool_call) + ctx.register_hook("post_tool_call", on_post_tool_call) + + +def on_pre_api_request(**kwargs): + start_llm_span( + request_id=kwargs.get("api_request_id"), + turn_id=kwargs.get("turn_id"), + request=kwargs.get("request"), + model=kwargs.get("model"), + ) + + +def on_post_api_request(**kwargs): + finish_llm_span( + request_id=kwargs.get("api_request_id"), + response=kwargs.get("response"), + usage=kwargs.get("usage"), + duration=kwargs.get("api_duration"), + ) + + +def on_pre_tool_call(**kwargs): + start_tool_span( + call_id=kwargs.get("tool_call_id"), + name=kwargs.get("tool_name"), + args=kwargs.get("args"), + ) + + +def on_post_tool_call(**kwargs): + finish_tool_span( + call_id=kwargs.get("tool_call_id"), + result=kwargs.get("result"), + status=kwargs.get("status"), + duration_ms=kwargs.get("duration_ms"), + ) +``` + +Use `session_id`, `turn_id`, `api_request_id`, and `tool_call_id` for span +correlation. Use subagent and approval hooks when the export format supports +nested agent work or security lifecycle events. + +## Existing Consumers + +The bundled Langfuse plugin demonstrates direct hook-based observability for +turns, provider requests, and tool calls. + +The bundled NeMo Relay plugin maps the same generic observer contract to NeMo +Relay scopes, LLM spans, tool spans, marks, ATOF streams, and ATIF exports. +NeMo Relay-specific configuration and examples live in +[`plugins/observability/nemo_relay/README.md`](../../plugins/observability/nemo_relay/README.md). diff --git a/gateway/run.py b/gateway/run.py index 3f950685f..df0d76ed3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3804,6 +3804,7 @@ class GatewayRunner: "on_session_finalize", session_id=getattr(agent, "session_id", None), platform="gateway", + reason="shutdown", ) except Exception: pass @@ -5036,6 +5037,7 @@ class GatewayRunner: "on_session_finalize", session_id=entry.session_id, platform=_platform, + reason="session_expired", ) except Exception: pass @@ -9995,12 +9997,19 @@ class GatewayRunner: # previous conversation must not survive the reset. self._clear_session_boundary_security_state(session_key) + _old_sid = old_entry.session_id if old_entry else None + # Fire plugin on_session_finalize hook (session boundary) try: from hermes_cli.plugins import invoke_hook as _invoke_hook - _old_sid = old_entry.session_id if old_entry else None - _invoke_hook("on_session_finalize", session_id=_old_sid, - platform=source.platform.value if source.platform else "") + _invoke_hook( + "on_session_finalize", + session_id=_old_sid, + platform=source.platform.value if source.platform else "", + reason="new_session", + old_session_id=_old_sid, + new_session_id=new_entry.session_id if new_entry else None, + ) except Exception: pass @@ -10069,8 +10078,14 @@ class GatewayRunner: try: from hermes_cli.plugins import invoke_hook as _invoke_hook _new_sid = new_entry.session_id if new_entry else None - _invoke_hook("on_session_reset", session_id=_new_sid, - platform=source.platform.value if source.platform else "") + _invoke_hook( + "on_session_reset", + session_id=_new_sid, + platform=source.platform.value if source.platform else "", + reason="new_session", + old_session_id=_old_sid, + new_session_id=_new_sid, + ) except Exception: pass diff --git a/hermes_cli/plugins.py b/hermes_cli/plugins.py index b904b8a01..fd449fc27 100644 --- a/hermes_cli/plugins.py +++ b/hermes_cli/plugins.py @@ -49,6 +49,7 @@ from typing import Any, Callable, Dict, List, Optional, Set, Union from hermes_constants import get_hermes_home from utils import env_var_enabled from hermes_cli.config import cfg_get +OBSERVER_SCHEMA_VERSION = "hermes.observer.v1" def get_bundled_plugins_dir() -> Path: @@ -137,10 +138,12 @@ VALID_HOOKS: Set[str] = { "post_llm_call", "pre_api_request", "post_api_request", + "api_request_error", "on_session_start", "on_session_end", "on_session_finalize", "on_session_reset", + "subagent_start", "subagent_stop", # Gateway pre-dispatch hook. Fired once per incoming MessageEvent # after the internal-event guard but BEFORE auth/pairing and agent @@ -1551,6 +1554,7 @@ class PluginManager: are reused. All injected context is ephemeral — never persisted to session DB. """ + kwargs.setdefault("telemetry_schema_version", OBSERVER_SCHEMA_VERSION) callbacks = self._hooks.get(hook_name, []) results: List[Any] = [] for cb in callbacks: @@ -1567,6 +1571,10 @@ class PluginManager: ) return results + def has_hook(self, hook_name: str) -> bool: + """Return True when at least one callback is registered for a hook.""" + return bool(self._hooks.get(hook_name)) + # ----------------------------------------------------------------------- # Introspection # ----------------------------------------------------------------------- @@ -1647,6 +1655,10 @@ def invoke_hook(hook_name: str, **kwargs: Any) -> List[Any]: return get_plugin_manager().invoke_hook(hook_name, **kwargs) +def has_hook(hook_name: str) -> bool: + """Return True when a hook has registered callbacks.""" + return get_plugin_manager().has_hook(hook_name) + _thread_tool_whitelist = threading.local() @@ -1669,6 +1681,8 @@ def get_pre_tool_call_block_message( task_id: str = "", session_id: str = "", tool_call_id: str = "", + turn_id: str = "", + api_request_id: str = "", ) -> Optional[str]: """Check ``pre_tool_call`` hooks for a blocking directive. @@ -1693,6 +1707,8 @@ def get_pre_tool_call_block_message( task_id=task_id, session_id=session_id, tool_call_id=tool_call_id, + turn_id=turn_id, + api_request_id=api_request_id, ) for result in hook_results: diff --git a/model_tools.py b/model_tools.py index 8e85581be..863a07dde 100644 --- a/model_tools.py +++ b/model_tools.py @@ -799,12 +799,60 @@ def _coerce_boolean(value: str): return value +def _emit_post_tool_call_hook( + *, + function_name: str, + function_args: Dict[str, Any], + result: Any, + task_id: Optional[str] = None, + session_id: Optional[str] = None, + tool_call_id: Optional[str] = None, + turn_id: Optional[str] = None, + api_request_id: Optional[str] = None, + duration_ms: int = 0, + status: str = "ok", + error_type: Optional[str] = None, + error_message: Optional[str] = None, +) -> None: + try: + from hermes_cli.plugins import invoke_hook + invoke_hook( + "post_tool_call", + tool_name=function_name, + args=function_args, + result=result, + task_id=task_id or "", + session_id=session_id or "", + tool_call_id=tool_call_id or "", + turn_id=turn_id or "", + api_request_id=api_request_id or "", + duration_ms=duration_ms, + status=status, + error_type=error_type, + error_message=error_message, + ) + except Exception as _hook_err: + logger.debug("post_tool_call hook error: %s", _hook_err) + + +def _tool_result_observer_fields(result: Any) -> tuple[str, Optional[str], Optional[str]]: + try: + parsed_result = json.loads(result) if isinstance(result, str) else result + if isinstance(parsed_result, dict) and parsed_result.get("error"): + return "error", "tool_error", str(parsed_result.get("error")) + except Exception: + pass + return "ok", None, None + + def handle_function_call( function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None, tool_call_id: Optional[str] = None, session_id: Optional[str] = None, + turn_id: Optional[str] = None, + api_request_id: Optional[str] = None, user_task: Optional[str] = None, enabled_tools: Optional[List[str]] = None, skip_pre_tool_call_hook: bool = False, @@ -837,6 +885,8 @@ def handle_function_call( """ # Coerce string arguments to their schema-declared types (e.g. "42"→42) function_args = coerce_tool_args(function_name, function_args) + if not isinstance(function_args, dict): + function_args = {} # ── Tool Search bridge dispatch ────────────────────────────────── # tool_search and tool_describe are pure catalog reads — handle them @@ -935,12 +985,28 @@ def handle_function_call( task_id=task_id or "", session_id=session_id or "", tool_call_id=tool_call_id or "", + turn_id=turn_id or "", + api_request_id=api_request_id or "", ) except Exception as _hook_err: logger.debug("pre_tool_call hook error: %s", _hook_err) if block_message is not None: - return json.dumps({"error": block_message}, ensure_ascii=False) + result = json.dumps({"error": block_message}, ensure_ascii=False) + _emit_post_tool_call_hook( + function_name=function_name, + function_args=function_args, + result=result, + task_id=task_id, + session_id=session_id, + tool_call_id=tool_call_id, + turn_id=turn_id, + api_request_id=api_request_id, + status="blocked", + error_type="plugin_block", + error_message=block_message, + ) + return result # ACP/Zed edit approval runs before any file mutation. The requester # is bound via ContextVar only for ACP sessions, so CLI/gateway paths @@ -973,37 +1039,60 @@ def handle_function_call( # to wrap every tool manually. We use monotonic() so the value is # unaffected by wall-clock adjustments during the call. _dispatch_start = time.monotonic() - if function_name == "execute_code": - # Prefer the caller-provided list so subagents can't overwrite - # the parent's tool set via the process-global. - sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names - result = registry.dispatch( - function_name, function_args, - task_id=task_id, - enabled_tools=sandbox_enabled, - ) - else: - result = registry.dispatch( - function_name, function_args, - task_id=task_id, - user_task=user_task, - ) - duration_ms = int((time.monotonic() - _dispatch_start) * 1000) - + _approval_tokens = None try: - from hermes_cli.plugins import invoke_hook - invoke_hook( - "post_tool_call", - tool_name=function_name, - args=function_args, - result=result, - task_id=task_id or "", - session_id=session_id or "", - tool_call_id=tool_call_id or "", - duration_ms=duration_ms, + from tools.approval import ( + reset_current_observability_context, + set_current_observability_context, ) - except Exception as _hook_err: - logger.debug("post_tool_call hook error: %s", _hook_err) + _approval_tokens = set_current_observability_context( + turn_id=turn_id or "", + tool_call_id=tool_call_id or "", + ) + except Exception: + reset_current_observability_context = None + try: + if function_name == "execute_code": + # Prefer the caller-provided list so subagents can't overwrite + # the parent's tool set via the process-global. + sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names + def _dispatch(next_args: Dict[str, Any]) -> Any: + return registry.dispatch( + function_name, next_args, + task_id=task_id, + enabled_tools=sandbox_enabled, + ) + else: + def _dispatch(next_args: Dict[str, Any]) -> Any: + return registry.dispatch( + function_name, next_args, + task_id=task_id, + user_task=user_task, + ) + result = _dispatch(function_args) + finally: + if _approval_tokens is not None and reset_current_observability_context is not None: + try: + reset_current_observability_context(_approval_tokens) + except Exception: + pass + duration_ms = int((time.monotonic() - _dispatch_start) * 1000) + status, error_type, error_message = _tool_result_observer_fields(result) + + _emit_post_tool_call_hook( + function_name=function_name, + function_args=function_args, + result=result, + task_id=task_id, + session_id=session_id, + tool_call_id=tool_call_id, + turn_id=turn_id, + api_request_id=api_request_id, + duration_ms=duration_ms, + status=status, + error_type=error_type, + error_message=error_message, + ) # Generic tool-result canonicalization seam: plugins receive the # final result string (JSON, usually) and may replace it by @@ -1021,7 +1110,12 @@ def handle_function_call( task_id=task_id or "", session_id=session_id or "", tool_call_id=tool_call_id or "", + turn_id=turn_id or "", + api_request_id=api_request_id or "", duration_ms=duration_ms, + status=status, + error_type=error_type, + error_message=error_message, ) for hook_result in hook_results: if isinstance(hook_result, str): diff --git a/plugins/observability/nemo_relay/README.md b/plugins/observability/nemo_relay/README.md new file mode 100644 index 000000000..f1a2c3b7d --- /dev/null +++ b/plugins/observability/nemo_relay/README.md @@ -0,0 +1,368 @@ +# NeMo Relay Observability + +Optional Hermes observability plugin that maps Hermes observer hooks to +NeMo Relay scopes, LLM spans, tool spans, marks, ATOF, and ATIF. + +NeMo Relay is NVIDIA's runtime layer for agent execution boundaries. It does +not replace Hermes Agent's planner, tools, memory, model provider routing, or +CLI UX. Instead, this plugin lets Hermes emit NeMo Relay lifecycle events for +the work Hermes already owns: sessions, turns, provider/API calls, tool calls, +approval prompts, and delegated subagents. + +With this plugin enabled, Hermes Agent can: + +- Preserve Hermes execution as NeMo Relay scopes, LLM spans, tool spans, and + mark events. +- Export raw lifecycle events as Agent Trajectory Observability Format (ATOF) + JSONL for debugging and offline inspection. +- Export Agent Trajectory Interchange Format (ATIF) trajectories for replay, + evaluation, and harness analysis workflows. +- Correlate parent sessions, delegated subagents, tool calls, and provider + calls through shared session, turn, and trajectory metadata. + +See the NeMo Relay overview for the broader runtime model: +https://docs.nvidia.com/nemo/relay/about-nemo-relay/overview + +ATOF is NVIDIA's canonical JSONL event stream representation for NeMo Relay +lifecycle events. The format is documented in the NeMo Agent Toolkit: +https://github.com/NVIDIA/NeMo-Agent-Toolkit/blob/develop/packages/nvidia_nat_atif/atof-event-format.md + +ATIF is the trajectory representation produced from those events. NVIDIA and +Harbor upstreamed ATIF v1.7 support for complex harness workflows, including +subagent trajectory embedding, trajectory IDs, multi-LLM-call step metadata, and +deterministic no-LLM orchestration steps: +https://github.com/harbor-framework/harbor/blob/main/rfcs/0001-trajectory-format.md + +## Enablement + +Enable the plugin before setting export options: + +```bash +hermes plugins enable observability/nemo_relay +``` + +The `HERMES_NEMO_RELAY_*` environment variables below only configure an +already-enabled plugin. They do not enable plugin discovery by themselves. + +For isolated test homes, enable the plugin in the same `HERMES_HOME` that the +agent run will use: + +```bash +env HERMES_HOME=/tmp/hermes-nemo-relay-test \ + hermes plugins enable observability/nemo_relay +``` + +Runs started with `--ignore_user_config` skip the enabled-plugin state from +`HERMES_HOME`, so local E2E tests should omit that flag unless the test harness +loads `observability/nemo_relay` explicitly another way. + +`HERMES_HOME` is the Hermes profile/config home used by both +`hermes plugins enable ...` and the later `hermes chat ...` run. If unset, +Hermes uses the user's default home, usually `~/.hermes`. For isolated smoke +tests, choose any writable temporary directory and use the same value for every +command in that test: + +```bash +export HERMES_HOME=/tmp/hermes-nemo-relay-test +hermes plugins enable observability/nemo_relay +hermes chat --query 'Reply exactly ok' --provider custom --model qwen3.6:35b +``` + +For source checkouts, make sure the `hermes` command you run is built from the +checkout that contains this plugin. A globally installed older CLI will not see +new bundled plugins from your working tree. + +```bash +uv sync --extra nemo-relay +uv run hermes plugins enable observability/nemo_relay +uv run hermes chat --query 'Reply exactly ok' --provider custom --model qwen3.6:35b +``` + +To ship the updated CLI into another environment, build and install a fresh +wheel from this checkout, then install the official NeMo Relay runtime extra: + +```bash +uv build --wheel +python -m pip install --force-reinstall dist/hermes_agent-*.whl +python -m pip install "nemo-relay==0.3" +hermes plugins enable observability/nemo_relay +``` + +The plugin fails open when `nemo-relay` is not installed. Install and test it against the official NeMo Relay 0.3 PyPI distribution: + +```bash +pip install "nemo-relay==0.3" +``` + +## Export Configuration + +The plugin can configure exporters directly from `HERMES_NEMO_RELAY_*` +environment variables, or delegate exporter setup to a NeMo Relay +`plugins.toml` component config. + +Use environment variables for local smoke tests, CI jobs, and one-off CLI +runs. Use `plugins.toml` when you want one NeMo Relay configuration document to +own observability components such as ATOF, ATIF, OpenTelemetry, and +OpenInference. + +### Environment Variables + +Useful local export settings after the plugin is enabled: + +```bash +export HERMES_NEMO_RELAY_ATOF_ENABLED=1 +export HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY=.nemo-relay/atof +export HERMES_NEMO_RELAY_ATIF_ENABLED=1 +export HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY=.nemo-relay/atif +``` + +Optional overrides: + +- `HERMES_NEMO_RELAY_ATOF_FILENAME` +- `HERMES_NEMO_RELAY_ATOF_MODE` (`append` or `overwrite`) +- `HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE` +- `HERMES_NEMO_RELAY_ATIF_AGENT_NAME` +- `HERMES_NEMO_RELAY_ATIF_AGENT_VERSION` +- `HERMES_NEMO_RELAY_ATIF_MODEL_NAME` +- `HERMES_NEMO_RELAY_ATIF_SUBAGENT_EXPORT_MODE` (`embedded` by default; set `all` to also write standalone child files) + +### NeMo Relay Component Config + +To initialize NeMo Relay from a component config, create a `plugins.toml` file +and point Hermes at it: + +```bash +export HERMES_NEMO_RELAY_PLUGINS_TOML=.nemo-relay/plugins.toml +``` + +Minimal ATOF and ATIF config: + +```toml +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config] +version = 1 + +[components.config.atof] +enabled = true +output_directory = ".nemo-relay/atof" +filename = "events.jsonl" +mode = "overwrite" + +[components.config.atif] +enabled = true +output_directory = ".nemo-relay/atif" +filename_template = "trajectory-{session_id}.json" +agent_name = "Hermes Agent" +agent_version = "local" +``` + +When `HERMES_NEMO_RELAY_PLUGINS_TOML` is set and initializes successfully, NeMo +Relay owns exporter lifecycle through that config. The direct +`HERMES_NEMO_RELAY_ATOF_*` fallback setup is skipped. + +## Canonical Local Examples + +The examples below use the official `nemo-relay==0.3` distribution and a local +Ollama model served through the OpenAI-compatible API. + +```bash +pip install "nemo-relay==0.3" + +export HERMES_HOME=/tmp/hermes-nemo-relay-docs/hermes-home +mkdir -p "$HERMES_HOME" + +cat > "$HERMES_HOME/config.yaml" <<'YAML' +model: + provider: custom + default: qwen3.6:35b + base_url: http://127.0.0.1:11434/v1 + api_key: ollama +plugins: + enabled: + - observability/nemo_relay +delegation: + max_spawn_depth: 2 + max_concurrent_children: 2 + child_timeout_seconds: 180 + model: qwen3.6:35b + provider: custom + base_url: http://127.0.0.1:11434/v1 + api_key: ollama +YAML +``` + +### Delegated Subagent Tool Call + +This run starts a parent Hermes session, delegates to a child subagent, has the +child call `terminal`, and writes both ATOF and ATIF. + +```bash +export HERMES_NEMO_RELAY_ATOF_ENABLED=1 +export HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY=/tmp/hermes-nemo-relay-docs/subagent/atof +export HERMES_NEMO_RELAY_ATOF_FILENAME=nested-subagent-atof.jsonl +export HERMES_NEMO_RELAY_ATOF_MODE=overwrite +export HERMES_NEMO_RELAY_ATIF_ENABLED=1 +export HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY=/tmp/hermes-nemo-relay-docs/subagent/atif +export HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE='nested-subagent-atif-{session_id}.json' +export HERMES_NEMO_RELAY_ATIF_AGENT_NAME='Hermes Agent E2E' +export HERMES_NEMO_RELAY_ATIF_AGENT_VERSION=docs-example +export HERMES_NEMO_RELAY_ATIF_SUBAGENT_EXPORT_MODE=all + +hermes chat \ + --query 'Use delegate_task exactly once. Ask the child subagent to use the terminal tool exactly once to run printf docs_nested_leaf_function. After the child returns, reply with exactly: parent received nested subagent result.' \ + --provider custom \ + --model qwen3.6:35b \ + --toolsets delegation,terminal \ + --max-turns 10 \ + --quiet \ + --accept-hooks +``` + +CLI output: + +```text +session_id: docs-parent-session +parent received nested subagent result. +``` + +Sanitized ATOF excerpt: + +```jsonl +{"kind":"scope","category":"tool","name":"delegate_task","scope_category":"start","metadata":{"session_id":"docs-parent-session","tool_call_id":"call_delegate"},"data":{"goal":"Run the command `printf docs_nested_leaf_function` using the terminal tool.","toolsets":["terminal"]}} +{"kind":"mark","name":"hermes.subagent.start","metadata":{"parent_session_id":"docs-parent-session","session_id":"docs-child-session","subagent_id":"sa-0-docs","child_role":"leaf"}} +{"kind":"scope","category":"tool","name":"terminal","scope_category":"end","metadata":{"session_id":"docs-child-session","tool_call_id":"call_terminal","status":"ok"},"data":"{\"output\":\"docs_nested_leaf_function\",\"exit_code\":0,\"error\":null}"} +{"kind":"scope","category":"tool","name":"delegate_task","scope_category":"end","metadata":{"session_id":"docs-parent-session","tool_call_id":"call_delegate","status":"ok"}} +``` + +Sanitized ATIF excerpt: + +```json +{ + "schema_version": "ATIF-v1.7", + "session_id": "docs-parent-session", + "agent": {"name": "Hermes Agent E2E", "version": "docs-example", "model_name": "qwen3.6:35b"}, + "steps": [ + { + "source": "agent", + "tool_calls": [{"function_name": "delegate_task"}], + "observation": { + "results": [ + { + "subagent_trajectory_ref": [{"session_id": "docs-child-session"}], + "content": "{\"results\":[{\"status\":\"completed\",\"tool_trace\":[{\"tool\":\"terminal\",\"status\":\"ok\"}]}]}" + } + ] + } + }, + {"source": "agent", "message": "parent received nested subagent result."} + ], + "subagent_trajectories": [ + { + "session_id": "docs-child-session", + "steps": [ + { + "source": "agent", + "tool_calls": [{"function_name": "terminal", "arguments": {"command": "printf docs_nested_leaf_function"}}], + "observation": {"results": [{"content": "{\"output\":\"docs_nested_leaf_function\",\"exit_code\":0,\"error\":null}"}]} + } + ] + } + ] +} +``` + +### Parallel Tool Calls + +This run asks the model to emit two `read_file` tool calls in the same assistant +message. Hermes dispatches the read-only tools as one batch, and NeMo Relay +records both tool invocations. + +```bash +mkdir -p /tmp/hermes-nemo-relay-docs/workdir +printf 'docs_parallel_alpha_function\n' > /tmp/hermes-nemo-relay-docs/workdir/alpha.txt +printf 'docs_parallel_beta_function\n' > /tmp/hermes-nemo-relay-docs/workdir/beta.txt +cd /tmp/hermes-nemo-relay-docs/workdir + +export HERMES_NEMO_RELAY_ATOF_ENABLED=1 +export HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY=/tmp/hermes-nemo-relay-docs/parallel/atof +export HERMES_NEMO_RELAY_ATOF_FILENAME=parallel-tools-atof.jsonl +export HERMES_NEMO_RELAY_ATOF_MODE=overwrite +export HERMES_NEMO_RELAY_ATIF_ENABLED=1 +export HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY=/tmp/hermes-nemo-relay-docs/parallel/atif +export HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE='parallel-tools-atif-{session_id}.json' +export HERMES_NEMO_RELAY_ATIF_AGENT_NAME='Hermes Agent E2E' +export HERMES_NEMO_RELAY_ATIF_AGENT_VERSION=docs-example + +hermes chat \ + --query 'Use exactly two read_file tool calls in the same assistant message. Read alpha.txt and beta.txt. Do not call terminal. After both tool results are available, reply with exactly: parallel tools complete.' \ + --provider custom \ + --model qwen3.6:35b \ + --toolsets file \ + --max-turns 8 \ + --quiet \ + --accept-hooks +``` + +CLI output: + +```text +session_id: docs-parallel-session +parallel tools complete. +``` + +Sanitized ATOF excerpt: + +```jsonl +{"kind":"scope","category":"llm","name":"custom","scope_category":"end","data":{"assistant_message":{"tool_calls":[{"id":"call_alpha","name":"read_file","arguments":"{\"path\":\"alpha.txt\"}"},{"id":"call_beta","name":"read_file","arguments":"{\"path\":\"beta.txt\"}"}]},"finish_reason":"tool_calls"}} +{"kind":"scope","category":"tool","name":"read_file","scope_category":"start","timestamp":"2026-05-31T00:15:08.956732+00:00","metadata":{"session_id":"docs-parallel-session","tool_call_id":"call_alpha"},"data":{"path":"alpha.txt"}} +{"kind":"scope","category":"tool","name":"read_file","scope_category":"start","timestamp":"2026-05-31T00:15:08.956804+00:00","metadata":{"session_id":"docs-parallel-session","tool_call_id":"call_beta"},"data":{"path":"beta.txt"}} +{"kind":"scope","category":"tool","name":"read_file","scope_category":"end","metadata":{"session_id":"docs-parallel-session","tool_call_id":"call_beta","status":"ok"},"data":"{\"content\":\" 1|docs_parallel_beta_function\\n\"}"} +{"kind":"scope","category":"tool","name":"read_file","scope_category":"end","metadata":{"session_id":"docs-parallel-session","tool_call_id":"call_alpha","status":"ok"},"data":"{\"content\":\" 1|docs_parallel_alpha_function\\n\"}"} +``` + +Sanitized ATIF excerpt: + +```json +{ + "schema_version": "ATIF-v1.7", + "session_id": "docs-parallel-session", + "agent": {"name": "Hermes Agent E2E", "version": "docs-example", "model_name": "qwen3.6:35b"}, + "steps": [ + { + "source": "agent", + "tool_calls": [ + {"tool_call_id": "call_alpha", "function_name": "read_file", "arguments": {"path": "alpha.txt"}}, + {"tool_call_id": "call_beta", "function_name": "read_file", "arguments": {"path": "beta.txt"}} + ], + "observation": { + "results": [ + {"source_call_id": "call_beta", "content": "{\"content\":\" 1|docs_parallel_beta_function\\n\"}"}, + {"source_call_id": "call_alpha", "content": "{\"content\":\" 1|docs_parallel_alpha_function\\n\"}"} + ] + } + }, + {"source": "agent", "message": "parallel tools complete."} + ] +} +``` + +## ATOF Mapping + +The plugin keeps NeMo Relay's native event model: + +- Hermes sessions map to `agent` scopes. +- Hermes API request hooks map to `llm` scope start/end events. +- Hermes tool hooks map to `tool` scope start/end events. +- Turn, approval, subagent, and diagnostic fallback events map to `mark` + events. + +For subagent correlation, mark metadata includes parent and child session IDs, +subagent IDs, role/status fields when present, and derived +`parent_trajectory_id` / `child_trajectory_id` values. This keeps the ATOF +stream lossless for later ATIF conversion that can compact subagents into +separate trajectories. diff --git a/plugins/observability/nemo_relay/__init__.py b/plugins/observability/nemo_relay/__init__.py new file mode 100644 index 000000000..25078a21c --- /dev/null +++ b/plugins/observability/nemo_relay/__init__.py @@ -0,0 +1,568 @@ +"""nemo_relay — optional Hermes plugin for NeMo Relay observability.""" + +from __future__ import annotations + +import asyncio +import inspect +import json +import logging +import os +import threading +import tomllib +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional + +logger = logging.getLogger(__name__) + +_INIT_FAILED = object() +_LOCK = threading.RLock() +_RUNTIME: "_Runtime | object | None" = None + + +@dataclass +class _SessionState: + session_id: str + handle: Any = None + atif_exporter: Any = None + atif_subscriber_name: str = "" + is_embedded_subagent: bool = False + parent_session_id: str = "" + llm_spans: dict[str, Any] = field(default_factory=dict) + tool_spans: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class _SubagentParent: + parent_session_id: str + parent_handle: Any + metadata: dict[str, Any] + + +@dataclass +class _Settings: + plugins_toml_path: str = "" + atof_enabled: bool = False + atof_output_directory: str = "" + atof_filename: str = "hermes-atof.jsonl" + atof_mode: str = "append" + atif_enabled: bool = False + atif_output_directory: str = "" + atif_filename_template: str = "hermes-atif-{session_id}.json" + atif_subagent_export_mode: str = "embedded" + atif_agent_name: str = "Hermes Agent" + atif_agent_version: str = "unknown" + atif_model_name: str = "unknown" + + +class _Runtime: + def __init__(self, nemo_relay: Any, settings: _Settings) -> None: + self.nemo_relay = nemo_relay + self.settings = settings + self.sessions: dict[str, _SessionState] = {} + self.subagent_parents: dict[str, _SubagentParent] = {} + self.atof_exporter: Any = None + self._plugin_config_initialized = self._configure_plugins_toml() + if not self._plugin_config_initialized: + self._configure_atof() + + def _configure_plugins_toml(self) -> bool: + if not self.settings.plugins_toml_path: + return False + plugin_mod = getattr(self.nemo_relay, "plugin", None) + initialize = getattr(plugin_mod, "initialize", None) + if not callable(initialize): + return False + config_path = Path(self.settings.plugins_toml_path) + try: + config = tomllib.loads(config_path.read_text(encoding="utf-8")) + self._ensure_plugin_config_output_dirs(config) + result = initialize(config) + if inspect.isawaitable(result): + asyncio.run(result) + return True + except RuntimeError: + logger.debug("NeMo Relay plugins.toml init skipped inside a running event loop") + return False + except Exception as exc: + logger.debug("NeMo Relay plugins.toml init failed: %s", exc, exc_info=True) + return False + + def _ensure_plugin_config_output_dirs(self, config: dict[str, Any]) -> None: + for component in config.get("components", []): + if not isinstance(component, dict): + continue + if component.get("kind") != "observability": + continue + if component.get("enabled") is False: + continue + component_config = component.get("config") + if not isinstance(component_config, dict): + continue + for exporter_name in ("atof", "atif"): + exporter_config = component_config.get(exporter_name) + if not isinstance(exporter_config, dict): + continue + output_directory = exporter_config.get("output_directory") + if isinstance(output_directory, str) and output_directory.strip(): + Path(output_directory).mkdir(parents=True, exist_ok=True) + + def _configure_atof(self) -> None: + if not self.settings.atof_enabled: + return + config = self.nemo_relay.AtofExporterConfig() + if self.settings.atof_output_directory: + Path(self.settings.atof_output_directory).mkdir(parents=True, exist_ok=True) + config.output_directory = self.settings.atof_output_directory + config.filename = self.settings.atof_filename + if self.settings.atof_mode.lower() == "overwrite": + config.mode = self.nemo_relay.AtofExporterMode.Overwrite + else: + config.mode = self.nemo_relay.AtofExporterMode.Append + self.atof_exporter = self.nemo_relay.AtofExporter(config) + self.atof_exporter.register("hermes.nemo_relay.atof") + + def ensure_session(self, kwargs: dict[str, Any]) -> _SessionState: + session_id = _session_id(kwargs) + state = self.sessions.get(session_id) + if state is not None: + return state + + state = _SessionState(session_id=session_id) + if self.settings.atif_enabled: + state.atif_exporter = self.nemo_relay.AtifExporter( + session_id, + self.settings.atif_agent_name, + self.settings.atif_agent_version, + model_name=str(kwargs.get("model") or self.settings.atif_model_name), + extra={"source": "hermes-agent", "plugin": "observability/nemo_relay"}, + ) + state.atif_subscriber_name = f"hermes.nemo_relay.atif.{session_id}" + state.atif_exporter.register(state.atif_subscriber_name) + + subagent_parent = self.subagent_parents.get(session_id) + metadata = _metadata(kwargs) + parent_handle = None + if subagent_parent is not None: + parent_handle = subagent_parent.parent_handle + metadata = {**metadata, **subagent_parent.metadata} + state.is_embedded_subagent = True + state.parent_session_id = subagent_parent.parent_session_id + + state.handle = self.nemo_relay.scope.push( + f"hermes-session-{session_id}", + self.nemo_relay.ScopeType.Agent, + handle=parent_handle, + data={"session_id": session_id}, + metadata=metadata, + ) + self.sessions[session_id] = state + return state + + def export_atif(self, state: _SessionState) -> None: + if not self.settings.atif_enabled or state.atif_exporter is None: + return + if state.is_embedded_subagent and self.settings.atif_subagent_export_mode != "all": + return + output_dir = self.settings.atif_output_directory + if not output_dir: + return + Path(output_dir).mkdir(parents=True, exist_ok=True) + filename = self.settings.atif_filename_template.format(session_id=state.session_id) + Path(output_dir, filename).write_text(state.atif_exporter.export_json(), encoding="utf-8") + + def close_session(self, kwargs: dict[str, Any]) -> None: + session_id = _session_id(kwargs) + self.subagent_parents.pop(session_id, None) + state = self.sessions.pop(session_id, None) + if state is None: + return + if state.handle is not None: + try: + self.nemo_relay.scope.pop(state.handle, output=_jsonable(kwargs)) + except Exception: + logger.debug("NeMo Relay session pop failed", exc_info=True) + self.export_atif(state) + if state.atif_exporter is not None and state.atif_subscriber_name: + try: + state.atif_exporter.deregister(state.atif_subscriber_name) + except Exception: + logger.debug("NeMo Relay ATIF deregister failed", exc_info=True) + + def mark(self, name: str, kwargs: dict[str, Any]) -> None: + state = self.ensure_session(kwargs) + self.nemo_relay.scope.event( + name, + handle=state.handle, + data=_jsonable(kwargs), + metadata=_metadata(kwargs), + ) + + def mark_subagent_start(self, kwargs: dict[str, Any]) -> None: + parent_state = self.ensure_session(kwargs) + metadata = _metadata(kwargs) + child_session_id = _child_session_id(kwargs) + if child_session_id: + self.subagent_parents[child_session_id] = _SubagentParent( + parent_session_id=parent_state.session_id, + parent_handle=parent_state.handle, + metadata=_subagent_child_metadata(kwargs, metadata), + ) + self.nemo_relay.scope.event( + "hermes.subagent.start", + handle=parent_state.handle, + data=_jsonable(kwargs), + metadata=metadata, + ) + + def mark_subagent_stop(self, kwargs: dict[str, Any]) -> None: + child_session_id = _child_session_id(kwargs) + if child_session_id: + self.subagent_parents.pop(child_session_id, None) + self.mark("hermes.subagent.stop", kwargs) + + +def register(ctx) -> None: + ctx.register_hook("on_session_start", on_session_start) + ctx.register_hook("on_session_end", on_session_end) + ctx.register_hook("on_session_finalize", on_session_finalize) + ctx.register_hook("on_session_reset", on_session_reset) + ctx.register_hook("pre_llm_call", on_pre_llm_call) + ctx.register_hook("post_llm_call", on_post_llm_call) + ctx.register_hook("pre_api_request", on_pre_api_request) + ctx.register_hook("post_api_request", on_post_api_request) + ctx.register_hook("api_request_error", on_api_request_error) + ctx.register_hook("pre_tool_call", on_pre_tool_call) + ctx.register_hook("post_tool_call", on_post_tool_call) + ctx.register_hook("pre_approval_request", on_pre_approval_request) + ctx.register_hook("post_approval_response", on_post_approval_response) + ctx.register_hook("subagent_start", on_subagent_start) + ctx.register_hook("subagent_stop", on_subagent_stop) + + +def on_session_start(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.ensure_session(kwargs)) + + +def on_session_end(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: (runtime.mark("hermes.session.end", kwargs), runtime.export_atif(runtime.ensure_session(kwargs)))) + + +def on_session_finalize(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.close_session(kwargs)) + + +def on_session_reset(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.close_session(kwargs)) + + +def on_pre_llm_call(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.mark("hermes.turn.start", kwargs)) + + +def on_post_llm_call(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.mark("hermes.turn.end", kwargs)) + + +def on_pre_api_request(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is None: + return + + def _record() -> None: + state = runtime.ensure_session(kwargs) + request_payload = kwargs.get("request") + request_body = request_payload.get("body") if isinstance(request_payload, dict) else {} + request = runtime.nemo_relay.LLMRequest({}, _jsonable(request_body)) + span = runtime.nemo_relay.llm.call( + str(kwargs.get("provider") or "llm"), + request, + handle=state.handle, + data=_jsonable({"turn_id": kwargs.get("turn_id"), "api_request_id": kwargs.get("api_request_id")}), + metadata=_metadata(kwargs), + model_name=str(kwargs.get("model") or ""), + ) + state.llm_spans[_api_key(kwargs)] = span + + _safe(_record) + + +def on_post_api_request(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is None: + return + + def _record() -> None: + state = runtime.ensure_session(kwargs) + span = state.llm_spans.pop(_api_key(kwargs), None) + if span is None: + runtime.mark("hermes.api.response.unmatched", kwargs) + return + runtime.nemo_relay.llm.call_end( + span, + _jsonable(kwargs.get("response") or {}), + data=_jsonable({"usage": kwargs.get("usage"), "finish_reason": kwargs.get("finish_reason")}), + metadata=_metadata(kwargs), + ) + + _safe(_record) + + +def on_api_request_error(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is None: + return + + def _record() -> None: + state = runtime.ensure_session(kwargs) + span = state.llm_spans.pop(_api_key(kwargs), None) + if span is None: + runtime.mark("hermes.api.error", kwargs) + return + runtime.nemo_relay.llm.call_end( + span, + {"error": _jsonable(kwargs.get("error") or {})}, + data=_jsonable(kwargs), + metadata=_metadata(kwargs), + ) + + _safe(_record) + + +def on_pre_tool_call(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is None: + return + + def _record() -> None: + state = runtime.ensure_session(kwargs) + span = runtime.nemo_relay.tools.call( + str(kwargs.get("tool_name") or "tool"), + _jsonable(kwargs.get("args") or {}), + handle=state.handle, + data=_jsonable({"turn_id": kwargs.get("turn_id"), "api_request_id": kwargs.get("api_request_id")}), + metadata=_metadata(kwargs), + tool_call_id=str(kwargs.get("tool_call_id") or ""), + ) + state.tool_spans[_tool_key(kwargs)] = span + + _safe(_record) + + +def on_post_tool_call(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is None: + return + + def _record() -> None: + state = runtime.ensure_session(kwargs) + span = state.tool_spans.pop(_tool_key(kwargs), None) + if span is None: + runtime.mark("hermes.tool.response.unmatched", kwargs) + return + runtime.nemo_relay.tools.call_end( + span, + _jsonable(kwargs.get("result")), + data=_jsonable({"status": kwargs.get("status"), "duration_ms": kwargs.get("duration_ms")}), + metadata=_metadata(kwargs), + ) + + _safe(_record) + + +def on_pre_approval_request(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.mark("hermes.approval.request", kwargs)) + + +def on_post_approval_response(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.mark("hermes.approval.response", kwargs)) + + +def on_subagent_start(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.mark_subagent_start(kwargs)) + + +def on_subagent_stop(**kwargs: Any) -> None: + runtime = _get_runtime() + if runtime is not None: + _safe(lambda: runtime.mark_subagent_stop(kwargs)) + + +def _get_runtime() -> Optional[_Runtime]: + global _RUNTIME + with _LOCK: + if _RUNTIME is _INIT_FAILED: + return None + if isinstance(_RUNTIME, _Runtime): + return _RUNTIME + try: + import nemo_relay as nemo_runtime + except Exception as exc: + logger.debug("NeMo Relay plugin disabled: import failed: %s", exc) + _RUNTIME = _INIT_FAILED + return None + try: + _RUNTIME = _Runtime(nemo_relay=nemo_runtime, settings=_load_settings()) + except Exception as exc: + logger.debug("NeMo Relay plugin disabled: init failed: %s", exc, exc_info=True) + _RUNTIME = _INIT_FAILED + return None + return _RUNTIME + + +def _load_settings() -> _Settings: + return _Settings( + plugins_toml_path=_env("HERMES_NEMO_RELAY_PLUGINS_TOML"), + atof_enabled=_env_bool("HERMES_NEMO_RELAY_ATOF_ENABLED"), + atof_output_directory=_env("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY"), + atof_filename=_env("HERMES_NEMO_RELAY_ATOF_FILENAME") or "hermes-atof.jsonl", + atof_mode=_env("HERMES_NEMO_RELAY_ATOF_MODE") or "append", + atif_enabled=_env_bool("HERMES_NEMO_RELAY_ATIF_ENABLED"), + atif_output_directory=_env("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY"), + atif_filename_template=_env("HERMES_NEMO_RELAY_ATIF_FILENAME_TEMPLATE") or "hermes-atif-{session_id}.json", + atif_subagent_export_mode=_atif_subagent_export_mode(), + atif_agent_name=_env("HERMES_NEMO_RELAY_ATIF_AGENT_NAME") or "Hermes Agent", + atif_agent_version=_env("HERMES_NEMO_RELAY_ATIF_AGENT_VERSION") or "unknown", + atif_model_name=_env("HERMES_NEMO_RELAY_ATIF_MODEL_NAME") or "unknown", + ) + + +def _env(name: str) -> str: + return os.environ.get(name, "").strip() + + +def _atif_subagent_export_mode() -> str: + mode = _env("HERMES_NEMO_RELAY_ATIF_SUBAGENT_EXPORT_MODE").lower() + return "all" if mode == "all" else "embedded" + + +def _env_bool(name: str) -> bool: + return _env(name).lower() in {"1", "true", "yes", "on"} + + +def _session_id(kwargs: dict[str, Any]) -> str: + return str(kwargs.get("session_id") or kwargs.get("parent_session_id") or "default") + + +def _child_session_id(kwargs: dict[str, Any]) -> str: + return str(kwargs.get("child_session_id") or "") + + +def _subagent_child_metadata(kwargs: dict[str, Any], parent_metadata: dict[str, Any]) -> dict[str, Any]: + child_session_id = _child_session_id(kwargs) + metadata = { + "session_id": child_session_id, + "trajectory_id": child_session_id, + "nemo_relay_scope_role": "subagent", + } + for target, source in ( + ("subagent_id", "child_subagent_id"), + ("child_session_id", "child_session_id"), + ("child_subagent_id", "child_subagent_id"), + ("child_role", "child_role"), + ("parent_session_id", "parent_session_id"), + ("parent_turn_id", "parent_turn_id"), + ("parent_subagent_id", "parent_subagent_id"), + ("parent_trajectory_id", "parent_trajectory_id"), + ("telemetry_schema_version", "telemetry_schema_version"), + ): + value = parent_metadata.get(source) + if value is not None: + metadata[target] = value + return metadata + + +def _api_key(kwargs: dict[str, Any]) -> str: + return str(kwargs.get("api_request_id") or f"{_session_id(kwargs)}:{kwargs.get('api_call_count') or 'api'}") + + +def _tool_key(kwargs: dict[str, Any]) -> str: + return str( + kwargs.get("tool_call_id") + or f"{_session_id(kwargs)}:{kwargs.get('turn_id') or ''}:{kwargs.get('tool_name') or 'tool'}" + ) + + +def _metadata(kwargs: dict[str, Any]) -> dict[str, Any]: + keys = ( + "telemetry_schema_version", + "session_id", + "platform", + "task_id", + "turn_id", + "api_request_id", + "tool_call_id", + "parent_session_id", + "parent_turn_id", + "parent_subagent_id", + "child_session_id", + "child_subagent_id", + "child_role", + "child_status", + "provider", + "model", + "api_mode", + "status", + "reason", + ) + metadata = { + key: _jsonable(kwargs[key]) + for key in keys + if key in kwargs and kwargs[key] is not None + } + if "session_id" in metadata: + metadata.setdefault("trajectory_id", metadata["session_id"]) + if "parent_session_id" in metadata: + metadata.setdefault("parent_trajectory_id", metadata["parent_session_id"]) + if "child_session_id" in metadata: + metadata.setdefault("child_trajectory_id", metadata["child_session_id"]) + return metadata + + +def _jsonable(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(k): _jsonable(v) for k, v in value.items()} + if isinstance(value, (list, tuple, set)): + return [_jsonable(v) for v in value] + try: + if hasattr(value, "model_dump"): + return _jsonable(value.model_dump(mode="json")) + except Exception: + pass + try: + return json.loads(json.dumps(value, default=str)) + except Exception: + return str(value) + + +def _safe(fn) -> None: + try: + fn() + except Exception as exc: + logger.debug("NeMo Relay hook handling failed: %s", exc, exc_info=True) + + +def reset_for_tests() -> None: + global _RUNTIME + with _LOCK: + _RUNTIME = None diff --git a/plugins/observability/nemo_relay/plugin.yaml b/plugins/observability/nemo_relay/plugin.yaml new file mode 100644 index 000000000..b1b00f25d --- /dev/null +++ b/plugins/observability/nemo_relay/plugin.yaml @@ -0,0 +1,20 @@ +name: nemo_relay +version: "0.1.0" +description: "Optional NeMo Relay observability for Hermes. Opt in with `hermes plugins enable observability/nemo_relay`; HERMES_NEMO_RELAY_* env vars configure exports after the plugin is enabled." +author: NousResearch +hooks: + - on_session_start + - on_session_end + - on_session_finalize + - on_session_reset + - pre_llm_call + - post_llm_call + - pre_api_request + - post_api_request + - api_request_error + - pre_tool_call + - post_tool_call + - pre_approval_request + - post_approval_response + - subagent_start + - subagent_stop diff --git a/pyproject.toml b/pyproject.toml index bb5faf3a9..f2bb0813b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,6 +124,7 @@ honcho = ["honcho-ai==2.0.1"] # extra that exposes a Starlette-backed server surface so pip/uv can't resolve # a vulnerable pre-1.0.1 transitive. Bump in lockstep with uv.lock. mcp = ["mcp==1.26.0", "starlette==1.0.1"] # starlette: CVE-2026-48710 +nemo-relay = ["nemo-relay==0.3"] homeassistant = ["aiohttp==3.13.3"] sms = ["aiohttp==3.13.3"] # Computer use — macOS background desktop control via cua-driver (MCP stdio). @@ -244,6 +245,7 @@ plugins = [ # " (#34034), web-search providers go missing (#28149), etc. "**/plugin.yaml", "**/plugin.yml", + "**/README.md", ] [tool.setuptools.packages.find] diff --git a/run_agent.py b/run_agent.py index bb93ff682..d0d029343 100644 --- a/run_agent.py +++ b/run_agent.py @@ -1822,6 +1822,254 @@ class AIAgent: summary["total_tokens"] = cu.total_tokens return summary + @staticmethod + def _hook_payload_max_chars() -> int: + raw = os.getenv("HERMES_PLUGIN_PAYLOAD_MAX_CHARS", "50000") + try: + return max(1000, int(raw)) + except (TypeError, ValueError): + return 50000 + + @staticmethod + def _is_sensitive_hook_key(key: Any) -> bool: + if not isinstance(key, str): + return False + lowered = key.lower().replace("-", "_") + exact = { + "api_key", + "authorization", + "proxy_authorization", + "cookie", + "set_cookie", + } + return lowered in exact or lowered.endswith("_api_key") + + @classmethod + def _hook_jsonable( + cls, + value: Any, + *, + depth: int = 0, + max_depth: int = 8, + max_string: int = 8000, + max_sequence: int = 200, + ) -> Any: + if depth > max_depth: + return f"<{type(value).__name__} depth limit>" + if value is None or isinstance(value, (bool, int, float)): + return value + if isinstance(value, str): + if len(value) > max_string: + return value[:max_string] + f"...[truncated {len(value) - max_string} chars]" + return value + if isinstance(value, (bytes, bytearray)): + return f"<{len(value)} bytes>" + if isinstance(value, dict): + out: Dict[str, Any] = {} + for idx, (key, item) in enumerate(value.items()): + if idx >= max_sequence: + out["_truncated_items"] = len(value) - max_sequence + break + str_key = str(key) + if cls._is_sensitive_hook_key(str_key): + out[str_key] = "" + else: + out[str_key] = cls._hook_jsonable( + item, + depth=depth + 1, + max_depth=max_depth, + max_string=max_string, + max_sequence=max_sequence, + ) + return out + if isinstance(value, (list, tuple, set)): + seq = list(value) + out = [ + cls._hook_jsonable( + item, + depth=depth + 1, + max_depth=max_depth, + max_string=max_string, + max_sequence=max_sequence, + ) + for item in seq[:max_sequence] + ] + if len(seq) > max_sequence: + out.append({"_truncated_items": len(seq) - max_sequence}) + return out + try: + if hasattr(value, "model_dump"): + try: + dumped = value.model_dump(mode="json") + except TypeError: + dumped = value.model_dump() + return cls._hook_jsonable( + dumped, + depth=depth + 1, + max_depth=max_depth, + max_string=max_string, + max_sequence=max_sequence, + ) + except Exception: + pass + try: + from dataclasses import asdict, is_dataclass + if is_dataclass(value): + return cls._hook_jsonable( + asdict(value), + depth=depth + 1, + max_depth=max_depth, + max_string=max_string, + max_sequence=max_sequence, + ) + except Exception: + pass + if isinstance(value, SimpleNamespace): + return cls._hook_jsonable( + vars(value), + depth=depth + 1, + max_depth=max_depth, + max_string=max_string, + max_sequence=max_sequence, + ) + if hasattr(value, "__dict__"): + try: + public_attrs = { + k: v + for k, v in vars(value).items() + if not str(k).startswith("_") + } + return cls._hook_jsonable( + public_attrs, + depth=depth + 1, + max_depth=max_depth, + max_string=max_string, + max_sequence=max_sequence, + ) + except Exception: + pass + return str(value)[:max_string] + + @classmethod + def _sanitize_hook_payload(cls, value: Any) -> Any: + payload = cls._hook_jsonable(value) + limit = cls._hook_payload_max_chars() + try: + encoded = json.dumps(payload, ensure_ascii=False, default=str) + except Exception: + return str(payload)[:limit] + if len(encoded) <= limit: + return payload + payload = cls._hook_jsonable(value, max_string=1000, max_sequence=50) + try: + encoded = json.dumps(payload, ensure_ascii=False, default=str) + except Exception: + return str(payload)[:limit] + if len(encoded) <= limit: + return payload + return { + "_truncated": True, + "original_type": type(value).__name__, + "preview": encoded[:limit], + } + + def _api_request_payload_for_hook(self, api_kwargs: Optional[Dict[str, Any]]) -> Dict[str, Any]: + body = { + key: value + for key, value in (api_kwargs or {}).items() + if key not in {"timeout", "http_client"} + } + return self._sanitize_hook_payload( + { + "method": "POST", + "body": body, + } + ) + + def _api_response_payload_for_hook( + self, + response: Any, + assistant_message: Any, + *, + finish_reason: Optional[str], + ) -> Dict[str, Any]: + # ``tool_calls`` is the raw list of provider SDK objects (e.g. + # OpenAI ``ChatCompletionMessageToolCall``). We deliberately hand + # the raw objects to ``_sanitize_hook_payload`` and rely on + # ``_hook_jsonable`` to normalise them via ``model_dump`` / + # ``__dict__`` / dataclass introspection — a future refactor of + # the sanitiser MUST preserve that capability or hook subscribers + # will receive opaque ``str(obj)`` blobs here. + tool_calls = getattr(assistant_message, "tool_calls", None) or [] + return self._sanitize_hook_payload( + { + "model": getattr(response, "model", None), + "finish_reason": finish_reason, + "assistant_message": { + "role": getattr(assistant_message, "role", "assistant"), + "content": getattr(assistant_message, "content", None), + "tool_calls": tool_calls, + }, + "usage": self._usage_summary_for_api_request_hook(response), + } + ) + + def _invoke_api_request_error_hook( + self, + *, + task_id: str, + turn_id: str, + api_request_id: str, + api_call_count: int, + api_start_time: float, + api_kwargs: Optional[Dict[str, Any]], + error_type: str, + error_message: str, + status_code: Optional[int] = None, + retry_count: Optional[int] = None, + max_retries: Optional[int] = None, + retryable: Optional[bool] = None, + reason: Optional[str] = None, + ) -> None: + # Lazy module import (not from-import) so tests that + # ``monkeypatch.setattr("hermes_cli.plugins.has_hook", ...)`` still + # take effect on this call site. After first call the import is a + # ``sys.modules`` dict lookup, so retries don't repay any real cost. + try: + from hermes_cli import plugins as _plugins + + if not _plugins.has_hook("api_request_error"): + return + ended_at = time.time() + _plugins.invoke_hook( + "api_request_error", + task_id=task_id, + turn_id=turn_id, + api_request_id=api_request_id, + session_id=self.session_id or "", + platform=self.platform or "", + model=self.model, + provider=self.provider, + base_url=self.base_url, + api_mode=self.api_mode, + api_call_count=api_call_count, + api_duration=ended_at - api_start_time, + started_at=api_start_time, + ended_at=ended_at, + status_code=status_code, + retry_count=retry_count, + max_retries=max_retries, + retryable=retryable, + reason=reason, + error={ + "type": error_type, + "message": error_message, + }, + request=self._api_request_payload_for_hook(api_kwargs), + ) + except Exception: + pass + def _dump_api_request_debug( self, api_kwargs: Dict[str, Any], diff --git a/tests/cli/test_session_boundary_hooks.py b/tests/cli/test_session_boundary_hooks.py index 3fcab991e..52c64c01c 100644 --- a/tests/cli/test_session_boundary_hooks.py +++ b/tests/cli/test_session_boundary_hooks.py @@ -1,4 +1,5 @@ from unittest.mock import MagicMock, patch +from types import SimpleNamespace from hermes_cli.plugins import VALID_HOOKS, PluginManager from cli import HermesCLI @@ -20,12 +21,18 @@ def test_session_finalize_on_reset(mock_invoke_hook): cli.new_session(silent=True) # Check if on_session_finalize was called for the old session - mock_invoke_hook.assert_any_call( - "on_session_finalize", session_id="test-session-id", platform="cli" + assert any( + c.args == ("on_session_finalize",) + and c.kwargs["session_id"] == "test-session-id" + and c.kwargs["platform"] == "cli" + for c in mock_invoke_hook.call_args_list ) # Check if on_session_reset was called for the new session - mock_invoke_hook.assert_any_call( - "on_session_reset", session_id=cli.session_id, platform="cli" + assert any( + c.args == ("on_session_reset",) + and c.kwargs["session_id"] == cli.session_id + and c.kwargs["platform"] == "cli" + for c in mock_invoke_hook.call_args_list ) @@ -41,11 +48,45 @@ def test_session_finalize_on_cleanup(mock_invoke_hook): cli_mod._run_cleanup() - mock_invoke_hook.assert_any_call( - "on_session_finalize", session_id="cleanup-session-id", platform="cli" + assert any( + c.args == ("on_session_finalize",) + and c.kwargs["session_id"] == "cleanup-session-id" + and c.kwargs["platform"] == "cli" + and c.kwargs["reason"] == "shutdown" + for c in mock_invoke_hook.call_args_list ) +@patch("hermes_cli.plugins.invoke_hook") +def test_interrupted_session_end_helper_emits_observer_shape(mock_invoke_hook): + """Verify quiet single-query interruption emits a correlated session end.""" + import cli as cli_mod + + mock_agent = MagicMock() + mock_agent.session_id = "agent-session-id" + mock_agent.model = "test-model" + mock_agent.platform = "cli" + mock_agent._current_task_id = "task-1" + mock_agent._current_turn_id = "turn-1" + mock_agent._current_api_request_id = "api-1" + cli = SimpleNamespace(agent=mock_agent, session_id="cli-session-id") + + cli_mod._emit_interrupted_session_end(cli, reason="keyboard_interrupt") + + mock_agent.interrupt.assert_called_once_with("keyboard interrupt") + assert cli.session_id == "agent-session-id" + mock_invoke_hook.assert_called_once() + call = mock_invoke_hook.call_args + assert call.args == ("on_session_end",) + assert call.kwargs["session_id"] == "agent-session-id" + assert call.kwargs["task_id"] == "task-1" + assert call.kwargs["turn_id"] == "turn-1" + assert call.kwargs["api_request_id"] == "api-1" + assert call.kwargs["completed"] is False + assert call.kwargs["interrupted"] is True + assert call.kwargs["reason"] == "keyboard_interrupt" + + @patch("hermes_cli.plugins.invoke_hook") def test_hook_errors_are_caught(mock_invoke_hook): """Verify hook exceptions are caught and don't crash the agent.""" diff --git a/tests/gateway/test_session_boundary_hooks.py b/tests/gateway/test_session_boundary_hooks.py index 305845133..9831e636c 100644 --- a/tests/gateway/test_session_boundary_hooks.py +++ b/tests/gateway/test_session_boundary_hooks.py @@ -81,8 +81,13 @@ async def test_reset_fires_finalize_hook(mock_invoke_hook): await runner._handle_reset_command(_make_event("/new")) - mock_invoke_hook.assert_any_call( - "on_session_finalize", session_id="sess-old", platform="telegram" + assert any( + c.args == ("on_session_finalize",) + and c.kwargs["session_id"] == "sess-old" + and c.kwargs["platform"] == "telegram" + and c.kwargs["old_session_id"] == "sess-old" + and c.kwargs["new_session_id"] == "sess-new" + for c in mock_invoke_hook.call_args_list ) @@ -94,8 +99,13 @@ async def test_reset_fires_reset_hook(mock_invoke_hook): await runner._handle_reset_command(_make_event("/new")) - mock_invoke_hook.assert_any_call( - "on_session_reset", session_id="sess-new", platform="telegram" + assert any( + c.args == ("on_session_reset",) + and c.kwargs["session_id"] == "sess-new" + and c.kwargs["platform"] == "telegram" + and c.kwargs["old_session_id"] == "sess-old" + and c.kwargs["new_session_id"] == "sess-new" + for c in mock_invoke_hook.call_args_list ) diff --git a/tests/hermes_cli/test_plugins.py b/tests/hermes_cli/test_plugins.py index b78e8b292..baf7f92fc 100644 --- a/tests/hermes_cli/test_plugins.py +++ b/tests/hermes_cli/test_plugins.py @@ -323,6 +323,8 @@ class TestPluginHooks: def test_valid_hooks_include_request_scoped_api_hooks(self): assert "pre_api_request" in VALID_HOOKS assert "post_api_request" in VALID_HOOKS + assert "api_request_error" in VALID_HOOKS + assert "subagent_start" in VALID_HOOKS assert "transform_terminal_output" in VALID_HOOKS assert "transform_tool_result" in VALID_HOOKS assert "transform_llm_output" in VALID_HOOKS @@ -369,6 +371,26 @@ class TestPluginHooks: # Should not raise mgr.invoke_hook("pre_tool_call", tool_name="test", args={}, task_id="t1") + def test_invoke_hook_adds_observer_schema_version(self, tmp_path, monkeypatch): + """invoke_hook() supplies the observer schema version for all hooks.""" + plugins_dir = tmp_path / "hermes_test" / "plugins" + _make_plugin_dir( + plugins_dir, + "schema_plugin", + register_body=( + 'ctx.register_hook("pre_tool_call", ' + 'lambda **kw: kw.get("telemetry_schema_version"))' + ), + ) + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test")) + + mgr = PluginManager() + mgr.discover_and_load() + + assert mgr.invoke_hook("pre_tool_call", tool_name="test", args={}) == [ + "hermes.observer.v1" + ] + def test_hook_exception_does_not_propagate(self, tmp_path, monkeypatch): """A hook callback that raises does NOT crash the caller.""" plugins_dir = tmp_path / "hermes_test" / "plugins" @@ -435,6 +457,8 @@ class TestPluginHooks: mgr = PluginManager() mgr.discover_and_load() + assert mgr.has_hook("pre_api_request") is True + assert mgr.has_hook("post_api_request") is False results = mgr.invoke_hook( "pre_api_request", session_id="s1", @@ -488,7 +512,6 @@ class TestPluginHooks: assert any("on_banana" in record.message for record in caplog.records) - class TestPreToolCallBlocking: """Tests for the pre_tool_call block directive helper.""" diff --git a/tests/plugins/test_nemo_relay_plugin.py b/tests/plugins/test_nemo_relay_plugin.py new file mode 100644 index 000000000..7c18493fd --- /dev/null +++ b/tests/plugins/test_nemo_relay_plugin.py @@ -0,0 +1,444 @@ +"""Tests for the bundled observability/nemo_relay plugin.""" + +from __future__ import annotations + +import builtins +import importlib +import json +import sys +from pathlib import Path +from types import SimpleNamespace + +import yaml + +from hermes_cli.plugins import PluginManager + + +REPO_ROOT = Path(__file__).resolve().parents[2] +PLUGIN_DIR = REPO_ROOT / "plugins" / "observability" / "nemo_relay" + + +class _FakeNemoRelay: + def __init__(self): + self.events = [] + self.ScopeType = SimpleNamespace(Agent="agent") + self.scope = SimpleNamespace( + push=self._scope_push, + pop=self._scope_pop, + event=self._scope_event, + ) + self.llm = SimpleNamespace(call=self._llm_call, call_end=self._llm_call_end) + self.tools = SimpleNamespace(call=self._tool_call, call_end=self._tool_call_end) + self.plugin = SimpleNamespace(initialize=self._plugin_initialize) + self.LLMRequest = _FakeLLMRequest + self.AtofExporterConfig = _FakeAtofExporterConfig + self.AtofExporterMode = SimpleNamespace(Append="append", Overwrite="overwrite") + self.AtofExporter = self._make_atof_exporter + self.AtifExporter = self._make_atif_exporter + + def _scope_push(self, name, scope_type, **kwargs): + handle = ("scope", name) + self.events.append(("scope.push", name, scope_type, kwargs)) + return handle + + def _scope_pop(self, handle, **kwargs): + self.events.append(("scope.pop", handle, kwargs)) + + def _scope_event(self, name, **kwargs): + self.events.append(("scope.event", name, kwargs)) + + def _llm_call(self, name, request, **kwargs): + handle = ("llm", name) + self.events.append(("llm.call", name, request.content, kwargs)) + return handle + + def _llm_call_end(self, handle, response, **kwargs): + self.events.append(("llm.call_end", handle, response, kwargs)) + + def _tool_call(self, name, args, **kwargs): + handle = ("tool", name) + self.events.append(("tool.call", name, args, kwargs)) + return handle + + def _tool_call_end(self, handle, result, **kwargs): + self.events.append(("tool.call_end", handle, result, kwargs)) + + def _make_atof_exporter(self, config): + return _FakeAtofExporter(self.events, config) + + def _make_atif_exporter(self, session_id, agent_name, agent_version, **kwargs): + return _FakeAtifExporter(self.events, session_id, agent_name, agent_version, kwargs) + + async def _plugin_initialize(self, config): + self.events.append(("plugin.initialize", config)) + return {"diagnostics": []} + + +class _FakeLLMRequest: + def __init__(self, headers, content): + self.headers = headers + self.content = content + + +class _FakeAtofExporterConfig: + def __init__(self): + self.output_directory = "" + self.filename = "events.jsonl" + self.mode = "append" + + +class _FakeAtofExporter: + def __init__(self, events, config): + self.events = events + self.config = config + + def register(self, name): + self.events.append(("atof.register", name, self.config.output_directory, self.config.filename)) + + +class _FakeAtifExporter: + def __init__(self, events, session_id, agent_name, agent_version, kwargs): + self.events = events + self.session_id = session_id + self.agent_name = agent_name + self.agent_version = agent_version + self.kwargs = kwargs + + def register(self, name): + self.events.append(("atif.register", name, self.session_id)) + + def deregister(self, name): + self.events.append(("atif.deregister", name, self.session_id)) + return True + + def export_json(self): + return json.dumps({"session_id": self.session_id, "agent_name": self.agent_name}) + + +def _fresh_plugin(monkeypatch, fake): + monkeypatch.setitem(sys.modules, "nemo_relay", fake) + sys.modules.pop("plugins.observability.nemo_relay", None) + plugin = importlib.import_module("plugins.observability.nemo_relay") + plugin.reset_for_tests() + return plugin + + +def test_manifest_fields(): + data = yaml.safe_load((PLUGIN_DIR / "plugin.yaml").read_text()) + assert data["name"] == "nemo_relay" + assert set(data["hooks"]) == { + "on_session_start", + "on_session_end", + "on_session_finalize", + "on_session_reset", + "pre_llm_call", + "post_llm_call", + "pre_api_request", + "post_api_request", + "api_request_error", + "pre_tool_call", + "post_tool_call", + "pre_approval_request", + "post_approval_response", + "subagent_start", + "subagent_stop", + } + + +def test_nemo_relay_plugin_is_discoverable_as_bundled_plugin(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test")) + + manager = PluginManager() + manager.discover_and_load() + + loaded = manager._plugins["observability/nemo_relay"] + assert loaded.manifest.name == "nemo_relay" + assert loaded.manifest.source == "bundled" + assert not loaded.enabled + + +def test_nemo_relay_plugin_uses_nemo_relay_runtime(monkeypatch): + fake_relay = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake_relay) + + plugin.on_session_start(session_id="s1") + + assert any(event[0] == "scope.push" for event in fake_relay.events) + + +def test_nemo_relay_plugin_emits_llm_tool_and_exports_atif(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATOF_OUTPUT_DIRECTORY", str(tmp_path / "atof")) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "atif")) + + base = { + "session_id": "s1", + "task_id": "t1", + "turn_id": "turn-1", + "telemetry_schema_version": "hermes.observer.v1", + } + plugin.on_session_start(**base, model="demo-model", platform="cli") + plugin.on_pre_api_request( + **base, + api_request_id="api-1", + provider="openai", + model="demo-model", + request={"method": "POST", "body": {"messages": [{"role": "user", "content": "hi"}]}}, + ) + plugin.on_post_api_request( + **base, + api_request_id="api-1", + response={"assistant_message": {"role": "assistant", "content": "hello"}}, + ) + plugin.on_pre_tool_call(**base, tool_name="read_file", tool_call_id="tool-1", args={"path": "x"}) + plugin.on_post_tool_call(**base, tool_name="read_file", tool_call_id="tool-1", result='{"ok": true}', status="ok") + plugin.on_session_end(**base, completed=True, interrupted=False) + plugin.on_session_finalize(**base, reason="shutdown") + + event_names = [event[0] for event in fake.events] + assert "atof.register" in event_names + assert "atif.register" in event_names + assert "llm.call" in event_names + assert "llm.call_end" in event_names + assert "tool.call" in event_names + assert "tool.call_end" in event_names + assert "scope.pop" in event_names + assert (tmp_path / "atif" / "hermes-atif-s1.json").exists() + + +def test_nemo_relay_plugin_closes_api_span_on_error(monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + base = { + "session_id": "s1", + "task_id": "t1", + "turn_id": "turn-1", + "telemetry_schema_version": "hermes.observer.v1", + } + + plugin.on_pre_api_request( + **base, + api_request_id="api-err", + provider="openai", + model="demo-model", + request={"body": {"messages": [{"role": "user", "content": "hi"}]}}, + ) + plugin.on_api_request_error( + **base, + api_request_id="api-err", + error={"type": "RateLimitError", "message": "rate limited"}, + retryable=True, + reason="rate_limit", + ) + + call_end = next(event for event in fake.events if event[0] == "llm.call_end") + assert call_end[1] == ("llm", "openai") + assert call_end[2] == {"error": {"type": "RateLimitError", "message": "rate limited"}} + assert call_end[3]["data"]["reason"] == "rate_limit" + assert not plugin._get_runtime().sessions["s1"].llm_spans + + +def test_nemo_relay_plugin_emits_approval_marks(monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + + plugin.on_pre_approval_request(session_id="s1", approval_id="approval-1", tool_name="shell") + plugin.on_post_approval_response(session_id="s1", approval_id="approval-1", approved=True) + + mark_names = [event[1] for event in fake.events if event[0] == "scope.event"] + assert "hermes.approval.request" in mark_names + assert "hermes.approval.response" in mark_names + + +def test_nemo_relay_plugin_emits_unmatched_fallback_marks(monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + + plugin.on_post_api_request(session_id="s1", api_request_id="missing-api", response={"ok": True}) + plugin.on_api_request_error( + session_id="s1", + api_request_id="missing-api", + error={"type": "TimeoutError", "message": "timed out"}, + ) + plugin.on_post_tool_call(session_id="s1", tool_call_id="missing-tool", result={"ok": True}) + + mark_names = [event[1] for event in fake.events if event[0] == "scope.event"] + assert "hermes.api.response.unmatched" in mark_names + assert "hermes.api.error" in mark_names + assert "hermes.tool.response.unmatched" in mark_names + + +def test_nemo_relay_plugin_metadata_promotes_trajectory_and_subagent_ids(monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + + plugin.on_pre_llm_call( + session_id="parent-session", + task_id="task-1", + turn_id="turn-1", + telemetry_schema_version="hermes.observer.v1", + ) + plugin.on_subagent_start( + parent_session_id="parent-session", + parent_turn_id="turn-1", + parent_subagent_id="parent-sa", + child_session_id="child-session", + child_subagent_id="child-sa", + child_role="leaf", + telemetry_schema_version="hermes.observer.v1", + ) + plugin.on_subagent_stop( + parent_session_id="parent-session", + parent_turn_id="turn-1", + child_session_id="child-session", + child_role="leaf", + child_status="completed", + telemetry_schema_version="hermes.observer.v1", + ) + + turn_mark = next(event for event in fake.events if event[0] == "scope.event" and event[1] == "hermes.turn.start") + turn_metadata = turn_mark[2]["metadata"] + assert turn_metadata["session_id"] == "parent-session" + assert turn_metadata["trajectory_id"] == "parent-session" + + start_mark = next(event for event in fake.events if event[0] == "scope.event" and event[1] == "hermes.subagent.start") + start_metadata = start_mark[2]["metadata"] + assert start_metadata["parent_session_id"] == "parent-session" + assert start_metadata["parent_trajectory_id"] == "parent-session" + assert start_metadata["child_session_id"] == "child-session" + assert start_metadata["child_trajectory_id"] == "child-session" + assert start_metadata["child_subagent_id"] == "child-sa" + assert start_metadata["child_role"] == "leaf" + + stop_mark = next(event for event in fake.events if event[0] == "scope.event" and event[1] == "hermes.subagent.stop") + assert stop_mark[2]["metadata"]["child_status"] == "completed" + + +def test_nemo_relay_plugin_reparents_child_session_scope_for_embedded_atif(monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + + plugin.on_session_start(session_id="parent-session") + plugin.on_subagent_start( + parent_session_id="parent-session", + parent_turn_id="turn-1", + child_session_id="child-session", + child_subagent_id="child-sa", + child_role="leaf", + telemetry_schema_version="hermes.observer.v1", + ) + plugin.on_session_start(session_id="child-session") + + child_push = next( + event + for event in fake.events + if event[0] == "scope.push" and event[1] == "hermes-session-child-session" + ) + child_kwargs = child_push[3] + assert child_kwargs["handle"] == ("scope", "hermes-session-parent-session") + assert child_kwargs["metadata"]["session_id"] == "child-session" + assert child_kwargs["metadata"]["trajectory_id"] == "child-session" + assert child_kwargs["metadata"]["nemo_relay_scope_role"] == "subagent" + assert child_kwargs["metadata"]["subagent_id"] == "child-sa" + assert child_kwargs["metadata"]["parent_session_id"] == "parent-session" + + +def test_nemo_relay_plugin_skips_embedded_child_atif_file_by_default(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "atif")) + + plugin.on_session_start(session_id="parent-session") + plugin.on_subagent_start( + parent_session_id="parent-session", + child_session_id="child-session", + child_subagent_id="child-sa", + ) + plugin.on_session_start(session_id="child-session") + plugin.on_session_end(session_id="child-session") + plugin.on_session_finalize(session_id="child-session") + plugin.on_session_end(session_id="parent-session") + plugin.on_session_finalize(session_id="parent-session") + + assert (tmp_path / "atif" / "hermes-atif-parent-session.json").exists() + assert not (tmp_path / "atif" / "hermes-atif-child-session.json").exists() + + +def test_nemo_relay_plugin_can_write_embedded_child_atif_file_in_all_mode(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_ENABLED", "1") + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_OUTPUT_DIRECTORY", str(tmp_path / "atif")) + monkeypatch.setenv("HERMES_NEMO_RELAY_ATIF_SUBAGENT_EXPORT_MODE", "all") + + plugin.on_session_start(session_id="parent-session") + plugin.on_subagent_start( + parent_session_id="parent-session", + child_session_id="child-session", + child_subagent_id="child-sa", + ) + plugin.on_session_start(session_id="child-session") + plugin.on_session_end(session_id="child-session") + plugin.on_session_finalize(session_id="child-session") + plugin.on_session_end(session_id="parent-session") + plugin.on_session_finalize(session_id="parent-session") + + assert (tmp_path / "atif" / "hermes-atif-parent-session.json").exists() + assert (tmp_path / "atif" / "hermes-atif-child-session.json").exists() + + +def test_nemo_relay_plugin_can_initialize_plugins_toml(tmp_path, monkeypatch): + fake = _FakeNemoRelay() + plugin = _fresh_plugin(monkeypatch, fake) + plugins_toml = tmp_path / "plugins.toml" + atof_dir = tmp_path / "exports" / "events" + atif_dir = tmp_path / "exports" / "trajectories" + plugins_toml.write_text( + f""" +version = 1 + +[[components]] +kind = "observability" +enabled = true + +[components.config.atof] +enabled = true +output_directory = "{atof_dir}" + +[components.config.atif] +enabled = true +output_directory = "{atif_dir}" +""", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_NEMO_RELAY_PLUGINS_TOML", str(plugins_toml)) + + plugin.on_session_start(session_id="s1") + + assert any(event[0] == "plugin.initialize" for event in fake.events) + assert not any(event[0] == "atof.register" for event in fake.events) + assert atof_dir.is_dir() + assert atif_dir.is_dir() + + +def test_nemo_relay_plugin_noops_without_dependency(monkeypatch): + monkeypatch.delitem(sys.modules, "nemo_relay", raising=False) + sys.modules.pop("plugins.observability.nemo_relay", None) + plugin = importlib.import_module("plugins.observability.nemo_relay") + plugin.reset_for_tests() + + real_import = builtins.__import__ + + def blocked_import(name, *args, **kwargs): + if name == "nemo_relay": + raise ModuleNotFoundError(f"No module named {name!r}") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", blocked_import) + + plugin.on_pre_api_request(session_id="s1", api_request_id="api-1") + plugin.on_post_api_request(session_id="s1", api_request_id="api-1") diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 63657a730..9fa45c70c 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -5,6 +5,8 @@ pieces. The OpenAI client and tool loading are mocked so no network calls are made. """ +import ast +import inspect import io import json import logging @@ -2051,6 +2053,39 @@ class TestExecuteToolCalls: assert messages[0]["role"] == "tool" assert "search result" in messages[0]["content"] + def test_keyboard_interrupt_emits_cancelled_post_tool_hook(self, agent, monkeypatch): + tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1") + mock_msg = _mock_assistant_msg(content="", tool_calls=[tc]) + messages = [] + hook_calls = [] + agent.session_id = "session-1" + agent._current_turn_id = "turn-1" + agent._current_api_request_id = "api-1" + + def _capture_hook(hook_name, **kwargs): + hook_calls.append((hook_name, kwargs)) + return [] + + monkeypatch.setattr("hermes_cli.plugins.invoke_hook", _capture_hook) + + with ( + patch("run_agent.handle_function_call", side_effect=KeyboardInterrupt), + patch("run_agent._set_interrupt"), + pytest.raises(KeyboardInterrupt), + ): + agent._execute_tool_calls_sequential(mock_msg, messages, "task-1") + + post_calls = [kwargs for name, kwargs in hook_calls if name == "post_tool_call"] + assert len(post_calls) == 1 + assert post_calls[0]["tool_name"] == "web_search" + assert post_calls[0]["tool_call_id"] == "c1" + assert post_calls[0]["session_id"] == "session-1" + assert post_calls[0]["turn_id"] == "turn-1" + assert post_calls[0]["api_request_id"] == "api-1" + assert post_calls[0]["status"] == "cancelled" + assert post_calls[0]["error_type"] == "keyboard_interrupt" + assert json.loads(post_calls[0]["result"])["status"] == "cancelled" + def test_interrupt_skips_remaining(self, agent): tc1 = _mock_tool_call(name="web_search", arguments="{}", call_id="c1") tc2 = _mock_tool_call(name="web_search", arguments="{}", call_id="c2") @@ -2426,6 +2461,8 @@ class TestConcurrentToolExecution: "web_search", {"q": "test"}, "task-1", tool_call_id=None, session_id=agent.session_id, + turn_id="", + api_request_id="", enabled_tools=list(agent.valid_tool_names), skip_pre_tool_call_hook=True, enabled_toolsets=agent.enabled_toolsets, @@ -2476,6 +2513,30 @@ class TestConcurrentToolExecution: mock_todo.assert_called_once() assert "ok" in result + def test_invoke_tool_agent_level_tool_emits_terminal_post_tool_hook(self, agent, monkeypatch): + """Agent-owned tool paths should close observer tool spans.""" + hook_calls = [] + monkeypatch.setattr( + "hermes_cli.plugins.get_pre_tool_call_block_message", + lambda *args, **kwargs: None, + ) + monkeypatch.setattr( + "hermes_cli.plugins.invoke_hook", + lambda hook_name, **kwargs: hook_calls.append((hook_name, kwargs)) or [], + ) + + with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo: + result = agent._invoke_tool("todo", {"todos": []}, "task-1", tool_call_id="todo-1") + + mock_todo.assert_called_once() + assert result == '{"ok":true}' + post_call = next(call for call in hook_calls if call[0] == "post_tool_call") + assert post_call[1]["tool_name"] == "todo" + assert post_call[1]["tool_call_id"] == "todo-1" + assert post_call[1]["status"] == "ok" + assert post_call[1]["error_type"] is None + assert isinstance(post_call[1]["duration_ms"], int) + def test_invoke_tool_blocked_returns_error_and_skips_execution(self, agent, monkeypatch): """_invoke_tool should return error JSON when a plugin blocks the tool.""" monkeypatch.setattr( @@ -2528,6 +2589,74 @@ class TestConcurrentToolExecution: assert messages[0]["role"] == "tool" assert json.loads(messages[0]["content"]) == {"error": "Blocked by policy"} + def test_sequential_blocked_tool_emits_terminal_post_tool_hook(self, agent, monkeypatch): + """Blocked pre_tool_call decisions still terminate observer tool spans.""" + tool_call = _mock_tool_call(name="write_file", + arguments='{"path":"test.txt","content":"hello"}', + call_id="c1") + mock_msg = _mock_assistant_msg(content="", tool_calls=[tool_call]) + messages = [] + hook_calls = [] + + monkeypatch.setattr( + "hermes_cli.plugins.get_pre_tool_call_block_message", + lambda *args, **kwargs: "Blocked by policy", + ) + monkeypatch.setattr( + "hermes_cli.plugins.invoke_hook", + lambda hook_name, **kwargs: hook_calls.append((hook_name, kwargs)) or [], + ) + + with patch("run_agent.handle_function_call", side_effect=AssertionError("should not run")): + agent._execute_tool_calls_sequential(mock_msg, messages, "task-1") + + post_call = next(call for call in hook_calls if call[0] == "post_tool_call") + assert post_call[1]["tool_name"] == "write_file" + assert post_call[1]["tool_call_id"] == "c1" + assert post_call[1]["status"] == "blocked" + assert post_call[1]["error_type"] == "plugin_block" + assert post_call[1]["error_message"] == "Blocked by policy" + + def test_sequential_agent_level_tool_emits_terminal_post_tool_hook(self, agent, monkeypatch): + """Sequential built-in tool paths should also close observer tool spans.""" + tool_call = _mock_tool_call(name="todo", arguments='{"todos":[]}', call_id="todo-1") + mock_msg = _mock_assistant_msg(content="", tool_calls=[tool_call]) + messages = [] + hook_calls = [] + + monkeypatch.setattr( + "hermes_cli.plugins.get_pre_tool_call_block_message", + lambda *args, **kwargs: None, + ) + monkeypatch.setattr( + "hermes_cli.plugins.invoke_hook", + lambda hook_name, **kwargs: hook_calls.append((hook_name, kwargs)) or [], + ) + + with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo: + agent._execute_tool_calls_sequential(mock_msg, messages, "task-1") + + mock_todo.assert_called_once() + post_call = next(call for call in hook_calls if call[0] == "post_tool_call") + assert post_call[1]["tool_name"] == "todo" + assert post_call[1]["tool_call_id"] == "todo-1" + assert post_call[1]["result"] == '{"ok":true}' + assert post_call[1]["status"] == "ok" + + def test_agent_runtime_post_hook_ownership_predicate_covers_agent_tools(self, agent): + """Sequential and concurrent agent-level paths share post-hook ownership.""" + from agent.agent_runtime_helpers import agent_runtime_owns_post_tool_hook + + for tool_name in ("todo", "session_search", "memory", "clarify", "delegate_task"): + assert agent_runtime_owns_post_tool_hook(agent, tool_name) is True + + agent._context_engine_tool_names = {"context_query"} + assert agent_runtime_owns_post_tool_hook(agent, "context_query") is True + + agent._memory_manager = SimpleNamespace(has_tool=lambda name: name == "memory_extra") + assert agent_runtime_owns_post_tool_hook(agent, "memory_extra") is True + assert agent_runtime_owns_post_tool_hook(agent, "web_search") is False + def test_blocked_memory_tool_does_not_reset_counter(self, agent, monkeypatch): """Blocked memory tool should not reset the nudge counter.""" agent._turns_since_memory = 5 @@ -2660,6 +2789,135 @@ class TestConcurrentToolExecution: cp_mock.assert_called_once() +class TestAgentRuntimePostHookOwnershipSync: + """Pin the inline-dispatch tool list against the post-hook ownership set. + + The post_tool_call hook fires from two places: the inline dispatcher in + agent/tool_executor.py:execute_tool_calls_sequential (for agent-runtime + tools that never reach handle_function_call) and + model_tools.handle_function_call itself (for registry-dispatched tools). + To prevent the executor from silently dropping or double-emitting, + AGENT_RUNTIME_POST_HOOK_TOOL_NAMES has to match exactly the static + `function_name == "..."` branches in the inline dispatch chain. + + The chain is the if/elif tower anchored on `_block_msg is not None`. + Pre-dispatch `function_name == "..."` checks (counter resets, checkpoint + triggers) live outside the dispatch chain and are explicitly skipped. + """ + + _DISPATCH_ANCHOR_LEFT = "_block_msg" + + @classmethod + def _is_dispatch_anchor(cls, test_node) -> bool: + # Looking for `_block_msg is not None`. + if not isinstance(test_node, ast.Compare): + return False + if not (isinstance(test_node.left, ast.Name) and test_node.left.id == cls._DISPATCH_ANCHOR_LEFT): + return False + if not (len(test_node.ops) == 1 and isinstance(test_node.ops[0], ast.IsNot)): + return False + comparator = test_node.comparators[0] + return isinstance(comparator, ast.Constant) and comparator.value is None + + @staticmethod + def _function_name_literal(test_node) -> str | None: + """Return the string literal X for `function_name == "X"`, else None.""" + if not isinstance(test_node, ast.Compare): + return None + if not (isinstance(test_node.left, ast.Name) and test_node.left.id == "function_name"): + return None + if not (len(test_node.ops) == 1 and isinstance(test_node.ops[0], ast.Eq)): + return None + comparator = test_node.comparators[0] + if isinstance(comparator, ast.Constant) and isinstance(comparator.value, str): + return comparator.value + return None + + @classmethod + def _extract_dispatch_chain_names(cls, func) -> set[str]: + """Find the if/elif chain anchored on `_block_msg is not None`, return its + `function_name == "..."` literals.""" + source = inspect.cleandoc("\n" + inspect.getsource(func)) + tree = ast.parse(source) + names: set[str] = set() + for node in ast.walk(tree): + if not isinstance(node, ast.If): + continue + if not cls._is_dispatch_anchor(node.test): + continue + current = node + while current is not None: + literal = cls._function_name_literal(current.test) + if literal is not None: + names.add(literal) + if current.orelse and len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If): + current = current.orelse[0] + else: + current = None + break + return names + + @classmethod + def _extract_invoke_tool_names(cls, func) -> set[str]: + """invoke_tool uses a flat if/elif on function_name directly; walk every + Compare in the function body (no other static `function_name == "..."` + checks live there).""" + source = inspect.cleandoc("\n" + inspect.getsource(func)) + tree = ast.parse(source) + names: set[str] = set() + for node in ast.walk(tree): + literal = cls._function_name_literal(node) + if literal is not None: + names.add(literal) + return names + + def test_frozenset_matches_inline_dispatch_chain(self): + from agent import tool_executor + from agent.agent_runtime_helpers import AGENT_RUNTIME_POST_HOOK_TOOL_NAMES + + inline_names = self._extract_dispatch_chain_names( + tool_executor.execute_tool_calls_sequential + ) + assert inline_names, ( + "Could not find the dispatch chain (anchored on " + "`_block_msg is not None`) in execute_tool_calls_sequential. " + "If the dispatcher was refactored, update _DISPATCH_ANCHOR_LEFT " + "and the walker in this test." + ) + assert inline_names == set(AGENT_RUNTIME_POST_HOOK_TOOL_NAMES), ( + "Inline dispatch chain in " + "agent/tool_executor.py:execute_tool_calls_sequential has drifted " + "from AGENT_RUNTIME_POST_HOOK_TOOL_NAMES in " + "agent/agent_runtime_helpers.py.\n" + f" Inline branches: {sorted(inline_names)}\n" + f" Ownership frozenset: {sorted(AGENT_RUNTIME_POST_HOOK_TOOL_NAMES)}\n" + "Update both together so post_tool_call fires exactly once per " + "tool execution." + ) + + def test_invoke_tool_dispatch_matches_inline_dispatch_chain(self): + """invoke_tool (concurrent path) and the inline dispatcher (sequential + path) must cover the same set of agent-runtime tools — otherwise + post_tool_call fires inconsistently depending on which executor ran + the tool.""" + from agent import agent_runtime_helpers, tool_executor + + invoke_tool_names = self._extract_invoke_tool_names( + agent_runtime_helpers.invoke_tool + ) + inline_names = self._extract_dispatch_chain_names( + tool_executor.execute_tool_calls_sequential + ) + assert invoke_tool_names == inline_names, ( + "Static `function_name == \"...\"` branches diverged between " + "agent/agent_runtime_helpers.py:invoke_tool (concurrent path) " + "and agent/tool_executor.py:execute_tool_calls_sequential " + "(sequential path).\n" + f" invoke_tool: {sorted(invoke_tool_names)}\n" + f" execute_tool_calls_sequential: {sorted(inline_names)}" + ) + + class TestPathsOverlap: """Unit tests for the _paths_overlap helper.""" @@ -3088,6 +3346,10 @@ class TestRunConversation: with ( patch("run_agent.handle_function_call", return_value="search result"), + patch( + "hermes_cli.plugins.has_hook", + side_effect=lambda name: name in {"pre_api_request", "post_api_request"}, + ), patch("hermes_cli.plugins.invoke_hook", side_effect=_record_hook), patch.object(agent, "_persist_session"), patch.object(agent, "_save_trajectory"), @@ -3103,9 +3365,85 @@ class TestRunConversation: assert [call["api_call_count"] for call in pre_request_calls] == [1, 2] assert [call["api_call_count"] for call in post_request_calls] == [1, 2] assert all(call["session_id"] == agent.session_id for call in pre_request_calls) + assert all(call["turn_id"] == pre_request_calls[0]["turn_id"] for call in pre_request_calls + post_request_calls) + assert [call["api_request_id"] for call in pre_request_calls] == [ + call["api_request_id"] for call in post_request_calls + ] assert all("message_count" in c and isinstance(c.get("request_messages"), list) for c in pre_request_calls) + assert all("request" in c and "messages" in c["request"]["body"] for c in pre_request_calls) assert any(msg.get("role") == "user" and msg.get("content") == "search something" for msg in pre_request_calls[0]["request_messages"]) - assert all("usage" in c and "response" in c and "assistant_message" in c for c in post_request_calls) + assert all("usage" in c and "response" in c for c in post_request_calls) + assert all("assistant_message" in c["response"] for c in post_request_calls) + + def test_api_request_error_hook_skips_payload_work_without_listener(self, agent, monkeypatch): + payload_built = False + hook_called = False + + def _payload_for_hook(_api_kwargs): + nonlocal payload_built + payload_built = True + return {} + + def _invoke_hook(_name, **_kwargs): + nonlocal hook_called + hook_called = True + return [] + + monkeypatch.setattr("hermes_cli.plugins.has_hook", lambda name: False) + monkeypatch.setattr("hermes_cli.plugins.invoke_hook", _invoke_hook) + monkeypatch.setattr(agent, "_api_request_payload_for_hook", _payload_for_hook) + + agent._invoke_api_request_error_hook( + task_id="task-1", + turn_id="turn-1", + api_request_id="api-1", + api_call_count=1, + api_start_time=0.0, + api_kwargs={"messages": [{"role": "user", "content": "hi"}]}, + error_type="RuntimeError", + error_message="boom", + ) + + assert payload_built is False + assert hook_called is False + + def test_request_scoped_api_hooks_skip_payload_work_without_listeners(self, agent, monkeypatch): + self._setup_agent(agent) + agent.client.chat.completions.create.return_value = _mock_response( + content="No listeners", + finish_reason="stop", + ) + hook_checks = {"pre_api_request": 0, "post_api_request": 0} + payload_counts = {"request": 0, "response": 0} + + def _has_hook(name): + if name in hook_checks: + hook_checks[name] += 1 + return False + + def _request_payload(_api_kwargs): + payload_counts["request"] += 1 + return {} + + def _response_payload(_response, _assistant_message, *, finish_reason): + payload_counts["response"] += 1 + return {} + + monkeypatch.setattr("hermes_cli.plugins.has_hook", _has_hook) + monkeypatch.setattr(agent, "_api_request_payload_for_hook", _request_payload) + monkeypatch.setattr(agent, "_api_response_payload_for_hook", _response_payload) + + with ( + patch("hermes_cli.plugins.invoke_hook", return_value=[]), + patch.object(agent, "_persist_session"), + patch.object(agent, "_save_trajectory"), + patch.object(agent, "_cleanup_task_resources"), + ): + result = agent.run_conversation("hello") + + assert result["final_response"] == "No listeners" + assert hook_checks == {"pre_api_request": 1, "post_api_request": 1} + assert payload_counts == {"request": 0, "response": 0} def test_content_with_tool_calls_stays_silent_for_non_cli_quiet_mode(self, agent): self._setup_agent(agent) diff --git a/tests/test_model_tools.py b/tests/test_model_tools.py index cb8f9f7a9..633f82c1d 100644 --- a/tests/test_model_tools.py +++ b/tests/test_model_tools.py @@ -61,6 +61,8 @@ class TestHandleFunctionCall: task_id="task-1", session_id="session-1", tool_call_id="call-1", + turn_id="", + api_request_id="", ), call( "post_tool_call", @@ -70,7 +72,12 @@ class TestHandleFunctionCall: task_id="task-1", session_id="session-1", tool_call_id="call-1", + turn_id="", + api_request_id="", duration_ms=ANY, + status="ok", + error_type=None, + error_message=None, ), call( "transform_tool_result", @@ -80,7 +87,12 @@ class TestHandleFunctionCall: task_id="task-1", session_id="session-1", tool_call_id="call-1", + turn_id="", + api_request_id="", duration_ms=ANY, + status="ok", + error_type=None, + error_message=None, ), ] @@ -136,7 +148,10 @@ class TestPreToolCallBlocking: """Verify that pre_tool_call hooks can block tool execution.""" def test_blocked_tool_returns_error_and_skips_dispatch(self, monkeypatch): + hook_calls = [] + def fake_invoke_hook(hook_name, **kwargs): + hook_calls.append((hook_name, kwargs)) if hook_name == "pre_tool_call": return [{"action": "block", "message": "Blocked by policy"}] return [] @@ -155,6 +170,11 @@ class TestPreToolCallBlocking: result = json.loads(handle_function_call("read_file", {"path": "test.txt"}, task_id="t1")) assert result == {"error": "Blocked by policy"} assert not dispatch_called + post_call = next(call for call in hook_calls if call[0] == "post_tool_call") + assert post_call[1]["status"] == "blocked" + assert post_call[1]["error_type"] == "plugin_block" + assert post_call[1]["error_message"] == "Blocked by policy" + assert post_call[1]["duration_ms"] == 0 def test_blocked_tool_skips_read_loop_notification(self, monkeypatch): notifications = [] diff --git a/tests/test_project_metadata.py b/tests/test_project_metadata.py index 51ca9cf91..29f32c6bb 100644 --- a/tests/test_project_metadata.py +++ b/tests/test_project_metadata.py @@ -113,6 +113,16 @@ def test_feishu_extra_includes_qrcode_for_qr_login(): assert any(dep.startswith("qrcode") for dep in feishu_extra) +def test_nemo_relay_extra_uses_official_0_3_distribution(): + optional_dependencies = _load_optional_dependencies() + + assert optional_dependencies["nemo-relay"] == ["nemo-relay==0.3"] + assert not any( + spec == "hermes-agent[nemo-relay]" + for spec in optional_dependencies["all"] + ) + + def test_dashboard_plugin_manifests_and_assets_are_packaged(): """Bundled dashboard plugins need their manifests and built assets in wheel installs so /api/dashboard/plugins can discover them outside a @@ -123,3 +133,13 @@ def test_dashboard_plugin_manifests_and_assets_are_packaged(): assert "*/dashboard/manifest.json" in plugin_data assert "*/dashboard/dist/*" in plugin_data assert "*/dashboard/dist/**/*" in plugin_data + + +def test_nested_bundled_plugin_metadata_is_packaged(): + """Nested opt-in plugins need manifests and READMEs in wheel installs.""" + package_data = _load_package_data() + plugin_data = package_data["plugins"] + + assert "**/plugin.yaml" in plugin_data + assert "**/plugin.yml" in plugin_data + assert "**/README.md" in plugin_data diff --git a/tests/tools/test_file_tools_cwd_resolution.py b/tests/tools/test_file_tools_cwd_resolution.py index 6bb7c1bf3..03620f486 100644 --- a/tests/tools/test_file_tools_cwd_resolution.py +++ b/tests/tools/test_file_tools_cwd_resolution.py @@ -194,4 +194,3 @@ def test_patch_reports_resolved_absolute_path(_isolated_cwd, monkeypatch): assert "WORKSPACE_PATCHED" in (workspace / "target.py").read_text() # And the decoy copy is untouched. assert (decoy / "target.py").read_text() == "DECOY_ORIGINAL\n" - diff --git a/tests/tools/test_tool_search.py b/tests/tools/test_tool_search.py index 9c8c8a33c..aa86e8ba1 100644 --- a/tests/tools/test_tool_search.py +++ b/tests/tools/test_tool_search.py @@ -535,4 +535,3 @@ class TestRegression_ToolsetScoping: assert "mcp_helper_op" in names # core tools are never deferrable assert "terminal" not in names - diff --git a/tools/approval.py b/tools/approval.py index 47f4a5f44..f853b6b57 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -36,6 +36,14 @@ _approval_session_key: contextvars.ContextVar[str] = contextvars.ContextVar( "approval_session_key", default="", ) +_approval_turn_id: contextvars.ContextVar[str] = contextvars.ContextVar( + "approval_turn_id", + default="", +) +_approval_tool_call_id: contextvars.ContextVar[str] = contextvars.ContextVar( + "approval_tool_call_id", + default="", +) def _fire_approval_hook(hook_name: str, **kwargs) -> None: @@ -55,6 +63,8 @@ def _fire_approval_hook(hook_name: str, **kwargs) -> None: # (e.g. bare tool-only imports, minimal test environments). return try: + kwargs.setdefault("turn_id", _approval_turn_id.get()) + kwargs.setdefault("tool_call_id", _approval_tool_call_id.get()) invoke_hook(hook_name, **kwargs) except Exception as exc: # invoke_hook() already swallows per-callback errors, so reaching here @@ -74,6 +84,27 @@ def reset_current_session_key(token: contextvars.Token[str]) -> None: _approval_session_key.reset(token) +def set_current_observability_context( + *, + turn_id: str = "", + tool_call_id: str = "", +) -> tuple[contextvars.Token[str], contextvars.Token[str]]: + """Bind active tool correlation IDs to approval hooks.""" + return ( + _approval_turn_id.set(turn_id or ""), + _approval_tool_call_id.set(tool_call_id or ""), + ) + + +def reset_current_observability_context( + tokens: tuple[contextvars.Token[str], contextvars.Token[str]], +) -> None: + """Restore prior approval hook correlation IDs.""" + turn_token, tool_token = tokens + _approval_tool_call_id.reset(tool_token) + _approval_turn_id.reset(turn_token) + + def get_current_session_key(default: str = "default") -> str: """Return the active session key, preferring context-local state. diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index 86dcd0715..d696cab41 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -1146,6 +1146,7 @@ def _build_child_agent( child._subagent_id = subagent_id child._parent_subagent_id = parent_subagent_id child._subagent_goal = goal + child._parent_turn_id = getattr(parent_agent, "_current_turn_id", "") or "" # Share a credential pool with the child when possible so subagents can # rotate credentials on rate limits instead of getting pinned to one key. @@ -1171,6 +1172,21 @@ def _build_child_agent( except Exception as exc: logger.debug("spawn_requested relay failed: %s", exc) + try: + from hermes_cli.plugins import invoke_hook as _invoke_hook + _invoke_hook( + "subagent_start", + parent_session_id=getattr(parent_agent, "session_id", None), + parent_turn_id=getattr(parent_agent, "_current_turn_id", "") or "", + parent_subagent_id=parent_subagent_id, + child_session_id=getattr(child, "session_id", None), + child_subagent_id=subagent_id, + child_role=effective_role, + child_goal=goal, + ) + except Exception: + logger.debug("subagent_start hook invocation failed", exc_info=True) + return child @@ -2265,9 +2281,17 @@ def delegate_task( if _invoke_hook is None: continue try: + _child_index = entry.get("task_index", -1) + _child_agent = ( + children[_child_index][2] + if isinstance(_child_index, int) and 0 <= _child_index < len(children) + else None + ) _invoke_hook( "subagent_stop", parent_session_id=_parent_session_id, + parent_turn_id=getattr(parent_agent, "_current_turn_id", "") or "", + child_session_id=getattr(_child_agent, "session_id", None), child_role=child_role, child_summary=entry.get("summary"), child_status=entry.get("status"), diff --git a/uv.lock b/uv.lock index 996de0ffb..5a70bb3aa 100644 --- a/uv.lock +++ b/uv.lock @@ -1739,6 +1739,9 @@ mistral = [ modal = [ { name = "modal" }, ] +nemo-relay = [ + { name = "nemo-relay" }, +] parallel-web = [ { name = "parallel-web" }, ] @@ -1859,6 +1862,7 @@ requires-dist = [ { name = "mcp", marker = "extra == 'mcp'", specifier = "==1.26.0" }, { name = "mistralai", marker = "extra == 'mistral'", specifier = "==2.4.8" }, { name = "modal", marker = "extra == 'modal'", specifier = "==1.3.4" }, + { name = "nemo-relay", marker = "extra == 'nemo-relay'", specifier = "==0.3" }, { name = "numpy", marker = "extra == 'voice'", specifier = "==2.4.3" }, { name = "openai", specifier = "==2.24.0" }, { name = "parallel-web", marker = "extra == 'parallel-web'", specifier = "==0.4.2" }, @@ -1901,7 +1905,7 @@ requires-dist = [ { name = "uvicorn", extras = ["standard"], marker = "extra == 'web'", specifier = "==0.41.0" }, { name = "youtube-transcript-api", marker = "extra == 'youtube'", specifier = "==1.2.4" }, ] -provides-extras = ["anthropic", "exa", "firecrawl", "parallel-web", "fal", "edge-tts", "modal", "daytona", "hindsight", "dev", "messaging", "cron", "slack", "matrix", "wecom", "cli", "tts-premium", "voice", "pty", "honcho", "mcp", "homeassistant", "sms", "computer-use", "acp", "mistral", "bedrock", "azure-identity", "termux", "termux-all", "dingtalk", "feishu", "google", "youtube", "web", "all"] +provides-extras = ["anthropic", "exa", "firecrawl", "parallel-web", "fal", "edge-tts", "modal", "daytona", "hindsight", "dev", "messaging", "cron", "slack", "matrix", "wecom", "cli", "tts-premium", "voice", "pty", "honcho", "mcp", "nemo-relay", "homeassistant", "sms", "computer-use", "acp", "mistral", "bedrock", "azure-identity", "termux", "termux-all", "dingtalk", "feishu", "google", "youtube", "web", "all"] [[package]] name = "hf-xet" @@ -2691,6 +2695,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/08/7036c080d7117f28a4af526d794aab6a84463126db031b007717c1a6676e/multidict-6.7.1-py3-none-any.whl", hash = "sha256:55d97cc6dae627efa6a6e548885712d4864b81110ac76fa4e534c03819fa4a56", size = 12319, upload-time = "2026-01-26T02:46:44.004Z" }, ] +[[package]] +name = "nemo-relay" +version = "0.3.0" +source = { registry = "https://pypi.org/simple" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/15/55/3b65643db2df02fb3838b3d251442e05b7306cbbc77e69884ae78743546f/nemo_relay-0.3.0-cp311-abi3-macosx_11_0_arm64.whl", hash = "sha256:a6e44abe38bf6dda8f6a8b149cd9069a548186f24263ae8469d5a37cefc95209", size = 5282297, upload-time = "2026-05-29T22:32:36.315Z" }, + { url = "https://files.pythonhosted.org/packages/c7/ef/e2f4bc02f99f38706fdde0cdda6b6d8fddea9e6975da1b66377c35958b85/nemo_relay-0.3.0-cp311-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:99ba247121cd0d17b9c5e2a95beb42512243430773b93435848b8b4b9f9ca61f", size = 4722431, upload-time = "2026-05-29T22:32:38.088Z" }, + { url = "https://files.pythonhosted.org/packages/12/18/bedb5d57f206e2859cef11f12f568974a529acf382436523a37e095b2cc7/nemo_relay-0.3.0-cp311-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:20bb1e4ed87f179befc4db3af2e35f08be711378b322f3672222d80294f57be7", size = 5002734, upload-time = "2026-05-29T22:32:40.191Z" }, + { url = "https://files.pythonhosted.org/packages/26/ec/4b2e758a0a25397f2e97be889a11b7787d108658e0b60ddff5048b25adb8/nemo_relay-0.3.0-cp311-abi3-win_amd64.whl", hash = "sha256:e659c2772b35c0ea4897a91f6bf86b551ed403f6f41d0b60fd8151f5c78b83d0", size = 4947785, upload-time = "2026-05-29T22:32:41.784Z" }, + { url = "https://files.pythonhosted.org/packages/2b/91/45a3397559679d846ae023a82527c43b14631786b3a7c95e69c05e8bb553/nemo_relay-0.3.0-cp311-abi3-win_arm64.whl", hash = "sha256:9655514adb518e19caf7b3f6a08bbe82c1b24759c0a8021c3df949fd265bcef6", size = 4662147, upload-time = "2026-05-29T22:32:43.554Z" }, +] + [[package]] name = "nest-asyncio" version = "1.6.0" diff --git a/website/docs/user-guide/features/hooks.md b/website/docs/user-guide/features/hooks.md index 40eff4895..eeba34685 100644 --- a/website/docs/user-guide/features/hooks.md +++ b/website/docs/user-guide/features/hooks.md @@ -353,6 +353,9 @@ Gateway hooks only fire in the **gateway** (Telegram, Discord, Slack, WhatsApp, [Plugins](/user-guide/features/plugins) can register hooks that fire in **both CLI and gateway** sessions. These are registered programmatically via `ctx.register_hook()` in your plugin's `register()` function. +For plugin packaging and registration details, see +the [Plugins guide](/docs/user-guide/features/plugins). + ```python def register(ctx): ctx.register_hook("pre_tool_call", my_tool_observer) @@ -368,6 +371,7 @@ def register(ctx): - Callbacks receive **keyword arguments**. Always accept `**kwargs` for forward compatibility — new parameters may be added in future versions without breaking your plugin. - If a callback **crashes**, it's logged and skipped. Other hooks and the agent continue normally. A misbehaving plugin can never break the agent. - Two hooks' return values affect behavior: [`pre_tool_call`](#pre_tool_call) can **block** the tool, and [`pre_llm_call`](#pre_llm_call) can **inject context** into the LLM call. All other hooks are fire-and-forget observers. +- Observer callbacks receive `telemetry_schema_version` automatically. When present, `turn_id`, `api_request_id`, `task_id`, `session_id`, and `api_call_count` are separate correlation fields. Treat `api_request_id` as an opaque identifier; do not parse its string format. ### Quick reference