Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl InMemoryRowGroup {
let mut vs = std::mem::take(&mut self.column_chunks);
for (leaf_idx, meta) in self.row_group_metadata().columns().iter().enumerate() {
if self.mask.leaf_included(leaf_idx) {
let (start, len) = meta.byte_range();
let (start, len) = meta.byte_range()?;
let data = reader.get_bytes(start..(start + len)).await?;

vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,8 @@ mod tests {
assert_eq!(async_batches, sync_batches);

let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range().unwrap();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range().unwrap();

assert_eq!(
&requests[..],
Expand Down
5 changes: 3 additions & 2 deletions parquet/src/arrow/in_memory_row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ impl InMemoryRowGroup<'_> {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges: Vec<Range<u64>> = vec![];
let (start, _len) = chunk_meta.byte_range();
let (start_i64, _len) = chunk_meta.byte_range();
let start = start_i64 as u64;
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start..first.offset as u64);
Expand Down Expand Up @@ -120,7 +121,7 @@ impl InMemoryRowGroup<'_> {
.map(|(idx, _chunk)| {
let column = metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
(start as u64)..(start as u64 + length as u64)
})
.collect();
FetchRanges {
Expand Down
32 changes: 20 additions & 12 deletions parquet/src/column/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,25 +353,33 @@ impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
) -> std::result::Result<Self, Self::Error> {
match value.r#type {
PageType::DATA_PAGE => {
let header = value.data_page_header.as_ref().unwrap();
Ok(PageMetadata {
num_rows: None,
num_levels: Some(header.num_values as _),
is_dict: false,
})
let header = value.data_page_header.as_ref();
match header {
None => Err(ParquetError::General("data page header is not set".to_string())),
Some(header) => Ok(PageMetadata {
num_rows: None,
num_levels: Some(header.num_values as _),
is_dict: false,
}),
}
}
PageType::DICTIONARY_PAGE => Ok(PageMetadata {
num_rows: None,
num_levels: None,
is_dict: true,
}),
PageType::DATA_PAGE_V2 => {
let header = value.data_page_header_v2.as_ref().unwrap();
Ok(PageMetadata {
num_rows: Some(header.num_rows as _),
num_levels: Some(header.num_values as _),
is_dict: false,
})
let header = value.data_page_header_v2.as_ref();
match header {
None => {
Err(ParquetError::General("data page header v2 is not set".to_string()))
}
Some(header) => Ok(PageMetadata {
num_rows: Some(header.num_rows as _),
num_levels: Some(header.num_values as _),
is_dict: false,
}),
}
}
other => Err(ParquetError::General(format!(
"page type {other:?} cannot be converted to PageMetadata"
Expand Down
7 changes: 4 additions & 3 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,10 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
}

let decoder = if encoding == Encoding::RLE_DICTIONARY {
self.decoders[encoding as usize]
.as_mut()
.expect("Decoder for dict should have been set")
match self.decoders[encoding as usize].as_mut() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you find a code path that can trigger this error?

Some(decoder) => decoder,
None => return Err(general_err!("Internal: Decoder for dict should have been set")),
}
} else {
let slot = encoding as usize;
if self.decoders[slot].is_none() {
Expand Down
24 changes: 21 additions & 3 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ use bytes::Bytes;
use half::f16;
use std::cmp::Ordering;
use std::fmt;
use std::io::Error;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::str::from_utf8;

use crate::basic::Type;
use crate::column::reader::{ColumnReader, ColumnReaderImpl};
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::encodings::decoding::PlainDecoder;
use crate::encodings::decoding::Decoder;
use crate::errors::{ParquetError, Result};
use crate::util::bit_util::FromBytes;

Expand Down Expand Up @@ -1126,7 +1129,9 @@ pub(crate) mod private {

#[inline]
fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
assert!(decoder.type_length > 0);
if decoder.type_length <= 0 {
return Err(general_err!("Internal: Type length must be positive ({}).", decoder.type_length));
}

let data = decoder
.data
Expand All @@ -1150,8 +1155,9 @@ pub(crate) mod private {
}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
assert!(decoder.type_length > 0);

if decoder.type_length <= 0 {
return Err(general_err!("Internal: Type length must be positive ({}).", decoder.type_length));
}
let data = decoder
.data
.as_mut()
Expand Down Expand Up @@ -1413,4 +1419,16 @@ mod tests {
assert_eq!(ba1, ba11);
assert!(ba5 > ba1);
}

/// A plain decoder for fixed-length byte arrays created with a type length of
/// zero must surface an error from both `get` and `skip`.
#[test]
fn test_fixed_len_byte_array_zero_type_length_is_error() {
    let value = FixedLenByteArray::from(vec![1, 2, 3]);
    assert_eq!(value.as_bytes(), &[1, 2, 3]);

    let mut decoder = PlainDecoder::<FixedLenByteArrayType>::new(0);
    let expected = "Internal: Type length must be positive (0)";

    // Decoding into a one-element output buffer must fail with the length error.
    let mut out = vec![value];
    let get_result = decoder.get(&mut out);
    assert!(get_result.is_err_and(|e| e.to_string().contains(expected)));

    // Skipping must fail the same way.
    let skip_result = decoder.skip(10);
    assert!(skip_result.is_err_and(|e| e.to_string().contains(expected)));
}
}
7 changes: 7 additions & 0 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,6 +1480,13 @@ mod tests {
assert_eq!(buffer, expected);
}

/// Handing an empty buffer to a dictionary decoder must produce an error.
#[test]
fn test_dict_decode_doesnt_panic_on_empty() {
    let mut dict_decoder = DictDecoder::<Int32Type>::new();
    let empty = Bytes::from(vec![]);
    assert!(dict_decoder.set_data(empty, 0).is_err());
}

#[test]
#[should_panic(expected = "RleValueEncoder only supports BoolType")]
fn test_rle_value_encode_int32_not_supported() {
Expand Down
38 changes: 36 additions & 2 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,11 @@ impl RleDecoder {
if self.rle_left > 0 {
let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
let dict_idx = self.current_value.unwrap() as usize;
let dict_value = dict[dict_idx].clone();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I managed to create two sample files that reproduce this error in #9434; feel free to add them as unit tests in this PR.


let dict_value = dict
Comment thread
rambleraptor marked this conversation as resolved.
.get(dict_idx)
.ok_or_else(|| ParquetError::General(format!("Index out of bounds: the len is {} but the index is {}", dict.len(), dict_idx)))?
.clone();

buffer[values_read..values_read + num_values].fill(dict_value);

Expand Down Expand Up @@ -494,10 +498,24 @@ impl RleDecoder {
self.bit_packed_left = 0;
break;
}

buffer[values_read..values_read + num_values]
.iter_mut()
.zip(index_buf[..num_values].iter())
.for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
.try_for_each(|(b, i)| -> Result<()> {
let dict_idx = *i as usize;
let dict_val = dict
.get(dict_idx)
.ok_or_else(|| {
ParquetError::General(
format!(
"Index out of bounds: the len is {} but the index is {}",
dict.len(), dict_idx))
})?;
b.clone_from(dict_val);
Ok(())
})?;

self.bit_packed_left -= num_values as u32;
values_read += num_values;
if num_values < to_read {
Expand Down Expand Up @@ -718,6 +736,22 @@ mod tests {
assert_eq!(buffer, expected);
}

/// Decoding through a dictionary must return an error, not panic, when an
/// encoded index is past the end of the dictionary.
#[test]
fn test_rle_decode_with_dict_out_of_bounds() {
    // RLE-encoded indices: two 0s (in range) followed by three 3s, which is one
    // past the last valid index of the 3-entry dictionary.
    let dict = vec![10, 20, 30];
    let encoded = vec![0x04, 0x00, 0x06, 0x03];
    let mut decoder: RleDecoder = RleDecoder::new(3);
    decoder.set_data(encoded.into());

    let mut out = vec![0; 3];
    let err = decoder
        .get_batch_with_dict::<i32>(&dict, &mut out, 3)
        .unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Index out of bounds: the len is 3 but the index is 3"
    );
}

#[test]
fn test_rle_skip_dict() {
// Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
Expand Down
8 changes: 2 additions & 6 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,17 +1040,13 @@ impl ColumnChunkMetaData {
}

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
pub fn byte_range(&self) -> (i64, i64) {
let col_start = match self.dictionary_page_offset() {
Some(dictionary_page_offset) => dictionary_page_offset,
None => self.data_page_offset(),
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
(col_start, col_len)
}

/// Returns statistics that are set for this column chunk,
Expand Down
4 changes: 3 additions & 1 deletion parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,9 @@ impl<R: ChunkReader> SerializedPageReader<R> {
props: ReaderPropertiesPtr,
) -> Result<Self> {
let decompressor = create_codec(meta.compression(), props.codec_options())?;
let (start, len) = meta.byte_range();
let (start_i64, len_i64) = meta.byte_range();
let start = start_i64 as u64;
let len = len_i64 as u64;

let state = match page_locations {
Some(locations) => {
Expand Down
40 changes: 12 additions & 28 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,18 @@ pub(crate) fn from_thrift_page_stats(
) -> Result<Option<Statistics>> {
Ok(match thrift_stats {
Some(stats) => {
// Number of nulls recorded, when it is not available, we just mark it as 0.
// TODO this should be `None` if there is no information about NULLS.
// see https://github.com/apache/arrow-rs/pull/6216/files
let null_count = stats.null_count.unwrap_or(0);

if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {null_count}",
)));
}

// Generic null count.
let null_count = Some(null_count as u64);
let null_count = match stats.null_count {
Some(null_count) => {
if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {null_count}",
)));
}
Some(null_count as u64)
}
None => None,
};
// Generic distinct count (count of distinct values occurring)
let distinct_count = stats.distinct_count.map(|value| value as u64);
// Whether or not statistics use deprecated min/max fields.
Expand Down Expand Up @@ -177,7 +176,6 @@ pub(crate) fn from_thrift_page_stats(
Type::BOOLEAN => check_len(&min, &max, 1),
Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8),
Type::INT96 => check_len(&min, &max, 12),
_ => Ok(()),
}?;

Expand Down Expand Up @@ -1064,21 +1062,7 @@ mod tests {
let round_tripped = from_thrift_page_stats(Type::BOOLEAN, Some(thrift_stats))
.unwrap()
.unwrap();
// TODO: remove branch when we no longer support assuming null_count==None in the thrift
// means null_count = Some(0)
if null_count.is_none() {
assert_ne!(round_tripped, statistics);
assert!(round_tripped.null_count_opt().is_some());
assert_eq!(round_tripped.null_count_opt(), Some(0));
assert_eq!(round_tripped.min_bytes_opt(), statistics.min_bytes_opt());
assert_eq!(round_tripped.max_bytes_opt(), statistics.max_bytes_opt());
assert_eq!(
round_tripped.distinct_count_opt(),
statistics.distinct_count_opt()
);
} else {
assert_eq!(round_tripped, statistics);
}
assert_eq!(round_tripped, statistics);
}

fn make_bool_stats(distinct_count: Option<u64>, null_count: Option<u64>) -> Statistics {
Expand Down
37 changes: 33 additions & 4 deletions parquet/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ impl BitWriter {
/// Maximum byte length for a VLQ encoded integer
/// MAX_VLQ_BYTE_LEN = 5 for i32, and MAX_VLQ_BYTE_LEN = 10 for i64
pub const MAX_VLQ_BYTE_LEN: usize = 10;
/// Bit offset of the 7-bit group carried by the final byte of a maximum-length
/// VLQ integer. Each earlier byte contributes 7 bits, so this is
/// `(MAX_VLQ_BYTE_LEN - 1) * 7`, i.e. 63 for the current byte limit of 10.
pub const MAX_VLQ_BITSHIFT: usize = (MAX_VLQ_BYTE_LEN - 1)*7;

pub struct BitReader {
/// The byte buffer to read from, passed in by client
Expand Down Expand Up @@ -657,16 +658,18 @@ impl BitReader {
/// Reads a VLQ encoded (in little endian order) int from the stream.
/// The encoded int must start at the beginning of a byte.
///
/// Returns `None` if there's not enough bytes in the stream. `Some` otherwise.
/// Returns `None` if there's not enough bytes in the stream.
pub fn get_vlq_int(&mut self) -> Option<i64> {
let mut shift = 0;
let mut v: i64 = 0;
while let Some(byte) = self.get_aligned::<u8>(1) {
assert!(shift != MAX_VLQ_BITSHIFT || ((byte & 0xFE) == 0),
"parquet_data_error: VLQ encoded integer has more than 64 bits");
v |= ((byte & 0x7F) as i64) << shift;
shift += 7;
assert!(
shift <= MAX_VLQ_BYTE_LEN * 7,
"Num of bytes exceed MAX_VLQ_BYTE_LEN ({MAX_VLQ_BYTE_LEN})"
shift <= MAX_VLQ_BITSHIFT * 7,
"parquet_data_error: VLQ encoded integer num of bytes exceed MAX_VLQ_BYTE_LEN ({MAX_VLQ_BYTE_LEN})"
);
if byte & 0x80 == 0 {
return Some(v);
Expand Down Expand Up @@ -811,10 +814,36 @@ mod tests {
/// Reads three consecutive VLQ integers from one buffer, including a
/// ten-byte encoding that sets all 64 bits.
#[test]
fn test_bit_reader_get_vlq_int() {
    // Byte layout, one VLQ integer per group:
    //   10001001 00000001                  -> 137
    //   11110010 10110101 00000110         -> 105202
    //   nine 0xFF bytes followed by 0x01   -> all 64 bits set
    let buffer: Vec<u8> = vec![
        0x89, 0x01, 0xF2, 0xB5, 0x06, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
        0x01,
    ];
    let mut bit_reader = BitReader::from(buffer);
    assert_eq!(bit_reader.get_vlq_int(), Some(137));
    assert_eq!(bit_reader.get_vlq_int(), Some(105202));
    assert_eq!(bit_reader.get_vlq_int(), Some(0xFFFFFFFFFFFFFFFFu64 as i64));
}

/// A VLQ integer whose continuation bit is still set on the tenth byte is
/// malformed and must be rejected with a `parquet_data_error:` panic.
#[test]
#[should_panic(expected = "parquet_data_error:")]
fn test_bit_reader_get_vlq_int_too_many_bytes() {
    // Ten continuation bytes followed by a terminator: eleven bytes in total.
    let buffer: Vec<u8> =
        vec![0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01];
    let mut bit_reader = BitReader::from(buffer);
    // Expected to panic; the return value is irrelevant, so discard it explicitly.
    let _ = bit_reader.get_vlq_int();
}

/// A tenth byte carrying more than one payload bit does not fit in 64 bits and
/// must be rejected with a `parquet_data_error:` panic.
#[test]
#[should_panic(expected = "parquet_data_error:")]
fn test_bit_reader_get_vlq_int_no_bitshift_overflow() {
    // The tenth byte is placed at bit offset 63, so only its lowest bit fits in
    // an i64. Any higher payload bit would be shifted out of the value, so the
    // reader must refuse the input rather than silently drop those bits.
    let buffer: Vec<u8> = vec![0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x03];
    let mut bit_reader = BitReader::from(buffer);
    // Expected to panic; the return value is irrelevant, so discard it explicitly.
    let _ = bit_reader.get_vlq_int();
}

#[test]
Expand Down
Loading