-
Notifications
You must be signed in to change notification settings - Fork 881
flatkv cache #3027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
flatkv cache #3027
Changes from 7 commits
Commits
Show all changes
102 commits
Select commit
Hold shift + click to select a range
7c5e216
Created a cache for flatKV.
4a404ee
checkpoint
d36e825
incremental progress
2ccbe62
address feedback
f412e85
more fixes
e310037
bugfix
cf1071c
wire in cache
11232ff
Merge branch 'main' into cjl/flatkv-cache
a8c1c75
incremental improvements
221d114
checkin
8eca079
Moved where the cache sits
267feae
bugfix
50b0be6
Batch update the cache
2ca00d6
Add batch read to cache
8f8534a
Add batch get to db interface
23c0277
integrate batch reads
02d3ca1
wire in cache
7ee1b08
Introduce work pool, size caches differently
20c70c3
bugfix
b714789
Add unit constants
cc9d41d
refactor threading utils
53b2bd8
cleanup
c10e0cd
Cleanup, fix race condition
04f40fa
cleanup
b4e4d2c
cleanup
e53fefa
use pool
438fc8d
fix ctx lifecycle
19a8a19
Merge branch 'main' into cjl/flatkv-cache
23440f6
rename package
4ecc8fd
Clean up string copies
7a315c6
simplify gc
a3f3907
better error handling
f255b87
use config to configure cache params
cf0a73d
Allow flatkv config to be set in tests
0b34737
tweak config
452aa4d
incremental progress
1c804a8
move data dir into config
663b2ea
fix config file
bb530b5
cleanup
04daf75
move pebble metrics to proper location
354818e
clean up metrics
b1574ac
updated dashboard
07e071c
fix histograms
d090796
threading tests
dfd92c1
test lru queue
f751a9b
unit tests for shard
7b5538e
cache tests
dc8d0c9
moar unit tests
e9cc9ca
cleanup
c7a418c
Merge branch 'main' into cjl/flatkv-cache
087fd0f
Merge branch 'main' into cjl/flatkv-cache
eb9bc51
Merge branch 'main' into cjl/flatkv-cache
cea0ebb
unit test fixes
e58bec2
fix hash bug
c3f34b1
fixed path bug
111459f
Helper files for flatKV cache
d40395f
add missing struct
c8e85d2
Merge branch 'main' into cjl/cache-auxilery
ed7e4b6
made suggested changes
5c46647
fix tests
be0d4f5
Merge branch 'main' into cjl/flatkv-cache
9ff2199
Merge branch 'cjl/cache-auxilery' into cjl/flatkv-cache
bb2fe7e
Made suggested change to cache structure
f4b8326
rename cache -> dbcache to avoid gitignore
4b2247b
Helper files for the flatKV cache implementation
36d7328
bugfix
d759a9b
Merge branch 'cjl/cache-auxilery-2' into cjl/flatkv-cache
4ba242b
fix merge problems
e19a998
refactor API
94ae673
made suggested changes
ed10a26
made suggested changes
81dfd46
fix bug
480839d
Merge branch 'main' into cjl/flatkv-cache
7835683
Implement a standard cache.
950197c
cleanup
cff96ab
Merge branch 'main' into cjl/cache-impl
003fcc9
made suggested changes
a208a1b
made suggested change
157a600
made suggested changes
b41639f
fix unit test
fe31475
fix unit test
0702197
Merge branch 'cjl/cache-impl' into cjl/flatkv-cache
6d435f5
Merge branch 'main' into cjl/flatkv-cache
64f8530
fixed merge bugs
d9c5fc1
fix teardown race
14593ec
Add logging metric, clean up log files before/after run
2d88076
fix unit test
ccad074
fix unit tests
38ffd35
fix unit test
f143d30
made suggested changes
a18fd93
config changes
34e711d
made suggested changes
b596f89
Merge branch 'main' into cjl/flatkv-cache
33378ce
bugfix
a402238
don't ignore errors from batch get
b18bc4e
Merge branch 'main' into cjl/flatkv-cache
ee30ca9
made suggested changes
c8fd5ec
Merge branch 'main' into cjl/flatkv-cache
8094375
Merge branch 'main' into cjl/flatkv-cache
faf4871
make suggested change to pool
9c8454f
Merge branch 'main' into cjl/flatkv-cache
872b0a6
fix merge problem
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package utils | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| ) | ||
|
|
||
| // TODO unit test before merge | ||
|
|
||
| // Push to a channel, returning an error if the context is cancelled before the value is pushed. | ||
| func InterruptiblePush[T any](ctx context.Context, ch chan T, value T) error { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return fmt.Errorf("context cancelled: %w", ctx.Err()) | ||
| case ch <- value: | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // Pull from a channel, returning an error if the context is cancelled before the value is pulled. | ||
| func InterruptiblePull[T any](ctx context.Context, ch <-chan T) (T, error) { | ||
| var zero T | ||
| select { | ||
| case <-ctx.Done(): | ||
| return zero, fmt.Errorf("context cancelled: %w", ctx.Err()) | ||
| case value, ok := <-ch: | ||
| if !ok { | ||
| return zero, fmt.Errorf("channel closed") | ||
| } | ||
| return value, nil | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package flatcache | ||
|
|
||
| import "github.com/sei-protocol/sei-chain/sei-db/proto" | ||
|
|
||
// Cache describes a cache capable of being used by a FlatKV store.
type Cache interface {

	// TODO decide if we should support individual modifications

	// Get returns the value for the given key, or (nil, false) if not found.
	// A non-nil error means the lookup itself failed, not that the key is absent.
	Get(key []byte) ([]byte, bool, error)

	// Set sets the value for the given key.
	Set(key []byte, value []byte)

	// Delete deletes the value for the given key.
	Delete(key []byte)

	// BatchSet applies the given changesets to the cache.
	BatchSet(cs []*proto.NamedChangeSet)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| package flatcache | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/sei-protocol/sei-chain/sei-db/proto" | ||
| iavl "github.com/sei-protocol/sei-chain/sei-iavl/proto" | ||
| ) | ||
|
|
||
// Compile-time assertion that *cache satisfies the Cache interface.
var _ Cache = (*cache)(nil)

// A standard implementation of a flatcache. Keys are assigned to shards by
// the shard manager, and each shard is operated on independently.
type cache struct {
	// Governs the lifetime of background work (the GC loop exits when this
	// context is cancelled).
	ctx context.Context

	// A utility for assigning keys to shard indices.
	shardManager *shardManager

	// The shards in the cache.
	shards []*shard

	// A scheduler for asynchronous reads.
	readScheduler *readScheduler

	// The interval at which to run garbage collection.
	garbageCollectionInterval time.Duration
}
|
|
||
| // Creates a new Cache. | ||
| func NewCache( | ||
| ctx context.Context, | ||
| // A function that reads a value from the database. | ||
| readFunc func(key []byte) []byte, | ||
| // The number of shards in the cache. Must be a power of two and greater than 0. | ||
| shardCount int, | ||
| // The maximum size of the cache, in bytes. | ||
| maxSize int, | ||
| // The number of background goroutines to read values from the database. | ||
| readWorkerCount int, | ||
| // The max size of the read queue. | ||
| readQueueSize int, | ||
| // The interval at which to run garbage collection. | ||
| garbageCollectionInterval time.Duration, | ||
| ) (Cache, error) { | ||
| if shardCount <= 0 || (shardCount&(shardCount-1)) != 0 { | ||
| return nil, ErrNumShardsNotPowerOfTwo | ||
| } | ||
| if maxSize <= 0 { | ||
| return nil, fmt.Errorf("maxSize must be greater than 0") | ||
| } | ||
| if readWorkerCount <= 0 { | ||
| return nil, fmt.Errorf("readWorkerCount must be greater than 0") | ||
| } | ||
| if readQueueSize <= 0 { | ||
| return nil, fmt.Errorf("readQueueSize must be greater than 0") | ||
| } | ||
|
|
||
| shardManager, err := NewShardManager(uint64(shardCount)) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create shard manager: %w", err) | ||
| } | ||
| if garbageCollectionInterval <= 0 { | ||
| return nil, fmt.Errorf("garbageCollectionInterval must be greater than 0") | ||
| } | ||
|
|
||
| readScheduler := NewReadScheduler(ctx, readFunc, readWorkerCount, readQueueSize) | ||
|
|
||
| sizePerShard := maxSize / shardCount | ||
| if sizePerShard <= 0 { | ||
| return nil, fmt.Errorf("maxSize must be greater than shardCount") | ||
| } | ||
|
|
||
| shards := make([]*shard, shardCount) | ||
| for i := 0; i < shardCount; i++ { | ||
| shards[i], err = NewShard(ctx, readScheduler, sizePerShard) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create shard: %w", err) | ||
| } | ||
| } | ||
|
|
||
| c := &cache{ | ||
| ctx: ctx, | ||
| shardManager: shardManager, | ||
| shards: shards, | ||
| readScheduler: readScheduler, | ||
| garbageCollectionInterval: garbageCollectionInterval, | ||
| } | ||
|
|
||
| go c.runGarbageCollection() | ||
|
|
||
| return c, nil | ||
| } | ||
|
|
||
| func (c *cache) BatchSet(cs []*proto.NamedChangeSet) { | ||
|
|
||
| // First, sort entries by shard index. | ||
| // This allows us to set all values in a single shard with only a single lock acquisition. | ||
| shardMap := make(map[uint64][]*iavl.KVPair) | ||
| for _, ncs := range cs { | ||
| for _, entry := range ncs.Changeset.Pairs { | ||
| shardMap[c.shardManager.Shard(entry.Key)] = append(shardMap[c.shardManager.Shard(entry.Key)], entry) | ||
| } | ||
| } | ||
|
|
||
| // This is probably quite fast, but if it isn't it can be parallelized. | ||
| for shardIndex, shardEntries := range shardMap { | ||
| shard := c.shards[shardIndex] | ||
| shard.BatchSet(shardEntries) | ||
| } | ||
| } | ||
|
|
||
| func (c *cache) Delete(key []byte) { | ||
| shardIndex := c.shardManager.Shard(key) | ||
| shard := c.shards[shardIndex] | ||
| shard.Delete(key) | ||
| } | ||
|
|
||
| func (c *cache) Get(key []byte) ([]byte, bool, error) { | ||
| shardIndex := c.shardManager.Shard(key) | ||
| shard := c.shards[shardIndex] | ||
|
|
||
| value, ok, err := shard.Get(key) | ||
| if err != nil { | ||
| return nil, false, fmt.Errorf("failed to get value from shard: %w", err) | ||
| } | ||
| if !ok { | ||
| return nil, false, nil | ||
| } | ||
| return value, ok, nil | ||
| } | ||
|
|
||
| func (c *cache) Set(key []byte, value []byte) { | ||
| shardIndex := c.shardManager.Shard(key) | ||
| shard := c.shards[shardIndex] | ||
| shard.Set(key, value) | ||
| } | ||
|
|
||
| // TODO add GC metrics | ||
|
|
||
// Periodically runs garbage collection in the background.
// Shards are visited round-robin, one per tick, so that each shard is
// collected roughly once per garbageCollectionInterval. The loop exits
// when c.ctx is cancelled.
func (c *cache) runGarbageCollection() {

	// Spread out work evenly across all shards, so that we visit each shard roughly once per interval.
	gcSubInterval := c.garbageCollectionInterval / time.Duration(len(c.shards))
	if gcSubInterval == 0 {
		// technically possible if the number of shards is very large and the interval is very small
		// (this floors the tick to 1ns, i.e. the ticker fires as fast as it can)
		gcSubInterval = 1
	}
	ticker := time.NewTicker(gcSubInterval)
	defer ticker.Stop()

	// Index of the next shard to collect; advances by one on every tick.
	nextShardIndex := 0

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			shardIndex := nextShardIndex
			nextShardIndex = (nextShardIndex + 1) % len(c.shards)
			c.shards[shardIndex].RunGarbageCollection()
		}
	}
}
|
|
||
| // TODO create a warming mechanism | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package flatcache | ||
|
|
||
| import "container/list" | ||
|
|
||
// lruQueue implements a queue-like abstraction with LRU semantics,
// tracking the total size of its entries. Not thread safe.
type lruQueue struct {
	// Linked list ordered least-recently-used (front) to most-recently-used
	// (back). Element values are *lruQueueEntry.
	order *list.List
	// Maps stringified keys to their list elements for O(1) lookup.
	entries map[string]*list.Element
	// Running sum of the sizes of all entries.
	totalSize int
}

