Skip to content

Commit 11fb555

Browse files
authored
AVRO-4222: [C++] Support writer to specify compression level (#3610)
* AVRO-4222: [C++] Support writer to specify compression level * address feedback
1 parent 5a3a308 commit 11fb555

5 files changed

Lines changed: 423 additions & 79 deletions

File tree

lang/c++/impl/DataFile.cc

Lines changed: 176 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -47,53 +47,183 @@ using std::array;
4747
namespace {
4848
const string AVRO_SCHEMA_KEY("avro.schema");
4949
const string AVRO_CODEC_KEY("avro.codec");
50-
const string AVRO_NULL_CODEC("null");
51-
const string AVRO_DEFLATE_CODEC("deflate");
5250

51+
const size_t minSyncInterval = 32;
52+
const size_t maxSyncInterval = 1u << 30;
53+
54+
// Recommended by https://www.zlib.net/zlib_how.html
55+
const size_t zlibBufGrowSize = 128 * 1024;
56+
57+
template<Codec codec>
58+
struct codec_trait {
59+
static std::string name() {
60+
throw Exception("Unsupported codec: {}", static_cast<int>(codec));
61+
}
62+
static void validate(std::optional<int> level) {
63+
throw Exception("Unsupported codec: {}", static_cast<int>(codec));
64+
}
65+
static bool available() {
66+
throw Exception("Unsupported codec: {}", static_cast<int>(codec));
67+
}
68+
};
69+
70+
template<>
71+
struct codec_trait<NULL_CODEC> {
72+
static std::string name() {
73+
return "null";
74+
}
75+
static void validate(std::optional<int> /*level*/) {}
76+
static bool available() {
77+
return true;
78+
}
79+
};
80+
81+
template<>
82+
struct codec_trait<DEFLATE_CODEC> {
83+
static std::string name() {
84+
return "deflate";
85+
}
86+
87+
static void validate(std::optional<int> level) {
88+
if (!level.has_value()) {
89+
return;
90+
}
91+
int levelValue = level.value();
92+
if (levelValue < 0 || levelValue > 9) {
93+
throw Exception("Invalid compression level {} for deflate codec. "
94+
"Valid range is 0-9.",
95+
levelValue);
96+
}
97+
}
98+
99+
static bool available() {
100+
return true;
101+
}
102+
};
103+
104+
template<>
105+
struct codec_trait<SNAPPY_CODEC> {
106+
static std::string name() {
107+
return "snappy";
108+
}
109+
110+
static void validate(std::optional<int> /*level*/) {
111+
}
112+
113+
static bool available() {
53114
#ifdef SNAPPY_CODEC_AVAILABLE
54-
const string AVRO_SNAPPY_CODEC = "snappy";
115+
return true;
116+
#else
117+
return false;
55118
#endif
119+
}
120+
};
121+
122+
template<>
123+
struct codec_trait<ZSTD_CODEC> {
124+
static std::string name() {
125+
return "zstandard";
126+
}
56127

128+
static void validate(std::optional<int> level) {
129+
if (!level.has_value()) {
130+
return;
131+
}
132+
int levelValue = level.value();
133+
if (levelValue < 1 || levelValue > 22) {
134+
throw Exception("Invalid compression level {} for zstandard codec. "
135+
"Valid range is 1-22.",
136+
levelValue);
137+
}
138+
}
139+
140+
static bool available() {
57141
#ifdef ZSTD_CODEC_AVAILABLE
58-
const string AVRO_ZSTD_CODEC = "zstandard";
142+
return true;
143+
#else
144+
return false;
59145
#endif
146+
}
147+
};
60148

61-
const size_t minSyncInterval = 32;
62-
const size_t maxSyncInterval = 1u << 30;
149+
#define DISPATCH_CODEC_FUNC(codec, func, ...) \
150+
switch (codec) { \
151+
case NULL_CODEC: \
152+
return codec_trait<NULL_CODEC>::func(__VA_ARGS__); \
153+
case DEFLATE_CODEC: \
154+
return codec_trait<DEFLATE_CODEC>::func(__VA_ARGS__); \
155+
case SNAPPY_CODEC: \
156+
return codec_trait<SNAPPY_CODEC>::func(__VA_ARGS__); \
157+
case ZSTD_CODEC: \
158+
return codec_trait<ZSTD_CODEC>::func(__VA_ARGS__); \
159+
default: \
160+
throw Exception("Unknown codec: {}", static_cast<int>(codec)); \
161+
}
63162

64-
// Recommended by https://www.zlib.net/zlib_how.html
65-
const size_t zlibBufGrowSize = 128 * 1024;
163+
std::string getCodecName(Codec codec) {
164+
DISPATCH_CODEC_FUNC(codec, name);
165+
}
166+
167+
void validateCodec(Codec codec, std::optional<int> level) {
168+
if (!isCodecAvailable(codec)) {
169+
throw Exception("Codec {} is not available.", getCodecName(codec));
170+
}
171+
DISPATCH_CODEC_FUNC(codec, validate, level);
172+
}
173+
174+
Codec getCodec(const std::string &name) {
175+
if (name == codec_trait<NULL_CODEC>::name()) {
176+
return NULL_CODEC;
177+
} else if (name == codec_trait<DEFLATE_CODEC>::name()) {
178+
return DEFLATE_CODEC;
179+
} else if (name == codec_trait<SNAPPY_CODEC>::name()) {
180+
return SNAPPY_CODEC;
181+
} else if (name == codec_trait<ZSTD_CODEC>::name()) {
182+
return ZSTD_CODEC;
183+
} else {
184+
throw Exception("Unknown codec name: {}", name);
185+
}
186+
}
66187

67188
} // namespace
68189

190+
bool isCodecAvailable(Codec codec) {
191+
DISPATCH_CODEC_FUNC(codec, available);
192+
}
193+
194+
#undef DISPATCH_CODEC_FUNC
195+
69196
DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
70-
Codec codec, const Metadata &metadata) : filename_(filename),
71-
schema_(schema),
72-
encoderPtr_(binaryEncoder()),
73-
syncInterval_(syncInterval),
74-
codec_(codec),
75-
stream_(fileOutputStream(filename)),
76-
buffer_(memoryOutputStream()),
77-
sync_(makeSync()),
78-
objectCount_(0),
79-
metadata_(metadata),
80-
lastSync_(0) {
197+
Codec codec, const Metadata &metadata,
198+
std::optional<int> compressionLevel) : filename_(filename),
199+
schema_(schema),
200+
encoderPtr_(binaryEncoder()),
201+
syncInterval_(syncInterval),
202+
codec_(codec),
203+
compressionLevel_(compressionLevel),
204+
stream_(fileOutputStream(filename)),
205+
buffer_(memoryOutputStream()),
206+
sync_(makeSync()),
207+
objectCount_(0),
208+
metadata_(metadata),
209+
lastSync_(0) {
81210
init(schema, syncInterval, codec);
82211
}
83212

