Skip to content

Commit 0a86a90

Browse files
committed
Compact StringView buffer during sparse take to avoid holding the original buffers alive when selectivity is high
1 parent 7dbe58a commit 0a86a90

1 file changed

Lines changed: 307 additions & 1 deletion

File tree

arrow-select/src/take.rs

Lines changed: 307 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use arrow_buffer::{
2828
ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
2929
bit_util,
3030
};
31-
use arrow_data::ArrayDataBuilder;
31+
use arrow_data::{ArrayDataBuilder, ByteView, MAX_INLINE_VIEW_LEN};
3232
use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};
3333

3434
use num_traits::{One, Zero};
@@ -596,10 +596,18 @@ fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
596596
}
597597

598598
/// `take` implementation for byte view arrays
599+
///
600+
/// Automatically compacts string data when the take is sparse (selecting less
601+
/// than half the rows), which avoids holding the original large buffers alive.
599602
fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
600603
array: &GenericByteViewArray<T>,
601604
indices: &PrimitiveArray<IndexType>,
602605
) -> Result<GenericByteViewArray<T>, ArrowError> {
606+
// If selecting less than half the rows, compact the string data to avoid
607+
// keeping the original (potentially large) buffers alive via Arc.
608+
if indices.len() < array.len() / 2 {
609+
return take_byte_view_compact(array, indices);
610+
}
603611
let new_views = take_native(array.views(), indices);
604612
let new_nulls = take_nulls(array.nulls(), indices);
605613
// Safety: array.views was valid, and take_native copies only valid values, and verifies bounds
@@ -608,6 +616,156 @@ fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
608616
})
609617
}
610618