// lruQueueEntry is the payload stored in each list element.
type lruQueueEntry struct {
	key  []byte
	size int
}

// NewLRUQueue creates a new, empty LRU queue.
func NewLRUQueue() *lruQueue {
	q := &lruQueue{
		order:   list.New(),
		entries: make(map[string]*list.Element),
	}
	return q
}

// Push adds a new entry to the LRU queue, or — if the key is already
// present — updates its size and marks it as most recently used.
func (lru *lruQueue) Push(
	// the key in the cache that was recently interacted with
	key []byte,
	// the size of the key + value
	size int,
) {
	keyString := string(key) // TODO revisit and maybe do unsafe copies
	if existing, present := lru.entries[keyString]; present {
		entry := existing.Value.(*lruQueueEntry)
		lru.totalSize -= entry.size
		lru.totalSize += size
		entry.size = size
		lru.order.MoveToBack(existing)
		return
	}

	// Defensive copy so later caller mutations of key don't corrupt the queue.
	keyCopy := make([]byte, len(key)) // TODO don't do this
	copy(keyCopy, key)
	newEntry := &lruQueueEntry{key: keyCopy, size: size}
	lru.entries[keyString] = lru.order.PushBack(newEntry)
	lru.totalSize += size
}

// Touch signals that an entry has been interacted with, moving it to the
// back of the queue (i.e. making it so it doesn't get popped soon).
// Unknown keys are ignored.
func (lru *lruQueue) Touch(key []byte) {
	if elem, present := lru.entries[string(key)]; present {
		lru.order.MoveToBack(elem)
	}
}

