Skip to content
Open
48 changes: 22 additions & 26 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,8 +1370,6 @@ impl ParquetRecordBatchReader {
}
match self.read_plan.row_selection_cursor_mut() {
RowSelectionCursor::Mask(mask_cursor) => {
// Stream the record batch reader using contiguous segments of the selection
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
while !mask_cursor.is_empty() {
let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
return Ok(None);
Expand All @@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
continue;
}

let mask = mask_cursor.mask_values_for(&mask_chunk)?;

let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
));
}
if read != mask_chunk.chunk_rows {
return Err(general_err!(
"insufficient rows read from array reader - expected {}, got {}",
mask_chunk.chunk_rows,
read
));
}

let array = self.array_reader.consume_batch()?;
// The column reader exposes the projection as a struct array; convert this
// into a record batch before applying the boolean filter mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct array".to_string(),
)
})?;

// Key Change: partial read → emit immediately, no mask
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think after we merge this PR, the idea of "key change" will be confusing (as it won't be part of a change)

if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}

// Full read , safe to apply mask
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a spacing error in this comment: "Full read , safe" should be "Full read, safe" (remove the extra space before the comma).

Suggested change
// Full read , safe to apply mask
// Full read, safe to apply mask

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful explanatory comments were removed that described the high-level logic of the Mask cursor path. While the code changes are legitimate, removing documentation that explains "why" the code works a certain way reduces maintainability. Consider restoring or updating the comment that explains "The column reader exposes the projection as a struct array; convert this into a record batch before applying the boolean filter mask" near line 1405 where the conversion happens, as this helps future maintainers understand the data flow.

Suggested change
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read , safe to apply mask
// The column reader exposes the projected columns as a single StructArray.
// Convert this struct array into a RecordBatch before applying the boolean
// filter mask so we can use the standard RecordBatch-level filter utilities.
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read, safe to apply mask

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a partial read occurs (read < mask_chunk.chunk_rows), this code returns the unfiltered struct_array without applying the boolean mask. This means rows that should have been filtered out according to the row selection mask will be incorrectly included in the returned RecordBatch.

The mask_cursor expects that mask_chunk.chunk_rows were read, but only 'read' rows were actually decoded. The mask should still be applied, but it needs to be sized/sliced appropriately for the partial read, or the mask_cursor state needs to be adjusted to reflect only the rows that were actually read.

Suggested change
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read , safe to apply mask
// Always apply the selection mask; for partial reads, slice it down to `read`
if read < mask_chunk.chunk_rows {
let full_mask = mask_cursor.mask_values_for(&mask_chunk)?;
let sliced_mask = full_mask.slice(0, read);
let sliced_mask = sliced_mask.as_boolean();
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), sliced_mask)?;
if filtered_batch.num_rows() == 0 {
continue;
}
return Ok(Some(filtered_batch));
}
// Full read, apply mask directly

Copilot uses AI. Check for mistakes.
let mask = mask_cursor.mask_values_for(&mask_chunk)?;
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed validation that checked filtered_batch.num_rows() == mask_chunk.selected_rows was an important consistency check. While this check needs to be skipped for partial reads, it should still be performed for full reads to catch data inconsistencies. Consider restoring this check after line 1419, within the full read path, to maintain data integrity validation.

Suggested change
// For full reads, ensure the mask and filtered output are consistent
if filtered_batch.num_rows() != mask_chunk.selected_rows {
return Err(general_err!(
"row filter inconsistency: expected {} rows, got {}",
mask_chunk.selected_rows,
filtered_batch.num_rows()
));
}

Copilot uses AI. Check for mistakes.
if filtered_batch.num_rows() != mask_chunk.selected_rows {
return Err(general_err!(
"filtered rows mismatch selection - expected {}, got {}",
mask_chunk.selected_rows,
filtered_batch.num_rows()
));
}

if filtered_batch.num_rows() == 0 {
continue;
}
Expand Down Expand Up @@ -1472,14 +1458,24 @@ impl ParquetRecordBatchReader {
}
_ => front.row_count,
};
match self.array_reader.read_records(to_read)? {
0 => break,
rec => read_records += rec,
};
let rec = self.array_reader.read_records(to_read)?;
if rec == 0 {
break;
}

read_records += rec;

// stop early if we couldn't read everything requested
if rec < to_read {
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a partial read occurs (rec < to_read), this code breaks and returns a partial batch. However, if line 1456 executed, we've already pushed remaining rows back onto the selectors_cursor via return_selector(). This means those rows are queued for the next read, but we haven't actually advanced the cursor properly for the rows we DID read. The cursor state may become inconsistent because:

  1. If to_read was less than front.row_count, we pushed back remaining rows
  2. But we only read rec rows, which may be less than to_read
  3. The cursor doesn't know about the (to_read - rec) rows that weren't consumed

Consider handling the partial read case by calculating how many rows were actually consumed and adjusting the selector cursor accordingly, possibly by calling return_selector with the unconsumed count.

Suggested change
if rec < to_read {
if rec < to_read {
let unconsumed = to_read - rec;
if unconsumed > 0 {
selectors_cursor.return_selector(RowSelector::select(unconsumed));
}

Copilot uses AI. Check for mistakes.
break;
}
}
}
RowSelectionCursor::All => {
self.array_reader.read_records(batch_size)?;
let rec = self.array_reader.read_records(batch_size)?;
if rec == 0 {
return Ok(None);
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The three cursor paths handle the case where read_records returns 0 inconsistently:

  • Mask path: Returns an error "reached end of column while expecting X rows" (lines 1397-1402)
  • Selectors path: Breaks the loop and proceeds to consume_batch (line 1462-1463)
  • All path: Returns Ok(None) immediately without calling consume_batch (lines 1476-1478)

This inconsistency could lead to different behavior in edge cases. Consider whether all three paths should handle EOF consistently, either by returning None or by proceeding to consume_batch to check if there's any buffered data. The early return in the All path might skip consuming any previously buffered data, although this may be intentional optimization.

Suggested change
return Ok(None);
break;

Copilot uses AI. Check for mistakes.
}
}
};

Expand Down
16 changes: 9 additions & 7 deletions parquet/tests/arrow_reader/checksum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use parquet::arrow::arrow_reader::ArrowReaderBuilder;
#[test]
fn test_datapage_v1_corrupt_checksum() {
let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet");
assert_eq!(errors, [
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Ok(()),
Ok(()),
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string())
]);
assert_eq!(
errors,
[
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Ok(()),
Ok(()),
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
]
);
}

#[test]
Expand Down
Loading