Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/paimon/common/compression/block_compression_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/compression/block_compression_factory.h"

#include "fmt/format.h"
#include "paimon/common/compression/lz4/lz4_block_compression_factory.h"
#include "paimon/common/compression/none_block_compression_factory.h"
#include "paimon/common/compression/zstd/zstd_block_compression_factory.h"
#include "paimon/common/utils/string_utils.h"

namespace paimon {

/// Builds the factory for the codec named in `compression.compress`.
///
/// The name is lower-cased before matching, so the lookup is case-insensitive. Recognised
/// names are "none", "zstd" and "lz4"; the zstd factory is constructed with
/// `compression.zstd_level`.
///
/// @param compression Compression options carrying the codec name and the zstd level
/// @return The matching factory, or Status::Invalid for any other codec name
Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create(
    const CompressOptions& compression) {
    const auto codec_name = StringUtils::ToLowerCase(compression.compress);

    if (codec_name == "none") {
        return std::make_shared<NoneBlockCompressionFactory>();
    }
    if (codec_name == "zstd") {
        return std::make_shared<ZstdBlockCompressionFactory>(compression.zstd_level);
    }
    if (codec_name == "lz4") {
        return std::make_shared<Lz4BlockCompressionFactory>();
    }

    // TODO(liangzi): LZO support
    return Status::Invalid(fmt::format("Unsupported compression type: {}", codec_name));
}

/// Builds the factory for an already-resolved codec type.
///
/// NONE, LZ4 and ZSTD are supported; the zstd factory is constructed with the class-level
/// ZSTD_COMPRESSION_LEVEL constant.
///
/// @param compression The codec type to create a factory for
/// @return The matching factory, or Status::Invalid (reporting the numeric enum value) for
///         every other type
Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create(
    BlockCompressionType compression) {
    if (compression == BlockCompressionType::NONE) {
        return std::make_shared<NoneBlockCompressionFactory>();
    }
    if (compression == BlockCompressionType::LZ4) {
        return std::make_shared<Lz4BlockCompressionFactory>();
    }
    if (compression == BlockCompressionType::ZSTD) {
        return std::make_shared<ZstdBlockCompressionFactory>(ZSTD_COMPRESSION_LEVEL);
    }

    // TODO(liangzi): LZO support
    const auto raw_type = static_cast<int32_t>(compression);
    return Status::Invalid(fmt::format("Unsupported compression type: {}", raw_type));
}
} // namespace paimon
55 changes: 55 additions & 0 deletions src/paimon/common/compression/block_compression_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>

#include "paimon/common/compression/block_compression_type.h"
#include "paimon/common/compression/block_compressor.h"
#include "paimon/common/compression/block_decompressor.h"
#include "paimon/core/options/compress_options.h"
#include "paimon/result.h"
#include "paimon/visibility.h"
namespace paimon {

/// Each compression codec has an implementation of {@link BlockCompressionFactory} to create
/// compressors and decompressors.
///
/// The static Create() overloads select the concrete factory; the pure virtual members are
/// implemented by each codec-specific factory.
class PAIMON_EXPORT BlockCompressionFactory {
public:
/// Creates a factory from compression options.
///
/// The codec name in `compression.compress` is matched case-insensitively against "none",
/// "zstd" and "lz4"; for zstd, `compression.zstd_level` is forwarded to the factory.
/// @param compression Options carrying the codec name and zstd level
/// @return The factory, or an Invalid status if the codec name is not supported
static Result<std::shared_ptr<BlockCompressionFactory>> Create(
const CompressOptions& compression);

/// Creates a factory from a codec type.
///
/// NONE, LZ4 and ZSTD are supported; ZSTD uses ZSTD_COMPRESSION_LEVEL.
/// @param compress_type The codec type
/// @return The factory, or an Invalid status for any other type
static Result<std::shared_ptr<BlockCompressionFactory>> Create(
BlockCompressionType compress_type);

BlockCompressionFactory() = default;
virtual ~BlockCompressionFactory() = default;

public:
/// @return The codec type this factory was created for
virtual BlockCompressionType GetCompressionType() const = 0;

/// @return A compressor for this factory's codec
virtual std::shared_ptr<BlockCompressor> GetCompressor() = 0;

/// @return A decompressor for this factory's codec
virtual std::shared_ptr<BlockDecompressor> GetDecompressor() = 0;

private:
// Zstd level used when a factory is created from a BlockCompressionType alone; the value is
// kept aligned with the Java implementation.
static constexpr int32_t ZSTD_COMPRESSION_LEVEL = 1;
};
} // namespace paimon
120 changes: 120 additions & 0 deletions src/paimon/common/compression/block_compression_factory_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/compression/block_compression_factory.h"

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "gtest/gtest.h"
#include "paimon/defs.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/status.h"
#include "paimon/testing/mock/mock_file_batch_reader.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"

