Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions vortex-cuda/kernels/src/dynamic_dispatch.cu
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ __device__ inline uint64_t upper_bound(const T *data, uint64_t len, uint64_t val
/// compressed or raw data from global memory and writes decoded elements into
/// the stage's shared memory region.
///
/// @param input Global memory pointer to the stage's encoded input data
/// @param input Global memory pointer to the stage's encoded input data
/// @param smem_output Shared memory pointer where decoded elements are written
/// @param chunk_start Starting index of the chunk to process (block-relative for output stage)
/// @param chunk_len Number of elements to produce (may be < ELEMENTS_PER_BLOCK for tail blocks)
Expand All @@ -44,7 +44,7 @@ __device__ inline uint64_t upper_bound(const T *data, uint64_t len, uint64_t val
/// to resolve offsets to ends/values decoded by earlier stages
template <typename T>
__device__ inline void dynamic_source_op(const T *__restrict input,
T *__restrict smem_output,
T *__restrict &smem_output,
uint64_t chunk_start,
uint32_t chunk_len,
const struct SourceOp &source_op,
Expand All @@ -57,15 +57,18 @@ __device__ inline void dynamic_source_op(const T *__restrict input,
constexpr uint32_t LANES_PER_FL_BLOCK = FL_CHUNK_SIZE / T_BITS;
const uint32_t bit_width = source_op.params.bitunpack.bit_width;
const uint32_t packed_words_per_fl_block = LANES_PER_FL_BLOCK * bit_width;
const uint64_t first_fl_block = chunk_start / FL_CHUNK_SIZE;

const uint32_t element_offset = source_op.params.bitunpack.element_offset;
const uint32_t smem_within_offset = (chunk_start + element_offset) % FL_CHUNK_SIZE;
const uint64_t first_fl_block = (chunk_start + element_offset) / FL_CHUNK_SIZE;

// FL blocks must divide evenly. Otherwise, the last unpack would overflow smem.
static_assert((ELEMENTS_PER_BLOCK % FL_CHUNK_SIZE) == 0);

const auto div_ceil = [](auto a, auto b) {
return (a + b - 1) / b;
};
const uint32_t num_fl_chunks = div_ceil(chunk_len, FL_CHUNK_SIZE);
const uint32_t num_fl_chunks = div_ceil(chunk_len + smem_within_offset, FL_CHUNK_SIZE);

for (uint32_t chunk_idx = 0; chunk_idx < num_fl_chunks; ++chunk_idx) {
const T *packed_chunk = input + (first_fl_block + chunk_idx) * packed_words_per_fl_block;
Expand All @@ -75,15 +78,16 @@ __device__ inline void dynamic_source_op(const T *__restrict input,
bit_unpack_lane<T>(packed_chunk, smem_lane, 0, lane, bit_width);
}
}
break;
smem_output += smem_within_offset;
return;
}

case SourceOp::LOAD: {
// Copy elements verbatim from global memory into shared memory.
for (uint32_t i = threadIdx.x; i < chunk_len; i += blockDim.x) {
smem_output[i] = input[chunk_start + i];
}
break;
return;
}

case SourceOp::RUNEND: {
Expand All @@ -107,7 +111,7 @@ __device__ inline void dynamic_source_op(const T *__restrict input,

smem_output[i] = values[min(current_run, num_runs - 1)];
}
break;
return;
}

default:
Expand Down Expand Up @@ -273,6 +277,18 @@ __device__ void execute_stage(const struct Stage &stage,
__syncthreads();
}

/// Computes the number of elements to process in an output tile.
///
/// Each tile decodes exactly one FL block == SMEM_TILE_SIZE elements into
/// shared memory. In case BITUNPACK is sliced, we need to account for the
/// sub-byte element offset.
__device__ inline uint32_t output_tile_len(const struct Stage &stage, uint32_t block_len, uint32_t tile_off) {
const uint32_t element_offset = (tile_off == 0 && stage.source.op_code == SourceOp::BITUNPACK)
? stage.source.params.bitunpack.element_offset
: 0;
return min(SMEM_TILE_SIZE - element_offset, block_len - tile_off);
}

/// Entry point of the dynamic dispatch kernel.
///
/// Executes the plan's stages in order:
Expand All @@ -285,9 +301,9 @@ __device__ void execute_stage(const struct Stage &stage,
/// @param array_len Total number of elements to produce
/// @param plan Device pointer to the dispatch plan
template <typename T>
__device__ void dynamic_dispatch_impl(T *__restrict output,
uint64_t array_len,
const struct DynamicDispatchPlan *__restrict plan) {
__device__ void dynamic_dispatch(T *__restrict output,
uint64_t array_len,
const struct DynamicDispatchPlan *__restrict plan) {

// Dynamically-sized shared memory: The host computes the exact byte count
// needed to hold all stage outputs that must coexist simultaneously, and
Expand All @@ -310,21 +326,20 @@ __device__ void dynamic_dispatch_impl(T *__restrict output,
execute_stage<T, StorePolicy::WRITEBACK>(stage, smem_base, 0, stage.len, smem_output, 0);
}

// Output stage: process in SMEM_TILE_SIZE tiles to reduce smem footprint.
// Each tile decodes into the same smem region and writes to global memory.
const struct Stage &output_stage = smem_plan.stages[last];
const uint64_t block_start = static_cast<uint64_t>(blockIdx.x) * ELEMENTS_PER_BLOCK;
const uint64_t block_end = min(block_start + ELEMENTS_PER_BLOCK, array_len);
const uint32_t block_len = static_cast<uint32_t>(block_end - block_start);

for (uint32_t tile_off = 0; tile_off < block_len; tile_off += SMEM_TILE_SIZE) {
const uint32_t tile_len = min(SMEM_TILE_SIZE, block_len - tile_off);
for (uint32_t tile_off = 0; tile_off < block_len;) {
const uint32_t tile_len = output_tile_len(output_stage, block_len, tile_off);
execute_stage<T, StorePolicy::STREAMING>(output_stage,
smem_base,
block_start + tile_off,
tile_len,
output,
block_start + tile_off);
tile_off += tile_len;
}
}

Expand All @@ -334,7 +349,7 @@ __device__ void dynamic_dispatch_impl(T *__restrict output,
Type *__restrict output, \
uint64_t array_len, \
const struct DynamicDispatchPlan *__restrict plan) { \
dynamic_dispatch_impl<Type>(output, array_len, plan); \
dynamic_dispatch<Type>(output, array_len, plan); \
}

FOR_EACH_UNSIGNED_INT(GENERATE_DYNAMIC_DISPATCH_KERNEL)
4 changes: 3 additions & 1 deletion vortex-cuda/kernels/src/dynamic_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ union SourceParams {
/// Unpack bit-packed data using FastLanes layout.
struct BitunpackParams {
uint8_t bit_width;
uint32_t element_offset; // Sub-byte offset
} bitunpack;

/// Copy elements verbatim from global memory to shared memory.
/// The input pointer is pre-adjusted on the host to account for slicing.
struct LoadParams {
uint8_t _padding;
uint8_t _placeholder;
} load;

/// Decode run-end encoding using ends and values already in shared memory.
Expand Down
16 changes: 12 additions & 4 deletions vortex-cuda/src/device_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ mod private {
}
}

// Get it back out as a View of u8

impl CudaDeviceBuffer {
/// Creates a new CUDA device buffer from a [`CudaSlice<T>`].
///
Expand All @@ -101,6 +99,16 @@ impl CudaDeviceBuffer {
}
}

/// Returns the byte offset within the allocated buffer.
pub fn offset(&self) -> usize {
self.offset
}

/// Returns the adjusted device pointer accounting for the offset.
pub fn offset_ptr(&self) -> sys::CUdeviceptr {
self.device_ptr + self.offset as u64
}

/// Returns a [`CudaView`] to the CUDA device buffer.
pub fn as_view<T: DeviceRepr + 'static>(&self) -> CudaView<'_, T> {
// Return a new &[T]
Expand Down Expand Up @@ -159,7 +167,7 @@ impl CudaBufferExt for BufferHandle {
.as_any()
.downcast_ref::<CudaDeviceBuffer>()
.ok_or_else(|| vortex_err!("expected CudaDeviceBuffer"))?
.device_ptr;
.offset_ptr();

Ok(ptr)
}
Expand Down Expand Up @@ -281,7 +289,7 @@ impl DeviceBuffer for CudaDeviceBuffer {

/// Slices the CUDA device buffer to a subrange.
///
/// **IMPORTANT**: this is a byte range, not elements range, due to the DeviceBuffer interface.
/// This is a byte range, not elements range, due to the DeviceBuffer interface.
fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
assert!(
range.end <= self.len,
Expand Down
Loading
Loading