diff --git a/multinode/config/config.go b/multinode/config/config.go index 4d7d8ca..b218b73 100644 --- a/multinode/config/config.go +++ b/multinode/config/config.go @@ -34,6 +34,9 @@ type MultiNode struct { FinalityDepth *uint32 FinalityTagEnabled *bool FinalizedBlockOffset *uint32 + + // Finalized State Availability Check + FinalizedStateCheckFailureThreshold *uint32 } func (c *MultiNodeConfig) Enabled() bool { @@ -94,6 +97,10 @@ func (c *MultiNodeConfig) FinalityTagEnabled() bool { return *c.MultiNode.Finali func (c *MultiNodeConfig) FinalizedBlockOffset() uint32 { return *c.MultiNode.FinalizedBlockOffset } +func (c *MultiNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { + return *c.MultiNode.FinalizedStateCheckFailureThreshold +} + func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.Enabled != nil { c.MultiNode.Enabled = f.MultiNode.Enabled @@ -150,4 +157,9 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.FinalizedBlockOffset != nil { c.MultiNode.FinalizedBlockOffset = f.MultiNode.FinalizedBlockOffset } + + // Finalized State Availability Check + if f.MultiNode.FinalizedStateCheckFailureThreshold != nil { + c.MultiNode.FinalizedStateCheckFailureThreshold = f.MultiNode.FinalizedStateCheckFailureThreshold + } } diff --git a/multinode/go.mod b/multinode/go.mod index c18efc1..902ee85 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -7,7 +7,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.2 github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 ) diff --git a/multinode/go.sum b/multinode/go.sum index 459485f..5c9709e 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,8 @@ 
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 h1:GK+2aFpW/Z5ZnMGCa9NU6o7LKHQ/9xJVZx2yMAMudnc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= diff --git a/multinode/mock_node_metrics_test.go b/multinode/mock_node_metrics_test.go index 261d7cf..8f5829a 100644 --- a/multinode/mock_node_metrics_test.go +++ b/multinode/mock_node_metrics_test.go @@ -21,6 +21,40 @@ func (_m *mockNodeMetrics) EXPECT() *mockNodeMetrics_Expecter { return &mockNodeMetrics_Expecter{mock: &_m.Mock} } +// IncrementFinalizedStateFailed provides a mock function with given fields: ctx, nodeName +func (_m *mockNodeMetrics) IncrementFinalizedStateFailed(ctx context.Context, nodeName string) { + _m.Called(ctx, nodeName) +} + +// 
mockNodeMetrics_IncrementFinalizedStateFailed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementFinalizedStateFailed' +type mockNodeMetrics_IncrementFinalizedStateFailed_Call struct { + *mock.Call +} + +// IncrementFinalizedStateFailed is a helper method to define mock.On call +// - ctx context.Context +// - nodeName string +func (_e *mockNodeMetrics_Expecter) IncrementFinalizedStateFailed(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + return &mockNodeMetrics_IncrementFinalizedStateFailed_Call{Call: _e.mock.On("IncrementFinalizedStateFailed", ctx, nodeName)} +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Return() *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Call.Return() + return _c +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Run(run) + return _c +} + // IncrementNodeTransitionsToAlive provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToAlive(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) @@ -55,6 +89,40 @@ func (_c *mockNodeMetrics_IncrementNodeTransitionsToAlive_Call) RunAndReturn(run return _c } +// IncrementNodeTransitionsToFinalizedStateNotAvailable provides a mock function with given fields: ctx, nodeName +func (_m *mockNodeMetrics) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) { + _m.Called(ctx, nodeName) +} + +// mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call is a 
*mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementNodeTransitionsToFinalizedStateNotAvailable' +type mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call struct { + *mock.Call +} + +// IncrementNodeTransitionsToFinalizedStateNotAvailable is a helper method to define mock.On call +// - ctx context.Context +// - nodeName string +func (_e *mockNodeMetrics_Expecter) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + return &mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call{Call: _e.mock.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", ctx, nodeName)} +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Return() *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Call.Return() + return _c +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Run(run) + return _c +} + // IncrementNodeTransitionsToInSync provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToInSync(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index a90063e..4274e5b 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -79,6 +79,52 @@ func (_c 
*mockRPCClient_ChainID_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(cont return _c } +// CheckFinalizedStateAvailability provides a mock function with given fields: ctx +func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for CheckFinalizedStateAvailability") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockRPCClient_CheckFinalizedStateAvailability_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckFinalizedStateAvailability' +type mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID ID, HEAD Head] struct { + *mock.Call +} + +// CheckFinalizedStateAvailability is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx interface{}) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx)} +} + +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Return(_a0 error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + // ClientVersion provides a mock 
function with given fields: _a0 func (_m *mockRPCClient[CHAIN_ID, HEAD]) ClientVersion(_a0 context.Context) (string, error) { ret := _m.Called(_a0) @@ -324,6 +370,52 @@ func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(co return _c } +// PollHealthCheck provides a mock function with given fields: ctx +func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for PollHealthCheck") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockRPCClient_PollHealthCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PollHealthCheck' +type mockRPCClient_PollHealthCheck_Call[CHAIN_ID ID, HEAD Head] struct { + *mock.Call +} + +// PollHealthCheck is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) PollHealthCheck(ctx interface{}) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("PollHealthCheck", ctx)} +} + +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) Return(_a0 error) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) error) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + // SubscribeToFinalizedHeads provides a mock function with given fields: ctx func (_m *mockRPCClient[CHAIN_ID, HEAD]) 
SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { ret := _m.Called(ctx) diff --git a/multinode/node.go b/multinode/node.go index 6729459..9611829 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -27,6 +27,7 @@ type NodeConfig interface { DeathDeclarationDelay() time.Duration NewHeadsPollInterval() time.Duration VerifyChainID() bool + FinalizedStateCheckFailureThreshold() uint32 } type ChainConfig interface { @@ -48,6 +49,7 @@ type nodeMetrics interface { IncrementNodeTransitionsToInvalidChainID(ctx context.Context, nodeName string) IncrementNodeTransitionsToUnusable(ctx context.Context, nodeName string) IncrementNodeTransitionsToSyncing(ctx context.Context, nodeName string) + IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) RecordNodeClientVersion(ctx context.Context, nodeName string, version string) SetHighestSeenBlock(ctx context.Context, nodeName string, blockNumber int64) SetHighestFinalizedBlock(ctx context.Context, nodeName string, blockNumber int64) @@ -55,6 +57,7 @@ type nodeMetrics interface { IncrementPolls(ctx context.Context, nodeName string) IncrementPollsFailed(ctx context.Context, nodeName string) IncrementPollsSuccess(ctx context.Context, nodeName string) + IncrementFinalizedStateFailed(ctx context.Context, nodeName string) } type Node[ @@ -273,7 +276,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg // The node is already closed, and any subsequent transition is invalid. // To make spotting such transitions a bit easier, return the invalid node state. 
return nodeStateLen - case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: default: panic(fmt.Sprintf("cannot verify node in state %v", st)) } diff --git a/multinode/node_fsm.go b/multinode/node_fsm.go index 818363e..52c31c0 100644 --- a/multinode/node_fsm.go +++ b/multinode/node_fsm.go @@ -35,6 +35,8 @@ func (n nodeState) String() string { return "Syncing" case nodeStateFinalizedBlockOutOfSync: return "FinalizedBlockOutOfSync" + case nodeStateFinalizedStateNotAvailable: + return "FinalizedStateNotAvailable" default: return fmt.Sprintf("nodeState(%d)", n) } @@ -72,6 +74,8 @@ const ( nodeStateSyncing // nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block nodeStateFinalizedBlockOutOfSync + // nodeStateFinalizedStateNotAvailable - node cannot serve historical state at finalized block + nodeStateFinalizedStateNotAvailable // nodeStateLen tracks the number of states nodeStateLen ) @@ -182,7 +186,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: n.state = nodeStateAlive default: panic(transitionFail(n.state, nodeStateAlive)) @@ -288,6 +292,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) { n.declareSyncing() case nodeStateAlive: n.declareAlive() + case nodeStateFinalizedStateNotAvailable: + n.declareFinalizedStateNotAvailable() default: panic(fmt.Sprintf("%#v state declaration is not implemented", state)) } @@ -351,6 +357,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { fn() } +func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() { + n.transitionToFinalizedStateNotAvailable(func() { + n.lfcLog.Errorw("RPC 
Node cannot serve finalized state", "nodeState", n.state) + n.wg.Add(1) + go n.finalizedStateNotAvailableLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) { + ctx, cancel := n.stopCh.NewCtx() + defer cancel() + n.metrics.IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx, n.name) + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == nodeStateClosed { + return + } + switch n.state { + case nodeStateAlive: + n.rpc.Close() + n.state = nodeStateFinalizedStateNotAvailable + default: + panic(transitionFail(n.state, nodeStateFinalizedStateNotAvailable)) + } + fn() +} + func transitionFail(from nodeState, to nodeState) string { return fmt.Sprintf("cannot transition from %#v to %#v", from, to) } diff --git a/multinode/node_fsm_test.go b/multinode/node_fsm_test.go index 17d312c..6701a23 100644 --- a/multinode/node_fsm_test.go +++ b/multinode/node_fsm_test.go @@ -36,7 +36,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) { t.Run("transitionToAlive", func(t *testing.T) { const destinationState = nodeStateAlive - allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing} + allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} rpc := newMockRPCClient[ID, Head](t) testTransition(t, rpc, testNode.transitionToAlive, destinationState, allowedStates...) 
}) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index e2974c0..025a0eb 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -2,6 +2,7 @@ package multinode import ( "context" + "errors" "fmt" "math" "math/big" @@ -102,6 +103,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 + // Finalized state availability check config + finalizedStateCheckFailureThreshold := n.nodePoolCfg.FinalizedStateCheckFailureThreshold() + var finalizedStateFailures uint32 + for { select { case <-ctx.Done(): @@ -111,6 +116,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { lggr.Tracew("Pinging RPC", "nodeState", n.State(), "pollFailures", pollFailures) pollCtx, cancel := context.WithTimeout(ctx, pollInterval) version, pingErr := n.RPC().ClientVersion(pollCtx) + if pingErr == nil { + if healthErr := n.RPC().PollHealthCheck(pollCtx); healthErr != nil { + pingErr = fmt.Errorf("poll health check failed: %w", healthErr) + } + } cancel() if pingErr != nil { // prevent overflow @@ -145,6 +155,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareOutOfSync(syncStatusNotInSyncWithPool) return } + // Separate finalized state availability check; a zero threshold only logs/counts failures and never demotes the node + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, pollInterval) + stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx) + stateCheckCancel() + if stateErr != nil { + if errors.Is(stateErr, ErrFinalizedStateUnavailable) { + if finalizedStateFailures < math.MaxUint32 { + n.metrics.IncrementFinalizedStateFailed(ctx, n.name) + finalizedStateFailures++ + } + lggr.Warnw("Finalized state not available", "err", stateErr, "failures", finalizedStateFailures, "threshold", finalizedStateCheckFailureThreshold) + if finalizedStateCheckFailureThreshold > 0 && finalizedStateFailures >= finalizedStateCheckFailureThreshold { + lggr.Errorw("RPC node cannot serve finalized state after consecutive failures", "failures", finalizedStateFailures) + 
if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint cannot serve finalized state; %s %s", msgCannotDisable, msgDegradedState) + continue + } + } + n.declareFinalizedStateNotAvailable() + return + } + } else { + // Treat as RPC reachability error + if pollFailures < math.MaxUint32 { + n.metrics.IncrementPollsFailed(ctx, n.name) + pollFailures++ + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr, "pollFailures", pollFailures) + } + } else { + finalizedStateFailures = 0 + } case bh, open := <-headsSub.Heads: if !open { lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) @@ -679,3 +722,60 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { } } } + +func (n *node[CHAIN_ID, HEAD, RPC]) finalizedStateNotAvailableLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + state := n.getCachedState() + switch state { + case nodeStateFinalizedStateNotAvailable: + case nodeStateClosed: + return + default: + panic(fmt.Sprintf("finalizedStateNotAvailableLoop can only run for node in FinalizedStateNotAvailable state, got: %s", state)) + } + } + + unavailableAt := time.Now() + + lggr := logger.Sugared(logger.Named(n.lfcLog, "FinalizedStateNotAvailable")) + lggr.Debugw("Trying to revive RPC node with unavailable finalized state", "nodeState", n.getCachedState()) + + dialRetryBackoff := NewRedialBackoff() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(dialRetryBackoff.Duration()): + lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.getCachedState()) + + state := n.createVerifiedConn(ctx, lggr) + if state != nodeStateAlive { + n.setState(nodeStateFinalizedStateNotAvailable) + continue + } + + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, n.nodePoolCfg.PollInterval()) + stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx) + stateCheckCancel() + 
if stateErr != nil { + if errors.Is(stateErr, ErrFinalizedStateUnavailable) { + lggr.Warnw("Finalized state still not available", "err", stateErr) + n.setState(nodeStateFinalizedStateNotAvailable) + continue + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr) + n.setState(nodeStateFinalizedStateNotAvailable) + continue + } + + lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Finalized state was unavailable for %s", n.String(), time.Since(unavailableAt)), "nodeState", n.getCachedState()) + n.declareState(nodeStateAlive) + return + } + } +} diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 684d0c7..b2943f2 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -1,6 +1,7 @@ package multinode import ( + "context" "errors" "fmt" "math/big" @@ -9,6 +10,7 @@ import ( "sync" "sync/atomic" "testing" + "time" prom "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -147,6 +149,10 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }).Once() // redundant call to stay in alive state rpc.On("ClientVersion", mock.Anything).Return("", nil) + // PollHealthCheck is called after successful ClientVersion - return nil to pass + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() + // CheckFinalizedStateAvailability is called after successful polling + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertLogCountEventually(t, observedLogs, "Ping successful", 2) @@ -170,12 +176,39 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) rpc.On("Dial", mock.Anything).Return(errors.New("failed to 
dial")).Maybe() + // CheckFinalizedStateAvailability may be called + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertEventually(t, func() bool { return nodeStateUnreachable == node.State() }) }) + t.Run("optional poll health check failure counts as poll failure and transitions to unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollFailureThreshold: 1, + pollInterval: tests.TestInterval, + }, + rpc: rpc, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("ClientVersion", mock.Anything).Return("mock-version", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(errors.New("health check failed")) + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + + node.declareAlive() + tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), 1) + tests.AssertEventually(t, func() bool { + return nodeStateUnreachable == node.State() + }) + }) t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) @@ -198,6 +231,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) + // CheckFinalizedStateAvailability may be called + rpc.On("CheckFinalizedStateAvailability", 
mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) assert.Equal(t, nodeStateAlive, node.State()) @@ -225,6 +260,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) + // CheckFinalizedStateAvailability may be called + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) tests.AssertEventually(t, func() bool { @@ -247,6 +284,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -282,6 +320,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -310,6 +349,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() poolInfo := newMockPoolChainInfoProvider(t) @@ -344,6 +384,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() + // CheckFinalizedStateAvailability is called after successful polling + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) node.declareAlive() @@ -2146,3 +2189,135 @@ func TestNode_State(t *testing.T) { }) } } + +func TestUnit_NodeLifecycle_finalizedStateNotAvailableLoop(t *testing.T) { + t.Parallel() + + newFinalizedStateNotAvailableNode := func(t *testing.T, opts testNodeOpts) testNode { + node := newTestNode(t, opts) + opts.rpc.On("Close").Return(nil) + node.setState(nodeStateFinalizedStateNotAvailable) + return node + } + + t.Run("returns on closed", func(t *testing.T) { + t.Parallel() + node := newTestNode(t, testNodeOpts{}) + node.setState(nodeStateClosed) + node.wg.Add(1) + node.finalizedStateNotAvailableLoop() + }) + + t.Run("on failed dial keeps trying", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")) + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertLogCountEventually(t, observedLogs, "Node is unreachable", 2) + }) + + t.Run("on finalized state still unavailable keeps trying", func(t *testing.T) { + t.Parallel() + rpc 
:= newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable)) + + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertLogCountEventually(t, observedLogs, "Finalized state still not available", 2) + }) + + t.Run("on successful verification and state check becomes alive", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil) + + setupRPCForAliveLoop(t, rpc) + + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) + + t.Run("transitions from alive to finalizedStateNotAvailable and back", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + node := newTestNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + config: testNodeConfig{ + pollInterval: 10 * time.Millisecond, + finalizedStateCheckFailureThreshold: 2, + }, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Close").Return(nil) + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + + sub := newMockSubscription(t) + 
sub.On("Err").Return(nil).Maybe() + sub.On("Unsubscribe").Maybe() + headsCh := make(chan Head) + rpc.On("SubscribeToHeads", mock.Anything).Return((<-chan Head)(headsCh), sub, nil).Maybe() + rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Maybe() + rpc.On("SetAliveLoopSub", mock.Anything).Maybe() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Maybe() + rpc.On("ClientVersion", mock.Anything).Return("test-version", nil).Maybe() + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() + + var stateCheckCallCount int32 + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(func(ctx context.Context) error { + count := atomic.AddInt32(&stateCheckCallCount, 1) + if count <= 2 { + return fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable) + } + return nil + }).Maybe() + + node.setState(nodeStateAlive) + node.wg.Add(1) + go node.aliveLoop() + + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateFinalizedStateNotAvailable + }) + + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) +} diff --git a/multinode/node_test.go b/multinode/node_test.go index e3c8d71..440209c 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -15,15 +15,16 @@ import ( ) type testNodeConfig struct { - pollFailureThreshold uint32 - pollInterval time.Duration - selectionMode string - syncThreshold uint32 - nodeIsSyncingEnabled bool - enforceRepeatableRead bool - finalizedBlockPollInterval time.Duration - deathDeclarationDelay time.Duration - newHeadsPollInterval time.Duration + pollFailureThreshold uint32 + pollInterval time.Duration + selectionMode string + syncThreshold uint32 + nodeIsSyncingEnabled bool + enforceRepeatableRead bool + finalizedBlockPollInterval time.Duration + deathDeclarationDelay time.Duration + newHeadsPollInterval time.Duration + finalizedStateCheckFailureThreshold uint32 } func (n testNodeConfig) 
NewHeadsPollInterval() time.Duration { @@ -66,6 +67,10 @@ func (n testNodeConfig) VerifyChainID() bool { return true } +func (n testNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { + return n.finalizedStateCheckFailureThreshold +} + type testNode struct { *node[ID, Head, RPCClient[ID, Head]] } @@ -129,12 +134,14 @@ func makeMockNodeMetrics(t *testing.T) *mockNodeMetrics { mockMetrics.On("IncrementNodeTransitionsToInvalidChainID", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToUnusable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToSyncing", mock.Anything, mock.Anything).Maybe() + mockMetrics.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestSeenBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestFinalizedBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementSeenBlocks", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPolls", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsSuccess", mock.Anything, mock.Anything).Maybe() + mockMetrics.On("IncrementFinalizedStateFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("RecordNodeClientVersion", mock.Anything, mock.Anything, mock.Anything).Maybe() return mockMetrics } diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index b4a886c..c900483 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -296,3 +296,17 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse defer m.chainInfoLock.RUnlock() return m.latestChainInfo, m.highestUserObservations } + +// PollHealthCheck provides a default no-op implementation for the RPCClient interface. 
+// Chain-specific RPC clients can override this method to perform additional health checks +// during polling (e.g., verifying historical state availability). +func (m *RPCClientBase[HEAD]) PollHealthCheck(ctx context.Context) error { + return nil +} + +// CheckFinalizedStateAvailability provides a default no-op implementation for the RPCClient interface. +// Chain-specific RPC clients can override this method to verify that the RPC can serve +// historical state at the finalized block (e.g., by calling eth_getBalance at the finalized block). +func (m *RPCClientBase[HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { + return nil +} diff --git a/multinode/types.go b/multinode/types.go index b31c6ca..beebd22 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -2,10 +2,15 @@ package multinode import ( "context" + "errors" "fmt" "math/big" ) +// ErrFinalizedStateUnavailable is returned by CheckFinalizedStateAvailability when the RPC +// cannot serve historical state at the finalized block (e.g., pruned/non-archive node). +var ErrFinalizedStateUnavailable = errors.New("finalized state unavailable") + // ID represents the base type, for any chain's ID. // It should be convertible to a string, that can uniquely identify this chain type ID fmt.Stringer @@ -77,6 +82,15 @@ type RPCClient[ // Ensure implementation does not have a race condition when values are reset before request completion and as // a result latest ChainInfo contains information from the previous cycle. GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) + // PollHealthCheck - performs an optional additional health check during polling. + // Implementations can use this for chain-specific health verification (e.g., historical state availability). + // Return nil if the check passes or is not applicable, or an error if the check fails. 
+ PollHealthCheck(ctx context.Context) error + // CheckFinalizedStateAvailability - verifies if the RPC can serve historical state at the finalized block. + // This is used to detect non-archive nodes that cannot serve state queries for older blocks. + // Returns ErrFinalizedStateUnavailable if the RPC cannot serve historical state. + // Returns nil if the check passes or is not applicable, or another error for RPC issues. + CheckFinalizedStateAvailability(ctx context.Context) error } // Head is the interface required by the NodeClient