99from opentelemetry .propagate import extract , inject
1010
1111from workflows .transport .common_transport import MessageCallback , TemporarySubscription
12-
13-
1412class OTELTracingMiddleware :
1513 def __init__ (self , tracer : trace .Tracer , service_name : str ):
1614 self .tracer = tracer
1715 self .service_name = service_name
1816
17+ def _set_span_attributes (self , span , ** attributes ):
18+ """Helper method to set common span attributes"""
19+ span .set_attribute ("service_name" , self .service_name )
20+ for key , value in attributes .items ():
21+ if value is not None :
22+ span .set_attribute (key , value )
23+
1924 def send (self , call_next : Callable , destination : str , message , ** kwargs ):
2025 # Get current span context (may be None if this is the root span)
2126 current_span = trace .get_current_span ()
@@ -27,11 +32,7 @@ def send(self, call_next: Callable, destination: str, message, **kwargs):
2732 "transport.send" ,
2833 context = parent_context ,
2934 ) as span :
30- span .set_attribute ("service_name" , self .service_name )
31-
32- span .set_attribute ("message" , json .dumps (message ))
33- span .set_attribute ("destination" , destination )
34- print ("parent_context is..." , parent_context )
35+ self ._set_span_attributes (span , destination = destination )
3536
3637 # Inject the current trace context into the message headers
3738 headers = kwargs .get ("headers" , {})
@@ -55,10 +56,7 @@ def wrapped_callback(header, message):
5556 "transport.subscribe" ,
5657 context = ctx ,
5758 ) as span :
58- span .set_attribute ("service_name" , self .service_name )
59-
60- span .set_attribute ("message" , json .dumps (message ))
61- span .set_attribute ("channel" , channel )
59+ self ._set_span_attributes (span , channel = channel )
6260
6361 # Call the original callback - this will process the message
6462 # and potentially call send() which will pick up this context
@@ -74,15 +72,12 @@ def wrapped_callback(header, message):
7472 # Extract trace context from message headers
7573 ctx = extract (header ) if header else Context ()
7674
77- # # Start a new span with the extracted context
75+ # Start a new span with the extracted context
7876 with self .tracer .start_as_current_span (
7977 "transport.subscribe_broadcast" ,
8078 context = ctx ,
8179 ) as span :
82- span .set_attribute ("service_name" , self .service_name )
83-
84- span .set_attribute ("message" , json .dumps (message ))
85- span .set_attribute ("channel" , channel )
80+ self ._set_span_attributes (span , channel = channel )
8681
8782 return callback (header , message )
8883
@@ -105,11 +100,7 @@ def wrapped_callback(header, message):
105100 "transport.subscribe_temporary" ,
106101 context = ctx ,
107102 ) as span :
108- span .set_attribute ("service_name" , self .service_name )
109-
110- span .set_attribute ("message" , json .dumps (message ))
111- if channel_hint :
112- span .set_attribute ("channel_hint" , channel_hint )
103+ self ._set_span_attributes (span , channel_hint = channel_hint )
113104
114105 return callback (header , message )
115106
@@ -132,8 +123,7 @@ def unsubscribe(
132123 "transport.unsubscribe" ,
133124 context = current_context ,
134125 ) as span :
135- span .set_attribute ("service_name" , self .service_name )
136- span .set_attribute ("subscription_id" , subscription )
126+ self ._set_span_attributes (span , subscription_id = subscription )
137127
138128 call_next (
139129 subscription , drop_callback_reference = drop_callback_reference , ** kwargs
@@ -156,10 +146,7 @@ def ack(
156146 "transport.ack" ,
157147 context = current_context ,
158148 ) as span :
159- span .set_attribute ("service_name" , self .service_name )
160- span .set_attribute ("message" , json .dumps (message ))
161- if subscription_id :
162- span .set_attribute ("subscription_id" , subscription_id )
149+ self ._set_span_attributes (span , subscription_id = subscription_id )
163150
164151 call_next (message , subscription_id = subscription_id , ** kwargs )
165152
@@ -180,11 +167,7 @@ def nack(
180167 "transport.nack" ,
181168 context = current_context ,
182169 ) as span :
183- span .set_attribute ("service_name" , self .service_name )
184-
185- span .set_attribute ("message" , json .dumps (message ))
186- if subscription_id :
187- span .set_attribute ("subscription_id" , subscription_id )
170+ self ._set_span_attributes (span , subscription_id = subscription_id )
188171
189172 call_next (message , subscription_id = subscription_id , ** kwargs )
190173
@@ -202,10 +185,7 @@ def transaction_begin(
202185 "transaction.begin" ,
203186 context = current_context ,
204187 ) as span :
205- span .set_attribute ("service_name" , self .service_name )
206-
207- if subscription_id :
208- span .set_attribute ("subscription_id" , subscription_id )
188+ self ._set_span_attributes (span , subscription_id = subscription_id )
209189
210190 return call_next (subscription_id = subscription_id , ** kwargs )
211191
@@ -223,10 +203,7 @@ def transaction_abort(
223203 "transaction.abort" ,
224204 context = current_context ,
225205 ) as span :
226- span .set_attribute ("service_name" , self .service_name )
227-
228- if transaction_id :
229- span .set_attribute ("transaction_id" , transaction_id )
206+ self ._set_span_attributes (span , transaction_id = transaction_id )
230207
231208 call_next (transaction_id = transaction_id , ** kwargs )
232209
@@ -244,8 +221,6 @@ def transaction_commit(
244221 "transaction.commit" ,
245222 context = current_context ,
246223 ) as span :
247- span .set_attribute ("service_name" , self .service_name )
248- if transaction_id :
249- span .set_attribute ("transaction_id" , transaction_id )
224+ self ._set_span_attributes (span , transaction_id = transaction_id )
250225
251226 call_next (transaction_id = transaction_id , ** kwargs )
0 commit comments