Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions src/datafusion/src/optimizers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use datafusion::{
common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
config::ConfigOptions,
datasource::{
physical_plan::{FileSource, ParquetSource},
listing::PartitionedFile,
physical_plan::{FileSource, ParquetSource, parquet::ParquetAccessPlan},
source::DataSource,
table_schema::TableSchema,
},
Expand Down Expand Up @@ -172,6 +173,32 @@ pub fn rewrite_data_source_plan(
rewritten.data
}

/// Estimate the bytes a parquet scan will actually read from one file.
///
/// Point-fetches attach a precise [`ParquetAccessPlan`] to the `PartitionedFile`
/// that selects only a few row groups. Scaling the file size by the selected
/// fraction lets those targeted reads fall under `max_scan_bytes` and take the
/// cached path, instead of being judged by the whole file's size. Files with no
/// attached plan fall back to the full `object_meta.size`, so normal/full scans
/// are unaffected.
fn estimated_scan_bytes(file: &PartitionedFile) -> u64 {
let file_size = file.object_meta.size;
let Some(plan) = file
.extensions
.as_ref()
.and_then(|ext| ext.downcast_ref::<ParquetAccessPlan>())
else {
return file_size;
};
let total = plan.len();
if total == 0 {
return file_size;
}
let selected = plan.row_group_indexes().len();
// u128 intermediate so a large file times the selected count can't overflow.
((file_size as u128 * selected as u128) / total as u128) as u64
}

fn try_optimize_parquet_source(
plan: Arc<dyn ExecutionPlan>,
cache: &LiquidCacheParquetRef,
Expand All @@ -189,7 +216,7 @@ fn try_optimize_parquet_source(
.file_groups
.iter()
.flat_map(|g| g.files())
.map(|f| f.object_meta.size)
.map(estimated_scan_bytes)
.sum();
if total > max_bytes {
log::info!(
Expand Down Expand Up @@ -399,6 +426,33 @@ mod tests {
.unwrap();
}

#[test]
fn test_estimated_scan_bytes_scales_by_selected_row_groups() {
const GIB: u64 = 1024 * 1024 * 1024;
const MIB: u64 = 1024 * 1024;
let file_size = GIB;
let total_row_groups = 100;
let cap = 100 * MIB;

// A point-fetch: precise access plan over 100 row groups selecting just 1.
let mut plan = ParquetAccessPlan::new_none(total_row_groups);
plan.scan(0);
let targeted =
PartitionedFile::new("targeted.parquet", file_size).with_extensions(Arc::new(plan));

// Estimate scales to ~1/100 of the file, not the full size.
assert_eq!(estimated_scan_bytes(&targeted), file_size / 100);

// No attached plan: falls back to the full object size (normal scans).
let full = PartitionedFile::new("full.parquet", file_size);
assert_eq!(estimated_scan_bytes(&full), file_size);

// At the cap, the targeted file is under (cached) while the full file is
// over it (the regression this fix addresses).
assert!(estimated_scan_bytes(&targeted) <= cap);
assert!(estimated_scan_bytes(&full) > cap);
}

#[tokio::test]
async fn test_plan_rewrite() {
let ctx = SessionContext::new();
Expand Down
Loading