Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 105 additions & 15 deletions src/connections/stream_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use super::wrappers::encoded_data::IncomingStreamData;
pub struct StreamBuffer {
buffer: Vec<u8>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
log_line: Vec<u8>,
}

/// An enum that represents the possible errors that can occur when processing
Expand Down Expand Up @@ -49,6 +50,7 @@ impl StreamBuffer {
StreamBuffer {
buffer: vec![],
decoded_packet_tx,
log_line: vec![],
}
}

Expand Down Expand Up @@ -79,12 +81,14 @@ impl StreamBuffer {
Ok(packet) => packet,
Err(err) => match err {
StreamBufferError::MissingHeaderBytes => {
error!("Could not find header sequence [0x94, 0xc3], purging buffer and waiting for more data");
trace!(
"Could not find header sequence [0x94, 0xc3], waiting for more data"
);

break; // Wait for more data
}
StreamBufferError::IncorrectFramingByte { found_framing_byte } => {
error!(
debug!(
"Byte {found_framing_byte} not equal to 0xc3, waiting for more data"
);

Expand All @@ -94,19 +98,19 @@ impl StreamBuffer {
buffer_size,
packet_size,
} => {
error!(
trace!(
"Incomplete packet data, expected {packet_size} bytes, found {buffer_size} bytes"
);

break; // Wait for more data
}
StreamBufferError::MissingMSB { msb_index } => {
error!("Could not find MSB at index {msb_index}, waiting for more data");
trace!("Could not find MSB at index {msb_index}, waiting for more data");

break; // Wait for more data
}
StreamBufferError::MissingLSB { lsb_index } => {
error!("Could not find LSB at index {lsb_index}, waiting for more data");
trace!("Could not find LSB at index {lsb_index}, waiting for more data");

break; // Wait for more data
}
Expand Down Expand Up @@ -185,18 +189,46 @@ impl StreamBuffer {
Ok(decoded_packet)
}

fn handle_log_bytes(&mut self, idx: usize) {
let bytes = &self.buffer[0..idx];
for &b in bytes {
if b == b'\n' {
if !self.log_line.is_empty() {
let line = String::from_utf8_lossy(&self.log_line);
debug!("Radio: {}", line);
self.log_line.clear();
}
} else if b != b'\r' {
// ignore carriage return
self.log_line.push(b);
}
}
}

fn shift_buffer_to_first_valid_header(&mut self) -> Result<(), StreamBufferError> {
let framing_index = Self::find_framing_index(&self.buffer).ok_or_else(|| {
self.buffer.clear(); // Clear buffer since no packets exist
StreamBufferError::MissingHeaderBytes
})?;

if framing_index != 0 {
debug!("Found framing byte at index {framing_index}, shifting buffer");
self.buffer.drain(0..framing_index);
trace!("Buffer after shifting: {:?}", self.buffer);
let framing_index = Self::find_framing_index(&self.buffer);

match framing_index {
Some(idx) => {
if idx > 0 {
self.handle_log_bytes(idx);
self.buffer.drain(0..idx);
}
Ok(())
}
None => {
if !self.buffer.is_empty() {
let ends_with_start1 = self.buffer.last() == Some(&0x94);
let log_len = self.buffer.len() - ends_with_start1 as usize;

if log_len > 0 {
self.handle_log_bytes(log_len);
self.buffer.drain(0..log_len);
}
}
Err(StreamBufferError::MissingHeaderBytes)
}
}
Ok(())
}

// All valid packets start with the sequence [0x94 0xc3 size_msb size_lsb], where
Expand Down Expand Up @@ -686,5 +718,63 @@ mod tests {
#[tokio::test]
async fn detect_malformed_packets_with_internal_header_sequence() {}

#[tokio::test]
async fn process_log_lines() {
let (mock_tx, mut _mock_rx) = unbounded_channel::<protobufs::FromRadio>();
let mut buffer = StreamBuffer::new(mock_tx);

// Feed some complete log lines with a trailing newline
let log_data = b"[DeviceTelemetry] Send packet to mesh\nSecond line of logs\n";
buffer.process_incoming_bytes(log_data.to_vec().into());

// Assert buffer is empty and log_line is cleared
assert_eq!(buffer.buffer.len(), 0);
assert_eq!(buffer.log_line.len(), 0);
}

#[tokio::test]
async fn process_partial_log_line() {
let (mock_tx, mut _mock_rx) = unbounded_channel::<protobufs::FromRadio>();
let mut buffer = StreamBuffer::new(mock_tx);

// Feed in a partial log line (no newline at the end)
let log_data = b"Hello, partial log";
buffer.process_incoming_bytes(log_data.to_vec().into());

// Assert buffer is empty (because it was consumed) and log_line contains the partial data
assert_eq!(buffer.buffer.len(), 0);
assert_eq!(buffer.log_line, b"Hello, partial log".to_vec());

// Feed in the rest of the log line
buffer.process_incoming_bytes(b" rest\n".to_vec().into());
assert_eq!(buffer.buffer.len(), 0);
assert_eq!(buffer.log_line.len(), 0);
}

#[tokio::test]
async fn process_interleaved_logs_and_packet() {
let payload_variant_1 =
protobufs::from_radio::PayloadVariant::MyInfo(protobufs::MyNodeInfo::default());

let (packet_1, packet_data_1) = mock_encoded_from_radio_packet(payload_variant_1, None);
let encoded_packet_1 = format_data_packet(packet_data_1.into()).unwrap();

let (mock_tx, mut mock_rx) = unbounded_channel::<protobufs::FromRadio>();
let mut buffer = StreamBuffer::new(mock_tx);

// Interleave log line, a valid packet, and another log line
let mut data = Vec::new();
data.extend_from_slice(b"[RadioIf] Can not send yet, busyRx\n");
data.extend_from_slice(encoded_packet_1.data());
data.extend_from_slice(b"Some device log after packet\n");
buffer.process_incoming_bytes(data.into());

// The packet should be decoded and sent
assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(packet_1));
// The buffer and log_line should be fully processed and empty
assert_eq!(buffer.buffer.len(), 0);
assert_eq!(buffer.log_line.len(), 0);
}

// TODO need to test that we update the framing index after shifting the buffer
}
Loading