From a5fb33156627a109fe2a1dbefffb55a7601fe89c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 18 Jun 2026 12:42:42 +0200 Subject: [PATCH] Speed up rank window materialization --- datafusion/functions-window/Cargo.toml | 4 + datafusion/functions-window/benches/rank.rs | 107 +++++++++++++++++++ datafusion/functions-window/src/cume_dist.rs | 36 ++++--- datafusion/functions-window/src/rank.rs | 93 ++++++++++------ 4 files changed, 195 insertions(+), 45 deletions(-) create mode 100644 datafusion/functions-window/benches/rank.rs diff --git a/datafusion/functions-window/Cargo.toml b/datafusion/functions-window/Cargo.toml index 9c4342adae8fd..d7553b8d47f55 100644 --- a/datafusion/functions-window/Cargo.toml +++ b/datafusion/functions-window/Cargo.toml @@ -58,3 +58,7 @@ criterion = { workspace = true } [[bench]] name = "nth_value" harness = false + +[[bench]] +name = "rank" +harness = false diff --git a/datafusion/functions-window/benches/rank.rs b/datafusion/functions-window/benches/rank.rs new file mode 100644 index 0000000000000..12e4f503d2777 --- /dev/null +++ b/datafusion/functions-window/benches/rank.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::ops::Range; + +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_expr::{PartitionEvaluator, WindowUDFImpl}; +use datafusion_functions_window::cume_dist::CumeDist; +use datafusion_functions_window::rank::Rank; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; + +const NUM_ROWS: usize = 8192; + +fn make_peer_ranges(group_size: usize) -> Vec> { + let mut ranges = Vec::with_capacity(NUM_ROWS.div_ceil(group_size)); + let mut start = 0; + while start < NUM_ROWS { + let end = (start + group_size).min(NUM_ROWS); + ranges.push(start..end); + start = end; + } + ranges +} + +fn create_rank_evaluator(rank: &Rank) -> Box { + rank.partition_evaluator(PartitionEvaluatorArgs::default()) + .unwrap() +} + +fn create_cume_dist_evaluator() -> Box { + CumeDist::new() + .partition_evaluator(PartitionEvaluatorArgs::default()) + .unwrap() +} + +fn bench_rank(c: &mut Criterion) { + let mut group = c.benchmark_group("rank"); + + for group_size in [1, 8, 64] { + let ranges = make_peer_ranges(group_size); + let range_label = format!("group_{group_size}"); + + group.bench_function(BenchmarkId::new("rank", &range_label), |b| { + b.iter(|| { + let evaluator = create_rank_evaluator(&Rank::basic()); + black_box( + evaluator + .evaluate_all_with_rank(NUM_ROWS, black_box(&ranges)) + .unwrap(), + ); + }) + }); + + group.bench_function(BenchmarkId::new("dense_rank", &range_label), |b| { + b.iter(|| { + let evaluator = create_rank_evaluator(&Rank::dense_rank()); + black_box( + evaluator + .evaluate_all_with_rank(NUM_ROWS, black_box(&ranges)) + .unwrap(), + ); + }) + }); + + group.bench_function(BenchmarkId::new("percent_rank", &range_label), |b| { + b.iter(|| { + let evaluator = create_rank_evaluator(&Rank::percent_rank()); + black_box( + evaluator + .evaluate_all_with_rank(NUM_ROWS, black_box(&ranges)) + .unwrap(), + ); + }) + }); + + group.bench_function(BenchmarkId::new("cume_dist", &range_label), |b| { + b.iter(|| { + let evaluator = create_cume_dist_evaluator(); + black_box( + evaluator + .evaluate_all_with_rank(NUM_ROWS, black_box(&ranges)) + .unwrap(), + ); + }) + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_rank); +criterion_main!(benches); diff --git a/datafusion/functions-window/src/cume_dist.rs b/datafusion/functions-window/src/cume_dist.rs index 9d0f082daeb99..13e2c08a21ce8 100644 --- a/datafusion/functions-window/src/cume_dist.rs +++ b/datafusion/functions-window/src/cume_dist.rs @@ -31,7 +31,6 @@ use datafusion_macros::user_doc; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::fmt::Debug; -use std::iter; use std::ops::Range; use std::sync::Arc; @@ -123,18 +122,23 @@ impl PartitionEvaluator for CumeDistEvaluator { ranks_in_partition: &[Range], ) -> Result { let scalar = num_rows as f64; - let result = Float64Array::from_iter_values( - ranks_in_partition - .iter() - .scan(0_u64, |acc, range| { - let len = range.end - range.start; - *acc += len as u64; - let value: f64 = (*acc as f64) / scalar; - let result = iter::repeat_n(value, len); - Some(result) - }) - .flatten(), - ); + let result = if ranks_in_partition.len() == num_rows { + let values: Vec<_> = + (1..=num_rows).map(|rank| rank as f64 / scalar).collect(); + values.into() + } else { + let mut values = Vec::with_capacity(num_rows); + let mut rank = 0_u64; + + for range in ranks_in_partition { + let len = range.end - range.start; + rank += len as u64; + let value = (rank as f64) / scalar; + values.resize(values.len() + len, value); + } + + Float64Array::from(values) + }; Ok(Arc::new(result)) } @@ -172,6 +176,12 @@ mod tests { test_f64_result(4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0])?; + test_f64_result( + 4, + (0..4).map(|idx| idx..idx + 1).collect(), + vec![0.25, 0.5, 0.75, 1.0], + )?; + Ok(()) } } diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index d18bd0748917e..477d5bc19a5e0 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -36,7 +36,6 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::fmt::Debug; use std::hash::Hash; -use std::iter; use std::ops::Range; use std::sync::{Arc, LazyLock}; @@ -317,43 +316,59 @@ impl PartitionEvaluator for RankEvaluator { ranks_in_partition: &[Range], ) -> Result { let result: ArrayRef = match self.rank_type { - RankType::Basic => Arc::new(UInt64Array::from_iter_values( - ranks_in_partition - .iter() - .scan(1_u64, |acc, range| { + RankType::Basic => { + if ranks_in_partition.len() == num_rows { + Arc::new(UInt64Array::from_iter_values(1..=num_rows as u64)) + } else { + let mut values = Vec::with_capacity(num_rows); + let mut rank = 1_u64; + for range in ranks_in_partition { let len = range.end - range.start; - let result = iter::repeat_n(*acc, len); - *acc += len as u64; - Some(result) - }) - .flatten(), - )), - - RankType::Dense => Arc::new(UInt64Array::from_iter_values( - ranks_in_partition - .iter() - .zip(1u64..) - .flat_map(|(range, rank)| { + values.resize(values.len() + len, rank); + rank += len as u64; + } + + Arc::new(UInt64Array::from(values)) + } + } + + RankType::Dense => { + if ranks_in_partition.len() == num_rows { + Arc::new(UInt64Array::from_iter_values(1..=num_rows as u64)) + } else { + let mut values = Vec::with_capacity(num_rows); + for (range, rank) in ranks_in_partition.iter().zip(1_u64..) { let len = range.end - range.start; - iter::repeat_n(rank, len) - }), - )), + values.resize(values.len() + len, rank); + } + + Arc::new(UInt64Array::from(values)) + } + } RankType::Percent => { let denominator = num_rows as f64; + let denominator = (denominator - 1.0).max(1.0); + + if ranks_in_partition.len() == num_rows { + let values: Vec<_> = (0..num_rows) + .map(|rank| rank as f64 / denominator) + .collect(); + let result: Float64Array = values.into(); + Arc::new(result) + } else { + let mut values = Vec::with_capacity(num_rows); + let mut rank = 0_u64; + + for range in ranks_in_partition { + let len = range.end - range.start; + let value = (rank as f64) / denominator; + values.resize(values.len() + len, value); + rank += len as u64; + } - Arc::new(Float64Array::from_iter_values( - ranks_in_partition - .iter() - .scan(0_u64, |acc, range| { - let len = range.end - range.start; - let value = (*acc as f64) / (denominator - 1.0).max(1.0); - let result = iter::repeat_n(value, len); - *acc += len as u64; - Some(result) - }) - .flatten(), - )) + Arc::new(Float64Array::from(values)) + } } }; @@ -419,6 +434,11 @@ mod tests { let r = Rank::basic(); test_without_rank(&r, vec![1; 8])?; test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?; + test_i32_result( + &r, + (0..8).map(|idx| idx..idx + 1).collect(), + vec![1, 2, 3, 4, 5, 6, 7, 8], + )?; Ok(()) } @@ -427,6 +447,11 @@ mod tests { let r = Rank::dense_rank(); test_without_rank(&r, vec![1; 8])?; test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?; + test_i32_result( + &r, + (0..8).map(|idx| idx..idx + 1).collect(), + vec![1, 2, 3, 4, 5, 6, 7, 8], + )?; Ok(()) } @@ -451,6 +476,10 @@ mod tests { let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5]; test_f64_result(&r, 7, vec![0..3, 3..7], expected)?; + // singleton rank groups + let expected = vec![0.0, 0.25, 0.5, 0.75, 1.0]; + test_f64_result(&r, 5, (0..5).map(|idx| idx..idx + 1).collect(), expected)?; + Ok(()) } }