Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ static void validateCompatibility(DataType fieldType, PojoType.Property prop) {
return;
}

if (typeRoot == DataTypeRoot.ROW) {
// ROW type maps to a nested POJO. The POJO class must be a valid POJO (public class
// with public default constructor). Detailed field-level validation is deferred to
// the nested PojoToRowConverter / RowToPojoConverter.
if (actual.isPrimitive()
|| actual.isArray()
|| Collection.class.isAssignableFrom(actual)
|| Map.class.isAssignableFrom(actual)) {
throw new IllegalArgumentException(
String.format(
"Field '%s' must be a POJO class for ROW type, got %s",
prop.name, actual.getName()));
}
return;
}

Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
if (supported == null) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DecimalType;
import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;

import java.lang.reflect.Array;
import java.time.Instant;
Expand Down Expand Up @@ -169,6 +171,24 @@ private static ElementConverter buildElementConverter(
return (arr, i) ->
new FlussMapToPojoMap(arr.getMap(i), (MapType) elementType, fieldName)
.convertMap();
case ROW:
{
RowType nestedRowType = (RowType) elementType;
int nestedFieldCount = nestedRowType.getFieldCount();
if (pojoType == Object.class) {
// When the target type is unknown (e.g. ROW values in a Map),
// return InternalRow directly
return (arr, i) -> arr.getRow(i, nestedFieldCount);
}
@SuppressWarnings("unchecked")
RowToPojoConverter<Object> nestedConverter =
RowToPojoConverter.of(
(Class<Object>) pojoType, nestedRowType, nestedRowType);
return (arr, i) -> {
InternalRow nestedRow = arr.getRow(i, nestedFieldCount);
return nestedRow == null ? null : nestedConverter.fromRow(nestedRow);
};
}
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
new PojoMapToFlussMap(
(Map<?, ?>) prop.read(obj), (MapType) fieldType, prop.name)
.convertMap();
case ROW:
{
RowType nestedRowType = (RowType) fieldType;
@SuppressWarnings("unchecked")
PojoToRowConverter<Object> nestedConverter =
PojoToRowConverter.of(
(Class<Object>) prop.type, nestedRowType, nestedRowType);
return (obj) -> {
Object nested = prop.read(obj);
return nested == null ? null : nestedConverter.toRow(nested);
};
}
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DecimalType;
import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -260,6 +261,15 @@ private static int truncateNanos(int nanos, int precision) {
case MAP:
return new PojoMapToFlussMap((Map<?, ?>) obj, (MapType) elementType, fieldName)
.convertMap();
case ROW:
{
RowType nestedRowType = (RowType) elementType;
@SuppressWarnings("unchecked")
PojoToRowConverter<Object> nestedConverter =
PojoToRowConverter.of(
(Class<Object>) obj.getClass(), nestedRowType, nestedRowType);
return nestedConverter.toRow(obj);
}
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ private static RowToField createRowReader(DataType fieldType, PojoType.Property
return (row, pos) ->
new FlussMapToPojoMap(row.getMap(pos), (MapType) fieldType, prop.name)
.convertMap();
case ROW:
{
RowType nestedRowType = (RowType) fieldType;
int nestedFieldCount = nestedRowType.getFieldCount();
@SuppressWarnings("unchecked")
RowToPojoConverter<Object> nestedConverter =
RowToPojoConverter.of(
(Class<Object>) prop.type, nestedRowType, nestedRowType);
return (row, pos) -> {
InternalRow nestedRow = row.getRow(pos, nestedFieldCount);
return nestedRow == null ? null : nestedConverter.fromRow(nestedRow);
};
}
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Loading