Skip to content

[CELEBORN-2350] Support chunk level compression to optimize storage#3699

Open
saurabhd336 wants to merge 21 commits into
apache:mainfrom
saurabhd336:chunkCompressedWriter
Open

[CELEBORN-2350] Support chunk level compression to optimize storage#3699
saurabhd336 wants to merge 21 commits into
apache:mainfrom
saurabhd336:chunkCompressedWriter

Conversation

@saurabhd336
Copy link
Copy Markdown
Contributor

@saurabhd336 saurabhd336 commented May 23, 2026

What changes were proposed in this pull request?

Adds chunk-level ZSTD compression on the worker write path and streaming decompression on the client read path. Records accumulate in a fixed-size chunk buffer (default 8 MB); when the buffer overflows it is compressed as a single ZSTD frame and written to disk file. On the read side, CelebornInputStream wraps each fetched chunk in a ZstdInputStream to uncompress.

This is orthogonal to the existing batch-level LZ4/ZSTD codec: both can be active simultaneously, or the batch codec can be NONE.

This change primarily helps in reducing the disk usage (~40% lower disk usage seen in tests) as well as read flow celeborn network egress.

Impl details

Writer side

  1. Added a new FileChannelWriter interface which supports write / close functionalities. BypassFileChannelWriter is the default and ensures the current behaviour (directly write flushBuffer to disk file channel).
  2. Added ChunkCompressedFileChannelWriter: Accumulates records in a direct ByteBuffer of chunkSize bytes. On overflow, ZSTD-compresses and writes as a single frame. Records larger than chunkSize stream directly to disk via ZstdOutputStream. Replaces compressed chunk-boundary offsets into ReduceFileMeta on close. Also updates the bytesFlushed to overwrite the FileInfo length post close. The buffers used to buffer chunkSize data before compression and flush is powered by MmapMemoryManager and ChunkBufferPool which uses mmap'ed temporary files to avoid the memory overhead of buffering chunk sized data.
  3. Choice b/w ChunkCompressedFileChannelWriter and FileChannelWriter is made basis the new config set by client during ReserveSlots (conf.isChunkCompressionEnabled).

Read side

  1. No changes in the worker (YET TO IMPLEMENT: Partition sorting during AQE flow)
  2. CelebornInputStream: When reading chunkCompressed chunks, wraps the read ByteBuf into a ZSTDIs to inplace decompress and read.

Configs added

Key Default Meaning
celeborn.chunk.compression.enabled false Client side config. Enables chunk-level ZSTD compression on the worker write path and transparent decompression in CelebornInputStream.

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

celeborn.chunk.compression.enabled config to enable / disable chunk level compression (disabled by default)

How was this patch tested?

UTs, ITs

@saurabhd336 saurabhd336 changed the title [FEATURE] [WIP] Support chunk level compression to optimize storage [CELEBORN-XXXX] [FEATURE] [WIP] Support chunk level compression to optimize storage May 25, 2026
@saurabhd336
Copy link
Copy Markdown
Contributor Author

Hi team @SteNicholas / @s0nskar / @zaynt4606 / others

I wanted to start an early discussion for these proposed changes (can share a design doc too).

At a high level, our Celeborn infra costs have largely been influenced by large locally attached SSD requirements. Additionally, also looking for ways to reduce celeborn network ingress / egress. This change helps is reducing both.

Wanted to start this discussion for the change thanks

@codecov
Copy link
Copy Markdown

codecov Bot commented May 26, 2026

Codecov Report

❌ Patch coverage is 95.00000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.28%. Comparing base (b4cb5a0) to head (39595da).
⚠️ Report is 54 commits behind head on main.

Files with missing lines Patch % Lines
.../org/apache/celeborn/common/meta/DiskFileInfo.java 85.72% 1 Missing ⚠️
...rg/apache/celeborn/common/meta/ReduceFileMeta.java 88.89% 0 Missing and 1 partial ⚠️
...leborn/common/network/buffer/FileChunkBuffers.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3699      +/-   ##
==========================================
+ Coverage   66.91%   67.28%   +0.37%     
==========================================
  Files         358      360       +2     
  Lines       21986    22337     +351     
  Branches     1946     1982      +36     
==========================================
+ Hits        14710    15027     +317     
- Misses       6262     6285      +23     
- Partials     1014     1025      +11     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 1d92a40 to cf8d472 Compare May 27, 2026 02:11
saurabhd336 and others added 9 commits June 1, 2026 12:48
…ge-record chunks

readChunks() was unconditionally decompressing every chunk, but large
records are written raw (uncompressed) by flushLargeRecord(). The fix
consults ReduceFileMeta.getChunkCompressed() per chunk and only calls
ZstdInputStream on chunks that were actually compressed. Also exposes
compressAndFlush() as public so the test can call it directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@saurabhd336 saurabhd336 changed the title [CELEBORN-XXXX] [FEATURE] [WIP] Support chunk level compression to optimize storage [CELEBORN-XXXX] Support chunk level compression to optimize storage Jun 5, 2026
@saurabhd336
Copy link
Copy Markdown
Contributor Author

Hi team
@SteNicholas / @s0nskar / @zaynt4606 / others

Ping on this again!

So far, i've been able to get ~40% reduction in celeborn worker disk usage with a roughly 80% increase in CPU usage. A lot of times, the CPU usage of our celeborn fleets is idle, hence this tradeoff feels reasonable and worth testing at scale for us. Can you please take a look at this PR?

@saurabhd336 saurabhd336 changed the title [CELEBORN-XXXX] Support chunk level compression to optimize storage [CELEBORN-2350] Support chunk level compression to optimize storage Jun 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant