Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ jobs:
# client), so they run here across the Go matrix; the live ElasticMQ
# round-trip is `-tags=integration` in the integration job below.
(cd sqs && go build ./... && go vet ./... && go test -race -count=1 ./...)
# The optional OpenTelemetry module (ADR-0025) — its tests use an in-memory
# span recorder, so they are network-free and run across the Go matrix.
(cd otel && go build ./... && go vet ./... && go test -race -count=1 ./...)

- name: Build & vet the Azure Service Bus module
# azure-messaging-servicebus requires Go 1.23+, so build/test this module only
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ on:
- "artemis/v*" # github.com/babelqueue/babelqueue-go/artemis
- "asynq/v*" # github.com/babelqueue/babelqueue-go/asynq
- "machinery/v*" # github.com/babelqueue/babelqueue-go/machinery
- "otel/v*" # github.com/babelqueue/babelqueue-go/otel

permissions:
contents: write
Expand Down
21 changes: 21 additions & 0 deletions otel/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module github.com/babelqueue/babelqueue-go/otel

go 1.21

require (
github.com/babelqueue/babelqueue-go v1.3.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
)

require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
golang.org/x/sys v0.17.0 // indirect
)

// In-repo development: resolve the core locally. Consumers ignore replace
// directives in dependencies and use the required version from the proxy.
replace github.com/babelqueue/babelqueue-go => ../
15 changes: 15 additions & 0 deletions otel/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
144 changes: 144 additions & 0 deletions otel/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Package otel adds optional OpenTelemetry tracing to a babelqueue producer or consumer
// (ADR-0025): produce/consume spans correlated across every hop and SDK through the
// envelope's trace_id, which maps 1:1 to an OTel TraceID. It lives in its own module so the
// zero-dependency core never imports OpenTelemetry — wiring a TracerProvider is opt-in.
package otel

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"

babelqueue "github.com/babelqueue/babelqueue-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const system = "babelqueue"

// TraceIDOf maps an envelope trace_id to a deterministic OTel TraceID: a UUID maps to its
// 16 raw bytes; any other string is hashed (SHA-256, first 16 bytes). The inverse of
// [UUIDOf] for the UUID case.
func TraceIDOf(traceID string) trace.TraceID {
if raw, ok := uuidBytes(traceID); ok {
var t trace.TraceID
copy(t[:], raw)
if t.IsValid() {
return t
}
}
sum := sha256.Sum256([]byte(traceID))
var t trace.TraceID
copy(t[:], sum[:])
return t
}

// UUIDOf formats an OTel TraceID (16 bytes) as a canonical UUID string — the form a producer
// stamps into the message's trace_id so a consumer can recover the same TraceID.
func UUIDOf(t trace.TraceID) string {
b := t[:]
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

func uuidBytes(s string) ([]byte, bool) {
h := strings.ReplaceAll(s, "-", "")
if len(h) != 32 {
return nil, false
}
raw, err := hex.DecodeString(h)
if err != nil {
return nil, false
}
return raw, true
}

// spanIDOf derives a deterministic, non-zero SpanID from the trace_id so the remote parent
// SpanContext is valid (a span needs a valid parent to inherit a specific trace).
func spanIDOf(traceID string) trace.SpanID {
sum := sha256.Sum256([]byte("babelqueue-span:" + traceID))
var s trace.SpanID
copy(s[:], sum[:8])
if !s.IsValid() {
s[7] = 1
}
return s
}

// parentOf returns ctx carrying a remote parent SpanContext in the trace_id-derived trace,
// so a span started from it lands in that trace (cross-hop correlation).
func parentOf(ctx context.Context, traceID string) context.Context {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: TraceIDOf(traceID),
SpanID: spanIDOf(traceID),
TraceFlags: trace.FlagsSampled,
Remote: true,
})
return trace.ContextWithRemoteSpanContext(ctx, sc)
}

// WrapHandler returns handler decorated to emit a CONSUMER span per message, in the OTel
// trace derived from the envelope's trace_id, recording the handler's error/status. Register
// it like any handler: app.Handle(urn, otel.WrapHandler(tracer, handler)).
func WrapHandler(tracer trace.Tracer, handler babelqueue.Handler) babelqueue.Handler {
return func(ctx context.Context, env babelqueue.Envelope) error {
ctx = parentOf(ctx, env.TraceID)
ctx, span := tracer.Start(ctx, "process "+env.URN(),
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(consumeAttrs(env)...),
)
defer span.End()

err := handler(ctx, env)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
return err
}
}

// Publish starts a PRODUCER span "publish <urn>", carries the active trace's TraceID into the
// message's trace_id, and publishes via app — so the downstream consumer recovers the same
// trace. It otherwise behaves like [babelqueue.App.Publish], returning the message id.
func Publish(
ctx context.Context,
tracer trace.Tracer,
app *babelqueue.App,
urn string,
data map[string]any,
opts ...babelqueue.Option,
) (string, error) {
ctx, span := tracer.Start(ctx, "publish "+urn,
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
attribute.String("messaging.system", system),
attribute.String("messaging.operation", "publish"),
attribute.String("messaging.destination.name", urn),
),
)
defer span.End()

traceID := UUIDOf(span.SpanContext().TraceID())
id, err := app.Publish(ctx, urn, data, append(opts, babelqueue.WithTraceID(traceID))...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return "", err
}
span.SetAttributes(attribute.String("messaging.message.id", id))
return id, nil
}

