diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b10761a5fe816..4e20470511a55 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -605,7 +605,7 @@ config_namespace! { /// will be removed after the migration is finished. /// /// See for details. - pub enable_migration_aggregate: bool, default = false + pub enable_migration_aggregate: bool, default = true /// Sets the compression codec used when spilling data to disk. /// diff --git a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs index a0934b976ea79..1c6285d793b88 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/metrics.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/metrics.rs @@ -19,6 +19,7 @@ use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time}; +#[derive(Clone)] pub(crate) struct GroupByMetrics { /// Time spent calculating the group IDs from the evaluated grouping columns. pub(crate) time_calculating_group_ids: Time, diff --git a/datafusion/physical-plan/src/aggregates/hash_aggregate.rs b/datafusion/physical-plan/src/aggregates/hash_aggregate.rs index 0c8593efd05bb..c83a465c2d50a 100644 --- a/datafusion/physical-plan/src/aggregates/hash_aggregate.rs +++ b/datafusion/physical-plan/src/aggregates/hash_aggregate.rs @@ -25,6 +25,7 @@ //! //! See issue for details: +use std::ops::ControlFlow; use std::sync::Arc; use std::task::{Context, Poll}; @@ -36,8 +37,11 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use futures::stream::{Stream, StreamExt}; use super::AggregateExec; -use super::hash_table::{AggregateHashTable, Final, Partial}; -use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput, SpillMetrics}; +use super::hash_table::{AggregateHashTable, Final, Partial, PartialSkip}; +use super::utils::SkipAggregationProbe; +use crate::metrics::{ + BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput, SpillMetrics, +}; use crate::stream::EmptyRecordBatchStream; use crate::{InputOrderMode, RecordBatchStream, SendableRecordBatchStream, metrics}; @@ -87,6 +91,17 @@ use crate::{InputOrderMode, RecordBatchStream, SendableRecordBatchStream, metric /// This operator does not guarantee an exact limit because a single batch can /// cross the threshold. The downstream limit operator enforces the exact result /// size. +/// +/// # Optimization: Partial Aggregation Skip +/// +/// Partial aggregation can be counterproductive for high-cardinality inputs, +/// where most rows create distinct groups. The stream probes the ratio of +/// accumulated groups to input rows while it is still aggregating. If the ratio +/// crosses the configured threshold and all aggregate accumulators can convert +/// raw inputs directly to partial state, the stream emits any already +/// accumulated groups, then switches to a skip state. In that state, each +/// remaining input batch is converted directly to partial aggregate state rows +/// without inserting the rows into the grouped hash table. pub(crate) struct PartialHashAggregateStream { /// Output schema: group columns followed by partial aggregate state columns. schema: SchemaRef, @@ -94,9 +109,6 @@ pub(crate) struct PartialHashAggregateStream { /// Input batches containing raw rows, not partial aggregate state. input: SendableRecordBatchStream, - /// Hash table state for this aggregate stream. - hash_table: AggregateHashTable, - /// Memory reservation for group keys and accumulators. reservation: MemoryReservation, @@ -106,11 +118,65 @@ pub(crate) struct PartialHashAggregateStream { /// Tracks partial aggregation row reduction, matching `GroupedHashAggregateStream`. reduction_factor: metrics::RatioMetrics, + /// Tracks whether partial aggregation should switch to direct state conversion. + skip_aggregation_probe: Option, + /// Optional soft limit on the number of groups to accumulate before output. /// /// Invariant: when this is `Some(..)`, the accumulators inside `hash_table` must /// be empty. See struct comments for details. group_values_soft_limit: Option, + + /// Tracks the high-level stream lifecycle. The hash table owns the lower-level + /// state for materializing and slicing output batches. + state: Option, +} + +/// States for partial hash aggregation processing. +enum PartialHashAggregateState { + ReadingInput { + hash_table: AggregateHashTable, + }, + ProducingOutput { + hash_table: AggregateHashTable, + /// If `None`, partial skip was never triggered and this state will + /// finish in `Done`. If `Some`, partial skip has triggered and the + /// stream will move to `SkippingAggregation` after these accumulated + /// groups are emitted. + skip_hash_table: Option>, + }, + SkippingAggregation { + hash_table: AggregateHashTable, + }, + Done, +} + +type PartialHashAggregatePoll = Poll>>; +type PartialHashAggregateStateTransition = ControlFlow< + (PartialHashAggregatePoll, PartialHashAggregateState), + PartialHashAggregateState, +>; + +impl PartialHashAggregateState { + fn hash_table(&self) -> &AggregateHashTable { + match self { + Self::ReadingInput { hash_table } + | Self::ProducingOutput { hash_table, .. } => hash_table, + Self::SkippingAggregation { .. } | Self::Done => { + unreachable!("state does not hold a partial hash table") + } + } + } + + fn hash_table_mut(&mut self) -> &mut AggregateHashTable { + match self { + Self::ReadingInput { hash_table } + | Self::ProducingOutput { hash_table, .. } => hash_table, + Self::SkippingAggregation { .. } | Self::Done => { + unreachable!("state does not hold a partial hash table") + } + } + } } /// Hash aggregation uses a 2-stage (partial and final) hash aggregation, this stream @@ -124,9 +190,6 @@ pub(crate) struct FinalHashAggregateStream { /// Input batches containing partial aggregate state rows. input: SendableRecordBatchStream, - /// Hash table state for this aggregate stream. - hash_table: AggregateHashTable, - /// Execution metrics shared with the aggregate plan node. baseline_metrics: BaselineMetrics, @@ -135,6 +198,67 @@ pub(crate) struct FinalHashAggregateStream { /// See comments for the same variable in [`PartialHashAggregateStream`] group_values_soft_limit: Option, + + /// Tracks the high-level stream lifecycle. The hash table owns the lower-level + /// state for materializing and slicing output batches. + state: Option, +} + +/// States for final hash aggregation processing. +// Typestate pattern is used, in case the inner logic become more complex in the future. +enum FinalHashAggregateState { + ReadingInput { + hash_table: AggregateHashTable, + }, + ProducingOutput { + hash_table: AggregateHashTable, + }, + Done, +} + +type FinalHashAggregatePoll = Poll>>; +type FinalHashAggregateStateTransition = ControlFlow< + (FinalHashAggregatePoll, FinalHashAggregateState), + FinalHashAggregateState, +>; + +impl FinalHashAggregateState { + fn hash_table(&self) -> &AggregateHashTable { + match self { + Self::ReadingInput { hash_table } | Self::ProducingOutput { hash_table } => { + hash_table + } + Self::Done => unreachable!("Done state does not hold a hash table"), + } + } + + fn hash_table_mut(&mut self) -> &mut AggregateHashTable { + match self { + Self::ReadingInput { hash_table } | Self::ProducingOutput { hash_table } => { + hash_table + } + Self::Done => unreachable!("Done state does not hold a hash table"), + } + } + + fn into_hash_table(self) -> AggregateHashTable { + match self { + Self::ReadingInput { hash_table } | Self::ProducingOutput { hash_table } => { + hash_table + } + Self::Done => unreachable!("Done state does not hold a hash table"), + } + } + + fn into_producing_output(self) -> Self { + Self::ProducingOutput { + hash_table: self.into_hash_table(), + } + } + + fn into_done(self) -> Self { + Self::Done + } } impl PartialHashAggregateStream { @@ -163,6 +287,29 @@ impl PartialHashAggregateStream { Arc::clone(&schema), batch_size, )?; + let can_skip_aggregation = + agg.group_by.is_single() && hash_table.can_skip_aggregation(); + let skip_aggregation_probe = if can_skip_aggregation { + let options = &context.session_config().options().execution; + let probe_ratio_threshold = + options.skip_partial_aggregation_probe_ratio_threshold; + // A threshold >= 1.0 means the ratio (num_groups / input_rows) can + // never exceed it, so the feature is effectively disabled. + if probe_ratio_threshold >= 1.0 { + None + } else { + let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) + .with_category(MetricCategory::Rows) + .counter("skipped_aggregation_rows", partition); + Some(SkipAggregationProbe::new( + options.skip_partial_aggregation_probe_rows_threshold, + probe_ratio_threshold, + skipped_aggregation_rows, + )) + } + } else { + None + }; let reservation = MemoryConsumer::new(format!("PartialHashAggregateStream[{partition}]")) @@ -171,106 +318,382 @@ impl PartialHashAggregateStream { Ok(Self { schema, input, - hash_table, baseline_metrics, reservation, reduction_factor, + skip_aggregation_probe, group_values_soft_limit: agg.limit_options().map(|config| config.limit()), + state: Some(PartialHashAggregateState::ReadingInput { hash_table }), }) } /// See comments in [`Self::group_values_soft_limit`] for details. - fn hit_soft_group_limit(&self) -> bool { + fn hit_soft_group_limit(&self, hash_table: &AggregateHashTable) -> bool { self.group_values_soft_limit - .is_some_and(|limit| limit <= self.hash_table.building_group_count()) + .is_some_and(|limit| limit <= hash_table.building_group_count()) } - fn start_output(&mut self) -> Result<()> { - let input_schema = self.input.schema(); - self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); - self.hash_table.start_output() + /// Updates skip aggregation probe state. + fn update_skip_aggregation_probe(&mut self, input_rows: usize, num_groups: usize) { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + probe.update_state(input_rows, num_groups); + } } -} -impl Stream for PartialHashAggregateStream { - type Item = Result; + /// Returns true if the aggregation probe indicates that aggregation + /// should be skipped. + fn should_skip_aggregation(&self) -> bool { + self.skip_aggregation_probe + .as_ref() + .is_some_and(|probe| probe.should_skip()) + } - fn poll_next( - mut self: std::pin::Pin<&mut Self>, + fn start_output( + &mut self, + hash_table: &mut AggregateHashTable, + close_input: bool, + ) -> Result<()> { + if close_input { + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + } + hash_table.start_output() + } + + /// Handle ReadingInput state - aggregate input batches into the hash table. + /// + /// Returns the next operator state with control flow decision. + fn handle_reading_input( + &mut self, cx: &mut Context<'_>, - ) -> Poll> { - let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + mut original_state: PartialHashAggregateState, + ) -> PartialHashAggregateStateTransition { + debug_assert!(matches!( + &original_state, + PartialHashAggregateState::ReadingInput { .. } + )); + debug_assert!(original_state.hash_table().is_building()); - loop { - if self.hash_table.is_done() { - let _ = self.reservation.try_resize(0); - return Poll::Ready(None); - } else if self.hash_table.is_building() { - match self.input.poll_next_unpin(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Some(Ok(batch))) => { - let timer = elapsed_compute.timer(); - self.reduction_factor.add_total(batch.num_rows()); - let result = self.hash_table.aggregate_batch(&batch); - timer.done(); - - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); - } + match self.input.poll_next_unpin(cx) { + Poll::Pending => ControlFlow::Break((Poll::Pending, original_state)), + Poll::Ready(Some(Ok(batch))) => { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let input_rows = batch.num_rows(); + self.reduction_factor.add_total(input_rows); + let result = original_state.hash_table_mut().aggregate_batch(&batch); + timer.done(); - if self.hit_soft_group_limit() { - let timer = elapsed_compute.timer(); - let result = self.start_output(); - timer.done(); + if let Err(e) = result { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); - } + if self.hit_soft_group_limit(original_state.hash_table()) { + let timer = elapsed_compute.timer(); + let result = self.start_output(original_state.hash_table_mut(), true); + timer.done(); - continue; - } + if let Err(e) = result { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } + + let PartialHashAggregateState::ReadingInput { hash_table } = + original_state + else { + unreachable!("expected reading input state") + }; + return ControlFlow::Continue( + PartialHashAggregateState::ProducingOutput { + hash_table, + skip_hash_table: None, + }, + ); + } - // TODO: impl memory-limited aggr, when OOM directly send - // partial state to final aggregate stage - if let Err(e) = - self.reservation.try_resize(self.hash_table.memory_size()) - { - return Poll::Ready(Some(Err(e))); + self.update_skip_aggregation_probe( + input_rows, + original_state.hash_table().building_group_count(), + ); + + // True branch: a decision has been made to skip partial aggregation. + if self.should_skip_aggregation() { + let timer = elapsed_compute.timer(); + let result = match original_state.hash_table().partial_skip_table() { + Ok(skip_hash_table) => self + .start_output(original_state.hash_table_mut(), false) + .map(|()| skip_hash_table), + Err(e) => Err(e), + }; + timer.done(); + + match result { + Ok(skip_hash_table) => { + let PartialHashAggregateState::ReadingInput { hash_table } = + original_state + else { + unreachable!("expected reading input state") + }; + + // Move to `ProducingOutput` first. Its `skip_hash_table` + // field moves the stream to skip-partial aggregation after + // the accumulated batches have been output. + return ControlFlow::Continue( + PartialHashAggregateState::ProducingOutput { + hash_table, + skip_hash_table: Some(skip_hash_table), + }, + ); } + Err(e) => { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } + } + } + + // TODO: impl memory-limited aggr, when OOM directly send + // partial state to final aggregate stage + if let Err(e) = self + .reservation + .try_resize(original_state.hash_table().memory_size()) + { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } + + ControlFlow::Continue(original_state) + } + Poll::Ready(Some(Err(e))) => { + ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) + } + Poll::Ready(None) => { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let result = self.start_output(original_state.hash_table_mut(), true); + timer.done(); + + match result { + Ok(()) => { + let PartialHashAggregateState::ReadingInput { hash_table } = + original_state + else { + unreachable!("expected reading input state") + }; + ControlFlow::Continue( + PartialHashAggregateState::ProducingOutput { + hash_table, + skip_hash_table: None, + }, + ) } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(Err(e))); + Err(e) => { + ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) } - Poll::Ready(None) => { - let timer = elapsed_compute.timer(); - let result = self.start_output(); - timer.done(); + } + } + } + } + + /// Handle ProducingOutput state - emit partial aggregate state batches. + /// + /// Returns the next operator state with control flow decision. + fn handle_producing_output( + &mut self, + mut original_state: PartialHashAggregateState, + ) -> PartialHashAggregateStateTransition { + debug_assert!(matches!( + &original_state, + PartialHashAggregateState::ProducingOutput { .. } + )); + debug_assert!(!original_state.hash_table().is_building()); - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let result = original_state.hash_table_mut().next_output_batch(); + timer.done(); + + match result { + Ok(Some(batch)) => { + let _ = self + .reservation + .try_resize(original_state.hash_table().memory_size()); + self.reduction_factor.add_part(batch.num_rows()); + debug_assert!(batch.num_rows() > 0); + let next_state = if original_state.hash_table().is_done() { + match original_state { + PartialHashAggregateState::ProducingOutput { + skip_hash_table: Some(hash_table), + .. + } => { + PartialHashAggregateState::SkippingAggregation { hash_table } } + PartialHashAggregateState::ProducingOutput { + skip_hash_table: None, + .. + } => PartialHashAggregateState::Done, + _ => unreachable!("expected producing output state"), } + } else { + original_state + }; + + ControlFlow::Break(( + Poll::Ready(Some(Ok(batch.record_output(&self.baseline_metrics)))), + next_state, + )) + } + Ok(None) => { + let _ = self.reservation.try_resize(0); + // If in the previous `Aggregating` stage it has decided to skip partial + // aggregation, go the `SkipAggregation` stage; otherwise finish. + let next_state = match original_state { + PartialHashAggregateState::ProducingOutput { + skip_hash_table: Some(hash_table), + .. + } => PartialHashAggregateState::SkippingAggregation { hash_table }, + PartialHashAggregateState::ProducingOutput { + skip_hash_table: None, + .. + } => PartialHashAggregateState::Done, + _ => unreachable!("expected producing output state"), + }; + ControlFlow::Continue(next_state) + } + Err(e) => ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)), + } + } + + /// Handle SkippingAggregation state - convert raw input directly to partial states. + /// + /// Returns the next operator state with control flow decision. + fn handle_skipping_aggregation( + &mut self, + cx: &mut Context<'_>, + mut original_state: PartialHashAggregateState, + ) -> PartialHashAggregateStateTransition { + debug_assert!(matches!( + &original_state, + PartialHashAggregateState::SkippingAggregation { .. } + )); + + match self.input.poll_next_unpin(cx) { + Poll::Pending => ControlFlow::Break((Poll::Pending, original_state)), + Poll::Ready(Some(Ok(batch))) => { + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + probe.record_skipped(&batch); } - } else { + + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); - let result = self.hash_table.next_output_batch(); + let result = match &mut original_state { + PartialHashAggregateState::SkippingAggregation { hash_table } => { + hash_table.convert_batch_to_state(&batch) + } + _ => unreachable!("expected skipping aggregation state"), + }; timer.done(); match result { - Ok(Some(batch)) => { - let _ = - self.reservation.try_resize(self.hash_table.memory_size()); - self.reduction_factor.add_part(batch.num_rows()); - debug_assert!(batch.num_rows() > 0); - return Poll::Ready(Some(Ok( - batch.record_output(&self.baseline_metrics) - ))); - } - Ok(None) => { - let _ = self.reservation.try_resize(0); - return Poll::Ready(None); + Ok(batch) => ControlFlow::Break(( + Poll::Ready(Some( + Ok(batch.record_output(&self.baseline_metrics)), + )), + original_state, + )), + Err(e) => { + ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) } - Err(e) => return Poll::Ready(Some(Err(e))), + } + } + Poll::Ready(Some(Err(e))) => { + ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) + } + Poll::Ready(None) => { + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + ControlFlow::Continue(PartialHashAggregateState::Done) + } + } + } +} + +impl Stream for PartialHashAggregateStream { + type Item = Result; + + /// Entry point for the partial hash aggregate state machine. + /// + /// See comments in [`PartialHashAggregateStream`] for high-level ideas. + /// + /// ============================ + /// State transition graph: + /// ============================ + /// + /// (start) --> ReadingInput + /// ---------------------------- + /// ReadingInput -> ReadingInput (after aggregating an input batch) + /// ReadingInput -> ProducingOutput(skip=None) (input exhausted or soft + /// limit reached) + /// ReadingInput -> ProducingOutput(skip=Some) (partial skip triggered) + /// + /// ProducingOutput(skip=None) -> ProducingOutput(skip=None) + /// (after yielding one accumulated output batch) + /// ProducingOutput(skip=None) -> Done (all accumulated output emitted) + /// + /// ProducingOutput(skip=Some) -> ProducingOutput(skip=Some) + /// (after yielding one accumulated output batch) + /// ProducingOutput(skip=Some) -> SkippingAggregation (all accumulated output + /// emitted) + /// + /// SkippingAggregation -> SkippingAggregation (after yielding one + /// `convert_to_state` batch) + /// SkippingAggregation -> Done (input exhausted) + /// ---------------------------- + /// Done -> (end) + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + let cur_state = self + .state + .take() + .expect("PartialHashAggregateStream state should not be None"); + + let next_state = match cur_state { + state @ PartialHashAggregateState::ReadingInput { .. } => { + self.handle_reading_input(cx, state) + } + state @ PartialHashAggregateState::ProducingOutput { .. } => { + self.handle_producing_output(state) + } + state @ PartialHashAggregateState::SkippingAggregation { .. } => { + self.handle_skipping_aggregation(cx, state) + } + state @ PartialHashAggregateState::Done => { + let _ = self.reservation.try_resize(0); + self.state = Some(state); + return Poll::Ready(None); + } + }; + + match next_state { + ControlFlow::Continue(next_state) => { + self.state = Some(next_state); + continue; + } + ControlFlow::Break((poll, next_state)) => { + self.state = Some(next_state); + return poll; } } } @@ -317,101 +740,199 @@ impl FinalHashAggregateStream { Ok(Self { schema, input, - hash_table, baseline_metrics, reservation, group_values_soft_limit: agg.limit_options().map(|config| config.limit()), + state: Some(FinalHashAggregateState::ReadingInput { hash_table }), }) } /// See comments in [`Self::group_values_soft_limit`] for details. - fn hit_soft_group_limit(&self) -> bool { + fn hit_soft_group_limit(&self, hash_table: &AggregateHashTable) -> bool { self.group_values_soft_limit - .is_some_and(|limit| limit <= self.hash_table.building_group_count()) + .is_some_and(|limit| limit <= hash_table.building_group_count()) } - fn start_output(&mut self) -> Result<()> { + fn start_output(&mut self, hash_table: &mut AggregateHashTable) -> Result<()> { let input_schema = self.input.schema(); self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); - self.hash_table.start_output() + hash_table.start_output() } -} - -impl Stream for FinalHashAggregateStream { - type Item = Result; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, + /// Handle ReadingInput state - aggregate partial state batches into the hash table. + /// + /// Returns the next operator state with control flow decision. + fn handle_reading_input( + &mut self, cx: &mut Context<'_>, - ) -> Poll> { - let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); - - loop { - if self.hash_table.is_done() { - let _ = self.reservation.try_resize(0); - return Poll::Ready(None); - } else if self.hash_table.is_building() { - match self.input.poll_next_unpin(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Some(Ok(batch))) => { - let timer = elapsed_compute.timer(); - let result = self.hash_table.aggregate_batch(&batch); - timer.done(); - - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); - } + mut original_state: FinalHashAggregateState, + ) -> FinalHashAggregateStateTransition { + debug_assert!(matches!( + &original_state, + FinalHashAggregateState::ReadingInput { .. } + )); + debug_assert!(original_state.hash_table().is_building()); - if self.hit_soft_group_limit() { - let timer = elapsed_compute.timer(); - let result = self.start_output(); - timer.done(); + match self.input.poll_next_unpin(cx) { + Poll::Pending => ControlFlow::Break((Poll::Pending, original_state)), + Poll::Ready(Some(Ok(batch))) => { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let result = original_state.hash_table_mut().aggregate_batch(&batch); + timer.done(); - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); - } + if let Err(e) = result { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } - continue; - } + if self.hit_soft_group_limit(original_state.hash_table()) { + let timer = elapsed_compute.timer(); + let result = self.start_output(original_state.hash_table_mut()); + timer.done(); - if let Err(e) = - self.reservation.try_resize(self.hash_table.memory_size()) - { - return Poll::Ready(Some(Err(e))); - } - } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(Err(e))); + if let Err(e) = result { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); } - Poll::Ready(None) => { - let timer = elapsed_compute.timer(); - let result = self.start_output(); - timer.done(); - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); - } - } + return ControlFlow::Continue(original_state.into_producing_output()); } - } else { + + if let Err(e) = self + .reservation + .try_resize(original_state.hash_table().memory_size()) + { + return ControlFlow::Break(( + Poll::Ready(Some(Err(e))), + original_state, + )); + } + + ControlFlow::Continue(original_state) + } + Poll::Ready(Some(Err(e))) => { + ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) + } + Poll::Ready(None) => { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); - let result = self.hash_table.next_output_batch(); + let result = self.start_output(original_state.hash_table_mut()); timer.done(); match result { - Ok(Some(batch)) => { - let _ = - self.reservation.try_resize(self.hash_table.memory_size()); - debug_assert!(batch.num_rows() > 0); - return Poll::Ready(Some(Ok( - batch.record_output(&self.baseline_metrics) - ))); + Ok(()) => { + ControlFlow::Continue(original_state.into_producing_output()) } - Ok(None) => { - let _ = self.reservation.try_resize(0); - return Poll::Ready(None); + Err(e) => { + ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)) } - Err(e) => return Poll::Ready(Some(Err(e))), + } + } + } + } + + /// Handle ProducingOutput state - emit final aggregate value batches. + /// + /// Returns the next operator state with control flow decision. + fn handle_producing_output( + &mut self, + mut original_state: FinalHashAggregateState, + ) -> FinalHashAggregateStateTransition { + debug_assert!(matches!( + &original_state, + FinalHashAggregateState::ProducingOutput { .. } + )); + debug_assert!(!original_state.hash_table().is_building()); + + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); + let result = original_state.hash_table_mut().next_output_batch(); + timer.done(); + + match result { + Ok(Some(batch)) => { + let _ = self + .reservation + .try_resize(original_state.hash_table().memory_size()); + debug_assert!(batch.num_rows() > 0); + let next_state = if original_state.hash_table().is_done() { + original_state.into_done() + } else { + original_state + }; + + ControlFlow::Break(( + Poll::Ready(Some(Ok(batch.record_output(&self.baseline_metrics)))), + next_state, + )) + } + Ok(None) => { + let _ = self.reservation.try_resize(0); + ControlFlow::Continue(original_state.into_done()) + } + Err(e) => ControlFlow::Break((Poll::Ready(Some(Err(e))), original_state)), + } + } +} + +impl Stream for FinalHashAggregateStream { + type Item = Result; + + /// Entry point for the final hash aggregate state machine. + /// + /// See comments in [`FinalHashAggregateStream`] for high-level ideas. + /// + /// ============================ + /// State transition graph: + /// ============================ + /// + /// (start) --> ReadingInput + /// ---------------------------- + /// ReadingInput -> ReadingInput (after aggregating one partial-state input + /// batch) + /// ReadingInput -> ProducingOutput (input exhausted or soft limit reached) + /// + /// ProducingOutput -> ProducingOutput (after yielding one final output batch) + /// ProducingOutput -> Done (all final output emitted) + /// ---------------------------- + /// Done -> (end) + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + let cur_state = self + .state + .take() + .expect("FinalHashAggregateStream state should not be None"); + + let next_state = match cur_state { + state @ FinalHashAggregateState::ReadingInput { .. } => { + self.handle_reading_input(cx, state) + } + state @ FinalHashAggregateState::ProducingOutput { .. } => { + self.handle_producing_output(state) + } + state @ FinalHashAggregateState::Done => { + let _ = self.reservation.try_resize(0); + self.state = Some(state); + return Poll::Ready(None); + } + }; + + match next_state { + ControlFlow::Continue(next_state) => { + self.state = Some(next_state); + continue; + } + ControlFlow::Break((poll, next_state)) => { + self.state = Some(next_state); + return poll; } } } diff --git a/datafusion/physical-plan/src/aggregates/hash_table.rs b/datafusion/physical-plan/src/aggregates/hash_table.rs index 87f16d0eebe6f..2e5702f750546 100644 --- a/datafusion/physical-plan/src/aggregates/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/hash_table.rs @@ -22,9 +22,10 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, BooleanArray, new_null_array}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::{Result, internal_err}; +use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::{EmitTo, GroupsAccumulator}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use super::group_values::{GroupByMetrics, GroupValues, new_group_values}; use super::order::GroupOrdering; @@ -34,10 +35,11 @@ use super::{ group_id_array, max_duplicate_ordinal, }; use crate::PhysicalExpr; -use crate::metrics::{MetricBuilder, MetricCategory}; /// Marker for raw rows -> partial state aggregation. pub(super) struct Partial; +/// Marker for raw rows -> partial state conversion without aggregation. +pub(super) struct PartialSkip; /// Marker for partial state -> final value aggregation. pub(super) struct Final; @@ -76,6 +78,10 @@ pub(super) struct AggregateHashTable { } struct HashAggregateAccumulator { + /// Aggregate expression used to create a fresh accumulator for related + /// hash tables, such as the partial-skip table. + aggregate_expr: Arc, + /// Arguments to pass to this accumulator. /// /// Example: `CORR(x, y)` stores two expressions here, while `SUM(x)` stores one. @@ -147,17 +153,29 @@ enum AggregateHashTableState { impl HashAggregateAccumulator { fn new( + aggregate_expr: Arc, arguments: Vec>, filter: Option>, accumulator: Box, ) -> Self { Self { + aggregate_expr, arguments, filter, accumulator, } } + fn empty_like(&self) -> Result { + let accumulator = create_group_accumulator(&self.aggregate_expr)?; + Ok(Self::new( + Arc::clone(&self.aggregate_expr), + self.arguments.clone(), + self.filter.clone(), + accumulator, + )) + } + fn evaluate(&self, batch: &RecordBatch) -> Result { let arguments = self .arguments @@ -223,6 +241,15 @@ impl HashAggregateAccumulator { self.accumulator.supports_convert_to_state() } + fn convert_to_state( + &mut self, + values: &EvaluatedHashAggregateAccumulator, + ) -> Result> { + let opt_filter = values.filter.as_ref().map(|filter| filter.as_boolean()); + self.accumulator + .convert_to_state(&values.arguments, opt_filter) + } + fn null_arguments(&self, input_schema: &SchemaRef) -> Result> { self.arguments .iter() @@ -272,6 +299,7 @@ impl AggregateHashTable { .map(|((agg_expr, arguments), filter)| { let accumulator = create_group_accumulator(agg_expr)?; Ok(HashAggregateAccumulator::new( + Arc::clone(agg_expr), arguments, filter, accumulator, @@ -342,6 +370,7 @@ impl AggregateHashTable { } } + /// How many distinct groups has been accumulated now. pub(super) fn building_group_count(&self) -> usize { self.state.building().group_values.len() } @@ -410,27 +439,49 @@ impl AggregateHashTable { output_schema: SchemaRef, batch_size: usize, ) -> Result { - let table = Self::new_with_filters( + Self::new_with_filters( agg, partition, output_schema, batch_size, agg.filter_expr.iter().cloned().collect(), - )?; + ) + } - if table - .state + pub(super) fn can_skip_aggregation(&self) -> bool { + self.state .building() .accumulators .iter() .all(|acc| acc.supports_convert_to_state()) - { - let _skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) - .with_category(MetricCategory::Rows) - .counter("skipped_aggregation_rows", partition); - } + } + + /// In skip-partial-aggregation optimization, when a decision has made to skip + /// partial stage, build a typed hash table only for aggregation state conversion + /// row-by-row. + pub(super) fn partial_skip_table(&self) -> Result> { + let state = self.state.building(); + let group_schema = state.group_by.group_schema(&self.input_schema)?; + let group_values = new_group_values(group_schema, &GroupOrdering::None)?; + let accumulators = state + .accumulators + .iter() + .map(HashAggregateAccumulator::empty_like) + .collect::>>()?; - Ok(table) + Ok(AggregateHashTable { + group_by_metrics: self.group_by_metrics.clone(), + input_schema: Arc::clone(&self.input_schema), + output_schema: Arc::clone(&self.output_schema), + batch_size: self.batch_size, + state: AggregateHashTableState::Building(BuildingHashTableState { + group_by: Arc::clone(&state.group_by), + group_values, + batch_group_indices: Default::default(), + accumulators, + }), + _mode: PhantomData, + }) } pub(super) fn aggregate_batch(&mut self, batch: &RecordBatch) -> Result<()> { @@ -551,6 +602,40 @@ impl AggregateHashTable { } } +impl AggregateHashTable { + pub(super) fn convert_batch_to_state( + &mut self, + batch: &RecordBatch, + ) -> Result { + let evaluated_batch = self.evaluate_batch(batch)?; + + assert_eq_or_internal_err!( + evaluated_batch.grouping_set_args.len(), + 1, + "group_values expected to have single element" + ); + let mut output = evaluated_batch + .grouping_set_args + .into_iter() + .next() + .unwrap_or_default(); + + let state = self.state.building_mut(); + for (acc, values) in state + .accumulators + .iter_mut() + .zip(evaluated_batch.accumulator_args.iter()) + { + output.extend(acc.convert_to_state(values)?); + } + + Ok(RecordBatch::try_new( + Arc::clone(&self.output_schema), + output, + )?) + } +} + impl AggregateHashTable { pub(super) fn new( agg: &AggregateExec, diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index a5f1621812561..caccbaf564fbc 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -79,6 +79,7 @@ pub mod order; mod row_hash; mod topk; mod topk_stream; +mod utils; /// Returns true if TopK aggregation data structures support the provided key and value types. /// @@ -4102,6 +4103,189 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_partial_hash_stream_skip_aggregation_after_first_batch() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, true), + Field::new("val", DataType::Int32, true), + ])); + + let group_by = + PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]); + + let aggr_expr = vec![ + AggregateExprBuilder::new(count_udaf(), vec![col("val", &schema)?]) + .schema(Arc::clone(&schema)) + .alias(String::from("COUNT(val)")) + .build() + .map(Arc::new)?, + ]; + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + ]; + + let input = + TestMemoryExec::try_new_exec(&[input_data], Arc::clone(&schema), None)?; + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr, + vec![None], + Arc::clone(&input) as Arc, + schema, + )?); + + let session_config = SessionConfig::default() + .set_bool("datafusion.execution.enable_migration_aggregate", true) + .set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + &ScalarValue::Int64(Some(2)), + ) + .set( + "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", + &ScalarValue::Float64(Some(0.1)), + ); + + let ctx = Arc::new(TaskContext::default().with_session_config(session_config)); + let output = collect(aggregate_exec.execute(0, Arc::clone(&ctx))?).await?; + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&output), @r" + +-----+-------------------+ + | key | COUNT(val)[count] | + +-----+-------------------+ + | 1 | 1 | + | 2 | 1 | + | 2 | 1 | + | 3 | 1 | + | 3 | 1 | + | 4 | 1 | + +-----+-------------------+ + "); + } + + let metrics = aggregate_exec.metrics().unwrap(); + let skipped_rows = metrics + .sum_by_name("skipped_aggregation_rows") + .map(|m| m.as_usize()) + .unwrap_or(0); + assert_eq!(skipped_rows, 3); + + Ok(()) + } + + #[tokio::test] + async fn test_partial_hash_stream_skip_aggregation_after_threshold() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("key", DataType::Int32, true), + Field::new("val", DataType::Int32, true), + ])); + + let group_by = + PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]); + + let aggr_expr = vec![ + AggregateExprBuilder::new(count_udaf(), vec![col("val", &schema)?]) + .schema(Arc::clone(&schema)) + .alias(String::from("COUNT(val)")) + .build() + .map(Arc::new)?, + ]; + + let input_data = vec![ + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![2, 3, 4])), + Arc::new(Int32Array::from(vec![0, 0, 0])), + ], + ) + .unwrap(), + ]; + + let input = + TestMemoryExec::try_new_exec(&[input_data], Arc::clone(&schema), None)?; + let aggregate_exec = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggr_expr, + vec![None], + Arc::clone(&input) as Arc, + schema, + )?); + + let session_config = SessionConfig::default() + .set_bool("datafusion.execution.enable_migration_aggregate", true) + .set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + &ScalarValue::Int64(Some(5)), + ) + .set( + "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", + &ScalarValue::Float64(Some(0.1)), + ); + + let ctx = Arc::new(TaskContext::default().with_session_config(session_config)); + let output = collect(aggregate_exec.execute(0, Arc::clone(&ctx))?).await?; + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&output), @r" + +-----+-------------------+ + | key | COUNT(val)[count] | + +-----+-------------------+ + | 1 | 1 | + | 2 | 1 | + | 2 | 2 | + | 3 | 1 | + | 3 | 2 | + | 4 | 1 | + | 4 | 1 | + +-----+-------------------+ + "); + } + + let metrics = aggregate_exec.metrics().unwrap(); + let skipped_rows = metrics + .sum_by_name("skipped_aggregation_rows") + .map(|m| m.as_usize()) + .unwrap_or(0); + assert_eq!(skipped_rows, 3); + + Ok(()) + } + /// When `skip_partial_aggregation_probe_ratio_threshold` is set to 1.0, /// the feature must be effectively disabled: even with 100% cardinality /// (every row is a unique group), no rows should be skipped. diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index c3f73976c721a..5af294eef5088 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -22,6 +22,7 @@ use std::task::{Context, Poll}; use std::vec; use super::order::GroupOrdering; +use super::utils::SkipAggregationProbe; use super::{AggregateExec, format_human_display}; use crate::aggregates::group_values::{GroupByMetrics, GroupValues, new_group_values}; use crate::aggregates::order::GroupOrderingFull; @@ -118,100 +119,6 @@ struct SpillState { // Metrics related to spilling are managed inside `spill_manager` } -/// Tracks if the aggregate should skip partial aggregations -/// -/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`] -struct SkipAggregationProbe { - // ======================================================================== - // PROPERTIES: - // These fields are initialized at the start and remain constant throughout - // the execution. - // ======================================================================== - /// Aggregation ratio check performed when the number of input rows exceeds - /// this threshold (from `SessionConfig`) - probe_rows_threshold: usize, - /// Maximum ratio of `num_groups` to `input_rows` for continuing aggregation - /// (from `SessionConfig`). If the ratio exceeds this value, aggregation - /// is skipped and input rows are directly converted to output - probe_ratio_threshold: f64, - - // ======================================================================== - // STATES: - // Fields changes during execution. Can be buffer, or state flags that - // influence the execution in parent `GroupedHashAggregateStream` - // ======================================================================== - /// Number of processed input rows (updated during probing) - input_rows: usize, - /// Number of total group values for `input_rows` (updated during probing) - num_groups: usize, - - /// Flag indicating further data aggregation may be skipped (decision made - /// when probing complete) - should_skip: bool, - /// Flag indicating further updates of `SkipAggregationProbe` state won't - /// make any effect (set either while probing or on probing completion) - is_locked: bool, - - // ======================================================================== - // METRICS: - // ======================================================================== - /// Number of rows where state was output without aggregation. - /// - /// * If 0, all input rows were aggregated (should_skip was always false) - /// - /// * if greater than zero, the number of rows which were output directly - /// without aggregation - skipped_aggregation_rows: metrics::Count, -} - -impl SkipAggregationProbe { - fn new( - probe_rows_threshold: usize, - probe_ratio_threshold: f64, - skipped_aggregation_rows: metrics::Count, - ) -> Self { - Self { - input_rows: 0, - num_groups: 0, - probe_rows_threshold, - probe_ratio_threshold, - should_skip: false, - is_locked: false, - skipped_aggregation_rows, - } - } - - /// Updates `SkipAggregationProbe` state: - /// - increments the number of input rows - /// - replaces the number of groups with the new value - /// - on `probe_rows_threshold` exceeded calculates - /// aggregation ratio and sets `should_skip` flag - /// - if `should_skip` is set, locks further state updates - fn update_state(&mut self, input_rows: usize, num_groups: usize) { - if self.is_locked { - return; - } - self.input_rows += input_rows; - self.num_groups = num_groups; - if self.input_rows >= self.probe_rows_threshold { - self.should_skip = self.num_groups as f64 / self.input_rows as f64 - > self.probe_ratio_threshold; - // Set is_locked to true only if we have decided to skip, otherwise we can try to skip - // during processing the next record_batch. - self.is_locked = self.should_skip; - } - } - - fn should_skip(&self) -> bool { - self.should_skip - } - - /// Record the number of rows that were output directly without aggregation - fn record_skipped(&mut self, batch: &RecordBatch) { - self.skipped_aggregation_rows.add(batch.num_rows()); - } -} - /// Controls the behavior when an out-of-memory condition occurs. #[derive(PartialEq, Debug)] enum OutOfMemoryMode { @@ -1479,7 +1386,6 @@ impl GroupedHashAggregateStream { mod tests { use super::*; use crate::InputOrderMode; - use crate::execution_plan::ExecutionPlan; use crate::test::TestMemoryExec; use arrow::array::{Int32Array, Int64Array}; use arrow::datatypes::{DataType, Field, Schema}; @@ -1594,152 +1500,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_skip_aggregation_probe_not_locked_until_skip() -> Result<()> { - // Test that the probe is not locked until we actually decide to skip. - // This allows us to continue evaluating the skip condition across multiple batches. - // - // Scenario: - // - Batch 1: Hits rows threshold but NOT ratio threshold (low cardinality) -> don't skip - // - Batch 2: Now hits ratio threshold (high cardinality) -> skip - // - // Without the fix, the probe would be locked after batch 1, preventing the skip - // decision from being made on batch 2. - - let schema = Arc::new(Schema::new(vec![ - Field::new("group_col", DataType::Int32, false), - Field::new("value_col", DataType::Int32, false), - ])); - - // Configure thresholds: - // - probe_rows_threshold: 100 rows - // - probe_ratio_threshold: 0.8 (80%) - let probe_rows_threshold = 100; - let probe_ratio_threshold = 0.8; - - // Batch 1: 100 rows with only 10 unique groups - // Ratio: 10/100 = 0.1 (10%) < 0.8 -> should NOT skip - // This will hit the rows threshold but not the ratio threshold - let batch1_rows = 100; - let batch1_groups = 10; - let mut group_ids_batch1 = Vec::new(); - for i in 0..batch1_rows { - group_ids_batch1.push((i % batch1_groups) as i32); - } - let values_batch1: Vec = vec![1; batch1_rows]; - - let batch1 = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(group_ids_batch1)), - Arc::new(Int32Array::from(values_batch1)), - ], - )?; - - // Batch 2: 360 rows with 360 unique NEW groups (starting from group 10) - // After batch 2, total: 460 rows, 370 groups - // Ratio: 370/460 ≈ 0.804 (80.4%) > 0.8 -> SHOULD decide to skip - let batch2_rows = 360; - let batch2_groups = 360; - let group_ids_batch2: Vec = (batch1_groups..(batch1_groups + batch2_groups)) - .map(|x| x as i32) - .collect(); - let values_batch2: Vec = vec![1; batch2_rows]; - - let batch2 = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(group_ids_batch2)), - Arc::new(Int32Array::from(values_batch2)), - ], - )?; - - // Batch 3: This batch should be skipped since we decided to skip after batch 2 - // 100 rows with 100 unique groups (continuing from where batch 2 left off) - let batch3_rows = 100; - let batch3_groups = 100; - let batch3_start_group = batch1_groups + batch2_groups; - let group_ids_batch3: Vec = (batch3_start_group - ..(batch3_start_group + batch3_groups)) - .map(|x| x as i32) - .collect(); - let values_batch3: Vec = vec![1; batch3_rows]; - - let batch3 = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(group_ids_batch3)), - Arc::new(Int32Array::from(values_batch3)), - ], - )?; - - let input_partitions = vec![vec![batch1, batch2, batch3]]; - - let runtime = RuntimeEnvBuilder::default().build_arc()?; - let mut task_ctx = TaskContext::default().with_runtime(runtime); - - // Configure skip aggregation settings - let mut session_config = task_ctx.session_config().clone(); - session_config = session_config.set( - "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", - &datafusion_common::ScalarValue::UInt64(Some(probe_rows_threshold)), - ); - session_config = session_config.set( - "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", - &datafusion_common::ScalarValue::Float64(Some(probe_ratio_threshold)), - ); - task_ctx = task_ctx.with_session_config(session_config); - let task_ctx = Arc::new(task_ctx); - - // Create aggregate: COUNT(*) GROUP BY group_col - let group_expr = vec![(col("group_col", &schema)?, "group_col".to_string())]; - let aggr_expr = vec![Arc::new( - AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("count_value") - .build()?, - )]; - - let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?; - let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec))); - - // Use Partial mode - let aggregate_exec = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::new_single(group_expr), - aggr_expr, - vec![None], - exec, - Arc::clone(&schema), - )?; - - // Execute and collect results - let mut stream = - GroupedHashAggregateStream::new(&aggregate_exec, &Arc::clone(&task_ctx), 0)?; - let mut results = Vec::new(); - - while let Some(result) = stream.next().await { - let batch = result?; - results.push(batch); - } - - // Check that skip aggregation actually happened - // The key metric is skipped_aggregation_rows - let metrics = aggregate_exec.metrics().unwrap(); - let skipped_rows = metrics - .sum_by_name("skipped_aggregation_rows") - .map(|m| m.as_usize()) - .unwrap_or(0); - - // We expect batch 3's rows to be skipped (100 rows) - assert_eq!( - skipped_rows, batch3_rows, - "Expected batch 3's rows ({batch3_rows}) to be skipped", - ); - - Ok(()) - } - #[tokio::test] async fn test_emit_early_with_partially_sorted() -> Result<()> { // Reproducer for #20445: EmitEarly with PartiallySorted panics in @@ -1823,25 +1583,4 @@ mod tests { Ok(()) } - - #[test] - fn test_skip_aggregation_probe_equality_does_not_skip() { - // When num_groups / input_rows == probe_ratio_threshold, the `>` boundary - // means we must NOT skip — equality is not sufficient to trigger skip. - let threshold_ratio = 0.5_f64; - let threshold_rows = 10_usize; - let mut probe = SkipAggregationProbe::new( - threshold_rows, - threshold_ratio, - metrics::Count::new(), - ); - - // 10 rows, 5 groups → ratio = 5/10 = 0.5 exactly equals threshold - probe.update_state(10, 5); - - assert!( - !probe.should_skip(), - "ratio == threshold should not trigger skip (boundary is exclusive)" - ); - } } diff --git a/datafusion/physical-plan/src/aggregates/utils.rs b/datafusion/physical-plan/src/aggregates/utils.rs new file mode 100644 index 0000000000000..a4306c69b411e --- /dev/null +++ b/datafusion/physical-plan/src/aggregates/utils.rs @@ -0,0 +1,303 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::record_batch::RecordBatch; + +use crate::metrics; + +/// Tracks if the aggregate should skip partial aggregations +/// +/// See "partial aggregation" discussion on +/// [`crate::aggregates::row_hash::GroupedHashAggregateStream`]. +pub(super) struct SkipAggregationProbe { + // ======================================================================== + // PROPERTIES: + // These fields are initialized at the start and remain constant throughout + // the execution. + // ======================================================================== + /// Aggregation ratio check performed when the number of input rows exceeds + /// this threshold (from `SessionConfig`) + probe_rows_threshold: usize, + /// Maximum ratio of `num_groups` to `input_rows` for continuing aggregation + /// (from `SessionConfig`). If the ratio exceeds this value, aggregation + /// is skipped and input rows are directly converted to output + probe_ratio_threshold: f64, + + // ======================================================================== + // STATES: + // Fields changes during execution. Can be buffer, or state flags that + // influence the execution in parent `GroupedHashAggregateStream` + // ======================================================================== + /// Number of processed input rows (updated during probing) + input_rows: usize, + /// Number of total group values for `input_rows` (updated during probing) + num_groups: usize, + + /// Flag indicating further data aggregation may be skipped (decision made + /// when probing complete) + should_skip: bool, + /// Flag indicating further updates of `SkipAggregationProbe` state won't + /// make any effect (set either while probing or on probing completion) + is_locked: bool, + + // ======================================================================== + // METRICS: + // ======================================================================== + /// Number of rows where state was output without aggregation. + /// + /// * If 0, all input rows were aggregated (should_skip was always false) + /// + /// * if greater than zero, the number of rows which were output directly + /// without aggregation + skipped_aggregation_rows: metrics::Count, +} + +impl SkipAggregationProbe { + pub(super) fn new( + probe_rows_threshold: usize, + probe_ratio_threshold: f64, + skipped_aggregation_rows: metrics::Count, + ) -> Self { + Self { + input_rows: 0, + num_groups: 0, + probe_rows_threshold, + probe_ratio_threshold, + should_skip: false, + is_locked: false, + skipped_aggregation_rows, + } + } + + /// Updates `SkipAggregationProbe` state: + /// - increments the number of input rows + /// - replaces the number of groups with the new value + /// - on `probe_rows_threshold` exceeded calculates + /// aggregation ratio and sets `should_skip` flag + /// - if `should_skip` is set, locks further state updates + pub(super) fn update_state(&mut self, input_rows: usize, num_groups: usize) { + if self.is_locked { + return; + } + self.input_rows += input_rows; + self.num_groups = num_groups; + if self.input_rows >= self.probe_rows_threshold { + self.should_skip = self.num_groups as f64 / self.input_rows as f64 + > self.probe_ratio_threshold; + // Set is_locked to true only if we have decided to skip, otherwise we can try to skip + // during processing the next record_batch. + self.is_locked = self.should_skip; + } + } + + pub(super) fn should_skip(&self) -> bool { + self.should_skip + } + + /// Record the number of rows that were output directly without aggregation + pub(super) fn record_skipped(&mut self, batch: &RecordBatch) { + self.skipped_aggregation_rows.add(batch.num_rows()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::aggregates::row_hash::GroupedHashAggregateStream; + use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; + use crate::execution_plan::ExecutionPlan; + use crate::test::TestMemoryExec; + + use std::sync::Arc; + + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::Result; + use datafusion_execution::TaskContext; + use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use datafusion_functions_aggregate::count::count_udaf; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; + use datafusion_physical_expr::expressions::col; + use futures::StreamExt; + + #[tokio::test] + async fn test_skip_aggregation_probe_not_locked_until_skip() -> Result<()> { + // Test that the probe is not locked until we actually decide to skip. + // This allows us to continue evaluating the skip condition across multiple batches. + // + // Scenario: + // - Batch 1: Hits rows threshold but NOT ratio threshold (low cardinality) -> don't skip + // - Batch 2: Now hits ratio threshold (high cardinality) -> skip + // + // Without the fix, the probe would be locked after batch 1, preventing the skip + // decision from being made on batch 2. + + let schema = Arc::new(Schema::new(vec![ + Field::new("group_col", DataType::Int32, false), + Field::new("value_col", DataType::Int32, false), + ])); + + // Configure thresholds: + // - probe_rows_threshold: 100 rows + // - probe_ratio_threshold: 0.8 (80%) + let probe_rows_threshold = 100; + let probe_ratio_threshold = 0.8; + + // Batch 1: 100 rows with only 10 unique groups + // Ratio: 10/100 = 0.1 (10%) < 0.8 -> should NOT skip + // This will hit the rows threshold but not the ratio threshold + let batch1_rows = 100; + let batch1_groups = 10; + let mut group_ids_batch1 = Vec::new(); + for i in 0..batch1_rows { + group_ids_batch1.push((i % batch1_groups) as i32); + } + let values_batch1: Vec = vec![1; batch1_rows]; + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(group_ids_batch1)), + Arc::new(Int32Array::from(values_batch1)), + ], + )?; + + // Batch 2: 360 rows with 360 unique NEW groups (starting from group 10) + // After batch 2, total: 460 rows, 370 groups + // Ratio: 370/460 is about 0.804 (80.4%) > 0.8 -> SHOULD decide to skip + let batch2_rows = 360; + let batch2_groups = 360; + let group_ids_batch2: Vec = (batch1_groups..(batch1_groups + batch2_groups)) + .map(|x| x as i32) + .collect(); + let values_batch2: Vec = vec![1; batch2_rows]; + + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(group_ids_batch2)), + Arc::new(Int32Array::from(values_batch2)), + ], + )?; + + // Batch 3: This batch should be skipped since we decided to skip after batch 2 + // 100 rows with 100 unique groups (continuing from where batch 2 left off) + let batch3_rows = 100; + let batch3_groups = 100; + let batch3_start_group = batch1_groups + batch2_groups; + let group_ids_batch3: Vec = (batch3_start_group + ..(batch3_start_group + batch3_groups)) + .map(|x| x as i32) + .collect(); + let values_batch3: Vec = vec![1; batch3_rows]; + + let batch3 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(group_ids_batch3)), + Arc::new(Int32Array::from(values_batch3)), + ], + )?; + + let input_partitions = vec![vec![batch1, batch2, batch3]]; + + let runtime = RuntimeEnvBuilder::default().build_arc()?; + let mut task_ctx = TaskContext::default().with_runtime(runtime); + + // Configure skip aggregation settings + let mut session_config = task_ctx.session_config().clone(); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_rows_threshold", + &datafusion_common::ScalarValue::UInt64(Some(probe_rows_threshold)), + ); + session_config = session_config.set( + "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold", + &datafusion_common::ScalarValue::Float64(Some(probe_ratio_threshold)), + ); + task_ctx = task_ctx.with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); + + // Create aggregate: COUNT(*) GROUP BY group_col + let group_expr = vec![(col("group_col", &schema)?, "group_col".to_string())]; + let aggr_expr = vec![Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("count_value") + .build()?, + )]; + + let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?; + let exec = Arc::new(TestMemoryExec::update_cache(&Arc::new(exec))); + + // Use Partial mode + let aggregate_exec = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(group_expr), + aggr_expr, + vec![None], + exec, + Arc::clone(&schema), + )?; + + // Execute and collect results + let mut stream = + GroupedHashAggregateStream::new(&aggregate_exec, &Arc::clone(&task_ctx), 0)?; + let mut results = Vec::new(); + + while let Some(result) = stream.next().await { + let batch = result?; + results.push(batch); + } + + // Check that skip aggregation actually happened. + // The key metric is skipped_aggregation_rows. + let metrics = aggregate_exec.metrics().unwrap(); + let skipped_rows = metrics + .sum_by_name("skipped_aggregation_rows") + .map(|m| m.as_usize()) + .unwrap_or(0); + + // We expect batch 3's rows to be skipped (100 rows) + assert_eq!( + skipped_rows, batch3_rows, + "Expected batch 3's rows ({batch3_rows}) to be skipped", + ); + + Ok(()) + } + + #[test] + fn test_skip_aggregation_probe_equality_does_not_skip() { + // When num_groups / input_rows == probe_ratio_threshold, the `>` boundary + // means we must NOT skip: equality is not sufficient to trigger skip. + let threshold_ratio = 0.5_f64; + let threshold_rows = 10_usize; + let mut probe = SkipAggregationProbe::new( + threshold_rows, + threshold_ratio, + metrics::Count::new(), + ); + + // 10 rows, 5 groups: ratio = 5/10 = 0.5 exactly equals threshold + probe.update_state(10, 5); + + assert!( + !probe.should_skip(), + "ratio == threshold should not trigger skip (boundary is exclusive)" + ); + } +} diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 8d334d8433284..9bdd96931e381 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -218,7 +218,7 @@ datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics true datafusion.execution.enable_ansi_mode false -datafusion.execution.enable_migration_aggregate false +datafusion.execution.enable_migration_aggregate true datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false datafusion.execution.hash_join_buffering_capacity 0 @@ -375,7 +375,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. -datafusion.execution.enable_migration_aggregate false Temporary switch for aggregate stream implementations that are being migrated from `GroupedHashAggregateStream`. When set to true, DataFusion tries the migrated implementations when their preconditions are satisfied. When set to false, grouped aggregation falls back to `GroupedHashAggregateStream`. This option will be removed after the migration is finished. See for details. +datafusion.execution.enable_migration_aggregate true Temporary switch for aggregate stream implementations that are being migrated from `GroupedHashAggregateStream`. When set to true, DataFusion tries the migrated implementations when their preconditions are satisfied. When set to false, grouped aggregation falls back to `GroupedHashAggregateStream`. This option will be removed after the migration is finished. See for details. datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index fa9213b965d19..9bc05f754f3b2 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -120,7 +120,7 @@ The following configuration settings are available: | datafusion.execution.parquet.content_defined_chunking.norm_level | 0 | Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | -| datafusion.execution.enable_migration_aggregate | false | Temporary switch for aggregate stream implementations that are being migrated from `GroupedHashAggregateStream`. When set to true, DataFusion tries the migrated implementations when their preconditions are satisfied. When set to false, grouped aggregation falls back to `GroupedHashAggregateStream`. This option will be removed after the migration is finished. See for details. | +| datafusion.execution.enable_migration_aggregate | true | Temporary switch for aggregate stream implementations that are being migrated from `GroupedHashAggregateStream`. When set to true, DataFusion tries the migrated implementations when their preconditions are satisfied. When set to false, grouped aggregation falls back to `GroupedHashAggregateStream`. This option will be removed after the migration is finished. See for details. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | diff --git a/uv.lock b/uv.lock index f86b732dfd6d5..b3a16d4da8c4d 100644 --- a/uv.lock +++ b/uv.lock @@ -350,8 +350,8 @@ dependencies = [ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6,<4" }, { name = "maturin", specifier = ">=1.13.3,<2" }, - { name = "myst-parser", specifier = ">=5,<6" }, - { name = "pydata-sphinx-theme", specifier = ">=0.17.1,<1" }, + { name = "myst-parser", specifier = ">=5.1.0,<6" }, + { name = "pydata-sphinx-theme", specifier = ">=0.18.0,<1" }, { name = "setuptools", specifier = ">=82.0.1,<83" }, { name = "sphinx", specifier = ">=9,<10" }, { name = "sphinx-reredirects", specifier = ">=1.1,<2" }, @@ -465,14 +465,14 @@ wheels = [ [[package]] name = "markdown-it-py" -version = "4.0.0" +version = "4.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "mdurl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +sdist = { url = "https://files.pythonhosted.org/packages/06/ff/7841249c247aa650a76b9ee4bbaeae59370dc8bfd2f6c01f3630c35eb134/markdown_it_py-4.2.0.tar.gz", hash = "sha256:04a21681d6fbb623de53f6f364d352309d4094dd4194040a10fd51833e418d49", size = 82454, upload-time = "2026-05-07T12:08:28.36Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, + { url = "https://files.pythonhosted.org/packages/b3/81/4da04ced5a082363ecfa159c010d200ecbd959ae410c10c0264a38cac0f5/markdown_it_py-4.2.0-py3-none-any.whl", hash = "sha256:9f7ebbcd14fe59494226453aed97c1070d83f8d24b6fc3a3bcf9a38092641c4a", size = 91687, upload-time = "2026-05-07T12:08:27.182Z" }, ] [[package]] @@ -572,14 +572,14 @@ wheels = [ [[package]] name = "mdit-py-plugins" -version = "0.5.0" +version = "0.6.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "markdown-it-py" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } +sdist = { url = "https://files.pythonhosted.org/packages/59/fc/f8d0863f8862f25602c0404d75568e89fb6b4109804645e5cdfb1be5cf56/mdit_py_plugins-0.6.1.tar.gz", hash = "sha256:a2bca0f039f39dbd35fb74ae1b5f998608c437463371f0ff7f49a19a17a114d0", size = 56114, upload-time = "2026-05-13T09:03:38.91Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" }, + { url = "https://files.pythonhosted.org/packages/a5/69/6da5581c6a7fede7dc261bf4e67d6adca4196f176b43288b55b3db395b6e/mdit_py_plugins-0.6.1-py3-none-any.whl", hash = "sha256:214c82fb2ac524472ab6a5bcab1de80f73b50443e187f401bfd77efbc7c6481d", size = 66663, upload-time = "2026-05-13T09:03:37.76Z" }, ] [[package]] @@ -593,7 +593,7 @@ wheels = [ [[package]] name = "myst-parser" -version = "5.0.0" +version = "5.1.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "docutils" }, @@ -604,9 +604,9 @@ dependencies = [ { name = "sphinx", version = "9.0.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.12'" }, { name = "sphinx", version = "9.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/33/fa/7b45eef11b7971f0beb29d27b7bfe0d747d063aa29e170d9edd004733c8a/myst_parser-5.0.0.tar.gz", hash = "sha256:f6f231452c56e8baa662cc352c548158f6a16fcbd6e3800fc594978002b94f3a", size = 98535, upload-time = "2026-01-15T09:08:18.036Z" } +sdist = { url = "https://files.pythonhosted.org/packages/21/dc/603751677fff302f34396e206b610f556a59d7fe58b9a2145f54e96b48e8/myst_parser-5.1.0.tar.gz", hash = "sha256:ab69322dc6719dcc7f296479dbb70181b66df6ed315064f92dbc85c0e1bf2f02", size = 101182, upload-time = "2026-05-13T09:38:19.361Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/d3/ac/686789b9145413f1a61878c407210e41bfdb097976864e0913078b24098c/myst_parser-5.0.0-py3-none-any.whl", hash = "sha256:ab31e516024918296e169139072b81592336f2fef55b8986aa31c9f04b5f7211", size = 84533, upload-time = "2026-01-15T09:08:16.788Z" }, + { url = "https://files.pythonhosted.org/packages/09/dc/f3dfb7488b770f3f67e6545085bf2abea5172e88f57b8ad25ef860ca704c/myst_parser-5.1.0-py3-none-any.whl", hash = "sha256:9c91c52b3cdb4d94a6506e4fab4e2f296c7623a0da0dcbe6de1565c3dad67a8a", size = 85817, upload-time = "2026-05-13T09:38:17.904Z" }, ] [[package]] @@ -758,7 +758,7 @@ wheels = [ [[package]] name = "pydata-sphinx-theme" -version = "0.17.1" +version = "0.18.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "accessible-pygments" }, @@ -770,9 +770,9 @@ dependencies = [ { name = "sphinx", version = "9.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ce/f7/c74c7100a7f4c0f77b5dcacb7dfdb8fee774fb70e487dd97acba2b930774/pydata_sphinx_theme-0.17.1.tar.gz", hash = "sha256:2cfc1d926c753c77039b7ee53f0ccebcbee5e81f0db61432b01cbb10ad7fd0af", size = 4991415, upload-time = "2026-04-21T13:00:34.263Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ad/81/b3fdc8b74d0cfed9e623a0fef9932376800da5daa1a85d1224cac4c131a3/pydata_sphinx_theme-0.18.0.tar.gz", hash = "sha256:b4abc95ab02600872e060db07c79e056e87b7ea653ab1ffd0e0b1fa75a3003d4", size = 5004260, upload-time = "2026-05-20T08:32:28.897Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e2/bc/2cb8c78300ce1ace4eeac3b3522218cea2c2053bfa6b4e32cc972a477f9a/pydata_sphinx_theme-0.17.1-py3-none-any.whl", hash = "sha256:320b022d7808bdf5920d9a28e573f27aace9b23e1af6ca103eecc752411df492", size = 6823346, upload-time = "2026-04-21T13:00:31.978Z" }, + { url = "https://files.pythonhosted.org/packages/a9/cd/e0eda602060f9dc99068f8e54490812d9d34ebb134043ff0ae594cf721a4/pydata_sphinx_theme-0.18.0-py3-none-any.whl", hash = "sha256:fbe5401f26642d487e3c5b6dfcbf69b3b1d579e80dcc479a429632abe0a13929", size = 6200747, upload-time = "2026-05-20T08:32:26.646Z" }, ] [[package]]