Commit bd816ff

ramitg254 and deniskuzZ authored
HIVE-29376: Iceberg: Using partition spec in DESC FORMATTED sql is unsupported (#6259)
Co-authored-by: Denys Kuzmenko <deniskuzz@gmail.com>
1 parent 5c1a99a commit bd816ff

54 files changed

Lines changed: 1520 additions & 301 deletions

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java

Lines changed: 1 addition & 28 deletions
@@ -30,7 +30,6 @@
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -111,9 +110,7 @@ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> lea
    */
   private static Expression translateLeaf(PredicateLeaf leaf) {
     TransformSpec transformSpec = TransformSpec.fromStringWithColumnName(leaf.getColumnName());
-    String columnName = transformSpec.getColumnName();
-    UnboundTerm<Object> column =
-        ObjectUtils.defaultIfNull(toTerm(columnName, transformSpec), Expressions.ref(columnName));
+    UnboundTerm<Object> column = SchemaUtils.toTerm(transformSpec);
 
     switch (leaf.getOperator()) {
       case EQUALS:
@@ -144,30 +141,6 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
     }
   }
 
-  public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transformSpec) {
-    if (transformSpec == null) {
-      return null;
-    }
-    switch (transformSpec.getTransformType()) {
-      case YEAR:
-        return Expressions.year(columnName);
-      case MONTH:
-        return Expressions.month(columnName);
-      case DAY:
-        return Expressions.day(columnName);
-      case HOUR:
-        return Expressions.hour(columnName);
-      case TRUNCATE:
-        return Expressions.truncate(columnName, transformSpec.getTransformParam());
-      case BUCKET:
-        return Expressions.bucket(columnName, transformSpec.getTransformParam());
-      case IDENTITY:
-        return null;
-      default:
-        throw new UnsupportedOperationException("Unknown transformSpec: " + transformSpec);
-    }
-  }
-
   // PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
   // Timestamp using Date#getTime. This conversion discards microseconds, so this is a necessary to avoid it.
   private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()
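
Note: both call sites in this file now delegate to SchemaUtils.toTerm, which needs only the TransformSpec (the spec already carries its column name). A minimal sketch of what that helper plausibly looks like, reconstructed from the toTerm method deleted above; the IDENTITY case presumably returns a plain column reference now, since callers no longer wrap the result in ObjectUtils.defaultIfNull. The actual SchemaUtils.toTerm added elsewhere in this commit may differ in detail:

  public static UnboundTerm<Object> toTerm(TransformSpec transformSpec) {
    String columnName = transformSpec.getColumnName();
    switch (transformSpec.getTransformType()) {
      case YEAR:
        return Expressions.year(columnName);
      case MONTH:
        return Expressions.month(columnName);
      case DAY:
        return Expressions.day(columnName);
      case HOUR:
        return Expressions.hour(columnName);
      case TRUNCATE:
        return Expressions.truncate(columnName, transformSpec.getTransformParam());
      case BUCKET:
        return Expressions.bucket(columnName, transformSpec.getTransformParam());
      case IDENTITY:
        // Previously returned null, with callers falling back to Expressions.ref()
        // via ObjectUtils.defaultIfNull; folding the fallback in simplifies call sites.
        return Expressions.ref(columnName);
      default:
        throw new UnsupportedOperationException("Unknown transformSpec: " + transformSpec);
    }
  }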

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java

Lines changed: 30 additions & 46 deletions
@@ -20,8 +20,6 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -35,7 +33,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -137,7 +134,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -574,56 +570,47 @@ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab
 
   @Override
   public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table table, EnvironmentContext context,
-      List<String> partNames)
-      throws MetaException {
+      List<String> partNames) throws MetaException {
+
     this.tableProperties = IcebergTableProperties.getTableProperties(table, conf);
     this.icebergTable = Catalogs.loadTable(conf, tableProperties);
-    Map<String, PartitionField> partitionFieldMap = icebergTable.spec().fields().stream()
-        .collect(Collectors.toMap(PartitionField::name, Function.identity()));
-    Expression finalExp = CollectionUtils.isEmpty(partNames) ? Expressions.alwaysTrue() : Expressions.alwaysFalse();
-    if (partNames != null) {
-      for (String partName : partNames) {
-        Map<String, String> specMap = Warehouse.makeSpecFromName(partName);
-        Expression subExp = Expressions.alwaysTrue();
-        for (Map.Entry<String, String> entry : specMap.entrySet()) {
-          // Since Iceberg encodes the values in UTF-8, we need to decode it.
-          String partColValue = URLDecoder.decode(entry.getValue(), StandardCharsets.UTF_8);
-
-          if (partitionFieldMap.containsKey(entry.getKey())) {
-            PartitionField partitionField = partitionFieldMap.get(entry.getKey());
-            Type resultType = partitionField.transform().getResultType(icebergTable.schema()
-                .findField(partitionField.sourceId()).type());
-            TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
-            Object value = Conversions.fromPartitionString(resultType, partColValue);
-            Iterable iterable = () -> Collections.singletonList(value).iterator();
-            if (TransformSpec.TransformType.IDENTITY.equals(transformType)) {
-              Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
-              subExp = Expressions.and(subExp, boundPredicate);
-            } else {
-              throw new MetaException(
-                  String.format("Partition transforms are not supported via truncate operation: %s", entry.getKey()));
-            }
-          } else {
-            throw new MetaException(String.format("No partition column/transform name by the name: %s",
-                entry.getKey()));
-          }
-        }
-        finalExp = Expressions.or(finalExp, subExp);
-      }
-    }
+
+    Expression predicate = generateExprFromPartitionNames(partNames);
 
     DeleteFiles delete = icebergTable.newDelete();
     String branchName = context.getProperties().get(Catalogs.SNAPSHOT_REF);
     if (branchName != null) {
       delete.toBranch(HiveUtils.getTableSnapshotRef(branchName));
     }
-    delete.deleteFromRowFilter(finalExp);
+
+    delete.deleteFromRowFilter(predicate);
     delete.commit();
     context.putToProperties("truncateSkipDataDeletion", "true");
   }
 
-  @Override public boolean createHMSTableInHook() {
-    return createHMSTableInHook;
+  private Expression generateExprFromPartitionNames(List<String> partNames) throws MetaException {
+    if (CollectionUtils.isEmpty(partNames)) {
+      return Expressions.alwaysTrue();
+    }
+
+    Map<String, PartitionField> partitionFields = icebergTable.spec().fields().stream()
+        .collect(Collectors.toMap(PartitionField::name, Function.identity()));
+    Expression predicate = Expressions.alwaysFalse();
+
+    for (String partName : partNames) {
+      try {
+        Map<String, String> partitionSpec = Warehouse.makeSpecFromName(partName);
+        Expression partitionExpr = IcebergTableUtil.generateExprForIdentityPartition(
+            icebergTable, partitionSpec, partitionFields);
+
+        predicate = Expressions.or(predicate, partitionExpr);
+      } catch (Exception e) {
+        throw new MetaException(
+            "Failed to generate expression for partition: " + partName + ". " + e.getMessage());
+      }
+    }
+
+    return predicate;
   }
 
   private void alterTableProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable,
@@ -1029,10 +1016,7 @@ private static UnboundPredicate<Object> getPartitionPredicate(PartitionData part
     String columName = schema.findField(field.sourceId()).name();
     TransformSpec transformSpec = TransformSpec.fromString(field.transform().toString(), columName);
 
-    UnboundTerm<Object> partitionColumn =
-        ObjectUtils.defaultIfNull(HiveIcebergFilterFactory.toTerm(columName, transformSpec),
-            Expressions.ref(field.name()));
-
+    UnboundTerm<Object> partitionColumn = SchemaUtils.toTerm(transformSpec);
     return Expressions.equal(partitionColumn, partitionData.get(index, Object.class));
   }
 
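
Note: the per-partition predicate construction that used to live inline in preTruncateTable now sits in IcebergTableUtil.generateExprForIdentityPartition. A hedged sketch of that helper, reconstructed from the deleted inline logic; the signature and exception type are guesses, and the overload taking a boolean (used by canUseTruncate and appendFiles below) presumably builds the field map itself:

  // Needs the imports removed from HiveIcebergMetaHook above:
  // java.net.URLDecoder, java.nio.charset.StandardCharsets, org.apache.iceberg.types.Conversions.
  static Expression generateExprForIdentityPartition(Table table, Map<String, String> partitionSpec,
      Map<String, PartitionField> partitionFields) throws MetaException {
    Expression expr = Expressions.alwaysTrue();
    for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
      PartitionField field = partitionFields.get(entry.getKey());
      if (field == null) {
        throw new MetaException(
            String.format("No partition column/transform name by the name: %s", entry.getKey()));
      }
      TransformSpec.TransformType transformType = TransformSpec.fromString(field.transform().toString());
      if (!TransformSpec.TransformType.IDENTITY.equals(transformType)) {
        throw new MetaException(
            String.format("Partition transforms are not supported via truncate operation: %s", entry.getKey()));
      }
      // Since Iceberg encodes the values in UTF-8, we need to decode them.
      String partColValue = URLDecoder.decode(entry.getValue(), StandardCharsets.UTF_8);
      Type resultType = field.transform()
          .getResultType(table.schema().findField(field.sourceId()).type());
      Object value = Conversions.fromPartitionString(resultType, partColValue);
      expr = Expressions.and(expr, Expressions.in(field.name(), Collections.singletonList(value)));
    }
    return expr;
  }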

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 43 additions & 19 deletions
@@ -866,8 +866,7 @@ public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.m
     return table.spec().fields().stream()
         .filter(f -> !f.transform().isVoid())
         .map(f -> {
-          TransformSpec spec = IcebergTableUtil.getTransformSpec(
-              table, f.transform().toString(), f.sourceId());
+          TransformSpec spec = IcebergTableUtil.getTransformSpec(table, f.transform().toString(), f.sourceId());
           spec.setFieldName(f.name());
           return spec;
         })
@@ -882,8 +881,7 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
         e.getValue().fields().stream()
             .filter(f -> !f.transform().isVoid())
             .map(f -> {
-              TransformSpec spec = IcebergTableUtil.getTransformSpec(
-                  table, f.transform().toString(), f.sourceId());
+              TransformSpec spec = IcebergTableUtil.getTransformSpec(table, f.transform().toString(), f.sourceId());
              spec.setFieldName(f.name());
               return Pair.of(e.getKey(), spec);
             }))
@@ -893,9 +891,8 @@
 
   private List<TransformSpec> getSortTransformSpec(Table table) {
     return table.sortOrder().fields().stream().map(s ->
-        IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId())
-    )
-        .collect(Collectors.toList());
+        IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId()))
+        .toList();
 
   }
 
   @Override
@@ -2024,8 +2021,7 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
    * @param hmsTable A Hive table instance.
    * @param partitionSpec Map containing partition specification given by user.
    * @return true if we can perform metadata delete, otherwise false.
-   * @throws SemanticException Exception raised when a partition transform is being used
-   *         or when partition column is not present in the table.
+   * @throws SemanticException Exception raised when partition column is not present in the table.
    */
   @Override
   public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
@@ -2037,13 +2033,16 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       return false;
     }
 
-    Expression finalExp = IcebergTableUtil.generateExpressionFromPartitionSpec(table, partitionSpec, true);
-    FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(finalExp);
+    Expression partitionExpr = IcebergTableUtil.generateExprForIdentityPartition(
+        table, partitionSpec, true);
+
+    FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(partitionExpr);
     Set<DataFile> dataFiles = Sets.newHashSet(builder.collect());
+
     boolean result = true;
     for (DataFile dataFile : dataFiles) {
       PartitionData partitionData = (PartitionData) dataFile.partition();
-      Expression residual = ResidualEvaluator.of(table.spec(), finalExp, false)
+      Expression residual = ResidualEvaluator.of(table.spec(), partitionExpr, false)
           .residualFor(partitionData);
       if (!residual.isEquivalentTo(Expressions.alwaysTrue())) {
         result = false;
@@ -2056,8 +2055,7 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   @Override
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
-    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    List<String> partNames = IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly);
+    List<String> partNames = IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, latestSpecOnly);
     return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
   }
 
@@ -2078,12 +2076,39 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
   @Override
   public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
       Map<String, String> partitionSpec, RewritePolicy policy) throws SemanticException {
+
     validatePartSpec(table, partitionSpec, policy);
+
+    boolean isDescTable = SessionStateUtil.getQueryState(conf)
+        .map(QueryState::getHiveOperation)
+        .filter(op -> op == HiveOperation.DESCTABLE)
+        .isPresent();
+
+    if (!isDescTable) {
+      return createDummyPartitionHandle(table, partitionSpec);
+    }
+
+    Partition partition = IcebergTableUtil.getPartition(conf, table, partitionSpec);
+
+    // Populate basic statistics
+    if (partition != null) {
+      Map<String, String> stats = getBasicStatistics(Partish.buildFor(table, partition));
+      if (stats != null && !stats.isEmpty()) {
+        partition.getTPartition().setParameters(stats);
+      }
+    }
+
+    return partition;
+  }
+
+  private static DummyPartition createDummyPartitionHandle(
+      org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec)
+      throws SemanticException {
     try {
-      String partName = Warehouse.makePartName(partitionSpec, false);
-      return new DummyPartition(table, partName, partitionSpec);
+      String partitionName = Warehouse.makePartName(partitionSpec, false);
+      return new DummyPartition(table, partitionName, partitionSpec);
     } catch (MetaException e) {
-      throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
+      throw new SemanticException("Unable to construct partition name", e);
     }
   }
 
@@ -2096,8 +2121,7 @@ public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
    */
   public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec) throws SemanticException {
-    Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, false);
+    return IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, false);
   }
 
   /**
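
Note: both getPartitions and getPartitionNames above now call a new IcebergTableUtil.getPartitionNames overload that accepts the Configuration and the HMS table directly. Inferred from the two replaced call sites, it likely just folds the Iceberg table lookup into the helper; a sketch, not necessarily the committed code:

  public static List<String> getPartitionNames(Configuration conf,
      org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec,
      boolean latestSpecOnly) throws SemanticException {
    Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
    return getPartitionNames(icebergTable, partitionSpec, latestSpecOnly);
  }

With that plumbing in place, getPartition resolves a real Iceberg partition (with basic statistics) only for DESCRIBE queries, via the HiveOperation.DESCTABLE branch above, while every other caller keeps the lightweight DummyPartition handle.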

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java

Lines changed: 2 additions & 2 deletions
@@ -172,8 +172,8 @@ public static void appendFiles(URI fromURI, String format, Table icebergTbl, boo
     if (isOverwrite) {
       DeleteFiles delete = transaction.newDelete();
       if (partitionSpec != null) {
-        Expression partitionExpr =
-            IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTbl, partitionSpec, true);
+        Expression partitionExpr = IcebergTableUtil.generateExprForIdentityPartition(
+            icebergTbl, partitionSpec, true);
         delete.deleteFromRowFilter(partitionExpr);
       } else {
         delete.deleteFromRowFilter(Expressions.alwaysTrue());
