Add embedded NVIDIA Dynamo support to vLLM ModelHandler #38701

Open
akshayjadiyanv wants to merge 3 commits into apache:master from akshayjadiyanv:add-vllm-dynamo-support
Conversation

@akshayjadiyanv

Summary

Adds opt-in support for NVIDIA Dynamo (ai-dynamo[vllm]) as the underlying engine for VLLMCompletionsModelHandler and VLLMChatModelHandler. When use_dynamo=True, the handler launches a dynamo.frontend process as the OpenAI-compatible local endpoint and a separate dynamo.vllm worker, instead of vllm.entrypoints.openai.api_server. Existing native-vLLM behavior is unchanged when the flag is absent.

This supersedes #36966 (now closed) and rebases the same approach onto current master, preserving the recent batching-kwargs additions to the ModelHandler base.

Embedded mode scope and limitations

This change adds embedded, single-worker Dynamo — Beam launches one Dynamo frontend + one vLLM worker per Beam worker, in-process. The following Dynamo features are not active in embedded mode:

  • KV-aware routing (defaults to --router-mode round-robin and --no-router-kv-events).
  • Disaggregated prefill / decode workers.
  • KVBM offload across nodes.
  • The Dynamo Planner (autoscaling) and Grove orchestration.

Embedded Dynamo also requires an etcd-style discovery service. When ETCD_ENDPOINTS is unset, Beam starts a local etcd process (requires the etcd binary in the worker container); when set, Beam uses the external discovery service.
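The discovery rule above can be sketched as a small decision function. This is a minimal illustration rather than the PR's implementation: `DiscoveryPlan`, `resolve_discovery`, and `require_etcd_binary` are hypothetical names, and treating an empty `ETCD_ENDPOINTS` value as unset is an assumption. Only the `ETCD_ENDPOINTS` variable and the need for an `etcd` binary come from the description.

```python
import shutil
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DiscoveryPlan:
  """Hypothetical record of how the discovery service will be provided."""
  endpoints: Optional[str]  # External endpoints, or None when etcd is local.
  start_local_etcd: bool


def resolve_discovery(environ: Mapping[str, str]) -> DiscoveryPlan:
  """Uses external etcd when ETCD_ENDPOINTS is set, else plans a local one."""
  endpoints = environ.get('ETCD_ENDPOINTS')
  if endpoints:  # Assumption: an empty value counts as unset.
    return DiscoveryPlan(endpoints=endpoints, start_local_etcd=False)
  return DiscoveryPlan(endpoints=None, start_local_etcd=True)


def require_etcd_binary() -> str:
  """Returns the path to etcd on PATH, or raises with an actionable message."""
  path = shutil.which('etcd')
  if path is None:
    raise RuntimeError(
        'Embedded Dynamo needs the etcd binary in the worker container, '
        'or ETCD_ENDPOINTS pointing at an external discovery service.')
  return path
```

Calling `resolve_discovery(os.environ)` before launching any process, and `require_etcd_binary()` only when `start_local_etcd` is true, would surface a missing binary as a clear error instead of a startup timeout.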

API additions

VLLMCompletionsModelHandler and VLLMChatModelHandler gain two keyword-only parameters:

  • use_dynamo: bool = False — opt in to Dynamo.
  • dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None — extra kwargs forwarded to dynamo.frontend.

When use_dynamo=True, the existing vllm_server_kwargs are forwarded to dynamo.vllm instead of vllm.entrypoints.openai.api_server. Sensible Dynamo defaults are layered in so users only need use_dynamo=True for a working setup.
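One way to picture the forwarding described above is a helper that turns the two kwargs dictionaries into command lines. This is a sketch under stated assumptions, not the code in this PR: `kwargs_to_flags` and `build_commands` are hypothetical, the `--model` flag and the hyphenated key spelling are assumed, and attaching the two router defaults to the frontend process is a guess based on the description.

```python
import sys
from typing import Optional

# Defaults named in the PR description for embedded mode. Attaching them to
# the frontend process is an assumption made for this sketch.
ASSUMED_FRONTEND_DEFAULTS: dict[str, Optional[str]] = {
    'router-mode': 'round-robin',
    'no-router-kv-events': None,  # None stands for a bare flag with no value.
}


def kwargs_to_flags(kwargs: dict[str, Optional[str]]) -> list[str]:
  """Renders {'name': 'value'} as ['--name', 'value']; None omits the value."""
  flags: list[str] = []
  for name, value in kwargs.items():
    flags.append(f'--{name}')
    if value is not None:
      flags.append(value)
  return flags


def build_commands(
    model_name: str,
    use_dynamo: bool,
    vllm_server_kwargs: Optional[dict[str, Optional[str]]] = None,
    dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
) -> list[list[str]]:
  """Returns one command for native vLLM, or frontend plus worker for Dynamo."""
  engine_flags = kwargs_to_flags(dict(vllm_server_kwargs or {}))
  if not use_dynamo:
    return [[
        sys.executable, '-m', 'vllm.entrypoints.openai.api_server',
        '--model', model_name, *engine_flags
    ]]
  # User-supplied frontend kwargs override the layered defaults.
  frontend_kwargs = {
      **ASSUMED_FRONTEND_DEFAULTS, **(dynamo_frontend_kwargs or {})
  }
  frontend = [
      sys.executable, '-m', 'dynamo.frontend',
      *kwargs_to_flags(frontend_kwargs)
  ]
  worker = [
      sys.executable, '-m', 'dynamo.vllm', '--model', model_name,
      *engine_flags
  ]
  return [frontend, worker]
```

Keeping the engine kwargs and the frontend kwargs in separate dictionaries means a native-vLLM pipeline can switch to Dynamo by flipping one flag, without its existing `vllm_server_kwargs` leaking into the frontend command.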

validate_inference_args is now a no-op on both handlers, so OpenAI-style request kwargs (e.g. max_tokens) can be passed via RunInference(model_handler, inference_args={...}).
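A usage sketch of the opt-in path, assuming the handler signature described in this PR. The `max_tokens` value and the `build_pipeline` helper are illustrative, and running the pipeline needs apache-beam, vLLM, ai-dynamo[vllm], and a GPU worker.

```python
# Keyword arguments for the handler. `use_dynamo` is the flag added by this
# PR; the model name matches the one used in the smoke test.
HANDLER_KWARGS = {
    'model_name': 'Qwen/Qwen3-0.6B',
    'use_dynamo': True,
}

# OpenAI-style request kwargs, passed through RunInference now that
# validate_inference_args no longer rejects them. The value is illustrative.
INFERENCE_ARGS = {'max_tokens': 64}


def build_pipeline(prompts):
  """Returns a pipeline that sends `prompts` through the vLLM handler.

  Imports are local so this module loads without Beam installed.
  """
  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.vllm_inference import (
      VLLMCompletionsModelHandler)

  pipeline = beam.Pipeline()
  _ = (
      pipeline
      | 'CreatePrompts' >> beam.Create(prompts)
      | 'Generate' >> RunInference(
          VLLMCompletionsModelHandler(**HANDLER_KWARGS),
          inference_args=INFERENCE_ARGS)
      | 'Print' >> beam.Map(print))
  return pipeline
```

With the dependencies installed, `build_pipeline(['Hello, my name is']).run().wait_until_finish()` would execute it. Removing `'use_dynamo': True` from `HANDLER_KWARGS` should fall back to the native vLLM server, per the description.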

Example pipeline

apache_beam/examples/inference/vllm_text_completion.py gains --use_dynamo and --max_tokens flags. Without --use_dynamo, behavior is unchanged from current master.

Validation

  • Unit tests: new vllm_inference_test.py covers (a) native vLLM still launches a single server process; (b) Dynamo launches two processes with separate frontend/engine kwargs; (c) validate_inference_args accepts OpenAI request kwargs.
  • End-to-end smoke: validated the runtime code on a GCP T4 VM (DirectRunner) with Qwen/Qwen3-0.6B running through ai-dynamo[vllm] 0.7.0 + the embedded etcd path.
  • Lint: yapf, pylint (10.00/10), flake8, isort all clean for the touched files.

Not in this change (deliberate follow-ups)

  • Dataflow IT for the Dynamo path: a commented-out block is included in sdks/python/test-suites/dataflow/common.gradle documenting how to enable it. Enabling requires bumping vllm.dockerfile.old to install etcd + ai-dynamo[vllm] and provisioning an L4 (or larger) GPU pool. The existing two vLLM Dataflow ITs are unchanged.
  • Dockerfile changes: vllm.dockerfile.old is intentionally left alone. Bumping the Beam version and adding ai-dynamo[vllm] to the existing 2.58.1-based image has non-trivial dependency-resolution risk and should be handled alongside enabling the Dataflow IT.

Credits

Builds on the users/damccorm/dynamo branch and PR #36966. The runtime/API design here mirrors that work, rebased onto current master and scoped to embedded mode with explicit defaults.

VLLMCompletionsModelHandler and VLLMChatModelHandler gain two
keyword-only parameters, use_dynamo (default False) and
dynamo_frontend_kwargs. When use_dynamo=True, the handler launches a
dynamo.frontend process as the OpenAI-compatible local endpoint plus a
separate dynamo.vllm worker, instead of vllm.entrypoints.openai.api_server.
The existing native-vLLM path is unchanged when the flag is absent.

The example pipeline vllm_text_completion.py gains --use_dynamo and
--max_tokens flags. validate_inference_args is now a no-op on both
handlers so OpenAI-style request kwargs (e.g. max_tokens) can be passed
through RunInference. A new unit-test module covers process-launch
behaviour for both paths.

This supersedes apache#36966 (now closed) and rebases the embedded-Dynamo
approach onto current master, preserving the recent batching-kwargs
additions to the ModelHandler base.

Co-authored-by: Danny McCormick <damccorm@google.com>
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enables embedded NVIDIA Dynamo support within the Apache Beam vLLM ModelHandler. By launching the Dynamo frontend and vLLM worker processes on the same Beam worker, it provides a streamlined way to leverage Dynamo's capabilities for local inference tasks. The implementation includes process management for the required etcd discovery service and keeps existing native vLLM workflows backward compatible.

Highlights

  • Embedded NVIDIA Dynamo Support: Introduced an opt-in embedded mode for NVIDIA Dynamo in VLLMCompletionsModelHandler and VLLMChatModelHandler, allowing users to run Dynamo frontend and vLLM worker processes within the same Beam worker.
  • Discovery Service Integration: Added automatic local etcd management for embedded Dynamo mode, with support for external ETCD_ENDPOINTS configuration.
  • API and Example Updates: Updated VLLMCompletionsModelHandler and VLLMChatModelHandler with new parameters (use_dynamo, dynamo_frontend_kwargs) and added support for OpenAI-style request arguments.
  • Testing and Validation: Added comprehensive unit tests in vllm_inference_test.py to verify process orchestration and argument handling, and documented future Dataflow integration steps.
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces support for running NVIDIA Dynamo as an embedded vLLM engine within Apache Beam's vLLM inference handlers, including managing a local etcd process when external endpoints are not configured. The review feedback focuses on improving the robustness of process management and resource cleanup within _VLLMModelServer. Specifically, the reviewer recommends tracking and cleaning up the temporary etcd data directory to prevent disk leaks, wrapping process termination in try...except OSError blocks to avoid crashes, importing modules locally during interpreter shutdown, and adding the etcd process to the connectivity polling loop for fail-fast behavior.

Comment on lines 152 to +162
    self._model_name = model_name
    self._vllm_server_kwargs = vllm_server_kwargs
    self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
    self._server_started = False
    self._server_process = None
    self._dynamo_process = None
    self._etcd_process = None
    self._managed_etcd_endpoint = None
    self._server_port: int = -1
    self._server_process_lock = threading.RLock()
    self._use_dynamo = use_dynamo
Contributor

medium

Initialize self._etcd_data_dir to None in __init__ to track the temporary etcd data directory for proper cleanup.

Suggested change
     self._model_name = model_name
     self._vllm_server_kwargs = vllm_server_kwargs
     self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
     self._server_started = False
     self._server_process = None
     self._dynamo_process = None
     self._etcd_process = None
+    self._etcd_data_dir = None
     self._managed_etcd_endpoint = None
     self._server_port: int = -1
     self._server_process_lock = threading.RLock()
     self._use_dynamo = use_dynamo

Author

Done in 329dd28: self._etcd_data_dir: Optional[str] = None was added next to the other process handles in __init__.

Comment on lines +166 to +175
  @staticmethod
  def _stop_process(process: Optional[subprocess.Popen]) -> None:
    if process is None or process.poll() is not None:
      return
    process.terminate()
    try:
      process.wait(timeout=10)
    except subprocess.TimeoutExpired:
      process.kill()
      process.wait()
Contributor

medium

Wrap the process termination sequence in a try...except OSError block. If a process exits concurrently between the poll() check and terminate()/kill(), calling these methods can raise an OSError (such as ProcessLookupError), which would crash the cleanup sequence and leave other processes running.

Suggested change
   @staticmethod
   def _stop_process(process: Optional[subprocess.Popen]) -> None:
     if process is None or process.poll() is not None:
       return
-    process.terminate()
-    try:
-      process.wait(timeout=10)
-    except subprocess.TimeoutExpired:
-      process.kill()
-      process.wait()
+    try:
+      process.terminate()
+      try:
+        process.wait(timeout=10)
+      except subprocess.TimeoutExpired:
+        process.kill()
+        process.wait()
+    except OSError:
+      pass

Author

Done in 329dd28 — the terminate / wait / kill calls are now wrapped in a single try/except OSError so a ProcessLookupError (or any other OS-level race) is swallowed instead of aborting the rest of _stop_processes.
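For readers who want to try the guarded shutdown in isolation, here is a standalone sketch of the same terminate, wait, then kill sequence. `stop_process` and its `grace_secs` parameter are illustrative names rather than the method in this PR; the 10-second default mirrors the snippet under review.

```python
import subprocess
from typing import Optional


def stop_process(
    process: Optional[subprocess.Popen], grace_secs: float = 10) -> None:
  """Stops a child process, tolerating a child that exits on its own."""
  if process is None or process.poll() is not None:
    return  # Nothing was started, or it has already exited.
  try:
    process.terminate()  # Polite request first.
    try:
      process.wait(timeout=grace_secs)
    except subprocess.TimeoutExpired:
      process.kill()  # Escalate once the grace period has passed.
      process.wait()  # Reap the child so it does not linger as a zombie.
  except OSError:
    # The child vanished between poll() and the signal call; treat as stopped.
    pass
```

Because the function never raises for an already-dead child, a caller that stops several processes in sequence can rely on reaching the last one.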

Comment on lines +177 to +189
  def _stop_processes(self) -> None:
    self._stop_process(self._dynamo_process)
    self._stop_process(self._server_process)
    self._stop_process(self._etcd_process)
    if (self._managed_etcd_endpoint is not None and
        os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
      del os.environ['ETCD_ENDPOINTS']
    self._dynamo_process = None
    self._server_process = None
    self._etcd_process = None
    self._managed_etcd_endpoint = None
    self._server_started = False
    self._server_port = -1
Contributor

medium

To prevent disk space leaks in /tmp from abandoned etcd data directories, and to avoid potential AttributeError or NameError during interpreter shutdown (when global modules like os or shutil might be garbage-collected), we should:

  1. Clean up the temporary etcd data directory using shutil.rmtree in _stop_processes.
  2. Import os and shutil locally inside _stop_processes to ensure they are available during interpreter shutdown.
  3. Use local_os.environ.pop to safely remove the environment variable without raising a KeyError.
  def _stop_processes(self) -> None:
    import os as local_os
    import shutil as local_shutil
    self._stop_process(self._dynamo_process)
    self._stop_process(self._server_process)
    self._stop_process(self._etcd_process)
    if (self._managed_etcd_endpoint is not None and
        local_os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
      local_os.environ.pop('ETCD_ENDPOINTS', None)
    if getattr(self, '_etcd_data_dir', None):
      local_shutil.rmtree(self._etcd_data_dir, ignore_errors=True)
      self._etcd_data_dir = None
    self._dynamo_process = None
    self._server_process = None
    self._etcd_process = None
    self._managed_etcd_endpoint = None
    self._server_started = False
    self._server_port = -1

Author

Mostly accepted in 329dd28:

  • shutil.rmtree(self._etcd_data_dir, ignore_errors=True) is now called from _stop_processes, so worker restarts no longer leak /tmp/beam-dynamo-etcd-* dirs.
  • del os.environ['ETCD_ENDPOINTS'] was switched to os.environ.pop('ETCD_ENDPOINTS', None) so the cleanup is idempotent.

I skipped the local_os/local_shutil rebinding inside _stop_processes. The shutdown-safety concern is real, but os/shutil are already used unguarded throughout the rest of this class (_uses_embedded_etcd, _ensure_etcd, _wait_for_etcd), so threading the local-imports pattern through only _stop_processes is inconsistent and a bit hard to read. Instead I wrapped __del__ itself in try/except Exception — that gives the same finalizer-during-shutdown guarantee with a single guard and zero churn elsewhere.
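The combination described in this reply (idempotent cleanup plus a single guard in the finalizer) can be sketched on a small standalone class. `ManagedEtcdState` is a hypothetical stand-in for the relevant parts of the model server, and `tempfile.mkdtemp` is swapped in for the PR's name-based /tmp path to keep the example short.

```python
import os
import shutil
import tempfile
from typing import Optional


class ManagedEtcdState:
  """Hypothetical holder for the etcd temp dir and the env var it set."""

  def __init__(self, endpoint: str):
    # mkdtemp is used for brevity; the PR builds a /tmp path from a UUID name.
    self.data_dir: Optional[str] = tempfile.mkdtemp(
        prefix='beam-dynamo-etcd-')
    self.endpoint: Optional[str] = endpoint
    os.environ['ETCD_ENDPOINTS'] = endpoint

  def cleanup(self) -> None:
    """Safe to call repeatedly; later calls find nothing left to do."""
    # Only unset the variable if it still holds the value this object set.
    if (self.endpoint is not None and
        os.environ.get('ETCD_ENDPOINTS') == self.endpoint):
      os.environ.pop('ETCD_ENDPOINTS', None)
    if self.data_dir:
      shutil.rmtree(self.data_dir, ignore_errors=True)
    self.data_dir = None
    self.endpoint = None

  def __del__(self):
    # One guard at the finalizer, so cleanup errors never escape while the
    # interpreter is shutting down.
    try:
      self.cleanup()
    except Exception:
      pass
```

The ownership check before `pop` matters when a user exports their own `ETCD_ENDPOINTS` after the handler started: the cleanup then leaves that value alone.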

Comment on lines +240 to +261
    etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
    etcd_data_dir = f'/tmp/{etcd_name}'
    peer_port, = subprocess_server.pick_port(None)
    etcd_cmd = [
        'etcd',
        '--name',
        etcd_name,
        '--listen-client-urls',
        'http://127.0.0.1:{{PORT}}',
        '--advertise-client-urls',
        'http://127.0.0.1:{{PORT}}',
        '--listen-peer-urls',
        f'http://127.0.0.1:{peer_port}',
        '--initial-advertise-peer-urls',
        f'http://127.0.0.1:{peer_port}',
        '--initial-cluster',
        f'{etcd_name}=http://127.0.0.1:{peer_port}',
        '--data-dir',
        etcd_data_dir,
        '--log-level',
        'warn',
    ]
Contributor

medium

Store the temporary etcd data directory path in self._etcd_data_dir instead of a local variable so that it can be cleaned up when stopping the processes.

    etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
    self._etcd_data_dir = f'/tmp/{etcd_name}'
    peer_port, = subprocess_server.pick_port(None)
    etcd_cmd = [
        'etcd',
        '--name',
        etcd_name,
        '--listen-client-urls',
        'http://127.0.0.1:{{PORT}}',
        '--advertise-client-urls',
        'http://127.0.0.1:{{PORT}}',
        '--listen-peer-urls',
        f'http://127.0.0.1:{peer_port}',
        '--initial-advertise-peer-urls',
        f'http://127.0.0.1:{peer_port}',
        '--initial-cluster',
        f'{etcd_name}=http://127.0.0.1:{peer_port}',
        '--data-dir',
        self._etcd_data_dir,
        '--log-level',
        'warn',
    ]

Author

Done in 329dd28: _ensure_etcd now writes the temp directory to self._etcd_data_dir, and _stop_processes is the one place that owns the rmtree.

Comment on lines +318 to +321
      while (time.time() - start_time < timeout_secs and
             self._server_process.poll() is None and
             (self._dynamo_process is None or
              self._dynamo_process.poll() is None)):
Contributor

medium

Add a check for self._etcd_process in the connectivity polling loop. If the embedded etcd process dies, the frontend and engine won't be able to communicate, and the server will fail. Checking it here allows the loop to fail fast instead of waiting for the full 10-minute timeout.

      while (time.time() - start_time < timeout_secs and
             self._server_process.poll() is None and
             (self._dynamo_process is None or
              self._dynamo_process.poll() is None) and
             (self._etcd_process is None or
              self._etcd_process.poll() is None)):

Author

Done in 329dd28 — the check_connectivity loop now also exits early if self._etcd_process.poll() is not None, so a dead embedded etcd fails fast instead of waiting out the 10-minute timeout_secs.
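The fail-fast idea generalizes to any set of cooperating child processes. Below is a minimal sketch, assuming a caller-supplied readiness probe; `wait_until_ready`, `is_ready`, and the polling interval are hypothetical, while the 600-second default matches the 10-minute timeout mentioned in this thread.

```python
import subprocess
import time
from typing import Callable, Iterable, Optional


def wait_until_ready(
    processes: Iterable[Optional[subprocess.Popen]],
    is_ready: Callable[[], bool],
    timeout_secs: float = 600,
    poll_interval_secs: float = 0.1) -> None:
  """Blocks until is_ready() is true, failing early if any child has died."""
  watched = [p for p in processes if p is not None]  # Skip unused slots.
  start_time = time.time()
  while time.time() - start_time < timeout_secs:
    for process in watched:
      if process.poll() is not None:
        raise RuntimeError(
            f'Process {process.args!r} exited with code '
            f'{process.returncode} before the server was ready.')
    if is_ready():
      return
    time.sleep(poll_interval_secs)
  raise TimeoutError(f'Server was not ready after {timeout_secs} seconds.')
```

Passing the server, Dynamo worker, and etcd handles together means a crash in any one of them surfaces within one polling interval, and the error message names the process that died.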

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Apply five robustness fixes flagged on PR apache#38701:

- Track the temporary etcd data dir as self._etcd_data_dir and
  shutil.rmtree(..., ignore_errors=True) it in _stop_processes so worker
  restarts don't leak /tmp directories.
- Wrap process.terminate() / process.wait() / process.kill() in a single
  try/except OSError to absorb the ProcessLookupError race when a process
  exits between poll() and the signal call.
- Switch the ETCD_ENDPOINTS removal from `del os.environ[...]` to
  `os.environ.pop(..., None)` to be idempotent.
- Wrap __del__ in try/except Exception so cleanup never raises during
  interpreter shutdown.
- Add the embedded etcd process to the check_connectivity() poll loop so
  an etcd death fails fast instead of waiting out the 10-minute timeout.
@github-actions
Contributor

Assigning reviewers:

R: @damccorm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

@damccorm damccorm left a comment

Thanks for taking this forward! I think we need a running integration test to feel good about merging, given the constraints I ran into in the original PR, but this makes good progress.

Comment on lines +487 to +488
// The embedded mode was validated end-to-end on a T4 VM with
// Qwen/Qwen3-0.6B via DirectRunner; this change is scoped to the
Contributor

Given that this was validated locally with a T4, why do we need an L4?

We should try running this on Dataflow, which has built-in accelerator support; that way we don't need to provision our own pool.

Author

Resolved in 6f8849169b2 — now uncommented and uses nvidia-tesla-t4 (no separate pool) per your suggestion. Validation details in the top-level reply.

Bump vllm.dockerfile.old to apache-beam[gcp]==2.71.0 (and the
COPY-from beam_python3.12_sdk image to 2.71.0), install
ai-dynamo[vllm], and add the etcd binary required by embedded
Dynamo's runtime discovery.

Uncomment the Dynamo IT block in common.gradle. Drop the unused
machine_type override so it inherits n1-standard-4 from argMap,
and switch nvidia-l4 -> nvidia-tesla-t4 to match the existing
native vLLM ITs and the local Dataflow validation (per @damccorm
review).

Validated end-to-end on Dataflow with Qwen/Qwen3-0.6B; the
nvext.timing field present on every PredictionResult confirms the
Dynamo frontend served the requests.
@akshayjadiyanv
Author

Thanks Danny. Pushed 6f8849169b2: vllm.dockerfile.old is bumped to apache-beam[gcp]==2.71.0 + matching COPY source, with ai-dynamo[vllm] and etcd v3.5.13 added. common.gradle's Dynamo IT is uncommented; it now inherits n1-standard-4 from argMap (like the existing vLLM ITs) and uses nvidia-tesla-t4 per your suggestion. No separate GPU pool — per-job worker_accelerator experiment.

Validated end-to-end on Dataflow against an image built from the updated dockerfile, with --sdk_container_image=<my-image>. Job 2026-05-27_10_43_15-18102219387511428172 finished JOB_STATE_DONE with 5 coherent Qwen3-0.6B completions in GCS. The nvext.timing field on every Completion confirms the Dynamo frontend served the request (vanilla vLLM's OpenAI server doesn't emit it). Worker logs confirm Tesla T4 attached.
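The nvext.timing check described above could be automated along these lines. This is only a sketch: `served_by_dynamo` is a hypothetical helper, and it assumes each completion has been converted to a plain dictionary with `nvext` as a nested mapping, which may not match how the client library exposes extra response fields.

```python
from collections.abc import Mapping
from typing import Any


def served_by_dynamo(completion: Mapping[str, Any]) -> bool:
  """Returns True when the response carries the nvext.timing field."""
  nvext = completion.get('nvext')
  # Assumption: nvext is a nested mapping when present.
  return isinstance(nvext, Mapping) and 'timing' in nvext
```

An integration test could assert this for every result, which would catch a silent fallback to the native vLLM server.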

Ask: when convenient, please rebuild us.gcr.io/apache-beam-testing/python-postcommit-it/vllm:latest from the updated dockerfile so the new Dynamo IT in common.gradle (which reuses that tag) can actually run in apache-beam-testing. Happy to coordinate on a staged tag first if you'd prefer.

@akshayjadiyanv akshayjadiyanv requested a review from damccorm May 28, 2026 15:04