diff --git a/source/isaaclab_ov/changelog.d/ovrtx-heterogeneous-cloning.rst b/source/isaaclab_ov/changelog.d/ovrtx-heterogeneous-cloning.rst new file mode 100644 index 00000000000..3d10587434e --- /dev/null +++ b/source/isaaclab_ov/changelog.d/ovrtx-heterogeneous-cloning.rst @@ -0,0 +1,8 @@ +Changed +^^^^^^^ + +* Extended the :attr:`~isaaclab_ov.renderers.OVRTXRendererCfg.use_ovrtx_cloning` path to support + heterogeneous scenes as well as homogeneous ones. :meth:`~isaaclab_ov.renderers.OVRTXRenderer.prepare_stage` + now exports only :class:`~isaaclab.cloner.ClonePlan` source prototypes plus global stage metadata, and + replication uses per-row ``clone_usd`` calls from the published plan instead of cloning only + ``/World/envs/env_0``. diff --git a/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer.py b/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer.py index 9d31953a521..4214b7f3764 100644 --- a/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer.py +++ b/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer.py @@ -22,6 +22,7 @@ import logging import math import os +from itertools import compress from pathlib import Path from typing import TYPE_CHECKING, Any @@ -41,6 +42,7 @@ from ovrtx import Device, PrimMode, Renderer, RendererConfig, Semantic +from isaaclab.cloner.clone_plan import ClonePlan from isaaclab.renderers import BaseRenderer, RenderBufferKind, RenderBufferSpec from isaaclab.sim import SimulationContext from isaaclab.utils.warp.warp_math import convert_camera_frame_orientation_convention_wp @@ -170,14 +172,7 @@ def __init__(self, cfg: OVRTXRendererCfg): self._exported_usd_string: str | None = None self._camera_rel_path: str | None = None self._output_semantic_color_buffer: wp.array | None = None - - self._use_ovrtx_cloning = self.cfg.use_ovrtx_cloning - - if self._use_ovrtx_cloning: - clone_plan = SimulationContext.instance().get_clone_plan() - if clone_plan and not clone_plan.clone_mask.all().item(): - logger.warning("OVRTX cloning disabled because the simulation uses a heterogeneous env setup") - self._use_ovrtx_cloning = False + self._clone_plan: ClonePlan | None = None logger.info("Creating OVRTX renderer...") OVRTX_CONFIG = RendererConfig( @@ -213,19 +208,70 @@ def prepare_cameras(self, stage: Any, spec: CameraRenderSpec) -> None: return apply_rtx_exposure_overrides(stage, list(spec.camera_prim_paths)) + @staticmethod + def _resolve_clone_plan(num_envs: int) -> ClonePlan: + """Resolve clone plan for local use. + + If no clone plan is published by the scene or it has no active rows, returns a homogeneous clone plan. + If the clone plan has some inactive rows, returns a copy of the published clone plan with only the active rows. + If the clone plan has all active rows, returns the published clone plan (shallow copy). + """ + + def _create_homogeneous_clone_plan(num_envs: int) -> ClonePlan: + """Homogeneous fallback plan: replicate ``env_0`` to every environment.""" + return ClonePlan( + sources=("/World/envs/env_0",), + destinations=("/World/envs/env_{}",), + clone_mask=torch.ones((1, num_envs), dtype=torch.bool, device="cpu"), + ) + + published_clone_plan = SimulationContext.instance().get_clone_plan() + + if published_clone_plan is None: + logger.warning("No clone plan is published by the scene; returning homogeneous clone plan") + return _create_homogeneous_clone_plan(num_envs) + + active_rows = published_clone_plan.clone_mask.any(dim=1) + + # If no rows are active, return a homogeneous clone plan. + if not active_rows.any(): + logger.warning("Clone plan has no active rows, returning homogeneous clone plan") + return _create_homogeneous_clone_plan(num_envs) + + # If some rows are inactive, return a copy of the published clone plan with only the active rows. + if not active_rows.all(): + logger.warning("Clone plan has some inactive rows; returning a copy with only active rows") + active = active_rows.tolist() + return ClonePlan( + sources=tuple(compress(published_clone_plan.sources, active)), + destinations=tuple(compress(published_clone_plan.destinations, active)), + clone_mask=published_clone_plan.clone_mask[active_rows], + ) + + # If all rows are active, return the published clone plan (shallow copy). + return published_clone_plan + def prepare_stage(self, stage: Any, num_envs: int) -> None: """Prepare the USD stage for OVRTX before :meth:`create_render_data`. - Adds cloning attributes and exports the stage to a string held on the renderer until + Adds scene partition attributes and exports the stage to a string held on the renderer until :meth:`create_render_data` is called. """ if stage is None: return - logger.info("Preparing stage for export (%d envs, cloning=%s)...", num_envs, self._use_ovrtx_cloning) - create_scene_partition_attributes(stage, num_envs, self._use_ovrtx_cloning) + logger.info("Preparing stage (%d envs, use_ovrtx_cloning=%s)...", num_envs, self.cfg.use_ovrtx_cloning) + create_scene_partition_attributes(stage, num_envs) + + if self.cfg.use_ovrtx_cloning: + self._clone_plan = self._resolve_clone_plan(num_envs) + assert self._clone_plan is not None, "Clone plan is required when OVRTX cloning is enabled" + source_paths = self._clone_plan.sources + else: + self._clone_plan = None + source_paths = () - self._exported_usd_string = export_stage_to_string(stage, num_envs, self._use_ovrtx_cloning) + self._exported_usd_string = export_stage_to_string(stage, num_envs, self.cfg.use_ovrtx_cloning, source_paths) def _initialize_from_spec(self, spec: CameraRenderSpec): """Initialize the OVRTX renderer with internal environment cloning. @@ -279,9 +325,9 @@ def _initialize_from_spec(self, spec: CameraRenderSpec): logger.exception("Error loading USD: %s", e) raise - if self._use_ovrtx_cloning and num_envs > 1: + if self.cfg.use_ovrtx_cloning and num_envs > 1: logger.info("Using OVRTX internal cloning") - self._clone_environments_in_ovrtx(num_envs) + self._clone_sources_in_ovrtx(num_envs) self._update_scene_partitions_after_clone(num_envs) self._initialized_scene = True @@ -311,17 +357,37 @@ def _initialize_from_spec(self, spec: CameraRenderSpec): self._setup_object_bindings() - def _clone_environments_in_ovrtx(self, num_envs: int): - """Clone base environment (env_0) to all other environments using OvRTX.""" - logger.info("Cloning base environment to %d targets...", num_envs - 1) - source_path = "/World/envs/env_0" - target_paths = [f"/World/envs/env_{i}" for i in range(1, num_envs)] - try: - self._renderer.clone_usd(source_path, target_paths) - logger.info("Cloned %d environments successfully", len(target_paths)) - except Exception as e: - logger.error("Failed to clone environments: %s", e) - raise RuntimeError(f"OvRTX environment cloning failed: {e}") + def _clone_sources_in_ovrtx(self, num_envs: int): + """Clone sources in OVRTX using the scene :class:`~isaaclab.cloner.ClonePlan`.""" + logger.info("Cloning sources in OVRTX...") + + clone_plan = self._clone_plan + assert clone_plan is not None, "Clone plan is required when OVRTX cloning is enabled" + + num_envs = clone_plan.clone_mask.shape[1] + env_ids = torch.arange(num_envs, dtype=torch.int32, device=clone_plan.clone_mask.device) + + num_cloned_sources = 0 + + for i, (source, destination) in enumerate(zip(clone_plan.sources, clone_plan.destinations, strict=True)): + target_env_ids = env_ids[clone_plan.clone_mask[i]].tolist() + + target_paths = [] + for env_id in target_env_ids: + resolved_destination = destination.format(env_id) + if resolved_destination != source: + target_paths.append(resolved_destination) + + if target_paths: + logger.debug("Cloning row %d: %s -> %d target(s)", i, source, len(target_paths)) + try: + self._renderer.clone_usd(source, target_paths) + num_cloned_sources += 1 + except Exception as e: + logger.error("Failed to clone row %d from %s: %s", i, source, e) + raise RuntimeError(f"Cloning failed for source '{source}': {e}") + + logger.info("Cloned %d sources successfully", num_cloned_sources) def _update_scene_partitions_after_clone(self, num_envs: int): """Update scene partition attributes on cloned environments and cameras in OvRTX.""" diff --git a/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer_cfg.py b/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer_cfg.py index f60affcdc77..018c198f163 100644 --- a/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer_cfg.py +++ b/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_renderer_cfg.py @@ -32,12 +32,9 @@ class OVRTXRendererCfg(RendererCfg): """ use_ovrtx_cloning: bool = True - """When True, export only env_0 and use OVRTX ``clone_usd``. When False, export full multi-environment stage. + """When True, export cloning sources and use OVRTX ``clone_usd``. When False, export full multi-environment stage. OVRTX cloning is only supported in OVRTX 0.3.0 or newer. - - If the simulation uses a heterogeneous env setup, the renderer disables this path and exports the full - multi-environment stage instead (same effect as setting this to ``False`` for that run). """ log_level: str = "verbose" diff --git a/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_usd.py b/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_usd.py index 1557349cd6b..3a6c8350a32 100644 --- a/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_usd.py +++ b/source/isaaclab_ov/isaaclab_ov/renderers/ovrtx_usd.py @@ -5,8 +5,12 @@ """USD manipulation for OVRTX: Render scope building, camera injection, and stage prim activation.""" +from __future__ import annotations + import logging import math +import re +from collections.abc import Callable from pxr import Sdf, Usd, UsdGeom @@ -178,24 +182,17 @@ def build_render_product_as_string( def create_scene_partition_attributes( stage, num_envs: int = 1, - use_ovrtx_cloning: bool = True, ) -> None: """Create scene partition attributes for env roots and cameras. - If use_ovrtx_cloning is True, only env_0 is exported for OVRTX; env_1..env_{n-1} are deactivated before export. - OVRTX clones env_0 internally and _update_scene_partitions_after_clone sets partition attributes on the clones. - So we only need to set attributes on env_0 here. - Camera prims are discovered by USD type (``UsdGeom.Camera``) rather than by name, so this works regardless of where the camera is placed in the hierarchy. Args: stage: USD stage to modify. num_envs: Number of environments. - use_ovrtx_cloning: Whether OVRTX cloning is enabled. """ - env_indices = [0] if use_ovrtx_cloning else range(num_envs) - for env_idx in env_indices: + for env_idx in range(num_envs): env_path = f"/World/envs/env_{env_idx}" env_prim = stage.GetPrimAtPath(env_path) if not env_prim.IsValid(): @@ -217,38 +214,106 @@ def create_scene_partition_attributes( logger.debug("Set scene partition '%s' on camera '%s'", scene_partition, prim.GetPath()) -def export_stage_to_string(stage, num_envs: int, use_ovrtx_cloning: bool = True) -> str: - """Export the stage to a string; when num_envs > 1, only env_0 is exported for OVRTX cloning. +def _deactivate_child_prims( + prim: Usd.Prim, + source_paths: frozenset[Sdf.Path], + deactivated_prims: list[Usd.Prim], + should_keep_prim: Callable[[Sdf.Path], bool] | None = None, +) -> None: + """Deactivate child prims under ``prim`` that are outside all source prototype subtrees. + + For each child: + + * **Source:** keep the full subtree and stop descending. + * **Ancestor of a source:** recurse to deactivate non-source siblings deeper in the tree. + * **Otherwise:** deactivate the child prim (including descendants). + + Only prims that were active before deactivation are recorded in ``deactivated_prims`` for + reactivation after export. + + Args: + prim: Parent prim whose children are considered. + source_paths: The paths to the cloning sources. + deactivated_prims: Prims deactivated by this call; used to reactivate them after export. + should_keep_prim: Optional predicate on each child path. If provided, a child prim for + which this returns ``True`` is retained (not deactivated and not descended). If not + provided, every child is considered. + """ + for child in list(prim.GetChildren()): + child_path = child.GetPath() + + # If the optional predicate is provided and returns True, keep the prim and stop walking down the tree. + if should_keep_prim is not None and should_keep_prim(child_path): + continue + + # If the child is a source, keep it and stop walking down the tree. + if child_path in source_paths: + continue + + # If the child is an ancestor of some source, recurse to deactivate non-source siblings deeper in the tree. + if any(source.HasPrefix(child_path) for source in source_paths): + _deactivate_child_prims(child, source_paths, deactivated_prims, should_keep_prim) + continue + + # Deactivate the child and record it for reactivation after export. + if child.IsActive(): + child.SetActive(False) + deactivated_prims.append(child) + logger.debug("Deactivated prim: %s", child_path) + + +def export_stage_to_string( + stage, + num_envs: int, + use_ovrtx_cloning: bool = True, + source_paths: tuple[str, ...] = (), +) -> str: + """Export the USD stage as a USDA string for OVRTX loading. - When num_envs > 1, deactivates env_1..env_{num_envs-1} before export and reactivates - them after, so the exported content contains only env_0. The stage is modified in place. + When ``use_ovrtx_cloning`` is disabled or ``num_envs`` is 1, the full stage is exported + unchanged. Otherwise the stage is trimmed so OVRTX receives only the prototype geometry + it will replicate with ``clone_usd``. Args: stage: USD stage to export. - num_envs: Number of environments. - use_ovrtx_cloning: Whether OVRTX cloning is enabled. + num_envs: Number of parallel environments on the stage. + use_ovrtx_cloning: When ``True`` and ``num_envs > 1``, export only clone-plan prototypes; + otherwise export the full stage. + source_paths: The paths to source prims to clone. Required when ``use_ovrtx_cloning`` is ``True``. Returns: - The exported stage as a string. + USDA text of the (possibly trimmed) stage. """ - deactivated_prims = [] - if use_ovrtx_cloning and num_envs > 1: - logger.info("Deactivating %d environment roots...", num_envs - 1) - for env_idx in range(1, num_envs): - env_path = f"/World/envs/env_{env_idx}" - prim = stage.GetPrimAtPath(env_path) - if prim.IsValid() and prim.IsActive(): - prim.SetActive(False) - deactivated_prims.append(prim) - logger.debug("Deactivated environment root: %s", env_path) - - logger.info("Deactivated %d environment roots in total", len(deactivated_prims)) + if not use_ovrtx_cloning or num_envs <= 1: + return stage.ExportToString() + + envs_path = Sdf.Path("/World/envs") + envs_prim = stage.GetPrimAtPath(envs_path) + if not envs_prim.IsValid(): + raise RuntimeError(f"Failed to get prim at path: {envs_path}") + + env_name_pattern = re.compile(r"^env_(\d+)$") + + def _is_env_root(prim_path: Sdf.Path) -> bool: + """Return True if ``prim_path`` is an env root (e.g. ``/World/envs/env_0``).""" + return prim_path.GetParentPath() == envs_path and env_name_pattern.fullmatch(prim_path.name) is not None + + deactivated_prims: list[Usd.Prim] = [] + + _deactivate_child_prims( + envs_prim, + frozenset(map(Sdf.Path, source_paths)), + deactivated_prims, + should_keep_prim=lambda prim_path: not _is_env_root(prim_path), + ) + + logger.info("Deactivated %d prims in total", len(deactivated_prims)) try: return stage.ExportToString() finally: if deactivated_prims: - logger.info("Reactivating %d environment roots...", len(deactivated_prims)) + logger.info("Reactivating %d prims...", len(deactivated_prims)) for prim in deactivated_prims: if prim.IsValid(): prim.SetActive(True)