Skip to content

Commit 6842191

Browse files
authored
AVRO-4174: [C++] Support read and write key-value metadata (#3468)
* AVRO-4174: [C++] Support read and write key-value metadata * refine docstring
1 parent 78e7124 commit 6842191

3 files changed

Lines changed: 200 additions & 27 deletions

File tree

lang/c++/impl/DataFile.cc

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -66,30 +66,33 @@ const size_t zlibBufGrowSize = 128 * 1024;
6666
} // namespace
6767

6868
DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
69-
Codec codec) : filename_(filename),
70-
schema_(schema),
71-
encoderPtr_(binaryEncoder()),
72-
syncInterval_(syncInterval),
73-
codec_(codec),
74-
stream_(fileOutputStream(filename)),
75-
buffer_(memoryOutputStream()),
76-
sync_(makeSync()),
77-
objectCount_(0),
78-
lastSync_(0) {
69+
Codec codec, const Metadata &metadata) : filename_(filename),
70+
schema_(schema),
71+
encoderPtr_(binaryEncoder()),
72+
syncInterval_(syncInterval),
73+
codec_(codec),
74+
stream_(fileOutputStream(filename)),
75+
buffer_(memoryOutputStream()),
76+
sync_(makeSync()),
77+
objectCount_(0),
78+
metadata_(metadata),
79+
lastSync_(0) {
7980
init(schema, syncInterval, codec);
8081
}
8182

8283
DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
83-
const ValidSchema &schema, size_t syncInterval, Codec codec) : filename_(),
84-
schema_(schema),
85-
encoderPtr_(binaryEncoder()),
86-
syncInterval_(syncInterval),
87-
codec_(codec),
88-
stream_(std::move(outputStream)),
89-
buffer_(memoryOutputStream()),
90-
sync_(makeSync()),
91-
objectCount_(0),
92-
lastSync_(0) {
84+
const ValidSchema &schema, size_t syncInterval,
85+
Codec codec, const Metadata &metadata) : filename_(),
86+
schema_(schema),
87+
encoderPtr_(binaryEncoder()),
88+
syncInterval_(syncInterval),
89+
codec_(codec),
90+
stream_(std::move(outputStream)),
91+
buffer_(memoryOutputStream()),
92+
sync_(makeSync()),
93+
objectCount_(0),
94+
metadata_(metadata),
95+
lastSync_(0) {
9396
init(schema, syncInterval, codec);
9497
}
9598

lang/c++/include/avro/DataFile.hh

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,22 @@ const int SyncSize = 16;
5454
*/
5555
typedef std::array<uint8_t, SyncSize> DataFileSync;
5656

57+
/**
58+
* Avro files may include arbitrary user-specified metadata.
59+
* File metadata is written as if defined by the following map schema:
60+
*
61+
* `{"type": "map", "values": "bytes"}`
62+
*
63+
* All metadata properties that start with "avro." are reserved.
64+
* The following file metadata properties are currently used:
65+
*
66+
* - `avro.schema` contains the schema of objects stored in the file, as JSON data (required).
67+
* - `avro.codec`, the name of the compression codec used to compress blocks, as a string.
68+
* Implementations are required to support the following codecs: "null" and "deflate".
69+
* If codec is absent, it is assumed to be "null". See avro.codecs for implementation details.
70+
*/
71+
typedef std::map<std::string, std::vector<uint8_t>> Metadata;
72+
5773
/**
5874
* Type-independent portion of DataFileWriter.
5975
* At any given point in time, at most one file can be written using
@@ -71,8 +87,6 @@ class AVRO_DECL DataFileWriterBase {
7187
const DataFileSync sync_;
7288
int64_t objectCount_;
7389

74-
typedef std::map<std::string, std::vector<uint8_t>> Metadata;
75-
7690
Metadata metadata_;
7791
int64_t lastSync_;
7892

@@ -119,9 +133,11 @@ public:
119133
* Constructs a data file writer with the given sync interval and name.
120134
*/
121135
DataFileWriterBase(const char *filename, const ValidSchema &schema,
122-
size_t syncInterval, Codec codec = NULL_CODEC);
136+
size_t syncInterval, Codec codec = NULL_CODEC,
137+
const Metadata &metadata = {});
123138
DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
124-
const ValidSchema &schema, size_t syncInterval, Codec codec);
139+
const ValidSchema &schema, size_t syncInterval, Codec codec,
140+
const Metadata &metadata = {});
125141

126142
DataFileWriterBase(const DataFileWriterBase &) = delete;
127143
DataFileWriterBase &operator=(const DataFileWriterBase &) = delete;
@@ -156,10 +172,12 @@ public:
156172
* Constructs a new data file.
157173
*/
158174
DataFileWriter(const char *filename, const ValidSchema &schema,
159-
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) {}
175+
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
176+
const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(filename, schema, syncInterval, codec, metadata)) {}
160177

