Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 164 additions & 143 deletions datafusion/functions/benches/strpos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,178 +18,199 @@
use arrow::array::{StringArray, StringViewArray};
use arrow::datatypes::{DataType, Field};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use rand::distr::Alphanumeric;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::hint::black_box;
use std::str::Chars;
use std::sync::Arc;

/// Returns a `Vec<ColumnarValue>` with two elements: a haystack array and a
/// needle array. Each haystack is a random string of `str_len_chars`
/// characters. Each needle is a random contiguous substring of its
/// corresponding haystack (i.e., the needle is always present in the haystack).
/// Around `null_density` fraction of rows are null and `utf8_density` fraction
/// contain non-ASCII characters; the remaining rows are ASCII-only.
fn gen_string_array(
n_rows: usize,
#[rustfmt::skip]
const UTF8_CORPUS: &[char] = &[
// Cyrillic (2 bytes each)
'А', 'Б', 'В', 'Г', 'Д', 'Е', 'Ж', 'З', 'И', 'К', 'Л', 'М', 'Н', 'О', 'П', 'Р', 'С',
'Т', 'У', 'Ф', 'Х', 'Ц', 'Ч', 'Ш', 'Щ', 'Э', 'Ю', 'Я',
// CJK (3 bytes each)
'数', '据', '融', '合', '查', '询', '引', '擎', '优', '化', '执', '行', '计', '划',
'表', '达',
// Emoji (4 bytes each)
'📊', '🔥', '🚀', '⚡', '🎯', '💡', '🔧', '📈',
];
const N_ROWS: usize = 8192;

/// Returns a random string of `len` characters. If `ascii` is true, the string
/// is ASCII-only; otherwise it is drawn from `UTF8_CORPUS`.
fn random_string(rng: &mut StdRng, len: usize, ascii: bool) -> String {
if ascii {
let value: Vec<u8> = rng.sample_iter(&Alphanumeric).take(len).collect();
String::from_utf8(value).unwrap()
} else {
(0..len)
.map(|_| UTF8_CORPUS[rng.random_range(0..UTF8_CORPUS.len())])
.collect()
}
}

/// Wraps `strings` into either a `StringArray` or `StringViewArray`.
fn to_columnar_value(
strings: Vec<Option<String>>,
is_string_view: bool,
) -> ColumnarValue {
if is_string_view {
let arr: StringViewArray = strings.into_iter().collect();
ColumnarValue::Array(Arc::new(arr))
} else {
let arr: StringArray = strings.into_iter().collect();
ColumnarValue::Array(Arc::new(arr))
}
}

/// Returns haystack and needle, where both are arrays. Each needle is a
/// contiguous substring of its corresponding haystack. Around `null_density`
/// fraction of rows are null and `utf8_density` fraction contain non-ASCII
/// characters.
fn make_array_needle_args(
rng: &mut StdRng,
str_len_chars: usize,
null_density: f32,
utf8_density: f32,
is_string_view: bool, // false -> StringArray, true -> StringViewArray
is_string_view: bool,
) -> Vec<ColumnarValue> {
let mut rng = StdRng::seed_from_u64(42);
let rng_ref = &mut rng;

let utf8 = "DatafusionДатаФусион数据融合📊🔥"; // includes utf8 encoding with 1~4 bytes
let corpus_char_count = utf8.chars().count();

let mut output_string_vec: Vec<Option<String>> = Vec::with_capacity(n_rows);
let mut output_sub_string_vec: Vec<Option<String>> = Vec::with_capacity(n_rows);
for _ in 0..n_rows {
let rand_num = rng_ref.random::<f32>(); // [0.0, 1.0)
if rand_num < null_density {
output_sub_string_vec.push(None);
output_string_vec.push(None);
} else if rand_num < null_density + utf8_density {
// Generate random UTF8 string
let mut generated_string = String::with_capacity(str_len_chars);
for _ in 0..str_len_chars {
let idx = rng_ref.random_range(0..corpus_char_count);
let char = utf8.chars().nth(idx).unwrap();
generated_string.push(char);
}
output_sub_string_vec.push(Some(random_substring(generated_string.chars())));
output_string_vec.push(Some(generated_string));
let mut haystacks: Vec<Option<String>> = Vec::with_capacity(N_ROWS);
let mut needles: Vec<Option<String>> = Vec::with_capacity(N_ROWS);
for _ in 0..N_ROWS {
let r = rng.random::<f32>();
if r < null_density {
haystacks.push(None);
needles.push(None);
} else {
// Generate random ASCII-only string
let value = rng_ref
let ascii = r >= null_density + utf8_density;
let s = random_string(rng, str_len_chars, ascii);
needles.push(Some(random_substring(rng, &s)));
haystacks.push(Some(s));
}
}

vec![
to_columnar_value(haystacks, is_string_view),
to_columnar_value(needles, is_string_view),
]
}

/// Returns haystack array with a fixed scalar needle inserted into each row.
/// `utf8_density` fraction of rows contain non-ASCII characters.
/// The needle must be ASCII.
fn make_scalar_needle_args(
rng: &mut StdRng,
str_len_chars: usize,
needle: &str,
utf8_density: f32,
is_string_view: bool,
) -> Vec<ColumnarValue> {
let needle_len = needle.len();

let mut haystacks: Vec<Option<String>> = Vec::with_capacity(N_ROWS);
for _ in 0..N_ROWS {
let ascii = rng.random::<f32>() >= utf8_density;
if ascii {
let mut value: Vec<u8> = (&mut *rng)
.sample_iter(&Alphanumeric)
.take(str_len_chars)
.collect();
let value = String::from_utf8(value).unwrap();
output_sub_string_vec.push(Some(random_substring(value.chars())));
output_string_vec.push(Some(value));
if str_len_chars >= needle_len {
let pos = rng.random_range(0..=str_len_chars - needle_len);
value[pos..pos + needle_len].copy_from_slice(needle.as_bytes());
}
haystacks.push(Some(String::from_utf8(value).unwrap()));
} else {
let mut s = random_string(rng, str_len_chars, false);
let char_positions: Vec<usize> = s.char_indices().map(|(i, _)| i).collect();
let insert_pos = if char_positions.len() > 1 {
char_positions[rng.random_range(0..char_positions.len())]
} else {
0
};
s.insert_str(insert_pos, needle);
haystacks.push(Some(s));
}
}

if is_string_view {
let string_view_array: StringViewArray = output_string_vec.into_iter().collect();
let sub_string_view_array: StringViewArray =
output_sub_string_vec.into_iter().collect();
vec![
ColumnarValue::Array(Arc::new(string_view_array)),
ColumnarValue::Array(Arc::new(sub_string_view_array)),
]
} else {
let string_array: StringArray = output_string_vec.clone().into_iter().collect();
let sub_string_array: StringArray = output_sub_string_vec.into_iter().collect();
vec![
ColumnarValue::Array(Arc::new(string_array)),
ColumnarValue::Array(Arc::new(sub_string_array)),
]
}
let needle_cv = ColumnarValue::Scalar(ScalarValue::Utf8(Some(needle.to_string())));
vec![to_columnar_value(haystacks, is_string_view), needle_cv]
}

fn random_substring(chars: Chars) -> String {
// get the substring of a random length from the input string by byte unit
let mut rng = StdRng::seed_from_u64(44);
let count = chars.clone().count();
/// Extracts a random contiguous substring from `s`.
fn random_substring(rng: &mut StdRng, s: &str) -> String {
let count = s.chars().count();
let start = rng.random_range(0..count - 1);
let end = rng.random_range(start + 1..count);
chars
.enumerate()
.filter(|(i, _)| *i >= start && *i < end)
.map(|(_, c)| c)
.collect()
s.chars().skip(start).take(end - start).collect()
}

fn bench_strpos(
c: &mut Criterion,
name: &str,
args: &[ColumnarValue],
strpos: &datafusion_expr::ScalarUDF,
) {
let arg_fields = vec![Field::new("a", args[0].data_type(), true).into()];
let return_field: Arc<Field> = Field::new("f", DataType::Int32, true).into();
let config_options = Arc::new(ConfigOptions::default());

c.bench_function(name, |b| {
b.iter(|| {
black_box(strpos.invoke_with_args(ScalarFunctionArgs {
args: args.to_vec(),
arg_fields: arg_fields.clone(),
number_rows: N_ROWS,
return_field: Arc::clone(&return_field),
config_options: Arc::clone(&config_options),
}))
})
});
}

fn criterion_benchmark(c: &mut Criterion) {
// All benches are single batch run with 8192 rows
let strpos = datafusion_functions::unicode::strpos();
let mut rng = StdRng::seed_from_u64(42);

let n_rows = 8192;
for str_len in [8, 32, 128, 4096] {
// StringArray ASCII only
let args_string_ascii = gen_string_array(n_rows, str_len, 0.1, 0.0, false);
let arg_fields =
vec![Field::new("a", args_string_ascii[0].data_type(), true).into()];
let return_field = Field::new("f", DataType::Int32, true).into();
let config_options = Arc::new(ConfigOptions::default());

c.bench_function(
&format!("strpos_StringArray_ascii_str_len_{str_len}"),
|b| {
b.iter(|| {
black_box(strpos.invoke_with_args(ScalarFunctionArgs {
args: args_string_ascii.clone(),
arg_fields: arg_fields.clone(),
number_rows: n_rows,
return_field: Arc::clone(&return_field),
config_options: Arc::clone(&config_options),
}))
})
},
);

// StringArray UTF8
let args_string_utf8 = gen_string_array(n_rows, str_len, 0.1, 0.5, false);
let arg_fields =
vec![Field::new("a", args_string_utf8[0].data_type(), true).into()];
let return_field = Field::new("f", DataType::Int32, true).into();
c.bench_function(&format!("strpos_StringArray_utf8_str_len_{str_len}"), |b| {
b.iter(|| {
black_box(strpos.invoke_with_args(ScalarFunctionArgs {
args: args_string_utf8.clone(),
arg_fields: arg_fields.clone(),
number_rows: n_rows,
return_field: Arc::clone(&return_field),
config_options: Arc::clone(&config_options),
}))
})
});

// StringViewArray ASCII only
let args_string_view_ascii = gen_string_array(n_rows, str_len, 0.1, 0.0, true);
let arg_fields =
vec![Field::new("a", args_string_view_ascii[0].data_type(), true).into()];
let return_field = Field::new("f", DataType::Int32, true).into();
c.bench_function(
&format!("strpos_StringViewArray_ascii_str_len_{str_len}"),
|b| {
b.iter(|| {
black_box(strpos.invoke_with_args(ScalarFunctionArgs {
args: args_string_view_ascii.clone(),
arg_fields: arg_fields.clone(),
number_rows: n_rows,
return_field: Arc::clone(&return_field),
config_options: Arc::clone(&config_options),
}))
})
},
);

// StringViewArray UTF8
let args_string_view_utf8 = gen_string_array(n_rows, str_len, 0.1, 0.5, true);
let arg_fields =
vec![Field::new("a", args_string_view_utf8[0].data_type(), true).into()];
let return_field = Field::new("f", DataType::Int32, true).into();
c.bench_function(
&format!("strpos_StringViewArray_utf8_str_len_{str_len}"),
|b| {
b.iter(|| {
black_box(strpos.invoke_with_args(ScalarFunctionArgs {
args: args_string_view_utf8.clone(),
arg_fields: arg_fields.clone(),
number_rows: n_rows,
return_field: Arc::clone(&return_field),
config_options: Arc::clone(&config_options),
}))
})
},
);
// Array needle benchmarks
for (label, utf8_density, is_view) in [
("StringArray_ascii", 0.0, false),
("StringArray_utf8", 0.5, false),
("StringViewArray_ascii", 0.0, true),
("StringViewArray_utf8", 0.5, true),
] {
let args =
make_array_needle_args(&mut rng, str_len, 0.1, utf8_density, is_view);
bench_strpos(
c,
&format!("strpos_{label}_str_len_{str_len}"),
&args,
strpos.as_ref(),
);
}

// Scalar needle benchmarks
let needle = "xyz";
for (label, utf8_density, is_view) in [
("StringArray_scalar_needle_ascii", 0.0, false),
("StringArray_scalar_needle_utf8", 0.5, false),
("StringViewArray_scalar_needle_ascii", 0.0, true),
("StringViewArray_scalar_needle_utf8", 0.5, true),
] {
let args =
make_scalar_needle_args(&mut rng, str_len, needle, utf8_density, is_view);
bench_strpos(
c,
&format!("strpos_{label}_str_len_{str_len}"),
&args,
strpos.as_ref(),
);
}
}
}

Expand Down
Loading