Useful Snippets#

Various examples of Isaac Sim Replicator snippets that can be run as Standalone Applications or from the UI using the Script Editor.

Annotator and Custom Writer Data from Multiple Cameras#

Example of how to access data from multiple cameras in a scene using annotators or custom writers. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/multi_camera.py
Annotator and Custom Writer Data from Multiple Cameras
  1import asyncio
  2import os
  3
  4import carb.settings
  5import omni.replicator.core as rep
  6import omni.usd
  7from omni.replicator.core import Writer
  8from omni.replicator.core.backends import DiskBackend
  9from omni.replicator.core.functional import write_image
 10
 11NUM_FRAMES = 5
 12
 13# Randomize cube color every frame using a graph-based replicator randomizer
 14def cube_color_randomizer():
 15    cube_prims = rep.get.prims(path_pattern="Cube")
 16    with cube_prims:
 17        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
 18    return cube_prims.node
 19
 20# Example of custom writer class to access the annotator data
 21class MyWriter(Writer):
 22    def __init__(self, rgb: bool = True):
 23        # Organize data from render product perspective (legacy, annotator, renderProduct)
 24        self.data_structure = "renderProduct"
 25        self.annotators = []
 26        self._frame_id = 0
 27        if rgb:
 28            # Create a new rgb annotator and add it to the writer's list of annotators
 29            self.annotators.append(rep.annotators.get("rgb"))
 30        # Create writer output directory and initialize DiskBackend
 31        output_dir = os.path.join(os.getcwd(), "_out_mc_writer")
 32        print(f"Writing writer data to {output_dir}")
 33        self.backend = DiskBackend(output_dir=output_dir, overwrite=True)
 34
 35    def write(self, data):
 36        if "renderProducts" in data:
 37            for rp_name, rp_data in data["renderProducts"].items():
 38                if "rgb" in rp_data:
 39                    file_path = f"{rp_name}_frame_{self._frame_id}.png"
 40                    self.backend.schedule(write_image, data=rp_data["rgb"]["data"], path=file_path)
 41        self._frame_id += 1
 42
 43rep.WriterRegistry.register(MyWriter)
 44
 45# Create a new stage
 46omni.usd.get_context().new_stage()
 47
 48# Set global random seed for the replicator randomizer
 49rep.set_global_seed(11)
 50
 51# Disable capture on play to capture data manually using step
 52rep.orchestrator.set_capture_on_play(False)
 53
 54# Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 55carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
 56
 57# Setup stage
 58rep.functional.create.xform(name="World")
 59rep.functional.create.dome_light(intensity=900, parent="/World", name="DomeLight")
 60cube = rep.functional.create.cube(parent="/World", name="Cube", semantics={"class": "my_cube"})
 61
 62# Register the graph-based cube color randomizer to trigger on every frame
 63rep.randomizer.register(cube_color_randomizer)
 64with rep.trigger.on_frame():
 65    rep.randomizer.cube_color_randomizer()
 66
 67# Create cameras
 68cam_top = rep.functional.create.camera(position=(0, 0, 5), look_at=(0, 0, 0), parent="/World", name="CamTop")
 69cam_side = rep.functional.create.camera(position=(2, 2, 0), look_at=(0, 0, 0), parent="/World", name="CamSide")
 70cam_persp = rep.functional.create.camera(
 71    position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="CamPersp"
 72)
 73
 74# Create the render products
 75rp_top = rep.create.render_product(cam_top, resolution=(320, 320), name="RpTop")
 76rp_side = rep.create.render_product(cam_side, resolution=(640, 640), name="RpSide")
 77rp_persp = rep.create.render_product(cam_persp, resolution=(1024, 1024), name="RpPersp")
 78
 79# Example of accessing the data through a custom writer
 80writer = rep.WriterRegistry.get("MyWriter")
 81writer.initialize(rgb=True)
 82writer.attach([rp_top, rp_side, rp_persp])
 83
 84# Example of accessing the data directly through annotators
 85rgb_annotators = []
 86for rp in [rp_top, rp_side, rp_persp]:
 87    # Create a new rgb annotator for each render product
 88    rgb = rep.annotators.get("rgb")
 89    # Attach the annotator to the render product
 90    rgb.attach(rp)
 91    rgb_annotators.append(rgb)
 92
 93# Create annotator output directory
 94output_dir_annot = os.path.join(os.getcwd(), "_out_mc_annot")
 95print(f"Writing annotator data to {output_dir_annot}")
 96os.makedirs(output_dir_annot, exist_ok=True)
 97
 98async def run_example_async():
 99    for i in range(NUM_FRAMES):
100        print(f"Step {i}")
101        # The step function triggers registered graph-based randomizers, collects data from annotators,
102        # and invokes the write function of attached writers with the annotator data
103        await rep.orchestrator.step_async(rt_subframes=32)
104        for j, rgb_annot in enumerate(rgb_annotators):
105            file_path = os.path.join(output_dir_annot, f"rp{j}_step_{i}.png")
106            write_image(path=file_path, data=rgb_annot.get_data())
107
108    # Wait for the data to be written and release resources
109    await rep.orchestrator.wait_until_complete_async()
110    writer.detach()
111    for annot in rgb_annotators:
112        annot.detach()
113    for rp in [rp_top, rp_side, rp_persp]:
114        rp.destroy()
115
116asyncio.ensure_future(run_example_async())
Annotator and Custom Writer Data from Multiple Cameras
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp(launch_config={"headless": False})
  4
  5import os
  6
  7import carb.settings
  8import omni.replicator.core as rep
  9import omni.usd
 10from omni.replicator.core import Writer
 11from omni.replicator.core.backends import DiskBackend
 12from omni.replicator.core.functional import write_image
 13
 14NUM_FRAMES = 5
 15
 16
 17# Randomize cube color every frame using a graph-based replicator randomizer
 18def cube_color_randomizer():
 19    cube_prims = rep.get.prims(path_pattern="Cube")
 20    with cube_prims:
 21        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
 22    return cube_prims.node
 23
 24
 25# Example of custom writer class to access the annotator data
 26class MyWriter(Writer):
 27    def __init__(self, rgb: bool = True):
 28        # Organize data from render product perspective (legacy, annotator, renderProduct)
 29        self.data_structure = "renderProduct"
 30        self.annotators = []
 31        self._frame_id = 0
 32        if rgb:
 33            # Create a new rgb annotator and add it to the writer's list of annotators
 34            self.annotators.append(rep.annotators.get("rgb"))
 35        # Create writer output directory and initialize DiskBackend
 36        output_dir = os.path.join(os.getcwd(), "_out_mc_writer")
 37        print(f"Writing writer data to {output_dir}")
 38        self.backend = DiskBackend(output_dir=output_dir, overwrite=True)
 39
 40    def write(self, data):
 41        if "renderProducts" in data:
 42            for rp_name, rp_data in data["renderProducts"].items():
 43                if "rgb" in rp_data:
 44                    file_path = f"{rp_name}_frame_{self._frame_id}.png"
 45                    self.backend.schedule(write_image, data=rp_data["rgb"]["data"], path=file_path)
 46        self._frame_id += 1
 47
 48
 49rep.WriterRegistry.register(MyWriter)
 50
 51# Create a new stage
 52omni.usd.get_context().new_stage()
 53
 54# Set global random seed for the replicator randomizer
 55rep.set_global_seed(11)
 56
 57# Disable capture on play to capture data manually using step
 58rep.orchestrator.set_capture_on_play(False)
 59
 60# Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 61carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
 62
 63# Setup stage
 64rep.functional.create.xform(name="World")
 65rep.functional.create.dome_light(intensity=900, parent="/World", name="DomeLight")
 66cube = rep.functional.create.cube(parent="/World", name="Cube", semantics={"class": "my_cube"})
 67
 68# Register the graph-based cube color randomizer to trigger on every frame
 69rep.randomizer.register(cube_color_randomizer)
 70with rep.trigger.on_frame():
 71    rep.randomizer.cube_color_randomizer()
 72
 73# Create cameras
 74cam_top = rep.functional.create.camera(position=(0, 0, 5), look_at=(0, 0, 0), parent="/World", name="CamTop")
 75cam_side = rep.functional.create.camera(position=(2, 2, 0), look_at=(0, 0, 0), parent="/World", name="CamSide")
 76cam_persp = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="CamPersp")
 77
 78# Create the render products
 79rp_top = rep.create.render_product(cam_top, resolution=(320, 320), name="RpTop")
 80rp_side = rep.create.render_product(cam_side, resolution=(640, 640), name="RpSide")
 81rp_persp = rep.create.render_product(cam_persp, resolution=(1024, 1024), name="RpPersp")
 82
 83# Example of accessing the data through a custom writer
 84writer = rep.WriterRegistry.get("MyWriter")
 85writer.initialize(rgb=True)
 86writer.attach([rp_top, rp_side, rp_persp])
 87
 88# Example of accessing the data directly through annotators
 89rgb_annotators = []
 90for rp in [rp_top, rp_side, rp_persp]:
 91    # Create a new rgb annotator for each render product
 92    rgb = rep.annotators.get("rgb")
 93    # Attach the annotator to the render product
 94    rgb.attach(rp)
 95    rgb_annotators.append(rgb)
 96
 97# Create annotator output directory
 98output_dir_annot = os.path.join(os.getcwd(), "_out_mc_annot")
 99print(f"Writing annotator data to {output_dir_annot}")
100os.makedirs(output_dir_annot, exist_ok=True)
101
102for i in range(NUM_FRAMES):
103    print(f"Step {i}")
104    # The step function triggers registered graph-based randomizers, collects data from annotators,
105    # and invokes the write function of attached writers with the annotator data
106    rep.orchestrator.step(rt_subframes=32)
107    for j, rgb_annot in enumerate(rgb_annotators):
108        file_path = os.path.join(output_dir_annot, f"rp{j}_step_{i}.png")
109        write_image(path=file_path, data=rgb_annot.get_data())
110
111# Wait for the data to be written and release resources
112rep.orchestrator.wait_until_complete()
113writer.detach()
114for annot in rgb_annotators:
115    annot.detach()
116for rp in [rp_top, rp_side, rp_persp]:
117    rp.destroy()
118
119simulation_app.close()

Synthetic Data Access at Specific Simulation Timepoints#

Example of how to access synthetic data (RGB, semantic segmentation) from multiple cameras in a simulation scene at specific events using annotators or writers. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/simulation_get_data.py
Synthetic Data Access at Specific Simulation Timepoints
 1import asyncio
 2import json
 3import os
 4
 5import carb.settings
 6import numpy as np
 7import omni
 8import omni.replicator.core as rep
 9from isaacsim.core.experimental.objects import GroundPlane
