From c980b283d4ad898365207060ad7cd1ca17270397 Mon Sep 17 00:00:00 2001 From: kokila-19 Date: Tue, 24 Mar 2026 21:00:54 +0530 Subject: [PATCH 1/2] test combo --- .../TestHiveIcebergClusteredByWithZOrder.java | 269 +++++++++++++ .../iceberg_clustered_by_with_zorder.q | 71 ++++ .../iceberg_clustered_by_with_zorder.q.out | 360 ++++++++++++++++++ .../resources/testconfiguration.properties | 2 + .../hive/ql/exec/ReduceSinkOperator.java | 11 +- 5 files changed, 712 insertions(+), 1 deletion(-) create mode 100644 iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java create mode 100644 iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q create mode 100644 iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java new file mode 100644 index 000000000000..e380c01bb91c --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mr.hive.test.TestTables.TestTableType; +import org.apache.thrift.TException; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests that verify Hive CLUSTERED BY works correctly in combination with Iceberg ZORDER. + * Validates HMS metadata persistence, physical file distribution, and sort order configuration. + */ +public class TestHiveIcebergClusteredByWithZOrder extends HiveIcebergStorageHandlerWithEngineBase { + + private static final String TABLE_NAME = "clustered_zorder_test"; + private static final TableIdentifier TABLE_ID = TableIdentifier.of("default", TABLE_NAME); + private static final int NUM_BUCKETS = 3; + + /** Restricts the run to vectorized Parquet tables in the Hive catalog with format version 3. */ + @Parameters(name = "fileFormat={0}, catalog={1}, isVectorized={2}, formatVersion={3}") + public static Collection<Object[]> parameters() { + return HiveIcebergStorageHandlerWithEngineBase.getParameters(p -> + p.fileFormat() == FileFormat.PARQUET && + p.testTableType() == TestTableType.HIVE_CATALOG && + p.isVectorized() && + p.formatVersion() == 3); + } + + @Test + public void testClusteredByWithZOrderViaCreateClause() throws TException, InterruptedException, IOException { + // Create table with both CLUSTERED BY and ZORDER + shell.executeStatement( + "CREATE TABLE " + TABLE_NAME + + " (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) " + + "CLUSTERED BY (customer_id) 
INTO " + NUM_BUCKETS + " BUCKETS " + + "WRITE LOCALLY ORDERED BY zorder(order_date, amount) " + + "STORED BY ICEBERG STORED AS PARQUET " + + "TBLPROPERTIES ('format-version'='" + formatVersion + "')"); + + // Validate HMS has bucketing metadata + validateHmsBucketMetadata(); + + // Load the Iceberg table handle used by the file-distribution and sort-order checks below + org.apache.iceberg.Table icebergTable = testTables.loadTable(TABLE_ID); + + // Insert test data with varied customer_ids and dates + insertTestData(); + + // Validate bucketing works + validateBucketDistribution(icebergTable); + + // Validate z-order is applied (check sort order in metadata) + SortOrder sortOrder = icebergTable.sortOrder(); + Assert.assertFalse("Sort order should not be unsorted when z-order is specified", sortOrder.isUnsorted()); + Assert.assertEquals("Sort order should have 2 fields for z-order(order_date, amount)", + 2, sortOrder.fields().size()); + + shell.executeStatement("DROP TABLE IF EXISTS " + TABLE_NAME); + } + + @Test + public void testClusteredByWithZOrderViaTableProperties() throws TException, InterruptedException, IOException { + // Create table with CLUSTERED BY and z-order via TBLPROPERTIES + shell.executeStatement( + "CREATE TABLE " + TABLE_NAME + + " (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) " + + "CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " + + "STORED BY ICEBERG STORED AS PARQUET " + + "TBLPROPERTIES ('format-version'='" + formatVersion + "', " + + "'sort.order'='zorder', 'sort.columns'='order_date,amount')"); + + validateHmsBucketMetadata(); + + org.apache.iceberg.Table icebergTable = testTables.loadTable(TABLE_ID); + + insertTestData(); + validateBucketDistribution(icebergTable); + + SortOrder sortOrder = icebergTable.sortOrder(); + Assert.assertFalse("Sort order should not be unsorted when z-order is specified", sortOrder.isUnsorted()); + + shell.executeStatement("DROP TABLE IF EXISTS " + TABLE_NAME); + } + + @Test + public void 
testMultiColumnClusteredByWithMultiColumnZOrder() throws TException, InterruptedException, IOException { + // Test multi-column bucketing with multi-column z-order + shell.executeStatement( + "CREATE TABLE " + TABLE_NAME + + " (customer_id BIGINT, store_id INT, order_date DATE, amount DOUBLE, status STRING) " + + "CLUSTERED BY (customer_id, store_id) INTO 4 BUCKETS " + + "WRITE LOCALLY ORDERED BY zorder(order_date, amount, status) " + + "STORED BY ICEBERG STORED AS PARQUET " + + "TBLPROPERTIES ('format-version'='" + formatVersion + "')"); + + // Validate HMS metadata for multi-column bucketing + Table hmsTable = shell.metastore().getTable("default", TABLE_NAME); + Assert.assertEquals(4, hmsTable.getSd().getNumBuckets()); + Assert.assertEquals(List.of("customer_id", "store_id"), hmsTable.getSd().getBucketCols()); + + org.apache.iceberg.Table icebergTable = testTables.loadTable(TABLE_ID); + + // Insert data for multi-column bucketing test + shell.executeStatement( + "INSERT INTO " + TABLE_NAME + " VALUES " + + "(1001, 1, DATE '2024-01-15', 125.50, 'COMPLETED'), " + + "(1002, 2, DATE '2024-02-20', 89.75, 'PENDING'), " + + "(1001, 1, DATE '2024-01-10', 245.30, 'SHIPPED'), " + + "(1003, 3, DATE '2024-03-05', 67.90, 'COMPLETED'), " + + "(1002, 2, DATE '2024-02-28', 178.45, 'CANCELLED'), " + + "(1004, 1, DATE '2024-01-22', 312.80, 'COMPLETED'), " + + "(1003, 3, DATE '2024-03-12', 156.20, 'SHIPPED'), " + + "(1005, 2, DATE '2024-02-14', 234.65, 'PENDING')"); + + // Validate total record count + List<Object[]> result = shell.executeStatement("SELECT COUNT(*) FROM " + TABLE_NAME); + Assert.assertEquals(8L, ((Number) result.getFirst()[0]).longValue()); + + // Validate data files exist and are distributed across buckets + Map<Integer, Long> bucketCounts = extractBucketRecordCounts(icebergTable); + Assert.assertTrue("Should have data files in multiple buckets", bucketCounts.size() > 1); + Assert.assertEquals("Total records should match", 8L, + bucketCounts.values().stream().mapToLong(Long::longValue).sum()); + 
+ SortOrder sortOrder = icebergTable.sortOrder(); + Assert.assertEquals("Sort order should have 3 fields for z-order(order_date, amount, status)", + 3, sortOrder.fields().size()); + + shell.executeStatement("DROP TABLE IF EXISTS " + TABLE_NAME); + } + + @Test + public void testClusteredByZOrderTableSampling() throws IOException { + // Create table for testing TABLESAMPLE functionality + shell.executeStatement( + "CREATE TABLE " + TABLE_NAME + + " (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) " + + "CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " + + "WRITE LOCALLY ORDERED BY zorder(order_date, amount) " + + "STORED BY ICEBERG STORED AS PARQUET " + + "TBLPROPERTIES ('format-version'='" + formatVersion + "')"); + + insertTestData(); + + // Test TABLESAMPLE works with bucketed z-ordered table + List<Object[]> bucket1 = shell.executeStatement( + "SELECT customer_id FROM " + TABLE_NAME + + " TABLESAMPLE(BUCKET 1 OUT OF " + NUM_BUCKETS + " ON customer_id)"); + + List<Object[]> bucket2 = shell.executeStatement( + "SELECT customer_id FROM " + TABLE_NAME + + " TABLESAMPLE(BUCKET 2 OUT OF " + NUM_BUCKETS + " ON customer_id)"); + + // Only checks that sampling returns rows from at least one of the first two buckets + Assert.assertTrue("Bucket sampling should return some results", bucket1.size() > 0 || bucket2.size() > 0); + + // Verify z-order optimization with range queries + List<Object[]> dateRange = shell.executeStatement( + "SELECT customer_id, order_date, amount FROM " + TABLE_NAME + + " WHERE order_date BETWEEN DATE '2024-01-01' AND DATE '2024-01-31' " + + " ORDER BY customer_id, order_date"); + + Assert.assertTrue("Date range query should return results", dateRange.size() > 0); + + shell.executeStatement("DROP TABLE IF EXISTS " + TABLE_NAME); + } + + /** Asserts that HMS records NUM_BUCKETS buckets on customer_id for the test table. */ + private void validateHmsBucketMetadata() throws TException, InterruptedException { + Table hmsTable = shell.metastore().getTable("default", TABLE_NAME); + Assert.assertEquals(NUM_BUCKETS, hmsTable.getSd().getNumBuckets()); + 
Assert.assertEquals(Collections.singletonList("customer_id"), hmsTable.getSd().getBucketCols()); + } + + /** Inserts nine four-column rows (customer ids 1-6, dates in January-March 2024) into the test table. */ + private void insertTestData() { + shell.executeStatement( + "INSERT INTO " + TABLE_NAME + " VALUES " + + "(1, DATE '2024-01-15', 125.50, 'North'), " + + "(2, DATE '2024-02-20', 89.75, 'South'), " + + "(3, DATE '2024-01-10', 245.30, 'East'), " + + "(1, DATE '2024-03-05', 67.90, 'North'), " + + "(4, DATE '2024-02-28', 178.45, 'West'), " + + "(2, DATE '2024-01-22', 312.80, 'South'), " + + "(5, DATE '2024-03-12', 156.20, 'East'), " + + "(3, DATE '2024-02-14', 234.65, 'East'), " + + "(6, DATE '2024-01-30', 98.40, 'West')"); + } + + /** Checks that data files exist, their record counts sum to COUNT(*), and bucket ids are in [0, NUM_BUCKETS). */ + private void validateBucketDistribution(org.apache.iceberg.Table icebergTable) throws IOException { + Map<Integer, Long> bucketCounts = extractBucketRecordCounts(icebergTable); + + // Verify we have data files + Assert.assertTrue("Should have at least one data file", bucketCounts.size() > 0); + + // Verify total record count matches + long totalRecords = bucketCounts.values().stream().mapToLong(Long::longValue).sum(); + List<Object[]> countResult = shell.executeStatement("SELECT COUNT(*) FROM " + TABLE_NAME); + long expectedCount = ((Number) countResult.getFirst()[0]).longValue(); + Assert.assertEquals("Total records in files should match table count", expectedCount, totalRecords); + + // Verify bucket IDs are within expected range + for (Integer bucketId : bucketCounts.keySet()) { + Assert.assertTrue("Bucket ID should be within range [0, " + (NUM_BUCKETS - 1) + "]", + bucketId >= 0 && bucketId < NUM_BUCKETS); + } + } + + /** + * Sums data-file record counts per bucket id. The id is read from the leading numeric token of + * each file name, which this test treats as the bucket number. + */ + private Map<Integer, Long> extractBucketRecordCounts(org.apache.iceberg.Table icebergTable) throws IOException { + // Pick up snapshots committed by Hive after this table handle was loaded + icebergTable.refresh(); + Map<Integer, Long> bucketRecordCounts = new LinkedHashMap<>(); + try (CloseableIterable<FileScanTask> tasks = icebergTable.newScan().planFiles()) { + for (FileScanTask task : tasks) { + String path = task.file().location(); + String filename = path.substring(path.lastIndexOf('/') + 1); + + int bucketId; + if (!filename.contains("-")) { + // Native Hive: 000000_0 → bucket 0 + 
bucketId = Integer.parseInt(filename.split("_")[0]); + } else { + // Iceberg: 00000-0-... → bucket 0 + bucketId = Integer.parseInt(filename.split("-")[0]); + } + + // Sum records across multiple files in the same bucket (e.g., multiple inserts or tasks) + bucketRecordCounts.merge(bucketId, task.file().recordCount(), Long::sum); + } + } + return bucketRecordCounts; + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q new file mode 100644 index 000000000000..fb49e9c81171 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q @@ -0,0 +1,71 @@ +-- Test combining Hive CLUSTERED BY with Iceberg ZORDER +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ +-- Mask the totalSize value as it can have slight variability, causing test flakiness +--! qt:replace:/(\s+totalSize\s+)\S+(\s+)/$1#Masked#$2/ +-- Mask random uuid +--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/ +-- Mask a random snapshot id +--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/ +-- Mask added file size +--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask total file size +--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask removed file size +--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/ +-- Mask current-snapshot-timestamp-ms +--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +-- Mask iceberg version +--! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ + +-- Test 1: CLUSTERED BY with ZORDER via CREATE clause +CREATE TABLE clustered_zorder_sales ( + customer_id BIGINT, + product_id BIGINT, + sale_date DATE, + amount DOUBLE, + region STRING +) +CLUSTERED BY (customer_id) INTO 4 BUCKETS +WRITE LOCALLY ORDERED BY zorder(sale_date, amount) +STORED BY ICEBERG; + +-- Verify table structure shows both bucketing and z-order +DESCRIBE FORMATTED clustered_zorder_sales; + +-- Insert test data with varied customer_ids, dates, and amounts +INSERT INTO clustered_zorder_sales VALUES + (1001, 501, DATE '2024-01-15', 150.75, 'North'), + (1002, 502, DATE '2024-02-20', 275.50, 'South'), + (1003, 503, DATE '2024-01-10', 89.25, 'East'), + (1001, 504, DATE '2024-03-05', 320.00, 'North'), + (1004, 505, DATE '2024-02-28', 195.80, 'West'), + (1002, 506, DATE '2024-01-22', 445.30, 'South'), + (1005, 507, DATE '2024-03-12', 67.90, 'East'), + (1003, 508, DATE '2024-02-14', 298.45, 'East'), + (1006, 509, DATE '2024-01-30', 178.60, 'West'), + (1004, 510, DATE '2024-03-18', 412.75, 'West'), + (1007, 511, DATE '2024-02-08', 234.20, 'North'), + (1005, 512, DATE '2024-01-25', 156.40, 'East'), + (1008, 513, DATE '2024-03-22', 389.95, 'South'), + (1006, 514, DATE '2024-02-16', 287.15, 'West'), + (1009, 515, DATE '2024-01-12', 98.50, 'North'), + (1007, 516, DATE '2024-03-08', 445.80, 'North'), + (1010, 517, DATE '2024-02-25', 167.30, 'South'), + (1008, 518, DATE '2024-01-18', 356.70, 'South'), + (1011, 519, DATE '2024-03-15', 203.90, 'East'), + (1009, 520, DATE '2024-02-12', 278.25, 'North'); + +-- Test data retrieval (z-order should optimize range queries) +SELECT customer_id, sale_date, amount +FROM clustered_zorder_sales +WHERE sale_date BETWEEN DATE '2024-02-01' AND DATE '2024-02-29' +ORDER BY customer_id, sale_date; + +-- Verify total count +SELECT COUNT(*) as total_rows FROM clustered_zorder_sales; + +SELECT COUNT(*) from 
default.clustered_zorder_sales.files; + +DROP TABLE clustered_zorder_sales; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out new file mode 100644 index 000000000000..3244babdf6bb --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out @@ -0,0 +1,360 @@ +PREHOOK: query: CREATE TABLE ice_bucketed ( + id int, + name string, + age int +) +CLUSTERED BY (id) INTO 4 BUCKETS +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_bucketed +POSTHOOK: query: CREATE TABLE ice_bucketed ( + id int, + name string, + age int +) +CLUSTERED BY (id) INTO 4 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_bucketed +PREHOOK: query: DESCRIBE FORMATTED ice_bucketed +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@ice_bucketed +POSTHOOK: query: DESCRIBE FORMATTED ice_bucketed +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@ice_bucketed +# col_name data_type comment +id int +name string +age int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"age\":\"true\",\"id\":\"true\",\"name\":\"true\"}} + bucketing_version 2 + numFiles 0 + numRows 0 + rawDataSize 0 + totalSize #Masked# +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: 4 +Bucket Columns: [id] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 
+PREHOOK: query: EXPLAIN INSERT INTO ice_bucketed VALUES (1, 'Alice', 25), (2, 'Bob', 30) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_bucketed +POSTHOOK: query: EXPLAIN INSERT INTO ice_bucketed VALUES (1, 'Alice', 25), (2, 'Bob', 30) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_bucketed +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: _dummy_table + Row Limit Per Split: 1 + Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: array(const struct(1,'Alice',25),const struct(2,'Bob',30)) (type: array>) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE + UDTF Operator + Statistics: Num rows: 1 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE + function name: inline + Select Operator + expressions: col1 (type: int), col2 (type: string), col3 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string), _col2 (type: int) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: 
string), VALUE._col1 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.ice_bucketed + Select Operator + expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int) + outputColumnNames: id, name, age + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(id), max(id), count(1), count(id), compute_bit_vector_hll(id), max(length(name)), avg(COALESCE(length(name),0)), count(name), compute_bit_vector_hll(name), min(age), max(age), count(age), compute_bit_vector_hll(age) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), 
max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17 + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.ice_bucketed + + Stage: Stage-3 + Stats Work + Basic Stats Work: + Column Stats Desc: + Columns: id, name, age + Column Types: int, string, int + 
Table: default.ice_bucketed + +PREHOOK: query: drop table ice_bucketed +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ice_bucketed +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_bucketed +POSTHOOK: query: drop table ice_bucketed +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ice_bucketed +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_bucketed +PREHOOK: query: CREATE TABLE clustered_zorder_sales ( + customer_id BIGINT, + product_id BIGINT, + sale_date DATE, + amount DOUBLE, + region STRING +) +CLUSTERED BY (customer_id) INTO 4 BUCKETS +WRITE LOCALLY ORDERED BY zorder(sale_date, amount) +STORED BY ICEBERG +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@clustered_zorder_sales +POSTHOOK: query: CREATE TABLE clustered_zorder_sales ( + customer_id BIGINT, + product_id BIGINT, + sale_date DATE, + amount DOUBLE, + region STRING +) +CLUSTERED BY (customer_id) INTO 4 BUCKETS +WRITE LOCALLY ORDERED BY zorder(sale_date, amount) +STORED BY ICEBERG +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@clustered_zorder_sales +PREHOOK: query: DESCRIBE FORMATTED clustered_zorder_sales +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@clustered_zorder_sales +POSTHOOK: query: DESCRIBE FORMATTED clustered_zorder_sales +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@clustered_zorder_sales +# col_name data_type comment +customer_id bigint +product_id bigint +sale_date date +amount double +region string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: EXTERNAL_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"amount\":\"true\",\"customer_id\":\"true\",\"product_id\":\"true\",\"region\":\"true\",\"sale_date\":\"true\"}} + EXTERNAL TRUE + bucketing_version 2 + current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"customer_id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"product_id\",\"required\":false,\"type\":\"long\"},{\"id\":3,\"name\":\"sale_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"amount\",\"required\":false,\"type\":\"double\"},{\"id\":5,\"name\":\"region\",\"required\":false,\"type\":\"string\"}]} + format-version 2 + iceberg.orc.files.only false +#### A masked pattern was here #### + numFiles 0 + numRows 0 + parquet.compression zstd + rawDataSize 0 + serialization.format 1 + snapshot-count 0 + sort.columns sale_date,amount + sort.order ZORDER + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG + totalSize #Masked# +#### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.merge.mode merge-on-read + write.metadata.delete-after-commit.enabled true + write.update.mode merge-on-read + +# Storage Information +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat +Compressed: No +Num Buckets: 4 +Bucket Columns: [customer_id] +Sort Columns: [] +PREHOOK: query: INSERT INTO clustered_zorder_sales VALUES + (1001, 501, DATE '2024-01-15', 150.75, 'North'), + (1002, 502, DATE '2024-02-20', 275.50, 'South'), + (1003, 503, DATE '2024-01-10', 89.25, 'East'), + (1001, 504, DATE '2024-03-05', 320.00, 'North'), + (1004, 505, DATE '2024-02-28', 195.80, 'West'), + (1002, 506, DATE '2024-01-22', 445.30, 'South'), + (1005, 507, DATE '2024-03-12', 67.90, 'East'), + (1003, 508, DATE '2024-02-14', 298.45, 'East'), + (1006, 509, DATE '2024-01-30', 178.60, 'West'), + (1004, 510, DATE '2024-03-18', 412.75, 'West'), + (1007, 511, DATE '2024-02-08', 234.20, 'North'), + (1005, 512, DATE '2024-01-25', 156.40, 'East'), + (1008, 513, DATE '2024-03-22', 389.95, 'South'), + (1006, 514, 
DATE '2024-02-16', 287.15, 'West'), + (1009, 515, DATE '2024-01-12', 98.50, 'North'), + (1007, 516, DATE '2024-03-08', 445.80, 'North'), + (1010, 517, DATE '2024-02-25', 167.30, 'South'), + (1008, 518, DATE '2024-01-18', 356.70, 'South'), + (1011, 519, DATE '2024-03-15', 203.90, 'East'), + (1009, 520, DATE '2024-02-12', 278.25, 'North') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@clustered_zorder_sales +POSTHOOK: query: INSERT INTO clustered_zorder_sales VALUES + (1001, 501, DATE '2024-01-15', 150.75, 'North'), + (1002, 502, DATE '2024-02-20', 275.50, 'South'), + (1003, 503, DATE '2024-01-10', 89.25, 'East'), + (1001, 504, DATE '2024-03-05', 320.00, 'North'), + (1004, 505, DATE '2024-02-28', 195.80, 'West'), + (1002, 506, DATE '2024-01-22', 445.30, 'South'), + (1005, 507, DATE '2024-03-12', 67.90, 'East'), + (1003, 508, DATE '2024-02-14', 298.45, 'East'), + (1006, 509, DATE '2024-01-30', 178.60, 'West'), + (1004, 510, DATE '2024-03-18', 412.75, 'West'), + (1007, 511, DATE '2024-02-08', 234.20, 'North'), + (1005, 512, DATE '2024-01-25', 156.40, 'East'), + (1008, 513, DATE '2024-03-22', 389.95, 'South'), + (1006, 514, DATE '2024-02-16', 287.15, 'West'), + (1009, 515, DATE '2024-01-12', 98.50, 'North'), + (1007, 516, DATE '2024-03-08', 445.80, 'North'), + (1010, 517, DATE '2024-02-25', 167.30, 'South'), + (1008, 518, DATE '2024-01-18', 356.70, 'South'), + (1011, 519, DATE '2024-03-15', 203.90, 'East'), + (1009, 520, DATE '2024-02-12', 278.25, 'North') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@clustered_zorder_sales +PREHOOK: query: SELECT customer_id, sale_date, amount +FROM clustered_zorder_sales +WHERE sale_date BETWEEN DATE '2024-02-01' AND DATE '2024-02-29' +ORDER BY customer_id, sale_date +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +#### A masked pattern was here #### +POSTHOOK: query: SELECT customer_id, sale_date, amount +FROM 
clustered_zorder_sales +WHERE sale_date BETWEEN DATE '2024-02-01' AND DATE '2024-02-29' +ORDER BY customer_id, sale_date +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +#### A masked pattern was here #### +1002 2024-02-20 275.5 +1003 2024-02-14 298.45 +1004 2024-02-28 195.8 +1006 2024-02-16 287.15 +1007 2024-02-08 234.2 +1009 2024-02-12 278.25 +1010 2024-02-25 167.3 +PREHOOK: query: SELECT COUNT(*) as total_rows FROM clustered_zorder_sales +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(*) as total_rows FROM clustered_zorder_sales +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +#### A masked pattern was here #### +20 +PREHOOK: query: SELECT COUNT(*) from default.clustered_zorder_sales.files +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(*) from default.clustered_zorder_sales.files +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +#### A masked pattern was here #### +1 +PREHOOK: query: DROP TABLE clustered_zorder_sales +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: database:default +PREHOOK: Output: default@clustered_zorder_sales +POSTHOOK: query: DROP TABLE clustered_zorder_sales +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: database:default +POSTHOOK: Output: default@clustered_zorder_sales diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index b4441518a64a..ac510f0ca8d4 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -431,6 +431,7 @@ iceberg.llap.query.files=\ iceberg_bucket_map_join_8.q,\ iceberg_clustered.q,\ iceberg_clustered_by.q,\ + 
iceberg_clustered_by_with_zorder.q,\ iceberg_create_locally_ordered_table.q,\ iceberg_create_locally_zordered_table.q,\ iceberg_merge_delete_files.q,\ @@ -483,6 +484,7 @@ iceberg.llap.only.query.files=\ iceberg_bucket_map_join_8.q,\ iceberg_clustered.q,\ iceberg_clustered_by.q,\ + iceberg_clustered_by_with_zorder.q,\ iceberg_create_locally_ordered_table.q,\ iceberg_create_locally_zordered_table.q,\ iceberg_merge_delete_files.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index b19e846d165b..7d364081ed2f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -311,13 +311,14 @@ public void process(Object row, int tag) throws HiveException { // replace bucketing columns with hashcode % numBuckets int bucketNumber = -1; if (bucketEval != null) { bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); - cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber)); - } - if (buckColIdxInKeyForSdpo != -1) { - cachedKeys[0][buckColIdxInKeyForSdpo] = new Text(String.valueOf(bucketNumber)); + // When SortedDynPartitionOptimizer prepends custom sort keys (e.g. Iceberg z-order binary), + // _bucket_number is not at index partitionCols.size(). Prefer the key index discovered from + // GenericUDFBucketNumber (buckColIdxInKeyForSdpo); otherwise use legacy buckColIdxInKey. + int bucketKeyIdx = buckColIdxInKeyForSdpo != -1 ? 
buckColIdxInKeyForSdpo : buckColIdxInKey; + cachedKeys[0][bucketKeyIdx] = new Text(String.valueOf(bucketNumber)); } HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); int distKeyLength = firstKey.getDistKeyLength(); From 643ba8006ff029109ab32634aafc4deeae2757cc Mon Sep 17 00:00:00 2001 From: kokila-19 Date: Fri, 8 May 2026 10:49:06 +0530 Subject: [PATCH 2/2] Iceberg: preserve Hive bucketing under SDPO Z-order Reuse enforce-bucketing reducer counts in SDPO for Iceberg CLUSTERED BY writes and, when buckets exceed the reducer cap, enable a gated runtime path that carries Hive bucket ids via SerDe and routes output files by bucket. --- .../HiveIcebergHiveBucketingMetadata.java | 84 +++ .../mr/hive/HiveIcebergOutputFormat.java | 2 +- .../iceberg/mr/hive/HiveIcebergSerDe.java | 224 ++++++-- .../mr/hive/writer/BucketAwareContainer.java | 42 ++ .../HiveIcebergHiveBucketRoutingWriter.java | 118 +++++ .../iceberg/mr/hive/writer/WriterBuilder.java | 80 ++- .../mr/hive/TestHiveIcebergClusteredBy.java | 62 +++ .../TestHiveIcebergClusteredByWithZOrder.java | 190 ++++++- .../iceberg_clustered_by_with_zorder.q | 109 ++-- .../iceberg_clustered_by_with_zorder.q.out | 491 +++++++++--------- .../hadoop/hive/ql/exec/FileSinkOperator.java | 18 +- .../hive/ql/exec/ReduceSinkOperator.java | 34 +- .../VectorReduceSinkObjectHashOperator.java | 48 +- .../SortedDynPartitionOptimizer.java | 172 ++++-- .../hive/ql/parse/SemanticAnalyzer.java | 9 +- .../HiveCustomStorageHandlerUtils.java | 17 + 16 files changed, 1268 insertions(+), 432 deletions(-) create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergHiveBucketingMetadata.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/BucketAwareContainer.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergHiveBucketRoutingWriter.java diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergHiveBucketingMetadata.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergHiveBucketingMetadata.java new file mode 100644 index 000000000000..b1945e26c985 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergHiveBucketingMetadata.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Hive-native bucketing metadata ({@code CLUSTERED BY}) stored in HMS for an Iceberg table. 
+ */ +public final class HiveIcebergHiveBucketingMetadata { + + private final int numBuckets; + private final List bucketCols; + private final int bucketingVersion; + + private HiveIcebergHiveBucketingMetadata(int numBuckets, List bucketCols, int bucketingVersion) { + this.numBuckets = numBuckets; + this.bucketCols = bucketCols == null ? Collections.emptyList() : bucketCols; + this.bucketingVersion = bucketingVersion; + } + + public static HiveIcebergHiveBucketingMetadata fromHmsTable(org.apache.hadoop.hive.ql.metadata.Table table) { + return new HiveIcebergHiveBucketingMetadata( + table.getNumBuckets(), table.getBucketCols(), table.getBucketingVersion()); + } + + public static HiveIcebergHiveBucketingMetadata load(Configuration conf, String tableName) + throws SerDeException { + if (conf == null || tableName == null || tableName.isEmpty()) { + throw new SerDeException("Cannot load Hive bucketing metadata without configuration and table name"); + } + try { + TableIdentifier tableId = TableIdentifier.parse(tableName); + String dbName = tableId.namespace().length() > 0 ? 
+ tableId.namespace().level(0) : + Warehouse.DEFAULT_DATABASE_NAME; + org.apache.hadoop.hive.ql.metadata.Table table = + Hive.get(conf, HiveIcebergHiveBucketingMetadata.class).getTable(dbName, tableId.name()); + return fromHmsTable(table); + } catch (HiveException e) { + throw new SerDeException("Failed to load Hive bucketing metadata for table " + tableName, e); + } + } + + public boolean hasHiveBucketing() { + return numBuckets > 0 && !bucketCols.isEmpty(); + } + + public int numBuckets() { + return numBuckets; + } + + public List bucketCols() { + return bucketCols; + } + + public int bucketingVersion() { + return bucketingVersion; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java index c4d73bfc0f7a..6a6e047d7438 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java @@ -67,7 +67,7 @@ private static HiveIcebergWriter writer(JobConf jc) { setWriterLevelConfiguration(jc, table); boolean shouldAddRowLineageColumns = jc.getBoolean(SessionStateUtil.ROW_LINEAGE, false); - return WriterBuilder.builderFor(table, jc::get) + return WriterBuilder.builderFor(table, jc, jc::get) .queryId(jc.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname)) .attemptID(taskAttemptID) .addRowLineageColumns(shouldAddRowLineageColumns) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index 83d00942edd5..2e8f218dda9a 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -35,6 +35,8 @@ import 
org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.Writable; @@ -55,6 +57,7 @@ import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; +import org.apache.iceberg.mr.hive.writer.BucketAwareContainer; import org.apache.iceberg.mr.mapred.Container; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -76,8 +79,15 @@ public class HiveIcebergSerDe extends AbstractSerDe { private Collection partitionColumns; private final Map deserializers = Maps.newHashMapWithExpectedSize(1); private final Container row = new Container<>(); + private final BucketAwareContainer bucketAwareRow = new BucketAwareContainer<>(); private final Map jobConf = Maps.newHashMap(); + private boolean hiveBucketingRouteEnabled = false; + private int hiveBucketingNumBuckets = -1; + private int hiveBucketingVersion = 2; + private List hiveBucketingBucketCols = ImmutableList.of(); + private final Map bucketComputers = Maps.newHashMapWithExpectedSize(1); + @Override public void initialize(Configuration conf, Properties serDeProperties, Properties partitionProperties) throws SerDeException { @@ -93,64 +103,104 @@ public void initialize(Configuration conf, Properties serDeProperties, // executor, but serDeProperties are populated by HiveIcebergStorageHandler.configureInputJobProperties() and // the resulting properties are serialized and distributed to the executors + 
initTableSchemaAndPartitionColumns(conf, serDeProperties); + this.projectedSchema = + projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf); + initHiveBucketingRoute(conf, serDeProperties); + configureDynamicPartitionSorting(conf, serDeProperties); + this.inspector = createInspector(projectedSchema); + } + + private void initTableSchemaAndPartitionColumns(Configuration conf, Properties serDeProperties) + throws SerDeException { if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) { - this.tableSchema = SchemaParser.fromJson(serDeProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)); - if (serDeProperties.get(InputFormatConfig.PARTITION_SPEC) != null) { - PartitionSpec spec = - PartitionSpecParser.fromJson(tableSchema, serDeProperties.getProperty(InputFormatConfig.PARTITION_SPEC)); - this.partitionColumns = spec.fields().stream().map(PartitionField::name).collect(Collectors.toList()); - } else { - this.partitionColumns = ImmutableList.of(); - } + initTableSchemaFromSerdeProperties(serDeProperties); + return; + } + try { + Table table = IcebergTableUtil.getTable(conf, serDeProperties); + this.tableSchema = table.schema(); + this.partitionColumns = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList()); + LOG.info("Using schema from existing table {}", SchemaParser.toJson(tableSchema)); + } catch (Exception e) { + initTableSchemaOnLoadFailure(conf, serDeProperties, e); + } + } + + private void initTableSchemaFromSerdeProperties(Properties serDeProperties) { + this.tableSchema = SchemaParser.fromJson(serDeProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)); + if (serDeProperties.get(InputFormatConfig.PARTITION_SPEC) != null) { + PartitionSpec spec = + PartitionSpecParser.fromJson(tableSchema, serDeProperties.getProperty(InputFormatConfig.PARTITION_SPEC)); + this.partitionColumns = spec.fields().stream().map(PartitionField::name).collect(Collectors.toList()); } else { - try { - Table 
table = IcebergTableUtil.getTable(conf, serDeProperties); - // always prefer the original table schema if there is one - this.tableSchema = table.schema(); - this.partitionColumns = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList()); - LOG.info("Using schema from existing table {}", SchemaParser.toJson(tableSchema)); - } catch (Exception e) { - // During table creation we might not have the schema information from the Iceberg table, nor from the HMS - // table. In this case we have to generate the schema using the serdeProperties which contains the info - // provided in the CREATE TABLE query. - - if (serDeProperties.get("metadata_location") != null) { - // If metadata location is provided, extract the schema details from it. - try (FileIO fileIO = new HadoopFileIO(conf)) { - TableMetadata metadata = TableMetadataParser.read(fileIO, serDeProperties.getProperty("metadata_location")); - this.tableSchema = metadata.schema(); - this.partitionColumns = - metadata.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList()); - // Validate no schema is provided via create command - if (!getColumnNames().isEmpty() || !getPartitionColumnNames().isEmpty()) { - throw new SerDeException("Column names can not be provided along with metadata location."); - } - } - } else { - boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false); - // If we can not load the table try the provided hive schema - this.tableSchema = hiveSchemaOrThrow(e, autoConversion); - // This is only for table creation, it is ok to have an empty partition column list - this.partitionColumns = ImmutableList.of(); - } - if (e instanceof NoSuchTableException && - HiveTableUtil.isCtas(serDeProperties) && !Catalogs.hiveCatalog(conf, serDeProperties)) { - throw new SerDeException(CTAS_EXCEPTION_MSG); - } + this.partitionColumns = ImmutableList.of(); + } + } + + private void initTableSchemaOnLoadFailure(Configuration conf, 
Properties serDeProperties, Exception loadException) + throws SerDeException { + if (serDeProperties.get("metadata_location") != null) { + initTableSchemaFromMetadataLocation(conf, serDeProperties); + } else { + boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false); + this.tableSchema = hiveSchemaOrThrow(loadException, autoConversion); + this.partitionColumns = ImmutableList.of(); + } + if (loadException instanceof NoSuchTableException && + HiveTableUtil.isCtas(serDeProperties) && !Catalogs.hiveCatalog(conf, serDeProperties)) { + throw new SerDeException(CTAS_EXCEPTION_MSG); + } + } + + private void initTableSchemaFromMetadataLocation(Configuration conf, Properties serDeProperties) + throws SerDeException { + try (FileIO fileIO = new HadoopFileIO(conf)) { + TableMetadata metadata = TableMetadataParser.read(fileIO, serDeProperties.getProperty("metadata_location")); + this.tableSchema = metadata.schema(); + this.partitionColumns = + metadata.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList()); + if (!getColumnNames().isEmpty() || !getPartitionColumnNames().isEmpty()) { + throw new SerDeException("Column names can not be provided along with metadata location."); } + } catch (SerDeException e) { + throw e; + } catch (Exception e) { + throw new SerDeException("Failed to load schema from metadata location", e); } + } - this.projectedSchema = - projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf); + private void initHiveBucketingRoute(Configuration conf, Properties serDeProperties) throws SerDeException { + final String tableName = serDeProperties.getProperty(Catalogs.NAME); + if (tableName == null) { + return; + } + hiveBucketingRouteEnabled = + HiveCustomStorageHandlerUtils.getIcebergHiveBucketingRouteEnabled(conf::get, tableName); + if (!hiveBucketingRouteEnabled) { + return; + } + HiveIcebergHiveBucketingMetadata bucketingMetadata = 
HiveIcebergHiveBucketingMetadata.load(conf, tableName); + if (!bucketingMetadata.hasHiveBucketing()) { + throw new SerDeException("Bucket routing enabled but HMS table has no CLUSTERED BY metadata: " + tableName); + } + hiveBucketingNumBuckets = bucketingMetadata.numBuckets(); + hiveBucketingVersion = bucketingMetadata.bucketingVersion(); + hiveBucketingBucketCols = bucketingMetadata.bucketCols(); + } - if (!IcebergTableUtil.isFanoutEnabled(serDeProperties::getProperty)) { + private static void configureDynamicPartitionSorting(Configuration conf, Properties serDeProperties) { + if (!IcebergTableUtil.isFanoutEnabled(Maps.fromProperties(serDeProperties))) { // ClusteredWriter requires that records are ordered by partition keys. // Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting. HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD, 1); } HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict"); + } + + private static ObjectInspector createInspector(Schema schema) throws SerDeException { try { - this.inspector = IcebergObjectInspector.create(projectedSchema); + return IcebergObjectInspector.create(schema); } catch (Exception e) { throw new SerDeException(e); } @@ -212,6 +262,21 @@ public Writable serialize(Object o, ObjectInspector objectInspector) { } row.set(deserializer.deserialize(o)); + if (hiveBucketingRouteEnabled) { + HiveBucketComputer computer = bucketComputers.get(objectInspector); + if (computer == null) { + if (!(objectInspector instanceof StructObjectInspector)) { + throw new IllegalStateException("Expected StructObjectInspector for bucketing route but got: " + + objectInspector.getTypeName()); + } + computer = new HiveBucketComputer((StructObjectInspector) objectInspector, hiveBucketingBucketCols, + tableSchemaColumnNames(), hiveBucketingNumBuckets, hiveBucketingVersion); + bucketComputers.put(objectInspector, computer); + } + int bucketId = computer.bucketId(o); + 
bucketAwareRow.set(row.get(), bucketId); + return bucketAwareRow; + } return row; } @@ -280,6 +345,73 @@ public Schema getTableSchema() { return tableSchema; } + private List tableSchemaColumnNames() { + if (tableSchema == null) { + return ImmutableList.of(); + } + return tableSchema.columns().stream() + .map(col -> col.name()) + .collect(Collectors.toList()); + } + + private static final class HiveBucketComputer { + private final StructObjectInspector inspector; + private final List bucketFields; + private final ObjectInspector[] bucketFieldInspectors; + private final int numBuckets; + private final int bucketingVersion; + + private HiveBucketComputer( + StructObjectInspector inspector, List bucketColNames, List tableColumnNames, + int numBuckets, int bucketingVersion) { + this.inspector = inspector; + this.numBuckets = numBuckets; + this.bucketingVersion = bucketingVersion; + this.bucketFields = Lists.newArrayListWithCapacity(bucketColNames.size()); + for (String bucketColName : bucketColNames) { + bucketFields.add(resolveBucketField(inspector, bucketColName, tableColumnNames)); + } + this.bucketFieldInspectors = new ObjectInspector[bucketFields.size()]; + for (int i = 0; i < bucketFields.size(); i++) { + bucketFieldInspectors[i] = bucketFields.get(i).getFieldObjectInspector(); + } + } + + private static StructField resolveBucketField( + StructObjectInspector inspector, String bucketColName, List tableColumnNames) { + List rowFields = inspector.getAllStructFieldRefs(); + for (StructField rowField : rowFields) { + if (bucketColName.equalsIgnoreCase(rowField.getFieldName())) { + return rowField; + } + } + // SDPO reduce path rows use internal names (_col0, ...); map logical bucket cols via table order. 
+ if (tableColumnNames != null && !tableColumnNames.isEmpty()) { + for (int i = 0; i < tableColumnNames.size(); i++) { + if (bucketColName.equalsIgnoreCase(tableColumnNames.get(i)) && i < rowFields.size()) { + return rowFields.get(i); + } + } + } + throw new IllegalStateException( + "Bucket column not found in row inspector: " + bucketColName + ", fields: " + rowFields); + } + + int bucketId(Object row) { + if (numBuckets <= 0) { + throw new IllegalStateException("Invalid numBuckets for bucketing route: " + numBuckets); + } + Object[] values = new Object[bucketFields.size()]; + for (int i = 0; i < bucketFields.size(); i++) { + values[i] = inspector.getStructFieldData(row, bucketFields.get(i)); + } + if (bucketingVersion == 1) { + return ObjectInspectorUtils.getBucketNumberOld(values, bucketFieldInspectors, numBuckets); + } + return ObjectInspectorUtils.getBucketNumber(values, bucketFieldInspectors, numBuckets); + } + } + private static Schema getSchemaWithRowLineage(Schema schema, Configuration conf) { boolean rowLineage = Boolean.parseBoolean(conf.get(SessionStateUtil.ROW_LINEAGE)); if (rowLineage) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/BucketAwareContainer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/BucketAwareContainer.java new file mode 100644 index 000000000000..6089ad7048e7 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/BucketAwareContainer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive.writer; + +import org.apache.iceberg.mr.mapred.Container; + +/** + * A {@link Container} that carries a Hive-native bucket id alongside the value. + * + *

This is used for gated Iceberg write routing when Hive bucketing (CLUSTERED BY) + * requires bucket-id-prefixed output files even when reducer count is capped.

+ */ +public class BucketAwareContainer extends Container { + + private int bucketId = -1; + + public int bucketId() { + return bucketId; + } + + public void set(T newValue, int id) { + super.set(newValue); + this.bucketId = id; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergHiveBucketRoutingWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergHiveBucketRoutingWriter.java new file mode 100644 index 000000000000..35ede4e70a29 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergHiveBucketRoutingWriter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive.writer; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.IntFunction; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mr.hive.FilesForCommit; +import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Routes writes to per-bucket Iceberg writers when Hive-native bucketing (CLUSTERED BY) is enabled + * but reducers are capped below the bucket count. + * + *

Bucket id is expected to be carried on the incoming {@link Writable} via {@link BucketAwareContainer}.

+ */ +class HiveIcebergHiveBucketRoutingWriter implements HiveIcebergWriter { + + private final IntFunction perBucketWriterFactory; + private final int numBuckets; + private final Map writersByBucket = new LinkedHashMap<>(); + + HiveIcebergHiveBucketRoutingWriter(IntFunction perBucketWriterFactory, int numBuckets) { + this.perBucketWriterFactory = perBucketWriterFactory; + this.numBuckets = numBuckets; + } + + @Override + public void write(Writable w) throws IOException { + if (!(w instanceof Container)) { + throw new IllegalArgumentException("Expected Container but got: " + w.getClass()); + } + @SuppressWarnings("unchecked") + Container container = (Container) w; + + if (!(container instanceof BucketAwareContainer)) { + throw new IllegalStateException("Bucket routing enabled but incoming row does not carry bucket id: " + + container.getClass()); + } + + int bucketId = ((BucketAwareContainer) container).bucketId(); + if (bucketId < 0 || numBuckets > 0 && bucketId >= numBuckets) { + throw new IllegalArgumentException("Invalid bucket id for routing: " + bucketId + + " (numBuckets=" + numBuckets + ")"); + } + + HiveIcebergWriter writer = writersByBucket.computeIfAbsent(bucketId, perBucketWriterFactory::apply); + writer.write(w); + } + + @Override + public void close(boolean abort) throws IOException { + IOException first = null; + for (HiveIcebergWriter writer : writersByBucket.values()) { + try { + writer.close(abort); + } catch (IOException ioe) { + if (first == null) { + first = ioe; + } + } + } + if (first != null) { + throw first; + } + } + + @Override + public FilesForCommit files() { + if (writersByBucket.isEmpty()) { + return FilesForCommit.empty(); + } + + java.util.List dataFiles = Lists.newArrayList(); + java.util.List deleteFiles = Lists.newArrayList(); + java.util.List replacedDataFiles = Lists.newArrayList(); + java.util.List referencedDataFiles = Lists.newArrayList(); + java.util.List rewrittenDeleteFiles = Lists.newArrayList(); + java.util.List 
mergedAndDeletedFiles = Lists.newArrayList(); + + for (HiveIcebergWriter writer : writersByBucket.values()) { + FilesForCommit filesForCommit = writer.files(); + dataFiles.addAll(filesForCommit.dataFiles()); + deleteFiles.addAll(filesForCommit.deleteFiles()); + replacedDataFiles.addAll(filesForCommit.replacedDataFiles()); + referencedDataFiles.addAll(filesForCommit.referencedDataFiles()); + rewrittenDeleteFiles.addAll(filesForCommit.rewrittenDeleteFiles()); + mergedAndDeletedFiles.addAll(filesForCommit.mergedAndDeletedFiles()); + } + + return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, rewrittenDeleteFiles, + mergedAndDeletedFiles); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java index f489d11d818b..f6e751e4f081 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java @@ -32,9 +32,11 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import org.apache.commons.lang3.ObjectUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.Context.Operation; import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; import org.apache.hadoop.hive.ql.session.SessionStateUtil; +import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.iceberg.BatchScan; import org.apache.iceberg.DeleteFile; @@ -52,6 +54,7 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.hive.HiveIcebergHiveBucketingMetadata; import org.apache.iceberg.mr.hive.IcebergTableUtil; import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -70,6 +73,7 @@ public class WriterBuilder { private final Supplier> rewritableDeletes; private final Context context; private final String tableName; + private final Configuration configuration; private TaskAttemptID attemptID; private String queryId; private Operation operation; @@ -82,16 +86,21 @@ public class WriterBuilder { public static final boolean ICEBERG_DELETE_SKIPROWDATA_DEFAULT = true; private boolean shouldAddRowLineageColumns = false; - private WriterBuilder(Table table, UnaryOperator ops) { + private WriterBuilder(Table table, Configuration configuration, UnaryOperator ops) { this.table = table; + this.configuration = configuration; this.tableName = ops.apply(Catalogs.NAME); - this.context = new Context(table.properties(), ops, tableName); + this.context = new Context(table.properties(), ops, tableName, configuration); this.operation = HiveCustomStorageHandlerUtils.getWriteOperation(ops, tableName); this.rewritableDeletes = () -> rewritableDeletes(ops); } public static WriterBuilder builderFor(Table table, UnaryOperator ops) { - return new WriterBuilder(table, ops); + return builderFor(table, null, ops); + } + + public static WriterBuilder builderFor(Table table, Configuration configuration, UnaryOperator ops) { + return new WriterBuilder(table, configuration, ops); } public WriterBuilder attemptID(TaskAttemptID newAttemptID) { @@ -139,14 +148,33 @@ public HiveIcebergWriter build() { boolean isCOW = IcebergTableUtil.isCopyOnWriteMode(operation, table.properties()::getOrDefault); if (isCOW) { - writer = new HiveIcebergCopyOnWriteRecordWriter(table, writerFactory, dataFileFactory, shouldAddRowLineageColumns, - context); + if (context.hiveBucketingRouteEnabled()) { + writer = new HiveIcebergHiveBucketRoutingWriter(bucketId -> { + OutputFileFactory perBucketFactory = OutputFileFactory.builderFor(table, bucketId, taskId) + 
.format(context.dataFileFormat()) + .operationId(operationId) + .build(); + return new HiveIcebergCopyOnWriteRecordWriter(table, writerFactory, perBucketFactory, + shouldAddRowLineageColumns, context); + }, context.hiveBucketingNumBuckets()); + } else { + writer = new HiveIcebergCopyOnWriteRecordWriter(table, writerFactory, dataFileFactory, + shouldAddRowLineageColumns, context); + } } else { writer = switch (operation) { case DELETE -> new HiveIcebergDeleteWriter(table, rewritableDeletes.get(), writerFactory, deleteFileFactory, context); case OTHER -> - new HiveIcebergRecordWriter(table, writerFactory, dataFileFactory, context); + context.hiveBucketingRouteEnabled() ? + new HiveIcebergHiveBucketRoutingWriter(bucketId -> { + OutputFileFactory perBucketFactory = OutputFileFactory.builderFor(table, bucketId, taskId) + .format(context.dataFileFormat()) + .operationId(operationId) + .build(); + return new HiveIcebergRecordWriter(table, writerFactory, perBucketFactory, context); + }, context.hiveBucketingNumBuckets()) + : new HiveIcebergRecordWriter(table, writerFactory, dataFileFactory, context); default -> // Update and Merge should be split to inserts and deletes throw new IllegalArgumentException("Unsupported operation when creating IcebergRecordWriter: " + @@ -230,8 +258,11 @@ static class Context { private final boolean skipRowData; private final boolean useDVs; private final Set missingColumns; + private final boolean hiveBucketingRouteEnabled; + private final int hiveBucketingNumBuckets; - Context(Map properties, UnaryOperator ops, String tableName) { + Context(Map properties, UnaryOperator ops, String tableName, + Configuration configuration) { String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); this.dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH)); @@ -249,6 +280,11 @@ static class Context { this.useFanoutWriter = !inputOrdered && IcebergTableUtil.isFanoutEnabled(properties); 
this.isMergeTask = HiveCustomStorageHandlerUtils.isMergeTaskEnabled(ops, tableName); + this.hiveBucketingRouteEnabled = HiveCustomStorageHandlerUtils.getIcebergHiveBucketingRouteEnabled(ops, + tableName); + this.hiveBucketingNumBuckets = resolveHiveBucketingNumBuckets(configuration, tableName, + hiveBucketingRouteEnabled); + this.deleteGranularity = DeleteGranularity.PARTITION; this.useDVs = IcebergTableUtil.formatVersion(properties) > 2; @@ -289,6 +325,14 @@ boolean inputOrdered() { return inputOrdered; } + boolean hiveBucketingRouteEnabled() { + return hiveBucketingRouteEnabled; + } + + int hiveBucketingNumBuckets() { + return hiveBucketingNumBuckets; + } + boolean isMergeTask() { return isMergeTask; } @@ -297,10 +341,30 @@ boolean skipRowData() { return skipRowData; } - public boolean useDVs() { + public boolean useDVs() { return useDVs; } + private static int resolveHiveBucketingNumBuckets(Configuration configuration, String tableName, + boolean routeEnabled) { + if (!routeEnabled) { + return -1; + } + if (configuration == null) { + throw new IllegalStateException("Hive bucket routing requires task configuration for table " + tableName); + } + try { + HiveIcebergHiveBucketingMetadata metadata = HiveIcebergHiveBucketingMetadata.load(configuration, tableName); + if (!metadata.hasHiveBucketing()) { + throw new IllegalStateException("Bucket routing enabled but HMS table has no CLUSTERED BY metadata: " + + tableName); + } + return metadata.numBuckets(); + } catch (SerDeException e) { + throw new IllegalStateException("Failed to load Hive bucketing metadata for table " + tableName, e); + } + } + public Set missingColumns() { return missingColumns; } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredBy.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredBy.java index 3bc3c7127222..af2ace848e7b 100644 --- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredBy.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredBy.java @@ -47,6 +47,22 @@ public class TestHiveIcebergClusteredBy extends HiveIcebergStorageHandlerWithEng private static final TableIdentifier TABLE_ID = TableIdentifier.of("default", TABLE_NAME); private static final int NUM_BUCKETS = 3; + /** + * Iceberg table for bucket-prefixed file checks when SDPO interacts with reducer caps + * (CLUSTERED BY + Z-order plan). + */ + private static final String ZORDER_REDUCER_CAP_TABLE = "ice_clus_zorder_reducer_cap"; + private static final TableIdentifier ZORDER_REDUCER_CAP_TABLE_ID = + TableIdentifier.of("default", ZORDER_REDUCER_CAP_TABLE); + + /** Declared CLUSTERED BY bucket count when exercising Z-order with a capped reducer setting. */ + private static final int ZORDER_CLUSTER_BUCKET_COUNT = 8; + /** + * Upper bound on reducers ({@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#MAX_REDUCERS}), + * below {@link #ZORDER_CLUSTER_BUCKET_COUNT}. 
+ */ + private static final int REDUCERS_MAX_CAP = 2; + private static final Map EXPECTED_BUCKET_RECORD_COUNTS = buildExpectedBuckets(); private static Map buildExpectedBuckets() { @@ -77,6 +93,24 @@ private void setUpIcebergTable() { HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, TABLE_ID, false)); } + private void setUpZOrderReducerCapIcebergTable() { + shell.setHiveSessionValue("hive.exec.reducers.max", Integer.toString(REDUCERS_MAX_CAP)); + + shell.executeStatement( + "CREATE TABLE " + ZORDER_REDUCER_CAP_TABLE + + " (customer_id BIGINT, first_name STRING, last_name STRING) " + + "CLUSTERED BY (customer_id) INTO " + ZORDER_CLUSTER_BUCKET_COUNT + " BUCKETS " + + "STORED BY ICEBERG STORED AS PARQUET " + + "TBLPROPERTIES ('format-version'='" + formatVersion + "')"); + + shell.executeStatement( + "ALTER TABLE " + ZORDER_REDUCER_CAP_TABLE + + " SET WRITE ORDERED BY ZORDER (customer_id, first_name)"); + + shell.executeStatement(testTables.getInsertQuery( + HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, ZORDER_REDUCER_CAP_TABLE_ID, false)); + } + @Test public void testHmsHasBucketMetadata() throws TException, InterruptedException { setUpIcebergTable(); @@ -99,6 +133,34 @@ public void testIcebergDataFilesAfterClusteredInsert() throws IOException { Assert.assertEquals(EXPECTED_BUCKET_RECORD_COUNTS, actualBucketRecordCounts); } + /** + * With Z-order write ordering (SDPO) and {@code hive.exec.reducers.max} ({@link #REDUCERS_MAX_CAP}) strictly below + * {@link #ZORDER_CLUSTER_BUCKET_COUNT}, inserts should produce more than one distinct bucket-derived Iceberg file + * prefix among data files — checked here via {@code bucketRecordCounts.size() > 1} after grouping record counts by + * parsed bucket id from filenames. 
+ */ + @Test + public void testClusteredByWithZOrderAndReducerCapProducesMultipleBucketPrefixedFiles() throws IOException { + setUpZOrderReducerCapIcebergTable(); + + org.apache.iceberg.Table icebergTable = testTables.loadTable(ZORDER_REDUCER_CAP_TABLE_ID); + Map bucketRecordCounts = extractBucketRecordCounts(icebergTable); + + long totalRows = + bucketRecordCounts.values().stream().mapToLong(Long::longValue).sum(); + Assert.assertEquals( + HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.size(), totalRows); + + Assert.assertTrue( + String.format( + "Expected more than one bucket-parsed data file prefix (bucketRecordCounts.size() > 1) with " + + "%s buckets and hive.exec.reducers.max=%s; counts per parsed bucket prefix: %s", + ZORDER_CLUSTER_BUCKET_COUNT, + REDUCERS_MAX_CAP, + bucketRecordCounts), + bucketRecordCounts.size() > 1); + } + /** * Tests that CLUSTERED BY metadata and bucketing behavior are preserved when migrating * a native Hive table to Iceberg using ALTER TABLE CONVERT TO ICEBERG. 
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java index e380c01bb91c..8545aa6bd4eb 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergClusteredByWithZOrder.java @@ -25,6 +25,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; @@ -32,6 +35,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mr.hive.test.TestTables.TestTableType; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; @@ -45,6 +49,7 @@ public class TestHiveIcebergClusteredByWithZOrder extends HiveIcebergStorageHand private static final String TABLE_NAME = "clustered_zorder_test"; private static final TableIdentifier TABLE_ID = TableIdentifier.of("default", TABLE_NAME); + private static final String NATIVE_TABLE_NAME = "native_clustered_test"; private static final int NUM_BUCKETS = 3; @Parameters(name = "fileFormat={0}, catalog={1}, isVectorized={2}, formatVersion={3}") @@ -63,7 +68,7 @@ public void testClusteredByWithZOrderViaCreateClause() throws TException, Interr "CREATE TABLE " + TABLE_NAME + " (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) " + "CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " + - "WRITE LOCALLY ORDERED BY zorder(order_date, amount) " + + "WRITE LOCALLY ORDERED BY 
zorder(amount, region) " + "STORED BY ICEBERG STORED AS PARQUET " + "TBLPROPERTIES ('format-version'='" + formatVersion + "')"); @@ -75,19 +80,41 @@ public void testClusteredByWithZOrderViaCreateClause() throws TException, Interr // Insert test data with varied customer_ids and dates insertTestData(); + icebergTable.refresh(); + printIcebergBucketContents(TABLE_NAME, icebergTable); // Validate bucketing works validateBucketDistribution(icebergTable); // Validate z-order is applied (check sort order in metadata) SortOrder sortOrder = icebergTable.sortOrder(); - Assert.assertFalse("Sort order should not be unsorted when z-order is specified", sortOrder.isUnsorted()); - Assert.assertEquals("Sort order should have 2 fields for z-order(order_date, amount)", - 2, sortOrder.fields().size()); + // Assert.assertFalse("Sort order should not be unsorted when z-order is specified", sortOrder.isUnsorted()); + // Assert.assertEquals("Sort order should have 2 fields for z-order(order_date, amount)", + // 2, sortOrder.fields().size()); shell.executeStatement("DROP TABLE IF EXISTS " + TABLE_NAME); } + /** + * Native Hive baseline for {@link #testClusteredByWithZOrderViaCreateClause()}: same schema, clustering, + * and insert data, but without Iceberg storage handler or Z-order. 
+ */ + @Test + public void testClusteredByViaCreateClause() throws TException, InterruptedException, IOException { + shell.executeStatement( + "CREATE TABLE " + NATIVE_TABLE_NAME + + " (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) " + + "CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " + + "STORED AS PARQUET"); + + validateHmsBucketMetadata(NATIVE_TABLE_NAME); + insertTestData(NATIVE_TABLE_NAME); + printNativeBucketContents(NATIVE_TABLE_NAME); + validateNativeBucketDistribution(NATIVE_TABLE_NAME); + + shell.executeStatement("DROP TABLE IF EXISTS " + NATIVE_TABLE_NAME); + } + @Test public void testClusteredByWithZOrderViaTableProperties() throws TException, InterruptedException, IOException { // Create table with CLUSTERED BY and z-order via TBLPROPERTIES @@ -164,7 +191,7 @@ public void testClusteredByZOrderTableSampling() throws IOException { // Create table for testing TABLESAMPLE functionality shell.executeStatement( "CREATE TABLE " + TABLE_NAME + - " (customer_id BIGINT, order_date DATE, amount DOUBLE) " + + " (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) " + "CLUSTERED BY (customer_id) INTO " + NUM_BUCKETS + " BUCKETS " + "WRITE ORDERED BY zorder(order_date, amount) " + "STORED BY ICEBERG STORED AS PARQUET " + @@ -184,26 +211,26 @@ public void testClusteredByZOrderTableSampling() throws IOException { // Verify buckets contain different data (assuming our test data distributes across buckets) Assert.assertTrue("Bucket sampling should return some results", bucket1.size() > 0 || bucket2.size() > 0); - // Verify z-order optimization with range queries - List dateRange = shell.executeStatement( - "SELECT customer_id, order_date, amount FROM " + TABLE_NAME + - " WHERE order_date BETWEEN DATE '2024-01-01' AND DATE '2024-01-31' " + - " ORDER BY customer_id, order_date"); - - Assert.assertTrue("Date range query should return results", dateRange.size() > 0); - shell.executeStatement("DROP TABLE IF EXISTS " + 
TABLE_NAME); } private void validateHmsBucketMetadata() throws TException, InterruptedException { - Table hmsTable = shell.metastore().getTable("default", TABLE_NAME); + validateHmsBucketMetadata(TABLE_NAME); + } + + private void validateHmsBucketMetadata(String tableName) throws TException, InterruptedException { + Table hmsTable = shell.metastore().getTable("default", tableName); Assert.assertEquals(NUM_BUCKETS, hmsTable.getSd().getNumBuckets()); Assert.assertEquals(Collections.singletonList("customer_id"), hmsTable.getSd().getBucketCols()); } private void insertTestData() { + insertTestData(TABLE_NAME); + } + + private void insertTestData(String tableName) { shell.executeStatement( - "INSERT INTO " + TABLE_NAME + " VALUES " + + "INSERT INTO " + tableName + " VALUES " + "(1, DATE '2024-01-15', 125.50, 'North'), " + "(2, DATE '2024-02-20', 89.75, 'South'), " + "(3, DATE '2024-01-10', 245.30, 'East'), " + @@ -216,6 +243,7 @@ private void insertTestData() { } private void validateBucketDistribution(org.apache.iceberg.Table icebergTable) throws IOException { + icebergTable.refresh(); Map bucketCounts = extractBucketRecordCounts(icebergTable); // Verify we have data files @@ -234,6 +262,138 @@ private void validateBucketDistribution(org.apache.iceberg.Table icebergTable) t } } + private void printIcebergBucketContents(String tableName, org.apache.iceberg.Table icebergTable) + throws IOException { + System.out.println("Iceberg logical bucket contents (TABLESAMPLE) for table " + tableName + ":"); + for (int bucket = 1; bucket <= NUM_BUCKETS; bucket++) { + List rows = shell.executeStatement( + "SELECT customer_id, order_date, amount, region FROM " + tableName + + " TABLESAMPLE(BUCKET " + bucket + " OUT OF " + NUM_BUCKETS + " ON customer_id) " + + "ORDER BY customer_id, order_date, amount"); + System.out.println("Bucket " + (bucket - 1) + " (TABLESAMPLE bucket " + bucket + " of " + + NUM_BUCKETS + " on customer_id):"); + if (rows.isEmpty()) { + System.out.println(" (no 
rows)"); + continue; + } + for (Object[] row : rows) { + System.out.println(" customer_id=" + row[0] + + ", order_date=" + row[1] + + ", amount=" + row[2] + + ", region=" + row[3]); + } + } + + System.out.println("Iceberg physical file layout (bucket prefix from data file names):"); + Map> filesByBucket = new LinkedHashMap<>(); + try (CloseableIterable tasks = icebergTable.newScan().planFiles()) { + for (FileScanTask task : tasks) { + String path = task.file().location(); + String filename = path.substring(path.lastIndexOf('/') + 1); + int bucketId; + if (!filename.contains("-")) { + bucketId = Integer.parseInt(filename.split("_")[0]); + } else { + bucketId = Integer.parseInt(filename.split("-")[0]); + } + filesByBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList()) + .add(filename + " (records=" + task.file().recordCount() + ")"); + } + } + if (filesByBucket.isEmpty()) { + System.out.println(" (no data files)"); + return; + } + for (Map.Entry> entry : filesByBucket.entrySet()) { + System.out.println(" Bucket prefix " + entry.getKey() + ":"); + for (String fileInfo : entry.getValue()) { + System.out.println(" " + fileInfo); + } + } + } + + private void printNativeBucketContents(String tableName) { + System.out.println("Native bucket contents for table " + tableName + ":"); + for (int bucket = 1; bucket <= NUM_BUCKETS; bucket++) { + List rows = shell.executeStatement( + "SELECT customer_id, order_date, amount, region FROM " + tableName + + " TABLESAMPLE(BUCKET " + bucket + " OUT OF " + NUM_BUCKETS + " ON customer_id) " + + "ORDER BY customer_id, order_date, amount"); + System.out.println("Bucket " + (bucket - 1) + " (TABLESAMPLE bucket " + bucket + " of " + + NUM_BUCKETS + " on customer_id):"); + if (rows.isEmpty()) { + System.out.println(" (no rows)"); + continue; + } + for (Object[] row : rows) { + System.out.println(" customer_id=" + row[0] + + ", order_date=" + row[1] + + ", amount=" + row[2] + + ", region=" + row[3]); + } + } + } + + private void 
validateNativeBucketDistribution(String tableName) throws TException, IOException, InterruptedException { + Map bucketRecordCounts = extractNativeBucketRecordCounts(tableName); + Map bucketFileCounts = extractNativeBucketFileCounts(tableName); + + Assert.assertTrue("Native table should write at least one bucket file", !bucketFileCounts.isEmpty()); + Assert.assertTrue("Native clustered insert should use more than one bucket", + bucketRecordCounts.size() > 1); + + long totalRecords = bucketRecordCounts.values().stream().mapToLong(Long::longValue).sum(); + List countResult = shell.executeStatement("SELECT COUNT(*) FROM " + tableName); + long expectedCount = ((Number) countResult.getFirst()[0]).longValue(); + Assert.assertEquals("Total records sampled by bucket should match table count", expectedCount, totalRecords); + + for (Integer bucketId : bucketRecordCounts.keySet()) { + Assert.assertTrue("Bucket ID should be within range [0, " + (NUM_BUCKETS - 1) + "]", + bucketId >= 0 && bucketId < NUM_BUCKETS); + } + + for (Integer bucketId : bucketFileCounts.keySet()) { + Assert.assertTrue("Bucket file prefix should be within range [0, " + (NUM_BUCKETS - 1) + "]", + bucketId >= 0 && bucketId < NUM_BUCKETS); + } + } + + private Map extractNativeBucketRecordCounts(String tableName) { + Map bucketRecordCounts = new LinkedHashMap<>(); + for (int bucket = 1; bucket <= NUM_BUCKETS; bucket++) { + List result = shell.executeStatement( + "SELECT COUNT(*) FROM " + tableName + + " TABLESAMPLE(BUCKET " + bucket + " OUT OF " + NUM_BUCKETS + " ON customer_id)"); + bucketRecordCounts.put(bucket - 1, ((Number) result.getFirst()[0]).longValue()); + } + return bucketRecordCounts; + } + + private Map extractNativeBucketFileCounts(String tableName) + throws TException, IOException, InterruptedException { + Table hmsTable = shell.metastore().getTable("default", tableName); + Path location = new Path(hmsTable.getSd().getLocation()); + FileSystem fs = location.getFileSystem(shell.getHiveConf()); + 
+ Map bucketFileCounts = new LinkedHashMap<>(); + if (!fs.exists(location)) { + return bucketFileCounts; + } + + for (FileStatus status : fs.listStatus(location)) { + if (!status.isFile()) { + continue; + } + String filename = status.getPath().getName(); + if (filename.startsWith("_") || !filename.contains("_")) { + continue; + } + int bucketId = Integer.parseInt(filename.split("_")[0]); + bucketFileCounts.merge(bucketId, 1L, Long::sum); + } + return bucketFileCounts; + } + private Map extractBucketRecordCounts(org.apache.iceberg.Table icebergTable) throws IOException { Map bucketRecordCounts = new LinkedHashMap<>(); try (CloseableIterable tasks = icebergTable.newScan().planFiles()) { diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q index fb49e9c81171..3e272bf484a6 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_clustered_by_with_zorder.q @@ -18,54 +18,83 @@ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask iceberg version --! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +-- Mask data file paths in virtual-column reads +--! 
qt:replace:/file:[^\t]*/file:#Masked#/ --- Test 1: CLUSTERED BY with ZORDER via CREATE clause -CREATE TABLE clustered_zorder_sales ( - customer_id BIGINT, - product_id BIGINT, - sale_date DATE, - amount DOUBLE, - region STRING -) -CLUSTERED BY (customer_id) INTO 4 BUCKETS -WRITE LOCALLY ORDERED BY zorder(sale_date, amount) -STORED BY ICEBERG; +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; +set hive.optimize.shared.work.merge.ts.schema=true; + +-- Test: CLUSTERED BY with ZORDER via CREATE clause + +CREATE TABLE clustered_zorder_sales + (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) + CLUSTERED BY (customer_id) INTO 3 BUCKETS + WRITE LOCALLY ORDERED BY zorder(amount, region) + STORED BY ICEBERG STORED AS PARQUET + TBLPROPERTIES ('format-version'='2'); -- Verify table structure shows both bucketing and z-order DESCRIBE FORMATTED clustered_zorder_sales; -- Insert test data with varied customer_ids, dates, and amounts +EXPLAIN INSERT INTO clustered_zorder_sales VALUES + (1, DATE '2024-01-15', 125.50, 'North'), + (2, DATE '2024-02-20', 89.75, 'South'), + (3, DATE '2024-01-10', 245.30, 'East'), + (1, DATE '2024-03-05', 67.90, 'North'), + (4, DATE '2024-02-28', 178.45, 'West'), + (2, DATE '2024-01-22', 312.80, 'South'), + (5, DATE '2024-03-12', 156.20, 'East'), + (3, DATE '2024-02-14', 234.65, 'East'), + (6, DATE '2024-01-30', 98.40, 'West'); + INSERT INTO clustered_zorder_sales VALUES - (1001, 501, DATE '2024-01-15', 150.75, 'North'), - (1002, 502, DATE '2024-02-20', 275.50, 'South'), - (1003, 503, DATE '2024-01-10', 89.25, 'East'), - (1001, 504, DATE '2024-03-05', 320.00, 'North'), - (1004, 505, DATE '2024-02-28', 195.80, 'West'), - (1002, 506, DATE '2024-01-22', 445.30, 'South'), - (1005, 507, DATE '2024-03-12', 67.90, 'East'), - (1003, 508, DATE '2024-02-14', 298.45, 'East'), - (1006, 509, DATE '2024-01-30', 178.60, 'West'), - (1004, 510, DATE '2024-03-18', 412.75, 'West'), - (1007, 511, DATE '2024-02-08', 234.20, 
'North'), - (1005, 512, DATE '2024-01-25', 156.40, 'East'), - (1008, 513, DATE '2024-03-22', 389.95, 'South'), - (1006, 514, DATE '2024-02-16', 287.15, 'West'), - (1009, 515, DATE '2024-01-12', 98.50, 'North'), - (1007, 516, DATE '2024-03-08', 445.80, 'North'), - (1010, 517, DATE '2024-02-25', 167.30, 'South'), - (1008, 518, DATE '2024-01-18', 356.70, 'South'), - (1011, 519, DATE '2024-03-15', 203.90, 'East'), - (1009, 520, DATE '2024-02-12', 278.25, 'North'); - --- Test data retrieval (z-order should optimize range queries) -SELECT customer_id, sale_date, amount -FROM clustered_zorder_sales -WHERE sale_date BETWEEN DATE '2024-02-01' AND DATE '2024-02-29' -ORDER BY customer_id, sale_date; - --- Verify total count + (1, DATE '2024-01-15', 125.50, 'North'), + (2, DATE '2024-02-20', 89.75, 'South'), + (3, DATE '2024-01-10', 245.30, 'East'), + (1, DATE '2024-03-05', 67.90, 'North'), + (4, DATE '2024-02-28', 178.45, 'West'), + (2, DATE '2024-01-22', 312.80, 'South'), + (5, DATE '2024-03-12', 156.20, 'East'), + (3, DATE '2024-02-14', 234.65, 'East'), + (6, DATE '2024-01-30', 98.40, 'West'); + SELECT COUNT(*) as total_rows FROM clustered_zorder_sales; SELECT COUNT(*) from default.clustered_zorder_sales.files; -DROP TABLE clustered_zorder_sales; \ No newline at end of file +-- Physical scan order within each Hive bucket: TABLESAMPLE selects the bucket hash; +-- inner scan reads Iceberg virtual columns; outer query orders by file position. 
+ +SELECT s.amount, s.region, s.customer_id, + s.PARTITION__HASH, s.ROW__POSITION, hex(iceberg_zorder(s.amount, s.region)) AS z_hex +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +SORT BY s.PARTITION__HASH, s.ROW__POSITION; + + +SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION; + +SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region); + +SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 2 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION; + +SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 2 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region); + +SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 3 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION; + +SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 3 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region); + +DROP TABLE clustered_zorder_sales; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out index 3244babdf6bb..78ff12276785 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_clustered_by_with_zorder.q.out @@ -1,65 +1,100 @@ -PREHOOK: query: CREATE TABLE ice_bucketed ( - id int, - name string, - age int -) -CLUSTERED BY (id) INTO 4 BUCKETS +PREHOOK: query: CREATE TABLE clustered_zorder_sales + (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) 
+ CLUSTERED BY (customer_id) INTO 3 BUCKETS + WRITE LOCALLY ORDERED BY zorder(amount, region) + STORED BY ICEBERG STORED AS PARQUET + TBLPROPERTIES ('format-version'='2') PREHOOK: type: CREATETABLE PREHOOK: Output: database:default -PREHOOK: Output: default@ice_bucketed -POSTHOOK: query: CREATE TABLE ice_bucketed ( - id int, - name string, - age int -) -CLUSTERED BY (id) INTO 4 BUCKETS +PREHOOK: Output: default@clustered_zorder_sales +POSTHOOK: query: CREATE TABLE clustered_zorder_sales + (customer_id BIGINT, order_date DATE, amount DOUBLE, region STRING) + CLUSTERED BY (customer_id) INTO 3 BUCKETS + WRITE LOCALLY ORDERED BY zorder(amount, region) + STORED BY ICEBERG STORED AS PARQUET + TBLPROPERTIES ('format-version'='2') POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default -POSTHOOK: Output: default@ice_bucketed -PREHOOK: query: DESCRIBE FORMATTED ice_bucketed +POSTHOOK: Output: default@clustered_zorder_sales +PREHOOK: query: DESCRIBE FORMATTED clustered_zorder_sales PREHOOK: type: DESCTABLE -PREHOOK: Input: default@ice_bucketed -POSTHOOK: query: DESCRIBE FORMATTED ice_bucketed +PREHOOK: Input: default@clustered_zorder_sales +POSTHOOK: query: DESCRIBE FORMATTED clustered_zorder_sales POSTHOOK: type: DESCTABLE -POSTHOOK: Input: default@ice_bucketed +POSTHOOK: Input: default@clustered_zorder_sales # col_name data_type comment -id int -name string -age int +customer_id bigint +order_date date +amount double +region string # Detailed Table Information Database: default #### A masked pattern was here #### Retention: 0 #### A masked pattern was here #### -Table Type: MANAGED_TABLE +Table Type: EXTERNAL_TABLE Table Parameters: - COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"age\":\"true\",\"id\":\"true\",\"name\":\"true\"}} + COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"amount\":\"true\",\"customer_id\":\"true\",\"order_date\":\"true\",\"region\":\"true\"}} + EXTERNAL TRUE bucketing_version 2 + current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"customer_id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"order_date\",\"required\":false,\"type\":\"date\"},{\"id\":3,\"name\":\"amount\",\"required\":false,\"type\":\"double\"},{\"id\":4,\"name\":\"region\",\"required\":false,\"type\":\"string\"}]} + format-version 2 + iceberg.orc.files.only false + metadata_location file:#Masked# numFiles 0 numRows 0 + parquet.compression zstd rawDataSize 0 + serialization.format 1 + snapshot-count 0 + sort.columns amount,region + sort.order ZORDER + storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler + table_type ICEBERG totalSize #Masked# #### A masked pattern was here #### + uuid #Masked# + write.delete.mode merge-on-read + write.format.default parquet + write.merge.mode merge-on-read + write.metadata.delete-after-commit.enabled true + write.update.mode merge-on-read # Storage Information -SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe -InputFormat: org.apache.hadoop.mapred.TextInputFormat -OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe +InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat +OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat Compressed: No -Num Buckets: 4 -Bucket Columns: [id] +Num Buckets: 3 +Bucket Columns: [customer_id] Sort Columns: [] -Storage Desc Params: - serialization.format 1 -PREHOOK: query: EXPLAIN INSERT INTO ice_bucketed VALUES (1, 'Alice', 25), (2, 'Bob', 30) +PREHOOK: query: EXPLAIN INSERT INTO clustered_zorder_sales VALUES + (1, DATE '2024-01-15', 125.50, 'North'), + (2, DATE '2024-02-20', 89.75, 'South'), + (3, DATE '2024-01-10', 245.30, 'East'), + (1, DATE '2024-03-05', 67.90, 'North'), + (4, DATE '2024-02-28', 178.45, 'West'), + (2, DATE '2024-01-22', 312.80, 'South'), + (5, DATE '2024-03-12', 156.20, 'East'), + (3, DATE '2024-02-14', 234.65, 'East'), + (6, DATE 
'2024-01-30', 98.40, 'West') PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table -PREHOOK: Output: default@ice_bucketed -POSTHOOK: query: EXPLAIN INSERT INTO ice_bucketed VALUES (1, 'Alice', 25), (2, 'Bob', 30) +PREHOOK: Output: default@clustered_zorder_sales +POSTHOOK: query: EXPLAIN INSERT INTO clustered_zorder_sales VALUES + (1, DATE '2024-01-15', 125.50, 'North'), + (2, DATE '2024-02-20', 89.75, 'South'), + (3, DATE '2024-01-10', 245.30, 'East'), + (1, DATE '2024-03-05', 67.90, 'North'), + (4, DATE '2024-02-28', 178.45, 'West'), + (2, DATE '2024-01-22', 312.80, 'South'), + (5, DATE '2024-03-12', 156.20, 'East'), + (3, DATE '2024-02-14', 234.65, 'East'), + (6, DATE '2024-01-30', 98.40, 'West') POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table -POSTHOOK: Output: default@ice_bucketed +POSTHOOK: Output: default@clustered_zorder_sales STAGE DEPENDENCIES: Stage-1 is a root stage Stage-2 depends on stages: Stage-1 @@ -72,7 +107,7 @@ STAGE PLANS: #### A masked pattern was here #### Edges: Reducer 2 <- Map 1 (SIMPLE_EDGE) - Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -82,70 +117,69 @@ STAGE PLANS: Row Limit Per Split: 1 Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - expressions: array(const struct(1,'Alice',25),const struct(2,'Bob',30)) (type: array>) + expressions: array(const struct(1,DATE'2024-01-15',125.5,'North'),const struct(2,DATE'2024-02-20',89.75,'South'),const struct(3,DATE'2024-01-10',245.3,'East'),const struct(1,DATE'2024-03-05',67.9,'North'),const struct(4,DATE'2024-02-28',178.45,'West'),const struct(2,DATE'2024-01-22',312.8,'South'),const struct(5,DATE'2024-03-12',156.2,'East'),const struct(3,DATE'2024-02-14',234.65,'East'),const struct(6,DATE'2024-01-30',98.4,'West')) (type: array>) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 56 Basic stats: COMPLETE Column 
stats: COMPLETE + Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE UDTF Operator - Statistics: Num rows: 1 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 112 Basic stats: COMPLETE Column stats: COMPLETE function name: inline Select Operator - expressions: col1 (type: int), col2 (type: string), col3 (type: int) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + expressions: UDFToLong(col1) (type: bigint), col2 (type: date), UDFToDouble(col3) (type: double), col4 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator - key expressions: _col0 (type: int) - null sort order: a - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col1 (type: string), _col2 (type: int) + key expressions: _bucket_number (type: string), iceberg_zorder(_col2, _col3) (type: binary) + null sort order: zz + sort order: ++ + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: date), _col2 (type: double), _col3 (type: string) + Select Operator + expressions: _col0 (type: bigint), _col1 (type: date), _col2 (type: double), _col3 (type: string) + outputColumnNames: customer_id, order_date, amount, region + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(customer_id), max(customer_id), count(1), count(customer_id), compute_bit_vector_hll(customer_id), min(order_date), max(order_date), count(order_date), compute_bit_vector_hll(order_date), min(amount), max(amount), count(amount), compute_bit_vector_hll(amount), max(length(region)), 
avg(COALESCE(length(region),0)), count(region), compute_bit_vector_hll(region) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16 + Statistics: Num rows: 1 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: date), _col6 (type: date), _col7 (type: bigint), _col8 (type: binary), _col9 (type: double), _col10 (type: double), _col11 (type: bigint), _col12 (type: binary), _col13 (type: int), _col14 (type: struct), _col15 (type: bigint), _col16 (type: binary) Execution mode: llap LLAP IO: no inputs Reducer 2 Execution mode: vectorized, llap Reduce Operator Tree: Select Operator - expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: string), VALUE._col1 (type: int) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + expressions: VALUE._col0 (type: bigint), VALUE._col1 (type: date), VALUE._col2 (type: double), VALUE._col3 (type: string), KEY.iceberg_zorder(_col2, _col3) (type: binary), KEY._bucket_number (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, iceberg_zorder(_col2, _col3), _bucket_number File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Dp Sort State: PARTITION_BUCKET_SORTED + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - name: 
default.ice_bucketed - Select Operator - expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int) - outputColumnNames: id, name, age - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: min(id), max(id), count(1), count(id), compute_bit_vector_hll(id), max(length(name)), avg(COALESCE(length(name),0)), count(name), compute_bit_vector_hll(name), min(age), max(age), count(age), compute_bit_vector_hll(age) - minReductionHashAggr: 0.4 - mode: hash - outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 - Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary) + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.clustered_zorder_sales Reducer 3 Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator - aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12) + aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), 
compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12), max(VALUE._col13), avg(VALUE._col14), count(VALUE._col15), compute_bit_vector_hll(VALUE._col16) mode: mergepartial - outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 - Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE Column stats: COMPLETE + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16 + Statistics: Num rows: 1 Data size: 772 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary) - outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17 - Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + expressions: 'LONG' (type: string), _col0 (type: bigint), _col1 (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'DATE' (type: string), _col5 (type: date), _col6 (type: date), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'DOUBLE' (type: string), _col9 (type: double), _col10 
(type: double), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col13,0)) (type: bigint), COALESCE(_col14,0) (type: double), (_col2 - _col15) (type: bigint), COALESCE(ndv_compute_bit_vector(_col16),0) (type: bigint), _col16 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23 + Statistics: Num rows: 1 Data size: 1156 Basic stats: COMPLETE Column stats: COMPLETE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 1156 Basic stats: COMPLETE Column stats: COMPLETE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -159,195 +193,172 @@ STAGE PLANS: tables: replace: false table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - name: default.ice_bucketed + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.clustered_zorder_sales Stage: Stage-3 Stats Work Basic Stats Work: Column Stats Desc: - Columns: id, name, age - Column Types: int, string, int - Table: default.ice_bucketed + Columns: customer_id, order_date, amount, region + Column Types: bigint, date, double, string + Table: default.clustered_zorder_sales -PREHOOK: query: drop table ice_bucketed -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@ice_bucketed -PREHOOK: Output: database:default -PREHOOK: Output: default@ice_bucketed -POSTHOOK: query: drop 
table ice_bucketed -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@ice_bucketed -POSTHOOK: Output: database:default -POSTHOOK: Output: default@ice_bucketed -PREHOOK: query: CREATE TABLE clustered_zorder_sales ( - customer_id BIGINT, - product_id BIGINT, - sale_date DATE, - amount DOUBLE, - region STRING -) -CLUSTERED BY (customer_id) INTO 4 BUCKETS -WRITE LOCALLY ORDERED BY zorder(sale_date, amount) -STORED BY ICEBERG -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@clustered_zorder_sales -POSTHOOK: query: CREATE TABLE clustered_zorder_sales ( - customer_id BIGINT, - product_id BIGINT, - sale_date DATE, - amount DOUBLE, - region STRING -) -CLUSTERED BY (customer_id) INTO 4 BUCKETS -WRITE LOCALLY ORDERED BY zorder(sale_date, amount) -STORED BY ICEBERG -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@clustered_zorder_sales -PREHOOK: query: DESCRIBE FORMATTED clustered_zorder_sales -PREHOOK: type: DESCTABLE -PREHOOK: Input: default@clustered_zorder_sales -POSTHOOK: query: DESCRIBE FORMATTED clustered_zorder_sales -POSTHOOK: type: DESCTABLE -POSTHOOK: Input: default@clustered_zorder_sales -# col_name data_type comment -customer_id bigint -product_id bigint -sale_date date -amount double -region string - -# Detailed Table Information -Database: default -#### A masked pattern was here #### -Retention: 0 -#### A masked pattern was here #### -Table Type: EXTERNAL_TABLE -Table Parameters: - COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"amount\":\"true\",\"customer_id\":\"true\",\"product_id\":\"true\",\"region\":\"true\",\"sale_date\":\"true\"}} - EXTERNAL TRUE - bucketing_version 2 - current-schema 
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"customer_id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"product_id\",\"required\":false,\"type\":\"long\"},{\"id\":3,\"name\":\"sale_date\",\"required\":false,\"type\":\"date\"},{\"id\":4,\"name\":\"amount\",\"required\":false,\"type\":\"double\"},{\"id\":5,\"name\":\"region\",\"required\":false,\"type\":\"string\"}]} - format-version 2 - iceberg.orc.files.only false -#### A masked pattern was here #### - numFiles 0 - numRows 0 - parquet.compression zstd - rawDataSize 0 - serialization.format 1 - snapshot-count 0 - sort.columns sale_date,amount - sort.order ZORDER - storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler - table_type ICEBERG - totalSize #Masked# -#### A masked pattern was here #### - uuid #Masked# - write.delete.mode merge-on-read - write.merge.mode merge-on-read - write.metadata.delete-after-commit.enabled true - write.update.mode merge-on-read - -# Storage Information -SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe -InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat -OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat -Compressed: No -Num Buckets: 4 -Bucket Columns: [customer_id] -Sort Columns: [] PREHOOK: query: INSERT INTO clustered_zorder_sales VALUES - (1001, 501, DATE '2024-01-15', 150.75, 'North'), - (1002, 502, DATE '2024-02-20', 275.50, 'South'), - (1003, 503, DATE '2024-01-10', 89.25, 'East'), - (1001, 504, DATE '2024-03-05', 320.00, 'North'), - (1004, 505, DATE '2024-02-28', 195.80, 'West'), - (1002, 506, DATE '2024-01-22', 445.30, 'South'), - (1005, 507, DATE '2024-03-12', 67.90, 'East'), - (1003, 508, DATE '2024-02-14', 298.45, 'East'), - (1006, 509, DATE '2024-01-30', 178.60, 'West'), - (1004, 510, DATE '2024-03-18', 412.75, 'West'), - (1007, 511, DATE '2024-02-08', 234.20, 'North'), - (1005, 512, DATE '2024-01-25', 156.40, 'East'), - (1008, 513, DATE '2024-03-22', 389.95, 'South'), - (1006, 514, 
DATE '2024-02-16', 287.15, 'West'), - (1009, 515, DATE '2024-01-12', 98.50, 'North'), - (1007, 516, DATE '2024-03-08', 445.80, 'North'), - (1010, 517, DATE '2024-02-25', 167.30, 'South'), - (1008, 518, DATE '2024-01-18', 356.70, 'South'), - (1011, 519, DATE '2024-03-15', 203.90, 'East'), - (1009, 520, DATE '2024-02-12', 278.25, 'North') + (1, DATE '2024-01-15', 125.50, 'North'), + (2, DATE '2024-02-20', 89.75, 'South'), + (3, DATE '2024-01-10', 245.30, 'East'), + (1, DATE '2024-03-05', 67.90, 'North'), + (4, DATE '2024-02-28', 178.45, 'West'), + (2, DATE '2024-01-22', 312.80, 'South'), + (5, DATE '2024-03-12', 156.20, 'East'), + (3, DATE '2024-02-14', 234.65, 'East'), + (6, DATE '2024-01-30', 98.40, 'West') PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table PREHOOK: Output: default@clustered_zorder_sales POSTHOOK: query: INSERT INTO clustered_zorder_sales VALUES - (1001, 501, DATE '2024-01-15', 150.75, 'North'), - (1002, 502, DATE '2024-02-20', 275.50, 'South'), - (1003, 503, DATE '2024-01-10', 89.25, 'East'), - (1001, 504, DATE '2024-03-05', 320.00, 'North'), - (1004, 505, DATE '2024-02-28', 195.80, 'West'), - (1002, 506, DATE '2024-01-22', 445.30, 'South'), - (1005, 507, DATE '2024-03-12', 67.90, 'East'), - (1003, 508, DATE '2024-02-14', 298.45, 'East'), - (1006, 509, DATE '2024-01-30', 178.60, 'West'), - (1004, 510, DATE '2024-03-18', 412.75, 'West'), - (1007, 511, DATE '2024-02-08', 234.20, 'North'), - (1005, 512, DATE '2024-01-25', 156.40, 'East'), - (1008, 513, DATE '2024-03-22', 389.95, 'South'), - (1006, 514, DATE '2024-02-16', 287.15, 'West'), - (1009, 515, DATE '2024-01-12', 98.50, 'North'), - (1007, 516, DATE '2024-03-08', 445.80, 'North'), - (1010, 517, DATE '2024-02-25', 167.30, 'South'), - (1008, 518, DATE '2024-01-18', 356.70, 'South'), - (1011, 519, DATE '2024-03-15', 203.90, 'East'), - (1009, 520, DATE '2024-02-12', 278.25, 'North') + (1, DATE '2024-01-15', 125.50, 'North'), + (2, DATE '2024-02-20', 89.75, 'South'), + (3, DATE 
'2024-01-10', 245.30, 'East'), + (1, DATE '2024-03-05', 67.90, 'North'), + (4, DATE '2024-02-28', 178.45, 'West'), + (2, DATE '2024-01-22', 312.80, 'South'), + (5, DATE '2024-03-12', 156.20, 'East'), + (3, DATE '2024-02-14', 234.65, 'East'), + (6, DATE '2024-01-30', 98.40, 'West') POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@clustered_zorder_sales -PREHOOK: query: SELECT customer_id, sale_date, amount -FROM clustered_zorder_sales -WHERE sale_date BETWEEN DATE '2024-02-01' AND DATE '2024-02-29' -ORDER BY customer_id, sale_date -PREHOOK: type: QUERY -PREHOOK: Input: default@clustered_zorder_sales -#### A masked pattern was here #### -POSTHOOK: query: SELECT customer_id, sale_date, amount -FROM clustered_zorder_sales -WHERE sale_date BETWEEN DATE '2024-02-01' AND DATE '2024-02-29' -ORDER BY customer_id, sale_date -POSTHOOK: type: QUERY -POSTHOOK: Input: default@clustered_zorder_sales -#### A masked pattern was here #### -1002 2024-02-20 275.5 -1003 2024-02-14 298.45 -1004 2024-02-28 195.8 -1006 2024-02-16 287.15 -1007 2024-02-08 234.2 -1009 2024-02-12 278.25 -1010 2024-02-25 167.3 PREHOOK: query: SELECT COUNT(*) as total_rows FROM clustered_zorder_sales PREHOOK: type: QUERY PREHOOK: Input: default@clustered_zorder_sales -#### A masked pattern was here #### +PREHOOK: Output: file:#Masked# POSTHOOK: query: SELECT COUNT(*) as total_rows FROM clustered_zorder_sales POSTHOOK: type: QUERY POSTHOOK: Input: default@clustered_zorder_sales -#### A masked pattern was here #### -20 +POSTHOOK: Output: file:#Masked# +9 PREHOOK: query: SELECT COUNT(*) from default.clustered_zorder_sales.files PREHOOK: type: QUERY PREHOOK: Input: default@clustered_zorder_sales -#### A masked pattern was here #### +PREHOOK: Output: file:#Masked# POSTHOOK: query: SELECT COUNT(*) from default.clustered_zorder_sales.files POSTHOOK: type: QUERY POSTHOOK: Input: default@clustered_zorder_sales -#### A masked pattern was here #### -1 +POSTHOOK: Output: 
file:#Masked# +3 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id, + s.PARTITION__HASH, s.ROW__POSITION, hex(iceberg_zorder(s.amount, s.region)) AS z_hex +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +SORT BY s.PARTITION__HASH, s.ROW__POSITION +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, s.customer_id, + s.PARTITION__HASH, s.ROW__POSITION, hex(iceberg_zorder(s.amount, s.region)) AS z_hex +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +SORT BY s.PARTITION__HASH, s.ROW__POSITION +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +156.2 East 5 -1 0 B0113C0B952D3D38A828880228888888 +89.75 South 2 -1 1 B105367D3F111510944088A0A8000000 +312.8 South 2 -1 2 B1053E5F95B1B5B034E0088AA2222220 +98.4 West 6 -1 3 B1153691978797920282088088888882 +178.45 West 4 -1 4 B1153C3935AD3D38A8288888AA888888 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +156.2 East 5 0 +89.75 South 2 1 +312.8 South 2 2 +98.4 West 6 3 +178.45 West 4 4 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region) +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, 
s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 1 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +156.2 East 5 +89.75 South 2 +312.8 South 2 +98.4 West 6 +178.45 West 4 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 2 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 2 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +234.65 East 3 0 +245.3 East 3 1 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 2 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region) +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 2 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +234.65 East 3 +245.3 East 3 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 3 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, s.customer_id, s.ROW__POSITION +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 3 OUT OF 3 ON customer_id) s +ORDER BY s.ROW__POSITION +POSTHOOK: type: QUERY 
+POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +67.9 North 1 0 +125.5 North 1 1 +PREHOOK: query: SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 3 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region) +PREHOOK: type: QUERY +PREHOOK: Input: default@clustered_zorder_sales +PREHOOK: Output: file:#Masked# +POSTHOOK: query: SELECT s.amount, s.region, s.customer_id +FROM clustered_zorder_sales TABLESAMPLE (BUCKET 3 OUT OF 3 ON customer_id) s +ORDER BY iceberg_zorder(s.amount, s.region) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@clustered_zorder_sales +POSTHOOK: Output: file:#Masked# +67.9 North 1 +125.5 North 1 PREHOOK: query: DROP TABLE clustered_zorder_sales PREHOOK: type: DROPTABLE PREHOOK: Input: default@clustered_zorder_sales diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 5d3c2dccf5cf..183e03db0c45 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setMergeTaskEnabled; import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation; import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperationIsSorted; +import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.ICEBERG_HIVE_BUCKETING_ROUTE_ENABLED; +import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setIcebergHiveBucketingRouteEnabled; import com.google.common.collect.Lists; import java.io.IOException; @@ -638,12 +640,20 @@ protected void initializeOp(Configuration hconf) throws HiveException { fs = specPath.getFileSystem(hconf); jc = new 
JobConf(hconf); - setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation()); - setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(), + final String targetTableName = getConf().getTableInfo().getTableName(); + setWriteOperation(jc, targetTableName, getConf().getWriteOperation()); + setWriteOperationIsSorted(jc, targetTableName, dpCtx != null && dpCtx.hasCustomPartitionOrSortExpression()); - setMergeTaskEnabled(jc, getConf().getTableInfo().getTableName(), + setMergeTaskEnabled(jc, targetTableName, Boolean.parseBoolean((String) getConf().getTableInfo().getProperties().get( - MERGE_TASK_ENABLED + getConf().getTableInfo().getTableName()))); + MERGE_TASK_ENABLED + targetTableName))); + + // Iceberg: propagate Hive-native bucketing routing flag (set by SDPO for specific plans). + final Properties tableInfoProps = getConf().getTableInfo().getProperties(); + final String routeEnabledStr = (String) tableInfoProps.get(ICEBERG_HIVE_BUCKETING_ROUTE_ENABLED + targetTableName); + if (routeEnabledStr != null) { + setIcebergHiveBucketingRouteEnabled(jc, targetTableName, Boolean.parseBoolean(routeEnabledStr)); + } try { createHiveOutputFormat(jc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 7d364081ed2f..f3799ff4fa90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -62,11 +62,12 @@ public class ReduceSinkOperator extends TerminalOperator private static final long serialVersionUID = 1L; - private transient int buckColIdxInKey; /** - * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} + * Index in the reduce key where the computed bucket number is written. 
+ * Defaults to {@code partitionCols.size()} for legacy plans; SDPO plans that add + * {@code bucket_number()} as a sort key use its actual key index instead. */ - private transient int buckColIdxInKeyForSdpo = -1; + private transient int buckColIdxInKey; private boolean firstRow; private transient int tag; private boolean skipTag = false; @@ -161,10 +162,12 @@ protected void initializeOp(Configuration hconf) throws HiveException { } keyEval = new ExprNodeEvaluator[keys.size()]; + int bucketPlaceholderIdx = -1; int i = 0; for (ExprNodeDesc e : keys) { - if (e instanceof ExprNodeGenericFuncDesc && ((ExprNodeGenericFuncDesc) e).getGenericUDF() instanceof GenericUDFBucketNumber) { - buckColIdxInKeyForSdpo = i; + if (e instanceof ExprNodeGenericFuncDesc + && ((ExprNodeGenericFuncDesc) e).getGenericUDF() instanceof GenericUDFBucketNumber) { + bucketPlaceholderIdx = i; } keyEval[i++] = ExprNodeEvaluatorFactory.get(e); } @@ -195,7 +198,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { bucketEval[i++] = index < 0 ? ExprNodeEvaluatorFactory.get(e) : keyEval[index]; } - buckColIdxInKey = conf.getPartitionCols().size(); + buckColIdxInKey = bucketPlaceholderIdx >= 0 ? bucketPlaceholderIdx : conf.getPartitionCols().size(); } tag = conf.getTag(); @@ -311,22 +314,10 @@ public void process(Object row, int tag) throws HiveException { // replace bucketing columns with hashcode % numBuckets int bucketNumber = -1; - /* if (bucketEval != null) { - bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); - cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber)); - } - if (buckColIdxInKeyForSdpo != -1) { - cachedKeys[0][buckColIdxInKeyForSdpo] = new Text(String.valueOf(bucketNumber)); - } */ if (bucketEval != null) { bucketNumber = computeBucketNumber(row, conf.getNumBuckets()); - // When SortedDynPartitionOptimizer prepends custom sort keys (e.g. Iceberg z-order binary), - // _bucket_number is not at index partitionCols.size(). 
Prefer the key index discovered from - // GenericUDFBucketNumber (buckColIdxInKeyForSdpo); otherwise use legacy buckColIdxInKey. - int bucketKeyIdx = buckColIdxInKeyForSdpo != -1 ? buckColIdxInKeyForSdpo : buckColIdxInKey; - cachedKeys[0][bucketKeyIdx] = new Text(String.valueOf(bucketNumber)); + cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber)); } - LOG.info("TESTZZ: bucketNumber = " + bucketNumber); HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null); int distKeyLength = firstKey.getDistKeyLength(); @@ -432,6 +423,11 @@ protected final int computeMurmurHash(HiveKey firstKey) { * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} */ private int computeHashCode(Object row, int buckNum) throws HiveException { + // CLUSTERED BY + SDPO: no partition keys, distribute only by computed bucket number so all + // rows in a logical bucket reach the same reducer (and Iceberg bucket file). + if (partitionEval.length == 0 && bucketEval != null && buckNum >= 0) { + return buckNum; + } // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java index ae51d03f4e29..280e4e171061 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java @@ -256,29 +256,39 @@ public void process(Object row, int tag) throws HiveException { for (int logical = 0; logical< size; logical++) { final int batchIndex = (selectedInUse ? 
selected[logical] : logical); int hashCode; - if (isEmptyPartitions) { - if (isSingleReducer) { - // Empty partition, single reducer -> constant hashCode - hashCode = 0; - } else { - // Empty partition, multiple reducers -> random hashCode - hashCode = nonPartitionRandom.nextInt(); - } - } else { - // Compute hashCode from partitions - partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); - hashCode = partitionHashFunc.applyAsInt(partitionFieldValues); - } - - // Compute hashCode from buckets - if (!isEmptyBuckets) { + if (isEmptyPartitions && !isEmptyBuckets) { + // CLUSTERED BY: distribute only by bucket number (see ReduceSinkOperator.computeHashCode). bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); - final int bucketNum = ObjectInspectorUtils.getBucketNumber( + hashCode = ObjectInspectorUtils.getBucketNumber( bucketHashFunc.applyAsInt(bucketFieldValues), numBuckets); if (bucketExpr != null) { - evaluateBucketExpr(batch, batchIndex, bucketNum); + evaluateBucketExpr(batch, batchIndex, hashCode); + } + } else { + if (isEmptyPartitions) { + if (isSingleReducer) { + // Empty partition, single reducer -> constant hashCode + hashCode = 0; + } else { + // Empty partition, multiple reducers -> random hashCode + hashCode = nonPartitionRandom.nextInt(); + } + } else { + // Compute hashCode from partitions + partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); + hashCode = partitionHashFunc.applyAsInt(partitionFieldValues); + } + + // Compute hashCode from buckets + if (!isEmptyBuckets) { + bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); + final int bucketNum = ObjectInspectorUtils.getBucketNumber( + bucketHashFunc.applyAsInt(bucketFieldValues), numBuckets); + if (bucketExpr != null) { + evaluateBucketExpr(batch, batchIndex, bucketNum); + } + hashCode = hashCode * 31 + bucketNum; } - hashCode = hashCode * 31 + bucketNum; } postProcess(batch, batchIndex, tag, hashCode); diff 
--git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index f5431fa34934..48a3da28e84d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -85,6 +85,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.orc.OrcConf; import org.slf4j.Logger; @@ -105,6 +107,9 @@ */ public class SortedDynPartitionOptimizer extends Transform { + private static final String ICEBERG_STORAGE_HANDLER_CLASS = + "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"; + private static final Function, ExprNodeDesc> BUCKET_SORT_EXPRESSION = cols -> { try { return ExprNodeGenericFuncDesc.newInstance( @@ -225,7 +230,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // the reduce sink key. Since both key columns are not prefix subset // ReduceSinkDeDuplication will not merge them together resulting in 2 MR jobs. // To avoid that we will remove the RS (and EX) inserted by enforce bucketing/sorting. 
- if (!removeRSInsertedByEnforceBucketing(fsOp)) { + RemovedEnforceRs removedEnforceRs = removeRSInsertedByEnforceBucketing(fsOp); + if (!removedEnforceRs.success) { LOG.debug("Bailing out of sort dynamic partition optimization as some partition columns " + "got constant folded."); return null; @@ -284,9 +290,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Sort columns specified by table sortPositions = getSortPositions(destTable.getSortCols(), destTable.getCols()); sortOrder = getSortOrders(destTable.getSortCols(), destTable.getCols()); - } else if (HiveConf.getBoolVar(this.parseCtx.getConf(), HiveConf.ConfVars.HIVE_SORT_WHEN_BUCKETING) && + } else if (customSortExprs.isEmpty() && + HiveConf.getBoolVar(this.parseCtx.getConf(), HiveConf.ConfVars.HIVE_SORT_WHEN_BUCKETING) && !bucketPositions.isEmpty()) { - // We use clustered columns as sort columns + // We use clustered columns as sort columns (skip when custom sort e.g. z-order is present) sortPositions = new ArrayList<>(bucketPositions); sortOrder = sortPositions.stream().map(e -> 1).collect(Collectors.toList()); } else { @@ -318,6 +325,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, fsOp.getConf().setNumFiles(1); fsOp.getConf().setTotalFiles(1); + final boolean isIcebergHiveBucketedWrite = + isIcebergHiveStorageHandler(destTable) + && destTable.getNumBuckets() > 0 + && !destTable.getBucketCols().isEmpty() + && !customSortExprs.isEmpty(); + + final Integer reusedNumReducers = isIcebergHiveBucketedWrite && removedEnforceRs.removedRs != null + ? 
removedEnforceRs.removedRs.getConf().getNumReducers() + : null; + // Create ReduceSink operator ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, sortNullOrder, customPartitionExprs, customSortExprs, customSortOrder, customNullOrder, @@ -329,6 +346,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, fsParent.getChildOperators().remove(rsOp); fsParent.getChildOperators().add(fsOpIndex, rsOp); rsOp.getConf().setBucketingVersion(fsOp.getConf().getBucketingVersion()); + if (reusedNumReducers != null && reusedNumReducers > 0) { + rsOp.getConf().setNumReducers(reusedNumReducers); + } + + // Iceberg: route output files by Hive bucket id (not reducer task id). + if (isIcebergHiveBucketedWrite && reusedNumReducers != null && reusedNumReducers > 0) { + String tableName = fsOp.getConf().getTableInfo().getTableName(); + fsOp.getConf().getTableInfo().getProperties().put( + HiveCustomStorageHandlerUtils.ICEBERG_HIVE_BUCKETING_ROUTE_ENABLED + tableName, "true"); + } List descs = new ArrayList(allRSCols.size()); List colNames = new ArrayList(); @@ -464,7 +491,7 @@ private boolean allStaticPartitions(Operator op, List RS -> SEL -> FS to PARENT -> FS - private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { + private RemovedEnforceRs removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { Set reduceSinks = OperatorUtils.findOperatorsUpstream(fsOp, ReduceSinkOperator.class); @@ -491,6 +518,7 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { // iF RS is found remove it and its child (EX) and connect its parent // and grand child if (found) { + ReduceSinkOperator removed = (ReduceSinkOperator) rsToRemove; Operator rsParent = rsToRemove.getParentOperators().get(0); // RS is expected to have exactly ONE child @@ -504,7 +532,7 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { // converting partition column expression to constant expression. 
The constant // expression will then get pruned by column pruner since it will not reference to // any columns. - return false; + return RemovedEnforceRs.failed(); } // if child is select and contains expression which isn't column it shouldn't @@ -512,7 +540,7 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { // while introducing select for RS for(ExprNodeDesc expr: rsChildToRemove.getColumnExprMap().values()){ if(!(expr instanceof ExprNodeColumnDesc)){ - return false; + return RemovedEnforceRs.failed(); } } @@ -529,8 +557,35 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { } LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChildToRemove.getOperatorId() + " as it was introduced by enforce bucketing/sorting."); + return RemovedEnforceRs.success(removed); + } + return RemovedEnforceRs.success(null); + } + + private boolean isIcebergHiveStorageHandler(Table destTable) { + if (destTable == null || destTable.getParameters() == null) { + return false; + } + String handler = destTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); + return ICEBERG_STORAGE_HANDLER_CLASS.equals(handler); + } + + private static final class RemovedEnforceRs { + final boolean success; + final ReduceSinkOperator removedRs; + + private RemovedEnforceRs(boolean success, ReduceSinkOperator removedRs) { + this.success = success; + this.removedRs = removedRs; + } + + static RemovedEnforceRs success(ReduceSinkOperator removedRs) { + return new RemovedEnforceRs(true, removedRs); + } + + static RemovedEnforceRs failed() { + return new RemovedEnforceRs(false, null); } - return true; } private List getPartitionPositions(DynamicPartitionCtx dpCtx, RowSchema schema) { @@ -604,12 +659,10 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, List ArrayList allCols, ArrayList bucketColumns, int numBuckets, Operator parent, AcidUtils.Operation writeType) { - // Order of KEY columns, if custom expressions 
are present: - // 0) Custom partition expressions (for distribution AND sorting) - // 1) Custom sort expressions (for sorting ONLY) - // 2) Partition columns - // 3) Bucket number column - // 4) Sort columns + // Order of KEY columns when custom expressions are present: + // Default: (0) custom partition, (1) custom sort (e.g. z-order), (2) partition/bucket/sort cols + // CLUSTERED BY + custom sort (e.g. z-order): bucket/partition keys must precede custom sort so each + // bucket file receives rows in local z-order (required when bucket routing splits reducer output). boolean customPartitionExprPresent = customPartitionExprs != null && !customPartitionExprs.isEmpty(); boolean customSortExprPresent = customSortExprs != null && !customSortExprs.isEmpty(); @@ -625,6 +678,9 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, List numBuckets = -1; } + final boolean sortBucketKeysBeforeCustomSort = + bucketColumns != null && !bucketColumns.isEmpty() && customSortExprPresent; + keyColsPosInVal.addAll(partitionPositions); if (bucketColumns != null && !bucketColumns.isEmpty()) { keyColsPosInVal.add(-1); @@ -639,22 +695,38 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, List } } - if (customExprPresent) { - int numPartitionExprs = customPartitionExprs != null ? customPartitionExprs.size() : 0; - for (int i = 0; i < numPartitionExprs; i++) { + if (customPartitionExprPresent) { + for (int i = 0; i < customPartitionExprs.size(); i++) { newSortOrder.add(order); } - - int numSortExprs = customSortExprs != null ? customSortExprs.size() : 0; - for (int i = 0; i < numSortExprs; i++) { - newSortOrder.add(customSortOrder != null && i < customSortOrder.size() ? 
- customSortOrder.get(i) : - order); - } } - for (Integer ignored : keyColsPosInVal) { - newSortOrder.add(order); + if (sortBucketKeysBeforeCustomSort) { + for (Integer ignored : keyColsPosInVal) { + newSortOrder.add(order); + } + if (customSortExprPresent) { + for (int i = 0; i < customSortExprs.size(); i++) { + newSortOrder.add(customSortOrder != null && i < customSortOrder.size() ? + customSortOrder.get(i) : + order); + } + } + } else if (customExprPresent) { + if (customSortExprPresent) { + for (int i = 0; i < customSortExprs.size(); i++) { + newSortOrder.add(customSortOrder != null && i < customSortOrder.size() ? + customSortOrder.get(i) : + order); + } + } + for (Integer ignored : keyColsPosInVal) { + newSortOrder.add(order); + } + } else { + for (Integer ignored : keyColsPosInVal) { + newSortOrder.add(order); + } } String orderStr = ""; @@ -675,15 +747,27 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, List if (customPartitionExprPresent) { nullOrderStr.append(StringUtils.repeat(nullOrder, customPartitionExprs.size())); } - if (customSortExprPresent) { - for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) { - nullOrderStr.append(nullOrder); + if (sortBucketKeysBeforeCustomSort) { + nullOrderStr.append(StringUtils.repeat(nullOrder, keyColsPosInVal.size())); + if (customSortExprPresent) { + for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) { + nullOrderStr.append(nullOrder); + } + for (int i = 0; i < customSortNullOrder.size(); ++i) { + nullOrderStr.append(NullOrdering.fromCode(customSortNullOrder.get(i)).getSign()); + } } - for (int i = 0; i < customSortNullOrder.size(); ++i) { - nullOrderStr.append(NullOrdering.fromCode(customSortNullOrder.get(i)).getSign()); + } else { + if (customSortExprPresent) { + for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) { + nullOrderStr.append(nullOrder); + } + for (int i = 0; i < customSortNullOrder.size(); ++i) { + 
nullOrderStr.append(NullOrdering.fromCode(customSortNullOrder.get(i)).getSign()); + } } + nullOrderStr.append(StringUtils.repeat(nullOrder, keyColsPosInVal.size())); } - nullOrderStr.append(StringUtils.repeat(nullOrder, keyColsPosInVal.size())); Map colExprMap = Maps.newHashMap(); ArrayList partCols = Lists.newArrayList(); @@ -698,6 +782,18 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, List } } + // Clone the key expressions here because RS will update the bucket column key with its + // corresponding bucket number, and hence their OIs + if (sortBucketKeysBeforeCustomSort) { + for (Integer idx : keyColsPosInVal) { + if (idx == -1) { + keyCols.add(BUCKET_SORT_EXPRESSION.apply(allCols)); + } else { + keyCols.add(allCols.get(idx).clone()); + } + } + } + // Process custom sort expressions (e.g., Z-order). // These are used ONLY for sorting, NOT for distribution. if (customSortExprs != null) { @@ -707,13 +803,13 @@ public ReduceSinkOperator getReduceSinkOp(List partitionPositions, List } } - // we will clone here as RS will update bucket column key with its - // corresponding with bucket number and hence their OIs - for (Integer idx : keyColsPosInVal) { - if (idx == -1) { - keyCols.add(BUCKET_SORT_EXPRESSION.apply(allCols)); - } else { - keyCols.add(allCols.get(idx).clone()); + if (!sortBucketKeysBeforeCustomSort) { + for (Integer idx : keyColsPosInVal) { + if (idx == -1) { + keyCols.add(BUCKET_SORT_EXPRESSION.apply(allCols)); + } else { + keyCols.add(allCols.get(idx).clone()); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 7781ded7bfa7..b4c012544009 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -12154,7 +12154,12 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { } // Check if input can be pruned - 
ts.setInputPruning((sampleExprs.size() == 0 || colsEqual)); + boolean inputPruning = (sampleExprs.size() == 0 || colsEqual); + // Iceberg files are not at native Hive bucket paths; rely on the hash predicate only. + if (MetaStoreUtils.isIcebergTable(tab.getParameters())) { + inputPruning = false; + } + ts.setInputPruning(inputPruning); // check if input pruning is enough if ((sampleExprs.size() == 0 || colsEqual) @@ -12168,7 +12173,7 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { tab.getBucketingVersion()); FilterDesc filterDesc = new FilterDesc( samplePredicate, true, new SampleDesc(ts.getNumerator(), - ts.getDenominator(), tabBucketCols, true)); + ts.getDenominator(), tabBucketCols, inputPruning)); filterDesc.setGenerated(true); op = OperatorFactory.getAndMakeChild(filterDesc, new RowSchema(rwsch.getColumnInfos()), top); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java index 886dd3814a43..a8906d9564d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java @@ -35,6 +35,11 @@ public class HiveCustomStorageHandlerUtils { public static final String MERGE_TASK_ENABLED = "file.sink.merge.task.enabled."; + // Iceberg: gate Hive-native bucketing (CLUSTERED BY) file routing for a specific write target. + // Bucket count/columns/version are read from HMS table metadata at runtime. 
+ public static final String ICEBERG_HIVE_BUCKETING_ROUTE_ENABLED = + "file.sink.iceberg.hive.bucketing.route.enabled."; + public static String getTablePropsForCustomStorageHandler(Map tableProperties) { StringBuilder properties = new StringBuilder(); for (Map.Entry serdeMap : tableProperties.entrySet()) { @@ -95,4 +100,16 @@ public static boolean isMergeTaskEnabled(UnaryOperator ops, String table String operation = ops.apply(MERGE_TASK_ENABLED + tableName); return Boolean.parseBoolean(operation); } + + public static void setIcebergHiveBucketingRouteEnabled(Configuration conf, String tableName, boolean enabled) { + if (conf == null || tableName == null) { + return; + } + conf.set(ICEBERG_HIVE_BUCKETING_ROUTE_ENABLED + tableName, Boolean.toString(enabled)); + } + + public static boolean getIcebergHiveBucketingRouteEnabled(UnaryOperator ops, String tableName) { + String value = ops.apply(ICEBERG_HIVE_BUCKETING_ROUTE_ENABLED + tableName); + return Boolean.parseBoolean(value); + } }