Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 270 additions & 11 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,22 @@ impl AggregateUDFImpl for ArrayAgg {

#[derive(Debug)]
pub struct ArrayAggAccumulator {
values: Vec<ArrayRef>,
values: VecDeque<ArrayRef>,
datatype: DataType,
ignore_nulls: bool,
/// Number of elements already consumed (retracted) from the front array.
/// Used by sliding window frames to avoid copying on partial retract.
front_offset: usize,
}

impl ArrayAggAccumulator {
/// new array_agg accumulator based on given item data type
pub fn try_new(datatype: &DataType, ignore_nulls: bool) -> Result<Self> {
Ok(Self {
values: vec![],
values: VecDeque::new(),
datatype: datatype.clone(),
ignore_nulls,
front_offset: 0,
})
}

Expand Down Expand Up @@ -356,7 +360,7 @@ impl Accumulator for ArrayAggAccumulator {
};

if !val.is_empty() {
self.values.push(val)
self.values.push_back(val)
}

Ok(())
Expand All @@ -376,12 +380,12 @@ impl Accumulator for ArrayAggAccumulator {
Some(values) => {
// Make sure we don't insert empty lists
if !values.is_empty() {
self.values.push(values);
self.values.push_back(values);
}
}
None => {
for arr in list_arr.iter().flatten() {
self.values.push(arr);
self.values.push_back(arr);
}
}
}
Expand All @@ -394,19 +398,71 @@ impl Accumulator for ArrayAggAccumulator {
}

fn evaluate(&mut self) -> Result<ScalarValue> {
// Transform Vec<ListArr> to ListArr
let element_arrays: Vec<&dyn Array> =
self.values.iter().map(|a| a.as_ref()).collect();
if self.values.is_empty() {
return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
}

let element_arrays: Vec<ArrayRef> = self
.values
.iter()
.enumerate()
.map(|(i, a)| {
if i == 0 && self.front_offset > 0 {
a.slice(self.front_offset, a.len() - self.front_offset)
} else {
Arc::clone(a)
}
})
.collect();

if element_arrays.is_empty() {
let element_refs: Vec<&dyn Array> =
element_arrays.iter().map(|a| a.as_ref()).collect();

if element_refs.iter().all(|a| a.is_empty()) {
return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
}

let concated_array = arrow::compute::concat(&element_arrays)?;
let concated_array = arrow::compute::concat(&element_refs)?;

Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar())
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}

assert_eq_or_internal_err!(values.len(), 1, "expects single batch");

let val = &values[0];
let mut to_retract = if self.ignore_nulls {
val.len() - val.null_count()
} else {
val.len()
};

while to_retract > 0 {
let Some(front) = self.values.front() else {
break;
};
let available = front.len() - self.front_offset;
if to_retract >= available {
self.values.pop_front();
to_retract -= available;
self.front_offset = 0;
} else {
self.front_offset += to_retract;
to_retract = 0;
}
}

Ok(())
}

fn supports_retract_batch(&self) -> bool {
true
}

fn size(&self) -> usize {
size_of_val(self)
+ (size_of::<ArrayRef>() * self.values.capacity())
Expand Down Expand Up @@ -1415,7 +1471,7 @@ mod tests {
acc2.update_batch(&[data(["b", "c", "a"])])?;
acc1 = merge(acc1, acc2)?;

assert_eq!(acc1.size(), 266);
assert_eq!(acc1.size(), 282);

Ok(())
}
Expand Down Expand Up @@ -1935,4 +1991,207 @@ mod tests {

Ok(())
}

// ---- retract_batch tests ----

#[test]
fn retract_basic_sliding_window() -> Result<()> {
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

// Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D]
// Row 1: frame = [A]
acc.update_batch(&[data(["A"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A"]);

// Row 2: frame = [A, B]
acc.update_batch(&[data(["B"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);

// Row 3: frame = [B, C] — A leaves
acc.update_batch(&[data(["C"])])?;
acc.retract_batch(&[data(["A"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C"]);

// Row 4: frame = [C, D] — B leaves
acc.update_batch(&[data(["D"])])?;
acc.retract_batch(&[data(["B"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C", "D"]);

Ok(())
}

#[test]
fn retract_multi_element_across_arrays() -> Result<()> {
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

// First batch: 3 elements
acc.update_batch(&[data(["A", "B", "C"])])?;
// Second batch: 1 element
acc.update_batch(&[data(["D"])])?;

assert_eq!(
print_nulls(str_arr(acc.evaluate()?)?),
vec!["A", "B", "C", "D"]
);

// Partial retract from front array: A leaves
acc.retract_batch(&[data(["A"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", "D"]);

// Retract spanning two arrays: B, C (rest of first array) + D (second array)
acc.retract_batch(&[data(["B", "C", "D"])])?;
let result = acc.evaluate()?;
assert!(
matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
"expected null list after full retract, got {result:?}"
);

Ok(())
}

#[test]
fn retract_with_nulls_preserved() -> Result<()> {
// ignore_nulls = false: NULLs are stored and counted for retract
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
assert_eq!(
print_nulls(str_arr(acc.evaluate()?)?),
vec!["A", "NULL", "C"]
);

// Retract 2 elements: A and NULL both leave
acc.retract_batch(&[data([Some("A"), None])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);

Ok(())
}

#[test]
fn retract_with_ignore_nulls() -> Result<()> {
// ignore_nulls = true: NULLs are NOT stored by update_batch,
// so retract must only count non-null values
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;

// update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered)
acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "C"]);

// retract_batch receives the original values including NULL: [A, NULL]
// But only 1 non-null value (A) should be retracted
acc.retract_batch(&[data([Some("A"), None])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);

// retract_batch with [NULL, C] — only C (1 non-null) retracted
acc.retract_batch(&[data([None, Some("C")])])?;
let result = acc.evaluate()?;
assert!(
matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
"expected null list after full retract, got {result:?}"
);

Ok(())
}

#[test]
fn retract_ignore_nulls_all_nulls_batch() -> Result<()> {
// When ignore_nulls = true and retract batch is all NULLs, nothing is retracted
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;

acc.update_batch(&[data([Some("A"), Some("B")])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);

// Retract batch of all NULLs: to_retract = 0, nothing changes
acc.retract_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);

Ok(())
}

#[test]
fn retract_empty_accumulator() -> Result<()> {
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

// Retract on empty accumulator should be a no-op
acc.retract_batch(&[data(["A"])])?;
let result = acc.evaluate()?;
assert!(
matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
"expected null list for empty accumulator, got {result:?}"
);

Ok(())
}

#[test]
fn retract_front_offset_partial_consume() -> Result<()> {
// Reproduces the RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING scenario:
// ts: 1, 2, 3, 4, 100
//
// Row 1 (ts=1): update [A,B,C] (3 elements, ts in [-1,3])
// Row 2 (ts=2): update [D] (ts=4 enters)
// Row 3 (ts=3): no change (same frame [0..4))
// Row 4 (ts=4): retract [A] (ts=1 leaves, partial consume)
// Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays)
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

// Row 1: update_batch(["A","B","C"])
acc.update_batch(&[data(["A", "B", "C"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B", "C"]);

// Row 2: update_batch(["D"])
acc.update_batch(&[data(["D"])])?;
assert_eq!(
print_nulls(str_arr(acc.evaluate()?)?),
vec!["A", "B", "C", "D"]
);

// Row 4: retract_batch(["A"]) — partial consume, front_offset = 1
acc.retract_batch(&[data(["A"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", "D"]);

// Row 5: update_batch(["E"]), then retract_batch(["B","C","D"])
// retract spans: ["A","B","C"] (offset=1, 2 remaining) + ["D"] (1 element)
acc.update_batch(&[data(["E"])])?;
acc.retract_batch(&[data(["B", "C", "D"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["E"]);

Ok(())
}

#[test]
fn retract_update_after_full_drain() -> Result<()> {
// Verify accumulator works correctly after being fully drained
let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

acc.update_batch(&[data(["A", "B"])])?;
acc.retract_batch(&[data(["A", "B"])])?;

// Accumulator is empty now
let result = acc.evaluate()?;
assert!(
matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
"expected null list, got {result:?}"
);

// New values should work normally after drain
acc.update_batch(&[data(["X", "Y"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["X", "Y"]);

acc.retract_batch(&[data(["X"])])?;
assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["Y"]);

Ok(())
}

#[test]
fn retract_supports_retract_batch() -> Result<()> {
let acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
assert!(acc.supports_retract_batch());

let acc_ignore = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
assert!(acc_ignore.supports_retract_batch());

Ok(())
}
}
Loading
Loading