Data regression in aggregations that spill #20724

@16pierre

Describe the bug

We (CC @rgehan) encountered a rare data regression in production in aggregations that spill: GROUP BY some_key returned duplicate entries for some_key.

We reproduced it in both DataFusion 50 and 52, and we believe we’ve found the root cause and the fix.

Our understanding of the bug is the following:

When the Final aggregation spills and is done iterating over the Partial aggregation, it switches to a sorted streaming aggregation. This is implemented by resetting its input to point to the sorted spill stream and by mutating some of its internal state, notably group_ordering:

// We can now use `GroupOrdering::Full` since the spill files are sorted
// on the grouping columns.
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());

This ordering then unlocks streaming the output here:

if (self.spill_state.spills.is_empty()
    || self.spill_state.is_stream_merging)
    && let Some(to_emit) = self.group_ordering.emit_to()
{
    timer.done();
    if let Some(batch) = self.emit(to_emit, false)? {
        self.exec_state =
            ExecutionState::ProducingOutput(batch);
    };

The problem is that, at this point, GroupedHashAggregateStream#group_values has not been rebuilt: in our case it’s still a GroupValuesColumn<Streaming = false>, which uses vectorized_intern under the hood and can lead to correctness bugs in streaming aggregations, as documented here:

This analysis matches the data correctness symptoms we saw in production, namely duplicated grouping keys in the aggregation output.
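To make the suspected failure mode concrete, here is a toy model in plain Rust. It is not DataFusion code: `ToyGroups`, `run_streaming`, and the `in_order` flag are hypothetical names, and reversing the order of newly seen keys is only a stand-in for any interning strategy that does not assign group indices in input order. The sketch assumes that streaming emission treats the highest group index as the one group still in progress.

```rust
use std::collections::HashMap;

/// Toy group store: key -> group index, plus a per-group row count.
struct ToyGroups {
    index_of: HashMap<String, usize>,
    keys: Vec<String>,
    counts: Vec<u64>,
}

impl ToyGroups {
    fn new() -> Self {
        ToyGroups { index_of: HashMap::new(), keys: Vec::new(), counts: Vec::new() }
    }

    /// Interns one batch. With `in_order == true`, new groups get indices in
    /// input order. With `false`, new keys are assigned indices in reverse
    /// order (hypothetical stand-in for order-breaking interning).
    fn intern(&mut self, batch: &[&str], in_order: bool) {
        let mut new_keys: Vec<&str> = Vec::new();
        for k in batch {
            if !self.index_of.contains_key(*k) && !new_keys.contains(k) {
                new_keys.push(*k);
            }
        }
        if !in_order {
            new_keys.reverse();
        }
        for k in new_keys {
            self.index_of.insert(k.to_string(), self.keys.len());
            self.keys.push(k.to_string());
            self.counts.push(0);
        }
        for k in batch {
            let idx = self.index_of[*k];
            self.counts[idx] += 1;
        }
    }

    /// Emits the first `n` groups and renumbers the remaining ones from 0.
    fn emit_first(&mut self, n: usize) -> Vec<(String, u64)> {
        let keys: Vec<String> = self.keys.drain(..n).collect();
        let counts: Vec<u64> = self.counts.drain(..n).collect();
        self.index_of.clear();
        for (i, k) in self.keys.iter().enumerate() {
            self.index_of.insert(k.clone(), i);
        }
        keys.into_iter().zip(counts).collect()
    }
}

/// Streams key-sorted batches. After each batch, every group except the one
/// with the highest index is assumed complete and is emitted.
fn run_streaming(batches: &[Vec<&str>], in_order: bool) -> Vec<(String, u64)> {
    let mut groups = ToyGroups::new();
    let mut out = Vec::new();
    for batch in batches {
        groups.intern(batch, in_order);
        let n = groups.keys.len().saturating_sub(1);
        out.extend(groups.emit_first(n));
    }
    let remaining = groups.keys.len();
    out.extend(groups.emit_first(remaining));
    out
}
```

With the sorted batches `["a", "a", "b"]` and `["b", "c"]`, in-order interning emits each key once, while the order-breaking variant emits `b` early with a partial count and then again at the end.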

We fixed the correctness problem by resetting group_values to the streaming implementation when group_ordering gets mutated; see this example commit:

self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
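The general idea behind the fix, keeping two coupled pieces of state in sync whenever one changes, can be sketched independently of DataFusion. Everything below (`ToyOrdering`, `ToyValues`, `ToyStream`, `switch_ordering`) is a hypothetical illustration, not the actual types or API.

```rust
/// Hypothetical stand-in for the group ordering mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToyOrdering {
    None,
    Full,
}

/// Hypothetical stand-in for the group-values implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToyValues {
    Vectorized,
    Streaming,
}

/// Picks the values implementation that is safe for a given ordering.
fn values_for(ordering: ToyOrdering) -> ToyValues {
    match ordering {
        ToyOrdering::None => ToyValues::Vectorized,
        ToyOrdering::Full => ToyValues::Streaming,
    }
}

struct ToyStream {
    ordering: ToyOrdering,
    values: ToyValues,
}

impl ToyStream {
    fn new() -> Self {
        ToyStream {
            ordering: ToyOrdering::None,
            values: values_for(ToyOrdering::None),
        }
    }

    /// Changes the ordering and rebuilds the dependent state in one step,
    /// so the two fields cannot drift apart.
    fn switch_ordering(&mut self, ordering: ToyOrdering) {
        self.ordering = ordering;
        self.values = values_for(ordering);
    }

    /// True when the values implementation matches the current ordering.
    fn is_consistent(&self) -> bool {
        self.values == values_for(self.ordering)
    }
}
```

Assigning `ordering` directly, without going through `switch_ordering`, leaves `is_consistent()` returning false, which mirrors the state described in this issue.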

To Reproduce

We have a reproduction based on production data, but it isn’t comfortable to share as is: it takes a while to run and is fairly complex.

We’ve tried to simplify the reproduction without much success so far. We’d be happy to get help from maintainers to build a synthetic repro, which probably involves hash collisions here.

Expected behavior

No duplicate groups after an aggregation.
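A check along these lines could serve as the assertion in a future synthetic repro. It is a minimal sketch using only the standard library; `duplicate_keys` is a hypothetical helper, and it assumes the grouping keys have already been extracted from the output batches into a slice.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Returns each key that appears more than once, in order of first repeat.
fn duplicate_keys<K: Hash + Eq + Clone>(keys: &[K]) -> Vec<K> {
    let mut seen = HashSet::new();
    let mut reported = HashSet::new();
    let mut dups = Vec::new();
    for k in keys {
        // `insert` returns false when the key was already present.
        if !seen.insert(k) && reported.insert(k) {
            dups.push(k.clone());
        }
    }
    dups
}
```

A correct aggregation output should make this function return an empty vector.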

Additional context

No response

Labels: bug
