Parquet receiptdb refactor coordinator thread #3345
Conversation
cody-littley
left a comment
Attached is an LLM audit. I did not verify whether these bugs are real, so some may be false positives. Usually when I generate these sorts of reports on my own code, at least two thirds of the issues it finds turn out to be real.
The rotation interval is set at construction and only mutated by the test-only SetMaxBlocksPerFile, so the request-channel round-trip on every read isn't needed. Drop the request type/handler/dispatch case and read c.config.MaxBlocksPerFile directly with a doc comment about the no-race-with-mutation contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…coord request Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the pruneTicker/pruneTick struct fields and stopPruneTicker() helper with a local ticker in run() guarded by defer ticker.Stop(). The defer covers all three exit paths (done, simulateCrashReq, closeReq), so the explicit stopPruneTicker() calls in handleClose and handleSimulateCrash are no longer needed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
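For reference, a minimal sketch of the ticker-owned-by-run() shape described above. The channel and handler names (done, simulateCrashReq, closeReq, handlePruneTick) follow the comment, but their exact types are assumptions, and the handlers are stubbed:

```go
package coordinator

import "time"

// Sketch only: handler bodies and channel element types are illustrative.
type coordinator struct {
	pruneInterval    time.Duration
	done             chan struct{}
	simulateCrashReq chan struct{}
	closeReq         chan struct{}
}

func (c *coordinator) handlePruneTick()     {} // stub
func (c *coordinator) handleSimulateCrash() {} // stub
func (c *coordinator) handleClose()         {} // stub

func (c *coordinator) run() {
	ticker := time.NewTicker(c.pruneInterval)
	defer ticker.Stop() // covers all three exit paths below

	for {
		select {
		case <-c.done:
			return
		case <-c.simulateCrashReq:
			c.handleSimulateCrash()
			return
		case <-c.closeReq:
			c.handleClose()
			return
		case <-ticker.C:
			c.handlePruneTick()
		}
	}
}
```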
Replaces the public ReplayWAL methods with construction-time replay driven by a ReplayHooks struct (converter + per-block tx-index callback). Wrappers now open the tx-hash index first, build the hooks closure, and pass it into NewStore — by the time NewStore returns, parquet state, tx index, and warmup records are all hot. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
NewStore is back to a single argument. The converter is a config field
("what kind of receipts does this store hold"), not a wiring callback.
The wrapper drains store.ReplayedBlocks() after construction to
re-populate its tx-hash index — wrapper-specific business stays in the
wrapper.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
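A rough sketch of the wrapper-side drain described here, assuming ReplayedBlocks exposes a channel of replayed blocks; the Block/Receipt shapes and the tx-hash index interface are illustrative, not the PR's actual types:

```go
package receiptstore

// Sketch only: these types stand in for the real store and index shapes.
type Receipt struct{ TxHash [32]byte }

type Block struct {
	Number   uint64
	Receipts []Receipt
}

type txHashIndex interface {
	Put(txHash [32]byte, blockNumber uint64) error
}

type replayedStore interface {
	ReplayedBlocks() <-chan Block
}

// rebuildTxIndex drains the blocks the store replayed during construction and
// re-populates the wrapper-owned tx-hash index.
func rebuildTxIndex(store replayedStore, idx txHashIndex) error {
	for blk := range store.ReplayedBlocks() {
		for _, r := range blk.Receipts {
			if err := idx.Put(r.TxHash, blk.Number); err != nil {
				return err
			}
		}
	}
	return nil
}
```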
Previously, an error in flushOpenFile, closeWriters, or wal.Close would return early from handleClose, leaving the remaining resources (WAL background goroutines, DuckDB reader connection, open file descriptors) attached to the coordinator after run() exited. Now every step runs unconditionally and errors are aggregated via errors.Join. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
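A minimal sketch of the no-early-return shape, assuming the four teardown steps named above; the closer interface and field names are illustrative:

```go
package coordinator

import "errors"

type closer interface{ Close() error }

// Sketch only: real fields and helpers differ.
type coordinator struct {
	wal    closer
	reader closer // DuckDB reader connection
}

func (c *coordinator) flushOpenFile() error { return nil } // stub
func (c *coordinator) closeWriters() error  { return nil } // stub

func (c *coordinator) handleClose() error {
	// No early returns: every resource gets its shutdown step even if an
	// earlier step failed, and the caller sees all failures at once.
	return errors.Join(
		c.flushOpenFile(),
		c.closeWriters(),
		c.wal.Close(),
		c.reader.Close(),
	)
}
```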
The captured err was a local in Close(), so a second invocation got a fresh nil — closeOnce.Do skips the closure body but the second caller's local err was never written. Move err onto the struct as closeErr so all callers observe the result of the single close attempt. The happens-before semantics of sync.Once guarantee the read is safe. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
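Sketch of the fix, assuming Close wraps a single real teardown helper; everything except closeOnce/closeErr is illustrative:

```go
package store

import "sync"

// closeErr lives on the struct so every caller of Close observes the result
// of the single close attempt, not a fresh local variable.
type Store struct {
	closeOnce sync.Once
	closeErr  error
}

func (s *Store) doClose() error { return nil } // stub for the real teardown

func (s *Store) Close() error {
	s.closeOnce.Do(func() {
		s.closeErr = s.doClose()
	})
	// sync.Once's happens-before guarantee makes this read safe even for
	// callers that did not run the closure.
	return s.closeErr
}
```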
CacheRotateInterval was reading c.config.MaxBlocksPerFile directly from caller goroutines while handleSetMaxBlocksPerFile wrote it from the run goroutine — a data race the prior comment acknowledged but didn't fix. Mirror the value in an atomic.Uint64 that handleSetMaxBlocksPerFile keeps in sync, and read from the mirror. Also thread context through sendAndAwaitResponse/awaitError so reads respect caller cancellation, and gate parquetReceiptStoreV2.Close on a sync.Once (indexPruner.Stop closes a channel and panics on a second call). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
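A sketch of the atomic mirror plus the context-aware request round-trip, under the assumption that requests carry their own reply channel; the request type and exact signatures are illustrative:

```go
package coordinator

import (
	"context"
	"sync/atomic"
)

type request struct {
	respond chan error
}

// Sketch only: the config field the run goroutine owns is not shown.
type coordinator struct {
	maxBlocksPerFile atomic.Uint64 // mirror, kept in sync by the run goroutine
	requests         chan request
}

// CacheRotateInterval may be called from any goroutine; it reads the atomic
// mirror instead of the config field that only the run goroutine mutates.
func (c *coordinator) CacheRotateInterval() uint64 {
	return c.maxBlocksPerFile.Load()
}

// handleSetMaxBlocksPerFile runs on the run goroutine and keeps the mirror in
// sync whenever the test-only setter changes the config value.
func (c *coordinator) handleSetMaxBlocksPerFile(n uint64) {
	c.maxBlocksPerFile.Store(n)
}

// sendAndAwaitResponse respects caller cancellation on both the send and the
// wait for the coordinator's reply.
func (c *coordinator) sendAndAwaitResponse(ctx context.Context, req request) error {
	select {
	case c.requests <- req:
	case <-ctx.Done():
		return ctx.Err()
	}
	select {
	case err := <-req.respond:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
```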
Overall, moving in a good direction. An LLM provided the following audit report. Can you please work through these? If you think the AI is incorrect, feel free to explain why and skip the fix. Everything below this header is LLM generated.
Deep Audit:
If WAL replay lazily opens the receipt/log writers via initWriters and then a later replay step fails, New returns without starting the run goroutine, so the writers and their *os.File handles were leaking until process exit. Adds a deferred cleanupWriters guard mirroring the existing reader/WAL cleanup pattern, plus a regression test that injects a converter failure and verifies the on-disk parquet files are re-readable after the failed New call. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
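A sketch of the deferred-cleanup guard described above, with Config, replayWAL, and cleanupWriters as stand-ins for the real helpers:

```go
package coordinator

// Sketch only: these stubs model the construction path, not the real code.
type Config struct{}

type coordinator struct{ config Config }

func (c *coordinator) cleanupWriters() error { return nil } // stub
func (c *coordinator) replayWAL() error      { return nil } // stub
func (c *coordinator) run()                  {}             // stub

func New(cfg Config) (*coordinator, error) {
	c := &coordinator{config: cfg}

	// If any later construction step fails, close whatever writers the lazy
	// init path opened during replay instead of leaking their *os.File
	// handles until process exit.
	ok := false
	defer func() {
		if !ok {
			_ = c.cleanupWriters()
		}
	}()

	if err := c.replayWAL(); err != nil {
		return nil, err
	}

	go c.run()
	ok = true
	return c, nil
}
```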
Describe your changes and provide context
A coordinator goroutine owns the mutable state (closed files, writers, WAL, version metadata) and serializes requests over a channel. The coordinator package handles the event loop, WAL append/replay, rotation, pruning, and DuckDB-backed reads; parquet_v2.Store is a thin facade wrapping it. Selecting rs-backend = "parquet_v2" opts into the new backend; the default remains pebbledb, the public ReceiptStore interface is unchanged, and v2 writes to its own data subdirectory so upgrading nodes are unaffected.
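To make the serialization model concrete, a minimal sketch of the request/response pattern, assuming each request carries its own reply channel; the request type and handler names are illustrative and not the PR's actual API:

```go
package coordinator

// Callers never touch coordinator state directly; they send a request and
// wait for the reply produced by the single run goroutine.
type getReceiptReq struct {
	blockNumber uint64
	respond     chan getReceiptResp
}

type getReceiptResp struct {
	data []byte
	err  error
}

type coordinator struct {
	// Mutable state (closed files, writers, WAL, version metadata) is only
	// touched from the run goroutine below.
	requests chan any
}

func (c *coordinator) run() {
	for raw := range c.requests {
		switch req := raw.(type) {
		case getReceiptReq:
			req.respond <- c.handleGetReceipt(req.blockNumber)
		}
	}
}

func (c *coordinator) handleGetReceipt(blockNumber uint64) getReceiptResp {
	// DuckDB-backed read would go here; stubbed for the sketch.
	return getReceiptResp{}
}

// GetReceipt is the caller-facing side of the round-trip.
func (c *coordinator) GetReceipt(blockNumber uint64) ([]byte, error) {
	respond := make(chan getReceiptResp, 1)
	c.requests <- getReceiptReq{blockNumber: blockNumber, respond: respond}
	resp := <-respond
	return resp.data, resp.err
}
```

Because reads travel the same channel as writes, they serialize with them, which is the limitation the next paragraph calls out.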
Reads currently serialize with writes. This is intentional for this PR to keep the correctness story simple; a follow-up will move reads off the loop to restore the unbounded read concurrency v1 had via RWMutex. A further follow-up will offload the actual parquet file append from the coordinator loop, while the coordinator continues to own rotation and WAL ordering.
Testing performed to validate your change
Crash and replay behavior is exercised via SimulateCrash. ReceiptStore interface tests confirm parity with existing backends on the public API.