Skip to content

Commit 7a97a8f

Browse files
committed
review comments
1 parent 6a8649b commit 7a97a8f

5 files changed

Lines changed: 183 additions & 93 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@
1919
package org.apache.iceberg.mr.hive.udf;
2020

2121
import java.nio.ByteBuffer;
22+
import java.util.List;
23+
import java.util.Optional;
2224
import java.util.function.Function;
2325
import org.apache.hadoop.hive.ql.exec.Description;
2426
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
2527
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
2628
import org.apache.hadoop.hive.ql.metadata.HiveException;
29+
import org.apache.hadoop.hive.ql.plan.ColStatistics;
30+
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
31+
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimatorProvider;
2732
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
2833
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
2934
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -52,7 +57,7 @@
5257
value = "_FUNC_(value, bucketCount) - " +
5358
"Returns the bucket value calculated by Iceberg bucket transform function ",
5459
extended = "Example:\n > SELECT _FUNC_('A bucket full of ice!', 5);\n 4")
55-
public class GenericUDFIcebergBucket extends GenericUDF {
60+
public class GenericUDFIcebergBucket extends GenericUDF implements StatEstimatorProvider {
5661
private final IntWritable result = new IntWritable();
5762
private int numBuckets = -1;
5863
private transient PrimitiveObjectInspector argumentOI;
@@ -209,4 +214,32 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException {
209214
public String getDisplayString(String[] children) {
210215
return getStandardDisplayString("iceberg_bucket", children);
211216
}
217+
218+
@Override
219+
public StatEstimator getStatEstimator() {
220+
return new BucketStatEstimator();
221+
}
222+
223+
private static class BucketStatEstimator implements StatEstimator {
224+
@Override
225+
public Optional<ColStatistics> estimate(List<ColStatistics> argStats) {
226+
if (argStats.size() != 2) {
227+
return Optional.empty();
228+
}
229+
ColStatistics inputStats = argStats.get(0);
230+
ColStatistics bucketCountStats = argStats.get(1);
231+
ColStatistics.Range bucketRange = bucketCountStats.getRange();
232+
if (bucketRange == null || bucketRange.minValue == null) {
233+
return Optional.empty();
234+
}
235+
long numBuckets = bucketRange.minValue.longValue();
236+
if (numBuckets <= 0) {
237+
return Optional.empty();
238+
}
239+
ColStatistics result = inputStats.clone();
240+
result.setCountDistint(Math.min(inputStats.getCountDistint(), numBuckets));
241+
result.setRange(0, numBuckets - 1);
242+
return Optional.of(result);
243+
}
244+
}
212245
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.iceberg.mr.hive.udf;
20+
21+
import java.util.Arrays;
22+
import java.util.Optional;
23+
import org.apache.hadoop.hive.ql.plan.ColStatistics;
24+
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
28+
/**
29+
* Tests for the BucketStatEstimator in GenericUDFIcebergBucket.
30+
* Verifies that the StatEstimator correctly narrows NDV based on bucket count.
31+
*/
32+
public class TestGenericUDFIcebergBucketStatEstimator {
33+
34+
@Test
35+
public void testNdvNarrowedByBucketCount() {
36+
// source NDV (100) > numBuckets (8) -> output NDV should be 8
37+
Optional<ColStatistics> result = estimateBucket(100, 8);
38+
Assert.assertTrue(result.isPresent());
39+
Assert.assertEquals(8, result.get().getCountDistint());
40+
}
41+
42+
@Test
43+
public void testNdvBelowBucketCount() {
44+
// source NDV (3) < numBuckets (8) -> output NDV should be 3
45+
Optional<ColStatistics> result = estimateBucket(3, 8);
46+
Assert.assertTrue(result.isPresent());
47+
Assert.assertEquals(3, result.get().getCountDistint());
48+
}
49+
50+
@Test
51+
public void testNdvEqualsBucketCount() {
52+
// source NDV (8) == numBuckets (8) -> output NDV should be 8
53+
Optional<ColStatistics> result = estimateBucket(8, 8);
54+
Assert.assertTrue(result.isPresent());
55+
Assert.assertEquals(8, result.get().getCountDistint());
56+
}
57+
58+
@Test
59+
public void testZeroBucketsReturnsEmpty() {
60+
Optional<ColStatistics> result = estimateBucket(100, 0);
61+
Assert.assertFalse(result.isPresent());
62+
}
63+
64+
private Optional<ColStatistics> estimateBucket(long sourceNdv, long numBuckets) {
65+
ColStatistics sourceStats = new ColStatistics("col", "int");
66+
sourceStats.setCountDistint(sourceNdv);
67+
ColStatistics numBucketsStats = new ColStatistics("numBuckets", "int");
68+
numBucketsStats.setRange(numBuckets, numBuckets);
69+
70+
StatEstimator estimator = new GenericUDFIcebergBucket().getStatEstimator();
71+
return estimator.estimate(Arrays.asList(sourceStats, numBucketsStats));
72+
}
73+
}

iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ insert overwrite table tbl_target_identity select a, b from tbl_src;
3131
select * from tbl_target_identity order by a, ccy;
3232

3333
--bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting
34-
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc;
35-
-- threshold = 0 (default, cost-based): NDV of b (~5) < MAX_WRITERS -> no sort (FanoutWriter)
34+
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc;
35+
-- threshold = 0 (default, cost-based): bucket NDV = min(NDV(b), 3) = 3 < MAX_WRITERS -> no sort (FanoutWriter)
3636
explain insert into table tbl_target_bucket select a, b from tbl_src;
3737
insert into table tbl_target_bucket select a, b from tbl_src;
3838
select * from tbl_target_bucket order by a, ccy;
@@ -165,12 +165,12 @@ set hive.optimize.sort.dynamic.partition.threshold=1;
165165
explain insert into tbl_target_identity select a, b from tbl_src;
166166
explain insert into tbl_target_bucket select a, b from tbl_src;
167167

168-
-- threshold = 2: NDV of b (~5) > 2 -> sort (ClusteredWriter)
168+
-- threshold = 2: bucket NDV = min(NDV(b), 3) = 3 > 2 -> sort (ClusteredWriter)
169169
set hive.optimize.sort.dynamic.partition.threshold=2;
170170
explain insert into tbl_target_identity select a, b from tbl_src;
171171
explain insert into tbl_target_bucket select a, b from tbl_src;
172172

173-
-- threshold = 100: NDV of b (~5) <= 100 -> no sort (FanoutWriter)
173+
-- threshold = 100: bucket NDV = min(NDV(b), 3) = 3 <= 100 -> no sort (FanoutWriter)
174174
set hive.optimize.sort.dynamic.partition.threshold=100;
175175
explain insert into tbl_target_identity select a, b from tbl_src;
176176
explain insert into tbl_target_bucket select a, b from tbl_src;

0 commit comments

Comments
 (0)