Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions src/meshcore/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ async def handle_rx(self, data: bytearray):

elif packet_type_value == PacketType.DEFAULT_FLOOD_SCOPE.value:
res = {}
res["scope_name"] = dbuf.read(31).decode("utf-8", "ignore").replace("\0", "")
res["scope_key"] = dbuf.read(16).hex()
# Firmware emits a 48-byte frame when a scope is configured,
# or a 1-byte sentinel frame (just the response code) when no
# scope is set. Gate the 31+16 byte read so the sentinel
# dispatches an empty payload instead of empty-string fields.
if len(data) >= 48:
res["scope_name"] = dbuf.read(31).decode("utf-8", "ignore").replace("\0", "")
res["scope_key"] = dbuf.read(16).hex()
await self.dispatcher.dispatch(Event(EventType.DEFAULT_FLOOD_SCOPE, res))

elif packet_type_value == PacketType.MSG_SENT.value:
Expand Down Expand Up @@ -491,7 +496,12 @@ async def handle_rx(self, data: bytearray):

res = {}
res["config"] = dbuf.read(1)[0]
await self.dispatcher.dispatch(Event(EventType.AUTOADD_CONFIG, res, res))
# `max_hops` trailing byte added in companion-v1.14.0
# (firmware commit 00566741). Pre-v1.14.0 firmware emits a
# 1-byte response; read defensively to remain compatible.
if len(data) >= 3:
res["max_hops"] = dbuf.read(1)[0]
await self.dispatcher.dispatch(Event(EventType.AUTOADD_CONFIG, res, res))

elif packet_type_value == PacketType.CHANNEL_INFO.value:
logger.debug(f"received channel info response: {data.hex()}")
Expand Down Expand Up @@ -533,6 +543,12 @@ async def handle_rx(self, data: bytearray):

if len(data) >= 5:
ack_data["code"] = dbuf.read(4).hex()
# `trip_time` (round-trip latency in ms) has been on the wire
# since companion-v1.0.0a (firmware commit d9dc76f1, Jan 2025).
# Read defensively so legacy frames without this field still
# dispatch correctly.
if len(data) >= 9:
ack_data["trip_time"] = int.from_bytes(dbuf.read(4), "little")

attributes = {"code": ack_data.get("code", "")}

Expand All @@ -546,7 +562,13 @@ async def handle_rx(self, data: bytearray):
res = {}
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
res["payload"] = dbuf.read(4).hex()
# Firmware emits code(1) + snr(1) + rssi(1) + reserved(1, 0xFF)
# + payload(N). The reserved byte is intended as a future
# path_len field; consume it explicitly so `payload` aligns
# with the firmware's `packet->payload` bytes and is not
# truncated to a fixed length.
dbuf.read(1) # discard reserved byte
res["payload"] = dbuf.read().hex()
logger.debug("Received raw data")
logger.debug(res)
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
Expand All @@ -560,8 +582,19 @@ async def handle_rx(self, data: bytearray):
res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set
if len(data) > 7:
res["pubkey_prefix"] = dbuf.read(6).hex()

attributes = {"pubkey_prefix": res.get("pubkey_prefix")}
# The following trailing fields are emitted only by the new-
# style RESP_SERVER_LOGIN_OK path. server_timestamp landed in
# firmware commit 0e90b731 (companion-v1.10.0); acl_permissions
# in 7947e8a2; fw_ver_level in 418ae08b (also companion-v1.10.0).
# Per-field length gates handle every firmware-version tier.
if len(data) >= 12:
res["server_timestamp"] = int.from_bytes(dbuf.read(4), "little")
if len(data) >= 13:
res["acl_permissions"] = dbuf.read(1)[0]
if len(data) >= 14:
res["fw_ver_level"] = dbuf.read(1)[0]