10from isaacsim.core.simulation_manager import SimulationManager
11from omni.replicator.core.functional import write_image, write_json
12from pxr import UsdPhysics
13
14# Util function to save semantic segmentation annotator data
def write_sem_data(sem_data, file_path):
    """Save semantic segmentation annotator data as a JSON/PNG pair.

    Args:
        sem_data: Annotator output; the id-to-label mapping is read from
            ``sem_data["info"]["idToLabels"]`` and the image from ``sem_data["data"]``.
        file_path: Output path without extension; ".json" and ".png" are appended.
    """
    # Label mapping first, then the segmentation image
    write_json(path=file_path + ".json", data=sem_data["info"]["idToLabels"])
    write_image(path=file_path + ".png", data=sem_data["data"])
20
21# Create a new stage
22omni.usd.get_context().new_stage()
23
24# Setting capture on play to False will prevent the replicator from capturing data each frame
25rep.orchestrator.set_capture_on_play(False)
26
27# Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
28carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
29
30# Add a dome light and a ground plane
31rep.functional.create.xform(name="World")
32rep.functional.create.dome_light(intensity=500, parent="/World", name="DomeLight")
33ground_plane = GroundPlane("/World/GroundPlane")
34rep.functional.modify.semantics(ground_plane.prims, {"class": "ground_plane"}, mode="add")
35
# Create a camera and render product to collect the data from
# (the "/World" xform was already created above together with the dome light,
# so it is not created a second time here)
cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
rp = rep.create.render_product(cam, resolution=(512, 512), name="MyRenderProduct")
40
41# Set the output directory for the data
42out_dir = os.path.join(os.getcwd(), "_out_sim_event")
43writer_dir = os.path.join(out_dir, "writer")
44annotator_dir = os.path.join(out_dir, "annotator")
45
46os.makedirs(out_dir, exist_ok=True)
47os.makedirs(writer_dir, exist_ok=True)
48os.makedirs(annotator_dir, exist_ok=True)
49
50print(f"Outputting data to {out_dir}..")
51backend = rep.backends.get("DiskBackend")
52backend.initialize(output_dir=writer_dir)
53
54# Example of using a writer to save the data
55writer = rep.WriterRegistry.get("BasicWriter")
56writer.initialize(backend=backend, rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True)
57writer.attach(rp)
58
59# Example of accessing the data directly from annotators
60rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
61rgb_annot.attach(rp)
62sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
63sem_annot.attach(rp)
64
65# Initialize the simulation manager
66SimulationManager.initialize_physics()
67
async def run_example_async():
    """Drop five cubes one after another and capture data once each cube slows down.

    Every cube is simulated for at most 500 steps; the capture happens at the first
    step where its linear speed falls below 0.1, after which the next cube is spawned.
    """
    for cube_idx in range(5):
        # Spawn a labeled rigid-body cube above the ground, each one slightly higher
        cube = rep.functional.create.cube(name=f"Cuboid_{cube_idx}", parent="/World")
        rep.functional.modify.position(cube, (0, 0, 10 + cube_idx))
        rep.functional.modify.semantics(cube, {"class": "cuboid"}, mode="add")
        rep.functional.physics.apply_rigid_body(cube, with_collider=True)
        rigid_body = UsdPhysics.RigidBodyAPI(cube)

        for sim_step in range(500):
            SimulationManager.step()
            speed = np.linalg.norm(rigid_body.GetVelocityAttr().Get())
            if not speed < 0.1:
                continue

            print(f"Cube_{cube_idx} stopped moving after {sim_step} simulation steps, writing data..")
            # Trigger the writer and update the annotators with new data
            await rep.orchestrator.step_async(rt_subframes=4, delta_time=0.0, pause_timeline=False)
            # Shared file prefix for the rgb image and the semantic segmentation pair
            capture_prefix = os.path.join(annotator_dir, f"Cube_{cube_idx}_step_{sim_step}")
            write_image(path=capture_prefix + "_rgb.png", data=rgb_annot.get_data())
            write_sem_data(sem_annot.get_data(), capture_prefix + "_sem")
            break

    # Wait for the data to be written to disk and clean up resources
    await rep.orchestrator.wait_until_complete_async()
    rgb_annot.detach()
    sem_annot.detach()
    writer.detach()
    rp.destroy()
98
99asyncio.ensure_future(run_example_async())
Synthetic Data Access at Specific Simulation Timepoints
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp(launch_config={"headless": False})
  4
  5import json
  6import os
  7
  8import carb.settings
  9import numpy as np
 10import omni
 11import omni.replicator.core as rep
 12from isaacsim.core.experimental.objects import GroundPlane
 13from isaacsim.core.simulation_manager import SimulationManager
 14from omni.replicator.core.functional import write_image, write_json
 15from pxr import UsdPhysics
 16
 17
 18# Util function to save semantic segmentation annotator data
 19def write_sem_data(sem_data, file_path):
 20    id_to_labels = sem_data["info"]["idToLabels"]
 21    write_json(path=file_path + ".json", data=id_to_labels)
 22    sem_image_data = sem_data["data"]
 23    write_image(path=file_path + ".png", data=sem_image_data)
 24
 25
 26# Create a new stage
 27omni.usd.get_context().new_stage()
 28
 29# Setting capture on play to False will prevent the replicator from capturing data each frame
 30rep.orchestrator.set_capture_on_play(False)
 31
 32# Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 33carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
 34
 35# Add a dome light and a ground plane
 36rep.functional.create.xform(name="World")
 37rep.functional.create.dome_light(intensity=500, parent="/World", name="DomeLight")
 38ground_plane = GroundPlane("/World/GroundPlane")
 39rep.functional.modify.semantics(ground_plane.prims, {"class": "ground_plane"}, mode="add")
 40
 41# Create a camera and render product to collect the data from
 42cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
 43rp = rep.create.render_product(cam, resolution=(512, 512), name="MyRenderProduct")
 44
 45# Set the output directory for the data
 46out_dir = os.path.join(os.getcwd(), "_out_sim_event")
 47writer_dir = os.path.join(out_dir, "writer")
 48annotator_dir = os.path.join(out_dir, "annotator")
 49
 50os.makedirs(out_dir, exist_ok=True)
 51os.makedirs(writer_dir, exist_ok=True)
 52os.makedirs(annotator_dir, exist_ok=True)
 53
 54print(f"Outputting data to {out_dir}..")
 55backend = rep.backends.get("DiskBackend")
 56backend.initialize(output_dir=writer_dir)
 57
 58# Example of using a writer to save the data
 59writer = rep.WriterRegistry.get("BasicWriter")
 60writer.initialize(backend=backend, rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True)
 61writer.attach(rp)
 62
 63# Example of accessing the data directly from annotators
 64rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
 65rgb_annot.attach(rp)
 66sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
 67sem_annot.attach(rp)
 68
 69# Initialize the simulation manager
 70simulation_manager = SimulationManager()
 71simulation_manager.initialize_physics()
 72
 73# Spawn and drop a few cubes, capture data when they stop moving
 74for i in range(5):
 75    cube = rep.functional.create.cube(name=f"Cuboid_{i}", parent="/World")
 76    rep.functional.modify.position(cube, (0, 0, 10 + i))
 77    rep.functional.modify.semantics(cube, {"class": "cuboid"}, mode="add")
 78    rep.functional.physics.apply_rigid_body(cube, with_collider=True)
 79    physics_rigid_body_api = UsdPhysics.RigidBodyAPI(cube)
 80
 81    for s in range(500):
 82        simulation_manager.step(render=False)
 83        linear_velocity = physics_rigid_body_api.GetVelocityAttr().Get()
 84        speed = np.linalg.norm(linear_velocity)
 85
 86        if speed < 0.1:
 87            print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..")
 88            # Trigger the writer and update the annotators with new data
 89            rep.orchestrator.step(rt_subframes=4, delta_time=0.0, pause_timeline=False)
 90            rgb_path = os.path.join(annotator_dir, f"Cube_{i}_step_{s}_rgb.png")
 91            write_image(path=rgb_path, data=rgb_annot.get_data())
 92            sem_path = os.path.join(annotator_dir, f"Cube_{i}_step_{s}_sem")
 93            write_sem_data(sem_annot.get_data(), sem_path)
 94            break
 95
 96# Wait for the data to be written to disk and clean up resources
 97rep.orchestrator.wait_until_complete()
 98rgb_annot.detach()
 99sem_annot.detach()
100writer.detach()
101rp.destroy()
102
103simulation_app.close()

Custom Event Randomization and Writing#

The following example showcases the use of custom events to trigger randomizations and data writing at various times throughout the simulation. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_event_and_write.py
Custom Event Randomization and Writing
 1import asyncio
 2import os
 3
 4import carb.settings
 5import omni.replicator.core as rep
 6import omni.usd
 7
 8omni.usd.get_context().new_stage()
 9
10# Set global random seed for the replicator randomizer to ensure reproducibility
11rep.set_global_seed(11)
12
13# Setting capture on play to False will prevent the replicator from capturing data each frame
14rep.orchestrator.set_capture_on_play(False)
15
16# Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
17carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
18
19rep.functional.create.xform(name="World")
20rep.functional.create.distant_light(intensity=4000, rotation=(315, 0, 0), parent="/World", name="DistantLight")
21small_cube = rep.functional.create.cube(scale=0.75, position=(-1.5, 1.5, 0), parent="/World", name="SmallCube")
22large_cube = rep.functional.create.cube(scale=1.25, position=(1.5, -1.5, 0), parent="/World", name="LargeCube")
23
24# Graph-based randomizations triggered on custom events
25with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
26    small_cube_node = rep.get.prim_at_path(small_cube.GetPath())
27    with small_cube_node:
28        rep.randomizer.rotation()
29
30with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
31    large_cube_node = rep.get.prim_at_path(large_cube.GetPath())
32    with large_cube_node:
33        rep.randomizer.rotation()
34
35# Use the disk backend to write the data to disk
36out_dir = os.path.join(os.getcwd(), "_out_custom_event")
37print(f"Writing data to {out_dir}")
38backend = rep.backends.get("DiskBackend")
39backend.initialize(output_dir=out_dir)
40
41cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
42rp = rep.create.render_product(cam, (512, 512))
43writer = rep.WriterRegistry.get("BasicWriter")
44writer.initialize(backend=backend, rgb=True)
45writer.attach(rp)
46
async def run_example_async():
    """Capture five frames, modifying the cubes between captures, then clean up.

    Capture order: original poses, small cube rotation randomized through the
    "randomize_small_cube" custom event, small cube translated through the USD API,
    large cube rotation randomized through the "randomize_large_cube" custom event,
    large cube translated through the USD API. Each change is followed by one
    orchestrator step.
    """
    # Plain string literal: the message has no placeholders, so no f-prefix is needed
    print("Capturing at original positions")
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Randomizing small cube rotation (graph-based) and capturing...")
    rep.utils.send_og_event(event_name="randomize_small_cube")
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Moving small cube position (USD API) and capturing...")
    small_cube.GetAttribute("xformOp:translate").Set((-1.5, 1.5, -2))
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Randomizing large cube rotation (graph-based) and capturing...")
    rep.utils.send_og_event(event_name="randomize_large_cube")
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Moving large cube position (USD API) and capturing...")
    large_cube.GetAttribute("xformOp:translate").Set((1.5, -1.5, 2))
    await rep.orchestrator.step_async(rt_subframes=8)

    # Wait until all the data is saved to disk and cleanup writer and render product
    await rep.orchestrator.wait_until_complete_async()
    writer.detach()
    rp.destroy()