// Forward declaration of paimon::Predicate.
// NOTE(review): no test visible in this file references Predicate, so this declaration may be
// a leftover that can be dropped — confirm before removing.
namespace paimon {
class Predicate;
} // namespace paimon

namespace paimon::test {
/// Value-parameterized fixture: every TEST_P below runs once per BlockCompressionType supplied
/// by the suite instantiation, and reads that type through GetParam().
class CompressionFactoryTest
    : public ::testing::TestWithParam<BlockCompressionType> {
};

/// Round-trips a small payload through the codec under test and checks that the decompressed
/// bytes equal the input, then exercises the little-endian int helpers.
TEST_P(CompressionFactoryTest, TESTCompressThenDecompress) {
    const int32_t original_len = 16;
    BlockCompressionType type = GetParam();

    // Payload is the byte sequence 0, 1, ..., original_len - 1.
    std::string data(original_len, '\0');
    for (int32_t i = 0; i < original_len; i++) {
        data[i] = static_cast<char>(i);
    }

    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type));
    ASSERT_EQ(type, factory->GetCompressionType());

    // compress
    auto compressor = factory->GetCompressor();
    auto max_len = compressor->GetMaxCompressedSize(data.size());
    std::string compressed_data(max_len, '\0');
    auto compressed_size =
        compressor->Compress(data.data(), data.size(), compressed_data.data(), max_len);
    ASSERT_OK(compressed_size);
    ASSERT_GT(compressed_size.value(), 0);
    // Trim the scratch buffer down to the bytes that were actually produced.
    compressed_data.resize(compressed_size.value());

    // decompress
    auto decompressor = factory->GetDecompressor();
    std::string decompressed_data(original_len, '\0');
    auto decompressed_size =
        decompressor->Decompress(compressed_data.data(), compressed_data.size(),
                                 decompressed_data.data(), decompressed_data.size());
    ASSERT_OK(decompressed_size);
    ASSERT_GT(decompressed_size.value(), 0);
    ASSERT_EQ(data, decompressed_data);

    // WriteIntLE/ReadIntLE operate on 4 bytes. The (count, char) constructor must be used here:
    // brace-initialising with {4, '\0'} selects the initializer_list overload and yields a
    // 2-character string, which the helpers would then overrun.
    std::string read_write_le(4, '\0');
    compressor->WriteIntLE(123, read_write_le.data());
    ASSERT_EQ(123, decompressor->ReadIntLE(read_write_le.data()));
    compressor->WriteIntLE(100000, read_write_le.data());
    ASSERT_EQ(100000, decompressor->ReadIntLE(read_write_le.data()));
    compressor->WriteIntLE(-6555, read_write_le.data());
    ASSERT_EQ(-6555, decompressor->ReadIntLE(read_write_le.data()));
    compressor->WriteIntLE(0, read_write_le.data());
    ASSERT_EQ(0, decompressor->ReadIntLE(read_write_le.data()));
}

/// Feeding the decompressor fewer bytes than the 8-byte block header must yield an error
/// status instead of reading past the end of the source buffer.
TEST_P(CompressionFactoryTest, TestDecompressTruncatedHeader) {
    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(GetParam()));
    auto decompressor = factory->GetDecompressor();

    // Only three source bytes: too short to contain the two header integers.
    char truncated_src[] = {0x01, 0x02, 0x03};
    char output[64] = {};
    ASSERT_NOK(decompressor->Decompress(truncated_src, 3, output, sizeof(output)));
}

/// Compressing into an output buffer that is too small must fail with an error status rather
/// than emit a corrupt block or touch memory outside the buffer.
TEST_P(CompressionFactoryTest, TestCompressInsufficientOutputBuffer) {
    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(GetParam()));
    auto compressor = factory->GetCompressor();

    // 1024-byte payload whose bytes cycle through 0..250.
    std::string payload(1024, '\0');
    int32_t position = 0;
    for (char& byte : payload) {
        byte = static_cast<char>(position % 251);
        ++position;
    }

    // Case 1: destination holds exactly HEADER_LENGTH bytes, leaving no room for the
    // compressed payload.
    std::string header_only_dst(8, '\0');
    ASSERT_NOK(compressor->Compress(payload.data(), payload.size(), header_only_dst.data(),
                                    header_only_dst.size()));

    // Case 2: destination is smaller than HEADER_LENGTH — must not trigger UB from a negative
    // remaining capacity.
    char sub_header_dst[4] = {};
    ASSERT_NOK(
        compressor->Compress(payload.data(), payload.size(), sub_header_dst, sizeof(sub_header_dst)));

    // Case 3: zero-length destination.
    ASSERT_NOK(compressor->Compress(payload.data(), payload.size(), nullptr, 0));
}

