Skip to content

Commit c4d8432

Browse files
committed
Compact StringView buffer during sparse take to avoid holding the original buffers alive when selectivity is high
Signed-off-by: lyang24 <lanqingy93@gmail.com>
1 parent 7dbe58a commit c4d8432

1 file changed

Lines changed: 298 additions & 1 deletion

File tree

arrow-select/src/take.rs

Lines changed: 298 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use arrow_buffer::{
2828
ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
2929
bit_util,
3030
};
31-
use arrow_data::ArrayDataBuilder;
31+
use arrow_data::{ArrayDataBuilder, ByteView, MAX_INLINE_VIEW_LEN};
3232
use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};
3333

3434
use num_traits::{One, Zero};
@@ -596,10 +596,18 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
596596
}
597597

598598
/// `take` implementation for byte view arrays
599+
///
600+
/// Automatically compacts string data when the take is sparse (selecting less
601+
/// than half the rows), which avoids holding the original large buffers alive.
599602
fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
600603
array: &GenericByteViewArray<T>,
601604
indices: &PrimitiveArray<IndexType>,
602605
) -> Result<GenericByteViewArray<T>, ArrowError> {
606+
// If selecting less than half the rows, compact the string data to avoid
607+
// keeping the original (potentially large) buffers alive via Arc.
608+
if indices.len() < array.len() / 2 {
609+
return take_byte_view_compact(array, indices);
610+
}
603611
let new_views = take_native(array.views(), indices);
604612
let new_nulls = take_nulls(array.nulls(), indices);
605613
// Safety: array.views was valid, and take_native copies only valid values, and verifies bounds
@@ -608,6 +616,141 @@ fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
608616
})
609617
}
610618