71
72asyncio.ensure_future(run_example_async())
Custom Event Randomization and Writing
 1from isaacsim import SimulationApp
 2
 3simulation_app = SimulationApp(launch_config={"headless": False})
 4
 5import os
 6
 7import carb.settings
 8import omni.replicator.core as rep
 9import omni.usd
10
11omni.usd.get_context().new_stage()
12
13# Set global random seed for the replicator randomizer to ensure reproducibility
14rep.set_global_seed(11)
15
16# Setting capture on play to False will prevent the replicator from capturing data each frame
17rep.orchestrator.set_capture_on_play(False)
18
19# Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
20carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
21
22rep.functional.create.xform(name="World")
23rep.functional.create.distant_light(intensity=4000, rotation=(315, 0, 0), parent="/World", name="DistantLight")
24small_cube = rep.functional.create.cube(scale=0.75, position=(-1.5, 1.5, 0), parent="/World", name="SmallCube")
25large_cube = rep.functional.create.cube(scale=1.25, position=(1.5, -1.5, 0), parent="/World", name="LargeCube")
26
27# Graph-based randomizations triggered on custom events
28with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
29    small_cube_node = rep.get.prim_at_path(small_cube.GetPath())
30    with small_cube_node:
31        rep.randomizer.rotation()
32
33with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
34    large_cube_node = rep.get.prim_at_path(large_cube.GetPath())
35    with large_cube_node:
36        rep.randomizer.rotation()
37
38# Use the disk backend to write the data to disk
39out_dir = os.path.join(os.getcwd(), "_out_custom_event")
40print(f"Writing data to {out_dir}")
41backend = rep.backends.get("DiskBackend")
42backend.initialize(output_dir=out_dir)
43
44cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
45rp = rep.create.render_product(cam, (512, 512))
46writer = rep.WriterRegistry.get("BasicWriter")
47writer.initialize(backend=backend, rgb=True)
48writer.attach(rp)
49
50
def run_example():
    """Capture five frames, modifying the cubes between captures, then clean up.

    Capture order: original poses, small cube rotation randomized through the
    "randomize_small_cube" custom event, small cube translated through the USD API,
    large cube rotation randomized through the "randomize_large_cube" custom event,
    large cube translated through the USD API. Each change is followed by one
    orchestrator step.
    """
    # Plain string literal: the message has no placeholders, so no f-prefix is needed
    print("Capturing at original positions")
    rep.orchestrator.step(rt_subframes=8)

    print("Randomizing small cube rotation (graph-based) and capturing...")
    rep.utils.send_og_event(event_name="randomize_small_cube")
    rep.orchestrator.step(rt_subframes=8)

    print("Moving small cube position (USD API) and capturing...")
    small_cube.GetAttribute("xformOp:translate").Set((-1.5, 1.5, -2))
    rep.orchestrator.step(rt_subframes=8)

    print("Randomizing large cube rotation (graph-based) and capturing...")
    rep.utils.send_og_event(event_name="randomize_large_cube")
    rep.orchestrator.step(rt_subframes=8)

    print("Moving large cube position (USD API) and capturing...")
    large_cube.GetAttribute("xformOp:translate").Set((1.5, -1.5, 2))
    rep.orchestrator.step(rt_subframes=8)

    # Wait until all the data is saved to disk and cleanup writer and render product
    rep.orchestrator.wait_until_complete()
    writer.detach()
    rp.destroy()
75
76
77run_example()
78
79simulation_app.close()

Motion Blur#

This example demonstrates how to capture motion blur data using RTX Real-Time and RTX Interactive (Path Tracing) rendering modes. For the RTX – Real-Time mode, refer to the motion blur parameters. For the RTX – Interactive (Path Tracing) mode, motion blur is achieved by rendering multiple subframes (set via /omni/replicator/pathTracedMotionBlurSubSamples) and combining them to create the effect.

The example uses animated and physics-enabled assets with synchronized motion. Keyframe animated assets can be advanced at any custom delta time due to their interpolated motion, whereas physics-enabled assets require a custom physics FPS to ensure motion samples at any custom delta time. The example showcases how to compute the target physics FPS, change it if needed, and restore the original physics FPS after capturing the motion blur.

The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/motion_blur.py
Motion Blur
  1import asyncio
  2import os
  3
  4import carb.settings
  5import omni.kit.app
  6import omni.replicator.core as rep
  7import omni.timeline
  8import omni.usd
  9from isaacsim.storage.native import get_assets_root_path
 10from pxr import PhysxSchema, UsdPhysics
 11
 12# Paths to the animated and physics-ready assets
 13PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd"
 14ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd"
 15
 16# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s)
 17ASSET_VELOCITIES = [0, 5, 10]
 18ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)]
 19
 20# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets
 21ANIMATION_DURATION = 10
 22
 23# Number of frames to capture for each scenario
 24NUM_FRAMES = 3
 25
 26# Configuration for motion blur examples
 27DELTA_TIMES = [None, 1 / 30, 1 / 60, 1 / 240]
 28SAMPLES_PER_PIXEL = [32, 128]
 29MOTION_BLUR_SUBSAMPLES = [4, 16]
 30
 31def setup_stage():
 32    """Create a new USD stage with animated and physics-enabled assets with synchronized motion."""
 33    omni.usd.get_context().new_stage()
 34    settings = carb.settings.get_settings()
 35    # Set DLSS to Quality mode (2) for best SDG results , options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 36    settings.set("rtx/post/dlss/execMode", 2)
 37
 38    # Capture data only on request
 39    rep.orchestrator.set_capture_on_play(False)
 40
 41    stage = omni.usd.get_context().get_stage()
 42    timeline = omni.timeline.get_timeline_interface()
 43    timeline.set_end_time(ANIMATION_DURATION)
 44
 45    # Create lights
 46    rep.functional.create.xform(name="World")
 47    rep.functional.create.dome_light(intensity=100, parent="/World", name="DomeLight")
 48    rep.functional.create.distant_light(
 49        intensity=2500, rotation=(315, 0, 0), parent="/World", name="DistantLight"
 50    )
 51
 52    # Setup the physics assets with gravity disabled and the requested velocity
 53    assets_root_path = get_assets_root_path()
 54    physics_asset_url = assets_root_path + PHYSICS_ASSET_URL
 55    for location, velocity in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 56        prim = rep.functional.create.reference(
 57            usd_path=physics_asset_url,
 58            parent="/World",
 59            name=f"physics_asset_{int(abs(velocity))}",
 60            position=location,
 61        )
 62        physics_rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
 63        physics_rigid_body_api.GetVelocityAttr().Set((0, 0, -velocity))
 64        physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
 65        physx_rigid_body_api.GetDisableGravityAttr().Set(True)
 66        physx_rigid_body_api.GetAngularDampingAttr().Set(0.0)
 67        physx_rigid_body_api.GetLinearDampingAttr().Set(0.0)
 68
 69    # Setup animated assets maintaining the same velocity as the physics assets
 70    anim_asset_url = assets_root_path + ANIM_ASSET_URL
 71    for location, velocity in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 72        start_location = (-location[0], location[1], location[2])
 73        prim = rep.functional.create.reference(
 74            usd_path=anim_asset_url,
 75            parent="/World",
 76            name=f"anim_asset_{int(abs(velocity))}",
 77            position=start_location,
 78        )
 79        animation_distance = velocity * ANIMATION_DURATION
 80        end_location = (start_location[0], start_location[1], start_location[2] - animation_distance)
 81        end_keyframe_time = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION
 82        # Timesampled keyframe (animated) translation
 83        prim.GetAttribute("xformOp:translate").Set(start_location, time=0)
 84        prim.GetAttribute("xformOp:translate").Set(end_location, time=end_keyframe_time)
 85
 86async def run_motion_blur_example_async(
 87    num_frames=NUM_FRAMES, delta_time=None, use_path_tracing=True, motion_blur_subsamples=8, samples_per_pixel=64
 88):
 89    """Capture motion blur frames with the given delta time step and render mode."""
 90    setup_stage()
 91    stage = omni.usd.get_context().get_stage()
 92    settings = carb.settings.get_settings()
 93
 94    # Enable motion blur capture
 95    settings.set("/omni/replicator/captureMotionBlur", True)
 96
 97    # Set motion blur settings based on the render mode
 98    if use_path_tracing:
 99        print("[MotionBlur] Setting PathTracing render mode motion blur settings")
