From 544a519f87790473cdbb5b63de2f73f3543baf5e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 2 May 2026 15:48:05 -0500 Subject: [PATCH 1/8] feat: type-keyed extensions map for PartitionedFile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PartitionedFile.extensions previously held a single Option<Arc<dyn Any + Send + Sync>>, so two independent components could not both attach data to the same file without colliding. This commit introduces a shared TypeId-keyed Extensions map in datafusion-common and uses it for PartitionedFile so each Rust type occupies its own slot. The shared datafusion_common::extensions::Extensions type also gives us a single home for this pattern; SessionConfig.extensions and ExtendedStatistics.extensions can be migrated to it in follow-ups (both are TypeId-keyed maps duplicating the same shape today). Public API changes on PartitionedFile: - with_extensions(Arc<dyn Any + Send + Sync>) → with_extension(T) and with_extension_arc(Arc<T>) - extensions field type Option<Arc<dyn Any + Send + Sync>> → FileExtensions - new extension::<T>() accessor avoids manual downcasts Migrated in-tree call sites (parquet opener, custom_reader test, external_access_plan test, parquet_advanced_index example) and added an upgrade guide entry. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../data_io/parquet_advanced_index.rs | 2 +- datafusion/common/src/extensions.rs | 189 ++++++++++++++++++ datafusion/common/src/lib.rs | 1 + .../core/tests/parquet/custom_reader.rs | 8 +- .../tests/parquet/external_access_plan.rs | 2 +- datafusion/datasource-parquet/src/opener.rs | 30 ++- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/mod.rs | 51 +++-- .../library-user-guide/upgrading/54.0.0.md | 41 ++++ 9 files changed, 285 insertions(+), 41 deletions(-) create mode 100644 datafusion/common/src/extensions.rs diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs index 9e69c7f15a841..9bdcda265ea7e 100644 --- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs +++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs @@ -476,7 +476,7 @@ impl TableProvider for IndexTableProvider { .partitioned_file() // provide the starting access plan to the DataSourceExec by // storing it as "extensions" on PartitionedFile - .with_extensions(Arc::new(access_plan) as _); + .with_extension(access_plan); // Prepare for scanning let schema = self.schema(); diff --git a/datafusion/common/src/extensions.rs b/datafusion/common/src/extensions.rs new file mode 100644 index 0000000000000..fc5650791b7a8 --- /dev/null +++ b/datafusion/common/src/extensions.rs @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A type-keyed map of opaque, `Arc`'d objects. +//! +//! Used as the backing store for the various `extensions` fields throughout +//! DataFusion (e.g. [`SessionConfig`], [`ExtendedStatistics`], +//! [`PartitionedFile`]) so that independent components can each attach +//! their own data without conflict, each keyed by its concrete Rust type. +//! +//! [`SessionConfig`]: https://docs.rs/datafusion-execution/latest/datafusion_execution/config/struct.SessionConfig.html +//! [`ExtendedStatistics`]: https://docs.rs/datafusion-physical-plan/latest/datafusion_physical_plan/operator_statistics/struct.ExtendedStatistics.html +//! [`PartitionedFile`]: https://docs.rs/datafusion-datasource/latest/datafusion_datasource/struct.PartitionedFile.html + +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::hash::{BuildHasherDefault, Hasher}; +use std::sync::Arc; + +/// A type-keyed map of opaque `Arc`'d values. Each Rust type `T` occupies +/// its own slot, so independent components can each attach their own data +/// without conflict. +/// +/// Cloning is cheap: the backing values are reference-counted. 
+/// +/// # Example +/// +/// ``` +/// # use std::sync::Arc; +/// # use datafusion_common::extensions::Extensions; +/// struct MyData(u32); +/// struct OtherData(&'static str); +/// +/// let mut ext = Extensions::new(); +/// ext.insert(MyData(42)); +/// ext.insert(OtherData("hello")); +/// +/// assert_eq!(ext.get::().unwrap().0, 42); +/// assert_eq!(ext.get::().unwrap().0, "hello"); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct Extensions { + inner: HashMap, BuildHasherDefault>, +} + +impl Extensions { + /// Create an empty map. + pub fn new() -> Self { + Self::default() + } + + /// Returns true if no extensions are set. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Number of extensions set. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Insert an extension keyed by its concrete type `T`. Returns the + /// previous value of that type, if any. + pub fn insert(&mut self, value: T) -> Option> { + self.insert_arc(Arc::new(value)) + } + + /// Insert an already-`Arc`ed extension keyed by `T`. Returns the + /// previous value of that type, if any. + pub fn insert_arc(&mut self, value: Arc) -> Option> { + self.inner + .insert(TypeId::of::(), value) + .map(|p| Arc::downcast::(p).expect("TypeId matches T")) + } + + /// Borrow the extension of type `T`, if set. + pub fn get(&self) -> Option<&T> { + self.inner + .get(&TypeId::of::()) + .and_then(|a| a.downcast_ref::()) + } + + /// Get a cloned `Arc` of the extension, if set. + pub fn get_arc(&self) -> Option> { + self.inner + .get(&TypeId::of::()) + .map(|a| Arc::downcast::(Arc::clone(a)).expect("TypeId matches T")) + } + + /// Returns true if an extension of type `T` is set. + pub fn contains(&self) -> bool { + self.inner.contains_key(&TypeId::of::()) + } + + /// Remove and return the extension of type `T`, if set. 
+ pub fn remove(&mut self) -> Option> { + self.inner + .remove(&TypeId::of::()) + .map(|p| Arc::downcast::(p).expect("TypeId matches T")) + } + + /// Merge entries from `other` into `self`. Entries in `other` take + /// precedence over existing entries with the same type. + pub fn merge(&mut self, other: &Extensions) { + for (id, ext) in &other.inner { + self.inner.insert(*id, Arc::clone(ext)); + } + } +} + +/// Hasher specialized for [`TypeId`] keys. Since `TypeId` is already a +/// hash produced by the compiler, we don't need to hash it again — we +/// just store the `u64` it writes and return it unchanged. +#[derive(Default)] +struct IdHasher(u64); + +impl Hasher for IdHasher { + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } + + #[inline] + fn finish(&self) -> u64 { + self.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, PartialEq)] + struct A(u32); + + #[derive(Debug, PartialEq)] + struct B(&'static str); + + #[test] + fn insert_get_remove() { + let mut ext = Extensions::new(); + assert!(ext.is_empty()); + + ext.insert(A(1)); + ext.insert(B("x")); + assert_eq!(ext.len(), 2); + assert_eq!(ext.get::(), Some(&A(1))); + assert_eq!(ext.get::(), Some(&B("x"))); + assert!(ext.contains::()); + + let prev = ext.insert(A(2)); + assert_eq!(prev.as_deref(), Some(&A(1))); + assert_eq!(ext.get::(), Some(&A(2))); + + let removed = ext.remove::(); + assert_eq!(removed.as_deref(), Some(&A(2))); + assert!(!ext.contains::()); + } + + #[test] + fn merge_other_wins() { + let mut a = Extensions::new(); + a.insert(A(1)); + let mut b = Extensions::new(); + b.insert(A(2)); + b.insert(B("hi")); + a.merge(&b); + assert_eq!(a.get::(), Some(&A(2))); + assert_eq!(a.get::(), Some(&B("hi"))); + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 996c563f0d8a2..9be0941b5d575 100644 --- a/datafusion/common/src/lib.rs +++ 
b/datafusion/common/src/lib.rs @@ -43,6 +43,7 @@ pub mod diagnostic; pub mod display; pub mod encryption; pub mod error; +pub mod extensions; pub mod file_options; pub mod format; pub mod hash_utils; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index ae11fa9a11334..31db93eae20fb 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -71,7 +71,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { .into_iter() .map(|meta| { PartitionedFile::new_from_meta(meta) - .with_extensions(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))) + .with_extension(String::from(EXPECTED_USER_DEFINED_METADATA)) }) .collect(); @@ -119,12 +119,8 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { metrics: &ExecutionPlanMetricsSet, ) -> Result> { let metadata = partitioned_file - .extensions - .as_ref() + .extension::() .expect("has user defined metadata"); - let metadata = metadata - .downcast_ref::() - .expect("has string metadata"); assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]); diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index 9ff8137687c95..31be6fd979fd6 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -349,7 +349,7 @@ impl TestFull { // add the access plan, if any, as an extension if let Some(access_plan) = access_plan { - partitioned_file = partitioned_file.with_extensions(Arc::new(access_plan)); + partitioned_file = partitioned_file.with_extension(access_plan); } // Create a DataSourceExec to read the file diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..bbbd298687ab5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ 
-259,7 +259,7 @@ struct PreparedParquetOpen { partition_index: usize, partitioned_file: PartitionedFile, file_range: Option, - extensions: Option>, + extensions: datafusion_datasource::FileExtensions, file_name: String, file_metrics: ParquetFileMetrics, baseline_metrics: BaselineMetrics, @@ -884,7 +884,7 @@ impl FiltersPreparedParquetOpen { // as many row groups as possible based on the metadata and query let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan( &prepared.file_name, - prepared.extensions.clone(), + &prepared.extensions, rg_metadata.len(), )?); @@ -1540,23 +1540,17 @@ impl ParquetMorselizer { /// Note: file_name is only used for error messages fn create_initial_plan( file_name: &str, - extensions: Option>, + extensions: &datafusion_datasource::FileExtensions, row_group_count: usize, ) -> Result { - if let Some(extensions) = extensions { - if let Some(access_plan) = extensions.downcast_ref::() { - let plan_len = access_plan.len(); - if plan_len != row_group_count { - return exec_err!( - "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}" - ); - } - - // check row group count matches the plan - return Ok(access_plan.clone()); - } else { - debug!("DataSourceExec Ignoring unknown extension specified for {file_name}"); + if let Some(access_plan) = extensions.get::() { + let plan_len = access_plan.len(); + if plan_len != row_group_count { + return exec_err!( + "Invalid ParquetAccessPlan for {file_name}. 
Specified {plan_len} row groups, but file has {row_group_count}" + ); } + return Ok(access_plan.clone()); } // default to scanning all row groups @@ -2513,7 +2507,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extensions(Arc::new(access_plan)); + .with_extension(access_plan); let make_opener = |reverse_scan: bool| { ParquetMorselizerBuilder::new() @@ -2614,7 +2608,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extensions(Arc::new(access_plan)); + .with_extension(access_plan); let make_opener = |reverse_scan: bool| { ParquetMorselizerBuilder::new() diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..fec36c49cc210 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -230,7 +230,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// access_plan.skip(4); /// // provide the plan as extension to the FileScanConfig /// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234) -/// .with_extensions(Arc::new(access_plan)); +/// .with_extension(access_plan); /// // create a FileScanConfig to scan this file /// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(ParquetSource::new(schema()))) /// .with_file(partitioned_file).build(); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index a9600271c28ce..b49ad28536a8e 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -67,10 +67,18 @@ pub use table_schema::TableSchema; #[expect(deprecated)] pub use statistics::add_row_stats; pub use statistics::compute_all_files_statistics; +use std::any::Any; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; +/// User-defined per-file extension data, keyed by concrete Rust type. 
+/// +/// Re-exported from [`datafusion_common::extensions::Extensions`]; the same +/// type backs `SessionConfig::extensions`, `ExtendedStatistics::extensions`, +/// and other extension fields throughout DataFusion. +pub type FileExtensions = datafusion_common::extensions::Extensions; + /// Stream of files get listed from object store #[deprecated( since = "54.0.0", @@ -148,8 +156,10 @@ pub struct PartitionedFile { /// underlying format (for example, Parquet `sorting_columns`), but it may also be set /// explicitly via [`Self::with_ordering`]. pub ordering: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, + /// User-defined per-file metadata, keyed by Rust type. Multiple + /// independent components can each attach their own data here without + /// conflict — see [`FileExtensions`]. + pub extensions: FileExtensions, /// The estimated size of the parquet metadata, in bytes pub metadata_size_hint: Option, } @@ -169,7 +179,7 @@ impl PartitionedFile { range: None, statistics: None, ordering: None, - extensions: None, + extensions: FileExtensions::new(), metadata_size_hint: None, } } @@ -182,7 +192,7 @@ impl PartitionedFile { range: None, statistics: None, ordering: None, - extensions: None, + extensions: FileExtensions::new(), metadata_size_hint: None, } } @@ -201,7 +211,7 @@ impl PartitionedFile { range: Some(FileRange { start, end }), statistics: None, ordering: None, - extensions: None, + extensions: FileExtensions::new(), metadata_size_hint: None, } .with_range(start, end) @@ -257,17 +267,30 @@ impl PartitionedFile { self } - /// Update the user defined extensions for this file. + /// Attach a typed user-defined extension to this file. Multiple + /// independent extensions can be attached, each keyed by its concrete + /// Rust type. Inserting a value of a type that already has an extension + /// replaces the previous one. /// - /// This can be used to pass reader specific information. 
- pub fn with_extensions( - mut self, - extensions: Arc, - ) -> Self { - self.extensions = Some(extensions); + /// This can be used to pass reader-specific information (e.g. a + /// `ParquetAccessPlan`, or a custom index entry). + pub fn with_extension(mut self, value: T) -> Self { + self.extensions.insert(value); self } + /// Like [`Self::with_extension`] but accepts an already-`Arc`ed value, + /// avoiding an extra allocation. + pub fn with_extension_arc(mut self, value: Arc) -> Self { + self.extensions.insert_arc(value); + self + } + + /// Borrow the extension of type `T`, if one is attached. + pub fn extension(&self) -> Option<&T> { + self.extensions.get::() + } + /// Update the statistics for this file. /// /// The provided `statistics` should cover only the file schema columns. @@ -338,7 +361,7 @@ impl From for PartitionedFile { range: None, statistics: None, ordering: None, - extensions: None, + extensions: FileExtensions::new(), metadata_size_hint: None, } } @@ -535,7 +558,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec>` slot, so two independent components +could not both attach data to the same file without colliding. The field +is now a `FileExtensions` (a re-export of `datafusion_common::extensions::Extensions`), +a map keyed by concrete Rust type. Each type occupies its own slot, so +multiple consumers (e.g. a `ParquetAccessPlan` and a custom index entry) +can coexist on a single `PartitionedFile`. 
+ +The previous `with_extensions(Arc)` builder is +replaced by typed variants: + +```diff +-let pf = PartitionedFile::new(path, size) +- .with_extensions(Arc::new(access_plan)); ++let pf = PartitionedFile::new(path, size) ++ .with_extension(access_plan); +``` + +Reading an extension no longer requires a manual downcast: + +```diff +-let access_plan = partitioned_file +- .extensions +- .as_ref() +- .and_then(|ext| ext.downcast_ref::()); ++let access_plan = partitioned_file.extension::(); +``` + +For an already-`Arc`ed value, use `with_extension_arc` to avoid the extra +allocation. The full API on `FileExtensions` is `insert` / `insert_arc` / +`get` / `get_arc` / `contains` / `remove` / `merge`, all generic over the +concrete type `T`. + +**Who is affected:** + +- Code that constructs `PartitionedFile` and calls `.with_extensions(...)`. +- Custom `ParquetFileReaderFactory` implementations or other consumers that + read `partitioned_file.extensions` and downcast manually. + ### `Box` and `Arc` `TreeNodeContainer` impls now require `C: Default` The generic `TreeNodeContainer` implementations for `Box` and `Arc` now From bcb194ddb3985d24862ebd9c9c5ce49fc8cfbd12 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 2 May 2026 16:58:56 -0500 Subject: [PATCH 2/8] refactor: migrate SessionConfig and ExtendedStatistics to shared Extensions Both SessionConfig.extensions and ExtendedStatistics.extensions were already TypeId-keyed Arc-of-Any maps, duplicating the type added in the previous commit for PartitionedFile.extensions. Replace both with the shared datafusion_common::extensions::Extensions, deleting the inline AnyMap type alias and IdHasher (the IdHasher optimization is preserved inside Extensions). No public API changes: - SessionConfig::with_extension / set_extension / get_extension keep the same signatures. 
- ExtendedStatistics::get_extension / set_extension / has_extension / merge_extensions keep the same signatures. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/execution/src/config.rs | 61 +++---------------- .../src/operator_statistics/mod.rs | 21 +++---- 2 files changed, 17 insertions(+), 65 deletions(-) diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 6f6071163110b..b2917a4583628 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -15,16 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::{Any, TypeId}, - collections::HashMap, - hash::{BuildHasherDefault, Hasher}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use datafusion_common::{ Result, ScalarValue, config::{ConfigExtension, ConfigOptions, SpillCompression}, + extensions::Extensions, }; /// Configuration options for [`SessionContext`]. @@ -99,19 +95,17 @@ pub struct SessionConfig { /// A new copy is created on write, if there are other outstanding /// references to the same options. options: Arc, - /// Opaque extensions. - extensions: AnyMap, + /// Opaque extensions, keyed by concrete Rust type. See + /// [`with_extension`](Self::with_extension) and + /// [`get_extension`](Self::get_extension). + extensions: Extensions, } impl Default for SessionConfig { fn default() -> Self { Self { options: Arc::new(ConfigOptions::new()), - // Assume no extensions by default. - extensions: HashMap::with_capacity_and_hasher( - 0, - BuildHasherDefault::default(), - ), + extensions: Extensions::new(), } } } @@ -602,9 +596,7 @@ impl SessionConfig { where T: Send + Sync + 'static, { - let ext = ext as Arc; - let id = TypeId::of::(); - self.extensions.insert(id, ext); + self.extensions.insert_arc(ext); } /// Get extension, if any for the specified type `T` exists. 
@@ -614,11 +606,7 @@ impl SessionConfig { where T: Send + Sync + 'static, { - let id = TypeId::of::(); - self.extensions - .get(&id) - .cloned() - .map(|ext| Arc::downcast(ext).expect("TypeId unique")) + self.extensions.get_arc::() } } @@ -631,34 +619,3 @@ impl From for SessionConfig { } } } - -/// Map that holds opaque objects indexed by their type. -/// -/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe]. -/// -/// [object safe]: https://doc.rust-lang.org/reference/items/traits.html#object-safety -type AnyMap = - HashMap, BuildHasherDefault>; - -/// Hasher for [`AnyMap`]. -/// -/// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, coming from the compiler. -/// The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then returns it, instead of doing any bit fiddling. -#[derive(Default)] -struct IdHasher(u64); - -impl Hasher for IdHasher { - fn write(&mut self, _: &[u8]) { - unreachable!("TypeId calls write_u64"); - } - - #[inline] - fn write_u64(&mut self, id: u64) { - self.0 = id; - } - - #[inline] - fn finish(&self) -> u64 { - self.0 - } -} diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index 20266e9768ebe..b577482ba4f24 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -86,11 +86,10 @@ //! let stats = registry.compute(plan.as_ref())?; //! 
``` -use std::any::{Any, TypeId}; -use std::collections::HashMap; use std::fmt::{self, Debug}; use std::sync::Arc; +use datafusion_common::extensions::Extensions; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; @@ -128,7 +127,7 @@ pub struct ExtendedStatistics { /// Standard statistics (num_rows, byte_size, column stats) base: Arc, /// Type-erased extensions for custom statistics - extensions: HashMap>, + extensions: Extensions, } impl ExtendedStatistics { @@ -136,7 +135,7 @@ impl ExtendedStatistics { pub fn new(base: Statistics) -> Self { Self { base: Arc::new(base), - extensions: HashMap::new(), + extensions: Extensions::new(), } } @@ -144,7 +143,7 @@ impl ExtendedStatistics { pub fn new_arc(base: Arc) -> Self { Self { base, - extensions: HashMap::new(), + extensions: Extensions::new(), } } @@ -160,26 +159,22 @@ impl ExtendedStatistics { /// Get a reference to a custom statistics extension by type. pub fn get_extension(&self) -> Option<&T> { - self.extensions - .get(&TypeId::of::()) - .and_then(|ext| ext.downcast_ref()) + self.extensions.get::() } /// Set a custom statistics extension. pub fn set_extension(&mut self, value: T) { - self.extensions.insert(TypeId::of::(), Arc::new(value)); + self.extensions.insert(value); } /// Check if an extension of the given type exists. pub fn has_extension(&self) -> bool { - self.extensions.contains_key(&TypeId::of::()) + self.extensions.contains::() } /// Merge extensions from another ExtendedStatistics (other's extensions take precedence). 
pub fn merge_extensions(&mut self, other: &ExtendedStatistics) { - for (type_id, ext) in &other.extensions { - self.extensions.insert(*type_id, Arc::clone(ext)); - } + self.extensions.merge(&other.extensions); } } From da8b73d45013daf6f03f2edf6dc02ddb66905f9a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 3 May 2026 07:11:53 -0500 Subject: [PATCH 3/8] address review feedback: simplify Extensions API and add deprecated shim MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Collapse Extensions::insert / insert_arc into a single insert(Arc<T>). Values are always Arc'd internally (the map needs Clone, dyn Any isn't), but the call site signature is now consistent across all consumers (PartitionedFile, SessionConfig, ExtendedStatistics) — caller wraps in Arc explicitly. - Drop PartitionedFile::with_extension_arc; the single with_extension takes Arc<T> directly. - Change ExtendedStatistics::set_extension(value: T) to take Arc<T> for the same reason. - Re-add PartitionedFile::with_extensions(Arc<dyn Any + Send + Sync>) as a #[deprecated] shim that keys by the value's dynamic TypeId, restoring SemVer compatibility for code targeting 53.x. Backed by a new Extensions::insert_dyn(Arc<dyn Any + Send + Sync>) helper. - Update parquet call sites (test, example, docstring) to wrap in Arc. - Tweak the upgrade guide entry to reflect the deprecation and final API. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../data_io/parquet_advanced_index.rs | 2 +- datafusion/common/src/extensions.rs | 43 ++++++++++++------- .../core/tests/parquet/custom_reader.rs | 2 +- .../tests/parquet/external_access_plan.rs | 2 +- datafusion/datasource-parquet/src/opener.rs | 4 +- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/mod.rs | 22 ++++++---- datafusion/execution/src/config.rs | 2 +- .../src/operator_statistics/mod.rs | 2 +- .../library-user-guide/upgrading/54.0.0.md | 22 +++++----- 10 files changed, 62 insertions(+), 41 deletions(-) diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs index 9bdcda265ea7e..ec817c3968660 100644 --- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs +++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs @@ -476,7 +476,7 @@ impl TableProvider for IndexTableProvider { .partitioned_file() // provide the starting access plan to the DataSourceExec by // storing it as "extensions" on PartitionedFile - .with_extension(access_plan); + .with_extension(Arc::new(access_plan)); // Prepare for scanning let schema = self.schema(); diff --git a/datafusion/common/src/extensions.rs b/datafusion/common/src/extensions.rs index fc5650791b7a8..1500da8f585bd 100644 --- a/datafusion/common/src/extensions.rs +++ b/datafusion/common/src/extensions.rs @@ -46,8 +46,8 @@ use std::sync::Arc; /// struct OtherData(&'static str); /// /// let mut ext = Extensions::new(); -/// ext.insert(MyData(42)); -/// ext.insert(OtherData("hello")); +/// ext.insert(Arc::new(MyData(42))); +/// ext.insert(Arc::new(OtherData("hello"))); /// /// assert_eq!(ext.get::().unwrap().0, 42); /// assert_eq!(ext.get::().unwrap().0, "hello"); @@ -75,18 +75,23 @@ impl Extensions { /// Insert an extension keyed by its concrete type `T`. Returns the /// previous value of that type, if any. 
- pub fn insert(&mut self, value: T) -> Option> { - self.insert_arc(Arc::new(value)) - } - - /// Insert an already-`Arc`ed extension keyed by `T`. Returns the - /// previous value of that type, if any. - pub fn insert_arc(&mut self, value: Arc) -> Option> { + pub fn insert(&mut self, value: Arc) -> Option> { self.inner .insert(TypeId::of::(), value) .map(|p| Arc::downcast::(p).expect("TypeId matches T")) } + /// Insert an already-type-erased value, keyed by its dynamic + /// [`TypeId`]. Used by APIs that accept `Arc` + /// and need to recover the concrete type for keying. + pub fn insert_dyn( + &mut self, + value: Arc, + ) -> Option> { + let id = (*value).type_id(); + self.inner.insert(id, value) + } + /// Borrow the extension of type `T`, if set. pub fn get(&self) -> Option<&T> { self.inner @@ -159,14 +164,14 @@ mod tests { let mut ext = Extensions::new(); assert!(ext.is_empty()); - ext.insert(A(1)); - ext.insert(B("x")); + ext.insert(Arc::new(A(1))); + ext.insert(Arc::new(B("x"))); assert_eq!(ext.len(), 2); assert_eq!(ext.get::(), Some(&A(1))); assert_eq!(ext.get::(), Some(&B("x"))); assert!(ext.contains::()); - let prev = ext.insert(A(2)); + let prev = ext.insert(Arc::new(A(2))); assert_eq!(prev.as_deref(), Some(&A(1))); assert_eq!(ext.get::(), Some(&A(2))); @@ -175,13 +180,21 @@ mod tests { assert!(!ext.contains::()); } + #[test] + fn insert_dyn_keys_by_concrete_type() { + let mut ext = Extensions::new(); + let erased: Arc = Arc::new(A(7)); + ext.insert_dyn(erased); + assert_eq!(ext.get::(), Some(&A(7))); + } + #[test] fn merge_other_wins() { let mut a = Extensions::new(); - a.insert(A(1)); + a.insert(Arc::new(A(1))); let mut b = Extensions::new(); - b.insert(A(2)); - b.insert(B("hi")); + b.insert(Arc::new(A(2))); + b.insert(Arc::new(B("hi"))); a.merge(&b); assert_eq!(a.get::(), Some(&A(2))); assert_eq!(a.get::(), Some(&B("hi"))); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 
31db93eae20fb..eaca5339342f9 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -71,7 +71,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { .into_iter() .map(|meta| { PartitionedFile::new_from_meta(meta) - .with_extension(String::from(EXPECTED_USER_DEFINED_METADATA)) + .with_extension(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))) }) .collect(); diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index 31be6fd979fd6..ea703465d4e3d 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -349,7 +349,7 @@ impl TestFull { // add the access plan, if any, as an extension if let Some(access_plan) = access_plan { - partitioned_file = partitioned_file.with_extension(access_plan); + partitioned_file = partitioned_file.with_extension(Arc::new(access_plan)); } // Create a DataSourceExec to read the file diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bbbd298687ab5..80c12669ddf5c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -2507,7 +2507,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extension(access_plan); + .with_extension(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { ParquetMorselizerBuilder::new() @@ -2608,7 +2608,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extension(access_plan); + .with_extension(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { ParquetMorselizerBuilder::new() diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index fec36c49cc210..979b5c54316fc 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ 
b/datafusion/datasource-parquet/src/source.rs @@ -230,7 +230,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// access_plan.skip(4); /// // provide the plan as extension to the FileScanConfig /// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234) -/// .with_extension(access_plan); +/// .with_extension(Arc::new(access_plan)); /// // create a FileScanConfig to scan this file /// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(ParquetSource::new(schema()))) /// .with_file(partitioned_file).build(); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index b49ad28536a8e..eef8c8b853326 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -274,23 +274,29 @@ impl PartitionedFile { /// /// This can be used to pass reader-specific information (e.g. a /// `ParquetAccessPlan`, or a custom index entry). - pub fn with_extension(mut self, value: T) -> Self { + pub fn with_extension(mut self, value: Arc) -> Self { self.extensions.insert(value); self } - /// Like [`Self::with_extension`] but accepts an already-`Arc`ed value, - /// avoiding an extra allocation. - pub fn with_extension_arc(mut self, value: Arc) -> Self { - self.extensions.insert_arc(value); - self - } - /// Borrow the extension of type `T`, if one is attached. pub fn extension(&self) -> Option<&T> { self.extensions.get::() } + /// Attach a type-erased extension to this file. + /// + /// Kept as a backwards-compatible shim; prefer [`Self::with_extension`] + /// which keys the extension by its concrete Rust type at the call site. + #[deprecated( + since = "54.0.0", + note = "use `with_extension`; the extension is keyed by its concrete type" + )] + pub fn with_extensions(mut self, extensions: Arc) -> Self { + self.extensions.insert_dyn(extensions); + self + } + /// Update the statistics for this file. /// /// The provided `statistics` should cover only the file schema columns. 
diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index b2917a4583628..8cb1c70d14aeb 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -596,7 +596,7 @@ impl SessionConfig { where T: Send + Sync + 'static, { - self.extensions.insert_arc(ext); + self.extensions.insert(ext); } /// Get extension, if any for the specified type `T` exists. diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index b577482ba4f24..c9ade4528a8d5 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -163,7 +163,7 @@ impl ExtendedStatistics { } /// Set a custom statistics extension. - pub fn set_extension(&mut self, value: T) { + pub fn set_extension(&mut self, value: Arc) { self.extensions.insert(value); } diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 46d1fd94c17ad..709964cfb478b 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -441,19 +441,21 @@ SELECT CAST(approx_percentile_cont(quantity, 0.5) AS BIGINT) FROM orders; `PartitionedFile.extensions` previously held a single `Option>` slot, so two independent components could not both attach data to the same file without colliding. The field -is now a `FileExtensions` (a re-export of `datafusion_common::extensions::Extensions`), -a map keyed by concrete Rust type. Each type occupies its own slot, so -multiple consumers (e.g. a `ParquetAccessPlan` and a custom index entry) -can coexist on a single `PartitionedFile`. +is now a `FileExtensions` (a re-export of +`datafusion_common::extensions::Extensions`), a map keyed by concrete Rust +type. Each type occupies its own slot, so multiple consumers (e.g. 
a +`ParquetAccessPlan` and a custom index entry) can coexist on a single +`PartitionedFile`. The previous `with_extensions(Arc)` builder is -replaced by typed variants: +deprecated (it still works, keyed by the value's dynamic `TypeId`) in +favor of a typed variant: ```diff -let pf = PartitionedFile::new(path, size) - .with_extensions(Arc::new(access_plan)); +let pf = PartitionedFile::new(path, size) -+ .with_extension(access_plan); ++ .with_extension(Arc::new(access_plan)); ``` Reading an extension no longer requires a manual downcast: @@ -466,10 +468,10 @@ Reading an extension no longer requires a manual downcast: +let access_plan = partitioned_file.extension::(); ``` -For an already-`Arc`ed value, use `with_extension_arc` to avoid the extra -allocation. The full API on `FileExtensions` is `insert` / `insert_arc` / -`get` / `get_arc` / `contains` / `remove` / `merge`, all generic over the -concrete type `T`. +The full API on `FileExtensions` is `insert` / `insert_dyn` / `get` / +`get_arc` / `contains` / `remove` / `merge`, all generic over the +concrete type `T`. Values are stored as `Arc` so the map remains cheap +to clone. **Who is affected:** From bc0a3e030bec68b215636836cf98092fc48cddc1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 08:29:15 -0500 Subject: [PATCH 4/8] api: keep T-taking shape for set_extension/with_extension, add _arc variants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restores `ExtendedStatistics::set_extension(value: T)` (was source-breaking in this PR's previous revision when it switched to `Arc`) and applies the same pattern to `PartitionedFile::with_extension`. Callers who already have an `Arc` and want to skip the extra allocation can use the new `set_extension_arc` / `with_extension_arc` variants. `Extensions` itself gains the same split: `insert(T)` + `insert_arc(Arc)`. 
A single method that accepts both `T` and `Arc<T>` is not expressible in stable Rust — both `impl Into<Arc<T>>` and a custom trait with two blanket impls leave the compiler unable to choose between `T = X` and `T = Arc<X>` when given an `Arc<X>`, and resolving that requires specialization. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../data_io/parquet_advanced_index.rs | 2 +- datafusion/common/src/extensions.rs | 29 +++++++++++++------ .../core/tests/parquet/custom_reader.rs | 2 +- .../tests/parquet/external_access_plan.rs | 2 +- datafusion/datasource-parquet/src/opener.rs | 4 +-- datafusion/datasource/src/mod.rs | 13 ++++++++- datafusion/execution/src/config.rs | 2 +- .../src/operator_statistics/mod.rs | 11 ++++++- .../library-user-guide/upgrading/54.0.0.md | 18 ++++++++---- 9 files changed, 61 insertions(+), 22 deletions(-) diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs index ec817c3968660..9bdcda265ea7e 100644 --- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs +++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs @@ -476,7 +476,7 @@ impl TableProvider for IndexTableProvider { .partitioned_file() // provide the starting access plan to the DataSourceExec by // storing it as "extensions" on PartitionedFile - .with_extension(Arc::new(access_plan)); + .with_extension(access_plan); // Prepare for scanning let schema = self.schema(); diff --git a/datafusion/common/src/extensions.rs b/datafusion/common/src/extensions.rs index 1500da8f585bd..2fe1fbebce070 100644 --- a/datafusion/common/src/extensions.rs +++ b/datafusion/common/src/extensions.rs @@ -46,8 +46,8 @@ use std::sync::Arc; /// struct OtherData(&'static str); /// /// let mut ext = Extensions::new(); -/// ext.insert(Arc::new(MyData(42))); -/// ext.insert(Arc::new(OtherData("hello"))); +/// ext.insert(MyData(42)); +/// ext.insert_arc(Arc::new(OtherData("hello"))); /// /// 
assert_eq!(ext.get::().unwrap().0, 42); /// assert_eq!(ext.get::().unwrap().0, "hello"); @@ -75,7 +75,18 @@ impl Extensions { /// Insert an extension keyed by its concrete type `T`. Returns the /// previous value of that type, if any. - pub fn insert(&mut self, value: Arc) -> Option> { + /// + /// The value is wrapped in an [`Arc`] internally. If the caller already + /// has an `Arc` and wants to avoid an extra allocation, use + /// [`Self::insert_arc`]. + pub fn insert(&mut self, value: T) -> Option> { + self.insert_arc(Arc::new(value)) + } + + /// Insert an extension keyed by its concrete type `T`, taking an + /// already-allocated [`Arc`]. Returns the previous value of that type, + /// if any. + pub fn insert_arc(&mut self, value: Arc) -> Option> { self.inner .insert(TypeId::of::(), value) .map(|p| Arc::downcast::(p).expect("TypeId matches T")) @@ -164,14 +175,14 @@ mod tests { let mut ext = Extensions::new(); assert!(ext.is_empty()); - ext.insert(Arc::new(A(1))); - ext.insert(Arc::new(B("x"))); + ext.insert(A(1)); + ext.insert_arc(Arc::new(B("x"))); assert_eq!(ext.len(), 2); assert_eq!(ext.get::(), Some(&A(1))); assert_eq!(ext.get::(), Some(&B("x"))); assert!(ext.contains::()); - let prev = ext.insert(Arc::new(A(2))); + let prev = ext.insert(A(2)); assert_eq!(prev.as_deref(), Some(&A(1))); assert_eq!(ext.get::(), Some(&A(2))); @@ -191,10 +202,10 @@ mod tests { #[test] fn merge_other_wins() { let mut a = Extensions::new(); - a.insert(Arc::new(A(1))); + a.insert(A(1)); let mut b = Extensions::new(); - b.insert(Arc::new(A(2))); - b.insert(Arc::new(B("hi"))); + b.insert(A(2)); + b.insert(B("hi")); a.merge(&b); assert_eq!(a.get::(), Some(&A(2))); assert_eq!(a.get::(), Some(&B("hi"))); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index eaca5339342f9..31db93eae20fb 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -71,7 +71,7 @@ async fn 
route_data_access_ops_to_parquet_file_reader_factory() { .into_iter() .map(|meta| { PartitionedFile::new_from_meta(meta) - .with_extension(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))) + .with_extension(String::from(EXPECTED_USER_DEFINED_METADATA)) }) .collect(); diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index ea703465d4e3d..31be6fd979fd6 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -349,7 +349,7 @@ impl TestFull { // add the access plan, if any, as an extension if let Some(access_plan) = access_plan { - partitioned_file = partitioned_file.with_extension(Arc::new(access_plan)); + partitioned_file = partitioned_file.with_extension(access_plan); } // Create a DataSourceExec to read the file diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 80c12669ddf5c..bbbd298687ab5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -2507,7 +2507,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extension(Arc::new(access_plan)); + .with_extension(access_plan); let make_opener = |reverse_scan: bool| { ParquetMorselizerBuilder::new() @@ -2608,7 +2608,7 @@ mod test { "test.parquet".to_string(), u64::try_from(data_len).unwrap(), ) - .with_extension(Arc::new(access_plan)); + .with_extension(access_plan); let make_opener = |reverse_scan: bool| { ParquetMorselizerBuilder::new() diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index eef8c8b853326..b9588b405a136 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -274,11 +274,22 @@ impl PartitionedFile { /// /// This can be used to pass reader-specific information (e.g. a /// `ParquetAccessPlan`, or a custom index entry). 
- pub fn with_extension(mut self, value: Arc) -> Self { + /// + /// Wraps the value in an [`Arc`] internally. If the caller already has an + /// `Arc` and wants to avoid an extra allocation, use + /// [`Self::with_extension_arc`]. + pub fn with_extension(mut self, value: T) -> Self { self.extensions.insert(value); self } + /// Attach a typed user-defined extension to this file from an + /// already-allocated [`Arc`]. See [`Self::with_extension`]. + pub fn with_extension_arc(mut self, value: Arc) -> Self { + self.extensions.insert_arc(value); + self + } + /// Borrow the extension of type `T`, if one is attached. pub fn extension(&self) -> Option<&T> { self.extensions.get::() diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 8cb1c70d14aeb..b2917a4583628 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -596,7 +596,7 @@ impl SessionConfig { where T: Send + Sync + 'static, { - self.extensions.insert(ext); + self.extensions.insert_arc(ext); } /// Get extension, if any for the specified type `T` exists. diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index c9ade4528a8d5..288baeaf30db9 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -163,10 +163,19 @@ impl ExtendedStatistics { } /// Set a custom statistics extension. - pub fn set_extension(&mut self, value: Arc) { + /// + /// Wraps the value in an [`Arc`] internally. If the caller already has an + /// `Arc` and wants to avoid an extra allocation, use + /// [`Self::set_extension_arc`]. + pub fn set_extension(&mut self, value: T) { self.extensions.insert(value); } + /// Set a custom statistics extension from an already-allocated [`Arc`]. 
+ pub fn set_extension_arc(&mut self, value: Arc) { + self.extensions.insert_arc(value); + } + /// Check if an extension of the given type exists. pub fn has_extension(&self) -> bool { self.extensions.contains::() diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 709964cfb478b..43c11abf3b76e 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -455,7 +455,15 @@ favor of a typed variant: -let pf = PartitionedFile::new(path, size) - .with_extensions(Arc::new(access_plan)); +let pf = PartitionedFile::new(path, size) -+ .with_extension(Arc::new(access_plan)); ++ .with_extension(access_plan); +``` + +If you already have an `Arc` and want to avoid an extra allocation, +use `with_extension_arc`: + +```rust,ignore +let pf = PartitionedFile::new(path, size) + .with_extension_arc(Arc::clone(&shared_access_plan)); ``` Reading an extension no longer requires a manual downcast: @@ -468,10 +476,10 @@ Reading an extension no longer requires a manual downcast: +let access_plan = partitioned_file.extension::(); ``` -The full API on `FileExtensions` is `insert` / `insert_dyn` / `get` / -`get_arc` / `contains` / `remove` / `merge`, all generic over the -concrete type `T`. Values are stored as `Arc` so the map remains cheap -to clone. +The full API on `FileExtensions` is `insert` / `insert_arc` / +`insert_dyn` / `get` / `get_arc` / `contains` / `remove` / `merge`, all +generic over the concrete type `T`. Values are stored as `Arc` so the +map remains cheap to clone. 
**Who is affected:** From 75b6e3d7c621bcf4fd8184a0e8600fd01fb0cc3f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 08:32:42 -0500 Subject: [PATCH 5/8] test: add regression test for coexisting per-file extensions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Attaches both a custom reader payload (a `String` checked inside `InMemoryParquetFileReaderFactory`) and a `ParquetAccessPlan` (skip the first of two row groups) to the same `PartitionedFile`, then asserts that (a) the factory still sees its payload and (b) only the second row group's rows come through. Either consumer overwriting the other's slot would break this end-to-end — the per-consumer tests elsewhere catch the slots in isolation but not the coexistence invariant this PR is meant to guarantee. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/tests/parquet/custom_reader.rs | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 31db93eae20fb..31337ed5d8485 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -38,6 +38,7 @@ use bytes::Bytes; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource_parquet::metadata::DFParquetMetadata; +use datafusion_datasource_parquet::{ParquetAccessPlan, RowGroupAccess}; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use insta::assert_snapshot; @@ -49,6 +50,7 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::AsyncFileReader; use parquet::errors::ParquetError; use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; const EXPECTED_USER_DEFINED_METADATA: &str = 
"some-user-defined-metadata"; @@ -107,6 +109,82 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { "); } +/// Regression test for the type-keyed extensions map: independent components +/// must be able to attach their own per-file payloads on the same +/// [`PartitionedFile`] without colliding. Here we attach a custom reader +/// payload (the `String` checked by [`InMemoryParquetFileReaderFactory`]) +/// *and* a [`ParquetAccessPlan`] that skips the first row group, then verify +/// (a) the factory still sees its payload (its internal `assert_eq!` would +/// fire if the slot got overwritten) and (b) the access plan is honored — so +/// only the second row group's 5 rows come out, not all 10. +#[tokio::test] +async fn custom_payload_and_access_plan_coexist() { + // Two row groups of 5 rows each: values 0..=4 in row group 0, 5..=9 in + // row group 1. + let c1: ArrayRef = Arc::new(Int64Array::from((0..10).collect::>())); + let batch = create_batch(vec![("c1", c1)]); + let file_schema = batch.schema().clone(); + + let in_memory = InMemory::new(); + let mut buf = Vec::::with_capacity(32 * 1024); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(5)) + .build(); + let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let location = Path::parse("two-row-groups.parquet").unwrap(); + let size = buf.len() as u64; + in_memory + .put(&location, Bytes::from(buf).into()) + .await + .unwrap(); + let meta = ObjectMeta { + location, + last_modified: chrono::DateTime::from(SystemTime::now()), + size, + e_tag: None, + version: None, + }; + + let access_plan = + ParquetAccessPlan::new(vec![RowGroupAccess::Skip, RowGroupAccess::Scan]); + let pf = PartitionedFile::new_from_meta(meta) + .with_extension(String::from(EXPECTED_USER_DEFINED_METADATA)) + .with_extension(access_plan); + + let store: Arc = Arc::new(in_memory); + let source = Arc::new( + 
ParquetSource::new(file_schema.clone()).with_parquet_file_reader_factory( + Arc::new(InMemoryParquetFileReaderFactory(Arc::clone(&store))), + ), + ); + let base_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source) + .with_file_group(vec![pf].into()) + .build(); + let parquet_exec = DataSourceExec::from_data_source(base_config); + + let session_ctx = SessionContext::new(); + let read = collect(parquet_exec, session_ctx.task_ctx()).await.unwrap(); + + let total: usize = read.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total, 5, + "access plan should have skipped the first row group" + ); + + let values: Vec = read + .iter() + .flat_map(|b| { + let arr = b.column(0).as_any().downcast_ref::().unwrap(); + (0..arr.len()).map(|i| arr.value(i)).collect::>() + }) + .collect(); + assert_eq!(values, vec![5, 6, 7, 8, 9]); +} + #[derive(Debug)] struct InMemoryParquetFileReaderFactory(Arc); From db3cc1a52a7132ee1a31aaff3bf2737f524db7d3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 09:16:23 -0500 Subject: [PATCH 6/8] =?UTF-8?q?api:=20trim=20Extensions=20surface=20?= =?UTF-8?q?=E2=80=94=20drop=20unused/speculative=20methods?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove `Extensions::remove`: no callers anywhere in the tree. - Remove `ExtendedStatistics::set_extension_arc` and `PartitionedFile::with_extension_arc`: speculative `_arc` variants the reviewer suggested adding only when callers actually need to skip the Arc allocation. None do today. - Mark `Extensions::insert_dyn` as `#[deprecated]`: it exists solely to back the deprecated `PartitionedFile::with_extensions(Arc)` shim. Marking it deprecated discourages new callers and lets it leave with the shim later. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/common/src/extensions.rs | 26 +++++++++---------- datafusion/datasource/src/mod.rs | 12 +-------- .../src/operator_statistics/mod.rs | 9 ------- .../library-user-guide/upgrading/54.0.0.md | 15 +++-------- 4 files changed, 16 insertions(+), 46 deletions(-) diff --git a/datafusion/common/src/extensions.rs b/datafusion/common/src/extensions.rs index 2fe1fbebce070..b4651e23acddb 100644 --- a/datafusion/common/src/extensions.rs +++ b/datafusion/common/src/extensions.rs @@ -93,8 +93,16 @@ impl Extensions { } /// Insert an already-type-erased value, keyed by its dynamic - /// [`TypeId`]. Used by APIs that accept `Arc` - /// and need to recover the concrete type for keying. + /// [`TypeId`]. Used internally to support APIs that accept + /// `Arc` for backwards compatibility and need + /// to recover the concrete type for keying. + /// + /// New code should use [`Self::insert`] or [`Self::insert_arc`], which + /// preserve the concrete type at the call site. + #[deprecated( + since = "54.0.0", + note = "use `insert` or `insert_arc`; only retained to support the deprecated `PartitionedFile::with_extensions` shim" + )] pub fn insert_dyn( &mut self, value: Arc, @@ -122,13 +130,6 @@ impl Extensions { self.inner.contains_key(&TypeId::of::()) } - /// Remove and return the extension of type `T`, if set. - pub fn remove(&mut self) -> Option> { - self.inner - .remove(&TypeId::of::()) - .map(|p| Arc::downcast::(p).expect("TypeId matches T")) - } - /// Merge entries from `other` into `self`. Entries in `other` take /// precedence over existing entries with the same type. 
pub fn merge(&mut self, other: &Extensions) { @@ -171,7 +172,7 @@ mod tests { struct B(&'static str); #[test] - fn insert_get_remove() { + fn insert_get_replace() { let mut ext = Extensions::new(); assert!(ext.is_empty()); @@ -185,13 +186,10 @@ mod tests { let prev = ext.insert(A(2)); assert_eq!(prev.as_deref(), Some(&A(1))); assert_eq!(ext.get::(), Some(&A(2))); - - let removed = ext.remove::(); - assert_eq!(removed.as_deref(), Some(&A(2))); - assert!(!ext.contains::()); } #[test] + #[allow(deprecated)] fn insert_dyn_keys_by_concrete_type() { let mut ext = Extensions::new(); let erased: Arc = Arc::new(A(7)); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index b9588b405a136..96c6ffea09da3 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -274,22 +274,11 @@ impl PartitionedFile { /// /// This can be used to pass reader-specific information (e.g. a /// `ParquetAccessPlan`, or a custom index entry). - /// - /// Wraps the value in an [`Arc`] internally. If the caller already has an - /// `Arc` and wants to avoid an extra allocation, use - /// [`Self::with_extension_arc`]. pub fn with_extension(mut self, value: T) -> Self { self.extensions.insert(value); self } - /// Attach a typed user-defined extension to this file from an - /// already-allocated [`Arc`]. See [`Self::with_extension`]. - pub fn with_extension_arc(mut self, value: Arc) -> Self { - self.extensions.insert_arc(value); - self - } - /// Borrow the extension of type `T`, if one is attached. 
pub fn extension(&self) -> Option<&T> { self.extensions.get::() @@ -304,6 +293,7 @@ impl PartitionedFile { note = "use `with_extension`; the extension is keyed by its concrete type" )] pub fn with_extensions(mut self, extensions: Arc) -> Self { + #[allow(deprecated)] self.extensions.insert_dyn(extensions); self } diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index 288baeaf30db9..b577482ba4f24 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -163,19 +163,10 @@ impl ExtendedStatistics { } /// Set a custom statistics extension. - /// - /// Wraps the value in an [`Arc`] internally. If the caller already has an - /// `Arc` and wants to avoid an extra allocation, use - /// [`Self::set_extension_arc`]. pub fn set_extension(&mut self, value: T) { self.extensions.insert(value); } - /// Set a custom statistics extension from an already-allocated [`Arc`]. - pub fn set_extension_arc(&mut self, value: Arc) { - self.extensions.insert_arc(value); - } - /// Check if an extension of the given type exists. 
pub fn has_extension(&self) -> bool { self.extensions.contains::() diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 43c11abf3b76e..f92979a6642b7 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -458,14 +458,6 @@ favor of a typed variant: + .with_extension(access_plan); ``` -If you already have an `Arc` and want to avoid an extra allocation, -use `with_extension_arc`: - -```rust,ignore -let pf = PartitionedFile::new(path, size) - .with_extension_arc(Arc::clone(&shared_access_plan)); -``` - Reading an extension no longer requires a manual downcast: ```diff @@ -476,10 +468,9 @@ Reading an extension no longer requires a manual downcast: +let access_plan = partitioned_file.extension::(); ``` -The full API on `FileExtensions` is `insert` / `insert_arc` / -`insert_dyn` / `get` / `get_arc` / `contains` / `remove` / `merge`, all -generic over the concrete type `T`. Values are stored as `Arc` so the -map remains cheap to clone. +The `FileExtensions` API is `insert` / `insert_arc` / `get` / `get_arc` +/ `contains` / `merge`, all generic over the concrete type `T`. Values +are stored as `Arc` so the map remains cheap to clone. **Who is affected:** From 8a5439114dc285a32451f643bef79a8eef133aaa Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 4 May 2026 09:41:42 -0500 Subject: [PATCH 7/8] fix(clippy): use #[expect(deprecated)] over #[allow(deprecated)] DataFusion's clippy config enables `clippy::allow-attributes`, which mandates `#[expect(...)]` (so the suppression is automatically reported when no longer needed) instead of `#[allow(...)]`. Local `cargo clippy --all-targets --all-features -- -D warnings` did not catch this; only CI's `./dev/rust_lint.sh` lint pipeline does. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/common/src/extensions.rs | 2 +- datafusion/datasource/src/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/extensions.rs b/datafusion/common/src/extensions.rs index b4651e23acddb..d5a21fb6566ec 100644 --- a/datafusion/common/src/extensions.rs +++ b/datafusion/common/src/extensions.rs @@ -189,7 +189,7 @@ mod tests { } #[test] - #[allow(deprecated)] + #[expect(deprecated)] fn insert_dyn_keys_by_concrete_type() { let mut ext = Extensions::new(); let erased: Arc = Arc::new(A(7)); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 96c6ffea09da3..c5f9d722006d1 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -293,7 +293,7 @@ impl PartitionedFile { note = "use `with_extension`; the extension is keyed by its concrete type" )] pub fn with_extensions(mut self, extensions: Arc) -> Self { - #[allow(deprecated)] + #[expect(deprecated)] self.extensions.insert_dyn(extensions); self } From aa1f55176fc8cb9433a1eea2399cad1ec862ab6c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 5 May 2026 06:30:48 -0500 Subject: [PATCH 8/8] docs: pass `access_plan` directly in `with_extension` example `with_extension` keys by the concrete type `T`, so wrapping the plan in `Arc::new(...)` would store it as `Arc<ParquetAccessPlan>`. The downstream lookup is `extensions.get::<ParquetAccessPlan>()`, which would miss the wrapped value and scan all row groups. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/datasource-parquet/src/source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 979b5c54316fc..fec36c49cc210 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -230,7 +230,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// access_plan.skip(4); /// // provide the plan as extension to the FileScanConfig /// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234) -/// .with_extension(Arc::new(access_plan)); +/// .with_extension(access_plan); /// // create a FileScanConfig to scan this file /// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(ParquetSource::new(schema()))) /// .with_file(partitioned_file).build();