UCXListener#

Fully qualified name: isaacsim::ucx::core::UCXListener

class UCXListener#

Server-side listener for incoming UCX connections.

Manages a listening socket on a specified port and handles incoming connection requests from clients. Provides endpoint creation for accepted connections. The listener operates asynchronously and can wait for client connections with configurable timeouts.

Note

The listener automatically accepts incoming client connections as endpoints.

Public Functions

UCXListener(std::shared_ptr<ucxx::Context> context, uint16_t port)#

Constructor for UCX listener.

Creates a new UCX listener that will bind to the specified port and wait for incoming client connections. The listener uses the provided context to create workers and manage connections.

Parameters:
  • context[in] UCX context for creating workers and managing connections

  • port[in] Port number to listen on for incoming connections

Throws:

std::runtime_error – If listener creation or port binding fails

~UCXListener()#

Destructor - ensures proper cleanup of listener resources.

Automatically shuts down the listener and cleans up all resources. Any active connections will be properly closed.

bool waitForConnection(int timeoutMs = -1)#

Wait for a connection with optional timeout.

Blocks until a connection is established or the timeout expires. When a connection is established, a new UCXConnection object is created and stored internally.

Parameters:

timeoutMs[in] Timeout in milliseconds (-1 for infinite timeout)

Returns:

true if a connection was established within the timeout, false if timeout expired

bool isConnected() const#

Check if the listener currently has an active client connection.

Determines whether an endpoint representing a client connection exists and is currently active. This can be used to verify if a client is connected to the server.

Returns:

true if a client connection exists and is active; false otherwise.

inline uint16_t getPort() const#

Get the port number the listener is bound to.

Returns the actual port number that the listener is using. This may differ from the requested port if port 0 was specified (automatic port selection).

Returns:

Port number the listener is bound to

inline std::shared_ptr<ucxx::Worker> getWorker() const#

Get the UCX worker used by this listener.

Returns the worker that manages the listener’s operations and progress thread. This can be used to create client endpoints on the same worker for testing or for applications that need to share a worker between listener and client.

Returns:

Shared pointer to the UCX worker

void startProgressThread()#

Start the progress thread for the worker.

Starts the background progress thread that handles UCX communication operations. This should be called after all endpoints are created to follow the official UCXX pattern and avoid race conditions during endpoint creation.

Note

This function is idempotent - calling it multiple times has no effect if the progress thread is already running.

void stopProgressThread()#

Stop the progress thread for the worker.

Stops the background progress thread that handles UCX communication operations. This should be called before destroying the listener to ensure proper cleanup.

void shutdown()#

Shutdown the listener and close all connections.

Stops the listener from accepting new connections and closes any existing client connections. This method should be called before destroying the listener to ensure proper cleanup.

Note

After shutdown, the listener cannot be restarted

UcxSendResult tagSend(
const void *buffer,
size_t length,
uint64_t tag,
std::string &errorMessage,
std::optional<uint32_t> timeout = std::nullopt,
)#

Send data using tagged communication with optional timeout.

Sends message data using UCX tagged send.

  • If timeout is not specified (std::nullopt), returns immediately without waiting (async).

  • If timeout is g_kUcxInfiniteTimeout (UINT32_MAX), waits indefinitely until completion or failure.

  • Otherwise, waits up to the specified timeout in milliseconds.

Parameters:
  • buffer[in] Memory address of the data buffer to send

  • length[in] Size of the data buffer in bytes

  • tag[in] UCX tag for message identification

  • errorMessage[out] String for storing error messages

  • timeout[in] Optional timeout in milliseconds (nullopt = async, g_kUcxInfiniteTimeout = infinite wait)

Returns:

UcxSendResult indicating the outcome (eSuccess, eEmptyMessage, eNullRequest, eTimedOut, eFailed, or eException)

UcxSendResult tagSendWithRequest(
const void *buffer,
size_t length,
uint64_t tag,
std::string &errorMessage,
std::shared_ptr<ucxx::Request> &outRequest,
)#

Send data using tagged communication and return request handle for async completion tracking.

Sends message data using UCX tagged send without waiting (async). Returns the request handle so caller can check completion before reusing buffer. Intended for large message sends where buffer lifetime management is critical.

Parameters:
  • buffer[in] Memory address of the data buffer to send

  • length[in] Size of the data buffer in bytes

  • tag[in] UCX tag for message identification

  • errorMessage[out] String for storing error messages

  • outRequest[out] Reference to receive the request handle

Returns:

UcxSendResult indicating the outcome (eSuccess if initiated, error otherwise)

UcxSendResult tagMultiSend(
const std::vector<const void*> &buffer,
const std::vector<size_t> &size,
const std::vector<int> &isCuda,
const uint64_t tag,
std::string &errorMessage,
std::optional<uint32_t> timeout = std::nullopt,
)#

Send multiple buffers using tagged communication with optional timeout.

Sends multiple buffers using UCX tagged multi-send.

  • If timeout is not specified (std::nullopt), returns immediately without waiting (async).

  • If timeout is g_kUcxInfiniteTimeout (UINT32_MAX), waits indefinitely until completion or failure.

  • Otherwise, waits up to the specified timeout in milliseconds.

Parameters:
  • buffer[in] List of memory addresses pointing to data buffers

  • size[in] List of buffer sizes in bytes

  • isCuda[in] List indicating if each buffer is CUDA memory

  • tag[in] UCX tag for message identification

  • errorMessage[out] String for storing error messages

  • timeout[in] Optional timeout in milliseconds (nullopt = async, g_kUcxInfiniteTimeout = infinite wait)

Returns:

UcxSendResult indicating the outcome

UcxReceiveResult tagReceive(
void *buffer,
size_t length,
uint64_t tag,
uint64_t mask,
std::string &errorMessage,
uint32_t timeout = g_kUcxInfiniteTimeout,
)#

Receive data using tagged communication with optional timeout.

Receives message data using UCX tagged receive and waits for completion. Defaults to infinite wait if timeout is not specified.

Parameters:
  • buffer[in] Memory address where received data will be stored

  • length[in] Size of the receive buffer in bytes

  • tag[in] UCX tag for message identification

  • mask[in] Tag mask for selective message matching

  • errorMessage[out] String for storing error messages

  • timeout[in] Timeout in milliseconds (g_kUcxInfiniteTimeout = infinite wait)

Returns:

UcxReceiveResult indicating the outcome

UcxReceiveResult tagMultiReceive(
const uint64_t tag,
const uint64_t tagMask,
std::string &errorMessage,
uint32_t timeout = g_kUcxInfiniteTimeout,
)#

Receive multiple buffers using tagged communication with optional timeout.

Receives multiple buffers using UCX tagged multi-receive and waits for completion. Defaults to infinite wait if timeout is not specified.

Parameters:
  • tag[in] UCX tag for message identification

  • tagMask[in] Tag mask for selective message matching

  • errorMessage[out] String for storing error messages

  • timeout[in] Timeout in milliseconds (g_kUcxInfiniteTimeout = infinite wait)

Returns:

UcxReceiveResult indicating the outcome