Skip to content

Commit 3e852ac

Browse files
Remove topic hash from the transport layer; remove ordered delivery and use simplified ACK matching based on the transfer-ID alone (#72)
Ordered delivery cannot be correct given the risk of multiple topics temporarily sharing the same subject.
1 parent 9296213 commit 3e852ac

17 files changed

Lines changed: 1563 additions & 2793 deletions

.idea/dictionaries/project.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ next-generation intelligent vehicles: manned and unmanned aircraft, spacecraft,
2424
- ≤1-copy TX pipeline with deduplication across multiple interfaces and scattered input buffer support.
2525
- Support for redundant network interfaces with seamless interface aggregation and zero fail-over delay.
2626
- Robust message reassembler supporting highly distorted datagram streams:
27-
out-of-order fragments, message ordering recovery, fragment/message deduplication, interleaving, variable MTU, ...
28-
- Robust message ordering recovery for ordering-sensitive applications (e.g., state estimators, control loops)
29-
with well-defined deterministic recovery in the event of lost messages.
27+
out-of-order fragments, fragment/message deduplication, interleaving, variable MTU, ...
3028
- Packet loss mitigation via:
3129
- reliable topics (retransmit until acknowledged; callback notifications for successful/failed deliveries).
3230
- redundant interfaces (packet lost on one interface may be received on another, transparent to the application);
3331
- Heap not required (but supported); the library can be used with fixed-size block pool allocators.
3432
- Detailed time complexity and memory requirement models for the benefit of real-time high-integrity applications.
35-
- Highly scalable: designed to handle thousands of topics and hundreds of concurrent transfers with minimal resources.
33+
- Scalable: designed to handle thousands of topics and hundreds of concurrent transfers with minimal resources.
3634
- Runs anywhere out of the box, including extremely resource-constrained baremetal environments with ~100K ROM/RAM.
3735
No porting required.
3836
- Partial MISRA C compliance (reach out to <https://forum.opencyphal.org>).

cyphal_udp_header.dsdl

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# All Cyphal/UDP traffic is sent to port 9382.
2+
# The subject multicast group address is composed as 239.0.0.0 (=0xEF000000) + subject_id (23 bits).
3+
# All frames of a transfer must share the same field values unless otherwise noted.
4+
# Frames may arrive out-of-order, possibly interleaved with neighboring transfers; implementations must cope.
5+
6+
uint5 version #=2 in this version.
7+
uint3 priority # 0=highest, 7=lowest.
8+
9+
uint2 KIND_MSG_BEST_EFFORT = 0 # No ack must be sent.
10+
uint2 KIND_MSG_RELIABLE = 1 # Remote must acknowledge reception by sending an ACK frame back.
11+
uint2 KIND_ACK = 2 # Sent P2P; the transfer_id is of the acknowledged frame. Payload empty/ignored.
12+
uint2 kind
13+
uint6 reserved_incompat # Discard frame if any incompatibility flags are set that are not understood.
14+
15+
void16 # Reserved for compatibility flags and fields (transmit zero, ignore on reception).
16+
17+
# Payload reassembly information.
18+
# We provide both the frame index and the frame payload offset to allow various reassembly strategies depending on the
19+
# preferences of the implementation. The provided information is sufficient for zero-copy out-of-order reassembly.
20+
# Offset 4 bytes.
21+
22+
uint24 frame_index # Zero-based index of the payload fragment carried by this frame.
23+
void8
24+
uint32 frame_payload_offset # The offset of the frame payload relative to the start of the transfer payload.
25+
uint32 transfer_payload_size # Total for all frames.
26+
27+
# Transfer identification information.
28+
# The transfer-ID is a single field that segregates transfers by topic hash and epoch (publisher sequence restarts).
29+
# Offset 16 bytes.
30+
31+
uint64 transfer_id # For multi-frame reassembly and dedup. ACK specifies the acked tfer here.
32+
uint64 sender_uid # Origin identifier ensures invariance to the source IP address for reassembly.
33+
34+
# Integrity checking information.
35+
# Offset 32 bytes.
36+
37+
uint32 prefix_crc32c # crc32c(payload[0:(frame_payload_offset+payload_size)])
38+
uint32 header_crc32c # Covers all fields above. Same as the transfer payload CRC.
39+
40+
# End of header at 40 bytes. Payload follows.

libudpard/udpard.c

Lines changed: 393 additions & 701 deletions
Large diffs are not rendered by default.

libudpard/udpard.h

Lines changed: 48 additions & 139 deletions
Large diffs are not rendered by default.

tests/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ gen_test_single(test_e2e_random "src/test_e2e_random.cpp;${library_dir}/udpard.c
9898
gen_test_single(test_e2e_edge "src/test_e2e_edge.cpp;${library_dir}/udpard.c")
9999
gen_test_single(test_e2e_api "src/test_e2e_api.cpp;${library_dir}/udpard.c")
100100
gen_test_single(test_e2e_responses "src/test_e2e_responses.cpp;${library_dir}/udpard.c")
101-
gen_test_single(test_e2e_reliable_ordered "src/test_e2e_reliable_ordered.cpp;${library_dir}/udpard.c")
102101
gen_test_single(test_integration_sockets "src/test_integration_sockets.cpp;${library_dir}/udpard.c")
103102

104103
# Coverage targets. Usage:

tests/src/test_e2e_api.cpp

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ struct RxContext
3131
std::array<udpard_udpip_ep_t, UDPARD_IFACE_COUNT_MAX> sources{};
3232
uint64_t remote_uid = 0;
3333
size_t received = 0;
34-
size_t collisions = 0;
3534
};
3635

3736
// Refcount helpers keep captured datagrams alive.
@@ -115,19 +114,14 @@ void on_message(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpar
115114
ctx->received++;
116115
}
117116

118-
void on_collision(udpard_rx_t* const rx, udpard_rx_port_t* const /*port*/, const udpard_remote_t /*remote*/)
119-
{
120-
auto* ctx = static_cast<RxContext*>(rx->user);
121-
ctx->collisions++;
122-
}
123-
constexpr udpard_rx_port_vtable_t callbacks{ .on_message = &on_message, .on_collision = &on_collision };
117+
constexpr udpard_rx_port_vtable_t callbacks{ .on_message = &on_message };
124118

125119
// Ack port frees responses.
126120
void on_ack_response(udpard_rx_t*, udpard_rx_port_t* port, const udpard_rx_transfer_t tr)
127121
{
128122
udpard_fragment_free_all(tr.payload, udpard_make_deleter(port->memory.fragment));
129123
}
130-
constexpr udpard_rx_port_vtable_t ack_callbacks{ .on_message = &on_ack_response, .on_collision = &on_collision };
124+
constexpr udpard_rx_port_vtable_t ack_callbacks{ .on_message = &on_ack_response };
131125

132126
// Reliable delivery must survive data and ack loss.
133127
// Each node uses exactly one TX and one RX instance as per the library design.
@@ -163,6 +157,7 @@ void test_reliable_delivery_under_losses()
163157
res = instrumented_allocator_make_resource(&pub_tx_alloc_payload);
164158
}
165159
const udpard_rx_mem_resources_t pub_rx_mem{ .session = instrumented_allocator_make_resource(&pub_rx_alloc_session),
160+
.slot = instrumented_allocator_make_resource(&pub_rx_alloc_session),
166161
.fragment = instrumented_allocator_make_resource(&pub_rx_alloc_frag) };
167162

168163
udpard_tx_mem_resources_t sub_tx_mem{};
@@ -171,6 +166,7 @@ void test_reliable_delivery_under_losses()
171166
res = instrumented_allocator_make_resource(&sub_tx_alloc_payload);
172167
}
173168
const udpard_rx_mem_resources_t sub_rx_mem{ .session = instrumented_allocator_make_resource(&sub_rx_alloc_session),
169+
.slot = instrumented_allocator_make_resource(&sub_rx_alloc_session),
174170
.fragment = instrumented_allocator_make_resource(&sub_rx_alloc_frag) };
175171

