Skip to content

Commit a135d38

Browse files
committed
chore: Cleanup non-smp serial data handling.
Makes the buffer processing hopefully a bit more clear. Non-SMP data is no longer logged to stdout but can be accessed with the read_serial() function.
1 parent 334f5bc commit a135d38

2 files changed

Lines changed: 314 additions & 187 deletions

File tree

smpclient/transport/serial.py

Lines changed: 159 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -42,30 +42,17 @@ class SMPSerialTransport(SMPTransport):
4242
_POLLING_INTERVAL_S = 0.005
4343
_CONNECTION_RETRY_INTERVAL_S = 0.500
4444

45-
class _ReadBuffer:
46-
"""The state of the read buffer."""
47-
48-
@unique
49-
class State(IntEnum):
50-
SMP = 0
51-
"""An SMP start or continue delimiter has been received and the
52-
`smp_buffer` is being filled with the remainder of the SMP packet.
53-
"""
54-
55-
SER = 1
56-
"""The SMP start delimiter has not been received and the
57-
`ser_buffer` is being filled with data.
58-
"""
59-
60-
def __init__(self) -> None:
61-
self.smp = bytearray([])
62-
"""The buffer for the SMP packet."""
63-
64-
self.ser = bytearray([])
65-
"""The buffer for serial data that is not part of an SMP packet."""
45+
@unique
46+
class BufferState(IntEnum):
47+
SMP = 0
48+
"""An SMP start or continue delimiter has been received and
49+
`_buffer` is being parsed as an SMP packet.
50+
"""
6651

67-
self.state = SMPSerialTransport._ReadBuffer.State.SER
68-
"""The state of the read buffer."""
52+
SERIAL = 1
53+
"""The SMP start delimiter has not been received and
54+
`_buffer` is being parsed as serial data.
55+
"""
6956

