Skip to content

Commit 61c62f1

Browse files
udpard_tx_cancel_all
1 parent f1e02b1 commit 61c62f1

3 files changed

Lines changed: 160 additions & 0 deletions

File tree

libudpard/udpard.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,26 @@ bool udpard_tx_cancel(udpard_tx_t* const self, const uint64_t topic_hash, const
13671367
return cancelled;
13681368
}
13691369

1370+
size_t udpard_tx_cancel_all(udpard_tx_t* const self, const uint64_t topic_hash)
1371+
{
1372+
size_t count = 0;
1373+
if (self != NULL) {
1374+
// Find the first transfer with matching topic_hash using transfer_id=0 as lower bound.
1375+
const tx_transfer_key_t key = { .topic_hash = topic_hash, .transfer_id = 0 };
1376+
tx_transfer_t* tr = CAVL2_TO_OWNER(
1377+
cavl2_lower_bound(self->index_transfer, &key, &tx_cavl_compare_transfer), tx_transfer_t, index_transfer);
1378+
// Iterate through all transfers with the same topic_hash.
1379+
while ((tr != NULL) && (tr->topic_hash == topic_hash)) {
1380+
tx_transfer_t* const next =
1381+
CAVL2_TO_OWNER(cavl2_next_greater(&tr->index_transfer), tx_transfer_t, index_transfer);
1382+
tx_transfer_retire(self, tr, false);
1383+
count++;
1384+
tr = next;
1385+
}
1386+
}
1387+
return count;
1388+
}
1389+
13701390
uint16_t udpard_tx_pending_ifaces(const udpard_tx_t* const self)
13711391
{
13721392
uint16_t bitmap = 0;

libudpard/udpard.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,11 @@ void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now, const uint16
572572
/// The function will free the memory associated with the transfer.
573573
bool udpard_tx_cancel(udpard_tx_t* const self, const uint64_t topic_hash, const uint64_t transfer_id);
574574

575+
/// Like udpard_tx_cancel(), but cancels all transfers matching the given topic hash.
576+
/// Returns the number of matched transfers.
577+
/// This is important to invoke when destroying a topic to ensure no dangling callbacks remain.
578+
size_t udpard_tx_cancel_all(udpard_tx_t* const self, const uint64_t topic_hash);
579+
575580
/// Returns a bitmap of interfaces that have pending transmissions. This is useful for IO multiplexing loops.
576581
/// Zero indicates that there are no pending transmissions.
577582
/// Which interfaces are usable is defined by the remote endpoints provided when pushing transfers.

tests/src/test_intrusive_tx.c

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,140 @@ static void test_tx_cancel(void)
755755
instrumented_allocator_reset(&alloc);
756756
}
757757

