88from pathlib import Path
99from typing import Self
1010
11- from bluesky_stomp .messaging import MessageContext
12- from observability_utils .tracing import get_tracer , start_as_current_span
11+ from bluesky_stomp .messaging import MessageContext , StompClient
12+ from bluesky_stomp .models import Broker
13+ from observability_utils .tracing import (
14+ get_tracer ,
15+ start_as_current_span ,
16+ )
1317
1418from blueapi .config import (
1519 ApplicationConfig ,
@@ -197,7 +201,7 @@ class BlueapiClient:
197201 """Unified client for controlling blueapi"""
198202
199203 _rest : BlueapiRestClient
200- _event_bus_client : EventBusClient | None
204+ _events : EventBusClient | None
201205 _instrument_session : str | None = None
202206 _callbacks : dict [int , OnAnyEvent ]
203207 _callback_id : itertools .count
@@ -208,7 +212,7 @@ def __init__(
208212 events : EventBusClient | None = None ,
209213 ):
210214 self ._rest = rest
211- self ._event_bus_client = events
215+ self ._events = events
212216 self ._callbacks = {}
213217 self ._callback_id = itertools .count ()
214218
@@ -226,8 +230,20 @@ def from_config(cls, config: ApplicationConfig) -> Self:
226230 except Exception :
227231 ... # Swallow exceptions
228232 rest = BlueapiRestClient (config .api , session_manager = session_manager )
229- event_bus = EventBusClient .from_stomp_config (config .stomp )
230- return cls (rest , event_bus )
233+ if config .stomp .enabled :
234+ assert config .stomp .url .host is not None , "Stomp URL missing host"
235+ assert config .stomp .url .port is not None , "Stomp URL missing port"
236+ client = StompClient .for_broker (
237+ broker = Broker (
238+ host = config .stomp .url .host ,
239+ port = config .stomp .url .port ,
240+ auth = config .stomp .auth ,
241+ )
242+ )
243+ events = EventBusClient (client )
244+ return cls (rest , events )
245+ else :
246+ return cls (rest )
231247
232248 @cached_property
233249 @start_as_current_span (TRACER )
@@ -444,7 +460,7 @@ def run_task(
444460 of task execution.
445461 """
446462
447- if ( event_bus := self ._event_bus ()) is None :
463+ if self ._events is None :
448464 raise MissingStompConfigurationError (
449465 "Stomp configuration required to run plans is missing or disabled"
450466 )
@@ -488,8 +504,8 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
488504 else :
489505 complete .set_result (event .task_status )
490506
491- with event_bus :
492- event_bus .subscribe_to_all_events (inner_on_event )
507+ with self . _events :
508+ self . _events .subscribe_to_all_events (inner_on_event )
493509 self ._rest .update_worker_task (WorkerTask (task_id = task_id ))
494510 return complete .result (timeout = timeout )
495511
@@ -728,10 +744,3 @@ def login(self, token_path: Path | None = None):
728744 auth .start_device_flow ()
729745 else :
730746 print ("Server is not configured to use authentication!" )
731-
732- def _event_bus (self ) -> EventBusClient | None :
733- if not self ._event_bus_client :
734- if stomp_config := self ._rest .get_stomp_config ():
735- self ._event_bus_client = EventBusClient .from_stomp_config (stomp_config )
736-
737- return self ._event_bus_client
0 commit comments