From 4b2247b519bc3434b8bac04c25fe0bf8d79ad70b Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Fri, 13 Mar 2026 11:37:01 -0500 Subject: [PATCH 1/2] Helper files for the flatKV cache implementation --- Makefile | 7 + sei-db/db_engine/dbcache/cache.go | 47 +++ sei-db/db_engine/dbcache/cached_batch.go | 55 ++++ sei-db/db_engine/dbcache/cached_batch_test.go | 204 ++++++++++++ .../db_engine/dbcache/cached_key_value_db.go | 83 +++++ sei-db/db_engine/dbcache/lru_queue.go | 83 +++++ sei-db/db_engine/dbcache/lru_queue_test.go | 310 ++++++++++++++++++ sei-db/db_engine/dbcache/noop_cache.go | 58 ++++ sei-db/db_engine/dbcache/noop_cache_test.go | 152 +++++++++ sei-db/db_engine/dbcache/shard_manager.go | 46 +++ .../db_engine/dbcache/shard_manager_test.go | 271 +++++++++++++++ sei-db/db_engine/types/types.go | 13 + 12 files changed, 1329 insertions(+) create mode 100644 sei-db/db_engine/dbcache/cache.go create mode 100644 sei-db/db_engine/dbcache/cached_batch.go create mode 100644 sei-db/db_engine/dbcache/cached_batch_test.go create mode 100644 sei-db/db_engine/dbcache/cached_key_value_db.go create mode 100644 sei-db/db_engine/dbcache/lru_queue.go create mode 100644 sei-db/db_engine/dbcache/lru_queue_test.go create mode 100644 sei-db/db_engine/dbcache/noop_cache.go create mode 100644 sei-db/db_engine/dbcache/noop_cache_test.go create mode 100644 sei-db/db_engine/dbcache/shard_manager.go create mode 100644 sei-db/db_engine/dbcache/shard_manager_test.go diff --git a/Makefile b/Makefile index 83b26cd84b..de0e50f8e2 100644 --- a/Makefile +++ b/Makefile @@ -157,6 +157,13 @@ lint: go mod tidy go mod verify +# Run lint on the sei-db package. Much faster than running lint on the entire project.\ +# Makes life easier for storage team when iterating on changes inside the sei-db package. +dblint: + go run github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.8.0 run ./sei-db/... + go fmt ./sei-db/... + go vet ./sei-db/... 
+ build: go build $(BUILD_FLAGS) -o ./build/seid ./cmd/seid diff --git a/sei-db/db_engine/dbcache/cache.go b/sei-db/db_engine/dbcache/cache.go new file mode 100644 index 0000000000..fa4b292b85 --- /dev/null +++ b/sei-db/db_engine/dbcache/cache.go @@ -0,0 +1,47 @@ +package dbcache + +import ( + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +// Cache describes a cache capable of being used by a FlatKV store. +type Cache interface { + + // Get returns the value for the given key, or (nil, false) if not found. + Get( + // The entry to fetch. + key []byte, + // If true, the LRU queue will be updated. If false, the LRU queue will not be updated. + // Useful for when an operation is performed multiple times in close succession on the same key, + // since it requires non-zero overhead to do so with little benefit. + updateLru bool, + ) ([]byte, bool, error) + + // Perform a batch read operation. Given a map of keys to read, performs the reads and updates the + // map with the results. + // + // It is not thread safe to read or mutate the map while this method is running. + BatchGet(keys map[string]types.BatchGetResult) error + + // Set sets the value for the given key. + Set(key []byte, value []byte) + + // Delete deletes the value for the given key. + Delete(key []byte) + + // BatchSet applies the given updates to the cache. + BatchSet(updates []CacheUpdate) error +} + +// CacheUpdate describes a single key-value mutation to apply to the cache. +type CacheUpdate struct { + // The key to update. + Key []byte + // The value to set. If nil, the key will be deleted. + Value []byte +} + +// IsDelete returns true if the update is a delete operation. 
+func (u *CacheUpdate) IsDelete() bool { + return u.Value == nil +} diff --git a/sei-db/db_engine/dbcache/cached_batch.go b/sei-db/db_engine/dbcache/cached_batch.go new file mode 100644 index 0000000000..e4995fe33b --- /dev/null +++ b/sei-db/db_engine/dbcache/cached_batch.go @@ -0,0 +1,55 @@ +package dbcache + +import ( + "fmt" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +// cachedBatch wraps a types.Batch and applies pending mutations to the cache +// after a successful commit. +type cachedBatch struct { + inner types.Batch + cache Cache + pending []CacheUpdate +} + +var _ types.Batch = (*cachedBatch)(nil) + +func newCachedBatch(inner types.Batch, cache Cache) *cachedBatch { + return &cachedBatch{inner: inner, cache: cache} +} + +func (cb *cachedBatch) Set(key, value []byte) error { + cb.pending = append(cb.pending, CacheUpdate{Key: key, Value: value}) + return cb.inner.Set(key, value) +} + +func (cb *cachedBatch) Delete(key []byte) error { + cb.pending = append(cb.pending, CacheUpdate{Key: key, Value: nil}) + return cb.inner.Delete(key) +} + +func (cb *cachedBatch) Commit(opts types.WriteOptions) error { + if err := cb.inner.Commit(opts); err != nil { + return err + } + if err := cb.cache.BatchSet(cb.pending); err != nil { + return fmt.Errorf("failed to update cache after commit: %w", err) + } + cb.pending = nil + return nil +} + +func (cb *cachedBatch) Len() int { + return cb.inner.Len() +} + +func (cb *cachedBatch) Reset() { + cb.inner.Reset() + cb.pending = nil +} + +func (cb *cachedBatch) Close() error { + return cb.inner.Close() +} diff --git a/sei-db/db_engine/dbcache/cached_batch_test.go b/sei-db/db_engine/dbcache/cached_batch_test.go new file mode 100644 index 0000000000..5aeb533238 --- /dev/null +++ b/sei-db/db_engine/dbcache/cached_batch_test.go @@ -0,0 +1,204 @@ +package dbcache + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +// 
--------------------------------------------------------------------------- +// mock batch +// --------------------------------------------------------------------------- + +type mockBatch struct { + sets []CacheUpdate + deletes [][]byte + committed bool + closed bool + resetCount int + commitErr error +} + +func (m *mockBatch) Set(key, value []byte) error { + m.sets = append(m.sets, CacheUpdate{Key: key, Value: value}) + return nil +} + +func (m *mockBatch) Delete(key []byte) error { + m.deletes = append(m.deletes, key) + return nil +} + +func (m *mockBatch) Commit(opts types.WriteOptions) error { + if m.commitErr != nil { + return m.commitErr + } + m.committed = true + return nil +} + +func (m *mockBatch) Len() int { + return len(m.sets) + len(m.deletes) +} + +func (m *mockBatch) Reset() { + m.sets = nil + m.deletes = nil + m.committed = false + m.resetCount++ +} + +func (m *mockBatch) Close() error { + m.closed = true + return nil +} + +// --------------------------------------------------------------------------- +// mock cache +// --------------------------------------------------------------------------- + +type mockCache struct { + data map[string][]byte + batchSetErr error +} + +func newMockCache() *mockCache { + return &mockCache{data: make(map[string][]byte)} +} + +func (mc *mockCache) Get(key []byte, _ bool) ([]byte, bool, error) { + v, ok := mc.data[string(key)] + return v, ok, nil +} + +func (mc *mockCache) BatchGet(keys map[string]types.BatchGetResult) error { + for k := range keys { + v, ok := mc.data[k] + if ok { + keys[k] = types.BatchGetResult{Value: v} + } + } + return nil +} + +func (mc *mockCache) Set(key, value []byte) { + mc.data[string(key)] = value +} + +func (mc *mockCache) Delete(key []byte) { + delete(mc.data, string(key)) +} + +func (mc *mockCache) BatchSet(updates []CacheUpdate) error { + if mc.batchSetErr != nil { + return mc.batchSetErr + } + for _, u := range updates { + if u.IsDelete() { + delete(mc.data, string(u.Key)) + } else { 
+ mc.data[string(u.Key)] = u.Value + } + } + return nil +} + +// --------------------------------------------------------------------------- +// tests +// --------------------------------------------------------------------------- + +func TestCachedBatchCommitUpdatesCacheOnSuccess(t *testing.T) { + inner := &mockBatch{} + cache := newMockCache() + cb := newCachedBatch(inner, cache) + + require.NoError(t, cb.Set([]byte("a"), []byte("1"))) + require.NoError(t, cb.Set([]byte("b"), []byte("2"))) + require.NoError(t, cb.Commit(types.WriteOptions{})) + + require.True(t, inner.committed) + v, ok := cache.data["a"] + require.True(t, ok) + require.Equal(t, []byte("1"), v) + v, ok = cache.data["b"] + require.True(t, ok) + require.Equal(t, []byte("2"), v) +} + +func TestCachedBatchCommitDoesNotUpdateCacheOnInnerFailure(t *testing.T) { + inner := &mockBatch{commitErr: errors.New("disk full")} + cache := newMockCache() + cb := newCachedBatch(inner, cache) + + require.NoError(t, cb.Set([]byte("a"), []byte("1"))) + err := cb.Commit(types.WriteOptions{}) + + require.Error(t, err) + require.Contains(t, err.Error(), "disk full") + _, ok := cache.data["a"] + require.False(t, ok, "cache should not be updated when inner commit fails") +} + +func TestCachedBatchCommitReturnsCacheError(t *testing.T) { + inner := &mockBatch{} + cache := newMockCache() + cache.batchSetErr = errors.New("cache broken") + cb := newCachedBatch(inner, cache) + + require.NoError(t, cb.Set([]byte("a"), []byte("1"))) + err := cb.Commit(types.WriteOptions{}) + + require.Error(t, err) + require.Contains(t, err.Error(), "cache broken") + require.True(t, inner.committed, "inner batch should have committed") +} + +func TestCachedBatchDeleteMarksKeyForRemoval(t *testing.T) { + inner := &mockBatch{} + cache := newMockCache() + cache.Set([]byte("x"), []byte("old")) + cb := newCachedBatch(inner, cache) + + require.NoError(t, cb.Delete([]byte("x"))) + require.NoError(t, cb.Commit(types.WriteOptions{})) + + _, ok := 
cache.data["x"] + require.False(t, ok, "key should be deleted from cache") +} + +func TestCachedBatchResetClearsPending(t *testing.T) { + inner := &mockBatch{} + cache := newMockCache() + cb := newCachedBatch(inner, cache) + + require.NoError(t, cb.Set([]byte("a"), []byte("1"))) + require.NoError(t, cb.Set([]byte("b"), []byte("2"))) + cb.Reset() + + require.NoError(t, cb.Commit(types.WriteOptions{})) + + require.Empty(t, cache.data, "cache should have no entries after reset + commit") +} + +func TestCachedBatchLenDelegatesToInner(t *testing.T) { + inner := &mockBatch{} + cache := newMockCache() + cb := newCachedBatch(inner, cache) + + require.Equal(t, 0, cb.Len()) + require.NoError(t, cb.Set([]byte("a"), []byte("1"))) + require.NoError(t, cb.Delete([]byte("b"))) + require.Equal(t, 2, cb.Len()) +} + +func TestCachedBatchCloseDelegatesToInner(t *testing.T) { + inner := &mockBatch{} + cache := newMockCache() + cb := newCachedBatch(inner, cache) + + require.NoError(t, cb.Close()) + require.True(t, inner.closed) +} diff --git a/sei-db/db_engine/dbcache/cached_key_value_db.go b/sei-db/db_engine/dbcache/cached_key_value_db.go new file mode 100644 index 0000000000..0f926dff98 --- /dev/null +++ b/sei-db/db_engine/dbcache/cached_key_value_db.go @@ -0,0 +1,83 @@ +package dbcache + +import ( + "fmt" + + errorutils "github.com/sei-protocol/sei-chain/sei-db/common/errors" + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +var _ types.KeyValueDB = (*cachedKeyValueDB)(nil) +var _ types.Checkpointable = (*cachedKeyValueDB)(nil) + +// Combines a cache and a key-value database to create a new key-value database with caching. +type cachedKeyValueDB struct { + db types.KeyValueDB + cache Cache +} + +// Combine a cache and a key-value database to create a new key-value database with caching. 
+func NewCachedKeyValueDB(db types.KeyValueDB, cache Cache) types.KeyValueDB { + return &cachedKeyValueDB{db: db, cache: cache} +} + +func (c *cachedKeyValueDB) Get(key []byte) ([]byte, error) { + val, found, err := c.cache.Get(key, true) + if err != nil { + return nil, fmt.Errorf("failed to get value from cache: %w", err) + } + if !found { + return nil, errorutils.ErrNotFound + } + return val, nil +} + +func (c *cachedKeyValueDB) BatchGet(keys map[string]types.BatchGetResult) error { + err := c.cache.BatchGet(keys) + if err != nil { + return fmt.Errorf("failed to get values from cache: %w", err) + } + return nil +} + +func (c *cachedKeyValueDB) Set(key []byte, value []byte, opts types.WriteOptions) error { + err := c.db.Set(key, value, opts) + if err != nil { + return fmt.Errorf("failed to set value in database: %w", err) + } + c.cache.Set(key, value) + return nil +} + +func (c *cachedKeyValueDB) Delete(key []byte, opts types.WriteOptions) error { + err := c.db.Delete(key, opts) + if err != nil { + return fmt.Errorf("failed to delete value in database: %w", err) + } + c.cache.Delete(key) + return nil +} + +func (c *cachedKeyValueDB) NewIter(opts *types.IterOptions) (types.KeyValueDBIterator, error) { + return c.db.NewIter(opts) +} + +func (c *cachedKeyValueDB) NewBatch() types.Batch { + return newCachedBatch(c.db.NewBatch(), c.cache) +} + +func (c *cachedKeyValueDB) Flush() error { + return c.db.Flush() +} + +func (c *cachedKeyValueDB) Close() error { + return c.db.Close() +} + +func (c *cachedKeyValueDB) Checkpoint(destDir string) error { + cp, ok := c.db.(types.Checkpointable) + if !ok { + return fmt.Errorf("underlying database does not support Checkpoint") + } + return cp.Checkpoint(destDir) +} diff --git a/sei-db/db_engine/dbcache/lru_queue.go b/sei-db/db_engine/dbcache/lru_queue.go new file mode 100644 index 0000000000..6870679c9d --- /dev/null +++ b/sei-db/db_engine/dbcache/lru_queue.go @@ -0,0 +1,83 @@ +package dbcache + +import "container/list" + +// 
Implements a queue-like abstraction with LRU semantics. Not thread safe. +type lruQueue struct { + order *list.List + entries map[string]*list.Element + totalSize uint64 +} + +type lruQueueEntry struct { + key string + size uint64 +} + +// Create a new LRU queue. +func newLRUQueue() *lruQueue { + return &lruQueue{ + order: list.New(), + entries: make(map[string]*list.Element), + } +} + +// Add a new entry to the LRU queue. Can also be used to update an existing value with a new weight. +func (lru *lruQueue) Push( + // the key in the cache that was recently interacted with + key []byte, + // the size of the key + value + size uint64, +) { + if elem, ok := lru.entries[string(key)]; ok { + entry := elem.Value.(*lruQueueEntry) + lru.totalSize += size - entry.size + entry.size = size + lru.order.MoveToBack(elem) + return + } + + keyStr := string(key) + elem := lru.order.PushBack(&lruQueueEntry{ + key: keyStr, + size: size, + }) + lru.entries[keyStr] = elem + lru.totalSize += size +} + +// Signal that an entry has been interacted with, moving it to the back of the queue +// (i.e. making it so it doesn't get popped soon). +func (lru *lruQueue) Touch(key []byte) { + elem, ok := lru.entries[string(key)] + if !ok { + return + } + lru.order.MoveToBack(elem) +} + +// Returns the total size of all entries in the LRU queue. +func (lru *lruQueue) GetTotalSize() uint64 { + return lru.totalSize +} + +// Returns a count of the number of entries in the LRU queue, where each entry counts for 1 regardless of size. +func (lru *lruQueue) GetCount() uint64 { + return uint64(len(lru.entries)) +} + +// Pops a single element out of the queue. The element removed is the entry least recently passed to Push() or Touch(). +// Returns the key in string form to avoid copying the key an additional time. +// Panics if the queue is empty. 
+func (lru *lruQueue) PopLeastRecentlyUsed() string { + elem := lru.order.Front() + if elem == nil { + panic("cannot pop from empty LRU queue") + } + + lru.order.Remove(elem) + entry := elem.Value.(*lruQueueEntry) + delete(lru.entries, entry.key) + lru.totalSize -= entry.size + return entry.key +} diff --git a/sei-db/db_engine/dbcache/lru_queue_test.go b/sei-db/db_engine/dbcache/lru_queue_test.go new file mode 100644 index 0000000000..0073e6d1f0 --- /dev/null +++ b/sei-db/db_engine/dbcache/lru_queue_test.go @@ -0,0 +1,310 @@ +package dbcache + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLRUQueueIsolatesFromCallerMutation(t *testing.T) { + lru := newLRUQueue() + + key := []byte("a") + lru.Push(key, 1) + key[0] = 'z' + + require.Equal(t, "a", lru.PopLeastRecentlyUsed()) +} + +func TestNewLRUQueueStartsEmpty(t *testing.T) { + lru := newLRUQueue() + + require.Equal(t, uint64(0), lru.GetCount()) + require.Equal(t, uint64(0), lru.GetTotalSize()) +} + +func TestPopLeastRecentlyUsedPanicsOnEmptyQueue(t *testing.T) { + lru := newLRUQueue() + require.Panics(t, func() { lru.PopLeastRecentlyUsed() }) +} + +func TestPopLeastRecentlyUsedPanicsAfterDrain(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("x"), 1) + lru.PopLeastRecentlyUsed() + + require.Panics(t, func() { lru.PopLeastRecentlyUsed() }) +} + +func TestPushSingleElement(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("only"), 42) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, uint64(42), lru.GetTotalSize()) + require.Equal(t, "only", lru.PopLeastRecentlyUsed()) +} + +func TestPushDuplicateDecreasesSize(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("k"), 100) + lru.Push([]byte("k"), 30) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, uint64(30), lru.GetTotalSize()) +} + +func TestPushDuplicateMovesToBack(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 1) + lru.Push([]byte("b"), 1) + 
lru.Push([]byte("c"), 1) + + // Re-push "a" — should move it behind "b" and "c" + lru.Push([]byte("a"), 1) + + require.Equal(t, "b", lru.PopLeastRecentlyUsed()) + require.Equal(t, "c", lru.PopLeastRecentlyUsed()) + require.Equal(t, "a", lru.PopLeastRecentlyUsed()) +} + +func TestPushZeroSize(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("z"), 0) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, uint64(0), lru.GetTotalSize()) + require.Equal(t, "z", lru.PopLeastRecentlyUsed()) + require.Equal(t, uint64(0), lru.GetTotalSize()) +} + +func TestPushEmptyKey(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte(""), 5) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, "", lru.PopLeastRecentlyUsed()) +} + +func TestPushRepeatedUpdatesToSameKey(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("k"), 1) + lru.Push([]byte("k"), 2) + lru.Push([]byte("k"), 3) + lru.Push([]byte("k"), 4) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, uint64(4), lru.GetTotalSize()) +} + +func TestTouchNonexistentKeyIsNoop(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 1) + + lru.Touch([]byte("missing")) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, "a", lru.PopLeastRecentlyUsed()) +} + +func TestTouchOnEmptyQueueIsNoop(t *testing.T) { + lru := newLRUQueue() + lru.Touch([]byte("ghost")) + + require.Equal(t, uint64(0), lru.GetCount()) +} + +func TestTouchSingleElement(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("solo"), 10) + lru.Touch([]byte("solo")) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, "solo", lru.PopLeastRecentlyUsed()) +} + +func TestTouchDoesNotAffectSizeOrCount(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 3) + lru.Push([]byte("b"), 7) + + lru.Touch([]byte("a")) + + require.Equal(t, uint64(2), lru.GetCount()) + require.Equal(t, uint64(10), lru.GetTotalSize()) +} + +func TestMultipleTouchesChangeOrder(t *testing.T) { + lru 
:= newLRUQueue() + lru.Push([]byte("a"), 1) + lru.Push([]byte("b"), 1) + lru.Push([]byte("c"), 1) + + // Order: a, b, c + lru.Touch([]byte("a")) // Order: b, c, a + lru.Touch([]byte("b")) // Order: c, a, b + + require.Equal(t, "c", lru.PopLeastRecentlyUsed()) + require.Equal(t, "a", lru.PopLeastRecentlyUsed()) + require.Equal(t, "b", lru.PopLeastRecentlyUsed()) +} + +func TestTouchAlreadyMostRecentIsNoop(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 1) + lru.Push([]byte("b"), 1) + + lru.Touch([]byte("b")) // "b" is already at back + + require.Equal(t, "a", lru.PopLeastRecentlyUsed()) + require.Equal(t, "b", lru.PopLeastRecentlyUsed()) +} + +func TestPopDecrementsCountAndSize(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 10) + lru.Push([]byte("b"), 20) + lru.Push([]byte("c"), 30) + + lru.PopLeastRecentlyUsed() + + require.Equal(t, uint64(2), lru.GetCount()) + require.Equal(t, uint64(50), lru.GetTotalSize()) + + lru.PopLeastRecentlyUsed() + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, uint64(30), lru.GetTotalSize()) +} + +func TestPopFIFOOrderWithoutTouches(t *testing.T) { + lru := newLRUQueue() + keys := []string{"first", "second", "third", "fourth"} + for _, k := range keys { + lru.Push([]byte(k), 1) + } + + for _, want := range keys { + require.Equal(t, want, lru.PopLeastRecentlyUsed()) + } +} + +func TestPushAfterDrain(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 5) + lru.PopLeastRecentlyUsed() + + lru.Push([]byte("x"), 10) + lru.Push([]byte("y"), 20) + + require.Equal(t, uint64(2), lru.GetCount()) + require.Equal(t, uint64(30), lru.GetTotalSize()) + require.Equal(t, "x", lru.PopLeastRecentlyUsed()) +} + +func TestPushPreviouslyPoppedKey(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("recycled"), 5) + lru.PopLeastRecentlyUsed() + + lru.Push([]byte("recycled"), 99) + + require.Equal(t, uint64(1), lru.GetCount()) + require.Equal(t, uint64(99), lru.GetTotalSize()) + require.Equal(t, 
"recycled", lru.PopLeastRecentlyUsed()) +} + +func TestInterleavedPushAndPop(t *testing.T) { + lru := newLRUQueue() + + lru.Push([]byte("a"), 1) + lru.Push([]byte("b"), 2) + + require.Equal(t, "a", lru.PopLeastRecentlyUsed()) + + lru.Push([]byte("c"), 3) + + require.Equal(t, uint64(2), lru.GetCount()) + require.Equal(t, uint64(5), lru.GetTotalSize()) + require.Equal(t, "b", lru.PopLeastRecentlyUsed()) + require.Equal(t, "c", lru.PopLeastRecentlyUsed()) +} + +func TestTouchThenPushSameKey(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 1) + lru.Push([]byte("b"), 1) + + lru.Touch([]byte("a")) // order: b, a + lru.Push([]byte("a"), 50) // updates size, stays at back + + require.Equal(t, uint64(2), lru.GetCount()) + require.Equal(t, uint64(51), lru.GetTotalSize()) + require.Equal(t, "b", lru.PopLeastRecentlyUsed()) +} + +func TestBinaryKeyData(t *testing.T) { + lru := newLRUQueue() + k1 := []byte{0x00, 0xFF, 0x01} + k2 := []byte{0x00, 0xFF, 0x02} + + lru.Push(k1, 10) + lru.Push(k2, 20) + + require.Equal(t, uint64(2), lru.GetCount()) + require.Equal(t, string(k1), lru.PopLeastRecentlyUsed()) + + lru.Touch(k2) + require.Equal(t, string(k2), lru.PopLeastRecentlyUsed()) +} + +func TestCallerMutationAfterTouchDoesNotAffectQueue(t *testing.T) { + lru := newLRUQueue() + key := []byte("abc") + lru.Push(key, 1) + + key[0] = 'Z' + lru.Touch(key) // Touch with mutated key ("Zbc") — should be a no-op + + require.Equal(t, "abc", lru.PopLeastRecentlyUsed()) +} + +func TestManyEntries(t *testing.T) { + lru := newLRUQueue() + n := 1000 + var totalSize uint64 + + for i := 0; i < n; i++ { + k := fmt.Sprintf("key-%04d", i) + lru.Push([]byte(k), uint64(i+1)) + totalSize += uint64(i + 1) + } + + require.Equal(t, uint64(n), lru.GetCount()) + require.Equal(t, totalSize, lru.GetTotalSize()) + + for i := 0; i < n; i++ { + want := fmt.Sprintf("key-%04d", i) + require.Equal(t, want, lru.PopLeastRecentlyUsed(), "pop %d", i) + } + + require.Equal(t, uint64(0), lru.GetCount()) + 
require.Equal(t, uint64(0), lru.GetTotalSize()) +} + +func TestPushUpdatedSizeThenPopVerifySizeAccounting(t *testing.T) { + lru := newLRUQueue() + lru.Push([]byte("a"), 10) + lru.Push([]byte("b"), 20) + lru.Push([]byte("a"), 5) // decrease a's size from 10 to 5 + + require.Equal(t, uint64(25), lru.GetTotalSize()) + + // Pop "b" (it's the LRU since "a" was re-pushed to back). + lru.PopLeastRecentlyUsed() + require.Equal(t, uint64(5), lru.GetTotalSize()) + + lru.PopLeastRecentlyUsed() + require.Equal(t, uint64(0), lru.GetTotalSize()) +} diff --git a/sei-db/db_engine/dbcache/noop_cache.go b/sei-db/db_engine/dbcache/noop_cache.go new file mode 100644 index 0000000000..1e40e02879 --- /dev/null +++ b/sei-db/db_engine/dbcache/noop_cache.go @@ -0,0 +1,58 @@ +package dbcache + +import ( + "fmt" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +var _ Cache = (*noOpCache)(nil) + +// noOpCache is a Cache that performs no caching. Every Get falls through +// to the underlying readFunc. Set, Delete, and BatchSet are no-ops. +// Useful for testing the storage layer without cache interference, or for +// workloads where caching is not beneficial. +type noOpCache struct { + readFunc func(key []byte) ([]byte, bool, error) +} + +// NewNoOpCache creates a Cache that always reads from readFunc and never caches. 
+func NewNoOpCache(readFunc func(key []byte) ([]byte, bool, error)) Cache { + return &noOpCache{readFunc: readFunc} +} + +func (c *noOpCache) Get(key []byte, _ bool) ([]byte, bool, error) { + return c.readFunc(key) +} + +func (c *noOpCache) BatchGet(keys map[string]types.BatchGetResult) error { + var firstErr error + for k := range keys { + val, _, err := c.readFunc([]byte(k)) + if err != nil { + keys[k] = types.BatchGetResult{Error: err} + if firstErr == nil { + firstErr = err + } + } else { + keys[k] = types.BatchGetResult{Value: val} + } + } + if firstErr != nil { + return fmt.Errorf("unable to batch get: %w", firstErr) + } + return nil +} + +func (c *noOpCache) Set([]byte, []byte) { + // intentional no-op +} + +func (c *noOpCache) Delete([]byte) { + // intentional no-op +} + +func (c *noOpCache) BatchSet([]CacheUpdate) error { + // intentional no-op + return nil +} diff --git a/sei-db/db_engine/dbcache/noop_cache_test.go b/sei-db/db_engine/dbcache/noop_cache_test.go new file mode 100644 index 0000000000..2fd7bb2790 --- /dev/null +++ b/sei-db/db_engine/dbcache/noop_cache_test.go @@ -0,0 +1,152 @@ +package dbcache + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" +) + +func newNoOpTestCache(store map[string][]byte) Cache { + return NewNoOpCache(func(key []byte) ([]byte, bool, error) { + v, ok := store[string(key)] + if !ok { + return nil, false, nil + } + return v, true, nil + }) +} + +func TestNoOpGetFound(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{"k": []byte("v")}) + + val, found, err := c.Get([]byte("k"), true) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, "v", string(val)) +} + +func TestNoOpGetNotFound(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{}) + + val, found, err := c.Get([]byte("missing"), true) + require.NoError(t, err) + require.False(t, found) + require.Nil(t, val) +} + +func TestNoOpGetError(t *testing.T) { 
+ dbErr := errors.New("broken") + c := NewNoOpCache(func(key []byte) ([]byte, bool, error) { + return nil, false, dbErr + }) + + _, _, err := c.Get([]byte("k"), true) + require.ErrorIs(t, err, dbErr) +} + +func TestNoOpGetIgnoresUpdateLru(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{"k": []byte("v")}) + + val1, _, _ := c.Get([]byte("k"), true) + val2, _, _ := c.Get([]byte("k"), false) + require.Equal(t, string(val1), string(val2)) +} + +func TestNoOpGetAlwaysReadsFromFunc(t *testing.T) { + store := map[string][]byte{"k": []byte("v1")} + c := newNoOpTestCache(store) + + val, _, _ := c.Get([]byte("k"), true) + require.Equal(t, "v1", string(val)) + + store["k"] = []byte("v2") + + val, _, _ = c.Get([]byte("k"), true) + require.Equal(t, "v2", string(val), "should re-read from func, not cache") +} + +func TestNoOpSetIsNoOp(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{}) + + c.Set([]byte("k"), []byte("v")) + + _, found, err := c.Get([]byte("k"), true) + require.NoError(t, err) + require.False(t, found, "Set should not cache anything") +} + +func TestNoOpDeleteIsNoOp(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{"k": []byte("v")}) + + c.Delete([]byte("k")) + + val, found, err := c.Get([]byte("k"), true) + require.NoError(t, err) + require.True(t, found, "Delete should not affect reads") + require.Equal(t, "v", string(val)) +} + +func TestNoOpBatchSetIsNoOp(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{}) + + err := c.BatchSet([]CacheUpdate{ + {Key: []byte("a"), Value: []byte("1")}, + {Key: []byte("b"), Value: []byte("2")}, + }) + require.NoError(t, err) + + _, found, _ := c.Get([]byte("a"), true) + require.False(t, found) + _, found, _ = c.Get([]byte("b"), true) + require.False(t, found) +} + +func TestNoOpBatchSetEmptyAndNil(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{}) + + require.NoError(t, c.BatchSet(nil)) + require.NoError(t, c.BatchSet([]CacheUpdate{})) +} + +func TestNoOpBatchGetAllFound(t *testing.T) 
{ + c := newNoOpTestCache(map[string][]byte{"a": []byte("1"), "b": []byte("2")}) + + keys := map[string]types.BatchGetResult{"a": {}, "b": {}} + require.NoError(t, c.BatchGet(keys)) + + require.True(t, keys["a"].IsFound()) + require.Equal(t, "1", string(keys["a"].Value)) + require.True(t, keys["b"].IsFound()) + require.Equal(t, "2", string(keys["b"].Value)) +} + +func TestNoOpBatchGetNotFound(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{}) + + keys := map[string]types.BatchGetResult{"x": {}} + require.NoError(t, c.BatchGet(keys)) + require.False(t, keys["x"].IsFound()) +} + +func TestNoOpBatchGetError(t *testing.T) { + dbErr := errors.New("fail") + c := NewNoOpCache(func(key []byte) ([]byte, bool, error) { + return nil, false, dbErr + }) + + keys := map[string]types.BatchGetResult{"k": {}} + err := c.BatchGet(keys) + require.Error(t, err) + require.ErrorIs(t, err, dbErr) + require.Error(t, keys["k"].Error) +} + +func TestNoOpBatchGetEmpty(t *testing.T) { + c := newNoOpTestCache(map[string][]byte{}) + + keys := map[string]types.BatchGetResult{} + require.NoError(t, c.BatchGet(keys)) +} diff --git a/sei-db/db_engine/dbcache/shard_manager.go b/sei-db/db_engine/dbcache/shard_manager.go new file mode 100644 index 0000000000..bfc837845c --- /dev/null +++ b/sei-db/db_engine/dbcache/shard_manager.go @@ -0,0 +1,46 @@ +package dbcache + +import ( + "errors" + "hash/maphash" + "sync" +) + +var ErrNumShardsNotPowerOfTwo = errors.New("numShards must be a power of two and > 0") + +// A utility for assigning keys to shard indices. +type shardManager struct { + // A random seed that makes it hard for an attacker to predict the shard index and to skew the distribution. + seed maphash.Seed + // Used to perform a quick modulo operation to get the shard index (since numShards is a power of two) + mask uint64 + // reusable Hash objects to avoid allocs + pool sync.Pool +} + +// Creates a new Sharder. Number of shards must be a power of two and greater than 0. 
+func newShardManager(numShards uint64) (*shardManager, error) {
+	if numShards == 0 || (numShards&(numShards-1)) != 0 {
+		return nil, ErrNumShardsNotPowerOfTwo
+	}
+
+	return &shardManager{
+		seed: maphash.MakeSeed(), // secret seed, randomized per manager; keeps shard placement unpredictable
+		mask: numShards - 1,
+		pool: sync.Pool{
+			New: func() any { return new(maphash.Hash) },
+		},
+	}, nil
+}
+
+// Shard returns a shard index in [0, numShards).
+// addr should be the raw address bytes (e.g., 20-byte ETH address).
+func (s *shardManager) Shard(addr []byte) uint64 {
+	h := s.pool.Get().(*maphash.Hash)
+	h.SetSeed(s.seed)
+	_, _ = h.Write(addr)
+	x := h.Sum64()
+	s.pool.Put(h)
+
+	return x & s.mask
+}
diff --git a/sei-db/db_engine/dbcache/shard_manager_test.go b/sei-db/db_engine/dbcache/shard_manager_test.go
new file mode 100644
index 0000000000..07aa2041a2
--- /dev/null
+++ b/sei-db/db_engine/dbcache/shard_manager_test.go
@@ -0,0 +1,271 @@
+package dbcache
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// --- NewShardManager ---
+
+func TestNewShardManagerValidPowersOfTwo(t *testing.T) {
+	for exp := 0; exp < 20; exp++ {
+		n := uint64(1) << exp
+		sm, err := newShardManager(n)
+		require.NoError(t, err, "numShards=%d", n)
+		require.NotNil(t, sm, "numShards=%d", n)
+	}
+}
+
+func TestNewShardManagerZeroReturnsError(t *testing.T) {
+	sm, err := newShardManager(0)
+	require.ErrorIs(t, err, ErrNumShardsNotPowerOfTwo)
+	require.Nil(t, sm)
+}
+
+func TestNewShardManagerNonPowersOfTwoReturnError(t *testing.T) {
+	bad := []uint64{3, 5, 6, 7, 9, 10, 12, 15, 17, 100, 255, 1023}
+	for _, n := range bad {
+		sm, err := newShardManager(n)
+		require.ErrorIs(t, err, ErrNumShardsNotPowerOfTwo, "numShards=%d", n)
+		require.Nil(t, sm, "numShards=%d", n)
+	}
+}
+
+func TestNewShardManagerMaxUint64ReturnsError(t *testing.T) {
+	sm, err := newShardManager(math.MaxUint64)
+	require.ErrorIs(t, err, ErrNumShardsNotPowerOfTwo)
+	require.Nil(t, sm)
+}
+
+func TestNewShardManagerLargePowerOfTwo(t *testing.T) {
+	n := uint64(1) << 40
+	sm, err := newShardManager(n)
+	require.NoError(t, err)
+	require.NotNil(t, sm)
+}
+
+// --- Shard: basic behaviour ---
+
+func TestShardReturnsBoundedIndex(t *testing.T) {
+	for _, numShards := range []uint64{1, 2, 4, 16, 256, 1024} {
+		sm, err := newShardManager(numShards)
+		require.NoError(t, err)
+
+		for i := 0; i < 500; i++ {
+			key := []byte(fmt.Sprintf("key-%d", i))
+			idx := sm.Shard(key)
+			require.Less(t, idx, numShards, "numShards=%d key=%s", numShards, key)
+		}
+	}
+}
+
+func TestShardDeterministic(t *testing.T) {
+	sm, err := newShardManager(16)
+	require.NoError(t, err)
+
+	key := []byte("deterministic-test-key")
+	first := sm.Shard(key)
+	for i := 0; i < 100; i++ {
+		require.Equal(t, first, sm.Shard(key))
+	}
+}
+
+func TestShardSingleShardAlwaysReturnsZero(t *testing.T) {
+	sm, err := newShardManager(1)
+	require.NoError(t, err)
+
+	keys := [][]byte{
+		{},
+		{0x00},
+		{0xFF},
+		[]byte("anything"),
+		[]byte("another key entirely"),
+	}
+	for _, k := range keys {
+		require.Equal(t, uint64(0), sm.Shard(k), "key=%q", k)
+	}
+}
+
+func TestShardEmptyKey(t *testing.T) {
+	sm, err := newShardManager(8)
+	require.NoError(t, err)
+
+	idx := sm.Shard([]byte{})
+	require.Less(t, idx, uint64(8))
+
+	// Deterministic: repeated lookups of the same (empty) key yield the same shard.
+	require.Equal(t, idx, sm.Shard([]byte{}))
+}
+
+func TestShardNilKey(t *testing.T) {
+	sm, err := newShardManager(4)
+	require.NoError(t, err)
+
+	idx := sm.Shard(nil)
+	require.Less(t, idx, uint64(4))
+	require.Equal(t, idx, sm.Shard(nil))
+}
+
+func TestShardBinaryKeys(t *testing.T) {
+	sm, err := newShardManager(16)
+	require.NoError(t, err)
+
+	k1 := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
+	k2 := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}
+
+	idx1 := sm.Shard(k1)
+	idx2 := sm.Shard(k2)
+	require.Less(t, idx1, uint64(16))
+	require.Less(t, idx2, uint64(16))
+}
+
+func TestShardCallerMutationDoesNotAffectFutureResults(t *testing.T) {
+	sm, err := newShardManager(16)
+	require.NoError(t, err)
+
+	key := []byte("mutable")
+	first := sm.Shard(key)
+
+	key[0] = 'X'
+	second := sm.Shard([]byte("mutable"))
+	require.Equal(t, first, second)
+}
+
+// --- Distribution ---
+
+func TestShardDistribution(t *testing.T) {
+	const numShards = 16
+	const numKeys = 10_000
+	sm, err := newShardManager(numShards)
+	require.NoError(t, err)
+
+	counts := make([]int, numShards)
+	for i := 0; i < numKeys; i++ {
+		key := []byte(fmt.Sprintf("addr-%06d", i))
+		counts[sm.Shard(key)]++
+	}
+
+	expected := float64(numKeys) / float64(numShards)
+	for shard, count := range counts {
+		ratio := float64(count) / expected
+		require.Greater(t, ratio, 0.5, "shard %d is severely underrepresented (%d)", shard, count)
+		require.Less(t, ratio, 1.5, "shard %d is severely overrepresented (%d)", shard, count)
+	}
+}
+
+// --- Distinct managers ---
+
+func TestDifferentManagersHaveDifferentSeeds(t *testing.T) {
+	sm1, err := newShardManager(256)
+	require.NoError(t, err)
+	sm2, err := newShardManager(256)
+	require.NoError(t, err)
+
+	// With distinct random seeds, at least some keys should hash differently.
+	diffCount := 0
+	for i := 0; i < 200; i++ {
+		key := []byte(fmt.Sprintf("seed-test-%d", i))
+		if sm1.Shard(key) != sm2.Shard(key) {
+			diffCount++
+		}
+	}
+	require.Greater(t, diffCount, 0, "two managers with independent seeds should differ on at least one key")
+}
+
+// --- Concurrency ---
+
+func TestShardConcurrentAccess(t *testing.T) {
+	sm, err := newShardManager(64)
+	require.NoError(t, err)
+
+	const goroutines = 32
+	const iters = 1000
+
+	key := []byte("concurrent-key")
+	expected := sm.Shard(key)
+
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for g := 0; g < goroutines; g++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < iters; i++ {
+				got := sm.Shard(key)
+				if got != expected {
+					t.Errorf("concurrent Shard returned %d, want %d", got, expected)
+					return
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func TestShardConcurrentDifferentKeys(t *testing.T) {
+	sm, err := newShardManager(32)
+	require.NoError(t, err)
+
+	const goroutines = 16
+	const keysPerGoroutine = 500
+
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for g := 0; g < goroutines; g++ {
+		g := g
+		go func() {
+			defer wg.Done()
+			for i := 0; i < keysPerGoroutine; i++ {
+				key := []byte(fmt.Sprintf("g%d-k%d", g, i))
+				idx := sm.Shard(key)
+				if idx >= 32 {
+					t.Errorf("Shard(%q) = %d, want < 32", key, idx)
+					return
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+// --- Mask correctness ---
+
+func TestShardMaskMatchesNumShards(t *testing.T) {
+	for exp := 0; exp < 16; exp++ {
+		numShards := uint64(1) << exp
+		sm, err := newShardManager(numShards)
+		require.NoError(t, err)
+		require.Equal(t, numShards-1, sm.mask, "numShards=%d", numShards)
+	}
+}
+
+// --- 20-byte ETH-style addresses ---
+
+func TestShardWith20ByteAddresses(t *testing.T) {
+	sm, err := newShardManager(16)
+	require.NoError(t, err)
+
+	addr := make([]byte, 20)
+	for i := 0; i < 20; i++ {
+		addr[i] = byte(i + 1)
+	}
+
+	idx := sm.Shard(addr)
+	require.Less(t, idx, uint64(16))
+	require.Equal(t, idx, sm.Shard(addr))
+}
+
+func TestShardSingleByteKey(t *testing.T) {
+	sm, err := newShardManager(4)
+	require.NoError(t, err)
+
+	for b := 0; b < 256; b++ {
+		idx := sm.Shard([]byte{byte(b)})
+		require.Less(t, idx, uint64(4), "byte=%d", b)
+	}
+}
diff --git a/sei-db/db_engine/types/types.go b/sei-db/db_engine/types/types.go
index 0f82ac85a2..446ed39a65 100644
--- a/sei-db/db_engine/types/types.go
+++ b/sei-db/db_engine/types/types.go
@@ -20,6 +20,19 @@
 	UpperBound []byte
 }
 
+// BatchGetResult describes the result of a single key lookup within a BatchGet call.
+type BatchGetResult struct {
+	// The value for the given key. If nil, the key was not found (but no error occurred). NOTE(review): IsFound treats nil as not-found, so a key that exists with an empty value must be stored as a non-nil empty slice — confirm callers uphold this.
+	Value []byte
+	// The error, if any, that occurred during the read.
+	Error error
+}
+
+// IsFound returns true if the key was found (i.e. Value is not nil).
+func (b BatchGetResult) IsFound() bool {
+	return b.Value != nil
+}
+
 // OpenOptions configures opening a DB.
 //
 // NOTE: This is intentionally minimal today. Most performance-critical knobs
From 36d7328acbbd461ade625eb00c4b3f408f9671e1 Mon Sep 17 00:00:00 2001
From: Cody Littley
Date: Fri, 13 Mar 2026 11:38:17 -0500
Subject: [PATCH 2/2] bugfix

---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index de0e50f8e2..85dfb480d7 100644
--- a/Makefile
+++ b/Makefile
@@ -157,7 +157,7 @@ lint:
 	go mod tidy
 	go mod verify
 
-# Run lint on the sei-db package. Much faster than running lint on the entire project.\
+# Run lint on the sei-db package. Much faster than running lint on the entire project.
 # Makes life easier for storage team when iterating on changes inside the sei-db package.
 dblint:
 	go run github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.8.0 run ./sei-db/...