758+
// Cancels all transfers matching a topic hash.
759+
static void test_tx_cancel_all(void)
760+
{
761+
// NULL self returns zero.
762+
TEST_ASSERT_EQUAL_size_t(0, udpard_tx_cancel_all(NULL, 0));
763+
764+
instrumented_allocator_t alloc = { 0 };
765+
instrumented_allocator_new(&alloc);
766+
udpard_tx_mem_resources_t mem = { .transfer = instrumented_allocator_make_resource(&alloc) };
767+
for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
768+
mem.payload[i] = instrumented_allocator_make_resource(&alloc);
769+
}
770+
771+
udpard_tx_t tx = { 0 };
772+
feedback_state_t fstate = { 0 };
773+
eject_state_t eject = { .count = 0, .allow = false }; // Block ejection to retain frames.
774+
const uint16_t iface_bitmap_1 = (1U << 0U);
775+
udpard_tx_vtable_t vt = { .eject_subject = eject_subject_with_flag, .eject_p2p = eject_p2p_with_flag };
776+
TEST_ASSERT_TRUE(udpard_tx_new(&tx, 40U, 1U, 16U, mem, &vt));
777+
tx.user = &eject;
778+
779+
// Cancel with no matching transfers returns zero.
780+
TEST_ASSERT_EQUAL_size_t(0, udpard_tx_cancel_all(&tx, 999));
781+
782+
// Push multiple transfers with different topic hashes.
783+
// Topic 100: transfers 1, 2, 3 (reliable)
784+
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
785+
0,
786+
1000,
787+
iface_bitmap_1,
788+
udpard_prio_fast,
789+
100,
790+
1,
791+
make_scattered(NULL, 0),
792+
record_feedback,
793+
make_user_context(&fstate)));
794+
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
795+
0,
796+
1000,
797+
iface_bitmap_1,
798+
udpard_prio_fast,
799+
100,
800+
2,
801+
make_scattered(NULL, 0),
802+
record_feedback,
803+
make_user_context(&fstate)));
804+
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
805+
0,
806+
1000,
807+
iface_bitmap_1,
808+
udpard_prio_fast,
809+
100,
810+
3,
811+
make_scattered(NULL, 0),
812+
record_feedback,
813+
make_user_context(&fstate)));
814+
// Topic 200: transfers 1, 2 (best-effort)
815+
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
816+
0,
817+
1000,
818+
iface_bitmap_1,
819+
udpard_prio_nominal,
820+
200,
821+
1,
822+
make_scattered(NULL, 0),
823+
NULL,
824+
UDPARD_USER_CONTEXT_NULL));
825+
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
826+
0,
827+
1000,
828+
iface_bitmap_1,
829+
udpard_prio_nominal,
830+
200,
831+
2,
832+
make_scattered(NULL, 0),
833+
NULL,
834+
UDPARD_USER_CONTEXT_NULL));
835+
// Topic 300: transfer 1 (reliable)
836+
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
837+
0,
838+
1000,
839+
iface_bitmap_1,
840+
udpard_prio_low,
841+
300,
842+
1,
843+
make_scattered(NULL, 0),
844+
record_feedback,
845+
make_user_context(&fstate)));
846+
847+
TEST_ASSERT_EQUAL_size_t(6, tx.enqueued_frames_count);
848+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 100, 1));
849+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 100, 2));
850+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 100, 3));
851+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 200, 1));
852+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 200, 2));
853+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 300, 1));
854+
855+
// Cancel all topic 100 transfers; feedback invoked for each reliable transfer.
856+
fstate.count = 0;
857+
TEST_ASSERT_EQUAL_size_t(3, udpard_tx_cancel_all(&tx, 100));
858+
TEST_ASSERT_EQUAL_size_t(3, fstate.count);
859+
TEST_ASSERT_EQUAL_UINT32(0, fstate.last.acknowledgements);
860+
TEST_ASSERT_NULL(tx_transfer_find(&tx, 100, 1));
861+
TEST_ASSERT_NULL(tx_transfer_find(&tx, 100, 2));
862+
TEST_ASSERT_NULL(tx_transfer_find(&tx, 100, 3));
863+
// Other topics remain.
864+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 200, 1));
865+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 200, 2));
866+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 300, 1));
867+
TEST_ASSERT_EQUAL_size_t(3, tx.enqueued_frames_count);
868+
869+
// Cancel topic 200 (best-effort, no feedback).
870+
fstate.count = 0;
871+
TEST_ASSERT_EQUAL_size_t(2, udpard_tx_cancel_all(&tx, 200));
872+
TEST_ASSERT_EQUAL_size_t(0, fstate.count);
873+
TEST_ASSERT_NULL(tx_transfer_find(&tx, 200, 1));
874+
TEST_ASSERT_NULL(tx_transfer_find(&tx, 200, 2));
875+
TEST_ASSERT_NOT_NULL(tx_transfer_find(&tx, 300, 1));
876+
TEST_ASSERT_EQUAL_size_t(1, tx.enqueued_frames_count);
877+
878+
// Cancel already-cancelled topic returns zero.
879+
TEST_ASSERT_EQUAL_size_t(0, udpard_tx_cancel_all(&tx, 100));
880+
881+
// Cancel last remaining topic.
882+
fstate.count = 0;
883+
TEST_ASSERT_EQUAL_size_t(1, udpard_tx_cancel_all(&tx, 300));
884+
TEST_ASSERT_EQUAL_size_t(1, fstate.count);
885+
TEST_ASSERT_NULL(tx_transfer_find(&tx, 300, 1));
886+
TEST_ASSERT_EQUAL_size_t(0, tx.enqueued_frames_count);
887+
888+
udpard_tx_free(&tx);
889+
instrumented_allocator_reset(&alloc);
890+
}
891+
758892
static void test_tx_spool_deduplication(void)
759893
{
760894
instrumented_allocator_t alloc_a = { 0 };
@@ -904,6 +1038,7 @@ int main(void)
9041038
RUN_TEST(test_tx_stage_if_via_tx_push);
9051039
RUN_TEST(test_tx_stage_if_short_deadline);
9061040
RUN_TEST(test_tx_cancel);
1041+
RUN_TEST(test_tx_cancel_all);
9071042
RUN_TEST(test_tx_spool_deduplication);
9081043
RUN_TEST(test_tx_ack_and_scheduler);
9091044
return UNITY_END();

0 commit comments

Comments
 (0)