Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func NewBee(
}
}(probe)

stamperStore, err := InitStamperStore(logger, o.DataDir, stateStore)
stamperStore, wasClean, err := InitStamperStore(logger, o.DataDir, stateStore)
if err != nil {
return nil, fmt.Errorf("failed to initialize stamper store: %w", err)
}
Expand Down Expand Up @@ -660,7 +660,7 @@ func NewBee(
b.p2pService = p2ps
b.p2pHalter = p2ps

post, err := postage.NewService(logger, stamperStore, batchStore, chainID)
post, err := postage.NewService(logger, stamperStore, batchStore, chainID, wasClean)
if err != nil {
return nil, fmt.Errorf("postage service: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/node/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
} else {
dataDir = filepath.Join(dataDir, "statestore")
}
ldb, err := leveldbstore.New(dataDir, nil)
ldb, _, err := leveldbstore.New(dataDir, nil)
if err != nil {
return nil, nil, err
}
Expand All @@ -45,18 +45,18 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
// InitStamperStore will create new stamper store with the given path to the
// data directory. When given an empty directory path, the function will instead
// initialize an in-memory state store that will not be persisted.
func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, error) {
// The returned bool indicates whether the previous shutdown was unclean (dirty).
func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, bool, error) {
if dataDir == "" {
logger.Warning("using in-mem stamper store, no node state will be persisted")
} else {
dataDir = filepath.Join(dataDir, "stamperstore")
}
stamperStore, err := leveldbstore.New(dataDir, nil)
store, dirty, err := leveldbstore.New(dataDir, nil)
if err != nil {
return nil, err
return nil, false, err
}

return stamperStore, nil
return store, dirty, nil
}

const (
Expand Down
93 changes: 86 additions & 7 deletions pkg/postage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"math/big"
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/storage"
Expand All @@ -27,6 +28,11 @@ const (
blockThreshold = 10
)

const (
	// stampIssuerSaveInterval is how often dirty stamp issuers are flushed to disk
	// by the service's background run loop. It is deliberately long to reduce disk
	// writes; Close persists any issuers that are still dirty on shutdown.
	stampIssuerSaveInterval = time.Minute
)

var (
// ErrNotFound is the error returned when issuer with given batch ID does not exist.
ErrNotFound = errors.New("not found")
Expand Down Expand Up @@ -54,18 +60,24 @@ type service struct {
postageStore Storer
chainID int64
issuers []*StampIssuer

quit chan struct{}
done chan struct{}
}

// NewService constructs a new Service.
func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64) (Service, error) {
// NewService constructs a new Service. wasClean indicates whether the previous
// shutdown was graceful; if false, bucket counts are recovered from the stamp store.
func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64, wasClean bool) (Service, error) {
s := &service{
logger: logger.WithName(loggerName).Register(),
store: store,
postageStore: postageStore,
chainID: chainID,
quit: make(chan struct{}),
done: make(chan struct{}),
}

return s, s.store.Iterate(
err := s.store.Iterate(
storage.Query{
Factory: func() storage.Item {
return new(StampIssuerItem)
Expand All @@ -75,6 +87,68 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha
_ = s.add(issuer)
return false, nil
})
if err != nil {
return nil, err
}

if !wasClean {
s.logger.Info("recovering bucket counts from stamper store")
if err := s.recoverBuckets(); err != nil {
s.logger.Error(err, "postage stamper store recovery failed")
}
}

go s.run()

return s, nil
}

// recoverBuckets rebuilds per-bucket collision counts for the active stamp
// issuers by replaying the StampItems persisted in the store. It is invoked
// after an unclean shutdown, when the periodically-saved issuer state may lag
// behind the stamps that were actually issued.
func (s *service) recoverBuckets() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	// Index issuers by batch ID once so each stamp item resolves in O(1)
	// instead of scanning the issuer slice per item (was O(items × issuers)).
	// On duplicate batch IDs the first issuer wins, matching the previous
	// linear-scan-with-break behavior.
	byBatch := make(map[string]*StampIssuer, len(s.issuers))
	for _, issuer := range s.issuers {
		id := string(issuer.data.BatchID)
		if _, ok := byBatch[id]; !ok {
			byBatch[id] = issuer
		}
	}

	return s.store.Iterate(
		storage.Query{
			Factory: func() storage.Item { return new(StampItem) },
		}, func(result storage.Result) (bool, error) {
			item := result.Entry.(*StampItem)
			if issuer, ok := byBatch[string(item.BatchID)]; ok {
				// Best-effort: log and continue so one bad item does not
				// abort recovery of the remaining buckets.
				if err := issuer.recover(item.BatchIndex); err != nil {
					s.logger.Error(err, "postage recovery of bucket count failed")
				}
			}
			return false, nil
		})
}

// run is the service's background loop: it periodically persists stamp issuers
// whose bucket state has changed since the last save, and exits when Close
// closes the quit channel. done is closed on exit so Close can wait for it.
func (s *service) run() {
	defer close(s.done)

	// A long interval (one minute) keeps disk writes rare; anything still
	// dirty at shutdown is persisted by Close.
	ticker := time.NewTicker(stampIssuerSaveInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.quit:
			return
		case <-ticker.C:
		}

		// Snapshot the issuer slice under the lock, then save outside it so a
		// slow disk write cannot block other service operations.
		s.mtx.Lock()
		snapshot := append([]*StampIssuer(nil), s.issuers...)
		s.mtx.Unlock()

		for _, issuer := range snapshot {
			if !issuer.isDirty() {
				continue
			}
			if err := s.save(issuer); err != nil {
				s.logger.Error(err, "failed to save stamp issuer")
			}
		}
	}
}

// Add adds a stamp issuer to the active issuers.
Expand Down Expand Up @@ -163,9 +237,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e
return nil, nil, ErrNotUsable
}
return st, func() error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.save(st)
st.setDirty(true)
return nil
}, nil
}
}
Expand All @@ -182,15 +255,21 @@ func (ps *service) save(st *StampIssuer) error {
}); err != nil {
return err
}
st.dirty = false
return nil
}