84-
DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
85-
const ValidSchema &schema, size_t syncInterval,
86-
Codec codec, const Metadata &metadata) : filename_(),
87-
schema_(schema),
88-
encoderPtr_(binaryEncoder()),
89-
syncInterval_(syncInterval),
90-
codec_(codec),
91-
stream_(std::move(outputStream)),
92-
buffer_(memoryOutputStream()),
93-
sync_(makeSync()),
94-
objectCount_(0),
95-
metadata_(metadata),
96-
lastSync_(0) {
213+
DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
214+
size_t syncInterval, Codec codec, const Metadata &metadata,
215+
std::optional<int> compressionLevel) : filename_(),
216+
schema_(schema),
217+
encoderPtr_(binaryEncoder()),
218+
syncInterval_(syncInterval),
219+
codec_(codec),
220+
compressionLevel_(compressionLevel),
221+
stream_(std::move(outputStream)),
222+
buffer_(memoryOutputStream()),
223+
sync_(makeSync()),
224+
objectCount_(0),
225+
metadata_(metadata),
226+
lastSync_(0) {
97227
init(schema, syncInterval, codec);
98228
}
99229

@@ -103,23 +233,9 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
103233
"Invalid sync interval: {}. Should be between {} and {}",
104234
syncInterval, minSyncInterval, maxSyncInterval);
105235
}
106-
setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
107236

108-
if (codec_ == NULL_CODEC) {
109-
setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
110-
} else if (codec_ == DEFLATE_CODEC) {
111-
setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
112-
#ifdef SNAPPY_CODEC_AVAILABLE
113-
} else if (codec_ == SNAPPY_CODEC) {
114-
setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
115-
#endif
116-
#ifdef ZSTD_CODEC_AVAILABLE
117-
} else if (codec_ == ZSTD_CODEC) {
118-
setMetadata(AVRO_CODEC_KEY, AVRO_ZSTD_CODEC);
119-
#endif
120-
} else {
121-
throw Exception("Unknown codec: {}", int(codec));
122-
}
237+
validateCodec(codec, compressionLevel_);
238+
setMetadata(AVRO_CODEC_KEY, getCodecName(codec));
123239
setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
124240

125241
writeHeader();
@@ -160,7 +276,10 @@ void DataFileWriterBase::sync() {
160276
zs.zfree = Z_NULL;
161277
zs.opaque = Z_NULL;
162278

163-
int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
279+
// Use Z_DEFAULT_COMPRESSION if no level specified
280+
int effectiveLevel = compressionLevel_.value_or(Z_DEFAULT_COMPRESSION);
281+
282+
int ret = deflateInit2(&zs, effectiveLevel, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
164283
if (ret != Z_OK) {
165284
throw Exception("Failed to initialize deflate, error: {}", ret);
166285
}
@@ -246,7 +365,7 @@ void DataFileWriterBase::sync() {
246365
}
247366

248367
ZstdCompressWrapper zstdCompressWrapper;
249-
std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed);
368+
std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed, compressionLevel_);
250369