619+
/// `take` implementation for byte view arrays that compacts string data into
/// new buffers rather than sharing the original buffers.
///
/// This fuses the gather (take) with string compaction in a single pass,
/// producing an output array whose buffers contain only the referenced data.
/// This is beneficial when `take` selects a small fraction of the source array,
/// as it avoids keeping the original large buffers alive.
///
/// The output uses multiple buffers if a single buffer would exceed `u32::MAX`
/// bytes, ensuring `ByteView::offset` never overflows.
///
/// # Safety contract
/// Callers must ensure that all non-null indices are within bounds of
/// `array` (i.e. `< array.len()`). This is guaranteed when called via
/// `take()` with `check_bounds` enabled, or when the caller otherwise
/// validates indices. Out-of-bounds indices will cause a panic (indexing
/// `src_views`) or UB (via `get_unchecked` on `src_buffers`).
#[inline(never)]
fn take_byte_view_compact<T: ByteViewType, IndexType: ArrowPrimitiveType>(
    array: &GenericByteViewArray<T>,
    indices: &PrimitiveArray<IndexType>,
) -> Result<GenericByteViewArray<T>, ArrowError> {
    let src_views = array.views();
    let src_buffers = array.data_buffers();
    let index_nulls = indices.nulls();

    // Phase 1: Calculate total non-inlined string bytes to pre-allocate.
    // This avoids reallocations during the copy phase. We only read the u128
    // view descriptors here, not the actual string data.
    let mut total_bytes: usize = 0;
    for (i, idx) in indices.values().iter().enumerate() {
        // SAFETY: i < indices.len(), which equals index_nulls.len() by Arrow invariant
        if index_nulls.is_some_and(|n| unsafe { !n.inner().value_unchecked(i) }) {
            continue;
        }
        let raw_view = src_views[idx.as_usize()];
        // The low 32 bits of a raw view encode the value length.
        let len = raw_view as u32;
        if len > MAX_INLINE_VIEW_LEN {
            total_bytes += len as usize;
        }
    }

    // Phase 2: Build output views and compact string data.
    // We cap each buffer at u32::MAX bytes to avoid offset overflow.
    let mut new_views: Vec<u128> = Vec::with_capacity(indices.len());
    let mut completed_buffers: Vec<Buffer> = Vec::new();
    let initial_cap = total_bytes.min(u32::MAX as usize);
    let mut current_buffer: Vec<u8> = Vec::with_capacity(initial_cap);
    let mut written_bytes: usize = 0;

    for (i, idx) in indices.values().iter().enumerate() {
        // SAFETY: i < indices.len(), which equals index_nulls.len() by Arrow invariant
        if index_nulls.is_some_and(|n| unsafe { !n.inner().value_unchecked(i) }) {
            // Null index: emit an all-zero (empty inline) view; the output
            // null buffer produced below marks this slot invalid.
            new_views.push(0u128);
            continue;
        }

        let raw_view = src_views[idx.as_usize()];
        let len = raw_view as u32;
        if len <= MAX_INLINE_VIEW_LEN {
            // Inline views carry their bytes in the view itself, so they can
            // be copied verbatim without touching any data buffer.
            new_views.push(raw_view);
            continue;
        }

        let mut view = ByteView::from(raw_view);
        let str_len = view.length as usize;

        // Spill to a new buffer if this string would overflow u32 offset
        if current_buffer.len() + str_len > u32::MAX as usize {
            spill_buffer(
                &mut current_buffer,
                &mut completed_buffers,
                &mut written_bytes,
                total_bytes,
            );
        }

        // SAFETY: source views are validly constructed
        let src = unsafe { resolve_view_bytes(&view, src_buffers) };

        // Repoint the view at the compact buffer currently being filled.
        view.buffer_index = completed_buffers.len() as u32;
        view.offset = current_buffer.len() as u32;
        current_buffer.extend_from_slice(src);

        new_views.push(view.as_u128());
    }

    // Finalize the last buffer (possibly empty when every selected value was
    // inlined; an unreferenced trailing buffer is harmless).
    completed_buffers.push(Buffer::from_vec(current_buffer));

    let new_nulls = take_nulls(array.nulls(), indices);

    // SAFETY: views are constructed from valid source views with updated
    // buffer_index and offset pointing into the newly built compact buffers.
    Ok(unsafe {
        GenericByteViewArray::new_unchecked(
            ScalarBuffer::from(new_views),
            completed_buffers,
            new_nulls,
        )
    })
}
721+
722+
/// Flush `current_buffer` into `completed_buffers` and replace it with a
723+
/// fresh allocation sized for the remaining bytes.
724+
#[inline(always)]
725+
fn spill_buffer(
726+
current_buffer: &mut Vec<u8>,
727+
completed_buffers: &mut Vec<Buffer>,
728+
written_bytes: &mut usize,
729+
total_bytes: usize,
730+
) {
731+
*written_bytes += current_buffer.len();
732+
let remaining = (total_bytes - *written_bytes).min(u32::MAX as usize);
733+
let full_buf = std::mem::replace(current_buffer, Vec::with_capacity(remaining));
734+
completed_buffers.push(Buffer::from_vec(full_buf));
735+
}
736+
737+
/// Resolve a non-inlined [`ByteView`] to its backing byte slice.
738+
///
739+
/// # Safety
740+
/// The view must have been produced from a valid `GenericByteViewArray`
741+
/// whose buffers are `src_buffers`.
742+
#[inline(always)]
743+
unsafe fn resolve_view_bytes<'a>(view: &ByteView, src_buffers: &'a [Buffer]) -> &'a [u8] {
744+
let len = view.length as usize;
745+
// SAFETY: caller guarantees the view was produced from a valid array
746+
// backed by `src_buffers`.
747+
unsafe {
748+
src_buffers
749+
.get_unchecked(view.buffer_index as usize)
750+
.get_unchecked(view.offset as usize..view.offset as usize + len)
751+
}
752+
}
753+
611754
/// `take` implementation for list arrays
612755
///
613756
/// Calculates the index and indexed offset for the inner array,
@@ -1710,6 +1853,160 @@ mod tests {
17101853
_test_byte_view::<BinaryViewType>()
17111854
}
17121855

