diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 897bebcf686c..8b38b2d8ee0f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2254,6 +2254,14 @@ public InlineElement getDescription() { "Specifies column names that should be stored as blob type. " + "This is used when you want to treat a BYTES column as a BLOB."); + public static final ConfigOption BLOB_REF_FIELD = + key("blob-ref-field") + .stringType() + .noDefaultValue() + .withDescription( + "Specifies column names that should be stored as blob reference type. " + + "This is used when you want to treat a BYTES column as a BLOB_REF."); + @Immutable public static final ConfigOption BLOB_DESCRIPTOR_FIELD = key("blob-descriptor-field") @@ -2935,7 +2943,13 @@ public Set blobExternalStorageField() { * subset of descriptor fields and therefore are also updatable. 
*/ public Set updatableBlobFields() { - return blobDescriptorField(); + Set fields = new HashSet<>(blobDescriptorField()); + fields.addAll(blobRefField()); + return fields; + } + + public Set blobRefField() { + return parseCommaSeparatedSet(BLOB_REF_FIELD); } /** @@ -3274,6 +3288,15 @@ public static List blobField(Map options) { return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList()); } + public static List blobRefField(Map options) { + String string = options.get(BLOB_REF_FIELD.key()); + if (string == null) { + return Collections.emptyList(); + } + + return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList()); + } + public boolean sequenceFieldSortOrderIsAscending() { return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING; } diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobRefType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobRefType.java new file mode 100644 index 000000000000..e5d53ead4e4a --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobRefType.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.types; + +import org.apache.paimon.annotation.Public; + +/** + * Data type of blob reference. + * + *

{@link BlobRefType} stores reference bytes inline in data files instead of writing payloads to + * Paimon-managed {@code .blob} files. + * + * @since 1.5.0 + */ +@Public +public final class BlobRefType extends DataType { + + private static final long serialVersionUID = 1L; + + private static final String FORMAT = "BLOB_REF"; + + public BlobRefType(boolean isNullable) { + super(isNullable, DataTypeRoot.BLOB_REF); + } + + public BlobRefType() { + this(true); + } + + @Override + public int defaultSize() { + return BlobType.DEFAULT_SIZE; + } + + @Override + public DataType copy(boolean isNullable) { + return new BlobRefType(isNullable); + } + + @Override + public String asSQLString() { + return withNullability(FORMAT); + } + + @Override + public R accept(DataTypeVisitor visitor) { + return visitor.visit(this); + } +} diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java index af680ede62e2..4a819d42ae2c 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeDefaultVisitor.java @@ -119,6 +119,11 @@ public R visit(BlobType blobType) { return defaultMethod(blobType); } + @Override + public R visit(BlobRefType blobRefType) { + return defaultMethod(blobRefType); + } + @Override public R visit(ArrayType arrayType) { return defaultMethod(arrayType); diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java index 4079dd8c47c0..5e2a39a29fcd 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java @@ -331,6 +331,7 @@ private enum Keyword { LEGACY, VARIANT, BLOB, + BLOB_REF, NOT } @@ -549,6 +550,8 @@ private DataType parseTypeByKeyword() { return new VariantType(); 
case BLOB: return new BlobType(); + case BLOB_REF: + return new BlobRefType(); case VECTOR: return parseVectorType(); default: diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java index f55da9c4706f..27f8d65a40bf 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeRoot.java @@ -104,6 +104,8 @@ public enum DataTypeRoot { BLOB(DataTypeFamily.PREDEFINED), + BLOB_REF(DataTypeFamily.PREDEFINED), + ARRAY(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), VECTOR(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION), diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java index 6e377309f237..074a1d82ec70 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeVisitor.java @@ -66,6 +66,8 @@ public interface DataTypeVisitor { R visit(BlobType blobType); + R visit(BlobRefType blobRefType); + R visit(ArrayType arrayType); R visit(VectorType vectorType); diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java index 39b180651ef5..0033984bc6cc 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypes.java @@ -163,6 +163,10 @@ public static BlobType BLOB() { return new BlobType(); } + public static BlobRefType BLOB_REF() { + return new BlobRefType(); + } + public static OptionalInt getPrecision(DataType dataType) { return dataType.accept(PRECISION_EXTRACTOR); } diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java 
index 33defc8f9a01..37b36a24d154 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java @@ -21,6 +21,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -163,6 +164,11 @@ public FieldType visit(BlobType blobType) { throw new UnsupportedOperationException(); } + @Override + public FieldType visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException(); + } + private TimeUnit getTimeUnit(int precision) { if (precision == 0) { return TimeUnit.SECOND; diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java index e1fe66883a84..d8672dfdc23b 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java @@ -47,6 +47,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -447,6 +448,11 @@ public Arrow2PaimonVectorConverter visit(BlobType blobType) { throw new UnsupportedOperationException(); } + @Override + public Arrow2PaimonVectorConverter visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException(); + } + @Override public Arrow2PaimonVectorConverter visit(ArrayType arrayType) { final Arrow2PaimonVectorConverter arrowVectorConvertor = 
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java index ccff6d6a24f6..b4d38e2dae61 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java @@ -21,6 +21,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -156,6 +157,11 @@ public ArrowFieldWriterFactory visit(BlobType blobType) { throw new UnsupportedOperationException("Doesn't support BlobType."); } + @Override + public ArrowFieldWriterFactory visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException("Doesn't support BlobRefType."); + } + @Override public ArrowFieldWriterFactory visit(ArrayType arrayType) { ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index 2e0cd5701b71..b22336e31b8d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -157,6 +157,10 @@ static void write( case BLOB: writer.writeBlob(pos, (Blob) o); break; + case BLOB_REF: + byte[] refBytes = BlobUtils.serializeBlobReference((Blob) o); + writer.writeBinary(pos, refBytes, 0, refBytes.length); + break; default: throw new UnsupportedOperationException("Not support type: " + type); } @@ -241,6 +245,11 @@ static ValueSetter createValueSetter(DataType elementType, Serializer seriali return 
(writer, pos, value) -> writer.writeVariant(pos, (Variant) value); case BLOB: return (writer, pos, value) -> writer.writeBlob(pos, (Blob) value); + case BLOB_REF: + return (writer, pos, value) -> { + byte[] bytes = BlobUtils.serializeBlobReference((Blob) value); + writer.writeBinary(pos, bytes, 0, bytes.length); + }; default: String msg = String.format( diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java index 6586124e466b..2ab095d23a9e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java @@ -65,6 +65,10 @@ static Blob fromDescriptor(UriReader reader, BlobDescriptor descriptor) { return new BlobRef(reader, descriptor); } + static Blob fromReference(BlobReference reference) { + return new UnresolvedBlob(reference); + } + static Blob fromInputStream(Supplier supplier) { return new BlobStream(supplier); } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobReference.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobReference.java new file mode 100644 index 000000000000..d4a7926bd514 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobReference.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Objects; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Serialized metadata for a {@code BLOB_REF} field. + * + *

A blob reference only stores the coordinates needed to locate the original blob value in the + * upstream table: {@code tableName}, {@code fieldId} and {@code rowId}. The actual blob data is + * resolved at read time by scanning the upstream table. + * + *

Serialization layout (Little Endian): + * + *

+ * | Offset       | Field         | Type    | Size |
+ * |--------------|---------------|---------|------|
+ * | 0            | version       | byte    | 1    |
+ * | 1            | magicNumber   | long    | 8    |
+ * | 9            | tableNameLen  | int     | 4    |
+ * | 13           | tableNameBytes| byte[N] | N    |
+ * | 13 + N       | fieldId       | int     | 4    |
+ * | 17 + N       | rowId         | long    | 8    |
+ * 
+ */ +public class BlobReference implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final long MAGIC = 0x424C4F4252454631L; // "BLOBREF1" + private static final byte CURRENT_VERSION = 1; + + private final String tableName; + private final int fieldId; + private final long rowId; + + public BlobReference(String tableName, int fieldId, long rowId) { + this.tableName = tableName; + this.fieldId = fieldId; + this.rowId = rowId; + } + + public String tableName() { + return tableName; + } + + public int fieldId() { + return fieldId; + } + + public long rowId() { + return rowId; + } + + public byte[] serialize() { + byte[] tableBytes = tableName.getBytes(UTF_8); + + int totalSize = 1 + 8 + 4 + tableBytes.length + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN); + buffer.put(CURRENT_VERSION); + buffer.putLong(MAGIC); + buffer.putInt(tableBytes.length); + buffer.put(tableBytes); + buffer.putInt(fieldId); + buffer.putLong(rowId); + return buffer.array(); + } + + public static BlobReference deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + byte version = buffer.get(); + + if (version != CURRENT_VERSION) { + throw new UnsupportedOperationException( + "Expecting BlobReference version to be " + + CURRENT_VERSION + + ", but found " + + version + + "."); + } + + long magic = buffer.getLong(); + if (magic != MAGIC) { + throw new IllegalArgumentException( + "Invalid BlobReference: missing magic header. 
Expected magic: " + + MAGIC + + ", but found: " + + magic); + } + + byte[] tableBytes = new byte[buffer.getInt()]; + buffer.get(tableBytes); + + int fieldId = buffer.getInt(); + long rowId = buffer.getLong(); + return new BlobReference(new String(tableBytes, UTF_8), fieldId, rowId); + } + + public static boolean isBlobReference(byte[] bytes) { + if (bytes == null || bytes.length < 9) { + return false; + } + ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + byte version = buffer.get(); + return version == CURRENT_VERSION && MAGIC == buffer.getLong(); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + BlobReference that = (BlobReference) o; + return fieldId == that.fieldId + && rowId == that.rowId + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, fieldId, rowId); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobReferenceHolder.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobReferenceHolder.java new file mode 100644 index 000000000000..c5863bf5dffe --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobReferenceHolder.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +/** A {@link Blob} that carries a {@link BlobReference} for re-serialization. */ +public interface BlobReferenceHolder { + + BlobReference reference(); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobReferenceResolver.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobReferenceResolver.java new file mode 100644 index 000000000000..9e0d78485542 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobReferenceResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import java.io.Serializable; + +/** Resolves a {@link BlobReference} through fallback metadata. 
*/ +@FunctionalInterface +public interface BlobReferenceResolver extends Serializable { + + Blob resolve(BlobReference reference); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobUtils.java new file mode 100644 index 000000000000..813e9a649d9b --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.utils.UriReader; +import org.apache.paimon.utils.UriReaderFactory; + +import javax.annotation.Nullable; + +/** Utilities for decoding and encoding blob-related bytes. */ +public class BlobUtils { + + public static Blob fromBytes( + byte[] bytes, @Nullable UriReaderFactory uriReaderFactory, @Nullable FileIO fileIO) { + if (bytes == null) { + return null; + } + + if (BlobReference.isBlobReference(bytes)) { + return new UnresolvedBlob(BlobReference.deserialize(bytes)); + } + + if (BlobDescriptor.isBlobDescriptor(bytes)) { + BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes); + UriReader reader = + uriReaderFactory != null + ? 
uriReaderFactory.create(descriptor.uri()) + : UriReader.fromFile(fileIO); + return Blob.fromDescriptor(reader, descriptor); + } + + return new BlobData(bytes); + } + + public static byte[] serializeBlobReference(Blob blob) { + if (blob instanceof BlobReferenceHolder) { + return ((BlobReferenceHolder) blob).reference().serialize(); + } + throw new IllegalArgumentException( + "BLOB_REF fields only accept blobs with a BlobReference, but found " + + blob.getClass().getSimpleName() + + "."); + } + + private BlobUtils() {} +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java index 10aefbafdd07..293f47bf445e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java @@ -194,7 +194,14 @@ public Variant getVariant(int pos) { @Override public Blob getBlob(int pos) { - return (Blob) this.fields[pos]; + Object value = this.fields[pos]; + if (value instanceof Blob) { + return (Blob) value; + } + if (value instanceof byte[]) { + return BlobUtils.fromBytes((byte[]) value, null, null); + } + throw new ClassCastException("Cannot cast " + value.getClass().getName() + " to Blob"); } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java index 3bbb85f49963..f4e9e6960b6b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java @@ -146,6 +146,9 @@ static Class getDataClass(DataType type) { case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return Timestamp.class; + case BLOB: + case BLOB_REF: + return Blob.class; case ARRAY: return InternalArray.class; case MULTISET: @@ -228,6 +231,7 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { fieldGetter = row 
-> row.getVariant(fieldPos); break; case BLOB: + case BLOB_REF: fieldGetter = row -> row.getBlob(fieldPos); break; default: diff --git a/paimon-common/src/main/java/org/apache/paimon/data/UnresolvedBlob.java b/paimon-common/src/main/java/org/apache/paimon/data/UnresolvedBlob.java new file mode 100644 index 000000000000..a9bb9d02d298 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/UnresolvedBlob.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.fs.SeekableInputStream; + +import java.io.Serializable; +import java.util.Objects; + +/** + * An unresolved {@link Blob} that only holds a {@link BlobReference}. It cannot provide data + * directly — callers must resolve it through a {@link BlobReferenceResolver} first. 
+ */ +public class UnresolvedBlob implements Blob, BlobReferenceHolder, Serializable { + + private static final long serialVersionUID = 1L; + + private final BlobReference reference; + + public UnresolvedBlob(BlobReference reference) { + this.reference = reference; + } + + public BlobReference reference() { + return reference; + } + + @Override + public byte[] toData() { + throw new IllegalStateException( + "UnresolvedBlob cannot provide data. Resolve it first via BlobReferenceResolver."); + } + + @Override + public BlobDescriptor toDescriptor() { + throw new IllegalStateException( + "UnresolvedBlob cannot provide descriptor. Resolve it first via BlobReferenceResolver."); + } + + @Override + public SeekableInputStream newInputStream() { + throw new IllegalStateException( + "UnresolvedBlob cannot provide stream. Resolve it first via BlobReferenceResolver."); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + UnresolvedBlob that = (UnresolvedBlob) o; + return Objects.equals(reference, that.reference); + } + + @Override + public int hashCode() { + return Objects.hash(reference); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java index 13d345b1f03f..79a97915efe7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java @@ -20,7 +20,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; -import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.DataSetters; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; @@ -31,7 +31,6 @@ import org.apache.paimon.data.variant.Variant; import org.apache.paimon.fs.FileIO; import 
org.apache.paimon.types.RowKind; -import org.apache.paimon.utils.UriReader; import java.io.Serializable; @@ -162,14 +161,7 @@ public Blob getBlob(int pos) { if (bytes == null) { return null; } - if (fileIO == null) { - throw new IllegalStateException("FileIO is null, cannot read blob data from uri!"); - } - - // Only blob descriptor could be able to stored in columnar format. - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); - UriReader uriReader = UriReader.fromFile(fileIO); - return Blob.fromDescriptor(uriReader, blobDescriptor); + return BlobUtils.fromBytes(bytes, null, fileIO); } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java index de962ad86a39..12b7a567ec65 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/RowToColumnConverter.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -262,6 +263,11 @@ public TypeConverter visit(BlobType blobType) { throw new UnsupportedOperationException(); } + @Override + public TypeConverter visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException(); + } + @Override public TypeConverter visit(ArrayType arrayType) { return createConverter( diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobRefSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobRefSerializer.java new file mode 100644 index 000000000000..91f88aae8e62 --- /dev/null +++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BlobRefSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.serializer; + +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobReference; +import org.apache.paimon.data.BlobUtils; +import org.apache.paimon.data.UnresolvedBlob; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; + +import java.io.IOException; + +/** Type serializer for {@code BLOB_REF}. 
*/ +public class BlobRefSerializer extends SerializerSingleton { + + private static final long serialVersionUID = 1L; + + public static final BlobRefSerializer INSTANCE = new BlobRefSerializer(); + + @Override + public Blob copy(Blob from) { + return from; + } + + @Override + public void serialize(Blob blob, DataOutputView target) throws IOException { + BinarySerializer.INSTANCE.serialize(BlobUtils.serializeBlobReference(blob), target); + } + + @Override + public Blob deserialize(DataInputView source) throws IOException { + byte[] bytes = BinarySerializer.INSTANCE.deserialize(source); + return new UnresolvedBlob(BlobReference.deserialize(bytes)); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java index 6669f347ff27..9d4c9dba1798 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalSerializers.java @@ -92,6 +92,8 @@ private static Serializer createInternal(DataType type) { return VariantSerializer.INSTANCE; case BLOB: return BlobSerializer.INSTANCE; + case BLOB_REF: + return BlobRefSerializer.INSTANCE; default: throw new UnsupportedOperationException( "Unsupported type '" + type + "' to get internal serializer"); diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java index 4183bfbb2bf8..57fcc8665b97 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapTypeVisitor.java @@ -21,6 +21,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; 
import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -176,4 +177,9 @@ public final R visit(VariantType rowType) { public final R visit(BlobType blobType) { throw new UnsupportedOperationException("Does not support type blob"); } + + @Override + public final R visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException("Does not support type blob ref"); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java index 322847f849ab..722ab63bc4f0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/FastHash.java @@ -23,6 +23,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -172,6 +173,11 @@ public FastHash visit(BlobType blobType) { throw new UnsupportedOperationException("Does not support type blob"); } + @Override + public FastHash visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException("Does not support type blob_ref"); + } + @Override public FastHash visit(ArrayType arrayType) { throw new UnsupportedOperationException("Does not support type array"); diff --git a/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java b/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java index 241dc6100379..254204dc2511 100644 --- a/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java @@ -25,6 +25,7 @@ import 
org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -272,6 +273,11 @@ public HProcessFunction visit(BlobType blobType) { throw new RuntimeException("Unsupported type"); } + @Override + public HProcessFunction visit(BlobRefType blobRefType) { + throw new RuntimeException("Unsupported type"); + } + @Override public HProcessFunction visit(ArrayType arrayType) { throw new RuntimeException("Unsupported type"); diff --git a/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java index 1d40fe75e776..f95e767cb5ae 100644 --- a/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java @@ -26,6 +26,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -360,6 +361,11 @@ public ZProcessFunction visit(BlobType blobType) { throw new UnsupportedOperationException("Does not support type blob"); } + @Override + public ZProcessFunction visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException("Does not support type blob_ref"); + } + @Override public ZProcessFunction visit(ArrayType arrayType) { throw new RuntimeException("Unsupported type"); diff --git a/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java b/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java index dbac55a07dde..94c71b6346ec 100644 --- 
a/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/InternalRowToSizeVisitor.java @@ -18,6 +18,7 @@ package org.apache.paimon.types; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.DataGetters; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -229,7 +230,21 @@ public BiFunction visit(BlobType blobType) { if (row.isNullAt(index)) { return NULL_SIZE; } else { - return Math.toIntExact(row.getVariant(index).sizeInBytes()); + return row.getBlob(index).toData().length; + } + }; + } + + @Override + public BiFunction visit(BlobRefType blobRefType) { + return (row, index) -> { + if (row.isNullAt(index)) { + return NULL_SIZE; + } + try { + return row.getBinary(index).length; + } catch (ClassCastException | UnsupportedOperationException e) { + return BlobUtils.serializeBlobReference(row.getBlob(index)).length; } }; } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java index 4cfe35e39851..e2ff28c7bab1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java @@ -331,6 +331,7 @@ public static Object get(DataGetters dataGetters, int pos, DataType fieldType) { case VARIANT: return dataGetters.getVariant(pos); case BLOB: + case BLOB_REF: return dataGetters.getBlob(pos); default: throw new UnsupportedOperationException("Unsupported type: " + fieldType); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java index 99e8fd455c41..6ea9a0a7b52f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java +++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -336,6 +337,11 @@ public ColumnVector visit(BlobType blobType) { throw new UnsupportedOperationException("BlobType is not supported."); } + @Override + public ColumnVector visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException("BlobRefType is not supported."); + } + @Override public ColumnVector visit(ArrayType arrayType) { return new ArrayColumnVector() { diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobReferenceTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobReferenceTest.java new file mode 100644 index 000000000000..7bbb7965ba95 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobReferenceTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.data; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link BlobReference}. */ +public class BlobReferenceTest { + + @Test + public void testSerializeAndDeserialize() { + BlobReference reference = new BlobReference("default.source", 7, 5L); + + BlobReference deserialized = BlobReference.deserialize(reference.serialize()); + + assertThat(deserialized.tableName()).isEqualTo("default.source"); + assertThat(deserialized.fieldId()).isEqualTo(7); + assertThat(deserialized.rowId()).isEqualTo(5L); + } + + @Test + public void testRejectUnexpectedVersion() { + BlobReference reference = new BlobReference("default.source", 7, 5L); + byte[] bytes = reference.serialize(); + bytes[0] = 3; + + assertThatThrownBy(() -> BlobReference.deserialize(bytes)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Expecting BlobReference version to be 1"); + } + + @Test + public void testEquality() { + BlobReference a = new BlobReference("default.source", 7, 5L); + BlobReference b = new BlobReference("default.source", 7, 5L); + BlobReference c = new BlobReference("default.source", 8, 5L); + + assertThat(a).isEqualTo(b); + assertThat(a.hashCode()).isEqualTo(b.hashCode()); + assertThat(a).isNotEqualTo(c); + } + + @Test + public void testIsBlobReference() { + BlobReference reference = new BlobReference("default.source", 7, 5L); + byte[] bytes = reference.serialize(); + + assertThat(BlobReference.isBlobReference(bytes)).isTrue(); + assertThat(BlobReference.isBlobReference(null)).isFalse(); + assertThat(BlobReference.isBlobReference(new byte[] {1, 2, 3})).isFalse(); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/types/InternalRowToSizeVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/types/InternalRowToSizeVisitorTest.java index cfdae649c190..728eb84e56e9 100644 --- 
a/paimon-common/src/test/java/org/apache/paimon/types/InternalRowToSizeVisitorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/InternalRowToSizeVisitorTest.java @@ -36,6 +36,11 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** Test for InternalRowToSizeVisitor. */ public class InternalRowToSizeVisitorTest { @@ -192,4 +197,18 @@ void testCalculatorSize() { Assertions.assertThat(feildSizeCalculator.get(23).apply(row, 23)).isEqualTo(0); } + + @Test + void testBlobRefSizeUsesSerializedReferenceBytes() { + DataGetters row = mock(DataGetters.class); + byte[] referenceBytes = new byte[] {1, 2, 3, 4}; + when(row.isNullAt(0)).thenReturn(false); + when(row.getBinary(0)).thenReturn(referenceBytes); + + int size = new InternalRowToSizeVisitor().visit(DataTypes.BLOB_REF()).apply(row, 0); + + Assertions.assertThat(size).isEqualTo(referenceBytes.length); + verify(row).getBinary(0); + verify(row, never()).getBlob(0); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 7f8715ab0846..652017a85e2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import java.util.function.Supplier; @@ -106,6 +106,10 @@ public DataFileMeta result() throws IOException { ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(); String externalPath = isExternalPath ? 
path.toString() : null; + List extraFiles = new ArrayList<>(); + if (indexResult.independentIndexFile() != null) { + extraFiles.add(indexResult.independentIndexFile()); + } return DataFileMeta.forAppend( path.getName(), fileSize, @@ -114,9 +118,7 @@ public DataFileMeta result() throws IOException { seqNumCounter.getValue() - super.recordCount(), seqNumCounter.getValue() - 1, schemaId, - indexResult.independentIndexFile() == null - ? Collections.emptyList() - : Collections.singletonList(indexResult.independentIndexFile()), + extraFiles, indexResult.embeddedIndexBytes(), fileSource, statsPair.getKey(), diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 271709c47ef5..cab6a28d7fac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -165,6 +165,7 @@ public static void validateTableSchema(TableSchema schema) { FileFormat fileFormat = FileFormat.fromIdentifier(options.formatType(), new Options(schema.options())); RowType tableRowType = new RowType(schema.fields()); + validateNestedBlobRefFields(tableRowType); Set blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options); validateBlobExternalStorageFields(tableRowType, options, blobDescriptorFields); @@ -672,19 +673,22 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) List fields = schema.fields(); List blobNames = fields.stream() - .filter(field -> field.type().is(DataTypeRoot.BLOB)) + .filter( + field -> + field.type().is(DataTypeRoot.BLOB) + || field.type().is(DataTypeRoot.BLOB_REF)) .map(DataField::name) .collect(Collectors.toList()); if (!blobNames.isEmpty()) { checkArgument( options.dataEvolutionEnabled(), - "Data evolution config must enabled for table with BLOB type column."); + "Data evolution config must be enabled for table with BLOB or 
BLOB_REF type column."); checkArgument( fields.size() > blobNames.size(), - "Table with BLOB type column must have other normal columns."); + "Table with BLOB or BLOB_REF type column must have other normal columns."); checkArgument( blobNames.stream().noneMatch(schema.partitionKeys()::contains), - "The BLOB type column can not be part of partition keys."); + "The BLOB or BLOB_REF type column can not be part of partition keys."); } FileFormat vectorFileFormat = vectorFileFormat(options); @@ -702,6 +706,49 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) } } + private static void validateNestedBlobRefFields(RowType rowType) { + for (DataField field : rowType.getFields()) { + checkArgument( + !containsNestedBlobRef(field.type()), + "Nested BLOB_REF type is not supported. Field '%s' contains a nested BLOB_REF.", + field.name()); + } + } + + private static boolean containsNestedBlobRef(DataType dataType) { + switch (dataType.getTypeRoot()) { + case ARRAY: + DataType arrayElementType = ((ArrayType) dataType).getElementType(); + return arrayElementType.is(DataTypeRoot.BLOB_REF) + || containsNestedBlobRef(arrayElementType); + case MULTISET: + DataType multisetElementType = ((MultisetType) dataType).getElementType(); + return multisetElementType.is(DataTypeRoot.BLOB_REF) + || containsNestedBlobRef(multisetElementType); + case MAP: + MapType mapType = (MapType) dataType; + return mapType.getKeyType().is(DataTypeRoot.BLOB_REF) + || containsNestedBlobRef(mapType.getKeyType()) + || mapType.getValueType().is(DataTypeRoot.BLOB_REF) + || containsNestedBlobRef(mapType.getValueType()); + case ROW: + for (DataField field : ((RowType) dataType).getFields()) { + if (field.type().is(DataTypeRoot.BLOB_REF) + || containsNestedBlobRef(field.type())) { + return true; + } + } + return false; + case VECTOR: + DataType vectorElementType = + ((org.apache.paimon.types.VectorType) dataType).getElementType(); + return vectorElementType.is(DataTypeRoot.BLOB_REF) + 
|| containsNestedBlobRef(vectorElementType); + default: + return false; + } + } + private static Set validateBlobDescriptorFields(RowType rowType, CoreOptions options) { Set blobFieldNames = rowType.getFields().stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 327810b881bc..8cb70824cd45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -123,7 +123,8 @@ public InnerTableRead newRead() { new AppendTableRawFileSplitReadProvider( () -> store().newRead(), config)); } - return new AppendTableRead(providerFactories, schema()); + return new AppendTableRead( + providerFactories, schema(), catalogEnvironment().catalogContext()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index a2fee49bfb88..411bcf6767de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -147,7 +147,10 @@ protected BiConsumer nonPartitionFilterConsumer() { @Override public InnerTableRead newRead() { return new KeyValueTableRead( - () -> store().newRead(), () -> store().newBatchRawFileRead(), schema()); + () -> store().newRead(), + () -> store().newBatchRawFileRead(), + schema(), + catalogEnvironment().catalogContext()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java index ca5af88f40bc..cdf76db4f737 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java @@ -18,20 +18,30 @@ package org.apache.paimon.table.source; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.TableQueryAuthResult; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobReference; +import org.apache.paimon.data.BlobReferenceResolver; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.UnresolvedBlob; import org.apache.paimon.disk.IOManager; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateProjectionConverter; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BlobReferenceLookup; import org.apache.paimon.utils.ListUtils; import org.apache.paimon.utils.ProjectedRow; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -45,9 +55,11 @@ public abstract class AbstractDataTableRead implements InnerTableRead { private boolean executeFilter = false; private Predicate predicate; private final TableSchema schema; + private final CatalogContext catalogContext; - public AbstractDataTableRead(TableSchema schema) { + public AbstractDataTableRead(TableSchema schema, CatalogContext catalogContext) { this.schema = schema; + this.catalogContext = catalogContext; } public abstract void applyReadType(RowType readType); @@ -96,6 +108,20 @@ public final RecordReader createReader(Split split) throws IOExcept split = authSplit.split(); authResult = authSplit.authResult(); } + + // Check if this split has BLOB_REF fields that need resolving + if (catalogContext != null) { + RowType rowType = this.readType == null ? 
schema.logicalRowType() : this.readType; + int[] blobRefFields = + rowType.getFields().stream() + .filter(field -> field.type().is(DataTypeRoot.BLOB_REF)) + .mapToInt(field -> rowType.getFieldIndex(field.name())) + .toArray(); + if (blobRefFields.length > 0) { + return createBlobRefReader(split, authResult, blobRefFields); + } + } + RecordReader reader; if (authResult == null) { reader = reader(split); @@ -105,10 +131,51 @@ public final RecordReader createReader(Split split) throws IOExcept if (executeFilter) { reader = executeFilter(reader); } - return reader; } + private RecordReader createBlobRefReader( + Split split, @Nullable TableQueryAuthResult authResult, int[] blobRefFields) + throws IOException { + // Pre-scan: read the split once to collect all BlobReferences via getBlob() + LinkedHashSet references = new LinkedHashSet<>(); + RecordReader prescanReader = + authResult == null ? reader(split) : authedReader(split, authResult); + try { + prescanReader.forEachRemaining( + row -> { + for (int field : blobRefFields) { + if (row.isNullAt(field)) { + continue; + } + Blob blob = row.getBlob(field); + if (blob instanceof UnresolvedBlob) { + references.add(((UnresolvedBlob) blob).reference()); + } + } + }); + } finally { + prescanReader.close(); + } + + // Build the resolver from collected references + List refList = new ArrayList<>(references); + BlobReferenceResolver resolver = + BlobReferenceLookup.createResolver(catalogContext, refList); + + // Second pass: wrap each row so getBlob() resolves UnresolvedBlob → real Blob + RecordReader reader = + authResult == null ? 
reader(split) : authedReader(split, authResult); + if (executeFilter) { + reader = executeFilter(reader); + } + Set blobRefFieldSet = new HashSet<>(); + for (int f : blobRefFields) { + blobRefFieldSet.add(f); + } + return reader.transform(row -> new BlobRefResolvingRow(row, blobRefFieldSet, resolver)); + } + private RecordReader authedReader(Split split, TableQueryAuthResult authResult) throws IOException { RecordReader reader; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java index 1a9ed9b4bee2..388ef9344c1e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.source; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.InternalRow; import org.apache.paimon.operation.MergeFileSplitRead; import org.apache.paimon.operation.SplitRead; @@ -51,8 +52,9 @@ public final class AppendTableRead extends AbstractDataTableRead { public AppendTableRead( List> providerFactories, - TableSchema schema) { - super(schema); + TableSchema schema, + CatalogContext catalogContext) { + super(schema, catalogContext); this.readProviders = providerFactories.stream() .map(factory -> factory.apply(this::config)) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobRefResolvingRow.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobRefResolvingRow.java new file mode 100644 index 000000000000..fe0d1988a7fe --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobRefResolvingRow.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobReference; +import org.apache.paimon.data.BlobReferenceHolder; +import org.apache.paimon.data.BlobReferenceResolver; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.UnresolvedBlob; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.types.RowKind; + +import java.io.IOException; +import java.util.Set; + +/** + * {@link InternalRow} wrapper that resolves {@link UnresolvedBlob} to real {@link Blob} via a + * {@link BlobReferenceResolver} when {@link #getBlob(int)} is called. 
+ */ +class BlobRefResolvingRow implements InternalRow { + + private final InternalRow wrapped; + private final Set blobRefFields; + private final BlobReferenceResolver resolver; + + BlobRefResolvingRow( + InternalRow wrapped, Set blobRefFields, BlobReferenceResolver resolver) { + this.wrapped = wrapped; + this.blobRefFields = blobRefFields; + this.resolver = resolver; + } + + @Override + public int getFieldCount() { + return wrapped.getFieldCount(); + } + + @Override + public RowKind getRowKind() { + return wrapped.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + wrapped.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return wrapped.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return wrapped.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return wrapped.getByte(pos); + } + + @Override + public short getShort(int pos) { + return wrapped.getShort(pos); + } + + @Override + public int getInt(int pos) { + return wrapped.getInt(pos); + } + + @Override + public long getLong(int pos) { + return wrapped.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return wrapped.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return wrapped.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return wrapped.getString(pos); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + return wrapped.getDecimal(pos, precision, scale); + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + return wrapped.getTimestamp(pos, precision); + } + + @Override + public byte[] getBinary(int pos) { + return wrapped.getBinary(pos); + } + + @Override + public Variant getVariant(int pos) { + return wrapped.getVariant(pos); + } + + @Override + public Blob getBlob(int pos) { + Blob blob = wrapped.getBlob(pos); + if (blobRefFields.contains(pos) && blob instanceof UnresolvedBlob) { + 
BlobReference reference = ((UnresolvedBlob) blob).reference(); + Blob resolved = resolver.resolve(reference); + return new ResolvedBlobRef(reference, resolved); + } + return blob; + } + + /** + * A resolved blob that carries both the original {@link BlobReference} (for re-serialization) + * and the resolved {@link Blob} (for data access). + */ + static class ResolvedBlobRef implements Blob, BlobReferenceHolder { + + private final BlobReference reference; + private final Blob resolved; + + ResolvedBlobRef(BlobReference reference, Blob resolved) { + this.reference = reference; + this.resolved = resolved; + } + + public BlobReference reference() { + return reference; + } + + @Override + public byte[] toData() { + return resolved.toData(); + } + + @Override + public BlobDescriptor toDescriptor() { + return resolved.toDescriptor(); + } + + @Override + public SeekableInputStream newInputStream() throws IOException { + return resolved.newInputStream(); + } + } + + @Override + public InternalRow getRow(int pos, int numFields) { + return wrapped.getRow(pos, numFields); + } + + @Override + public InternalArray getArray(int pos) { + return wrapped.getArray(pos); + } + + @Override + public InternalVector getVector(int pos) { + return wrapped.getVector(pos); + } + + @Override + public InternalMap getMap(int pos) { + return wrapped.getMap(pos); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index fda7d70ffdf6..ac83737afdc8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.InternalRow; import 
org.apache.paimon.disk.IOManager; import org.apache.paimon.operation.MergeFileSplitRead; @@ -63,8 +64,9 @@ public final class KeyValueTableRead extends AbstractDataTableRead { public KeyValueTableRead( Supplier mergeReadSupplier, Supplier batchRawReadSupplier, - TableSchema schema) { - super(schema); + TableSchema schema, + CatalogContext catalogContext) { + super(schema, catalogContext); this.readProviders = Arrays.asList( new PrimaryKeyTableRawFileSplitReadProvider( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BlobReferenceLookup.java b/paimon-core/src/main/java/org/apache/paimon/utils/BlobReferenceLookup.java new file mode 100644 index 000000000000..ea16ada4d8db --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BlobReferenceLookup.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.utils; + +import java.util.Comparator; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobReference; +import org.apache.paimon.data.BlobReferenceResolver; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +/** + * Utilities for resolving {@link BlobReference} through table metadata. + * + *

The preload phase only caches lightweight {@link BlobDescriptor} (uri + offset + length) + * rather than the actual blob data, so memory usage stays small even when a data file contains a + * large number of blob references. + */ +public class BlobReferenceLookup { + + private static final Logger LOG = LoggerFactory.getLogger(BlobReferenceLookup.class); + + /** + * Creates a resolver backed by a preloaded descriptor cache. The given references are batch + * scanned from the upstream tables to obtain their {@link BlobDescriptor}s. On a cache hit the + * descriptor is used to construct a {@link Blob} directly. On a cache miss the resolver falls + * back to scanning the upstream table individually. + */ + public static BlobReferenceResolver createResolver( + CatalogContext catalogContext, List references) { + if (references.isEmpty()) { + return createResolver(catalogContext); + } + Map cached = preloadDescriptors(catalogContext, references); + UriReaderFactory uriReaderFactory = new UriReaderFactory(catalogContext); + if (cached.isEmpty()) { + return createResolver(catalogContext); + } + return reference -> { + BlobDescriptor descriptor = cached.get(reference); + if (descriptor != null) { + return Blob.fromDescriptor(uriReaderFactory.create(descriptor.uri()), descriptor); + } + return resolve(catalogContext, reference); + }; + } + + /** Creates a simple resolver that scans the upstream table for each reference individually. 
*/ + public static BlobReferenceResolver createResolver(CatalogContext catalogContext) { + return reference -> resolve(catalogContext, reference); + } + + public static Blob resolve(CatalogContext catalogContext, BlobReference reference) { + try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) { + Table table = catalog.getTable(Identifier.fromString(reference.tableName())); + if (!table.rowType().containsField(reference.fieldId())) { + throw new IllegalArgumentException( + "Cannot find blob fieldId " + + reference.fieldId() + + " in upstream table " + + reference.tableName() + + "."); + } + int fieldPos = table.rowType().getFieldIndexByFieldId(reference.fieldId()); + + ReadBuilder readBuilder = + table.newReadBuilder() + .withProjection(new int[] {fieldPos}) + .withRowRanges( + Collections.singletonList( + new Range(reference.rowId(), reference.rowId()))); + + try (RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan())) { + RecordReader.RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + try { + InternalRow row; + while ((row = batch.next()) != null) { + return row.getBlob(0); + } + } finally { + batch.releaseBatch(); + } + } + } + + throw new IllegalStateException( + "Cannot resolve blob reference for table " + + reference.tableName() + + ", rowId " + + reference.rowId() + + ", fieldId " + + reference.fieldId() + + "."); + } catch (Exception e) { + throw new RuntimeException("Failed to resolve blob reference.", e); + } + } + + private static Map preloadDescriptors( + CatalogContext catalogContext, List references) { + try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) { + return loadReferencedDescriptors(catalog, references); + } catch (Exception e) { + LOG.warn("Failed to preload blob descriptors. 
Falling back to per-reference scan.", e); + return Collections.emptyMap(); + } + } + + private static Map loadReferencedDescriptors( + Catalog catalog, Collection references) throws Exception { + Map grouped = new HashMap<>(); + for (BlobReference reference : references) { + grouped.computeIfAbsent(reference.tableName(), TableReferences::new).add(reference); + } + Map resolved = new HashMap<>(); + for (TableReferences tableReferences : grouped.values()) { + loadTableDescriptors(catalog, tableReferences, resolved); + } + return resolved; + } + + private static void loadTableDescriptors( + Catalog catalog, + TableReferences tableReferences, + Map resolved) + throws Exception { + Table table = catalog.getTable(Identifier.fromString(tableReferences.tableName)); + + List fields = new ArrayList<>(tableReferences.referencesByField.size()); + TreeSet rowIds = new TreeSet<>(); + for (Map.Entry> entry : + tableReferences.referencesByField.entrySet()) { + int fieldId = entry.getKey(); + if (!table.rowType().containsField(fieldId)) { + throw new IllegalArgumentException( + "Cannot find blob fieldId " + + fieldId + + " in upstream table " + + tableReferences.tableName + + "."); + } + int fieldPos = table.rowType().getFieldIndexByFieldId(fieldId); + fields.add(new FieldRead(fieldId, fieldPos, table.rowType().getFields().get(fieldPos))); + for (BlobReference reference : entry.getValue()) { + rowIds.add(reference.rowId()); + } + } + + Collections.sort(fields, Comparator.comparingInt(left -> left.fieldPos)); + + List readFields = new ArrayList<>(fields.size()); + for (FieldRead field : fields) { + readFields.add(field.field); + } + + ReadBuilder readBuilder = + table.newReadBuilder() + .withReadType(SpecialFields.rowTypeWithRowId(new RowType(readFields))) + .withRowRanges(Range.toRanges(rowIds)); + + try (RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan())) { + RecordReader.RecordIterator batch; + while ((batch = reader.readBatch()) != null) { + 
try { + InternalRow row; + while ((row = batch.next()) != null) { + long rowId = row.getLong(fields.size()); + for (int i = 0; i < fields.size(); i++) { + Blob blob = row.getBlob(i); + if (blob != null) { + resolved.put( + new BlobReference( + tableReferences.tableName, + fields.get(i).fieldId, + rowId), + blob.toDescriptor()); + } + } + } + } finally { + batch.releaseBatch(); + } + } + } + } + + private static class TableReferences { + private final String tableName; + private final Map> referencesByField = new HashMap<>(); + + private TableReferences(String tableName) { + this.tableName = tableName; + } + + private void add(BlobReference reference) { + referencesByField + .computeIfAbsent(reference.fieldId(), unused -> new ArrayList<>()) + .add(reference); + } + } + + private static class FieldRead { + private final int fieldId; + private final int fieldPos; + private final DataField field; + + private FieldRead(int fieldId, int fieldPos, DataField field) { + this.fieldId = fieldId; + this.fieldPos = fieldPos; + this.field = field; + } + } + + private BlobReferenceLookup() {} +} diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index e3019485bb2d..90394253dc8a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobReference; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; @@ -39,6 +40,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import 
org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableWrite; @@ -751,6 +754,112 @@ void testRenameBlobColumnShouldFail() throws Exception { .hasMessageContaining("Cannot rename BLOB column"); } + @Test + public void testBlobRefE2E() throws Exception { + // 1. Create upstream table with BLOB field and write data + String upstreamTableName = "UpstreamBlob"; + Schema.Builder upstreamSchema = Schema.newBuilder(); + upstreamSchema.column("id", DataTypes.INT()); + upstreamSchema.column("name", DataTypes.STRING()); + upstreamSchema.column("image", DataTypes.BLOB()); + upstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + upstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + upstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + catalog.createTable(identifier(upstreamTableName), upstreamSchema.build(), true); + + FileStoreTable upstreamTable = + (FileStoreTable) catalog.getTable(identifier(upstreamTableName)); + + byte[] imageBytes1 = randomBytes(); + byte[] imageBytes2 = randomBytes(); + + BatchWriteBuilder upstreamWriteBuilder = upstreamTable.newBatchWriteBuilder(); + try (BatchTableWrite write = upstreamWriteBuilder.newWrite(); + BatchTableCommit commit = upstreamWriteBuilder.newCommit()) { + write.write( + GenericRow.of(1, BinaryString.fromString("row1"), new BlobData(imageBytes1))); + write.write( + GenericRow.of(2, BinaryString.fromString("row2"), new BlobData(imageBytes2))); + commit.commit(write.prepareCommit()); + } + + // 2. 
Get field ID for the "image" blob column + int imageFieldId = + upstreamTable.rowType().getFields().stream() + .filter(f -> f.name().equals("image")) + .findFirst() + .orElseThrow(() -> new RuntimeException("image field not found")) + .id(); + + // Read upstream with _ROW_ID to get actual row IDs + RowTrackingTable upstreamRowTracking = new RowTrackingTable(upstreamTable); + // schema: 0=id, 1=name, 2=image, 3=_ROW_ID, 4=_SEQUENCE_NUMBER + ReadBuilder rowIdReader = + upstreamRowTracking.newReadBuilder().withProjection(new int[] {0, 2, 3}); + // maps: upstream id -> (rowId, blobData) + java.util.Map idToRowId = new java.util.HashMap<>(); + java.util.Map idToBlob = new java.util.HashMap<>(); + rowIdReader + .newRead() + .createReader(rowIdReader.newScan().plan()) + .forEachRemaining( + row -> { + int id = row.getInt(0); + byte[] blobData = row.getBlob(1).toData(); + long rowId = row.getLong(2); + idToRowId.put(id, rowId); + idToBlob.put(id, blobData); + }); + assertThat(idToRowId.size()).isEqualTo(2); + + // 3. Create downstream table with BLOB_REF field + String downstreamTableName = "DownstreamRef"; + Schema.Builder downstreamSchema = Schema.newBuilder(); + downstreamSchema.column("id", DataTypes.INT()); + downstreamSchema.column("label", DataTypes.STRING()); + downstreamSchema.column("image_ref", DataTypes.BLOB_REF()); + downstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + downstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + downstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + catalog.createTable(identifier(downstreamTableName), downstreamSchema.build(), true); + + FileStoreTable downstreamTable = + (FileStoreTable) catalog.getTable(identifier(downstreamTableName)); + + // 4. Write blob references using actual row IDs from upstream + String upstreamFullName = database + "." 
+ upstreamTableName; + BlobReference ref1 = new BlobReference(upstreamFullName, imageFieldId, idToRowId.get(1)); + BlobReference ref2 = new BlobReference(upstreamFullName, imageFieldId, idToRowId.get(2)); + + BatchWriteBuilder downstreamWriteBuilder = downstreamTable.newBatchWriteBuilder(); + try (BatchTableWrite write = downstreamWriteBuilder.newWrite(); + BatchTableCommit commit = downstreamWriteBuilder.newCommit()) { + write.write( + GenericRow.of(1, BinaryString.fromString("label1"), Blob.fromReference(ref1))); + write.write( + GenericRow.of(2, BinaryString.fromString("label2"), Blob.fromReference(ref2))); + commit.commit(write.prepareCommit()); + } + + // 5. Read downstream table — blob references should resolve from upstream + ReadBuilder downstreamReadBuilder = downstreamTable.newReadBuilder(); + List downstreamRows = new ArrayList<>(); + downstreamReadBuilder + .newRead() + .createReader(downstreamReadBuilder.newScan().plan()) + .forEachRemaining(downstreamRows::add); + + assertThat(downstreamRows.size()).isEqualTo(2); + + for (InternalRow row : downstreamRows) { + int id = row.getInt(0); + Blob blob = row.getBlob(2); + assertThat(blob).isNotNull(); + // The resolved blob data should match the original upstream data + assertThat(blob.toData()).isEqualTo(idToBlob.get(id)); + } + } + private void createExternalStorageTable() throws Exception { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("f0", DataTypes.INT()); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RowDataFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RowDataFileWriterTest.java new file mode 100644 index 000000000000..d2db4ec3e2c9 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/io/RowDataFileWriterTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobReference; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.options.Options; +import org.apache.paimon.statistics.NoneSimpleColStatsCollector; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.BlobRefType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.LongCounter; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowDataFileWriter}. 
*/ +public class RowDataFileWriterTest { + + private static final RowType SCHEMA = + RowType.of( + new DataType[] {new IntType(), new BlobRefType()}, new String[] {"id", "ref"}); + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testWriteBlobRefDataFile() throws Exception { + FileFormat fileFormat = FileFormat.fromIdentifier("parquet", new Options()); + Path dataPath = new Path(tempDir.toUri().toString(), "data.parquet"); + BlobReference reference = new BlobReference("default.upstream", 7, 11L); + + RowDataFileWriter writer = + new RowDataFileWriter( + LocalFileIO.create(), + RollingFileWriter.createFileWriterContext( + fileFormat, + SCHEMA, + new SimpleColStatsCollector.Factory[] { + NoneSimpleColStatsCollector::new, + NoneSimpleColStatsCollector::new + }, + CoreOptions.FILE_COMPRESSION.defaultValue()), + dataPath, + SCHEMA, + 0L, + () -> new LongCounter(0), + new FileIndexOptions(), + FileSource.APPEND, + false, + false, + false, + SCHEMA.getFieldNames()); + + writer.write(GenericRow.of(1, Blob.fromReference(reference))); + writer.close(); + + DataFileMeta meta = writer.result(); + + // No .blobref extra files should be produced + assertThat(meta.extraFiles().stream().noneMatch(f -> f.endsWith(".blobref"))).isTrue(); + assertThat(meta.rowCount()).isEqualTo(1); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/BlobRefSchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/BlobRefSchemaValidationTest.java new file mode 100644 index 000000000000..fcc345d719a9 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/schema/BlobRefSchemaValidationTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.schema; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyList; +import static org.apache.paimon.CoreOptions.BUCKET; +import static org.apache.paimon.schema.SchemaValidation.validateTableSchema; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for BLOB_REF-specific schema validation. */ +public class BlobRefSchemaValidationTest { + + @Test + public void testNestedBlobRefTableSchema() { + Map options = new HashMap<>(); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + options.put(BUCKET.key(), String.valueOf(-1)); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField( + 1, + "f1", + DataTypes.ROW(DataTypes.FIELD(2, "nested", DataTypes.BLOB_REF())))); + + assertThatThrownBy( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + emptyList(), + emptyList(), + options, + ""))) + .hasMessage( + "Nested BLOB_REF type is not supported. 
Field 'f1' contains a nested BLOB_REF."); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/BlobReferenceLookupTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/BlobReferenceLookupTest.java new file mode 100644 index 000000000000..a787b2cc6547 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/BlobReferenceLookupTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.utils; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobReference; +import org.apache.paimon.data.BlobReferenceResolver; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.OptionalLong; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Tests for {@link BlobReferenceLookup}. 
*/ +public class BlobReferenceLookupTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testResolveByFieldIdAfterRename() throws Exception { + CatalogContext context = mock(CatalogContext.class); + Catalog catalog = mock(Catalog.class); + Table table = mock(Table.class); + ReadBuilder readBuilder = mock(ReadBuilder.class); + TableScan scan = mock(TableScan.class); + TableScan.Plan plan = mock(TableScan.Plan.class); + + byte[] payload = new byte[] {1, 2, 3}; + InternalRow row = GenericRow.of(Blob.fromData(payload)); + Split split = new TestSplit(); + + when(catalog.getTable(any())).thenReturn(table); + when(table.rowType()) + .thenReturn( + new RowType( + Collections.singletonList( + new DataField(7, "blob_renamed", DataTypes.BLOB())))); + when(table.newReadBuilder()).thenReturn(readBuilder); + when(readBuilder.withProjection(any(int[].class))).thenReturn(readBuilder); + when(readBuilder.withRowRanges(anyList())).thenReturn(readBuilder); + when(readBuilder.newRead()) + .thenReturn(new ListRowTableRead(split, Collections.singletonList(row))); + when(readBuilder.newScan()).thenReturn(scan); + when(scan.plan()).thenReturn(plan); + when(plan.splits()).thenReturn(Collections.singletonList(split)); + + BlobReference reference = new BlobReference("default.source", 7, 12L); + + try (MockedStatic mockedCatalogFactory = + Mockito.mockStatic(CatalogFactory.class)) { + mockedCatalogFactory + .when(() -> CatalogFactory.createCatalog(context)) + .thenReturn(catalog); + + Blob resolved = BlobReferenceLookup.resolve(context, reference); + assertThat(resolved.toData()).isEqualTo(payload); + } + } + + @Test + public void testCreateResolverPreloadsDescriptors() throws Exception { + CatalogContext context = CatalogContext.create(new Path(tempDir.toUri().toString())); + Catalog catalog = mock(Catalog.class); + Table table = mock(Table.class); + ReadBuilder readBuilder = mock(ReadBuilder.class); + TableScan scan = mock(TableScan.class); + TableScan.Plan plan = 
mock(TableScan.Plan.class); + + byte[] leftPayload = new byte[] {1, 2, 3}; + byte[] rightPayload = new byte[] {4, 5, 6}; + Path leftBlobPath = new Path(tempDir.toUri().toString(), "left.blob"); + Path rightBlobPath = new Path(tempDir.toUri().toString(), "right.blob"); + LocalFileIO fileIO = LocalFileIO.create(); + try (org.apache.paimon.fs.PositionOutputStream out = + fileIO.newOutputStream(leftBlobPath, false)) { + out.write(leftPayload); + } + try (org.apache.paimon.fs.PositionOutputStream out = + fileIO.newOutputStream(rightBlobPath, false)) { + out.write(rightPayload); + } + + BlobDescriptor leftDescriptor = + new BlobDescriptor(leftBlobPath.toString(), 0L, leftPayload.length); + BlobDescriptor rightDescriptor = + new BlobDescriptor(rightBlobPath.toString(), 0L, rightPayload.length); + + Blob leftBlob = + Blob.fromDescriptor(UriReader.fromFile(LocalFileIO.create()), leftDescriptor); + Blob rightBlob = + Blob.fromDescriptor(UriReader.fromFile(LocalFileIO.create()), rightDescriptor); + + BlobReference leftReference = new BlobReference("default.source", 7, 12L); + BlobReference rightReference = new BlobReference("default.source", 8, 12L); + + Split readerSplit = new TestSplit(); + InternalRow preloadRow = GenericRow.of(leftBlob, rightBlob, 12L); + + when(catalog.getTable(any())).thenReturn(table); + when(table.rowType()) + .thenReturn( + new RowType( + java.util.Arrays.asList( + new DataField(7, "blob_left", DataTypes.BLOB()), + new DataField(8, "blob_right", DataTypes.BLOB())))); + when(table.newReadBuilder()).thenReturn(readBuilder); + when(readBuilder.withReadType(any(RowType.class))).thenReturn(readBuilder); + when(readBuilder.withRowRanges(anyList())).thenReturn(readBuilder); + when(readBuilder.newRead()) + .thenAnswer( + invocation -> + new ListRowTableRead( + readerSplit, Collections.singletonList(preloadRow))); + when(readBuilder.newScan()).thenReturn(scan); + when(scan.plan()).thenReturn(plan); + 
when(plan.splits()).thenReturn(Collections.singletonList(readerSplit)); + + try (MockedStatic mockedCatalogFactory = + Mockito.mockStatic(CatalogFactory.class)) { + mockedCatalogFactory + .when(() -> CatalogFactory.createCatalog(context)) + .thenReturn(catalog); + + BlobReferenceResolver resolver = + BlobReferenceLookup.createResolver( + context, java.util.Arrays.asList(leftReference, rightReference)); + + assertThat(resolver.resolve(leftReference).toData()).isEqualTo(leftPayload); + assertThat(resolver.resolve(rightReference).toData()).isEqualTo(rightPayload); + assertThat(resolver.resolve(new BlobReference("default.source", 7, 12L)).toData()) + .isEqualTo(leftPayload); + + verify(table, times(1)).newReadBuilder(); + } + } + + private static class ListRowTableRead implements TableRead { + + private final Split split; + private final List rows; + + private ListRowTableRead(Split split, List rows) { + this.split = split; + this.rows = rows; + } + + @Override + public TableRead withMetricRegistry(MetricRegistry registry) { + return this; + } + + @Override + public TableRead executeFilter() { + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + return this; + } + + @Override + public RecordReader createReader(Split split) { + return new RecordReader() { + + private boolean emitted = false; + + @Nullable + @Override + public RecordIterator readBatch() { + if (emitted) { + return null; + } + emitted = true; + return new RecordIterator() { + + private int next = 0; + + @Nullable + @Override + public InternalRow next() { + return next < rows.size() ? 
rows.get(next++) : null; + } + + @Override + public void releaseBatch() {} + }; + } + + @Override + public void close() throws IOException {} + }; + } + } + + private static class TestSplit implements Split { + + @Override + public long rowCount() { + return 1L; + } + + @Override + public OptionalLong mergedRowCount() { + return OptionalLong.of(1L); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java index 92ae714ca577..21cb45cf0d93 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java @@ -21,6 +21,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -155,6 +156,11 @@ public LogicalType visit(BlobType blobType) { org.apache.flink.table.types.logical.VarBinaryType.MAX_LENGTH); } + @Override + public LogicalType visit(BlobRefType blobRefType) { + return new org.apache.flink.table.types.logical.VarBinaryType(BlobType.DEFAULT_SIZE); + } + @Override public LogicalType visit(ArrayType arrayType) { return new org.apache.flink.table.types.logical.ArrayType( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 5f59063668a5..80d1d14d2d2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -153,6 +153,7 @@ import 
static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; +import static org.apache.paimon.flink.LogicalTypeConversion.toBlobRefType; import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType; import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; @@ -1038,6 +1039,7 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { Map options = new HashMap<>(catalogTable.getOptions()); List blobFields = CoreOptions.blobField(options); + List blobRefFields = CoreOptions.blobRefField(options); if (!blobFields.isEmpty()) { checkArgument( options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()), @@ -1047,6 +1049,15 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { + CoreOptions.DATA_EVOLUTION_ENABLED.key() + "'"); } + if (!blobRefFields.isEmpty()) { + checkArgument( + options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()), + "When setting '" + + CoreOptions.BLOB_REF_FIELD.key() + + "', you must also set '" + + CoreOptions.DATA_EVOLUTION_ENABLED.key() + + "'"); + } // Serialize virtual columns and watermark to the options // This is what Flink SQL needs, the storage itself does not need them options.putAll(columnOptions(schema)); @@ -1077,9 +1088,13 @@ private static org.apache.paimon.types.DataType resolveDataType( org.apache.flink.table.types.logical.LogicalType logicalType, Map options) { List blobFields = CoreOptions.blobField(options); + List blobRefFields = CoreOptions.blobRefField(options); if (blobFields.contains(fieldName)) { return toBlobType(logicalType); } + if (blobRefFields.contains(fieldName)) { + return toBlobRefType(logicalType); + } Set vectorFields = CoreOptions.vectorField(options); if (vectorFields.contains(fieldName)) { return 
toVectorType(fieldName, logicalType, options); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java index ad2132e8c1eb..71f5304643a7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; -import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -32,7 +32,6 @@ import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; -import org.apache.paimon.utils.UriReader; import org.apache.paimon.utils.UriReaderFactory; import org.apache.flink.table.data.DecimalData; @@ -55,7 +54,8 @@ public FlinkRowWrapper(org.apache.flink.table.data.RowData row) { public FlinkRowWrapper(org.apache.flink.table.data.RowData row, CatalogContext catalogContext) { this.row = row; - this.uriReaderFactory = new UriReaderFactory(catalogContext); + this.uriReaderFactory = + catalogContext == null ? 
null : new UriReaderFactory(catalogContext); } @Override @@ -142,15 +142,7 @@ public Variant getVariant(int pos) { @Override public Blob getBlob(int pos) { - byte[] bytes = row.getBinary(pos); - boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); - if (blobDes) { - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); - UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); - return Blob.fromDescriptor(uriReader, blobDescriptor); - } else { - return new BlobData(bytes); - } + return BlobUtils.fromBytes(row.getBinary(pos), uriReaderFactory, null); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java index 556dbd95ff31..1cd0168c332d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.CoreOptions; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -53,6 +54,13 @@ public static BlobType toBlobType(LogicalType logicalType) { return new BlobType(); } + public static BlobRefType toBlobRefType(LogicalType logicalType) { + checkArgument( + logicalType instanceof BinaryType || logicalType instanceof VarBinaryType, + "Expected BinaryType or VarBinaryType, but got: " + logicalType); + return new BlobRefType(); + } + public static VectorType toVectorType( String fieldName, org.apache.flink.table.types.logical.LogicalType logicalType, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java index e4870de58336..76d83393b3d4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.lookup; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.operation.MergeFileSplitRead; @@ -41,8 +42,9 @@ public class LookupCompactDiffRead extends AbstractDataTableRead { private final SplitRead fullPhaseMergeRead; private final SplitRead incrementalDiffRead; - public LookupCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema schema) { - super(schema); + public LookupCompactDiffRead( + MergeFileSplitRead mergeRead, TableSchema schema, CatalogContext catalogContext) { + super(schema, catalogContext); this.incrementalDiffRead = new IncrementalCompactDiffSplitRead(mergeRead); this.fullPhaseMergeRead = SplitRead.convert( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java index 353c99d2b1f1..4e355db1e8f2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java @@ -64,7 +64,9 @@ public InnerTableRead newRead() { return wrapped.newRead(); case COMPACT_DELTA_MONITOR: return new LookupCompactDiffRead( - ((KeyValueFileStore) wrapped.store()).newRead(), wrapped.schema()); + ((KeyValueFileStore) wrapped.store()).newRead(), + wrapped.schema(), + wrapped.catalogEnvironment().catalogContext()); default: throw new 
UnsupportedOperationException( "Unknown lookup stream scan mode: " + lookupScanMode.name()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java index b49b9adb9476..eff22090735c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java @@ -277,7 +277,8 @@ private FileStoreRecordIterator(@Nullable RowType rowType) { private Set blobFieldIndex(RowType rowType) { Set result = new HashSet<>(); for (int i = 0; i < rowType.getFieldCount(); i++) { - if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) { + if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB + || rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB_REF) { result.add(i); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 8114ac17eb38..a0d5f98ba0e0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -152,7 +152,7 @@ public KeyValueTableRead createReadWithKey() { FileFormatDiscover.of(options), pathFactory, options); - return new KeyValueTableRead(() -> read, () -> rawFileRead, null); + return new KeyValueTableRead(() -> read, () -> rawFileRead, schema, null); } public List writeFiles( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java index eb55a86c5b66..251a973b3e0c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java @@ -106,6 +106,7 @@ public static Schema convertToSchema( case BINARY: case VARBINARY: case BLOB: + case BLOB_REF: Schema binary = SchemaBuilder.builder().bytesType(); return nullable ? nullableSchema(binary) : binary; case TIMESTAMP_WITHOUT_TIME_ZONE: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java index 9aa663df8946..47f12d45684b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java @@ -89,10 +89,13 @@ public FieldReaderFactory(@Nullable UriReader uriReader) { @Override public FieldReader primitive(Schema primitive, DataType type) { - if (primitive.getType() == Schema.Type.BYTES - && type != null - && type.getTypeRoot() == DataTypeRoot.BLOB) { - return new BlobDescriptorBytesReader(uriReader); + if (primitive.getType() == Schema.Type.BYTES && type != null) { + if (type.getTypeRoot() == DataTypeRoot.BLOB) { + return new BlobDescriptorBytesReader(uriReader); + } + if (type.getTypeRoot() == DataTypeRoot.BLOB_REF) { + return BYTES_READER; + } } return AvroSchemaVisitor.super.primitive(primitive, type); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java index 6eb81cb7f5d1..d48becb7ed46 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java @@ -21,6 +21,7 @@ 
import org.apache.paimon.CoreOptions; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.DataGetters; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; @@ -93,12 +94,24 @@ public class FieldWriterFactory implements AvroSchemaVisitor { } }; + private static final FieldWriter BLOB_REFERENCE_BYTES_WRITER = + (container, i, encoder) -> { + Blob blob = container.getBlob(i); + if (blob == null) { + throw new IllegalArgumentException("Null blob_ref is not allowed."); + } + encoder.writeBytes(BlobUtils.serializeBlobReference(blob)); + }; + @Override public FieldWriter primitive(Schema primitive, DataType type) { - if (primitive.getType() == Schema.Type.BYTES - && type != null - && type.getTypeRoot() == DataTypeRoot.BLOB) { - return BLOB_DESCRIPTOR_BYTES_WRITER; + if (primitive.getType() == Schema.Type.BYTES && type != null) { + if (type.getTypeRoot() == DataTypeRoot.BLOB) { + return BLOB_DESCRIPTOR_BYTES_WRITER; + } + if (type.getTypeRoot() == DataTypeRoot.BLOB_REF) { + return BLOB_REFERENCE_BYTES_WRITER; + } } return AvroSchemaVisitor.super.primitive(primitive, type); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java index 4b80827d1bb3..4fe8b5999b3f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java @@ -68,6 +68,7 @@ static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { return TypeDescription.createBoolean() .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case BLOB: + case BLOB_REF: return TypeDescription.createBinary() .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case VARBINARY: diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java index 443c2410cbd2..8248937b5f14 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -264,6 +265,18 @@ public FieldWriter visit(BlobType blobType) { }; } + @Override + public FieldWriter visit(BlobRefType blobRefType) { + return (rowId, column, getters, columnId) -> { + BytesColumnVector vector = (BytesColumnVector) column; + byte[] bytes = + org.apache.paimon.data.BlobUtils.serializeBlobReference( + getters.getBlob(columnId)); + vector.setVal(rowId, bytes, 0, bytes.length); + return bytes.length; + }; + } + @Override public FieldWriter visit(DecimalType decimalType) { return (rowId, column, getters, columnId) -> { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 37a69fe9aebd..102aa0b2b709 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -91,6 +91,7 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, case BINARY: case VARBINARY: case BLOB: + case BLOB_REF: return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) .named(name) .withId(fieldId); diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java index a2741f869ab6..a285491219a6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java @@ -101,6 +101,7 @@ public static WritableColumnVector createWritableColumnVector( case VARCHAR: case VARBINARY: case BLOB: + case BLOB_REF: return new HeapBytesVector(batchSize); case BINARY: return new HeapBytesVector(batchSize); @@ -178,7 +179,8 @@ public static ColumnVector createReadableColumnVector( case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return new ParquetTimestampVector(writableVector); case BLOB: - // Physical representation is bytes; higher-level Row#getBlob() handles descriptor. + case BLOB_REF: + // Physical representation is bytes; higher-level Row#getBlob() materializes them. 
return writableVector; case ARRAY: return new CastedArrayColumnVector( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java index 0abf78fd2747..2f2582b401e6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java @@ -36,6 +36,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -230,6 +231,11 @@ public UpdaterFactory visit(BlobType blobType) { }; } + @Override + public UpdaterFactory visit(BlobRefType blobRefType) { + return visit(new BlobType(blobRefType.isNullable())); + } + @Override public UpdaterFactory visit(ArrayType arrayType) { throw new RuntimeException("Array type is not supported"); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java index a7241147e68a..b668a0d912c9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; @@ -109,6 +110,8 @@ private FieldWriter 
createWriter(DataType t, Type type) { return new BinaryWriter(); case BLOB: return new BlobDescriptorWriter(); + case BLOB_REF: + return new BlobReferenceWriter(); case DECIMAL: DecimalType decimalType = (DecimalType) t; return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale()); @@ -344,6 +347,25 @@ private void writeBlob(Blob blob) { } } + /** Writes BLOB_REF as serialized {@link org.apache.paimon.data.BlobReference} bytes. */ + private class BlobReferenceWriter implements FieldWriter { + + @Override + public void write(InternalRow row, int ordinal) { + writeBlob(row.getBlob(ordinal)); + } + + @Override + public void write(InternalArray arrayData, int ordinal) { + throw new UnsupportedOperationException("BLOB_REF in array is not supported."); + } + + private void writeBlob(Blob blob) { + byte[] bytes = BlobUtils.serializeBlobReference(blob); + recordConsumer.addBinary(Binary.fromReusedByteArray(bytes)); + } + } + private class IntWriter implements FieldWriter { @Override diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index dacd12f492c1..70c865f0b5f9 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -29,6 +29,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -414,6 +415,11 @@ public Operators.Column visit(BlobType blobType) { throw new UnsupportedOperationException(); } + @Override + public Operators.Column visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException(); + } + // ===================== can 
not support ========================= @Override diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java index cb3d7de27da5..797756224b2a 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java @@ -18,6 +18,8 @@ package org.apache.paimon.format.avro; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobReference; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileFormat; @@ -76,6 +78,7 @@ public void testSupportedDataTypes() { dataFields.add(new DataField(index++, "varchar_type", DataTypes.VARCHAR(20))); dataFields.add(new DataField(index++, "binary_type", DataTypes.BINARY(20))); dataFields.add(new DataField(index++, "varbinary_type", DataTypes.VARBINARY(20))); + dataFields.add(new DataField(index++, "blob_ref_type", DataTypes.BLOB_REF())); dataFields.add(new DataField(index++, "timestamp_type", DataTypes.TIMESTAMP(3))); dataFields.add(new DataField(index++, "date_type", DataTypes.DATE())); dataFields.add(new DataField(index++, "decimal_type", DataTypes.DECIMAL(10, 3))); @@ -210,4 +213,29 @@ void testCompression() throws IOException { .hasMessageContaining("Unrecognized codec: unsupported"); } } + + @Test + void testBlobRefRoundTrip() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "blob_ref", DataTypes.BLOB_REF())); + BlobReference reference = new BlobReference("default.t", 7, 11L); + Blob blob = Blob.fromReference(reference); + + FileFormat format = new AvroFileFormat(new FormatContext(new Options(), 1024, 1024)); + LocalFileIO fileIO = LocalFileIO.create(); + Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + + try (PositionOutputStream out = fileIO.newOutputStream(file, 
false)) { + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.addElement(GenericRow.of(blob)); + writer.close(); + } + + try (RecordReader reader = + format.createReaderFactory(rowType, rowType, new ArrayList<>()) + .createReader( + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) { + InternalRow row = reader.readBatch().next(); + assertThat(row.getBinary(0)).isEqualTo(reference.serialize()); + } + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java index 5669ac33d443..5c36e14cfd1a 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcTypeUtilTest.java @@ -60,6 +60,7 @@ void testDataTypeToOrcType() { test("varchar(123)", DataTypes.VARCHAR(123)); test("string", DataTypes.STRING()); test("binary", DataTypes.BYTES()); + test("binary", DataTypes.BLOB_REF()); test("tinyint", DataTypes.TINYINT()); test("smallint", DataTypes.SMALLINT()); test("int", DataTypes.INT()); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java index bfbdaed7c4a3..a312d8867b53 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetSchemaConverterTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.types.RowType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -106,4 +107,15 @@ public void testPaimonParquetSchemaConvert() { RowType rowType = convertToPaimonRowType(messageType); assertThat(ALL_TYPES).isEqualTo(rowType); } + + @Test + public void 
testBlobRefSchemaConvertToBinary() { + MessageType messageType = + convertToParquetMessageType( + new RowType( + Arrays.asList(new DataField(0, "blob_ref", DataTypes.BLOB_REF())))); + + assertThat(messageType.getType("blob_ref").asPrimitiveType().getPrimitiveTypeName()) + .isEqualTo(PrimitiveType.PrimitiveTypeName.BINARY); + } } diff --git a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java index e4799341d1dc..bcb48dffb485 100644 --- a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java +++ b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java @@ -22,6 +22,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -235,6 +236,11 @@ public TypeInfo visit(BlobType blobType) { return TypeInfoFactory.binaryTypeInfo; } + @Override + public TypeInfo visit(BlobRefType blobRefType) { + return TypeInfoFactory.binaryTypeInfo; + } + @Override protected TypeInfo defaultMethod(org.apache.paimon.types.DataType dataType) { throw new UnsupportedOperationException("Unsupported type: " + dataType); diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java index 64b4e2887f82..9421de6b60a3 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import 
org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -188,6 +189,11 @@ public Void visit(BlobType blobType) { return null; } + @Override + public Void visit(BlobRefType blobRefType) { + return null; + } + @Override public Void visit(ArrayType arrayType) { return null; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 6ef853eda870..7a07f4fb5ef8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -41,6 +41,7 @@ import org.apache.paimon.table.iceberg.IcebergTable; import org.apache.paimon.table.lance.LanceTable; import org.apache.paimon.table.object.ObjectTable; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -457,6 +458,7 @@ private Schema toInitialSchema( StructType schema, Transform[] partitions, Map properties) { Map normalizedProperties = new HashMap<>(properties); List blobFields = CoreOptions.blobField(properties); + List blobRefFields = CoreOptions.blobRefField(properties); String provider = properties.get(TableCatalog.PROP_PROVIDER); if (!usePaimon(provider)) { if (isFormatTable(provider)) { @@ -495,6 +497,11 @@ private Schema toInitialSchema( field.dataType() instanceof org.apache.spark.sql.types.BinaryType, "The type of blob field must be binary"); type = new BlobType(); + } else if (blobRefFields.contains(name)) { + checkArgument( + field.dataType() instanceof org.apache.spark.sql.types.BinaryType, + "The type of blob ref field must be binary"); + type = new BlobRefType(); } else { type = 
toPaimonType(field.dataType()).copy(field.nullable()); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java index ffd077741c9f..14b7331be319 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; -import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -32,7 +32,6 @@ import org.apache.paimon.data.variant.Variant; import org.apache.paimon.spark.util.shim.TypeUtils$; import org.apache.paimon.types.RowKind; -import org.apache.paimon.utils.UriReader; import org.apache.paimon.utils.UriReaderFactory; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -246,15 +245,7 @@ public Blob getBlob(int pos) { if (actualPos == -1 || internalRow.isNullAt(actualPos)) { return null; } - byte[] bytes = internalRow.getBinary(actualPos); - boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); - if (blobDes) { - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); - UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); - return Blob.fromDescriptor(uriReader, blobDescriptor); - } else { - return new BlobData(bytes); - } + return BlobUtils.fromBytes(internalRow.getBinary(actualPos), uriReaderFactory, null); } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index 36b5624ff52f..3bc132810788 
100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobData; -import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobUtils; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -38,7 +38,6 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.DateTimeUtils; -import org.apache.paimon.utils.UriReader; import org.apache.paimon.utils.UriReaderFactory; import org.apache.spark.sql.Row; @@ -72,7 +71,8 @@ public SparkRow(RowType type, Row row, RowKind rowkind, CatalogContext catalogCo this.type = type; this.row = row; this.rowKind = rowkind; - this.uriReaderFactory = new UriReaderFactory(catalogContext); + this.uriReaderFactory = + catalogContext == null ? 
null : new UriReaderFactory(catalogContext); } @Override @@ -161,15 +161,7 @@ public Variant getVariant(int i) { @Override public Blob getBlob(int i) { - byte[] bytes = row.getAs(i); - boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); - if (blobDes) { - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); - UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); - return Blob.fromDescriptor(uriReader, blobDescriptor); - } else { - return new BlobData(bytes); - } + return BlobUtils.fromBytes(row.getAs(i), uriReaderFactory, null); } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java index dc2f8b30acab..823534deea7c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; import org.apache.paimon.types.BlobType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; import org.apache.paimon.types.DataField; @@ -167,6 +168,11 @@ public DataType visit(BlobType blobType) { return DataTypes.BinaryType; } + @Override + public DataType visit(BlobRefType blobRefType) { + return DataTypes.BinaryType; + } + @Override public DataType visit(VarBinaryType varBinaryType) { return DataTypes.BinaryType; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala index ae504b24120f..4c34814df4f2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala @@ -48,7 +48,10 @@ object SparkInternalRow { var i: Int = 0 val blobFields = new mutable.HashSet[Int]() while (i < rowType.getFieldCount) { - if (rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB)) { + if ( + rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB) || + rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB_REF) + ) { blobFields.add(i) } i += 1 diff --git a/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexFileFormat.java b/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexFileFormat.java index eda8a3944ca9..d6191af2204d 100644 --- a/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexFileFormat.java +++ b/paimon-vortex/paimon-vortex-format/src/main/java/org/apache/paimon/format/vortex/VortexFileFormat.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobRefType; import org.apache.paimon.types.BlobType; import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; @@ -187,6 +188,12 @@ public Void visit(BlobType blobType) { "Vortex file format does not support type BLOB"); } + @Override + public Void visit(BlobRefType blobRefType) { + throw new UnsupportedOperationException( + "Vortex file format does not support type BLOB_REF"); + } + @Override public Void visit(ArrayType arrayType) { return null;