Skip to content
Open
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 34 additions & 25 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,12 @@ fn timestamp_scale<T: ArrowTimestampType>() -> i64 {
}

// Scale to nanoseconds and report overflow as a normal error.
fn checked_scale_to_nanos(x: i64, scale: i64) -> Result<i64> {
match x.checked_mul(scale) {
fn checked_scale_to_nanos(value: i64, scale: i64) -> Result<i64> {
match value.checked_mul(scale) {
Some(scaled) => Ok(scaled),
None => exec_err!("date_bin timestamp value {x} * scale {scale} overflows i64"),
None => {
exec_err!("date_bin timestamp value {value} * scale {scale} overflows i64")
}
}
}

Expand Down Expand Up @@ -570,7 +572,7 @@ fn date_bin_impl(
) -> Option<i64> {
let scale = timestamp_scale::<T>();
value
.and_then(|val| val.checked_mul(scale))
.and_then(|value| checked_scale_to_nanos(value, scale).ok())
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| binned / scale)
}
Expand Down Expand Up @@ -613,7 +615,9 @@ fn date_bin_impl(
return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
}
let result = v
.and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI))
.and_then(|value| {
checked_scale_to_nanos(value as i64, NANOS_PER_MILLI).ok()
})
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32);
ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result))
Expand All @@ -623,7 +627,9 @@ fn date_bin_impl(
return exec_err!("DATE_BIN with Time32 source requires Time32 origin");
}
let result = v
.and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC))
.and_then(|value| {
checked_scale_to_nanos(value as i64, NANOS_PER_SEC).ok()
})
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32);
ColumnarValue::Scalar(ScalarValue::Time32Second(result))
Expand All @@ -644,7 +650,7 @@ fn date_bin_impl(
return exec_err!("DATE_BIN with Time64 source requires Time64 origin");
}
let result = v
.and_then(|x| x.checked_mul(NANOS_PER_MICRO))
.and_then(|value| checked_scale_to_nanos(value, NANOS_PER_MICRO).ok())
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO);
ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result))
Expand All @@ -664,8 +670,9 @@ fn date_bin_impl(
let scale = timestamp_scale::<T>();

// Per-row errors become NULL, matching scalar behavior.
let result: PrimitiveArray<T> = array.unary_opt(|val| {
val.checked_mul(scale)
let result: PrimitiveArray<T> = array.unary_opt(|value| {
checked_scale_to_nanos(value, scale)
.ok()
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| binned / scale)
});
Expand Down Expand Up @@ -703,9 +710,9 @@ fn date_bin_impl(
}
let array = array.as_primitive::<Time32MillisecondType>();
let result: PrimitiveArray<Time32MillisecondType> =
array.unary_opt(|x| {
(x as i64)
.checked_mul(NANOS_PER_MILLI)
array.unary_opt(|value| {
checked_scale_to_nanos(value as i64, NANOS_PER_MILLI)
.ok()
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| {
((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI)
Expand All @@ -721,14 +728,15 @@ fn date_bin_impl(
);
}
let array = array.as_primitive::<Time32SecondType>();
let result: PrimitiveArray<Time32SecondType> = array.unary_opt(|x| {
(x as i64)
.checked_mul(NANOS_PER_SEC)
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| {
((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32
})
});
let result: PrimitiveArray<Time32SecondType> =
array.unary_opt(|value| {
checked_scale_to_nanos(value as i64, NANOS_PER_SEC)
.ok()
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| {
((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32
})
});
ColumnarValue::Array(Arc::new(result))
}
Time64(Microsecond) => {
Expand All @@ -739,8 +747,9 @@ fn date_bin_impl(
}
let array = array.as_primitive::<Time64MicrosecondType>();
let result: PrimitiveArray<Time64MicrosecondType> =
array.unary_opt(|x| {
x.checked_mul(NANOS_PER_MICRO)
array.unary_opt(|value| {
checked_scale_to_nanos(value, NANOS_PER_MICRO)
.ok()
.and_then(|scaled| stride_fn(stride, scaled, origin).ok())
.map(|binned| {
(binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO
Expand All @@ -756,11 +765,11 @@ fn date_bin_impl(
}
let array = array.as_primitive::<Time64NanosecondType>();
let result: PrimitiveArray<Time64NanosecondType> =
array.unary_opt(|x| {
array.try_unary(|x| {
stride_fn(stride, x, origin)
.map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY))
.ok()
});
.map_err(|e| ArrowError::ComputeError(e.to_string()))
})?;
ColumnarValue::Array(Arc::new(result))
}
_ => {
Expand Down
Loading