From ccafa6b12c6c61861076883d87d8c3632f397cbd Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Fri, 6 Mar 2026 20:55:07 -0500 Subject: [PATCH] Fix `elapsed_compute` metric for Parquet DataSourceExec The `elapsed_compute` baseline metric for Parquet scans previously reported unrealistically low values (e.g. 14ns for a full table scan) because no timer was wrapping the per-batch compute work. This follows the same pattern used in PR #18901 for CSV: instantiate `BaselineMetrics` in `ParquetOpener::open()` and wrap the stream's per-batch processing (projection, schema replacement, metrics copy) with an `elapsed_compute` timer. Closes part of #18195. --- datafusion/datasource-parquet/src/opener.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 108e8c5752017..bb97d13cc59d9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -47,7 +47,7 @@ use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; use datafusion_physical_plan::metrics::{ - Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, + BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics, }; use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; @@ -191,6 +191,7 @@ impl FileOpener for ParquetOpener { let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); + let baseline_metrics = BaselineMetrics::new(&self.metrics, self.partition_index); let metadata_size_hint = partitioned_file .metadata_size_hint @@ -631,7 +632,8 @@ impl FileOpener for ParquetOpener { let projector = projection.make_projector(&stream_schema)?; let stream = stream.map_err(DataFusionError::from).map(move |b| { - b.and_then(|mut b| { + let mut timer = baseline_metrics.elapsed_compute().timer(); + let result = b.and_then(|mut b| { copy_arrow_reader_metrics( &arrow_reader_metrics, &predicate_cache_inner_records, @@ -659,7 +661,9 @@ impl FileOpener for ParquetOpener { } else { Ok(b) } - }) + }); + timer.stop(); + result }); // ----------------------------------------------------------------------