Skip to content

Commit 765c5b7

Browse files
feat: Support roundtrip ListView in parquet arrow writer (#9352)
# Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. --> - Closes #9344 . # Rationale for this change This PR implements support for roundtrip reading and writing of ListView and LargeListView types in Parquet Arrow integration. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> # What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Handle ListView/LargeListView in Arrow ↔ Parquet schema conversion - Reader/Writer support - Added some tests # Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> YES # Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. If there are any breaking changes to public APIs, please call them out. --> Users can now write and read ListView/LargeListView arrays to/from Parquet files --------- Co-authored-by: Jeffrey Vo <jeffreyvo@apache.org>
1 parent 198846d commit 765c5b7

7 files changed

Lines changed: 582 additions & 35 deletions

File tree

parquet/src/arrow/array_reader/builder.rs

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ use crate::arrow::array_reader::row_group_cache::RowGroupCache;
2929
use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
3030
use crate::arrow::array_reader::row_number::RowNumberReader;
3131
use crate::arrow::array_reader::{
32-
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
33-
PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
34-
make_byte_array_reader,
32+
ArrayReader, FixedSizeListArrayReader, ListArrayReader, ListViewArrayReader, MapArrayReader,
33+
NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader,
34+
make_byte_array_dictionary_reader, make_byte_array_reader,
3535
};
3636
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
3737
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
@@ -178,8 +178,10 @@ impl<'a> ArrayReaderBuilder<'a> {
178178
ParquetFieldType::Group { .. } => match &field.arrow_type {
179179
DataType::Map(_, _) => self.build_map_reader(field, mask),
180180
DataType::Struct(_) => self.build_struct_reader(field, mask),
181-
DataType::List(_) => self.build_list_reader(field, mask, false),
182-
DataType::LargeList(_) => self.build_list_reader(field, mask, true),
181+
DataType::List(_)
182+
| DataType::LargeList(_)
183+
| DataType::ListView(_)
184+
| DataType::LargeListView(_) => self.build_list_reader(field, mask),
183185
DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
184186
d => unimplemented!("reading group type {} not implemented", d),
185187
},
@@ -266,7 +268,6 @@ impl<'a> ArrayReaderBuilder<'a> {
266268
&self,
267269
field: &ParquetField,
268270
mask: &ProjectionMask,
269-
is_large: bool,
270271
) -> Result<Option<Box<dyn ArrayReader>>> {
271272
let children = field.children().unwrap();
272273
assert_eq!(children.len(), 1);
@@ -275,32 +276,56 @@ impl<'a> ArrayReaderBuilder<'a> {
275276
Some(item_reader) => {
276277
// Need to retrieve underlying data type to handle projection
277278
let item_type = item_reader.get_data_type().clone();
278-
let data_type = match &field.arrow_type {
279+
let reader: Box<dyn ArrayReader> = match &field.arrow_type {
279280
DataType::List(f) => {
280-
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
281+
let data_type =
282+
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)));
283+
Box::new(ListArrayReader::<i32>::new(
284+
item_reader,
285+
data_type,
286+
field.def_level,
287+
field.rep_level,
288+
field.nullable,
289+
))
281290
}
282291
DataType::LargeList(f) => {
283-
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
292+
let data_type = DataType::LargeList(Arc::new(
293+
f.as_ref().clone().with_data_type(item_type),
294+
));
295+
Box::new(ListArrayReader::<i64>::new(
296+
item_reader,
297+
data_type,
298+
field.def_level,
299+
field.rep_level,
300+
field.nullable,
301+
))
302+
}
303+
DataType::ListView(f) => {
304+
let data_type = DataType::ListView(Arc::new(
305+
f.as_ref().clone().with_data_type(item_type),
306+
));
307+
Box::new(ListViewArrayReader::<i32>::new(
308+
item_reader,
309+
data_type,
310+
field.def_level,
311+
field.rep_level,
312+
field.nullable,
313+
))
314+
}
315+
DataType::LargeListView(f) => {
316+
let data_type = DataType::LargeListView(Arc::new(
317+
f.as_ref().clone().with_data_type(item_type),
318+
));
319+
Box::new(ListViewArrayReader::<i64>::new(
320+
item_reader,
321+
data_type,
322+
field.def_level,
323+
field.rep_level,
324+
field.nullable,
325+
))
284326
}
285327
_ => unreachable!(),
286328
};
287-
288-
let reader = match is_large {
289-
false => Box::new(ListArrayReader::<i32>::new(
290-
item_reader,
291-
data_type,
292-
field.def_level,
293-
field.rep_level,
294-
field.nullable,
295-
)) as _,
296-
true => Box::new(ListArrayReader::<i64>::new(
297-
item_reader,
298-
data_type,
299-
field.def_level,
300-
field.rep_level,
301-
field.nullable,
302-
)) as _,
303-
};
304329
Some(reader)
305330
}
306331
None => None,
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::arrow::array_reader::{ArrayReader, ListArrayReader};
19+
use crate::errors::Result;
20+
use arrow_array::cast::AsArray;
21+
use arrow_array::{Array, ArrayRef, GenericListViewArray, OffsetSizeTrait, new_empty_array};
22+
use arrow_schema::DataType as ArrowType;
23+
use std::any::Any;
24+
use std::sync::Arc;
25+
26+
/// Implementation of list view array reader.
27+
/// This wraps a ListArrayReader and converts the result to ListViewArray.
28+
pub struct ListViewArrayReader<OffsetSize: OffsetSizeTrait> {
29+
inner: ListArrayReader<OffsetSize>,
30+
data_type: ArrowType,
31+
}
32+
33+
impl<OffsetSize: OffsetSizeTrait> ListViewArrayReader<OffsetSize> {
34+
/// Construct list view array reader.
35+
pub fn new(
36+
item_reader: Box<dyn ArrayReader>,
37+
data_type: ArrowType,
38+
def_level: i16,
39+
rep_level: i16,
40+
nullable: bool,
41+
) -> Self {
42+
// Create the underlying ListArrayReader with the corresponding List type
43+
let list_data_type = match &data_type {
44+
ArrowType::ListView(f) => ArrowType::List(f.clone()),
45+
ArrowType::LargeListView(f) => ArrowType::LargeList(f.clone()),
46+
_ => unreachable!(),
47+
};
48+
49+
let inner =
50+
ListArrayReader::new(item_reader, list_data_type, def_level, rep_level, nullable);
51+
52+
Self { inner, data_type }
53+
}
54+
}
55+
56+
impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListViewArrayReader<OffsetSize> {
57+
fn as_any(&self) -> &dyn Any {
58+
self
59+
}
60+
61+
/// Returns data type.
62+
/// This must be a ListView.
63+
fn get_data_type(&self) -> &ArrowType {
64+
&self.data_type
65+
}
66+
67+
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
68+
self.inner.read_records(batch_size)
69+
}
70+
71+
fn consume_batch(&mut self) -> Result<ArrayRef> {
72+
let array = self.inner.consume_batch()?;
73+
if array.is_empty() {
74+
return Ok(new_empty_array(&self.data_type));
75+
}
76+
77+
// Convert ListArray to ListViewArray
78+
let list_array = array.as_list::<OffsetSize>();
79+
80+
let list_view_array =
81+
Arc::new(GenericListViewArray::<OffsetSize>::from(list_array.clone()));
82+
83+
// Ensure the data type is correct
84+
assert_eq!(
85+
list_view_array.data_type(),
86+
&self.data_type,
87+
"Converted array type does not match expected type"
88+
);
89+
90+
Ok(list_view_array)
91+
}
92+
93+
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
94+
self.inner.skip_records(num_records)
95+
}
96+
97+
fn get_def_levels(&self) -> Option<&[i16]> {
98+
self.inner.get_def_levels()
99+
}
100+
101+
fn get_rep_levels(&self) -> Option<&[i16]> {
102+
self.inner.get_rep_levels()
103+
}
104+
}
105+
106+
#[cfg(test)]
107+
mod tests {
108+
use super::*;
109+
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
110+
use arrow::datatypes::Int32Type as ArrowInt32;
111+
use arrow_array::PrimitiveArray;
112+
113+
fn test_nullable_list_view<OffsetSize: OffsetSizeTrait>() {
114+
// [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]]
115+
let expected =
116+
GenericListViewArray::<OffsetSize>::from_iter_primitive::<ArrowInt32, _, _>(vec![
117+
Some(vec![Some(1), None, Some(2)]),
118+
None,
119+
Some(vec![]),
120+
Some(vec![Some(3), Some(4)]),
121+
Some(vec![]),
122+
Some(vec![]),
123+
None,
124+
Some(vec![]),
125+
Some(vec![None, Some(1)]),
126+
]);
127+
128+
let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
129+
Some(1),
130+
None,
131+
Some(2),
132+
None,
133+
None,
134+
Some(3),
135+
Some(4),
136+
None,
137+
None,
138+
None,
139+
None,
140+
None,
141+
Some(1),
142+
]));
143+
144+
let item_array_reader = InMemoryArrayReader::new(
145+
ArrowType::Int32,
146+
array,
147+
Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]),
148+
Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]),
149+
);
150+
151+
let field = Arc::new(arrow_schema::Field::new_list_field(ArrowType::Int32, true));
152+
let data_type = if OffsetSize::IS_LARGE {
153+
ArrowType::LargeListView(field)
154+
} else {
155+
ArrowType::ListView(field)
156+
};
157+
158+
let mut list_view_array_reader = ListViewArrayReader::<OffsetSize>::new(
159+
Box::new(item_array_reader),
160+
data_type,
161+
2,
162+
1,
163+
true,
164+
);
165+
166+
let actual = list_view_array_reader.next_batch(1024).unwrap();
167+
let actual = actual
168+
.as_any()
169+
.downcast_ref::<GenericListViewArray<OffsetSize>>()
170+
.unwrap();
171+
172+
assert_eq!(&expected, actual)
173+
}
174+
175+
fn test_required_list_view<OffsetSize: OffsetSizeTrait>() {
176+
// [[1, null, 2], [], [3, 4], [], [], [null, 1]]
177+
let expected =
178+
GenericListViewArray::<OffsetSize>::from_iter_primitive::<ArrowInt32, _, _>(vec![
179+
Some(vec![Some(1), None, Some(2)]),
180+
Some(vec![]),
181+
Some(vec![Some(3), Some(4)]),
182+
Some(vec![]),
183+
Some(vec![]),
184+
Some(vec![None, Some(1)]),
185+
]);
186+
187+
let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
188+
Some(1),
189+
None,
190+
Some(2),
191+
None,
192+
Some(3),
193+
Some(4),
194+
None,
195+
None,
196+
None,
197+
Some(1),
198+
]));
199+
200+
let item_array_reader = InMemoryArrayReader::new(
201+
ArrowType::Int32,
202+
array,
203+
Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]),
204+
Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]),
205+
);
206+
207+
let field = Arc::new(arrow_schema::Field::new_list_field(ArrowType::Int32, true));
208+
let data_type = if OffsetSize::IS_LARGE {
209+
ArrowType::LargeListView(field)
210+
} else {
211+
ArrowType::ListView(field)
212+
};
213+
214+
let mut list_view_array_reader = ListViewArrayReader::<OffsetSize>::new(
215+
Box::new(item_array_reader),
216+
data_type,
217+
1,
218+
1,
219+
false,
220+
);
221+
222+
let actual = list_view_array_reader.next_batch(1024).unwrap();
223+
let actual = actual
224+
.as_any()
225+
.downcast_ref::<GenericListViewArray<OffsetSize>>()
226+
.unwrap();
227+
228+
assert_eq!(&expected, actual)
229+
}
230+
231+
fn test_list_view_array<OffsetSize: OffsetSizeTrait>() {
232+
test_nullable_list_view::<OffsetSize>();
233+
test_required_list_view::<OffsetSize>();
234+
}
235+
236+
#[test]
237+
fn test_list_view_array_reader() {
238+
test_list_view_array::<i32>();
239+
}
240+
241+
#[test]
242+
fn test_large_list_view_array_reader() {
243+
test_list_view_array::<i64>()
244+
}
245+
}

parquet/src/arrow/array_reader/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod empty_array;
3939
mod fixed_len_byte_array;
4040
mod fixed_size_list_array;
4141
mod list_array;
42+
mod list_view_array;
4243
mod map_array;
4344
mod null_array;
4445
mod primitive_array;
@@ -61,6 +62,7 @@ pub use byte_view_array::make_byte_view_array_reader;
6162
pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
6263
pub use fixed_size_list_array::FixedSizeListArrayReader;
6364
pub use list_array::ListArrayReader;
65+
pub use list_view_array::ListViewArrayReader;
6466
pub use map_array::MapArrayReader;
6567
pub use null_array::NullArrayReader;
6668
pub use primitive_array::PrimitiveArrayReader;

0 commit comments

Comments
 (0)