Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Changed
^^^^^^^

* Extended the :attr:`~isaaclab_ov.renderers.OVRTXRendererCfg.use_ovrtx_cloning` path to support
heterogeneous scenes as well as homogeneous ones. :meth:`~isaaclab_ov.renderers.OVRTXRenderer.prepare_stage`
now exports only :class:`~isaaclab.cloner.ClonePlan` source prototypes plus global stage metadata, and
replication uses per-row ``clone_usd`` calls from the published plan instead of cloning only
``/World/envs/env_0``.
116 changes: 91 additions & 25 deletions source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import math
import os
from itertools import compress
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand All @@ -41,6 +42,7 @@

from ovrtx import Device, PrimMode, Renderer, RendererConfig, Semantic

from isaaclab.cloner.clone_plan import ClonePlan
from isaaclab.renderers import BaseRenderer, RenderBufferKind, RenderBufferSpec
from isaaclab.sim import SimulationContext
from isaaclab.utils.warp.warp_math import convert_camera_frame_orientation_convention_wp
Expand Down Expand Up @@ -170,14 +172,7 @@ def __init__(self, cfg: OVRTXRendererCfg):
self._exported_usd_string: str | None = None
self._camera_rel_path: str | None = None
self._output_semantic_color_buffer: wp.array | None = None

self._use_ovrtx_cloning = self.cfg.use_ovrtx_cloning

if self._use_ovrtx_cloning:
clone_plan = SimulationContext.instance().get_clone_plan()
if clone_plan and not clone_plan.clone_mask.all().item():
logger.warning("OVRTX cloning disabled because the simulation uses a heterogeneous env setup")
self._use_ovrtx_cloning = False
self._clone_plan: ClonePlan | None = None

