diff --git a/sei-cosmos/storev2/rootmulti/flatkv_snapshot_test.go b/sei-cosmos/storev2/rootmulti/flatkv_snapshot_test.go
index 8b912f64f6..b0a134cd3d 100644
--- a/sei-cosmos/storev2/rootmulti/flatkv_snapshot_test.go
+++ b/sei-cosmos/storev2/rootmulti/flatkv_snapshot_test.go
@@ -9,6 +9,7 @@ import (
 	"testing"
 
 	protoio "github.com/gogo/protobuf/io"
+	snapshottypes "github.com/sei-protocol/sei-chain/sei-cosmos/snapshots/types"
 	"github.com/sei-protocol/sei-chain/sei-cosmos/store/types"
 	"github.com/sei-protocol/sei-chain/sei-db/common/keys"
 	seidbconfig "github.com/sei-protocol/sei-chain/sei-db/config"
@@ -349,6 +350,59 @@ func TestFlatKVOnlySnapshotRestorePopulatesSS(t *testing.T) {
 	queryEqual("/evm/key", evmData.nonKey, makeNonce(uint64(snapHeight)))
 }
 
+// TestFlatKVMalformedSnapshotReturnsErrorNotPanic verifies that a malformed
+// flatkv snapshot node — such as a malicious peer could send during state sync
+// — makes Restore return an error rather than panicking the node. Before the
+// fix, the SS import goroutine called panic() on any error from ssStore.Import,
+// which a peer could trigger via convertFlatKVNodes (e.g. a key with no module
+// prefix, or an undecodable value) and use to crash a syncing node.
+//
+// The crafted leaf uses a Version that does NOT match the restore height, so
+// the SC flatkv importer drops it (see KVImporter.AddNode) and the test
+// isolates the SS conversion path being fixed. The SS importer receives the
+// node regardless of version, and convertFlatKVNodes fails on StripModulePrefix.
+//
+// Note: pre-fix this could not be written as require.Panics — the panic fired
+// in a goroutine and would crash the test binary outright. Reaching the final
+// assertion at all is the regression signal; it also pins the error to SS import.
+func TestFlatKVMalformedSnapshotReturnsErrorNotPanic(t *testing.T) {
+	cfg := flatKVOnlyConfig()
+	ssCfg := seidbconfig.DefaultStateStoreConfig()
+	ssCfg.Enable = true
+	ssCfg.AsyncWriteBuffer = 0
+
+	dstStore, _ := newTestRootMultiWithSS(t, t.TempDir(), cfg, ssCfg)
+	defer func() { require.NoError(t, dstStore.Close()) }()
+
+	const restoreHeight = 1
+
+	// Hand-craft a snapshot stream: a "flatkv" store header followed by a leaf
+	// whose physical key has no module-prefix separator, so convertFlatKVNodes
+	// -> ktype.StripModulePrefix returns an error.
+	var buf bytes.Buffer
+	writer := protoio.NewDelimitedWriter(&buf)
+	require.NoError(t, writer.WriteMsg(&snapshottypes.SnapshotItem{
+		Item: &snapshottypes.SnapshotItem_Store{
+			Store: &snapshottypes.SnapshotStoreItem{Name: keys.FlatKVStoreKey},
+		},
+	}))
+	require.NoError(t, writer.WriteMsg(&snapshottypes.SnapshotItem{
+		Item: &snapshottypes.SnapshotItem_IAVL{
+			IAVL: &snapshottypes.SnapshotIAVLItem{
+				Key:     []byte("malformed-no-prefix"),
+				Value:   []byte("garbage"),
+				Version: restoreHeight + 1, // mismatch: SC drops it, SS still converts
+				Height:  0,
+			},
+		},
+	}))
+	require.NoError(t, writer.Close())
+
+	reader := protoio.NewDelimitedReader(bytes.NewReader(buf.Bytes()), 1<<30)
+	_, err := dstStore.Restore(restoreHeight, 1, reader)
+	require.ErrorContains(t, err, "ss import failed during restore", "malformed flatkv snapshot must return an error, not panic")
+}
+
 func simulateFlatKVOnlyBlock(
 	t *testing.T,
 	store *Store,
diff --git a/sei-cosmos/storev2/rootmulti/store.go b/sei-cosmos/storev2/rootmulti/store.go
index 25861a13a0..f6e578e005 100644
--- a/sei-cosmos/storev2/rootmulti/store.go
+++ b/sei-cosmos/storev2/rootmulti/store.go
@@ -16,6 +16,7 @@ import (
 	"github.com/sei-protocol/seilog"
 	"go.opentelemetry.io/otel/attribute"
 	otelmetric "go.opentelemetry.io/otel/metric"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/time/rate"
 
 	protoio "github.com/gogo/protobuf/io"
@@ -929,14 +930,17 @@ func (rs *Store) restore(height int64, protoReader protoio.Reader) 
(snapshottype
 	if err != nil {
 		return snapshottypes.SnapshotItem{}, err
 	}
+	// ssStore.Import is a streaming consumer of ssImporter: it runs for the whole
+	// loop below and only returns once we close the channel, so its result is
+	// collected after the loop via ssGroup.Wait() rather than panicked on.
+	// If Import returns an error, the group cancels ssCtx; the producer's send below
+	// selects on ssCtx.Done() so it cannot block on a consumer that stopped draining.
+	ssGroup, ssCtx := errgroup.WithContext(context.Background())
 	if rs.ssStore != nil {
 		ssImporter = make(chan seidbtypes.SnapshotNode, 10000)
-		go func() {
-			err := rs.ssStore.Import(height, ssImporter)
-			if err != nil {
-				panic(err)
-			}
-		}()
+		ssGroup.Go(func() error {
+			return rs.ssStore.Import(height, ssImporter)
+		})
 	}
 loop:
 	for {
@@ -981,10 +985,17 @@ loop:
 
 			// Check if we should also import to SS store
 			if rs.ssStore != nil && node.Height == 0 && ssImporter != nil {
-				ssImporter <- seidbtypes.SnapshotNode{
+				// Guard the send: if ssStore.Import errored (e.g. on a malformed
+				// snapshot) it may have stopped draining ssImporter, so select on
+				// ssCtx.Done() to avoid blocking forever once the buffer fills.
+				select {
+				case ssImporter <- seidbtypes.SnapshotNode{
 					StoreKey: storeKey,
 					Key:      node.Key,
 					Value:    node.Value,
+				}:
+				case <-ssCtx.Done():
+					// Import already failed: drop this node; the error surfaces after the loop via ssGroup.Wait().
 				}
 			}
 		default:
@@ -1001,6 +1012,9 @@ loop:
 	if ssImporter != nil {
 		close(ssImporter)
 	}
+	if err := ssGroup.Wait(); err != nil && restoreErr == nil {
+		restoreErr = fmt.Errorf("ss import failed during restore: %w", err)
+	}
 	// Initialize SS version metadata. Without SetLatestVersion, GetLatestVersion()
 	// stays 0 until the first post-sync block commits, which is misleading to any
 	// caller that reads it in that window.