Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;

import org.apache.fluss.config.Configuration;
Expand Down Expand Up @@ -51,12 +50,4 @@ public SinkWriter<InputT> createWriter(InitContext context) throws IOException {
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
return flinkSinkWriter;
}

@Override
public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
FlussSinkWriter<InputT> flinkSinkWriter =
new FlussSinkWriter<>(flussConfig, context.getMailboxExecutor(), serializer);
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
return flinkSinkWriter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public interface SupportsCommitter<CommittableT> {

Committer<CommittableT> createCommitter(CommitterInitContext context) throws IOException;

SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {
DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
DataStream<CommittableMessage<WriterResultT>> var1);

SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.annotation.Public;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
@Public
public interface SupportsPreWriteTopology<InputT> extends WithPreWriteTopology<InputT> {
/**
* Adds an arbitrary topology before the writer. The topology may be used to repartition the
* data.
*
* @param inputDataStream the stream of input records.
* @return the custom topology before {@link SinkWriter}.
*/
DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.source;

import org.apache.flink.api.common.functions.RuntimeContext;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public class RuntimeContextAdapter {

public static int getIndexOfThisSubtask(RuntimeContext runtimeContext) {
return runtimeContext.getIndexOfThisSubtask();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.source;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

import java.util.function.Supplier;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
extends SingleThreadFetcherManager<E, SplitT> {
public SingleThreadFetcherManagerAdapter(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
super(elementsQueue, splitReaderSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.source;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
E, T, SplitT extends SourceSplit, SplitStateT>
extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT> {
public SingleThreadMultiplexSourceReaderBaseAdapter(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.source;

import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public class RuntimeContextAdapter {

public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
return runtimeContext.getTaskInfo().getIndexOfThisSubtask();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.source;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

import java.util.function.Supplier;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
extends SingleThreadFetcherManager<E, SplitT> {

public SingleThreadFetcherManagerAdapter(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
super(splitReaderSupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.source;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;

/**
* Compatibility adapter for Flink 1.18. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
E, T, SplitT extends SourceSplit, SplitStateT>
extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT> {
public SingleThreadMultiplexSourceReaderBaseAdapter(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(splitFetcherManager, recordEmitter, config, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.source.RuntimeContextAdapter;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
Expand Down Expand Up @@ -101,7 +102,7 @@ public SchemaOperator(
@Override
public void open() throws Exception {
super.open();
subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
subTaskId = RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext());
upstreamSchemaTable = HashBasedTable.create();
evolvedSchemaMap = new HashMap<>();
tableIdRouter = new TableIdRouter(routingRules, routeMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.cdc.source.RuntimeContextAdapter;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
Expand Down Expand Up @@ -136,7 +137,7 @@ public void open() throws Exception {
this.schemaOperatorMetrics =
new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.subTaskId = RuntimeContextAdapter.getIndexOfThisSubtask(getRuntimeContext());
this.originalSchemaMap = new HashMap<>();
this.evolvedSchemaMap = new HashMap<>();
this.router = new TableIdRouter(routingRules, routeMode);
Expand Down
Loading
Loading