From 0509e3d9278d5c76acdfbc1676f22052d13d8240 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Sat, 9 May 2026 14:29:09 +0800 Subject: [PATCH 1/2] [FLINK-39567][paimon] Add blob support. --- .../pom.xml | 37 ++ .../paimon/sink/PaimonMetadataApplier.java | 75 ++- .../paimon/sink/SchemaChangeProvider.java | 147 +++++- .../sink/v2/PaimonRecordEventSerializer.java | 31 +- .../paimon/sink/v2/PaimonWriter.java | 13 + .../paimon/sink/v2/PaimonWriterHelper.java | 117 ++++- .../paimon/sink/v2/TableSchemaInfo.java | 46 +- .../paimon/sink/v2/blob/BlobWriteContext.java | 145 ++++++ .../sink/PaimonMetadataApplierTest.java | 468 ++++++++++++++++++ .../AppendOnlyTableDataSourceFactory.java | 72 +++ .../AppendOnlyTableSourceFunction.java | 125 +++++ .../paimon/sink/v2/AppendOnlyTableITCase.java | 213 ++++++++ .../sink/v2/PaimonWriterHelperTest.java | 363 ++++++++++++++ .../paimon/sink/v2/PaimonWriterTest.java | 220 ++++++++ ....apache.flink.cdc.common.factories.Factory | 15 + 15 files changed, 2050 insertions(+), 37 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index 5eb925da130..60942600736 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -159,6 +159,27 @@ limitations under the License. + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + com.google.protobuf + protobuf-java + + + + @@ -198,6 +219,10 @@ limitations under the License. com.google.protobuf protobuf-java + + org.apache.hbase + hbase-client + @@ -253,6 +278,18 @@ limitations under the License. + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + org.testcontainers + testcontainers + + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 99ae583f036..6242489ae2b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -30,19 +30,24 @@ import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils; import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.types.DataTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +57,8 @@ import java.util.Map; import java.util.Set; +import static org.apache.flink.cdc.common.types.DataTypeFamily.BINARY_STRING; +import static org.apache.flink.cdc.common.types.DataTypeFamily.CHARACTER_STRING; import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument; import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; @@ -149,11 +156,11 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti new org.apache.paimon.schema.Schema.Builder(); schema.getColumns() .forEach( - (column) -> - builder.column( - column.getName(), - TypeUtils.toPaimonDataType(column.getType()), - column.getComment())); + (column) -> { + org.apache.paimon.types.DataType dataType = + convertToBlobIfNeeded(column, tableOptions); + builder.column(column.getName(), dataType, column.getComment()); + }); List partitionKeys = new ArrayList<>(); List primaryKeys = schema.primaryKeys(); if (partitionMaps.containsKey(event.tableId())) { @@ -205,10 +212,12 @@ private List applyAddColumnEventWithPosition(AddColumnEvent event) SchemaChangeProvider.add( columnWithPosition, SchemaChange.Move.first( - columnWithPosition.getAddColumn().getName()))); + columnWithPosition.getAddColumn().getName()), + tableOptions)); break; case LAST: - tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition)); + tableChangeList.addAll( + SchemaChangeProvider.add(columnWithPosition, tableOptions)); break; case BEFORE: tableChangeList.addAll( @@ -225,7 +234,8 @@ private List applyAddColumnEventWithPosition(AddColumnEvent event) SchemaChange.Move.after( columnWithPosition.getAddColumn().getName(), columnWithPosition.getExistedColumnName()); - tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after)); + tableChangeList.addAll( + SchemaChangeProvider.add(columnWithPosition, after, tableOptions)); break; default: throw new SchemaEvolveException( @@ -253,7 +263,8 @@ private List applyAddColumnWithBeforePosition( columnWithPosition, (index == 0) ? SchemaChange.Move.first(columnName) - : SchemaChange.Move.after(columnName, columnNames.get(index - 1))); + : SchemaChange.Move.after(columnName, columnNames.get(index - 1)), + tableOptions); } private int checkColumnPosition(String existedColumnName, List columnNames) { @@ -303,13 +314,22 @@ private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveExcep private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException { try { + FileStoreTable table = + (FileStoreTable) + catalog.getTable( + new Identifier( + event.tableId().getSchemaName(), + event.tableId().getTableName())); List tableChangeList = new ArrayList<>(); event.getTypeMapping() .forEach( - (oldName, newType) -> - tableChangeList.add( - SchemaChangeProvider.updateColumnType( - oldName, newType))); + (columnName, newType) -> { + // Modifying the primary key data type may lead to exceptions in + // read/write/merge operations. + SchemaChangeProvider.updateColumnType( + table.schema(), columnName, newType, tableOptions) + .ifPresent(tableChangeList::add); + }); event.getComments() .forEach( (name, comment) -> { @@ -359,4 +379,33 @@ private void applyAlterTableComment(AlterTableCommentEvent event) throws SchemaE private static Identifier tableIdToIdentifier(SchemaChangeEvent event) { return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()); } + + /** + * Convert CDC VARBINARY/BINARY/CHAR/VARCHAR/STRING type to Paimon BLOB type if configured. + * + * @param column The CDC column definition. + * @param tableOptions The table options containing blob-field configuration. + * @return The Paimon DataType (BLOB if configured, otherwise original converted type). + */ + private org.apache.paimon.types.DataType convertToBlobIfNeeded( + Column column, Map tableOptions) { + org.apache.paimon.types.DataType dataType = TypeUtils.toPaimonDataType(column.getType()); + + // Check if this field should be converted to BLOB type using Paimon's CoreOptions + List blobFields = CoreOptions.blobField(tableOptions); + if (!blobFields.isEmpty() && isSupportedTypeForBlob(column.getType())) { + if (blobFields.contains(column.getName())) { + // Convert VARBINARY/BINARY/VARCHAR/STRING to BLOB type + // BLOB type is always nullable in Paimon + return DataTypes.BLOB(); + } + } + + return dataType; + } + + /** Check if DataType can be converted to BLOB (BINARY, VARBINARY, CHAR or VARCHAR). */ + private boolean isSupportedTypeForBlob(DataType dataType) { + return dataType.isAnyOf(BINARY_STRING, CHARACTER_STRING); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java index 5e8beacc950..2a280005d97 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java @@ -20,14 +20,19 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.common.types.TimestampType; import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils; +import org.apache.paimon.CoreOptions; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.DataTypes; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -49,15 +54,28 @@ public class SchemaChangeProvider { * @return A SchemaChange object representing the addition of a column. */ public static List add(AddColumnEvent.ColumnWithPosition columnWithPosition) { + return add(columnWithPosition, Collections.emptyMap()); + } + + /** + * Creates a SchemaChange object for adding a column without specifying its position. Supports + * converting VARBINARY/BINARY/VARCHAR/STRING to BLOB type based on tableOptions configuration. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details. + * @param tableOptions The table options containing blob-field configuration. + * @return A SchemaChange object representing the addition of a column. + */ + public static List add( + AddColumnEvent.ColumnWithPosition columnWithPosition, + Map tableOptions) { List result = new ArrayList<>(); - result.add( - SchemaChange.addColumn( - columnWithPosition.getAddColumn().getName(), - TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()), - columnWithPosition.getAddColumn().getComment())); - // if default value express exists, we need to set the default value to the table - // option Column column = columnWithPosition.getAddColumn(); + + org.apache.paimon.types.DataType dataType = convertToBlobIfNeeded(column, tableOptions); + + result.add(SchemaChange.addColumn(column.getName(), dataType, column.getComment())); + + // if default value express exists, we need to set the default value to the table option Optional.ofNullable( convertInvalidTimestampDefaultValue( column.getDefaultValueExpression(), column.getType())) @@ -81,16 +99,36 @@ public static List add(AddColumnEvent.ColumnWithPosition columnWit */ public static List add( AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) { + return add(columnWithPosition, move, Collections.emptyMap()); + } + + /** + * Creates a SchemaChange object for adding a column with a specified position. Supports + * converting VARBINARY/BINARY/VARCHAR/STRING to BLOB type based on tableOptions configuration. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details. + * @param move The move operation to indicate the column's new position. + * @param tableOptions The table options containing blob-field configuration. + * @return A SchemaChange object representing the addition of a column with position + * information. + */ + public static List add( + AddColumnEvent.ColumnWithPosition columnWithPosition, + SchemaChange.Move move, + Map tableOptions) { List result = new ArrayList<>(); + Column column = columnWithPosition.getAddColumn(); + + org.apache.paimon.types.DataType dataType = convertToBlobIfNeeded(column, tableOptions); + result.add( SchemaChange.addColumn( columnWithPosition.getAddColumn().getName(), - TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()), + dataType, columnWithPosition.getAddColumn().getComment(), move)); // if default value express exists, we need to set the default value to the table // option - Column column = columnWithPosition.getAddColumn(); Optional.ofNullable( convertInvalidTimestampDefaultValue( column.getDefaultValueExpression(), column.getType())) @@ -103,6 +141,37 @@ public static List add( return result; } + /** + * Convert CDC VARBINARY/BINARY/VARCHAR/STRING type to Paimon BLOB type if configured. + * + * @param column The CDC column definition. + * @param tableOptions The table options containing blob-field configuration. + * @return The Paimon DataType (BLOB if configured, otherwise original converted type). + */ + public static org.apache.paimon.types.DataType convertToBlobIfNeeded( + Column column, Map tableOptions) { + // Check if this field should be converted to BLOB type using Paimon's CoreOptions + List blobFields = CoreOptions.blobField(tableOptions); + if (!blobFields.isEmpty() && isSupportedTypeForBlob(column.getType())) { + if (blobFields.contains(column.getName())) { + // Convert VARBINARY/BINARY/VARCHAR/STRING to BLOB type + // BLOB type is always nullable in Paimon + return DataTypes.BLOB(); + } + } + + // Use TypeUtils.toPaimonDataType which handles VARIANT type properly + return TypeUtils.toPaimonDataType(column.getType()); + } + + /** Check if DataType can be converted to BLOB (BINARY, VARBINARY, or VARCHAR). */ + private static boolean isSupportedTypeForBlob(DataType dataType) { + DataTypeRoot typeRoot = dataType.getTypeRoot(); + return typeRoot == DataTypeRoot.BINARY + || typeRoot == DataTypeRoot.VARBINARY + || typeRoot == DataTypeRoot.VARCHAR; + } + /** * Creates a SchemaChange object to update the data type of a column. * @@ -110,8 +179,64 @@ public static List add( * @param newType The new DataType for the column. * @return A SchemaChange object representing the update of the column's data type. */ - public static SchemaChange updateColumnType(String oldColumnName, DataType newType) { - return SchemaChange.updateColumnType(oldColumnName, TypeUtils.toPaimonDataType(newType)); + public static Optional updateColumnType( + TableSchema tableSchema, String oldColumnName, DataType newType) { + return updateColumnType(tableSchema, oldColumnName, newType, Collections.emptyMap()); + } + + /** + * Creates a SchemaChange object to update the data type of a column. Supports converting + * VARBINARY/BINARY/VARCHAR/STRING to BLOB type based on tableOptions configuration. + * + * @param tableSchema The TableSchema object containing the current schema of the table. + * @param oldColumnName The name of the column whose data type is to be updated. + * @param newType The new DataType for the column. + * @param tableOptions The table options containing blob-field configuration. + * @return An Optional of the SchemaChange that represents the update of the column's data type. + */ + public static Optional updateColumnType( + TableSchema tableSchema, + String oldColumnName, + DataType newType, + Map tableOptions) { + org.apache.paimon.types.DataType oldDataType = + tableSchema.logicalRowType().getField(oldColumnName).type(); + + org.apache.paimon.types.DataType newDataType; + // Handle BLOB type conversion using Paimon's CoreOptions + if (isSupportedTypeForBlob(newType)) { + List blobFields = CoreOptions.blobField(tableOptions); + if (blobFields.contains(oldColumnName)) { + // BLOB type is always nullable in Paimon + newDataType = DataTypes.BLOB(); + } else { + // Use TypeUtils.toPaimonDataType which handles VARIANT type properly + newDataType = TypeUtils.toPaimonDataType(newType); + } + } else { + // Use TypeUtils.toPaimonDataType which handles VARIANT type properly + newDataType = TypeUtils.toPaimonDataType(newType); + } + + if (oldDataType.equalsIgnoreNullable(newDataType)) { + // Updating a column's nullability from notnull to nullable is not allowed. + return !oldDataType.isNullable() && newDataType.isNullable() + ? Optional.of(SchemaChange.updateColumnNullability(oldColumnName, true)) + : Optional.empty(); + } else { + if (tableSchema.primaryKeys().contains(oldColumnName) + || tableSchema.partitionKeys().contains(oldColumnName)) { + throw new UnsupportedOperationException( + String.format( + "Altering column type of a primary key/partition key column for %s from %s to %s is not supported.\n" + + "If you need to change the primary key/partition key type, it is recommended to recreate the table and reimport the data.", + oldColumnName, oldDataType, newType)); + } + return Optional.of( + SchemaChange.updateColumnType( + oldColumnName, + newDataType.copy(oldDataType.isNullable() || newType.isNullable()))); + } } /** diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java index 52c9018ffa0..20b248341ad 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent; import org.apache.paimon.catalog.Identifier; @@ -52,6 +53,23 @@ public PaimonRecordEventSerializer(ZoneId zoneId) { this.zoneId = zoneId; } + /** + * Update the BlobWriteContext for a specific table. + * + *

This is called by PaimonWriter when it has access to the Paimon table configuration. The + * field getters will be recreated with the new BlobWriteContext to properly convert VARBINARY + * fields to BLOB type. + * + * @param tableId The table identifier. + * @param blobWriteContext The BlobWriteContext from the Paimon table. + */ + public void updateBlobWriteContext(TableId tableId, BlobWriteContext blobWriteContext) { + TableSchemaInfo schemaInfo = schemaMaps.get(tableId); + if (schemaInfo != null) { + schemaInfo.updateBlobWriteContext(blobWriteContext, zoneId); + } + } + @Override public PaimonEvent serialize(Event event) { int bucket = 0; @@ -70,11 +88,14 @@ public PaimonEvent serialize(Event event) { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; Schema schema = schemaMaps.get(schemaChangeEvent.tableId()).getSchema(); if (!SchemaUtils.isSchemaChangeEventRedundant(schema, schemaChangeEvent)) { + Schema newSchema = + SchemaUtils.applySchemaChangeEvent(schema, schemaChangeEvent); + // Preserve BlobWriteContext if it exists + BlobWriteContext existingContext = + schemaMaps.get(schemaChangeEvent.tableId()).getBlobWriteContext(); schemaMaps.put( schemaChangeEvent.tableId(), - new TableSchemaInfo( - SchemaUtils.applySchemaChangeEvent(schema, schemaChangeEvent), - zoneId)); + new TableSchemaInfo(newSchema, zoneId, existingContext)); } } return new PaimonEvent(tableId, null, true); @@ -92,4 +113,8 @@ public PaimonEvent serialize(Event event) { "failed to convert Input into PaimonEvent, unsupported event: " + event); } } + + public Map getSchemaMaps() { + return schemaMaps; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index 2e1f9ea1516..5f18eadc95e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -20,7 +20,9 @@ import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.StatefulSinkWriter; +import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -152,10 +154,21 @@ public void write(InputT event, Context context) throws IOException { } catch (Exception e) { throw new RuntimeException(e); } + try { + FileStoreTable table = getTable(tableId); + BlobWriteContext blobWriteContext = + BlobWriteContext.fromTable(catalog.caseSensitive(), table); + ((PaimonRecordEventSerializer) serializer) + .updateBlobWriteContext(((ChangeEvent) event).tableId(), blobWriteContext); + } catch (Exception e) { + // Table might not exist yet, ignore and continue + LOG.debug("Could not get table for BlobWriteContext: {}", e.getMessage()); + } } if (paimonEvent.getGenericRows() != null) { FileStoreTable table; table = getTable(tableId); + if (memoryPoolFactory == null) { memoryPoolFactory = new MemoryPoolFactory( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java index abed564fed3..5e393fa6dbe 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java @@ -31,12 +31,15 @@ import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypeChecks; import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.variant.BinaryVariant; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils; +import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator; import org.apache.flink.core.memory.MemorySegment; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -50,10 +53,13 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import javax.annotation.Nullable; + import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount; @@ -66,10 +72,26 @@ public class PaimonWriterHelper { /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */ public static List createFieldGetters(Schema schema, ZoneId zoneId) { + return createFieldGetters(schema, zoneId, null); + } + + /** + * create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter} with BLOB support. + * + * @param schema The CDC schema containing column definitions. + * @param zoneId The timezone for timestamp conversion. + * @param blobWriteContext Optional context for BLOB field handling. + * @return List of FieldGetter for converting CDC data to Paimon data. + */ + public static List createFieldGetters( + Schema schema, ZoneId zoneId, @Nullable BlobWriteContext blobWriteContext) { List columns = schema.getColumns(); List fieldGetters = new ArrayList<>(columns.size()); for (int i = 0; i < columns.size(); i++) { - fieldGetters.add(createFieldGetter(columns.get(i).getType(), i, zoneId)); + Column column = columns.get(i); + fieldGetters.add( + createFieldGetter( + column.getType(), i, zoneId, blobWriteContext, column.getName())); } return fieldGetters; } @@ -101,19 +123,64 @@ public static Boolean sameColumnsIgnoreCommentAndDefaultValue( private static RecordData.FieldGetter createFieldGetter( DataType fieldType, int fieldPos, ZoneId zoneId) { + return createFieldGetter(fieldType, fieldPos, zoneId, null, null); + } + + private static RecordData.FieldGetter createFieldGetter( + DataType fieldType, + int fieldPos, + ZoneId zoneId, + @Nullable BlobWriteContext blobWriteContext, + @Nullable String fieldName) { final RecordData.FieldGetter fieldGetter; // ordered by type root definition switch (fieldType.getTypeRoot()) { case CHAR: case VARCHAR: - fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); + // Check if this field should be converted to BLOB type + if (blobWriteContext != null + && fieldName != null + && blobWriteContext.isBlobField(fieldName)) { + if (blobWriteContext.isBlobDescriptorField(fieldName)) { + // blob-descriptor-field mode: Create BlobRef that stores descriptor info. + // Only descriptor (uri, offset, length) is stored inline, external data is + // NOT read or copied. Actual data reading happens on Paimon read side. + fieldGetter = + row -> { + String path = row.getString(fieldPos).toString(); + return blobWriteContext.createBlobRef(path); + }; + } else { + throw new IllegalArgumentException( + String.format( + "VARCHAR/CHAR blob field '%s' must be configured in 'blob-descriptor-field'. " + + "For VARCHAR/CHAR fields configured in 'blob-field', you must also configure " + + "'blob-descriptor-field' to specify which fields store serialized BlobDescriptor bytes inline. " + + "If you want to write raw blob data from string path, please use VARBINARY/BINARY type instead.", + fieldName)); + } + } else { + fieldGetter = + row -> BinaryString.fromString(row.getString(fieldPos).toString()); + } break; case BOOLEAN: fieldGetter = row -> row.getBoolean(fieldPos); break; case BINARY: case VARBINARY: - fieldGetter = row -> row.getBinary(fieldPos); + // Check if this field should be converted to BLOB type + if (blobWriteContext != null + && fieldName != null + && blobWriteContext.isBlobField(fieldName)) { + fieldGetter = + row -> { + byte[] bytes = row.getBinary(fieldPos); + return blobWriteContext.createBlob(bytes); + }; + } else { + fieldGetter = row -> row.getBinary(fieldPos); + } break; case DECIMAL: final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); @@ -296,11 +363,45 @@ public static Schema deduceSchemaForPaimonTable(Table table) { builder.setColumns( rowType.getFields().stream() .map( - column -> - Column.physicalColumn( - column.name(), - TypeUtils.toCDCDataType(column.type()), - column.description())) + column -> { + org.apache.flink.cdc.common.types.DataType cdcType; + // Handle BLOB type: Paimon BLOB → CDC VARBINARY or STRING + // + // Paimon supports two blob storage modes: + // 1. Raw data mode: VARBINARY/BINARY → BlobData → written to + // .blob files + // 2. Descriptor mode: STRING → BlobRef → only descriptor (uri, + // offset, length) + // stored inline, actual data remains in external storage + // + // For descriptor mode fields (configured via + // blob-descriptor-field option), + // the upstream provides URI/path strings, so CDC type should be + // STRING. + // For raw data mode fields, the upstream provides raw bytes, so + // CDC type + // should be VARBINARY. + if (column.type().getTypeRoot() + == org.apache.paimon.types.DataTypeRoot.BLOB) { + CoreOptions coreOptions = + CoreOptions.fromMap(table.options()); + Set blobDescriptorFields = + coreOptions.blobDescriptorField(); + if (blobDescriptorFields.contains(column.name())) { + // Descriptor mode: upstream provides URI string + cdcType = DataTypes.STRING(); + } else { + // Raw data mode: upstream provides raw bytes + cdcType = DataTypes.VARBINARY(Integer.MAX_VALUE); + } + } else { + // Use TypeUtils.toCDCDataType which handles VARIANT type + // properly + cdcType = TypeUtils.toCDCDataType(column.type()); + } + return Column.physicalColumn( + column.name(), cdcType, column.description()); + }) .collect(Collectors.toList())); builder.primaryKey(table.primaryKeys()); table.comment().ifPresent(builder::comment); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java index ed46baa7a60..a422bc5443a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java @@ -19,6 +19,9 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; + +import javax.annotation.Nullable; import java.time.ZoneId; import java.util.List; @@ -28,16 +31,50 @@ public class TableSchemaInfo { private final Schema schema; - private final List fieldGetters; + private List fieldGetters; private final boolean hasPrimaryKey; + @Nullable private BlobWriteContext blobWriteContext; + public TableSchemaInfo(Schema schema, ZoneId zoneId) { + this(schema, zoneId, null); + } + + public TableSchemaInfo( + Schema schema, ZoneId zoneId, @Nullable BlobWriteContext blobWriteContext) { this.schema = schema; - this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId); + this.blobWriteContext = blobWriteContext; + this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId, blobWriteContext); this.hasPrimaryKey = !schema.primaryKeys().isEmpty(); } + /** + * Update the BlobWriteContext and recreate field getters. + * + *

This is called when the table is accessed in PaimonWriter and we have the actual table + * configuration. + */ + public void updateBlobWriteContext(BlobWriteContext blobWriteContext, ZoneId zoneId) { + if (this.blobWriteContext == null && blobWriteContext == null) { + return; + } + if (this.blobWriteContext != null && blobWriteContext != null) { + // Compare blob fields and descriptor fields + boolean sameBlobFields = + this.blobWriteContext.getBlobFields().equals(blobWriteContext.getBlobFields()); + boolean sameDescriptorFields = + this.blobWriteContext + .getBlobDescriptorFields() + .equals(blobWriteContext.getBlobDescriptorFields()); + if (sameBlobFields && sameDescriptorFields) { + return; + } + } + this.blobWriteContext = blobWriteContext; + this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId, blobWriteContext); + } + public Schema getSchema() { return schema; } @@ -49,4 +86,9 @@ public List getFieldGetters() { public boolean hasPrimaryKey() { return hasPrimaryKey; } + + @Nullable + public BlobWriteContext getBlobWriteContext() { + return blobWriteContext; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java new file mode 100644 index 00000000000..4fbb99bc1ee --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/blob/BlobWriteContext.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2.blob; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobRef; +import org.apache.paimon.table.FileStoreTable; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed; + +/** + * Context for handling BLOB fields during CDC write operations. + * + *

CDC connector only handles WRITE operations. We do NOT read external blob data. The actual + * blob data reading happens on Paimon read side. + * + *

Two blob storage modes for CDC write: + * + *

    + *
  • Mode 1 (raw data): VARBINARY/BINARY fields → BlobData → written to .blob files by Paimon. + *
  • Mode 2 (descriptor): VARCHAR/STRING fields → BlobRef → only descriptor (uri, offset, + * length) stored inline. External data is NOT read or copied during write. + *
+ */ +public class BlobWriteContext implements Serializable { + + private static final long serialVersionUID = 1L; + + private final boolean isCaseSensitive; + + /** Fields that should be converted to BLOB type. */ + private final Set blobFields; + + /** Fields configured with blob-descriptor-field (Mode 2). */ + private final Set blobDescriptorFields; + + private BlobWriteContext( + boolean isCaseSensitive, Set blobFields, Set blobDescriptorFields) { + this.isCaseSensitive = isCaseSensitive; + // If case-insensitive, normalize field names to lowercase for consistent matching + this.blobFields = downcaseFieldNames(blobFields, isCaseSensitive); + this.blobDescriptorFields = downcaseFieldNames(blobDescriptorFields, isCaseSensitive); + } + + /** Downcase field names if case-insensitive. */ + private static Set downcaseFieldNames(Set fieldNames, boolean caseSensitive) { + if (caseSensitive) { + return fieldNames; + } + Set normalized = new HashSet<>(); + for (String fieldName : fieldNames) { + normalized.add(fieldName.toLowerCase()); + } + return normalized; + } + + /** Create BlobWriteContext from Paimon table's CoreOptions. */ + public static BlobWriteContext fromTable(boolean isCaseSensitive, FileStoreTable table) { + List blobFieldList = CoreOptions.blobField(table.options()); + if (blobFieldList.isEmpty()) { + return empty(); + } + CoreOptions coreOptions = CoreOptions.fromMap(table.options()); + Set blobDescriptorFields = coreOptions.blobDescriptorField(); + return new BlobWriteContext( + isCaseSensitive, new HashSet<>(blobFieldList), blobDescriptorFields); + } + + /** Create an empty BlobWriteContext with no blob fields. */ + public static BlobWriteContext empty() { + return new BlobWriteContext(true, Collections.emptySet(), Collections.emptySet()); + } + + /** Check if a field should be converted to BLOB type. */ + public boolean isBlobField(String fieldName) { + return blobFields.contains(toLowerCaseIfNeed(fieldName, isCaseSensitive)); + } + + /** Get the set of blob field names. */ + public Set getBlobFields() { + return blobFields; + } + + /** Check if a field is configured with blob-descriptor-field. */ + public boolean isBlobDescriptorField(String fieldName) { + return blobDescriptorFields.contains(toLowerCaseIfNeed(fieldName, isCaseSensitive)); + } + + /** Get the set of blob descriptor field names. */ + public Set getBlobDescriptorFields() { + return blobDescriptorFields; + } + + /** Create BlobData from raw bytes (Mode 1). */ + @Nullable + public Blob createBlob(@Nullable byte[] bytes) { + if (bytes == null) { + return null; + } + return new BlobData(bytes); + } + + /** Create BlobRef from a path string (Mode 2). */ + @Nullable + public Blob createBlobRef(@Nullable String path) { + return createBlobRef(path, -1); + } + + /** Create BlobRef from a path string with specified length (Mode 2). */ + @Nullable + public Blob createBlobRef(@Nullable String path, long length) { + if (path == null) { + return null; + } + BlobDescriptor descriptor = new BlobDescriptor(path, 0, length); + // UriReader is null - CDC connector only writes descriptor info, not reading data + return new BlobRef(null, descriptor); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 9d7e1829a08..81b7357a0de 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.common.types.DataType; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; @@ -52,6 +53,8 @@ import java.util.Objects; import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; + /** Tests for {@link PaimonMetadataApplier}. */ class PaimonMetadataApplierTest { @@ -723,4 +726,469 @@ public void testMysqlDefaultTimestampValueWithMicrosInAddColumn() Assertions.assertThat(table).isNotNull(); } + + // ========== BLOB type support tests ========== + // NOTE: Paimon BLOB type has the following constraints that prevent integration tests: + // 1. BLOB type requires data-evolution.enabled = true + // 2. BLOB type requires row-tracking.enabled = true + // 3. row-tracking.enabled cannot be combined with primary keys + // This means BLOB columns can only exist in append-only tables (no primary keys). + // Since CDC pipeline typically works with primary key tables, actual BLOB type creation + // tests are disabled. The type conversion logic (VARBINARY/BINARY to BLOB) is correctly + // implemented in PaimonMetadataApplier.convertBinaryToBlobIfNeeded() and + // SchemaChangeProvider.convertBinaryToBlobIfNeeded() methods. + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateTableWithoutBlobFieldConfig(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.no_blob_config_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "varbinary_content", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)) + .physicalColumn( + "binary_content", + org.apache.flink.cdc.common.types.DataTypes.BINARY(10)) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.no_blob_config_table")); + RowType rowType = table.rowType(); + + assertThat(rowType.getField("varbinary_content").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY); + assertThat(rowType.getField("binary_content").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BINARY); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testAddNormalVarbinaryColumnWithoutBlobConfig(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.normal_varbinary_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "varbinary_col", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)))); + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse("test.normal_varbinary_table"), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + Table table = catalog.getTable(Identifier.fromString("test.normal_varbinary_table")); + RowType rowType = table.rowType(); + + assertThat(rowType.getField("varbinary_col").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateTableWithEmptyBlobFieldConfig(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), ""); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.empty_blob_config_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "content", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.empty_blob_config_table")); + RowType rowType = table.rowType(); + + assertThat(rowType.getField("content").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateTableWithNonExistentBlobFieldConfig(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "nonexistent_blob_col"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.nonexistent_blob_config_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "actual_varbinary", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.nonexistent_blob_config_table")); + RowType rowType = table.rowType(); + + assertThat(rowType.getField("actual_varbinary").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY); + assertThat(rowType.getField("id").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER); + } + + // ========== BLOB type integration tests with append-only tables (no primary keys) ========== + // These tests verify actual BLOB type creation in Paimon tables. + // BLOB type requires: + // 1. row-tracking.enabled = true + // 2. data-evolution.enabled = true + // These options cannot be combined with primary keys. + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateAppendOnlyTableWithBlobField(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + // Configure blob-field to convert VARBINARY to BLOB + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "content"); + // Required for BLOB type + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table (no primary keys) + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.blob_append_only_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "content", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)) + // No primaryKey() - append-only table + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.blob_append_only_table")); + RowType rowType = table.rowType(); + + // Verify content field is converted to BLOB type + assertThat(rowType.getField("content").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + assertThat(rowType.getField("id").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateAppendOnlyTableWithMultipleBlobFields(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + // Configure multiple blob fields + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_content,image_data"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table with multiple VARBINARY columns + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.multi_blob_append_only_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "blob_content", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)) + .physicalColumn( + "image_data", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(1000)) + .physicalColumn( + "normal_varbinary", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(50)) + // No primaryKey() - append-only table + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.multi_blob_append_only_table")); + RowType rowType = table.rowType(); + + // Verify blob_content and image_data are converted to BLOB + assertThat(rowType.getField("blob_content").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + assertThat(rowType.getField("image_data").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + // normal_varbinary is not in blob-field config, should remain VARBINARY + assertThat(rowType.getField("normal_varbinary").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.VARBINARY); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testAddBlobColumnToAppendOnlyTable(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "new_blob_col"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table first + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.add_blob_column_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + // Add VARBINARY column that should be converted to BLOB + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "new_blob_col", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(100)))); + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse("test.add_blob_column_table"), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + Table table = catalog.getTable(Identifier.fromString("test.add_blob_column_table")); + RowType rowType = table.rowType(); + + // Verify the added column is BLOB type + assertThat(rowType.getField("new_blob_col").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateAppendOnlyTableWithBinaryToBlob(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + // Configure blob-field for BINARY type conversion + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "fixed_content"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table with BINARY column + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.binary_to_blob_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "fixed_content", + org.apache.flink.cdc.common.types.DataTypes.BINARY(10)) + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.binary_to_blob_table")); + RowType rowType = table.rowType(); + + // Verify BINARY is also converted to BLOB type + assertThat(rowType.getField("fixed_content").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateAppendOnlyTableWithVarcharToBlob(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + // Configure blob-field for VARCHAR type conversion (path string case) + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_path"); + tableOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table with VARCHAR column (representing blob path) + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.varchar_to_blob_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "blob_path", + org.apache.flink.cdc.common.types.DataTypes.VARCHAR(500)) + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.varchar_to_blob_table")); + RowType rowType = table.rowType(); + + // Verify VARCHAR is converted to BLOB type + assertThat(rowType.getField("blob_path").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + assertThat(rowType.getField("id").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateAppendOnlyTableWithStringToBlob(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + // Configure blob-field for STRING type conversion (path string case) + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_uri"); + tableOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table with STRING column (representing blob URI) + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.string_to_blob_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "blob_uri", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.string_to_blob_table")); + RowType rowType = table.rowType(); + + // Verify STRING is converted to BLOB type + assertThat(rowType.getField("blob_uri").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + assertThat(rowType.getField("id").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER); + } + + @ParameterizedTest + @ValueSource(strings = {"filesystem"}) + void testCreateAppendOnlyTableWithCharToBlob(String metastore) + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { + initialize(metastore); + Map tableOptions = new HashMap<>(); + // Configure blob-field for CHAR type conversion (path string case) + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_path"); + tableOptions.put(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + // Create append-only table with CHAR column (representing blob path) + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.char_to_blob_table"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "blob_path", + org.apache.flink.cdc.common.types.DataTypes.CHAR(500)) + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + Table table = catalog.getTable(Identifier.fromString("test.char_to_blob_table")); + RowType rowType = table.rowType(); + + // Verify CHAR is converted to BLOB type + assertThat(rowType.getField("blob_path").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.BLOB); + assertThat(rowType.getField("id").type().getTypeRoot()) + .isEqualTo(org.apache.paimon.types.DataTypeRoot.INTEGER); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java new file mode 100644 index 00000000000..ac1ce2afffc --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.testsource; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.factories.DataSourceFactory; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.source.DataSource; +import org.apache.flink.cdc.common.source.EventSourceProvider; +import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; +import org.apache.flink.cdc.common.source.MetadataAccessor; + +import java.util.HashSet; +import java.util.Set; + +/** A source {@link Factory} to create {@link AppendOnlyTableDataSource}. */ +@Internal +public class AppendOnlyTableDataSourceFactory implements DataSourceFactory { + + public static final String IDENTIFIER = "append-only-table-source"; + + @Override + public DataSource createDataSource(Context context) { + return new AppendOnlyTableDataSource(); + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return new HashSet<>(); + } + + @Override + public Set> optionalOptions() { + return new HashSet<>(); + } + + /** A {@link DataSource} that emits append-only table data records. */ + @Internal + private static class AppendOnlyTableDataSource implements DataSource { + + @Override + public EventSourceProvider getEventSourceProvider() { + return FlinkSourceFunctionProvider.of(new AppendOnlyTableSourceFunction()); + } + + @Override + public MetadataAccessor getMetadataAccessor() { + return null; + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java new file mode 100644 index 00000000000..d28d5d24d80 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.testsource; + +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** Source function for testing append-only tables. */ +public class AppendOnlyTableSourceFunction extends RichParallelSourceFunction { + + private List testEventList; + + public static final TableId TEST_TABLE_ID = TableId.tableId("default_database", "table1"); + + private static final List SIMPLE_COLUMNS = + Arrays.asList( + Column.physicalColumn("uuid", DataTypes.STRING().notNull()), + Column.physicalColumn("path", DataTypes.STRING()), + Column.physicalColumn("blobContent", DataTypes.VARBINARY(1000))); + + // Fixed UUIDs for testing + private static final String UUID_1 = "550e8400-e29b-41d4-a716-446655440000"; + private static final String UUID_2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"; + private static final String UUID_3 = "f47ac10b-58cc-4372-a567-0e02b2c3d479"; + + // OSS file paths for testing + private static final String PATH_1 = "oss://my-bucket/data/files/document_001.pdf"; + private static final String PATH_2 = "oss://my-bucket/data/images/photo_002.jpg"; + private static final String PATH_3 = "oss://my-bucket/data/videos/movie_003.mp4"; + + // Blob content for testing + private static final byte[] BLOB_CONTENT_1 = "PDF binary content for document 001".getBytes(); + private static final byte[] BLOB_CONTENT_2 = "JPG binary content for photo 002".getBytes(); + private static final byte[] BLOB_CONTENT_3 = "MP4 binary content for movie 003".getBytes(); + + public AppendOnlyTableSourceFunction() {} + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + testEventList = new ArrayList<>(); + + Schema schema = Schema.newBuilder().setColumns(SIMPLE_COLUMNS).build(); + testEventList.add(new CreateTableEvent(TEST_TABLE_ID, schema)); + + BinaryRecordDataGenerator binaryRecordDataGenerator = + new BinaryRecordDataGenerator( + schema.getColumnDataTypes() + .toArray(new org.apache.flink.cdc.common.types.DataType[0])); + + // Emit 3 insert events with uuid, path and blobContent + testEventList.add( + DataChangeEvent.insertEvent( + TEST_TABLE_ID, + binaryRecordDataGenerator.generate( + new Object[] { + BinaryStringData.fromString(UUID_1), + BinaryStringData.fromString(PATH_1), + BLOB_CONTENT_1 + }))); + testEventList.add( + DataChangeEvent.insertEvent( + TEST_TABLE_ID, + binaryRecordDataGenerator.generate( + new Object[] { + BinaryStringData.fromString(UUID_2), + BinaryStringData.fromString(PATH_2), + BLOB_CONTENT_2 + }))); + testEventList.add( + DataChangeEvent.insertEvent( + TEST_TABLE_ID, + binaryRecordDataGenerator.generate( + new Object[] { + BinaryStringData.fromString(UUID_3), + BinaryStringData.fromString(PATH_3), + BLOB_CONTENT_3 + }))); + } + + @Override + public void run( + org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext + context) { + // Emit all events in the first subtask + if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) { + for (Event event : testEventList) { + context.collect(event); + } + } + } + + @Override + public void cancel() { + // Do nothing + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java new file mode 100644 index 00000000000..dc2abe6b621 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkFactory; +import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions; +import org.apache.flink.cdc.connectors.paimon.sink.testsource.AppendOnlyTableDataSourceFactory; +import org.apache.flink.cdc.connectors.paimon.sink.testsource.AppendOnlyTableSourceFunction; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; + +/** The ITCase for writing to append-only tables using custom test source. */ +public class AppendOnlyTableITCase { + + @TempDir public static java.nio.file.Path temporaryFolder; + + String warehouse; + + private static final int PARALLELISM = 4; + + // Always use parent-first classloader for CDC classes. + // The reason is that test source uses static field for holding data, we need to make sure + // the class is loaded by AppClassloader so that we can verify data in the test case. + private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG = + new org.apache.flink.configuration.Configuration(); + + static { + MINI_CLUSTER_CONFIG.set( + ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + Collections.singletonList("org.apache.flink.cdc")); + } + + /** + * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for + * every test case. + */ + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .setConfiguration(MINI_CLUSTER_CONFIG) + .build()); + + @BeforeEach + void init() { + warehouse = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + } + + @Test + void testWritingToAppendOnlyTable() throws Exception { + Options catalogOptions = new Options(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + + runPipelineJob(false); + + Table table = + catalog.getTable( + Identifier.fromString( + AppendOnlyTableSourceFunction.TEST_TABLE_ID.identifier())); + List results = getResultsFromTable(table, false); + Assertions.assertThat(results) + .containsExactlyInAnyOrder( + "default_database.table1:uuid=550e8400-e29b-41d4-a716-446655440000;path=oss://my-bucket/data/files/document_001.pdf;blobContent=PDF binary content for document 001", + "default_database.table1:uuid=6ba7b810-9dad-11d1-80b4-00c04fd430c8;path=oss://my-bucket/data/images/photo_002.jpg;blobContent=JPG binary content for photo 002", + "default_database.table1:uuid=f47ac10b-58cc-4372-a567-0e02b2c3d479;path=oss://my-bucket/data/videos/movie_003.mp4;blobContent=MP4 binary content for movie 003"); + } + + @Test + void testWritingToAppendOnlyTableWithBlobDescriptor() throws Exception { + Options catalogOptions = new Options(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + + runPipelineJob(true); + + Table table = + catalog.getTable( + Identifier.fromString( + AppendOnlyTableSourceFunction.TEST_TABLE_ID.identifier())); + List results = getResultsFromTable(table, true); + Assertions.assertThat(results) + .containsExactlyInAnyOrder( + "default_database.table1:uuid=550e8400-e29b-41d4-a716-446655440000;path=oss://my-bucket/data/files/document_001.pdf;blobContent=PDF binary content for document 001", + "default_database.table1:uuid=6ba7b810-9dad-11d1-80b4-00c04fd430c8;path=oss://my-bucket/data/images/photo_002.jpg;blobContent=JPG binary content for photo 002", + "default_database.table1:uuid=f47ac10b-58cc-4372-a567-0e02b2c3d479;path=oss://my-bucket/data/videos/movie_003.mp4;blobContent=MP4 binary content for movie 003"); + } + + private List getResultsFromTable(Table table, boolean withBlobDescriptor) + throws IOException { + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + List result = new ArrayList<>(); + RecordReader reader = read.createReader(splits); + reader.forEachRemaining( + record -> { + String uuid = record.getString(0).toString(); + + String path = + withBlobDescriptor + ? record.getBlob(1).toDescriptor().uri() + : record.getString(1).toString(); + String blobContent = new String(record.getBinary(2)); + result.add( + String.format( + "%s:uuid=%s;path=%s;blobContent=%s", + AppendOnlyTableSourceFunction.TEST_TABLE_ID.identifier(), + uuid, + path, + blobContent)); + }); + return result; + } + + private void runPipelineJob(boolean withBlobDescriptor) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup append-only table source + Configuration sourceConfig = new Configuration(); + SourceDef sourceDef = + new SourceDef( + AppendOnlyTableDataSourceFactory.IDENTIFIER, + "Append Only Table Source", + sourceConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, PARALLELISM); + pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "Asia/Shanghai"); + + // Setup paimon sink + Map sinkOptions = new HashMap<>(); + sinkOptions.put("table.properties.parquet.use-native", "false"); + if (withBlobDescriptor) { + sinkOptions.put("table.properties.row-tracking.enabled", "true"); + sinkOptions.put("table.properties.data-evolution.enabled", "true"); + sinkOptions.put("table.properties.blob-field", "path"); + sinkOptions.put("table.properties.blob-descriptor-field", "path"); + sinkOptions.put("table.properties.blob-as-descriptor", "true"); + } + Configuration sinkConfig = Configuration.fromMap(sinkOptions); + sinkConfig.set(PaimonDataSinkOptions.METASTORE, "filesystem"); + sinkConfig.set(PaimonDataSinkOptions.WAREHOUSE, warehouse); + SinkDef sinkDef = new SinkDef(PaimonDataSinkFactory.IDENTIFIER, "Paimon Sink", sinkConfig); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + + execution.execute(); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java index 41accd73a35..f43562c3ec9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java @@ -35,12 +35,15 @@ import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder; import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier; +import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalMap; @@ -50,6 +53,7 @@ import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.options.Options; import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowKind; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -475,6 +479,365 @@ void testDeduceSchemaForPaimonTable() throws Catalog.TableNotExistException { .isTrue(); } + // ========== BLOB type support tests ========== + + @Test + void testCreateFieldGettersWithBlobWriteContext() throws IOException { + RowType rowType = + RowType.of(DataTypes.INT(), DataTypes.VARBINARY(100), DataTypes.VARBINARY(50)); + Object[] testData = + new Object[] {1, new byte[] {1, 2, 3, 4, 5}, new byte[] {6, 7, 8, 9, 10}}; + BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData); + Schema schema = Schema.newBuilder().fromRowDataType(rowType).build(); + + // Create BlobWriteContext with "col2" as blob field + BlobWriteContext blobWriteContext = BlobWriteContext.empty(); + // Note: We can't directly set blob fields, but we can test the empty case + List fieldGettersNoBlob = + PaimonWriterHelper.createFieldGetters(schema, ZoneId.of("UTC+8"), blobWriteContext); + + DataChangeEvent dataChangeEvent = + DataChangeEvent.insertEvent(TableId.parse("database.table"), recordData); + GenericRow genericRow = + PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGettersNoBlob); + + // Without BlobWriteContext, VARBINARY should remain as byte[] + Assertions.assertThat(genericRow.getField(1)).isInstanceOf(byte[].class); + Assertions.assertThat(genericRow.getField(2)).isInstanceOf(byte[].class); + } + + @Test + void testDeduceSchemaForPaimonTableWithBlobType() throws Catalog.TableNotExistException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + + // Create an append-only table with BLOB column using PaimonMetadataApplier + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + + PaimonMetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("blob_col", DataTypes.VARBINARY(100)) + .physicalColumn("normal_varbinary", DataTypes.VARBINARY(50)) + .build(); + + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.blob_table"), cdcSchema); + metadataApplier.applySchemaChange(createTableEvent); + + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + Table table = catalog.getTable(Identifier.fromString("test.blob_table")); + + // Verify the table has BLOB type for blob_col + org.apache.paimon.types.RowType paimonRowType = table.rowType(); + Assertions.assertThat(paimonRowType.getField("blob_col").type().getTypeRoot()) + .isEqualTo(DataTypeRoot.BLOB); + Assertions.assertThat(paimonRowType.getField("normal_varbinary").type().getTypeRoot()) + .isEqualTo(DataTypeRoot.VARBINARY); + + // Verify deduceSchemaForPaimonTable converts BLOB back to VARBINARY + // (raw data mode: no blob-descriptor-field configured) + Schema deducedSchema = PaimonWriterHelper.deduceSchemaForPaimonTable(table); + Assertions.assertThat(deducedSchema.getColumns().get(1).getType().getTypeRoot()) + .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARBINARY); + Assertions.assertThat(deducedSchema.getColumns().get(2).getType().getTypeRoot()) + .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARBINARY); + } + + @Test + void testDeduceSchemaForPaimonTableWithBlobDescriptorField() + throws Catalog.TableNotExistException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + + // Create table with blob-descriptor-field configuration (descriptor mode) + // In descriptor mode, blob column stores URI/path string instead of raw bytes + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "descriptor_blob,raw_blob"); + tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "descriptor_blob"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + + PaimonMetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("descriptor_blob", DataTypes.STRING()) // URI path string + .physicalColumn("raw_blob", DataTypes.VARBINARY(100)) // Raw bytes + .build(); + + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.blob_descriptor_schema_table"), cdcSchema); + metadataApplier.applySchemaChange(createTableEvent); + + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + Table table = catalog.getTable(Identifier.fromString("test.blob_descriptor_schema_table")); + + // Verify both columns become BLOB type in Paimon table + org.apache.paimon.types.RowType paimonRowType = table.rowType(); + Assertions.assertThat(paimonRowType.getField("descriptor_blob").type().getTypeRoot()) + .isEqualTo(DataTypeRoot.BLOB); + Assertions.assertThat(paimonRowType.getField("raw_blob").type().getTypeRoot()) + .isEqualTo(DataTypeRoot.BLOB); + + // Verify deduceSchemaForPaimonTable correctly converts: + // - descriptor_blob (blob-descriptor-field) → STRING + // - raw_blob (no blob-descriptor-field) → VARBINARY + Schema deducedSchema = PaimonWriterHelper.deduceSchemaForPaimonTable(table); + Assertions.assertThat(deducedSchema.getColumns().get(1).getName()) + .isEqualTo("descriptor_blob"); + Assertions.assertThat(deducedSchema.getColumns().get(1).getType().getTypeRoot()) + .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARCHAR); + Assertions.assertThat(deducedSchema.getColumns().get(2).getName()).isEqualTo("raw_blob"); + Assertions.assertThat(deducedSchema.getColumns().get(2).getType().getTypeRoot()) + .isEqualTo(org.apache.flink.cdc.common.types.DataTypeRoot.VARBINARY); + } + + @Test + void testBlobWriteContextCreateBlobFromBytes() { + BlobWriteContext context = BlobWriteContext.empty(); + + // Test createBlob(byte[]) - 方式A: direct data storage + byte[] testData = new byte[] {1, 2, 3, 4, 5}; + Blob blob = context.createBlob(testData); + + Assertions.assertThat(blob).isNotNull(); + Assertions.assertThat(blob.toData()).isEqualTo(testData); + + // Test createBlob with null bytes + Blob nullBlob = context.createBlob((byte[]) null); + Assertions.assertThat(nullBlob).isNull(); + } + + @Test + void testBlobWriteContextFromTableWithBlobDescriptorField() + throws Catalog.TableNotExistException, IOException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + + // Create an append-only table with blob-descriptor-field + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col"); + tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "blob_col"); // descriptor mode + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + + PaimonMetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("blob_col", DataTypes.STRING()) // String path + .build(); + + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.blob_descriptor_field_table"), cdcSchema); + metadataApplier.applySchemaChange(createTableEvent); + + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + org.apache.paimon.table.FileStoreTable table = + (org.apache.paimon.table.FileStoreTable) + catalog.getTable(Identifier.fromString("test.blob_descriptor_field_table")); + + // Create BlobWriteContext from table (case-sensitive mode) + BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(true, table); + + // Verify blob-descriptor-field is parsed correctly + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("blob_col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("BLOB_COL")) + .isFalse(); // case-sensitive + Assertions.assertThat(blobWriteContext.getBlobDescriptorFields()).contains("blob_col"); + Assertions.assertThat(blobWriteContext.isBlobField("blob_col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobField("BLOB_COL")).isFalse(); // case-sensitive + + // Test createBlobRef works correctly - stores descriptor info only + String externalPath = "oss://bucket/data/video.mp4"; + Blob blobRef = blobWriteContext.createBlobRef(externalPath); + Assertions.assertThat(blobRef).isNotNull(); + org.apache.paimon.data.BlobDescriptor descriptor = blobRef.toDescriptor(); + Assertions.assertThat(descriptor.uri()).isEqualTo(externalPath); + } + + @Test + void testBlobWriteContextMixedDescriptorAndRawBlobFields() + throws Catalog.TableNotExistException, IOException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + + // Create table with multiple blob fields: some descriptor, some raw + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "descriptor_blob,raw_blob"); + tableOptions.put( + CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), + "descriptor_blob"); // Only descriptor_blob preserves path + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + + PaimonMetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("descriptor_blob", DataTypes.STRING()) // Path string + .physicalColumn("raw_blob", DataTypes.VARBINARY(100)) // Raw bytes + .build(); + + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.mixed_blob_table"), cdcSchema); + metadataApplier.applySchemaChange(createTableEvent); + + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + org.apache.paimon.table.FileStoreTable table = + (org.apache.paimon.table.FileStoreTable) + catalog.getTable(Identifier.fromString("test.mixed_blob_table")); + + // Create BlobWriteContext from table (case-sensitive mode) + BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(true, table); + + // Verify correct classification + Assertions.assertThat(blobWriteContext.isBlobField("descriptor_blob")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobField("raw_blob")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobField("Descriptor_Blob")) + .isFalse(); // case-sensitive + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("descriptor_blob")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("raw_blob")).isFalse(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("Descriptor_Blob")) + .isFalse(); // case-sensitive + + // Verify createBlobRef for descriptor field + String externalPath = "file:///data/image.png"; + Blob descriptorBlob = blobWriteContext.createBlobRef(externalPath); + Assertions.assertThat(descriptorBlob).isNotNull(); + org.apache.paimon.data.BlobDescriptor descriptor = descriptorBlob.toDescriptor(); + Assertions.assertThat(descriptor.uri()).isEqualTo(externalPath); + + // Verify createBlob(byte[]) for raw field + byte[] rawData = new byte[] {1, 2, 3, 4, 5}; + Blob rawBlob = blobWriteContext.createBlob(rawData); + Assertions.assertThat(rawBlob).isNotNull(); + Assertions.assertThat(rawBlob.toData()).isEqualTo(rawData); + } + + @Test + void testBlobWriteContextCaseInsensitive() throws Catalog.TableNotExistException, IOException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + + // Create table with blob fields (use lowercase in config, matching CDC schema column name) + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col"); + tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "blob_col"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + + PaimonMetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("blob_col", DataTypes.STRING()) + .build(); + + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.case_insensitive_blob_table"), cdcSchema); + metadataApplier.applySchemaChange(createTableEvent); + + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + org.apache.paimon.table.FileStoreTable table = + (org.apache.paimon.table.FileStoreTable) + catalog.getTable(Identifier.fromString("test.case_insensitive_blob_table")); + + // Create BlobWriteContext with case-insensitive mode + BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(false, table); + + // Verify field names can match both uppercase and lowercase queries when case-insensitive + Assertions.assertThat(blobWriteContext.isBlobField("blob_col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobField("BLOB_COL")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobField("Blob_Col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("blob_col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("BLOB_COL")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("Blob_Col")).isTrue(); + + // Verify internal storage is lowercase + Assertions.assertThat(blobWriteContext.getBlobFields()).contains("blob_col"); + Assertions.assertThat(blobWriteContext.getBlobDescriptorFields()).contains("blob_col"); + } + + @Test + void testBlobWriteContextCaseSensitiveMismatch() + throws Catalog.TableNotExistException, IOException { + Options catalogOptions = new Options(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.setString("warehouse", warehouse); + catalogOptions.setString("cache-enabled", "false"); + + // Create table with blob fields in lowercase config + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BLOB_FIELD.key(), "blob_col"); + tableOptions.put(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "blob_col"); + tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + + PaimonMetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("blob_col", DataTypes.STRING()) + .build(); + + CreateTableEvent createTableEvent = + new CreateTableEvent(TableId.parse("test.case_sensitive_blob_table"), cdcSchema); + metadataApplier.applySchemaChange(createTableEvent); + + Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + org.apache.paimon.table.FileStoreTable table = + (org.apache.paimon.table.FileStoreTable) + catalog.getTable(Identifier.fromString("test.case_sensitive_blob_table")); + + // Create BlobWriteContext with case-sensitive mode + BlobWriteContext blobWriteContext = BlobWriteContext.fromTable(true, table); + + // Verify only exact case match works + Assertions.assertThat(blobWriteContext.isBlobField("blob_col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobField("BLOB_COL")).isFalse(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("blob_col")).isTrue(); + Assertions.assertThat(blobWriteContext.isBlobDescriptorField("BLOB_COL")).isFalse(); + } + private static Map extractMap(InternalMap internalMap) { Map resultMap = new HashMap<>(); for (int i = 0; i < internalMap.size(); i++) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java new file mode 100644 index 00000000000..eac67d49c36 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.lang.reflect.Field; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit tests for {@link PaimonWriter#write}. */ +@ExtendWith(MockitoExtension.class) +class PaimonWriterTest { + + @Mock private Catalog mockCatalog; + + @Mock private FileStoreTable mockTable; + + @Mock private CoreOptions mockCoreOptions; + + private MockedStatic mockedFactory; + private MockedStatic mockedBlobWriteContext; + private MockedConstruction mockedConstruction; + + private PaimonWriter writer; + + private final TableId testTableId = TableId.parse("test.table1"); + private final Identifier testIdentifier = Identifier.fromString("test.table1"); + + @BeforeEach + void setUp() throws Exception { + mockedFactory = Mockito.mockStatic(FlinkCatalogFactory.class); + mockedBlobWriteContext = Mockito.mockStatic(BlobWriteContext.class); + + mockedFactory + .when(() -> FlinkCatalogFactory.createPaimonCatalog(any(Options.class))) + .thenReturn(mockCatalog); + + BlobWriteContext mockBlobContext = BlobWriteContext.empty(); + mockedBlobWriteContext + .when(() -> BlobWriteContext.fromTable(anyBoolean(), any(FileStoreTable.class))) + .thenReturn(mockBlobContext); + + when(mockCatalog.getTable(any(Identifier.class))).thenReturn(mockTable); + when(mockCoreOptions.writeBufferSize()).thenReturn(32 * 1024 * 1024L); + when(mockCoreOptions.pageSize()).thenReturn(32 * 1024); + when(mockCoreOptions.bucket()).thenReturn(1); + when(mockTable.coreOptions()).thenReturn(mockCoreOptions); + + MetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup(); + PaimonRecordEventSerializer serializer = + new PaimonRecordEventSerializer(ZoneId.systemDefault()); + + Options catalogOptions = new Options(); + + mockedConstruction = + Mockito.mockConstruction( + StoreSinkWriteImpl.class, + (mock, context) -> + lenient() + .when(mock.prepareCommit(anyBoolean(), anyLong())) + .thenReturn(java.util.Collections.emptyList())); + + writer = + new PaimonWriter<>(catalogOptions, metricGroup, "test_commit_user", serializer, 0L); + + injectMockCatalog(writer, mockCatalog); + } + + private void injectMockCatalog(PaimonWriter writer, Catalog catalog) throws Exception { + Field catalogField = PaimonWriter.class.getDeclaredField("catalog"); + catalogField.setAccessible(true); + catalogField.set(writer, catalog); + } + + @AfterEach + void tearDown() throws Exception { + if (writer != null) { + writer.close(); + } + if (mockedFactory != null) { + mockedFactory.close(); + } + if (mockedBlobWriteContext != null) { + mockedBlobWriteContext.close(); + } + if (mockedConstruction != null) { + mockedConstruction.close(); + } + } + + @Test + void testGetTableInvocationCount() throws Exception { + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.STRING().notNull()) + .physicalColumn("value", DataTypes.INT()) + .physicalColumn("path", DataTypes.STRING()) + .build(); + + writer.write(new CreateTableEvent(testTableId, schema), null); + + RowType rowType = + RowType.of(DataTypes.STRING().notNull(), DataTypes.INT(), DataTypes.STRING()); + BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + + for (int i = 1; i <= 5; i++) { + writer.write( + DataChangeEvent.insertEvent( + testTableId, + generator.generate( + new Object[] { + BinaryStringData.fromString("id" + i), + i, + BinaryStringData.fromString("path" + i) + })), + null); + } + + verify(mockCatalog, times(1)).getTable(testIdentifier); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(testTableId, addedColumns); + writer.write(addColumnEvent, null); + + verify(mockCatalog, times(2)).getTable(testIdentifier); + + RowType rowTypeWithAge = + RowType.of( + DataTypes.STRING().notNull(), + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.INT()); + BinaryRecordDataGenerator generatorWithAge = new BinaryRecordDataGenerator(rowTypeWithAge); + + for (int i = 6; i <= 10; i++) { + writer.write( + DataChangeEvent.insertEvent( + testTableId, + generatorWithAge.generate( + new Object[] { + BinaryStringData.fromString("id" + i), + i, + BinaryStringData.fromString("path" + i), + i + 10 + })), + null); + } + + verify(mockCatalog, times(2)).getTable(testIdentifier); + + Map typeMapping = new HashMap<>(); + typeMapping.put("value", DataTypes.BIGINT()); + AlterColumnTypeEvent alterColumnTypeEvent = + new AlterColumnTypeEvent(testTableId, typeMapping); + writer.write(alterColumnTypeEvent, null); + + verify(mockCatalog, times(3)).getTable(testIdentifier); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory new file mode 100644 index 00000000000..8335f29d37d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.flink.cdc.connectors.paimon.sink.testsource.AppendOnlyTableDataSourceFactory From c0a61f4e4a47e124680830762f9c66ac0967e18a Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Thu, 21 May 2026 14:49:12 +0800 Subject: [PATCH 2/2] Address comments. --- .../pom.xml | 7 + .../paimon/sink/PaimonDataSinkFactory.java | 2 +- .../sink/v2/PaimonRecordEventSerializer.java | 51 +++---- .../paimon/sink/v2/PaimonWriter.java | 12 -- .../paimon/sink/v2/TableSchemaInfo.java | 30 ----- .../sink/v2/bucket/BucketAssignOperator.java | 9 +- .../AppendOnlyTableDataSourceFactory.java | 72 ---------- .../AppendOnlyTableSourceFunction.java | 125 ------------------ .../paimon/sink/v2/AppendOnlyTableITCase.java | 32 ++--- .../paimon/sink/v2/PaimonSinkITCase.java | 18 ++- .../paimon/sink/v2/PaimonWriterTest.java | 14 +- .../v2/bucket/PaimonPostponeBucketTest.java | 7 +- ....apache.flink.cdc.common.factories.Factory | 15 --- .../values/source/ValuesDataSourceHelper.java | 70 +++++++++- 14 files changed, 146 insertions(+), 318 deletions(-) delete mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java delete mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java delete mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index 60942600736..5c5b12671c6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -51,6 +51,13 @@ limitations under the License. test + + org.apache.flink + flink-cdc-pipeline-connector-values + ${project.version} + test + + org.apache.flink flink-clients diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java index a165946d4dd..faed8ed1431 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java @@ -96,7 +96,7 @@ public DataSink createDataSink(Context context) { } } } - PaimonRecordSerializer serializer = new PaimonRecordEventSerializer(zoneId); + PaimonRecordSerializer serializer = new PaimonRecordEventSerializer(zoneId, options); String schemaOperatorUid = context.getPipelineConfiguration() .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java index 20b248341ad..f1b8a9450c2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java @@ -28,8 +28,12 @@ import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; import java.time.ZoneId; import java.util.HashMap; @@ -48,30 +52,19 @@ public class PaimonRecordEventSerializer implements PaimonRecordSerializer(); this.zoneId = zoneId; - } - - /** - * Update the BlobWriteContext for a specific table. - * - *

This is called by PaimonWriter when it has access to the Paimon table configuration. The - * field getters will be recreated with the new BlobWriteContext to properly convert VARBINARY - * fields to BLOB type. - * - * @param tableId The table identifier. - * @param blobWriteContext The BlobWriteContext from the Paimon table. - */ - public void updateBlobWriteContext(TableId tableId, BlobWriteContext blobWriteContext) { - TableSchemaInfo schemaInfo = schemaMaps.get(tableId); - if (schemaInfo != null) { - schemaInfo.updateBlobWriteContext(blobWriteContext, zoneId); - } + this.options = options; } @Override public PaimonEvent serialize(Event event) { + lazilyInitializeCatalog(); int bucket = 0; if (event instanceof BucketWrapperChangeEvent) { bucket = ((BucketWrapperChangeEvent) event).getBucket(); @@ -80,10 +73,18 @@ public PaimonEvent serialize(Event event) { Identifier tableId = Identifier.fromString(((ChangeEvent) event).tableId().toString()); if (event instanceof SchemaChangeEvent) { if (event instanceof CreateTableEvent) { - CreateTableEvent createTableEvent = (CreateTableEvent) event; - schemaMaps.put( - createTableEvent.tableId(), - new TableSchemaInfo(createTableEvent.getSchema(), zoneId)); + try { + FileStoreTable table = (FileStoreTable) catalog.getTable(tableId); + BlobWriteContext blobWriteContext = + BlobWriteContext.fromTable(catalog.caseSensitive(), table); + CreateTableEvent createTableEvent = (CreateTableEvent) event; + schemaMaps.put( + createTableEvent.tableId(), + new TableSchemaInfo( + createTableEvent.getSchema(), zoneId, blobWriteContext)); + } catch (Catalog.TableNotExistException e) { + throw new IllegalStateException(e); + } } else { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; Schema schema = schemaMaps.get(schemaChangeEvent.tableId()).getSchema(); @@ -114,7 +115,9 @@ public PaimonEvent serialize(Event event) { } } - public Map getSchemaMaps() { - return schemaMaps; + private void lazilyInitializeCatalog() { + if (this.catalog == null) { + this.catalog = FlinkCatalogFactory.createPaimonCatalog(options); + } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index 5f18eadc95e..0bcab4d8557 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -20,9 +20,7 @@ import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.StatefulSinkWriter; -import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; -import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -154,16 +152,6 @@ public void write(InputT event, Context context) throws IOException { } catch (Exception e) { throw new RuntimeException(e); } - try { - FileStoreTable table = getTable(tableId); - BlobWriteContext blobWriteContext = - BlobWriteContext.fromTable(catalog.caseSensitive(), table); - ((PaimonRecordEventSerializer) serializer) - .updateBlobWriteContext(((ChangeEvent) event).tableId(), blobWriteContext); - } catch (Exception e) { - // Table might not exist yet, ignore and continue - LOG.debug("Could not get table for BlobWriteContext: {}", e.getMessage()); - } } if (paimonEvent.getGenericRows() != null) { FileStoreTable table; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java index a422bc5443a..ccea3ca21ee 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TableSchemaInfo.java @@ -37,10 +37,6 @@ public class TableSchemaInfo { @Nullable private BlobWriteContext blobWriteContext; - public TableSchemaInfo(Schema schema, ZoneId zoneId) { - this(schema, zoneId, null); - } - public TableSchemaInfo( Schema schema, ZoneId zoneId, @Nullable BlobWriteContext blobWriteContext) { this.schema = schema; @@ -49,32 +45,6 @@ public TableSchemaInfo( this.hasPrimaryKey = !schema.primaryKeys().isEmpty(); } - /** - * Update the BlobWriteContext and recreate field getters. - * - *

This is called when the table is accessed in PaimonWriter and we have the actual table - * configuration. - */ - public void updateBlobWriteContext(BlobWriteContext blobWriteContext, ZoneId zoneId) { - if (this.blobWriteContext == null && blobWriteContext == null) { - return; - } - if (this.blobWriteContext != null && blobWriteContext != null) { - // Compare blob fields and descriptor fields - boolean sameBlobFields = - this.blobWriteContext.getBlobFields().equals(blobWriteContext.getBlobFields()); - boolean sameDescriptorFields = - this.blobWriteContext - .getBlobDescriptorFields() - .equals(blobWriteContext.getBlobDescriptorFields()); - if (sameBlobFields && sameDescriptorFields) { - return; - } - } - this.blobWriteContext = blobWriteContext; - this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId, blobWriteContext); - } - public Schema getSchema() { return schema; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java index d523feb721b..94b351c52f5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java @@ -248,8 +248,8 @@ public SchemaChangeEvent convertSchemaChangeEvent(SchemaChangeEvent schemaChange catalog.getTable(PaimonWriterHelper.identifierFromTableId(tableId))); MixedSchemaInfo mixedSchemaInfo = new MixedSchemaInfo( - new TableSchemaInfo(upstreamSchema, zoneId), - new TableSchemaInfo(physicalSchema, zoneId)); + new TableSchemaInfo(upstreamSchema, zoneId, null), + new TableSchemaInfo(physicalSchema, zoneId, null)); if (!mixedSchemaInfo.isSameColumnsIgnoringCommentAndDefaultValue()) { LOGGER.warn( "Upstream schema of {} is {}, which is different with paimon physical table schema {}. Data precision loss and truncation may occur.", @@ -276,13 +276,14 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent) if (schema.isPresent()) { MixedSchemaInfo mixedSchemaInfo = new MixedSchemaInfo( - new TableSchemaInfo(schema.get(), zoneId), + new TableSchemaInfo(schema.get(), zoneId, null), new TableSchemaInfo( PaimonWriterHelper.deduceSchemaForPaimonTable( catalog.getTable( PaimonWriterHelper.identifierFromTableId( tableId))), - zoneId)); + zoneId, + null)); if (!mixedSchemaInfo.isSameColumnsIgnoringCommentAndDefaultValue()) { LOGGER.warn( "Upstream schema of {} is {}, which is different with paimon physical table schema {}. Data precision loss and truncation may occur.", diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java deleted file mode 100644 index ac1ce2afffc..00000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableDataSourceFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.connectors.paimon.sink.testsource; - -import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.common.configuration.ConfigOption; -import org.apache.flink.cdc.common.factories.DataSourceFactory; -import org.apache.flink.cdc.common.factories.Factory; -import org.apache.flink.cdc.common.source.DataSource; -import org.apache.flink.cdc.common.source.EventSourceProvider; -import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; -import org.apache.flink.cdc.common.source.MetadataAccessor; - -import java.util.HashSet; -import java.util.Set; - -/** A source {@link Factory} to create {@link AppendOnlyTableDataSource}. */ -@Internal -public class AppendOnlyTableDataSourceFactory implements DataSourceFactory { - - public static final String IDENTIFIER = "append-only-table-source"; - - @Override - public DataSource createDataSource(Context context) { - return new AppendOnlyTableDataSource(); - } - - @Override - public String identifier() { - return IDENTIFIER; - } - - @Override - public Set> requiredOptions() { - return new HashSet<>(); - } - - @Override - public Set> optionalOptions() { - return new HashSet<>(); - } - - /** A {@link DataSource} that emits append-only table data records. */ - @Internal - private static class AppendOnlyTableDataSource implements DataSource { - - @Override - public EventSourceProvider getEventSourceProvider() { - return FlinkSourceFunctionProvider.of(new AppendOnlyTableSourceFunction()); - } - - @Override - public MetadataAccessor getMetadataAccessor() { - return null; - } - } -} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java deleted file mode 100644 index d28d5d24d80..00000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/testsource/AppendOnlyTableSourceFunction.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.connectors.paimon.sink.testsource; - -import org.apache.flink.cdc.common.data.binary.BinaryStringData; -import org.apache.flink.cdc.common.event.CreateTableEvent; -import org.apache.flink.cdc.common.event.DataChangeEvent; -import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.schema.Column; -import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.types.DataTypes; -import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** Source function for testing append-only tables. */ -public class AppendOnlyTableSourceFunction extends RichParallelSourceFunction { - - private List testEventList; - - public static final TableId TEST_TABLE_ID = TableId.tableId("default_database", "table1"); - - private static final List SIMPLE_COLUMNS = - Arrays.asList( - Column.physicalColumn("uuid", DataTypes.STRING().notNull()), - Column.physicalColumn("path", DataTypes.STRING()), - Column.physicalColumn("blobContent", DataTypes.VARBINARY(1000))); - - // Fixed UUIDs for testing - private static final String UUID_1 = "550e8400-e29b-41d4-a716-446655440000"; - private static final String UUID_2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8"; - private static final String UUID_3 = "f47ac10b-58cc-4372-a567-0e02b2c3d479"; - - // OSS file paths for testing - private static final String PATH_1 = "oss://my-bucket/data/files/document_001.pdf"; - private static final String PATH_2 = "oss://my-bucket/data/images/photo_002.jpg"; - private static final String PATH_3 = "oss://my-bucket/data/videos/movie_003.mp4"; - - // Blob content for testing - private static final byte[] BLOB_CONTENT_1 = "PDF binary content for document 001".getBytes(); - private static final byte[] BLOB_CONTENT_2 = "JPG binary content for photo 002".getBytes(); - private static final byte[] BLOB_CONTENT_3 = "MP4 binary content for movie 003".getBytes(); - - public AppendOnlyTableSourceFunction() {} - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - testEventList = new ArrayList<>(); - - Schema schema = Schema.newBuilder().setColumns(SIMPLE_COLUMNS).build(); - testEventList.add(new CreateTableEvent(TEST_TABLE_ID, schema)); - - BinaryRecordDataGenerator binaryRecordDataGenerator = - new BinaryRecordDataGenerator( - schema.getColumnDataTypes() - .toArray(new org.apache.flink.cdc.common.types.DataType[0])); - - // Emit 3 insert events with uuid, path and blobContent - testEventList.add( - DataChangeEvent.insertEvent( - TEST_TABLE_ID, - binaryRecordDataGenerator.generate( - new Object[] { - BinaryStringData.fromString(UUID_1), - BinaryStringData.fromString(PATH_1), - BLOB_CONTENT_1 - }))); - testEventList.add( - DataChangeEvent.insertEvent( - TEST_TABLE_ID, - binaryRecordDataGenerator.generate( - new Object[] { - BinaryStringData.fromString(UUID_2), - BinaryStringData.fromString(PATH_2), - BLOB_CONTENT_2 - }))); - testEventList.add( - DataChangeEvent.insertEvent( - TEST_TABLE_ID, - binaryRecordDataGenerator.generate( - new Object[] { - BinaryStringData.fromString(UUID_3), - BinaryStringData.fromString(PATH_3), - BLOB_CONTENT_3 - }))); - } - - @Override - public void run( - org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext - context) { - // Emit all events in the first subtask - if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) { - for (Event event : testEventList) { - context.collect(event); - } - } - } - - @Override - public void cancel() { - // Do nothing - } -} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java index dc2abe6b621..32abdb215aa 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/AppendOnlyTableITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; @@ -26,8 +27,9 @@ import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkFactory; import org.apache.flink.cdc.connectors.paimon.sink.PaimonDataSinkOptions; -import org.apache.flink.cdc.connectors.paimon.sink.testsource.AppendOnlyTableDataSourceFactory; -import org.apache.flink.cdc.connectors.paimon.sink.testsource.AppendOnlyTableSourceFunction; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper; +import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -67,6 +69,8 @@ public class AppendOnlyTableITCase { private static final int PARALLELISM = 4; + private static final TableId TEST_TABLE_ID = TableId.tableId("default_database", "table1"); + // Always use parent-first classloader for CDC classes. // The reason is that test source uses static field for holding data, we need to make sure // the class is loaded by AppClassloader so that we can verify data in the test case. @@ -106,10 +110,7 @@ void testWritingToAppendOnlyTable() throws Exception { runPipelineJob(false); - Table table = - catalog.getTable( - Identifier.fromString( - AppendOnlyTableSourceFunction.TEST_TABLE_ID.identifier())); + Table table = catalog.getTable(Identifier.fromString(TEST_TABLE_ID.identifier())); List results = getResultsFromTable(table, false); Assertions.assertThat(results) .containsExactlyInAnyOrder( @@ -127,10 +128,7 @@ void testWritingToAppendOnlyTableWithBlobDescriptor() throws Exception { runPipelineJob(true); - Table table = - catalog.getTable( - Identifier.fromString( - AppendOnlyTableSourceFunction.TEST_TABLE_ID.identifier())); + Table table = catalog.getTable(Identifier.fromString(TEST_TABLE_ID.identifier())); List results = getResultsFromTable(table, true); Assertions.assertThat(results) .containsExactlyInAnyOrder( @@ -158,10 +156,7 @@ record -> { result.add( String.format( "%s:uuid=%s;path=%s;blobContent=%s", - AppendOnlyTableSourceFunction.TEST_TABLE_ID.identifier(), - uuid, - path, - blobContent)); + TEST_TABLE_ID.identifier(), uuid, path, blobContent)); }); return result; } @@ -169,13 +164,14 @@ record -> { private void runPipelineJob(boolean withBlobDescriptor) throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); - // Setup append-only table source + // Setup values source with APPEND_ONLY_BLOB_TABLE events Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.APPEND_ONLY_BLOB_TABLE); SourceDef sourceDef = new SourceDef( - AppendOnlyTableDataSourceFactory.IDENTIFIER, - "Append Only Table Source", - sourceConfig); + ValuesDataFactory.IDENTIFIER, "Append Only Table Source", sourceConfig); // Setup pipeline Configuration pipelineConfig = new Configuration(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 381c87b249c..a78e830c083 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -278,7 +278,8 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) initialize(metastore); PaimonSink paimonSink = new PaimonSink<>( - catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + catalogOptions, + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions)); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(new MockCommitterInitContext()); @@ -340,7 +341,8 @@ public void testSinkWithDataChangeForAppendOnlyTable(String metastore, boolean e initialize(metastore); PaimonSink paimonSink = new PaimonSink<>( - catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + catalogOptions, + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions)); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(new MockCommitterInitContext()); @@ -406,7 +408,8 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto initialize(metastore); PaimonSink paimonSink = new PaimonSink<>( - catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + catalogOptions, + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions)); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(new MockCommitterInitContext()); @@ -512,7 +515,8 @@ void testSinkWithSchemaChangeForExistedTable(SchemaChange schemaChange) throws E initialize("filesystem"); PaimonSink paimonSink = new PaimonSink<>( - catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + catalogOptions, + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions)); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(new MockCommitterInitContext()); @@ -724,7 +728,8 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector initialize(metastore); PaimonSink paimonSink = new PaimonSink<>( - catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + catalogOptions, + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions)); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(new MockCommitterInitContext()); @@ -855,7 +860,8 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele initialize(metastore); PaimonSink paimonSink = new PaimonSink<>( - catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + catalogOptions, + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions)); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(new MockCommitterInitContext()); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java index eac67d49c36..14005bb9a63 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterTest.java @@ -101,14 +101,12 @@ void setUp() throws Exception { when(mockCatalog.getTable(any(Identifier.class))).thenReturn(mockTable); when(mockCoreOptions.writeBufferSize()).thenReturn(32 * 1024 * 1024L); when(mockCoreOptions.pageSize()).thenReturn(32 * 1024); - when(mockCoreOptions.bucket()).thenReturn(1); when(mockTable.coreOptions()).thenReturn(mockCoreOptions); MetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup(); - PaimonRecordEventSerializer serializer = - new PaimonRecordEventSerializer(ZoneId.systemDefault()); - Options catalogOptions = new Options(); + PaimonRecordEventSerializer serializer = + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions); mockedConstruction = Mockito.mockConstruction( @@ -174,7 +172,7 @@ void testGetTableInvocationCount() throws Exception { null); } - verify(mockCatalog, times(1)).getTable(testIdentifier); + verify(mockCatalog, times(2)).getTable(testIdentifier); List addedColumns = new ArrayList<>(); addedColumns.add( @@ -183,7 +181,7 @@ void testGetTableInvocationCount() throws Exception { AddColumnEvent addColumnEvent = new AddColumnEvent(testTableId, addedColumns); writer.write(addColumnEvent, null); - verify(mockCatalog, times(2)).getTable(testIdentifier); + verify(mockCatalog, times(3)).getTable(testIdentifier); RowType rowTypeWithAge = RowType.of( @@ -207,7 +205,7 @@ void testGetTableInvocationCount() throws Exception { null); } - verify(mockCatalog, times(2)).getTable(testIdentifier); + verify(mockCatalog, times(3)).getTable(testIdentifier); Map typeMapping = new HashMap<>(); typeMapping.put("value", DataTypes.BIGINT()); @@ -215,6 +213,6 @@ void testGetTableInvocationCount() throws Exception { new AlterColumnTypeEvent(testTableId, typeMapping); writer.write(alterColumnTypeEvent, null); - verify(mockCatalog, times(3)).getTable(testIdentifier); + verify(mockCatalog, times(4)).getTable(testIdentifier); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java index 3621f4d7a8b..aa2c43c6538 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java @@ -101,8 +101,11 @@ void testBucketAssignOperatorUsesCurrentSubtaskForPostponeBucketTable() throws E @Test void testSerializerKeepsAssignedBucketBeforeWriterRewrite() throws Exception { Schema schema = createPostponeBucketSchema(); + Options catalogOptions = createCatalogOptions(); + new PaimonMetadataApplier(catalogOptions) + .applySchemaChange(new CreateTableEvent(TABLE_ID, schema)); PaimonRecordSerializer serializer = - new PaimonRecordEventSerializer(ZoneId.systemDefault()); + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions); serializer.serialize(new CreateTableEvent(TABLE_ID, schema)); int assignedBucket = 2; BucketWrapperChangeEvent bucketWrapperChangeEvent = @@ -147,7 +150,7 @@ void testPostponeBucketTableRewritesAssignedBucketToMinusTwoWhenWriting() throws catalogOptions, UnregisteredMetricsGroup.createSinkWriterMetricGroup(), "test-user", - new PaimonRecordEventSerializer(ZoneId.systemDefault()), + new PaimonRecordEventSerializer(ZoneId.systemDefault(), catalogOptions), 0); writer.write(new CreateTableEvent(TABLE_ID, schema), null); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory deleted file mode 100644 index 8335f29d37d..00000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.flink.cdc.connectors.paimon.sink.testsource.AppendOnlyTableDataSourceFactory diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java index bcb55094a42..2d5ffd5d775 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java @@ -61,7 +61,8 @@ public enum EventSetId { SINGLE_SPLIT_SINGLE_BATCH_TABLE, SINGLE_SPLIT_MULTI_BATCH_TABLE, MULTI_SPLITS_SINGLE_BATCH_TABLE, - TRANSFORM_BATCH_TABLE; + TRANSFORM_BATCH_TABLE, + APPEND_ONLY_BLOB_TABLE; public boolean isBatchEvent() { switch (this) { @@ -165,11 +166,78 @@ public static void setSourceEvents(EventSetId eventType) { sourceEvents = transformBatchTable(); break; } + case APPEND_ONLY_BLOB_TABLE: + { + sourceEvents = appendOnlyBlobTable(); + break; + } default: throw new IllegalArgumentException(eventType + " is not supported"); } } + public static List> appendOnlyBlobTable() { + List> eventOfSplits = new ArrayList<>(); + List split1 = new ArrayList<>(); + + TableId tableId = TableId.tableId("default_database", "table1"); + + // create table without primary key, with VARBINARY blob column + Schema schema = + Schema.newBuilder() + .physicalColumn("uuid", DataTypes.STRING().notNull()) + .physicalColumn("path", DataTypes.STRING()) + .physicalColumn("blobContent", DataTypes.VARBINARY(1000)) + .build(); + CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema); + split1.add(createTableEvent); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of( + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.VARBINARY(1000))); + + // insert 3 events with uuid, path and blobContent + split1.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + BinaryStringData.fromString( + "550e8400-e29b-41d4-a716-446655440000"), + BinaryStringData.fromString( + "oss://my-bucket/data/files/document_001.pdf"), + "PDF binary content for document 001".getBytes() + }))); + split1.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + BinaryStringData.fromString( + "6ba7b810-9dad-11d1-80b4-00c04fd430c8"), + BinaryStringData.fromString( + "oss://my-bucket/data/images/photo_002.jpg"), + "JPG binary content for photo 002".getBytes() + }))); + split1.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + BinaryStringData.fromString( + "f47ac10b-58cc-4372-a567-0e02b2c3d479"), + BinaryStringData.fromString( + "oss://my-bucket/data/videos/movie_003.mp4"), + "MP4 binary content for movie 003".getBytes() + }))); + + eventOfSplits.add(split1); + return eventOfSplits; + } + public static List> singleSplitSingleTable() { List> eventOfSplits = new ArrayList<>(); List split1 = new ArrayList<>();