From c0b2fae64c9102651d1c23d6ee40b47396cb33dd Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Fri, 12 Jun 2026 18:30:06 +0800 Subject: [PATCH 1/3] fix: small fixes for AI reviewing feedback --- src/paimon/format/parquet/file_reader_wrapper.cpp | 9 +++++++-- .../format/parquet/page_filtered_row_group_reader.cpp | 5 +++-- .../format/parquet/page_filtered_row_group_reader.h | 5 +++-- src/paimon/format/parquet/parquet_file_batch_reader.cpp | 4 ++-- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 542f631ae..4e9248e11 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -144,7 +144,7 @@ FileReaderWrapper::FileReaderWrapper( int64_t batch_size, std::shared_ptr<::arrow::MemoryPool> pool) : file_reader_(std::move(file_reader)), all_row_group_ranges_(all_row_group_ranges), - pool_(pool), + pool_(std::move(pool)), batch_size_(batch_size), num_rows_(num_rows) {} @@ -184,7 +184,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { if (target_row_groups_[i].excluded_by_read_range) { continue; } - uint32_t rg_id = target_row_groups_[i].row_group_index; + int32_t rg_id = target_row_groups_[i].row_group_index; uint64_t rg_start = all_row_group_ranges_[rg_id].first; uint64_t rg_end = all_row_group_ranges_[rg_id].second; if (row_number > rg_start && row_number < rg_end) { @@ -431,6 +431,11 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t WaitForPendingPreBuffer(); + // TODO(Yonghao Fang): Neither Paimon nor Arrow manages the size and lifecycle of prebuffered + // caches, so when many rows are needed there is a possibility of OOM due to too much + // prebuffering. Also, DispatchPreBuffer will drop the ranges previously prebuffered by + // GetRecordBatchReader, which wastes IO. + // Create standard reader for fully-matched row groups. 
if (!fully_matched_row_groups.empty()) { PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader( diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 5f6a0f917..9c87438b8 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -123,8 +123,9 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa } Status PageFilteredRowGroupReader::ExecuteSkipReadPattern( - std::shared_ptr<::parquet::internal::RecordReader> record_reader, const RowRanges& ranges, - int64_t total_row_count, int32_t row_group_index, int32_t column_index) { + const std::shared_ptr<::parquet::internal::RecordReader>& record_reader, + const RowRanges& ranges, int64_t total_row_count, int32_t row_group_index, + int32_t column_index) { int64_t current_row = 0; for (const auto& range : ranges.GetRanges()) { if (range.from > current_row) { diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index abd4a0add..5092bb5ca 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -88,8 +88,9 @@ class PageFilteredRowGroupReader { /// Execute the skip/read pattern on a RecordReader based on RowRanges. static Status ExecuteSkipReadPattern( - std::shared_ptr<::parquet::internal::RecordReader> record_reader, const RowRanges& ranges, - int64_t total_row_count, int32_t row_group_index, int32_t column_index); + const std::shared_ptr<::parquet::internal::RecordReader>& record_reader, + const RowRanges& ranges, int64_t total_row_count, int32_t row_group_index, + int32_t column_index); /// Create a data_page_filter callback for a column based on RowRanges + OffsetIndex. 
static std::function MakePageFilter( diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 4c153226e..7533cb99a 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -199,11 +199,11 @@ Status ParquetFileBatchReader::SetReadSchema( for (int32_t rg_id : row_groups) { auto it = row_group_row_ranges.find(rg_id); if (it != row_group_row_ranges.end()) { - target_row_groups.emplace_back(/*rg_index=*/rg_id, /*page_filtered=*/true, + target_row_groups.emplace_back(/*rg_index=*/rg_id, /*is_partially_matched=*/true, /*ranges=*/it->second); } else { target_row_groups.emplace_back(/*rg_index=*/rg_id, - /*page_filtered=*/false, + /*is_partially_matched=*/false, /*ranges=*/RowRanges()); } } From 2b3ca460bd135a935d95862fd2feb13846997157 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Fri, 12 Jun 2026 18:48:26 +0800 Subject: [PATCH 2/3] style: replace page_filtered with partially_matched --- src/paimon/format/parquet/file_reader_wrapper.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 4e9248e11..d7d5d364e 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -105,7 +105,7 @@ Result> FileReaderWrapper::Create( std::move(file_reader), all_row_group_ranges, num_rows, batch_size, pool)); std::vector all_target_row_groups; for (int32_t i = 0; i < file_reader_wrapper->GetNumberOfRowGroups(); i++) { - all_target_row_groups.emplace_back(/*rg_index=*/i, /*page_filtered=*/false, + all_target_row_groups.emplace_back(/*rg_index=*/i, /*is_partially_matched=*/false, /*ranges=*/RowRanges()); } PAIMON_RETURN_NOT_OK( @@ -297,12 +297,12 @@ Result> FileReaderWrapper::Next() { } while (current_row_group_idx_ < target_row_groups_.size()) { - bool is_page_filtered = 
target_row_groups_[current_row_group_idx_].is_partially_matched; + bool is_partially_matched = target_row_groups_[current_row_group_idx_].is_partially_matched; PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, - is_page_filtered ? NextPageFiltered() : NextFullyMatched()); + is_partially_matched ? NextPageFiltered() : NextFullyMatched()); if (batch) { return batch; - } else if (!is_page_filtered) { + } else if (!is_partially_matched) { // Null from fully-matched path means batch_reader_ is globally exhausted. break; } @@ -424,8 +424,8 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t } } - bool has_page_filtered = fully_matched_row_groups.size() != active_count; - if (has_page_filtered) { + bool has_partially_matched = fully_matched_row_groups.size() != active_count; + if (has_partially_matched) { PAIMON_RETURN_NOT_OK(BuildPageFilteredSchema(column_indices)); } @@ -446,7 +446,7 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t // When page-filtered RGs exist, issue a single PreBuffer covering both kinds. // Otherwise GetRecordBatchReader already issued PreBuffer internally. 
- if (has_page_filtered) { + if (has_partially_matched) { auto all_ranges = CollectPreBufferRanges(column_indices); DispatchPreBuffer(std::move(all_ranges)); } From 3dc3103dfd1d7cc908917f9b96ea354d88c7b4a1 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Fri, 12 Jun 2026 18:53:34 +0800 Subject: [PATCH 3/3] style: clang-format --- src/paimon/format/parquet/file_reader_wrapper.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index d7d5d364e..e7d6bf606 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -297,7 +297,8 @@ Result> FileReaderWrapper::Next() { } while (current_row_group_idx_ < target_row_groups_.size()) { - bool is_partially_matched = target_row_groups_[current_row_group_idx_].is_partially_matched; + bool is_partially_matched = + target_row_groups_[current_row_group_idx_].is_partially_matched; PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, is_partially_matched ? NextPageFiltered() : NextFullyMatched()); if (batch) {