Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
7c5e216
Created a cache for flatKV.
Mar 5, 2026
4a404ee
checkpoint
Mar 5, 2026
d36e825
incremental progress
Mar 5, 2026
2ccbe62
address feedback
Mar 5, 2026
f412e85
more fixes
Mar 5, 2026
e310037
bugfix
Mar 5, 2026
cf1071c
wire in cache
Mar 5, 2026
11232ff
Merge branch 'main' into cjl/flatkv-cache
Mar 6, 2026
a8c1c75
incremental improvements
Mar 6, 2026
221d114
checkin
Mar 6, 2026
8eca079
Moved where the cache sits
Mar 6, 2026
267feae
bugfix
Mar 6, 2026
50b0be6
Batch update the cache
Mar 6, 2026
2ca00d6
Add batch read to cache
Mar 6, 2026
8f8534a
Add batch get to db interface
Mar 6, 2026
23c0277
integrate batch reads
Mar 6, 2026
02d3ca1
wire in cache
Mar 6, 2026
7ee1b08
Introduce work pool, size caches differently
Mar 6, 2026
20c70c3
bugfix
Mar 6, 2026
b714789
Add unit constants
Mar 9, 2026
cc9d41d
refactor threading utils
Mar 9, 2026
53b2bd8
cleanup
Mar 9, 2026
c10e0cd
Cleanup, fix race condition
Mar 9, 2026
04f40fa
cleanup
Mar 9, 2026
b4e4d2c
cleanup
Mar 9, 2026
e53fefa
use pool
Mar 9, 2026
438fc8d
fix ctx lifecycle
Mar 9, 2026
19a8a19
Merge branch 'main' into cjl/flatkv-cache
Mar 9, 2026
23440f6
rename package
Mar 9, 2026
4ecc8fd
Clean up string copies
Mar 9, 2026
7a315c6
simplify gc
Mar 9, 2026
a3f3907
better error handling
Mar 9, 2026
f255b87
use config to configure cache params
Mar 9, 2026
cf0a73d
Allow flatkv config to be set in tests
Mar 9, 2026
0b34737
tweak config
Mar 10, 2026
452aa4d
incremental progress
Mar 10, 2026
1c804a8
move data dir into config
Mar 10, 2026
663b2ea
fix config file
Mar 10, 2026
bb530b5
cleanup
Mar 10, 2026
04daf75
move pebble metrics to proper location
Mar 10, 2026
354818e
clean up metrics
Mar 10, 2026
b1574ac
updated dashboard
Mar 10, 2026
07e071c
fix histograms
Mar 10, 2026
d090796
threading tests
Mar 10, 2026
dfd92c1
test lru queue
Mar 10, 2026
f751a9b
unit tests for shard
Mar 10, 2026
7b5538e
cache tests
Mar 10, 2026
dc8d0c9
moar unit tests
Mar 10, 2026
e9cc9ca
cleanup
Mar 10, 2026
c7a418c
Merge branch 'main' into cjl/flatkv-cache
Mar 10, 2026
087fd0f
Merge branch 'main' into cjl/flatkv-cache
Mar 10, 2026
eb9bc51
Merge branch 'main' into cjl/flatkv-cache
Mar 11, 2026
cea0ebb
unit test fixes
Mar 11, 2026
e58bec2
fix hash bug
Mar 11, 2026
c3f34b1
fixed path bug
Mar 11, 2026
111459f
Helper files for flatKV cache
Mar 11, 2026
d40395f
add missing struct
Mar 11, 2026
c8e85d2
Merge branch 'main' into cjl/cache-auxilery
Mar 12, 2026
ed7e4b6
made suggested changes
Mar 12, 2026
5c46647
fix tests
Mar 12, 2026
be0d4f5
Merge branch 'main' into cjl/flatkv-cache
Mar 12, 2026
9ff2199
Merge branch 'cjl/cache-auxilery' into cjl/flatkv-cache
Mar 12, 2026
bb2fe7e
Made suggested change to cache structure
Mar 13, 2026
f4b8326
rename cache -> dbcache to avoid gitignore
Mar 13, 2026
4b2247b
Helper files for the flatKV cache implementation
Mar 13, 2026
36d7328
bugfix
Mar 13, 2026
d759a9b
Merge branch 'cjl/cache-auxilery-2' into cjl/flatkv-cache
Mar 16, 2026
4ba242b
fix merge problems
Mar 16, 2026
e19a998
refactor API
Mar 16, 2026
94ae673
made suggested changes
Mar 16, 2026
ed10a26
made suggested changes
Mar 16, 2026
81dfd46
fix bug
Mar 16, 2026
480839d
Merge branch 'main' into cjl/flatkv-cache
Mar 16, 2026
7835683
Implement a standard cache.
Mar 16, 2026
950197c
cleanup
Mar 16, 2026
cff96ab
Merge branch 'main' into cjl/cache-impl
Mar 17, 2026
003fcc9
made suggested changes
Mar 17, 2026
a208a1b
made suggested change
Mar 17, 2026
157a600
made suggested changes
Mar 17, 2026
b41639f
fix unit test
Mar 17, 2026
fe31475
fix unit test
Mar 17, 2026
0702197
Merge branch 'cjl/cache-impl' into cjl/flatkv-cache
Mar 17, 2026
6d435f5
Merge branch 'main' into cjl/flatkv-cache
Mar 20, 2026
64f8530
fixed merge bugs
Mar 20, 2026
d9c5fc1
fix teardown race
Mar 20, 2026
14593ec
Add logging metric, clean up log files before/after run
Mar 20, 2026
2d88076
fix unit test
Mar 20, 2026
ccad074
fix unit tests
Mar 20, 2026
38ffd35
fix unit test
Mar 20, 2026
f143d30
made suggested changes
Mar 25, 2026
a18fd93
config changes
Mar 25, 2026
34e711d
made suggested changes
Mar 25, 2026
b596f89
Merge branch 'main' into cjl/flatkv-cache
Mar 25, 2026
33378ce
bugfix
Mar 27, 2026
a402238
don't ignore errors from batch get
Mar 30, 2026
b18bc4e
Merge branch 'main' into cjl/flatkv-cache
Mar 30, 2026
ee30ca9
made suggested changes
Mar 31, 2026
c8fd5ec
Merge branch 'main' into cjl/flatkv-cache
Mar 31, 2026
8094375
Merge branch 'main' into cjl/flatkv-cache
Mar 31, 2026
faf4871
make suggested change to pool
Apr 1, 2026
9c8454f
Merge branch 'main' into cjl/flatkv-cache
Apr 1, 2026
872b0a6
fix merge problem
Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions sei-db/common/utils/chan_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package utils

