diff --git a/sei-db/config/receipt_config.go b/sei-db/config/receipt_config.go index dbbe347a31..15bcdcfe78 100644 --- a/sei-db/config/receipt_config.go +++ b/sei-db/config/receipt_config.go @@ -42,7 +42,7 @@ type ReceiptStoreConfig struct { DBDirectory string `mapstructure:"db-directory"` // Backend defines the backend database used for receipt-store. - // Supported backends: pebbledb (aka pebble), parquet + // Supported backends: pebbledb (aka pebble), parquet, parquet_v2 // defaults to pebbledb Backend string `mapstructure:"rs-backend"` @@ -63,12 +63,12 @@ type ReceiptStoreConfig struct { PruneIntervalSeconds int `mapstructure:"prune-interval-seconds"` // TxIndexBackend selects the tx-hash index implementation used by the - // parquet receipt store. Set to "pebbledb" (the default) to maintain a + // parquet receipt stores. Set to "pebbledb" (the default) to maintain a // Pebble-backed tx_hash -> block_number index alongside parquet files so // receipt-by-hash lookups can target a single file instead of scanning all // files. Set to "" to disable the index; receipt-by-hash lookups that miss // the in-memory cache then fail (no full-parquet scan). Ignored when the - // receipt backend is not parquet. + // receipt backend is not parquet or parquet_v2. TxIndexBackend string `mapstructure:"tx-index-backend"` } @@ -105,10 +105,10 @@ func ReadReceiptConfig(opts AppOptions) (ReceiptStoreConfig, error) { } backend = strings.ToLower(strings.TrimSpace(backend)) switch backend { - case "pebbledb", "pebble", "parquet": + case "pebbledb", "pebble", "parquet", "parquet_v2": cfg.Backend = backend default: - return cfg, fmt.Errorf("unsupported receipt-store backend %q; supported: pebbledb, parquet", backend) + return cfg, fmt.Errorf("unsupported receipt-store backend %q; supported: pebbledb, parquet, parquet_v2", backend) } } if v := opts.Get(flagRSAsyncWriteBuffer); v != nil { diff --git a/sei-db/config/receipt_config_test.go b/sei-db/config/receipt_config_test.go index 3f9bea786f..eca53ec1e3 100644 --- a/sei-db/config/receipt_config_test.go +++ b/sei-db/config/receipt_config_test.go @@ -22,6 +22,24 @@ func TestReadReceiptConfigRejectsMisnamedBackendKey(t *testing.T) { require.ErrorContains(t, err, "receipt-store.rs-backend") } +func TestReadReceiptConfigAcceptsParquetV2Backend(t *testing.T) { + cfg, err := ReadReceiptConfig(mapAppOpts{ + "receipt-store.rs-backend": " parquet_v2 ", + }) + + require.NoError(t, err) + require.Equal(t, "parquet_v2", cfg.Backend) +} + +func TestReadReceiptConfigBackendErrorListsParquetV2(t *testing.T) { + _, err := ReadReceiptConfig(mapAppOpts{ + "receipt-store.rs-backend": "rocksdb", + }) + + require.Error(t, err) + require.ErrorContains(t, err, "parquet_v2") +} + func TestReadReceiptConfigTxIndexBackendOverride(t *testing.T) { cfg, err := ReadReceiptConfig(mapAppOpts{ "receipt-store.tx-index-backend": "", diff --git a/sei-db/config/toml.go b/sei-db/config/toml.go index eb387cb1d5..a55f1cfde4 100644 --- a/sei-db/config/toml.go +++ b/sei-db/config/toml.go @@ -149,7 +149,7 @@ const ReceiptStoreConfigTemplate = ` [receipt-store] # Backend defines the receipt store backend. -# Supported backends: pebble (aka pebbledb), parquet +# Supported backends: pebble (aka pebbledb), parquet, parquet_v2 # defaults to pebbledb rs-backend = "{{ .ReceiptStore.Backend }}" @@ -157,7 +157,7 @@ rs-backend = "{{ .ReceiptStore.Backend }}" db-directory = "{{ .ReceiptStore.DBDirectory }}" # AsyncWriteBuffer defines the async queue length for commits to be applied to receipt store. 
-# Applies only when rs-backend = "pebbledb"; parquet ignores this setting. +# Applies only when rs-backend = "pebbledb"; parquet and parquet_v2 ignore this setting. # Set <= 0 for synchronous writes. # defaults to 100 async-write-buffer = {{ .ReceiptStore.AsyncWriteBuffer }} @@ -169,7 +169,7 @@ prune-interval-seconds = {{ .ReceiptStore.PruneIntervalSeconds }} # TxIndexBackend selects the tx-hash index implementation for parquet receipts. # Set to "pebbledb" to enable the index, or "" to disable it. -# Ignored unless rs-backend = "parquet". +# Ignored unless rs-backend = "parquet" or "parquet_v2". tx-index-backend = "{{ .ReceiptStore.TxIndexBackend }}" ` diff --git a/sei-db/config/toml_test.go b/sei-db/config/toml_test.go index fd0a51f932..284ab29ad8 100644 --- a/sei-db/config/toml_test.go +++ b/sei-db/config/toml_test.go @@ -112,6 +112,7 @@ func TestReceiptStoreConfigTemplate(t *testing.T) { require.Contains(t, output, "[receipt-store]", "Missing receipt-store section") require.Contains(t, output, `rs-backend = "pebbledb"`, "Missing or incorrect rs-backend") + require.Contains(t, output, `parquet_v2`, "Missing parquet_v2 supported backend note") require.Contains(t, output, `db-directory = ""`, "Missing or incorrect db-directory") require.Contains(t, output, "async-write-buffer =", "Missing async-write-buffer") require.Contains(t, output, "prune-interval-seconds =", "Missing prune-interval-seconds") diff --git a/sei-db/ledger_db/parquet/store.go b/sei-db/ledger_db/parquet/store.go index d4bfed2fab..844b298b09 100644 --- a/sei-db/ledger_db/parquet/store.go +++ b/sei-db/ledger_db/parquet/store.go @@ -38,6 +38,29 @@ type StoreConfig struct { BlockFlushInterval uint64 MaxBlocksPerFile uint64 TxIndexBackend string + + // WALConverter, when non-nil, drives synchronous WAL replay during + // store construction. The function decodes one raw WAL receipt blob + // into the structured fields the store needs to re-stage it. Only + // consumed by the v2 store; v1 ignores it. When nil, replay is + // skipped — used by lower-level tests that drive replay manually. + WALConverter WALReceiptConverter +} + +// WALReceiptConverter decodes a raw WAL receipt blob into the structured +// fields the v2 store needs to re-stage it. logStartIndex carries the +// running per-block log offset so logs from earlier txs in the same block +// don't collide. +type WALReceiptConverter func(blockNumber uint64, receiptBytes []byte, logStartIndex uint) (ReplayReceipt, error) + +// ReplayReceipt is one converted WAL entry: the receipt input to re-stage, +// its tx hash, the warmup record returned to the wrapper, and the log +// count consumed (used to advance logStartIndex). +type ReplayReceipt struct { + Input ReceiptInput + TxHash common.Hash + Warmup ReceiptRecord + LogCount uint } // DefaultStoreConfig returns the default store configuration. diff --git a/sei-db/ledger_db/receipt/cached_receipt_store_test.go b/sei-db/ledger_db/receipt/cached_receipt_store_test.go index cacc526c0d..057e79b1e2 100644 --- a/sei-db/ledger_db/receipt/cached_receipt_store_test.go +++ b/sei-db/ledger_db/receipt/cached_receipt_store_test.go @@ -411,43 +411,47 @@ func TestCachedReceiptStoreReportsCacheMiss(t *testing.T) { // Wrapper tests for cachedReceiptStore using parquet backend. 
func TestCachedReceiptStoreFallsBackToDuckDBOnReceiptCacheMiss(t *testing.T) { - ctx, storeKey := newTestContext() - cfg := dbconfig.DefaultReceiptStoreConfig() - cfg.Backend = "parquet" - cfg.DBDirectory = t.TempDir() - - store, err := NewReceiptStore(cfg, storeKey) - require.NoError(t, err) - - txHash := common.HexToHash("0x12") - addr := common.HexToAddress("0x212") - topic := common.HexToHash("0xcafe") - receipt := makeTestReceipt(txHash, 8, 0, addr, []common.Hash{topic}) - - require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(8), []ReceiptRecord{ - {TxHash: txHash, Receipt: receipt}, - })) - require.NoError(t, store.Close()) - - store, err = NewReceiptStore(cfg, storeKey) - require.NoError(t, err) - t.Cleanup(func() { _ = store.Close() }) - - cached, ok := store.(*cachedReceiptStore) - require.True(t, ok, "expected cached receipt store wrapper") - - // A clean reopen leaves no WAL warmup records, so receipt lookup must - // miss the in-memory cache and fall through to the parquet/DuckDB backend. - _, ok = cached.cache.GetReceipt(txHash) - require.False(t, ok, "receipt cache should be cold after clean reopen") - - // There is no legacy KV receipt entry to rescue the lookup, so success - // here proves GetReceipt() can recover from DuckDB after a cache miss. - require.Nil(t, ctx.KVStore(storeKey).Get(types.ReceiptKey(txHash))) - - got, err := store.GetReceipt(ctx.WithBlockHeight(8), txHash) - require.NoError(t, err) - require.Equal(t, receipt.TxHashHex, got.TxHashHex) + for _, backend := range []string{"parquet", "parquet_v2"} { + t.Run(backend, func(t *testing.T) { + ctx, storeKey := newTestContext() + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = backend + cfg.DBDirectory = t.TempDir() + + store, err := NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + + txHash := common.HexToHash("0x12") + addr := common.HexToAddress("0x212") + topic := common.HexToHash("0xcafe") + receipt := makeTestReceipt(txHash, 8, 0, addr, []common.Hash{topic}) + + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(8), []ReceiptRecord{ + {TxHash: txHash, Receipt: receipt}, + })) + require.NoError(t, store.Close()) + + store, err = NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + cached, ok := store.(*cachedReceiptStore) + require.True(t, ok, "expected cached receipt store wrapper") + + // A clean reopen leaves no WAL warmup records, so receipt lookup must + // miss the in-memory cache and fall through to the parquet/DuckDB backend. + _, ok = cached.cache.GetReceipt(txHash) + require.False(t, ok, "receipt cache should be cold after clean reopen") + + // There is no legacy KV receipt entry to rescue the lookup, so success + // here proves GetReceipt() can recover from DuckDB after a cache miss. 
+			require.Nil(t, ctx.KVStore(storeKey).Get(types.ReceiptKey(txHash)))
+
+			got, err := store.GetReceipt(ctx.WithBlockHeight(8), txHash)
+			require.NoError(t, err)
+			require.Equal(t, receipt.TxHashHex, got.TxHashHex)
+		})
+	}
 }
 
 func TestCachedReceiptStoreMergesDuckDBAndCacheAcrossBoundary(t *testing.T) {
diff --git a/sei-db/ledger_db/receipt/export_test.go b/sei-db/ledger_db/receipt/export_test.go
index 8950a2e4ae..87a1cb9869 100644
--- a/sei-db/ledger_db/receipt/export_test.go
+++ b/sei-db/ledger_db/receipt/export_test.go
@@ -30,4 +30,12 @@ func CloseTxHashIndex(store ReceiptStore) {
 		_ = pq.txHashIndex.Close()
 		pq.txHashIndex = nil
 	}
+	if pq, ok := store.(*parquetReceiptStoreV2); ok && pq.txHashIndex != nil {
+		if pq.indexPruner != nil {
+			pq.indexPruner.Stop()
+			pq.indexPruner = nil
+		}
+		_ = pq.txHashIndex.Close()
+		pq.txHashIndex = nil
+	}
 }
diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go
new file mode 100644
index 0000000000..2b45e809b7
--- /dev/null
+++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go
@@ -0,0 +1,380 @@
+package coordinator
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	parquetgo "github.com/parquet-go/parquet-go"
+	"github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet"
+	dbwal "github.com/sei-protocol/sei-chain/sei-db/wal"
+)
+
+type closedFile struct {
+	startBlock  uint64
+	receiptPath string
+	logPath     string
+}
+
+// Coordinator owns parquet_v2's mutable state and serializes all access via
+// its requests channel. Construct with New; interact through the typed
+// methods (WriteReceipts, GetLogs, ...).
+type Coordinator struct {
+	requests  chan coordRequest
+	done      chan struct{}
+	closeOnce sync.Once
+	closeErr  error
+
+	config parquet.StoreConfig
+
+	// cacheRotateInterval mirrors config.MaxBlocksPerFile so it can be
+	// read from outside the run goroutine. CacheRotateInterval is the
+	// only external reader; all internal callers read config directly on
+	// the run goroutine. Kept in sync by handleSetMaxBlocksPerFile.
+	cacheRotateInterval atomic.Uint64
+
+	basePath       string
+	fileStartBlock uint64
+	receiptWriter  *parquetgo.GenericWriter[parquet.ReceiptRecord]
+	logWriter      *parquetgo.GenericWriter[parquet.LogRecord]
+	receiptFile    *os.File
+	logFile        *os.File
+	closedFiles    []closedFile
+
+	receiptsBuffer   []parquet.ReceiptRecord
+	logsBuffer       []parquet.LogRecord
+	lastSeenBlock    uint64
+	blocksSinceFlush uint64
+
+	tempWriteCache map[common.Hash][]tempReceipt
+
+	latestVersion   int64
+	earliestVersion int64
+
+	faultHooks *parquet.FaultHooks
+
+	wal    dbwal.GenericWAL[parquet.WALEntry]
+	reader *Reader
+
+	warmupRecords  []parquet.ReceiptRecord
+	replayedBlocks []ReplayedBlock
+}
+
+// New constructs a Coordinator and drives WAL replay synchronously before
+// starting the request goroutine, so the returned Coordinator already
+// reflects any persisted-but-uncheckpointed receipts. Replay runs only
+// when cfg.WALConverter is non-nil; tests that exercise replayWAL
+// directly leave it nil. After construction, callers drain the recovered
+// state via WarmupRecords and ReplayedBlocks.
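+//
+// An illustrative construct-then-drain sketch (warmCache and reindex are
+// hypothetical consumers, not part of this package):
+//
+//	coord, err := coordinator.New(parquet.StoreConfig{
+//		DBDirectory:  dir,
+//		WALConverter: converter, // non-nil, so replay runs here
+//	})
+//	if err != nil {
+//		return err
+//	}
+//	warmCache(coord.WarmupRecords()) // drain once
+//	reindex(coord.ReplayedBlocks())  // drain once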
+func New(cfg parquet.StoreConfig) (*Coordinator, error) { + storeCfg := resolveStoreConfig(cfg) + + if err := os.MkdirAll(storeCfg.DBDirectory, 0o750); err != nil { + return nil, fmt.Errorf("failed to create parquet base directory: %w", err) + } + + requests := make(chan coordRequest) + done := make(chan struct{}) + reader, err := NewReaderWithMaxBlocksPerFile(cfg.DBDirectory, storeCfg.MaxBlocksPerFile) + if err != nil { + return nil, err + } + cleanupReader := true + defer func() { + if cleanupReader { + _ = reader.Close() + } + }() + + walDir := filepath.Join(storeCfg.DBDirectory, "parquet-wal") + receiptWAL, err := parquet.NewWAL(walDir) + if err != nil { + return nil, err + } + cleanupWAL := true + defer func() { + if cleanupWAL { + _ = receiptWAL.Close() + } + }() + + closedFiles, err := scanClosedFiles(storeCfg.DBDirectory, reader) + if err != nil { + return nil, err + } + + c := &Coordinator{ + requests: requests, + done: done, + config: storeCfg, + basePath: cfg.DBDirectory, + closedFiles: closedFiles, + receiptsBuffer: make([]parquet.ReceiptRecord, 0, 1000), + logsBuffer: make([]parquet.LogRecord, 0, 10000), + tempWriteCache: make(map[common.Hash][]tempReceipt), + reader: reader, + wal: receiptWAL, + latestVersion: 0, + earliestVersion: 0, + } + cleanupWriters := true + defer func() { + if cleanupWriters { + _ = c.closeWriters() + } + }() + c.cacheRotateInterval.Store(storeCfg.MaxBlocksPerFile) + + receiptFiles := make([]string, 0, len(closedFiles)) + for _, f := range closedFiles { + receiptFiles = append(receiptFiles, f.receiptPath) + } + if maxBlock, ok, err := reader.MaxReceiptBlockNumber(context.Background(), receiptFiles); err != nil { + return nil, err + } else if ok { + latest, err := int64FromUint64(maxBlock) + if err != nil { + return nil, err + } + c.latestVersion = latest + if maxBlock < ^uint64(0) { + c.fileStartBlock = maxBlock + 1 + } + } + + if cfg.WALConverter != nil { + result, err := c.replayWAL(cfg.WALConverter) + if err != nil { + return nil, err + } + c.warmupRecords = result.WarmupRecords + c.replayedBlocks = result.Blocks + } + + go c.run() + cleanupReader = false + cleanupWAL = false + cleanupWriters = false + + return c, nil +} + +func resolveStoreConfig(cfg parquet.StoreConfig) parquet.StoreConfig { + resolved := parquet.DefaultStoreConfig() + resolved.DBDirectory = cfg.DBDirectory + resolved.KeepRecent = cfg.KeepRecent + resolved.PruneIntervalSeconds = cfg.PruneIntervalSeconds + if cfg.TxIndexBackend != "" { + resolved.TxIndexBackend = cfg.TxIndexBackend + } + if cfg.BlockFlushInterval > 0 { + resolved.BlockFlushInterval = cfg.BlockFlushInterval + } + if cfg.MaxBlocksPerFile > 0 { + resolved.MaxBlocksPerFile = cfg.MaxBlocksPerFile + } + return resolved +} + +func (c *Coordinator) run() { + var pruneTick <-chan time.Time + if c.config.KeepRecent > 0 && c.config.PruneIntervalSeconds > 0 { + ticker := time.NewTicker(time.Duration(c.config.PruneIntervalSeconds) * time.Second) + defer ticker.Stop() + pruneTick = ticker.C + } + for { + select { + case req := <-c.requests: + if req.dispatch(c) { + return + } + case <-pruneTick: + c.handlePruneTick() + case <-c.done: + return + } + } +} + +// WriteReceipts records a committed block. inputs may be empty, in which case +// the call only advances rotation/cursor state — equivalent to the former +// ObserveEmptyBlock. height is authoritative; inputs[i].BlockNumber is +// ignored. 
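+//
+// A minimal caller-side sketch (height and inputs are assumed values):
+//
+//	if err := coord.WriteReceipts(height, inputs); err != nil {
+//		return err
+//	}
+//	// Empty block: no WAL entry, but rotation/cursor state still advances.
+//	if err := coord.WriteReceipts(height+1, nil); err != nil {
+//		return err
+//	}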
+func (c *Coordinator) WriteReceipts(height uint64, inputs []parquet.ReceiptInput) error { + resp := make(chan writeResp, 1) + r, err := sendAndAwaitResponse(context.Background(), c, writeReq{height: height, inputs: inputs, resp: resp}, resp) + if err != nil { + return err + } + return r.err +} + +func (c *Coordinator) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*parquet.ReceiptResult, error) { + resp := make(chan readReceiptResp, 1) + r, err := sendAndAwaitResponse(ctx, c, readByTxHashReq{ctx: ctx, txHash: txHash, resp: resp}, resp) + if err != nil { + return nil, err + } + return r.result, r.err +} + +func (c *Coordinator) GetReceiptByTxHashInBlock(ctx context.Context, txHash common.Hash, blockNumber uint64) (*parquet.ReceiptResult, error) { + resp := make(chan readReceiptResp, 1) + r, err := sendAndAwaitResponse(ctx, c, readByTxHashInBlockReq{ + ctx: ctx, + txHash: txHash, + blockNumber: blockNumber, + resp: resp, + }, resp) + if err != nil { + return nil, err + } + return r.result, r.err +} + +func (c *Coordinator) GetLogs(ctx context.Context, filter parquet.LogFilter) ([]parquet.LogResult, error) { + resp := make(chan getLogsResp, 1) + r, err := sendAndAwaitResponse(ctx, c, getLogsReq{ctx: ctx, filter: filter, resp: resp}, resp) + if err != nil { + return nil, err + } + return r.results, r.err +} + +func (c *Coordinator) FileStartBlock() uint64 { + resp := make(chan uint64, 1) + r, err := sendAndAwaitResponse(context.Background(), c, fileStartBlockReq{resp: resp}, resp) + if err != nil { + return 0 + } + return r +} + +func (c *Coordinator) LatestVersion() int64 { + resp := make(chan int64, 1) + r, err := sendAndAwaitResponse(context.Background(), c, latestVersionReq{resp: resp}, resp) + if err != nil { + return 0 + } + return r +} + +func (c *Coordinator) SetLatestVersion(version int64) { + resp := make(chan error, 1) + _ = awaitError(context.Background(), c, setLatestVersionReq{version: version, resp: resp}, resp) +} + +func (c *Coordinator) SetEarliestVersion(version int64) { + resp := make(chan error, 1) + _ = awaitError(context.Background(), c, setEarliestVersionReq{version: version, resp: resp}, resp) +} + +func (c *Coordinator) UpdateLatestVersion(version int64) { + resp := make(chan error, 1) + _ = awaitError(context.Background(), c, updateLatestVersionReq{version: version, resp: resp}, resp) +} + +// CacheRotateInterval returns the rotation boundary used by the cached receipt +// store. Backed by an atomic mirror of config.MaxBlocksPerFile so external +// readers don't race with handleSetMaxBlocksPerFile in the run goroutine. +func (c *Coordinator) CacheRotateInterval() uint64 { + return c.cacheRotateInterval.Load() +} + +func (c *Coordinator) Flush() error { + resp := make(chan error, 1) + return awaitError(context.Background(), c, flushReq{resp: resp}, resp) +} + +// Close performs a graceful shutdown. Subsequent callers receive the same +// error as the first caller — closeErr is set inside closeOnce.Do, which +// provides the happens-before edge for all later reads. 
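+//
+// Repeat calls are cheap and deterministic, e.g.:
+//
+//	first := coord.Close()  // runs the shutdown sequence
+//	second := coord.Close() // returns first's error without re-running it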
+func (c *Coordinator) Close() error { + c.closeOnce.Do(func() { + resp := make(chan error, 1) + c.closeErr = awaitError(context.Background(), c, closeReq{resp: resp}, resp) + close(c.done) + }) + return c.closeErr +} + +func (c *Coordinator) SimulateCrash() { + c.closeOnce.Do(func() { + resp := make(chan struct{}, 1) + _, _ = sendAndAwaitResponse(context.Background(), c, simulateCrashReq{resp: resp}, resp) + close(c.done) + }) +} + +func (c *Coordinator) SetBlockFlushInterval(interval uint64) { + resp := make(chan error, 1) + _ = awaitError(context.Background(), c, setBlockFlushIntervalReq{interval: interval, resp: resp}, resp) +} + +func (c *Coordinator) SetMaxBlocksPerFile(n uint64) { + resp := make(chan error, 1) + _ = awaitError(context.Background(), c, setMaxBlocksPerFileReq{maxBlocksPerFile: n, resp: resp}, resp) +} + +func (c *Coordinator) SetFaultHooks(hooks *parquet.FaultHooks) { + resp := make(chan error, 1) + _ = awaitError(context.Background(), c, setFaultHooksReq{hooks: hooks, resp: resp}, resp) +} + +// WarmupRecords returns and clears the warmup receipt records recovered +// during construction-time WAL replay. Callers drain this once after +// construction to seed an external receipt cache. +func (c *Coordinator) WarmupRecords() []parquet.ReceiptRecord { + records := c.warmupRecords + c.warmupRecords = nil + return records +} + +// ReplayedBlocks returns and clears the per-block tx-hash listing +// recovered during construction-time WAL replay. Wrappers drain this +// once after construction to repopulate an external tx-hash index. +func (c *Coordinator) ReplayedBlocks() []ReplayedBlock { + blocks := c.replayedBlocks + c.replayedBlocks = nil + return blocks +} + +// sendAndAwaitResponse enqueues req on the requests channel and waits for the +// reply. If ctx is cancelled at either stage the call returns ctx.Err() so +// callers don't sit blocked behind an in-flight handler. Non-cancellable +// callers (writes, lifecycle ops) pass context.Background(); the read APIs +// pass the caller's ctx so eth_getLogs / GetReceiptByTxHash respect timeouts. 
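+//
+// For example, a read whose deadline expires surfaces the context error
+// instead of blocking behind an in-flight handler (sketch; filter is an
+// assumed value):
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+//	defer cancel()
+//	logs, err := coord.GetLogs(ctx, filter) // context.DeadlineExceeded on timeout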
+func sendAndAwaitResponse[T any](ctx context.Context, c *Coordinator, req coordRequest, resp <-chan T) (T, error) { + var zero T + + select { + case c.requests <- req: + case <-c.done: + return zero, ErrStoreClosed + case <-ctx.Done(): + return zero, ctx.Err() + } + + select { + case r := <-resp: + return r, nil + case <-c.done: + return zero, ErrStoreClosed + case <-ctx.Done(): + return zero, ctx.Err() + } +} + +func awaitError(ctx context.Context, c *Coordinator, req coordRequest, resp <-chan error) error { + err, waitErr := sendAndAwaitResponse(ctx, c, req, resp) + if waitErr != nil { + return waitErr + } + return err +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/dispatch_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/dispatch_test.go new file mode 100644 index 0000000000..5b037f8cf7 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/dispatch_test.go @@ -0,0 +1,123 @@ +package coordinator + +import ( + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestSetMaxBlocksPerFileUpdatesReaderState(t *testing.T) { + reader, err := NewReaderWithMaxBlocksPerFile(t.TempDir(), 10) + require.NoError(t, err) + t.Cleanup(func() { _ = reader.Close() }) + + resp := make(chan error, 1) + coord := &Coordinator{ + config: parquet.StoreConfig{ + MaxBlocksPerFile: 10, + }, + reader: reader, + } + + coord.handleSetMaxBlocksPerFile(setMaxBlocksPerFileReq{ + maxBlocksPerFile: 3, + resp: resp, + }) + + require.NoError(t, <-resp) + require.Equal(t, uint64(3), coord.config.MaxBlocksPerFile) + require.Equal(t, uint64(3), coord.cacheRotateInterval.Load()) + require.Equal(t, uint64(3), reader.maxBlocksPerFile) +} + +func TestHandleCloseReleasesAllResourcesOnFlushError(t *testing.T) { + coord, err := New(parquet.StoreConfig{ + DBDirectory: t.TempDir(), + MaxBlocksPerFile: 4, + }) + require.NoError(t, err) + + require.NotNil(t, coord.wal) + require.NotNil(t, coord.reader) + + require.NoError(t, coord.WriteReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, common.HexToHash("0x1")), + })) + require.NotNil(t, coord.receiptWriter) + require.NotNil(t, coord.receiptFile) + + coord.SetFaultHooks(&parquet.FaultHooks{ + BeforeFlush: func(uint64) error { return errors.New("injected flush failure") }, + }) + + closeErr := coord.Close() + require.Error(t, closeErr) + require.ErrorContains(t, closeErr, "injected flush failure") + + require.Nil(t, coord.wal, "WAL must be released even when flushOpenFile errors") + require.Nil(t, coord.reader, "reader must be released even when flushOpenFile errors") + require.Nil(t, coord.receiptWriter) + require.Nil(t, coord.logWriter) + require.Nil(t, coord.receiptFile) + require.Nil(t, coord.logFile) +} + +func TestCloseReturnsSameErrorToRepeatCallers(t *testing.T) { + coord, err := New(parquet.StoreConfig{ + DBDirectory: t.TempDir(), + MaxBlocksPerFile: 4, + }) + require.NoError(t, err) + + require.NoError(t, coord.WriteReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, common.HexToHash("0x1")), + })) + + coord.SetFaultHooks(&parquet.FaultHooks{ + BeforeFlush: func(uint64) error { return errors.New("injected flush failure") }, + }) + + first := coord.Close() + second := coord.Close() + require.Error(t, first) + require.Error(t, second, "second Close() must surface the original close error, not nil") + require.Equal(t, first, second) +} + +func TestUnbufferedRequestsApplyBackpressure(t *testing.T) 
{ + requests := make(chan coordRequest) + done := make(chan struct{}) + coord := &Coordinator{ + requests: requests, + done: done, + } + go coord.run() + + require.Zero(t, cap(coord.requests)) + + firstResp := make(chan writeResp) + coord.requests <- writeReq{ + inputs: []parquet.ReceiptInput{testReceiptInput(1, common.HexToHash("0x1"))}, + resp: firstResp, + } + time.Sleep(10 * time.Millisecond) + + secondDone := make(chan error, 1) + go func() { + secondDone <- coord.Flush() + }() + + select { + case err := <-secondDone: + t.Fatalf("second request completed before first unblocked: %v", err) + case <-time.After(25 * time.Millisecond): + } + + require.Error(t, (<-firstResp).err) + require.NoError(t, <-secondDone) + require.NoError(t, coord.Close()) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go new file mode 100644 index 0000000000..29067570b8 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go @@ -0,0 +1,5 @@ +package coordinator + +func forcePruneTickForTest(c *Coordinator) { + c.handlePruneTick() +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/files.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/files.go new file mode 100644 index 0000000000..73e2c93124 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/files.go @@ -0,0 +1,97 @@ +package coordinator + +import ( + "fmt" + "os" + "path/filepath" + "sort" + + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" +) + +// scanClosedFiles enumerates closed receipt/log parquet pairs under basePath. +// It drops trailing files that fail a readability probe (likely truncated by +// a crash mid-flush) and returns the surviving pairs sorted by start block. +func scanClosedFiles(basePath string, reader *Reader) ([]closedFile, error) { + receiptFiles, err := parquetFilesByPrefix(basePath, "receipts") + if err != nil { + return nil, err + } + logFiles, err := parquetFilesByPrefix(basePath, "logs") + if err != nil { + return nil, err + } + + receiptFiles = validateAndCleanFiles(basePath, reader, receiptFiles, "logs") + logFiles = validateAndCleanFiles(basePath, reader, logFiles, "receipts") + + logByStart := make(map[uint64]string, len(logFiles)) + for _, path := range logFiles { + if fileExists(path) { + logByStart[parquet.ExtractBlockNumber(path)] = path + } + } + + closed := make([]closedFile, 0, len(receiptFiles)) + for _, receiptPath := range receiptFiles { + if !fileExists(receiptPath) { + continue + } + startBlock := parquet.ExtractBlockNumber(receiptPath) + logPath, ok := logByStart[startBlock] + if !ok { + continue + } + closed = append(closed, closedFile{ + startBlock: startBlock, + receiptPath: receiptPath, + logPath: logPath, + }) + } + + sort.Slice(closed, func(i, j int) bool { + return closed[i].startBlock < closed[j].startBlock + }) + return closed, nil +} + +// parquetFilesByPrefix globs parquet files of the form "{prefix}_*.parquet" +// directly under basePath. +func parquetFilesByPrefix(basePath, prefix string) ([]string, error) { + pattern := filepath.Join(basePath, prefix+"_*.parquet") + files, err := filepath.Glob(pattern) + if err != nil { + return nil, fmt.Errorf("failed to glob %s parquet files with pattern %q: %w", prefix, pattern, err) + } + return files, nil +} + +// validateAndCleanFiles probes the highest-numbered file for readability and, +// if it fails, removes both it and its same-start-block counterpart (the +// receipt/log sibling) from disk. 
Only the trailing file is checked because +// a crash can only corrupt the most recently written one. +func validateAndCleanFiles(basePath string, reader *Reader, files []string, counterpartPrefix string) []string { + if len(files) == 0 { + return nil + } + + sort.Slice(files, func(i, j int) bool { + return parquet.ExtractBlockNumber(files[i]) < parquet.ExtractBlockNumber(files[j]) + }) + + lastFile := files[len(files)-1] + if reader.isFileReadable(lastFile) { + return files + } + + startBlock := parquet.ExtractBlockNumber(lastFile) + _ = os.Remove(lastFile) + counterpart := filepath.Join(basePath, fmt.Sprintf("%s_%d.parquet", counterpartPrefix, startBlock)) + _ = os.Remove(counterpart) + return files[:len(files)-1] +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + return err == nil +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/files_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/files_test.go new file mode 100644 index 0000000000..6363c95987 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/files_test.go @@ -0,0 +1,26 @@ +package coordinator + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestScanClosedFilesSortsByStartBlock(t *testing.T) { + dir := t.TempDir() + for _, startBlock := range []uint64{1000, 0, 500} { + writeReceiptFile(t, dir, startBlock, []uint64{startBlock + 1}) + writeLogFile(t, dir, startBlock) + } + + reader, err := NewReaderWithMaxBlocksPerFile(dir, 500) + require.NoError(t, err) + t.Cleanup(func() { _ = reader.Close() }) + + closedFiles, err := scanClosedFiles(dir, reader) + require.NoError(t, err) + require.Len(t, closedFiles, 3) + require.Equal(t, uint64(0), closedFiles[0].startBlock) + require.Equal(t, uint64(500), closedFiles[1].startBlock) + require.Equal(t, uint64(1000), closedFiles[2].startBlock) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go new file mode 100644 index 0000000000..1e35d012ec --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go @@ -0,0 +1,601 @@ +package coordinator + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/ethereum/go-ethereum/common" + parquetgo "github.com/parquet-go/parquet-go" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" +) + +// handleWrite serves a writeReq by appending receipts for a single block and +// replying with any error encountered during WAL append, rotation, or buffer +// staging. +func (c *Coordinator) handleWrite(req writeReq) { + req.resp <- writeResp{err: c.writeReceipts(req.height, req.inputs)} +} + +// handleReadByTxHash serves a readByTxHashReq by checking the in-memory write +// cache first, then falling back to a DuckDB query over closed parquet files. +func (c *Coordinator) handleReadByTxHash(req readByTxHashReq) { + if result := c.cachedReceiptByTxHash(req.txHash); result != nil { + req.resp <- readReceiptResp{result: result} + return + } + if c.reader == nil { + req.resp <- readReceiptResp{err: fmt.Errorf("parquet reader is not initialized")} + return + } + + result, err := c.reader.QueryReceiptByTxHash(req.ctx, c.receiptFilesSnapshot(), req.txHash) + req.resp <- readReceiptResp{result: result, err: err} +} + +// handleReadByTxHashInBlock serves a readByTxHashInBlockReq, narrowing the +// parquet file scan to the single closed file that contains the requested +// block (if any). 
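+// For example, with MaxBlocksPerFile = 100, a lookup at block 250 scans only
+// the closed file that starts at block 200 (receipts_200.parquet), assuming
+// that file has already rotated out.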
+func (c *Coordinator) handleReadByTxHashInBlock(req readByTxHashInBlockReq) { + if result := c.cachedReceiptByTxHashInBlock(req.txHash, req.blockNumber); result != nil { + req.resp <- readReceiptResp{result: result} + return + } + if c.reader == nil { + req.resp <- readReceiptResp{err: fmt.Errorf("parquet reader is not initialized")} + return + } + + result, err := c.reader.QueryReceiptByTxHashInBlock(req.ctx, c.receiptFileSnapshotForBlock(req.blockNumber), req.txHash, req.blockNumber) + req.resp <- readReceiptResp{result: result, err: err} +} + +// handleGetLogs serves a getLogsReq by querying logs across the closed log +// parquet files using the supplied filter. +func (c *Coordinator) handleGetLogs(req getLogsReq) { + if c.reader == nil { + req.resp <- getLogsResp{err: fmt.Errorf("parquet reader is not initialized")} + return + } + + results, err := c.reader.QueryLogs(req.ctx, c.logFilesSnapshot(), req.filter) + req.resp <- getLogsResp{results: results, err: err} +} + +// handleFlush serves a flushReq by flushing buffered receipts/logs for the +// open parquet file to disk. +func (c *Coordinator) handleFlush(req flushReq) { + req.resp <- c.flushOpenFile() +} + +// handleLatestVersion returns the highest block height the coordinator has +// observed via WriteReceipts or WAL replay. +func (c *Coordinator) handleLatestVersion(req latestVersionReq) { + req.resp <- c.latestVersion +} + +// handleSetLatestVersion overwrites latestVersion. Used by callers that +// authoritatively know the chain height (e.g., genesis/init paths). +func (c *Coordinator) handleSetLatestVersion(req setLatestVersionReq) { + c.latestVersion = req.version + req.resp <- nil +} + +// handleSetEarliestVersion records the lowest retained block height. Pruning +// uses this as a hint about the visible window. +func (c *Coordinator) handleSetEarliestVersion(req setEarliestVersionReq) { + c.earliestVersion = req.version + req.resp <- nil +} + +// handleUpdateLatestVersion advances latestVersion only when the supplied +// value is greater, preventing accidental rewinds. +func (c *Coordinator) handleUpdateLatestVersion(req updateLatestVersionReq) { + if req.version > c.latestVersion { + c.latestVersion = req.version + } + req.resp <- nil +} + +// handleFileStartBlock returns the start block of the currently open parquet +// file (the next file's name will derive from this). +func (c *Coordinator) handleFileStartBlock(req fileStartBlockReq) { + req.resp <- c.fileStartBlock +} + +// handleSetBlockFlushInterval updates how often (in blocks) the buffered +// receipt/log writer is flushed to disk. +func (c *Coordinator) handleSetBlockFlushInterval(req setBlockFlushIntervalReq) { + c.config.BlockFlushInterval = req.interval + req.resp <- nil +} + +// handleSetMaxBlocksPerFile updates the rotation interval and propagates it +// to the reader so log-file pruning by block range stays consistent. Mirrors +// the new value into cacheRotateInterval so external readers see it without +// going through the request channel. +func (c *Coordinator) handleSetMaxBlocksPerFile(req setMaxBlocksPerFileReq) { + c.config.MaxBlocksPerFile = req.maxBlocksPerFile + c.cacheRotateInterval.Store(req.maxBlocksPerFile) + if c.reader != nil { + c.reader.setMaxBlocksPerFile(req.maxBlocksPerFile) + } + req.resp <- nil +} + +// handleSetFaultHooks installs the supplied test hooks. In production the +// hooks pointer is nil and all hook checks become no-ops. 
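+//
+// Test-side sketch, mirroring dispatch_test.go:
+//
+//	coord.SetFaultHooks(&parquet.FaultHooks{
+//		BeforeFlush: func(uint64) error { return errors.New("injected flush failure") },
+//	})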
+func (c *Coordinator) handleSetFaultHooks(req setFaultHooksReq) { + c.faultHooks = req.hooks + req.resp <- nil +} + +// handlePruneTick fires on the prune ticker and removes closed parquet pairs +// whose end block falls below latestVersion - KeepRecent. +func (c *Coordinator) handlePruneTick() { + // TODO(future-async): if read I/O moves to a worker pool, gate prune on + // map[fileID]int reference counts that the coordinator increments on + // dispatch and decrements on completion. + if c.config.KeepRecent <= 0 { + return + } + pruneBeforeBlock := c.latestVersion - c.config.KeepRecent + if pruneBeforeBlock <= 0 { + return + } + c.pruneOldFiles(uint64(pruneBeforeBlock)) +} + +// handleClose performs a graceful shutdown: flush and close the open writers, +// then close the WAL and reader. Each step runs even if an earlier one +// errors so resources (file descriptors, WAL background goroutines, DuckDB +// connections) are always released. Errors from every step are joined and +// returned together. The prune ticker is stopped via defer in run(). +func (c *Coordinator) handleClose(req closeReq) { + var errs []error + if err := c.flushOpenFile(); err != nil { + errs = append(errs, fmt.Errorf("flush: %w", err)) + } + if err := c.closeWriters(); err != nil { + errs = append(errs, err) + } + if c.wal != nil { + if err := c.wal.Close(); err != nil { + errs = append(errs, fmt.Errorf("wal close: %w", err)) + } + c.wal = nil + } + if c.reader != nil { + if err := c.reader.Close(); err != nil { + errs = append(errs, fmt.Errorf("reader close: %w", err)) + } + c.reader = nil + } + req.resp <- errors.Join(errs...) +} + +// handleSimulateCrash drops in-memory writer state without flushing — the +// open parquet files remain truncated/partial on disk so subsequent recovery +// paths can be exercised. Test-only. +func (c *Coordinator) handleSimulateCrash(req simulateCrashReq) { + if c.receiptFile != nil { + _ = c.receiptFile.Close() + c.receiptFile = nil + } + if c.logFile != nil { + _ = c.logFile.Close() + c.logFile = nil + } + c.receiptWriter = nil + c.logWriter = nil + if c.wal != nil { + _ = c.wal.Close() + } + if c.reader != nil { + _ = c.reader.Close() + } + req.resp <- struct{}{} +} + +// writeReceipts records a committed block at height. When inputs is empty it +// degenerates to the rotation/cursor-advance path (formerly ObserveEmptyBlock): +// no WAL entry is written, but if height lands on a rotation boundary the +// open file is rotated so it never spans more than MaxBlocksPerFile blocks. +// height is authoritative; inputs[i].BlockNumber is ignored. 
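+//
+// Rotation example (assuming MaxBlocksPerFile = 500): heights 1..499 stage
+// into receipts_0.parquet; the first write at height 500 hits the rotation
+// boundary, so the open pair is rotated and block 500 starts
+// receipts_500.parquet.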
+func (c *Coordinator) writeReceipts(height uint64, inputs []parquet.ReceiptInput) error { + if len(inputs) == 0 { + return c.observeBlock(height) + } + if c.wal == nil { + return fmt.Errorf("parquet WAL is not initialized") + } + + receiptBytes := make([][]byte, len(inputs)) + for i := range inputs { + receiptBytes[i] = inputs[i].ReceiptBytes + } + if err := c.wal.Write(parquet.WALEntry{BlockNumber: height, Receipts: receiptBytes}); err != nil { + return err + } + + if h := c.faultHooks; h != nil && h.AfterWALWrite != nil { + if err := h.AfterWALWrite(height); err != nil { + return err + } + } + + if c.receiptWriter != nil && height != c.lastSeenBlock && c.isRotationBoundary(height) { + if err := c.rotateOpenFile(height); err != nil { + return err + } + } + + for i := range inputs { + if err := c.applyReceipt(height, inputs[i]); err != nil { + return err + } + } + + latest, err := int64FromUint64(height) + if err != nil { + return err + } + if latest > c.latestVersion { + c.latestVersion = latest + } + return nil +} + +// applyReceipt stages a single receipt into the open parquet writer's +// in-memory buffers and the temp write cache, lazily creating writers if this +// is the first receipt for the current file. Triggers a flush when +// blocksSinceFlush has reached BlockFlushInterval. +func (c *Coordinator) applyReceipt(blockNumber uint64, input parquet.ReceiptInput) error { + if c.receiptWriter == nil { + aligned := alignedFileStartBlock(blockNumber, c.config.MaxBlocksPerFile) + if aligned >= c.fileStartBlock { + c.fileStartBlock = aligned + } + if err := c.initWriters(); err != nil { + return err + } + } + + if blockNumber != c.lastSeenBlock { + if c.lastSeenBlock != 0 { + c.blocksSinceFlush++ + } + c.lastSeenBlock = blockNumber + } + + c.receiptsBuffer = append(c.receiptsBuffer, input.Receipt) + if len(input.Logs) > 0 { + c.logsBuffer = append(c.logsBuffer, input.Logs...) + } + + txHash := common.BytesToHash(input.Receipt.TxHash) + c.tempWriteCache[txHash] = append(c.tempWriteCache[txHash], tempReceipt{ + blockNumber: blockNumber, + receiptBytes: input.ReceiptBytes, + }) + + if c.config.BlockFlushInterval > 0 && c.blocksSinceFlush >= c.config.BlockFlushInterval { + if err := c.flushOpenFile(); err != nil { + return err + } + c.blocksSinceFlush = 0 + } + + return nil +} + +// cachedReceiptByTxHash returns the earliest cached receipt for txHash, or +// nil if the temp write cache has no entry. Used to serve reads for receipts +// that are still buffered and not yet flushed to a closed parquet file. +func (c *Coordinator) cachedReceiptByTxHash(txHash common.Hash) *parquet.ReceiptResult { + entries := c.tempWriteCache[txHash] + if len(entries) == 0 { + return nil + } + return receiptResultFromTemp(txHash, entries[0]) +} + +// cachedReceiptByTxHashInBlock returns the cached receipt for txHash at the +// given block, or nil if the temp write cache has no matching entry. +func (c *Coordinator) cachedReceiptByTxHashInBlock(txHash common.Hash, blockNumber uint64) *parquet.ReceiptResult { + for _, entry := range c.tempWriteCache[txHash] { + if entry.blockNumber == blockNumber { + return receiptResultFromTemp(txHash, entry) + } + } + return nil +} + +// receiptResultFromTemp converts a tempReceipt cache entry into the public +// ReceiptResult shape, copying byte slices to decouple from cache storage. 
+func receiptResultFromTemp(txHash common.Hash, entry tempReceipt) *parquet.ReceiptResult { + return &parquet.ReceiptResult{ + TxHash: append([]byte(nil), txHash[:]...), + BlockNumber: entry.blockNumber, + ReceiptBytes: append([]byte(nil), entry.receiptBytes...), + } +} + +// receiptFilesSnapshot returns the receipt parquet paths for all closed +// files. Reads use this list as the file set for full-range queries. +func (c *Coordinator) receiptFilesSnapshot() []string { + files := make([]string, 0, len(c.closedFiles)) + for _, f := range c.closedFiles { + files = append(files, f.receiptPath) + } + return files +} + +// receiptFileSnapshotForBlock returns the single closed receipt file whose +// start block is the largest one not exceeding blockNumber, or nil if no +// such file exists. Used to narrow point lookups by block. +func (c *Coordinator) receiptFileSnapshotForBlock(blockNumber uint64) []string { + var best string + for _, f := range c.closedFiles { + if f.startBlock > blockNumber { + break + } + best = f.receiptPath + } + if best == "" { + return nil + } + return []string{best} +} + +// logFilesSnapshot returns the log parquet paths for all closed files. Log +// queries use this list as the file set, which the Reader further narrows +// by from/to-block range. +func (c *Coordinator) logFilesSnapshot() []string { + files := make([]string, 0, len(c.closedFiles)) + for _, f := range c.closedFiles { + files = append(files, f.logPath) + } + return files +} + +// isRotationBoundary reports whether blockNumber lands on a MaxBlocksPerFile +// boundary, in which case the open parquet file should rotate before this +// block's receipts are written. +func (c *Coordinator) isRotationBoundary(blockNumber uint64) bool { + return c.config.MaxBlocksPerFile > 0 && blockNumber%c.config.MaxBlocksPerFile == 0 +} + +// alignedFileStartBlock floors blockNumber to the nearest multiple of +// maxBlocksPerFile, used to derive a parquet file's start-block name. +func alignedFileStartBlock(blockNumber, maxBlocksPerFile uint64) uint64 { + if maxBlocksPerFile == 0 { + return blockNumber + } + return (blockNumber / maxBlocksPerFile) * maxBlocksPerFile +} + +// initWriters creates the receipt and log parquet files for the current +// fileStartBlock and constructs sorted parquet writers over them. If the log +// file fails to open, the receipt file is closed before returning. +func (c *Coordinator) initWriters() error { + receiptPath := filepath.Join(c.basePath, fmt.Sprintf("receipts_%d.parquet", c.fileStartBlock)) + logPath := filepath.Join(c.basePath, fmt.Sprintf("logs_%d.parquet", c.fileStartBlock)) + + // #nosec G304 -- paths are constructed from configured base directory. + receiptFile, err := os.Create(receiptPath) + if err != nil { + return fmt.Errorf("failed to create receipt parquet file: %w", err) + } + + // #nosec G304 -- paths are constructed from configured base directory. 
+ logFile, err := os.Create(logPath) + if err != nil { + if closeErr := receiptFile.Close(); closeErr != nil { + return fmt.Errorf("failed to create log parquet file: %w; close receipt file error: %v", err, closeErr) + } + return fmt.Errorf("failed to create log parquet file: %w", err) + } + + blockNumberSorting := parquetgo.SortingWriterConfig( + parquetgo.SortingColumns(parquetgo.Ascending("block_number")), + ) + + c.receiptFile = receiptFile + c.logFile = logFile + c.receiptWriter = parquetgo.NewGenericWriter[parquet.ReceiptRecord](receiptFile, + parquetgo.Compression(&parquetgo.Snappy), + blockNumberSorting, + ) + c.logWriter = parquetgo.NewGenericWriter[parquet.LogRecord](logFile, + parquetgo.Compression(&parquetgo.Snappy), + blockNumberSorting, + ) + + return nil +} + +// rotateOpenFile closes the current parquet file pair, opens a new one +// starting at newBlockNumber, truncates the WAL up to (but preserving) the +// most recent entry, and drops cached entries that are now durably stored. +func (c *Coordinator) rotateOpenFile(newBlockNumber uint64) error { + if err := c.rotateOpenFileWithoutWAL(newBlockNumber); err != nil { + return err + } + if err := c.clearWALPreservingLast(); err != nil { + return err + } + if h := c.faultHooks; h != nil && h.AfterWALClear != nil { + if err := h.AfterWALClear(newBlockNumber); err != nil { + return err + } + } + c.dropTempCacheBefore(c.fileStartBlock) + return nil +} + +// rotateOpenFileWithoutWAL performs the file-side rotation steps (flush, +// close, register closed pair, open new pair) without touching the WAL. +// Used during replay where the WAL drives rotation timing externally. +func (c *Coordinator) rotateOpenFileWithoutWAL(newBlockNumber uint64) error { + if c.receiptWriter == nil { + return nil + } + if err := c.flushOpenFile(); err != nil { + return err + } + + oldStartBlock := c.fileStartBlock + oldReceiptPath := filepath.Join(c.basePath, fmt.Sprintf("receipts_%d.parquet", oldStartBlock)) + oldLogPath := filepath.Join(c.basePath, fmt.Sprintf("logs_%d.parquet", oldStartBlock)) + + if err := c.closeWriters(); err != nil { + return err + } + + if h := c.faultHooks; h != nil && h.AfterCloseWriters != nil { + if err := h.AfterCloseWriters(newBlockNumber); err != nil { + return err + } + } + + c.closedFiles = append(c.closedFiles, closedFile{ + startBlock: oldStartBlock, + receiptPath: oldReceiptPath, + logPath: oldLogPath, + }) + c.fileStartBlock = newBlockNumber + if err := c.initWriters(); err != nil { + return err + } + c.blocksSinceFlush = 0 + return nil +} + +// dropTempCacheBefore evicts temp-cache entries for blocks below +// blockNumber, freeing memory once those receipts are durably persisted in +// a closed parquet file. +func (c *Coordinator) dropTempCacheBefore(blockNumber uint64) { + for txHash, entries := range c.tempWriteCache { + kept := entries[:0] + for _, entry := range entries { + if entry.blockNumber >= blockNumber { + kept = append(kept, entry) + } + } + if len(kept) == 0 { + delete(c.tempWriteCache, txHash) + continue + } + c.tempWriteCache[txHash] = kept + } +} + +// observeBlock advances the cursor for a committed block without writing to +// the WAL. Called by writeReceipts when inputs is empty. Out-of-order +// observations must not move the cursor backward — WriteReceipts could +// otherwise mis-handle rotation for a height already seen. 
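+//
+//	// e.g. observeBlock(10) followed by observeBlock(8): the second call
+//	// is a no-op, so the cursor stays at 10 and rotation is not re-run.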
+func (c *Coordinator) observeBlock(height uint64) error { + if height <= c.lastSeenBlock { + return nil + } + if c.receiptWriter == nil || !c.isRotationBoundary(height) { + c.lastSeenBlock = height + return nil + } + if err := c.rotateOpenFile(height); err != nil { + return err + } + c.lastSeenBlock = height + return nil +} + +// flushOpenFile drains the in-memory receipt and log buffers into the open +// parquet writers and forces them to disk. No-op when nothing is buffered. +func (c *Coordinator) flushOpenFile() error { + if len(c.receiptsBuffer) == 0 { + return nil + } + if c.receiptWriter == nil { + return fmt.Errorf("cannot flush receipts: receipt writer is not initialized") + } + + if h := c.faultHooks; h != nil && h.BeforeFlush != nil { + if err := h.BeforeFlush(c.lastSeenBlock); err != nil { + return err + } + } + + if _, err := c.receiptWriter.Write(c.receiptsBuffer); err != nil { + return fmt.Errorf("failed to write receipts to parquet: %w", err) + } + if err := c.receiptWriter.Flush(); err != nil { + return fmt.Errorf("failed to flush receipt parquet writer: %w", err) + } + + if len(c.logsBuffer) > 0 { + if c.logWriter == nil { + return fmt.Errorf("cannot flush logs: log writer is not initialized") + } + if _, err := c.logWriter.Write(c.logsBuffer); err != nil { + return fmt.Errorf("failed to write logs to parquet: %w", err) + } + if err := c.logWriter.Flush(); err != nil { + return fmt.Errorf("failed to flush log parquet writer: %w", err) + } + } + + if h := c.faultHooks; h != nil && h.AfterFlush != nil { + if err := h.AfterFlush(c.lastSeenBlock); err != nil { + return err + } + } + + c.receiptsBuffer = c.receiptsBuffer[:0] + c.logsBuffer = c.logsBuffer[:0] + return nil +} + +// closeWriters finalizes the parquet writers (writing the trailer/footer) +// and fsync+closes the underlying files. All errors encountered are +// collected and returned together so partial cleanup still happens. 
+func (c *Coordinator) closeWriters() error { + var errs []error + + if c.receiptWriter != nil { + if err := c.receiptWriter.Close(); err != nil { + errs = append(errs, fmt.Errorf("receipt writer: %w", err)) + } + c.receiptWriter = nil + } + if c.logWriter != nil { + if err := c.logWriter.Close(); err != nil { + errs = append(errs, fmt.Errorf("log writer: %w", err)) + } + c.logWriter = nil + } + if c.receiptFile != nil { + if err := c.receiptFile.Sync(); err != nil { + errs = append(errs, fmt.Errorf("receipt file sync: %w", err)) + } + if err := c.receiptFile.Close(); err != nil { + errs = append(errs, fmt.Errorf("receipt file: %w", err)) + } + c.receiptFile = nil + } + if c.logFile != nil { + if err := c.logFile.Sync(); err != nil { + errs = append(errs, fmt.Errorf("log file sync: %w", err)) + } + if err := c.logFile.Close(); err != nil { + errs = append(errs, fmt.Errorf("log file: %w", err)) + } + c.logFile = nil + } + + if len(errs) > 0 { + return fmt.Errorf("close errors: %v", errs) + } + return nil +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go new file mode 100644 index 0000000000..7ac9445f86 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go @@ -0,0 +1,210 @@ +package coordinator + +import ( + "context" + "fmt" + "math/big" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/ethereum/go-ethereum/common" + parquetgo "github.com/parquet-go/parquet-go" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func testReceiptInput(blockNumber uint64, txHash common.Hash) parquet.ReceiptInput { + receiptBytes := []byte{byte(blockNumber), txHash[31]} + return parquet.ReceiptInput{ + BlockNumber: blockNumber, + Receipt: parquet.ReceiptRecord{ + TxHash: txHash[:], + BlockNumber: blockNumber, + ReceiptBytes: receiptBytes, + }, + Logs: []parquet.LogRecord{{ + BlockNumber: blockNumber, + TxHash: txHash[:], + Address: common.BigToAddress(new(big.Int).SetUint64(blockNumber)).Bytes(), + }}, + ReceiptBytes: receiptBytes, + } +} + +func newWriteCoordinator(t *testing.T, wal *recordingWAL) *Coordinator { + t.Helper() + + cfg := parquet.DefaultStoreConfig() + cfg.DBDirectory = t.TempDir() + cfg.MaxBlocksPerFile = 500 + cfg.BlockFlushInterval = 0 + + return &Coordinator{ + config: cfg, + basePath: cfg.DBDirectory, + receiptsBuffer: make([]parquet.ReceiptRecord, 0, 1000), + logsBuffer: make([]parquet.LogRecord, 0, 10000), + tempWriteCache: make(map[common.Hash][]tempReceipt), + wal: wal, + } +} + +func newReplayCoordinator(t *testing.T, wal *recordingWAL) *Coordinator { + t.Helper() + + coord := newWriteCoordinator(t, wal) + coord.config.MaxBlocksPerFile = 4 + return coord +} + +func replayWALWithEntries(t *testing.T, entries ...parquet.WALEntry) *recordingWAL { + t.Helper() + + wal := &recordingWAL{} + for _, entry := range entries { + require.NoError(t, wal.Write(entry)) + } + return wal +} + +func replayConverterForTest(blockNumber uint64, receiptBytes []byte, _ uint) (parquet.ReplayReceipt, error) { + txHash := common.BigToHash(new(big.Int).SetUint64(uint64(receiptBytes[0]))) + input := testReceiptInput(blockNumber, txHash) + input.ReceiptBytes = append([]byte(nil), receiptBytes...) + input.Receipt.ReceiptBytes = append([]byte(nil), receiptBytes...) 
+ + return parquet.ReplayReceipt{ + Input: input, + TxHash: txHash, + Warmup: input.Receipt, + LogCount: uint(len(input.Logs)), + }, nil +} + +func writeReceiptFile(t *testing.T, dir string, startBlock uint64, blocks []uint64) { + t.Helper() + + path := filepath.Join(dir, fmt.Sprintf("receipts_%d.parquet", startBlock)) + f, err := os.Create(path) + require.NoError(t, err) + + w := parquetgo.NewGenericWriter[parquet.ReceiptRecord](f) + for _, block := range blocks { + txHash := common.BigToHash(new(big.Int).SetUint64(block)) + _, err := w.Write([]parquet.ReceiptRecord{{ + TxHash: txHash[:], + BlockNumber: block, + ReceiptBytes: []byte{byte(block)}, + }}) + require.NoError(t, err) + } + require.NoError(t, w.Close()) + require.NoError(t, f.Close()) +} + +func writeLogFile(t *testing.T, dir string, startBlock uint64) { + t.Helper() + + path := filepath.Join(dir, fmt.Sprintf("logs_%d.parquet", startBlock)) + f, err := os.Create(path) + require.NoError(t, err) + + w := parquetgo.NewGenericWriter[parquet.LogRecord](f) + txHash := common.BigToHash(new(big.Int).SetUint64(startBlock)) + _, err = w.Write([]parquet.LogRecord{{ + BlockNumber: startBlock, + TxHash: txHash[:], + }}) + require.NoError(t, err) + require.NoError(t, w.Close()) + require.NoError(t, f.Close()) +} + +func writeClosedFileSet(t *testing.T, dir string, starts ...uint64) []closedFile { + t.Helper() + + closed := make([]closedFile, 0, len(starts)) + for _, start := range starts { + block := start + 1 + writeReceiptFile(t, dir, start, []uint64{block}) + writeLogFile(t, dir, start) + closed = append(closed, closedFile{ + startBlock: start, + receiptPath: filepath.Join(dir, "receipts_"+strconv.FormatUint(start, 10)+".parquet"), + logPath: filepath.Join(dir, "logs_"+strconv.FormatUint(start, 10)+".parquet"), + }) + } + return closed +} + +func readClosedReceiptForTest(t *testing.T, coord *Coordinator, txHash common.Hash, blockNumber uint64) *parquet.ReceiptResult { + t.Helper() + + resp := make(chan readReceiptResp, 1) + coord.handleReadByTxHashInBlock(readByTxHashInBlockReq{ + ctx: context.Background(), + txHash: txHash, + blockNumber: blockNumber, + resp: resp, + }) + result := <-resp + require.NoError(t, result.err) + return result.result +} + +type recordingWAL struct { + entries []parquet.WALEntry + firstOffset uint64 + lastOffset uint64 + truncatedBefore []uint64 +} + +func (w *recordingWAL) Write(entry parquet.WALEntry) error { + if w.firstOffset == 0 { + w.firstOffset = 1 + } + w.lastOffset++ + w.entries = append(w.entries, entry) + return nil +} + +func (w *recordingWAL) TruncateBefore(offset uint64) error { + w.truncatedBefore = append(w.truncatedBefore, offset) + return nil +} + +func (w *recordingWAL) TruncateAfter(uint64) error { return nil } + +func (w *recordingWAL) ReadAt(uint64) (parquet.WALEntry, error) { return parquet.WALEntry{}, nil } + +func (w *recordingWAL) FirstOffset() (uint64, error) { return w.firstOffset, nil } + +func (w *recordingWAL) LastOffset() (uint64, error) { return w.lastOffset, nil } + +func (w *recordingWAL) Replay(firstOffset, lastOffset uint64, fn func(uint64, parquet.WALEntry) error) error { + for i, entry := range w.entries { + offset := uint64(i) + 1 + if offset < firstOffset || offset > lastOffset { + continue + } + if err := fn(offset, entry); err != nil { + return err + } + } + return nil +} + +func (w *recordingWAL) Close() error { return nil } + +var _ interface { + Write(parquet.WALEntry) error + TruncateBefore(uint64) error + TruncateAfter(uint64) error + ReadAt(uint64) 
(parquet.WALEntry, error) + FirstOffset() (uint64, error) + LastOffset() (uint64, error) + Replay(uint64, uint64, func(uint64, parquet.WALEntry) error) error + Close() error +} = (*recordingWAL)(nil) diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go new file mode 100644 index 0000000000..47ade32ac5 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go @@ -0,0 +1,69 @@ +package coordinator + +import ( + "os" + + "github.com/sei-protocol/seilog" +) + +var ( + removeFile = os.Remove + logger = seilog.NewLogger("db", "ledger-db", "parquet-v2") +) + +// pruneOldFiles deletes closed parquet pairs whose entire block range falls +// below pruneBeforeBlock. A pair stays in the list if either of its files +// fails to delete, so a transient error doesn't desync c.closedFiles from +// disk. Returns the number of pairs successfully removed. +func (c *Coordinator) pruneOldFiles(pruneBeforeBlock uint64) int { + if len(c.closedFiles) == 0 { + return 0 + } + + prunedCount := 0 + kept := c.closedFiles[:0] + for _, f := range c.closedFiles { + if !c.shouldPruneClosedFile(f, pruneBeforeBlock) { + kept = append(kept, f) + continue + } + + receiptRemoved := removePrunedFile(f.receiptPath) + if !receiptRemoved { + kept = append(kept, f) + continue + } + logRemoved := removePrunedFile(f.logPath) + if logRemoved { + prunedCount++ + continue + } + kept = append(kept, f) + } + c.closedFiles = kept + return prunedCount +} + +// shouldPruneClosedFile reports whether the file's full block range +// (startBlock + MaxBlocksPerFile) lies entirely below pruneBeforeBlock. +// Saturates on overflow rather than wrapping. +func (c *Coordinator) shouldPruneClosedFile(f closedFile, pruneBeforeBlock uint64) bool { + fileEndBlock := f.startBlock + c.config.MaxBlocksPerFile + if fileEndBlock < f.startBlock { + fileEndBlock = ^uint64(0) + } + return fileEndBlock <= pruneBeforeBlock +} + +// removePrunedFile deletes path. Treats "already gone" as success and logs +// any other failure. The package var removeFile lets tests inject failures. 
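+//
+// Failure-injection sketch, mirroring prune_test.go:
+//
+//	original := removeFile
+//	defer func() { removeFile = original }()
+//	removeFile = func(string) error { return errors.New("delete failed") }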
+func removePrunedFile(path string) bool { + if path == "" { + return true + } + if err := removeFile(path); err != nil && !os.IsNotExist(err) { + logger.Error("failed to prune parquet file", "file", path, "err", err) + return false + } + return true +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go new file mode 100644 index 0000000000..033a2e5940 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go @@ -0,0 +1,76 @@ +package coordinator + +import ( + "errors" + "math/big" + "os" + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestPruneTickDeletesEligibleClosedFiles(t *testing.T) { + dir := t.TempDir() + closedFiles := writeClosedFileSet(t, dir, 0, 4, 8) + + reader, err := NewReaderWithMaxBlocksPerFile(dir, 4) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, reader.Close()) }) + + coord := &Coordinator{ + config: parquet.StoreConfig{ + KeepRecent: 4, + MaxBlocksPerFile: 4, + }, + closedFiles: closedFiles, + latestVersion: 12, + reader: reader, + } + + forcePruneTickForTest(coord) + + require.Len(t, coord.closedFiles, 1) + require.Equal(t, uint64(8), coord.closedFiles[0].startBlock) + require.NoFileExists(t, filepath.Join(dir, "receipts_0.parquet")) + require.NoFileExists(t, filepath.Join(dir, "logs_0.parquet")) + require.NoFileExists(t, filepath.Join(dir, "receipts_4.parquet")) + require.NoFileExists(t, filepath.Join(dir, "logs_4.parquet")) + require.FileExists(t, filepath.Join(dir, "receipts_8.parquet")) + require.FileExists(t, filepath.Join(dir, "logs_8.parquet")) + + prunedResult := readClosedReceiptForTest(t, coord, common.BigToHash(new(big.Int).SetUint64(1)), 1) + require.Nil(t, prunedResult) + + keptResult := readClosedReceiptForTest(t, coord, common.BigToHash(new(big.Int).SetUint64(9)), 9) + require.NotNil(t, keptResult) + require.Equal(t, uint64(9), keptResult.BlockNumber) +} + +func TestPruneKeepsFilePairTrackedWhenDeleteFails(t *testing.T) { + dir := t.TempDir() + closedFiles := writeClosedFileSet(t, dir, 0) + failPath := filepath.Join(dir, "receipts_0.parquet") + + originalRemoveFile := removeFile + t.Cleanup(func() { removeFile = originalRemoveFile }) + removeFile = func(path string) error { + if path == failPath { + return errors.New("delete failed") + } + return os.Remove(path) + } + + coord := &Coordinator{ + config: parquet.StoreConfig{MaxBlocksPerFile: 4}, + closedFiles: closedFiles, + } + + require.Zero(t, coord.pruneOldFiles(4)) + require.Len(t, coord.closedFiles, 1) + require.Equal(t, uint64(0), coord.closedFiles[0].startBlock) + require.FileExists(t, failPath) + require.FileExists(t, filepath.Join(dir, "logs_0.parquet")) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/read_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/read_test.go new file mode 100644 index 0000000000..772fdf37b8 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/read_test.go @@ -0,0 +1,56 @@ +package coordinator + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestReadByTxHashHitsTempCache(t *testing.T) { + txHash := common.HexToHash("0xabc") + coord := &Coordinator{ + tempWriteCache: map[common.Hash][]tempReceipt{ + txHash: { + {blockNumber: 10, receiptBytes: []byte("first")}, + {blockNumber: 10, 
receiptBytes: []byte("second")}, + {blockNumber: 11, receiptBytes: []byte("third")}, + }, + }, + } + + resp := make(chan readReceiptResp, 1) + coord.handleReadByTxHash(readByTxHashReq{ + ctx: context.Background(), + txHash: txHash, + resp: resp, + }) + result := <-resp + require.NoError(t, result.err) + require.Equal(t, uint64(10), result.result.BlockNumber) + require.Equal(t, []byte("first"), result.result.ReceiptBytes) + + resp = make(chan readReceiptResp, 1) + coord.handleReadByTxHashInBlock(readByTxHashInBlockReq{ + ctx: context.Background(), + txHash: txHash, + blockNumber: 11, + resp: resp, + }) + result = <-resp + require.NoError(t, result.err) + require.Equal(t, uint64(11), result.result.BlockNumber) + require.Equal(t, []byte("third"), result.result.ReceiptBytes) + + resp = make(chan readReceiptResp, 1) + coord.handleReadByTxHashInBlock(readByTxHashInBlockReq{ + ctx: context.Background(), + txHash: txHash, + blockNumber: 10, + resp: resp, + }) + result = <-resp + require.NoError(t, result.err) + require.Equal(t, []byte("first"), result.result.ReceiptBytes) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/reader.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/reader.go new file mode 100644 index 0000000000..0946228ae5 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/reader.go @@ -0,0 +1,342 @@ +package coordinator + +import ( + "context" + "database/sql" + "errors" + "fmt" + "runtime" + "strings" + + "github.com/duckdb/duckdb-go/v2" + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" +) + +// Reader is the V2 DuckDB query helper. It intentionally owns no file-list +// state; callers pass explicit file snapshots to each query. +type Reader struct { + db *sql.DB + basePath string + maxBlocksPerFile uint64 +} + +// NewReaderWithMaxBlocksPerFile constructs a Reader backed by an in-process +// DuckDB connection tuned for parquet scans. maxBlocksPerFile=0 falls back +// to the default; the value is used when narrowing log queries by block +// range from the file name alone. +func NewReaderWithMaxBlocksPerFile(basePath string, maxBlocksPerFile uint64) (*Reader, error) { + if maxBlocksPerFile == 0 { + maxBlocksPerFile = parquet.DefaultStoreConfig().MaxBlocksPerFile + } + + connector, err := duckdb.NewConnector("", nil) + if err != nil { + return nil, fmt.Errorf("failed to create DuckDB connector: %w", err) + } + + db := sql.OpenDB(connector) + numCPU := runtime.NumCPU() + db.SetMaxOpenConns(numCPU * 2) + db.SetMaxIdleConns(numCPU) + + settings := []string{ + fmt.Sprintf("SET threads TO %d", numCPU), + "SET memory_limit = '1GB'", + "SET enable_object_cache = true", + "SET enable_progress_bar = false", + "SET preserve_insertion_order = false", + } + for _, statement := range settings { + if _, err = db.Exec(statement); err != nil { + _ = db.Close() + return nil, fmt.Errorf("failed to configure duckdb (%s): %w", statement, err) + } + } + if err = configureParquetMetadataCache(db); err != nil { + _ = db.Close() + return nil, err + } + + return &Reader{ + db: db, + basePath: basePath, + maxBlocksPerFile: maxBlocksPerFile, + }, nil +} + +// setMaxBlocksPerFile updates the rotation interval used to derive each +// file's covered block range. Called by the coordinator when configuration +// changes at runtime in tests. +func (r *Reader) setMaxBlocksPerFile(maxBlocksPerFile uint64) { + r.maxBlocksPerFile = maxBlocksPerFile +} + +// Close shuts down the DuckDB connection pool. 
Safe to call on a nil Reader +// or after a previous Close. +func (r *Reader) Close() error { + if r == nil || r.db == nil { + return nil + } + err := r.db.Close() + r.db = nil + return err +} + +// QueryReceiptByTxHash returns the lowest-block receipt for txHash across +// the supplied parquet files, or (nil, nil) if none of them contain it. +func (r *Reader) QueryReceiptByTxHash(ctx context.Context, files []string, txHash common.Hash) (*parquet.ReceiptResult, error) { + if len(files) == 0 { + return nil, nil + } + parquetFiles := parquetFilesSQL(files) + + // #nosec G201 -- parquetFiles derived from coordinator-owned local file paths. + query := fmt.Sprintf(` + SELECT + tx_hash, block_number, receipt_bytes + FROM read_parquet(%s, union_by_name=true) + WHERE tx_hash = $1 + ORDER BY block_number + LIMIT 1 + `, parquetFiles) + + row := r.db.QueryRowContext(ctx, query, txHash[:]) + var rec parquet.ReceiptResult + if err := row.Scan(&rec.TxHash, &rec.BlockNumber, &rec.ReceiptBytes); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("failed to query receipt: %w", err) + } + return &rec, nil +} + +// QueryReceiptByTxHashInBlock returns the receipt for txHash at exactly +// blockNumber, or (nil, nil) if no such receipt exists in files. +func (r *Reader) QueryReceiptByTxHashInBlock(ctx context.Context, files []string, txHash common.Hash, blockNumber uint64) (*parquet.ReceiptResult, error) { + if len(files) == 0 { + return nil, nil + } + parquetFiles := parquetFilesSQL(files) + + // #nosec G201 -- parquetFiles derived from coordinator-owned local file paths. + query := fmt.Sprintf(` + SELECT + tx_hash, block_number, receipt_bytes + FROM read_parquet(%s, union_by_name=true) + WHERE tx_hash = $1 AND block_number = $2 + LIMIT 1 + `, parquetFiles) + + row := r.db.QueryRowContext(ctx, query, txHash[:], blockNumber) + var rec parquet.ReceiptResult + if err := row.Scan(&rec.TxHash, &rec.BlockNumber, &rec.ReceiptBytes); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("failed to query receipt: %w", err) + } + return &rec, nil +} + +// QueryLogs returns logs matching filter from files. Files outside the +// from/to-block window are dropped before the SQL query is built. +func (r *Reader) QueryLogs(ctx context.Context, files []string, filter parquet.LogFilter) ([]parquet.LogResult, error) { + files = r.filterLogFiles(files, filter) + if len(files) == 0 { + return nil, nil + } + return r.queryLogFiles(ctx, files, filter) +} + +// filterLogFiles drops files whose block range cannot overlap the filter's +// [FromBlock, ToBlock] window, computed from the start block in the file +// name and maxBlocksPerFile. +func (r *Reader) filterLogFiles(files []string, filter parquet.LogFilter) []string { + filtered := make([]string, 0, len(files)) + for _, f := range files { + startBlock := parquet.ExtractBlockNumber(f) + if filter.ToBlock != nil && startBlock > *filter.ToBlock { + continue + } + if filter.FromBlock != nil && startBlock+r.maxBlocksPerFile <= *filter.FromBlock { + continue + } + filtered = append(filtered, f) + } + return filtered +} + +// queryLogFiles builds and executes the parametrized DuckDB query that +// applies block, address, and per-position topic predicates, and decodes +// the result rows into parquet.LogResult values. 
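+//
+// For a filter carrying From/To blocks and a single address, the generated
+// SQL looks roughly like (illustrative; single-file form shown):
+//
+//	SELECT block_number, tx_hash, ..., removed
+//	FROM read_parquet('logs_0.parquet', union_by_name=true)
+//	WHERE 1=1 AND block_number >= $1 AND block_number <= $2
+//	  AND address IN ($3)
+//	ORDER BY block_number, tx_index, log_index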
+func (r *Reader) queryLogFiles(ctx context.Context, files []string, filter parquet.LogFilter) ([]parquet.LogResult, error) { + // #nosec G201 -- parquetFiles derived from coordinator-owned local file paths. + query := fmt.Sprintf(` + SELECT + block_number, tx_hash, tx_index, log_index, address, + topic0, topic1, topic2, topic3, data, block_hash, removed + FROM read_parquet(%s, union_by_name=true) + WHERE 1=1 + `, parquetFilesSQL(files)) + + var args []any + argIdx := 1 + + if filter.FromBlock != nil { + query += fmt.Sprintf(" AND block_number >= $%d", argIdx) + args = append(args, *filter.FromBlock) + argIdx++ + } + + if filter.ToBlock != nil { + query += fmt.Sprintf(" AND block_number <= $%d", argIdx) + args = append(args, *filter.ToBlock) + argIdx++ + } + + if len(filter.Addresses) > 0 { + placeholders := make([]string, len(filter.Addresses)) + for i, addr := range filter.Addresses { + placeholders[i] = fmt.Sprintf("$%d", argIdx) + args = append(args, addr[:]) + argIdx++ + } + query += fmt.Sprintf(" AND address IN (%s)", strings.Join(placeholders, ", ")) + } + + topicCols := []string{"topic0", "topic1", "topic2", "topic3"} + for i, topicList := range filter.Topics { + if i >= 4 { + break + } + if len(topicList) == 0 { + continue + } + if len(topicList) == 1 { + query += fmt.Sprintf(" AND %s = $%d", topicCols[i], argIdx) + args = append(args, topicList[0][:]) + argIdx++ + continue + } + + placeholders := make([]string, len(topicList)) + for j, topic := range topicList { + placeholders[j] = fmt.Sprintf("$%d", argIdx) + args = append(args, topic[:]) + argIdx++ + } + query += fmt.Sprintf(" AND %s IN (%s)", topicCols[i], strings.Join(placeholders, ", ")) + } + + query += " ORDER BY block_number, tx_index, log_index" + if filter.Limit > 0 { + query += fmt.Sprintf(" LIMIT %d", filter.Limit) + } + + rows, err := r.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query logs: %w", err) + } + defer func() { _ = rows.Close() }() + + var results []parquet.LogResult + for rows.Next() { + var log parquet.LogResult + if err := rows.Scan( + &log.BlockNumber, &log.TxHash, &log.TxIndex, &log.LogIndex, + &log.Address, &log.Topic0, &log.Topic1, &log.Topic2, &log.Topic3, + &log.Data, &log.BlockHash, &log.Removed, + ); err != nil { + return nil, fmt.Errorf("failed to scan log: %w", err) + } + results = append(results, log) + } + + return results, rows.Err() +} + +// MaxReceiptBlockNumber returns the largest block_number observed across +// files. The boolean is false when files is empty or contains no rows; +// negative values surface as an error. 
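+//
+// Call shape (sketch; receiptPaths is an assumed caller-side slice of
+// closed receipt-file paths):
+//
+//	maxBlock, ok, err := r.MaxReceiptBlockNumber(ctx, receiptPaths)
+//	if err == nil && ok {
+//		// seed latestVersion from maxBlock
+//	}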
+func (r *Reader) MaxReceiptBlockNumber(ctx context.Context, files []string) (uint64, bool, error) { + if len(files) == 0 { + return 0, false, nil + } + + var parquetFiles string + if len(files) == 1 { + parquetFiles = quoteSQLString(files[0]) + } else { + parquetFiles = fmt.Sprintf("[%s]", joinQuoted(files)) + } + + // #nosec G201 -- parquetFiles derived from local file paths + query := fmt.Sprintf("SELECT MAX(block_number) FROM read_parquet(%s, union_by_name=true)", parquetFiles) + row := r.db.QueryRowContext(ctx, query) + var max sql.NullInt64 + if err := row.Scan(&max); err != nil { + return 0, false, fmt.Errorf("failed to query max block number: %w", err) + } + if !max.Valid { + return 0, false, nil + } + if max.Int64 < 0 { + return 0, false, fmt.Errorf("invalid negative block number: %d", max.Int64) + } + return uint64(max.Int64), true, nil +} + +// isFileReadable probes a parquet file by issuing a "SELECT 1 LIMIT 1" +// against it. A failure typically indicates a truncated or corrupt file +// from a crash mid-flush. +func (r *Reader) isFileReadable(path string) bool { + // #nosec G201 -- path comes from local parquet file scans, not user input. + _, err := r.db.Exec(fmt.Sprintf("SELECT 1 FROM read_parquet(%s) LIMIT 1", quoteSQLString(path))) + return err == nil +} + +// configureParquetMetadataCache enables DuckDB's parquet metadata cache. It +// prefers the size-based knob (newer DuckDB) and falls back to the boolean +// toggle on older builds that don't recognize the size setting. +func configureParquetMetadataCache(db *sql.DB) error { + const sizeSetting = "SET parquet_metadata_cache_size = 500" + if _, err := db.Exec(sizeSetting); err == nil { + return nil + } else if !strings.Contains(err.Error(), "unrecognized configuration parameter") { + return fmt.Errorf("failed to configure duckdb (%s): %w", sizeSetting, err) + } + + const toggleSetting = "SET parquet_metadata_cache = true" + if _, err := db.Exec(toggleSetting); err != nil { + return fmt.Errorf("failed to configure duckdb (%s): %w", toggleSetting, err) + } + + return nil +} + +// joinQuoted SQL-quotes each path and joins them with ", " for embedding in +// a DuckDB read_parquet([...]) list literal. +func joinQuoted(files []string) string { + quoted := make([]string, len(files)) + for i, f := range files { + quoted[i] = quoteSQLString(f) + } + return strings.Join(quoted, ", ") +} + +// quoteSQLString wraps s in single quotes and escapes embedded quotes for +// safe inclusion in a DuckDB SQL string literal. +func quoteSQLString(s string) string { + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} + +// parquetFilesSQL renders files as either a single quoted path or a quoted +// list, in either case suitable as the first argument to read_parquet(). +func parquetFilesSQL(files []string) string { + if len(files) == 1 { + return quoteSQLString(files[0]) + } + return fmt.Sprintf("[%s]", joinQuoted(files)) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/requests.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/requests.go new file mode 100644 index 0000000000..9166442cce --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/requests.go @@ -0,0 +1,186 @@ +package coordinator + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" +) + +// coordRequest is the sealed-union marker for messages sent to the +// coordinator goroutine. Each concrete request type lives below and carries +// its own per-request reply channel. 
dispatch invokes the appropriate +// handler on c and returns true when run() should exit afterward +// (graceful close, simulated crash). +type coordRequest interface { + dispatch(c *Coordinator) (terminate bool) +} + +// writeReq asks the coordinator to append receipts for a block. height is +// authoritative; per-input BlockNumber is ignored. +type writeReq struct { + height uint64 + inputs []parquet.ReceiptInput + resp chan writeResp +} + +// writeResp carries the outcome of a writeReq. +type writeResp struct { + err error +} + +// readByTxHashReq asks the coordinator to look up the earliest receipt for +// txHash. The temp write cache is consulted first, then closed parquet +// files. +type readByTxHashReq struct { + ctx context.Context + txHash common.Hash + resp chan readReceiptResp +} + +// readByTxHashInBlockReq asks for the receipt at exactly blockNumber, used +// to disambiguate replayed transactions across reorgs. +type readByTxHashInBlockReq struct { + ctx context.Context + txHash common.Hash + blockNumber uint64 + resp chan readReceiptResp +} + +// readReceiptResp carries the outcome of a receipt read. result==nil with +// err==nil indicates "not found". +type readReceiptResp struct { + result *parquet.ReceiptResult + err error +} + +// getLogsReq asks the coordinator for logs matching filter across the +// closed log parquet files. +type getLogsReq struct { + ctx context.Context + filter parquet.LogFilter + resp chan getLogsResp +} + +// getLogsResp carries the outcome of a getLogsReq. +type getLogsResp struct { + results []parquet.LogResult + err error +} + +// flushReq asks the coordinator to flush buffered receipts/logs to the open +// parquet file. +type flushReq struct { + resp chan error +} + +// latestVersionReq asks for the highest block height observed by the +// coordinator. +type latestVersionReq struct { + resp chan int64 +} + +// setLatestVersionReq overwrites latestVersion. Used when a caller knows +// the chain height authoritatively (e.g., genesis init). +type setLatestVersionReq struct { + version int64 + resp chan error +} + +// setEarliestVersionReq records the lowest retained block height for +// pruning bookkeeping. +type setEarliestVersionReq struct { + version int64 + resp chan error +} + +// updateLatestVersionReq advances latestVersion only when version is +// strictly greater, preventing rewinds. +type updateLatestVersionReq struct { + version int64 + resp chan error +} + +// fileStartBlockReq asks for the start block of the currently open parquet +// file. +type fileStartBlockReq struct { + resp chan uint64 +} + +// setBlockFlushIntervalReq updates how often (in blocks) the open writers +// are flushed to disk. +type setBlockFlushIntervalReq struct { + interval uint64 + resp chan error +} + +// setMaxBlocksPerFileReq updates the rotation interval and propagates it +// to the reader. +type setMaxBlocksPerFileReq struct { + maxBlocksPerFile uint64 + resp chan error +} + +// setFaultHooksReq installs test hooks. nil disables all hook checks. +type setFaultHooksReq struct { + hooks *parquet.FaultHooks + resp chan error +} + +// simulateCrashReq drops in-memory writer state without flushing so that +// recovery paths can be exercised. Test-only. +type simulateCrashReq struct { + resp chan struct{} +} + +// closeReq triggers a graceful shutdown: flush, close writers, close WAL +// and reader. 
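+//
+// Every request follows the same ask/reply shape. A public method is
+// expected to look roughly like this sketch (the requests channel field
+// name is an assumption; it is not defined in this file, and the real
+// method must also guard against sends after close):
+//
+//	func (c *Coordinator) Close() error {
+//		resp := make(chan error, 1)
+//		c.requests <- closeReq{resp: resp}
+//		return <-resp
+//	}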
+type closeReq struct { + resp chan error +} + +func (r writeReq) dispatch(c *Coordinator) bool { c.handleWrite(r); return false } +func (r readByTxHashReq) dispatch(c *Coordinator) bool { c.handleReadByTxHash(r); return false } +func (r readByTxHashInBlockReq) dispatch(c *Coordinator) bool { + c.handleReadByTxHashInBlock(r) + return false +} +func (r getLogsReq) dispatch(c *Coordinator) bool { c.handleGetLogs(r); return false } +func (r flushReq) dispatch(c *Coordinator) bool { c.handleFlush(r); return false } +func (r latestVersionReq) dispatch(c *Coordinator) bool { + c.handleLatestVersion(r) + return false +} +func (r setLatestVersionReq) dispatch(c *Coordinator) bool { + c.handleSetLatestVersion(r) + return false +} +func (r setEarliestVersionReq) dispatch(c *Coordinator) bool { + c.handleSetEarliestVersion(r) + return false +} +func (r updateLatestVersionReq) dispatch(c *Coordinator) bool { + c.handleUpdateLatestVersion(r) + return false +} +func (r fileStartBlockReq) dispatch(c *Coordinator) bool { + c.handleFileStartBlock(r) + return false +} +func (r setBlockFlushIntervalReq) dispatch(c *Coordinator) bool { + c.handleSetBlockFlushInterval(r) + return false +} +func (r setMaxBlocksPerFileReq) dispatch(c *Coordinator) bool { + c.handleSetMaxBlocksPerFile(r) + return false +} +func (r setFaultHooksReq) dispatch(c *Coordinator) bool { + c.handleSetFaultHooks(r) + return false +} +func (r simulateCrashReq) dispatch(c *Coordinator) bool { + c.handleSimulateCrash(r) + return true +} +func (r closeReq) dispatch(c *Coordinator) bool { c.handleClose(r); return true } diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/rotation_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/rotation_test.go new file mode 100644 index 0000000000..97f76a6a5d --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/rotation_test.go @@ -0,0 +1,111 @@ +package coordinator + +import ( + "math/big" + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestRotationBoundaryPrimitives(t *testing.T) { + coord := &Coordinator{ + config: parquet.StoreConfig{MaxBlocksPerFile: 500}, + } + + require.True(t, coord.isRotationBoundary(0)) + require.True(t, coord.isRotationBoundary(500)) + require.False(t, coord.isRotationBoundary(501)) + + coord.config.MaxBlocksPerFile = 0 + require.False(t, coord.isRotationBoundary(500)) +} + +func TestAlignedFileStartBlock(t *testing.T) { + require.Equal(t, uint64(5000), alignedFileStartBlock(5234, 500)) + require.Equal(t, uint64(5000), alignedFileStartBlock(5000, 500)) + require.Equal(t, uint64(0), alignedFileStartBlock(499, 500)) + require.Equal(t, uint64(5234), alignedFileStartBlock(5234, 0)) +} + +func TestWriteRotatesAtAlignedBoundary(t *testing.T) { + wal := &recordingWAL{} + coord := newWriteCoordinator(t, wal) + coord.config.MaxBlocksPerFile = 4 + defer func() { require.NoError(t, coord.closeWriters()) }() + + for block := uint64(1); block <= 4; block++ { + require.NoError(t, coord.writeReceipts(block, []parquet.ReceiptInput{ + testReceiptInput(block, common.BigToHash(new(big.Int).SetUint64(block))), + })) + } + + require.Len(t, coord.closedFiles, 1) + require.Equal(t, uint64(0), coord.closedFiles[0].startBlock) + require.Equal(t, uint64(4), coord.fileStartBlock) + require.FileExists(t, filepath.Join(coord.basePath, "receipts_0.parquet")) + require.FileExists(t, filepath.Join(coord.basePath, "logs_0.parquet")) + 
require.FileExists(t, filepath.Join(coord.basePath, "receipts_4.parquet")) + require.FileExists(t, filepath.Join(coord.basePath, "logs_4.parquet")) + + require.Len(t, wal.truncatedBefore, 1) + require.Equal(t, uint64(4), wal.truncatedBefore[0]) + require.Len(t, coord.tempWriteCache, 1) + require.Contains(t, coord.tempWriteCache, common.BigToHash(big.NewInt(4))) +} + +func TestRotateOpenFilePrunesOnlyOldTempCacheEntries(t *testing.T) { + txHash := common.HexToHash("0xabc") + coord := &Coordinator{ + tempWriteCache: map[common.Hash][]tempReceipt{ + txHash: { + {blockNumber: 1}, + {blockNumber: 4}, + }, + common.HexToHash("0xdef"): { + {blockNumber: 2}, + }, + }, + } + + coord.dropTempCacheBefore(4) + + require.Len(t, coord.tempWriteCache, 1) + require.Len(t, coord.tempWriteCache[txHash], 1) + require.Equal(t, uint64(4), coord.tempWriteCache[txHash][0].blockNumber) +} + +func TestWriteEmptyHonorsMonotonicLastSeen(t *testing.T) { + coord := newWriteCoordinator(t, &recordingWAL{}) + + require.NoError(t, coord.writeReceipts(5, nil)) + require.Equal(t, uint64(5), coord.lastSeenBlock) + + require.NoError(t, coord.writeReceipts(4, nil)) + require.Equal(t, uint64(5), coord.lastSeenBlock) + require.Empty(t, coord.closedFiles) +} + +func TestWriteEmptyRotatesAtBoundary(t *testing.T) { + wal := &recordingWAL{} + coord := newWriteCoordinator(t, wal) + coord.config.MaxBlocksPerFile = 4 + defer func() { require.NoError(t, coord.closeWriters()) }() + + require.NoError(t, coord.writeReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, common.HexToHash("0x1")), + })) + require.NotNil(t, coord.receiptWriter) + + require.NoError(t, coord.writeReceipts(4, nil)) + + require.Equal(t, uint64(4), coord.lastSeenBlock) + require.Equal(t, uint64(4), coord.fileStartBlock) + require.Len(t, coord.closedFiles, 1) + require.Equal(t, uint64(0), coord.closedFiles[0].startBlock) + require.FileExists(t, filepath.Join(coord.basePath, "receipts_0.parquet")) + require.FileExists(t, filepath.Join(coord.basePath, "receipts_4.parquet")) + require.Empty(t, coord.tempWriteCache) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/types.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/types.go new file mode 100644 index 0000000000..b3ee69a004 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/types.go @@ -0,0 +1,46 @@ +package coordinator + +import ( + "errors" + "fmt" + "math" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" +) + +// ErrStoreClosed is returned when a request is made after the coordinator has +// stopped accepting work. +var ErrStoreClosed = errors.New("store closed") + +// tempReceipt is one entry in the in-memory write cache, indexed by tx +// hash. It carries enough to reconstruct a ReceiptResult for reads served +// before the receipt has been flushed to a parquet file. +type tempReceipt struct { + blockNumber uint64 + receiptBytes []byte +} + +// ReplayedBlock summarizes one block recovered from WAL replay: the block +// number and the tx hashes whose receipts were replayed in order. +type ReplayedBlock struct { + BlockNumber uint64 + TxHashes []common.Hash +} + +// ReplayResult is the outcome of a successful WAL replay: warmup records +// to seed external caches, plus the per-block tx hash listing. +type ReplayResult struct { + WarmupRecords []parquet.ReceiptRecord + Blocks []ReplayedBlock +} + +// int64FromUint64 converts value to int64 or errors on overflow. 
Used at +// the boundary where block heights cross from internal uint64 storage to +// the sdk-style int64 latestVersion. +func int64FromUint64(value uint64) (int64, error) { + if value > uint64(math.MaxInt64) { + return 0, fmt.Errorf("value %d overflows int64", value) + } + return int64(value), nil +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/wal.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/wal.go new file mode 100644 index 0000000000..7f5d4bd09c --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/wal.go @@ -0,0 +1,212 @@ +package coordinator + +import ( + "fmt" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" +) + +// replayWAL re-applies WAL entries on top of the on-disk parquet state. It +// drives rotation when entries cross a MaxBlocksPerFile boundary (so the +// resulting layout matches what a non-crashing run would have produced), +// applies each receipt to the open writer, and finally truncates WAL +// entries that are now durably persisted. +func (c *Coordinator) replayWAL(converter parquet.WALReceiptConverter) (ReplayResult, error) { + if converter == nil { + return ReplayResult{}, fmt.Errorf("WAL receipt converter is nil") + } + if c.wal == nil { + return ReplayResult{}, nil + } + + firstOffset, err := c.wal.FirstOffset() + if err != nil { + return ReplayResult{}, fmt.Errorf("failed to read parquet WAL first offset: %w", err) + } + if firstOffset == 0 { + return ReplayResult{}, nil + } + lastOffset, err := c.wal.LastOffset() + if err != nil { + return ReplayResult{}, fmt.Errorf("failed to read parquet WAL last offset: %w", err) + } + if lastOffset == 0 { + return ReplayResult{}, nil + } + + var ( + currentBlock uint64 + haveBlock bool + logStartIndex uint + maxBlock uint64 + dropOffset uint64 + ) + + result := ReplayResult{} + replayIdx := make(map[uint64]int) + + err = c.wal.Replay(firstOffset, lastOffset, func(offset uint64, entry parquet.WALEntry) error { + if len(entry.Receipts) == 0 { + return nil + } + + blockNumber := entry.BlockNumber + if blockNumber < c.fileStartBlock { + dropOffset = offset + return nil + } + + if haveBlock && blockNumber != currentBlock && c.isRotationBoundary(blockNumber) && blockNumber > c.fileStartBlock && offset > 0 { + dropOffset = offset - 1 + } + + if !haveBlock || blockNumber != currentBlock { + currentBlock = blockNumber + haveBlock = true + logStartIndex = 0 + } + + for _, receiptBytes := range entry.Receipts { + if len(receiptBytes) == 0 { + continue + } + + replayed, err := converter(blockNumber, receiptBytes, logStartIndex) + if err != nil { + return err + } + logStartIndex += replayed.LogCount + + result.WarmupRecords = append(result.WarmupRecords, copyReceiptRecord(replayed.Warmup)) + if idx, ok := replayIdx[blockNumber]; ok { + result.Blocks[idx].TxHashes = append(result.Blocks[idx].TxHashes, replayed.TxHash) + } else { + replayIdx[blockNumber] = len(result.Blocks) + result.Blocks = append(result.Blocks, ReplayedBlock{ + BlockNumber: blockNumber, + TxHashes: []common.Hash{replayed.TxHash}, + }) + } + + input := normalizeReplayInput(blockNumber, receiptBytes, replayed) + if err := c.applyReceiptFromReplay(blockNumber, input); err != nil { + return err + } + + if blockNumber > maxBlock { + maxBlock = blockNumber + } + } + + return nil + }) + if err != nil { + return ReplayResult{}, err + } + + if maxBlock > 0 { + latest, err := int64FromUint64(maxBlock) + if err != nil { + return ReplayResult{}, err + } + if latest > 
c.latestVersion { + c.latestVersion = latest + } + } + if err := truncateReplayWAL(c.wal, dropOffset); err != nil { + return ReplayResult{}, err + } + return result, nil +} + +// applyReceiptFromReplay is the replay-time variant of applyReceipt: it +// rotates without writing to the WAL (the WAL is the source of replay) and +// drops temp-cache entries from the just-closed file's range. +func (c *Coordinator) applyReceiptFromReplay(blockNumber uint64, input parquet.ReceiptInput) error { + if c.receiptWriter != nil && blockNumber != c.lastSeenBlock && c.isRotationBoundary(blockNumber) { + if err := c.rotateOpenFileWithoutWAL(blockNumber); err != nil { + return err + } + c.dropTempCacheBefore(c.fileStartBlock) + } + return c.applyReceipt(blockNumber, input) +} + +// normalizeReplayInput backfills the ReceiptInput fields that the converter +// may have left empty (block number, tx hash, and the receipt byte +// payloads), so downstream apply code doesn't need replay-aware branches. +func normalizeReplayInput(blockNumber uint64, receiptBytes []byte, replayed parquet.ReplayReceipt) parquet.ReceiptInput { + input := replayed.Input + input.Receipt.BlockNumber = blockNumber + if len(input.Receipt.TxHash) == 0 { + input.Receipt.TxHash = append([]byte(nil), replayed.TxHash[:]...) + } + if len(input.Receipt.ReceiptBytes) == 0 { + input.Receipt.ReceiptBytes = append([]byte(nil), receiptBytes...) + } + if len(input.ReceiptBytes) == 0 { + input.ReceiptBytes = append([]byte(nil), receiptBytes...) + } + return input +} + +// copyReceiptRecord returns a deep copy of record so callers can retain it +// without aliasing the converter's internal buffers. +func copyReceiptRecord(record parquet.ReceiptRecord) parquet.ReceiptRecord { + return parquet.ReceiptRecord{ + TxHash: append([]byte(nil), record.TxHash...), + BlockNumber: record.BlockNumber, + ReceiptBytes: append([]byte(nil), record.ReceiptBytes...), + } +} + +// clearWALPreservingLast truncates the WAL up to (but not including) its +// last offset after a rotation. The final entry is retained so that crash +// recovery can still observe the rotation boundary. +func (c *Coordinator) clearWALPreservingLast() error { + if c.wal == nil { + return nil + } + firstOffset, err := c.wal.FirstOffset() + if err != nil { + return fmt.Errorf("failed to read parquet WAL first offset: %w", err) + } + if firstOffset == 0 { + return nil + } + lastOffset, err := c.wal.LastOffset() + if err != nil { + return fmt.Errorf("failed to read parquet WAL last offset: %w", err) + } + if lastOffset == 0 { + return nil + } + if lastOffset <= firstOffset { + return nil + } + if err := c.wal.TruncateBefore(lastOffset); err != nil { + if strings.Contains(err.Error(), "out of range") { + return nil + } + return fmt.Errorf("failed to truncate parquet WAL before offset %d: %w", lastOffset, err) + } + return nil +} + +// truncateReplayWAL drops WAL entries up to and including dropOffset after +// a successful replay. Out-of-range errors from the underlying WAL are +// treated as no-ops since they mean nothing was left to truncate. 
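+//
+// E.g. (illustrative): with entries at offsets 1..5 and dropOffset=2,
+// TruncateBefore(3) drops offsets 1 and 2 and leaves 3..5 in place for
+// future replays.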
+func truncateReplayWAL(w interface{ TruncateBefore(offset uint64) error }, dropOffset uint64) error { + if dropOffset == 0 { + return nil + } + if err := w.TruncateBefore(dropOffset + 1); err != nil { + if strings.Contains(err.Error(), "out of range") { + return nil + } + return fmt.Errorf("failed to truncate replay WAL before offset %d: %w", dropOffset+1, err) + } + return nil +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/wal_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/wal_test.go new file mode 100644 index 0000000000..50e9ced8fb --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/wal_test.go @@ -0,0 +1,110 @@ +package coordinator + +import ( + "context" + "errors" + "math/big" + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestReplayWALAppliesReceiptsAndPreservesDuplicateHashes(t *testing.T) { + wal := replayWALWithEntries(t, + parquet.WALEntry{BlockNumber: 1, Receipts: [][]byte{{7, 1}, {7, 2}}}, + parquet.WALEntry{BlockNumber: 2, Receipts: [][]byte{{8, 1}}}, + ) + coord := newReplayCoordinator(t, wal) + defer func() { require.NoError(t, coord.closeWriters()) }() + + result, err := coord.replayWAL(replayConverterForTest) + require.NoError(t, err) + + duplicateHash := common.BigToHash(new(big.Int).SetUint64(7)) + require.Len(t, result.WarmupRecords, 3) + require.Len(t, result.Blocks, 2) + require.Equal(t, uint64(1), result.Blocks[0].BlockNumber) + require.Equal(t, []common.Hash{duplicateHash, duplicateHash}, result.Blocks[0].TxHashes) + require.Len(t, coord.tempWriteCache[duplicateHash], 2) + require.Equal(t, int64(2), coord.latestVersion) + require.Empty(t, wal.truncatedBefore) +} + +func TestReplayWALSkipsEntriesBeforeFileStartAndTruncates(t *testing.T) { + wal := replayWALWithEntries(t, + parquet.WALEntry{BlockNumber: 2, Receipts: [][]byte{{2}}}, + parquet.WALEntry{BlockNumber: 4, Receipts: [][]byte{{4}}}, + ) + coord := newReplayCoordinator(t, wal) + coord.fileStartBlock = 4 + defer func() { require.NoError(t, coord.closeWriters()) }() + + result, err := coord.replayWAL(func(blockNumber uint64, receiptBytes []byte, logStartIndex uint) (parquet.ReplayReceipt, error) { + require.NotEqual(t, uint64(2), blockNumber) + return replayConverterForTest(blockNumber, receiptBytes, logStartIndex) + }) + require.NoError(t, err) + + require.Len(t, result.WarmupRecords, 1) + require.Equal(t, uint64(4), result.WarmupRecords[0].BlockNumber) + require.Equal(t, []uint64{2}, wal.truncatedBefore) + require.Equal(t, int64(4), coord.latestVersion) +} + +func TestReplayWALRotatesBoundaryWithoutClearingWAL(t *testing.T) { + wal := replayWALWithEntries(t, + parquet.WALEntry{BlockNumber: 1, Receipts: [][]byte{{1}}}, + parquet.WALEntry{BlockNumber: 4, Receipts: [][]byte{{4}}}, + ) + coord := newReplayCoordinator(t, wal) + defer func() { require.NoError(t, coord.closeWriters()) }() + + _, err := coord.replayWAL(replayConverterForTest) + require.NoError(t, err) + + require.Len(t, coord.closedFiles, 1) + require.Equal(t, uint64(0), coord.closedFiles[0].startBlock) + require.Equal(t, uint64(4), coord.fileStartBlock) + require.FileExists(t, filepath.Join(coord.basePath, "receipts_0.parquet")) + require.FileExists(t, filepath.Join(coord.basePath, "receipts_4.parquet")) + require.Equal(t, []uint64{2}, wal.truncatedBefore) + require.Len(t, coord.tempWriteCache, 1) + require.Contains(t, coord.tempWriteCache, 
common.BigToHash(new(big.Int).SetUint64(4))) +} + +func TestNewClosesReplayWritersOnReplayError(t *testing.T) { + dir := t.TempDir() + wal, err := parquet.NewWAL(filepath.Join(dir, "parquet-wal")) + require.NoError(t, err) + require.NoError(t, wal.Write(parquet.WALEntry{BlockNumber: 1, Receipts: [][]byte{{1}}})) + require.NoError(t, wal.Write(parquet.WALEntry{BlockNumber: 2, Receipts: [][]byte{{2}}})) + require.NoError(t, wal.Close()) + + calls := 0 + _, err = New(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 4, + WALConverter: func(blockNumber uint64, receiptBytes []byte, logStartIndex uint) (parquet.ReplayReceipt, error) { + calls++ + if calls == 2 { + return parquet.ReplayReceipt{}, errors.New("injected replay failure") + } + return replayConverterForTest(blockNumber, receiptBytes, logStartIndex) + }, + }) + require.ErrorContains(t, err, "injected replay failure") + require.Equal(t, 2, calls) + + reader, err := NewReaderWithMaxBlocksPerFile(dir, 4) + require.NoError(t, err) + defer func() { require.NoError(t, reader.Close()) }() + + ctx := context.Background() + _, err = reader.QueryReceiptByTxHash(ctx, []string{filepath.Join(dir, "receipts_0.parquet")}, common.BigToHash(new(big.Int).SetUint64(1))) + require.NoError(t, err) + _, err = reader.QueryLogs(ctx, []string{filepath.Join(dir, "logs_0.parquet")}, parquet.LogFilter{}) + require.NoError(t, err) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/write_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/write_test.go new file mode 100644 index 0000000000..591ca26e24 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/write_test.go @@ -0,0 +1,68 @@ +package coordinator + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestWriteReceiptsWritesOneWALEntryPerCall(t *testing.T) { + wal := &recordingWAL{} + coord := newWriteCoordinator(t, wal) + defer func() { require.NoError(t, coord.closeWriters()) }() + + require.NoError(t, coord.writeReceipts(2, []parquet.ReceiptInput{ + testReceiptInput(2, common.HexToHash("0x22")), + testReceiptInput(2, common.HexToHash("0x23")), + })) + require.NoError(t, coord.writeReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, common.HexToHash("0x11")), + })) + + require.Len(t, wal.entries, 2) + require.Equal(t, uint64(2), wal.entries[0].BlockNumber) + require.Len(t, wal.entries[0].Receipts, 2) + require.Equal(t, uint64(1), wal.entries[1].BlockNumber) + require.Len(t, wal.entries[1].Receipts, 1) +} + +func TestWriteReceiptsKeepsDuplicateHashCacheEntries(t *testing.T) { + wal := &recordingWAL{} + coord := newWriteCoordinator(t, wal) + defer func() { require.NoError(t, coord.closeWriters()) }() + + txHash := common.HexToHash("0xabc") + require.NoError(t, coord.writeReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, txHash), + })) + require.NoError(t, coord.writeReceipts(2, []parquet.ReceiptInput{ + testReceiptInput(2, txHash), + })) + + require.Len(t, coord.receiptsBuffer, 2) + require.Equal(t, int64(2), coord.latestVersion) + require.Len(t, coord.tempWriteCache[txHash], 2) + require.Equal(t, uint64(1), coord.tempWriteCache[txHash][0].blockNumber) + require.Equal(t, uint64(2), coord.tempWriteCache[txHash][1].blockNumber) +} + +func TestWriteReceiptsFlushesAtConfiguredBlockInterval(t *testing.T) { + wal := &recordingWAL{} + coord := newWriteCoordinator(t, wal) + coord.config.BlockFlushInterval = 1 + defer func() { 
require.NoError(t, coord.closeWriters()) }() + + require.NoError(t, coord.writeReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, common.HexToHash("0x1")), + })) + require.NoError(t, coord.writeReceipts(2, []parquet.ReceiptInput{ + testReceiptInput(2, common.HexToHash("0x2")), + })) + + require.Empty(t, coord.receiptsBuffer) + require.Empty(t, coord.logsBuffer) + require.Zero(t, coord.blocksSinceFlush) + require.Equal(t, int64(2), coord.latestVersion) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/helpers_test.go b/sei-db/ledger_db/receipt/parquet_v2/helpers_test.go new file mode 100644 index 0000000000..26ee5f7573 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/helpers_test.go @@ -0,0 +1,79 @@ +package parquet_v2 + +import ( + "fmt" + "math/big" + "os" + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/common" + parquetgo "github.com/parquet-go/parquet-go" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func testReceiptInput(blockNumber uint64, txHash common.Hash) parquet.ReceiptInput { + receiptBytes := []byte{byte(blockNumber), txHash[31]} + return parquet.ReceiptInput{ + BlockNumber: blockNumber, + Receipt: parquet.ReceiptRecord{ + TxHash: txHash[:], + BlockNumber: blockNumber, + ReceiptBytes: receiptBytes, + }, + Logs: []parquet.LogRecord{{ + BlockNumber: blockNumber, + TxHash: txHash[:], + Address: common.BigToAddress(new(big.Int).SetUint64(blockNumber)).Bytes(), + }}, + ReceiptBytes: receiptBytes, + } +} + +func writeReceiptFile(t *testing.T, dir string, startBlock uint64, blocks []uint64) { + t.Helper() + + path := filepath.Join(dir, fmt.Sprintf("receipts_%d.parquet", startBlock)) + f, err := os.Create(path) + require.NoError(t, err) + + w := parquetgo.NewGenericWriter[parquet.ReceiptRecord](f) + for _, block := range blocks { + txHash := common.BigToHash(new(big.Int).SetUint64(block)) + _, err := w.Write([]parquet.ReceiptRecord{{ + TxHash: txHash[:], + BlockNumber: block, + ReceiptBytes: []byte{byte(block)}, + }}) + require.NoError(t, err) + } + require.NoError(t, w.Close()) + require.NoError(t, f.Close()) +} + +func writeLogFile(t *testing.T, dir string, startBlock uint64) { + t.Helper() + + path := filepath.Join(dir, fmt.Sprintf("logs_%d.parquet", startBlock)) + f, err := os.Create(path) + require.NoError(t, err) + + w := parquetgo.NewGenericWriter[parquet.LogRecord](f) + txHash := common.BigToHash(new(big.Int).SetUint64(startBlock)) + _, err = w.Write([]parquet.LogRecord{{ + BlockNumber: startBlock, + TxHash: txHash[:], + }}) + require.NoError(t, err) + require.NoError(t, w.Close()) + require.NoError(t, f.Close()) +} + +func logBlockNumbers(results []parquet.LogResult) []uint64 { + blocks := make([]uint64, 0, len(results)) + for _, result := range results { + blocks = append(blocks, result.BlockNumber) + } + return blocks +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/store.go b/sei-db/ledger_db/receipt/parquet_v2/store.go new file mode 100644 index 0000000000..25bce6df85 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/store.go @@ -0,0 +1,143 @@ +package parquet_v2 + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt/parquet_v2/coordinator" +) + +// Store is the public facade of the V2 parquet receipt store. It wraps a +// coordinator.Coordinator and forwards all calls to it. 
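+//
+// Typical lifecycle, following the NewStore contract documented below
+// (sketch; error handling elided):
+//
+//	store, _ := NewStore(cfg)        // replays WAL before returning
+//	warm := store.WarmupRecords()    // drain once to seed external caches
+//	blocks := store.ReplayedBlocks() // drain once to rebuild the tx index
+//	defer store.Close()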
+// +// Store does not directly implement the receipt.ReceiptStore interface; +// the parquetReceiptStoreV2 wrapper in the parent package adapts Store to +// that interface (handling tx-hash indexing, replay glue, and the higher +// level ReceiptStore method shapes). Methods here are documented inline +// rather than via a parent interface. +type Store struct { + coord *coordinator.Coordinator +} + +// NewStore creates a V2 store backed by a live coordinator goroutine. +// cfg.WALConverter, when non-nil, drives WAL replay synchronously before +// the store accepts other calls; production callers always supply one. +// Lower-level tests that drive replay manually leave it nil. After +// construction, callers drain WarmupRecords and ReplayedBlocks to +// re-seed external caches and indexes. +func NewStore(cfg parquet.StoreConfig) (*Store, error) { + c, err := coordinator.New(cfg) + if err != nil { + return nil, err + } + return &Store{coord: c}, nil +} + +// WriteReceipts appends receipts for the block at height. height is +// authoritative; any BlockNumber on individual inputs is ignored. +func (s *Store) WriteReceipts(height uint64, inputs []parquet.ReceiptInput) error { + return s.coord.WriteReceipts(height, inputs) +} + +// GetReceiptByTxHash returns the earliest receipt for txHash, or +// (nil, nil) if none is found. +func (s *Store) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*parquet.ReceiptResult, error) { + return s.coord.GetReceiptByTxHash(ctx, txHash) +} + +// GetReceiptByTxHashInBlock returns the receipt for txHash at exactly +// blockNumber, or (nil, nil) if no such receipt exists. +func (s *Store) GetReceiptByTxHashInBlock(ctx context.Context, txHash common.Hash, blockNumber uint64) (*parquet.ReceiptResult, error) { + return s.coord.GetReceiptByTxHashInBlock(ctx, txHash, blockNumber) +} + +// GetLogs returns all logs matching filter across the closed log parquet +// files. +func (s *Store) GetLogs(ctx context.Context, filter parquet.LogFilter) ([]parquet.LogResult, error) { + return s.coord.GetLogs(ctx, filter) +} + +// FileStartBlock returns the start block of the currently open parquet +// file (the next file's name will be derived from this). +func (s *Store) FileStartBlock() uint64 { + return s.coord.FileStartBlock() +} + +// LatestVersion returns the highest block height the store has observed. +func (s *Store) LatestVersion() int64 { + return s.coord.LatestVersion() +} + +// SetLatestVersion overwrites latestVersion. Used during init paths where +// the chain height is known authoritatively. +func (s *Store) SetLatestVersion(version int64) { + s.coord.SetLatestVersion(version) +} + +// SetEarliestVersion records the lowest retained block height for pruning +// bookkeeping. +func (s *Store) SetEarliestVersion(version int64) { + s.coord.SetEarliestVersion(version) +} + +// UpdateLatestVersion advances latestVersion only when version is strictly +// greater than the current value, preventing accidental rewinds. +func (s *Store) UpdateLatestVersion(version int64) { + s.coord.UpdateLatestVersion(version) +} + +// CacheRotateInterval returns the cache rotation interval (configured +// MaxBlocksPerFile) used by the wrapper to manage warmup state. +func (s *Store) CacheRotateInterval() uint64 { + return s.coord.CacheRotateInterval() +} + +// Flush forces buffered receipts/logs in the open parquet file to disk. 
+func (s *Store) Flush() error { + return s.coord.Flush() +} + +// Close performs a graceful shutdown, flushing and closing all writers, +// the WAL, and the reader. +func (s *Store) Close() error { + return s.coord.Close() +} + +// SimulateCrash drops in-memory writer state without flushing. Test-only; +// used to exercise WAL recovery in the surrounding test suite. +func (s *Store) SimulateCrash() { + s.coord.SimulateCrash() +} + +// SetBlockFlushInterval updates how often (in blocks) the open writers are +// flushed to disk. +func (s *Store) SetBlockFlushInterval(interval uint64) { + s.coord.SetBlockFlushInterval(interval) +} + +// SetMaxBlocksPerFile updates the rotation interval and propagates it to +// the reader. +func (s *Store) SetMaxBlocksPerFile(n uint64) { + s.coord.SetMaxBlocksPerFile(n) +} + +// SetFaultHooks installs the supplied test hooks. nil disables all hook +// checks. +func (s *Store) SetFaultHooks(hooks *parquet.FaultHooks) { + s.coord.SetFaultHooks(hooks) +} + +// WarmupRecords returns and clears the warmup receipt records recovered +// during construction-time WAL replay. Wrappers drain this once after +// NewStore returns to seed an external receipt cache. +func (s *Store) WarmupRecords() []parquet.ReceiptRecord { + return s.coord.WarmupRecords() +} + +// ReplayedBlocks returns and clears the per-block tx-hash listing +// recovered during construction-time WAL replay. Wrappers drain this +// once after NewStore returns to repopulate an external tx-hash index. +func (s *Store) ReplayedBlocks() []ReplayedBlock { + return s.coord.ReplayedBlocks() +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/store_dispatch_test.go b/sei-db/ledger_db/receipt/parquet_v2/store_dispatch_test.go new file mode 100644 index 0000000000..d2fc046959 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/store_dispatch_test.go @@ -0,0 +1,62 @@ +package parquet_v2 + +import ( + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func newDispatchStore(t *testing.T) *Store { + t.Helper() + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: t.TempDir(), + MaxBlocksPerFile: 4, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + return store +} + +func TestMetadataAndConfigRequestsDispatchThroughCoordinator(t *testing.T) { + store := newDispatchStore(t) + + require.Equal(t, uint64(0), store.FileStartBlock()) + require.Equal(t, int64(0), store.LatestVersion()) + require.Equal(t, uint64(4), store.CacheRotateInterval()) + + store.SetLatestVersion(10) + require.Equal(t, int64(10), store.LatestVersion()) + + store.UpdateLatestVersion(8) + require.Equal(t, int64(10), store.LatestVersion()) + + store.UpdateLatestVersion(12) + require.Equal(t, int64(12), store.LatestVersion()) + + store.SetEarliestVersion(3) + store.SetBlockFlushInterval(2) + store.SetFaultHooks(&parquet.FaultHooks{}) + + store.SetMaxBlocksPerFile(3) + require.Equal(t, uint64(3), store.CacheRotateInterval()) +} + +func TestCloseStopsFutureRequests(t *testing.T) { + store, err := NewStore(parquet.StoreConfig{DBDirectory: t.TempDir()}) + require.NoError(t, err) + + require.NoError(t, store.Close()) + require.ErrorIs(t, store.WriteReceipts(0, nil), ErrStoreClosed) + require.NoError(t, store.Close()) +} + +func TestSimulateCrashStopsFutureRequests(t *testing.T) { + store, err := NewStore(parquet.StoreConfig{DBDirectory: t.TempDir()}) + require.NoError(t, err) + + store.SimulateCrash() + require.ErrorIs(t, store.WriteReceipts(0, nil), 
ErrStoreClosed) + require.NoError(t, store.Close()) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/store_init_test.go b/sei-db/ledger_db/receipt/parquet_v2/store_init_test.go new file mode 100644 index 0000000000..9ab401f309 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/store_init_test.go @@ -0,0 +1,109 @@ +package parquet_v2 + +import ( + "os" + "path/filepath" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestNewStoreCreatesDirectoryAndClosesIdempotently(t *testing.T) { + dir := filepath.Join(t.TempDir(), "nested", "parquet") + + store, err := NewStore(parquet.StoreConfig{DBDirectory: dir}) + require.NoError(t, err) + require.DirExists(t, dir) + require.DirExists(t, filepath.Join(dir, "parquet-wal")) + + require.NoError(t, store.Flush()) + require.NoError(t, store.Close()) + require.NoError(t, store.Close()) +} + +func TestNewStoreSeedsLatestVersionFromClosedFiles(t *testing.T) { + dir := t.TempDir() + writeReceiptFile(t, dir, 100, []uint64{101, 123}) + writeLogFile(t, dir, 100) + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 100, + }) + require.NoError(t, err) + require.Equal(t, int64(123), store.LatestVersion()) + require.Equal(t, uint64(124), store.FileStartBlock()) + require.NoError(t, store.Close()) + + reopened, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 100, + }) + require.NoError(t, err) + require.Equal(t, int64(123), reopened.LatestVersion()) + require.Equal(t, uint64(124), reopened.FileStartBlock()) + require.NoError(t, reopened.Close()) +} + +func TestNewStoreRemovesCorruptTrailingPair(t *testing.T) { + dir := t.TempDir() + writeReceiptFile(t, dir, 0, []uint64{1}) + writeLogFile(t, dir, 0) + + corruptReceipt := filepath.Join(dir, "receipts_500.parquet") + require.NoError(t, os.WriteFile(corruptReceipt, []byte("not parquet"), 0o644)) + corruptLog := filepath.Join(dir, "logs_500.parquet") + require.NoError(t, os.WriteFile(corruptLog, []byte("not parquet"), 0o644)) + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 500, + }) + require.NoError(t, err) + require.NoError(t, store.Close()) + + _, err = os.Stat(corruptReceipt) + require.True(t, os.IsNotExist(err), "corrupt receipt file should be deleted") + _, err = os.Stat(corruptLog) + require.True(t, os.IsNotExist(err), "corrupt log file should be deleted") +} + +func TestNewStoreRemovesReceiptCounterpartForCorruptTrailingLog(t *testing.T) { + dir := t.TempDir() + writeReceiptFile(t, dir, 0, []uint64{1}) + writeLogFile(t, dir, 0) + writeReceiptFile(t, dir, 500, []uint64{501}) + + corruptLog := filepath.Join(dir, "logs_500.parquet") + require.NoError(t, os.WriteFile(corruptLog, []byte("not parquet"), 0o644)) + receiptCounterpart := filepath.Join(dir, "receipts_500.parquet") + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 500, + }) + require.NoError(t, err) + require.Equal(t, int64(1), store.LatestVersion()) + require.NoError(t, store.Close()) + + _, err = os.Stat(receiptCounterpart) + require.True(t, os.IsNotExist(err), "receipt counterpart should be deleted") + _, err = os.Stat(corruptLog) + require.True(t, os.IsNotExist(err), "corrupt log file should be deleted") +} + +func TestNewStoreIgnoresUnmatchedFiles(t *testing.T) { + dir := t.TempDir() + writeReceiptFile(t, dir, 0, []uint64{1}) + writeLogFile(t, dir, 500) + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + 
MaxBlocksPerFile: 500, + }) + require.NoError(t, err) + require.Equal(t, int64(0), store.LatestVersion()) + require.Equal(t, uint64(0), store.FileStartBlock()) + require.NoError(t, store.Close()) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/store_read_test.go b/sei-db/ledger_db/receipt/parquet_v2/store_read_test.go new file mode 100644 index 0000000000..2bf454d70a --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/store_read_test.go @@ -0,0 +1,131 @@ +package parquet_v2 + +import ( + "context" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestReadByTxHashFallsThroughToClosedFiles(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + txHash := common.HexToHash("0xabc") + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 10, + }) + require.NoError(t, err) + require.NoError(t, store.WriteReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, txHash), + })) + require.NoError(t, store.WriteReceipts(2, []parquet.ReceiptInput{ + testReceiptInput(2, txHash), + })) + require.NoError(t, store.Close()) + + reopened, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 10, + }) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, reopened.Close()) }) + + result, err := reopened.GetReceiptByTxHash(ctx, txHash) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, uint64(1), result.BlockNumber) + + result, err = reopened.GetReceiptByTxHashInBlock(ctx, txHash, 2) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, uint64(2), result.BlockNumber) + require.Equal(t, testReceiptInput(2, txHash).ReceiptBytes, result.ReceiptBytes) +} + +func TestReadByTxHashAfterRotationUsesClosedFilesAndTempCache(t *testing.T) { + ctx := context.Background() + txHash := common.HexToHash("0xabc") + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: t.TempDir(), + MaxBlocksPerFile: 4, + }) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, store.Close()) }) + + require.NoError(t, store.WriteReceipts(1, []parquet.ReceiptInput{ + testReceiptInput(1, txHash), + })) + require.NoError(t, store.WriteReceipts(2, []parquet.ReceiptInput{ + testReceiptInput(2, common.HexToHash("0x2")), + })) + require.NoError(t, store.WriteReceipts(3, []parquet.ReceiptInput{ + testReceiptInput(3, common.HexToHash("0x3")), + })) + require.NoError(t, store.WriteReceipts(4, []parquet.ReceiptInput{ + testReceiptInput(4, common.HexToHash("0x4")), + })) + require.NoError(t, store.WriteReceipts(5, []parquet.ReceiptInput{ + testReceiptInput(5, txHash), + })) + + closedResult, err := store.GetReceiptByTxHashInBlock(ctx, txHash, 1) + require.NoError(t, err) + require.NotNil(t, closedResult) + require.Equal(t, uint64(1), closedResult.BlockNumber) + + openResult, err := store.GetReceiptByTxHashInBlock(ctx, txHash, 5) + require.NoError(t, err) + require.NotNil(t, openResult) + require.Equal(t, uint64(5), openResult.BlockNumber) + require.Equal(t, testReceiptInput(5, txHash).ReceiptBytes, openResult.ReceiptBytes) +} + +func TestGetLogsReadsAcrossClosedFiles(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 4, + }) + require.NoError(t, err) + + for block := uint64(1); block <= 8; block++ { + require.NoError(t, store.WriteReceipts(block, []parquet.ReceiptInput{ + 
testReceiptInput(block, common.BigToHash(new(big.Int).SetUint64(block))), + })) + } + require.NoError(t, store.Close()) + + reopened, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 4, + }) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, reopened.Close()) }) + + from, to := uint64(2), uint64(6) + results, err := reopened.GetLogs(ctx, parquet.LogFilter{ + FromBlock: &from, + ToBlock: &to, + }) + require.NoError(t, err) + require.Len(t, results, 5) + require.Equal(t, []uint64{2, 3, 4, 5, 6}, logBlockNumbers(results)) + + address := common.BigToAddress(new(big.Int).SetUint64(5)) + results, err = reopened.GetLogs(ctx, parquet.LogFilter{ + FromBlock: &from, + ToBlock: &to, + Addresses: []common.Address{address}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, uint64(5), results[0].BlockNumber) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/store_rotation_test.go b/sei-db/ledger_db/receipt/parquet_v2/store_rotation_test.go new file mode 100644 index 0000000000..1eb6530d07 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/store_rotation_test.go @@ -0,0 +1,75 @@ +package parquet_v2 + +import ( + "os" + "path/filepath" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestLazyInitUsesAlignedStartForFirstOffBoundaryWrite(t *testing.T) { + dir := t.TempDir() + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 500, + }) + require.NoError(t, err) + + require.NoError(t, store.WriteReceipts(5234, []parquet.ReceiptInput{ + testReceiptInput(5234, common.HexToHash("0x5234")), + })) + require.NoError(t, store.Close()) + + require.FileExists(t, filepath.Join(dir, "receipts_5000.parquet")) + require.FileExists(t, filepath.Join(dir, "logs_5000.parquet")) +} + +func TestReopenLazyInitPreservesExistingAlignedFile(t *testing.T) { + dir := t.TempDir() + writeReceiptFile(t, dir, 10, []uint64{10}) + writeLogFile(t, dir, 10) + + alignedFile := filepath.Join(dir, "receipts_10.parquet") + infoBefore, err := os.Stat(alignedFile) + require.NoError(t, err) + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 10, + }) + require.NoError(t, err) + require.Equal(t, uint64(11), store.FileStartBlock()) + + require.NoError(t, store.WriteReceipts(11, []parquet.ReceiptInput{ + testReceiptInput(11, common.HexToHash("0x11")), + })) + require.NoError(t, store.Close()) + + infoAfter, err := os.Stat(alignedFile) + require.NoError(t, err) + require.Equal(t, infoBefore.Size(), infoAfter.Size(), "existing aligned file must not be truncated") + require.FileExists(t, filepath.Join(dir, "receipts_11.parquet")) +} + +func TestReopenLazyInitUsesAlignedStartOnGap(t *testing.T) { + dir := t.TempDir() + writeReceiptFile(t, dir, 10, []uint64{10}) + writeLogFile(t, dir, 10) + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 10, + }) + require.NoError(t, err) + + require.NoError(t, store.WriteReceipts(25, []parquet.ReceiptInput{ + testReceiptInput(25, common.HexToHash("0x25")), + })) + require.NoError(t, store.Close()) + + require.FileExists(t, filepath.Join(dir, "receipts_20.parquet")) + require.FileExists(t, filepath.Join(dir, "logs_20.parquet")) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/store_write_test.go b/sei-db/ledger_db/receipt/parquet_v2/store_write_test.go new file mode 100644 index 0000000000..4e4adfe224 --- /dev/null 
+++ b/sei-db/ledger_db/receipt/parquet_v2/store_write_test.go @@ -0,0 +1,40 @@ +package parquet_v2 + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +func TestWriteReceiptsUpdatesLatestAndReopens(t *testing.T) { + dir := t.TempDir() + + store, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 500, + BlockFlushInterval: 100, + PruneIntervalSeconds: 0, + }) + require.NoError(t, err) + + for block := uint64(1); block <= 3; block++ { + require.NoError(t, store.WriteReceipts(block, []parquet.ReceiptInput{ + testReceiptInput(block, common.BigToHash(new(big.Int).SetUint64(block))), + })) + } + require.Equal(t, int64(3), store.LatestVersion()) + require.NoError(t, store.Close()) + + reopened, err := NewStore(parquet.StoreConfig{ + DBDirectory: dir, + MaxBlocksPerFile: 500, + PruneIntervalSeconds: 0, + }) + require.NoError(t, err) + require.Equal(t, int64(3), reopened.LatestVersion()) + require.Equal(t, uint64(4), reopened.FileStartBlock()) + require.NoError(t, reopened.Close()) +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/types.go b/sei-db/ledger_db/receipt/parquet_v2/types.go new file mode 100644 index 0000000000..a7c5af5781 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/types.go @@ -0,0 +1,10 @@ +package parquet_v2 + +import "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt/parquet_v2/coordinator" + +type ( + ReplayResult = coordinator.ReplayResult + ReplayedBlock = coordinator.ReplayedBlock +) + +var ErrStoreClosed = coordinator.ErrStoreClosed diff --git a/sei-db/ledger_db/receipt/parquet_v2_crash_test.go b/sei-db/ledger_db/receipt/parquet_v2_crash_test.go new file mode 100644 index 0000000000..aa69fb751c --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2_crash_test.go @@ -0,0 +1,131 @@ +package receipt + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt/parquet_v2" + "github.com/stretchr/testify/require" +) + +func simulateCrashV2(store ReceiptStore, pqStore *parquet_v2.Store) { + CloseTxHashIndex(store) + pqStore.SimulateCrash() +} + +func TestParquetV2CrashRecoveryAtEachHookPoint(t *testing.T) { + type hookSetup struct { + name string + needsRotation bool + install func(h *parquet.FaultHooks, trigger func() error) + } + + hookPoints := []hookSetup{ + { + name: "AfterWALWrite", + install: func(h *parquet.FaultHooks, trigger func() error) { + h.AfterWALWrite = func(uint64) error { return trigger() } + }, + }, + { + name: "BeforeFlush", + install: func(h *parquet.FaultHooks, trigger func() error) { + h.BeforeFlush = func(uint64) error { return trigger() } + }, + }, + { + name: "AfterFlush", + install: func(h *parquet.FaultHooks, trigger func() error) { + h.AfterFlush = func(uint64) error { return trigger() } + }, + }, + { + name: "AfterCloseWriters", + needsRotation: true, + install: func(h *parquet.FaultHooks, trigger func() error) { + h.AfterCloseWriters = func(uint64) error { return trigger() } + }, + }, + { + name: "AfterWALClear", + needsRotation: true, + install: func(h *parquet.FaultHooks, trigger func() error) { + h.AfterWALClear = func(uint64) error { return trigger() } + }, + }, + } + + for _, hp := range hookPoints { + t.Run(hp.name, func(t *testing.T) { + ctx, storeKey := newTestContext() 
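+ // Each hook point injects a single simulated failure at a different
+ // stage of the write/flush/rotation pipeline; after the simulated
+ // crash and reopen, every pre-crash block must be recoverable from
+ // the WAL and fresh writes must succeed.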
+ cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "parquet_v2" + cfg.DBDirectory = t.TempDir() + + store, err := NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + + pqStore := extractParquetV2Store(t, store) + pqStore.SetMaxBlocksPerFile(4) + + preBlocks := uint64(2) + if hp.needsRotation { + preBlocks = 3 + } + + addr := common.HexToAddress("0x1") + for block := uint64(1); block <= preBlocks; block++ { + txHash := blockTxHash(block) + receipt := makeTestReceipt(txHash, block, 0, addr, nil) + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(int64(block)), []ReceiptRecord{ + {TxHash: txHash, Receipt: receipt}, + })) + } + + fired := false + hooks := &parquet.FaultHooks{} + hp.install(hooks, func() error { + if !fired { + fired = true + return errSimulatedCrash + } + return nil + }) + pqStore.SetFaultHooks(hooks) + + crashBlock := preBlocks + 1 + crashTxHash := blockTxHash(crashBlock) + crashReceipt := makeTestReceipt(crashTxHash, crashBlock, 0, addr, nil) + err = store.SetReceipts(ctx.WithBlockHeight(int64(crashBlock)), []ReceiptRecord{ + {TxHash: crashTxHash, Receipt: crashReceipt}, + }) + require.ErrorIs(t, err, errSimulatedCrash) + + simulateCrashV2(store, pqStore) + + store, err = NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + extractParquetV2Store(t, store).SetMaxBlocksPerFile(4) + + for block := uint64(1); block <= crashBlock; block++ { + txHash := blockTxHash(block) + got, err := store.GetReceiptFromStore(ctx, txHash) + require.NoError(t, err, "block %d not recovered", block) + require.Equal(t, txHash.Hex(), got.TxHashHex) + } + + postBlock := crashBlock + 1 + postTxHash := blockTxHash(postBlock) + postReceipt := makeTestReceipt(postTxHash, postBlock, 0, addr, nil) + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(int64(postBlock)), []ReceiptRecord{ + {TxHash: postTxHash, Receipt: postReceipt}, + })) + got, err := store.GetReceiptFromStore(ctx, postTxHash) + require.NoError(t, err) + require.Equal(t, postTxHash.Hex(), got.TxHashHex) + }) + } +} diff --git a/sei-db/ledger_db/receipt/parquet_v2_receipt_store.go b/sei-db/ledger_db/receipt/parquet_v2_receipt_store.go new file mode 100644 index 0000000000..b2a23a4e13 --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2_receipt_store.go @@ -0,0 +1,354 @@ +package receipt + +import ( + "context" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt/parquet_v2" + "github.com/sei-protocol/sei-chain/x/evm/types" +) + +type parquetReceiptStoreV2 struct { + store *parquet_v2.Store + storeKey sdk.StoreKey + txHashIndex TxHashIndex + indexPruner *txHashIndexPruner + + closeOnce sync.Once + closeErr error +} + +func newParquetReceiptStoreV2(cfg dbconfig.ReceiptStoreConfig, storeKey sdk.StoreKey) (ReceiptStore, error) { + txIndexBackend := dbconfig.NormalizeReceiptTxIndexBackend(cfg.TxIndexBackend) + parquetTxIndexBackend := txIndexBackend + if parquetTxIndexBackend == dbconfig.ReceiptTxIndexBackendNone { + parquetTxIndexBackend = "none" + } + + store, err := parquet_v2.NewStore(parquet.StoreConfig{ + DBDirectory: cfg.DBDirectory, + KeepRecent: int64(cfg.KeepRecent), + PruneIntervalSeconds: 
int64(cfg.PruneIntervalSeconds), + TxIndexBackend: parquetTxIndexBackend, + WALConverter: replayConverterV2, + }) + if err != nil { + return nil, err + } + + var txHashIndex TxHashIndex + switch txIndexBackend { + case dbconfig.ReceiptTxIndexBackendNone: + case dbconfig.ReceiptTxIndexBackendPebble: + idx, err := NewPebbleTxHashIndex(TxHashIndexDir(cfg.DBDirectory)) + if err != nil { + _ = store.Close() + return nil, fmt.Errorf("failed to open tx hash index: %w", err) + } + txHashIndex = idx + default: + _ = store.Close() + return nil, fmt.Errorf("unsupported receipt tx index backend: %s", txIndexBackend) + } + + if txHashIndex != nil { + ctx := context.Background() + for _, rb := range store.ReplayedBlocks() { + if err := txHashIndex.IndexBlock(ctx, rb.BlockNumber, rb.TxHashes); err != nil { + _ = txHashIndex.Close() + _ = store.Close() + return nil, fmt.Errorf("failed to re-index replayed block %d: %w", rb.BlockNumber, err) + } + } + } + + wrapper := &parquetReceiptStoreV2{ + store: store, + storeKey: storeKey, + txHashIndex: txHashIndex, + } + if txHashIndex != nil { + wrapper.indexPruner = newTxHashIndexPruner( + txHashIndex, + int64(cfg.KeepRecent), + int64(cfg.PruneIntervalSeconds), + func() int64 { return store.LatestVersion() }, + ) + wrapper.indexPruner.Start() + } + + return wrapper, nil +} + +func replayConverterV2(blockNumber uint64, receiptBytes []byte, logStartIndex uint) (parquet.ReplayReceipt, error) { + receipt := &types.Receipt{} + if err := receipt.Unmarshal(receiptBytes); err != nil { + return parquet.ReplayReceipt{}, err + } + + txHash := common.HexToHash(receipt.TxHashHex) + blockHash := common.Hash{} + txLogs := getLogsForTx(receipt, logStartIndex) + for _, lg := range txLogs { + lg.BlockHash = blockHash + } + + record := parquet.ReceiptRecord{ + TxHash: parquet.CopyBytes(txHash[:]), + BlockNumber: blockNumber, + ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes), + } + return parquet.ReplayReceipt{ + Input: parquet.ReceiptInput{ + Receipt: record, + Logs: BuildParquetLogRecords(txLogs, blockHash), + ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes), + }, + TxHash: txHash, + Warmup: record, + LogCount: uint(len(txLogs)), + }, nil +} + +func (s *parquetReceiptStoreV2) LatestVersion() int64 { + return s.store.LatestVersion() +} + +func (s *parquetReceiptStoreV2) SetLatestVersion(version int64) error { + s.store.SetLatestVersion(version) + return nil +} + +func (s *parquetReceiptStoreV2) SetEarliestVersion(version int64) error { + s.store.SetEarliestVersion(version) + return nil +} + +func (s *parquetReceiptStoreV2) cacheRotateInterval() uint64 { + return s.store.CacheRotateInterval() +} + +func (s *parquetReceiptStoreV2) warmupReceipts() []ReceiptRecord { + raw := s.store.WarmupRecords() + records := make([]ReceiptRecord, 0, len(raw)) + for _, rec := range raw { + receipt := &types.Receipt{} + if err := receipt.Unmarshal(rec.ReceiptBytes); err != nil { + continue + } + records = append(records, ReceiptRecord{ + TxHash: common.BytesToHash(rec.TxHash), + Receipt: receipt, + }) + } + return records +} + +func (s *parquetReceiptStoreV2) GetReceipt(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) { + result, err := s.indexedReceiptLookup(ctx.Context(), txHash) + if err != nil { + return nil, err + } + if result != nil { + receipt := &types.Receipt{} + if err := receipt.Unmarshal(result.ReceiptBytes); err != nil { + return nil, err + } + return receipt, nil + } + + if s.storeKey == nil { + return nil, ErrNotFound + } + store := ctx.KVStore(s.storeKey) + bz := 
store.Get(types.ReceiptKey(txHash)) + if bz == nil { + return nil, ErrNotFound + } + var r types.Receipt + if err := r.Unmarshal(bz); err != nil { + return nil, err + } + return &r, nil +} + +func (s *parquetReceiptStoreV2) GetReceiptFromStore(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) { + result, err := s.indexedReceiptLookup(ctx.Context(), txHash) + if err != nil { + return nil, err + } + if result == nil { + return nil, ErrNotFound + } + + receipt := &types.Receipt{} + if err := receipt.Unmarshal(result.ReceiptBytes); err != nil { + return nil, err + } + return receipt, nil +} + +func (s *parquetReceiptStoreV2) indexedReceiptLookup(ctx context.Context, txHash common.Hash) (*parquet.ReceiptResult, error) { + if s.txHashIndex == nil { + return nil, ErrTxIndexDisabled + } + blockNum, ok, err := s.txHashIndex.GetBlockNumber(ctx, txHash) + if err != nil { + logger.Error("tx hash index lookup failed, falling back to full scan", "err", err) + return s.store.GetReceiptByTxHash(ctx, txHash) + } + if !ok { + return s.store.GetReceiptByTxHash(ctx, txHash) + } + return s.store.GetReceiptByTxHashInBlock(ctx, txHash, blockNum) +} + +func (s *parquetReceiptStoreV2) SetReceipts(ctx sdk.Context, receipts []ReceiptRecord) error { + height := uint64(ctx.BlockHeight()) //nolint:gosec // block heights fit within uint64 + + inputs, err := buildParquetReceiptInputs(receipts) + if err != nil { + return err + } + + if err := s.store.WriteReceipts(height, inputs); err != nil { + return err + } + + if s.txHashIndex != nil && len(inputs) > 0 { + if err := s.indexReceiptInputs(height, inputs); err != nil { + return fmt.Errorf("tx hash index write failed: %w", err) + } + } + + s.store.UpdateLatestVersion(ctx.BlockHeight()) + return nil +} + +// buildParquetReceiptInputs constructs ReceiptInputs for the v2 store. The +// wrapper-level BlockNumber field is intentionally left zero — v2 carries the +// committed height as an explicit parameter to WriteReceipts. The +// Receipt.BlockNumber column is still populated since it is what gets written +// to the parquet file. 
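+// Worked example (hypothetical receipts): two receipts in block N carrying
+// 2 and 3 logs receive log indexes 0-1 and 2-4 via logStartIndex; a receipt
+// in block N+1 restarts at index 0 because the counter resets whenever the
+// block number changes.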
+func buildParquetReceiptInputs(receipts []ReceiptRecord) ([]parquet.ReceiptInput, error) { + blockHash := common.Hash{} + inputs := make([]parquet.ReceiptInput, 0, len(receipts)) + + var ( + currentBlock uint64 + logStartIndex uint + ) + + for _, record := range receipts { + if record.Receipt == nil { + continue + } + + receipt := record.Receipt + blockNumber := receipt.BlockNumber + + if currentBlock == 0 { + currentBlock = blockNumber + } + if blockNumber != currentBlock { + currentBlock = blockNumber + logStartIndex = 0 + } + + receiptBytes := record.ReceiptBytes + if len(receiptBytes) == 0 { + var err error + receiptBytes, err = receipt.Marshal() + if err != nil { + return nil, err + } + } + + txLogs := getLogsForTx(receipt, logStartIndex) + logStartIndex += uint(len(txLogs)) + for _, lg := range txLogs { + lg.BlockHash = blockHash + } + + inputs = append(inputs, parquet.ReceiptInput{ + Receipt: parquet.ReceiptRecord{ + TxHash: parquet.CopyBytes(record.TxHash[:]), + BlockNumber: blockNumber, + ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes), + }, + Logs: BuildParquetLogRecords(txLogs, blockHash), + ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes), + }) + } + + return inputs, nil +} + +func (s *parquetReceiptStoreV2) indexReceiptInputs(height uint64, inputs []parquet.ReceiptInput) error { + hashes := make([]common.Hash, len(inputs)) + for i := range inputs { + hashes[i] = common.BytesToHash(inputs[i].Receipt.TxHash) + } + return s.txHashIndex.IndexBlock(context.Background(), height, hashes) +} + +func (s *parquetReceiptStoreV2) FilterLogs(ctx sdk.Context, fromBlock, toBlock uint64, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + if fromBlock > toBlock { + return nil, fmt.Errorf("fromBlock (%d) > toBlock (%d)", fromBlock, toBlock) + } + + results, err := s.store.GetLogs(ctx.Context(), parquet.LogFilter{ + FromBlock: &fromBlock, + ToBlock: &toBlock, + Addresses: crit.Addresses, + Topics: crit.Topics, + }) + if err != nil { + return nil, err + } + + logs := make([]*ethtypes.Log, 0, len(results)) + for i := range results { + lr := results[i] + logEntry := &ethtypes.Log{ + BlockNumber: lr.BlockNumber, + TxHash: common.BytesToHash(lr.TxHash), + TxIndex: uint(lr.TxIndex), + Index: uint(lr.LogIndex), + Data: lr.Data, + Removed: lr.Removed, + BlockHash: common.BytesToHash(lr.BlockHash), + } + copy(logEntry.Address[:], lr.Address) + logEntry.Topics = buildTopicsFromParquetLogResult(lr) + logs = append(logs, logEntry) + } + + return logs, nil +} + +// Close releases the parquet store, the tx-hash index, and the index pruner. +// Idempotent: indexPruner.Stop closes a channel that would panic on a second +// call, so the entire teardown is gated on closeOnce. Repeat callers receive +// the same error as the first.
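+// Teardown order: the pruner stops before the index closes so a late prune
+// tick cannot touch a closed handle, and the store's close error takes
+// precedence over any index close error.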
+func (s *parquetReceiptStoreV2) Close() error { + s.closeOnce.Do(func() { + if s.indexPruner != nil { + s.indexPruner.Stop() + } + s.closeErr = s.store.Close() + if s.txHashIndex != nil { + if err := s.txHashIndex.Close(); err != nil && s.closeErr == nil { + s.closeErr = err + } + } + }) + return s.closeErr +} diff --git a/sei-db/ledger_db/receipt/parquet_v2_store_test.go b/sei-db/ledger_db/receipt/parquet_v2_store_test.go new file mode 100644 index 0000000000..fcf8a6921d --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2_store_test.go @@ -0,0 +1,133 @@ +package receipt + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/filters" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt/parquet_v2" + "github.com/stretchr/testify/require" +) + +func extractParquetV2Store(t *testing.T, store ReceiptStore) *parquet_v2.Store { + t.Helper() + cached, ok := store.(*cachedReceiptStore) + require.True(t, ok, "expected *cachedReceiptStore") + pq, ok := cached.backend.(*parquetReceiptStoreV2) + require.True(t, ok, "expected *parquetReceiptStoreV2 backend") + return pq.store +} + +func TestParquetV2ReceiptStoreReopenQueries(t *testing.T) { + ctx, storeKey := newTestContext() + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "parquet_v2" + cfg.DBDirectory = t.TempDir() + + store, err := NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + + txHash := common.HexToHash("0x220") + addr := common.HexToAddress("0x300") + topic := common.HexToHash("0x5678") + receipt := makeTestReceipt(txHash, 5, 1, addr, []common.Hash{topic}) + + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(5), []ReceiptRecord{ + {TxHash: txHash, Receipt: receipt}, + })) + require.NoError(t, store.Close()) + + store, err = NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + got, err := store.GetReceiptFromStore(ctx, txHash) + require.NoError(t, err) + require.Equal(t, receipt.TxHashHex, got.TxHashHex) + + logs, err := store.FilterLogs(ctx, 3, 5, filters.FilterCriteria{ + Addresses: []common.Address{addr}, + Topics: [][]common.Hash{{topic}}, + }) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, uint64(5), logs[0].BlockNumber) +} + +func TestParquetV2DuplicateHashLogsSurviveReopen(t *testing.T) { + ctx, storeKey := newTestContext() + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "parquet_v2" + cfg.DBDirectory = t.TempDir() + + store, err := NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + + txHash := common.HexToHash("0x333") + addr := common.HexToAddress("0x3330") + topic := common.HexToHash("0x3331") + for _, block := range []uint64{1, 2} { + receipt := makeTestReceipt(txHash, block, 0, addr, []common.Hash{topic}) + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(int64(block)), []ReceiptRecord{ + {TxHash: txHash, Receipt: receipt}, + })) + } + require.NoError(t, store.Close()) + + store, err = NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + logs, err := store.FilterLogs(ctx, 1, 2, filters.FilterCriteria{ + Addresses: []common.Address{addr}, + Topics: [][]common.Hash{{topic}}, + }) + require.NoError(t, err) + require.Len(t, logs, 2) + require.Equal(t, []uint64{1, 2}, []uint64{logs[0].BlockNumber, logs[1].BlockNumber}) + require.Equal(t, txHash, logs[0].TxHash) + require.Equal(t, txHash, logs[1].TxHash) +} + +func 
TestParquetV2EmptyBoundaryRotationFeedsClosedFileReads(t *testing.T) { + ctx, storeKey := newTestContext() + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "parquet_v2" + cfg.DBDirectory = t.TempDir() + cfg.TxIndexBackend = "" + + store, err := NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + + pqStore := extractParquetV2Store(t, store) + pqStore.SetMaxBlocksPerFile(4) + + addr := common.HexToAddress("0x440") + topic := common.HexToHash("0x441") + for _, block := range []uint64{2, 5} { + txHash := common.BigToHash(new(big.Int).SetUint64(block)) + receipt := makeTestReceipt(txHash, block, 0, addr, []common.Hash{topic}) + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(int64(block)), []ReceiptRecord{ + {TxHash: txHash, Receipt: receipt}, + })) + if block == 2 { + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(4), nil)) + } + } + require.NoError(t, store.Close()) + + store, err = NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + extractParquetV2Store(t, store).SetMaxBlocksPerFile(4) + + logs, err := store.FilterLogs(ctx, 5, 5, filters.FilterCriteria{ + Addresses: []common.Address{addr}, + Topics: [][]common.Hash{{topic}}, + }) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, uint64(5), logs[0].BlockNumber) +} diff --git a/sei-db/ledger_db/receipt/receipt_store.go b/sei-db/ledger_db/receipt/receipt_store.go index c2ee7f2742..b5e4fdd986 100644 --- a/sei-db/ledger_db/receipt/receipt_store.go +++ b/sei-db/ledger_db/receipt/receipt_store.go @@ -80,8 +80,9 @@ type receiptStore struct { } const ( - receiptBackendPebble = "pebble" - receiptBackendParquet = "parquet" + receiptBackendPebble = "pebble" + receiptBackendParquet = "parquet" + receiptBackendParquetV2 = "parquet_v2" ) func normalizeReceiptBackend(backend string) string { @@ -90,6 +91,8 @@ func normalizeReceiptBackend(backend string) string { return receiptBackendPebble case receiptBackendParquet: return receiptBackendParquet + case receiptBackendParquetV2: + return receiptBackendParquetV2 default: return strings.ToLower(strings.TrimSpace(backend)) } @@ -113,7 +116,7 @@ func NewReceiptStoreWithReadMetrics( return newCachedReceiptStore(backend, metrics), nil } -// BackendTypeName returns the backend implementation name ("parquet" or "pebble") for testing. +// BackendTypeName returns the backend implementation name for testing. // Returns "" if store is nil or the backend type is unknown. 
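// The cachedReceiptStore wrapper is unwrapped first so the switch sees the concrete backend type.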
func BackendTypeName(store ReceiptStore) string { if store == nil { @@ -123,6 +126,8 @@ func BackendTypeName(store ReceiptStore) string { store = c.backend } switch store.(type) { + case *parquetReceiptStoreV2: + return receiptBackendParquetV2 case *parquetReceiptStore: return receiptBackendParquet case *receiptStore: @@ -139,6 +144,8 @@ func newReceiptBackend(config dbconfig.ReceiptStoreConfig, storeKey sdk.StoreKey backend := normalizeReceiptBackend(config.Backend) switch backend { + case receiptBackendParquetV2: + return newParquetReceiptStoreV2(config, storeKey) case receiptBackendParquet: return newParquetReceiptStore(config, storeKey) case receiptBackendPebble: diff --git a/sei-db/ledger_db/receipt/receipt_store_test.go b/sei-db/ledger_db/receipt/receipt_store_test.go index e7aa1faf41..94a2f5c6a2 100644 --- a/sei-db/ledger_db/receipt/receipt_store_test.go +++ b/sei-db/ledger_db/receipt/receipt_store_test.go @@ -80,6 +80,13 @@ func TestNewReceiptStoreConfigErrors(t *testing.T) { require.NotNil(t, store) require.NoError(t, store.Close()) + cfg.Backend = "parquet_v2" + store, err = receipt.NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + require.NotNil(t, store) + require.Equal(t, "parquet_v2", receipt.BackendTypeName(store)) + require.NoError(t, store.Close()) + cfg.TxIndexBackend = "rocksdb" cfg.Backend = "pebble" store, err = receipt.NewReceiptStore(cfg, storeKey) @@ -92,6 +99,44 @@ func TestNewReceiptStoreConfigErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, store) require.NoError(t, store.Close()) + + cfg.Backend = "parquet_v2" + store, err = receipt.NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + require.NotNil(t, store) + require.NoError(t, store.Close()) +} + +func TestParquetV2ReceiptStoreRoundTripAfterReopen(t *testing.T) { + storeKey := storetypes.NewKVStoreKey("evm") + tkey := storetypes.NewTransientStoreKey("evm_transient") + ctx := testutil.DefaultContext(storeKey, tkey) + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.Backend = "parquet_v2" + cfg.DBDirectory = t.TempDir() + cfg.KeepRecent = 0 + + store, err := receipt.NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + require.Equal(t, "parquet_v2", receipt.BackendTypeName(store)) + + txHash := common.HexToHash("0x99") + addr := common.HexToAddress("0x1") + topic := common.HexToHash("0x2") + want := makeReceipt(txHash, addr, []common.Hash{topic}, 0) + require.NoError(t, store.SetReceipts(ctx.WithBlockHeight(1), []receipt.ReceiptRecord{ + {TxHash: txHash, Receipt: want}, + })) + require.NoError(t, store.Close()) + + reopened, err := receipt.NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + t.Cleanup(func() { _ = reopened.Close() }) + + got, err := reopened.GetReceiptFromStore(ctx, txHash) + require.NoError(t, err) + require.Equal(t, want.TxHashHex, got.TxHashHex) + require.Equal(t, uint64(1), got.BlockNumber) } func TestSetReceiptsAndGet(t *testing.T) {