Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8326,6 +8326,10 @@ pub fn vortex_array::buffer::BufferHandle::as_host_opt(&self) -> core::option::O

pub fn vortex_array::buffer::BufferHandle::ensure_aligned(self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<Self>

pub fn vortex_array::buffer::BufferHandle::filter(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<Self>

pub fn vortex_array::buffer::BufferHandle::filter_typed<T: core::marker::Sized>(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<Self>

pub fn vortex_array::buffer::BufferHandle::into_host(self) -> futures_core::future::BoxFuture<'static, vortex_buffer::ByteBuffer>

pub fn vortex_array::buffer::BufferHandle::into_host_sync(self) -> vortex_buffer::ByteBuffer
Expand Down Expand Up @@ -8400,6 +8404,8 @@ pub fn vortex_array::buffer::DeviceBuffer::copy_to_host(&self, alignment: vortex

pub fn vortex_array::buffer::DeviceBuffer::copy_to_host_sync(&self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult<vortex_buffer::ByteBuffer>

pub fn vortex_array::buffer::DeviceBuffer::filter(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>>

pub fn vortex_array::buffer::DeviceBuffer::is_empty(&self) -> bool

pub fn vortex_array::buffer::DeviceBuffer::len(&self) -> usize
Expand All @@ -8408,10 +8414,14 @@ pub fn vortex_array::buffer::DeviceBuffer::slice(&self, range: core::ops::range:

pub trait vortex_array::buffer::DeviceBufferExt: vortex_array::buffer::DeviceBuffer

pub fn vortex_array::buffer::DeviceBufferExt::filter_typed<T: core::marker::Sized>(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>>

pub fn vortex_array::buffer::DeviceBufferExt::slice_typed<T: core::marker::Sized>(&self, range: core::ops::range::Range<usize>) -> alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>

impl<B: vortex_array::buffer::DeviceBuffer> vortex_array::buffer::DeviceBufferExt for B

pub fn B::filter_typed<T: core::marker::Sized>(&self, ranges: &[core::ops::range::Range<usize>]) -> vortex_error::VortexResult<alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>>

pub fn B::slice_typed<T: core::marker::Sized>(&self, range: core::ops::range::Range<usize>) -> alloc::sync::Arc<dyn vortex_array::buffer::DeviceBuffer>

pub mod vortex_array::builders
Expand Down
81 changes: 81 additions & 0 deletions vortex-array/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::future::BoxFuture;
use vortex_buffer::ALIGNMENT_TO_HOST_COPY;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_utils::dyn_traits::DynEq;
Expand Down Expand Up @@ -87,6 +88,16 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
/// Note that slice indices are in byte units.
fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;

/// Select and concatenate multiple byte ranges from this buffer into a new buffer.
///
/// Unlike [`slice`](DeviceBuffer::slice), this method allocates new memory and copies the
/// selected ranges into a contiguous buffer.
///
/// # Errors
///
/// Returns an error if the device cannot allocate memory or copy the data.
fn filter(&self, ranges: &[Range<usize>]) -> VortexResult<Arc<dyn DeviceBuffer>>;
Comment on lines +93 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in bytes? This translation might be hard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might need a few ways of slicing? Not sure...


/// Return a buffer with the given alignment. Where possible, this will be zero-copy.
///
/// # Errors
Expand All @@ -98,6 +109,16 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {
pub trait DeviceBufferExt: DeviceBuffer {
/// Slice a range of elements `T` out of the device buffer.
fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;

/// Select and concatenate multiple element ranges of type `T` from this buffer.
///
/// # Errors
///
/// Returns an error if the device cannot allocate memory or copy the data.
fn filter_typed<T: Sized>(
&self,
ranges: &[Range<usize>],
) -> VortexResult<Arc<dyn DeviceBuffer>>;
}

impl<B: DeviceBuffer> DeviceBufferExt for B {
Expand All @@ -106,6 +127,17 @@ impl<B: DeviceBuffer> DeviceBufferExt for B {
let end_bytes = range.end * size_of::<T>();
self.slice(start_bytes..end_bytes)
}

fn filter_typed<T: Sized>(
&self,
ranges: &[Range<usize>],
) -> VortexResult<Arc<dyn DeviceBuffer>> {
let byte_ranges: Vec<Range<usize>> = ranges
.iter()
.map(|r| (r.start * size_of::<T>())..(r.end * size_of::<T>()))
.collect();
self.filter(&byte_ranges)
}
}

impl Hash for dyn DeviceBuffer {
Expand Down Expand Up @@ -202,6 +234,55 @@ impl BufferHandle {
}
}

/// Select and concatenate multiple byte ranges from this buffer into a new buffer.
///
/// Unlike [`slice`](BufferHandle::slice), this method allocates a new buffer and copies
/// the selected ranges.
///
/// # Example
///
/// ```
/// # use vortex_array::buffer::BufferHandle;
/// # use vortex_buffer::buffer;
/// let handle = BufferHandle::new_host(buffer![1u8, 2, 3, 4, 5, 6]);
/// let filtered = handle.filter(&[0..2, 4..6]).unwrap();
/// assert_eq!(filtered.unwrap_host(), buffer![1u8, 2, 5, 6]);
/// ```
pub fn filter(&self, ranges: &[Range<usize>]) -> VortexResult<Self> {
match &self.0 {
Inner::Host(host) => {
let total_len: usize = ranges.iter().map(|r| r.len()).sum();
let mut result = ByteBufferMut::with_capacity_aligned(total_len, host.alignment());
for range in ranges {
result.extend_from_slice(&host.as_slice()[range.start..range.end]);
}
Ok(BufferHandle::new_host(result.freeze()))
}
Inner::Device(device) => Ok(BufferHandle::new_device(device.filter(ranges)?)),
}
}

/// Select and concatenate multiple element ranges of type `T` from this buffer.
///
/// # Example
///
/// ```
/// # use vortex_array::buffer::BufferHandle;
/// # use vortex_buffer::{buffer, Buffer};
/// let values = buffer![1u32, 2u32, 3u32, 4u32, 5u32, 6u32];
/// let handle = BufferHandle::new_host(values.into_byte_buffer());
/// let filtered = handle.filter_typed::<u32>(&[0..2, 4..6]).unwrap();
/// let result = Buffer::<u32>::from_byte_buffer(filtered.to_host_sync());
/// assert_eq!(result, buffer![1, 2, 5, 6]);
/// ```
pub fn filter_typed<T: Sized>(&self, ranges: &[Range<usize>]) -> VortexResult<Self> {
let byte_ranges: Vec<Range<usize>> = ranges
.iter()
.map(|r| (r.start * size_of::<T>())..(r.end * size_of::<T>()))
.collect();
self.filter(&byte_ranges)
}

/// Reinterpret the pointee as a buffer of `T` and slice the provided element range.
///
/// # Example
Expand Down
58 changes: 58 additions & 0 deletions vortex-cuda/src/device_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,64 @@ impl DeviceBuffer for CudaDeviceBuffer {
}))
}

fn filter(&self, ranges: &[Range<usize>]) -> VortexResult<Arc<dyn DeviceBuffer>> {
let total_len: usize = ranges.iter().map(|r| r.len()).sum();

let stream = self.allocation.stream();
stream
.context()
.bind_to_thread()
.map_err(|e| vortex_err!("Failed to bind CUDA context: {}", e))?;

if total_len == 0 {
let dst_slice: CudaSlice<u8> = unsafe {
stream
.alloc::<u8>(0)
.map_err(|e| vortex_err!("Failed to allocate device memory: {}", e))?
};
return Ok(Arc::new(CudaDeviceBuffer::new(dst_slice)));
}

// Allocate new device memory for the filtered result.
let dst_slice: CudaSlice<u8> = unsafe {
stream
.alloc::<u8>(total_len)
.map_err(|e| vortex_err!("Failed to allocate device memory for filter: {}", e))?
};

let (dst_base_ptr, _) = dst_slice.device_ptr(stream);

// Copy each selected range from source to the new contiguous buffer.
let mut dst_offset: u64 = 0;
for range in ranges {
assert!(
range.end <= self.len,
"Filter range end {} exceeds buffer size {}",
range.end,
self.len
);
let src_ptr = self.device_ptr + (self.offset + range.start) as u64;
let len = range.len();
if len > 0 {
unsafe {
sys::cuMemcpyDtoDAsync_v2(
dst_base_ptr + dst_offset,
src_ptr,
len,
stream.cu_stream(),
)
.result()
.map_err(|e| {
vortex_err!("Failed to copy device memory during filter: {}", e)
})?;
}
}
dst_offset += len as u64;
}

Ok(Arc::new(CudaDeviceBuffer::new(dst_slice)))
}

/// Slices the CUDA device buffer to a subrange.
///
/// This is a byte range, not elements range, due to the DeviceBuffer interface.
Expand Down
Loading