Skip to content
Draft

test #6558

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.iceberg.catalog.TableIdentifier;

/**
* Hive-native bucketing metadata ({@code CLUSTERED BY}) stored in HMS for an Iceberg table.
*/
public final class HiveIcebergHiveBucketingMetadata {

private final int numBuckets;
private final List<String> bucketCols;
private final int bucketingVersion;

private HiveIcebergHiveBucketingMetadata(int numBuckets, List<String> bucketCols, int bucketingVersion) {
this.numBuckets = numBuckets;
this.bucketCols = bucketCols == null ? Collections.emptyList() : bucketCols;
this.bucketingVersion = bucketingVersion;
}

public static HiveIcebergHiveBucketingMetadata fromHmsTable(org.apache.hadoop.hive.ql.metadata.Table table) {
return new HiveIcebergHiveBucketingMetadata(
table.getNumBuckets(), table.getBucketCols(), table.getBucketingVersion());
}

public static HiveIcebergHiveBucketingMetadata load(Configuration conf, String tableName)
throws SerDeException {
if (conf == null || tableName == null || tableName.isEmpty()) {
throw new SerDeException("Cannot load Hive bucketing metadata without configuration and table name");
}
try {
TableIdentifier tableId = TableIdentifier.parse(tableName);
String dbName = tableId.namespace().length() > 0 ?
tableId.namespace().level(0) :
Warehouse.DEFAULT_DATABASE_NAME;
org.apache.hadoop.hive.ql.metadata.Table table =
Hive.get(conf, HiveIcebergHiveBucketingMetadata.class).getTable(dbName, tableId.name());
return fromHmsTable(table);
} catch (HiveException e) {
throw new SerDeException("Failed to load Hive bucketing metadata for table " + tableName, e);
}
}

public boolean hasHiveBucketing() {
return numBuckets > 0 && !bucketCols.isEmpty();
}

public int numBuckets() {
return numBuckets;
}

public List<String> bucketCols() {
return bucketCols;
}

public int bucketingVersion() {
return bucketingVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private static HiveIcebergWriter writer(JobConf jc) {
setWriterLevelConfiguration(jc, table);
boolean shouldAddRowLineageColumns = jc.getBoolean(SessionStateUtil.ROW_LINEAGE, false);

return WriterBuilder.builderFor(table, jc::get)
return WriterBuilder.builderFor(table, jc, jc::get)
.queryId(jc.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname))
.attemptID(taskAttemptID)
.addRowLineageColumns(shouldAddRowLineageColumns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
Expand All @@ -55,6 +57,7 @@
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.mr.hive.writer.BucketAwareContainer;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -76,8 +79,15 @@
private Collection<String> partitionColumns;
private final Map<ObjectInspector, Deserializer> deserializers = Maps.newHashMapWithExpectedSize(1);
private final Container<Record> row = new Container<>();
private final BucketAwareContainer<Record> bucketAwareRow = new BucketAwareContainer<>();
private final Map<String, String> jobConf = Maps.newHashMap();

private boolean hiveBucketingRouteEnabled = false;
private int hiveBucketingNumBuckets = -1;
private int hiveBucketingVersion = 2;
private List<String> hiveBucketingBucketCols = ImmutableList.of();
private final Map<ObjectInspector, HiveBucketComputer> bucketComputers = Maps.newHashMapWithExpectedSize(1);

@Override
public void initialize(Configuration conf, Properties serDeProperties,
Properties partitionProperties) throws SerDeException {
Expand All @@ -93,64 +103,104 @@
// executor, but serDeProperties are populated by HiveIcebergStorageHandler.configureInputJobProperties() and
// the resulting properties are serialized and distributed to the executors

initTableSchemaAndPartitionColumns(conf, serDeProperties);
this.projectedSchema =
projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf);
initHiveBucketingRoute(conf, serDeProperties);
configureDynamicPartitionSorting(conf, serDeProperties);
this.inspector = createInspector(projectedSchema);
}

private void initTableSchemaAndPartitionColumns(Configuration conf, Properties serDeProperties)
throws SerDeException {
if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
this.tableSchema = SchemaParser.fromJson(serDeProperties.getProperty(InputFormatConfig.TABLE_SCHEMA));
if (serDeProperties.get(InputFormatConfig.PARTITION_SPEC) != null) {
PartitionSpec spec =
PartitionSpecParser.fromJson(tableSchema, serDeProperties.getProperty(InputFormatConfig.PARTITION_SPEC));
this.partitionColumns = spec.fields().stream().map(PartitionField::name).collect(Collectors.toList());
} else {
this.partitionColumns = ImmutableList.of();
}
initTableSchemaFromSerdeProperties(serDeProperties);
return;
}
try {
Table table = IcebergTableUtil.getTable(conf, serDeProperties);
this.tableSchema = table.schema();
this.partitionColumns = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
LOG.info("Using schema from existing table {}", SchemaParser.toJson(tableSchema));
} catch (Exception e) {
initTableSchemaOnLoadFailure(conf, serDeProperties, e);
}
}

private void initTableSchemaFromSerdeProperties(Properties serDeProperties) {
this.tableSchema = SchemaParser.fromJson(serDeProperties.getProperty(InputFormatConfig.TABLE_SCHEMA));
if (serDeProperties.get(InputFormatConfig.PARTITION_SPEC) != null) {
PartitionSpec spec =
PartitionSpecParser.fromJson(tableSchema, serDeProperties.getProperty(InputFormatConfig.PARTITION_SPEC));
this.partitionColumns = spec.fields().stream().map(PartitionField::name).collect(Collectors.toList());
} else {
try {
Table table = IcebergTableUtil.getTable(conf, serDeProperties);
// always prefer the original table schema if there is one
this.tableSchema = table.schema();
this.partitionColumns = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
LOG.info("Using schema from existing table {}", SchemaParser.toJson(tableSchema));
} catch (Exception e) {
// During table creation we might not have the schema information from the Iceberg table, nor from the HMS
// table. In this case we have to generate the schema using the serdeProperties which contains the info
// provided in the CREATE TABLE query.

if (serDeProperties.get("metadata_location") != null) {
// If metadata location is provided, extract the schema details from it.
try (FileIO fileIO = new HadoopFileIO(conf)) {
TableMetadata metadata = TableMetadataParser.read(fileIO, serDeProperties.getProperty("metadata_location"));
this.tableSchema = metadata.schema();
this.partitionColumns =
metadata.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
// Validate no schema is provided via create command
if (!getColumnNames().isEmpty() || !getPartitionColumnNames().isEmpty()) {
throw new SerDeException("Column names can not be provided along with metadata location.");
}
}
} else {
boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
// If we can not load the table try the provided hive schema
this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
// This is only for table creation, it is ok to have an empty partition column list
this.partitionColumns = ImmutableList.of();
}
if (e instanceof NoSuchTableException &&
HiveTableUtil.isCtas(serDeProperties) && !Catalogs.hiveCatalog(conf, serDeProperties)) {
throw new SerDeException(CTAS_EXCEPTION_MSG);
}
this.partitionColumns = ImmutableList.of();
}
}

private void initTableSchemaOnLoadFailure(Configuration conf, Properties serDeProperties, Exception loadException)
throws SerDeException {
if (serDeProperties.get("metadata_location") != null) {
initTableSchemaFromMetadataLocation(conf, serDeProperties);
} else {
boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
this.tableSchema = hiveSchemaOrThrow(loadException, autoConversion);
this.partitionColumns = ImmutableList.of();
}
if (loadException instanceof NoSuchTableException &&
HiveTableUtil.isCtas(serDeProperties) && !Catalogs.hiveCatalog(conf, serDeProperties)) {
throw new SerDeException(CTAS_EXCEPTION_MSG);
}
}

private void initTableSchemaFromMetadataLocation(Configuration conf, Properties serDeProperties)
throws SerDeException {
try (FileIO fileIO = new HadoopFileIO(conf)) {
TableMetadata metadata = TableMetadataParser.read(fileIO, serDeProperties.getProperty("metadata_location"));
this.tableSchema = metadata.schema();
this.partitionColumns =
metadata.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
if (!getColumnNames().isEmpty() || !getPartitionColumnNames().isEmpty()) {
throw new SerDeException("Column names can not be provided along with metadata location.");
}
} catch (SerDeException e) {
throw e;
} catch (Exception e) {
throw new SerDeException("Failed to load schema from metadata location", e);
}
}

this.projectedSchema =
projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf);
private void initHiveBucketingRoute(Configuration conf, Properties serDeProperties) throws SerDeException {
final String tableName = serDeProperties.getProperty(Catalogs.NAME);
if (tableName == null) {
return;
}
hiveBucketingRouteEnabled =
HiveCustomStorageHandlerUtils.getIcebergHiveBucketingRouteEnabled(conf::get, tableName);
if (!hiveBucketingRouteEnabled) {
return;
}
HiveIcebergHiveBucketingMetadata bucketingMetadata = HiveIcebergHiveBucketingMetadata.load(conf, tableName);
if (!bucketingMetadata.hasHiveBucketing()) {
throw new SerDeException("Bucket routing enabled but HMS table has no CLUSTERED BY metadata: " + tableName);
}
hiveBucketingNumBuckets = bucketingMetadata.numBuckets();
hiveBucketingVersion = bucketingMetadata.bucketingVersion();
hiveBucketingBucketCols = bucketingMetadata.bucketCols();
}

if (!IcebergTableUtil.isFanoutEnabled(serDeProperties::getProperty)) {
private static void configureDynamicPartitionSorting(Configuration conf, Properties serDeProperties) {
if (!IcebergTableUtil.isFanoutEnabled(Maps.fromProperties(serDeProperties))) {
// ClusteredWriter requires that records are ordered by partition keys.
// Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD, 1);
}
HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict");
}

private static ObjectInspector createInspector(Schema schema) throws SerDeException {
try {
this.inspector = IcebergObjectInspector.create(projectedSchema);
return IcebergObjectInspector.create(schema);
} catch (Exception e) {
throw new SerDeException(e);
}
Expand Down Expand Up @@ -212,6 +262,21 @@
}

row.set(deserializer.deserialize(o));
if (hiveBucketingRouteEnabled) {
HiveBucketComputer computer = bucketComputers.get(objectInspector);
if (computer == null) {
if (!(objectInspector instanceof StructObjectInspector)) {
throw new IllegalStateException("Expected StructObjectInspector for bucketing route but got: " +
objectInspector.getTypeName());

Check warning on line 270 in iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Fix this access on a value that can be null.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7ueDb5AmHqUhh9Rw4J&open=AZ7ueDb5AmHqUhh9Rw4J&pullRequest=6558
}
computer = new HiveBucketComputer((StructObjectInspector) objectInspector, hiveBucketingBucketCols,
tableSchemaColumnNames(), hiveBucketingNumBuckets, hiveBucketingVersion);
bucketComputers.put(objectInspector, computer);
}
int bucketId = computer.bucketId(o);
bucketAwareRow.set(row.get(), bucketId);
return bucketAwareRow;
}
return row;
}

Expand Down Expand Up @@ -280,6 +345,73 @@
return tableSchema;
}