import (
"context"
"fmt"
)

// TODO unit test before merge

// Push to a channel, returning an error if the context is cancelled before the value is pushed.
func InterruptiblePush[T any](ctx context.Context, ch chan T, value T) error {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case ch <- value:
return nil
}
}

// Pull from a channel, returning an error if the context is cancelled before the value is pulled.
func InterruptiblePull[T any](ctx context.Context, ch <-chan T) (T, error) {
var zero T
select {
case <-ctx.Done():
return zero, fmt.Errorf("context cancelled: %w", ctx.Err())
case value, ok := <-ch:
if !ok {
return zero, fmt.Errorf("channel closed")
}
return value, nil
}
}
21 changes: 21 additions & 0 deletions sei-db/state_db/sc/flatkv/flatcache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package flatcache

import "github.com/sei-protocol/sei-chain/sei-db/proto"

// Cache describes a cache capable of being used by a FlatKV store.
type Cache interface {

	// TODO decide if we should support individual modifications

	// Get returns the value for the given key, or (nil, false) if not found.
	// A non-nil error indicates a failure while looking the key up.
	Get(key []byte) ([]byte, bool, error)

	// Set sets the value for the given key.
	Set(key []byte, value []byte)

	// Delete deletes the value for the given key.
	Delete(key []byte)

	// BatchSet applies the given changesets to the cache.
	BatchSet(cs []*proto.NamedChangeSet)
}
168 changes: 168 additions & 0 deletions sei-db/state_db/sc/flatkv/flatcache/cache_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package flatcache

