Skip to content

Commit 13fd72c

Browse files
add udpard_tx_redirect() but without tests; I think there is a much better solution actually
1 parent 8f56aef commit 13fd72c

2 files changed

Lines changed: 63 additions & 25 deletions

File tree

libudpard/udpard.c

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,11 @@ bool udpard_is_valid_endpoint(const udpard_udpip_ep_t ep)
169169
static uint16_t valid_ep_bitmap(const udpard_udpip_ep_t remote_ep[UDPARD_IFACE_COUNT_MAX])
170170
{
171171
uint16_t bitmap = 0U;
172-
for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
173-
if (udpard_is_valid_endpoint(remote_ep[i])) {
174-
bitmap |= (1U << i);
172+
if (remote_ep != NULL) {
173+
for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
174+
if (udpard_is_valid_endpoint(remote_ep[i])) {
175+
bitmap |= (1U << i);
176+
}
175177
}
176178
}
177179
return bitmap;
@@ -1323,6 +1325,46 @@ void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now, const uint16
13231325
}
13241326
}
13251327

1328+
size_t udpard_tx_redirect(udpard_tx_t* const self,
1329+
const uint64_t topic_hash,
1330+
const udpard_udpip_ep_t remote_ep[UDPARD_IFACE_COUNT_MAX])
1331+
{
1332+
size_t out = 0;
1333+
if ((self != NULL) && (valid_ep_bitmap(remote_ep) != 0)) {
1334+
// Transfers are ordered lexicographically by (topic_hash, transfer_id), so we can find the first one
1335+
// with a lower_bound search and then iterate until the topic_hash changes.
1336+
const tx_transfer_key_t key = { .topic_hash = topic_hash, .transfer_id = 0 };
1337+
tx_transfer_t* tr = CAVL2_TO_OWNER(
1338+
cavl2_lower_bound(self->index_transfer, &key, &tx_cavl_compare_transfer), tx_transfer_t, index_transfer);
1339+
UDPARD_ASSERT((tr == NULL) || (tr->topic_hash >= topic_hash));
1340+
while ((tr != NULL) && (tr->topic_hash == topic_hash)) {
1341+
for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
1342+
// We don't want to enable transmission over a new interface that was not originally intended for
1343+
// this transfer. This matters with time synchronization messages, for example.
1344+
if (udpard_is_valid_endpoint(tr->destination[i]) && udpard_is_valid_endpoint(remote_ep[i])) {
1345+
tr->destination[i] = remote_ep[i];
1346+
}
1347+
}
1348+
out++;
1349+
tr = CAVL2_TO_OWNER(cavl2_next_greater(&tr->index_transfer), tx_transfer_t, index_transfer);
1350+
}
1351+
}
1352+
return out;
1353+
}
1354+
1355+
bool udpard_tx_cancel(udpard_tx_t* const self, const uint64_t topic_hash, const uint64_t transfer_id)
1356+
{
1357+
bool cancelled = false;
1358+
if (self != NULL) {
1359+
tx_transfer_t* const tr = tx_transfer_find(self, topic_hash, transfer_id);
1360+
if (tr != NULL) {
1361+
tx_transfer_retire(self, tr, false);
1362+
cancelled = true;
1363+
}
1364+
}
1365+
return cancelled;
1366+
}
1367+
13261368
uint16_t udpard_tx_pending_ifaces(const udpard_tx_t* const self)
13271369
{
13281370
uint16_t bitmap = 0;
@@ -1365,19 +1407,6 @@ void udpard_tx_refcount_dec(const udpard_bytes_t tx_payload_view)
13651407
}
13661408
}
13671409

1368-
bool udpard_tx_cancel(udpard_tx_t* const self, const uint64_t topic_hash, const uint64_t transfer_id)
1369-
{
1370-
bool cancelled = false;
1371-
if (self != NULL) {
1372-
tx_transfer_t* const tr = tx_transfer_find(self, topic_hash, transfer_id);
1373-
if (tr != NULL) {
1374-
tx_transfer_retire(self, tr, false);
1375-
cancelled = true;
1376-
}
1377-
}
1378-
return cancelled;
1379-
}
1380-
13811410
void udpard_tx_free(udpard_tx_t* const self)
13821411
{
13831412
if (self != NULL) {

libudpard/udpard.h

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -547,15 +547,14 @@ bool udpard_tx_push_p2p(udpard_tx_t* const self,
547547
/// The function may deallocate memory. The time complexity is logarithmic in the number of enqueued transfers.
548548
void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now, const uint16_t iface_bitmap);
549549

550-
/// Returns a bitmap of interfaces that have pending transmissions. This is useful for IO multiplexing loops.
551-
/// Zero indicates that there are no pending transmissions.
552-
/// Which interfaces are usable is defined by the remote endpoints provided when pushing transfers.
553-
uint16_t udpard_tx_pending_ifaces(const udpard_tx_t* const self);
554-
555-
/// When a datagram is ejected and the application opts to keep it, these functions must be used to manage the
556-
/// datagram buffer lifetime. The datagram will be freed once the reference count reaches zero.
557-
void udpard_tx_refcount_inc(const udpard_bytes_t tx_payload_view);
558-
void udpard_tx_refcount_dec(const udpard_bytes_t tx_payload_view);
550+
/// If there are enqueued transfers for the given topic hash, this function modifies their remote endpoints
551+
/// to the provided new endpoints. This is useful when the topic allocation consensus protocol finds a new
552+
/// topic->subject allocation while there are outstanding transfers enqueued for transmission.
553+
/// Returns the number of matched transfers.
554+
/// The complexity is logarithmic in the number of enqueued transfers and linear in the number of modified transfers.
555+
size_t udpard_tx_redirect(udpard_tx_t* const self,
556+
const uint64_t topic_hash,
557+
const udpard_udpip_ep_t remote_ep[UDPARD_IFACE_COUNT_MAX]);
559558

560559
/// Cancel a previously enqueued transfer.
561560
/// If provided, the feedback callback will be invoked with success==false.
@@ -566,6 +565,16 @@ void udpard_tx_refcount_dec(const udpard_bytes_t tx_payload_view);
566565
/// The function will free the memory associated with the transfer.
567566
bool udpard_tx_cancel(udpard_tx_t* const self, const uint64_t topic_hash, const uint64_t transfer_id);
568567

568+
/// Returns a bitmap of interfaces that have pending transmissions. This is useful for IO multiplexing loops.
569+
/// Zero indicates that there are no pending transmissions.
570+
/// Which interfaces are usable is defined by the remote endpoints provided when pushing transfers.
571+
uint16_t udpard_tx_pending_ifaces(const udpard_tx_t* const self);
572+
573+
/// When a datagram is ejected and the application opts to keep it, these functions must be used to manage the
574+
/// datagram buffer lifetime. The datagram will be freed once the reference count reaches zero.
575+
void udpard_tx_refcount_inc(const udpard_bytes_t tx_payload_view);
576+
void udpard_tx_refcount_dec(const udpard_bytes_t tx_payload_view);
577+
569578
/// Drops all enqueued items; afterward, the instance is safe to discard. Reliable transfer callbacks are still invoked.
570579
void udpard_tx_free(udpard_tx_t* const self);
571580

0 commit comments

Comments
 (0)