Skip to content

Commit a243170

Browse files
authored
Added OTEL tracing (#196)
* Add basic tracing middleware and global control * Instrument on subscribe and add dcid to span attributes * Add spanid and traceid metadata to graylog * configured via zocalo plugin
1 parent 75cf450 commit a243170

File tree

6 files changed

+320
-4
lines changed

6 files changed

+320
-4
lines changed

pyproject.toml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,15 @@ classifiers = [
2020
]
2121
license = { text = "BSD-3-Clause" }
2222
requires-python = ">=3.10"
23-
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
23+
dependencies = [
24+
"bidict",
25+
"pika",
26+
"setuptools",
27+
"stomp-py>=7",
28+
"opentelemetry-api",
29+
"opentelemetry-sdk",
30+
"opentelemetry-exporter-otlp-proto-http"
31+
]
2432

2533
[project.urls]
2634
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
@@ -53,6 +61,7 @@ OfflineTransport = "workflows.transport.offline_transport:OfflineTransport"
5361
pika = "workflows.util.zocalo.configuration:Pika"
5462
stomp = "workflows.util.zocalo.configuration:Stomp"
5563
transport = "workflows.util.zocalo.configuration:DefaultTransport"
64+
opentelemetry = "workflows.util.zocalo.configuration:OTEL"
5665

5766
[project.scripts]
5867
"workflows.validate_recipe" = "workflows.recipe.validate:main"

requirements_dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ pytest-cov==6.0.0
66
pytest-mock==3.14.0
77
pytest-timeout==2.3.1
88
stomp-py==8.1.2
9-
websocket-client==1.8.0
9+
websocket-client==1.8.0

src/workflows/recipe/__init__.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import functools
44
import logging
55
from collections.abc import Callable
6+
from contextlib import ExitStack
67
from typing import Any
78

9+
from opentelemetry import trace
10+
811
from workflows.recipe.recipe import Recipe
912
from workflows.recipe.validate import validate_recipe
1013
from workflows.recipe.wrapper import RecipeWrapper
@@ -69,10 +72,30 @@ def unwrap_recipe(header, message):
6972
message = mangle_for_receiving(message)
7073
if header.get("workflows-recipe") in {True, "True", "true", 1}:
7174
rw = RecipeWrapper(message=message, transport=transport_layer)
72-
if log_extender and rw.environment and rw.environment.get("ID"):
73-
with log_extender("recipe_ID", rw.environment["ID"]):
75+
76+
if log_extender and rw.environment.get("ID"):
77+
# Extract recipe ID from environment and add to current span
78+
span = trace.get_current_span()
79+
recipe_id = rw.environment["ID"]
80+
span.set_attribute("recipe_id", recipe_id)
81+
82+
# Extract span_id and trace_id for logging
83+
span_context = span.get_span_context()
84+
85+
with ExitStack() as stack:
86+
# Configure the context depending on if service is emitting valid spans
87+
stack.enter_context(log_extender("recipe_ID", recipe_id))
88+
if span_context.is_valid:
89+
stack.enter_context(
90+
log_extender("span_id", span_context.span_id)
91+
)
92+
stack.enter_context(
93+
log_extender("trace_id", span_context.trace_id)
94+
)
7495
return callback(rw, header, message.get("payload"))
96+
7597
return callback(rw, header, message.get("payload"))
98+
7699
if allow_non_recipe_messages:
77100
return callback(None, header, message)
78101
# self.log.warning('Discarding non-recipe message:\n' + \

src/workflows/services/common_service.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,15 @@
99
import time
1010
from typing import Any
1111

12+
from opentelemetry import trace
13+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
15+
from opentelemetry.sdk.trace import TracerProvider
16+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
17+
1218
import workflows
1319
import workflows.logging
20+
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
1421

1522

1623
class Status(enum.Enum):
@@ -185,6 +192,36 @@ def start_transport(self):
185192
self.transport.subscription_callback_set_intercept(
186193
self._transport_interceptor
187194
)
195+
196+
# Configure OTELTracing if configuration is available
197+
otel_config = getattr(self.config, "_opentelemetry", None)
198+
if otel_config:
199+
# Configure OTELTracing
200+
resource = Resource.create(
201+
{
202+
SERVICE_NAME: self._service_name,
203+
}
204+
)
205+
206+
self.log.debug("Configuring OTELTracing")
207+
provider = TracerProvider(resource=resource)
208+
trace.set_tracer_provider(provider)
209+
210+
# Configure BatchProcessor and OTLPSpanExporter using config values
211+
otlp_exporter = OTLPSpanExporter(
212+
endpoint=otel_config["endpoint"],
213+
timeout=otel_config.get("timeout", 10),
214+
)
215+
span_processor = BatchSpanProcessor(otlp_exporter)
216+
provider.add_span_processor(span_processor)
217+
218+
# Add OTELTracingMiddleware to the transport layer
219+
tracer = trace.get_tracer(__name__)
220+
otel_middleware = OTELTracingMiddleware(
221+
tracer, service_name=self._service_name
222+
)
223+
self._transport.add_middleware(otel_middleware)
224+
188225
metrics = self._environment.get("metrics")
189226
if metrics:
190227
import prometheus_client
src/workflows/transport/middleware/otel_tracing.py

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
from __future__ import annotations
2+
3+
import functools
4+
from collections.abc import Callable
5+
6+
from opentelemetry import trace
7+
from opentelemetry.context import Context
8+
from opentelemetry.propagate import extract, inject
9+
10+
from workflows.transport.common_transport import MessageCallback, TemporarySubscription
11+
12+
13+
class OTELTracingMiddleware:
    """Transport middleware that wraps transport operations in OpenTelemetry spans.

    Every hook receives ``call_next`` (the next callable in the middleware
    chain) followed by the arguments of the transport operation. The hook
    opens a span, tags it with the service name plus operation-specific
    attributes, and then delegates to ``call_next``.

    Outgoing messages get the active trace context injected into their
    headers; subscription callbacks extract that context again so that the
    consumer's span is parented to the producer's span.
    """

    def __init__(self, tracer: trace.Tracer, service_name: str):
        """Store the tracer used to open spans and the name tagged on each span."""
        self.tracer = tracer
        self.service_name = service_name

    def _set_span_attributes(self, span, **attributes):
        """Tag ``span`` with the service name and every non-``None`` attribute."""
        span.set_attribute("service_name", self.service_name)
        for key, value in attributes.items():
            # Attributes whose value is None are simply left off the span.
            if value is not None:
                span.set_attribute(key, value)

    @staticmethod
    def _current_context() -> Context:
        """Return a context carrying whatever span is currently active.

        ``trace.get_current_span()`` does not return ``None``; when no span is
        active it yields a non-recording placeholder span, so no fallback
        branch is needed here.
        """
        return trace.set_span_in_context(trace.get_current_span())

    def _traced_callback(self, span_name: str, callback: Callable, **attributes):
        """Wrap ``callback`` so each delivered message is handled inside a span.

        The trace context is extracted from the message header. A message
        without a header is handled under an empty context, i.e. its span has
        no parent.
        """

        @functools.wraps(callback)
        def wrapped_callback(header, message):
            ctx = extract(header) if header else Context()
            with self.tracer.start_as_current_span(span_name, context=ctx) as span:
                self._set_span_attributes(span, **attributes)
                # Anything the callback sends picks up this span as parent.
                return callback(header, message)

        return wrapped_callback

    def send(self, call_next: Callable, destination: str, message, **kwargs):
        """Send ``message`` inside a ``transport.send`` span.

        The active trace context is injected into the message headers. The
        headers are copied first so the caller's dictionary is never modified.
        Returns whatever ``call_next`` returns.
        """
        with self.tracer.start_as_current_span(
            "transport.send",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, destination=destination)

            # Work on a private copy: inject() modifies its carrier in place.
            headers = dict(kwargs.get("headers") or {})
            inject(headers)
            kwargs["headers"] = headers

            return call_next(destination, message, **kwargs)

    def subscribe(
        self, call_next: Callable, channel: str, callback: Callable, **kwargs
    ) -> int:
        """Subscribe with a callback that runs in a ``transport.subscribe`` span."""
        traced = self._traced_callback(
            "transport.subscribe", callback, channel=channel
        )
        return call_next(channel, traced, **kwargs)

    def subscribe_broadcast(
        self, call_next: Callable, channel: str, callback: Callable, **kwargs
    ) -> int:
        """Subscribe with a callback that runs in a ``transport.subscribe_broadcast`` span."""
        traced = self._traced_callback(
            "transport.subscribe_broadcast", callback, channel=channel
        )
        return call_next(channel, traced, **kwargs)

    def subscribe_temporary(
        self,
        call_next: Callable,
        channel_hint: str | None,
        callback: MessageCallback,
        **kwargs,
    ) -> TemporarySubscription:
        """Subscribe with a callback that runs in a ``transport.subscribe_temporary`` span.

        A ``channel_hint`` of ``None`` is not recorded on the span.
        """
        traced = self._traced_callback(
            "transport.subscribe_temporary", callback, channel_hint=channel_hint
        )
        return call_next(channel_hint, traced, **kwargs)

    def unsubscribe(
        self,
        call_next: Callable,
        subscription: int,
        drop_callback_reference=False,
        **kwargs,
    ):
        """Unsubscribe inside a ``transport.unsubscribe`` span."""
        with self.tracer.start_as_current_span(
            "transport.unsubscribe",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, subscription_id=subscription)

            call_next(
                subscription, drop_callback_reference=drop_callback_reference, **kwargs
            )

    def ack(
        self,
        call_next: Callable,
        message,
        subscription_id: int | None = None,
        **kwargs,
    ):
        """Acknowledge ``message`` inside a ``transport.ack`` span."""
        with self.tracer.start_as_current_span(
            "transport.ack",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, subscription_id=subscription_id)

            call_next(message, subscription_id=subscription_id, **kwargs)

    def nack(
        self,
        call_next: Callable,
        message,
        subscription_id: int | None = None,
        **kwargs,
    ):
        """Reject ``message`` inside a ``transport.nack`` span."""
        with self.tracer.start_as_current_span(
            "transport.nack",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, subscription_id=subscription_id)

            call_next(message, subscription_id=subscription_id, **kwargs)

    def transaction_begin(
        self, call_next: Callable, subscription_id: int | None = None, **kwargs
    ) -> int:
        """Begin a transaction inside a ``transaction.begin`` span.

        Returns whatever ``call_next`` returns.
        """
        with self.tracer.start_as_current_span(
            "transaction.begin",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, subscription_id=subscription_id)

            return call_next(subscription_id=subscription_id, **kwargs)

    def transaction_abort(
        self, call_next: Callable, transaction_id: int | None = None, **kwargs
    ):
        """Abort a transaction inside a ``transaction.abort`` span."""
        with self.tracer.start_as_current_span(
            "transaction.abort",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, transaction_id=transaction_id)

            call_next(transaction_id=transaction_id, **kwargs)

    def transaction_commit(
        self, call_next: Callable, transaction_id: int | None = None, **kwargs
    ):
        """Commit a transaction inside a ``transaction.commit`` span."""
        with self.tracer.start_as_current_span(
            "transaction.commit",
            context=self._current_context(),
        ) as span:
            self._set_span_attributes(span, transaction_id=transaction_id)

            call_next(transaction_id=transaction_id, **kwargs)

src/workflows/util/zocalo/configuration.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,26 @@
88
from workflows.transport.stomp_transport import StompTransport
99

1010

11+
class OTEL:
    """Zocalo configuration plugin that records the OTEL tracing exporter settings."""

    class Schema(PluginSchema):
        # Location of the trace collector; both values are mandatory.
        host = fields.Str(required=True)
        port = fields.Int(required=True)
        # Optional exporter timeout, loaded as 10 when not given
        # (units are not stated here - presumably seconds, TODO confirm).
        timeout = fields.Int(required=False, load_default=10)

    # Class-level store, shared by everything that reads OTEL.config;
    # populated by activate().
    config = {}

    @staticmethod
    def activate(configuration):
        """Derive the trace endpoint from ``configuration`` and publish it.

        Writes the ``endpoint`` URL and the ``timeout`` into the shared
        ``OTEL.config`` dictionary and returns that same dictionary.
        """
        settings = OTEL.config
        # The endpoint is always an https URL ending in the /v1/traces path.
        settings["endpoint"] = (
            f"https://{configuration['host']}:{configuration['port']}/v1/traces"
        )
        settings["timeout"] = configuration.get("timeout", 10)
        return settings
29+
30+
1131
class Stomp:
1232
"""A Zocalo configuration plugin to pre-populate StompTransport config defaults"""
1333

0 commit comments

Comments
 (0)