Skip to content

Commit d957e4f

Browse files
committed
[FLINK-36590] CDC Supports MySQL 8.x and above
Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
1 parent f24399c commit d957e4f

32 files changed

Lines changed: 895 additions & 578 deletions

File tree

.github/workflows/flink_cdc.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ jobs:
6161
strategy:
6262
matrix:
6363
java-version: [ '8' ]
64-
module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
64+
module: [ 'core', 'pipeline_connectors', 'mysql57', 'mysql80', 'mysql84', 'mysql90', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
6565
name: Unit Tests
6666
uses: ./.github/workflows/flink_cdc_base.yml
6767
with:

.github/workflows/flink_cdc_base.yml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,16 @@ jobs:
147147
("pipeline_connectors")
148148
modules=${{ env.MODULES_PIPELINE_CONNECTORS }}
149149
;;
150-
("mysql")
150+
("mysql57")
151+
modules=${{ env.MODULES_MYSQL }}
152+
;;
153+
("mysql80")
154+
modules=${{ env.MODULES_MYSQL }}
155+
;;
156+
("mysql84")
157+
modules=${{ env.MODULES_MYSQL }}
158+
;;
159+
("mysql90")
151160
modules=${{ env.MODULES_MYSQL }}
152161
;;
153162
("postgres")
@@ -199,6 +208,16 @@ jobs:
199208
build_maven_parameter="-DspecifiedMongoVersion=7.0.12"
200209
fi
201210
211+
if [ ${{ inputs.module }} == "mysql57" ]; then
212+
build_maven_parameter="-DspecifiedMySqlVersion=5.7"
213+
elif [ ${{ inputs.module }} == "mysql80" ]; then
214+
build_maven_parameter="-DspecifiedMySqlVersion=8.0"
215+
elif [ ${{ inputs.module }} == "mysql84" ]; then
216+
build_maven_parameter="-DspecifiedMySqlVersion=8.4"
217+
elif [ ${{ inputs.module }} == "mysql90" ]; then
218+
build_maven_parameter="-DspecifiedMySqlVersion=9.0"
219+
fi
220+
202221
if [ ! -z "${{ inputs.flink-version }}" ]; then
203222
build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }-DspecifiedFlinkVersion=${{ inputs.flink-version }}"
204223
fi

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java

