Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions vortex-layout/src/layouts/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ impl VTable for List {
}

/// Build the persisted metadata for `layout`.
///
/// Records the integer type of the offsets child together with the optional fixed list
/// size, so that a layout rebuilt from this metadata carries the same `fixed_size`.
fn metadata(layout: &Self::Layout) -> Self::Metadata {
    ProstMetadata(ListLayoutMetadata::new(
        layout.offsets_ptype(),
        layout.fixed_size(),
    ))
}

fn segment_ids(_layout: &Self::Layout) -> Vec<SegmentId> {
Expand Down Expand Up @@ -145,6 +148,7 @@ impl VTable for List {
elements,
offsets,
validity,
fixed_size: metadata.fixed_size,
})
}

Expand Down Expand Up @@ -192,6 +196,7 @@ pub struct ListLayout {
elements: LayoutRef,
offsets: LayoutRef,
validity: Option<LayoutRef>,
fixed_size: Option<u64>,
}

impl ListLayout {
Expand All @@ -215,9 +220,16 @@ impl ListLayout {
elements,
offsets,
validity,
fixed_size: None,
}
}

/// Consume this layout and return it with the fixed-size list metadata replaced.
///
/// Passing `None` clears any previously attached fixed size. The layout is taken by value,
/// so the returned value must be used; dropping it discards the update.
#[must_use]
pub fn with_fixed_size(mut self, fixed_size: Option<u64>) -> Self {
    self.fixed_size = fixed_size;
    self
}

