From 194efa07f682feebbf22646259b537a51879ca26 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 9 Mar 2026 16:07:03 +0200 Subject: [PATCH 01/13] fix: debounce stamp issuer saves to reduce disk io --- pkg/postage/cleanshutdown.go | 43 +++++++++++++++++ pkg/postage/service.go | 91 ++++++++++++++++++++++++++++++++++-- pkg/postage/service_test.go | 3 ++ pkg/postage/stampissuer.go | 41 +++++++++++++++- 4 files changed, 171 insertions(+), 7 deletions(-) create mode 100644 pkg/postage/cleanshutdown.go diff --git a/pkg/postage/cleanshutdown.go b/pkg/postage/cleanshutdown.go new file mode 100644 index 00000000000..db6556405c8 --- /dev/null +++ b/pkg/postage/cleanshutdown.go @@ -0,0 +1,43 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package postage + +import "github.com/ethersphere/bee/v2/pkg/storage" + +// cleanShutdownItem is a storage.Item implementation for marking graceful shutdown. +type cleanShutdownItem struct{} + +// ID implements the storage.Item interface. +func (c *cleanShutdownItem) ID() string { + return "clean_shutdown" +} + +// Namespace implements the storage.Item interface. +func (c *cleanShutdownItem) Namespace() string { + return "postage" +} + +// Marshal implements the storage.Item interface. +func (c *cleanShutdownItem) Marshal() ([]byte, error) { + return []byte{0}, nil +} + +// Unmarshal implements the storage.Item interface. +func (c *cleanShutdownItem) Unmarshal(data []byte) error { + return nil +} + +// Clone implements the storage.Item interface. +func (c *cleanShutdownItem) Clone() storage.Item { + if c == nil { + return nil + } + return &cleanShutdownItem{} +} + +// String implements the fmt.Stringer interface. +func (c cleanShutdownItem) String() string { + return "postage/clean_shutdown" +} diff --git a/pkg/postage/service.go b/pkg/postage/service.go index 25091fb2f19..747b4c161b5 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -13,6 +13,7 @@ import ( "io" "math/big" "sync" + "time" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/storage" @@ -54,6 +55,9 @@ type service struct { postageStore Storer chainID int64 issuers []*StampIssuer + + quit chan struct{} + done chan struct{} } // NewService constructs a new Service. @@ -63,9 +67,11 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha store: store, postageStore: postageStore, chainID: chainID, + quit: make(chan struct{}), + done: make(chan struct{}), } - return s, s.store.Iterate( + err := s.store.Iterate( storage.Query{ Factory: func() storage.Item { return new(StampIssuerItem) @@ -75,6 +81,73 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha _ = s.add(issuer) return false, nil }) + if err != nil { + return nil, err + } + + if err := s.store.Get(&cleanShutdownItem{}); err != nil { + if errors.Is(err, storage.ErrNotFound) { + s.logger.Info("recovering bucket counts from stamper store") + if err := s.recoverBuckets(); err != nil { + s.logger.Warning("postage stamper store recovery failed", "err", err) + } + } + } else { + _ = s.store.Delete(&cleanShutdownItem{}) + } + + go s.run() + + return s, nil +} + +func (s *service) recoverBuckets() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.store.Iterate( + storage.Query{ + Factory: func() storage.Item { return new(StampItem) }, + }, func(result storage.Result) (bool, error) { + item := result.Entry.(*StampItem) + for _, issuer := range s.issuers { + if bytes.Equal(issuer.data.BatchID, item.BatchID) { + if err := issuer.recover(item.BatchIndex); err != nil { + s.logger.Debug("postage recovery of bucket count failed", "err", err) + } + issuer.SetDirty(true) + break + } + } + return false, nil + }) +} + +func (s *service) run() { + defer close(s.done) + // using 1 minute to significantly reduce disk writes + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-s.quit: + return + case <-ticker.C: + s.mtx.Lock() + issuers := make([]*StampIssuer, len(s.issuers)) + copy(issuers, s.issuers) + s.mtx.Unlock() + + for _, issuer := range issuers { + if issuer.IsDirty() { + if err := s.save(issuer); err != nil { + s.logger.Warning("failed to save stamp issuer", "err", err) + } + } + } + } + } } // Add adds a stamp issuer to the active issuers. @@ -163,9 +236,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e return nil, nil, ErrNotUsable } return st, func() error { - ps.mtx.Lock() - defer ps.mtx.Unlock() - return ps.save(st) + st.SetDirty(true) + return nil }, nil } } @@ -182,15 +254,24 @@ func (ps *service) save(st *StampIssuer) error { }); err != nil { return err } + st.dirty = false return nil } func (ps *service) Close() error { + close(ps.quit) + <-ps.done + ps.mtx.Lock() defer ps.mtx.Unlock() var err error for _, issuer := range ps.issuers { - err = errors.Join(err, ps.save(issuer)) + if issuer.IsDirty() { + err = errors.Join(err, ps.save(issuer)) + } + } + if err == nil { + err = ps.store.Put(&cleanShutdownItem{}) } return err } diff --git a/pkg/postage/service_test.go b/pkg/postage/service_test.go index 1237fecc066..a723befbc98 100644 --- a/pkg/postage/service_test.go +++ b/pkg/postage/service_test.go @@ -57,6 +57,7 @@ func TestSaveLoad(t *testing.T) { test := func(id int64) { psS := saved(id) psL := loaded(id) + defer psL.Close() sMap := map[string]struct{}{} stampIssuers := psS.StampIssuers() @@ -91,6 +92,8 @@ func TestGetStampIssuer(t *testing.T) { if err != nil { t.Fatal(err) } + defer ps.Close() + ids := make([][]byte, 8) for i := range ids { id := make([]byte, 32) diff --git a/pkg/postage/stampissuer.go b/pkg/postage/stampissuer.go index 77b632cf0f4..078d7e0caa3 100644 --- a/pkg/postage/stampissuer.go +++ b/pkg/postage/stampissuer.go @@ -151,8 +151,9 @@ func (s stampIssuerData) Clone() stampIssuerData { // A StampIssuer instance extends a batch with bucket collision tracking // embedded in multiple Stampers, can be used concurrently. type StampIssuer struct { - data stampIssuerData - mtx sync.Mutex + data stampIssuerData + dirty bool + mtx sync.Mutex } // NewStampIssuer constructs a StampIssuer as an extension of a batch for local @@ -268,6 +269,42 @@ func (si *StampIssuer) Buckets() []uint32 { return b } +// SetDirty sets the dirty flag of the StampIssuer indicating it has unsaved bucket changes. +func (si *StampIssuer) SetDirty(dirty bool) { + si.mtx.Lock() + defer si.mtx.Unlock() + si.dirty = dirty +} + +// IsDirty returns the dirty flag of the StampIssuer. +func (si *StampIssuer) IsDirty() bool { + si.mtx.Lock() + defer si.mtx.Unlock() + return si.dirty +} + +// recover restores the bucket count from a stored batchIndex, used during crash recovery. +func (si *StampIssuer) recover(batchIndex []byte) error { + si.mtx.Lock() + defer si.mtx.Unlock() + + bIdx, bCnt := BucketIndexFromBytes(batchIndex) + if bIdx >= uint32(len(si.data.Buckets)) { + return fmt.Errorf("bucket index %d out of bounds", bIdx) + } + + // bCnt is the collision count WHEN the stamp was issued, + // meaning the bucket count has already reached AT LEAST bCnt + 1 + if si.data.Buckets[bIdx] <= bCnt { + si.data.Buckets[bIdx] = bCnt + 1 + + if si.data.Buckets[bIdx] > si.data.MaxBucketCount { + si.data.MaxBucketCount = si.data.Buckets[bIdx] + } + } + return nil +} + // StampIssuerItem is a storage.Item implementation for StampIssuer. type StampIssuerItem struct { Issuer *StampIssuer From 481e501decd8edbcb4d159da81f37f56ca193144 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 9 Mar 2026 22:01:22 +0200 Subject: [PATCH 02/13] feat: accelerate stamperstore disk io and accounting serialization --- pkg/accounting/accounting.go | 94 +++++++++++++++++------ pkg/node/statestore.go | 3 +- pkg/postage/stampercache.go | 144 +++++++++++++++++++++++++++++++++++ 3 files changed, 217 insertions(+), 24 deletions(-) create mode 100644 pkg/postage/stampercache.go diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index a877a70d0ec..a0b650e3f4c 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -8,6 +8,7 @@ package accounting import ( "context" + "encoding/json" "errors" "fmt" "maps" @@ -90,6 +91,45 @@ type creditAction struct { applied bool } +// bigInt is an internal wrapper around big.Int that natively bypasses +// proxyItem.Unmarshal json overhead by implementing encoding.BinaryMarshaler +// and encoding.BinaryUnmarshaler directly. +type bigInt struct { + *big.Int +} + +// MarshalBinary serializes the bigInt quickly, prefixing a 1 for negatives, 0 for positives. +func (b *bigInt) MarshalBinary() ([]byte, error) { + if b.Int == nil { + return []byte{0}, nil + } + bytes := b.Int.Bytes() + res := make([]byte, len(bytes)+1) + if b.Int.Sign() < 0 { + res[0] = 1 + } + copy(res[1:], bytes) + return res, nil +} + +// UnmarshalBinary evaluates backward compatibility. If data begins with 0 or 1, +// it uses fast binary parsing. Otherwise, it delegates to json.Unmarshal. +func (b *bigInt) UnmarshalBinary(data []byte) error { + if len(data) == 0 { + return nil + } + // Fallback to JSON if data doesn't start with binary signature + if data[0] != 0 && data[0] != 1 { + b.Int = new(big.Int) + return json.Unmarshal(data, b.Int) + } + b.Int = new(big.Int).SetBytes(data[1:]) + if data[0] == 1 { + b.Int.Neg(b.Int) + } + return nil +} + // PayFunc is the function used for async monetary settlement type PayFunc func(context.Context, swarm.Address, *big.Int) @@ -353,7 +393,7 @@ func (c *creditAction) Apply() error { loggerV2.Debug("credit action apply", "crediting_peer_address", c.peer, "price", c.price, "new_balance", nextBalance) - err = c.accounting.store.Put(peerBalanceKey(c.peer), nextBalance) + err = c.accounting.store.Put(peerBalanceKey(c.peer), &bigInt{Int: nextBalance}) if err != nil { return fmt.Errorf("failed to persist balance: %w", err) } @@ -406,7 +446,7 @@ func (c *creditAction) Apply() error { loggerV2.Debug("credit action apply; decreasing originated balance", "crediting_peer_address", c.peer, "current_balance", nextOriginBalance) } - err = c.accounting.store.Put(originatedBalanceKey(c.peer), nextOriginBalance) + err = c.accounting.store.Put(originatedBalanceKey(c.peer), &bigInt{Int: nextOriginBalance}) if err != nil { return fmt.Errorf("failed to persist originated balance: %w", err) } @@ -519,7 +559,8 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { // Balance returns the current balance for the given peer. func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { - err = a.store.Get(peerBalanceKey(peer), &balance) + var wrapper bigInt + err = a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -528,12 +569,13 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { return nil, err } - return balance, nil + return wrapper.Int, nil } // OriginatedBalance returns the current balance for the given peer. func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) { - err = a.store.Get(originatedBalanceKey(peer), &balance) + var wrapper bigInt + err = a.store.Get(originatedBalanceKey(peer), &wrapper) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -542,12 +584,13 @@ func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, er return nil, err } - return balance, nil + return wrapper.Int, nil } // SurplusBalance returns the current balance for the given peer. func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) { - err = a.store.Get(peerSurplusBalanceKey(peer), &balance) + var wrapper bigInt + err = a.store.Get(peerSurplusBalanceKey(peer), &wrapper) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -556,11 +599,11 @@ func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err e return nil, err } - if balance.Cmp(big.NewInt(0)) < 0 { + if wrapper.Int.Cmp(big.NewInt(0)) < 0 { return nil, ErrInvalidValue } - return balance, nil + return wrapper.Int, nil } // CompensatedBalance returns balance decreased by surplus balance @@ -682,13 +725,13 @@ func (a *Accounting) Balances() (map[string]*big.Int, error) { } if _, ok := s[addr.String()]; !ok { - var storevalue *big.Int - err = a.store.Get(peerBalanceKey(addr), &storevalue) + var wrapper bigInt + err = a.store.Get(peerBalanceKey(addr), &wrapper) if err != nil { return false, fmt.Errorf("get peer %s balance: %w", addr.String(), err) } - s[addr.String()] = storevalue + s[addr.String()] = wrapper.Int } return false, nil @@ -869,12 +912,15 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) { balance := new(big.Int) zero := big.NewInt(0) - err := a.store.Get(peerBalanceKey(peer), &balance) + var wrapper bigInt + err := a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if !errors.Is(err, storage.ErrNotFound) { return nil, err } balance = big.NewInt(0) + } else { + balance = wrapper.Int } peerDebt := new(big.Int).Add(balance, accountingPeer.shadowReservedBalance) @@ -893,13 +939,15 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { balance := new(big.Int) zero := big.NewInt(0) - - err := a.store.Get(peerBalanceKey(peer), &balance) + var wrapper bigInt + err := a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if !errors.Is(err, storage.ErrNotFound) { return nil, err } balance = big.NewInt(0) + } else { + balance = wrapper.Int } if balance.Cmp(zero) < 0 { @@ -919,16 +967,16 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { // shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance // this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve func (a *Accounting) shadowBalance(peer swarm.Address, accountingPeer *accountingPeer) (shadowBalance *big.Int, err error) { - balance := new(big.Int) zero := big.NewInt(0) - - err = a.store.Get(peerBalanceKey(peer), &balance) + var wrapper bigInt + err = a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if errors.Is(err, storage.ErrNotFound) { return zero, nil } return nil, err } + balance := wrapper.Int if balance.Cmp(zero) >= 0 { return zero, nil @@ -986,7 +1034,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece loggerV2.Debug("registering payment sent", "peer_address", peer, "amount", amount, "new_balance", nextBalance) - err = a.store.Put(peerBalanceKey(peer), nextBalance) + err = a.store.Put(peerBalanceKey(peer), &bigInt{Int: nextBalance}) if err != nil { a.logger.Error(err, "notify payment sent; failed to persist balance") return @@ -1165,7 +1213,7 @@ func (a *Accounting) NotifyRefreshmentSent(peer swarm.Address, attemptedAmount, newBalance := new(big.Int).Add(currentBalance, amount) - err = a.store.Put(peerBalanceKey(peer), newBalance) + err = a.store.Put(peerBalanceKey(peer), &bigInt{Int: newBalance}) if err != nil { a.logger.Error(err, "notifyrefreshmentsent failed to persist balance") return @@ -1269,7 +1317,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric if newSurplusBalance.Cmp(big.NewInt(0)) >= 0 { loggerV2.Debug("surplus debiting peer", "peer_address", peer, "price", price, "new_balance", newSurplusBalance) - err = a.store.Put(peerSurplusBalanceKey(peer), newSurplusBalance) + err = a.store.Put(peerSurplusBalanceKey(peer), &bigInt{Int: newSurplusBalance}) if err != nil { return nil, fmt.Errorf("failed to persist surplus balance: %w", err) } @@ -1290,7 +1338,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric // let's store 0 as surplus balance loggerV2.Debug("surplus debiting peer", "peer_address", peer, "amount", debitIncrease, "new_balance", 0) - err = a.store.Put(peerSurplusBalanceKey(peer), big.NewInt(0)) + err = a.store.Put(peerSurplusBalanceKey(peer), &bigInt{Int: big.NewInt(0)}) if err != nil { return nil, fmt.Errorf("failed to persist surplus balance: %w", err) } @@ -1308,7 +1356,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric loggerV2.Debug("debiting peer", "peer_address", peer, "price", price, "new_balance", nextBalance) - err = a.store.Put(peerBalanceKey(peer), nextBalance) + err = a.store.Put(peerBalanceKey(peer), &bigInt{Int: nextBalance}) if err != nil { return nil, fmt.Errorf("failed to persist balance: %w", err) } diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index 626848cf88e..c69a8a16bf1 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -11,6 +11,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" + "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/statestore/storeadapter" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/cache" @@ -56,7 +57,7 @@ func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.Stat return nil, err } - return stamperStore, nil + return postage.NewStamperStoreCache(stamperStore), nil } const ( diff --git a/pkg/postage/stampercache.go b/pkg/postage/stampercache.go new file mode 100644 index 00000000000..49f05aa840c --- /dev/null +++ b/pkg/postage/stampercache.go @@ -0,0 +1,144 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package postage + +import ( + "context" + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/storage" +) + +var _ storage.Store = (*StamperStoreCache)(nil) + +// StamperStoreCache wraps a storage.Store to memory-buffer Put operations. +// It is explicitly designed to intercept high-frequency StampItem writes +// generated by stamper.go during mass chunk uploads, compressing thousands +// of sequential disk writes into sporadic batched flushes. +type StamperStoreCache struct { + storage.Store + + cache sync.Map // map[string][]byte + + quit chan struct{} + wg sync.WaitGroup +} + +// key returns a strictly unique string for the storage Item +func key(i storage.Key) string { + return i.Namespace() + "/" + i.ID() +} + +// NewStamperStoreCache instantiates a write-delayed cache over the given store. +// Flush operations run in the background every 5 seconds. +func NewStamperStoreCache(store storage.Store) *StamperStoreCache { + c := &StamperStoreCache{ + Store: store, + quit: make(chan struct{}), + } + + c.wg.Add(1) + go c.flushLoop() + + return c +} + +// Get intercepts storage queries. It serves from memory if the item is dirty/cached, +// otherwise falls back to the persistent store. +func (c *StamperStoreCache) Get(i storage.Item) error { + k := key(i) + if val, ok := c.cache.Load(k); ok { + item := val.(storage.Item) + // We marshal and unmarshal from the cloned item to ensure + // safe copies and proper unmarshaler interface usage + b, err := item.Marshal() + if err != nil { + return err + } + return i.Unmarshal(b) + } + return c.Store.Get(i) +} + +// Has intercepts existence checks. +func (c *StamperStoreCache) Has(k storage.Key) (bool, error) { + if _, ok := c.cache.Load(key(k)); ok { + return true, nil + } + return c.Store.Has(k) +} + +// Put intercepts writes. The cloned item is staged in RAM. +// It does NOT hit the disk. +func (c *StamperStoreCache) Put(i storage.Item) error { + c.cache.Store(key(i), i.Clone()) + return nil +} + +// Delete intercepts deletes, removing them from cache before falling +// straight through to the underlying persistent store. +func (c *StamperStoreCache) Delete(i storage.Item) error { + c.cache.Delete(key(i)) + return c.Store.Delete(i) +} + +// Close gracefully stops the flush loop and flushes all remaining items to disk. +func (c *StamperStoreCache) Close() error { + close(c.quit) + c.wg.Wait() + return c.Store.Close() +} + +// flushLoop runs passively, persisting memory to disk. +func (c *StamperStoreCache) flushLoop() { + defer c.wg.Done() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.quit: + c.flush() + return + case <-ticker.C: + c.flush() + } + } +} + +// flush commits the dirty cache map to the underlying store. +func (c *StamperStoreCache) flush() { + // Try to get a batch if the underlying store supports it + var batch storage.Batch + if batcher, ok := c.Store.(storage.Batcher); ok { + batch = batcher.Batch(context.Background()) + } + + // Delete from map and prepare to write + c.cache.Range(func(k, v interface{}) bool { + item := v.(storage.Item) + c.cache.Delete(k) // removed from dirty cache + + var err error + if batch != nil { + err = batch.Put(item) + } else { + err = c.Store.Put(item) + } + + if err != nil { + // If flush fails, re-insert to try again next tick + c.cache.Store(k, item) + } + return true + }) + + // Commit batch if it exists + if batch != nil { + _ = batch.Commit() + } +} From 125abf50b18cd27f6da32e198ef1865a9fef41ce Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Wed, 11 Mar 2026 20:50:14 +0200 Subject: [PATCH 03/13] fix: go fmt --- pkg/accounting/accounting.go | 10 +++++----- pkg/postage/stampercache.go | 16 ++++++++-------- pkg/postage/stampissuer.go | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index a0b650e3f4c..9bd1d6a43ae 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -103,9 +103,9 @@ func (b *bigInt) MarshalBinary() ([]byte, error) { if b.Int == nil { return []byte{0}, nil } - bytes := b.Int.Bytes() + bytes := b.Bytes() res := make([]byte, len(bytes)+1) - if b.Int.Sign() < 0 { + if b.Sign() < 0 { res[0] = 1 } copy(res[1:], bytes) @@ -125,7 +125,7 @@ func (b *bigInt) UnmarshalBinary(data []byte) error { } b.Int = new(big.Int).SetBytes(data[1:]) if data[0] == 1 { - b.Int.Neg(b.Int) + b.Neg(b.Int) } return nil } @@ -909,7 +909,7 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) { accountingPeer.lock.Lock() defer accountingPeer.lock.Unlock() - balance := new(big.Int) + var balance *big.Int zero := big.NewInt(0) var wrapper bigInt @@ -937,7 +937,7 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { accountingPeer := a.getAccountingPeer(peer) - balance := new(big.Int) + var balance *big.Int zero := big.NewInt(0) var wrapper bigInt err := a.store.Get(peerBalanceKey(peer), &wrapper) diff --git a/pkg/postage/stampercache.go b/pkg/postage/stampercache.go index 49f05aa840c..0a2fb8942ad 100644 --- a/pkg/postage/stampercache.go +++ b/pkg/postage/stampercache.go @@ -20,9 +20,9 @@ var _ storage.Store = (*StamperStoreCache)(nil) // of sequential disk writes into sporadic batched flushes. type StamperStoreCache struct { storage.Store - + cache sync.Map // map[string][]byte - + quit chan struct{} wg sync.WaitGroup } @@ -39,10 +39,10 @@ func NewStamperStoreCache(store storage.Store) *StamperStoreCache { Store: store, quit: make(chan struct{}), } - + c.wg.Add(1) go c.flushLoop() - + return c } @@ -95,10 +95,10 @@ func (c *StamperStoreCache) Close() error { // flushLoop runs passively, persisting memory to disk. func (c *StamperStoreCache) flushLoop() { defer c.wg.Done() - + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - + for { select { case <-c.quit: @@ -122,14 +122,14 @@ func (c *StamperStoreCache) flush() { c.cache.Range(func(k, v interface{}) bool { item := v.(storage.Item) c.cache.Delete(k) // removed from dirty cache - + var err error if batch != nil { err = batch.Put(item) } else { err = c.Store.Put(item) } - + if err != nil { // If flush fails, re-insert to try again next tick c.cache.Store(k, item) diff --git a/pkg/postage/stampissuer.go b/pkg/postage/stampissuer.go index 078d7e0caa3..f6036ed45ea 100644 --- a/pkg/postage/stampissuer.go +++ b/pkg/postage/stampissuer.go @@ -293,11 +293,11 @@ func (si *StampIssuer) recover(batchIndex []byte) error { return fmt.Errorf("bucket index %d out of bounds", bIdx) } - // bCnt is the collision count WHEN the stamp was issued, + // bCnt is the collision count WHEN the stamp was issued, // meaning the bucket count has already reached AT LEAST bCnt + 1 if si.data.Buckets[bIdx] <= bCnt { si.data.Buckets[bIdx] = bCnt + 1 - + if si.data.Buckets[bIdx] > si.data.MaxBucketCount { si.data.MaxBucketCount = si.data.Buckets[bIdx] } From f5ef47369677d83fdfce941140e1effb4bd83878 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Wed, 11 Mar 2026 20:59:23 +0200 Subject: [PATCH 04/13] fix: linter --- pkg/accounting/accounting.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index 9bd1d6a43ae..86a400267c3 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -599,7 +599,7 @@ func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err e return nil, err } - if wrapper.Int.Cmp(big.NewInt(0)) < 0 { + if wrapper.Cmp(big.NewInt(0)) < 0 { return nil, ErrInvalidValue } From ef6cd1d5fbf9db2a858b75d498591a7d750fba14 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 12 Mar 2026 13:51:40 +0200 Subject: [PATCH 05/13] chore: refactor usage of big int --- pkg/accounting/accounting.go | 70 +++++++++--------------------------- pkg/bigint/bigint.go | 37 +++++++++++++++++++ 2 files changed, 53 insertions(+), 54 deletions(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index 86a400267c3..8a98ed63453 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -8,7 +8,7 @@ package accounting import ( "context" - "encoding/json" + "errors" "fmt" "maps" @@ -21,6 +21,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/pricing" "github.com/ethersphere/bee/v2/pkg/settlement/pseudosettle" + "github.com/ethersphere/bee/v2/pkg/bigint" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -91,45 +92,6 @@ type creditAction struct { applied bool } -// bigInt is an internal wrapper around big.Int that natively bypasses -// proxyItem.Unmarshal json overhead by implementing encoding.BinaryMarshaler -// and encoding.BinaryUnmarshaler directly. -type bigInt struct { - *big.Int -} - -// MarshalBinary serializes the bigInt quickly, prefixing a 1 for negatives, 0 for positives. -func (b *bigInt) MarshalBinary() ([]byte, error) { - if b.Int == nil { - return []byte{0}, nil - } - bytes := b.Bytes() - res := make([]byte, len(bytes)+1) - if b.Sign() < 0 { - res[0] = 1 - } - copy(res[1:], bytes) - return res, nil -} - -// UnmarshalBinary evaluates backward compatibility. If data begins with 0 or 1, -// it uses fast binary parsing. Otherwise, it delegates to json.Unmarshal. -func (b *bigInt) UnmarshalBinary(data []byte) error { - if len(data) == 0 { - return nil - } - // Fallback to JSON if data doesn't start with binary signature - if data[0] != 0 && data[0] != 1 { - b.Int = new(big.Int) - return json.Unmarshal(data, b.Int) - } - b.Int = new(big.Int).SetBytes(data[1:]) - if data[0] == 1 { - b.Neg(b.Int) - } - return nil -} - // PayFunc is the function used for async monetary settlement type PayFunc func(context.Context, swarm.Address, *big.Int) @@ -393,7 +355,7 @@ func (c *creditAction) Apply() error { loggerV2.Debug("credit action apply", "crediting_peer_address", c.peer, "price", c.price, "new_balance", nextBalance) - err = c.accounting.store.Put(peerBalanceKey(c.peer), &bigInt{Int: nextBalance}) + err = c.accounting.store.Put(peerBalanceKey(c.peer), &bigint.BigInt{Int: nextBalance}) if err != nil { return fmt.Errorf("failed to persist balance: %w", err) } @@ -446,7 +408,7 @@ func (c *creditAction) Apply() error { loggerV2.Debug("credit action apply; decreasing originated balance", "crediting_peer_address", c.peer, "current_balance", nextOriginBalance) } - err = c.accounting.store.Put(originatedBalanceKey(c.peer), &bigInt{Int: nextOriginBalance}) + err = c.accounting.store.Put(originatedBalanceKey(c.peer), &bigint.BigInt{Int: nextOriginBalance}) if err != nil { return fmt.Errorf("failed to persist originated balance: %w", err) } @@ -559,7 +521,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { // Balance returns the current balance for the given peer. func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { - var wrapper bigInt + var wrapper bigint.BigInt err = a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { @@ -574,7 +536,7 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { // OriginatedBalance returns the current balance for the given peer. func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) { - var wrapper bigInt + var wrapper bigint.BigInt err = a.store.Get(originatedBalanceKey(peer), &wrapper) if err != nil { @@ -589,7 +551,7 @@ func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, er // SurplusBalance returns the current balance for the given peer. func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) { - var wrapper bigInt + var wrapper bigint.BigInt err = a.store.Get(peerSurplusBalanceKey(peer), &wrapper) if err != nil { @@ -725,7 +687,7 @@ func (a *Accounting) Balances() (map[string]*big.Int, error) { } if _, ok := s[addr.String()]; !ok { - var wrapper bigInt + var wrapper bigint.BigInt err = a.store.Get(peerBalanceKey(addr), &wrapper) if err != nil { return false, fmt.Errorf("get peer %s balance: %w", addr.String(), err) @@ -912,7 +874,7 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) { var balance *big.Int zero := big.NewInt(0) - var wrapper bigInt + var wrapper bigint.BigInt err := a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if !errors.Is(err, storage.ErrNotFound) { @@ -939,7 +901,7 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { var balance *big.Int zero := big.NewInt(0) - var wrapper bigInt + var wrapper bigint.BigInt err := a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if !errors.Is(err, storage.ErrNotFound) { @@ -968,7 +930,7 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { // this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve func (a *Accounting) shadowBalance(peer swarm.Address, accountingPeer *accountingPeer) (shadowBalance *big.Int, err error) { zero := big.NewInt(0) - var wrapper bigInt + var wrapper bigint.BigInt err = a.store.Get(peerBalanceKey(peer), &wrapper) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -1034,7 +996,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece loggerV2.Debug("registering payment sent", "peer_address", peer, "amount", amount, "new_balance", nextBalance) - err = a.store.Put(peerBalanceKey(peer), &bigInt{Int: nextBalance}) + err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance}) if err != nil { a.logger.Error(err, "notify payment sent; failed to persist balance") return @@ -1213,7 +1175,7 @@ func (a *Accounting) NotifyRefreshmentSent(peer swarm.Address, attemptedAmount, newBalance := new(big.Int).Add(currentBalance, amount) - err = a.store.Put(peerBalanceKey(peer), &bigInt{Int: newBalance}) + err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: newBalance}) if err != nil { a.logger.Error(err, "notifyrefreshmentsent failed to persist balance") return @@ -1317,7 +1279,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric if newSurplusBalance.Cmp(big.NewInt(0)) >= 0 { loggerV2.Debug("surplus debiting peer", "peer_address", peer, "price", price, "new_balance", newSurplusBalance) - err = a.store.Put(peerSurplusBalanceKey(peer), &bigInt{Int: newSurplusBalance}) + err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: newSurplusBalance}) if err != nil { return nil, fmt.Errorf("failed to persist surplus balance: %w", err) } @@ -1338,7 +1300,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric // let's store 0 as surplus balance loggerV2.Debug("surplus debiting peer", "peer_address", peer, "amount", debitIncrease, "new_balance", 0) - err = a.store.Put(peerSurplusBalanceKey(peer), &bigInt{Int: big.NewInt(0)}) + err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: big.NewInt(0)}) if err != nil { return nil, fmt.Errorf("failed to persist surplus balance: %w", err) } @@ -1356,7 +1318,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric loggerV2.Debug("debiting peer", "peer_address", peer, "price", price, "new_balance", nextBalance) - err = a.store.Put(peerBalanceKey(peer), &bigInt{Int: nextBalance}) + err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance}) if err != nil { return nil, fmt.Errorf("failed to persist balance: %w", err) } diff --git a/pkg/bigint/bigint.go b/pkg/bigint/bigint.go index 4d5e9557c36..1d2192e5417 100644 --- a/pkg/bigint/bigint.go +++ b/pkg/bigint/bigint.go @@ -42,3 +42,40 @@ func (i *BigInt) UnmarshalJSON(b []byte) error { func Wrap(i *big.Int) *BigInt { return &BigInt{Int: i} } + +// MarshalBinary serializes the BigInt quickly, prefixing a 1 for negatives, 0 for positives. +func (i *BigInt) MarshalBinary() ([]byte, error) { + if i.Int == nil { + return []byte{0}, nil + } + bytes := i.Bytes() + res := make([]byte, len(bytes)+1) + if i.Sign() < 0 { + res[0] = 1 + } + copy(res[1:], bytes) + return res, nil +} + +// UnmarshalBinary evaluates backward compatibility. If data begins with 0 or 1, +// it uses fast binary parsing. Otherwise, it delegates to json.Unmarshal. +func (i *BigInt) UnmarshalBinary(data []byte) error { + if len(data) == 0 { + return nil + } + // Fallback to JSON if data doesn't start with binary signature + if data[0] != 0 && data[0] != 1 { + if i.Int == nil { + i.Int = new(big.Int) + } + return json.Unmarshal(data, i.Int) + } + if i.Int == nil { + i.Int = new(big.Int) + } + i.SetBytes(data[1:]) + if data[0] == 1 { + i.Neg(i.Int) + } + return nil +} From 610c32dedbc90d18a55530514cedc20cfdc8c426 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 12 Mar 2026 15:19:12 +0200 Subject: [PATCH 06/13] chore: go fmt --- pkg/accounting/accounting.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index 8a98ed63453..90be37e1d50 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -17,11 +17,11 @@ import ( "sync" "time" + "github.com/ethersphere/bee/v2/pkg/bigint" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/pricing" "github.com/ethersphere/bee/v2/pkg/settlement/pseudosettle" - "github.com/ethersphere/bee/v2/pkg/bigint" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" ) From 3bcf2e04d921656ee2b56a8228050295316ec962 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 17 Mar 2026 15:39:41 +0200 Subject: [PATCH 07/13] fix: remove stamper store write cache to prevent crash data loss --- pkg/node/statestore.go | 3 +- pkg/postage/stampercache.go | 144 ------------------------------------ 2 files changed, 1 insertion(+), 146 deletions(-) delete mode 100644 pkg/postage/stampercache.go diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index c69a8a16bf1..626848cf88e 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -11,7 +11,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" - "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/statestore/storeadapter" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/cache" @@ -57,7 +56,7 @@ func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.Stat return nil, err } - return postage.NewStamperStoreCache(stamperStore), nil + return stamperStore, nil } const ( diff --git a/pkg/postage/stampercache.go b/pkg/postage/stampercache.go deleted file mode 100644 index 0a2fb8942ad..00000000000 --- a/pkg/postage/stampercache.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package postage - -import ( - "context" - "sync" - "time" - - "github.com/ethersphere/bee/v2/pkg/storage" -) - -var _ storage.Store = (*StamperStoreCache)(nil) - -// StamperStoreCache wraps a storage.Store to memory-buffer Put operations. -// It is explicitly designed to intercept high-frequency StampItem writes -// generated by stamper.go during mass chunk uploads, compressing thousands -// of sequential disk writes into sporadic batched flushes. -type StamperStoreCache struct { - storage.Store - - cache sync.Map // map[string][]byte - - quit chan struct{} - wg sync.WaitGroup -} - -// key returns a strictly unique string for the storage Item -func key(i storage.Key) string { - return i.Namespace() + "/" + i.ID() -} - -// NewStamperStoreCache instantiates a write-delayed cache over the given store. -// Flush operations run in the background every 5 seconds. -func NewStamperStoreCache(store storage.Store) *StamperStoreCache { - c := &StamperStoreCache{ - Store: store, - quit: make(chan struct{}), - } - - c.wg.Add(1) - go c.flushLoop() - - return c -} - -// Get intercepts storage queries. It serves from memory if the item is dirty/cached, -// otherwise falls back to the persistent store. -func (c *StamperStoreCache) Get(i storage.Item) error { - k := key(i) - if val, ok := c.cache.Load(k); ok { - item := val.(storage.Item) - // We marshal and unmarshal from the cloned item to ensure - // safe copies and proper unmarshaler interface usage - b, err := item.Marshal() - if err != nil { - return err - } - return i.Unmarshal(b) - } - return c.Store.Get(i) -} - -// Has intercepts existence checks. -func (c *StamperStoreCache) Has(k storage.Key) (bool, error) { - if _, ok := c.cache.Load(key(k)); ok { - return true, nil - } - return c.Store.Has(k) -} - -// Put intercepts writes. The cloned item is staged in RAM. -// It does NOT hit the disk. -func (c *StamperStoreCache) Put(i storage.Item) error { - c.cache.Store(key(i), i.Clone()) - return nil -} - -// Delete intercepts deletes, removing them from cache before falling -// straight through to the underlying persistent store. -func (c *StamperStoreCache) Delete(i storage.Item) error { - c.cache.Delete(key(i)) - return c.Store.Delete(i) -} - -// Close gracefully stops the flush loop and flushes all remaining items to disk. -func (c *StamperStoreCache) Close() error { - close(c.quit) - c.wg.Wait() - return c.Store.Close() -} - -// flushLoop runs passively, persisting memory to disk. -func (c *StamperStoreCache) flushLoop() { - defer c.wg.Done() - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-c.quit: - c.flush() - return - case <-ticker.C: - c.flush() - } - } -} - -// flush commits the dirty cache map to the underlying store. -func (c *StamperStoreCache) flush() { - // Try to get a batch if the underlying store supports it - var batch storage.Batch - if batcher, ok := c.Store.(storage.Batcher); ok { - batch = batcher.Batch(context.Background()) - } - - // Delete from map and prepare to write - c.cache.Range(func(k, v interface{}) bool { - item := v.(storage.Item) - c.cache.Delete(k) // removed from dirty cache - - var err error - if batch != nil { - err = batch.Put(item) - } else { - err = c.Store.Put(item) - } - - if err != nil { - // If flush fails, re-insert to try again next tick - c.cache.Store(k, item) - } - return true - }) - - // Commit batch if it exists - if batch != nil { - _ = batch.Commit() - } -} From 9df23272c5a6d6eb3fce27744abb77a22b6e2600 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 31 Mar 2026 14:57:12 +0300 Subject: [PATCH 08/13] fix: revert accounting serialization and improve log levels --- pkg/accounting/accounting.go | 60 +++++++++++++++--------------------- pkg/bigint/bigint.go | 36 ---------------------- pkg/postage/service.go | 6 ++-- 3 files changed, 28 insertions(+), 74 deletions(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index 90be37e1d50..a877a70d0ec 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -8,7 +8,6 @@ package accounting import ( "context" - "errors" "fmt" "maps" @@ -17,7 +16,6 @@ import ( "sync" "time" - "github.com/ethersphere/bee/v2/pkg/bigint" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/pricing" @@ -355,7 +353,7 @@ func (c *creditAction) Apply() error { loggerV2.Debug("credit action apply", "crediting_peer_address", c.peer, "price", c.price, "new_balance", nextBalance) - err = c.accounting.store.Put(peerBalanceKey(c.peer), &bigint.BigInt{Int: nextBalance}) + err = c.accounting.store.Put(peerBalanceKey(c.peer), nextBalance) if err != nil { return fmt.Errorf("failed to persist balance: %w", err) } @@ -408,7 +406,7 @@ func (c *creditAction) Apply() error { loggerV2.Debug("credit action apply; decreasing originated balance", "crediting_peer_address", c.peer, "current_balance", nextOriginBalance) } - err = c.accounting.store.Put(originatedBalanceKey(c.peer), &bigint.BigInt{Int: nextOriginBalance}) + err = c.accounting.store.Put(originatedBalanceKey(c.peer), nextOriginBalance) if err != nil { return fmt.Errorf("failed to persist originated balance: %w", err) } @@ -521,8 +519,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { // Balance returns the current balance for the given peer. func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { - var wrapper bigint.BigInt - err = a.store.Get(peerBalanceKey(peer), &wrapper) + err = a.store.Get(peerBalanceKey(peer), &balance) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -531,13 +528,12 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { return nil, err } - return wrapper.Int, nil + return balance, nil } // OriginatedBalance returns the current balance for the given peer. func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) { - var wrapper bigint.BigInt - err = a.store.Get(originatedBalanceKey(peer), &wrapper) + err = a.store.Get(originatedBalanceKey(peer), &balance) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -546,13 +542,12 @@ func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, er return nil, err } - return wrapper.Int, nil + return balance, nil } // SurplusBalance returns the current balance for the given peer. func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) { - var wrapper bigint.BigInt - err = a.store.Get(peerSurplusBalanceKey(peer), &wrapper) + err = a.store.Get(peerSurplusBalanceKey(peer), &balance) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -561,11 +556,11 @@ func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err e return nil, err } - if wrapper.Cmp(big.NewInt(0)) < 0 { + if balance.Cmp(big.NewInt(0)) < 0 { return nil, ErrInvalidValue } - return wrapper.Int, nil + return balance, nil } // CompensatedBalance returns balance decreased by surplus balance @@ -687,13 +682,13 @@ func (a *Accounting) Balances() (map[string]*big.Int, error) { } if _, ok := s[addr.String()]; !ok { - var wrapper bigint.BigInt - err = a.store.Get(peerBalanceKey(addr), &wrapper) + var storevalue *big.Int + err = a.store.Get(peerBalanceKey(addr), &storevalue) if err != nil { return false, fmt.Errorf("get peer %s balance: %w", addr.String(), err) } - s[addr.String()] = wrapper.Int + s[addr.String()] = storevalue } return false, nil @@ -871,18 +866,15 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) { accountingPeer.lock.Lock() defer accountingPeer.lock.Unlock() - var balance *big.Int + balance := new(big.Int) zero := big.NewInt(0) - var wrapper bigint.BigInt - err := a.store.Get(peerBalanceKey(peer), &wrapper) + err := a.store.Get(peerBalanceKey(peer), &balance) if err != nil { if !errors.Is(err, storage.ErrNotFound) { return nil, err } balance = big.NewInt(0) - } else { - balance = wrapper.Int } peerDebt := new(big.Int).Add(balance, accountingPeer.shadowReservedBalance) @@ -899,17 +891,15 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { accountingPeer := a.getAccountingPeer(peer) - var balance *big.Int + balance := new(big.Int) zero := big.NewInt(0) - var wrapper bigint.BigInt - err := a.store.Get(peerBalanceKey(peer), &wrapper) + + err := a.store.Get(peerBalanceKey(peer), &balance) if err != nil { if !errors.Is(err, storage.ErrNotFound) { return nil, err } balance = big.NewInt(0) - } else { - balance = wrapper.Int } if balance.Cmp(zero) < 0 { @@ -929,16 +919,16 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) { // shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance // this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve func (a *Accounting) shadowBalance(peer swarm.Address, accountingPeer *accountingPeer) (shadowBalance *big.Int, err error) { + balance := new(big.Int) zero := big.NewInt(0) - var wrapper bigint.BigInt - err = a.store.Get(peerBalanceKey(peer), &wrapper) + + err = a.store.Get(peerBalanceKey(peer), &balance) if err != nil { if errors.Is(err, storage.ErrNotFound) { return zero, nil } return nil, err } - balance := wrapper.Int if balance.Cmp(zero) >= 0 { return zero, nil @@ -996,7 +986,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece loggerV2.Debug("registering payment sent", "peer_address", peer, "amount", amount, "new_balance", nextBalance) - err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance}) + err = a.store.Put(peerBalanceKey(peer), nextBalance) if err != nil { a.logger.Error(err, "notify payment sent; failed to persist balance") return @@ -1175,7 +1165,7 @@ func (a *Accounting) NotifyRefreshmentSent(peer swarm.Address, attemptedAmount, newBalance := new(big.Int).Add(currentBalance, amount) - err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: newBalance}) + err = a.store.Put(peerBalanceKey(peer), newBalance) if err != nil { a.logger.Error(err, "notifyrefreshmentsent failed to persist balance") return @@ -1279,7 +1269,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric if newSurplusBalance.Cmp(big.NewInt(0)) >= 0 { loggerV2.Debug("surplus debiting peer", "peer_address", peer, "price", price, "new_balance", newSurplusBalance) - err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: newSurplusBalance}) + err = a.store.Put(peerSurplusBalanceKey(peer), newSurplusBalance) if err != nil { return nil, fmt.Errorf("failed to persist surplus balance: %w", err) } @@ -1300,7 +1290,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric // let's store 0 as surplus balance loggerV2.Debug("surplus debiting peer", "peer_address", peer, "amount", debitIncrease, "new_balance", 0) - err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: big.NewInt(0)}) + err = a.store.Put(peerSurplusBalanceKey(peer), big.NewInt(0)) if err != nil { return nil, fmt.Errorf("failed to persist surplus balance: %w", err) } @@ -1318,7 +1308,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric loggerV2.Debug("debiting peer", "peer_address", peer, "price", price, "new_balance", nextBalance) - err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance}) + err = a.store.Put(peerBalanceKey(peer), nextBalance) if err != nil { return nil, fmt.Errorf("failed to persist balance: %w", err) } diff --git a/pkg/bigint/bigint.go b/pkg/bigint/bigint.go index 1d2192e5417..1503023e03f 100644 --- a/pkg/bigint/bigint.go +++ b/pkg/bigint/bigint.go @@ -43,39 +43,3 @@ func Wrap(i *big.Int) *BigInt { return &BigInt{Int: i} } -// MarshalBinary serializes the BigInt quickly, prefixing a 1 for negatives, 0 for positives. -func (i *BigInt) MarshalBinary() ([]byte, error) { - if i.Int == nil { - return []byte{0}, nil - } - bytes := i.Bytes() - res := make([]byte, len(bytes)+1) - if i.Sign() < 0 { - res[0] = 1 - } - copy(res[1:], bytes) - return res, nil -} - -// UnmarshalBinary evaluates backward compatibility. If data begins with 0 or 1, -// it uses fast binary parsing. Otherwise, it delegates to json.Unmarshal. -func (i *BigInt) UnmarshalBinary(data []byte) error { - if len(data) == 0 { - return nil - } - // Fallback to JSON if data doesn't start with binary signature - if data[0] != 0 && data[0] != 1 { - if i.Int == nil { - i.Int = new(big.Int) - } - return json.Unmarshal(data, i.Int) - } - if i.Int == nil { - i.Int = new(big.Int) - } - i.SetBytes(data[1:]) - if data[0] == 1 { - i.Neg(i.Int) - } - return nil -} diff --git a/pkg/postage/service.go b/pkg/postage/service.go index 747b4c161b5..d6a6268a5cf 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -89,7 +89,7 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha if errors.Is(err, storage.ErrNotFound) { s.logger.Info("recovering bucket counts from stamper store") if err := s.recoverBuckets(); err != nil { - s.logger.Warning("postage stamper store recovery failed", "err", err) + s.logger.Error(err, "postage stamper store recovery failed") } } } else { @@ -113,7 +113,7 @@ func (s *service) recoverBuckets() error { for _, issuer := range s.issuers { if bytes.Equal(issuer.data.BatchID, item.BatchID) { if err := issuer.recover(item.BatchIndex); err != nil { - s.logger.Debug("postage recovery of bucket count failed", "err", err) + s.logger.Error(err, "postage recovery of bucket count failed") } issuer.SetDirty(true) break @@ -142,7 +142,7 @@ func (s *service) run() { for _, issuer := range issuers { if issuer.IsDirty() { if err := s.save(issuer); err != nil { - s.logger.Warning("failed to save stamp issuer", "err", err) + s.logger.Error(err, "failed to save stamp issuer") } } } From 3220ab4d740411347a4014bbc4526472d92eb3b2 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 31 Mar 2026 15:01:14 +0300 Subject: [PATCH 09/13] fix: linter --- pkg/bigint/bigint.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/bigint/bigint.go b/pkg/bigint/bigint.go index 1503023e03f..4d5e9557c36 100644 --- a/pkg/bigint/bigint.go +++ b/pkg/bigint/bigint.go @@ -42,4 +42,3 @@ func (i *BigInt) UnmarshalJSON(b []byte) error { func Wrap(i *big.Int) *BigInt { return &BigInt{Int: i} } - From 4f0446151f4b430e55e02b3a975c57ac74c2d6fb Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Wed, 1 Apr 2026 12:02:48 +0300 Subject: [PATCH 10/13] fix: debounce stamperstore writes and recover buckets on unclean shutdown --- pkg/node/node.go | 22 ++++++++++- pkg/node/statestore.go | 12 ++++++ pkg/postage/cleanshutdown.go | 43 --------------------- pkg/postage/service.go | 35 +++++++++-------- pkg/postage/service_test.go | 73 ++++++++++++++++++++++++++++++++++-- pkg/postage/stampissuer.go | 8 ++-- 6 files changed, 123 insertions(+), 70 deletions(-) delete mode 100644 pkg/postage/cleanshutdown.go diff --git a/pkg/node/node.go b/pkg/node/node.go index 50ed1a44b82..c79481e348b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -100,6 +100,7 @@ type Bee struct { tracerCloser io.Closer stateStoreCloser io.Closer stamperStoreCloser io.Closer + stamperCleanShutdown func() error localstoreCloser io.Closer topologyCloser io.Closer topologyHalter topology.Halter @@ -660,7 +661,21 @@ func NewBee( b.p2pService = p2ps b.p2pHalter = p2ps - post, err := postage.NewService(logger, stamperStore, batchStore, chainID) + dirtyItem := &stamperDirtyItem{} + wasClean := false + if err := stamperStore.Get(dirtyItem); err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("stamper store dirty check: %w", err) + } + // marker absent: first run or previous clean shutdown + wasClean = true + } + if err := stamperStore.Put(dirtyItem); err != nil { + return nil, fmt.Errorf("stamper store dirty marker: %w", err) + } + b.stamperCleanShutdown = func() error { return stamperStore.Delete(dirtyItem) } + + post, err := postage.NewService(logger, stamperStore, batchStore, chainID, wasClean) if err != nil { return nil, fmt.Errorf("postage service: %w", err) } @@ -1430,6 +1445,11 @@ func (b *Bee) Shutdown() error { tryClose(b.topologyCloser, "topology driver") tryClose(b.storageIncetivesCloser, "storage incentives agent") tryClose(b.stateStoreCloser, "statestore") + if b.stamperCleanShutdown != nil { + if err := b.stamperCleanShutdown(); err != nil { + mErr = errors.Join(mErr, fmt.Errorf("stamper clean shutdown: %w", err)) + } + } tryClose(b.stamperStoreCloser, "stamperstore") tryClose(b.localstoreCloser, "localstore") tryClose(b.resolverCloser, "resolver service") diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index 626848cf88e..8c5e0330dc4 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -83,6 +83,18 @@ func checkOverlay(storer storage.StateStorer, overlay swarm.Address) error { return nil } +// stamperDirtyItem is a storage.Item used to mark that the stamper store was +// not cleanly shut down. It is written on startup and deleted on clean shutdown, +// following the same pattern as the sharky .DIRTY file. +type stamperDirtyItem struct{} + +func (s *stamperDirtyItem) ID() string { return "dirty" } +func (s *stamperDirtyItem) Namespace() string { return "stamper" } +func (s *stamperDirtyItem) Marshal() ([]byte, error) { return []byte{0}, nil } +func (s *stamperDirtyItem) Unmarshal(_ []byte) error { return nil } +func (s *stamperDirtyItem) Clone() storage.Item { return &stamperDirtyItem{} } +func (s stamperDirtyItem) String() string { return "stamper/dirty" } + func overlayNonceExists(s storage.StateStorer) ([]byte, bool, error) { nonce := make([]byte, 32) if err := s.Get(overlayNonce, &nonce); err != nil { diff --git a/pkg/postage/cleanshutdown.go b/pkg/postage/cleanshutdown.go deleted file mode 100644 index db6556405c8..00000000000 --- a/pkg/postage/cleanshutdown.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package postage - -import "github.com/ethersphere/bee/v2/pkg/storage" - -// cleanShutdownItem is a storage.Item implementation for marking graceful shutdown. -type cleanShutdownItem struct{} - -// ID implements the storage.Item interface. -func (c *cleanShutdownItem) ID() string { - return "clean_shutdown" -} - -// Namespace implements the storage.Item interface. -func (c *cleanShutdownItem) Namespace() string { - return "postage" -} - -// Marshal implements the storage.Item interface. -func (c *cleanShutdownItem) Marshal() ([]byte, error) { - return []byte{0}, nil -} - -// Unmarshal implements the storage.Item interface. -func (c *cleanShutdownItem) Unmarshal(data []byte) error { - return nil -} - -// Clone implements the storage.Item interface. -func (c *cleanShutdownItem) Clone() storage.Item { - if c == nil { - return nil - } - return &cleanShutdownItem{} -} - -// String implements the fmt.Stringer interface. -func (c cleanShutdownItem) String() string { - return "postage/clean_shutdown" -} diff --git a/pkg/postage/service.go b/pkg/postage/service.go index d6a6268a5cf..9b14bad65cb 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -28,6 +28,11 @@ const ( blockThreshold = 10 ) +const ( + // stampIssuerSaveInterval is how often dirty stamp issuers are flushed to disk. + stampIssuerSaveInterval = time.Minute +) + var ( // ErrNotFound is the error returned when issuer with given batch ID does not exist. ErrNotFound = errors.New("not found") @@ -60,8 +65,9 @@ type service struct { done chan struct{} } -// NewService constructs a new Service. -func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64) (Service, error) { +// NewService constructs a new Service. wasClean indicates whether the previous +// shutdown was graceful; if false, bucket counts are recovered from the stamp store. +func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64, wasClean bool) (Service, error) { s := &service{ logger: logger.WithName(loggerName).Register(), store: store, @@ -85,15 +91,11 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha return nil, err } - if err := s.store.Get(&cleanShutdownItem{}); err != nil { - if errors.Is(err, storage.ErrNotFound) { - s.logger.Info("recovering bucket counts from stamper store") - if err := s.recoverBuckets(); err != nil { - s.logger.Error(err, "postage stamper store recovery failed") - } + if !wasClean { + s.logger.Info("recovering bucket counts from stamper store") + if err := s.recoverBuckets(); err != nil { + s.logger.Error(err, "postage stamper store recovery failed") } - } else { - _ = s.store.Delete(&cleanShutdownItem{}) } go s.run() @@ -115,7 +117,7 @@ func (s *service) recoverBuckets() error { if err := issuer.recover(item.BatchIndex); err != nil { s.logger.Error(err, "postage recovery of bucket count failed") } - issuer.SetDirty(true) + issuer.setDirty(true) break } } @@ -126,7 +128,7 @@ func (s *service) recoverBuckets() error { func (s *service) run() { defer close(s.done) // using 1 minute to significantly reduce disk writes - ticker := time.NewTicker(time.Minute) + ticker := time.NewTicker(stampIssuerSaveInterval) defer ticker.Stop() for { @@ -140,7 +142,7 @@ func (s *service) run() { s.mtx.Unlock() for _, issuer := range issuers { - if issuer.IsDirty() { + if issuer.isDirty() { if err := s.save(issuer); err != nil { s.logger.Error(err, "failed to save stamp issuer") } @@ -236,7 +238,7 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e return nil, nil, ErrNotUsable } return st, func() error { - st.SetDirty(true) + st.setDirty(true) return nil }, nil } @@ -266,13 +268,10 @@ func (ps *service) Close() error { defer ps.mtx.Unlock() var err error for _, issuer := range ps.issuers { - if issuer.IsDirty() { + if issuer.isDirty() { err = errors.Join(err, ps.save(issuer)) } } - if err == nil { - err = ps.store.Put(&cleanShutdownItem{}) - } return err } diff --git a/pkg/postage/service_test.go b/pkg/postage/service_test.go index a723befbc98..a1d06eaac57 100644 --- a/pkg/postage/service_test.go +++ b/pkg/postage/service_test.go @@ -32,7 +32,7 @@ func TestSaveLoad(t *testing.T) { defer store.Close() pstore := pstoremock.New() saved := func(id int64) postage.Service { - ps, err := postage.NewService(log.Noop, store, pstore, id) + ps, err := postage.NewService(log.Noop, store, pstore, id, true) if err != nil { t.Fatal(err) } @@ -48,7 +48,7 @@ func TestSaveLoad(t *testing.T) { return ps } loaded := func(id int64) postage.Service { - ps, err := postage.NewService(log.Noop, store, pstore, id) + ps, err := postage.NewService(log.Noop, store, pstore, id, true) if err != nil { t.Fatal(err) } @@ -88,7 +88,7 @@ func TestGetStampIssuer(t *testing.T) { } validBlockNumber := testChainState.Block - uint64(postage.BlockThreshold+1) pstore := pstoremock.New(pstoremock.WithChainState(testChainState)) - ps, err := postage.NewService(log.Noop, store, pstore, chainID) + ps, err := postage.NewService(log.Noop, store, pstore, chainID, true) if err != nil { t.Fatal(err) } @@ -227,7 +227,7 @@ func TestSetExpired(t *testing.T) { return bytes.Equal(b, batch), nil })) - ps, err := postage.NewService(log.Noop, store, pstore, 1) + ps, err := postage.NewService(log.Noop, store, pstore, 1, true) if err != nil { t.Fatal(err) } @@ -301,3 +301,68 @@ func TestSetExpired(t *testing.T) { testutil.CleanupCloser(t, ps) } + +// TestCrashRecovery verifies that bucket counts are restored from stamp items +// when the service starts after an unclean shutdown (wasClean=false). +func TestCrashRecovery(t *testing.T) { + t.Parallel() + + store := inmemstore.New() + defer store.Close() + pstore := pstoremock.New() + + issuer := newTestStampIssuer(t, 1000) + batchID := issuer.ID() + + // Pick two random chunk addresses and compute their bucket indices. + chunkAddr0 := swarm.RandAddress(t) + chunkAddr1 := swarm.RandAddress(t) + bIdx0 := postage.ToBucket(issuer.BucketDepth(), chunkAddr0) + bIdx1 := postage.ToBucket(issuer.BucketDepth(), chunkAddr1) + + // Write StampItems directly, simulating stamps issued before a crash + // without the issuer bucket state being saved. + // bIdx0: issued at collision count 2 → bucket should recover to 3 + // bIdx1: issued at collision count 0 → bucket should recover to 1 + items := []*postage.StampItem{ + postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr0).WithBatchIndex(postage.IndexToBytes(bIdx0, 2)), + postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr1).WithBatchIndex(postage.IndexToBytes(bIdx1, 0)), + } + for _, item := range items { + if err := store.Put(item); err != nil { + t.Fatal(err) + } + } + + // Save the issuer with zero bucket counts to the store. + ps, err := postage.NewService(log.Noop, store, pstore, 1, true) + if err != nil { + t.Fatal(err) + } + if err := ps.Add(issuer); err != nil { + t.Fatal(err) + } + if err := ps.Close(); err != nil { + t.Fatal(err) + } + + // Restart with wasClean=false — should trigger bucket recovery. + ps2, err := postage.NewService(log.Noop, store, pstore, 1, false) + if err != nil { + t.Fatal(err) + } + defer ps2.Close() + + issuers := ps2.StampIssuers() + if len(issuers) != 1 { + t.Fatalf("expected 1 issuer, got %d", len(issuers)) + } + + buckets := issuers[0].Buckets() + if buckets[bIdx0] != 3 { + t.Errorf("bucket %d: want 3, got %d", bIdx0, buckets[bIdx0]) + } + if buckets[bIdx1] != 1 { + t.Errorf("bucket %d: want 1, got %d", bIdx1, buckets[bIdx1]) + } +} diff --git a/pkg/postage/stampissuer.go b/pkg/postage/stampissuer.go index f6036ed45ea..87686f4d5bc 100644 --- a/pkg/postage/stampissuer.go +++ b/pkg/postage/stampissuer.go @@ -269,15 +269,15 @@ func (si *StampIssuer) Buckets() []uint32 { return b } -// SetDirty sets the dirty flag of the StampIssuer indicating it has unsaved bucket changes. -func (si *StampIssuer) SetDirty(dirty bool) { +// setDirty sets the dirty flag of the StampIssuer indicating it has unsaved bucket changes. +func (si *StampIssuer) setDirty(dirty bool) { si.mtx.Lock() defer si.mtx.Unlock() si.dirty = dirty } -// IsDirty returns the dirty flag of the StampIssuer. -func (si *StampIssuer) IsDirty() bool { +// isDirty returns the dirty flag of the StampIssuer. +func (si *StampIssuer) isDirty() bool { si.mtx.Lock() defer si.mtx.Unlock() return si.dirty From d751084ede5f1f748a232d4b4ff47ac6aeae815d Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 2 Apr 2026 12:39:32 +0300 Subject: [PATCH 11/13] fix: move stamper dirty marker management into init stamper store --- pkg/node/node.go | 22 +--------------- pkg/node/statestore.go | 58 ++++++++++++++++++++++++++++++------------ pkg/postage/service.go | 1 - 3 files changed, 43 insertions(+), 38 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index c79481e348b..120587a9270 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -100,7 +100,6 @@ type Bee struct { tracerCloser io.Closer stateStoreCloser io.Closer stamperStoreCloser io.Closer - stamperCleanShutdown func() error localstoreCloser io.Closer topologyCloser io.Closer topologyHalter topology.Halter @@ -439,7 +438,7 @@ func NewBee( } }(probe) - stamperStore, err := InitStamperStore(logger, o.DataDir, stateStore) + stamperStore, wasClean, err := InitStamperStore(logger, o.DataDir, stateStore) if err != nil { return nil, fmt.Errorf("failed to initialize stamper store: %w", err) } @@ -661,20 +660,6 @@ func NewBee( b.p2pService = p2ps b.p2pHalter = p2ps - dirtyItem := &stamperDirtyItem{} - wasClean := false - if err := stamperStore.Get(dirtyItem); err != nil { - if !errors.Is(err, storage.ErrNotFound) { - return nil, fmt.Errorf("stamper store dirty check: %w", err) - } - // marker absent: first run or previous clean shutdown - wasClean = true - } - if err := stamperStore.Put(dirtyItem); err != nil { - return nil, fmt.Errorf("stamper store dirty marker: %w", err) - } - b.stamperCleanShutdown = func() error { return stamperStore.Delete(dirtyItem) } - post, err := postage.NewService(logger, stamperStore, batchStore, chainID, wasClean) if err != nil { return nil, fmt.Errorf("postage service: %w", err) @@ -1445,11 +1430,6 @@ func (b *Bee) Shutdown() error { tryClose(b.topologyCloser, "topology driver") tryClose(b.storageIncetivesCloser, "storage incentives agent") tryClose(b.stateStoreCloser, "statestore") - if b.stamperCleanShutdown != nil { - if err := b.stamperCleanShutdown(); err != nil { - mErr = errors.Join(mErr, fmt.Errorf("stamper clean shutdown: %w", err)) - } - } tryClose(b.stamperStoreCloser, "stamperstore") tryClose(b.localstoreCloser, "localstore") tryClose(b.resolverCloser, "resolver service") diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index 8c5e0330dc4..ca8705e2000 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -42,21 +42,59 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st return stateStore, caching, err } +// stamperDirtyItem is a storage.Item used to mark that the stamper store was +// not cleanly shut down. It is written on startup and deleted on clean shutdown, +// following the same pattern as the sharky .DIRTY file. +type stamperDirtyItem struct{} + +func (s *stamperDirtyItem) ID() string { return "dirty" } +func (s *stamperDirtyItem) Namespace() string { return "stamper" } +func (s *stamperDirtyItem) Marshal() ([]byte, error) { return []byte{0}, nil } +func (s *stamperDirtyItem) Unmarshal(_ []byte) error { return nil } +func (s *stamperDirtyItem) Clone() storage.Item { return &stamperDirtyItem{} } +func (s stamperDirtyItem) String() string { return "stamper/dirty" } + +// stamperStore wraps a storage.Store and manages the dirty marker lifecycle. +type stamperStore struct { + storage.Store +} + +// Close deletes the dirty marker then closes the underlying store, indicating +// a clean shutdown. +func (s *stamperStore) Close() error { + return errors.Join(s.Delete(&stamperDirtyItem{}), s.Store.Close()) +} + // InitStamperStore will create new stamper store with the given path to the // data directory. When given an empty directory path, the function will instead // initialize an in-memory state store that will not be persisted. -func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, error) { +// It also checks and manages the dirty marker to detect unclean shutdowns. +// The returned bool indicates whether the previous shutdown was clean. +func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, bool, error) { if dataDir == "" { logger.Warning("using in-mem stamper store, no node state will be persisted") } else { dataDir = filepath.Join(dataDir, "stamperstore") } - stamperStore, err := leveldbstore.New(dataDir, nil) + store, err := leveldbstore.New(dataDir, nil) if err != nil { - return nil, err + return nil, false, err } - return stamperStore, nil + dirtyItem := &stamperDirtyItem{} + wasClean := false + if err := store.Get(dirtyItem); err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return nil, false, fmt.Errorf("stamper store dirty check: %w", err) + } + // marker absent: first run or previous clean shutdown + wasClean = true + } + if err := store.Put(dirtyItem); err != nil { + return nil, false, fmt.Errorf("stamper store dirty marker: %w", err) + } + + return &stamperStore{Store: store}, wasClean, nil } const ( @@ -83,18 +121,6 @@ func checkOverlay(storer storage.StateStorer, overlay swarm.Address) error { return nil } -// stamperDirtyItem is a storage.Item used to mark that the stamper store was -// not cleanly shut down. It is written on startup and deleted on clean shutdown, -// following the same pattern as the sharky .DIRTY file. -type stamperDirtyItem struct{} - -func (s *stamperDirtyItem) ID() string { return "dirty" } -func (s *stamperDirtyItem) Namespace() string { return "stamper" } -func (s *stamperDirtyItem) Marshal() ([]byte, error) { return []byte{0}, nil } -func (s *stamperDirtyItem) Unmarshal(_ []byte) error { return nil } -func (s *stamperDirtyItem) Clone() storage.Item { return &stamperDirtyItem{} } -func (s stamperDirtyItem) String() string { return "stamper/dirty" } - func overlayNonceExists(s storage.StateStorer) ([]byte, bool, error) { nonce := make([]byte, 32) if err := s.Get(overlayNonce, &nonce); err != nil { diff --git a/pkg/postage/service.go b/pkg/postage/service.go index 9b14bad65cb..9f34ddc2485 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -117,7 +117,6 @@ func (s *service) recoverBuckets() error { if err := issuer.recover(item.BatchIndex); err != nil { s.logger.Error(err, "postage recovery of bucket count failed") } - issuer.setDirty(true) break } } From 870daa132ed2d907584e7b6a9a7b4f76b9e06ed7 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Fri, 3 Apr 2026 12:48:44 +0300 Subject: [PATCH 12/13] fix: move dirty shutdown marker into leveldbstore --- pkg/node/statestore.go | 46 ++----------------- .../storeadapter/storeadapter_test.go | 2 +- pkg/storage/cache/cache_test.go | 2 +- pkg/storage/leveldbstore/store.go | 24 ++++++++-- pkg/storage/leveldbstore/store_test.go | 8 ++-- .../internal/transaction/transaction_test.go | 2 +- pkg/storer/migration/step_05_test.go | 2 +- pkg/storer/migration/step_06_test.go | 2 +- pkg/storer/storer.go | 4 +- 9 files changed, 34 insertions(+), 58 deletions(-) diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index ca8705e2000..a526b924985 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -27,7 +27,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st } else { dataDir = filepath.Join(dataDir, "statestore") } - ldb, err := leveldbstore.New(dataDir, nil) + ldb, _, err := leveldbstore.New(dataDir, nil) if err != nil { return nil, nil, err } @@ -42,59 +42,21 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st return stateStore, caching, err } -// stamperDirtyItem is a storage.Item used to mark that the stamper store was -// not cleanly shut down. It is written on startup and deleted on clean shutdown, -// following the same pattern as the sharky .DIRTY file. -type stamperDirtyItem struct{} - -func (s *stamperDirtyItem) ID() string { return "dirty" } -func (s *stamperDirtyItem) Namespace() string { return "stamper" } -func (s *stamperDirtyItem) Marshal() ([]byte, error) { return []byte{0}, nil } -func (s *stamperDirtyItem) Unmarshal(_ []byte) error { return nil } -func (s *stamperDirtyItem) Clone() storage.Item { return &stamperDirtyItem{} } -func (s stamperDirtyItem) String() string { return "stamper/dirty" } - -// stamperStore wraps a storage.Store and manages the dirty marker lifecycle. -type stamperStore struct { - storage.Store -} - -// Close deletes the dirty marker then closes the underlying store, indicating -// a clean shutdown. -func (s *stamperStore) Close() error { - return errors.Join(s.Delete(&stamperDirtyItem{}), s.Store.Close()) -} - // InitStamperStore will create new stamper store with the given path to the // data directory. When given an empty directory path, the function will instead // initialize an in-memory state store that will not be persisted. -// It also checks and manages the dirty marker to detect unclean shutdowns. -// The returned bool indicates whether the previous shutdown was clean. +// The returned bool indicates whether the previous shutdown was unclean (dirty). func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, bool, error) { if dataDir == "" { logger.Warning("using in-mem stamper store, no node state will be persisted") } else { dataDir = filepath.Join(dataDir, "stamperstore") } - store, err := leveldbstore.New(dataDir, nil) + store, dirty, err := leveldbstore.New(dataDir, nil) if err != nil { return nil, false, err } - - dirtyItem := &stamperDirtyItem{} - wasClean := false - if err := store.Get(dirtyItem); err != nil { - if !errors.Is(err, storage.ErrNotFound) { - return nil, false, fmt.Errorf("stamper store dirty check: %w", err) - } - // marker absent: first run or previous clean shutdown - wasClean = true - } - if err := store.Put(dirtyItem); err != nil { - return nil, false, fmt.Errorf("stamper store dirty marker: %w", err) - } - - return &stamperStore{Store: store}, wasClean, nil + return store, dirty, nil } const ( diff --git a/pkg/statestore/storeadapter/storeadapter_test.go b/pkg/statestore/storeadapter/storeadapter_test.go index f90db7b7ce1..5e0a51cd77d 100644 --- a/pkg/statestore/storeadapter/storeadapter_test.go +++ b/pkg/statestore/storeadapter/storeadapter_test.go @@ -36,7 +36,7 @@ func TestStateStoreAdapter(t *testing.T) { test.RunPersist(t, func(t *testing.T, dir string) storage.StateStorer { t.Helper() - leveldb, err := leveldbstore.New(dir, nil) + leveldb, _, err := leveldbstore.New(dir, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/storage/cache/cache_test.go b/pkg/storage/cache/cache_test.go index dd9ec24d611..9cbd0478cc8 100644 --- a/pkg/storage/cache/cache_test.go +++ b/pkg/storage/cache/cache_test.go @@ -16,7 +16,7 @@ import ( func TestCache(t *testing.T) { t.Parallel() - store, err := leveldbstore.New(t.TempDir(), nil) + store, _, err := leveldbstore.New(t.TempDir(), nil) if err != nil { t.Fatalf("create store failed: %v", err) } diff --git a/pkg/storage/leveldbstore/store.go b/pkg/storage/leveldbstore/store.go index bf410d54c00..c062200db82 100644 --- a/pkg/storage/leveldbstore/store.go +++ b/pkg/storage/leveldbstore/store.go @@ -18,7 +18,11 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -const separator = "/" +const ( + separator = "/" + // dirtyKey is written on open and deleted on clean close to detect unclean shutdowns. + dirtyKey = ".store-dirty-shutdown" +) // key returns the Item identifier for the leveldb storage. func key(item storage.Key) []byte { @@ -56,7 +60,8 @@ type Store struct { // New returns a new store the backed by leveldb. // If path == "", the leveldb will run with in memory backend storage. -func New(path string, opts *opt.Options) (*Store, error) { +// The returned bool indicates whether the previous shutdown was unclean (dirty). +func New(path string, opts *opt.Options) (*Store, bool, error) { var ( err error db *leveldb.DB @@ -72,13 +77,22 @@ func New(path string, opts *opt.Options) (*Store, error) { } if err != nil { - return nil, err + return nil, false, err + } + + dirty, err := db.Has([]byte(dirtyKey), nil) + if err != nil { + return nil, false, fmt.Errorf("has dirty record: %w", err) + } + + if err = db.Put([]byte(dirtyKey), []byte{}, nil); err != nil { + return nil, false, fmt.Errorf("put dirty record: %w", err) } return &Store{ db: db, path: path, - }, nil + }, dirty, nil } // DB implements the Storer interface. @@ -88,7 +102,7 @@ func (s *Store) DB() *leveldb.DB { // Close implements the storage.Store interface. func (s *Store) Close() (err error) { - return s.db.Close() + return errors.Join(s.db.Delete([]byte(dirtyKey), nil), s.db.Close()) } // Get implements the storage.Store interface. diff --git a/pkg/storage/leveldbstore/store_test.go b/pkg/storage/leveldbstore/store_test.go index 7e7bc809213..d5826853bd2 100644 --- a/pkg/storage/leveldbstore/store_test.go +++ b/pkg/storage/leveldbstore/store_test.go @@ -15,7 +15,7 @@ import ( func TestStore(t *testing.T) { t.Parallel() - store, err := leveldbstore.New(t.TempDir(), nil) + store, _, err := leveldbstore.New(t.TempDir(), nil) if err != nil { t.Fatalf("create store failed: %v", err) } @@ -24,7 +24,7 @@ func TestStore(t *testing.T) { } func BenchmarkStore(b *testing.B) { - st, err := leveldbstore.New("", &opt.Options{ + st, _, err := leveldbstore.New("", &opt.Options{ Compression: opt.SnappyCompression, }) if err != nil { @@ -37,7 +37,7 @@ func BenchmarkStore(b *testing.B) { func TestBatchedStore(t *testing.T) { t.Parallel() - st, err := leveldbstore.New("", nil) + st, _, err := leveldbstore.New("", nil) if err != nil { t.Fatalf("create store failed: %v", err) } @@ -46,7 +46,7 @@ func TestBatchedStore(t *testing.T) { } func BenchmarkBatchedStore(b *testing.B) { - st, err := leveldbstore.New("", &opt.Options{ + st, _, err := leveldbstore.New("", &opt.Options{ Compression: opt.SnappyCompression, }) if err != nil { diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index 8b72864f385..28008e20a9d 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -35,7 +35,7 @@ func Test_TransactionStorage(t *testing.T) { sharkyStore, err := sharky.New(&dirFS{basedir: t.TempDir()}, 32, swarm.SocMaxChunkSize) assert.NoError(t, err) - store, err := leveldbstore.New("", nil) + store, _, err := leveldbstore.New("", nil) assert.NoError(t, err) st := transaction.NewStorage(sharkyStore, store) diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go index aeacd310f3e..91404d16827 100644 --- a/pkg/storer/migration/step_05_test.go +++ b/pkg/storer/migration/step_05_test.go @@ -29,7 +29,7 @@ func Test_Step_05(t *testing.T) { sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) assert.NoError(t, err) - lstore, err := leveldbstore.New("", nil) + lstore, _, err := leveldbstore.New("", nil) assert.NoError(t, err) store := transaction.NewStorage(sharkyStore, lstore) diff --git a/pkg/storer/migration/step_06_test.go b/pkg/storer/migration/step_06_test.go index b5a5d3ecc7a..e3a2f170f42 100644 --- a/pkg/storer/migration/step_06_test.go +++ b/pkg/storer/migration/step_06_test.go @@ -35,7 +35,7 @@ func Test_Step_06(t *testing.T) { sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) require.NoError(t, err) - lstore, err := leveldbstore.New("", nil) + lstore, _, err := leveldbstore.New("", nil) require.NoError(t, err) store := transaction.NewStorage(sharkyStore, lstore) diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index d4a41c73491..6425213ca76 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -220,7 +220,7 @@ func closer(closers ...io.Closer) io.Closer { } func initInmemRepository() (transaction.Storage, io.Closer, error) { - store, err := leveldbstore.New("", nil) + store, _, err := leveldbstore.New("", nil) if err != nil { return nil, nil, fmt.Errorf("failed creating inmem levelDB index store: %w", err) } @@ -263,7 +263,7 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) { return nil, err } } - store, err := leveldbstore.New(path.Join(basePath, "indexstore"), &opt.Options{ + store, _, err := leveldbstore.New(path.Join(basePath, "indexstore"), &opt.Options{ OpenFilesCacheCapacity: int(opts.LdbOpenFilesLimit), BlockCacheCapacity: int(opts.LdbBlockCacheCapacity), WriteBuffer: int(opts.LdbWriteBufferSize), From 5dfe4448a9a19f884df27c8249e5d7feb32eff13 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Fri, 3 Apr 2026 12:54:24 +0300 Subject: [PATCH 13/13] test: add dirty marker tests for leveldbstore --- pkg/storage/leveldbstore/store_test.go | 67 ++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/pkg/storage/leveldbstore/store_test.go b/pkg/storage/leveldbstore/store_test.go index d5826853bd2..e4a5c98ad49 100644 --- a/pkg/storage/leveldbstore/store_test.go +++ b/pkg/storage/leveldbstore/store_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" "github.com/ethersphere/bee/v2/pkg/storage/storagetest" + "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" ) @@ -55,3 +56,69 @@ func BenchmarkBatchedStore(b *testing.B) { b.Cleanup(func() { _ = st.Close() }) storagetest.BenchmarkBatchedStore(b, st) } + +func TestDirtyMarker(t *testing.T) { + t.Parallel() + + t.Run("clean on first open", func(t *testing.T) { + t.Parallel() + st, dirty, err := leveldbstore.New(t.TempDir(), nil) + if err != nil { + t.Fatalf("open failed: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + if dirty { + t.Fatal("expected clean on first open, got dirty") + } + }) + + t.Run("clean after clean close", func(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + st, _, err := leveldbstore.New(dir, nil) + if err != nil { + t.Fatalf("first open failed: %v", err) + } + if err := st.Close(); err != nil { + t.Fatalf("close failed: %v", err) + } + + st, dirty, err := leveldbstore.New(dir, nil) + if err != nil { + t.Fatalf("second open failed: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + if dirty { + t.Fatal("expected clean after clean close, got dirty") + } + }) + + t.Run("dirty after crash", func(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + // Simulate a previous session that crashed: open the raw leveldb + // and write the dirty marker directly, then close without deleting it. + db, err := leveldb.OpenFile(dir, nil) + if err != nil { + t.Fatalf("raw open failed: %v", err) + } + if err := db.Put([]byte(".store-dirty-shutdown"), []byte{}, nil); err != nil { + t.Fatalf("write dirty key failed: %v", err) + } + if err := db.Close(); err != nil { + t.Fatalf("raw close failed: %v", err) + } + + // Now open via leveldbstore — should detect the marker as dirty. + st, dirty, err := leveldbstore.New(dir, nil) + if err != nil { + t.Fatalf("open failed: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + if !dirty { + t.Fatal("expected dirty after crash, got clean") + } + }) +}