diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 542f631a..e7d6bf60 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -105,7 +105,7 @@ Result> FileReaderWrapper::Create( std::move(file_reader), all_row_group_ranges, num_rows, batch_size, pool)); std::vector all_target_row_groups; for (int32_t i = 0; i < file_reader_wrapper->GetNumberOfRowGroups(); i++) { - all_target_row_groups.emplace_back(/*rg_index=*/i, /*page_filtered=*/false, + all_target_row_groups.emplace_back(/*rg_index=*/i, /*is_partially_matched=*/false, /*ranges=*/RowRanges()); } PAIMON_RETURN_NOT_OK( @@ -144,7 +144,7 @@ FileReaderWrapper::FileReaderWrapper( int64_t batch_size, std::shared_ptr<::arrow::MemoryPool> pool) : file_reader_(std::move(file_reader)), all_row_group_ranges_(all_row_group_ranges), - pool_(pool), + pool_(std::move(pool)), batch_size_(batch_size), num_rows_(num_rows) {} @@ -184,7 +184,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { if (target_row_groups_[i].excluded_by_read_range) { continue; } - uint32_t rg_id = target_row_groups_[i].row_group_index; + int32_t rg_id = target_row_groups_[i].row_group_index; uint64_t rg_start = all_row_group_ranges_[rg_id].first; uint64_t rg_end = all_row_group_ranges_[rg_id].second; if (row_number > rg_start && row_number < rg_end) { @@ -297,12 +297,13 @@ Result> FileReaderWrapper::Next() { } while (current_row_group_idx_ < target_row_groups_.size()) { - bool is_page_filtered = target_row_groups_[current_row_group_idx_].is_partially_matched; + bool is_partially_matched = + target_row_groups_[current_row_group_idx_].is_partially_matched; PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, - is_page_filtered ? NextPageFiltered() : NextFullyMatched()); + is_partially_matched ? NextPageFiltered() : NextFullyMatched()); if (batch) { return batch; - } else if (!is_page_filtered) { + } else if (!is_partially_matched) { // Null from fully-matched path means batch_reader_ is globally exhausted. break; } @@ -424,13 +425,18 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t } } - bool has_page_filtered = fully_matched_row_groups.size() != active_count; - if (has_page_filtered) { + bool has_partially_matched = fully_matched_row_groups.size() != active_count; + if (has_partially_matched) { PAIMON_RETURN_NOT_OK(BuildPageFilteredSchema(column_indices)); } WaitForPendingPreBuffer(); + // TODO(Yonghao Fang): Neither Paimon nor Arrow manage the size and lifecycle of prebuffered + // caches. So when a lot of row is needed, there is possibility of OOM due to too much + // prebuffering. Also, DispatchPreBuffer will drop previous prebuffered ranges by + // GetRecordBatchReader, which cause IO wastes. + // Create standard reader for fully-matched row groups. if (!fully_matched_row_groups.empty()) { PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader( @@ -441,7 +447,7 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t // When page-filtered RGs exist, issue a single PreBuffer covering both kinds. // Otherwise GetRecordBatchReader already issued PreBuffer internally. - if (has_page_filtered) { + if (has_partially_matched) { auto all_ranges = CollectPreBufferRanges(column_indices); DispatchPreBuffer(std::move(all_ranges)); } diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 5f6a0f91..9c87438b 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -123,8 +123,9 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa } Status PageFilteredRowGroupReader::ExecuteSkipReadPattern( - std::shared_ptr<::parquet::internal::RecordReader> record_reader, const RowRanges& ranges, - int64_t total_row_count, int32_t row_group_index, int32_t column_index) { + const std::shared_ptr<::parquet::internal::RecordReader>& record_reader, + const RowRanges& ranges, int64_t total_row_count, int32_t row_group_index, + int32_t column_index) { int64_t current_row = 0; for (const auto& range : ranges.GetRanges()) { if (range.from > current_row) { diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index abd4a0ad..5092bb5c 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -88,8 +88,9 @@ class PageFilteredRowGroupReader { /// Execute the skip/read pattern on a RecordReader based on RowRanges. static Status ExecuteSkipReadPattern( - std::shared_ptr<::parquet::internal::RecordReader> record_reader, const RowRanges& ranges, - int64_t total_row_count, int32_t row_group_index, int32_t column_index); + const std::shared_ptr<::parquet::internal::RecordReader>& record_reader, + const RowRanges& ranges, int64_t total_row_count, int32_t row_group_index, + int32_t column_index); /// Create a data_page_filter callback for a column based on RowRanges + OffsetIndex. static std::function MakePageFilter( diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 4c153226..7533cb99 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -199,11 +199,11 @@ Status ParquetFileBatchReader::SetReadSchema( for (int32_t rg_id : row_groups) { auto it = row_group_row_ranges.find(rg_id); if (it != row_group_row_ranges.end()) { - target_row_groups.emplace_back(/*rg_index=*/rg_id, /*page_filtered=*/true, + target_row_groups.emplace_back(/*rg_index=*/rg_id, /*is_partially_matched=*/true, /*ranges=*/it->second); } else { target_row_groups.emplace_back(/*rg_index=*/rg_id, - /*page_filtered=*/false, + /*is_partially_matched=*/false, /*ranges=*/RowRanges()); } }