Description
Describe the bug
We (CC @rgehan) encountered a rare data correctness regression in production during spillable aggregations, where GROUP BY some_key would return duplicate entries for some_key.
We reproduced it in both DataFusion 50 and 52, and we believe we've found the root cause and the fix.
Our understanding of the bug is the following:
When the Final aggregation has spilled and is done iterating over the Partial aggregation, it switches to a sorted streaming aggregation. This is implemented by resetting its input to point to the sorted spill stream and by mutating some of its internal state, notably:
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs
Lines 1220 to 1222 in 0aab6a3
// We can now use `GroupOrdering::Full` since the spill files are sorted
// on the grouping columns.
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
This ordering then unlocks streaming the output here:
datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs
Lines 754 to 762 in 0aab6a3
if (self.spill_state.spills.is_empty()
    || self.spill_state.is_stream_merging)
    && let Some(to_emit) = self.group_ordering.emit_to()
{
    timer.done();
    if let Some(batch) = self.emit(to_emit, false)? {
        self.exec_state =
            ExecutionState::ProducingOutput(batch);
    };
The problem is that, at this point, GroupedHashAggregateStream#group_values has not been rebuilt: in our case it is still a GroupValuesColumn<Streaming = false>, which uses vectorized_intern under the hood. That can lead to correctness bugs in streaming aggregations, as documented here:
datafusion/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
Line 321 in 65a6bc4
fn scalarized_intern(
This analysis matches the data correctness symptoms we experienced in production, namely duplicated grouping keys in the aggregation output.
We fixed the correctness problem by resetting group_values to the streaming implementation whenever group_ordering is mutated; see this example commit:
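To make the suspected mechanism concrete, here is a toy model in plain Rust. It is not DataFusion code: `Groups`, `intern`, `emit_first` and `aggregate` are hypothetical names, and reversing the new keys is only an artificial stand-in for any interning strategy that does not assign group indices in input-row order. The emission rule (emit every group except the highest-indexed one) is our simplified reading of what full ordering permits.

```rust
use std::collections::HashMap;

/// Toy group table: group index -> (key, row count). Hypothetical, not DataFusion's.
struct Groups {
    index_of: HashMap<i64, usize>,
    rows: Vec<(i64, u64)>,
}

impl Groups {
    fn new() -> Self {
        Self { index_of: HashMap::new(), rows: Vec::new() }
    }

    /// Assign group indices for one batch and count rows per group.
    /// `in_input_order = false` appends new keys in reverse order, an
    /// artificial stand-in for a non-order-preserving intern.
    fn intern(&mut self, batch: &[i64], in_input_order: bool) {
        let mut new_keys: Vec<i64> = Vec::new();
        for &k in batch {
            if !self.index_of.contains_key(&k) && !new_keys.contains(&k) {
                new_keys.push(k);
            }
        }
        if !in_input_order {
            new_keys.reverse();
        }
        for k in new_keys {
            self.index_of.insert(k, self.rows.len());
            self.rows.push((k, 0));
        }
        for &k in batch {
            let idx = self.index_of[&k];
            self.rows[idx].1 += 1;
        }
    }

    /// Emit the first `n` groups and re-index the remaining ones.
    fn emit_first(&mut self, n: usize) -> Vec<(i64, u64)> {
        let emitted: Vec<(i64, u64)> = self.rows.drain(..n).collect();
        self.index_of.clear();
        for (i, (k, _)) in self.rows.iter().enumerate() {
            self.index_of.insert(*k, i);
        }
        emitted
    }
}

/// Streaming aggregation over key-sorted batches: after each batch, assume
/// only the highest-indexed group can still receive rows and emit the rest.
fn aggregate(batches: &[Vec<i64>], in_input_order: bool) -> Vec<(i64, u64)> {
    let mut groups = Groups::new();
    let mut out = Vec::new();
    for batch in batches {
        groups.intern(batch, in_input_order);
        let n = groups.rows.len().saturating_sub(1);
        out.extend(groups.emit_first(n));
    }
    let n = groups.rows.len();
    out.extend(groups.emit_first(n));
    out
}

fn main() {
    // Key 3 straddles the batch boundary.
    let batches: Vec<Vec<i64>> = vec![vec![1, 2, 3], vec![3, 4]];
    println!("ordered intern:   {:?}", aggregate(&batches, true));
    println!("unordered intern: {:?}", aggregate(&batches, false));
}
```

In this model the ordered intern yields one row per key, while the unordered intern emits key 3 early (before its second row arrives) and then creates a second group for it, which is the same shape of symptom we saw in production.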
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
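The invariant behind this fix can be sketched with toy types: the group-values implementation should always be the one that matches the current ordering, so both fields are updated together. Everything below (`ToyOrdering`, `ToyGroupValues`, `ToyStream`, `group_values_for`) is hypothetical and only mirrors the shape of the change, not DataFusion's actual types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyOrdering {
    None,
    Full,
}

#[derive(Debug, PartialEq)]
enum ToyGroupValues {
    Vectorized,
    Streaming,
}

/// Hypothetical factory: pick the group-values flavor that is safe for an ordering.
fn group_values_for(ordering: ToyOrdering) -> ToyGroupValues {
    match ordering {
        ToyOrdering::None => ToyGroupValues::Vectorized,
        ToyOrdering::Full => ToyGroupValues::Streaming,
    }
}

struct ToyStream {
    ordering: ToyOrdering,
    group_values: ToyGroupValues,
}

impl ToyStream {
    fn new() -> Self {
        let ordering = ToyOrdering::None;
        Self { ordering, group_values: group_values_for(ordering) }
    }

    /// Buggy variant: only the ordering changes, group values go stale.
    fn switch_without_rebuild(&mut self) {
        self.ordering = ToyOrdering::Full;
    }

    /// Fixed variant: rebuild group values whenever the ordering changes.
    fn switch_to_sorted_spill_stream(&mut self) {
        self.ordering = ToyOrdering::Full;
        self.group_values = group_values_for(self.ordering);
    }

    fn is_consistent(&self) -> bool {
        self.group_values == group_values_for(self.ordering)
    }
}

fn main() {
    let mut buggy = ToyStream::new();
    buggy.switch_without_rebuild();
    assert!(!buggy.is_consistent());

    let mut fixed = ToyStream::new();
    fixed.switch_to_sorted_spill_stream();
    assert!(fixed.is_consistent());
}
```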
To Reproduce
We have a reproduction based on production data, but we are not comfortable sharing it as is; it also takes a while to run and is fairly complex.
We've tried to simplify the reproduction without much success so far. We'd be happy to get help from maintainers to build a synthetic repro, which probably involves hash collisions.
Expected behavior
No duplicate groups after an aggregation.
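A synthetic repro could assert this property directly on the collected group keys. The helper below is a minimal sketch over string keys; `duplicate_keys` is a hypothetical name, and extracting keys from actual result batches is left out.

```rust
use std::collections::HashMap;

/// Return every key that appears more than once, sorted for stable output.
fn duplicate_keys(keys: &[&str]) -> Vec<String> {
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for &k in keys {
        *counts.entry(k).or_insert(0) += 1;
    }
    let mut dups: Vec<String> = counts
        .into_iter()
        .filter(|(_, n)| *n > 1)
        .map(|(k, _)| k.to_string())
        .collect();
    dups.sort();
    dups
}

fn main() {
    // A correct GROUP BY output has no repeated keys.
    assert!(duplicate_keys(&["a", "b", "c"]).is_empty());
    // The buggy output would show up here.
    assert_eq!(duplicate_keys(&["a", "b", "a", "c"]), vec!["a".to_string()]);
}
```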
Additional context
No response