-
Notifications
You must be signed in to change notification settings - Fork 142
PatchedArray: basics and wiring #7002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
a10y
wants to merge
7
commits into
develop
Choose a base branch
from
aduffy/patched-array
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
047ede0
PatchedArray: basics and wiring
a10y 33cdc46
take
a10y c05e4d0
add unit tests
a10y 5286eea
final
a10y 1ad47c7
actually make the kernel get used
a10y 6c1d7aa
fix tests
a10y 35bfb5f
use child for values instead of buffer
a10y File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,270 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| use std::ops::Range; | ||
|
|
||
| use vortex_buffer::Buffer; | ||
| use vortex_buffer::BufferMut; | ||
| use vortex_error::VortexResult; | ||
| use vortex_error::vortex_ensure; | ||
|
|
||
| use crate::ArrayRef; | ||
| use crate::Canonical; | ||
| use crate::DynArray; | ||
| use crate::ExecutionCtx; | ||
| use crate::IntoArray; | ||
| use crate::arrays::PrimitiveArray; | ||
| use crate::arrays::patched::PatchAccessor; | ||
| use crate::arrays::patched::TransposedPatches; | ||
| use crate::arrays::patched::patch_lanes; | ||
| use crate::buffer::BufferHandle; | ||
| use crate::dtype::IntegerPType; | ||
| use crate::dtype::NativePType; | ||
| use crate::match_each_native_ptype; | ||
| use crate::match_each_unsigned_integer_ptype; | ||
| use crate::patches::Patches; | ||
| use crate::stats::ArrayStats; | ||
| use crate::validity::Validity; | ||
|
|
||
/// An array that partially "patches" another array with new values.
///
/// Patches are stored in a transposed, data-parallel layout: the array is viewed as
/// 1024-element chunks, each chunk split horizontally into `n_lanes` lanes, with
/// `lane_offsets` delimiting each (chunk, lane) slot's range within the flat
/// `indices`/`values` storage (see `transpose` below for how this is built).
#[derive(Debug, Clone)]
pub struct PatchedArray {
    /// The inner array that is being patched. This is the zeroth child.
    pub(super) inner: ArrayRef,

    /// Number of 1024-element chunks. Pre-computed for convenience.
    pub(super) n_chunks: usize,

    /// Number of lanes the patch indices and values have been split into. Each of the `n_chunks`
    /// of 1024 values is split into `n_lanes` lanes horizontally, each lane having 1024 / n_lanes
    /// values that might be patched.
    pub(super) n_lanes: usize,

    /// Offset into the first chunk.
    pub(super) offset: usize,
    /// Total length.
    pub(super) len: usize,

    /// Lane offsets (fencepost-style starts/ends into `indices`/`values`).
    /// The PType of these MUST be u32.
    pub(super) lane_offsets: BufferHandle,
    /// Indices within a 1024-element chunk. The PType of these MUST be u16.
    pub(super) indices: BufferHandle,
    /// Patch values corresponding to the indices, stored as a child array
    /// (its PType is carried by the child, not by a separate field).
    pub(super) values: ArrayRef,

    /// Statistics cache for this array.
    pub(super) stats_set: ArrayStats,
}
|
|
||
impl PatchedArray {
    /// Create a new `PatchedArray` from a child array and a set of [`Patches`].
    ///
    /// Canonicalizes and transposes the patches into the chunk/lane layout, then wraps
    /// the transposed patch values in a non-nullable primitive child array.
    ///
    /// # Errors
    ///
    /// The `inner` array must be primitive type, and it must have the same DType as the patches.
    pub fn from_array_and_patches(
        inner: ArrayRef,
        patches: &Patches,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Self> {
        vortex_ensure!(
            inner.dtype().eq_with_nullability_superset(patches.dtype()),
            "array DType must match patches DType"
        );

        vortex_ensure!(
            inner.dtype().is_primitive(),
            "Creating PatchedArray from Patches only supported for primitive arrays"
        );

        let values_ptype = patches.dtype().as_ptype();

        let TransposedPatches {
            n_chunks,
            n_lanes,
            lane_offsets,
            indices,
            values,
        } = transpose_patches(patches, ctx)?;

        let len = inner.len();

        // Re-wrap the transposed value bytes as a primitive child array, preserving
        // the patches' PType. Validity is NonNullable: only positions listed in
        // `indices` are ever read back.
        let values = PrimitiveArray::from_buffer_handle(
            BufferHandle::new_host(values),
            values_ptype,
            Validity::NonNullable,
        )
        .into_array();

        Ok(Self {
            inner,
            n_chunks,
            n_lanes,
            offset: 0, // a freshly-built array starts aligned to its first chunk
            len,
            lane_offsets: BufferHandle::new_host(lane_offsets),
            indices: BufferHandle::new_host(indices),
            values,
            stats_set: ArrayStats::default(),
        })
    }

    /// Get an accessor, which allows ranged access to patches by chunk/lane.
    pub fn accessor(&self) -> PatchAccessor<'_> {
        PatchAccessor {
            n_lanes: self.n_lanes,
            // The reinterprets rely on the struct's documented invariants:
            // lane_offsets MUST be u32 and indices MUST be u16.
            lane_offsets: self.lane_offsets.as_host().reinterpret::<u32>(),
            indices: self.indices.as_host().reinterpret::<u16>(),
        }
    }

