Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f124bd2
parquet v2 refactor coordinator thread - step 1
jewei1997 Apr 28, 2026
8927929
Coordinator goroutine + dispatch shape - step 2
jewei1997 Apr 28, 2026
d6daa29
Initialization, file open, WAL setup -- step 3
jewei1997 Apr 28, 2026
ad93678
Merge branch 'main' into parquet-refactor-v2-coordinator-thread2
jewei1997 Apr 28, 2026
f7ddab1
Add parquet v2 write path
jewei1997 Apr 28, 2026
c0fb4e9
Test parquet v2 rotation boundaries
jewei1997 Apr 28, 2026
b55a9ea
Test parquet v2 lazy writer init
jewei1997 Apr 28, 2026
c1a7d0e
Add parquet v2 boundary rotation
jewei1997 Apr 28, 2026
3573c98
Add parquet v2 empty block rotation
jewei1997 Apr 28, 2026
b83c4b5
Add parquet v2 read path
jewei1997 Apr 28, 2026
90e05cb
Add parquet v2 pruning
jewei1997 Apr 28, 2026
4d409ef
Add parquet v2 WAL replay
jewei1997 Apr 28, 2026
064ae48
Wire parquet v2 receipt backend
jewei1997 Apr 28, 2026
5424632
Test parquet v2 receipt backend
jewei1997 Apr 28, 2026
1938ac0
make a coordinator module
jewei1997 Apr 29, 2026
9dbb046
Merge branch 'main' into parquet-refactor-v2-coordinator-thread2
jewei1997 Apr 30, 2026
6ff4412
review fixes
jewei1997 Apr 30, 2026
ed768cd
fix
jewei1997 May 4, 2026
e96ae46
remove ObserveEmptyBlock and IsRotationBoundary
jewei1997 May 4, 2026
da245f5
simplify CacheRotateInterval to direct config read
jewei1997 May 4, 2026
c2adfc2
add godocs across parquet_v2 coordinator and store, panic on unknown …
jewei1997 May 4, 2026
9ffb2f1
move prune ticker into run() as local var
jewei1997 May 4, 2026
49602ed
use vistor pattern for dispatch
jewei1997 May 4, 2026
37ef152
run WAL replay synchronously inside parquet_v2 NewStore
jewei1997 May 4, 2026
98469dd
move WALConverter onto StoreConfig, drop ReplayHooks struct
jewei1997 May 4, 2026
a461b25
fix handleClose to release all resources on partial failure
jewei1997 May 4, 2026
42fa630
fix Close to return the first caller's error to repeat callers
jewei1997 May 4, 2026
52e76c6
back CacheRotateInterval with an atomic.Uint64 mirror
jewei1997 May 5, 2026
0e63a08
close parquet writers when New errors after replay opened them
jewei1997 May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sei-db/config/receipt_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ReceiptStoreConfig struct {
DBDirectory string `mapstructure:"db-directory"`

// Backend defines the backend database used for receipt-store.
// Supported backends: pebbledb (aka pebble), parquet
// Supported backends: pebbledb (aka pebble), parquet, parquet_v2
// defaults to pebbledb
Backend string `mapstructure:"rs-backend"`

Expand All @@ -63,12 +63,12 @@ type ReceiptStoreConfig struct {
PruneIntervalSeconds int `mapstructure:"prune-interval-seconds"`

// TxIndexBackend selects the tx-hash index implementation used by the
// parquet receipt store. Set to "pebbledb" (the default) to maintain a
// parquet receipt stores. Set to "pebbledb" (the default) to maintain a
// Pebble-backed tx_hash -> block_number index alongside parquet files so
// receipt-by-hash lookups can target a single file instead of scanning all
// files. Set to "" to disable the index; receipt-by-hash lookups that miss
// the in-memory cache then fail (no full-parquet scan). Ignored when the
// receipt backend is not parquet.
// receipt backend is not parquet or parquet_v2.
TxIndexBackend string `mapstructure:"tx-index-backend"`
}

Expand Down Expand Up @@ -105,10 +105,10 @@ func ReadReceiptConfig(opts AppOptions) (ReceiptStoreConfig, error) {
}
backend = strings.ToLower(strings.TrimSpace(backend))
switch backend {
case "pebbledb", "pebble", "parquet":
case "pebbledb", "pebble", "parquet", "parquet_v2":
cfg.Backend = backend
default:
return cfg, fmt.Errorf("unsupported receipt-store backend %q; supported: pebbledb, parquet", backend)
return cfg, fmt.Errorf("unsupported receipt-store backend %q; supported: pebbledb, parquet, parquet_v2", backend)
}
}
if v := opts.Get(flagRSAsyncWriteBuffer); v != nil {
Expand Down
18 changes: 18 additions & 0 deletions sei-db/config/receipt_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ func TestReadReceiptConfigRejectsMisnamedBackendKey(t *testing.T) {
require.ErrorContains(t, err, "receipt-store.rs-backend")
}

func TestReadReceiptConfigAcceptsParquetV2Backend(t *testing.T) {
cfg, err := ReadReceiptConfig(mapAppOpts{
"receipt-store.rs-backend": " parquet_v2 ",
})

require.NoError(t, err)
require.Equal(t, "parquet_v2", cfg.Backend)
}

func TestReadReceiptConfigBackendErrorListsParquetV2(t *testing.T) {
_, err := ReadReceiptConfig(mapAppOpts{
"receipt-store.rs-backend": "rocksdb",
})

require.Error(t, err)
require.ErrorContains(t, err, "parquet_v2")
}

func TestReadReceiptConfigTxIndexBackendOverride(t *testing.T) {
cfg, err := ReadReceiptConfig(mapAppOpts{
"receipt-store.tx-index-backend": "",
Expand Down
6 changes: 3 additions & 3 deletions sei-db/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ const ReceiptStoreConfigTemplate = `

[receipt-store]
# Backend defines the receipt store backend.
# Supported backends: pebble (aka pebbledb), parquet
# Supported backends: pebble (aka pebbledb), parquet, parquet_v2
# defaults to pebbledb
rs-backend = "{{ .ReceiptStore.Backend }}"

# Defines the receipt store directory. If unset, defaults to <home>/data/ledger/receipt/{backend}
db-directory = "{{ .ReceiptStore.DBDirectory }}"

# AsyncWriteBuffer defines the async queue length for commits to be applied to receipt store.
# Applies only when rs-backend = "pebbledb"; parquet ignores this setting.
# Applies only when rs-backend = "pebbledb"; parquet and parquet_v2 ignore this setting.
# Set <= 0 for synchronous writes.
# defaults to 100
async-write-buffer = {{ .ReceiptStore.AsyncWriteBuffer }}
Expand All @@ -169,7 +169,7 @@ prune-interval-seconds = {{ .ReceiptStore.PruneIntervalSeconds }}

# TxIndexBackend selects the tx-hash index implementation for parquet receipts.
# Set to "pebbledb" to enable the index, or "" to disable it.
# Ignored unless rs-backend = "parquet".
# Ignored unless rs-backend = "parquet" or "parquet_v2".
tx-index-backend = "{{ .ReceiptStore.TxIndexBackend }}"
`

Expand Down
1 change: 1 addition & 0 deletions sei-db/config/toml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestReceiptStoreConfigTemplate(t *testing.T) {

require.Contains(t, output, "[receipt-store]", "Missing receipt-store section")
require.Contains(t, output, `rs-backend = "pebbledb"`, "Missing or incorrect rs-backend")
require.Contains(t, output, `parquet_v2`, "Missing parquet_v2 supported backend note")
require.Contains(t, output, `db-directory = ""`, "Missing or incorrect db-directory")
require.Contains(t, output, "async-write-buffer =", "Missing async-write-buffer")
require.Contains(t, output, "prune-interval-seconds =", "Missing prune-interval-seconds")
Expand Down
23 changes: 23 additions & 0 deletions sei-db/ledger_db/parquet/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,29 @@ type StoreConfig struct {
BlockFlushInterval uint64
MaxBlocksPerFile uint64
TxIndexBackend string

// WALConverter, when non-nil, drives synchronous WAL replay during
// store construction. The function decodes one raw WAL receipt blob
// into the structured fields the store needs to re-stage it. Only
// consumed by the v2 store; v1 ignores it. When nil, replay is
// skipped — used by lower-level tests that drive replay manually.
WALConverter WALReceiptConverter
}

// WALReceiptConverter decodes a raw WAL receipt blob into the structured
// fields the v2 store needs to re-stage it. logStartIndex carries the
// running per-block log offset so logs from earlier txs in the same block
// don't collide.
type WALReceiptConverter func(blockNumber uint64, receiptBytes []byte, logStartIndex uint) (ReplayReceipt, error)

// ReplayReceipt is one converted WAL entry: the receipt input to re-stage,
// its tx hash, the warmup record returned to the wrapper, and the log
// count consumed (used to advance logStartIndex).
type ReplayReceipt struct {
Input ReceiptInput
TxHash common.Hash
Warmup ReceiptRecord
LogCount uint
}

// DefaultStoreConfig returns the default store configuration.
Expand Down
78 changes: 41 additions & 37 deletions sei-db/ledger_db/receipt/cached_receipt_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,43 +411,47 @@ func TestCachedReceiptStoreReportsCacheMiss(t *testing.T) {

// Wrapper tests for cachedReceiptStore using parquet backend.
func TestCachedReceiptStoreFallsBackToDuckDBOnReceiptCacheMiss(t *testing.T) {
ctx, storeKey := newTestContext()
cfg := dbconfig.DefaultReceiptStoreConfig()
cfg.Backend = "parquet"
cfg.DBDirectory = t.TempDir()

store, err := NewReceiptStore(cfg, storeKey)
require.NoError(t, err)

txHash := common.HexToHash("0x12")
addr := common.HexToAddress("0x212")
topic := common.HexToHash("0xcafe")
receipt := makeTestReceipt(txHash, 8, 0, addr, []common.Hash{topic})

require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(8), []ReceiptRecord{
{TxHash: txHash, Receipt: receipt},
}))
require.NoError(t, store.Close())

store, err = NewReceiptStore(cfg, storeKey)
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })

cached, ok := store.(*cachedReceiptStore)
require.True(t, ok, "expected cached receipt store wrapper")

// A clean reopen leaves no WAL warmup records, so receipt lookup must
// miss the in-memory cache and fall through to the parquet/DuckDB backend.
_, ok = cached.cache.GetReceipt(txHash)
require.False(t, ok, "receipt cache should be cold after clean reopen")

// There is no legacy KV receipt entry to rescue the lookup, so success
// here proves GetReceipt() can recover from DuckDB after a cache miss.
require.Nil(t, ctx.KVStore(storeKey).Get(types.ReceiptKey(txHash)))

got, err := store.GetReceipt(ctx.WithBlockHeight(8), txHash)
require.NoError(t, err)
require.Equal(t, receipt.TxHashHex, got.TxHashHex)
for _, backend := range []string{"parquet", "parquet_v2"} {
t.Run(backend, func(t *testing.T) {
ctx, storeKey := newTestContext()
cfg := dbconfig.DefaultReceiptStoreConfig()
cfg.Backend = backend
cfg.DBDirectory = t.TempDir()

store, err := NewReceiptStore(cfg, storeKey)
require.NoError(t, err)

txHash := common.HexToHash("0x12")
addr := common.HexToAddress("0x212")
topic := common.HexToHash("0xcafe")
receipt := makeTestReceipt(txHash, 8, 0, addr, []common.Hash{topic})

require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(8), []ReceiptRecord{
{TxHash: txHash, Receipt: receipt},
}))
require.NoError(t, store.Close())

store, err = NewReceiptStore(cfg, storeKey)
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })

cached, ok := store.(*cachedReceiptStore)
require.True(t, ok, "expected cached receipt store wrapper")

// A clean reopen leaves no WAL warmup records, so receipt lookup must
// miss the in-memory cache and fall through to the parquet/DuckDB backend.
_, ok = cached.cache.GetReceipt(txHash)
require.False(t, ok, "receipt cache should be cold after clean reopen")

// There is no legacy KV receipt entry to rescue the lookup, so success
// here proves GetReceipt() can recover from DuckDB after a cache miss.
require.Nil(t, ctx.KVStore(storeKey).Get(types.ReceiptKey(txHash)))

got, err := store.GetReceipt(ctx.WithBlockHeight(8), txHash)
require.NoError(t, err)
require.Equal(t, receipt.TxHashHex, got.TxHashHex)
})
}
}

func TestCachedReceiptStoreMergesDuckDBAndCacheAcrossBoundary(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions sei-db/ledger_db/receipt/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ func CloseTxHashIndex(store ReceiptStore) {
_ = pq.txHashIndex.Close()
pq.txHashIndex = nil
}
if pq, ok := store.(*parquetReceiptStoreV2); ok && pq.txHashIndex != nil {
if pq.indexPruner != nil {
pq.indexPruner.Stop()
pq.indexPruner = nil
}
_ = pq.txHashIndex.Close()
pq.txHashIndex = nil
}
}
Loading
Loading