feat: add ChipIngress batch emitter support #21327

Open

thomaska wants to merge 22 commits into develop from infoplat-3436-chipingress-publishBatch

Conversation


@thomaska thomaska commented Feb 27, 2026

Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436

Summary

  • Add ChipIngressBatchEmitterEnabled telemetry config flag and enable it by default for this flow
  • Move Beholder lifecycle ownership into Shell startup/shutdown so logger wiring and cleanup are deterministic
  • Add CHiP ingress batching test coverage (smoke + regression) and update module refs to branch heads

Why

When CHiP batch emission is enabled, sink and startup lifecycle behavior must stay consistent across local/dev/test runs. This update aligns configuration defaults, startup ownership, and test coverage so batch ingress can run reliably.

Flow (new CHiP batch path)

```mermaid
flowchart TD
    A[Node boot] --> B[Load telemetry config]
    B --> C{Batch emitter enabled}
    C -- No --> D[Use single publish]
    C -- Yes --> E[Build event batch]
    E --> F[Call publish batch]
    F --> G[chip testsink receives batch]
    G --> H[Handle each event with publish func]
    H --> I[Forward upstream and assert in tests]
```

Beholder lifecycle

```mermaid
sequenceDiagram
    participant S as Shell
    participant C as Config
    participant B as Beholder
    participant A as App

    S->>C: Read telemetry and CHiP settings
    S->>B: Initialize beholder globals and logger
    S->>A: Start services
    A-->>B: Emit CHiP events
    S->>A: Stop services
    S->>B: Flush and close lifecycle resources
```

Implementation details

  • Telemetry config
    • Added and wired ChipIngressBatchEmitterEnabled through config interfaces, TOML types, docs, and fixtures
  • Lifecycle ownership
    • Beholder lifecycle handling moved to Shell so setup and teardown are explicit and ordered
  • Tests
    • Added CHiP ingress batching smoke test
    • Added cron invalid-schedule regression assertion for CHiP sink behavior
  • Dependencies
    • Updated github.com/smartcontractkit/chainlink-common and github.com/smartcontractkit/chainlink-common/pkg/chipingress to infoplat-3436-chipingress-publishBatch branch revisions
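As a hedged sketch of how the new flag might appear in a TOML fixture: the section name and sibling keys below are assumptions inferred from the Telemetry settings this PR's tests exercise; only the flag name ChipIngressBatchEmitterEnabled comes from the PR itself.

```toml
# Hypothetical fixture fragment; [Telemetry] placement and sibling keys are assumptions.
[Telemetry]
Enabled = true
Endpoint = "localhost:4317"            # tests with Enabled=true need a valid endpoint
InsecureConnection = true
ChipIngressBatchEmitterEnabled = true  # flag added in this PR; enabled by default
```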

Requires

Copilot AI review requested due to automatic review settings February 27, 2026 14:43
@thomaska thomaska requested review from a team as code owners February 27, 2026 14:43
@github-actions
Contributor

👋 thomaska, thanks for creating this pull request!

To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team.

Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

@github-actions
Contributor

github-actions Bot commented Feb 27, 2026

✅ No conflicts with other open PRs targeting develop

Contributor

Copilot AI left a comment


Pull request overview

Adds PublishBatch support to the chip-testsink gRPC server so CRE/system tests don’t fail with UNIMPLEMENTED when nodes emit batched ChIP ingress events.

Changes:

  • Implement PublishBatch on the chip-testsink ChipIngressServer.
  • Delegate batch handling to the existing Publish flow (including configured PublishFunc and optional upstream forwarding).


```go
	}

	for _, event := range batch.Events {
		if _, err := s.Publish(ctx, event); err != nil {
```

Copilot AI Feb 27, 2026


Calling s.Publish() inside the batch loop triggers the per-event async upstream forwarding goroutine in Publish(). For large batches this can create a burst of goroutines and N upstream RPCs. Consider handling upstream forwarding in PublishBatch with a single PublishBatch call (or at least a bounded worker/pool), and calling the configured PublishFunc directly for local handling to avoid unbounded goroutine/RPC fan-out per batch.

Suggested change
```diff
-	if _, err := s.Publish(ctx, event); err != nil {
+	// Forward upstream synchronously to avoid spawning a goroutine per event.
+	if s.cfg.UpstreamEndpoint != "" {
+		forwardCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
+		_, err := s.upstream.Publish(forwardCtx, event)
+		cancelFn()
+		if err != nil {
+			log.Printf("failed to forward to upstream: %v", err)
+		}
+	}
+	if _, err := s.cfg.PublishFunc(ctx, event); err != nil {
```

Comment on lines +124 to +125
```go
// It delegates each event in the batch to the configured PublishFunc,
// mirroring how the real ChIP Ingress processes batches atomically.
```

Copilot AI Feb 27, 2026


The doc comment claims this mirrors how the real ChIP ingress processes batches "atomically", but this implementation is not atomic: it publishes events one-by-one and can return an error after earlier events have already been accepted/forwarded. Please either adjust the comment to reflect best-effort sequential processing, or change the implementation to provide the atomicity guarantees being documented.

Suggested change
```diff
-// It delegates each event in the batch to the configured PublishFunc,
-// mirroring how the real ChIP Ingress processes batches atomically.
+// It delegates each event in the batch to the configured PublishFunc
+// sequentially, returning an error on the first failure. Earlier events
+// in the batch may already have been published or forwarded when an error
+// is returned, so processing is best-effort rather than atomic.
```

@trunk-io

trunk-io Bot commented Feb 27, 2026


| Failed Test | Failure Summary | Logs |
| --- | --- | --- |
| TestFluxMonitor_Deviation | The test failed; although it is marked as a known flaky test, quarantine settings were not properly configured to skip it. | Logs ↗︎ |

View Full Report ↗︎ · Docs

jmank88
jmank88 previously approved these changes Feb 27, 2026
@pkcll pkcll added the build-publish Build and Publish image to SDLC label Feb 27, 2026
@github-actions
Contributor

github-actions Bot commented Mar 2, 2026

I see you updated files related to core. Please run make gocs in the root directory to add a changeset, and include at least one of the following tags in the changeset text:

  • #added For any new functionality added.
  • #breaking_change For any functionality that requires manual action for the node to boot.
  • #bugfix For bug fixes.
  • #changed For any change to the existing functionality.
  • #db_update For any feature that introduces updates to database schema.
  • #deprecation_notice For any upcoming deprecation functionality.
  • #internal For changesets that need to be excluded from the final changelog.
  • #nops For any feature that is NOP facing and needs to be in the official Release Notes for the release.
  • #removed For any functionality/config that is removed.
  • #updated For any functionality that is updated.
  • #wip For any change that is not ready yet and external communication about it should be held off till it is feature complete.

@thomaska thomaska requested a review from a team as a code owner March 2, 2026 14:02
@thomaska thomaska changed the title Amend chip-testsink feat: add ChipIngress batch emitter support Mar 2, 2026
@pkcll
Collaborator

pkcll commented Mar 17, 2026

Based on my thorough review of both PRs, here is a concrete implementation plan for registering ChipIngressBatchEmitter as a managed service in NewApplication.


Implementation Plan

Problem

The ChipIngressBatchEmitter is created and started inside beholder.NewGRPCClient() (called from initGlobals in shell.go), but it's not registered in the chainlink application's service list. This means:

  • No health check visibility (/health)
  • No ordered shutdown participation
  • No service lifecycle management

Step 1: Expose the batch emitter from beholder.Client (chainlink-common PR #1862)

Currently in pkg/beholder/client.go, the batchEmitterService is a local variable inside NewGRPCClient:

```go
var batchEmitterService *ChipIngressBatchEmitter
```

Changes needed:

A. Add a field to the Client struct:

```go
type Client struct {
	// ... existing fields ...
	BatchEmitter *ChipIngressBatchEmitter // nil when batch mode is disabled
}
```

B. Store the emitter when constructing the client (line ~261):

Change the return statement to include the batch emitter:

```go
return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose, batchEmitterService}, nil
```

C. Add a public getter:

```go
// GetChipIngressBatchEmitter returns the batch emitter service, or nil if batch mode is disabled.
func (c *Client) GetChipIngressBatchEmitter() *ChipIngressBatchEmitter {
	if c == nil {
		return nil
	}
	return c.BatchEmitter
}
```

Step 2: Register the batch emitter in NewApplication (chainlink PR #21327)

In core/services/chainlink/application.go, after telemetryManager is appended to srvcs (around line 447–448), add:

```go
telemetryManager := telemetry.NewManager(cfg.TelemetryIngress(), csaKeystore, globalLogger)
srvcs = append(srvcs, telemetryManager)

// Register ChipIngressBatchEmitter for health checks and ordered shutdown.
// The emitter is already started by beholder.NewGRPCClient during initGlobals;
// appending it here gives us health visibility and ensures Close() runs on shutdown.
if beholderClient := beholder.GetClient(); beholderClient != nil {
	if batchEmitter := beholderClient.GetChipIngressBatchEmitter(); batchEmitter != nil {
		srvcs = append(srvcs, batchEmitter)
	}
}
```

Step 3: No changes to initGlobals in shell.go

The current PR #21327 changes to shell.go are already correct — they pass ChipIngressBatchEmitterEnabled and ChipIngressLogger through to beholder.Config. Since beholder creates/starts the emitter internally and we just retrieve a reference afterward, initGlobals doesn't need any additional changes.


Why this works

| Concern | How it's addressed |
| --- | --- |
| Initialization order | initGlobals → beholder.NewGRPCClient runs in beforeNode() before NewApplication, so beholder.GetClient() is already populated |
| Double-start | services.Engine's Start() is idempotent — the service framework calling Start() again is a no-op since it's already running |
| Nil safety | When ChipIngressBatchEmitterEnabled = false (default), batchEmitterService remains nil, and both nil checks protect against it |
| Shutdown order | Services in srvcs are stopped in reverse order. The emitter is closed before beholder's own Client.Close() (which closes the gRPC connection), matching the drain-before-disconnect requirement already implemented in PR #1862's reordered Client.Close() |
| Health checks | ChipIngressBatchEmitter implements services.Service via services.Engine, exposing Name(), Ready(), HealthReport() — all needed for /health |

Files summary

File Repo Change
pkg/beholder/client.go chainlink-common (PR #1862) Add BatchEmitter field to Client, store it in constructor, add GetChipIngressBatchEmitter() getter
core/services/chainlink/application.go chainlink (PR #21327) After telemetryManager, retrieve and append batch emitter to srvcs

Note: The PR #21327 file list may be incomplete (API results limited to 30 files). You can view the full file list here.

@pkcll pkcll marked this pull request as draft March 25, 2026 05:02
Comment thread .github/workflows/go-mod-validation.yml Fixed
@cl-sonarqube-production

pkcll added 8 commits May 11, 2026 15:29
Squash the pre-default-enable setup commits into one logical change while preserving later commits from d4d8275 onward as separate commits.
Change the default value of ChipIngressBatchEmitterEnabled from false
to true so batch emitting is the default behavior for chip-ingress
events.
Bump chainlink-common to 843865edbfdc and chipingress to v0.0.11
pre-release, hashicorp/go-plugin to v1.8.0, and add go.uber.org/goleak
indirect dependency.
Move beholder Client.Start/Close from application managed services to
Shell.beforeNode/afterNode so the client lifecycle is owned by the CLI
layer.  Set ChipIngressLogger in the beholder Config (required when
ChipIngressBatchEmitterEnabled is true) and reorder initialisation so
log streaming is wired before Start(), ensuring the batch emitter
service sees the fully-configured logger from the start.
Add ExecuteChipIngressBatchingTest and its suite entry
Test_CRE_V2_ChipIngress_Batching that starts a CHiP test sink,
deploys a cron workflow, and asserts user-log delivery via the
batch emitter path.
Extend CronBeholderFailsWithInvalidScheduleTest to also start a CHiP
test sink and assert the engine-init-error BaseMessage arrives via the
batch path, in addition to the existing Beholder/Kafka assertion.
…ingress-publishBatch

# Conflicts:
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.mod
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/go.mod
#	system-tests/tests/go.sum
Bump chainlink-common and chipingress module references to the infoplat-3436-chipingress-publishBatch branch revisions requested after merging latest develop.
pkcll added 3 commits May 12, 2026 16:41
Extract newBeholderClient and setupLogStreaming from beforeNode/initGlobals
to use Shell.BeholderClient instead of the global beholder singleton.
Update tests: split telemetry enabled/disabled cases, add nil-client
AfterNode test, and add SetOtelCore-nil error path coverage.
@pkcll pkcll marked this pull request as ready for review May 15, 2026 05:43
@pkcll pkcll requested a review from a team as a code owner May 15, 2026 05:43
Comment thread core/cmd/shell_local.go Outdated
```go
	}

	// Start the beholder client after log streaming is wired.
	if s.BeholderClient != nil {
```
Contributor


Should we store a no-op client so that we don't have to nil check it everywhere?

Collaborator


Addressed 29a66b5

pkcll added 7 commits May 15, 2026 09:28
Store beholder.NewNoopClient() in the else branch so downstream code
never needs to nil-check BeholderClient after BeforeNode completes.
Remove redundant nil guards in Start and setupLogStreaming paths.
…infoplat-3436-chipingress-publishBatch

# Conflicts:
#	core/scripts/go.mod
#	core/scripts/go.sum
#	deployment/go.mod
#	deployment/go.sum
#	go.mod
#	go.sum
#	integration-tests/go.mod
#	integration-tests/go.sum
#	integration-tests/load/go.mod
#	integration-tests/load/go.sum
#	system-tests/lib/go.mod
#	system-tests/lib/go.sum
#	system-tests/tests/go.mod
#	system-tests/tests/go.sum
…erEnabled

- shell_local_test: expect non-nil BeholderClient when telemetry disabled
- config_test: set ChipIngressBatchEmitterEnabled=true to match config-full.toml fixture
…ipingress-batching-part-2

- chipingress v0.0.11-0.20260515172105-f60f14be40ad
- chainlink-common v0.11.2-0.20260515174137-7ec6e1a68760
- ran make gomodtidy across all modules
PublishBatch already forwards via Publish; the batch-level forwarding
caused duplicate upstream calls.
Tests with Telemetry.Enabled=true need a valid Endpoint and
InsecureConnection to avoid gRPC dial failures in CI.
4of9
4of9 previously approved these changes May 15, 2026
@cl-sonarqube-production


Labels

build-publish Build and Publish image to SDLC


6 participants