Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema(

/** Fetch current binlog offsets in MySql Server. */
public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
final String showMasterStmt = "SHOW MASTER STATUS";
final String query = getBinlogStatusQuery(jdbc);
try {
return jdbc.queryAndMap(
showMasterStmt,
query,
rs -> {
if (rs.next()) {
final String binlogFilename = rs.getString(1);
Expand All @@ -129,19 +129,46 @@ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
} else {
throw new FlinkRuntimeException(
"Cannot read the binlog filename and position via '"
+ showMasterStmt
+ query
+ "'. Make sure your server is correctly configured");
}
});
} catch (SQLException e) {
throw new FlinkRuntimeException(
"Cannot read the binlog filename and position via '"
+ showMasterStmt
+ query
+ "'. Make sure your server is correctly configured",
e);
}
}

/**
* Returns the appropriate binlog status query based on MySQL version. MySQL 8.4+ uses SHOW
* BINARY LOG STATUS; MySQL 8.0 and below use SHOW MASTER STATUS.
*/
private static String getBinlogStatusQuery(JdbcConnection jdbc) {
try {
String version =
jdbc.queryAndMap(
"SELECT VERSION()",
rs -> {
if (rs.next()) {
return rs.getString(1);
}
return "";
});
if (version != null
&& (version.startsWith("8.4")
|| version.startsWith("8.5")
|| version.startsWith("9."))) {
return "SHOW BINARY LOG STATUS";
}
} catch (Exception e) {
// Ignore exception, use default command
}
return "SHOW MASTER STATUS";
}

/** Create a TableFilter by database name and table name. */
public static Tables.TableFilter createTableFilter(String database, String table) {
final Selectors.TableSelectionPredicateBuilder eligibleTables =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ public boolean isGtidModeEnabled() {
*/
public String knownGtidSet() {
try {
String query = getBinlogStatusQuery();
return queryAndMap(
"SHOW MASTER STATUS",
query,
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return rs.getString(
Expand All @@ -288,6 +289,33 @@ public String knownGtidSet() {
}
}

/**
* Returns the appropriate binlog status query based on MySQL version. MySQL 8.4+ uses SHOW
* BINARY LOG STATUS; MySQL 8.0 and below use SHOW MASTER STATUS.
*/
private String getBinlogStatusQuery() {
try {
String version =
queryAndMap(
"SELECT VERSION()",
rs -> {
if (rs.next()) {
return rs.getString(1);
}
return "";
});
if (version != null
&& (version.startsWith("8.4")
|| version.startsWith("8.5")
|| version.startsWith("9."))) {
return "SHOW BINARY LOG STATUS";
}
} catch (Exception e) {
// Ignore exception, use default command
}
return "SHOW MASTER STATUS";
}

/**
* Determine the difference between two sets.
*
Expand Down