Skip to content

Commit 33c4032

Browse files
authored
Reproduce the issue of #9370 in a minimal, end-to-end way (#9399)
# Which issue does this PR close? Related to #9374 #9370 # Rationale for this change In #9374, I only came up with a unit-test that didn't really throw the message "Not all children array length are the same!" error the issue #9370 is about. In this PR, tests are introduced that are able to reproduce this issue end-to-end and are expected to still panic. In test `test_list_struct_page_boundary_desync_produces_length_mismatch`, we exactly get the error message the original issue is about, while `test_row_selection_list_column_v2_page_boundary_skip` shows a slightly different error message. # What changes are included in this PR? Two tests are introduced, which currently are expected to panic: - test_row_selection_list_column_v2_page_boundary_skip - test_list_struct_page_boundary_desync_produces_length_mismatch # Are there any user-facing changes? No
1 parent d6168e5 commit 33c4032

1 file changed

Lines changed: 228 additions & 1 deletion

File tree

  • parquet/tests/arrow_reader/row_filter

parquet/tests/arrow_reader/row_filter/sync.rs

Lines changed: 228 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ use parquet::{
3333
},
3434
},
3535
errors::Result,
36-
file::{metadata::PageIndexPolicy, properties::WriterProperties},
36+
file::{
37+
metadata::PageIndexPolicy,
38+
properties::{WriterProperties, WriterVersion},
39+
},
3740
};
3841

