@@ -261,8 +261,9 @@ fn build_case_heavy_left_join_df_with_push_down_filter(
261261 let ctx = SessionContext :: new ( ) ;
262262 register_string_table ( & ctx, 100 , 1000 ) ;
263263 if !push_down_filter_enabled {
264- debug_assert ! (
265- ctx. remove_optimizer_rule( "push_down_filter" ) ,
264+ let removed = ctx. remove_optimizer_rule ( "push_down_filter" ) ;
265+ assert ! (
266+ removed,
266267 "push_down_filter rule should be present in the default optimizer"
267268 ) ;
268269 }
@@ -271,6 +272,58 @@ fn build_case_heavy_left_join_df_with_push_down_filter(
271272 rt. block_on ( async { ctx. sql ( & query) . await . unwrap ( ) } )
272273}
273274
/// Builds the SQL text for a self left-join of table `t` whose `WHERE` clause
/// contains `predicate_count` CASE-free predicates joined by `AND`.
///
/// Each predicate has the form `length(<operand>) > 2`. The operand starts as
/// the left column `l.c{i % 20}` and is wrapped `nesting_depth` times in
/// `coalesce(<operand>, r.c{(i + depth + 1) % 20})`.
///
/// When `predicate_count` is zero the `WHERE` clause is the literal `TRUE`.
/// The output depends only on the two arguments, so repeated calls with the
/// same inputs yield the same query text.
fn build_non_case_left_join_query(
    predicate_count: usize,
    nesting_depth: usize,
) -> String {
    const SELECT_PREFIX: &str =
        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ";

    let where_clause = if predicate_count == 0 {
        String::from("TRUE")
    } else {
        (0..predicate_count)
            .map(|idx| {
                // Start from the left-side column and wrap it once per nesting level.
                let operand = (0..nesting_depth).fold(
                    format!("l.c{}", idx % 20),
                    |inner, level| {
                        format!("coalesce({inner}, r.c{})", (idx + level + 1) % 20)
                    },
                );
                format!("length({operand}) > 2")
            })
            .collect::<Vec<_>>()
            .join(" AND ")
    };

    format!("{SELECT_PREFIX}{where_clause}")
}
306+
307+ fn build_non_case_left_join_df_with_push_down_filter (
308+ rt : & Runtime ,
309+ predicate_count : usize ,
310+ nesting_depth : usize ,
311+ push_down_filter_enabled : bool ,
312+ ) -> DataFrame {
313+ let ctx = SessionContext :: new ( ) ;
314+ register_string_table ( & ctx, 100 , 1000 ) ;
315+ if !push_down_filter_enabled {
316+ let removed = ctx. remove_optimizer_rule ( "push_down_filter" ) ;
317+ assert ! (
318+ removed,
319+ "push_down_filter rule should be present in the default optimizer"
320+ ) ;
321+ }
322+
323+ let query = build_non_case_left_join_query ( predicate_count, nesting_depth) ;
324+ rt. block_on ( async { ctx. sql ( & query) . await . unwrap ( ) } )
325+ }
326+
274327fn criterion_benchmark ( c : & mut Criterion ) {
275328 let baseline_ctx = SessionContext :: new ( ) ;
276329 let case_heavy_ctx = SessionContext :: new ( ) ;
@@ -289,17 +342,18 @@ fn criterion_benchmark(c: &mut Criterion) {
289342 } )
290343 } ) ;
291344
292- c. bench_function ( "logical_plan_optimize_case_heavy_left_join " , |b| {
345+ c. bench_function ( "logical_plan_optimize_hotspot_case_heavy_left_join " , |b| {
293346 b. iter ( || {
294347 let df_clone = case_heavy_left_join_df. clone ( ) ;
295348 black_box ( rt. block_on ( async { df_clone. into_optimized_plan ( ) . unwrap ( ) } ) ) ;
296349 } )
297350 } ) ;
298351
299- let mut group = c. benchmark_group ( "push_down_filter_case_heavy_left_join_ab" ) ;
300352 let predicate_sweep = [ 10 , 20 , 30 , 40 , 60 ] ;
301353 let case_depth_sweep = [ 1 , 2 , 3 ] ;
302354
355+ let mut hotspot_group =
356+ c. benchmark_group ( "push_down_filter_hotspot_case_heavy_left_join_ab" ) ;
303357 for case_depth in case_depth_sweep {
304358 for predicate_count in predicate_sweep {
305359 let with_push_down_filter =
@@ -323,7 +377,59 @@ fn criterion_benchmark(c: &mut Criterion) {
323377 // - with_push_down_filter: default optimizer path (rule enabled)
324378 // - without_push_down_filter: control path with the rule removed
325379 // Compare both IDs at the same sweep point to isolate rule impact.
326- group. bench_with_input (
380+ hotspot_group. bench_with_input (
381+ BenchmarkId :: new ( "with_push_down_filter" , & input_label) ,
382+ & with_push_down_filter,
383+ |b, df| {
384+ b. iter ( || {
385+ let df_clone = df. clone ( ) ;
386+ black_box (
387+ rt. block_on ( async {
388+ df_clone. into_optimized_plan ( ) . unwrap ( )
389+ } ) ,
390+ ) ;
391+ } )
392+ } ,
393+ ) ;
394+ hotspot_group. bench_with_input (
395+ BenchmarkId :: new ( "without_push_down_filter" , & input_label) ,
396+ & without_push_down_filter,
397+ |b, df| {
398+ b. iter ( || {
399+ let df_clone = df. clone ( ) ;
400+ black_box (
401+ rt. block_on ( async {
402+ df_clone. into_optimized_plan ( ) . unwrap ( )
403+ } ) ,
404+ ) ;
405+ } )
406+ } ,
407+ ) ;
408+ }
409+ }
410+ hotspot_group. finish ( ) ;
411+
412+ let mut control_group =
413+ c. benchmark_group ( "push_down_filter_control_non_case_left_join_ab" ) ;
414+ for nesting_depth in case_depth_sweep {
415+ for predicate_count in predicate_sweep {
416+ let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter (
417+ & rt,
418+ predicate_count,
419+ nesting_depth,
420+ true ,
421+ ) ;
422+ let without_push_down_filter =
423+ build_non_case_left_join_df_with_push_down_filter (
424+ & rt,
425+ predicate_count,
426+ nesting_depth,
427+ false ,
428+ ) ;
429+
430+ let input_label =
431+ format ! ( "predicates={predicate_count},nesting_depth={nesting_depth}" ) ;
432+ control_group. bench_with_input (
327433 BenchmarkId :: new ( "with_push_down_filter" , & input_label) ,
328434 & with_push_down_filter,
329435 |b, df| {
@@ -337,7 +443,7 @@ fn criterion_benchmark(c: &mut Criterion) {
337443 } )
338444 } ,
339445 ) ;
340- group . bench_with_input (
446+ control_group . bench_with_input (
341447 BenchmarkId :: new ( "without_push_down_filter" , & input_label) ,
342448 & without_push_down_filter,
343449 |b, df| {
@@ -353,7 +459,7 @@ fn criterion_benchmark(c: &mut Criterion) {
353459 ) ;
354460 }
355461 }
356- group . finish ( ) ;
462+ control_group . finish ( ) ;
357463}
358464
359465criterion_group ! ( benches, criterion_benchmark) ;
0 commit comments