From 5982eaa8de9d05f51f82b4883ccba6bb487b046b Mon Sep 17 00:00:00 2001 From: Shefeek Jinnah Date: Mon, 15 Jun 2026 10:45:15 +0530 Subject: [PATCH 1/2] fix: gate scan-size on access-plan estimate, not full file size The max_scan_bytes gate in try_optimize_parquet_source summed each file's full object_meta.size, so point-fetches that read only a few row groups (via a pre-attached ParquetAccessPlan) were judged by the whole file's size and passed through as uncached vanilla DataFusion reads. Add estimated_scan_bytes(&PartitionedFile): when a precise ParquetAccessPlan is attached to file.extensions, scale the file size by the selected (non-Skip) row-group fraction; otherwise fall back to the full object_meta.size so normal/full scans are unaffected. Use a u128 intermediate to avoid overflow on large files. Wire it into the gate's size computation. Add a focused unit test covering the targeted vs. full-scan estimate and the cap-crossing regression. --- src/datafusion/src/optimizers/mod.rs | 58 +++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/src/datafusion/src/optimizers/mod.rs b/src/datafusion/src/optimizers/mod.rs index 19b6416c..1a19bc6f 100644 --- a/src/datafusion/src/optimizers/mod.rs +++ b/src/datafusion/src/optimizers/mod.rs @@ -10,7 +10,8 @@ use datafusion::{ common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}, config::ConfigOptions, datasource::{ - physical_plan::{FileSource, ParquetSource}, + listing::PartitionedFile, + physical_plan::{FileSource, ParquetSource, parquet::ParquetAccessPlan}, source::DataSource, table_schema::TableSchema, }, @@ -172,6 +173,32 @@ pub fn rewrite_data_source_plan( rewritten.data } +/// Estimate the bytes a parquet scan will actually read from one file. +/// +/// Point-fetches attach a precise [`ParquetAccessPlan`] to the `PartitionedFile` +/// that selects only a few row groups. 
Scaling the file size by the selected +/// fraction lets those targeted reads fall under `max_scan_bytes` and take the +/// cached path, instead of being judged by the whole file's size. Files with no +/// attached plan fall back to the full `object_meta.size`, so normal/full scans +/// are unaffected. +fn estimated_scan_bytes(file: &PartitionedFile) -> u64 { + let file_size = file.object_meta.size; + let Some(plan) = file + .extensions + .as_ref() + .and_then(|ext| ext.downcast_ref::<ParquetAccessPlan>()) + else { + return file_size; + }; + let total = plan.len(); + if total == 0 { + return file_size; + } + let selected = plan.row_group_indexes().len(); + // u128 intermediate so a large file times the selected count can't overflow. + ((file_size as u128 * selected as u128) / total as u128) as u64 +} + fn try_optimize_parquet_source( plan: Arc<dyn ExecutionPlan>, cache: &LiquidCacheParquetRef, @@ -189,7 +216,7 @@ fn try_optimize_parquet_source( .file_groups .iter() .flat_map(|g| g.files()) - .map(|f| f.object_meta.size) + .map(estimated_scan_bytes) .sum(); if total > max_bytes { log::info!( @@ -399,6 +426,33 @@ mod tests { .unwrap(); } + #[test] + fn test_estimated_scan_bytes_scales_by_selected_row_groups() { + const GIB: u64 = 1024 * 1024 * 1024; + const MIB: u64 = 1024 * 1024; + let file_size = GIB; + let total_row_groups = 100; + let cap = 100 * MIB; + + // A point-fetch: precise access plan over 100 row groups selecting just 1. + let mut plan = ParquetAccessPlan::new_none(total_row_groups); + plan.scan(0); + let targeted = PartitionedFile::new("targeted.parquet", file_size) + .with_extensions(Arc::new(plan)); + + // Estimate scales to ~1/100 of the file, not the full size. + assert_eq!(estimated_scan_bytes(&targeted), file_size / 100); + + // No attached plan: falls back to the full object size (normal scans). 
+ let full = PartitionedFile::new("full.parquet", file_size); + assert_eq!(estimated_scan_bytes(&full), file_size); + + // At the cap, the targeted file is under (cached) while the full file is + // over it (the regression this fix addresses). + assert!(estimated_scan_bytes(&targeted) <= cap); + assert!(estimated_scan_bytes(&full) > cap); + } + #[tokio::test] async fn test_plan_rewrite() { let ctx = SessionContext::new(); From ec2d70b04bf54a8ca0b5ff8bcf47c821e4ba5836 Mon Sep 17 00:00:00 2001 From: Shefeek Jinnah Date: Mon, 15 Jun 2026 10:52:16 +0530 Subject: [PATCH 2/2] style: rustfmt PartitionedFile builder line in scan-gate test --- src/datafusion/src/optimizers/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/datafusion/src/optimizers/mod.rs b/src/datafusion/src/optimizers/mod.rs index 1a19bc6f..574b77e5 100644 --- a/src/datafusion/src/optimizers/mod.rs +++ b/src/datafusion/src/optimizers/mod.rs @@ -437,8 +437,8 @@ mod tests { // A point-fetch: precise access plan over 100 row groups selecting just 1. let mut plan = ParquetAccessPlan::new_none(total_row_groups); plan.scan(0); - let targeted = PartitionedFile::new("targeted.parquet", file_size) - .with_extensions(Arc::new(plan)); + let targeted = + PartitionedFile::new("targeted.parquet", file_size).with_extensions(Arc::new(plan)); // Estimate scales to ~1/100 of the file, not the full size. assert_eq!(estimated_scan_bytes(&targeted), file_size / 100);