Skip to content

Commit 757f8c7

Browse files
committed
Feedback from the C++ PR
1 parent 9e71cc2 commit 757f8c7

4 files changed

Lines changed: 282 additions & 227 deletions

File tree

parquet/src/file/metadata/flatbuf/converter.rs

Lines changed: 99 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@
1919
//!
2020
//! This module provides functionality to convert Parquet metadata to/from
2121
//! the FlatBuffers format defined in `parquet3.fbs`.
22+
//!
23+
//! The FlatBuffers format packs statistics into integral fields for efficient
24+
//! storage and zero-copy access. See [`pack_statistics`] and [`unpack_statistics`]
25+
//! for the encoding scheme.
26+
//!
27+
//! Related:
28+
//! - Design doc: <https://github.com/apache/parquet-format/pull/544>
29+
//! - C++ implementation: <https://github.com/apache/arrow/pull/48431>
2230
2331
use std::sync::Arc;
2432

@@ -30,6 +38,7 @@ use crate::file::metadata::{
3038
ColumnChunkMetaData, ColumnChunkMetaDataBuilder, FileMetaData, KeyValue,
3139
ParquetMetaData, RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
3240
};
41+
use crate::data_type::{ByteArray, FixedLenByteArray, Int96};
3342
use crate::file::statistics::Statistics;
3443
use crate::schema::types::{
3544
ColumnDescriptor, SchemaDescPtr, SchemaDescriptor, Type as SchemaType,
@@ -252,22 +261,26 @@ fn unpack_statistics(
252261
}
253262
}
254263

255-
/// Converter from Parquet metadata to FlatBuffers format
256-
pub struct ThriftToFlatBufferConverter<'a> {
264+
/// Converts [`ParquetMetaData`] to the FlatBuffers wire format.
265+
///
266+
/// This serializes the schema, row groups, column chunks, statistics, and
267+
/// key-value metadata into a single FlatBuffer. The result can be embedded
268+
/// into a Parquet footer via [`append_flatbuffer`].
269+
pub(super) struct ThriftToFlatBufferConverter<'a> {
257270
metadata: &'a ParquetMetaData,
258271
builder: FlatBufferBuilder<'a>,
259272
}
260273

261274
impl<'a> ThriftToFlatBufferConverter<'a> {
262-
/// Create a new converter from ParquetMetaData
275+
/// Create a new converter from [`ParquetMetaData`].
263276
pub fn new(metadata: &'a ParquetMetaData) -> Self {
264277
Self {
265278
metadata,
266279
builder: FlatBufferBuilder::with_capacity(64 * 1024),
267280
}
268281
}
269282

270-
/// Convert the metadata to FlatBuffers format and return the serialized bytes
283+
/// Convert the metadata to FlatBuffers format and return the serialized bytes.
271284
pub fn convert(mut self) -> Vec<u8> {
272285
let file_metadata = self.metadata.file_metadata();
273286
let schema_descr = file_metadata.schema_descr();
@@ -314,6 +327,7 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
314327
self.builder.finished_data().to_vec()
315328
}
316329

330+
/// Build the flattened schema element vector from the schema descriptor.
317331
fn build_schema(
318332
&mut self,
319333
schema_descr: &SchemaDescriptor,
@@ -324,6 +338,7 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
324338
elements
325339
}
326340

341+
/// Recursively build a schema element and its children.
327342
fn build_schema_element(
328343
&mut self,
329344
schema_type: &SchemaType,
@@ -410,6 +425,7 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
410425
}
411426
}
412427

