Skip to content

Commit 52cb04d

Browse files
committed
Instrument on subscribe and add dcid to span attributes
1 parent 722bf3e commit 52cb04d

File tree

2 files changed

+54
-26
lines changed

2 files changed

+54
-26
lines changed

src/workflows/recipe/__init__.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import functools
44
import logging
55
from collections.abc import Callable
6+
from opentelemetry import trace
67
from typing import Any
78

89
from workflows.recipe.recipe import Recipe
@@ -69,6 +70,39 @@ def unwrap_recipe(header, message):
6970
message = mangle_for_receiving(message)
7071
if header.get("workflows-recipe") in {True, "True", "true", 1}:
7172
rw = RecipeWrapper(message=message, transport=transport_layer)
73+
print(rw)
74+
logger.log(1, rw)
75+
76+
# Extract and set DCID on the current span
77+
span = trace.get_current_span()
78+
dcid = None
79+
80+
# Try multiple locations where DCID might be stored
81+
top_level_params = {}
82+
if isinstance(message, dict):
83+
# Direct parameters (top-level or in recipe)
84+
top_level_params = message.get("parameters", {})
85+
86+
# Payload parameters (most common location)
87+
payload = message.get("payload", {})
88+
payload_params = {}
89+
if isinstance(payload, dict):
90+
payload_params = payload.get("parameters", {})
91+
92+
# Try all common locations
93+
dcid = (
94+
top_level_params.get("ispyb_dcid") or
95+
top_level_params.get("dcid") or
96+
payload_params.get("ispyb_dcid") or
97+
payload_params.get("dcid") or
98+
payload.get("ispyb_dcid") or
99+
payload.get("dcid")
100+
)
101+
102+
if dcid:
103+
span.set_attribute("dcid", dcid)
104+
span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid})
105+
72106
if log_extender and rw.environment and rw.environment.get("ID"):
73107
with log_extender("recipe_ID", rw.environment["ID"]):
74108
return callback(rw, header, message.get("payload"))

src/workflows/transport/middleware/otel_tracing.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from workflows.transport.middleware import BaseTransportMiddleware
33
from collections.abc import Callable
44
import functools
5-
from opentelemetry.propagate import inject
5+
from opentelemetry.propagate import inject, extract
66

77
class OTELTracingMiddleware(BaseTransportMiddleware):
88
def __init__(self, tracer: trace.Tracer, service_name: str):
@@ -14,29 +14,23 @@ def __init__(self, tracer: trace.Tracer, service_name: str):
1414
self.tracer = tracer
1515
self.service_name = service_name
1616

17-
18-
def send(self, call_next: Callable, destination, message, **kwargs):
19-
"""
20-
Middleware for tracing the `send` operation
21-
22-
:param call_next: The next middleware or the original `send` method.
23-
:param destination: The destination service to which the message is being sent.
24-
:param message: The message being sent.
25-
:param kwargs: Additional arguments for the `send` method.
26-
"""
27-
28-
# Start a new span for the `send` operation
29-
with self.tracer.start_as_current_span("transport.send") as span:
30-
# Attributes we're interested in
31-
span.set_attribute("service_name", self.service_name)
32-
span.set_attribute("destination", destination)
33-
span.set_attribute("message", str(message))
17+
def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
18+
@functools.wraps(callback)
19+
def wrapped_callback(header, message):
20+
# Extract trace context from message headers
21+
ctx = extract(header) if header else None
3422

35-
# Inject trace context into message headers
36-
headers = kwargs.setdefault("headers", {})
37-
inject(headers)
38-
kwargs["headers"] = headers
39-
40-
# Call the next middleware or the original `send` method
41-
return call_next(destination, message, **kwargs)
42-
23+
# Start a new span with the extracted context
24+
with self.tracer.start_as_current_span(
25+
"transport.subscribe",
26+
context=ctx
27+
) as span:
28+
span.set_attribute("service_name", self.service_name)
29+
span.set_attribute("channel", channel)
30+
31+
32+
# Call the original callback
33+
return callback(header, message)
34+
35+
# Call the next middleware with the wrapped callback
36+
return call_next(channel, wrapped_callback, **kwargs)

0 commit comments

Comments
 (0)