251370
std::unique_ptr<InputStream> in = memoryInputStream(
252371
reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
@@ -580,23 +699,16 @@ void DataFileReaderBase::readHeader() {
580699
readerSchema_ = dataSchema();
581700
}
582701

702+
// Parse codec from metadata using codec_trait
583703
it = metadata_.find(AVRO_CODEC_KEY);
584-
if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
585-
codec_ = DEFLATE_CODEC;
586-
#ifdef SNAPPY_CODEC_AVAILABLE
587-
} else if (it != metadata_.end()
588-
&& toString(it->second) == AVRO_SNAPPY_CODEC) {
589-
codec_ = SNAPPY_CODEC;
590-
#endif
591-
#ifdef ZSTD_CODEC_AVAILABLE
592-
} else if (it != metadata_.end() && toString(it->second) == AVRO_ZSTD_CODEC) {
593-
codec_ = ZSTD_CODEC;
594-
#endif
704+
if (it != metadata_.end()) {
705+
const auto codecName = toString(it->second);
706+
codec_ = getCodec(codecName);
707+
if (!isCodecAvailable(codec_)) {
708+
throw Exception("Codec {} is not available.", codecName);
709+
}
595710
} else {
596711
codec_ = NULL_CODEC;
597-
if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
598-
throw Exception("Unknown codec in data file: " + toString(it->second));
599-
}
600712
}
601713

602714
avro::decode(*decoder_, sync_);

lang/c++/impl/ZstdCompressWrapper.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
namespace avro {
2727

28-
std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed) {
28+
std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed, std::optional<int> compressionLevel) {
2929
// Pre-allocate buffer for compressed data
3030
size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
3131
if (ZSTD_isError(max_compressed_size)) {
@@ -37,7 +37,7 @@ std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompr
3737
size_t compressed_size = ZSTD_compress(
3838
compressed.data(), max_compressed_size,
3939
uncompressed.data(), uncompressed.size(),
40-
ZSTD_CLEVEL_DEFAULT);
40+
compressionLevel.value_or(ZSTD_CLEVEL_DEFAULT));
4141

4242
if (ZSTD_isError(compressed_size)) {
4343
throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));

lang/c++/impl/ZstdCompressWrapper.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#ifdef ZSTD_CODEC_AVAILABLE
2323

24+
#include <optional>
2425
#include <vector>
2526

2627
#include <zstd.h>
@@ -32,7 +33,7 @@ public:
3233
ZstdCompressWrapper();
3334
~ZstdCompressWrapper();
3435

35-
std::vector<char> compress(const std::vector<char> &uncompressed);
36+
std::vector<char> compress(const std::vector<char> &uncompressed, std::optional<int> compressionLevel = std::nullopt);
3637

3738
private:
3839
ZSTD_CCtx *cctx_ = nullptr;

lang/c++/include/avro/DataFile.hh

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include <array>
3030
#include <map>
31+
#include <optional>
3132
#include <string>
3233
#include <vector>
3334

@@ -37,17 +38,15 @@ namespace avro {
3738
enum Codec {
3839
NULL_CODEC = 0,
3940
DEFLATE_CODEC = 1,
40-
41-
#ifdef SNAPPY_CODEC_AVAILABLE
4241
SNAPPY_CODEC = 2,
43-
#endif
44-
45-
#ifdef ZSTD_CODEC_AVAILABLE
46-
ZSTD_CODEC = 3,
47-
#endif
48-
42+
ZSTD_CODEC = 3
4943
};
5044

45+
/**
46+
* Returns true if the specified codec is available at runtime.
47+
*/
48+
AVRO_DECL bool isCodecAvailable(Codec codec);
49+
5150
const int SyncSize = 16;
5251
/**
5352
* The sync value.
@@ -81,6 +80,7 @@ class AVRO_DECL DataFileWriterBase {
8180
const EncoderPtr encoderPtr_;
8281
const size_t syncInterval_;
8382
Codec codec_;
83+
std::optional<int> compressionLevel_;
8484

8585
std::unique_ptr<OutputStream> stream_;
8686
std::unique_ptr<OutputStream> buffer_;
@@ -134,10 +134,10 @@ public:
134134
*/
135135
DataFileWriterBase(const char *filename, const ValidSchema &schema,
136136
size_t syncInterval, Codec codec = NULL_CODEC,
137-
const Metadata &metadata = {});
137+
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt);
138138
DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
139139
const ValidSchema &schema, size_t syncInterval, Codec codec,
140-
const Metadata &metadata = {});
140+
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt);
141141

142142
DataFileWriterBase(const DataFileWriterBase &) = delete;
143143
DataFileWriterBase &operator=(const DataFileWriterBase &) = delete;
@@ -173,11 +173,13 @@ public:
173173
*/
174174
DataFileWriter(const char *filename, const ValidSchema &schema,
175175
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
176-
const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata)) {}
176+
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt)
177+
: base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata, compressionLevel)) {}
177178

178179
DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
179180
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
180-
const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata)) {}
181+
const Metadata &metadata = {}, std::optional<int> compressionLevel = std::nullopt)
182+
: base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata, compressionLevel)) {}
181183

182184
DataFileWriter(const DataFileWriter &) = delete;
183185
DataFileWriter &operator=(const DataFileWriter &) = delete;

0 commit comments

Comments
 (0)