diff --git a/src/datafusion/src/optimizers/mod.rs b/src/datafusion/src/optimizers/mod.rs index 19b6416c..574b77e5 100644 --- a/src/datafusion/src/optimizers/mod.rs +++ b/src/datafusion/src/optimizers/mod.rs @@ -10,7 +10,8 @@ use datafusion::{ common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}, config::ConfigOptions, datasource::{ - physical_plan::{FileSource, ParquetSource}, + listing::PartitionedFile, + physical_plan::{FileSource, ParquetSource, parquet::ParquetAccessPlan}, source::DataSource, table_schema::TableSchema, }, @@ -172,6 +173,32 @@ pub fn rewrite_data_source_plan( rewritten.data } +/// Estimate the bytes a parquet scan will actually read from one file. +/// +/// Point-fetches attach a precise [`ParquetAccessPlan`] to the `PartitionedFile` +/// that selects only a few row groups. Scaling the file size by the selected +/// fraction lets those targeted reads fall under `max_scan_bytes` and take the +/// cached path, instead of being judged by the whole file's size. Files with no +/// attached plan fall back to the full `object_meta.size`, so normal/full scans +/// are unaffected. +fn estimated_scan_bytes(file: &PartitionedFile) -> u64 { + let file_size = file.object_meta.size; + let Some(plan) = file + .extensions + .as_ref() + .and_then(|ext| ext.downcast_ref::()) + else { + return file_size; + }; + let total = plan.len(); + if total == 0 { + return file_size; + } + let selected = plan.row_group_indexes().len(); + // u128 intermediate so a large file times the selected count can't overflow. + ((file_size as u128 * selected as u128) / total as u128) as u64 +} + fn try_optimize_parquet_source( plan: Arc, cache: &LiquidCacheParquetRef, @@ -189,7 +216,7 @@ fn try_optimize_parquet_source( .file_groups .iter() .flat_map(|g| g.files()) - .map(|f| f.object_meta.size) + .map(estimated_scan_bytes) .sum(); if total > max_bytes { log::info!( @@ -399,6 +426,33 @@ mod tests { .unwrap(); } + #[test] + fn test_estimated_scan_bytes_scales_by_selected_row_groups() { + const GIB: u64 = 1024 * 1024 * 1024; + const MIB: u64 = 1024 * 1024; + let file_size = GIB; + let total_row_groups = 100; + let cap = 100 * MIB; + + // A point-fetch: precise access plan over 100 row groups selecting just 1. + let mut plan = ParquetAccessPlan::new_none(total_row_groups); + plan.scan(0); + let targeted = + PartitionedFile::new("targeted.parquet", file_size).with_extensions(Arc::new(plan)); + + // Estimate scales to ~1/100 of the file, not the full size. + assert_eq!(estimated_scan_bytes(&targeted), file_size / 100); + + // No attached plan: falls back to the full object size (normal scans). + let full = PartitionedFile::new("full.parquet", file_size); + assert_eq!(estimated_scan_bytes(&full), file_size); + + // At the cap, the targeted file is under (cached) while the full file is + // over it (the regression this fix addresses). + assert!(estimated_scan_bytes(&targeted) <= cap); + assert!(estimated_scan_bytes(&full) > cap); + } + #[tokio::test] async fn test_plan_rewrite() { let ctx = SessionContext::new();