428+
/// Convert a Parquet logical type to its FlatBuffer union representation.
413429
fn build_logical_type(
414430
&mut self,
415431
logical_type: Option<&LogicalType>,
@@ -436,9 +452,9 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
436452
LogicalType::Decimal { precision, scale } => (
437453
fb::LogicalType::DecimalType,
438454
Some(
439-
fb::DecimalOpts::create(
455+
fb::DecimalOptions::create(
440456
&mut self.builder,
441-
&fb::DecimalOptsArgs {
457+
&fb::DecimalOptionsArgs {
442458
precision: *precision,
443459
scale: *scale,
444460
},
@@ -456,9 +472,9 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
456472
} => (
457473
fb::LogicalType::TimeType,
458474
Some(
459-
fb::TimeOpts::create(
475+
fb::TimeOptions::create(
460476
&mut self.builder,
461-
&fb::TimeOptsArgs {
477+
&fb::TimeOptionsArgs {
462478
is_adjusted_to_utc: *is_adjusted_to_u_t_c,
463479
unit: convert_time_unit_to_fb(unit),
464480
},
@@ -472,9 +488,9 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
472488
} => (
473489
fb::LogicalType::TimestampType,
474490
Some(
475-
fb::TimeOpts::create(
491+
fb::TimeOptions::create(
476492
&mut self.builder,
477-
&fb::TimeOptsArgs {
493+
&fb::TimeOptionsArgs {
478494
is_adjusted_to_utc: *is_adjusted_to_u_t_c,
479495
unit: convert_time_unit_to_fb(unit),
480496
},
@@ -488,9 +504,9 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
488504
} => (
489505
fb::LogicalType::IntType,
490506
Some(
491-
fb::IntOpts::create(
507+
fb::IntOptions::create(
492508
&mut self.builder,
493-
&fb::IntOptsArgs {
509+
&fb::IntOptionsArgs {
494510
bit_width: *bit_width as i8,
495511
is_signed: *is_signed,
496512
},
@@ -557,6 +573,7 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
557573
}
558574
}
559575

576+
/// Build a row group, including its column chunks and sorting columns.
560577
fn build_row_group(
561578
&mut self,
562579
rg: &RowGroupMetaData,
@@ -590,11 +607,11 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
590607
sorting_columns,
591608
file_offset: rg.file_offset().unwrap_or(0),
592609
total_compressed_size,
593-
ordinal: rg.ordinal(),
594610
},
595611
)
596612
}
597613

614+
/// Build a column chunk with its metadata.
598615
fn build_column_chunk(
599616
&mut self,
600617
cc: &ColumnChunkMetaData,
@@ -614,6 +631,7 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
614631
)
615632
}
616633

634+
/// Build column metadata including compression, offsets, and packed statistics.
617635
fn build_column_metadata(
618636
&mut self,
619637
cc: &ColumnChunkMetaData,
@@ -657,6 +675,7 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
657675
)
658676
}
659677

678+
/// Pack statistics into the FlatBuffer format using [`pack_statistics`].
660679
fn build_statistics(
661680
&mut self,
662681
stats: &Statistics,
@@ -714,25 +733,28 @@ impl<'a> ThriftToFlatBufferConverter<'a> {
714733
)
715734
}
716735

717-
fn build_kv(&mut self, kv: &KeyValue) -> WIPOffset<fb::KV<'a>> {
736+
fn build_kv(&mut self, kv: &KeyValue) -> WIPOffset<fb::KeyValue<'a>> {
718737
let key = self.builder.create_string(&kv.key);
719738
let val = kv.value.as_ref().map(|v| self.builder.create_string(v));
720739

721-
fb::KV::create(
740+
fb::KeyValue::create(
722741
&mut self.builder,
723-
&fb::KVArgs {
742+
&fb::KeyValueArgs {
724743
key: Some(key),
725744
val,
726745
},
727746
)
728747
}
729748
}
730749

731-
/// Converter from FlatBuffers format back to Parquet metadata
732-
pub struct FlatBufferConverter;
750+
/// Converts FlatBuffers wire format back to [`ParquetMetaData`].
751+
///
752+
/// Requires a pre-existing [`SchemaDescPtr`] since the FlatBuffers format
753+
/// relies on the schema to interpret column physical types and statistics.
754+
pub(super) struct FlatBufferConverter;
733755

734756
impl FlatBufferConverter {
735-
/// Parse FlatBuffers metadata from bytes and convert to ParquetMetaData
757+
/// Parse FlatBuffers bytes and reconstruct [`ParquetMetaData`].
736758
pub fn convert(buf: &[u8], schema_descr: SchemaDescPtr) -> Result<ParquetMetaData> {
737759
let fb_meta = fb::root_as_file_meta_data(buf)
738760
.map_err(|e| ParquetError::General(format!("Invalid FlatBuffer: {}", e)))?;
@@ -743,6 +765,7 @@ impl FlatBufferConverter {
743765
Ok(ParquetMetaData::new(file_metadata, row_groups))
744766
}
745767

768+
/// Convert file-level metadata (version, num_rows, created_by, key-value, column orders).
746769
fn convert_file_metadata(
747770
fb_meta: &fb::FileMetaData,
748771
schema_descr: SchemaDescPtr,
@@ -786,6 +809,7 @@ impl FlatBufferConverter {
786809
))
787810
}
788811

812+
/// Convert all row groups from the FlatBuffer.
789813
fn convert_row_groups(
790814
fb_meta: &fb::FileMetaData,
791815
schema_descr: SchemaDescPtr,
@@ -800,6 +824,7 @@ impl FlatBufferConverter {
800824
.collect()
801825
}
802826

827+
/// Convert a single row group and its column chunks.
803828
fn convert_row_group(
804829
fb_rg: &fb::RowGroup,
805830
schema_descr: SchemaDescPtr,
@@ -816,10 +841,6 @@ impl FlatBufferConverter {
816841
builder = builder.set_file_offset(fb_rg.file_offset());
817842
}
818843

819-
if let Some(ordinal) = fb_rg.ordinal() {
820-
builder = builder.set_ordinal(ordinal);
821-
}
822-
823844
// Convert sorting columns
824845
if let Some(sorting_cols) = fb_rg.sorting_columns() {
825846
let sorting: Vec<_> = sorting_cols
@@ -843,6 +864,7 @@ impl FlatBufferConverter {
843864
builder.build()
844865
}
845866

867+
/// Convert a column chunk including compression, offsets, and statistics.
846868
fn convert_column_chunk(
847869
fb_cc: &fb::ColumnChunk,
848870
fb_rg: &fb::RowGroup,
@@ -892,6 +914,7 @@ impl FlatBufferConverter {
892914
builder.build()
893915
}
894916

917+
/// Unpack statistics from the FlatBuffer format into typed [`Statistics`].
895918
fn convert_statistics(fb_stats: &fb::Statistics, physical_type: Type) -> Result<Option<Statistics>> {
896919
let null_count = fb_stats.null_count().map(|n| n as u64);
897920

@@ -989,10 +1012,54 @@ impl FlatBufferConverter {
9891012
return Ok(None);
9901013
}
9911014
}
992-
Type::INT96 | Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
993-
// For these types, we don't fully reconstruct statistics
994-
// as they require more context
995-
return Ok(None);
1015+
Type::INT96 => {
1016+
if let (Some(min), Some(max)) = (min_bytes.as_ref(), max_bytes.as_ref()) {
1017+
if min.len() >= 12 && max.len() >= 12 {
1018+
let mut min_val = Int96::new();
1019+
min_val.set_data(
1020+
u32::from_le_bytes(min[0..4].try_into().unwrap()),
1021+
u32::from_le_bytes(min[4..8].try_into().unwrap()),
1022+
u32::from_le_bytes(min[8..12].try_into().unwrap()),
1023+
);
1024+
let mut max_val = Int96::new();
1025+
max_val.set_data(
1026+
u32::from_le_bytes(max[0..4].try_into().unwrap()),
1027+
u32::from_le_bytes(max[4..8].try_into().unwrap()),
1028+
u32::from_le_bytes(max[8..12].try_into().unwrap()),
1029+
);
1030+
Statistics::int96(Some(min_val), Some(max_val), None, null_count, false)
1031+
} else {
1032+
return Ok(None);
1033+
}
1034+
} else {
1035+
return Ok(None);
1036+
}
1037+
}
1038+
Type::BYTE_ARRAY => {
1039+
if let (Some(min), Some(max)) = (min_bytes, max_bytes) {
1040+
Statistics::byte_array(
1041+
Some(ByteArray::from(min)),
1042+
Some(ByteArray::from(max)),
1043+
None,
1044+
null_count,
1045+
false,
1046+
)
1047+
} else {
1048+
return Ok(None);
1049+
}
1050+
}
1051+
Type::FIXED_LEN_BYTE_ARRAY => {
1052+
if let (Some(min), Some(max)) = (min_bytes, max_bytes) {
1053+
Statistics::fixed_len_byte_array(
1054+
Some(FixedLenByteArray::from(min)),
1055+
Some(FixedLenByteArray::from(max)),
1056+
None,
1057+
null_count,
1058+
false,
1059+
)
1060+
} else {
1061+
return Ok(None);
1062+
}
9961063
}
9971064
};
9981065

@@ -1015,7 +1082,7 @@ fn convert_type_to_fb(t: Type) -> fb::Type {
10151082
}
10161083
}
10171084

1018-
#[allow(dead_code)]
1085+
#[cfg(test)]
10191086
fn convert_type_from_fb(t: fb::Type) -> Type {
10201087
match t {
10211088
fb::Type::BOOLEAN => Type::BOOLEAN,
@@ -1038,16 +1105,6 @@ fn convert_repetition_to_fb(r: Repetition) -> fb::FieldRepetitionType {
10381105
}
10391106
}
10401107

1041-
#[allow(dead_code)]
1042-
fn convert_repetition_from_fb(r: fb::FieldRepetitionType) -> Repetition {
1043-
match r {
1044-
fb::FieldRepetitionType::REQUIRED => Repetition::REQUIRED,
1045-
fb::FieldRepetitionType::OPTIONAL => Repetition::OPTIONAL,
1046-
fb::FieldRepetitionType::REPEATED => Repetition::REPEATED,
1047-
_ => Repetition::OPTIONAL, // Default fallback
1048-
}
1049-
}
1050-
10511108
fn convert_compression_to_fb(c: Compression) -> fb::CompressionCodec {
10521109
match c {
10531110
Compression::UNCOMPRESSED => fb::CompressionCodec::UNCOMPRESSED,
@@ -1076,9 +1133,9 @@ fn convert_compression_from_fb(c: fb::CompressionCodec) -> Compression {
10761133

10771134
fn convert_time_unit_to_fb(unit: &crate::basic::TimeUnit) -> fb::TimeUnit {
10781135
match unit {
1079-
crate::basic::TimeUnit::MILLIS => fb::TimeUnit::MS,
1080-
crate::basic::TimeUnit::MICROS => fb::TimeUnit::US,
1081-
crate::basic::TimeUnit::NANOS => fb::TimeUnit::NS,
1136+
crate::basic::TimeUnit::MILLIS => fb::TimeUnit::Millisecond,
1137+
crate::basic::TimeUnit::MICROS => fb::TimeUnit::Microsecond,
1138+
crate::basic::TimeUnit::NANOS => fb::TimeUnit::Nanosecond,
10821139
}
10831140
}
10841141

@@ -1497,7 +1554,7 @@ mod tests {
14971554
#[test]
14981555
fn test_metadata_roundtrip() {
14991556
use crate::file::metadata::{
1500-
ColumnChunkMetaDataBuilder, FileMetaData, ParquetMetaData,
1557+
ColumnChunkMetaDataBuilder, FileMetaData,
15011558
ParquetMetaDataBuilder, RowGroupMetaDataBuilder,
15021559
};
15031560

parquet/src/file/metadata/flatbuf/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,5 @@ mod parquet3_generated;
4242

4343
pub use converter::{
4444
append_flatbuffer, extract_flatbuffer, flatbuf_to_parquet_metadata,
45-
parquet_metadata_to_flatbuf, ExtractResult, FlatBufferConverter, ThriftToFlatBufferConverter,
45+
parquet_metadata_to_flatbuf, ExtractResult,
4646
};
47-
pub use parquet3_generated::parquet::format_3 as format3;

0 commit comments

Comments
 (0)