Skip to content

Commit 6b15bec

Browse files
authored
Core, Arrow: Implementation of ArrowFormatModel (#15258)
1 parent eb8fe53 commit 6b15bec

3 files changed

Lines changed: 56 additions & 20 deletions

File tree

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.arrow.vectorized;
20+
21+
import org.apache.arrow.vector.NullCheckingForGet;
22+
import org.apache.iceberg.formats.FormatModelRegistry;
23+
import org.apache.iceberg.parquet.ParquetFormatModel;
24+
25+
/**
 * Registers the Arrow vectorized Parquet read path with {@code FormatModelRegistry}.
 *
 * <p>This class is loaded by name (it is listed in {@code
 * FormatModelRegistry.CLASSES_TO_REGISTER}), so it must keep its fully qualified name and the
 * no-arg static {@link #register()} entry point.
 */
public class ArrowFormatModels {
  /**
   * Registers a Parquet format model that produces {@link ColumnarBatch} results.
   *
   * <p>The reader function delegates to {@code
   * ArrowReader.VectorizedCombinedScanIterator.buildReader}, passing {@code
   * NullCheckingForGet.NULL_CHECKING_ENABLED} as the {@code setArrowValidityVector} flag so the
   * Arrow validity vector is populated whenever null checking is enabled in the Arrow runtime.
   */
  public static void register() {
    FormatModelRegistry.register(
        ParquetFormatModel.create(
            ColumnarBatch.class,
            // NOTE(review): the engine-schema type parameter is Object.class — presumably a
            // placeholder meaning "no engine-specific schema"; confirm against ParquetFormatModel.
            Object.class,
            (schema, fileSchema, engineSchema, idToConstant) ->
                ArrowReader.VectorizedCombinedScanIterator.buildReader(
                    schema,
                    fileSchema,
                    NullCheckingForGet.NULL_CHECKING_ENABLED /* setArrowValidityVector */)));
  }

  // Utility class: static registration entry point only, never instantiated.
  private ArrowFormatModels() {}
}

arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.stream.Collectors;
3030
import java.util.stream.Stream;
3131
import java.util.stream.StreamSupport;
32-
import org.apache.arrow.vector.NullCheckingForGet;
3332
import org.apache.arrow.vector.VectorSchemaRoot;
3433
import org.apache.arrow.vector.types.Types.MinorType;
3534
import org.apache.iceberg.CombinedScanTask;
@@ -40,13 +39,14 @@
4039
import org.apache.iceberg.encryption.EncryptedFiles;
4140
import org.apache.iceberg.encryption.EncryptedInputFile;
4241
import org.apache.iceberg.encryption.EncryptionManager;
42+
import org.apache.iceberg.formats.FormatModelRegistry;
43+
import org.apache.iceberg.formats.ReadBuilder;
4344
import org.apache.iceberg.io.CloseableGroup;
4445
import org.apache.iceberg.io.CloseableIterable;
4546
import org.apache.iceberg.io.CloseableIterator;
4647
import org.apache.iceberg.io.FileIO;
4748
import org.apache.iceberg.io.InputFile;
4849
import org.apache.iceberg.mapping.NameMappingParser;
49-
import org.apache.iceberg.parquet.Parquet;
5050
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
5151
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
5252
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -189,8 +189,7 @@ public void close() throws IOException {
189189
* Reads the data file and returns an iterator of {@link VectorSchemaRoot}. Only Parquet data file
190190
* format is supported.
191191
*/
192-
private static final class VectorizedCombinedScanIterator
193-
implements CloseableIterator<ColumnarBatch> {
192+
static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
194193

195194
private final Iterator<FileScanTask> fileItr;
196195
private final Map<String, InputFile> inputFiles;
@@ -324,19 +323,8 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
324323
InputFile location = getInputFile(task);
325324
Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
326325
if (task.file().format() == FileFormat.PARQUET) {
327-
Parquet.ReadBuilder builder =
328-
Parquet.read(location)
329-
.project(expectedSchema)
330-
.split(task.start(), task.length())
331-
.createBatchedReaderFunc(
332-
fileSchema ->
333-
buildReader(
334-
expectedSchema,
335-
fileSchema, /* setArrowValidityVector */
336-
NullCheckingForGet.NULL_CHECKING_ENABLED))
337-
.recordsPerBatch(batchSize)
338-
.filter(task.residual())
339-
.caseSensitive(caseSensitive);
326+
ReadBuilder<ColumnarBatch, ?> builder =
327+
FormatModelRegistry.readBuilder(FileFormat.PARQUET, ColumnarBatch.class, location);
340328

341329
if (reuseContainers) {
342330
builder.reuseContainers();
@@ -345,7 +333,14 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
345333
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
346334
}
347335

348-
iter = builder.build();
336+
iter =
337+
builder
338+
.project(expectedSchema)
339+
.split(task.start(), task.length())
340+
.recordsPerBatch(batchSize)
341+
.caseSensitive(caseSensitive)
342+
.filter(task.residual())
343+
.build();
349344
} else {
350345
throw new UnsupportedOperationException(
351346
"Format: " + task.file().format() + " not supported for batched reads");
@@ -376,7 +371,7 @@ private InputFile getInputFile(FileScanTask task) {
376371
* @param fileSchema Schema of the data file.
377372
* @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
378373
*/
379-
private static ArrowBatchReader buildReader(
374+
static ArrowBatchReader buildReader(
380375
Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) {
381376
return (ArrowBatchReader)
382377
TypeWithSchemaVisitor.visit(

core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ private FormatModelRegistry() {}
5555
private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
5656
// The list of classes which are used for registering the reader and writer builders
5757
private static final List<String> CLASSES_TO_REGISTER =
58-
ImmutableList.of("org.apache.iceberg.data.GenericFormatModels");
58+
ImmutableList.of(
59+
"org.apache.iceberg.data.GenericFormatModels",
60+
"org.apache.iceberg.arrow.vectorized.ArrowFormatModels");
5961

6062
// Format models indexed by file format and object model class
6163
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =

0 commit comments

Comments (0)