diff --git a/pkg/sharky/slots.go b/pkg/sharky/slots.go index 71d058f2b88..e8fd72e7571 100644 --- a/pkg/sharky/slots.go +++ b/pkg/sharky/slots.go @@ -40,11 +40,13 @@ func (sl *slots) load() (err error) { return err } -// save persists the free slot bitvector on disk (without closing) +// save persists the free slot bitvector on disk (without closing). +// slots only ever grow (extend is the only mutation), so the size of sl.data is always >= +// the previous file size. Seeking to 0 and overwriting is therefore always +// safe: no stale tail bytes can survive. Truncate(0) is intentionally absent +// because truncating before the write creates a crash window where the file is +// empty; overwriting in place avoids that window. NOTE(review): this relies on sl.data never shrinking — confirm. func (sl *slots) save() error { - if err := sl.file.Truncate(0); err != nil { - return err - } if _, err := sl.file.Seek(0, 0); err != nil { return err } diff --git a/pkg/storer/recover.go b/pkg/storer/recover.go index 07d300e5d46..78a8c0aa6a9 100644 --- a/pkg/storer/recover.go +++ b/pkg/storer/recover.go @@ -7,12 +7,16 @@ package storer import ( "context" "errors" + "fmt" "io/fs" "os" "path/filepath" "time" + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/sharky" + "github.com/ethersphere/bee/v2/pkg/soc" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -51,29 +55,64 @@ func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.St } }() - c := chunkstore.IterateLocations(ctx, store) - - if err := addLocations(c, sharkyRecover); err != nil { + if err := validateAndAddLocations(ctx, store, sharkyRecover, logger); err != nil { return closer, err } return closer, nil } -func addLocations(locationResultC <-chan chunkstore.LocationResult, sharkyRecover *sharky.Recovery) error { - for res := range locationResultC { - if res.Err != nil { - return res.Err +// 
validateAndAddLocations iterates every chunk index entry, reads its data from +// Sharky, and validates it with cac.Valid or soc.Valid. Valid chunks are registered with the +// recovery so their slots are preserved. Corrupted entries (unreadable data or +// failing both validity checks) are logged, excluded from the recovery bitmap, and deleted from +// the index store after the recovery state is saved, so the node starts clean without serving invalid data. +// If a corrupted index entry cannot be deleted, an error is returned so the caller can +// abort startup rather than serve or operate on corrupt state. +func validateAndAddLocations(ctx context.Context, store storage.Store, sharkyRecover *sharky.Recovery, logger log.Logger) error { + var corrupted []*chunkstore.RetrievalIndexItem + + err := chunkstore.IterateItems(store, func(item *chunkstore.RetrievalIndexItem) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + buf := make([]byte, item.Location.Length) + if err := sharkyRecover.Read(ctx, item.Location, buf); err != nil { + logger.Warning("recovery: unreadable chunk, marking corrupted", "address", item.Address, "err", err) + corrupted = append(corrupted, item) + return nil } - if err := sharkyRecover.Add(res.Location); err != nil { - return err + ch := swarm.NewChunk(item.Address, buf) + if !cac.Valid(ch) && !soc.Valid(ch) { + logger.Warning("recovery: invalid chunk hash, marking corrupted", "address", item.Address) + corrupted = append(corrupted, item) + return nil } + + return sharkyRecover.Add(item.Location) + }) + if err != nil { + return err } if err := sharkyRecover.Save(); err != nil { return err } + for _, item := range corrupted { + if err := store.Delete(item); err != nil { + logger.Error(err, "recovery: failed deleting corrupted chunk index entry", "address", item.Address) + return fmt.Errorf("recovery: failed deleting corrupted chunk index entry %s: %w", item.Address, err) + } + } + + if len(corrupted) > 0 { + logger.Warning("recovery: removed corrupted chunk index entries", 
"count", len(corrupted)) + } + return nil } diff --git a/pkg/storer/recover_test.go b/pkg/storer/recover_test.go new file mode 100644 index 00000000000..278c625be18 --- /dev/null +++ b/pkg/storer/recover_test.go @@ -0,0 +1,122 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer_test + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" + pullerMock "github.com/ethersphere/bee/v2/pkg/puller/mock" + "github.com/ethersphere/bee/v2/pkg/sharky" + "github.com/ethersphere/bee/v2/pkg/storage" + chunk "github.com/ethersphere/bee/v2/pkg/storage/testing" + "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// TestRecoveryPrunesCorruptedChunks verifies that on restart after an unclean +// shutdown, validateAndAddLocations removes index entries whose Sharky data no +// longer validates (hash mismatch), while leaving intact entries unaffected. +func TestRecoveryPrunesCorruptedChunks(t *testing.T) { + t.Parallel() + + ctx := context.Background() + basePath := t.TempDir() + opts := dbTestOps(swarm.RandAddress(t), 1000, nil, nil, time.Minute) + + batch := postagetesting.MustNewBatch() + + // 1. Open the storer and write two chunks. 
+ st, err := storer.New(ctx, basePath, opts) + if err != nil { + t.Fatalf("New: %v", err) + } + readyC := make(chan struct{}) + st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC) + <-readyC + + goodChunk := chunk.GenerateTestRandomChunk().WithStamp(postagetesting.MustNewBatchStamp(batch.ID)) + badChunk := chunk.GenerateTestRandomChunk().WithStamp(postagetesting.MustNewBatchStamp(batch.ID)) + + putter := st.ReservePutter() + if err := putter.Put(ctx, goodChunk); err != nil { + t.Fatalf("Put good chunk: %v", err) + } + if err := putter.Put(ctx, badChunk); err != nil { + t.Fatalf("Put bad chunk: %v", err) + } + + // 2. Locate the bad chunk's Sharky slot before closing. + var badLoc sharky.Location + if err := st.Storage().IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(chunkstore.RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + item := r.Entry.(*chunkstore.RetrievalIndexItem) + if item.Address.Equal(badChunk.Address()) { + badLoc = item.Location + } + return false, nil + }); err != nil { + t.Fatalf("Iterate index: %v", err) + } + if badLoc.Length == 0 { + t.Fatal("bad chunk not found in index") + } + + // 3. Close cleanly (removes .DIRTY). + if err := st.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + // 4. Simulate unclean shutdown: recreate .DIRTY so recovery runs on next open. + dirtyPath := filepath.Join(basePath, "sharky", ".DIRTY") + if err := os.WriteFile(dirtyPath, []byte{}, 0644); err != nil { + t.Fatalf("WriteFile .DIRTY: %v", err) + } + + // 5. Overwrite the bad chunk's slot with zeros so its hash will not validate. 
+ shardPath := filepath.Join(basePath, "sharky", fmt.Sprintf("shard_%03d", badLoc.Shard)) + f, err := os.OpenFile(shardPath, os.O_RDWR, 0666) + if err != nil { + t.Fatalf("OpenFile shard: %v", err) + } + if _, err := f.WriteAt(make([]byte, badLoc.Length), int64(badLoc.Slot)*int64(swarm.SocMaxChunkSize)); err != nil { + _ = f.Close() + t.Fatalf("WriteAt zeros: %v", err) + } + if err := f.Close(); err != nil { + t.Fatalf("Close shard: %v", err) + } + + // 6. Reopen the storer. The .DIRTY file triggers sharkyRecovery → + // validateAndAddLocations, which prunes the corrupted index entry. + st2, err := storer.New(ctx, basePath, opts) + if err != nil { + t.Fatalf("New (reopen): %v", err) + } + t.Cleanup(func() { _ = st2.Close() }) + + // 7. The valid chunk must still be retrievable after recovery. + got, err := st2.Storage().ChunkStore().Get(ctx, goodChunk.Address()) + if err != nil { + t.Fatalf("Get good chunk after recovery: %v", err) + } + if !got.Address().Equal(goodChunk.Address()) { + t.Fatalf("good chunk address mismatch after recovery") + } + + // 8. The corrupted chunk must have been pruned from the index. + _, err = st2.Storage().ChunkStore().Get(ctx, badChunk.Address()) + if !errors.Is(err, storage.ErrNotFound) { + t.Fatalf("expected ErrNotFound for corrupted chunk, got: %v", err) + } +}