Skip to content

Commit 356efee

Browse files
authored
AVRO-4132: [C++] Add ZSTD codec (#3364)
* AVRO-4132: [C++] Add ZSTD codec * assign value to the enum Codec
1 parent d5d5466 commit 356efee

6 files changed

Lines changed: 196 additions & 6 deletions

File tree

.github/workflows/test-lang-c++-ARM.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
- name: Install dependencies
4545
run: |
4646
sudo apt-get update -q
47-
sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
47+
sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev libzstd-dev cmake
4848
4949
- name: Build
5050
run: |

.github/workflows/test-lang-c++.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ jobs:
3939
- uses: actions/checkout@v4
4040

4141
- name: Install Dependencies
42-
run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake
42+
run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev libzstd-dev cmake
4343

4444
- name: Print Versions
4545
run: |

lang/c++/CMakeLists.txt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,16 @@ else ()
9898
message("Disabled snappy codec.")
9999
endif ()
100100

101+
# zstd is an optional dependency: it is looked up in CONFIG mode only and,
# because REQUIRED is not given, a missing package is not a configure error.
find_package(zstd CONFIG)
102+
if(zstd_FOUND)
103+
message("Enabled zstd codec, version: ${zstd_VERSION}")
104+
# Prefer the shared zstd target when it exists, otherwise use the static one.
set(ZSTD_TARGET $<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>)
105+
# Enables the sources guarded by #ifdef ZSTD_CODEC_AVAILABLE.
add_definitions(-DZSTD_CODEC_AVAILABLE)
106+
else()
107+
message("Disabled zstd codec.")
108+
# No zstd target to link; uses of ZSTD_TARGET are also guarded by zstd_FOUND.
set(ZSTD_TARGET "")
109+
endif()
110+
101111
# FindZLIB guarantees that ZLIB::ZLIB target exists if found
102112
# See https://cmake.org/cmake/help/latest/module/FindZLIB.html#imported-targets
103113
find_package(ZLIB REQUIRED)
@@ -135,18 +145,22 @@ target_link_libraries(avrocpp PUBLIC
135145
$<BUILD_INTERFACE:fmt::fmt-header-only>
136146
$<BUILD_INTERFACE:ZLIB::ZLIB>
137147
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
148+
$<$<BOOL:${zstd_FOUND}>:$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
138149
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
139150
$<INSTALL_INTERFACE:ZLIB::ZLIB>
140151
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
152+
$<$<BOOL:${zstd_FOUND}>:$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
141153
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
142154
)
143155
target_link_libraries(avrocpp_s PUBLIC
144156
$<BUILD_INTERFACE:fmt::fmt-header-only>
145157
$<BUILD_INTERFACE:ZLIB::ZLIB>
146158
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
159+
$<$<BOOL:${zstd_FOUND}>:$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
147160
$<BUILD_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
148161
$<INSTALL_INTERFACE:ZLIB::ZLIB>
149162
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Snappy::snappy>>
163+
$<$<BOOL:${zstd_FOUND}>:$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>>
150164
$<INSTALL_INTERFACE:$<TARGET_NAME_IF_EXISTS:Boost::system>>
151165
)
152166

@@ -205,7 +219,7 @@ if (AVRO_BUILD_TESTS)
205219

206220
macro (unittest name)
207221
add_executable (${name} test/${name}.cc)
208-
target_link_libraries (${name} avrocpp_s Boost::system ZLIB::ZLIB $<TARGET_NAME_IF_EXISTS:Snappy::snappy>)
222+
target_link_libraries (${name} avrocpp_s Boost::system ZLIB::ZLIB $<TARGET_NAME_IF_EXISTS:Snappy::snappy> $<$<BOOL:${zstd_FOUND}>:$<TARGET_NAME_IF_EXISTS:${ZSTD_TARGET}>>)
209223
add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
210224
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
211225
endmacro (unittest)

lang/c++/impl/DataFile.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
#include <snappy.h>
2828
#endif
2929

30+
#ifdef ZSTD_CODEC_AVAILABLE
31+
#include <zstd.h>
32+
#endif
33+
3034
#include <zlib.h>
3135

3236
namespace avro {
@@ -49,6 +53,10 @@ const string AVRO_DEFLATE_CODEC("deflate");
4953
const string AVRO_SNAPPY_CODEC = "snappy";
5054
#endif
5155

56+
#ifdef ZSTD_CODEC_AVAILABLE
57+
const string AVRO_ZSTD_CODEC = "zstd";
58+
#endif
59+
5260
const size_t minSyncInterval = 32;
5361
const size_t maxSyncInterval = 1u << 30;
5462

@@ -100,6 +108,10 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
100108
#ifdef SNAPPY_CODEC_AVAILABLE
101109
} else if (codec_ == SNAPPY_CODEC) {
102110
setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
111+
#endif
112+
#ifdef ZSTD_CODEC_AVAILABLE
113+
} else if (codec_ == ZSTD_CODEC) {
114+
setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
103115
#endif
104116
} else {
105117
throw Exception("Unknown codec: {}", int(codec));
@@ -216,6 +228,39 @@ void DataFileWriterBase::sync() {
216228
avro::encode(*encoderPtr_, byteCount);
217229
encoderPtr_->flush();
218230
copy(*in, *stream_);
231+
#endif
232+
#ifdef ZSTD_CODEC_AVAILABLE
233+
} else if (codec_ == ZSTD_CODEC) {
234+
// Read all uncompressed data into a single buffer
235+
std::vector<char> uncompressed;
236+
const uint8_t *data;
237+
size_t len;
238+
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
239+
while (input->next(&data, &len)) {
240+
uncompressed.insert(uncompressed.end(), reinterpret_cast<const char *>(data),
241+
reinterpret_cast<const char *>(data) + len);
242+
}
243+
244+
// Pre-allocate buffer for compressed data
245+
size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
246+
std::vector<char> compressed(max_compressed_size);
247+
248+
// Compress the data into a single ZSTD frame using the one-shot ZSTD_compress API
249+
size_t compressed_size = ZSTD_compress(
250+
compressed.data(), max_compressed_size,
251+
uncompressed.data(), uncompressed.size(),
252+
ZSTD_CLEVEL_DEFAULT);
253+
254+
if (ZSTD_isError(compressed_size)) {
255+
throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
256+
}
257+
258+
compressed.resize(compressed_size);
259+
std::unique_ptr<InputStream> in = memoryInputStream(
260+
reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
261+
avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
262+
encoderPtr_->flush();
263+
copy(*in, *stream_);
219264
#endif
220265
}
221266

@@ -431,6 +476,46 @@ void DataFileReaderBase::readDataBlock() {
431476
dataDecoder_->init(*in);
432477
dataStream_ = std::move(in);
433478
#endif
479+
#ifdef ZSTD_CODEC_AVAILABLE
480+
} else if (codec_ == ZSTD_CODEC) {
481+
compressed_.clear();
482+
const uint8_t *data;
483+
size_t len;
484+
while (st->next(&data, &len)) {
485+
compressed_.insert(compressed_.end(), data, data + len);
486+
}
487+
488+
// Get the decompressed size
489+
size_t decompressed_size = ZSTD_getFrameContentSize(
490+
reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
491+
if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
492+
throw Exception("ZSTD: Not a valid compressed frame");
493+
} else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
494+
throw Exception("ZSTD: Unable to determine decompressed size");
495+
}
496+
497+
// Decompress the data
498+
uncompressed.clear();
499+
uncompressed.resize(decompressed_size);
500+
size_t result = ZSTD_decompress(
501+
uncompressed.data(), decompressed_size,
502+
reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
503+
504+
if (ZSTD_isError(result)) {
505+
throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
506+
}
507+
if (result != decompressed_size) {
508+
throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
509+
decompressed_size, result);
510+
}
511+
512+
std::unique_ptr<InputStream> in = memoryInputStream(
513+
reinterpret_cast<const uint8_t *>(uncompressed.data()),
514+
uncompressed.size());
515+
516+
dataDecoder_->init(*in);
517+
dataStream_ = std::move(in);
518+
#endif
434519
} else {
435520
compressed_.clear();
436521
uncompressed.clear();
@@ -530,6 +615,11 @@ void DataFileReaderBase::readHeader() {
530615
} else if (it != metadata_.end()
531616
&& toString(it->second) == AVRO_SNAPPY_CODEC) {
532617
codec_ = SNAPPY_CODEC;
618+
#endif
619+
#ifdef ZSTD_CODEC_AVAILABLE
620+
} else if (it != metadata_.end()
621+
&& toString(it->second) == AVRO_ZSTD_CODEC) {
622+
codec_ = ZSTD_CODEC;
533623
#endif
534624
} else {
535625
codec_ = NULL_CODEC;

lang/c++/include/avro/DataFile.hh

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ namespace avro {
3535

3636
/** Specify type of compression to use when writing data files. */
3737
enum Codec {
38-
NULL_CODEC,
39-
DEFLATE_CODEC,
38+
NULL_CODEC = 0,
39+
DEFLATE_CODEC = 1,
4040

4141
#ifdef SNAPPY_CODEC_AVAILABLE
42-
SNAPPY_CODEC
42+
SNAPPY_CODEC = 2,
43+
#endif
44+
45+
#ifdef ZSTD_CODEC_AVAILABLE
46+
ZSTD_CODEC = 3,
4347
#endif
4448

4549
};

lang/c++/test/DataFileTests.cc

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,12 @@ class DataFileTest {
216216
}
217217
#endif
218218

219+
#ifdef ZSTD_CODEC_AVAILABLE
220+
void testWriteWithZstdCodec() {
221+
testWriteWithCodec(avro::ZSTD_CODEC);
222+
}
223+
#endif
224+
219225
void testWriteWithCodec(avro::Codec codec) {
220226
avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100, codec);
221227
int64_t re = 3;
@@ -618,6 +624,39 @@ class DataFileTest {
618624
}
619625
#endif
620626

627+
#ifdef ZSTD_CODEC_AVAILABLE
628+
void testZstd() {
629+
// Add enough objects to span multiple blocks
630+
const size_t number_of_objects = 1000000;
631+
// first create a large file
632+
ValidSchema dschema = avro::compileJsonSchemaFromString(sch);
633+
{
634+
avro::DataFileWriter<ComplexInteger> writer(
635+
filename, dschema, 16 * 1024, avro::ZSTD_CODEC);
636+
637+
for (size_t i = 0; i < number_of_objects; ++i) {
638+
ComplexInteger d;
639+
d.re = i;
640+
d.im = 2 * i;
641+
writer.write(d);
642+
}
643+
}
644+
{
645+
avro::DataFileReader<ComplexInteger> reader(filename, dschema);
646+
std::this_thread::sleep_for(std::chrono::seconds(1));
647+
std::vector<int64_t> found;
648+
ComplexInteger record;
649+
while (reader.read(record)) {
650+
found.push_back(record.re);
651+
}
652+
BOOST_CHECK_EQUAL(found.size(), number_of_objects);
653+
for (unsigned int i = 0; i < found.size(); ++i) {
654+
BOOST_CHECK_EQUAL(found[i], i);
655+
}
656+
}
657+
}
658+
#endif
659+
621660
void testSchemaReadWrite() {
622661
uint32_t a = 42;
623662
{
@@ -792,6 +831,13 @@ void testSkipStringSnappyCodec() {
792831
}
793832
#endif
794833

834+
#ifdef ZSTD_CODEC_AVAILABLE
835+
void testSkipStringZstdCodec() {
836+
BOOST_TEST_CHECKPOINT(__func__);
837+
testSkipString(avro::ZSTD_CODEC);
838+
}
839+
#endif
840+
795841
struct TestRecord {
796842
std::string s1;
797843
int64_t id;
@@ -1005,6 +1051,13 @@ void testLastSyncSnappyCodec() {
10051051
}
10061052
#endif
10071053

1054+
#ifdef ZSTD_CODEC_AVAILABLE
1055+
void testLastSyncZstdCodec() {
1056+
BOOST_TEST_CHECKPOINT(__func__);
1057+
testLastSync(avro::ZSTD_CODEC);
1058+
}
1059+
#endif
1060+
10081061
void testReadRecordEfficientlyUsingLastSyncNullCodec() {
10091062
BOOST_TEST_CHECKPOINT(__func__);
10101063
testReadRecordEfficientlyUsingLastSync(avro::NULL_CODEC);
@@ -1022,6 +1075,13 @@ void testReadRecordEfficientlyUsingLastSyncSnappyCodec() {
10221075
}
10231076
#endif
10241077

1078+
#ifdef ZSTD_CODEC_AVAILABLE
1079+
void testReadRecordEfficientlyUsingLastSyncZstdCodec() {
1080+
BOOST_TEST_CHECKPOINT(__func__);
1081+
testReadRecordEfficientlyUsingLastSync(avro::ZSTD_CODEC);
1082+
}
1083+
#endif
1084+
10251085
test_suite *
10261086
init_unit_test_suite(int, char *[]) {
10271087
{
@@ -1055,6 +1115,16 @@ init_unit_test_suite(int, char *[]) {
10551115
addReaderTests(ts, t1);
10561116
boost::unit_test::framework::master_test_suite().add(ts);
10571117
}
1118+
#endif
1119+
#ifdef ZSTD_CODEC_AVAILABLE
1120+
{
1121+
auto *ts = BOOST_TEST_SUITE("DataFile tests: test1.zstd.df");
1122+
shared_ptr<DataFileTest> t1(new DataFileTest("test1.zstd.df", sch, isch));
1123+
ts->add(BOOST_CLASS_TEST_CASE(
1124+
&DataFileTest::testWriteWithZstdCodec, t1));
1125+
addReaderTests(ts, t1);
1126+
boost::unit_test::framework::master_test_suite().add(ts);
1127+
}
10581128
#endif
10591129
{
10601130
auto *ts = BOOST_TEST_SUITE("DataFile tests: test2.df");
@@ -1101,6 +1171,9 @@ init_unit_test_suite(int, char *[]) {
11011171
shared_ptr<DataFileTest> t8(new DataFileTest("test8.df", dsch, dblsch));
11021172
#ifdef SNAPPY_CODEC_AVAILABLE
11031173
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8));
1174+
#endif
1175+
#ifdef ZSTD_CODEC_AVAILABLE
1176+
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZstd, t8));
11041177
#endif
11051178
boost::unit_test::framework::master_test_suite().add(ts);
11061179
}
@@ -1165,18 +1238,27 @@ init_unit_test_suite(int, char *[]) {
11651238
#ifdef SNAPPY_CODEC_AVAILABLE
11661239
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringSnappyCodec));
11671240
#endif
1241+
#ifdef ZSTD_CODEC_AVAILABLE
1242+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringZstdCodec));
1243+
#endif
11681244

11691245
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncNullCodec));
11701246
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
11711247
#ifdef SNAPPY_CODEC_AVAILABLE
11721248
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncSnappyCodec));
11731249
#endif
1250+
#ifdef ZSTD_CODEC_AVAILABLE
1251+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncZstdCodec));
1252+
#endif
11741253

11751254
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncNullCodec));
11761255
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncDeflateCodec));
11771256
#ifdef SNAPPY_CODEC_AVAILABLE
11781257
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncSnappyCodec));
11791258
#endif
1259+
#ifdef ZSTD_CODEC_AVAILABLE
1260+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncZstdCodec));
1261+
#endif
11801262

11811263
return nullptr;
11821264
}

0 commit comments

Comments
 (0)