619+
/// `take` implementation for byte view arrays that compacts string data into
620+
/// new buffers rather than sharing the original buffers.
621+
///
622+
/// This fuses the gather (take) with string compaction in a single pass,
623+
/// producing an output array whose buffers contain only the referenced data.
624+
/// This is beneficial when `take` selects a small fraction of the source array,
625+
/// as it avoids keeping the original large buffers alive.
626+
///
627+
/// The output uses multiple buffers if a single buffer would exceed `u32::MAX`
628+
/// bytes, ensuring `ByteView::offset` never overflows.
629+
#[inline(never)]
630+
fn take_byte_view_compact<T: ByteViewType, IndexType: ArrowPrimitiveType>(
631+
array: &GenericByteViewArray<T>,
632+
indices: &PrimitiveArray<IndexType>,
633+
) -> Result<GenericByteViewArray<T>, ArrowError> {
634+
let src_views = array.views();
635+
let src_buffers = array.data_buffers();
636+
637+
// Phase 1: Calculate total non-inlined string bytes to pre-allocate.
638+
// This avoids reallocations during the copy phase. We only read the u128
639+
// view descriptors here, not the actual string data.
640+
let mut total_bytes: usize = 0;
641+
let indices_null_count = indices.null_count();
642+
if indices_null_count == 0 {
643+
for idx in indices.values() {
644+
let raw_view = src_views[idx.as_usize()];
645+
let len = raw_view as u32;
646+
if len > MAX_INLINE_VIEW_LEN {
647+
total_bytes += len as usize;
648+
}
649+
}
650+
} else {
651+
let index_nulls = indices.nulls().unwrap();
652+
for (i, idx) in indices.values().iter().enumerate() {
653+
// SAFETY: i < indices.len()
654+
if unsafe { index_nulls.inner().value_unchecked(i) } {
655+
let raw_view = src_views[idx.as_usize()];
656+
let len = raw_view as u32;
657+
if len > MAX_INLINE_VIEW_LEN {
658+
total_bytes += len as usize;
659+
}
660+
}
661+
}
662+
}
663+
664+
// Phase 2: Build output views and compact string data.
665+
// We cap each buffer at u32::MAX bytes to avoid offset overflow.
666+
let mut new_views: Vec<u128> = Vec::with_capacity(indices.len());
667+
let mut completed_buffers: Vec<Buffer> = Vec::new();
668+
// Start with exact pre-allocation when it fits in a single buffer,
669+
// otherwise start with u32::MAX and we'll spill as needed.
670+
let initial_cap = total_bytes.min(u32::MAX as usize);
671+
let mut current_buffer: Vec<u8> = Vec::with_capacity(initial_cap);
672+
673+
if indices_null_count == 0 {
674+
for idx in indices.values() {
675+
let raw_view = src_views[idx.as_usize()];
676+
let len = raw_view as u32;
677+
if len > MAX_INLINE_VIEW_LEN {
678+
let mut view = ByteView::from(raw_view);
679+
let str_len = view.length as usize;
680+
681+
// Check if adding this string would overflow u32 offset
682+
if current_buffer.len() + str_len > u32::MAX as usize {
683+
let full_buf = std::mem::replace(
684+
&mut current_buffer,
685+
Vec::with_capacity(
686+
(total_bytes - completed_buffers.iter().map(|b| b.len()).sum::<usize>())
687+
.min(u32::MAX as usize),
688+
),
689+
);
690+
completed_buffers.push(Buffer::from_vec(full_buf));
691+
}
692+
693+
// SAFETY: source views are validly constructed
694+
let src = unsafe {
695+
src_buffers
696+
.get_unchecked(view.buffer_index as usize)
697+
.get_unchecked(view.offset as usize..view.offset as usize + str_len)
698+
};
699+
700+
view.buffer_index = completed_buffers.len() as u32;
701+
view.offset = current_buffer.len() as u32;
702+
current_buffer.extend_from_slice(src);
703+
704+
new_views.push(view.as_u128());
705+
} else {
706+
new_views.push(raw_view);
707+
}
708+
}
709+
} else {
710+
let index_nulls = indices.nulls().unwrap();
711+
for (i, idx) in indices.values().iter().enumerate() {
712+
// SAFETY: i < indices.len()
713+
if unsafe { !index_nulls.inner().value_unchecked(i) } {
714+
new_views.push(0u128);
715+
continue;
716+
}
717+
let raw_view = src_views[idx.as_usize()];
718+
let len = raw_view as u32;
719+
if len > MAX_INLINE_VIEW_LEN {
720+
let mut view = ByteView::from(raw_view);
721+
let str_len = view.length as usize;
722+
723+
// Check if adding this string would overflow u32 offset
724+
if current_buffer.len() + str_len > u32::MAX as usize {
725+
let full_buf = std::mem::replace(
726+
&mut current_buffer,
727+
Vec::with_capacity(
728+
(total_bytes - completed_buffers.iter().map(|b| b.len()).sum::<usize>())
729+
.min(u32::MAX as usize),
730+
),
731+
);
732+
completed_buffers.push(Buffer::from_vec(full_buf));
733+
}
734+
735+
// SAFETY: source views are validly constructed
736+
let src = unsafe {
737+
src_buffers
738+
.get_unchecked(view.buffer_index as usize)
739+
.get_unchecked(view.offset as usize..view.offset as usize + str_len)
740+
};
741+
742+
view.buffer_index = completed_buffers.len() as u32;
743+
view.offset = current_buffer.len() as u32;
744+
current_buffer.extend_from_slice(src);
745+
746+
new_views.push(view.as_u128());
747+
} else {
748+
new_views.push(raw_view);
749+
}
750+
}
751+
}
752+
753+
// Finalize the last buffer
754+
completed_buffers.push(Buffer::from_vec(current_buffer));
755+
756+
let new_nulls = take_nulls(array.nulls(), indices);
757+
758+
// SAFETY: views are constructed from valid source views with updated
759+
// buffer_index and offset pointing into the newly built compact buffers.
760+
Ok(unsafe {
761+
GenericByteViewArray::new_unchecked(
762+
ScalarBuffer::from(new_views),
763+
completed_buffers,
764+
new_nulls,
765+
)
766+
})
767+
}
768+
611769
/// `take` implementation for list arrays
612770
///
613771
/// Calculates the index and indexed offset for the inner array,
@@ -1710,6 +1868,154 @@ mod tests {
17101868
_test_byte_view::<BinaryViewType>()
17111869
}
17121870

