chip ingress batching #1862

Closed
thomaska wants to merge 30 commits into main from infoplat-3436-chipingress-publishBatch

@thomaska thomaska commented Feb 27, 2026

Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436

Summary

This PR adds chip-ingress batch delivery behind beholder, makes beholder.Client own the batch emitter lifecycle, and adds a service-layer escape hatch for cleanup when a lifecycle-managed service is closed before it was ever started.

What changed

  • added ChipIngressBatchEmitterService, which batches emitted events and publishes them through chipingress.Client.PublishBatch
  • beholder.Client now owns the optional batch emitter as a sub-service and starts/stops it through its service lifecycle
  • added services.Config.CloseIfNeverStarted so ctor-held resources can still run their close hook even when Start() was never called
  • updated DualSourceEmitter.Close() so it does not directly close a service-managed batch emitter
  • pkg/chipingress/batch.Client now records batch request metrics and flushes shutdown work independently of caller cancellation
  • LOOP now starts/stops beholder.Client directly and no longer uses test-only indirection in pkg/loop/server.go
  • removed the obsolete pkg/loop/server_test.go
  • added LOOP config support for CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED
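
The CloseIfNeverStarted behavior described above can be illustrated with a minimal sketch. The `lifecycle` type below is hypothetical and is not the chainlink-common `services.Engine` API; it only assumes that, without the option, closing a never-started service returns an error and skips the close hook.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// lifecycle is a hypothetical stand-in for a service engine. It is NOT the
// chainlink-common services API; names and error texts are assumptions.
type lifecycle struct {
	mu                  sync.Mutex
	started, closed     bool
	closeIfNeverStarted bool         // assumed to mirror the intent of CloseIfNeverStarted
	closeHook           func() error // releases resources acquired in the constructor
}

func (l *lifecycle) Start() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.started || l.closed {
		return errors.New("already started or closed")
	}
	l.started = true
	return nil
}

func (l *lifecycle) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.closed {
		return errors.New("already closed")
	}
	l.closed = true
	if !l.started && !l.closeIfNeverStarted {
		// Without the escape hatch, constructor-held resources would leak here.
		return errors.New("never started")
	}
	if l.closeHook == nil {
		return nil
	}
	return l.closeHook()
}

func main() {
	released := false
	l := &lifecycle{
		closeIfNeverStarted: true,
		closeHook:           func() error { released = true; return nil },
	}
	// Start() is never called, yet the hook still runs.
	err := l.Close()
	fmt.Println("released:", released, "err:", err)
}
```

In this sketch the flag only changes the never-started path; a service that was started and then closed behaves the same with or without it.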

Data Flow

flowchart LR
    A["Caller or app code"] -->|"Emit(ctx, body, attrs)"| B["beholder.Client.Emitter"]
    B --> C["DualSourceEmitter"]
    C -->|"synchronous"| D["OTLP message emitter"]
    C -->|"batch enabled"| E["ChipIngressBatchEmitterService"]
    E -->|"queue event"| F["batch.Client"]
    F -->|"accumulate by size or interval"| G["PublishBatch request"]
    G --> H["chipingress.Client"]
    H --> I["Chip Ingress gRPC endpoint"]

    F --> J["batch client metrics"]
    E --> K["emitter success or drop metrics"]
    B --> L["beholder logger, tracer, and meter providers"]
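
The "accumulate by size or interval" step in the flowchart can be sketched as follows. This is an illustrative loop under stated assumptions, not the `batch.Client` implementation: `runBatcher`, its parameters, and the `publish` callback are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher is a hypothetical sketch of size-or-interval batching. It calls
// publish when maxBatch events have accumulated or when interval elapses,
// and flushes any remainder once the input channel is closed.
func runBatcher(in <-chan string, maxBatch int, interval time.Duration, publish func([]string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]string, 0, maxBatch)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		publish(batch)
		// Allocate a fresh slice so the published batch is not overwritten.
		batch = make([]string, 0, maxBatch)
	}

	for {
		select {
		case ev, ok := <-in:
			if !ok {
				flush() // shutdown: send whatever is left
				return
			}
			batch = append(batch, ev)
			if len(batch) >= maxBatch {
				flush() // size trigger
			}
		case <-ticker.C:
			flush() // interval trigger
		}
	}
}

func main() {
	in := make(chan string, 8)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("event-%d", i)
	}
	close(in)

	// A long interval keeps this demo driven by the size trigger only.
	runBatcher(in, 2, time.Hour, func(b []string) {
		fmt.Println(len(b), b)
	})
}
```

With five queued events and a batch size of two, the sketch publishes two full batches and then one final batch of a single event on shutdown.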

Dependency Diagram

flowchart TD
    S["loop.Server"] -->|"start and close"| BC["beholder.Client"]
    S --> CFG["loop.EnvConfig"]
    CFG --> FLAG["CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"]

    BC --> ENG1["services.Engine"]
    BC --> DSE["DualSourceEmitter"]
    BC --> CHIP["chipingress.Client"]
    BC --> OTLP["OTLP providers"]
    ENG1 --> BES["ChipIngressBatchEmitterService"]

    BES --> ENG2["services.Engine"]
    BES --> BATCH["batch.Client"]
    BATCH --> CHIP

    ENG1 --> CINS["CloseIfNeverStarted"]
    ENG2 --> CINS
    CINS --> SVC["services.Config"]

Metrics

Added batch delivery metrics for:

  • chip_ingress.batch.send_requests_total
  • chip_ingress.batch.send_failures_total
  • chip_ingress.batch.request_size_messages
  • chip_ingress.batch.request_size_bytes
  • chip_ingress.batch.request_latency_ms
  • chip_ingress.batch.config.info
  • chip_ingress.events_sent
  • chip_ingress.events_dropped

Tests

Added coverage for:

  • CloseIfNeverStarted lifecycle behavior
  • beholder client ownership of the batch emitter sub-service
  • batch emitter lifecycle and success/drop metrics
  • batch client metrics
  • benchmark smoke coverage for batch queueing and emitter enqueue paths

Supports

smartcontractkit/chainlink#21327

Copilot AI review requested due to automatic review settings February 27, 2026 14:43
@thomaska thomaska requested a review from a team as a code owner February 27, 2026 14:43
@github-actions

👋 thomaska, thanks for creating this pull request!

To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team.

Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

Contributor

Copilot AI left a comment

Pull request overview

This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.

Changes:

  • Introduced ChipIngressBatchEmitter with per-(domain, entity) worker goroutines for batching events
  • Added chipIngressEmitterWorker to handle batch assembly and sending with configurable timeouts
  • Removed goroutine wrapper from DualSourceEmitter.Emit() since batching is now non-blocking (channel send)
  • Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)
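
The non-blocking channel send mentioned in the list above can be sketched like this. The `queue` type and `tryEnqueue` method are hypothetical names; the sketch only assumes a bounded buffer where a full queue causes the event to be dropped and counted rather than blocking the caller.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// queue is a hypothetical bounded event buffer with a drop counter.
type queue struct {
	ch      chan string
	dropped atomic.Int64
}

func newQueue(bufferSize int) *queue {
	return &queue{ch: make(chan string, bufferSize)}
}

// tryEnqueue never blocks: it reports false and counts a drop when the
// buffer is full, so the caller's Emit path stays cheap.
func (q *queue) tryEnqueue(ev string) bool {
	select {
	case q.ch <- ev:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

func main() {
	q := newQueue(2)
	for i := 0; i < 5; i++ {
		q.tryEnqueue(fmt.Sprintf("event-%d", i))
	}
	fmt.Println("queued:", len(q.ch), "dropped:", q.dropped.Load())
}
```

Because the send cannot block, a caller no longer needs to wrap it in its own goroutine, which matches the reason given for simplifying `DualSourceEmitter.Emit()`.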

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| pkg/beholder/chip_ingress_batch_emitter.go | New batch emitter with per-worker buffering and periodic flushing via PublishBatch |
| pkg/beholder/chip_ingress_emitter_worker.go | Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops |
| pkg/beholder/chip_ingress_batch_emitter_test.go | Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults |
| pkg/beholder/dual_source_emitter.go | Simplified Emit() by removing goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking |
| pkg/beholder/client.go | Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added comment about closure ordering |
| pkg/beholder/config.go | Added 4 new config fields with inline documentation and default values |
| pkg/beholder/config_test.go | Updated expected output to include new config fields |

Comments suppressed due to low confidence (2)

pkg/beholder/config.go:50

  • The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
	ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.
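
The "zero means use the default" behavior that the comment should describe can be sketched as below. The helper and constant names are hypothetical; the 500ms value comes from the review text, and treating negative values like zero is an added assumption.

```go
package main

import (
	"fmt"
	"time"
)

// defaultSendInterval uses the 500ms default quoted in the review; the
// constant name itself is hypothetical.
const defaultSendInterval = 500 * time.Millisecond

// sendIntervalOrDefault returns the default for a zero (unset) interval
// instead of disabling batching. Handling negatives the same way is an
// assumption made for this sketch.
func sendIntervalOrDefault(d time.Duration) time.Duration {
	if d <= 0 {
		return defaultSendInterval
	}
	return d
}

func main() {
	fmt.Println(sendIntervalOrDefault(0), sendIntervalOrDefault(2*time.Second)) // prints "500ms 2s"
}
```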

pkg/beholder/client.go:248

  • The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
		for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {

@github-actions

github-actions Bot commented Feb 27, 2026

⚠️ API Diff Results - github.com/smartcontractkit/chainlink-common

⚠️ Breaking Changes (3)

pkg/beholder (3)
  • NewChipIngressEmitter — Type changed:
func(
  github.com/smartcontractkit/chainlink-common/pkg/chipingress.Client, 
  + github.com/smartcontractkit/chainlink-common/pkg/logger.Logger
)
(Emitter, error)
  • NewDualSourceEmitter — Type changed:
func(
  Emitter, 
  Emitter, 
  + github.com/smartcontractkit/chainlink-common/pkg/logger.Logger
)
(Emitter, error)
  • NewNoopClient — Type changed:
func(
  - 
  + ...github.com/smartcontractkit/chainlink-common/pkg/logger.Logger
)
*Client

✅ Compatible Changes (25)

pkg/beholder (2)
  • ChipIngressBatchEmitterService — ➕ Added

  • NewChipIngressBatchEmitterService — ➕ Added

pkg/beholder.BeholderClient (1)
  • Service — ➕ Added
pkg/beholder.Client (1)
  • Service — ➕ Added
pkg/beholder.Config (8)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressLogger — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxConcurrentSends — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/beholder.writerClientConfig (8)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressLogger — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxConcurrentSends — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/loop.EnvConfig (1)
  • ChipIngressBatchEmitterEnabled — ➕ Added
pkg/services.HealthReporter (3)
  • HealthReport — ➕ Added

  • Name — ➕ Added

  • Ready — ➕ Added

pkg/services.Service (1)
  • Start — ➕ Added

@smartcontractkit smartcontractkit deleted a comment from github-actions Bot Feb 27, 2026
Comment thread pkg/beholder/client.go
return nil, err
}

chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient)
Author

add a feature flag

// via chipingress.Client.PublishBatch on a periodic interval.
// It satisfies the Emitter interface so it can be used as a drop-in replacement
// for ChipIngressEmitter.
type ChipIngressBatchEmitter struct {
Author

name it Service

return e, nil
}

func (e *ChipIngressBatchEmitter) start(_ context.Context) error {

What's the role of this function if it always returns nil?

Author

It was mostly added as a placeholder, but it can be omitted.
After checking, EventHandler in core/services/workflows/syncer/v2/handler.go also omits it, so dropping it here is probably more consistent.


// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
// Call Start() to begin health monitoring, and Close() to stop all workers.
func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) {

This is purely stylistic, so feel free to ignore it: make the logger the last parameter, and after renaming the struct to ChipIngressBatchService, adjust the constructor name to match.

var events []chipingress.CloudEvent

for len(w.ch) > 0 && len(events) < int(w.maxBatchSize) { // #nosec G115
payload := <-w.ch

If I'm not mistaken, this can block if the channel is drained by another goroutine, because the len(w.ch) > 0 check and the receive are not atomic.

I'd use a select instead:

maxBatch := int(w.maxBatchSize) // named maxBatch to avoid shadowing the built-in max

for len(events) < maxBatch {
	select {
	case payload := <-w.ch:
		event, err := w.payloadToEvent(payload)
		if err != nil {
			w.lggr.Warnf("failed to build CloudEvent, dropping: %v", err)
			continue
		}
		events = append(events, event)
	default:
		// Channel is empty: stop draining instead of blocking.
		return
	}
}
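
A self-contained version of the suggested pattern is sketched below so it can be run in isolation. The `drain` function is a hypothetical helper operating on plain strings rather than the worker's payload type; the closed-channel check is an addition that the review snippet does not include.

```go
package main

import "fmt"

// drain collects up to maxBatch buffered items without ever blocking.
// The default case fires as soon as the channel is empty, even if another
// goroutine emptied it between iterations.
func drain(ch <-chan string, maxBatch int) []string {
	items := make([]string, 0, maxBatch)
	for len(items) < maxBatch {
		select {
		case it, ok := <-ch:
			if !ok {
				// Closed channel: stop instead of appending zero values.
				return items
			}
			items = append(items, it)
		default:
			return items
		}
	}
	return items
}

func main() {
	ch := make(chan string, 4)
	ch <- "a"
	ch <- "b"
	ch <- "c"

	fmt.Println(drain(ch, 2)) // [a b]
	fmt.Println(drain(ch, 2)) // [c]
	fmt.Println(drain(ch, 2)) // []
}
```

The third call returns immediately with an empty slice, which is the case where a `len(ch) > 0` check followed by a bare receive could otherwise block.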

Comment thread pkg/beholder/beholdertest/beholder.go Outdated
Comment thread pkg/beholder/noop.go Outdated
Comment thread pkg/beholder/global_test.go
Comment thread pkg/chipingress/client.go Outdated
pkcll added 3 commits May 11, 2026 09:49
NewObserver duplicated noop-provider setup and lacked services.Service
initialization, risking nil-dereference on Start/Close/Ready calls.

Delegate to NewNoopClient() which already wires the service engine,
then swap in the test emitter.
grpc.ClientConn.Close is safe to call; let callers see the error on
double-close rather than silently swallowing it.
Comment thread pkg/beholder/client.go Outdated
pkcll added 5 commits May 11, 2026 15:35
…ice method

Addresses review feedback to have only one instance of the
services.Config{}.NewServiceEngine() setup code. Added Client.initService()
which handles nil-logger fallback and NewSubServices for batch emitter,
replacing 4 duplicated call sites with slight variations.
services.Engine already guards against double-close, so the internal
sync.Once guard was redundant and suppressed errors on repeated calls.
Comment thread pkg/beholder/client.go Outdated
Comment thread pkg/beholder/dual_source_emitter.go Outdated
jmank88
jmank88 previously approved these changes May 14, 2026
…visible

NewNoopClient now accepts an optional logger so tests can pass
logger.Test(t) instead of logger.Nop(). All beholder and loop test
call-sites updated; production callers (global.go init) continue
using the no-arg form which falls back to Nop.
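
The optional-logger constructor described in this commit message can be sketched with a variadic parameter. All types below are hypothetical stand-ins, not the beholder or logger packages; the sketch assumes only that the first non-nil logger is used and that the no-arg form falls back to a no-op logger.

```go
package main

import "fmt"

// Logger is a hypothetical minimal logging interface for this sketch.
type Logger interface {
	Infof(format string, args ...any)
}

// nopLogger discards everything, standing in for a no-op logger.
type nopLogger struct{}

func (nopLogger) Infof(string, ...any) {}

// recordingLogger captures output, standing in for a test logger.
type recordingLogger struct{ lines []string }

func (r *recordingLogger) Infof(format string, args ...any) {
	r.lines = append(r.lines, fmt.Sprintf(format, args...))
}

type client struct{ lggr Logger }

// newNoopClient accepts an optional logger. Existing no-arg call sites keep
// compiling, and a missing or nil logger falls back to nopLogger.
func newNoopClient(lggr ...Logger) *client {
	if len(lggr) > 0 && lggr[0] != nil {
		return &client{lggr: lggr[0]}
	}
	return &client{lggr: nopLogger{}}
}

func main() {
	newNoopClient().lggr.Infof("discarded")

	rec := &recordingLogger{}
	newNoopClient(rec).lggr.Infof("visible %d", 1)
	fmt.Println(rec.lines)
}
```

Note that moving from `func()` to a variadic signature still changes the function's type, so code that assigns the constructor to a `func() *client` variable stops compiling. That is consistent with the API diff listing NewNoopClient as a breaking change even though ordinary call sites are unaffected.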
@pkcll
Contributor

pkcll commented May 15, 2026

7 participants