Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ use vortex_layout::layouts::compressed::CompressingStrategy;
use vortex_layout::layouts::compressed::CompressorPlugin;
use vortex_layout::layouts::dict::writer::DictStrategy;
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
#[cfg(feature = "unstable_encodings")]
use vortex_layout::layouts::list::writer::ListLayoutStrategy;
use vortex_layout::layouts::repartition::RepartitionStrategy;
use vortex_layout::layouts::repartition::RepartitionWriterOptions;
use vortex_layout::layouts::table::TableStrategy;
Expand Down Expand Up @@ -242,21 +240,12 @@ impl WriteStrategyBuilder {
Arc::new(FlatLayoutStrategy::default())
};

// 7. for each chunk create a layout. Under the `unstable_encodings` feature, list-typed
// chunks route through `ListLayoutStrategy` (separately-addressable elements/offsets/
// validity sub-layouts; non-list chunks fall through its built-in fallback to `flat`).
// Nested lists (`list<list<...>>`) recurse, shredding each level into its own
// `ListLayout`. Otherwise everything goes through the flat strategy.
// 7. for each chunk create a layout. Under the `unstable_encodings` feature, route
// list-typed chunks through `TableStrategy` so lists and nested list/struct elements are
// recursively shredded into child layouts. Otherwise everything goes through flat.
#[cfg(feature = "unstable_encodings")]
let leaf: Arc<dyn LayoutStrategy> = Arc::new(
// Thread the configured `flat` (which carries `allow_encodings` / any custom flat
// override) through every child; list elements still recurse into a nested ListLayout.
ListLayoutStrategy::default()
.with_elements(Arc::clone(&flat))
.with_offsets(Arc::clone(&flat))
.with_validity(Arc::clone(&flat))
.with_fallback(Arc::clone(&flat)),
);
let leaf: Arc<dyn LayoutStrategy> =
Arc::new(TableStrategy::new(Arc::clone(&flat), Arc::clone(&flat)).with_list_layout());
#[cfg(not(feature = "unstable_encodings"))]
let leaf: Arc<dyn LayoutStrategy> = Arc::clone(&flat);

Expand Down
165 changes: 164 additions & 1 deletion vortex-layout/src/layouts/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use vortex_utils::aliases::hash_set::HashSet;
use crate::IntoLayout;
use crate::LayoutRef;
use crate::LayoutStrategy;
use crate::layouts::list::writer::ListLayoutStrategy;
use crate::layouts::struct_::StructLayout;
use crate::segments::SegmentSinkRef;
use crate::sequence::SendableSequentialStream;
Expand All @@ -54,6 +55,8 @@ pub struct TableStrategy {
validity: Arc<dyn LayoutStrategy>,
/// The fallback writer for any fields that do not have an explicit writer set in `leaf_writers`
fallback: Arc<dyn LayoutStrategy>,
/// Whether list arrays should be recursively shredded with [`ListLayoutStrategy`].
list_layout: bool,
}

impl TableStrategy {
Expand All @@ -79,6 +82,7 @@ impl TableStrategy {
leaf_writers: Default::default(),
validity,
fallback,
list_layout: false,
}
}

Expand Down Expand Up @@ -145,6 +149,16 @@ impl TableStrategy {
self.validity = validity;
self
}

/// Recursively write list-typed fields using [`ListLayoutStrategy`].
///
/// List elements are written with a nested [`TableStrategy`], so list elements that are
/// themselves lists or structs are recursively shredded. Offsets use the fallback strategy and
/// list validity uses the validity strategy.
pub fn with_list_layout(mut self) -> Self {
self.list_layout = true;
self
}
}

