[CELEBORN-2350] Support chunk level compression to optimize storage#3699
[CELEBORN-2350] Support chunk level compression to optimize storage#3699saurabhd336 wants to merge 21 commits into
Conversation
|
Hi team @SteNicholas / @s0nskar / @zaynt4606 / others I wanted to start an early discussion for these proposed changes (can share a design doc too). At a high level, our Celeborn infra costs have largely been influenced by large locally attached SSD requirements. Additionally, also looking for ways to reduce celeborn network ingress / egress. This change helps is reducing both. Wanted to start this discussion for the change thanks |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3699 +/- ##
==========================================
+ Coverage 66.91% 67.28% +0.37%
==========================================
Files 358 360 +2
Lines 21986 22337 +351
Branches 1946 1982 +36
==========================================
+ Hits 14710 15027 +317
- Misses 6262 6285 +23
- Partials 1014 1025 +11 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
1d92a40 to
cf8d472
Compare
…ge-record chunks readChunks() was unconditionally decompressing every chunk, but large records are written raw (uncompressed) by flushLargeRecord(). The fix consults ReduceFileMeta.getChunkCompressed() per chunk and only calls ZstdInputStream on chunks that were actually compressed. Also exposes compressAndFlush() as public so the test can call it directly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Hi team Ping on this again! So far, i've been able to get ~40% reduction in celeborn worker disk usage with a roughly 80% increase in CPU usage. A lot of times, the CPU usage of our celeborn fleets is idle, hence this tradeoff feels reasonable and worth testing at scale for us. Can you please take a look at this PR? |
What changes were proposed in this pull request?
Adds chunk-level ZSTD compression on the worker write path and streaming decompression on the client read path. Records accumulate in a fixed-size chunk buffer (default 8 MB); when the buffer overflows it is compressed as a single ZSTD frame and written to disk file. On the read side, CelebornInputStream wraps each fetched chunk in a ZstdInputStream to uncompress.
This is orthogonal to the existing batch-level LZ4/ZSTD codec: both can be active simultaneously, or the batch codec can be NONE.
This change primarily helps in reducing the disk usage (~40% lower disk usage seen in tests) as well as read flow celeborn network egress.
Impl details
Writer side
FileChannelWriterinterface which supports write / close functionalities.BypassFileChannelWriteris the default and ensures the current behaviour (directly write flushBuffer to disk file channel).ChunkCompressedFileChannelWriter: Accumulates records in a directByteBufferofchunkSizebytes. On overflow, ZSTD-compresses and writes as a single frame. Records larger thanchunkSizestream directly to disk viaZstdOutputStream. Replaces compressed chunk-boundary offsets intoReduceFileMetaon close. Also updates thebytesFlushedto overwrite the FileInfo length post close. The buffers used to buffer chunkSize data before compression and flush is powered byMmapMemoryManagerandChunkBufferPoolwhich uses mmap'ed temporary files to avoid the memory overhead of buffering chunk sized data.ChunkCompressedFileChannelWriterandFileChannelWriteris made basis the new config set by client duringReserveSlots(conf.isChunkCompressionEnabled).Read side
CelebornInputStream: When reading chunkCompressed chunks, wraps the readByteBufinto a ZSTDIs to inplace decompress and read.Configs added
celeborn.chunk.compression.enabledfalseCelebornInputStream.Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
celeborn.chunk.compression.enabledconfig to enable / disable chunk level compression (disabled by default)How was this patch tested?
UTs, ITs