From d3e2a0e5cb991d4e8b54986dbc9e22e0605f724a Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Thu, 14 May 2026 17:55:36 -0400 Subject: [PATCH 1/8] pkg/beholder: add batch emitter service and bump chipingress batching dependency --- go.mod | 2 +- go.sum | 4 +- pkg/beholder/batch_emitter_service.go | 204 +++++++ pkg/beholder/batch_emitter_service_test.go | 611 +++++++++++++++++++++ pkg/beholder/beholdertest/beholder.go | 35 +- pkg/beholder/chip_ingress_emitter.go | 39 +- pkg/beholder/chip_ingress_emitter_test.go | 22 +- pkg/beholder/client.go | 86 ++- pkg/beholder/client_test.go | 94 +++- pkg/beholder/config.go | 34 +- pkg/beholder/config_test.go | 3 +- pkg/beholder/dual_source_emitter.go | 38 +- pkg/beholder/dual_source_emitter_test.go | 58 +- pkg/beholder/global_test.go | 9 +- pkg/beholder/httpclient.go | 29 +- pkg/beholder/noop.go | 50 +- pkg/beholder/noop_test.go | 4 +- pkg/loop/config.go | 13 +- pkg/loop/config_test.go | 11 +- pkg/loop/plugin_relayer_emitter_test.go | 2 +- pkg/loop/server.go | 44 +- pkg/services/service.go | 2 +- 22 files changed, 1258 insertions(+), 136 deletions(-) create mode 100644 pkg/beholder/batch_emitter_service.go create mode 100644 pkg/beholder/batch_emitter_service_test.go diff --git a/go.mod b/go.mod index a400ba39dc..99856d2922 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.89 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514213647-fddcb86b6482 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 
50114fe85a..929885cee3 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514213647-fddcb86b6482 h1:CpuTkw3SooswiRubVfKl5AeLASN34QwnU8UuYMMcqbE= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514213647-fddcb86b6482/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 h1:9vjqB+iNqwyazVoVjR1rozHXTeRYyeggavt3Q4sbNrg= diff --git a/pkg/beholder/batch_emitter_service.go b/pkg/beholder/batch_emitter_service.go new file mode 100644 index 0000000000..b23dcaebe8 --- /dev/null +++ b/pkg/beholder/batch_emitter_service.go @@ -0,0 +1,204 @@ +package beholder + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" + 
"github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch. +// It implements the Emitter interface. +type ChipIngressBatchEmitterService struct { + services.Service + eng *services.Engine + + batchClient *batch.Client + + metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption + metrics batchEmitterMetrics +} + +type batchEmitterMetrics struct { + eventsSent otelmetric.Int64Counter + eventsDropped otelmetric.Int64Counter +} + +// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client. +func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) { + if client == nil { + return nil, fmt.Errorf("chip ingress client is nil") + } + + defaults := DefaultConfig() + bufferSize := int(cfg.ChipIngressBufferSize) + if bufferSize == 0 { + bufferSize = int(defaults.ChipIngressBufferSize) + } + maxBatchSize := int(cfg.ChipIngressMaxBatchSize) + if maxBatchSize == 0 { + maxBatchSize = int(defaults.ChipIngressMaxBatchSize) + } + maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends + if maxConcurrentSends == 0 { + maxConcurrentSends = defaults.ChipIngressMaxConcurrentSends + } + sendInterval := cfg.ChipIngressSendInterval + if sendInterval == 0 { + sendInterval = defaults.ChipIngressSendInterval + } + sendTimeout := cfg.ChipIngressSendTimeout + if sendTimeout == 0 { + sendTimeout = defaults.ChipIngressSendTimeout + } + drainTimeout := cfg.ChipIngressDrainTimeout + if drainTimeout == 0 { + drainTimeout = defaults.ChipIngressDrainTimeout + } + + meter := otel.Meter("beholder/chip_ingress_batch_emitter") + metrics, err := newBatchEmitterMetrics(meter) + if err != nil { + return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err) + } + + batchClient, err := 
batch.NewBatchClient(client, + batch.WithBatchSize(maxBatchSize), + batch.WithMessageBuffer(bufferSize), + batch.WithBatchInterval(sendInterval), + batch.WithMaxPublishTimeout(sendTimeout), + batch.WithShutdownTimeout(drainTimeout), + batch.WithMaxConcurrentSends(maxConcurrentSends), + batch.WithEventClone(false), + ) + if err != nil { + return nil, fmt.Errorf("failed to create batch client: %w", err) + } + + e := &ChipIngressBatchEmitterService{ + batchClient: batchClient, + metrics: metrics, + } + + e.Service, e.eng = services.Config{ + Name: "ChipIngressBatchEmitterService", + Start: e.start, + Close: e.stop, + }.NewServiceEngine(lggr) + + return e, nil +} + +func (e *ChipIngressBatchEmitterService) start(ctx context.Context) error { + e.batchClient.Start(ctx) + return nil +} + +func (e *ChipIngressBatchEmitterService) stop() error { + e.batchClient.Stop() + return nil +} + +// Emit queues an event for batched delivery without blocking. +// Returns an error if the emitter is stopped or the context is cancelled. +// If the buffer is full, the event is silently dropped. +func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return e.emitInternal(ctx, body, nil, attrKVs...) +} + +// EmitWithCallback works like Emit but invokes callback once the event's fate +// is determined (nil on success, non-nil on failure or buffer-full drop). +// +// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked. +// If it returns nil, the callback is guaranteed to fire exactly once. +func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.emitInternal(ctx, body, callback, attrKVs...) +} + +func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.eng.IfStarted(func() error { + domain, entity, err := ExtractSourceAndType(attrKVs...) 
+ if err != nil { + return err + } + + attributes := newAttributes(attrKVs...) + + event, err := chipingress.NewEvent(domain, entity, body, attributes) + if err != nil { + return fmt.Errorf("failed to create CloudEvent: %w", err) + } + eventPb, err := chipingress.EventToProto(event) + if err != nil { + return fmt.Errorf("failed to convert to proto: %w", err) + } + + if err := ctx.Err(); err != nil { + return err + } + + metricAttrs := e.metricAttrsFor(domain, entity) + metricsCtx := context.Background() + + queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { + if sendErr != nil { + e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + } else { + e.metrics.eventsSent.Add(metricsCtx, 1, metricAttrs) + } + if callback != nil { + callback(sendErr) + } + }) + if queueErr != nil { + e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + if callback != nil { + callback(queueErr) + } + } + + return nil + }) +} + +func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption { + key := domain + "\x00" + entity + if v, ok := e.metricAttrsCache.Load(key); ok { + return v.(otelmetric.MeasurementOption) + } + attrs := otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("domain", domain), + attribute.String("entity", entity), + )) + v, _ := e.metricAttrsCache.LoadOrStore(key, attrs) + return v.(otelmetric.MeasurementOption) +} + +func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) { + eventsSent, err := meter.Int64Counter("chip_ingress.events_sent", + otelmetric.WithDescription("Total events successfully sent via PublishBatch"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped", + otelmetric.WithDescription("Total events dropped (buffer full or send failure)"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + 
+ return batchEmitterMetrics{ + eventsSent: eventsSent, + eventsDropped: eventsDropped, + }, nil +} diff --git a/pkg/beholder/batch_emitter_service_test.go b/pkg/beholder/batch_emitter_service_test.go new file mode 100644 index 0000000000..d685552f44 --- /dev/null +++ b/pkg/beholder/batch_emitter_service_test.go @@ -0,0 +1,611 @@ +package beholder_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func newTestConfig() beholder.Config { + return beholder.Config{ + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressMaxConcurrentSends: 3, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } +} + +func newTestLogger(t *testing.T) logger.Logger { + t.Helper() + lggr, err := logger.New() + require.NoError(t, err) + t.Cleanup(func() { _ = lggr.Sync() }) + return lggr +} + +func TestNewChipIngressBatchEmitterService(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + assert.NotNil(t, emitter) + }) + + t.Run("returns error when client is nil", func(t *testing.T) { + emitter, err := beholder.NewChipIngressBatchEmitterService(nil, newTestConfig(), newTestLogger(t)) + assert.Error(t, err) + 
assert.Nil(t, emitter) + }) +} + +func TestChipIngressBatchEmitterService_Emit(t *testing.T) { + t.Run("returns error when domain/entity missing", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("test"), "bad_key", "bad_value") + assert.Error(t, err) + }) + + t.Run("events are batched and sent via PublishBatch", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + var receivedBatches []*chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + batch := args.Get(1).(*chipingress.CloudEventBatch) + receivedBatches = append(receivedBatches, batch) + }). 
+ Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(receivedBatches) > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + + totalEvents := 0 + for _, batch := range receivedBatches { + totalEvents += len(batch.Events) + } + assert.Equal(t, 3, totalEvents) + }) +} + +func TestChipIngressBatchEmitterService_CloudEventFormat(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). 
+ Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("test-payload"), + beholder.AttrKeyDomain, "my-domain", + beholder.AttrKeyEntity, "my-entity", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) + + event := receivedBatch.Events[0] + assert.Equal(t, "my-domain", event.Source) + assert.Equal(t, "my-entity", event.Type) + assert.NotEmpty(t, event.Id) +} + +func TestChipIngressBatchEmitterService_PublishBatchError(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + callCount := 0 + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + mu.Lock() + defer mu.Unlock() + callCount++ + }). 
+ Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return callCount > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) +} + +func TestChipIngressBatchEmitterService_ContextCancellation(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 1 + cfg.ChipIngressSendInterval = 10 * time.Second + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + err = emitter.Emit(ctx, []byte("should-fail"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestChipIngressBatchEmitterService_DefaultConfig(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). 
+ Return(nil, nil) + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, beholder.Config{}, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 3*time.Second, 50*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) +} + +func TestChipIngressBatchEmitterService_EmitAfterClose(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + require.NoError(t, emitter.Close()) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.Error(t, err) +} + +func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) { + t.Run("callback receives nil on success", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.NoError(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error on PublishBatch failure", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.Error(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error when buffer is full", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + sendBlocked := make(chan struct{}) + firstCallSignal := make(chan struct{}, 1) + clientMock. 
+ On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + select { + case firstCallSignal <- struct{}{}: + default: + } + <-sendBlocked + }). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 2 + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressMaxConcurrentSends = 1 + cfg.ChipIngressSendInterval = 50 * time.Millisecond + cfg.ChipIngressDrainTimeout = 200 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer close(sendBlocked) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + <-firstCallSignal + time.Sleep(100 * time.Millisecond) + + for i := 0; i < 10; i++ { + _ = emitter.Emit(t.Context(), []byte("filler"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + } + + dropped := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("overflow"), func(sendErr error) { + dropped <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + select { + case dropErr := <-dropped: + assert.Error(t, dropErr) + case <-time.After(time.Second): + t.Fatal("callback was not invoked for dropped event") + } + }) + + t.Run("nil callback behaves like Emit", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). 
+ Maybe() + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.EmitWithCallback(t.Context(), []byte("body"), nil, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + require.NoError(t, emitter.Close()) + }) +} + +func TestChipIngressBatchEmitterService_Metrics(t *testing.T) { + t.Run("records events_sent on successful publish", func(t *testing.T) { + reader, restore := useEmitterTestMeterProvider(t) + defer restore() + + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + done := make(chan struct{}) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + cfg := newTestConfig() + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressSendInterval = time.Second + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "MetricEvent", + ) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for publish") + } + require.NoError(t, emitter.Close()) + + rm := collectEmitterMetrics(t, reader) + metric := mustEmitterMetric(t, rm, "chip_ingress.events_sent") + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricEvent") + assert.GreaterOrEqual(t, dp.Value, int64(1)) + }) + + t.Run("records events_dropped on publish error", func(t *testing.T) { + reader, restore := useEmitterTestMeterProvider(t) + defer 
restore() + + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + done := make(chan struct{}) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, assert.AnError). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + cfg := newTestConfig() + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressSendInterval = time.Second + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "MetricDropEvent", + ) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for publish") + } + require.NoError(t, emitter.Close()) + + rm := collectEmitterMetrics(t, reader) + metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped") + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricDropEvent") + assert.GreaterOrEqual(t, dp.Value, int64(1)) + }) +} + +func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) { + cfg := beholder.Config{ + ChipIngressBufferSize: uint(b.N + 10), + ChipIngressMaxBatchSize: uint(b.N + 1), + ChipIngressMaxConcurrentSends: 1, + ChipIngressSendInterval: time.Hour, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } + emitter, err := beholder.NewChipIngressBatchEmitterService(&chipingress.NoopClient{}, cfg, logger.Test(b)) + if err != nil { + b.Fatal(err) + } + if err := emitter.Start(context.Background()); err != nil { + b.Fatal(err) + } + defer func() { _ = emitter.Close() }() + + payload := []byte("benchmark-payload") + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := emitter.Emit(context.Background(), payload, + beholder.AttrKeyDomain, "bench", + 
beholder.AttrKeyEntity, "BenchmarkEvent", + ); err != nil { + b.Fatal(err) + } + } +} + +func useEmitterTestMeterProvider(t *testing.T) (*sdkmetric.ManualReader, func()) { + t.Helper() + prev := otel.GetMeterProvider() + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(provider) + return reader, func() { + require.NoError(t, provider.Shutdown(t.Context())) + otel.SetMeterProvider(prev) + } +} + +func collectEmitterMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + return rm +} + +func mustEmitterMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics { + t.Helper() + for _, sm := range rm.ScopeMetrics { + for _, metric := range sm.Metrics { + if metric.Name == name { + return metric + } + } + } + t.Fatalf("metric %q not found", name) + return metricdata.Metrics{} +} + +func mustEmitterInt64SumPoint(t *testing.T, sum metricdata.Sum[int64], k1, v1, k2, v2 string) metricdata.DataPoint[int64] { + t.Helper() + for _, dp := range sum.DataPoints { + if hasEmitterStringAttr(dp.Attributes, k1, v1) && hasEmitterStringAttr(dp.Attributes, k2, v2) { + return dp + } + } + t.Fatalf("sum datapoint not found for attrs %s=%s,%s=%s", k1, v1, k2, v2) + return metricdata.DataPoint[int64]{} +} + +func hasEmitterStringAttr(set attribute.Set, key, want string) bool { + for _, kv := range set.ToSlice() { + if string(kv.Key) == key { + return kv.Value.AsString() == want + } + } + return false +} diff --git a/pkg/beholder/beholdertest/beholder.go b/pkg/beholder/beholdertest/beholder.go index a06acf325a..6b590db571 100644 --- a/pkg/beholder/beholdertest/beholder.go +++ b/pkg/beholder/beholdertest/beholder.go @@ -8,13 +8,11 @@ import ( "testing" "github.com/stretchr/testify/require" - otellognoop "go.opentelemetry.io/otel/log/noop" - otelmetricnoop 
"go.opentelemetry.io/otel/metric/noop" - oteltracenoop "go.opentelemetry.io/otel/trace/noop" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) const ( @@ -120,39 +118,16 @@ messageLoop: func NewObserver(t *testing.T) Observer { t.Helper() - cfg := beholder.DefaultConfig() - - // Logger - loggerProvider := otellognoop.NewLoggerProvider() - logger := loggerProvider.Logger(packageNameBeholder) - - // Tracer - tracerProvider := oteltracenoop.NewTracerProvider() - tracer := tracerProvider.Tracer(packageNameBeholder) - - // Meter - meterProvider := otelmetricnoop.NewMeterProvider() - meter := meterProvider.Meter(packageNameBeholder) - - // MessageEmitter messageEmitter := &assertMessageEmitter{t: t} - client := &beholder.Client{ - Config: cfg, - Logger: logger, - Tracer: tracer, - Meter: meter, - Emitter: messageEmitter, - LoggerProvider: loggerProvider, - TracerProvider: tracerProvider, - MeterProvider: meterProvider, - MessageLoggerProvider: loggerProvider, - OnClose: func() error { return nil }, - } + client := beholder.NewNoopClient(logger.Test(t)) + client.Emitter = messageEmitter + require.NoError(t, client.Start(t.Context())) //reset NewObserver state after the test prevClient := beholder.GetClient() t.Cleanup(func() { + require.NoError(t, client.Close()) beholder.SetClient(prevClient) t.Setenv(packageNameBeholder, packageNameBeholder) }) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 62ceae0186..d13bd8320f 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -5,26 +5,46 @@ import ( "errors" "fmt" "maps" + "sync/atomic" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) +// 
ChipIngressEmitter wraps a synchronous chipingress.Client.Publish call +// in a fire-and-forget goroutine so callers are never blocked. type ChipIngressEmitter struct { client chipingress.Client + log logger.Logger + stopCh services.StopChan + wg services.WaitGroup + closed atomic.Bool } -func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) { +func NewChipIngressEmitter(client chipingress.Client, lggr logger.Logger) (Emitter, error) { if client == nil { return nil, errors.New("chip ingress client is nil") } - return &ChipIngressEmitter{client: client}, nil + return &ChipIngressEmitter{ + client: client, + log: lggr, + stopCh: make(services.StopChan), + }, nil } func (c *ChipIngressEmitter) Close() error { + if wasClosed := c.closed.Swap(true); wasClosed { + return errors.New("already closed") + } + close(c.stopCh) + c.wg.Wait() return c.client.Close() } +// Emit fires a synchronous gRPC Publish call in a background goroutine +// so the caller is never blocked. func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...) if err != nil { @@ -41,10 +61,21 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a return fmt.Errorf("failed to convert event to proto: %w", err) } - _, err = c.client.Publish(ctx, eventPb) - if err != nil { + if err := c.wg.TryAdd(1); err != nil { return err } + // Legacy ChipIngressEmitter.Emit is a synchronous gRPC call; + // fire-and-forget via goroutine to avoid blocking the caller. 
+ go func(ctx context.Context) { + defer c.wg.Done() + var cancel context.CancelFunc + ctx, cancel = c.stopCh.Ctx(ctx) + defer cancel() + + if _, err := c.client.Publish(ctx, eventPb); err != nil { + c.log.Infof("failed to emit to chip ingress: %v", err) + } + }(context.WithoutCancel(ctx)) return nil } diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index c619625960..1ad998adfc 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -10,18 +10,19 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNewChipIngressEmitter(t *testing.T) { t.Run("happy path", func(t *testing.T) { clientMock := mocks.NewClient(t) - emitter, err := beholder.NewChipIngressEmitter(clientMock) + emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) require.NoError(t, err) assert.NotNil(t, emitter) }) t.Run("returns error when client is nil", func(t *testing.T) { - emitter, err := beholder.NewChipIngressEmitter(nil) + emitter, err := beholder.NewChipIngressEmitter(nil, logger.Test(t)) assert.Error(t, err) assert.Nil(t, emitter) }) @@ -44,37 +45,44 @@ func TestChipIngressEmit(t *testing.T) { clientMock. On("Publish", mock.Anything, mock.Anything). Return(nil, nil) + clientMock.On("Close").Return(nil) - emitter, err := beholder.NewChipIngressEmitter(clientMock) + emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity, attributes) require.NoError(t, err) + // Close drains in-flight goroutines so mock expectations are met. 
+ require.NoError(t, emitter.Close()) clientMock.AssertExpectations(t) }) t.Run("returns error when ExtractSourceAndType fails", func(t *testing.T) { - emitter, err := beholder.NewChipIngressEmitter(mocks.NewClient(t)) + emitter, err := beholder.NewChipIngressEmitter(mocks.NewClient(t), logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), body, "bad_key", domain) assert.Error(t, err) }) - t.Run("returns error when Publish fails", func(t *testing.T) { + t.Run("logs error when Publish fails", func(t *testing.T) { clientMock := mocks.NewClient(t) clientMock. On("Publish", mock.Anything, mock.Anything). Return(nil, assert.AnError) + clientMock.On("Close").Return(nil) - emitter, err := beholder.NewChipIngressEmitter(clientMock) + emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) require.NoError(t, err) + // Emit returns nil because the error is logged asynchronously. err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity) - require.Error(t, err) + require.NoError(t, err) + // Close drains in-flight goroutines so mock expectations are met. + require.NoError(t, emitter.Close()) clientMock.AssertExpectations(t) }) } diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 12592b7c2a..dc40c9ed70 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -26,6 +26,8 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) const defaultGRPCCompressor = "gzip" @@ -37,6 +39,9 @@ type Emitter interface { } type Client struct { + services.Service + eng *services.Engine + Config Config // Logger Logger otellog.Logger @@ -62,6 +67,21 @@ type Client struct { OnClose func() error } +// initService wires up the services.Service lifecycle for the Client. 
+// Must be called exactly once after populating the Client fields. +func (c *Client) initService(lggr pkglogger.Logger, batchSvc *ChipIngressBatchEmitterService) { + c.Service, c.eng = services.Config{ + Name: "BeholderClient", + Close: c.close, + NewSubServices: func(l pkglogger.Logger) []services.Service { + if batchSvc == nil { + return nil + } + return []services.Service{batchSvc} + }, + }.NewServiceEngine(lggr) +} + // NewClient creates a new Client with initialized OpenTelemetry components // To handle OpenTelemetry errors use [otel.SetErrorHandler](https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler) func NewClient(cfg Config) (*Client, error) { @@ -186,8 +206,12 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // This will eventually be removed in favor of chip-ingress emitter // and logs will be sent via OTLP using the regular Logger instead of calling Emit emitter := NewMessageEmitter(messageLogger) - + var batchEmitterService *ChipIngressBatchEmitterService var chipIngressClient chipingress.Client = &chipingress.NoopClient{} + lggr := cfg.ChipIngressLogger + if lggr == nil { + lggr = pkglogger.Nop() + } // if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress // eventually we will remove the dual source emitter and just use chip ingress if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" { @@ -221,40 +245,74 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro return nil, err } - chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient) - if err != nil { - return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + var chipIngressEmitter Emitter + if cfg.ChipIngressBatchEmitterEnabled { + if cfg.ChipIngressLogger == nil { + return nil, fmt.Errorf("ChipIngressLogger is required when ChipIngressBatchEmitterEnabled is true") + } + batchEmitterService, err = 
NewChipIngressBatchEmitterService(chipIngressClient, cfg, cfg.ChipIngressLogger) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err) + } + // Batch emitter lifecycle is owned by Client's service-engine sub-service wiring. + // Wrap it with noCloseEmitter so DualSourceEmitter.Close() does not attempt to + // close it directly; the service engine closes batchEmitterService in sub-service + // teardown after parent close hook completes. + chipIngressEmitter = noCloseEmitter{Emitter: batchEmitterService} + } else { + chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient, lggr) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + } } - emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter) + emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter, lggr) if err != nil { return nil, fmt.Errorf("failed to create dual source emitter: %w", err) } } onClose := func() (err error) { - for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil + c := &Client{ + Config: cfg, + Logger: logger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: chipIngressClient, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: messageLoggerProvider, + lazySigner: signer, + OnClose: onClose, + } + c.initService(lggr, batchEmitterService) + return c, nil } -// Closes all providers, flushes all data and stops all background processes -func (c Client) Close() (err 
error) { - if c.Chip != nil { - err = errors.Join(err, c.Chip.Close()) - } +// close is the lifecycle Close hook: OTel/message shutdown and CHIP close. +func (c *Client) close() (err error) { if c.Emitter != nil { err = errors.Join(err, c.Emitter.Close()) } if c.OnClose != nil { err = errors.Join(err, c.OnClose()) } - return + return err } +// noCloseEmitter delegates Emit to the wrapped emitter but makes Close a no-op. +// Use this when lifecycle ownership is external (e.g. service engine sub-service). +type noCloseEmitter struct{ Emitter } + +func (n noCloseEmitter) Close() error { return nil } + // Returns a new Client with the same configuration but with a different package name // Deprecated: Use ForName func (c Client) ForPackage(name string) Client { @@ -614,6 +672,8 @@ func newLoggerProviderOpts(cfg Config, baseResource *sdkresource.Resource, share // newMessageLoggerProviderOpts creates logger provider options for custom message emitter func newMessageLoggerProviderOpts(cfg Config, baseResource *sdkresource.Resource, sharedLogExporter sdklog.Exporter) ([]sdklog.LoggerProviderOption, error) { var messageLogProcessor sdklog.Processor + // EmitterBatchProcessor=true uses async batching for custom-message logs; + // false uses a simple processor that exports each record immediately. 
if cfg.EmitterBatchProcessor { batchProcessorOpts := []sdklog.BatchProcessorOption{} if cfg.EmitterExportTimeout > 0 { diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index 74c75583ae..a64e4a3751 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress" chipmocks "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) type MockExporter struct { @@ -191,6 +192,7 @@ func TestClient_Close(t *testing.T) { client, err := beholder.NewStdoutClient() require.NoError(t, err) + require.NoError(t, client.Start(t.Context())) err = client.Close() require.NoError(t, err) @@ -220,7 +222,8 @@ func TestClient_ForPackage(t *testing.T) { // Meter counter, _ := clientForTest.Meter.Int64Counter("testMetric") counter.Add(t.Context(), 1) - clientForTest.Close() + require.NoError(t, client.Start(t.Context())) + require.NoError(t, clientForTest.Close()) assert.Contains(t, b.String(), `"Name":"TestClient_ForPackage"`) assert.Contains(t, b.String(), "testMetric") } @@ -926,3 +929,92 @@ func TestChipIngressClient(t *testing.T) { assert.NoError(t, err) }) } + +// TestClient_batchEmitterService groups lifecycle and construction tests for the +// ChipIngress batch emitter sub-service embedded in the beholder Client. +func TestClient_batchEmitterService(t *testing.T) { + newBatchClient := func(t *testing.T) *beholder.Client { + t.Helper() + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + // Use simple exporter in this lifecycle test to avoid batch flush/shutdown delays. 
+ EmitterBatchProcessor: false, + LogBatchProcessor: false, + LogRetryConfig: &beholder.RetryConfig{InitialInterval: time.Millisecond, MaxInterval: time.Millisecond, MaxElapsedTime: 0}, + TraceRetryConfig: &beholder.RetryConfig{InitialInterval: time.Millisecond, MaxInterval: time.Millisecond, MaxElapsedTime: 0}, + MetricRetryConfig: &beholder.RetryConfig{InitialInterval: time.Millisecond, MaxInterval: time.Millisecond, MaxElapsedTime: 0}, + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: newTestLogger(t), + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 1 * time.Second, + ChipIngressDrainTimeout: 1 * time.Second, + }) + require.NoError(t, err) + return client + } + + // startsWithClient: batch emitter sub-service starts and stops with the Client lifecycle. + t.Run("starts with client", func(t *testing.T) { + client := newBatchClient(t) + + // Before Start: service is unready and incomplete emit fails validation. + assert.ErrorContains(t, client.Service.Ready(), "not started") + err := client.Emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + // AttrKeyDataSchema intentionally omitted — triggers required-field validation error. + ) + assert.ErrorContains(t, err, "BeholderDataSchema") + + require.NoError(t, client.Service.Start(t.Context())) + assert.NoError(t, client.Service.Ready()) + _ = client.Close() + }) + + // emitSucceedsBeforeStart: a fully-valid Emit returns no error before Start. + // The OTLP path is always active; the batch emitter's service-not-started error is + // swallowed by DualSourceEmitter and only logged. 
+ t.Run("emit succeeds before start", func(t *testing.T) { + client := newBatchClient(t) + + assert.ErrorContains(t, client.Service.Ready(), "not started") + + err := client.Emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + beholder.AttrKeyDataSchema, "test-schema", + ) + assert.NoError(t, err, "emit must not fail when service is not yet started") + + require.NoError(t, client.Service.Start(t.Context())) + assert.NoError(t, client.Service.Ready()) + _ = client.Close() + }) + + // closeWithoutStart: strict service semantics require Start before Close. + t.Run("close without start", func(t *testing.T) { + client := newBatchClient(t) + err := client.Close() + assert.Error(t, err) + assert.ErrorIs(t, err, services.ErrCannotStopUnstarted) + }) + + // requiresLogger: constructing with batch emitter enabled but no logger returns an error. + t.Run("requires logger", func(t *testing.T) { + _, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: nil, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "ChipIngressLogger") + }) +} diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index f3e4561b68..5e5167f0f1 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -7,6 +7,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) type Config struct { @@ -22,7 +24,9 @@ type Config struct { EmitterExportInterval time.Duration EmitterExportMaxBatchSize int EmitterMaxQueueSize int - EmitterBatchProcessor bool // Enabled by default. Disable only for testing. 
+ // EmitterBatchProcessor controls custom-message export mode: + // true = batched async export; false = immediate per-record export. + EmitterBatchProcessor bool // OTel Trace TraceSampleRatio float64 @@ -44,6 +48,16 @@ type Config struct { ChipIngressEmitterGRPCEndpoint string ChipIngressInsecureConnection bool // Disables TLS for Chip Ingress Emitter + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled bool // When true, use batch emitter; when false (default), use legacy per-event emitter + ChipIngressBufferSize uint // Message buffer size (default 1000) + ChipIngressMaxBatchSize uint // Max events per PublishBatch call (default 500) + ChipIngressSendInterval time.Duration // Flush interval (default 100ms) + ChipIngressSendTimeout time.Duration // Timeout per PublishBatch call (default 3s) + ChipIngressDrainTimeout time.Duration // Max time to flush remaining events on shutdown (default 10s) + ChipIngressMaxConcurrentSends int // Max concurrent PublishBatch calls (default 10) + ChipIngressLogger logger.Logger // Required when ChipIngressBatchEmitterEnabled is true + // OTel Log LogExportTimeout time.Duration LogExportInterval time.Duration @@ -91,7 +105,8 @@ var defaultRetryConfig = RetryConfig{ } const ( - defaultPackageName = "beholder" + defaultPackageName = "beholder" + defaultMaxConcurrentSends = 10 ) var defaultOtelAttributes = []attribute.KeyValue{ @@ -110,6 +125,7 @@ func DefaultConfig() Config { EmitterExportMaxBatchSize: 512, EmitterExportInterval: 1 * time.Second, EmitterMaxQueueSize: 2048, + // Keep batched export enabled by default for throughput. 
EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: defaultRetryConfig.Copy(), @@ -131,8 +147,16 @@ func DefaultConfig() Config { LogMaxQueueSize: 2048, LogBatchProcessor: true, LogStreamingEnabled: true, // Enable logs streaming by default - LogLevel: zapcore.InfoLevel, - LogCompressor: "gzip", + LogLevel: zapcore.InfoLevel, + LogCompressor: "gzip", + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled: false, + ChipIngressBufferSize: 1000, + ChipIngressMaxBatchSize: 500, + ChipIngressSendInterval: 100 * time.Millisecond, + ChipIngressSendTimeout: 3 * time.Second, + ChipIngressDrainTimeout: 10 * time.Second, + ChipIngressMaxConcurrentSends: defaultMaxConcurrentSends, // Auth (defaults to static auth mode with TTL=0) AuthHeadersTTL: 0, } @@ -141,6 +165,7 @@ func DefaultConfig() Config { func TestDefaultConfig() Config { config := DefaultConfig() // Should be only disabled for testing + // Use simple (non-batched) exporter in tests for faster, deterministic teardown. config.EmitterBatchProcessor = false config.LogBatchProcessor = false // Retries are disabled for testing @@ -155,6 +180,7 @@ func TestDefaultConfig() Config { func TestDefaultConfigHTTPClient() Config { config := DefaultConfig() // Should be only disabled for testing + // Use simple (non-batched) exporter in tests for faster, deterministic teardown. config.EmitterBatchProcessor = false config.LogBatchProcessor = false config.OtelExporterGRPCEndpoint = "" diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index ae156db9c9..318cc94a91 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -30,6 +30,7 @@ func ExampleConfig() { EmitterExportMaxBatchSize: 512, EmitterExportInterval: 1 * time.Second, EmitterMaxQueueSize: 2048, + // true uses batched async export for custom messages. 
EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: nil, @@ -67,6 +68,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false ChipIngressBatchEmitterEnabled:false ChipIngressBufferSize:0 ChipIngressMaxBatchSize:0 ChipIngressSendInterval:0s ChipIngressSendTimeout:0s ChipIngressDrainTimeout:0s 
ChipIngressMaxConcurrentSends:0 ChipIngressLogger: LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index a299999714..4b2edfbc19 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -3,11 +3,8 @@ package beholder import ( "context" "errors" - "fmt" - "sync/atomic" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/services" ) // DualSourceEmitter emits both to chip ingress and to the otel collector @@ -18,12 +15,9 @@ type DualSourceEmitter struct { chipIngressEmitter Emitter otelCollectorEmitter Emitter log logger.Logger - stopCh services.StopChan - wg services.WaitGroup - closed atomic.Bool } -func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) { +func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter, lggr logger.Logger) (Emitter, error) { if chipIngressEmitter == nil { return nil, errors.New("chip ingress emitter is nil") } @@ -32,25 +26,14 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt return nil, errors.New("otel collector emitter is nil") } - logger, err := logger.New() - if err != nil { - return nil, fmt.Errorf("failed to create logger: %w", err) - } - return &DualSourceEmitter{ chipIngressEmitter: chipIngressEmitter, otelCollectorEmitter: otelCollectorEmitter, - log: logger, - stopCh: make(services.StopChan), + log: lggr, }, nil } func (d *DualSourceEmitter) Close() error { - if wasClosed := d.closed.Swap(true); wasClosed { - return errors.New("already closed") - } - close(d.stopCh) - d.wg.Wait() 
return errors.Join(d.chipIngressEmitter.Close(), d.otelCollectorEmitter.Close()) } @@ -60,22 +43,9 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an return err } - // Emit via chip ingress async - if err := d.wg.TryAdd(1); err != nil { - return err + if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { + d.log.Infof("failed to emit to chip ingress: %v", err) } - go func(ctx context.Context) { - defer d.wg.Done() - var cancel context.CancelFunc - ctx, cancel = d.stopCh.Ctx(ctx) - defer cancel() - - if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { - // If the chip ingress emitter fails, we ONLY log the error - // because we still want to send the data to the OTLP collector and not cause disruption - d.log.Infof("failed to emit to chip ingress: %v", err) - } - }(context.WithoutCancel(ctx)) return nil } diff --git a/pkg/beholder/dual_source_emitter_test.go b/pkg/beholder/dual_source_emitter_test.go index 5e99bedae1..a51e621de7 100644 --- a/pkg/beholder/dual_source_emitter_test.go +++ b/pkg/beholder/dual_source_emitter_test.go @@ -3,12 +3,15 @@ package beholder_test import ( "context" "errors" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNewDualSourceEmitter(t *testing.T) { @@ -17,7 +20,7 @@ func TestNewDualSourceEmitter(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) require.NoError(t, err) assert.NotNil(t, emitter) @@ -27,7 +30,7 @@ func TestNewDualSourceEmitter(t *testing.T) { // Test nil chip ingress emitter t.Run("nil chip ingress emitter", func(t *testing.T) { otelEmitter := &mockEmitter{} - emitter, err := 
beholder.NewDualSourceEmitter(nil, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter, logger.Test(t)) assert.Error(t, err) assert.Nil(t, emitter) @@ -36,7 +39,7 @@ func TestNewDualSourceEmitter(t *testing.T) { // Test nil otel collector emitter t.Run("nil otel collector emitter", func(t *testing.T) { chipEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil, logger.Test(t)) assert.Error(t, err) assert.Nil(t, emitter) @@ -47,7 +50,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message"), "key", "value") @@ -62,7 +65,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { }, } - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message")) @@ -71,6 +74,51 @@ func TestDualSourceEmitterEmit(t *testing.T) { }) } +func TestDualSourceEmitterBlockingBehavior(t *testing.T) { + t.Run("chip ingress emit does not block caller", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + // Simulate slow work; the emitter itself is non-blocking + // (fire-and-forget lives inside ChipIngressEmitter or batch service). 
+ time.Sleep(200 * time.Millisecond) + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + require.NoError(t, err) + + err = emitter.Emit(t.Context(), []byte("test")) + assert.NoError(t, err) + + require.NoError(t, emitter.Close()) + }) + + t.Run("chip ingress emit completes inline when emitter is synchronous", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + require.NoError(t, err) + + err = emitter.Emit(t.Context(), []byte("test")) + assert.NoError(t, err) + assert.True(t, chipCalled.Load(), + "chip ingress emit should complete before Emit returns") + + require.NoError(t, emitter.Close()) + }) +} + // Mock emitter for testing type mockEmitter struct { emitFunc func(ctx context.Context, body []byte, attrKVs ...any) error diff --git a/pkg/beholder/global_test.go b/pkg/beholder/global_test.go index 8de51370ba..69c8c11d15 100644 --- a/pkg/beholder/global_test.go +++ b/pkg/beholder/global_test.go @@ -20,17 +20,18 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/beholder/internal/mocks" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestGlobal(t *testing.T) { // Get global logger, tracer, meter, messageEmitter // If not initialized with beholder.SetClient will return noop client logger, tracer, meter, messageEmitter := beholder.GetLogger(), beholder.GetTracer(), beholder.GetMeter(), beholder.GetEmitter() - noopClient := beholder.NewNoopClient() + noopClient := beholder.NewNoopClient(pkglogger.Test(t)) assert.IsType(t, otellognoop.Logger{}, logger) assert.IsType(t, 
oteltracenoop.Tracer{}, tracer) assert.IsType(t, otelmetricnoop.Meter{}, meter) - expectedMessageEmitter := beholder.NewNoopClient().Emitter + expectedMessageEmitter := beholder.NewNoopClient(pkglogger.Test(t)).Emitter assert.IsType(t, expectedMessageEmitter, messageEmitter) assert.IsType(t, noopClient, beholder.GetClient()) @@ -76,6 +77,10 @@ func TestClient_SetGlobalOtelProviders(t *testing.T) { var b strings.Builder client, err := beholder.NewWriterClient(&b) require.NoError(t, err) + require.NoError(t, client.Start(t.Context())) + defer func() { + require.NoError(t, client.Close()) + }() // Set global Otel Client beholder.SetClient(client) diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 693f581452..4bc3c78db6 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -15,6 +15,8 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" ) // Used for testing to override the default exporter @@ -137,6 +139,8 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro // Message Emitter var messageLogProcessor sdklog.Processor + // EmitterBatchProcessor=true uses async batching for custom-message logs; + // false uses a simple processor that exports each record immediately. 
if cfg.EmitterBatchProcessor { batchProcessorOpts := []sdklog.BatchProcessorOption{} if cfg.EmitterExportTimeout > 0 { @@ -181,13 +185,32 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } onClose := func() (err error) { - for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - // HTTP client doesn't currently support rotating auth, so lazySigner is always nil - return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil + // HTTP client doesn't currently support rotating auth, so lazySigner is always nil. + c := &Client{ + Config: cfg, + Logger: logger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: nil, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: messageLoggerProvider, + lazySigner: nil, + OnClose: onClose, + } + lggr := cfg.ChipIngressLogger + if lggr == nil { + lggr = pkglogger.Nop() + } + c.initService(lggr, nil) + return c, nil } func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) { diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 3a8378f918..69dcb3b72c 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -18,14 +18,17 @@ import ( oteltracenoop "go.opentelemetry.io/otel/trace/noop" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" ) -// Default client to fallback when is is not initialized properly -func NewNoopClient() *Client { +// NewNoopClient returns the default client to fall back to when one is not initialized properly. 
+// An optional logger may be passed to surface service-engine diagnostics; +// when omitted a no-op logger is used. +func NewNoopClient(optLogger ...pkglogger.Logger) *Client { cfg := DefaultConfig() // Logger loggerProvider := otellognoop.NewLoggerProvider() - logger := loggerProvider.Logger(defaultPackageName) + otelLogger := loggerProvider.Logger(defaultPackageName) // Tracer tracerProvider := oteltracenoop.NewTracerProvider() tracer := tracerProvider.Tracer(defaultPackageName) @@ -40,7 +43,25 @@ func NewNoopClient() *Client { // ChipIngress chipClient := &chipingress.NoopClient{} - return &Client{cfg, logger, tracer, meter, messageEmitter, chipClient, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} + c := &Client{ + Config: cfg, + Logger: otelLogger, + Tracer: tracer, + Meter: meter, + Emitter: messageEmitter, + Chip: chipClient, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: loggerProvider, + OnClose: noopOnClose, + } + lggr := pkglogger.Logger(pkglogger.Nop()) + if len(optLogger) > 0 && optLogger[0] != nil { + lggr = optLogger[0] + } + c.initService(lggr, nil) + return c } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output @@ -62,7 +83,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { return NewNoopClient(), err } loggerProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(sdklog.NewSimpleProcessor(loggerExporter))) - logger := loggerProvider.Logger(defaultPackageName) + otelLogger := loggerProvider.Logger(defaultPackageName) // Tracer traceExporter, err := stdouttrace.New(cfg.TraceOptions...) 
@@ -90,7 +111,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { meter := meterProvider.Meter(defaultPackageName) // MessageEmitter - emitter := messageEmitter{messageLogger: logger} + emitter := messageEmitter{messageLogger: otelLogger} onClose := func() (err error) { for _, provider := range []shutdowner{loggerProvider, tracerProvider, meterProvider} { @@ -99,7 +120,22 @@ func NewWriterClient(w io.Writer) (*Client, error) { return } - return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: &chipingress.NoopClient{}, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil + c := &Client{ + Config: cfg.Config, + Logger: otelLogger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: &chipingress.NoopClient{}, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: loggerProvider, + lazySigner: nil, + OnClose: onClose, + } + c.initService(pkglogger.Nop(), nil) + return c, nil } type noopMessageEmitter struct{} diff --git a/pkg/beholder/noop_test.go b/pkg/beholder/noop_test.go index 332c677f1d..5060b54484 100644 --- a/pkg/beholder/noop_test.go +++ b/pkg/beholder/noop_test.go @@ -15,10 +15,11 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNoopClient(t *testing.T) { - noopClient := beholder.NewNoopClient() + noopClient := beholder.NewNoopClient(logger.Test(t)) assert.NotNil(t, noopClient) // Message Emitter @@ -85,6 +86,7 @@ func TestNoopClient(t *testing.T) { err = noopClient.Chip.Close() assert.NoError(t, err) + require.NoError(t, noopClient.Start(t.Context())) err = noopClient.Close() 
assert.NoError(t, err) } diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 0335b8ce7b..966be68e66 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -84,8 +84,9 @@ const ( envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -98,6 +99,7 @@ type EnvConfig struct { ChipIngressEndpoint string ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -146,6 +148,8 @@ type EnvConfig struct { TelemetryAuthHeaders map[string]string TelemetryAuthPubKeyHex string TelemetryAuthHeadersTTL time.Duration + // TelemetryEmitterBatchProcessor maps to beholder Config.EmitterBatchProcessor + // (batched async custom-message export vs immediate per-record export). 
TelemetryEmitterBatchProcessor bool TelemetryEmitterExportTimeout time.Duration TelemetryEmitterExportInterval time.Duration @@ -255,6 +259,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) + add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -486,6 +491,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressInsecureConnection, err) } + e.ChipIngressBatchEmitterEnabled, err = getBool(envChipIngressBatchEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 60547f4803..024ff85622 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -84,8 +84,9 @@ func TestEnvConfig_parse(t *testing.T) { envTelemetryEmitterMaxQueueSize: "1000", envTelemetryLogStreamingEnabled: "false", - envChipIngressEndpoint: "chip-ingress.example.com:50051", - envChipIngressInsecureConnection: "true", + envChipIngressEndpoint: "chip-ingress.example.com:50051", + envChipIngressInsecureConnection: "true", + envChipIngressBatchEmitterEnabled: "false", envCRESettings: `{"global":{}}`, envCRESettingsDefault: `{"foo":"bar"}`, @@ -195,8 +196,9 @@ var envCfgFull = EnvConfig{ TelemetryEmitterMaxQueueSize: 1000, TelemetryLogStreamingEnabled: false, - ChipIngressEndpoint: "chip-ingress.example.com:50051", - ChipIngressInsecureConnection: true, + ChipIngressEndpoint: "chip-ingress.example.com:50051", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, CRESettings: `{"global":{}}`, CRESettingsDefault: `{"foo":"bar"}`, @@ -259,6 +261,7 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { 
// Assert ChipIngress environment variables assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint]) assert.Equal(t, "true", got[envChipIngressInsecureConnection]) + assert.Equal(t, "false", got[envChipIngressBatchEmitterEnabled]) assert.JSONEq(t, `{"global":{}}`, got[envCRESettings]) assert.JSONEq(t, `{"foo":"bar"}`, got[envCRESettingsDefault]) diff --git a/pkg/loop/plugin_relayer_emitter_test.go b/pkg/loop/plugin_relayer_emitter_test.go index 71219e37df..f08c971565 100644 --- a/pkg/loop/plugin_relayer_emitter_test.go +++ b/pkg/loop/plugin_relayer_emitter_test.go @@ -122,7 +122,7 @@ func TestParseOriginURL(t *testing.T) { func TestNewPluginRelayerConfigEmitterDefaults(t *testing.T) { prev := beholder.GetClient() - client := beholder.NewNoopClient() + client := beholder.NewNoopClient(logger.Test(t)) client.Config.AuthPublicKeyHex = "from-beholder" beholder.SetClient(client) t.Cleanup(func() { beholder.SetClient(prev) }) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index a7d92ee705..307603d502 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -101,6 +101,7 @@ type Server struct { checker *services.HealthChecker LimitsFactory limits.Factory profiler *pyroscope.Profiler + beholderClient *beholder.Client } func newServer(loggerName string) (*Server, error) { @@ -180,6 +181,8 @@ func (s *Server) start(opts ...ServerOpt) error { ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "", ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint, ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + ChipIngressBatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled, + ChipIngressLogger: s.Logger, MetricCompressor: s.EnvConfig.TelemetryMetricCompressor, } @@ -212,19 +215,8 @@ func (s *Server) start(opts ...ServerOpt) error { beholderCfg.TraceSpanExporter = exporter } - beholderClient, err := beholder.NewClient(beholderCfg) - if err != nil { - return fmt.Errorf("failed to create beholder 
client: %w", err) - } - beholder.SetClient(beholderClient) - beholder.SetGlobalOtelProviders() - - if beholderCfg.LogStreamingEnabled { - otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel) - if err != nil { - return fmt.Errorf("failed to enable log streaming: %w", err) - } - s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name())) + if err := s.startBeholderClient(ctx, beholderCfg); err != nil { + return err } } @@ -349,8 +341,34 @@ func (s *Server) MustRegister(c services.HealthReporter) { func (s *Server) Register(c services.HealthReporter) error { return s.checker.Register(c) } +func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.Config) error { + beholderClient, err := beholder.NewClient(beholderCfg) + if err != nil { + return fmt.Errorf("failed to create beholder client: %w", err) + } + if err := beholderClient.Start(ctx); err != nil { + return fmt.Errorf("failed to start beholder client: %w", err) + } + s.beholderClient = beholderClient + beholder.SetClient(beholderClient) + beholder.SetGlobalOtelProviders() + + if beholderCfg.LogStreamingEnabled { + otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel) + if err != nil { + return fmt.Errorf("failed to enable log streaming: %w", err) + } + s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name())) + } + + return nil +} + // Stop closes resources and flushes logs. 
func (s *Server) Stop() { + if s.beholderClient != nil { + s.Logger.ErrorIfFn(s.beholderClient.Close, "Failed to close beholder client") + } if s.dbStatsReporter != nil { s.dbStatsReporter.Stop() } diff --git a/pkg/services/service.go b/pkg/services/service.go index bf19f8d918..6257ff6b51 100644 --- a/pkg/services/service.go +++ b/pkg/services/service.go @@ -254,7 +254,7 @@ func (s *service) Name() string { return s.eng.Name() } func (s *service) Start(ctx context.Context) error { return s.StartOnce(s.cfg.Name, func() error { var span trace.Span - ctx, span = s.eng.tracer.Start(ctx, "Start") //nolint + ctx, span = s.eng.tracer.Start(ctx, "Start") defer span.End() s.eng.Info("Starting") From 0f95004f50c8c97b7dd79d010e7def169ab34c1e Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Thu, 14 May 2026 18:02:31 -0400 Subject: [PATCH 2/8] chore: bump chipingress dependency to publishBatch --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 99856d2922..d18b558562 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.89 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514213647-fddcb86b6482 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 929885cee3..0c213b59e5 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod 
h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514213647-fddcb86b6482 h1:CpuTkw3SooswiRubVfKl5AeLASN34QwnU8UuYMMcqbE= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514213647-fddcb86b6482/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909 h1:r/llpKW6YNTWNjTDElkC1nGJUz/ryomPnRoNON8qO84= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 h1:9vjqB+iNqwyazVoVjR1rozHXTeRYyeggavt3Q4sbNrg= From 4601e78d0db479d8d17bef8edacd0d22992de5c1 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 09:40:29 -0400 Subject: [PATCH 3/8] refactor: remove context.Background() in batch emitter service and tests Use caller ctx for OTel metric Add calls (non-blocking, tolerates cancelled contexts) and b.Context() in benchmarks. 
--- pkg/beholder/batch_emitter_service.go | 11 +++++++---- pkg/beholder/batch_emitter_service_test.go | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/beholder/batch_emitter_service.go b/pkg/beholder/batch_emitter_service.go index b23dcaebe8..b352525eb6 100644 --- a/pkg/beholder/batch_emitter_service.go +++ b/pkg/beholder/batch_emitter_service.go @@ -146,20 +146,23 @@ func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body } metricAttrs := e.metricAttrsFor(domain, entity) - metricsCtx := context.Background() queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { + // The callback fires asynchronously after the batch is sent, + // so the caller's ctx may already be cancelled. Use ctx directly + // for metric recording — OTel Add is non-blocking and tolerates + // cancelled contexts. if sendErr != nil { - e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + e.metrics.eventsDropped.Add(ctx, 1, metricAttrs) } else { - e.metrics.eventsSent.Add(metricsCtx, 1, metricAttrs) + e.metrics.eventsSent.Add(ctx, 1, metricAttrs) } if callback != nil { callback(sendErr) } }) if queueErr != nil { - e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + e.metrics.eventsDropped.Add(ctx, 1, metricAttrs) if callback != nil { callback(queueErr) } diff --git a/pkg/beholder/batch_emitter_service_test.go b/pkg/beholder/batch_emitter_service_test.go index d685552f44..7fd0f0c543 100644 --- a/pkg/beholder/batch_emitter_service_test.go +++ b/pkg/beholder/batch_emitter_service_test.go @@ -541,7 +541,7 @@ func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) { if err != nil { b.Fatal(err) } - if err := emitter.Start(context.Background()); err != nil { + if err := emitter.Start(b.Context()); err != nil { b.Fatal(err) } defer func() { _ = emitter.Close() }() @@ -549,7 +549,7 @@ func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) { payload := []byte("benchmark-payload") b.ResetTimer() for i := 0; i < b.N; 
i++ { - if err := emitter.Emit(context.Background(), payload, + if err := emitter.Emit(b.Context(), payload, beholder.AttrKeyDomain, "bench", beholder.AttrKeyEntity, "BenchmarkEvent", ); err != nil { From b5776a538a83ac72e24ff96a93101ab2bdf32fe4 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 13:40:17 -0400 Subject: [PATCH 4/8] chore: bump chipingress dependency to latest main --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d18b558562..d99cfe80a3 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.89 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 0c213b59e5..8953914adb 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909 h1:r/llpKW6YNTWNjTDElkC1nGJUz/ryomPnRoNON8qO84= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909/go.mod 
h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad h1:BfgX+8tDqkLJDlTsKykIKUUIlitF6UwSBuzAT6wDzLY= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 h1:9vjqB+iNqwyazVoVjR1rozHXTeRYyeggavt3Q4sbNrg= From 3aa7eecbc409ae9df1d6abb18d78a49977c21209 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 16:12:02 -0400 Subject: [PATCH 5/8] fix: use engine lifecycle context in batch emitter Start instead of startup ctx The services contract forbids retaining the startup context after Start returns. Use eng.NewCtx() to get a lifecycle-owned context that is cancelled when StopChan closes during service shutdown, rather than passing through the caller's startup context. --- pkg/beholder/batch_emitter_service.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/beholder/batch_emitter_service.go b/pkg/beholder/batch_emitter_service.go index b352525eb6..5a4a795708 100644 --- a/pkg/beholder/batch_emitter_service.go +++ b/pkg/beholder/batch_emitter_service.go @@ -97,7 +97,11 @@ func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lg return e, nil } -func (e *ChipIngressBatchEmitterService) start(ctx context.Context) error { +func (e *ChipIngressBatchEmitterService) start(_ context.Context) error { + // Do not pass the startup ctx — the services contract forbids retaining it + // after Start returns. 
Use the engine's lifecycle context so the batcher + // is cancelled when the service shuts down (StopChan closes before stop() runs). + ctx, _ := e.eng.NewCtx() e.batchClient.Start(ctx) return nil } From a2567d3b1a3de1f3835f6cf9868859521574ec02 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 16:20:04 -0400 Subject: [PATCH 6/8] fix: use defaulted logger for legacy chip-ingress emitter to prevent nil panic --- pkg/beholder/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index dc40c9ed70..85eec273d4 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -260,7 +260,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // teardown after parent close hook completes. chipIngressEmitter = noCloseEmitter{Emitter: batchEmitterService} } else { - chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient, cfg.ChipIngressLogger) + chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient, lggr) if err != nil { return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) } From 65d0ba349fcacff849d3a32ce3439f579e6986c5 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Mon, 18 May 2026 11:02:23 -0400 Subject: [PATCH 7/8] bump chipingress to main (bacfb6ba4146), gomodtidy --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d99cfe80a3..40bbdcb782 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.89 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 8953914adb..f066a4ef4c 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad h1:BfgX+8tDqkLJDlTsKykIKUUIlitF6UwSBuzAT6wDzLY= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260515172105-f60f14be40ad/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 h1:PlkA7NGpBm5sc2P//crDFgMIQ0qsQhKcpjWV7Qzwqz8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 h1:9vjqB+iNqwyazVoVjR1rozHXTeRYyeggavt3Q4sbNrg= From 38c1d352a7d0fe72cc859dd493d4d7dd83dcc8ed Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Mon, 18 May 2026 15:58:42 -0400 Subject: [PATCH 8/8] refactor: use config 
structs for emitter/noop constructors preserving signatures Introduce ChipIngressEmitterConfig, DualSourceEmitterConfig, and NoopClientConfig structs with New() factory methods. The public constructors (NewChipIngressEmitter, NewDualSourceEmitter, NewNoopClient) retain their original signatures and delegate to the config structs. Internal callers that need a logger use the config struct directly; nil Lggr defaults to logger.Nop(). --- pkg/beholder/beholdertest/beholder.go | 2 +- pkg/beholder/chip_ingress_emitter.go | 22 ++++++++++++++---- pkg/beholder/chip_ingress_emitter_test.go | 17 ++++++++++---- pkg/beholder/client.go | 4 ++-- pkg/beholder/dual_source_emitter.go | 23 +++++++++++++++---- pkg/beholder/dual_source_emitter_test.go | 15 ++++++------ pkg/beholder/global_test.go | 4 ++-- pkg/beholder/noop.go | 28 +++++++++++++++-------- pkg/beholder/noop_test.go | 2 +- pkg/loop/plugin_relayer_emitter_test.go | 2 +- 10 files changed, 81 insertions(+), 38 deletions(-) diff --git a/pkg/beholder/beholdertest/beholder.go b/pkg/beholder/beholdertest/beholder.go index 6b590db571..37d650a98b 100644 --- a/pkg/beholder/beholdertest/beholder.go +++ b/pkg/beholder/beholdertest/beholder.go @@ -120,7 +120,7 @@ func NewObserver(t *testing.T) Observer { messageEmitter := &assertMessageEmitter{t: t} - client := beholder.NewNoopClient(logger.Test(t)) + client := beholder.NoopClientConfig{Lggr: logger.Test(t)}.New() client.Emitter = messageEmitter require.NoError(t, client.Start(t.Context())) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index d13bd8320f..4bca08b47f 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -16,20 +16,34 @@ import ( // in a fire-and-forget goroutine so callers are never blocked. 
type ChipIngressEmitter struct { client chipingress.Client - log logger.Logger + lggr logger.Logger stopCh services.StopChan wg services.WaitGroup closed atomic.Bool } -func NewChipIngressEmitter(client chipingress.Client, lggr logger.Logger) (Emitter, error) { +func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) { + return ChipIngressEmitterConfig{}.New(client) +} + +// ChipIngressEmitterConfig holds configuration for creating a ChipIngressEmitter. +type ChipIngressEmitterConfig struct { + Lggr logger.Logger +} + +// New creates a ChipIngressEmitter from the config. +func (c ChipIngressEmitterConfig) New(client chipingress.Client) (Emitter, error) { if client == nil { return nil, errors.New("chip ingress client is nil") } + lggr := c.Lggr + if lggr == nil { + lggr = logger.Nop() + } return &ChipIngressEmitter{ client: client, - log: lggr, + lggr: lggr, stopCh: make(services.StopChan), }, nil } @@ -73,7 +87,7 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a defer cancel() if _, err := c.client.Publish(ctx, eventPb); err != nil { - c.log.Infof("failed to emit to chip ingress: %v", err) + c.lggr.Infof("failed to emit to chip ingress: %v", err) } }(context.WithoutCancel(ctx)) diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index 1ad998adfc..11349d335b 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -16,13 +16,20 @@ import ( func TestNewChipIngressEmitter(t *testing.T) { t.Run("happy path", func(t *testing.T) { clientMock := mocks.NewClient(t) - emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) + emitter, err := beholder.NewChipIngressEmitter(clientMock) + require.NoError(t, err) + assert.NotNil(t, emitter) + }) + + t.Run("happy path with config struct", func(t *testing.T) { + clientMock := mocks.NewClient(t) + emitter, err := beholder.ChipIngressEmitterConfig{Lggr: 
logger.Test(t)}.New(clientMock) require.NoError(t, err) assert.NotNil(t, emitter) }) t.Run("returns error when client is nil", func(t *testing.T) { - emitter, err := beholder.NewChipIngressEmitter(nil, logger.Test(t)) + emitter, err := beholder.NewChipIngressEmitter(nil) assert.Error(t, err) assert.Nil(t, emitter) }) @@ -47,7 +54,7 @@ func TestChipIngressEmit(t *testing.T) { Return(nil, nil) clientMock.On("Close").Return(nil) - emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) + emitter, err := beholder.ChipIngressEmitterConfig{Lggr: logger.Test(t)}.New(clientMock) require.NoError(t, err) err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity, attributes) @@ -59,7 +66,7 @@ func TestChipIngressEmit(t *testing.T) { }) t.Run("returns error when ExtractSourceAndType fails", func(t *testing.T) { - emitter, err := beholder.NewChipIngressEmitter(mocks.NewClient(t), logger.Test(t)) + emitter, err := beholder.ChipIngressEmitterConfig{Lggr: logger.Test(t)}.New(mocks.NewClient(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), body, "bad_key", domain) @@ -74,7 +81,7 @@ func TestChipIngressEmit(t *testing.T) { Return(nil, assert.AnError) clientMock.On("Close").Return(nil) - emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) + emitter, err := beholder.ChipIngressEmitterConfig{Lggr: logger.Test(t)}.New(clientMock) require.NoError(t, err) // Emit returns nil because the error is logged asynchronously. diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 85eec273d4..e8fce549bc 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -260,13 +260,13 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // teardown after parent close hook completes. 
chipIngressEmitter = noCloseEmitter{Emitter: batchEmitterService} } else { - chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient, lggr) + chipIngressEmitter, err = ChipIngressEmitterConfig{Lggr: lggr}.New(chipIngressClient) if err != nil { return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) } } - emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter, lggr) + emitter, err = DualSourceEmitterConfig{Lggr: lggr}.New(chipIngressEmitter, emitter) if err != nil { return nil, fmt.Errorf("failed to create dual source emitter: %w", err) } diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index 4b2edfbc19..c1f4886c38 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -14,10 +14,20 @@ import ( type DualSourceEmitter struct { chipIngressEmitter Emitter otelCollectorEmitter Emitter - log logger.Logger + lggr logger.Logger } -func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter, lggr logger.Logger) (Emitter, error) { +func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) { + return DualSourceEmitterConfig{}.New(chipIngressEmitter, otelCollectorEmitter) +} + +// DualSourceEmitterConfig holds configuration for creating a DualSourceEmitter. +type DualSourceEmitterConfig struct { + Lggr logger.Logger +} + +// New creates a DualSourceEmitter from the config. 
+func (c DualSourceEmitterConfig) New(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) { if chipIngressEmitter == nil { return nil, errors.New("chip ingress emitter is nil") } @@ -26,10 +36,15 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt return nil, errors.New("otel collector emitter is nil") } + lggr := c.Lggr + if lggr == nil { + lggr = logger.Nop() + } + return &DualSourceEmitter{ chipIngressEmitter: chipIngressEmitter, otelCollectorEmitter: otelCollectorEmitter, - log: lggr, + lggr: lggr, }, nil } @@ -44,7 +59,7 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an } if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { - d.log.Infof("failed to emit to chip ingress: %v", err) + d.lggr.Infof("failed to emit to chip ingress: %v", err) } return nil diff --git a/pkg/beholder/dual_source_emitter_test.go b/pkg/beholder/dual_source_emitter_test.go index a51e621de7..3f4d99391d 100644 --- a/pkg/beholder/dual_source_emitter_test.go +++ b/pkg/beholder/dual_source_emitter_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/beholder" - "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNewDualSourceEmitter(t *testing.T) { @@ -20,7 +19,7 @@ func TestNewDualSourceEmitter(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) require.NoError(t, err) assert.NotNil(t, emitter) @@ -30,7 +29,7 @@ func TestNewDualSourceEmitter(t *testing.T) { // Test nil chip ingress emitter t.Run("nil chip ingress emitter", func(t *testing.T) { otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(nil, 
otelEmitter) assert.Error(t, err) assert.Nil(t, emitter) @@ -39,7 +38,7 @@ func TestNewDualSourceEmitter(t *testing.T) { // Test nil otel collector emitter t.Run("nil otel collector emitter", func(t *testing.T) { chipEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil) assert.Error(t, err) assert.Nil(t, emitter) @@ -50,7 +49,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message"), "key", "value") @@ -65,7 +64,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { }, } - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message")) @@ -88,7 +87,7 @@ func TestDualSourceEmitterBlockingBehavior(t *testing.T) { } otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test")) @@ -107,7 +106,7 @@ func TestDualSourceEmitterBlockingBehavior(t *testing.T) { } otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test")) diff --git a/pkg/beholder/global_test.go b/pkg/beholder/global_test.go index 69c8c11d15..0481c70b0c 100644 --- a/pkg/beholder/global_test.go +++ 
b/pkg/beholder/global_test.go @@ -27,11 +27,11 @@ func TestGlobal(t *testing.T) { // Get global logger, tracer, meter, messageEmitter // If not initialized with beholder.SetClient will return noop client logger, tracer, meter, messageEmitter := beholder.GetLogger(), beholder.GetTracer(), beholder.GetMeter(), beholder.GetEmitter() - noopClient := beholder.NewNoopClient(pkglogger.Test(t)) + noopClient := beholder.NoopClientConfig{Lggr: pkglogger.Test(t)}.New() assert.IsType(t, otellognoop.Logger{}, logger) assert.IsType(t, oteltracenoop.Tracer{}, tracer) assert.IsType(t, otelmetricnoop.Meter{}, meter) - expectedMessageEmitter := beholder.NewNoopClient(pkglogger.Test(t)).Emitter + expectedMessageEmitter := beholder.NoopClientConfig{Lggr: pkglogger.Test(t)}.New().Emitter assert.IsType(t, expectedMessageEmitter, messageEmitter) assert.IsType(t, noopClient, beholder.GetClient()) diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 69dcb3b72c..5c506d527e 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -22,9 +22,21 @@ import ( ) // Default client to fallback when is is not initialized properly. -// An optional logger may be passed to surface service-engine diagnostics; -// when omitted a no-op logger is used. -func NewNoopClient(optLogger ...pkglogger.Logger) *Client { +func NewNoopClient() *Client { + return NoopClientConfig{}.New() +} + +// NoopClientConfig holds configuration for creating a no-op Client. +type NoopClientConfig struct { + Lggr pkglogger.Logger +} + +// New creates a no-op Client from the config. 
+func (c NoopClientConfig) New() *Client { + lggr := c.Lggr + if lggr == nil { + lggr = pkglogger.Nop() + } cfg := DefaultConfig() // Logger loggerProvider := otellognoop.NewLoggerProvider() @@ -43,7 +55,7 @@ func NewNoopClient(optLogger ...pkglogger.Logger) *Client { // ChipIngress chipClient := &chipingress.NoopClient{} - c := &Client{ + cl := &Client{ Config: cfg, Logger: otelLogger, Tracer: tracer, @@ -56,12 +68,8 @@ func NewNoopClient(optLogger ...pkglogger.Logger) *Client { MessageLoggerProvider: loggerProvider, OnClose: noopOnClose, } - lggr := pkglogger.Logger(pkglogger.Nop()) - if len(optLogger) > 0 && optLogger[0] != nil { - lggr = optLogger[0] - } - c.initService(lggr, nil) - return c + cl.initService(lggr, nil) + return cl } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output diff --git a/pkg/beholder/noop_test.go b/pkg/beholder/noop_test.go index 5060b54484..00c93df14b 100644 --- a/pkg/beholder/noop_test.go +++ b/pkg/beholder/noop_test.go @@ -19,7 +19,7 @@ import ( ) func TestNoopClient(t *testing.T) { - noopClient := beholder.NewNoopClient(logger.Test(t)) + noopClient := beholder.NoopClientConfig{Lggr: logger.Test(t)}.New() assert.NotNil(t, noopClient) // Message Emitter diff --git a/pkg/loop/plugin_relayer_emitter_test.go b/pkg/loop/plugin_relayer_emitter_test.go index f08c971565..3042a1d0b8 100644 --- a/pkg/loop/plugin_relayer_emitter_test.go +++ b/pkg/loop/plugin_relayer_emitter_test.go @@ -122,7 +122,7 @@ func TestParseOriginURL(t *testing.T) { func TestNewPluginRelayerConfigEmitterDefaults(t *testing.T) { prev := beholder.GetClient() - client := beholder.NewNoopClient(logger.Test(t)) + client := beholder.NoopClientConfig{Lggr: logger.Test(t)}.New() client.Config.AuthPublicKeyHex = "from-beholder" beholder.SetClient(client) t.Cleanup(func() { beholder.SetClient(prev) })