Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions docs/source/how-to/cloning.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ Clone Plans

For one source row, passing ``sources``, ``destinations``, and ``mask`` by hand is simple.
For heterogeneous scenes, the mapping is easier to build with
:func:`~isaaclab.cloner.make_clone_plan`.
:func:`~isaaclab.cloner.make_clone_plan`, which returns the raw flat components. Composing
those components into a :class:`~isaaclab.cloner.ClonePlan` together with the per-environment
pose buffer is the caller's responsibility — keeping pose authority on the side that owns the
buffer (typically the scene) avoids duplicating tensors.

:class:`~isaaclab.cloner.ClonePlan` stores the same flat contract used by direct cloning:

Expand Down Expand Up @@ -180,7 +183,7 @@ The plan maps those source rows to all environments:

from isaaclab.cloner import make_clone_plan, sequential

plan = make_clone_plan(
sources, destinations, clone_mask = make_clone_plan(
sources=[
[
"/World/envs/env_0/Object",
Expand All @@ -196,12 +199,12 @@ The plan maps those source rows to all environments:

# source row used by env: 0, 1, 2, 0, 1, 2, 0, 1

Direct code can use the plan exactly like the hand-written direct example:
Direct code can use the components exactly like the hand-written direct example:

.. code-block:: python

physx_replicate(stage, plan.sources, plan.destinations, env_ids, plan.clone_mask, device="cuda:0")
usd_replicate(stage, plan.sources, plan.destinations, env_ids, plan.clone_mask)
physx_replicate(stage, sources, destinations, env_ids, clone_mask, device="cuda:0")
usd_replicate(stage, sources, destinations, env_ids, clone_mask)

When variants span multiple groups, such as robot variants and object variants,
``make_clone_plan`` enumerates the Cartesian product of the groups and assigns one
Expand Down
6 changes: 6 additions & 0 deletions source/isaaclab/changelog.d/newton-clone-plan.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Fixed
^^^^^

* Fixed Newton replicated-scene cloning so source clone plans are available
before sensor construction and asset USD replication is skipped for Newton
physics replication.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Added
^^^^^

* Added :func:`~isaaclab.utils.string.compile_template_pattern` to compile a slotted template
(with ``{}`` slots expanding to a caller-provided regex fragment) into a regular expression.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Added
^^^^^

* Added :func:`~isaaclab.utils.string.strip_templated_prefix` to strip a templated prefix
(with ``{}`` slots matching one path segment each) from a string and return the remainder.
8 changes: 6 additions & 2 deletions source/isaaclab/isaaclab/cloner/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@
__all__ = [
"CloneCfg",
"ClonePlan",
"random",
"sequential",
"cfg_source_path",
"disabled_fabric_change_notifies",
"filter_collisions",
"grid_transforms",
"make_clone_plan",
"path_source_path",
"random",
"sequential",
"usd_replicate",
]

from .clone_plan import ClonePlan
from .cloner_cfg import CloneCfg
from .cloner_strategies import random, sequential
from .cloner_utils import (
cfg_source_path,
disabled_fabric_change_notifies,
filter_collisions,
grid_transforms,
make_clone_plan,
path_source_path,
usd_replicate,
)
153 changes: 144 additions & 9 deletions source/isaaclab/isaaclab/cloner/cloner_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import itertools
import logging
import math
import re
from collections.abc import Iterator, Sequence

import torch
Expand All @@ -21,6 +22,137 @@
logger = logging.getLogger(__name__)


def get_suffix(path_expr: str, destination_template: str) -> str | None:
"""Return the part of ``path_expr`` below a destination template's env-instance root.

The template's ``"{}"`` slot matches exactly one path segment (a concrete id like ``env_3``
or a wildcard like ``env_.*``).

Example:
>>> tmpl = "/World/scenes/{}/Robot"
>>> get_suffix("/World/scenes/env_3/Robot/base", tmpl)
'/base'
>>> get_suffix("/World/scenes/.*/Robot/base", tmpl)
'/base'
>>> get_suffix("/World/scenes/env_3/Robot", tmpl)
''
>>> get_suffix("/World/scenes/env_3/Sensor", tmpl) is None
True
>>> get_suffix("/World/scenes/env_3/RobotArm", tmpl) is None
True
>>> get_suffix("/World/scenes/env_3/sub/Robot/base", tmpl) is None
True
"""
pattern = re.compile(r"[^/]+".join(re.escape(part) for part in destination_template.split("{}")))
match = pattern.match(path_expr)
if match is None:
return None
suffix = path_expr[match.end() :]
return None if suffix and not suffix.startswith("/") else suffix


def resolve_clone_plan_source(path_expr: str, plan: ClonePlan) -> tuple[str, str, str] | None:
"""Resolve a destination path expression to its row's source path, destination glob, and asset suffix.

Finds the rows whose destination template owns ``path_expr`` (same matching
logic as :func:`iter_clone_plan_matches`), OR-merges their
:attr:`~isaaclab.cloner.ClonePlan.clone_mask` rows, and splits the
expression at the row's destination template so the asset-relative suffix is
returned for downstream walks.

Args:
path_expr: Destination-side path expression (e.g., a sensor's ``prim_path``,
with ``.*`` env wildcard).
plan: Active clone plan.

Returns:
Three-tuple of ``(source_asset_path, dest_glob_prefix, asset_suffix)``. The
``asset_suffix`` is the part of ``path_expr`` beyond the matching row's
destination template (empty when ``path_expr`` equals the row's template).
Returns ``None`` when ``path_expr`` matches no row in the plan, letting
callers fall back to direct stage resolution (e.g. for sensor frames
mounted at the env root rather than under a planned asset).

Raises:
ValueError: When ``path_expr``'s matching rows span multiple distinct
destination templates.
NotImplementedError: When the union of matching rows' clone masks does not
cover every env (partial-env heterogeneous coverage is unsupported).
"""
matching_template: str | None = None
matching_rows: list[int] = []
matching_suffix: str | None = None
for source_index, destination_template in enumerate(plan.destinations):
if "{}" not in destination_template:
continue
suffix = get_suffix(path_expr, destination_template)
if suffix is None:
continue
if matching_template is None:
matching_template = destination_template
matching_suffix = suffix
elif destination_template != matching_template:
raise ValueError(
f"path_expr {path_expr!r}: matches multiple destination templates"
f" {matching_template!r} and {destination_template!r}."
)
matching_rows.append(source_index)
if matching_template is None:
return None
if not plan.clone_mask[matching_rows].any(dim=0).all():
raise NotImplementedError(
f"path_expr {path_expr!r}: partial-env heterogeneous coverage is unsupported;"
" matching rows must collectively cover all envs."
)
return plan.sources[matching_rows[0]], matching_template.replace("{}", "*"), matching_suffix or ""


def iter_clone_plan_matches(plan: ClonePlan, path_expr: str) -> Iterator[tuple[str, str, str, tuple[int, ...]]]:
"""Yield clone-plan entries whose destinations own a path expression.

Example:
For an entry with source root ``"/World/source/Robot"``, destination
template ``"/World/scenes/{}/Robot"``, and populated env ids
``(0, 2)``, querying ``"/World/scenes/.*/Robot/base"`` yields
``("/World/source/Robot", "/World/scenes/{}/Robot",
"/World/source/Robot/base", (0, 2))``.

Args:
plan: Clone plan to query.
path_expr: Destination prim path or path expression. Expressions are
matched against each clone-plan destination template by treating
the template's ``"{}"`` field as the populated environment slot.

Yields:
Tuples ``(source_root, destination_template, source_path, env_ids)``
for the nearest matching destination root. Multiple source variants
with the same destination root are preserved.
"""
matches: list[tuple[str, str, str, tuple[int, ...]]] = []
for source_index, (source_root, destination_template) in enumerate(zip(plan.sources, plan.destinations)):
if "{}" not in destination_template:
continue

env_ids = tuple(int(i) for i in plan.clone_mask[source_index].nonzero(as_tuple=False).flatten().tolist())
if not env_ids:
continue

source_root = source_root.rstrip("/") or "/"
destination_template = destination_template.rstrip("/") or "/"

suffix = get_suffix(path_expr, destination_template)
if suffix is None:
continue
source_path = source_root + suffix if source_root != "/" else suffix or "/"

matches.append((source_root, destination_template, source_path, env_ids))

matches.sort(key=lambda match: len(match[1].format(match[3][0])), reverse=True)
if matches:
owner_length = len(matches[0][1].format(matches[0][3][0]))
yield from (match for match in matches if len(match[1].format(match[3][0])) == owner_length)


@contextlib.contextmanager
def disabled_fabric_change_notifies(stage: Usd.Stage, *, restore: bool = True) -> Iterator[None]:
"""Suspend the ``IFabricUsd`` USD notice listener for the body of the ``with`` block.
Expand Down Expand Up @@ -104,23 +236,26 @@ def make_clone_plan(
num_clones: int,
clone_strategy: callable,
device: str = "cpu",
) -> ClonePlan:
"""Construct a cloning plan mapping prototype prims to per-environment destinations.
) -> tuple[tuple[str, ...], tuple[str, ...], torch.Tensor]:
"""Compute the flat source/destination/mask components of a clone plan.

The plan enumerates all combinations of prototypes, selects a combination per environment using ``clone_strategy``,
and builds a boolean masking matrix indicating which prototype populates each environment slot.
Enumerates all combinations of prototypes, selects a combination per environment using
``clone_strategy``, and builds the boolean masking matrix that indicates which prototype
populates each environment slot. The caller composes the returned tuple into a
:class:`ClonePlan`.

Args:
sources: Prototype prim paths grouped by asset type (e.g., [[robot_a, robot_b], [obj_x]]).
sources: Prototype prim paths grouped by asset type (e.g., ``[[robot_a, robot_b], [obj_x]]``).
destinations: Destination path templates (one per group) with ``"{}"`` placeholder for env id.
num_clones: Number of environments to populate.
clone_strategy: Function that picks a prototype combo per environment; signature
``clone_strategy(combos: Tensor, num_clones: int, device: str) -> Tensor[num_clones, num_groups]``.
device: Torch device for tensors in the plan. Defaults to ``"cpu"``.
device: Torch device for the returned mask. Defaults to ``"cpu"``.

Returns:
A :class:`ClonePlan` whose ``sources`` and ``destinations`` are flattened per-source rows and
whose ``clone_mask`` is a ``[num_src, num_clones]`` boolean tensor.
A tuple ``(sources, destinations, clone_mask)`` where ``sources`` and ``destinations``
are flattened per-source entries (one entry per prototype) and ``clone_mask`` is a
``[num_src, num_clones]`` boolean tensor on ``device``.
"""
if len(sources) != len(destinations):
raise ValueError(f"Expected one destination per source group, got {len(destinations)} and {len(sources)}.")
Expand Down Expand Up @@ -150,7 +285,7 @@ def make_clone_plan(

masking = torch.zeros((sum(group_sizes), num_clones), dtype=torch.bool, device=device)
masking[rows, cols] = True
return ClonePlan(sources=src, destinations=dest, clone_mask=masking)
return src, dest, masking


def usd_replicate(
Expand Down
42 changes: 16 additions & 26 deletions source/isaaclab/isaaclab/envs/mdp/actions/task_space_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from isaaclab.controllers.operational_space import OperationalSpaceController
from isaaclab.managers.action_manager import ActionTerm
from isaaclab.sensors import ContactSensor, ContactSensorCfg, FrameTransformer, FrameTransformerCfg
from isaaclab.sim.utils import find_matching_prims
from isaaclab.sim.utils.queries import get_all_matching_child_prims, resolve_matching_prims_from_source

if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
Expand Down Expand Up @@ -336,8 +336,21 @@ def __init__(self, cfg: actions_cfg.OperationalSpaceControllerActionCfg, env: Ma
# is provided.
if self.cfg.task_frame_rel_path is not None:
# The source RigidObject can be any child of the articulation asset (we will not use it),
# hence, we will use the first RigidObject child.
root_rigidbody_path = self._first_RigidObject_child_path()
# hence, we will use the first RigidObject descendant.
def has_rigid_body_api(prim) -> bool:
return bool(prim.HasAPI(UsdPhysics.RigidBodyAPI))

matches = resolve_matching_prims_from_source(self._asset.cfg.prim_path)
if not matches:
raise ValueError(f"No prim found at '{self._asset.cfg.prim_path}'.")
asset_prim, root_expr = matches[0]
walk_root = asset_prim.GetPath().pathString
rigid_prims = get_all_matching_child_prims(
walk_root, predicate=has_rigid_body_api, traverse_instance_prims=False
)
if not rigid_prims:
raise ValueError(f"No descendant rigid body found under the expression: '{self._asset.cfg.prim_path}'.")
root_rigidbody_path = root_expr + rigid_prims[0].GetPath().pathString[len(walk_root) :]
task_frame_transformer_path = "/World/envs/env_.*/" + self.cfg.task_frame_rel_path
task_frame_transformer_cfg = FrameTransformerCfg(
prim_path=root_rigidbody_path,
Expand Down Expand Up @@ -560,29 +573,6 @@ def reset(self, env_ids: Sequence[int] | None = None) -> None:

"""

def _first_RigidObject_child_path(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to other comment made, something similar to this helper could be used almost everywhere in the code

"""Finds the first ``RigidObject`` child under the articulation asset.

Raises:
ValueError: If no child ``RigidObject`` is found under the articulation asset.

Returns:
str: The path to the first ``RigidObject`` child under the articulation asset.
"""
child_prims = find_matching_prims(self._asset.cfg.prim_path + "/.*")
rigid_child_prim = None
# Loop through the list and stop at the first RigidObject found
for prim in child_prims:
if prim.HasAPI(UsdPhysics.RigidBodyAPI):
rigid_child_prim = prim
break
if rigid_child_prim is None:
raise ValueError("No child rigid body found under the expression: '{self._asset.cfg.prim_path}'/.")
rigid_child_prim_path = rigid_child_prim.GetPath().pathString
# Remove the specific env index from the path string
rigid_child_prim_path = self._asset.cfg.prim_path + "/" + rigid_child_prim_path.split("/")[-1]
return rigid_child_prim_path

def _resolve_command_indexes(self):
"""Resolves the indexes for the various command elements within the command tensor.

Expand Down
Loading
Loading