From 57ace783dbe6e3040b0e3b33517ddba38e328648 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Fri, 20 Mar 2026 11:55:18 +0200 Subject: [PATCH 1/6] fix: prevent chunk corruption on unclean shutdown --- pkg/sharky/slots.go | 10 +- pkg/sharky/store.go | 68 +++++++++ pkg/storer/crash_recovery_test.go | 133 ++++++++++++++++++ .../internal/transaction/transaction.go | 21 +++ pkg/storer/recover.go | 53 +++++-- 5 files changed, 272 insertions(+), 13 deletions(-) create mode 100644 pkg/storer/crash_recovery_test.go diff --git a/pkg/sharky/slots.go b/pkg/sharky/slots.go index 71d058f2b88..e8fd72e7571 100644 --- a/pkg/sharky/slots.go +++ b/pkg/sharky/slots.go @@ -40,11 +40,13 @@ func (sl *slots) load() (err error) { return err } -// save persists the free slot bitvector on disk (without closing) +// save persists the free slot bitvector on disk (without closing). +// slots only ever grow (extend is the only mutation), so sl.data is always >= +// the previous file size. Seeking to 0 and overwriting is therefore always +// safe: no stale tail bytes can survive. Truncate(0) is intentionally absent +// because truncating before the write creates a crash window where the file is +// empty; removing it eliminates that vulnerability. func (sl *slots) save() error { - if err := sl.file.Truncate(0); err != nil { - return err - } if _, err := sl.file.Seek(0, 0); err != nil { return err } diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go index d8947f9d042..5d1f8e3b248 100644 --- a/pkg/sharky/store.go +++ b/pkg/sharky/store.go @@ -22,6 +22,11 @@ var ( ErrQuitting = errors.New("quitting") ) +// pendingSync represents a caller waiting for a shard fsync to complete. 
+type pendingSync struct { + done chan error +} + // Store models the sharded fix-length blobstore // Design provides lockless sharding: // - shard choice responding to backpressure by running operation @@ -33,6 +38,7 @@ type Store struct { shards []*shard // shards wg *sync.WaitGroup // count started operations quit chan struct{} // quit channel + syncCh chan pendingSync // group commit: serialises fsync requests across concurrent transactions metrics metrics } @@ -49,6 +55,7 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) { shards: make([]*shard, shardCnt), wg: &sync.WaitGroup{}, quit: make(chan struct{}), + syncCh: make(chan pendingSync), metrics: newMetrics(), } for i := range store.shards { @@ -60,9 +67,70 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) { } store.metrics.ShardCount.Set(float64(len(store.shards))) + store.wg.Go(store.runSyncLoop) + return store, nil } +// runSyncLoop is a background goroutine that handles group commit for shard files. +// It waits for a sync request, drains all concurrently pending requests, issues +// a single fdatasync per shard file, then signals all waiters. This amortises +// the fsync cost across all transactions that are committing at the same time. +func (s *Store) runSyncLoop() { + for { + select { + case req := <-s.syncCh: + // Drain all other requests that arrived while we were waiting. + pending := []pendingSync{req} + drain: + for { + select { + case r := <-s.syncCh: + pending = append(pending, r) + default: + break drain + } + } + + var syncErr error + for _, sh := range s.shards { + if err := sh.file.Sync(); err != nil { + syncErr = errors.Join(syncErr, err) + } + } + + for _, p := range pending { + p.done <- syncErr + } + + case <-s.quit: + return + } + } +} + +// SyncWait blocks until all shard files written in the current transaction have +// been fsynced to disk. 
Multiple concurrent callers share a single fsync (group +// commit), so the cost is amortised under concurrent upload load. +func (s *Store) SyncWait(ctx context.Context) error { + req := pendingSync{done: make(chan error, 1)} + select { + case s.syncCh <- req: + case <-s.quit: + return ErrQuitting + case <-ctx.Done(): + return ctx.Err() + } + select { + case err := <-req.done: + return err + case <-s.quit: + return ErrQuitting + case <-ctx.Done(): + return ctx.Err() + } +} + // Close closes each shard and return incidental errors from each shard func (s *Store) Close() error { close(s.quit) diff --git a/pkg/storer/crash_recovery_test.go b/pkg/storer/crash_recovery_test.go new file mode 100644 index 00000000000..0b998e1a6ce --- /dev/null +++ b/pkg/storer/crash_recovery_test.go @@ -0,0 +1,133 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer_test + +// TestCrashRecoveryDataIntegrity reproduces the corruption scenario described in +// https://github.com/ethersphere/bee/issues/4737. +// +// Scenario: the context is cancelled while concurrent writes are in flight, +// simulating a SIGKILL or power loss during an active upload. The store is +// then closed (best-effort) and reopened, which triggers the .DIRTY recovery +// path. Every chunk that was acknowledged as committed (Put returned nil) must +// still be readable and have a valid content hash. +// +// Before the group-commit fix, Sharky's WriteAt was not fsynced before +// LevelDB committed the chunk location. A power loss in that window left +// LevelDB pointing at slots containing stale or zeroed bytes, causing +// cac.Valid to return false for those chunks. 
+// +// Run with a high -count to stress the timing window: +// +// go test -count=50 -run TestCrashRecoveryDataIntegrity ./pkg/storer/ + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing" + "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestCrashRecoveryDataIntegrity(t *testing.T) { + const iterations = 10 + for i := range iterations { + t.Run(fmt.Sprintf("iter%02d", i), func(t *testing.T) { + t.Parallel() + testCrashRecoveryOnce(t) + }) + } +} + +func testCrashRecoveryOnce(t *testing.T) { + t.Helper() + + basePath := t.TempDir() + baseAddr := swarm.RandAddress(t) + opts := dbTestOps(baseAddr, 0, nil, nil, time.Second) + + st, err := storer.New(context.Background(), basePath, opts) + if err != nil { + t.Fatalf("creating store: %v", err) + } + + const ( + nWorkers = 8 + chunksPerWorker = 64 + ) + + chunks := chunktesting.GenerateTestRandomChunks(nWorkers * chunksPerWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + committed []swarm.Chunk + mu sync.Mutex + wg sync.WaitGroup + ) + + // Each goroutine gets its own cache putter so concurrent transactions + // are independent – this is the exact pattern that triggers the race + // between Sharky WriteAt and LevelDB batch.Commit. + for w := range nWorkers { + wg.Add(1) + go func(batch []swarm.Chunk) { + defer wg.Done() + putter := st.Cache() + for _, ch := range batch { + if err := putter.Put(ctx, ch); err == nil { + mu.Lock() + committed = append(committed, ch) + mu.Unlock() + } + } + }(chunks[w*chunksPerWorker : (w+1)*chunksPerWorker]) + } + + // Let writes start, then cancel to simulate an abrupt shutdown. + time.AfterFunc(5*time.Millisecond, cancel) + wg.Wait() + + // Best-effort close; may return context errors for in-flight ops. 
+ _ = st.Close() + + if len(committed) == 0 { + t.Skip("no chunks committed before cancellation; increase nWorkers or chunksPerWorker") + } + + // Reopen at the same path. The presence of .DIRTY triggers the recovery + // path which (with the fix) also hash-validates every chunk, evicting any + // corrupted entries from the index. + st2, err := storer.New(context.Background(), basePath, opts) + if err != nil { + t.Fatalf("reopening store after simulated crash: %v", err) + } + t.Cleanup(func() { _ = st2.Close() }) + + // Every chunk whose Put returned nil must be readable and have a valid + // content hash. A corrupted chunk means Sharky held stale/zeroed bytes + // that were never flushed before the LevelDB commit – the exact bug. + var corrupted int + for _, ch := range committed { + got, err := st2.Storage().ChunkStore().Get(context.Background(), ch.Address()) + if err != nil { + t.Errorf("committed chunk not found after recovery: %s", ch.Address()) + corrupted++ + continue + } + if !cac.Valid(got) { + t.Errorf("committed chunk has corrupted data after recovery: %s", ch.Address()) + corrupted++ + } + } + if corrupted > 0 { + t.Fatalf("%d/%d committed chunks corrupted after crash recovery", corrupted, len(committed)) + } +} diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index ae97e06ce3b..a57777c63fd 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -69,6 +69,7 @@ func NewStorage(sharky *sharky.Store, bstore storage.BatchStore) Storage { } type transaction struct { + ctx context.Context start time.Time batch storage.Batch indexstore storage.IndexStore @@ -92,6 +93,7 @@ func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) { sharky := &sharkyTrx{s.sharky, s.metrics, nil, nil} t := &transaction{ + ctx: ctx, start: time.Now(), batch: b, indexstore: index, @@ -172,6 +174,25 @@ func (t *transaction) Commit() (err error) { 
t.sharkyTrx.writtenLocs = nil }() + // Ensure Sharky data is durable before committing the index to LevelDB. + // Without this, a power loss after batch.Commit() but before the OS flushes + // the page cache leaves LevelDB pointing to slots with stale or zero data. + // Under concurrent writes, all in-flight transactions share a single fdatasync + // (group commit), so the cost is amortised and approaches zero under load. + if len(t.sharkyTrx.writtenLocs) > 0 { + h := handleMetric("sharky_sync", t.metrics) + err = t.sharkyTrx.sharky.SyncWait(t.ctx) + h(&err) + if err != nil { + for _, l := range t.sharkyTrx.writtenLocs { + if rerr := t.sharkyTrx.sharky.Release(context.TODO(), l); rerr != nil { + err = errors.Join(err, fmt.Errorf("failed releasing location during sync rollback %s: %w", l, rerr)) + } + } + return err + } + } + h := handleMetric("batch_commit", t.metrics) err = t.batch.Commit() h(&err) diff --git a/pkg/storer/recover.go b/pkg/storer/recover.go index 07d300e5d46..b2cdac49c89 100644 --- a/pkg/storer/recover.go +++ b/pkg/storer/recover.go @@ -12,7 +12,10 @@ import ( "path/filepath" "time" + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/sharky" + "github.com/ethersphere/bee/v2/pkg/soc" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -51,29 +54,61 @@ func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.St } }() - c := chunkstore.IterateLocations(ctx, store) - - if err := addLocations(c, sharkyRecover); err != nil { + if err := validateAndAddLocations(ctx, store, sharkyRecover, logger); err != nil { return closer, err } return closer, nil } -func addLocations(locationResultC <-chan chunkstore.LocationResult, sharkyRecover *sharky.Recovery) error { - for res := range locationResultC { - if res.Err != nil { - return res.Err +// validateAndAddLocations 
iterates every chunk index entry, reads its data from +// Sharky, and validates the content hash. Valid chunks are registered with the +// recovery so their slots are preserved. Corrupted entries (unreadable data or +// hash mismatch) are logged, excluded from the recovery bitmap, and deleted from +// the index store so the node starts clean without serving invalid data. +func validateAndAddLocations(ctx context.Context, store storage.Store, sharkyRecover *sharky.Recovery, logger log.Logger) error { + var corrupted []*chunkstore.RetrievalIndexItem + + err := chunkstore.IterateItems(store, func(item *chunkstore.RetrievalIndexItem) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + buf := make([]byte, item.Location.Length) + if err := sharkyRecover.Read(ctx, item.Location, buf); err != nil { + logger.Warning("recovery: unreadable chunk, marking corrupted", "address", item.Address, "err", err) + corrupted = append(corrupted, item) + return nil } - if err := sharkyRecover.Add(res.Location); err != nil { - return err + ch := swarm.NewChunk(item.Address, buf) + if !cac.Valid(ch) && !soc.Valid(ch) { + logger.Warning("recovery: invalid chunk hash, marking corrupted", "address", item.Address) + corrupted = append(corrupted, item) + return nil } + + return sharkyRecover.Add(item.Location) + }) + if err != nil { + return err } if err := sharkyRecover.Save(); err != nil { return err } + for _, item := range corrupted { + if err := store.Delete(item); err != nil { + logger.Error(err, "recovery: failed deleting corrupted chunk index entry", "address", item.Address) + } + } + + if len(corrupted) > 0 { + logger.Info("recovery: removed corrupted chunk index entries", "count", len(corrupted)) + } + return nil } From 0c53892fc1a498fadc67e7736bcf1eb9873b2f97 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Fri, 20 Mar 2026 13:11:47 +0200 Subject: [PATCH 2/6] test: add crash corruption regression test for initial issue --- 
pkg/storer/crash_recovery_test.go | 133 ------------------ .../internal/transaction/transaction_test.go | 106 ++++++++++++++ 2 files changed, 106 insertions(+), 133 deletions(-) delete mode 100644 pkg/storer/crash_recovery_test.go diff --git a/pkg/storer/crash_recovery_test.go b/pkg/storer/crash_recovery_test.go deleted file mode 100644 index 0b998e1a6ce..00000000000 --- a/pkg/storer/crash_recovery_test.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storer_test - -// TestCrashRecoveryDataIntegrity reproduces the corruption scenario described in -// https://github.com/ethersphere/bee/issues/4737. -// -// Scenario: the context is cancelled while concurrent writes are in flight, -// simulating a SIGKILL or power loss during an active upload. The store is -// then closed (best-effort) and reopened, which triggers the .DIRTY recovery -// path. Every chunk that was acknowledged as committed (Put returned nil) must -// still be readable and have a valid content hash. -// -// Before the group-commit fix, Sharky's WriteAt was not fsynced before -// LevelDB committed the chunk location. A power loss in that window left -// LevelDB pointing at slots containing stale or zeroed bytes, causing -// cac.Valid to return false for those chunks. 
-// -// Run with a high -count to stress the timing window: -// -// go test -count=50 -run TestCrashRecoveryDataIntegrity ./pkg/storer/ - -import ( - "context" - "fmt" - "sync" - "testing" - "time" - - "github.com/ethersphere/bee/v2/pkg/cac" - chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing" - "github.com/ethersphere/bee/v2/pkg/storer" - "github.com/ethersphere/bee/v2/pkg/swarm" -) - -func TestCrashRecoveryDataIntegrity(t *testing.T) { - const iterations = 10 - for i := range iterations { - t.Run(fmt.Sprintf("iter%02d", i), func(t *testing.T) { - t.Parallel() - testCrashRecoveryOnce(t) - }) - } -} - -func testCrashRecoveryOnce(t *testing.T) { - t.Helper() - - basePath := t.TempDir() - baseAddr := swarm.RandAddress(t) - opts := dbTestOps(baseAddr, 0, nil, nil, time.Second) - - st, err := storer.New(context.Background(), basePath, opts) - if err != nil { - t.Fatalf("creating store: %v", err) - } - - const ( - nWorkers = 8 - chunksPerWorker = 64 - ) - - chunks := chunktesting.GenerateTestRandomChunks(nWorkers * chunksPerWorker) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var ( - committed []swarm.Chunk - mu sync.Mutex - wg sync.WaitGroup - ) - - // Each goroutine gets its own cache putter so concurrent transactions - // are independent – this is the exact pattern that triggers the race - // between Sharky WriteAt and LevelDB batch.Commit. - for w := range nWorkers { - wg.Add(1) - go func(batch []swarm.Chunk) { - defer wg.Done() - putter := st.Cache() - for _, ch := range batch { - if err := putter.Put(ctx, ch); err == nil { - mu.Lock() - committed = append(committed, ch) - mu.Unlock() - } - } - }(chunks[w*chunksPerWorker : (w+1)*chunksPerWorker]) - } - - // Let writes start, then cancel to simulate an abrupt shutdown. - time.AfterFunc(5*time.Millisecond, cancel) - wg.Wait() - - // Best-effort close; may return context errors for in-flight ops. 
- _ = st.Close() - - if len(committed) == 0 { - t.Skip("no chunks committed before cancellation; increase nWorkers or chunksPerWorker") - } - - // Reopen at the same path. The presence of .DIRTY triggers the recovery - // path which (with the fix) also hash-validates every chunk, evicting any - // corrupted entries from the index. - st2, err := storer.New(context.Background(), basePath, opts) - if err != nil { - t.Fatalf("reopening store after simulated crash: %v", err) - } - t.Cleanup(func() { _ = st2.Close() }) - - // Every chunk whose Put returned nil must be readable and have a valid - // content hash. A corrupted chunk means Sharky held stale/zeroed bytes - // that were never flushed before the LevelDB commit – the exact bug. - var corrupted int - for _, ch := range committed { - got, err := st2.Storage().ChunkStore().Get(context.Background(), ch.Address()) - if err != nil { - t.Errorf("committed chunk not found after recovery: %s", ch.Address()) - corrupted++ - continue - } - if !cac.Valid(got) { - t.Errorf("committed chunk has corrupted data after recovery: %s", ch.Address()) - corrupted++ - } - } - if corrupted > 0 { - t.Fatalf("%d/%d committed chunks corrupted after crash recovery", corrupted, len(committed)) - } -} diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index 8b72864f385..ace1842202f 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -6,6 +6,7 @@ package transaction_test import ( "context" + "io" "io/fs" "os" "path/filepath" @@ -187,3 +188,108 @@ func Test_TransactionStorage(t *testing.T) { } }) } + +// mockCrashFile simulates a file that buffers writes in memory and only persists them when Sync() is called. 
+type mockCrashFile struct { + fs.File // default unimplemented + name string + data []byte // "persisted" to disk + dirty []byte // "volatile" in OS page cache +} + +func newMockCrashFile(name string, d []byte) *mockCrashFile { + return &mockCrashFile{name: name, data: d, dirty: append([]byte(nil), d...)} +} + +func (m *mockCrashFile) WriteAt(p []byte, off int64) (int, error) { + end := int(off) + len(p) + if end > len(m.dirty) { + newDirty := make([]byte, end) + copy(newDirty, m.dirty) + m.dirty = newDirty + } + copy(m.dirty[off:], p) + return len(p), nil +} + +func (m *mockCrashFile) ReadAt(p []byte, off int64) (int, error) { + if int(off) >= len(m.dirty) { + return 0, os.ErrClosed + } + n := copy(p, m.dirty[off:]) + return n, nil +} + +func (m *mockCrashFile) Sync() error { + m.data = append([]byte(nil), m.dirty...) + return nil +} + +func (m *mockCrashFile) Close() error { + return nil +} + +// Read is called by io.ReadAll in slots.load() during sharky.New(). +// Return EOF immediately to simulate an empty (newly created) slots file. +func (m *mockCrashFile) Read(p []byte) (n int, err error) { return 0, io.EOF } +func (m *mockCrashFile) Write(p []byte) (n int, err error) { panic("not impl") } +func (m *mockCrashFile) Seek(offset int64, whence int) (int64, error) { panic("not impl") } +func (m *mockCrashFile) Truncate(size int64) error { panic("not impl") } + +// mockCrashFS simulates a filesystem that retains mocked files. +type mockCrashFS struct { + files map[string]*mockCrashFile +} + +func (fsys *mockCrashFS) Open(name string) (fs.File, error) { + if fsys.files == nil { + fsys.files = make(map[string]*mockCrashFile) + } + if f, ok := fsys.files[name]; ok { + return f, nil + } + f := newMockCrashFile(name, nil) + fsys.files[name] = f + return f, nil +} + +func Test_TransactionCrashCorruption(t *testing.T) { + t.Parallel() + + // 1. 
Setup mock FS and LevelDB + fsys := &mockCrashFS{files: make(map[string]*mockCrashFile)} + sharkyStore, err := sharky.New(fsys, 1, swarm.SocMaxChunkSize) + assert.NoError(t, err) + + dbStore, err := leveldbstore.New("", nil) // in-memory leveldb + assert.NoError(t, err) + + st := transaction.NewStorage(sharkyStore, dbStore) + + // 2. Put a chunk using a transaction + tx, done := st.NewTransaction(context.Background()) + defer done() + + ch1 := test.GenerateTestRandomChunk() + assert.NoError(t, tx.ChunkStore().Put(context.Background(), ch1)) + + // 3. Commit the transaction (this writes metadata to LevelDB) + assert.NoError(t, tx.Commit()) + + // 4. Simulate a crash! + // The OS page cache drops unsynced dirty bytes. + for _, f := range fsys.files { + f.dirty = append([]byte(nil), f.data...) // Revert dirty to persisted + } + + // 5. A committed chunk must survive a simulated crash. + // On master (no SyncWait): Sharky never fsynced, dirty was dropped, Get fails → test FAILS. + // On the fix branch (SyncWait calls Sync): dirty was persisted to data before commit, + // so crash simulation leaves data intact and Get succeeds → test PASSES. 
+ got, err := st.ChunkStore().Get(context.Background(), ch1.Address()) + if err != nil { + t.Fatalf("BUG: committed chunk unreadable after crash – Sharky was not synced before LevelDB commit: %v", err) + } + assert.Equal(t, ch1.Data(), got.Data(), "chunk data must survive a crash when Sharky is synced before LevelDB commit") +} + From 9cac5d8480cc33d470e77b58b576e64d4d5724af Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 24 Mar 2026 14:32:42 +0200 Subject: [PATCH 3/6] fix: linter --- pkg/sharky/store.go | 10 +++++----- pkg/storer/internal/transaction/transaction_test.go | 11 +++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go index 5d1f8e3b248..8937f12625d 100644 --- a/pkg/sharky/store.go +++ b/pkg/sharky/store.go @@ -33,11 +33,11 @@ type pendingSync struct { // - read prioritisation over writing // - free slots allow write type Store struct { - maxDataSize int // max length of blobs - writes chan write // shared write operations channel - shards []*shard // shards - wg *sync.WaitGroup // count started operations - quit chan struct{} // quit channel + maxDataSize int // max length of blobs + writes chan write // shared write operations channel + shards []*shard // shards + wg *sync.WaitGroup // count started operations + quit chan struct{} // quit channel syncCh chan pendingSync // group commit: serialises fsync requests across concurrent transactions metrics metrics } diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index ace1842202f..de274e2a240 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -231,10 +231,10 @@ func (m *mockCrashFile) Close() error { // Read is called by io.ReadAll in slots.load() during sharky.New(). // Return EOF immediately to simulate an empty (newly created) slots file. 
-func (m *mockCrashFile) Read(p []byte) (n int, err error) { return 0, io.EOF } -func (m *mockCrashFile) Write(p []byte) (n int, err error) { panic("not impl") } +func (m *mockCrashFile) Read(p []byte) (n int, err error) { return 0, io.EOF } +func (m *mockCrashFile) Write(p []byte) (n int, err error) { panic("not impl") } func (m *mockCrashFile) Seek(offset int64, whence int) (int64, error) { panic("not impl") } -func (m *mockCrashFile) Truncate(size int64) error { panic("not impl") } +func (m *mockCrashFile) Truncate(size int64) error { panic("not impl") } // mockCrashFS simulates a filesystem that retains mocked files. type mockCrashFS struct { @@ -272,11 +272,11 @@ func Test_TransactionCrashCorruption(t *testing.T) { ch1 := test.GenerateTestRandomChunk() assert.NoError(t, tx.ChunkStore().Put(context.Background(), ch1)) - + // 3. Commit the transaction (this writes metadata to LevelDB) assert.NoError(t, tx.Commit()) - // 4. Simulate a crash! + // 4. Simulate a crash! // The OS page cache drops unsynced dirty bytes. for _, f := range fsys.files { f.dirty = append([]byte(nil), f.data...) 
// Revert dirty to persisted @@ -292,4 +292,3 @@ func Test_TransactionCrashCorruption(t *testing.T) { } assert.Equal(t, ch1.Data(), got.Data(), "chunk data must survive a crash when Sharky is synced before LevelDB commit") } - From 92f9472f19b2197fd2fb44d7203d2108570eda94 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Wed, 25 Mar 2026 11:29:26 +0200 Subject: [PATCH 4/6] fix: revert fsync solution as it would be to slow --- pkg/sharky/store.go | 78 +------------ .../internal/transaction/transaction.go | 21 ---- .../internal/transaction/transaction_test.go | 104 ------------------ 3 files changed, 5 insertions(+), 198 deletions(-) diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go index 8937f12625d..12cc47c7e92 100644 --- a/pkg/sharky/store.go +++ b/pkg/sharky/store.go @@ -22,23 +22,17 @@ var ( ErrQuitting = errors.New("quitting") ) -// pendingSync represents a caller waiting for a shard fsync to complete. -type pendingSync struct { - done chan error -} - // Store models the sharded fix-length blobstore // Design provides lockless sharding: // - shard choice responding to backpressure by running operation // - read prioritisation over writing // - free slots allow write type Store struct { - maxDataSize int // max length of blobs - writes chan write // shared write operations channel - shards []*shard // shards - wg *sync.WaitGroup // count started operations - quit chan struct{} // quit channel - syncCh chan pendingSync // group commit: serialises fsync requests across concurrent transactions + maxDataSize int // max length of blobs + writes chan write // shared write operations channel + shards []*shard // shards + wg *sync.WaitGroup // count started operations + quit chan struct{} // quit channel metrics metrics } @@ -55,7 +49,6 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) { shards: make([]*shard, shardCnt), wg: &sync.WaitGroup{}, quit: make(chan struct{}), - syncCh: make(chan pendingSync), metrics: newMetrics(), } for i := 
range store.shards { @@ -67,70 +60,9 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) { } store.metrics.ShardCount.Set(float64(len(store.shards))) - store.wg.Go(store.runSyncLoop) - return store, nil } -// runSyncLoop is a background goroutine that handles group commit for shard files. -// It waits for a sync request, drains all concurrently pending requests, issues -// a single fdatasync per shard file, then signals all waiters. This amortises -// the fsync cost across all transactions that are committing at the same time. -func (s *Store) runSyncLoop() { - for { - select { - case req := <-s.syncCh: - // Drain all other requests that arrived while we were waiting. - pending := []pendingSync{req} - drain: - for { - select { - case r := <-s.syncCh: - pending = append(pending, r) - default: - break drain - } - } - - var syncErr error - for _, sh := range s.shards { - if err := sh.file.Sync(); err != nil { - syncErr = errors.Join(syncErr, err) - } - } - - for _, p := range pending { - p.done <- syncErr - } - - case <-s.quit: - return - } - } -} - -// SyncWait blocks until all shard files written in the current transaction have -// been fsynced to disk. Multiple concurrent callers share a single fsync (group -// commit), so the cost is amortised under concurrent upload load. 
-func (s *Store) SyncWait(ctx context.Context) error { - req := pendingSync{done: make(chan error, 1)} - select { - case s.syncCh <- req: - case <-s.quit: - return ErrQuitting - case <-ctx.Done(): - return ctx.Err() - } - select { - case err := <-req.done: - return err - case <-s.quit: - return ErrQuitting - case <-ctx.Done(): - return ctx.Err() - } -} - // Close closes each shard and return incidental errors from each shard func (s *Store) Close() error { close(s.quit) diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index a57777c63fd..ae97e06ce3b 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -69,7 +69,6 @@ func NewStorage(sharky *sharky.Store, bstore storage.BatchStore) Storage { } type transaction struct { - ctx context.Context start time.Time batch storage.Batch indexstore storage.IndexStore @@ -93,7 +92,6 @@ func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) { sharky := &sharkyTrx{s.sharky, s.metrics, nil, nil} t := &transaction{ - ctx: ctx, start: time.Now(), batch: b, indexstore: index, @@ -174,25 +172,6 @@ func (t *transaction) Commit() (err error) { t.sharkyTrx.writtenLocs = nil }() - // Ensure Sharky data is durable before committing the index to LevelDB. - // Without this, a power loss after batch.Commit() but before the OS flushes - // the page cache leaves LevelDB pointing to slots with stale or zero data. - // Under concurrent writes, all in-flight transactions share a single fdatasync - // (group commit), so the cost is amortised and approaches zero under load. 
- if len(t.sharkyTrx.writtenLocs) > 0 { - h := handleMetric("sharky_sync", t.metrics) - err = t.sharkyTrx.sharky.SyncWait(t.ctx) - h(&err) - if err != nil { - for _, l := range t.sharkyTrx.writtenLocs { - if rerr := t.sharkyTrx.sharky.Release(context.TODO(), l); rerr != nil { - err = errors.Join(err, fmt.Errorf("failed releasing location during sync rollback %s: %w", l, rerr)) - } - } - return err - } - } - h := handleMetric("batch_commit", t.metrics) err = t.batch.Commit() h(&err) diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index de274e2a240..bcbfc82a13e 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -6,7 +6,6 @@ package transaction_test import ( "context" - "io" "io/fs" "os" "path/filepath" @@ -189,106 +188,3 @@ func Test_TransactionStorage(t *testing.T) { }) } -// mockCrashFile simulates a file that buffers writes in memory and only persists them when Sync() is called. -type mockCrashFile struct { - fs.File // default unimplemented - name string - data []byte // "persisted" to disk - dirty []byte // "volatile" in OS page cache -} - -func newMockCrashFile(name string, d []byte) *mockCrashFile { - return &mockCrashFile{name: name, data: d, dirty: append([]byte(nil), d...)} -} - -func (m *mockCrashFile) WriteAt(p []byte, off int64) (int, error) { - end := int(off) + len(p) - if end > len(m.dirty) { - newDirty := make([]byte, end) - copy(newDirty, m.dirty) - m.dirty = newDirty - } - copy(m.dirty[off:], p) - return len(p), nil -} - -func (m *mockCrashFile) ReadAt(p []byte, off int64) (int, error) { - if int(off) >= len(m.dirty) { - return 0, os.ErrClosed - } - n := copy(p, m.dirty[off:]) - return n, nil -} - -func (m *mockCrashFile) Sync() error { - m.data = append([]byte(nil), m.dirty...) 
- return nil -} - -func (m *mockCrashFile) Close() error { - return nil -} - -// Read is called by io.ReadAll in slots.load() during sharky.New(). -// Return EOF immediately to simulate an empty (newly created) slots file. -func (m *mockCrashFile) Read(p []byte) (n int, err error) { return 0, io.EOF } -func (m *mockCrashFile) Write(p []byte) (n int, err error) { panic("not impl") } -func (m *mockCrashFile) Seek(offset int64, whence int) (int64, error) { panic("not impl") } -func (m *mockCrashFile) Truncate(size int64) error { panic("not impl") } - -// mockCrashFS simulates a filesystem that retains mocked files. -type mockCrashFS struct { - files map[string]*mockCrashFile -} - -func (fsys *mockCrashFS) Open(name string) (fs.File, error) { - if fsys.files == nil { - fsys.files = make(map[string]*mockCrashFile) - } - if f, ok := fsys.files[name]; ok { - return f, nil - } - f := newMockCrashFile(name, nil) - fsys.files[name] = f - return f, nil -} - -func Test_TransactionCrashCorruption(t *testing.T) { - t.Parallel() - - // 1. Setup mock FS and LevelDB - fsys := &mockCrashFS{files: make(map[string]*mockCrashFile)} - sharkyStore, err := sharky.New(fsys, 1, swarm.SocMaxChunkSize) - assert.NoError(t, err) - - dbStore, err := leveldbstore.New("", nil) // in-memory leveldb - assert.NoError(t, err) - - st := transaction.NewStorage(sharkyStore, dbStore) - - // 2. Put a chunk using a transaction - tx, done := st.NewTransaction(context.Background()) - defer done() - - ch1 := test.GenerateTestRandomChunk() - assert.NoError(t, tx.ChunkStore().Put(context.Background(), ch1)) - - // 3. Commit the transaction (this writes metadata to LevelDB) - assert.NoError(t, tx.Commit()) - - // 4. Simulate a crash! - // The OS page cache drops unsynced dirty bytes. - for _, f := range fsys.files { - f.dirty = append([]byte(nil), f.data...) // Revert dirty to persisted - } - - // 5. A committed chunk must survive a simulated crash. 
- // On master (no SyncWait): Sharky never fsynced, dirty was dropped, Get fails → test FAILS. - // On the fix branch (SyncWait calls Sync): dirty was persisted to data before commit, - // so crash simulation leaves data intact and Get succeeds → test PASSES. - got, err := st.ChunkStore().Get(context.Background(), ch1.Address()) - if err != nil { - t.Fatalf("BUG: committed chunk unreadable after crash – Sharky was not synced before LevelDB commit: %v", err) - } - assert.Equal(t, ch1.Data(), got.Data(), "chunk data must survive a crash when Sharky is synced before LevelDB commit") -} From 23aac72aa525c7d4eb2d5d4f7b91dcaae99c8570 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Thu, 26 Mar 2026 13:33:02 +0200 Subject: [PATCH 5/6] test: verify recovery prunes corrupted sharky chunks on unclean shutdown --- pkg/sharky/store.go | 2 +- .../internal/transaction/transaction_test.go | 1 - pkg/storer/recover_test.go | 122 ++++++++++++++++++ 3 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 pkg/storer/recover_test.go diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go index 12cc47c7e92..d8947f9d042 100644 --- a/pkg/sharky/store.go +++ b/pkg/sharky/store.go @@ -30,7 +30,7 @@ var ( type Store struct { maxDataSize int // max length of blobs writes chan write // shared write operations channel - shards []*shard // shards + shards []*shard // shards wg *sync.WaitGroup // count started operations quit chan struct{} // quit channel metrics metrics diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index bcbfc82a13e..8b72864f385 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -187,4 +187,3 @@ func Test_TransactionStorage(t *testing.T) { } }) } - diff --git a/pkg/storer/recover_test.go b/pkg/storer/recover_test.go new file mode 100644 index 00000000000..278c625be18 --- /dev/null +++ b/pkg/storer/recover_test.go @@ 
-0,0 +1,122 @@
+// Copyright 2026 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer_test
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
+	pullerMock "github.com/ethersphere/bee/v2/pkg/puller/mock"
+	"github.com/ethersphere/bee/v2/pkg/sharky"
+	"github.com/ethersphere/bee/v2/pkg/storage"
+	chunk "github.com/ethersphere/bee/v2/pkg/storage/testing"
+	"github.com/ethersphere/bee/v2/pkg/storer"
+	"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore"
+	"github.com/ethersphere/bee/v2/pkg/swarm"
+)
+
+// TestRecoveryPrunesCorruptedChunks verifies that on restart after an unclean
+// shutdown, validateAndAddLocations removes index entries whose Sharky data no
+// longer validates (hash mismatch), while leaving intact entries unaffected.
+func TestRecoveryPrunesCorruptedChunks(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+	basePath := t.TempDir()
+	opts := dbTestOps(swarm.RandAddress(t), 1000, nil, nil, time.Minute)
+
+	batch := postagetesting.MustNewBatch()
+
+	// 1. Open the storer and write two chunks.
+	st, err := storer.New(ctx, basePath, opts)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
+	readyC := make(chan struct{})
+	st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
+	<-readyC
+
+	goodChunk := chunk.GenerateTestRandomChunk().WithStamp(postagetesting.MustNewBatchStamp(batch.ID))
+	badChunk := chunk.GenerateTestRandomChunk().WithStamp(postagetesting.MustNewBatchStamp(batch.ID))
+
+	putter := st.ReservePutter()
+	if err := putter.Put(ctx, goodChunk); err != nil {
+		t.Fatalf("Put good chunk: %v", err)
+	}
+	if err := putter.Put(ctx, badChunk); err != nil {
+		t.Fatalf("Put bad chunk: %v", err)
+	}
+
+	// 2. Locate the bad chunk's Sharky slot before closing.
+ var badLoc sharky.Location + if err := st.Storage().IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(chunkstore.RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + item := r.Entry.(*chunkstore.RetrievalIndexItem) + if item.Address.Equal(badChunk.Address()) { + badLoc = item.Location + } + return false, nil + }); err != nil { + t.Fatalf("Iterate index: %v", err) + } + if badLoc.Length == 0 { + t.Fatal("bad chunk not found in index") + } + + // 3. Close cleanly (removes .DIRTY). + if err := st.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + // 4. Simulate unclean shutdown: recreate .DIRTY so recovery runs on next open. + dirtyPath := filepath.Join(basePath, "sharky", ".DIRTY") + if err := os.WriteFile(dirtyPath, []byte{}, 0644); err != nil { + t.Fatalf("WriteFile .DIRTY: %v", err) + } + + // 5. Overwrite the bad chunk's slot with zeros so its hash will not validate. + shardPath := filepath.Join(basePath, "sharky", fmt.Sprintf("shard_%03d", badLoc.Shard)) + f, err := os.OpenFile(shardPath, os.O_RDWR, 0666) + if err != nil { + t.Fatalf("OpenFile shard: %v", err) + } + if _, err := f.WriteAt(make([]byte, badLoc.Length), int64(badLoc.Slot)*int64(swarm.SocMaxChunkSize)); err != nil { + _ = f.Close() + t.Fatalf("WriteAt zeros: %v", err) + } + if err := f.Close(); err != nil { + t.Fatalf("Close shard: %v", err) + } + + // 6. Reopen the storer. The .DIRTY file triggers sharkyRecovery → + // validateAndAddLocations, which prunes the corrupted index entry. + st2, err := storer.New(ctx, basePath, opts) + if err != nil { + t.Fatalf("New (reopen): %v", err) + } + t.Cleanup(func() { _ = st2.Close() }) + + // 7. The valid chunk must still be retrievable after recovery. 
+ got, err := st2.Storage().ChunkStore().Get(ctx, goodChunk.Address()) + if err != nil { + t.Fatalf("Get good chunk after recovery: %v", err) + } + if !got.Address().Equal(goodChunk.Address()) { + t.Fatalf("good chunk address mismatch after recovery") + } + + // 8. The corrupted chunk must have been pruned from the index. + _, err = st2.Storage().ChunkStore().Get(ctx, badChunk.Address()) + if !errors.Is(err, storage.ErrNotFound) { + t.Fatalf("expected ErrNotFound for corrupted chunk, got: %v", err) + } +} From e18aa45ea316656b61c6075e862251e8d9a47fb3 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 31 Mar 2026 14:29:46 +0300 Subject: [PATCH 6/6] fix: return error on failing to prune corrupted index entry --- pkg/storer/recover.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storer/recover.go b/pkg/storer/recover.go index b2cdac49c89..78a8c0aa6a9 100644 --- a/pkg/storer/recover.go +++ b/pkg/storer/recover.go @@ -7,6 +7,7 @@ package storer import ( "context" "errors" + "fmt" "io/fs" "os" "path/filepath" @@ -66,6 +67,8 @@ func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.St // recovery so their slots are preserved. Corrupted entries (unreadable data or // hash mismatch) are logged, excluded from the recovery bitmap, and deleted from // the index store so the node starts clean without serving invalid data. +// If a corrupted index entry cannot be deleted, an error is returned and the +// node startup is aborted to prevent serving or operating on corrupt state. 
func validateAndAddLocations(ctx context.Context, store storage.Store, sharkyRecover *sharky.Recovery, logger log.Logger) error { var corrupted []*chunkstore.RetrievalIndexItem @@ -103,11 +106,12 @@ func validateAndAddLocations(ctx context.Context, store storage.Store, sharkyRec for _, item := range corrupted { if err := store.Delete(item); err != nil { logger.Error(err, "recovery: failed deleting corrupted chunk index entry", "address", item.Address) + return fmt.Errorf("recovery: failed deleting corrupted chunk index entry %s: %w", item.Address, err) } } if len(corrupted) > 0 { - logger.Info("recovery: removed corrupted chunk index entries", "count", len(corrupted)) + logger.Warning("recovery: removed corrupted chunk index entries", "count", len(corrupted)) } return nil