100        settings.set("/rtx/rendermode", "PathTracing")
101        # (int): Total number of samples for each rendered pixel, per frame.
102        settings.set("/rtx/pathtracing/spp", samples_per_pixel)
103        # (int): Maximum number of samples to accumulate per pixel. When this count is reached the rendering stops until a scene or setting change is detected, restarting the rendering process. Set to 0 to remove this limit.
104        settings.set("/rtx/pathtracing/totalSpp", samples_per_pixel)
105        settings.set("/rtx/pathtracing/optixDenoiser/enabled", 0)
106        # Number of sub samples to render if in PathTracing render mode and motion blur is enabled.
107        settings.set("/omni/replicator/pathTracedMotionBlurSubSamples", motion_blur_subsamples)
108    else:
109        print("[MotionBlur] Setting RealTimePathTracing render mode motion blur settings")
110        settings.set("/rtx/rendermode", "RealTimePathTracing")
111        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
112        settings.set("/rtx/post/aa/op", 2)
113        # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter.
114        settings.set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02)
115        # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample.
116        settings.set("/rtx/post/motionblur/exposureFraction", 1.0)
117        # (int): Number of samples to use in the filter. A higher number improves quality at the cost of performance.
118        settings.set("/rtx/post/motionblur/numSamples", 8)
119
120    # Setup backend
121    mode_str = f"pt_subsamples_{motion_blur_subsamples}_spp_{samples_per_pixel}" if use_path_tracing else "rt"
122    delta_time_str = "None" if delta_time is None else f"{delta_time:.4f}"
123    output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_func_dt_{delta_time_str}_{mode_str}")
124    print(f"[MotionBlur] Output directory: {output_directory}")
125    backend = rep.backends.get("DiskBackend")
126    backend.initialize(output_dir=output_directory)
127
128    # Setup writer and render product
129    camera = rep.functional.create.camera(
130        position=(0, 1.5, 0), look_at=(0, 0, 0), parent="/World", name="MotionBlurCam"
131    )
132    render_product = rep.create.render_product(camera, (1280, 720))
133    writer = rep.WriterRegistry.get("BasicWriter")
134    writer.initialize(backend=backend, rgb=True)
135    writer.attach(render_product)
136
137    # Run a few updates to make sure all materials are fully loaded for capture
138    for _ in range(5):
139        await omni.kit.app.get_app().next_update_async()
140
141    # Create or get the physics scene
142    rep.functional.physics.create_physics_scene(path="/PhysicsScene")
143    physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))
144
145    # Check the target physics depending on the delta time and the render mode
146    target_physics_fps = stage.GetTimeCodesPerSecond() if delta_time is None else 1 / delta_time
147    if use_path_tracing:
148        target_physics_fps *= motion_blur_subsamples
149
150    # Check if the physics FPS needs to be increased to match the delta time
151    original_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
152    if target_physics_fps > original_physics_fps:
153        print(f"[MotionBlur] Changing physics FPS from {original_physics_fps} to {target_physics_fps}")
154        physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)
155
156    # Start the timeline for physics updates in the step function
157    timeline = omni.timeline.get_timeline_interface()
158    timeline.play()
159
160    # Capture frames
161    for i in range(num_frames):
162        print(f"[MotionBlur] \tCapturing frame {i}")
163        await rep.orchestrator.step_async(delta_time=delta_time)
164
165    # Restore the original physics FPS
166    if target_physics_fps > original_physics_fps:
167        print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {original_physics_fps}")
168        physx_scene.GetTimeStepsPerSecondAttr().Set(original_physics_fps)
169
170    # Switch back to the raytracing render mode
171    if use_path_tracing:
172        print("[MotionBlur] Restoring render mode to RealTimePathTracing")
173        settings.set("/rtx/rendermode", "RealTimePathTracing")
174
175    # Wait until the data is fully written
176    await rep.orchestrator.wait_until_complete_async()
177
178    # Cleanup
179    writer.detach()
180    render_product.destroy()
181
async def run_motion_blur_examples_async(
    num_frames=NUM_FRAMES, delta_times=None, samples_per_pixel=None, motion_blur_subsamples=None
):
    """Capture the motion blur example for every delta time and render configuration.

    For each delta time one `RealTimePathTracing` capture is made, followed by one `PathTracing` capture for
    every (motion blur subsample, samples per pixel) combination. The captures are awaited one after the other
    because each of them creates its own new stage.

    Args:
        num_frames: Number of frames to capture per configuration.
        delta_times: Delta times to capture with; `None` falls back to `[None, 1 / 30, 1 / 60, 1 / 240]`.
        samples_per_pixel: Path tracing samples per pixel values; `None` falls back to `[32, 128]`.
        motion_blur_subsamples: Path tracing motion blur subsample values; `None` falls back to `[4, 16]`.
    """
    # Fallback values so the function can also be called without any arguments
    if delta_times is None:
        delta_times = [None, 1 / 30, 1 / 60, 1 / 240]
    if samples_per_pixel is None:
        samples_per_pixel = [32, 128]
    if motion_blur_subsamples is None:
        motion_blur_subsamples = [4, 16]

    print(
        f"[MotionBlur] Running with delta_times={delta_times}, samples_per_pixel={samples_per_pixel}, motion_blur_subsamples={motion_blur_subsamples}"
    )

    for delta_time in delta_times:
        # RayTracing examples
        await run_motion_blur_example_async(num_frames=num_frames, delta_time=delta_time, use_path_tracing=False)
        # PathTracing examples
        for motion_blur_subsample in motion_blur_subsamples:
            for samples_per_pixel_value in samples_per_pixel:
                await run_motion_blur_example_async(
                    num_frames=num_frames,
                    delta_time=delta_time,
                    use_path_tracing=True,
                    motion_blur_subsamples=motion_blur_subsample,
                    samples_per_pixel=samples_per_pixel_value,
                )


# Schedule the examples exactly once; a second concurrently scheduled run would create new stages while the
# first one is still capturing
asyncio.ensure_future(
    run_motion_blur_examples_async(
        num_frames=NUM_FRAMES,
        delta_times=DELTA_TIMES,
        samples_per_pixel=SAMPLES_PER_PIXEL,
        motion_blur_subsamples=MOTION_BLUR_SUBSAMPLES,
    )
)
Motion Blur
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import argparse
  6import os
  7
  8import carb.settings
  9import omni.replicator.core as rep
 10import omni.timeline
 11import omni.usd
 12from isaacsim.storage.native import get_assets_root_path
 13from pxr import PhysxSchema, UsdPhysics
 14
# Paths to the animated and physics-ready assets (appended to the assets root path in `setup_stage`)
PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd"
ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd"

# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s)
# The two lists are paired element-wise; the animated assets start at the same locations with the x value negated
ASSET_VELOCITIES = [0, 5, 10]
ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)]

# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets
# (also used as the timeline end time in `setup_stage`)
ANIMATION_DURATION = 10

# Number of frames to capture for each scenario
NUM_FRAMES = 3
 28
 29
 30def parse_delta_time(value):
 31    """Convert string to float or None. Accepts 'None', -1, 0, or numeric values."""
 32    if value.lower() == "none":
 33        return None
 34    float_value = float(value)
 35    return None if float_value in (-1, 0) else float_value
 36
 37
 38parser = argparse.ArgumentParser()
 39parser.add_argument(
 40    "--delta_times",
 41    nargs="*",
 42    type=parse_delta_time,
 43    default=[None, 1 / 30, 1 / 60, 1 / 240],
 44    help="List of delta times (seconds per frame) to use for motion blur captures. Use 'None' for default stage time.",
 45)
 46parser.add_argument(
 47    "--samples_per_pixel",
 48    nargs="*",
 49    type=int,
 50    default=[32, 128],
 51    help="List of samples per pixel (spp) values for path tracing",
 52)
 53parser.add_argument(
 54    "--motion_blur_subsamples",
 55    nargs="*",
 56    type=int,
 57    default=[4, 16],
 58    help="List of motion blur subsample values for path tracing",
 59)
 60args, _ = parser.parse_known_args()
 61delta_times = args.delta_times
 62samples_per_pixel = args.samples_per_pixel
 63motion_blur_subsamples = args.motion_blur_subsamples
 64
 65
 66def setup_stage():
 67    """Create a new USD stage with animated and physics-enabled assets with synchronized motion."""
 68    omni.usd.get_context().new_stage()
 69    settings = carb.settings.get_settings()
 70    # Set DLSS to Quality mode (2) for best SDG results , options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 71    settings.set("rtx/post/dlss/execMode", 2)
 72
 73    # Capture data only on request
 74    rep.orchestrator.set_capture_on_play(False)
 75
 76    stage = omni.usd.get_context().get_stage()
 77    timeline = omni.timeline.get_timeline_interface()
 78    timeline.set_end_time(ANIMATION_DURATION)
 79
 80    # Create lights
 81    rep.functional.create.xform(name="World")
 82    rep.functional.create.dome_light(intensity=100, parent="/World", name="DomeLight")
 83    rep.functional.create.distant_light(intensity=2500, rotation=(315, 0, 0), parent="/World", name="DistantLight")
 84
 85    # Setup the physics assets with gravity disabled and the requested velocity
 86    assets_root_path = get_assets_root_path()
 87    physics_asset_url = assets_root_path + PHYSICS_ASSET_URL
 88    for location, velocity in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 89        prim = rep.functional.create.reference(
 90            usd_path=physics_asset_url, parent="/World", name=f"physics_asset_{int(abs(velocity))}", position=location
 91        )
 92        physics_rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
 93        physics_rigid_body_api.GetVelocityAttr().Set((0, 0, -velocity))
 94        physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
 95        physx_rigid_body_api.GetDisableGravityAttr().Set(True)
 96        physx_rigid_body_api.GetAngularDampingAttr().Set(0.0)
 97        physx_rigid_body_api.GetLinearDampingAttr().Set(0.0)
 98
 99    # Setup animated assets maintaining the same velocity as the physics assets