// Runs every CompressionFactoryTest case once for LZ4 and once for ZSTD. NONE and LZO are not
// part of this instantiation.
INSTANTIATE_TEST_SUITE_P(BlockCompressionTypeGroup, CompressionFactoryTest,
::testing::Values(BlockCompressionType::LZ4, BlockCompressionType::ZSTD));

} // namespace paimon::test
28 changes: 28 additions & 0 deletions src/paimon/common/compression/block_compression_type.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "paimon/result.h"

namespace paimon {

/// Identifies the codec used to compress a block.
///
/// Each enumerator has an explicitly assigned numeric value; keep existing values unchanged
/// when adding new codecs.
enum class BlockCompressionType {
    NONE = 0,
    ZSTD = 1,
    LZ4 = 2,
    LZO = 3,
};

} // namespace paimon
30 changes: 30 additions & 0 deletions src/paimon/common/compression/block_compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/compression/block_compressor.h"

namespace paimon {

/// Stores `val` into `buf[0]`..`buf[3]` in little-endian order (least significant byte first).
///
/// @param val The 32-bit value to store
/// @param buf Destination; the caller must provide at least four writable bytes
void BlockCompressor::WriteIntLE(int32_t val, char* buf) {
    // Shift an unsigned copy: right-shifting a negative signed value is implementation-defined
    // before C++20, whereas unsigned shifts are fully specified. This mirrors how ReadIntLE
    // reassembles the value from unsigned bytes.
    const auto bits = static_cast<uint32_t>(val);
    buf[0] = static_cast<char>(bits & 0xFFu);
    buf[1] = static_cast<char>((bits >> 8) & 0xFFu);
    buf[2] = static_cast<char>((bits >> 16) & 0xFFu);
    buf[3] = static_cast<char>((bits >> 24) & 0xFFu);
}

} // namespace paimon
55 changes: 55 additions & 0 deletions src/paimon/common/compression/block_compressor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "paimon/result.h"

namespace paimon {

/// A compressor which compresses a whole byte array each time.
class BlockCompressor {
public:
/// Writes `val` into `buf[0]`..`buf[3]` in little-endian order (least significant byte
/// first).
/// @param val The 32-bit value to store
/// @param buf Destination; the four bytes starting at `buf` are overwritten
static void WriteIntLE(int32_t val, char* buf);

public:
virtual ~BlockCompressor() = default;

public:
/// Get the max compressed size for a given original size.
/// @param src_size The original size
/// @return The max compressed size
virtual int32_t GetMaxCompressedSize(int32_t src_size) = 0;

/// Compress data read from src, and write the compressed data to dst.
///
/// @param src Uncompressed data to read from
/// @param src_length The number of bytes of src to compress
/// @param dst The target to write compressed data
/// @param dst_length The capacity of dst in bytes (presumably this must also cover the
/// HEADER_LENGTH-byte block header — confirm against the implementations)
/// @return Length of compressed data, or an error status if compression fails
virtual Result<int32_t> Compress(const char* src, int32_t src_length, char* dst,
int32_t dst_length) = 0;

public:
/// We put two integers before each compressed block, the first integer represents the
/// compressed length of the block, and the second one represents the original length of the
/// block.
static constexpr int32_t HEADER_LENGTH = 8;
};
} // namespace paimon
41 changes: 41 additions & 0 deletions src/paimon/common/compression/block_decompressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/compression/block_decompressor.h"

#include "fmt/format.h"
namespace paimon {

/// Reassembles a 32-bit integer from the four bytes at `buf`, treating `buf[0]` as the least
/// significant byte (little-endian).
///
/// @param buf Source; the four bytes starting at `buf` are read
/// @return The decoded value
int32_t BlockDecompressor::ReadIntLE(const char* buf) {
    // Accumulate from the most significant byte down so that each step shifts the partial
    // result left by one byte. Every byte is widened through uint8_t to avoid sign extension.
    uint32_t bits = 0;
    for (int idx = 3; idx >= 0; --idx) {
        bits = (bits << 8) | static_cast<uint32_t>(static_cast<uint8_t>(buf[idx]));
    }
    return static_cast<int32_t>(bits);
}

/// Sanity-checks the two lengths read from a block header.
///
/// The pair is rejected when either length is negative, or when exactly one of them is zero
/// (an empty block must be empty on both sides).
///
/// @param compressed_len Compressed length taken from the header
/// @param original_len Original (uncompressed) length taken from the header
/// @return Status::OK() if the pair is plausible, otherwise Status::Invalid
Status BlockDecompressor::ValidateLength(int32_t compressed_len, int32_t original_len) {
    const bool has_negative = original_len < 0 || compressed_len < 0;
    // True when one length is zero and the other is not.
    const bool zero_mismatch = (original_len == 0) != (compressed_len == 0);
    if (has_negative || zero_mismatch) {
        // The message previously contained a duplicated separator (", , ").
        return Status::Invalid(
            fmt::format("Input is corrupted, compressed_len={}, original_len={}", compressed_len,
                        original_len));
    }
    return Status::OK();
}

} // namespace paimon
Loading