diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index eb2d4f932b..bb418b14e9 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -86,6 +86,11 @@ pub fn iceberg::arrow::PartitionValueCalculator::partition_type(&self) -> &icebe pub fn iceberg::arrow::PartitionValueCalculator::try_new(partition_spec: &iceberg::spec::PartitionSpec, table_schema: &iceberg::spec::Schema) -> iceberg::Result impl core::fmt::Debug for iceberg::arrow::PartitionValueCalculator pub fn iceberg::arrow::PartitionValueCalculator::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub struct iceberg::arrow::PrimitiveLiteralArrayBuilder +impl iceberg::arrow::PrimitiveLiteralArrayBuilder +pub fn iceberg::arrow::PrimitiveLiteralArrayBuilder::append_or_null(&mut self, prim_lit: core::option::Option<&iceberg::spec::PrimitiveLiteral>) -> iceberg::Result +pub fn iceberg::arrow::PrimitiveLiteralArrayBuilder::finish(self) -> iceberg::Result +pub fn iceberg::arrow::PrimitiveLiteralArrayBuilder::try_new(data_type: &arrow_schema::datatype::DataType, capacity: usize) -> iceberg::Result pub struct iceberg::arrow::RecordBatchPartitionSplitter impl iceberg::arrow::RecordBatchPartitionSplitter pub fn iceberg::arrow::RecordBatchPartitionSplitter::split(&self, batch: &arrow_array::record_batch::RecordBatch) -> iceberg::Result> @@ -127,6 +132,7 @@ pub fn iceberg::arrow::arrow_schema_to_schema(schema: &arrow_schema::schema::Sch pub fn iceberg::arrow::arrow_schema_to_schema_auto_assign_ids(schema: &arrow_schema::schema::Schema) -> iceberg::Result pub fn iceberg::arrow::arrow_struct_to_literal(struct_array: &arrow_array::array::ArrayRef, ty: &iceberg::spec::StructType) -> iceberg::Result>> pub fn iceberg::arrow::arrow_type_to_type(ty: &arrow_schema::datatype::DataType) -> iceberg::Result +pub fn iceberg::arrow::create_primitive_array_single_element(data_type: &arrow_schema::datatype::DataType, prim_lit: &core::option::Option) -> iceberg::Result pub fn iceberg::arrow::datum_to_arrow_type_with_ree(datum: &iceberg::spec::Datum) -> arrow_schema::datatype::DataType pub fn iceberg::arrow::schema_to_arrow_schema(schema: &iceberg::spec::Schema) -> iceberg::Result pub fn iceberg::arrow::strip_metadata_from_schema(schema: &arrow_schema::schema::Schema) -> iceberg::Result @@ -1289,6 +1295,7 @@ pub fn iceberg::scan::TableScan::column_names(&self) -> core::option::Option<&[a pub async fn iceberg::scan::TableScan::plan_files(&self) -> iceberg::Result pub fn iceberg::scan::TableScan::snapshot(&self) -> core::option::Option<&iceberg::spec::SnapshotRef> pub async fn iceberg::scan::TableScan::to_arrow(&self) -> iceberg::Result +pub fn iceberg::scan::TableScan::to_arrow_from_tasks(&self, tasks: iceberg::scan::FileScanTaskStream) -> iceberg::Result impl core::fmt::Debug for iceberg::scan::TableScan pub fn iceberg::scan::TableScan::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result pub struct iceberg::scan::TableScanBuilder<'a> @@ -3034,6 +3041,7 @@ pub fn iceberg::table::Table::metadata_location_result(&self) -> iceberg::Result pub fn iceberg::table::Table::metadata_ref(&self) -> iceberg::spec::TableMetadataRef pub fn iceberg::table::Table::reader_builder(&self) -> iceberg::arrow::ArrowReaderBuilder pub fn iceberg::table::Table::readonly(&self) -> bool +pub fn iceberg::table::Table::runtime(&self) -> &iceberg::Runtime pub fn iceberg::table::Table::scan(&self) -> iceberg::scan::TableScanBuilder<'_> impl core::clone::Clone for iceberg::table::Table pub fn iceberg::table::Table::clone(&self) -> iceberg::table::Table @@ -3094,6 +3102,7 @@ pub mod iceberg::util pub mod iceberg::util::snapshot pub fn iceberg::util::snapshot::ancestors_between(table_metadata: &iceberg::spec::TableMetadataRef, latest_snapshot_id: i64, oldest_snapshot_id: core::option::Option) -> impl core::iter::traits::iterator::Iterator + core::marker::Send pub fn iceberg::util::snapshot::ancestors_of(table_metadata: &iceberg::spec::TableMetadataRef, snapshot_id: i64) -> impl core::iter::traits::iterator::Iterator + core::marker::Send +pub fn iceberg::util::available_parallelism() -> core::num::nonzero::NonZeroUsize pub mod iceberg::writer pub mod iceberg::writer::base_writer pub mod iceberg::writer::base_writer::data_file_writer diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..ed465c0dca 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -17,6 +17,12 @@ use std::sync::Arc; +use arrow_array::builder::{ + BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, FixedSizeBinaryBuilder, + Float32Builder, Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, + LargeStringBuilder, StringBuilder, Time64MicrosecondBuilder, TimestampMicrosecondBuilder, + TimestampNanosecondBuilder, +}; use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray, @@ -620,187 +626,309 @@ pub fn arrow_primitive_to_literal( ) } +enum PrimitiveLiteralArrayBuilderInner { + Boolean(BooleanBuilder), + Int32(Int32Builder), + Date32(Date32Builder), + Int64(Int64Builder), + Time64Microsecond(Time64MicrosecondBuilder), + TimestampMicrosecond(TimestampMicrosecondBuilder), + TimestampNanosecond(TimestampNanosecondBuilder), + Float32(Float32Builder), + Float64(Float64Builder), + Utf8(StringBuilder), + LargeUtf8(LargeStringBuilder), + Binary(BinaryBuilder), + LargeBinary(LargeBinaryBuilder), + Decimal128(Decimal128Builder), + FixedSizeBinary(FixedSizeBinaryBuilder), +} + +/// Incrementally build an Arrow array from Iceberg primitive literals. +/// +/// The builder's Arrow type is fixed at construction and must match the type +/// DataFusion or the reader will consume. `append_or_null` returns `true` only +/// when the provided literal matched that Arrow type and was appended as a +/// non-null value. +pub struct PrimitiveLiteralArrayBuilder { + inner: PrimitiveLiteralArrayBuilderInner, +} + +impl PrimitiveLiteralArrayBuilder { + /// Create a builder for supported primitive Arrow types. + pub fn try_new(data_type: &DataType, capacity: usize) -> Result { + let inner = match data_type { + DataType::Boolean => { + PrimitiveLiteralArrayBuilderInner::Boolean(BooleanBuilder::with_capacity(capacity)) + } + DataType::Int32 => { + PrimitiveLiteralArrayBuilderInner::Int32(Int32Builder::with_capacity(capacity)) + } + DataType::Date32 => { + PrimitiveLiteralArrayBuilderInner::Date32(Date32Builder::with_capacity(capacity)) + } + DataType::Int64 => { + PrimitiveLiteralArrayBuilderInner::Int64(Int64Builder::with_capacity(capacity)) + } + DataType::Time64(TimeUnit::Microsecond) => { + PrimitiveLiteralArrayBuilderInner::Time64Microsecond( + Time64MicrosecondBuilder::with_capacity(capacity), + ) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + PrimitiveLiteralArrayBuilderInner::TimestampMicrosecond( + TimestampMicrosecondBuilder::with_capacity(capacity) + .with_data_type(data_type.clone()), + ) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + PrimitiveLiteralArrayBuilderInner::TimestampNanosecond( + TimestampNanosecondBuilder::with_capacity(capacity) + .with_data_type(data_type.clone()), + ) + } + DataType::Float32 => { + PrimitiveLiteralArrayBuilderInner::Float32(Float32Builder::with_capacity(capacity)) + } + DataType::Float64 => { + PrimitiveLiteralArrayBuilderInner::Float64(Float64Builder::with_capacity(capacity)) + } + DataType::Utf8 => PrimitiveLiteralArrayBuilderInner::Utf8( + StringBuilder::with_capacity(capacity, capacity), + ), + DataType::LargeUtf8 => PrimitiveLiteralArrayBuilderInner::LargeUtf8( + LargeStringBuilder::with_capacity(capacity, capacity), + ), + DataType::Binary => PrimitiveLiteralArrayBuilderInner::Binary( + BinaryBuilder::with_capacity(capacity, capacity), + ), + DataType::LargeBinary => PrimitiveLiteralArrayBuilderInner::LargeBinary( + LargeBinaryBuilder::with_capacity(capacity, capacity), + ), + DataType::Decimal128(_, _) => PrimitiveLiteralArrayBuilderInner::Decimal128( + Decimal128Builder::with_capacity(capacity).with_data_type(data_type.clone()), + ), + DataType::FixedSizeBinary(width) if *width >= 0 => { + PrimitiveLiteralArrayBuilderInner::FixedSizeBinary( + FixedSizeBinaryBuilder::with_capacity(capacity, *width), + ) + } + _ => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported primitive literal array type: {data_type:?}"), + )); + } + }; + + Ok(Self { inner }) + } + + /// Append a primitive literal or a null value. + /// + /// Returns `false` when `prim_lit` is null or does not match the builder's + /// Arrow type. In either case a null is appended so all columns retain the + /// same row count. + pub fn append_or_null(&mut self, prim_lit: Option<&PrimitiveLiteral>) -> Result { + let Some(prim_lit) = prim_lit else { + self.append_null(); + return Ok(false); + }; + + let appended = match (&mut self.inner, prim_lit) { + (PrimitiveLiteralArrayBuilderInner::Boolean(builder), PrimitiveLiteral::Boolean(v)) => { + builder.append_value(*v); + true + } + (PrimitiveLiteralArrayBuilderInner::Int32(builder), PrimitiveLiteral::Int(v)) => { + builder.append_value(*v); + true + } + (PrimitiveLiteralArrayBuilderInner::Date32(builder), PrimitiveLiteral::Int(v)) => { + builder.append_value(*v); + true + } + (PrimitiveLiteralArrayBuilderInner::Int64(builder), PrimitiveLiteral::Long(v)) => { + builder.append_value(*v); + true + } + ( + PrimitiveLiteralArrayBuilderInner::Time64Microsecond(builder), + PrimitiveLiteral::Long(v), + ) => { + builder.append_value(*v); + true + } + ( + PrimitiveLiteralArrayBuilderInner::TimestampMicrosecond(builder), + PrimitiveLiteral::Long(v), + ) => { + builder.append_value(*v); + true + } + ( + PrimitiveLiteralArrayBuilderInner::TimestampNanosecond(builder), + PrimitiveLiteral::Long(v), + ) => { + builder.append_value(*v); + true + } + (PrimitiveLiteralArrayBuilderInner::Float32(builder), PrimitiveLiteral::Float(v)) => { + builder.append_value(v.0); + true + } + (PrimitiveLiteralArrayBuilderInner::Float64(builder), PrimitiveLiteral::Double(v)) => { + builder.append_value(v.0); + true + } + (PrimitiveLiteralArrayBuilderInner::Utf8(builder), PrimitiveLiteral::String(v)) => { + builder.append_value(v.as_str()); + true + } + ( + PrimitiveLiteralArrayBuilderInner::LargeUtf8(builder), + PrimitiveLiteral::String(v), + ) => { + builder.append_value(v.as_str()); + true + } + (PrimitiveLiteralArrayBuilderInner::Binary(builder), PrimitiveLiteral::Binary(v)) => { + builder.append_value(v.as_slice()); + true + } + ( + PrimitiveLiteralArrayBuilderInner::LargeBinary(builder), + PrimitiveLiteral::Binary(v), + ) => { + builder.append_value(v.as_slice()); + true + } + ( + PrimitiveLiteralArrayBuilderInner::Decimal128(builder), + PrimitiveLiteral::Int128(v), + ) => { + builder.append_value(*v); + true + } + ( + PrimitiveLiteralArrayBuilderInner::Decimal128(builder), + PrimitiveLiteral::UInt128(v), + ) => { + builder.append_value(*v as i128); + true + } + ( + PrimitiveLiteralArrayBuilderInner::FixedSizeBinary(builder), + PrimitiveLiteral::Binary(v), + ) => append_fixed_size_binary_or_null(builder, v.as_slice()), + ( + PrimitiveLiteralArrayBuilderInner::FixedSizeBinary(builder), + PrimitiveLiteral::UInt128(v), + ) => { + let bytes = Uuid::from_u128(*v).into_bytes(); + append_fixed_size_binary_or_null(builder, bytes.as_slice()) + } + (builder, _) => { + append_null_to_inner(builder); + false + } + }; + + Ok(appended) + } + + fn append_null(&mut self) { + append_null_to_inner(&mut self.inner); + } + + /// Finish the builder and return the typed Arrow array. + pub fn finish(mut self) -> Result { + Ok(match &mut self.inner { + PrimitiveLiteralArrayBuilderInner::Boolean(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Int32(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Date32(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Int64(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Time64Microsecond(builder) => { + Arc::new(builder.finish()) + } + PrimitiveLiteralArrayBuilderInner::TimestampMicrosecond(builder) => { + Arc::new(builder.finish()) + } + PrimitiveLiteralArrayBuilderInner::TimestampNanosecond(builder) => { + Arc::new(builder.finish()) + } + PrimitiveLiteralArrayBuilderInner::Float32(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Float64(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Utf8(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::LargeUtf8(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Binary(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::LargeBinary(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::Decimal128(builder) => Arc::new(builder.finish()), + PrimitiveLiteralArrayBuilderInner::FixedSizeBinary(builder) => { + Arc::new(builder.finish()) + } + }) + } +} + +fn append_null_to_inner(builder: &mut PrimitiveLiteralArrayBuilderInner) { + match builder { + PrimitiveLiteralArrayBuilderInner::Boolean(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Int32(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Date32(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Int64(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Time64Microsecond(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::TimestampMicrosecond(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::TimestampNanosecond(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Float32(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Float64(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Utf8(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::LargeUtf8(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Binary(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::LargeBinary(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::Decimal128(builder) => builder.append_null(), + PrimitiveLiteralArrayBuilderInner::FixedSizeBinary(builder) => builder.append_null(), + } +} + +fn append_fixed_size_binary_or_null(builder: &mut FixedSizeBinaryBuilder, value: &[u8]) -> bool { + if builder.append_value(value).is_ok() { + true + } else { + builder.append_null(); + false + } +} + /// Create a single-element array from a primitive literal. /// /// This is used for creating constant arrays (Run-End Encoded arrays) where we need /// a single value that represents all rows. -pub(crate) fn create_primitive_array_single_element( +pub fn create_primitive_array_single_element( data_type: &DataType, prim_lit: &Option, ) -> Result { - match (data_type, prim_lit) { - (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => { - Ok(Arc::new(BooleanArray::from(vec![*v]))) - } - (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::::None]))), - (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => { - Ok(Arc::new(Int32Array::from(vec![*v]))) - } - (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::::None]))), - (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => { - Ok(Arc::new(Date32Array::from(vec![*v]))) - } - (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::::None]))), - (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => { - Ok(Arc::new(Int64Array::from(vec![*v]))) - } - (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::::None]))), - (DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => { - let array = TimestampMicrosecondArray::from(vec![*v]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } - (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { - let array = TimestampMicrosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } - (DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => { - let array = TimestampNanosecondArray::from(vec![*v]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } - (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { - let array = TimestampNanosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } - (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { - Ok(Arc::new(Float32Array::from(vec![v.0]))) - } - (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::::None]))), - (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => { - Ok(Arc::new(Float64Array::from(vec![v.0]))) - } - (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::::None]))), - (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => { - Ok(Arc::new(StringArray::from(vec![v.as_str()]))) - } - (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))), - (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => { - Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()]))) - } - (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![ - Option::<&[u8]>::None, - ]))), - (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => { - let array = Decimal128Array::from(vec![{ *v }]) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?; - Ok(Arc::new(array)) - } - (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(v))) => { - let array = Decimal128Array::from(vec![*v as i128]) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?; - Ok(Arc::new(array)) - } - (DataType::Decimal128(precision, scale), None) => { - let array = Decimal128Array::from(vec![Option::::None]) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?; - Ok(Arc::new(array)) - } - (DataType::Struct(fields), None) => { - // Create a single-element StructArray with nulls - let null_arrays: Vec = fields - .iter() - .map(|f| { - // Recursively create null arrays for struct fields - // For primitive fields in structs, use simple null arrays (not REE within struct) - match f.data_type() { - DataType::Boolean => { - Ok(Arc::new(BooleanArray::from(vec![Option::::None])) - as ArrayRef) - } - DataType::Int32 | DataType::Date32 => { - Ok(Arc::new(Int32Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Int64 => { - Ok(Arc::new(Int64Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Timestamp(TimeUnit::Microsecond, timezone) => { - let array = TimestampMicrosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) - } else { - Ok(Arc::new(array) as ArrayRef) - } - } - DataType::Timestamp(TimeUnit::Nanosecond, timezone) => { - let array = TimestampNanosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) - } else { - Ok(Arc::new(array) as ArrayRef) - } - } - DataType::Float32 => { - Ok(Arc::new(Float32Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Float64 => { - Ok(Arc::new(Float64Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Utf8 => { - Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef) - } - DataType::Binary => { - Ok( - Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) - as ArrayRef, - ) - } - _ => Err(Error::new( - ErrorKind::Unexpected, - format!("Unsupported struct field type: {:?}", f.data_type()), - )), - } - }) - .collect::>>()?; - Ok(Arc::new(arrow_array::StructArray::new( - fields.clone(), - null_arrays, - Some(arrow_buffer::NullBuffer::new_null(1)), - ))) - } - _ => Err(Error::new( + if let (DataType::Struct(fields), None) = (data_type, prim_lit) { + let null_arrays = fields + .iter() + .map(|f| create_primitive_array_single_element(f.data_type(), &None)) + .collect::>>()?; + return Ok(Arc::new(arrow_array::StructArray::new( + fields.clone(), + null_arrays, + Some(arrow_buffer::NullBuffer::new_null(1)), + ))); + } + + let mut builder = PrimitiveLiteralArrayBuilder::try_new(data_type, 1)?; + let appended = builder.append_or_null(prim_lit.as_ref())?; + if prim_lit.is_some() && !appended { + return Err(Error::new( ErrorKind::Unexpected, format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"), - )), + )); } + builder.finish() } /// Create a repeated array from a primitive literal for a given number of rows. @@ -1847,6 +1975,70 @@ mod test { } } + #[test] + fn test_primitive_literal_array_builder_timestamp_timezone_and_null() { + let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())); + let mut builder = PrimitiveLiteralArrayBuilder::try_new(&target_type, 2).unwrap(); + let value = PrimitiveLiteral::Long(1_740_600_000_000_000); + + assert!(builder.append_or_null(Some(&value)).unwrap()); + assert!(!builder.append_or_null(None).unwrap()); + + let array = builder.finish().unwrap(); + assert_eq!(array.data_type(), &target_type); + assert_eq!(array.len(), 2); + assert!(array.is_null(1)); + } + + #[test] + fn test_primitive_literal_array_builder_large_binary() { + let mut builder = PrimitiveLiteralArrayBuilder::try_new(&DataType::LargeBinary, 2).unwrap(); + let value = PrimitiveLiteral::Binary(vec![1, 2, 3]); + + assert!(builder.append_or_null(Some(&value)).unwrap()); + assert!(!builder.append_or_null(None).unwrap()); + + let array = builder.finish().unwrap(); + let binary_array = array + .as_any() + .downcast_ref::() + .expect("expected LargeBinaryArray"); + assert_eq!(binary_array.value(0), &[1, 2, 3]); + assert!(binary_array.is_null(1)); + } + + #[test] + fn test_primitive_literal_array_builder_fixed_size_binary_uuid() { + let mut builder = + PrimitiveLiteralArrayBuilder::try_new(&DataType::FixedSizeBinary(16), 2).unwrap(); + let uuid_bytes = [7_u8; 16]; + let uuid = Uuid::from_bytes(uuid_bytes); + let uuid_value = PrimitiveLiteral::UInt128(uuid.as_u128()); + let wrong_width_value = PrimitiveLiteral::Binary(vec![1, 2]); + + assert!(builder.append_or_null(Some(&uuid_value)).unwrap()); + assert!(!builder.append_or_null(Some(&wrong_width_value)).unwrap()); + + let array = builder.finish().unwrap(); + let fixed_array = array + .as_any() + .downcast_ref::() + .expect("expected FixedSizeBinaryArray"); + assert_eq!(fixed_array.value(0), uuid_bytes.as_slice()); + assert!(fixed_array.is_null(1)); + } + + #[test] + fn test_create_single_element_errors_on_mismatched_literal() { + let value = PrimitiveLiteral::String("not an int".to_string()); + + assert!(create_primitive_array_single_element(&DataType::Int32, &Some(value)).is_err()); + + let null_array = create_primitive_array_single_element(&DataType::Int32, &None).unwrap(); + assert_eq!(null_array.len(), 1); + assert!(null_array.is_null(0)); + } + #[test] fn test_create_decimal_array_repeated_respects_precision() { // Ensure repeated arrays also respect target precision, not Arrow's default. diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 368e8143e2..03effb1496 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -462,6 +462,21 @@ impl TableScan { /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { + self.to_arrow_from_tasks(self.plan_files().await?) + } + + /// Like [`TableScan::to_arrow`], but accepts a caller-supplied + /// [`FileScanTask`] stream instead of running [`TableScan::plan_files`] + /// internally. + /// + /// # Correctness + /// + /// Tasks must come from a [`TableScan`] with the same projection and + /// filter as `self`: predicates are baked into each task at planning + /// time and are not re-applied here. Reader-side configuration + /// (concurrency, batch size, row-group filtering, row selection) is + /// taken from `self` and may differ from the planning scan. + pub fn to_arrow_from_tasks(&self, tasks: FileScanTaskStream) -> Result { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone()) .with_data_file_concurrency_limit(self.concurrency_limit_data_files) @@ -474,7 +489,7 @@ impl TableScan { arrow_reader_builder .build() - .read(self.plan_files().await?) + .read(tasks) .map(|result| result.stream()) } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 31feade038..d12a92f452 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -287,7 +287,7 @@ impl Table { } /// Returns the [`Runtime`] for this table. - pub(crate) fn runtime(&self) -> &Runtime { + pub fn runtime(&self) -> &Runtime { &self.runtime } diff --git a/crates/iceberg/src/util/mod.rs b/crates/iceberg/src/util/mod.rs index 3cf2eef9b0..3532b5e62c 100644 --- a/crates/iceberg/src/util/mod.rs +++ b/crates/iceberg/src/util/mod.rs @@ -34,7 +34,7 @@ const DEFAULT_PARALLELISM: usize = 1; /// are circumstances where the level of available /// parallelism can change during the lifetime of an executing /// process, but this should not be called in a hot loop. -pub(crate) fn available_parallelism() -> NonZeroUsize { +pub fn available_parallelism() -> NonZeroUsize { std::thread::available_parallelism().unwrap_or_else(|err| { tracing::warn!( error = %err, diff --git a/crates/integrations/datafusion/public-api.txt b/crates/integrations/datafusion/public-api.txt index d24bd9fc9e..40bb36f3b5 100644 --- a/crates/integrations/datafusion/public-api.txt +++ b/crates/integrations/datafusion/public-api.txt @@ -15,6 +15,7 @@ pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_t pub mod iceberg_datafusion::physical_plan pub struct iceberg_datafusion::physical_plan::IcebergTableScan impl iceberg_datafusion::physical_plan::IcebergTableScan +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::buckets(&self) -> core::option::Option<&[alloc::sync::Arc<[iceberg::scan::task::FileScanTask]>]> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::limit(&self) -> core::option::Option pub fn iceberg_datafusion::physical_plan::IcebergTableScan::predicates(&self) -> core::option::Option<&iceberg::expr::predicate::Predicate> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::projection(&self) -> core::option::Option<&[alloc::string::String]> @@ -27,11 +28,10 @@ pub fn iceberg_datafusion::physical_plan::IcebergTableScan::fmt_as(&self, _t: da impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergTableScan pub fn iceberg_datafusion::physical_plan::IcebergTableScan::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::physical_plan::IcebergTableScan::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc<(dyn datafusion_physical_plan::execution_plan::ExecutionPlan + 'static)>> -pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, _partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result pub fn iceberg_datafusion::physical_plan::IcebergTableScan::name(&self) -> &str pub fn iceberg_datafusion::physical_plan::IcebergTableScan::properties(&self) -> &alloc::sync::Arc -pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> -pub fn iceberg_datafusion::physical_plan::convert_filters_to_predicate(filters: &[datafusion_expr::expr::Expr]) -> core::option::Option +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_new_children(self: alloc::sync::Arc, children: alloc::vec::Vec>) -> datafusion_common::error::Result> pub fn iceberg_datafusion::physical_plan::project_with_partition(input: alloc::sync::Arc, table: &iceberg::table::Table) -> datafusion_common::error::Result> pub mod iceberg_datafusion::table pub mod iceberg_datafusion::table::metadata_table @@ -81,7 +81,7 @@ pub fn iceberg_datafusion::IcebergTableProvider::fmt(&self, f: &mut core::fmt::F impl datafusion_catalog::table::TableProvider for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::IcebergTableProvider::insert_into<'life0, 'life1, 'async_trait>(&'life0 self, state: &'life1 dyn datafusion_session::session::Session, input: alloc::sync::Arc, _insert_op: datafusion_expr::logical_plan::dml::InsertOp) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait -pub fn iceberg_datafusion::IcebergTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec>, filters: &'life3 [datafusion_expr::expr::Expr], limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait +pub fn iceberg_datafusion::IcebergTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec>, filters: &'life3 [datafusion_expr::expr::Expr], limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait pub fn iceberg_datafusion::IcebergTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef pub fn iceberg_datafusion::IcebergTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType @@ -128,7 +128,7 @@ pub fn iceberg_datafusion::IcebergTableProvider::fmt(&self, f: &mut core::fmt::F impl datafusion_catalog::table::TableProvider for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::IcebergTableProvider::insert_into<'life0, 'life1, 'async_trait>(&'life0 self, state: &'life1 dyn datafusion_session::session::Session, input: alloc::sync::Arc, _insert_op: datafusion_expr::logical_plan::dml::InsertOp) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait -pub fn iceberg_datafusion::IcebergTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec>, filters: &'life3 [datafusion_expr::expr::Expr], limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait +pub fn iceberg_datafusion::IcebergTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec>, filters: &'life3 [datafusion_expr::expr::Expr], limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait pub fn iceberg_datafusion::IcebergTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef pub fn iceberg_datafusion::IcebergTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index aeac30de32..5a9845cde0 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -26,6 +26,5 @@ pub(crate) mod write; pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; -pub use expr_to_predicate::convert_filters_to_predicate; pub use project::project_with_partition; pub use scan::IcebergTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 36539ae503..762ee5590e 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -18,11 +18,10 @@ use std::any::Any; use std::pin::Pin; use std::sync::Arc; -use std::vec; use datafusion::arrow::array::RecordBatch; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::error::Result as DFResult; +use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -30,59 +29,185 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; use datafusion::prelude::Expr; use futures::{Stream, TryStreamExt}; +use iceberg::arrow::ArrowReaderBuilder; use iceberg::expr::Predicate; +use iceberg::scan::{FileScanTask, TableScan}; use iceberg::table::Table; +use iceberg::util::available_parallelism; use super::expr_to_predicate::convert_filters_to_predicate; use crate::to_datafusion_error; -/// Manages the scanning process of an Iceberg [`Table`], encapsulating the -/// necessary details and computed properties required for execution planning. +/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`]. +/// +/// Has two construction modes: lazy single-partition scans that plan files +/// inside `execute(0)`, and eager multi-partition scans over pre-planned +/// [`FileScanTask`] buckets. +/// +/// Note: in eager mode the underlying `TableScan` is rebuilt on every +/// `execute(partition)` call. The per-build cost is bounded (no I/O) and +/// keeps the plan free of `Arc`-shared evaluator caches that are awkward to +/// serialize across workers. #[derive(Debug)] pub struct IcebergTableScan { /// A table in the catalog. table: Table, /// Snapshot of the table to scan. snapshot_id: Option, - /// Stores certain, often expensive to compute, - /// plan properties used in query optimization. + /// Cached plan properties used by query optimization. plan_properties: Arc, - /// Projection column names, None means all columns + /// Projection column names, None means all columns. projection: Option>, - /// Filters to apply to the table scan + /// Filters to apply to the table scan. predicates: Option, - /// Optional limit on the number of rows to return + /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode). + buckets: Option]>>, + /// Optional limit on the number of rows to return. limit: Option, } -impl IcebergTableScan { - /// Creates a new [`IcebergTableScan`] object. - pub(crate) fn new( - table: Table, - snapshot_id: Option, - schema: ArrowSchemaRef, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Self { - let output_schema = match projection { - None => schema.clone(), - Some(projection) => Arc::new(schema.project(projection).unwrap()), - }; - let plan_properties = Self::compute_properties(output_schema.clone()); - let projection = get_column_names(schema.clone(), projection); - let predicates = convert_filters_to_predicate(filters); +/// Builder to create an [`IcebergTableScan`]. +pub struct IcebergTableScanBuilder { + table: Table, + snapshot_id: Option, + schema: ArrowSchemaRef, + projection: Option>, + filters: Vec, + limit: Option, + partitioning: Partitioning, + buckets: Option]>>, +} +pub(crate) struct TableScanConfig { + snapshot_id: Option, + column_names: Option>, + predicates: Option, +} + +impl IcebergTableScanBuilder { + /// Creates a builder for a lazy single-partition scan. + pub fn new(table: Table, schema: ArrowSchemaRef) -> Self { Self { table, - snapshot_id, - plan_properties, - projection, - predicates, - limit, + schema, + snapshot_id: None, + projection: None, + filters: vec![], + limit: None, + partitioning: Partitioning::UnknownPartitioning(1), + buckets: None, + } + } + + /// Sets the snapshot to scan. When not set, it uses current snapshot. + pub fn with_snapshot_id(mut self, snapshot_id: Option) -> Self { + self.snapshot_id = snapshot_id; + self + } + + /// Sets the projected output columns. + pub fn with_projection(mut self, projection: Option<&Vec>) -> Self { + self.projection = projection.cloned(); + self + } + + /// Sets the filters to apply to the table scan. + pub fn with_filters(mut self, filters: &[Expr]) -> Self { + self.filters = filters.to_vec(); + self + } + + /// Sets the optional row limit. + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + /// Sets pre-planned task buckets for eager multi-partition scans. + pub fn with_task_buckets( + mut self, + buckets: Vec>, + partitioning: Partitioning, + ) -> Self { + let buckets = buckets + .into_iter() + .map(Arc::<[FileScanTask]>::from) + .collect::>(); + self.buckets = Some(Arc::<[Arc<[FileScanTask]>]>::from(buckets)); + self.partitioning = partitioning; + self + } + + pub(crate) fn table_scan_config(&self) -> TableScanConfig { + TableScanConfig { + snapshot_id: self.snapshot_id, + column_names: get_column_names(self.schema.clone(), self.projection.as_ref()), + predicates: convert_filters_to_predicate(&self.filters), + } + } + + /// Returns the Arrow schema produced by this scan after projection. + pub(crate) fn output_schema(&self) -> DFResult { + match &self.projection { + None => Ok(self.schema.clone()), + Some(projection) => Ok(Arc::new(self.schema.project(projection).map_err( + |err| { + DataFusionError::Plan(format!("Failed to project Iceberg table schema: {err}")) + }, + )?)), + } + } + + /// Builds the underlying Iceberg [`TableScan`] using the same inputs as this plan. + pub(crate) fn build_iceberg_table_scan( + &self, + table_scan_config: &TableScanConfig, + ) -> DFResult { + build_iceberg_table_scan_from_config(&self.table, table_scan_config) + } + + /// Builds the [`IcebergTableScan`]. + pub fn build(self) -> DFResult { + let table_scan_config = self.table_scan_config(); + self.build_with_table_scan_config(table_scan_config) + } + + pub(crate) fn build_with_table_scan_config( + self, + table_scan_config: TableScanConfig, + ) -> DFResult { + if let Some(buckets) = &self.buckets { + let partition_count = self.partitioning.partition_count(); + if buckets.len() != partition_count { + return Err(DataFusionError::Internal(format!( + "IcebergTableScan expected {} task buckets to match partitioning, got {}", + partition_count, + buckets.len() + ))); + } } + + let output_schema = self.output_schema()?; + let plan_properties = Arc::new(PlanProperties::new( + EquivalenceProperties::new(output_schema), + self.partitioning, + EmissionType::Incremental, + Boundedness::Bounded, + )); + + Ok(IcebergTableScan { + table: self.table, + snapshot_id: table_scan_config.snapshot_id, + plan_properties, + projection: table_scan_config.column_names, + predicates: table_scan_config.predicates, + buckets: self.buckets, + limit: self.limit, + }) } +} +impl IcebergTableScan { pub fn table(&self) -> &Table { &self.table } @@ -99,21 +224,21 @@ impl IcebergTableScan { self.predicates.as_ref() } + /// Returns the pre-planned file task buckets. + /// + /// `None` means lazy mode, where file tasks are planned inside `execute`; + /// `Some` means eager mode, where `execute` reads from pre-planned buckets. + pub fn buckets(&self) -> Option<&[Arc<[FileScanTask]>]> { + self.buckets.as_deref() + } + pub fn limit(&self) -> Option { self.limit } - /// Computes [`PlanProperties`] used in query optimization. - fn compute_properties(schema: ArrowSchemaRef) -> Arc { - // TODO: - // This is more or less a placeholder, to be replaced - // once we support output-partitioning - Arc::new(PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, - )) + fn total_file_count(&self) -> usize { + self.buckets() + .map_or(0, |buckets| buckets.iter().map(|b| b.len()).sum()) } } @@ -132,8 +257,15 @@ impl ExecutionPlan for IcebergTableScan { fn with_new_children( self: Arc, - _children: Vec>, + children: Vec>, ) -> DFResult> { + if !children.is_empty() { + return Err(DataFusionError::Internal(format!( + "{} is a leaf node and expects no children, but {} were provided", + self.name(), + children.len() + ))); + } Ok(self) } @@ -143,36 +275,35 @@ impl ExecutionPlan for IcebergTableScan { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc, ) -> DFResult { - let fut = get_batch_stream( + let bucket = match &self.buckets { + Some(buckets) => Some(Arc::clone(buckets.get(partition).ok_or_else(|| { + DataFusionError::Internal(format!( + "{}: partition index {partition} is out of bounds (total buckets: {})", + self.name(), + buckets.len() + )) + })?)), + None => None, + }; + + let fut = build_record_batch_stream( self.table.clone(), self.snapshot_id, self.projection.clone(), self.predicates.clone(), + bucket, ); - let stream = futures::stream::once(fut).try_flatten(); - - // Apply limit if specified - let limited_stream: Pin> + Send>> = - if let Some(limit) = self.limit { - let mut remaining = limit; - Box::pin(stream.try_filter_map(move |batch| { - futures::future::ready(if remaining == 0 { - Ok(None) - } else if batch.num_rows() <= remaining { - remaining -= batch.num_rows(); - Ok(Some(batch)) - } else { - let limited_batch = batch.slice(0, remaining); - remaining = 0; - Ok(Some(limited_batch)) - }) - })) - } else { - Box::pin(stream) - }; + + let stream = Box::pin(futures::stream::once(fut).try_flatten()) + as Pin> + Send>>; + + let limited_stream = match self.limit { + Some(limit) => apply_limit(stream, limit), + None => stream, + }; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -187,16 +318,25 @@ impl DisplayAs for IcebergTableScan { _t: datafusion::physical_plan::DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { + let projection = self + .projection + .as_deref() + .map_or(String::new(), |v| v.join(",")); + let predicate = self + .predicates + .as_ref() + .map_or(String::new(), |p| p.to_string()); + write!( f, - "IcebergTableScan projection:[{}] predicate:[{}]", - self.projection - .clone() - .map_or(String::new(), |v| v.join(",")), - self.predicates - .clone() - .map_or(String::from(""), |p| format!("{p}")) + "{} projection:[{projection}] predicate:[{predicate}]", + self.name() )?; + if let Some(buckets) = &self.buckets { + let file_count = self.total_file_count(); + let bucket_count = buckets.len(); + write!(f, " buckets:[{bucket_count}] file_count:[{file_count}]")?; + } if let Some(limit) = self.limit { write!(f, " limit:[{limit}]")?; } @@ -204,40 +344,88 @@ impl DisplayAs for IcebergTableScan { } } -/// Asynchronously retrieves a stream of [`RecordBatch`] instances -/// from a given table. -/// -/// This function initializes a [`TableScan`], builds it, -/// and then converts it into a stream of Arrow [`RecordBatch`]es. -async fn get_batch_stream( +fn build_iceberg_table_scan_from_config( + table: &Table, + table_scan_config: &TableScanConfig, +) -> DFResult { + let scan_builder = match table_scan_config.snapshot_id { + Some(id) => table.scan().snapshot_id(id), + None => table.scan(), + }; + let mut scan_builder = match table_scan_config.column_names.clone() { + Some(names) => scan_builder.select(names), + None => scan_builder.select_all(), + }; + if let Some(pred) = table_scan_config.predicates.clone() { + scan_builder = scan_builder.with_filter(pred); + } + scan_builder.build().map_err(to_datafusion_error) +} + +/// Builds the `RecordBatch` stream for a single partition. When `bucket` is +/// `Some`, streams the pre-planned tasks directly through an `ArrowReader`; +/// when `None`, plans and reads the full scan via `to_arrow`. +async fn build_record_batch_stream( table: Table, snapshot_id: Option, column_names: Option>, predicates: Option, + bucket: Option>, ) -> DFResult> + Send>>> { - let scan_builder = match snapshot_id { - Some(snapshot_id) => table.scan().snapshot_id(snapshot_id), - None => table.scan(), - }; + let stream: Pin> + Send>> = match bucket { + Some(bucket) => { + let task_stream = Box::pin(futures::stream::iter( + (0..bucket.len()).map(move |idx| Ok::<_, iceberg::Error>(bucket[idx].clone())), + )); + let num_cpus = available_parallelism().get(); + let arrow_reader_builder = + ArrowReaderBuilder::new(table.file_io().clone(), table.runtime().clone()) + .with_data_file_concurrency_limit(num_cpus) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true); - let mut scan_builder = match column_names { - Some(column_names) => scan_builder.select(column_names), - None => scan_builder.select_all(), + Box::pin( + arrow_reader_builder + .build() + .read(task_stream) + .map_err(to_datafusion_error)? + .stream(), + ) + } + None => { + let table_scan_config = TableScanConfig { + snapshot_id, + column_names, + predicates, + }; + let table_scan = build_iceberg_table_scan_from_config(&table, &table_scan_config)?; + Box::pin(table_scan.to_arrow().await.map_err(to_datafusion_error)?) + } }; - if let Some(pred) = predicates { - scan_builder = scan_builder.with_filter(pred); - } - let table_scan = scan_builder.build().map_err(to_datafusion_error)?; + Ok(Box::pin(stream.map_err(to_datafusion_error))) +} - let stream = table_scan - .to_arrow() - .await - .map_err(to_datafusion_error)? - .map_err(to_datafusion_error); - Ok(Box::pin(stream)) +/// Truncates a stream of `RecordBatch` to at most `limit` rows. +fn apply_limit( + stream: Pin> + Send>>, + limit: usize, +) -> Pin> + Send>> { + let mut remaining = limit; + Box::pin(stream.try_filter_map(move |batch| { + futures::future::ready(if remaining == 0 { + Ok(None) + } else if batch.num_rows() <= remaining { + remaining -= batch.num_rows(); + Ok(Some(batch)) + } else { + let limited_batch = batch.slice(0, remaining); + remaining = 0; + Ok(Some(limited_batch)) + }) + })) } -fn get_column_names( +pub(super) fn get_column_names( schema: ArrowSchemaRef, projection: Option<&Vec>, ) -> Option> { diff --git a/crates/integrations/datafusion/src/table/bucketing.rs b/crates/integrations/datafusion/src/table/bucketing.rs new file mode 100644 index 0000000000..819ba3b3f3 --- /dev/null +++ b/crates/integrations/datafusion/src/table/bucketing.rs @@ -0,0 +1,463 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Distribution of pre-planned [`FileScanTask`]s into per-partition buckets for +//! eager multi-partition scans. +//! +//! Tasks are distributed by *count*: each task is hashed (on its identity +//! partition key when available, otherwise on its data file path) and placed in +//! `hash % n_partitions`. This evens out the number of files per bucket but is +//! unaware of `file_size_in_bytes`, so a table mixing one large file with many +//! small ones can pile most of the bytes into a single bucket and serialize the +//! query on that partition. +//! +//! A size-aware strategy — first-fit-decreasing bin-packing on +//! `file_size_in_bytes` (optionally with a target split size), mirroring +//! iceberg-java's `TableScanUtil.planTaskGroups` / `BinPacking` — would spread +//! the work more evenly. The byte size is already carried on each +//! [`FileScanTask`], so this is a fairly contained extension; it is tracked as a +//! follow-up in . + +use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema, TimeUnit}; +use datafusion::common::hash_utils::create_hashes; +use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE; +use iceberg::arrow::PrimitiveLiteralArrayBuilder; +use iceberg::scan::FileScanTask; +use iceberg::spec::{Literal, Transform}; +use iceberg::table::Table; + +/// Identity-partitioned column that is also present in the output projection +/// and whose Arrow type can be reconstructed from a `Literal` for hashing. +pub(super) struct IdentityCol { + pub(super) name: String, + /// Position of this column in the *output* schema (after projection). + pub(super) output_idx: usize, + /// Position of this column inside the partition spec's `fields()` slice, + /// matching the slot order of `FileScanTask::partition`. + pub(super) spec_field_idx: usize, + pub(super) output_dtype: DataType, +} + +/// Inspect the table's default partition spec and return the list of identity +/// columns that can support a [`Partitioning::Hash`] declaration. Returns +/// `None` if any condition is violated: +/// - the source column for an identity field is not in the output projection +/// - the source column's Arrow type is not currently supported by +/// the identity hash materialization path +/// - the table has spec evolution (>1 historical specs), since older files +/// may carry a partition tuple that does not align with the default spec +/// +/// Returning `None` forces the scan to declare `UnknownPartitioning` even if +/// bucketing succeeds. +pub(super) fn compute_identity_cols( + table: &Table, + output_schema: &ArrowSchema, +) -> Option> { + let metadata = table.metadata(); + // iceberg-java is less conservative here: it intersects the identity fields + // present in every spec (`Partitioning.groupingKeyType` / + // `commonActiveFieldIds`) and still reports a grouping key on the columns + // that are identity-partitioned across all of them. We deliberately bail + // out on any spec evolution instead, because the bucketing path aligns each + // task's partition slot to the *default* spec and `FileScanTask` does not + // yet carry its own spec id to disambiguate. Tracked as a follow-up in + // . + if metadata.partition_specs_iter().len() > 1 { + return None; + } + let spec = metadata.default_partition_spec(); + let table_schema = metadata.current_schema(); + + let mut cols = Vec::new(); + for (spec_field_idx, pf) in spec.fields().iter().enumerate() { + if pf.transform != Transform::Identity { + continue; + } + let source_field = table_schema.field_by_id(pf.source_id)?; + let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?; + let output_dtype = output_schema.field(output_idx).data_type().clone(); + if !is_supported_dtype(&output_dtype) { + return None; + } + cols.push(IdentityCol { + name: source_field.name.clone(), + output_idx, + spec_field_idx, + output_dtype, + }); + } + Some(cols) +} + +fn is_supported_dtype(dt: &DataType) -> bool { + matches!( + dt, + DataType::Boolean + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Date32 + | DataType::Time64(TimeUnit::Microsecond) + | DataType::Timestamp(TimeUnit::Microsecond, _) + | DataType::Timestamp(TimeUnit::Nanosecond, _) + | DataType::Binary + | DataType::LargeBinary + | DataType::Decimal128(_, _) + | DataType::FixedSizeBinary(_) + ) +} + +/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols` +/// describes a non-empty, hashable identity key, each task is hashed on +/// that key using DataFusion's repartition hash so the resulting partitioning +/// matches what `RepartitionExec` would produce on the same data. Tasks +/// missing partition data fall back to hashing `data_file_path`, which still +/// distributes evenly but breaks the `Hash` contract; the second tuple +/// element flags whether every task supplied a full identity key. +pub(super) fn bucket_tasks( + tasks: Vec, + n_partitions: usize, + identity_cols: Option<&[IdentityCol]>, +) -> (Vec>, bool) { + if n_partitions == 0 { + return (Vec::new(), tasks.is_empty()); + } + let mut buckets: Vec> = (0..n_partitions).map(|_| Vec::new()).collect(); + let mut all_full_key = true; + let cols = identity_cols.unwrap_or(&[]); + let identity_hashes = identity_hashes_for_tasks(&tasks, cols); + + for (task_idx, task) in tasks.into_iter().enumerate() { + let bucket_idx = match &identity_hashes { + Some(identity_hashes) if identity_hashes.full_key_by_task[task_idx] => { + (identity_hashes.hashes[task_idx] % n_partitions as u64) as usize + } + None => { + all_full_key = false; + fallback_hash(&task) as usize % n_partitions + } + Some(_) => { + all_full_key = false; + fallback_hash(&task) as usize % n_partitions + } + }; + buckets[bucket_idx].push(task); + } + (buckets, all_full_key) +} + +struct IdentityHashes { + hashes: Vec, + full_key_by_task: Vec, +} + +/// Hash all identity-partition values using [`REPARTITION_RANDOM_STATE`] so the +/// bucket assignment matches DataFusion's hash-repartition convention. The +/// returned `full_key_by_task` marks rows whose task supplied every identity key +/// slot with a supported non-null literal. +fn identity_hashes_for_tasks( + tasks: &[FileScanTask], + cols: &[IdentityCol], +) -> Option { + if cols.is_empty() { + return None; + } + + let mut builders = cols + .iter() + .map(|col| PrimitiveLiteralArrayBuilder::try_new(&col.output_dtype, tasks.len())) + .collect::>>() + .ok()?; + let mut full_key_by_task = Vec::with_capacity(tasks.len()); + + for task in tasks { + let partition_fields = task.partition.as_ref().map(|partition| partition.fields()); + let mut full_key = partition_fields.is_some(); + + for (builder, col) in builders.iter_mut().zip(cols) { + let lit = partition_fields + .and_then(|fields| fields.get(col.spec_field_idx)) + .and_then(|lit| lit.as_ref()); + let prim_lit = lit.and_then(|lit| match lit { + Literal::Primitive(prim) => Some(prim), + _ => None, + }); + let appended = builder.append_or_null(prim_lit).ok()?; + full_key = full_key && appended; + } + full_key_by_task.push(full_key); + } + + let arrays = builders + .into_iter() + .map(PrimitiveLiteralArrayBuilder::finish) + .collect::>>() + .ok()?; + let mut hashes = vec![0u64; tasks.len()]; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + ) + .ok()?; + Some(IdentityHashes { + hashes, + full_key_by_task, + }) +} + +/// Deterministic per-file fallback used when identity hashing cannot produce a +/// bucket. The hash function does not need to match DataFusion's because any +/// task taking this path causes the scan to drop to `UnknownPartitioning`. +fn fallback_hash(task: &FileScanTask) -> u64 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + task.data_file_path.hash(&mut hasher); + hasher.finish() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::arrow::array::{ + ArrayRef, Decimal128Array, Int32Array, StringArray, TimestampMicrosecondArray, + }; + use iceberg::spec::{ + DataFileFormat, Literal, NestedField, PrimitiveType, Schema, Struct, Type, + }; + + use super::*; + + fn scan_task(file_idx: usize, partition: Option) -> FileScanTask { + FileScanTask { + file_size_in_bytes: 1, + start: 0, + length: 1, + record_count: Some(1), + data_file_path: format!("/tmp/file_{file_idx}.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ), + project_field_ids: vec![1, 2], + predicate: None, + deletes: Vec::new(), + partition, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + } + } + + fn bucket_by_file_index( + buckets: &[Vec], + file_count: usize, + ) -> Vec> { + let mut actual_bucket_by_file = vec![None; file_count]; + for (bucket_idx, bucket) in buckets.iter().enumerate() { + for task in bucket { + let file_idx = task + .data_file_path() + .strip_suffix(".parquet") + .and_then(|path| path.rsplit_once("file_").map(|(_, idx)| idx)) + .and_then(|idx| idx.parse::().ok()) + .expect("test data file path should include its row index"); + actual_bucket_by_file[file_idx] = Some(bucket_idx); + } + } + actual_bucket_by_file + } + + #[test] + fn bucket_tasks_hashes_multiple_identity_columns() { + let rows = [(1, "a"), (2, "b"), (1, "b"), (3, "c"), (2, "a")]; + let tasks = rows + .iter() + .enumerate() + .map(|(idx, (id, name))| { + scan_task( + idx, + Some(Struct::from_iter(vec![ + Some(Literal::int(*id)), + Some(Literal::string(*name)), + ])), + ) + }) + .collect::>(); + let cols = vec![ + IdentityCol { + name: "id".to_string(), + output_idx: 0, + spec_field_idx: 0, + output_dtype: DataType::Int32, + }, + IdentityCol { + name: "name".to_string(), + output_idx: 1, + spec_field_idx: 1, + output_dtype: DataType::Utf8, + }, + ]; + let n_partitions = 4_usize; + + let (buckets, all_full_key) = bucket_tasks(tasks, n_partitions, Some(&cols)); + + assert!(all_full_key); + let arrays: Vec = vec![ + Arc::new(Int32Array::from( + rows.iter().map(|(id, _)| *id).collect::>(), + )), + Arc::new(StringArray::from( + rows.iter().map(|(_, name)| *name).collect::>(), + )), + ]; + let mut hashes = vec![0_u64; rows.len()]; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + ) + .unwrap(); + + let actual_bucket_by_file = bucket_by_file_index(&buckets, rows.len()); + for (file_idx, hash) in hashes.iter().enumerate() { + let expected_bucket = (hash % n_partitions as u64) as usize; + assert_eq!(actual_bucket_by_file[file_idx], Some(expected_bucket)); + } + } + + #[test] + fn bucket_tasks_hashes_decimal_and_timestamp_identity_columns() { + let rows = [ + (100_i128, 1_740_600_000_000_000_i64), + (200_i128, 1_740_600_100_000_000_i64), + (100_i128, 1_740_600_200_000_000_i64), + ]; + let tasks = rows + .iter() + .enumerate() + .map(|(idx, (price, ts))| { + scan_task( + idx, + Some(Struct::from_iter(vec![ + Some(Literal::decimal(*price)), + Some(Literal::timestamp(*ts)), + ])), + ) + }) + .collect::>(); + let timestamp_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())); + let cols = vec![ + IdentityCol { + name: "price".to_string(), + output_idx: 0, + spec_field_idx: 0, + output_dtype: DataType::Decimal128(18, 2), + }, + IdentityCol { + name: "ts".to_string(), + output_idx: 1, + spec_field_idx: 1, + output_dtype: timestamp_type, + }, + ]; + let n_partitions = 4_usize; + + let (buckets, all_full_key) = bucket_tasks(tasks, n_partitions, Some(&cols)); + + assert!(all_full_key); + let decimal_array = + Decimal128Array::from(rows.iter().map(|(price, _)| *price).collect::>()) + .with_precision_and_scale(18, 2) + .unwrap(); + let timestamp_array = + TimestampMicrosecondArray::from(rows.iter().map(|(_, ts)| *ts).collect::>()) + .with_timezone("UTC"); + let arrays: Vec = vec![Arc::new(decimal_array), Arc::new(timestamp_array)]; + let mut hashes = vec![0_u64; rows.len()]; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + ) + .unwrap(); + + let actual_bucket_by_file = bucket_by_file_index(&buckets, rows.len()); + for (file_idx, hash) in hashes.iter().enumerate() { + let expected_bucket = (hash % n_partitions as u64) as usize; + assert_eq!(actual_bucket_by_file[file_idx], Some(expected_bucket)); + } + } + + #[test] + fn bucket_tasks_falls_back_per_task_for_missing_identity_key() { + let tasks = vec![ + scan_task(0, Some(Struct::from_iter(vec![Some(Literal::string("a"))]))), + scan_task(1, Some(Struct::from_iter(vec![None::]))), + scan_task(2, Some(Struct::from_iter(vec![Some(Literal::string("c"))]))), + scan_task(3, None), + ]; + let expected_tasks = tasks.clone(); + let cols = vec![IdentityCol { + name: "name".to_string(), + output_idx: 1, + spec_field_idx: 0, + output_dtype: DataType::Utf8, + }]; + let n_partitions = 5_usize; + + let (buckets, all_full_key) = bucket_tasks(tasks, n_partitions, Some(&cols)); + + assert!(!all_full_key); + let arrays: Vec = vec![Arc::new(StringArray::from(vec![ + Some("a"), + None, + Some("c"), + None, + ]))]; + let mut hashes = vec![0_u64; expected_tasks.len()]; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + ) + .unwrap(); + + let actual_bucket_by_file = bucket_by_file_index(&buckets, expected_tasks.len()); + for file_idx in [0_usize, 2] { + let expected_bucket = (hashes[file_idx] % n_partitions as u64) as usize; + assert_eq!(actual_bucket_by_file[file_idx], Some(expected_bucket)); + } + for file_idx in [1_usize, 3] { + let expected_bucket = fallback_hash(&expected_tasks[file_idx]) as usize % n_partitions; + assert_eq!(actual_bucket_by_file[file_idx], Some(expected_bucket)); + } + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..af4512308a 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -25,6 +25,7 @@ //! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific //! table snapshot. Use for consistent analytical queries or time-travel scenarios. +mod bucketing; pub mod metadata_table; pub mod table_provider_factory; @@ -40,10 +41,14 @@ use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr::expressions::Column; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::{ExecutionPlan, Partitioning}; +use futures::TryStreamExt; use iceberg::arrow::schema_to_arrow_schema; use iceberg::inspect::MetadataTableType; +use iceberg::scan::FileScanTask; use iceberg::spec::TableProperties; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; @@ -53,7 +58,7 @@ use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::project::project_with_partition; use crate::physical_plan::repartition::repartition; -use crate::physical_plan::scan::IcebergTableScan; +use crate::physical_plan::scan::IcebergTableScanBuilder; use crate::physical_plan::sort::sort_by_partition; use crate::physical_plan::write::IcebergWriteExec; @@ -87,7 +92,6 @@ impl IcebergTableProvider { ) -> Result { let table_ident = TableIdent::new(namespace, name.into()); - // Load table once to get initial schema let table = catalog.load_table(&table_ident).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); @@ -102,7 +106,6 @@ impl IcebergTableProvider { &self, r#type: MetadataTableType, ) -> Result { - // Load fresh table metadata for metadata table access let table = self.catalog.load_table(&self.table_ident).await?; Ok(IcebergMetadataTableProvider { table, r#type }) } @@ -124,27 +127,88 @@ impl TableProvider for IcebergTableProvider { async fn scan( &self, - _state: &dyn Session, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> DFResult> { - // Load fresh table metadata from catalog + // Second load: fetch the latest snapshot so scans always reflect current table state. let table = self .catalog .load_table(&self.table_ident) .await .map_err(to_datafusion_error)?; - // Create scan with fresh metadata (always use current snapshot) - Ok(Arc::new(IcebergTableScan::new( - table, - None, // Always use current snapshot for catalog-backed provider - self.schema.clone(), - projection, - filters, - limit, - ))) + // Use the same builder path for eager file planning and execution so + // snapshot, projection, and filter handling cannot drift. + let scan_builder = IcebergTableScanBuilder::new(table.clone(), self.schema.clone()) + // Always use current snapshot for catalog-backed provider. + .with_snapshot_id(None) + .with_projection(projection) + .with_filters(filters) + .with_limit(limit); + let table_scan_config = scan_builder.table_scan_config(); + + let tasks: Vec = scan_builder + .build_iceberg_table_scan(&table_scan_config)? + .plan_files() + .await + .map_err(to_datafusion_error)? + .try_collect::>() + .await + .map_err(to_datafusion_error)?; + + // Output schema after projection: column indices in `Hash` exprs and any + // Arrow array we hash must reference this schema, not the full table schema. + let output_schema = scan_builder.output_schema()?; + + let target_partitions = state.config().target_partitions(); + let task_count = tasks.len(); + // Always produce at least 1 partition so that DataFusion can schedule + // the plan normally and callers can safely call execute(0). An empty + // bucket simply yields an empty record-batch stream. + let n_partitions = target_partitions.min(task_count).max(1); + + // identity_cols is Some(non-empty) iff every condition for declaring + // Partitioning::Hash is met: the table's default spec has identity-transform + // fields, every such source column is present in the output projection, and + // every column type is supported by the identity hash materialization path. + // Any miss collapses to None, which forces UnknownPartitioning regardless + // of bucketing strategy. + let identity_cols = bucketing::compute_identity_cols(&table, &output_schema); + + let (buckets, all_had_full_key) = + bucketing::bucket_tasks(tasks, n_partitions, identity_cols.as_deref()); + + let partitioning = if task_count == 0 { + Partitioning::UnknownPartitioning(n_partitions) + } else { + match identity_cols { + Some(cols) if !cols.is_empty() && all_had_full_key => { + let exprs: Vec> = cols + .iter() + .map(|c| { + Arc::new(Column::new(&c.name, c.output_idx)) as Arc + }) + .collect(); + // This declaration is only sound if the Arrow arrays built from + // partition literals hash identically to the column arrays the + // reader emits at scan time. DataFusion's hash dispatch is + // dtype-specific, so any drift in the reader output type (for + // example Utf8 vs Utf8View) must either update the bucketing + // path to materialize that exact dtype or fall back to + // UnknownPartitioning. + Partitioning::Hash(exprs, n_partitions) + } + _ => Partitioning::UnknownPartitioning(n_partitions), + } + }; + + Ok(Arc::new( + scan_builder + .with_task_buckets(buckets, partitioning) + .build_with_table_scan_config(table_scan_config)?, + )) } fn supports_filters_pushdown( @@ -161,7 +225,6 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { - // Load fresh table metadata from catalog let table = self .catalog .load_table(&self.table_ident) @@ -188,7 +251,6 @@ impl TableProvider for IcebergTableProvider { let repartitioned_plan = repartition(plan_with_partition, table.metadata_ref(), target_partitions)?; - // Apply sort node when it's not fanout mode let fanout_enabled = table .metadata() .properties() @@ -314,15 +376,14 @@ impl TableProvider for IcebergStaticTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { - // Use cached table (no refresh) - Ok(Arc::new(IcebergTableScan::new( - self.table.clone(), - self.snapshot_id, - self.schema.clone(), - projection, - filters, - limit, - ))) + Ok(Arc::new( + IcebergTableScanBuilder::new(self.table.clone(), self.schema.clone()) + .with_snapshot_id(self.snapshot_id) + .with_projection(projection) + .with_filters(filters) + .with_limit(limit) + .build()?, + )) } fn supports_filters_pushdown( @@ -353,6 +414,7 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; + use async_trait::async_trait; use datafusion::common::Column; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -360,10 +422,13 @@ mod tests { use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::table::{StaticTable, Table}; - use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; + use iceberg::{ + Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, + }; use tempfile::TempDir; use super::*; + use crate::physical_plan::scan::IcebergTableScan; async fn get_test_table_from_metadata_file() -> Table { let metadata_file_name = "TableMetadataV2Valid.json"; @@ -865,4 +930,780 @@ mod tests { "Limit should be None when not specified" ); } + + // Bucketed scan tests + + async fn make_catalog_and_table_for_bucketing() + -> (Arc, NamespaceIdent, String, tempfile::TempDir) { + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + use iceberg::{CatalogBuilder, TableCreation}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + std::collections::HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse.clone(), + )]), + ) + .await + .unwrap(), + ); + + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, std::collections::HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .properties(std::collections::HashMap::new()) + .build(), + ) + .await + .unwrap(); + + (catalog, namespace, "t".to_string(), temp_dir) + } + + /// Registers `n` synthetic data files in the table metadata via the iceberg + /// transaction API. No actual parquet files are written, only the metadata + /// entries that `plan_files()` reads are created. + async fn append_fake_data_files( + catalog: &Arc, + namespace: &NamespaceIdent, + table_name: &str, + n: usize, + ) { + use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat}; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), table_name.to_string())) + .await + .unwrap(); + + let data_files = (0..n) + .map(|i| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .build() + .unwrap() + }) + .collect::>(); + + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(data_files); + action + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + } + + fn ctx_with_target_partitions(n: usize) -> SessionContext { + use datafusion::prelude::SessionConfig; + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(n)) + } + + #[derive(Debug, Clone)] + struct SingleTableCatalog { + table: Table, + } + + impl SingleTableCatalog { + fn new(table: Table) -> Self { + Self { table } + } + } + + #[async_trait] + impl Catalog for SingleTableCatalog { + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> Result> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn create_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result<()> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn load_table(&self, table: &TableIdent) -> Result
{ + if table == self.table.identifier() { + Ok(self.table.clone()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Unknown test table: {table}"), + )) + } + } + + async fn drop_table(&self, _table: &TableIdent) -> Result<()> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn purge_table(&self, _table: &TableIdent) -> Result<()> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn table_exists(&self, table: &TableIdent) -> Result { + Ok(table == self.table.identifier()) + } + + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn register_table( + &self, + _table: &TableIdent, + _metadata_location: String, + ) -> Result
{ + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + + async fn update_table(&self, _commit: TableCommit) -> Result
{ + unimplemented!("SingleTableCatalog only supports load_table in these tests") + } + } + + /// An empty table must produce a single empty-bucket scan so that DataFusion + /// can schedule the plan normally. execute(0) on an empty bucket simply + /// returns an empty record-batch stream. + #[tokio::test] + async fn test_empty_table_single_empty_bucket() { + let (catalog, namespace, table_name, _temp_dir) = + make_catalog_and_table_for_bucketing().await; + // no files appended + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(8).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + assert_eq!(buckets.len(), 1); + assert_eq!(buckets[0].len(), 0); + assert_eq!(scan.properties().partitioning.partition_count(), 1); + } + + /// When the table has no identity-partition columns, every task takes the + /// fallback (file_path) bucket path, so the declaration must drop to + /// `UnknownPartitioning`. The bucket count should still equal + /// min(target_partitions, num_files). + #[tokio::test] + async fn test_unpartitioned_falls_back_to_unknown() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_catalog_and_table_for_bucketing().await; + append_fake_data_files(&catalog, &namespace, &table_name, 5).await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(3).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + let total_files: usize = buckets.iter().map(|b| b.len()).sum(); + assert_eq!(total_files, 5); + assert_eq!(buckets.len(), 3); + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(3) + )); + } + + /// Bucket count must be capped at the number of files: spinning up more + /// DataFusion partitions than there are tasks would just leave empty + /// streams, wasting scheduler slots. + #[tokio::test] + async fn test_bucket_count_capped_at_file_count() { + let (catalog, namespace, table_name, _temp_dir) = + make_catalog_and_table_for_bucketing().await; + append_fake_data_files(&catalog, &namespace, &table_name, 2).await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(16).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + assert_eq!(buckets.len(), 2); + } + + /// target_partitions = 1 collapses every task into a single bucket, giving + /// the same execution profile as a single-partition scan. + #[tokio::test] + async fn test_single_target_partition_single_bucket() { + let (catalog, namespace, table_name, _temp_dir) = + make_catalog_and_table_for_bucketing().await; + append_fake_data_files(&catalog, &namespace, &table_name, 4).await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(1).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + assert_eq!(buckets.len(), 1); + assert_eq!(buckets[0].len(), 4); + } + + #[tokio::test] + async fn test_catalog_backed_eager_scan_uses_builder_projection_and_predicate() { + use datafusion::prelude::{col, lit}; + use iceberg::expr::Reference; + use iceberg::spec::Datum; + + let (catalog, namespace, table_name, _temp_dir) = + make_catalog_and_table_for_bucketing().await; + append_fake_data_files(&catalog, &namespace, &table_name, 2).await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let projection = vec![1_usize]; + let filters = vec![col("id").eq(lit(1_i32))]; + + let plan = provider + .scan( + &ctx_with_target_partitions(2).state(), + Some(&projection), + &filters, + None, + ) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(scan.buckets().is_some(), "expected eager scan buckets"); + assert_eq!(scan.projection().unwrap(), &["name".to_string()]); + assert_eq!( + scan.predicates(), + Some(&Reference::new("id").equal_to(Datum::int(1))) + ); + } + + async fn make_partitioned_catalog_and_table_for_bucketing() + -> (Arc, NamespaceIdent, String, tempfile::TempDir) { + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{ + NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec, + }; + use iceberg::{CatalogBuilder, TableCreation}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + std::collections::HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse.clone(), + )]), + ) + .await + .unwrap(), + ); + + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, std::collections::HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "name_part", Transform::Identity) + .unwrap() + .build(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .partition_spec(partition_spec) + .properties(std::collections::HashMap::new()) + .build(), + ) + .await + .unwrap(); + + (catalog, namespace, "t".to_string(), temp_dir) + } + + /// Like [`append_fake_data_files`] but each file carries a partition tuple + /// matching the table's identity-partition spec on `name`. + async fn append_partitioned_fake_data_files( + catalog: &Arc, + namespace: &NamespaceIdent, + table_name: &str, + partition_values: Vec<&str>, + ) { + append_partitioned_fake_data_files_with_optional_values( + catalog, + namespace, + table_name, + partition_values.into_iter().map(Some).collect(), + ) + .await; + } + + async fn append_partitioned_fake_data_files_with_optional_values( + catalog: &Arc, + namespace: &NamespaceIdent, + table_name: &str, + partition_values: Vec>, + ) { + use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct}; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), table_name.to_string())) + .await + .unwrap(); + + let data_files = partition_values + .iter() + .enumerate() + .map(|(i, value)| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter(vec![ + value.as_ref().map(|value| Literal::string(*value)), + ])) + .build() + .unwrap() + }) + .collect::>(); + + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(data_files); + action + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + } + + /// Identity-partitioned table whose source column is in the projection + /// must produce `Partitioning::Hash` referencing that column. + #[tokio::test] + async fn test_identity_partitioned_declares_hash() { + use datafusion::physical_expr::expressions::Column; + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![ + "a", "b", "c", "a", "b", "c", + ]) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(3).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + let total_files: usize = buckets.iter().map(|b| b.len()).sum(); + assert_eq!(total_files, 6); + + match &scan.properties().partitioning { + Partitioning::Hash(exprs, n) => { + assert_eq!(*n, 3); + assert_eq!(exprs.len(), 1); + let col = exprs[0] + .as_any() + .downcast_ref::() + .expect("expected Column expr"); + assert_eq!(col.name(), "name"); + } + other => panic!("expected Partitioning::Hash, got {other:?}"), + } + } + + /// Empty identity-partitioned tables still use one empty bucket, but do not + /// claim hash partitioning because there are no tasks proving a full key. + #[tokio::test] + async fn test_empty_identity_partitioned_table_falls_back_to_unknown() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(8).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + assert_eq!(buckets.len(), 1); + assert_eq!(buckets[0].len(), 0); + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(1) + )); + } + + /// Identity partition task buckets must match DataFusion's own hash + /// repartition bucket calculation for the same concrete Arrow array type. + #[tokio::test] + async fn test_identity_partitioned_hash_buckets_match_datafusion_repartition() { + use datafusion::arrow::array::{ArrayRef, StringArray}; + use datafusion::common::hash_utils::create_hashes; + use datafusion::physical_plan::Partitioning; + use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE; + + let partition_values = vec!["a", "b", "c", "a", "b", "c", "z"]; + let n_partitions = 4_usize; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + append_partitioned_fake_data_files( + &catalog, + &namespace, + &table_name, + partition_values.clone(), + ) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan( + &ctx_with_target_partitions(n_partitions).state(), + None, + &[], + None, + ) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::Hash(_, 4) + )); + + let arrays: Vec = vec![Arc::new(StringArray::from(partition_values))]; + let mut hashes = vec![0_u64; arrays[0].len()]; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + ) + .unwrap(); + + let mut actual_bucket_by_file = vec![None; hashes.len()]; + for (bucket_idx, bucket) in buckets.iter().enumerate() { + for task in bucket.iter() { + let file_idx = task + .data_file_path() + .strip_suffix(".parquet") + .and_then(|path| path.rsplit_once("fake_").map(|(_, idx)| idx)) + .and_then(|idx| idx.parse::().ok()) + .expect("fake data file path should include its row index"); + actual_bucket_by_file[file_idx] = Some(bucket_idx); + } + } + + for (file_idx, hash) in hashes.iter().enumerate() { + let expected_bucket = (hash % n_partitions as u64) as usize; + assert_eq!( + actual_bucket_by_file[file_idx], + Some(expected_bucket), + "file {file_idx} should be assigned to DataFusion hash bucket {expected_bucket}" + ); + } + } + + fn table_with_additional_partition_spec(table: &Table) -> Table { + use iceberg::TableUpdate; + use iceberg::spec::{Transform, UnboundPartitionSpec}; + + let extra_spec = UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_field(1, "id_part", Transform::Identity) + .unwrap() + .build(); + let metadata = TableUpdate::AddSpec { spec: extra_spec } + .apply(table.metadata().clone().into_builder(None)) + .unwrap() + .build() + .unwrap() + .metadata; + + let mut builder = Table::builder() + .file_io(table.file_io().clone()) + .metadata(Arc::new(metadata)) + .identifier(table.identifier().clone()) + .runtime(table.runtime().clone()); + if let Some(metadata_location) = table.metadata_location() { + builder = builder.metadata_location(metadata_location); + } + builder.build().unwrap() + } + + /// If a table has partition spec evolution, older files may have partition + /// tuples that do not align with the default spec. The scan must therefore + /// keep the eager buckets but avoid declaring hash partitioning. + #[tokio::test] + async fn test_spec_evolution_falls_back_to_unknown_partitioning() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![ + "a", "b", "c", "d", + ]) + .await; + + let table_ident = TableIdent::new(namespace.clone(), table_name.clone()); + let table = catalog.load_table(&table_ident).await.unwrap(); + let evolved_table = table_with_additional_partition_spec(&table); + assert_eq!(evolved_table.metadata().partition_specs_iter().len(), 2); + + let provider = IcebergTableProvider::try_new( + Arc::new(SingleTableCatalog::new(evolved_table)), + namespace, + table_name, + ) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(4).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(4) + )); + } + + /// If the scan output dtype for an identity partition source cannot be + /// materialized for DataFusion-compatible hashing, the hash declaration is + /// unsound. Timestamp dtypes are supported here, so this uses `Utf8View` as + /// a deliberately unsupported output dtype. + #[tokio::test] + async fn test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning() { + use datafusion::arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![ + "a", "b", "c", "d", + ]) + .await; + + let table_ident = TableIdent::new(namespace.clone(), table_name.clone()); + let table = catalog.load_table(&table_ident).await.unwrap(); + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("name", DataType::Utf8View, false), + ])); + let provider = IcebergTableProvider { + catalog: Arc::new(SingleTableCatalog::new(table)), + table_ident, + schema, + }; + + let plan = provider + .scan(&ctx_with_target_partitions(4).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(4) + )); + } + + /// A null identity partition value forces that task through fallback hashing. + /// Since at least one task did not have a full hash key, the scan must not + /// claim DataFusion hash partitioning. + #[tokio::test] + async fn test_null_partition_value_falls_back_to_unknown_partitioning() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + append_partitioned_fake_data_files_with_optional_values( + &catalog, + &namespace, + &table_name, + vec![Some("a"), None, Some("c"), Some("d")], + ) + .await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&ctx_with_target_partitions(4).state(), None, &[], None) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + let buckets = scan.buckets().expect("expected eager scan buckets"); + + let total_files: usize = buckets.iter().map(|bucket| bucket.len()).sum(); + assert_eq!(total_files, 4); + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(4) + )); + } + + /// A projection that omits the partition source column drops + /// `compute_identity_cols` to `None`, collapsing to `UnknownPartitioning`. + #[tokio::test] + async fn test_projection_without_partition_col_falls_back_to_unknown() { + use datafusion::physical_plan::Partitioning; + + let (catalog, namespace, table_name, _temp_dir) = + make_partitioned_catalog_and_table_for_bucketing().await; + append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec!["a", "b"]).await; + + let provider = IcebergTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + // Project only "id" (idx 0), excluding the partition column "name" (idx 1). + let projection = vec![0_usize]; + let plan = provider + .scan( + &ctx_with_target_partitions(3).state(), + Some(&projection), + &[], + None, + ) + .await + .unwrap(); + let scan = plan.as_any().downcast_ref::().unwrap(); + + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(_) + )); + } } diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index cebac75dd9..7603c8b7ab 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -131,7 +131,7 @@ async fn test_provider_plan_stream_schema() -> Result<()> { let task_ctx = Arc::new(df.task_ctx()); let plan = df.create_physical_plan().await.unwrap(); - let stream = plan.execute(1, task_ctx).unwrap(); + let stream = plan.execute(0, task_ctx).unwrap(); // Ensure both the plan and the stream conform to the same schema assert_eq!(plan.schema(), stream.schema()); @@ -600,8 +600,8 @@ async fn test_insert_into_nested() -> Result<()> { // Insert data with nested structs let insert_sql = r#" INSERT INTO catalog.test_insert_nested.nested_table - SELECT - 1 as id, + SELECT + 1 as id, 'Alice' as name, named_struct( 'address', named_struct( @@ -615,8 +615,8 @@ async fn test_insert_into_nested() -> Result<()> { ) ) as profile UNION ALL - SELECT - 2 as id, + SELECT + 2 as id, 'Bob' as name, named_struct( 'address', named_struct( @@ -738,15 +738,15 @@ async fn test_insert_into_nested() -> Result<()> { let df = ctx .sql( r#" - SELECT - id, + SELECT + id, name, profile.address.street, profile.address.city, profile.address.zip, profile.contact.email, profile.contact.phone - FROM catalog.test_insert_nested.nested_table + FROM catalog.test_insert_nested.nested_table ORDER BY id "#, ) @@ -852,8 +852,8 @@ async fn test_insert_into_partitioned() -> Result<()> { let df = ctx .sql( r#" - INSERT INTO catalog.test_partitioned_write.partitioned_table - VALUES + INSERT INTO catalog.test_partitioned_write.partitioned_table + VALUES (1, 'electronics', 'laptop'), (2, 'electronics', 'phone'), (3, 'books', 'novel'), diff --git a/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt b/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt index a5ca4de46a..d9933e0f87 100644 --- a/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt +++ b/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt @@ -53,7 +53,7 @@ logical_plan physical_plan 01)GlobalLimitExec: skip=0, fetch=3 02)--CooperativeExec -03)----IcebergTableScan projection:[id,name,score,category] predicate:[] limit:[3] +03)----IcebergTableScan projection:[id,name,score,category] predicate:[] buckets:[1] file_count:[1] limit:[3] # Test SELECT * with ORDER BY and LIMIT query ITRT diff --git a/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt index aa68ab2762..249d52edd0 100644 --- a/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt +++ b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt @@ -28,7 +28,7 @@ logical_plan physical_plan 01)FilterExec: data@1 = 0102 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,data] predicate:[data = 0102] +03)----IcebergTableScan projection:[id,data] predicate:[data = 0102] buckets:[1] file_count:[0] # Verify empty result from empty table query I? diff --git a/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt index 496f719261..b4596ba6ba 100644 --- a/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt +++ b/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt @@ -39,7 +39,7 @@ logical_plan physical_plan 01)FilterExec: is_active@1 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = true] +03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = true] buckets:[1] file_count:[1] # Query with is_active = true query ITT rowsort @@ -59,7 +59,7 @@ logical_plan physical_plan 01)FilterExec: NOT is_active@1 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false] +03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false] buckets:[1] file_count:[1] # Query with is_active = false query ITT rowsort @@ -78,7 +78,7 @@ logical_plan physical_plan 01)FilterExec: NOT is_active@1 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false] +03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false] buckets:[1] file_count:[1] # Query with is_active != true (includes false and NULL) query ITT rowsort diff --git a/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt index 3d8b151aa9..698046046a 100644 --- a/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt +++ b/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt @@ -36,8 +36,8 @@ logical_plan 02)--TableScan: default.default.test_unpartitioned_table projection=[id, name], partial_filters=[default.default.test_unpartitioned_table.name LIKE Utf8("Al%")] physical_plan 01)FilterExec: name@1 LIKE Al% -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,name] predicate:[name STARTS WITH "Al"] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +03)----IcebergTableScan projection:[id,name] predicate:[name STARTS WITH "Al"] buckets:[2] file_count:[2] # Test LIKE filtering with case-sensitive match query IT rowsort @@ -55,8 +55,8 @@ logical_plan 02)--TableScan: default.default.test_unpartitioned_table projection=[id, name], partial_filters=[default.default.test_unpartitioned_table.name NOT LIKE Utf8("Al%")] physical_plan 01)FilterExec: name@1 NOT LIKE Al% -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,name] predicate:[name NOT STARTS WITH "Al"] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 +03)----IcebergTableScan projection:[id,name] predicate:[name NOT STARTS WITH "Al"] buckets:[3] file_count:[3] # Test NOT LIKE filtering query IT rowsort diff --git a/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt index ffa74173dc..47100cc36d 100644 --- a/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt +++ b/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt @@ -50,7 +50,7 @@ logical_plan physical_plan 01)FilterExec: ts@1 = 1672921800000000000 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00] +03)----IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00] buckets:[1] file_count:[1] # Verify timestamp equality filtering works query I? @@ -68,7 +68,7 @@ logical_plan physical_plan 01)FilterExec: ts@1 > 1673308800000000000 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00] +03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00] buckets:[1] file_count:[1] # Verify timestamp greater than filtering query I? rowsort @@ -97,7 +97,7 @@ logical_plan physical_plan 01)FilterExec: ts@1 >= 1672876800000000000 AND ts@1 <= 1673827199000000000 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)] +03)----IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)] buckets:[1] file_count:[1] # Test timestamp range predicate filtering query I? rowsort @@ -162,7 +162,7 @@ logical_plan physical_plan 01)FilterExec: ts@1 > 1672531200000000 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-01 00:00:00] +03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-01 00:00:00] buckets:[1] file_count:[1] query I? SELECT * FROM default.default.test_timestamp_micros WHERE ts > CAST('2023-01-01 00:00:00' AS TIMESTAMP)