feat(blob view): support multi-thread reading upstream table & add inte test #341
lszskye wants to merge 1 commit into
Conversation
| /// path and requires manual configuration by the user. No default value. | ||
| static const char BLOB_VIEW_UPSTREAM_WAREHOUSE[]; | ||
| /// "global-index.enabled" - Whether to enable global index for scan. Default value is "true". | ||
| static const char GLOBAL_INDEX_ENABLED[]; |
Please clearly note that this differs from the Java configuration and will be deprecated once RestCatalog is supported.
| Status BlobUtils::ValidateBlobViewFields(const std::shared_ptr<arrow::StructArray>& struct_array, | ||
| const std::set<std::string>& view_fields) { | ||
| if (view_fields.empty()) { |
Please merge or reuse the logic in ValidateBlobViewFields and ValidateInlineBlobDescriptors; the two code paths currently differ by only a few lines.
| // Save inline descriptor and view fields for validation in Write() | ||
| if (blob_context) { | ||
| inline_descriptor_fields_ = blob_context->GetDescriptorFields(); | ||
| inline_view_fields_ = blob_context->GetViewFields(); |
Please merge #332 first to avoid a duplicate check when write_schema does not contain view/desc fields.
| PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options")); | ||
| std::string table_path = | ||
| PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options"); | ||
Where did the original ASSERT_EQ check for GetTableLocation go?
| uint32_t cpu_count = std::thread::hardware_concurrency(); | ||
| thread_num = cpu_count > 0 ? static_cast<int32_t>(cpu_count) : 1; | ||
| if (thread_num) { | ||
| final_executor = CreateDefaultExecutor(static_cast<uint32_t>(thread_num.value())); |
If thread_num is set to 0, is there any protection, or is a bad status returned?
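One possible way to handle the zero case is sketched below. `ResolveThreadNum` is a hypothetical helper, not a function in this PR, and it returns `std::nullopt` where the real code would presumably return an error status.

```cpp
#include <cstdint>
#include <optional>
#include <thread>

// Hypothetical helper (not part of this PR): resolve the effective thread count.
// An explicitly configured non-positive value yields std::nullopt so the caller
// can report an error instead of creating an executor with zero workers.
std::optional<uint32_t> ResolveThreadNum(std::optional<int32_t> configured) {
    if (configured.has_value()) {
        if (configured.value() <= 0) {
            return std::nullopt;
        }
        return static_cast<uint32_t>(configured.value());
    }
    // hardware_concurrency() may return 0 when the value cannot be determined.
    uint32_t cpu_count = std::thread::hardware_concurrency();
    return cpu_count > 0 ? cpu_count : 1u;
}
```

With this shape, an unset option falls back to the CPU count (at least 1), while an explicit 0 is rejected rather than silently accepted.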
| {Options::BLOB_EXTERNAL_STORAGE_FIELD, "blob3,blob4"}, | ||
| {Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"}, | ||
| {Options::BLOB_VIEW_UPSTREAM_WAREHOUSE, "FILE:///tmp/blob_view_upstream_warehouse/"}, | ||
| {Options::PARTITION_GENERATE_LEGACY_NAME, "false"}, |
I’m just wondering whether BLOB_VIEW_UPSTREAM_WAREHOUSE needs to be set with a URI scheme (FILE://), or whether a plain /tmp/blob_view_upstream_warehouse/ is also fine.
| auto append_view = [&](arrow::LargeBinaryBuilder* builder, int32_t field_id, int64_t row_id) { | ||
| BlobViewStruct view_struct(upstream_identifier, field_id, row_id); | ||
| auto serialized = view_struct.Serialize(GetDefaultPool()); | ||
| ASSERT_TRUE( |
Please use pool_ instead of GetDefaultPool(), and move the out-parameter builder to the last position in the parameter list.
| std::string branch_dir = | ||
| PathUtil::JoinPath(upstream_table_path, "branch/branch-" + branch_name); | ||
| for (const auto& entry : std::filesystem::directory_iterator(branch_src_table_path)) { | ||
| std::string name = entry.path().filename().string(); |
Please use the local filesystem abstraction instead of std::filesystem unless the local fs cannot support this case.
| std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); | ||
| Identifier branch_identifier(upstream_db_name, upstream_table_name + "$branch_" + branch_name); | ||
| arrow::LargeBinaryBuilder view_builder; |
Please add a test that reads a blob view database written by Java Paimon, where the upstream table is a branch. This would help verify that the identifier logic and serialization are consistent with C++, and would also avoid having to fake the upstream table in this case.
| ASSERT_OK(Commit(temp_table_path, upstream_commit_msgs)); | ||
| // Copy the temp table to the fallback path (without .db). | ||
| ASSERT_TRUE(TestUtil::CopyDirectory(temp_table_path, fallback_table_path)); |
Would it be simpler and clearer to copy a blob table from test_data directly into fallback_table_path for this test?
Pull request overview
This PR extends blob-view support by adding parallelized upstream table descriptor loading and introducing additional integration tests and updated golden test datasets to validate blob-view behavior across formats and table layouts.
Changes:
- Parallelize upstream blob descriptor preloading for blob-view resolution, including range chunking and multi-table/branch/fallback-path scenarios.
- Add new blob-view option plumbing (blob-view-upstream-warehouse) and extend validation for inline blob-view writes.
- Add/extend integration & unit tests and update ORC/Parquet golden datasets to reflect schema evolution changes.
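As a rough illustration of the row-range chunking mentioned in the list above, the sketch below splits ranges into pieces of bounded size. `RowRange` and `SplitRanges` are hypothetical stand-ins, and the inclusive `[from, to]` convention is an assumption; the PR's actual types and helpers may differ.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the project's row-range type: inclusive [from, to].
struct RowRange {
    int64_t from;
    int64_t to;
    int64_t Count() const { return to - from + 1; }
};

// Split each range into pieces of at most max_rows rows, preserving order.
// Overflow near INT64_MAX is not handled in this sketch.
std::vector<RowRange> SplitRanges(const std::vector<RowRange>& ranges, int64_t max_rows) {
    std::vector<RowRange> chunks;
    if (max_rows <= 0) {
        return chunks;  // invalid chunk size: this sketch simply returns nothing
    }
    for (const auto& range : ranges) {
        for (int64_t start = range.from; start <= range.to; start += max_rows) {
            int64_t end = std::min(range.to, start + max_rows - 1);
            chunks.push_back({start, end});
        }
    }
    return chunks;
}
```

Each resulting chunk can then be handed to an executor as an independent task, which is what makes the preload parallelizable.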
Reviewed changes
Copilot reviewed 38 out of 87 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/paimon/core/utils/blob_view_lookup.h | Extend blob-view lookup API for executor-based parallel loading and range splitting helpers. |
| src/paimon/core/utils/blob_view_lookup.cpp | Implement parallel descriptor preload, fallback-path resolution, and row-range chunking. |
| src/paimon/core/utils/blob_view_lookup_test.cpp | Add unit coverage for identifier access, task sizing, and range splitting. |
| src/paimon/core/operation/data_evolution_split_read.cpp | Wire blob-view resolver creation to new upstream-warehouse option and global executor. |
| src/paimon/core/core_options.h | Add accessor for blob-view upstream warehouse option. |
| src/paimon/core/core_options.cpp | Parse/store blob-view-upstream-warehouse option. |
| src/paimon/core/core_options_test.cpp | Validate default and map parsing for the new option. |
| include/paimon/defs.h | Document new blob-view upstream warehouse option. |
| src/paimon/common/defs.cpp | Add option key constant blob-view-upstream-warehouse. |
| include/paimon/executor.h | Add Executor::GetThreadNum() API. |
| src/paimon/common/executor/executor.cpp | Implement GetThreadNum() for default executor. |
| src/paimon/core/mergetree/compact/merge_tree_compact_manager_test.cpp | Update test executors to implement new executor API. |
| src/paimon/core/global_index/global_index_scan_impl.cpp | Prefer global default executor when thread-num option is unset. |
| src/paimon/common/data/blob_utils.h | Add blob-view field validation API. |
| src/paimon/common/data/blob_utils.cpp | Add blob-view field validation and tighten blob-descriptor validation messaging. |
| src/paimon/common/data/blob_utils_test.cpp | Expand blob utils tests for view validation and updated error messages. |
| src/paimon/common/data/blob_view_struct_test.cpp | Add tests for BlobViewStruct detection/validation behavior. |
| src/paimon/core/append/append_only_writer.h | Track inline view fields for write-time validation. |
| src/paimon/core/append/append_only_writer.cpp | Validate inline blob-view values during write. |
| src/paimon/core/append/append_only_writer_test.cpp | Add tests for accepting/rejecting blob-view values on write. |
| include/paimon/catalog/catalog.h | Change Catalog::GetTableLocation to return Result<std::string>. |
| src/paimon/core/catalog/file_system_catalog.h | Update catalog interface implementation signatures for new Result return. |
| src/paimon/core/catalog/file_system_catalog.cpp | Propagate Result table location and adjust internal path building. |
| src/paimon/core/catalog/file_system_catalog_test.cpp | Update tests for new GetTableLocation result type. |
| test/inte/read_inte_test.cpp | Adjust system-table integration test path usage. |
| test/inte/blob_table_inte_test.cpp | Add blob-view integration tests (external storage, multi-upstream, branch, fallback, failure). |
| test/test_data/parquet/.../snapshot/snapshot-1 | Update parquet golden snapshot metadata. |
| test/test_data/parquet/.../snapshot/snapshot-2 | Update parquet golden snapshot metadata. |
| test/test_data/parquet/.../snapshot/snapshot-3 | Update parquet golden snapshot metadata. |
| test/test_data/parquet/.../schema/schema-0 | Update parquet golden schema/options ordering and timestamps. |
| test/test_data/parquet/.../schema/schema-1 | Rename blob column and update options/timestamps in parquet golden schema. |
| test/test_data/parquet/.../README | Update parquet golden README steps/output to match blob column rename. |
| test/test_data/parquet/.../bucket-0/data-50989127-...-1.blob | Add/update parquet blob payload test file. |
| test/test_data/parquet/.../bucket-0/data-50989127-...-2.blob | Add/update parquet blob payload test file. |
| test/test_data/parquet/.../bucket-0/data-ac3de758-...-1.blob | Add/update parquet blob payload test file. |
| test/test_data/parquet/.../bucket-0/data-ac3de758-...-2.blob | Add/update parquet blob payload test file. |
| test/test_data/orc/.../snapshot/snapshot-1 | Update ORC golden snapshot metadata. |
| test/test_data/orc/.../snapshot/snapshot-2 | Update ORC golden snapshot metadata. |
| test/test_data/orc/.../snapshot/snapshot-3 | Update ORC golden snapshot metadata. |
| test/test_data/orc/.../schema/schema-0 | Update ORC golden schema/options ordering and timestamps. |
| test/test_data/orc/.../schema/schema-1 | Rename blob column and update options/timestamps in ORC golden schema. |
| test/test_data/orc/.../README | Update ORC golden README steps/output to match blob column rename. |
| test/test_data/orc/.../bucket-0/data-2c772a51-...-0.orc | Add/update ORC golden data file. |
| test/test_data/orc/.../bucket-0/data-a5e684ea-...-0.orc | Add/update ORC golden data file. |
| test/test_data/orc/.../bucket-0/data-ac7f689b-...-0.orc | Add/update ORC golden data file. |
| test/test_data/orc/.../bucket-0/data-d0ddcf17-...-0.orc | Add/update ORC golden data file. |
| test/test_data/orc/.../bucket-0/data-a5e684ea-...-1.blob | Add/update ORC blob payload test file. |
| test/test_data/orc/.../bucket-0/data-a5e684ea-...-2.blob | Add/update ORC blob payload test file. |
| test/test_data/orc/.../bucket-0/data-ac7f689b-...-1.blob | Add/update ORC blob payload test file. |
| test/test_data/orc/.../bucket-0/data-ac7f689b-...-2.blob | Add/update ORC blob payload test file. |
| DefaultExecutor::DefaultExecutor(uint32_t thread_count) : thread_count_(thread_count) { | ||
| for (uint32_t i = 0; i < thread_count_; ++i) { | ||
| workers_.emplace_back(&DefaultExecutor::WorkerThread, this); | ||
| } |
| int64_t BlobViewLookup::TargetRowsPerTask( | ||
| const std::unordered_map<Identifier, TableReadPlan>& plan_by_identifier, uint32_t thread_num) { | ||
| int64_t total_rows = 0; | ||
| for (const auto& [identifier, table_read_plan] : plan_by_identifier) { | ||
| for (const auto& row_range : table_read_plan.GetSortedDistinctRanges()) { | ||
| total_rows += row_range.Count(); | ||
| } | ||
| } | ||
| int64_t balanced_rows = (total_rows + thread_num - 1) / thread_num; | ||
| return std::max(MIN_ROW_PER_TASK, balanced_rows); | ||
| } |
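The snippet above divides by `thread_num`, and the test executors shown further down this page return 0 from `GetThreadNum()`, so a zero guard may be worth considering. The following is a minimal sketch under assumptions: it takes a precomputed `total_rows` instead of the plan map, and `kMinRowsPerTask` is an illustrative value, since the PR's actual `MIN_ROW_PER_TASK` is not shown here.

```cpp
#include <algorithm>
#include <cstdint>

// Assumed value for illustration only; the PR's MIN_ROW_PER_TASK is not shown.
constexpr int64_t kMinRowsPerTask = 1024;

// Simplified variant of TargetRowsPerTask that tolerates thread_num == 0.
int64_t TargetRowsPerTask(int64_t total_rows, uint32_t thread_num) {
    // Treat 0 as "single-threaded" so the division below is always defined.
    int64_t threads = std::max<int64_t>(1, thread_num);
    // Ceiling division: rows per task when work is spread evenly across threads.
    int64_t balanced_rows = (total_rows + threads - 1) / threads;
    return std::max(kMinRowsPerTask, balanced_rows);
}
```

Whether 0 should fall back to a single task or be rejected with an error status is a design choice for the PR author; the sketch only shows the fallback variant.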
| const auto* binary_array = | ||
| arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(field_array.get()); | ||
| if (!binary_array) { | ||
| return Status::Invalid( | ||
| fmt::format("cannot cast array for field {} to LargeBinaryArray", field_name)); | ||
| } |
| std::optional<std::string> warehouse_path = options_.GetBlobViewUpstreamWarehouse(); | ||
| if (!warehouse_path) { | ||
| return Status::Invalid( | ||
| "invalid config for blob view, supposed to set BLOB_VIEW_UPSTREAM_WAREHOUSE"); | ||
| } | ||
| auto catalog_context = std::make_shared<CatalogContext>( | ||
| warehouse_path.value(), options_.ToMap(), options_.GetFileSystem()); |
| uint32_t GetThreadNum() const override { | ||
| return 0; | ||
| } |
| uint32_t GetThreadNum() const override { | ||
| return 0; | ||
| } |
Purpose
Support multi-threaded reading of the upstream table and add integration tests.
Tests
blob_table_inte_test.cpp
API and Format
catalog.h
defs.h
executor.h