Skip to content

Commit 722bf3e

Browse files
committed
Add basic tracing middleware and global control
1 parent 75cf450 commit 722bf3e

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

src/workflows/services/common_service.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@
1212
import workflows
1313
import workflows.logging
1414

15+
from opentelemetry import trace
16+
from opentelemetry.sdk.trace import TracerProvider
17+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
18+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
19+
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
20+
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
21+
1522

1623
class Status(enum.Enum):
1724
"""
@@ -185,6 +192,31 @@ def start_transport(self):
185192
self.transport.subscription_callback_set_intercept(
186193
self._transport_interceptor
187194
)
195+
196+
# Configure OTELTracing
197+
resource = Resource.create({
198+
SERVICE_NAME: self._service_name,
199+
})
200+
201+
self.log.debug("Configuring OTELTracing")
202+
provider = TracerProvider(resource=resource)
203+
trace.set_tracer_provider(provider)
204+
205+
# Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector
206+
otlp_exporter = OTLPSpanExporter(
207+
endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces",
208+
timeout=10
209+
)
210+
span_processor = BatchSpanProcessor(otlp_exporter)
211+
provider.add_span_processor(span_processor)
212+
213+
# Add OTELTracingMiddleware to the transport layer
214+
tracer = trace.get_tracer(__name__)
215+
otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name)
216+
self._transport.add_middleware(otel_middleware)
217+
218+
self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name)
219+
188220
metrics = self._environment.get("metrics")
189221
if metrics:
190222
import prometheus_client
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from opentelemetry import trace
2+
from workflows.transport.middleware import BaseTransportMiddleware
3+
from collections.abc import Callable
4+
import functools
5+
from opentelemetry.propagate import inject
6+
7+
class OTELTracingMiddleware(BaseTransportMiddleware):
8+
def __init__(self, tracer: trace.Tracer, service_name: str):
9+
"""
10+
Initialize the OpenTelemetry Tracing Middleware.
11+
12+
:param tracer: An OpenTelemetry tracer instance used to create spans.
13+
"""
14+
self.tracer = tracer
15+
self.service_name = service_name
16+
17+
18+
def send(self, call_next: Callable, destination, message, **kwargs):
19+
"""
20+
Middleware for tracing the `send` operation
21+
22+
:param call_next: The next middleware or the original `send` method.
23+
:param destination: The destination service to which the message is being sent.
24+
:param message: The message being sent.
25+
:param kwargs: Additional arguments for the `send` method.
26+
"""
27+
28+
# Start a new span for the `send` operation
29+
with self.tracer.start_as_current_span("transport.send") as span:
30+
# Attributes we're interested in
31+
span.set_attribute("service_name", self.service_name)
32+
span.set_attribute("destination", destination)
33+
span.set_attribute("message", str(message))
34+
35+
# Inject trace context into message headers
36+
headers = kwargs.setdefault("headers", {})
37+
inject(headers)
38+
kwargs["headers"] = headers
39+
40+
# Call the next middleware or the original `send` method
41+
return call_next(destination, message, **kwargs)
42+

0 commit comments

Comments
 (0)