Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/paimon/format/parquet/page_filtered_row_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,20 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange
for (int32_t col_idx : column_indices) {
auto col_chunk = rg_metadata->ColumnChunk(col_idx);
int64_t data_page_offset = col_chunk->data_page_offset();
int64_t total_compressed_size = col_chunk->total_compressed_size();
int64_t chunk_end = data_page_offset + total_compressed_size;

int64_t data_page_compressed_size = col_chunk->total_compressed_size();
// Dictionary page: always include if present
if (col_chunk->has_dictionary_page()) {
int64_t dict_offset = col_chunk->dictionary_page_offset();
int64_t dict_size = data_page_offset - dict_offset;
if (dict_size > 0) {
// if dictionary exists, the data page size should be reduced by the dictionary
data_page_compressed_size -= dict_size;
ranges.push_back({dict_offset, dict_size});
}
Comment thread
zhf999 marked this conversation as resolved.
}

int64_t chunk_end = data_page_offset + data_page_compressed_size;
Comment thread
zhf999 marked this conversation as resolved.

// Try to get OffsetIndex for page-level ranges
std::shared_ptr<::parquet::OffsetIndex> offset_index;
if (rg_page_index_reader) {
Expand All @@ -336,7 +338,7 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange

if (!offset_index) {
// No OffsetIndex: fall back to entire column chunk
ranges.push_back({data_page_offset, total_compressed_size});
ranges.push_back({data_page_offset, data_page_compressed_size});
continue;
}

Expand Down
150 changes: 148 additions & 2 deletions src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test {
/// @param max_row_group_length Controls row group size
void WriteTestFile(const std::string& file_name,
const std::shared_ptr<arrow::StructArray>& struct_array,
int32_t write_batch_size, int64_t max_row_group_length) {
int32_t write_batch_size, int64_t max_row_group_length,
bool enable_dictionary = false) {
auto data_type = struct_array->struct_type();
auto data_schema = arrow::schema(data_type->fields());
auto data_arrow_array = std::make_unique<ArrowArray>();
Expand All @@ -84,7 +85,11 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test {
::parquet::WriterProperties::Builder builder;
builder.write_batch_size(write_batch_size);
builder.max_row_group_length(max_row_group_length);
builder.disable_dictionary(); // Ensure page index min/max are meaningful
if (enable_dictionary) {
builder.enable_dictionary();
} else {
builder.disable_dictionary(); // Ensure page index min/max are meaningful
}
builder.enable_write_page_index(); // Enable page index for page-level filtering
// Set data page size to 1 byte to force a new page after every write_batch_size rows.
// The writer flushes a page when accumulated data exceeds data_pagesize, so setting
Expand Down Expand Up @@ -719,4 +724,145 @@ TEST_F(PageFilteredRowGroupReaderTest, EndToEndPageLevelPreBuffer) {
ASSERT_EQ(10, offset);
}

/// Test: ComputePageRanges with dictionary encoding produces correct chunk_end.
///
/// When dictionary encoding is enabled, the column chunk layout is:
/// [Dictionary Page] [Data Page 0] [Data Page 1] ... [Data Page N]
/// And total_compressed_size covers the entire chunk starting from dictionary_page_offset.
///
/// The bug: chunk_end = data_page_offset + total_compressed_size is wrong because
/// total_compressed_size already includes the dictionary page size. The correct
/// chunk_end should be dictionary_page_offset + total_compressed_size.
///
/// This test verifies that:
/// 1. No range exceeds the true chunk boundary (overshoot regression).
/// 2. At least one non-dictionary data-page range is present (not truncated).
/// 3. The maximum range_end equals true_chunk_end when requesting all rows.
/// 4. End-to-end reads with page-level filtering return correct query results.
TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding) {
std::string file_name = dir_->Str() + "/compute_ranges_dict.parquet";

// Use low-cardinality data to ensure dictionary encoding is actually used.
// 100 rows with values cycling through 0..9 → dictionary will have 10 entries.
arrow::Int32Builder val_builder;
ASSERT_TRUE(val_builder.Reserve(100).ok());
for (int32_t i = 0; i < 100; ++i) {
val_builder.UnsafeAppend(i % 10);
}
auto val_array = val_builder.Finish().ValueOrDie();
auto field = arrow::field("val", arrow::int32());
auto struct_array = arrow::StructArray::Make({val_array}, {field}).ValueOrDie();
Comment thread
lxy-9602 marked this conversation as resolved.

// Write with dictionary encoding enabled and 1 row per page.
// Each page has min==max==val for that row, enabling precise page-level skipping.
WriteTestFile(file_name, struct_array, /*write_batch_size=*/1,
/*max_row_group_length=*/100, /*enable_dictionary=*/true);

// Open the file and verify metadata confirms dictionary page presence
ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_name));
Comment thread
zhf999 marked this conversation as resolved.
ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length());
auto in_stream = std::make_shared<ArrowInputStreamAdapter>(in, arrow_pool_, length);
auto parquet_reader = ::parquet::ParquetFileReader::Open(in_stream);
ASSERT_TRUE(parquet_reader);

auto file_metadata = parquet_reader->metadata();
auto rg_metadata = file_metadata->RowGroup(0);
auto col_chunk = rg_metadata->ColumnChunk(0);

// Precondition: dictionary page must exist for this test to be meaningful
ASSERT_TRUE(col_chunk->has_dictionary_page());

int64_t dict_offset = col_chunk->dictionary_page_offset();
int64_t data_page_offset = col_chunk->data_page_offset();
int64_t total_compressed_size = col_chunk->total_compressed_size();

// The true chunk end is dict_offset + total_compressed_size
int64_t true_chunk_end = dict_offset + total_compressed_size;
// The buggy chunk end would be data_page_offset + total_compressed_size
int64_t buggy_chunk_end = data_page_offset + total_compressed_size;

// Sanity: dict page is before data pages, so buggy end > true end
ASSERT_LT(dict_offset, data_page_offset);
ASSERT_GT(buggy_chunk_end, true_chunk_end);
// Now call ComputePageRanges with all rows matching
RowRanges row_ranges;
row_ranges.Add(RowRanges::Range(0, 99));

auto ranges = PageFilteredRowGroupReader::ComputePageRanges(
parquet_reader.get(), /*row_group_index=*/0, row_ranges, /*column_indices=*/{0});

ASSERT_FALSE(ranges.empty());

// --- Check 1: No range should extend beyond the true chunk end ---
// With the bug, the last data page's range would use chunk_end = data_page_offset +
// total_compressed_size, which overshoots by the dictionary page size.
for (auto & range : ranges) {
int64_t range_end = range.offset + range.length;
ASSERT_LE(range_end, true_chunk_end);
}

// --- Check 2: At least one non-dictionary data-page range is present ---
// Guards against truncation: if only the dictionary range is returned, the test
// would still pass the overshoot check but miss that data pages are lost.
int data_page_range_count = 0;
for (const auto& range : ranges) {
if (range.offset >= data_page_offset) {
++data_page_range_count;
}
}
ASSERT_GE(data_page_range_count, 1);

// --- Check 3: Maximum range_end equals true_chunk_end when requesting all rows ---
int64_t max_range_end = 0;
for (const auto& range : ranges) {
int64_t range_end = range.offset + range.length;
max_range_end = std::max(max_range_end, range_end);
}
Comment thread
zhf999 marked this conversation as resolved.
ASSERT_EQ(max_range_end, true_chunk_end);

// --- Check 4: No range exceeds file size ---
for (const auto& range : ranges) {
ASSERT_LE(range.offset + range.length, static_cast<int64_t>(length));
}

// --- End-to-end check 1: read all rows (no predicate filtering) ---
// Verifies that reading a dictionary-encoded file with page index enabled
// returns all 100 rows with correct values.
auto read_schema = arrow::schema({field});
auto predicate_all = PredicateBuilder::GreaterOrEqual(
/*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(0));
std::shared_ptr<arrow::ChunkedArray> result_all;
ReadWithPredicateImpl(file_name, read_schema, predicate_all, &result_all);
ASSERT_TRUE(result_all);
ASSERT_EQ(100, result_all->length());

// --- End-to-end check 2: full range query with page level skipping ---
// Build expected array: val = i % 10 for i in [0, 100), wrapped in a struct.
// Concatenate all chunks and compare with Equals
auto actual_struct_arr = arrow::Concatenate(result_all->chunks()).ValueOrDie();
ASSERT_TRUE(actual_struct_arr->Equals(struct_array));

// --- End-to-end check 3: partial-row query with page-level skipping ---
// Predicate val >= 7 skips pages where val < 7, keeping only val in {7,8,9}.
// Out of 100 rows, 30 rows satisfy val >= 7 (3 per cycle × 10 cycles).
auto predicate_partial = PredicateBuilder::GreaterOrEqual(
/*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(7));
std::shared_ptr<arrow::ChunkedArray> result_partial;
ReadWithPredicateImpl(file_name, read_schema, predicate_partial, &result_partial);
ASSERT_TRUE(result_partial);

// Build expected StructArray and compare with Equals
arrow::Int32Builder expected_builder;
ASSERT_TRUE(expected_builder.Reserve(30).ok());
for (int32_t i = 0; i < 100; ++i) {
if (i % 10 >= 7) {
expected_builder.UnsafeAppend(i % 10);
}
}
auto expected_array = expected_builder.Finish().ValueOrDie();
auto expected_struct = arrow::StructArray::Make({expected_array}, {field}).ValueOrDie();
auto partial_concat = arrow::Concatenate(result_partial->chunks()).ValueOrDie();
ASSERT_TRUE(partial_concat->Equals(expected_struct));
}

} // namespace paimon::parquet::test