From 7ca8de26584231ec238a71926846f89c2128b18c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 24 Mar 2026 18:36:52 +0100 Subject: [PATCH 01/13] fix(syncer): refetch latest da height instead of da height +1 --- block/internal/syncing/syncer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 3dcf80ea3..29c12ca11 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -355,7 +355,9 @@ func (s *Syncer) initializeState() error { // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. - s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight)) + // state.DaHeight-1 if the latest DAHeight was containing more heights. + // s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight-1)) + s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, state.DAHeight-1)) // TODO: s.cache.DaHeight() should only be used if p2p works. s.logger.Info(). Uint64("height", state.LastBlockHeight). From dbdd2cc2b5ac6e5fe9de9733a68e91354637d8f1 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 25 Mar 2026 11:04:30 +0100 Subject: [PATCH 02/13] wip --- block/internal/syncing/syncer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 29c12ca11..d151bca55 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -355,9 +355,12 @@ func (s *Syncer) initializeState() error { // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. 
- // state.DaHeight-1 if the latest DAHeight was containing more heights. - // s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight-1)) - s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, state.DAHeight-1)) // TODO: s.cache.DaHeight() should only be used if p2p works. + // Only use cache.DaHeight() when P2P is enabled, as it contains P2P-specific synchronization info. + daHeight := max(s.genesis.DAStartHeight, state.DAHeight-1) + if s.headerStore != nil { + daHeight = max(daHeight, s.cache.DaHeight()) + } + s.daRetrieverHeight.Store(daHeight) s.logger.Info(). Uint64("height", state.LastBlockHeight). From 5ec817806196ef24815b62c5291d747139c521e4 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 25 Mar 2026 11:10:22 +0100 Subject: [PATCH 03/13] fixes --- block/internal/syncing/syncer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index d151bca55..dcc9acd2a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -355,9 +355,9 @@ func (s *Syncer) initializeState() error { // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. - // Only use cache.DaHeight() when P2P is enabled, as it contains P2P-specific synchronization info. + // Only use cache.DaHeight() when P2P is actively syncing (headerStore has higher height than current state). 
daHeight := max(s.genesis.DAStartHeight, state.DAHeight-1) - if s.headerStore != nil { + if s.headerStore.Height() > state.LastBlockHeight { daHeight = max(daHeight, s.cache.DaHeight()) } s.daRetrieverHeight.Store(daHeight) From e5aae154e49271d18f0dee6d9e1b5e4a881c739a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 25 Mar 2026 11:43:40 +0100 Subject: [PATCH 04/13] fix changelog and underflow --- CHANGELOG.md | 1 + block/internal/syncing/syncer.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24eaa6173..e6a17a0a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201) - Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162) - Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167) - Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index dcc9acd2a..136b22a5d 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -356,7 +356,7 @@ func (s *Syncer) initializeState() error { // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. // Only use cache.DaHeight() when P2P is actively syncing (headerStore has higher height than current state). 
- daHeight := max(s.genesis.DAStartHeight, state.DAHeight-1) + daHeight := max(s.genesis.DAStartHeight, min(state.DAHeight-1, 0)) if s.headerStore.Height() > state.LastBlockHeight { daHeight = max(daHeight, s.cache.DaHeight()) } From 37ee626db72c4d0891dd43ea4d92f054933da79d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 25 Mar 2026 12:09:47 +0100 Subject: [PATCH 05/13] fix nil --- block/internal/syncing/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 136b22a5d..a1b926725 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -357,7 +357,7 @@ func (s *Syncer) initializeState() error { // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. // Only use cache.DaHeight() when P2P is actively syncing (headerStore has higher height than current state). daHeight := max(s.genesis.DAStartHeight, min(state.DAHeight-1, 0)) - if s.headerStore.Height() > state.LastBlockHeight { + if s.headerStore != nil && s.headerStore.Height() > state.LastBlockHeight { daHeight = max(daHeight, s.cache.DaHeight()) } s.daRetrieverHeight.Store(daHeight) From 44d8d600edc42e8b6366ad1e9288b275f3d97116 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 26 Mar 2026 10:23:58 +0100 Subject: [PATCH 06/13] fix unit tests --- .../syncing/syncer_forced_inclusion_test.go | 14 ++- block/internal/syncing/syncer_test.go | 106 ++++++++++++++---- 2 files changed, 92 insertions(+), 28 deletions(-) diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 587db84d8..7e43e55cc 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -81,10 +81,13 @@ func newForcedInclusionSyncer(t *testing.T, daStart, epochSize uint64) (*Syncer, fiRetriever := 
da.NewForcedInclusionRetriever(client, zerolog.Nop(), cfg.DA.BlockTime.Duration, false, gen.DAStartHeight, gen.DAEpochForcedInclusion) t.Cleanup(fiRetriever.Stop) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() s := NewSyncer( st, mockExec, client, cm, common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), nil, ) s.daRetriever = daRetriever @@ -156,10 +159,13 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) { fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), cfg.DA.BlockTime.Duration, false, gen.DAStartHeight, gen.DAEpochForcedInclusion) t.Cleanup(fiRetriever.Stop) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() s := NewSyncer( st, mockExec, client, cm, common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), nil, ) if s.fiRetriever != nil { diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 6a1a2b7d8..5a9c440f8 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -124,6 +124,11 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + 
mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + s := NewSyncer( st, mockExec, @@ -132,8 +137,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -175,6 +180,11 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + errChan := make(chan error, 1) s := NewSyncer( st, @@ -184,8 +194,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), errChan, @@ -230,6 +240,11 @@ func TestSequentialBlockSync(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + errChan := make(chan error, 1) s := NewSyncer( st, @@ -239,13 +254,14 @@ func 
TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), errChan, nil, ) + require.NoError(t, s.initializeState()) s.ctx = t.Context() @@ -358,7 +374,9 @@ func TestSyncLoopPersistState(t *testing.T) { mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() mockP2PHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockP2PHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() mockP2PDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockP2PDataStore.EXPECT().Height().Return(uint64(0)).Maybe() errorCh := make(chan error, 1) syncerInst1 := NewSyncer( @@ -722,6 +740,11 @@ func TestProcessHeightEvent_TriggersAsyncDARetrieval(t *testing.T) { mockDAClient := testmocks.NewMockClient(t) mockDAClient.EXPECT().GetLatestDAHeight(mock.Anything).Return(uint64(200), nil).Maybe() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + s := NewSyncer( st, mockExec, @@ -730,8 +753,8 @@ func TestProcessHeightEvent_TriggersAsyncDARetrieval(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -788,6 +811,11 @@ func TestProcessHeightEvent_RejectsUnreasonableDAHint(t *testing.T) { mockDAClient := testmocks.NewMockClient(t) mockDAClient.EXPECT().GetLatestDAHeight(mock.Anything).Return(uint64(100), nil).Maybe() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := 
extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + s := NewSyncer( st, mockExec, @@ -796,8 +824,8 @@ func TestProcessHeightEvent_RejectsUnreasonableDAHint(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -852,6 +880,11 @@ func TestProcessHeightEvent_AcceptsValidDAHint(t *testing.T) { mockDAClient := testmocks.NewMockClient(t) mockDAClient.EXPECT().GetLatestDAHeight(mock.Anything).Return(uint64(100), nil).Maybe() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + s := NewSyncer( st, mockExec, @@ -860,8 +893,8 @@ func TestProcessHeightEvent_AcceptsValidDAHint(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -917,6 +950,11 @@ func TestProcessHeightEvent_SkipsDAHintWhenAlreadyDAIncluded(t *testing.T) { mockDAClient := testmocks.NewMockClient(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + s := NewSyncer( st, mockExec, @@ -925,8 +963,8 @@ func TestProcessHeightEvent_SkipsDAHintWhenAlreadyDAIncluded(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), 
make(chan error, 1), @@ -1005,9 +1043,14 @@ func TestProcessHeightEvent_SkipsDAHintWhenBelowRetrieverCursor(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() - // Mock DA client reports latest DA height above the hints we'll send + // Mock DA client reports latest DA height of 100 mockDAClient := testmocks.NewMockClient(t) - mockDAClient.EXPECT().GetLatestDAHeight(mock.Anything).Return(uint64(300), nil).Maybe() + mockDAClient.EXPECT().GetLatestDAHeight(mock.Anything).Return(uint64(100), nil).Maybe() + + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() s := NewSyncer( st, @@ -1017,8 +1060,8 @@ func TestProcessHeightEvent_SkipsDAHintWhenBelowRetrieverCursor(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -1104,6 +1147,11 @@ func TestProcessHeightEvent_ExecutionFailure_DoesNotReschedule(t *testing.T) { mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything). 
Return([]byte(nil), errors.New("connection refused")).Times(common.MaxRetriesBeforeHalt) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + errChan := make(chan error, 1) s := NewSyncer( st, @@ -1113,8 +1161,8 @@ func TestProcessHeightEvent_ExecutionFailure_DoesNotReschedule(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), errChan, @@ -1166,6 +1214,11 @@ func TestSyncer_Stop_SkipsDrainOnCriticalError(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + errChan := make(chan error, 1) s := NewSyncer( st, @@ -1175,8 +1228,8 @@ func TestSyncer_Stop_SkipsDrainOnCriticalError(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), errChan, @@ -1241,6 +1294,11 @@ func TestSyncer_Stop_DrainWorksWithoutCriticalError(t *testing.T) { mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, mock.Anything). 
Return([]byte("app1"), nil).Once() + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + errChan := make(chan error, 1) s := NewSyncer( st, @@ -1250,8 +1308,8 @@ func TestSyncer_Stop_DrainWorksWithoutCriticalError(t *testing.T) { common.NopMetrics(), cfg, gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), + mockHeaderStore, + mockDataStore, zerolog.Nop(), common.DefaultBlockOptions(), errChan, From 5d284927da50e3c1a350c623d842176518f65228 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 26 Mar 2026 10:25:13 +0100 Subject: [PATCH 07/13] arrange cl --- CHANGELOG.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2a7c78d8..a8724961f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add AWS & GCP KMS signer backend [#3171](https://github.com/evstack/ev-node/pull/3171) +- Subscribe to forced inclusion namespace events [#3146](https://github.com/evstack/ev-node/pull/3146) +- Display block source in sync log [#3193](https://github.com/evstack/ev-node/pull/3193) + ### Fixed - Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201) @@ -16,12 +22,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Strict raft state. 
[#3167](https://github.com/evstack/ev-node/pull/3167) - Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166) -### Added - -- Add AWS & GCP KMS signer backend [#3171](https://github.com/evstack/ev-node/pull/3171) -- Subscribe to forced inclusion namespace events [#3146](https://github.com/evstack/ev-node/pull/3146) -- Display block source in sync log [#3193](https://github.com/evstack/ev-node/pull/3193) - ## v1.0.0 ### Fixed From b580bc5de1a5a1e29ee82388edc8f92106ccd081 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 26 Mar 2026 15:12:38 +0100 Subject: [PATCH 08/13] updates --- block/internal/syncing/syncer.go | 10 ++++++++++ pkg/config/config.go | 8 +++++++- pkg/config/config_test.go | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index a1b926725..1d4bcb2f4 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -360,6 +360,16 @@ func (s *Syncer) initializeState() error { if s.headerStore != nil && s.headerStore.Height() > state.LastBlockHeight { daHeight = max(daHeight, s.cache.DaHeight()) } + + // dev mode for da start height + if startHeight := s.config.DA.StartHeight; startHeight > 0 { + s.logger.Info(). + Uint64("previous_da_start_height", daHeight). + Uint64("override_da_start_height", s.config.DA.StartHeight). + Msg("DA start height overridden by flag") + daHeight = startHeight + } + s.daRetrieverHeight.Store(daHeight) s.logger.Info(). 
diff --git a/pkg/config/config.go b/pkg/config/config.go index 1fc3bcb4b..97f4728bf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -89,6 +89,8 @@ const ( FlagDABatchMaxDelay = FlagPrefixEvnode + "da.batch_max_delay" // FlagDABatchMinItems is a flag for specifying the minimum batch items FlagDABatchMinItems = FlagPrefixEvnode + "da.batch_min_items" + // FlagDAStartHeight is a flag for forcing the DA retrieval height to start from a specific height + FlagDAStartHeight = FlagPrefixEvnode + "da.start_height" // P2P configuration flags @@ -237,6 +239,8 @@ type Config struct { // DAConfig contains all Data Availability configuration parameters type DAConfig struct { + StartHeight uint64 `mapstructure:"-" yaml:"-" comment:"Force DA retrieval to start from a specific height (0 for default)"` + Address string `mapstructure:"address" yaml:"address" comment:"Address of the data availability layer service (host:port). This is the endpoint where Rollkit will connect to submit and retrieve data."` AuthToken string `mapstructure:"auth_token" yaml:"auth_token" comment:"Authentication token for the data availability layer service. Required if the DA service needs authentication."` //nolint:gosec // this is ok. SubmitOptions string `mapstructure:"submit_options" yaml:"submit_options" comment:"Additional options passed to the DA layer when submitting data. 
Format depends on the specific DA implementation being used."` @@ -563,7 +567,7 @@ func AddFlags(cmd *cobra.Command) { }) // Add base flags - cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database") + cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database") cmd.Flags().Bool(FlagClearCache, def.ClearCache, "clear the cache") // Node configuration flags @@ -595,6 +599,8 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Float64(FlagDABatchSizeThreshold, def.DA.BatchSizeThreshold, "batch size threshold as fraction of max blob size (0.0-1.0)") cmd.Flags().Duration(FlagDABatchMaxDelay, def.DA.BatchMaxDelay.Duration, "maximum time to wait before submitting a batch") cmd.Flags().Uint64(FlagDABatchMinItems, def.DA.BatchMinItems, "minimum number of items to accumulate before submission") + cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for default)") + cmd.Flags().MarkHidden(FlagDAStartHeight) // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2ae2b0ef6..594114c77 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -128,7 +128,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 77 // Update this number if you add more flag checks above + expectedFlagCount := 78 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 From b619b09d98b74e18a5926fc7171190036f9d6ff0 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 26 Mar 2026 15:22:06 +0100 Subject: [PATCH 09/13] fixes --- pkg/config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/pkg/config/config.go b/pkg/config/config.go index 97f4728bf..e278aebba 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -239,7 +239,7 @@ type Config struct { // DAConfig contains all Data Availability configuration parameters type DAConfig struct { - StartHeight uint64 `mapstructure:"-" yaml:"-" comment:"Force DA retrieval to start from a specific height (0 for default)"` + StartHeight uint64 `mapstructure:"start_height" yaml:"-" comment:"Force DA retrieval to start from a specific height (0 for default)"` Address string `mapstructure:"address" yaml:"address" comment:"Address of the data availability layer service (host:port). This is the endpoint where Rollkit will connect to submit and retrieve data."` AuthToken string `mapstructure:"auth_token" yaml:"auth_token" comment:"Authentication token for the data availability layer service. Required if the DA service needs authentication."` //nolint:gosec // this is ok. @@ -599,7 +599,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Float64(FlagDABatchSizeThreshold, def.DA.BatchSizeThreshold, "batch size threshold as fraction of max blob size (0.0-1.0)") cmd.Flags().Duration(FlagDABatchMaxDelay, def.DA.BatchMaxDelay.Duration, "maximum time to wait before submitting a batch") cmd.Flags().Uint64(FlagDABatchMinItems, def.DA.BatchMinItems, "minimum number of items to accumulate before submission") - cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for default)") + cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for disabled)") cmd.Flags().MarkHidden(FlagDAStartHeight) // P2P configuration flags From df4d8a37b0886048521dcca8b0519bd2b4dba5c6 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Thu, 26 Mar 2026 23:11:52 +0100 Subject: [PATCH 10/13] fix --- block/internal/syncing/syncer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git 
a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 1d4bcb2f4..42841f89a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -356,7 +356,10 @@ func (s *Syncer) initializeState() error { // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. // Only use cache.DaHeight() when P2P is actively syncing (headerStore has higher height than current state). - daHeight := max(s.genesis.DAStartHeight, min(state.DAHeight-1, 0)) + daHeight := s.genesis.DAStartHeight + if state.DAHeight > s.genesis.DAStartHeight { + daHeight = max(daHeight, state.DAHeight-1) + } if s.headerStore != nil && s.headerStore.Height() > state.LastBlockHeight { daHeight = max(daHeight, s.cache.DaHeight()) } From 48c735e287b8a404b9a10d4784eb94b4054c129a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Mar 2026 15:32:53 +0100 Subject: [PATCH 11/13] remove lru from generic cache as slow cleanup already happens --- block/internal/cache/generic_cache.go | 220 ++++++----------- block/internal/cache/generic_cache_test.go | 226 ++++++------------ block/internal/cache/manager.go | 68 +++--- block/internal/cache/pending_events_map.go | 45 ++++ .../internal/cache/pending_events_map_test.go | 75 ++++++ 5 files changed, 312 insertions(+), 322 deletions(-) create mode 100644 block/internal/cache/pending_events_map.go create mode 100644 block/internal/cache/pending_events_map_test.go diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index c04098704..d014062aa 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -8,23 +8,11 @@ import ( "sync" "sync/atomic" - lru "github.com/hashicorp/golang-lru/v2" ds "github.com/ipfs/go-datastore" "github.com/evstack/ev-node/pkg/store" ) -const ( - // 
DefaultItemsCacheSize is the default size for items cache. - DefaultItemsCacheSize = 200_000 - - // DefaultHashesCacheSize is the default size for hash tracking. - DefaultHashesCacheSize = 200_000 - - // DefaultDAIncludedCacheSize is the default size for DA inclusion tracking. - DefaultDAIncludedCacheSize = 200_000 -) - // snapshotEntry is one record in the persisted snapshot. // Encoded as 16 bytes: [blockHeight uint64 LE][daHeight uint64 LE]. type snapshotEntry struct { @@ -34,142 +22,98 @@ type snapshotEntry struct { const snapshotEntrySize = 16 // bytes per snapshotEntry -// Cache tracks seen blocks and DA inclusion status using bounded LRU caches. -type Cache[T any] struct { - // itemsByHeight stores items keyed by uint64 height. - // Mutex needed for atomic get-and-remove in getNextItem. - itemsByHeight *lru.Cache[uint64, *T] - itemsByHeightMu sync.Mutex +// Cache tracks seen blocks and DA inclusion status. +type Cache struct { + mu sync.Mutex - // hashes tracks whether a given hash has been seen - hashes *lru.Cache[string, bool] + hashes map[string]bool + daIncluded map[string]uint64 + hashByHeight map[uint64]string + maxDAHeight *atomic.Uint64 - // daIncluded maps hash → daHeight. Hash may be a real content hash or a - // height placeholder (see HeightPlaceholderKey) immediately after restore. - daIncluded *lru.Cache[string, uint64] - - // hashByHeight maps blockHeight → hash, used for pruning and height-based - // lookups. Protected by hashByHeightMu only in deleteAllForHeight where a - // read-then-remove must be atomic. 
- hashByHeight *lru.Cache[uint64, string] - hashByHeightMu sync.Mutex - - // maxDAHeight tracks the maximum DA height seen - maxDAHeight *atomic.Uint64 - - store store.Store // nil = ephemeral, no persistence - // storeKeyPrefix is the prefix used for store keys + store store.Store storeKeyPrefix string } -func (c *Cache[T]) snapshotKey() string { +func (c *Cache) snapshotKey() string { return c.storeKeyPrefix + "__snap" } // NewCache creates a Cache. When store and keyPrefix are set, mutations // persist a snapshot so RestoreFromStore can recover in-flight state. -func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { - // LRU cache creation only fails if size <= 0, which won't happen with our defaults - itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize) - hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize) - daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize) - hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize) - - return &Cache[T]{ - itemsByHeight: itemsCache, - hashes: hashesCache, - daIncluded: daIncludedCache, - hashByHeight: hashByHeightCache, +func NewCache(s store.Store, keyPrefix string) *Cache { + return &Cache{ + hashes: make(map[string]bool), + daIncluded: make(map[string]uint64), + hashByHeight: make(map[uint64]string), maxDAHeight: &atomic.Uint64{}, store: s, storeKeyPrefix: keyPrefix, } } -// getItem returns an item from the cache by height. -func (c *Cache[T]) getItem(height uint64) *T { - item, ok := c.itemsByHeight.Get(height) - if !ok { - return nil - } - return item -} - -// setItem sets an item in the cache by height. -func (c *Cache[T]) setItem(height uint64, item *T) { - c.itemsByHeight.Add(height, item) -} - -// getNextItem returns and removes the item at height, or nil if absent. 
-func (c *Cache[T]) getNextItem(height uint64) *T { - c.itemsByHeightMu.Lock() - defer c.itemsByHeightMu.Unlock() - - item, ok := c.itemsByHeight.Get(height) - if !ok { - return nil - } - c.itemsByHeight.Remove(height) - return item -} - -// itemCount returns the number of items currently stored by height. -func (c *Cache[T]) itemCount() int { - return c.itemsByHeight.Len() +func (c *Cache) isSeen(hash string) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.hashes[hash] } -// isSeen returns true if the hash has been seen. -func (c *Cache[T]) isSeen(hash string) bool { - seen, ok := c.hashes.Get(hash) - return ok && seen +func (c *Cache) setSeen(hash string, height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.hashes[hash] = true + c.hashByHeight[height] = hash } -// setSeen sets the hash as seen and tracks its height for pruning. -func (c *Cache[T]) setSeen(hash string, height uint64) { - c.hashes.Add(hash, true) - c.hashByHeight.Add(height, hash) +func (c *Cache) removeSeen(hash string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.hashes, hash) } -// getDAIncluded returns the DA height if the hash has been DA-included. -func (c *Cache[T]) getDAIncluded(daCommitmentHash string) (uint64, bool) { - return c.daIncluded.Get(daCommitmentHash) +func (c *Cache) getDAIncluded(hash string) (uint64, bool) { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.daIncluded[hash] + return v, ok } -// getDAIncludedByHeight resolves DA height via the height→hash index. -// Works for both real hashes (steady state) and snapshot placeholders -// (post-restart, before the DA retriever re-fires the real hash). 
-func (c *Cache[T]) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { - hash, ok := c.hashByHeight.Get(blockHeight) +func (c *Cache) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { + c.mu.Lock() + defer c.mu.Unlock() + hash, ok := c.hashByHeight[blockHeight] if !ok { return 0, false } - return c.getDAIncluded(hash) + v, exists := c.daIncluded[hash] + return v, exists } // setDAIncluded records DA inclusion in memory. // If a previous entry already exists at blockHeight (e.g. a placeholder from // RestoreFromStore), it is evicted from daIncluded to avoid orphan leaks. -func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { - if prev, ok := c.hashByHeight.Get(blockHeight); ok && prev != hash { - c.daIncluded.Remove(prev) +func (c *Cache) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { + c.mu.Lock() + defer c.mu.Unlock() + if prev, ok := c.hashByHeight[blockHeight]; ok && prev != hash { + delete(c.daIncluded, prev) } - c.daIncluded.Add(hash, daHeight) - c.hashByHeight.Add(blockHeight, hash) + c.daIncluded[hash] = daHeight + c.hashByHeight[blockHeight] = hash c.setMaxDAHeight(daHeight) } -// removeDAIncluded removes the DA-included status of the hash from the cache. -func (c *Cache[T]) removeDAIncluded(hash string) { - c.daIncluded.Remove(hash) +func (c *Cache) removeDAIncluded(hash string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.daIncluded, hash) } -// daHeight returns the maximum DA height from all DA-included items. -func (c *Cache[T]) daHeight() uint64 { +func (c *Cache) daHeight() uint64 { return c.maxDAHeight.Load() } -// setMaxDAHeight sets the maximum DA height if the provided value is greater. 
-func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { +func (c *Cache) setMaxDAHeight(daHeight uint64) { for range 1_000 { current := c.maxDAHeight.Load() if daHeight <= current { @@ -181,49 +125,41 @@ func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { } } -// removeSeen removes a hash from the seen cache. -func (c *Cache[T]) removeSeen(hash string) { - c.hashes.Remove(hash) -} - -// deleteAllForHeight removes all items and their associated data from the -// cache at the given height. -func (c *Cache[T]) deleteAllForHeight(height uint64) { - c.itemsByHeight.Remove(height) - - c.hashByHeightMu.Lock() - hash, ok := c.hashByHeight.Get(height) - if ok { - c.hashByHeight.Remove(height) +func (c *Cache) deleteAllForHeight(height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + hash, ok := c.hashByHeight[height] + if !ok { + return } - c.hashByHeightMu.Unlock() + delete(c.hashByHeight, height) + delete(c.hashes, hash) + delete(c.daIncluded, hash) +} - if ok { - c.hashes.Remove(hash) - c.daIncluded.Remove(hash) - } +func (c *Cache) daIncludedLen() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.daIncluded) } // persistSnapshot writes all current in-flight [blockHeight, daHeight] pairs to the store under a single key. // Only called explicitly via SaveToStore. NEVER CALL IT ON HOT-PATH TO AVOID BAGER WRITE AMPLIFICATION. 
-func (c *Cache[T]) persistSnapshot(ctx context.Context) error { +func (c *Cache) persistSnapshot(ctx context.Context) error { if c.store == nil || c.storeKeyPrefix == "" { return nil } - heights := c.hashByHeight.Keys() - entries := make([]snapshotEntry, 0, len(heights)) - for _, h := range heights { - hash, ok := c.hashByHeight.Peek(h) - if !ok { - continue - } - daH, ok := c.daIncluded.Peek(hash) + c.mu.Lock() + entries := make([]snapshotEntry, 0, len(c.hashByHeight)) + for h, hash := range c.hashByHeight { + daH, ok := c.daIncluded[hash] if !ok { continue } entries = append(entries, snapshotEntry{blockHeight: h, daHeight: daH}) } + c.mu.Unlock() return c.store.SetMetadata(ctx, c.snapshotKey(), encodeSnapshot(entries)) } @@ -257,8 +193,7 @@ func decodeSnapshot(buf []byte) []snapshotEntry { // RestoreFromStore loads the in-flight snapshot with a single store read. // Each entry is installed as a height placeholder; real hashes replace them // once the DA retriever re-fires SetHeaderDAIncluded after startup. -// Missing snapshot key is treated as a no-op (fresh node or pre-snapshot version). -func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { +func (c *Cache) RestoreFromStore(ctx context.Context) error { if c.store == nil || c.storeKeyPrefix == "" { return nil } @@ -271,10 +206,13 @@ func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { return fmt.Errorf("reading cache snapshot from store: %w", err) } + c.mu.Lock() + defer c.mu.Unlock() + for _, e := range decodeSnapshot(buf) { placeholder := HeightPlaceholderKey(c.storeKeyPrefix, e.blockHeight) - c.daIncluded.Add(placeholder, e.daHeight) - c.hashByHeight.Add(e.blockHeight, placeholder) + c.daIncluded[placeholder] = e.daHeight + c.hashByHeight[e.blockHeight] = placeholder c.setMaxDAHeight(e.daHeight) } @@ -297,7 +235,7 @@ func HeightPlaceholderKey(prefix string, height uint64) string { } // SaveToStore flushes the current snapshot to the store. 
-func (c *Cache[T]) SaveToStore(ctx context.Context) error { +func (c *Cache) SaveToStore(ctx context.Context) error { if c.store == nil { return nil } @@ -308,7 +246,7 @@ func (c *Cache[T]) SaveToStore(ctx context.Context) error { } // ClearFromStore deletes the snapshot key from the store. -func (c *Cache[T]) ClearFromStore(ctx context.Context) error { +func (c *Cache) ClearFromStore(ctx context.Context) error { if c.store == nil { return nil } diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index a1d36b487..d3766fc43 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -10,8 +10,6 @@ import ( pkgstore "github.com/evstack/ev-node/pkg/store" ) -type testItem struct{ V int } - // testMemStore creates an in-memory store for testing. func testMemStore(t *testing.T) pkgstore.Store { t.Helper() @@ -21,9 +19,7 @@ func testMemStore(t *testing.T) pkgstore.Store { } // writeSnapshot directly encodes and writes a snapshot into the store under -// the cache's snapshot key (storeKeyPrefix + "__snap"). This simulates the -// state that persistSnapshot would have written during a previous run, so that -// RestoreFromStore can recover from it. +// the cache's snapshot key (storeKeyPrefix + "__snap"). func writeSnapshot(t *testing.T, st pkgstore.Store, storeKeyPrefix string, entries []snapshotEntry) { t.Helper() buf := encodeSnapshot(entries) @@ -37,14 +33,14 @@ func writeSnapshot(t *testing.T, st pkgstore.Store, storeKeyPrefix string, entri // TestCache_MaxDAHeight verifies that daHeight tracks the maximum DA height // across successive setDAIncluded calls. 
func TestCache_MaxDAHeight(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") assert.Equal(t, uint64(0), c.daHeight(), "initial daHeight should be 0") c.setDAIncluded("hash1", 100, 1) assert.Equal(t, uint64(100), c.daHeight(), "after setDAIncluded(100)") - c.setDAIncluded("hash2", 50, 2) // lower, should not change max + c.setDAIncluded("hash2", 50, 2) assert.Equal(t, uint64(100), c.daHeight(), "after setDAIncluded(50)") c.setDAIncluded("hash3", 200, 3) @@ -52,7 +48,7 @@ func TestCache_MaxDAHeight(t *testing.T) { } // --------------------------------------------------------------------------- -// RestoreFromStore — O(1) snapshot-based recovery +// RestoreFromStore // --------------------------------------------------------------------------- // TestCache_RestoreFromStore_EmptyChain verifies that RestoreFromStore is a @@ -60,10 +56,10 @@ func TestCache_MaxDAHeight(t *testing.T) { func TestCache_RestoreFromStore_EmptyChain(t *testing.T) { st := testMemStore(t) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(context.Background())) - assert.Equal(t, 0, c.daIncluded.Len(), "no entries expected on empty chain") + assert.Equal(t, 0, c.daIncludedLen(), "no entries expected on empty chain") assert.Equal(t, uint64(0), c.daHeight()) } @@ -78,10 +74,10 @@ func TestCache_RestoreFromStore_FullyFinalized(t *testing.T) { // empty (persistSnapshot writes an empty buf when daIncluded is empty). 
writeSnapshot(t, st, "hdr/", nil) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 0, c.daIncluded.Len(), "no in-flight entries expected") + assert.Equal(t, 0, c.daIncludedLen(), "no in-flight entries expected") assert.Equal(t, uint64(0), c.daHeight(), "no in-flight entries means daHeight is 0") } @@ -91,30 +87,24 @@ func TestCache_RestoreFromStore_InFlightWindow(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // Simulate two in-flight entries written by a previous run: heights 4 and 5. writeSnapshot(t, st, "hdr/", []snapshotEntry{ {blockHeight: 4, daHeight: 13}, {blockHeight: 5, daHeight: 14}, }) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 2, c.daIncluded.Len(), "exactly the in-flight snapshot entries should be loaded") + assert.Equal(t, 2, c.daIncludedLen(), "exactly the in-flight snapshot entries should be loaded") assert.Equal(t, uint64(14), c.daHeight(), "maxDAHeight should reflect the highest in-flight DA height") - // Verify the placeholder keys are addressable by height via hashByHeight. 
- hash4, ok := c.hashByHeight.Get(4) - require.True(t, ok, "hashByHeight[4] should exist") - daH4, ok := c.daIncluded.Get(hash4) - require.True(t, ok) - assert.Equal(t, uint64(13), daH4) + hash4, ok := c.getDAIncludedByHeight(4) + require.True(t, ok, "height 4 should exist in daIncluded") + assert.Equal(t, uint64(13), hash4) - hash5, ok := c.hashByHeight.Get(5) - require.True(t, ok, "hashByHeight[5] should exist") - daH5, ok := c.daIncluded.Get(hash5) - require.True(t, ok) - assert.Equal(t, uint64(14), daH5) + hash5, ok := c.getDAIncludedByHeight(5) + require.True(t, ok, "height 5 should exist in daIncluded") + assert.Equal(t, uint64(14), hash5) } // TestCache_RestoreFromStore_SingleEntry verifies a snapshot with one in-flight @@ -127,24 +117,22 @@ func TestCache_RestoreFromStore_SingleEntry(t *testing.T) { {blockHeight: 3, daHeight: 20}, }) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 1, c.daIncluded.Len(), "one entry should be in-flight") + assert.Equal(t, 1, c.daIncludedLen(), "one entry should be in-flight") assert.Equal(t, uint64(20), c.daHeight()) - _, ok := c.hashByHeight.Get(4) + _, ok := c.getDAIncludedByHeight(4) assert.False(t, ok, "height 4 was not in snapshot") - _, ok = c.hashByHeight.Get(5) + _, ok = c.getDAIncludedByHeight(5) assert.False(t, ok, "height 5 was not in snapshot") } -// TestCache_RestoreFromStore_NilStore verifies that RestoreFromStore is a -// no-op when the cache has no backing store. 
func TestCache_RestoreFromStore_NilStore(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") require.NoError(t, c.RestoreFromStore(context.Background())) - assert.Equal(t, 0, c.daIncluded.Len()) + assert.Equal(t, 0, c.daIncludedLen()) } // TestCache_RestoreFromStore_PlaceholderOverwrittenByRealHash @@ -154,28 +142,24 @@ func TestCache_RestoreFromStore_PlaceholderOverwrittenByRealHash(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // Snapshot contains one in-flight entry for height 3. writeSnapshot(t, st, "hdr/", []snapshotEntry{ {blockHeight: 3, daHeight: 99}, }) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 1, c.daIncluded.Len(), "one placeholder for height 3") + assert.Equal(t, 1, c.daIncludedLen(), "one placeholder for height 3") - // Simulate the DA submitter writing the real hash entry. c.setDAIncluded("realHash_height3", 99, 3) - // hashByHeight[3] now points to the new real hash. - newHash, ok := c.hashByHeight.Get(3) - require.True(t, ok) - assert.Equal(t, "realHash_height3", newHash) - - // The real entry must be queryable by its content hash. daH, ok := c.getDAIncluded("realHash_height3") require.True(t, ok) assert.Equal(t, uint64(99), daH) + + daH2, ok := c.getDAIncludedByHeight(3) + require.True(t, ok) + assert.Equal(t, uint64(99), daH2) } // TestCache_RestoreFromStore_RoundTrip verifies that SaveToStore persists a @@ -184,53 +168,40 @@ func TestCache_RestoreFromStore_RoundTrip(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // First cache instance: write some in-flight entries, then flush (shutdown). - c1 := NewCache[testItem](st, "rt/") + c1 := NewCache(st, "rt/") c1.setDAIncluded("hashA", 10, 1) c1.setDAIncluded("hashB", 20, 2) c1.setDAIncluded("hashC", 30, 3) - // Remove one entry to confirm deletions are also snapshotted. 
c1.removeDAIncluded("hashB") require.NoError(t, c1.SaveToStore(ctx)) - // Second cache instance on same store: should recover {hashA→10, hashC→30}. - c2 := NewCache[testItem](st, "rt/") + c2 := NewCache(st, "rt/") require.NoError(t, c2.RestoreFromStore(ctx)) - assert.Equal(t, 2, c2.daIncluded.Len(), "only non-deleted entries should be restored") + assert.Equal(t, 2, c2.daIncludedLen(), "only non-deleted entries should be restored") assert.Equal(t, uint64(30), c2.daHeight()) - // Placeholder keys are created for heights 1 and 3 (height 2 was removed). - _, ok := c2.hashByHeight.Get(1) + _, ok := c2.getDAIncludedByHeight(1) assert.True(t, ok, "height 1 placeholder should exist") - _, ok = c2.hashByHeight.Get(2) + _, ok = c2.getDAIncludedByHeight(2) assert.False(t, ok, "height 2 was removed, should not exist") - _, ok = c2.hashByHeight.Get(3) + _, ok = c2.getDAIncludedByHeight(3) assert.True(t, ok, "height 3 placeholder should exist") } // --------------------------------------------------------------------------- -// Basic operations (no store required) +// Basic operations // --------------------------------------------------------------------------- func TestCache_BasicOperations(t *testing.T) { - c := NewCache[testItem](nil, "") - - // setItem / getItem - c.setItem(1, &testItem{V: 42}) - got := c.getItem(1) - require.NotNil(t, got) - assert.Equal(t, 42, got.V) - assert.Nil(t, c.getItem(999)) + c := NewCache(nil, "") - // setSeen / isSeen / removeSeen assert.False(t, c.isSeen("hash1")) c.setSeen("hash1", 1) assert.True(t, c.isSeen("hash1")) c.removeSeen("hash1") assert.False(t, c.isSeen("hash1")) - // setDAIncluded / getDAIncluded / removeDAIncluded _, ok := c.getDAIncluded("hash2") assert.False(t, ok) c.setDAIncluded("hash2", 100, 2) @@ -242,51 +213,22 @@ func TestCache_BasicOperations(t *testing.T) { assert.False(t, ok) } -func TestCache_GetNextItem(t *testing.T) { - c := NewCache[testItem](nil, "") - - c.setItem(1, &testItem{V: 1}) - c.setItem(2, &testItem{V: 
2}) - c.setItem(3, &testItem{V: 3}) - - got := c.getNextItem(2) - require.NotNil(t, got) - assert.Equal(t, 2, got.V) - - // removed - assert.Nil(t, c.getNextItem(2)) - - // others intact - assert.NotNil(t, c.getItem(1)) - assert.NotNil(t, c.getItem(3)) -} - func TestCache_DeleteAllForHeight(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") - c.setItem(1, &testItem{V: 1}) - c.setItem(2, &testItem{V: 2}) c.setSeen("hash1", 1) c.setSeen("hash2", 2) c.deleteAllForHeight(1) - assert.Nil(t, c.getItem(1)) assert.False(t, c.isSeen("hash1")) - - assert.NotNil(t, c.getItem(2)) assert.True(t, c.isSeen("hash2")) } func TestCache_WithNilStore(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") require.NotNil(t, c) - c.setItem(1, &testItem{V: 1}) - got := c.getItem(1) - require.NotNil(t, got) - assert.Equal(t, 1, got.V) - c.setDAIncluded("hash1", 100, 1) daHeight, ok := c.getDAIncluded("hash1") assert.True(t, ok) @@ -301,19 +243,16 @@ func TestCache_SaveToStore(t *testing.T) { st := testMemStore(t) ctx := context.Background() - c := NewCache[testItem](st, "save-test/") + c := NewCache(st, "save-test/") c.setDAIncluded("hash1", 100, 1) c.setDAIncluded("hash2", 200, 2) require.NoError(t, c.SaveToStore(ctx)) - // SaveToStore rewrites the single snapshot key (storeKeyPrefix + "__snap"). - // Two entries × 16 bytes each = 32 bytes total. raw, err := st.GetMetadata(ctx, "save-test/__snap") require.NoError(t, err) assert.Len(t, raw, 2*snapshotEntrySize, "snapshot should contain 2 entries of 16 bytes each") - // The individual per-hash keys are NOT written by the snapshot design. 
_, err = st.GetMetadata(ctx, "save-test/hash1") assert.Error(t, err, "per-hash keys should not exist in the snapshot design") } @@ -322,12 +261,11 @@ func TestCache_ClearFromStore(t *testing.T) { st := testMemStore(t) ctx := context.Background() - c := NewCache[testItem](st, "clear-test/") + c := NewCache(st, "clear-test/") c.setDAIncluded("hash1", 100, 1) c.setDAIncluded("hash2", 200, 2) require.NoError(t, c.SaveToStore(ctx)) - // Verify the snapshot key was written before clearing. _, err := st.GetMetadata(ctx, "clear-test/__snap") require.NoError(t, err, "snapshot key should exist before ClearFromStore") @@ -337,26 +275,10 @@ func TestCache_ClearFromStore(t *testing.T) { assert.Error(t, err, "snapshot key should have been removed from store") } -// --------------------------------------------------------------------------- -// Large-dataset smoke test -// --------------------------------------------------------------------------- - -func TestCache_LargeDataset(t *testing.T) { - c := NewCache[testItem](nil, "") - const N = 20_000 - for i := N - 1; i >= 0; i-- { - c.setItem(uint64(i), &testItem{V: i}) - } - for i := 5000; i < 10000; i += 2 { - c.getNextItem(uint64(i)) - } -} - // --------------------------------------------------------------------------- // heightPlaceholderKey // --------------------------------------------------------------------------- -// TestHeightPlaceholderKey verifies the placeholder key format and uniqueness. func TestHeightPlaceholderKey(t *testing.T) { k0 := HeightPlaceholderKey("pfx/", 0) k1 := HeightPlaceholderKey("pfx/", 1) @@ -365,12 +287,10 @@ func TestHeightPlaceholderKey(t *testing.T) { assert.NotEqual(t, k0, k1) assert.NotEqual(t, k1, kMax) - // Must start with the provided prefix. assert.Contains(t, k0, "pfx/") assert.Contains(t, k1, "pfx/") assert.Contains(t, kMax, "pfx/") - // Different prefixes must not collide. 
assert.NotEqual(t, HeightPlaceholderKey("a/", 1), HeightPlaceholderKey("b/", 1)) } @@ -383,35 +303,28 @@ func TestCache_NoPlaceholderLeakAfterRefire(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // Step 1: initial run — write a real hash for height 3, then flush (shutdown). - c1 := NewCache[testItem](st, "pfx/") + c1 := NewCache(st, "pfx/") c1.setDAIncluded("realHash3", 99, 3) require.NoError(t, c1.SaveToStore(ctx)) - // snapshot now contains [{blockHeight:3, daHeight:99}] - // Step 2: restart — placeholder installed for height 3. - c2 := NewCache[testItem](st, "pfx/") + c2 := NewCache(st, "pfx/") require.NoError(t, c2.RestoreFromStore(ctx)) placeholder := HeightPlaceholderKey("pfx/", 3) - _, placeholderPresent := c2.daIncluded.Get(placeholder) + _, placeholderPresent := c2.getDAIncluded(placeholder) require.True(t, placeholderPresent, "placeholder must be present immediately after restore") - assert.Equal(t, 1, c2.daIncluded.Len(), "only one entry expected before re-fire") + assert.Equal(t, 1, c2.daIncludedLen(), "only one entry expected before re-fire") - // Step 3: DA retriever re-fires with the real hash. c2.setDAIncluded("realHash3", 99, 3) - // The real hash must be present. daH, ok := c2.getDAIncluded("realHash3") require.True(t, ok, "real hash must be present after re-fire") assert.Equal(t, uint64(99), daH) - // The placeholder must be gone — no orphan leak. - _, placeholderPresent = c2.daIncluded.Get(placeholder) + _, placeholderPresent = c2.getDAIncluded(placeholder) assert.False(t, placeholderPresent, "placeholder must be evicted after real hash is written") - // Total entries must still be exactly one. 
- assert.Equal(t, 1, c2.daIncluded.Len(), "exactly one daIncluded entry after re-fire — no orphan") + assert.Equal(t, 1, c2.daIncludedLen(), "exactly one daIncluded entry after re-fire — no orphan") } // TestCache_RestartIdempotent verifies that multiple successive restarts all @@ -427,35 +340,54 @@ func TestCache_RestartIdempotent(t *testing.T) { const blockH = uint64(5) const daH = uint64(42) - // ── Run 1: normal operation, height 5 in-flight; flush at shutdown ─────── - c1 := NewCache[testItem](st, "pfx/") + c1 := NewCache(st, "pfx/") c1.setDAIncluded(realHash, daH, blockH) require.NoError(t, c1.SaveToStore(ctx)) - // snapshot: [{5, 42}] for restart := 1; restart <= 3; restart++ { - // ── Restart N: restore from snapshot - cR := NewCache[testItem](st, "pfx/") + cR := NewCache(st, "pfx/") require.NoError(t, cR.RestoreFromStore(ctx), "restart %d: RestoreFromStore", restart) - assert.Equal(t, 1, cR.daIncluded.Len(), "restart %d: one placeholder entry", restart) + assert.Equal(t, 1, cR.daIncludedLen(), "restart %d: one placeholder entry", restart) assert.Equal(t, daH, cR.daHeight(), "restart %d: daHeight correct", restart) - // Fallback lookup by height must work. gotDAH, ok := cR.getDAIncludedByHeight(blockH) require.True(t, ok, "restart %d: height-based lookup must succeed", restart) assert.Equal(t, daH, gotDAH, "restart %d: height-based DA height correct", restart) - // ── DA retriever re-fires with the real hash, then flushes (shutdown). cR.setDAIncluded(realHash, daH, blockH) require.NoError(t, cR.SaveToStore(ctx), "restart %d: SaveToStore", restart) - // After re-fire: real hash present, no orphan, snapshot updated. 
- _, realPresent := cR.daIncluded.Get(realHash) + _, realPresent := cR.getDAIncluded(realHash) assert.True(t, realPresent, "restart %d: real hash present after re-fire", restart) - assert.Equal(t, 1, cR.daIncluded.Len(), "restart %d: no orphan after re-fire", restart) - - // The snapshot written by SaveToStore must still encode the right data - // so the next restart can load it correctly. + assert.Equal(t, 1, cR.daIncludedLen(), "restart %d: no orphan after re-fire", restart) } } + +// --------------------------------------------------------------------------- +// Cleanup via deleteAllForHeight +// --------------------------------------------------------------------------- + +func TestCache_DeleteAllForHeight_CleansHashAndDA(t *testing.T) { + c := NewCache(nil, "") + + c.setDAIncluded("hash1", 100, 1) + c.setSeen("hash1", 1) + c.setDAIncluded("hash2", 200, 2) + c.setSeen("hash2", 2) + + assert.Equal(t, 2, c.daIncludedLen()) + assert.True(t, c.isSeen("hash1")) + assert.True(t, c.isSeen("hash2")) + + c.deleteAllForHeight(1) + + assert.Equal(t, 1, c.daIncludedLen()) + assert.False(t, c.isSeen("hash1")) + assert.True(t, c.isSeen("hash2")) + + _, ok := c.getDAIncludedByHeight(1) + assert.False(t, ok) + _, ok = c.getDAIncludedByHeight(2) + assert.True(t, ok) +} diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 6e22816c3..4823ea4ae 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -89,24 +89,24 @@ type Manager interface { var _ Manager = (*implementation)(nil) type implementation struct { - headerCache *Cache[types.SignedHeader] - dataCache *Cache[types.Data] - txCache *Cache[struct{}] - txTimestamps *sync.Map // map[string]time.Time - pendingEventsCache *Cache[common.DAHeightEvent] - pendingHeaders *PendingHeaders - pendingData *PendingData - store store.Store - config config.Config - logger zerolog.Logger + headerCache *Cache + dataCache *Cache + txCache *Cache + txTimestamps *sync.Map // 
map[string]time.Time + pendingEvents *pendingEventsMap[common.DAHeightEvent] + pendingHeaders *PendingHeaders + pendingData *PendingData + store store.Store + config config.Config + logger zerolog.Logger } // NewManager creates a new Manager, restoring or clearing persisted state as configured. func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { - headerCache := NewCache[types.SignedHeader](st, HeaderDAIncludedPrefix) - dataCache := NewCache[types.Data](st, DataDAIncludedPrefix) - txCache := NewCache[struct{}](nil, "") - pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") + headerCache := NewCache(st, HeaderDAIncludedPrefix) + dataCache := NewCache(st, DataDAIncludedPrefix) + txCache := NewCache(nil, "") + pendingEvents := newPendingEventsMap[common.DAHeightEvent]() pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { @@ -119,16 +119,16 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag } impl := &implementation{ - headerCache: headerCache, - dataCache: dataCache, - txCache: txCache, - txTimestamps: new(sync.Map), - pendingEventsCache: pendingEventsCache, - pendingHeaders: pendingHeaders, - pendingData: pendingData, - store: st, - config: cfg, - logger: logger, + headerCache: headerCache, + dataCache: dataCache, + txCache: txCache, + txTimestamps: new(sync.Map), + pendingEvents: pendingEvents, + pendingHeaders: pendingHeaders, + pendingData: pendingData, + store: st, + config: cfg, + logger: logger, } if cfg.ClearCache { @@ -252,7 +252,7 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { func (m *implementation) DeleteHeight(blockHeight uint64) { m.headerCache.deleteAllForHeight(blockHeight) m.dataCache.deleteAllForHeight(blockHeight) - m.pendingEventsCache.deleteAllForHeight(blockHeight) + m.pendingEvents.deleteAllForHeight(blockHeight) // Note: txCache is intentionally NOT deleted here because: // 1. 
Transactions are tracked by hash, not by block height (they use height 0) @@ -319,17 +319,17 @@ func (m *implementation) NumPendingData() uint64 { // SetPendingEvent sets the event at the specified height. func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEvent) { - m.pendingEventsCache.setItem(height, event) + m.pendingEvents.setItem(height, event) } func (m *implementation) PendingEventsCount() int { - return m.pendingEventsCache.itemCount() + return m.pendingEvents.itemCount() } // GetNextPendingEvent efficiently retrieves and removes the event at the specified height. // Returns nil if no event exists at that height. func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEvent { - return m.pendingEventsCache.getNextItem(height) + return m.pendingEvents.getNextItem(height) } // SaveToStore flushes the DA inclusion snapshot to the store. @@ -364,8 +364,8 @@ func (m *implementation) RestoreFromStore() error { m.initDAHeightFromStore(ctx) m.logger.Info(). - Int("header_entries", m.headerCache.daIncluded.Len()). - Int("data_entries", m.dataCache.daIncluded.Len()). + Int("header_entries", m.headerCache.daIncludedLen()). + Int("data_entries", m.dataCache.daIncludedLen()). Uint64("da_height", m.DaHeight()). 
Msg("restored DA inclusion cache from store") @@ -384,10 +384,10 @@ func (m *implementation) ClearFromStore() error { return fmt.Errorf("failed to clear data cache from store: %w", err) } - m.headerCache = NewCache[types.SignedHeader](m.store, HeaderDAIncludedPrefix) - m.dataCache = NewCache[types.Data](m.store, DataDAIncludedPrefix) - m.txCache = NewCache[struct{}](nil, "") - m.pendingEventsCache = NewCache[common.DAHeightEvent](nil, "") + m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix) + m.dataCache = NewCache(m.store, DataDAIncludedPrefix) + m.txCache = NewCache(nil, "") + m.pendingEvents = newPendingEventsMap[common.DAHeightEvent]() // Initialize DA height from store metadata to ensure DaHeight() is never 0. m.initDAHeightFromStore(ctx) diff --git a/block/internal/cache/pending_events_map.go b/block/internal/cache/pending_events_map.go new file mode 100644 index 000000000..83d6ea065 --- /dev/null +++ b/block/internal/cache/pending_events_map.go @@ -0,0 +1,45 @@ +package cache + +import "sync" + +// pendingEventsMap stores height-keyed events. +// Events are removed by getNextItem (once processed) or deleteAllForHeight (once DA-included/finalized). 
+type pendingEventsMap[T any] struct { + mu sync.Mutex + items map[uint64]*T +} + +func newPendingEventsMap[T any]() *pendingEventsMap[T] { + return &pendingEventsMap[T]{ + items: make(map[uint64]*T), + } +} + +func (m *pendingEventsMap[T]) setItem(height uint64, item *T) { + m.mu.Lock() + defer m.mu.Unlock() + m.items[height] = item +} + +func (m *pendingEventsMap[T]) getNextItem(height uint64) *T { + m.mu.Lock() + defer m.mu.Unlock() + item, ok := m.items[height] + if !ok { + return nil + } + delete(m.items, height) + return item +} + +func (m *pendingEventsMap[T]) itemCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.items) +} + +func (m *pendingEventsMap[T]) deleteAllForHeight(height uint64) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.items, height) +} diff --git a/block/internal/cache/pending_events_map_test.go b/block/internal/cache/pending_events_map_test.go new file mode 100644 index 000000000..ca337deb1 --- /dev/null +++ b/block/internal/cache/pending_events_map_test.go @@ -0,0 +1,75 @@ +package cache + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/types" +) + +func makeTestEvent(height uint64) *common.DAHeightEvent { + return &common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}}, + DaHeight: height, + } +} + +func TestPendingEventsMap_BasicCRUD(t *testing.T) { + t.Parallel() + m := newPendingEventsMap[common.DAHeightEvent]() + + evt1 := makeTestEvent(1) + evt3 := makeTestEvent(3) + evt5 := makeTestEvent(5) + + m.setItem(1, evt1) + m.setItem(3, evt3) + m.setItem(5, evt5) + + assert.Equal(t, 3, m.itemCount()) + + got1 := m.getNextItem(1) + require.NotNil(t, got1) + assert.Equal(t, uint64(1), got1.Header.Height()) + + assert.Equal(t, 2, m.itemCount()) + + got1Again := m.getNextItem(1) + assert.Nil(t, got1Again) + + got3 := 
m.getNextItem(3) + require.NotNil(t, got3) + assert.Equal(t, uint64(3), got3.Header.Height()) +} + +func TestPendingEventsMap_UpdateExisting(t *testing.T) { + t.Parallel() + m := newPendingEventsMap[common.DAHeightEvent]() + + evt1 := makeTestEvent(1) + m.setItem(1, evt1) + assert.Equal(t, 1, m.itemCount()) + + evt1Updated := makeTestEvent(1) + m.setItem(1, evt1Updated) + assert.Equal(t, 1, m.itemCount()) +} + +func TestPendingEventsMap_DeleteAllForHeight(t *testing.T) { + t.Parallel() + m := newPendingEventsMap[common.DAHeightEvent]() + + m.setItem(1, makeTestEvent(1)) + m.setItem(2, makeTestEvent(2)) + m.setItem(3, makeTestEvent(3)) + + m.deleteAllForHeight(2) + assert.Equal(t, 2, m.itemCount()) + + assert.Nil(t, m.getNextItem(2)) + assert.NotNil(t, m.getNextItem(1)) + assert.NotNil(t, m.getNextItem(3)) +} From dad4eb2c119cb936b202a9efe2110798728cb363 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Mar 2026 15:35:51 +0100 Subject: [PATCH 12/13] simplify --- block/internal/cache/manager.go | 28 +++++-- block/internal/cache/pending_events_map.go | 45 ----------- .../internal/cache/pending_events_map_test.go | 75 ------------------- 3 files changed, 20 insertions(+), 128 deletions(-) delete mode 100644 block/internal/cache/pending_events_map.go delete mode 100644 block/internal/cache/pending_events_map_test.go diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 4823ea4ae..6f1b9d9cf 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -93,7 +93,8 @@ type implementation struct { dataCache *Cache txCache *Cache txTimestamps *sync.Map // map[string]time.Time - pendingEvents *pendingEventsMap[common.DAHeightEvent] + pendingEvents map[uint64]*common.DAHeightEvent + pendingMu sync.Mutex pendingHeaders *PendingHeaders pendingData *PendingData store store.Store @@ -106,7 +107,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag headerCache := NewCache(st, 
HeaderDAIncludedPrefix) dataCache := NewCache(st, DataDAIncludedPrefix) txCache := NewCache(nil, "") - pendingEvents := newPendingEventsMap[common.DAHeightEvent]() pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { @@ -123,7 +123,7 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag dataCache: dataCache, txCache: txCache, txTimestamps: new(sync.Map), - pendingEvents: pendingEvents, + pendingEvents: make(map[uint64]*common.DAHeightEvent), pendingHeaders: pendingHeaders, pendingData: pendingData, store: st, @@ -252,7 +252,9 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { func (m *implementation) DeleteHeight(blockHeight uint64) { m.headerCache.deleteAllForHeight(blockHeight) m.dataCache.deleteAllForHeight(blockHeight) - m.pendingEvents.deleteAllForHeight(blockHeight) + m.pendingMu.Lock() + delete(m.pendingEvents, blockHeight) + m.pendingMu.Unlock() // Note: txCache is intentionally NOT deleted here because: // 1. Transactions are tracked by hash, not by block height (they use height 0) @@ -319,17 +321,27 @@ func (m *implementation) NumPendingData() uint64 { // SetPendingEvent sets the event at the specified height. func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEvent) { - m.pendingEvents.setItem(height, event) + m.pendingMu.Lock() + m.pendingEvents[height] = event + m.pendingMu.Unlock() } func (m *implementation) PendingEventsCount() int { - return m.pendingEvents.itemCount() + m.pendingMu.Lock() + defer m.pendingMu.Unlock() + return len(m.pendingEvents) } // GetNextPendingEvent efficiently retrieves and removes the event at the specified height. // Returns nil if no event exists at that height. 
func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEvent { - return m.pendingEvents.getNextItem(height) + m.pendingMu.Lock() + item, ok := m.pendingEvents[height] + if ok { + delete(m.pendingEvents, height) + } + m.pendingMu.Unlock() + return item } // SaveToStore flushes the DA inclusion snapshot to the store. @@ -387,7 +399,7 @@ func (m *implementation) ClearFromStore() error { m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix) m.dataCache = NewCache(m.store, DataDAIncludedPrefix) m.txCache = NewCache(nil, "") - m.pendingEvents = newPendingEventsMap[common.DAHeightEvent]() + m.pendingEvents = make(map[uint64]*common.DAHeightEvent) // Initialize DA height from store metadata to ensure DaHeight() is never 0. m.initDAHeightFromStore(ctx) diff --git a/block/internal/cache/pending_events_map.go b/block/internal/cache/pending_events_map.go deleted file mode 100644 index 83d6ea065..000000000 --- a/block/internal/cache/pending_events_map.go +++ /dev/null @@ -1,45 +0,0 @@ -package cache - -import "sync" - -// pendingEventsMap stores height-keyed events. -// Events are removed by getNextItem (once processed) or deleteAllForHeight (once DA-included/finalized). 
-type pendingEventsMap[T any] struct { - mu sync.Mutex - items map[uint64]*T -} - -func newPendingEventsMap[T any]() *pendingEventsMap[T] { - return &pendingEventsMap[T]{ - items: make(map[uint64]*T), - } -} - -func (m *pendingEventsMap[T]) setItem(height uint64, item *T) { - m.mu.Lock() - defer m.mu.Unlock() - m.items[height] = item -} - -func (m *pendingEventsMap[T]) getNextItem(height uint64) *T { - m.mu.Lock() - defer m.mu.Unlock() - item, ok := m.items[height] - if !ok { - return nil - } - delete(m.items, height) - return item -} - -func (m *pendingEventsMap[T]) itemCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return len(m.items) -} - -func (m *pendingEventsMap[T]) deleteAllForHeight(height uint64) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.items, height) -} diff --git a/block/internal/cache/pending_events_map_test.go b/block/internal/cache/pending_events_map_test.go deleted file mode 100644 index ca337deb1..000000000 --- a/block/internal/cache/pending_events_map_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package cache - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/types" -) - -func makeTestEvent(height uint64) *common.DAHeightEvent { - return &common.DAHeightEvent{ - Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}}, - DaHeight: height, - } -} - -func TestPendingEventsMap_BasicCRUD(t *testing.T) { - t.Parallel() - m := newPendingEventsMap[common.DAHeightEvent]() - - evt1 := makeTestEvent(1) - evt3 := makeTestEvent(3) - evt5 := makeTestEvent(5) - - m.setItem(1, evt1) - m.setItem(3, evt3) - m.setItem(5, evt5) - - assert.Equal(t, 3, m.itemCount()) - - got1 := m.getNextItem(1) - require.NotNil(t, got1) - assert.Equal(t, uint64(1), got1.Header.Height()) - - assert.Equal(t, 2, m.itemCount()) - - got1Again := m.getNextItem(1) - assert.Nil(t, got1Again) - - got3 := 
m.getNextItem(3) - require.NotNil(t, got3) - assert.Equal(t, uint64(3), got3.Header.Height()) -} - -func TestPendingEventsMap_UpdateExisting(t *testing.T) { - t.Parallel() - m := newPendingEventsMap[common.DAHeightEvent]() - - evt1 := makeTestEvent(1) - m.setItem(1, evt1) - assert.Equal(t, 1, m.itemCount()) - - evt1Updated := makeTestEvent(1) - m.setItem(1, evt1Updated) - assert.Equal(t, 1, m.itemCount()) -} - -func TestPendingEventsMap_DeleteAllForHeight(t *testing.T) { - t.Parallel() - m := newPendingEventsMap[common.DAHeightEvent]() - - m.setItem(1, makeTestEvent(1)) - m.setItem(2, makeTestEvent(2)) - m.setItem(3, makeTestEvent(3)) - - m.deleteAllForHeight(2) - assert.Equal(t, 2, m.itemCount()) - - assert.Nil(t, m.getNextItem(2)) - assert.NotNil(t, m.getNextItem(1)) - assert.NotNil(t, m.getNextItem(3)) -} From 952339c633b3be3beb2ae3249f836b5ba1b343db Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Mar 2026 17:05:33 +0100 Subject: [PATCH 13/13] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8724961f..8f2764f39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Avoid evicting yet to be processed heights [#3204](https://github.com/evstack/ev-node/pull/3204) - Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201) - Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162) - Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167)