// Close stops the background flush loop, waits for it to exit, and then
// persists every stamp issuer that still has unsaved bucket changes. The
// individual save errors are joined and returned together.
func (ps *service) Close() error {
	close(ps.quit)
	<-ps.done

	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	var errs error
	for _, issuer := range ps.issuers {
		if !issuer.isDirty() {
			continue
		}
		errs = errors.Join(errs, ps.save(issuer))
	}
	return errs
}
Expand Down
76 changes: 72 additions & 4 deletions pkg/postage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSaveLoad(t *testing.T) {
defer store.Close()
pstore := pstoremock.New()
saved := func(id int64) postage.Service {
ps, err := postage.NewService(log.Noop, store, pstore, id)
ps, err := postage.NewService(log.Noop, store, pstore, id, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -48,7 +48,7 @@ func TestSaveLoad(t *testing.T) {
return ps
}
loaded := func(id int64) postage.Service {
ps, err := postage.NewService(log.Noop, store, pstore, id)
ps, err := postage.NewService(log.Noop, store, pstore, id, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -57,6 +57,7 @@ func TestSaveLoad(t *testing.T) {
test := func(id int64) {
psS := saved(id)
psL := loaded(id)
defer psL.Close()

sMap := map[string]struct{}{}
stampIssuers := psS.StampIssuers()
Expand Down Expand Up @@ -87,10 +88,12 @@ func TestGetStampIssuer(t *testing.T) {
}
validBlockNumber := testChainState.Block - uint64(postage.BlockThreshold+1)
pstore := pstoremock.New(pstoremock.WithChainState(testChainState))
ps, err := postage.NewService(log.Noop, store, pstore, chainID)
ps, err := postage.NewService(log.Noop, store, pstore, chainID, true)
if err != nil {
t.Fatal(err)
}
defer ps.Close()

ids := make([][]byte, 8)
for i := range ids {
id := make([]byte, 32)
Expand Down Expand Up @@ -224,7 +227,7 @@ func TestSetExpired(t *testing.T) {
return bytes.Equal(b, batch), nil
}))

ps, err := postage.NewService(log.Noop, store, pstore, 1)
ps, err := postage.NewService(log.Noop, store, pstore, 1, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -298,3 +301,68 @@ func TestSetExpired(t *testing.T) {

testutil.CleanupCloser(t, ps)
}

// TestCrashRecovery verifies that bucket counts are restored from stamp items
// when the service starts after an unclean shutdown (wasClean=false).
func TestCrashRecovery(t *testing.T) {
	t.Parallel()

	store := inmemstore.New()
	defer store.Close()
	pstore := pstoremock.New()

	issuer := newTestStampIssuer(t, 1000)
	batchID := issuer.ID()

	// Pick two random chunk addresses and compute their bucket indices.
	chunkAddr0 := swarm.RandAddress(t)
	chunkAddr1 := swarm.RandAddress(t)
	bIdx0 := postage.ToBucket(issuer.BucketDepth(), chunkAddr0)
	bIdx1 := postage.ToBucket(issuer.BucketDepth(), chunkAddr1)

	// Write StampItems directly, simulating stamps issued before a crash
	// without the issuer bucket state being saved.
	// bIdx0: issued at collision count 2 → bucket should recover to 3
	// bIdx1: issued at collision count 0 → bucket should recover to 1
	items := []*postage.StampItem{
		postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr0).WithBatchIndex(postage.IndexToBytes(bIdx0, 2)),
		postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr1).WithBatchIndex(postage.IndexToBytes(bIdx1, 0)),
	}
	for _, item := range items {
		if err := store.Put(item); err != nil {
			t.Fatal(err)
		}
	}

	// Save the issuer with zero bucket counts to the store.
	// wasClean=true here so this first service does NOT run recovery;
	// Close persists the (stale, all-zero) issuer state.
	ps, err := postage.NewService(log.Noop, store, pstore, 1, true)
	if err != nil {
		t.Fatal(err)
	}
	if err := ps.Add(issuer); err != nil {
		t.Fatal(err)
	}
	if err := ps.Close(); err != nil {
		t.Fatal(err)
	}

	// Restart with wasClean=false — should trigger bucket recovery.
	ps2, err := postage.NewService(log.Noop, store, pstore, 1, false)
	if err != nil {
		t.Fatal(err)
	}
	defer ps2.Close()

	issuers := ps2.StampIssuers()
	if len(issuers) != 1 {
		t.Fatalf("expected 1 issuer, got %d", len(issuers))
	}

	// Recovered counts must reflect "collision count at issue time + 1".
	buckets := issuers[0].Buckets()
	if buckets[bIdx0] != 3 {
		t.Errorf("bucket %d: want 3, got %d", bIdx0, buckets[bIdx0])
	}
	if buckets[bIdx1] != 1 {
		t.Errorf("bucket %d: want 1, got %d", bIdx1, buckets[bIdx1])
	}
}
41 changes: 39 additions & 2 deletions pkg/postage/stampissuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (s stampIssuerData) Clone() stampIssuerData {
// A StampIssuer instance extends a batch with bucket collision tracking
// embedded in multiple Stampers, can be used concurrently.
type StampIssuer struct {
	data  stampIssuerData
	dirty bool // true while bucket state differs from what is persisted
	mtx   sync.Mutex // guards data and dirty
}

// NewStampIssuer constructs a StampIssuer as an extension of a batch for local
Expand Down Expand Up @@ -268,6 +269,42 @@ func (si *StampIssuer) Buckets() []uint32 {
return b
}

// setDirty records whether the issuer has bucket changes that are not yet
// persisted to the stamper store.
func (si *StampIssuer) setDirty(dirty bool) {
	si.mtx.Lock()
	si.dirty = dirty
	si.mtx.Unlock()
}

// isDirty reports whether the issuer has unsaved bucket changes.
func (si *StampIssuer) isDirty() bool {
	si.mtx.Lock()
	dirty := si.dirty
	si.mtx.Unlock()
	return dirty
}

// recover raises the collision count of the bucket addressed by batchIndex so
// that it accounts for a stamp known to have been issued at that index. It is
// used during crash recovery to rebuild bucket state from persisted stamps.
func (si *StampIssuer) recover(batchIndex []byte) error {
	si.mtx.Lock()
	defer si.mtx.Unlock()

	bIdx, bCnt := BucketIndexFromBytes(batchIndex)
	if bIdx >= uint32(len(si.data.Buckets)) {
		return fmt.Errorf("bucket index %d out of bounds", bIdx)
	}

	// bCnt is the collision count at the moment the stamp was issued, so the
	// bucket had reached at least bCnt+1 before the crash. Never lower an
	// already-higher count.
	if si.data.Buckets[bIdx] > bCnt {
		return nil
	}
	si.data.Buckets[bIdx] = bCnt + 1
	if si.data.Buckets[bIdx] > si.data.MaxBucketCount {
		si.data.MaxBucketCount = si.data.Buckets[bIdx]
	}
	return nil
}

// StampIssuerItem is a storage.Item implementation for StampIssuer.
type StampIssuerItem struct {
Issuer *StampIssuer
Expand Down
2 changes: 1 addition & 1 deletion pkg/statestore/storeadapter/storeadapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestStateStoreAdapter(t *testing.T) {
test.RunPersist(t, func(t *testing.T, dir string) storage.StateStorer {
t.Helper()

leveldb, err := leveldbstore.New(dir, nil)
leveldb, _, err := leveldbstore.New(dir, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestCache(t *testing.T) {
t.Parallel()

store, err := leveldbstore.New(t.TempDir(), nil)
store, _, err := leveldbstore.New(t.TempDir(), nil)
if err != nil {
t.Fatalf("create store failed: %v", err)
}
Expand Down
Loading
Loading