Merged
@@ -44,7 +44,7 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu
long bytesWritten =
Zstd.compressUnsafe(
compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
- dstSize,
+ maxSize,
/*src*/ uncompressedBuffer.memoryAddress(),
/* srcSize= */ uncompressedBuffer.writerIndex(),
/* level= */ this.compressionLevel);

Member

While I think this change is fine, nothing should be modifying the buffer during this method. I don't believe any vectors/etc. are safe to use concurrently, so the comment is misleading about the guarantees we can provide.

Contributor Author

Good point — the original framing implied a concurrency guarantee we don't actually provide, and there's no in-tree doCompress implementation that mutates the source buffer's writerIndex. Tightened the comment to state only the engineering invariant (three in-method consumers must observe the same value) and dropped the "shared reference" / doCompress speculation:

// GH-1116: capture writerIndex() once so the empty-buffer check, size
// comparison, and uncompressed-length prefix all see the same value.
long uncompressedLength = uncompressedBuffer.writerIndex();

Pushed in d7b6a8f.
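
For readers who want to see the invariant in isolation, here is a minimal standalone sketch, not the Arrow code itself. It uses plain `java.nio` instead of `ArrowBuf`, and the names `CaptureOnceSketch`, `frame`, `LENGTH_PREFIX_BYTES`, and `RAW_MARKER` are hypothetical. The `-1` marker for the raw fallback is an assumption made for illustration. The point is only that the length is read once and that the empty check, the size comparison, and the prefix all use that single value.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.LongSupplier;
import java.util.function.UnaryOperator;

public class CaptureOnceSketch {
    // Hypothetical constants; they stand in for the codec's real ones.
    static final int LENGTH_PREFIX_BYTES = 8;
    static final long RAW_MARKER = -1L; // assumed marker for "body is uncompressed"

    /**
     * Frames data as [8-byte length prefix][body].
     * writerIndex is a supplier so a caller can observe how often it is read.
     */
    static ByteBuffer frame(byte[] data, LongSupplier writerIndex,
                            UnaryOperator<byte[]> compressor) {
        // Single read: every check below uses this local, never the supplier.
        final int length = Math.toIntExact(writerIndex.getAsLong());

        // Consumer 1: empty-buffer shortcut.
        if (length == 0) {
            ByteBuffer empty = ByteBuffer.allocate(LENGTH_PREFIX_BYTES);
            empty.putLong(0, 0L);
            return empty;
        }

        byte[] input = Arrays.copyOf(data, length);
        byte[] compressed = compressor.apply(input);

        // Consumer 2: size comparison, fall back to raw bytes if compression grew the data.
        boolean sendRaw = compressed.length > length;
        byte[] body = sendRaw ? input : compressed;

        // Consumer 3: the length prefix.
        ByteBuffer out = ByteBuffer.allocate(LENGTH_PREFIX_BYTES + body.length);
        out.putLong(sendRaw ? RAW_MARKER : length);
        out.put(body);
        out.flip();
        return out;
    }
}
```

Passing a counting supplier for `writerIndex` makes it easy to check in a unit test that the value is read exactly once per call.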

@@ -29,7 +29,11 @@ public abstract class AbstractCompressionCodec implements CompressionCodec {

@Override
public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
- if (uncompressedBuffer.writerIndex() == 0L) {
+ // GH-1116: capture writerIndex() once so the empty-buffer check, size
+ // comparison, and uncompressed-length prefix all see the same value.
+ long uncompressedLength = uncompressedBuffer.writerIndex();
+
+ if (uncompressedLength == 0L) {
// shortcut for empty buffer
ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
compressedBuffer.setLong(0, 0);
@@ -41,7 +45,6 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer)
ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
long compressedLength =
compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
- long uncompressedLength = uncompressedBuffer.writerIndex();

if (compressedLength > uncompressedLength) {
// compressed buffer is larger, send the raw buffer