Skip to content

Commit 8e0d1f9

Browse files
authored
refactor recv to a loop instead of recursion (#63)
1 parent 4903a28 commit 8e0d1f9

1 file changed

Lines changed: 95 additions & 110 deletions

File tree

src/receive.rs

Lines changed: 95 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ use uuid::Uuid;
3232

3333
use std::cmp::{Ordering, max};
3434
use std::collections::HashMap;
35-
use std::fmt;
3635
use std::io::Read;
3736
use std::net::{Ipv4Addr, SocketAddr};
3837
use std::time::{Duration, Instant};
38+
use std::{fmt, io};
3939

4040
/// Extra net imports required for the IPv6 handling on the linux side.
4141
#[cfg(not(target_os = "windows"))]
@@ -504,128 +504,113 @@ impl SacnReceiver {
504504
return Err(SacnError::NoDataUniversesRegistered());
505505
}
506506

507-
self.sequences.check_timeouts(self.announce_timeout)?;
508-
self.check_waiting_data_timeouts();
509-
507+
// if timeout is 0, then it's time to return
510508
if timeout == Some(Duration::from_secs(0)) {
511-
if cfg!(target_os = "windows") {
509+
// always check timeouts
510+
self.sequences.check_timeouts(self.announce_timeout)?;
511+
self.check_waiting_data_timeouts();
512+
return Err(io::Error::new(
512513
// Use the right expected error for the operating system.
513-
return Err(std::io::Error::new(
514-
std::io::ErrorKind::TimedOut,
515-
"No data available in given timeout",
516-
)
517-
.into());
518-
} else {
519-
return Err(std::io::Error::new(
520-
std::io::ErrorKind::WouldBlock,
521-
"No data available in given timeout",
522-
)
523-
.into());
524-
}
514+
if cfg!(target_os = "windows") {
515+
io::ErrorKind::TimedOut
516+
} else {
517+
io::ErrorKind::WouldBlock
518+
},
519+
"No data available in given timeout",
520+
)
521+
.into());
525522
}
526523

527-
// Forces the actual timeout used for receiving from the underlying network to never exceed E131_NETWORK_DATA_LOSS_TIMEOUT.
528-
// This means that the timeouts for the sequence numbers will be checked at least every E131_NETWORK_DATA_LOSS_TIMEOUT even if
529-
// recv is called with a longer timeout.
530-
let actual_timeout =
531-
if timeout.is_some() && timeout.unwrap() < E131_NETWORK_DATA_LOSS_TIMEOUT {
532-
timeout
533-
} else {
534-
Some(E131_NETWORK_DATA_LOSS_TIMEOUT)
535-
};
536-
537-
self.receiver.set_timeout(actual_timeout)?; // "Failed to sent a timeout value for the receiver"
538-
let start_time = Instant::now();
524+
// Fixed instant that should return the whole recv call
525+
let deadline = timeout.and_then(|t| Instant::now().checked_add(t));
539526

527+
// shared buf through loop iterations
540528
let mut buf: [u8; RCV_BUF_DEFAULT_SIZE] = [0; RCV_BUF_DEFAULT_SIZE];
541-
match self.receiver.recv(&mut buf) {
542-
Ok(pkt) => {
543-
let pdu: E131RootLayer = pkt.pdu;
544-
let data: E131RootLayerData = pdu.data;
545-
let res = match data {
546-
DataPacket(d) => self.handle_data_packet(pdu.cid, d)?,
547-
SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?,
548-
UniverseDiscoveryPacket(u) => {
549-
let discovered_src: Option<String> =
550-
self.handle_universe_discovery_packet(pdu.cid, u);
551-
if let Some(src) = discovered_src
552-
&& self.announce_source_discovery
553-
{
554-
return Err(SacnError::SourceDiscovered(src));
555-
};
556-
None
529+
530+
loop {
531+
self.sequences.check_timeouts(self.announce_timeout)?;
532+
self.check_waiting_data_timeouts();
533+
534+
// In the case of `timeout` being longer than `E131_NETWORK_DATA_LOSS_TIMEOUT`:
535+
// Forces the actual timeout used for receiving from the underlying network to never exceed E131_NETWORK_DATA_LOSS_TIMEOUT.
536+
// This means that the timeouts for the sequence numbers will be checked at least every E131_NETWORK_DATA_LOSS_TIMEOUT even if
537+
// recv is called with a longer timeout.
538+
let remaining = match deadline {
539+
None => None, // set to data loss timeout below so timeouts are checked again.
540+
Some(dl) => {
541+
let now = Instant::now();
542+
if now >= dl {
543+
// timeout expired
544+
return Err(io::Error::new(
545+
if cfg!(target_os = "windows") {
546+
io::ErrorKind::TimedOut
547+
} else {
548+
io::ErrorKind::WouldBlock
549+
},
550+
"No data available in given timeout",
551+
)
552+
.into());
557553
}
558-
};
554+
Some(dl - now)
555+
}
556+
};
557+
558+
let actual_timeout = if let Some(rem) = remaining {
559+
rem.min(E131_NETWORK_DATA_LOSS_TIMEOUT)
560+
} else {
561+
E131_NETWORK_DATA_LOSS_TIMEOUT
562+
};
559563

560-
match res {
561-
Some(r) => Ok(r),
562-
None => {
563-
// Indicates that there is no data ready to pass up yet even if a packet was received.
564-
// To stop recv blocking forever with a non-None timeout due to packets being received consistently (that reset the timeout)
565-
// within the receive timeout (e.g. universe discovery packets if the discovery interval < timeout) the timeout needs to be
566-
// adjusted to account for the time already taken.
567-
if let Some(timeout) = timeout {
568-
let elapsed = start_time.elapsed();
569-
match timeout.checked_sub(elapsed) {
570-
None => {
571-
// Indicates that elapsed is bigger than timeout so its time to return.
572-
Err(std::io::Error::new(
573-
std::io::ErrorKind::WouldBlock,
574-
"No data available in given timeout",
575-
)
576-
.into())
577-
}
578-
Some(new_timeout) => self.recv(Some(new_timeout)),
564+
self.receiver.set_timeout(Some(actual_timeout))?; // "Failed to set a timeout value for the receiver"
565+
566+
// Zero out the buffer before receiving. This may be redundant since recv should pack the whole buffer.
567+
buf.fill(0);
568+
569+
match self.receiver.recv(&mut buf) {
570+
Ok(pkt) => {
571+
let pdu: E131RootLayer = pkt.pdu;
572+
let data: E131RootLayerData = pdu.data;
573+
let res = match data {
574+
DataPacket(d) => self.handle_data_packet(pdu.cid, d)?,
575+
SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?,
576+
UniverseDiscoveryPacket(u) => {
577+
let discovered_src: Option<String> =
578+
self.handle_universe_discovery_packet(pdu.cid, u);
579+
if let Some(src) = discovered_src
580+
&& self.announce_source_discovery
581+
{
582+
return Err(SacnError::SourceDiscovered(src));
579583
}
580-
} else {
581-
// If the timeout was none then would keep looping till data is returned as the method should keep blocking till then.
582-
self.recv(timeout)
584+
None
583585
}
586+
};
587+
588+
// return the data, otherwise continue if no data is ready
589+
if let Some(r) = res {
590+
return Ok(r);
591+
} else {
592+
continue;
584593
}
585594
}
586-
}
587-
Err(err) => {
588-
match err {
589-
SacnError::Io(ref s) => {
590-
match s.kind() {
591-
// Windows and Unix use different error types (WouldBlock/TimedOut) for the same error.
592-
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
593-
if let Some(timeout) = timeout {
594-
let elapsed = start_time.elapsed();
595-
match timeout.checked_sub(elapsed) {
596-
None => {
597-
// Indicates that elapsed is bigger than timeout so its time to return.
598-
if cfg!(target_os = "windows") {
599-
// Use the right expected error for the operating system.
600-
Err(std::io::Error::new(
601-
std::io::ErrorKind::TimedOut,
602-
"No data available in given timeout",
603-
)
604-
.into())
605-
} else {
606-
Err(std::io::Error::new(
607-
std::io::ErrorKind::WouldBlock,
608-
"No data available in given timeout",
609-
)
610-
.into())
611-
}
612-
}
613-
Some(new_timeout) => self.recv(Some(new_timeout)),
614-
}
615-
} else {
616-
// If the timeout was none then would keep looping till data is returned as the method should keep blocking till then.
617-
self.recv(timeout)
618-
}
619-
}
620-
_ => {
621-
// Not a timeout/wouldblock error meaning the recv should stop with the given error.
622-
Err(err)
623-
}
595+
596+
Err(err) =>
597+
// This could be the socket-level timeout error or other socket recv error.
598+
{
599+
match err {
600+
// Windows and Unix use different error types (WouldBlock/TimedOut) for the same error.
601+
SacnError::Io(ref s)
602+
if matches!(
603+
s.kind(),
604+
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
605+
) =>
606+
{
607+
// socket read timedout. start new loop to compute new remaining which will return if deadline has passed
608+
continue;
609+
}
610+
_ => {
611+
// Not a timeout/wouldblock error meaning the recv should stop with the given error.
612+
return Err(err);
624613
}
625-
}
626-
_ => {
627-
// Not a timeout/wouldblock error meaning the recv should stop with the given error.
628-
Err(err)
629614
}
630615
}
631616
}

0 commit comments

Comments
 (0)