3942
#[test]
@@ -194,3 +197,227 @@ fn test_row_filter_full_page_skip_is_handled() {
194197
let result = concat_batches(&schema, &batches).unwrap();
195198
assert_eq!(result.num_rows(), 2);
196199
}
200+
201+
/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// When `skip_records` on a list column crosses v2 data page boundaries,
/// the partial record state (`has_partial`) in the repetition level
/// decoder must be flushed before the whole-page-skip shortcut can fire.
/// Without the fix, the list column over-skips by one record, causing
/// struct children to disagree on record counts.
#[test]
#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
fn test_row_selection_list_column_v2_page_boundary_skip() {
    use arrow_array::builder::{Int32Builder, ListBuilder};

    let num_rows = 10usize;

    // Schema: { id: int32, values: list<int32> }
    // Two top-level columns so that StructArrayReader can detect the
    // desync between a simple column (id) and a list column (values).
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", ArrowDataType::Int32, false),
        Field::new(
            "values",
            ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int32, true))),
            false,
        ),
    ]));

    // Each row: id = i, values = [i*10, i*10+1]
    let ids = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
    let mut list_builder = ListBuilder::new(Int32Builder::new());
    for i in 0..num_rows as i32 {
        list_builder.values().append_value(i * 10);
        list_builder.values().append_value(i * 10 + 1);
        list_builder.append(true);
    }
    let values_array = list_builder.finish();

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(ids) as ArrayRef,
            Arc::new(values_array) as ArrayRef,
        ],
    )
    .unwrap();

    // Force v2 data pages with exactly 2 rows per page → 5 pages.
    // The default reader (no offset index) puts SerializedPageReader
    // in Values state where at_record_boundary() returns false for
    // non-last pages, matching the production scenario.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_write_batch_size(2)
        .set_data_page_row_count_limit(2)
        .build();

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let data = Bytes::from(buffer);

    // 1. Read without row selection — should always succeed
    let reader = ParquetRecordBatchReaderBuilder::try_new(data.clone())
        .unwrap()
        .build()
        .unwrap();
    let batches: Vec<_> = reader.map(|r| r.unwrap()).collect();
    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total, num_rows, "full read should return all rows");

    // 2. Read with row selection: select first and last row.
    // This produces the sequence read(1) + skip(8) + read(1).
    //
    // The skip crosses v2 page boundaries. Without the fix, the
    // list column's rep-level decoder has stale has_partial state
    // after exhausting the first page's remaining levels, causing
    // the whole-page-skip shortcut to over-count by one record.
    //
    // We must use RowSelectionPolicy::Selectors because the default
    // Auto policy would choose the Mask strategy for this small
    // selection, which reads all rows then filters (never calling
    // skip_records, thereby hiding the bug).
    let selection = RowSelection::from(vec![
        RowSelector::select(1),
        RowSelector::skip(num_rows - 2),
        RowSelector::select(1),
    ]);

    let reader = ParquetRecordBatchReaderBuilder::try_new(data)
        .unwrap()
        .with_row_selection(selection)
        .with_row_selection_policy(RowSelectionPolicy::Selectors)
        .build()
        .unwrap();
    let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total, 2, "selection should return exactly 2 rows");

    // Verify data correctness: row 0 and row 9
    let result = concat_batches(&schema, &batches).unwrap();
    let id_col = result
        .column(0)
        .as_primitive::<arrow_array::types::Int32Type>();
    assert_eq!(id_col.value(0), 0);
    assert_eq!(id_col.value(1), (num_rows - 1) as i32);

    let list_col = result.column(1).as_list::<i32>();
    let first = list_col
        .value(0)
        .as_primitive::<arrow_array::types::Int32Type>()
        .values()
        .to_vec();
    assert_eq!(first, vec![0, 1]);
    let last = list_col
        .value(1)
        .as_primitive::<arrow_array::types::Int32Type>()
        .values()
        .to_vec();
    let n = (num_rows - 1) as i32;
    assert_eq!(last, vec![n * 10, n * 10 + 1]);
}
322+
323+
/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// When leaf columns inside a `List<Struct<…>>` have different page
/// boundaries (due to value-size differences), the `has_partial` state
/// bug causes one leaf to over-skip by one record while the other stays
/// correct.
#[test]
#[should_panic(expected = "Not all children array length are the same!")]
fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
    use arrow_array::Array;
    use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
    use arrow_schema::Fields;

    let num_rows = 14usize;
    // Long string forces the string column to flush pages much sooner
    // than the int32 column, creating different page boundaries.
    let long_prefix = "x".repeat(500);

    // Schema: { vals: List<Struct<x: Int32, y: Utf8>> }
    let struct_fields = Fields::from(vec![
        Field::new("x", ArrowDataType::Int32, false),
        Field::new("y", ArrowDataType::Utf8, false),
    ]);

    // Build data: even rows have 2 list elements, odd rows have 3.
    // This ensures different physical value counts per record, so
    // reading from the wrong position produces a different total.
    let mut list_builder = ListBuilder::new(StructBuilder::from_fields(struct_fields, 0));
    for i in 0..num_rows {
        let num_elems = if i % 2 == 0 { 2 } else { 3 };
        let sb = list_builder.values();
        for j in 0..num_elems {
            sb.field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(i as i32 * 10 + j);
            sb.field_builder::<StringBuilder>(1)
                .unwrap()
                .append_value(format!("{long_prefix}_{i}_{j}"));
            sb.append(true);
        }
        list_builder.append(true);
    }
    let vals_array = list_builder.finish();

    // Derive schema from the actual array type to avoid field-name mismatches.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "vals",
        vals_array.data_type().clone(),
        false,
    )]));

    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(vals_array) as ArrayRef]).unwrap();

    // V2 pages + small max_data_page_size.
    // Column x (Int32): all 14 rows fit in one page (~140 bytes values).
    // Column y (Utf8, 500-byte strings): pages flush after every ~2 rows.
    // This creates the page boundary asymmetry needed to trigger the bug.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_write_batch_size(2)
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(512)
        .build();

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    let data = Bytes::from(buffer);

    // 1. Read without row selection — should always succeed.
    let reader = ParquetRecordBatchReaderBuilder::try_new(data.clone())
        .unwrap()
        .build()
        .unwrap();
    let batches: Vec<_> = reader.map(|r| r.unwrap()).collect();
    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total, num_rows, "full read should return all rows");

    // 2. Read with selection: select(1) + skip(10) + select(1).
    // Without the fix, the string column (y) over-skips by 1,
    // reading a record with a different element count than the
    // int column (x). The inner StructArrayReader sees arrays
    // of different lengths.
    let selection = RowSelection::from(vec![
        RowSelector::select(1),
        RowSelector::skip(10),
        RowSelector::select(1),
    ]);

    let reader = ParquetRecordBatchReaderBuilder::try_new(data)
        .unwrap()
        .with_row_selection(selection)
        .with_row_selection_policy(RowSelectionPolicy::Selectors)
        .build()
        .unwrap();
    let batches: Vec<RecordBatch> = reader.map(|r| r.unwrap()).collect();
    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total, 2, "selection should return exactly 2 rows");
}

0 commit comments

Comments
 (0)