diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs
index 9e69c7f15a841..9bdcda265ea7e 100644
--- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs
+++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs
@@ -476,7 +476,7 @@ impl TableProvider for IndexTableProvider {
             .partitioned_file()
             // provide the starting access plan to the DataSourceExec by
             // storing it as "extensions" on PartitionedFile
-            .with_extensions(Arc::new(access_plan) as _);
+            .with_extension(access_plan);
 
         // Prepare for scanning
         let schema = self.schema();
diff --git a/datafusion/common/src/extensions.rs b/datafusion/common/src/extensions.rs
new file mode 100644
index 0000000000000..d5a21fb6566ec
--- /dev/null
+++ b/datafusion/common/src/extensions.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A type-keyed map of opaque, `Arc`'d objects.
+//!
+//! Used as the backing store for the various `extensions` fields throughout
+//! DataFusion (e.g. [`SessionConfig`], [`ExtendedStatistics`],
+//! [`PartitionedFile`]) so that independent components can each attach
+//! their own data without conflict, each keyed by its concrete Rust type.
+//!
+//! [`SessionConfig`]: https://docs.rs/datafusion-execution/latest/datafusion_execution/config/struct.SessionConfig.html
+//! [`ExtendedStatistics`]: https://docs.rs/datafusion-physical-plan/latest/datafusion_physical_plan/operator_statistics/struct.ExtendedStatistics.html
+//! [`PartitionedFile`]: https://docs.rs/datafusion-datasource/latest/datafusion_datasource/struct.PartitionedFile.html
+
+use std::any::{Any, TypeId};
+use std::collections::HashMap;
+use std::hash::{BuildHasherDefault, Hasher};
+use std::sync::Arc;
+
+/// A type-keyed map of opaque `Arc`'d values. Each Rust type `T` occupies
+/// its own slot, so independent components can each attach their own data
+/// without conflict.
+///
+/// Cloning is cheap: the backing values are reference-counted.
+///
+/// # Example
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use datafusion_common::extensions::Extensions;
+/// struct MyData(u32);
+/// struct OtherData(&'static str);
+///
+/// let mut ext = Extensions::new();
+/// ext.insert(MyData(42));
+/// ext.insert_arc(Arc::new(OtherData("hello")));
+///
+/// assert_eq!(ext.get::<MyData>().unwrap().0, 42);
+/// assert_eq!(ext.get::<OtherData>().unwrap().0, "hello");
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct Extensions {
+    inner: HashMap<TypeId, Arc<dyn Any + Send + Sync>, BuildHasherDefault<IdHasher>>,
+}
+
+impl Extensions {
+    /// Create an empty map.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Returns true if no extensions are set.
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Number of extensions set.
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// Insert an extension keyed by its concrete type `T`. Returns the
+    /// previous value of that type, if any.
+    ///
+    /// The value is wrapped in an [`Arc`] internally. If the caller already
+    /// has an `Arc` and wants to avoid an extra allocation, use
+    /// [`Self::insert_arc`].
+    pub fn insert<T: Any + Send + Sync>(&mut self, value: T) -> Option<Arc<T>> {
+        self.insert_arc(Arc::new(value))
+    }
+
+    /// Insert an extension keyed by its concrete type `T`, taking an
+    /// already-allocated [`Arc`]. Returns the previous value of that type,
+    /// if any.
+    pub fn insert_arc<T: Any + Send + Sync>(&mut self, value: Arc<T>) -> Option<Arc<T>> {
+        self.inner
+            .insert(TypeId::of::<T>(), value)
+            .map(|p| Arc::downcast::<T>(p).expect("TypeId matches T"))
+    }
+
+    /// Insert an already-type-erased value, keyed by its dynamic
+    /// [`TypeId`]. Used internally to support APIs that accept
+    /// `Arc<dyn Any + Send + Sync>` for backwards compatibility and need
+    /// to recover the concrete type for keying.
+    ///
+    /// New code should use [`Self::insert`] or [`Self::insert_arc`], which
+    /// preserve the concrete type at the call site.
+    #[deprecated(
+        since = "54.0.0",
+        note = "use `insert` or `insert_arc`; only retained to support the deprecated `PartitionedFile::with_extensions` shim"
+    )]
+    pub fn insert_dyn(
+        &mut self,
+        value: Arc<dyn Any + Send + Sync>,
+    ) -> Option<Arc<dyn Any + Send + Sync>> {
+        let id = (*value).type_id();
+        self.inner.insert(id, value)
+    }
+
+    /// Borrow the extension of type `T`, if set.
+    pub fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
+        self.inner
+            .get(&TypeId::of::<T>())
+            .and_then(|a| a.downcast_ref::<T>())
+    }
+
+    /// Get a cloned `Arc` of the extension, if set.
+    pub fn get_arc<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
+        self.inner
+            .get(&TypeId::of::<T>())
+            .map(|a| Arc::downcast::<T>(Arc::clone(a)).expect("TypeId matches T"))
+    }
+
+    /// Returns true if an extension of type `T` is set.
+    pub fn contains<T: Any + Send + Sync>(&self) -> bool {
+        self.inner.contains_key(&TypeId::of::<T>())
+    }
+
+    /// Merge entries from `other` into `self`. Entries in `other` take
+    /// precedence over existing entries with the same type.
+    pub fn merge(&mut self, other: &Extensions) {
+        for (id, ext) in &other.inner {
+            self.inner.insert(*id, Arc::clone(ext));
+        }
+    }
+}
+
+/// Hasher specialized for [`TypeId`] keys. Since `TypeId` is already a
+/// hash produced by the compiler, we don't need to hash it again — we
+/// just store the `u64` it writes and return it unchanged.
+#[derive(Default)]
+struct IdHasher(u64);
+
+impl Hasher for IdHasher {
+    fn write(&mut self, _: &[u8]) {
+        unreachable!("TypeId calls write_u64");
+    }
+
+    #[inline]
+    fn write_u64(&mut self, id: u64) {
+        self.0 = id;
+    }
+
+    #[inline]
+    fn finish(&self) -> u64 {
+        self.0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct A(u32);
+
+    #[derive(Debug, PartialEq)]
+    struct B(&'static str);
+
+    #[test]
+    fn insert_get_replace() {
+        let mut ext = Extensions::new();
+        assert!(ext.is_empty());
+
+        ext.insert(A(1));
+        ext.insert_arc(Arc::new(B("x")));
+        assert_eq!(ext.len(), 2);
+        assert_eq!(ext.get::<A>(), Some(&A(1)));
+        assert_eq!(ext.get::<B>(), Some(&B("x")));
+        assert!(ext.contains::<A>());
+
+        let prev = ext.insert(A(2));
+        assert_eq!(prev.as_deref(), Some(&A(1)));
+        assert_eq!(ext.get::<A>(), Some(&A(2)));
+    }
+
+    #[test]
+    #[expect(deprecated)]
+    fn insert_dyn_keys_by_concrete_type() {
+        let mut ext = Extensions::new();
+        let erased: Arc<dyn Any + Send + Sync> = Arc::new(A(7));
+        ext.insert_dyn(erased);
+        assert_eq!(ext.get::<A>(), Some(&A(7)));
+    }
+
+    #[test]
+    fn merge_other_wins() {
+        let mut a = Extensions::new();
+        a.insert(A(1));
+        let mut b = Extensions::new();
+        b.insert(A(2));
+        b.insert(B("hi"));
+        a.merge(&b);
+        assert_eq!(a.get::<A>(), Some(&A(2)));
+        assert_eq!(a.get::<B>(), Some(&B("hi")));
+    }
+}
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 996c563f0d8a2..9be0941b5d575 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -43,6 +43,7 @@ pub mod diagnostic;
 pub mod display;
 pub mod encryption;
 pub mod error;
+pub mod extensions;
 pub mod file_options;
 pub mod format;
 pub mod hash_utils;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index ae11fa9a11334..31337ed5d8485 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -38,6 +38,7 @@ use bytes::Bytes;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
 use datafusion_datasource_parquet::metadata::DFParquetMetadata;
+use datafusion_datasource_parquet::{ParquetAccessPlan, RowGroupAccess};
 use futures::future::BoxFuture;
 use futures::{FutureExt, TryFutureExt};
 use insta::assert_snapshot;
@@ -49,6 +50,7 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::WriterProperties;
 
 const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
 
@@ -71,7 +73,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         .into_iter()
         .map(|meta| {
             PartitionedFile::new_from_meta(meta)
-                .with_extensions(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA)))
+                .with_extension(String::from(EXPECTED_USER_DEFINED_METADATA))
         })
         .collect();
 
@@ -107,6 +109,82 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
     ");
 }
 
+/// Regression test for the type-keyed extensions map: independent components
+/// must be able to attach their own per-file payloads on the same
+/// [`PartitionedFile`] without colliding. Here we attach a custom reader
+/// payload (the `String` checked by [`InMemoryParquetFileReaderFactory`])
+/// *and* a [`ParquetAccessPlan`] that skips the first row group, then verify
+/// (a) the factory still sees its payload (its internal `assert_eq!` would
+/// fire if the slot got overwritten) and (b) the access plan is honored — so
+/// only the second row group's 5 rows come out, not all 10.
+#[tokio::test]
+async fn custom_payload_and_access_plan_coexist() {
+    // Two row groups of 5 rows each: values 0..=4 in row group 0, 5..=9 in
+    // row group 1.
+    let c1: ArrayRef = Arc::new(Int64Array::from((0..10).collect::<Vec<_>>()));
+    let batch = create_batch(vec![("c1", c1)]);
+    let file_schema = batch.schema().clone();
+
+    let in_memory = InMemory::new();
+    let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+    let props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(5))
+        .build();
+    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+
+    let location = Path::parse("two-row-groups.parquet").unwrap();
+    let size = buf.len() as u64;
+    in_memory
+        .put(&location, Bytes::from(buf).into())
+        .await
+        .unwrap();
+    let meta = ObjectMeta {
+        location,
+        last_modified: chrono::DateTime::from(SystemTime::now()),
+        size,
+        e_tag: None,
+        version: None,
+    };
+
+    let access_plan =
+        ParquetAccessPlan::new(vec![RowGroupAccess::Skip, RowGroupAccess::Scan]);
+    let pf = PartitionedFile::new_from_meta(meta)
+        .with_extension(String::from(EXPECTED_USER_DEFINED_METADATA))
+        .with_extension(access_plan);
+
+    let store: Arc<dyn ObjectStore> = Arc::new(in_memory);
+    let source = Arc::new(
+        ParquetSource::new(file_schema.clone()).with_parquet_file_reader_factory(
+            Arc::new(InMemoryParquetFileReaderFactory(Arc::clone(&store))),
+        ),
+    );
+    let base_config =
+        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
+            .with_file_group(vec![pf].into())
+            .build();
+    let parquet_exec = DataSourceExec::from_data_source(base_config);
+
+    let session_ctx = SessionContext::new();
+    let read = collect(parquet_exec, session_ctx.task_ctx()).await.unwrap();
+
+    let total: usize = read.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total, 5,
+        "access plan should have skipped the first row group"
+    );
+
+    let values: Vec<i64> = read
+        .iter()
+        .flat_map(|b| {
+            let arr = b.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
+            (0..arr.len()).map(|i| arr.value(i)).collect::<Vec<_>>()
+        })
+        .collect();
+    assert_eq!(values, vec![5, 6, 7, 8, 9]);
+}
+
 #[derive(Debug)]
 struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
 
@@ -119,12 +197,8 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
         let metadata = partitioned_file
-            .extensions
-            .as_ref()
+            .extension::<String>()
             .expect("has user defined metadata");
-        let metadata = metadata
-            .downcast_ref::<String>()
-            .expect("has string metadata");
 
         assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
 
diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs
index 9ff8137687c95..31be6fd979fd6 100644
--- a/datafusion/core/tests/parquet/external_access_plan.rs
+++ b/datafusion/core/tests/parquet/external_access_plan.rs
@@ -349,7 +349,7 @@ impl TestFull {
 
         // add the access plan, if any, as an extension
         if let Some(access_plan) = access_plan {
-            partitioned_file = partitioned_file.with_extensions(Arc::new(access_plan));
+            partitioned_file = partitioned_file.with_extension(access_plan);
         }
 
         // Create a DataSourceExec to read the file
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index bad1c684b47f5..bbbd298687ab5 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -259,7 +259,7 @@ struct PreparedParquetOpen {
     partition_index: usize,
     partitioned_file: PartitionedFile,
     file_range: Option<FileRange>,
-    extensions: Option<Arc<dyn Any + Send + Sync>>,
+    extensions: datafusion_datasource::FileExtensions,
     file_name: String,
     file_metrics: ParquetFileMetrics,
    baseline_metrics: BaselineMetrics,
@@ -884,7 +884,7 @@ impl FiltersPreparedParquetOpen {
         // as many row groups as possible based on the metadata and query
         let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
             &prepared.file_name,
-            prepared.extensions.clone(),
+            &prepared.extensions,
             rg_metadata.len(),
         )?);
 
@@ -1540,23 +1540,17 @@ impl ParquetMorselizer {
 /// Note: file_name is only used for error messages
 fn create_initial_plan(
     file_name: &str,
-    extensions: Option<Arc<dyn Any + Send + Sync>>,
+    extensions: &datafusion_datasource::FileExtensions,
     row_group_count: usize,
 ) -> Result<ParquetAccessPlan> {
-    if let Some(extensions) = extensions {
-        if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
-            let plan_len = access_plan.len();
-            if plan_len != row_group_count {
-                return exec_err!(
-                    "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
-                );
-            }
-
-            // check row group count matches the plan
-            return Ok(access_plan.clone());
-        } else {
-            debug!("DataSourceExec Ignoring unknown extension specified for {file_name}");
+    if let Some(access_plan) = extensions.get::<ParquetAccessPlan>() {
+        let plan_len = access_plan.len();
+        if plan_len != row_group_count {
+            return exec_err!(
+                "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
+            );
         }
+        return Ok(access_plan.clone());
     }
 
     // default to scanning all row groups
@@ -2513,7 +2507,7 @@ mod test {
             "test.parquet".to_string(),
             u64::try_from(data_len).unwrap(),
         )
-        .with_extensions(Arc::new(access_plan));
+        .with_extension(access_plan);
 
         let make_opener = |reverse_scan: bool| {
             ParquetMorselizerBuilder::new()
@@ -2614,7 +2608,7 @@ mod test {
             "test.parquet".to_string(),
             u64::try_from(data_len).unwrap(),
         )
-        .with_extensions(Arc::new(access_plan));
+        .with_extension(access_plan);
 
         let make_opener = |reverse_scan: bool| {
             ParquetMorselizerBuilder::new()
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index a014c8b2726e7..fec36c49cc210 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -230,7 +230,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
/// access_plan.skip(4);
/// // provide the plan as extension to the FileScanConfig
/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
-///   .with_extensions(Arc::new(access_plan));
+///   .with_extension(access_plan);
/// // create a FileScanConfig to scan this file
/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(ParquetSource::new(schema())))
///   .with_file(partitioned_file).build();
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index a9600271c28ce..c5f9d722006d1 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -67,10 +67,18 @@ pub use table_schema::TableSchema;
 #[expect(deprecated)]
 pub use statistics::add_row_stats;
 pub use statistics::compute_all_files_statistics;
+use std::any::Any;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 
+/// User-defined per-file extension data, keyed by concrete Rust type.
+///
+/// Re-exported from [`datafusion_common::extensions::Extensions`]; the same
+/// type backs `SessionConfig::extensions`, `ExtendedStatistics::extensions`,
+/// and other extension fields throughout DataFusion.
+pub type FileExtensions = datafusion_common::extensions::Extensions;
+
 /// Stream of files get listed from object store
 #[deprecated(
     since = "54.0.0",
@@ -148,8 +156,10 @@ pub struct PartitionedFile {
     /// underlying format (for example, Parquet `sorting_columns`), but it may also be set
     /// explicitly via [`Self::with_ordering`].
     pub ordering: Option<LexOrdering>,
-    /// An optional field for user defined per object metadata
-    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+    /// User-defined per-file metadata, keyed by Rust type. Multiple
+    /// independent components can each attach their own data here without
+    /// conflict — see [`FileExtensions`].
+    pub extensions: FileExtensions,
     /// The estimated size of the parquet metadata, in bytes
     pub metadata_size_hint: Option<usize>,
 }
@@ -169,7 +179,7 @@ impl PartitionedFile {
             range: None,
             statistics: None,
             ordering: None,
-            extensions: None,
+            extensions: FileExtensions::new(),
             metadata_size_hint: None,
         }
     }
@@ -182,7 +192,7 @@ impl PartitionedFile {
             range: None,
             statistics: None,
             ordering: None,
-            extensions: None,
+            extensions: FileExtensions::new(),
             metadata_size_hint: None,
         }
     }
@@ -201,7 +211,7 @@ impl PartitionedFile {
             range: Some(FileRange { start, end }),
             statistics: None,
             ordering: None,
-            extensions: None,
+            extensions: FileExtensions::new(),
             metadata_size_hint: None,
         }
         .with_range(start, end)
@@ -257,14 +267,34 @@ impl PartitionedFile {
         self
     }
 
-    /// Update the user defined extensions for this file.
+    /// Attach a typed user-defined extension to this file. Multiple
+    /// independent extensions can be attached, each keyed by its concrete
+    /// Rust type. Inserting a value of a type that already has an extension
+    /// replaces the previous one.
+    ///
+    /// This can be used to pass reader-specific information (e.g. a
+    /// `ParquetAccessPlan`, or a custom index entry).
+    pub fn with_extension<T: Any + Send + Sync>(mut self, value: T) -> Self {
+        self.extensions.insert(value);
+        self
+    }
+
+    /// Borrow the extension of type `T`, if one is attached.
+    pub fn extension<T: Any + Send + Sync>(&self) -> Option<&T> {
+        self.extensions.get::<T>()
+    }
+
+    /// Attach a type-erased extension to this file.
     ///
-    /// This can be used to pass reader specific information.
-    pub fn with_extensions(
-        mut self,
-        extensions: Arc<dyn std::any::Any + Send + Sync>,
-    ) -> Self {
-        self.extensions = Some(extensions);
+    /// Kept as a backwards-compatible shim; prefer [`Self::with_extension`]
+    /// which keys the extension by its concrete Rust type at the call site.
+    #[deprecated(
+        since = "54.0.0",
+        note = "use `with_extension`; the extension is keyed by its concrete type"
+    )]
+    pub fn with_extensions(mut self, extensions: Arc<dyn Any + Send + Sync>) -> Self {
+        #[expect(deprecated)]
+        self.extensions.insert_dyn(extensions);
         self
     }
 
@@ -338,7 +368,7 @@ impl From<ObjectMeta> for PartitionedFile {
             range: None,
             statistics: None,
             ordering: None,
-            extensions: None,
+            extensions: FileExtensions::new(),
             metadata_size_hint: None,
         }
     }
@@ -535,7 +565,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<PartitionedFile> {
             range: None,
             statistics: None,
             ordering: None,
-            extensions: None,
+            extensions: FileExtensions::new(),
             metadata_size_hint: None,
         })
         .collect()
diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
     options: Arc<ConfigOptions>,
-    /// Opaque extensions.
-    extensions: AnyMap,
+    /// Opaque extensions, keyed by concrete Rust type. See
+    /// [`with_extension`](Self::with_extension) and
+    /// [`get_extension`](Self::get_extension).
+    extensions: Extensions,
 }
 
 impl Default for SessionConfig {
     fn default() -> Self {
         Self {
             options: Arc::new(ConfigOptions::new()),
-            // Assume no extensions by default.
-            extensions: HashMap::with_capacity_and_hasher(
-                0,
-                BuildHasherDefault::default(),
-            ),
+            extensions: Extensions::new(),
         }
     }
 }
@@ -602,9 +596,7 @@ impl SessionConfig {
     where
         T: Send + Sync + 'static,
     {
-        let ext = ext as Arc<dyn Any + Send + Sync>;
-        let id = TypeId::of::<T>();
-        self.extensions.insert(id, ext);
+        self.extensions.insert_arc(ext);
     }
 
     /// Get extension, if any for the specified type `T` exists.
@@ -614,11 +606,7 @@ impl SessionConfig {
     where
         T: Send + Sync + 'static,
     {
-        let id = TypeId::of::<T>();
-        self.extensions
-            .get(&id)
-            .cloned()
-            .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
+        self.extensions.get_arc::<T>()
     }
 }
 
@@ -631,34 +619,3 @@ impl From<ConfigOptions> for SessionConfig {
         }
     }
 }
-
-/// Map that holds opaque objects indexed by their type.
-///
-/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
-///
-/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
-type AnyMap =
-    HashMap<TypeId, Arc<dyn Any + Send + Sync>, BuildHasherDefault<IdHasher>>;
-
-/// Hasher for [`AnyMap`].
-///
-/// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, coming from the compiler.
-/// The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then returns it, instead of doing any bit fiddling.
-#[derive(Default)]
-struct IdHasher(u64);
-
-impl Hasher for IdHasher {
-    fn write(&mut self, _: &[u8]) {
-        unreachable!("TypeId calls write_u64");
-    }
-
-    #[inline]
-    fn write_u64(&mut self, id: u64) {
-        self.0 = id;
-    }
-
-    #[inline]
-    fn finish(&self) -> u64 {
-        self.0
-    }
-}
diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs
index 20266e9768ebe..b577482ba4f24 100644
--- a/datafusion/physical-plan/src/operator_statistics/mod.rs
+++ b/datafusion/physical-plan/src/operator_statistics/mod.rs
@@ -86,11 +86,10 @@
 //! let stats = registry.compute(plan.as_ref())?;
 //! ```
 
-use std::any::{Any, TypeId};
-use std::collections::HashMap;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
+use datafusion_common::extensions::Extensions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, Statistics};
 
@@ -128,7 +127,7 @@ pub struct ExtendedStatistics {
     /// Standard statistics (num_rows, byte_size, column stats)
     base: Arc<Statistics>,
     /// Type-erased extensions for custom statistics
-    extensions: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
+    extensions: Extensions,
 }
 
 impl ExtendedStatistics {
@@ -136,7 +135,7 @@
     pub fn new(base: Statistics) -> Self {
         Self {
             base: Arc::new(base),
-            extensions: HashMap::new(),
+            extensions: Extensions::new(),
         }
     }
 
@@ -144,7 +143,7 @@
     pub fn new_arc(base: Arc<Statistics>) -> Self {
         Self {
             base,
-            extensions: HashMap::new(),
+            extensions: Extensions::new(),
         }
     }
 
@@ -160,26 +159,22 @@
     /// Get a reference to a custom statistics extension by type.
     pub fn get_extension<T: Send + Sync + 'static>(&self) -> Option<&T> {
-        self.extensions
-            .get(&TypeId::of::<T>())
-            .and_then(|ext| ext.downcast_ref())
+        self.extensions.get::<T>()
     }
 
     /// Set a custom statistics extension.
     pub fn set_extension<T: Send + Sync + 'static>(&mut self, value: T) {
-        self.extensions.insert(TypeId::of::<T>(), Arc::new(value));
+        self.extensions.insert(value);
     }
 
     /// Check if an extension of the given type exists.
     pub fn has_extension<T: Send + Sync + 'static>(&self) -> bool {
-        self.extensions.contains_key(&TypeId::of::<T>())
+        self.extensions.contains::<T>()
     }
 
     /// Merge extensions from another ExtendedStatistics (other's extensions take precedence).
     pub fn merge_extensions(&mut self, other: &ExtendedStatistics) {
-        for (type_id, ext) in &other.extensions {
-            self.extensions.insert(*type_id, Arc::clone(ext));
-        }
+        self.extensions.merge(&other.extensions);
     }
 }
diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md
index 854dee07241d1..f92979a6642b7 100644
--- a/docs/source/library-user-guide/upgrading/54.0.0.md
+++ b/docs/source/library-user-guide/upgrading/54.0.0.md
@@ -436,6 +436,48 @@ SELECT CAST(approx_percentile_cont(quantity, 0.5) AS BIGINT) FROM orders;
 
 [#21074]: https://github.com/apache/datafusion/pull/21074
 
+### `PartitionedFile::extensions` is now a type-keyed map
+
+`PartitionedFile.extensions` previously held a single
+`Option<Arc<dyn Any + Send + Sync>>` slot, so two independent components
+could not both attach data to the same file without colliding. The field
+is now a `FileExtensions` (a re-export of
+`datafusion_common::extensions::Extensions`), a map keyed by concrete Rust
+type. Each type occupies its own slot, so multiple consumers (e.g. a
+`ParquetAccessPlan` and a custom index entry) can coexist on a single
+`PartitionedFile`.
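+
+For example, two unrelated components can now attach and read their own
+payloads on the same file. A minimal sketch (`MyIndexEntry` stands in for
+any user-defined type and is not a DataFusion API):
+
+```rust
+use datafusion_datasource::PartitionedFile;
+
+// any thread-safe, 'static type can serve as a payload
+struct MyIndexEntry {
+    matching_pages: Vec<usize>,
+}
+
+let file = PartitionedFile::new("my_file.parquet", 1234)
+    // one component attaches its own index entry...
+    .with_extension(MyIndexEntry { matching_pages: vec![0, 2] })
+    // ...another attaches an unrelated payload, keyed by its own type
+    .with_extension(String::from("user-defined metadata"));
+
+// each consumer reads back only the type it owns
+assert_eq!(file.extension::<String>().unwrap(), "user-defined metadata");
+assert_eq!(
+    file.extension::<MyIndexEntry>().unwrap().matching_pages,
+    vec![0, 2]
+);
+```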
+
+The previous `with_extensions(Arc<dyn Any + Send + Sync>)` builder is
+deprecated (it still works, keyed by the value's dynamic `TypeId`) in
+favor of a typed variant:
+
+```diff
+-let pf = PartitionedFile::new(path, size)
+-    .with_extensions(Arc::new(access_plan));
++let pf = PartitionedFile::new(path, size)
++    .with_extension(access_plan);
+```
+
+Reading an extension no longer requires a manual downcast:
+
+```diff
+-let access_plan = partitioned_file
+-    .extensions
+-    .as_ref()
+-    .and_then(|ext| ext.downcast_ref::<ParquetAccessPlan>());
++let access_plan = partitioned_file.extension::<ParquetAccessPlan>();
+```
+
+The `FileExtensions` API is `insert` / `insert_arc` / `get` / `get_arc`
+/ `contains` / `merge`, all generic over the concrete type `T`. Values
+are stored as `Arc<dyn Any + Send + Sync>` so the map remains cheap to clone.
+
+**Who is affected:**
+
+- Code that constructs `PartitionedFile` and calls `.with_extensions(...)`.
+- Custom `ParquetFileReaderFactory` implementations or other consumers that
+  read `partitioned_file.extensions` and downcast manually.
+
 ### `Box<C>` and `Arc<C>` `TreeNodeContainer` impls now require `C: Default`
 
 The generic `TreeNodeContainer` implementations for `Box<C>` and `Arc<C>` now