-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Parquet: split RecordBatches when binary offsets would overflow #9369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
45f24f6
dd4ecce
a6e11fb
d6bb9ce
3c539f1
a79c76d
ab4344b
53dfbad
4d65945
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1370,8 +1370,6 @@ impl ParquetRecordBatchReader { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| match self.read_plan.row_selection_cursor_mut() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| RowSelectionCursor::Mask(mask_cursor) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Stream the record batch reader using contiguous segments of the selection | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // mask, avoiding the need to materialize intermediate `RowSelector` ranges. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while !mask_cursor.is_empty() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Ok(None); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mask = mask_cursor.mask_values_for(&mask_chunk)?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if read == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Err(general_err!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "reached end of column while expecting {} rows", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| mask_chunk.chunk_rows | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if read != mask_chunk.chunk_rows { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Err(general_err!( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "insufficient rows read from array reader - expected {}, got {}", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| mask_chunk.chunk_rows, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| read | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let array = self.array_reader.consume_batch()?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // The column reader exposes the projection as a struct array; convert this | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // into a record batch before applying the boolean filter mask. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let struct_array = array.as_struct_opt().ok_or_else(|| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ArrowError::ParquetError( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Struct array reader should return struct array".to_string(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| })?; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Key Change: partial read → emit immediately, no mask | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if read < mask_chunk.chunk_rows { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return Ok(Some(RecordBatch::from(struct_array))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Full read , safe to apply mask | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Full read , safe to apply mask | |
| // Full read, safe to apply mask |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useful explanatory comments were removed that described the high-level logic of the Mask cursor path. While the code changes are legitimate, removing documentation that explains "why" the code works a certain way reduces maintainability. Consider restoring or updating the comment that explains "The column reader exposes the projection as a struct array; convert this into a record batch before applying the boolean filter mask" near line 1405 where the conversion happens, as this helps future maintainers understand the data flow.
| // Key Change: partial read → emit immediately, no mask | |
| if read < mask_chunk.chunk_rows { | |
| return Ok(Some(RecordBatch::from(struct_array))); | |
| } | |
| // Full read , safe to apply mask | |
| // The column reader exposes the projected columns as a single StructArray. | |
| // Convert this struct array into a RecordBatch before applying the boolean | |
| // filter mask so we can use the standard RecordBatch-level filter utilities. | |
| // Key Change: partial read → emit immediately, no mask | |
| if read < mask_chunk.chunk_rows { | |
| return Ok(Some(RecordBatch::from(struct_array))); | |
| } | |
| // Full read, safe to apply mask |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a partial read occurs (read < mask_chunk.chunk_rows), this code returns the unfiltered struct_array without applying the boolean mask. This means rows that should have been filtered out according to the row selection mask will be incorrectly included in the returned RecordBatch.
The mask_cursor expects that mask_chunk.chunk_rows were read, but only 'read' rows were actually decoded. The mask should still be applied, but it needs to be sized/sliced appropriately for the partial read, or the mask_cursor state needs to be adjusted to reflect only the rows that were actually read.
| // Key Change: partial read → emit immediately, no mask | |
| if read < mask_chunk.chunk_rows { | |
| return Ok(Some(RecordBatch::from(struct_array))); | |
| } | |
| // Full read , safe to apply mask | |
| // Always apply the selection mask; for partial reads, slice it down to `read` | |
| if read < mask_chunk.chunk_rows { | |
| let full_mask = mask_cursor.mask_values_for(&mask_chunk)?; | |
| let sliced_mask = full_mask.slice(0, read); | |
| let sliced_mask = sliced_mask.as_boolean(); | |
| let filtered_batch = | |
| filter_record_batch(&RecordBatch::from(struct_array), sliced_mask)?; | |
| if filtered_batch.num_rows() == 0 { | |
| continue; | |
| } | |
| return Ok(Some(filtered_batch)); | |
| } | |
| // Full read, apply mask directly |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The removed validation that checked filtered_batch.num_rows() == mask_chunk.selected_rows was an important consistency check. While this check needs to be skipped for partial reads, it should still be performed for full reads to catch data inconsistencies. Consider restoring this check after line 1419, within the full read path, to maintain data integrity validation.
| // For full reads, ensure the mask and filtered output are consistent | |
| if filtered_batch.num_rows() != mask_chunk.selected_rows { | |
| return Err(general_err!( | |
| "row filter inconsistency: expected {} rows, got {}", | |
| mask_chunk.selected_rows, | |
| filtered_batch.num_rows() | |
| )); | |
| } |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a partial read occurs (rec < to_read), this code breaks and returns a partial batch. However, if line 1456 executed, we've already pushed remaining rows back onto the selectors_cursor via return_selector(). This means those rows are queued for the next read, but we haven't actually advanced the cursor properly for the rows we DID read. The cursor state may become inconsistent because:
- If to_read was less than front.row_count, we pushed back
remainingrows - But we only read
recrows, which may be less thanto_read - The cursor doesn't know about the (to_read - rec) rows that weren't consumed
Consider handling the partial read case by calculating how many rows were actually consumed and adjusting the selector cursor accordingly, possibly by calling return_selector with the unconsumed count.
| if rec < to_read { | |
| if rec < to_read { | |
| let unconsumed = to_read - rec; | |
| if unconsumed > 0 { | |
| selectors_cursor.return_selector(RowSelector::select(unconsumed)); | |
| } |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The three cursor paths handle the case where read_records returns 0 inconsistently:
- Mask path: Returns an error "reached end of column while expecting X rows" (lines 1397-1402)
- Selectors path: Breaks the loop and proceeds to consume_batch (line 1462-1463)
- All path: Returns Ok(None) immediately without calling consume_batch (lines 1476-1478)
This inconsistency could lead to different behavior in edge cases. Consider whether all three paths should handle EOF consistently, either by returning None or by proceeding to consume_batch to check if there's any buffered data. The early return in the All path might skip consuming any previously buffered data, although this may be intentional optimization.
| return Ok(None); | |
| break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think after we merge this PR, the idea of "key change" will be confusing (as it won't be part of a change)