Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 98 additions & 97 deletions native/core/src/execution/shuffle/spark_unsafe/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,40 +46,44 @@ macro_rules! impl_append_to_builder {
return;
}

// SAFETY: element_offset points to contiguous element data of length num_elements.
debug_assert!(self.element_offset != 0, "element_offset is null");
let ptr = self.element_offset as *const $element_type;
let aligned =
(ptr as usize).is_multiple_of(std::mem::align_of::<$element_type>());

if NULLABLE {
let mut ptr = self.element_offset as *const $element_type;
let null_words = self.null_bitset_ptr();
debug_assert!(!null_words.is_null(), "null_bitset_ptr is null");
debug_assert!(!ptr.is_null(), "element_offset pointer is null");
for idx in 0..num_elements {
// SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements
let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) };

if is_null {
builder.append_null();
} else {
// SAFETY: ptr is within element data bounds
builder.append_value(unsafe { ptr.read_unaligned() });
if aligned {
let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
for (idx, &val) in values.iter().enumerate() {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
builder.append_value(val);
}
}
// SAFETY: ptr stays within bounds, iterating num_elements times
ptr = unsafe { ptr.add(1) };
}
} else {
// SAFETY: element_offset points to contiguous data of length num_elements
debug_assert!(self.element_offset != 0, "element_offset is null");
let ptr = self.element_offset as *const $element_type;
// Use bulk copy when data is properly aligned, fall back to
// per-element unaligned reads otherwise
if (ptr as usize).is_multiple_of(std::mem::align_of::<$element_type>()) {
let slice = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
builder.append_slice(slice);
} else {
let mut ptr = ptr;
for _ in 0..num_elements {
builder.append_value(unsafe { ptr.read_unaligned() });
for idx in 0..num_elements {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
builder.append_value(unsafe { ptr.read_unaligned() });
}
ptr = unsafe { ptr.add(1) };
}
}
} else if aligned {
let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
builder.append_slice(values);
} else {
let mut ptr = ptr;
for _ in 0..num_elements {
builder.append_value(unsafe { ptr.read_unaligned() });
ptr = unsafe { ptr.add(1) };
}
}
}
};
Expand Down Expand Up @@ -191,36 +195,31 @@ impl SparkUnsafeArray {
return;
}

let mut ptr = self.element_offset as *const u8;
// SAFETY: element_offset points to contiguous u8 data of length num_elements.
debug_assert!(
!ptr.is_null(),
self.element_offset != 0,
"append_booleans: element_offset pointer is null"
);
let values =
unsafe { std::slice::from_raw_parts(self.element_offset as *const u8, num_elements) };

if NULLABLE {
let null_words = self.null_bitset_ptr();
debug_assert!(
!null_words.is_null(),
"append_booleans: null_bitset_ptr is null"
);
for idx in 0..num_elements {
for (idx, &val) in values.iter().enumerate() {
// SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements
let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) };

if is_null {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
// SAFETY: ptr is within element data bounds
builder.append_value(unsafe { *ptr != 0 });
builder.append_value(val != 0);
}
// SAFETY: ptr stays within bounds, iterating num_elements times
ptr = unsafe { ptr.add(1) };
}
} else {
for _ in 0..num_elements {
// SAFETY: ptr is within element data bounds
builder.append_value(unsafe { *ptr != 0 });
ptr = unsafe { ptr.add(1) };
for &val in values {
builder.append_value(val != 0);
}
}
}
Expand All @@ -235,47 +234,48 @@ impl SparkUnsafeArray {
return;
}

debug_assert!(
self.element_offset != 0,
"append_timestamps: element_offset is null"
);
let ptr = self.element_offset as *const i64;
let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::<i64>());

if NULLABLE {
let mut ptr = self.element_offset as *const i64;
let null_words = self.null_bitset_ptr();
debug_assert!(
!null_words.is_null(),
"append_timestamps: null_bitset_ptr is null"
);
debug_assert!(
!ptr.is_null(),
"append_timestamps: element_offset pointer is null"
);
for idx in 0..num_elements {
// SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements
let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) };

if is_null {
builder.append_null();
} else {
// SAFETY: ptr is within element data bounds
builder.append_value(unsafe { ptr.read_unaligned() });
if aligned {
let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
for (idx, &val) in values.iter().enumerate() {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
builder.append_value(val);
}
}
// SAFETY: ptr stays within bounds, iterating num_elements times
ptr = unsafe { ptr.add(1) };
}
} else {
// SAFETY: element_offset points to contiguous i64 data of length num_elements
debug_assert!(
self.element_offset != 0,
"append_timestamps: element_offset is null"
);
let ptr = self.element_offset as *const i64;
if (ptr as usize).is_multiple_of(std::mem::align_of::<i64>()) {
let slice = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
builder.append_slice(slice);
} else {
let mut ptr = ptr;
for _ in 0..num_elements {
builder.append_value(unsafe { ptr.read_unaligned() });
for idx in 0..num_elements {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
builder.append_value(unsafe { ptr.read_unaligned() });
}
ptr = unsafe { ptr.add(1) };
}
}
} else if aligned {
let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
builder.append_slice(values);
} else {
let mut ptr = ptr;
for _ in 0..num_elements {
builder.append_value(unsafe { ptr.read_unaligned() });
ptr = unsafe { ptr.add(1) };
}
}
}

Expand All @@ -289,47 +289,48 @@ impl SparkUnsafeArray {
return;
}

debug_assert!(
self.element_offset != 0,
"append_dates: element_offset is null"
);
let ptr = self.element_offset as *const i32;
let aligned = (ptr as usize).is_multiple_of(std::mem::align_of::<i32>());

if NULLABLE {
let mut ptr = self.element_offset as *const i32;
let null_words = self.null_bitset_ptr();
debug_assert!(
!null_words.is_null(),
"append_dates: null_bitset_ptr is null"
);
debug_assert!(
!ptr.is_null(),
"append_dates: element_offset pointer is null"
);
for idx in 0..num_elements {
// SAFETY: null_words has ceil(num_elements/64) words, idx < num_elements
let is_null = unsafe { Self::is_null_in_bitset(null_words, idx) };

if is_null {
builder.append_null();
} else {
// SAFETY: ptr is within element data bounds
builder.append_value(unsafe { ptr.read_unaligned() });
if aligned {
let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
for (idx, &val) in values.iter().enumerate() {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
builder.append_value(val);
}
}
// SAFETY: ptr stays within bounds, iterating num_elements times
ptr = unsafe { ptr.add(1) };
}
} else {
// SAFETY: element_offset points to contiguous i32 data of length num_elements
debug_assert!(
self.element_offset != 0,
"append_dates: element_offset is null"
);
let ptr = self.element_offset as *const i32;
if (ptr as usize).is_multiple_of(std::mem::align_of::<i32>()) {
let slice = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
builder.append_slice(slice);
} else {
let mut ptr = ptr;
for _ in 0..num_elements {
builder.append_value(unsafe { ptr.read_unaligned() });
for idx in 0..num_elements {
if unsafe { Self::is_null_in_bitset(null_words, idx) } {
builder.append_null();
} else {
builder.append_value(unsafe { ptr.read_unaligned() });
}
ptr = unsafe { ptr.add(1) };
}
}
} else if aligned {
let values = unsafe { std::slice::from_raw_parts(ptr, num_elements) };
builder.append_slice(values);
} else {
let mut ptr = ptr;
for _ in 0..num_elements {
builder.append_value(unsafe { ptr.read_unaligned() });
ptr = unsafe { ptr.add(1) };
}
}
}
}
Expand Down
Loading