diff --git a/src/connections/stream_buffer.rs b/src/connections/stream_buffer.rs index 129ea77..78e228c 100644 --- a/src/connections/stream_buffer.rs +++ b/src/connections/stream_buffer.rs @@ -14,6 +14,7 @@ use super::wrappers::encoded_data::IncomingStreamData; pub struct StreamBuffer { buffer: Vec, decoded_packet_tx: UnboundedSender, + log_line: Vec, } /// An enum that represents the possible errors that can occur when processing @@ -49,6 +50,7 @@ impl StreamBuffer { StreamBuffer { buffer: vec![], decoded_packet_tx, + log_line: vec![], } } @@ -79,12 +81,14 @@ impl StreamBuffer { Ok(packet) => packet, Err(err) => match err { StreamBufferError::MissingHeaderBytes => { - error!("Could not find header sequence [0x94, 0xc3], purging buffer and waiting for more data"); + trace!( + "Could not find header sequence [0x94, 0xc3], waiting for more data" + ); break; // Wait for more data } StreamBufferError::IncorrectFramingByte { found_framing_byte } => { - error!( + debug!( "Byte {found_framing_byte} not equal to 0xc3, waiting for more data" ); @@ -94,19 +98,19 @@ impl StreamBuffer { buffer_size, packet_size, } => { - error!( + trace!( "Incomplete packet data, expected {packet_size} bytes, found {buffer_size} bytes" ); break; // Wait for more data } StreamBufferError::MissingMSB { msb_index } => { - error!("Could not find MSB at index {msb_index}, waiting for more data"); + trace!("Could not find MSB at index {msb_index}, waiting for more data"); break; // Wait for more data } StreamBufferError::MissingLSB { lsb_index } => { - error!("Could not find LSB at index {lsb_index}, waiting for more data"); + trace!("Could not find LSB at index {lsb_index}, waiting for more data"); break; // Wait for more data } @@ -185,18 +189,46 @@ impl StreamBuffer { Ok(decoded_packet) } + fn handle_log_bytes(&mut self, idx: usize) { + let bytes = &self.buffer[0..idx]; + for &b in bytes { + if b == b'\n' { + if !self.log_line.is_empty() { + let line = String::from_utf8_lossy(&self.log_line); + debug!("Radio: {}", line); + self.log_line.clear(); + } + } else if b != b'\r' { + // ignore carriage return + self.log_line.push(b); + } + } + } + fn shift_buffer_to_first_valid_header(&mut self) -> Result<(), StreamBufferError> { - let framing_index = Self::find_framing_index(&self.buffer).ok_or_else(|| { - self.buffer.clear(); // Clear buffer since no packets exist - StreamBufferError::MissingHeaderBytes - })?; - - if framing_index != 0 { - debug!("Found framing byte at index {framing_index}, shifting buffer"); - self.buffer.drain(0..framing_index); - trace!("Buffer after shifting: {:?}", self.buffer); + let framing_index = Self::find_framing_index(&self.buffer); + + match framing_index { + Some(idx) => { + if idx > 0 { + self.handle_log_bytes(idx); + self.buffer.drain(0..idx); + } + Ok(()) + } + None => { + if !self.buffer.is_empty() { + let ends_with_start1 = self.buffer.last() == Some(&0x94); + let log_len = self.buffer.len() - ends_with_start1 as usize; + + if log_len > 0 { + self.handle_log_bytes(log_len); + self.buffer.drain(0..log_len); + } + } + Err(StreamBufferError::MissingHeaderBytes) + } } - Ok(()) } // All valid packets start with the sequence [0x94 0xc3 size_msb size_lsb], where @@ -686,5 +718,63 @@ mod tests { #[tokio::test] async fn detect_malformed_packets_with_internal_header_sequence() {} + #[tokio::test] + async fn process_log_lines() { + let (mock_tx, mut _mock_rx) = unbounded_channel::(); + let mut buffer = StreamBuffer::new(mock_tx); + + // Feed some complete log lines with a trailing newline + let log_data = b"[DeviceTelemetry] Send packet to mesh\nSecond line of logs\n"; + buffer.process_incoming_bytes(log_data.to_vec().into()); + + // Assert buffer is empty and log_line is cleared + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line.len(), 0); + } + + #[tokio::test] + async fn process_partial_log_line() { + let (mock_tx, mut _mock_rx) = unbounded_channel::(); + let mut buffer = StreamBuffer::new(mock_tx); + + // Feed in a partial log line (no newline at the end) + let log_data = b"Hello, partial log"; + buffer.process_incoming_bytes(log_data.to_vec().into()); + + // Assert buffer is empty (because it was consumed) and log_line contains the partial data + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line, b"Hello, partial log".to_vec()); + + // Feed in the rest of the log line + buffer.process_incoming_bytes(b" rest\n".to_vec().into()); + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line.len(), 0); + } + + #[tokio::test] + async fn process_interleaved_logs_and_packet() { + let payload_variant_1 = + protobufs::from_radio::PayloadVariant::MyInfo(protobufs::MyNodeInfo::default()); + + let (packet_1, packet_data_1) = mock_encoded_from_radio_packet(payload_variant_1, None); + let encoded_packet_1 = format_data_packet(packet_data_1.into()).unwrap(); + + let (mock_tx, mut mock_rx) = unbounded_channel::(); + let mut buffer = StreamBuffer::new(mock_tx); + + // Interleave log line, a valid packet, and another log line + let mut data = Vec::new(); + data.extend_from_slice(b"[RadioIf] Can not send yet, busyRx\n"); + data.extend_from_slice(encoded_packet_1.data()); + data.extend_from_slice(b"Some device log after packet\n"); + buffer.process_incoming_bytes(data.into()); + + // The packet should be decoded and sent + assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(packet_1)); + // The buffer and log_line should be fully processed and empty + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line.len(), 0); + } + // TODO need to test that we update the framing index after shifting the buffer }