[WIP] Explore extensible range partitioning for dynamic filters#22002
Conversation
| self.ranges.len() | ||
| } | ||
|
|
||
| /// Returns how this range partitioning satisfies a hash distribution |
There was a problem hiding this comment.
this is something we will have to be careful of and raises the question:
Should we allow different types of partitioning to satisfy each other? This may provide more optimizations like skipping repartitions, but can see this getting quite brittle
This is also why the idea of "compatibility" was introduced:
satisfaction- decides whether a partitioning is neededcompatibility- decides whether partition specific behavior is valid (like dynamic filter routing)
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
| } | ||
|
|
||
| /// Update this filter with a global fallback plus partition-local filters. | ||
| pub fn update_partitioned( |
| /// non-overlapping ranges that accurately describe the source. | ||
| #[derive(Debug, Clone)] | ||
| pub struct RangePartitioning { | ||
| exprs: Vec<Arc<dyn PhysicalExpr>>, |
There was a problem hiding this comment.
In generla I think is is important to define formally what range partitioning means -- specifically given a particular row what partition is it in
As you have this structured I think you could have more than one range expressions be true. For example
{
exprs: [a, b]
ranges: [[100-200], [100-200]]
}
For a row like (a,b) = (10,20) it would be in both ranges
I think it is more common (e.g. spark) to have something lke
- A single
Expr - A list of ranges
- Explicitly declare that the FIRST range that matches is the partition that the row is placed in
There was a problem hiding this comment.
Also, does it make sense to explicitly define that the ranges can't overlap and must cover the whole range?
|
|
||
| /// A lexicographic range bound. | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct RangeBound { |
There was a problem hiding this comment.
This looks pretty similar to Interval: https://docs.rs/datafusion/latest/datafusion/logical_expr/interval_arithmetic/struct.Interval.html
Which issue does this PR close?
Rationale for this change
The intention of this POC is to discuss API shape, design, and path forward. Not implementation details. The implementation detail was done by AI but API designed by myself.
This is a proof of concept for representing file/range partitioning instead of advertising it as hash partitioning. The goal is to show how partitioning can be made into an extendable/customizable part of DataFusion rather than the current enum with fixed variants. It also shows how compatibility can support partition-specific optimizations (in this case tighter dynamic filters as discussed in #21207).
What changes are included in this PR?
PhysicalPartitioning trait shape
PhysicalPartitioningis an extensible representation of partitioning. Each method has a responsibility to represent a partitioning in a general but correct sense:name: the namepartition_count: number of output partitionssatisfaction: whether this partitioning satisfies a requiredDistribution. This is used by distribution planning to decide whether repartitioning is neededcompatibility: whether two partitionings describe the same partition map. This is more restrictive thansatisfactionand used for partition-specific behavior (in this case routing dynamic filters by partition index)project: rewrites partitioning expressions through projection while preserving the same partition mapas_any: downcast implementations to compare partitioning typesAn important design note:
Thus, dynamic filters use compatibility before enabling partition-index routing to ensure each side can be trreated as the same key.
Are these changes tested?
Yes:
cargo test -p datafusion-physical-expr -p datafusion-datasource -p datafusion-datasource-parquet -p datafusion-physical-plan -p datafusion-physical-optimizer -p datafusion-proto-> existing and new tests passcargo test --test sqllogictests -- preserve_file_partitioning push_down_filter_regression repartition_scan listing_table_partitions aggregate group_by joins-> shows new end-to-end dynamic filter and file partitioning behaviorAre there any user-facing changes?
Yes. This POC is meant to spark conversation around the public API:
Partitioning::Custom,PhysicalPartitioning,RangePartitioning, range bounds, and partitioning compatibility.