From 20e8ae2da3b7dbd83a498ac754f2e380e8e3eb29 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 20 May 2026 21:58:06 +0100 Subject: [PATCH 1/5] fix: accept UTC timestamps in parquet writer --- .../src/writer/file_writer/parquet_writer.rs | 203 +++++++++++++++++- 1 file changed, 199 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..eb16a17ea9 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -20,6 +20,8 @@ use std::collections::HashMap; use std::sync::Arc; +use arrow_array::RecordBatch; +use arrow_cast::cast; use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; @@ -79,8 +81,10 @@ impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; async fn build(&self, output_file: OutputFile) -> Result { + let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); Ok(ParquetWriter { schema: self.schema.clone(), + arrow_schema, inner_writer: None, writer_properties: self.props.clone(), current_row_num: 0, @@ -211,6 +215,7 @@ impl SchemaVisitor for IndexByParquetPathName { /// `ParquetWriter`` is used to write arrow data into parquet file on storage. pub struct ParquetWriter { schema: SchemaRef, + arrow_schema: ArrowSchemaRef, output_file: OutputFile, inner_writer: Option>, writer_properties: WriterProperties, @@ -473,7 +478,7 @@ impl ParquetWriter { } impl FileWriter for ParquetWriter { - async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> { + async fn write(&mut self, batch: &RecordBatch) -> Result<()> { // Skip empty batch if batch.num_rows() == 0 { return Ok(()); @@ -489,12 +494,11 @@ impl FileWriter for ParquetWriter { let writer = if let Some(writer) = &mut self.inner_writer { writer } else { - let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); let inner_writer = self.output_file.writer().await?; let async_writer = AsyncFileWriter::new(inner_writer); let writer = AsyncArrowWriter::try_new( async_writer, - arrow_schema.clone(), + self.arrow_schema.clone(), Some(self.writer_properties.clone()), ) .map_err(|err| { @@ -505,7 +509,9 @@ impl FileWriter for ParquetWriter { self.inner_writer.as_mut().unwrap() }; - writer.write(batch).await.map_err(|err| { + let batch = coerce_timestamp_columns(batch, &self.arrow_schema)?; + + writer.write(&batch).await.map_err(|err| { Error::new( ErrorKind::Unexpected, "Failed to write using parquet writer.", @@ -551,6 +557,75 @@ impl FileWriter for ParquetWriter { } } +/// Cast columns in `batch` to match `target_schema`, but only when the mismatch is a +/// UTC-equivalent timezone alias (`"UTC"` vs `"+00:00"`). +/// +/// Columns that already match the target type are passed through unchanged. Columns with +/// a UTC alias mismatch are cast using `arrow_cast::cast` (which is a no-op on the values, +/// it just relabels the timezone). Any other type mismatch is left alone — it will surface +/// as the original `Incompatible type` error from the underlying parquet writer, which is +/// the correct behavior for genuinely incompatible schemas. +fn coerce_timestamp_columns( + batch: &RecordBatch, + target_schema: &ArrowSchemaRef, +) -> Result { + // short circuit if schemas are identical. + if batch.schema() == *target_schema { + return Ok(batch.clone()); + } + + let mut cols = batch.columns().to_vec(); + let mut changed = false; + + for (idx, (col, target_field)) in batch + .columns() + .iter() + .zip(target_schema.fields()) + .enumerate() + { + if col.data_type() != target_field.data_type() + && is_utc_timezone_mismatch(col.data_type(), target_field.data_type()) + { + cols[idx] = cast(col, target_field.data_type())?; + changed = true; + } + } + + if !changed { + return Ok(batch.clone()); + } + + RecordBatch::try_new(target_schema.clone(), cols).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Failed to rebuild record batch after casting to target schema.", + ) + .with_source(err) + }) +} + +/// Returns true if `source` and `target` differ only by a UTC-equivalent timezone alias. +/// +/// Specifically, both must be `Timestamp` with the same `TimeUnit`, and their timezone +/// strings must be a `("UTC", "+00:00")` or `("+00:00", "UTC")` pair. +fn is_utc_timezone_mismatch( + source: &arrow_schema::DataType, + target: &arrow_schema::DataType, +) -> bool { + use arrow_schema::DataType; + match (source, target) { + (DataType::Timestamp(s_unit, Some(s_tz)), DataType::Timestamp(t_unit, Some(t_tz))) + if s_unit == t_unit => + { + matches!( + (s_tz.as_ref(), t_tz.as_ref()), + ("UTC", "+00:00") | ("+00:00", "UTC") + ) + } + _ => false, + } +} + impl CurrentFileStatus for ParquetWriter { fn current_file_path(&self) -> String { self.output_file.location().to_string() @@ -2245,6 +2320,126 @@ mod tests { assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0); } + #[tokio::test] + async fn test_writer_casts_utc_alias_timezone() -> Result<()> { + // Iceberg's `Timestamptz` maps to Arrow `Timestamp(µs, "+00:00")`. External + // producers (e.g. DataFusion) commonly tag timestamps with `"UTC"` which is + // semantically identical. The writer should cast transparently. + use crate::arrow::schema_to_arrow_schema; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("utc-alias".to_string(), None, DataFileFormat::Parquet); + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamptz)) + .into(), + NestedField::optional( + 2, + "ts_ns", + Type::Primitive(PrimitiveType::TimestamptzNs), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pw, + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut data_file_writer = DataFileWriterBuilder::new(rolling_writer_builder) + .build(None) + .await?; + + // Build batch with tz="UTC" (the alias that triggers the original error). + let target_arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let utc_fields: Vec = target_arrow_schema + .fields() + .iter() + .map(|f| match f.data_type() { + arrow_schema::DataType::Timestamp(unit, Some(_)) => f + .as_ref() + .clone() + .with_data_type(arrow_schema::DataType::Timestamp(*unit, Some("UTC".into()))), + _ => f.as_ref().clone(), + }) + .collect(); + let batch_schema = Arc::new(arrow_schema::Schema::new_with_metadata( + utc_fields, + target_arrow_schema.metadata().clone(), + )); + let micros = Arc::new( + arrow_array::TimestampMicrosecondArray::from(vec![Some(0_i64), Some(1_000_000)]) + .with_timezone("UTC"), + ) as ArrayRef; + let nanos = Arc::new( + arrow_array::TimestampNanosecondArray::from(vec![Some(0_i64), Some(1_000_000_000)]) + .with_timezone("UTC"), + ) as ArrayRef; + let batch = RecordBatch::try_new(batch_schema, vec![micros, nanos])?; + + data_file_writer.write(batch).await?; + let data_files = data_file_writer.close().await?; + assert_eq!(data_files.len(), 1); + assert_eq!(data_files[0].record_count, 2); + Ok(()) + } + + #[test] + fn test_cast_batch_to_schema_noop_when_matching() { + // When types match, cast_batch_to_schema is a no-op clone. + let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "x", + arrow_schema::DataType::Int32, + false, + )])); + let batch = + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ]) + .unwrap(); + let result = coerce_timestamp_columns(&batch, &schema).unwrap(); + assert_eq!(result.schema(), batch.schema()); + } + + #[test] + fn test_cast_batch_to_schema_passes_through_non_utc_mismatches() { + // Non-UTC mismatches are NOT cast — the batch is returned unchanged so the + // downstream parquet writer produces its normal "Incompatible type" error. + let source_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "x", + arrow_schema::DataType::Int32, + false, + )])); + let target_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "x", + arrow_schema::DataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new(source_schema.clone(), vec![Arc::new( + arrow_array::Int32Array::from(vec![1]), + ) as ArrayRef]) + .unwrap(); + let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); + // The batch is passed through unchanged (Int32, not cast to Utf8). + assert_eq!(result.schema(), source_schema); + } + #[test] fn test_min_max_aggregator() { let schema = Arc::new( From 2b4445e9212a9cd5bafbb6beaa21477333436fd7 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 20 May 2026 22:23:33 +0100 Subject: [PATCH 2/5] fix --- .../src/writer/file_writer/parquet_writer.rs | 44 +++++++------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index eb16a17ea9..172ed2fcdc 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -559,12 +559,6 @@ impl FileWriter for ParquetWriter { /// Cast columns in `batch` to match `target_schema`, but only when the mismatch is a /// UTC-equivalent timezone alias (`"UTC"` vs `"+00:00"`). -/// -/// Columns that already match the target type are passed through unchanged. Columns with -/// a UTC alias mismatch are cast using `arrow_cast::cast` (which is a no-op on the values, -/// it just relabels the timezone). Any other type mismatch is left alone — it will surface -/// as the original `Incompatible type` error from the underlying parquet writer, which is -/// the correct behavior for genuinely incompatible schemas. fn coerce_timestamp_columns( batch: &RecordBatch, target_schema: &ArrowSchemaRef, @@ -709,6 +703,10 @@ mod tests { DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator, }; use crate::writer::tests::check_parquet_data_file; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + fn schema_for_all_type() -> Schema { Schema::builder() @@ -2322,15 +2320,7 @@ mod tests { #[tokio::test] async fn test_writer_casts_utc_alias_timezone() -> Result<()> { - // Iceberg's `Timestamptz` maps to Arrow `Timestamp(µs, "+00:00")`. External - // producers (e.g. DataFusion) commonly tag timestamps with `"UTC"` which is - // semantically identical. The writer should cast transparently. - use crate::arrow::schema_to_arrow_schema; - use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; - use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; - use crate::writer::{IcebergWriter, IcebergWriterBuilder}; - - let temp_dir = TempDir::new().unwrap(); + let temp_dir = TempDir::new()?; let file_io = FileIO::new_with_fs(); let location_gen = DefaultLocationGenerator::with_data_location( temp_dir.path().to_str().unwrap().to_string(), @@ -2366,16 +2356,16 @@ mod tests { .build(None) .await?; - // Build batch with tz="UTC" (the alias that triggers the original error). + // Build batch with tz="UTC" let target_arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap()); - let utc_fields: Vec = target_arrow_schema + let utc_fields: Vec = target_arrow_schema .fields() .iter() .map(|f| match f.data_type() { - arrow_schema::DataType::Timestamp(unit, Some(_)) => f + DataType::Timestamp(unit, Some(_)) => f .as_ref() .clone() - .with_data_type(arrow_schema::DataType::Timestamp(*unit, Some("UTC".into()))), + .with_data_type(DataType::Timestamp(*unit, Some("UTC".into()))), _ => f.as_ref().clone(), }) .collect(); @@ -2403,14 +2393,14 @@ mod tests { #[test] fn test_cast_batch_to_schema_noop_when_matching() { // When types match, cast_batch_to_schema is a no-op clone. - let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( "x", - arrow_schema::DataType::Int32, + DataType::Int32, false, )])); let batch = RecordBatch::try_new(schema.clone(), vec![ - Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3])) as ArrayRef, + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, ]) .unwrap(); let result = coerce_timestamp_columns(&batch, &schema).unwrap(); @@ -2421,18 +2411,18 @@ mod tests { fn test_cast_batch_to_schema_passes_through_non_utc_mismatches() { // Non-UTC mismatches are NOT cast — the batch is returned unchanged so the // downstream parquet writer produces its normal "Incompatible type" error. - let source_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + let source_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( "x", - arrow_schema::DataType::Int32, + DataType::Int32, false, )])); - let target_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + let target_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( "x", - arrow_schema::DataType::Utf8, + DataType::Utf8, false, )])); let batch = RecordBatch::try_new(source_schema.clone(), vec![Arc::new( - arrow_array::Int32Array::from(vec![1]), + Int32Array::from(vec![1]), ) as ArrayRef]) .unwrap(); let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); From 985a743fdf054c70acf4f6c3d792f10dcf01082d Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 20 May 2026 22:27:35 +0100 Subject: [PATCH 3/5] nested types --- .../src/writer/file_writer/parquet_writer.rs | 319 +++++++++++++++++- 1 file changed, 301 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 172ed2fcdc..bc45896590 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -578,7 +578,7 @@ fn coerce_timestamp_columns( .enumerate() { if col.data_type() != target_field.data_type() - && is_utc_timezone_mismatch(col.data_type(), target_field.data_type()) + && differs_only_by_utc_timezone(col.data_type(), target_field.data_type()) { cols[idx] = cast(col, target_field.data_type())?; changed = true; @@ -598,16 +598,16 @@ fn coerce_timestamp_columns( }) } -/// Returns true if `source` and `target` differ only by a UTC-equivalent timezone alias. -/// -/// Specifically, both must be `Timestamp` with the same `TimeUnit`, and their timezone -/// strings must be a `("UTC", "+00:00")` or `("+00:00", "UTC")` pair. -fn is_utc_timezone_mismatch( +/// Returns true if `source` and `target` differ only by UTC-equivalent timezone aliases +/// at any nesting depth. Recurses into List, LargeList, FixedSizeList, Struct, and Map. +fn differs_only_by_utc_timezone( source: &arrow_schema::DataType, target: &arrow_schema::DataType, ) -> bool { use arrow_schema::DataType; match (source, target) { + (s, t) if s == t => false, + (DataType::Timestamp(s_unit, Some(s_tz)), DataType::Timestamp(t_unit, Some(t_tz))) if s_unit == t_unit => { @@ -616,6 +616,38 @@ fn is_utc_timezone_mismatch( ("UTC", "+00:00") | ("+00:00", "UTC") ) } + + (DataType::List(s_field), DataType::List(t_field)) + | (DataType::LargeList(s_field), DataType::LargeList(t_field)) => { + s_field.name() == t_field.name() + && s_field.is_nullable() == t_field.is_nullable() + && differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) + } + + (DataType::FixedSizeList(s_field, s_size), DataType::FixedSizeList(t_field, t_size)) + if s_size == t_size => + { + s_field.name() == t_field.name() + && s_field.is_nullable() == t_field.is_nullable() + && differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) + } + + (DataType::Struct(s_fields), DataType::Struct(t_fields)) => { + s_fields.len() == t_fields.len() + && s_fields.iter().zip(t_fields.iter()).all(|(sf, tf)| { + sf.name() == tf.name() + && sf.is_nullable() == tf.is_nullable() + && (sf.data_type() == tf.data_type() + || differs_only_by_utc_timezone(sf.data_type(), tf.data_type())) + }) + } + + (DataType::Map(s_field, s_sorted), DataType::Map(t_field, t_sorted)) + if s_sorted == t_sorted => + { + differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) + } + _ => false, } } @@ -699,15 +731,14 @@ mod tests { use crate::io::FileIO; use crate::spec::decimal_utils::{decimal_mantissa, decimal_new, decimal_scale}; use crate::spec::{PrimitiveLiteral, Struct, *}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; use crate::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator, }; - use crate::writer::tests::check_parquet_data_file; - use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + use crate::writer::tests::check_parquet_data_file; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; - fn schema_for_all_type() -> Schema { Schema::builder() .with_schema_id(1) @@ -2398,11 +2429,10 @@ mod tests { DataType::Int32, false, )])); - let batch = - RecordBatch::try_new(schema.clone(), vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, - ]) - .unwrap(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![ + 1, 2, 3, + ])) as ArrayRef]) + .unwrap(); let result = coerce_timestamp_columns(&batch, &schema).unwrap(); assert_eq!(result.schema(), batch.schema()); } @@ -2421,15 +2451,268 @@ mod tests { DataType::Utf8, false, )])); - let batch = RecordBatch::try_new(source_schema.clone(), vec![Arc::new( - Int32Array::from(vec![1]), - ) as ArrayRef]) - .unwrap(); + let batch = + RecordBatch::try_new(source_schema.clone(), vec![ + Arc::new(Int32Array::from(vec![1])) as ArrayRef, + ]) + .unwrap(); let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); // The batch is passed through unchanged (Int32, not cast to Utf8). assert_eq!(result.schema(), source_schema); } + #[test] + fn test_differs_only_by_utc_timezone_flat_timestamp() { + use arrow_schema::TimeUnit; + assert!(differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + )); + assert!(differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + )); + // Same timezone — no mismatch + assert!(!differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + )); + // Different units — not a UTC alias mismatch + assert!(!differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + &DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + )); + // Non-UTC timezone mismatch + assert!(!differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("America/New_York".into())), + &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + )); + } + + #[test] + fn test_differs_only_by_utc_timezone_list() { + use arrow_schema::TimeUnit; + let source = DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ))); + let target = DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ))); + assert!(differs_only_by_utc_timezone(&source, &target)); + + // LargeList + let source_large = DataType::LargeList(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ))); + let target_large = DataType::LargeList(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ))); + assert!(differs_only_by_utc_timezone(&source_large, &target_large)); + + // List with non-timestamp element — no mismatch + let source_int = DataType::List(Arc::new(Field::new("item", DataType::Int32, true))); + let target_str = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))); + assert!(!differs_only_by_utc_timezone(&source_int, &target_str)); + } + + #[test] + fn test_differs_only_by_utc_timezone_struct() { + use arrow_schema::TimeUnit; + let source = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ])); + let target = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ])); + assert!(differs_only_by_utc_timezone(&source, &target)); + + // Struct with a genuinely incompatible field — should return false + let bad_target = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Utf8, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ])); + assert!(!differs_only_by_utc_timezone(&source, &bad_target)); + } + + #[test] + fn test_differs_only_by_utc_timezone_map() { + use arrow_schema::TimeUnit; + let entries_source = Field::new_struct( + "entries", + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ], + false, + ); + let entries_target = Field::new_struct( + "entries", + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ], + false, + ); + let source = DataType::Map(Arc::new(entries_source), false); + let target = DataType::Map(Arc::new(entries_target), false); + assert!(differs_only_by_utc_timezone(&source, &target)); + + // Map with incompatible key type — should return false + let bad_entries_target = Field::new_struct( + "entries", + vec![ + Field::new("key", DataType::Int32, false), + Field::new( + "value", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ], + false, + ); + let bad_target = DataType::Map(Arc::new(bad_entries_target), false); + assert!(!differs_only_by_utc_timezone(&source, &bad_target)); + } + + #[test] + fn test_coerce_timestamp_columns_with_struct() { + use arrow_schema::TimeUnit; + let source_struct_field = Field::new( + "s", + DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ])), + false, + ); + let target_struct_field = Field::new( + "s", + DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ])), + false, + ); + + let source_schema = Arc::new(arrow_schema::Schema::new(vec![source_struct_field.clone()])); + let target_schema = Arc::new(arrow_schema::Schema::new(vec![target_struct_field])); + + let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let ts_array = Arc::new( + arrow_array::TimestampMicrosecondArray::from(vec![ + Some(1_000_000), + Some(2_000_000), + Some(3_000_000), + ]) + .with_timezone("UTC"), + ) as ArrayRef; + let struct_array = Arc::new(StructArray::from(vec![ + (Arc::new(Field::new("id", DataType::Int32, false)), id_array), + ( + Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + )), + ts_array, + ), + ])) as ArrayRef; + + let batch = RecordBatch::try_new(source_schema, vec![struct_array]).unwrap(); + let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); + + assert_eq!(result.schema(), target_schema); + assert_eq!(result.num_rows(), 3); + } + + #[test] + fn test_coerce_timestamp_columns_with_list() { + use arrow_schema::TimeUnit; + let source_field = Field::new( + "ts_list", + DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ))), + false, + ); + let target_field = Field::new( + "ts_list", + DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ))), + false, + ); + + let source_schema = Arc::new(arrow_schema::Schema::new(vec![source_field])); + let target_schema = Arc::new(arrow_schema::Schema::new(vec![target_field])); + + let ts_values = arrow_array::TimestampMicrosecondArray::from(vec![ + Some(1_000_000), + Some(2_000_000), + Some(3_000_000), + ]) + .with_timezone("UTC"); + let offsets = arrow_buffer::OffsetBuffer::from_lengths([2, 1]); + let list_array = Arc::new(ListArray::new( + Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + )), + offsets, + Arc::new(ts_values), + None, + )) as ArrayRef; + + let batch = RecordBatch::try_new(source_schema, vec![list_array]).unwrap(); + let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); + + assert_eq!(result.schema(), target_schema); + assert_eq!(result.num_rows(), 2); + } + #[test] fn test_min_max_aggregator() { let schema = Arc::new( From 551c224a598077ef64effdf86aecdb1e94627bc2 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 22 May 2026 10:12:02 +0100 Subject: [PATCH 4/5] move to arrow mod --- crates/iceberg/src/arrow/mod.rs | 1 + crates/iceberg/src/arrow/timestamp_tz.rs | 415 ++++++++++++++++++ .../src/writer/file_writer/parquet_writer.rs | 389 +--------------- 3 files changed, 417 insertions(+), 388 deletions(-) create mode 100644 crates/iceberg/src/arrow/timestamp_tz.rs diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index bf53633cfc..e45fc3bcea 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -29,6 +29,7 @@ pub(crate) mod delete_filter; mod int96; mod reader; +pub(crate) mod timestamp_tz; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; diff --git a/crates/iceberg/src/arrow/timestamp_tz.rs b/crates/iceberg/src/arrow/timestamp_tz.rs new file mode 100644 index 0000000000..57374160b7 --- /dev/null +++ b/crates/iceberg/src/arrow/timestamp_tz.rs @@ -0,0 +1,415 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! UTC timestamp coercion for Arrow RecordBatches. +//! +//! Arrow engines may produce timestamps with timezone "UTC" while Iceberg's +//! canonical Arrow schema uses "+00:00". This module handles the lossless cast +//! between UTC-equivalent timezone representations so the parquet writer can +//! accept data from either convention. + +use arrow_array::RecordBatch; +use arrow_cast::cast; +use arrow_schema::SchemaRef as ArrowSchemaRef; + +use crate::{Error, ErrorKind, Result}; + +/// Coerce timestamp columns in `batch` to match `target_schema` when the only +/// difference is a UTC-equivalent timezone alias (e.g. "UTC" vs "+00:00"). +pub(crate) fn coerce_timestamp_columns( + batch: &RecordBatch, + target_schema: &ArrowSchemaRef, +) -> Result { + if batch.schema() == *target_schema { + return Ok(batch.clone()); + } + + let mut cols = batch.columns().to_vec(); + let mut changed = false; + + for (idx, (col, target_field)) in batch + .columns() + .iter() + .zip(target_schema.fields()) + .enumerate() + { + if col.data_type() != target_field.data_type() + && differs_only_by_utc_timezone(col.data_type(), target_field.data_type()) + { + cols[idx] = cast(col, target_field.data_type())?; + changed = true; + } + } + + if !changed { + return Ok(batch.clone()); + } + + RecordBatch::try_new(target_schema.clone(), cols).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + "Failed to rebuild record batch after casting to target schema.", + ) + .with_source(err) + }) +} + +/// Returns true if `source` and `target` differ only by UTC-equivalent timezone aliases +/// at any nesting depth. Recurses into List, LargeList, FixedSizeList, Struct, and Map. +fn differs_only_by_utc_timezone( + source: &arrow_schema::DataType, + target: &arrow_schema::DataType, +) -> bool { + use arrow_schema::DataType; + match (source, target) { + (s, t) if s == t => false, + + (DataType::Timestamp(s_unit, Some(s_tz)), DataType::Timestamp(t_unit, Some(t_tz))) + if s_unit == t_unit => + { + matches!( + (s_tz.as_ref(), t_tz.as_ref()), + ("UTC", "+00:00") | ("+00:00", "UTC") + ) + } + + (DataType::List(s_field), DataType::List(t_field)) + | (DataType::LargeList(s_field), DataType::LargeList(t_field)) => { + s_field.name() == t_field.name() + && s_field.is_nullable() == t_field.is_nullable() + && differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) + } + + (DataType::FixedSizeList(s_field, s_size), DataType::FixedSizeList(t_field, t_size)) + if s_size == t_size => + { + s_field.name() == t_field.name() + && s_field.is_nullable() == t_field.is_nullable() + && differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) + } + + (DataType::Struct(s_fields), DataType::Struct(t_fields)) => { + s_fields.len() == t_fields.len() + && s_fields.iter().zip(t_fields.iter()).all(|(sf, tf)| { + sf.name() == tf.name() + && sf.is_nullable() == tf.is_nullable() + && (sf.data_type() == tf.data_type() + || differs_only_by_utc_timezone(sf.data_type(), tf.data_type())) + }) + } + + (DataType::Map(s_field, s_sorted), DataType::Map(t_field, t_sorted)) + if s_sorted == t_sorted => + { + differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) + } + + _ => false, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int32Array, ListArray, RecordBatch, StructArray}; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + + use super::*; + + #[test] + fn test_noop_when_matching() { + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "x", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![ + 1, 2, 3, + ])) as ArrayRef]) + .unwrap(); + let result = coerce_timestamp_columns(&batch, &schema).unwrap(); + assert_eq!(result.schema(), batch.schema()); + } + + #[test] + fn test_passes_through_non_utc_mismatches() { + let source_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "x", + DataType::Int32, + false, + )])); + let target_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "x", + DataType::Utf8, + false, + )])); + let batch = + RecordBatch::try_new(source_schema.clone(), vec![ + Arc::new(Int32Array::from(vec![1])) as ArrayRef, + ]) + .unwrap(); + let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); + assert_eq!(result.schema(), source_schema); + } + + #[test] + fn test_differs_only_by_utc_timezone_flat_timestamp() { + assert!(differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + )); + assert!(differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + )); + // Same timezone — no mismatch + assert!(!differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + )); + // Different units — not a UTC alias mismatch + assert!(!differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + &DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + )); + // Non-UTC timezone mismatch + assert!(!differs_only_by_utc_timezone( + &DataType::Timestamp(TimeUnit::Microsecond, Some("America/New_York".into())), + &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + )); + } + + #[test] + fn test_differs_only_by_utc_timezone_list() { + let source = DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ))); + let target = DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ))); + assert!(differs_only_by_utc_timezone(&source, &target)); + + // LargeList + let source_large = DataType::LargeList(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ))); + let target_large = DataType::LargeList(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ))); + assert!(differs_only_by_utc_timezone(&source_large, &target_large)); + + // List with non-timestamp element — no mismatch + let source_int = DataType::List(Arc::new(Field::new("item", DataType::Int32, true))); + let target_str = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))); + assert!(!differs_only_by_utc_timezone(&source_int, &target_str)); + } + + #[test] + fn test_differs_only_by_utc_timezone_struct() { + let source = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ])); + let target = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ])); + assert!(differs_only_by_utc_timezone(&source, &target)); + + // Struct with a genuinely incompatible field — should return false + let bad_target = DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Utf8, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ])); + assert!(!differs_only_by_utc_timezone(&source, &bad_target)); + } + + #[test] + fn test_differs_only_by_utc_timezone_map() { + let entries_source = Field::new_struct( + "entries", + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ], + false, + ); + let entries_target = Field::new_struct( + "entries", + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ], + false, + ); + let source = DataType::Map(Arc::new(entries_source), false); + let target = DataType::Map(Arc::new(entries_target), false); + assert!(differs_only_by_utc_timezone(&source, &target)); + + // Map with incompatible key type — should return false + let bad_entries_target = Field::new_struct( + "entries", + vec![ + Field::new("key", DataType::Int32, false), + Field::new( + "value", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ], + false, + ); + let bad_target = DataType::Map(Arc::new(bad_entries_target), false); + assert!(!differs_only_by_utc_timezone(&source, &bad_target)); + } + + #[test] + fn test_coerce_timestamp_columns_with_struct() { + let source_struct_field = Field::new( + "s", + DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ])), + false, + ); + let target_struct_field = Field::new( + "s", + DataType::Struct(Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ])), + false, + ); + + let source_schema = Arc::new(arrow_schema::Schema::new(vec![source_struct_field])); + let target_schema = Arc::new(arrow_schema::Schema::new(vec![target_struct_field])); + + let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let ts_array = Arc::new( + arrow_array::TimestampMicrosecondArray::from(vec![ + Some(1_000_000), + Some(2_000_000), + Some(3_000_000), + ]) + .with_timezone("UTC"), + ) as ArrayRef; + let struct_array = Arc::new(StructArray::from(vec![ + (Arc::new(Field::new("id", DataType::Int32, false)), id_array), + ( + Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + )), + ts_array, + ), + ])) as ArrayRef; + + let batch = RecordBatch::try_new(source_schema, vec![struct_array]).unwrap(); + let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); + + assert_eq!(result.schema(), target_schema); + assert_eq!(result.num_rows(), 3); + } + + #[test] + fn test_coerce_timestamp_columns_with_list() { + let source_field = Field::new( + "ts_list", + DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ))), + false, + ); + let target_field = Field::new( + "ts_list", + DataType::List(Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ))), + false, + ); + + let source_schema = Arc::new(arrow_schema::Schema::new(vec![source_field])); + let target_schema = Arc::new(arrow_schema::Schema::new(vec![target_field])); + + let ts_values = arrow_array::TimestampMicrosecondArray::from(vec![ + Some(1_000_000), + Some(2_000_000), + Some(3_000_000), + ]) + .with_timezone("UTC"); + let offsets = arrow_buffer::OffsetBuffer::from_lengths([2, 1]); + let list_array = Arc::new(ListArray::new( + Arc::new(Field::new( + "item", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + )), + offsets, + Arc::new(ts_values), + None, + )) as ArrayRef; + + let batch = RecordBatch::try_new(source_schema, vec![list_array]).unwrap(); + let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); + + assert_eq!(result.schema(), target_schema); + assert_eq!(result.num_rows(), 2); + } +} diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index bc45896590..0d061d9cd4 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -21,7 +21,6 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_cast::cast; use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; @@ -34,6 +33,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; use super::{FileWriter, FileWriterBuilder}; +use crate::arrow::timestamp_tz::coerce_timestamp_columns; use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, @@ -557,101 +557,6 @@ impl FileWriter for ParquetWriter { } } -/// Cast columns in `batch` to match `target_schema`, but only when the mismatch is a -/// UTC-equivalent timezone alias (`"UTC"` vs `"+00:00"`). -fn coerce_timestamp_columns( - batch: &RecordBatch, - target_schema: &ArrowSchemaRef, -) -> Result { - // short circuit if schemas are identical. - if batch.schema() == *target_schema { - return Ok(batch.clone()); - } - - let mut cols = batch.columns().to_vec(); - let mut changed = false; - - for (idx, (col, target_field)) in batch - .columns() - .iter() - .zip(target_schema.fields()) - .enumerate() - { - if col.data_type() != target_field.data_type() - && differs_only_by_utc_timezone(col.data_type(), target_field.data_type()) - { - cols[idx] = cast(col, target_field.data_type())?; - changed = true; - } - } - - if !changed { - return Ok(batch.clone()); - } - - RecordBatch::try_new(target_schema.clone(), cols).map_err(|err| { - Error::new( - ErrorKind::DataInvalid, - "Failed to rebuild record batch after casting to target schema.", - ) - .with_source(err) - }) -} - -/// Returns true if `source` and `target` differ only by UTC-equivalent timezone aliases -/// at any nesting depth. Recurses into List, LargeList, FixedSizeList, Struct, and Map. -fn differs_only_by_utc_timezone( - source: &arrow_schema::DataType, - target: &arrow_schema::DataType, -) -> bool { - use arrow_schema::DataType; - match (source, target) { - (s, t) if s == t => false, - - (DataType::Timestamp(s_unit, Some(s_tz)), DataType::Timestamp(t_unit, Some(t_tz))) - if s_unit == t_unit => - { - matches!( - (s_tz.as_ref(), t_tz.as_ref()), - ("UTC", "+00:00") | ("+00:00", "UTC") - ) - } - - (DataType::List(s_field), DataType::List(t_field)) - | (DataType::LargeList(s_field), DataType::LargeList(t_field)) => { - s_field.name() == t_field.name() - && s_field.is_nullable() == t_field.is_nullable() - && differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) - } - - (DataType::FixedSizeList(s_field, s_size), DataType::FixedSizeList(t_field, t_size)) - if s_size == t_size => - { - s_field.name() == t_field.name() - && s_field.is_nullable() == t_field.is_nullable() - && differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) - } - - (DataType::Struct(s_fields), DataType::Struct(t_fields)) => { - s_fields.len() == t_fields.len() - && s_fields.iter().zip(t_fields.iter()).all(|(sf, tf)| { - sf.name() == tf.name() - && sf.is_nullable() == tf.is_nullable() - && (sf.data_type() == tf.data_type() - || differs_only_by_utc_timezone(sf.data_type(), tf.data_type())) - }) - } - - (DataType::Map(s_field, s_sorted), DataType::Map(t_field, t_sorted)) - if s_sorted == t_sorted => - { - differs_only_by_utc_timezone(s_field.data_type(), t_field.data_type()) - } - - _ => false, - } -} - impl CurrentFileStatus for ParquetWriter { fn current_file_path(&self) -> String { self.output_file.location().to_string() @@ -2421,298 +2326,6 @@ mod tests { Ok(()) } - #[test] - fn test_cast_batch_to_schema_noop_when_matching() { - // When types match, cast_batch_to_schema is a no-op clone. - let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( - "x", - DataType::Int32, - false, - )])); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![ - 1, 2, 3, - ])) as ArrayRef]) - .unwrap(); - let result = coerce_timestamp_columns(&batch, &schema).unwrap(); - assert_eq!(result.schema(), batch.schema()); - } - - #[test] - fn test_cast_batch_to_schema_passes_through_non_utc_mismatches() { - // Non-UTC mismatches are NOT cast — the batch is returned unchanged so the - // downstream parquet writer produces its normal "Incompatible type" error. - let source_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( - "x", - DataType::Int32, - false, - )])); - let target_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( - "x", - DataType::Utf8, - false, - )])); - let batch = - RecordBatch::try_new(source_schema.clone(), vec![ - Arc::new(Int32Array::from(vec![1])) as ArrayRef, - ]) - .unwrap(); - let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); - // The batch is passed through unchanged (Int32, not cast to Utf8). - assert_eq!(result.schema(), source_schema); - } - - #[test] - fn test_differs_only_by_utc_timezone_flat_timestamp() { - use arrow_schema::TimeUnit; - assert!(differs_only_by_utc_timezone( - &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - )); - assert!(differs_only_by_utc_timezone( - &DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), - &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), - )); - // Same timezone — no mismatch - assert!(!differs_only_by_utc_timezone( - &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - )); - // Different units — not a UTC alias mismatch - assert!(!differs_only_by_utc_timezone( - &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - &DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), - )); - // Non-UTC timezone mismatch - assert!(!differs_only_by_utc_timezone( - &DataType::Timestamp(TimeUnit::Microsecond, Some("America/New_York".into())), - &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - )); - } - - #[test] - fn test_differs_only_by_utc_timezone_list() { - use arrow_schema::TimeUnit; - let source = DataType::List(Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - ))); - let target = DataType::List(Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ))); - assert!(differs_only_by_utc_timezone(&source, &target)); - - // LargeList - let source_large = DataType::LargeList(Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - ))); - let target_large = DataType::LargeList(Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ))); - assert!(differs_only_by_utc_timezone(&source_large, &target_large)); - - // List with non-timestamp element — no mismatch - let source_int = DataType::List(Arc::new(Field::new("item", DataType::Int32, true))); - let target_str = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))); - assert!(!differs_only_by_utc_timezone(&source_int, &target_str)); - } - - #[test] - fn test_differs_only_by_utc_timezone_struct() { - use arrow_schema::TimeUnit; - let source = DataType::Struct(Fields::from(vec![ - Field::new("id", DataType::Int32, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - ), - ])); - let target = DataType::Struct(Fields::from(vec![ - Field::new("id", DataType::Int32, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ), - ])); - assert!(differs_only_by_utc_timezone(&source, &target)); - - // Struct with a genuinely incompatible field — should return false - let bad_target = DataType::Struct(Fields::from(vec![ - Field::new("id", DataType::Utf8, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ), - ])); - assert!(!differs_only_by_utc_timezone(&source, &bad_target)); - } - - #[test] - fn test_differs_only_by_utc_timezone_map() { - use arrow_schema::TimeUnit; - let entries_source = Field::new_struct( - "entries", - vec![ - Field::new("key", DataType::Utf8, false), - Field::new( - "value", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - ), - ], - false, - ); - let entries_target = Field::new_struct( - "entries", - vec![ - Field::new("key", DataType::Utf8, false), - Field::new( - "value", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ), - ], - false, - ); - let source = DataType::Map(Arc::new(entries_source), false); - let target = DataType::Map(Arc::new(entries_target), false); - assert!(differs_only_by_utc_timezone(&source, &target)); - - // Map with incompatible key type — should return false - let bad_entries_target = Field::new_struct( - "entries", - vec![ - Field::new("key", DataType::Int32, false), - Field::new( - "value", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ), - ], - false, - ); - let bad_target = DataType::Map(Arc::new(bad_entries_target), false); - assert!(!differs_only_by_utc_timezone(&source, &bad_target)); - } - - #[test] - fn test_coerce_timestamp_columns_with_struct() { - use arrow_schema::TimeUnit; - let source_struct_field = Field::new( - "s", - DataType::Struct(Fields::from(vec![ - Field::new("id", DataType::Int32, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - ), - ])), - false, - ); - let target_struct_field = Field::new( - "s", - DataType::Struct(Fields::from(vec![ - Field::new("id", DataType::Int32, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ), - ])), - false, - ); - - let source_schema = Arc::new(arrow_schema::Schema::new(vec![source_struct_field.clone()])); - let target_schema = Arc::new(arrow_schema::Schema::new(vec![target_struct_field])); - - let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; - let ts_array = Arc::new( - arrow_array::TimestampMicrosecondArray::from(vec![ - Some(1_000_000), - Some(2_000_000), - Some(3_000_000), - ]) - .with_timezone("UTC"), - ) as ArrayRef; - let struct_array = Arc::new(StructArray::from(vec![ - (Arc::new(Field::new("id", DataType::Int32, false)), id_array), - ( - Arc::new(Field::new( - "ts", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - )), - ts_array, - ), - ])) as ArrayRef; - - let batch = RecordBatch::try_new(source_schema, vec![struct_array]).unwrap(); - let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); - - assert_eq!(result.schema(), target_schema); - assert_eq!(result.num_rows(), 3); - } - - #[test] - fn test_coerce_timestamp_columns_with_list() { - use arrow_schema::TimeUnit; - let source_field = Field::new( - "ts_list", - DataType::List(Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - ))), - false, - ); - let target_field = Field::new( - "ts_list", - DataType::List(Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - true, - ))), - false, - ); - - let source_schema = Arc::new(arrow_schema::Schema::new(vec![source_field])); - let target_schema = Arc::new(arrow_schema::Schema::new(vec![target_field])); - - let ts_values = arrow_array::TimestampMicrosecondArray::from(vec![ - Some(1_000_000), - Some(2_000_000), - Some(3_000_000), - ]) - .with_timezone("UTC"); - let offsets = arrow_buffer::OffsetBuffer::from_lengths([2, 1]); - let list_array = Arc::new(ListArray::new( - Arc::new(Field::new( - "item", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - true, - )), - offsets, - Arc::new(ts_values), - None, - )) as ArrayRef; - - let batch = RecordBatch::try_new(source_schema, vec![list_array]).unwrap(); - let result = coerce_timestamp_columns(&batch, &target_schema).unwrap(); - - assert_eq!(result.schema(), target_schema); - assert_eq!(result.num_rows(), 2); - } - #[test] fn test_min_max_aggregator() { let schema = Arc::new( From d93d46102326a4d4d3a0d892697f3dd000854ed3 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 22 May 2026 10:17:59 +0100 Subject: [PATCH 5/5] fmt --- crates/iceberg/src/arrow/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index e45fc3bcea..318404b0b4 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -29,11 +29,11 @@ pub(crate) mod delete_filter; mod int96; mod reader; -pub(crate) mod timestamp_tz; /// RecordBatch projection utilities pub mod record_batch_projector; pub(crate) mod record_batch_transformer; mod scan_metrics; +pub(crate) mod timestamp_tz; mod value; pub use reader::*;