diff --git a/src/paimon/common/compression/block_compression_factory.cpp b/src/paimon/common/compression/block_compression_factory.cpp new file mode 100644 index 0000000..4f5e2fd --- /dev/null +++ b/src/paimon/common/compression/block_compression_factory.cpp @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/compression/block_compression_factory.h" + +#include "fmt/format.h" +#include "paimon/common/compression/lz4/lz4_block_compression_factory.h" +#include "paimon/common/compression/none_block_compression_factory.h" +#include "paimon/common/compression/zstd/zstd_block_compression_factory.h" +#include "paimon/common/utils/string_utils.h" + +namespace paimon { + +Result> BlockCompressionFactory::Create( + const CompressOptions& compression) { + auto compress = StringUtils::ToLowerCase(compression.compress); + if (compress == "none") { + return std::make_shared(); + } else if (compress == "zstd") { + return std::make_shared(compression.zstd_level); + } else if (compress == "lz4") { + return std::make_shared(); + } + // TODO(liangzi): LZO support + return Status::Invalid(fmt::format("Unsupported compression type: {}", compress)); +} + +Result> BlockCompressionFactory::Create( + BlockCompressionType compression) { + switch (compression) { + case BlockCompressionType::NONE: + return std::make_shared(); + case BlockCompressionType::LZ4: + return std::make_shared(); + case BlockCompressionType::ZSTD: + return std::make_shared(ZSTD_COMPRESSION_LEVEL); + default: + // TODO(liangzi): LZO support + return Status::Invalid( + fmt::format("Unsupported compression type: {}", static_cast(compression))); + } +} +} // namespace paimon diff --git a/src/paimon/common/compression/block_compression_factory.h b/src/paimon/common/compression/block_compression_factory.h new file mode 100644 index 0000000..f103fc8 --- /dev/null +++ b/src/paimon/common/compression/block_compression_factory.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/compression/block_compression_type.h" +#include "paimon/common/compression/block_compressor.h" +#include "paimon/common/compression/block_decompressor.h" +#include "paimon/core/options/compress_options.h" +#include "paimon/result.h" +#include "paimon/visibility.h" +namespace paimon { + +/// Each compression codec has an implementation of {@link BlockCompressionFactory} to create +/// compressors and decompressors. +class PAIMON_EXPORT BlockCompressionFactory { + public: + static Result> Create( + const CompressOptions& compression); + + static Result> Create( + BlockCompressionType compress_type); + + BlockCompressionFactory() = default; + virtual ~BlockCompressionFactory() = default; + + public: + virtual BlockCompressionType GetCompressionType() const = 0; + + virtual std::shared_ptr GetCompressor() = 0; + + virtual std::shared_ptr GetDecompressor() = 0; + + private: + // Align java implementation + static constexpr int32_t ZSTD_COMPRESSION_LEVEL = 1; +}; +} // namespace paimon diff --git a/src/paimon/common/compression/block_compression_factory_test.cpp b/src/paimon/common/compression/block_compression_factory_test.cpp new file mode 100644 index 0000000..107d20a --- /dev/null +++ b/src/paimon/common/compression/block_compression_factory_test.cpp @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/compression/block_compression_factory.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/defs.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/mock/mock_file_batch_reader.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon { +class Predicate; +} // namespace paimon + +namespace paimon::test { +class CompressionFactoryTest : public ::testing::TestWithParam {}; + +TEST_P(CompressionFactoryTest, TESTCompressThenDecompress) { + int32_t original_len = 16; + BlockCompressionType type = GetParam(); + + std::string data(original_len, '\0'); + for (int32_t i = 0; i < original_len; i++) { + data[i] = static_cast(i); + } + + ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type)); + ASSERT_EQ(type, factory->GetCompressionType()); + + // compress + auto compressor = factory->GetCompressor(); + auto max_len = compressor->GetMaxCompressedSize(data.size()); + std::string compressed_data(max_len, '\0'); + auto compressed_size = + compressor->Compress(data.data(), data.size(), compressed_data.data(), max_len); + ASSERT_OK(compressed_size); + ASSERT_GT(compressed_size.value(), 0); + compressed_data.resize(compressed_size.value()); + + // decompress + auto decompressor = factory->GetDecompressor(); + std::string decompressed_data(original_len, '\0'); + auto decompressed_size = + decompressor->Decompress(compressed_data.data(), compressed_data.size(), + decompressed_data.data(), decompressed_data.size()); + ASSERT_OK(decompressed_size); + ASSERT_GT(decompressed_size.value(), 0); + ASSERT_EQ(data, decompressed_data); + + std::string read_write_le{4, '\0'}; + compressor->WriteIntLE(123, read_write_le.data()); + ASSERT_EQ(123, decompressor->ReadIntLE(read_write_le.data())); + compressor->WriteIntLE(100000, read_write_le.data()); + ASSERT_EQ(100000, decompressor->ReadIntLE(read_write_le.data())); + compressor->WriteIntLE(-6555, read_write_le.data()); + ASSERT_EQ(-6555, decompressor->ReadIntLE(read_write_le.data())); + compressor->WriteIntLE(0, read_write_le.data()); + ASSERT_EQ(0, decompressor->ReadIntLE(read_write_le.data())); +} + +TEST_P(CompressionFactoryTest, TestDecompressTruncatedHeader) { + BlockCompressionType type = GetParam(); + ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type)); + auto decompressor = factory->GetDecompressor(); + + // Source shorter than 8-byte header should return Invalid, not read out of bounds + char short_buf[] = {0x01, 0x02, 0x03}; + char dst[64] = {}; + ASSERT_NOK(decompressor->Decompress(short_buf, 3, dst, sizeof(dst))); +} + +TEST_P(CompressionFactoryTest, TestCompressInsufficientOutputBuffer) { + BlockCompressionType type = GetParam(); + ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type)); + auto compressor = factory->GetCompressor(); + + // Incompressible data with a tiny output buffer should fail, not produce a corrupt block + std::string data(1024, '\0'); + for (int32_t i = 0; i < static_cast(data.size()); i++) { + data[i] = static_cast(i % 251); + } + // Output buffer too small: only HEADER_LENGTH bytes, no room for compressed payload + std::string tiny_dst(8, '\0'); + ASSERT_NOK(compressor->Compress(data.data(), data.size(), tiny_dst.data(), tiny_dst.size())); + + // Output buffer smaller than HEADER_LENGTH — must not trigger UB from negative capacity + char micro_dst[4] = {}; + ASSERT_NOK(compressor->Compress(data.data(), data.size(), micro_dst, sizeof(micro_dst))); + + // Zero-length output buffer + ASSERT_NOK(compressor->Compress(data.data(), data.size(), nullptr, 0)); +} + +INSTANTIATE_TEST_SUITE_P(BlockCompressionTypeGroup, CompressionFactoryTest, + ::testing::Values(BlockCompressionType::LZ4, BlockCompressionType::ZSTD)); + +} // namespace paimon::test diff --git a/src/paimon/common/compression/block_compression_type.h b/src/paimon/common/compression/block_compression_type.h new file mode 100644 index 0000000..16126a8 --- /dev/null +++ b/src/paimon/common/compression/block_compression_type.h @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/result.h" + +namespace paimon { + +/// Block Compression type. +enum class BlockCompressionType { NONE = 0, ZSTD = 1, LZ4 = 2, LZO = 3 }; + +} // namespace paimon diff --git a/src/paimon/common/compression/block_compressor.cpp b/src/paimon/common/compression/block_compressor.cpp new file mode 100644 index 0000000..f2ed4ca --- /dev/null +++ b/src/paimon/common/compression/block_compressor.cpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/compression/block_compressor.h" + +namespace paimon { + +void BlockCompressor::WriteIntLE(int32_t val, char* buf) { + buf[0] = static_cast(val); + buf[1] = static_cast(val >> 8); + buf[2] = static_cast(val >> 16); + buf[3] = static_cast(val >> 24); +} + +} // namespace paimon diff --git a/src/paimon/common/compression/block_compressor.h b/src/paimon/common/compression/block_compressor.h new file mode 100644 index 0000000..473afb7 --- /dev/null +++ b/src/paimon/common/compression/block_compressor.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/result.h" + +namespace paimon { + +/// A compressor which compresses a whole byte array each time. +class BlockCompressor { + public: + static void WriteIntLE(int32_t val, char* buf); + + public: + virtual ~BlockCompressor() = default; + + public: + /// Get the max compressed size for a given original size. + /// @param src_size The original size + /// @return The max compressed size + virtual int32_t GetMaxCompressedSize(int32_t src_size) = 0; + + /// Compress data read from src, and write the compressed data to dst. + /// + /// @param src Uncompressed data to read from + /// @param src_length The length of data which want to be compressed + /// @param dst The target to write compressed data + /// @param dst_length The max length of data + /// @return Length of compressed data + virtual Result Compress(const char* src, int32_t src_length, char* dst, + int32_t dst_length) = 0; + + public: + /// We put two integers before each compressed block, the first integer represents the + /// compressed length of the block, and the second one represents the original length of the + /// block. + static constexpr int32_t HEADER_LENGTH = 8; +}; +} // namespace paimon diff --git a/src/paimon/common/compression/block_decompressor.cpp b/src/paimon/common/compression/block_decompressor.cpp new file mode 100644 index 0000000..5356d74 --- /dev/null +++ b/src/paimon/common/compression/block_decompressor.cpp @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/compression/block_decompressor.h" + +#include "fmt/format.h" +namespace paimon { + +int32_t BlockDecompressor::ReadIntLE(const char* buf) { + return static_cast(static_cast(static_cast(buf[0])) | + (static_cast(static_cast(buf[1])) << 8) | + (static_cast(static_cast(buf[2])) << 16) | + (static_cast(static_cast(buf[3])) << 24)); +} + +Status BlockDecompressor::ValidateLength(int32_t compressed_len, int32_t original_len) { + if (original_len < 0 || compressed_len < 0 || (original_len == 0 && compressed_len != 0) || + (original_len != 0 && compressed_len == 0)) { + return Status::Invalid( + fmt::format("Input is corrupted, compressed_len={}, , original_len={}", compressed_len, + original_len)); + } + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/common/compression/block_decompressor.h b/src/paimon/common/compression/block_decompressor.h new file mode 100644 index 0000000..582ad4e --- /dev/null +++ b/src/paimon/common/compression/block_decompressor.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/result.h" + +namespace paimon { + +/// A decompressor which decompresses a block each time. +class BlockDecompressor { + public: + static int32_t ReadIntLE(const char* buf); + + static Status ValidateLength(int32_t compressed_len, int32_t original_len); + + public: + virtual ~BlockDecompressor() = default; + + public: + /// Decompress data read from src, and write the decompressed data to dst. + /// + /// @param src Compressed data to read from + /// @param src_length The length of data which want to be decompressed + /// @param dst The target to write decompressed data + /// @param dst_length The max length of data + /// @return Length of decompressed data + virtual Result Decompress(const char* src, int32_t src_length, char* dst, + int32_t dst_length) = 0; + + public: + /// We put two integers before each compressed block, the first integer represents the + /// compressed length of the block, and the second one represents the original length of the + /// block. + static constexpr int32_t HEADER_LENGTH = 8; +}; +} // namespace paimon diff --git a/src/paimon/common/compression/lz4/lz4_block_compression_factory.h b/src/paimon/common/compression/lz4/lz4_block_compression_factory.h new file mode 100644 index 0000000..99c2362 --- /dev/null +++ b/src/paimon/common/compression/lz4/lz4_block_compression_factory.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/compression/block_compression_factory.h" +#include "paimon/common/compression/lz4/lz4_block_compressor.h" +#include "paimon/common/compression/lz4/lz4_block_decompressor.h" +#include "paimon/result.h" + +namespace paimon { + +/// Implementation of {@link BlockCompressionFactory} for lz4 codec. +class Lz4BlockCompressionFactory : public BlockCompressionFactory { + public: + BlockCompressionType GetCompressionType() const override { + return BlockCompressionType::LZ4; + } + + std::shared_ptr GetCompressor() override { + return std::make_shared(); + } + + std::shared_ptr GetDecompressor() override { + return std::make_shared(); + } +}; +} // namespace paimon diff --git a/src/paimon/common/compression/lz4/lz4_block_compressor.h b/src/paimon/common/compression/lz4/lz4_block_compressor.h new file mode 100644 index 0000000..f8d941c --- /dev/null +++ b/src/paimon/common/compression/lz4/lz4_block_compressor.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "fmt/format.h" +#include "paimon/common/compression/block_compressor.h" + +namespace paimon { + +/// Encode data into LZ4 format (not compatible with the LZ4 Frame format). +class Lz4BlockCompressor : public BlockCompressor { + public: + int32_t GetMaxCompressedSize(int32_t src_size) override { + return BlockCompressor::HEADER_LENGTH + LZ4_compressBound(src_size); + } + + Result Compress(const char* src, int32_t src_length, char* dst, + int32_t dst_length) override { + if (dst_length < BlockCompressor::HEADER_LENGTH) { + return Status::Invalid(fmt::format( + "Output buffer too small for LZ4 block header, expected at least {} bytes, got {}", + BlockCompressor::HEADER_LENGTH, dst_length)); + } + int32_t compressed_size = + LZ4_compress_default(src, dst + BlockCompressor::HEADER_LENGTH, src_length, + dst_length - BlockCompressor::HEADER_LENGTH); + if (compressed_size <= 0) { + return Status::Invalid(fmt::format("Compression failed with code {}", compressed_size)); + } + WriteIntLE(compressed_size, dst); + WriteIntLE(src_length, dst + 4); + return BlockCompressor::HEADER_LENGTH + compressed_size; + } +}; +} // namespace paimon diff --git a/src/paimon/common/compression/lz4/lz4_block_decompressor.h b/src/paimon/common/compression/lz4/lz4_block_decompressor.h new file mode 100644 index 0000000..edbb2a8 --- /dev/null +++ b/src/paimon/common/compression/lz4/lz4_block_decompressor.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "fmt/format.h" +#include "paimon/common/compression/block_decompressor.h" + +namespace paimon { + +/// Decode data written with {@link Lz4BlockCompressor}. +class Lz4BlockDecompressor : public BlockDecompressor { + public: + Result Decompress(const char* src, int32_t src_length, char* dst, + int32_t dst_length) override { + if (src_length < HEADER_LENGTH) { + return Status::Invalid(fmt::format( + "Source data too short for LZ4 block header, expected at least {} bytes, got {}", + HEADER_LENGTH, src_length)); + } + auto compressed_len = ReadIntLE(src); + auto original_len = ReadIntLE(src + 4); + PAIMON_RETURN_NOT_OK(ValidateLength(compressed_len, original_len)); + + if (dst_length < original_len) { + return Status::Invalid( + fmt::format("Buffer length too small, compressed_len= {}, original_len={}", + compressed_len, original_len)); + } + + if (src_length - HEADER_LENGTH < compressed_len) { + return Status::Invalid("Source data is not integral for decompression."); + } + + int32_t decompressed_size = + LZ4_decompress_safe(src + HEADER_LENGTH, dst, src_length - HEADER_LENGTH, dst_length); + if (decompressed_size != original_len) { + return Status::Invalid(fmt::format("Input is corrupted, expected {}, but got {}", + original_len, decompressed_size)); + } + return decompressed_size; + } +}; +} // namespace paimon diff --git a/src/paimon/common/compression/none_block_compression_factory.h b/src/paimon/common/compression/none_block_compression_factory.h new file mode 100644 index 0000000..19afbfa --- /dev/null +++ b/src/paimon/common/compression/none_block_compression_factory.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/compression/block_compression_factory.h" +#include "paimon/result.h" + +namespace paimon { + +/// Implementation of {@link BlockCompressionFactory} for none. +class NoneBlockCompressionFactory : public BlockCompressionFactory { + public: + BlockCompressionType GetCompressionType() const override { + return BlockCompressionType::NONE; + } + + std::shared_ptr GetCompressor() override { + return nullptr; + } + + std::shared_ptr GetDecompressor() override { + return nullptr; + } +}; +} // namespace paimon diff --git a/src/paimon/common/compression/zstd/zstd_block_compression_factory.h b/src/paimon/common/compression/zstd/zstd_block_compression_factory.h new file mode 100644 index 0000000..ab8c8ac --- /dev/null +++ b/src/paimon/common/compression/zstd/zstd_block_compression_factory.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/compression/block_compression_factory.h" +#include "paimon/common/compression/zstd/zstd_block_compressor.h" +#include "paimon/common/compression/zstd/zstd_block_decompressor.h" +#include "paimon/result.h" + +namespace paimon { + +/// Implementation of {@link BlockCompressionFactory} for zstd codec. +class ZstdBlockCompressionFactory : public BlockCompressionFactory { + public: + explicit ZstdBlockCompressionFactory(int32_t level = 1) : level_(level) {} + + public: + BlockCompressionType GetCompressionType() const override { + return BlockCompressionType::ZSTD; + } + + std::shared_ptr GetCompressor() override { + return std::make_shared(level_); + } + + std::shared_ptr GetDecompressor() override { + return std::make_shared(); + } + + private: + int32_t level_; +}; +} // namespace paimon diff --git a/src/paimon/common/compression/zstd/zstd_block_compressor.h b/src/paimon/common/compression/zstd/zstd_block_compressor.h new file mode 100644 index 0000000..3645309 --- /dev/null +++ b/src/paimon/common/compression/zstd/zstd_block_compressor.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "fmt/format.h" +#include "paimon/common/compression/block_compressor.h" + +namespace paimon { + +/// A {@link BlockCompressor} for zstd. +class ZstdBlockCompressor : public BlockCompressor { + public: + explicit ZstdBlockCompressor(int32_t level) : level_(level) {} + + int32_t GetMaxCompressedSize(int32_t src_size) override { + return ZSTD_compressBound(src_size); + } + + Result Compress(const char* src, int32_t src_length, char* dst, + int32_t dst_length) override { + size_t const compressed_size = ZSTD_compress(dst, dst_length, src, src_length, level_); + if (ZSTD_isError(compressed_size)) { + return Status::Invalid(fmt::format("Compression failed with code {}", compressed_size)); + } + return compressed_size; + } + + private: + int32_t level_; +}; +} // namespace paimon diff --git a/src/paimon/common/compression/zstd/zstd_block_decompressor.h b/src/paimon/common/compression/zstd/zstd_block_decompressor.h new file mode 100644 index 0000000..2494258 --- /dev/null +++ b/src/paimon/common/compression/zstd/zstd_block_decompressor.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "fmt/format.h" +#include "paimon/common/compression/block_decompressor.h" + +namespace paimon { + +/// Decode data written with {@link ZstdBlockDecompressor}. +class ZstdBlockDecompressor : public BlockDecompressor { + public: + Result Decompress(const char* src, int32_t src_length, char* dst, + int32_t dst_length) override { + int32_t decompressed_size = ZSTD_decompress(dst, dst_length, src, src_length); + if (ZSTD_isError(decompressed_size)) { + return Status::Invalid( + fmt::format("Input is corrupted with return code {}", decompressed_size)); + } + return decompressed_size; + } +}; +} // namespace paimon diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp b/src/paimon/format/parquet/predicate_pushdown_test.cpp index 67d7cd5..7ccbaa2 100644 --- a/src/paimon/format/parquet/predicate_pushdown_test.cpp +++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp @@ -345,7 +345,7 @@ TEST_F(PredicatePushdownTest, TestStringData) { CheckResult(read_schema, predicate, /*expected_array=*/expected_array); } { - // f0 like 'me', no data + // f0 like 'me', has data ASSERT_OK_AND_ASSIGN(const auto predicate, PredicateBuilder::Like( /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,