Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 85 additions & 5 deletions test/inte/blob_table_inte_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

#include <cstdint>
#include <cstdlib>
#include <filesystem>
#include <fstream>
#include <initializer_list>
#include <map>
#include <memory>
#include <numeric>
#include <optional>
#include <set>
#include <string>
#include <utility>
Expand All @@ -40,6 +40,7 @@
#include "paimon/common/data/binary_array_writer.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_row_writer.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/data/blob_view_struct.h"
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/table/special_fields.h"
Expand Down Expand Up @@ -345,19 +346,58 @@ class BlobTableInteTest : public testing::Test, public ::testing::WithParamInter
});
}

/// Describes how blob descriptor URIs should be redirected into a local copy of a table.
/// A default-constructed value (empty `table_path`) means "do not rewrite anything".
struct BlobDescriptorPathRewrite {
/// Root path of the table copy that rewritten URIs are joined onto.
/// When empty, TryRewriteDescriptorUri returns std::nullopt for every URI.
std::string table_path;
/// Directory names that are searched for in a descriptor URI as "/<dir>/".
/// They are tried in order; the first one found in the URI is used.
std::vector<std::string> table_relative_blob_dirs;
};

/// Tries to redirect a blob descriptor URI to the matching file below `rewrite.table_path`.
///
/// The directory names in `rewrite.table_relative_blob_dirs` are tried in order. For the
/// first name whose marker "/<dir>/" occurs in `descriptor_uri`, the part of the URI
/// starting at "<dir>/" (first occurrence) is joined onto `rewrite.table_path` with
/// PathUtil::JoinPath and returned.
///
/// @param descriptor_uri URI stored in the blob descriptor.
/// @param rewrite Target table path and candidate blob directory names.
/// @param fs Not used by the rewrite itself; kept so existing call sites stay unchanged.
/// @return The rewritten path, or std::nullopt when `rewrite.table_path` is empty or no
///         non-empty directory name matches the URI.
static std::optional<std::string> TryRewriteDescriptorUri(
    const std::string& descriptor_uri, const BlobDescriptorPathRewrite& rewrite,
    const std::shared_ptr<LocalFileSystem>& fs) {
    (void)fs;  // The rewrite is pure string manipulation; no file system access is needed.

    if (rewrite.table_path.empty()) {
        return std::nullopt;
    }

    for (const auto& blob_dir : rewrite.table_relative_blob_dirs) {
        if (blob_dir.empty()) {
            // An empty name would yield the marker "//", which matches the scheme separator
            // of any "scheme://..." URI and would produce a bogus rewritten path.
            continue;
        }
        const std::string marker = "/" + blob_dir + "/";
        const auto marker_pos = descriptor_uri.find(marker);
        if (marker_pos == std::string::npos) {
            continue;
        }
        // Skip the marker's leading '/' so the remainder starts with "<dir>/...".
        return PathUtil::JoinPath(rewrite.table_path, descriptor_uri.substr(marker_pos + 1));
    }
    return std::nullopt;
}

