diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index bad1c684b47f5..d58ac1f1cd5eb 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -43,8 +43,10 @@ use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
     ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
+    tree_node::{Transformed, TransformedResult, TreeNode},
 };
 use datafusion_datasource::{PartitionedFile, TableSchema};
+use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
@@ -597,7 +599,11 @@ impl ParquetMorselizer {
         ));
 
         let mut projection = self.projection.clone();
-        let mut predicate = self.predicate.clone();
+        let mut predicate = self
+            .predicate
+            .clone()
+            .map(|p| rewrite_partition_index_dynamic_filters(p, self.partition_index))
+            .transpose()?;
         if !literal_columns.is_empty() {
             projection = projection.try_map_exprs(|expr| {
                 replace_columns_with_literals(Arc::clone(&expr), &literal_columns)
@@ -1589,6 +1595,27 @@ pub(crate) fn build_pruning_predicates(
     )
 }
 
+/// Replaces partition-index dynamic filters with the filter for the parquet
+/// execution partition currently opening a file.
+fn rewrite_partition_index_dynamic_filters(
+    predicate: Arc<dyn PhysicalExpr>,
+    partition_index: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    predicate
+        .transform_up(|expr| {
+            let Some(dynamic_filter) = expr.downcast_ref::<DynamicFilterPhysicalExpr>()
+            else {
+                return Ok(Transformed::no(expr));
+            };
+
+            match dynamic_filter.partition_filter(partition_index)? {
+                Some(partition_expr) => Ok(Transformed::yes(partition_expr)),
+                None => Ok(Transformed::no(expr)),
+            }
+        })
+        .data()
+}
+
 /// Returns an `ArrowReaderMetadata` with the page index loaded, loading
 /// it from the underlying `AsyncFileReader` if necessary.
 async fn load_page_index(
@@ -1637,7 +1664,7 @@ mod test {
     use datafusion_expr::{col, lit};
     use datafusion_physical_expr::{
         PhysicalExpr,
-        expressions::{Column, DynamicFilterPhysicalExpr, Literal},
+        expressions::{Column, DynamicFilterPhysicalExpr, Literal, lit as physical_lit},
         planner::logical2physical,
         projection::ProjectionExprs,
     };
@@ -2011,6 +2038,41 @@ mod test {
         ))
     }
 
+    #[test]
+    fn test_rewrite_partition_index_dynamic_filters() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let initial = logical2physical(&col("a").gt(lit(0)), &schema);
+        let partition_0 = logical2physical(&col("a").gt(lit(10)), &schema);
+
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            initial.children().into_iter().map(Arc::clone).collect(),
+            initial,
+        ));
+        dynamic_filter
+            .update_partitioned(physical_lit(true), vec![Some(partition_0), None])
+            .unwrap();
+
+        let rewritten_0 = rewrite_partition_index_dynamic_filters(
+            Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
+            0,
+        )
+        .unwrap();
+        assert_eq!(
+            format!("{rewritten_0:?}"),
+            r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Gt, right: Literal { value: Int32(10), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#
+        );
+
+        let rewritten_1 = rewrite_partition_index_dynamic_filters(
+            dynamic_filter as Arc<dyn PhysicalExpr>,
+            1,
+        )
+        .unwrap();
+        assert_eq!(
+            format!("{rewritten_1:?}"),
+            r#"Literal { value: Boolean(false), field: Field { name: "lit", data_type: Boolean } }"#
+        );
+    }
+
     #[tokio::test]
     async fn test_prune_on_statistics() {
         let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs
index 04b74528d5ac1..e9e214e1cbf95 100644
--- a/datafusion/datasource/src/file_scan_config/mod.rs
+++ b/datafusion/datasource/src/file_scan_config/mod.rs
@@ -43,7 +43,10 @@ use crate::source::OpenArgs;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column};
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
+use datafusion_physical_expr::{
+    EquivalenceProperties, PartitionRange, Partitioning, RangeBound, RangePartitioning,
+    projection::ProjectionMapping, split_conjunction,
+};
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
@@ -202,14 +205,20 @@ pub struct FileScanConfig {
     /// would be incorrect if there are filters being applied, thus this should be accessed
     /// via [`FileScanConfig::statistics`].
     pub(crate) statistics: Statistics,
-    /// When true, file_groups are organized by partition column values
-    /// and output_partitioning will return Hash partitioning on partition columns.
-    /// This allows the optimizer to skip hash repartitioning for aggregates and joins
-    /// on partition columns.
+    /// When true, file_groups are organized by partition column values.
+    /// When each file group has a single partition-value tuple,
+    /// output_partitioning will return range partitioning on partition columns.
+    /// This allows the optimizer to skip repartitioning for aggregates and joins
+    /// on partition columns without advertising hash partitioning.
     ///
     /// If the number of file partitions > target_partitions, the file partitions will be grouped
    /// in a round-robin fashion such that number of file partitions = target_partitions.
     pub partitioned_by_file_group: bool,
+    /// Optional physical partitioning metadata for [`Self::file_groups`].
+    ///
+    /// The partition count must match `file_groups.len()`. Expressions are in
+    /// terms of the full table schema before scan projection.
+    pub file_group_partitioning: Option<Partitioning>,
 }
 
 /// A builder for [`FileScanConfig`]'s.
@@ -280,6 +289,7 @@ pub struct FileScanConfigBuilder {
     batch_size: Option<usize>,
     expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
     partitioned_by_file_group: bool,
+    file_group_partitioning: Option<Partitioning>,
 }
 
 impl FileScanConfigBuilder {
@@ -306,6 +316,7 @@ impl FileScanConfigBuilder {
             batch_size: None,
             expr_adapter_factory: None,
             partitioned_by_file_group: false,
+            file_group_partitioning: None,
         }
     }
 
@@ -496,8 +507,8 @@ impl FileScanConfigBuilder {
     /// Set whether file groups are organized by partition column values.
     ///
-    /// When set to true, the output partitioning will be declared as Hash partitioning
-    /// on the partition columns.
+    /// When set to true, output partitioning will be derived from file
+    /// partition values when each file group has a single partition-value tuple.
     pub fn with_partitioned_by_file_group(
         mut self,
         partitioned_by_file_group: bool,
@@ -506,6 +517,15 @@ impl FileScanConfigBuilder {
         self
     }
 
+    /// Declare the physical partitioning represented by the file groups.
+    ///
+    /// The partition count must match the number of file groups. Expressions
+    /// are in terms of the full table schema before scan projection.
+    pub fn with_file_group_partitioning(mut self, partitioning: Partitioning) -> Self {
+        self.file_group_partitioning = Some(partitioning);
+        self
+    }
+
     /// Build the final [`FileScanConfig`] with all the configured settings.
     ///
     /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
@@ -527,6 +547,7 @@ impl FileScanConfigBuilder {
             batch_size,
             expr_adapter_factory: expr_adapter,
             partitioned_by_file_group,
+            file_group_partitioning,
         } = self;
 
         let constraints = constraints.unwrap_or_default();
@@ -552,6 +573,7 @@ impl FileScanConfigBuilder {
             expr_adapter_factory: expr_adapter,
             statistics,
             partitioned_by_file_group,
+            file_group_partitioning,
         }
     }
 }
@@ -571,6 +593,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
             partitioned_by_file_group: config.partitioned_by_file_group,
+            file_group_partitioning: config.file_group_partitioning,
         }
    }
 }
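// A minimal usage sketch (not part of the patch) of how a table provider that
// already lays out one file group per key range might declare that layout.
// `url`, `source`, and `file_groups` are assumed inputs supplied by the caller;
// the builder and partitioning APIs are the ones added above.
fn declare_range_partitioned_scan(
    url: ObjectStoreUrl,
    source: Arc<dyn FileSource>,
    file_groups: Vec<FileGroup>,
) -> Result<FileScanConfig> {
    // Two partitions split at key = 100: (-inf, 100) and [100, +inf).
    let partitioning = RangePartitioning::try_new(
        vec![Arc::new(Column::new("key", 1))],
        vec![
            PartitionRange::new(
                None,
                Some(RangeBound::exclusive(vec![ScalarValue::Int64(Some(100))])),
            ),
            PartitionRange::new(
                Some(RangeBound::inclusive(vec![ScalarValue::Int64(Some(100))])),
                None,
            ),
        ],
    )?;

    // One file group per range, in the same order; otherwise the declared
    // partitioning is ignored at plan time.
    Ok(FileScanConfigBuilder::new(url, source)
        .with_file_groups(file_groups)
        .with_file_group_partitioning(Partitioning::range(partitioning))
        .build())
}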
@@ -687,8 +710,8 @@ impl DataSource for FileScanConfig {
     ) -> Result<Option<Arc<dyn DataSource>>> {
         // When files are grouped by partition values, we cannot allow byte-range
         // splitting. It would mix rows from different partition values across
-        // file groups, breaking the Hash partitioning.
-        if self.partitioned_by_file_group {
+        // file groups, breaking the advertised partitioning.
+        if self.partitioned_by_file_group || self.file_group_partitioning.is_some() {
             return Ok(None);
         }
@@ -704,9 +727,15 @@ impl DataSource for FileScanConfig {
 
     /// Returns the output partitioning for this file scan.
     ///
-    /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on
-    /// the Hive partition columns, allowing the optimizer to skip hash repartitioning
-    /// for aggregates and joins on those columns.
+    /// If [`Self::file_group_partitioning`] is set and has the same partition
+    /// count as [`Self::file_groups`], that declared partitioning is projected
+    /// to the scan output and returned.
+    ///
+    /// When `partitioned_by_file_group` is true, this returns range
+    /// partitioning on the Hive partition columns when each file group has one
+    /// partition value tuple. This accurately describes file groups that are
+    /// partitioned by partition-column values and allows the optimizer to skip hash
+    /// repartitioning for aggregates and joins on those columns.
     ///
     /// Tradeoffs
     /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
@@ -719,6 +748,23 @@ impl DataSource for FileScanConfig {
     /// - Idea: Could allow byte-range splitting within partition-aware groups,
     ///   preserving I/O parallelism while maintaining partition semantics.
     fn output_partitioning(&self) -> Partitioning {
+        if let Some(partitioning) = &self.file_group_partitioning {
+            if partitioning.partition_count() == self.file_groups.len() {
+                match self.project_file_group_partitioning(partitioning) {
+                    Ok(partitioning) => return partitioning,
+                    Err(e) => {
+                        debug!("Could not project file group partitioning: {e}");
+                    }
+                }
+            } else {
+                debug!(
+                    "Ignoring file group partitioning with {} partitions for {} file groups",
+                    partitioning.partition_count(),
+                    self.file_groups.len()
+                );
+            }
+        }
+
         if self.partitioned_by_file_group {
             let partition_cols = self.table_partition_cols();
             if !partition_cols.is_empty() {
@@ -747,7 +793,15 @@ impl DataSource for FileScanConfig {
                 }
 
                 if exprs.len() == partition_cols.len() {
-                    return Partitioning::Hash(exprs, self.file_groups.len());
+                    match self.file_group_range_partitioning(exprs) {
+                        Ok(Some(partitioning)) => return partitioning,
+                        Ok(None) => {}
+                        Err(e) => {
+                            debug!(
+                                "Could not build range partitioning from file groups: {e}"
+                            );
+                        }
+                    }
                 }
             }
         }
@@ -1093,6 +1147,53 @@ impl FileScanConfig {
         }
     }
 
+    fn project_file_group_partitioning(
+        &self,
+        partitioning: &Partitioning,
+    ) -> Result<Partitioning> {
+        let Some(projection) = self.file_source.projection() else {
+            return Ok(partitioning.clone());
+        };
+
+        let schema = self.file_source.table_schema().table_schema();
+        let mapping: ProjectionMapping = projection.projection_mapping(schema)?;
+        let eq_properties = EquivalenceProperties::new(Arc::clone(schema));
+        Ok(partitioning.project(&mapping, &eq_properties))
+    }
+
+    fn file_group_range_partitioning(
+        &self,
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Option<Partitioning>> {
+        let mut ranges = Vec::with_capacity(self.file_groups.len());
+        for file_group in &self.file_groups {
+            let Some(first_file) = file_group.files().first() else {
+                return Ok(None);
+            };
+            let partition_values = &first_file.partition_values;
+            if partition_values.len() != exprs.len() {
+                return Ok(None);
+            }
+
+            let all_files_match = file_group
+                .files()
+                .iter()
+                .all(|file| file.partition_values == *partition_values);
+            if !all_files_match {
+                return Ok(None);
+            }
+
+            ranges.push(PartitionRange::new(
+                Some(RangeBound::inclusive(partition_values.clone())),
+                Some(RangeBound::inclusive(partition_values.clone())),
+            ));
+        }
+
+        RangePartitioning::try_new(exprs, ranges)
+            .map(Partitioning::range)
+            .map(Some)
+    }
+
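// Worked example for `file_group_range_partitioning` above (illustrative; the
// exact `Display` rendering of dictionary-encoded partition values may differ):
// three Hive-style file groups whose partition values are "2024-01-01",
// "2024-01-02" and "2024-01-03" become three single-point inclusive ranges,
//
//   Range([date@0], [inclusive(2024-01-01)..inclusive(2024-01-01),
//                    inclusive(2024-01-02)..inclusive(2024-01-02),
//                    inclusive(2024-01-03)..inclusive(2024-01-03)])
//
// A point range per partition is a degenerate but valid range partitioning:
// it colocates all rows with equal partition-column values, which is exactly
// the property `Distribution::HashPartitioned([date])` asks for, even though
// nothing was hashed.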
Field::new("value", DataType::Utf8, false), + Field::new("key", DataType::Int64, false), + ])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + let partitioning = Partitioning::range(RangePartitioning::try_new( + vec![Arc::new(Column::new("key", 1))], + vec![ + PartitionRange::new( + None, + Some(RangeBound::exclusive(vec![ScalarValue::Int64(Some(100))])), + ), + PartitionRange::new( + Some(RangeBound::inclusive(vec![ScalarValue::Int64(Some(100))])), + None, + ), + ], + )?); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + file_source, + ) + .with_projection_indices(Some(vec![1])) + .unwrap() + .with_file_groups(vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet", 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet", 1024)]), + ]) + .with_file_group_partitioning(partitioning) + .build(); + + let partitioning = config.output_partitioning(); + let range = partitioning + .as_range() + .expect("Expected range partitioning"); + assert_eq!(range.partition_count(), 2); + assert_eq!(range.exprs().len(), 1); + let column = range.exprs()[0].downcast_ref::().unwrap(); + assert_eq!(column.name(), "key"); + assert_eq!(column.index(), 0); + + Ok(()) + } + + #[test] + fn test_output_partitioning_declared_file_group_range_mismatch() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("key", DataType::Int64, false)])); + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(MockSource::new(table_schema)); + let partitioning = Partitioning::range(RangePartitioning::try_new( + vec![Arc::new(Column::new("key", 0))], + vec![PartitionRange::unbounded()], + )?); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + file_source, + ) + .with_file_groups(vec![ + FileGroup::new(vec![PartitionedFile::new("f1.parquet", 1024)]), + FileGroup::new(vec![PartitionedFile::new("f2.parquet", 1024)]), + ]) + .with_file_group_partitioning(partitioning) + .build(); + + assert!(matches!( + config.output_partitioning(), + Partitioning::UnknownPartitioning(2) + )); + + Ok(()) + } + #[test] fn test_output_partitioning_with_partition_columns() { let file_schema = aggr_test_schema(); @@ -2461,20 +2640,36 @@ mod tests { ); config.partitioned_by_file_group = true; config.file_groups = vec![ - FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]), - FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]), - FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]), + FileGroup::new(vec![ + PartitionedFile::new("f1.parquet".to_string(), 1024) + .with_partition_values(vec![wrap_partition_value_in_dict( + ScalarValue::from("2024-01-01"), + )]), + ]), + FileGroup::new(vec![ + PartitionedFile::new("f2.parquet".to_string(), 1024) + .with_partition_values(vec![wrap_partition_value_in_dict( + ScalarValue::from("2024-01-02"), + )]), + ]), + FileGroup::new(vec![ + PartitionedFile::new("f3.parquet".to_string(), 1024) + .with_partition_values(vec![wrap_partition_value_in_dict( + ScalarValue::from("2024-01-03"), + )]), + ]), ]; let partitioning = config.output_partitioning(); - match partitioning { - Partitioning::Hash(exprs, num_partitions) => { - assert_eq!(num_partitions, 3); - assert_eq!(exprs.len(), 1); - assert_eq!(exprs[0].downcast_ref::().unwrap().name(), "date"); - } - _ => panic!("Expected Hash partitioning"), - } + let range = 
+            .as_range()
+            .expect("Expected range partitioning");
+        assert_eq!(range.partition_count(), 3);
+        assert_eq!(range.exprs().len(), 1);
+        assert_eq!(
+            range.exprs()[0].downcast_ref::<Column>().unwrap().name(),
+            "date"
+        );
 
         // Test multiple partition columns
         let multiple_partition_cols = vec![
@@ -2490,23 +2685,34 @@ mod tests {
         );
         config.partitioned_by_file_group = true;
         config.file_groups = vec![
-            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
-            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
+            FileGroup::new(vec![
+                PartitionedFile::new("f1.parquet".to_string(), 1024)
+                    .with_partition_values(vec![
+                        wrap_partition_value_in_dict(ScalarValue::from("2024")),
+                        wrap_partition_value_in_dict(ScalarValue::from("01")),
+                    ]),
+            ]),
+            FileGroup::new(vec![
+                PartitionedFile::new("f2.parquet".to_string(), 1024)
+                    .with_partition_values(vec![
+                        wrap_partition_value_in_dict(ScalarValue::from("2024")),
+                        wrap_partition_value_in_dict(ScalarValue::from("02")),
+                    ]),
+            ]),
         ];
 
         let partitioning = config.output_partitioning();
-        match partitioning {
-            Partitioning::Hash(exprs, num_partitions) => {
-                assert_eq!(num_partitions, 2);
-                assert_eq!(exprs.len(), 2);
-                let col_names: Vec<_> = exprs
-                    .iter()
-                    .map(|e| e.downcast_ref::<Column>().unwrap().name())
-                    .collect();
-                assert_eq!(col_names, vec!["year", "month"]);
-            }
-            _ => panic!("Expected Hash partitioning"),
-        }
+        let range = partitioning
+            .as_range()
+            .expect("Expected range partitioning");
+        assert_eq!(range.partition_count(), 2);
+        assert_eq!(range.exprs().len(), 2);
+        let col_names: Vec<_> = range
+            .exprs()
+            .iter()
+            .map(|e| e.downcast_ref::<Column>().unwrap().name())
+            .collect();
+        assert_eq!(col_names, vec!["year", "month"]);
     }
 
     #[test]
diff --git a/datafusion/ffi/src/physical_expr/partitioning.rs b/datafusion/ffi/src/physical_expr/partitioning.rs
index 434b6a097e645..15133e4e9074e 100644
--- a/datafusion/ffi/src/physical_expr/partitioning.rs
+++ b/datafusion/ffi/src/physical_expr/partitioning.rs
@@ -46,6 +46,9 @@ impl From<&Partitioning> for FFI_Partitioning {
                 Self::Hash(exprs, *size)
             }
             Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size),
+            Partitioning::Custom(custom) => {
+                Self::UnknownPartitioning(custom.partition_count())
+            }
         }
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 2db328377a5e1..3ee71d5d181ec 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::{fmt::Display, hash::Hash, sync::Arc};
 use tokio::sync::watch;
 
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, expressions::lit};
 use arrow::datatypes::{DataType, Schema};
 use datafusion_common::{
     Result,
@@ -94,6 +94,11 @@ pub struct Inner {
     /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
     pub generation: u64,
     pub expr: Arc<dyn PhysicalExpr>,
+    /// Per-partition filter expressions for partition-index routing.
+    /// When set (via [`DynamicFilterPhysicalExpr::update_partitioned`]), each
+    /// entry corresponds to a build partition. `Some(expr)` means the partition
+    /// has a filter. `None` means the partition was empty.
+    pub partitioned_exprs: Option<Vec<Option<Arc<dyn PhysicalExpr>>>>,
     /// Flag for quick synchronous check if filter is complete.
     /// This is redundant with the watch channel state, but allows us to return immediately
     /// from `wait_complete()` without subscribing if already complete.
@@ -126,6 +131,7 @@ impl Inner {
             // This is not currently used anywhere but it seems useful to have this simple distinction.
             generation: 1,
             expr,
+            partitioned_exprs: None,
             is_complete: false,
         }
     }
@@ -278,6 +284,7 @@ impl DynamicFilterPhysicalExpr {
             expression_id: current.expression_id,
             generation: new_generation,
             expr: new_expr,
+            partitioned_exprs: None,
             is_complete: current.is_complete,
         };
         drop(current); // Release the lock before broadcasting
@@ -289,6 +296,83 @@ impl DynamicFilterPhysicalExpr {
         Ok(())
     }
 
+    /// Update this filter with a global fallback plus partition-local filters.
+    pub fn update_partitioned(
+        &self,
+        global_expr: Arc<dyn PhysicalExpr>,
+        partitioned_exprs: Vec<Option<Arc<dyn PhysicalExpr>>>,
+    ) -> Result<()> {
+        let global_expr = Self::remap_children(
+            &self.children,
+            self.remapped_children.as_ref(),
+            global_expr,
+        )?;
+        let partitioned_exprs = partitioned_exprs
+            .into_iter()
+            .map(|expr| {
+                expr.map(|expr| {
+                    Self::remap_children(
+                        &self.children,
+                        self.remapped_children.as_ref(),
+                        expr,
+                    )
+                })
+                .transpose()
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let mut current = self.inner.write();
+        let new_generation = current.generation + 1;
+        *current = Inner {
+            expression_id: current.expression_id,
+            generation: new_generation,
+            expr: global_expr,
+            partitioned_exprs: Some(partitioned_exprs),
+            is_complete: current.is_complete,
+        };
+        drop(current);
+
+        let _ = self.state_watch.send(FilterState::InProgress {
+            generation: new_generation,
+        });
+        Ok(())
+    }
+
+    /// Returns the filter expression for a specific partition, with children
+    /// remapped to match any prior [`PhysicalExpr::with_new_children`] calls.
+    ///
+    /// Returns `None` if no per-partition data has been stored. Returns
+    /// `Some(lit(false))` if the partition was empty.
+    pub fn partition_filter(
+        &self,
+        partition: usize,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        let partitioned_expr = self
+            .inner
+            .read()
+            .partitioned_exprs
+            .as_ref()
+            .and_then(|partitioned_exprs| partitioned_exprs.get(partition))
+            .cloned();
+
+        partitioned_expr
+            .map(|expr| {
+                expr.map_or_else(
+                    || Ok(Some(lit(false))),
+                    |expr| {
+                        Self::remap_children(
+                            &self.children,
+                            self.remapped_children.as_ref(),
+                            expr,
+                        )
+                        .map(Some)
+                    },
+                )
+            })
+            .transpose()
+            .map(Option::flatten)
+    }
+
     /// Mark this dynamic filter as complete and broadcast to all waiters.
     ///
     /// This signals that all expected updates have been received.
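// A minimal usage sketch of `update_partitioned` / `partition_filter` added
// above (illustrative; the filter values are placeholders, not real join
// predicates).
fn partitioned_filter_round_trip() -> Result<()> {
    let filter = DynamicFilterPhysicalExpr::new(vec![], lit(true));

    // Build side finished: partition 0 produced a filter, partition 1 was empty.
    filter.update_partitioned(lit(true), vec![Some(lit(true)), None])?;

    assert!(filter.partition_filter(0)?.is_some()); // partition-local filter
    assert!(filter.partition_filter(1)?.is_some()); // empty partition => lit(false)
    assert!(filter.partition_filter(2)?.is_none()); // no entry for this partition
    Ok(())
}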
@@ -700,6 +784,64 @@ mod test {
         assert_eq!(snapshot, Some(new_expr));
     }
 
+    #[test]
+    fn test_partition_filter() {
+        let dynamic_filter =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
+
+        assert!(dynamic_filter.partition_filter(0).unwrap().is_none());
+
+        dynamic_filter
+            .update_partitioned(lit(true), vec![Some(lit(true)), None])
+            .unwrap();
+
+        let partition_0 = dynamic_filter.partition_filter(0).unwrap().unwrap();
+        insta::assert_snapshot!(format!("{partition_0:?}"), @r#"Literal { value: Boolean(true), field: Field { name: "lit", data_type: Boolean } }"#);
+
+        let partition_1 = dynamic_filter.partition_filter(1).unwrap().unwrap();
+        insta::assert_snapshot!(format!("{partition_1:?}"), @r#"Literal { value: Boolean(false), field: Field { name: "lit", data_type: Boolean } }"#);
+
+        assert!(dynamic_filter.partition_filter(2).unwrap().is_none());
+    }
+
+    #[test]
+    fn test_partition_filter_remaps_children() {
+        let source_schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let col_a = col("a", &source_schema).unwrap();
+        let filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![Arc::clone(&col_a)],
+            lit(true) as Arc<dyn PhysicalExpr>,
+        ));
+
+        filter
+            .update_partitioned(
+                lit(true),
+                vec![Some(Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_a),
+                    datafusion_expr::Operator::Gt,
+                    lit(10) as Arc<dyn PhysicalExpr>,
+                )))],
+            )
+            .unwrap();
+
+        let reassigned_schema = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+        let reassigned = reassign_expr_columns(
+            Arc::clone(&filter) as Arc<dyn PhysicalExpr>,
+            &reassigned_schema,
+        )
+        .unwrap();
+        let reassigned = reassigned
+            .downcast_ref::<DynamicFilterPhysicalExpr>()
+            .expect("Expected dynamic filter after reassignment");
+
+        let partition_filter = reassigned.partition_filter(0).unwrap().unwrap();
+        insta::assert_snapshot!(format!("{partition_filter:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Gt, right: Literal { value: Int32(10), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#);
+    }
+
     #[test]
     fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
         let dynamic_filter =
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 848bf81d15979..33d91ff0cc01f 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -58,7 +58,10 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze};
 pub use equivalence::{
     AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union,
 };
-pub use partitioning::{Distribution, Partitioning};
+pub use partitioning::{
+    Distribution, PartitionRange, Partitioning, PartitioningCompatibility,
+    PhysicalPartitioning, RangeBound, RangePartitioning,
+};
 pub use physical_expr::{
     add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
     create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs
index d24c60b63e6bd..bfabe7a8f3334 100644
--- a/datafusion/physical-expr/src/partitioning.rs
+++ b/datafusion/physical-expr/src/partitioning.rs
@@ -21,7 +21,9 @@ use crate::{
     EquivalenceProperties, PhysicalExpr, equivalence::ProjectionMapping,
     expressions::UnKnownColumn, physical_exprs_equal,
 };
+use datafusion_common::{Result, ScalarValue, plan_err};
 use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
+use std::any::Any;
 use std::fmt;
 use std::fmt::Display;
 use std::sync::Arc;
@@ -117,10 +119,50 @@ pub enum Partitioning {
     /// Allocate rows based on a hash of one or more expressions and the specified number of
     /// partitions
     Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
+    /// Custom partitioning scheme backed by a trait object.
+    Custom(Arc<dyn PhysicalPartitioning>),
     /// Unknown partitioning scheme with a known number of partitions
     UnknownPartitioning(usize),
 }
 
+/// Extensible physical partitioning contract.
+///
+/// This is a POC shape for partitioning schemes that are not built into the
+/// [`Partitioning`] enum. It intentionally mirrors the operations DataFusion
+/// already asks of partitioning metadata.
+pub trait PhysicalPartitioning: fmt::Debug + Display + Send + Sync {
+    /// Used for display and downcasting-friendly diagnostics.
+    fn name(&self) -> &str;
+
+    /// Number of output partitions.
+    fn partition_count(&self) -> usize;
+
+    /// Returns how this partitioning satisfies a required [`Distribution`].
+    fn satisfaction(
+        &self,
+        required: &Distribution,
+        eq_properties: &EquivalenceProperties,
+        allow_subset: bool,
+    ) -> PartitioningSatisfaction;
+
+    /// Returns whether this partitioning describes the same logical partition
+    /// map as `other`.
+    fn compatibility(
+        &self,
+        other: &dyn PhysicalPartitioning,
+    ) -> PartitioningCompatibility;
+
+    /// Calculate the output partitioning after applying a projection.
+    fn project(
+        &self,
+        mapping: &ProjectionMapping,
+        input_eq_properties: &EquivalenceProperties,
+    ) -> Arc<dyn PhysicalPartitioning>;
+
+    /// Downcast hook for implementations that understand each other.
+    fn as_any(&self) -> &dyn Any;
+}
+
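// Illustrative sketch of consuming the trait object through the new enum
// variant; `required` and `eq` are assumed to come from the caller, and the
// function itself is hypothetical, not part of the patch.
fn describe_custom(
    partitioning: &Partitioning,
    required: &Distribution,
    eq: &EquivalenceProperties,
) {
    if let Partitioning::Custom(custom) = partitioning {
        println!(
            "{} partitioning over {} partitions satisfies requirement: {:?}",
            custom.name(),
            custom.partition_count(),
            custom.satisfaction(required, eq, /* allow_subset */ false)
        );
    }
}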
 impl Display for Partitioning {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
@@ -133,6 +175,7 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Custom(custom) => write!(f, "{custom}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
@@ -140,6 +183,319 @@ impl Display for Partitioning {
     }
 }
 
+/// Ordered range partitioning for one or more expressions.
+///
+/// Each [`PartitionRange`] describes the value range for one output partition.
+/// Ranges are interpreted lexicographically across [`Self::exprs`]. This type
+/// records the partitioning contract; callers are responsible for constructing
+/// non-overlapping ranges that accurately describe the source.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+    exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ranges: Vec<PartitionRange>,
+}
+
+impl RangePartitioning {
+    /// Create a new [`RangePartitioning`].
+    ///
+    /// Each bound must have the same arity as `exprs`.
+    pub fn try_new(
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
+        ranges: Vec<PartitionRange>,
+    ) -> Result<Self> {
+        if exprs.is_empty() {
+            return plan_err!("RangePartitioning requires at least one expression");
+        }
+        if ranges.is_empty() {
+            return plan_err!("RangePartitioning requires at least one range");
+        }
+
+        for range in &ranges {
+            range.validate(exprs.len())?;
+        }
+
+        Ok(Self { exprs, ranges })
+    }
+
+    /// Expressions whose values determine the partition range.
+    pub fn exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.exprs
+    }
+
+    /// Per-partition ranges, in partition index order.
+    pub fn ranges(&self) -> &[PartitionRange] {
+        &self.ranges
+    }
+
+    /// Number of range partitions.
+    pub fn partition_count(&self) -> usize {
+        self.ranges.len()
+    }
+
+    /// Returns how this range partitioning satisfies a hash distribution
+    /// requirement.
+    ///
+    /// A range partitioning satisfies the requirement when all equal values for
+    /// the required expressions are colocated in one partition. The routing is
+    /// range-based rather than hash-based, but the distribution property is the
+    /// same property hash joins and grouped aggregations require.
+    pub fn hash_distribution_satisfaction(
+        &self,
+        required_exprs: &[Arc<dyn PhysicalExpr>],
+        eq_properties: &EquivalenceProperties,
+        allow_subset: bool,
+    ) -> PartitioningSatisfaction {
+        exprs_satisfy_distribution(
+            &self.exprs,
+            required_exprs,
+            eq_properties,
+            allow_subset,
+        )
+    }
+
+    /// Returns whether this range partitioning has the same partition map as
+    /// another range partitioning.
+    pub fn compatibility_with_range(&self, other: &Self) -> PartitioningCompatibility {
+        if !physical_exprs_equal(&self.exprs, &other.exprs) {
+            return PartitioningCompatibility::Incompatible;
+        }
+
+        if self.ranges == other.ranges {
+            PartitioningCompatibility::SamePartitionMap
+        } else {
+            PartitioningCompatibility::SameExpressionsDifferentBounds
+        }
+    }
+
+    fn project_with_mapping(
+        &self,
+        mapping: &ProjectionMapping,
+        input_eq_properties: &EquivalenceProperties,
+    ) -> Self {
+        let exprs = project_partition_exprs(&self.exprs, mapping, input_eq_properties);
+        Self {
+            exprs,
+            ranges: self.ranges.clone(),
+        }
+    }
+}
+
+impl PartialEq for RangePartitioning {
+    fn eq(&self, other: &Self) -> bool {
+        physical_exprs_equal(&self.exprs, &other.exprs) && self.ranges == other.ranges
+    }
+}
+
+impl Display for RangePartitioning {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let exprs = self
+            .exprs
+            .iter()
+            .map(|e| format!("{e}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        let ranges = self
+            .ranges
+            .iter()
+            .map(|r| format!("{r}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(f, "Range([{exprs}], [{ranges}])")
+    }
+}
+
+impl PhysicalPartitioning for RangePartitioning {
+    fn name(&self) -> &str {
+        "range"
+    }
+
+    fn partition_count(&self) -> usize {
+        self.ranges.len()
+    }
+
+    fn satisfaction(
+        &self,
+        required: &Distribution,
+        eq_properties: &EquivalenceProperties,
+        allow_subset: bool,
+    ) -> PartitioningSatisfaction {
+        match required {
+            Distribution::UnspecifiedDistribution => PartitioningSatisfaction::Exact,
+            Distribution::SinglePartition if self.partition_count() == 1 => {
+                PartitioningSatisfaction::Exact
+            }
+            Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
+                PartitioningSatisfaction::Exact
+            }
+            Distribution::HashPartitioned(required_exprs) => {
+                RangePartitioning::hash_distribution_satisfaction(
+                    self,
+                    required_exprs,
+                    eq_properties,
+                    allow_subset,
+                )
+            }
+            _ => PartitioningSatisfaction::NotSatisfied,
+        }
+    }
+
+    fn compatibility(
+        &self,
+        other: &dyn PhysicalPartitioning,
+    ) -> PartitioningCompatibility {
+        other
+            .as_any()
+            .downcast_ref::<RangePartitioning>()
+            .map(|other| self.compatibility_with_range(other))
+            .unwrap_or(PartitioningCompatibility::Incompatible)
+    }
+
+    fn project(
+        &self,
+        mapping: &ProjectionMapping,
+        input_eq_properties: &EquivalenceProperties,
+    ) -> Arc<dyn PhysicalPartitioning> {
+        Arc::new(RangePartitioning::project_with_mapping(
+            self,
+            mapping,
+            input_eq_properties,
+        ))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+/// A single partition's lexicographic value range.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PartitionRange {
+    lower: Option<RangeBound>,
+    upper: Option<RangeBound>,
+}
+
+impl PartitionRange {
+    /// Create a range with optional lower and upper bounds.
+    pub fn new(lower: Option<RangeBound>, upper: Option<RangeBound>) -> Self {
+        Self { lower, upper }
+    }
+
+    /// Create an unbounded range.
+    pub fn unbounded() -> Self {
+        Self {
+            lower: None,
+            upper: None,
+        }
+    }
+
+    /// Lower bound, if any.
+    pub fn lower(&self) -> Option<&RangeBound> {
+        self.lower.as_ref()
+    }
+
+    /// Upper bound, if any.
+    pub fn upper(&self) -> Option<&RangeBound> {
+        self.upper.as_ref()
+    }
+
+    fn validate(&self, arity: usize) -> Result<()> {
+        if let Some(lower) = &self.lower {
+            lower.validate(arity)?;
+        }
+        if let Some(upper) = &self.upper {
+            upper.validate(arity)?;
+        }
+        Ok(())
+    }
+}
+
+impl Display for PartitionRange {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match (&self.lower, &self.upper) {
+            (Some(lower), Some(upper)) => write!(f, "{lower}..{upper}"),
+            (Some(lower), None) => write!(f, "{lower}.."),
+            (None, Some(upper)) => write!(f, "..{upper}"),
+            (None, None) => write!(f, ".."),
+        }
+    }
+}
+
+/// A lexicographic range bound.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RangeBound {
+    values: Vec<ScalarValue>,
+    inclusive: bool,
+}
+
+impl RangeBound {
+    /// Create an inclusive bound.
+    pub fn inclusive(values: Vec<ScalarValue>) -> Self {
+        Self {
+            values,
+            inclusive: true,
+        }
+    }
+
+    /// Create an exclusive bound.
+    pub fn exclusive(values: Vec<ScalarValue>) -> Self {
+        Self {
+            values,
+            inclusive: false,
+        }
+    }
+
+    /// Bound values, one per partition expression.
+    pub fn values(&self) -> &[ScalarValue] {
+        &self.values
+    }
+
+    /// Whether this bound is inclusive.
+    pub fn is_inclusive(&self) -> bool {
+        self.inclusive
+    }
+
+    fn validate(&self, arity: usize) -> Result<()> {
+        if self.values.len() != arity {
+            return plan_err!(
+                "Range bound arity mismatch: expected {arity}, got {}",
+                self.values.len()
+            );
+        }
+        Ok(())
+    }
+}
+
+impl Display for RangeBound {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let values = self
+            .values
+            .iter()
+            .map(|v| format!("{v}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        let marker = if self.inclusive {
+            "inclusive"
+        } else {
+            "exclusive"
+        };
+        write!(f, "{marker}({values})")
+    }
+}
+
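// Illustrative sketch of the lexicographic interpretation documented above:
// with exprs = [year, month], this range covers
// (2024, 01) <= (year, month) < (2024, 03), i.e. January and February 2024;
// (2023, 12) and (2024, 03) fall outside it. The function is hypothetical.
fn jan_feb_2024_range() -> PartitionRange {
    PartitionRange::new(
        Some(RangeBound::inclusive(vec![
            ScalarValue::Int64(Some(2024)),
            ScalarValue::Int64(Some(1)),
        ])),
        Some(RangeBound::exclusive(vec![
            ScalarValue::Int64(Some(2024)),
            ScalarValue::Int64(Some(3)),
        ])),
    )
}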
+/// Describes whether two partitioning schemes define the same logical
+/// partition map.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PartitioningCompatibility {
+    /// Partition `P` in both inputs describes the same logical key domain.
+    SamePartitionMap,
+    /// The same expressions are partitioned, but partition boundaries differ.
+    SameExpressionsDifferentBounds,
+    /// The partitioning schemes are known not to be compatible.
+    Incompatible,
+    /// The compatibility cannot be proven from the available metadata.
+    Unknown,
+}
+
 /// Represents how a [`Partitioning`] satisfies a [`Distribution`] requirement.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PartitioningSatisfaction {
@@ -162,33 +518,41 @@ impl PartitioningSatisfaction {
 }
 
 impl Partitioning {
+    /// Create a [`Partitioning`] from a custom partitioning implementation.
+    pub fn custom(partitioning: Arc<dyn PhysicalPartitioning>) -> Self {
+        Self::Custom(partitioning)
+    }
+
+    /// Create a [`Partitioning`] from range partitioning metadata.
+    pub fn range(range: RangePartitioning) -> Self {
+        Self::Custom(Arc::new(range))
+    }
+
+    /// Returns the custom partitioning implementation, if this partitioning is
+    /// backed by one.
+    pub fn as_custom(&self) -> Option<&Arc<dyn PhysicalPartitioning>> {
+        match self {
+            Self::Custom(custom) => Some(custom),
+            _ => None,
+        }
+    }
+
+    /// Returns range partitioning metadata when this custom partitioning is a
+    /// [`RangePartitioning`].
+    pub fn as_range(&self) -> Option<&RangePartitioning> {
+        self.as_custom()
+            .and_then(|custom| custom.as_any().downcast_ref::<RangePartitioning>())
+    }
+
     /// Returns the number of partitions in this partitioning scheme
     pub fn partition_count(&self) -> usize {
         use Partitioning::*;
         match self {
             RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
+            Custom(custom) => custom.partition_count(),
         }
     }
 
-    /// Returns true if `subset_exprs` is a subset of `exprs`.
-    /// For example: Hash(a, b) is subset of Hash(a) since a partition with all occurrences of
-    /// a distinct (a) must also contain all occurrences of a distinct (a, b) with the same (a).
-    fn is_subset_partitioning(
-        subset_exprs: &[Arc<dyn PhysicalExpr>],
-        superset_exprs: &[Arc<dyn PhysicalExpr>],
-    ) -> bool {
-        // Require strict subset: fewer expressions, not equal
-        if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
-            return false;
-        }
-
-        subset_exprs.iter().all(|subset_expr| {
-            superset_exprs
-                .iter()
-                .any(|superset_expr| subset_expr.eq(superset_expr))
-        })
-    }
-
     #[deprecated(since = "52.0.0", note = "Use satisfaction instead")]
     pub fn satisfy(
         &self,
@@ -220,50 +584,14 @@ impl Partitioning {
             // Here we do not check the partition count for hash partitioning and assumes the partition count
             // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
             // then we need to have the partition count and hash functions validation.
-            Partitioning::Hash(partition_exprs, _) => {
-                // Empty hash partitioning is invalid
-                if partition_exprs.is_empty() || required_exprs.is_empty() {
-                    return PartitioningSatisfaction::NotSatisfied;
-                }
-
-                // Fast path: exact match
-                if physical_exprs_equal(required_exprs, partition_exprs) {
-                    return PartitioningSatisfaction::Exact;
-                }
-
-                // Normalization path using equivalence groups
-                let eq_groups = eq_properties.eq_group();
-                if !eq_groups.is_empty() {
-                    let normalized_required_exprs = required_exprs
-                        .iter()
-                        .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
-                        .collect::<Vec<_>>();
-                    let normalized_partition_exprs = partition_exprs
-                        .iter()
-                        .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
-                        .collect::<Vec<_>>();
-                    if physical_exprs_equal(
-                        &normalized_required_exprs,
-                        &normalized_partition_exprs,
-                    ) {
-                        return PartitioningSatisfaction::Exact;
-                    }
-
-                    if allow_subset
-                        && Self::is_subset_partitioning(
-                            &normalized_partition_exprs,
-                            &normalized_required_exprs,
-                        )
-                    {
-                        return PartitioningSatisfaction::Subset;
-                    }
-                } else if allow_subset
-                    && Self::is_subset_partitioning(partition_exprs, required_exprs)
-                {
-                    return PartitioningSatisfaction::Subset;
-                }
-
-                PartitioningSatisfaction::NotSatisfied
+            Partitioning::Hash(partition_exprs, _) => exprs_satisfy_distribution(
+                partition_exprs,
+                required_exprs,
+                eq_properties,
+                allow_subset,
+            ),
+            Partitioning::Custom(custom) => {
+                custom.satisfaction(required, eq_properties, allow_subset)
             }
             _ => PartitioningSatisfaction::NotSatisfied,
         },
@@ -277,21 +605,116 @@ impl Partitioning {
         mapping: &ProjectionMapping,
         input_eq_properties: &EquivalenceProperties,
     ) -> Self {
-        if let Partitioning::Hash(exprs, part) = self {
-            let normalized_exprs = input_eq_properties
-                .project_expressions(exprs, mapping)
-                .zip(exprs)
-                .map(|(proj_expr, expr)| {
-                    proj_expr.unwrap_or_else(|| {
-                        Arc::new(UnKnownColumn::new(&expr.to_string()))
-                    })
-                })
-                .collect();
-            Partitioning::Hash(normalized_exprs, *part)
-        } else {
-            self.clone()
+        match self {
+            Partitioning::Hash(exprs, part) => Partitioning::Hash(
+                project_partition_exprs(exprs, mapping, input_eq_properties),
+                *part,
+            ),
+            Partitioning::Custom(custom) => {
+                Partitioning::Custom(custom.project(mapping, input_eq_properties))
+            }
+            _ => self.clone(),
+        }
+    }
+
+    /// Returns whether two partitioning schemes describe the same logical
+    /// partition map.
+    pub fn compatibility(&self, other: &Self) -> PartitioningCompatibility {
+        match (self, other) {
+            (
+                Partitioning::Hash(left_exprs, left_count),
+                Partitioning::Hash(right_exprs, right_count),
+            ) if left_count == right_count
+                && physical_exprs_equal(left_exprs, right_exprs) =>
+            {
+                PartitioningCompatibility::SamePartitionMap
+            }
+            (Partitioning::Custom(left), Partitioning::Custom(right)) => {
+                left.compatibility(right.as_ref())
+            }
+            (Partitioning::UnknownPartitioning(_), _)
+            | (_, Partitioning::UnknownPartitioning(_)) => {
+                PartitioningCompatibility::Unknown
+            }
+            _ => PartitioningCompatibility::Incompatible,
+        }
+    }
+}
+
+fn exprs_satisfy_distribution(
+    partition_exprs: &[Arc<dyn PhysicalExpr>],
+    required_exprs: &[Arc<dyn PhysicalExpr>],
+    eq_properties: &EquivalenceProperties,
+    allow_subset: bool,
+) -> PartitioningSatisfaction {
+    if partition_exprs.is_empty() || required_exprs.is_empty() {
+        return PartitioningSatisfaction::NotSatisfied;
+    }
+
+    if physical_exprs_equal(required_exprs, partition_exprs) {
+        return PartitioningSatisfaction::Exact;
+    }
+
+    let eq_groups = eq_properties.eq_group();
+    if !eq_groups.is_empty() {
+        let normalized_required_exprs = required_exprs
+            .iter()
+            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
+            .collect::<Vec<_>>();
+        let normalized_partition_exprs = partition_exprs
+            .iter()
+            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
+            .collect::<Vec<_>>();
+        if physical_exprs_equal(&normalized_required_exprs, &normalized_partition_exprs) {
+            return PartitioningSatisfaction::Exact;
+        }
+
+        if allow_subset
+            && is_subset_partitioning(
+                &normalized_partition_exprs,
+                &normalized_required_exprs,
+            )
+        {
+            return PartitioningSatisfaction::Subset;
         }
+    } else if allow_subset && is_subset_partitioning(partition_exprs, required_exprs) {
+        return PartitioningSatisfaction::Subset;
     }
+
+    PartitioningSatisfaction::NotSatisfied
+}
+
+/// Returns true if `subset_exprs` is a subset of `exprs`.
+/// For example: Hash(a, b) is subset of Hash(a) since a partition with all occurrences of
+/// a distinct (a) must also contain all occurrences of a distinct (a, b) with the same (a).
+fn is_subset_partitioning(
+    subset_exprs: &[Arc<dyn PhysicalExpr>],
+    superset_exprs: &[Arc<dyn PhysicalExpr>],
+) -> bool {
+    // Require strict subset: fewer expressions, not equal
+    if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
+        return false;
+    }
+
+    subset_exprs.iter().all(|subset_expr| {
+        superset_exprs
+            .iter()
+            .any(|superset_expr| subset_expr.eq(superset_expr))
+    })
+}
+
+fn project_partition_exprs(
+    exprs: &[Arc<dyn PhysicalExpr>],
+    mapping: &ProjectionMapping,
+    input_eq_properties: &EquivalenceProperties,
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    input_eq_properties
+        .project_expressions(exprs, mapping)
+        .zip(exprs)
+        .map(|(proj_expr, expr)| {
+            proj_expr.unwrap_or_else(|| Arc::new(UnKnownColumn::new(&expr.to_string())))
+        })
+        .collect()
 }
 
 impl PartialEq for Partitioning {
@@ -306,6 +729,12 @@ impl PartialEq for Partitioning {
             {
                 true
             }
+            (Partitioning::Custom(left), Partitioning::Custom(right))
+                if left.compatibility(right.as_ref())
+                    == PartitioningCompatibility::SamePartitionMap =>
+            {
+                true
+            }
             _ => false,
         }
     }
@@ -425,6 +854,225 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_partitioning_compatibility() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+
+        let hash_a_4 = Partitioning::Hash(vec![Arc::clone(&col_a)], 4);
+        let hash_a_8 = Partitioning::Hash(vec![Arc::clone(&col_a)], 8);
+        let hash_b_4 = Partitioning::Hash(vec![Arc::clone(&col_b)], 4);
+
+        let test_cases = vec![
+            (
+                hash_a_4.clone(),
+                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+                PartitioningCompatibility::SamePartitionMap,
+            ),
+            (
+                hash_a_4.clone(),
+                hash_a_8,
+                PartitioningCompatibility::Incompatible,
+            ),
+            (
+                hash_a_4.clone(),
+                hash_b_4,
+                PartitioningCompatibility::Incompatible,
+            ),
+            (
+                Partitioning::RoundRobinBatch(4),
+                Partitioning::RoundRobinBatch(4),
+                PartitioningCompatibility::Incompatible,
+            ),
+            (
+                hash_a_4,
+                Partitioning::UnknownPartitioning(4),
+                PartitioningCompatibility::Unknown,
+            ),
+        ];
+
+        for (left, right, expected) in test_cases {
+            assert_eq!(left.compatibility(&right), expected);
+            assert_eq!(right.compatibility(&left), expected);
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn range_partitioning_satisfies_hash_distribution() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+        let range = Partitioning::range(RangePartitioning::try_new(
+            vec![Arc::clone(&col_a)],
+            vec![
+                PartitionRange::new(
+                    None,
+                    Some(RangeBound::exclusive(vec![ScalarValue::Int64(Some(10))])),
+                ),
+                PartitionRange::new(
+                    Some(RangeBound::inclusive(vec![ScalarValue::Int64(Some(10))])),
+                    None,
+                ),
+            ],
+        )?);
+
+        assert_eq!(
+            range.satisfaction(
+                &Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
+                &eq_properties,
+                false,
+            ),
+            PartitioningSatisfaction::Exact
+        );
+        assert_eq!(
+            range.satisfaction(
+                &Distribution::HashPartitioned(vec![col_a, col_b]),
+                &eq_properties,
+                true,
+            ),
+            PartitioningSatisfaction::Subset
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn range_partitioning_validates_bound_arity() -> Result<()> {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+
+        let err = RangePartitioning::try_new(
+            vec![col_a],
+            vec![PartitionRange::new(
+                Some(RangeBound::inclusive(vec![
+                    ScalarValue::Int64(Some(1)),
+                    ScalarValue::Int64(Some(2)),
+                ])),
+                None,
+            )],
+        )
+        .unwrap_err();
+
+        assert!(
+            err.to_string().contains("Range bound arity mismatch"),
+            "{err}"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn range_partitioning_projects_partition_exprs() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let range = Partitioning::range(RangePartitioning::try_new(
+            vec![Arc::clone(&col_b)],
+            vec![PartitionRange::unbounded()],
+        )?);
+        let projection = ProjectionMapping::try_new(
+            vec![(Arc::clone(&col_b), "b".to_string())],
+            &schema,
+        )?;
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+        let projected = range.project(&projection, &eq_properties);
+        let expected = Partitioning::range(RangePartitioning::try_new(
+            vec![Arc::new(Column::new("b", 0))],
+            vec![PartitionRange::unbounded()],
+        )?);
+
+        assert_eq!(projected, expected);
+        assert_ne!(projected, range);
+        assert!(!projected.to_string().contains("a@0"));
+
+        Ok(())
+    }
+
+    #[test]
+    fn range_partitioning_reports_compatibility() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let split_10 = vec![
+            PartitionRange::new(
+                None,
+                Some(RangeBound::exclusive(vec![ScalarValue::Int64(Some(10))])),
+            ),
+            PartitionRange::new(
+                Some(RangeBound::inclusive(vec![ScalarValue::Int64(Some(10))])),
+                None,
+            ),
+        ];
+        let split_20 = vec![
+            PartitionRange::new(
+                None,
+                Some(RangeBound::exclusive(vec![ScalarValue::Int64(Some(20))])),
+            ),
+            PartitionRange::new(
+                Some(RangeBound::inclusive(vec![ScalarValue::Int64(Some(20))])),
+                None,
+            ),
+        ];
+
+        let range_a_10 =
+            RangePartitioning::try_new(vec![Arc::clone(&col_a)], split_10.clone())?;
+        let range_a_10_again =
+            RangePartitioning::try_new(vec![Arc::clone(&col_a)], split_10)?;
+        let range_a_20 = RangePartitioning::try_new(vec![Arc::clone(&col_a)], split_20)?;
+        let range_b_10 =
+            RangePartitioning::try_new(vec![col_b], vec![PartitionRange::unbounded()])?;
+
+        assert_eq!(
+            range_a_10.compatibility_with_range(&range_a_10_again),
+            PartitioningCompatibility::SamePartitionMap,
+        );
+        assert_eq!(
+            range_a_10.compatibility_with_range(&range_a_20),
+            PartitioningCompatibility::SameExpressionsDifferentBounds,
+        );
+        assert_eq!(
+            range_a_10.compatibility_with_range(&range_b_10),
+            PartitioningCompatibility::Incompatible,
+        );
+        assert_eq!(
+            Partitioning::range(range_a_10)
+                .compatibility(&Partitioning::UnknownPartitioning(2)),
+            PartitioningCompatibility::Unknown,
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn test_partitioning_satisfy_by_subset() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index c522867c05196..7835b2936767d 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -696,9 +696,27 @@ fn reorder_current_join_keys(
             result => result,
         }
     }
+        (Some(left), _) if left.as_range().is_some() => {
+            let left_range = left.as_range().expect("checked above");
+            match try_reorder(join_keys, left_range.exprs(), left_equivalence_properties)
+            {
+                (join_keys, None) => reorder_current_join_keys(
+                    join_keys,
+                    None,
+                    right_partition,
+                    left_equivalence_properties,
+                    right_equivalence_properties,
+                ),
+                result => result,
+            }
+        }
         (_, Some(Partitioning::Hash(right_exprs, _))) => {
             try_reorder(join_keys, right_exprs, right_equivalence_properties)
         }
+        (_, Some(right)) if right.as_range().is_some() => {
+            let right_range = right.as_range().expect("checked above");
+            try_reorder(join_keys, right_range.exprs(), right_equivalence_properties)
+        }
         _ => (join_keys, None),
     }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 4ebbf7cb31ccf..24eea9682764f 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -55,7 +55,7 @@ use crate::repartition::REPARTITION_RANDOM_STATE;
 use crate::spill::get_record_batch_memory_size;
 use crate::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream, Statistics,
+    PartitioningCompatibility, PlanProperties, SendableRecordBatchStream, Statistics,
     common::can_project,
     joins::utils::{
         BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
@@ -764,11 +764,25 @@ pub struct HashJoinExec {
 struct HashJoinExecDynamicFilter {
     /// Dynamic filter that we'll update with the results of the build side once that is done.
     filter: Arc<DynamicFilterPhysicalExpr>,
+    /// How partitioned build-side filters should be routed on the probe side.
+    routing_mode: DynamicFilterRoutingMode,
     /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
     /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
     build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
 }
 
+/// Routing strategy for partitioned dynamic filters.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum DynamicFilterRoutingMode {
+    /// Route by hash-case expression, used when rows are hash repartitioned.
+    CaseHash,
+    /// Route by partition index. Used when both sides satisfy the join's
+    /// distribution without repartitioning and share the same partition map.
+    PartitionIndex,
+    /// Use one global filter that safely covers all build partitions.
+    Global,
+}
+
 impl fmt::Debug for HashJoinExec {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("HashJoinExec")
@@ -842,23 +856,32 @@ impl HashJoinExec {
     fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
         let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
 
-        if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown {
-            return false;
-        }
+        probe_preserved && config.optimizer.enable_join_dynamic_filter_pushdown
+    }
 
-        // `preserve_file_partitions` can report Hash partitioning for Hive-style
-        // file groups, but those partitions are not actually hash-distributed.
-        // Partitioned dynamic filters rely on hash routing, so disable them in
-        // this mode to avoid incorrect results. Follow-up work: enable dynamic
-        // filtering for preserve_file_partitioned scans (issue #20195).
-        // https://github.com/apache/datafusion/issues/20195
-        if config.optimizer.preserve_file_partitions > 0
-            && self.mode == PartitionMode::Partitioned
-        {
-            return false;
+    fn dynamic_filter_routing_mode(&self) -> DynamicFilterRoutingMode {
+        if self.mode != PartitionMode::Partitioned {
+            return DynamicFilterRoutingMode::Global;
         }
 
-        true
+        let left_partitioning = self.left.output_partitioning();
+        let right_partitioning = self.right.output_partitioning();
+        let compatibility = left_partitioning.compatibility(right_partitioning);
+
+        if matches!(
+            (left_partitioning, right_partitioning),
+            (Partitioning::Hash(_, _), Partitioning::Hash(_, _))
+        ) {
+            if compatibility == PartitioningCompatibility::SamePartitionMap {
+                DynamicFilterRoutingMode::CaseHash
+            } else {
+                DynamicFilterRoutingMode::Global
+            }
+        } else if compatibility == PartitioningCompatibility::SamePartitionMap {
+            DynamicFilterRoutingMode::PartitionIndex
+        } else {
+            DynamicFilterRoutingMode::Global
+        }
     }
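// Summary of the mode selection above (illustrative):
//
//   left / right partitioning       compatibility         routing mode
//   Hash(e, n)  / Hash(e, n)        SamePartitionMap  =>  CaseHash
//   Hash(..)    / Hash(..)          anything else     =>  Global
//   Custom      / Custom (range)    SamePartitionMap  =>  PartitionIndex
//   other combinations              -                 =>  Global
//
// CaseHash re-hashes each probe row to select a CASE branch, while
// PartitionIndex needs no hashing: probe partition `i` reads exactly the
// filter that build partition `i` produced.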
 
     /// left (build) side which gets hashed
@@ -1336,6 +1359,7 @@ impl ExecutionPlan for HashJoinExec {
                             filter,
                             on_right,
                             repartition_random_state,
+                            df.routing_mode,
                         ))
                     })))
                 })
@@ -1676,6 +1700,7 @@ impl ExecutionPlan for HashJoinExec {
             .builder()
             .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
                 filter: dynamic_filter,
+                routing_mode: self.dynamic_filter_routing_mode(),
                 build_accumulator: OnceLock::new(),
             }))
             .build_exec()?;
@@ -2375,8 +2400,10 @@ mod tests {
             NullEquality::NullEqualsNothing,
             false,
         )?;
+        let routing_mode = join.dynamic_filter_routing_mode();
         join.dynamic_filter = Some(HashJoinExecDynamicFilter {
             filter: Arc::clone(&dynamic_filter),
+            routing_mode,
             build_accumulator: OnceLock::new(),
         });
 
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 23ca14f5ba406..166e1934e78e3 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -25,7 +25,7 @@ use crate::ExecutionPlan;
 use crate::ExecutionPlanProperties;
 use crate::joins::Map;
 use crate::joins::PartitionMode;
-use crate::joins::hash_join::exec::HASH_JOIN_SEED;
+use crate::joins::hash_join::exec::{DynamicFilterRoutingMode, HASH_JOIN_SEED};
 use crate::joins::hash_join::inlist_builder::build_struct_fields;
 use crate::joins::hash_join::partitioned_hash_eval::{
     HashExpr, HashTableLookupExpr, SeededRandomState,
@@ -252,6 +252,8 @@ pub(crate) struct SharedBuildAccumulator {
     /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds)
     /// Used for PartitionedHashLookupPhysicalExpr
     repartition_random_state: SeededRandomState,
+    /// How partitioned build-side filters should be routed on the probe side.
+    routing_mode: DynamicFilterRoutingMode,
     /// Schema of the probe (right) side for evaluating filter expressions
     probe_schema: Arc<Schema>,
 }
@@ -357,6 +359,7 @@ impl SharedBuildAccumulator {
         dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
         on_right: Vec<PhysicalExprRef>,
         repartition_random_state: SeededRandomState,
+        routing_mode: DynamicFilterRoutingMode,
     ) -> Self {
         // Troubleshooting: If partition counts are incorrect, verify this logic matches
         // the actual execution pattern in collect_build_side()
@@ -402,6 +405,7 @@ impl SharedBuildAccumulator {
             dynamic_filter,
             on_right,
             repartition_random_state,
+            routing_mode,
             probe_schema: right_child.schema(),
         }
     }
@@ -592,104 +596,172 @@ impl SharedBuildAccumulator {
                     );
                 }
             },
-            FinalizeInput::Partitioned(partitions) => {
-                let num_partitions = partitions.len();
-                let routing_hash_expr = Arc::new(HashExpr::new(
-                    self.on_right.clone(),
-                    self.repartition_random_state.clone(),
-                    "hash_repartition".to_string(),
-                )) as Arc<dyn PhysicalExpr>;
-
-                let modulo_expr = Arc::new(BinaryExpr::new(
-                    routing_hash_expr,
-                    Operator::Modulo,
-                    lit(ScalarValue::UInt64(Some(num_partitions as u64))),
-                )) as Arc<dyn PhysicalExpr>;
-
-                let mut real_branches = Vec::new();
-                let mut empty_partition_ids = Vec::new();
-                let mut has_canceled_unknown = false;
-
-                for (partition_id, partition) in partitions.iter().enumerate() {
-                    match partition {
-                        PartitionStatus::Reported(partition)
-                            if matches!(partition.pushdown, PushdownStrategy::Empty) =>
-                        {
-                            empty_partition_ids.push(partition_id);
-                        }
-                        PartitionStatus::Reported(partition) => {
-                            let membership_expr = create_membership_predicate(
-                                &self.on_right,
-                                partition.pushdown.clone(),
-                                &HASH_JOIN_SEED,
-                                self.probe_schema.as_ref(),
-                            )?;
-                            let bounds_expr = create_bounds_predicate(
-                                &self.on_right,
-                                &partition.bounds,
-                            );
-                            let then_expr = combine_membership_and_bounds(
-                                membership_expr,
-                                bounds_expr,
-                            )
-                            .unwrap_or_else(|| lit(true));
-                            real_branches.push((
-                                lit(ScalarValue::UInt64(Some(partition_id as u64))),
-                                then_expr,
-                            ));
-                        }
-                        PartitionStatus::CanceledUnknown => {
-                            has_canceled_unknown = true;
-                        }
-                        PartitionStatus::Pending => {
-                            return datafusion_common::internal_err!(
-                                "attempted to finalize dynamic filter with pending partition"
-                            );
-                        }
-                    }
+            FinalizeInput::Partitioned(partitions) => match self.routing_mode {
+                DynamicFilterRoutingMode::CaseHash => {
+                    self.update_case_hash_filter(&partitions)?
+                }
+                DynamicFilterRoutingMode::PartitionIndex => {
+                    self.update_partition_index_filter(&partitions)?
+                }
+                DynamicFilterRoutingMode::Global => {
+                    self.update_global_partitioned_filter(&partitions)?
                 }
+            },
+        }
 
-                let filter_expr = if has_canceled_unknown {
-                    let mut when_then_branches = empty_partition_ids
-                        .into_iter()
-                        .map(|partition_id| {
-                            (
-                                lit(ScalarValue::UInt64(Some(partition_id as u64))),
-                                lit(false),
-                            )
-                        })
-                        .collect::<Vec<_>>();
-                    when_then_branches.extend(real_branches);
-
-                    if when_then_branches.is_empty() {
-                        lit(true)
-                    } else {
-                        Arc::new(CaseExpr::try_new(
-                            Some(modulo_expr),
-                            when_then_branches,
-                            Some(lit(true)),
as Arc - } - } else if real_branches.is_empty() { - lit(false) - } else if real_branches.len() == 1 - && empty_partition_ids.len() + 1 == num_partitions + Ok(()) + } + + fn update_case_hash_filter(&self, partitions: &[PartitionStatus]) -> Result<()> { + let num_partitions = partitions.len(); + let routing_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + self.repartition_random_state.clone(), + "hash_repartition".to_string(), + )) as Arc; + + let modulo_expr = Arc::new(BinaryExpr::new( + routing_hash_expr, + Operator::Modulo, + lit(ScalarValue::UInt64(Some(num_partitions as u64))), + )) as Arc; + + let mut real_branches = Vec::new(); + let mut empty_partition_ids = Vec::new(); + let mut has_canceled_unknown = false; + + for (partition_id, partition) in partitions.iter().enumerate() { + match partition { + PartitionStatus::Reported(partition) + if matches!(partition.pushdown, PushdownStrategy::Empty) => { - Arc::clone(&real_branches[0].1) - } else { - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - real_branches, - Some(lit(false)), - )?) as Arc - }; - - self.dynamic_filter.update(filter_expr)?; + empty_partition_ids.push(partition_id); + } + PartitionStatus::Reported(partition) => { + let then_expr = self + .build_reported_partition_filter(partition)? + .unwrap_or_else(|| lit(true)); + real_branches.push(( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + then_expr, + )); + } + PartitionStatus::CanceledUnknown => { + has_canceled_unknown = true; + } + PartitionStatus::Pending => { + return datafusion_common::internal_err!( + "attempted to finalize dynamic filter with pending partition" + ); + } } } - Ok(()) + let filter_expr = if has_canceled_unknown { + let mut when_then_branches = empty_partition_ids + .into_iter() + .map(|partition_id| { + ( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + lit(false), + ) + }) + .collect::>(); + when_then_branches.extend(real_branches); + + if when_then_branches.is_empty() { + lit(true) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(true)), + )?) as Arc + } + } else if real_branches.is_empty() { + lit(false) + } else if real_branches.len() == 1 + && empty_partition_ids.len() + 1 == num_partitions + { + Arc::clone(&real_branches[0].1) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + real_branches, + Some(lit(false)), + )?) 
as Arc<dyn PhysicalExpr>
+        };
+
+        self.dynamic_filter.update(filter_expr)
+    }
+
+    fn update_partition_index_filter(
+        &self,
+        partitions: &[PartitionStatus],
+    ) -> Result<()> {
+        let partition_filters = self.partition_filters(partitions)?;
+        let global_filter = combine_filters_with_or(partition_filters.iter().flatten())
+            .unwrap_or_else(|| lit(false));
+
+        self.dynamic_filter
+            .update_partitioned(global_filter, partition_filters)
+    }
+
+    fn update_global_partitioned_filter(
+        &self,
+        partitions: &[PartitionStatus],
+    ) -> Result<()> {
+        let partition_filters = self.partition_filters(partitions)?;
+        let global_filter = combine_filters_with_or(partition_filters.iter().flatten())
+            .unwrap_or_else(|| lit(false));
+
+        self.dynamic_filter.update(global_filter)
     }
+
+    fn partition_filters(
+        &self,
+        partitions: &[PartitionStatus],
+    ) -> Result<Vec<Option<Arc<dyn PhysicalExpr>>>> {
+        partitions
+            .iter()
+            .map(|partition| match partition {
+                PartitionStatus::Reported(partition)
+                    if matches!(partition.pushdown, PushdownStrategy::Empty) =>
+                {
+                    Ok(None)
+                }
+                PartitionStatus::Reported(partition) => {
+                    self.build_reported_partition_filter(partition)
+                }
+                PartitionStatus::CanceledUnknown => Ok(Some(lit(true))),
+                PartitionStatus::Pending => datafusion_common::internal_err!(
+                    "attempted to finalize dynamic filter with pending partition"
+                ),
+            })
+            .collect()
+    }
+
+    fn build_reported_partition_filter(
+        &self,
+        partition: &PartitionData,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        let membership_expr = create_membership_predicate(
+            &self.on_right,
+            partition.pushdown.clone(),
+            &HASH_JOIN_SEED,
+            self.probe_schema.as_ref(),
+        )?;
+        let bounds_expr = create_bounds_predicate(&self.on_right, &partition.bounds);
+        Ok(combine_membership_and_bounds(membership_expr, bounds_expr))
+    }
+}
+
+fn combine_filters_with_or<'a>(
+    filters: impl Iterator<Item = &'a Arc<dyn PhysicalExpr>>,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    filters.cloned().reduce(|left, right| {
+        Arc::new(BinaryExpr::new(left, Operator::Or, right)) as Arc<dyn PhysicalExpr>
+    })
 }
 
 impl fmt::Debug for SharedBuildAccumulator {
@@ -721,6 +793,7 @@ mod tests {
             dynamic_filter,
             on_right: vec![],
             repartition_random_state: SeededRandomState::with_seed(1),
+            routing_mode: DynamicFilterRoutingMode::CaseHash,
             probe_schema,
         }
     }
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 90cab7246d71c..3f0d30766d89c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -33,7 +33,8 @@ use crate::metrics::{
 };
 use crate::projection::{ProjectionExec, ProjectionExpr};
 use crate::{
-    ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
+    ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+    RangePartitioning, Statistics,
 };
 // compatibility
 pub use super::join_filter::JoinFilter;
@@ -144,6 +145,18 @@ pub fn adjust_right_output_partitioning(
                 .collect::<Result<Vec<_>>>()?;
             Partitioning::Hash(new_exprs, *size)
         }
+        _ if right_partitioning.as_range().is_some() => {
+            let range = right_partitioning.as_range().expect("checked above");
+            let new_exprs = range
+                .exprs()
+                .iter()
+                .map(|expr| add_offset_to_expr(Arc::clone(expr), left_columns_len as _))
+                .collect::<Result<Vec<_>>>()?;
+            Partitioning::range(RangePartitioning::try_new(
+                new_exprs,
+                range.ranges().to_vec(),
+            )?)
+ } result => result.clone(), }; Ok(result) diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 3005e975424b4..4aa0f10c902cb 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -37,7 +37,8 @@ pub use datafusion_expr::{Accumulator, ColumnarValue}; use datafusion_physical_expr::PhysicalSortExpr; pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ - Distribution, Partitioning, PhysicalExpr, expressions, + Distribution, PartitionRange, Partitioning, PartitioningCompatibility, PhysicalExpr, + PhysicalPartitioning, RangeBound, RangePartitioning, expressions, }; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b4af6e2c09a5c..738ae834faf6f 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -53,7 +53,7 @@ use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; use datafusion_common::{ ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, - internal_datafusion_err, internal_err, + internal_datafusion_err, internal_err, plan_err, }; use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; @@ -1318,6 +1318,7 @@ impl ExecutionPlan for RepartitionExec { new_properties.partitioning = match new_properties.partitioning { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), + Custom(custom) => Custom(custom), UnknownPartitioning(_) => UnknownPartitioning(target_partitions), }; Ok(Some(Arc::new(Self { @@ -1338,6 +1339,10 @@ impl RepartitionExec { input: Arc, partitioning: Partitioning, ) -> Result { + if matches!(partitioning, Partitioning::Custom(_)) { + return plan_err!("RepartitionExec does not support custom repartitioning"); + } + let preserve_order = false; let cache = Self::compute_properties(&input, partitioning, preserve_order); Ok(RepartitionExec { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 13c28ccb10991..10ddede726640 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -1505,11 +1505,7 @@ mod tests { let task_ctx = Arc::new(TaskContext::default()); let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]); let properties = CongestedExec::compute_properties(Arc::new(schema.clone())); - let &partition_count = match properties.output_partitioning() { - Partitioning::RoundRobinBatch(partitions) => partitions, - Partitioning::Hash(_, partitions) => partitions, - Partitioning::UnknownPartitioning(partitions) => partitions, - }; + let partition_count = properties.output_partitioning().partition_count(); let source = CongestedExec { schema: schema.clone(), cache: Arc::new(properties), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 511e8eb1b012e..53a184f8408a2 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1372,6 +1372,21 @@ message PhysicalHashRepartition { uint64 partition_count = 2; } +message PhysicalRangePartitioning { + repeated PhysicalExprNode range_expr = 1; + repeated PhysicalPartitionRange ranges = 2; +} + 
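The `PhysicalRangePartitioning` message above, together with the `PhysicalPartitionRange` and `PhysicalRangeBound` messages that follow, encodes each partition as an optional lower and upper bound, where a bound carries one value per range expression plus an inclusive flag. A minimal, std-only sketch of those semantics (the `Bound`/`Range` types here are illustrative stand-ins, not DataFusion's `RangePartitioning`):

```rust
#[derive(Debug, Clone)]
struct Bound {
    /// One value per range expression; DataFusion carries `ScalarValue`s here.
    values: Vec<i64>,
    inclusive: bool,
}

#[derive(Debug, Clone)]
struct Range {
    lower: Option<Bound>, // None = unbounded below
    upper: Option<Bound>, // None = unbounded above
}

impl Range {
    /// True if the key tuple falls inside this range, comparing the
    /// per-expression values lexicographically.
    fn contains(&self, key: &[i64]) -> bool {
        let above_lower = self.lower.as_ref().map_or(true, |b| {
            if b.inclusive {
                key >= b.values.as_slice()
            } else {
                key > b.values.as_slice()
            }
        });
        let below_upper = self.upper.as_ref().map_or(true, |b| {
            if b.inclusive {
                key <= b.values.as_slice()
            } else {
                key < b.values.as_slice()
            }
        });
        above_lower && below_upper
    }
}

fn main() {
    // The same two-partition split the roundtrip test further below uses:
    // p0 = (-inf, 10) and p1 = [10, +inf).
    let p0 = Range {
        lower: None,
        upper: Some(Bound { values: vec![10], inclusive: false }),
    };
    let p1 = Range {
        lower: Some(Bound { values: vec![10], inclusive: true }),
        upper: None,
    };
    assert!(p0.contains(&[9]) && !p0.contains(&[10]));
    assert!(p1.contains(&[10]) && !p1.contains(&[9]));
}
```

Making both bounds optional keeps the first and last partitions open-ended, so the full key space is covered without sentinel values on the wire.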
+message PhysicalPartitionRange { + optional PhysicalRangeBound lower = 1; + optional PhysicalRangeBound upper = 2; +} + +message PhysicalRangeBound { + repeated datafusion_common.ScalarValue values = 1; + bool inclusive = 2; +} + message RepartitionExecNode{ PhysicalPlanNode input = 1; // oneof partition_method { @@ -1388,6 +1403,7 @@ message Partitioning { uint64 round_robin = 1; PhysicalHashRepartition hash = 2; uint64 unknown = 3; + PhysicalRangePartitioning range = 4; } } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index c05d3283eac8e..a4f2cc6011d4c 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -15677,6 +15677,9 @@ impl serde::Serialize for Partitioning { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("unknown", ToString::to_string(&v).as_str())?; } + partitioning::PartitionMethod::Range(v) => { + struct_ser.serialize_field("range", v)?; + } } } struct_ser.end() @@ -15693,6 +15696,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { "roundRobin", "hash", "unknown", + "range", ]; #[allow(clippy::enum_variant_names)] @@ -15700,6 +15704,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { RoundRobin, Hash, Unknown, + Range, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -15724,6 +15729,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { "roundRobin" | "round_robin" => Ok(GeneratedField::RoundRobin), "hash" => Ok(GeneratedField::Hash), "unknown" => Ok(GeneratedField::Unknown), + "range" => Ok(GeneratedField::Range), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -15765,6 +15771,13 @@ impl<'de> serde::Deserialize<'de> for Partitioning { } partition_method__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| partitioning::PartitionMethod::Unknown(x.0)); } + GeneratedField::Range => { + if partition_method__.is_some() { + return Err(serde::de::Error::duplicate_field("range")); + } + partition_method__ = map_.next_value::<::std::option::Option<_>>()?.map(partitioning::PartitionMethod::Range) +; + } } } Ok(Partitioning { @@ -18361,6 +18374,114 @@ impl<'de> serde::Deserialize<'de> for PhysicalNot { deserializer.deserialize_struct("datafusion.PhysicalNot", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalPartitionRange { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.lower.is_some() { + len += 1; + } + if self.upper.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalPartitionRange", len)?; + if let Some(v) = self.lower.as_ref() { + struct_ser.serialize_field("lower", v)?; + } + if let Some(v) = self.upper.as_ref() { + struct_ser.serialize_field("upper", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalPartitionRange { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "lower", + "upper", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Lower, + Upper, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl 
serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "lower" => Ok(GeneratedField::Lower), + "upper" => Ok(GeneratedField::Upper), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalPartitionRange; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalPartitionRange") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut lower__ = None; + let mut upper__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Lower => { + if lower__.is_some() { + return Err(serde::de::Error::duplicate_field("lower")); + } + lower__ = map_.next_value()?; + } + GeneratedField::Upper => { + if upper__.is_some() { + return Err(serde::de::Error::duplicate_field("upper")); + } + upper__ = map_.next_value()?; + } + } + } + Ok(PhysicalPartitionRange { + lower: lower__, + upper: upper__, + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalPartitionRange", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalPlanNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -18960,6 +19081,223 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { deserializer.deserialize_struct("datafusion.PhysicalPlanNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalRangeBound { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.values.is_empty() { + len += 1; + } + if self.inclusive { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalRangeBound", len)?; + if !self.values.is_empty() { + struct_ser.serialize_field("values", &self.values)?; + } + if self.inclusive { + struct_ser.serialize_field("inclusive", &self.inclusive)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalRangeBound { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "values", + "inclusive", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Values, + Inclusive, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "values" => Ok(GeneratedField::Values), + "inclusive" => Ok(GeneratedField::Inclusive), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + 
deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalRangeBound; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalRangeBound") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut values__ = None; + let mut inclusive__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Values => { + if values__.is_some() { + return Err(serde::de::Error::duplicate_field("values")); + } + values__ = Some(map_.next_value()?); + } + GeneratedField::Inclusive => { + if inclusive__.is_some() { + return Err(serde::de::Error::duplicate_field("inclusive")); + } + inclusive__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalRangeBound { + values: values__.unwrap_or_default(), + inclusive: inclusive__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalRangeBound", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for PhysicalRangePartitioning { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.range_expr.is_empty() { + len += 1; + } + if !self.ranges.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalRangePartitioning", len)?; + if !self.range_expr.is_empty() { + struct_ser.serialize_field("rangeExpr", &self.range_expr)?; + } + if !self.ranges.is_empty() { + struct_ser.serialize_field("ranges", &self.ranges)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalRangePartitioning { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "range_expr", + "rangeExpr", + "ranges", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + RangeExpr, + Ranges, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "rangeExpr" | "range_expr" => Ok(GeneratedField::RangeExpr), + "ranges" => Ok(GeneratedField::Ranges), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalRangePartitioning; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalRangePartitioning") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut range_expr__ = None; + let mut ranges__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::RangeExpr => { + if range_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("rangeExpr")); + } + range_expr__ = Some(map_.next_value()?); + } + GeneratedField::Ranges => { + if ranges__.is_some() { + return Err(serde::de::Error::duplicate_field("ranges")); + } + ranges__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalRangePartitioning { + range_expr: range_expr__.unwrap_or_default(), + ranges: ranges__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalRangePartitioning", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalScalarSubqueryExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index af9b1404bb80a..f3f2ef986f4d0 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2029,6 +2029,27 @@ pub struct PhysicalHashRepartition { pub partition_count: u64, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalRangePartitioning { + #[prost(message, repeated, tag = "1")] + pub range_expr: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub ranges: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalPartitionRange { + #[prost(message, optional, tag = "1")] + pub lower: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub upper: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalRangeBound { + #[prost(message, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "2")] + pub inclusive: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct RepartitionExecNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, @@ -2044,7 +2065,7 @@ pub struct RepartitionExecNode { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Partitioning { - #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3")] + #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3, 4")] pub partition_method: ::core::option::Option, } /// Nested message and enum types in `Partitioning`. 
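The generated prost structs above model unbounded range sides as `Option`s, and the hand-written converters in `from_proto.rs`/`to_proto.rs` below handle them with the `.map(parse).transpose()?` idiom. A distilled, std-only sketch of that pattern; `WireBound`, `WireRange`, and this `RangeBound` are assumed stand-ins, not the generated types:

```rust
#[derive(Debug)]
struct WireBound {
    values: Vec<i64>,
    inclusive: bool,
}

#[derive(Debug)]
struct WireRange {
    lower: Option<WireBound>,
    upper: Option<WireBound>,
}

#[derive(Debug, PartialEq)]
enum RangeBound {
    Inclusive(Vec<i64>),
    Exclusive(Vec<i64>),
}

fn parse_bound(bound: &WireBound) -> Result<RangeBound, String> {
    if bound.values.is_empty() {
        return Err("range bound must carry at least one value".to_string());
    }
    Ok(if bound.inclusive {
        RangeBound::Inclusive(bound.values.clone())
    } else {
        RangeBound::Exclusive(bound.values.clone())
    })
}

fn parse_range(
    range: &WireRange,
) -> Result<(Option<RangeBound>, Option<RangeBound>), String> {
    // Option<Result<..>> -> Result<Option<..>>: `transpose` keeps an
    // unbounded side as None while still propagating errors from a
    // present-but-invalid bound.
    let lower = range.lower.as_ref().map(parse_bound).transpose()?;
    let upper = range.upper.as_ref().map(parse_bound).transpose()?;
    Ok((lower, upper))
}

fn main() {
    let wire = WireRange {
        lower: None,
        upper: Some(WireBound { values: vec![10], inclusive: false }),
    };
    let (lower, upper) = parse_range(&wire).unwrap();
    assert_eq!(lower, None);
    assert_eq!(upper, Some(RangeBound::Exclusive(vec![10])));
}
```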
@@ -2057,6 +2078,8 @@ pub mod partitioning {
         Hash(super::PhysicalHashRepartition),
         #[prost(uint64, tag = "3")]
         Unknown(u64),
+        #[prost(message, tag = "4")]
+        Range(super::PhysicalRangePartitioning),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 43ebf0474320a..f1570b04a86ba 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -48,7 +48,9 @@ use datafusion_physical_plan::expressions::{
 };
 use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
 use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field};
-use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
+use datafusion_physical_plan::{
+    PartitionRange, Partitioning, PhysicalExpr, RangeBound, RangePartitioning, WindowExpr,
+};
 use datafusion_proto_common::common::proto_error;
 use object_store::ObjectMeta;
 use object_store::path::Path;
@@ -556,6 +558,7 @@ pub fn parse_physical_expr_with_converter(
                 expression_id,
                 generation: dynamic_filter.generation,
                 expr: inner_expr,
+                partitioned_exprs: None,
                 is_complete: dynamic_filter.is_complete,
             },
         ));
@@ -632,6 +635,15 @@ pub fn parse_protobuf_partitioning(
                 proto_converter,
             )
         }
+        Some(protobuf::partitioning::PartitionMethod::Range(range)) => {
+            parse_protobuf_range_partitioning(
+                range,
+                ctx,
+                input_schema,
+                proto_converter,
+            )
+            .map(Some)
+        }
         Some(protobuf::partitioning::PartitionMethod::Unknown(partition_count)) => {
             Ok(Some(Partitioning::UnknownPartitioning(
                 *partition_count as usize,
@@ -643,6 +655,48 @@
         }
     }
 }
 
+fn parse_protobuf_range_partitioning(
+    range: &protobuf::PhysicalRangePartitioning,
+    ctx: &PhysicalPlanDecodeContext<'_>,
+    input_schema: &Schema,
+    proto_converter: &dyn PhysicalProtoConverterExtension,
+) -> Result<Partitioning> {
+    let exprs =
+        parse_physical_exprs(&range.range_expr, ctx, input_schema, proto_converter)?;
+    let ranges = range
+        .ranges
+        .iter()
+        .map(parse_partition_range)
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(Partitioning::range(RangePartitioning::try_new(
+        exprs, ranges,
+    )?))
+}
+
+fn parse_partition_range(
+    range: &protobuf::PhysicalPartitionRange,
+) -> Result<PartitionRange> {
+    Ok(PartitionRange::new(
+        range.lower.as_ref().map(parse_range_bound).transpose()?,
+        range.upper.as_ref().map(parse_range_bound).transpose()?,
+    ))
+}
+
+fn parse_range_bound(bound: &protobuf::PhysicalRangeBound) -> Result<RangeBound> {
+    let values = bound
+        .values
+        .iter()
+        .map(TryInto::try_into)
+        .collect::<Result<Vec<_>, _>>()?;
+
+    if bound.inclusive {
+        Ok(RangeBound::inclusive(values))
+    } else {
+        Ok(RangeBound::exclusive(values))
+    }
+}
+
 pub fn parse_protobuf_file_scan_schema(
     proto: &protobuf::FileScanExecConf,
 ) -> Result<Arc<Schema>> {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 83c11cfc6b299..6cd6c35ef3802 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -43,7 +43,9 @@ use datafusion_physical_plan::expressions::{
 use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
 use datafusion_physical_plan::udaf::AggregateFunctionExpr;
 use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
-use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
+use datafusion_physical_plan::{
+    PartitionRange, Partitioning, PhysicalExpr, RangeBound, RangePartitioning, WindowExpr,
+};
 use
super::{ DefaultPhysicalProtoConverter, PhysicalExtensionCodec, @@ -615,6 +617,19 @@ pub fn serialize_partitioning( )), } } + Partitioning::Custom(custom) => { + let Some(range) = custom.as_any().downcast_ref::() else { + return not_impl_err!( + "Serializing custom partitioning '{}' is not supported", + custom.name() + ); + }; + protobuf::Partitioning { + partition_method: Some(protobuf::partitioning::PartitionMethod::Range( + serialize_range_partitioning(range, codec, proto_converter)?, + )), + } + } Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning { partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown( *partition_count as u64, @@ -624,6 +639,41 @@ pub fn serialize_partitioning( Ok(serialized_partitioning) } +fn serialize_range_partitioning( + range: &RangePartitioning, + codec: &dyn PhysicalExtensionCodec, + proto_converter: &dyn PhysicalProtoConverterExtension, +) -> Result { + Ok(protobuf::PhysicalRangePartitioning { + range_expr: serialize_physical_exprs(range.exprs(), codec, proto_converter)?, + ranges: range + .ranges() + .iter() + .map(serialize_partition_range) + .collect::>>()?, + }) +} + +fn serialize_partition_range( + range: &PartitionRange, +) -> Result { + Ok(protobuf::PhysicalPartitionRange { + lower: range.lower().map(serialize_range_bound).transpose()?, + upper: range.upper().map(serialize_range_bound).transpose()?, + }) +} + +fn serialize_range_bound(bound: &RangeBound) -> Result { + Ok(protobuf::PhysicalRangeBound { + values: bound + .values() + .iter() + .map(TryInto::try_into) + .collect::, _>>()?, + inclusive: bound.is_inclusive(), + }) +} + fn serialize_when_then_expr( when_expr: &Arc, then_expr: &Arc, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fa342ae9079d5..da0b62de64b33 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -85,7 +85,8 @@ use datafusion::physical_plan::windows::{ create_udwf_window_expr, }; use datafusion::physical_plan::{ - ExecutionPlan, InputOrderMode, Partitioning, PhysicalExpr, Statistics, displayable, + ExecutionPlan, InputOrderMode, PartitionRange, Partitioning, PhysicalExpr, + RangeBound, RangePartitioning, Statistics, displayable, }; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion::scalar::ScalarValue; @@ -118,7 +119,10 @@ use datafusion_proto::bytes::{ physical_plan_from_bytes_with_proto_converter, physical_plan_to_bytes_with_proto_converter, }; -use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter; +use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning; +use datafusion_proto::physical_plan::to_proto::{ + serialize_partitioning, serialize_physical_expr_with_converter, +}; use datafusion_proto::physical_plan::{ AsExecutionPlan, DeduplicatingProtoConverter, DefaultPhysicalExtensionCodec, DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalPlanDecodeContext, @@ -1798,6 +1802,41 @@ fn roundtrip_repartition_preserve_order() -> Result<()> { roundtrip_test(Arc::new(repartition)) } +#[test] +fn roundtrip_range_partitioning() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let range_partitioning = Partitioning::range(RangePartitioning::try_new( + vec![col("a", &schema)?], + vec![ + PartitionRange::new( + None, + Some(RangeBound::exclusive(vec![ScalarValue::Int64(Some(10))])), + ), + 
PartitionRange::new( + Some(RangeBound::inclusive(vec![ScalarValue::Int64(Some(10))])), + None, + ), + ], + )?); + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let task_ctx = ctx.task_ctx(); + let decode_ctx = PhysicalPlanDecodeContext::new(task_ctx.as_ref(), &codec); + + let proto = serialize_partitioning(&range_partitioning, &codec, &proto_converter)?; + let decoded = parse_protobuf_partitioning( + Some(&proto), + &decode_ctx, + &schema, + &proto_converter, + )? + .expect("partitioning"); + + assert_eq!(decoded, range_partitioning); + Ok(()) +} + #[test] fn roundtrip_interleave() -> Result<()> { let field_a = Field::new("col", DataType::Int64, false); diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 175d7d90cd8ed..cf454ac0b659c 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -124,6 +124,30 @@ STORED AS PARQUET; ---- 1 +# Create a matching partitioned dimension table with an empty C partition. +# This lets the runtime dynamic filter prove it can prune the C probe partition +# while still preserving the same three-partition file map on both sides. +query I +COPY (SELECT 'dev' as env, 'log' as service) +TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/d_dkey=A/data.parquet' +STORED AS PARQUET; +---- +1 + +query I +COPY (SELECT 'prod' as env, 'log' as service) +TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/d_dkey=B/data.parquet' +STORED AS PARQUET; +---- +1 + +query I +COPY (SELECT 'prod' as env, 'log' as service WHERE false) +TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/d_dkey=C/data.parquet' +STORED AS PARQUET; +---- +0 + # Create high-cardinality fact table (5 partitions > 3 target_partitions) # For testing partition merging with consistent hashing query I @@ -203,6 +227,13 @@ STORED AS PARQUET PARTITIONED BY (d_dkey STRING) LOCATION 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/'; +# Hive-partitioned dimension table with an empty C partition +statement ok +CREATE EXTERNAL TABLE dimension_table_partitioned_sparse (env STRING, service STRING) +STORED AS PARQUET +PARTITIONED BY (d_dkey STRING) +LOCATION 'test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/'; + # 'High'-cardinality fact table (5 partitions > 3 target_partitions) statement ok CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value DOUBLE) @@ -532,12 +563,20 @@ C 1 300 D 1 400 E 1 500 -# Now with optimization - verify plan shows SinglePartitioned mode and no RepartitionExec +# Now with optimization. Since multiple distinct partition values are merged +# into each file group, DataFusion cannot truthfully represent this as range +# partitioning and falls back to repartitioning. +# +# Future work could avoid this repartition either by: +# 1. extending range partitioning to support multiple disjoint ranges per partition +# (for example p0 = [A, A] union [D, D]), or +# 2. adding a separate custom partitioning implementation for disjoint value sets. 
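A std-only sketch of the limitation described above: once file groups are filled round-robin, a group's value set (for example {A, D}) is no longer contiguous, so the [min, max] span each group would have to advertise overlaps the other groups' spans. The checker below is illustrative, not DataFusion code:

```rust
// Verifies that per-group [min, max] spans are pairwise disjoint, which is
// what a single contiguous range per partition would require.
fn groups_form_disjoint_ranges(groups: &[Vec<&str>]) -> bool {
    let mut spans: Vec<(&str, &str)> = groups
        .iter()
        .filter(|g| !g.is_empty())
        .map(|g| {
            let min = g.iter().min().unwrap();
            let max = g.iter().max().unwrap();
            (*min, *max)
        })
        .collect();
    spans.sort();
    // Usable only if no span starts before the previous one ends.
    spans.windows(2).all(|w| w[0].1 < w[1].0)
}

fn main() {
    // Round-robin merge of 5 hive partitions into 3 file groups, as in the
    // plan below: group 0 = {A, D} spans A..D and swallows B and C.
    let round_robin = vec![vec!["A", "D"], vec!["B", "E"], vec!["C"]];
    assert!(!groups_form_disjoint_ranges(&round_robin));

    // A contiguous assignment would be representable as ranges.
    let contiguous = vec![vec!["A", "B"], vec!["C", "D"], vec!["E"]];
    assert!(groups_form_disjoint_ranges(&contiguous));
}
```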
statement ok set datafusion.optimizer.preserve_file_partitions = 1; -# Verify the plan uses SinglePartitioned mode with no RepartitionExec -# The 5 partitions are merged into 3 file groups using round-robin assignment +# Verify the plan repartitions after the partial aggregate. +# The 5 partitions are merged into 3 file groups using round-robin assignment, +# so each file group can contain non-contiguous partition values. query TT EXPLAIN SELECT f_dkey, count(*), sum(value) FROM high_cardinality_table GROUP BY f_dkey; ---- @@ -547,8 +586,10 @@ logical_plan 03)----TableScan: high_cardinality_table projection=[value, f_dkey] physical_plan 01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), sum(high_cardinality_table.value)@2 as sum(high_cardinality_table.value)] -02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] +03)----RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3 +04)------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), sum(high_cardinality_table.value)] +05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet # Verify results with optimization match results without optimization query TIR rowsort @@ -659,8 +700,8 @@ C prod 2017.6 # TEST 12: Partitioned Join with Matching Partition Counts - With Optimization # Both tables have 3 partitions matching target_partitions=3 # No RepartitionExec needed for join - partitions already satisfy the requirement -# Dynamic filter pushdown is disabled in this mode because preserve_file_partitions -# reports Hash partitioning for Hive-style file groups, which are not hash-routed. +# Dynamic filter pushdown is still safe because both sides advertise the same +# range partition map and the probe can route filters by file-group index. 
########## statement ok @@ -686,7 +727,7 @@ physical_plan 03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] 04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0] 05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet -06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ] query TTR rowsort SELECT f.f_dkey, d.env, sum(f.value) @@ -698,6 +739,45 @@ A dev 772.4 B prod 614.4 C prod 2017.6 +########## +# TEST 12b: Partitioned Join Dynamic Filter Prunes an Empty Build Partition +# dimension_table_partitioned_sparse has an empty C partition. The optimized +# partitioned join keeps the file partitioning and the runtime dynamic filter +# prunes the C fact partition from the probe side. +########## + +statement ok +set datafusion.explain.analyze_level = summary; + +query TTR rowsort +SELECT f.f_dkey, d.env, sum(f.value) +FROM fact_table f +INNER JOIN dimension_table_partitioned_sparse d ON f.f_dkey = d.d_dkey +GROUP BY f.f_dkey, d.env; +---- +A dev 772.4 +B prod 614.4 + +# The probe-side plan still has three fact file groups, but after execution the +# dynamic filter only allows A and B. The fact scan reports two file ranges, +# proving the C probe partition was removed before scanning. 
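Before reading the EXPLAIN ANALYZE below, here is a std-only sketch of the per-partition bookkeeping that produces this pruning. Strings stand in for physical filter expressions; this mirrors the shape of the `partition_filters` helper in shared_bounds.rs but is not that code:

```rust
// An empty build partition yields None so the matching probe partition can
// be pruned entirely; a canceled partition degrades to "true" (no pruning);
// the global fallback is the OR of whatever per-partition filters exist.
#[derive(Clone)]
enum PartitionStatus {
    Reported(Option<String>), // Some(filter) or None when the build side is empty
    CanceledUnknown,
}

fn partition_filters(partitions: &[PartitionStatus]) -> Vec<Option<String>> {
    partitions
        .iter()
        .map(|p| match p {
            PartitionStatus::Reported(filter) => filter.clone(),
            PartitionStatus::CanceledUnknown => Some("true".to_string()),
        })
        .collect()
}

fn combine_with_or(filters: &[Option<String>]) -> String {
    let present: Vec<&String> = filters.iter().flatten().collect();
    if present.is_empty() {
        // Every build partition was empty: nothing can match.
        "false".to_string()
    } else {
        present
            .iter()
            .map(|f| format!("({f})"))
            .collect::<Vec<_>>()
            .join(" OR ")
    }
}

fn main() {
    // Build side: A and B report filters, C is empty (as in TEST 12b).
    let partitions = vec![
        PartitionStatus::Reported(Some("f_dkey = 'A'".into())),
        PartitionStatus::Reported(Some("f_dkey = 'B'".into())),
        PartitionStatus::Reported(None),
    ];
    let per_partition = partition_filters(&partitions);
    assert_eq!(per_partition[2], None); // probe partition C reads nothing
    assert_eq!(
        combine_with_or(&per_partition),
        "(f_dkey = 'A') OR (f_dkey = 'B')"
    );
}
```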
+query TT +EXPLAIN ANALYZE SELECT f.f_dkey, d.env, sum(f.value) +FROM fact_table f +INNER JOIN dimension_table_partitioned_sparse d ON f.f_dkey = d.d_dkey +GROUP BY f.f_dkey, d.env; +---- +Plan with Metrics +01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)], metrics=[output_rows=2, elapsed_compute=, output_bytes=] +02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3, metrics=[output_rows=2, elapsed_compute=, output_bytes=] +03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, reduction_factor=14.29% (2/14)] +04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0], metrics=[output_rows=14, elapsed_compute=, output_bytes=, avg_fanout=100% (14/14), probe_hit_rate=100% (14/14)] +05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned_sparse/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet, metrics=[output_rows=2, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=2 total → 2 matched, row_groups_pruned_bloom_filter=2 total → 2 matched, page_index_pages_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio=] +06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ f_dkey@2 >= A AND f_dkey@2 <= A AND f_dkey@2 IN (SET) ([A]) OR f_dkey@2 >= B AND f_dkey@2 <= B AND f_dkey@2 IN (SET) ([B]) ], pruning_predicate=f_dkey_null_count@1 != row_count@2 AND f_dkey_max@0 >= A AND f_dkey_null_count@1 != row_count@2 AND f_dkey_min@3 <= A AND f_dkey_null_count@1 != row_count@2 AND f_dkey_min@3 <= A AND A <= f_dkey_max@0 OR f_dkey_null_count@1 != row_count@2 AND f_dkey_max@0 >= B AND f_dkey_null_count@1 != row_count@2 AND f_dkey_min@3 <= B AND f_dkey_null_count@1 != row_count@2 AND f_dkey_min@3 <= B AND B <= f_dkey_max@0, required_guarantees=[f_dkey in (A, B)], metrics=[output_rows=14, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=2 total → 2 matched, row_groups_pruned_statistics=2 total → 2 matched, row_groups_pruned_bloom_filter=2 total → 2 matched, page_index_pages_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio=] + +statement ok +reset datafusion.explain.analyze_level; + ########## # TEST 13: Partitioned Join where Number of File Groups is less than target_partitions # With preserve_file_partitions enabled, we should still avoid repartitioning @@ -781,5 +861,8 @@ DROP TABLE dimension_table; statement ok DROP TABLE 
dimension_table_partitioned; +statement ok +DROP TABLE dimension_table_partitioned_sparse; + statement ok DROP TABLE high_cardinality_table;
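Taken together, the patch picks one of three routing modes per join via the `dynamic_filter_routing_mode` helper added in hash_join/exec.rs above. A std-only sketch of that decision table; the enums here are simplified stand-ins, not the DataFusion definitions:

```rust
// Hash-partitioned sides with an identical partition map can route probe rows
// by re-hashing the join keys (CaseHash); non-hash sides with an identical
// partition map route by the probe partition's index; everything else falls
// back to a single global filter.
#[derive(Debug, PartialEq)]
enum RoutingMode {
    CaseHash,
    PartitionIndex,
    Global,
}

#[derive(Clone, Copy)]
enum Compatibility {
    SamePartitionMap,
    Incompatible,
}

#[derive(Clone, Copy)]
enum Part {
    Hash,
    Range,
    Unknown,
}

fn routing_mode(
    partitioned_join: bool, // false models CollectLeft-style joins
    left: Part,
    right: Part,
    compat: Compatibility,
) -> RoutingMode {
    if !partitioned_join {
        return RoutingMode::Global;
    }
    match (left, right, compat) {
        (Part::Hash, Part::Hash, Compatibility::SamePartitionMap) => RoutingMode::CaseHash,
        (Part::Hash, Part::Hash, _) => RoutingMode::Global,
        (_, _, Compatibility::SamePartitionMap) => RoutingMode::PartitionIndex,
        _ => RoutingMode::Global,
    }
}

fn main() {
    use Compatibility::*;
    use Part::*;
    // Hive-preserved range partitioning on both sides: route by partition index.
    assert_eq!(routing_mode(true, Range, Range, SamePartitionMap), RoutingMode::PartitionIndex);
    // Classic hash repartitioning on both sides: route by re-hashing the keys.
    assert_eq!(routing_mode(true, Hash, Hash, SamePartitionMap), RoutingMode::CaseHash);
    // CollectLeft joins or mismatched maps: one global filter.
    assert_eq!(routing_mode(false, Hash, Hash, SamePartitionMap), RoutingMode::Global);
    assert_eq!(routing_mode(true, Range, Unknown, Incompatible), RoutingMode::Global);
}
```

The PartitionIndex mode is what lets the parquet opener substitute the per-partition expression at file-open time instead of evaluating a CASE over a recomputed hash for every probe batch.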