diff --git a/agent/test_agent.c b/agent/test_agent.c index 157c4d2..e6af6e1 100644 --- a/agent/test_agent.c +++ b/agent/test_agent.c @@ -342,6 +342,183 @@ static void test_cobs_matches_python(void) { ASSERT(mock_tx[i] != 0x00, "python compat: no internal zeros"); } +/* ---------- Regression tests ---------- */ + +/* + * Bug: cobs_decode() stripped trailing 0x00 bytes. When a packet's CRC32 + * had 0x00 as the MSB, the decoded output was 1 byte short → CRC mismatch. + * This caused ~1/256 packets to fail deterministically based on data content. + * Root cause of ALL flash write failures before the fix. + */ +static void test_cobs_trailing_zero_preserved(void) { + /* Build payloads where CRC32 MSB is 0x00 (trailing zero after LE encode). + * The COBS roundtrip must preserve the trailing zero. */ + for (int trial = 0; trial < 1000; trial++) { + /* Construct a packet: [cmd] [data...] [crc32 LE] */ + uint8_t payload[64]; + uint32_t plen = 5 + (trial % 20); /* varying lengths */ + payload[0] = 0x82; /* RSP_DATA */ + for (uint32_t i = 1; i < plen; i++) + payload[i] = (uint8_t)((trial * 7 + i * 13) & 0xFF); + + /* Compute CRC and append */ + uint32_t c = crc32(0, payload, plen); + payload[plen + 0] = (c >> 0) & 0xFF; + payload[plen + 1] = (c >> 8) & 0xFF; + payload[plen + 2] = (c >> 16) & 0xFF; + payload[plen + 3] = (c >> 24) & 0xFF; + uint32_t total = plen + 4; + + /* COBS encode → decode roundtrip */ + uint8_t enc[256], dec[256]; + uint32_t enc_len = cobs_encode(payload, total, enc); + uint32_t dec_len = cobs_decode(enc, enc_len, dec); + + ASSERT(dec_len == total, "cobs trailing zero: length preserved"); + ASSERT(memcmp(payload, dec, total) == 0, + "cobs trailing zero: data preserved"); + } +} + +/* + * Bug: CRC32 extraction used `cp[3] << 24` where cp[3] >= 128. + * uint8_t promotes to int, and left-shifting a signed int into the sign + * bit is undefined behavior. With ASAN this manifests as a runtime error. + * Fix: cast to (uint32_t) before shifting. + */ +static void test_crc32_high_byte_shift(void) { + /* Test CRC32 values where the MSB is >= 0x80 (signed bit set). + * These exercise the (uint32_t) cast in CRC extraction. */ + uint8_t data_a[] = {0x03, 0x00, 0x00}; /* CMD_WRITE + padding */ + uint32_t c = crc32(0, data_a, 3); + + /* Verify the CRC packs/unpacks correctly through all 4 bytes */ + uint8_t packed[4]; + packed[0] = (c >> 0) & 0xFF; + packed[1] = (c >> 8) & 0xFF; + packed[2] = (c >> 16) & 0xFF; + packed[3] = (c >> 24) & 0xFF; + + uint32_t unpacked = (uint32_t)packed[0] + | ((uint32_t)packed[1] << 8) + | ((uint32_t)packed[2] << 16) + | ((uint32_t)packed[3] << 24); + ASSERT(unpacked == c, "crc32 high byte: pack/unpack roundtrip"); + + /* Test with every possible MSB value */ + for (int msb = 0; msb < 256; msb++) { + packed[3] = (uint8_t)msb; + uint32_t val = (uint32_t)packed[0] + | ((uint32_t)packed[1] << 8) + | ((uint32_t)packed[2] << 16) + | ((uint32_t)packed[3] << 24); + ASSERT((val >> 24) == (uint32_t)msb, "crc32 high byte shift"); + } +} + +/* + * End-to-end: for ALL possible payload data (varying byte values that + * produce different CRC patterns), verify proto_send → proto_recv roundtrip. + * This catches the COBS trailing zero bug: ~1/256 CRC values have MSB=0x00. + */ +static void test_cobs_roundtrip_all_crc_patterns(void) { + /* Send 256 different 8-byte payloads through proto_send/recv. + * Each produces a different CRC32, covering all possible MSB values. */ + for (int i = 0; i < 256; i++) { + mock_reset(); + + uint8_t data[8]; + for (int j = 0; j < 8; j++) + data[j] = (uint8_t)((i * 37 + j * 53) & 0xFF); + + proto_send(RSP_DATA, data, 8); + + memcpy(mock_rx, mock_tx, mock_tx_len); + mock_rx_len = mock_tx_len; + mock_rx_pos = 0; + + uint8_t recv_buf[MAX_PAYLOAD + 16]; + uint32_t recv_len = 0; + uint8_t cmd = proto_recv(recv_buf, &recv_len, 1000); + + ASSERT(cmd == RSP_DATA, "crc pattern roundtrip: cmd"); + ASSERT(recv_len == 8, "crc pattern roundtrip: len"); + ASSERT(memcmp(recv_buf, data, 8) == 0, + "crc pattern roundtrip: data"); + } +} + +/* + * page_is_ff helper: verify it correctly identifies all-0xFF pages + * and rejects pages with even a single non-0xFF byte. + */ +static int page_is_ff_test(const uint8_t *data, uint32_t len) { + const uint32_t *w = (const uint32_t *)data; + for (uint32_t i = 0; i < len / 4; i++) + if (w[i] != 0xFFFFFFFF) return 0; + return 1; +} + +static void test_page_is_ff(void) { + uint8_t page[256]; + + /* All 0xFF → true */ + memset(page, 0xFF, 256); + ASSERT(page_is_ff_test(page, 256) == 1, "page_is_ff: all FF"); + + /* All 0x00 → false */ + memset(page, 0x00, 256); + ASSERT(page_is_ff_test(page, 256) == 0, "page_is_ff: all 00"); + + /* Single non-FF byte at each position */ + for (int pos = 0; pos < 256; pos++) { + memset(page, 0xFF, 256); + page[pos] = 0xFE; + ASSERT(page_is_ff_test(page, 256) == 0, + "page_is_ff: single non-FF byte"); + } + + /* Random data → false */ + for (int i = 0; i < 256; i++) page[i] = (uint8_t)i; + ASSERT(page_is_ff_test(page, 256) == 0, "page_is_ff: random data"); +} + +/* + * Sector bitmap: verify bit indexing matches the host Python implementation. + * Bit N of bitmap = sector N. LSB-first within each byte. + */ +static void test_sector_bitmap(void) { + uint8_t bitmap[32]; + memset(bitmap, 0, 32); + + /* Set specific sectors */ + bitmap[0] |= (1 << 0); /* sector 0 */ + bitmap[0] |= (1 << 7); /* sector 7 */ + bitmap[1] |= (1 << 0); /* sector 8 */ + bitmap[31] |= (1 << 7); /* sector 255 */ + + /* Check sector_has_data equivalent */ + ASSERT((bitmap[0 / 8] & (1 << (0 % 8))) != 0, "bitmap: sector 0 set"); + ASSERT((bitmap[7 / 8] & (1 << (7 % 8))) != 0, "bitmap: sector 7 set"); + ASSERT((bitmap[8 / 8] & (1 << (8 % 8))) != 0, "bitmap: sector 8 set"); + ASSERT((bitmap[255 / 8] & (1 << (255 % 8))) != 0, "bitmap: sector 255 set"); + + /* Check unset sectors */ + ASSERT((bitmap[1 / 8] & (1 << (1 % 8))) == 0, "bitmap: sector 1 unset"); + ASSERT((bitmap[128 / 8] & (1 << (128 % 8))) == 0, "bitmap: sector 128 unset"); + ASSERT((bitmap[254 / 8] & (1 << (254 % 8))) == 0, "bitmap: sector 254 unset"); + + /* All-ones bitmap */ + memset(bitmap, 0xFF, 32); + for (int s = 0; s < 256; s++) + ASSERT((bitmap[s / 8] & (1 << (s % 8))) != 0, "bitmap: all set"); + + /* All-zeros bitmap */ + memset(bitmap, 0, 32); + for (int s = 0; s < 256; s++) + ASSERT((bitmap[s / 8] & (1 << (s % 8))) == 0, "bitmap: all clear"); +} + /* ---------- main ---------- */ int main(void) { @@ -375,6 +552,13 @@ int main(void) { printf("Cross-compatibility:\n"); test_cobs_matches_python(); + printf("Regression:\n"); + test_cobs_trailing_zero_preserved(); + test_crc32_high_byte_shift(); + test_cobs_roundtrip_all_crc_patterns(); + test_page_is_ff(); + test_sector_bitmap(); + printf("\n%d/%d tests passed\n", tests_passed, tests_run); return tests_passed == tests_run ? 0 : 1; } diff --git a/tests/test_flash_stream_regression.py b/tests/test_flash_stream_regression.py new file mode 100644 index 0000000..144adae --- /dev/null +++ b/tests/test_flash_stream_regression.py @@ -0,0 +1,403 @@ +"""Regression tests for CMD_FLASH_STREAM and bugs found during hardware testing. + +These tests cover bugs that caused data corruption on real cameras: +1. COBS trailing zero strip: ~1/256 packets lost trailing 0x00 from CRC32 +2. CRC32 high-byte UB: uint8_t << 24 is undefined when value >= 128 +3. _recv_packet_sync partial frame stashing: corrupted next call +4. recv_response READY timeout: READY packets consumed timeout budget +5. FMC memory window 1MB wrap: reads repeated every 1MB (masking write bugs) +6. Sector bitmap: host and agent must agree on bit ordering +""" + +import struct +import zlib + +import pytest + +from defib.agent import cobs as pycobs +from defib.agent.protocol import ( + ACK_OK, + CMD_FLASH_STREAM, + RSP_ACK, + RSP_DATA, + RSP_READY, + build_packet, + parse_packet, + _recv_packet_sync, + _port_buffers, +) + + +# --------------------------------------------------------------------------- +# Test infrastructure (same FakePort pattern as test_agent_protocol.py) +# --------------------------------------------------------------------------- + +class FakePort: + """Minimal pyserial-compatible port backed by a byte buffer.""" + + def __init__(self, rx_data: bytes = b""): + self._rx = bytearray(rx_data) + self._tx = bytearray() + self.timeout = None + + def feed(self, data: bytes) -> None: + self._rx.extend(data) + + @property + def in_waiting(self) -> int: + return len(self._rx) + + def read(self, size: int = 1) -> bytes: + if not self._rx: + return b"" + n = min(size, len(self._rx)) + data = bytes(self._rx[:n]) + del self._rx[:n] + return data + + def write(self, data: bytes) -> int: + self._tx.extend(data) + return len(data) + + @property + def tx_data(self) -> bytes: + return bytes(self._tx) + + +def make_device_packet(cmd: int, data: bytes = b"") -> bytes: + return build_packet(cmd, data) + + +# --------------------------------------------------------------------------- +# Bug 1: COBS trailing zero strip +# Root cause of ALL flash write failures. cobs_decode() stripped trailing +# 0x00 bytes. When CRC32 MSB was 0x00, decoded output was 1 byte short. +# --------------------------------------------------------------------------- + +class TestCobsTrailingZero: + def test_roundtrip_preserves_trailing_zero(self): + """Payload ending with 0x00 must survive COBS roundtrip.""" + original = b"\x01\x02\x03\x00" + encoded = pycobs.encode(original) + decoded = pycobs.decode(encoded) + assert decoded == original + + def test_crc32_with_zero_msb(self): + """Packets where CRC32 MSB is 0x00 must parse correctly. + + This is the exact pattern that caused ~1/256 write failures. + """ + for i in range(1000): + # Vary payload to hit different CRC patterns + payload_data = bytes([(i * 7 + j * 13) & 0xFF for j in range(8)]) + cmd = 0x82 # RSP_DATA + + # Build packet: [cmd][data][crc32 LE] + raw = bytes([cmd]) + payload_data + crc = zlib.crc32(raw) & 0xFFFFFFFF + raw_with_crc = raw + struct.pack("> 24) & 0xFF:02x}" + ) + + def test_build_parse_all_crc_patterns(self): + """build_packet → parse_packet for 256 different payloads. + + Covers all possible CRC32 MSB values (0x00-0xFF). + """ + for i in range(256): + data = bytes([(i * 37 + j) & 0xFF for j in range(12)]) + pkt = build_packet(0x82, data) + # Strip trailing 0x00 delimiter + cmd, parsed = parse_packet(pkt[:-1]) + assert cmd == 0x82 + assert parsed == data, f"Trial {i}: data mismatch" + + +# --------------------------------------------------------------------------- +# Bug 2: recv_packet_sync partial frame stashing +# _recv_packet_sync stashed bytes including partial COBS frames between +# calls, corrupting the next packet's decode. +# --------------------------------------------------------------------------- + +class TestPartialFrameStashing: + def setup_method(self): + """Clear global port buffers between tests.""" + _port_buffers.clear() + + def test_consecutive_packets_no_corruption(self): + """Two consecutive recv calls must not corrupt each other.""" + pkt1 = make_device_packet(RSP_ACK, bytes([ACK_OK])) + pkt2 = make_device_packet(RSP_DATA, struct.pack(" bytearray: + """Build bitmap matching client.py's write_flash() logic.""" + bitmap = bytearray(32) + for s in sectors_with_data: + bitmap[s // 8] |= 1 << (s % 8) + return bitmap + + @staticmethod + def _sector_has_data(bitmap: bytes, s: int) -> bool: + """Check bitmap matching agent's handle_flash_stream() logic.""" + return bool(bitmap[s // 8] & (1 << (s % 8))) + + def test_individual_sectors(self): + """Each sector index maps to the correct bit.""" + for s in range(256): + bitmap = self._build_bitmap([s]) + assert self._sector_has_data(bitmap, s), f"sector {s}" + # All other sectors should be unset + for other in [0, 1, 7, 8, 127, 128, 254, 255]: + if other != s: + assert not self._sector_has_data(bitmap, other), ( + f"sector {other} should be unset when only {s} is set" + ) + + def test_all_sectors_set(self): + bitmap = self._build_bitmap(list(range(256))) + for s in range(256): + assert self._sector_has_data(bitmap, s) + + def test_no_sectors_set(self): + bitmap = self._build_bitmap([]) + for s in range(256): + assert not self._sector_has_data(bitmap, s) + + def test_byte_boundaries(self): + """Sectors at byte boundaries: 0, 7, 8, 15, 16, 255.""" + bitmap = self._build_bitmap([0, 7, 8, 15, 16, 255]) + assert self._sector_has_data(bitmap, 0) + assert self._sector_has_data(bitmap, 7) + assert self._sector_has_data(bitmap, 8) + assert self._sector_has_data(bitmap, 15) + assert self._sector_has_data(bitmap, 16) + assert self._sector_has_data(bitmap, 255) + assert not self._sector_has_data(bitmap, 1) + assert not self._sector_has_data(bitmap, 254) + + def test_bitmap_from_firmware_data(self): + """Simulate write_flash() bitmap construction for 8MB firmware.""" + sector_sz = 0x10000 + # 8MB firmware + 8MB 0xFF padding + firmware = bytes(range(256)) * (8 * 1024 * 1024 // 256) + firmware += b'\xff' * (8 * 1024 * 1024) + assert len(firmware) == 16 * 1024 * 1024 + + num_sectors = len(firmware) // sector_sz + ff_sector = b'\xff' * sector_sz + bitmap = bytearray(32) + for s in range(num_sectors): + sector_data = firmware[s * sector_sz : (s + 1) * sector_sz] + if sector_data != ff_sector[:len(sector_data)]: + bitmap[s // 8] |= 1 << (s % 8) + + # First 128 sectors (8MB) should have data + for s in range(128): + assert self._sector_has_data(bitmap, s), f"sector {s} should have data" + # Last 128 sectors (8MB 0xFF) should be skipped + for s in range(128, 256): + assert not self._sector_has_data(bitmap, s), f"sector {s} should be skip" + + +# --------------------------------------------------------------------------- +# Bug 5: CMD_FLASH_STREAM payload format +# Agent expects 44 bytes: [addr:4][size:4][crc:4][bitmap:32] +# --------------------------------------------------------------------------- + +class TestFlashStreamPayload: + def test_payload_format(self): + """CMD_FLASH_STREAM packet has correct 44-byte payload.""" + addr = 0 + size = 16 * 1024 * 1024 + crc = 0xDEADBEEF + bitmap = bytes([0xFF] * 32) + + payload = struct.pack("> j) & 0xFF for j in range(8)]) + crc = zlib.crc32(raw) & 0xFFFFFFFF + if (crc >> 24) == 0x00: + # This is the dangerous pattern — CRC MSB is 0x00 + pkt = build_packet(0x82, raw[1:]) + cmd, data = parse_packet(pkt[:-1]) + assert cmd == 0x82 + assert data == raw[1:] + return + + pytest.skip("Could not find CRC with MSB=0x00 in 10000 trials") + + def test_page_is_ff_logic(self): + """Python equivalent of agent's page_is_ff check.""" + # All 0xFF + assert b'\xff' * 256 == b'\xff' * 256 + + # Single byte different at each position + for pos in range(256): + page = bytearray(b'\xff' * 256) + page[pos] = 0xFE + assert bytes(page) != b'\xff' * 256