10.11. Randomization in Simulation – AMR Navigation
Example of using Isaac Sim and Replicator to capture synthetic data from simulated environments (AMR Navigation).
10.11.1. Learning Objectives
The goal of this tutorial is to demonstrate how to set up an Isaac Sim simulation scenario together with the omni.replicator extension to capture synthetic data using diverse randomization techniques.
After this tutorial you will be able to:
Implement scene randomizations using USD / Isaac Sim APIs:
Randomize poses of assets in the scene
Switch between different background environments
Collect synthetic data at specific simulation events with Replicator
Create and destroy render products on the fly to improve runtime performance
Create and destroy Replicator capture graphs within the same simulation instance
10.11.1.1. Prerequisites
Familiarity with USD / Isaac Sim APIs for scene creation and manipulation.
Familiarity with omni.replicator and its writers.
Basic understanding of OmniGraph for the navigation implementation.
Running simulations as Standalone Applications or via the Script Editor.
10.11.2. Scenario
In this tutorial’s scenario, we use the Carter robot equipped with an OmniGraph navigation stack, notably without collision avoidance features. The navigation stack constantly drives the robot towards a designated Xform target (<..>/targetXform), positioned at the location of the randomized objects of interest. As the robot comes into proximity of the objects of interest, a synthetic data generation (SDG) pipeline is triggered to capture data from its two main camera sensors. Once the data is captured, the objects of interest are re-randomized and the simulation continues. After every env_interval captured frames the background environment is changed as well, and after num_frames captured frames the application terminates. The use_temp_rp flag provides an option to use temporary render products: the render products exist only while data is being captured, which avoids the overhead of rendering the sensor views during the rest of the simulation.
The scenario uses the left and right camera sensors of Carter (<..>/front_hawk/<left/right>/camera_<left/right>) to collect LdrColor (rgb) annotator data using Replicator. By default the data is written to <working_dir>/_out_nav_sdg_demo and the demo runs for num_frames=9 captured frames, changing the background environment every env_interval=3 captured frames. The use_temp_rp flag defaults to False. The following image provides an illustration of the resulting data from the various environments.
10.11.3. Implementation
This section gives an overview and explanation of the implementation, with examples of how to run the demo.
The following snippet loads and starts the demo scene. The scenario runs for the given num_frames and changes the background environment every env_interval captured frames. The output is written to the given out_dir path. The use_temp_rp parameter can be used to optimize performance by creating render products only for the frames in which data is captured.
The start method loads and runs the demo with the specified parameters, while clear halts the demo and clears any active subscribers and render products. You can use is_running to verify whether the demo is still running.
Starting the Demo
ENV_INTERVAL = 3
NUM_FRAMES = 9
USE_TEMP_RP = True

nav_demo = NavSDGDemo()
nav_demo.start(
    num_frames=NUM_FRAMES,
    out_dir=out_dir,
    env_urls=ENV_URLS,
    env_interval=ENV_INTERVAL,
    use_temp_rp=USE_TEMP_RP,
    seed=124,
)
The demo script is wrapped in its own class called NavSDGDemo. Its main attributes are:
self._carter_chassis and self._carter_nav_target: prims used to track Carter and its target Xform in the navigation graph
self._dolly: the prim the navigation target is placed at, also used to track the distance to Carter
self._dolly_light: randomized light placed above the dolly for each captured frame
self._props: list of prop prims to place and simulate above the dolly for each captured frame
self._cycled_env_urls: the paths of the background environments to cycle through
self._env_interval: the number of captured frames after which the background environment is changed
self._timeline: used to control (play/pause) the simulation timeline between frame captures
self._timeline_sub: the subscriber to the timeline ticks, used as the feedback loop to trigger the synthetic data generation
self._stage_event_sub: a subscriber to stage closing events, used to clear the demo in case a new stage is opened
self._stage: used to access the active stage in order to create, access, and delete prims of interest
self._trigger_distance: the distance between Carter and the dolly at which the synthetic data generation triggers; the value is randomized after each capture
self._num_frames and self._frame_counter: used to track and stop the demo after the given number of frames
self._writer: the writer used to write the synthetic data to disk
self._render_products: the two render products attached to the left and right camera sensors of Carter; the writer is attached to these to access data from the annotators
self._use_temp_rp: a flag which, when set to True, causes the demo to create and destroy render products for each frame capture; otherwise the render products are created once at the start of the demo and destroyed at the end
self._in_running_state: indicates the running state of the demo, useful to track whether the demo has finished or not
NavSDGDemo Class
class NavSDGDemo:
    CARTER_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
    DOLLY_URL = "/Isaac/Props/Dolly/dolly_physics.usd"
    PROPS_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics"
    LEFT_CAMERA_PATH = "/NavWorld/CarterNav/chassis_link/front_hawk/left/camera_left"
    RIGHT_CAMERA_PATH = "/NavWorld/CarterNav/chassis_link/front_hawk/right/camera_right"

    def __init__(self):
        self._carter_chassis = None
        self._carter_nav_target = None
        self._dolly = None
        self._dolly_light = None
        self._props = []
        self._cycled_env_urls = None
        self._env_interval = 1
        self._timeline = None
        self._timeline_sub = None
        self._stage_event_sub = None
        self._stage = None
        self._trigger_distance = 2.0
        self._num_frames = 0
        self._frame_counter = 0
        self._writer = None
        self._out_dir = None
        self._render_products = []
        self._use_temp_rp = False
        self._in_running_state = False
The workflow’s main functions are start and the _on_timeline_event callback. start creates a new environment with a navigation-specific physics scene, Carter, the navigation graph with the target Xform, the dolly, a randomization light, and props to drop around the dolly. It also creates the timeline subscriber with _on_timeline_event as the callback function triggered on each timeline tick. The _on_timeline_event function checks whether Carter is close enough to the dolly; if so, it pauses the simulation, unsubscribes the timeline callback, and triggers the synthetic data generation (SDG). The SDG runs asynchronously when the demo is launched from the script editor and synchronously when it runs as a standalone application.
Workflow
def start(..)
    [..]
    self._load_env()
    self._randomize_dolly_pose()
    self._randomize_dolly_light()
    self._randomize_prop_poses()
    self._setup_sdg()
    [..]
    self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
        int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
    )
    [..]

def _on_timeline_event(self, e: carb.events.IEvent):
    carter_loc = self._carter_chassis.GetAttribute("xformOp:translate").Get()
    dolly_loc = self._dolly.GetAttribute("xformOp:translate").Get()
    dist = (Gf.Vec2f(dolly_loc[0], dolly_loc[1]) - Gf.Vec2f(carter_loc[0], carter_loc[1])).GetLength()
    if dist < self._trigger_distance:
        print(f"[NavSDGDemo] Capturing frame no. {self._frame_counter}")
        self._timeline.pause()
        self._timeline_sub.unsubscribe()
        if self._is_running_in_script_editor():
            import asyncio

            task = asyncio.ensure_future(self._run_sdg_async())
            task.add_done_callback(self._on_sdg_done)
        else:
            self._run_sdg()
            self._setup_next_frame()
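The trigger check in _on_timeline_event compares only the x and y components of the two positions, so height differences between the chassis and the dolly do not affect it. The following is a minimal sketch of that check in plain Python, without the USD types; the helper names planar_distance and should_capture are hypothetical and are not part of the demo.

```python
import math


def planar_distance(a, b):
    """Distance between two 3D points using only x and y (z is ignored)."""
    return math.hypot(b[0] - a[0], b[1] - a[1])


def should_capture(carter_loc, dolly_loc, trigger_distance):
    """Mirror of the `dist < self._trigger_distance` test in the callback."""
    return planar_distance(carter_loc, dolly_loc) < trigger_distance


carter = (0.0, 0.0, 0.1)
dolly = (3.0, 4.0, 0.0)
print(planar_distance(carter, dolly))               # 5.0
print(should_capture(carter, dolly, 2.0))           # False, still too far away
print(should_capture((2.0, 3.0, 0.1), dolly, 2.0))  # True, about 1.41 away
```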
To randomize the environment before the synthetic data capture, the following functions are used:
_randomize_dolly_pose: places the dolly at a random pose with a given minimum distance from Carter; once such a pose is found, the navigation target is placed at the dolly’s position as well.
_randomize_dolly_light: places the dolly light above the dolly with a new random color.
_randomize_prop_poses: places the props above the dolly at random locations, from which they start to fall once the simulation resumes.
Randomizations
def _randomize_dolly_pose(self):
    min_dist_from_carter = 4
    carter_loc = self._carter_chassis.GetAttribute("xformOp:translate").Get()
    for _ in range(100):
        x, y = random.uniform(-6, 6), random.uniform(-6, 6)
        dist = (Gf.Vec2f(x, y) - Gf.Vec2f(carter_loc[0], carter_loc[1])).GetLength()
        if dist > min_dist_from_carter:
            self._dolly.GetAttribute("xformOp:translate").Set((x, y, 0))
            self._carter_nav_target.GetAttribute("xformOp:translate").Set((x, y, 0))
            break
    self._dolly.GetAttribute("xformOp:rotateXYZ").Set((0, 0, random.uniform(-180, 180)))

def _randomize_dolly_light(self):
    dolly_loc = self._dolly.GetAttribute("xformOp:translate").Get()
    self._dolly_light.GetAttribute("xformOp:translate").Set(dolly_loc + (0, 0, 2.5))
    self._dolly_light.GetAttribute("inputs:color").Set(
        (random.uniform(0, 1), random.uniform(0, 1), random.uniform(0, 1))
    )

def _randomize_prop_poses(self):
    spawn_loc = self._dolly.GetAttribute("xformOp:translate").Get()
    spawn_loc[2] = spawn_loc[2] + 0.5
    for prop in self._props:
        prop.GetAttribute("xformOp:translate").Set(spawn_loc + (random.uniform(-1, 1), random.uniform(-1, 1), 0))
        spawn_loc[2] = spawn_loc[2] + 0.2
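The dolly placement above is a form of rejection sampling: candidate positions are drawn uniformly from a square and discarded until one is far enough from Carter. The sketch below isolates that idea in plain Python. The function name sample_pose_away_from and its parameters are hypothetical, and unlike the demo method it returns None when every attempt fails, so the caller can tell that no pose was found.

```python
import math
import random


def sample_pose_away_from(origin_xy, min_dist, extent=6.0, max_tries=100, rng=random):
    """Return (x, y) farther than min_dist from origin_xy, or None if all tries fail."""
    for _ in range(max_tries):
        x, y = rng.uniform(-extent, extent), rng.uniform(-extent, extent)
        if math.hypot(x - origin_xy[0], y - origin_xy[1]) > min_dist:
            return (x, y)
    return None


rng = random.Random(124)  # dedicated generator, so the result is reproducible
pose = sample_pose_away_from((0.0, 0.0), min_dist=4.0, rng=rng)
print(pose)

# A minimum distance larger than the sampling square can never be satisfied
print(sample_pose_away_from((0.0, 0.0), min_dist=100.0, rng=rng))  # None
```

In the demo method, if none of the 100 samples qualifies, the loop ends without moving the dolly or the navigation target, and only the rotation is updated.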
When executing the synthetic data generation (SDG) pipeline, the rep.orchestrator.step function is called, initiating the data capture and the execution of the writer’s write function. The rep.orchestrator.wait_until_complete function makes sure the writer has finished writing the data to disk. Depending on the value of the use_temp_rp flag, the sensor’s render products are handled differently: if set to True, render products are dynamically created for each capture using the _setup_render_products method and subsequently destroyed with the _destroy_render_products method. Otherwise (default case), the render products are initialized only once at the start and destroyed at the end.
Synthetic Data Generation (SDG)
def _run_sdg(self):
    if self._use_temp_rp:
        self._setup_render_products()
    rep.orchestrator.step(rt_subframes=16, pause_timeline=False)
    rep.orchestrator.wait_until_complete()
    if self._use_temp_rp:
        self._destroy_render_products()
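The create, capture, destroy sequence used for temporary render products can also be expressed as a context manager. The sketch below is a variation on the demo code rather than a copy of it: it uses try/finally so the cleanup also runs if the capture raises. The name temporary_resources and the fake_create / fake_destroy stand-ins are hypothetical; in the demo their roles are played by _setup_render_products and _destroy_render_products.

```python
from contextlib import contextmanager


@contextmanager
def temporary_resources(create, destroy, enabled=True):
    """Create resources before the block and destroy them afterwards, even on error."""
    resources = create() if enabled else None
    try:
        yield resources
    finally:
        if enabled:
            destroy(resources)


log = []


def fake_create():
    # Stand-in for creating the left and right render products
    log.append("create")
    return ["left_sensor", "right_sensor"]


def fake_destroy(resources):
    # Stand-in for detaching the writer and destroying the render products
    log.append(f"destroy {len(resources)}")


# use_temp_rp=True: resources exist only for the duration of the capture
with temporary_resources(fake_create, fake_destroy, enabled=True):
    log.append("capture")

# use_temp_rp=False: the capture runs without creating or destroying anything
with temporary_resources(fake_create, fake_destroy, enabled=False):
    log.append("capture")

print(log)  # ['create', 'capture', 'destroy 2', 'capture']
```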
After the synthetic data generation (SDG) completes, the _setup_next_frame function prepares the simulation for the next frame. This involves incrementing the frame counter (self._frame_counter), randomizing the dolly, the dolly light, and the props, and changing the background environment whenever the frame counter is a multiple of env_interval. The timeline is then resumed and a new timeline subscriber is created. Once the frame counter reaches _num_frames, the demo terminates instead.
Next Frame
def _setup_next_frame(self):
    self._frame_counter += 1
    if self._frame_counter >= self._num_frames:
        print(f"[NavSDGDemo] Finished")
        self.clear()
        return
    self._randomize_dolly_pose()
    self._randomize_dolly_light()
    self._randomize_prop_poses()
    if self._frame_counter % self._env_interval == 0:
        self._load_next_env()
    # Set a new random distance at which to capture the next frame
    self._trigger_distance = random.uniform(1.75, 2.5)
    self._timeline.play()
    self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
        int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
    )
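To see which background environment each captured frame ends up with, the counter and modulo logic of _setup_next_frame can be replayed in plain Python. This is a small sketch under the assumption that the first environment is loaded once at start, as _load_env does; the function name environment_schedule and the short environment labels are hypothetical.

```python
from itertools import cycle


def environment_schedule(num_frames, env_interval, env_urls):
    """Return the environment in use for each captured frame, in capture order."""
    envs = cycle(env_urls)
    current = next(envs)  # the first environment is loaded once at start
    schedule = []
    frame_counter = 0
    while True:
        schedule.append(current)  # the capture happens in the current environment
        frame_counter += 1
        if frame_counter >= num_frames:
            break  # the demo finishes before any further environment change
        if frame_counter % env_interval == 0:
            current = next(envs)
    return schedule


print(environment_schedule(9, 3, ["grid", "warehouse", "black"]))
# ['grid', 'grid', 'grid', 'warehouse', 'warehouse', 'warehouse', 'black', 'black', 'black']
```

With the default values (num_frames=9, env_interval=3) and three environments, each environment is therefore used for exactly three captures.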
To run the example as a standalone application, use the following command to execute the provided script. The script also accepts several optional arguments to customize its behavior:
./python.sh standalone_examples/replicator/amr_navigation.py
Arguments include:
--use_temp_rp: flag to use temporary render products (default: False)
--num_frames: the number of frames to be captured (default: 9)
--env_interval: the capture interval at which the background environment is changed (default: 3)
./python.sh standalone_examples/replicator/amr_navigation.py --use_temp_rp --num_frames 9 --env_interval 3
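For illustration, arguments with these names and defaults could be declared with argparse as sketched below. This is an assumption about how such a command line can be parsed, not the actual parser of amr_navigation.py; the function name build_parser and the help strings are hypothetical.

```python
import argparse


def build_parser():
    """Illustrative parser mirroring the documented arguments and defaults."""
    parser = argparse.ArgumentParser(description="AMR navigation SDG demo (illustrative)")
    parser.add_argument("--use_temp_rp", action="store_true", help="Use temporary render products")
    parser.add_argument("--num_frames", type=int, default=9, help="Number of frames to capture")
    parser.add_argument("--env_interval", type=int, default=3, help="Captures between environment changes")
    return parser


args = build_parser().parse_args(["--use_temp_rp", "--num_frames", "9", "--env_interval", "3"])
print(args.use_temp_rp, args.num_frames, args.env_interval)  # True 9 3

defaults = build_parser().parse_args([])
print(defaults.use_temp_rp, defaults.num_frames, defaults.env_interval)  # False 9 3
```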
To run the example from the script editor, the following code needs to be executed:
import builtins
import os
import random
from itertools import cycle

import carb
import carb.events
import carb.settings
import omni.client
import omni.kit.app
import omni.kit.commands
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core.utils.stage import add_reference_to_stage, create_new_stage
from pxr import Gf, PhysxSchema, UsdGeom, UsdLux, UsdPhysics
class NavSDGDemo:
    CARTER_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
    DOLLY_URL = "/Isaac/Props/Dolly/dolly_physics.usd"
    PROPS_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics"
    LEFT_CAMERA_PATH = "/NavWorld/CarterNav/chassis_link/front_hawk/left/camera_left"
    RIGHT_CAMERA_PATH = "/NavWorld/CarterNav/chassis_link/front_hawk/right/camera_right"

    def __init__(self):
        self._carter_chassis = None
        self._carter_nav_target = None
        self._dolly = None
        self._dolly_light = None
        self._props = []
        self._cycled_env_urls = None
        self._env_interval = 1
        self._timeline = None
        self._timeline_sub = None
        self._stage_event_sub = None
        self._stage = None
        self._trigger_distance = 2.0
        self._num_frames = 0
        self._frame_counter = 0
        self._writer = None
        self._out_dir = None
        self._render_products = []
        self._use_temp_rp = False
        self._in_running_state = False

    def start(
        self,
        num_frames=10,
        out_dir=None,
        env_urls=[],
        env_interval=3,
        use_temp_rp=False,
        seed=None,
    ):
        print(f"[NavSDGDemo] Starting")
        if seed is not None:
            random.seed(seed)
        self._num_frames = num_frames
        self._out_dir = out_dir if out_dir is not None else os.path.join(os.getcwd(), "_out_nav_sdg_demo")
        self._cycled_env_urls = cycle(env_urls)
        self._env_interval = env_interval
        self._use_temp_rp = use_temp_rp
        self._frame_counter = 0
        self._trigger_distance = 2.0
        self._load_env()
        self._randomize_dolly_pose()
        self._randomize_dolly_light()
        self._randomize_prop_poses()
        self._setup_sdg()
        self._timeline = omni.timeline.get_timeline_interface()
        self._timeline.play()
        self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
            int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
        )
        self._stage_event_sub = (
            omni.usd.get_context()
            .get_stage_event_stream()
            .create_subscription_to_pop_by_type(int(omni.usd.StageEventType.CLOSING), self._on_stage_closing_event)
        )
        self._in_running_state = True

    def clear(self):
        self._cycled_env_urls = None
        self._carter_chassis = None
        self._carter_nav_target = None
        self._dolly = None
        self._dolly_light = None
        self._timeline = None
        self._frame_counter = 0
        if self._stage_event_sub:
            self._stage_event_sub.unsubscribe()
        self._stage_event_sub = None
        if self._timeline_sub:
            self._timeline_sub.unsubscribe()
        self._timeline_sub = None
        self._destroy_render_products()
        self._stage = None
        self._in_running_state = False

    def is_running(self):
        return self._in_running_state

    def _is_running_in_script_editor(self):
        return builtins.ISAAC_LAUNCHED_FROM_TERMINAL is True

    def _on_stage_closing_event(self, e: carb.events.IEvent):
        self.clear()

    def _load_env(self):
        # Fresh stage with custom physics scene for carter's navigation
        create_new_stage()
        self._stage = omni.usd.get_context().get_stage()
        self._add_physics_scene()

        # Environment
        assets_root_path = get_assets_root_path()
        add_reference_to_stage(usd_path=assets_root_path + next(self._cycled_env_urls), prim_path="/Environment")

        # Carter
        add_reference_to_stage(usd_path=assets_root_path + self.CARTER_URL, prim_path="/NavWorld/CarterNav")
        self._carter_nav_target = self._stage.GetPrimAtPath("/NavWorld/CarterNav/targetXform")
        self._carter_chassis = self._stage.GetPrimAtPath("/NavWorld/CarterNav/chassis_link")

        # Dolly
        add_reference_to_stage(usd_path=assets_root_path + self.DOLLY_URL, prim_path="/NavWorld/Dolly")
        self._dolly = self._stage.GetPrimAtPath("/NavWorld/Dolly")
        if not self._dolly.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(self._dolly).AddTranslateOp()
        if not self._dolly.GetAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(self._dolly).AddRotateXYZOp()

        # Light
        light = UsdLux.SphereLight.Define(self._stage, f"/NavWorld/DollyLight")
        light.CreateRadiusAttr(0.5)
        light.CreateIntensityAttr(35000)
        light.CreateColorAttr(Gf.Vec3f(1.0, 1.0, 1.0))
        self._dolly_light = light.GetPrim()
        if not self._dolly_light.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(self._dolly_light).AddTranslateOp()

        # Props
        props_urls = []
        props_folder_path = assets_root_path + self.PROPS_URL
        result, entries = omni.client.list(props_folder_path)
        if result != omni.client.Result.OK:
            carb.log_error(f"Could not list assets in path: {props_folder_path}")
            return
        for entry in entries:
            _, ext = os.path.splitext(entry.relative_path)
            if ext == ".usd":
                props_urls.append(f"{props_folder_path}/{entry.relative_path}")

        cycled_props_url = cycle(props_urls)
        for i in range(15):
            prop_url = next(cycled_props_url)
            prop_name = os.path.splitext(os.path.basename(prop_url))[0]
            path = f"/NavWorld/Props/Prop_{prop_name}_{i}"
            prim = self._stage.DefinePrim(path, "Xform")
            prim.GetReferences().AddReference(prop_url)
            self._props.append(prim)

    def _add_physics_scene(self):
        # Physics setup specific for the navigation graph
        physics_scene = UsdPhysics.Scene.Define(self._stage, "/physicsScene")
        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(self._stage.GetPrimAtPath("/physicsScene"))
        physx_scene.GetEnableCCDAttr().Set(True)
        physx_scene.GetEnableGPUDynamicsAttr().Set(False)
        physx_scene.GetBroadphaseTypeAttr().Set("MBP")

    def _randomize_dolly_pose(self):
        min_dist_from_carter = 4
        carter_loc = self._carter_chassis.GetAttribute("xformOp:translate").Get()
        for _ in range(100):
            x, y = random.uniform(-6, 6), random.uniform(-6, 6)
            dist = (Gf.Vec2f(x, y) - Gf.Vec2f(carter_loc[0], carter_loc[1])).GetLength()
            if dist > min_dist_from_carter:
                self._dolly.GetAttribute("xformOp:translate").Set((x, y, 0))
                self._carter_nav_target.GetAttribute("xformOp:translate").Set((x, y, 0))
                break
        self._dolly.GetAttribute("xformOp:rotateXYZ").Set((0, 0, random.uniform(-180, 180)))

    def _randomize_dolly_light(self):
        dolly_loc = self._dolly.GetAttribute("xformOp:translate").Get()
        self._dolly_light.GetAttribute("xformOp:translate").Set(dolly_loc + (0, 0, 2.5))
        self._dolly_light.GetAttribute("inputs:color").Set(
            (random.uniform(0, 1), random.uniform(0, 1), random.uniform(0, 1))
        )

    def _randomize_prop_poses(self):
        spawn_loc = self._dolly.GetAttribute("xformOp:translate").Get()
        spawn_loc[2] = spawn_loc[2] + 0.5
        for prop in self._props:
            prop.GetAttribute("xformOp:translate").Set(spawn_loc + (random.uniform(-1, 1), random.uniform(-1, 1), 0))
            spawn_loc[2] = spawn_loc[2] + 0.2

    def _setup_sdg(self):
        # Disable capture on play and async rendering
        carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
        carb.settings.get_settings().set("/omni/replicator/asyncRendering", False)
        carb.settings.get_settings().set("/app/asyncRendering", False)

        # Set camera sensors fStop to 0.0 to get well lit sharp images
        left_camera_prim = self._stage.GetPrimAtPath(self.LEFT_CAMERA_PATH)
        left_camera_prim.GetAttribute("fStop").Set(0.0)
        right_camera_prim = self._stage.GetPrimAtPath(self.RIGHT_CAMERA_PATH)
        right_camera_prim.GetAttribute("fStop").Set(0.0)

        self._writer = rep.WriterRegistry.get("BasicWriter")
        self._writer.initialize(output_dir=self._out_dir, rgb=True)
        # If no temporary render products are requested, create them once here and destroy them only at the end
        if not self._use_temp_rp:
            self._setup_render_products()

    def _setup_render_products(self):
        print(f"[NavSDGDemo] Creating render products")
        rp_left = rep.create.render_product(
            self.LEFT_CAMERA_PATH,
            (512, 512),
            name="left_sensor",
            force_new=True,
        )
        rp_right = rep.create.render_product(
            self.RIGHT_CAMERA_PATH,
            (512, 512),
            name="right_sensor",
            force_new=True,
        )
        self._render_products = [rp_left, rp_right]
        self._writer.attach(self._render_products)
        rep.orchestrator.preview()

    def _destroy_render_products(self):
        print(f"[NavSDGDemo] Destroying render products")
        if self._writer:
            self._writer.detach()
        for rp in self._render_products:
            rp.destroy()
        self._render_products.clear()
        if self._stage.GetPrimAtPath("/Replicator"):
            omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

    def _run_sdg(self):
        if self._use_temp_rp:
            self._setup_render_products()
        rep.orchestrator.step(rt_subframes=16, pause_timeline=False)
        rep.orchestrator.wait_until_complete()
        if self._use_temp_rp:
            self._destroy_render_products()

    async def _run_sdg_async(self):
        if self._use_temp_rp:
            self._setup_render_products()
        await rep.orchestrator.step_async(rt_subframes=16, pause_timeline=False)
        await rep.orchestrator.wait_until_complete_async()
        if self._use_temp_rp:
            self._destroy_render_products()

    def _load_next_env(self):
        if self._stage.GetPrimAtPath("/Environment"):
            omni.kit.commands.execute("DeletePrimsCommand", paths=["/Environment"])
        assets_root_path = get_assets_root_path()
        add_reference_to_stage(usd_path=assets_root_path + next(self._cycled_env_urls), prim_path="/Environment")

    def _on_sdg_done(self, task):
        self._setup_next_frame()

    def _setup_next_frame(self):
        self._frame_counter += 1
        if self._frame_counter >= self._num_frames:
            print(f"[NavSDGDemo] Finished")
            self.clear()
            return
        self._randomize_dolly_pose()
        self._randomize_dolly_light()
        self._randomize_prop_poses()
        if self._frame_counter % self._env_interval == 0:
            self._load_next_env()
        # Set a new random distance at which to capture the next frame
        self._trigger_distance = random.uniform(1.75, 2.5)
        self._timeline.play()
        self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
            int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
        )

    def _on_timeline_event(self, e: carb.events.IEvent):
        carter_loc = self._carter_chassis.GetAttribute("xformOp:translate").Get()
        dolly_loc = self._dolly.GetAttribute("xformOp:translate").Get()
        dist = (Gf.Vec2f(dolly_loc[0], dolly_loc[1]) - Gf.Vec2f(carter_loc[0], carter_loc[1])).GetLength()
        if dist < self._trigger_distance:
            print(f"[NavSDGDemo] Capturing frame no. {self._frame_counter}")
            self._timeline.pause()
            self._timeline_sub.unsubscribe()
            if self._is_running_in_script_editor():
                import asyncio

                task = asyncio.ensure_future(self._run_sdg_async())
                task.add_done_callback(self._on_sdg_done)
            else:
                self._run_sdg()
                self._setup_next_frame()
ENV_URLS = [
    "/Isaac/Environments/Grid/default_environment.usd",
    "/Isaac/Environments/Simple_Warehouse/warehouse.usd",
    "/Isaac/Environments/Grid/gridroom_black.usd",
]
ENV_INTERVAL = 3
NUM_FRAMES = 9
USE_TEMP_RP = True
out_dir = os.path.join(os.getcwd(), "_out_nav_sdg_demo", "")
nav_demo = NavSDGDemo()
nav_demo.start(
    num_frames=NUM_FRAMES,
    out_dir=out_dir,
    env_urls=ENV_URLS,
    env_interval=ENV_INTERVAL,
    use_temp_rp=USE_TEMP_RP,
    seed=124,
)
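In the script editor branch of _on_timeline_event, the capture is scheduled as an asyncio task and the next frame is set up from a done callback, so the timeline callback itself returns without waiting. The sketch below reproduces only that scheduling pattern with the standard library. The coroutine run_capture_async and the callback on_capture_done are hypothetical stand-ins for _run_sdg_async and _on_sdg_done, and asyncio.sleep(0) stands in for awaiting the Replicator calls.

```python
import asyncio

events = []


async def run_capture_async():
    events.append("capture started")
    await asyncio.sleep(0)  # stand-in for awaiting the asynchronous capture
    events.append("capture finished")


def on_capture_done(task):
    # Called by the event loop only after the coroutine has completed
    events.append("next frame set up")


async def main():
    task = asyncio.ensure_future(run_capture_async())
    task.add_done_callback(on_capture_done)
    events.append("callback returned")  # scheduling the task does not block
    await task
    await asyncio.sleep(0)  # let any pending done callbacks run


asyncio.run(main())
print(events)
# ['callback returned', 'capture started', 'capture finished', 'next frame set up']
```

In the script editor an event loop is already running, which is why the demo only schedules the task there; asyncio.run is used here solely to make the sketch runnable on its own.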