Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.cdc.connectors.oracle.source.parser;

import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;

import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;

Expand Down Expand Up @@ -76,21 +78,6 @@ public String getColumnName(final PlSqlParser.New_column_nameContext ctx) {
* @return parsed table or column name from the supplied name argument
*/
private static String getTableOrColumnName(String name) {
return removeQuotes(name, true);
}

/**
* Removes leading and trailing double quote characters from the provided string.
*
* @param text value to have double quotes removed
* @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted
* @return string that has had quotes removed
*/
@SuppressWarnings("SameParameterValue")
private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) {
if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
return text.substring(1, text.length() - 1);
}
return upperCaseIfNotQuoted ? text.toUpperCase() : text;
return OracleSchemaUtils.removeQuotes(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ public class OracleSchemaUtils {

private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class);

/**
* Removes leading and trailing double quote characters from the provided string.
*
* <p>Oracle table and column names are inherently stored in upper-case; however, if the objects
* are created using double-quotes, the case of the object name is retained. This method will
* adhere to those rules and will always return the name in upper-case unless the provided name
* is double-quoted in which the returned value will have the double-quotes removed and case
* retained.
*
* @param text value to have double quotes removed
* @return string that has had quotes removed
*/
public static String removeQuotes(String text) {
if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
return text.substring(1, text.length() - 1);
}
return text.toUpperCase();
}

public static List<TableId> listTables(
OracleSourceConfig sourceConfig, @Nullable String dbName) {
try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) {
Expand Down Expand Up @@ -127,16 +146,23 @@ public static Schema toSchema(Table table) {
.map(OracleSchemaUtils::toColumn)
.collect(Collectors.toList());

List<String> primaryKeys =
table.primaryKeyColumnNames().stream()
.map(OracleSchemaUtils::removeQuotes)
.collect(Collectors.toList());

return Schema.newBuilder()
.setColumns(columns)
.primaryKey(table.primaryKeyColumnNames())
.primaryKey(primaryKeys)
.comment(table.comment())
.build();
}

public static Column toColumn(io.debezium.relational.Column column) {
return Column.physicalColumn(
column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment());
removeQuotes(column.name()),
OracleTypeUtils.fromDbzColumn(column),
column.comment());
}

public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
Expand All @@ -158,7 +184,7 @@ public static Schema getSchema(
DataType dataType = OracleTypeUtils.fromDbzColumn(column);
org.apache.flink.cdc.common.schema.Column cdcColumn =
org.apache.flink.cdc.common.schema.Column.physicalColumn(
column.name().toLowerCase(Locale.ROOT), dataType);
removeQuotes(column.name()), dataType);
list.add(cdcColumn);
}
return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
Expand All @@ -181,7 +207,7 @@ public static List<String> getTablePks(
while (rs.next()) {
String columnName;
columnName = rs.getString(1);
list.add(columnName.toLowerCase(Locale.ROOT));
list.add(removeQuotes(columnName));
}
return list;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,10 +981,10 @@ public void testGeometryType() throws Exception {
new CreateTableEvent(
TableId.tableId("DEBEZIUM", "MYLAKE"),
Schema.newBuilder()
.physicalColumn("feature_id", DataTypes.BIGINT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(32))
.physicalColumn("shape", DataTypes.STRING())
.primaryKey(Arrays.asList("feature_id"))
.physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull())
.physicalColumn("NAME", DataTypes.VARCHAR(32))
.physicalColumn("SHAPE", DataTypes.STRING())
.primaryKey(Arrays.asList("FEATURE_ID"))
.build());

RowType rowType =
Expand Down Expand Up @@ -1568,11 +1568,11 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
.physicalColumn("description", DataTypes.VARCHAR(512))
.physicalColumn("weight", DataTypes.FLOAT())
.primaryKey(Collections.singletonList("id"))
.physicalColumn("ID", DataTypes.BIGINT().notNull())
.physicalColumn("NAME", DataTypes.VARCHAR(255).notNull())
.physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512))
.physicalColumn("WEIGHT", DataTypes.FLOAT())
.primaryKey(Collections.singletonList("ID"))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ void testSyncWholeDatabase() throws Exception {
Statement stat = conn.createStatement()) {

waitUntilSpecificEvent(
"CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}");
"CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
waitUntilSpecificEvent(
Expand All @@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception {
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");

waitUntilSpecificEvent(
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
waitUntilSpecificEvent(
Expand Down Expand Up @@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception {
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}");
waitUntilSpecificEvent(
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
waitUntilSpecificEvent(
Expand Down
Loading