Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion vortex-bench/src/random_access/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use vortex::array::stream::ArrayStreamExt;
use vortex::buffer::Buffer;
use vortex::file::OpenOptionsSessionExt;
use vortex::file::VortexFile;
use vortex::scan::selection::StrictSortedBuffer;
use vortex::utils::aliases::hash_map::HashMap;

use crate::Format;
Expand Down Expand Up @@ -76,7 +77,7 @@ impl RandomAccessor for VortexRandomAccessor {
let array = self
.file
.scan()?
.with_row_indices(indices_buf)
.with_row_indices(StrictSortedBuffer::try_new(indices_buf)?)
.into_array_stream()?
.read_all()
.await?;
Expand Down
5 changes: 4 additions & 1 deletion vortex-cxx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ mod ffi {
fn with_projection(self: &mut VortexScanBuilder, projection: Box<Expr>);
fn with_projection_ref(self: &mut VortexScanBuilder, projection: &Expr);
fn with_row_range(self: &mut VortexScanBuilder, row_range_start: u64, row_range_end: u64);
fn with_include_by_index(self: &mut VortexScanBuilder, include_by_index: &[u64]);
fn with_include_by_index(
self: &mut VortexScanBuilder,
include_by_index: &[u64],
) -> Result<()>;
fn with_limit(self: &mut VortexScanBuilder, limit: usize);
unsafe fn with_output_schema(
self: &mut VortexScanBuilder,
Expand Down
8 changes: 6 additions & 2 deletions vortex-cxx/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use vortex::io::runtime::BlockingRuntime;
use vortex::layout::scan::arrow::RecordBatchIteratorAdapter;
use vortex::layout::scan::scan_builder::ScanBuilder;
use vortex::scan::selection::Selection;
use vortex::scan::selection::StrictSortedBuffer;

use crate::RUNTIME;
use crate::SESSION;
Expand Down Expand Up @@ -91,9 +92,12 @@ impl VortexScanBuilder {
});
}

pub(crate) fn with_include_by_index(&mut self, include_by_index: &[u64]) {
let selection = Selection::IncludeByIndex(Buffer::copy_from(include_by_index));
pub(crate) fn with_include_by_index(&mut self, include_by_index: &[u64]) -> Result<()> {
let selection = Selection::IncludeByIndex(StrictSortedBuffer::try_new(Buffer::copy_from(
include_by_index,
))?);
take_mut::take(&mut self.inner, |inner| inner.with_selection(selection));
Ok(())
}

pub(crate) fn with_limit(&mut self, limit: usize) {
Expand Down
7 changes: 3 additions & 4 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ mod tests {
use vortex::io::object_store::ObjectStoreWrite;
use vortex::metrics::DefaultMetricsRegistry;
use vortex::scan::selection::Selection;
use vortex::scan::selection::StrictSortedBuffer;
use vortex::session::VortexSession;

use super::*;
Expand Down Expand Up @@ -1193,8 +1194,6 @@ mod tests {
// Test that Selection::IncludeByIndex filters to specific row indices.
async fn test_selection_include_by_index() -> anyhow::Result<()> {
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
use vortex::buffer::Buffer;
use vortex::scan::selection::Selection;

let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file_path = "/path/file.vortex";
Expand All @@ -1208,7 +1207,7 @@ mod tests {
file.extensions
.insert(
VortexAccessPlan::default().with_selection(Selection::IncludeByIndex(
Buffer::from_iter(vec![1, 3, 5, 7]),
StrictSortedBuffer::try_new(Buffer::from_iter(vec![1, 3, 5, 7]))?,
)),
);

Expand Down Expand Up @@ -1252,7 +1251,7 @@ mod tests {
file.extensions
.insert(
VortexAccessPlan::default().with_selection(Selection::ExcludeByIndex(
Buffer::from_iter(vec![0, 2, 4, 6, 8]),
StrictSortedBuffer::try_new(Buffer::from_iter(vec![0, 2, 4, 6, 8]))?,
)),
);

Expand Down
13 changes: 10 additions & 3 deletions vortex-duckdb/src/convert/table_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use vortex::scalar_fn::ScalarFnVTableExt;
use vortex::scalar_fn::fns::binary::Binary;
use vortex::scalar_fn::fns::operators::CompareOperator;
use vortex::scan::selection::Selection;
use vortex::scan::selection::StrictSortedBuffer;

use super::expr::try_from_bound_expression_with_col_sub;
use crate::cpp::DUCKDB_VX_EXPR_TYPE;
Expand Down Expand Up @@ -178,7 +179,10 @@ pub fn try_from_virtual_column_filter(
.iter()
.map(nonnegative_number_from_value)
.collect::<VortexResult<Vec<u64>>>()?;
Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None))
Ok((
Selection::IncludeByIndex(StrictSortedBuffer::try_new(Buffer::from_iter(indices))?),
None,
))
}
TableFilterClass::ConstantComparison(const_) => {
let n = nonnegative_number_from_value(const_.value)?;
Expand Down Expand Up @@ -206,7 +210,7 @@ pub fn try_from_virtual_column_filter(
let (sel, range) = try_from_virtual_column_filter(child)?;
if let Selection::IncludeByIndex(buf) = sel {
indices = Some(match indices {
None => buf.iter().copied().collect(),
None => buf.as_slice().to_vec(),
Some(existing) => intersect_sorted(&existing, buf.as_ref()),
});
}
Expand All @@ -217,7 +221,10 @@ pub fn try_from_virtual_column_filter(
}
let range = (start < end).then_some(start..end);
let sel = indices
.map(|v| Selection::IncludeByIndex(Buffer::from_iter(v)))
.map(|v| {
StrictSortedBuffer::try_new(Buffer::from_iter(v)).map(Selection::IncludeByIndex)
})
.transpose()?
.unwrap_or(Selection::All);
Ok((sel, range))
}
Expand Down
5 changes: 3 additions & 2 deletions vortex-ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use vortex::scan::Partition;
use vortex::scan::PartitionStream;
use vortex::scan::ScanRequest;
use vortex::scan::selection::Selection;
use vortex::scan::selection::StrictSortedBuffer;

use crate::RUNTIME;
use crate::array::vx_array;
Expand Down Expand Up @@ -177,13 +178,13 @@ fn scan_request(opts: *const vx_scan_options) -> VortexResult<ScanRequest> {
vortex_ensure!(!selection.idx.is_null());
let buf = unsafe { slice::from_raw_parts(selection.idx, selection.idx_len) };
let buf = Buffer::copy_from(buf);
Selection::IncludeByIndex(buf)
Selection::IncludeByIndex(StrictSortedBuffer::try_new(buf)?)
}
vx_scan_selection_include::VX_SELECTION_EXCLUDE_RANGE => {
vortex_ensure!(!selection.idx.is_null());
let buf = unsafe { slice::from_raw_parts(selection.idx, selection.idx_len) };
let buf = Buffer::copy_from(buf);
Selection::ExcludeByIndex(buf)
Selection::ExcludeByIndex(StrictSortedBuffer::try_new(buf)?)
}
};

Expand Down
21 changes: 13 additions & 8 deletions vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use vortex_layout::layouts::zoned::LegacyStats;
use vortex_layout::layouts::zoned::Zoned;
use vortex_layout::scan::scan_builder::ScanBuilder;
use vortex_layout::session::LayoutSession;
use vortex_scan::selection::StrictSortedBuffer;
use vortex_session::VortexSession;

use crate::OpenOptionsSessionExt;
Expand All @@ -86,6 +87,10 @@ static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
session
});

fn strict_sorted(indices: Buffer<u64>) -> StrictSortedBuffer<u64> {
StrictSortedBuffer::try_new(indices).expect("test indices should be strictly increasing")
}

#[tokio::test]
async fn test_eof_values() {
// this test exists as a reminder to think about whether we should increment the version
Expand Down Expand Up @@ -740,7 +745,7 @@ async fn test_with_indices_simple() {
let actual_kept_array = file
.scan()
.unwrap()
.with_row_indices(Buffer::<u64>::empty())
.with_row_indices(strict_sorted(Buffer::<u64>::empty()))
.into_array_stream()
.unwrap()
.read_all()
Expand All @@ -757,7 +762,7 @@ async fn test_with_indices_simple() {
let actual_kept_array = file
.scan()
.unwrap()
.with_row_indices(Buffer::from_iter(kept_indices))
.with_row_indices(strict_sorted(Buffer::from_iter(kept_indices)))
.into_array_stream()
.unwrap()
.read_all()
Expand All @@ -782,7 +787,7 @@ async fn test_with_indices_simple() {
let actual_array = file
.scan()
.unwrap()
.with_row_indices((0u64..500).collect::<Buffer<_>>())
.with_row_indices(strict_sorted((0u64..500).collect::<Buffer<_>>()))
.into_array_stream()
.unwrap()
.read_all()
Expand Down Expand Up @@ -827,7 +832,7 @@ async fn test_with_indices_on_two_columns() {
let array = file
.scan()
.unwrap()
.with_row_indices(Buffer::from_iter(kept_indices))
.with_row_indices(strict_sorted(Buffer::from_iter(kept_indices)))
.into_array_stream()
.unwrap()
.read_all()
Expand Down Expand Up @@ -884,7 +889,7 @@ async fn test_with_indices_and_with_row_filter_simple() {
.scan()
.unwrap()
.with_filter(gt(get_item("numbers", root()), lit(50_i16)))
.with_row_indices(Buffer::empty())
.with_row_indices(strict_sorted(Buffer::empty()))
.into_array_stream()
.unwrap()
.read_all()
Expand All @@ -902,7 +907,7 @@ async fn test_with_indices_and_with_row_filter_simple() {
.scan()
.unwrap()
.with_filter(gt(get_item("numbers", root()), lit(50_i16)))
.with_row_indices(Buffer::from_iter(kept_indices))
.with_row_indices(strict_sorted(Buffer::from_iter(kept_indices)))
.into_array_stream()
.unwrap()
.read_all()
Expand Down Expand Up @@ -930,7 +935,7 @@ async fn test_with_indices_and_with_row_filter_simple() {
.scan()
.unwrap()
.with_filter(gt(get_item("numbers", root()), lit(50_i16)))
.with_row_indices((0..500).collect::<Buffer<_>>())
.with_row_indices(strict_sorted((0..500).collect::<Buffer<_>>()))
.into_array_stream()
.unwrap()
.read_all()
Expand Down Expand Up @@ -1233,7 +1238,7 @@ async fn file_take() -> VortexResult<()> {
let vxf = chunked_file().await?;
let result = vxf
.scan()?
.with_row_indices(buffer![0, 1, 8])
.with_row_indices(StrictSortedBuffer::try_new(buffer![0, 1, 8])?)
.into_array_stream()?
.read_all()
.await?;
Expand Down
9 changes: 7 additions & 2 deletions vortex-jni/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use vortex::scan::PartitionRef;
use vortex::scan::PartitionStream;
use vortex::scan::ScanRequest;
use vortex::scan::selection::Selection;
use vortex::scan::selection::StrictSortedBuffer;

use crate::RUNTIME;
use crate::data_source::NativeDataSource;
Expand Down Expand Up @@ -96,8 +97,12 @@ fn build_scan_request(

let selection = match selection_include {
0 => Selection::All,
1 => Selection::IncludeByIndex(Buffer::copy_from(selection_idx)),
2 => Selection::ExcludeByIndex(Buffer::copy_from(selection_idx)),
1 => Selection::IncludeByIndex(StrictSortedBuffer::try_new(Buffer::copy_from(
selection_idx,
))?),
2 => Selection::ExcludeByIndex(StrictSortedBuffer::try_new(Buffer::copy_from(
selection_idx,
))?),
Comment on lines +100 to +105

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove the validation from java? JNI actually gets validated selections

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea where you want to assert that? Seems odd to accept it unchecked over FFI?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the code you will notice that with this change this validation happens twice. I am asking you to have it happen once again as before this change. The validation happens on the java side of the jni bindings

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 => Selection::IncludeRoaring(deserialize_roaring_selection(selection_roaring_bitmap)?),
4 => Selection::ExcludeRoaring(deserialize_roaring_selection(selection_roaring_bitmap)?),
other => vortex_bail!("unknown selection include code: {other}"),
Expand Down
6 changes: 3 additions & 3 deletions vortex-layout/src/scan/scan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use vortex_array::iter::ArrayIteratorAdapter;
use vortex_array::stats::StatsSet;
use vortex_array::stream::ArrayStream;
use vortex_array::stream::ArrayStreamAdapter;
use vortex_buffer::Buffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
Expand All @@ -34,6 +33,7 @@ use vortex_io::runtime::Task;
use vortex_io::session::RuntimeSessionExt;
use vortex_metrics::MetricsRegistry;
use vortex_scan::selection::Selection;
use vortex_scan::selection::StrictSortedBuffer;
use vortex_session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;

Expand Down Expand Up @@ -172,8 +172,8 @@ impl<A: 'static + Send> ScanBuilder<A> {
self
}

/// Select rows by absolute indices relative to the scan input.
pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
/// Select rows by strictly sorted absolute indices relative to the scan input.
pub fn with_row_indices(mut self, row_indices: StrictSortedBuffer<u64>) -> Self {
self.selection = Selection::IncludeByIndex(row_indices);
self
}
Expand Down
3 changes: 2 additions & 1 deletion vortex-python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use vortex::file::OpenOptionsSessionExt;
use vortex::file::VortexFile;
use vortex::io::runtime::BlockingRuntime;
use vortex::layout::scan::split_by::SplitBy;
use vortex::scan::selection::StrictSortedBuffer;

use crate::RUNTIME;
use crate::arrays::PyArrayRef;
Expand Down Expand Up @@ -66,7 +67,7 @@ pub fn read_array_from_reader(
if let Some(indices) = indices {
let primitive = indices.execute::<PrimitiveArray>(ctx)?;
let indices = primitive.into_buffer();
scan = scan.with_row_indices(indices);
scan = scan.with_row_indices(StrictSortedBuffer::try_new(indices)?);
}

if let Some((l, r)) = row_range {
Expand Down
3 changes: 2 additions & 1 deletion vortex-python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use vortex::io::runtime::BlockingRuntime;
use vortex::layout::scan::scan_builder::ScanBuilder;
use vortex::layout::scan::split_by::SplitBy;
use vortex::layout::segments::MokaSegmentCache;
use vortex::scan::selection::StrictSortedBuffer;

use crate::RUNTIME;
use crate::arrays::PyArrayRef;
Expand Down Expand Up @@ -220,7 +221,7 @@ fn scan_builder(
if let Some(indices) = indices {
let casted = indices.cast(DType::Primitive(PType::U64, NonNullable))?;
let indices = casted.execute::<PrimitiveArray>(ctx)?.into_buffer::<u64>();
builder = builder.with_row_indices(indices);
builder = builder.with_row_indices(StrictSortedBuffer::try_new(indices)?);
}

if let Some(batch_size) = batch_size {
Expand Down
4 changes: 2 additions & 2 deletions vortex-scan/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ let record_batch: RecordBatch = batch ?;
### Row Selection

```rust
use vortex_scan::Selection;
use vortex_scan::selection::{Selection, StrictSortedBuffer};

// Select specific rows by index
let scan = ScanBuilder::new(layout_reader)
.with_selection(Selection::IncludeByIndex(indices.into()))
.with_selection(Selection::IncludeByIndex(StrictSortedBuffer::try_new(indices.into())?))
.build() ?;

// Or use row ranges
Expand Down
Loading
Loading