100    anim_asset_url = assets_root_path + ANIM_ASSET_URL
101    for location, velocity in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
102        start_location = (-location[0], location[1], location[2])
103        prim = rep.functional.create.reference(
104            usd_path=anim_asset_url, parent="/World", name=f"anim_asset_{int(abs(velocity))}", position=start_location
105        )
106        animation_distance = velocity * ANIMATION_DURATION
107        end_location = (start_location[0], start_location[1], start_location[2] - animation_distance)
108        end_keyframe_time = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION
109        # Timesampled keyframe (animated) translation
110        prim.GetAttribute("xformOp:translate").Set(start_location, time=0)
111        prim.GetAttribute("xformOp:translate").Set(end_location, time=end_keyframe_time)
112
113
def run_motion_blur_example(
    num_frames, delta_time=None, use_path_tracing=True, motion_blur_subsamples=8, samples_per_pixel=64
):
    """Set up a fresh stage and capture motion blurred frames for a single configuration.

    Args:
        num_frames: Number of frames to capture.
        delta_time: Value forwarded to every capture step. With `None` the target physics rate is taken from
            the stage time codes per second, otherwise it is `1 / delta_time`.
        use_path_tracing: Capture with the `PathTracing` render mode if True, `RealTimePathTracing` otherwise.
        motion_blur_subsamples: Number of motion blur sub samples (only applied when path tracing).
        samples_per_pixel: Samples per pixel (only applied when path tracing).
    """
    setup_stage()
    usd_stage = omni.usd.get_context().get_stage()
    carb_settings = carb.settings.get_settings()

    # Motion blur capture is enabled for both render modes
    carb_settings.set("/omni/replicator/captureMotionBlur", True)

    # Collect the render mode specific settings (applied in order below) and the output folder suffix
    if use_path_tracing:
        print("[MotionBlur] Setting PathTracing render mode motion blur settings")
        render_settings = [
            ("/rtx/rendermode", "PathTracing"),
            # (int): Total number of samples for each rendered pixel, per frame.
            ("/rtx/pathtracing/spp", samples_per_pixel),
            # (int): Maximum number of samples to accumulate per pixel. When this count is reached the rendering
            # stops until a scene or setting change is detected, restarting the rendering process.
            # Set to 0 to remove this limit.
            ("/rtx/pathtracing/totalSpp", samples_per_pixel),
            ("/rtx/pathtracing/optixDenoiser/enabled", 0),
            # Number of sub samples to render if in PathTracing render mode and motion blur is enabled.
            ("/omni/replicator/pathTracedMotionBlurSubSamples", motion_blur_subsamples),
        ]
        mode_label = f"pt_subsamples_{motion_blur_subsamples}_spp_{samples_per_pixel}"
    else:
        print("[MotionBlur] Setting RealTimePathTracing render mode motion blur settings")
        render_settings = [
            ("/rtx/rendermode", "RealTimePathTracing"),
            # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
            ("/rtx/post/aa/op", 2),
            # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter.
            ("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02),
            # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample.
            ("/rtx/post/motionblur/exposureFraction", 1.0),
            # (int): Number of samples to use in the filter. A higher number improves quality at the cost of
            # performance.
            ("/rtx/post/motionblur/numSamples", 8),
        ]
        mode_label = "rt"
    for setting_path, setting_value in render_settings:
        carb_settings.set(setting_path, setting_value)

    # The output folder name encodes the delta time and the render configuration
    if delta_time is None:
        delta_time_label = "None"
    else:
        delta_time_label = f"{delta_time:.4f}"
    output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_func_dt_{delta_time_label}_{mode_label}")
    print(f"[MotionBlur] Output directory: {output_directory}")
    disk_backend = rep.backends.get("DiskBackend")
    disk_backend.initialize(output_dir=output_directory)

    # Camera, render product and rgb writer
    camera = rep.functional.create.camera(
        position=(0, 1.5, 0), look_at=(0, 0, 0), parent="/World", name="MotionBlurCam"
    )
    render_product = rep.create.render_product(camera, (1280, 720))
    rgb_writer = rep.WriterRegistry.get("BasicWriter")
    rgb_writer.initialize(backend=disk_backend, rgb=True)
    rgb_writer.attach(render_product)

    # Run a few updates to make sure all materials are fully loaded for capture
    for _ in range(5):
        simulation_app.update()

    # Create or get the physics scene
    rep.functional.physics.create_physics_scene(path="/PhysicsScene")
    physx_scene_api = PhysxSchema.PhysxSceneAPI.Apply(usd_stage.GetPrimAtPath("/PhysicsScene"))
    time_steps_attr = physx_scene_api.GetTimeStepsPerSecondAttr()

    # Target physics rate: the capture rate, multiplied by the number of sub samples when path tracing
    if delta_time is None:
        required_physics_fps = usd_stage.GetTimeCodesPerSecond()
    else:
        required_physics_fps = 1 / delta_time
    if use_path_tracing:
        required_physics_fps *= motion_blur_subsamples

    # Only ever raise the physics rate; remember the decision so it can be undone after the capture
    original_physics_fps = time_steps_attr.Get()
    increase_physics_fps = required_physics_fps > original_physics_fps
    if increase_physics_fps:
        print(f"[MotionBlur] Changing physics FPS from {original_physics_fps} to {required_physics_fps}")
        time_steps_attr.Set(required_physics_fps)

    # Start the timeline for physics updates in the step function
    omni.timeline.get_timeline_interface().play()

    # Capture frames
    for frame_index in range(num_frames):
        print(f"[MotionBlur] \tCapturing frame {frame_index}")
        rep.orchestrator.step(delta_time=delta_time)

    # Restore the original physics FPS
    if increase_physics_fps:
        print(f"[MotionBlur] Restoring physics FPS from {required_physics_fps} to {original_physics_fps}")
        time_steps_attr.Set(original_physics_fps)

    # Switch back to the raytracing render mode
    if use_path_tracing:
        print("[MotionBlur] Restoring render mode to RealTimePathTracing")
        carb_settings.set("/rtx/rendermode", "RealTimePathTracing")

    # Wait until the data is fully written
    rep.orchestrator.wait_until_complete()

    # Cleanup
    rgb_writer.detach()
    render_product.destroy()
209
210
def run_motion_blur_examples(num_frames, delta_times, samples_per_pixel, motion_blur_subsamples):
    """Capture the motion blur example for every delta time and render configuration.

    For each delta time one capture without path tracing is made, followed by one path traced capture for every
    (motion blur subsample, samples per pixel) combination.

    Args:
        num_frames: Number of frames to capture per configuration.
        delta_times: Delta times to capture with (`None` entries are forwarded unchanged).
        samples_per_pixel: Samples per pixel values used for the path traced captures.
        motion_blur_subsamples: Motion blur subsample values used for the path traced captures.
    """
    print(
        f"[MotionBlur] Running with delta_times={delta_times}, samples_per_pixel={samples_per_pixel}, motion_blur_subsamples={motion_blur_subsamples}"
    )

    def render_configurations():
        # RayTracing example (keeps the default subsample / spp arguments of the capture function)
        yield {"use_path_tracing": False}
        # PathTracing examples
        for subsample_count in motion_blur_subsamples:
            for spp in samples_per_pixel:
                yield {"use_path_tracing": True, "motion_blur_subsamples": subsample_count, "samples_per_pixel": spp}

    for delta_time in delta_times:
        for configuration in render_configurations():
            run_motion_blur_example(num_frames=num_frames, delta_time=delta_time, **configuration)
228
229
# Run all scenarios with the values parsed from the command line
# (positional order: num_frames, delta_times, samples_per_pixel, motion_blur_subsamples)
run_motion_blur_examples(NUM_FRAMES, delta_times, samples_per_pixel, motion_blur_subsamples)

# Close the simulation app after all captures have finished
simulation_app.close()

Subscribers and Events at Custom FPS#

Examples of subscribing to various events (such as stage, physics, and render/app), setting custom update rates, and adjusting various related settings. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/subscribers_and_events.py
Subscribers and Events at Custom FPS
  1import asyncio
  2import time
  3
  4import carb.eventdispatcher
  5import carb.settings
  6import omni.kit.app
  7import omni.physics.core
  8import omni.timeline
  9import omni.usd
 10from pxr import PhysxSchema, UsdPhysics
 11
# TIMELINE / STAGE
# If True, the timeline / player values below are applied before the timeline is started
USE_CUSTOM_TIMELINE_SETTINGS = True
# Written to `/app/player/useFixedTimeStepping`
USE_FIXED_TIME_STEPPING = True
# Passed to `timeline.set_play_every_frame`
PLAY_EVERY_FRAME = True
# Written to `/app/player/CompensatePlayDelayInSecs`
PLAY_DELAY_COMPENSATION = 0.0
# Passed to `timeline.set_ticks_per_frame`
SUBSAMPLE_RATE = 1
# Passed to `timeline.set_time_codes_per_second`
STAGE_FPS = 30.0

# PHYSX
# If True, a PhysX scene is looked up (or created) and the two values below are applied
USE_CUSTOM_PHYSX_FPS = False
# Set as the time steps per second of the PhysX scene
PHYSX_FPS = 60.0
# Written to `/persistent/simulation/minFrameRate`
MIN_SIM_FPS = 30

# Simulations can also be enabled/disabled at runtime
# If True, `/app/player/playSimulations` is set to False
DISABLE_SIMULATIONS = False

# APP / RENDER
# If True, rate limiting of the main run loop is enabled using APP_FPS as its frequency
LIMIT_APP_FPS = False
APP_FPS = 120

# Number of app updates to run while collecting events
NUM_APP_UPDATES = 100

# Print the captured events
VERBOSE = False
 37
 38
 39async def run_subscribers_and_events_async():
 40    def on_timeline_event(event: carb.eventdispatcher.Event):
 41        nonlocal timeline_events
 42        timeline_events.append(event)
 43        if VERBOSE:
 44            print(f"  [timeline][{len(timeline_events)}] {event}")
 45
 46    def on_physics_step(dt: float, context):
 47        nonlocal physics_events
 48        physics_events.append(dt)
 49        if VERBOSE:
 50            print(f"  [physics][{len(physics_events)}] dt={dt}")
 51
 52    def on_stage_render_event(event: carb.eventdispatcher.Event):
 53        nonlocal stage_render_events
 54        stage_render_events.append(event.event_name)
 55        if VERBOSE:
 56            print(f"  [stage render][{len(stage_render_events)}] {event.event_name}")
 57
 58    def on_app_update(event: carb.eventdispatcher.Event):
 59        nonlocal app_update_events
 60        app_update_events.append(event.event_name)
 61        if VERBOSE:
 62            print(f"  [app update][{len(app_update_events)}] {event.event_name}")
 63
 64    stage = omni.usd.get_context().get_stage()
 65    timeline = omni.timeline.get_timeline_interface()
 66
 67    if USE_CUSTOM_TIMELINE_SETTINGS:
 68        # Ideal to make simulation and animation synchronized.
 69        # Default: True in editor, False in standalone.
 70        # NOTE:
 71        # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time.
 72        # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs').
 73        # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate`
 74        carb.settings.get_settings().set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING)
 75
 76        # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback.
 77        # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect.
 78        # Default: 0.0
 79        # NOTE:
 80        # - only effective if `useFixedTimeStepping` is set to True
 81        # - setting a large value results in long fast playback after a huge lag spike
 82        carb.settings.get_settings().set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION)
 83
 84        # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`.
 85        # Default: False
 86        # NOTE:
 87        # - only effective if `useFixedTimeStepping` is set to True
 88        # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop
 89        # - useful for recording
 90        # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)`
 91        timeline.set_play_every_frame(PLAY_EVERY_FRAME)
 92
 93        # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame.
 94        # Default: 1
 95        # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)`
 96        timeline.set_ticks_per_frame(SUBSAMPLE_RATE)
 97
 98        # Time codes per second for the stage
 99        # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)`
