-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtracing_producer.py
More file actions
82 lines (63 loc) · 2.4 KB
/
tracing_producer.py
File metadata and controls
82 lines (63 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import logging
import typing
import opentelemetry
import opentelemetry.context
from opentelemetry.context import Context
import contextlib
from typing import Optional, Generator
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
# ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # type: ignore[attr-defined]
from opsqueue.producer import ProducerClient
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
def set_up_global_tracer() -> None:
"""
This is usually called once per app, at startup time.
"""
resource = Resource(
attributes={SERVICE_NAME: "tracing_with_opsqueue_example_producer"}
)
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
def do_something() -> None:
with trace.get_tracer(__name__).start_as_current_span("do_something"):
with added_baggage(baggage={"app_mode": "preview"}):
# print(opentelemetry.baggage.get_all())
client = ProducerClient(
"localhost:3999", "file:///tmp/opsqueue/tracing_example"
)
input_iter = range(0, 100)
output_iter = client.run_submission(input_iter, chunk_size=10)
# Now do something with the output:
# for x in output_iter:
# print(x)
print(sum(output_iter))
def main() -> None:
set_up_global_tracer()
do_something()
@contextlib.contextmanager
def added_baggage(
baggage: Optional[dict[str, str]] = None,
context: Optional[Context] = None,
) -> Generator[None, None, None]:
attached_context_tokens: list[Context] = list()
if baggage:
for key, value in baggage.items():
attached_token = opentelemetry.baggage.set_baggage(key, value, context)
attached_context_tokens.append(
typing.cast(Context, opentelemetry.context.attach(attached_token))
)
try:
yield
finally:
for attached_token in attached_context_tokens:
opentelemetry.context.detach(attached_token) # type: ignore[arg-type]
if __name__ == "__main__":
main()