diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 70f2aa0b0..2aa67b830 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -365,6 +365,20 @@ struct PAIMON_EXPORT Options { /// "partition.legacy-name" - The legacy partition name is using `ToString` for all types. If /// false, using casting to string for all types. Default value is "true". static const char PARTITION_GENERATE_LEGACY_NAME[]; + /// "map-storage-layout" - Suffix for per-column MAP storage layout configuration. + /// Used as `fields..map-storage-layout`. Values: "default" (standard KV arrays) + /// or "extend" (columnar-extend with column reuse). Default is "default". + /// The column must be of type MAP. Each column must be configured individually. + /// For example, to enable extend layout for two columns "metrics" and "tags": + /// fields.metrics.map-storage-layout = extend + /// fields.tags.map-storage-layout = extend + static const char MAP_STORAGE_LAYOUT[]; + /// "map-extend.max-columns" - Suffix for per-column upper bound K_max configuration. + /// Used as `fields..map-extend.max-columns`. Only effective when + /// map-storage-layout = extend. Rows with more fields than K_max spill to __overflow. + /// Default value is 256. Each column can have its own max-columns setting. + static const char MAP_EXTEND_MAX_COLUMNS[]; + /// "blob-as-descriptor" - Read blob field using blob descriptor rather than blob /// bytes. Default value is "false". static const char BLOB_AS_DESCRIPTOR[]; diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index a4e919030..a39fe7c29 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -137,6 +137,7 @@ set(PAIMON_COMMON_SRCS common/utils/bloom_filter64.cpp common/utils/crc32c.cpp common/utils/decimal_utils.cpp + common/utils/extend_map_utils.cpp common/utils/delta_varint_compressor.cpp common/utils/fields_comparator.cpp common/utils/path_util.cpp @@ -531,6 +532,7 @@ if(PAIMON_BUILD_TESTS) common/utils/decimal_utils_test.cpp common/utils/threadsafe_queue_test.cpp common/utils/generic_lru_cache_test.cpp + common/utils/extend_map_utils_test.cpp STATIC_LINK_LIBS paimon_shared test_utils_static diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index b328213d6..153388350 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -91,6 +91,8 @@ const char Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[] = "row-tracking.partition-group-on-commit"; const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled"; const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name"; +const char Options::MAP_STORAGE_LAYOUT[] = "map-storage-layout"; +const char Options::MAP_EXTEND_MAX_COLUMNS[] = "map-extend.max-columns"; const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor"; const char Options::BLOB_FIELD[] = "blob-field"; const char Options::BLOB_DESCRIPTOR_FIELD[] = "blob-descriptor-field"; diff --git a/src/paimon/common/utils/extend_map_defs.h b/src/paimon/common/utils/extend_map_defs.h new file mode 100644 index 000000000..0f5b3b9b9 --- /dev/null +++ b/src/paimon/common/utils/extend_map_defs.h @@ -0,0 +1,90 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "fmt/format.h" +namespace paimon { + +/// Constants for the columnar-extend MAP storage layout. +/// Includes file footer meta keys and physical sub-column names. +struct ExtendMapDefine { + // ---- File footer meta keys ---- + + /// Version of the extend-map meta format. + static constexpr const char* kVersion = "paimon.map-extend.version"; + /// Current meta format version. + static constexpr int32_t kCurrentVersion = 1; + + /// Marker key indicating this file uses extend layout. Value is the layout type string. + static constexpr const char* kStorageLayout = "paimon.map-extend.storage-layout"; + /// Value for kStorageLayout when using extend layout. + static constexpr const char* kStorageLayoutExtend = "extend"; + /// JSON-encoded field name <-> field id dictionary (may be compressed). + static constexpr const char* kFieldDict = "paimon.map-extend.field-dict"; + /// Original (uncompressed) size of field_dict value. + static constexpr const char* kFieldDictOriginalSize = + "paimon.map-extend.field-dict-original-size"; + /// JSON-encoded field_id -> set of physical column indices. + static constexpr const char* kFieldColumns = "paimon.map-extend.field-columns"; + /// JSON-encoded set of field_ids that ever spilled into __overflow. + static constexpr const char* kOverflowSet = "paimon.map-extend.overflow-set"; + /// The number of physical columns K used in this file. + static constexpr const char* kNumColumns = "paimon.map-extend.num-columns"; + /// The maximum row width observed in this file. + static constexpr const char* kMaxRowWidth = "paimon.map-extend.max-row-width"; + + // ---- Physical sub-column names ---- + + /// Per-row field mapping column name. + static constexpr const char* kFieldMapping = "__field_mapping"; + /// Overflow column name. + static constexpr const char* kOverflow = "__overflow"; + + /// Returns the name of the i-th physical column: "__col_0", "__col_1", etc. + static std::string PhysicalColumnName(int32_t index) { + return fmt::format("__col_{}", index); + } +}; + +/// Parsed file-level meta for one columnar-extend MAP column. +struct ExtendMapFileMeta { + /// field_name -> field_id + std::map name_to_id; + /// field_id -> set of physical column indices S + std::map> field_to_columns; + /// Set of field_ids that ever spilled into __overflow + std::set overflow_field_set; + /// Number of physical columns K in this file + int32_t num_columns = 0; + /// Maximum row width observed in this file + int32_t max_row_width = 0; + + bool operator==(const ExtendMapFileMeta& other) const { + return name_to_id == other.name_to_id && field_to_columns == other.field_to_columns && + overflow_field_set == other.overflow_field_set && num_columns == other.num_columns && + max_row_width == other.max_row_width; + } +}; + +} // namespace paimon diff --git a/src/paimon/common/utils/extend_map_utils.cpp b/src/paimon/common/utils/extend_map_utils.cpp new file mode 100644 index 000000000..800b37d31 --- /dev/null +++ b/src/paimon/common/utils/extend_map_utils.cpp @@ -0,0 +1,370 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/extend_map_utils.h" + +#include +#include + +#include "arrow/type.h" +#include "arrow/util/key_value_metadata.h" +#include "fmt/format.h" +#include "paimon/common/compression/block_compression_factory.h" +#include "paimon/common/compression/block_compressor.h" +#include "paimon/common/compression/block_decompressor.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/core/options/map_storage_layout.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" + +namespace paimon { +// ---- Column detection ---- + +bool ExtendMapUtils::IsStringKeyMap(const std::shared_ptr& arrow_type) { + if (arrow_type->id() != arrow::Type::MAP) { + return false; + } + auto map_type = std::static_pointer_cast(arrow_type); + return map_type->key_type()->id() == arrow::Type::STRING; +} + +Result> ExtendMapUtils::DetectExtendColumns( + const std::shared_ptr& schema, const CoreOptions& options) { + std::vector indices; + for (int32_t i = 0; i < schema->num_fields(); ++i) { + const auto& field = schema->field(i); + if (!IsStringKeyMap(field->type())) { + continue; + } + PAIMON_ASSIGN_OR_RAISE(MapStorageLayout layout, options.GetMapStorageLayout(field->name())); + if (layout == MapStorageLayout::EXTEND) { + indices.push_back(i); + } + } + return indices; +} + +// ---- Schema conversion ---- + +std::shared_ptr ExtendMapUtils::BuildPhysicalStructType( + const std::shared_ptr& value_type, int32_t num_columns, bool value_nullable) { + arrow::FieldVector struct_fields; + struct_fields.reserve(num_columns + 2); + + struct_fields.push_back( + arrow::field(ExtendMapDefine::kFieldMapping, arrow::list(arrow::int32()), false)); + + for (int32_t i = 0; i < num_columns; ++i) { + struct_fields.push_back( + arrow::field(ExtendMapDefine::PhysicalColumnName(i), value_type, value_nullable)); + } + + struct_fields.push_back(arrow::field( + ExtendMapDefine::kOverflow, + arrow::map(arrow::int32(), arrow::field("value", value_type, value_nullable)), true)); + + return arrow::struct_(std::move(struct_fields)); +} + +Result> ExtendMapUtils::LogicalToPhysicalSchema( + const std::shared_ptr& logical_schema, + const std::map& column_to_num_columns) { + arrow::FieldVector physical_fields; + physical_fields.reserve(logical_schema->num_fields()); + + for (int32_t i = 0; i < logical_schema->num_fields(); ++i) { + const auto& field = logical_schema->field(i); + auto it = column_to_num_columns.find(i); + if (it != column_to_num_columns.end()) { + auto map_type = std::static_pointer_cast(field->type()); + auto value_type = map_type->item_type(); + bool value_nullable = map_type->item_field()->nullable(); + auto physical_type = BuildPhysicalStructType(value_type, it->second, value_nullable); + physical_fields.push_back( + arrow::field(field->name(), physical_type, field->nullable())); + } else { + physical_fields.push_back(field); + } + } + + return arrow::schema(std::move(physical_fields)); +} + +Result> ExtendMapUtils::BuildColumnToNumColumns( + const std::vector& extend_column_indices, const std::shared_ptr& schema, + const CoreOptions& options) { + std::map column_to_num_columns; + for (int32_t col_index : extend_column_indices) { + const std::string& field_name = schema->field(col_index)->name(); + PAIMON_ASSIGN_OR_RAISE(int32_t max_columns, options.GetMapExtendMaxColumns(field_name)); + column_to_num_columns[col_index] = max_columns; + } + return column_to_num_columns; +} + +// ---- Metadata serialization helpers ---- + +namespace { + +std::string JsonEncodeObject( + std::function builder) { + rapidjson::Document doc(rapidjson::kObjectType); + auto allocator = doc.GetAllocator(); + builder(&doc, &allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + return buffer.GetString(); +} + +std::string JsonEncodeArray( + std::function builder) { + rapidjson::Document doc(rapidjson::kArrayType); + auto allocator = doc.GetAllocator(); + builder(&doc, &allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + return buffer.GetString(); +} + +Result CompressString(const std::string& input, const std::string& compression) { + CompressOptions compress_opts{compression, /*zstd_level=*/1}; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr factory, + BlockCompressionFactory::Create(compress_opts)); + std::shared_ptr compressor = factory->GetCompressor(); + if (!compressor) { + return input; + } + + auto src_size = static_cast(input.size()); + int32_t max_compressed = compressor->GetMaxCompressedSize(src_size); + std::string output(max_compressed, '\0'); + + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_size, + compressor->Compress(input.data(), src_size, output.data(), max_compressed)); + + output.resize(actual_size); + return output; +} + +Result DecompressString(const std::string& input, int32_t original_len, + const std::string& compression) { + CompressOptions compress_opts{compression, /*zstd_level=*/1}; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr factory, + BlockCompressionFactory::Create(compress_opts)); + std::shared_ptr decompressor = factory->GetDecompressor(); + if (!decompressor) { + return input; + } + std::string output(original_len, '\0'); + PAIMON_ASSIGN_OR_RAISE( + int32_t decompressed_len, + decompressor->Decompress(input.data(), static_cast(input.size()), output.data(), + original_len)); + output.resize(decompressed_len); + return output; +} + +Result GetRequiredValue(const std::shared_ptr& metadata, + const char* key) { + int32_t index = metadata->FindKey(key); + if (index < 0) { + return Status::Invalid(fmt::format("missing extend metadata key: {}", key)); + } + return metadata->value(index); +} + +Result GetRequiredInt32(const std::shared_ptr& metadata, + const char* key) { + PAIMON_ASSIGN_OR_RAISE(std::string value, GetRequiredValue(metadata, key)); + std::optional parsed = StringUtils::StringToValue(value); + if (!parsed.has_value()) { + return Status::Invalid(fmt::format("malformed extend metadata value for key: {}", key)); + } + return parsed.value(); +} + +std::string SerializeFieldDict(const ExtendMapFileMeta& file_meta) { + return JsonEncodeObject([&](rapidjson::Document* doc, + rapidjson::Document::AllocatorType* alloc) { + for (const auto& [name, id] : file_meta.name_to_id) { + doc->AddMember(rapidjson::Value(name.c_str(), *alloc), rapidjson::Value(id), *alloc); + } + }); +} + +std::string SerializeFieldColumns(const ExtendMapFileMeta& file_meta) { + return JsonEncodeObject( + [&](rapidjson::Document* doc, rapidjson::Document::AllocatorType* alloc) { + for (const auto& [field_id, col_vec] : file_meta.field_to_columns) { + rapidjson::Value array(rapidjson::kArrayType); + std::vector sorted_cols(col_vec.begin(), col_vec.end()); + std::sort(sorted_cols.begin(), sorted_cols.end()); + for (int32_t col : sorted_cols) { + array.PushBack(col, *alloc); + } + std::string key = std::to_string(field_id); + doc->AddMember(rapidjson::Value(key.c_str(), *alloc), array, *alloc); + } + }); +} + +std::string SerializeOverflowSet(const ExtendMapFileMeta& file_meta) { + return JsonEncodeArray( + [&](rapidjson::Document* doc, rapidjson::Document::AllocatorType* alloc) { + std::vector sorted(file_meta.overflow_field_set.begin(), + file_meta.overflow_field_set.end()); + std::sort(sorted.begin(), sorted.end()); + for (int32_t field_id : sorted) { + doc->PushBack(field_id, *alloc); + } + }); +} + +Result> DeserializeFieldDict(const std::string& json_str) { + rapidjson::Document doc; + doc.Parse(json_str.c_str()); + if (doc.HasParseError() || !doc.IsObject()) { + return Status::Invalid("malformed extend field_dict metadata"); + } + std::map name_to_id; + for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) { + name_to_id[it->name.GetString()] = it->value.GetInt(); + } + return name_to_id; +} + +Result>> DeserializeFieldColumns( + const std::string& json_str) { + rapidjson::Document doc; + doc.Parse(json_str.c_str()); + if (doc.HasParseError() || !doc.IsObject()) { + return Status::Invalid("malformed extend field_columns metadata"); + } + std::map> field_to_columns; + for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) { + std::optional field_id = StringUtils::StringToValue(it->name.GetString()); + if (!field_id.has_value()) { + return Status::Invalid("malformed extend field_columns: invalid field_id key"); + } + const auto& array = it->value; + if (!array.IsArray()) { + return Status::Invalid("malformed extend field_columns: value is not array"); + } + std::vector cols; + cols.reserve(array.Size()); + for (rapidjson::SizeType i = 0; i < array.Size(); ++i) { + cols.push_back(array[i].GetInt()); + } + field_to_columns[field_id.value()] = std::move(cols); + } + return field_to_columns; +} + +Result> DeserializeOverflowSet(const std::string& json_str) { + rapidjson::Document doc; + doc.Parse(json_str.c_str()); + if (doc.HasParseError() || !doc.IsArray()) { + return Status::Invalid("malformed extend overflow_set metadata"); + } + std::set overflow_set; + for (rapidjson::SizeType i = 0; i < doc.Size(); ++i) { + overflow_set.insert(doc[i].GetInt()); + } + return overflow_set; +} + +} // namespace + +Status ExtendMapUtils::SerializeMetadata(const ExtendMapFileMeta& file_meta, + const std::string& compression, + arrow::KeyValueMetadata* metadata) { + metadata->Append(ExtendMapDefine::kVersion, std::to_string(ExtendMapDefine::kCurrentVersion)); + metadata->Append(ExtendMapDefine::kStorageLayout, ExtendMapDefine::kStorageLayoutExtend); + + std::string field_dict_json = SerializeFieldDict(file_meta); + metadata->Append(ExtendMapDefine::kFieldDictOriginalSize, + std::to_string(field_dict_json.size())); + PAIMON_ASSIGN_OR_RAISE(std::string compressed_dict, + CompressString(field_dict_json, compression)); + metadata->Append(ExtendMapDefine::kFieldDict, std::move(compressed_dict)); + + metadata->Append(ExtendMapDefine::kFieldColumns, SerializeFieldColumns(file_meta)); + metadata->Append(ExtendMapDefine::kOverflowSet, SerializeOverflowSet(file_meta)); + metadata->Append(ExtendMapDefine::kNumColumns, std::to_string(file_meta.num_columns)); + metadata->Append(ExtendMapDefine::kMaxRowWidth, std::to_string(file_meta.max_row_width)); + + return Status::OK(); +} + +Result ExtendMapUtils::DeserializeMetadata( + const std::shared_ptr& metadata, const std::string& compression) { + if (!metadata) { + return Status::Invalid("metadata is null"); + } + PAIMON_ASSIGN_OR_RAISE(int32_t version, GetRequiredInt32(metadata, ExtendMapDefine::kVersion)); + if (version != ExtendMapDefine::kCurrentVersion) { + return Status::Invalid( + fmt::format("unsupported extend-map metadata version: {}, expected: {}", version, + ExtendMapDefine::kCurrentVersion)); + } + + ExtendMapFileMeta result; + + // field_dict (compressed) + PAIMON_ASSIGN_OR_RAISE(int32_t original_len, + GetRequiredInt32(metadata, ExtendMapDefine::kFieldDictOriginalSize)); + PAIMON_ASSIGN_OR_RAISE(std::string compressed_dict, + GetRequiredValue(metadata, ExtendMapDefine::kFieldDict)); + PAIMON_ASSIGN_OR_RAISE(std::string field_dict_json, + DecompressString(compressed_dict, original_len, compression)); + PAIMON_ASSIGN_OR_RAISE(result.name_to_id, DeserializeFieldDict(field_dict_json)); + + // field_columns + PAIMON_ASSIGN_OR_RAISE(std::string field_columns_json, + GetRequiredValue(metadata, ExtendMapDefine::kFieldColumns)); + PAIMON_ASSIGN_OR_RAISE(result.field_to_columns, DeserializeFieldColumns(field_columns_json)); + + // overflow_set + PAIMON_ASSIGN_OR_RAISE(std::string overflow_json, + GetRequiredValue(metadata, ExtendMapDefine::kOverflowSet)); + PAIMON_ASSIGN_OR_RAISE(result.overflow_field_set, DeserializeOverflowSet(overflow_json)); + + // num_columns & max_row_width + PAIMON_ASSIGN_OR_RAISE(result.num_columns, + GetRequiredInt32(metadata, ExtendMapDefine::kNumColumns)); + PAIMON_ASSIGN_OR_RAISE(result.max_row_width, + GetRequiredInt32(metadata, ExtendMapDefine::kMaxRowWidth)); + + return result; +} + +bool ExtendMapUtils::HasExtendMetadata(const std::shared_ptr& metadata) { + if (!metadata) { + return false; + } + auto index = metadata->FindKey(ExtendMapDefine::kStorageLayout); + if (index < 0) { + return false; + } + return metadata->value(index) == ExtendMapDefine::kStorageLayoutExtend; +} + +} // namespace paimon diff --git a/src/paimon/common/utils/extend_map_utils.h b/src/paimon/common/utils/extend_map_utils.h new file mode 100644 index 000000000..10bbae1fa --- /dev/null +++ b/src/paimon/common/utils/extend_map_utils.h @@ -0,0 +1,112 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "paimon/common/utils/extend_map_defs.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class KeyValueMetadata; +class Schema; +} // namespace arrow + +namespace paimon { + +class CoreOptions; + +/// Utility functions for columnar-extend MAP storage layout. +class ExtendMapUtils { + public: + ExtendMapUtils() = delete; + ~ExtendMapUtils() = delete; + + // ---- Column detection ---- + + /// Checks whether a given arrow field is MAP (the type prerequisite for extend). + /// @param arrow_type The Arrow data type of the column. + /// @return true if the type is MAP. + static bool IsStringKeyMap(const std::shared_ptr& arrow_type); + + /// Finds all extend MAP column indices in a schema by checking per-column config + /// via CoreOptions. + /// @param schema The logical Arrow schema. + /// @param options CoreOptions containing per-column configuration. + /// @return Vector of column indices whose map-storage-layout is "extend", or error + /// if validation fails. + static Result> DetectExtendColumns( + const std::shared_ptr& schema, const CoreOptions& options); + + // ---- Schema conversion ---- + + /// Converts a logical schema to a physical schema by replacing extend MAP columns + /// with their physical Struct representation. + /// @param logical_schema The original schema with MAP columns. + /// @param column_to_num_columns Map from column index to its physical column count K. + /// Each extend column can have its own width. + /// @return The physical schema for file writing. + static Result> LogicalToPhysicalSchema( + const std::shared_ptr& logical_schema, + const std::map& column_to_num_columns); + + /// Builds column_to_num_columns map from DetectExtendColumns result and CoreOptions. + /// @param extend_column_indices Indices returned by DetectExtendColumns. + /// @param schema The logical Arrow schema (used to get field names). + /// @param options CoreOptions containing per-column max-columns config. + /// @return Map from column index to K (max physical columns for that column). + static Result> BuildColumnToNumColumns( + const std::vector& extend_column_indices, + const std::shared_ptr& schema, const CoreOptions& options); + + // ---- Metadata serialization ---- + + /// Serializes extend metadata and appends entries to an existing KeyValueMetadata. + /// @param file_meta The file-level extend metadata to serialize. + /// @param compression Compression codec name for field_dict compression. + /// @param[out] metadata The KeyValueMetadata to append entries to. + static Status SerializeMetadata(const ExtendMapFileMeta& file_meta, + const std::string& compression, + arrow::KeyValueMetadata* metadata); + + /// Deserializes extend metadata from file footer KeyValueMetadata. + /// @param metadata The KeyValueMetadata from file footer. + /// @param compression Compression codec name. + /// @return Parsed ExtendMapFileMeta, or error if metadata is missing/malformed. + static Result DeserializeMetadata( + const std::shared_ptr& metadata, const std::string& compression); + + /// Checks whether a KeyValueMetadata contains extend MAP metadata. + static bool HasExtendMetadata(const std::shared_ptr& metadata); + + private: + /// Builds the physical Arrow type for one extend MAP column. + /// @param value_type The value type of the original MAP. + /// @param num_columns Number of physical columns K. + /// @param value_nullable Whether the MAP's value field is nullable. + static std::shared_ptr BuildPhysicalStructType( + const std::shared_ptr& value_type, int32_t num_columns, + bool value_nullable); +}; + +} // namespace paimon diff --git a/src/paimon/common/utils/extend_map_utils_test.cpp b/src/paimon/common/utils/extend_map_utils_test.cpp new file mode 100644 index 000000000..4daddf7cd --- /dev/null +++ b/src/paimon/common/utils/extend_map_utils_test.cpp @@ -0,0 +1,330 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/extend_map_utils.h" + +#include "arrow/type.h" +#include "arrow/util/key_value_metadata.h" +#include "gtest/gtest.h" +#include "paimon/core/core_options.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +// ---- IsStringKeyMap ---- + +TEST(ExtendMapUtilsTest, IsStringKeyMap) { + ASSERT_TRUE(ExtendMapUtils::IsStringKeyMap(arrow::map(arrow::utf8(), arrow::int32()))); + ASSERT_TRUE(ExtendMapUtils::IsStringKeyMap(arrow::map(arrow::utf8(), arrow::float64()))); + // Nested value type (struct) + auto nested_value = + arrow::struct_({arrow::field("x", arrow::int32()), arrow::field("y", arrow::utf8())}); + ASSERT_TRUE(ExtendMapUtils::IsStringKeyMap(arrow::map(arrow::utf8(), nested_value))); + ASSERT_FALSE(ExtendMapUtils::IsStringKeyMap(arrow::map(arrow::int32(), arrow::utf8()))); + ASSERT_FALSE(ExtendMapUtils::IsStringKeyMap(arrow::int32())); + ASSERT_FALSE(ExtendMapUtils::IsStringKeyMap(arrow::list(arrow::utf8()))); +} + +// ---- DetectExtendColumns ---- + +TEST(ExtendMapUtilsTest, DetectExtendColumnsBasic) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), + arrow::field("metrics", arrow::map(arrow::utf8(), arrow::float64())), + arrow::field("name", arrow::utf8()), + }); + + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{"fields.tags.map-storage-layout", "extend"}, + {"fields.metrics.map-storage-layout", "extend"}})); + + ASSERT_OK_AND_ASSIGN(auto indices, ExtendMapUtils::DetectExtendColumns(schema, options)); + ASSERT_EQ(indices.size(), 2); + ASSERT_EQ(indices[0], 1); + ASSERT_EQ(indices[1], 2); +} + +TEST(ExtendMapUtilsTest, DetectExtendColumnsNoExtend) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), + }); + + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(auto indices, ExtendMapUtils::DetectExtendColumns(schema, options)); + ASSERT_TRUE(indices.empty()); +} + +// ---- LogicalToPhysicalSchema ---- + +TEST(ExtendMapUtilsTest, LogicalToPhysicalSchemaBasic) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), + arrow::field("name", arrow::utf8()), + }); + + std::map column_to_num_columns = {{1, 4}}; + ASSERT_OK_AND_ASSIGN(auto physical_schema, + ExtendMapUtils::LogicalToPhysicalSchema(schema, column_to_num_columns)); + + // Build expected schema for comparison + auto expected_struct = arrow::struct_({ + arrow::field("__field_mapping", arrow::list(arrow::int32()), false), + arrow::field("__col_0", arrow::utf8(), true), + arrow::field("__col_1", arrow::utf8(), true), + arrow::field("__col_2", arrow::utf8(), true), + arrow::field("__col_3", arrow::utf8(), true), + arrow::field("__overflow", arrow::map(arrow::int32(), arrow::utf8()), true), + }); + auto expected_schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", expected_struct, true), + arrow::field("name", arrow::utf8()), + }); + ASSERT_TRUE(physical_schema->Equals(expected_schema)); +} + +TEST(ExtendMapUtilsTest, LogicalToPhysicalSchemaNestedValue) { + // MAP> + auto nested_value = + arrow::struct_({arrow::field("a", arrow::int32()), arrow::field("b", arrow::utf8())}); + auto map_type = arrow::map(arrow::utf8(), nested_value); + auto schema = arrow::schema({arrow::field("data", map_type)}); + + std::map column_to_num_columns = {{0, 2}}; + ASSERT_OK_AND_ASSIGN(auto physical_schema, + ExtendMapUtils::LogicalToPhysicalSchema(schema, column_to_num_columns)); + + auto expected_struct = arrow::struct_({ + arrow::field("__field_mapping", arrow::list(arrow::int32()), false), + arrow::field("__col_0", nested_value, true), + arrow::field("__col_1", nested_value, true), + arrow::field("__overflow", arrow::map(arrow::int32(), nested_value), true), + }); + auto expected_schema = arrow::schema({arrow::field("data", expected_struct, true)}); + ASSERT_TRUE(physical_schema->Equals(expected_schema)); +} + +TEST(ExtendMapUtilsTest, LogicalToPhysicalSchemaNullable) { + // MAP value is nullable + auto nullable_map = arrow::map(arrow::utf8(), arrow::field("item", arrow::int64(), true)); + auto schema_nullable = arrow::schema({arrow::field("m", nullable_map)}); + std::map col_map = {{0, 2}}; + + ASSERT_OK_AND_ASSIGN(auto physical, + ExtendMapUtils::LogicalToPhysicalSchema(schema_nullable, col_map)); + auto struct_type = physical->field(0)->type(); + ASSERT_TRUE(struct_type->field(1)->nullable()); + ASSERT_TRUE(struct_type->field(2)->nullable()); + + // MAP value is non-nullable + auto non_nullable_map = arrow::map(arrow::utf8(), arrow::field("item", arrow::int64(), false)); + auto schema_non_nullable = arrow::schema({arrow::field("m", non_nullable_map)}); + + ASSERT_OK_AND_ASSIGN(auto physical2, + ExtendMapUtils::LogicalToPhysicalSchema(schema_non_nullable, col_map)); + auto struct_type2 = physical2->field(0)->type(); + ASSERT_FALSE(struct_type2->field(1)->nullable()); + ASSERT_FALSE(struct_type2->field(2)->nullable()); +} + +TEST(ExtendMapUtilsTest, LogicalToPhysicalSchemaNoExtendColumns) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("name", arrow::utf8()), + }); + + std::map empty_map; + ASSERT_OK_AND_ASSIGN(auto physical_schema, + ExtendMapUtils::LogicalToPhysicalSchema(schema, empty_map)); + ASSERT_TRUE(physical_schema->Equals(schema)); +} + +// ---- BuildColumnToNumColumns ---- + +TEST(ExtendMapUtilsTest, BuildColumnToNumColumns) { + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), + arrow::field("metrics", arrow::map(arrow::utf8(), arrow::float64())), + }); + + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{"fields.tags.map-extend.max-columns", "128"}, + {"fields.metrics.map-extend.max-columns", "64"}})); + + std::vector extend_indices = {1, 2}; + ASSERT_OK_AND_ASSIGN(auto result, + ExtendMapUtils::BuildColumnToNumColumns(extend_indices, schema, options)); + + ASSERT_EQ(result.size(), 2); + ASSERT_EQ(result[1], 128); + ASSERT_EQ(result[2], 64); +} + +TEST(ExtendMapUtilsTest, BuildColumnToNumColumnsDefault) { + auto schema = arrow::schema({ + arrow::field("tags", arrow::map(arrow::utf8(), arrow::utf8())), + }); + + // No explicit max-columns config -> default 256 + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + std::vector extend_indices = {0}; + ASSERT_OK_AND_ASSIGN(auto result, + ExtendMapUtils::BuildColumnToNumColumns(extend_indices, schema, options)); + ASSERT_EQ(result[0], 256); +} + +// ---- SerializeMetadata / DeserializeMetadata roundtrip ---- + +TEST(ExtendMapUtilsTest, MetadataRoundtripNoneCompression) { + ExtendMapFileMeta original; + original.name_to_id = {{"age", 0}, {"name", 1}}; + original.field_to_columns = {{0, {0}}, {1, {1, 2}}}; + original.overflow_field_set = {1, 5}; + original.num_columns = 3; + original.max_row_width = 2; + + auto metadata = std::make_shared(); + ASSERT_OK(ExtendMapUtils::SerializeMetadata(original, "none", metadata.get())); + + // Verify raw KV strings to get intuition of what's stored + auto find_value = [&](const char* key) -> std::string { + int32_t idx = metadata->FindKey(key); + EXPECT_GE(idx, 0); + return metadata->value(idx); + }; + ASSERT_EQ(find_value(ExtendMapDefine::kVersion), "1"); + ASSERT_EQ(find_value(ExtendMapDefine::kStorageLayout), "extend"); + ASSERT_EQ(find_value(ExtendMapDefine::kNumColumns), "3"); + ASSERT_EQ(find_value(ExtendMapDefine::kMaxRowWidth), "2"); + + std::string expected_dict = R"({"age":0,"name":1})"; + ASSERT_EQ(find_value(ExtendMapDefine::kFieldDict), expected_dict); + // field_dict_original_size should be the length of the JSON string + std::string field_dict_original_size = find_value(ExtendMapDefine::kFieldDictOriginalSize); + ASSERT_EQ(field_dict_original_size, std::to_string(expected_dict.size())); + + std::string expected_field_to_columns = R"({"0":[0],"1":[1,2]})"; + ASSERT_EQ(find_value(ExtendMapDefine::kFieldColumns), expected_field_to_columns); + + // overflow_set is a JSON array of sorted field_ids + ASSERT_EQ(find_value(ExtendMapDefine::kOverflowSet), "[1,5]"); + + // Roundtrip verify + ASSERT_OK_AND_ASSIGN(auto deserialized, ExtendMapUtils::DeserializeMetadata(metadata, "none")); + ASSERT_EQ(deserialized, original); +} + +TEST(ExtendMapUtilsTest, MetadataRoundtripCompression) { + ExtendMapFileMeta original; + original.name_to_id = {{"alpha", 0}, {"beta", 1}, {"gamma", 2}}; + original.field_to_columns = {{0, {0, 1, 2}}, {1, {3}}, {2, {4, 5}}}; + original.overflow_field_set = {2}; + original.num_columns = 6; + original.max_row_width = 3; + + auto verify_roundtrip = [&](const std::string& compression) { + auto metadata = std::make_shared(); + ASSERT_OK(ExtendMapUtils::SerializeMetadata(original, compression, metadata.get())); + ASSERT_OK_AND_ASSIGN(auto deserialized, + ExtendMapUtils::DeserializeMetadata(metadata, compression)); + ASSERT_EQ(deserialized, original); + }; + + verify_roundtrip("none"); + verify_roundtrip("lz4"); + verify_roundtrip("zstd"); +} + +TEST(ExtendMapUtilsTest, MetadataRoundtripEmptyData) { + ExtendMapFileMeta original; + + auto verify_roundtrip = [&](const std::string& compression) { + auto metadata = std::make_shared(); + ASSERT_OK(ExtendMapUtils::SerializeMetadata(original, compression, metadata.get())); + ASSERT_OK_AND_ASSIGN(auto deserialized, + ExtendMapUtils::DeserializeMetadata(metadata, compression)); + ASSERT_EQ(deserialized, original); + }; + + verify_roundtrip("none"); + verify_roundtrip("lz4"); + verify_roundtrip("zstd"); +} + +// ---- DeserializeMetadata error cases ---- + +TEST(ExtendMapUtilsTest, DeserializeMetadataErrors) { + // nullptr + ASSERT_NOK_WITH_MSG(ExtendMapUtils::DeserializeMetadata(nullptr, "none"), "metadata is null"); + // missing version + { + auto metadata = std::make_shared(); + metadata->Append("some_key", "some_value"); + ASSERT_NOK_WITH_MSG(ExtendMapUtils::DeserializeMetadata(metadata, "none"), + "missing extend metadata key: paimon.map-extend.version"); + } + // wrong version + { + auto metadata = std::make_shared(); + metadata->Append(ExtendMapDefine::kVersion, "999"); + metadata->Append(ExtendMapDefine::kFieldDictOriginalSize, "2"); + metadata->Append(ExtendMapDefine::kFieldDict, "{}"); + ASSERT_NOK_WITH_MSG(ExtendMapUtils::DeserializeMetadata(metadata, "none"), + "unsupported extend-map metadata version: 999"); + } + // missing field_dict + { + auto metadata = std::make_shared(); + metadata->Append(ExtendMapDefine::kVersion, "1"); + metadata->Append(ExtendMapDefine::kFieldDictOriginalSize, "2"); + ASSERT_NOK_WITH_MSG(ExtendMapUtils::DeserializeMetadata(metadata, "none"), + "missing extend metadata key: paimon.map-extend.field-dict"); + } +} + +// ---- HasExtendMetadata ---- + +TEST(ExtendMapUtilsTest, HasExtendMetadata) { + ASSERT_FALSE(ExtendMapUtils::HasExtendMetadata(nullptr)); + { + auto metadata = std::make_shared(); + metadata->Append(ExtendMapDefine::kStorageLayout, ExtendMapDefine::kStorageLayoutExtend); + ASSERT_TRUE(ExtendMapUtils::HasExtendMetadata(metadata)); + } + { + auto metadata = std::make_shared(); + metadata->Append(ExtendMapDefine::kStorageLayout, "default"); + ASSERT_FALSE(ExtendMapUtils::HasExtendMetadata(metadata)); + } + { + auto metadata = std::make_shared(); + ASSERT_FALSE(ExtendMapUtils::HasExtendMetadata(metadata)); + } +} + +// ---- PhysicalColumnName ---- + +TEST(ExtendMapUtilsTest, PhysicalColumnName) { + ASSERT_EQ(ExtendMapDefine::PhysicalColumnName(0), "__col_0"); + ASSERT_EQ(ExtendMapDefine::PhysicalColumnName(1), "__col_1"); + ASSERT_EQ(ExtendMapDefine::PhysicalColumnName(99), "__col_99"); +} + +} // namespace paimon::test diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 818995706..7866bc485 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -26,6 +26,7 @@ #include "paimon/common/fs/resolving_file_system.h" #include "paimon/common/options/memory_size.h" #include "paimon/common/options/time_duration.h" +#include "paimon/common/utils/options_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/string_utils.h" #include "paimon/core/options/expire_config.h" @@ -1173,6 +1174,32 @@ Result CoreOptions::FieldCollectAggDistinct(const std::string& field_name) return distinct; } +Result CoreOptions::GetMapStorageLayout(const std::string& field_name) const { + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::MAP_STORAGE_LAYOUT); + PAIMON_ASSIGN_OR_RAISE(std::string layout_str, OptionsUtils::GetValueFromMap( + impl_->raw_options, key, "default")); + std::string lower = StringUtils::ToLowerCase(layout_str); + if (lower == "extend") { + return MapStorageLayout::EXTEND; + } else if (lower == "default") { + return MapStorageLayout::DEFAULT; + } + return Status::Invalid(fmt::format("invalid map-storage-layout: {}", layout_str)); +} + +Result CoreOptions::GetMapExtendMaxColumns(const std::string& field_name) const { + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::MAP_EXTEND_MAX_COLUMNS); + PAIMON_ASSIGN_OR_RAISE(int32_t max_columns, + OptionsUtils::GetValueFromMap(impl_->raw_options, key, 256)); + if (max_columns <= 0) { + return Status::Invalid( + fmt::format("options {} must > 0", std::string(Options::MAP_EXTEND_MAX_COLUMNS))); + } + return max_columns; +} + bool CoreOptions::DeletionVectorsEnabled() const { return impl_->deletion_vectors_enabled; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 699506413..34290e9d0 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -29,6 +29,7 @@ #include "paimon/core/options/external_path_strategy.h" #include "paimon/core/options/lookup_compact_mode.h" #include "paimon/core/options/lookup_strategy.h" +#include "paimon/core/options/map_storage_layout.h" #include "paimon/core/options/merge_engine.h" #include "paimon/core/options/sort_engine.h" #include "paimon/format/file_format.h" @@ -115,6 +116,10 @@ class PAIMON_EXPORT CoreOptions { Result FieldAggIgnoreRetract(const std::string& field_name) const; Result FieldListAggDelimiter(const std::string& field_name) const; Result FieldCollectAggDistinct(const std::string& field_name) const; + + Result GetMapStorageLayout(const std::string& field_name) const; + Result GetMapExtendMaxColumns(const std::string& field_name) const; + bool DeletionVectorsEnabled() const; bool DeletionVectorsBitmap64() const; int64_t DeletionVectorTargetFileSize() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 4d8a26884..fec00afea 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -88,6 +88,8 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_FALSE(core_options.FieldAggIgnoreRetract("f1").value()); ASSERT_EQ(",", core_options.FieldListAggDelimiter("f1").value()); ASSERT_FALSE(core_options.FieldCollectAggDistinct("f1").value()); + ASSERT_EQ(MapStorageLayout::DEFAULT, core_options.GetMapStorageLayout("any_col").value()); + ASSERT_EQ(256, core_options.GetMapExtendMaxColumns("any_col").value()); ASSERT_FALSE(core_options.DeletionVectorsEnabled()); ASSERT_FALSE(core_options.DeletionVectorsBitmap64()); ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); @@ -260,7 +262,9 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, "2"}, {Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED, "true"}, {Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "true"}, - {Options::BUCKET_FUNCTION_TYPE, "mod"}}; + {Options::BUCKET_FUNCTION_TYPE, "mod"}, + {"fields.metrics.map-storage-layout", "extend"}, + {"fields.metrics.map-extend.max-columns", "128"}}; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); auto fs = core_options.GetFileSystem(); @@ -400,6 +404,8 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_TRUE(core_options.LookupRemoteFileEnabled()); ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), 2); ASSERT_EQ(BucketFunctionType::MOD, core_options.GetBucketFunctionType()); + ASSERT_EQ(MapStorageLayout::EXTEND, core_options.GetMapStorageLayout("metrics").value()); + ASSERT_EQ(128, core_options.GetMapExtendMaxColumns("metrics").value()); } TEST(CoreOptionsTest, TestInvalidCase) { @@ -903,4 +909,48 @@ TEST(CoreOptionsTest, TestFallback) { std::vector({"new_b1", "new_b2"})); } } +TEST(CoreOptionsTest, TestMapStorageLayout) { + // Test extend layout configured for a specific column + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{"fields.ext_map.map-storage-layout", "extend"}, + {"fields.ext_map.map-extend.max-columns", "64"}, + {"fields.normal_map.map-storage-layout", "default"}})); + ASSERT_EQ(MapStorageLayout::EXTEND, options.GetMapStorageLayout("ext_map").value()); + ASSERT_EQ(64, options.GetMapExtendMaxColumns("ext_map").value()); + ASSERT_EQ(MapStorageLayout::DEFAULT, options.GetMapStorageLayout("normal_map").value()); + // Unconfigured column falls back to default + ASSERT_EQ(MapStorageLayout::DEFAULT, options.GetMapStorageLayout("other").value()); + ASSERT_EQ(256, options.GetMapExtendMaxColumns("other").value()); + } + // Test case-insensitive layout value + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{"fields.metrics.map-storage-layout", "Extend"}})); + ASSERT_EQ(MapStorageLayout::EXTEND, options.GetMapStorageLayout("metrics").value()); + } + // Test invalid layout value + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{"fields.col.map-storage-layout", "invalid"}})); + ASSERT_NOK_WITH_MSG(options.GetMapStorageLayout("col"), + "invalid map-storage-layout: invalid"); + } + // Test invalid max-columns value + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{"fields.col.map-extend.max-columns", "0"}})); + ASSERT_NOK_WITH_MSG(options.GetMapExtendMaxColumns("col"), + "options map-extend.max-columns must > 0"); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{"fields.col.map-extend.max-columns", "-1"}})); + ASSERT_NOK_WITH_MSG(options.GetMapExtendMaxColumns("col"), + "options map-extend.max-columns must > 0"); + } +} + } // namespace paimon::test diff --git a/src/paimon/core/options/map_storage_layout.h b/src/paimon/core/options/map_storage_layout.h new file mode 100644 index 000000000..ae30d7079 --- /dev/null +++ b/src/paimon/core/options/map_storage_layout.h @@ -0,0 +1,27 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace paimon { +/// Specifies the physical storage layout for MAP columns. +enum class MapStorageLayout { + /// Default KV-array storage (keys array + values array). + DEFAULT = 0, + /// Columnar-extend layout: K reusable typed columns with per-row field mapping. + EXTEND = 1 +}; +} // namespace paimon diff --git a/src/paimon/core/schema/schema_validation.cpp b/src/paimon/core/schema/schema_validation.cpp index 4af714124..b9bd3dcbf 100644 --- a/src/paimon/core/schema/schema_validation.cpp +++ b/src/paimon/core/schema/schema_validation.cpp @@ -32,12 +32,14 @@ #include "paimon/common/data/blob_utils.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" +#include "paimon/common/utils/extend_map_utils.h" #include "paimon/common/utils/object_utils.h" #include "paimon/common/utils/preconditions.h" #include "paimon/common/utils/string_utils.h" #include "paimon/core/core_options.h" #include "paimon/core/options/changelog_producer.h" #include "paimon/core/options/expire_config.h" +#include "paimon/core/options/map_storage_layout.h" #include "paimon/core/options/merge_engine.h" #include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/core/schema/table_schema.h" @@ -119,6 +121,7 @@ Status SchemaValidation::ValidateTableSchema(const TableSchema& schema) { PAIMON_RETURN_NOT_OK(ValidateRowTracking(schema, options)); PAIMON_RETURN_NOT_OK(ValidateBlobFields(schema, options)); + PAIMON_RETURN_NOT_OK(ValidateMapStorageLayout(schema, options)); return Status::OK(); } @@ -505,4 +508,64 @@ Status SchemaValidation::ValidateBlobFields(const TableSchema& schema, const Cor return Status::OK(); } +Status SchemaValidation::ValidateMapStorageLayout(const TableSchema& schema, + const CoreOptions& options) { + // Extract all field names that have map-storage-layout configured from options + const std::string layout_suffix = std::string(".") + std::string(Options::MAP_STORAGE_LAYOUT); + const auto& options_map = options.ToMap(); + const auto& field_names = schema.FieldNames(); + + std::unordered_map> schema_fields; + for (const auto& field : schema.Fields()) { + schema_fields[field.Name()] = field.Type(); + } + + for (const auto& [key, value] : options_map) { + if (!StringUtils::StartsWith(key, Options::FIELDS_PREFIX)) { + continue; + } + if (!StringUtils::EndsWith(key, layout_suffix)) { + continue; + } + // key = "fields..map-storage-layout" + // Extract field_name: skip "fields." prefix and ".map-storage-layout" suffix + std::string field_name = key.substr( + std::string(Options::FIELDS_PREFIX).size() + 1, + key.size() - std::string(Options::FIELDS_PREFIX).size() - 1 - layout_suffix.size()); + + // Check field exists in schema + auto it = schema_fields.find(field_name); + if (it == schema_fields.end()) { + return Status::Invalid( + fmt::format("Column '{}' is configured with map-storage-layout " + "but does not exist in table schema.", + field_name)); + } + + // Any column configured with map-storage-layout must be a MAP type + const auto& field_type = it->second; + if (field_type->id() != arrow::Type::MAP) { + return Status::Invalid( + fmt::format("Column '{}' is configured with map-storage-layout " + "but its type is not MAP.", + field_name)); + } + + PAIMON_ASSIGN_OR_RAISE(MapStorageLayout layout, options.GetMapStorageLayout(field_name)); + if (layout != MapStorageLayout::EXTEND) { + continue; + } + // Column configured with extend must be MAP + if (!ExtendMapUtils::IsStringKeyMap(field_type)) { + return Status::Invalid( + fmt::format("Column '{}' is configured with map-storage-layout=extend " + "but its type is not MAP.", + field_name)); + } + // Validate max-columns config + PAIMON_RETURN_NOT_OK(options.GetMapExtendMaxColumns(field_name)); + } + return Status::OK(); +} + } // namespace paimon diff --git a/src/paimon/core/schema/schema_validation.h b/src/paimon/core/schema/schema_validation.h index 1c1875403..efbd0cf85 100644 --- a/src/paimon/core/schema/schema_validation.h +++ b/src/paimon/core/schema/schema_validation.h @@ -70,6 +70,8 @@ class SchemaValidation { static Status ValidateBlobFields(const TableSchema& schema, const CoreOptions& options); + static Status ValidateMapStorageLayout(const TableSchema& schema, const CoreOptions& options); + static bool IsComplexType(const std::shared_ptr& field); }; diff --git a/src/paimon/core/schema/schema_validation_test.cpp b/src/paimon/core/schema/schema_validation_test.cpp index 8907c3c64..40238c135 100644 --- a/src/paimon/core/schema/schema_validation_test.cpp +++ b/src/paimon/core/schema/schema_validation_test.cpp @@ -820,4 +820,101 @@ TEST(SchemaValidationTest, ValidateInvalidConfiguration) { "Data evolution config must disabled with deletion-vectors.enabled"); } } +TEST(SchemaValidationTest, TestMapStorageLayout) { + auto f0 = arrow::field("f0", arrow::utf8()); + auto f1 = arrow::field("f1", arrow::int32()); + auto f2 = arrow::field("f2", arrow::map(arrow::utf8(), arrow::int64())); + auto f3 = arrow::field("f3", arrow::map(arrow::int32(), arrow::utf8())); + + // Valid: extend on MAP column + { + arrow::FieldVector fields = {f0, f1, f2}; + auto schema = arrow::schema(fields); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f2.map-storage-layout", "extend"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema)); + } + // Invalid: field not in schema + { + arrow::FieldVector fields = {f0, f1}; + auto schema = arrow::schema(fields); + std::map options = { + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.nonexist.map-storage-layout", "extend"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), + "Field nonexist can not be found in table schema"); + } + // Invalid: non-MAP column configured with map-storage-layout (any value) + { + arrow::FieldVector fields = {f0, f1}; + auto schema = arrow::schema(fields); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f1.map-storage-layout", "default"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), "not MAP"); + } + // Invalid: extend on non-MAP column + { + arrow::FieldVector fields = {f0, f1}; + auto schema = arrow::schema(fields); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f1.map-storage-layout", "extend"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), "not MAP"); + } + // Invalid: extend on MAP with non-STRING key + { + arrow::FieldVector fields = {f0, f1, f3}; + auto schema = arrow::schema(fields); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f3.map-storage-layout", "extend"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), + "not MAP"); + } + // Valid: default layout on a MAP column + { + arrow::FieldVector fields = {f0, f1, f2}; + auto schema = arrow::schema(fields); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f2.map-storage-layout", "default"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema)); + } + // Invalid: extend with invalid max-columns + { + arrow::FieldVector fields = {f0, f1, f2}; + auto schema = arrow::schema(fields); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.f2.map-storage-layout", "extend"}, + {"fields.f2.map-extend.max-columns", "0"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), + "options map-extend.max-columns must > 0"); + } +} + } // namespace paimon::test