private List<String> tableSchemaColumnNames() {
if (tableSchema == null) {
return ImmutableList.of();
}
return tableSchema.columns().stream()
.map(col -> col.name())

Check warning on line 353 in iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this lambda with method reference 'NestedField::name'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7ueDb5AmHqUhh9Rw4I&open=AZ7ueDb5AmHqUhh9Rw4I&pullRequest=6558
.collect(Collectors.toList());

Check warning on line 354 in iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this usage of 'Stream.collect(Collectors.toList())' with 'Stream.toList()' and ensure that the list is unmodified.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7ueDb5AmHqUhh9Rw4H&open=AZ7ueDb5AmHqUhh9Rw4H&pullRequest=6558
}

private static final class HiveBucketComputer {
private final StructObjectInspector inspector;
private final List<StructField> bucketFields;
private final ObjectInspector[] bucketFieldInspectors;
private final int numBuckets;
private final int bucketingVersion;

private HiveBucketComputer(
StructObjectInspector inspector, List<String> bucketColNames, List<String> tableColumnNames,
int numBuckets, int bucketingVersion) {
this.inspector = inspector;
this.numBuckets = numBuckets;
this.bucketingVersion = bucketingVersion;
this.bucketFields = Lists.newArrayListWithCapacity(bucketColNames.size());
for (String bucketColName : bucketColNames) {
bucketFields.add(resolveBucketField(inspector, bucketColName, tableColumnNames));
}
this.bucketFieldInspectors = new ObjectInspector[bucketFields.size()];
for (int i = 0; i < bucketFields.size(); i++) {
bucketFieldInspectors[i] = bucketFields.get(i).getFieldObjectInspector();
}
}

private static StructField resolveBucketField(
StructObjectInspector inspector, String bucketColName, List<String> tableColumnNames) {
List<? extends StructField> rowFields = inspector.getAllStructFieldRefs();
for (StructField rowField : rowFields) {
if (bucketColName.equalsIgnoreCase(rowField.getFieldName())) {
return rowField;
}
}
// SDPO reduce path rows use internal names (_col0, ...); map logical bucket cols via table order.
if (tableColumnNames != null && !tableColumnNames.isEmpty()) {
for (int i = 0; i < tableColumnNames.size(); i++) {
if (bucketColName.equalsIgnoreCase(tableColumnNames.get(i)) && i < rowFields.size()) {
return rowFields.get(i);
}
}
}
throw new IllegalStateException(
"Bucket column not found in row inspector: " + bucketColName + ", fields: " + rowFields);
}

int bucketId(Object row) {
if (numBuckets <= 0) {
throw new IllegalStateException("Invalid numBuckets for bucketing route: " + numBuckets);
}
Object[] values = new Object[bucketFields.size()];
for (int i = 0; i < bucketFields.size(); i++) {
values[i] = inspector.getStructFieldData(row, bucketFields.get(i));
}
if (bucketingVersion == 1) {
return ObjectInspectorUtils.getBucketNumberOld(values, bucketFieldInspectors, numBuckets);
}
return ObjectInspectorUtils.getBucketNumber(values, bucketFieldInspectors, numBuckets);
}
}

private static Schema getSchemaWithRowLineage(Schema schema, Configuration conf) {
boolean rowLineage = Boolean.parseBoolean(conf.get(SessionStateUtil.ROW_LINEAGE));
if (rowLineage) {
Expand Down
Loading
Loading