diff --git a/vortex-cuda/kernels/src/dynamic_dispatch.cu b/vortex-cuda/kernels/src/dynamic_dispatch.cu index 9168a8580ab..ef9c824e187 100644 --- a/vortex-cuda/kernels/src/dynamic_dispatch.cu +++ b/vortex-cuda/kernels/src/dynamic_dispatch.cu @@ -35,7 +35,7 @@ __device__ inline uint64_t upper_bound(const T *data, uint64_t len, uint64_t val /// compressed or raw data from global memory and writes decoded elements into /// the stage's shared memory region. /// -/// @param input Global memory pointer to the stage's encoded input data +/// @param input Global memory pointer to the stage's encoded input data /// @param smem_output Shared memory pointer where decoded elements are written /// @param chunk_start Starting index of the chunk to process (block-relative for output stage) /// @param chunk_len Number of elements to produce (may be < ELEMENTS_PER_BLOCK for tail blocks) @@ -44,7 +44,7 @@ __device__ inline uint64_t upper_bound(const T *data, uint64_t len, uint64_t val /// to resolve offsets to ends/values decoded by earlier stages template __device__ inline void dynamic_source_op(const T *__restrict input, - T *__restrict smem_output, + T *__restrict &smem_output, uint64_t chunk_start, uint32_t chunk_len, const struct SourceOp &source_op, @@ -57,7 +57,10 @@ __device__ inline void dynamic_source_op(const T *__restrict input, constexpr uint32_t LANES_PER_FL_BLOCK = FL_CHUNK_SIZE / T_BITS; const uint32_t bit_width = source_op.params.bitunpack.bit_width; const uint32_t packed_words_per_fl_block = LANES_PER_FL_BLOCK * bit_width; - const uint64_t first_fl_block = chunk_start / FL_CHUNK_SIZE; + + const uint32_t element_offset = source_op.params.bitunpack.element_offset; + const uint32_t smem_within_offset = (chunk_start + element_offset) % FL_CHUNK_SIZE; + const uint64_t first_fl_block = (chunk_start + element_offset) / FL_CHUNK_SIZE; // FL blocks must divide evenly. Otherwise, the last unpack would overflow smem. static_assert((ELEMENTS_PER_BLOCK % FL_CHUNK_SIZE) == 0); @@ -65,7 +68,7 @@ __device__ inline void dynamic_source_op(const T *__restrict input, const auto div_ceil = [](auto a, auto b) { return (a + b - 1) / b; }; - const uint32_t num_fl_chunks = div_ceil(chunk_len, FL_CHUNK_SIZE); + const uint32_t num_fl_chunks = div_ceil(chunk_len + smem_within_offset, FL_CHUNK_SIZE); for (uint32_t chunk_idx = 0; chunk_idx < num_fl_chunks; ++chunk_idx) { const T *packed_chunk = input + (first_fl_block + chunk_idx) * packed_words_per_fl_block; @@ -75,7 +78,8 @@ __device__ inline void dynamic_source_op(const T *__restrict input, bit_unpack_lane(packed_chunk, smem_lane, 0, lane, bit_width); } } - break; + smem_output += smem_within_offset; + return; } case SourceOp::LOAD: { @@ -83,7 +87,7 @@ __device__ inline void dynamic_source_op(const T *__restrict input, for (uint32_t i = threadIdx.x; i < chunk_len; i += blockDim.x) { smem_output[i] = input[chunk_start + i]; } - break; + return; } case SourceOp::RUNEND: { @@ -107,7 +111,7 @@ __device__ inline void dynamic_source_op(const T *__restrict input, smem_output[i] = values[min(current_run, num_runs - 1)]; } - break; + return; } default: @@ -273,6 +277,18 @@ __device__ void execute_stage(const struct Stage &stage, __syncthreads(); } +/// Computes the number of elements to process in an output tile. +/// +/// Each tile decodes exactly one FL block == SMEM_TILE_SIZE elements into +/// shared memory. In case BITUNPACK is sliced, we need to account for the +/// sub-byte element offset. +__device__ inline uint32_t output_tile_len(const struct Stage &stage, uint32_t block_len, uint32_t tile_off) { + const uint32_t element_offset = (tile_off == 0 && stage.source.op_code == SourceOp::BITUNPACK) + ? stage.source.params.bitunpack.element_offset + : 0; + return min(SMEM_TILE_SIZE - element_offset, block_len - tile_off); +} + /// Entry point of the dynamic dispatch kernel. /// /// Executes the plan's stages in order: @@ -285,9 +301,9 @@ __device__ void execute_stage(const struct Stage &stage, /// @param array_len Total number of elements to produce /// @param plan Device pointer to the dispatch plan template -__device__ void dynamic_dispatch_impl(T *__restrict output, - uint64_t array_len, - const struct DynamicDispatchPlan *__restrict plan) { +__device__ void dynamic_dispatch(T *__restrict output, + uint64_t array_len, + const struct DynamicDispatchPlan *__restrict plan) { // Dynamically-sized shared memory: The host computes the exact byte count // needed to hold all stage outputs that must coexist simultaneously, and @@ -310,21 +326,20 @@ __device__ void dynamic_dispatch_impl(T *__restrict output, execute_stage(stage, smem_base, 0, stage.len, smem_output, 0); } - // Output stage: process in SMEM_TILE_SIZE tiles to reduce smem footprint. - // Each tile decodes into the same smem region and writes to global memory. const struct Stage &output_stage = smem_plan.stages[last]; const uint64_t block_start = static_cast(blockIdx.x) * ELEMENTS_PER_BLOCK; const uint64_t block_end = min(block_start + ELEMENTS_PER_BLOCK, array_len); const uint32_t block_len = static_cast(block_end - block_start); - for (uint32_t tile_off = 0; tile_off < block_len; tile_off += SMEM_TILE_SIZE) { - const uint32_t tile_len = min(SMEM_TILE_SIZE, block_len - tile_off); + for (uint32_t tile_off = 0; tile_off < block_len;) { + const uint32_t tile_len = output_tile_len(output_stage, block_len, tile_off); execute_stage(output_stage, smem_base, block_start + tile_off, tile_len, output, block_start + tile_off); + tile_off += tile_len; } } @@ -334,7 +349,7 @@ __device__ void dynamic_dispatch_impl(T *__restrict output, Type *__restrict output, \ uint64_t array_len, \ const struct DynamicDispatchPlan *__restrict plan) { \ - dynamic_dispatch_impl(output, array_len, plan); \ + dynamic_dispatch(output, array_len, plan); \ } FOR_EACH_UNSIGNED_INT(GENERATE_DYNAMIC_DISPATCH_KERNEL) diff --git a/vortex-cuda/kernels/src/dynamic_dispatch.h b/vortex-cuda/kernels/src/dynamic_dispatch.h index f8fbeaf6c13..9f7dc122f1b 100644 --- a/vortex-cuda/kernels/src/dynamic_dispatch.h +++ b/vortex-cuda/kernels/src/dynamic_dispatch.h @@ -44,11 +44,13 @@ union SourceParams { /// Unpack bit-packed data using FastLanes layout. struct BitunpackParams { uint8_t bit_width; + uint32_t element_offset; // Sub-byte offset } bitunpack; /// Copy elements verbatim from global memory to shared memory. + /// The input pointer is pre-adjusted on the host to account for slicing. struct LoadParams { - uint8_t _padding; + uint8_t _placeholder; } load; /// Decode run-end encoding using ends and values already in shared memory. diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index 17bcd44f5d4..c8d2841f10a 100644 --- a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -81,8 +81,6 @@ mod private { } } -// Get it back out as a View of u8 - impl CudaDeviceBuffer { /// Creates a new CUDA device buffer from a [`CudaSlice`]. /// @@ -101,6 +99,16 @@ impl CudaDeviceBuffer { } } + /// Returns the byte offset within the allocated buffer. + pub fn offset(&self) -> usize { + self.offset + } + + /// Returns the adjusted device pointer accounting for the offset. + pub fn offset_ptr(&self) -> sys::CUdeviceptr { + self.device_ptr + self.offset as u64 + } + /// Returns a [`CudaView`] to the CUDA device buffer. pub fn as_view(&self) -> CudaView<'_, T> { // Return a new &[T] @@ -159,7 +167,7 @@ impl CudaBufferExt for BufferHandle { .as_any() .downcast_ref::() .ok_or_else(|| vortex_err!("expected CudaDeviceBuffer"))? - .device_ptr; + .offset_ptr(); Ok(ptr) } @@ -281,7 +289,7 @@ impl DeviceBuffer for CudaDeviceBuffer { /// Slices the CUDA device buffer to a subrange. /// - /// **IMPORTANT**: this is a byte range, not elements range, due to the DeviceBuffer interface. + /// This is a byte range, not elements range, due to the DeviceBuffer interface. fn slice(&self, range: Range) -> Arc { assert!( range.end <= self.len, diff --git a/vortex-cuda/src/dynamic_dispatch/mod.rs b/vortex-cuda/src/dynamic_dispatch/mod.rs index a2a431f1490..dd4e78c47af 100644 --- a/vortex-cuda/src/dynamic_dispatch/mod.rs +++ b/vortex-cuda/src/dynamic_dispatch/mod.rs @@ -30,11 +30,19 @@ unsafe impl cudarc::driver::DeviceRepr for Stage {} impl SourceOp { /// Unpack bit-packed data using FastLanes layout. - pub fn bitunpack(bit_width: u8) -> Self { + /// + /// `element_offset` (0..1023) is the sub-block position within the first + /// FastLanes block. The device pointer already accounts for buffer slicing, + /// but sub-block alignment cannot be expressed as pointer arithmetic on + /// bit-packed data, so it is passed as a kernel parameter. + pub fn bitunpack(bit_width: u8, element_offset: u16) -> Self { Self { op_code: SourceOp_SourceOpCode_BITUNPACK, params: SourceParams { - bitunpack: SourceParams_BitunpackParams { bit_width }, + bitunpack: SourceParams_BitunpackParams { + bit_width, + element_offset: u32::from(element_offset), + }, }, } } @@ -134,9 +142,8 @@ impl Stage { } } - /// Create the output stage. Uses [`SMEM_TILE_SIZE`] as the shared memory - /// region size — the kernel tiles `ELEMENTS_PER_BLOCK` elements through - /// this smaller region to reduce shared memory usage. + /// Create the output stage. The kernel tiles `ELEMENTS_PER_BLOCK` elements + /// through a [`SMEM_TILE_SIZE`] shared-memory region to reduce usage. pub fn output( input_ptr: u64, smem_offset: u32, @@ -192,6 +199,7 @@ mod tests { use cudarc::driver::DevicePtr; use cudarc::driver::LaunchConfig; use cudarc::driver::PushKernelArg; + use rstest::rstest; use vortex::array::IntoArray; use vortex::array::ToCanonical; use vortex::array::arrays::DictArray; @@ -259,7 +267,7 @@ mod tests { let plan = DynamicDispatchPlan::new([Stage::output( input_ptr, 0, - SourceOp::bitunpack(bit_width), + SourceOp::bitunpack(bit_width, 0), &scalar_ops, )]); assert_eq!(plan.stages[0].num_scalar_ops, 4); @@ -279,13 +287,13 @@ mod tests { 0xAAAA, 0, 256, - SourceOp::bitunpack(4), + SourceOp::bitunpack(4, 0), &[ScalarOp::frame_of_ref(10)], ), Stage::output( 0xBBBB, 256, - SourceOp::bitunpack(6), + SourceOp::bitunpack(6, 0), &[ScalarOp::frame_of_ref(42), ScalarOp::dict(0)], ), ]); @@ -684,4 +692,316 @@ mod tests { Ok(()) } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(500, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_primitive( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let len = 5000; + let data: Vec = (0..len).map(|i| (i * 7) % 1000).collect(); + + let prim = PrimitiveArray::new(Buffer::from(data.clone()), NonNullable); + + let sliced = prim.into_array().slice(slice_start..slice_end)?; + + let expected: Vec = data[slice_start..slice_end].to_vec(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(500, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_zigzag_bitpacked( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let bit_width = 10u8; + let max_val = (1u32 << bit_width) - 1; + let len = 5000; + + let raw: Vec = (0..len).map(|i| (i as u32) % max_val).collect(); + let all_decoded: Vec = raw + .iter() + .map(|&v| (v >> 1) ^ (0u32.wrapping_sub(v & 1))) + .collect(); + + let prim = PrimitiveArray::new(Buffer::from(raw), NonNullable); + let bp = BitPackedArray::encode(&prim.into_array(), bit_width)?; + let zz = ZigZagArray::try_new(bp.into_array())?; + + let sliced = zz.into_array().slice(slice_start..slice_end)?; + let expected: Vec = all_decoded[slice_start..slice_end].to_vec(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(500, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_dict_with_primitive_codes( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let dict_values: Vec = vec![100, 200, 300, 400, 500]; + let dict_size = dict_values.len(); + let len = 5000; + let codes: Vec = (0..len).map(|i| (i % dict_size) as u32).collect(); + + let codes_prim = PrimitiveArray::new(Buffer::from(codes.clone()), NonNullable); + let values_prim = PrimitiveArray::new(Buffer::from(dict_values.clone()), NonNullable); + let dict = DictArray::try_new(codes_prim.into_array(), values_prim.into_array())?; + + let sliced = dict.into_array().slice(slice_start..slice_end)?; + + let expected: Vec = codes[slice_start..slice_end] + .iter() + .map(|&c| dict_values[c as usize]) + .collect(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(500, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_bitpacked( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let bit_width = 10u8; + let max_val = (1u32 << bit_width) - 1; + let len = 5000; + + let data: Vec = (0..len).map(|i| (i as u32) % max_val).collect(); + let prim = PrimitiveArray::new(Buffer::from(data.clone()), NonNullable); + let bp = BitPackedArray::encode(&prim.into_array(), bit_width)?; + + let sliced = bp.into_array().slice(slice_start..slice_end)?; + let expected: Vec = data[slice_start..slice_end].to_vec(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(500, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_for_bitpacked( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let reference = 100u32; + let bit_width = 10u8; + let max_val = (1u32 << bit_width) - 1; + let len = 5000; + + let encoded_data: Vec = (0..len).map(|i| (i as u32) % max_val).collect(); + let prim = PrimitiveArray::new(Buffer::from(encoded_data.clone()), NonNullable); + let bp = BitPackedArray::encode(&prim.into_array(), bit_width)?; + let for_arr = FoRArray::try_new(bp.into_array(), Scalar::from(reference))?; + + let all_decoded: Vec = encoded_data.iter().map(|&v| v + reference).collect(); + + let sliced = for_arr.into_array().slice(slice_start..slice_end)?; + let expected: Vec = all_decoded[slice_start..slice_end].to_vec(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(400, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_runend( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let ends: Vec = vec![500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000]; + let values: Vec = vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100]; + let len = 5000; + + let all_decoded: Vec = (0..len) + .map(|i| { + let run = ends.iter().position(|&e| (i as u32) < e).unwrap(); + values[run] + }) + .collect(); + + let ends_arr = PrimitiveArray::new(Buffer::from(ends), NonNullable).into_array(); + let values_arr = PrimitiveArray::new(Buffer::from(values), NonNullable).into_array(); + let re = RunEndArray::new(ends_arr, values_arr); + + let sliced = re.into_array().slice(slice_start..slice_end)?; + let expected: Vec = all_decoded[slice_start..slice_end].to_vec(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } + + #[rstest] + #[case(0, 1024)] + #[case(0, 3000)] + #[case(0, 4096)] + #[case(500, 600)] + #[case(500, 1024)] + #[case(500, 2048)] + #[case(500, 4500)] + #[case(777, 3333)] + #[case(1024, 2048)] + #[case(1024, 4096)] + #[case(1500, 3500)] + #[case(2048, 4096)] + #[case(2500, 4500)] + #[case(3333, 4444)] + #[crate::test] + fn test_sliced_dict_for_bp_values_bp_codes( + #[case] slice_start: usize, + #[case] slice_end: usize, + ) -> VortexResult<()> { + let dict_reference = 1_000_000u32; + let dict_residuals: Vec = (0..64).collect(); + let dict_expected: Vec = dict_residuals.iter().map(|&r| r + dict_reference).collect(); + let dict_size = dict_residuals.len(); + + let len = 5000; + let codes: Vec = (0..len).map(|i| (i % dict_size) as u32).collect(); + let all_decoded: Vec = codes.iter().map(|&c| dict_expected[c as usize]).collect(); + + // BitPack+FoR the dict values + let dict_prim = PrimitiveArray::new(Buffer::from(dict_residuals), NonNullable); + let dict_bp = BitPackedArray::encode(&dict_prim.into_array(), 6)?; + let dict_for = FoRArray::try_new(dict_bp.into_array(), Scalar::from(dict_reference))?; + + // BitPack the codes + let codes_prim = PrimitiveArray::new(Buffer::from(codes), NonNullable); + let codes_bp = BitPackedArray::encode(&codes_prim.into_array(), 6)?; + + let dict = DictArray::try_new(codes_bp.into_array(), dict_for.into_array())?; + + let sliced = dict.into_array().slice(slice_start..slice_end)?; + let expected: Vec = all_decoded[slice_start..slice_end].to_vec(); + + let cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?; + let (plan, _bufs) = build_plan(&sliced, &cuda_ctx)?; + + let actual = run_dynamic_dispatch_plan(&cuda_ctx, expected.len(), &plan)?; + assert_eq!(actual, expected); + + Ok(()) + } } diff --git a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs index 734e46b35fd..679fdcc6a0f 100644 --- a/vortex-cuda/src/dynamic_dispatch/plan_builder.rs +++ b/vortex-cuda/src/dynamic_dispatch/plan_builder.rs @@ -10,15 +10,19 @@ use futures::executor::block_on; use vortex::array::ArrayRef; use vortex::array::DynArray; +use vortex::array::ExecutionCtx; use vortex::array::arrays::DictVTable; use vortex::array::arrays::PrimitiveVTable; +use vortex::array::arrays::SliceVTable; use vortex::array::arrays::primitive::PrimitiveArrayParts; use vortex::array::buffer::BufferHandle; +use vortex::array::session::ArraySession; use vortex::dtype::PType; use vortex::encodings::alp::ALPFloat; use vortex::encodings::alp::ALPVTable; use vortex::encodings::fastlanes::BitPackedArrayParts; use vortex::encodings::fastlanes::BitPackedVTable; +use vortex::encodings::fastlanes::FoRArray; use vortex::encodings::fastlanes::FoRVTable; use vortex::encodings::runend::RunEndArrayParts; use vortex::encodings::runend::RunEndVTable; @@ -26,6 +30,7 @@ use vortex::encodings::zigzag::ZigZagVTable; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; +use vortex::session::VortexSession; use super::DynamicDispatchPlan; use super::MAX_SCALAR_OPS; @@ -77,6 +82,7 @@ struct Pipeline { /// - `ALPArray` → recurse + `ALP` scalar op (f32 only, no patches) /// - `DictArray` → input stage for values + recurse codes + `DICT` scalar op /// - `RunEndArray` → input stages for ends/values + `RUNEND` source +/// - `SliceArray` → resolve via child's slice reduce/kernel /// /// # Limitations /// @@ -84,8 +90,6 @@ struct Pipeline { /// receive a value regardless of whether the input was null. Only arrays with /// `NonNullable` or `AllValid` validity produce correct results. /// -/// **Slicing**: Not supported. -/// /// **Patches**: `BitPackedArray` with patches and `ALPArray` with patches are /// not supported and will return an error. /// @@ -152,6 +156,8 @@ impl PlanBuilderState<'_> { self.walk_runend(array) } else if id == PrimitiveVTable::ID { self.walk_primitive(array) + } else if id == SliceVTable::ID { + self.walk_slice(array) } else { vortex_bail!( "Encoding {:?} not supported by dynamic dispatch plan builder", @@ -160,7 +166,34 @@ impl PlanBuilderState<'_> { } } + /// SliceArray → resolve the slice via reduce/execute rules. + /// + /// When the plan builder encounters a `SliceArray`, it resolves the slice + /// by invoking the child's `reduce_parent`, `execute_parent`. + fn walk_slice(&mut self, array: ArrayRef) -> VortexResult { + let slice_arr = array.as_::(); + let child = slice_arr.child().clone(); + + // reduce_parent: (for types with SliceReduceAdaptor, like FoR/ZigZag) + if let Some(reduced) = child.vtable().reduce_parent(&child, &array, 0)? { + return self.walk(reduced); + } + + // execute_parent: (for types with SliceExecuteAdaptor/SliceKernel, like BitPacked) + let mut ctx = ExecutionCtx::new(VortexSession::empty().with::()); + if let Some(executed) = child.vtable().execute_parent(&child, &array, 0, &mut ctx)? { + return self.walk(executed); + } + + vortex_bail!( + "Cannot resolve SliceArray wrapping {:?} in dynamic dispatch plan builder", + child.encoding_id() + ) + } + /// Canonical primitive array → LOAD source op. + /// + /// The device pointer accounts for buffer slicing, so no offset parameter is needed. fn walk_primitive(&mut self, array: ArrayRef) -> VortexResult { let prim = array.to_canonical()?.into_primitive(); let PrimitiveArrayParts { buffer, .. } = prim.into_parts(); @@ -170,11 +203,14 @@ impl PlanBuilderState<'_> { Ok(Pipeline { source: SourceOp::load(), scalar_ops: vec![], - input_ptr: ptr, + input_ptr: ptr as u64, }) } /// BitPackedArray → BITUNPACK source op. + /// + /// The sub-byte element offset (0..=1023) is passed as a kernel parameter + /// as it cannot be expressed as pointer arithmetic on the device pointer. fn walk_bitpacked(&mut self, array: ArrayRef) -> VortexResult { let bp = array .try_into::() @@ -187,11 +223,6 @@ impl PlanBuilderState<'_> { .. } = bp.into_parts(); - if offset != 0 { - vortex_bail!( - "Dynamic dispatch does not support sliced BitPackedArray (offset={offset})" - ); - } if patches.is_some() { vortex_bail!("Dynamic dispatch does not support BitPackedArray with patches"); } @@ -200,9 +231,9 @@ impl PlanBuilderState<'_> { let ptr = device_buf.cuda_device_ptr()?; self.device_buffers.push(device_buf); Ok(Pipeline { - source: SourceOp::bitunpack(bit_width), + source: SourceOp::bitunpack(bit_width, offset), scalar_ops: vec![], - input_ptr: ptr, + input_ptr: ptr as u64, }) } @@ -313,7 +344,7 @@ impl PlanBuilderState<'_> { } /// Extract a FoR reference scalar as u64 bits. -fn extract_for_reference(for_arr: &vortex::encodings::fastlanes::FoRArray) -> VortexResult { +fn extract_for_reference(for_arr: &FoRArray) -> VortexResult { if let Ok(v) = u32::try_from(for_arr.reference_scalar()) { Ok(v as u64) } else if let Ok(v) = i32::try_from(for_arr.reference_scalar()) {