feat(video_gen): route FAL video gen through managed Nous gateway

Wire plugins/video_gen/fal/__init__.py to use the same
_ManagedFalSyncClient pattern that image gen already uses.

Changes:
- Add managed gateway resolution, client caching, and
  _submit_fal_video_request() that routes between direct FAL_KEY
  and Nous gateway modes
- Update is_available() to return True when either FAL_KEY is set or
  the managed gateway can be resolved
- Update generate() to use submit+get handle pattern instead of
  fal_client.subscribe() directly
- Fix happy-horse endpoint namespace: fal-ai/ → alibaba/ (matches
  the tool-gateway allowlist from fal-video-gen branch)
- Surface actionable error on 4xx gateway rejections

Tests:
- 4 new tests in test_managed_media_gateways.py (gateway routing,
  client reuse, direct mode fallback, alibaba namespace)
- Updated existing test_fal_plugin.py fixture to use submit/handle
  pattern and patch _resolve_managed_fal_video_gateway for isolation
This commit is contained in:
alt-glitch
2026-05-27 19:03:12 +05:30
committed by Siddharth Balyan
parent 5cd0673217
commit d04b3c193e
3 changed files with 263 additions and 23 deletions

View File

@ -17,7 +17,7 @@ Model families (each with t2v + i2v endpoints):
veo3.1 fal-ai/veo3.1 / fal-ai/veo3.1/image-to-video
seedance-2.0 bytedance/seedance-2.0/text-to-video / bytedance/seedance-2.0/image-to-video
kling-v3-4k fal-ai/kling-video/v3/4k/text-to-video / fal-ai/kling-video/v3/4k/image-to-video
happy-horse fal-ai/happy-horse/text-to-video / fal-ai/happy-horse/image-to-video
happy-horse alibaba/happy-horse/text-to-video / alibaba/happy-horse/image-to-video
Selection precedence for the active family:
1. ``model=`` arg from the tool call
@ -26,14 +26,16 @@ Selection precedence for the active family:
4. ``video_gen.model`` in ``config.yaml`` (when it's one of our family IDs)
5. ``DEFAULT_MODEL``
Authentication via ``FAL_KEY``. Output is an HTTPS URL from FAL's CDN; the
gateway downloads and delivers it.
Authentication via ``FAL_KEY`` or the managed Nous gateway. Output is an
HTTPS URL from FAL's CDN; the gateway downloads and delivers it.
"""
from __future__ import annotations
import logging
import os
import threading
import uuid
from typing import Any, Dict, List, Optional, Tuple
from agent.video_gen_provider import (
@ -148,8 +150,8 @@ FAL_FAMILIES: Dict[str, Dict[str, Any]] = {
"price": "premium",
"strengths": "Alibaba. New model, sparse public docs — conservative defaults.",
"tier": "premium",
"text_endpoint": "fal-ai/happy-horse/text-to-video",
"image_endpoint": "fal-ai/happy-horse/image-to-video",
"text_endpoint": "alibaba/happy-horse/text-to-video",
"image_endpoint": "alibaba/happy-horse/image-to-video",
# Docs don't expose duration/aspect/resolution — let the endpoint
# apply its own defaults.
"aspect_ratios": None,
@ -302,6 +304,92 @@ def _load_fal_client() -> Any:
return _fal_client
# ---------------------------------------------------------------------------
# Managed FAL gateway (Nous Subscription)
# ---------------------------------------------------------------------------
# Cached managed-gateway client; built on first use by
# _get_managed_fal_video_client() and reused on later calls.
_managed_fal_video_client: Any = None
# (gateway origin without trailing "/", Nous user token) tuple the cached
# client was built for; a mismatch on lookup triggers a rebuild.
_managed_fal_video_client_config: Any = None
# Held while the two cache globals above are read or replaced.
_managed_fal_video_client_lock = threading.Lock()
def _resolve_managed_fal_video_gateway():
    """Resolve the managed ``fal-queue`` gateway config, if it should be used.

    The gateway is consulted when no direct FAL key is configured, or when
    the user prefers the gateway for ``video_gen`` even though a key exists.

    Returns:
        ``None`` when a direct FAL key is configured and the gateway is not
        preferred; otherwise whatever
        ``resolve_managed_tool_gateway("fal-queue")`` returns.
    """
    from tools.tool_backend_helpers import fal_key_is_configured, prefers_gateway

    # Same short-circuit order as a "key configured and not preferred" check:
    # the preference is only consulted when a direct key exists.
    gateway_wanted = not fal_key_is_configured() or prefers_gateway("video_gen")
    if not gateway_wanted:
        return None

    # Imported lazily so direct-key setups never load the gateway module.
    from tools.managed_tool_gateway import resolve_managed_tool_gateway

    return resolve_managed_tool_gateway("fal-queue")
def _get_managed_fal_video_client(managed_gateway):
    """Return the shared managed FAL client, building it when necessary.

    The client lives in module globals so repeated calls reuse one instance
    (and its internal httpx.Client) instead of creating a new one per call.
    A fresh client is built when nothing is cached yet, or when the gateway
    origin (ignoring trailing slashes) or the user token differs from the
    values the cached client was built with.

    Args:
        managed_gateway: Gateway config exposing ``gateway_origin`` and
            ``nous_user_token``.

    Returns:
        The cached ``_ManagedFalSyncClient`` instance.
    """
    global _managed_fal_video_client, _managed_fal_video_client_config

    from tools.fal_common import _ManagedFalSyncClient

    origin = managed_gateway.gateway_origin
    token = managed_gateway.nous_user_token
    cache_key = (origin.rstrip("/"), token)

    with _managed_fal_video_client_lock:
        needs_rebuild = (
            _managed_fal_video_client is None
            or _managed_fal_video_client_config != cache_key
        )
        if needs_rebuild:
            # Make sure the module-level ``_fal_client`` is populated first.
            _load_fal_client()
            # NOTE(review): a previously cached client is dropped here without
            # being closed — confirm whether it needs explicit cleanup.
            _managed_fal_video_client = _ManagedFalSyncClient(
                _fal_client,
                key=token,
                queue_run_origin=origin,
            )
            _managed_fal_video_client_config = cache_key
        return _managed_fal_video_client
def _submit_fal_video_request(endpoint: str, arguments: Dict[str, Any]):
    """Submit a FAL video request via direct credentials or the managed gateway.

    A fresh UUID is sent as the ``x-idempotency-key`` header on every call.
    When no managed gateway resolves, the request goes through the
    module-level ``fal_client.submit``; otherwise it goes through the cached
    managed client.

    Args:
        endpoint: FAL endpoint identifier to submit to.
        arguments: Request payload forwarded as ``arguments``.

    Returns:
        A request handle whose ``.get()`` blocks until the result is ready.

    Raises:
        ValueError: When the managed gateway submit fails with an HTTP status
            in the 400-499 range; the original exception is chained.
        Exception: Any other managed-submit failure is re-raised unchanged.
    """
    _load_fal_client()
    idempotency_headers = {"x-idempotency-key": str(uuid.uuid4())}

    gateway = _resolve_managed_fal_video_gateway()
    if gateway is None:
        # Direct mode: use the fal_client module itself.
        return _fal_client.submit(endpoint, arguments=arguments, headers=idempotency_headers)

    client = _get_managed_fal_video_client(gateway)
    try:
        handle = client.submit(
            endpoint,
            arguments=arguments,
            headers=idempotency_headers,
        )
    except Exception as exc:
        from tools.fal_common import _extract_http_status

        status = _extract_http_status(exc)
        if status is None or not (400 <= status < 500):
            # Not a client-side rejection — let the caller see it as-is.
            raise
        raise ValueError(
            f"Nous Subscription gateway rejected endpoint '{endpoint}' "
            f"(HTTP {status}). This model may not yet be enabled on "
            f"the Nous Portal's FAL proxy. Either:\n"
            f" • Set FAL_KEY in your environment to use FAL.ai directly, or\n"
            f" • Pick a different model via `hermes tools` → Video Generation."
        ) from exc
    return handle
def _check_fal_video_available() -> bool:
    """Report whether a FAL video backend can be used.

    Returns:
        ``True`` when a direct FAL key is configured, or when the managed
        gateway resolves to a config; ``False`` otherwise.
    """
    from tools.tool_backend_helpers import fal_key_is_configured

    # The gateway is only resolved when no direct key is configured.
    return bool(fal_key_is_configured()) or _resolve_managed_fal_video_gateway() is not None
# ---------------------------------------------------------------------------
# Provider
# ---------------------------------------------------------------------------
@ -323,13 +411,10 @@ class FALVideoGenProvider(VideoGenProvider):
return "FAL"
def is_available(self) -> bool:
if not os.environ.get("FAL_KEY", "").strip():
return False
try:
import fal_client # noqa: F401
except ImportError:
return _check_fal_video_available()
except Exception: # noqa: BLE001 — never break the picker
return False
return True
def list_models(self) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
@ -394,11 +479,12 @@ class FALVideoGenProvider(VideoGenProvider):
seed: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
if not os.environ.get("FAL_KEY", "").strip():
if not _check_fal_video_available():
return error_response(
error=(
"FAL_KEY not set. Run `hermes tools` → Video Generation "
"→ FAL to configure."
"No FAL backend available. Either set FAL_KEY "
"(run `hermes tools` → Video Generation → FAL to configure) "
"or sign in to Nous (`hermes setup`) for managed gateway access."
),
error_type="auth_required",
provider="fal",
@ -406,7 +492,7 @@ class FALVideoGenProvider(VideoGenProvider):
)
try:
fal_client = _load_fal_client()
_load_fal_client()
except ImportError:
return error_response(
error="fal_client Python package not installed (pip install fal-client)",
@ -467,11 +553,8 @@ class FALVideoGenProvider(VideoGenProvider):
)
try:
result = fal_client.subscribe(
endpoint,
arguments=payload,
with_logs=False,
)
handle = _submit_fal_video_request(endpoint, payload)
result = handle.get()
except Exception as exc:
logger.warning(
"FAL video gen failed (family=%s, endpoint=%s): %s",

View File

@ -85,15 +85,21 @@ def test_fal_list_models_advertises_both_modalities():
def test_fal_unavailable_without_key(monkeypatch):
    """The provider is unavailable with no FAL_KEY and no managed gateway."""
    # Take the provider class from the very module object that gets patched.
    # Importing the class and the module through two separate import paths
    # can yield different module objects when sys.modules has been
    # overwritten, in which case the patch below would miss the class.
    from plugins.video_gen import fal as fal_plugin

    monkeypatch.delenv("FAL_KEY", raising=False)
    # Also ensure managed gateway is unavailable
    monkeypatch.setattr(fal_plugin, "_resolve_managed_fal_video_gateway", lambda: None)

    assert fal_plugin.FALVideoGenProvider().is_available() is False
def test_fal_generate_requires_fal_key(monkeypatch):
from plugins.video_gen.fal import FALVideoGenProvider
from plugins.video_gen import fal as fal_plugin
monkeypatch.delenv("FAL_KEY", raising=False)
# Also ensure managed gateway is unavailable
monkeypatch.setattr(fal_plugin, "_resolve_managed_fal_video_gateway", lambda: None)
result = FALVideoGenProvider().generate("a happy dog")
assert result["success"] is False
assert result["error_type"] == "auth_required"
@ -104,25 +110,34 @@ class TestFamilyRouting:
@pytest.fixture
def with_fake_fal(self, monkeypatch):
"""Stub fal_client.subscribe to capture which endpoint we hit."""
"""Stub fal_client.submit to capture which endpoint we hit."""
import sys
import types
captured = {"endpoint": None, "arguments": None}
class FakeHandle:
def get(self):
return {"video": {"url": "https://fake/out.mp4"}}
fake = types.ModuleType("fal_client")
def _subscribe(endpoint, arguments=None, with_logs=False):
def _submit(endpoint, arguments=None, headers=None):
captured["endpoint"] = endpoint
captured["arguments"] = arguments
return {"video": {"url": "https://fake/out.mp4"}}
fake.subscribe = _subscribe # type: ignore
return FakeHandle()
fake.submit = _submit # type: ignore
monkeypatch.setitem(sys.modules, "fal_client", fake)
# Reset the lazy global so it picks up our stub
from plugins.video_gen import fal as fal_plugin
fal_plugin._fal_client = None
# Also reset the managed client cache
fal_plugin._managed_fal_video_client = None
fal_plugin._managed_fal_video_client_config = None
monkeypatch.setenv("FAL_KEY", "test")
# Force direct mode — no managed gateway
monkeypatch.setattr(fal_plugin, "_resolve_managed_fal_video_gateway", lambda: None)
return captured
def test_text_to_video_routes_to_text_endpoint(self, with_fake_fal):

View File

@ -305,3 +305,145 @@ def test_transcription_uses_model_specific_response_formats(monkeypatch, tmp_pat
assert json_result["transcript"] == "hello from gpt-4o"
assert json_capture["transcription_kwargs"]["response_format"] == "json"
assert json_capture["close_calls"] == 1
# ``plugins`` directory, located via ``parents[2]`` of this test file's
# resolved path.
PLUGINS_DIR = Path(__file__).resolve().parents[2] / "plugins"
def _load_video_gen_plugin(monkeypatch):
    """Load the FAL video gen plugin in isolation.

    Executes ``agent/video_gen_provider.py`` (the ABC the plugin imports) and
    ``plugins/video_gen/fal/__init__.py`` directly from their files and
    registers both in ``sys.modules`` under their dotted names.

    Registration goes through ``monkeypatch.setitem`` so the entries are
    rolled back at test teardown rather than leaking freshly executed module
    objects into tests that run afterwards.

    Args:
        monkeypatch: The pytest ``monkeypatch`` fixture of the calling test.

    Returns:
        The freshly executed ``plugins.video_gen.fal`` module.
    """
    _install_fake_tools_package()

    base_dir = Path(__file__).resolve().parents[2]
    # Order matters: the provider ABC must be registered before the plugin
    # module body runs its ``from agent.video_gen_provider import ...``.
    targets = (
        ("agent.video_gen_provider", base_dir / "agent" / "video_gen_provider.py"),
        ("plugins.video_gen.fal", PLUGINS_DIR / "video_gen" / "fal" / "__init__.py"),
    )

    loaded = None
    for dotted_name, source_path in targets:
        spec = spec_from_file_location(dotted_name, source_path)
        assert spec and spec.loader
        loaded = module_from_spec(spec)
        # Register before executing, as the original loader did.
        monkeypatch.setitem(sys.modules, dotted_name, loaded)
        spec.loader.exec_module(loaded)
    return loaded
def test_video_gen_managed_fal_submit_uses_gateway(monkeypatch):
    """Video gen routes through the managed gateway when FAL_KEY is absent."""
    captured = {}
    # Only the side effect (installing the fake) is needed; the return value
    # was previously bound to an unused local.
    _install_fake_fal_client(captured)
    monkeypatch.delenv("FAL_KEY", raising=False)
    monkeypatch.setenv("FAL_QUEUE_GATEWAY_URL", "http://127.0.0.1:3009")
    monkeypatch.setenv("TOOL_GATEWAY_USER_TOKEN", "nous-video-token")

    plugin = _load_video_gen_plugin(monkeypatch)

    # Patch uuid for deterministic idempotency key
    monkeypatch.setattr(plugin.uuid, "uuid4", lambda: "video-submit-456")

    plugin._submit_fal_video_request(
        "fal-ai/pixverse/v6/text-to-video",
        {"prompt": "a cat riding a bicycle", "duration": "5"},
    )

    assert captured["submit_via"] == "managed_client"
    assert captured["client_key"] == "nous-video-token"
    assert captured["submit_url"] == "http://127.0.0.1:3009/fal-ai/pixverse/v6/text-to-video"
    assert captured["method"] == "POST"
    assert captured["arguments"] == {"prompt": "a cat riding a bicycle", "duration": "5"}
    assert captured["headers"] == {"x-idempotency-key": "video-submit-456"}
    assert captured["sync_client_inits"] == 1
def test_video_gen_managed_client_reused_across_calls(monkeypatch):
    """Two consecutive managed submits share one cached client."""
    target_endpoint = "fal-ai/pixverse/v6/text-to-video"
    captured = {}
    _install_fake_fal_client(captured)

    # No direct key, gateway configured: forces the managed path.
    monkeypatch.delenv("FAL_KEY", raising=False)
    monkeypatch.setenv("FAL_QUEUE_GATEWAY_URL", "http://127.0.0.1:3009")
    monkeypatch.setenv("TOOL_GATEWAY_USER_TOKEN", "nous-video-token")

    plugin = _load_video_gen_plugin(monkeypatch)

    plugin._submit_fal_video_request(target_endpoint, {"prompt": "first"})
    client_after_first = captured["http_client"]

    plugin._submit_fal_video_request(target_endpoint, {"prompt": "second"})
    client_after_second = captured["http_client"]

    assert captured["sync_client_inits"] == 1
    assert client_after_second is client_after_first
def test_video_gen_direct_mode_when_fal_key_set(monkeypatch):
    """With FAL_KEY set and no gateway preference, submit goes to fal_client.submit."""
    target_endpoint = "fal-ai/pixverse/v6/text-to-video"
    managed_captured = {}
    _install_fake_fal_client(managed_captured)

    monkeypatch.setenv("FAL_KEY", "direct-fal-key-123")
    for gateway_var in ("FAL_QUEUE_GATEWAY_URL", "TOOL_GATEWAY_USER_TOKEN"):
        monkeypatch.delenv(gateway_var, raising=False)

    plugin = _load_video_gen_plugin(monkeypatch)
    monkeypatch.setattr(plugin.uuid, "uuid4", lambda: "direct-456")

    # Trigger the lazy load so _fal_client is populated from our fake
    plugin._load_fal_client()

    class _StubHandle:
        """Minimal request handle returned by the recording submit."""

        def get(self):
            return {"video": {"url": "https://fal.media/result.mp4"}}

    direct_calls = {}

    def _record_direct_submit(endpoint, arguments=None, headers=None):
        direct_calls.update(endpoint=endpoint, arguments=arguments, headers=headers)
        return _StubHandle()

    # In direct mode, fal_client.submit is the module-level function.
    # Our fake raises AssertionError from the managed path, so swap in a
    # recorder that actually captures the call.
    plugin._fal_client.submit = _record_direct_submit

    plugin._submit_fal_video_request(target_endpoint, {"prompt": "test direct"})

    assert direct_calls == {
        "endpoint": "fal-ai/pixverse/v6/text-to-video",
        "arguments": {"prompt": "test direct"},
        "headers": {"x-idempotency-key": "direct-456"},
    }
    # Managed client should NOT have been initialized
    assert "submit_via" not in managed_captured
def test_video_gen_happy_horse_uses_alibaba_namespace(monkeypatch):
    """Verify the happy-horse family uses alibaba/ not fal-ai/ endpoints."""
    # Reuse the shared loader instead of repeating its spec/exec steps inline;
    # it also asserts that each spec and loader resolved before executing.
    plugin_mod = _load_video_gen_plugin(monkeypatch)

    hh = plugin_mod.FAL_FAMILIES["happy-horse"]
    assert hh["text_endpoint"] == "alibaba/happy-horse/text-to-video"
    assert hh["image_endpoint"] == "alibaba/happy-horse/image-to-video"