logger.info("Creating OVRTX renderer...")
OVRTX_CONFIG = RendererConfig(
Expand Down Expand Up @@ -213,19 +208,70 @@ def prepare_cameras(self, stage: Any, spec: CameraRenderSpec) -> None:
return
apply_rtx_exposure_overrides(stage, list(spec.camera_prim_paths))

@staticmethod
def _resolve_clone_plan(num_envs: int) -> ClonePlan:
"""Resolve clone plan for local use.

If no clone plan is published by the scene or it has no active rows, returns a homogeneous clone plan.
If the clone plan has some inactive rows, returns a copy of the published clone plan with only the active rows.
If the clone plan has all active rows, returns the published clone plan (shallow copy).
"""

def _create_homogeneous_clone_plan(num_envs: int) -> ClonePlan:
"""Homogeneous fallback plan: replicate ``env_0`` to every environment."""
return ClonePlan(
sources=("/World/envs/env_0",),
destinations=("/World/envs/env_{}",),
clone_mask=torch.ones((1, num_envs), dtype=torch.bool, device="cpu"),
)

published_clone_plan = SimulationContext.instance().get_clone_plan()

if published_clone_plan is None:
logger.warning("No clone plan is published by the scene; returning homogeneous clone plan")
return _create_homogeneous_clone_plan(num_envs)

active_rows = published_clone_plan.clone_mask.any(dim=1)

# If no rows are active, return a homogeneous clone plan.
if not active_rows.any():
logger.warning("Clone plan has no active rows, returning homogeneous clone plan")
return _create_homogeneous_clone_plan(num_envs)

# If some rows are inactive, return a copy of the published clone plan with only the active rows.
if not active_rows.all():
logger.warning("Clone plan has some inactive rows; returning a copy with only active rows")
active = active_rows.tolist()
return ClonePlan(
sources=tuple(compress(published_clone_plan.sources, active)),
destinations=tuple(compress(published_clone_plan.destinations, active)),
clone_mask=published_clone_plan.clone_mask[active_rows],
)

# If all rows are active, return the published clone plan (shallow copy).
return published_clone_plan

def prepare_stage(self, stage: Any, num_envs: int) -> None:
"""Prepare the USD stage for OVRTX before :meth:`create_render_data`.

Adds cloning attributes and exports the stage to a string held on the renderer until
Adds scene partition attributes and exports the stage to a string held on the renderer until
:meth:`create_render_data` is called.
"""
if stage is None:
return

logger.info("Preparing stage for export (%d envs, cloning=%s)...", num_envs, self._use_ovrtx_cloning)
create_scene_partition_attributes(stage, num_envs, self._use_ovrtx_cloning)
logger.info("Preparing stage (%d envs, use_ovrtx_cloning=%s)...", num_envs, self.cfg.use_ovrtx_cloning)
create_scene_partition_attributes(stage, num_envs)

if self.cfg.use_ovrtx_cloning:
self._clone_plan = self._resolve_clone_plan(num_envs)
assert self._clone_plan is not None, "Clone plan is required when OVRTX cloning is enabled"
Copy link
Copy Markdown
Collaborator

@pbarejko pbarejko May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this runtime error please? Throwing an exception is more Pythonic way of handling runtime errors. Placing assertion here you essentially delay error downstream.

Python skips assertions when run in optimized mode -O.

source_paths = self._clone_plan.sources
else:
self._clone_plan = None
source_paths = ()

self._exported_usd_string = export_stage_to_string(stage, num_envs, self._use_ovrtx_cloning)
self._exported_usd_string = export_stage_to_string(stage, num_envs, self.cfg.use_ovrtx_cloning, source_paths)

def _initialize_from_spec(self, spec: CameraRenderSpec):
"""Initialize the OVRTX renderer with internal environment cloning.
Expand Down Expand Up @@ -279,9 +325,9 @@ def _initialize_from_spec(self, spec: CameraRenderSpec):
logger.exception("Error loading USD: %s", e)
raise

if self._use_ovrtx_cloning and num_envs > 1:
if self.cfg.use_ovrtx_cloning and num_envs > 1:
logger.info("Using OVRTX internal cloning")
self._clone_environments_in_ovrtx(num_envs)
self._clone_sources_in_ovrtx(num_envs)
self._update_scene_partitions_after_clone(num_envs)

self._initialized_scene = True
Expand Down Expand Up @@ -311,17 +357,37 @@ def _initialize_from_spec(self, spec: CameraRenderSpec):

self._setup_object_bindings()

def _clone_environments_in_ovrtx(self, num_envs: int):
"""Clone base environment (env_0) to all other environments using OvRTX."""
logger.info("Cloning base environment to %d targets...", num_envs - 1)
source_path = "/World/envs/env_0"
target_paths = [f"/World/envs/env_{i}" for i in range(1, num_envs)]
try:
self._renderer.clone_usd(source_path, target_paths)
logger.info("Cloned %d environments successfully", len(target_paths))
except Exception as e:
logger.error("Failed to clone environments: %s", e)
raise RuntimeError(f"OvRTX environment cloning failed: {e}")
def _clone_sources_in_ovrtx(self, num_envs: int):
"""Clone sources in OVRTX using the scene :class:`~isaaclab.cloner.ClonePlan`."""
logger.info("Cloning sources in OVRTX...")

clone_plan = self._clone_plan
assert clone_plan is not None, "Clone plan is required when OVRTX cloning is enabled"

num_envs = clone_plan.clone_mask.shape[1]
env_ids = torch.arange(num_envs, dtype=torch.int32, device=clone_plan.clone_mask.device)

num_cloned_sources = 0

for i, (source, destination) in enumerate(zip(clone_plan.sources, clone_plan.destinations, strict=True)):
target_env_ids = env_ids[clone_plan.clone_mask[i]].tolist()

target_paths = []
for env_id in target_env_ids:
resolved_destination = destination.format(env_id)
if resolved_destination != source:
target_paths.append(resolved_destination)

if target_paths:
logger.debug("Cloning row %d: %s -> %d target(s)", i, source, len(target_paths))
try:
self._renderer.clone_usd(source, target_paths)
num_cloned_sources += 1
except Exception as e:
logger.error("Failed to clone row %d from %s: %s", i, source, e)
raise RuntimeError(f"Cloning failed for source '{source}': {e}")

logger.info("Cloned %d sources successfully", num_cloned_sources)

def _update_scene_partitions_after_clone(self, num_envs: int):
"""Update scene partition attributes on cloned environments and cameras in OvRTX."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ class OVRTXRendererCfg(RendererCfg):
"""

use_ovrtx_cloning: bool = True
"""When True, export only env_0 and use OVRTX ``clone_usd``. When False, export full multi-environment stage.
"""When True, export cloning sources and use OVRTX ``clone_usd``. When False, export full multi-environment stage.

OVRTX cloning is only supported in OVRTX 0.3.0 or newer.

If the simulation uses a heterogeneous env setup, the renderer disables this path and exports the full
multi-environment stage instead (same effect as setting this to ``False`` for that run).
"""

log_level: str = "verbose"
Expand Down
121 changes: 93 additions & 28 deletions source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_usd.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

"""USD manipulation for OVRTX: Render scope building, camera injection, and stage prim activation."""

from __future__ import annotations

import logging
import math
import re
from collections.abc import Callable

from pxr import Sdf, Usd, UsdGeom

Expand Down Expand Up @@ -178,24 +182,17 @@ def build_render_product_as_string(
def create_scene_partition_attributes(
stage,
num_envs: int = 1,
use_ovrtx_cloning: bool = True,
) -> None:
"""Create scene partition attributes for env roots and cameras.

If use_ovrtx_cloning is True, only env_0 is exported for OVRTX; env_1..env_{n-1} are deactivated before export.
OVRTX clones env_0 internally and _update_scene_partitions_after_clone sets partition attributes on the clones.
So we only need to set attributes on env_0 here.

Camera prims are discovered by USD type (``UsdGeom.Camera``) rather than by name, so this works regardless of
where the camera is placed in the hierarchy.

Args:
stage: USD stage to modify.
num_envs: Number of environments.
use_ovrtx_cloning: Whether OVRTX cloning is enabled.
"""
env_indices = [0] if use_ovrtx_cloning else range(num_envs)
for env_idx in env_indices:
for env_idx in range(num_envs):
env_path = f"/World/envs/env_{env_idx}"
env_prim = stage.GetPrimAtPath(env_path)
if not env_prim.IsValid():
Expand All @@ -217,38 +214,106 @@ def create_scene_partition_attributes(
logger.debug("Set scene partition '%s' on camera '%s'", scene_partition, prim.GetPath())


def export_stage_to_string(stage, num_envs: int, use_ovrtx_cloning: bool = True) -> str:
"""Export the stage to a string; when num_envs > 1, only env_0 is exported for OVRTX cloning.
def _deactivate_child_prims(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic gets progressively more complex. Should we add unit test for this feature that we properly deactivate child prims?

prim: Usd.Prim,
source_paths: frozenset[Sdf.Path],
deactivated_prims: list[Usd.Prim],
should_keep_prim: Callable[[Sdf.Path], bool] | None = None,
) -> None:
"""Deactivate child prims under ``prim`` that are outside all source prototype subtrees.

For each child:

* **Source:** keep the full subtree and stop descending.
* **Ancestor of a source:** recurse to deactivate non-source siblings deeper in the tree.
* **Otherwise:** deactivate the child prim (including descendants).

Only prims that were active before deactivation are recorded in ``deactivated_prims`` for
reactivation after export.

Args:
prim: Parent prim whose children are considered.
source_paths: The paths to the cloning sources.
deactivated_prims: Prims deactivated by this call; used to reactivate them after export.
should_keep_prim: Optional predicate on each child path. If provided, a child prim for
which this returns ``True`` is retained (not deactivated and not descended). If not
provided, every child is considered.
"""
for child in list(prim.GetChildren()):
child_path = child.GetPath()

# If the optional predicate is provided and returns True, keep the prim and stop walking down the tree.
if should_keep_prim is not None and should_keep_prim(child_path):
continue

# If the child is a source, keep it and stop walking down the tree.
if child_path in source_paths:
continue

# If the child is an ancestor of some source, recurse to deactivate non-source siblings deeper in the tree.
if any(source.HasPrefix(child_path) for source in source_paths):
_deactivate_child_prims(child, source_paths, deactivated_prims, should_keep_prim)
continue

# Deactivate the child and record it for reactivation after export.
if child.IsActive():
child.SetActive(False)
deactivated_prims.append(child)
logger.debug("Deactivated prim: %s", child_path)


def export_stage_to_string(
stage,
num_envs: int,
use_ovrtx_cloning: bool = True,
source_paths: tuple[str, ...] = (),
) -> str:
"""Export the USD stage as a USDA string for OVRTX loading.

When num_envs > 1, deactivates env_1..env_{num_envs-1} before export and reactivates
them after, so the exported content contains only env_0. The stage is modified in place.
When ``use_ovrtx_cloning`` is disabled or ``num_envs`` is 1, the full stage is exported
unchanged. Otherwise the stage is trimmed so OVRTX receives only the prototype geometry
it will replicate with ``clone_usd``.

Args:
stage: USD stage to export.
num_envs: Number of environments.
use_ovrtx_cloning: Whether OVRTX cloning is enabled.
num_envs: Number of parallel environments on the stage.
use_ovrtx_cloning: When ``True`` and ``num_envs > 1``, export only clone-plan prototypes;
otherwise export the full stage.
source_paths: The paths to source prims to clone. Required when ``use_ovrtx_cloning`` is ``True``.

Returns:
The exported stage as a string.
USDA text of the (possibly trimmed) stage.
"""
deactivated_prims = []
if use_ovrtx_cloning and num_envs > 1:
logger.info("Deactivating %d environment roots...", num_envs - 1)
for env_idx in range(1, num_envs):
env_path = f"/World/envs/env_{env_idx}"
prim = stage.GetPrimAtPath(env_path)
if prim.IsValid() and prim.IsActive():
prim.SetActive(False)
deactivated_prims.append(prim)
logger.debug("Deactivated environment root: %s", env_path)

logger.info("Deactivated %d environment roots in total", len(deactivated_prims))
if not use_ovrtx_cloning or num_envs <= 1:
return stage.ExportToString()

envs_path = Sdf.Path("/World/envs")
envs_prim = stage.GetPrimAtPath(envs_path)
if not envs_prim.IsValid():
raise RuntimeError(f"Failed to get prim at path: {envs_path}")

env_name_pattern = re.compile(r"^env_(\d+)$")

def _is_env_root(prim_path: Sdf.Path) -> bool:
"""Return True if ``prim_path`` is an env root (e.g. ``/World/envs/env_0``)."""
return prim_path.GetParentPath() == envs_path and env_name_pattern.fullmatch(prim_path.name) is not None

deactivated_prims: list[Usd.Prim] = []

_deactivate_child_prims(
envs_prim,
frozenset(map(Sdf.Path, source_paths)),
deactivated_prims,
should_keep_prim=lambda prim_path: not _is_env_root(prim_path),
)

logger.info("Deactivated %d prims in total", len(deactivated_prims))

try:
return stage.ExportToString()
finally:
if deactivated_prims:
logger.info("Reactivating %d environment roots...", len(deactivated_prims))
logger.info("Reactivating %d prims...", len(deactivated_prims))
for prim in deactivated_prims:
if prim.IsValid():
prim.SetActive(True)
Loading