Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b904277
fix: add move constructor for RowRanges
zhf999 Jun 1, 2026
84d7324
fix: add WhenBuffered after PreBuffer is called
zhf999 Jun 2, 2026
ffebaed
fix(ColumnIndexFilter): return all rows when literal is empty
zhf999 Jun 2, 2026
7b7fd11
style: delete duplicated macro and move it to common header files
zhf999 Jun 2, 2026
e20b030
fix: compute 'column_name_to_index' only when needed
zhf999 Jun 2, 2026
ed933a4
Merge branch 'main' into zhf-small-fix
zhf999 Jun 2, 2026
6bb45ba
fix: removing 'const' in the move constructor of RowRanges
zhf999 Jun 2, 2026
7458922
refactor: delete redundant member variables in FileReaderWrapper
zhf999 Jun 3, 2026
88a7c58
refactor: change the calling chain of SetReadRange, delete unused me…
zhf999 Jun 3, 2026
c4f4ac6
refactor: split PrepareForReading(), Next(), SeekToRow() into smalle…
zhf999 Jun 3, 2026
853c651
refactor: separate functions in PageFilteredRowGroupReader into seve…
zhf999 Jun 3, 2026
f69320a
Merge branch 'main' into zhf-refractor
zhf999 Jun 3, 2026
3253c65
fix: align comments of ApplyReadRanges with actual functionality
zhf999 Jun 3, 2026
4d57b9d
fix: set default value for TargetRowGroup
zhf999 Jun 3, 2026
3fd1ac0
fix: avoid unsigned overflow
zhf999 Jun 3, 2026
45c9f52
refactor: remove unused member variable read_ranges_ in ParquetFileB…
zhf999 Jun 3, 2026
9e2ac12
Merge branch 'main' into zhf-refractor
zhf999 Jun 4, 2026
208ff6a
fix: ApplyReadRanges() can be called multiple times without modifying…
zhf999 Jun 5, 2026
dde983f
Merge branch 'main' into zhf-refractor
zhf999 Jun 5, 2026
41955f7
style: unify formatting of PAINMON_PARQEUT_CATCH_AND_RETURN_STATUS and…
zhf999 Jun 5, 2026
77c1e37
fix: return an error instead of all rows when literal is empty
zhf999 Jun 5, 2026
a5e17ab
fix: include fmt/format.h and paimon/status.h
zhf999 Jun 5, 2026
a4756fd
fix: include header file utility
zhf999 Jun 5, 2026
2cab905
Merge branch 'zhf-small-fix' into zhf-refractor
zhf999 Jun 5, 2026
0a8124f
style: use explicit type instead of auto
zhf999 Jun 5, 2026
5a4eff0
style: sort the order of member functions and member variables
zhf999 Jun 5, 2026
5381487
style: replace the variable name with
zhf999 Jun 5, 2026
6579afb
style: add comment for constructor of TargetRowGroup
zhf999 Jun 5, 2026
ba9e8f8
fix: macro expanding error when type is pair<,>
zhf999 Jun 5, 2026
87de443
fix: align argument comments with TargetRowGroup constructor paramete…
zhf999 Jun 8, 2026
88a721e
Merge branch 'main' into zhf-refractor
lxy-9602 Jun 8, 2026
34789e2
style: reformat if statement
zhf999 Jun 8, 2026
88bf0c6
style: replace raw pointers in functions with smart ptr
zhf999 Jun 8, 2026
70f8eb0
refactor: remove redundant parameters
zhf999 Jun 8, 2026
8e1ec95
refactor: resort the order of parameters
zhf999 Jun 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions src/paimon/format/parquet/column_index_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Result<RowRanges> ColumnIndexFilter::VisitLeafPredicate(
const auto& literals = leaf_predicate->Literals();
FieldType field_type = leaf_predicate->GetFieldType();

if (function_type != Function::Type::IS_NULL && function_type != Function::Type::IS_NOT_NULL &&
literals.empty()) {
return Status::Invalid(
fmt::format("predicate on column '{}' requires at least one literal", field_name));
}
std::vector<int32_t> matching_pages;

switch (function_type) {
Expand All @@ -106,37 +111,22 @@ Result<RowRanges> ColumnIndexFilter::VisitLeafPredicate(
matching_pages = FilterPagesByIsNotNull(column_index_ptr);
break;
case Function::Type::EQUAL:
if (!literals.empty()) {
matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::NOT_EQUAL:
if (!literals.empty()) {
matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::LESS_THAN:
if (!literals.empty()) {
matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type);
break;
case Function::Type::LESS_OR_EQUAL:
if (!literals.empty()) {
matching_pages =
FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::GREATER_THAN:
if (!literals.empty()) {
matching_pages =
FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type);
break;
case Function::Type::GREATER_OR_EQUAL:
if (!literals.empty()) {
matching_pages =
FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type);
}
matching_pages = FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type);
break;
case Function::Type::IN:
matching_pages = FilterPagesByIn(column_index_ptr, literals, field_type);
Expand Down
22 changes: 22 additions & 0 deletions src/paimon/format/parquet/column_index_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "gtest/gtest.h"
#include "paimon/common/predicate/equal.h"
#include "paimon/common/predicate/in.h"
#include "paimon/common/predicate/leaf_predicate_impl.h"
#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
#include "paimon/common/utils/arrow/mem_utils.h"
#include "paimon/defs.h"
Expand Down Expand Up @@ -480,4 +483,23 @@ TEST_F(ColumnIndexFilterTest, NullPredicateReturnsAllRows) {
EXPECT_EQ(row_group_row_count_, ranges.RowCount());
}

/// Every predicate except IsNull/IsNotNull must carry at least one literal.
/// PredicateBuilder (public API) cannot produce such a predicate without a
/// literal, so one is assembled by hand through LeafPredicateImpl here; the
/// filter is expected to reject this invalid input with an error.
TEST_F(ColumnIndexFilterTest, EmptyLiteralsReturnsError) {
    auto equal_without_literal = std::make_shared<paimon::LeafPredicateImpl>(
        paimon::Equal::Instance(), 0, "val", FieldType::INT, std::vector<Literal>{});
    EXPECT_FALSE(Filter(equal_without_literal).ok());
}

/// IN predicate built with an empty literal list. The same rule holds as for
/// the other non-IS_NULL/IS_NOT_NULL predicates: no literals means the
/// predicate is invalid, and the filter should report an error.
TEST_F(ColumnIndexFilterTest, EmptyLiteralsInReturnsError) {
    auto in_without_literals = std::make_shared<paimon::LeafPredicateImpl>(
        paimon::In::Instance(), 0, "val", FieldType::INT, std::vector<Literal>{});
    EXPECT_FALSE(Filter(in_without_literals).ok());
}

} // namespace paimon::parquet::test
542 changes: 248 additions & 294 deletions src/paimon/format/parquet/file_reader_wrapper.cpp

Large diffs are not rendered by default.

67 changes: 35 additions & 32 deletions src/paimon/format/parquet/file_reader_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class FileReaderWrapper {
~FileReaderWrapper();

static Result<std::unique_ptr<FileReaderWrapper>> Create(
std::unique_ptr<::parquet::arrow::FileReader>&& reader, ::arrow::MemoryPool* pool,
int64_t batch_size);
std::unique_ptr<::parquet::arrow::FileReader>&& reader,
int64_t batch_size, std::shared_ptr<arrow::MemoryPool> pool);

/// Seek to the specified row number.
/// @param row_number The row to seek to (must be at a row group boundary).
Expand Down Expand Up @@ -87,7 +87,7 @@ class FileReaderWrapper {
}

/// Get the underlying Parquet file reader.
::parquet::arrow::FileReader* GetFileReader() const {
::parquet::arrow::FileReader* GetFileReader() {
return file_reader_.get();
}

Expand All @@ -109,24 +109,18 @@ class FileReaderWrapper {

/// Prepare for lazy reading of the specified row groups and columns.
/// Actual reader initialization is deferred until the first Next() call.
Status PrepareForReadingLazy(const std::set<int32_t>& row_group_indices,
Status PrepareForReadingLazy(const std::vector<TargetRowGroup>& target_row_groups,
const std::vector<int32_t>& column_indices);

/// Prepare for immediate reading of the specified row groups and columns.
/// Initializes the reader and starts pre-buffering I/O.
Status PrepareForReading(const std::set<int32_t>& row_group_indices,
Status PrepareForReading(const std::vector<TargetRowGroup>& target_row_groups,
const std::vector<int32_t>& column_indices);

/// Filter row groups by read ranges, returning only those that overlap.
Result<std::set<int32_t>> FilterRowGroupsByReadRanges(
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
const std::vector<int32_t>& src_row_groups) const;

/// Set per-row-group RowRanges for page-level filtering.
/// Only partially matched row groups should have entries.
void SetRowGroupRowRanges(const std::map<int32_t, RowRanges>& ranges) {
row_group_row_ranges_ = ranges;
}
/// Apply read ranges to the current target_row_groups_, keeping only those
/// whose row-group range is equal to one of the given read ranges.
/// Resets reader state so that the next Next() call will re-initialize.
Status ApplyReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges);
Comment thread
zhf999 marked this conversation as resolved.

/// Get the page index reader for the file.
/// Returns nullptr if page index is not available.
Expand All @@ -144,21 +138,38 @@ class FileReaderWrapper {
private:
FileReaderWrapper(std::unique_ptr<::parquet::arrow::FileReader>&& file_reader,
const std::vector<std::pair<uint64_t, uint64_t>>& all_row_group_ranges,
uint64_t num_rows, ::arrow::MemoryPool* pool, int64_t batch_size);
uint64_t num_rows, int64_t batch_size,
std::shared_ptr<::arrow::MemoryPool> pool);

/// Wait for all pending PreBuffer operations to complete.
void WaitForPendingPreBuffer();

/// Advance current_row_group_idx_ to the next row group and update next_row_to_read_.
void AdvanceToNextRowGroup();

/// Read next batch from a page-filtered row group. Returns nullptr when the RG is exhausted.
Result<std::shared_ptr<arrow::RecordBatch>> NextPageFiltered();

Result<std::set<int32_t>> ReadRangesToRowGroupIds(
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) const;
Result<int32_t> GetRowGroupId(std::pair<uint64_t, uint64_t> target_range) const;
/// Read next batch from the fully-matched batch_reader_. Returns nullptr when exhausted.
Result<std::shared_ptr<arrow::RecordBatch>> NextFullyMatched();

/// Build page_filtered_read_schema_ from the given column indices. No-op if already built.
Status BuildPageFilteredSchema(const std::vector<int32_t>& column_indices);

/// Collect all byte ranges that need pre-buffering (page-filtered + fully-matched).
std::vector<::arrow::io::ReadRange> CollectPreBufferRanges(
const std::vector<int32_t>& column_indices);

/// Dispatch a single PreBufferRanges call with merged ranges.
void DispatchPreBuffer(std::vector<::arrow::io::ReadRange> ranges);

std::unique_ptr<::parquet::arrow::FileReader> file_reader_;
std::unique_ptr<arrow::RecordBatchReader> batch_reader_;

std::vector<std::pair<uint64_t, uint64_t>> all_row_group_ranges_;
std::set<int32_t> target_row_group_indices_;
std::vector<std::pair<uint64_t, uint64_t>> target_row_groups_;
std::vector<int32_t> target_column_indices_;

::arrow::MemoryPool* pool_;
std::shared_ptr<::arrow::MemoryPool> pool_;
int64_t batch_size_; // 0 means no limit

const uint64_t num_rows_;
Expand All @@ -175,13 +186,8 @@ class FileReaderWrapper {
RowRanges current_filtered_row_ranges_; // RowRanges for the active page-filtered RG
uint64_t current_filtered_rg_start_ = 0; // Absolute row-group start row number

// Page-level filtering state. Externally injected via SetRowGroupRowRanges and
// looked up by row group index when entering a page-filtered RG.
std::map<int32_t, RowRanges> row_group_row_ranges_;

// Set of target_row_groups_ positional indices that use page-filtered reading.
// Built in PrepareForReading from row_group_row_ranges_.
std::set<uint64_t> page_filtered_indices_;
// Target row groups together with their row ranges, covering both non-page-level and page-level filtering
std::vector<TargetRowGroup> target_row_groups_;

Comment thread
zhf999 marked this conversation as resolved.
// Arrow schema covering target_column_indices_, used when constructing the per-RG
// page-filtered reader. Cached in PrepareForReading because it's identical across
Expand All @@ -190,9 +196,6 @@ class FileReaderWrapper {

// Track pre-buffered ranges so we can wait on destruction
std::vector<::arrow::io::ReadRange> prebuffered_ranges_;

/// Wait for all pending PreBuffer operations to complete.
void WaitForPendingPreBuffer();
};

} // namespace paimon::parquet
Loading
Loading