100        timeline.set_time_codes_per_second(STAGE_FPS)
101
102    # Create a PhysX scene to set the physics time step
103    if USE_CUSTOM_PHYSX_FPS:
104        physx_scene = None
105        for prim in stage.Traverse():
106            if prim.IsA(UsdPhysics.Scene):
107                physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
108                break
109        if physx_scene is None:
110            UsdPhysics.Scene.Define(stage, "/PhysicsScene")
111            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))
112
113        # Time step for the physics simulation
114        # Default: 60.0
115        physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS)
116
117        # Minimum simulation frequency to prevent clamping; if the frame rate drops below this,
118        # physics steps are discarded to avoid app slowdown if the overall frame rate is too low.
119        # Default: 30.0
120        # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update.
121        carb.settings.get_settings().set("/persistent/simulation/minFrameRate", MIN_SIM_FPS)
122
123    # Throttle Render/UI/Main thread update rate
124    if LIMIT_APP_FPS:
125        # Enable rate limiting of the main run loop (UI, rendering, etc.)
126        # Default: False
127        carb.settings.get_settings().set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS)
128
129        # FPS limit of the main run loop (UI, rendering, etc.)
130        # Default: 120
131        # NOTE: disabled if `/app/player/useFixedTimeStepping` is False
132        carb.settings.get_settings().set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS))
133
134    # Simulations can be selectively disabled (or toggled at specific times)
135    if DISABLE_SIMULATIONS:
136        carb.settings.get_settings().set("/app/player/playSimulations", False)
137
138    print("Configuration:")
139    print(f"  Timeline:")
140    print(f"    - Stage FPS: {STAGE_FPS}  (/app/stage/timeCodesPerSecond)")
141    print(f"    - Fixed time stepping: {USE_FIXED_TIME_STEPPING}  (/app/player/useFixedTimeStepping)")
142    print(f"    - Play every frame: {PLAY_EVERY_FRAME}  (/app/player/useFastMode)")
143    print(f"    - Subsample rate: {SUBSAMPLE_RATE}  (/app/player/timelineSubsampleRate)")
144    print(f"    - Play delay compensation: {PLAY_DELAY_COMPENSATION}s  (/app/player/CompensatePlayDelayInSecs)")
145    print(f"  Physics:")
146    print(f"    - PhysX FPS: {PHYSX_FPS}  (physxScene.timeStepsPerSecond)")
147    print(f"    - Min simulation FPS: {MIN_SIM_FPS}  (/persistent/simulation/minFrameRate)")
148    print(f"    - Simulations enabled: {not DISABLE_SIMULATIONS}  (/app/player/playSimulations)")
149    print(f"  Rendering:")
150    print(f"    - App FPS limit: {APP_FPS if LIMIT_APP_FPS else 'unlimited'}  (/app/runLoops/main/rateLimitFrequency)")
151
152    # Start the timeline
153    print(f"Starting the timeline...")
154    timeline.set_current_time(0)
155    timeline.set_end_time(10000)
156    timeline.set_looping(False)
157    timeline.play()
158    timeline.commit()
159    wall_start_time = time.time()
160
161    # Subscribe to events
162    print(f"Subscribing to events...")
163    timeline_events = []
164    timeline_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
165        event_name=omni.timeline.GLOBAL_EVENT_CURRENT_TIME_TICKED,
166        on_event=on_timeline_event,
167        observer_name="test_sdg_useful_snippets_timeline_based.on_timeline_event",
168    )
169    physics_events = []
170    physics_sub = omni.physics.core.get_physics_simulation_interface().subscribe_physics_on_step_events(
171        pre_step=False, order=0, on_update=on_physics_step
172    )
173    stage_render_events = []
174    stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
175        event_name=omni.usd.get_context().stage_rendering_event_name(omni.usd.StageRenderingEventType.NEW_FRAME, True),
176        on_event=on_stage_render_event,
177        observer_name="subscribers_and_events.on_stage_render_event",
178    )
179    app_update_events = []
180    app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
181        event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
182        on_event=on_app_update,
183        observer_name="subscribers_and_events.on_app_update",
184    )
185
186    # Run app updates and cache events
187    print(f"Starting running the application for {NUM_APP_UPDATES} updates...")
188    for i in range(NUM_APP_UPDATES):
189        if VERBOSE:
190            print(f"[app update loop][{i+1}/{NUM_APP_UPDATES}]")
191        await omni.kit.app.get_app().next_update_async()
192    elapsed_wall_time = time.time() - wall_start_time
193    print(f"Finished running the application for {NUM_APP_UPDATES} updates...")
194
195    # Stop timeline and unsubscribe from all events
196    print(f"Stopping timeline and unsubscribing from all events...")
197    timeline.stop()
198    if app_sub:
199        app_sub.reset()
200        app_sub = None
201    if stage_render_sub:
202        stage_render_sub.reset()
203        stage_render_sub = None
204    if physics_sub:
205        physics_sub.unsubscribe()
206        physics_sub = None
207    if timeline_sub:
208        timeline_sub.reset()
209        timeline_sub = None
210
211    # Print summary statistics
212    print("\nStats:")
213    print(f"- App updates: {NUM_APP_UPDATES}")
214    print(f"- Wall time: {elapsed_wall_time:.4f} seconds")
215    print(f"- Timeline events: {len(timeline_events)}")
216    print(f"- Physics events: {len(physics_events)}")
217    print(f"- Stage render events: {len(stage_render_events)}")
218    print(f"- App update events: {len(app_update_events)}")
219
220    # Calculate and display real-time performance factor
221    if len(physics_events) > 0:
222        sim_time = sum(physics_events)
223        realtime_factor = sim_time / elapsed_wall_time if elapsed_wall_time > 0 else 0
224        print(f"- Simulation time: {sim_time:.4f}s")
225        print(f"- Real-time factor: {realtime_factor:.2f}x")
226
227
228asyncio.ensure_future(run_subscribers_and_events_async())
Subscribers and Events at Custom FPS
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import time
  6
  7import carb.eventdispatcher
  8import carb.settings
  9import omni.kit.app
 10import omni.physics.core
 11import omni.timeline
 12import omni.usd
 13from pxr import PhysxSchema, UsdPhysics
 14
# TIMELINE / STAGE
# If True, the timeline / player values below are applied
USE_CUSTOM_TIMELINE_SETTINGS = True
# Written to `/app/player/useFixedTimeStepping`
USE_FIXED_TIME_STEPPING = True
# Passed to `timeline.set_play_every_frame`
PLAY_EVERY_FRAME = True
# Written to `/app/player/CompensatePlayDelayInSecs`
PLAY_DELAY_COMPENSATION = 0.0
# Passed to `timeline.set_ticks_per_frame`
SUBSAMPLE_RATE = 1
# Passed to `timeline.set_time_codes_per_second`
STAGE_FPS = 30.0

# PHYSX
# If True, a PhysX scene is looked up (or created) to set the physics time step
USE_CUSTOM_PHYSX_FPS = False
PHYSX_FPS = 60.0
MIN_SIM_FPS = 30

# Simulations can also be enabled/disabled at runtime
DISABLE_SIMULATIONS = False

# APP / RENDER
LIMIT_APP_FPS = False
APP_FPS = 120

# Number of app updates to run while collecting events
NUM_APP_UPDATES = 100

# Print the captured events (checked by every event callback)
VERBOSE = False
 40
 41
 42def on_timeline_event(event: carb.eventdispatcher.Event):
 43    global timeline_events
 44    timeline_events.append(event)
 45    if VERBOSE:
 46        print(f"  [timeline][{len(timeline_events)}] {event}")
 47
 48
 49def on_physics_step(dt, context):
 50    global physics_events
 51    physics_events.append(dt)
 52    if VERBOSE:
 53        print(f"  [physics][{len(physics_events)}] dt={dt}")
 54
 55
 56def on_stage_render_event(event: carb.eventdispatcher.Event):
 57    global stage_render_events
 58    stage_render_events.append(event.event_name)
 59    if VERBOSE:
 60        print(f"  [stage render][{len(stage_render_events)}] {event.event_name}")
 61
 62
 63def on_app_update(event: carb.eventdispatcher.Event):
 64    global app_update_events
 65    app_update_events.append(event.event_name)
 66    if VERBOSE:
 67        print(f"  [app update][{len(app_update_events)}] {event.event_name}")
 68
 69
 70stage = omni.usd.get_context().get_stage()
 71timeline = omni.timeline.get_timeline_interface()
 72
 73
 74if USE_CUSTOM_TIMELINE_SETTINGS:
 75    # Ideal to make simulation and animation synchronized.
 76    # Default: True in editor, False in standalone.
 77    # NOTE:
 78    # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time.
 79    # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs').
 80    # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate`
 81    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING)
 82
 83    # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback.
 84    # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect.
 85    # Default: 0.0
 86    # NOTE:
 87    # - only effective if `useFixedTimeStepping` is set to True
 88    # - setting a large value results in long fast playback after a huge lag spike
 89    carb.settings.get_settings().set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION)
 90
 91    # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`.
 92    # Default: False
 93    # NOTE:
 94    # - only effective if `useFixedTimeStepping` is set to True
 95    # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop
 96    # - useful for recording
 97    # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)`
 98    timeline.set_play_every_frame(PLAY_EVERY_FRAME)
 99