161178
DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
162-
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) {}
179+
size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC,
180+
const Metadata &metadata = {}) : base_(std::make_unique<DataFileWriterBase>(std::move(outputStream), schema, syncInterval, codec, metadata)) {}
163181

164182
DataFileWriter(const DataFileWriter &) = delete;
165183
DataFileWriter &operator=(const DataFileWriter &) = delete;
@@ -212,7 +230,6 @@ class AVRO_DECL DataFileReaderBase {
212230
ValidSchema dataSchema_;
213231
DecoderPtr dataDecoder_;
214232
std::unique_ptr<InputStream> dataStream_;
215-
typedef std::map<std::string, std::vector<uint8_t>> Metadata;
216233

217234
Metadata metadata_;
218235
DataFileSync sync_{};
@@ -306,6 +323,11 @@ public:
306323
* Return the last synchronization point before our current position.
307324
*/
308325
int64_t previousSync() const;
326+
327+
/**
328+
* Return the metadata for the data file.
329+
*/
330+
const Metadata &metadata() const { return metadata_; }
309331
};
310332

311333
/**
@@ -421,6 +443,11 @@ public:
421443
* Return the last synchronization point before our current position.
422444
*/
423445
int64_t previousSync() { return base_->previousSync(); }
446+
447+
/**
448+
* Return the metadata for the data file.
449+
*/
450+
const Metadata &metadata() const { return base_->metadata(); }
424451
};
425452

426453
} // namespace avro

lang/c++/test/DataFileTests.cc

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,60 @@ class DataFileTest {
716716
ComplexDouble unused;
717717
BOOST_CHECK_NO_THROW(df.write(unused)); // write has no effect on closed stream
718718
}
719+
720+
void testMetadata() {
721+
avro::Metadata customMetadata;
722+
std::string key1 = "author";
723+
std::string value1 = "test-user";
724+
customMetadata[key1] = std::vector<uint8_t>(value1.begin(), value1.end());
725+
726+
std::string key2 = "version";
727+
std::string value2 = "1.0.0";
728+
customMetadata[key2] = std::vector<uint8_t>(value2.begin(), value2.end());
729+
730+
std::string key3 = "description";
731+
std::string value3 = "Test file with custom metadata";
732+
customMetadata[key3] = std::vector<uint8_t>(value3.begin(), value3.end());
733+
734+
// Write data with custom metadata
735+
{
736+
avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100, avro::NULL_CODEC, customMetadata);
737+
int64_t re = 10;
738+
int64_t im = 20;
739+
for (int i = 0; i < 5; ++i, re += 5, im += 10) {
740+
ComplexInteger c(re, im);
741+
df.write(c);
742+
}
743+
df.close();
744+
}
745+
746+
// Read and verify metadata
747+
{
748+
avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
749+
const avro::Metadata &readMetadata = df.metadata();
750+
751+
// Check that our custom metadata is present
752+
auto it1 = readMetadata.find(key1);
753+
BOOST_CHECK(it1 != readMetadata.end());
754+
BOOST_CHECK_EQUAL(std::string(it1->second.begin(), it1->second.end()), value1);
755+
756+
auto it2 = readMetadata.find(key2);
757+
BOOST_CHECK(it2 != readMetadata.end());
758+
BOOST_CHECK_EQUAL(std::string(it2->second.begin(), it2->second.end()), value2);
759+
760+
auto it3 = readMetadata.find(key3);
761+
BOOST_CHECK(it3 != readMetadata.end());
762+
BOOST_CHECK_EQUAL(std::string(it3->second.begin(), it3->second.end()), value3);
763+
764+
// Check that standard metadata is also present
765+
auto schemaIt = readMetadata.find("avro.schema");
766+
BOOST_CHECK(schemaIt != readMetadata.end());
767+
768+
auto codecIt = readMetadata.find("avro.codec");
769+
BOOST_CHECK(codecIt != readMetadata.end());
770+
BOOST_CHECK_EQUAL(std::string(codecIt->second.begin(), codecIt->second.end()), "null");
771+
}
772+
}
719773
};
720774

