Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions src/paimon/format/parquet/file_reader_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Result<std::unique_ptr<FileReaderWrapper>> FileReaderWrapper::Create(
std::move(file_reader), all_row_group_ranges, num_rows, batch_size, pool));
std::vector<TargetRowGroup> all_target_row_groups;
for (int32_t i = 0; i < file_reader_wrapper->GetNumberOfRowGroups(); i++) {
all_target_row_groups.emplace_back(/*rg_index=*/i, /*page_filtered=*/false,
all_target_row_groups.emplace_back(/*rg_index=*/i, /*is_partially_matched=*/false,
/*ranges=*/RowRanges());
}
PAIMON_RETURN_NOT_OK(
Expand Down Expand Up @@ -144,7 +144,7 @@ FileReaderWrapper::FileReaderWrapper(
int64_t batch_size, std::shared_ptr<::arrow::MemoryPool> pool)
: file_reader_(std::move(file_reader)),
all_row_group_ranges_(all_row_group_ranges),
pool_(pool),
pool_(std::move(pool)),
batch_size_(batch_size),
num_rows_(num_rows) {}

Expand Down Expand Up @@ -184,7 +184,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) {
if (target_row_groups_[i].excluded_by_read_range) {
continue;
}
uint32_t rg_id = target_row_groups_[i].row_group_index;
int32_t rg_id = target_row_groups_[i].row_group_index;
uint64_t rg_start = all_row_group_ranges_[rg_id].first;
uint64_t rg_end = all_row_group_ranges_[rg_id].second;
if (row_number > rg_start && row_number < rg_end) {
Expand Down Expand Up @@ -297,12 +297,13 @@ Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::Next() {
}

while (current_row_group_idx_ < target_row_groups_.size()) {
bool is_page_filtered = target_row_groups_[current_row_group_idx_].is_partially_matched;
bool is_partially_matched =
target_row_groups_[current_row_group_idx_].is_partially_matched;
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch,
is_page_filtered ? NextPageFiltered() : NextFullyMatched());
is_partially_matched ? NextPageFiltered() : NextFullyMatched());
if (batch) {
return batch;
} else if (!is_page_filtered) {
} else if (!is_partially_matched) {
// Null from fully-matched path means batch_reader_ is globally exhausted.
break;
}
Expand Down Expand Up @@ -424,13 +425,18 @@ Status FileReaderWrapper::PrepareForReading(const std::vector<TargetRowGroup>& t
}
}

bool has_page_filtered = fully_matched_row_groups.size() != active_count;
if (has_page_filtered) {
bool has_partially_matched = fully_matched_row_groups.size() != active_count;
if (has_partially_matched) {
PAIMON_RETURN_NOT_OK(BuildPageFilteredSchema(column_indices));
}

WaitForPendingPreBuffer();

// TODO(Yonghao Fang): Neither Paimon nor Arrow manage the size and lifecycle of prebuffered
// caches. So when a lot of row is needed, there is possibility of OOM due to too much
// prebuffering. Also, DispatchPreBuffer will drop previous prebuffered ranges by
// GetRecordBatchReader, which cause IO wastes.
Comment thread
zhf999 marked this conversation as resolved.

// Create standard reader for fully-matched row groups.
if (!fully_matched_row_groups.empty()) {
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader(
Expand All @@ -441,7 +447,7 @@ Status FileReaderWrapper::PrepareForReading(const std::vector<TargetRowGroup>& t

// When page-filtered RGs exist, issue a single PreBuffer covering both kinds.
// Otherwise GetRecordBatchReader already issued PreBuffer internally.
if (has_page_filtered) {
if (has_partially_matched) {
auto all_ranges = CollectPreBufferRanges(column_indices);
DispatchPreBuffer(std::move(all_ranges));
}
Expand Down
5 changes: 3 additions & 2 deletions src/paimon/format/parquet/page_filtered_row_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ std::pair<RowRanges, int64_t> PageFilteredRowGroupReader::ComputeCompressedRowRa
}

Status PageFilteredRowGroupReader::ExecuteSkipReadPattern(
std::shared_ptr<::parquet::internal::RecordReader> record_reader, const RowRanges& ranges,
int64_t total_row_count, int32_t row_group_index, int32_t column_index) {
const std::shared_ptr<::parquet::internal::RecordReader>& record_reader,
const RowRanges& ranges, int64_t total_row_count, int32_t row_group_index,
int32_t column_index) {
int64_t current_row = 0;
for (const auto& range : ranges.GetRanges()) {
if (range.from > current_row) {
Expand Down
5 changes: 3 additions & 2 deletions src/paimon/format/parquet/page_filtered_row_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ class PageFilteredRowGroupReader {

/// Execute the skip/read pattern on a RecordReader based on RowRanges.
static Status ExecuteSkipReadPattern(
std::shared_ptr<::parquet::internal::RecordReader> record_reader, const RowRanges& ranges,
int64_t total_row_count, int32_t row_group_index, int32_t column_index);
const std::shared_ptr<::parquet::internal::RecordReader>& record_reader,
const RowRanges& ranges, int64_t total_row_count, int32_t row_group_index,
int32_t column_index);
Comment thread
zhf999 marked this conversation as resolved.

/// Create a data_page_filter callback for a column based on RowRanges + OffsetIndex.
static std::function<bool(const ::parquet::DataPageStats&)> MakePageFilter(
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/format/parquet/parquet_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ Status ParquetFileBatchReader::SetReadSchema(
for (int32_t rg_id : row_groups) {
auto it = row_group_row_ranges.find(rg_id);
if (it != row_group_row_ranges.end()) {
target_row_groups.emplace_back(/*rg_index=*/rg_id, /*page_filtered=*/true,
target_row_groups.emplace_back(/*rg_index=*/rg_id, /*is_partially_matched=*/true,
/*ranges=*/it->second);
} else {
target_row_groups.emplace_back(/*rg_index=*/rg_id,
/*page_filtered=*/false,
/*is_partially_matched=*/false,
Comment thread
zhf999 marked this conversation as resolved.
/*ranges=*/RowRanges());
}
}
Expand Down
Loading