Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
public static final String NAME_CONNECTION = "connection";
public static final String DEFAULT_ROW_PREFETCH_VALUE = "40";
public static final String DEFAULT_BATCH_SIZE = "10";
public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";

@Name(NAME_USE_CONNECTION)
@Nullable
Expand All @@ -123,6 +124,14 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
@Nullable
private Integer defaultRowPrefetch;

@Name(TREAT_AS_OLD_TIMESTAMP)
@Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.")
@Macro
@Nullable
@MetadataProperty(key = "hidden", value = "true")
private boolean treatAsOldTimestamp = false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it object type otherwise it will be shown as mandatory



public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName,
String connectionArguments, String connectionType, String database, String role,
int defaultBatchValue, int defaultRowPrefetch,
Expand Down Expand Up @@ -163,6 +172,10 @@ public OracleConnectorConfig getConnection() {
return connection;
}

public boolean shouldTreatAsOldTimestamp() {
return treatAsOldTimestamp;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do like this
return Boolean.TRUE.equals(treatAsOldTimestamp);

}

@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
Expand All @@ -177,6 +190,17 @@ public String getTransactionIsolationLevel() {
@Override
protected void validateField(FailureCollector collector,
Schema.Field field, Schema actualFieldSchema, Schema expectedFieldSchema) {
// For handling backward compatibility with pipelines built prior to plugin version change.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is already there at line 223

// If the config flag 'treatAsOldTimestamp' is enabled, allow DATETIME fields (actual schema)
// to be treated as TIMESTAMP_MICROS (expected schema). This preserves legacy behavior where
// non-UTC timestamp fields were interpreted as TIMESTAMP_MICROS.
if (shouldTreatAsOldTimestamp()) {
if (Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType())
&& Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) {
return;
}
}

// This change is needed to make sure that the pipeline upgrade continues to work post upgrade.
// Since the older handling of the precision less used to convert to the decimal type,
// and the new version would try to convert to the String type. In that case the output schema would
Expand Down