Skip to content

Commit ac59538

Browse files
Dandandan and claude committed
Add per-partition hash tables for partial aggregation
Introduce PartitionAggState to support multiple internal hash tables in partial aggregation. When enabled via AggregateExec::with_num_agg_partitions(), input rows are hashed by group keys (using the same hash as RepartitionExec) and routed to separate smaller hash tables for better cache locality. Defaults to 1 partition (no behavior change). The optimizer can set higher values when a hash repartition follows the partial aggregate.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 40463ea commit ac59538

2 files changed

Lines changed: 534 additions & 185 deletions

File tree

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -660,6 +660,12 @@ pub struct AggregateExec {
660660
/// it remains `Some(..)` to enable dynamic filtering during aggregate execution;
661661
/// otherwise, it is cleared to `None`.
662662
dynamic_filter: Option<Arc<AggrDynFilter>>,
663+
664+
/// Number of internal hash table partitions for partial aggregation.
665+
/// When > 1, input rows are hashed by group keys and routed to separate
666+
/// smaller hash tables for better cache locality. Only used when mode is
667+
/// Partial and input is unordered. Defaults to 1 (single hash table).
668+
pub(crate) num_agg_partitions: usize,
663669
}
664670

665671
impl AggregateExec {
@@ -685,6 +691,7 @@ impl AggregateExec {
685691
schema: Arc::clone(&self.schema),
686692
input_schema: Arc::clone(&self.input_schema),
687693
dynamic_filter: self.dynamic_filter.clone(),
694+
num_agg_partitions: self.num_agg_partitions,
688695
}
689696
}
690697

@@ -705,6 +712,7 @@ impl AggregateExec {
705712
schema: Arc::clone(&self.schema),
706713
input_schema: Arc::clone(&self.input_schema),
707714
dynamic_filter: self.dynamic_filter.clone(),
715+
num_agg_partitions: self.num_agg_partitions,
708716
}
709717
}
710718

@@ -839,6 +847,7 @@ impl AggregateExec {
839847
input_order_mode,
840848
cache: Arc::new(cache),
841849
dynamic_filter: None,
850+
num_agg_partitions: 1,
842851
};
843852

844853
exec.init_dynamic_filter();
@@ -851,6 +860,12 @@ impl AggregateExec {
851860
&self.mode
852861
}
853862

863+
/// Set the number of internal hash table partitions for partial aggregation.
864+
pub fn with_num_agg_partitions(mut self, n: usize) -> Self {
865+
self.num_agg_partitions = n;
866+
self
867+
}
868+
854869
/// Set the limit options for this AggExec
855870
pub fn with_limit_options(mut self, limit_options: Option<LimitOptions>) -> Self {
856871
self.limit_options = limit_options;

0 commit comments

Comments
 (0)