From fd9931a0f66529c1471771ee1ac49b20b550b034 Mon Sep 17 00:00:00 2001 From: Kaloyan <253267049+kaloyan-inherent@users.noreply.github.com> Date: Sun, 17 May 2026 12:02:48 +0000 Subject: [PATCH 1/2] vllm asyncmpc client: serialise requests to input socket Signed-off-by: Kaloyan <253267049+kaloyan-inherent@users.noreply.github.com> --- .../generation/vllm/vllm_worker_async.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/nemo_rl/models/generation/vllm/vllm_worker_async.py b/nemo_rl/models/generation/vllm/vllm_worker_async.py index d6089ffb06..9f31770118 100644 --- a/nemo_rl/models/generation/vllm/vllm_worker_async.py +++ b/nemo_rl/models/generation/vllm/vllm_worker_async.py @@ -184,6 +184,8 @@ def _create_engine(self, llm_kwargs: dict[str, Any]) -> None: self.server_thread, self.base_url, self.http_server = None, None, None if self.cfg["vllm_cfg"].get("expose_http_server"): + # Must run before _setup_vllm_server spawns the uvicorn thread. + self._install_engine_input_socket_lock() self.server_thread, self.base_url, self.http_server = ( self._setup_vllm_server() ) @@ -194,6 +196,37 @@ def _create_engine(self, llm_kwargs: dict[str, Any]) -> None: if self.cfg["vllm_cfg"].get("enable_vllm_metrics_logger", False): self._start_vllm_metrics_logger() + def _install_engine_input_socket_lock(self) -> None: + """Serialise sends on AsyncMPClient.input_socket across OS threads + to prevent race conditions that block the vLLM engine (e.g. during + in flight weight updates in async grpo). + """ + try: + shadow_sock = self.llm.engine_core.input_socket._shadow_sock + except AttributeError as exc: + # ``self.llm`` already owns EngineCore subprocesses; tear them down + # before re-raising so we don't leak when actor __init__ aborts. + try: + self.llm.shutdown() + except Exception: + pass + raise RuntimeError( + "AsyncMPClient.input_socket._shadow_sock is unreachable; vLLM " + "internals have shifted. 
Update this guard before exposing the " + "vLLM HTTP server." + ) from exc + + lock = threading.Lock() + original_send_multipart = shadow_sock.send_multipart + + def locked_send_multipart(*args: Any, **kwargs: Any) -> Any: + with lock: + return original_send_multipart(*args, **kwargs) + + # Replace the bound method on this socket instance only; other zmq + # sockets in the process are unaffected. + shadow_sock.send_multipart = locked_send_multipart # type: ignore[assignment] + def _start_vllm_metrics_logger(self) -> None: """Start a background thread that periodically collects vLLM logger metrics. From 225d7afa6a54059b400e6377907a38a058bbe55b Mon Sep 17 00:00:00 2001 From: Kaloyan <253267049+kaloyan-inherent@users.noreply.github.com> Date: Thu, 21 May 2026 17:25:18 +0000 Subject: [PATCH 2/2] pr review address: better comment + remove defensive guard Signed-off-by: Kaloyan <253267049+kaloyan-inherent@users.noreply.github.com> --- .../generation/vllm/vllm_worker_async.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/nemo_rl/models/generation/vllm/vllm_worker_async.py b/nemo_rl/models/generation/vllm/vllm_worker_async.py index 9f31770118..93b1d57050 100644 --- a/nemo_rl/models/generation/vllm/vllm_worker_async.py +++ b/nemo_rl/models/generation/vllm/vllm_worker_async.py @@ -184,7 +184,8 @@ def _create_engine(self, llm_kwargs: dict[str, Any]) -> None: self.server_thread, self.base_url, self.http_server = None, None, None if self.cfg["vllm_cfg"].get("expose_http_server"): - # Must run before _setup_vllm_server spawns the uvicorn thread. + # Must run after AsyncLLM.from_engine_args and before + # _setup_vllm_server spawns the uvicorn thread. self._install_engine_input_socket_lock() self.server_thread, self.base_url, self.http_server = ( self._setup_vllm_server() @@ -201,20 +202,7 @@ def _install_engine_input_socket_lock(self) -> None: to prevent race conditions that block the vLLM engine (e.g. 
during in flight weight updates in async grpo). """ - try: - shadow_sock = self.llm.engine_core.input_socket._shadow_sock - except AttributeError as exc: - # ``self.llm`` already owns EngineCore subprocesses; tear them down - # before re-raising so we don't leak when actor __init__ aborts. - try: - self.llm.shutdown() - except Exception: - pass - raise RuntimeError( - "AsyncMPClient.input_socket._shadow_sock is unreachable; vLLM " - "internals have shifted. Update this guard before exposing the " - "vLLM HTTP server." - ) from exc + shadow_sock = self.llm.engine_core.input_socket._shadow_sock lock = threading.Lock() original_send_multipart = shadow_sock.send_multipart