Skip to content
74 changes: 74 additions & 0 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,31 @@ impl PreparedAccessPlan {
Ok(self)
}

/// Keep at most the leading `count` entries of `row_group_indexes`,
/// dropping everything after them.
///
/// This is the final step of cumulative row-group pruning in the
/// opener. Once `reorder_by_statistics` (and the optional `reverse`)
/// have fixed the scan order, the opener sums `num_rows` from the
/// front of `row_group_indexes` until the running total reaches `K`
/// (the TopK fetch) and passes the number of row groups it needed
/// as `count`.
///
/// The plan is returned untouched when `row_selection` is `Some`:
/// page-level state is keyed by the surviving row groups and would
/// need remapping. The no-WHERE TopK gate means callers on this path
/// always have `row_selection == None`, so that branch is purely
/// defensive.
pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self {
    if self.row_selection.is_none() {
        // Clamp to the current length so a `count` at or beyond it
        // keeps every entry.
        let keep = count.min(self.row_group_indexes.len());
        self.row_group_indexes.truncate(keep);
    } else {
        debug!("Skipping cumulative RG truncate: row_selection present");
    }
    self
}

/// Reverse the access plan for reverse scanning
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
// Get the row group indexes before reversing
Expand Down Expand Up @@ -1026,4 +1051,53 @@ mod test {

assert_eq!(result.row_group_indexes, vec![0, 1]);
}

// ----------------------------------------------------------------
// `truncate_row_groups` tests
// ----------------------------------------------------------------

/// Basic case: only the leading `count` indexes survive, in their
/// original order. This matches how the opener's cumulative-RG-prune
/// step uses the method, after `reorder_by_statistics` and the
/// optional `reverse` have already settled the iteration order.
#[test]
fn truncate_row_groups_keeps_first_n() {
    let truncated = PreparedAccessPlan::new(vec![3, 1, 4, 1, 5, 9, 2, 6], None)
        .unwrap()
        .truncate_row_groups(3);

    assert_eq!(truncated.row_group_indexes, vec![3, 1, 4]);
    assert!(truncated.row_selection.is_none());
}

/// Asking to keep at least as many row groups as the plan holds
/// (e.g. a LIMIT larger than the file's total row groups) changes
/// nothing. The equal and the greater-than cases are asserted
/// separately.
#[test]
fn truncate_row_groups_no_op_when_count_exceeds_len() {
    // Build a fresh three-entry plan and truncate it to `count`.
    let truncate_to = |count: usize| {
        PreparedAccessPlan::new(vec![0, 1, 2], None)
            .unwrap()
            .truncate_row_groups(count)
    };

    // `count` equal to the current length.
    assert_eq!(truncate_to(3).row_group_indexes, vec![0, 1, 2]);

    // `count` larger than the current length.
    assert_eq!(truncate_to(10).row_group_indexes, vec![0, 1, 2]);
}

/// With a `row_selection` present the plan must come back unchanged:
/// page-level state would otherwise need remapping to the surviving
/// row groups. The no-WHERE TopK gate makes this a safety net only.
#[test]
fn truncate_row_groups_skips_when_row_selection_present() {
    let expected_selection = RowSelection::from(vec![RowSelector::select(100)]);

    let untouched =
        PreparedAccessPlan::new(vec![0, 1, 2], Some(expected_selection.clone()))
            .unwrap()
            .truncate_row_groups(1);

    assert_eq!(untouched.row_group_indexes, vec![0, 1, 2]);
    assert_eq!(untouched.row_selection, Some(expected_selection));
}
}
Loading
Loading