From 8ee145d406794c01ca6c3fcf60238050821d68dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Pol=C3=A1=C4=8Dek?= Date: Wed, 20 May 2026 23:27:27 +0200 Subject: [PATCH] Fix polluted serial logs The device sends text log messages, they should be filtered out. They are expected, so they are not errors. We show them as log messages on the debug level. --- src/connections/stream_buffer.rs | 120 +++++++++++++++++++++++++++---- 1 file changed, 105 insertions(+), 15 deletions(-) diff --git a/src/connections/stream_buffer.rs b/src/connections/stream_buffer.rs index 129ea77..78e228c 100644 --- a/src/connections/stream_buffer.rs +++ b/src/connections/stream_buffer.rs @@ -14,6 +14,7 @@ use super::wrappers::encoded_data::IncomingStreamData; pub struct StreamBuffer { buffer: Vec, decoded_packet_tx: UnboundedSender, + log_line: Vec, } /// An enum that represents the possible errors that can occur when processing @@ -49,6 +50,7 @@ impl StreamBuffer { StreamBuffer { buffer: vec![], decoded_packet_tx, + log_line: vec![], } } @@ -79,12 +81,14 @@ impl StreamBuffer { Ok(packet) => packet, Err(err) => match err { StreamBufferError::MissingHeaderBytes => { - error!("Could not find header sequence [0x94, 0xc3], purging buffer and waiting for more data"); + trace!( + "Could not find header sequence [0x94, 0xc3], waiting for more data" + ); break; // Wait for more data } StreamBufferError::IncorrectFramingByte { found_framing_byte } => { - error!( + debug!( "Byte {found_framing_byte} not equal to 0xc3, waiting for more data" ); @@ -94,19 +98,19 @@ impl StreamBuffer { buffer_size, packet_size, } => { - error!( + trace!( "Incomplete packet data, expected {packet_size} bytes, found {buffer_size} bytes" ); break; // Wait for more data } StreamBufferError::MissingMSB { msb_index } => { - error!("Could not find MSB at index {msb_index}, waiting for more data"); + trace!("Could not find MSB at index {msb_index}, waiting for more data"); break; // Wait for more data } StreamBufferError::MissingLSB { lsb_index } => { - error!("Could not find LSB at index {lsb_index}, waiting for more data"); + trace!("Could not find LSB at index {lsb_index}, waiting for more data"); break; // Wait for more data } @@ -185,18 +189,46 @@ impl StreamBuffer { Ok(decoded_packet) } + fn handle_log_bytes(&mut self, idx: usize) { + let bytes = &self.buffer[0..idx]; + for &b in bytes { + if b == b'\n' { + if !self.log_line.is_empty() { + let line = String::from_utf8_lossy(&self.log_line); + debug!("Radio: {}", line); + self.log_line.clear(); + } + } else if b != b'\r' { + // ignore carriage return + self.log_line.push(b); + } + } + } + fn shift_buffer_to_first_valid_header(&mut self) -> Result<(), StreamBufferError> { - let framing_index = Self::find_framing_index(&self.buffer).ok_or_else(|| { - self.buffer.clear(); // Clear buffer since no packets exist - StreamBufferError::MissingHeaderBytes - })?; - - if framing_index != 0 { - debug!("Found framing byte at index {framing_index}, shifting buffer"); - self.buffer.drain(0..framing_index); - trace!("Buffer after shifting: {:?}", self.buffer); + let framing_index = Self::find_framing_index(&self.buffer); + + match framing_index { + Some(idx) => { + if idx > 0 { + self.handle_log_bytes(idx); + self.buffer.drain(0..idx); + } + Ok(()) + } + None => { + if !self.buffer.is_empty() { + let ends_with_start1 = self.buffer.last() == Some(&0x94); + let log_len = self.buffer.len() - ends_with_start1 as usize; + + if log_len > 0 { + self.handle_log_bytes(log_len); + self.buffer.drain(0..log_len); + } + } + Err(StreamBufferError::MissingHeaderBytes) + } } - Ok(()) } // All valid packets start with the sequence [0x94 0xc3 size_msb size_lsb], where @@ -686,5 +718,63 @@ mod tests { #[tokio::test] async fn detect_malformed_packets_with_internal_header_sequence() {} + #[tokio::test] + async fn process_log_lines() { + let (mock_tx, mut _mock_rx) = unbounded_channel::(); + let mut buffer = StreamBuffer::new(mock_tx); + + // Feed some complete log lines with a trailing newline + let log_data = b"[DeviceTelemetry] Send packet to mesh\nSecond line of logs\n"; + buffer.process_incoming_bytes(log_data.to_vec().into()); + + // Assert buffer is empty and log_line is cleared + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line.len(), 0); + } + + #[tokio::test] + async fn process_partial_log_line() { + let (mock_tx, mut _mock_rx) = unbounded_channel::(); + let mut buffer = StreamBuffer::new(mock_tx); + + // Feed in a partial log line (no newline at the end) + let log_data = b"Hello, partial log"; + buffer.process_incoming_bytes(log_data.to_vec().into()); + + // Assert buffer is empty (because it was consumed) and log_line contains the partial data + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line, b"Hello, partial log".to_vec()); + + // Feed in the rest of the log line + buffer.process_incoming_bytes(b" rest\n".to_vec().into()); + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line.len(), 0); + } + + #[tokio::test] + async fn process_interleaved_logs_and_packet() { + let payload_variant_1 = + protobufs::from_radio::PayloadVariant::MyInfo(protobufs::MyNodeInfo::default()); + + let (packet_1, packet_data_1) = mock_encoded_from_radio_packet(payload_variant_1, None); + let encoded_packet_1 = format_data_packet(packet_data_1.into()).unwrap(); + + let (mock_tx, mut mock_rx) = unbounded_channel::(); + let mut buffer = StreamBuffer::new(mock_tx); + + // Interleave log line, a valid packet, and another log line + let mut data = Vec::new(); + data.extend_from_slice(b"[RadioIf] Can not send yet, busyRx\n"); + data.extend_from_slice(encoded_packet_1.data()); + data.extend_from_slice(b"Some device log after packet\n"); + buffer.process_incoming_bytes(data.into()); + + // The packet should be decoded and sent + assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(packet_1)); + // The buffer and log_line should be fully processed and empty + assert_eq!(buffer.buffer.len(), 0); + assert_eq!(buffer.log_line.len(), 0); + } + // TODO need to test that we update the framing index after shifting the buffer }