diff --git a/Cargo.lock b/Cargo.lock
index de1da5205d891..d0d07a2a2e830 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2003,6 +2003,7 @@ name = "datafusion-datasource-parquet"
 version = "53.1.0"
 dependencies = [
  "arrow",
+ "arrow-schema",
  "async-trait",
  "bytes",
  "chrono",
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index a5855af17a536..8aa6ca1f97721 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -32,6 +32,7 @@ all-features = true
 [dependencies]
 arrow = { workspace = true }
+arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store", "parquet"] }
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs
index 9a907f4118a86..ecfae78cf1a0f 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -36,6 +36,7 @@ mod row_group_filter;
 mod sort;
 pub mod source;
 mod supported_predicates;
+mod virtual_column;
 mod writer;
 
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
@@ -46,4 +47,5 @@ pub use reader::*; // Expose so downstream crates can use it
 pub use row_filter::build_row_filter;
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use row_group_filter::RowGroupAccessPlanFilter;
+pub use virtual_column::ParquetVirtualColumn;
 pub use writer::plan_to_parquet;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index bad1c684b47f5..a35f1c23615f9 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -22,7 +22,8 @@ use crate::row_filter::build_projection_read_plan;
 use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
 use crate::{
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
-    apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
+    ParquetVirtualColumn, apply_file_schema_type_coercions, coerce_int96_to_resolution,
+    row_filter,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::datatypes::DataType;
@@ -38,13 +39,16 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
+use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit};
 use datafusion_common::encryption::FileDecryptionProperties;
 use datafusion_common::stats::Precision;
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{
-    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err,
+    ColumnStatistics, DataFusionError, HashSet, Result, ScalarValue, Statistics,
+    exec_err, not_impl_err,
 };
 use datafusion_datasource::{PartitionedFile, TableSchema};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
@@ -274,6 +278,10 @@ struct PreparedParquetOpen {
     output_schema: SchemaRef,
     projection: ProjectionExprs,
     predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Virtual columns (e.g. parquet `row_number`) to be produced by the reader
+    /// in addition to the file's own columns. Empty when no virtual columns
+    /// were requested by the caller.
+    virtual_columns: Vec<FieldRef>,
     reorder_predicates: bool,
     pushdown_filters: bool,
     force_filter_selections: bool,
@@ -535,6 +543,15 @@ impl ParquetMorselizer {
         &self,
         partitioned_file: PartitionedFile,
     ) -> Result<PreparedParquetOpen> {
+        validate_supported_virtual_columns(self.table_schema.virtual_columns())?;
+        if self.pushdown_filters
+            && let Some(predicate) = self.predicate.as_ref()
+        {
+            validate_predicate_does_not_reference_virtual_columns(
+                predicate,
+                self.table_schema.virtual_columns(),
+            )?;
+        }
         let file_range = partitioned_file.range.clone();
         let extensions = partitioned_file.extensions.clone();
         let file_name = partitioned_file.object_meta.location.to_string();
@@ -643,6 +660,7 @@
             output_schema,
             projection,
             predicate,
+            virtual_columns: self.table_schema.virtual_columns().clone(),
             reorder_predicates: self.reorder_filters,
             pushdown_filters: self.pushdown_filters,
             force_filter_selections: self.force_filter_selections,
@@ -758,22 +776,22 @@ impl MetadataLoadedParquetOpen {
         // - The logical file schema: this is the table schema minus any hive partition columns and projections.
         //   This is what the physical file schema is coerced to.
         // - The physical file schema: this is the schema that the arrow-rs
-        //   parquet reader will actually produce.
+        //   parquet reader will actually produce for the file's columns. Any
+        //   virtual columns (see [`crate::TableSchema::virtual_columns`]) are
+        //   produced separately by the reader and are not part of this schema.
         let mut physical_file_schema = Arc::clone(reader_metadata.schema());
 
         // The schema loaded from the file may not be the same as the
         // desired schema (for example if we want to instruct the parquet
         // reader to read strings using Utf8View instead). Update if necessary
+        let mut metadata_dirty = false;
         if let Some(merged) = apply_file_schema_type_coercions(
             &prepared.logical_file_schema,
             &physical_file_schema,
         ) {
             physical_file_schema = Arc::new(merged);
             options = options.with_schema(Arc::clone(&physical_file_schema));
-            reader_metadata = ArrowReaderMetadata::try_new(
-                Arc::clone(reader_metadata.metadata()),
-                options.clone(),
-            )?;
+            metadata_dirty = true;
         }
 
         if let Some(ref coerce) = prepared.coerce_int96
             && let Some(merged) = coerce_int96_to_resolution(
                 reader_metadata.parquet_schema(),
                 &physical_file_schema,
                 coerce,
             )
         {
             physical_file_schema = Arc::new(merged);
             options = options.with_schema(Arc::clone(&physical_file_schema));
+            metadata_dirty = true;
+        }
+
+        // Arrow-rs appends virtual columns to the supplied schema internally,
+        // so any `with_schema` coercion above must stay limited to file columns.
+        if !prepared.virtual_columns.is_empty() {
+            options = options.with_virtual_columns(prepared.virtual_columns.clone())?;
+            metadata_dirty = true;
+        }
+
+        if metadata_dirty {
+            reader_metadata = ArrowReaderMetadata::try_new(
+                Arc::clone(reader_metadata.metadata()),
+                options.clone(),
+            )?;
+        }
@@ -807,11 +836,24 @@
         let needs_rewrite = prepared.predicate.is_some()
             || prepared.logical_file_schema != physical_file_schema;
         if needs_rewrite {
+            // When virtual columns are requested, augment the logical and
+            // physical schemas passed to the rewriter/simplifier with those
+            // fields. The rewriter identity-rewrites references found in both
+            // schemas, keeping virtual-column references as `Column` rather
+            // than replacing them with null literals; the simplifier needs
+            // them present so it can resolve their data types while walking
+            // expression trees. We keep `physical_file_schema` itself as the
+            // pure file schema so downstream predicate pushdown, pruning, and
+            // row filter construction stay unaffected.
+            let logical_for_rewrite =
+                append_fields(&prepared.logical_file_schema, &prepared.virtual_columns);
+            let physical_for_rewrite =
+                append_fields(&physical_file_schema, &prepared.virtual_columns);
             let rewriter = prepared.expr_adapter_factory.create(
-                Arc::clone(&prepared.logical_file_schema),
-                Arc::clone(&physical_file_schema),
+                Arc::clone(&logical_for_rewrite),
+                Arc::clone(&physical_for_rewrite),
             )?;
-            let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
+            let simplifier = PhysicalExprSimplifier::new(&physical_for_rewrite);
             prepared.predicate = prepared
                 .predicate
                 .map(|p| simplifier.simplify(rewriter.rewrite(p)?))
@@ -1133,8 +1175,29 @@
         }
 
         let arrow_reader_metrics = ArrowReaderMetrics::enabled();
+        // Virtual columns are produced by the reader separately from the
+        // projection mask, so strip them from the expressions we feed into
+        // `build_projection_read_plan`. We substitute each virtual column
+        // reference with a null literal; that leaves the remaining Column
+        // refs (into `physical_file_schema`) intact for
+        // `ProjectionMask::roots`, which only understands file columns.
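+        //
+        // For example (illustration only): a projection expression
+        // `row_number + 1` over the virtual column becomes `NULL + 1` (typed
+        // as Int64) for the purpose of computing the read plan, so no file
+        // column is requested for it; the real `row_number` values are
+        // appended by the reader after decoding, and the original projection
+        // is still evaluated against them.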
+        let projection_for_read_plan = if prepared.virtual_columns.is_empty() {
+            prepared.projection.clone()
+        } else {
+            let null_replacements = prepared
+                .virtual_columns
+                .iter()
+                .map(|f| {
+                    ScalarValue::try_from(f.data_type()).map(|v| (f.name().clone(), v))
+                })
+                .collect::<Result<HashMap<_, _>>>()?;
+            prepared.projection.clone().try_map_exprs(|expr| {
+                replace_columns_with_literals(expr, &null_replacements)
+            })?
+        };
+
         let read_plan = build_projection_read_plan(
-            prepared.projection.expr_iter(),
+            projection_for_read_plan.expr_iter(),
             &prepared.physical_file_schema,
             reader_metadata.parquet_schema(),
         );
@@ -1174,7 +1237,11 @@
 
         // Check if we need to replace the schema to handle things like differing nullability or metadata.
         // See note below about file vs. output schema.
-        let stream_schema = read_plan.projected_schema;
+        // The reader produces projected file columns followed by any virtual
+        // columns (`ArrowReaderOptions::with_virtual_columns` appends them to
+        // each decoded batch).
+        let stream_schema =
+            append_fields(&read_plan.projected_schema, &prepared.virtual_columns);
         let replace_schema = stream_schema != prepared.output_schema;
 
         // Rebase column indices to match the narrowed stream schema.
@@ -1323,6 +1390,76 @@ impl PushDecoderStreamState {
 
 type ConstantColumns = HashMap<String, ScalarValue>;
 
+/// Return `base` unchanged when `extra` is empty; otherwise build a new schema
+/// with `extra` appended to `base`'s fields.
+fn append_fields(base: &SchemaRef, extra: &[FieldRef]) -> SchemaRef {
+    if extra.is_empty() {
+        return Arc::clone(base);
+    }
+    let fields = base
+        .fields()
+        .iter()
+        .cloned()
+        .chain(extra.iter().cloned())
+        .collect::<Vec<_>>();
+    Arc::new(Schema::new(fields))
+}
+
+/// Validate that each field is a DataFusion-supported parquet virtual column
+/// by round-tripping it through [`ParquetVirtualColumn::try_from`]. Adding a
+/// new supported extension type means adding a variant there — not editing a
+/// stringly-typed allowlist here.
+fn validate_supported_virtual_columns(virtual_columns: &[FieldRef]) -> Result<()> {
+    for field in virtual_columns {
+        ParquetVirtualColumn::try_from(field)?;
+    }
+    Ok(())
+}
+
+/// Reject predicates that reference a virtual column when filter pushdown is
+/// enabled.
+///
+/// arrow-rs's `RowFilter` evaluates predicates against a `ProjectionMask` that
+/// addresses parquet leaves only; virtual columns (e.g. `row_number`) are
+/// synthesized by the reader *after* filter evaluation and cannot be referenced
+/// inside a row filter. Silently dropping such a predicate would produce wrong
+/// results, so we fail loudly here.
+///
+/// `ParquetSource::try_pushdown_filters` already classifies virtual-column
+/// filters as `PushedDown::No` so the `FilterPushdown` optimizer leaves them
+/// above the scan; this check is defense-in-depth for callers that build plans
+/// manually and set `with_pushdown_filters(true)` alongside a predicate
+/// referencing virtual columns.
+fn validate_predicate_does_not_reference_virtual_columns(
+    predicate: &Arc<dyn PhysicalExpr>,
+    virtual_columns: &[FieldRef],
+) -> Result<()> {
+    if virtual_columns.is_empty() {
+        return Ok(());
+    }
+    let virtual_names: HashSet<&str> =
+        virtual_columns.iter().map(|f| f.name().as_str()).collect();
+    let mut offender: Option<String> = None;
+    predicate.apply(|node: &Arc<dyn PhysicalExpr>| {
+        if let Some(column) = node.as_any().downcast_ref::<Column>()
+            && virtual_names.contains(column.name())
+        {
+            offender = Some(column.name().to_string());
+            return Ok(TreeNodeRecursion::Stop);
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
+    if let Some(name) = offender {
+        return not_impl_err!(
+            "Predicate references virtual column '{name}' while \
+             pushdown_filters=true; predicates on virtual columns must be \
+             evaluated above the scan. Disable filter pushdown or keep the \
+             filter above the scan."
+        );
+    }
+    Ok(())
+}
+
 /// Extract constant column values from statistics, keyed by column name in the logical file schema.
 fn constant_columns_from_stats(
     statistics: Option<&Statistics>,
@@ -1723,12 +1860,28 @@ mod test {
         self
     }
 
-    /// Set projection by column indices (convenience method for common case).
+    /// Set projection by column indices.
+    ///
+    /// The indices are resolved against the **file schema**, not the full
+    /// table schema. Callers that need to project partition columns or
+    /// virtual columns must use [`Self::with_projection`] and construct a
+    /// [`ProjectionExprs`] against [`TableSchema::table_schema`].
    fn with_projection_indices(mut self, indices: &[usize]) -> Self {
        self.projection_indices = Some(indices.to_vec());
        self
    }
 
+    /// Set an explicit projection.
+    ///
+    /// Prefer this over [`Self::with_projection_indices`] whenever the
+    /// projection must reference partition or virtual columns, since
+    /// `with_projection_indices` resolves its indices against the file
+    /// schema only.
+    fn with_projection(mut self, projection: ProjectionExprs) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
    /// Set the predicate.
    fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
        self.predicate = Some(predicate);
@@ -1977,7 +2130,7 @@
     async fn write_parquet(
         store: Arc<dyn ObjectStore>,
         filename: &str,
-        batch: arrow::record_batch::RecordBatch,
+        batch: RecordBatch,
     ) -> usize {
         write_parquet_batches(store, filename, vec![batch], None).await
     }
@@ -1986,7 +2139,7 @@
     async fn write_parquet_batches(
         store: Arc<dyn ObjectStore>,
         filename: &str,
-        batches: Vec<arrow::record_batch::RecordBatch>,
+        batches: Vec<RecordBatch>,
         props: Option<WriterProperties>,
     ) -> usize {
         let mut out = BytesMut::new().writer();
@@ -2720,4 +2873,430 @@
             "without page index all rows are returned"
         );
     }
+
+    /// Helpers for tests that exercise parquet virtual columns
+    /// (e.g. `row_number`) plumbed through `TableSchema`/`ParquetOpener`.
+    mod virtual_columns {
+        use super::*;
+        use arrow::array::{Array, Int32Array, Int64Array};
+        use arrow::datatypes::FieldRef;
+        use parquet::arrow::RowNumber;
+
+        /// Build a parquet `row_number` virtual column field with the given
+        /// name and nullability. Spark declares its `_tmp_metadata_row_index`
+        /// as nullable, so tests cover both settings.
+        fn row_number_field(name: &str, nullable: bool) -> FieldRef {
+            Arc::new(
+                Field::new(name, DataType::Int64, nullable)
+                    .with_extension_type(RowNumber),
+            )
+        }
+
+        /// Collect every `Int64` value from the given column in every batch
+        /// of a stream. Used to verify the `row_number` column end to end.
+        async fn collect_int64_values(
+            mut stream: BoxStream<'static, Result<RecordBatch>>,
+            column: usize,
+        ) -> Vec<i64> {
+            let mut out = vec![];
+            while let Some(batch) = stream.next().await {
+                let batch = batch.unwrap();
+                let array = batch
+                    .column(column)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .expect("expected Int64 column");
+                for i in 0..array.len() {
+                    assert!(
+                        !array.is_null(i),
+                        "row_number values produced by the reader must not be null"
+                    );
+                    out.push(array.value(i));
+                }
+            }
+            out
+        }
+
+        /// Write a parquet file containing `num_row_groups` groups of
+        /// `rows_per_group` rows with a single `value` Int64 column.
+        /// Values are `0..num_row_groups*rows_per_group`.
+        async fn write_grouped_file(
+            store: &Arc<dyn ObjectStore>,
+            path: &str,
+            num_row_groups: usize,
+            rows_per_group: usize,
+        ) -> (SchemaRef, usize) {
+            let schema = Arc::new(Schema::new(vec![Field::new(
+                "value",
+                DataType::Int64,
+                false,
+            )]));
+            let mut batches = Vec::with_capacity(num_row_groups);
+            for g in 0..num_row_groups {
+                let start = (g * rows_per_group) as i64;
+                let values: Vec<i64> =
+                    (start..start + rows_per_group as i64).collect();
+                batches.push(
+                    RecordBatch::try_new(
+                        Arc::clone(&schema),
+                        vec![Arc::new(Int64Array::from(values))],
+                    )
+                    .unwrap(),
+                );
+            }
+            let props = WriterProperties::builder()
+                .set_max_row_group_size(rows_per_group)
+                .build();
+            let data_size =
+                write_parquet_batches(Arc::clone(store), path, batches, Some(props))
+                    .await;
+            (schema, data_size)
+        }
+
+        #[tokio::test]
+        async fn test_row_index_basic() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "basic.parquet", 1, 5).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            // Project [value, row_number] — indices in table_schema are
+            // [0 file:value, 1 virtual:row_number].
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "basic.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 1).await;
+            assert_eq!(row_numbers, vec![0, 1, 2, 3, 4]);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_projection_only() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "proj_only.parquet", 1, 4).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            // Project only the virtual column (index 1).
+            let projection =
+                ProjectionExprs::from_indices(&[1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "proj_only.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 0).await;
+            assert_eq!(row_numbers, vec![0, 1, 2, 3]);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_multi_row_group() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "multi_rg.parquet", 3, 100).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "multi_rg.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 1).await;
+            let expected: Vec<i64> = (0..300).collect();
+            assert_eq!(row_numbers, expected);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_with_row_group_skip() {
+            // 3 row groups of 100 rows. A predicate that excludes the middle
+            // row group (values 100..200) must leave absolute row numbers
+            // 0..100 and 200..300 intact — not 0..200. This guards against
+            // the arrow-rs bug fixed in apache/arrow-rs#8863.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "rg_skip.parquet", 3, 100).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            // `value < 100 OR value >= 200` prunes the middle row group via
+            // min/max statistics.
+            let expr = col("value")
+                .lt(lit(100i64))
+                .or(col("value").gt_eq(lit(200i64)));
+            let predicate = logical2physical(&expr, table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .with_predicate(predicate)
+                .with_row_group_stats_pruning(true)
+                .build();
+
+            let file = PartitionedFile::new(
+                "rg_skip.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let row_numbers = collect_int64_values(stream, 1).await;
+            let expected: Vec<i64> = (0..100).chain(200..300).collect();
+            assert_eq!(row_numbers, expected);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_with_partition_cols() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "part=5/data.parquet", 1, 3).await;
+
+            let rn_field = row_number_field("row_number", false);
+            let partition_col = Arc::new(Field::new("part", DataType::Int32, false));
+            let table_schema = TableSchema::new(
+                Arc::clone(&file_schema),
+                vec![Arc::clone(&partition_col)],
+            )
+            .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            // table_schema layout: [value(0), part(1), row_number(2)].
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1, 2], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let mut file = PartitionedFile::new(
+                "part=5/data.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            file.partition_values = vec![ScalarValue::Int32(Some(5))];
+
+            let mut stream = open_file(&morselizer, file).await.unwrap();
+            let batch = stream.next().await.unwrap().unwrap();
+            assert!(stream.next().await.is_none());
+
+            assert_eq!(batch.num_columns(), 3);
+            assert_eq!(batch.schema().field(0).name(), "value");
+            assert_eq!(batch.schema().field(1).name(), "part");
+            assert_eq!(batch.schema().field(2).name(), "row_number");
+
+            let part = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            assert!(part.iter().all(|v| v == Some(5)));
+
+            let rn = batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap();
+            let rn_values: Vec<i64> = (0..rn.len()).map(|i| rn.value(i)).collect();
+            assert_eq!(rn_values, vec![0, 1, 2]);
+        }
+
+        #[tokio::test]
+        async fn test_row_index_nullable_int64() {
+            // Spark declares `_tmp_metadata_row_index` nullable. Verify the
+            // nullability flag flows through unchanged.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "nullable.parquet", 1, 3).await;
+
+            let rn_field = row_number_field("_tmp_metadata_row_index", true);
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+
+            let file = PartitionedFile::new(
+                "nullable.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+            let mut stream = open_file(&morselizer, file).await.unwrap();
+            let batch = stream.next().await.unwrap().unwrap();
+
+            let schema_field = batch.schema().field(1).clone();
+            assert_eq!(schema_field.name(), "_tmp_metadata_row_index");
+            assert_eq!(schema_field.data_type(), &DataType::Int64);
+            assert!(
+                schema_field.is_nullable(),
+                "nullable flag should be preserved for Spark's row index field"
+            );
+        }
+
+        #[tokio::test]
+        async fn test_unsupported_virtual_extension_type_rejected() {
+            // Guard: opener must reject virtual columns carrying extension
+            // types outside the supported set, rather than silently
+            // forwarding them to arrow-rs (where they would produce columns
+            // we have not validated against DataFusion's projection and
+            // predicate paths).
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let (file_schema, data_size) =
+                write_grouped_file(&store, "unsupported.parquet", 1, 1).await;
+
+            // RowGroupIndex is a real arrow-rs virtual type but has no
+            // ParquetVirtualColumn variant (or test coverage) yet, so the
+            // opener must reject it.
+            let rg_field = Arc::new(
+                Field::new("row_group_index", DataType::Int64, false)
+                    .with_extension_type(parquet::arrow::RowGroupIndex),
+            );
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![rg_field]);
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .build();
+            let file = PartitionedFile::new(
+                "unsupported.parquet".to_string(),
+                u64::try_from(data_size).unwrap(),
+            );
+
+            let err = morselizer.plan_file(file).unwrap_err();
+            let msg = err.to_string();
+            assert!(
+                msg.contains("parquet.virtual.row_group_index"),
+                "error should name the unsupported extension type, got: {msg}"
+            );
+        }
+
+        /// Build a morselizer + file for a 5-row single-row-group parquet at
+        /// `path`, with a single `row_number` virtual column and the given
+        /// physical predicate applied to
+        /// `table_schema = [value(0), row_number(1)]`.
+        async fn build_pushdown_morselizer(
+            store: &Arc<dyn ObjectStore>,
+            path: &str,
+            predicate_expr: datafusion_expr::Expr,
+            pushdown_filters: bool,
+        ) -> (ParquetMorselizer, PartitionedFile) {
+            let (file_schema, data_size) = write_grouped_file(store, path, 1, 5).await;
+            let rn_field = row_number_field("row_number", false);
+            let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema))
+                .with_virtual_columns(vec![Arc::clone(&rn_field)]);
+            let projection =
+                ProjectionExprs::from_indices(&[0, 1], table_schema.table_schema());
+            let predicate =
+                logical2physical(&predicate_expr, table_schema.table_schema());
+
+            let morselizer = ParquetMorselizerBuilder::new()
+                .with_store(Arc::clone(store))
+                .with_table_schema(table_schema)
+                .with_projection(projection)
+                .with_predicate(predicate)
+                .with_pushdown_filters(pushdown_filters)
+                .build();
+
+            let file =
+                PartitionedFile::new(path.to_string(), u64::try_from(data_size).unwrap());
+            (morselizer, file)
+        }
+
+        #[tokio::test]
+        async fn test_row_index_predicate_pushdown_mixed_or_errors() {
+            // Silent drop in the scan would return all 5 rows; we want a loud
+            // error instead.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let expr = col("row_number")
+                .eq(lit(2i64))
+                .or(col("value").eq(lit(4i64)));
+            let (morselizer, file) =
+                build_pushdown_morselizer(&store, "pushdown_mixed.parquet", expr, true)
+                    .await;
+
+            let err = morselizer.plan_file(file).unwrap_err();
+            let msg = err.to_string();
+            assert!(
+                msg.contains("row_number"),
+                "error should name the offending virtual column, got: {msg}"
+            );
+            assert!(
+                msg.contains("virtual column") && msg.contains("pushdown"),
+                "error should explain the virtual-column + pushdown context, \
+                 got: {msg}"
+            );
+        }
+
+        #[tokio::test]
+        async fn test_row_index_predicate_pushdown_virtual_only_errors() {
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let expr = col("row_number").eq(lit(2i64));
+            let (morselizer, file) = build_pushdown_morselizer(
+                &store,
+                "pushdown_virtual_only.parquet",
+                expr,
+                true,
+            )
+            .await;
+
+            let err = morselizer.plan_file(file).unwrap_err();
+            assert!(err.to_string().contains("row_number"));
+        }
+
+        #[tokio::test]
+        async fn test_row_index_predicate_allowed_when_pushdown_disabled() {
+            // Guards the `pushdown_filters=false` path: the predicate is only
+            // used for stats pruning (a no-op for row_number) and must not
+            // trip the pushdown-guard error.
+            let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+            let expr = col("row_number").eq(lit(2i64));
+            let (morselizer, file) =
+                build_pushdown_morselizer(&store, "pushdown_off.parquet", expr, false)
+                    .await;
+
+            let stream = open_file(&morselizer, file).await.unwrap();
+            let (_batches, rows) = count_batches_and_rows(stream).await;
+            assert_eq!(rows, 5);
+        }
+    }
 }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index a014c8b2726e7..d08b8a03729e1 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -678,7 +678,12 @@ impl FileSource for ParquetSource {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn PhysicalExpr>>> {
-        let table_schema = self.table_schema.table_schema();
+        // Use the schema excluding virtual columns: virtual columns (e.g.
+        // Parquet `row_number`) are produced by the reader itself and cannot
+        // be referenced inside a RowFilter, so predicates that reference them
+        // must not be marked as pushed down — otherwise the scan would
+        // silently drop them and produce wrong results.
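+        //
+        // For example, given `WHERE value > 10 AND row_number < 100`, the
+        // optimizer can push the `value > 10` conjunct into the scan while
+        // the `row_number < 100` conjunct stays in a FilterExec above it.
+        // (Illustrative names; `value`/`row_number` stand in for any file
+        // column and virtual column.)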
+        let pushable_schema = self.table_schema.schema_without_virtual_columns();
         // Determine if based on configs we should push filters down.
         // If either the table / scan itself or the config has pushdown enabled,
         // we will push down the filters.
@@ -694,7 +699,7 @@
         let filters: Vec<PushedDownPredicate> = filters
             .into_iter()
             .map(|filter| {
-                if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
+                if can_expr_be_pushed_down_with_schemas(&filter, &pushable_schema) {
                     PushedDownPredicate::supported(filter)
                 } else {
                     PushedDownPredicate::unsupported(filter)
                 }
             })
@@ -946,4 +951,66 @@ mod tests {
         assert!(source.reverse_row_groups());
         assert!(source.filter().is_some());
     }
+
+    #[test]
+    fn test_try_pushdown_filters_rejects_virtual_column_refs() {
+        // Virtual columns are produced by the reader and cannot be referenced
+        // inside a RowFilter. `try_pushdown_filters` must report such filters
+        // as `PushedDown::No` so the FilterExec above the scan stays in
+        // place — otherwise the scan would silently drop the predicate and
+        // produce wrong results.
+        use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+        use datafusion_common::config::ConfigOptions;
+        use datafusion_datasource::TableSchema;
+        use datafusion_expr::{col, lit as logical_lit};
+        use datafusion_physical_expr::planner::logical2physical;
+        use datafusion_physical_plan::filter_pushdown::PushedDown;
+        use parquet::arrow::RowNumber;
+
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int64,
+            false,
+        )]));
+        let row_number_field: FieldRef = Arc::new(
+            Field::new("row_number", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::from_file_schema(file_schema)
+            .with_virtual_columns(vec![row_number_field]);
+
+        let source = ParquetSource::new(table_schema).with_pushdown_filters(true);
+
+        let full_schema = source.table_schema.table_schema();
+
+        let pushable = logical2physical(&col("value").eq(logical_lit(1i64)), full_schema);
+        let virtual_only =
+            logical2physical(&col("row_number").eq(logical_lit(2i64)), full_schema);
+        let mixed = logical2physical(
+            &col("row_number")
+                .eq(logical_lit(2i64))
+                .or(col("value").eq(logical_lit(4i64))),
+            full_schema,
+        );
+
+        let config = ConfigOptions::default();
+        let prop = source
+            .try_pushdown_filters(vec![pushable, virtual_only, mixed], &config)
+            .expect("try_pushdown_filters must not error");
+
+        assert_eq!(prop.filters.len(), 3);
+        assert!(
+            matches!(prop.filters[0], PushedDown::Yes),
+            "file-column filter should be pushable"
+        );
+        assert!(
+            matches!(prop.filters[1], PushedDown::No),
+            "filter referencing only a virtual column must not be pushed down"
+        );
+        assert!(
+            matches!(prop.filters[2], PushedDown::No),
+            "filter mixing a virtual column with a file column must not be \
+             pushed down (row filter would silently drop it)"
+        );
+    }
 }
diff --git a/datafusion/datasource-parquet/src/virtual_column.rs b/datafusion/datasource-parquet/src/virtual_column.rs
new file mode 100644
index 0000000000000..2290ad2aeab9d
--- /dev/null
+++ b/datafusion/datasource-parquet/src/virtual_column.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed wrapper for parquet virtual columns.
+//!
+//! arrow-rs identifies virtual columns via arrow extension types carried on
+//! the `FieldRef`. [`ParquetVirtualColumn`] lifts that contract into the type
+//! system so callers validate at the boundary (via `TryFrom<&FieldRef>`)
+//! rather than string-comparing extension-type names deep inside the reader.
+
+use arrow::datatypes::FieldRef;
+use arrow_schema::extension::ExtensionType;
+use datafusion_common::{DataFusionError, Result, not_impl_err};
+use parquet::arrow::RowNumber;
+use std::sync::Arc;
+
+/// A parquet virtual column validated to have a supported arrow extension
+/// type.
+///
+/// Construct via [`TryFrom<&FieldRef>`]; add a new variant (and update the
+/// `TryFrom` impl) when DataFusion gains support for another arrow-rs virtual
+/// extension type.
+#[derive(Debug, Clone)]
+pub enum ParquetVirtualColumn {
+    /// Absolute row number within the parquet file. Backed by arrow-rs's
+    /// [`RowNumber`] extension type.
+    RowNumber(FieldRef),
+}
+
+impl ParquetVirtualColumn {
+    pub fn field(&self) -> &FieldRef {
+        match self {
+            Self::RowNumber(field) => field,
+        }
+    }
+}
+
+impl From<ParquetVirtualColumn> for FieldRef {
+    fn from(col: ParquetVirtualColumn) -> Self {
+        match col {
+            ParquetVirtualColumn::RowNumber(field) => field,
+        }
+    }
+}
+
+impl TryFrom<&FieldRef> for ParquetVirtualColumn {
+    type Error = DataFusionError;
+
+    fn try_from(field: &FieldRef) -> Result<Self> {
+        let Some(name) = field.extension_type_name() else {
+            return not_impl_err!(
+                "Virtual column '{}' is missing an Arrow extension type; \
+                 supported extension types: [{}]",
+                field.name(),
+                RowNumber::NAME
+            );
+        };
+        match name {
+            n if n == RowNumber::NAME => Ok(Self::RowNumber(Arc::clone(field))),
+            other => not_impl_err!(
+                "Virtual column '{}' uses unsupported Arrow extension type '{}'; \
+                 supported types: [{}]. Add a ParquetVirtualColumn variant and \
+                 a test for this type before wiring it through.",
+                field.name(),
+                other,
+                RowNumber::NAME
+            ),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field};
+
+    #[test]
+    fn row_number_field_converts() {
+        let field: FieldRef = Arc::new(
+            Field::new("row_number", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let col = ParquetVirtualColumn::try_from(&field).expect("valid row_number");
+        assert!(matches!(col, ParquetVirtualColumn::RowNumber(_)));
+        assert_eq!(col.field().name(), "row_number");
+    }
+
+    #[test]
+    fn missing_extension_type_rejected() {
+        let field: FieldRef = Arc::new(Field::new("plain", DataType::Int64, false));
+        let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
+        assert!(
+            err.to_string().contains("missing an Arrow extension type"),
+            "got: {err}"
+        );
+    }
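+
+    #[test]
+    fn field_round_trips_through_conversions() {
+        // Illustrative extra check (not load-bearing for the feature):
+        // converting back to a `FieldRef` should preserve the field,
+        // including its extension type metadata.
+        let field: FieldRef = Arc::new(
+            Field::new("row_number", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let col = ParquetVirtualColumn::try_from(&field).expect("valid row_number");
+        let back: FieldRef = col.into();
+        assert_eq!(back, field);
+    }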
+
+    #[test]
+    fn unsupported_extension_type_rejected() {
+        // RowGroupIndex is a real arrow-rs virtual type not yet in our enum.
+        let field: FieldRef = Arc::new(
+            Field::new("row_group_index", DataType::Int64, false)
+                .with_extension_type(parquet::arrow::RowGroupIndex),
+        );
+        let err = ParquetVirtualColumn::try_from(&field).unwrap_err();
+        assert!(
+            err.to_string().contains("parquet.virtual.row_group_index"),
+            "error should name the offending extension type, got: {err}"
+        );
+    }
+}
diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs
index 5b7fc4727df05..761b63f0050c7 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -23,10 +23,17 @@ use std::sync::Arc;
 
 /// The overall schema for potentially partitioned data sources.
 ///
 /// When reading partitioned data (such as Hive-style partitioning), a [`TableSchema`]
-/// consists of two parts:
+/// consists of up to three parts:
 /// 1. **File schema**: The schema of the actual data files on disk
 /// 2. **Partition columns**: Columns whose values are encoded in the directory structure,
 ///    but not stored in the files themselves
+/// 3. **Virtual columns**: Columns produced by the file reader (e.g. Parquet
+///    `row_number`) that are not stored in the files
+///
+/// The full table schema is composed in that order: file columns, then
+/// partition columns, then virtual columns. Consumers that need a different
+/// output ordering should use a projection on top of
+/// [`TableSchema::table_schema`].
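+///
+/// # Example: Schema Layout with Virtual Columns
+///
+/// For illustration, a file with columns `a` and `b`, a partition column
+/// `date`, and a reader-produced `row_number` virtual column composes as:
+///
+/// ```text
+/// | a | b | date | row_number |
+///   ^ file ^ ^partition^ ^virtual^
+/// ```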
 ///
 /// # Example: Partitioned Table
 ///
@@ -63,6 +70,19 @@ pub struct TableSchema {
     /// this field holds that schema.
     file_schema: SchemaRef,
 
+    /// Virtual columns that are generated by the reader rather than read from
+    /// the data files or the directory structure.
+    ///
+    /// For example, a Parquet reader may inject a `row_number` column whose
+    /// values are produced per file by the reader. Virtual column fields must
+    /// carry an arrow extension type (e.g. `RowNumber`, `RowGroupIndex`) so the
+    /// file reader can recognize them.
+    ///
+    /// Virtual columns are appended at the end of the table schema, after the
+    /// file columns and any partition columns (layout: `[file, partition,
+    /// virtual]`).
+    virtual_columns: Arc<Vec<FieldRef>>,
+
     /// Columns that are derived from the directory structure (partitioning scheme).
     ///
     /// For Hive-style partitioning like `/date=2025-10-10/region=us-west/`,
@@ -72,10 +92,11 @@
     /// row during query execution based on the file's location.
     table_partition_cols: Arc<Vec<FieldRef>>,
 
-    /// The complete table schema: file_schema columns followed by partition columns.
+    /// The complete table schema: file_schema columns, followed by partition
+    /// columns, followed by virtual columns.
     ///
-    /// This is pre-computed during construction by concatenating `file_schema`
-    /// and `table_partition_cols`, so it can be returned as a cheap reference.
+    /// This is pre-computed during construction, so it can be returned as a
+    /// cheap reference.
     table_schema: SchemaRef,
 }
 
@@ -117,12 +138,18 @@ impl TableSchema {
     /// assert_eq!(table_schema.table_schema().fields().len(), 4);
     /// ```
     pub fn new(file_schema: SchemaRef, table_partition_cols: Vec<FieldRef>) -> Self {
-        let mut builder = SchemaBuilder::from(file_schema.as_ref());
-        builder.extend(table_partition_cols.iter().cloned());
+        let table_partition_cols = Arc::new(table_partition_cols);
+        let virtual_columns: Arc<Vec<FieldRef>> = Arc::new(vec![]);
+        let table_schema = build_table_schema(
+            &file_schema,
+            table_partition_cols.as_ref(),
+            virtual_columns.as_ref(),
+        );
         Self {
             file_schema,
-            table_partition_cols: Arc::new(table_partition_cols),
-            table_schema: Arc::new(builder.finish()),
+            virtual_columns,
+            table_partition_cols,
+            table_schema,
         }
     }
 
@@ -140,6 +167,12 @@
     /// into [`TableSchema::with_table_partition_cols`] if you have partition columns at construction time
     /// since it avoids re-computing the table schema.
     pub fn with_table_partition_cols(mut self, partition_cols: Vec<FieldRef>) -> Self {
+        debug_assert!(
+            !partition_cols
+                .iter()
+                .any(|p| self.virtual_columns.iter().any(|v| v.name() == p.name())),
+            "partition column name collides with an existing virtual column"
+        );
         if self.table_partition_cols.is_empty() {
             self.table_partition_cols = Arc::new(partition_cols);
         } else {
             let table_partition_cols = Arc::get_mut(&mut self.table_partition_cols)
                 .expect(
                     "Expected to be the sole owner of table_partition_cols since this function accepts mut self",
                 );
             table_partition_cols.extend(partition_cols);
         }
@@ -149,9 +182,49 @@
-        let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
-        builder.extend(self.table_partition_cols.iter().cloned());
-        self.table_schema = Arc::new(builder.finish());
+        self.table_schema = build_table_schema(
+            &self.file_schema,
+            self.table_partition_cols.as_ref(),
+            self.virtual_columns.as_ref(),
+        );
         self
     }
 
+    /// Add virtual columns to an existing TableSchema, returning a new instance.
+    ///
+    /// Virtual columns are produced by the file reader (e.g. a Parquet
+    /// `row_number` column) rather than being stored in the files or derived
+    /// from partition paths. Each field must carry an arrow virtual extension
+    /// type so the reader can recognize it; `ParquetOpener` forwards these
+    /// fields to `parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`.
+    ///
+    /// Virtual columns are appended at the end of the table schema, after any
+    /// partition columns.
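+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (the plain `Int64` field here is for illustration
+    /// only; a real parquet virtual column must additionally carry an
+    /// extension type such as arrow-rs's `RowNumber`):
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::datatypes::{DataType, Field, Schema};
+    /// # use datafusion_datasource::TableSchema;
+    /// let file_schema = Arc::new(Schema::new(vec![Field::new(
+    ///     "value",
+    ///     DataType::Int64,
+    ///     false,
+    /// )]));
+    /// let table_schema = TableSchema::from_file_schema(file_schema)
+    ///     .with_virtual_columns(vec![Arc::new(Field::new(
+    ///         "row_number",
+    ///         DataType::Int64,
+    ///         true,
+    ///     ))]);
+    /// // Virtual columns land at the end of the table schema
+    /// assert_eq!(table_schema.table_schema().field(1).name(), "row_number");
+    /// ```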
+    pub fn with_virtual_columns(mut self, virtual_columns: Vec<FieldRef>) -> Self {
+        debug_assert!(
+            virtual_columns.iter().enumerate().all(|(i, v)| {
+                let name = v.name();
+                !self.file_schema.fields().iter().any(|f| f.name() == name)
+                    && !self.table_partition_cols.iter().any(|p| p.name() == name)
+                    && !self.virtual_columns.iter().any(|w| w.name() == name)
+                    && !virtual_columns[..i].iter().any(|w| w.name() == name)
+            }),
+            "virtual column name collides with an existing file, partition, or virtual column"
+        );
+
+        if self.virtual_columns.is_empty() {
+            self.virtual_columns = Arc::new(virtual_columns);
+        } else {
+            let existing = Arc::get_mut(&mut self.virtual_columns).expect(
+                "Expected to be the sole owner of virtual_columns since this function accepts mut self",
+            );
+            existing.extend(virtual_columns);
+        }
+        self.table_schema = build_table_schema(
+            &self.file_schema,
+            self.table_partition_cols.as_ref(),
+            self.virtual_columns.as_ref(),
+        );
+        self
+    }
 
@@ -170,13 +243,52 @@
         &self.table_partition_cols
     }
 
-    /// Get the full table schema (file schema + partition columns).
+    /// Get the virtual columns.
     ///
-    /// This is the complete schema that will be seen by queries, combining
-    /// both the columns from the files and the partition columns.
+    /// Virtual columns are produced by the file reader (e.g. Parquet
+    /// `row_number`) and are not stored in the data files or derived from
+    /// partition paths.
+    pub fn virtual_columns(&self) -> &Vec<FieldRef> {
+        &self.virtual_columns
+    }
+
+    /// Get the full table schema (file schema + partition columns + virtual columns).
+    ///
+    /// This is the complete schema that will be seen by queries. Fields appear
+    /// in the order: file columns, partition columns, virtual columns.
     pub fn table_schema(&self) -> &SchemaRef {
         &self.table_schema
     }
+
+    /// Schema of columns that can be referenced by predicates pushed into the
+    /// file reader: file columns plus partition columns, excluding virtual
+    /// columns.
+    ///
+    /// Virtual columns are produced by the reader itself (e.g. Parquet
+    /// `row_number`) and cannot be referenced inside the reader's row filter,
+    /// so predicates that reference them must stay above the scan. Callers
+    /// deciding which filters to push down should check against this schema
+    /// rather than [`Self::table_schema`].
+    ///
+    /// When there are no virtual columns this returns the same schema as
+    /// [`Self::table_schema`].
+    pub fn schema_without_virtual_columns(&self) -> SchemaRef {
+        if self.virtual_columns.is_empty() {
+            return Arc::clone(&self.table_schema);
+        }
+        build_table_schema(&self.file_schema, &self.table_partition_cols, &[])
+    }
+}
+
+fn build_table_schema(
+    file_schema: &SchemaRef,
+    table_partition_cols: &[FieldRef],
+    virtual_columns: &[FieldRef],
+) -> SchemaRef {
+    let mut builder = SchemaBuilder::from(file_schema.as_ref());
+    builder.extend(table_partition_cols.iter().cloned());
+    builder.extend(virtual_columns.iter().cloned());
+    Arc::new(builder.finish())
 }
 
 impl From<SchemaRef> for TableSchema {
@@ -276,4 +388,111 @@ mod tests {
         &expected_schema
        );
    }
+
+    #[test]
+    fn test_with_virtual_columns_layout() {
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("user_id", DataType::Int64, false),
+            Field::new("amount", DataType::Float64, false),
+        ]));
+
+        let virtual_cols =
+            vec![Arc::new(Field::new("row_number", DataType::Int64, true))];
+
+        let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false))];
+
+        // Apply virtual columns and partition columns in either order; the
+        // resulting table schema should always be [file, partition, virtual].
+        let built_virtual_first = TableSchema::from_file_schema(Arc::clone(&file_schema))
+            .with_virtual_columns(virtual_cols.clone())
+            .with_table_partition_cols(partition_cols.clone());
+
+        let built_partition_first =
+            TableSchema::new(Arc::clone(&file_schema), partition_cols.clone())
+                .with_virtual_columns(virtual_cols.clone());
+
+        let expected = Schema::new(vec![
+            Field::new("user_id", DataType::Int64, false),
+            Field::new("amount", DataType::Float64, false),
+            Field::new("date", DataType::Utf8, false),
+            Field::new("row_number", DataType::Int64, true),
+        ]);
+
+        for ts in [built_virtual_first, built_partition_first] {
+            assert_eq!(ts.table_schema().as_ref(), &expected);
+            assert_eq!(ts.virtual_columns().len(), 1);
+            assert_eq!(ts.virtual_columns()[0].name(), "row_number");
+            assert_eq!(ts.table_partition_cols().len(), 1);
+            assert_eq!(ts.file_schema().fields().len(), 2);
+        }
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_virtual_column_collides_with_file_schema_panics_in_debug() {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "row_number",
+            DataType::Int64,
+            false,
+        )]));
+        let _ = TableSchema::from_file_schema(file_schema).with_virtual_columns(vec![
+            Arc::new(Field::new("row_number", DataType::Int64, true)),
+        ]);
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_virtual_column_collides_with_partition_panics_in_debug() {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "user_id",
+            DataType::Int64,
+            false,
+        )]));
+        let partition_cols =
+            vec![Arc::new(Field::new("row_number", DataType::Utf8, false))];
+        let _ = TableSchema::new(file_schema, partition_cols).with_virtual_columns(vec![
+            Arc::new(Field::new("row_number", DataType::Int64, true)),
+        ]);
+    }
+
+    #[test]
+    #[should_panic(expected = "virtual column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_duplicate_virtual_columns_panic_in_debug() {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "user_id",
+            DataType::Int64,
+            false,
+        )]));
+        let _ = TableSchema::from_file_schema(file_schema).with_virtual_columns(vec![
+            Arc::new(Field::new("vc", DataType::Int64, true)),
+            Arc::new(Field::new("vc", DataType::Int64, true)),
+        ]);
+    }
+
+    #[test]
+    #[should_panic(expected = "partition column name collides")]
+    #[cfg(debug_assertions)]
+    fn test_partition_column_added_after_colliding_virtual_panics_in_debug() {
+        // Guards the ordering hole: with_virtual_columns then
+        // with_table_partition_cols must not silently reintroduce a collision.
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "user_id",
+            DataType::Int64,
+            false,
+        )]));
+        let _ = TableSchema::from_file_schema(file_schema)
+            .with_virtual_columns(vec![Arc::new(Field::new(
+                "row_number",
+                DataType::Int64,
+                true,
+            ))])
+            .with_table_partition_cols(vec![Arc::new(Field::new(
+                "row_number",
+                DataType::Utf8,
+                false,
+            ))]);
+    }
 }