Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
metadata.isSigned(index), true);
}

public Schema getSchema(String typeName, int sqlType, int precision, int scale, String columnName,
boolean isSigned) throws SQLException {
return DBUtils.getSchema(typeName, sqlType, precision, scale, columnName, isSigned, true);
}

@Override
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
protected void setField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
Object o = DBUtils.transformValue(sqlType, sqlPrecision, sqlScale, resultSet, columnIndex);
if (o instanceof Date) {
setFieldValue(recordBuilder, field, o);
}

protected void setFieldValue(StructuredRecord.Builder recordBuilder, Schema.Field field, Object o)
throws SQLException {
if (o == null) {
recordBuilder.set(field.getName(), null);
} else if (o instanceof Date) {
recordBuilder.setDate(field.getName(), ((Date) o).toLocalDate());
} else if (o instanceof Time) {
recordBuilder.setTime(field.getName(), ((Time) o).toLocalTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Struct;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -106,13 +109,122 @@ record = recordBuilder.build();
@Override
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) {
handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
} else {
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
}
}

@Override
protected void setFieldValue(StructuredRecord.Builder recordBuilder, Schema.Field field, Object attrValue)
throws SQLException {
if (attrValue == null) {
recordBuilder.set(field.getName(), null);
return;
}

Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema();
String attrClassName = attrValue.getClass().getName();

// Handle Nested Structs Recursively
if (attrValue instanceof Struct) {
recordBuilder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, null));
return;
}

// Handle Oracle UDTs (Clobs, Blobs, SQLXML, INTERVALS)
if (attrValue instanceof Clob) {
Clob clob = (Clob) attrValue;
recordBuilder.set(field.getName(), clob.getSubString(1, (int) clob.length()));
return;
}

if (attrValue instanceof Blob) {
Blob blob = (Blob) attrValue;
recordBuilder.set(field.getName(), blob.getBytes(1, (int) blob.length()));
return;
}

if (attrValue instanceof java.sql.SQLXML) {
recordBuilder.set(field.getName(), ((java.sql.SQLXML) attrValue).getString());
return;
}

if ("oracle.sql.INTERVALDS".equals(attrClassName) || "oracle.sql.INTERVALYM".equals(attrClassName)) {
recordBuilder.set(field.getName(), attrValue.toString());
return;
}

// Handle Oracle's lazy BigDecimals and downcast them
if (attrValue instanceof BigDecimal) {
BigDecimal bigDecimal = (BigDecimal) attrValue;
if (Schema.LogicalType.DECIMAL.equals(fieldSchema.getLogicalType())) {
recordBuilder.setDecimal(field.getName(), bigDecimal.setScale(fieldSchema.getScale(),
java.math.RoundingMode.HALF_UP));
} else {
switch (fieldSchema.getType()) {
case DOUBLE: recordBuilder.set(field.getName(), bigDecimal.doubleValue()); break;
case FLOAT: recordBuilder.set(field.getName(), bigDecimal.floatValue()); break;
case INT: recordBuilder.set(field.getName(), bigDecimal.intValue()); break;
case LONG: recordBuilder.set(field.getName(), bigDecimal.longValue()); break;
case STRING: recordBuilder.set(field.getName(), bigDecimal.toString()); break;
default: recordBuilder.set(field.getName(), bigDecimal);
}
}
return;
}

// 4. Handle Oracle Timestamps to ensure DATETIME schema compatibility
if (attrValue instanceof Timestamp) {
Timestamp timestamp = (Timestamp) attrValue;
if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
recordBuilder.setDateTime(field.getName(), timestamp.toLocalDateTime());
} else if (Schema.LogicalType.DATE.equals(fieldSchema.getLogicalType())) {
recordBuilder.setDate(field.getName(), timestamp.toLocalDateTime().toLocalDate());
} else if (fieldSchema.getLogicalType() == null && Schema.Type.STRING.equals(fieldSchema.getType())) {
// Only stringify if the CDAP schema strictly demands a String
recordBuilder.set(field.getName(), attrValue.toString());
} else {
// HAND IT BACK TO THE PARENT! This restores the exact behavior of your old build.
super.setFieldValue(recordBuilder, field, attrValue);
}
return;
}

// Handle Timezone shifting
if (attrValue instanceof OffsetDateTime || attrValue instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (attrValue instanceof OffsetDateTime)
? ((OffsetDateTime) attrValue).atZoneSameInstant(ZoneId.of("UTC"))
: ((ZonedDateTime) attrValue).withZoneSameInstant(ZoneId.of("UTC"));

if (fieldSchema.getLogicalType() != null &&
(Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType()) ||
Schema.LogicalType.TIMESTAMP_MILLIS.equals(fieldSchema.getLogicalType()))) {
recordBuilder.setTimestamp(field.getName(), zonedDateTime);
} else if (Schema.Type.LONG.equals(fieldSchema.getType())) {
recordBuilder.set(field.getName(), zonedDateTime.toInstant().toEpochMilli());
} else {
recordBuilder.set(field.getName(), zonedDateTime.toString());
}
return;
}

// Handle BFILE
try {
ClassLoader oracleLoader = attrValue.getClass().getClassLoader();
if (oracleLoader != null && oracleLoader.loadClass("oracle.jdbc.OracleBfile").isInstance(attrValue)) {
recordBuilder.set(field.getName(), getBfileBytes(attrValue, field.getName()));
return;
}
} catch (Exception e) {
// Not a BFile, let it fall through
}

// Parent DBRecord handles the standard types
super.setFieldValue(recordBuilder, field, attrValue);
}

@Override
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
String fieldName, int fieldIndex) throws SQLException {
Expand Down Expand Up @@ -232,11 +344,15 @@ private Object createOracleTimestamp(Connection connection, String timestampStri
*/
private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLException {
Object bfile = resultSet.getObject(columnName);
return getBfileBytes(bfile, columnName);
}

public static byte[] getBfileBytes(Object bfile, String columnName) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this static?

if (bfile == null) {
return null;
}
try {
ClassLoader classLoader = resultSet.getClass().getClassLoader();
ClassLoader classLoader = bfile.getClass().getClassLoader();
Class<?> oracleBfileClass = classLoader.loadClass("oracle.jdbc.OracleBfile");
boolean isFileExist = (boolean) oracleBfileClass.getMethod("fileExists").invoke(bfile);
if (!isFileExist) {
Expand Down Expand Up @@ -341,6 +457,12 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
case OracleSourceSchemaReader.LONG_RAW:
recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex));
break;
case Types.STRUCT:
Struct structValue = (Struct) resultSet.getObject(columnIndex);
if (structValue != null) {
recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, resultSet));
}
break;
case Types.DECIMAL:
case Types.NUMERIC:
// This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER.
Expand Down Expand Up @@ -371,6 +493,21 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
}
}

private StructuredRecord convertStructToRecord(Struct struct, Schema schema, ResultSet resultSet)
throws SQLException {
Object[] attributes = struct.getAttributes();
List<Schema.Field> fields = schema.getFields();
StructuredRecord.Builder builder = StructuredRecord.builder(schema);

for (int index = 0; index < attributes.length; index++) {
Schema.Field field = fields.get(index);
Object attrValue = attributes[index];

setFieldValue(builder, field, attrValue);
}
return builder.build();
}

/**
* Get the scale set in Non-nullable schema associated with the schema
* */
Expand Down
Loading