await self.dispatcher.dispatch(
Event(EventType.LOGIN_SUCCESS, res, attributes)
Expand Down
236 changes: 236 additions & 0 deletions tests/unit/test_protocol_surface_gaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,239 @@ async def mock_send(data, expected_events, timeout=None):
assert captured_data is not None
assert captured_data[0] == CommandType.GET_STATS.value # 0x38 = 56
assert captured_data[1] == 0x00


# ---------------------------------------------------------------------------
# Wire-format parity bundle: AUTOADD_CONFIG (max_hops trailing byte)
# ---------------------------------------------------------------------------
# AUTOADD_CONFIG firmware emits 1 byte (legacy) or 2 bytes (companion-v1.14.0+,
# commit 00566741, adds `max_hops`). The SDK had been silently dropping the
# trailing byte. These pairs verify the defensive trailing-field read.

@pytest.mark.asyncio
async def test_autoadd_config_legacy_one_byte_frame():
"""Legacy 1-byte AUTOADD_CONFIG frame: only `config` key present."""
reader, dispatched = _make_reader()
frame = bytes([PacketType.AUTOADD_CONFIG.value, 0x01]) # code + config=1

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.AUTOADD_CONFIG
assert evt.payload == {"config": 1}
assert "max_hops" not in evt.payload


@pytest.mark.asyncio
async def test_autoadd_config_modern_two_byte_frame():
"""Modern 2-byte AUTOADD_CONFIG frame (companion-v1.14.0+): `config` + `max_hops`."""
reader, dispatched = _make_reader()
frame = bytes([PacketType.AUTOADD_CONFIG.value, 0x01, 0x10]) # code + config=1 + max_hops=16

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.AUTOADD_CONFIG
assert evt.payload == {"config": 1, "max_hops": 16}


# ---------------------------------------------------------------------------
# Wire-format parity bundle: LOGIN_SUCCESS (3 trailing fields)
# ---------------------------------------------------------------------------
# Firmware emits an 8-byte legacy frame ("OK" path) or a 14-byte modern frame
# (RESP_SERVER_LOGIN_OK path, companion-v1.10.0+) with trailing server_timestamp
# (4B), acl_permissions (1B), fw_ver_level (1B).

@pytest.mark.asyncio
async def test_login_success_legacy_eight_byte_frame():
"""Legacy 8-byte LOGIN_SUCCESS: only permissions/is_admin/pubkey_prefix."""
reader, dispatched = _make_reader()
pubkey_prefix = bytes([0x01, 0x02, 0x03, 0x04, 0x05, 0x06])
frame = bytes([PacketType.LOGIN_SUCCESS.value, 0x00]) + pubkey_prefix
assert len(frame) == 8

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.LOGIN_SUCCESS
assert evt.payload.get("permissions") == 0
assert evt.payload.get("is_admin") is False
assert evt.payload.get("pubkey_prefix") == pubkey_prefix.hex()
assert "server_timestamp" not in evt.payload
assert "acl_permissions" not in evt.payload
assert "fw_ver_level" not in evt.payload


@pytest.mark.asyncio
async def test_login_success_modern_14_byte_frame():
"""Modern 14-byte LOGIN_SUCCESS (companion-v1.10.0+): all 6 keys."""
reader, dispatched = _make_reader()
pubkey_prefix = bytes([0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF])
server_timestamp = 0x12345678
acl_permissions = 0x07
fw_ver_level = 0x42
frame = (
bytes([PacketType.LOGIN_SUCCESS.value, 0x01]) # code + permissions (is_admin=True)
+ pubkey_prefix
+ server_timestamp.to_bytes(4, "little")
+ bytes([acl_permissions, fw_ver_level])
)
assert len(frame) == 14

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.LOGIN_SUCCESS
assert evt.payload["permissions"] == 1
assert evt.payload["is_admin"] is True
assert evt.payload["pubkey_prefix"] == pubkey_prefix.hex()
assert evt.payload["server_timestamp"] == server_timestamp
assert evt.payload["acl_permissions"] == acl_permissions
assert evt.payload["fw_ver_level"] == fw_ver_level


@pytest.mark.asyncio
async def test_login_success_intermediate_12_byte_frame():
"""Hypothetical 12-byte LOGIN_SUCCESS (server_timestamp only): per-field gates work."""
reader, dispatched = _make_reader()
pubkey_prefix = bytes([0x11, 0x22, 0x33, 0x44, 0x55, 0x66])
server_timestamp = 0xDEADBEEF
frame = (
bytes([PacketType.LOGIN_SUCCESS.value, 0x00])
+ pubkey_prefix
+ server_timestamp.to_bytes(4, "little")
)
assert len(frame) == 12

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.payload["server_timestamp"] == server_timestamp
assert "acl_permissions" not in evt.payload
assert "fw_ver_level" not in evt.payload


# ---------------------------------------------------------------------------
# Wire-format parity bundle: ACK (trailing trip_time)
# ---------------------------------------------------------------------------
# Firmware emits 4-byte ack hash + 4-byte trip_time (ms) since companion-v1.0.0a
# (commit d9dc76f1, Jan 2025). The SDK had been silently dropping trip_time.

@pytest.mark.asyncio
async def test_ack_legacy_5_byte_frame():
"""Legacy 5-byte ACK frame (code + 4B hash): no trip_time."""
reader, dispatched = _make_reader()
ack_hash = bytes([0xDE, 0xAD, 0xBE, 0xEF])
frame = bytes([PacketType.ACK.value]) + ack_hash

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.ACK
assert evt.payload.get("code") == ack_hash.hex()
assert "trip_time" not in evt.payload


@pytest.mark.asyncio
async def test_ack_modern_9_byte_frame():
"""Modern 9-byte ACK frame (companion-v1.0.0a+): code + hash + trip_time."""
reader, dispatched = _make_reader()
ack_hash = bytes([0x01, 0x02, 0x03, 0x04])
trip_time_ms = 1234
frame = (
bytes([PacketType.ACK.value])
+ ack_hash
+ trip_time_ms.to_bytes(4, "little")
)

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.ACK
assert evt.payload["code"] == ack_hash.hex()
assert evt.payload["trip_time"] == trip_time_ms


# ---------------------------------------------------------------------------
# Wire-format parity bundle: RAW_DATA (reserved-byte cursor + variable payload)
# ---------------------------------------------------------------------------
# Firmware emits code(1) + snr(int8, ×4) + rssi(int8) + reserved(0xFF) + payload(N).
# Pre-fix SDK reads code+snr+rssi+payload(4B) -- swallowing the reserved byte
# as the first payload byte AND truncating to 4 bytes. Post-fix discards the
# reserved byte and reads the remainder.

@pytest.mark.asyncio
async def test_raw_data_realistic_frame():
"""RAW_DATA frame: dispatched payload matches firmware-emitted bytes exactly."""
reader, dispatched = _make_reader()
snr_quarters = -40 # -10.0 dB after / 4
rssi = -75
payload_bytes = bytes(range(0x10, 0x1A)) # 10 bytes of distinct data
frame = (
bytes([PacketType.RAW_DATA.value])
+ snr_quarters.to_bytes(1, "little", signed=True)
+ rssi.to_bytes(1, "little", signed=True)
+ bytes([0xFF]) # firmware reserved byte (intended as future path_len)
+ payload_bytes
)

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.RAW_DATA
# SNR/RSSI: keep the historical capitalised keys the SDK has always used.
assert evt.payload["SNR"] == -10.0
assert evt.payload["RSSI"] == rssi
# Payload: full hex string of just the payload bytes (NO leading 0xff,
# NO truncation to 4 bytes).
assert evt.payload["payload"] == payload_bytes.hex()


# ---------------------------------------------------------------------------
# Wire-format parity bundle: DEFAULT_FLOOD_SCOPE (length-guarded read)
# ---------------------------------------------------------------------------
# Firmware emits a 48-byte frame when scope is set OR a 1-byte sentinel frame
# when no scope is configured. Pre-fix SDK over-reads 47 bytes on the sentinel
# and dispatches {scope_name: "", scope_key: ""}; post-fix dispatches {}.

@pytest.mark.asyncio
async def test_default_flood_scope_full_48_byte_frame():
"""48-byte DEFAULT_FLOOD_SCOPE: both scope_name and scope_key populated."""
reader, dispatched = _make_reader()
scope_name = b"my-scope" + b"\x00" * 23 # 31 bytes total, null-padded
scope_key = bytes(range(16))
frame = bytes([PacketType.DEFAULT_FLOOD_SCOPE.value]) + scope_name + scope_key
assert len(frame) == 48

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.DEFAULT_FLOOD_SCOPE
assert evt.payload["scope_name"] == "my-scope"
assert evt.payload["scope_key"] == scope_key.hex()


@pytest.mark.asyncio
async def test_default_flood_scope_sentinel_frame_empty_payload():
"""1-byte sentinel DEFAULT_FLOOD_SCOPE frame: dispatched payload is empty dict."""
reader, dispatched = _make_reader()
frame = bytes([PacketType.DEFAULT_FLOOD_SCOPE.value])

await reader.handle_rx(bytearray(frame))

assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.DEFAULT_FLOOD_SCOPE
# Post-fix: handler gates on len(data) >= 48, so the 1-byte sentinel
# dispatches an empty payload (consumers detect "no scope" via key
# presence, not via empty-string sentinel values).
assert evt.payload == {}