From 9e20bb10d6a1df5ac6a402f3be12e21468c67ce5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 17:22:17 +0800 Subject: [PATCH 01/14] feat(datetime): add overflow reproducer for Time64Microsecond and Time64MicrosecondArray - Introduced scalar Time64Microsecond(i64::MAX) overflow reproducer. - Introduced array Time64MicrosecondArray(i64::MAX) overflow reproducer. - Updated tests to catch current panic using catch_unwind. --- datafusion/functions/src/datetime/date_bin.rs | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 38b491e42bcbd..d34ec96542095 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -886,7 +886,9 @@ mod tests { use crate::datetime::date_bin::{DateBinFunc, date_bin_nanos_interval}; use arrow::array::types::TimestampNanosecondType; - use arrow::array::{Array, IntervalDayTimeArray, TimestampNanosecondArray}; + use arrow::array::{ + Array, IntervalDayTimeArray, Time64MicrosecondArray, TimestampNanosecondArray, + }; use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; @@ -1124,6 +1126,55 @@ mod tests { ); } + #[test] + fn test_date_bin_time64_microsecond_scalar_scaling_overflow_reproducer() { + let return_field = &Arc::new(Field::new( + "f", + DataType::Time64(TimeUnit::Microsecond), + true, + )); + + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, 0, 1)), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), + ]; + + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + invoke_date_bin_with_args(args, 1, return_field) + })); + + assert!( + result.is_err(), + "DATE_BIN Time64Microsecond scaling overflow should not complete successfully" + ); + } + + #[test] + fn test_date_bin_time64_microsecond_array_scaling_overflow_reproducer() { + let return_field = &Arc::new(Field::new( + "f", + DataType::Time64(TimeUnit::Microsecond), + true, + )); + let times = Arc::new(Time64MicrosecondArray::from(vec![Some(i64::MAX)])); + + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, 0, 1)), + ColumnarValue::Array(times), + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), + ]; + + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + invoke_date_bin_with_args(args, 1, return_field) + })); + + assert!( + result.is_err(), + "DATE_BIN Time64Microsecond array scaling overflow should not complete successfully" + ); + } + #[test] fn test_date_bin_timezones() { let cases = [ From 3e365dcd9991dc5224c5f534a55783e38ebe8c3f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 17:24:32 +0800 Subject: [PATCH 02/14] feat(date_bin): improve datetime functions with value_to_nanos and optimize timestamp handling - Added `value_to_nanos(value, scale)` function. - Refactored to eliminate repeated `timestamp scalar checked_mul` blocks. - Implemented helper for timestamp array scaling. - Left TIME direct multiplies for SUB_ISSUE_03. --- datafusion/functions/src/datetime/date_bin.rs | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index d34ec96542095..d1945566a5e45 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -612,16 +612,20 @@ fn date_bin_impl( )) } + fn value_to_nanos(value: i64, scale: i64) -> Result { + value + .checked_mul(scale) + .ok_or_else(|| timestamp_scale_overflow_error(value)) + } + Ok(match array { ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { let scale = timestamp_scale::(); ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( match *v { Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { + let nanos = value_to_nanos(val, scale)?; + match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, } @@ -636,10 +640,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( match *v { Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { + let nanos = value_to_nanos(val, scale)?; + match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, } @@ -654,10 +656,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( match *v { Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { + let nanos = value_to_nanos(val, scale)?; + match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, } @@ -672,10 +672,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampSecond( match *v { Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - match stride_fn(stride, scaled, origin) { + let nanos = value_to_nanos(val, scale)?; + match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, } @@ -757,10 +755,8 @@ fn date_bin_impl( .iter() .map(|val| match val { Some(val) => { - let scaled = val - .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(val))?; - Ok(stride_fn(stride, scaled, origin) + let nanos = value_to_nanos(val, scale)?; + Ok(stride_fn(stride, nanos, origin) .ok() .map(|binned| binned / scale)) } From 60caf6332302aa9af5c68f9bc54dc637aaf7d80b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 17:28:31 +0800 Subject: [PATCH 03/14] feat: update TIME scaling to use value_to_nanos in date_bin.rs - Implemented value_to_nanos for TIME origin scaling. - Updated TIME scalar source scaling to use value_to_nanos. - Modified TIME array source scaling to include value_to_nanos and ArrowError::ComputeError mapping. - Revised overflow repro tests to ensure no panic occurs and handle normal errors appropriately. --- datafusion/functions/src/datetime/date_bin.rs | 89 ++++++++++++------- 1 file changed, 57 insertions(+), 32 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index d1945566a5e45..fb1af035c137e 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -515,7 +515,7 @@ fn date_bin_impl( } } - (*v as i64 * NANOS_PER_MILLI, true) + (value_to_nanos(*v as i64, NANOS_PER_MILLI)?, true) } ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => { match stride { @@ -535,7 +535,7 @@ fn date_bin_impl( } } - (*v as i64 * NANOS_PER_SEC, true) + (value_to_nanos(*v as i64, NANOS_PER_SEC)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => { match stride { @@ -555,7 +555,7 @@ fn date_bin_impl( } } - (*v * NANOS_PER_MICRO, true) + (value_to_nanos(*v, NANOS_PER_MICRO)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => { match stride { @@ -687,30 +687,38 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } - let result = v.and_then(|x| { - match stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some((nanos / NANOS_PER_MILLI) as i32) + let result = match v { + Some(x) => { + let nanos = value_to_nanos(*x as i64, NANOS_PER_MILLI)?; + match stride_fn(stride, nanos, origin) { + Ok(binned_nanos) => { + let nanos = binned_nanos % (NANOSECONDS_IN_DAY); + Some((nanos / NANOS_PER_MILLI) as i32) + } + Err(_) => None, } - Err(_) => None, } - }); + None => None, + }; ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } ColumnarValue::Scalar(ScalarValue::Time32Second(v)) => { if !is_time { return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } - let result = v.and_then(|x| { - match stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some((nanos / NANOS_PER_SEC) as i32) + let result = match v { + Some(x) => { + let nanos = value_to_nanos(*x as i64, NANOS_PER_SEC)?; + match stride_fn(stride, nanos, origin) { + Ok(binned_nanos) => { + let nanos = binned_nanos % (NANOSECONDS_IN_DAY); + Some((nanos / NANOS_PER_SEC) as i32) + } + Err(_) => None, } - Err(_) => None, } - }); + None => None, + }; ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(v)) => { @@ -727,14 +735,19 @@ fn date_bin_impl( if !is_time { return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } - let result = - v.and_then(|x| match stride_fn(stride, x * NANOS_PER_MICRO, origin) { - Ok(binned_nanos) => { - let nanos = binned_nanos % (NANOSECONDS_IN_DAY); - Some(nanos / NANOS_PER_MICRO) + let result = match v { + Some(x) => { + let nanos = value_to_nanos(*x, NANOS_PER_MICRO)?; + match stride_fn(stride, nanos, origin) { + Ok(binned_nanos) => { + let nanos = binned_nanos % (NANOSECONDS_IN_DAY); + Some(nanos / NANOS_PER_MICRO) + } + Err(_) => None, } - Err(_) => None, - }); + } + None => None, + }; ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } ColumnarValue::Array(array) => { @@ -800,7 +813,9 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - stride_fn(stride, x as i64 * NANOS_PER_MILLI, origin) + let nanos = value_to_nanos(x as i64, NANOS_PER_MILLI) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); (nanos / NANOS_PER_MILLI) as i32 @@ -818,7 +833,9 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - stride_fn(stride, x as i64 * NANOS_PER_SEC, origin) + let nanos = value_to_nanos(x as i64, NANOS_PER_SEC) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); (nanos / NANOS_PER_SEC) as i32 @@ -836,7 +853,9 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - stride_fn(stride, x * NANOS_PER_MICRO, origin) + let nanos = value_to_nanos(x, NANOS_PER_MICRO) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); nanos / NANOS_PER_MICRO @@ -1140,9 +1159,10 @@ mod tests { invoke_date_bin_with_args(args, 1, return_field) })); - assert!( - result.is_err(), - "DATE_BIN Time64Microsecond scaling overflow should not complete successfully" + let result = result.expect("DATE_BIN Time64Microsecond scaling should not panic"); + assert_eq!( + result.err().unwrap().strip_backtrace(), + "Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds" ); } @@ -1165,9 +1185,14 @@ mod tests { invoke_date_bin_with_args(args, 1, return_field) })); + let result = + result.expect("DATE_BIN Time64Microsecond array scaling should not panic"); assert!( - result.is_err(), - "DATE_BIN Time64Microsecond array scaling overflow should not complete successfully" + result + .err() + .unwrap() + .strip_backtrace() + .contains("DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds") ); } From 25394b75c54c704b3460f7a181a3f338682b9ec0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 18:42:24 +0800 Subject: [PATCH 04/14] feat(datetime): rename overflow helper and update error handling in date_bin - Renamed the overflow helper from `timestamp_scale_overflow_error` to `nanos_scale_overflow_error`. - Updated error message to be more generic: "DATE_BIN value ... cannot be represented in nanoseconds". - Added a new test helper: `invoke_time64_microsecond_date_bin(...)`. - Simplified scalar and array overflow tests by using the new helper. --- datafusion/functions/src/datetime/date_bin.rs | 75 ++++++++----------- 1 file changed, 32 insertions(+), 43 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index fb1af035c137e..ce461951b088f 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -606,16 +606,16 @@ fn date_bin_impl( } } - fn timestamp_scale_overflow_error(x: i64) -> DataFusionError { + fn nanos_scale_overflow_error(x: i64) -> DataFusionError { DataFusionError::Execution(format!( - "DATE_BIN source timestamp {x} cannot be represented in nanoseconds" + "DATE_BIN value {x} cannot be represented in nanoseconds" )) } fn value_to_nanos(value: i64, scale: i64) -> Result { value .checked_mul(scale) - .ok_or_else(|| timestamp_scale_overflow_error(value)) + .ok_or_else(|| nanos_scale_overflow_error(value)) } Ok(match array { @@ -934,6 +934,27 @@ mod tests { DateBinFunc::new().invoke_with_args(args) } + fn invoke_time64_microsecond_date_bin( + source: ColumnarValue, + number_rows: usize, + ) -> Result { + let return_field = Arc::new(Field::new( + "f", + DataType::Time64(TimeUnit::Microsecond), + true, + )); + let args = vec![ + ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, 0, 1)), + source, + ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), + ]; + + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + invoke_date_bin_with_args(args, number_rows, &return_field) + })) + .expect("DATE_BIN Time64Microsecond scaling should not panic") + } + #[test] fn test_date_bin() { let return_field = &Arc::new(Field::new( @@ -1143,57 +1164,25 @@ mod tests { #[test] fn test_date_bin_time64_microsecond_scalar_scaling_overflow_reproducer() { - let return_field = &Arc::new(Field::new( - "f", - DataType::Time64(TimeUnit::Microsecond), - true, - )); - - let args = vec![ - ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, 0, 1)), + let result = invoke_time64_microsecond_date_bin( ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(i64::MAX))), - ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), - ]; - - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - invoke_date_bin_with_args(args, 1, return_field) - })); + 1, + ); - let result = result.expect("DATE_BIN Time64Microsecond scaling should not panic"); assert_eq!( result.err().unwrap().strip_backtrace(), - "Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds" + "Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds" ); } #[test] fn test_date_bin_time64_microsecond_array_scaling_overflow_reproducer() { - let return_field = &Arc::new(Field::new( - "f", - DataType::Time64(TimeUnit::Microsecond), - true, - )); let times = Arc::new(Time64MicrosecondArray::from(vec![Some(i64::MAX)])); + let result = invoke_time64_microsecond_date_bin(ColumnarValue::Array(times), 1); - let args = vec![ - ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, 0, 1)), - ColumnarValue::Array(times), - ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(0))), - ]; - - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - invoke_date_bin_with_args(args, 1, return_field) - })); - - let result = - result.expect("DATE_BIN Time64Microsecond array scaling should not panic"); - assert!( - result - .err() - .unwrap() - .strip_backtrace() - .contains("DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds") - ); + assert!(result.err().unwrap().strip_backtrace().contains( + "DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds" + )); } #[test] From 8feac1b7c9834e664952f54dfb8861427bd87d45 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 22:00:26 +0800 Subject: [PATCH 05/14] fix: update expected timestamp overflow error in date_bin_errors.slt to reflect new representation limit --- datafusion/sqllogictest/test_files/date_bin_errors.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/date_bin_errors.slt b/datafusion/sqllogictest/test_files/date_bin_errors.slt index 20408c84ef79a..7880f220a2d3e 100644 --- a/datafusion/sqllogictest/test_files/date_bin_errors.slt +++ b/datafusion/sqllogictest/test_files/date_bin_errors.slt @@ -80,7 +80,7 @@ select date_bin( NULL # Source timestamp scaling to nanoseconds overflows: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +query error DataFusion error: Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds select date_bin( interval '1 nanosecond', arrow_cast(9223372036854775807, 'Timestamp(Second, None)'), @@ -88,7 +88,7 @@ select date_bin( ); # Source timestamp scaling to nanoseconds overflows in array path: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +query error DataFusion error: Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds select date_bin( interval '1 nanosecond', ts, From 37f6b0d68c8b4656673ff803e317d1c30bbfb694 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 21:53:02 +0800 Subject: [PATCH 06/14] feat(datetime): restore timestamp overflow text and improve error handling - Restored timestamp overflow message for DATE_BIN source timestamp. - Retained generic TIME/value overflow message for DATE_BIN value. - Updated value_to_nanos to take an error constructor. - Revised timestamp paths to utilize timestamp_scale_overflow_error. - Updated TIME paths to use nanos_scale_overflow_error. --- datafusion/functions/src/datetime/date_bin.rs | 86 +++++++++++++++---- 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index ce461951b088f..b76a0ba74e874 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -515,7 +515,10 @@ fn date_bin_impl( } } - (value_to_nanos(*v as i64, NANOS_PER_MILLI)?, true) + ( + value_to_nanos(*v as i64, NANOS_PER_MILLI, nanos_scale_overflow_error)?, + true, + ) } ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => { match stride { @@ -535,7 +538,10 @@ fn date_bin_impl( } } - (value_to_nanos(*v as i64, NANOS_PER_SEC)?, true) + ( + value_to_nanos(*v as i64, NANOS_PER_SEC, nanos_scale_overflow_error)?, + true, + ) } ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => { match stride { @@ -555,7 +561,10 @@ fn date_bin_impl( } } - (value_to_nanos(*v, NANOS_PER_MICRO)?, true) + ( + value_to_nanos(*v, NANOS_PER_MICRO, nanos_scale_overflow_error)?, + true, + ) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => { match stride { @@ -606,16 +615,26 @@ fn date_bin_impl( } } + fn timestamp_scale_overflow_error(x: i64) -> DataFusionError { + DataFusionError::Execution(format!( + "DATE_BIN source timestamp {x} cannot be represented in nanoseconds" + )) + } + fn nanos_scale_overflow_error(x: i64) -> DataFusionError { DataFusionError::Execution(format!( "DATE_BIN value {x} cannot be represented in nanoseconds" )) } - fn value_to_nanos(value: i64, scale: i64) -> Result { + fn value_to_nanos( + value: i64, + scale: i64, + overflow_error: impl FnOnce(i64) -> DataFusionError, + ) -> Result { value .checked_mul(scale) - .ok_or_else(|| nanos_scale_overflow_error(value)) + .ok_or_else(|| overflow_error(value)) } Ok(match array { @@ -624,7 +643,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( match *v { Some(val) => { - let nanos = value_to_nanos(val, scale)?; + let nanos = + value_to_nanos(val, scale, timestamp_scale_overflow_error)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -640,7 +660,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( match *v { Some(val) => { - let nanos = value_to_nanos(val, scale)?; + let nanos = + value_to_nanos(val, scale, timestamp_scale_overflow_error)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -656,7 +677,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( match *v { Some(val) => { - let nanos = value_to_nanos(val, scale)?; + let nanos = + value_to_nanos(val, scale, timestamp_scale_overflow_error)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -672,7 +694,8 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampSecond( match *v { Some(val) => { - let nanos = value_to_nanos(val, scale)?; + let nanos = + value_to_nanos(val, scale, timestamp_scale_overflow_error)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -689,7 +712,11 @@ fn date_bin_impl( } let result = match v { Some(x) => { - let nanos = value_to_nanos(*x as i64, NANOS_PER_MILLI)?; + let nanos = value_to_nanos( + *x as i64, + NANOS_PER_MILLI, + nanos_scale_overflow_error, + )?; match stride_fn(stride, nanos, origin) { Ok(binned_nanos) => { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -708,7 +735,11 @@ fn date_bin_impl( } let result = match v { Some(x) => { - let nanos = value_to_nanos(*x as i64, NANOS_PER_SEC)?; + let nanos = value_to_nanos( + *x as i64, + NANOS_PER_SEC, + nanos_scale_overflow_error, + )?; match stride_fn(stride, nanos, origin) { Ok(binned_nanos) => { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -737,7 +768,8 @@ fn date_bin_impl( } let result = match v { Some(x) => { - let nanos = value_to_nanos(*x, NANOS_PER_MICRO)?; + let nanos = + value_to_nanos(*x, NANOS_PER_MICRO, nanos_scale_overflow_error)?; match stride_fn(stride, nanos, origin) { Ok(binned_nanos) => { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -768,7 +800,11 @@ fn date_bin_impl( .iter() .map(|val| match val { Some(val) => { - let nanos = value_to_nanos(val, scale)?; + let nanos = value_to_nanos( + val, + scale, + timestamp_scale_overflow_error, + )?; Ok(stride_fn(stride, nanos, origin) .ok() .map(|binned| binned / scale)) @@ -813,8 +849,12 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - let nanos = value_to_nanos(x as i64, NANOS_PER_MILLI) - .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + let nanos = value_to_nanos( + x as i64, + NANOS_PER_MILLI, + nanos_scale_overflow_error, + ) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -833,8 +873,12 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - let nanos = value_to_nanos(x as i64, NANOS_PER_SEC) - .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + let nanos = value_to_nanos( + x as i64, + NANOS_PER_SEC, + nanos_scale_overflow_error, + ) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -853,8 +897,12 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - let nanos = value_to_nanos(x, NANOS_PER_MICRO) - .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + let nanos = value_to_nanos( + x, + NANOS_PER_MICRO, + nanos_scale_overflow_error, + ) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); From 2921e868e27f7d0a611e0ec4a525a2ffb220d87e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 22:24:13 +0800 Subject: [PATCH 07/14] fix(datetime): clarify error messages for DATE_BIN source timestamp overflow --- datafusion/sqllogictest/test_files/date_bin_errors.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/date_bin_errors.slt b/datafusion/sqllogictest/test_files/date_bin_errors.slt index 7880f220a2d3e..20408c84ef79a 100644 --- a/datafusion/sqllogictest/test_files/date_bin_errors.slt +++ b/datafusion/sqllogictest/test_files/date_bin_errors.slt @@ -80,7 +80,7 @@ select date_bin( NULL # Source timestamp scaling to nanoseconds overflows: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds +query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds select date_bin( interval '1 nanosecond', arrow_cast(9223372036854775807, 'Timestamp(Second, None)'), @@ -88,7 +88,7 @@ select date_bin( ); # Source timestamp scaling to nanoseconds overflows in array path: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds +query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds select date_bin( interval '1 nanosecond', ts, From a991f68c0af8cf8b95ef00784de7c3e74f861397 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 8 Jun 2026 22:43:19 +0800 Subject: [PATCH 08/14] feat(date_bin): refactor date_bin.rs and date_bin_errors.slt - Removed timestamp_scale_overflow_error from date_bin.rs - Updated value_to_nanos function to only use nanos_scale_overflow_error for scaling - Modified expected DATE_BIN value in date_bin_errors.slt --- datafusion/functions/src/datetime/date_bin.rs | 86 ++++--------------- .../test_files/date_bin_errors.slt | 4 +- 2 files changed, 21 insertions(+), 69 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index b76a0ba74e874..ce461951b088f 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -515,10 +515,7 @@ fn date_bin_impl( } } - ( - value_to_nanos(*v as i64, NANOS_PER_MILLI, nanos_scale_overflow_error)?, - true, - ) + (value_to_nanos(*v as i64, NANOS_PER_MILLI)?, true) } ColumnarValue::Scalar(ScalarValue::Time32Second(Some(v))) => { match stride { @@ -538,10 +535,7 @@ fn date_bin_impl( } } - ( - value_to_nanos(*v as i64, NANOS_PER_SEC, nanos_scale_overflow_error)?, - true, - ) + (value_to_nanos(*v as i64, NANOS_PER_SEC)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Microsecond(Some(v))) => { match stride { @@ -561,10 +555,7 @@ fn date_bin_impl( } } - ( - value_to_nanos(*v, NANOS_PER_MICRO, nanos_scale_overflow_error)?, - true, - ) + (value_to_nanos(*v, NANOS_PER_MICRO)?, true) } ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(Some(v))) => { match stride { @@ -615,26 +606,16 @@ fn date_bin_impl( } } - fn timestamp_scale_overflow_error(x: i64) -> DataFusionError { - DataFusionError::Execution(format!( - "DATE_BIN source timestamp {x} cannot be represented in nanoseconds" - )) - } - fn nanos_scale_overflow_error(x: i64) -> DataFusionError { DataFusionError::Execution(format!( "DATE_BIN value {x} cannot be represented in nanoseconds" )) } - fn value_to_nanos( - value: i64, - scale: i64, - overflow_error: impl FnOnce(i64) -> DataFusionError, - ) -> Result { + fn value_to_nanos(value: i64, scale: i64) -> Result { value .checked_mul(scale) - .ok_or_else(|| overflow_error(value)) + .ok_or_else(|| nanos_scale_overflow_error(value)) } Ok(match array { @@ -643,8 +624,7 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( match *v { Some(val) => { - let nanos = - value_to_nanos(val, scale, timestamp_scale_overflow_error)?; + let nanos = value_to_nanos(val, scale)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -660,8 +640,7 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( match *v { Some(val) => { - let nanos = - value_to_nanos(val, scale, timestamp_scale_overflow_error)?; + let nanos = value_to_nanos(val, scale)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -677,8 +656,7 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( match *v { Some(val) => { - let nanos = - value_to_nanos(val, scale, timestamp_scale_overflow_error)?; + let nanos = value_to_nanos(val, scale)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -694,8 +672,7 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampSecond( match *v { Some(val) => { - let nanos = - value_to_nanos(val, scale, timestamp_scale_overflow_error)?; + let nanos = value_to_nanos(val, scale)?; match stride_fn(stride, nanos, origin) { Ok(result) => Some(result / scale), Err(_) => None, @@ -712,11 +689,7 @@ fn date_bin_impl( } let result = match v { Some(x) => { - let nanos = value_to_nanos( - *x as i64, - NANOS_PER_MILLI, - nanos_scale_overflow_error, - )?; + let nanos = value_to_nanos(*x as i64, NANOS_PER_MILLI)?; match stride_fn(stride, nanos, origin) { Ok(binned_nanos) => { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -735,11 +708,7 @@ fn date_bin_impl( } let result = match v { Some(x) => { - let nanos = value_to_nanos( - *x as i64, - NANOS_PER_SEC, - nanos_scale_overflow_error, - )?; + let nanos = value_to_nanos(*x as i64, NANOS_PER_SEC)?; match stride_fn(stride, nanos, origin) { Ok(binned_nanos) => { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -768,8 +737,7 @@ fn date_bin_impl( } let result = match v { Some(x) => { - let nanos = - value_to_nanos(*x, NANOS_PER_MICRO, nanos_scale_overflow_error)?; + let nanos = value_to_nanos(*x, NANOS_PER_MICRO)?; match stride_fn(stride, nanos, origin) { Ok(binned_nanos) => { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -800,11 +768,7 @@ fn date_bin_impl( .iter() .map(|val| match val { Some(val) => { - let nanos = value_to_nanos( - val, - scale, - timestamp_scale_overflow_error, - )?; + let nanos = value_to_nanos(val, scale)?; Ok(stride_fn(stride, nanos, origin) .ok() .map(|binned| binned / scale)) @@ -849,12 +813,8 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - let nanos = value_to_nanos( - x as i64, - NANOS_PER_MILLI, - nanos_scale_overflow_error, - ) - .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + let nanos = value_to_nanos(x as i64, NANOS_PER_MILLI) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -873,12 +833,8 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - let nanos = value_to_nanos( - x as i64, - NANOS_PER_SEC, - nanos_scale_overflow_error, - ) - .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + let nanos = value_to_nanos(x as i64, NANOS_PER_SEC) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); @@ -897,12 +853,8 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.try_unary(|x| { - let nanos = value_to_nanos( - x, - NANOS_PER_MICRO, - nanos_scale_overflow_error, - ) - .map_err(|e| ArrowError::ComputeError(e.to_string()))?; + let nanos = value_to_nanos(x, NANOS_PER_MICRO) + .map_err(|e| ArrowError::ComputeError(e.to_string()))?; stride_fn(stride, nanos, origin) .map(|binned_nanos| { let nanos = binned_nanos % (NANOSECONDS_IN_DAY); diff --git a/datafusion/sqllogictest/test_files/date_bin_errors.slt b/datafusion/sqllogictest/test_files/date_bin_errors.slt index 20408c84ef79a..7880f220a2d3e 100644 --- a/datafusion/sqllogictest/test_files/date_bin_errors.slt +++ b/datafusion/sqllogictest/test_files/date_bin_errors.slt @@ -80,7 +80,7 @@ select date_bin( NULL # Source timestamp scaling to nanoseconds overflows: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +query error DataFusion error: Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds select date_bin( interval '1 nanosecond', arrow_cast(9223372036854775807, 'Timestamp(Second, None)'), @@ -88,7 +88,7 @@ select date_bin( ); # Source timestamp scaling to nanoseconds overflows in array path: should return an error, not panic -query error DataFusion error: Execution error: DATE_BIN source timestamp 9223372036854775807 cannot be represented in nanoseconds +query error DataFusion error: Execution error: DATE_BIN value 9223372036854775807 cannot be represented in nanoseconds select date_bin( interval '1 nanosecond', ts, From a0d66842ea7de61f5588708d076437518844fcd0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 17 Jun 2026 23:07:52 +0800 Subject: [PATCH 09/14] fix(datetime): replace integer multiplication with checked_scale_to_nanos for safety This commit updates the date_bin_impl function to use `checked_scale_to_nanos` instead of direct integer multiplication for scaling timestamps. This change ensures safer handling of potential overflow scenarios while maintaining the intended functionality for date and time operations. --- datafusion/functions/src/datetime/date_bin.rs | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 06ffd8ba5b3c6..8a1d837df5761 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -570,7 +570,7 @@ fn date_bin_impl( ) -> Option { let scale = timestamp_scale::(); value - .and_then(|val| val.checked_mul(scale)) + .and_then(|val| checked_scale_to_nanos(val, scale).ok()) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| binned / scale) } @@ -613,7 +613,7 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v - .and_then(|x| (x as i64).checked_mul(NANOS_PER_MILLI)) + .and_then(|x| checked_scale_to_nanos(x as i64, NANOS_PER_MILLI).ok()) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) @@ -623,7 +623,7 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v - .and_then(|x| (x as i64).checked_mul(NANOS_PER_SEC)) + .and_then(|x| checked_scale_to_nanos(x as i64, NANOS_PER_SEC).ok()) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) @@ -644,7 +644,7 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } let result = v - .and_then(|x| x.checked_mul(NANOS_PER_MICRO)) + .and_then(|x| checked_scale_to_nanos(x, NANOS_PER_MICRO).ok()) .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) @@ -665,7 +665,8 @@ fn date_bin_impl( // Per-row errors become NULL, matching scalar behavior. let result: PrimitiveArray = array.unary_opt(|val| { - val.checked_mul(scale) + checked_scale_to_nanos(val, scale) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| binned / scale) }); @@ -704,8 +705,8 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - (x as i64) - .checked_mul(NANOS_PER_MILLI) + checked_scale_to_nanos(x as i64, NANOS_PER_MILLI) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) @@ -722,8 +723,8 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - (x as i64) - .checked_mul(NANOS_PER_SEC) + checked_scale_to_nanos(x as i64, NANOS_PER_SEC) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 @@ -740,7 +741,8 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|x| { - x.checked_mul(NANOS_PER_MICRO) + checked_scale_to_nanos(x, NANOS_PER_MICRO) + .ok() .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| { (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO From 2d6931ea8779fb62648f3da7a91c93fbfa77e194 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 17 Jun 2026 23:15:01 +0800 Subject: [PATCH 10/14] feat(datetime): rename helper argument and enhance scaling and binning functions - Renamed helper argument `x` to `value` for clarity - Added `checked_scale_and_bin_to_nanos` function - Unified source scaling and binning paths to share the helper flow - Ensured no direct calls to `checked_mul(scale)` remain outside the helper --- datafusion/functions/src/datetime/date_bin.rs | 114 ++++++++++++------ 1 file changed, 79 insertions(+), 35 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 8a1d837df5761..8e66b6a94cd73 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -437,13 +437,25 @@ fn timestamp_scale() -> i64 { } // Scale to nanoseconds and report overflow as a normal error. -fn checked_scale_to_nanos(x: i64, scale: i64) -> Result { - match x.checked_mul(scale) { +fn checked_scale_to_nanos(value: i64, scale: i64) -> Result { + match value.checked_mul(scale) { Some(scaled) => Ok(scaled), - None => exec_err!("date_bin timestamp value {x} * scale {scale} overflows i64"), + None => exec_err!("date_bin timestamp value {value} * scale {scale} overflows i64"), } } +fn checked_scale_and_bin_to_nanos( + value: i64, + scale: i64, + stride: i64, + stride_fn: BinFunction, + origin: i64, +) -> Option { + checked_scale_to_nanos(value, scale) + .ok() + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) +} + fn validate_time_stride(stride: &Interval) -> Result<()> { match stride { Interval::Months(m) if *m > 0 => { @@ -570,8 +582,9 @@ fn date_bin_impl( ) -> Option { let scale = timestamp_scale::(); value - .and_then(|val| checked_scale_to_nanos(val, scale).ok()) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .and_then(|value| { + checked_scale_and_bin_to_nanos(value, scale, stride, stride_fn, origin) + }) .map(|binned| binned / scale) } @@ -613,8 +626,15 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v - .and_then(|x| checked_scale_to_nanos(x as i64, NANOS_PER_MILLI).ok()) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .and_then(|value| { + checked_scale_and_bin_to_nanos( + value as i64, + NANOS_PER_MILLI, + stride, + stride_fn, + origin, + ) + }) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } @@ -623,8 +643,15 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time32 source requires Time32 origin"); } let result = v - .and_then(|x| checked_scale_to_nanos(x as i64, NANOS_PER_SEC).ok()) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .and_then(|value| { + checked_scale_and_bin_to_nanos( + value as i64, + NANOS_PER_SEC, + stride, + stride_fn, + origin, + ) + }) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } @@ -644,8 +671,15 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } let result = v - .and_then(|x| checked_scale_to_nanos(x, NANOS_PER_MICRO).ok()) - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .and_then(|value| { + checked_scale_and_bin_to_nanos( + value, + NANOS_PER_MICRO, + stride, + stride_fn, + origin, + ) + }) .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } @@ -664,10 +698,8 @@ fn date_bin_impl( let scale = timestamp_scale::(); // Per-row errors become NULL, matching scalar behavior. - let result: PrimitiveArray = array.unary_opt(|val| { - checked_scale_to_nanos(val, scale) - .ok() - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + let result: PrimitiveArray = array.unary_opt(|value| { + checked_scale_and_bin_to_nanos(value, scale, stride, stride_fn, origin) .map(|binned| binned / scale) }); @@ -704,14 +736,17 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.unary_opt(|x| { - checked_scale_to_nanos(x as i64, NANOS_PER_MILLI) - .ok() - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) - as i32 - }) + array.unary_opt(|value| { + checked_scale_and_bin_to_nanos( + value as i64, + NANOS_PER_MILLI, + stride, + stride_fn, + origin, + ) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32 + }) }); ColumnarValue::Array(Arc::new(result)) } @@ -722,14 +757,19 @@ fn date_bin_impl( ); } let array = array.as_primitive::(); - let result: PrimitiveArray = array.unary_opt(|x| { - checked_scale_to_nanos(x as i64, NANOS_PER_SEC) - .ok() - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + let result: PrimitiveArray = + array.unary_opt(|value| { + checked_scale_and_bin_to_nanos( + value as i64, + NANOS_PER_SEC, + stride, + stride_fn, + origin, + ) .map(|binned| { ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 }) - }); + }); ColumnarValue::Array(Arc::new(result)) } Time64(Microsecond) => { @@ -740,13 +780,17 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.unary_opt(|x| { - checked_scale_to_nanos(x, NANOS_PER_MICRO) - .ok() - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) - .map(|binned| { - (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO - }) + array.unary_opt(|value| { + checked_scale_and_bin_to_nanos( + value, + NANOS_PER_MICRO, + stride, + stride_fn, + origin, + ) + .map(|binned| { + (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO + }) }); ColumnarValue::Array(Arc::new(result)) } From 7d596e1662cf2428ee1b7afdbb31acc7f609c890 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 17 Jun 2026 23:20:10 +0800 Subject: [PATCH 11/14] fix(datetime): revert Time64(Nanosecond) array branch to preserve error behavior - Reverted Time64(Nanosecond) array branch to use try_unary(...) - Maintained prior error behavior - Kept out-of-scope branch unchanged semantically --- datafusion/functions/src/datetime/date_bin.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 8e66b6a94cd73..4dcc6124a215b 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -802,11 +802,11 @@ fn date_bin_impl( } let array = array.as_primitive::(); let result: PrimitiveArray = - array.unary_opt(|x| { + array.try_unary(|x| { stride_fn(stride, x, origin) .map(|binned_nanos| binned_nanos % (NANOSECONDS_IN_DAY)) - .ok() - }); + .map_err(|e| ArrowError::ComputeError(e.to_string())) + })?; ColumnarValue::Array(Arc::new(result)) } _ => { From c2fd024b9cb3f2372969ded42d7f6cdaf48c8e4e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 17 Jun 2026 23:22:58 +0800 Subject: [PATCH 12/14] feat: rename checked_scale_and_bin_to_nanos to checked_scale_and_bin_to_nanos_or_null for clarity on NULL behavior --- datafusion/functions/src/datetime/date_bin.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 4dcc6124a215b..8876660e3654a 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -444,7 +444,7 @@ fn checked_scale_to_nanos(value: i64, scale: i64) -> Result { } } -fn checked_scale_and_bin_to_nanos( +fn checked_scale_and_bin_to_nanos_or_null( value: i64, scale: i64, stride: i64, @@ -583,7 +583,7 @@ fn date_bin_impl( let scale = timestamp_scale::(); value .and_then(|value| { - checked_scale_and_bin_to_nanos(value, scale, stride, stride_fn, origin) + checked_scale_and_bin_to_nanos_or_null(value, scale, stride, stride_fn, origin) }) .map(|binned| binned / scale) } @@ -627,7 +627,7 @@ fn date_bin_impl( } let result = v .and_then(|value| { - checked_scale_and_bin_to_nanos( + checked_scale_and_bin_to_nanos_or_null( value as i64, NANOS_PER_MILLI, stride, @@ -644,7 +644,7 @@ fn date_bin_impl( } let result = v .and_then(|value| { - checked_scale_and_bin_to_nanos( + checked_scale_and_bin_to_nanos_or_null( value as i64, NANOS_PER_SEC, stride, @@ -672,7 +672,7 @@ fn date_bin_impl( } let result = v .and_then(|value| { - checked_scale_and_bin_to_nanos( + checked_scale_and_bin_to_nanos_or_null( value, NANOS_PER_MICRO, stride, @@ -699,7 +699,7 @@ fn date_bin_impl( // Per-row errors become NULL, matching scalar behavior. let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos(value, scale, stride, stride_fn, origin) + checked_scale_and_bin_to_nanos_or_null(value, scale, stride, stride_fn, origin) .map(|binned| binned / scale) }); @@ -737,7 +737,7 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos( + checked_scale_and_bin_to_nanos_or_null( value as i64, NANOS_PER_MILLI, stride, @@ -759,7 +759,7 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos( + checked_scale_and_bin_to_nanos_or_null( value as i64, NANOS_PER_SEC, stride, @@ -781,7 +781,7 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos( + checked_scale_and_bin_to_nanos_or_null( value, NANOS_PER_MICRO, stride, From 3ddd544d9d8883e769419817af1cb3bd3f19be0b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 17 Jun 2026 23:50:26 +0800 Subject: [PATCH 13/14] feat(datetime): format code in date_bin.rs for improved readability - Reformatted match statement in `checked_scale_to_nanos` for clarity. - Split long function calls across multiple lines in `date_bin_impl` for better readability. - Enhanced indentation and formatting consistency in error handling and mapping functions. --- datafusion/functions/src/datetime/date_bin.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 8876660e3654a..b96dde74c8c12 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -440,7 +440,9 @@ fn timestamp_scale() -> i64 { fn checked_scale_to_nanos(value: i64, scale: i64) -> Result { match value.checked_mul(scale) { Some(scaled) => Ok(scaled), - None => exec_err!("date_bin timestamp value {value} * scale {scale} overflows i64"), + None => { + exec_err!("date_bin timestamp value {value} * scale {scale} overflows i64") + } } } @@ -583,7 +585,9 @@ fn date_bin_impl( let scale = timestamp_scale::(); value .and_then(|value| { - checked_scale_and_bin_to_nanos_or_null(value, scale, stride, stride_fn, origin) + checked_scale_and_bin_to_nanos_or_null( + value, scale, stride, stride_fn, origin, + ) }) .map(|binned| binned / scale) } @@ -699,8 +703,10 @@ fn date_bin_impl( // Per-row errors become NULL, matching scalar behavior. let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos_or_null(value, scale, stride, stride_fn, origin) - .map(|binned| binned / scale) + checked_scale_and_bin_to_nanos_or_null( + value, scale, stride, stride_fn, origin, + ) + .map(|binned| binned / scale) }); let array = result.with_timezone_opt(tz_opt.clone()); @@ -788,9 +794,7 @@ fn date_bin_impl( stride_fn, origin, ) - .map(|binned| { - (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO - }) + .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO) }); ColumnarValue::Array(Arc::new(result)) } From bb4807e971ded5ef7973772d1fa0a670454aed32 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Sat, 20 Jun 2026 14:50:28 +0800 Subject: [PATCH 14/14] feat: refactor scaling functions and improve error handling - Removed `checked_scale_and_bin_to_nanos_or_null`. - Retained shared `checked_scale_to_nanos`. - Made `.ok()` explicit at source row call sites for clarity. - Preserved origin scaling as the Result error path. - Formatted code using `cargo fmt`. - Verified with `rg` that no wrappers remain, keeping only `checked_mul` in helper functions. --- datafusion/functions/src/datetime/date_bin.rs | 103 ++++++------------ 1 file changed, 31 insertions(+), 72 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index b96dde74c8c12..68f969702b239 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -446,18 +446,6 @@ fn checked_scale_to_nanos(value: i64, scale: i64) -> Result { } } -fn checked_scale_and_bin_to_nanos_or_null( - value: i64, - scale: i64, - stride: i64, - stride_fn: BinFunction, - origin: i64, -) -> Option { - checked_scale_to_nanos(value, scale) - .ok() - .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) -} - fn validate_time_stride(stride: &Interval) -> Result<()> { match stride { Interval::Months(m) if *m > 0 => { @@ -584,11 +572,8 @@ fn date_bin_impl( ) -> Option { let scale = timestamp_scale::(); value - .and_then(|value| { - checked_scale_and_bin_to_nanos_or_null( - value, scale, stride, stride_fn, origin, - ) - }) + .and_then(|value| checked_scale_to_nanos(value, scale).ok()) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| binned / scale) } @@ -631,14 +616,9 @@ fn date_bin_impl( } let result = v .and_then(|value| { - checked_scale_and_bin_to_nanos_or_null( - value as i64, - NANOS_PER_MILLI, - stride, - stride_fn, - origin, - ) + checked_scale_to_nanos(value as i64, NANOS_PER_MILLI).ok() }) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32); ColumnarValue::Scalar(ScalarValue::Time32Millisecond(result)) } @@ -648,14 +628,9 @@ fn date_bin_impl( } let result = v .and_then(|value| { - checked_scale_and_bin_to_nanos_or_null( - value as i64, - NANOS_PER_SEC, - stride, - stride_fn, - origin, - ) + checked_scale_to_nanos(value as i64, NANOS_PER_SEC).ok() }) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32); ColumnarValue::Scalar(ScalarValue::Time32Second(result)) } @@ -675,15 +650,8 @@ fn date_bin_impl( return exec_err!("DATE_BIN with Time64 source requires Time64 origin"); } let result = v - .and_then(|value| { - checked_scale_and_bin_to_nanos_or_null( - value, - NANOS_PER_MICRO, - stride, - stride_fn, - origin, - ) - }) + .and_then(|value| checked_scale_to_nanos(value, NANOS_PER_MICRO).ok()) + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO); ColumnarValue::Scalar(ScalarValue::Time64Microsecond(result)) } @@ -703,10 +671,10 @@ fn date_bin_impl( // Per-row errors become NULL, matching scalar behavior. let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos_or_null( - value, scale, stride, stride_fn, origin, - ) - .map(|binned| binned / scale) + checked_scale_to_nanos(value, scale) + .ok() + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| binned / scale) }); let array = result.with_timezone_opt(tz_opt.clone()); @@ -743,16 +711,13 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos_or_null( - value as i64, - NANOS_PER_MILLI, - stride, - stride_fn, - origin, - ) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) as i32 - }) + checked_scale_to_nanos(value as i64, NANOS_PER_MILLI) + .ok() + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_MILLI) + as i32 + }) }); ColumnarValue::Array(Arc::new(result)) } @@ -765,16 +730,12 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos_or_null( - value as i64, - NANOS_PER_SEC, - stride, - stride_fn, - origin, - ) - .map(|binned| { - ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 - }) + checked_scale_to_nanos(value as i64, NANOS_PER_SEC) + .ok() + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + ((binned % NANOSECONDS_IN_DAY) / NANOS_PER_SEC) as i32 + }) }); ColumnarValue::Array(Arc::new(result)) } @@ -787,14 +748,12 @@ fn date_bin_impl( let array = array.as_primitive::(); let result: PrimitiveArray = array.unary_opt(|value| { - checked_scale_and_bin_to_nanos_or_null( - value, - NANOS_PER_MICRO, - stride, - stride_fn, - origin, - ) - .map(|binned| (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO) + checked_scale_to_nanos(value, NANOS_PER_MICRO) + .ok() + .and_then(|scaled| stride_fn(stride, scaled, origin).ok()) + .map(|binned| { + (binned % NANOSECONDS_IN_DAY) / NANOS_PER_MICRO + }) }); ColumnarValue::Array(Arc::new(result)) }