diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java index efd4d304356..f56fd3a2ba0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.connectors.oracle.source.parser; +import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils; + import io.debezium.ddl.parser.oracle.generated.PlSqlParser; import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener; @@ -76,21 +78,6 @@ public String getColumnName(final PlSqlParser.New_column_nameContext ctx) { * @return parsed table or column name from the supplied name argument */ private static String getTableOrColumnName(String name) { - return removeQuotes(name, true); - } - - /** - * Removes leading and trailing double quote characters from the provided string. - * - * @param text value to have double quotes removed - * @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted - * @return string that has had quotes removed - */ - @SuppressWarnings("SameParameterValue") - private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) { - if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) { - return text.substring(1, text.length() - 1); - } - return upperCaseIfNotQuoted ? text.toUpperCase() : text; + return OracleSchemaUtils.removeQuotes(name); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java index e81c4110cfb..083442b5170 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java @@ -46,6 +46,25 @@ public class OracleSchemaUtils { private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class); + /** + * Removes leading and trailing double quote characters from the provided string. + * + *

Oracle table and column names are inherently stored in upper-case; however, if the objects + * are created using double-quotes, the case of the object name is retained. This method will + * adhere to those rules and will always return the name in upper-case unless the provided name + * is double-quoted in which the returned value will have the double-quotes removed and case + * retained. + * + * @param text value to have double quotes removed + * @return string that has had quotes removed + */ + public static String removeQuotes(String text) { + if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) { + return text.substring(1, text.length() - 1); + } + return text.toUpperCase(); + } + public static List listTables( OracleSourceConfig sourceConfig, @Nullable String dbName) { try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) { @@ -127,16 +146,23 @@ public static Schema toSchema(Table table) { .map(OracleSchemaUtils::toColumn) .collect(Collectors.toList()); + List primaryKeys = + table.primaryKeyColumnNames().stream() + .map(OracleSchemaUtils::removeQuotes) + .collect(Collectors.toList()); + return Schema.newBuilder() .setColumns(columns) - .primaryKey(table.primaryKeyColumnNames()) + .primaryKey(primaryKeys) .comment(table.comment()) .build(); } public static Column toColumn(io.debezium.relational.Column column) { return Column.physicalColumn( - column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment()); + removeQuotes(column.name()), + OracleTypeUtils.fromDbzColumn(column), + column.comment()); } public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { @@ -158,7 +184,7 @@ public static Schema getSchema( DataType dataType = OracleTypeUtils.fromDbzColumn(column); org.apache.flink.cdc.common.schema.Column cdcColumn = org.apache.flink.cdc.common.schema.Column.physicalColumn( - column.name().toLowerCase(Locale.ROOT), dataType); + removeQuotes(column.name()), dataType); list.add(cdcColumn); } return Schema.newBuilder().setColumns(list).primaryKey(pks).build(); @@ -181,7 +207,7 @@ public static List getTablePks( while (rs.next()) { String columnName; columnName = rs.getString(1); - list.add(columnName.toLowerCase(Locale.ROOT)); + list.add(removeQuotes(columnName)); } return list; }); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java index cd90caad827..6f13b7562ef 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java @@ -981,10 +981,10 @@ public void testGeometryType() throws Exception { new CreateTableEvent( TableId.tableId("DEBEZIUM", "MYLAKE"), Schema.newBuilder() - .physicalColumn("feature_id", DataTypes.BIGINT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(32)) - .physicalColumn("shape", DataTypes.STRING()) - .primaryKey(Arrays.asList("feature_id")) + .physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(32)) + .physicalColumn("SHAPE", DataTypes.STRING()) + .primaryKey(Arrays.asList("FEATURE_ID")) .build()); RowType rowType = @@ -1568,11 +1568,11 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { return new CreateTableEvent( tableId, Schema.newBuilder() - .physicalColumn("id", DataTypes.BIGINT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(255).notNull()) - .physicalColumn("description", DataTypes.VARCHAR(512)) - .physicalColumn("weight", DataTypes.FLOAT()) - .primaryKey(Collections.singletonList("id")) + .physicalColumn("ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512)) + .physicalColumn("WEIGHT", DataTypes.FLOAT()) + .primaryKey(Collections.singletonList("ID")) .build()); } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java index 0be4734e79c..e6b5387a224 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java @@ -150,7 +150,7 @@ void testSyncWholeDatabase() throws Exception { Statement stat = conn.createStatement()) { waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception { "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception { waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent(