diff --git a/.gitignore b/.gitignore index f58561a..d42729f 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ cmake-build-*/ .pydevproject .scannerwork/ .vscode/ +.sisyphus/ **/.idea/* !**/.idea/dictionaries !**/.idea/dictionaries/* diff --git a/libudpard/udpard.c b/libudpard/udpard.c index 86175c2..62f7c6a 100644 --- a/libudpard/udpard.c +++ b/libudpard/udpard.c @@ -2199,7 +2199,8 @@ static void rx_session_update_unordered(rx_session_t* const self, rx_frame_t* const frame, const udpard_deleter_t payload_deleter) { - UDPARD_ASSERT(self->port->reordering_window < 0); + UDPARD_ASSERT(self->port->mode == udpard_rx_unordered); + UDPARD_ASSERT(self->port->reordering_window == 0); // We do not check interned transfers because in the UNORDERED mode they are never interned, always ejected ASAP. // We don't care about the ordering, either; we just accept anything that looks new. if (!rx_session_is_transfer_ejected(self, frame->meta.transfer_id)) { @@ -2353,30 +2354,38 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now) bool udpard_rx_port_new(udpard_rx_port_t* const self, const uint64_t topic_hash, const size_t extent, + const udpard_rx_mode_t mode, const udpard_us_t reordering_window, const udpard_rx_mem_resources_t memory, const udpard_rx_port_vtable_t* const vtable) { - const bool win_ok = (reordering_window >= 0) || // - (reordering_window == UDPARD_RX_REORDERING_WINDOW_UNORDERED) || - (reordering_window == UDPARD_RX_REORDERING_WINDOW_STATELESS); - const bool ok = (self != NULL) && rx_validate_mem_resources(memory) && win_ok && (vtable != NULL) && - (vtable->on_message != NULL) && (vtable->on_collision != NULL); + bool ok = (self != NULL) && rx_validate_mem_resources(memory) && (reordering_window >= 0) && (vtable != NULL) && + (vtable->on_message != NULL) && (vtable->on_collision != NULL); if (ok) { mem_zero(sizeof(*self), self); self->topic_hash = topic_hash; self->extent = extent; - self->reordering_window = reordering_window; + self->mode = mode; self->memory = memory; self->index_session_by_remote_uid = NULL; self->vtable = vtable; self->user = NULL; - if (reordering_window == UDPARD_RX_REORDERING_WINDOW_STATELESS) { - self->vtable_private = &rx_port_vtb_stateless; - } else if (reordering_window == UDPARD_RX_REORDERING_WINDOW_UNORDERED) { - self->vtable_private = &rx_port_vtb_unordered; - } else { - self->vtable_private = &rx_port_vtb_ordered; + switch (mode) { + case udpard_rx_stateless: + self->vtable_private = &rx_port_vtb_stateless; + self->reordering_window = 0; + break; + case udpard_rx_unordered: + self->vtable_private = &rx_port_vtb_unordered; + self->reordering_window = 0; + break; + case udpard_rx_ordered: + self->vtable_private = &rx_port_vtb_ordered; + self->reordering_window = reordering_window; + UDPARD_ASSERT(self->reordering_window >= 0); + break; + default: + ok = false; } } return ok; @@ -2452,7 +2461,8 @@ bool udpard_rx_port_new_p2p(udpard_rx_port_p2p_t* const self, return udpard_rx_port_new((udpard_rx_port_t*)self, // local_uid, extent + UDPARD_P2P_HEADER_BYTES, - UDPARD_RX_REORDERING_WINDOW_UNORDERED, + udpard_rx_unordered, + 0, memory, &proxy); } diff --git a/libudpard/udpard.h b/libudpard/udpard.h index 3acdef9..8d405e3 100644 --- a/libudpard/udpard.h +++ b/libudpard/udpard.h @@ -633,14 +633,15 @@ void udpard_tx_free(udpard_tx_t* const self); /// /// The transfer reassembly state machine can operate in several modes described below. First, a brief summary: /// -/// Mode Guarantees Limitations Reordering window setting -/// -----------------------------------------−-------------------------------------------------------------------------- -/// ORDERED Strictly increasing transfer-ID May delay transfers, CPU heavier Non-negative number of microseconds -/// UNORDERED Unique transfer-ID Ordering not guaranteed UDPARD_RX_REORDERING_WINDOW_UNORDERED -/// STATELESS Constant time, constant memory 1-frame only, dups, no responses UDPARD_RX_REORDERING_WINDOW_STATELESS +/// Mode Guarantees Limitations Reordering window +/// -----------------------------------------−------------------------------------------------------------------ +/// ORDERED Strictly increasing transfer-ID May delay transfers, CPU heavier Non-negative microseconds +/// UNORDERED Unique transfer-ID Ordering not guaranteed Ignored +/// STATELESS Constant time, constant memory 1-frame only, dups, no responses Ignored /// -/// If not sure, choose UNORDERED. The ORDERED mode is a good fit for ordering-sensitive use cases like state estimators -/// and control loops, but it is not suitable for P2P. The STATELESS mode is chiefly intended for the heartbeat topic. +/// If not sure, choose unordered. The ordered mode is a good fit for ordering-sensitive use cases like state +/// estimators and control loops, but it is not suitable for P2P. +/// The stateless mode is chiefly intended for the heartbeat topic. /// /// ORDERED /// @@ -656,9 +657,9 @@ void udpard_tx_free(udpard_tx_t* const self); /// /// This mode requires much more bookkeeping which results in a greater processing load per received fragment/transfer. /// -/// The ORDERED mode is used if the reordering window is non-negative. Zero is not really a special case, it -/// simply means that out-of-order transfers are not waited for at all (declared permanently lost immediately), -/// and no received transfer is delayed before ejection to the application. +/// Zero is not really a special case for the reordering window; it simply means that out-of-order transfers +/// are not waited for at all (declared permanently lost immediately), and no received transfer is delayed +/// before ejection to the application. /// /// The ORDERED mode is mostly intended for applications like state estimators, control systems, and data streaming /// where ordering is critical. @@ -676,8 +677,7 @@ void udpard_tx_free(udpard_tx_t* const self); /// respect to Y. This would cause the ORDERED mode to delay or drop the response to X, which is undesirable; /// therefore, the UNORDERED mode is preferred for request-response topics. /// -/// The UNORDERED mode is used if the reordering window duration is set to UDPARD_RX_REORDERING_WINDOW_UNORDERED. -/// This should be the default mode for most use cases. +/// The unordered mode should be the default mode for most use cases. /// /// STATELESS /// @@ -689,11 +689,6 @@ void udpard_tx_free(udpard_tx_t* const self); /// The stateless mode allocates only a fragment header per accepted frame and does not contain any /// variable-complexity processing logic, enabling great scalability for topics with a very large number of /// publishers where unordered and duplicated messages are acceptable, such as the heartbeat topic. -/// -/// The STATELESS mode is used if the reordering window duration is set to UDPARD_RX_REORDERING_WINDOW_STATELESS. - -#define UDPARD_RX_REORDERING_WINDOW_UNORDERED ((udpard_us_t)(-1)) -#define UDPARD_RX_REORDERING_WINDOW_STATELESS ((udpard_us_t)(-2)) /// The application will have a single RX instance to manage all subscriptions and P2P ports. typedef struct udpard_rx_t @@ -736,6 +731,14 @@ typedef struct udpard_rx_port_p2p_t udpard_rx_port_p2p_t; typedef struct udpard_rx_transfer_t udpard_rx_transfer_t; typedef struct udpard_rx_transfer_p2p_t udpard_rx_transfer_p2p_t; +/// RX port mode for transfer reassembly behavior. +typedef enum udpard_rx_mode_t +{ + udpard_rx_unordered = 0, + udpard_rx_ordered = 1, + udpard_rx_stateless = 2, +} udpard_rx_mode_t; + /// Provided by the application per port instance to specify the callbacks to be invoked on certain events. /// This design allows distinct callbacks per port, which is especially useful for the P2P port. typedef struct udpard_rx_port_vtable_t @@ -758,9 +761,9 @@ struct udpard_rx_port_t /// For P2P ports, UDPARD_P2P_HEADER_BYTES must be included in this value (the library takes care of this). size_t extent; - /// See UDPARD_RX_REORDERING_WINDOW_... above. - /// Behavior undefined if the reassembly mode is switched on a live port with ongoing transfers. - udpard_us_t reordering_window; + /// Behavior undefined if the reassembly mode or the reordering window are switched on a live port. + udpard_rx_mode_t mode; + udpard_us_t reordering_window; udpard_rx_mem_resources_t memory; @@ -894,8 +897,9 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now); /// The topic hash is needed to detect and ignore transfers that use different topics on the same subject-ID. /// The collision callback is invoked if a topic hash collision is detected. /// -/// If not sure which reassembly mode to choose, consider UDPARD_RX_REORDERING_WINDOW_UNORDERED as the default choice. -/// For ordering-sensitive use cases, such as state estimators and control loops, use ORDERED with a short window. +/// If not sure which reassembly mode to choose, consider `udpard_rx_unordered` as the default choice. +/// For ordering-sensitive use cases, such as state estimators and control loops, use `udpard_rx_ordered` with a short +/// window. /// /// The pointed-to vtable instance must outlive the port instance. /// @@ -904,6 +908,7 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now); bool udpard_rx_port_new(udpard_rx_port_t* const self, const uint64_t topic_hash, // For P2P ports, this is the local node's UID. const size_t extent, + const udpard_rx_mode_t mode, const udpard_us_t reordering_window, const udpard_rx_mem_resources_t memory, const udpard_rx_port_vtable_t* const vtable); diff --git a/tests/src/test_e2e_api.cpp b/tests/src/test_e2e_api.cpp index d2e01eb..c09f721 100644 --- a/tests/src/test_e2e_api.cpp +++ b/tests/src/test_e2e_api.cpp @@ -200,8 +200,7 @@ void test_reliable_delivery_under_losses() udpard_rx_new(&sub_rx, &sub_tx); udpard_rx_port_t sub_port{}; const uint64_t topic_hash = 0x0123456789ABCDEFULL; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&sub_port, topic_hash, 6000, UDPARD_RX_REORDERING_WINDOW_UNORDERED, sub_rx_mem, &callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&sub_port, topic_hash, 6000, udpard_rx_unordered, 0, sub_rx_mem, &callbacks)); // Endpoints. const std::array publisher_sources{ @@ -402,8 +401,7 @@ void test_reliable_stats_and_failures() ctx.expected.assign({ 1U, 2U, 3U, 4U }); udpard_rx_new(&rx, nullptr); rx.user = &ctx; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&port, 0x12340000ULL, 64, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 0x12340000ULL, 64, udpard_rx_unordered, 0, rx_mem, &callbacks)); const udpard_bytes_scattered_t src_payload = make_scattered(ctx.expected.data(), ctx.expected.size()); FeedbackState fb_ignore{}; diff --git a/tests/src/test_e2e_edge.cpp b/tests/src/test_e2e_edge.cpp index fc6ef21..15e5626 100644 --- a/tests/src/test_e2e_edge.cpp +++ b/tests/src/test_e2e_edge.cpp @@ -115,7 +115,7 @@ struct Fixture Fixture(Fixture&&) = delete; Fixture& operator=(Fixture&&) = delete; - explicit Fixture(const udpard_us_t reordering_window) + explicit Fixture(const udpard_rx_mode_t mode, const udpard_us_t reordering_window) { instrumented_allocator_new(&tx_alloc_transfer); instrumented_allocator_new(&tx_alloc_payload); @@ -138,7 +138,7 @@ struct Fixture ctx.expected_uid = tx.local_uid; ctx.source = source; rx.user = &ctx; - TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 1024, reordering_window, rx_mem, &callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 1024, mode, reordering_window, rx_mem, &callbacks)); } ~Fixture() @@ -219,7 +219,7 @@ void on_message_p2p(udpard_rx_t* const rx, udpard_rx_port_p2p_t* const port, con /// UNORDERED mode should drop duplicates while keeping arrival order. void test_udpard_rx_unordered_duplicates() { - Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED }; + Fixture fix{ udpard_rx_unordered, 0 }; udpard_us_t now = 0; constexpr std::array ids{ 100, 20000, 10100, 5000, 20000, 100 }; @@ -241,7 +241,7 @@ void test_udpard_rx_unordered_duplicates() /// ORDERED mode waits for the window, then rejects late arrivals. void test_udpard_rx_ordered_out_of_order() { - Fixture fix{ 50 }; + Fixture fix{ udpard_rx_ordered, 50 }; udpard_us_t now = 0; // First batch builds the ordered baseline. @@ -282,7 +282,7 @@ void test_udpard_rx_ordered_out_of_order() /// ORDERED mode after head advance should reject late IDs arriving after window expiry. void test_udpard_rx_ordered_head_advanced_late() { - Fixture fix{ 50 }; + Fixture fix{ udpard_rx_ordered, 50 }; udpard_us_t now = 0; fix.push_single(now, 100); @@ -317,7 +317,7 @@ void test_udpard_rx_ordered_head_advanced_late() /// ORDERED mode rejects transfer-IDs far behind the recent history window. void test_udpard_rx_ordered_reject_far_past() { - Fixture fix{ 50 }; + Fixture fix{ udpard_rx_ordered, 50 }; udpard_us_t now = 0; fix.push_single(now, 200000); @@ -660,8 +660,7 @@ void test_udpard_tx_minimum_mtu() ctx.source = { .ip = 0x0A000001U, .port = 7501U }; udpard_rx_new(&rx, nullptr); rx.user = &ctx; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&port, topic_hash, 4096, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 4096, udpard_rx_unordered, 0, rx_mem, &callbacks)); // Send a payload that will require fragmentation at minimum MTU std::array payload{}; @@ -717,7 +716,7 @@ void test_udpard_tx_minimum_mtu() /// Test with transfer-ID at uint64 boundary values (0, large values) void test_udpard_transfer_id_boundaries() { - Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED }; + Fixture fix{ udpard_rx_unordered, 0 }; // Test transfer-ID = 0 (first valid value) fix.push_single(0, 0); @@ -771,8 +770,7 @@ void test_udpard_rx_zero_extent() udpard_rx_new(&rx, nullptr); // Create port with zero extent - TEST_ASSERT_TRUE( - udpard_rx_port_new(&port, topic_hash, 0, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 0, udpard_rx_unordered, 0, rx_mem, &callbacks)); // Track received transfers struct ZeroExtentContext @@ -857,7 +855,7 @@ void test_udpard_rx_zero_extent() /// Test empty payload transfer (zero-size payload) void test_udpard_empty_payload() { - Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED }; + Fixture fix{ udpard_rx_unordered, 0 }; // Send an empty payload fix.frames.clear(); @@ -893,7 +891,7 @@ void test_udpard_empty_payload() /// Test priority levels from exceptional (0) to optional (7) void test_udpard_all_priority_levels() { - Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED }; + Fixture fix{ udpard_rx_unordered, 0 }; udpard_us_t now = 0; constexpr uint16_t iface_bitmap_1 = (1U << 0U); @@ -967,8 +965,7 @@ void test_udpard_topic_hash_collision() ctx.source = { .ip = 0x0A000003U, .port = 7503U }; udpard_rx_new(&rx, nullptr); rx.user = &ctx; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&port, rx_topic_hash, 1024, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, rx_topic_hash, 1024, udpard_rx_unordered, 0, rx_mem, &callbacks)); // Send with mismatched topic hash std::array payload{}; diff --git a/tests/src/test_e2e_random.cpp b/tests/src/test_e2e_random.cpp index 6410d43..52e8d8b 100644 --- a/tests/src/test_e2e_random.cpp +++ b/tests/src/test_e2e_random.cpp @@ -236,17 +236,18 @@ void test_udpard_tx_rx_end_to_end() udpard_rx_new(&ack_rx, &tx); // Test parameters. - constexpr std::array topic_hashes{ 0x123456789ABCDEF0ULL, + constexpr std::array topic_hashes{ 0x123456789ABCDEF0ULL, 0x0FEDCBA987654321ULL, 0x00ACE00ACE00ACEULL }; - constexpr std::array reorder_windows{ 2000, UDPARD_RX_REORDERING_WINDOW_UNORDERED, 5000 }; - constexpr std::array extents{ 1000, 5000, SIZE_MAX }; + constexpr std::array modes{ udpard_rx_ordered, udpard_rx_unordered, udpard_rx_ordered }; + constexpr std::array windows{ 2000, 0, 5000 }; + constexpr std::array extents{ 1000, 5000, SIZE_MAX }; // Configure ports with varied extents and reordering windows to cover truncation and different RX modes. std::array ports{}; for (size_t i = 0; i < ports.size(); i++) { TEST_ASSERT_TRUE( - udpard_rx_port_new(&ports[i], topic_hashes[i], extents[i], reorder_windows[i], rx_mem, &callbacks)); + udpard_rx_port_new(&ports[i], topic_hashes[i], extents[i], modes[i], windows[i], rx_mem, &callbacks)); } // Setup the context. diff --git a/tests/src/test_e2e_reliable_ordered.cpp b/tests/src/test_e2e_reliable_ordered.cpp index 1e770f7..68d7578 100644 --- a/tests/src/test_e2e_reliable_ordered.cpp +++ b/tests/src/test_e2e_reliable_ordered.cpp @@ -237,8 +237,8 @@ void test_reliable_ordered_with_loss_and_reordering() receiver_rx.user = &receiver_ctx; udpard_rx_port_t receiver_topic_port{}; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&receiver_topic_port, topic_hash, 4096, reordering_window, receiver_rx_mem, &topic_callbacks)); + TEST_ASSERT_TRUE(udpard_rx_port_new( + &receiver_topic_port, topic_hash, 4096, udpard_rx_ordered, reordering_window, receiver_rx_mem, &topic_callbacks)); // Payloads const std::array payload_a{ 0xAA, 0xAA, 0xAA, 0xAA }; diff --git a/tests/src/test_e2e_responses.cpp b/tests/src/test_e2e_responses.cpp index 6673498..b3bb66c 100644 --- a/tests/src/test_e2e_responses.cpp +++ b/tests/src/test_e2e_responses.cpp @@ -282,8 +282,8 @@ void test_topic_with_p2p_response() // B's topic subscription port udpard_rx_port_t b_topic_port{}; - TEST_ASSERT_TRUE(udpard_rx_port_new( - &b_topic_port, topic_hash, 4096, UDPARD_RX_REORDERING_WINDOW_UNORDERED, b_rx_mem, &topic_callbacks)); + TEST_ASSERT_TRUE( + udpard_rx_port_new(&b_topic_port, topic_hash, 4096, udpard_rx_unordered, 0, b_rx_mem, &topic_callbacks)); // B's P2P port for receiving response ACKs udpard_rx_port_p2p_t b_p2p_port{}; @@ -570,8 +570,8 @@ void test_topic_with_p2p_response_under_loss() b_rx.user = &b_node_ctx; udpard_rx_port_t b_topic_port{}; - TEST_ASSERT_TRUE(udpard_rx_port_new( - &b_topic_port, topic_hash, 4096, UDPARD_RX_REORDERING_WINDOW_UNORDERED, b_rx_mem, &topic_callbacks)); + TEST_ASSERT_TRUE( + udpard_rx_port_new(&b_topic_port, topic_hash, 4096, udpard_rx_unordered, 0, b_rx_mem, &topic_callbacks)); udpard_rx_port_p2p_t b_p2p_port{}; TEST_ASSERT_TRUE( diff --git a/tests/src/test_integration_sockets.cpp b/tests/src/test_integration_sockets.cpp index 3b4a8e0..d9255bb 100644 --- a/tests/src/test_integration_sockets.cpp +++ b/tests/src/test_integration_sockets.cpp @@ -230,8 +230,7 @@ struct RxFixture udpard_rx_port_t make_subject_port(const uint64_t topic_hash, const size_t extent, RxFixture& rx) { udpard_rx_port_t port{}; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&port, topic_hash, extent, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx.mem, &rx_port_vtable)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, extent, udpard_rx_unordered, 0, rx.mem, &rx_port_vtable)); return port; } diff --git a/tests/src/test_intrusive_guards.c b/tests/src/test_intrusive_guards.c index e56778f..4b04b1d 100644 --- a/tests/src/test_intrusive_guards.c +++ b/tests/src/test_intrusive_guards.c @@ -198,12 +198,14 @@ static void test_rx_guards(void) const udpard_rx_mem_resources_t rx_mem = { .session = make_mem(&rx_tag_a), .fragment = make_mem(&rx_tag_b) }; const udpard_rx_port_vtable_t rx_vtb = { .on_message = on_message_stub, .on_collision = on_collision_stub }; udpard_rx_port_t port; - TEST_ASSERT_FALSE(udpard_rx_port_new(NULL, 0, 0, 0, rx_mem, &rx_vtb)); + TEST_ASSERT_FALSE(udpard_rx_port_new(NULL, 0, 0, udpard_rx_ordered, 0, rx_mem, &rx_vtb)); udpard_rx_mem_resources_t bad_rx_mem = rx_mem; bad_rx_mem.session.vtable = NULL; - TEST_ASSERT_FALSE(udpard_rx_port_new(&port, 0, 0, UDPARD_RX_REORDERING_WINDOW_UNORDERED, bad_rx_mem, &rx_vtb)); - TEST_ASSERT_FALSE(udpard_rx_port_new(&port, 0, 0, (udpard_us_t)-3, rx_mem, &rx_vtb)); - TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 0xAA, 8U, UDPARD_RX_REORDERING_WINDOW_STATELESS, rx_mem, &rx_vtb)); + TEST_ASSERT_FALSE(udpard_rx_port_new(&port, 0, 0, udpard_rx_unordered, 0, bad_rx_mem, &rx_vtb)); + // NOLINTNEXTLINE(clang-analyzer-optin.core.EnumCastOutOfRange) + TEST_ASSERT_FALSE(udpard_rx_port_new(&port, 0, 0, (udpard_rx_mode_t)99, 0, rx_mem, &rx_vtb)); + TEST_ASSERT_FALSE(udpard_rx_port_new(&port, 0, 0, udpard_rx_ordered, (udpard_us_t)-1, rx_mem, &rx_vtb)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 0xAA, 8U, udpard_rx_stateless, 0, rx_mem, &rx_vtb)); // Invalid datagram inputs are rejected without processing. udpard_rx_t rx; diff --git a/tests/src/test_intrusive_rx.c b/tests/src/test_intrusive_rx.c index 8bb33a3..a9d0309 100644 --- a/tests/src/test_intrusive_rx.c +++ b/tests/src/test_intrusive_rx.c @@ -1758,12 +1758,13 @@ static void test_rx_ack_enqueued(void) callback_result_t cb_result = { 0 }; rx.user = &cb_result; - const uint64_t topic_hash = 0x4E81E200CB479D4CULL; - udpard_rx_port_t port; - const udpard_us_t window = UDPARD_RX_REORDERING_WINDOW_UNORDERED; - const uint64_t remote_uid = 0xA1B2C3D4E5F60718ULL; - const size_t extent = 1000; - TEST_ASSERT(udpard_rx_port_new(&port, topic_hash, extent, window, rx_mem, &callbacks)); + const uint64_t topic_hash = 0x4E81E200CB479D4CULL; + udpard_rx_port_t port; + const udpard_rx_mode_t mode = udpard_rx_unordered; + const udpard_us_t window = 0; + const uint64_t remote_uid = 0xA1B2C3D4E5F60718ULL; + const size_t extent = 1000; + TEST_ASSERT(udpard_rx_port_new(&port, topic_hash, extent, mode, window, rx_mem, &callbacks)); rx_session_factory_args_t fac_args = { .owner = &port, .sessions_by_animation = &rx.list_session_by_animation, @@ -1851,7 +1852,8 @@ static void test_rx_session_ordered(void) udpard_us_t now = 0; const uint64_t remote_uid = 0xA1B2C3D4E5F60718ULL; udpard_rx_port_t port; - TEST_ASSERT(udpard_rx_port_new(&port, 0x4E81E200CB479D4CULL, 1000, 20 * KILO, rx_mem, &callbacks)); + TEST_ASSERT( + udpard_rx_port_new(&port, 0x4E81E200CB479D4CULL, 1000, udpard_rx_ordered, 20 * KILO, rx_mem, &callbacks)); rx_session_factory_args_t fac_args = { .owner = &port, .sessions_by_animation = &rx.list_session_by_animation, @@ -1990,8 +1992,7 @@ static void test_rx_session_unordered(void) const uint64_t topic_hash = 0xC3C8E4974254E1F5ULL; udpard_rx_port_t port = { 0 }; - TEST_ASSERT( - udpard_rx_port_new(&port, topic_hash, SIZE_MAX, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT(udpard_rx_port_new(&port, topic_hash, SIZE_MAX, udpard_rx_unordered, 0, rx_mem, &callbacks)); udpard_us_t now = 0; const uint64_t remote_uid = 0xA1B2C3D4E5F60718ULL; @@ -2135,8 +2136,7 @@ static void test_rx_session_unordered_reject_old(void) const uint64_t local_uid = 0xFACEB00CFACEB00CULL; udpard_rx_port_t port = { 0 }; - TEST_ASSERT( - udpard_rx_port_new(&port, local_uid, SIZE_MAX, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT(udpard_rx_port_new(&port, local_uid, SIZE_MAX, udpard_rx_unordered, 0, rx_mem, &callbacks)); udpard_us_t now = 0; const uint64_t remote_uid = 0x0123456789ABCDEFULL; @@ -2238,8 +2238,7 @@ static void test_rx_session_unordered_duplicates(void) rx.user = &cb_result; udpard_rx_port_t port = { 0 }; - TEST_ASSERT(udpard_rx_port_new( - &port, 0xFEE1DEADBEEFF00DULL, SIZE_MAX, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT(udpard_rx_port_new(&port, 0xFEE1DEADBEEFF00DULL, SIZE_MAX, udpard_rx_unordered, 0, rx_mem, &callbacks)); udpard_us_t now = 0; const uint64_t remote_uid = 0xAABBCCDDEEFF0011ULL; @@ -2314,7 +2313,8 @@ static void test_rx_session_ordered_reject_stale_after_jump(void) rx.user = &cb_result; udpard_rx_port_t port = { 0 }; - TEST_ASSERT(udpard_rx_port_new(&port, 0x123456789ABCDEF0ULL, 1000, 20 * KILO, rx_mem, &callbacks)); + TEST_ASSERT( + udpard_rx_port_new(&port, 0x123456789ABCDEF0ULL, 1000, udpard_rx_ordered, 20 * KILO, rx_mem, &callbacks)); udpard_us_t now = 0; const uint64_t remote_uid = 0xCAFEBEEFFACEFEEDULL; @@ -2421,7 +2421,7 @@ static void test_rx_session_ordered_zero_reordering_window(void) rx.user = &cb_result; udpard_rx_port_t port = { 0 }; - TEST_ASSERT(udpard_rx_port_new(&port, 0x0F0E0D0C0B0A0908ULL, 256, 0, rx_mem, &callbacks)); + TEST_ASSERT(udpard_rx_port_new(&port, 0x0F0E0D0C0B0A0908ULL, 256, udpard_rx_ordered, 0, rx_mem, &callbacks)); udpard_us_t now = 0; const uint64_t remote_uid = 0x0102030405060708ULL; @@ -2673,7 +2673,8 @@ static void test_rx_port_timeouts(void) rx.user = &cb_result; udpard_rx_port_t port = { 0 }; - TEST_ASSERT(udpard_rx_port_new(&port, 0xBADC0FFEE0DDF00DULL, 128, 20 * KILO, rx_mem, &callbacks)); + TEST_ASSERT( + udpard_rx_port_new(&port, 0xBADC0FFEE0DDF00DULL, 128, udpard_rx_ordered, 20 * KILO, rx_mem, &callbacks)); meta_t meta = { .priority = udpard_prio_nominal, .flag_ack = false, @@ -2736,8 +2737,7 @@ static void test_rx_port_oom(void) rx.user = &cb_result; udpard_rx_port_t port = { 0 }; - TEST_ASSERT( - udpard_rx_port_new(&port, 0xCAFEBABECAFEBABEULL, 64, UDPARD_RX_REORDERING_WINDOW_UNORDERED, rx_mem, &callbacks)); + TEST_ASSERT(udpard_rx_port_new(&port, 0xCAFEBABECAFEBABEULL, 64, udpard_rx_unordered, 0, rx_mem, &callbacks)); meta_t meta = { .priority = udpard_prio_nominal, .flag_ack = false, @@ -2797,7 +2797,7 @@ static void test_rx_port_free_loop(void) TEST_ASSERT(udpard_rx_port_new_p2p(&port_p2p, 0xCAFED00DCAFED00DULL, SIZE_MAX, rx_mem, &callbacks_p2p)); udpard_rx_port_t port_extra = { 0 }; const uint64_t topic_hash_extra = 0xDEADBEEFF00D1234ULL; - TEST_ASSERT(udpard_rx_port_new(&port_extra, topic_hash_extra, 1000, 5000, rx_mem, &callbacks)); + TEST_ASSERT(udpard_rx_port_new(&port_extra, topic_hash_extra, 1000, udpard_rx_ordered, 5000, rx_mem, &callbacks)); udpard_us_t now = 0; @@ -2905,6 +2905,7 @@ static void test_rx_additional_coverage(void) udpard_rx_port_t port = { .memory = mem, .vtable = &(udpard_rx_port_vtable_t){ .on_message = stub_on_message, .on_collision = stub_on_collision }, + .mode = udpard_rx_ordered, .reordering_window = 10, .topic_hash = 1 }; rx_session_t* ses = mem_res_alloc(mem.session, sizeof(rx_session_t)); @@ -2978,7 +2979,8 @@ static void test_rx_additional_coverage(void) g_collision_count = 0; port.vtable = &(udpard_rx_port_vtable_t){ .on_message = stub_on_message, .on_collision = stub_on_collision }; port.extent = 8; - port.reordering_window = UDPARD_RX_REORDERING_WINDOW_STATELESS; + port.mode = udpard_rx_stateless; + port.reordering_window = 0; rx_frame_t frame; byte_t payload[4] = { 1, 2, 3, 4 }; mem_zero(sizeof(frame), &frame); @@ -3001,8 +3003,7 @@ static void test_rx_additional_coverage(void) frame.meta.transfer_payload_size = 8; rx_port_accept_stateless(&rx, &port, 0, make_ep(1), &frame, instrumented_allocator_make_deleter(&alloc_frag), 0); udpard_rx_port_t port_stateless_new = { 0 }; - TEST_ASSERT_TRUE( - udpard_rx_port_new(&port_stateless_new, 22, 8, UDPARD_RX_REORDERING_WINDOW_STATELESS, mem, port.vtable)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port_stateless_new, 22, 8, udpard_rx_stateless, 0, mem, port.vtable)); TEST_ASSERT_NOT_NULL(port_stateless_new.vtable_private); udpard_rx_port_free(&rx, &port_stateless_new); instrumented_allocator_reset(&alloc_frag); @@ -3044,7 +3045,7 @@ static void test_rx_additional_coverage(void) // Port push collision and malformed header. udpard_rx_port_t port_normal = { 0 }; - TEST_ASSERT_TRUE(udpard_rx_port_new(&port_normal, 1, 8, 10, mem, port.vtable)); + TEST_ASSERT_TRUE(udpard_rx_port_new(&port_normal, 1, 8, udpard_rx_ordered, 10, mem, port.vtable)); udpard_bytes_mut_t bad_payload = { .data = mem_res_alloc(mem.fragment, 4), .size = 4 }; TEST_ASSERT(udpard_rx_port_push( &rx, &port_normal, 0, make_ep(2), bad_payload, instrumented_allocator_make_deleter(&alloc_frag), 0));