Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 145 additions & 6 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,20 @@ where

// If page has less rows than the remaining records to
// be skipped, skip entire page
let rows = metadata.num_rows.or_else(|| {
// If no repetition levels, num_levels == num_rows
self.rep_level_decoder
.is_none()
.then_some(metadata.num_levels)?
});
let rows = match (metadata.num_rows, metadata.num_levels) {
// if num_rows is set, but num_levels is None, then we've skipped a page
// based on the offset index. we must skip it here.
(Some(rows), None) => Some(rows),
// we have no row info (unlikely)
(None, None) => None,
// either both are set, or only num_levels. in either case we'll only skip
// if it's safe to assume we're on a record boundary, which will only be true
// if there are no repetition levels
_ => match self.rep_level_decoder {
None => metadata.num_levels,
_ => None,
}
};

if let Some(rows) = rows {
if rows <= remaining_records {
Expand Down Expand Up @@ -1361,4 +1369,135 @@ mod tests {
);
}
}

/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// Reproduces the production scenario: all DataPage v2 pages for a
/// list column (rep_level=1) read without an offset index (i.e.
/// `at_record_boundary` returns false for non-last pages).
///
/// When a prior operation (here `skip_records(1)`) loads a v2 page,
/// and a subsequent `skip_records` exhausts the remaining levels on
/// that page, the rep level decoder is left with `has_partial=true`.
/// Because `has_record_delimiter` is false, the partial is not
/// flushed during level-based processing. When the next v2 page is
/// then peeked with `num_rows` available, the whole-page-skip
/// shortcut must flush the pending partial first. Otherwise:
///
/// 1. The skip over-counts (skips N+1 records instead of N), and
/// 2. The stale `has_partial` causes a subsequent `read_records` to
/// produce a "phantom" record with 0 values.
#[test]
fn test_skip_records_v2_page_skip_accounts_for_partial() {
use crate::encodings::levels::LevelEncoder;

let max_rep_level: i16 = 1;
let max_def_level: i16 = 1;

// Column descriptor for a list element column (rep=1, def=1)
let primitive_type = SchemaType::primitive_type_builder("element", PhysicalType::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap();
let desc = Arc::new(ColumnDescriptor::new(
Arc::new(primitive_type),
max_def_level,
max_rep_level,
ColumnPath::new(vec!["list".to_string(), "element".to_string()]),
));

// Helper: build a DataPage v2 for this list column.
let make_v2_page =
|rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len());
rep_enc.put(rep_levels);
let rep_bytes = rep_enc.consume();

let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len());
def_enc.put(def_levels);
let def_bytes = def_enc.consume();

let val_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

let mut buf = Vec::new();
buf.extend_from_slice(&rep_bytes);
buf.extend_from_slice(&def_bytes);
buf.extend_from_slice(&val_bytes);

Page::DataPageV2 {
buf: Bytes::from(buf),
num_values: rep_levels.len() as u32,
encoding: Encoding::PLAIN,
num_nulls: 0,
num_rows,
def_levels_byte_len: def_bytes.len() as u32,
rep_levels_byte_len: rep_bytes.len() as u32,
is_compressed: false,
statistics: None,
}
};

// All pages are DataPage v2 (matching the production scenario where
// parquet-rs writes only v2 data pages and no offset index is loaded,
// so at_record_boundary() returns false for non-last pages).

// Page 1 (v2): 2 records × 2 elements = [10,20], [30,40]
let page1 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2);

// Page 2 (v2): 2 records × 2 elements = [50,60], [70,80]
let page2 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2);

// Page 3 (v2): 1 record × 2 elements = [90,100]
let page3 = make_v2_page(&[0, 1], &[1, 1], &[90, 100], 1);

// 5 records total: [10,20], [30,40], [50,60], [70,80], [90,100]
let pages = VecDeque::from(vec![page1, page2, page3]);
let page_reader = InMemoryPageReader::new(pages);
let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);

// Step 1 — skip 1 record:
// Peek page 1: num_rows=2, remaining=1 → rows(2) > remaining(1),
// so the page is LOADED (not whole-page-skipped).
// Level-based skip consumes rep levels [0,1] for record [10,20],
// stopping at the 0 that starts record [30,40].
let skipped = typed_reader.skip_records(1).unwrap();
assert_eq!(skipped, 1);

// Step 2 — skip 2 more records ([30,40] and [50,60]):
// Mid-page in page 1 with 2 remaining levels [0,1] for [30,40].
// skip_rep_levels(2, 2): the leading 0 does NOT act as a record
// delimiter (has_partial=false, idx==0), so count_records returns
// (true, 0, 2) — all levels consumed, has_partial=true, 0 records.
//
// has_record_delimiter is false → no flush at page boundary.
// Page 1 exhausted → peek page 2 (v2, num_rows=2).
//
// With fix: flush_partial → remaining 2→1, page 2 NOT skipped
// (rows=2 > remaining=1). Load page 2, skip 1 record [50,60].
//
// Without fix: rows(2) <= remaining(2) → page 2 whole-page-skipped,
// over-counting by 1. has_partial stays true (stale from page 1).
let skipped = typed_reader.skip_records(2).unwrap();
assert_eq!(skipped, 2);

// Step 3 — read 1 record:
let mut values = Vec::new();
let mut def_levels = Vec::new();
let mut rep_levels = Vec::new();

let (records, values_read, levels_read) = typed_reader
.read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values)
.unwrap();

// Without the fix: (1, 0, 0) — phantom record from stale has_partial;
// the rep=0 on page 3 "completes" the phantom, yielding 0 values.
// With the fix: (1, 2, 2) — correctly reads record [70, 80].
assert_eq!(records, 1, "should read exactly 1 record");
assert_eq!(levels_read, 2, "should read 2 levels for the record");
assert_eq!(values_read, 2, "should read 2 non-null values");
assert_eq!(values, vec![70, 80], "should contain 4th record's values");
assert_eq!(rep_levels, vec![0, 1], "rep levels for a 2-element list");
assert_eq!(def_levels, vec![1, 1], "def levels (all non-null)");
}
}
2 changes: 0 additions & 2 deletions parquet/tests/arrow_reader/row_filter/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() {
/// Without the fix, the list column over-skips by one record, causing
/// struct children to disagree on record counts.
#[test]
#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
fn test_row_selection_list_column_v2_page_boundary_skip() {
use arrow_array::builder::{Int32Builder, ListBuilder};

Expand Down Expand Up @@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() {
/// bug causes one leaf to over-skip by one record while the other stays
/// correct.
#[test]
#[should_panic(expected = "Not all children array length are the same!")]
fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
use arrow_array::Array;
use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
Expand Down
Loading