getFieldGetters() {
public boolean hasPrimaryKey() {
return hasPrimaryKey;
}
+
+ @Nullable
+ public BlobWriteContext getBlobWriteContext() {
+ return blobWriteContext;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java
new file mode 100644
index 00000000000..4fbb99bc1ee
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.blob;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobRef;
+import org.apache.paimon.table.FileStoreTable;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;
+
+/**
+ * Context for handling BLOB fields during CDC write operations.
+ *
+ * CDC connector only handles WRITE operations. We do NOT read external blob data. The actual
+ * blob data reading happens on Paimon read side.
+ *
+ *
Two blob storage modes for CDC write:
+ *
+ *
+ * - Mode 1 (raw data): VARBINARY/BINARY fields → BlobData → written to .blob files by Paimon.
+ *
- Mode 2 (descriptor): VARCHAR/STRING fields → BlobRef → only descriptor (uri, offset,
+ * length) stored inline. External data is NOT read or copied during write.
+ *
+ */
+public class BlobWriteContext implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isCaseSensitive;
+
+ /** Fields that should be converted to BLOB type. */
+ private final Set blobFields;
+
+ /** Fields configured with blob-descriptor-field (Mode 2). */
+ private final Set blobDescriptorFields;
+
+ private BlobWriteContext(
+ boolean isCaseSensitive, Set blobFields, Set blobDescriptorFields) {
+ this.isCaseSensitive = isCaseSensitive;
+ // If case-insensitive, normalize field names to lowercase for consistent matching
+ this.blobFields = downcaseFieldNames(blobFields, isCaseSensitive);
+ this.blobDescriptorFields = downcaseFieldNames(blobDescriptorFields, isCaseSensitive);
+ }
+
+ /** Downcase field names if case-insensitive. */
+ private static Set downcaseFieldNames(Set fieldNames, boolean caseSensitive) {
+ if (caseSensitive) {
+ return fieldNames;
+ }
+ Set normalized = new HashSet<>();
+ for (String fieldName : fieldNames) {
+ normalized.add(fieldName.toLowerCase());
+ }
+ return normalized;
+ }
+
+ /** Create BlobWriteContext from Paimon table's CoreOptions. */
+ public static BlobWriteContext fromTable(boolean isCaseSensitive, FileStoreTable table) {
+ List blobFieldList = CoreOptions.blobField(table.options());
+ if (blobFieldList.isEmpty()) {
+ return empty();
+ }
+ CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+ Set blobDescriptorFields = coreOptions.blobDescriptorField();
+ return new BlobWriteContext(
+ isCaseSensitive, new HashSet<>(blobFieldList), blobDescriptorFields);
+ }
+
+ /** Create an empty BlobWriteContext with no blob fields. */
+ public static BlobWriteContext empty() {
+ return new BlobWriteContext(true, Collections.emptySet(), Collections.emptySet());
+ }
+
+ /** Check if a field should be converted to BLOB type. */
+ public boolean isBlobField(String fieldName) {
+ return blobFields.contains(toLowerCaseIfNeed(fieldName, isCaseSensitive));
+ }
+
+ /** Get the set of blob field names. */
+ public Set getBlobFields() {
+ return blobFields;
+ }
+
+ /** Check if a field is configured with blob-descriptor-field. */
+ public boolean isBlobDescriptorField(String fieldName) {
+ return blobDescriptorFields.contains(toLowerCaseIfNeed(fieldName, isCaseSensitive));
+ }
+
+ /** Get the set of blob descriptor field names. */
+ public Set getBlobDescriptorFields() {
+ return blobDescriptorFields;
+ }
+
+ /** Create BlobData from raw bytes (Mode 1). */
+ @Nullable
+ public Blob createBlob(@Nullable byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ return new BlobData(bytes);
+ }
+
+ /** Create BlobRef from a path string (Mode 2). */
+ @Nullable
+ public Blob createBlobRef(@Nullable String path) {
+ return createBlobRef(path, -1);
+ }
+
+ /** Create BlobRef from a path string with specified length (Mode 2). */
+ @Nullable
+ public Blob createBlobRef(@Nullable String path, long length) {
+ if (path == null) {
+ return null;
+ }
+ BlobDescriptor descriptor = new BlobDescriptor(path, 0, length);
+ // UriReader is null - CDC connector only writes descriptor info, not reading data
+ return new BlobRef(null, descriptor);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index d523feb721b..94b351c52f5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -248,8 +248,8 @@ public SchemaChangeEvent convertSchemaChangeEvent(SchemaChangeEvent schemaChange
catalog.getTable(PaimonWriterHelper.identifierFromTableId(tableId)));
MixedSchemaInfo mixedSchemaInfo =
new MixedSchemaInfo(
- new TableSchemaInfo(upstreamSchema, zoneId),
- new TableSchemaInfo(physicalSchema, zoneId));
+ new TableSchemaInfo(upstreamSchema, zoneId, null),
+ new TableSchemaInfo(physicalSchema, zoneId, null));
if (!mixedSchemaInfo.isSameColumnsIgnoringCommentAndDefaultValue()) {
LOGGER.warn(
"Upstream schema of {} is {}, which is different with paimon physical table schema {}. Data precision loss and truncation may occur.",
@@ -276,13 +276,14 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent)
if (schema.isPresent()) {
MixedSchemaInfo mixedSchemaInfo =
new MixedSchemaInfo(
- new TableSchemaInfo(schema.get(), zoneId),
+ new TableSchemaInfo(schema.get(), zoneId, null),
new TableSchemaInfo(
PaimonWriterHelper.deduceSchemaForPaimonTable(
catalog.getTable(
PaimonWriterHelper.identifierFromTableId(
tableId))),
- zoneId));
+ zoneId,
+ null));
if (!mixedSchemaInfo.isSameColumnsIgnoringCommentAndDefaultValue()) {
LOGGER.warn(
"Upstream schema of {} is {}, which is different with paimon physical table schema {}. Data precision loss and truncation may occur.",
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 9d7e1829a08..81b7357a0de 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
@@ -52,6 +53,8 @@
import java.util.Objects;
import java.util.UUID;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Tests for {@link PaimonMetadataApplier}. */
class PaimonMetadataApplierTest {
@@ -723,4 +726,469 @@ public void testMysqlDefaultTimestampValueWithMicrosInAddColumn()
Assertions.assertThat(table).isNotNull();
}
+
+ // ========== BLOB type support tests ==========
+ // NOTE: Paimon BLOB type has the following constraints that prevent integration tests:
+ // 1. BLOB type requires data-evolution.enabled = true
+ // 2. BLOB type requires row-tracking.enabled = true
+ // 3. row-tracking.enabled cannot be combined with primary keys
+ // This means BLOB columns can only exist in append-only tables (no primary keys).
+ // Since CDC pipeline typically works with primary key tables, actual BLOB type creation
+ // tests are disabled. The type conversion logic (VARBINARY/BINARY to BLOB) is correctly
+ // implemented in PaimonMetadataApplier.convertBinaryToBlobIfNeeded() and
+ // SchemaChangeProvider.convertBinaryToBlobIfNeeded() methods.
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateTableWithoutBlobFieldConfig(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.no_blob_config_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "varbinary_content",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))
+ .physicalColumn(
+ "binary_content",
+ org.apache.flink.cdc.common.types.DataTypes.BINARY(10))
+ .primaryKey("id")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.no_blob_config_table"));
+ RowType rowType = table.rowType();
+
+ assertThat(rowType.getField("varbinary_content").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY);
+ assertThat(rowType.getField("binary_content").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BINARY);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testAddNormalVarbinaryColumnWithoutBlobConfig(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.normal_varbinary_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .primaryKey("id")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ List addedColumns = new ArrayList<>();
+ addedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "varbinary_col",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))));
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(TableId.parse("test.normal_varbinary_table"), addedColumns);
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.normal_varbinary_table"));
+ RowType rowType = table.rowType();
+
+ assertThat(rowType.getField("varbinary_col").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateTableWithEmptyBlobFieldConfig(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.empty_blob_config_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "content",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))
+ .primaryKey("id")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.empty_blob_config_table"));
+ RowType rowType = table.rowType();
+
+ assertThat(rowType.getField("content").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateTableWithNonExistentBlobFieldConfig(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "nonexistent_blob_col");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.nonexistent_blob_config_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "actual_varbinary",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))
+ .primaryKey("id")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.nonexistent_blob_config_table"));
+ RowType rowType = table.rowType();
+
+ assertThat(rowType.getField("actual_varbinary").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY);
+ assertThat(rowType.getField("id").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER);
+ }
+
+ // ========== BLOB type integration tests with append-only tables (no primary keys) ==========
+ // These tests verify actual BLOB type creation in Paimon tables.
+ // BLOB type requires:
+ // 1. row-tracking.enabled = true
+ // 2. data-evolution.enabled = true
+ // These options cannot be combined with primary keys.
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateAppendOnlyTableWithBlobField(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ // Configure blob-field to convert VARBINARY to BLOB
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "content");
+ // Required for BLOB type
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table (no primary keys)
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.blob_append_only_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "content",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))
+ // No primaryKey() - append-only table
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.blob_append_only_table"));
+ RowType rowType = table.rowType();
+
+ // Verify content field is converted to BLOB type
+ assertThat(rowType.getField("content").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ assertThat(rowType.getField("id").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateAppendOnlyTableWithMultipleBlobFields(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ // Configure multiple blob fields
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_content,image_data");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table with multiple VARBINARY columns
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.multi_blob_append_only_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "blob_content",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))
+ .physicalColumn(
+ "image_data",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(1000))
+ .physicalColumn(
+ "normal_varbinary",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(50))
+ // No primaryKey() - append-only table
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.multi_blob_append_only_table"));
+ RowType rowType = table.rowType();
+
+ // Verify blob_content and image_data are converted to BLOB
+ assertThat(rowType.getField("blob_content").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ assertThat(rowType.getField("image_data").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ // normal_varbinary is not in blob-field config, should remain VARBINARY
+ assertThat(rowType.getField("normal_varbinary").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testAddBlobColumnToAppendOnlyTable(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "new_blob_col");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table first
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.add_blob_column_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ // Add VARBINARY column that should be converted to BLOB
+ List addedColumns = new ArrayList<>();
+ addedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "new_blob_col",
+ org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100))));
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(TableId.parse("test.add_blob_column_table"), addedColumns);
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.add_blob_column_table"));
+ RowType rowType = table.rowType();
+
+ // Verify the added column is BLOB type
+ assertThat(rowType.getField("new_blob_col").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateAppendOnlyTableWithBinaryToBlob(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ // Configure blob-field for BINARY type conversion
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "fixed_content");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table with BINARY column
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.binary_to_blob_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "fixed_content",
+ org.apache.flink.cdc.common.types.DataTypes.BINARY(10))
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.binary_to_blob_table"));
+ RowType rowType = table.rowType();
+
+ // Verify BINARY is also converted to BLOB type
+ assertThat(rowType.getField("fixed_content").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateAppendOnlyTableWithVarcharToBlob(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ // Configure blob-field for VARCHAR type conversion (path string case)
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_path");
+ tableOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table with VARCHAR column (representing blob path)
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.varchar_to_blob_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "blob_path",
+ org.apache.flink.cdc.common.types.DataTypes.VARCHAR(500))
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.varchar_to_blob_table"));
+ RowType rowType = table.rowType();
+
+ // Verify VARCHAR is converted to BLOB type
+ assertThat(rowType.getField("blob_path").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ assertThat(rowType.getField("id").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateAppendOnlyTableWithStringToBlob(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ // Configure blob-field for STRING type conversion (path string case)
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_uri");
+ tableOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table with STRING column (representing blob URI)
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.string_to_blob_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "blob_uri",
+ org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.string_to_blob_table"));
+ RowType rowType = table.rowType();
+
+ // Verify STRING is converted to BLOB type
+ assertThat(rowType.getField("blob_uri").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ assertThat(rowType.getField("id").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem"})
+ void testCreateAppendOnlyTableWithCharToBlob(String metastore)
+ throws Catalog.TableNotExistException,
+ Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException,
+ SchemaEvolveException {
+ initialize(metastore);
+ Map tableOptions = new HashMap<>();
+ // Configure blob-field for CHAR type conversion (path string case)
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_path");
+ tableOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ // Create append-only table with CHAR column (representing blob path)
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.char_to_blob_table"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+ .physicalColumn(
+ "blob_path",
+ org.apache.flink.cdc.common.types.DataTypes.CHAR(500))
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Table table = catalog.getTable(Identifier.fromString("test.char_to_blob_table"));
+ RowType rowType = table.rowType();
+
+ // Verify CHAR is converted to BLOB type
+ assertThat(rowType.getField("blob_path").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB);
+ assertThat(rowType.getField("id").type().getTypeRoot())
+ .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER);
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java
new file mode 100644
index 00000000000..32abdb215aa
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkFactory;
+import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+
+/** The ITCase for writing to append-only tables using custom test source. */
+public class AppendOnlyTableITCase {
+
+ @TempDir public static java.nio.file.Path temporaryFolder;
+
+ String warehouse;
+
+ private static final int PARALLELISM = 4;
+
+ private static final TableId TEST_TABLE_ID = TableId.tableId("default_database", "table1");
+
+ // Always use parent-first classloader for CDC classes.
+ // The reason is that test source uses static field for holding data, we need to make sure
+ // the class is loaded by AppClassloader so that we can verify data in the test case.
+ private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
+ new org.apache.flink.configuration.Configuration();
+
+ static {
+ MINI_CLUSTER_CONFIG.set(
+ ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+ Collections.singletonList("org.apache.flink.cdc"));
+ }
+
+ /**
+ * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
+ * every test case.
+ */
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setConfiguration(MINI_CLUSTER_CONFIG)
+ .build());
+
+ @BeforeEach
+ void init() {
+ warehouse = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ }
+
+ @Test
+ void testWritingToAppendOnlyTable() throws Exception {
+ Options catalogOptions = new Options();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
+ runPipelineJob(false);
+
+ Table table = catalog.getTable(Identifier.fromString(TEST_TABLE_ID.identifier()));
+ List results = getResultsFromTable(table, false);
+ Assertions.assertThat(results)
+ .containsExactlyInAnyOrder(
+ "default_database.table1:uuid=550e8400-e29b-41d4-a716-446655440000;path=oss://my-bucket/data/files/document_001.pdf;blobContent=PDF binary content for document 001",
+ "default_database.table1:uuid=6ba7b810-9dad-11d1-80b4-00c04fd430c8;path=oss://my-bucket/data/images/photo_002.jpg;blobContent=JPG binary content for photo 002",
+ "default_database.table1:uuid=f47ac10b-58cc-4372-a567-0e02b2c3d479;path=oss://my-bucket/data/videos/movie_003.mp4;blobContent=MP4 binary content for movie 003");
+ }
+
+ @Test
+ void testWritingToAppendOnlyTableWithBlobDescriptor() throws Exception {
+ Options catalogOptions = new Options();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
+ runPipelineJob(true);
+
+ Table table = catalog.getTable(Identifier.fromString(TEST_TABLE_ID.identifier()));
+ List results = getResultsFromTable(table, true);
+ Assertions.assertThat(results)
+ .containsExactlyInAnyOrder(
+ "default_database.table1:uuid=550e8400-e29b-41d4-a716-446655440000;path=oss://my-bucket/data/files/document_001.pdf;blobContent=PDF binary content for document 001",
+ "default_database.table1:uuid=6ba7b810-9dad-11d1-80b4-00c04fd430c8;path=oss://my-bucket/data/images/photo_002.jpg;blobContent=JPG binary content for photo 002",
+ "default_database.table1:uuid=f47ac10b-58cc-4372-a567-0e02b2c3d479;path=oss://my-bucket/data/videos/movie_003.mp4;blobContent=MP4 binary content for movie 003");
+ }
+
+ private List getResultsFromTable(Table table, boolean withBlobDescriptor)
+ throws IOException {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ List result = new ArrayList<>();
+ RecordReader reader = read.createReader(splits);
+ reader.forEachRemaining(
+ record -> {
+ String uuid = record.getString(0).toString();
+
+ String path =
+ withBlobDescriptor
+ ? record.getBlob(1).toDescriptor().uri()
+ : record.getString(1).toString();
+ String blobContent = new String(record.getBinary(2));
+ result.add(
+ String.format(
+ "%s:uuid=%s;path=%s;blobContent=%s",
+ TEST_TABLE_ID.identifier(), uuid, path, blobContent));
+ });
+ return result;
+ }
+
+ private void runPipelineJob(boolean withBlobDescriptor) throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup values source with APPEND_ONLY_BLOB_TABLE events
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.APPEND_ONLY_BLOB_TABLE);
+ SourceDef sourceDef =
+ new SourceDef(
+ ValuesDataFactory.IDENTIFIER, "Append Only Table Source", sourceConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, PARALLELISM);
+ pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "Asia/Shanghai");
+
+ // Setup paimon sink
+ Map sinkOptions = new HashMap<>();
+ sinkOptions.put("table.properties.parquet.use-native", "false");
+ if (withBlobDescriptor) {
+ sinkOptions.put("table.properties.row-tracking.enabled", "true");
+ sinkOptions.put("table.properties.data-evolution.enabled", "true");
+ sinkOptions.put("table.properties.blob-field", "path");
+ sinkOptions.put("table.properties.blob-descriptor-field", "path");
+ sinkOptions.put("table.properties.blob-as-descriptor", "true");
+ }
+ Configuration sinkConfig = Configuration.fromMap(sinkOptions);
+ sinkConfig.set(PaimonDataSinkOptions.METASTORE, "filesystem");
+ sinkConfig.set(PaimonDataSinkOptions.WAREHOUSE, warehouse);
+ SinkDef sinkDef = new SinkDef(PaimonDataSinkFactory.IDENTIFIER, "Paimon Sink", sinkConfig);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+
+ execution.execute();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 381c87b249c..a78e830c083 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -278,7 +278,8 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)
initialize(metastore);
PaimonSink paimonSink =
new PaimonSink<>(
- catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ catalogOptions,
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions));
PaimonWriter writer = paimonSink.createWriter(new MockInitContext());
Committer committer =
paimonSink.createCommitter(new MockCommitterInitContext());
@@ -340,7 +341,8 @@ public void testSinkWithDataChangeForAppendOnlyTable(String metastore, boolean e
initialize(metastore);
PaimonSink paimonSink =
new PaimonSink<>(
- catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ catalogOptions,
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions));
PaimonWriter writer = paimonSink.createWriter(new MockInitContext());
Committer committer =
paimonSink.createCommitter(new MockCommitterInitContext());
@@ -406,7 +408,8 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto
initialize(metastore);
PaimonSink paimonSink =
new PaimonSink<>(
- catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ catalogOptions,
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions));
PaimonWriter writer = paimonSink.createWriter(new MockInitContext());
Committer committer =
paimonSink.createCommitter(new MockCommitterInitContext());
@@ -512,7 +515,8 @@ void testSinkWithSchemaChangeForExistedTable(SchemaChange schemaChange) throws E
initialize("filesystem");
PaimonSink paimonSink =
new PaimonSink<>(
- catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ catalogOptions,
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions));
PaimonWriter writer = paimonSink.createWriter(new MockInitContext());
Committer committer =
paimonSink.createCommitter(new MockCommitterInitContext());
@@ -724,7 +728,8 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector
initialize(metastore);
PaimonSink paimonSink =
new PaimonSink<>(
- catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ catalogOptions,
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions));
PaimonWriter writer = paimonSink.createWriter(new MockInitContext());
Committer committer =
paimonSink.createCommitter(new MockCommitterInitContext());
@@ -855,7 +860,8 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele
initialize(metastore);
PaimonSink paimonSink =
new PaimonSink<>(
- catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ catalogOptions,
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions));
PaimonWriter writer = paimonSink.createWriter(new MockInitContext());
Committer committer =
paimonSink.createCommitter(new MockCommitterInitContext());
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
index 41accd73a35..f43562c3ec9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -35,12 +35,15 @@
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalMap;
@@ -50,6 +53,7 @@
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -475,6 +479,365 @@ void testDeduceSchemaForPaimonTable() throws Catalog.TableNotExistException {
.isTrue();
}
+ // ========== BLOB type support tests ==========
+
+ @Test
+ void testCreateFieldGettersWithBlobWriteContext() throws IOException {
+ RowType rowType =
+ RowType.of(DataTypes.INT(), DataTypes.VARBINARY(100), DataTypes.VARBINARY(50));
+ Object[] testData =
+ new Object[] {1, new byte[] {1, 2, 3, 4, 5}, new byte[] {6, 7, 8, 9, 10}};
+ BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData);
+ Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();
+
+ // Create BlobWriteContext with "col2" as blob field
+ BlobWriteContext blobWriteContext = BlobWriteContext.empty();
+ // Note: We can't directly set blob fields, but we can test the empty case
+ List fieldGettersNoBlob =
+ PaimonWriterHelper.createFieldGetters(schema, ZoneId.of("UTC+8"), blobWriteContext);
+
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(TableId.parse("database.table"), recordData);
+ GenericRow genericRow =
+ PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGettersNoBlob);
+
+ // Without BlobWriteContext, VARBINARY should remain as byte[]
+ Assertions.assertThat(genericRow.getField(1)).isInstanceOf(byte[].class);
+ Assertions.assertThat(genericRow.getField(2)).isInstanceOf(byte[].class);
+ }
+
+ @Test
+ void testDeduceSchemaForPaimonTableWithBlobType() throws Catalog.TableNotExistException {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+
+ // Create an append-only table with BLOB column using PaimonMetadataApplier
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+ PaimonMetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("blob_col", DataTypes.VARBINARY(100))
+ .physicalColumn("normal_varbinary", DataTypes.VARBINARY(50))
+ .build();
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(TableId.parse("test.blob_table"), cdcSchema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ Table table = catalog.getTable(Identifier.fromString("test.blob_table"));
+
+ // Verify the table has BLOB type for blob_col
+ org.apache.paimon.types.RowType paimonRowType = table.rowType();
+ Assertions.assertThat(paimonRowType.getField("blob_col").type().getTypeRoot())
+ .isEqualTo(DataTypeRoot.BLOB);
+ Assertions.assertThat(paimonRowType.getField("normal_varbinary").type().getTypeRoot())
+ .isEqualTo(DataTypeRoot.VARBINARY);
+
+ // Verify deduceSchemaForPaimonTable converts BLOB back to VARBINARY
+ // (raw data mode: no blob-descriptor-field configured)
+ Schema deducedSchema = PaimonWriterHelper.deduceSchemaForPaimonTable(table);
+ Assertions.assertThat(deducedSchema.getColumns().get(1).getType().getTypeRoot())
+ .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARBINARY);
+ Assertions.assertThat(deducedSchema.getColumns().get(2).getType().getTypeRoot())
+ .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARBINARY);
+ }
+
+ @Test
+ void testDeduceSchemaForPaimonTableWithBlobDescriptorField()
+ throws Catalog.TableNotExistException {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+
+ // Create table with blob-descriptor-field configuration (descriptor mode)
+ // In descriptor mode, blob column stores URI/path string instead of raw bytes
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "descriptor_blob,raw_blob");
+ tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "descriptor_blob");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ PaimonMetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("descriptor_blob", DataTypes.STRING()) // URI path string
+ .physicalColumn("raw_blob", DataTypes.VARBINARY(100)) // Raw bytes
+ .build();
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(TableId.parse("test.blob_descriptor_schema_table"), cdcSchema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ Table table = catalog.getTable(Identifier.fromString("test.blob_descriptor_schema_table"));
+
+ // Verify both columns become BLOB type in Paimon table
+ org.apache.paimon.types.RowType paimonRowType = table.rowType();
+ Assertions.assertThat(paimonRowType.getField("descriptor_blob").type().getTypeRoot())
+ .isEqualTo(DataTypeRoot.BLOB);
+ Assertions.assertThat(paimonRowType.getField("raw_blob").type().getTypeRoot())
+ .isEqualTo(DataTypeRoot.BLOB);
+
+ // Verify deduceSchemaForPaimonTable correctly converts:
+ // - descriptor_blob (blob-descriptor-field) → STRING
+ // - raw_blob (no blob-descriptor-field) → VARBINARY
+ Schema deducedSchema = PaimonWriterHelper.deduceSchemaForPaimonTable(table);
+ Assertions.assertThat(deducedSchema.getColumns().get(1).getName())
+ .isEqualTo("descriptor_blob");
+ Assertions.assertThat(deducedSchema.getColumns().get(1).getType().getTypeRoot())
+ .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARCHAR);
+ Assertions.assertThat(deducedSchema.getColumns().get(2).getName()).isEqualTo("raw_blob");
+ Assertions.assertThat(deducedSchema.getColumns().get(2).getType().getTypeRoot())
+ .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARBINARY);
+ }
+
+ @Test
+ void testBlobWriteContextCreateBlobFromBytes() {
+ BlobWriteContext context = BlobWriteContext.empty();
+
+ // Test createBlob(byte[]) - 方式A: direct data storage
+ byte[] testData = new byte[] {1, 2, 3, 4, 5};
+ Blob blob = context.createBlob(testData);
+
+ Assertions.assertThat(blob).isNotNull();
+ Assertions.assertThat(blob.toData()).isEqualTo(testData);
+
+ // Test createBlob with null bytes
+ Blob nullBlob = context.createBlob((byte[]) null);
+ Assertions.assertThat(nullBlob).isNull();
+ }
+
+ @Test
+ void testBlobWriteContextFromTableWithBlobDescriptorField()
+ throws Catalog.TableNotExistException, IOException {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+
+ // Create an append-only table with blob-descriptor-field
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col");
+ tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "blob_col"); // descriptor mode
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ PaimonMetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("blob_col", DataTypes.STRING()) // String path
+ .build();
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(TableId.parse("test.blob_descriptor_field_table"), cdcSchema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ org.apache.paimon.table.FileStoreTable table =
+ (org.apache.paimon.table.FileStoreTable)
+ catalog.getTable(Identifier.fromString("test.blob_descriptor_field_table"));
+
+ // Create BlobWriteContext from table (case-sensitive mode)
+ BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(true, table);
+
+ // Verify blob-descriptor-field is parsed correctly
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("blob_col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("BLOB_COL"))
+ .isFalse(); // case-sensitive
+ Assertions.assertThat(blobWriteContext.getBlobDescriptorFields()).contains("blob_col");
+ Assertions.assertThat(blobWriteContext.isBlobField("blob_col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobField("BLOB_COL")).isFalse(); // case-sensitive
+
+ // Test createBlobRef works correctly - stores descriptor info only
+ String externalPath = "oss://bucket/data/video.mp4";
+ Blob blobRef = blobWriteContext.createBlobRef(externalPath);
+ Assertions.assertThat(blobRef).isNotNull();
+ org.apache.paimon.data.BlobDescriptor descriptor = blobRef.toDescriptor();
+ Assertions.assertThat(descriptor.uri()).isEqualTo(externalPath);
+ }
+
+ @Test
+ void testBlobWriteContextMixedDescriptorAndRawBlobFields()
+ throws Catalog.TableNotExistException, IOException {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+
+ // Create table with multiple blob fields: some descriptor, some raw
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "descriptor_blob,raw_blob");
+ tableOptions.put(
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key(),
+ "descriptor_blob"); // Only descriptor_blob preserves path
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ PaimonMetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("descriptor_blob", DataTypes.STRING()) // Path string
+ .physicalColumn("raw_blob", DataTypes.VARBINARY(100)) // Raw bytes
+ .build();
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(TableId.parse("test.mixed_blob_table"), cdcSchema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ org.apache.paimon.table.FileStoreTable table =
+ (org.apache.paimon.table.FileStoreTable)
+ catalog.getTable(Identifier.fromString("test.mixed_blob_table"));
+
+ // Create BlobWriteContext from table (case-sensitive mode)
+ BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(true, table);
+
+ // Verify correct classification
+ Assertions.assertThat(blobWriteContext.isBlobField("descriptor_blob")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobField("raw_blob")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobField("Descriptor_Blob"))
+ .isFalse(); // case-sensitive
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("descriptor_blob")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("raw_blob")).isFalse();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("Descriptor_Blob"))
+ .isFalse(); // case-sensitive
+
+ // Verify createBlobRef for descriptor field
+ String externalPath = "file:///data/image.png";
+ Blob descriptorBlob = blobWriteContext.createBlobRef(externalPath);
+ Assertions.assertThat(descriptorBlob).isNotNull();
+ org.apache.paimon.data.BlobDescriptor descriptor = descriptorBlob.toDescriptor();
+ Assertions.assertThat(descriptor.uri()).isEqualTo(externalPath);
+
+ // Verify createBlob(byte[]) for raw field
+ byte[] rawData = new byte[] {1, 2, 3, 4, 5};
+ Blob rawBlob = blobWriteContext.createBlob(rawData);
+ Assertions.assertThat(rawBlob).isNotNull();
+ Assertions.assertThat(rawBlob.toData()).isEqualTo(rawData);
+ }
+
+ @Test
+ void testBlobWriteContextCaseInsensitive() throws Catalog.TableNotExistException, IOException {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+
+ // Create table with blob fields (use lowercase in config, matching CDC schema column name)
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col");
+ tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "blob_col");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ PaimonMetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("blob_col", DataTypes.STRING())
+ .build();
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(TableId.parse("test.case_insensitive_blob_table"), cdcSchema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ org.apache.paimon.table.FileStoreTable table =
+ (org.apache.paimon.table.FileStoreTable)
+ catalog.getTable(Identifier.fromString("test.case_insensitive_blob_table"));
+
+ // Create BlobWriteContext with case-insensitive mode
+ BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(false, table);
+
+ // Verify field names can match both uppercase and lowercase queries when case-insensitive
+ Assertions.assertThat(blobWriteContext.isBlobField("blob_col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobField("BLOB_COL")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobField("Blob_Col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("blob_col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("BLOB_COL")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("Blob_Col")).isTrue();
+
+ // Verify internal storage is lowercase
+ Assertions.assertThat(blobWriteContext.getBlobFields()).contains("blob_col");
+ Assertions.assertThat(blobWriteContext.getBlobDescriptorFields()).contains("blob_col");
+ }
+
+ @Test
+ void testBlobWriteContextCaseSensitiveMismatch()
+ throws Catalog.TableNotExistException, IOException {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+
+ // Create table with blob fields in lowercase config
+ Map tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col");
+ tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "blob_col");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ PaimonMetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("blob_col", DataTypes.STRING())
+ .build();
+
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(TableId.parse("test.case_sensitive_blob_table"), cdcSchema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ org.apache.paimon.table.FileStoreTable table =
+ (org.apache.paimon.table.FileStoreTable)
+ catalog.getTable(Identifier.fromString("test.case_sensitive_blob_table"));
+
+ // Create BlobWriteContext with case-sensitive mode
+ BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(true, table);
+
+ // Verify only exact case match works
+ Assertions.assertThat(blobWriteContext.isBlobField("blob_col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobField("BLOB_COL")).isFalse();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("blob_col")).isTrue();
+ Assertions.assertThat(blobWriteContext.isBlobDescriptorField("BLOB_COL")).isFalse();
+ }
+
private static Map extractMap(InternalMap internalMap) {
Map resultMap = new HashMap<>();
for (int i = 0; i < internalMap.size(); i++) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java
new file mode 100644
index 00000000000..14005bb9a63
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.lang.reflect.Field;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link PaimonWriter#write}. */
+@ExtendWith(MockitoExtension.class)
+class PaimonWriterTest {
+
+ @Mock private Catalog mockCatalog;
+
+ @Mock private FileStoreTable mockTable;
+
+ @Mock private CoreOptions mockCoreOptions;
+
+ private MockedStatic mockedFactory;
+ private MockedStatic mockedBlobWriteContext;
+ private MockedConstruction mockedConstruction;
+
+ private PaimonWriter writer;
+
+ private final TableId testTableId = TableId.parse("test.table1");
+ private final Identifier testIdentifier = Identifier.fromString("test.table1");
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mockedFactory = Mockito.mockStatic(FlinkCatalogFactory.class);
+ mockedBlobWriteContext = Mockito.mockStatic(BlobWriteContext.class);
+
+ mockedFactory
+ .when(() -> FlinkCatalogFactory.createPaimonCatalog(any(Options.class)))
+ .thenReturn(mockCatalog);
+
+ BlobWriteContext mockBlobContext = BlobWriteContext.empty();
+ mockedBlobWriteContext
+ .when(() -> BlobWriteContext.fromTable(anyBoolean(), any(FileStoreTable.class)))
+ .thenReturn(mockBlobContext);
+
+ when(mockCatalog.getTable(any(Identifier.class))).thenReturn(mockTable);
+ when(mockCoreOptions.writeBufferSize()).thenReturn(32 * 1024 * 1024L);
+ when(mockCoreOptions.pageSize()).thenReturn(32 * 1024);
+ when(mockTable.coreOptions()).thenReturn(mockCoreOptions);
+
+ MetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
+ Options catalogOptions = new Options();
+ PaimonRecordEventSerializer serializer =
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions);
+
+ mockedConstruction =
+ Mockito.mockConstruction(
+ StoreSinkWriteImpl.class,
+ (mock, context) ->
+ lenient()
+ .when(mock.prepareCommit(anyBoolean(), anyLong()))
+ .thenReturn(java.util.Collections.emptyList()));
+
+ writer =
+ new PaimonWriter<>(catalogOptions, metricGroup, "test_commit_user", serializer, 0L);
+
+ injectMockCatalog(writer, mockCatalog);
+ }
+
+ private void injectMockCatalog(PaimonWriter writer, Catalog catalog) throws Exception {
+ Field catalogField = PaimonWriter.class.getDeclaredField("catalog");
+ catalogField.setAccessible(true);
+ catalogField.set(writer, catalog);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (writer != null) {
+ writer.close();
+ }
+ if (mockedFactory != null) {
+ mockedFactory.close();
+ }
+ if (mockedBlobWriteContext != null) {
+ mockedBlobWriteContext.close();
+ }
+ if (mockedConstruction != null) {
+ mockedConstruction.close();
+ }
+ }
+
+ @Test
+ void testGetTableInvocationCount() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING().notNull())
+ .physicalColumn("value", DataTypes.INT())
+ .physicalColumn("path", DataTypes.STRING())
+ .build();
+
+ writer.write(new CreateTableEvent(testTableId, schema), null);
+
+ RowType rowType =
+ RowType.of(DataTypes.STRING().notNull(), DataTypes.INT(), DataTypes.STRING());
+ BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+
+ for (int i = 1; i <= 5; i++) {
+ writer.write(
+ DataChangeEvent.insertEvent(
+ testTableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("id" + i),
+ i,
+ BinaryStringData.fromString("path" + i)
+ })),
+ null);
+ }
+
+ verify(mockCatalog, times(2)).getTable(testIdentifier);
+
+ List addedColumns = new ArrayList<>();
+ addedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("age", DataTypes.INT())));
+ AddColumnEvent addColumnEvent = new AddColumnEvent(testTableId, addedColumns);
+ writer.write(addColumnEvent, null);
+
+ verify(mockCatalog, times(3)).getTable(testIdentifier);
+
+ RowType rowTypeWithAge =
+ RowType.of(
+ DataTypes.STRING().notNull(),
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.INT());
+ BinaryRecordDataGenerator generatorWithAge = new BinaryRecordDataGenerator(rowTypeWithAge);
+
+ for (int i = 6; i <= 10; i++) {
+ writer.write(
+ DataChangeEvent.insertEvent(
+ testTableId,
+ generatorWithAge.generate(
+ new Object[] {
+ BinaryStringData.fromString("id" + i),
+ i,
+ BinaryStringData.fromString("path" + i),
+ i + 10
+ })),
+ null);
+ }
+
+ verify(mockCatalog, times(3)).getTable(testIdentifier);
+
+ Map typeMapping = new HashMap<>();
+ typeMapping.put("value", DataTypes.BIGINT());
+ AlterColumnTypeEvent alterColumnTypeEvent =
+ new AlterColumnTypeEvent(testTableId, typeMapping);
+ writer.write(alterColumnTypeEvent, null);
+
+ verify(mockCatalog, times(4)).getTable(testIdentifier);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java
index 3621f4d7a8b..aa2c43c6538 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java
@@ -101,8 +101,11 @@ void testBucketAssignOperatorUsesCurrentSubtaskForPostponeBucketTable() throws E
@Test
void testSerializerKeepsAssignedBucketBeforeWriterRewrite() throws Exception {
Schema schema = createPostponeBucketSchema();
+ Options catalogOptions = createCatalogOptions();
+ new PaimonMetadataApplier(catalogOptions)
+ .applySchemaChange(new CreateTableEvent(TABLE_ID, schema));
PaimonRecordSerializer serializer =
- new PaimonRecordEventSerializer(ZoneId.systemDefault());
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions);
serializer.serialize(new CreateTableEvent(TABLE_ID, schema));
int assignedBucket = 2;
BucketWrapperChangeEvent bucketWrapperChangeEvent =
@@ -147,7 +150,7 @@ void testPostponeBucketTableRewritesAssignedBucketToMinusTwoWhenWriting() throws
catalogOptions,
UnregisteredMetricsGroup.createSinkWriterMetricGroup(),
"test-user",
- new PaimonRecordEventSerializer(ZoneId.systemDefault()),
+ new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions),
0);
writer.write(new CreateTableEvent(TABLE_ID, schema), null);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
index bcb55094a42..2d5ffd5d775 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -61,7 +61,8 @@ public enum EventSetId {
SINGLE_SPLIT_SINGLE_BATCH_TABLE,
SINGLE_SPLIT_MULTI_BATCH_TABLE,
MULTI_SPLITS_SINGLE_BATCH_TABLE,
- TRANSFORM_BATCH_TABLE;
+ TRANSFORM_BATCH_TABLE,
+ APPEND_ONLY_BLOB_TABLE;
public boolean isBatchEvent() {
switch (this) {
@@ -165,11 +166,78 @@ public static void setSourceEvents(EventSetId eventType) {
sourceEvents = transformBatchTable();
break;
}
+ case APPEND_ONLY_BLOB_TABLE:
+ {
+ sourceEvents = appendOnlyBlobTable();
+ break;
+ }
default:
throw new IllegalArgumentException(eventType + " is not supported");
}
}
+ public static List> appendOnlyBlobTable() {
+ List> eventOfSplits = new ArrayList<>();
+ List split1 = new ArrayList<>();
+
+ TableId tableId = TableId.tableId("default_database", "table1");
+
+ // create table without primary key, with VARBINARY blob column
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("uuid", DataTypes.STRING().notNull())
+ .physicalColumn("path", DataTypes.STRING())
+ .physicalColumn("blobContent", DataTypes.VARBINARY(1000))
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ split1.add(createTableEvent);
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(
+ RowType.of(
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING(),
+ DataTypes.VARBINARY(1000)));
+
+ // insert 3 events with uuid, path and blobContent
+ split1.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString(
+ "550e8400-e29b-41d4-a716-446655440000"),
+ BinaryStringData.fromString(
+ "oss://my-bucket/data/files/document_001.pdf"),
+ "PDF binary content for document 001".getBytes()
+ })));
+ split1.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString(
+ "6ba7b810-9dad-11d1-80b4-00c04fd430c8"),
+ BinaryStringData.fromString(
+ "oss://my-bucket/data/images/photo_002.jpg"),
+ "JPG binary content for photo 002".getBytes()
+ })));
+ split1.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString(
+ "f47ac10b-58cc-4372-a567-0e02b2c3d479"),
+ BinaryStringData.fromString(
+ "oss://my-bucket/data/videos/movie_003.mp4"),
+ "MP4 binary content for movie 003".getBytes()
+ })));
+
+ eventOfSplits.add(split1);
+ return eventOfSplits;
+ }
+
public static List> singleSplitSingleTable() {
List> eventOfSplits = new ArrayList<>();
List split1 = new ArrayList<>();