100    # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame.
101    # Default: 1
102    # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)`
103    timeline.set_ticks_per_frame(SUBSAMPLE_RATE)
104
105    # Time codes per second for the stage
106    # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)`
107    timeline.set_time_codes_per_second(STAGE_FPS)
108
109
110# Create a PhysX scene to set the physics time step
111if USE_CUSTOM_PHYSX_FPS:
112    physx_scene = None
113    for prim in stage.Traverse():
114        if prim.IsA(UsdPhysics.Scene):
115            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
116            break
117    if physx_scene is None:
118        UsdPhysics.Scene.Define(stage, "/PhysicsScene")
119        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))
120
121    # Time step for the physics simulation
122    # Default: 60.0
123    physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS)
124
125    # Minimum simulation frequency to prevent clamping; if the frame rate drops below this,
126    # physics steps are discarded to avoid app slowdown if the overall frame rate is too low.
127    # Default: 30.0
128    # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update.
129    carb.settings.get_settings().set("/persistent/simulation/minFrameRate", MIN_SIM_FPS)
130
131
132# Throttle Render/UI/Main thread update rate
133if LIMIT_APP_FPS:
134    # Enable rate limiting of the main run loop (UI, rendering, etc.)
135    # Default: False
136    carb.settings.get_settings().set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS)
137
138    # FPS limit of the main run loop (UI, rendering, etc.)
139    # Default: 120
140    # NOTE: disabled if `/app/player/useFixedTimeStepping` is False
141    carb.settings.get_settings().set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS))
142
143
144# Simulations can be selectively disabled (or toggled at specific times)
145if DISABLE_SIMULATIONS:
146    carb.settings.get_settings().set("/app/player/playSimulations", False)
147
148print("Configuration:")
149print(f"  Timeline:")
150print(f"    - Stage FPS: {STAGE_FPS}  (/app/stage/timeCodesPerSecond)")
151print(f"    - Fixed time stepping: {USE_FIXED_TIME_STEPPING}  (/app/player/useFixedTimeStepping)")
152print(f"    - Play every frame: {PLAY_EVERY_FRAME}  (/app/player/useFastMode)")
153print(f"    - Subsample rate: {SUBSAMPLE_RATE}  (/app/player/timelineSubsampleRate)")
154print(f"    - Play delay compensation: {PLAY_DELAY_COMPENSATION}s  (/app/player/CompensatePlayDelayInSecs)")
155print(f"  Physics:")
156print(f"    - PhysX FPS: {PHYSX_FPS}  (physxScene.timeStepsPerSecond)")
157print(f"    - Min simulation FPS: {MIN_SIM_FPS}  (/persistent/simulation/minFrameRate)")
158print(f"    - Simulations enabled: {not DISABLE_SIMULATIONS}  (/app/player/playSimulations)")
159print(f"  Rendering:")
160print(f"    - App FPS limit: {APP_FPS if LIMIT_APP_FPS else 'unlimited'}  (/app/runLoops/main/rateLimitFrequency)")
161
162
163# Start the timeline
164print(f"Starting the timeline...")
165timeline.set_current_time(0)
166timeline.set_end_time(10000)
167timeline.set_looping(False)
168timeline.play()
169timeline.commit()
170wall_start_time = time.time()
171
172# Subscribe to events
173print(f"Subscribing to events...")
174timeline_events = []
175timeline_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
176    event_name=omni.timeline.GLOBAL_EVENT_CURRENT_TIME_TICKED,
177    on_event=on_timeline_event,
178    observer_name="subscribers_and_events.on_timeline_event",
179)
180physics_events = []
181physics_sub = omni.physics.core.get_physics_simulation_interface().subscribe_physics_on_step_events(
182    pre_step=False, order=0, on_update=on_physics_step
183)
184stage_render_events = []
185stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
186    event_name=omni.usd.get_context().stage_rendering_event_name(omni.usd.StageRenderingEventType.NEW_FRAME, True),
187    on_event=on_stage_render_event,
188    observer_name="subscribers_and_events.on_stage_render_event",
189)
190app_update_events = []
191app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
192    event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
193    on_event=on_app_update,
194    observer_name="subscribers_and_events.on_app_update",
195)
196
197# Run app updates and cache events
198print(f"Starting running the application for {NUM_APP_UPDATES} updates.")
199for i in range(NUM_APP_UPDATES):
200    if VERBOSE:
201        print(f"[app update loop][{i+1}/{NUM_APP_UPDATES}]")
202    simulation_app.update()
203elapsed_wall_time = time.time() - wall_start_time
204print(f"Finished running the application for {NUM_APP_UPDATES} updates...")
205
206# Stop timeline and unsubscribe from all events
207timeline.stop()
208if app_sub:
209    app_sub.reset()
210    app_sub = None
211if stage_render_sub:
212    stage_render_sub.reset()
213    stage_render_sub = None
214if physics_sub:
215    physics_sub.unsubscribe()
216    physics_sub = None
217if timeline_sub:
218    timeline_sub.reset()
219    timeline_sub = None
220
221
222# Print summary statistics
223print("\nStats:")
224print(f"- App updates: {NUM_APP_UPDATES}")
225print(f"- Wall time: {elapsed_wall_time:.4f} seconds")
226print(f"- Timeline events: {len(timeline_events)}")
227print(f"- Physics events: {len(physics_events)}")
228print(f"- Stage render events: {len(stage_render_events)}")
229print(f"- App update events: {len(app_update_events)}")
230
231# Calculate and display real-time performance factor
232if len(physics_events) > 0:
233    sim_time = sum(physics_events)
234    realtime_factor = sim_time / elapsed_wall_time if elapsed_wall_time > 0 else 0
235    print(f"- Simulation time: {sim_time:.4f}s")
236    print(f"- Real-time factor: {realtime_factor:.2f}x")
237
238simulation_app.close()

Accessing Writer and Annotator Data at Custom FPS#

Example of how to trigger a writer and access annotator data at a custom FPS, with render product updates disabled whenever the data is not needed. The standalone example can also be run directly (on Windows, use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_fps_writer_annotator.py

Note

It is currently not possible to change the timeline (stage) FPS after the replicator graph has been created, because doing so causes a graph reset. This issue is being addressed. As a workaround, make sure you set the timeline (stage) parameters before creating the replicator graph.

Accessing Writer and Annotator Data at Custom FPS
  1import asyncio
  2import os
  3
  4import carb.settings
  5import omni.kit.app
  6import omni.replicator.core as rep
  7import omni.timeline
  8import omni.usd
  9
 10# Configuration
 11NUM_CAPTURES = 6
 12VERBOSE = True
 13
  14# NOTE: To avoid FPS delta misses, make sure the timeline framerate is an integer multiple of the sensor framerate
 15STAGE_FPS = 100.0
 16SENSOR_FPS = 10.0
 17SENSOR_DT = 1.0 / SENSOR_FPS
 18
 19async def run_custom_fps_example_async(duration_seconds):
 20    # Create a new stage
 21    await omni.usd.get_context().new_stage_async()
 22
 23    # Disable capture on play to capture data manually using step
 24    rep.orchestrator.set_capture_on_play(False)
 25
 26    # Set DLSS to Quality mode (2) for best SDG results , options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 27    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
 28
 29    # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time)
 30    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True)
 31
 32    # Create scene with a semantically annotated cube with physics
 33    rep.functional.create.xform(name="World")
 34    rep.functional.create.dome_light(intensity=250, parent="/World", name="DomeLight")
 35    cube = rep.functional.create.cube(
 36        position=(0, 0, 2), parent="/World", name="Cube", semantics={"class": "cube"}
 37    )
 38    rep.functional.physics.apply_collider(cube)
 39    rep.functional.physics.apply_rigid_body(cube)
 40
 41    # Create render product (disabled until data capture is needed)
 42    cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
 43    rp = rep.create.render_product(cam, resolution=(512, 512), name="rp")
 44    rp.hydra_texture.set_updates_enabled(False)
 45
 46    # Create the backend for the writer
 47    out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb")
 48    print(f"Writer data will be written to: {out_dir_rgb}")
 49    backend = rep.backends.get("DiskBackend")
 50    backend.initialize(output_dir=out_dir_rgb)
 51
 52    # Create a writer and an annotator as examples of different ways of accessing data
 53    writer_rgb = rep.WriterRegistry.get("BasicWriter")
 54    writer_rgb.initialize(backend=backend, rgb=True)
 55    writer_rgb.attach(rp)
 56
 57    # Create an annotator to access the data directly
 58    annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
 59    annot_depth.attach(rp)
 60
 61    # Run the simulation for the given number of frames and access the data at the desired framerates
 62    print(
 63        f"Starting simulation: {duration_seconds:.2f}s duration, {SENSOR_FPS:.0f} FPS sensor, {STAGE_FPS:.0f} FPS timeline"
 64    )
 65
 66    # Set the timeline parameters
 67    timeline = omni.timeline.get_timeline_interface()
 68    timeline.set_looping(False)
 69    timeline.set_current_time(0.0)
 70    timeline.set_end_time(10)
 71    timeline.set_time_codes_per_second(STAGE_FPS)
 72    timeline.play()
 73    timeline.commit()
 74
 75    # Run the simulation for the given number of frames and access the data at the desired framerates
 76    frame_count = 0
 77    previous_time = timeline.get_current_time()
 78    elapsed_time = 0.0
 79    iteration = 0
 80
 81    while timeline.get_current_time() < duration_seconds:
 82        current_time = timeline.get_current_time()
 83        delta_time = current_time - previous_time
 84        elapsed_time += delta_time
 85
 86        # Simulation progress
 87        if VERBOSE:
 88            print(f"Step {iteration}: timeline time={current_time:.3f}s, elapsed time={elapsed_time:.3f}s")
 89
 90        # Trigger sensor at desired framerate (use small epsilon for floating point comparison)
 91        if elapsed_time >= SENSOR_DT - 1e-9:
 92            elapsed_time -= SENSOR_DT  # Reset with remainder to maintain accuracy
 93
 94            rp.hydra_texture.set_updates_enabled(True)
 95            await rep.orchestrator.step_async(delta_time=0.0, pause_timeline=False, rt_subframes=16)
 96            annot_data = annot_depth.get_data()
 97
 98            print(
 99                f"\n  >> Capturing frame {frame_count} at time={current_time:.3f}s | shape={annot_data.shape}\n"
100            )
101            frame_count += 1
102
103            rp.hydra_texture.set_updates_enabled(False)
104
105        previous_time = current_time
106        # Advance the app (timeline) by one frame
107        await omni.kit.app.get_app().next_update_async()
108        iteration += 1
109
110    # Wait for writer to finish
111    await rep.orchestrator.wait_until_complete_async()
112
113    # Cleanup
114    timeline.pause()
115    writer_rgb.detach()
116    annot_depth.detach()
117    rp.destroy()
118
119# Run example with duration for all captures plus a buffer of 5 frames
120duration = (NUM_CAPTURES * SENSOR_DT) + (5.0 / STAGE_FPS)
121asyncio.ensure_future(run_custom_fps_example_async(duration_seconds=duration))
Accessing Writer and Annotator Data at Custom FPS
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import os
  6
  7import carb.settings
  8import omni.kit.app
  9import omni.replicator.core as rep
 10import omni.timeline
 11import omni.usd
 12
 13# Configuration
 14NUM_CAPTURES = 6
 15VERBOSE = True
 16
  17# NOTE: To avoid FPS delta misses, make sure the timeline framerate is an integer multiple of the sensor framerate
 18STAGE_FPS = 100.0
 19SENSOR_FPS = 10.0
 20SENSOR_DT = 1.0 / SENSOR_FPS
 21
 22
 23def run_custom_fps_example(duration_seconds):
 24    # Create a new stage
 25    omni.usd.get_context().new_stage()
 26
 27    # Disable capture on play to capture data manually using step
 28    rep.orchestrator.set_capture_on_play(False)
 29
 30    # Set DLSS to Quality mode (2) for best SDG results , options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
 31    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)
 32
 33    # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time)
 34    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True)
 35
 36    # Create scene with a semantically annotated cube with physics
 37    rep.functional.create.xform(name="World")
 38    rep.functional.create.dome_light(intensity=250, parent="/World", name="DomeLight")
 39    cube = rep.functional.create.cube(position=(0, 0, 2), parent="/World", name="Cube", semantics={"class": "cube"})
 40    rep.functional.physics.apply_collider(cube)
 41    rep.functional.physics.apply_rigid_body(cube)
 42
 43    # Create render product (disabled until data capture is needed)
 44    cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
 45    rp = rep.create.render_product(cam, resolution=(512, 512), name="rp")
 46    rp.hydra_texture.set_updates_enabled(False)
 47
 48    # Create the backend for the writer
 49    out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb")
 50    print(f"Writer data will be written to: {out_dir_rgb}")
 51    backend = rep.backends.get("DiskBackend")
 52    backend.initialize(output_dir=out_dir_rgb)
 53
 54    # Create a writer and an annotator as examples of different ways of accessing data
 55    writer_rgb = rep.WriterRegistry.get("BasicWriter")
 56    writer_rgb.initialize(backend=backend, rgb=True)
 57    writer_rgb.attach(rp)
 58
 59    # Create an annotator to access the data directly
 60    annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
 61    annot_depth.attach(rp)
 62
 63    # Run the simulation for the given number of frames and access the data at the desired framerates
 64    print(
 65        f"Starting simulation: {duration_seconds:.2f}s duration, {SENSOR_FPS:.0f} FPS sensor, {STAGE_FPS:.0f} FPS timeline"
 66    )
 67
 68    # Set the timeline parameters
 69    timeline = omni.timeline.get_timeline_interface()
 70    timeline.set_looping(False)
 71    timeline.set_current_time(0.0)
 72    timeline.set_end_time(10)
 73    timeline.set_time_codes_per_second(STAGE_FPS)
 74    timeline.play()
 75    timeline.commit()
 76
 77    # Run the simulation for the given number of frames and access the data at the desired framerates
 78    frame_count = 0
 79    previous_time = timeline.get_current_time()
 80    elapsed_time = 0.0
 81    iteration = 0
 82
 83    while timeline.get_current_time() < duration_seconds:
 84        current_time = timeline.get_current_time()
 85        delta_time = current_time - previous_time
 86        elapsed_time += delta_time
 87
 88        # Simulation progress
 89        if VERBOSE:
 90            print(f"Step {iteration}: timeline time={current_time:.3f}s, elapsed time={elapsed_time:.3f}s")
 91
 92        # Trigger sensor at desired framerate (use small epsilon for floating point comparison)
 93        if elapsed_time >= SENSOR_DT - 1e-9:
 94            elapsed_time -= SENSOR_DT  # Reset with remainder to maintain accuracy
 95
 96            rp.hydra_texture.set_updates_enabled(True)
 97            rep.orchestrator.step(delta_time=0.0, pause_timeline=False, rt_subframes=16)
 98            annot_data = annot_depth.get_data()
 99
