-
Notifications
You must be signed in to change notification settings - Fork 881
flatkv cache #3027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
flatkv cache #3027
Changes from 43 commits
7c5e216
4a404ee
d36e825
2ccbe62
f412e85
e310037
cf1071c
11232ff
a8c1c75
221d114
8eca079
267feae
50b0be6
2ca00d6
8f8534a
23c0277
02d3ca1
7ee1b08
20c70c3
b714789
cc9d41d
53b2bd8
c10e0cd
04f40fa
b4e4d2c
e53fefa
438fc8d
19a8a19
23440f6
4ecc8fd
7a315c6
a3f3907
f255b87
cf0a73d
0b34737
452aa4d
1c804a8
663b2ea
bb530b5
04daf75
354818e
b1574ac
07e071c
d090796
dfd92c1
f751a9b
7b5538e
dc8d0c9
e9cc9ca
c7a418c
087fd0f
eb9bc51
cea0ebb
e58bec2
c3f34b1
111459f
d40395f
c8e85d2
ed7e4b6
5c46647
be0d4f5
9ff2199
bb2fe7e
f4b8326
4b2247b
36d7328
d759a9b
4ba242b
e19a998
94ae673
ed10a26
81dfd46
480839d
7835683
950197c
cff96ab
003fcc9
a208a1b
157a600
b41639f
fe31475
0702197
6d435f5
64f8530
d9c5fc1
14593ec
2d88076
ccad074
38ffd35
f143d30
a18fd93
34e711d
b596f89
33378ce
a402238
b18bc4e
ee30ca9
c8fd5ec
8094375
faf4871
9c8454f
872b0a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package metrics | ||
|
|
||
| // Shared histogram bucket boundaries for use across the codebase. | ||
| // The OTel defaults are too coarse for meaningful percentile queries in Grafana. | ||
|
|
||
// LatencyBuckets covers 10μs to 5 minutes — wide enough for both fast key
// lookups and slow compactions/flushes without needing per-metric tuning.
// Values are seconds (the OTel/Prometheus convention for duration histograms).
var LatencyBuckets = []float64{
	1e-05, 2.5e-05, 5e-05, 1e-04, 2.5e-04, 5e-04, // 10μs–500μs
	1e-03, 2.5e-03, 5e-03, 1e-02, 2.5e-02, 5e-02, // 1ms–50ms
	0.1, 0.25, 0.5, 1, 2.5, 5, // 100ms–5s
	10, 30, 60, 120, 300, // 10s–5min
}
|
|
||
// ByteSizeBuckets covers 256B to 1GB for data size histograms.
// Each boundary is 4x the previous one (powers of two, stepping by 2 bits).
var ByteSizeBuckets = []float64{
	1 << 8, 1 << 10, 1 << 12, 1 << 14, 1 << 16, 1 << 18, // 256B–256KB
	1 << 20, 1 << 22, 1 << 24, 1 << 26, 1 << 28, 1 << 30, // 1MB–1GB
}
|
|
||
// CountBuckets covers 1 to 1M for per-operation step/iteration counts,
// using a 1-5-10 progression per decade (the 50_000 step is omitted).
var CountBuckets = []float64{
	1, 5, 10,
	50, 100, 500,
	1_000, 5_000, 10_000,
	100_000, 1_000_000,
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| package threading | ||
|
|
||
| import "context" | ||
|
|
||
| var _ Pool = (*adHocPool)(nil) | ||
|
|
||
| // adHocPool is a Pool that runs each task in a new goroutine. | ||
| // Intended for use in unit tests or where performance is not important. | ||
| type adHocPool struct{} | ||
|
|
||
| // NewAdHocPool creates a Pool that runs each submitted task in a one-off goroutine. | ||
| func NewAdHocPool() Pool { | ||
| return &adHocPool{} | ||
| } | ||
|
|
||
| func (p *adHocPool) Submit(_ context.Context, task func()) error { | ||
| go task() | ||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package threading | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| ) | ||
|
|
||
| // TODO unit test before merge | ||
|
|
||
| // Push to a channel, returning an error if the context is cancelled before the value is pushed. | ||
| func InterruptiblePush[T any](ctx context.Context, ch chan T, value T) error { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return fmt.Errorf("context cancelled: %w", ctx.Err()) | ||
| case ch <- value: | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // Pull from a channel, returning an error if the context is cancelled before the value is pulled. | ||
| func InterruptiblePull[T any](ctx context.Context, ch <-chan T) (T, error) { | ||
| var zero T | ||
| select { | ||
| case <-ctx.Done(): | ||
| return zero, fmt.Errorf("context cancelled: %w", ctx.Err()) | ||
| case value, ok := <-ch: | ||
| if !ok { | ||
| return zero, fmt.Errorf("channel closed") | ||
| } | ||
| return value, nil | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| package threading | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| ) | ||
|
|
||
var _ Pool = (*elasticPool)(nil)

// elasticPool is a pool that guarantees every submitted task begins executing
// immediately without waiting for other tasks to finish first. It maintains a
// set of warm workers for goroutine reuse, and spawns temporary goroutines when
// all warm workers are busy.
//
// This is useful when tasks submitted to the pool may depend on other tasks in
// the same pool. For example, if task A is submitted and then submits task B,
// and A waits for B to complete, a fixed-size pool may deadlock when all
// workers are occupied, since task B can never be scheduled. An
// elastic pool avoids this by ensuring B starts immediately in a temporary
// goroutine if all workers are busy.
type elasticPool struct {
	// workQueue is unbuffered: a send only succeeds when a warm worker is
	// parked on the receive side, which lets Submit's non-blocking send
	// double as an "is any warm worker idle right now?" probe.
	workQueue chan func()
}

// NewElasticPool creates a pool with the given number of warm workers. Submitted
// tasks are handed off to an idle warm worker if one is available, otherwise a
// temporary goroutine is spawned. Tasks are never queued behind other tasks.
//
// The pool shuts down when ctx is done. name is currently unused here —
// presumably reserved for metrics labels; TODO confirm before relying on it.
func NewElasticPool(
	ctx context.Context,
	name string,
	warmWorkers int,
) Pool {
	workQueue := make(chan func())
	ep := &elasticPool{
		workQueue: workQueue,
	}

	// Start the warm workers; each blocks receiving on workQueue until
	// the channel is closed at shutdown.
	for i := 0; i < warmWorkers; i++ {
		go ep.worker()
	}

	// Shutdown: closing workQueue terminates every worker's range loop and
	// causes any in-flight Submit send to panic, which Submit recovers and
	// converts into a "shut down" error.
	go func() {
		<-ctx.Done()
		close(workQueue)
	}()

	return ep
}

// Submit hands task to an idle warm worker if one is ready, otherwise runs
// it in a fresh goroutine. It returns ctx.Err() if ctx is done, or an error
// if the pool has been shut down; a nil error means the task is running.
func (ep *elasticPool) Submit(ctx context.Context, task func()) (err error) {
	// A send on the closed workQueue panics (send-on-closed-channel). The
	// recover turns that shutdown race into an error instead of crashing
	// the caller.
	defer func() {
		if recover() != nil {
			err = fmt.Errorf("elastic pool is shut down")
		}
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case ep.workQueue <- task:
		// An idle warm worker accepted the task directly.
		return nil
	default:
		// We hit this case when all workers are busy. Under standard operation, this should
		// be fairly rare, but it's not catastrophic if it happens.
		go task()
		return nil
	}
}

// worker runs tasks from the queue until it is closed at shutdown.
func (ep *elasticPool) worker() {
	for task := range ep.workQueue {
		task()
	}
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| package threading | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| ) | ||
|
|
||
var _ Pool = (*fixedPool)(nil)

// fixedPool is a pool of workers that can be used to execute tasks concurrently.
// More efficient than spawning large numbers of short lived goroutines.
type fixedPool struct {
	// workQueue buffers up to queueSize pending tasks; workers receive from
	// it until it is closed at shutdown.
	workQueue chan func()
}

// TODO add metrics!
// TODO unit test before merging!

// Create a new work pool.
func NewFixedPool(
	// The work pool shuts down when the context is done.
	ctx context.Context,
	// The name of the work pool. Reserved for metrics (see TODO above);
	// currently unused.
	name string,
	// The number of workers to create.
	workers int,
	// The size of the work queue. Once full, Submit will block until a slot is available.
	queueSize int,
) Pool {

	workQueue := make(chan func(), queueSize)
	fp := &fixedPool{
		workQueue: workQueue,
	}

	// Start the fixed worker set; each drains workQueue until it is closed
	// and empty.
	for i := 0; i < workers; i++ {
		go fp.worker()
	}

	// Shutdown the work pool when the context is done.
	go func() {
		<-ctx.Done()
		close(workQueue)

		// Handle any remaining tasks in the queue to avoid caller deadlock.
		// NOTE(review): the workers' range loops also drain the closed queue,
		// so this loop races with them; ranging over a closed channel is safe
		// and each queued task is still executed exactly once, just possibly
		// on this goroutine instead of a worker.
		for task := range workQueue {
			task()
		}
	}()

	return fp
}

// Submit enqueues task, blocking while the queue is full. It returns
// ctx.Err() if ctx is done first, or an error if the pool has been shut
// down; a nil error means the task was queued for execution.
func (fp *fixedPool) Submit(ctx context.Context, task func()) (err error) {
	// A send on the closed workQueue panics (send-on-closed-channel); the
	// recover converts that shutdown race into an error for the caller.
	defer func() {
		if recover() != nil {
			err = fmt.Errorf("fixed pool is shut down")
		}
	}()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case fp.workQueue <- task:
		return nil
	}
}

// worker executes queued tasks until the queue is closed and drained.
func (fp *fixedPool) worker() {
	for task := range fp.workQueue {
		task()
	}
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| package threading | ||
|
|
||
| import "context" | ||
|
|
||
// Pool is a pool of workers that can be used to execute tasks concurrently.
type Pool interface {
	// Submit submits a task to the pool. Implementations in this package
	// return ctx.Err() if ctx is done before the task can be handed off,
	// and an error if the pool has already shut down. A nil error means
	// the task was accepted for execution. Submit may block (e.g. on a
	// full queue in a fixed-size pool) until one of those outcomes occurs.
	Submit(ctx context.Context, task func()) error
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| package unit | ||
|
|
||
// Binary (1024-based) byte-size units, each 1024x the previous.
//
// These are untyped constants: KB through EB fit in an int64, while ZB
// (2^70) and YB (2^80) overflow every machine integer type and can only be
// used in floating-point or big-number contexts.
const (
	// KB is the number of bytes in a kilobyte.
	KB = 1 << (10 * (iota + 1))
	// MB is the number of bytes in a megabyte.
	MB
	// GB is the number of bytes in a gigabyte.
	GB
	// TB is the number of bytes in a terabyte.
	TB
	// PB is the number of bytes in a petabyte.
	PB
	// EB is the number of bytes in an exabyte.
	EB
	// ZB is the number of bytes in a zettabyte.
	ZB
	// YB is the number of bytes in a yottabyte.
	YB
)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,7 @@ type StateCommitConfig struct { | |
| MemIAVLConfig memiavl.Config | ||
|
|
||
| // FlatKVConfig is the configuration for the FlatKV (EVM) backend | ||
| FlatKVConfig flatkv.Config | ||
| FlatKVConfig *flatkv.Config | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to make this a pointer?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted back to a direct reference. No need to make this a pointer, except that I prefer pointer types for any struct that has more than one or two fields. It's unlikely that this is going to be a major performance factor in this case, so I don't mind using a direct reference if that's your preferred pattern. |
||
|
|
||
| // Max concurrent historical proof queries (RPC /store path). | ||
| HistoricalProofMaxInFlight int `mapstructure:"historical-proof-max-inflight"` | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.