-
Notifications
You must be signed in to change notification settings - Fork 879
implement CommitKVStore using a migration router #3382
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| package migration | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| ics23 "github.com/confio/ics23/go" | ||
| "github.com/sei-protocol/sei-chain/sei-db/proto" | ||
| "github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types" | ||
| db "github.com/tendermint/tm-db" | ||
| ) | ||
|
|
||
| // rootHashSize matches the digest length used by the other CommitKVStore | ||
| // implementations in this codebase: memiavl returns sha256 (32 B) and flatkv | ||
| // returns Blake3-256 (32 B). | ||
| const rootHashSize = 32 | ||
|
|
||
| var _ types.CommitKVStore = (*RouterCommitKVStore)(nil) | ||
|
|
||
| // RouterCommitKVStore adapts a [Router] (which is keyed by store name on every | ||
| // call) to the store-name-less [types.CommitKVStore] interface by binding the | ||
| // view to a single module store name. | ||
| // | ||
| // The CommitKVStore interface does not return errors. Any error returned by the | ||
| // underlying router is therefore surfaced as a panic. This is a short-term | ||
| // limitation; the long-term plan is to plumb errors through the interface. | ||
| type RouterCommitKVStore struct { | ||
| router Router | ||
| storeName string | ||
| versionProvider func() int64 | ||
| } | ||
|
|
||
| func NewRouterCommitKVStore( | ||
| router Router, | ||
| storeName string, | ||
| versionProvider func() int64, | ||
| ) *RouterCommitKVStore { | ||
| return &RouterCommitKVStore{ | ||
| router: router, | ||
| storeName: storeName, | ||
| versionProvider: versionProvider, | ||
| } | ||
| } | ||
|
|
||
| // Close is illegal during the standard CommitKVStore lifecycle for this type: | ||
| // the wrapped Router is owned by the caller and must outlive this view. | ||
| func (r *RouterCommitKVStore) Close() error { | ||
| return fmt.Errorf("RouterCommitKVStore.Close: illegal during standard lifecycle") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we panic instead of error here to match testing?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test was incorrect, and actually failing. Fixed the test. IMO, better to return error than to panic, especially when the interface already has an error return value. |
||
| } | ||
|
|
||
| func (r *RouterCommitKVStore) Get(key []byte) []byte { | ||
| value, _, err := r.router.Read(r.storeName, key) | ||
| if err != nil { | ||
| panic(fmt.Errorf("RouterCommitKVStore.Get(store=%q): %w", r.storeName, err)) | ||
| } | ||
| return value | ||
| } | ||
|
|
||
| func (r *RouterCommitKVStore) Has(key []byte) bool { | ||
| _, found, err := r.router.Read(r.storeName, key) | ||
| if err != nil { | ||
| panic(fmt.Errorf("RouterCommitKVStore.Has(store=%q): %w", r.storeName, err)) | ||
| } | ||
| return found | ||
| } | ||
|
|
||
| func (r *RouterCommitKVStore) Set(key []byte, value []byte) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how do we ensure this actually works instead of baseapp wraps runTx in recover() and turns the panic into a per-tx error, then keeps processing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should discuss this with execution folk. They should NEVER treat a DB error/panic as a transaction failure, as continuing after an error probably means data corruption and a later app hash break. |
||
| r.applyOne(&proto.KVPair{Key: key, Value: value}) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Set/Remove issues one ApplyChangeSets per call, In the prev PR, MigrationManager.ApplyChangeSets advances exactly one migration batch per call. A single cosmos-sdk transaction can issue dozens of Set calls; that becomes dozens of migration batches per commit
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can tell, there are no callers to The goal of I added the following documentation to telegraph our intentions: |
||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sei KVStore impls (storev2/commitment/Store.Set, cachekv.Set) only append to an in-memory changeset; the real write happens at Commit. how about RouterCommitKVStore buffers writes and flushes once at commit time (needs a Commit/Flush hook, mirroring storev2/commitment/Store).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this topic is probably worth a discussion. Currently FlatKV also only writes to an in-memory buffer. This
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add me to this discussion? |
||
|
|
||
| func (r *RouterCommitKVStore) Remove(key []byte) { | ||
| r.applyOne(&proto.KVPair{Key: key, Delete: true}) | ||
| } | ||
|
|
||
| // applyOne dispatches a single KV change as a one-pair NamedChangeSet through | ||
| // the router, panicking on any router error. | ||
| func (r *RouterCommitKVStore) applyOne(pair *proto.KVPair) { | ||
| cs := []*proto.NamedChangeSet{{ | ||
| Name: r.storeName, | ||
| Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{pair}}, | ||
| }} | ||
| if err := r.router.ApplyChangeSets(context.Background(), cs); err != nil { | ||
| panic(fmt.Errorf("RouterCommitKVStore.ApplyChangeSets(store=%q): %w", r.storeName, err)) | ||
| } | ||
| } | ||
|
|
||
| func (r *RouterCommitKVStore) Iterator(start []byte, end []byte, ascending bool) db.Iterator { | ||
| it, err := r.router.Iterator(r.storeName, start, end, ascending) | ||
| if err != nil { | ||
| panic(fmt.Errorf("RouterCommitKVStore.Iterator(store=%q): %w", r.storeName, err)) | ||
| } | ||
| return it | ||
| } | ||
|
|
||
| func (r *RouterCommitKVStore) GetProof(key []byte) *ics23.CommitmentProof { | ||
| proof, err := r.router.GetProof(r.storeName, key) | ||
| if err != nil { | ||
| panic(fmt.Errorf("RouterCommitKVStore.GetProof(store=%q): %w", r.storeName, err)) | ||
| } | ||
| return proof | ||
| } | ||
|
|
||
| // RootHash is a placeholder that returns a fresh zeroed 32-byte slice on every | ||
| // call. The CommitKVStore contract permits callers to mutate the returned | ||
| // slice, so a fresh allocation is required to keep the placeholder safe. | ||
| // | ||
| // TODO: revisit before shipping to production once the production usage of | ||
| // RootHash() across this code path is understood. | ||
| func (r *RouterCommitKVStore) RootHash() []byte { | ||
| return make([]byte, rootHashSize) | ||
| } | ||
|
|
||
| func (r *RouterCommitKVStore) Version() int64 { | ||
| return r.versionProvider() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,215 @@ | ||
| package migration | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "testing" | ||
|
|
||
| ics23 "github.com/confio/ics23/go" | ||
| "github.com/sei-protocol/sei-chain/sei-db/proto" | ||
| "github.com/stretchr/testify/require" | ||
| dbm "github.com/tendermint/tm-db" | ||
| ) | ||
|
|
||
| const testRouterStoreName = "bank" | ||
|
|
||
| // newRouterCommitKVStoreForTest returns a RouterCommitKVStore wrapping a fresh | ||
| // TestInMemoryRouter with a constant version. Tests that need a different | ||
| // inner router or a non-constant versionProvider construct the store | ||
| // directly. | ||
| func newRouterCommitKVStoreForTest(t *testing.T, version int64) (*RouterCommitKVStore, *TestInMemoryRouter) { | ||
| t.Helper() | ||
| inner := NewTestInMemoryRouter() | ||
| store := NewRouterCommitKVStore(inner, testRouterStoreName, func() int64 { return version }) | ||
| return store, inner | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_GetReturnsValueWrittenViaRouter(t *testing.T) { | ||
| store, inner := newRouterCommitKVStoreForTest(t, 0) | ||
| require.NoError(t, inner.ApplyChangeSets(context.Background(), []*proto.NamedChangeSet{{ | ||
| Name: testRouterStoreName, | ||
| Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ | ||
| {Key: []byte("k"), Value: []byte("v")}, | ||
| }}, | ||
| }})) | ||
|
|
||
| require.Equal(t, []byte("v"), store.Get([]byte("k"))) | ||
| require.True(t, store.Has([]byte("k"))) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_GetMissingKeyReturnsNil(t *testing.T) { | ||
| store, _ := newRouterCommitKVStoreForTest(t, 0) | ||
| require.Nil(t, store.Get([]byte("missing"))) | ||
| require.False(t, store.Has([]byte("missing"))) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_SetWritesViaRouter(t *testing.T) { | ||
| store, inner := newRouterCommitKVStoreForTest(t, 0) | ||
| store.Set([]byte("k"), []byte("v")) | ||
|
|
||
| val, found, err := inner.Read(testRouterStoreName, []byte("k")) | ||
| require.NoError(t, err) | ||
| require.True(t, found) | ||
| require.Equal(t, []byte("v"), val) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_RemoveDeletesViaRouter(t *testing.T) { | ||
| store, inner := newRouterCommitKVStoreForTest(t, 0) | ||
| store.Set([]byte("k"), []byte("v")) | ||
| store.Remove([]byte("k")) | ||
|
|
||
| val, found, err := inner.Read(testRouterStoreName, []byte("k")) | ||
| require.NoError(t, err) | ||
| require.False(t, found) | ||
| require.Nil(t, val) | ||
| } | ||
|
|
||
| // TestRouterCommitKVStore_BindsToSingleStoreName confirms that two wrappers | ||
| // pointed at the same router but bound to different module names see only | ||
| // their own data, and writes land under the configured store name in the | ||
| // underlying router. | ||
| func TestRouterCommitKVStore_BindsToSingleStoreName(t *testing.T) { | ||
| inner := NewTestInMemoryRouter() | ||
| bankStore := NewRouterCommitKVStore(inner, "bank", func() int64 { return 0 }) | ||
| evmStore := NewRouterCommitKVStore(inner, "evm", func() int64 { return 0 }) | ||
|
|
||
| bankStore.Set([]byte("k"), []byte("from-bank")) | ||
| evmStore.Set([]byte("k"), []byte("from-evm")) | ||
|
|
||
| require.Equal(t, []byte("from-bank"), bankStore.Get([]byte("k"))) | ||
| require.Equal(t, []byte("from-evm"), evmStore.Get([]byte("k"))) | ||
|
|
||
| val, found, err := inner.Read("bank", []byte("k")) | ||
| require.NoError(t, err) | ||
| require.True(t, found) | ||
| require.Equal(t, []byte("from-bank"), val) | ||
|
|
||
| val, found, err = inner.Read("evm", []byte("k")) | ||
| require.NoError(t, err) | ||
| require.True(t, found) | ||
| require.Equal(t, []byte("from-evm"), val) | ||
| } | ||
|
|
||
| // TestRouterCommitKVStore_VersionInvokesProviderEachCall confirms that the | ||
| // versionProvider lambda is consulted on every Version() call rather than | ||
| // captured once at construction time, so callers can swap the value at | ||
| // runtime. | ||
| func TestRouterCommitKVStore_VersionInvokesProviderEachCall(t *testing.T) { | ||
| current := int64(7) | ||
| store := NewRouterCommitKVStore(NewTestInMemoryRouter(), testRouterStoreName, func() int64 { return current }) | ||
|
|
||
| require.Equal(t, int64(7), store.Version()) | ||
| current = 42 | ||
| require.Equal(t, int64(42), store.Version()) | ||
| } | ||
|
|
||
| // TestRouterCommitKVStore_RootHashIs32ZeroBytes locks in the placeholder | ||
| // contract: 32 bytes, all zero, freshly allocated on every call so that a | ||
| // caller mutating the returned slice cannot corrupt subsequent reads. | ||
| func TestRouterCommitKVStore_RootHashIs32ZeroBytes(t *testing.T) { | ||
| store, _ := newRouterCommitKVStoreForTest(t, 0) | ||
| hash := store.RootHash() | ||
| require.Len(t, hash, 32) | ||
| require.Equal(t, make([]byte, 32), hash) | ||
|
|
||
| hash[0] = 0xFF | ||
| hash2 := store.RootHash() | ||
| require.Len(t, hash2, 32) | ||
| require.Equal(t, make([]byte, 32), hash2) | ||
| } | ||
|
|
||
| // TestRouterCommitKVStore_CloseReturnsError locks in that Close is illegal | ||
| // during the standard lifecycle: the wrapped Router is owned by the caller | ||
| // and must outlive this view, so Close surfaces a non-nil error rather than | ||
| // performing any teardown. | ||
| func TestRouterCommitKVStore_CloseReturnsError(t *testing.T) { | ||
| store, _ := newRouterCommitKVStoreForTest(t, 0) | ||
| err := store.Close() | ||
| require.EqualError(t, err, "RouterCommitKVStore.Close: illegal during standard lifecycle") | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_IteratorPanicsOnRouterError(t *testing.T) { | ||
| // TestInMemoryRouter.Iterator always returns an error; the wrapper must | ||
| // surface that as a panic. | ||
| store, _ := newRouterCommitKVStoreForTest(t, 0) | ||
| require.Panics(t, func() { _ = store.Iterator(nil, nil, true) }) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_GetProofPanicsOnRouterError(t *testing.T) { | ||
| // TestInMemoryRouter.GetProof always returns an error; the wrapper must | ||
| // surface that as a panic. | ||
| store, _ := newRouterCommitKVStoreForTest(t, 0) | ||
| require.Panics(t, func() { _ = store.GetProof([]byte("k")) }) | ||
| } | ||
|
|
||
| // failingRouter is a Router whose Read and ApplyChangeSets return injected | ||
| // sentinel errors. It exists so we can exercise the panic-on-error path for | ||
| // the methods that TestInMemoryRouter implements without errors. Iterator and | ||
| // GetProof always return a not-implemented error and are not used by these | ||
| // tests; TestInMemoryRouter already covers their panic paths. | ||
| type failingRouter struct { | ||
| readErr error | ||
| writeErr error | ||
| } | ||
|
|
||
| var _ Router = (*failingRouter)(nil) | ||
|
|
||
| func (f *failingRouter) Read(string, []byte) ([]byte, bool, error) { | ||
| return nil, false, f.readErr | ||
| } | ||
|
|
||
| func (f *failingRouter) ApplyChangeSets(context.Context, []*proto.NamedChangeSet) error { | ||
| return f.writeErr | ||
| } | ||
|
|
||
| func (f *failingRouter) Iterator(string, []byte, []byte, bool) (dbm.Iterator, error) { | ||
| return nil, errors.New("failingRouter.Iterator: not used by these tests") | ||
| } | ||
|
|
||
| func (f *failingRouter) GetProof(string, []byte) (*ics23.CommitmentProof, error) { | ||
| return nil, errors.New("failingRouter.GetProof: not used by these tests") | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_GetPanicsOnRouterError(t *testing.T) { | ||
| store := NewRouterCommitKVStore( | ||
| &failingRouter{readErr: errors.New("boom")}, | ||
| testRouterStoreName, | ||
| func() int64 { return 0 }, | ||
| ) | ||
| require.PanicsWithError(t, `RouterCommitKVStore.Get(store="bank"): boom`, func() { | ||
| _ = store.Get([]byte("k")) | ||
| }) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_HasPanicsOnRouterError(t *testing.T) { | ||
| store := NewRouterCommitKVStore( | ||
| &failingRouter{readErr: errors.New("boom")}, | ||
| testRouterStoreName, | ||
| func() int64 { return 0 }, | ||
| ) | ||
| require.PanicsWithError(t, `RouterCommitKVStore.Has(store="bank"): boom`, func() { | ||
| _ = store.Has([]byte("k")) | ||
| }) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_SetPanicsOnRouterError(t *testing.T) { | ||
| store := NewRouterCommitKVStore( | ||
| &failingRouter{writeErr: errors.New("boom")}, | ||
| testRouterStoreName, | ||
| func() int64 { return 0 }, | ||
| ) | ||
| require.PanicsWithError(t, `RouterCommitKVStore.ApplyChangeSets(store="bank"): boom`, func() { | ||
| store.Set([]byte("k"), []byte("v")) | ||
| }) | ||
| } | ||
|
|
||
| func TestRouterCommitKVStore_RemovePanicsOnRouterError(t *testing.T) { | ||
| store := NewRouterCommitKVStore( | ||
| &failingRouter{writeErr: errors.New("boom")}, | ||
| testRouterStoreName, | ||
| func() int64 { return 0 }, | ||
| ) | ||
| require.PanicsWithError(t, `RouterCommitKVStore.ApplyChangeSets(store="bank"): boom`, func() { | ||
| store.Remove([]byte("k")) | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
context.Background() is hardcoded, do we need any outer ctx for per-Set cancellation? not sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the main feature branch where I've got all of this code integrated, I actually removed the goroutines from the routers (race conditions + premature optimization 😅). Would you be ok with me addressing that issue in the feature branch instead of here? I can make the next follow up PR be the one that contains those changes.