From e7b0a3f444ab3de9e984bf9a20f2fea32d27f32a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Fri, 19 Jun 2026 05:52:09 +0300 Subject: [PATCH] feat(otel): optional OpenTelemetry tracing module (ADR-0025) A new optional babelqueue-go/otel module (its own go.mod, so the core stays zero-dep) that emits produce/consume spans correlated across hops via trace_id<->OTel TraceID: WrapHandler (consumer span) and Publish (producer span). Wired into CI (transport-modules step) and release.yml (otel/v* tag). Envelope untouched (GR-1); opt-in. --- .github/workflows/ci.yml | 3 + .github/workflows/release.yml | 1 + otel/go.mod | 21 +++++ otel/go.sum | 15 +++ otel/tracing.go | 144 +++++++++++++++++++++++++++++ otel/tracing_test.go | 168 ++++++++++++++++++++++++++++++++++ 6 files changed, 352 insertions(+) create mode 100644 otel/go.mod create mode 100644 otel/go.sum create mode 100644 otel/tracing.go create mode 100644 otel/tracing_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c16f3a..de07fa7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,6 +45,9 @@ jobs: # client), so they run here across the Go matrix; the live ElasticMQ # round-trip is `-tags=integration` in the integration job below. (cd sqs && go build ./... && go vet ./... && go test -race -count=1 ./...) + # The optional OpenTelemetry module (ADR-0025) — its tests use an in-memory + # span recorder, so they are network-free and run across the Go matrix. + (cd otel && go build ./... && go vet ./... && go test -race -count=1 ./...) 
- name: Build & vet the Azure Service Bus module # azure-messaging-servicebus requires Go 1.23+, so build/test this module only diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ccaf960..1c38658 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,6 +13,7 @@ on: - "artemis/v*" # github.com/babelqueue/babelqueue-go/artemis - "asynq/v*" # github.com/babelqueue/babelqueue-go/asynq - "machinery/v*" # github.com/babelqueue/babelqueue-go/machinery + - "otel/v*" # github.com/babelqueue/babelqueue-go/otel permissions: contents: write diff --git a/otel/go.mod b/otel/go.mod new file mode 100644 index 0000000..ec9117b --- /dev/null +++ b/otel/go.mod @@ -0,0 +1,21 @@ +module github.com/babelqueue/babelqueue-go/otel + +go 1.21 + +require ( + github.com/babelqueue/babelqueue-go v1.3.0 + go.opentelemetry.io/otel v1.24.0 + go.opentelemetry.io/otel/sdk v1.24.0 + go.opentelemetry.io/otel/trace v1.24.0 +) + +require ( + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/otel/metric v1.24.0 // indirect + golang.org/x/sys v0.17.0 // indirect +) + +// In-repo development: resolve the core locally. Consumers ignore replace +// directives in dependencies and use the required version from the proxy. 
+replace github.com/babelqueue/babelqueue-go => ../ diff --git a/otel/go.sum b/otel/go.sum new file mode 100644 index 0000000..dc1df5c --- /dev/null +++ b/otel/go.sum @@ -0,0 +1,15 @@ +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/otel/tracing.go b/otel/tracing.go new file mode 100644 index 0000000..24a7185 --- /dev/null +++ b/otel/tracing.go @@ -0,0 +1,144 @@ +// Package otel adds optional OpenTelemetry tracing to a babelqueue producer or consumer +// (ADR-0025): produce/consume spans correlated across every hop and SDK through the +// envelope's trace_id, which maps 1:1 to an OTel TraceID. It lives in its own module so the +// zero-dependency core never imports OpenTelemetry — wiring a TracerProvider is opt-in. 
+package otel
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"strings"
+
+	babelqueue "github.com/babelqueue/babelqueue-go"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// system is the value reported in the messaging.system span attribute.
+const system = "babelqueue"
+
+// TraceIDOf derives the OTel TraceID for an envelope trace_id. The mapping is deterministic:
+// a trace_id that decodes as a UUID becomes its 16 raw bytes, and everything else — including
+// a UUID whose bytes do not form a valid TraceID — becomes the first 16 bytes of the SHA-256
+// of the string. For the UUID case it is the inverse of [UUIDOf].
+func TraceIDOf(traceID string) trace.TraceID {
+	var id trace.TraceID
+	if raw, ok := uuidBytes(traceID); ok {
+		copy(id[:], raw)
+		if id.IsValid() {
+			return id
+		}
+	}
+	// Hash fallback: copy fills all 16 bytes of id, overwriting anything left from above.
+	digest := sha256.Sum256([]byte(traceID))
+	copy(id[:], digest[:])
+	return id
+}
+
+// UUIDOf renders the 16 bytes of an OTel TraceID as a lowercase 8-4-4-4-12 hex string — the
+// value a producer writes into a message's trace_id so that [TraceIDOf] on the consumer side
+// yields the same TraceID again.
+func UUIDOf(t trace.TraceID) string {
+	return fmt.Sprintf("%x-%x-%x-%x-%x", t[0:4], t[4:6], t[6:8], t[8:10], t[10:16])
+}
+
+// uuidBytes removes every '-' from s and hex-decodes what is left. It reports true only when
+// exactly 32 hex digits remain; the positions of the hyphens are not checked.
+func uuidBytes(s string) ([]byte, bool) {
+	compact := strings.ReplaceAll(s, "-", "")
+	if len(compact) == 32 {
+		if decoded, err := hex.DecodeString(compact); err == nil {
+			return decoded, true
+		}
+	}
+	return nil, false
+}
+
+// spanIDOf computes a deterministic SpanID for a trace_id: the first 8 bytes of the SHA-256
+// of "babelqueue-span:" + trace_id. Should those bytes form an invalid SpanID, the last byte
+// is set to 1, so the remote parent SpanContext built from the result is always usable.
+func spanIDOf(traceID string) trace.SpanID {
+	digest := sha256.Sum256([]byte("babelqueue-span:" + traceID))
+	var id trace.SpanID
+	copy(id[:], digest[:8])
+	if id.IsValid() {
+		return id
+	}
+	id[7] = 1
+	return id
+}
+
+// parentOf returns ctx carrying a remote parent SpanContext in the trace_id-derived trace,
+// so a span started from it lands in that trace (cross-hop correlation).
+func parentOf(ctx context.Context, traceID string) context.Context {
+	// The parent is synthetic: both ids are derived from the trace_id, it is always flagged
+	// as sampled, and it is marked remote because it stands for the upstream hop.
+	sc := trace.NewSpanContext(trace.SpanContextConfig{
+		TraceID:    TraceIDOf(traceID),
+		SpanID:     spanIDOf(traceID),
+		TraceFlags: trace.FlagsSampled,
+		Remote:     true,
+	})
+	return trace.ContextWithRemoteSpanContext(ctx, sc)
+}
+
+// WrapHandler returns handler decorated to emit one CONSUMER span per message, named
+// "process <urn>" and placed in the OTel trace derived from the envelope's trace_id. The
+// wrapped handler receives a context carrying that span; a non-nil handler error is recorded
+// on the span, sets its status to Error, and is returned unchanged. Register it like any
+// handler: app.Handle(urn, otel.WrapHandler(tracer, handler)).
+func WrapHandler(tracer trace.Tracer, handler babelqueue.Handler) babelqueue.Handler {
+	return func(ctx context.Context, env babelqueue.Envelope) error {
+		ctx = parentOf(ctx, env.TraceID)
+		ctx, span := tracer.Start(ctx, "process "+env.URN(),
+			trace.WithSpanKind(trace.SpanKindConsumer),
+			trace.WithAttributes(consumeAttrs(env)...),
+		)
+		defer span.End()
+
+		err := handler(ctx, env)
+		if err != nil {
+			span.RecordError(err)
+			span.SetStatus(codes.Error, err.Error())
+		}
+		return err
+	}
+}
+
+// Publish starts a PRODUCER span named "publish <urn>", carries that span's TraceID into the
+// message's trace_id (formatted by [UUIDOf]), and publishes via app — so the downstream
+// consumer recovers the same trace. It otherwise behaves like [babelqueue.App.Publish],
+// returning the message id.
+//
+// The trace_id option is appended after opts, so it takes precedence over a trace id supplied
+// by the caller. When the span carries no valid TraceID — for example a no-op tracer with no
+// parent span in ctx — nothing is stamped and opts are passed through untouched, instead of
+// publishing the all-zero UUID on every message.
+//
+// On failure the error is recorded on the span, the span status is set to Error, and the
+// error is returned with an empty id. On success the id is added to the span as
+// messaging.message.id.
+func Publish(
+	ctx context.Context,
+	tracer trace.Tracer,
+	app *babelqueue.App,
+	urn string,
+	data map[string]any,
+	opts ...babelqueue.Option,
+) (string, error) {
+	ctx, span := tracer.Start(ctx, "publish "+urn,
+		trace.WithSpanKind(trace.SpanKindProducer),
+		trace.WithAttributes(
+			attribute.String("messaging.system", system),
+			attribute.String("messaging.operation", "publish"),
+			attribute.String("messaging.destination.name", urn),
+		),
+	)
+	defer span.End()
+
+	if tid := span.SpanContext().TraceID(); tid.IsValid() {
+		// Full slice expression (cap == len) forces append to allocate, so a caller that
+		// passed an existing slice as opts... never has its backing array written to.
+		opts = append(opts[:len(opts):len(opts)], babelqueue.WithTraceID(UUIDOf(tid)))
+	}
+
+	id, err := app.Publish(ctx, urn, data, opts...)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return "", err
+	}
+	span.SetAttributes(attribute.String("messaging.message.id", id))
+	return id, nil
+}
+
+// consumeAttrs builds the attribute set attached to every consumer span: the messaging
+// system and operation, the envelope's queue and message id, its trace_id (reported as the
+// conversation id) and its attempt count.
+func consumeAttrs(env babelqueue.Envelope) []attribute.KeyValue {
+	return []attribute.KeyValue{
+		attribute.String("messaging.system", system),
+		attribute.String("messaging.operation", "process"),
+		attribute.String("messaging.destination.name", env.Meta.Queue),
+		attribute.String("messaging.message.id", env.Meta.ID),
+		attribute.String("messaging.message.conversation_id", env.TraceID),
+		attribute.Int("messaging.babelqueue.attempts", env.Attempts),
+	}
+}
diff --git a/otel/tracing_test.go b/otel/tracing_test.go
new file mode 100644
index 0000000..6cb91ef
--- /dev/null
+++ b/otel/tracing_test.go
@@ -0,0 +1,168 @@
+package otel
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	babelqueue "github.com/babelqueue/babelqueue-go"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/sdk/trace/tracetest"
+	"go.opentelemetry.io/otel/trace"
+)
+
+// failTransport is a Transport whose Publish always errors, to exercise the producer span's
+// error path.
+type failTransport struct{}
+
+func (failTransport) Publish(context.Context, string, string) error {
+	return errors.New("transport publish failed")
+}
+
+func (failTransport) Pop(context.Context, string, time.Duration) (*babelqueue.ReceivedMessage, error) {
+	return nil, nil
+}
+
+func (failTransport) Ack(context.Context, *babelqueue.ReceivedMessage) error { return nil }
+
+// recorder returns a tracer backed by an SDK TracerProvider whose only span processor is an
+// in-memory SpanRecorder, together with that recorder, so tests can inspect ended spans.
+func recorder(t *testing.T) (trace.Tracer, *tracetest.SpanRecorder) {
+	t.Helper()
+	sr := tracetest.NewSpanRecorder()
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
+	return tp.Tracer("test"), sr
+}
+
+// onlyEndedSpan returns the single span the recorder has seen end. It fails the calling test
+// with a clear message — rather than panicking on an out-of-range index — when the number of
+// ended spans is anything other than one.
+func onlyEndedSpan(t *testing.T, sr *tracetest.SpanRecorder) sdktrace.ReadOnlySpan {
+	t.Helper()
+	spans := sr.Ended()
+	if len(spans) != 1 {
+		t.Fatalf("want 1 span, got %d", len(spans))
+	}
+	return spans[0]
+}
+
+// TestTraceIDMappingRoundTrip checks that a UUID trace_id survives TraceIDOf -> UUIDOf
+// unchanged, and that a non-UUID trace_id maps deterministically to a valid, distinct TraceID.
+func TestTraceIDMappingRoundTrip(t *testing.T) {
+	uuid := "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b"
+	tid := TraceIDOf(uuid)
+	if !tid.IsValid() {
+		t.Fatal("derived TraceID is invalid")
+	}
+	if got := UUIDOf(tid); got != uuid {
+		t.Fatalf("round-trip: got %q, want %q", got, uuid)
+	}
+	a, b := TraceIDOf("not-a-uuid"), TraceIDOf("not-a-uuid")
+	if a != b || !a.IsValid() {
+		t.Fatal("a non-uuid trace_id must map deterministically to a valid TraceID")
+	}
+	if a == tid {
+		t.Fatal("different inputs should map to different traces")
+	}
+}
+
+// TestWrapHandler_SpanInTraceIDTraceWithAttrs checks that a wrapped handler runs with an
+// active span in its context and that exactly one CONSUMER span is emitted, named after the
+// URN, in the trace derived from the envelope's trace_id, carrying the conversation_id.
+func TestWrapHandler_SpanInTraceIDTraceWithAttrs(t *testing.T) {
+	tracer, sr := recorder(t)
+	called := false
+	h := WrapHandler(tracer, func(ctx context.Context, _ babelqueue.Envelope) error {
+		called = true
+		if !trace.SpanContextFromContext(ctx).IsValid() {
+			t.Error("handler ctx carries no active span")
+		}
+		return nil
+	})
+	// NOTE(review): Make's second result is discarded — presumably an error; confirm it
+	// cannot fail for this fixed URN and payload.
+	env, _ := babelqueue.Make("urn:babel:orders:created", map[string]any{"order_id": 1})
+
+	if err := h(context.Background(), env); err != nil {
+		t.Fatal(err)
+	}
+	if !called {
+		t.Fatal("handler not called")
+	}
+
+	s := onlyEndedSpan(t, sr)
+	if s.Name() != "process "+env.URN() {
+		t.Errorf("name = %q", s.Name())
+	}
+	if s.SpanKind() != trace.SpanKindConsumer {
+		t.Errorf("kind = %v, want consumer", s.SpanKind())
+	}
+	if s.SpanContext().TraceID() != TraceIDOf(env.TraceID) {
+		t.Error("the span is not in the trace_id-derived trace")
+	}
+	if !hasStringAttr(s.Attributes(), "messaging.message.conversation_id", env.TraceID) {
+		t.Error("missing conversation_id == trace_id attribute")
+	}
+}
+
+// TestWrapHandler_RecordsError checks that a handler error is returned to the caller and
+// leaves the consumer span with Error status and at least one recorded event.
+func TestWrapHandler_RecordsError(t *testing.T) {
+	tracer, sr := recorder(t)
+	boom := errors.New("boom")
+	h := WrapHandler(tracer, func(context.Context, babelqueue.Envelope) error { return boom })
+	// NOTE(review): Make's second result is discarded — presumably an error; confirm it
+	// cannot fail for a nil payload.
+	env, _ := babelqueue.Make("urn:babel:orders:created", nil)
+
+	if err := h(context.Background(), env); !errors.Is(err, boom) {
+		t.Fatalf("err = %v", err)
+	}
+	s := onlyEndedSpan(t, sr)
+	if s.Status().Code != codes.Error {
+		t.Errorf("status = %v, want Error", s.Status().Code)
+	}
+	if len(s.Events()) == 0 {
+		t.Error("expected a recorded error event")
+	}
+}
+
+// TestPublish_StampsTraceIDFromSpan publishes through an in-memory transport and checks that
+// one PRODUCER span is emitted and that the stored message's trace_id encodes that span's
+// TraceID in both directions (UUIDOf and TraceIDOf).
+func TestPublish_StampsTraceIDFromSpan(t *testing.T) {
+	tracer, sr := recorder(t)
+	transport := babelqueue.NewInMemoryTransport()
+	app := babelqueue.NewApp(transport)
+
+	id, err := Publish(context.Background(), tracer, app, "urn:babel:orders:created", map[string]any{"order_id": 7})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if id == "" {
+		t.Fatal("empty message id")
+	}
+
+	span := onlyEndedSpan(t, sr)
+	if span.SpanKind() != trace.SpanKindProducer {
+		t.Fatalf("kind = %v, want producer", span.SpanKind())
+	}
+
+	msg, err := transport.Pop(context.Background(), "default", 0)
+	if err != nil || msg == nil {
+		t.Fatalf("pop: msg=%v err=%v", msg, err)
+	}
+	env, err := babelqueue.Decode([]byte(msg.Body))
+	if err != nil {
+		t.Fatal(err)
+	}
+	// the published message's trace_id encodes the producer span's trace, so a consumer
+	// recovers the same trace.
+	if env.TraceID != UUIDOf(span.SpanContext().TraceID()) {
+		t.Errorf("message trace_id %q does not encode the producer span's trace", env.TraceID)
+	}
+	if TraceIDOf(env.TraceID) != span.SpanContext().TraceID() {
+		t.Error("a consumer would not recover the producer's trace from this trace_id")
+	}
+}
+
+// TestPublish_RecordsError checks that a transport failure is returned by Publish and marks
+// the producer span with Error status.
+func TestPublish_RecordsError(t *testing.T) {
+	tracer, sr := recorder(t)
+	app := babelqueue.NewApp(failTransport{})
+
+	if _, err := Publish(context.Background(), tracer, app, "urn:babel:orders:created", nil); err == nil {
+		t.Fatal("expected a publish error")
+	}
+	s := onlyEndedSpan(t, sr)
+	if s.Status().Code != codes.Error {
+		t.Errorf("status = %v, want Error", s.Status().Code)
+	}
+}
+
+// hasStringAttr reports whether attrs contains an attribute with the given key whose string
+// value equals val.
+func hasStringAttr(attrs []attribute.KeyValue, key, val string) bool {
+	for _, a := range attrs {
+		if string(a.Key) == key && a.Value.AsString() == val {
+			return true
+		}
+	}
+	return false
+}