UCXListener
Fully qualified name: isaacsim::ucx::core::UCXListener
-
class UCXListener
Server-side listener for incoming UCX connections.
Manages a listening socket on a specified port and handles incoming connection requests from clients. Provides endpoint creation for accepted connections. The listener operates asynchronously and can wait for client connections with configurable timeouts.
Note
The listener automatically accepts incoming client connections as endpoints.
Public Functions
Constructor for UCX listener.
Creates a new UCX listener that will bind to the specified port and wait for incoming client connections. The listener uses the provided context to create workers and manage connections.
- Parameters:
context – [in] UCX context for creating workers and managing connections
port – [in] Port number to listen on for incoming connections
- Throws:
std::runtime_error – If listener creation or port binding fails
-
~UCXListener()
Destructor - ensures proper cleanup of listener resources.
Automatically shuts down the listener and cleans up all resources. Any active connections will be properly closed.
-
bool waitForConnection(int timeoutMs = -1)
Wait for a connection with optional timeout.
Blocks until a connection is established or the timeout expires. When a connection is established, a new UCXConnection object is created and stored internally.
- Parameters:
timeoutMs – [in] Timeout in milliseconds (-1 for infinite timeout)
- Returns:
true if a connection was established within the timeout; false if the timeout expired first.
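Because the default timeout of -1 blocks indefinitely, a server that must stay cancellable may prefer to wait in short slices. The following is a minimal sketch of that idea, not part of the library: `waitForClientOrStop` and `StubListener` are hypothetical names, and the helper only assumes a type exposing `bool waitForConnection(int timeoutMs)` as documented above.

```cpp
#include <atomic>

// Hypothetical helper: waits for a client in short slices so the caller can
// abort the wait. ListenerT is any type exposing
// bool waitForConnection(int timeoutMs), as documented for UCXListener.
template <typename ListenerT>
bool waitForClientOrStop(ListenerT& listener,
                         const std::atomic<bool>& stopRequested,
                         int sliceMs = 100)
{
    while (!stopRequested.load())
    {
        if (listener.waitForConnection(sliceMs))
        {
            return true; // a client connected within this slice
        }
    }
    return false; // stop was requested before any client connected
}

// Hypothetical stub used only to exercise the helper without UCX.
struct StubListener
{
    int attemptsUntilConnected;

    bool waitForConnection(int /*timeoutMs*/)
    {
        return --attemptsUntilConnected <= 0;
    }
};
```

With a real listener, the slice length trades shutdown latency against how often the wait is restarted.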
-
bool isConnected() const
Check if the listener currently has an active client connection.
Determines whether an endpoint representing a client connection exists and is currently active. This can be used to verify if a client is connected to the server.
- Returns:
true if a client connection exists and is active; false otherwise.
-
inline uint16_t getPort() const
Get the port number the listener is bound to.
Returns the actual port number that the listener is using. This may differ from the requested port if port 0 was specified (automatic port selection).
- Returns:
Port number the listener is bound to
-
inline std::shared_ptr<ucxx::Worker> getWorker() const
Get the UCX worker used by this listener.
Returns the worker that manages the listener’s operations and progress thread. This can be used to create client endpoints on the same worker for testing or for applications that need to share a worker between listener and client.
- Returns:
Shared pointer to the UCX worker
-
void startProgressThread()
Start the progress thread for the worker.
Starts the background progress thread that handles UCX communication operations. This should be called after all endpoints are created to follow the official UCXX pattern and avoid race conditions during endpoint creation.
Note
This function is idempotent - calling it multiple times has no effect if the progress thread is already running.
-
void stopProgressThread()
Stop the progress thread for the worker.
Stops the background progress thread that handles UCX communication operations. This should be called before destroying the listener to ensure proper cleanup.
-
void shutdown()
Shutdown the listener and close all connections.
Stops the listener from accepting new connections and closes any existing client connections. This method should be called before destroying the listener to ensure proper cleanup.
Note
After shutdown, the listener cannot be restarted.
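The ordering suggested by the member descriptions (accept a client, start the progress thread once the endpoint exists, stop it, then shut down) can be sketched as a small helper. This is an illustrative sketch under stated assumptions: `serveOneClient` and `RecordingListener` are hypothetical, and the template only relies on the four member functions documented above.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: runs one accept/serve cycle in the documented order.
// ListenerT is any type exposing waitForConnection, startProgressThread,
// stopProgressThread and shutdown, as documented for UCXListener.
template <typename ListenerT, typename ServeFn>
bool serveOneClient(ListenerT& listener, int acceptTimeoutMs, ServeFn&& serve)
{
    bool served = false;
    if (listener.waitForConnection(acceptTimeoutMs))
    {
        // Start progress only after the client endpoint has been created.
        listener.startProgressThread();
        serve(listener);
        listener.stopProgressThread();
        served = true;
    }
    // The listener cannot be restarted after this call.
    listener.shutdown();
    return served;
}

// Hypothetical stub that records the call order, so the helper can be
// exercised without UCX.
struct RecordingListener
{
    bool acceptResult = true;
    std::vector<std::string> calls;

    bool waitForConnection(int)
    {
        calls.push_back("waitForConnection");
        return acceptResult;
    }
    void startProgressThread() { calls.push_back("startProgressThread"); }
    void stopProgressThread() { calls.push_back("stopProgressThread"); }
    void shutdown() { calls.push_back("shutdown"); }
};
```

The sketch does not guard against `serve` throwing; production code would likely wrap the stop and shutdown calls in a scope guard.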
-
UcxSendResult tagSend(const void *buffer, size_t length, uint64_t tag, std::string &errorMessage, std::optional<uint32_t> timeout = std::nullopt)
Send data using tagged communication with optional timeout.
Sends message data using UCX tagged send.
If timeout is not specified (std::nullopt), returns immediately without waiting (async).
If timeout is g_kUcxInfiniteTimeout (UINT32_MAX), waits indefinitely until completion or failure.
Otherwise, waits up to the specified timeout in milliseconds.
- Parameters:
buffer – [in] Memory address of the data buffer to send
length – [in] Size of the data buffer in bytes
tag – [in] UCX tag for message identification
errorMessage – [out] String for storing error messages
timeout – [in] Optional timeout in milliseconds (nullopt = async, g_kUcxInfiniteTimeout = infinite wait)
- Returns:
UcxSendResult indicating the outcome (eSuccess, eEmptyMessage, eNullRequest, eTimedOut, eFailed, or eException)
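The three timeout modes described above can be summarized in a few lines. This is only a sketch of the documented convention, not the library's code: `WaitMode`, `classifyTimeout` and `kInfiniteTimeout` are hypothetical names, with `kInfiniteTimeout` standing in for `g_kUcxInfiniteTimeout` (documented as UINT32_MAX).

```cpp
#include <cstdint>
#include <limits>
#include <optional>

// Stand-in for the documented g_kUcxInfiniteTimeout (UINT32_MAX).
constexpr std::uint32_t kInfiniteTimeout = std::numeric_limits<std::uint32_t>::max();

// Hypothetical labels for the three documented behaviours.
enum class WaitMode
{
    eAsync,    // no timeout given: return immediately
    eInfinite, // wait until completion or failure
    eBounded   // wait up to the given number of milliseconds
};

// Maps a timeout argument to the behaviour described in the documentation.
WaitMode classifyTimeout(std::optional<std::uint32_t> timeout)
{
    if (!timeout.has_value())
    {
        return WaitMode::eAsync;
    }
    if (*timeout == kInfiniteTimeout)
    {
        return WaitMode::eInfinite;
    }
    return WaitMode::eBounded;
}
```

Note that an explicit timeout of 0 is still a bounded wait under this reading, which is distinct from passing `std::nullopt`.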
- const void *buffer,
- size_t length,
- uint64_t tag,
- std::string &errorMessage,
- std::shared_ptr<ucxx::Request> &outRequest,
Send data using tagged communication and return request handle for async completion tracking.
Sends message data using UCX tagged send without waiting (async). Returns the request handle so caller can check completion before reusing buffer. Intended for large message sends where buffer lifetime management is critical.
- Parameters:
buffer – [in] Memory address of the data buffer to send
length – [in] Size of the data buffer in bytes
tag – [in] UCX tag for message identification
errorMessage – [out] String for storing error messages
outRequest – [out] Reference to receive the request handle
- Returns:
UcxSendResult indicating the outcome (eSuccess if initiated, error otherwise)
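When a send returns a request handle, the buffer must stay valid until that request completes. A minimal polling sketch follows; `waitUntilCompleted` and `StubRequest` are hypothetical, and the helper assumes only that the request type exposes `isCompleted()`, which is believed to match `ucxx::Request` but should be checked against the UCXX version in use.

```cpp
#include <chrono>
#include <memory>
#include <thread>

// Hypothetical helper: polls a request until it completes or the limit
// elapses. RequestT is any type exposing bool isCompleted().
template <typename RequestT>
bool waitUntilCompleted(const std::shared_ptr<RequestT>& request,
                        std::chrono::milliseconds limit)
{
    if (!request)
    {
        return false; // nothing to wait on
    }
    const auto deadline = std::chrono::steady_clock::now() + limit;
    while (!request->isCompleted())
    {
        if (std::chrono::steady_clock::now() >= deadline)
        {
            return false; // still in flight: the buffer must not be reused yet
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return true; // safe to reuse or free the send buffer
}

// Hypothetical stub that completes after a fixed number of polls.
struct StubRequest
{
    int pollsLeft;

    bool isCompleted()
    {
        return pollsLeft-- <= 0;
    }
};
```

Completion only indicates that the request finished; whether it finished successfully would still need to be checked through the request's status.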
-
UcxSendResult tagMultiSend(const std::vector<const void*> &buffer, const std::vector<size_t> &size, const std::vector<int> &isCuda, const uint64_t tag, std::string &errorMessage, std::optional<uint32_t> timeout = std::nullopt)
Send multiple buffers using tagged communication with optional timeout.
Sends multiple buffers using UCX tagged multi-send.
If timeout is not specified (std::nullopt), returns immediately without waiting (async).
If timeout is g_kUcxInfiniteTimeout (UINT32_MAX), waits indefinitely until completion or failure.
Otherwise, waits up to the specified timeout in milliseconds.
- Parameters:
buffer – [in] List of memory addresses pointing to data buffers
size – [in] List of buffer sizes in bytes
isCuda – [in] List indicating if each buffer is CUDA memory
tag – [in] UCX tag for message identification
errorMessage – [out] String for storing error messages
timeout – [in] Optional timeout in milliseconds (nullopt = async, g_kUcxInfiniteTimeout = infinite wait)
- Returns:
UcxSendResult indicating the outcome
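The `buffer`, `size` and `isCuda` arguments are parallel lists, so building them together helps keep them aligned. The sketch below is illustrative only: `MultiSendArgs` is a hypothetical helper, and it assumes that the three lists must have equal length and that 0 in `isCuda` marks host memory.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper that keeps the three parallel lists in step.
struct MultiSendArgs
{
    std::vector<const void*> buffer;
    std::vector<std::size_t> size;
    std::vector<int> isCuda;

    // Appends one host-memory buffer (assumption: 0 means "not CUDA").
    void addHost(const void* data, std::size_t bytes)
    {
        buffer.push_back(data);
        size.push_back(bytes);
        isCuda.push_back(0);
    }

    // True when every buffer has a matching size and memory-type entry.
    bool isConsistent() const
    {
        return buffer.size() == size.size() && size.size() == isCuda.size();
    }
};
```

A caller could check `isConsistent()` before passing the three members to `tagMultiSend`.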
-
UcxReceiveResult tagReceive(void *buffer, size_t length, uint64_t tag, uint64_t mask, std::string &errorMessage, uint32_t timeout = g_kUcxInfiniteTimeout)
Receive data using tagged communication with optional timeout.
Receives message data using UCX tagged receive and waits for completion. Waits indefinitely if no timeout is specified.
- Parameters:
buffer – [in] Memory address where received data will be stored
length – [in] Size of the receive buffer in bytes
tag – [in] UCX tag for message identification
mask – [in] Tag mask for selective message matching
errorMessage – [out] String for storing error messages
timeout – [in] Timeout in milliseconds (g_kUcxInfiniteTimeout = infinite wait)
- Returns:
UcxReceiveResult indicating the outcome
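In UCX tag matching, the mask selects which bits of the tag must agree between the posted receive and the incoming message. The sketch below illustrates that rule; `tagMatches` is a hypothetical helper, and it assumes the listener forwards `tag` and `mask` to UCX unchanged.

```cpp
#include <cstdint>

// Hypothetical helper: true when the incoming tag agrees with the wanted tag
// on every bit selected by the mask.
constexpr bool tagMatches(std::uint64_t incomingTag,
                          std::uint64_t wantedTag,
                          std::uint64_t mask)
{
    return ((incomingTag ^ wantedTag) & mask) == 0;
}
```

A mask of all ones requires an exact tag match, a mask of zero accepts any tag, and a mask such as 0xFFFFFFFF00000000 matches on the upper 32 bits only.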
-
UcxReceiveResult tagMultiReceive(const uint64_t tag, const uint64_t tagMask, std::string &errorMessage, uint32_t timeout = g_kUcxInfiniteTimeout)
Receive multiple buffers using tagged communication with optional timeout.
Receives multiple buffers using UCX tagged multi-receive and waits for completion. Waits indefinitely if no timeout is specified.
- Parameters:
tag – [in] UCX tag for message identification
tagMask – [in] Tag mask for selective message matching
errorMessage – [out] String for storing error messages
timeout – [in] Timeout in milliseconds (g_kUcxInfiniteTimeout = infinite wait)
- Returns:
UcxReceiveResult indicating the outcome