10.12. Randomization in Simulation – UR10 Palletizing
Example of using Isaac Sim and Replicator to capture synthetic data from simulated environments (UR10 palletizing).
10.12.1. Learning Objectives
This tutorial shows how to extend an existing Isaac Sim simulation so that it triggers a synthetic data generation (SDG) pipeline, which randomizes the environment and collects synthetic data at specific simulation events using the omni.replicator extension. The SDG pipeline is set up so that it does not change the outcome of the running simulation and cleans up its changes after each capture.
After this tutorial you will be able to:
Collect synthetic data at specific simulation events with Replicator:
Use annotators to collect the data and manually write it to disk
Use writers to implicitly write the data to disk
Set up various Replicator randomization graphs to:
Randomize lights around the object of interest
Randomize materials and textures of objects of interest running at different rates
Create and destroy Replicator randomization and capture graphs within the same simulation instance
Switch between different rendering modes on the fly
Create and destroy render products on the fly to improve runtime performance
10.12.1.1. Prerequisites
Familiarity with the omni.replicator extension and its annotators and writers.
Familiarity with Replicator randomizers and OmniGraph for a better understanding of the randomization pipeline.
Executing code from the Script Editor.
10.12.2. Scenario
For this tutorial, we will build on top of the UR10 Palletizing demo scene, which will programmatically be loaded and started by the provided script. The demo scene depicts a simple palletizing scenario where the UR10 robot picks up bins from a conveyor belt and places them on a pallet. For bins that are in a flipped state, the robot will first flip them over with a helper object before placing them on the pallet.
The events for which synthetic data will be collected are:
When the bin is placed on the flipping helper object
When the bin is placed on the pallet (or on another bin that is already on the pallet)
The annotator data collected by the scenario includes LdrColor (rgb) and instance segmentation. In the depicted figure, the image data on the left belongs to the bin flip scenario. For each frame in this scenario, the camera pose is iterated through a predefined sequence, while the parameters of the custom lights are randomized. The data is accessed directly from the annotators and saved to disk using custom helper functions. On the right side of the figure, the data belongs to the bin on pallet scenario. Here, the bin colors are randomized for each captured frame, while the camera pose and the pallet texture are randomized at a lower rate. The data is written to disk using a built-in Replicator writer (BasicWriter). By default, data is generated for each manipulated bin in the palletizing demo, totaling 36 bin iterations.
10.12.3. Implementation
The following snippet loads and starts the default UR10 Palletizing demo scene, followed by the synthetic data generation (SDG) part, which runs for the given number of bins (num_captures):
Starting the Demo
NUM_CAPTURES = 36


async def run_example_async():
    from omni.isaac.examples.ur10_palletizing.ur10_palletizing import BinStacking

    bin_staking_sample = BinStacking()
    await bin_staking_sample.load_world_async()
    await bin_staking_sample.on_event_async()

    sdg_demo = PalletizingSDGDemo()
    sdg_demo.start(num_captures=NUM_CAPTURES)


asyncio.ensure_future(run_example_async())
The demo script is wrapped in the PalletizingSDGDemo class, which oversees the simulation environment and manages the synthetic data generation. The attributes of this class include:
self._bin_counter and self._num_captures: track the current bin index and the requested number of bins to capture
self._stage: used to access objects of interest in the environment during the simulation
self._active_bin: tracks the currently active bin
self._stage_event_sub: subscriber to stage closing events, used to clean up the demo if the stage is closed
self._in_running_state: indicates whether the demo is currently running
self._bin_flip_scenario_done: flag marking that the bin flip scenario has been completed, to avoid triggering it again
self._timeline: used to pause and resume the simulation in response to synthetic data generation (SDG) events
self._timeline_sub: subscriber to timeline events, used to monitor the simulation state (tracking the active bin's surroundings)
self._overlap_extent: cached extent of the bin size, used to query for overlaps around the active bin
self._rep_camera: points to the temporary Replicator camera used to capture SDG data
self._output_dir: the output directory where the SDG data is stored
PalletizingSDGDemo Class
class PalletizingSDGDemo:
    BINS_FOLDER_PATH = "/World/Ur10Table/bins"
    FLIP_HELPER_PATH = "/World/Ur10Table/pallet_holder"
    PALLET_PRIM_MESH_PATH = "/World/Ur10Table/pallet/Xform/Mesh_015"
    BIN_FLIP_SCENARIO_FRAMES = 4
    PALLET_SCENARIO_FRAMES = 16

    def __init__(self):
        # There are 36 bins in total
        self._bin_counter = 0
        self._num_captures = 36
        self._stage = None
        self._active_bin = None

        # Cleanup in case the user closes the stage
        self._stage_event_sub = None

        # Simulation state flags
        self._in_running_state = False
        self._bin_flip_scenario_done = False

        # Used to pause/resume the simulation
        self._timeline = None

        # Used to actively track the active bins surroundings (e.g., in contact with pallet)
        self._timeline_sub = None
        self._overlap_extent = None

        # SDG
        self._rep_camera = None
        self._output_dir = os.path.join(os.getcwd(), "_out_palletizing_sdg_demo", "")
The start function initializes and starts the SDG demo. During initialization (via self._init()), it checks whether the UR10 palletizing demo is loaded and running. Additionally, it sets up the self._stage and self._active_bin attributes. The demo is then started with the self._start() function. This function subscribes to timeline events through self._timeline_sub, which uses the self._on_timeline_event callback function to monitor the simulation state.
Workflow
def start(self, num_captures):
    self._num_captures = num_captures if 1 <= num_captures <= 36 else 36
    if self._init():
        self._start()

[..]

def _init(self):
    self._stage = omni.usd.get_context().get_stage()
    self._active_bin = self._stage.GetPrimAtPath(f"{self.BINS_FOLDER_PATH}/bin_{self._bin_counter}")

    if not self._active_bin:
        print("[PalletizingSDGDemo] Could not find bin, make sure the palletizing demo is loaded..")
        return False

    bb_cache = create_bbox_cache()
    half_ext = bb_cache.ComputeLocalBound(self._active_bin).GetRange().GetSize() * 0.5
    self._overlap_extent = carb.Float3(half_ext[0], half_ext[1], half_ext[2] * 1.1)
    [..]

def _start(self):
    self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
        int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
    )
    [..]
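The argument check in start and the padded overlap extent in _init can be tried outside Isaac Sim with plain Python. The sketch below is illustrative only: clamp_num_captures and padded_half_extent are hypothetical helper names that do not exist in the demo, and the bin size passed in is a made-up value rather than the real asset dimensions.

```python
MAX_BINS = 36  # the demo manipulates 36 bins in total


def clamp_num_captures(num_captures, max_bins=MAX_BINS):
    """Mirror start(): any value outside [1, max_bins] falls back to max_bins."""
    return num_captures if 1 <= num_captures <= max_bins else max_bins


def padded_half_extent(size, z_scale=1.1):
    """Mirror _init(): halve the bounding-box size and pad the Z half extent."""
    half = tuple(s * 0.5 for s in size)
    return (half[0], half[1], half[2] * z_scale)


if __name__ == "__main__":
    print(clamp_num_captures(5))    # 5
    print(clamp_num_captures(0))    # 36 (falls back to the maximum, not to 1)
    print(clamp_num_captures(100))  # 36
    # Hypothetical bin size; the Z half extent is enlarged by 10 percent
    print(padded_half_extent((0.4, 0.6, 0.2)))
```

Note that an out-of-range request such as 0 is not clamped to the nearest valid value; it falls back to all 36 bins. The enlarged Z half extent makes the overlap box slightly taller than the bin itself.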
On every timeline advance update, the self._check_bin_overlaps function is called to monitor the surroundings of the active bin. If an overlap is detected, the self._on_overlap_hit callback function is invoked. This function determines whether the overlap is relevant to one of the two scenarios: bin flip or bin on pallet. If it is relevant, the simulation is paused, the timeline event subscription is removed, and the synthetic data generation (SDG) starts for the current active bin. Depending on the current simulation state, the SDG is initiated by either the self._run_bin_flip_scenario or the self._run_pallet_scenario function.
Bin Tracking
def _on_timeline_event(self, e: carb.events.IEvent):
    self._check_bin_overlaps()

def _check_bin_overlaps(self):
    bin_pose = omni.usd.get_world_transform_matrix(self._active_bin)
    origin = bin_pose.ExtractTranslation()
    quat_gf = bin_pose.ExtractRotation().GetQuaternion()

    any_hit_flag = False
    hit_info = get_physx_scene_query_interface().overlap_box(
        carb.Float3(self._overlap_extent),
        carb.Float3(origin[0], origin[1], origin[2]),
        carb.Float4(
            quat_gf.GetImaginary()[0], quat_gf.GetImaginary()[1], quat_gf.GetImaginary()[2], quat_gf.GetReal()
        ),
        self._on_overlap_hit,
        any_hit_flag,
    )

def _on_overlap_hit(self, hit):
    if hit.rigid_body == self._active_bin.GetPrimPath():
        return True  # Self hit, return True to continue the query

    # First contact with the flip helper
    if hit.rigid_body.startswith(self.FLIP_HELPER_PATH) and not self._bin_flip_scenario_done:
        self._timeline.pause()
        self._timeline_sub.unsubscribe()
        self._timeline_sub = None
        asyncio.ensure_future(self._run_bin_flip_scenario())
        return False  # Relevant hit, return False to finish the hit query

    # Contact with the pallet or other bin on the pallet
    pallet_hit = hit.rigid_body.startswith(self.PALLET_PRIM_MESH_PATH)
    other_bin_hit = hit.rigid_body.startswith(f"{self.BINS_FOLDER_PATH}/bin_")
    if pallet_hit or other_bin_hit:
        self._timeline.pause()
        self._timeline_sub.unsubscribe()
        self._timeline_sub = None
        asyncio.ensure_future(self._run_pallet_scenario())
        return False  # Relevant hit, return False to finish the hit query

    return True  # No relevant hit, return True to continue the query
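The return value convention of the overlap callback (True to keep iterating over hits, False to stop at the first relevant one) can be illustrated without a physics scene. The following is a minimal sketch: run_overlap_query is a hypothetical stand-in for the scene query, classify_hit is an illustrative helper, and the conveyor path in the hit list is made up. Only the prim path constants are taken from the demo class.

```python
BINS_FOLDER_PATH = "/World/Ur10Table/bins"
FLIP_HELPER_PATH = "/World/Ur10Table/pallet_holder"
PALLET_PRIM_MESH_PATH = "/World/Ur10Table/pallet/Xform/Mesh_015"
ACTIVE_BIN = f"{BINS_FOLDER_PATH}/bin_0"


def classify_hit(rigid_body, flip_done=False):
    """Return 'self', 'flip', 'pallet' or 'ignore' for a rigid body path."""
    if rigid_body == ACTIVE_BIN:
        return "self"
    if rigid_body.startswith(FLIP_HELPER_PATH) and not flip_done:
        return "flip"
    if rigid_body.startswith(PALLET_PRIM_MESH_PATH) or rigid_body.startswith(f"{BINS_FOLDER_PATH}/bin_"):
        return "pallet"
    return "ignore"


def on_hit(rigid_body):
    """True continues the query, False ends it because a relevant hit was found."""
    return classify_hit(rigid_body) in ("self", "ignore")


def run_overlap_query(hits, report_fn):
    """Hypothetical stand-in for the scene query: report hits until report_fn returns False."""
    reported = 0
    for hit in hits:
        reported += 1
        if not report_fn(hit):
            break
    return reported


if __name__ == "__main__":
    hits = [
        ACTIVE_BIN,                    # self hit, skipped
        "/World/Ur10Table/conveyor",   # made-up path, not relevant
        PALLET_PRIM_MESH_PATH,         # relevant, ends the query
        f"{BINS_FOLDER_PATH}/bin_1",   # never reported
    ]
    print(run_overlap_query(hits, on_hit))  # 3
```

Passing flip_done=True to classify_hit mirrors the self._bin_flip_scenario_done flag: a bin that is still touching the flip helper after its capture is then ignored instead of re-triggering the scenario.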
When the active bin is positioned on the flip helper object, it triggers the bin flip scenario. In this scenario, path tracing is chosen as the rendering mode. For collecting the data, Replicator annotators are used directly to access the data, and helper functions will write the data to disk.
The _create_bin_flip_graph function creates the Replicator randomization graphs for the bin flip scenario, including a camera and randomized lights. After the graph is set up, a delayed preview command is dispatched to ensure the graph is fully created before the synthetic data generation (SDG) starts.
The rep.orchestrator.step_async function is called once for each requested frame (BIN_FLIP_SCENARIO_FRAMES); each call advances the randomization graph by one frame and provides the annotators with new data. The data is then retrieved using the get_data() function and saved to disk using the helper functions. To optimize simulation performance, the render products are discarded and the constructed Replicator graphs are removed after each SDG pipeline run.
Once the SDG scenario is completed, the render mode is set back to raytrace. The timeline then resumes the simulation, and the timeline subscriber is reactivated to continue monitoring the simulation environment. Because the bin remains in contact with the flip helper object, the self._bin_flip_scenario_done flag is set to True so that the bin flip scenario does not re-trigger.
Bin Flip Scenario
async def _run_bin_flip_scenario(self):
    await omni.kit.app.get_app().next_update_async()
    print(f"[PalletizingSDGDemo] Running bin flip scenario for bin {self._bin_counter}..")

    # Util function to save rgb images to file
    def save_img(rgb_data, filename):
        rgb_img = Image.fromarray(rgb_data, "RGBA")
        rgb_img.save(filename + ".png")

    self._switch_to_pathtracing()
    self._create_bin_flip_graph()

    # Because graphs are constantly created and destroyed during the demo
    # a delayed preview command is required to guarantee a full generation of the graph
    EVENT_NAME = carb.events.type_from_string("omni.replicator.core.orchestrator")
    rep.orchestrator._orchestrator._message_bus.push(EVENT_NAME, payload={"command": "preview"})

    rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
    is_annot = rep.AnnotatorRegistry.get_annotator("instance_segmentation", init_params={"colorize": True})
    rp = rep.create.render_product(self._rep_camera, (512, 512))
    rgb_annot.attach(rp)
    is_annot.attach(rp)
    out_dir = os.path.join(self._output_dir, f"annot_bin_{self._bin_counter}", "")
    os.makedirs(out_dir, exist_ok=True)

    for i in range(self.BIN_FLIP_SCENARIO_FRAMES):
        await rep.orchestrator.step_async(pause_timeline=False)

        rgb_data = rgb_annot.get_data()
        rgb_filename = f"{out_dir}rgb_{i}"
        save_img(rgb_data, rgb_filename)

        is_data = is_annot.get_data()
        is_filename = f"{out_dir}is_{i}"
        is_img_data = is_data["data"]
        height, width = is_img_data.shape[:2]
        is_img_data = is_img_data.view(np.uint8).reshape(height, width, -1)
        save_img(is_img_data, is_filename)
        is_info = is_data["info"]
        with open(f"{out_dir}is_info_{i}.json", "w") as f:
            json.dump(is_info, f, indent=4)

    await rep.orchestrator.wait_until_complete_async()
    rgb_annot.detach()
    is_annot.detach()
    rp.destroy()

    if self._stage.GetPrimAtPath("/Replicator"):
        omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

    self._switch_to_raytracing()

    self._bin_flip_scenario_done = True
    self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
        int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
    )
    self._timeline.play()
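Before saving the instance segmentation image, the scenario reinterprets the annotator data with view(np.uint8) and reshapes it to height x width x channels. Assuming the colorized data arrives as one 32-bit value per pixel (an assumption of this sketch, not something stated above), that step splits every pixel into four 8-bit RGBA channels. The standard-library sketch below mimics the reinterpretation for a little-endian layout; uint32_to_rgba and image_to_rgba are hypothetical names and the pixel values are made up.

```python
import struct


def uint32_to_rgba(pixel):
    """Split one 32-bit pixel into four 8-bit channels (little-endian byte order)."""
    return tuple(struct.pack("<I", pixel))


def image_to_rgba(rows):
    """Apply the split to a 2D list of 32-bit pixels: height x width x 4."""
    return [[uint32_to_rgba(p) for p in row] for row in rows]


if __name__ == "__main__":
    # Made-up pixel with bytes (low to high): 0xFF, 0x80, 0x10, 0xFF
    print(uint32_to_rgba(0xFF1080FF))  # (255, 128, 16, 255)

    # A made-up 1 x 2 image: opaque red and opaque blue
    print(image_to_rgba([[0xFF0000FF, 0xFFFF0000]]))
```

The lowest byte becomes the red channel and the highest byte the alpha channel, which is the channel order that Image.fromarray(..., "RGBA") expects.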
For the bin flip scenario, the Replicator randomization graph uses a predefined color palette list. This list provides options for the system to randomly select colors when varying the lights using rep.distribution.choice(color_palette). Meanwhile, the camera operates from a set of predefined locations. Instead of random selections, the camera sequentially transitions between these locations using rep.distribution.sequence(camera_positions). Both the randomization of lights and the systematic camera movement are programmed to execute with every frame capture, as indicated by rep.trigger.on_frame().
Bin Flip Randomization Graph
def _create_bin_flip_graph(self):
    # Create new random lights using the color palette for the color attribute
    color_palette = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]

    def randomize_bin_flip_lights():
        lights = rep.create.light(
            light_type="Sphere",
            temperature=rep.distribution.normal(6500, 2000),
            intensity=rep.distribution.normal(45000, 15000),
            position=rep.distribution.uniform((0.25, 0.25, 0.5), (1, 1, 0.75)),
            scale=rep.distribution.uniform(0.5, 0.8),
            color=rep.distribution.choice(color_palette),
            count=3,
        )
        return lights.node

    rep.randomizer.register(randomize_bin_flip_lights)

    # Move the camera to the given location sequences and look at the predefined location
    camera_positions = [(1.96, 0.72, -0.34), (1.48, 0.70, 0.90), (0.79, -0.86, 0.12), (-0.49, 1.47, 0.58)]
    self._rep_camera = rep.create.camera()
    with rep.trigger.on_frame():
        rep.randomizer.randomize_bin_flip_lights()
        with self._rep_camera:
            rep.modify.pose(position=rep.distribution.sequence(camera_positions), look_at=(0.78, 0.72, -0.1))
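The difference between a random choice and a sequence distribution can be mimicked in plain Python. The sketch below does not use Replicator: sample_frames is a hypothetical helper, random.Random stands in for the choice distribution, and itertools.cycle stands in for the sequence distribution under the assumption that a sequence wraps around once it is exhausted. The palette and camera positions are the ones from the graph above.

```python
import itertools
import random

color_palette = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]
camera_positions = [(1.96, 0.72, -0.34), (1.48, 0.70, 0.90), (0.79, -0.86, 0.12), (-0.49, 1.47, 0.58)]


def sample_frames(num_frames, num_lights=3, seed=0):
    """Return one dict per frame with a sequential camera position and random light colors."""
    rng = random.Random(seed)
    positions = itertools.cycle(camera_positions)  # assumption: the sequence wraps around
    frames = []
    for _ in range(num_frames):
        light_colors = [rng.choice(color_palette) for _ in range(num_lights)]
        frames.append({"camera": next(positions), "light_colors": light_colors})
    return frames


if __name__ == "__main__":
    for index, frame in enumerate(sample_frames(4)):
        print(index, frame["camera"], frame["light_colors"])
```

With BIN_FLIP_SCENARIO_FRAMES set to 4 and four camera positions, each position is visited exactly once per bin, while the light colors are drawn independently for every frame.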
When the active bin is placed on the pallet, or on top of another bin on the pallet, it triggers the bin on pallet scenario. Because the randomization graph will be modifying the materials and textures of the bins and the pallet, these original materials are cached. This ensures they can be reapplied once the simulation resumes.
The _create_bin_and_pallet_graph function sets up the Replicator randomization graphs for this scenario. These graphs include the camera, which will randomize its position around the pallet, the varying materials for the bins placed on the pallet, and the alternating textures for the pallet itself. After the graph is created, a delayed preview command is dispatched to ensure it is fully generated before the synthetic data generation (SDG) begins.
For data writing, the bin on pallet scenario uses a built-in Replicator writer known as BasicWriter. For each frame defined by PALLET_SCENARIO_FRAMES, the rep.orchestrator.step_async function advances the randomization graph by a single frame. This action also triggers the writer to save the data to disk. To improve performance during the simulation, the created render products are discarded after each scenario, and the generated graphs are removed.
Once the scenario is done, the cached materials are re-applied and the simulation is resumed. The system then checks whether it has processed the last bin. If not, it designates the next bin as active and reactivates the timeline subscriber to continue monitoring the simulation environment.
Bin on Pallet Scenario
async def _run_pallet_scenario(self):
    await omni.kit.app.get_app().next_update_async()
    print(f"[PalletizingSDGDemo] Running pallet scenario for bin {self._bin_counter}..")
    mesh_to_orig_mats = {}
    pallet_mesh = self._stage.GetPrimAtPath(self.PALLET_PRIM_MESH_PATH)
    pallet_orig_mat, _ = UsdShade.MaterialBindingAPI(pallet_mesh).ComputeBoundMaterial()
    mesh_to_orig_mats[pallet_mesh] = pallet_orig_mat
    for i in range(self._bin_counter + 1):
        bin_mesh = self._stage.GetPrimAtPath(f"{self.BINS_FOLDER_PATH}/bin_{i}/Visuals/FOF_Mesh_Magenta_Box")
        bin_orig_mat, _ = UsdShade.MaterialBindingAPI(bin_mesh).ComputeBoundMaterial()
        mesh_to_orig_mats[bin_mesh] = bin_orig_mat

    self._create_bin_and_pallet_graph()

    # Because graphs are constantly created and destroyed during the demo
    # a delayed preview command is required to guarantee a full generation of the graph
    EVENT_NAME = carb.events.type_from_string("omni.replicator.core.orchestrator")
    rep.orchestrator._orchestrator._message_bus.push(EVENT_NAME, payload={"command": "preview"})

    out_dir = os.path.join(self._output_dir, f"writer_bin_{self._bin_counter}", "")
    writer = rep.WriterRegistry.get("BasicWriter")
    writer.initialize(output_dir=out_dir, rgb=True, instance_segmentation=True, colorize_instance_segmentation=True)
    rp = rep.create.render_product(self._rep_camera, (512, 512))
    writer.attach(rp)
    for i in range(self.PALLET_SCENARIO_FRAMES):
        await rep.orchestrator.step_async(rt_subframes=24, pause_timeline=False)
    writer.detach()
    rp.destroy()

    for mesh, mat in mesh_to_orig_mats.items():
        print(f"[PalletizingSDGDemo] Restoring original material({mat}) for {mesh.GetPath()}")
        UsdShade.MaterialBindingAPI(mesh).Bind(mat, UsdShade.Tokens.strongerThanDescendants)

    if self._stage.GetPrimAtPath("/Replicator"):
        omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

    self._replicator_running = False
    self._timeline.play()
    if self._next_bin():
        self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
            int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
        )
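The cache-and-restore step around the material randomization follows a general pattern that can be sketched without USD. In the example below, a plain dictionary stands in for the material bindings of the stage, and restore_bindings is a hypothetical context manager that is not part of the demo; the mesh and material names are made up.

```python
from contextlib import contextmanager


@contextmanager
def restore_bindings(bindings, keys):
    """Cache bindings[key] for the given keys and put the cached values back on exit."""
    cached = {key: bindings[key] for key in keys}
    try:
        yield cached
    finally:
        # Runs on normal exit and on exceptions, so the originals always return
        bindings.update(cached)


if __name__ == "__main__":
    # Made-up mesh-to-material mapping standing in for the stage
    bindings = {"pallet": "pallet_wood", "bin_0": "magenta_plastic"}

    with restore_bindings(bindings, ["pallet", "bin_0"]):
        bindings["pallet"] = "oak_texture"    # randomized during capture
        bindings["bin_0"] = "random_omnipbr"  # randomized during capture
        print("during capture:", bindings)

    print("after capture:", bindings)
```

Using try/finally is a design choice of this sketch: it restores the originals even if a capture step raises, whereas the scenario code above restores the materials as a regular step after the capture loop.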
For the bin on pallet scenario, the Replicator randomization graph randomizes the colors of the bin materials. The pallet textures are randomly selected from a predefined list of textures and applied using rep.randomizer.texture(texture_paths,..). The camera's position varies around the pallet using rep.distribution.uniform(..) and is oriented towards the pallet's location. The trigger is split into two parts: the bin materials are changed every frame, as shown by rep.trigger.on_frame(), while the pallet textures and the camera positions are changed every 4 frames, as shown by rep.trigger.on_frame(interval=4).
Bin on Pallet Randomization Graph
def _create_bin_and_pallet_graph(self):
    # Bin material randomization
    bin_paths = [
        f"{self.BINS_FOLDER_PATH}/bin_{i}/Visuals/FOF_Mesh_Magenta_Box" for i in range(self._bin_counter + 1)
    ]
    bins_node = rep.get.prim_at_path(bin_paths)

    with rep.trigger.on_frame():
        mats = rep.create.material_omnipbr(
            diffuse=rep.distribution.uniform((0.2, 0.1, 0.3), (0.6, 0.6, 0.7)),
            roughness=rep.distribution.choice([0.1, 0.9]),
            count=10,
        )
        with bins_node:
            rep.randomizer.materials(mats)

    # Camera and pallet texture randomization at a slower rate
    assets_root_path = get_assets_root_path()
    texture_paths = [
        assets_root_path + "/NVIDIA/Materials/Base/Wood/Oak/Oak_BaseColor.png",
        assets_root_path + "/NVIDIA/Materials/Base/Wood/Ash/Ash_BaseColor.png",
        assets_root_path + "/NVIDIA/Materials/Base/Wood/Plywood/Plywood_BaseColor.png",
        assets_root_path + "/NVIDIA/Materials/Base/Wood/Timber/Timber_BaseColor.png",
    ]
    pallet_node = rep.get.prim_at_path(self.PALLET_PRIM_MESH_PATH)
    pallet_prim = pallet_node.get_output_prims()["prims"][0]
    pallet_loc = omni.usd.get_world_transform_matrix(pallet_prim).ExtractTranslation()
    self._rep_camera = rep.create.camera()
    with rep.trigger.on_frame(interval=4):
        with pallet_node:
            rep.randomizer.texture(texture_paths, texture_rotate=rep.distribution.uniform(80, 95))
        with self._rep_camera:
            rep.modify.pose(
                position=rep.distribution.uniform((0, -2, 1), (2, 1, 2)),
                look_at=(pallet_loc[0], pallet_loc[1], pallet_loc[2]),
            )
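The effect of running the two triggers at different rates can be worked out with a small schedule calculation. The sketch below is plain Python rather than Replicator code; trigger_frames is a hypothetical helper, and it assumes that frames are counted from zero and that an interval trigger fires on the first frame. The actual firing frames in Replicator may be offset from this.

```python
PALLET_SCENARIO_FRAMES = 16


def trigger_frames(num_frames, interval=1):
    """Frames on which a trigger with the given interval fires (assumed zero-based)."""
    return [frame for frame in range(num_frames) if frame % interval == 0]


if __name__ == "__main__":
    bin_material_frames = trigger_frames(PALLET_SCENARIO_FRAMES)           # every frame
    camera_texture_frames = trigger_frames(PALLET_SCENARIO_FRAMES, 4)      # every 4th frame

    print(len(bin_material_frames))  # 16
    print(camera_texture_frames)     # [0, 4, 8, 12]
```

Under these assumptions, each bin on pallet capture of 16 frames contains 16 bin material variations but only 4 distinct camera poses and pallet textures, each held for 4 consecutive frames.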
The example can be run from the UI using the Script Editor:
import asyncio
import json
import os
import carb
import numpy as np
import omni
import omni.kit.commands
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from omni.isaac.core.utils.bounds import create_bbox_cache
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.kit.utils import set_carb_setting
from omni.physx import get_physx_scene_query_interface
from PIL import Image
from pxr import UsdShade
class PalletizingSDGDemo:
    BINS_FOLDER_PATH = "/World/Ur10Table/bins"
    FLIP_HELPER_PATH = "/World/Ur10Table/pallet_holder"
    PALLET_PRIM_MESH_PATH = "/World/Ur10Table/pallet/Xform/Mesh_015"
    BIN_FLIP_SCENARIO_FRAMES = 4
    PALLET_SCENARIO_FRAMES = 16

    def __init__(self):
        # There are 36 bins in total
        self._bin_counter = 0
        self._num_captures = 36
        self._stage = None
        self._active_bin = None

        # Cleanup in case the user closes the stage
        self._stage_event_sub = None

        # Simulation state flags
        self._in_running_state = False
        self._bin_flip_scenario_done = False

        # Used to pause/resume the simulation
        self._timeline = None

        # Used to actively track the active bins surroundings (e.g., in contact with pallet)
        self._timeline_sub = None
        self._overlap_extent = None

        # SDG
        self._rep_camera = None
        self._output_dir = os.path.join(os.getcwd(), "_out_palletizing_sdg_demo", "")
        print(f"[PalletizingSDGDemo] Output directory: {self._output_dir}")

    def start(self, num_captures):
        self._num_captures = num_captures if 1 <= num_captures <= 36 else 36
        if self._init():
            self._start()

    def is_running(self):
        return self._in_running_state

    def _init(self):
        self._stage = omni.usd.get_context().get_stage()
        self._active_bin = self._stage.GetPrimAtPath(f"{self.BINS_FOLDER_PATH}/bin_{self._bin_counter}")

        if not self._active_bin:
            print("[PalletizingSDGDemo] Could not find bin, make sure the palletizing demo is loaded..")
            return False

        bb_cache = create_bbox_cache()
        half_ext = bb_cache.ComputeLocalBound(self._active_bin).GetRange().GetSize() * 0.5
        self._overlap_extent = carb.Float3(half_ext[0], half_ext[1], half_ext[2] * 1.1)

        self._timeline = omni.timeline.get_timeline_interface()
        if not self._timeline.is_playing():
            print("[PalletizingSDGDemo] Please start the palletizing demo first..")
            return False

        carb_settings = carb.settings.get_settings()
        carb_settings.set("/omni/replicator/captureOnPlay", False)
        carb_settings.set("/omni/replicator/asyncRendering", False)
        carb_settings.set("/app/asyncRendering", False)

        if self._stage.GetPrimAtPath("/Replicator"):
            omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

        return True

    def _start(self):
        self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
            int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
        )
        self._stage_event_sub = (
            omni.usd.get_context()
            .get_stage_event_stream()
            .create_subscription_to_pop_by_type(int(omni.usd.StageEventType.CLOSING), self._on_stage_closing_event)
        )
        self._in_running_state = True
        print("[PalletizingSDGDemo] Starting the palletizing SDG demo..")

    def clear(self):
        if self._timeline_sub:
            self._timeline_sub.unsubscribe()
        self._timeline_sub = None
        self._stage_event_sub = None
        self._in_running_state = False
        self._bin_counter = 0
        self._active_bin = None
        if self._stage.GetPrimAtPath("/Replicator"):
            omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

    def _on_stage_closing_event(self, e: carb.events.IEvent):
        # Make sure the subscribers are unsubscribed for new stages
        self.clear()

    def _on_timeline_event(self, e: carb.events.IEvent):
        self._check_bin_overlaps()

    def _check_bin_overlaps(self):
        bin_pose = omni.usd.get_world_transform_matrix(self._active_bin)
        origin = bin_pose.ExtractTranslation()
        quat_gf = bin_pose.ExtractRotation().GetQuaternion()

        any_hit_flag = False
        hit_info = get_physx_scene_query_interface().overlap_box(
            carb.Float3(self._overlap_extent),
            carb.Float3(origin[0], origin[1], origin[2]),
            carb.Float4(
                quat_gf.GetImaginary()[0], quat_gf.GetImaginary()[1], quat_gf.GetImaginary()[2], quat_gf.GetReal()
            ),
            self._on_overlap_hit,
            any_hit_flag,
        )

    def _on_overlap_hit(self, hit):
        if hit.rigid_body == self._active_bin.GetPrimPath():
            return True  # Self hit, return True to continue the query

        # First contact with the flip helper
        if hit.rigid_body.startswith(self.FLIP_HELPER_PATH) and not self._bin_flip_scenario_done:
            self._timeline.pause()
            self._timeline_sub.unsubscribe()
            self._timeline_sub = None
            asyncio.ensure_future(self._run_bin_flip_scenario())
            return False  # Relevant hit, return False to finish the hit query

        # Contact with the pallet or other bin on the pallet
        pallet_hit = hit.rigid_body.startswith(self.PALLET_PRIM_MESH_PATH)
        other_bin_hit = hit.rigid_body.startswith(f"{self.BINS_FOLDER_PATH}/bin_")
        if pallet_hit or other_bin_hit:
            self._timeline.pause()
            self._timeline_sub.unsubscribe()
            self._timeline_sub = None
            asyncio.ensure_future(self._run_pallet_scenario())
            return False  # Relevant hit, return False to finish the hit query

        return True  # No relevant hit, return True to continue the query

    def _switch_to_pathtracing(self):
        carb_settings = carb.settings.get_settings()
        set_carb_setting(carb_settings, "/rtx/rendermode", "PathTracing")
        set_carb_setting(carb_settings, "/rtx/pathtracing/spp", 32)
        set_carb_setting(carb_settings, "/rtx/pathtracing/totalSpp", 32)
        set_carb_setting(carb_settings, "/rtx/pathtracing/clampSpp", 32)

    def _switch_to_raytracing(self):
        carb_settings = carb.settings.get_settings()
        set_carb_setting(carb_settings, "/rtx/rendermode", "RayTracedLighting")
        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
        set_carb_setting(carb_settings, "/rtx/post/aa/op", 3)

    async def _run_bin_flip_scenario(self):
        await omni.kit.app.get_app().next_update_async()
        print(f"[PalletizingSDGDemo] Running bin flip scenario for bin {self._bin_counter}..")

        # Util function to save rgb images to file
        def save_img(rgb_data, filename):
            rgb_img = Image.fromarray(rgb_data, "RGBA")
            rgb_img.save(filename + ".png")

        self._switch_to_pathtracing()
        self._create_bin_flip_graph()

        # Because graphs are constantly created and destroyed during the demo
        # a delayed preview command is required to guarantee a full generation of the graph
        EVENT_NAME = carb.events.type_from_string("omni.replicator.core.orchestrator")
        rep.orchestrator._orchestrator._message_bus.push(EVENT_NAME, payload={"command": "preview"})

        rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
        is_annot = rep.AnnotatorRegistry.get_annotator("instance_segmentation", init_params={"colorize": True})
        rp = rep.create.render_product(self._rep_camera, (512, 512))
        rgb_annot.attach(rp)
        is_annot.attach(rp)
        out_dir = os.path.join(self._output_dir, f"annot_bin_{self._bin_counter}", "")
        os.makedirs(out_dir, exist_ok=True)

        for i in range(self.BIN_FLIP_SCENARIO_FRAMES):
            await rep.orchestrator.step_async(pause_timeline=False)

            rgb_data = rgb_annot.get_data()
            rgb_filename = f"{out_dir}rgb_{i}"
            save_img(rgb_data, rgb_filename)

            is_data = is_annot.get_data()
            is_filename = f"{out_dir}is_{i}"
            is_img_data = is_data["data"]
            height, width = is_img_data.shape[:2]
            is_img_data = is_img_data.view(np.uint8).reshape(height, width, -1)
            save_img(is_img_data, is_filename)
            is_info = is_data["info"]
            with open(f"{out_dir}is_info_{i}.json", "w") as f:
                json.dump(is_info, f, indent=4)

        await rep.orchestrator.wait_until_complete_async()
        rgb_annot.detach()
        is_annot.detach()
        rp.destroy()

        if self._stage.GetPrimAtPath("/Replicator"):
            omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

        self._switch_to_raytracing()

        self._bin_flip_scenario_done = True
        self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
            int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
        )
        self._timeline.play()

    def _create_bin_flip_graph(self):
        # Create new random lights using the color palette for the color attribute
        color_palette = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]

        def randomize_bin_flip_lights():
            lights = rep.create.light(
                light_type="Sphere",
                temperature=rep.distribution.normal(6500, 2000),
                intensity=rep.distribution.normal(45000, 15000),
                position=rep.distribution.uniform((0.25, 0.25, 0.5), (1, 1, 0.75)),
                scale=rep.distribution.uniform(0.5, 0.8),
                color=rep.distribution.choice(color_palette),
                count=3,
            )
            return lights.node

        rep.randomizer.register(randomize_bin_flip_lights)

        # Move the camera to the given location sequences and look at the predefined location
        camera_positions = [(1.96, 0.72, -0.34), (1.48, 0.70, 0.90), (0.79, -0.86, 0.12), (-0.49, 1.47, 0.58)]
        self._rep_camera = rep.create.camera()
        with rep.trigger.on_frame():
            rep.randomizer.randomize_bin_flip_lights()
            with self._rep_camera:
                rep.modify.pose(position=rep.distribution.sequence(camera_positions), look_at=(0.78, 0.72, -0.1))

    async def _run_pallet_scenario(self):
        await omni.kit.app.get_app().next_update_async()
        print(f"[PalletizingSDGDemo] Running pallet scenario for bin {self._bin_counter}..")
        mesh_to_orig_mats = {}
        pallet_mesh = self._stage.GetPrimAtPath(self.PALLET_PRIM_MESH_PATH)
        pallet_orig_mat, _ = UsdShade.MaterialBindingAPI(pallet_mesh).ComputeBoundMaterial()
        mesh_to_orig_mats[pallet_mesh] = pallet_orig_mat
        for i in range(self._bin_counter + 1):
            bin_mesh = self._stage.GetPrimAtPath(f"{self.BINS_FOLDER_PATH}/bin_{i}/Visuals/FOF_Mesh_Magenta_Box")
            bin_orig_mat, _ = UsdShade.MaterialBindingAPI(bin_mesh).ComputeBoundMaterial()
            mesh_to_orig_mats[bin_mesh] = bin_orig_mat

        self._create_bin_and_pallet_graph()

        # Because graphs are constantly created and destroyed during the demo
        # a delayed preview command is required to guarantee a full generation of the graph
        EVENT_NAME = carb.events.type_from_string("omni.replicator.core.orchestrator")
        rep.orchestrator._orchestrator._message_bus.push(EVENT_NAME, payload={"command": "preview"})

        out_dir = os.path.join(self._output_dir, f"writer_bin_{self._bin_counter}", "")
        writer = rep.WriterRegistry.get("BasicWriter")
        writer.initialize(output_dir=out_dir, rgb=True, instance_segmentation=True, colorize_instance_segmentation=True)
        rp = rep.create.render_product(self._rep_camera, (512, 512))
        writer.attach(rp)
        for i in range(self.PALLET_SCENARIO_FRAMES):
            await rep.orchestrator.step_async(rt_subframes=24, pause_timeline=False)
        writer.detach()
        rp.destroy()

        for mesh, mat in mesh_to_orig_mats.items():
            print(f"[PalletizingSDGDemo] Restoring original material({mat}) for {mesh.GetPath()}")
            UsdShade.MaterialBindingAPI(mesh).Bind(mat, UsdShade.Tokens.strongerThanDescendants)

        if self._stage.GetPrimAtPath("/Replicator"):
            omni.kit.commands.execute("DeletePrimsCommand", paths=["/Replicator"])

        self._replicator_running = False
        self._timeline.play()
        if self._next_bin():
            self._timeline_sub = self._timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
                int(omni.timeline.TimelineEventType.CURRENT_TIME_TICKED), self._on_timeline_event
            )

    def _create_bin_and_pallet_graph(self):
        # Bin material randomization
        bin_paths = [
            f"{self.BINS_FOLDER_PATH}/bin_{i}/Visuals/FOF_Mesh_Magenta_Box" for i in range(self._bin_counter + 1)
        ]
        bins_node = rep.get.prim_at_path(bin_paths)

        with rep.trigger.on_frame():
            mats = rep.create.material_omnipbr(
                diffuse=rep.distribution.uniform((0.2, 0.1, 0.3), (0.6, 0.6, 0.7)),
                roughness=rep.distribution.choice([0.1, 0.9]),
                count=10,
            )
            with bins_node:
                rep.randomizer.materials(mats)

        # Camera and pallet texture randomization at a slower rate
        assets_root_path = get_assets_root_path()
        texture_paths = [
            assets_root_path + "/NVIDIA/Materials/Base/Wood/Oak/Oak_BaseColor.png",
            assets_root_path + "/NVIDIA/Materials/Base/Wood/Ash/Ash_BaseColor.png",
            assets_root_path + "/NVIDIA/Materials/Base/Wood/Plywood/Plywood_BaseColor.png",
            assets_root_path + "/NVIDIA/Materials/Base/Wood/Timber/Timber_BaseColor.png",
        ]
        pallet_node = rep.get.prim_at_path(self.PALLET_PRIM_MESH_PATH)
        pallet_prim = pallet_node.get_output_prims()["prims"][0]
        pallet_loc = omni.usd.get_world_transform_matrix(pallet_prim).ExtractTranslation()
        self._rep_camera = rep.create.camera()
        with rep.trigger.on_frame(interval=4):
            with pallet_node:
                rep.randomizer.texture(texture_paths, texture_rotate=rep.distribution.uniform(80, 95))
            with self._rep_camera:
                rep.modify.pose(
                    position=rep.distribution.uniform((0, -2, 1), (2, 1, 2)),
                    look_at=(pallet_loc[0], pallet_loc[1], pallet_loc[2]),
                )

    def _next_bin(self):
        self._bin_counter += 1
        if self._bin_counter >= self._num_captures:
            self.clear()
            print("[PalletizingSDGDemo] Palletizing SDG demo finished..")
            return False
        self._active_bin = self._stage.GetPrimAtPath(f"{self.BINS_FOLDER_PATH}/bin_{self._bin_counter}")
        print(f"[PalletizingSDGDemo] Moving to bin {self._bin_counter}..")
        self._bin_flip_scenario_done = False
        return True
NUM_CAPTURES = 36


async def run_example_async():
    from omni.isaac.examples.ur10_palletizing.ur10_palletizing import BinStacking

    bin_staking_sample = BinStacking()
    await bin_staking_sample.load_world_async()
    await bin_staking_sample.on_event_async()

    sdg_demo = PalletizingSDGDemo()
    sdg_demo.start(num_captures=NUM_CAPTURES)


asyncio.ensure_future(run_example_async())
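After a run, the files written by the bin flip scenario can be checked against the naming scheme used in _run_bin_flip_scenario (rgb_{i}.png, is_{i}.png and is_info_{i}.json inside annot_bin_{n}). The following standard-library sketch is one possible check; expected_annotator_files and missing_files are hypothetical helpers, and the contents of the writer_bin_{n} folders are not covered because their file names are determined by BasicWriter.

```python
import os

BIN_FLIP_SCENARIO_FRAMES = 4


def expected_annotator_files(output_dir, bin_index, num_frames=BIN_FLIP_SCENARIO_FRAMES):
    """List the files the bin flip scenario writes for one bin."""
    out_dir = os.path.join(output_dir, f"annot_bin_{bin_index}")
    names = []
    for i in range(num_frames):
        names += [f"rgb_{i}.png", f"is_{i}.png", f"is_info_{i}.json"]
    return [os.path.join(out_dir, name) for name in names]


def missing_files(paths):
    """Return the subset of paths that does not exist on disk."""
    return [path for path in paths if not os.path.isfile(path)]


if __name__ == "__main__":
    output_dir = os.path.join(os.getcwd(), "_out_palletizing_sdg_demo")
    paths = expected_annotator_files(output_dir, bin_index=0)
    print(f"{len(paths)} files expected, {len(missing_files(paths))} missing")
```

Only bins that pass over the flip helper trigger the bin flip scenario, so a missing annot_bin_{n} folder for a bin that did not need flipping is expected rather than an error.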