Skip to content

Commit b47b0cd

Browse files
author
Chengbing Liu
committed
[FLINK-39197][jdbc&mysql] Fix NPE when finding chunk end
1 parent 570faa7 commit b47b0cd

4 files changed

Lines changed: 167 additions & 2 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
1919

2020
import org.apache.flink.cdc.common.annotation.Experimental;
21+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2122
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
2223
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
2324
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
@@ -507,7 +508,8 @@ private List<ChunkRange> splitUnevenlySizedChunks(
507508
return splits;
508509
}
509510

510-
private Object nextChunkEnd(
511+
@VisibleForTesting
512+
Object nextChunkEnd(
511513
JdbcConnection jdbc,
512514
Object previousChunkEnd,
513515
TableId tableId,
@@ -518,6 +520,9 @@ private Object nextChunkEnd(
518520
// chunk end might be null when max values are removed
519521
Object chunkEnd =
520522
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
523+
if (chunkEnd == null) {
524+
return null;
525+
}
521526
if (Objects.equals(previousChunkEnd, chunkEnd)) {
522527
// we don't allow equal chunk start and end,
523528
// should query the next one larger than chunkEnd
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
19+
20+
import org.apache.flink.table.api.DataTypes;
21+
22+
import io.debezium.jdbc.JdbcConnection;
23+
import io.debezium.relational.Column;
24+
import io.debezium.relational.TableId;
25+
import org.assertj.core.api.Assertions;
26+
import org.junit.jupiter.api.Test;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.sql.SQLException;
31+
32+
/** Tests for {@link JdbcSourceChunkSplitter}. */
33+
class JdbcSourceChunkSplitterTest {
34+
35+
/**
36+
* Guards against regressions where {@code queryNextChunkMax} may return {@code null} (e.g. when
37+
* the max row was removed after MIN/MAX was determined). {@code nextChunkEnd} must handle this
38+
* gracefully and return null without throwing exceptions.
39+
*/
40+
@Test
41+
void testNextChunkEndReturnsNullWhenMaxRowRemoved() throws Exception {
42+
// given a splitter whose queryNextChunkMax always returns null
43+
JdbcSourceChunkSplitter splitter = new TestingJdbcSourceChunkSplitter(null);
44+
45+
TableId tableId = TableId.parse("catalog.db.table");
46+
Column splitColumn =
47+
Column.editor().name("id").type("INT").jdbcType(java.sql.Types.INTEGER).create();
48+
49+
Object previousChunkEnd = 10;
50+
Object max = 100;
51+
int chunkSize = 5;
52+
53+
// when queryNextChunkMax returns null, nextChunkEnd should also return null
54+
Object result =
55+
splitter.nextChunkEnd(null, previousChunkEnd, tableId, splitColumn, max, chunkSize);
56+
57+
Assertions.assertThat(result).isNull();
58+
}
59+
60+
/** Minimal testing implementation that stubs out JDBC interactions. */
61+
private static class TestingJdbcSourceChunkSplitter extends JdbcSourceChunkSplitter {
62+
63+
@Nullable private final Object nextChunkMaxResult;
64+
65+
TestingJdbcSourceChunkSplitter(@Nullable Object nextChunkMaxResult) {
66+
super(null, null, null, null, null);
67+
this.nextChunkMaxResult = nextChunkMaxResult;
68+
}
69+
70+
@Override
71+
protected Object queryNextChunkMax(
72+
JdbcConnection jdbc,
73+
TableId tableId,
74+
Column splitColumn,
75+
int chunkSize,
76+
Object includedLowerBound)
77+
throws SQLException {
78+
return nextChunkMaxResult;
79+
}
80+
81+
@Override
82+
protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
83+
throws SQLException {
84+
return 0L;
85+
}
86+
87+
@Override
88+
protected Object queryMin(
89+
JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound)
90+
throws SQLException {
91+
return null;
92+
}
93+
94+
@Override
95+
protected org.apache.flink.table.types.DataType fromDbzColumn(Column splitColumn) {
96+
// The concrete type is irrelevant for this test; just return a simple numeric type.
97+
return DataTypes.BIGINT();
98+
}
99+
}
100+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ public List<ChunkRange> splitEvenlySizedChunks(
322322
return splits;
323323
}
324324

325-
private Object nextChunkEnd(
325+
@VisibleForTesting
326+
Object nextChunkEnd(
326327
JdbcConnection jdbc,
327328
Object previousChunkEnd,
328329
TableId tableId,
@@ -334,6 +335,9 @@ private Object nextChunkEnd(
334335
Object chunkEnd =
335336
StatementUtils.queryNextChunkMax(
336337
jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
338+
if (chunkEnd == null) {
339+
return null;
340+
}
337341
if (Objects.equals(previousChunkEnd, chunkEnd)) {
338342
// we don't allow equal chunk start and end,
339343
// should query the next one larger than chunkEnd

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@
2121
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
2222
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
2323

24+
import io.debezium.config.Configuration;
25+
import io.debezium.jdbc.JdbcConfiguration;
26+
import io.debezium.jdbc.JdbcConnection;
2427
import io.debezium.relational.TableId;
2528
import org.assertj.core.api.Assertions;
2629
import org.junit.jupiter.api.Test;
2730

31+
import java.sql.SQLException;
2832
import java.time.ZoneId;
33+
import java.util.Collections;
2934
import java.util.List;
3035

3136
/** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter}. */
@@ -85,4 +90,55 @@ void testSplitEvenlySizedChunksNormal() {
8590
ChunkRange.of(2147483637, 2147483647),
8691
ChunkRange.of(2147483647, null));
8792
}
93+
94+
@Test
95+
void testNextChunkEndReturnsNullWhenMaxRowRemoved() throws Exception {
96+
// given a MySqlChunkSplitter instance
97+
MySqlSourceConfig sourceConfig =
98+
new MySqlSourceConfigFactory()
99+
.startupOptions(StartupOptions.initial())
100+
.databaseList("")
101+
.tableList("")
102+
.hostname("")
103+
.username("")
104+
.password("")
105+
.serverTimeZone(ZoneId.of("UTC").toString())
106+
.assignUnboundedChunkFirst(false)
107+
.createConfig(0);
108+
MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, sourceConfig);
109+
110+
// and a JdbcConnection whose prepareQueryAndMap always returns null,
111+
// so that StatementUtils.queryNextChunkMax(... ) returns null
112+
JdbcConfiguration jdbcConfiguration =
113+
JdbcConfiguration.adapt(Configuration.from(Collections.emptyMap()));
114+
JdbcConnection jdbc =
115+
new JdbcConnection(
116+
jdbcConfiguration,
117+
config -> {
118+
throw new SQLException("Connection not used in test");
119+
},
120+
"`",
121+
"`") {
122+
@Override
123+
public <T> T prepareQueryAndMap(
124+
String query,
125+
StatementPreparer statementPreparer,
126+
ResultSetMapper<T> mapper)
127+
throws SQLException {
128+
return null;
129+
}
130+
};
131+
132+
TableId tableId = new TableId("catalog", "db", "tab");
133+
Object previousChunkEnd = 10;
134+
Object max = 100;
135+
int chunkSize = 5;
136+
137+
Object result =
138+
splitter.nextChunkEnd(jdbc, previousChunkEnd, tableId, "id", max, chunkSize);
139+
140+
// when queryNextChunkMax returns null, nextChunkEnd should also return null
141+
// instead of propagating the null further and causing errors
142+
Assertions.assertThat(result).isNull();
143+
}
88144
}

0 commit comments

Comments (0)