1818from aiomqtt import MqttCodeError , MqttError , TLSParameters
1919
2020from roborock .callbacks import CallbackMap
21+ from roborock .diagnostics import Diagnostics
2122
2223from .health_manager import HealthManager
2324from .session import MqttParams , MqttSession , MqttSessionException , MqttSessionUnauthorized
@@ -76,6 +77,7 @@ def __init__(
7677 self ._connection_task : asyncio .Task [None ] | None = None
7778 self ._topic_idle_timeout = topic_idle_timeout
7879 self ._idle_timers : dict [str , asyncio .Task [None ]] = {}
80+ self ._diagnostics = params .diagnostics
7981 self ._health_manager = HealthManager (self .restart )
8082
8183 @property
@@ -96,24 +98,30 @@ async def start(self) -> None:
9698 handle the failure and retry if desired itself. Once connected,
9799 the session will retry connecting in the background.
98100 """
101+ self ._diagnostics .increment ("start_attempt" )
99102 start_future : asyncio .Future [None ] = asyncio .Future ()
100103 loop = asyncio .get_event_loop ()
101104 self ._reconnect_task = loop .create_task (self ._run_reconnect_loop (start_future ))
102105 try :
103106 await start_future
104107 except MqttCodeError as err :
108+ self ._diagnostics .increment (f"start_failure:{ err .rc } " )
105109 if err .rc == MqttReasonCode .RC_ERROR_UNAUTHORIZED :
106110 raise MqttSessionUnauthorized (f"Authorization error starting MQTT session: { err } " ) from err
107111 raise MqttSessionException (f"Error starting MQTT session: { err } " ) from err
108112 except MqttError as err :
113+ self ._diagnostics .increment ("start_failure:unknown" )
109114 raise MqttSessionException (f"Error starting MQTT session: { err } " ) from err
110115 except Exception as err :
116+ self ._diagnostics .increment ("start_failure:uncaught" )
111117 raise MqttSessionException (f"Unexpected error starting session: { err } " ) from err
112118 else :
119+ self ._diagnostics .increment ("start_success" )
113120 _LOGGER .debug ("MQTT session started successfully" )
114121
115122 async def close (self ) -> None :
116123 """Cancels the MQTT loop and shutdown the client library."""
124+ self ._diagnostics .increment ("close" )
117125 self ._stop = True
118126 tasks = [task for task in [self ._connection_task , self ._reconnect_task , * self ._idle_timers .values ()] if task ]
119127 self ._connection_task = None
@@ -136,6 +144,7 @@ async def restart(self) -> None:
136144 the reconnect loop. This is a no-op if there is no active connection.
137145 """
138146 _LOGGER .info ("Forcing MQTT session restart" )
147+ self ._diagnostics .increment ("restart" )
139148 if self ._connection_task :
140149 self ._connection_task .cancel ()
141150 else :
@@ -144,6 +153,7 @@ async def restart(self) -> None:
144153 async def _run_reconnect_loop (self , start_future : asyncio .Future [None ] | None ) -> None :
145154 """Run the MQTT loop."""
146155 _LOGGER .info ("Starting MQTT session" )
156+ self ._diagnostics .increment ("start_loop" )
147157 while True :
148158 try :
149159 self ._connection_task = asyncio .create_task (self ._run_connection (start_future ))
@@ -164,6 +174,7 @@ async def _run_reconnect_loop(self, start_future: asyncio.Future[None] | None) -
164174 _LOGGER .debug ("MQTT session closed, stopping retry loop" )
165175 return
166176 _LOGGER .info ("MQTT session disconnected, retrying in %s seconds" , self ._backoff .total_seconds ())
177+ self ._diagnostics .increment ("reconnect_wait" )
167178 await asyncio .sleep (self ._backoff .total_seconds ())
168179 self ._backoff = min (self ._backoff * BACKOFF_MULTIPLIER , MAX_BACKOFF_INTERVAL )
169180
@@ -175,17 +186,19 @@ async def _run_connection(self, start_future: asyncio.Future[None] | None) -> No
175186 is lost, this method will exit.
176187 """
177188 try :
178- async with self ._mqtt_client (self ._params ) as client :
179- self ._backoff = MIN_BACKOFF_INTERVAL
180- self ._healthy = True
181- _LOGGER .info ("MQTT Session connected." )
182- if start_future and not start_future .done ():
183- start_future .set_result (None )
184-
185- _LOGGER .debug ("Processing MQTT messages" )
186- async for message in client .messages :
187- _LOGGER .debug ("Received message: %s" , message )
188- self ._listeners (message .topic .value , message .payload )
189+ with self ._diagnostics .timer ("connection" ):
190+ async with self ._mqtt_client (self ._params ) as client :
191+ self ._backoff = MIN_BACKOFF_INTERVAL
192+ self ._healthy = True
193+ _LOGGER .info ("MQTT Session connected." )
194+ if start_future and not start_future .done ():
195+ start_future .set_result (None )
196+
197+ _LOGGER .debug ("Processing MQTT messages" )
198+ async for message in client .messages :
199+ _LOGGER .debug ("Received message: %s" , message )
200+ with self ._diagnostics .timer ("dispatch_message" ):
201+ self ._listeners (message .topic .value , message .payload )
189202 except MqttError as err :
190203 if start_future and not start_future .done ():
191204 _LOGGER .info ("MQTT error starting session: %s" , err )
@@ -227,6 +240,7 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
227240 async with self ._client_lock :
228241 self ._client = client
229242 for topic in self ._client_subscribed_topics :
243+ self ._diagnostics .increment ("resubscribe" )
230244 _LOGGER .debug ("Re-establishing subscription to topic %s" , topic )
231245 # TODO: If this fails it will break the whole connection. Make
232246 # this retry again in the background with backoff.
@@ -251,6 +265,7 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
251265
252266 # If there is an idle timer for this topic, cancel it (reuse subscription)
253267 if idle_timer := self ._idle_timers .pop (topic , None ):
268+ self ._diagnostics .increment ("unsubscribe_idle_cancel" )
254269 idle_timer .cancel ()
255270 _LOGGER .debug ("Cancelled idle timer for topic %s (reused subscription)" , topic )
256271
@@ -262,13 +277,15 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
262277 if self ._client :
263278 _LOGGER .debug ("Establishing subscription to topic %s" , topic )
264279 try :
265- await self ._client .subscribe (topic )
280+ with self ._diagnostics .timer ("subscribe" ):
281+ await self ._client .subscribe (topic )
266282 except MqttError as err :
267283 # Clean up the callback if subscription fails
268284 unsub ()
269285 self ._client_subscribed_topics .discard (topic )
270286 raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
271287 else :
288+ self ._diagnostics .increment ("subscribe_pending" )
272289 _LOGGER .debug ("Client not connected, will establish subscription later" )
273290
274291 def schedule_unsubscribe () -> None :
@@ -301,10 +318,11 @@ async def idle_unsubscribe():
301318 self ._idle_timers [topic ] = task
302319
303320 def delayed_unsub ():
321+ self ._diagnostics .increment ("unsubscribe" )
304322 unsub () # Remove the callback from CallbackMap
305323 # If no more callbacks for this topic, start idle timer
306324 if not self ._listeners .get_callbacks (topic ):
307- _LOGGER . debug ( "Unsubscribing topic %s, starting idle timer" , topic )
325+ self . _diagnostics . increment ( "unsubscribe_idle_start" )
308326 schedule_unsubscribe ()
309327 else :
310328 _LOGGER .debug ("Unsubscribing topic %s, still have active callbacks" , topic )
@@ -320,7 +338,8 @@ async def publish(self, topic: str, message: bytes) -> None:
320338 raise MqttSessionException ("Could not publish message, MQTT client not connected" )
321339 client = self ._client
322340 try :
323- await client .publish (topic , message )
341+ with self ._diagnostics .timer ("publish" ):
342+ await client .publish (topic , message )
324343 except MqttError as err :
325344 raise MqttSessionException (f"Error publishing message: { err } " ) from err
326345
@@ -333,11 +352,12 @@ class LazyMqttSession(MqttSession):
333352 is made.
334353 """
335354
336- def __init__ (self , session : RoborockMqttSession ) -> None :
355+ def __init__ (self , session : RoborockMqttSession , diagnostics : Diagnostics ) -> None :
337356 """Initialize the lazy session with an existing session."""
338357 self ._lock = asyncio .Lock ()
339358 self ._started = False
340359 self ._session = session
360+ self ._diagnostics = diagnostics
341361
342362 @property
343363 def connected (self ) -> bool :
@@ -353,6 +373,7 @@ async def _maybe_start(self) -> None:
353373 """Start the MQTT session if not already started."""
354374 async with self ._lock :
355375 if not self ._started :
376+ self ._diagnostics .increment ("start" )
356377 await self ._session .start ()
357378 self ._started = True
358379
@@ -403,4 +424,4 @@ async def create_lazy_mqtt_session(params: MqttParams) -> MqttSession:
403424 This function is a factory for creating an MQTT session that will
404425 only connect when the first attempt to subscribe or publish is made.
405426 """
406- return LazyMqttSession (RoborockMqttSession (params ))
427+ return LazyMqttSession (RoborockMqttSession (params ), diagnostics = params . diagnostics . subkey ( "lazy_mqtt" ) )
0 commit comments