impl TableStrategy {
Expand All @@ -155,7 +169,7 @@ impl TableStrategy {
let mut new_writers = HashMap::with_capacity(self.leaf_writers.len());

for (field_path, strategy) in &self.leaf_writers {
if field_path.starts_with_field(field)
if field_path.parts().first() == Some(field)
&& let Some(subpath) = field_path.clone().step_into()
{
new_writers.insert(subpath, Arc::clone(strategy));
Expand All @@ -166,9 +180,17 @@ impl TableStrategy {
leaf_writers: new_writers,
validity: Arc::clone(&self.validity),
fallback: Arc::clone(&self.fallback),
list_layout: self.list_layout,
}
}

/// Pick the strategy used to write the elements of a list-typed array.
///
/// A leaf writer registered directly on the element-type path takes precedence; otherwise the
/// elements are written by the [`TableStrategy`] obtained from descending into
/// [`Field::ElementType`].
fn list_elements_writer(&self) -> Arc<dyn LayoutStrategy> {
    let element_path = FieldPath::from(Field::ElementType);
    match self.leaf_writers.get(&element_path) {
        Some(explicit) => Arc::clone(explicit),
        None => Arc::new(self.descend(&Field::ElementType)),
    }
}

fn validate_path(&self, path: FieldPath) -> FieldPath {
assert!(
!path.is_root(),
Expand Down Expand Up @@ -201,6 +223,18 @@ impl LayoutStrategy for TableStrategy {
) -> VortexResult<LayoutRef> {
let dtype = stream.dtype().clone();

if self.list_layout && dtype.is_list() {
let writer = ListLayoutStrategy::default()
.with_elements(self.list_elements_writer())
.with_offsets(Arc::clone(&self.fallback))
.with_validity(Arc::clone(&self.validity))
.with_fallback(Arc::clone(&self.fallback));

return writer
.write_stream(ctx, segment_sink, stream, eof, session)
.await;
}

// Fallback: if the array is not a struct, fall back to writing a single array.
if !dtype.is_struct() {
return self
Expand Down Expand Up @@ -333,6 +367,9 @@ impl LayoutStrategy for TableStrategy {
if dtype.is_struct() {
// Step into the field path for struct columns
Arc::new(self.descend(&field))
} else if self.list_layout && dtype.is_list() {
// Step into list-typed fields so list elements can recurse.
Arc::new(self.descend(&field))
} else {
// Use fallback for leaf columns
Arc::clone(&self.fallback)
Expand Down Expand Up @@ -371,11 +408,137 @@ impl LayoutStrategy for TableStrategy {
mod tests {
use std::sync::Arc;

use vortex_array::ArrayContext;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::arrays::ListArray;
use vortex_array::arrays::StructArray;
use vortex_array::dtype::FieldPath;
use vortex_array::field_path;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;
use vortex_error::VortexResult;

use crate::LayoutRef;
use crate::LayoutStrategy;
use crate::layouts::flat::writer::FlatLayoutStrategy;
use crate::layouts::table::TableStrategy;
use crate::segments::TestSegments;
use crate::sequence::SequenceId;
use crate::sequence::SequentialArrayStreamExt;
use crate::test::SESSION;

/// Build a default [`FlatLayoutStrategy`] behind a shared `dyn LayoutStrategy` handle.
fn flat_strategy() -> Arc<dyn LayoutStrategy> {
    let flat = FlatLayoutStrategy::default();
    Arc::new(flat)
}

/// Write `array` as a single-array stream through `strategy` and return the resulting layout.
///
/// Segments go to a fresh [`TestSegments`] sink, and the stream is sequenced from a split of the
/// root [`SequenceId`]: the first half orders the stream, the second is passed as the
/// end-of-stream marker.
async fn write<S>(strategy: &S, array: ArrayRef) -> VortexResult<LayoutRef>
where
    S: LayoutStrategy,
{
    let (start, end) = SequenceId::root().split();
    let sink = Arc::new(TestSegments::default());
    let input = array.to_array_stream().sequenced(start);

    strategy
        .write_stream(ArrayContext::empty(), sink, input, end, &SESSION)
        .await
}

/// A non-nullable list array over five `i32` elements with `u32` offsets `[0, 2, 5, 5]`.
fn basic_list() -> VortexResult<ArrayRef> {
    let elements = buffer![1i32, 2, 3, 4, 5].into_array();
    let offsets = buffer![0u32, 2, 5, 5].into_array();

    let list = ListArray::try_new(elements, offsets, Validity::NonNullable)?;
    Ok(list.into_array())
}

/// A non-nullable list-of-lists array: an inner list over six `i32` values (offsets
/// `[0, 2, 5, 5, 6]`) wrapped by an outer list with offsets `[0, 2, 4]`.
fn nested_list() -> VortexResult<ArrayRef> {
    let values = buffer![1i32, 2, 3, 4, 5, 6].into_array();
    let inner_offsets = buffer![0u32, 2, 5, 5, 6].into_array();
    let inner = ListArray::try_new(values, inner_offsets, Validity::NonNullable)?;

    let outer_offsets = buffer![0u32, 2, 4].into_array();
    let outer = ListArray::try_new(inner.into_array(), outer_offsets, Validity::NonNullable)?;
    Ok(outer.into_array())
}

/// A non-nullable list array whose elements are structs with two `i32` fields, `a` and `b`,
/// using `u32` offsets `[0, 2, 5, 5]`.
fn list_of_struct() -> VortexResult<ArrayRef> {
    let fields = [
        ("a", buffer![1i32, 2, 3, 4, 5].into_array()),
        ("b", buffer![10i32, 20, 30, 40, 50].into_array()),
    ];
    let elements = StructArray::from_fields(fields.as_slice())?.into_array();
    let offsets = buffer![0u32, 2, 5, 5].into_array();

    let list = ListArray::try_new(elements, offsets, Validity::NonNullable)?;
    Ok(list.into_array())
}

/// Without `with_list_layout`, writing a list array through `TableStrategy` produces a single
/// flat layout covering the whole list.
#[tokio::test]
async fn list_layout_disabled_uses_fallback() -> VortexResult<()> {
let flat = flat_strategy();
// The same flat strategy is supplied for both constructor arguments.
let strategy = TableStrategy::new(Arc::clone(&flat), flat);

let layout = write(&strategy, basic_list()?).await?;
// Expect one `vortex.flat` node with the list dtype and no child layouts.
insta::assert_snapshot!(layout.display_tree(), @"vortex.flat, dtype: list(i32), segment: 0");
Ok(())
}

/// With `with_list_layout`, a flat list array is written as a `vortex.list` layout with
/// separate `elements` and `offsets` children.
#[tokio::test]
async fn with_list_layout_shreds_list() -> VortexResult<()> {
let flat = flat_strategy();
// The same flat strategy is supplied for both constructor arguments.
let strategy = TableStrategy::new(Arc::clone(&flat), flat).with_list_layout();

let layout = write(&strategy, basic_list()?).await?;
// The input is non-nullable, and the expected tree has only elements and offsets children.
insta::assert_snapshot!(layout.display_tree(), @"
vortex.list, dtype: list(i32), children: 2
├── elements: vortex.flat, dtype: i32, segment: 0
└── offsets: vortex.flat, dtype: u32, segment: 1
");
Ok(())
}

/// With `with_list_layout`, a list-of-lists array is shredded at both levels: the outer
/// list's `elements` child is itself a `vortex.list` layout.
#[tokio::test]
async fn with_list_layout_recurses_into_nested_lists() -> VortexResult<()> {
let flat = flat_strategy();
// The same flat strategy is supplied for both constructor arguments.
let strategy = TableStrategy::new(Arc::clone(&flat), flat).with_list_layout();

let layout = write(&strategy, nested_list()?).await?;
// In the expected tree the outer offsets occupy segment 0 and the inner list's children
// occupy segments 1 and 2.
insta::assert_snapshot!(layout.display_tree(), @"
vortex.list, dtype: list(list(i32)), children: 2
├── elements: vortex.list, dtype: list(i32), children: 2
│ ├── elements: vortex.flat, dtype: i32, segment: 1
│ └── offsets: vortex.flat, dtype: u32, segment: 2
└── offsets: vortex.flat, dtype: u32, segment: 0
");
Ok(())
}

/// With `with_list_layout`, a list whose elements are structs is shredded so that the
/// `elements` child is a `vortex.struct` layout with one flat child per field.
#[tokio::test]
async fn with_list_layout_recurses_into_list_struct_elements() -> VortexResult<()> {
let flat = flat_strategy();
// The same flat strategy is supplied for both constructor arguments.
let strategy = TableStrategy::new(Arc::clone(&flat), flat).with_list_layout();

let layout = write(&strategy, list_of_struct()?).await?;
// In the expected tree the list offsets occupy segment 0 and the struct fields `a` and `b`
// occupy segments 1 and 2.
insta::assert_snapshot!(layout.display_tree(), @"
vortex.list, dtype: list({a=i32, b=i32}), children: 2
├── elements: vortex.struct, dtype: {a=i32, b=i32}, children: 2
│ ├── a: vortex.flat, dtype: i32, segment: 1
│ └── b: vortex.flat, dtype: i32, segment: 2
└── offsets: vortex.flat, dtype: u32, segment: 0
");
Ok(())
}

#[test]
#[should_panic(
Expand Down
Loading