diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go index cfa55662b..aaebb7a45 100644 --- a/block/internal/syncing/raft_retriever.go +++ b/block/internal/syncing/raft_retriever.go @@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() { r.mtx.Unlock() r.wg.Wait() + r.raftNode.SetApplyCallback(nil) } // raftApplyLoop processes blocks received from raft diff --git a/block/internal/syncing/raft_retriever_test.go b/block/internal/syncing/raft_retriever_test.go new file mode 100644 index 000000000..ec176aad2 --- /dev/null +++ b/block/internal/syncing/raft_retriever_test.go @@ -0,0 +1,61 @@ +package syncing + +import ( + "context" + "sync" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/pkg/genesis" + pkgraft "github.com/evstack/ev-node/pkg/raft" +) + +type stubRaftNode struct { + mu sync.Mutex + callbacks []chan<- pkgraft.RaftApplyMsg +} + +func (s *stubRaftNode) IsLeader() bool { return false } + +func (s *stubRaftNode) HasQuorum() bool { return false } + +func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil } + +func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil } + +func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) { + s.mu.Lock() + defer s.mu.Unlock() + s.callbacks = append(s.callbacks, ch) +} + +func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]chan<- pkgraft.RaftApplyMsg, len(s.callbacks)) + copy(out, s.callbacks) + return out +} + +func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) { + t.Parallel() + + raftNode := &stubRaftNode{} + retriever := newRaftRetriever( + raftNode, + genesis.Genesis{}, + zerolog.Nop(), + nil, + func(context.Context, *pkgraft.RaftBlockState) error { return nil }, + ) + + require.NoError(t, retriever.Start(t.Context())) + retriever.Stop() + + callbacks := raftNode.recordedCallbacks() + require.Len(t, callbacks, 2) + require.NotNil(t, callbacks[0]) + require.Nil(t, callbacks[1]) +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 802c1b243..dbb0e9f8a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -1234,7 +1234,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } if currentState.LastBlockHeight > raftState.Height { - return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1) + // Local EVM is ahead of the raft snapshot. This is expected on restart when + // the raft FSM hasn't finished replaying log entries yet (stale snapshot height), + // or when log entries were compacted and the FSM is awaiting a new snapshot from + // the leader. Verify that our local block at raftState.Height has the same hash + // to confirm shared history before skipping recovery. + localHeader, err := s.store.GetHeader(ctx, raftState.Height) + if err != nil { + return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w", + currentState.LastBlockHeight, raftState.Height, err) + } + localHash := localHeader.Hash() + if !bytes.Equal(localHash, raftState.Hash) { + return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x", + raftState.Height, localHash, raftState.Hash) + } + s.logger.Info(). + Uint64("local_height", currentState.LastBlockHeight). + Uint64("raft_height", raftState.Height). + Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up") + return nil } return nil diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35f..66ac7e9e0 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -422,6 +422,118 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing require.ErrorContains(t, err, "invalid chain ID") } +// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node +// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to +// timing or log compaction), RecoverFromRaft should skip recovery if the local +// block at raftState.Height has a matching hash, rather than crashing. +func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM). + data1 := makeData(gen.ChainID, 1, 0) + headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil) + dataBz1, err := data1.MarshalBinary() + require.NoError(t, err) + + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastHeaderHash: hdr1.Hash(), + })) + require.NoError(t, batch.Commit()) + + // Simulate EVM at height 1, raft snapshot stale at height 0 — but there is no + // block 0 to check, so use height 1 EVM vs stale snapshot at height 0. + // More realistic: EVM at height 2, raft snapshot at height 1. + // Build a second block and advance the store state to height 2. + data2 := makeData(gen.ChainID, 2, 0) + headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash()) + dataBz2, err := data2.MarshalBinary() + require.NoError(t, err) + + batch2, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature)) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + })) + require.NoError(t, batch2.Commit()) + + // Set lastState to height 2 (EVM is at 2). + s.SetLastState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + }) + + t.Run("matching hash skips recovery", func(t *testing.T) { + // raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1. + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr1.Hash(), + Header: headerBz1, + Data: dataBz1, + }) + require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error") + }) + + t.Run("diverged hash returns error", func(t *testing.T) { + wrongHash := make([]byte, len(hdr1.Hash())) + copy(wrongHash, hdr1.Hash()) + wrongHash[0] ^= 0xFF // flip a byte to produce a different hash + + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: wrongHash, + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "diverged from raft") + }) +} + func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index f156ae478..157b43736 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type Config struct { type FSM struct { logger zerolog.Logger state *atomic.Pointer[RaftBlockState] + applyMu sync.RWMutex applyCh chan<- RaftApplyMsg } @@ -146,18 +148,26 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n == nil { return nil } - timeoutTicker := time.NewTicker(timeout) - defer timeoutTicker.Stop() - ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2) + // Use a one-shot timer for the deadline to avoid the race where a repeating + // ticker and the timeout ticker fire simultaneously in select, causing a + // spurious timeout even when AppliedIndex >= CommitIndex. + deadline := time.NewTimer(timeout) + defer deadline.Stop() + pollInterval := min(50*time.Millisecond, timeout/4) + ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { select { case <-ticker.C: - if n.raft.AppliedIndex() >= n.raft.LastIndex() { + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { + return nil + } + case <-deadline.C: + // Final check after deadline before giving up. + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } - case <-timeoutTicker.C: return errors.New("max wait time reached") } } @@ -297,6 +307,8 @@ func (n *Node) Shutdown() error { // The channel must have sufficient buffer space since updates are published only once without blocking. // If the channel is full, state updates will be skipped to prevent blocking the raft cluster. func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) { + n.fsm.applyMu.Lock() + defer n.fsm.applyMu.Unlock() n.fsm.applyCh = ch } @@ -319,9 +331,12 @@ func (f *FSM) Apply(log *raft.Log) any { Int("data_bytes", len(state.Data)). Msg("applied raft block state") - if f.applyCh != nil { + f.applyMu.RLock() + ch := f.applyCh + f.applyMu.RUnlock() + if ch != nil { select { - case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message")