Skip to content

Commit aabf306

Browse files
authored
[FLINK-36520] MySQL CDC Supports connecting to MySQL server 8.4+ (#3666)
1 parent a75e580 commit aabf306

10 files changed

Lines changed: 1137 additions & 110 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source;
19+
20+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
21+
import org.apache.flink.cdc.common.event.DataChangeEvent;
22+
import org.apache.flink.cdc.common.event.Event;
23+
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
24+
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
25+
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
26+
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
27+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
28+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
29+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
30+
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
31+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
32+
import org.apache.flink.streaming.util.RestartStrategyUtils;
33+
import org.apache.flink.util.CloseableIterator;
34+
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
import org.junit.jupiter.params.ParameterizedClass;
39+
import org.junit.jupiter.params.provider.EnumSource;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
import org.testcontainers.containers.output.Slf4jLogConsumer;
43+
import org.testcontainers.lifecycle.Startables;
44+
45+
import java.io.File;
46+
import java.nio.charset.StandardCharsets;
47+
import java.nio.file.Files;
48+
import java.nio.file.Path;
49+
import java.nio.file.Paths;
50+
import java.nio.file.StandardOpenOption;
51+
import java.sql.Connection;
52+
import java.sql.Statement;
53+
import java.util.ArrayList;
54+
import java.util.Collections;
55+
import java.util.List;
56+
import java.util.Objects;
57+
import java.util.stream.Stream;
58+
59+
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
60+
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
61+
import static org.assertj.core.api.Assertions.assertThat;
62+
63+
/**
64+
* Integration tests to check MySQL pipeline connector works well with different MySQL server
65+
* versions.
66+
*/
67+
@ParameterizedClass
68+
@EnumSource(
69+
value = MySqlVersion.class,
70+
names = {"V5_7", "V8_0", "V8_4"})
71+
class MySqlPipelineCompatibilityITCase {
72+
73+
private static final Logger LOG =
74+
LoggerFactory.getLogger(MySqlPipelineCompatibilityITCase.class);
75+
76+
private static Path tempFolder;
77+
private static File resourceFolder;
78+
79+
private final MySqlVersion version;
80+
private final MySqlContainer mySqlContainer;
81+
private final UniqueDatabase testDatabase;
82+
83+
private final StreamExecutionEnvironment env =
84+
StreamExecutionEnvironment.getExecutionEnvironment();
85+
86+
MySqlPipelineCompatibilityITCase(MySqlVersion version) {
87+
this.version = version;
88+
this.mySqlContainer =
89+
(MySqlContainer)
90+
new MySqlContainer(version)
91+
.withConfigurationOverride(buildCustomMySqlConfig(version))
92+
.withSetupSQL("docker/setup.sql")
93+
.withDatabaseName("flink-test")
94+
.withUsername("flinkuser")
95+
.withPassword("flinkpw")
96+
.withLogConsumer(new Slf4jLogConsumer(LOG));
97+
this.testDatabase =
98+
new UniqueDatabase(mySqlContainer, "inventory", TEST_USER, TEST_PASSWORD);
99+
}
100+
101+
@BeforeEach
102+
void setup() throws Exception {
103+
// Initialize static resources if needed
104+
if (resourceFolder == null) {
105+
resourceFolder =
106+
Paths.get(
107+
Objects.requireNonNull(
108+
MySqlPipelineCompatibilityITCase.class
109+
.getClassLoader()
110+
.getResource("."))
111+
.toURI())
112+
.toFile();
113+
tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
114+
}
115+
116+
env.setParallelism(4);
117+
env.enableCheckpointing(200);
118+
RestartStrategyUtils.configureNoRestartStrategy(env);
119+
120+
LOG.info("Starting container for MySQL {}...", version.getVersion());
121+
Startables.deepStart(Stream.of(mySqlContainer)).join();
122+
LOG.info("Container is started.");
123+
124+
testDatabase.createAndInitialize();
125+
}
126+
127+
@AfterEach
128+
void tearDown() {
129+
try {
130+
testDatabase.dropDatabase();
131+
} catch (IllegalStateException e) {
132+
LOG.warn("Failed to drop test database during teardown.", e);
133+
}
134+
if (mySqlContainer != null) {
135+
LOG.info("Stopping container for MySQL {}...", version.getVersion());
136+
mySqlContainer.stop();
137+
LOG.info("Container is stopped.");
138+
}
139+
}
140+
141+
@Test
142+
void testSnapshotRead() throws Exception {
143+
MySqlSourceConfigFactory configFactory =
144+
new MySqlSourceConfigFactory()
145+
.hostname(mySqlContainer.getHost())
146+
.port(mySqlContainer.getDatabasePort())
147+
.username(TEST_USER)
148+
.password(TEST_PASSWORD)
149+
.databaseList(testDatabase.getDatabaseName())
150+
.tableList(testDatabase.getDatabaseName() + ".products")
151+
.startupOptions(StartupOptions.initial())
152+
.serverId(getServerId(env.getParallelism()))
153+
.serverTimeZone("UTC");
154+
155+
FlinkSourceProvider sourceProvider =
156+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
157+
CloseableIterator<Event> events =
158+
env.fromSource(
159+
sourceProvider.getSource(),
160+
WatermarkStrategy.noWatermarks(),
161+
MySqlDataSourceFactory.IDENTIFIER,
162+
new EventTypeInfo())
163+
.executeAndCollect();
164+
165+
List<Event> snapshotEvents = fetchEvents(events, 9);
166+
167+
assertThat(snapshotEvents).hasSize(9);
168+
assertThat(snapshotEvents.stream().filter(e -> e instanceof DataChangeEvent)).hasSize(9);
169+
170+
events.close();
171+
}
172+
173+
@Test
174+
void testBinlogRead() throws Exception {
175+
MySqlSourceConfigFactory configFactory =
176+
new MySqlSourceConfigFactory()
177+
.hostname(mySqlContainer.getHost())
178+
.port(mySqlContainer.getDatabasePort())
179+
.username(TEST_USER)
180+
.password(TEST_PASSWORD)
181+
.databaseList(testDatabase.getDatabaseName())
182+
.tableList(testDatabase.getDatabaseName() + ".products")
183+
.startupOptions(StartupOptions.initial())
184+
.serverId(getServerId(env.getParallelism()))
185+
.serverTimeZone("UTC");
186+
187+
FlinkSourceProvider sourceProvider =
188+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
189+
CloseableIterator<Event> events =
190+
env.fromSource(
191+
sourceProvider.getSource(),
192+
WatermarkStrategy.noWatermarks(),
193+
MySqlDataSourceFactory.IDENTIFIER,
194+
new EventTypeInfo())
195+
.executeAndCollect();
196+
197+
fetchEvents(events, 9);
198+
199+
try (Connection connection = testDatabase.getJdbcConnection();
200+
Statement statement = connection.createStatement()) {
201+
statement.execute(
202+
String.format(
203+
"INSERT INTO `%s`.`products` VALUES (default,'test_product','desc',1.0);",
204+
testDatabase.getDatabaseName()));
205+
}
206+
207+
List<Event> binlogEvents = fetchEvents(events, 1);
208+
assertThat(binlogEvents).hasSize(1);
209+
assertThat(binlogEvents.get(0)).isInstanceOf(DataChangeEvent.class);
210+
211+
events.close();
212+
}
213+
214+
private String getServerId(int parallelism) {
215+
int serverId = (int) (Math.random() * 100) + 5400;
216+
return serverId + "-" + (serverId + parallelism);
217+
}
218+
219+
private List<Event> fetchEvents(CloseableIterator<Event> iterator, int count) {
220+
List<Event> events = new ArrayList<>();
221+
while (count > 0 && iterator.hasNext()) {
222+
Event event = iterator.next();
223+
if (event instanceof DataChangeEvent) {
224+
events.add(event);
225+
count--;
226+
}
227+
}
228+
return events;
229+
}
230+
231+
private String buildCustomMySqlConfig(MySqlVersion version) {
232+
try {
233+
if (resourceFolder == null) {
234+
resourceFolder =
235+
Paths.get(
236+
Objects.requireNonNull(
237+
MySqlPipelineCompatibilityITCase.class
238+
.getClassLoader()
239+
.getResource("."))
240+
.toURI())
241+
.toFile();
242+
tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
243+
}
244+
// Create version-specific directory to avoid conflicts
245+
Path versionDir =
246+
Files.createDirectories(
247+
Paths.get(
248+
tempFolder.toString(), version.getVersion().replace(".", "_")));
249+
Path cnf = Paths.get(versionDir.toString(), "my.cnf");
250+
// Check if file already exists to avoid FileAlreadyExistsException
251+
if (!Files.exists(cnf)) {
252+
Files.createFile(cnf);
253+
}
254+
StringBuilder mysqlConfBuilder = new StringBuilder();
255+
mysqlConfBuilder.append(
256+
"[mysqld]\n"
257+
+ "binlog_format = row\n"
258+
+ "log_bin = mysql-bin\n"
259+
+ "server-id = 223344\n"
260+
+ "binlog_row_image = FULL\n"
261+
+ "gtid-mode = OFF\n");
262+
263+
if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
264+
mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
265+
}
266+
267+
Files.write(
268+
cnf,
269+
Collections.singleton(mysqlConfBuilder.toString()),
270+
StandardCharsets.UTF_8,
271+
StandardOpenOption.TRUNCATE_EXISTING);
272+
return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
273+
} catch (Exception e) {
274+
throw new RuntimeException("Failed to create my.cnf file.", e);
275+
}
276+
}
277+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,9 +448,12 @@ public void run(SourceContext<T> sourceContext) throws Exception {
448448
debeziumChangeFetcher.runFetchLoop();
449449
} catch (Throwable t) {
450450
if (t.getMessage() != null
451-
&& t.getMessage()
452-
.contains(
453-
"A slave with the same server_uuid/server_id as this slave has connected to the master")) {
451+
&& (t.getMessage()
452+
.contains(
453+
"A slave with the same server_uuid/server_id as this slave has connected to the master")
454+
|| t.getMessage()
455+
.contains(
456+
"A replica with the same server_uuid/server_id as this replica has connected to the source"))) {
454457
throw new RuntimeException(
455458
"The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n"
456459
+ "The server id conflict may happen in the following situations: \n"

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,32 @@
4040

4141
/**
4242
* Copied from Debezium project(1.9.8.final) to add custom jdbc properties in the jdbc url. The new
43-
* parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will
44-
* be used to generate the jdbc url pattern, and may overwrite the default value.
43+
* parameter {@link MySqlConnectionConfiguration#MySqlConnectionConfiguration(Configuration config,
44+
* Properties jdbcProperties)} in the constructor of {@link MySqlConnectionConfiguration} will be
45+
* used to generate the jdbc url pattern, and may overwrite the default value.
4546
*
46-
* <p>Line 75: Add field {@code urlPattern} in {@link MySqlConnection} and remove old pattern.
47+
* <p>Add field {@link MySqlConnection#urlPattern} in {@link MySqlConnection} and remove old
48+
* pattern.
4749
*
48-
* <p>Line 92: Init {@code urlPattern} using the url pattern from {@link
50+
* <p>Added MySQL 8.4+ compatible probing fields {@link
51+
* MySqlConnection#MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT}, {@link
52+
* MySqlConnection#MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT}, and {@link
53+
* MySqlConnection#showBinaryLogStatement}.
54+
*
55+
* <p>Init {@link MySqlConnection#urlPattern} using the url pattern from {@link
4956
* MySqlConnectionConfiguration}.
5057
*
51-
* <p>Line 544: Generate the connection string by the new field {@code urlPattern}.
58+
* <p>Generate the connection string by the new field {@link MySqlConnection#urlPattern}.
5259
*
53-
* <p>Line 569 ~ 574: Add new constant and field {@code urlPattern} to {@link
60+
* <p>Add new constant and field {@link MySqlConnection#urlPattern} to {@link
5461
* MySqlConnectionConfiguration}.
5562
*
56-
* <p>Line 625: Init new field {@code urlPattern} in {@link MySqlConnectionConfiguration}.
63+
* <p>Init new field {@link MySqlConnection#urlPattern} in {@link MySqlConnectionConfiguration}.
64+
*
65+
* <p>Add utility methods helping to generate the url pattern and add default values.
5766
*
58-
* <p>Line 715 ~ 741: Add some methods helping to generate the url pattern and add default values.
67+
* <p>Added utility method {@link MySqlConnection#getShowBinaryLogStatement} and {@link
68+
* MySqlConnection#probeShowBinaryLogStatement} for MySQL 8.4 compatibility.
5969
*/
6070
public class MySqlConnection extends JdbcConnection {
6171

@@ -74,6 +84,10 @@ public class MySqlConnection extends JdbcConnection {
7484

7585
private final String urlPattern;
7686

87+
private static final String MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT = "SHOW MASTER STATUS";
88+
private static final String MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT = "SHOW BINARY LOG STATUS";
89+
private final String showBinaryLogStatement;
90+
7791
/**
7892
* Creates a new connection using the supplied configuration.
7993
*
@@ -90,6 +104,7 @@ public MySqlConnection(
90104
this.connectionConfig = connectionConfig;
91105
this.mysqlFieldReader = fieldReader;
92106
this.urlPattern = connectionConfig.getUrlPattern();
107+
this.showBinaryLogStatement = probeShowBinaryLogStatement();
93108
}
94109

95110
/**
@@ -275,7 +290,7 @@ public boolean isGtidModeEnabled() {
275290
public String knownGtidSet() {
276291
try {
277292
return queryAndMap(
278-
"SHOW MASTER STATUS",
293+
showBinaryLogStatement,
279294
rs -> {
280295
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
281296
return rs.getString(
@@ -769,4 +784,27 @@ public <T extends DatabaseSchema<TableId>> Object getColumnValue(
769784
public String quotedTableIdString(TableId tableId) {
770785
return tableId.toQuotedString('`');
771786
}
787+
788+
public String getShowBinaryLogStatement() {
789+
return showBinaryLogStatement;
790+
}
791+
792+
private String probeShowBinaryLogStatement() {
793+
LOGGER.info("Probing binary log statement.");
794+
try {
795+
// Attempt to query
796+
query(MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT, rs -> {});
797+
LOGGER.info(
798+
"Successfully found show binary log statement with `{}`.",
799+
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT);
800+
return MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT;
801+
} catch (SQLException e) {
802+
LOGGER.info(
803+
"Probing with {} failed, fallback to classic {}. Caused by: {}",
804+
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
805+
MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
806+
e.getMessage());
807+
return MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT;
808+
}
809+
}
772810
}

0 commit comments

Comments
 (0)