Lines changed: 11 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -61,36 +61,30 @@
6161
/** IT case for MySQL event source. */
6262
public class MySqlFullTypesITCase extends MySqlSourceTestBase {
6363

64-
private static final MySqlContainer MYSQL8_CONTAINER =
65-
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
64+
private static final MySqlContainer MYSQL_CONTAINER =
65+
createMySqlContainer(MySqlVersion.AD_HOC, "docker/server-gtids/expire-seconds/my.cnf");
6666

67-
private final UniqueDatabase fullTypesMySql57Database =
67+
private final UniqueDatabase fullTypesMySqlDatabase =
6868
new UniqueDatabase(
6969
MYSQL_CONTAINER,
7070
"column_type_test",
7171
MySqSourceTestUtils.TEST_USER,
7272
MySqSourceTestUtils.TEST_PASSWORD);
73-
private final UniqueDatabase fullTypesMySql8Database =
74-
new UniqueDatabase(
75-
MYSQL8_CONTAINER,
76-
"column_type_test_mysql8",
77-
MySqSourceTestUtils.TEST_USER,
78-
MySqSourceTestUtils.TEST_PASSWORD);
7973

8074
private final StreamExecutionEnvironment env =
8175
StreamExecutionEnvironment.getExecutionEnvironment();
8276

8377
@BeforeClass
8478
public static void beforeClass() {
8579
LOG.info("Starting MySql8 containers...");
86-
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
80+
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
8781
LOG.info("Container MySql8 is started.");
8882
}
8983

9084
@AfterClass
9185
public static void afterClass() {
9286
LOG.info("Stopping MySql8 containers...");
93-
MYSQL8_CONTAINER.stop();
87+
MYSQL_CONTAINER.stop();
9488
LOG.info("Container MySql8 is stopped.");
9589
}
9690

@@ -102,70 +96,12 @@ public void before() {
10296
}
10397

10498
@Test
105-
public void testMysql57CommonDataTypes() throws Throwable {
106-
testCommonDataTypes(fullTypesMySql57Database);
107-
}
108-
109-
@Test
110-
public void testMySql8CommonDataTypes() throws Throwable {
111-
testCommonDataTypes(fullTypesMySql8Database);
112-
}
113-
114-
@Test
115-
public void testMysql57TimeDataTypes() throws Throwable {
116-
RowType recordType =
117-
RowType.of(
118-
DataTypes.DECIMAL(20, 0).notNull(),
119-
DataTypes.INT(),
120-
DataTypes.DATE(),
121-
DataTypes.TIME(0),
122-
DataTypes.TIME(3),
123-
DataTypes.TIME(6),
124-
DataTypes.TIMESTAMP(0),
125-
DataTypes.TIMESTAMP(3),
126-
DataTypes.TIMESTAMP(6),
127-
DataTypes.TIMESTAMP_LTZ(0),
128-
DataTypes.TIMESTAMP_LTZ(0));
129-
130-
Object[] expectedSnapshot =
131-
new Object[] {
132-
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
133-
2021,
134-
18460,
135-
64822000,
136-
64822123,
137-
// TIME(6) will lose precision for microseconds.
138-
// Because Flink's BinaryWriter force write int value for TIME(6).
139-
// See BinaryWriter#write for detail.
140-
64822123,
141-
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
142-
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
143-
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
144-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
145-
null
146-
};
147-
148-
Object[] expectedStreamRecord =
149-
new Object[] {
150-
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
151-
2021,
152-
18460,
153-
64822000,
154-
64822123,
155-
null,
156-
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
157-
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
158-
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
159-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
160-
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
161-
};
162-
163-
testTimeDataTypes(
164-
fullTypesMySql57Database, recordType, expectedSnapshot, expectedStreamRecord);
99+
public void testMysqlCommonDataTypes() throws Throwable {
100+
testCommonDataTypes(fullTypesMySqlDatabase);
165101
}
166102

167103
@Test
168-
public void testMysql8TimeDataTypes() throws Throwable {
104+
public void testMysqlTimeDataTypes() throws Throwable {
169105
RowType recordType =
170106
RowType.of(
171107
DataTypes.DECIMAL(20, 0).notNull(),
@@ -178,8 +114,6 @@ public void testMysql8TimeDataTypes() throws Throwable {
178114
DataTypes.TIMESTAMP(3),
179115
DataTypes.TIMESTAMP(6),
180116
DataTypes.TIMESTAMP_LTZ(0),
181-
DataTypes.TIMESTAMP_LTZ(3),
182-
DataTypes.TIMESTAMP_LTZ(6),
183117
DataTypes.TIMESTAMP_LTZ(0));
184118

185119
Object[] expectedSnapshot =
@@ -197,8 +131,6 @@ public void testMysql8TimeDataTypes() throws Throwable {
197131
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
198132
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
199133
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
200-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
201-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
202134
null
203135
};
204136

@@ -214,23 +146,16 @@ public void testMysql8TimeDataTypes() throws Throwable {
214146
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
215147
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
216148
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
217-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
218-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
219149
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
220150
};
221151

222152
testTimeDataTypes(
223-
fullTypesMySql8Database, recordType, expectedSnapshot, expectedStreamRecord);
224-
}
225-
226-
@Test
227-
public void testMysql57PrecisionTypes() throws Throwable {
228-
testMysqlPrecisionTypes(fullTypesMySql57Database);
153+
fullTypesMySqlDatabase, recordType, expectedSnapshot, expectedStreamRecord);
229154
}
230155

231156
@Test
232-
public void testMysql8PrecisionTypes() throws Throwable {
233-
testMysqlPrecisionTypes(fullTypesMySql8Database);
157+
public void testMysqlPrecisionTypes() throws Throwable {
158+
testMysqlPrecisionTypes(fullTypesMySqlDatabase);
234159
}
235160

236161
public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java

Lines changed: 3 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -27,63 +27,33 @@
2727
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
2828
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
2929
import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
30-
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
31-
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
3230
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
3331
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3432

35-
import org.junit.AfterClass;
3633
import org.junit.Before;
37-
import org.junit.BeforeClass;
3834
import org.junit.Test;
39-
import org.testcontainers.lifecycle.Startables;
4035

4136
import java.time.ZoneId;
4237
import java.util.Arrays;
4338
import java.util.List;
4439
import java.util.stream.Collectors;
45-
import java.util.stream.Stream;
4640

4741
import static org.assertj.core.api.Assertions.assertThat;
4842
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4943

5044
/** IT cases for {@link MySqlMetadataAccessor}. */
5145
public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
5246

53-
private static final MySqlContainer MYSQL8_CONTAINER =
54-
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
55-
5647
private final UniqueDatabase fullTypesMySql57Database =
5748
new UniqueDatabase(
5849
MYSQL_CONTAINER,
5950
"column_type_test",
6051
MySqSourceTestUtils.TEST_USER,
6152
MySqSourceTestUtils.TEST_PASSWORD);
6253

63-
private final UniqueDatabase fullTypesMySql8Database =
64-
new UniqueDatabase(
65-
MYSQL8_CONTAINER,
66-
"column_type_test_mysql8",
67-
MySqSourceTestUtils.TEST_USER,
68-
MySqSourceTestUtils.TEST_PASSWORD);
69-
7054
private final StreamExecutionEnvironment env =
7155
StreamExecutionEnvironment.getExecutionEnvironment();
7256

73-
@BeforeClass
74-
public static void beforeClass() {
75-
LOG.info("Starting MySql8 containers...");
76-
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
77-
LOG.info("Container MySql8 is started.");
78-
}
79-
80-
@AfterClass
81-
public static void afterClass() {
82-
LOG.info("Stopping MySql8 containers...");
83-
MYSQL8_CONTAINER.stop();
84-
LOG.info("Container MySql8 is stopped.");
85-
}
86-
8757
@Before
8858
public void before() {
8959
env.setParallelism(DEFAULT_PARALLELISM);
@@ -92,27 +62,17 @@ public void before() {
9262
}
9363

9464
@Test
95-
public void testMysql57AccessDatabaseAndTable() {
65+
public void testMysqlAccessDatabaseAndTable() {
9666
testAccessDatabaseAndTable(fullTypesMySql57Database);
9767
}
9868

9969
@Test
100-
public void testMysql8AccessDatabaseAndTable() {
101-
testAccessDatabaseAndTable(fullTypesMySql8Database);
102-
}
103-
104-
@Test
105-
public void testMysql57AccessCommonTypesSchema() {
70+
public void testMysqlAccessCommonTypesSchema() {
10671
testAccessCommonTypesSchema(fullTypesMySql57Database);
10772
}
10873

10974
@Test
110-
public void testMysql8AccessCommonTypesSchema() {
111-
testAccessCommonTypesSchema(fullTypesMySql8Database);
112-
}
113-
114-
@Test
115-
public void testMysql57AccessTimeTypesSchema() {
75+
public void testMysqlAccessTimeTypesSchema() {
11676
fullTypesMySql57Database.createAndInitialize();
11777

11878
String[] tables = new String[] {"time_types"};
@@ -157,56 +117,6 @@ public void testMysql57AccessTimeTypesSchema() {
157117
assertThat(actualSchema).isEqualTo(expectedSchema);
158118
}
159119

160-
@Test
161-
public void testMysql8AccessTimeTypesSchema() {
162-
fullTypesMySql8Database.createAndInitialize();
163-
164-
String[] tables = new String[] {"time_types"};
165-
MySqlMetadataAccessor metadataAccessor =
166-
getMetadataAccessor(tables, fullTypesMySql8Database);
167-
168-
Schema actualSchema =
169-
metadataAccessor.getTableSchema(
170-
TableId.tableId(fullTypesMySql8Database.getDatabaseName(), "time_types"));
171-
Schema expectedSchema =
172-
Schema.newBuilder()
173-
.primaryKey("id")
174-
.fromRowDataType(
175-
RowType.of(
176-
new DataType[] {
177-
DataTypes.DECIMAL(20, 0).notNull(),
178-
DataTypes.INT(),
179-
DataTypes.DATE(),
180-
DataTypes.TIME(0),
181-
DataTypes.TIME(3),
182-
DataTypes.TIME(6),
183-
DataTypes.TIMESTAMP(0),
184-
DataTypes.TIMESTAMP(3),
185-
DataTypes.TIMESTAMP(6),
186-
DataTypes.TIMESTAMP_LTZ(0),
187-
DataTypes.TIMESTAMP_LTZ(3),
188-
DataTypes.TIMESTAMP_LTZ(6),
189-
DataTypes.TIMESTAMP_LTZ(0)
190-
},
191-
new String[] {
192-
"id",
193-
"year_c",
194-
"date_c",
195-
"time_c",
196-
"time_3_c",
197-
"time_6_c",
198-
"datetime_c",
199-
"datetime3_c",
200-
"datetime6_c",
201-
"timestamp_c",
202-
"timestamp3_c",
203-
"timestamp6_c",
204-
"timestamp_def_c"
205-
}))
206-
.build();
207-
assertThat(actualSchema).isEqualTo(expectedSchema);
208-
}
209-
210120
private void testAccessDatabaseAndTable(UniqueDatabase database) {
211121
database.createAndInitialize();
212122

0 commit comments

Comments
 (0)