100            print(f"\n  >> Capturing frame {frame_count} at time={current_time:.3f}s | shape={annot_data.shape}\n")
101            frame_count += 1
102
103            rp.hydra_texture.set_updates_enabled(False)
104
105        previous_time = current_time
106        # Advance the app (timeline) by one frame
107        simulation_app.update()
108        iteration += 1
109
110    # Wait for writer to finish
111    rep.orchestrator.wait_until_complete()
112
113    # Cleanup
114    timeline.pause()
115    writer_rgb.detach()
116    annot_depth.detach()
117    rp.destroy()
118
119
120# Run example with duration for all captures plus a buffer of 5 frames
121duration = (NUM_CAPTURES * SENSOR_DT) + (5.0 / STAGE_FPS)
122run_custom_fps_example(duration_seconds=duration)
123
124simulation_app.close()

Cosmos Writer Example#

This example demonstrates the CosmosWriter for capturing multi-modal synthetic data compatible with NVIDIA Cosmos world foundation models. It creates a simple scene in which a sphere and a cube fall onto a ground plane, and captures synchronized RGB, segmentation, depth, and edge data (images and videos) that can be used with Cosmos Transfer to generate photorealistic variations.

For a more detailed tutorial please see Cosmos Synthetic Data Generation.

The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/cosmos_writer_simple.py
Cosmos Writer Example
 1import asyncio
 2import os
 3
 4import carb.settings
 5import omni.replicator.core as rep
 6import omni.timeline
 7import omni.usd
 8
 9SEGMENTATION_MAPPING = {
10    "plane": [0, 0, 255, 255],
11    "cube": [255, 0, 0, 255],
12    "sphere": [0, 255, 0, 255],
13}
14NUM_FRAMES = 60
15
async def run_cosmos_example_async(num_frames, segmentation_mapping=None):
    """Build a small physics scene and capture it with the CosmosWriter, one frame per app update.

    Args:
        num_frames: Number of app updates to run; one replicator step is triggered after each.
        segmentation_mapping: Forwarded unchanged to ``CosmosWriter.initialize``. The module-level
            ``SEGMENTATION_MAPPING`` maps class labels to 4-element color lists (presumably RGBA).
    """
    # Imported locally because this snippet's import list does not include `omni.kit.app`,
    # which is required for `next_update_async()` below
    import omni.kit.app

    # Create a new stage; await the async variant so the coroutine does not call the
    # blocking `new_stage()` (matches the other async snippets)
    await omni.usd.get_context().new_stage_async()

    # CosmosWriter requires script nodes to be enabled
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Disable capture on play, data is captured manually using the step function
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Set the stage properties
    rep.settings.set_stage_up_axis("Z")
    rep.settings.set_stage_meters_per_unit(1.0)
    rep.functional.create.dome_light(intensity=500)

    # Create the scenario with a ground plane and a falling sphere and cube
    plane = rep.functional.create.plane(position=(0, 0, 0), scale=(10, 10, 1), semantics={"class": "plane"})
    rep.functional.physics.apply_collider(plane)

    sphere = rep.functional.create.sphere(position=(0, 0, 3), semantics={"class": "sphere"})
    rep.functional.physics.apply_collider(sphere)
    rep.functional.physics.apply_rigid_body(sphere)

    cube = rep.functional.create.cube(position=(1, 1, 2), scale=0.5, semantics={"class": "cube"})
    rep.functional.physics.apply_collider(cube)
    rep.functional.physics.apply_rigid_body(cube)

    # Set up the writer
    camera = rep.functional.create.camera(position=(5, 5, 3), look_at=(0, 0, 0))
    rp = rep.create.render_product(camera, (1280, 720))
    out_dir = os.path.join(os.getcwd(), "_out_cosmos_simple")
    print(f"Output directory: {out_dir}")
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    cosmos_writer.initialize(output_dir=out_dir, segmentation_mapping=segmentation_mapping)
    cosmos_writer.attach(rp)

    # Start the simulation
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()

    # Capture a frame every app update
    for i in range(num_frames):
        print(f"Frame {i+1}/{num_frames}")
        await omni.kit.app.get_app().next_update_async()
        # delta_time=0.0 with pause_timeline=False: capture without the step itself moving the timeline
        await rep.orchestrator.step_async(delta_time=0.0, pause_timeline=False)
    timeline.pause()

    # Wait for all data to be written
    await rep.orchestrator.wait_until_complete_async()
    print("Data generation complete!")
    cosmos_writer.detach()
    rp.destroy()
71
72asyncio.ensure_future(run_cosmos_example_async(num_frames=NUM_FRAMES, segmentation_mapping=SEGMENTATION_MAPPING))
Cosmos Writer Example
 1from isaacsim import SimulationApp
 2
 3simulation_app = SimulationApp(launch_config={"headless": False})
 4
 5import os
 6
 7import carb.settings
 8import omni.replicator.core as rep
 9import omni.timeline
10import omni.usd
11
12SEGMENTATION_MAPPING = {
13    "plane": [0, 0, 255, 255],
14    "cube": [255, 0, 0, 255],
15    "sphere": [0, 255, 0, 255],
16}
17NUM_FRAMES = 60
18
19
def run_cosmos_example(num_frames, segmentation_mapping=None):
    """Build a small physics scene and capture it with the CosmosWriter, one frame per app update.

    Standalone (blocking) variant: the app is advanced with ``simulation_app.update()``.

    Args:
        num_frames: Number of app updates to run; one replicator step is triggered after each.
        segmentation_mapping: Forwarded unchanged to ``CosmosWriter.initialize``.
    """
    # Start from an empty stage
    omni.usd.get_context().new_stage()

    settings = carb.settings.get_settings()
    # Script nodes must be enabled for the CosmosWriter
    settings.set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Data is captured manually through the orchestrator step, not automatically on play
    rep.orchestrator.set_capture_on_play(False)

    # DLSS Quality mode (2) for best SDG results; options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
    settings.set("rtx/post/dlss/execMode", 2)

    # Stage properties and lighting
    rep.settings.set_stage_up_axis("Z")
    rep.settings.set_stage_meters_per_unit(1.0)
    rep.functional.create.dome_light(intensity=500)

    # Static ground plane (collider only)
    ground = rep.functional.create.plane(
        position=(0, 0, 0),
        scale=(10, 10, 1),
        semantics={"class": "plane"},
    )
    rep.functional.physics.apply_collider(ground)

    # Falling sphere (collider + rigid body)
    ball = rep.functional.create.sphere(position=(0, 0, 3), semantics={"class": "sphere"})
    rep.functional.physics.apply_collider(ball)
    rep.functional.physics.apply_rigid_body(ball)

    # Falling cube (collider + rigid body)
    box = rep.functional.create.cube(position=(1, 1, 2), scale=0.5, semantics={"class": "cube"})
    rep.functional.physics.apply_collider(box)
    rep.functional.physics.apply_rigid_body(box)

    # Camera, render product and writer
    camera_prim = rep.functional.create.camera(position=(5, 5, 3), look_at=(0, 0, 0))
    render_product = rep.create.render_product(camera_prim, (1280, 720))
    out_dir = os.path.join(os.getcwd(), "_out_cosmos_simple")
    print(f"Output directory: {out_dir}")
    writer = rep.WriterRegistry.get("CosmosWriter")
    writer.initialize(output_dir=out_dir, segmentation_mapping=segmentation_mapping)
    writer.attach(render_product)

    # Start the simulation
    sim_timeline = omni.timeline.get_timeline_interface()
    sim_timeline.play()

    # One app update followed by one capture per frame
    for frame_number in range(1, num_frames + 1):
        print(f"Frame {frame_number}/{num_frames}")
        simulation_app.update()
        rep.orchestrator.step(delta_time=0.0, pause_timeline=False)
    sim_timeline.pause()

    # Let the writer finish before tearing down
    rep.orchestrator.wait_until_complete()
    print("Data generation complete!")
    writer.detach()
    render_product.destroy()
75
76
77run_cosmos_example(num_frames=NUM_FRAMES, segmentation_mapping=SEGMENTATION_MAPPING)
78
79simulation_app.close()