176172
// Publisher node: single TX, single RX (linked to TX for ACK processing).
@@ -184,8 +180,7 @@ void test_reliable_delivery_under_losses()
184180
udpard_rx_t pub_rx{};
185181
udpard_rx_new(&pub_rx, &pub_tx);
186182
udpard_rx_port_t pub_p2p_port{};
187-
TEST_ASSERT_TRUE(
188-
udpard_rx_port_new(&pub_p2p_port, pub_uid, 16, udpard_rx_unordered, 0, pub_rx_mem, &ack_callbacks));
183+
TEST_ASSERT_TRUE(udpard_rx_port_new_p2p(&pub_p2p_port, 16, pub_rx_mem, &ack_callbacks));
189184

190185
// Subscriber node: single TX, single RX (linked to TX for sending ACKs).
191186
constexpr uint64_t sub_uid = 0xABCDEF0012345678ULL;
@@ -197,8 +192,7 @@ void test_reliable_delivery_under_losses()
197192
udpard_rx_t sub_rx{};
198193
udpard_rx_new(&sub_rx, &sub_tx);
199194
udpard_rx_port_t sub_port{};
200-
const uint64_t topic_hash = 0x0123456789ABCDEFULL;
201-
TEST_ASSERT_TRUE(udpard_rx_port_new(&sub_port, topic_hash, 6000, udpard_rx_unordered, 0, sub_rx_mem, &callbacks));
195+
TEST_ASSERT_TRUE(udpard_rx_port_new(&sub_port, 6000, sub_rx_mem, &callbacks));
202196

