From 2d28b2096e5003b705c58692055bfe85f7711eb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:46:29 +0200 Subject: [PATCH 1/3] Fix raft leader handoff regression after SIGTERM --- block/internal/syncing/raft_retriever.go | 1 + block/internal/syncing/raft_retriever_test.go | 61 +++++++++++++++++++ pkg/raft/node.go | 2 +- 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 block/internal/syncing/raft_retriever_test.go diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go index cfa55662bd..aaebb7a458 100644 --- a/block/internal/syncing/raft_retriever.go +++ b/block/internal/syncing/raft_retriever.go @@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() { r.mtx.Unlock() r.wg.Wait() + r.raftNode.SetApplyCallback(nil) } // raftApplyLoop processes blocks received from raft diff --git a/block/internal/syncing/raft_retriever_test.go b/block/internal/syncing/raft_retriever_test.go new file mode 100644 index 0000000000..ec176aad2a --- /dev/null +++ b/block/internal/syncing/raft_retriever_test.go @@ -0,0 +1,61 @@ +package syncing + +import ( + "context" + "sync" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/pkg/genesis" + pkgraft "github.com/evstack/ev-node/pkg/raft" +) + +type stubRaftNode struct { + mu sync.Mutex + callbacks []chan<- pkgraft.RaftApplyMsg +} + +func (s *stubRaftNode) IsLeader() bool { return false } + +func (s *stubRaftNode) HasQuorum() bool { return false } + +func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil } + +func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil } + +func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) { + s.mu.Lock() + defer s.mu.Unlock() + s.callbacks = append(s.callbacks, ch) +} + +func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]chan<- pkgraft.RaftApplyMsg, len(s.callbacks)) + copy(out, s.callbacks) + return out +} + +func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) { + t.Parallel() + + raftNode := &stubRaftNode{} + retriever := newRaftRetriever( + raftNode, + genesis.Genesis{}, + zerolog.Nop(), + nil, + func(context.Context, *pkgraft.RaftBlockState) error { return nil }, + ) + + require.NoError(t, retriever.Start(t.Context())) + retriever.Stop() + + callbacks := raftNode.recordedCallbacks() + require.Len(t, callbacks, 2) + require.NotNil(t, callbacks[0]) + require.Nil(t, callbacks[1]) +} diff --git a/pkg/raft/node.go b/pkg/raft/node.go index ada6838560..c6b2529e14 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -159,7 +159,7 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { for { select { case <-ticker.C: - if n.raft.AppliedIndex() >= n.raft.LastIndex() { + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } case <-timeoutTicker.C: From 2106f04cb2b5fda70fe18bcd765dfa547871b0b5 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:03:46 +0200 Subject: [PATCH 2/3] fix: follower crash on restart when EVM is ahead of stale raft snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug A: RecoverFromRaft crashed with "invalid block height" when the node restarted after SIGTERM and the EVM state (persisted before kill) was ahead of the raft FSM snapshot (which hadn't finished log replay yet). The function now verifies the hash of the local block at raftState.Height — if it matches the snapshot hash the EVM history is correct and recovery is safely skipped; a mismatch returns an error indicating a genuine fork. Bug B: waitForMsgsLanded used two repeating tickers with the same effective period (SendTimeout/2 poll, SendTimeout timeout), so both could fire simultaneously in select and the timeout would win even when AppliedIndex >= CommitIndex. Replaced the deadline ticker with a one-shot time.NewTimer, added a final check in the deadline branch, and reduced poll interval to min(50ms, timeout/4) for more responsive detection. Fixes the crash-restart Docker backoff loop observed in SIGTERM HA test cycle 7 (poc-ha-2 never rejoining within the 300s kill interval). Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer.go | 21 ++++- block/internal/syncing/syncer_test.go | 112 ++++++++++++++++++++++++++ pkg/raft/node.go | 16 +++- 3 files changed, 144 insertions(+), 5 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 802c1b243d..dbb0e9f8ab 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -1234,7 +1234,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } if currentState.LastBlockHeight > raftState.Height { - return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1) + // Local EVM is ahead of the raft snapshot. This is expected on restart when + // the raft FSM hasn't finished replaying log entries yet (stale snapshot height), + // or when log entries were compacted and the FSM is awaiting a new snapshot from + // the leader. Verify that our local block at raftState.Height has the same hash + // to confirm shared history before skipping recovery. + localHeader, err := s.store.GetHeader(ctx, raftState.Height) + if err != nil { + return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w", + currentState.LastBlockHeight, raftState.Height, err) + } + localHash := localHeader.Hash() + if !bytes.Equal(localHash, raftState.Hash) { + return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x", + raftState.Height, localHash, raftState.Hash) + } + s.logger.Info(). + Uint64("local_height", currentState.LastBlockHeight). + Uint64("raft_height", raftState.Height). + Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up") + return nil } return nil diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35fc..66ac7e9e05 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -422,6 +422,118 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing require.ErrorContains(t, err, "invalid chain ID") } +// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node +// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to +// timing or log compaction), RecoverFromRaft should skip recovery if the local +// block at raftState.Height has a matching hash, rather than crashing. +func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM). + data1 := makeData(gen.ChainID, 1, 0) + headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil) + dataBz1, err := data1.MarshalBinary() + require.NoError(t, err) + + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastHeaderHash: hdr1.Hash(), + })) + require.NoError(t, batch.Commit()) + + // Simulate EVM at height 1, raft snapshot stale at height 0 — but there is no + // block 0 to check, so use height 1 EVM vs stale snapshot at height 0. + // More realistic: EVM at height 2, raft snapshot at height 1. + // Build a second block and advance the store state to height 2. + data2 := makeData(gen.ChainID, 2, 0) + headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash()) + dataBz2, err := data2.MarshalBinary() + require.NoError(t, err) + + batch2, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature)) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + })) + require.NoError(t, batch2.Commit()) + + // Set lastState to height 2 (EVM is at 2). + s.SetLastState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + }) + + t.Run("matching hash skips recovery", func(t *testing.T) { + // raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1. + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr1.Hash(), + Header: headerBz1, + Data: dataBz1, + }) + require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error") + }) + + t.Run("diverged hash returns error", func(t *testing.T) { + wrongHash := make([]byte, len(hdr1.Hash())) + copy(wrongHash, hdr1.Hash()) + wrongHash[0] ^= 0xFF // flip a byte to produce a different hash + + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: wrongHash, + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "diverged from raft") + }) +} + func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 3fdda58000..a9988f793a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -146,9 +146,13 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n == nil { return nil } - timeoutTicker := time.NewTicker(timeout) - defer timeoutTicker.Stop() - ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2) + // Use a one-shot timer for the deadline to avoid the race where a repeating + // ticker and the timeout ticker fire simultaneously in select, causing a + // spurious timeout even when AppliedIndex >= CommitIndex. + deadline := time.NewTimer(timeout) + defer deadline.Stop() + pollInterval := min(50*time.Millisecond, timeout/4) + ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { @@ -157,7 +161,11 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } - case <-timeoutTicker.C: + case <-deadline.C: + // Final check after deadline before giving up. + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { + return nil + } return errors.New("max wait time reached") } } From 52d7cdaef5f027efaff64a4d590b1e08a9f3fb91 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:13:23 +0200 Subject: [PATCH 3/3] fix(raft): guard FSM apply callback with RWMutex to prevent data race SetApplyCallback(nil) called from raftRetriever.Stop() raced with FSM.Apply reading applyCh: wg.Wait() only ensures the consumer goroutine exited, but the raft library can still invoke Apply concurrently. Add applyMu sync.RWMutex to FSM; take write lock in SetApplyCallback and read lock in Apply before reading the channel pointer. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index a9988f793a..157b437367 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type Config struct { type FSM struct { logger zerolog.Logger state *atomic.Pointer[RaftBlockState] + applyMu sync.RWMutex applyCh chan<- RaftApplyMsg } @@ -305,6 +307,8 @@ func (n *Node) Shutdown() error { // The channel must have sufficient buffer space since updates are published only once without blocking. // If the channel is full, state updates will be skipped to prevent blocking the raft cluster. func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) { + n.fsm.applyMu.Lock() + defer n.fsm.applyMu.Unlock() n.fsm.applyCh = ch } @@ -327,9 +331,12 @@ func (f *FSM) Apply(log *raft.Log) any { Int("data_bytes", len(state.Data)). Msg("applied raft block state") - if f.applyCh != nil { + f.applyMu.RLock() + ch := f.applyCh + f.applyMu.RUnlock() + if ch != nil { select { - case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message")