func consumeAttrs(env babelqueue.Envelope) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("messaging.system", system),
attribute.String("messaging.operation", "process"),
attribute.String("messaging.destination.name", env.Meta.Queue),
attribute.String("messaging.message.id", env.Meta.ID),
attribute.String("messaging.message.conversation_id", env.TraceID),
attribute.Int("messaging.babelqueue.attempts", env.Attempts),
}
}
168 changes: 168 additions & 0 deletions otel/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package otel

import (
"context"
"errors"
"testing"
"time"

babelqueue "github.com/babelqueue/babelqueue-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)

// failTransport is a Transport whose Publish always errors, to exercise the producer span's
// error path.
type failTransport struct{}

func (failTransport) Publish(context.Context, string, string) error {
return errors.New("transport publish failed")
}

func (failTransport) Pop(context.Context, string, time.Duration) (*babelqueue.ReceivedMessage, error) {
return nil, nil
}

func (failTransport) Ack(context.Context, *babelqueue.ReceivedMessage) error { return nil }

func recorder(t *testing.T) (trace.Tracer, *tracetest.SpanRecorder) {
t.Helper()
sr := tracetest.NewSpanRecorder()
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
return tp.Tracer("test"), sr
}

func TestTraceIDMappingRoundTrip(t *testing.T) {
uuid := "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b"
tid := TraceIDOf(uuid)
if !tid.IsValid() {
t.Fatal("derived TraceID is invalid")
}
if got := UUIDOf(tid); got != uuid {
t.Fatalf("round-trip: got %q, want %q", got, uuid)
}
a, b := TraceIDOf("not-a-uuid"), TraceIDOf("not-a-uuid")
if a != b || !a.IsValid() {
t.Fatal("a non-uuid trace_id must map deterministically to a valid TraceID")
}
if a == tid {
t.Fatal("different inputs should map to different traces")
}
}

func TestWrapHandler_SpanInTraceIDTraceWithAttrs(t *testing.T) {
tracer, sr := recorder(t)
called := false
h := WrapHandler(tracer, func(ctx context.Context, _ babelqueue.Envelope) error {
called = true
if !trace.SpanContextFromContext(ctx).IsValid() {
t.Error("handler ctx carries no active span")
}
return nil
})
env, _ := babelqueue.Make("urn:babel:orders:created", map[string]any{"order_id": 1})

if err := h(context.Background(), env); err != nil {
t.Fatal(err)
}
if !called {
t.Fatal("handler not called")
}

spans := sr.Ended()
if len(spans) != 1 {
t.Fatalf("want 1 span, got %d", len(spans))
}
s := spans[0]
if s.Name() != "process "+env.URN() {
t.Errorf("name = %q", s.Name())
}
if s.SpanKind() != trace.SpanKindConsumer {
t.Errorf("kind = %v, want consumer", s.SpanKind())
}
if s.SpanContext().TraceID() != TraceIDOf(env.TraceID) {
t.Error("the span is not in the trace_id-derived trace")
}
if !hasStringAttr(s.Attributes(), "messaging.message.conversation_id", env.TraceID) {
t.Error("missing conversation_id == trace_id attribute")
}
}

func TestWrapHandler_RecordsError(t *testing.T) {
tracer, sr := recorder(t)
boom := errors.New("boom")
h := WrapHandler(tracer, func(context.Context, babelqueue.Envelope) error { return boom })
env, _ := babelqueue.Make("urn:babel:orders:created", nil)

if err := h(context.Background(), env); !errors.Is(err, boom) {
t.Fatalf("err = %v", err)
}
s := sr.Ended()[0]
if s.Status().Code != codes.Error {
t.Errorf("status = %v, want Error", s.Status().Code)
}
if len(s.Events()) == 0 {
t.Error("expected a recorded error event")
}
}

func TestPublish_StampsTraceIDFromSpan(t *testing.T) {
tracer, sr := recorder(t)
transport := babelqueue.NewInMemoryTransport()
app := babelqueue.NewApp(transport)

id, err := Publish(context.Background(), tracer, app, "urn:babel:orders:created", map[string]any{"order_id": 7})
if err != nil {
t.Fatal(err)
}
if id == "" {
t.Fatal("empty message id")
}

spans := sr.Ended()
if len(spans) != 1 || spans[0].SpanKind() != trace.SpanKindProducer {
t.Fatalf("want 1 producer span, got %+v", spans)
}

msg, err := transport.Pop(context.Background(), "default", 0)
if err != nil || msg == nil {
t.Fatalf("pop: msg=%v err=%v", msg, err)
}
env, err := babelqueue.Decode([]byte(msg.Body))
if err != nil {
t.Fatal(err)
}
// the published message's trace_id encodes the producer span's trace, so a consumer
// recovers the same trace.
if env.TraceID != UUIDOf(spans[0].SpanContext().TraceID()) {
t.Errorf("message trace_id %q does not encode the producer span's trace", env.TraceID)
}
if TraceIDOf(env.TraceID) != spans[0].SpanContext().TraceID() {
t.Error("a consumer would not recover the producer's trace from this trace_id")
}
}

func TestPublish_RecordsError(t *testing.T) {
tracer, sr := recorder(t)
app := babelqueue.NewApp(failTransport{})

if _, err := Publish(context.Background(), tracer, app, "urn:babel:orders:created", nil); err == nil {
t.Fatal("expected a publish error")
}
s := sr.Ended()[0]
if s.Status().Code != codes.Error {
t.Errorf("status = %v, want Error", s.Status().Code)
}
}

func hasStringAttr(attrs []attribute.KeyValue, key, val string) bool {
for _, a := range attrs {
if string(a.Key) == key && a.Value.AsString() == val {
return true
}
}
return false
}
Loading