From fbd35a25a1058f2bc2e94bc2221d09229041e6cb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 14 Feb 2026 21:56:34 -0500 Subject: [PATCH 1/9] fix: handle complex projections in ordering validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, `get_projected_output_ordering` used `ordered_column_indices_from_projection` which was all-or-nothing: if any expression in the projection wasn't a simple Column, it returned None for the entire projection — even if the sort columns themselves were simple column refs. Replace it with `resolve_sort_column_projection` which only requires sort-column positions to resolve to simple Columns. Each ordering is now independently evaluated: orderings on simple column refs get validated with statistics even when other projection expressions are complex. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 107 +++++++++--------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 524e091381c4c..de7282f834441 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1331,19 +1331,33 @@ impl DisplayAs for FileScanConfig { } } -/// Get the indices of columns in a projection if the projection is a simple -/// list of columns. -/// If there are any expressions other than columns, returns None. -fn ordered_column_indices_from_projection( +/// Build a projection index mapping for the sort columns in `ordering`. +/// +/// Returns a `Vec` of the same length as `projection`, where each entry +/// maps a projected-schema column index to the corresponding table-schema column +/// index. Only the entries referenced by sort columns in `ordering` are required +/// to be simple `Column` expressions; non-sort-column positions are filled with a +/// placeholder (0) since they will never be accessed by `MinMaxStatistics`. +/// +/// Returns `None` if any sort column is not a simple `Column` reference in the +/// projected ordering, or if its corresponding projection expression is not a +/// simple `Column`. +fn resolve_sort_column_projection( + ordering: &LexOrdering, projection: &ProjectionExprs, ) -> Option> { - projection - .expr_iter() - .map(|e| { - let index = e.as_any().downcast_ref::()?.index(); - Some(index) - }) - .collect::>>() + let proj_slice = projection.as_ref(); + let mut indices = vec![0usize; proj_slice.len()]; + + for sort_expr in ordering.iter() { + let col = sort_expr.expr.as_any().downcast_ref::()?; + let proj_idx = col.index(); + let proj_expr = proj_slice.get(proj_idx)?; + let table_col = proj_expr.expr.as_any().downcast_ref::()?; + indices[proj_idx] = table_col.index(); + } + + Some(indices) } /// Check whether a given ordering is valid for all file groups by verifying @@ -1457,47 +1471,38 @@ fn get_projected_output_ordering( let projected_orderings = project_orderings(&base_config.output_ordering, projected_schema); - let indices = base_config - .file_source - .projection() - .as_ref() - .map(|p| ordered_column_indices_from_projection(p)); - - match indices { - Some(Some(indices)) => { - // Simple column projection — validate with statistics - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - Some(indices.as_slice()), - ) - } - None => { - // No projection — validate with statistics (no remapping needed) - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - None, - ) - } - Some(None) => { - // Complex projection (expressions, not simple columns) — can't - // determine column indices for statistics. Still valid if all - // file groups have at most one file. - if base_config.file_groups.iter().all(|g| g.len() <= 1) { - projected_orderings - } else { - debug!( - "Skipping specified output orderings. \ - Some file groups couldn't be determined to be sorted: {:?}", - base_config.file_groups - ); - vec![] + let projection = base_config.file_source.projection(); + + projected_orderings + .into_iter() + .filter(|ordering| match projection.as_ref() { + None => { + // No projection — validate directly with statistics + is_ordering_valid_for_file_groups( + &base_config.file_groups, + ordering, + projected_schema, + None, + ) } - } - } + Some(proj) => match resolve_sort_column_projection(ordering, proj) { + Some(indices) => { + // All sort columns resolved — validate with statistics + is_ordering_valid_for_file_groups( + &base_config.file_groups, + ordering, + projected_schema, + Some(&indices), + ) + } + None => { + // Some sort column is a complex expression — can't + // look up statistics. Fall back to single-file check. + base_config.file_groups.iter().all(|g| g.len() <= 1) + } + }, + }) + .collect() } /// Convert type to a type suitable for use as a `ListingTable` From a9b379e76bd245618238dca1d1121aa16f692825 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:23:03 -0500 Subject: [PATCH 2/9] fix: unify ordering display with optimization path Replace the independent display computation (get_projected_output_ordering) with orderings extracted from eq_properties().oeq_class(), so EXPLAIN output always matches what the optimizer actually sees. Previously, fmt_as() independently recomputed orderings via get_projected_output_ordering(), which validated post-projection and would drop valid orderings when any projection expression was complex (e.g. `a + 1`). Now both display and optimization use the same path: validate at table-schema level, then project through EquivalenceProperties::project(). - Delete get_projected_output_ordering and resolve_sort_column_projection - Update DataSource::fmt_as and DisplayAs::fmt_as to use eq_properties() - Add regression tests for complex projections with multi-file groups - Update SLT expectations for equivalence-aware ordering display Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 389 ++++++++++++------ .../test_files/encrypted_parquet.slt | 7 +- .../sqllogictest/test_files/group_by.slt | 4 +- datafusion/sqllogictest/test_files/joins.slt | 2 +- .../test_files/monotonic_projection_test.slt | 4 +- .../sqllogictest/test_files/sort_pushdown.slt | 4 +- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/union.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 10 +- 9 files changed, 273 insertions(+), 151 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index de7282f834441..ee9e39c9d9398 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -600,7 +600,8 @@ impl DataSource for FileScanConfig { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; @@ -639,7 +640,7 @@ impl DataSource for FileScanConfig { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1307,21 +1308,23 @@ impl Debug for FileScanConfig { impl DisplayAs for FileScanConfig { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + if let Ok(schema) = self.projected_schema() { + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } } if let Some(limit) = self.limit { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1331,35 +1334,6 @@ impl DisplayAs for FileScanConfig { } } -/// Build a projection index mapping for the sort columns in `ordering`. -/// -/// Returns a `Vec` of the same length as `projection`, where each entry -/// maps a projected-schema column index to the corresponding table-schema column -/// index. Only the entries referenced by sort columns in `ordering` are required -/// to be simple `Column` expressions; non-sort-column positions are filled with a -/// placeholder (0) since they will never be accessed by `MinMaxStatistics`. -/// -/// Returns `None` if any sort column is not a simple `Column` reference in the -/// projected ordering, or if its corresponding projection expression is not a -/// simple `Column`. -fn resolve_sort_column_projection( - ordering: &LexOrdering, - projection: &ProjectionExprs, -) -> Option> { - let proj_slice = projection.as_ref(); - let mut indices = vec![0usize; proj_slice.len()]; - - for sort_expr in ordering.iter() { - let col = sort_expr.expr.as_any().downcast_ref::()?; - let proj_idx = col.index(); - let proj_expr = proj_slice.get(proj_idx)?; - let table_col = proj_expr.expr.as_any().downcast_ref::()?; - indices[proj_idx] = table_col.index(); - } - - Some(indices) -} - /// Check whether a given ordering is valid for all file groups by verifying /// that files within each group are sorted according to their min/max statistics. /// @@ -1405,106 +1379,6 @@ fn validate_orderings( .collect() } -/// The various listing tables does not attempt to read all files -/// concurrently, instead they will read files in sequence within a -/// partition. This is an important property as it allows plans to -/// run against 1000s of files and not try to open them all -/// concurrently. -/// -/// However, it means if we assign more than one file to a partition -/// the output sort order will not be preserved as illustrated in the -/// following diagrams: -/// -/// When only 1 file is assigned to each partition, each partition is -/// correctly sorted on `(A, B, C)` -/// -/// ```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ -/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// DataFusion DataFusion DataFusion DataFusion -/// ┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// -/// DataSourceExec -/// ``` -/// -/// However, when more than 1 file is assigned to each partition, each -/// partition is NOT correctly sorted on `(A, B, C)`. Once the second -/// file is scanned, the same values for A, B and C can be repeated in -/// the same sorted stream -/// -///```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ │ -/// │ │ │ ┃ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// DataFusion DataFusion ┃ -/// ┃ Partition 1 Partition 2 -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ -/// -/// DataSourceExec -/// ``` -fn get_projected_output_ordering( - base_config: &FileScanConfig, - projected_schema: &SchemaRef, -) -> Vec { - let projected_orderings = - project_orderings(&base_config.output_ordering, projected_schema); - - let projection = base_config.file_source.projection(); - - projected_orderings - .into_iter() - .filter(|ordering| match projection.as_ref() { - None => { - // No projection — validate directly with statistics - is_ordering_valid_for_file_groups( - &base_config.file_groups, - ordering, - projected_schema, - None, - ) - } - Some(proj) => match resolve_sort_column_projection(ordering, proj) { - Some(indices) => { - // All sort columns resolved — validate with statistics - is_ordering_valid_for_file_groups( - &base_config.file_groups, - ordering, - projected_schema, - Some(&indices), - ) - } - None => { - // Some sort column is a complex expression — can't - // look up statistics. Fall back to single-file check. - base_config.file_groups.iter().all(|g| g.len() <= 1) - } - }, - }) - .collect() -} - /// Convert type to a type suitable for use as a `ListingTable` /// partition column. Returns `Dictionary(UInt16, val_type)`, which is /// a reasonable trade off between a reasonable number of partition @@ -2607,4 +2481,247 @@ mod tests { Ok(()) } + + /// Helper: create a `PartitionedFile` with min/max stats for the given columns. + fn partitioned_file_with_stats( + name: &str, + col_stats: Vec<(ScalarValue, ScalarValue)>, + ) -> PartitionedFile { + let column_statistics: Vec = col_stats + .into_iter() + .map(|(min, max)| ColumnStatistics { + min_value: Precision::Exact(min), + max_value: Precision::Exact(max), + null_count: Precision::Exact(0), + ..ColumnStatistics::new_unknown() + }) + .collect(); + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics, + }; + PartitionedFile::new(name, 1024).with_statistics(Arc::new(stats)) + } + + /// Regression test: with a complex projection like `a + 1`, the display + /// path should still show orderings (it delegates to `eq_properties()` + /// which validates at table-schema level, then projects correctly). + #[test] + fn test_display_ordering_with_complex_projection_multi_file() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, non-overlapping b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), // b + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), // b + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Push a complex projection: [a + 1 AS x, b] + let expr_a_plus_1: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(expr_a_plus_1, "x"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Format via DisplayAs and verify ordering is present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + assert!( + display.contains("output_ordering="), + "Expected output_ordering in display, but got: {display}" + ); + assert!( + display.contains("b@1 ASC"), + "Expected 'b@1 ASC' in display, but got: {display}" + ); + } + + /// Verify orderings ARE dropped when file statistics overlap + /// (ordering is genuinely invalid for multi-file groups). + #[test] + fn test_display_ordering_dropped_for_overlapping_stats() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, OVERLAPPING b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(20))), // overlaps! + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Format and verify ordering is NOT present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(config.clone()) + ); + assert!( + !display.contains("output_ordering"), + "Expected no output_ordering for overlapping stats, but got: {display}" + ); + } + + /// Verify the display path and optimization path agree: orderings from + /// `eq_properties().oeq_class()` match what appears in `fmt_as()` output. + #[test] + fn test_display_ordering_matches_eq_properties() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Non-overlapping b statistics across two files + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Simple projection [a, b] + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Get orderings from eq_properties (what the optimizer sees) + let eq_props = new_config.eq_properties(); + let oeq_orderings = eq_props.oeq_class(); + assert!( + !oeq_orderings.is_empty(), + "eq_properties should report orderings for valid non-overlapping files" + ); + + // Get display output + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + + // Verify they agree: each ordering from eq_properties should appear in display + for ordering in oeq_orderings.iter() { + let ordering_str = format!("{ordering}"); + assert!( + display.contains(&ordering_str), + "Display should contain ordering '{ordering_str}', but got: {display}" + ); + } + } } diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index d580b7d1ad2b8..42985d6b2c6f5 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,5 +85,10 @@ float_field float ) STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' -query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided +query RR SELECT * FROM parquet_table +---- +1 2 +5 6 +3 4 +-1 -1 diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 294841552a66d..539a6d96b1743 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2274,7 +2274,7 @@ logical_plan 02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d) @@ -5001,7 +5001,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan 01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query II? SELECT a, b, ARRAY_AGG(c ORDER BY c DESC) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 228918c3855f2..4ab6259aa48d1 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3456,7 +3456,7 @@ logical_plan 05)----TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan 01)NestedLoopJoinExec: join_type=Inner, filter=example(join_proj_push_down_1@0, join_proj_push_down_2@1) > 3, projection=[a0@0, a@1, b@2, c@3, d@4, a0@6, a@7, b@8, c@9, d@10] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_orderings=[[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [join_proj_push_down_1@5 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true 03)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] 04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 7feefc169fcab..3144977b678f2 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 99f26b66d458b..d1599e769be16 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -404,7 +404,7 @@ logical_plan 01)Sort: timeseries_parquet.period_end ASC NULLS LAST, fetch=2 02)--Filter: timeseries_parquet.timeframe = Utf8View("quarterly") 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Test 2.4: Verify ASC results query TIR @@ -458,7 +458,7 @@ logical_plan 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] physical_plan 01)SortExec: TopK(fetch=2), expr=[period_end@1 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Results should still be correct query TIR diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a1fef0722297..3f135c25c86e3 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index d858d0ae3ea4e..d158fa50a107d 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -592,7 +592,7 @@ physical_plan 01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST] 02)--UnionExec 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok drop table t1 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 62296c5d87f2b..b2a2c1cb050ed 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3507,7 +3507,7 @@ logical_plan physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--FilterExec: b@2 = 0 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Since column b is constant after filter b=0, # window requirement b ASC, d ASC can be satisfied @@ -3525,7 +3525,7 @@ physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false] 03)----FilterExec: b@2 = 0 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Create an unbounded source where there is multiple orderings. @@ -3561,7 +3561,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ -3605,7 +3605,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) From afa6392ed9f6cfa8caddd104e79f00711103d018 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:28:45 -0500 Subject: [PATCH 3/9] docs: move ASCII art diagrams to validated_output_ordering The partition/file ordering diagrams from the deleted get_projected_output_ordering are useful context for understanding why we validate orderings against file statistics. Move them to validated_output_ordering where they belong. Co-Authored-By: Claude Opus 4.6 --- datafusion/datasource/src/file_scan_config.rs | 74 ++++++++++++++++--- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index ee9e39c9d9398..cf98c834f342e 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -942,16 +942,69 @@ impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. /// + /// The various listing tables do not attempt to read all files + /// concurrently, instead they read files in sequence within a + /// partition. This is an important property as it allows plans to + /// run against 1000s of files and not try to open them all + /// concurrently. + /// + /// However, it means if we assign more than one file to a partition + /// the output sort order will not be preserved unless the files' + /// min/max statistics prove the combined stream is still ordered. + /// + /// When only 1 file is assigned to each partition, each partition is + /// correctly sorted on `(A, B, C)`: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ + /// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// Partition 1 Partition 2 Partition 3 Partition 4 + /// ┃ ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// DataSourceExec + /// ``` + /// + /// However, when more than 1 file is assigned to each partition, each + /// partition is NOT necessarily sorted on `(A, B, C)`. Once the second + /// file is scanned, the same values for A, B and C can be repeated in + /// the same sorted stream: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ + /// Partition 1 Partition 2 ┃ + /// ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ + /// DataSourceExec + /// ``` + /// /// For example, individual files may be ordered by `col1 ASC`, - /// but if we have files with these min/max statistics in a single partition / file group: + /// but if we have files with these min/max statistics in a single + /// partition / file group: /// /// - file1: min(col1) = 10, max(col1) = 20 /// - file2: min(col1) = 5, max(col1) = 15 /// - /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap - /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering. + /// Because reading file1 followed by file2 would produce out-of-order + /// output (there is overlap in the ranges), we cannot retain `col1 ASC` + /// as a valid output ordering. /// - /// Similarly this would not be a valid order (non-overlapping ranges but not ordered): + /// Similarly this would not be a valid order (non-overlapping ranges + /// but not ordered): /// /// - file1: min(col1) = 20, max(col1) = 30 /// - file2: min(col1) = 10, max(col1) = 15 @@ -961,13 +1014,14 @@ impl FileScanConfig { /// - file1: min(col1) = 5, max(col1) = 15 /// - file2: min(col1) = 16, max(col1) = 25 /// - /// Then we know that reading file1 followed by file2 will produce ordered output, - /// so `col1 ASC` would be retained. + /// Then we know that reading file1 followed by file2 will produce + /// ordered output, so `col1 ASC` would be retained. /// - /// Note that we are checking for ordering *within* *each* file group / partition, - /// files in different partitions are read independently and do not affect each other's ordering. - /// Merging of the multiple partition streams into a single ordered stream is handled - /// upstream e.g. by `SortPreservingMergeExec`. + /// Note that we are checking for ordering *within* *each* file group / + /// partition — files in different partitions are read independently and + /// do not affect each other's ordering. Merging of the multiple + /// partition streams into a single ordered stream is handled upstream + /// e.g. by `SortPreservingMergeExec`. fn validated_output_ordering(&self) -> Vec { let schema = self.file_source.table_schema().table_schema(); validate_orderings(&self.output_ordering, schema, &self.file_groups, None) From ad6fc288fbd5cf31192a6adef475772d637fad42 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 11:42:14 -0500 Subject: [PATCH 4/9] lint --- datafusion/datasource/src/file_scan_config.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index cf98c834f342e..bc4b9c5d80941 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1368,10 +1368,10 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if let Ok(schema) = self.projected_schema() { - if !schema.fields().is_empty() { - write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; - } + if let Ok(schema) = self.projected_schema() + && !schema.fields().is_empty() + { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } if let Some(limit) = self.limit { From b036b4444bbc764685f75e34328311a7ec9dcc41 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 15 Feb 2026 13:26:37 -0500 Subject: [PATCH 5/9] fix --- datafusion/sqllogictest/test_files/encrypted_parquet.slt | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index 42985d6b2c6f5..d580b7d1ad2b8 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -85,10 +85,5 @@ float_field float ) STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' -query RR +query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided SELECT * FROM parquet_table ----- -1 2 -5 6 -3 4 --1 -1 From d418f1c94a2f85216178b963572e56afd5e0632a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 22 Feb 2026 11:03:34 +0000 Subject: [PATCH 6/9] Update datafusion/datasource/src/file_scan_config.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- datafusion/datasource/src/file_scan_config.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index bc4b9c5d80941..c18cb56e4b808 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1368,9 +1368,11 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if let Ok(schema) = self.projected_schema() - && !schema.fields().is_empty() - { + let schema = self + .projected_schema() + .map_err(|_| std::fmt::Error)?; + + if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } From f6a4bc9ba908781c73bb1d334cf559ceeb681d1f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 22 Feb 2026 11:11:53 +0000 Subject: [PATCH 7/9] Apply suggestion from @adriangb --- datafusion/datasource/src/file_scan_config.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c18cb56e4b808..a5d1d4013e380 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1368,10 +1368,7 @@ impl DisplayAs for FileScanConfig { write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - let schema = self - .projected_schema() - .map_err(|_| std::fmt::Error)?; - + let schema = self.projected_schema().map_err(|_| std::fmt::Error)?; if !schema.fields().is_empty() { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } From 70a99a6afb2d2010028098628200b4f7f4ddb74e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 6 Mar 2026 14:19:43 -0800 Subject: [PATCH 8/9] fix: preserve declaration order of orderings in EXPLAIN display Three changes to ensure projected orderings match the original declaration order: 1. In OrderingEquivalenceClass::remove_redundant_entries(), use Vec::remove() instead of Vec::swap_remove() to preserve ordering during dedup. 2. In projected_orderings(), swap chain order so dependency-map-based orderings (which preserve declaration order via IndexMap) come before newly derived orderings from projection mapping. 3. In substitute_oeq_class(), push the original sort expression first before substituted (derived) expressions so the cartesian product preserves declaration order. Co-Authored-By: Claude Opus 4.6 --- datafusion/physical-expr/src/equivalence/ordering.rs | 4 ++-- .../physical-expr/src/equivalence/properties/mod.rs | 12 ++++++++---- datafusion/sqllogictest/test_files/group_by.slt | 4 ++-- .../test_files/monotonic_projection_test.slt | 4 ++-- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 10 +++++----- 6 files changed, 20 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 2ce8a8d246fe7..76d98ae4bbecb 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -107,14 +107,14 @@ impl OrderingEquivalenceClass { if let Some(remove) = self.resolve_overlap(idx, ordering_idx) { work = true; if remove { - self.orderings.swap_remove(idx); + self.orderings.remove(idx); continue 'outer; } } if let Some(remove) = self.resolve_overlap(ordering_idx, idx) { work = true; if remove { - self.orderings.swap_remove(ordering_idx); + self.orderings.remove(ordering_idx); continue; } } diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index a98341b10765a..94ae36a0a31b8 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -838,7 +838,9 @@ impl EquivalenceProperties { .map(|(source, _target)| source) .filter(|source| expr_refers(source, &sort_expr.expr)) .cloned(); - let mut result = vec![]; + // Start with the original expression so that + // declared orderings appear before derived ones: + let mut result = vec![sort_expr.clone()]; // The sort expression comes from this schema, so the // following call to `unwrap` is safe. let expr_type = sort_expr.expr.data_type(schema).unwrap(); @@ -860,7 +862,6 @@ impl EquivalenceProperties { } } } - result.push(sort_expr); result }) // Generate all valid orderings given substituted expressions: @@ -1099,8 +1100,11 @@ impl EquivalenceProperties { prefixes }); - // Simplify each ordering by removing redundant sections: - orderings.chain(projected_orderings).collect() + // Simplify each ordering by removing redundant sections. + // Place projected_orderings first so that orderings reconstructed + // from the dependency map (which preserves declaration order via + // IndexMap) appear before newly derived orderings from Pass 1: + projected_orderings.chain(orderings).collect() } /// Projects constraints according to the given projection mapping. diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 539a6d96b1743..294841552a66d 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2274,7 +2274,7 @@ logical_plan 02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d) @@ -5001,7 +5001,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan 01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true query II? SELECT a, b, ARRAY_AGG(c ORDER BY c DESC) diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 3144977b678f2..62bad823c0111 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 3f135c25c86e3..38305c2a23d6b 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index b2a2c1cb050ed..d07cf8dea1983 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3507,7 +3507,7 @@ logical_plan physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--FilterExec: b@2 = 0 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true # Since column b is constant after filter b=0, # window requirement b ASC, d ASC can be satisfied @@ -3525,7 +3525,7 @@ physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false] 03)----FilterExec: b@2 = 0 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true # Create an unbounded source where there is multiple orderings. @@ -3561,7 +3561,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ -3605,7 +3605,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC) From 8234ab11eb9acbd6a9d3b0d6dbeedb04fe0a870e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 6 Mar 2026 14:33:30 -0800 Subject: [PATCH 9/9] fix: prefer removing later duplicates in remove_redundant_entries Swap the order of resolve_overlap checks so that when two orderings are exact duplicates, the later occurrence is removed instead of the earlier one. This preserves the declaration order of orderings when the same ordering appears multiple times (e.g. from both Pass 1 and Pass 2 of projected_orderings). Co-Authored-By: Claude Opus 4.6 --- .../physical-expr/src/equivalence/ordering.rs | 15 +++++++++------ .../src/equivalence/properties/dependency.rs | 2 +- .../test_files/monotonic_projection_test.slt | 4 ++-- datafusion/sqllogictest/test_files/topk.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 4 ++-- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 76d98ae4bbecb..48e2a37ce7752 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -104,18 +104,21 @@ impl OrderingEquivalenceClass { 'outer: while idx < self.orderings.len() { let mut ordering_idx = idx + 1; while ordering_idx < self.orderings.len() { - if let Some(remove) = self.resolve_overlap(idx, ordering_idx) { + // Check the later index first so that for exact + // duplicates we remove the later occurrence and + // preserve the declaration order of orderings: + if let Some(remove) = self.resolve_overlap(ordering_idx, idx) { work = true; if remove { - self.orderings.remove(idx); - continue 'outer; + self.orderings.remove(ordering_idx); + continue; } } - if let Some(remove) = self.resolve_overlap(ordering_idx, idx) { + if let Some(remove) = self.resolve_overlap(idx, ordering_idx) { work = true; if remove { - self.orderings.remove(ordering_idx); - continue; + self.orderings.remove(idx); + continue 'outer; } } ordering_idx += 1; diff --git a/datafusion/physical-expr/src/equivalence/properties/dependency.rs b/datafusion/physical-expr/src/equivalence/properties/dependency.rs index edbf7033f4e7a..bba19569a9c6f 100644 --- a/datafusion/physical-expr/src/equivalence/properties/dependency.rs +++ b/datafusion/physical-expr/src/equivalence/properties/dependency.rs @@ -494,7 +494,7 @@ mod tests { assert_eq!( out_properties.to_string(), - "order: [[a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC], [a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC]]" + "order: [[a@0 ASC, b@1 ASC, c@2 ASC, d@3 ASC], [a@0 ASC, c@2 ASC, b@1 ASC, d@3 ASC]]" ); Ok(()) diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 62bad823c0111..3144977b678f2 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 38305c2a23d6b..e202f35583e8f 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, letter@1 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index d07cf8dea1983..f355dcf6498f2 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3345,7 +3345,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum3, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as sum4] 02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Linear] -03)----RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, preserve_order=true, sort_exprs=__common_expr_1@0 ASC NULLS LAST, a@1 ASC NULLS LAST +03)----RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST 04)------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as a, d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 06)----------RepartitionExec: partitioning=Hash([b@2, a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST