Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ cmake-build-*/
.pydevproject
.scannerwork/
.vscode/
.sisyphus/
**/.idea/*
!**/.idea/dictionaries
!**/.idea/dictionaries/*
Expand Down
38 changes: 24 additions & 14 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -2199,7 +2199,8 @@ static void rx_session_update_unordered(rx_session_t* const self,
rx_frame_t* const frame,
const udpard_deleter_t payload_deleter)
{
UDPARD_ASSERT(self->port->reordering_window < 0);
UDPARD_ASSERT(self->port->mode == udpard_rx_unordered);
UDPARD_ASSERT(self->port->reordering_window == 0);
// We do not check interned transfers because in the UNORDERED mode they are never interned, always ejected ASAP.
// We don't care about the ordering, either; we just accept anything that looks new.
if (!rx_session_is_transfer_ejected(self, frame->meta.transfer_id)) {
Expand Down Expand Up @@ -2353,30 +2354,38 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
bool udpard_rx_port_new(udpard_rx_port_t* const self,
const uint64_t topic_hash,
const size_t extent,
const udpard_rx_mode_t mode,
const udpard_us_t reordering_window,
const udpard_rx_mem_resources_t memory,
const udpard_rx_port_vtable_t* const vtable)
{
const bool win_ok = (reordering_window >= 0) || //
(reordering_window == UDPARD_RX_REORDERING_WINDOW_UNORDERED) ||
(reordering_window == UDPARD_RX_REORDERING_WINDOW_STATELESS);
const bool ok = (self != NULL) && rx_validate_mem_resources(memory) && win_ok && (vtable != NULL) &&
(vtable->on_message != NULL) && (vtable->on_collision != NULL);
bool ok = (self != NULL) && rx_validate_mem_resources(memory) && (reordering_window >= 0) && (vtable != NULL) &&
(vtable->on_message != NULL) && (vtable->on_collision != NULL);
if (ok) {
mem_zero(sizeof(*self), self);
self->topic_hash = topic_hash;
self->extent = extent;
self->reordering_window = reordering_window;
self->mode = mode;
self->memory = memory;
self->index_session_by_remote_uid = NULL;
self->vtable = vtable;
self->user = NULL;
if (reordering_window == UDPARD_RX_REORDERING_WINDOW_STATELESS) {
self->vtable_private = &rx_port_vtb_stateless;
} else if (reordering_window == UDPARD_RX_REORDERING_WINDOW_UNORDERED) {
self->vtable_private = &rx_port_vtb_unordered;
} else {
self->vtable_private = &rx_port_vtb_ordered;
switch (mode) {
case udpard_rx_stateless:
self->vtable_private = &rx_port_vtb_stateless;
self->reordering_window = 0;
break;
case udpard_rx_unordered:
self->vtable_private = &rx_port_vtb_unordered;
self->reordering_window = 0;
break;
case udpard_rx_ordered:
self->vtable_private = &rx_port_vtb_ordered;
self->reordering_window = reordering_window;
UDPARD_ASSERT(self->reordering_window >= 0);
break;
default:
ok = false;
}
}
return ok;
Expand Down Expand Up @@ -2452,7 +2461,8 @@ bool udpard_rx_port_new_p2p(udpard_rx_port_p2p_t* const self,
return udpard_rx_port_new((udpard_rx_port_t*)self, //
local_uid,
extent + UDPARD_P2P_HEADER_BYTES,
UDPARD_RX_REORDERING_WINDOW_UNORDERED,
udpard_rx_unordered,
0,
memory,
&proxy);
}
Expand Down
49 changes: 27 additions & 22 deletions libudpard/udpard.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,15 @@ void udpard_tx_free(udpard_tx_t* const self);
///
/// The transfer reassembly state machine can operate in several modes described below. First, a brief summary:
///
/// Mode Guarantees Limitations Reordering window setting
/// -----------------------------------------−--------------------------------------------------------------------------
/// ORDERED Strictly increasing transfer-ID May delay transfers, CPU heavier Non-negative number of microseconds
/// UNORDERED Unique transfer-ID Ordering not guaranteed UDPARD_RX_REORDERING_WINDOW_UNORDERED
/// STATELESS Constant time, constant memory 1-frame only, dups, no responses UDPARD_RX_REORDERING_WINDOW_STATELESS
/// Mode Guarantees Limitations Reordering window
/// -----------------------------------------−------------------------------------------------------------------
/// ORDERED Strictly increasing transfer-ID May delay transfers, CPU heavier Non-negative microseconds
/// UNORDERED Unique transfer-ID Ordering not guaranteed Ignored
/// STATELESS Constant time, constant memory 1-frame only, dups, no responses Ignored
///
/// If not sure, choose UNORDERED. The ORDERED mode is a good fit for ordering-sensitive use cases like state estimators
/// and control loops, but it is not suitable for P2P. The STATELESS mode is chiefly intended for the heartbeat topic.
/// If not sure, choose unordered. The ordered mode is a good fit for ordering-sensitive use cases like state
/// estimators and control loops, but it is not suitable for P2P.
/// The stateless mode is chiefly intended for the heartbeat topic.
///
/// ORDERED
///
Expand All @@ -656,9 +657,9 @@ void udpard_tx_free(udpard_tx_t* const self);
///
/// This mode requires much more bookkeeping which results in a greater processing load per received fragment/transfer.
///
/// The ORDERED mode is used if the reordering window is non-negative. Zero is not really a special case, it
/// simply means that out-of-order transfers are not waited for at all (declared permanently lost immediately),
/// and no received transfer is delayed before ejection to the application.
/// Zero is not really a special case for the reordering window; it simply means that out-of-order transfers
/// are not waited for at all (declared permanently lost immediately), and no received transfer is delayed
/// before ejection to the application.
///
/// The ORDERED mode is mostly intended for applications like state estimators, control systems, and data streaming
/// where ordering is critical.
Expand All @@ -676,8 +677,7 @@ void udpard_tx_free(udpard_tx_t* const self);
/// respect to Y. This would cause the ORDERED mode to delay or drop the response to X, which is undesirable;
/// therefore, the UNORDERED mode is preferred for request-response topics.
///
/// The UNORDERED mode is used if the reordering window duration is set to UDPARD_RX_REORDERING_WINDOW_UNORDERED.
/// This should be the default mode for most use cases.
/// The unordered mode should be the default mode for most use cases.
///
/// STATELESS
///
Expand All @@ -689,11 +689,6 @@ void udpard_tx_free(udpard_tx_t* const self);
/// The stateless mode allocates only a fragment header per accepted frame and does not contain any
/// variable-complexity processing logic, enabling great scalability for topics with a very large number of
/// publishers where unordered and duplicated messages are acceptable, such as the heartbeat topic.
///
/// The STATELESS mode is used if the reordering window duration is set to UDPARD_RX_REORDERING_WINDOW_STATELESS.

#define UDPARD_RX_REORDERING_WINDOW_UNORDERED ((udpard_us_t)(-1))
#define UDPARD_RX_REORDERING_WINDOW_STATELESS ((udpard_us_t)(-2))

/// The application will have a single RX instance to manage all subscriptions and P2P ports.
typedef struct udpard_rx_t
Expand Down Expand Up @@ -736,6 +731,14 @@ typedef struct udpard_rx_port_p2p_t udpard_rx_port_p2p_t;
typedef struct udpard_rx_transfer_t udpard_rx_transfer_t;
typedef struct udpard_rx_transfer_p2p_t udpard_rx_transfer_p2p_t;

/// RX port mode for transfer reassembly behavior.
typedef enum udpard_rx_mode_t
{
udpard_rx_unordered = 0,
udpard_rx_ordered = 1,
udpard_rx_stateless = 2,
} udpard_rx_mode_t;

/// Provided by the application per port instance to specify the callbacks to be invoked on certain events.
/// This design allows distinct callbacks per port, which is especially useful for the P2P port.
typedef struct udpard_rx_port_vtable_t
Expand All @@ -758,9 +761,9 @@ struct udpard_rx_port_t
/// For P2P ports, UDPARD_P2P_HEADER_BYTES must be included in this value (the library takes care of this).
size_t extent;

/// See UDPARD_RX_REORDERING_WINDOW_... above.
/// Behavior undefined if the reassembly mode is switched on a live port with ongoing transfers.
udpard_us_t reordering_window;
/// Behavior undefined if the reassembly mode or the reordering window are switched on a live port.
udpard_rx_mode_t mode;
udpard_us_t reordering_window;

udpard_rx_mem_resources_t memory;

Expand Down Expand Up @@ -894,8 +897,9 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now);
/// The topic hash is needed to detect and ignore transfers that use different topics on the same subject-ID.
/// The collision callback is invoked if a topic hash collision is detected.
///
/// If not sure which reassembly mode to choose, consider UDPARD_RX_REORDERING_WINDOW_UNORDERED as the default choice.
/// For ordering-sensitive use cases, such as state estimators and control loops, use ORDERED with a short window.
/// If not sure which reassembly mode to choose, consider `udpard_rx_unordered` as the default choice.
/// For ordering-sensitive use cases, such as state estimators and control loops, use `udpard_rx_ordered` with a short
/// window.
///
/// The pointed-to vtable instance must outlive the port instance.
///
Expand All @@ -904,6 +908,7 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now);
bool udpard_rx_port_new(udpard_rx_port_t* const self,
const uint64_t topic_hash, // For P2P ports, this is the local node's UID.
const size_t extent,
const udpard_rx_mode_t mode,
const udpard_us_t reordering_window,
const udpard_rx_mem_resources_t memory,
const udpard_rx_port_vtable_t* const vtable);
Expand Down
6 changes: 2 additions & 4 deletions tests/src/test_e2e_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ void test_reliable_delivery_under_losses()
udpard_rx_new(&sub_rx, &sub_tx);
udpard_rx_port_t sub_port{};
const uint64_t topic_hash = 0x0123456789ABCDEFULL;
TEST_ASSERT_TRUE(
udpard_rx_port_new(&sub_port, topic_hash, 6000, UDPARD_RX_REORDERING_WINDOW_UNORDERED, sub_rx_mem, &callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(&sub_port, topic_hash, 6000, udpard_rx_unordered, 0, sub_rx_mem, &callbacks));

// Endpoints.
const std::array<udpard_udpip_ep_t, UDPARD_IFACE_COUNT_MAX> publisher_sources{
Expand Down Expand Up @@ -402,8 +401,7 @@ void test_reliable_stats_and_failures()
ctx.expected.assign({ 1U, 2U, 3U, 4U });
udpard_rx_new(&rx, nullptr);
rx.user = &ctx;
TEST_ASSERT_TRUE(
udpard_rx_port_new(&port, 0x12340000ULL, 64, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 0x12340000ULL, 64, udpard_rx_unordered, 0, rx_mem, &callbacks));

const udpard_bytes_scattered_t src_payload = make_scattered(ctx.expected.data(), ctx.expected.size());
FeedbackState fb_ignore{};
Expand Down
27 changes: 12 additions & 15 deletions tests/src/test_e2e_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ struct Fixture
Fixture(Fixture&&) = delete;
Fixture& operator=(Fixture&&) = delete;

explicit Fixture(const udpard_us_t reordering_window)
explicit Fixture(const udpard_rx_mode_t mode, const udpard_us_t reordering_window)
{
instrumented_allocator_new(&tx_alloc_transfer);
instrumented_allocator_new(&tx_alloc_payload);
Expand All @@ -138,7 +138,7 @@ struct Fixture
ctx.expected_uid = tx.local_uid;
ctx.source = source;
rx.user = &ctx;
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 1024, reordering_window, rx_mem, &callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 1024, mode, reordering_window, rx_mem, &callbacks));
}

~Fixture()
Expand Down Expand Up @@ -219,7 +219,7 @@ void on_message_p2p(udpard_rx_t* const rx, udpard_rx_port_p2p_t* const port, con
/// UNORDERED mode should drop duplicates while keeping arrival order.
void test_udpard_rx_unordered_duplicates()
{
Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED };
Fixture fix{ udpard_rx_unordered, 0 };
udpard_us_t now = 0;

constexpr std::array<uint64_t, 6> ids{ 100, 20000, 10100, 5000, 20000, 100 };
Expand All @@ -241,7 +241,7 @@ void test_udpard_rx_unordered_duplicates()
/// ORDERED mode waits for the window, then rejects late arrivals.
void test_udpard_rx_ordered_out_of_order()
{
Fixture fix{ 50 };
Fixture fix{ udpard_rx_ordered, 50 };
udpard_us_t now = 0;

// First batch builds the ordered baseline.
Expand Down Expand Up @@ -282,7 +282,7 @@ void test_udpard_rx_ordered_out_of_order()
/// ORDERED mode after head advance should reject late IDs arriving after window expiry.
void test_udpard_rx_ordered_head_advanced_late()
{
Fixture fix{ 50 };
Fixture fix{ udpard_rx_ordered, 50 };
udpard_us_t now = 0;

fix.push_single(now, 100);
Expand Down Expand Up @@ -317,7 +317,7 @@ void test_udpard_rx_ordered_head_advanced_late()
/// ORDERED mode rejects transfer-IDs far behind the recent history window.
void test_udpard_rx_ordered_reject_far_past()
{
Fixture fix{ 50 };
Fixture fix{ udpard_rx_ordered, 50 };
udpard_us_t now = 0;

fix.push_single(now, 200000);
Expand Down Expand Up @@ -660,8 +660,7 @@ void test_udpard_tx_minimum_mtu()
ctx.source = { .ip = 0x0A000001U, .port = 7501U };
udpard_rx_new(&rx, nullptr);
rx.user = &ctx;
TEST_ASSERT_TRUE(
udpard_rx_port_new(&port, topic_hash, 4096, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 4096, udpard_rx_unordered, 0, rx_mem, &callbacks));

// Send a payload that will require fragmentation at minimum MTU
std::array<uint8_t, 1000> payload{};
Expand Down Expand Up @@ -717,7 +716,7 @@ void test_udpard_tx_minimum_mtu()
/// Test with transfer-ID at uint64 boundary values (0, large values)
void test_udpard_transfer_id_boundaries()
{
Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED };
Fixture fix{ udpard_rx_unordered, 0 };

// Test transfer-ID = 0 (first valid value)
fix.push_single(0, 0);
Expand Down Expand Up @@ -771,8 +770,7 @@ void test_udpard_rx_zero_extent()
udpard_rx_new(&rx, nullptr);

// Create port with zero extent
TEST_ASSERT_TRUE(
udpard_rx_port_new(&port, topic_hash, 0, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 0, udpard_rx_unordered, 0, rx_mem, &callbacks));

// Track received transfers
struct ZeroExtentContext
Expand Down Expand Up @@ -857,7 +855,7 @@ void test_udpard_rx_zero_extent()
/// Test empty payload transfer (zero-size payload)
void test_udpard_empty_payload()
{
Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED };
Fixture fix{ udpard_rx_unordered, 0 };

// Send an empty payload
fix.frames.clear();
Expand Down Expand Up @@ -893,7 +891,7 @@ void test_udpard_empty_payload()
/// Test priority levels from exceptional (0) to optional (7)
void test_udpard_all_priority_levels()
{
Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED };
Fixture fix{ udpard_rx_unordered, 0 };
udpard_us_t now = 0;

constexpr uint16_t iface_bitmap_1 = (1U << 0U);
Expand Down Expand Up @@ -967,8 +965,7 @@ void test_udpard_topic_hash_collision()
ctx.source = { .ip = 0x0A000003U, .port = 7503U };
udpard_rx_new(&rx, nullptr);
rx.user = &ctx;
TEST_ASSERT_TRUE(
udpard_rx_port_new(&port, rx_topic_hash, 1024, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, rx_topic_hash, 1024, udpard_rx_unordered, 0, rx_mem, &callbacks));

// Send with mismatched topic hash
std::array<uint8_t, 8> payload{};
Expand Down
9 changes: 5 additions & 4 deletions tests/src/test_e2e_random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,18 @@ void test_udpard_tx_rx_end_to_end()
udpard_rx_new(&ack_rx, &tx);

// Test parameters.
constexpr std::array<uint64_t, 3> topic_hashes{ 0x123456789ABCDEF0ULL,
constexpr std::array<uint64_t, 3> topic_hashes{ 0x123456789ABCDEF0ULL,
0x0FEDCBA987654321ULL,
0x00ACE00ACE00ACEULL };
constexpr std::array<udpard_us_t, 3> reorder_windows{ 2000, UDPARD_RX_REORDERING_WINDOW_UNORDERED, 5000 };
constexpr std::array<size_t, 3> extents{ 1000, 5000, SIZE_MAX };
constexpr std::array<udpard_rx_mode_t, 3> modes{ udpard_rx_ordered, udpard_rx_unordered, udpard_rx_ordered };
constexpr std::array<udpard_us_t, 3> windows{ 2000, 0, 5000 };
constexpr std::array<size_t, 3> extents{ 1000, 5000, SIZE_MAX };

// Configure ports with varied extents and reordering windows to cover truncation and different RX modes.
std::array<udpard_rx_port_t, 3> ports{};
for (size_t i = 0; i < ports.size(); i++) {
TEST_ASSERT_TRUE(
udpard_rx_port_new(&ports[i], topic_hashes[i], extents[i], reorder_windows[i], rx_mem, &callbacks));
udpard_rx_port_new(&ports[i], topic_hashes[i], extents[i], modes[i], windows[i], rx_mem, &callbacks));
}

// Setup the context.
Expand Down
4 changes: 2 additions & 2 deletions tests/src/test_e2e_reliable_ordered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ void test_reliable_ordered_with_loss_and_reordering()
receiver_rx.user = &receiver_ctx;

udpard_rx_port_t receiver_topic_port{};
TEST_ASSERT_TRUE(
udpard_rx_port_new(&receiver_topic_port, topic_hash, 4096, reordering_window, receiver_rx_mem, &topic_callbacks));
TEST_ASSERT_TRUE(udpard_rx_port_new(
&receiver_topic_port, topic_hash, 4096, udpard_rx_ordered, reordering_window, receiver_rx_mem, &topic_callbacks));

// Payloads
const std::array<uint8_t, 4> payload_a{ 0xAA, 0xAA, 0xAA, 0xAA };
Expand Down
8 changes: 4 additions & 4 deletions tests/src/test_e2e_responses.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ void test_topic_with_p2p_response()

// B's topic subscription port
udpard_rx_port_t b_topic_port{};
TEST_ASSERT_TRUE(udpard_rx_port_new(
&b_topic_port, topic_hash, 4096, UDPARD_RX_REORDERING_WINDOW_UNORDERED, b_rx_mem, &topic_callbacks));
TEST_ASSERT_TRUE(
udpard_rx_port_new(&b_topic_port, topic_hash, 4096, udpard_rx_unordered, 0, b_rx_mem, &topic_callbacks));

// B's P2P port for receiving response ACKs
udpard_rx_port_p2p_t b_p2p_port{};
Expand Down Expand Up @@ -570,8 +570,8 @@ void test_topic_with_p2p_response_under_loss()
b_rx.user = &b_node_ctx;

udpard_rx_port_t b_topic_port{};
TEST_ASSERT_TRUE(udpard_rx_port_new(
&b_topic_port, topic_hash, 4096, UDPARD_RX_REORDERING_WINDOW_UNORDERED, b_rx_mem, &topic_callbacks));
TEST_ASSERT_TRUE(
udpard_rx_port_new(&b_topic_port, topic_hash, 4096, udpard_rx_unordered, 0, b_rx_mem, &topic_callbacks));

udpard_rx_port_p2p_t b_p2p_port{};
TEST_ASSERT_TRUE(
Expand Down
3 changes: 1 addition & 2 deletions tests/src/test_integration_sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ struct RxFixture
udpard_rx_port_t make_subject_port(const uint64_t topic_hash, const size_t extent, RxFixture& rx)
{
udpard_rx_port_t port{};
TEST_ASSERT_TRUE(
udpard_rx_port_new(&port, topic_hash, extent, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx.mem, &rx_port_vtable));
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, extent, udpard_rx_unordered, 0, rx.mem, &rx_port_vtable));
return port;
}

Expand Down
Loading