Skip to content

Commit a67523d

Browse files
committed
HIVE-29458: Iceberg: Sort expressions should not be added for distribution; partition transforms are added for clustering (distribution and sorting)
1 parent 2fa85ab commit a67523d

6 files changed

Lines changed: 476 additions & 62 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -890,9 +890,9 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
890890
Pair::first, Collectors.mapping(Pair::second, Collectors.toList())));
891891
}
892892

893-
private List<TransformSpec> getSortTransformSpec(Table table) {
894-
return table.sortOrder().fields().stream().map(s ->
895-
IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId()))
893+
private List<TransformSpec> getWriteSortTransformSpecs(Table table) {
894+
return table.sortOrder().fields().stream()
895+
.map(s -> IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId()))
896896
.toList();
897897
}
898898

@@ -913,11 +913,16 @@ public DynamicPartitionCtx createDPContext(
913913
hiveConf.getVar(ConfVars.DEFAULT_PARTITION_NAME),
914914
hiveConf.getIntVar(ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE));
915915

916+
// Add Iceberg partition transforms as custom partition expressions.
917+
// These are required for clustering by partition spec/values for clustered writers.
916918
if (table.spec().isPartitioned() &&
917-
hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
918-
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getPartitionTransformSpec(hmsTable));
919+
hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
920+
addCustomPartitionTransformExpressions(table, hmsTable, writeOperation, dpCtx,
921+
getPartitionTransformSpec(hmsTable));
919922
}
920923

924+
// Add write sort order expressions as custom sort expressions.
925+
// These are used ONLY for sorting within reducers, NOT for distribution.
921926
SortOrder sortOrder = table.sortOrder();
922927
if (sortOrder.isSorted()) {
923928
List<Integer> customSortPositions = Lists.newLinkedList();
@@ -943,7 +948,8 @@ public DynamicPartitionCtx createDPContext(
943948
}
944949
}
945950

946-
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getSortTransformSpec(table));
951+
addCustomWriteSortExpressions(table, hmsTable, writeOperation, dpCtx,
952+
getWriteSortTransformSpecs(table));
947953
}
948954

949955
// Even if table has no explicit sort order, honor z-order if configured
@@ -999,21 +1005,43 @@ private void addZOrderCustomExpr(Map<String, String> props, DynamicPartitionCtx
9991005
}
10001006
}
10011007

1002-
private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
1003-
Operation writeOperation, DynamicPartitionCtx dpCtx,
1004-
List<TransformSpec> transformSpecs) {
1008+
private void addCustomPartitionTransformExpressions(Table table,
1009+
org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation,
1010+
DynamicPartitionCtx dpCtx, List<TransformSpec> transformSpecs) {
1011+
Map<String, Integer> fieldOrderMap = buildFieldOrderMap(table);
1012+
int offset = getWriteRowOffset(hmsTable, writeOperation);
1013+
1014+
dpCtx.addCustomPartitionExpressions(transformSpecs.stream()
1015+
.map(spec -> IcebergTransformSortFunctionUtil.getCustomTransformExpr(
1016+
spec, fieldOrderMap.get(spec.getColumnName()) + offset))
1017+
.collect(Collectors.toList()));
1018+
}
1019+
1020+
private void addCustomWriteSortExpressions(Table table,
1021+
org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation,
1022+
DynamicPartitionCtx dpCtx, List<TransformSpec> transformSpecs) {
1023+
Map<String, Integer> fieldOrderMap = buildFieldOrderMap(table);
1024+
int offset = getWriteRowOffset(hmsTable, writeOperation);
1025+
1026+
dpCtx.addCustomSortExpressions(transformSpecs.stream()
1027+
.map(spec -> IcebergTransformSortFunctionUtil.getCustomTransformExpr(
1028+
spec, fieldOrderMap.get(spec.getColumnName()) + offset))
1029+
.collect(Collectors.toList()));
1030+
}
1031+
1032+
private Map<String, Integer> buildFieldOrderMap(Table table) {
10051033
List<Types.NestedField> fields = table.schema().columns();
10061034
Map<String, Integer> fieldOrderMap = Maps.newHashMapWithExpectedSize(fields.size());
10071035
for (int i = 0; i < fields.size(); ++i) {
10081036
fieldOrderMap.put(fields.get(i).name(), i);
10091037
}
1038+
return fieldOrderMap;
1039+
}
10101040

1011-
int offset = (shouldOverwrite(hmsTable, writeOperation) ?
1012-
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size();
1013-
1014-
dpCtx.addCustomSortExpressions(transformSpecs.stream().map(spec ->
1015-
IcebergTransformSortFunctionUtil.getCustomSortExprs(spec, fieldOrderMap.get(spec.getColumnName()) + offset)
1016-
).collect(Collectors.toList()));
1041+
private int getWriteRowOffset(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation) {
1042+
return (shouldOverwrite(hmsTable, writeOperation) ?
1043+
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA :
1044+
acidSelectColumns(hmsTable, writeOperation)).size();
10171045
}
10181046

10191047
@Override

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3838

3939
/**
40-
* A utility class which provides Iceberg transform sort functions.
40+
* Utility for building Iceberg transform expressions used by both partitioning and sorting.
4141
*/
4242
public final class IcebergTransformSortFunctionUtil {
4343

@@ -136,7 +136,7 @@ private IcebergTransformSortFunctionUtil() {
136136
}
137137
};
138138

139-
public static Function<List<ExprNodeDesc>, ExprNodeDesc> getCustomSortExprs(TransformSpec spec, int index) {
139+
public static Function<List<ExprNodeDesc>, ExprNodeDesc> getCustomTransformExpr(TransformSpec spec, int index) {
140140
switch (spec.getTransformType()) {
141141
case BUCKET:
142142
return BUCKET_SORT_EXPR.apply(index, spec.getTransformParam());

iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,27 @@ INSERT INTO default.zorder_props VALUES (3, 'B'),(1, 'A'),(7, 'C'),(2, 'A'),(9,
109109
DESCRIBE FORMATTED default.zorder_props;
110110
SELECT * FROM default.zorder_props;
111111
DROP TABLE default.zorder_props;
112+
113+
CREATE TABLE default.zorder_tsdl_test (
114+
ts timestamp,
115+
dd double,
116+
ll int)
117+
PARTITIONED BY SPEC (bucket(4, ll))
118+
WRITE ORDERED BY zorder (ts, dd)
119+
STORED BY iceberg
120+
STORED As orc;
121+
122+
explain insert into default.zorder_tsdl_test values (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0);
123+
drop table default.zorder_tsdl_test;
124+
125+
126+
CREATE TABLE default.zorder_tsdl_test1 (
127+
ts timestamp,
128+
dd double)
129+
PARTITIONED BY (ll int)
130+
WRITE ORDERED BY zorder (ts, dd)
131+
STORED BY iceberg
132+
STORED As orc;
133+
134+
explain insert into default.zorder_tsdl_test1 values (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0);
135+
drop table default.zorder_tsdl_test1;

0 commit comments

Comments (0)