7057
def __init__( # noqa: DOC301
7158
self,
@@ -131,17 +118,36 @@ def __init__( # noqa: DOC301
131118
inter_byte_timeout=inter_byte_timeout,
132119
exclusive=exclusive,
133120
)
134-
self._buffer = SMPSerialTransport._ReadBuffer()
121+
122+
self._smp_packet_queue: asyncio.Queue[bytes] = asyncio.Queue()
123+
"""Contains full SMP packets."""
124+
self._serial_buffer = bytearray()
125+
"""Contains any non-SMP serial data."""
126+
self._buffer: bytearray = bytearray([])
127+
"""Contains all incoming data (serial + SMP intertwined, may be incomplete)."""
128+
self._buffer_state = SMPSerialTransport.BufferState.SERIAL
129+
"""The state of the read buffer."""
130+
135131
logger.debug(f"Initialized {self.__class__.__name__}")
136132

133+
def _reset_state(self) -> None:
134+
"""Reset internal state and queues for a fresh connection."""
135+
136+
self._smp_packet_queue = asyncio.Queue()
137+
self._serial_buffer.clear()
138+
self._buffer = bytearray([])
139+
self._buffer_state = SMPSerialTransport.BufferState.SERIAL
140+
137141
@override
138142
async def connect(self, address: str, timeout_s: float) -> None:
143+
self._reset_state()
139144
self._conn.port = address
140145
logger.debug(f"Connecting to {self._conn.port=}")
141146
start_time: Final = time.time()
142147
while time.time() - start_time <= timeout_s:
143148
try:
144149
self._conn.open()
150+
self._conn.reset_input_buffer()
145151
logger.debug(f"Connected to {self._conn.port=}")
146152
return
147153
except SerialException as e:
@@ -189,112 +195,145 @@ async def receive(self) -> bytes:
189195

190196
logger.debug("Waiting for response")
191197
while True:
198+
b = await self._read_one_smp_packet()
192199
try:
193-
b = await self._readuntil()
194200
decoder.send(b)
195201
except StopIteration as e:
196202
logger.debug(f"Finished receiving {len(e.value)} byte response")
197203
return e.value
198-
except SerialException as e:
199-
logger.error(f"Failed to receive response: {e}")
200-
raise SMPTransportDisconnected(
201-
f"{self.__class__.__name__} disconnected from {self._conn.port}"
202-
)
203204

204-
async def _readuntil(self) -> bytes:
205-
"""Read `bytes` until the `delimiter` then return the `bytes` including the `delimiter`."""
205+
async def _read_one_smp_packet(self) -> bytes:
206+
"""Returns one received SMP packet from the queue.
207+
Raises `SMPTransportDisconnected` if disconnected"""
208+
if not self._smp_packet_queue.empty():
209+
# There may already be a response in the queue, if for some reason we've received
210+
# multiple responses and haven't read them in-between. This is not standard but
211+
# it is possible, and easier to implement this way.
212+
return self._smp_packet_queue.get_nowait()
213+
214+
await self._read_and_process(read_until_one_smp_packet=True)
215+
return self._smp_packet_queue.get_nowait()
216+
217+
async def read_serial(self, delimiter: bytes | None = None) -> bytes:
218+
"""Drain regular serial traffic (non-SMP bytes) until given delimiter.
219+
Returns all available bytes if no delimiter is given.
220+
May return empty bytes if nothing has been received."""
221+
await self._read_and_process(read_until_one_smp_packet=False)
222+
if delimiter is None:
223+
res = bytes(self._serial_buffer)
224+
self._serial_buffer.clear()
225+
return res
226+
else:
227+
try:
228+
first_match, remaining_data = self._serial_buffer.split(delimiter, 1)
229+
except ValueError:
230+
return bytes()
231+
self._serial_buffer = remaining_data
232+
return bytes(first_match)
233+
234+
async def _read_and_process(self, read_until_one_smp_packet: bool) -> None:
235+
"""Reads raw data from serial and processes it into SMP packets and regular serial data."""
236+
while True:
237+
try:
238+
data = self._conn.read_all() or b""
239+
except StopIteration:
240+
data = b""
241+
except SerialException as exc:
242+
raise SMPTransportDisconnected(f"Failed to read from {self._conn.port}: {exc}")
243+
244+
if data:
245+
self._buffer.extend(data)
246+
await self._process_buffer()
247+
else:
248+
await asyncio.sleep(SMPSerialTransport._POLLING_INTERVAL_S)
206249

207-
START_DELIMITER: Final = smppacket.SIXTY_NINE
208-
CONTINUE_DELIMITER: Final = smppacket.FOUR_TWENTY
209-
END_DELIMITER: Final = b"\n"
250+
if read_until_one_smp_packet:
251+
if self._smp_packet_queue.qsize():
252+
break # Packet found; exit early
253+
else:
254+
# Just polling serial data
255+
break
210256

211-
# fake async until I get around to replacing pyserial
257+
async def _process_buffer(self) -> None:
258+
"""Process buffered data until more bytes are needed."""
212259

213-
i_smp_start = 0
214-
i_smp_end = 0
215-
i_start: int | None = None
216-
i_continue: int | None = None
217260
while True:
218-
if self._buffer.state == SMPSerialTransport._ReadBuffer.State.SER:
219-
# read the entire OS buffer
220-
try:
221-
self._buffer.ser.extend(self._conn.read_all() or [])
222-
except StopIteration:
223-
pass
224-
225-
try: # search the buffer for the index of the start delimiter
226-
i_start = self._buffer.ser.index(START_DELIMITER)
227-
except ValueError:
228-
i_start = None
229-
230-
try: # search the buffer for the index of the continue delimiter
231-
i_continue = self._buffer.ser.index(CONTINUE_DELIMITER)
232-
except ValueError:
233-
i_continue = None
234-
235-
if i_start is not None and i_continue is not None:
236-
i_smp_start = min(i_start, i_continue)
237-
elif i_start is not None:
238-
i_smp_start = i_start
239-
elif i_continue is not None:
240-
i_smp_start = i_continue
241-
else: # no delimiters found yet, clear non SMP data and wait
242-
while True:
243-
try: # search the buffer for newline characters
244-
i = self._buffer.ser.index(b"\n")
245-
try: # log as a string if possible
246-
logger.warning(
247-
f"{self._conn.port}: {self._buffer.ser[:i].decode()}"
248-
)
249-
except UnicodeDecodeError: # log as bytes if not
250-
logger.warning(f"{self._conn.port}: {self._buffer.ser[:i].hex()}")
251-
self._buffer.ser = self._buffer.ser[i + 1 :]
252-
except ValueError:
253-
break
254-
await asyncio.sleep(SMPSerialTransport._POLLING_INTERVAL_S)
255-
continue
256-
257-
if i_smp_start != 0: # log the rest of the serial buffer
258-
try: # log as a string if possible
259-
logger.warning(
260-
f"{self._conn.port}: {self._buffer.ser[:i_smp_start].decode()}"
261-
)
262-
except UnicodeDecodeError: # log as bytes if not
263-
logger.warning(f"{self._conn.port}: {self._buffer.ser[:i_smp_start].hex()}")
264-
265-
self._buffer.smp = self._buffer.ser[i_smp_start:]
266-
self._buffer.ser.clear()
267-
self._buffer.state = SMPSerialTransport._ReadBuffer.State.SMP
268-
i_smp_end = 0
269-
270-
# don't await since the buffer may already contain the end delimiter
271-
272-
elif self._buffer.state == SMPSerialTransport._ReadBuffer.State.SMP:
273-
# read the entire OS buffer
274-
try:
275-
self._buffer.smp.extend(self._conn.read_all() or [])
276-
except StopIteration:
277-
pass
278-
279-
try: # search the buffer for the index of the delimiter
280-
i_smp_end = self._buffer.smp.index(END_DELIMITER, i_smp_end) + len(
281-
END_DELIMITER
282-
)
283-
except ValueError: # delimiter not found yet, wait
284-
await asyncio.sleep(SMPSerialTransport._POLLING_INTERVAL_S)
285-
continue
286-
287-
# out is everything up to and including the delimiter
288-
out = self._buffer.smp[:i_smp_end]
289-
logger.debug(f"Received {len(out)} byte chunk")
290-
291-
# there may be some leftover to save for the next read, but
292-
# it's not necessarily SMP data
293-
self._buffer.ser = self._buffer.smp[i_smp_end:]
294-
295-
self._buffer.state = SMPSerialTransport._ReadBuffer.State.SER
296-
297-
return out
261+
if self._buffer_state == SMPSerialTransport.BufferState.SERIAL:
262+
should_continue = await self._process_buffer_as_serial_data()
263+
else:
264+
should_continue = await self._process_buffer_as_smp_data()
265+
266+
if not should_continue:
267+
break
268+
269+
async def _process_buffer_as_serial_data(self) -> bool:
270+
"""Handle non-SMP data and transition to SMP state when finding SMP frame-start delimiters.
271+
Return True if further data remains to process in the buffer; return False otherwise."""
272+
273+
if not self._buffer:
274+
return False
275+
276+
smp_packet_start: int = self._find_smp_packet_start(self._buffer)
277+
if smp_packet_start >= 0:
278+
serial_data, remaining_data = (
279+
self._buffer[:smp_packet_start],
280+
self._buffer[smp_packet_start:],
281+
)
282+
self._serial_buffer.extend(serial_data)
283+
284+
self._buffer = remaining_data
285+
self._buffer_state = SMPSerialTransport.BufferState.SMP
286+
return True
287+
288+
# No complete delimiter found - everything is serial data, with one rare edge
289+
# case: last byte of buffer could be an incomplete delimiter - must preserve it for now.
290+
if self._could_be_smp_packet_start(self._buffer[-1]):
291+
self._serial_buffer.extend(self._buffer[:-1])
292+
self._buffer = self._buffer[-1:]
293+
else:
294+
self._serial_buffer.extend(self._buffer)
295+
self._buffer.clear()
296+
return False
297+
298+
async def _process_buffer_as_smp_data(self) -> bool:
299+
"""Handle SMP data and transition to SERIAL state when finding SMP frame-end delimiter.
300+
Return True if further data remains to process in the buffer; return False otherwise."""
301+
302+
smp_packet_end: int = self._buffer.find(smppacket.END_DELIMITER)
303+
if smp_packet_end == -1:
304+
return False
305+
smp_packet_end += len(smppacket.END_DELIMITER)
306+
307+
smp_data, remaining_data = (
308+
self._buffer[:smp_packet_end],
309+
self._buffer[smp_packet_end:],
310+
)
311+
await self._smp_packet_queue.put(bytes(smp_data))
312+
313+
self._buffer = remaining_data
314+
# Even if the remaining data is actually SMP data, then the next serial parse
315+
# will simply put us right back into SMP state - no need to check here.
316+
self._buffer_state = SMPSerialTransport.BufferState.SERIAL
317+
318+
return bool(self._buffer)
319+
320+
def _find_smp_packet_start(self, buf: bytearray) -> int:
321+
"""Return index of the earliest SMP frame-start delimiter, if any; -1 if none found."""
322+
323+
indices = [
324+
i
325+
for i in (
326+
buf.find(smppacket.START_DELIMITER),
327+
buf.find(smppacket.CONTINUE_DELIMITER),
328+
)
329+
if i != -1
330+
]
331+
return min(indices) if indices else -1
332+
333+
def _could_be_smp_packet_start(self, byte: int) -> bool:
334+
"""Return True if the given byte value matches the start of any SMP packet delimiter."""
335+
336+
return byte == smppacket.START_DELIMITER[0] or byte == smppacket.CONTINUE_DELIMITER[0]
298337

299338
@override
300339
async def send_and_receive(self, data: bytes) -> bytes:

0 commit comments

Comments
 (0)