
Parquet receiptdb refactor coordinator thread #3345

Open

jewei1997 wants to merge 29 commits into main from parquet-refactor-v2-coordinator-thread2

Conversation

jewei1997 (Contributor) commented Apr 30, 2026

Describe your changes and provide context

  • Refactors the v2 parquet receipt store onto a single coordinator goroutine that owns all
    mutable state (closed files, writers, WAL, version metadata) and serializes requests over a
    channel; a sketch of the pattern follows this list.
  • New coordinator package handles event loop, WAL append/replay, rotation, pruning, and
    DuckDB-backed reads; parquet_v2.Store is a thin facade wrapping it.
  • Wires the new backend into the existing dispatcher as the opt-in rs-backend = "parquet_v2";
    the default remains pebbledb, the public ReceiptStore interface is unchanged, and v2 writes
    to its own data subdirectory, so upgrading nodes are unaffected.
  • Known follow-up: reads currently dispatch through the coordinator goroutine and
    serialize with writes. This is intentional for this PR to keep the correctness story simple;
    a follow-up will move reads off the loop to restore the unbounded read concurrency v1 had
    via RWMutex.
  • Known follow-up: a dedicated write worker will be added to operate on the latest open
    parquet file, offloading the actual parquet append from the coordinator loop while the
    coordinator continues to own rotation and WAL ordering.
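
A minimal sketch of the ownership pattern described above, with illustrative names rather than this PR's actual types: one goroutine owns all mutable state, callers enqueue requests on a channel, and a request whose dispatch returns true stops the loop.

// request is handled on the run goroutine; dispatch returns true to
// stop the loop (used for close and simulated crash).
type request interface {
	dispatch(c *coordinator) bool
}

type coordinator struct {
	requests chan request
	done     chan struct{}

	// Mutable state below is touched only by the run goroutine.
	lastSeenBlock uint64
}

// run is the event loop: it is the sole owner of the mutable state,
// so no locks are needed.
func (c *coordinator) run() {
	for {
		select {
		case req := <-c.requests:
			if req.dispatch(c) {
				return
			}
		case <-c.done:
			return
		}
	}
}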

Testing performed to validate your change

  • New tests cover dispatch, write path, block-boundary and empty-block rotation, WAL
    append/replay, and crash recovery via SimulateCrash.
  • Pruning, read-after-reopen, and corrupt-tail file cleanup are exercised end-to-end.
  • Round-trip tests through the ReceiptStore interface confirm parity with existing
    backends on the public API.

github-actions bot commented Apr 30, 2026

The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).

Build: ✅ passed · Format: ✅ passed · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): May 6, 2026, 4:00 AM

codecov bot commented Apr 30, 2026

Codecov Report

❌ Patch coverage is 75.99668% with 289 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.30%. Comparing base (1b32293) to head (0e63a08).
⚠️ Report is 30 commits behind head on main.

Files with missing lines | Patch % | Lines
...dger_db/receipt/parquet_v2/coordinator/handlers.go | 77.52% | 37 Missing and 32 partials ⚠️
...i-db/ledger_db/receipt/parquet_v2_receipt_store.go | 68.34% | 47 Missing and 22 partials ⚠️
...ledger_db/receipt/parquet_v2/coordinator/reader.go | 74.21% | 30 Missing and 19 partials ⚠️
...db/ledger_db/receipt/parquet_v2/coordinator/wal.go | 61.34% | 25 Missing and 21 partials ⚠️
...r_db/receipt/parquet_v2/coordinator/coordinator.go | 80.20% | 25 Missing and 14 partials ⚠️
.../ledger_db/receipt/parquet_v2/coordinator/prune.go | 78.12% | 4 Missing and 3 partials ⚠️
.../ledger_db/receipt/parquet_v2/coordinator/files.go | 88.67% | 3 Missing and 3 partials ⚠️
.../ledger_db/receipt/parquet_v2/coordinator/types.go | 50.00% | 1 Missing and 1 partial ⚠️
sei-db/ledger_db/receipt/parquet_v2/store.go | 95.12% | 1 Missing and 1 partial ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #3345      +/-   ##
==========================================
+ Coverage   59.18%   59.30%   +0.11%     
==========================================
  Files        2097     2107      +10     
  Lines      172517   173719    +1202     
==========================================
+ Hits       102107   103020     +913     
- Misses      61558    61731     +173     
- Partials     8852     8968     +116     
Flag | Coverage Δ
sei-chain-pr | 75.83% <75.99%> (?)
sei-db | 70.41% <ø> (ø)

Flags with carried forward coverage won't be shown.

Files with missing lines | Coverage Δ
sei-db/config/receipt_config.go | 79.16% <100.00%> (ø)
sei-db/ledger_db/parquet/store.go | 69.68% <ø> (ø)
...dger_db/receipt/parquet_v2/coordinator/requests.go | 100.00% <100.00%> (ø)
sei-db/ledger_db/receipt/receipt_store.go | 67.64% <100.00%> (+0.98%) ⬆️
.../ledger_db/receipt/parquet_v2/coordinator/types.go | 50.00% <50.00%> (ø)
sei-db/ledger_db/receipt/parquet_v2/store.go | 95.12% <95.12%> (ø)
.../ledger_db/receipt/parquet_v2/coordinator/files.go | 88.67% <88.67%> (ø)
.../ledger_db/receipt/parquet_v2/coordinator/prune.go | 78.12% <78.12%> (ø)
...r_db/receipt/parquet_v2/coordinator/coordinator.go | 80.20% <80.20%> (ø)
...db/ledger_db/receipt/parquet_v2/coordinator/wal.go | 61.34% <61.34%> (ø)
... and 3 more

cody-littley (Contributor) left a comment

Attached is an LLM audit. I have not verified whether these bugs are real, so some may be false positives; when I generate these reports on my own code, at least two-thirds of the issues tend to be real.

parquet-v2-coordinator-audit.md

Comment threads:
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/api.go (outdated)
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go (outdated)
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/api.go (outdated)
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/api.go (outdated)
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/api.go (outdated)
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/reader.go
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/requests.go
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/types.go
  • sei-db/ledger_db/receipt/parquet_v2/coordinator/wal.go (outdated)
  • sei-db/ledger_db/receipt/parquet_v2/store.go (outdated)
jewei1997 and others added 7 commits May 4, 2026 09:16
The rotation interval is set at construction and only mutated by the
test-only SetMaxBlocksPerFile, so the request-channel round-trip on
every read isn't needed. Drop the request type/handler/dispatch case
and read c.config.MaxBlocksPerFile directly with a doc comment about
the no-race-with-mutation contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…coord request

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the pruneTicker/pruneTick struct fields and stopPruneTicker()
helper with a local ticker in run() guarded by defer ticker.Stop().
The defer covers all three exit paths (done, simulateCrashReq,
closeReq), so the explicit stopPruneTicker() calls in handleClose and
handleSimulateCrash are no longer needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the public ReplayWAL methods with construction-time replay driven
by a ReplayHooks struct (converter + per-block tx-index callback). Wrappers
now open the tx-hash index first, build the hooks closure, and pass it into
NewStore — by the time NewStore returns, parquet state, tx index, and
warmup records are all hot.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jewei1997 and others added 4 commits May 4, 2026 11:50
NewStore is back to a single argument. The converter is a config field
("what kind of receipts does this store hold"), not a wiring callback.
The wrapper drains store.ReplayedBlocks() after construction to
re-populate its tx-hash index — wrapper-specific business stays in the
wrapper.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously, an error in flushOpenFile, closeWriters, or wal.Close would
return early from handleClose, leaving the remaining resources (WAL
background goroutines, DuckDB reader connection, open file descriptors)
attached to the coordinator after run() exited. Now every step runs
unconditionally and errors are aggregated via errors.Join.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The captured err was a local in Close(), so a second invocation got a
fresh nil — closeOnce.Do skips the closure body but the second caller's
local err was never written. Move err onto the struct as closeErr so
all callers observe the result of the single close attempt. The
happens-before semantics of sync.Once guarantee the read is safe.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CacheRotateInterval was reading c.config.MaxBlocksPerFile directly from
caller goroutines while handleSetMaxBlocksPerFile wrote it from the run
goroutine — a data race the prior comment acknowledged but didn't fix.
Mirror the value in an atomic.Uint64 that handleSetMaxBlocksPerFile
keeps in sync, and read from the mirror.

Also thread context through sendAndAwaitResponse/awaitError so reads
respect caller cancellation, and gate parquetReceiptStoreV2.Close on a
sync.Once (indexPruner.Stop closes a channel and panics on a second
call).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cody-littley (Contributor) commented

Overall, this is moving in a good direction. An LLM produced the audit report below. Can you please work through these items? If you think the AI is wrong about one, feel free to explain why and skip that fix.

Everything below this header is LLM generated:

Deep Audit: sei-db/ledger_db/receipt/parquet_v2


Suggested priority

Must fix before merge (real bugs):

  1. (#1) New writer/file-handle leak when replayWAL errors. Trivial: a cleanupWriters defer.
  2. (#5) SetMaxBlocksPerFile(0) corrupts Reader.filterLogFiles. Either reject 0 in the handler or guard inside filterLogFiles.
  3. (#19) Handlers send to req.resp unconditionally. Add select { case req.resp <- ...: case <-c.done: } so a missing/buffer-zero caller can't wedge the run goroutine.

Worth tightening as a follow-up (latent / fragile):

  1. (#2) Either route WarmupRecords/ReplayedBlocks through the request channel, or add a strong "single-shot, on the constructing goroutine, before any other call" doc/assertion.
  2. (#4 + #6) Replace the lastSeenBlock != 0 sentinel with an explicit haveLastSeen bool, and overwrite input.Receipt.BlockNumber in applyReceipt so height is genuinely authoritative.
  3. (#11 + #16 + #24) Document that Close after SimulateCrash returns nil and explain the closeOnce window for c.done.
  4. (#13) handleSimulateCrash should nil c.wal/c.reader for symmetry with handleClose.
  5. (#21) Stop swallowing os.Remove errors in validateAndCleanFiles.
  6. (#9) Add sort.IsSorted on c.closedFiles (or sort defensively) in receiptFileSnapshotForBlock.

Everything else is observation/comment-worthy but doesn't need code changes.

Let me know if you want me to draft fixes for any of (1), (5), or (19) — those three are the ones I'd block merge on.

Real bugs (highest confidence first)

1. New leaks parquet writers/files when replayWAL returns mid-flight

coordinator.go:144-151 — if replayWAL initialized writers (via the applyReceiptFromReplay → applyReceipt → initWriters chain) and then errored, the deferred cleanup only handles the reader and the WAL:

	cleanupReader := true
	defer func() {
		if cleanupReader {
			_ = reader.Close()
		}
	}()

	walDir := filepath.Join(storeCfg.DBDirectory, "parquet-wal")
	receiptWAL, err := parquet.NewWAL(walDir)
	if err != nil {
		return nil, err
	}
	cleanupWAL := true
	defer func() {
		if cleanupWAL {
			_ = receiptWAL.Close()
		}
	}()
	if cfg.WALConverter != nil {
		result, err := c.replayWAL(cfg.WALConverter)
		if err != nil {
			return nil, err
		}
		c.warmupRecords = result.WarmupRecords
		c.replayedBlocks = result.Blocks
	}

Concretely: replayWAL calls applyReceiptFromReplay → applyReceipt → initWriters, which sets c.receiptWriter/c.logWriter/c.receiptFile/c.logFile. If a later iteration of the replay loop fails (a bad WAL entry, converter error, rotation error, etc.) the function returns and we skip go c.run(). Defers only close the reader and WAL — the parquet writers and *os.File handles stay open until process exit. In tests that re-New the same directory, the leaked fd is a real footgun (and on Windows it would block the next os.Create).

Fix: add a cleanupWriters deferred guard, or factor the writer cleanup out of closeWriters and call it on any error path before go c.run().
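
A sketch of that guard, mirroring the cleanupReader/cleanupWAL pattern in the excerpt above (this assumes closeWriters tolerates partially-initialized writers; if it doesn't, factor out a variant that does):

	cleanupWriters := true
	defer func() {
		if cleanupWriters {
			_ = c.closeWriters() // releases the parquet writers and their *os.File handles
		}
	}()

	// ... WAL replay and the rest of construction ...

	// Construction succeeded: ownership passes to the run goroutine.
	cleanupReader = false
	cleanupWAL = false
	cleanupWriters = false
	go c.run()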

2. WarmupRecords / ReplayedBlocks accessors are not goroutine-safe by construction — only by an undocumented invariant

func (c *Coordinator) WarmupRecords() []parquet.ReceiptRecord {
	records := c.warmupRecords
	c.warmupRecords = nil
	return records
}

// ReplayedBlocks returns and clears the per-block tx-hash listing
// recovered during construction-time WAL replay. Wrappers drain this
// once after construction to repopulate an external tx-hash index.
func (c *Coordinator) ReplayedBlocks() []ReplayedBlock {
	blocks := c.replayedBlocks
	c.replayedBlocks = nil
	return blocks
}

Every other accessor goes through the requests channel, so all coordinator state is owned by the run goroutine. These two cheat: they read+clear from the caller's goroutine. The "doc says it's only called once after New" works today because the run goroutine never touches warmupRecords/replayedBlocks (no handler reads them). But:

  • The first call goes through right after go c.run(). The go statement gives a happens-before edge, so the initial read is fine.
  • A second call (or a call after the run goroutine is doing other work) is racy in principle — the race detector won't fire because there's still no concurrent writer, but adding any handler that touches these fields silently introduces a race with no warning.

Either route them through coordRequest (cheap; runs in the run goroutine like everything else) or add a comment + assertion that they must only be drained once on the constructing goroutine before any other call. The current state is "works, but only by happenstance."
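
If routing through the channel is the preferred option, a sketch (warmupRecordsReq and handleWarmupRecords are hypothetical names; this assumes sendAndAwaitResponse is the generic helper used elsewhere in this audit):

// Caller side: the drain becomes an ordinary request.
func (c *Coordinator) WarmupRecords() []parquet.ReceiptRecord {
	resp := make(chan []parquet.ReceiptRecord, 1)
	records, err := sendAndAwaitResponse(context.Background(), c, warmupRecordsReq{resp: resp}, resp)
	if err != nil {
		return nil // store already closed; nothing to warm up
	}
	return records
}

// Run-goroutine side: the read-and-clear now happens under single ownership.
func (c *Coordinator) handleWarmupRecords(req warmupRecordsReq) {
	records := c.warmupRecords
	c.warmupRecords = nil
	req.resp <- records
}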

3. writeReceipts rotates before the new height's receipts are applied — but the newly-written WAL entry can be replayed twice if lastSeenBlock == 0

func (c *Coordinator) writeReceipts(height uint64, inputs []parquet.ReceiptInput) error {
	if len(inputs) == 0 {
		return c.observeBlock(height)
	}
	if c.wal == nil {
		return fmt.Errorf("parquet WAL is not initialized")
	}

	receiptBytes := make([][]byte, len(inputs))
	for i := range inputs {
		receiptBytes[i] = inputs[i].ReceiptBytes
	}
	if err := c.wal.Write(parquet.WALEntry{BlockNumber: height, Receipts: receiptBytes}); err != nil {
		return err
	}
	...
	if c.receiptWriter != nil && height != c.lastSeenBlock && c.isRotationBoundary(height) {
		if err := c.rotateOpenFile(height); err != nil {
			return err
		}
	}

Consider a totally fresh store, MaxBlocksPerFile=4, first call is WriteReceipts(0, [r]):

  • WAL write succeeds.
  • c.receiptWriter == nil → we skip rotation.
  • applyReceipt(0, r): lazy-init at fileStartBlock=0, opens receipts_0.parquet. blockNumber == c.lastSeenBlock == 0 → the body of the if never runs, so c.lastSeenBlock stays 0.

Now WriteReceipts(4, [r2]):

  • WAL write succeeds.
  • c.receiptWriter != nil, height(4) != c.lastSeenBlock(0), isRotationBoundary(4) == true → rotation. Block 0's data is flushed to receipts_0.parquet (good).

That part is fine, but it leans on an extremely subtle interaction with lastSeenBlock's zero sentinel that only works because block 0 happens to be its own boundary. The real bug shows up in the symmetric case below (#4).

4. lastSeenBlock = 0 sentinel breaks rotation/flush bookkeeping if the first observed block is exactly 0

	if blockNumber != c.lastSeenBlock {
		if c.lastSeenBlock != 0 {
			c.blocksSinceFlush++
		}
		c.lastSeenBlock = blockNumber
	}

lastSeenBlock is overloaded as both "previously seen height" and "no height seen yet" (zero value). Two consequences:

a) writeReceipts (non-empty) at boundary 0 with empty cache — the rotation guard height != c.lastSeenBlock is false on the very first non-empty write at height 0 (both are zero), so we don't rotate. That's actually what we want here, but only because receiptWriter == nil is guarding it. If a later refactor flips the order or removes that early-out, this becomes a real bug.

b) blocksSinceFlush is undercounted by one when the chain genuinely starts from block 0. With BlockFlushInterval=2 and blocks 0,1,2,3,…, the first flush fires at block 3 instead of block 2 — because the 0→1 transition is silently skipped by if c.lastSeenBlock != 0.

This is inherited from v1, but v2 carries it forward verbatim. Fix is to use a separate bool haveLastSeen (or initialize lastSeenBlock = ^uint64(0)).
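
A sketch of the bool variant (haveLastSeen would be a new coordinator field):

	if !c.haveLastSeen || blockNumber != c.lastSeenBlock {
		if c.haveLastSeen {
			c.blocksSinceFlush++ // now also counts the 0→1 transition
		}
		c.lastSeenBlock = blockNumber
		c.haveLastSeen = true
	}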

5. Reader.filterLogFiles is unsafe when MaxBlocksPerFile == 0

func (r *Reader) filterLogFiles(files []string, filter parquet.LogFilter) []string {
	filtered := make([]string, 0, len(files))
	for _, f := range files {
		startBlock := parquet.ExtractBlockNumber(f)
		if filter.ToBlock != nil && startBlock > *filter.ToBlock {
			continue
		}
		if filter.FromBlock != nil && startBlock+r.maxBlocksPerFile <= *filter.FromBlock {
			continue
		}
		filtered = append(filtered, f)
	}
	return filtered
}

NewReaderWithMaxBlocksPerFile defends against 0 at construction by substituting the default, but setMaxBlocksPerFile (driven by handleSetMaxBlocksPerFile) does not:

func (c *Coordinator) handleSetMaxBlocksPerFile(req setMaxBlocksPerFileReq) {
	c.config.MaxBlocksPerFile = req.maxBlocksPerFile
	c.cacheRotateInterval.Store(req.maxBlocksPerFile)
	if c.reader != nil {
		c.reader.setMaxBlocksPerFile(req.maxBlocksPerFile)
	}
	req.resp <- nil
}

If anyone calls SetMaxBlocksPerFile(0):

  • isRotationBoundary always returns false → no rotation, single file grows forever (acceptable).
  • But the filter becomes startBlock + 0 <= FromBlock, which drops the very files that contain the from-block range. GetLogs silently returns an empty slice for any non-zero FromBlock.

Either reject 0 in the handler (and in Store.SetMaxBlocksPerFile), or special-case maxBlocksPerFile == 0 in filterLogFiles to be a no-op.
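
The handler-side guard, sketched against the handler quoted above:

func (c *Coordinator) handleSetMaxBlocksPerFile(req setMaxBlocksPerFileReq) {
	if req.maxBlocksPerFile == 0 {
		req.resp <- fmt.Errorf("SetMaxBlocksPerFile: value must be positive")
		return
	}
	c.config.MaxBlocksPerFile = req.maxBlocksPerFile
	c.cacheRotateInterval.Store(req.maxBlocksPerFile)
	if c.reader != nil {
		c.reader.setMaxBlocksPerFile(req.maxBlocksPerFile)
	}
	req.resp <- nil
}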

6. WriteReceipts ignores per-input BlockNumber for the WAL but passes the inputs verbatim into the parquet record

// writeReceipts records a committed block at height. … height is
// authoritative; inputs[i].BlockNumber is ignored.

The WAL entry is written with height. But applyReceipt writes input.Receipt straight into c.receiptsBuffer, including input.Receipt.BlockNumber unchanged. If a caller ever has inputs[i].Receipt.BlockNumber != height (e.g. a partial migration, or a future caller that drops the BlockNumber field on the outer ReceiptInput and forgets the inner one), the WAL says block X but the parquet column says block Y. Replay would re-apply at the WAL's height, but the parquet column would still disagree.

Today the wrapper sets both consistently in buildParquetReceiptInputs, so this is latent. Suggest either:

  • Overwrite input.Receipt.BlockNumber = height inside applyReceipt so height is truly authoritative, or
  • Validate that they match.

The doc claims "height is authoritative" — only the wrapper's care makes that true.
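
The overwrite variant is a one-liner at the top of applyReceipt (sketch, field names as in the excerpts):

	// height comes from the WAL entry and is authoritative; never trust
	// the caller-supplied copy of the block number inside the record.
	input.Receipt.BlockNumber = height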

7. replayWAL early-exit on len(entry.Receipts) == 0 may corrupt rotation tracking when the empty entry sits between same-block entries

	err = c.wal.Replay(firstOffset, lastOffset, func(offset uint64, entry parquet.WALEntry) error {
		if len(entry.Receipts) == 0 {
			return nil
		}

		blockNumber := entry.BlockNumber
		if blockNumber < c.fileStartBlock {
			dropOffset = offset
			return nil
		}

		if haveBlock && blockNumber != currentBlock && c.isRotationBoundary(blockNumber) && blockNumber > c.fileStartBlock && offset > 0 {
			dropOffset = offset - 1
		}

		if !haveBlock || blockNumber != currentBlock {
			currentBlock = blockNumber
			haveBlock = true
			logStartIndex = 0
		}

Two side-issues here:

  • The early-out for empty WAL entries skips updating dropOffset. If somebody writes empty entries (today they can't, but the format permits it), they'd remain in the WAL forever even after replay — small leak, not a correctness break.
  • More importantly: replayWAL's "rotation detection" only fires on blockNumber != currentBlock. It never reconciles currentBlock with whatever applyReceiptFromReplay actually does. So if WAL has e.g. [block 4 (boundary)] only and fileStartBlock=4, the boundary is never recorded as a dropOffset = offset - 1, because haveBlock is false on the first entry. That's correct (the entry isn't durable yet — it's still buffered post-init), but it took quite a bit of tracing to convince myself. Worth a comment.

8. replayWAL doesn't update dropOffset when applies of non-boundary blocks become durable mid-rotation

When a rotation fires at offset N (boundary entry), dropOffset = N - 1 so we drop everything strictly before N. That's right for the boundary's own buffered data. But by the time the rotation closes the previous file, every receipt at every offset before N is durably persisted. Currently we already drop them as a side effect (anything < N), so this is fine — but only because rotation is the only "durability event" we care about. If BlockFlushInterval > 0 causes a mid-block flush, the data is in the open parquet writer but the trailer hasn't been written, so it isn't durable enough to drop — and we don't try. Correct, but worth a comment.

9. receiptFileSnapshotForBlock selects the file containing blockNumber only if closedFiles is sorted

func (c *Coordinator) receiptFileSnapshotForBlock(blockNumber uint64) []string {
	var best string
	for _, f := range c.closedFiles {
		if f.startBlock > blockNumber {
			break
		}
		best = f.receiptPath
	}

scanClosedFiles does sort, and rotations append in order, so the invariant holds. But there's no assertion / sort.IsSorted check, and dispatch_test.go constructs coordinators by hand. If a future refactor (e.g. moving prune to a worker pool) ever inserts out of order, this silently returns the wrong file. A sort.Slice (or asserting IsSorted) here is cheap insurance.
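
The suggested insurance, using the stdlib:

	less := func(i, j int) bool { return c.closedFiles[i].startBlock < c.closedFiles[j].startBlock }
	if !sort.SliceIsSorted(c.closedFiles, less) {
		sort.Slice(c.closedFiles, less) // defensive: restore the invariant rather than misread
	}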

10. closedFiles invariant + misaligned filenames

alignedFileStartBlock returns 0 when MaxBlocksPerFile is 0, but more importantly it doesn't alter fileStartBlock if the aligned value would step backwards:

	if c.receiptWriter == nil {
		aligned := alignedFileStartBlock(blockNumber, c.config.MaxBlocksPerFile)
		if aligned >= c.fileStartBlock {
			c.fileStartBlock = aligned
		}
		if err := c.initWriters(); err != nil {
			return err
		}
	}

This is intentional (mirrors v1 — see the comment in parquet/store.go's applyReceiptLocked) so we don't os.Create-truncate an existing closed file at the aligned start. But it produces misaligned filenames on reopen: e.g. if maxBlock=100 and MaxBlocksPerFile=100, you reopen with fileStartBlock=101 and the next write creates receipts_101.parquet, which Reader.filterLogFiles then assumes spans [101, 201). The actual file rotates at block 200 (the next boundary), so it spans [101, 200] — meaning Reader's assumption about the file's last block is off by one (claims 201 is included, but 200 is the first block of the next file). For the inclusive <= ToBlock check the off-by-one is harmless, but the off-by-one in the prune end-block calculation:

func (c *Coordinator) shouldPruneClosedFile(f closedFile, pruneBeforeBlock uint64) bool {
	fileEndBlock := f.startBlock + c.config.MaxBlocksPerFile
	if fileEndBlock < f.startBlock {
		fileEndBlock = ^uint64(0)
	}
	return fileEndBlock <= pruneBeforeBlock
}

…always over-estimates the end block of misaligned files. We refuse to prune them until pruneBeforeBlock >= 201 even though the contents end at 199. Not a correctness break; only minor over-retention. But worth flagging.

11. Coordinator.Close returns nil when called after SimulateCrash

func (c *Coordinator) Close() error {
	c.closeOnce.Do(func() {
		resp := make(chan error, 1)
		c.closeErr = awaitError(context.Background(), c, closeReq{resp: resp}, resp)
		close(c.done)
	})
	return c.closeErr
}

closeOnce.Do makes the second call a no-op, so closeErr (zero value nil) is returned. The wrapper's parquet_v2_receipt_store.go happily threads that through parquetReceiptStoreV2.Close. The test TestSimulateCrashStopsFutureRequests even asserts require.NoError(t, store.Close()) after a crash — but a real crash leaves parquet files without trailers, file descriptors leaked from closeWriters never running, and the WAL/Reader closed only because handleSimulateCrash happened to do so. This isn't strictly a bug, but it's a quietly misleading API: "I closed it" returns nil even though I never actually performed a graceful close. At minimum, document it; ideally, return errSimulatedCrash or similar so callers can distinguish.
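
One way to make the contract explicit (errSimulatedCrash is a hypothetical sentinel; Close stays as-is):

var errSimulatedCrash = errors.New("receipt store: SimulateCrash was invoked; no graceful close was performed")

func (c *Coordinator) SimulateCrash() {
	c.closeOnce.Do(func() {
		resp := make(chan struct{}, 1)
		_, _ = sendAndAwaitResponse(context.Background(), c, simulateCrashReq{resp: resp}, resp)
		c.closeErr = errSimulatedCrash // a later Close() now reports the crash instead of nil
		close(c.done)
	})
}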

12. handleClose swallows errors.Join(nil…) — fine, but the WAL is closed after the writers; if the WAL's encoder has buffered any pending appends from a hook-fired write that happened just before Close, those are lost

func (c *Coordinator) handleClose(req closeReq) {
	var errs []error
	if err := c.flushOpenFile(); err != nil {
		errs = append(errs, fmt.Errorf("flush: %w", err))
	}
	if err := c.closeWriters(); err != nil {
		errs = append(errs, err)
	}
	if c.wal != nil {
		if err := c.wal.Close(); err != nil {
			errs = append(errs, fmt.Errorf("wal close: %w", err))
		}
		c.wal = nil
	}
	if c.reader != nil {
		if err := c.reader.Close(); err != nil {
			errs = append(errs, fmt.Errorf("reader close: %w", err))
		}
		c.reader = nil
	}
	req.resp <- errors.Join(errs...)
}

Order is correct (parquet first so its data is durable before we drop the WAL durability shield). Two minor issues:

  • If flushOpenFile fails, we still try to closeWriters, which writes a parquet trailer over the partially-written buffer. The result might be a parquet file the reader probe accepts, but with truncated row groups. That's "best effort on shutdown" — comment-worthy.
  • If both wal.Close and reader.Close fail, you get a errors.Join whose .Error() is multi-line and slightly user-hostile — fine for logs, but the wrapper assigns it to s.closeErr and surfaces it from parquetReceiptStoreV2.Close.

13. handleSimulateCrash does not nil c.wal and c.reader, while handleClose does

func (c *Coordinator) handleSimulateCrash(req simulateCrashReq) {
	if c.receiptFile != nil {
		_ = c.receiptFile.Close()
		c.receiptFile = nil
	}
	if c.logFile != nil {
		_ = c.logFile.Close()
		c.logFile = nil
	}
	c.receiptWriter = nil
	c.logWriter = nil
	if c.wal != nil {
		_ = c.wal.Close()
	}
	if c.reader != nil {
		_ = c.reader.Close()
	}
	req.resp <- struct{}{}
}

Compare to handleClose which does c.wal = nil and c.reader = nil. It's harmless today because dispatch returns true, the run goroutine exits, and no further handler will read c.wal/c.reader — but a subsequent reader of the closed *Reader.db would NPE on r.db.Exec(...) since Close already nilled r.db. Set them to nil here too for symmetry.
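
The symmetry fix is mechanical:

	if c.wal != nil {
		_ = c.wal.Close()
		c.wal = nil
	}
	if c.reader != nil {
		_ = c.reader.Close()
		c.reader = nil
	}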

14. TestCloseReturnsSameErrorToRepeatCallers documents the closeOnce/closeErr behavior, but if awaitError fails because ctx.Done() fires (impossible with context.Background(), but…), closeErr = ctx.Err() and c.done is still closed afterwards even though the run goroutine never received the request

func (c *Coordinator) Close() error {
	c.closeOnce.Do(func() {
		resp := make(chan error, 1)
		c.closeErr = awaitError(context.Background(), c, closeReq{resp: resp}, resp)
		close(c.done)
	})
	return c.closeErr
}

With context.Background() this never fires, so it's fine in practice. The SimulateCrash-then-Close ordering looks more dangerous at first glance: if the crash had already closed c.done, the closeReq attempt in sendAndAwaitResponse would select on <-c.done and return ErrStoreClosed (fine), and then close(c.done) would run a second time and panic. In fact it cannot happen: SimulateCrash and Close are gated by the same closeOnce, so only one of the two closures ever executes. Verified fine, but document it; the safety here depends on SimulateCrash and Close sharing closeOnce.

15. clearWALPreservingLast no-ops on the "single-entry WAL" case; that interacts oddly with the observeBlock rotation path

func (c *Coordinator) clearWALPreservingLast() error {
	if c.wal == nil {
		return nil
	}
	firstOffset, err := c.wal.FirstOffset()
	...
	if lastOffset <= firstOffset {
		return nil
	}
	if err := c.wal.TruncateBefore(lastOffset); err != nil {
		...

observeBlock calls rotateOpenFile for empty boundary blocks (e.g. WriteReceipts(4, nil) after WriteReceipts(2, [r])). rotateOpenFile finalizes receipts_0.parquet (containing block 2) and then calls clearWALPreservingLast. At this point the WAL has only block 2's entry (firstOffset == lastOffset), so we skip truncation. The block 2 entry stays in the WAL forever — until the next non-empty WriteReceipts causes a real truncation. Replay correctly skips it (block 2 < new fileStartBlock=4), so functionally fine, but it means parquet-wal carries a redundant entry indefinitely after empty-block rotations.

16. SimulateCrash short-circuits between simulateCrashReq.resp and close(c.done) — but what if dispatch already returned true?

func (c *Coordinator) SimulateCrash() {
	c.closeOnce.Do(func() {
		resp := make(chan struct{}, 1)
		_, _ = sendAndAwaitResponse(context.Background(), c, simulateCrashReq{resp: resp}, resp)
		close(c.done)
	})
}

handleSimulateCrash sends to resp (buffered, OK) and then dispatch returns true, so run() returns. There's a window where the run goroutine has exited and c.done is not yet closed. During this window, any new WriteReceipts call blocks on c.requests <- req until close(c.done) happens, then unblocks with ErrStoreClosed. In practice the window is microseconds, but no test exercises it. Tighten it: close c.done inside the handler before responding, or document the window.

(Same observation applies to Close().)

17. Reader.NewReaderWithMaxBlocksPerFile configures DuckDB with runtime.NumCPU() threads and a 1 GB memory limit

	connector, err := duckdb.NewConnector("", nil)
	...
	db.SetMaxOpenConns(numCPU * 2)
	db.SetMaxIdleConns(numCPU)

	settings := []string{
		fmt.Sprintf("SET threads TO %d", numCPU),
		"SET memory_limit = '1GB'",
		"SET enable_object_cache = true",
		"SET enable_progress_bar = false",
		"SET preserve_insertion_order = false",
	}

Since every read currently runs on the run goroutine (one DuckDB query at a time), the SetMaxOpenConns(numCPU*2) / SetMaxIdleConns(numCPU) connection pool is over-provisioned. Once you move IO to a worker pool, this fits — but until then, you're holding numCPU idle DuckDB connections per coordinator, each of which has its own memory + thread pool. On a 64-core box that's a meaningful idle cost.

Not a correctness bug; just a heads-up.

18. MaxReceiptBlockNumber only scans closed files, not the open writer

	receiptFiles := make([]string, 0, len(closedFiles))
	for _, f := range closedFiles {
		receiptFiles = append(receiptFiles, f.receiptPath)
	}
	if maxBlock, ok, err := reader.MaxReceiptBlockNumber(context.Background(), receiptFiles); err != nil {
		return nil, err
	} else if ok {
		latest, err := int64FromUint64(maxBlock)
		...
		c.latestVersion = latest
		if maxBlock < ^uint64(0) {
			c.fileStartBlock = maxBlock + 1
		}
	}

This is correct (the open file has no parquet trailer post-crash and is removed by validateAndCleanFiles), and replay then bumps latestVersion further. But since the call passes a stale context.Background() rather than a derived context, a hung DuckDB scan during init can wedge New indefinitely with no caller-side timeout. Consider plumbing a config-level init timeout.
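
A sketch of that plumbing, assuming a hypothetical InitTimeout field on the store config:

	initCtx := context.Background()
	if storeCfg.InitTimeout > 0 { // hypothetical config field
		var cancel context.CancelFunc
		initCtx, cancel = context.WithTimeout(initCtx, storeCfg.InitTimeout)
		defer cancel()
	}
	maxBlock, ok, err := reader.MaxReceiptBlockNumber(initCtx, receiptFiles)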

19. Response channel buffer mismatch is structurally fragile

Most response channels are make(chan T, 1) so handlers send-without-blocking even if the caller has gone away. But TestUnbufferedRequestsApplyBackpressure deliberately uses an unbuffered firstResp:

	firstResp := make(chan writeResp)
	coord.requests <- writeReq{
		inputs: []parquet.ReceiptInput{testReceiptInput(1, common.HexToHash("0x1"))},
		resp:   firstResp,
	}

The test passes only because the mock has no WAL (so writeReceipts errors immediately). In normal (production) use, callers always buffer their resp. But there's no contract enforced anywhere. If a caller forgets, the run goroutine wedges forever on req.resp <- writeResp{...} — an unrecoverable hang of the whole store.

Recommendation: have writeReq{}.dispatch send via select { case req.resp <- ...: case <-c.done: } so a wedged caller can't pin the run goroutine.

This generalizes to every handler — they all do unconditional req.resp <- .... Cheap to fix while you're still in the "correctness only" phase.
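
A sketch of the guarded reply (replyErr is a hypothetical helper; each handler's unconditional send would funnel through it):

func (c *Coordinator) replyErr(resp chan<- error, err error) {
	select {
	case resp <- err:
	case <-c.done:
		// The store is closing; the caller either already observed
		// ErrStoreClosed or has gone away, so dropping the reply is safe.
	}
}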

20. writeReceipts's rotation guard checks c.receiptWriter != nil but the WAL entry was already written

	if err := c.wal.Write(parquet.WALEntry{BlockNumber: height, Receipts: receiptBytes}); err != nil {
		return err
	}

	if h := c.faultHooks; h != nil && h.AfterWALWrite != nil {
		if err := h.AfterWALWrite(height); err != nil {
			return err
		}
	}

	if c.receiptWriter != nil && height != c.lastSeenBlock && c.isRotationBoundary(height) {
		if err := c.rotateOpenFile(height); err != nil {
			return err
		}
	}

Suppose MaxBlocksPerFile=4, fresh store, first write is WriteReceipts(8, [r]):

  • WAL entry written for block 8 (offset 1).
  • c.receiptWriter == nil → rotation skipped.
  • applyReceipt(8, r):
    • aligned = alignedFileStartBlock(8, 4) = 8. 8 >= 0, set fileStartBlock = 8. initWriters() opens receipts_8.parquet.
    • Buffer the receipt.

Now WriteReceipts(12, [r]):

  • WAL entry for block 12 (offset 2).
  • c.receiptWriter != nil, 12 != 8, isRotationBoundary(12) = true → rotation. receipts_8.parquet closed, receipts_12.parquet opened. clearWALPreservingLast keeps offset 2 only.

Looks right.

Now consider WriteReceipts(0, [r]) first, then WriteReceipts(4, [r]):

  • After first: lastSeenBlock=0 (still!), buffer has block 0.
  • Second: WAL writes block 4. c.receiptWriter != nil, 4 != 0 (true), isRotationBoundary(4)=true → rotate. Block 0 flushed to receipts_0.parquet. New file at 4. Apply block 4.

That works because 0 != 4. But: after the second call, lastSeenBlock is now 4. Third WriteReceipts(8, [r]):

  • WAL writes block 8. 8 != 4, isRotationBoundary(8)=true → rotate.

OK, good. The flow is fine — but only because of careful ordering. Document the invariant ("the WAL entry for the boundary block must precede rotation; rotation preserves it via clearWALPreservingLast") more loudly. The existing comment on v1's WriteReceipts is the right model and isn't reproduced verbatim on v2's writeReceipts.

21. validateAndCleanFiles removes the corrupt counterpart with os.Remove, ignoring the error

	lastFile := files[len(files)-1]
	if reader.isFileReadable(lastFile) {
		return files
	}

	startBlock := parquet.ExtractBlockNumber(lastFile)
	_ = os.Remove(lastFile)
	counterpart := filepath.Join(basePath, fmt.Sprintf("%s_%d.parquet", counterpartPrefix, startBlock))
	_ = os.Remove(counterpart)
	return files[:len(files)-1]

Two issues:

  • os.Remove errors are silently dropped. If the FS is read-only or the file is locked (Windows), we return as if cleanup succeeded and the next probe will keep finding the same trailing-corrupt file forever. At minimum log the failure; preferably surface an error so New can return (see the sketch after this list).
  • validateAndCleanFiles is called twice: once for receipts (with counterpart=logs), once for logs (counterpart=receipts). scanClosedFiles globs both file sets up front and only then calls validateAndCleanFiles on each, so if the receipts pass deletes a corrupt trailing receipts file and its logs counterpart, the logs slice handed to the second pass still contains the path that was just removed. The probe on that phantom trailing file fails (file not found), we os.Remove it again (already gone) and its receipts counterpart (also gone), and both removal errors are ignored. The slice still ends up correct because the final loop checks fileExists, but the logic is brittle.
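
A sketch of surfacing the removal failure (this widens validateAndCleanFiles to also return an error):

	if err := os.Remove(lastFile); err != nil && !os.IsNotExist(err) {
		return nil, fmt.Errorf("removing corrupt trailing parquet file %s: %w", lastFile, err)
	}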

22. dropTempCacheBefore uses entries[:0] in-place

func (c *Coordinator) dropTempCacheBefore(blockNumber uint64) {
	for txHash, entries := range c.tempWriteCache {
		kept := entries[:0]
		for _, entry := range entries {
			if entry.blockNumber >= blockNumber {
				kept = append(kept, entry)
			}
		}

Standard "filter in place" — safe because we only append fewer or equal elements than we read. delete during range is also explicitly allowed. But: the range entries copies the slice header before the inner loop, so even if kept shares the backing array, the iteration is over the snapshot. Looks fine.

23. Replay's dropOffset decrements without checking offset == 1

		if haveBlock && blockNumber != currentBlock && c.isRotationBoundary(blockNumber) && blockNumber > c.fileStartBlock && offset > 0 {
			dropOffset = offset - 1
		}

The offset > 0 guard is redundant — WAL offsets are 1-indexed. If you wanted to defend against offset == 0, the rotation detection couldn't fire there anyway because haveBlock would be false (we set it when we apply something). Cosmetic, but the condition is confusing.

24. parquet_v2/store.go: SimulateCrash's contract under parquet_v2_receipt_store.Close

The wrapper uses closeOnce to gate teardown. After simulateCrashV2 (which calls store.SimulateCrash()), the next caller to parquetReceiptStoreV2.Close() runs through the wrapper's closeOnce and does:

func (s *parquetReceiptStoreV2) Close() error {
	s.closeOnce.Do(func() {
		if s.indexPruner != nil {
			s.indexPruner.Stop()
		}
		s.closeErr = s.store.Close()
		if s.txHashIndex != nil {
			if err := s.txHashIndex.Close(); err != nil && s.closeErr == nil {
				s.closeErr = err
			}
		}
	})
	return s.closeErr
}

So the wrapper will call store.Close() after a SimulateCrash, which goes through the coordinator's own closeOnce → no-op → returns nil. The wrapper then closes the tx-hash index, which is what simulateCrashV2 already did via CloseTxHashIndex. Idempotency holds, but it's two layers of "I might already be closed; trust me." Fragile.


Things that are not bugs but I want to flag

  • The single-goroutine ownership model is clean and easy to reason about. Most of the issues above are corner cases.
  • closeWriters accumulates errors and continues — good defensive cleanup, mirrors v1.
  • cacheRotateInterval atomic mirror is correctly used (only CacheRotateInterval reads it without the run goroutine; handleSetMaxBlocksPerFile writes it under the run goroutine).
  • WAL-entry-before-rotate ordering is preserved (writeReceipts writes WAL → checks rotation → applies receipts), and clearWALPreservingLast correctly retains the boundary entry.
  • replayWAL's logic matches the post-rotation invariant; the test suite exercises the right cases (skip-low, rotate-without-clear, duplicate hashes).

If WAL replay lazily opens the receipt/log writers via initWriters and
then a later replay step fails, New returns without starting the run
goroutine, so the writers and their *os.File handles were leaking until
process exit. Adds a deferred cleanupWriters guard mirroring the
existing reader/WAL cleanup pattern, plus a regression test that
injects a converter failure and verifies the on-disk parquet files are
re-readable after the failed New call.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