    /// Slice the array to just the patches and inner values that are within the chunk range.
    pub(crate) fn slice_chunks(&self, chunks: Range<usize>) -> VortexResult<Self> {
        // Lane offsets are fencepost-style: keep one extra entry past the last
        // retained chunk so every (chunk, lane) slot still has a (start, end) pair.
        let lane_offsets_start = chunks.start * self.n_lanes;
        let lane_offsets_stop = chunks.end * self.n_lanes + 1;

        // indices/values are deliberately NOT sliced: the sliced lane offsets
        // still point into the full buffers.
        let sliced_lane_offsets = self
            .lane_offsets
            .slice_typed::<u32>(lane_offsets_start..lane_offsets_stop);
        let indices = self.indices.clone();
        let values = self.values.clone();

        // Clamp the element range to this array's own bounds.
        // NOTE(review): `begin` is clamped by `offset` (position within the first
        // chunk) while `end` is clamped by `len` (total length); this assumes both
        // live in the same chunk-relative coordinate space — confirm against callers.
        let begin = (chunks.start * 1024).max(self.offset);
        let end = (chunks.end * 1024).min(self.len);

        // New offset into the first retained chunk.
        let offset = begin % 1024;

        let inner = self.inner.slice(begin..end)?;

        let len = end - begin;
        let n_chunks = (end - begin).div_ceil(1024);

        Ok(PatchedArray {
            inner,
            n_chunks,
            n_lanes: self.n_lanes,
            offset,
            len,
            indices,
            values,
            lane_offsets: sliced_lane_offsets,
            stats_set: ArrayStats::default(),
        })
    }
}
|
|
||
| /// Transpose a set of patches from the default sorted layout into the data parallel layout. | ||
| #[allow(clippy::cognitive_complexity)] | ||
| fn transpose_patches(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult<TransposedPatches> { | ||
| let array_len = patches.array_len(); | ||
| let offset = patches.offset(); | ||
|
|
||
| let indices = patches | ||
| .indices() | ||
| .clone() | ||
| .execute::<Canonical>(ctx)? | ||
| .into_primitive(); | ||
|
|
||
| let values = patches | ||
| .values() | ||
| .clone() | ||
| .execute::<Canonical>(ctx)? | ||
| .into_primitive(); | ||
|
|
||
| let indices_ptype = indices.ptype(); | ||
| let values_ptype = values.ptype(); | ||
|
|
||
| let indices = indices.buffer_handle().clone().unwrap_host(); | ||
| let values = values.buffer_handle().clone().unwrap_host(); | ||
|
|
||
| match_each_unsigned_integer_ptype!(indices_ptype, |I| { | ||
| match_each_native_ptype!(values_ptype, |V| { | ||
| let indices: Buffer<I> = Buffer::from_byte_buffer(indices); | ||
| let values: Buffer<V> = Buffer::from_byte_buffer(values); | ||
|
|
||
| Ok(transpose( | ||
| indices.as_slice(), | ||
| values.as_slice(), | ||
| offset, | ||
| array_len, | ||
| )) | ||
| }) | ||
| }) | ||
| } | ||
|
|
||
/// Scatter sorted (index, value) patch pairs into the transposed chunk/lane layout.
///
/// This is effectively a counting sort keyed by (chunk, lane): one pass counts
/// patches per slot, a prefix sum turns the counts into start offsets, a second
/// pass scatters each patch to its final position, and a last pass restores the
/// offsets that the scatter pass advanced.
#[allow(clippy::cast_possible_truncation)]
fn transpose<I: IntegerPType, V: NativePType>(
    indices_in: &[I],
    values_in: &[V],
    offset: usize,
    array_len: usize,
) -> TransposedPatches {
    // Total number of slots is number of chunks times number of lanes.
    let n_chunks = array_len.div_ceil(1024);
    // Offsets are stored as u32, so the chunk count must stay representable.
    assert!(
        n_chunks <= u32::MAX as usize,
        "Cannot transpose patches for array with >= 4 trillion elements"
    );

    let n_lanes = patch_lanes::<V>();

    // We know upfront how many indices and values we'll have.
    let mut indices_buffer = BufferMut::with_capacity(indices_in.len());
    let mut values_buffer = BufferMut::with_capacity(values_in.len());

    // number of patches in each chunk. One extra fencepost entry so every
    // (chunk, lane) slot gets a (start, end) pair after the prefix sum.
    let mut lane_offsets: BufferMut<u32> = BufferMut::zeroed(n_chunks * n_lanes + 1);

    // Scan the index/values once to get chunk/lane counts.
    // NOTE(review): `lane` is derived from the global index, not the within-chunk
    // index; the two agree only when n_lanes divides 1024 — presumably guaranteed
    // by `patch_lanes`, confirm.
    for index in indices_in {
        let index = index.as_() - offset;
        let chunk = index / 1024;
        let lane = index % n_lanes;

        // Counts are shifted up by one slot so the prefix sum yields *start* offsets.
        lane_offsets[chunk * n_lanes + lane + 1] += 1;
    }

    // Prefix-sum sizes -> offsets
    for index in 1..lane_offsets.len() {
        lane_offsets[index] += lane_offsets[index - 1];
    }

    // Loop over patches, writing them to final positions. Each slot's cursor is
    // bumped as it fills, so patches within a lane keep their input (sorted) order.
    let indices_out = indices_buffer.spare_capacity_mut();
    let values_out = values_buffer.spare_capacity_mut();
    for (index, &value) in std::iter::zip(indices_in, values_in) {
        let index = index.as_() - offset;
        let chunk = index / 1024;
        let lane = index % n_lanes;

        let position = &mut lane_offsets[chunk * n_lanes + lane];
        indices_out[*position as usize].write((index % 1024) as u16);
        values_out[*position as usize].write(value);
        *position += 1;
    }

    // SAFETY: we know there are exactly indices_in.len() indices/values, and we just
    // set them to the appropriate values in the loop above.
    unsafe {
        indices_buffer.set_len(indices_in.len());
        values_buffer.set_len(values_in.len());
    }

    // Now, pass over all the indices and values again and subtract out the position increments.
    // (Each slot was advanced once per patch it holds; undoing that restores its start offset.)
    for index in indices_in {
        let index = index.as_() - offset;
        let chunk = index / 1024;
        let lane = index % n_lanes;

        lane_offsets[chunk * n_lanes + lane] -= 1;
    }

    TransposedPatches {
        n_chunks,
        n_lanes,
        lane_offsets: lane_offsets.freeze().into_byte_buffer(),
        indices: indices_buffer.freeze().into_byte_buffer(),
        values: values_buffer.freeze().into_byte_buffer(),
    }
}
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my feeling is that we want this to be an array too, in case we later do want to compress this array.