Skip to content

feat(blob view): support multi-thread reading upstream table & add inte test#341

Open
lszskye wants to merge 1 commit into
alibaba:mainfrom
lszskye:blob_view_support_multi_thread_reading
Open

feat(blob view): support multi-thread reading upstream table & add inte test#341
lszskye wants to merge 1 commit into
alibaba:mainfrom
lszskye:blob_view_support_multi_thread_reading

Conversation

@lszskye
Copy link
Copy Markdown
Collaborator

@lszskye lszskye commented Jun 5, 2026

Purpose

support multi-thread reading upstream table & add inte test

Tests

blob_table_inte_test.cpp

API and Format

catalog.h
defs.h
executor.h

Comment thread include/paimon/defs.h
/// path and requires manual configuration by the user. No default value.
static const char BLOB_VIEW_UPSTREAM_WAREHOUSE[];
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clearly note that this differs from the Java configuration and will be deprecated once RestCatalog is supported.


Status BlobUtils::ValidateBlobViewFields(const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& view_fields) {
if (view_fields.empty()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge or reuse the logic in ValidateBlobViewFields and ValidateInlineBlobDescriptors; the two code paths currently differ by only a few lines.

// Save inline descriptor and view fields for validation in Write()
if (blob_context) {
inline_descriptor_fields_ = blob_context->GetDescriptorFields();
inline_view_fields_ = blob_context->GetViewFields();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge #332 first to avoid duplicate check when write_schema does not contain view/desc fields.

PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options"));
std::string table_path =
PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options");

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the origin ASSERT_EQ check for GetTableLocation?

uint32_t cpu_count = std::thread::hardware_concurrency();
thread_num = cpu_count > 0 ? static_cast<int32_t>(cpu_count) : 1;
if (thread_num) {
final_executor = CreateDefaultExecutor(static_cast<uint32_t>(thread_num.value()));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If thread_num is set to 0, is there is any protect or bad status?

{Options::BLOB_EXTERNAL_STORAGE_FIELD, "blob3,blob4"},
{Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"},
{Options::BLOB_VIEW_UPSTREAM_WAREHOUSE, "FILE:///tmp/blob_view_upstream_warehouse/"},
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m just wondering whether BLOB_VIEW_UPSTREAM_WAREHOUSE needs to be set with schema, or if /tmp/blob_view_upstream_warehouse/ is also fine.

auto append_view = [&](arrow::LargeBinaryBuilder* builder, int32_t field_id, int64_t row_id) {
BlobViewStruct view_struct(upstream_identifier, field_id, row_id);
auto serialized = view_struct.Serialize(GetDefaultPool());
ASSERT_TRUE(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetDefaultPool to poo_, please also put out param builder to the last param.

std::string branch_dir =
PathUtil::JoinPath(upstream_table_path, "branch/branch-" + branch_name);
for (const auto& entry : std::filesystem::directory_iterator(branch_src_table_path)) {
std::string name = entry.path().filename().string();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the local filesystem abstraction instead of std::filesystem unless the local fs cannot support this case.

std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar");

Identifier branch_identifier(upstream_db_name, upstream_table_name + "$branch_" + branch_name);
arrow::LargeBinaryBuilder view_builder;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test that reads a blob view database written by Java Paimon, where the upstream table is a branch. This would help verify that the identifier logic and serialization are consistent with C++, and would also avoid having to fake the upstream table in this case.

ASSERT_OK(Commit(temp_table_path, upstream_commit_msgs));

// Copy the temp table to the fallback path (without .db).
ASSERT_TRUE(TestUtil::CopyDirectory(temp_table_path, fallback_table_path));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler and clearer to copy a blob table from test_data directly into fallback_table_path for this test?

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends blob-view support by adding parallelized upstream table descriptor loading and introducing additional integration tests and updated golden test datasets to validate blob-view behavior across formats and table layouts.

Changes:

  • Parallelize upstream blob descriptor preloading for blob-view resolution, including range chunking and multi-table/branch/fallback-path scenarios.
  • Add new blob-view option plumbing (blob-view-upstream-warehouse) and extend validation for inline blob-view writes.
  • Add/extend integration & unit tests and update ORC/Parquet golden datasets to reflect schema evolution changes.

Reviewed changes

Copilot reviewed 38 out of 87 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/paimon/core/utils/blob_view_lookup.h Extend blob-view lookup API for executor-based parallel loading and range splitting helpers.
src/paimon/core/utils/blob_view_lookup.cpp Implement parallel descriptor preload, fallback-path resolution, and row-range chunking.
src/paimon/core/utils/blob_view_lookup_test.cpp Add unit coverage for identifier access, task sizing, and range splitting.
src/paimon/core/operation/data_evolution_split_read.cpp Wire blob-view resolver creation to new upstream-warehouse option and global executor.
src/paimon/core/core_options.h Add accessor for blob-view upstream warehouse option.
src/paimon/core/core_options.cpp Parse/store blob-view-upstream-warehouse option.
src/paimon/core/core_options_test.cpp Validate default and map parsing for the new option.
include/paimon/defs.h Document new blob-view upstream warehouse option.
src/paimon/common/defs.cpp Add option key constant blob-view-upstream-warehouse.
include/paimon/executor.h Add Executor::GetThreadNum() API.
src/paimon/common/executor/executor.cpp Implement GetThreadNum() for default executor.
src/paimon/core/mergetree/compact/merge_tree_compact_manager_test.cpp Update test executors to implement new executor API.
src/paimon/core/global_index/global_index_scan_impl.cpp Prefer global default executor when thread-num option is unset.
src/paimon/common/data/blob_utils.h Add blob-view field validation API.
src/paimon/common/data/blob_utils.cpp Add blob-view field validation and tighten blob-descriptor validation messaging.
src/paimon/common/data/blob_utils_test.cpp Expand blob utils tests for view validation and updated error messages.
src/paimon/common/data/blob_view_struct_test.cpp Add tests for BlobViewStruct detection/validation behavior.
src/paimon/core/append/append_only_writer.h Track inline view fields for write-time validation.
src/paimon/core/append/append_only_writer.cpp Validate inline blob-view values during write.
src/paimon/core/append/append_only_writer_test.cpp Add tests for accepting/rejecting blob-view values on write.
include/paimon/catalog/catalog.h Change Catalog::GetTableLocation to return Result<std::string>.
src/paimon/core/catalog/file_system_catalog.h Update catalog interface implementation signatures for new Result return.
src/paimon/core/catalog/file_system_catalog.cpp Propagate Result table location and adjust internal path building.
src/paimon/core/catalog/file_system_catalog_test.cpp Update tests for new GetTableLocation result type.
test/inte/read_inte_test.cpp Adjust system-table integration test path usage.
test/inte/blob_table_inte_test.cpp Add blob-view integration tests (external storage, multi-upstream, branch, fallback, failure).
test/test_data/parquet/.../snapshot/snapshot-1 Update parquet golden snapshot metadata.
test/test_data/parquet/.../snapshot/snapshot-2 Update parquet golden snapshot metadata.
test/test_data/parquet/.../snapshot/snapshot-3 Update parquet golden snapshot metadata.
test/test_data/parquet/.../schema/schema-0 Update parquet golden schema/options ordering and timestamps.
test/test_data/parquet/.../schema/schema-1 Rename blob column and update options/timestamps in parquet golden schema.
test/test_data/parquet/.../README Update parquet golden README steps/output to match blob column rename.
test/test_data/parquet/.../bucket-0/data-50989127-...-1.blob Add/update parquet blob payload test file.
test/test_data/parquet/.../bucket-0/data-50989127-...-2.blob Add/update parquet blob payload test file.
test/test_data/parquet/.../bucket-0/data-ac3de758-...-1.blob Add/update parquet blob payload test file.
test/test_data/parquet/.../bucket-0/data-ac3de758-...-2.blob Add/update parquet blob payload test file.
test/test_data/orc/.../snapshot/snapshot-1 Update ORC golden snapshot metadata.
test/test_data/orc/.../snapshot/snapshot-2 Update ORC golden snapshot metadata.
test/test_data/orc/.../snapshot/snapshot-3 Update ORC golden snapshot metadata.
test/test_data/orc/.../schema/schema-0 Update ORC golden schema/options ordering and timestamps.
test/test_data/orc/.../schema/schema-1 Rename blob column and update options/timestamps in ORC golden schema.
test/test_data/orc/.../README Update ORC golden README steps/output to match blob column rename.
test/test_data/orc/.../bucket-0/data-2c772a51-...-0.orc Add/update ORC golden data file.
test/test_data/orc/.../bucket-0/data-a5e684ea-...-0.orc Add/update ORC golden data file.
test/test_data/orc/.../bucket-0/data-ac7f689b-...-0.orc Add/update ORC golden data file.
test/test_data/orc/.../bucket-0/data-d0ddcf17-...-0.orc Add/update ORC golden data file.
test/test_data/orc/.../bucket-0/data-a5e684ea-...-1.blob Add/update ORC blob payload test file.
test/test_data/orc/.../bucket-0/data-a5e684ea-...-2.blob Add/update ORC blob payload test file.
test/test_data/orc/.../bucket-0/data-ac7f689b-...-1.blob Add/update ORC blob payload test file.
test/test_data/orc/.../bucket-0/data-ac7f689b-...-2.blob Add/update ORC blob payload test file.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 52 to 55
DefaultExecutor::DefaultExecutor(uint32_t thread_count) : thread_count_(thread_count) {
for (uint32_t i = 0; i < thread_count_; ++i) {
workers_.emplace_back(&DefaultExecutor::WorkerThread, this);
}
Comment on lines +300 to +310
int64_t BlobViewLookup::TargetRowsPerTask(
const std::unordered_map<Identifier, TableReadPlan>& plan_by_identifier, uint32_t thread_num) {
int64_t total_rows = 0;
for (const auto& [identifier, table_read_plan] : plan_by_identifier) {
for (const auto& row_range : table_read_plan.GetSortedDistinctRanges()) {
total_rows += row_range.Count();
}
}
int64_t balanced_rows = (total_rows + thread_num - 1) / thread_num;
return std::max(MIN_ROW_PER_TASK, balanced_rows);
}
Comment on lines +182 to +187
const auto* binary_array =
arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(field_array.get());
if (!binary_array) {
return Status::Invalid(
fmt::format("cannot cast array for field {} to LargeBinaryArray", field_name));
}
Comment on lines +177 to +183
std::optional<std::string> warehouse_path = options_.GetBlobViewUpstreamWarehouse();
if (!warehouse_path) {
return Status::Invalid(
"invalid config for blob view, supposed to set BLOB_VIEW_UPSTREAM_WAREHOUSE");
}
auto catalog_context = std::make_shared<CatalogContext>(
warehouse_path.value(), options_.ToMap(), options_.GetFileSystem());
Comment on lines +52 to +54
uint32_t GetThreadNum() const override {
return 0;
}
Comment on lines +65 to +67
uint32_t GetThreadNum() const override {
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants