Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions block/internal/syncing/raft_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() {
r.mtx.Unlock()

r.wg.Wait()
r.raftNode.SetApplyCallback(nil)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify unsynchronized callback read/write sites in raft node implementation.
rg -n -C3 'type FSM struct|applyCh|func \(n \*Node\) SetApplyCallback|func \(f \*FSM\) Apply' pkg/raft/node.go

Repository: evstack/ev-node

Length of output: 1336


🏁 Script executed:

# Check the actual raft_retriever.Stop() implementation to understand what wg.Wait() is waiting for
cat -n block/internal/syncing/raft_retriever.go | grep -A 15 "func (r \*raftRetriever) Stop"

Repository: evstack/ev-node

Length of output: 590


🏁 Script executed:

# Check if there's synchronization in the raft node's Stop or shutdown methods
rg -n -B3 -A10 "func \(n \*Node\) Stop" pkg/raft/node.go

Repository: evstack/ev-node

Length of output: 553


🏁 Script executed:

# Check if the raft library itself (from ethraft package) provides synchronization around FSM.Apply
rg -n "applyCh" pkg/raft/node.go | head -20

Repository: evstack/ev-node

Length of output: 212


🏁 Script executed:

# Check where raftNode is initialized and where Stop is called in context
rg -n -B5 -A15 "func.*NewRaftRetriever|raftNode.*Stop\(\)" block/internal/syncing/raft_retriever.go

Repository: evstack/ev-node

Length of output: 41


🏁 Script executed:

# Check if there's a parent Syncer that coordinates shutdown of both raftNode and raftRetriever
rg -n -B3 -A8 "type Syncer struct|raftRetriever\)" block/internal/syncing/syncer.go | head -40

Repository: evstack/ev-node

Length of output: 328


🏁 Script executed:

# Check the full Stop() implementation and see if there are any shutdown calls to raftNode before SetApplyCallback
cat -n block/internal/syncing/raft_retriever.go | sed -n '60,90p'

Repository: evstack/ev-node

Length of output: 951


🏁 Script executed:

# Find where raftRetriever.Stop() is called
rg -n "raftRetriever.*Stop\(\)|\.Stop\(\)" block/internal/syncing/syncer.go | head -20

Repository: evstack/ev-node

Length of output: 186


🏁 Script executed:

# Get the Syncer.Stop() method to see the full shutdown sequence
rg -n -B3 -A20 "func \(s \*Syncer\) Stop" block/internal/syncing/syncer.go

Repository: evstack/ev-node

Length of output: 646


🏁 Script executed:

# Check where r.raftNode is stopped/shut down
rg -n "raftNode\.*Stop\|raftRetriever.*Stop" block/internal/syncing/raft_retriever.go

Repository: evstack/ev-node

Length of output: 41


Synchronize callback access to prevent data race with FSM.Apply.

Line 77's SetApplyCallback(nil) races with FSM.Apply in pkg/raft/node.go, which reads and sends on applyCh without synchronization. The wg.Wait() only ensures the consumer (raftApplyLoop) has exited, but raftNode continues running and can still invoke Apply(). Guard callback access with a mutex in the raft node implementation.

Suggested fix (in pkg/raft/node.go)
 import (
 	"context"
 	"errors"
 	"fmt"
 	"io"
 	"net"
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
@@
 type FSM struct {
 	logger  zerolog.Logger
 	state   *atomic.Pointer[RaftBlockState]
+	applyMu sync.RWMutex
 	applyCh chan<- RaftApplyMsg
 }
@@
 func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) {
+	n.fsm.applyMu.Lock()
+	defer n.fsm.applyMu.Unlock()
 	n.fsm.applyCh = ch
 }
@@
-	if f.applyCh != nil {
+	f.applyMu.RLock()
+	ch := f.applyCh
+	f.applyMu.RUnlock()
+	if ch != nil {
 		select {
-		case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}:
+		case ch <- RaftApplyMsg{Index: log.Index, State: &state}:
 		default:
 			// on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too.
 			f.logger.Warn().Msg("apply channel full, dropping message")
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/raft_retriever.go` at line 77, The call to
r.raftNode.SetApplyCallback(nil) races with FSM.Apply because Apply reads/sends
on applyCh while the raft node may concurrently invoke the callback; fix by
adding a mutex to the raft node to guard access to the apply callback: protect
the callback field and its setter Get/SetApplyCallback (or SetApplyCallback and
any internal invocation sites) with the new mutex so that FSM.Apply (which
reads/sends on applyCh via the callback) cannot see a nil or changing callback
mid-invocation; update the raft node's invocation path that calls the callback
(where Apply is invoked) to acquire the same mutex (or use a read lock) when
reading the callback and release it immediately after obtaining the pointer,
then call the callback outside the lock if needed to avoid long-held locks.

}

// raftApplyLoop processes blocks received from raft
Expand Down
61 changes: 61 additions & 0 deletions block/internal/syncing/raft_retriever_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package syncing

import (
"context"
"sync"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/evstack/ev-node/pkg/genesis"
pkgraft "github.com/evstack/ev-node/pkg/raft"
)

// stubRaftNode is a minimal test double for the raft node dependency of
// raftRetriever. It records every channel passed to SetApplyCallback so a
// test can assert the exact install/clear sequence (non-nil on Start,
// nil on Stop).
type stubRaftNode struct {
mu sync.Mutex // guards callbacks; SetApplyCallback may be called from multiple goroutines
callbacks []chan<- pkgraft.RaftApplyMsg // every value passed to SetApplyCallback, in call order
}

func (s *stubRaftNode) IsLeader() bool { return false }

func (s *stubRaftNode) HasQuorum() bool { return false }

func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil }

func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil }

// SetApplyCallback records ch (which may be nil) so tests can later inspect
// the full sequence of callbacks the retriever installed.
func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) {
	s.mu.Lock()
	s.callbacks = append(s.callbacks, ch)
	s.mu.Unlock()
}

// recordedCallbacks returns a defensive copy of the callbacks observed so
// far, so callers can inspect it without racing with SetApplyCallback.
func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]chan<- pkgraft.RaftApplyMsg(nil), s.callbacks...)
}

// TestRaftRetrieverStopClearsApplyCallback verifies the Start/Stop callback
// lifecycle: Start installs a non-nil apply channel on the raft node, and
// Stop clears it by setting the callback back to nil.
func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) {
	t.Parallel()

	node := &stubRaftNode{}
	applyFn := func(context.Context, *pkgraft.RaftBlockState) error { return nil }
	r := newRaftRetriever(node, genesis.Genesis{}, zerolog.Nop(), nil, applyFn)

	require.NoError(t, r.Start(t.Context()))
	r.Stop()

	got := node.recordedCallbacks()
	require.Len(t, got, 2)
	require.NotNil(t, got[0], "Start should install a non-nil apply channel")
	require.Nil(t, got[1], "Stop should clear the apply callback")
}
21 changes: 20 additions & 1 deletion block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS
}

if currentState.LastBlockHeight > raftState.Height {
return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1)
// Local EVM is ahead of the raft snapshot. This is expected on restart when
// the raft FSM hasn't finished replaying log entries yet (stale snapshot height),
// or when log entries were compacted and the FSM is awaiting a new snapshot from
// the leader. Verify that our local block at raftState.Height has the same hash
// to confirm shared history before skipping recovery.
localHeader, err := s.store.GetHeader(ctx, raftState.Height)
if err != nil {
return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w",
currentState.LastBlockHeight, raftState.Height, err)
}
localHash := localHeader.Hash()
if !bytes.Equal(localHash, raftState.Hash) {
return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x",
raftState.Height, localHash, raftState.Hash)
}
s.logger.Info().
Uint64("local_height", currentState.LastBlockHeight).
Uint64("raft_height", raftState.Height).
Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up")
return nil
}

return nil
Expand Down
112 changes: 112 additions & 0 deletions block/internal/syncing/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,118 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing
require.ErrorContains(t, err, "invalid chain ID")
}

// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node
// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to
// timing or log compaction), RecoverFromRaft should skip recovery if the local
// block at raftState.Height has a matching hash, rather than crashing.
func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) {
// In-memory datastore/store so the test needs no disk state.
ds := dssync.MutexWrap(datastore.NewMapDatastore())
st := store.New(ds)

cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
require.NoError(t, err)

// A signer whose address matches the genesis proposer, so produced headers validate.
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{
ChainID: "1234",
InitialHeight: 1,
StartTime: time.Now().Add(-time.Second),
ProposerAddress: addr,
}

mockExec := testmocks.NewMockExecutor(t)
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
s := NewSyncer(
st,
mockExec,
nil,
cm,
common.NopMetrics(),
config.DefaultConfig(),
gen,
mockHeaderStore,
mockDataStore,
zerolog.Nop(),
common.DefaultBlockOptions(),
make(chan error, 1),
nil,
)

// Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM).
data1 := makeData(gen.ChainID, 1, 0)
headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil)
dataBz1, err := data1.MarshalBinary()
require.NoError(t, err)

batch, err := st.NewBatch(t.Context())
require.NoError(t, err)
require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature))
require.NoError(t, batch.SetHeight(1))
require.NoError(t, batch.UpdateState(types.State{
ChainID: gen.ChainID,
InitialHeight: 1,
LastBlockHeight: 1,
LastHeaderHash: hdr1.Hash(),
}))
require.NoError(t, batch.Commit())

// Build a second block and advance the store to height 2, so the local EVM
// state is strictly ahead of the stale raft snapshot (height 1) used below.
data2 := makeData(gen.ChainID, 2, 0)
headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash())
dataBz2, err := data2.MarshalBinary()
require.NoError(t, err)

batch2, err := st.NewBatch(t.Context())
require.NoError(t, err)
require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature))
require.NoError(t, batch2.SetHeight(2))
require.NoError(t, batch2.UpdateState(types.State{
ChainID: gen.ChainID,
InitialHeight: 1,
LastBlockHeight: 2,
LastHeaderHash: hdr2.Hash(),
}))
require.NoError(t, batch2.Commit())

// Set lastState to height 2 (EVM is at 2).
s.SetLastState(types.State{
ChainID: gen.ChainID,
InitialHeight: 1,
LastBlockHeight: 2,
LastHeaderHash: hdr2.Hash(),
})

t.Run("matching hash skips recovery", func(t *testing.T) {
// raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1.
err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
Height: 1,
Hash: hdr1.Hash(),
Header: headerBz1,
Data: dataBz1,
})
require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error")
})

t.Run("diverged hash returns error", func(t *testing.T) {
wrongHash := make([]byte, len(hdr1.Hash()))
copy(wrongHash, hdr1.Hash())
wrongHash[0] ^= 0xFF // flip a byte to produce a different hash

err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{
Height: 1,
Hash: wrongHash,
Header: headerBz1,
Data: dataBz1,
})
require.Error(t, err)
require.ErrorContains(t, err, "diverged from raft")
})
}

func TestSyncer_processPendingEvents(t *testing.T) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
st := store.New(ds)
Expand Down
29 changes: 22 additions & 7 deletions pkg/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -43,6 +44,7 @@ type Config struct {
// FSM is the raft finite-state machine; Apply publishes applied block state
// to an optional consumer channel installed via Node.SetApplyCallback.
type FSM struct {
logger zerolog.Logger
state *atomic.Pointer[RaftBlockState] // latest applied block state
applyMu sync.RWMutex // guards applyCh: SetApplyCallback writes, Apply reads concurrently
applyCh chan<- RaftApplyMsg // nil when no consumer is attached
}

Expand Down Expand Up @@ -146,18 +148,26 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error {
if n == nil {
return nil
}
timeoutTicker := time.NewTicker(timeout)
defer timeoutTicker.Stop()
ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2)
// Use a one-shot timer for the deadline to avoid the race where a repeating
// ticker and the timeout ticker fire simultaneously in select, causing a
// spurious timeout even when AppliedIndex >= CommitIndex.
deadline := time.NewTimer(timeout)
defer deadline.Stop()
pollInterval := min(50*time.Millisecond, timeout/4)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if n.raft.AppliedIndex() >= n.raft.LastIndex() {
if n.raft.AppliedIndex() >= n.raft.CommitIndex() {
return nil
}
case <-deadline.C:
// Final check after deadline before giving up.
if n.raft.AppliedIndex() >= n.raft.CommitIndex() {
return nil
}
case <-timeoutTicker.C:
return errors.New("max wait time reached")
}
}
Expand Down Expand Up @@ -297,6 +307,8 @@ func (n *Node) Shutdown() error {
// SetApplyCallback installs ch as the destination for applied raft messages;
// passing nil detaches the consumer (e.g. during shutdown).
// The channel must have sufficient buffer space since updates are published only once without blocking.
// If the channel is full, state updates will be skipped to prevent blocking the raft cluster.
func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) {
// Write under applyMu so a concurrent FSM.Apply never races on applyCh.
n.fsm.applyMu.Lock()
defer n.fsm.applyMu.Unlock()
n.fsm.applyCh = ch
}

Expand All @@ -319,9 +331,12 @@ func (f *FSM) Apply(log *raft.Log) any {
Int("data_bytes", len(state.Data)).
Msg("applied raft block state")

if f.applyCh != nil {
f.applyMu.RLock()
ch := f.applyCh
f.applyMu.RUnlock()
if ch != nil {
select {
case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}:
case ch <- RaftApplyMsg{Index: log.Index, State: &state}:
default:
// on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too.
f.logger.Warn().Msg("apply channel full, dropping message")
Expand Down
Loading