/// Convert a StructArray with serialized BlobDescriptor bytes back to a StructArray
/// with raw blob bytes. Only blob fields are resolved; other columns (including
/// _VALUE_KIND) are kept as-is.
Result<std::shared_ptr<arrow::StructArray>> ConvertDescriptorToRawBlob(
const std::shared_ptr<arrow::StructArray>& desc_array,
const std::set<std::string>& blob_fields) const {
const std::set<std::string>& blob_fields,
const BlobDescriptorPathRewrite& rewrite = {}) const {
auto fs = std::make_shared<LocalFileSystem>();
return TransformBlobFields(
desc_array, blob_fields,
[&](const std::string_view& descriptor_bytes,
arrow::LargeBinaryBuilder* builder) -> Status {
PAIMON_ASSIGN_OR_RAISE(auto blob, Blob::FromDescriptor(descriptor_bytes.data(),
descriptor_bytes.size()));
PAIMON_ASSIGN_OR_RAISE(
auto descriptor,
BlobDescriptor::Deserialize(descriptor_bytes.data(), descriptor_bytes.size()));
std::string descriptor_uri = descriptor->Uri();
auto rewritten_uri = TryRewriteDescriptorUri(descriptor_uri, rewrite, fs);
if (rewritten_uri.has_value()) {
descriptor_uri = rewritten_uri.value();
}

PAIMON_ASSIGN_OR_RAISE(
auto rewritten_descriptor,
BlobDescriptor::Create(descriptor->Version(), descriptor_uri,
descriptor->Offset(), descriptor->Length()));
auto rewritten_descriptor_bytes = rewritten_descriptor->Serialize(pool_);
PAIMON_ASSIGN_OR_RAISE(auto blob,
Blob::FromDescriptor(rewritten_descriptor_bytes->data(),
rewritten_descriptor_bytes->size()));
PAIMON_ASSIGN_OR_RAISE(auto data, blob->ToData(fs, pool_));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(data->data(), data->size()));
return Status::OK();
Expand Down Expand Up @@ -3008,4 +3048,44 @@ TEST_P(BlobTableInteTest, TestBlobViewWithFallbackPath) {
<< "expected:" << expected_with_rk->ToString();
}

/// Reads a Java-generated table whose b0/b1 columns are blob descriptor fields and checks
/// that, after resolving the descriptors back to raw bytes, the content matches the rows
/// that were originally written.
TEST_P(BlobTableInteTest, TestReadBlobDescriptorFieldFromJava) {
    auto file_format = GetParam();
    // Test data is only provided for the orc and parquet formats.
    if (file_format != "orc" && file_format != "parquet") {
        return;
    }
    std::string table_path =
        GetDataDir() + "/" + file_format +
        "/blob_desc_field_with_external_path.db/blob_desc_field_with_external_path";
    arrow::FieldVector fields = {
        arrow::field("f0", arrow::int32()), BlobUtils::ToArrowField("b0", true),
        BlobUtils::ToArrowField("b1", true), BlobUtils::ToArrowField("b2", true),
        BlobUtils::ToArrowField("b3", true)};
    auto schema = arrow::schema(fields);
    // b0: all non-null, b1: has nulls, b2: all non-null, b3: has nulls
    std::string raw_json = R"([
        [1, "img_0", null, "raw_2_0", "raw_3_0"],
        [2, "img_1", "vid_1", "raw_2_1", null ],
        [3, "img_2", null, "raw_2_2", "raw_3_2" ]
    ])";
    auto raw_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), raw_json).ValueOrDie());
    // Fail the test instead of dereferencing a null pointer if the cast does not succeed.
    ASSERT_TRUE(raw_array) << "expected data is not a StructArray";

    ASSERT_OK_AND_ASSIGN(auto plan, ScanTable(table_path));
    std::map<std::string, std::string> read_options = {{Options::BLOB_AS_DESCRIPTOR, "false"}};
    ASSERT_OK_AND_ASSIGN(auto result, ReadTable(table_path, schema->field_names(), plan,
                                                /*predicate=*/nullptr, read_options));
    ASSERT_TRUE(result.chunked_array);
    auto read_concat = arrow::Concatenate(result.chunked_array->chunks()).ValueOrDie();
    auto read_struct = std::dynamic_pointer_cast<arrow::StructArray>(read_concat);
    ASSERT_TRUE(read_struct) << "read result is not a StructArray";

    // After read, b0 and b1 are both descriptor-stored; resolve them back to raw bytes.
    // Java-generated descriptors may contain absolute paths from the generation machine.
    // Rewrite them to the portable blob directories inside the copied table path.
    BlobDescriptorPathRewrite rewrite{table_path, {"raw_blob", "external_blob"}};
    ASSERT_OK_AND_ASSIGN(auto resolved,
                         ConvertDescriptorToRawBlob(read_struct, {"b0", "b1"}, rewrite));
    ASSERT_OK_AND_ASSIGN(auto expected_with_rk, PrependRowKindColumn(raw_array));
    // Print both sides on mismatch so a failure can be diagnosed from the test log.
    ASSERT_TRUE(resolved->Equals(expected_with_rk))
        << "result:" << resolved->ToString() << '\n'
        << "expected:" << expected_with_rk->ToString();
}

} // namespace paimon::test
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
f0:int b0:blob b1:blob b2:blob b3:blob (all can be null)
bucket count: -1
target-file-size: 700
row-tracking.enabled: true
data-evolution.enabled: true
blob-descriptor-field: b0,b1
blob-external-storage-field: b1
blob-external-storage-path: <table>/external_blob (absolute path at generation time)

b0: descriptor field, inline in main file, source .bin files in raw_blob/
b1: descriptor field, repacked to external storage in external_blob/
b2: regular blob, written to .blob files
b3: regular blob, written to .blob files

Note: b0 is passed as descriptor via Blob.fromLocal(); b1/b2/b3 are raw bytes.
Paimon auto-converts b1 to descriptor internally.

Msgs:
snapshot-1
write field: "f0", "b0", "b1", "b2", "b3"
Add: 1, "img_0", null, "raw_2_0", "raw_3_0"
Add: 2, "img_1", "vid_1", "raw_2_1", null
Add: 3, "img_2", null, "raw_2_2", "raw_3_2"
NoCompact