import (
"context"
"fmt"
"time"

"github.com/sei-protocol/sei-chain/sei-db/proto"
iavl "github.com/sei-protocol/sei-chain/sei-iavl/proto"
)

var _ Cache = (*cache)(nil)

// A standard implementation of a flatcache.
type cache struct {
	// NOTE(review): storing a context in a struct is discouraged by Go
	// convention; here it appears to bound the lifetime of the GC goroutine —
	// confirm callers cancel it on shutdown.
	ctx context.Context

	// A utility for assigning keys to shard indices.
	shardManager *shardManager

	// The shards in the cache.
	shards []*shard

	// A scheduler for asynchronous reads.
	readScheduler *readScheduler

	// The interval at which to run garbage collection.
	garbageCollectionInterval time.Duration
}

// NewCache creates a new Cache backed by sharded storage, with background
// read workers and a periodic garbage-collection goroutine whose lifetime is
// bounded by ctx.
func NewCache(
	ctx context.Context,
	// A function that reads a value from the database.
	readFunc func(key []byte) []byte,
	// The number of shards in the cache. Must be a power of two and greater than 0.
	shardCount int,
	// The maximum size of the cache, in bytes.
	maxSize int,
	// The number of background goroutines to read values from the database.
	readWorkerCount int,
	// The max size of the read queue.
	readQueueSize int,
	// The interval at which to run garbage collection.
	garbageCollectionInterval time.Duration,
) (Cache, error) {
	// Validate every parameter up front, before allocating anything.
	if shardCount <= 0 || (shardCount&(shardCount-1)) != 0 {
		return nil, ErrNumShardsNotPowerOfTwo
	}
	if maxSize <= 0 {
		return nil, fmt.Errorf("maxSize must be greater than 0")
	}
	if readWorkerCount <= 0 {
		return nil, fmt.Errorf("readWorkerCount must be greater than 0")
	}
	if readQueueSize <= 0 {
		return nil, fmt.Errorf("readQueueSize must be greater than 0")
	}
	if garbageCollectionInterval <= 0 {
		return nil, fmt.Errorf("garbageCollectionInterval must be greater than 0")
	}

	// Each shard receives an equal slice of the total budget; a zero per-shard
	// budget means maxSize is too small for this many shards.
	sizePerShard := maxSize / shardCount
	if sizePerShard <= 0 {
		return nil, fmt.Errorf("maxSize must be greater than shardCount")
	}

	shardManager, err := NewShardManager(uint64(shardCount))
	if err != nil {
		return nil, fmt.Errorf("failed to create shard manager: %w", err)
	}

	readScheduler := NewReadScheduler(ctx, readFunc, readWorkerCount, readQueueSize)

	shards := make([]*shard, shardCount)
	for i := 0; i < shardCount; i++ {
		shards[i], err = NewShard(ctx, readScheduler, sizePerShard)
		if err != nil {
			return nil, fmt.Errorf("failed to create shard: %w", err)
		}
	}

	c := &cache{
		ctx:                       ctx,
		shardManager:              shardManager,
		shards:                    shards,
		readScheduler:             readScheduler,
		garbageCollectionInterval: garbageCollectionInterval,
	}

	// The goroutine exits when ctx is cancelled (see runGarbageCollection).
	go c.runGarbageCollection()

	return c, nil
}

func (c *cache) BatchSet(cs []*proto.NamedChangeSet) {

// First, sort entries by shard index.
// This allows us to set all values in a single shard with only a single lock acquisition.
shardMap := make(map[uint64][]*iavl.KVPair)
for _, ncs := range cs {
for _, entry := range ncs.Changeset.Pairs {
shardMap[c.shardManager.Shard(entry.Key)] = append(shardMap[c.shardManager.Shard(entry.Key)], entry)
}
}

// This is probably quite fast, but if it isn't it can be parallelized.
for shardIndex, shardEntries := range shardMap {
shard := c.shards[shardIndex]
shard.BatchSet(shardEntries)
}
Comment thread Fixed
}

