diff --git a/pkg/node/node.go b/pkg/node/node.go index 50ed1a44b82..120587a9270 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -438,7 +438,7 @@ func NewBee( } }(probe) - stamperStore, err := InitStamperStore(logger, o.DataDir, stateStore) + stamperStore, wasClean, err := InitStamperStore(logger, o.DataDir, stateStore) if err != nil { return nil, fmt.Errorf("failed to initialize stamper store: %w", err) } @@ -660,7 +660,7 @@ func NewBee( b.p2pService = p2ps b.p2pHalter = p2ps - post, err := postage.NewService(logger, stamperStore, batchStore, chainID) + post, err := postage.NewService(logger, stamperStore, batchStore, chainID, wasClean) if err != nil { return nil, fmt.Errorf("postage service: %w", err) } diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index 626848cf88e..a526b924985 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -27,7 +27,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st } else { dataDir = filepath.Join(dataDir, "statestore") } - ldb, err := leveldbstore.New(dataDir, nil) + ldb, _, err := leveldbstore.New(dataDir, nil) if err != nil { return nil, nil, err } @@ -45,18 +45,18 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st // InitStamperStore will create new stamper store with the given path to the // data directory. When given an empty directory path, the function will instead // initialize an in-memory state store that will not be persisted. -func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, error) { +// The returned bool indicates whether the previous shutdown was unclean (dirty). +func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, bool, error) { if dataDir == "" { logger.Warning("using in-mem stamper store, no node state will be persisted") } else { dataDir = filepath.Join(dataDir, "stamperstore") } - stamperStore, err := leveldbstore.New(dataDir, nil) + store, dirty, err := leveldbstore.New(dataDir, nil) if err != nil { - return nil, err + return nil, false, err } - - return stamperStore, nil + return store, dirty, nil } const ( diff --git a/pkg/postage/service.go b/pkg/postage/service.go index 25091fb2f19..9f34ddc2485 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -13,6 +13,7 @@ import ( "io" "math/big" "sync" + "time" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/storage" @@ -27,6 +28,11 @@ const ( blockThreshold = 10 ) +const ( + // stampIssuerSaveInterval is how often dirty stamp issuers are flushed to disk. + stampIssuerSaveInterval = time.Minute +) + var ( // ErrNotFound is the error returned when issuer with given batch ID does not exist. ErrNotFound = errors.New("not found") @@ -54,18 +60,24 @@ type service struct { postageStore Storer chainID int64 issuers []*StampIssuer + + quit chan struct{} + done chan struct{} } -// NewService constructs a new Service. -func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64) (Service, error) { +// NewService constructs a new Service. wasClean indicates whether the previous +// shutdown was graceful; if false, bucket counts are recovered from the stamp store. +func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64, wasClean bool) (Service, error) { s := &service{ logger: logger.WithName(loggerName).Register(), store: store, postageStore: postageStore, chainID: chainID, + quit: make(chan struct{}), + done: make(chan struct{}), } - return s, s.store.Iterate( + err := s.store.Iterate( storage.Query{ Factory: func() storage.Item { return new(StampIssuerItem) @@ -75,6 +87,68 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha _ = s.add(issuer) return false, nil }) + if err != nil { + return nil, err + } + + if !wasClean { + s.logger.Info("recovering bucket counts from stamper store") + if err := s.recoverBuckets(); err != nil { + s.logger.Error(err, "postage stamper store recovery failed") + } + } + + go s.run() + + return s, nil +} + +func (s *service) recoverBuckets() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.store.Iterate( + storage.Query{ + Factory: func() storage.Item { return new(StampItem) }, + }, func(result storage.Result) (bool, error) { + item := result.Entry.(*StampItem) + for _, issuer := range s.issuers { + if bytes.Equal(issuer.data.BatchID, item.BatchID) { + if err := issuer.recover(item.BatchIndex); err != nil { + s.logger.Error(err, "postage recovery of bucket count failed") + } + break + } + } + return false, nil + }) +} + +func (s *service) run() { + defer close(s.done) + // using 1 minute to significantly reduce disk writes + ticker := time.NewTicker(stampIssuerSaveInterval) + defer ticker.Stop() + + for { + select { + case <-s.quit: + return + case <-ticker.C: + s.mtx.Lock() + issuers := make([]*StampIssuer, len(s.issuers)) + copy(issuers, s.issuers) + s.mtx.Unlock() + + for _, issuer := range issuers { + if issuer.isDirty() { + if err := s.save(issuer); err != nil { + s.logger.Error(err, "failed to save stamp issuer") + } + } + } + } + } } // Add adds a stamp issuer to the active issuers. @@ -163,9 +237,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e return nil, nil, ErrNotUsable } return st, func() error { - ps.mtx.Lock() - defer ps.mtx.Unlock() - return ps.save(st) + st.setDirty(true) + return nil }, nil } } @@ -182,15 +255,21 @@ func (ps *service) save(st *StampIssuer) error { }); err != nil { return err } + st.dirty = false return nil } func (ps *service) Close() error { + close(ps.quit) + <-ps.done + ps.mtx.Lock() defer ps.mtx.Unlock() var err error for _, issuer := range ps.issuers { - err = errors.Join(err, ps.save(issuer)) + if issuer.isDirty() { + err = errors.Join(err, ps.save(issuer)) + } } return err } diff --git a/pkg/postage/service_test.go b/pkg/postage/service_test.go index 1237fecc066..a1d06eaac57 100644 --- a/pkg/postage/service_test.go +++ b/pkg/postage/service_test.go @@ -32,7 +32,7 @@ func TestSaveLoad(t *testing.T) { defer store.Close() pstore := pstoremock.New() saved := func(id int64) postage.Service { - ps, err := postage.NewService(log.Noop, store, pstore, id) + ps, err := postage.NewService(log.Noop, store, pstore, id, true) if err != nil { t.Fatal(err) } @@ -48,7 +48,7 @@ func TestSaveLoad(t *testing.T) { return ps } loaded := func(id int64) postage.Service { - ps, err := postage.NewService(log.Noop, store, pstore, id) + ps, err := postage.NewService(log.Noop, store, pstore, id, true) if err != nil { t.Fatal(err) } @@ -57,6 +57,7 @@ func TestSaveLoad(t *testing.T) { test := func(id int64) { psS := saved(id) psL := loaded(id) + defer psL.Close() sMap := map[string]struct{}{} stampIssuers := psS.StampIssuers() @@ -87,10 +88,12 @@ func TestGetStampIssuer(t *testing.T) { } validBlockNumber := testChainState.Block - uint64(postage.BlockThreshold+1) pstore := pstoremock.New(pstoremock.WithChainState(testChainState)) - ps, err := postage.NewService(log.Noop, store, pstore, chainID) + ps, err := postage.NewService(log.Noop, store, pstore, chainID, true) if err != nil { t.Fatal(err) } + defer ps.Close() + ids := make([][]byte, 8) for i := range ids { id := make([]byte, 32) @@ -224,7 +227,7 @@ func TestSetExpired(t *testing.T) { return bytes.Equal(b, batch), nil })) - ps, err := postage.NewService(log.Noop, store, pstore, 1) + ps, err := postage.NewService(log.Noop, store, pstore, 1, true) if err != nil { t.Fatal(err) } @@ -298,3 +301,68 @@ func TestSetExpired(t *testing.T) { testutil.CleanupCloser(t, ps) } + +// TestCrashRecovery verifies that bucket counts are restored from stamp items +// when the service starts after an unclean shutdown (wasClean=false). +func TestCrashRecovery(t *testing.T) { + t.Parallel() + + store := inmemstore.New() + defer store.Close() + pstore := pstoremock.New() + + issuer := newTestStampIssuer(t, 1000) + batchID := issuer.ID() + + // Pick two random chunk addresses and compute their bucket indices. + chunkAddr0 := swarm.RandAddress(t) + chunkAddr1 := swarm.RandAddress(t) + bIdx0 := postage.ToBucket(issuer.BucketDepth(), chunkAddr0) + bIdx1 := postage.ToBucket(issuer.BucketDepth(), chunkAddr1) + + // Write StampItems directly, simulating stamps issued before a crash + // without the issuer bucket state being saved. + // bIdx0: issued at collision count 2 → bucket should recover to 3 + // bIdx1: issued at collision count 0 → bucket should recover to 1 + items := []*postage.StampItem{ + postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr0).WithBatchIndex(postage.IndexToBytes(bIdx0, 2)), + postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr1).WithBatchIndex(postage.IndexToBytes(bIdx1, 0)), + } + for _, item := range items { + if err := store.Put(item); err != nil { + t.Fatal(err) + } + } + + // Save the issuer with zero bucket counts to the store. + ps, err := postage.NewService(log.Noop, store, pstore, 1, true) + if err != nil { + t.Fatal(err) + } + if err := ps.Add(issuer); err != nil { + t.Fatal(err) + } + if err := ps.Close(); err != nil { + t.Fatal(err) + } + + // Restart with wasClean=false — should trigger bucket recovery. + ps2, err := postage.NewService(log.Noop, store, pstore, 1, false) + if err != nil { + t.Fatal(err) + } + defer ps2.Close() + + issuers := ps2.StampIssuers() + if len(issuers) != 1 { + t.Fatalf("expected 1 issuer, got %d", len(issuers)) + } + + buckets := issuers[0].Buckets() + if buckets[bIdx0] != 3 { + t.Errorf("bucket %d: want 3, got %d", bIdx0, buckets[bIdx0]) + } + if buckets[bIdx1] != 1 { + t.Errorf("bucket %d: want 1, got %d", bIdx1, buckets[bIdx1]) + } +} diff --git a/pkg/postage/stampissuer.go b/pkg/postage/stampissuer.go index 77b632cf0f4..87686f4d5bc 100644 --- a/pkg/postage/stampissuer.go +++ b/pkg/postage/stampissuer.go @@ -151,8 +151,9 @@ func (s stampIssuerData) Clone() stampIssuerData { // A StampIssuer instance extends a batch with bucket collision tracking // embedded in multiple Stampers, can be used concurrently. type StampIssuer struct { - data stampIssuerData - mtx sync.Mutex + data stampIssuerData + dirty bool + mtx sync.Mutex } // NewStampIssuer constructs a StampIssuer as an extension of a batch for local @@ -268,6 +269,42 @@ func (si *StampIssuer) Buckets() []uint32 { return b } +// setDirty sets the dirty flag of the StampIssuer indicating it has unsaved bucket changes. +func (si *StampIssuer) setDirty(dirty bool) { + si.mtx.Lock() + defer si.mtx.Unlock() + si.dirty = dirty +} + +// isDirty returns the dirty flag of the StampIssuer. +func (si *StampIssuer) isDirty() bool { + si.mtx.Lock() + defer si.mtx.Unlock() + return si.dirty +} + +// recover restores the bucket count from a stored batchIndex, used during crash recovery. +func (si *StampIssuer) recover(batchIndex []byte) error { + si.mtx.Lock() + defer si.mtx.Unlock() + + bIdx, bCnt := BucketIndexFromBytes(batchIndex) + if bIdx >= uint32(len(si.data.Buckets)) { + return fmt.Errorf("bucket index %d out of bounds", bIdx) + } + + // bCnt is the collision count WHEN the stamp was issued, + // meaning the bucket count has already reached AT LEAST bCnt + 1 + if si.data.Buckets[bIdx] <= bCnt { + si.data.Buckets[bIdx] = bCnt + 1 + + if si.data.Buckets[bIdx] > si.data.MaxBucketCount { + si.data.MaxBucketCount = si.data.Buckets[bIdx] + } + } + return nil +} + // StampIssuerItem is a storage.Item implementation for StampIssuer. type StampIssuerItem struct { Issuer *StampIssuer diff --git a/pkg/statestore/storeadapter/storeadapter_test.go b/pkg/statestore/storeadapter/storeadapter_test.go index f90db7b7ce1..5e0a51cd77d 100644 --- a/pkg/statestore/storeadapter/storeadapter_test.go +++ b/pkg/statestore/storeadapter/storeadapter_test.go @@ -36,7 +36,7 @@ func TestStateStoreAdapter(t *testing.T) { test.RunPersist(t, func(t *testing.T, dir string) storage.StateStorer { t.Helper() - leveldb, err := leveldbstore.New(dir, nil) + leveldb, _, err := leveldbstore.New(dir, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/storage/cache/cache_test.go b/pkg/storage/cache/cache_test.go index dd9ec24d611..9cbd0478cc8 100644 --- a/pkg/storage/cache/cache_test.go +++ b/pkg/storage/cache/cache_test.go @@ -16,7 +16,7 @@ import ( func TestCache(t *testing.T) { t.Parallel() - store, err := leveldbstore.New(t.TempDir(), nil) + store, _, err := leveldbstore.New(t.TempDir(), nil) if err != nil { t.Fatalf("create store failed: %v", err) } diff --git a/pkg/storage/leveldbstore/store.go b/pkg/storage/leveldbstore/store.go index bf410d54c00..c062200db82 100644 --- a/pkg/storage/leveldbstore/store.go +++ b/pkg/storage/leveldbstore/store.go @@ -18,7 +18,11 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -const separator = "/" +const ( + separator = "/" + // dirtyKey is written on open and deleted on clean close to detect unclean shutdowns. + dirtyKey = ".store-dirty-shutdown" +) // key returns the Item identifier for the leveldb storage. func key(item storage.Key) []byte { @@ -56,7 +60,8 @@ type Store struct { // New returns a new store the backed by leveldb. // If path == "", the leveldb will run with in memory backend storage. -func New(path string, opts *opt.Options) (*Store, error) { +// The returned bool indicates whether the previous shutdown was unclean (dirty). +func New(path string, opts *opt.Options) (*Store, bool, error) { var ( err error db *leveldb.DB @@ -72,13 +77,22 @@ func New(path string, opts *opt.Options) (*Store, error) { } if err != nil { - return nil, err + return nil, false, err + } + + dirty, err := db.Has([]byte(dirtyKey), nil) + if err != nil { + return nil, false, fmt.Errorf("has dirty record: %w", err) + } + + if err = db.Put([]byte(dirtyKey), []byte{}, nil); err != nil { + return nil, false, fmt.Errorf("put dirty record: %w", err) } return &Store{ db: db, path: path, - }, nil + }, dirty, nil } // DB implements the Storer interface. @@ -88,7 +102,7 @@ func (s *Store) DB() *leveldb.DB { // Close implements the storage.Store interface. func (s *Store) Close() (err error) { - return s.db.Close() + return errors.Join(s.db.Delete([]byte(dirtyKey), nil), s.db.Close()) } // Get implements the storage.Store interface. diff --git a/pkg/storage/leveldbstore/store_test.go b/pkg/storage/leveldbstore/store_test.go index 7e7bc809213..e4a5c98ad49 100644 --- a/pkg/storage/leveldbstore/store_test.go +++ b/pkg/storage/leveldbstore/store_test.go @@ -9,13 +9,14 @@ import ( "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" "github.com/ethersphere/bee/v2/pkg/storage/storagetest" + "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" ) func TestStore(t *testing.T) { t.Parallel() - store, err := leveldbstore.New(t.TempDir(), nil) + store, _, err := leveldbstore.New(t.TempDir(), nil) if err != nil { t.Fatalf("create store failed: %v", err) } @@ -24,7 +25,7 @@ func TestStore(t *testing.T) { } func BenchmarkStore(b *testing.B) { - st, err := leveldbstore.New("", &opt.Options{ + st, _, err := leveldbstore.New("", &opt.Options{ Compression: opt.SnappyCompression, }) if err != nil { @@ -37,7 +38,7 @@ func BenchmarkStore(b *testing.B) { func TestBatchedStore(t *testing.T) { t.Parallel() - st, err := leveldbstore.New("", nil) + st, _, err := leveldbstore.New("", nil) if err != nil { t.Fatalf("create store failed: %v", err) } @@ -46,7 +47,7 @@ func TestBatchedStore(t *testing.T) { } func BenchmarkBatchedStore(b *testing.B) { - st, err := leveldbstore.New("", &opt.Options{ + st, _, err := leveldbstore.New("", &opt.Options{ Compression: opt.SnappyCompression, }) if err != nil { @@ -55,3 +56,69 @@ func BenchmarkBatchedStore(b *testing.B) { b.Cleanup(func() { _ = st.Close() }) storagetest.BenchmarkBatchedStore(b, st) } + +func TestDirtyMarker(t *testing.T) { + t.Parallel() + + t.Run("clean on first open", func(t *testing.T) { + t.Parallel() + st, dirty, err := leveldbstore.New(t.TempDir(), nil) + if err != nil { + t.Fatalf("open failed: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + if dirty { + t.Fatal("expected clean on first open, got dirty") + } + }) + + t.Run("clean after clean close", func(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + st, _, err := leveldbstore.New(dir, nil) + if err != nil { + t.Fatalf("first open failed: %v", err) + } + if err := st.Close(); err != nil { + t.Fatalf("close failed: %v", err) + } + + st, dirty, err := leveldbstore.New(dir, nil) + if err != nil { + t.Fatalf("second open failed: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + if dirty { + t.Fatal("expected clean after clean close, got dirty") + } + }) + + t.Run("dirty after crash", func(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + // Simulate a previous session that crashed: open the raw leveldb + // and write the dirty marker directly, then close without deleting it. + db, err := leveldb.OpenFile(dir, nil) + if err != nil { + t.Fatalf("raw open failed: %v", err) + } + if err := db.Put([]byte(".store-dirty-shutdown"), []byte{}, nil); err != nil { + t.Fatalf("write dirty key failed: %v", err) + } + if err := db.Close(); err != nil { + t.Fatalf("raw close failed: %v", err) + } + + // Now open via leveldbstore — should detect the marker as dirty. + st, dirty, err := leveldbstore.New(dir, nil) + if err != nil { + t.Fatalf("open failed: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + if !dirty { + t.Fatal("expected dirty after crash, got clean") + } + }) +} diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index 8b72864f385..28008e20a9d 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -35,7 +35,7 @@ func Test_TransactionStorage(t *testing.T) { sharkyStore, err := sharky.New(&dirFS{basedir: t.TempDir()}, 32, swarm.SocMaxChunkSize) assert.NoError(t, err) - store, err := leveldbstore.New("", nil) + store, _, err := leveldbstore.New("", nil) assert.NoError(t, err) st := transaction.NewStorage(sharkyStore, store) diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go index aeacd310f3e..91404d16827 100644 --- a/pkg/storer/migration/step_05_test.go +++ b/pkg/storer/migration/step_05_test.go @@ -29,7 +29,7 @@ func Test_Step_05(t *testing.T) { sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) assert.NoError(t, err) - lstore, err := leveldbstore.New("", nil) + lstore, _, err := leveldbstore.New("", nil) assert.NoError(t, err) store := transaction.NewStorage(sharkyStore, lstore) diff --git a/pkg/storer/migration/step_06_test.go b/pkg/storer/migration/step_06_test.go index b5a5d3ecc7a..e3a2f170f42 100644 --- a/pkg/storer/migration/step_06_test.go +++ b/pkg/storer/migration/step_06_test.go @@ -35,7 +35,7 @@ func Test_Step_06(t *testing.T) { sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) require.NoError(t, err) - lstore, err := leveldbstore.New("", nil) + lstore, _, err := leveldbstore.New("", nil) require.NoError(t, err) store := transaction.NewStorage(sharkyStore, lstore) diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index d4a41c73491..6425213ca76 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -220,7 +220,7 @@ func closer(closers ...io.Closer) io.Closer { } func initInmemRepository() (transaction.Storage, io.Closer, error) { - store, err := leveldbstore.New("", nil) + store, _, err := leveldbstore.New("", nil) if err != nil { return nil, nil, fmt.Errorf("failed creating inmem levelDB index store: %w", err) } @@ -263,7 +263,7 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) { return nil, err } } - store, err := leveldbstore.New(path.Join(basePath, "indexstore"), &opt.Options{ + store, _, err := leveldbstore.New(path.Join(basePath, "indexstore"), &opt.Options{ OpenFilesCacheCapacity: int(opts.LdbOpenFilesLimit), BlockCacheCapacity: int(opts.LdbBlockCacheCapacity), WriteBuffer: int(opts.LdbWriteBufferSize),