diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock
index a1e370d08e9..64327ae25df 100644
--- a/vortex-array/public-api.lock
+++ b/vortex-array/public-api.lock
@@ -8326,6 +8326,10 @@ pub fn vortex_array::buffer::BufferHandle::as_host_opt(&self) -> core::option::O
 pub fn vortex_array::buffer::BufferHandle::ensure_aligned(self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_array::buffer::BufferHandle>
 
+pub fn vortex_array::buffer::BufferHandle::filter(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<vortex_array::buffer::BufferHandle>
+
+pub fn vortex_array::buffer::BufferHandle::filter_typed<T>(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<vortex_array::buffer::BufferHandle>
+
 pub fn vortex_array::buffer::BufferHandle::into_host(self) -> futures_core::future::BoxFuture<'static, vortex_buffer::ByteBuffer>
 
 pub fn vortex_array::buffer::BufferHandle::into_host_sync(self) -> vortex_buffer::ByteBuffer
 
@@ -8400,6 +8404,8 @@ pub fn vortex_array::buffer::DeviceBuffer::copy_to_host(&self, alignment: vortex
 pub fn vortex_array::buffer::DeviceBuffer::copy_to_host_sync(&self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_buffer::ByteBuffer>
 
+pub fn vortex_array::buffer::DeviceBuffer::filter(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>>
+
 pub fn vortex_array::buffer::DeviceBuffer::is_empty(&self) -> bool
 
 pub fn vortex_array::buffer::DeviceBuffer::len(&self) -> usize
 
@@ -8408,10 +8414,14 @@ pub fn vortex_array::buffer::DeviceBuffer::slice(&self, range: core::ops::range:
 pub trait vortex_array::buffer::DeviceBufferExt: vortex_array::buffer::DeviceBuffer
 
+pub fn vortex_array::buffer::DeviceBufferExt::filter_typed<T>(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>>
+
 pub fn vortex_array::buffer::DeviceBufferExt::slice_typed<T>(&self, range: core::ops::range::Range<usize>) -> alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>
 
 impl<B: vortex_array::buffer::DeviceBuffer> vortex_array::buffer::DeviceBufferExt for B
 
+pub fn B::filter_typed<T>(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>>
+
 pub fn B::slice_typed<T>(&self, range: core::ops::range::Range<usize>) -> alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>
 
 pub mod vortex_array::builders
 
diff --git a/vortex-array/src/buffer.rs b/vortex-array/src/buffer.rs
index 07baf843ead..df511e95c3e 100644
--- a/vortex-array/src/buffer.rs
+++ b/vortex-array/src/buffer.rs
@@ -12,6 +12,7 @@ use futures::future::BoxFuture;
 use vortex_buffer::ALIGNMENT_TO_HOST_COPY;
 use vortex_buffer::Alignment;
 use vortex_buffer::ByteBuffer;
+use vortex_buffer::ByteBufferMut;
 use vortex_error::VortexExpect;
 use vortex_error::VortexResult;
 use vortex_utils::dyn_traits::DynEq;
@@ -87,6 +88,16 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
     /// Note that slice indices are in byte units.
     fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
 
+    /// Select and concatenate multiple byte ranges from this buffer into a new buffer.
+    ///
+    /// Unlike [`slice`](DeviceBuffer::slice), this method allocates new memory and copies the
+    /// selected ranges into a contiguous buffer.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the device cannot allocate memory or copy the data.
+    fn filter(&self, ranges: &[Range<usize>]) -> VortexResult<Arc<dyn DeviceBuffer>>;
+
     /// Return a buffer with the given alignment. Where possible, this will be zero-copy.
     ///
     /// # Errors
@@ -98,6 +109,16 @@ pub trait DeviceBufferExt: DeviceBuffer {
 
     /// Slice a range of elements `T` out of the device buffer.
     fn slice_typed<T>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
+
+    /// Select and concatenate multiple element ranges of type `T` from this buffer.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the device cannot allocate memory or copy the data.
+    fn filter_typed<T>(
+        &self,
+        ranges: &[Range<usize>],
+    ) -> VortexResult<Arc<dyn DeviceBuffer>>;
 }
 
 impl<B: DeviceBuffer> DeviceBufferExt for B {
@@ -106,6 +127,17 @@ impl<B: DeviceBuffer> DeviceBufferExt for B {
         let end_bytes = range.end * size_of::<T>();
         self.slice(start_bytes..end_bytes)
     }
+
+    fn filter_typed<T>(
+        &self,
+        ranges: &[Range<usize>],
+    ) -> VortexResult<Arc<dyn DeviceBuffer>> {
+        let byte_ranges: Vec<Range<usize>> = ranges
+            .iter()
+            .map(|r| (r.start * size_of::<T>())..(r.end * size_of::<T>()))
+            .collect();
+        self.filter(&byte_ranges)
+    }
 }
 
 impl Hash for dyn DeviceBuffer {
@@ -202,6 +234,55 @@ impl BufferHandle {
 
     }
 
+    /// Select and concatenate multiple byte ranges from this buffer into a new buffer.
+    ///
+    /// Unlike [`slice`](BufferHandle::slice), this method allocates a new buffer and copies
+    /// the selected ranges.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use vortex_array::buffer::BufferHandle;
+    /// # use vortex_buffer::buffer;
+    /// let handle = BufferHandle::new_host(buffer![1u8, 2, 3, 4, 5, 6]);
+    /// let filtered = handle.filter(&[0..2, 4..6]).unwrap();
+    /// assert_eq!(filtered.unwrap_host(), buffer![1u8, 2, 5, 6]);
+    /// ```
+    pub fn filter(&self, ranges: &[Range<usize>]) -> VortexResult<BufferHandle> {
+        match &self.0 {
+            Inner::Host(host) => {
+                let total_len: usize = ranges.iter().map(|r| r.len()).sum();
+                let mut result = ByteBufferMut::with_capacity_aligned(total_len, host.alignment());
+                for range in ranges {
+                    result.extend_from_slice(&host.as_slice()[range.start..range.end]);
+                }
+                Ok(BufferHandle::new_host(result.freeze()))
+            }
+            Inner::Device(device) => Ok(BufferHandle::new_device(device.filter(ranges)?)),
+        }
+    }
+
+    /// Select and concatenate multiple element ranges of type `T` from this buffer.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use vortex_array::buffer::BufferHandle;
+    /// # use vortex_buffer::{buffer, Buffer};
+    /// let values = buffer![1u32, 2u32, 3u32, 4u32, 5u32, 6u32];
+    /// let handle = BufferHandle::new_host(values.into_byte_buffer());
+    /// let filtered = handle.filter_typed::<u32>(&[0..2, 4..6]).unwrap();
+    /// let result = Buffer::<u32>::from_byte_buffer(filtered.to_host_sync());
+    /// assert_eq!(result, buffer![1, 2, 5, 6]);
+    /// ```
+    pub fn filter_typed<T>(&self, ranges: &[Range<usize>]) -> VortexResult<BufferHandle> {
+        let byte_ranges: Vec<Range<usize>> = ranges
+            .iter()
+            .map(|r| (r.start * size_of::<T>())..(r.end * size_of::<T>()))
+            .collect();
+        self.filter(&byte_ranges)
+    }
+
     /// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
     ///
     /// # Example
diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs
index c8d2841f10a..49af1728622 100644
--- a/vortex-cuda/src/device_buffer.rs
+++ b/vortex-cuda/src/device_buffer.rs
@@ -287,6 +287,64 @@ impl DeviceBuffer for CudaDeviceBuffer {
         }))
     }
 
+    fn filter(&self, ranges: &[Range<usize>]) -> VortexResult<Arc<dyn DeviceBuffer>> {
+        let total_len: usize = ranges.iter().map(|r| r.len()).sum();
+
+        let stream = self.allocation.stream();
+        stream
+            .context()
+            .bind_to_thread()
+            .map_err(|e| vortex_err!("Failed to bind CUDA context: {}", e))?;
+
+        if total_len == 0 {
+            let dst_slice: CudaSlice<u8> = unsafe {
+                stream
+                    .alloc::<u8>(0)
+                    .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e))?
+            };
+            return Ok(Arc::new(CudaDeviceBuffer::new(dst_slice)));
+        }
+
+        // Allocate new device memory for the filtered result.
+        let dst_slice: CudaSlice<u8> = unsafe {
+            stream
+                .alloc::<u8>(total_len)
+                .map_err(|e| vortex_err!("Failed to allocate device memory for filter: {}", e))?
+        };
+
+        let (dst_base_ptr, _) = dst_slice.device_ptr(stream);
+
+        // Copy each selected range from source to the new contiguous buffer.
+        let mut dst_offset: u64 = 0;
+        for range in ranges {
+            assert!(
+                range.end <= self.len,
+                "Filter range end {} exceeds buffer size {}",
+                range.end,
+                self.len
+            );
+            let src_ptr = self.device_ptr + (self.offset + range.start) as u64;
+            let len = range.len();
+            if len > 0 {
+                unsafe {
+                    sys::cuMemcpyDtoDAsync_v2(
+                        dst_base_ptr + dst_offset,
+                        src_ptr,
+                        len,
+                        stream.cu_stream(),
+                    )
+                    .result()
+                    .map_err(|e| {
+                        vortex_err!("Failed to copy device memory during filter: {}", e)
+                    })?;
+                }
+            }
+            dst_offset += len as u64;
+        }
+
+        Ok(Arc::new(CudaDeviceBuffer::new(dst_slice)))
+    }
+
     /// Slices the CUDA device buffer to a subrange.
     ///
     /// This is a byte range, not elements range, due to the DeviceBuffer interface.