-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathb01_protocol.py
More file actions
55 lines (44 loc) · 1.91 KB
/
b01_protocol.py
File metadata and controls
55 lines (44 loc) · 1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""Roborock B01 Protocol encoding and decoding."""
import json
import logging
from typing import Any
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from roborock import RoborockB01Methods
from roborock.exceptions import RoborockException
from roborock.roborock_message import (
RoborockMessage,
RoborockMessageProtocol,
)
_LOGGER = logging.getLogger(__name__)
B01_VERSION = b"B01"
CommandType = RoborockB01Methods | str
ParamsType = list | dict | int | None
def encode_mqtt_payload(dps: int, command: CommandType, params: ParamsType) -> RoborockMessage:
"""Encode payload for B01 commands over MQTT."""
dps_data = {"dps": {dps: {"method": command, "params": params or []}}}
payload = pad(json.dumps(dps_data).encode("utf-8"), AES.block_size)
return RoborockMessage(
protocol=RoborockMessageProtocol.RPC_REQUEST,
version=B01_VERSION,
payload=payload,
)
def decode_rpc_response(message: RoborockMessage) -> dict[int, Any]:
"""Decode a B01 RPC_RESPONSE message."""
if not message.payload:
raise RoborockException("Invalid B01 message format: missing payload")
try:
unpadded = unpad(message.payload, AES.block_size)
except ValueError as err:
raise RoborockException(f"Unable to unpad B01 payload: {err}")
try:
payload = json.loads(unpadded.decode())
except (json.JSONDecodeError, TypeError) as e:
raise RoborockException(f"Invalid B01 message payload: {e} for {message.payload!r}") from e
datapoints = payload.get("dps", {})
if not isinstance(datapoints, dict):
raise RoborockException(f"Invalid B01 message format: 'dps' should be a dictionary for {message.payload!r}")
try:
return {int(key): value for key, value in datapoints.items()}
except ValueError:
raise RoborockException(f"Invalid B01 message format: 'dps' key should be an integer for {message.payload!r}")