Skip to content

Commit f4a875f

Browse files
do not eject from push functions
1 parent 6b517d8 commit f4a875f

2 files changed

Lines changed: 45 additions & 38 deletions

File tree

libudpard/udpard.c

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,41 @@ static void tx_stage_if(udpard_tx_t* const tx, tx_transfer_t* const tr)
863863
}
864864
}
865865

866+
static void tx_purge_expired_transfers(udpard_tx_t* const self, const udpard_us_t now)
867+
{
868+
while (true) { // we can use next_greater instead of doing min search every time
869+
tx_transfer_t* const tr = CAVL2_TO_OWNER(cavl2_min(self->index_deadline), tx_transfer_t, index_deadline);
870+
if ((tr != NULL) && (now > tr->deadline)) {
871+
tx_transfer_retire(self, tr, false);
872+
self->errors_expiration++;
873+
} else {
874+
break;
875+
}
876+
}
877+
}
878+
879+
static void tx_promote_staged_transfers(udpard_tx_t* const self, const udpard_us_t now)
880+
{
881+
while (true) { // we can use next_greater instead of doing min search every time
882+
tx_transfer_t* const tr = CAVL2_TO_OWNER(cavl2_min(self->index_staged), tx_transfer_t, index_staged);
883+
if ((tr != NULL) && (now >= tr->staged_until)) {
884+
// Reinsert into the staged index at the new position, when the next attempt is due (if any).
885+
cavl2_remove(&self->index_staged, &tr->index_staged);
886+
tx_stage_if(self, tr);
887+
// Enqueue for transmission unless it's been there since the last attempt (stalled interface?)
888+
for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
889+
if (((tr->iface_bitmap & (1U << i)) != 0) && !is_listed(&self->queue[i][tr->priority], &tr->queue[i])) {
890+
UDPARD_ASSERT(tr->head[i] != NULL); // cannot stage without payload, doesn't make sense
891+
UDPARD_ASSERT(tr->cursor[i] == tr->head[i]); // must have been rewound after last attempt
892+
enlist_head(&self->queue[i][tr->priority], &tr->queue[i]);
893+
}
894+
}
895+
} else {
896+
break;
897+
}
898+
}
899+
}
900+
866901
/// A transfer can use the same fragments between two interfaces if
867902
/// (both have the same MTU OR the transfer fits in both MTU) AND both use the same allocator.
868903
/// Either they will share the same spool, or there is only a single frame so the MTU difference does not matter.
@@ -918,6 +953,13 @@ static bool tx_push(udpard_tx_t* const tx,
918953
UDPARD_ASSERT((iface_bitmap & UDPARD_IFACE_BITMAP_ALL) != 0);
919954
UDPARD_ASSERT((iface_bitmap & UDPARD_IFACE_BITMAP_ALL) == iface_bitmap);
920955

956+
// Purge expired transfers before accepting a new one to make room in the queue.
957+
tx_purge_expired_transfers(tx, now);
958+
959+
// Promote staged transfers that are now eligible for retransmission to ensure fairness:
960+
// if they have the same priority as the new transfer, they should get a chance to go first.
961+
tx_promote_staged_transfers(tx, now);
962+
921963
// Construct the empty transfer object, without the frames for now. The frame spools will be constructed next.
922964
tx_transfer_t* const tr = mem_alloc(tx->memory.transfer, sizeof(tx_transfer_t));
923965
if (tr == NULL) {
@@ -1165,7 +1207,6 @@ bool udpard_tx_push(udpard_tx_t* const self,
11651207
((payload.bytes.data != NULL) || (payload.bytes.size == 0U)) &&
11661208
(tx_transfer_find(self, topic_hash, transfer_id) == NULL);
11671209
if (ok) {
1168-
udpard_tx_poll(self, now, UDPARD_IFACE_BITMAP_ALL);
11691210
const meta_t meta = {
11701211
.priority = priority,
11711212
.flag_ack = feedback != NULL,
@@ -1205,7 +1246,6 @@ bool udpard_tx_push_p2p(udpard_tx_t* const self,
12051246
bool ok = (self != NULL) && (deadline >= now) && (now >= 0) && (self->local_uid != 0) && (iface_bitmap != 0) &&
12061247
(priority < UDPARD_PRIORITY_COUNT) && ((payload.bytes.data != NULL) || (payload.bytes.size == 0U));
12071248
if (ok) {
1208-
udpard_tx_poll(self, now, UDPARD_IFACE_BITMAP_ALL);
12091249
// Serialize the P2P header and prepend it to the payload.
12101250
byte_t header[UDPARD_P2P_HEADER_BYTES];
12111251
byte_t* ptr = header;
@@ -1250,41 +1290,6 @@ bool udpard_tx_push_p2p(udpard_tx_t* const self,
12501290
return ok;
12511291
}
12521292

1253-
static void tx_purge_expired_transfers(udpard_tx_t* const self, const udpard_us_t now)
1254-
{
1255-
while (true) { // we can use next_greater instead of doing min search every time
1256-
tx_transfer_t* const tr = CAVL2_TO_OWNER(cavl2_min(self->index_deadline), tx_transfer_t, index_deadline);
1257-
if ((tr != NULL) && (now > tr->deadline)) {
1258-
tx_transfer_retire(self, tr, false);
1259-
self->errors_expiration++;
1260-
} else {
1261-
break;
1262-
}
1263-
}
1264-
}
1265-
1266-
static void tx_promote_staged_transfers(udpard_tx_t* const self, const udpard_us_t now)
1267-
{
1268-
while (true) { // we can use next_greater instead of doing min search every time
1269-
tx_transfer_t* const tr = CAVL2_TO_OWNER(cavl2_min(self->index_staged), tx_transfer_t, index_staged);
1270-
if ((tr != NULL) && (now >= tr->staged_until)) {
1271-
// Reinsert into the staged index at the new position, when the next attempt is due (if any).
1272-
cavl2_remove(&self->index_staged, &tr->index_staged);
1273-
tx_stage_if(self, tr);
1274-
// Enqueue for transmission unless it's been there since the last attempt (stalled interface?)
1275-
for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
1276-
if (((tr->iface_bitmap & (1U << i)) != 0) && !is_listed(&self->queue[i][tr->priority], &tr->queue[i])) {
1277-
UDPARD_ASSERT(tr->head[i] != NULL); // cannot stage without payload, doesn't make sense
1278-
UDPARD_ASSERT(tr->cursor[i] == tr->head[i]); // must have been rewound after last attempt
1279-
enlist_head(&self->queue[i][tr->priority], &tr->queue[i]);
1280-
}
1281-
}
1282-
} else {
1283-
break;
1284-
}
1285-
}
1286-
}
1287-
12881293
static void tx_eject_pending_frames(udpard_tx_t* const self, const udpard_us_t now, const uint_fast8_t ifindex)
12891294
{
12901295
while (true) {

libudpard/udpard.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,9 @@ typedef struct udpard_tx_ejection_t
392392
/// Virtual function table for the TX pipeline, to be provided by the application.
393393
typedef struct udpard_tx_vtable_t
394394
{
395-
/// Invoked from udpard_tx_poll() et al to push outgoing UDP datagrams into the socket/NIC driver.
395+
/// Invoked from udpard_tx_poll() to push outgoing UDP datagrams into the socket/NIC driver.
396+
/// It is GUARANTEED that ONLY udpard_tx_poll() can invoke this function; in particular, pushing new transfers
397+
/// will not trigger ejection callbacks.
396398
/// The callback must not mutate the TX pipeline (no udpard_tx_push/cancel/free).
397399
///
398400
/// The destination endpoint is provided only for P2P transfers; for multicast transfers, the application

0 commit comments

Comments
 (0)