Skip to content

Commit 4f05b0c

Browse files
committed
Add compatibility tests
1 parent c29c6d4 commit 4f05b0c

4 files changed

Lines changed: 353 additions & 75 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source;
19+
20+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
21+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
22+
import org.apache.flink.cdc.common.event.DataChangeEvent;
23+
import org.apache.flink.cdc.common.event.Event;
24+
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
25+
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
26+
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
27+
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
28+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
29+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
30+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
31+
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
32+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
33+
import org.apache.flink.util.CloseableIterator;
34+
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
import org.junit.jupiter.params.ParameterizedClass;
39+
import org.junit.jupiter.params.provider.EnumSource;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
import org.testcontainers.containers.output.Slf4jLogConsumer;
43+
import org.testcontainers.lifecycle.Startables;
44+
45+
import java.io.File;
46+
import java.nio.charset.StandardCharsets;
47+
import java.nio.file.Files;
48+
import java.nio.file.Path;
49+
import java.nio.file.Paths;
50+
import java.nio.file.StandardOpenOption;
51+
import java.sql.Connection;
52+
import java.sql.Statement;
53+
import java.util.ArrayList;
54+
import java.util.Collections;
55+
import java.util.List;
56+
import java.util.Objects;
57+
import java.util.stream.Stream;
58+
59+
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
60+
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
61+
import static org.assertj.core.api.Assertions.assertThat;
62+
63+
/**
64+
* Integration tests to check MySQL pipeline connector works well with different MySQL server
65+
* versions.
66+
*/
67+
@ParameterizedClass
68+
@EnumSource(
69+
value = MySqlVersion.class,
70+
names = {"V5_7", "V8_0", "V8_4"})
71+
class MySqlPipelineCompatibilityITCase {
72+
73+
private static final Logger LOG =
74+
LoggerFactory.getLogger(MySqlPipelineCompatibilityITCase.class);
75+
76+
private static Path tempFolder;
77+
private static File resourceFolder;
78+
79+
private final MySqlVersion version;
80+
private final MySqlContainer mySqlContainer;
81+
private final UniqueDatabase testDatabase;
82+
83+
private final StreamExecutionEnvironment env =
84+
StreamExecutionEnvironment.getExecutionEnvironment();
85+
86+
MySqlPipelineCompatibilityITCase(MySqlVersion version) {
87+
this.version = version;
88+
this.mySqlContainer =
89+
(MySqlContainer)
90+
new MySqlContainer(version)
91+
.withConfigurationOverride(buildCustomMySqlConfig(version))
92+
.withSetupSQL("docker/setup.sql")
93+
.withDatabaseName("flink-test")
94+
.withUsername("flinkuser")
95+
.withPassword("flinkpw")
96+
.withLogConsumer(new Slf4jLogConsumer(LOG));
97+
this.testDatabase =
98+
new UniqueDatabase(mySqlContainer, "inventory", TEST_USER, TEST_PASSWORD);
99+
}
100+
101+
@BeforeEach
102+
void setup() throws Exception {
103+
// Initialize static resources if needed
104+
if (resourceFolder == null) {
105+
resourceFolder =
106+
Paths.get(
107+
Objects.requireNonNull(
108+
MySqlPipelineCompatibilityITCase.class
109+
.getClassLoader()
110+
.getResource("."))
111+
.toURI())
112+
.toFile();
113+
tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
114+
}
115+
116+
env.setParallelism(4);
117+
env.enableCheckpointing(200);
118+
env.setRestartStrategy(RestartStrategies.noRestart());
119+
120+
LOG.info("Starting container for MySQL {}...", version.getVersion());
121+
Startables.deepStart(Stream.of(mySqlContainer)).join();
122+
LOG.info("Container is started.");
123+
124+
testDatabase.createAndInitialize();
125+
}
126+
127+
@AfterEach
128+
void tearDown() {
129+
testDatabase.dropDatabase();
130+
if (mySqlContainer != null) {
131+
LOG.info("Stopping container for MySQL {}...", version.getVersion());
132+
mySqlContainer.stop();
133+
LOG.info("Container is stopped.");
134+
}
135+
}
136+
137+
@Test
138+
void testSnapshotRead() throws Exception {
139+
MySqlSourceConfigFactory configFactory =
140+
new MySqlSourceConfigFactory()
141+
.hostname(mySqlContainer.getHost())
142+
.port(mySqlContainer.getDatabasePort())
143+
.username(TEST_USER)
144+
.password(TEST_PASSWORD)
145+
.databaseList(testDatabase.getDatabaseName())
146+
.tableList(testDatabase.getDatabaseName() + ".products")
147+
.startupOptions(StartupOptions.initial())
148+
.serverId(getServerId(env.getParallelism()))
149+
.serverTimeZone("UTC");
150+
151+
FlinkSourceProvider sourceProvider =
152+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
153+
CloseableIterator<Event> events =
154+
env.fromSource(
155+
sourceProvider.getSource(),
156+
WatermarkStrategy.noWatermarks(),
157+
MySqlDataSourceFactory.IDENTIFIER,
158+
new EventTypeInfo())
159+
.executeAndCollect();
160+
161+
List<Event> snapshotEvents = fetchEvents(events, 9);
162+
163+
assertThat(snapshotEvents).hasSize(9);
164+
assertThat(snapshotEvents.stream().filter(e -> e instanceof DataChangeEvent)).hasSize(9);
165+
166+
events.close();
167+
}
168+
169+
@Test
170+
void testBinlogRead() throws Exception {
171+
MySqlSourceConfigFactory configFactory =
172+
new MySqlSourceConfigFactory()
173+
.hostname(mySqlContainer.getHost())
174+
.port(mySqlContainer.getDatabasePort())
175+
.username(TEST_USER)
176+
.password(TEST_PASSWORD)
177+
.databaseList(testDatabase.getDatabaseName())
178+
.tableList(testDatabase.getDatabaseName() + ".products")
179+
.startupOptions(StartupOptions.initial())
180+
.serverId(getServerId(env.getParallelism()))
181+
.serverTimeZone("UTC");
182+
183+
FlinkSourceProvider sourceProvider =
184+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
185+
CloseableIterator<Event> events =
186+
env.fromSource(
187+
sourceProvider.getSource(),
188+
WatermarkStrategy.noWatermarks(),
189+
MySqlDataSourceFactory.IDENTIFIER,
190+
new EventTypeInfo())
191+
.executeAndCollect();
192+
193+
fetchEvents(events, 9);
194+
195+
try (Connection connection = testDatabase.getJdbcConnection();
196+
Statement statement = connection.createStatement()) {
197+
statement.execute(
198+
String.format(
199+
"INSERT INTO `%s`.`products` VALUES (default,'test_product','desc',1.0);",
200+
testDatabase.getDatabaseName()));
201+
}
202+
203+
List<Event> binlogEvents = fetchEvents(events, 1);
204+
assertThat(binlogEvents).hasSize(1);
205+
assertThat(binlogEvents.get(0)).isInstanceOf(DataChangeEvent.class);
206+
207+
events.close();
208+
}
209+
210+
private String getServerId(int parallelism) {
211+
int serverId = (int) (Math.random() * 100) + 5400;
212+
return serverId + "-" + (serverId + parallelism);
213+
}
214+
215+
private List<Event> fetchEvents(CloseableIterator<Event> iterator, int count) {
216+
List<Event> events = new ArrayList<>();
217+
while (count > 0 && iterator.hasNext()) {
218+
Event event = iterator.next();
219+
if (event instanceof DataChangeEvent) {
220+
events.add(event);
221+
count--;
222+
}
223+
}
224+
return events;
225+
}
226+
227+
private String buildCustomMySqlConfig(MySqlVersion version) {
228+
try {
229+
if (resourceFolder == null) {
230+
resourceFolder =
231+
Paths.get(
232+
Objects.requireNonNull(
233+
MySqlPipelineCompatibilityITCase.class
234+
.getClassLoader()
235+
.getResource("."))
236+
.toURI())
237+
.toFile();
238+
tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
239+
}
240+
// Create version-specific directory to avoid conflicts
241+
Path versionDir =
242+
Files.createDirectories(
243+
Paths.get(
244+
tempFolder.toString(), version.getVersion().replace(".", "_")));
245+
Path cnf = Paths.get(versionDir.toString(), "my.cnf");
246+
// Check if file already exists to avoid FileAlreadyExistsException
247+
if (!Files.exists(cnf)) {
248+
Files.createFile(cnf);
249+
}
250+
StringBuilder mysqlConfBuilder = new StringBuilder();
251+
mysqlConfBuilder.append(
252+
"[mysqld]\n"
253+
+ "binlog_format = row\n"
254+
+ "log_bin = mysql-bin\n"
255+
+ "server-id = 223344\n"
256+
+ "binlog_row_image = FULL\n"
257+
+ "gtid-mode = OFF\n");
258+
259+
if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
260+
mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
261+
}
262+
263+
Files.write(
264+
cnf,
265+
Collections.singleton(mysqlConfBuilder.toString()),
266+
StandardCharsets.UTF_8,
267+
StandardOpenOption.TRUNCATE_EXISTING);
268+
return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
269+
} catch (Exception e) {
270+
throw new RuntimeException("Failed to create my.cnf file.", e);
271+
}
272+
}
273+
}

0 commit comments

Comments
 (0)