feat: add ChipIngress batch emitter support #21327
Conversation
👋 thomaska, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

✅ No conflicts with other open PRs targeting
Pull request overview
Adds PublishBatch support to the chip-testsink gRPC server so CRE/system tests don’t fail with UNIMPLEMENTED when nodes emit batched ChIP ingress events.
Changes:
- Implement `PublishBatch` on the chip-testsink `ChipIngressServer`.
- Delegate batch handling to the existing `Publish` flow (including the configured `PublishFunc` and optional upstream forwarding).
```go
for _, event := range batch.Events {
	if _, err := s.Publish(ctx, event); err != nil {
```
Calling s.Publish() inside the batch loop triggers the per-event async upstream forwarding goroutine in Publish(). For large batches this can create a burst of goroutines and N upstream RPCs. Consider handling upstream forwarding in PublishBatch with a single PublishBatch call (or at least a bounded worker/pool), and calling the configured PublishFunc directly for local handling to avoid unbounded goroutine/RPC fan-out per batch.
Suggested change:

```go
// Forward upstream synchronously to avoid spawning a goroutine per event.
if s.cfg.UpstreamEndpoint != "" {
	forwardCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
	_, err := s.upstream.Publish(forwardCtx, event)
	cancelFn()
	if err != nil {
		log.Printf("failed to forward to upstream: %v", err)
	}
}
if _, err := s.cfg.PublishFunc(ctx, event); err != nil {
```
```go
// It delegates each event in the batch to the configured PublishFunc,
// mirroring how the real ChIP Ingress processes batches atomically.
```
The doc comment claims this mirrors how the real ChIP ingress processes batches "atomically", but this implementation is not atomic: it publishes events one-by-one and can return an error after earlier events have already been accepted/forwarded. Please either adjust the comment to reflect best-effort sequential processing, or change the implementation to provide the atomicity guarantees being documented.
Suggested change:

```go
// It delegates each event in the batch to the configured PublishFunc
// sequentially, returning an error on the first failure. Earlier events
// in the batch may already have been published or forwarded when an error
// is returned, so processing is best-effort rather than atomic.
```
I see you updated files related to
Based on my thorough review of both PRs, here is a concrete implementation plan for registering

Implementation Plan

Problem

The
Step 1: Expose the batch emitter from
| Concern | How it's addressed |
|---|---|
| Initialization order | initGlobals → beholder.NewGRPCClient runs in beforeNode() before NewApplication, so beholder.GetClient() is already populated |
| Double-start | services.Engine's Start() is idempotent — the service framework calling Start() again is a no-op since it's already running |
| Nil safety | When ChipIngressBatchEmitterEnabled = false (default), batchEmitterService remains nil, and both nil checks protect against it |
| Shutdown order | Services in srvcs are stopped in reverse order. The emitter is closed before beholder's own Client.Close() (which closes the gRPC connection), matching the drain-before-disconnect requirement already implemented in PR #1862's reordered Client.Close() |
| Health checks | ChipIngressBatchEmitter implements services.Service via services.Engine, exposing Name(), Ready(), HealthReport() — all needed for /health |
Files summary

| File | Repo | Change |
|---|---|---|
| `pkg/beholder/client.go` | chainlink-common (PR #1862) | Add `BatchEmitter` field to `Client`, store it in constructor, add `GetChipIngressBatchEmitter()` getter |
| `core/services/chainlink/application.go` | chainlink (PR #21327) | After `telemetryManager`, retrieve and append batch emitter to `srvcs` |
Note: The PR #21327 file list may be incomplete (API results limited to 30 files). You can view the full file list here.
Squash the pre-default-enable setup commits into one logical change while preserving later commits from d4d8275 onward as separate commits.
Change the default value of ChipIngressBatchEmitterEnabled from false to true so batch emitting is the default behavior for chip-ingress events.
Bump chainlink-common to 843865edbfdc and chipingress to v0.0.11 pre-release, hashicorp/go-plugin to v1.8.0, and add go.uber.org/goleak indirect dependency.
Move beholder Client.Start/Close from application managed services to Shell.beforeNode/afterNode so the client lifecycle is owned by the CLI layer. Set ChipIngressLogger in the beholder Config (required when ChipIngressBatchEmitterEnabled is true) and reorder initialisation so log streaming is wired before Start(), ensuring the batch emitter service sees the fully-configured logger from the start.
Add ExecuteChipIngressBatchingTest and its suite entry Test_CRE_V2_ChipIngress_Batching that starts a CHiP test sink, deploys a cron workflow, and asserts user-log delivery via the batch emitter path.
Extend CronBeholderFailsWithInvalidScheduleTest to also start a CHiP test sink and assert the engine-init-error BaseMessage arrives via the batch path, in addition to the existing Beholder/Kafka assertion.
…ingress-publishBatch

```
# Conflicts:
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.mod
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/go.mod
#	system-tests/tests/go.sum
```
Bump chainlink-common and chipingress module references to the infoplat-3436-chipingress-publishBatch branch revisions requested after merging latest develop.
Extract newBeholderClient and setupLogStreaming from beforeNode/initGlobals to use Shell.BeholderClient instead of the global beholder singleton. Update tests: split telemetry enabled/disabled cases, add nil-client AfterNode test, and add SetOtelCore-nil error path coverage.
```go
// Start the beholder client after log streaming is wired.
if s.BeholderClient != nil {
```
Should we store a no-op client so that we don't have to nil check it everywhere?
Store beholder.NewNoopClient() in the else branch so downstream code never needs to nil-check BeholderClient after BeforeNode completes. Remove redundant nil guards in Start and setupLogStreaming paths.
…infoplat-3436-chipingress-publishBatch

```
# Conflicts:
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.mod
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/go.mod
#	system-tests/tests/go.sum
```
…erEnabled

- shell_local_test: expect non-nil BeholderClient when telemetry disabled
- config_test: set ChipIngressBatchEmitterEnabled=true to match config-full.toml fixture
…ipingress-batching-part-2

- chipingress v0.0.11-0.20260515172105-f60f14be40ad
- chainlink-common v0.11.2-0.20260515174137-7ec6e1a68760
- ran make gomodtidy across all modules
PublishBatch already forwards via Publish; the batch-level forwarding caused duplicate upstream calls.
Tests with Telemetry.Enabled=true need a valid Endpoint and InsecureConnection to avoid gRPC dial failures in CI.
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary

- Add `ChipIngressBatchEmitterEnabled` telemetry config flag and enable it by default for this flow

Why
When CHiP batch emission is enabled, sink and startup lifecycle behavior must stay consistent across local/dev/test runs. This update aligns configuration defaults, startup ownership, and test coverage so batch ingress can run reliably.
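As a rough sketch of the settings involved, the relevant knobs might look like the TOML below. The section name and key placement are assumptions for illustration, not the verified config schema; `Endpoint` and `InsecureConnection` appear because tests with `Telemetry.Enabled=true` need valid dial settings.

```toml
# Hypothetical layout — exact section/key paths are assumptions.
[Telemetry]
Enabled = true
Endpoint = "localhost:4317"
InsecureConnection = true
ChipIngressBatchEmitterEnabled = true
```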
Flow (new CHiP batch path)
```mermaid
flowchart TD
  A[Node boot] --> B[Load telemetry config]
  B --> C{Batch emitter enabled}
  C -- No --> D[Use single publish]
  C -- Yes --> E[Build event batch]
  E --> F[Call publish batch]
  F --> G[chip testsink receives batch]
  G --> H[Handle each event with publish func]
  H --> I[Forward upstream and assert in tests]
```

Beholder lifecycle
```mermaid
sequenceDiagram
  participant S as Shell
  participant C as Config
  participant B as Beholder
  participant A as App
  S->>C: Read telemetry and CHiP settings
  S->>B: Initialize beholder globals and logger
  S->>A: Start services
  A-->>B: Emit CHiP events
  S->>A: Stop services
  S->>B: Flush and close lifecycle resources
```

Implementation details
- Plumb `ChipIngressBatchEmitterEnabled` through config interfaces, TOML types, docs, and fixtures
- Bump `github.com/smartcontractkit/chainlink-common` and `github.com/smartcontractkit/chainlink-common/pkg/chipingress` to `infoplat-3436-chipingress-publishBatch` branch revisions

Requires