C++ read note:
Descriptor URIs contain absolute paths from the Java generation machine.
ConvertDescriptorToRawBlob uses BlobDescriptorPathRewrite{table_path, {"raw_blob", "external_blob"}}
to redirect them to <table>/raw_blob/ and <table>/external_blob/ at read time.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
img_0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
img_1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
img_2
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"version" : 3,
"id" : 0,
"fields" : [ {
"id" : 0,
"name" : "f0",
"type" : "INT"
}, {
"id" : 1,
"name" : "b0",
"type" : "BLOB"
}, {
"id" : 2,
"name" : "b1",
"type" : "BLOB"
}, {
"id" : 3,
"name" : "b2",
"type" : "BLOB"
}, {
"id" : 4,
"name" : "b3",
"type" : "BLOB"
} ],
"highestFieldId" : 4,
"partitionKeys" : [ ],
"primaryKeys" : [ ],
"options" : {
"bucket" : "-1",
"row-tracking.enabled" : "true",
"blob-external-storage-path" : "external_blob",
"target-file-size" : "700",
"blob-external-storage-field" : "b1",
"data-evolution.enabled" : "true",
"file-system" : "local",
"manifest.format" : "orc",
"file.format" : "orc",
"blob-descriptor-field" : "b0,b1"
},
"timeMillis" : 1781088844975
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version" : 3,
"id" : 1,
"schemaId" : 0,
"baseManifestList" : "manifest-list-85cd267b-28d7-4aab-93b8-cf7e8ac57c07-0",
"baseManifestListSize" : 392,
"deltaManifestList" : "manifest-list-85cd267b-28d7-4aab-93b8-cf7e8ac57c07-1",
"deltaManifestListSize" : 1510,
"commitUser" : "f69bd0bf-9271-465c-a6ea-60001dfe02ff",
"commitIdentifier" : 0,
"commitKind" : "APPEND",
"timeMillis" : 1781088845796,
"totalRecordCount" : 9,
"deltaRecordCount" : 9,
"nextRowId" : 3
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
f0:int b0:blob b1:blob b2:blob b3:blob (all can be null)
bucket count: -1
target-file-size: 700
row-tracking.enabled: true
data-evolution.enabled: true
blob-descriptor-field: b0,b1
blob-external-storage-field: b1
blob-external-storage-path: <table>/external_blob (absolute path at generation time)

b0: descriptor field, inline in main file, source .bin files in raw_blob/
b1: descriptor field, repacked to external storage in external_blob/
b2: regular blob, written to .blob files
b3: regular blob, written to .blob files

Note: b0 is passed as descriptor via Blob.fromLocal(); b1/b2/b3 are raw bytes.
Paimon auto-converts b1 to descriptor internally.

Msgs:
snapshot-1
write field: "f0", "b0", "b1", "b2", "b3"
Add: 1, "img_0", null, "raw_2_0", "raw_3_0"
Add: 2, "img_1", "vid_1", "raw_2_1", null
Add: 3, "img_2", null, "raw_2_2", "raw_3_2"
NoCompact

C++ read note:
Descriptor URIs contain absolute paths from the Java generation machine.
ConvertDescriptorToRawBlob uses BlobDescriptorPathRewrite{table_path, {"raw_blob", "external_blob"}}
to redirect them to <table>/raw_blob/ and <table>/external_blob/ at read time.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
img_0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
img_1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
img_2
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"version" : 3,
"id" : 0,
"fields" : [ {
"id" : 0,
"name" : "f0",
"type" : "INT"
}, {
"id" : 1,
"name" : "b0",
"type" : "BLOB"
}, {
"id" : 2,
"name" : "b1",
"type" : "BLOB"
}, {
"id" : 3,
"name" : "b2",
"type" : "BLOB"
}, {
"id" : 4,
"name" : "b3",
"type" : "BLOB"
} ],
"highestFieldId" : 4,
"partitionKeys" : [ ],
"primaryKeys" : [ ],
"options" : {
"bucket" : "-1",
"row-tracking.enabled" : "true",
"blob-external-storage-path" : "external_blob",
"target-file-size" : "700",
"blob-external-storage-field" : "b1",
"data-evolution.enabled" : "true",
"file-system" : "local",
"manifest.format" : "avro",
"file.format" : "parquet",
"blob-descriptor-field" : "b0,b1"
},
"timeMillis" : 1781088620905
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version" : 3,
"id" : 1,
"schemaId" : 0,
"baseManifestList" : "manifest-list-7395f790-699b-4a38-8747-10f23ceba1d6-0",
"baseManifestListSize" : 1006,
"deltaManifestList" : "manifest-list-7395f790-699b-4a38-8747-10f23ceba1d6-1",
"deltaManifestListSize" : 1115,
"commitUser" : "d30e72bf-d9bb-4de4-a178-f11f2d2f4fa5",
"commitIdentifier" : 0,
"commitKind" : "APPEND",
"timeMillis" : 1781088622173,
"totalRecordCount" : 9,
"deltaRecordCount" : 9,
"nextRowId" : 3
}
Loading