forked from Python-roborock/python-roborock
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_b01_q10_protocol.py
More file actions
116 lines (96 loc) · 3.75 KB
/
test_b01_q10_protocol.py
File metadata and controls
116 lines (96 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Tests for the B01 protocol message encoding and decoding."""
import json
import pathlib
from collections.abc import Generator
from typing import Any
import pytest
from freezegun import freeze_time
from syrupy import SnapshotAssertion
from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP, YXWaterLevel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import (
decode_rpc_response,
encode_mqtt_payload,
)
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
TESTDATA_PATH = pathlib.Path("tests/protocols/testdata/b01_q10_protocol/")
TESTDATA_FILES = list(TESTDATA_PATH.glob("*.json"))
TESTDATA_IDS = [x.stem for x in TESTDATA_FILES]
@pytest.fixture(autouse=True)
def fixed_time_fixture() -> Generator[None, None, None]:
"""Fixture to freeze time for predictable request IDs."""
with freeze_time("2025-01-20T12:00:00"):
yield
@pytest.mark.parametrize("filename", TESTDATA_FILES, ids=TESTDATA_IDS)
def test_decode_rpc_payload(filename: str, snapshot: SnapshotAssertion) -> None:
"""Test decoding a B01 RPC response protocol message."""
with open(filename, "rb") as f:
payload = f.read()
message = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=payload,
seq=12750,
version=b"B01",
random=97431,
timestamp=1652547161,
)
decoded_message = decode_rpc_response(message)
assert json.dumps(decoded_message, indent=2) == snapshot
@pytest.mark.parametrize(
("payload", "expected_error_message"),
[
(b"", "missing payload"),
(b"n", "Invalid B01 json payload"),
(b"{}", "missing 'dps'"),
(b'{"dps": []}', "'dps' should be a dictionary"),
(b'{"dps": {"not_a_number": 123}}', "dps key is not a valid integer"),
(b'{"dps": {"101": 123}}', "Invalid dpCommon format: expected dict"),
(b'{"dps": {"101": {"not_a_number": 123}}}', "Invalid dpCommon format: dps key is not a valid intege"),
],
)
def test_decode_invalid_rpc_payload(payload: bytes, expected_error_message: str) -> None:
"""Test decoding a B01 RPC response protocol message."""
message = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=payload,
seq=12750,
version=b"B01",
random=97431,
timestamp=1652547161,
)
with pytest.raises(RoborockException, match=expected_error_message):
decode_rpc_response(message)
def test_decode_unknown_dps_code() -> None:
"""Test decoding a B01 RPC response protocol message."""
message = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=b'{"dps": {"909090": 123, "122":100}}',
seq=12750,
version=b"B01",
random=97431,
timestamp=1652547161,
)
decoded_message = decode_rpc_response(message)
assert decoded_message == {
B01_Q10_DP.BATTERY: 100,
}
@pytest.mark.parametrize(
("command", "params"),
[
(B01_Q10_DP.REQUEST_DPS, {}),
(B01_Q10_DP.REQUEST_DPS, None),
(B01_Q10_DP.START_CLEAN, {"cmd": 1}),
(B01_Q10_DP.WATER_LEVEL, YXWaterLevel.MIDDLE.code),
],
)
def test_encode_mqtt_payload(command: B01_Q10_DP, params: dict[str, Any], snapshot) -> None:
"""Test encoding of MQTT payload for B01 Q10 commands."""
message = encode_mqtt_payload(command, params)
assert isinstance(message, RoborockMessage)
assert message.protocol == RoborockMessageProtocol.RPC_REQUEST
assert message.version == b"B01"
assert message.payload is not None
# Snapshot the raw payload to ensure stable encoding. We verify it is
# valid json
assert snapshot == message.payload
json.loads(message.payload.decode())