22
33from __future__ import annotations
44
5+ import asyncio
56import logging
7+ from collections .abc import Iterable
8+ from typing import Any
69
710from roborock .data .b01_q10 .b01_q10_code_mappings import B01_Q10_DP
811from roborock .devices .transport .mqtt_channel import MqttChannel
912from roborock .exceptions import RoborockException
1013from roborock .protocols .b01_q10_protocol import (
1114 ParamsType ,
15+ decode_rpc_response ,
1216 encode_mqtt_payload ,
1317)
18+ from roborock .roborock_message import RoborockMessage
1419
1520_LOGGER = logging .getLogger (__name__ )
21+ _TIMEOUT = 10.0
1622
1723
1824async def send_command (
@@ -34,3 +40,61 @@ async def send_command(
3440 ex ,
3541 )
3642 raise
43+
44+
45+ async def send_decoded_command (
46+ mqtt_channel : MqttChannel ,
47+ command : B01_Q10_DP ,
48+ params : ParamsType ,
49+ expected_dps : Iterable [B01_Q10_DP ] | None = None ,
50+ ) -> dict [B01_Q10_DP , Any ]:
51+ """Send a command and await the first decoded response.
52+
53+ Q10 responses are not correlated with a message id, so we filter on
54+ expected datapoints when provided.
55+ """
56+ roborock_message = encode_mqtt_payload (command , params )
57+ future : asyncio .Future [dict [B01_Q10_DP , Any ]] = asyncio .get_running_loop ().create_future ()
58+
59+ expected_set = set (expected_dps ) if expected_dps is not None else None
60+
61+ def find_response (response_message : RoborockMessage ) -> None :
62+ try :
63+ decoded_dps = decode_rpc_response (response_message )
64+ except RoborockException as ex :
65+ _LOGGER .debug (
66+ "Failed to decode B01 Q10 RPC response (expecting %s): %s: %s" ,
67+ command ,
68+ response_message ,
69+ ex ,
70+ )
71+ return
72+ if expected_set and not any (dps in decoded_dps for dps in expected_set ):
73+ return
74+ if not future .done ():
75+ future .set_result (decoded_dps )
76+
77+ unsub = await mqtt_channel .subscribe (find_response )
78+
79+ _LOGGER .debug ("Sending MQTT message: %s" , roborock_message )
80+ try :
81+ await mqtt_channel .publish (roborock_message )
82+ return await asyncio .wait_for (future , timeout = _TIMEOUT )
83+ except TimeoutError as ex :
84+ raise RoborockException (f"B01 Q10 command timed out after { _TIMEOUT } s ({ command } )" ) from ex
85+ except RoborockException as ex :
86+ _LOGGER .warning (
87+ "Error sending B01 Q10 decoded command (%s): %s" ,
88+ command ,
89+ ex ,
90+ )
91+ raise
92+ except Exception as ex :
93+ _LOGGER .exception (
94+ "Error sending B01 Q10 decoded command (%s): %s" ,
95+ command ,
96+ ex ,
97+ )
98+ raise
99+ finally :
100+ unsub ()
0 commit comments