fix(kanban): close three blocked/iteration-exhausted handling gaps (#29747)
Reporter diagnosed three independent gaps that together allowed infinite 'unblock → re-stuck' loops with no surfacing or escalation: GAP 1: `_rule_stuck_in_blocked` resets its timer on any `commented`/`unblocked` event, so a task that cycles every few minutes is invisible to it regardless of how many times it cycles. Fix: new `_rule_block_unblock_cycling` rule (`hermes_cli/kanban_diagnostics.py`) that counts block→unblock cycles in a sliding window. Default threshold 3 cycles within 24h, configurable via `block_cycle_threshold` / `block_cycle_window_seconds`. Walks events in arrival order (event id) since multiple events can share the same `created_at` second. Fires as a warning with a CLI hint to inspect the block reasons. GAP 2: Iteration-budget-exhausted runs in kanban workers map to `kanban_block` (status=blocked, but a clean exit from the kernel's perspective). `_rule_repeated_failures` reads `consecutive_failures`, which `_record_task_failure` increments only for crashed/timed_out/spawn_failed — a `blocked` outcome bypasses the failure counter, so the `kanban.failure_limit` circuit breaker never trips on budget-exhaustion loops. Fix: `agent/conversation_loop.py` budget-exhaustion path now calls `_record_task_failure(outcome="timed_out")` instead of `kanban_block`. Budget exhaustion is genuinely a timeout-shaped failure (the task ran out of allowed iterations), so this is more honest semantics; it also routes through the unified failure counter, so repeated budget exhaustions trip the circuit breaker and the task auto-blocks with `gave_up` after `failure_limit` retries. GAP 3: `release_stale_claims` uses `_pid_alive(worker_pid)` only and ignores `last_heartbeat_at`. Reporter observed a 91-min run that held its claim with frozen heartbeat because the worker entered a logic loop with no tool calls — `_pid_alive` kept returning True so the claim was extended every 15 minutes indefinitely. Fix: heartbeat-stale backstop. 
If `last_heartbeat_at` is set AND older than `DEFAULT_CLAIM_HEARTBEAT_MAX_STALE_SECONDS` (default 1h), reclaim even if the PID is alive. NULL `last_heartbeat_at` preserves backward compatibility (no heartbeat yet = extend, as before). The reclaim event payload now includes a `heartbeat_stale` boolean so operators see why a live-PID worker was reclaimed. This works cleanly in concert with PR #34418 (#31752 runtime → heartbeat bridge): once `_touch_activity` keeps `last_heartbeat_at` fresh as a side effect of normal API traffic, the backstop only fires for genuinely wedged workers (no chunks, no tool results, no progress at all). Co-authored-by: baofuen <45189813+baofuen@users.noreply.github.com>
This commit is contained in:
@ -4303,36 +4303,54 @@ def run_conversation(
|
||||
)
|
||||
final_response = agent._handle_max_iterations(messages, api_call_count)
|
||||
|
||||
# If running as a kanban worker, block the task so the dispatcher
|
||||
# knows the worker could not complete (rather than treating it as a
|
||||
# If running as a kanban worker, signal the dispatcher that the
|
||||
# worker could not complete (rather than treating it as a
|
||||
# protocol violation). The agent loop strips tools before calling
|
||||
# _handle_max_iterations, so the model cannot call kanban_block
|
||||
# itself — we must do it on its behalf.
|
||||
#
|
||||
# We route through ``_record_task_failure(outcome="timed_out")``
|
||||
# rather than ``kanban_block`` so this counts toward the
|
||||
# ``consecutive_failures`` counter and the dispatcher's
|
||||
# ``failure_limit`` circuit breaker (#29747 gap 2). Without this,
|
||||
# a task whose worker keeps exhausting its budget would block
|
||||
# silently each run, get auto-promoted by the operator (or never
|
||||
# surface), and re-block in an endless loop with no signal.
|
||||
_kanban_task = os.environ.get("HERMES_KANBAN_TASK")
|
||||
if _kanban_task:
|
||||
try:
|
||||
_ra().handle_function_call(
|
||||
"kanban_block",
|
||||
{
|
||||
"task_id": _kanban_task,
|
||||
"reason": (
|
||||
from hermes_cli import kanban_db as _kb
|
||||
_conn = _kb.connect()
|
||||
try:
|
||||
_kb._record_task_failure(
|
||||
_conn,
|
||||
_kanban_task,
|
||||
error=(
|
||||
f"Iteration budget exhausted "
|
||||
f"({api_call_count}/{agent.max_iterations}) — "
|
||||
"task could not complete within the allowed "
|
||||
"iterations"
|
||||
),
|
||||
},
|
||||
task_id=effective_task_id,
|
||||
)
|
||||
logger.info(
|
||||
"kanban_block called for task %s after iteration "
|
||||
"exhaustion (%d/%d)",
|
||||
_kanban_task, api_call_count, agent.max_iterations,
|
||||
)
|
||||
outcome="timed_out",
|
||||
release_claim=True,
|
||||
end_run=True,
|
||||
event_payload_extra={
|
||||
"budget_used": api_call_count,
|
||||
"budget_max": agent.max_iterations,
|
||||
},
|
||||
)
|
||||
logger.info(
|
||||
"recorded budget-exhausted failure for task %s (%d/%d)",
|
||||
_kanban_task, api_call_count, agent.max_iterations,
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to call kanban_block after iteration "
|
||||
"exhaustion for task %s",
|
||||
"Failed to record budget-exhausted failure for task %s",
|
||||
_kanban_task,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
@ -110,6 +110,16 @@ _IS_WINDOWS = sys.platform == "win32"
|
||||
# long single-call MCP workflows.
|
||||
DEFAULT_CLAIM_TTL_SECONDS = 15 * 60
|
||||
|
||||
# If a worker's PID is still alive but its ``last_heartbeat_at`` is
|
||||
# older than this when ``release_stale_claims`` runs, treat the worker
|
||||
# as wedged and reclaim regardless of PID liveness (#29747 gap 3).
|
||||
# This catches the logic-loop case where the process is technically
|
||||
# running but not making observable progress. ``_touch_activity``
|
||||
# bridges chunk-level liveness into ``last_heartbeat_at`` via #31752,
|
||||
# so any genuinely active worker keeps its heartbeat fresh as a side
|
||||
# effect of normal API traffic.
|
||||
DEFAULT_CLAIM_HEARTBEAT_MAX_STALE_SECONDS = 60 * 60
|
||||
|
||||
|
||||
def _resolve_claim_ttl_seconds(ttl_seconds: Optional[int] = None) -> int:
|
||||
"""Return the effective claim TTL, honoring the kanban env override.
|
||||
@ -2740,9 +2750,19 @@ def release_stale_claims(
|
||||
then-immediately-reclaim loop seen on slow models that spend longer
|
||||
than ``DEFAULT_CLAIM_TTL_SECONDS`` inside a single tool-free LLM
|
||||
call (#23025): no tool calls means no ``kanban_heartbeat``, even
|
||||
though the subprocess is healthy. ``enforce_max_runtime`` and
|
||||
``detect_crashed_workers`` remain the upper bounds for genuinely
|
||||
wedged or dead workers.
|
||||
though the subprocess is healthy.
|
||||
|
||||
Backstop (#29747 gap 3): if the worker's PID is still alive but its
|
||||
``last_heartbeat_at`` is stale by more than
|
||||
``DEFAULT_CLAIM_HEARTBEAT_MAX_STALE_SECONDS`` (1h), the worker has
|
||||
been making no observable progress and we reclaim anyway — even if
|
||||
``_pid_alive`` is still true. This catches the wedged-in-a-logic-loop
|
||||
case where the process is technically running but accomplishing
|
||||
nothing. ``_touch_activity`` (run_agent.py) bridges chunk-level
|
||||
liveness into ``last_heartbeat_at`` via #31752, so any genuinely
|
||||
active worker keeps its heartbeat fresh as a side effect of normal
|
||||
API traffic. ``enforce_max_runtime`` and ``detect_crashed_workers``
|
||||
remain the upper bounds for genuinely wedged or dead workers.
|
||||
|
||||
Returns the number of stale claims actually reclaimed (live-pid
|
||||
extensions don't count). Safe to call often.
|
||||
@ -2760,7 +2780,21 @@ def release_stale_claims(
|
||||
for row in stale:
|
||||
lock = row["claim_lock"] or ""
|
||||
host_local = lock.startswith(host_prefix)
|
||||
if host_local and row["worker_pid"] and _pid_alive(row["worker_pid"]):
|
||||
hb = row["last_heartbeat_at"]
|
||||
# Heartbeat staleness backstop: if we have a heartbeat at all
|
||||
# and it's older than the max-stale threshold, the worker is
|
||||
# not making observable progress. Reclaim instead of extending,
|
||||
# even if the PID is still alive (it's likely in a logic loop).
|
||||
heartbeat_stale = (
|
||||
hb is not None
|
||||
and (now - int(hb)) > DEFAULT_CLAIM_HEARTBEAT_MAX_STALE_SECONDS
|
||||
)
|
||||
if (
|
||||
host_local
|
||||
and row["worker_pid"]
|
||||
and _pid_alive(row["worker_pid"])
|
||||
and not heartbeat_stale
|
||||
):
|
||||
new_expires = now + _resolve_claim_ttl_seconds()
|
||||
with write_txn(conn):
|
||||
cur = conn.execute(
|
||||
@ -2829,6 +2863,7 @@ def release_stale_claims(
|
||||
),
|
||||
"now": now,
|
||||
"host_local": host_local,
|
||||
"heartbeat_stale": bool(heartbeat_stale),
|
||||
}
|
||||
payload.update(termination)
|
||||
_append_event(
|
||||
|
||||
@ -791,6 +791,83 @@ def _rule_stuck_in_blocked(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
)]
|
||||
|
||||
|
||||
def _rule_block_unblock_cycling(task, events, runs, now, cfg) -> list[Diagnostic]:
    """Flag a task that keeps getting re-blocked after being unblocked.

    ``_rule_stuck_in_blocked`` resets its timer on any ``commented`` /
    ``unblocked`` event, so a task that cycles every few minutes is
    invisible to it regardless of how many times it cycles (#29747
    gap 1). This rule complements that one by counting block→unblock
    cycles in a sliding window.

    Threshold: cfg["block_cycle_threshold"] (default 3) cycles within
    cfg["block_cycle_window_seconds"] (default 24h). A missing,
    unparseable, non-finite or non-positive window falls back to the
    24h default instead of raising or silently disabling the rule.

    Args:
        task: Task record; only its ``id`` is read (via ``_task_field``).
        events: Task events, expected in arrival (event id) order.
        runs: Unused by this rule; accepted for the shared rule signature.
        now: Current time in epoch seconds.
        cfg: Mapping of diagnostic configuration values.

    Returns:
        An empty list when fewer than ``threshold`` cycles fall inside
        the window, otherwise a single ``block_unblock_cycling`` warning.
    """
    threshold = _positive_int(cfg.get("block_cycle_threshold"), 3)

    # Parse the window defensively so one bad config value cannot crash
    # the diagnostics pass: ``float(None)`` / ``float("abc")`` raise, and
    # NaN / inf / <= 0 would make the cutoff arithmetic meaningless.
    default_window = float(24 * 3600)
    try:
        window_seconds = float(
            cfg.get("block_cycle_window_seconds", default_window)
        )
    except (TypeError, ValueError, OverflowError):
        window_seconds = default_window
    if not 0 < window_seconds < float("inf"):
        # NaN fails every comparison, so it lands here as well.
        window_seconds = default_window
    cycle_cutoff = now - window_seconds

    # Walk events chronologically (arrival order — callers are expected
    # to pre-sort by id; ``created_at`` alone is insufficient because
    # multiple events can share the same second). Count "blocked after
    # unblocked" transitions: every time a blocked event follows at
    # least one unblocked event since the last cycle was counted,
    # that's a new cycle.
    cycles = 0
    seen_unblock_since_last_cycle = False
    # ``None`` (not 0) marks "not seen yet" so a genuine timestamp of 0
    # is never mistaken for the unset state.
    initial_blocked_ts = None
    last_cycle_blocked_ts = None
    for ev in events:
        ts = _event_ts(ev)
        if ts < cycle_cutoff:
            continue
        kind = _event_kind(ev)
        if kind == "blocked":
            if initial_blocked_ts is None:
                initial_blocked_ts = ts
            if seen_unblock_since_last_cycle:
                cycles += 1
                last_cycle_blocked_ts = ts
                seen_unblock_since_last_cycle = False
        elif kind == "unblocked":
            seen_unblock_since_last_cycle = True

    if cycles < threshold:
        return []

    # Render the window in the largest unit that does not truncate to
    # zero; whole-hour windows keep the original "<N>h" wording.
    if window_seconds >= 3600:
        window_label = f"{int(window_seconds / 3600)}h"
    elif window_seconds >= 60:
        window_label = f"{int(window_seconds / 60)}m"
    else:
        window_label = f"{int(window_seconds)}s"

    task_id = _task_field(task, "id")
    actions: list[DiagnosticAction] = []
    if task_id:
        actions.append(DiagnosticAction(
            kind="cli_hint",
            label=f"Check block reasons: hermes kanban events {task_id}",
            payload={"command": f"hermes kanban events {task_id}"},
            suggested=True,
        ))
    return [Diagnostic(
        kind="block_unblock_cycling",
        severity="warning",
        title=f"Task block→unblock cycled {cycles}x in {window_label}",
        detail=(
            f"This task has been blocked {cycles} times after being "
            "unblocked, suggesting the unblock is not addressing the "
            "root cause and the worker keeps hitting the same wall. "
            "Review the block reasons in the event history; a different "
            "intervention (reassign, change scope, archive) may be needed."
        ),
        actions=actions,
        first_seen_at=(
            int(initial_blocked_ts)
            if initial_blocked_ts is not None
            else int(now)
        ),
        last_seen_at=(
            int(last_cycle_blocked_ts)
            if last_cycle_blocked_ts is not None
            else int(now)
        ),
        count=cycles,
        data={
            "cycles": cycles,
            "window_seconds": int(window_seconds),
        },
    )]
|
||||
|
||||
|
||||
def _rule_stranded_in_ready(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
"""Task has been in ``ready`` status for too long without any worker
|
||||
claiming it.
|
||||
@ -923,6 +1000,7 @@ _RULES: list[RuleFn] = [
|
||||
_rule_repeated_failures,
|
||||
_rule_repeated_crashes,
|
||||
_rule_stuck_in_blocked,
|
||||
_rule_block_unblock_cycling,
|
||||
_rule_stranded_in_ready,
|
||||
]
|
||||
|
||||
@ -936,6 +1014,7 @@ DIAGNOSTIC_KINDS = (
|
||||
"repeated_failures",
|
||||
"repeated_crashes",
|
||||
"stuck_in_blocked",
|
||||
"block_unblock_cycling",
|
||||
"stranded_in_ready",
|
||||
)
|
||||
|
||||
|
||||
@ -62,6 +62,7 @@ AUTHOR_MAP = {
|
||||
"211828103+julio-cloudvisor@users.noreply.github.com": "julio-cloudvisor",
|
||||
"17778+kweiner@users.noreply.github.com": "kweiner",
|
||||
"223516181+faisfamilytravel@users.noreply.github.com": "faisfamilytravel",
|
||||
"45189813+baofuen@users.noreply.github.com": "baofuen",
|
||||
# teknium (multiple emails)
|
||||
"teknium1@gmail.com": "teknium1",
|
||||
"kenyon1977@gmail.com": "kenyonxu",
|
||||
|
||||
Reference in New Issue
Block a user