Skip to content
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
7c5e216
Created a cache for flatKV.
Mar 5, 2026
4a404ee
checkpoint
Mar 5, 2026
d36e825
incremental progress
Mar 5, 2026
2ccbe62
address feedback
Mar 5, 2026
f412e85
more fixes
Mar 5, 2026
e310037
bugfix
Mar 5, 2026
cf1071c
wire in cache
Mar 5, 2026
11232ff
Merge branch 'main' into cjl/flatkv-cache
Mar 6, 2026
a8c1c75
incremental improvements
Mar 6, 2026
221d114
checkin
Mar 6, 2026
8eca079
Moved where the cache sits
Mar 6, 2026
267feae
bugfix
Mar 6, 2026
50b0be6
Batch update the cache
Mar 6, 2026
2ca00d6
Add batch read to cache
Mar 6, 2026
8f8534a
Add batch get to db interface
Mar 6, 2026
23c0277
integrate batch reads
Mar 6, 2026
02d3ca1
wire in cache
Mar 6, 2026
7ee1b08
Introduce work pool, size caches differently
Mar 6, 2026
20c70c3
bugfix
Mar 6, 2026
b714789
Add unit constants
Mar 9, 2026
cc9d41d
refactor threading utils
Mar 9, 2026
53b2bd8
cleanup
Mar 9, 2026
c10e0cd
Cleanup, fix race condition
Mar 9, 2026
04f40fa
cleanup
Mar 9, 2026
b4e4d2c
cleanup
Mar 9, 2026
e53fefa
use pool
Mar 9, 2026
438fc8d
fix ctx lifecycle
Mar 9, 2026
19a8a19
Merge branch 'main' into cjl/flatkv-cache
Mar 9, 2026
23440f6
rename package
Mar 9, 2026
4ecc8fd
Clean up string copies
Mar 9, 2026
7a315c6
simplify gc
Mar 9, 2026
a3f3907
better error handling
Mar 9, 2026
f255b87
use config to configure cache params
Mar 9, 2026
cf0a73d
Allow flatkv config to be set in tests
Mar 9, 2026
0b34737
tweak config
Mar 10, 2026
452aa4d
incremental progress
Mar 10, 2026
1c804a8
move data dir into config
Mar 10, 2026
663b2ea
fix config file
Mar 10, 2026
bb530b5
cleanup
Mar 10, 2026
04daf75
move pebble metrics to proper location
Mar 10, 2026
354818e
clean up metrics
Mar 10, 2026
b1574ac
updated dashboard
Mar 10, 2026
07e071c
fix histograms
Mar 10, 2026
d090796
threading tests
Mar 10, 2026
dfd92c1
test lru queue
Mar 10, 2026
f751a9b
unit tests for shard
Mar 10, 2026
7b5538e
cache tests
Mar 10, 2026
dc8d0c9
moar unit tests
Mar 10, 2026
e9cc9ca
cleanup
Mar 10, 2026
c7a418c
Merge branch 'main' into cjl/flatkv-cache
Mar 10, 2026
087fd0f
Merge branch 'main' into cjl/flatkv-cache
Mar 10, 2026
eb9bc51
Merge branch 'main' into cjl/flatkv-cache
Mar 11, 2026
cea0ebb
unit test fixes
Mar 11, 2026
e58bec2
fix hash bug
Mar 11, 2026
c3f34b1
fixed path bug
Mar 11, 2026
111459f
Helper files for flatKV cache
Mar 11, 2026
d40395f
add missing struct
Mar 11, 2026
c8e85d2
Merge branch 'main' into cjl/cache-auxilery
Mar 12, 2026
ed7e4b6
made suggested changes
Mar 12, 2026
5c46647
fix tests
Mar 12, 2026
be0d4f5
Merge branch 'main' into cjl/flatkv-cache
Mar 12, 2026
9ff2199
Merge branch 'cjl/cache-auxilery' into cjl/flatkv-cache
Mar 12, 2026
bb2fe7e
Made suggested change to cache structure
Mar 13, 2026
f4b8326
rename cache -> dbcache to avoid gitignore
Mar 13, 2026
4b2247b
Helper files for the flatKV cache implementation
Mar 13, 2026
36d7328
bugfix
Mar 13, 2026
d759a9b
Merge branch 'cjl/cache-auxilery-2' into cjl/flatkv-cache
Mar 16, 2026
4ba242b
fix merge problems
Mar 16, 2026
e19a998
refactor API
Mar 16, 2026
94ae673
made suggested changes
Mar 16, 2026
ed10a26
made suggested changes
Mar 16, 2026
81dfd46
fix bug
Mar 16, 2026
480839d
Merge branch 'main' into cjl/flatkv-cache
Mar 16, 2026
7835683
Implement a standard cache.
Mar 16, 2026
950197c
cleanup
Mar 16, 2026
cff96ab
Merge branch 'main' into cjl/cache-impl
Mar 17, 2026
003fcc9
made suggested changes
Mar 17, 2026
a208a1b
made suggested change
Mar 17, 2026
157a600
made suggested changes
Mar 17, 2026
b41639f
fix unit test
Mar 17, 2026
fe31475
fix unit test
Mar 17, 2026
0702197
Merge branch 'cjl/cache-impl' into cjl/flatkv-cache
Mar 17, 2026
6d435f5
Merge branch 'main' into cjl/flatkv-cache
Mar 20, 2026
64f8530
fixed merge bugs
Mar 20, 2026
d9c5fc1
fix teardown race
Mar 20, 2026
14593ec
Add logging metric, clean up log files before/after run
Mar 20, 2026
2d88076
fix unit test
Mar 20, 2026
ccad074
fix unit tests
Mar 20, 2026
38ffd35
fix unit test
Mar 20, 2026
f143d30
made suggested changes
Mar 25, 2026
a18fd93
config changes
Mar 25, 2026
34e711d
made suggested changes
Mar 25, 2026
b596f89
Merge branch 'main' into cjl/flatkv-cache
Mar 25, 2026
33378ce
bugfix
Mar 27, 2026
a402238
don't ignore errors from batch get
Mar 30, 2026
b18bc4e
Merge branch 'main' into cjl/flatkv-cache
Mar 30, 2026
ee30ca9
made suggested changes
Mar 31, 2026
c8fd5ec
Merge branch 'main' into cjl/flatkv-cache
Mar 31, 2026
8094375
Merge branch 'main' into cjl/flatkv-cache
Mar 31, 2026
faf4871
make suggested change to pool
Apr 1, 2026
9c8454f
Merge branch 'main' into cjl/flatkv-cache
Apr 1, 2026
872b0a6
fix merge problem
Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,114 changes: 945 additions & 169 deletions docker/monitornode/dashboards/cryptosim-dashboard.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion sei-cosmos/storev2/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ func NewStore(
limiter = rate.NewLimiter(rate.Limit(scConfig.HistoricalProofRateLimit), burst)
}
ctx := context.Background()
scStore := composite.NewCompositeCommitStore(ctx, scDir, logger, scConfig)
scStore, err := composite.NewCompositeCommitStore(ctx, scDir, logger, scConfig)
if err != nil {
panic(err)
}
store := &Store{
logger: logger,
scStore: scStore,
Expand Down
23 changes: 23 additions & 0 deletions sei-db/common/metrics/buckets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package metrics

// Shared histogram bucket boundaries, intended for reuse across the codebase.
// The OTel default buckets are too coarse to yield meaningful percentile
// queries in Grafana, so we define finer-grained ones here.

// LatencyBuckets spans 10μs through 5 minutes: fine enough for fast key
// lookups, wide enough for slow compactions/flushes, so individual metrics
// do not need their own tuning.
var LatencyBuckets = []float64{
	// 10μs–500μs
	0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005,
	// 1ms–50ms
	0.001, 0.0025, 0.005, 0.01, 0.025, 0.05,
	// 100ms–5min
	0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300,
}

// ByteSizeBuckets spans 256B through 1GB for data-size histograms.
var ByteSizeBuckets = []float64{
	// 256B–256KB
	256, 1024, 4096, 16384, 65536, 262144,
	// 1MB–1GB
	1 << 20, 4 << 20, 16 << 20, 64 << 20, 256 << 20, 1 << 30,
}

// CountBuckets spans 1 through 1M for per-operation step/iteration counts.
var CountBuckets = []float64{
	1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 100000, 1000000,
}
1 change: 1 addition & 0 deletions sei-db/common/metrics/phase_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewPhaseTimerFactory(meter metric.Meter, timerName string) *PhaseTimerFacto
timerName+"_phase_latency_seconds",
metric.WithDescription("Latency per phase (seconds); use for p99, p95, etc."),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(LatencyBuckets...),
)
return &PhaseTimerFactory{
phaseDurationTotal: phaseDurationTotal,
Expand Down
19 changes: 19 additions & 0 deletions sei-db/common/threading/adhoc_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package threading

import "context"

var _ Pool = (*adHocPool)(nil)

// adHocPool is a Pool that runs each task in a new goroutine.
// Intended for use in unit tests or where performance is not important.
type adHocPool struct{}

// NewAdHocPool creates a Pool that runs each submitted task in a one-off goroutine.
func NewAdHocPool() Pool {
return &adHocPool{}
}

func (p *adHocPool) Submit(_ context.Context, task func()) error {
go task()
Comment thread Dismissed
return nil
}
32 changes: 32 additions & 0 deletions sei-db/common/threading/chan_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package threading

import (
"context"
"fmt"
)

// TODO unit test before merge

// Push to a channel, returning an error if the context is cancelled before the value is pushed.
func InterruptiblePush[T any](ctx context.Context, ch chan T, value T) error {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case ch <- value:
return nil
}
}

