Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/paimon/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ class PAIMON_EXPORT Catalog {
/// @note This does not check whether the table actually exists.
///
/// @param identifier The table identifier containing database and table name.
/// @return A string representing the expected location of the table.
virtual std::string GetTableLocation(const Identifier& identifier) const = 0;
/// @return A result containing the expected location of the table, or an error status on
/// failure.
virtual Result<std::string> GetTableLocation(const Identifier& identifier) const = 0;

/// Returns the root path of the catalog.
///
Expand Down
4 changes: 4 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ struct PAIMON_EXPORT Options {
/// configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is
/// not applied to this path. No default value.
static const char BLOB_EXTERNAL_STORAGE_PATH[];
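/// "blob-view-upstream-warehouse" - The warehouse path of the upstream table used by Blob View.
/// Because catalog capabilities are only partially implemented, cpp paimon cannot obtain this
/// path automatically when Blob View is enabled, so the user must configure it manually.
/// Note: this option differs from the Java configuration and will be deprecated once
/// RestCatalog is supported. No default value.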
static const char BLOB_VIEW_UPSTREAM_WAREHOUSE[];
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clearly note that this differs from the Java configuration and will be deprecated once RestCatalog is supported.

/// "global-index.thread-num" - The maximum number of concurrent scanner for global index. No
Expand Down
3 changes: 3 additions & 0 deletions include/paimon/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class PAIMON_EXPORT Executor {

/// Shutdown the executor immediately, discarding all pending tasks.
virtual void ShutdownNow() = 0;

/// Get the thread number of this executor.
///
/// @return The thread number as an unsigned 32-bit integer.
virtual uint32_t GetThreadNum() const = 0;
};

} // namespace paimon
46 changes: 42 additions & 4 deletions src/paimon/common/data/blob_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "fmt/format.h"
#include "paimon/common/data/blob_defs.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/data/blob_view_struct.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/string_utils.h"
Expand Down Expand Up @@ -155,10 +156,47 @@ Status BlobUtils::ValidateInlineBlobDescriptors(
PAIMON_ASSIGN_OR_RAISE(bool is_descriptor,
BlobDescriptor::IsBlobDescriptor(value.data(), value.size()));
if (!is_descriptor) {
return Status::Invalid(fmt::format(
"BLOB inline field {} configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.",
field_name));
return Status::Invalid(
fmt::format("BLOB inline field {} configured by blob-descriptor-field require "
"values to be a BlobDescriptor.",
field_name));
}
}
}
return Status::OK();
}

Status BlobUtils::ValidateBlobViewFields(const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& view_fields) {
if (view_fields.empty()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge or reuse the logic in ValidateBlobViewFields and ValidateInlineBlobDescriptors; the two code paths currently differ by only a few lines.

return Status::OK();
}
if (!struct_array) {
return Status::Invalid("array in ValidateBlobViewFields must be a struct_array");
}
for (const auto& field_name : view_fields) {
auto field_array = struct_array->GetFieldByName(field_name);
if (!field_array) {
continue;
}
const auto* binary_array =
arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(field_array.get());
if (!binary_array) {
return Status::Invalid(
fmt::format("cannot cast array for field {} to LargeBinaryArray", field_name));
}
Comment on lines +182 to +187
for (int64_t row = 0; row < binary_array->length(); ++row) {
if (binary_array->IsNull(row)) {
continue;
}
auto value = binary_array->GetView(row);
PAIMON_ASSIGN_OR_RAISE(bool is_view,
BlobViewStruct::IsBlobViewStruct(value.data(), value.size()));
if (!is_view) {
return Status::Invalid(
fmt::format("BLOB inline field {} configured by blob-view-field require values "
"to be a BlobViewStruct.",
field_name));
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/data/blob_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class PAIMON_EXPORT BlobUtils {
const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& inline_descriptor_fields);

/// Validates that the configured blob view fields of a struct array hold BlobViewStruct values.
///
/// Fields listed in `view_fields` that are not present in `struct_array` are skipped, and so
/// are null values inside a present field.
///
/// @param struct_array The struct array whose columns are inspected.
/// @param view_fields Names of the fields expected to hold BlobViewStruct values. When empty,
///                    the check succeeds immediately.
/// @return OK when every inspected value is a BlobViewStruct; otherwise an error status, e.g.
///         Invalid when `struct_array` is null or a non-null value is not a BlobViewStruct.
static Status ValidateBlobViewFields(const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& view_fields);

/// Converts inline blob DataFields from large_binary to binary type.
/// Inline blob fields use large_binary in the table schema (because they are BLOB type),
/// but are stored as binary in data files. This conversion aligns the field type with
Expand Down
131 changes: 103 additions & 28 deletions src/paimon/common/data/blob_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#include "arrow/api.h"
#include "arrow/c/bridge.h"
#include "gtest/gtest.h"
#include "paimon/catalog/identifier.h"
#include "paimon/common/data/blob_defs.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/data/blob_view_struct.h"
#include "paimon/common/types/data_field.h"
#include "paimon/data/blob.h"
#include "paimon/memory/memory_pool.h"
Expand All @@ -29,44 +31,47 @@
namespace paimon::test {

class BlobUtilsTest : public ::testing::Test {
private:
public:
std::shared_ptr<arrow::KeyValueMetadata> CreateBlobMetadata() {
std::unordered_map<std::string, std::string> blob_metadata_map = {
{BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}};
return std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map);
}

private:
std::shared_ptr<MemoryPool> pool_ = GetDefaultPool();
};

TEST_F(BlobUtilsTest, IsBlobMetadata) {
auto correct_metadata = CreateBlobMetadata();
EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
ASSERT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
ASSERT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
std::unordered_map<std::string, std::string> wrong_metadata_map = {
{BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}};
auto wrong_metadata = std::make_shared<arrow::KeyValueMetadata>(wrong_metadata_map);
EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
ASSERT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
std::unordered_map<std::string, std::string> no_extension_metadata_map = {
{"other_key", BlobDefs::kExtensionTypeValue}};
auto no_extension_metadata =
std::make_shared<arrow::KeyValueMetadata>(no_extension_metadata_map);
EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));
ASSERT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));
}

TEST_F(BlobUtilsTest, IsBlobField) {
std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("f1", true);
EXPECT_TRUE(BlobUtils::IsBlobField(blob_field));
ASSERT_TRUE(BlobUtils::IsBlobField(blob_field));

auto int_field = arrow::field("i_int", arrow::int32());
EXPECT_FALSE(BlobUtils::IsBlobField(int_field));
ASSERT_FALSE(BlobUtils::IsBlobField(int_field));

auto binary_field_no_meta = arrow::field("b_no_meta", arrow::large_binary());
EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_no_meta));
ASSERT_FALSE(BlobUtils::IsBlobField(binary_field_no_meta));

auto wrong_meta = std::make_shared<arrow::KeyValueMetadata>(
std::unordered_map<std::string, std::string>{{"other_key", "value"}});
auto binary_field_wrong_meta =
arrow::field("b_wrong_meta", arrow::large_binary(), false, wrong_meta);
EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_wrong_meta));
ASSERT_FALSE(BlobUtils::IsBlobField(binary_field_wrong_meta));
}

TEST_F(BlobUtilsTest, SeparateBlobSchema) {
Expand Down Expand Up @@ -250,9 +255,8 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsFieldNotPresent) {

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithValidDescriptor) {
// Valid BlobDescriptor bytes -> OK
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);
auto serialized = descriptor->Serialize(pool_);

arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
Expand Down Expand Up @@ -282,36 +286,31 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithRawBytes) {
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(
BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}),
"BLOB inline field b0 configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.");
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}),
"BLOB inline field b0 configured by blob-descriptor-field require values "
"to be a BlobDescriptor.");
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsMixedValidAndInvalid) {
// First row is valid descriptor, second row is raw bytes -> error on row 1
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);

auto serialized = descriptor->Serialize(pool_);
arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
ASSERT_TRUE(builder.Append("raw_bytes_not_descriptor").ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(
BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}),
"BLOB inline field b0 configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.");
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}),
"BLOB inline field b0 configured by blob-descriptor-field require values "
"to be a BlobDescriptor.");
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsMultipleFields) {
// Two inline fields: b0 is valid, b1 has raw bytes -> error on b1
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);
auto serialized = descriptor->Serialize(pool_);

arrow::LargeBinaryBuilder b0_builder;
ASSERT_TRUE(b0_builder.Append(serialized->data(), serialized->size()).ok());
Expand All @@ -326,10 +325,86 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsMultipleFields) {
{BlobUtils::ToArrowField("b0"), BlobUtils::ToArrowField("b1")})
.ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(
BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0", "b1"}),
"BLOB inline field b1 configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.");
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0", "b1"}),
"BLOB inline field b1 configured by blob-descriptor-field require values "
"to be a BlobDescriptor.");
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsEmptyFields) {
    // With no view fields configured there is nothing to validate, so the call must succeed
    // regardless of what bytes the column holds.
    arrow::LargeBinaryBuilder view_builder;
    ASSERT_TRUE(view_builder.Append("random_data").ok());
    auto view_column = view_builder.Finish().ValueOrDie();

    auto made =
        arrow::StructArray::Make({view_column}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
    auto typed_struct = std::dynamic_pointer_cast<arrow::StructArray>(made);
    ASSERT_OK(BlobUtils::ValidateBlobViewFields(typed_struct, {}));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsFieldNotPresent) {
    // The requested view field does not exist in the struct array: it is skipped and the
    // validation reports OK.
    arrow::Int32Builder f0_builder;
    ASSERT_TRUE(f0_builder.Append(42).ok());
    auto f0_column = f0_builder.Finish().ValueOrDie();

    auto made =
        arrow::StructArray::Make({f0_column}, {arrow::field("f0", arrow::int32())}).ValueOrDie();
    auto typed_struct = std::dynamic_pointer_cast<arrow::StructArray>(made);
    ASSERT_OK(BlobUtils::ValidateBlobViewFields(typed_struct, {"view"}));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsWithValidViewStruct) {
    // A serialized BlobViewStruct is a legal value for a view field.
    BlobViewStruct view_struct(Identifier("db", "tbl"), /*field_id=*/2, /*row_id=*/5);
    auto view_bytes = view_struct.Serialize(pool_);

    arrow::LargeBinaryBuilder view_builder;
    ASSERT_TRUE(view_builder.Append(view_bytes->data(), view_bytes->size()).ok());
    auto view_column = view_builder.Finish().ValueOrDie();

    auto made =
        arrow::StructArray::Make({view_column}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
    auto typed_struct = std::dynamic_pointer_cast<arrow::StructArray>(made);
    ASSERT_OK(BlobUtils::ValidateBlobViewFields(typed_struct, {"view"}));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsWithNullValue) {
    // A null entry in the view column is skipped, so validation still succeeds.
    arrow::LargeBinaryBuilder view_builder;
    ASSERT_TRUE(view_builder.AppendNull().ok());
    auto view_column = view_builder.Finish().ValueOrDie();

    auto made =
        arrow::StructArray::Make({view_column}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
    auto typed_struct = std::dynamic_pointer_cast<arrow::StructArray>(made);
    ASSERT_OK(BlobUtils::ValidateBlobViewFields(typed_struct, {"view"}));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsWithRawBytes) {
    // Arbitrary bytes in a view field must be rejected with the BlobViewStruct error message.
    arrow::LargeBinaryBuilder view_builder;
    ASSERT_TRUE(view_builder.Append("raw_bytes_not_view").ok());
    auto view_column = view_builder.Finish().ValueOrDie();

    auto made =
        arrow::StructArray::Make({view_column}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
    auto typed_struct = std::dynamic_pointer_cast<arrow::StructArray>(made);
    ASSERT_NOK_WITH_MSG(BlobUtils::ValidateBlobViewFields(typed_struct, {"view"}),
                        "BLOB inline field view configured by blob-view-field require values to be "
                        "a BlobViewStruct.");
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsRejectsBlobDescriptor) {
    // A BlobDescriptor value is NOT accepted for a view field: only BlobViewStruct is valid.
    // Serialize with the fixture's shared pool_, consistent with the other tests in this file,
    // instead of creating a separate local pool.
    ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
    auto serialized = descriptor->Serialize(pool_);

    arrow::LargeBinaryBuilder builder;
    ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
    auto blob_array = builder.Finish().ValueOrDie();
    auto struct_array =
        arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
    auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
    ASSERT_NOK_WITH_MSG(BlobUtils::ValidateBlobViewFields(sa, {"view"}),
                        "BLOB inline field view configured by blob-view-field require values "
                        "to be a BlobViewStruct.");
}

TEST_F(BlobUtilsTest, TestConvertBlobInlineDataFields) {
Expand Down
46 changes: 46 additions & 0 deletions src/paimon/common/data/blob_view_struct_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,50 @@ TEST_F(BlobViewStructTest, TestEqual) {
}
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructValid) {
    // Bytes produced by Serialize() must be recognized as a BlobViewStruct.
    auto encoded = view_struct_.Serialize(pool_);
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobViewStruct::IsBlobViewStruct(encoded->data(), encoded->size()));
    ASSERT_TRUE(recognized);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithTooShortBuffer) {
    // An 8-byte buffer (shorter than 9 bytes) is reported as "not a BlobViewStruct" rather
    // than as an error.
    std::vector<char> truncated = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C};
    ASSERT_OK_AND_ASSIGN(bool truncated_recognized,
                         BlobViewStruct::IsBlobViewStruct(truncated.data(), truncated.size()));
    ASSERT_FALSE(truncated_recognized);

    // The same holds for an empty buffer passed as a null pointer with zero length.
    ASSERT_OK_AND_ASSIGN(bool empty_recognized, BlobViewStruct::IsBlobViewStruct(nullptr, 0));
    ASSERT_FALSE(empty_recognized);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithFutureVersion) {
    // A version byte greater than CURRENT_VERSION yields false, not an error status.
    auto encoded = view_struct_.Serialize(pool_);
    (*encoded)[0] = '\x02';  // bump the leading version byte to 2 (> CURRENT_VERSION)
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobViewStruct::IsBlobViewStruct(encoded->data(), encoded->size()));
    ASSERT_FALSE(recognized);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithWrongMagic) {
    // A payload whose magic number is damaged must not be recognized.
    auto encoded = view_struct_.Serialize(pool_);
    // Overwrite two bytes inside the magic region (bytes 1-8).
    (*encoded)[1] = '\x00';
    (*encoded)[2] = '\x00';
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobViewStruct::IsBlobViewStruct(encoded->data(), encoded->size()));
    ASSERT_FALSE(recognized);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithRandomData) {
    // Ten arbitrary bytes that do not follow the serialized layout are not a BlobViewStruct.
    std::vector<char> arbitrary = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
    ASSERT_OK_AND_ASSIGN(bool recognized,
                         BlobViewStruct::IsBlobViewStruct(arbitrary.data(), arbitrary.size()));
    ASSERT_FALSE(recognized);
}
} // namespace paimon::test
1 change: 1 addition & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ const char Options::BLOB_FIELD[] = "blob-field";
const char Options::BLOB_DESCRIPTOR_FIELD[] = "blob-descriptor-field";
const char Options::FALLBACK_BLOB_DESCRIPTOR_FIELD[] = "blob.stored-descriptor-fields";
const char Options::BLOB_VIEW_FIELD[] = "blob-view-field";
const char Options::BLOB_VIEW_UPSTREAM_WAREHOUSE[] = "blob-view-upstream-warehouse";
const char Options::BLOB_EXTERNAL_STORAGE_FIELD[] = "blob-external-storage-field";
const char Options::BLOB_EXTERNAL_STORAGE_PATH[] = "blob-external-storage-path";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
Expand Down
Loading
Loading