// Delete removes the value for the given key from the shard that owns it.
func (c *cache) Delete(key []byte) {
	owner := c.shards[c.shardManager.Shard(key)]
	owner.Delete(key)
}

// Get returns the value for the given key from its owning shard, or
// (nil, false) if the key is not present.
func (c *cache) Get(key []byte) ([]byte, bool, error) {
	owner := c.shards[c.shardManager.Shard(key)]

	value, found, err := owner.Get(key)
	switch {
	case err != nil:
		return nil, false, fmt.Errorf("failed to get value from shard: %w", err)
	case !found:
		return nil, false, nil
	default:
		return value, true, nil
	}
}

// Set stores value under key in the shard that owns the key.
func (c *cache) Set(key []byte, value []byte) {
	owner := c.shards[c.shardManager.Shard(key)]
	owner.Set(key, value)
}

// TODO add GC metrics

// Periodically runs garbage collection in the background, visiting one shard
// per tick in round-robin order. Exits when the cache's context is cancelled.
func (c *cache) runGarbageCollection() {

	// Spread out work evenly across all shards, so that we visit each shard roughly once per interval.
	gcSubInterval := c.garbageCollectionInterval / time.Duration(len(c.shards))
	if gcSubInterval <= 0 {
		// Technically possible if the number of shards is very large and the
		// interval is very small. Clamp to a sane floor: the previous floor of
		// 1 (i.e. one nanosecond) would make the ticker fire continuously and
		// pin a CPU core.
		gcSubInterval = time.Millisecond
	}
	ticker := time.NewTicker(gcSubInterval)
	defer ticker.Stop()

	nextShardIndex := 0

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			shardIndex := nextShardIndex
			nextShardIndex = (nextShardIndex + 1) % len(c.shards)
			c.shards[shardIndex].RunGarbageCollection()
		}
	}
}

// TODO create a warming mechanism
83 changes: 83 additions & 0 deletions sei-db/state_db/sc/flatkv/flatcache/lru_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package flatcache

import "container/list"

// Implements a queue-like abstraction with LRU semantics. Not thread safe.
type lruQueue struct {
	// Recency order: front = least recently used, back = most recently used.
	order *list.List
	// Maps the string form of a key to its list element for O(1) lookup.
	entries map[string]*list.Element
	// Sum of the sizes of all entries currently in the queue.
	totalSize int
}

// A single entry in the LRU queue.
type lruQueueEntry struct {
	key  []byte
	size int
}

// Create a new, empty LRU queue.
func NewLRUQueue() *lruQueue {
	return &lruQueue{
		order:   list.New(),
		entries: make(map[string]*list.Element),
	}
}

// Add a new entry to the LRU queue, marking it most recently used.
// Can also be used to update an existing value with a new weight.
func (lru *lruQueue) Push(
	// the key in the cache that was recently interacted with
	key []byte,
	// the size of the key + value
	size int,
) {
	keyString := string(key) // TODO revisit and maybe do unsafe copies
	if elem, ok := lru.entries[keyString]; ok {
		// Existing entry: adjust the running total by the size delta and
		// refresh its recency.
		entry := elem.Value.(*lruQueueEntry)
		lru.totalSize += size - entry.size
		entry.size = size
		lru.order.MoveToBack(elem)
		return
	}

	// Copy the key so a later mutation of the caller's slice cannot corrupt
	// the queue (see TestLRUQueueCopiesInsertedKey).
	keyCopy := append([]byte(nil), key...) // TODO don't do this
	elem := lru.order.PushBack(&lruQueueEntry{
		key:  keyCopy,
		size: size,
	})
	lru.entries[keyString] = elem
	lru.totalSize += size
}

// Signal that an entry has been interacted with, moving it to the back of the
// queue (i.e. making it so it doesn't get popped soon). Unknown keys are ignored.
func (lru *lruQueue) Touch(key []byte) {
	elem, ok := lru.entries[string(key)]
	if !ok {
		return
	}
	lru.order.MoveToBack(elem)
}

// Returns the total size of all entries in the LRU queue.
func (lru *lruQueue) GetTotalSize() int {
	return lru.totalSize
}

// Returns a count of the number of entries in the LRU queue, where each entry counts for 1 regardless of size.
func (lru *lruQueue) GetCount() int {
	return len(lru.entries)
}

// Pops a single element out of the queue. The element removed is the entry
// least recently passed to Push or Touch.
// Panics if the queue is empty.
func (lru *lruQueue) PopLeastRecentlyUsed() []byte {
	elem := lru.order.Front()
	if elem == nil {
		panic("cannot pop from empty LRU queue")
	}

	// list.Remove returns the element's value, saving a second Value access.
	entry := lru.order.Remove(elem).(*lruQueueEntry)
	delete(lru.entries, string(entry.key))
	lru.totalSize -= entry.size
	return entry.key
}
82 changes: 82 additions & 0 deletions sei-db/state_db/sc/flatkv/flatcache/lru_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package flatcache

import (
"bytes"
"testing"
)

// Exercises counting, size tracking, and LRU pop ordering after a Touch.
func TestLRUQueueTracksSizeCountAndOrder(t *testing.T) {
	q := NewLRUQueue()

	q.Push([]byte("a"), 3)
	q.Push([]byte("b"), 5)
	q.Push([]byte("c"), 7)

	if count := q.GetCount(); count != 3 {
		t.Fatalf("GetCount() = %d, want 3", count)
	}
	if size := q.GetTotalSize(); size != 15 {
		t.Fatalf("GetTotalSize() = %d, want 15", size)
	}

	// Touching "a" makes it most recently used, so pops come out b, c, a.
	q.Touch([]byte("a"))

	first := q.PopLeastRecentlyUsed()
	if !bytes.Equal(first, []byte("b")) {
		t.Fatalf("first pop = %q, want %q", first, []byte("b"))
	}
	second := q.PopLeastRecentlyUsed()
	if !bytes.Equal(second, []byte("c")) {
		t.Fatalf("second pop = %q, want %q", second, []byte("c"))
	}
	third := q.PopLeastRecentlyUsed()
	if !bytes.Equal(third, []byte("a")) {
		t.Fatalf("third pop = %q, want %q", third, []byte("a"))
	}

	if count := q.GetCount(); count != 0 {
		t.Fatalf("GetCount() after pops = %d, want 0", count)
	}
	if size := q.GetTotalSize(); size != 0 {
		t.Fatalf("GetTotalSize() after pops = %d, want 0", size)
	}
}

// Re-pushing an existing key must update its weight and recency rather than
// inserting a duplicate entry.
func TestLRUQueuePushUpdatesExistingEntry(t *testing.T) {
	q := NewLRUQueue()

	q.Push([]byte("a"), 3)
	q.Push([]byte("b"), 5)
	q.Push([]byte("a"), 11)

	if count := q.GetCount(); count != 2 {
		t.Fatalf("GetCount() = %d, want 2", count)
	}
	if size := q.GetTotalSize(); size != 16 {
		t.Fatalf("GetTotalSize() = %d, want 16", size)
	}

	first := q.PopLeastRecentlyUsed()
	if !bytes.Equal(first, []byte("b")) {
		t.Fatalf("first pop = %q, want %q", first, []byte("b"))
	}
	second := q.PopLeastRecentlyUsed()
	if !bytes.Equal(second, []byte("a")) {
		t.Fatalf("second pop = %q, want %q", second, []byte("a"))
	}
}

// The queue must store its own copy of the key, so mutating the caller's
// slice after Push cannot corrupt the stored entry.
func TestLRUQueueCopiesInsertedKey(t *testing.T) {
	q := NewLRUQueue()

	inserted := []byte("a")
	q.Push(inserted, 1)
	inserted[0] = 'z'

	popped := q.PopLeastRecentlyUsed()
	if !bytes.Equal(popped, []byte("a")) {
		t.Fatalf("pop after mutating caller key = %q, want %q", popped, []byte("a"))
	}
}

// TODO expand these tests
Loading
Loading