Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
Expand Down Expand Up @@ -159,6 +166,27 @@ limitations under the License.
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- hive dependency -->

<dependency>
Expand Down Expand Up @@ -198,6 +226,10 @@ limitations under the License.
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -253,6 +285,18 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public DataSink createDataSink(Context context) {
}
}
}
PaimonRecordSerializer<Event> serializer = new PaimonRecordEventSerializer(zoneId);
PaimonRecordSerializer<Event> serializer = new PaimonRecordEventSerializer(zoneId, options);
String schemaOperatorUid =
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,24 @@
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;

import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,6 +57,8 @@
import java.util.Map;
import java.util.Set;

import static org.apache.flink.cdc.common.types.DataTypeFamily.BINARY_STRING;
import static org.apache.flink.cdc.common.types.DataTypeFamily.CHARACTER_STRING;
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

Expand Down Expand Up @@ -149,11 +156,11 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti
new org.apache.paimon.schema.Schema.Builder();
schema.getColumns()
.forEach(
(column) ->
builder.column(
column.getName(),
TypeUtils.toPaimonDataType(column.getType()),
column.getComment()));
(column) -> {
org.apache.paimon.types.DataType dataType =
convertToBlobIfNeeded(column, tableOptions);
builder.column(column.getName(), dataType, column.getComment());
});
List<String> partitionKeys = new ArrayList<>();
List<String> primaryKeys = schema.primaryKeys();
if (partitionMaps.containsKey(event.tableId())) {
Expand Down Expand Up @@ -205,10 +212,12 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName())));
columnWithPosition.getAddColumn().getName()),
tableOptions));
break;
case LAST:
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition));
tableChangeList.addAll(
SchemaChangeProvider.add(columnWithPosition, tableOptions));
break;
case BEFORE:
tableChangeList.addAll(
Expand All @@ -225,7 +234,8 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after));
tableChangeList.addAll(
SchemaChangeProvider.add(columnWithPosition, after, tableOptions));
break;
default:
throw new SchemaEvolveException(
Expand Down Expand Up @@ -253,7 +263,8 @@ private List<SchemaChange> applyAddColumnWithBeforePosition(
columnWithPosition,
(index == 0)
? SchemaChange.Move.first(columnName)
: SchemaChange.Move.after(columnName, columnNames.get(index - 1)));
: SchemaChange.Move.after(columnName, columnNames.get(index - 1)),
tableOptions);
}

private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
Expand Down Expand Up @@ -303,13 +314,22 @@ private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveExcep

private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException {
try {
FileStoreTable table =
(FileStoreTable)
catalog.getTable(
new Identifier(
event.tableId().getSchemaName(),
event.tableId().getTableName()));
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getTypeMapping()
.forEach(
(oldName, newType) ->
tableChangeList.add(
SchemaChangeProvider.updateColumnType(
oldName, newType)));
(columnName, newType) -> {
// Modifying the primary key data type may lead to exceptions in
// read/write/merge operations.
SchemaChangeProvider.updateColumnType(
table.schema(), columnName, newType, tableOptions)
.ifPresent(tableChangeList::add);
});
event.getComments()
.forEach(
(name, comment) -> {
Expand Down Expand Up @@ -359,4 +379,33 @@ private void applyAlterTableComment(AlterTableCommentEvent event) throws SchemaE
/**
 * Builds a Paimon {@link Identifier} from the table id carried by a schema change event.
 *
 * @param event The schema change event whose table id is read.
 * @return An identifier created from the table id's schema name and table name, in that order.
 */
private static Identifier tableIdToIdentifier(SchemaChangeEvent event) {
    final String schemaName = event.tableId().getSchemaName();
    final String tableName = event.tableId().getTableName();
    return new Identifier(schemaName, tableName);
}

/**
 * Resolves the Paimon type for a CDC column, substituting BLOB when the column is configured as
 * a blob field.
 *
 * <p>BLOB is chosen only when all of the following hold: the blob-field list obtained from
 * {@code CoreOptions.blobField(tableOptions)} is not empty, the column's CDC type passes
 * {@link #isSupportedTypeForBlob(DataType)}, and that list contains the column's name. In every
 * other case the result of {@code TypeUtils.toPaimonDataType} for the column type is returned.
 *
 * @param column The CDC column definition.
 * @param tableOptions The table options from which the blob-field configuration is read.
 * @return {@code DataTypes.BLOB()} if the column qualifies, otherwise the regularly converted
 *     Paimon type.
 */
private org.apache.paimon.types.DataType convertToBlobIfNeeded(
        Column column, Map<String, String> tableOptions) {
    // The regular conversion is always performed first, exactly as before, even when the BLOB
    // branch ends up being taken.
    final org.apache.paimon.types.DataType convertedType =
            TypeUtils.toPaimonDataType(column.getType());
    final List<String> configuredBlobFields = CoreOptions.blobField(tableOptions);

    // Short-circuit order is significant: emptiness check, then type check, then name lookup.
    final boolean useBlob =
            !configuredBlobFields.isEmpty()
                    && isSupportedTypeForBlob(column.getType())
                    && configuredBlobFields.contains(column.getName());

    // Note carried over from the original code: BLOB type is always nullable in Paimon.
    return useBlob ? DataTypes.BLOB() : convertedType;
}

/**
 * Tells whether a CDC type is eligible for conversion to BLOB.
 *
 * @param dataType The CDC data type to inspect.
 * @return {@code true} if the type belongs to the {@code BINARY_STRING} or {@code
 *     CHARACTER_STRING} family, {@code false} otherwise.
 */
private boolean isSupportedTypeForBlob(DataType dataType) {
    final boolean inBlobCompatibleFamily = dataType.isAnyOf(BINARY_STRING, CHARACTER_STRING);
    return inBlobCompatibleFamily;
}
}
Loading
Loading