// Pull from a channel, returning an error if the context is cancelled before the value is pulled.
func InterruptiblePull[T any](ctx context.Context, ch <-chan T) (T, error) {
var zero T
select {
case <-ctx.Done():
return zero, fmt.Errorf("context cancelled: %w", ctx.Err())
case value, ok := <-ch:
if !ok {
return zero, fmt.Errorf("channel closed")
}
return value, nil
}
}
74 changes: 74 additions & 0 deletions sei-db/common/threading/elastic_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package threading

import (
"context"
"fmt"
)

var _ Pool = (*elasticPool)(nil)

// elasticPool is a pool that guarantees every submitted task begins executing
// immediately without waiting for other tasks to finish first. It maintains a
// set of warm workers for goroutine reuse, and spawns temporary goroutines when
// all warm workers are busy.
//
// This is useful when tasks submitted to the pool may depend on other tasks in
// the same pool. For example, if task A is submitted and then submits task B,
// and A waits for B to complete, a fixed-size pool may deadlock when all
// workers are occupied, since task B can never be scheduled. An
// elastic pool avoids this by ensuring B starts immediately in a temporary
// goroutine if all workers are busy.
type elasticPool struct {
workQueue chan func()
}

// NewElasticPool creates a pool with the given number of warm workers. Submitted
// tasks are handed off to an idle warm worker if one is available, otherwise a
// temporary goroutine is spawned. Tasks are never queued behind other tasks.
func NewElasticPool(
ctx context.Context,
name string,
warmWorkers int,
) Pool {
workQueue := make(chan func())
ep := &elasticPool{
workQueue: workQueue,
}

for i := 0; i < warmWorkers; i++ {
go ep.worker()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

go func() {
<-ctx.Done()
close(workQueue)
}()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return ep
}

func (ep *elasticPool) Submit(ctx context.Context, task func()) (err error) {
defer func() {
if recover() != nil {
err = fmt.Errorf("elastic pool is shut down")
}
}()

select {
case <-ctx.Done():
return ctx.Err()
case ep.workQueue <- task:
return nil
default:
// We hit this case when all workers are busy. Under standard operation, this should
// be fairly rare, but it's not catastrophic if it happens.
go task()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
return nil
}
}

// worker executes queued tasks until the work queue is closed.
func (ep *elasticPool) worker() {
	for {
		task, ok := <-ep.workQueue
		if !ok {
			return
		}
		task()
	}
}
72 changes: 72 additions & 0 deletions sei-db/common/threading/fixed_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package threading

import (
"context"
"fmt"
)

var _ Pool = (*fixedPool)(nil)

// fixedPool is a pool of workers that can be used to execute tasks concurrently.
// More efficient than spawning large numbers of short lived goroutines.
type fixedPool struct {
	// workQueue carries submitted tasks to the worker goroutines. It is
	// buffered (size chosen in NewFixedPool) and closed to signal shutdown.
	workQueue chan func()
}

// TODO add metrics!
// TODO unit test before merging!

// Create a new work pool.
func NewFixedPool(
// The work pool shuts down when the context is done.
ctx context.Context,
// The name of the work pool. Used for metrics.
name string,
// The number of workers to create.
workers int,
// The size of the work queue. Once full, Submit will block until a slot is available.
queueSize int,
) Pool {

workQueue := make(chan func(), queueSize)
fp := &fixedPool{
workQueue: workQueue,
}

for i := 0; i < workers; i++ {
go fp.worker()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

// Shutdown the work pool when the context is done.
go func() {
<-ctx.Done()
close(workQueue)

// Handle any remaining tasks in the queue to avoid caller deadlock.
for task := range workQueue {
task()
}
}()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return fp
}

// Submit enqueues task, blocking while the queue is full. It returns the
// context's error if ctx finishes first, or an error if the pool has shut
// down.
func (fp *fixedPool) Submit(ctx context.Context, task func()) (err error) {
	// A send on the closed queue panics after shutdown; surface that panic
	// as an error instead.
	defer func() {
		if recover() != nil {
			err = fmt.Errorf("fixed pool is shut down")
		}
	}()

	select {
	case fp.workQueue <- task:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// worker executes queued tasks until the work queue is closed.
func (fp *fixedPool) worker() {
	for {
		task, ok := <-fp.workQueue
		if !ok {
			return
		}
		task()
	}
}
9 changes: 9 additions & 0 deletions sei-db/common/threading/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package threading

import "context"

// Pool is a pool of workers that can be used to execute tasks concurrently.
type Pool interface {
	// Submit submits a task to the pool. Implementations may run the task
	// asynchronously; a non-nil error means the task was not accepted
	// (e.g. the context finished first, or the pool has shut down).
	Submit(ctx context.Context, task func()) error
}
20 changes: 20 additions & 0 deletions sei-db/common/unit/data_units.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package unit

// Binary (1024-based) data-size constants, in bytes.
//
// All of these are untyped constants. KB through EB fit in an int64, but
// ZB (2^70) and YB (2^80) overflow every built-in integer type, so they can
// only be used in floating-point or compile-time constant contexts.
const (
	// KB is the number of bytes in a kilobyte.
	KB = 1 << 10
	// MB is the number of bytes in a megabyte.
	MB = 1 << 20
	// GB is the number of bytes in a gigabyte.
	GB = 1 << 30
	// TB is the number of bytes in a terabyte.
	TB = 1 << 40
	// PB is the number of bytes in a petabyte.
	PB = 1 << 50
	// EB is the number of bytes in an exabyte.
	EB = 1 << 60
	// ZB is the number of bytes in a zettabyte.
	ZB = 1 << 70
	// YB is the number of bytes in a yottabyte.
	YB = 1 << 80
)
2 changes: 1 addition & 1 deletion sei-db/config/sc_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type StateCommitConfig struct {
MemIAVLConfig memiavl.Config

// FlatKVConfig is the configuration for the FlatKV (EVM) backend
FlatKVConfig flatkv.Config
FlatKVConfig *flatkv.Config
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to make this a pointer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted back to a direct reference.

No need to make this a pointer, except that I prefer pointer types for any struct that has more than one or two fields. It's unlikely that this is going to be a major performance factor in this case, so I don't mind using a direct reference if that's your preferred pattern.


// Max concurrent historical proof queries (RPC /store path).
HistoricalProofMaxInFlight int `mapstructure:"historical-proof-max-inflight"`
Expand Down
37 changes: 30 additions & 7 deletions sei-db/db_engine/pebbledb/batch.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,61 @@
package pebbledb

import (
"fmt"

"github.com/cockroachdb/pebble/v2"
"github.com/sei-protocol/sei-chain/sei-db/db_engine/pebbledb/pebblecache"
"github.com/sei-protocol/sei-chain/sei-db/db_engine/types"
)

// pebbleBatch wraps a Pebble batch for atomic writes.
// Important: Callers must call Close() after Commit() to release batch resources,
// even if Commit() succeeds. Failure to Close() will leak memory.
type pebbleBatch struct {
	b     *pebble.Batch
	cache pebblecache.Cache

	// pendingCacheUpdates records every Set/Delete staged in this batch so
	// the cache can be refreshed after a successful commit.
	pendingCacheUpdates []pebblecache.CacheUpdate
}

var _ types.Batch = (*pebbleBatch)(nil)

func newPebbleBatch(db *pebble.DB) *pebbleBatch {
return &pebbleBatch{b: db.NewBatch()}
func newPebbleBatch(db *pebble.DB, cache pebblecache.Cache) *pebbleBatch {
return &pebbleBatch{b: db.NewBatch(), cache: cache}
}

func (p *pebbleDB) NewBatch() types.Batch {
return newPebbleBatch(p.db)
return newPebbleBatch(p.db, p.cache)
}

// Set stages a write of key/value in the batch and records it for the cache
// update that happens on Commit.
// NOTE(review): key and value are retained (not copied) in
// pendingCacheUpdates until Commit — confirm callers do not mutate these
// slices after Set returns.
func (pb *pebbleBatch) Set(key, value []byte) error {
	// Durability options are applied on Commit.
	pb.pendingCacheUpdates = append(pb.pendingCacheUpdates, pebblecache.CacheUpdate{
		Key:   key,
		Value: value,
	})
	return pb.b.Set(key, value, nil)
}

// Delete stages a deletion of key in the batch and records it for the cache
// update that happens on Commit.
// NOTE(review): key is retained (not copied) in pendingCacheUpdates until
// Commit — confirm callers do not mutate it after Delete returns.
func (pb *pebbleBatch) Delete(key []byte) error {
	// Durability options are applied on Commit.
	pb.pendingCacheUpdates = append(pb.pendingCacheUpdates, pebblecache.CacheUpdate{
		Key:      key,
		IsDelete: true,
	})
	return pb.b.Delete(key, nil)
}

// Commit applies the batch to the database and then pushes the staged
// updates into the cache. The cache is only touched after the database
// commit succeeds, so it never reflects writes that were not applied.
func (pb *pebbleBatch) Commit(opts types.WriteOptions) error {
	if err := pb.b.Commit(toPebbleWriteOpts(opts)); err != nil {
		return fmt.Errorf("failed to commit batch: %w", err)
	}
	if err := pb.cache.BatchSet(pb.pendingCacheUpdates); err != nil {
		return fmt.Errorf("failed to set cache: %w", err)
	}
	pb.pendingCacheUpdates = nil
	return nil
}

func (pb *pebbleBatch) Len() int {
Expand All @@ -42,6 +64,7 @@ func (pb *pebbleBatch) Len() int {

// Reset clears the underlying Pebble batch and drops any staged cache
// updates so the batch can be reused.
func (pb *pebbleBatch) Reset() {
	pb.b.Reset()
	pb.pendingCacheUpdates = nil
}

func (pb *pebbleBatch) Close() error {
Expand Down
Loading
Loading