203197
// Endpoints.
204198
const std::array<udpard_udpip_ep_t, UDPARD_IFACE_COUNT_MAX> publisher_sources{
@@ -235,7 +229,6 @@ void test_reliable_delivery_under_losses()
235229
deadline,
236230
iface_bitmap_all,
237231
udpard_prio_fast,
238-
topic_hash,
239232
1U,
240233
payload_view,
241234
&record_feedback,
@@ -296,8 +289,6 @@ void test_reliable_delivery_under_losses()
296289
TEST_ASSERT_EQUAL_size_t(1, fb.count);
297290
TEST_ASSERT_EQUAL_UINT32(1, fb.acknowledgements);
298291
TEST_ASSERT_EQUAL_size_t(1, ctx.received);
299-
TEST_ASSERT_EQUAL_size_t(0, ctx.collisions);
300-
301292
// Cleanup.
302293
udpard_rx_port_free(&sub_rx, &sub_port);
303294
udpard_rx_port_free(&pub_rx, &pub_p2p_port);
@@ -350,7 +341,6 @@ void test_reliable_stats_and_failures()
350341
10,
351342
iface_bitmap_1,
352343
udpard_prio_fast,
353-
0xABCULL,
354344
5U,
355345
exp_payload,
356346
&record_feedback,
@@ -380,6 +370,7 @@ void test_reliable_stats_and_failures()
380370
instrumented_allocator_new(&src_alloc_transfer);
381371
instrumented_allocator_new(&src_alloc_payload);
382372
const udpard_rx_mem_resources_t rx_mem{ .session = instrumented_allocator_make_resource(&rx_alloc_session),
373+
.slot = instrumented_allocator_make_resource(&rx_alloc_session),
383374
.fragment = instrumented_allocator_make_resource(&rx_alloc_frag) };
384375
udpard_tx_mem_resources_t src_mem{};
385376
src_mem.transfer = instrumented_allocator_make_resource(&src_alloc_transfer);
@@ -399,7 +390,7 @@ void test_reliable_stats_and_failures()
399390
ctx.expected.assign({ 1U, 2U, 3U, 4U });
400391
udpard_rx_new(&rx, nullptr);
401392
rx.user = &ctx;
402-
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 0x12340000ULL, 64, udpard_rx_unordered, 0, rx_mem, &callbacks));
393+
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 64, rx_mem, &callbacks));
403394

404395
const udpard_bytes_scattered_t src_payload = make_scattered(ctx.expected.data(), ctx.expected.size());
405396
FeedbackState fb_ignore{};
@@ -408,7 +399,6 @@ void test_reliable_stats_and_failures()
408399
1000,
409400
iface_bitmap_1,
410401
udpard_prio_fast,
411-
port.topic_hash,
412402
7U,
413403
src_payload,
414404
&record_feedback,

0 commit comments

Comments
 (0)