Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ To use this feature through `flink run`, run the following shell command.
[--primary_keys <primary-keys>] \
[--type_mapping to-string] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column> [--metadata_column ...]] \
[--metadata_column_prefix <metadata-column-prefix>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Expand Down Expand Up @@ -215,6 +217,8 @@ To use this feature through `flink run`, run the following shell command.
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column> [--metadata_column ...]] \
[--metadata_column_prefix <metadata-column-prefix>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Expand Down
8 changes: 8 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table.</td>
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
</tr>
<tr>
<td><h5>--metadata_column_prefix</h5></td>
<td>--metadata_column_prefix is optionally used to set a prefix for metadata columns in the Paimon table to avoid conflicts with existing attributes. For example, with prefix "__kafka_", the metadata column "topic" will be stored as "__kafka_topic" field.</td>
</tr>
<tr>
<td><h5>--eager_init</h5></td>
<td>It is default false. If true, committers of all relevant tables will be initialized eagerly, which means those tables could be forced to create snapshots.</td>
Expand Down
10 changes: 9 additions & 1 deletion docs/layouts/shortcodes/generated/kafka_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td>
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
</tr>
<tr>
<td><h5>--metadata_column_prefix</h5></td>
<td>--metadata_column_prefix is optionally used to set a prefix for metadata columns in the Paimon table to avoid conflicts with existing attributes. For example, with prefix "__kafka_", the metadata column "topic" will be stored as "__kafka_topic" field.</td>
</tr>
<tr>
<td><h5>--kafka_conf</h5></td>
<td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic/topic-pattern`, `properties.group.id`, and `value.format` are required configurations, others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
Expand All @@ -83,4 +91,4 @@
<td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
</tr>
</tbody>
</table>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class CdcActionCommonUtils {
public static final String PRIMARY_KEYS = "primary_keys";
public static final String COMPUTED_COLUMN = "computed_column";
public static final String METADATA_COLUMN = "metadata_column";
public static final String METADATA_COLUMN_PREFIX = "metadata_column_prefix";
public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys";
public static final String EAGER_INIT = "eager_init";
public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,25 @@
* A functional interface for converting CDC metadata.
*
* <p>This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given
* {@link JsonNode} source. Implementations of this interface can be used to process and transform
* metadata entries from CDC sources.
* {@link JsonNode} source or {@link CdcSourceRecord}. Implementations of this interface can be used
* to process and transform metadata entries from CDC sources.
*/
public interface CdcMetadataConverter extends Serializable {

String read(JsonNode payload);

    /**
     * Reads a metadata value directly from a raw {@link CdcSourceRecord}, for sources (such as
     * Kafka) whose metadata lives on the record itself rather than in the JSON payload.
     *
     * <p>The default implementation throws {@link UnsupportedOperationException} so that existing
     * converters, which only implement {@link #read(JsonNode)}, remain backward compatible.
     *
     * @param record the CDC source record
     * @return the metadata value as a string
     * @throws UnsupportedOperationException if this converter only supports JSON-payload reads
     */
    default String read(CdcSourceRecord record) {
        throw new UnsupportedOperationException(
                "This metadata converter does not support reading from CdcSourceRecord");
    }

DataType dataType();

String columnName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.util.Arrays;
Expand Down Expand Up @@ -49,7 +51,14 @@ public enum CdcMetadataProcessor {
new CdcMetadataConverter.DatabaseNameConverter(),
new CdcMetadataConverter.TableNameConverter(),
new CdcMetadataConverter.SchemaNameConverter(),
new CdcMetadataConverter.OpTsConverter());
new CdcMetadataConverter.OpTsConverter()),
KAFKA_METADATA_PROCESSOR(
SyncJobHandler.SourceType.KAFKA,
new KafkaMetadataConverter.TopicConverter(),
new KafkaMetadataConverter.PartitionConverter(),
new KafkaMetadataConverter.OffsetConverter(),
new KafkaMetadataConverter.TimestampConverter(),
new KafkaMetadataConverter.TimestampTypeConverter());

private final SyncJobHandler.SourceType sourceType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** A data change record from the CDC source. */
Expand All @@ -35,14 +38,29 @@ public class CdcSourceRecord implements Serializable {
// TODO Use generics to support more scenarios.
private final Object value;

// Generic metadata map - any source can add metadata
private final Map<String, Object> metadata;

public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
this.topic = topic;
this.key = key;
this.value = value;
this(topic, key, value, null);
}

public CdcSourceRecord(Object value) {
this(null, null, value);
this(null, null, value, null);
}

    /**
     * Creates a record carrying an optional generic metadata map.
     *
     * @param topic the source topic, or null if not applicable
     * @param key the record key, or null
     * @param value the record value (payload)
     * @param metadata source-specific metadata entries; null is treated as empty
     */
    public CdcSourceRecord(
            @Nullable String topic,
            @Nullable Object key,
            Object value,
            @Nullable Map<String, Object> metadata) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        // Defensive copy + unmodifiable view keeps the stored metadata immutable.
        this.metadata =
                metadata != null
                        ? Collections.unmodifiableMap(new HashMap<>(metadata))
                        : Collections.emptyMap();
    }

@Nullable
Expand All @@ -59,6 +77,15 @@ public Object getValue() {
return value;
}

    /** Returns the unmodifiable metadata map attached to this record (never null). */
    public Map<String, Object> getMetadata() {
        return metadata;
    }

    /**
     * Returns the metadata value for the given key.
     *
     * @param key the metadata key
     * @return the value, or null when the key is not present
     */
    @Nullable
    public Object getMetadata(String key) {
        return metadata.get(key);
    }

@Override
public boolean equals(Object o) {
if (!(o instanceof CdcSourceRecord)) {
Expand All @@ -68,12 +95,13 @@ public boolean equals(Object o) {
CdcSourceRecord that = (CdcSourceRecord) o;
return Objects.equals(topic, that.topic)
&& Objects.equals(key, that.key)
&& Objects.equals(value, that.value);
&& Objects.equals(value, that.value)
&& Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(topic, key, value);
return Objects.hash(topic, key, value, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.types.DataType;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

/**
* Wraps a {@link CdcMetadataConverter} to add a prefix to its column name.
*
* <p>This decorator allows adding prefixes like "__kafka_" to metadata column names to avoid
* collisions with source data columns, while keeping the underlying converter logic unchanged.
*/
/**
 * Decorates a {@link CdcMetadataConverter} so that its column name carries a configurable prefix.
 *
 * <p>Prefixing (e.g. "__kafka_") keeps metadata columns from colliding with columns coming from
 * the source data itself; all read and type behavior is delegated to the wrapped converter
 * unchanged.
 */
public class PrefixedMetadataConverter implements CdcMetadataConverter {

    private static final long serialVersionUID = 1L;

    private final CdcMetadataConverter delegate;
    private final String prefix;

    public PrefixedMetadataConverter(CdcMetadataConverter delegate, String prefix) {
        this.delegate = delegate;
        // A null prefix is treated the same as "no prefix".
        this.prefix = (prefix == null) ? "" : prefix;
    }

    @Override
    public String read(JsonNode payload) {
        return delegate.read(payload);
    }

    @Override
    public String read(CdcSourceRecord record) {
        return delegate.read(record);
    }

    @Override
    public DataType dataType() {
        return delegate.dataType();
    }

    @Override
    public String columnName() {
        return prefix + delegate.columnName();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
return new PostgresRecordParser(
cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
case KAFKA:
return provideDataFormat()
.createParser(typeMapping, computedColumns, metadataConverters);
case PULSAR:
DataFormat dataFormat = provideDataFormat();
return dataFormat.createParser(typeMapping, computedColumns);
return provideDataFormat().createParser(typeMapping, computedColumns);
case MONGODB:
return new MongoDBRecordParser(computedColumns, cdcSourceConfig);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
Expand Down Expand Up @@ -64,6 +65,11 @@ protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBa
}

if (params.has(METADATA_COLUMN)) {
// Parse optional prefix first
if (params.has(METADATA_COLUMN_PREFIX)) {
action.withMetadataColumnPrefix(params.get(METADATA_COLUMN_PREFIX));
}

List<String> metadataColumns =
new ArrayList<>(params.getMultiParameter(METADATA_COLUMN));
if (metadataColumns.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class SynchronizationActionBase extends ActionBase {
// in paimon schema if pkeys are not specified in action command
protected boolean syncPKeysFromSourceSchema = true;
protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {};
protected String metadataColumnPrefix = "";

public SynchronizationActionBase(
String database,
Expand Down Expand Up @@ -101,10 +102,21 @@ public SynchronizationActionBase withMetadataColumns(List<String> metadataColumn
this.metadataConverters =
metadataColumns.stream()
.map(this.syncJobHandler::provideMetadataConverter)
.map(
converter ->
metadataColumnPrefix.isEmpty()
? converter
: new PrefixedMetadataConverter(
converter, metadataColumnPrefix))
.toArray(CdcMetadataConverter[]::new);
return this;
}

public SynchronizationActionBase withMetadataColumnPrefix(String prefix) {
this.metadataColumnPrefix = prefix != null ? prefix : "";
return this;
}

public SynchronizationActionBase syncPKeysFromSourceSchema(boolean flag) {
this.syncPKeysFromSourceSchema = flag;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
Expand Down Expand Up @@ -49,6 +50,16 @@ public AbstractRecordParser createParser(
return parser().createParser(typeMapping, computedColumns);
}

    /**
     * Creates a record parser that may additionally emit the given metadata columns.
     *
     * <p>This default implementation ignores {@code metadataConverters} and delegates to the
     * two-argument {@code createParser}; formats that can extract record-level metadata override
     * this method.
     */
    @Override
    public AbstractRecordParser createParser(
            TypeMapping typeMapping,
            List<ComputedColumn> computedColumns,
            CdcMetadataConverter[] metadataConverters) {
        // Most parsers don't support metadata converters, so we default to the 2-parameter version
        // Only specific parsers like DebeziumAvroRecordParser will override this
        return createParser(typeMapping, computedColumns);
    }

@Override
public KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public AbstractJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> co
}

    /** Caches the record's JSON payload as the parse root; also retains the record itself. */
    protected void setRoot(CdcSourceRecord record) {
        super.setRoot(record); // Store current record for metadata access
        root = (JsonNode) record.getValue();
    }

Expand Down Expand Up @@ -104,6 +105,8 @@ protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder
return Objects.toString(entry.getValue());
}));
evalComputedColumns(rowData, schemaBuilder);
evalMetadataColumns(rowData, schemaBuilder);

return rowData;
}

Expand Down
Loading
Loading