721775
void addReaderTests(test_suite *ts, const shared_ptr<DataFileTest> &t) {
@@ -1082,6 +1136,79 @@ void testReadRecordEfficientlyUsingLastSyncZstdCodec() {
10821136
}
10831137
#endif
10841138

1139+
/**
 * Checks that custom metadata survives a write/read round trip when the
 * data blocks are written with the given codec.
 *
 * Writes ten records to a temporary file with two custom metadata entries,
 * reopens it and expects the custom entries unchanged and the "avro.schema"
 * and "avro.codec" entries present.
 *
 * @param codec the codec passed to the writer.
 */
void testMetadataWithCodec(avro::Codec codec) {
    const char *filename = "test_metadata_codec.df";

    // Removes the temporary file on every exit path, including the early
    // exit taken when a BOOST_REQUIRE below fails. The reader and writer
    // live in inner scopes, so they are closed before this guard runs.
    struct FileRemover {
        const char *path;
        ~FileRemover() {
            std::error_code ignored; // non-throwing overload: destructors must not throw
            std::filesystem::remove(path, ignored);
        }
    } cleanup{filename};

    avro::ValidSchema schema = avro::compileJsonSchemaFromString(sch);

    const std::string key1 = "test.key1";
    const std::string value1 = "test-value-1";
    const std::string key2 = "test.key2";
    const std::string value2 = "test-value-2-with-special-chars: !@#$%^&*()";

    avro::Metadata customMetadata;
    customMetadata[key1] = std::vector<uint8_t>(value1.begin(), value1.end());
    customMetadata[key2] = std::vector<uint8_t>(value2.begin(), value2.end());

    // Write data with custom metadata
    {
        avro::DataFileWriter<ComplexInteger> writer(filename, schema, 100, codec, customMetadata);
        for (int i = 0; i < 10; ++i) {
            ComplexInteger c(i * 2, i * 3);
            writer.write(c);
        }
        writer.close();
    }

    // Read and verify metadata
    {
        avro::DataFileReader<ComplexInteger> reader(filename, schema);
        const avro::Metadata &readMetadata = reader.metadata();

        // BOOST_REQUIRE (not BOOST_CHECK) guards each dereference: a
        // non-fatal check would continue and dereference end() when the
        // key is missing.
        const auto requireEntry = [&readMetadata](const std::string &key, const std::string &expected) {
            const auto it = readMetadata.find(key);
            BOOST_REQUIRE(it != readMetadata.end());
            BOOST_CHECK_EQUAL(std::string(it->second.begin(), it->second.end()), expected);
        };

        // Verify custom metadata
        requireEntry(key1, value1);
        requireEntry(key2, value2);

        // Verify standard metadata
        BOOST_CHECK(readMetadata.find("avro.schema") != readMetadata.end());
        BOOST_CHECK(readMetadata.find("avro.codec") != readMetadata.end());
    }
}
1187+
1188+
void testMetadataWithNullCodec() {
1189+
BOOST_TEST_CHECKPOINT(__func__);
1190+
testMetadataWithCodec(avro::NULL_CODEC);
1191+
}
1192+
1193+
void testMetadataWithDeflateCodec() {
1194+
BOOST_TEST_CHECKPOINT(__func__);
1195+
testMetadataWithCodec(avro::DEFLATE_CODEC);
1196+
}
1197+
1198+
#ifdef SNAPPY_CODEC_AVAILABLE
1199+
void testMetadataWithSnappyCodec() {
1200+
BOOST_TEST_CHECKPOINT(__func__);
1201+
testMetadataWithCodec(avro::SNAPPY_CODEC);
1202+
}
1203+
#endif
1204+
1205+
#ifdef ZSTD_CODEC_AVAILABLE
1206+
void testMetadataWithZstdCodec() {
1207+
BOOST_TEST_CHECKPOINT(__func__);
1208+
testMetadataWithCodec(avro::ZSTD_CODEC);
1209+
}
1210+
#endif
1211+
10851212
test_suite *
10861213
init_unit_test_suite(int, char *[]) {
10871214
{
@@ -1232,6 +1359,13 @@ init_unit_test_suite(int, char *[]) {
12321359
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
12331360
boost::unit_test::framework::master_test_suite().add(ts);
12341361
}
1362+
{
1363+
auto *ts = BOOST_TEST_SUITE("DataFile tests: test15.df");
1364+
shared_ptr<DataFileTest> t(new DataFileTest("test15.df", sch, isch));
1365+
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testMetadata, t));
1366+
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
1367+
boost::unit_test::framework::master_test_suite().add(ts);
1368+
}
12351369

12361370
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringNullCodec));
12371371
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringDeflateCodec));
@@ -1260,5 +1394,14 @@ init_unit_test_suite(int, char *[]) {
12601394
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncZstdCodec));
12611395
#endif
12621396

1397+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithNullCodec));
1398+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithDeflateCodec));
1399+
#ifdef SNAPPY_CODEC_AVAILABLE
1400+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithSnappyCodec));
1401+
#endif
1402+
#ifdef ZSTD_CODEC_AVAILABLE
1403+
boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testMetadataWithZstdCodec));
1404+
#endif
1405+
12631406
return nullptr;
12641407
}

0 commit comments

Comments
 (0)