UCXPublishImageNodeBase#

template<typename DatabaseT>
class UCXPublishImageNodeBase : public isaacsim::ucx::nodes::UcxNode#

Templated base class for UCX image data publishing nodes.

This template provides common functionality for publishing camera image data over UCX. It handles CUDA stream management for GPU-based image data. Derived classes implement message generation logic via generateMessage().

Template Parameters:

DatabaseT – The OGN database type for the node

Public Functions

inline virtual void reset() override#

Reset the node state.

Cleans up CUDA resources and resets parent state.

Public Static Functions

static inline void initInstance(NodeObj const &nodeObj, GraphInstanceID instanceId)#

Initialize the node instance.

Default implementation; derived classes can override it if needed.

Parameters:
  • nodeObj[in] The node object

  • instanceId[in] The instance ID

Protected Functions

inline bool computeImpl(DatabaseT &db, uint16_t port, uint64_t tag)#

Common compute logic for image publishing nodes.

Handles listener initialization, connection checking, input validation, and message publishing. Extracts metadata using extractMetadata() and delegates message generation to the derived class via generateMessage().

Parameters:
  • db[in] Database accessor for node inputs/outputs

  • port[in] Port number for UCX listener

  • tag[in] UCX tag for message identification

Returns:

bool True if execution succeeded, false otherwise
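The sequence of steps described for computeImpl() can be illustrated with stand-in types. This is a minimal sketch, not the Isaac Sim implementation: `FakeDb`, `Metadata`, `PublisherSketch`, `RawPublisher`, and the stubbed private helpers are hypothetical names introduced for illustration. Only the order of steps (listener, connection, input validation, metadata extraction, message generation, publish) is taken from the description above.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins; none of these are the real Isaac Sim types.
struct FakeDb
{
    std::vector<uint8_t> pixels;
    uint32_t width = 0;
    uint32_t height = 0;
};

struct Metadata
{
    uint32_t width;
    uint32_t height;
};

class PublisherSketch
{
public:
    virtual ~PublisherSketch() = default;

    // Follows the documented order of steps and stops at the first failure.
    bool computeImpl(FakeDb& db, uint16_t port, uint64_t tag)
    {
        if (!ensureListenerReady(port)) return false; // listener initialization
        if (!waitForConnection()) return false;       // no client connected yet
        if (db.pixels.empty() || db.width == 0 || db.height == 0) return false; // input validation
        const Metadata metadata = extractMetadata(db);
        const std::vector<uint8_t> message = generateMessage(metadata, db); // derived-class hook
        return publish(message, tag);
    }

    bool connected = false;       // set by the caller to simulate a client
    std::size_t lastSentSize = 0; // records what the stubbed publish step saw

protected:
    virtual Metadata extractMetadata(FakeDb& db) { return Metadata{ db.width, db.height }; }
    virtual std::vector<uint8_t> generateMessage(const Metadata& metadata, FakeDb& db) = 0;

private:
    // Stubs standing in for the listener and transport.
    bool ensureListenerReady(uint16_t port) const { return port >= 1024; }
    bool waitForConnection() const { return connected; }
    bool publish(const std::vector<uint8_t>& message, uint64_t /*tag*/)
    {
        lastSentSize = message.size();
        return !message.empty();
    }
};

// A derived node only supplies the serialization step.
class RawPublisher : public PublisherSketch
{
protected:
    std::vector<uint8_t> generateMessage(const Metadata& /*metadata*/, FakeDb& db) override
    {
        return db.pixels;
    }
};
```

In this sketch a `RawPublisher` returns false from computeImpl() until `connected` is set, which is the early-return behavior a caller would see while no client is attached.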

inline virtual isaacsim::ucx::nodes::ImageMetadata extractMetadata(DatabaseT &db)#

Extract image metadata from node inputs.

Extracts non-pixel metadata from the database inputs. Pixel data is handled separately by generateMessage() to allow flexibility for CPU or GPU sources.

Parameters:

db[in] Database accessor for node inputs

Returns:

ImageMetadata Extracted image metadata

virtual std::vector<uint8_t> generateMessage(const isaacsim::ucx::nodes::ImageMetadata &metadata, DatabaseT &db) = 0#

Generate message from image metadata and pixel data.

Pure virtual function that derived classes must implement to serialize image data. The implementation receives both metadata and db, which allows flexible pixel handling (CPU copy, GPU buffer, GPU texture, or GPU-direct send).

Parameters:
  • metadata[in] Image metadata

  • db[in] Database accessor for pixel data access

Returns:

std::vector<uint8_t> Serialized message data
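As an illustration of the CPU-copy case, a generateMessage() override might pack a small header followed by the pixel bytes. The sketch below is an assumption throughout: the fields of `ImageMetadataSketch`, the 12-byte little-endian header, and the helper names are hypothetical, since this reference lists neither the real ImageMetadata members nor the wire format.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical metadata; the real ImageMetadata fields are not listed in this reference.
struct ImageMetadataSketch
{
    uint32_t width;
    uint32_t height;
    uint32_t step; // bytes per image row
};

// Append a 32-bit value in little-endian byte order.
inline void appendU32(std::vector<uint8_t>& out, uint32_t value)
{
    for (int shift = 0; shift < 32; shift += 8)
    {
        out.push_back(static_cast<uint8_t>((value >> shift) & 0xFFu));
    }
}

// Serialize an assumed 12-byte header followed by the raw pixel bytes.
inline std::vector<uint8_t> generateMessageSketch(const ImageMetadataSketch& metadata,
                                                  const std::vector<uint8_t>& pixels)
{
    std::vector<uint8_t> message;
    message.reserve(12 + pixels.size());
    appendU32(message, metadata.width);
    appendU32(message, metadata.height);
    appendU32(message, metadata.step);
    message.insert(message.end(), pixels.begin(), pixels.end());
    return message;
}
```

Writing the header byte by byte keeps the layout independent of host endianness and struct padding, which matters when the receiver is a different process or machine.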

inline bool publishMessage(DatabaseT &db, std::vector<uint8_t> messageData, uint64_t tag)#

Publishes an image message over UCX.

Sends the pre-generated message using a UCX tagged send with asynchronous completion tracking. Images are large (typically over 1 MB), so the send is asynchronous and its completion is tracked to prevent race conditions between send and receive operations.

Parameters:
  • db[in] Database accessor for logging

  • messageData[in] Serialized message to send

  • tag[in] UCX tag for message identification

Returns:

bool True if publish succeeded, false otherwise
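The reason for keeping a persistent buffer next to a request handle can be shown without UCX: an asynchronous send reads from memory that must outlive the call that started it. The following is a sketch under stated assumptions. `FakeRequest` and `AsyncSenderSketch` are hypothetical stand-ins, and refusing a new message while the previous send is in flight is one possible policy, not necessarily what this class does.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for a request handle such as std::shared_ptr<ucxx::Request>.
struct FakeRequest
{
    bool completed = false;
};

class AsyncSenderSketch
{
public:
    // Takes the message by value so it can be moved into the persistent buffer.
    bool publish(std::vector<uint8_t> messageData)
    {
        if (messageData.empty()) return false;

        // Assumed policy: never overwrite a buffer that an in-flight send may still read.
        if (m_request && !m_request->completed) return false;

        m_buffer = std::move(messageData);           // bytes now outlive this call
        m_request = std::make_shared<FakeRequest>(); // a real send would start here
        return true;
    }

    // Simulates the transport reporting completion.
    void completePending()
    {
        if (m_request) m_request->completed = true;
    }

    std::size_t bufferSize() const { return m_buffer.size(); }

private:
    std::vector<uint8_t> m_buffer;          // plays the role of m_messageBuffer
    std::shared_ptr<FakeRequest> m_request; // plays the role of m_sendRequest
};
```

If the bytes lived only in a local variable, the send could still be reading them after the function returned; moving them into a member avoids that lifetime bug.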

inline cudaStream_t getCudaStream()#

Get the CUDA stream for asynchronous operations.

Provides access to the managed CUDA stream for GPU operations.

Returns:

cudaStream_t The CUDA stream

inline int getCudaStreamDevice() const#

Get the current CUDA stream device index.

Returns the device index that the stream was created on.

Returns:

int Device index, or -1 if stream not created

inline bool isStreamNotCreated() const#

Check if CUDA stream needs to be created.

Returns true if no stream has been created yet.

Returns:

bool True if stream needs creation, false otherwise

inline void ensureCudaStream(int deviceIndex)#

Ensure CUDA stream is created for the specified device.

Creates or recreates the CUDA stream if necessary for the given device index.

Parameters:

deviceIndex[in] CUDA device index to use
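The create-or-recreate behavior, together with the `m_streamDevice` and `m_streamNotCreated` bookkeeping, can be sketched without a GPU by injecting the create and destroy steps. Everything here is illustrative: `StreamOwnerSketch` and its `Handle` alias are hypothetical, and in real code the two callbacks would presumably wrap calls such as cudaSetDevice(), cudaStreamCreate(), and cudaStreamDestroy().

```cpp
#include <functional>
#include <utility>

class StreamOwnerSketch
{
public:
    using Handle = int; // stand-in for cudaStream_t

    StreamOwnerSketch(std::function<Handle(int)> create, std::function<void(Handle)> destroy)
        : m_create(std::move(create)), m_destroy(std::move(destroy))
    {
    }

    StreamOwnerSketch(const StreamOwnerSketch&) = delete;
    StreamOwnerSketch& operator=(const StreamOwnerSketch&) = delete;

    ~StreamOwnerSketch() { release(); }

    // Create the stream on first use, or recreate it when the device changes.
    void ensureStream(int deviceIndex)
    {
        if (!m_streamNotCreated && m_streamDevice == deviceIndex) return; // already valid
        release(); // drop a stream that belongs to another device
        m_stream = m_create(deviceIndex);
        m_streamDevice = deviceIndex;
        m_streamNotCreated = false;
    }

    // Destroy the stream and return to the initial state (what a reset would do).
    void release()
    {
        if (m_streamNotCreated) return;
        m_destroy(m_stream);
        m_streamDevice = -1;
        m_streamNotCreated = true;
    }

    int device() const { return m_streamDevice; }
    bool streamNotCreated() const { return m_streamNotCreated; }

private:
    std::function<Handle(int)> m_create;
    std::function<void(Handle)> m_destroy;
    Handle m_stream = 0;
    int m_streamDevice = -1;
    bool m_streamNotCreated = true;
};
```

Repeated calls with the same device index are cheap no-ops in this sketch, so a node could call it on every compute without creating a new stream each frame.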

inline bool initializeListener(uint16_t port)#

Gets or creates a listener for the specified port.

Returns an existing listener if one is already registered for the port, otherwise creates a new listener on the specified port.

The port is validated before attempting to create a listener. Invalid ports (0-1023) are rejected.

Parameters:

port[in] Port number for the listener (must be >= 1024)

Returns:

bool True if listener was successfully obtained or created, false otherwise.

inline bool isInitialized() const#

Checks if the node has a valid listener.

Verifies if the listener has been properly created and initialized.

Returns:

bool True if the listener exists, false otherwise.

template<typename DatabaseT>
inline bool ensureListenerReady(DatabaseT &db, uint16_t port)#

Ensures the listener is initialized and started.

Initializes the listener if not already done or if the port has changed, and starts the progress thread. Validates the port number before initialization.

Template Parameters:

DatabaseT – The database type for logging

Parameters:
  • db[in] Database accessor for logging

  • port[in] Port number for the listener (must be >= 1024)

Returns:

bool True if listener is ready, false on error
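The "initialize once, reinitialize when the port changes" rule can be sketched as a small state machine. `ListenerStateSketch` is a hypothetical stand-in that counts starts instead of opening a socket, and it omits the `db` logging argument.

```cpp
#include <cstdint>

class ListenerStateSketch
{
public:
    // Returns false for reserved ports; otherwise (re)starts only when needed.
    bool ensureReady(uint16_t port)
    {
        if (port < 1024) return false; // validate before touching any state

        if (!m_started || port != m_port) // first call, or the port input changed
        {
            m_port = port;
            m_started = true;
            ++m_startCount; // a real node would create the listener and start progress here
        }
        return true;
    }

    int startCount() const { return m_startCount; }
    uint16_t port() const { return m_port; }

private:
    uint16_t m_port = 0;
    bool m_started = false;
    int m_startCount = 0;
};
```

Because validation happens first, an invalid port leaves an already running listener untouched in this sketch. Whether the real node behaves the same way on a bad port is not stated in this reference.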

inline bool waitForConnection()#

Waits for a client connection.

Checks whether a connection already exists and, if not, polls for one without blocking. Returns true if connected and ready to communicate, false if not yet connected.

Returns:

bool True if connected, false if no connection available
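A non-blocking connection check is typically called once per compute tick, with the node skipping its work until the check succeeds. The sketch below models that with an injected poll function. `ConnectionSketch` and `pollOnce` are hypothetical names, and caching the connected state is an assumption.

```cpp
#include <functional>
#include <utility>

class ConnectionSketch
{
public:
    // `pollOnce` must return immediately: true if a client is connected, false otherwise.
    explicit ConnectionSketch(std::function<bool()> pollOnce) : m_pollOnce(std::move(pollOnce)) {}

    bool waitForConnection()
    {
        if (m_connected) return true; // fast path: already connected
        m_connected = m_pollOnce();   // single non-blocking check per call
        return m_connected;
    }

private:
    std::function<bool()> m_pollOnce;
    bool m_connected = false;
};
```

A caller would treat a false return as "try again on the next tick" rather than as an error.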

template<typename DatabaseT>
inline bool publishMessage(DatabaseT &db, const std::vector<uint8_t> &messageData, uint64_t tag, uint32_t timeoutMs)#

Publishes a message over UCX with validation and timeout.

Validates that message data is not empty, then sends using UCX tagged send. Wrapper for UCXListener::tagSend() that adds logging for OmniGraph nodes.

Template Parameters:

DatabaseT – The database type for logging

Parameters:
  • db[in] Database accessor for logging

  • messageData[in] Serialized message data to send

  • tag[in] UCX tag for message identification

  • timeoutMs[in] Timeout in milliseconds for send request (0 = infinite)

Returns:

bool True if send completed successfully, false on error or timeout
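The empty-message check and the "0 = infinite" timeout convention can be sketched with a pollable completion predicate in place of a real UCX request. `sendWithTimeout` and `isDone` are hypothetical names, and the polling loop is an assumption about how completion might be observed, not a description of UCXListener::tagSend().

```cpp
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

// Returns true once isDone() reports completion, false on empty input or timeout.
template <typename IsDone>
bool sendWithTimeout(const std::vector<uint8_t>& messageData, uint32_t timeoutMs, IsDone isDone)
{
    if (messageData.empty()) return false; // nothing to send

    using Clock = std::chrono::steady_clock;
    const Clock::time_point deadline = Clock::now() + std::chrono::milliseconds(timeoutMs);

    while (!isDone())
    {
        // A timeout of 0 disables the deadline check, so the loop waits indefinitely.
        if (timeoutMs != 0 && Clock::now() >= deadline) return false;
        std::this_thread::yield();
    }
    return true;
}
```

The sketch uses a monotonic clock so that wall-clock adjustments cannot shorten or extend the timeout.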

Protected Attributes

cudaStream_t m_stream#

CUDA stream for asynchronous operations.

int m_streamDevice = -1#

Device index where stream was created.

bool m_streamNotCreated = true#

Flag indicating if stream needs creation.

std::vector<uint8_t> m_messageBuffer#

Persistent buffer to keep message alive during async send.

std::shared_ptr<ucxx::Request> m_sendRequest#

Request handle to track async send completion.

std::shared_ptr<isaacsim::ucx::core::UCXListener> m_listener = nullptr#

UCX listener instance.

Protected Static Functions

static inline bool isValidPort(uint16_t port)#

Validates port number is in acceptable range for OmniGraph nodes.

Ports 0-1023 are system reserved and should not be used by OmniGraph nodes. This range includes port 0 (auto-assign), which is rejected to ensure deterministic behavior.

Parameters:

port[in] Port number to validate

Returns:

bool True if port is valid (>= 1024), false otherwise
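The documented rule reduces to a single comparison. The function below is a sketch with a hypothetical name (`isValidPortSketch`); the compile-time checks spell out the boundary cases.

```cpp
#include <cstdint>

// Accepts only ports outside the system-reserved range 0-1023.
constexpr bool isValidPortSketch(uint16_t port)
{
    return port >= 1024;
}

static_assert(!isValidPortSketch(0), "port 0 (auto-assign) is rejected");
static_assert(!isValidPortSketch(1023), "last system-reserved port is rejected");
static_assert(isValidPortSketch(1024), "first accepted port");
static_assert(isValidPortSketch(65535), "largest 16-bit port is accepted");
```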

static inline bool isValidTag(uint64_t tag)#

Validates UCX tag value.

UCX uses the full 64-bit tag space. This function can be extended to restrict tag ranges if needed for application-specific protocols.

Parameters:

tag[in] Tag value to validate

Returns:

bool True if tag is valid, false otherwise
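As an example of the kind of extension mentioned above, an application could reserve the upper bits of the tag as a namespace and accept only tags in its own namespace. This scheme is hypothetical: the mask, the namespace value, and both function names are invented for illustration and are not part of UCX or Isaac Sim.

```cpp
#include <cstdint>

// Hypothetical layout: upper 16 bits select a namespace, lower 48 bits identify the stream.
constexpr uint64_t kNamespaceMask = 0xFFFF000000000000ULL;
constexpr uint64_t kImageNamespace = 0x0001000000000000ULL;

// Permissive check: every 64-bit value is a usable tag.
constexpr bool isValidTagPermissive(uint64_t /*tag*/)
{
    return true;
}

// Restricted check: only tags in the assumed image namespace are accepted.
constexpr bool isValidImageTag(uint64_t tag)
{
    return (tag & kNamespaceMask) == kImageNamespace;
}
```

With this layout, `kImageNamespace | 42` would pass the restricted check while a bare `42` would not, which lets a receiver reject messages from unrelated publishers that share the endpoint.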