// GetTotalSize returns the total size of all entries in the LRU queue.
func (lru *lruQueue) GetTotalSize() int {
	return lru.totalSize
}

// GetCount returns the number of entries in the LRU queue, where each
// entry counts for 1 regardless of size.
func (lru *lruQueue) GetCount() int {
	return len(lru.entries)
}

// PopLeastRecentlyUsed removes and returns the key least recently passed
// to Push or Touch. Panics if the queue is empty.
func (lru *lruQueue) PopLeastRecentlyUsed() []byte {
	front := lru.order.Front()
	if front == nil {
		panic("cannot pop from empty LRU queue")
	}

	entry := lru.order.Remove(front).(*lruQueueEntry)
	delete(lru.entries, string(entry.key))
	lru.totalSize -= entry.size
	return entry.key
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package flatcache | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "testing" | ||
| ) | ||
|
|
||
| func TestLRUQueueTracksSizeCountAndOrder(t *testing.T) { | ||
| lru := NewLRUQueue() | ||
|
|
||
| lru.Push([]byte("a"), 3) | ||
| lru.Push([]byte("b"), 5) | ||
| lru.Push([]byte("c"), 7) | ||
|
|
||
| if got := lru.GetCount(); got != 3 { | ||
| t.Fatalf("GetCount() = %d, want 3", got) | ||
| } | ||
|
|
||
| if got := lru.GetTotalSize(); got != 15 { | ||
| t.Fatalf("GetTotalSize() = %d, want 15", got) | ||
| } | ||
|
|
||
| lru.Touch([]byte("a")) | ||
|
|
||
| if got := lru.PopLeastRecentlyUsed(); !bytes.Equal(got, []byte("b")) { | ||
| t.Fatalf("first pop = %q, want %q", got, []byte("b")) | ||
| } | ||
|
|
||
| if got := lru.PopLeastRecentlyUsed(); !bytes.Equal(got, []byte("c")) { | ||
| t.Fatalf("second pop = %q, want %q", got, []byte("c")) | ||
| } | ||
|
|
||
| if got := lru.PopLeastRecentlyUsed(); !bytes.Equal(got, []byte("a")) { | ||
| t.Fatalf("third pop = %q, want %q", got, []byte("a")) | ||
| } | ||
|
|
||
| if got := lru.GetCount(); got != 0 { | ||
| t.Fatalf("GetCount() after pops = %d, want 0", got) | ||
| } | ||
|
|
||
| if got := lru.GetTotalSize(); got != 0 { | ||
| t.Fatalf("GetTotalSize() after pops = %d, want 0", got) | ||
| } | ||
| } | ||
|
|
||
| func TestLRUQueuePushUpdatesExistingEntry(t *testing.T) { | ||
| lru := NewLRUQueue() | ||
|
|
||
| lru.Push([]byte("a"), 3) | ||
| lru.Push([]byte("b"), 5) | ||
| lru.Push([]byte("a"), 11) | ||
|
|
||
| if got := lru.GetCount(); got != 2 { | ||
| t.Fatalf("GetCount() = %d, want 2", got) | ||
| } | ||
|
|
||
| if got := lru.GetTotalSize(); got != 16 { | ||
| t.Fatalf("GetTotalSize() = %d, want 16", got) | ||
| } | ||
|
|
||
| if got := lru.PopLeastRecentlyUsed(); !bytes.Equal(got, []byte("b")) { | ||
| t.Fatalf("first pop = %q, want %q", got, []byte("b")) | ||
| } | ||
|
|
||
| if got := lru.PopLeastRecentlyUsed(); !bytes.Equal(got, []byte("a")) { | ||
| t.Fatalf("second pop = %q, want %q", got, []byte("a")) | ||
| } | ||
| } | ||
|
|
||
| func TestLRUQueueCopiesInsertedKey(t *testing.T) { | ||
| lru := NewLRUQueue() | ||
|
|
||
| key := []byte("a") | ||
| lru.Push(key, 1) | ||
| key[0] = 'z' | ||
|
|
||
| if got := lru.PopLeastRecentlyUsed(); !bytes.Equal(got, []byte("a")) { | ||
| t.Fatalf("pop after mutating caller key = %q, want %q", got, []byte("a")) | ||
| } | ||
| } | ||
|
|
||
| // TODO expand these tests |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.