Add embedded NVIDIA Dynamo support to vLLM ModelHandler#38701
Add embedded NVIDIA Dynamo support to vLLM ModelHandler#38701akshayjadiyanv wants to merge 3 commits into
Conversation
VLLMCompletionsModelHandler and VLLMChatModelHandler gain two keyword-only parameters, use_dynamo (default False) and dynamo_frontend_kwargs. When use_dynamo=True, the handler launches a dynamo.frontend process as the OpenAI-compatible local endpoint plus a separate dynamo.vllm worker, instead of vllm.entrypoints.openai.api_server. The existing native-vLLM path is unchanged when the flag is absent. The example pipeline vllm_text_completion.py gains --use_dynamo and --max_tokens flags. validate_inference_args is now a no-op on both handlers so OpenAI-style request kwargs (e.g. max_tokens) can be passed through RunInference. A new unit-test module covers process-launch behaviour for both paths. This supersedes apache#36966 (now closed) and rebases the embedded-Dynamo approach onto current master, preserving the recent batching-kwargs additions to the ModelHandler base. Co-authored-by: Danny McCormick <damccorm@google.com>
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enables embedded NVIDIA Dynamo support within the Apache Beam vLLM ModelHandler. By allowing the execution of Dynamo frontend and worker processes in-process, it provides a streamlined way to leverage Dynamo's capabilities for local inference tasks. The implementation includes robust process management for the required etcd discovery service and ensures backward compatibility with existing native vLLM workflows. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces support for running NVIDIA Dynamo as an embedded vLLM engine within Apache Beam's vLLM inference handlers, including managing a local etcd process when external endpoints are not configured. The review feedback focuses on improving the robustness of process management and resource cleanup within _VLLMModelServer. Specifically, the reviewer recommends tracking and cleaning up the temporary etcd data directory to prevent disk leaks, wrapping process termination in try...except OSError blocks to avoid crashes, importing modules locally during interpreter shutdown, and adding the etcd process to the connectivity polling loop for fail-fast behavior.
| self._model_name = model_name | ||
| self._vllm_server_kwargs = vllm_server_kwargs | ||
| self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {} | ||
| self._server_started = False | ||
| self._server_process = None | ||
| self._dynamo_process = None | ||
| self._etcd_process = None | ||
| self._managed_etcd_endpoint = None | ||
| self._server_port: int = -1 | ||
| self._server_process_lock = threading.RLock() | ||
| self._use_dynamo = use_dynamo |
There was a problem hiding this comment.
Initialize self._etcd_data_dir to None in __init__ to track the temporary etcd data directory for proper cleanup.
| self._model_name = model_name | |
| self._vllm_server_kwargs = vllm_server_kwargs | |
| self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {} | |
| self._server_started = False | |
| self._server_process = None | |
| self._dynamo_process = None | |
| self._etcd_process = None | |
| self._managed_etcd_endpoint = None | |
| self._server_port: int = -1 | |
| self._server_process_lock = threading.RLock() | |
| self._use_dynamo = use_dynamo | |
| self._model_name = model_name | |
| self._vllm_server_kwargs = vllm_server_kwargs | |
| self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {} | |
| self._server_started = False | |
| self._server_process = None | |
| self._dynamo_process = None | |
| self._etcd_process = None | |
| self._etcd_data_dir = None | |
| self._managed_etcd_endpoint = None | |
| self._server_port: int = -1 | |
| self._server_process_lock = threading.RLock() | |
| self._use_dynamo = use_dynamo |
There was a problem hiding this comment.
Done in 329dd28 — self._etcd_data_dir: Optional[str] = None added next to the other process handles in __init__.
| @staticmethod | ||
| def _stop_process(process: Optional[subprocess.Popen]) -> None: | ||
| if process is None or process.poll() is not None: | ||
| return | ||
| process.terminate() | ||
| try: | ||
| process.wait(timeout=10) | ||
| except subprocess.TimeoutExpired: | ||
| process.kill() | ||
| process.wait() |
There was a problem hiding this comment.
Wrap the process termination sequence in a try...except OSError block. If a process exits concurrently between the poll() check and terminate()/kill(), calling these methods can raise an OSError (such as ProcessLookupError), which would crash the cleanup sequence and leave other processes running.
| @staticmethod | |
| def _stop_process(process: Optional[subprocess.Popen]) -> None: | |
| if process is None or process.poll() is not None: | |
| return | |
| process.terminate() | |
| try: | |
| process.wait(timeout=10) | |
| except subprocess.TimeoutExpired: | |
| process.kill() | |
| process.wait() | |
| @staticmethod | |
| def _stop_process(process: Optional[subprocess.Popen]) -> None: | |
| if process is None or process.poll() is not None: | |
| return | |
| try: | |
| process.terminate() | |
| try: | |
| process.wait(timeout=10) | |
| except subprocess.TimeoutExpired: | |
| process.kill() | |
| process.wait() | |
| except OSError: | |
| pass |
There was a problem hiding this comment.
Done in 329dd28 — the terminate / wait / kill calls are now wrapped in a single try/except OSError so a ProcessLookupError (or any other OS-level race) is swallowed instead of aborting the rest of _stop_processes.
| def _stop_processes(self) -> None: | ||
| self._stop_process(self._dynamo_process) | ||
| self._stop_process(self._server_process) | ||
| self._stop_process(self._etcd_process) | ||
| if (self._managed_etcd_endpoint is not None and | ||
| os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint): | ||
| del os.environ['ETCD_ENDPOINTS'] | ||
| self._dynamo_process = None | ||
| self._server_process = None | ||
| self._etcd_process = None | ||
| self._managed_etcd_endpoint = None | ||
| self._server_started = False | ||
| self._server_port = -1 |
There was a problem hiding this comment.
To prevent disk space leaks in /tmp from abandoned etcd data directories, and to avoid potential AttributeError or NameError during interpreter shutdown (when global modules like os or shutil might be garbage-collected), we should:
- Clean up the temporary etcd data directory using
shutil.rmtreein_stop_processes. - Import
osandshutillocally inside_stop_processesto ensure they are available during interpreter shutdown. - Use
local_os.environ.popto safely remove the environment variable without raising aKeyError.
def _stop_processes(self) -> None:
import os as local_os
import shutil as local_shutil
self._stop_process(self._dynamo_process)
self._stop_process(self._server_process)
self._stop_process(self._etcd_process)
if (self._managed_etcd_endpoint is not None and
local_os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
local_os.environ.pop('ETCD_ENDPOINTS', None)
if getattr(self, '_etcd_data_dir', None):
local_shutil.rmtree(self._etcd_data_dir, ignore_errors=True)
self._etcd_data_dir = None
self._dynamo_process = None
self._server_process = None
self._etcd_process = None
self._managed_etcd_endpoint = None
self._server_started = False
self._server_port = -1There was a problem hiding this comment.
Mostly accepted in 329dd28:
shutil.rmtree(self._etcd_data_dir, ignore_errors=True)is now called from_stop_processes, so worker restarts no longer leak/tmp/beam-dynamo-etcd-*dirs.del os.environ['ETCD_ENDPOINTS']was switched toos.environ.pop('ETCD_ENDPOINTS', None)so the cleanup is idempotent.
I skipped the local_os/local_shutil rebinding inside _stop_processes. The shutdown-safety concern is real, but os/shutil are already used unguarded throughout the rest of this class (_uses_embedded_etcd, _ensure_etcd, _wait_for_etcd), so threading the local-imports pattern through only _stop_processes is inconsistent and a bit hard to read. Instead I wrapped __del__ itself in try/except Exception — that gives the same finalizer-during-shutdown guarantee with a single guard and zero churn elsewhere.
| etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}' | ||
| etcd_data_dir = f'/tmp/{etcd_name}' | ||
| peer_port, = subprocess_server.pick_port(None) | ||
| etcd_cmd = [ | ||
| 'etcd', | ||
| '--name', | ||
| etcd_name, | ||
| '--listen-client-urls', | ||
| 'http://127.0.0.1:{{PORT}}', | ||
| '--advertise-client-urls', | ||
| 'http://127.0.0.1:{{PORT}}', | ||
| '--listen-peer-urls', | ||
| f'http://127.0.0.1:{peer_port}', | ||
| '--initial-advertise-peer-urls', | ||
| f'http://127.0.0.1:{peer_port}', | ||
| '--initial-cluster', | ||
| f'{etcd_name}=http://127.0.0.1:{peer_port}', | ||
| '--data-dir', | ||
| etcd_data_dir, | ||
| '--log-level', | ||
| 'warn', | ||
| ] |
There was a problem hiding this comment.
Store the temporary etcd data directory path in self._etcd_data_dir instead of a local variable so that it can be cleaned up when stopping the processes.
etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
self._etcd_data_dir = f'/tmp/{etcd_name}'
peer_port, = subprocess_server.pick_port(None)
etcd_cmd = [
'etcd',
'--name',
etcd_name,
'--listen-client-urls',
'http://127.0.0.1:{{PORT}}',
'--advertise-client-urls',
'http://127.0.0.1:{{PORT}}',
'--listen-peer-urls',
f'http://127.0.0.1:{peer_port}',
'--initial-advertise-peer-urls',
f'http://127.0.0.1:{peer_port}',
'--initial-cluster',
f'{etcd_name}=http://127.0.0.1:{peer_port}',
'--data-dir',
self._etcd_data_dir,
'--log-level',
'warn',
]There was a problem hiding this comment.
Done in 329dd28 — _ensure_etcd now writes the temp directory to self._etcd_data_dir and _stop_processes is the one place that owns the rmtree.
| while (time.time() - start_time < timeout_secs and | ||
| self._server_process.poll() is None and | ||
| (self._dynamo_process is None or | ||
| self._dynamo_process.poll() is None)): |
There was a problem hiding this comment.
Add a check for self._etcd_process in the connectivity polling loop. If the embedded etcd process dies, the frontend and engine won't be able to communicate, and the server will fail. Checking it here allows the loop to fail-fast instead of waiting for the full 10-minute timeout.
while (time.time() - start_time < timeout_secs and
self._server_process.poll() is None and
(self._dynamo_process is None or
self._dynamo_process.poll() is None) and
(self._etcd_process is None or
self._etcd_process.poll() is None)):There was a problem hiding this comment.
Done in 329dd28 — the check_connectivity loop now also exits early if self._etcd_process.poll() is not None, so a dead embedded etcd fails fast instead of waiting out the 10-minute timeout_secs.
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Apply five robustness fixes flagged on PR apache#38701: - Track the temporary etcd data dir as self._etcd_data_dir and shutil.rmtree(..., ignore_errors=True) it in _stop_processes so worker restarts don't leak /tmp directories. - Wrap process.terminate() / process.wait() / process.kill() in a single try/except OSError to absorb the ProcessLookupError race when a process exits between poll() and the signal call. - Switch the ETCD_ENDPOINTS removal from `del os.environ[...]` to `os.environ.pop(..., None)` to be idempotent. - Wrap __del__ in try/except Exception so cleanup never raises during interpreter shutdown. - Add the embedded etcd process to the check_connectivity() poll loop so an etcd death fails fast instead of waiting out the 10-minute timeout.
|
Assigning reviewers: R: @damccorm for label python. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
damccorm
left a comment
There was a problem hiding this comment.
Thanks for taking this forward! I think we need a running integration test to feel good about merging given the constraints I ran into in the original PR, but this makes good progress
| // The embedded mode was validated end-to-end on a T4 VM with | ||
| // Qwen/Qwen3-0.6B via DirectRunner; this change is scoped to the |
There was a problem hiding this comment.
Given that this was validated locally with a T4, why do we need an L4?
We should try running this on Dataflow which has built in accelerator support; that way we don't need to provision our own pool
There was a problem hiding this comment.
Resolved in 6f8849169b2 — now uncommented and uses nvidia-tesla-t4 (no separate pool) per your suggestion. Validation details in the top-level reply.
Bump vllm.dockerfile.old to apache-beam[gcp]==2.71.0 (and the COPY-from beam_python3.12_sdk image to 2.71.0), install ai-dynamo[vllm], and add the etcd binary required by embedded Dynamo's runtime discovery. Uncomment the Dynamo IT block in common.gradle. Drop the unused machine_type override so it inherits n1-standard-4 from argMap, and switch nvidia-l4 -> nvidia-tesla-t4 to match the existing native vLLM ITs and the local Dataflow validation (per @damccorm review). Validated end-to-end on Dataflow with Qwen/Qwen3-0.6B; the nvext.timing field present on every PredictionResult confirms the Dynamo frontend served the requests.
|
Thanks Danny. Pushed Validated end-to-end on Dataflow against an image built from the updated dockerfile, with Ask: when convenient, please rebuild |
Summary
Adds opt-in support for NVIDIA Dynamo (
ai-dynamo[vllm]) as the underlying engine forVLLMCompletionsModelHandlerandVLLMChatModelHandler. Whenuse_dynamo=True, the handler launches adynamo.frontendprocess as the OpenAI-compatible local endpoint and a separatedynamo.vllmworker, instead ofvllm.entrypoints.openai.api_server. Existing native-vLLM behavior is unchanged when the flag is absent.This supersedes #36966 (now closed) and rebases the same approach onto current master, preserving the recent batching-kwargs additions to the ModelHandler base.
Embedded mode scope and limitations
This change adds embedded, single-worker Dynamo — Beam launches one Dynamo frontend + one vLLM worker per Beam worker, in-process. The following Dynamo features are not active in embedded mode:
--router-mode round-robinand--no-router-kv-events).Embedded Dynamo also requires an etcd-style discovery service. When
ETCD_ENDPOINTSis unset, Beam starts a localetcdprocess (requires theetcdbinary in the worker container); when set, Beam uses the external discovery service.API additions
VLLMCompletionsModelHandlerandVLLMChatModelHandlergain two keyword-only parameters:use_dynamo: bool = False— opt in to Dynamo.dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None— extra kwargs forwarded todynamo.frontend.When
use_dynamo=True, the existingvllm_server_kwargsare forwarded todynamo.vllminstead ofvllm.entrypoints.openai.api_server. Sensible Dynamo defaults are layered in so users only needuse_dynamo=Truefor a working setup.validate_inference_argsis now a no-op on both handlers, so OpenAI-style request kwargs (e.g.max_tokens) can be passed viaRunInference(model_handler, inference_args={...}).Example pipeline
apache_beam/examples/inference/vllm_text_completion.pygains--use_dynamoand--max_tokensflags. Without--use_dynamo, behavior is unchanged from current master.Validation
vllm_inference_test.pycovers (a) native vLLM still launches a single server process; (b) Dynamo launches two processes with separate frontend/engine kwargs; (c)validate_inference_argsaccepts OpenAI request kwargs.Qwen/Qwen3-0.6Brunning throughai-dynamo[vllm]0.7.0 + the embedded etcd path.Not in this change (deliberate follow-ups)
sdks/python/test-suites/dataflow/common.gradledocumenting how to enable it. Enabling requires bumpingvllm.dockerfile.oldto installetcd+ai-dynamo[vllm]and provisioning an L4 (or larger) GPU pool. The existing two vLLM Dataflow ITs are unchanged.vllm.dockerfile.oldis intentionally left alone. Bumping the Beam version and addingai-dynamo[vllm]to the existing 2.58.1-based image has non-trivial dependency-resolution risk and should be handled alongside enabling the Dataflow IT.Credits
Builds on the
users/damccorm/dynamobranch and PR #36966. The runtime/API design here mirrors that work, rebased onto current master and scoped to embedded mode with explicit defaults.