1871+
/// Helper to test the compact path of take_byte_view.
1872+
/// Creates an array with many rows and takes a small subset to trigger compaction.
1873+
fn _test_byte_view_compact<T>()
1874+
where
1875+
T: ByteViewType,
1876+
str: AsRef<T::Native>,
1877+
T::Native: PartialEq,
1878+
{
1879+
// Build a source array with 20 rows (mix of inlined and non-inlined strings)
1880+
let mut builder = GenericByteViewBuilder::<T>::new();
1881+
let values = [
1882+
"hello", // 5 bytes, inlined
1883+
"world", // 5 bytes, inlined
1884+
"this is a long string over 12b", // 30 bytes, non-inlined
1885+
"short", // 5 bytes, inlined
1886+
"another large payload string!!", // 30 bytes, non-inlined
1887+
"x",
1888+
"y",
1889+
"z",
1890+
"a medium str!!", // 14 bytes, non-inlined
1891+
"b",
1892+
"c",
1893+
"d",
1894+
"e",
1895+
"f",
1896+
"g",
1897+
"h",
1898+
"i",
1899+
"j",
1900+
"k",
1901+
"l",
1902+
];
1903+
for v in &values {
1904+
builder.append_value(v);
1905+
}
1906+
let array = builder.finish();
1907+
assert_eq!(array.len(), 20);
1908+
1909+
// Take 3 rows out of 20 → 3 < 20/2 = 10, so compact path is taken
1910+
let indices = UInt32Array::from(vec![0, 2, 4]);
1911+
let result = take(&array, &indices, None).unwrap();
1912+
1913+
assert_eq!(result.len(), 3);
1914+
let result = result.as_any().downcast_ref::<GenericByteViewArray<T>>().unwrap();
1915+
1916+
let expected = {
1917+
let mut b = GenericByteViewBuilder::<T>::new();
1918+
b.append_value(values[0]);
1919+
b.append_value(values[2]);
1920+
b.append_value(values[4]);
1921+
b.finish()
1922+
};
1923+
assert_eq!(result, &expected);
1924+
1925+
// Verify compaction: result should NOT share the original buffers.
1926+
// The original buffers hold all 20 rows' data. The compact result should
1927+
// only have the bytes for the 2 non-inlined strings we selected.
1928+
let original_buf_size: usize = array.data_buffers().iter().map(|b| b.len()).sum();
1929+
let result_buf_size: usize = result.data_buffers().iter().map(|b| b.len()).sum();
1930+
assert!(
1931+
result_buf_size < original_buf_size,
1932+
"compact result buffers ({result_buf_size}) should be smaller than original ({original_buf_size})"
1933+
);
1934+
}
1935+
1936+
/// Exercises the compact take path for `StringViewArray`.
#[test]
fn test_take_string_view_compact() {
    _test_byte_view_compact::<StringViewType>()
}
1940+
1941+
/// Exercises the compact take path for `BinaryViewArray`.
#[test]
fn test_take_binary_view_compact() {
    _test_byte_view_compact::<BinaryViewType>()
}
1945+
1946+
/// Test compact path with nullable indices
#[test]
fn test_take_byte_view_compact_with_nulls() {
    // Every fifth row is a long (non-inlined) string; the rest are inlined.
    let mut builder = GenericByteViewBuilder::<StringViewType>::new();
    for i in 0..20 {
        match i % 5 {
            0 => builder.append_value(format!("long string number {i} over twelve bytes")),
            _ => builder.append_value(format!("s{i}")),
        }
    }
    let array = builder.finish();

    // Take with some null indices; 4 < 20/2 so compact triggers
    let indices = UInt32Array::from(vec![Some(0), None, Some(5), Some(1)]);
    let result = take(&array, &indices, None).unwrap();

    assert_eq!(result.len(), 4);
    let sv = result.as_string_view();
    assert_eq!(sv.value(0), "long string number 0 over twelve bytes");
    assert!(result.is_null(1));
    assert_eq!(sv.value(2), "long string number 5 over twelve bytes");
    assert_eq!(sv.value(3), "s1");
}
1970+
1971+
/// Test compact path with nullable source array
#[test]
fn test_take_byte_view_compact_with_null_source() {
    // Row 3 is null; every fifth row is long (non-inlined); the rest inlined.
    let mut builder = GenericByteViewBuilder::<StringViewType>::new();
    for i in 0..20 {
        match (i == 3, i % 5 == 0) {
            (true, _) => builder.append_null(),
            (false, true) => {
                builder.append_value(format!("long string number {i} over twelve bytes"))
            }
            (false, false) => builder.append_value(format!("s{i}")),
        }
    }
    let array = builder.finish();

    // indices.len() = 4 < 20/2 = 10 → compact
    let indices = UInt32Array::from(vec![0, 3, 5, 1]);
    let result = take(&array, &indices, None).unwrap();

    assert_eq!(result.len(), 4);
    let sv = result.as_string_view();
    assert_eq!(sv.value(0), "long string number 0 over twelve bytes");
    assert!(result.is_null(1)); // source was null at index 3
    assert_eq!(sv.value(2), "long string number 5 over twelve bytes");
    assert_eq!(sv.value(3), "s1");
}
1997+
1998+
/// Test that the non-compact (shared buffer) path still works when
/// selecting more than half the rows.
#[test]
fn test_take_byte_view_shared_path() {
    let mut builder = GenericByteViewBuilder::<StringViewType>::new();
    (0..10).for_each(|i| {
        builder.append_value(format!("long string number {i} over twelve bytes"));
    });
    let array = builder.finish();

    // Take 8 out of 10 → 8 >= 10/2 = 5, so shared (non-compact) path
    let indices = UInt32Array::from((0..8u32).collect::<Vec<_>>());
    let result = take(&array, &indices, None).unwrap();

    assert_eq!(result.len(), 8);
    let sv = result.as_string_view();
    for i in 0..8 {
        assert_eq!(sv.value(i), format!("long string number {i} over twelve bytes"));
    }
}
2018+
17132019
macro_rules! test_take_list {
17142020
($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
17152021
// Construct a value array, [[0,0,0], [-1,-2,-1], [], [2,3]]

0 commit comments

Comments
 (0)