1856+
/// Helper to test the compact path of take_byte_view.
1857+
/// Creates an array with many rows and takes a small subset to trigger compaction.
1858+
fn _test_byte_view_compact<T>()
1859+
where
1860+
T: ByteViewType,
1861+
str: AsRef<T::Native>,
1862+
T::Native: PartialEq,
1863+
{
1864+
// Build a source array with 20 rows (mix of inlined and non-inlined strings)
1865+
let mut builder = GenericByteViewBuilder::<T>::new();
1866+
let values = [
1867+
"hello", // 5 bytes, inlined
1868+
"world", // 5 bytes, inlined
1869+
"this is a long string over 12b", // 30 bytes, non-inlined
1870+
"short", // 5 bytes, inlined
1871+
"another large payload string!!", // 30 bytes, non-inlined
1872+
"x",
1873+
"y",
1874+
"z",
1875+
"a medium str!!", // 14 bytes, non-inlined
1876+
"b",
1877+
"c",
1878+
"d",
1879+
"e",
1880+
"f",
1881+
"g",
1882+
"h",
1883+
"i",
1884+
"j",
1885+
"k",
1886+
"l",
1887+
];
1888+
for v in &values {
1889+
builder.append_value(v);
1890+
}
1891+
let array = builder.finish();
1892+
assert_eq!(array.len(), 20);
1893+
1894+
// Take 3 rows out of 20 → 3 < 20/2 = 10, so compact path is taken
1895+
let indices = UInt32Array::from(vec![0, 2, 4]);
1896+
let result = take(&array, &indices, None).unwrap();
1897+
1898+
assert_eq!(result.len(), 3);
1899+
let result = result
1900+
.as_any()
1901+
.downcast_ref::<GenericByteViewArray<T>>()
1902+
.unwrap();
1903+
1904+
let expected = {
1905+
let mut b = GenericByteViewBuilder::<T>::new();
1906+
b.append_value(values[0]);
1907+
b.append_value(values[2]);
1908+
b.append_value(values[4]);
1909+
b.finish()
1910+
};
1911+
assert_eq!(result, &expected);
1912+
1913+
// Verify compaction: result should NOT share the original buffers.
1914+
// The original buffers hold all 20 rows' data. The compact result should
1915+
// only have the bytes for the 2 non-inlined strings we selected.
1916+
let original_buf_size: usize = array.data_buffers().iter().map(|b| b.len()).sum();
1917+
let result_buf_size: usize = result.data_buffers().iter().map(|b| b.len()).sum();
1918+
assert!(
1919+
result_buf_size < original_buf_size,
1920+
"compact result buffers ({result_buf_size}) should be smaller than original ({original_buf_size})"
1921+
);
1922+
}
1923+
1924+
/// Compact take over `StringViewType`.
#[test]
fn test_take_string_view_compact() {
    _test_byte_view_compact::<StringViewType>()
}
1928+
1929+
/// Compact take over `BinaryViewType`.
#[test]
fn test_take_binary_view_compact() {
    _test_byte_view_compact::<BinaryViewType>()
}
1933+
1934+
/// Test compact path with nullable indices
#[test]
fn test_take_byte_view_compact_with_nulls() {
    // 20-row source: every fifth row is long (non-inlined), the rest short.
    let mut builder = GenericByteViewBuilder::<StringViewType>::new();
    (0..20).for_each(|i| {
        let value = if i % 5 == 0 {
            format!("long string number {i} over twelve bytes")
        } else {
            format!("s{i}")
        };
        builder.append_value(value);
    });
    let array = builder.finish();

    // Four indices (one of them null) out of 20 rows: 4 < 10 → compact path.
    let indices = UInt32Array::from(vec![Some(0), None, Some(5), Some(1)]);
    let taken = take(&array, &indices, None).unwrap();

    assert_eq!(taken.len(), 4);
    let strings = taken.as_string_view();
    assert_eq!(strings.value(0), "long string number 0 over twelve bytes");
    assert!(taken.is_null(1));
    assert_eq!(strings.value(2), "long string number 5 over twelve bytes");
    assert_eq!(strings.value(3), "s1");
}
1958+
1959+
/// Test compact path with nullable source array
#[test]
fn test_take_byte_view_compact_with_null_source() {
    // 20-row source with a null at row 3; every fifth row is non-inlined.
    let mut builder = GenericByteViewBuilder::<StringViewType>::new();
    for i in 0..20 {
        match i {
            3 => builder.append_null(),
            _ if i % 5 == 0 => {
                builder.append_value(format!("long string number {i} over twelve bytes"))
            }
            _ => builder.append_value(format!("s{i}")),
        }
    }
    let array = builder.finish();

    // indices.len() = 4 < 20 / 2 = 10 → compact path; index 3 hits the null slot.
    let indices = UInt32Array::from(vec![0, 3, 5, 1]);
    let taken = take(&array, &indices, None).unwrap();

    assert_eq!(taken.len(), 4);
    let strings = taken.as_string_view();
    assert_eq!(strings.value(0), "long string number 0 over twelve bytes");
    assert!(taken.is_null(1)); // source was null at index 3
    assert_eq!(strings.value(2), "long string number 5 over twelve bytes");
    assert_eq!(strings.value(3), "s1");
}
1985+
1986+
/// Test that the non-compact (shared buffer) path still works when
/// selecting more than half the rows.
#[test]
fn test_take_byte_view_shared_path() {
    let mut builder = GenericByteViewBuilder::<StringViewType>::new();
    for i in 0..10 {
        builder.append_value(format!("long string number {i} over twelve bytes"));
    }
    let array = builder.finish();

    // Selecting 8 of 10 rows (8 >= 10 / 2 = 5) keeps the buffer-sharing path.
    let indices = UInt32Array::from((0..8u32).collect::<Vec<_>>());
    let taken = take(&array, &indices, None).unwrap();

    assert_eq!(taken.len(), 8);
    let strings = taken.as_string_view();
    for i in 0..8 {
        assert_eq!(
            strings.value(i),
            format!("long string number {i} over twelve bytes")
        );
    }
}
2009+
17132010
macro_rules! test_take_list {
17142011
($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
17152012
// Construct a value array, [[0,0,0], [-1,-2,-1], [], [2,3]]

0 commit comments

Comments
 (0)