diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 4594717f..d82bec75 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -316,18 +316,20 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange for (int32_t col_idx : column_indices) { auto col_chunk = rg_metadata->ColumnChunk(col_idx); int64_t data_page_offset = col_chunk->data_page_offset(); - int64_t total_compressed_size = col_chunk->total_compressed_size(); - int64_t chunk_end = data_page_offset + total_compressed_size; - + int64_t data_page_compressed_size = col_chunk->total_compressed_size(); // Dictionary page: always include if present if (col_chunk->has_dictionary_page()) { int64_t dict_offset = col_chunk->dictionary_page_offset(); int64_t dict_size = data_page_offset - dict_offset; if (dict_size > 0) { + // total_compressed_size includes the dictionary page; subtract its size here + data_page_compressed_size -= dict_size; ranges.push_back({dict_offset, dict_size}); } } + int64_t chunk_end = data_page_offset + data_page_compressed_size; + // Try to get OffsetIndex for page-level ranges std::shared_ptr<::parquet::OffsetIndex> offset_index; if (rg_page_index_reader) { @@ -336,7 +338,7 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange if (!offset_index) { // No OffsetIndex: fall back to entire column chunk - ranges.push_back({data_page_offset, total_compressed_size}); + ranges.push_back({data_page_offset, data_page_compressed_size}); continue; } diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index bd693730..477b2bf3 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ 
-74,7 +74,8 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test { /// @param max_row_group_length Controls row group size void WriteTestFile(const std::string& file_name, const std::shared_ptr& struct_array, - int32_t write_batch_size, int64_t max_row_group_length) { + int32_t write_batch_size, int64_t max_row_group_length, + bool enable_dictionary = false) { auto data_type = struct_array->struct_type(); auto data_schema = arrow::schema(data_type->fields()); auto data_arrow_array = std::make_unique(); @@ -84,7 +85,11 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test { ::parquet::WriterProperties::Builder builder; builder.write_batch_size(write_batch_size); builder.max_row_group_length(max_row_group_length); - builder.disable_dictionary(); // Ensure page index min/max are meaningful + if (enable_dictionary) { + builder.enable_dictionary(); + } else { + builder.disable_dictionary(); // Ensure page index min/max are meaningful + } builder.enable_write_page_index(); // Enable page index for page-level filtering // Set data page size to 1 byte to force a new page after every write_batch_size rows. // The writer flushes a page when accumulated data exceeds data_pagesize, so setting @@ -719,4 +724,145 @@ TEST_F(PageFilteredRowGroupReaderTest, EndToEndPageLevelPreBuffer) { ASSERT_EQ(10, offset); } +/// Test: ComputePageRanges with dictionary encoding produces correct chunk_end. +/// +/// When dictionary encoding is enabled, the column chunk layout is: +/// [Dictionary Page] [Data Page 0] [Data Page 1] ... [Data Page N] +/// And total_compressed_size covers the entire chunk starting from dictionary_page_offset. +/// +/// The bug: chunk_end = data_page_offset + total_compressed_size is wrong because +/// total_compressed_size already includes the dictionary page size. The correct +/// chunk_end should be dictionary_page_offset + total_compressed_size. +/// +/// This test verifies that: +/// 1. 
No range exceeds the true chunk boundary (overshoot regression). +/// 2. At least one non-dictionary data-page range is present (not truncated). +/// 3. The maximum range_end equals true_chunk_end when requesting all rows. +/// 4. No range exceeds the file size, and end-to-end filtered reads return correct results. +TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding) { + std::string file_name = dir_->Str() + "/compute_ranges_dict.parquet"; + + // Use low-cardinality data to ensure dictionary encoding is actually used. + // 100 rows with values cycling through 0..9 → dictionary will have 10 entries. + arrow::Int32Builder val_builder; + ASSERT_TRUE(val_builder.Reserve(100).ok()); + for (int32_t i = 0; i < 100; ++i) { + val_builder.UnsafeAppend(i % 10); + } + auto val_array = val_builder.Finish().ValueOrDie(); + auto field = arrow::field("val", arrow::int32()); + auto struct_array = arrow::StructArray::Make({val_array}, {field}).ValueOrDie(); + + // Write with dictionary encoding enabled and 1 row per page. + // Each page has min==max==val for that row, enabling precise page-level skipping. 
+ WriteTestFile(file_name, struct_array, /*write_batch_size=*/1, + /*max_row_group_length=*/100, /*enable_dictionary=*/true); + + // Open the file and verify metadata confirms dictionary page presence + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_name)); + ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length()); + auto in_stream = std::make_shared(in, arrow_pool_, length); + auto parquet_reader = ::parquet::ParquetFileReader::Open(in_stream); + ASSERT_TRUE(parquet_reader); + + auto file_metadata = parquet_reader->metadata(); + auto rg_metadata = file_metadata->RowGroup(0); + auto col_chunk = rg_metadata->ColumnChunk(0); + + // Precondition: dictionary page must exist for this test to be meaningful + ASSERT_TRUE(col_chunk->has_dictionary_page()); + + int64_t dict_offset = col_chunk->dictionary_page_offset(); + int64_t data_page_offset = col_chunk->data_page_offset(); + int64_t total_compressed_size = col_chunk->total_compressed_size(); + + // The true chunk end is dict_offset + total_compressed_size + int64_t true_chunk_end = dict_offset + total_compressed_size; + // The buggy chunk end would be data_page_offset + total_compressed_size + int64_t buggy_chunk_end = data_page_offset + total_compressed_size; + + // Sanity: dict page is before data pages, so buggy end > true end + ASSERT_LT(dict_offset, data_page_offset); + ASSERT_GT(buggy_chunk_end, true_chunk_end); + // Now call ComputePageRanges with all rows matching + RowRanges row_ranges; + row_ranges.Add(RowRanges::Range(0, 99)); + + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), /*row_group_index=*/0, row_ranges, /*column_indices=*/{0}); + + ASSERT_FALSE(ranges.empty()); + + // --- Check 1: No range should extend beyond the true chunk end --- + // With the bug, the last data page's range would use chunk_end = data_page_offset + + // total_compressed_size, which overshoots by the dictionary page size. 
+ for (auto & range : ranges) { + int64_t range_end = range.offset + range.length; + ASSERT_LE(range_end, true_chunk_end); + } + + // --- Check 2: At least one non-dictionary data-page range is present --- + // Guards against truncation: if only the dictionary range is returned, the test + // would still pass the overshoot check but miss that data pages are lost. + int data_page_range_count = 0; + for (const auto& range : ranges) { + if (range.offset >= data_page_offset) { + ++data_page_range_count; + } + } + ASSERT_GE(data_page_range_count, 1); + + // --- Check 3: Maximum range_end equals true_chunk_end when requesting all rows --- + int64_t max_range_end = 0; + for (const auto& range : ranges) { + int64_t range_end = range.offset + range.length; + max_range_end = std::max(max_range_end, range_end); + } + ASSERT_EQ(max_range_end, true_chunk_end); + + // --- Check 4: No range exceeds file size --- + for (const auto& range : ranges) { + ASSERT_LE(range.offset + range.length, static_cast(length)); + } + + // --- End-to-end check 1: read all rows (predicate val >= 0 matches every row) --- + // Verifies that reading a dictionary-encoded file with page index enabled + // returns all 100 rows with correct values. + auto read_schema = arrow::schema({field}); + auto predicate_all = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(0)); + std::shared_ptr result_all; + ReadWithPredicateImpl(file_name, read_schema, predicate_all, &result_all); + ASSERT_TRUE(result_all); + ASSERT_EQ(100, result_all->length()); + + // --- End-to-end check 2: the full-range result equals the data that was written --- + // Expected: struct_array as written (val = i % 10 for i in [0, 100)). 
+ // Concatenate all chunks and compare with Equals + auto actual_struct_arr = arrow::Concatenate(result_all->chunks()).ValueOrDie(); + ASSERT_TRUE(actual_struct_arr->Equals(struct_array)); + + // --- End-to-end check 3: partial-row query with page-level skipping --- + // Predicate val >= 7 skips pages where val < 7, keeping only val in {7,8,9}. + // Out of 100 rows, 30 rows satisfy val >= 7 (3 per cycle × 10 cycles). + auto predicate_partial = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(7)); + std::shared_ptr result_partial; + ReadWithPredicateImpl(file_name, read_schema, predicate_partial, &result_partial); + ASSERT_TRUE(result_partial); + + // Build expected StructArray and compare with Equals + arrow::Int32Builder expected_builder; + ASSERT_TRUE(expected_builder.Reserve(30).ok()); + for (int32_t i = 0; i < 100; ++i) { + if (i % 10 >= 7) { + expected_builder.UnsafeAppend(i % 10); + } + } + auto expected_array = expected_builder.Finish().ValueOrDie(); + auto expected_struct = arrow::StructArray::Make({expected_array}, {field}).ValueOrDie(); + auto partial_concat = arrow::Concatenate(result_partial->chunks()).ValueOrDie(); + ASSERT_TRUE(partial_concat->Equals(expected_struct)); +} + } // namespace paimon::parquet::test