@@ -75,6 +75,8 @@ typedef unsigned char byte_t;
7575
/// Iterates `i` over the indices of all redundant interfaces: [0, CANARD_IFACE_COUNT).
#define FOREACH_IFACE(i) for (size_t i = 0; i < CANARD_IFACE_COUNT; i++)
7777
/// Initializer for a detached AVL tree node (not inserted in any tree): one pointer member,
/// a two-pointer array, and an integer member, all zeroed.
/// NOTE(review): assumes the cavl2 node layout {parent, children[2], balance} -- confirm against
/// the canard_tree_t definition.
/// The whole expansion is parenthesized so the compound literal binds correctly in any expression
/// context (e.g. `x = TREE_NULL` or as a function argument), per standard macro hygiene.
#define TREE_NULL ((canard_tree_t){ NULL, { NULL, NULL }, 0 })
7880typedef enum transfer_kind_t
7981{
8082 transfer_kind_message = 0 ,
@@ -510,18 +512,23 @@ void canard_refcount_dec(canard_t* const self, const canard_bytes_t obj)
510512 }
511513}
512514
513- /// We don't know v1.1 subject-ID bits until transmission time so in the CAN ID they are not defined.
515+ /// We don't know v1.1 subject-ID bits until transmission time so in the CAN ID they are not defined (assume arbitrary) .
514516/// The local node-ID bits are also set at the transmission time in case it is changed to avoid collisions on the bus.
515517///
516518/// The fields are weakly arranged to reduce padding, but logical grouping is prioritized.
517519/// If o1heap is used and the platform is 32-bit (x4 pointer size overhead per block), a struct up to 112 bytes fits
518520/// into a 128-byte block; reducing the footprint does not bring any benefit until <=48 bytes (needs a 64-byte block).
519521struct canard_txfer_t
520522{
521- /// Memory-efficient but not very fast indexes optimized for a low number of pending transfers .
523+ // Lookup and ordering indexes.
522524 canard_listed_t list_pending [CANARD_IFACE_COUNT ];
523525 canard_listed_t list_delayed ;
524- canard_listed_t list_oldest ;
526+ canard_listed_t list_agewise ;
527+ /// Contains ALL pending reliable transfers, ordered by (topic hash, transfer-ID).
528+ /// Used for:
529+ /// - Checking for duplicates when a new reliable transfer is created.
530+ /// - Finding the pending reliable transfers waiting for ack when an ack frame is received.
531+ canard_tree_t index_reliable ;
525532
526533 /// Mutable transmission state. All other fields, except for the index handles, are immutable.
527534 ///
@@ -531,7 +538,7 @@ struct canard_txfer_t
531538 /// in which case the old head is dereferenced and the head points to the next frame to transmit.
532539 tx_frame_t * head [CANARD_IFACE_COUNT ];
533540 tx_frame_t * cursor [CANARD_IFACE_COUNT ];
534- canard_us_t delayed_until ; ///< When the transfer becomes eligible for retransmission. HEAT_DEATH if backlogged .
541+ canard_us_t delayed_until ; ///< HEAT_DEATH if backlogged; BIG_BANG if not delayed (not in the delayed index) .
535542 byte_t epoch ; ///< No overflow due to exponential backoff; e.g. 1us @ epoch=48 => 9 years.
536543
537544 /// Constant transfer properties supplied by the client.
@@ -545,7 +552,7 @@ struct canard_txfer_t
545552 byte_t remote_transfer_id : 5 ;
546553 byte_t fd : 1 ;
547554 byte_t reliable : 1 ;
548- uint32_t can_id ; ///< For v1.1 messages, the subject-ID bits are zeroed . Node-ID is always zeroed.
555+ uint32_t can_id ; ///< For v1.1 messages, the subject-ID bits are UNDEFINED . Node-ID is always zeroed.
549556 uint64_t topic_hash ;
550557 uint64_t remote_topic_hash ;
551558 canard_us_t deadline ;
@@ -576,14 +583,15 @@ static canard_txfer_t* txfer_new(const canard_mem_t mem,
576583 FOREACH_IFACE (i ) {
577584 tr -> list_pending [i ] = LIST_NULL ;
578585 }
579- tr -> list_delayed = LIST_NULL ;
580- tr -> list_oldest = LIST_NULL ;
586+ tr -> list_delayed = LIST_NULL ;
587+ tr -> list_agewise = LIST_NULL ;
588+ tr -> index_reliable = TREE_NULL ;
581589 //
582590 FOREACH_IFACE (i ) {
583591 tr -> head [i ] = tr -> cursor [i ] = NULL ;
584592 }
585593 tr -> epoch = 0 ;
586- tr -> delayed_until = BIG_BANG ;
594+ tr -> delayed_until = BIG_BANG ; ///< Not in the delayed index yet.
587595 //
588596 tr -> iface_bitmap = iface_bitmap ;
589597 tr -> transfer_id = transfer_id & CANARD_TRANSFER_ID_MAX ;
@@ -652,6 +660,7 @@ static void tx_arm_delay_if(canard_t* const self, canard_txfer_t* const tr)
652660 list_delayed ,
653661 anchor ,
654662 (anchor -> delayed_until > tr -> delayed_until ));
663+ CANARD_ASSERT ((tr -> delayed_until != BIG_BANG ) && (tr -> delayed_until != HEAT_DEATH ));
655664 enlist_before (& self -> tx .delayed [shard ], anchor ? & anchor -> list_delayed : NULL , & tr -> list_delayed );
656665 } else {
657666 delist (& self -> tx .delayed [shard ], & tr -> list_delayed );
@@ -686,20 +695,24 @@ static void txfer_retire(canard_t* const self, canard_txfer_t* const tr, const b
686695 const bool backlogged = txfer_is_backlogged (tr );
687696
688697 // Delist everywhere. Remember that delisting a non-listed entity is a safe no-op.
689- if (self -> tx .iterator [ reliable ] == tr ) {
690- self -> tx .iterator [ reliable ] = LIST_NEXT (tr , canard_txfer_t , list_oldest ); // May be NULL, is OK.
698+ if (self -> tx .iter == tr ) {
699+ self -> tx .iter = LIST_NEXT (tr , canard_txfer_t , list_agewise ); // May be NULL, is OK.
691700 }
692701 FOREACH_IFACE (i ) {
693702 if ((tr -> iface_bitmap & (1U << i )) != 0 ) {
694- CANARD_ASSERT ((self -> tx .pending_shards_bitmap [i ] & (1U << shard )) != 0U );
703+ CANARD_ASSERT ((( self -> tx .pending_shards_bitmap [i ] & (1U << shard )) != 0U ) || backlogged );
695704 delist (& self -> tx .pending [shard ][i ], & tr -> list_pending [i ]);
696705 if (self -> tx .pending [shard ][i ].head == NULL ) {
697706 self -> tx .pending_shards_bitmap [i ] &= ~(1U << shard );
698707 }
699708 }
700709 }
701- delist (& self -> tx .oldest [reliable ], & tr -> list_oldest );
702710 delist (& self -> tx .delayed [shard ], & tr -> list_delayed );
711+ delist (& self -> tx .agewise , & tr -> list_agewise );
712+ if (reliable ) {
713+ CANARD_ASSERT (cavl2_is_inserted (self -> tx .reliable , & tr -> index_reliable ));
714+ cavl2_remove (& self -> tx .reliable , & tr -> index_reliable );
715+ }
703716
704717 // If the transfer was not backlogged, it may have been blocking other transfers, but only if it is reliable.
705718 // Best-effort transfers do not block anything because in the presence of a stalled interface they may take a
@@ -733,7 +746,7 @@ static void txfer_retire(canard_t* const self, canard_txfer_t* const tr, const b
733746 mem_free (self -> mem .tx_transfer , sizeof (canard_txfer_t ), tr );
734747
735748 // Finally, when the internal state is updated and consistent, invoke the feedback callback if any.
736- if (tr -> reliable ) {
749+ if (reliable ) {
737750 self -> vtable -> feedback (self , user_context , topic_hash , transfer_id , success ? 1 : 0 );
738751 }
739752}
@@ -894,20 +907,32 @@ static tx_frame_t* tx_spool_v0(canard_t* const self,
894907 return head ;
895908}
896909
910+ /// To check all transfers under a specific topic hash, use lower bound lookup with transfer_id=0 and
911+ /// then iterate until the topic hash changes.
912+ /// This is O(log n) because the number of distinct transfer-ID in Cyphal/CAN is bounded at a low value.
913+ typedef struct
914+ {
915+ uint64_t topic_hash ;
916+ byte_t transfer_id ;
917+ } txfer_key_t ;
918+
919+ static int32_t tx_cavl_compare_reliable (const void * const user , const canard_tree_t * const node )
920+ {
921+ const txfer_key_t * const key = (const txfer_key_t * )user ;
922+ const canard_txfer_t * const tr = CAVL2_TO_OWNER (node , canard_txfer_t , index_reliable ); // clang-format off
923+ if (key -> topic_hash < tr -> topic_hash ) { return -1 ; }
924+ if (key -> topic_hash > tr -> topic_hash ) { return +1 ; } // clang-format on
925+ return (int32_t )key -> transfer_id - (int32_t )tr -> transfer_id ;
926+ }
927+
897928/// When the queue is exhausted, finds a transfer to sacrifice using simple heuristics and returns it.
898929/// Will return NULL if there are no transfers worth sacrificing (no queue space can be reclaimed).
899930/// We cannot simply stop accepting new transfers when the queue is full, because it may be caused by a single
900931/// stalled interface holding back progress for all transfers.
901932/// The heuristics are subject to review and improvement.
902933static canard_txfer_t * tx_sacrifice (const canard_t * const self )
903934{
904- // A best-effort transfer can be stuck for a long time if there is a stalled redundant interface.
905- // A single stalled interface does not affect reliable transfers because the first ack will free them.
906- canard_txfer_t * tr = LIST_HEAD (self -> tx .oldest [0 ], canard_txfer_t , list_oldest ); // best-effort first
907- if (tr == NULL ) {
908- tr = LIST_HEAD (self -> tx .oldest [1 ], canard_txfer_t , list_oldest );
909- }
910- return tr ;
935+ return LIST_HEAD (self -> tx .agewise , canard_txfer_t , list_agewise );
911936}
912937
913938/// True on success, false if not possible to reclaim enough space.
@@ -963,8 +988,6 @@ static size_t tx_predict_frame_count(const size_t transfer_size, const size_t mt
963988}
964989
965990/// Enqueues a transfer for transmission.
966- /// For v1.1 messages, the subject-ID resolution is postponed until transmission time, and the corresponding bits
967- /// of the CAN ID are zeroed here.
968991static bool tx_push (canard_t * const self ,
969992 canard_txfer_t * const tr ,
970993 const canard_bytes_chain_t payload ,
@@ -978,14 +1001,35 @@ static bool tx_push(canard_t* const self,
9781001 // if they have the same arbitration priority as the new transfer, they should get a chance to go first.
9791002 tx_promote_delayed (self , now );
9801003
1004+ // Ensure there are no duplicate reliable transfers with the same local (topic hash, transfer-ID).
1005+ // This does not affect outgoing acks since they are not reliable. This may affect the application if it is
1006+ // trying to push too many reliable transfers on the same topic too quickly.
1007+ // Insert at the same time to avoid double tree walk.
1008+ if (tr -> reliable ) {
1009+ const txfer_key_t key = { .topic_hash = tr -> topic_hash , .transfer_id = tr -> transfer_id };
1010+ canard_txfer_t * const rel = CAVL2_TO_OWNER (
1011+ cavl2_find_or_insert (& self -> tx .reliable , & key , tx_cavl_compare_reliable , tr , cavl2_trivial_factory ),
1012+ canard_txfer_t ,
1013+ index_reliable );
1014+ if (rel != tr ) { // Duplicate found.
1015+ mem_free (self -> mem .tx_transfer , sizeof (canard_txfer_t ), tr );
1016+ self -> err .tx_duplicate ++ ;
1017+ return false;
1018+ }
1019+ }
1020+
9811021 // Ensure the queue has enough space. v0 transfers always use Classic CAN regardless of tr->fd.
9821022 const size_t mtu = tr -> fd ? CANARD_MTU_CAN_FD : CANARD_MTU_CAN_CLASSIC ;
9831023 const size_t size = bytes_chain_size (payload ); // TODO: pass the precomputed size into spool functions
9841024 const size_t n_frames = tx_predict_frame_count (size , mtu );
9851025 CANARD_ASSERT (n_frames > 0 );
9861026 if (!tx_ensure_queue_space (self , n_frames )) {
987- mem_free (self -> mem .tx_transfer , sizeof (canard_txfer_t ), tr );
9881027 self -> err .tx_capacity ++ ;
1028+ if (tr -> reliable ) {
1029+ CANARD_ASSERT (cavl2_is_inserted (self -> tx .reliable , & tr -> index_reliable ));
1030+ cavl2_remove (& self -> tx .reliable , & tr -> index_reliable );
1031+ }
1032+ mem_free (self -> mem .tx_transfer , sizeof (canard_txfer_t ), tr );
9891033 return false;
9901034 }
9911035
@@ -994,8 +1038,12 @@ static bool tx_push(canard_t* const self,
9941038 tx_frame_t * const spool = transfer_kind_is_v0 (tr -> kind ) ? tx_spool_v0 (self , crc_seed , tr -> transfer_id , payload )
9951039 : tx_spool (self , crc_seed , mtu , tr -> transfer_id , payload );
9961040 if (spool == NULL ) {
997- mem_free (self -> mem .tx_transfer , sizeof (canard_txfer_t ), tr );
9981041 self -> err .oom ++ ;
1042+ if (tr -> reliable ) {
1043+ CANARD_ASSERT (cavl2_is_inserted (self -> tx .reliable , & tr -> index_reliable ));
1044+ cavl2_remove (& self -> tx .reliable , & tr -> index_reliable );
1045+ }
1046+ mem_free (self -> mem .tx_transfer , sizeof (canard_txfer_t ), tr );
9991047 return false;
10001048 }
10011049 CANARD_ASSERT ((self -> tx .queue_size - queue_size_before ) == n_frames );
@@ -1011,10 +1059,18 @@ static bool tx_push(canard_t* const self,
10111059 }
10121060 }
10131061
1014- // Insert into the oldest list.
1015- enlist_tail (& self -> tx .oldest [tr -> reliable ], & tr -> list_oldest );
1062+ // Attach the spool.
1063+ FOREACH_IFACE (i ) {
1064+ if ((tr -> iface_bitmap & (1U << i )) != 0 ) {
1065+ tr -> head [i ] = spool ;
1066+ tr -> cursor [i ] = spool ;
1067+ }
1068+ }
1069+
1070+ // Register the transfer.
1071+ enlist_tail (& self -> tx .agewise , & tr -> list_agewise );
10161072
1017- // We need to ensure that transfers emitted at the same priority level are send strictly in push order.
1073+ // We need to ensure that same-topic transfers at the same priority level are sent strictly in the push order.
10181074 // For best-effort transfers this is trivial as all we need to do is to enqueue them as-is.
10191075 // Reliable transfers, however, break this order because they may be retransmitted multiple times, and if any
10201076 // other transfer (any QoS) is enqueued while at least one reliable transfer is pending, reordering may result,
@@ -1030,25 +1086,22 @@ static bool tx_push(canard_t* const self,
10301086 // completed the last transmission attempt, no further attempts will be made, and thus no reordering can occur.
10311087 const byte_t shard = txfer_shard (tr );
10321088 bool has_preceding_reliable = false;
1033- FOREACH_IFACE (i ) {
1034- LIST_FIND_FIRST (self -> tx .pending [shard ][i ],
1035- canard_txfer_t ,
1036- list_pending [i ], // check if this is standard-compliant
1037- match ,
1038- (match -> topic_hash == tr -> topic_hash ) && match -> reliable );
1039- if (match != NULL ) {
1040- has_preceding_reliable = true;
1041- break ;
1089+ {
1090+ const txfer_key_t key = { .topic_hash = tr -> topic_hash , .transfer_id = 0 };
1091+ canard_txfer_t * rel = CAVL2_TO_OWNER (
1092+ cavl2_lower_bound (self -> tx .reliable , & key , tx_cavl_compare_reliable ), canard_txfer_t , index_reliable );
1093+ while ((rel != NULL ) && (rel -> topic_hash == tr -> topic_hash )) {
1094+ if ((rel != tr ) && (txfer_shard (rel ) == shard )) {
1095+ const bool delayed = rel -> delayed_until > BIG_BANG ; // Maybe backlogged also, but it's all the same.
1096+ CANARD_ASSERT ((!delayed ) || is_listed (& self -> tx .delayed [shard ], & rel -> list_delayed ));
1097+ if (delayed ) {
1098+ has_preceding_reliable = true;
1099+ break ;
1100+ }
1101+ }
1102+ rel = CAVL2_TO_OWNER (cavl2_next_greater (& rel -> index_reliable ), canard_txfer_t , index_reliable );
10421103 }
10431104 }
1044- if (!has_preceding_reliable ) {
1045- LIST_FIND_FIRST (self -> tx .delayed [shard ],
1046- canard_txfer_t ,
1047- list_delayed ,
1048- match ,
1049- (match -> topic_hash == tr -> topic_hash ) && match -> reliable );
1050- has_preceding_reliable = (match != NULL );
1051- }
10521105
10531106 // Schedule for transmission or backlog depending on the findings above.
10541107 if (has_preceding_reliable ) { // into the backlog you go, buddy
@@ -1057,8 +1110,6 @@ static bool tx_push(canard_t* const self,
10571110 } else {
10581111 FOREACH_IFACE (i ) {
10591112 if ((tr -> iface_bitmap & (1U << i )) != 0 ) {
1060- tr -> head [i ] = spool ;
1061- tr -> cursor [i ] = spool ;
10621113 enlist_tail (& self -> tx .pending [shard ][i ], & tr -> list_pending [i ]);
10631114 self -> tx .pending_shards_bitmap [i ] |= (1U << shard );
10641115 }
@@ -1069,21 +1120,30 @@ static bool tx_push(canard_t* const self,
10691120}
10701121
10711122/// Handle an ACK received from a remote node.
1072- static void tx_receive_ack (canard_t * const self , const uint64_t topic_hash , const byte_t transfer_id )
1123+ /// The topic and transfer-ID are referring to the LOCAL values, not REMOTE, because the remote doesn't care about the
1124+ /// payload of the transfer and doesn't see the P2P header.
1125+ /// Note an important design decision: acks are identified by topic hash, not subject-ID, because the subject-ID may be
1126+ /// changed by the consensus protocol at any moment while the transfer is pending.
1127+ /// We match only 49 bits out of 64 but this is enough to achieve a negligible probability of collision.
1128+ static void tx_receive_ack (canard_t * const self , const uint64_t topic_hash_lower_bound , const byte_t transfer_id )
10731129{
1074- // Scan from oldest because they are the most likely to match.
1075- // We are not expected to hold a large number of pending reliable transfers, so a linear search is acceptable --
1076- // up to a couple dozen transfers should be at least on par with a BST lookup.
1077- // If this ever becomes a problem, we can add a separate index for pending reliable transfers keyed by topic hash.
1078- LIST_FIND_FIRST (self -> tx .oldest [1 ], // Search reliable only; best-effort transfers are not in this list.
1079- canard_txfer_t ,
1080- list_oldest ,
1081- tr , // Backlogged transfers may possibly have conflicting transfer-ID.
1082- (tr -> topic_hash == topic_hash ) && (tr -> transfer_id == transfer_id ) && !txfer_is_backlogged (tr ));
1083- if (tr != NULL ) {
1084- CANARD_ASSERT (tr -> reliable && (tr -> topic_hash == topic_hash ) && (tr -> transfer_id == transfer_id ));
1085- txfer_retire (self , tr , true);
1130+ const txfer_key_t key = { .topic_hash = topic_hash_lower_bound & CANARD_P2P_TOPIC_HASH_LOWER_BOUND_MASK ,
1131+ .transfer_id = transfer_id };
1132+ canard_txfer_t * tr = CAVL2_TO_OWNER (cavl2_lower_bound (self -> tx .reliable , & key , tx_cavl_compare_reliable ), // ------
1133+ canard_txfer_t ,
1134+ index_reliable );
1135+ if ((tr == NULL ) || ((tr -> topic_hash & CANARD_P2P_TOPIC_HASH_LOWER_BOUND_MASK ) != topic_hash_lower_bound )) {
1136+ return ;
1137+ }
1138+ // Linear scan to find the matching transfer ID. This is bounded by the max transfer-ID count (32).
1139+ while ((tr != NULL ) && ((tr -> topic_hash & CANARD_P2P_TOPIC_HASH_LOWER_BOUND_MASK ) == topic_hash_lower_bound )) {
1140+ if (tr -> transfer_id == transfer_id ) { // Found!
1141+ txfer_retire (self , tr , true);
1142+ break ;
1143+ }
1144+ tr = CAVL2_TO_OWNER (cavl2_next_greater (& tr -> index_reliable ), canard_txfer_t , index_reliable );
10861145 }
1146+ // If not found, is fine -- probably a duplicate ACK or a remote error.
10871147}
10881148
10891149bool canard_publish (canard_t * const self ,
@@ -1108,6 +1168,12 @@ bool canard_publish(canard_t* const self,
11081168 can_id |= (3UL << 21U ) | (uint32_t )(topic_hash << 8U ); // set reserved bits 21 and 22
11091169 } else {
11101170 can_id |= (1UL << 7U ); // reserved bit 7 indicates v1.1 message; subject-ID will be set later
1171+ // We don't know the subject-ID yet, so we could leave the corresponding bits zeroed, but that would cause
1172+ // poor sharding if the shard bit count exceeds 4. The most significant bit of the subject-ID maps to the
1173+ // anonymous bit of v1.0, which is nearly always zero, so we force it to 1 to improve sharding.
1174+ // The remaining bits are taken from the topic hash to further improve sharding.
1175+ // All of these will be rewritten with the proper subject-ID at transmission time.
1176+ can_id |= (1UL << 24U ) | (uint32_t )((topic_hash & 0xFFFFUL ) << 8U );
11111177 }
11121178
11131179 // Compose the message header, unless v1.0 -- those have no header. See docs for CANARD_HEADER_MESSAGE_BYTES.
0 commit comments