From 7a115b299266f5951e3302c1b0fadd0745a49fe4 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 19 May 2026 16:18:17 +0800 Subject: [PATCH] [FLINK-39721] Support flink 1.18 compatibility for cdc runtime. --- .../connectors/fluss/sink/v2/FlussSink.java | 9 ---- .../connector/sink2/SupportsCommitter.java | 33 ++++++++++++++ .../sink2/SupportsPreCommitTopology.java | 33 ++++++++++++++ .../sink2/SupportsPreWriteTopology.java | 38 ++++++++++++++++ .../cdc/source/RuntimeContextAdapter.java | 31 +++++++++++++ .../SingleThreadFetcherManagerAdapter.java | 39 ++++++++++++++++ ...hreadMultiplexSourceReaderBaseAdapter.java | 44 +++++++++++++++++++ .../cdc/source/RuntimeContextAdapter.java | 31 +++++++++++++ .../SingleThreadFetcherManagerAdapter.java | 40 +++++++++++++++++ ...hreadMultiplexSourceReaderBaseAdapter.java | 44 +++++++++++++++++++ .../schema/distributed/SchemaOperator.java | 3 +- .../schema/regular/SchemaOperator.java | 3 +- .../sink/DataSinkWriterOperator.java | 5 ++- .../DistributedPrePartitionOperator.java | 3 +- 14 files changed, 342 insertions(+), 14 deletions(-) create mode 100644 flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java create mode 100644 flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreCommitTopology.java create mode 100644 flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreWriteTopology.java create mode 100644 flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java create mode 100644 flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java create mode 100644 flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java create mode 100644 flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java create mode 100644 flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java create mode 100644 flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java index 0bd74f0601e..8cb4e09d22b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSink.java @@ -19,7 +19,6 @@ import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.fluss.config.Configuration; @@ -51,12 +50,4 @@ public SinkWriter createWriter(InitContext context) throws IOException { flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup())); return flinkSinkWriter; } - - @Override - public SinkWriter createWriter(WriterInitContext context) throws IOException { - FlussSinkWriter flinkSinkWriter = - new FlussSinkWriter<>(flussConfig, context.getMailboxExecutor(), serializer); - flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup())); - return flinkSinkWriter; - } } diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java new file mode 100644 index 00000000000..f66470aa979 --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.IOException; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public interface SupportsCommitter { + + Committer createCommitter(CommitterInitContext context) throws IOException; + + SimpleVersionedSerializer getCommittableSerializer(); +} diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreCommitTopology.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreCommitTopology.java new file mode 100644 index 00000000000..b49433805e1 --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreCommitTopology.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.datastream.DataStream; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public interface SupportsPreCommitTopology { + DataStream> addPreCommitTopology( + DataStream> var1); + + SimpleVersionedSerializer getWriteResultSerializer(); +} diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreWriteTopology.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreWriteTopology.java new file mode 100644 index 00000000000..22fe55a73c4 --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/sink2/SupportsPreWriteTopology.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.annotation.Public; +import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +@Public +public interface SupportsPreWriteTopology extends WithPreWriteTopology { + /** + * Adds an arbitrary topology before the writer. The topology may be used to repartition the + * data. + * + * @param inputDataStream the stream of input records. + * @return the custom topology before {@link SinkWriter}. + */ + DataStream addPreWriteTopology(DataStream inputDataStream); +} diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java new file mode 100644 index 00000000000..8cec6e0ee7a --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.source; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public class RuntimeContextAdapter { + + public static int getIndexOfThisSubtask(RuntimeContext runtimeContext) { + return runtimeContext.getIndexOfThisSubtask(); + } +} diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java new file mode 100644 index 00000000000..c98bc8f10e6 --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.source; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +import java.util.function.Supplier; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public class SingleThreadFetcherManagerAdapter + extends SingleThreadFetcherManager { + public SingleThreadFetcherManagerAdapter( + FutureCompletingBlockingQueue> elementsQueue, + Supplier> splitReaderSupplier) { + super(elementsQueue, splitReaderSupplier); + } +} diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java new file mode 100644 index 00000000000..cf8f2e39e81 --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.source; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public abstract class SingleThreadMultiplexSourceReaderBaseAdapter< + E, T, SplitT extends SourceSplit, SplitStateT> + extends SingleThreadMultiplexSourceReaderBase { + public SingleThreadMultiplexSourceReaderBaseAdapter( + FutureCompletingBlockingQueue> elementsQueue, + SingleThreadFetcherManager splitFetcherManager, + RecordEmitter recordEmitter, + Configuration config, + SourceReaderContext context) { + super(elementsQueue, splitFetcherManager, recordEmitter, config, context); + } +} diff --git a/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java new file mode 100644 index 00000000000..ac7d7a27d62 --- /dev/null +++ b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/RuntimeContextAdapter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.source; + +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public class RuntimeContextAdapter { + + public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) { + return runtimeContext.getTaskInfo().getIndexOfThisSubtask(); + } +} diff --git a/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java new file mode 100644 index 00000000000..48cb57f8207 --- /dev/null +++ b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadFetcherManagerAdapter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.source; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +import java.util.function.Supplier; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public class SingleThreadFetcherManagerAdapter + extends SingleThreadFetcherManager { + + public SingleThreadFetcherManagerAdapter( + FutureCompletingBlockingQueue> elementsQueue, + Supplier> splitReaderSupplier) { + super(splitReaderSupplier); + } +} diff --git a/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java new file mode 100644 index 00000000000..eeef08c8a9c --- /dev/null +++ b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/cdc/source/SingleThreadMultiplexSourceReaderBaseAdapter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.source; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +/** + * Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer + * that allows Flink CDC to work across different Flink versions. + */ +public abstract class SingleThreadMultiplexSourceReaderBaseAdapter< + E, T, SplitT extends SourceSplit, SplitStateT> + extends SingleThreadMultiplexSourceReaderBase { + public SingleThreadMultiplexSourceReaderBaseAdapter( + FutureCompletingBlockingQueue> elementsQueue, + SingleThreadFetcherManager splitFetcherManager, + RecordEmitter recordEmitter, + Configuration config, + SourceReaderContext context) { + super(splitFetcherManager, recordEmitter, config, context); + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java index 114629e2dba..048ab5bdfdc 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java @@ -37,6 +37,7 @@ import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse; import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent; +import org.apache.flink.cdc.source.RuntimeContextAdapter; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -101,7 +102,7 @@ public SchemaOperator( @Override public void open() throws Exception { super.open(); - subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + subTaskId = RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext()); upstreamSchemaTable = HashBasedTable.create(); evolvedSchemaMap = new HashMap<>(); tableIdRouter = new TableIdRouter(routingRules, routeMode); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java index b659b7234f7..abc47068c2a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java @@ -36,6 +36,7 @@ import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics; import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse; +import org.apache.flink.cdc.source.RuntimeContextAdapter; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -136,7 +137,7 @@ public void open() throws Exception { this.schemaOperatorMetrics = new SchemaOperatorMetrics( getRuntimeContext().getMetricGroup(), schemaChangeBehavior); - this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + this.subTaskId = RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext()); this.originalSchemaMap = new HashMap<>(); this.evolvedSchemaMap = new HashMap<>(); this.router = new TableIdRouter(routingRules, routeMode); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index bac2929cec8..be3d0789d15 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter; import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException; +import org.apache.flink.cdc.source.RuntimeContextAdapter; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -127,7 +128,7 @@ public void open() throws Exception { @Override public void initializeState(StateInitializationContext context) throws Exception { schemaEvolutionClient.registerSubtask( - getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()); + RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext())); this.>>getFlinkWriterOperator() .initializeState(context); } @@ -227,7 +228,7 @@ private void handleFlushEvent(FlushEvent event) throws Exception { }); } schemaEvolutionClient.notifyFlushSuccess( - getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), + RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext()), event.getSourceSubTaskId()); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java index 4750c685a95..9231abf1a8c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter; import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; +import org.apache.flink.cdc.source.RuntimeContextAdapter; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -64,7 +65,7 @@ public DistributedPrePartitionOperator( @Override public void open() throws Exception { super.open(); - subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); + subTaskId = RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext()); schemaMap = new HashMap<>(); hashFunctionMap = new HashMap<>(); }