/// Number of lists in this layout.
#[inline]
pub fn row_count(&self) -> u64 {
Expand Down Expand Up @@ -245,6 +257,12 @@ impl ListLayout {
self.offsets.dtype().as_ptype()
}

/// If present, every list row has exactly this many elements.
///
/// `None` means no fixed size has been recorded for this layout; the value is only ever
/// changed through `with_fixed_size`. When it is `Some`, the list reader synthesizes the
/// offsets as `row * fixed_size` instead of reading them from the offsets child.
#[inline]
pub fn fixed_size(&self) -> Option<u64> {
self.fixed_size
}

/// The dtype of the inner elements column.
pub fn elements_dtype(&self) -> &DType {
self.dtype
Expand All @@ -257,12 +275,15 @@ impl ListLayout {
pub struct ListLayoutMetadata {
// Integer type of the offsets child, stored as the `i32` value of the `PType` enumeration.
#[prost(enumeration = "PType", tag = "1")]
offsets_ptype: i32,
// Number of elements in every list row when all rows have the same length; `None` when
// no fixed size was recorded. Declared `optional`, so an absent field decodes as `None`.
#[prost(uint64, optional, tag = "2")]
fixed_size: Option<u64>,
}

impl ListLayoutMetadata {
pub fn new(offsets_ptype: PType) -> Self {
pub fn new(offsets_ptype: PType, fixed_size: Option<u64>) -> Self {
let mut metadata = Self::default();
metadata.set_offsets_ptype(offsets_ptype);
metadata.fixed_size = fixed_size;
metadata
}
}
46 changes: 46 additions & 0 deletions vortex-layout/src/layouts/list/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use vortex_array::dtype::DType;
use vortex_array::dtype::FieldMask;
use vortex_array::dtype::IntegerPType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::expr::Expression;
use vortex_array::expr::is_root;
use vortex_array::expr::not;
Expand All @@ -33,6 +34,7 @@ use vortex_buffer::Buffer;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_mask::Mask;
use vortex_session::VortexSession;

Expand Down Expand Up @@ -179,6 +181,11 @@ impl ListReader {
/// Fire the offsets read for `row_range`. The offsets child has an extra entry, so reading
/// `row_range` maps to offsets in `[row_range.start..row_range.end + 1)`.
fn fetch_offsets(&self, row_range: &Range<u64>) -> VortexResult<ArrayFuture> {
if let Some(fixed_size) = self.layout.fixed_size() {
let offsets = fixed_size_offsets(row_range, fixed_size, self.layout.offsets_ptype())?;
return Ok(async move { Ok(offsets) }.boxed());
}

let offsets_range = row_range.start..(row_range.end + 1);
let offsets_count = usize::try_from(offsets_range.end - offsets_range.start)?;
self.offsets.projection_evaluation(
Expand Down Expand Up @@ -220,6 +227,38 @@ fn rebase_offsets(offsets: ArrayRef, first: u64) -> VortexResult<ArrayRef> {
offsets.binary(constant, Operator::Sub)
}

/// Synthesize the offsets for `row_range` of a list layout whose rows all contain
/// `fixed_size` elements, producing them as integers of type `ptype`.
///
/// Dispatches on `ptype` to [`fixed_size_offsets_typed`], which returns one offset per row
/// boundary (`row_range` length plus one), each equal to `row * fixed_size`.
///
/// # Errors
///
/// Propagates the helper's errors: the multiplication overflowing `u64`, or an offset that
/// does not fit in the requested integer type.
// NOTE(review): the `allow` is presumably needed because the per-type expansion contains
// `TryFrom<u64>` conversions that are infallible for some integer widths — confirm the lint
// actually fires here before keeping it.
#[allow(clippy::unnecessary_fallible_conversions)]
fn fixed_size_offsets(
row_range: &Range<u64>,
fixed_size: u64,
ptype: PType,
) -> VortexResult<ArrayRef> {
vortex_array::match_each_integer_ptype!(ptype, |O| {
fixed_size_offsets_typed::<O>(row_range, fixed_size)
})
}

/// Materialize the absolute offsets `row * fixed_size` for every row boundary in
/// `row_range` (inclusive of `row_range.end`) as a non-nullable primitive array of `O`.
///
/// # Errors
///
/// Fails when the row count does not fit in `usize`, when `row * fixed_size` overflows
/// `u64`, or when an offset cannot be represented as `O`.
fn fixed_size_offsets_typed<O>(row_range: &Range<u64>, fixed_size: u64) -> VortexResult<ArrayRef>
where
    O: IntegerPType,
    O: TryFrom<u64>,
    VortexError: From<<O as TryFrom<u64>>::Error>,
{
    let row_count = usize::try_from(row_range.end - row_range.start)?;

    // One more boundary than rows: entry `i` opens row `i`, entry `i + 1` closes it.
    let mut boundaries = Vec::with_capacity(row_count + 1);
    (row_range.start..=row_range.end).try_for_each(|row| -> VortexResult<()> {
        let absolute = row
            .checked_mul(fixed_size)
            .ok_or_else(|| vortex_err!("fixed-size list offset overflow"))?;
        boundaries.push(O::try_from(absolute)?);
        Ok(())
    })?;

    let buffer = Buffer::<O>::from(boundaries);
    Ok(Array::<Primitive>::new::<O>(buffer, Validity::NonNullable).into_array())
}

fn create_validity(validity_array: Option<ArrayRef>, nullability: Nullability) -> Validity {
match validity_array {
Some(arr) => Validity::Array(arr),
Expand Down Expand Up @@ -873,6 +912,13 @@ mod tests {
Ok(())
}

/// Offsets synthesized for a sub-range are positions in the whole elements column
/// (`row * fixed_size`), not rebased to the start of the range.
#[test]
fn fixed_size_offsets_are_absolute() -> VortexResult<()> {
    // Rows 2..5 at three elements per row: boundaries 2*3, 3*3, 4*3 and 5*3.
    let rows = 2..5;
    let synthesized = fixed_size_offsets(&rows, 3, PType::U32)?;
    let actual = materialize_u32_array(synthesized);
    assert_eq!(actual, vec![6, 9, 12, 15]);
    Ok(())
}

// ---- compute_scatter_gather --------------------------------------------------------------

/// Run `compute_scatter_gather` and unwrap the three derived fields plus the kept count.
Expand Down
80 changes: 79 additions & 1 deletion vortex-layout/src/layouts/list/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::List;
use vortex_array::arrays::ListView;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::list::ListDataParts;
use vortex_array::arrays::listview::list_from_list_view;
use vortex_array::dtype::DType;
use vortex_array::dtype::IntegerPType;
use vortex_array::matcher::Matcher;
use vortex_error::VortexError;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_io::session::RuntimeSessionExt;
Expand Down Expand Up @@ -137,6 +140,7 @@ impl LayoutStrategy for ListLayoutStrategy {

// There is one extra element in `offsets`
let row_count = offsets.len().saturating_sub(1);
let fixed_size = detect_fixed_size(&offsets, &mut exec_ctx)?;
let validity_array = dtype
.is_nullable()
.then(|| {
Expand Down Expand Up @@ -183,7 +187,11 @@ impl LayoutStrategy for ListLayoutStrategy {
}
},)?;

Ok(ListLayout::new(dtype, elements_layout, offsets_layout, validity_layout).into_layout())
Ok(
ListLayout::new(dtype, elements_layout, offsets_layout, validity_layout)
.with_fixed_size(fixed_size)
.into_layout(),
)
}

fn buffered_bytes(&self) -> u64 {
Expand All @@ -194,6 +202,42 @@ impl LayoutStrategy for ListLayoutStrategy {
}
}

/// Determine whether `offsets` describes lists that all share one length.
///
/// Executes `offsets` into a primitive array and hands the typed slice to
/// [`detect_fixed_size_typed`]. Returns `Ok(None)` without executing anything when there
/// are fewer than two offsets, i.e. no list row to measure.
#[allow(clippy::unnecessary_fallible_conversions)]
fn detect_fixed_size(offsets: &ArrayRef, exec_ctx: &mut ExecutionCtx) -> VortexResult<Option<u64>> {
    if offsets.len() < 2 {
        return Ok(None);
    }

    let primitive = offsets.clone().execute::<PrimitiveArray>(exec_ctx)?;
    let offsets_ptype = primitive.ptype();
    vortex_array::match_each_integer_ptype!(offsets_ptype, |O| {
        detect_fixed_size_typed::<O>(primitive.as_slice::<O>())
    })
}

/// Return the common list length described by `offsets`, if there is one.
///
/// The result is `Some(n)` only when the first offset is `0` and every consecutive pair of
/// offsets differs by exactly `n`. It is `None` when the slice holds fewer than two offsets,
/// when the first offset is non-zero, when the lengths differ, or when the offsets decrease
/// anywhere (such input is treated as "not fixed-size" rather than underflowing).
///
/// # Errors
///
/// Fails when an inspected offset cannot be converted to `u64` (for example a negative
/// value in a signed offsets type).
fn detect_fixed_size_typed<O>(offsets: &[O]) -> VortexResult<Option<u64>>
where
    O: IntegerPType,
    u64: TryFrom<O>,
    VortexError: From<<u64 as TryFrom<O>>::Error>,
{
    // At least one list row (two offsets) is needed to define a size.
    let [first, second, rest @ ..] = offsets else {
        return Ok(None);
    };

    if u64::try_from(*first)? != 0 {
        return Ok(None);
    }

    // With the first offset pinned to zero, the second offset is the candidate size.
    let fixed_size = u64::try_from(*second)?;
    let mut previous = fixed_size;
    for &offset in rest {
        let current = u64::try_from(offset)?;
        // `checked_sub` yields `None` for decreasing offsets, which can never match.
        if current.checked_sub(previous) != Some(fixed_size) {
            return Ok(None);
        }
        previous = current;
    }

    Ok(Some(fixed_size))
}

/// Canonicalize a list-dtype array into [`ListDataParts`]. Short-circuits when the input is
/// already a `List` or `ListView` array — otherwise drives the execution loop until one of
/// those forms appears. `ListView` is rebuilt into zero-copy-to-list form via
Expand Down Expand Up @@ -252,6 +296,7 @@ mod tests {
use super::*;
use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
use crate::layouts::flat::writer::FlatLayoutStrategy;
use crate::layouts::list::List;
use crate::layouts::table::TableStrategy;
use crate::segments::TestSegments;
use crate::sequence::SequentialArrayStreamExt;
Expand Down Expand Up @@ -291,6 +336,39 @@ mod tests {
.into_array()
}

/// Three non-nullable lists holding two `i32` elements each: offsets `0, 2, 4, 6`.
fn create_fixed_size_list() -> ArrayRef {
    let elements = buffer![1i32, 2, 3, 4, 5, 6].into_array();
    let offsets = buffer![0u32, 2, 4, 6].into_array();
    ListArray::try_new(elements, offsets, Validity::NonNullable)
        .unwrap()
        .into_array()
}

/// Writing a list whose rows all have two elements records `fixed_size = Some(2)`.
#[tokio::test]
async fn fixed_size_metadata_detected() -> VortexResult<()> {
    let strategy = flat_list_strategy();
    let written = write(&strategy, create_fixed_size_list()).await?;
    let detected = written.as_::<List>().fixed_size();
    assert_eq!(detected, Some(2));
    Ok(())
}

/// Rows of length 2, 3 and 0 have no common size, so detection yields `None`.
#[test]
fn fixed_size_detection_rejects_variable_offsets() -> VortexResult<()> {
    let mut ctx = SESSION.create_execution_ctx();
    let ragged_offsets = buffer![0u32, 2, 5, 5].into_array();
    let detected = detect_fixed_size(&ragged_offsets, &mut ctx)?;
    assert_eq!(detected, None);
    Ok(())
}

/// Evenly spaced offsets that do not start at zero are not reported as fixed-size.
#[test]
fn fixed_size_detection_rejects_nonzero_start() -> VortexResult<()> {
    let mut ctx = SESSION.create_execution_ctx();
    let shifted_offsets = buffer![1u32, 3, 5].into_array();
    let detected = detect_fixed_size(&shifted_offsets, &mut ctx)?;
    assert_eq!(detected, None);
    Ok(())
}

#[tokio::test]
async fn basic_non_nullable_input() -> VortexResult<()> {
let list = create_basic_list(Validity::NonNullable);
Expand Down
Loading