diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 777846c553e..e86f39fcd68 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -48,8 +48,6 @@ use vortex_layout::layouts::compressed::CompressingStrategy; use vortex_layout::layouts::compressed::CompressorPlugin; use vortex_layout::layouts::dict::writer::DictStrategy; use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; -#[cfg(feature = "unstable_encodings")] -use vortex_layout::layouts::list::writer::ListLayoutStrategy; use vortex_layout::layouts::repartition::RepartitionStrategy; use vortex_layout::layouts::repartition::RepartitionWriterOptions; use vortex_layout::layouts::table::TableStrategy; @@ -242,21 +240,12 @@ impl WriteStrategyBuilder { Arc::new(FlatLayoutStrategy::default()) }; - // 7. for each chunk create a layout. Under the `unstable_encodings` feature, list-typed - // chunks route through `ListLayoutStrategy` (separately-addressable elements/offsets/ - // validity sub-layouts; non-list chunks fall through its built-in fallback to `flat`). - // Nested lists (`list>`) recurse, shredding each level into its own - // `ListLayout`. Otherwise everything goes through the flat strategy. + // 7. for each chunk create a layout. Under the `unstable_encodings` feature, route + // list-typed chunks through `TableStrategy` so lists and nested list/struct elements are + // recursively shredded into child layouts. Otherwise everything goes through flat. #[cfg(feature = "unstable_encodings")] - let leaf: Arc = Arc::new( - // Thread the configured `flat` (which carries `allow_encodings` / any custom flat - // override) through every child; list elements still recurse into a nested ListLayout. - ListLayoutStrategy::default() - .with_elements(Arc::clone(&flat)) - .with_offsets(Arc::clone(&flat)) - .with_validity(Arc::clone(&flat)) - .with_fallback(Arc::clone(&flat)), - ); + let leaf: Arc = + Arc::new(TableStrategy::new(Arc::clone(&flat), Arc::clone(&flat)).with_list_layout()); #[cfg(not(feature = "unstable_encodings"))] let leaf: Arc = Arc::clone(&flat); diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index 28b3af52c87..05ec37b37a3 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -37,6 +37,7 @@ use vortex_utils::aliases::hash_set::HashSet; use crate::IntoLayout; use crate::LayoutRef; use crate::LayoutStrategy; +use crate::layouts::list::writer::ListLayoutStrategy; use crate::layouts::struct_::StructLayout; use crate::segments::SegmentSinkRef; use crate::sequence::SendableSequentialStream; @@ -54,6 +55,8 @@ pub struct TableStrategy { validity: Arc, /// The fallback writer for any fields that do not have an explicit writer set in `leaf_writers` fallback: Arc, + /// Whether list arrays should be recursively shredded with [`ListLayoutStrategy`]. + list_layout: bool, } impl TableStrategy { @@ -79,6 +82,7 @@ impl TableStrategy { leaf_writers: Default::default(), validity, fallback, + list_layout: false, } } @@ -145,6 +149,16 @@ impl TableStrategy { self.validity = validity; self } + + /// Recursively write list-typed fields using [`ListLayoutStrategy`]. + /// + /// List elements are written with a nested [`TableStrategy`], so list elements that are + /// themselves lists or structs are recursively shredded. Offsets use the fallback strategy and + /// list validity uses the validity strategy. + pub fn with_list_layout(mut self) -> Self { + self.list_layout = true; + self + } } impl TableStrategy { @@ -155,7 +169,7 @@ impl TableStrategy { let mut new_writers = HashMap::with_capacity(self.leaf_writers.len()); for (field_path, strategy) in &self.leaf_writers { - if field_path.starts_with_field(field) + if field_path.parts().first() == Some(field) && let Some(subpath) = field_path.clone().step_into() { new_writers.insert(subpath, Arc::clone(strategy)); @@ -166,9 +180,17 @@ impl TableStrategy { leaf_writers: new_writers, validity: Arc::clone(&self.validity), fallback: Arc::clone(&self.fallback), + list_layout: self.list_layout, } } + fn list_elements_writer(&self) -> Arc { + self.leaf_writers + .get(&FieldPath::from(Field::ElementType)) + .cloned() + .unwrap_or_else(|| Arc::new(self.descend(&Field::ElementType))) + } + fn validate_path(&self, path: FieldPath) -> FieldPath { assert!( !path.is_root(), @@ -201,6 +223,18 @@ impl LayoutStrategy for TableStrategy { ) -> VortexResult { let dtype = stream.dtype().clone(); + if self.list_layout && dtype.is_list() { + let writer = ListLayoutStrategy::default() + .with_elements(self.list_elements_writer()) + .with_offsets(Arc::clone(&self.fallback)) + .with_validity(Arc::clone(&self.validity)) + .with_fallback(Arc::clone(&self.fallback)); + + return writer + .write_stream(ctx, segment_sink, stream, eof, session) + .await; + } + // Fallback: if the array is not a struct, fallback to writing a single array. if !dtype.is_struct() { return self @@ -333,6 +367,9 @@ impl LayoutStrategy for TableStrategy { if dtype.is_struct() { // Step into the field path for struct columns Arc::new(self.descend(&field)) + } else if self.list_layout && dtype.is_list() { + // Step into list-typed fields so list elements can recurse. + Arc::new(self.descend(&field)) } else { // Use fallback for leaf columns Arc::clone(&self.fallback) @@ -371,11 +408,137 @@ impl LayoutStrategy for TableStrategy { mod tests { use std::sync::Arc; + use vortex_array::ArrayContext; + use vortex_array::ArrayRef; + use vortex_array::IntoArray; + use vortex_array::arrays::ListArray; + use vortex_array::arrays::StructArray; use vortex_array::dtype::FieldPath; use vortex_array::field_path; + use vortex_array::validity::Validity; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use crate::LayoutRef; + use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; use crate::layouts::table::TableStrategy; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + use crate::sequence::SequentialArrayStreamExt; + use crate::test::SESSION; + + fn flat_strategy() -> Arc { + Arc::new(FlatLayoutStrategy::default()) + } + + async fn write(strategy: &S, array: ArrayRef) -> VortexResult { + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let stream = array.to_array_stream().sequenced(ptr); + strategy + .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION) + .await + } + + fn basic_list() -> VortexResult { + Ok(ListArray::try_new( + buffer![1i32, 2, 3, 4, 5].into_array(), + buffer![0u32, 2, 5, 5].into_array(), + Validity::NonNullable, + )? + .into_array()) + } + + fn nested_list() -> VortexResult { + let inner_list = ListArray::try_new( + buffer![1i32, 2, 3, 4, 5, 6].into_array(), + buffer![0u32, 2, 5, 5, 6].into_array(), + Validity::NonNullable, + )? + .into_array(); + + Ok(ListArray::try_new( + inner_list, + buffer![0u32, 2, 4].into_array(), + Validity::NonNullable, + )? + .into_array()) + } + + fn list_of_struct() -> VortexResult { + let struct_array = StructArray::from_fields( + [ + ("a", buffer![1i32, 2, 3, 4, 5].into_array()), + ("b", buffer![10i32, 20, 30, 40, 50].into_array()), + ] + .as_slice(), + )? + .into_array(); + + Ok(ListArray::try_new( + struct_array, + buffer![0u32, 2, 5, 5].into_array(), + Validity::NonNullable, + )? + .into_array()) + } + + #[tokio::test] + async fn list_layout_disabled_uses_fallback() -> VortexResult<()> { + let flat = flat_strategy(); + let strategy = TableStrategy::new(Arc::clone(&flat), flat); + + let layout = write(&strategy, basic_list()?).await?; + insta::assert_snapshot!(layout.display_tree(), @"vortex.flat, dtype: list(i32), segment: 0"); + Ok(()) + } + + #[tokio::test] + async fn with_list_layout_shreds_list() -> VortexResult<()> { + let flat = flat_strategy(); + let strategy = TableStrategy::new(Arc::clone(&flat), flat).with_list_layout(); + + let layout = write(&strategy, basic_list()?).await?; + insta::assert_snapshot!(layout.display_tree(), @" + vortex.list, dtype: list(i32), children: 2 + ├── elements: vortex.flat, dtype: i32, segment: 0 + └── offsets: vortex.flat, dtype: u32, segment: 1 + "); + Ok(()) + } + + #[tokio::test] + async fn with_list_layout_recurses_into_nested_lists() -> VortexResult<()> { + let flat = flat_strategy(); + let strategy = TableStrategy::new(Arc::clone(&flat), flat).with_list_layout(); + + let layout = write(&strategy, nested_list()?).await?; + insta::assert_snapshot!(layout.display_tree(), @" + vortex.list, dtype: list(list(i32)), children: 2 + ├── elements: vortex.list, dtype: list(i32), children: 2 + │ ├── elements: vortex.flat, dtype: i32, segment: 1 + │ └── offsets: vortex.flat, dtype: u32, segment: 2 + └── offsets: vortex.flat, dtype: u32, segment: 0 + "); + Ok(()) + } + + #[tokio::test] + async fn with_list_layout_recurses_into_list_struct_elements() -> VortexResult<()> { + let flat = flat_strategy(); + let strategy = TableStrategy::new(Arc::clone(&flat), flat).with_list_layout(); + + let layout = write(&strategy, list_of_struct()?).await?; + insta::assert_snapshot!(layout.display_tree(), @" + vortex.list, dtype: list({a=i32, b=i32}), children: 2 + ├── elements: vortex.struct, dtype: {a=i32, b=i32}, children: 2 + │ ├── a: vortex.flat, dtype: i32, segment: 1 + │ └── b: vortex.flat, dtype: i32, segment: 2 + └── offsets: vortex.flat, dtype: u32, segment: 0 + "); + Ok(()) + } #[test] #[should_panic(