From 468a39b291bddaac4189673c39a3d0048ea0102b Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Sat, 25 Apr 2026 00:11:43 +0900 Subject: [PATCH 1/5] HBASE-30115 Introduce approximate progress estimation for TableRecordReader based on row key position --- .../mapreduce/ByteBasedRowKeyProgress.java | 67 ++++++++++++ .../mapreduce/HexPrefixRowKeyProgress.java | 98 +++++++++++++++++ .../hbase/mapreduce/RowKeyProgress.java | 44 ++++++++ .../mapreduce/TableRecordReaderImpl.java | 60 +++++++++- .../TestByteBasedRowKeyProgress.java | 100 +++++++++++++++++ .../TestHexPrefixRowKeyProgress.java | 103 ++++++++++++++++++ 6 files changed, 470 insertions(+), 2 deletions(-) create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java create mode 100644 hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java create mode 100644 hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java new file mode 100644 index 000000000000..272a3910b46d --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * {@link RowKeyProgress} implementation that treats row keys as raw byte sequences. Converts the + * leading bytes to a big-endian unsigned numeric value and computes progress as a linear fraction + * of the key space. + */ +@InterfaceAudience.Public +public class ByteBasedRowKeyProgress implements RowKeyProgress { + private static final int BYTES_FOR_PROGRESS = Double.BYTES; + + private double start; + private double stop; + + @Override + public void setStartStopRows(byte[] startRow, byte[] stopRow) { + this.start = rowKeyToDouble(startRow); + this.stop = rowKeyToDouble(stopRow); + } + + @Override + public float getProgress(byte[] currentRow) { + if (currentRow == null || stop <= start) { + return 0.0f; + } + double current = rowKeyToDouble(currentRow); + float progress = (float) ((current - start) / (stop - start)); + return Math.min(1.0f, Math.max(0.0f, progress)); + } + + /** + * Interpret the leading bytes of a row key as an unsigned big-endian value. Keys shorter than + * {@link #BYTES_FOR_PROGRESS} bytes are treated as if right-padded with zeros. + */ + private static double rowKeyToDouble(byte[] row) { + if (row == null) { + return 0; + } + double d = 0; + for (int i = 0; i < BYTES_FOR_PROGRESS; i++) { + d *= 256; // shift left by one byte (2^8) to build a big-endian base-256 number + if (i < row.length) { + d += (row[i] & 0xFF); + } + } + return d; + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java new file mode 100644 index 000000000000..b85a6bddbd9d --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * {@link RowKeyProgress} implementation for tables whose row keys start with a hex-encoded prefix + * (e.g. MD5 hashes like {@code "a3f2b1..."}). Only the hex prefix is used for progress estimation; + * bytes beyond the prefix length are ignored. + *

+ * The prefix length is configurable via {@link #PREFIX_LENGTH_KEY} and defaults to + * {@link #DEFAULT_PREFIX_LENGTH}. + *

+ * Configure via: + * + *

+ * conf.setClass("hbase.mapreduce.rowkey.progress.class", HexPrefixRowKeyProgress.class,
+ *   RowKeyProgress.class);
+ * conf.setInt("hbase.mapreduce.rowkey.progress.hex.prefix.length", 8);
+ * 
+ */ +@InterfaceAudience.Public +public class HexPrefixRowKeyProgress extends Configured implements RowKeyProgress { + public static final String PREFIX_LENGTH_KEY = + "hbase.mapreduce.rowkey.progress.hex.prefix.length"; + public static final int DEFAULT_PREFIX_LENGTH = 4; + + private int prefixLength = DEFAULT_PREFIX_LENGTH; + private double start; + private double stop; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf != null) { + this.prefixLength = conf.getInt(PREFIX_LENGTH_KEY, DEFAULT_PREFIX_LENGTH); + } + } + + @Override + public void setStartStopRows(byte[] startRow, byte[] stopRow) { + this.start = hexPrefixToDouble(startRow); + this.stop = hexPrefixToDouble(stopRow); + } + + @Override + public float getProgress(byte[] currentRow) { + if (currentRow == null || stop <= start) { + return 0.0f; + } + double current = hexPrefixToDouble(currentRow); + float progress = (float) ((current - start) / (stop - start)); + return Math.min(1.0f, Math.max(0.0f, progress)); + } + + private double hexPrefixToDouble(byte[] row) { + if (row == null) { + return 0; + } + int len = Math.min(prefixLength, row.length); + double d = 0; + for (int i = 0; i < prefixLength; i++) { + d *= 16; + if (i < len) { + d += hexCharToInt(row[i]); + } + } + return d; + } + + private static int hexCharToInt(byte b) { + if (b >= '0' && b <= '9') { + return b - '0'; + } + if (b >= 'a' && b <= 'f') { + return 10 + (b - 'a'); + } + return 0; + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java new file mode 100644 index 000000000000..5ec029fdba20 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Estimates scan progress based on row key positions within a start/stop range. Custom + * implementations can be plugged in via {@link RowKeyProgress#PROGRESS_CLASS_KEY}. + */ +@InterfaceAudience.Public +public interface RowKeyProgress { + String PROGRESS_CLASS_KEY = "hbase.mapreduce.rowkey.progress.class"; + + /** + * Initialize the progress estimator with the start and stop row keys. + * @param startRow the start row of the scan (inclusive), may be null or empty + * @param stopRow the stop row of the scan (exclusive), may be null or empty + */ + void setStartStopRows(byte[] startRow, byte[] stopRow); + + /** + * Estimate progress as a fraction between 0.0 and 1.0 based on where {@code currentRow} falls in + * the range. + * @param currentRow the last successfully read row key, or null if no row has been read yet + * @return estimated progress between 0.0 and 1.0, or 0.0 if progress cannot be estimated + */ + float getProgress(byte[] currentRow); +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 7d0ffe02e6f0..1fbef4bb4a2a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -66,6 +67,7 @@ public class TableRecordReaderImpl { private int rowcount; private boolean logScannerActivity = false; private int logPerRowCount = 100; + private RowKeyProgress rowKeyProgress = null; /** * Restart from survivable exceptions by creating a new scanner. @@ -141,9 +143,61 @@ public void initialize(InputSplit inputsplit, TaskAttemptContext context) if (context != null) { this.context = context; } + initProgressBounds(); restart(scan.getStartRow()); } + /** + * Resolve the start/stop row keys used for progress estimation. The TableInputFormat splitter + * sets start and stop row keys from region boundaries, so they are only empty for the table's + * very first region (empty start) or last region (empty stop). In those cases, probe the table to + * discover the actual first or last row key as an approximation. + */ + private void initProgressBounds() { + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + if (startRow == null || startRow.length == 0) { + startRow = probeFirstRow(); + } + if (stopRow == null || stopRow.length == 0) { + stopRow = probeLastRow(); + } + Configuration conf = context.getConfiguration(); + Class progressClass = conf.getClass(RowKeyProgress.PROGRESS_CLASS_KEY, + ByteBasedRowKeyProgress.class, RowKeyProgress.class); + rowKeyProgress = ReflectionUtils.newInstance(progressClass, conf); + rowKeyProgress.setStartStopRows(startRow, stopRow); + } + + private byte[] probeFirstRow() { + try { + Scan probeScan = new Scan(scan); + probeScan.setOneRowLimit(); + try (ResultScanner probeScanner = htable.getScanner(probeScan)) { + Result result = probeScanner.next(); + return result != null ? result.getRow() : null; + } + } catch (IOException e) { + LOG.warn("Failed to probe first row for progress estimation", e); + return null; + } + } + + private byte[] probeLastRow() { + try { + Scan probeScan = new Scan(scan); + probeScan.setReversed(true); + probeScan.setOneRowLimit(); + try (ResultScanner probeScanner = htable.getScanner(probeScan)) { + Result result = probeScanner.next(); + return result != null ? result.getRow() : null; + } + } catch (IOException e) { + LOG.warn("Failed to probe last row for progress estimation", e); + return null; + } + } + /** * Closes the split. */ @@ -318,8 +372,10 @@ protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRes * @return A number between 0.0 and 1.0, the fraction of the data read. */ public float getProgress() { - // Depends on the total number of tuples - return 0; + if (rowKeyProgress == null) { + return 0; + } + return rowKeyProgress.getProgress(lastSuccessfulRow); } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java new file mode 100644 index 000000000000..054040bfa76e --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag(MapReduceTests.TAG) +@Tag(SmallTests.TAG) +public class TestByteBasedRowKeyProgress { + private static RowKeyProgress create(byte[] start, byte[] stop) { + RowKeyProgress p = new ByteBasedRowKeyProgress(); + p.setStartStopRows(start, stop); + return p; + } + + @Test + public void testNullCurrentRow() { + assertEquals(0.0f, create(Bytes.toBytes("a"), Bytes.toBytes("z")).getProgress(null)); + } + + @Test + public void testNullStopRow() { + assertEquals(0.0f, create(Bytes.toBytes("a"), null).getProgress(Bytes.toBytes("m"))); + } + + @Test + public void testEmptyStopRow() { + assertEquals(0.0f, create(Bytes.toBytes("a"), new byte[0]).getProgress(Bytes.toBytes("m"))); + } + + @Test + public void testMidpoint() { + assertEquals(0.5f, create(new byte[] { 0x00 }, new byte[] { (byte) 0xFF }) + .getProgress(new byte[] { (byte) 0x80 }), 0.01f); + } + + @Test + public void testAtStart() { + assertEquals(0.0f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("aaa")), 0.001f); + } + + @Test + public void testNearMid() { + assertEquals(0.48f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("mmm")), 0.01f); + } + + @Test + public void testNearEnd() { + assertEquals(1.0f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("zzy")), 0.01f); + } + + @Test + public void testEmptyStartRow() { + assertEquals(0.5f, + create(new byte[0], new byte[] { (byte) 0xFF }).getProgress(new byte[] { (byte) 0x80 }), + 0.01f); + } + + @Test + public void testNullStartRow() { + assertEquals(0.5f, + create(null, new byte[] { (byte) 0xFF }).getProgress(new byte[] { (byte) 0x80 }), 0.01f); + } + + @Test + public void testProgressNeverExceedsOne() { + assertEquals(1.0f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("mmm")).getProgress(Bytes.toBytes("zzz"))); + } + + @Test + public void testProgressNeverBelowZero() { + assertEquals(0.0f, + create(Bytes.toBytes("mmm"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("aaa"))); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java new file mode 100644 index 000000000000..9459a15d996a --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag(MapReduceTests.TAG) +@Tag(SmallTests.TAG) +public class TestHexPrefixRowKeyProgress { + private static RowKeyProgress create(byte[] start, byte[] stop) { + HexPrefixRowKeyProgress p = new HexPrefixRowKeyProgress(); + p.setStartStopRows(start, stop); + return p; + } + + private static RowKeyProgress createWithPrefixLength(byte[] start, byte[] stop, + int prefixLength) { + Configuration conf = new Configuration(false); + conf.setInt(HexPrefixRowKeyProgress.PREFIX_LENGTH_KEY, prefixLength); + HexPrefixRowKeyProgress p = new HexPrefixRowKeyProgress(); + p.setConf(conf); + p.setStartStopRows(start, stop); + return p; + } + + @Test + public void testNullCurrentRow() { + assertEquals(0.0f, create(Bytes.toBytes("00"), Bytes.toBytes("ff")).getProgress(null)); + } + + @Test + public void testMidpoint() { + assertEquals(0.5f, + create(Bytes.toBytes("0000"), Bytes.toBytes("ffff")).getProgress(Bytes.toBytes("8000")), + 0.01f); + } + + @Test + public void testQuarterPoint() { + assertEquals(0.25f, + create(Bytes.toBytes("0000"), Bytes.toBytes("ffff")).getProgress(Bytes.toBytes("4000")), + 0.01f); + } + + @Test + public void testAcross9ToAGap() { + RowKeyProgress p = createWithPrefixLength(Bytes.toBytes("00"), Bytes.toBytes("ff"), 2); + float at0f = p.getProgress(Bytes.toBytes("0f")); + float at10 = p.getProgress(Bytes.toBytes("10")); + assertEquals(1.0f / 255, at10 - at0f, 0.001f); + } + + @Test + public void testProgressNeverExceedsOne() { + assertEquals(1.0f, + create(Bytes.toBytes("0000"), Bytes.toBytes("8000")).getProgress(Bytes.toBytes("ffff"))); + } + + @Test + public void testProgressNeverBelowZero() { + assertEquals(0.0f, + create(Bytes.toBytes("8000"), Bytes.toBytes("ffff")).getProgress(Bytes.toBytes("0000"))); + } + + @Test + public void testNonHexSuffixIgnored() { + RowKeyProgress p = createWithPrefixLength(Bytes.toBytes("00"), Bytes.toBytes("ff"), 2); + float progressA = p.getProgress(Bytes.toBytes("80:dataA")); + float progressB = p.getProgress(Bytes.toBytes("80:dataZ")); + assertEquals(progressA, progressB); + } + + @Test + public void testMonotonicWithMixedSuffix() { + RowKeyProgress p = createWithPrefixLength(Bytes.toBytes("00"), Bytes.toBytes("ff"), 2); + float at0f = p.getProgress(Bytes.toBytes("0f:zzz")); + float at10 = p.getProgress(Bytes.toBytes("10:aaa")); + assertTrue(at10 > at0f); + } +} From 400c39b3c0100068be2f77fd2751ea8dc24fcc54 Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Mon, 4 May 2026 20:22:57 +0900 Subject: [PATCH 2/5] HBASE-30115 Align names with RegionSplitter and drop hex prefix length config --- ...ress.java => HexStringRowKeyProgress.java} | 54 +++++++++---------- .../mapreduce/TableRecordReaderImpl.java | 2 +- ...ogress.java => UniformRowKeyProgress.java} | 2 +- ....java => TestHexStringRowKeyProgress.java} | 21 ++------ ...ss.java => TestUniformRowKeyProgress.java} | 4 +- 5 files changed, 34 insertions(+), 49 deletions(-) rename hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/{HexPrefixRowKeyProgress.java => HexStringRowKeyProgress.java} (62%) rename hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/{ByteBasedRowKeyProgress.java => UniformRowKeyProgress.java} (97%) rename hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/{TestHexPrefixRowKeyProgress.java => TestHexStringRowKeyProgress.java} (78%) rename hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/{TestByteBasedRowKeyProgress.java => TestUniformRowKeyProgress.java} (96%) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java similarity index 62% rename from hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java rename to hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java index b85a6bddbd9d..85edf368e454 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexPrefixRowKeyProgress.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java @@ -17,46 +17,35 @@ */ package org.apache.hadoop.hbase.mapreduce; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; /** - * {@link RowKeyProgress} implementation for tables whose row keys start with a hex-encoded prefix - * (e.g. MD5 hashes like {@code "a3f2b1..."}). Only the hex prefix is used for progress estimation; - * bytes beyond the prefix length are ignored. - *

- * The prefix length is configurable via {@link #PREFIX_LENGTH_KEY} and defaults to - * {@link #DEFAULT_PREFIX_LENGTH}. - *

- * Configure via: - * - *

- * conf.setClass("hbase.mapreduce.rowkey.progress.class", HexPrefixRowKeyProgress.class,
- *   RowKeyProgress.class);
- * conf.setInt("hbase.mapreduce.rowkey.progress.hex.prefix.length", 8);
- * 
+ * {@link RowKeyProgress} implementation for hex-encoded row keys (e.g. MD5/SHA prefixes). Non-hex + * bytes contribute zero. */ @InterfaceAudience.Public -public class HexPrefixRowKeyProgress extends Configured implements RowKeyProgress { - public static final String PREFIX_LENGTH_KEY = - "hbase.mapreduce.rowkey.progress.hex.prefix.length"; - public static final int DEFAULT_PREFIX_LENGTH = 4; +public class HexStringRowKeyProgress implements RowKeyProgress { + /** + * Cap on hex characters interpreted. A {@code double} mantissa carries ~53 bits (~13 hex chars); + * reading more adds no information and risks precision loss. + */ + private static final int MAX_PREFIX_LENGTH = 13; + + /** + * Hex characters past the start/stop divergence point to include for resolution. 4 hex chars = 65 + * 536 buckets, finer than any progress bar can display. + */ + private static final int RESOLUTION_PADDING = 4; - private int prefixLength = DEFAULT_PREFIX_LENGTH; + private int prefixLength; private double start; private double stop; - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - if (conf != null) { - this.prefixLength = conf.getInt(PREFIX_LENGTH_KEY, DEFAULT_PREFIX_LENGTH); - } - } - @Override public void setStartStopRows(byte[] startRow, byte[] stopRow) { + int common = commonPrefixLength(startRow, stopRow); + this.prefixLength = Math.min(common + RESOLUTION_PADDING, MAX_PREFIX_LENGTH); this.start = hexPrefixToDouble(startRow); this.stop = hexPrefixToDouble(stopRow); } @@ -71,6 +60,13 @@ public float getProgress(byte[] currentRow) { return Math.min(1.0f, Math.max(0.0f, progress)); } + private static int commonPrefixLength(byte[] a, byte[] b) { + if (a == null || b == null) { + return 0; + } + return Bytes.findCommonPrefix(a, b, a.length, b.length, 0, 0); + } + private double hexPrefixToDouble(byte[] row) { if (row == null) { return 0; diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 1fbef4bb4a2a..337edef848b5 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -164,7 +164,7 @@ private void initProgressBounds() { } Configuration conf = context.getConfiguration(); Class progressClass = conf.getClass(RowKeyProgress.PROGRESS_CLASS_KEY, - ByteBasedRowKeyProgress.class, RowKeyProgress.class); + UniformRowKeyProgress.class, RowKeyProgress.class); rowKeyProgress = ReflectionUtils.newInstance(progressClass, conf); rowKeyProgress.setStartStopRows(startRow, stopRow); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/UniformRowKeyProgress.java similarity index 97% rename from hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java rename to hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/UniformRowKeyProgress.java index 272a3910b46d..80928cc63225 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ByteBasedRowKeyProgress.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/UniformRowKeyProgress.java @@ -25,7 +25,7 @@ * of the key space. */ @InterfaceAudience.Public -public class ByteBasedRowKeyProgress implements RowKeyProgress { +public class UniformRowKeyProgress implements RowKeyProgress { private static final int BYTES_FOR_PROGRESS = Double.BYTES; private double start; diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java similarity index 78% rename from hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java rename to hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java index 9459a15d996a..2ab3ec2923e7 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexPrefixRowKeyProgress.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java @@ -20,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -29,19 +28,9 @@ @Tag(MapReduceTests.TAG) @Tag(SmallTests.TAG) -public class TestHexPrefixRowKeyProgress { +public class TestHexStringRowKeyProgress { private static RowKeyProgress create(byte[] start, byte[] stop) { - HexPrefixRowKeyProgress p = new HexPrefixRowKeyProgress(); - p.setStartStopRows(start, stop); - return p; - } - - private static RowKeyProgress createWithPrefixLength(byte[] start, byte[] stop, - int prefixLength) { - Configuration conf = new Configuration(false); - conf.setInt(HexPrefixRowKeyProgress.PREFIX_LENGTH_KEY, prefixLength); - HexPrefixRowKeyProgress p = new HexPrefixRowKeyProgress(); - p.setConf(conf); + HexStringRowKeyProgress p = new HexStringRowKeyProgress(); p.setStartStopRows(start, stop); return p; } @@ -67,7 +56,7 @@ public void testQuarterPoint() { @Test public void testAcross9ToAGap() { - RowKeyProgress p = createWithPrefixLength(Bytes.toBytes("00"), Bytes.toBytes("ff"), 2); + RowKeyProgress p = create(Bytes.toBytes("00"), Bytes.toBytes("ff")); float at0f = p.getProgress(Bytes.toBytes("0f")); float at10 = p.getProgress(Bytes.toBytes("10")); assertEquals(1.0f / 255, at10 - at0f, 0.001f); @@ -87,7 +76,7 @@ public void testProgressNeverBelowZero() { @Test public void testNonHexSuffixIgnored() { - RowKeyProgress p = createWithPrefixLength(Bytes.toBytes("00"), Bytes.toBytes("ff"), 2); + RowKeyProgress p = create(Bytes.toBytes("00"), Bytes.toBytes("ff")); float progressA = p.getProgress(Bytes.toBytes("80:dataA")); float progressB = p.getProgress(Bytes.toBytes("80:dataZ")); assertEquals(progressA, progressB); @@ -95,7 +84,7 @@ public void testNonHexSuffixIgnored() { @Test public void testMonotonicWithMixedSuffix() { - RowKeyProgress p = createWithPrefixLength(Bytes.toBytes("00"), Bytes.toBytes("ff"), 2); + RowKeyProgress p = create(Bytes.toBytes("00"), Bytes.toBytes("ff")); float at0f = p.getProgress(Bytes.toBytes("0f:zzz")); float at10 = p.getProgress(Bytes.toBytes("10:aaa")); assertTrue(at10 > at0f); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestUniformRowKeyProgress.java similarity index 96% rename from hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java rename to hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestUniformRowKeyProgress.java index 054040bfa76e..b50db0b29fcc 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestByteBasedRowKeyProgress.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestUniformRowKeyProgress.java @@ -27,9 +27,9 @@ @Tag(MapReduceTests.TAG) @Tag(SmallTests.TAG) -public class TestByteBasedRowKeyProgress { +public class TestUniformRowKeyProgress { private static RowKeyProgress create(byte[] start, byte[] stop) { - RowKeyProgress p = new ByteBasedRowKeyProgress(); + RowKeyProgress p = new UniformRowKeyProgress(); p.setStartStopRows(start, stop); return p; } From 22c6baa24d43233ad3d83e110091ab3ef457eccc Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Wed, 6 May 2026 12:29:15 +0900 Subject: [PATCH 3/5] HBASE-30115 Fix failing tests --- .../apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 337edef848b5..32ef7e6f81c2 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -154,6 +154,9 @@ public void initialize(InputSplit inputsplit, TaskAttemptContext context) * discover the actual first or last row key as an approximation. */ private void initProgressBounds() { + if (context == null) { + return; + } byte[] startRow = scan.getStartRow(); byte[] stopRow = scan.getStopRow(); if (startRow == null || startRow.length == 0) { From efb99ab4944ed005d3aba5099ee6fbe686deb983 Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Wed, 6 May 2026 20:39:33 +0900 Subject: [PATCH 4/5] HBASE-30115 Consider reverse scan start row --- .../apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 32ef7e6f81c2..678ccaab73c4 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.ConnectionConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -189,6 +190,9 @@ private byte[] probeFirstRow() { private byte[] probeLastRow() { try { Scan probeScan = new Scan(scan); + // Only called for the last region, so swap row bounds for the reversed scan. + probeScan.withStartRow(HConstants.EMPTY_START_ROW); + probeScan.withStopRow(scan.getStartRow(), scan.includeStartRow()); probeScan.setReversed(true); probeScan.setOneRowLimit(); try (ResultScanner probeScanner = htable.getScanner(probeScan)) { From 00523d1f7eb1be124d14e048c1d2f4cce00b49cc Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Wed, 6 May 2026 20:41:58 +0900 Subject: [PATCH 5/5] HBASE-30115 Handle capital hex alphabets --- .../hadoop/hbase/mapreduce/HexStringRowKeyProgress.java | 7 +++++-- .../hbase/mapreduce/TestHexStringRowKeyProgress.java | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java index 85edf368e454..755e0cb9e934 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java @@ -33,8 +33,8 @@ public class HexStringRowKeyProgress implements RowKeyProgress { private static final int MAX_PREFIX_LENGTH = 13; /** - * Hex characters past the start/stop divergence point to include for resolution. 4 hex chars = 65 - * 536 buckets, finer than any progress bar can display. + * Hex characters past the start/stop divergence point to include for resolution. 4 hex chars = + * 65,536 buckets, finer than any progress bar can display. */ private static final int RESOLUTION_PADDING = 4; @@ -89,6 +89,9 @@ private static int hexCharToInt(byte b) { if (b >= 'a' && b <= 'f') { return 10 + (b - 'a'); } + if (b >= 'A' && b <= 'F') { + return 10 + (b - 'A'); + } return 0; } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java index 2ab3ec2923e7..f0dd27a3f90f 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java @@ -89,4 +89,12 @@ public void testMonotonicWithMixedSuffix() { float at10 = p.getProgress(Bytes.toBytes("10:aaa")); assertTrue(at10 > at0f); } + + @Test + public void testUppercaseHexEquivalentToLowercase() { + RowKeyProgress lower = create(Bytes.toBytes("0000"), Bytes.toBytes("ffff")); + RowKeyProgress upper = create(Bytes.toBytes("0000"), Bytes.toBytes("FFFF")); + assertEquals(lower.getProgress(Bytes.toBytes("deadbeef")), + upper.getProgress(Bytes.toBytes("DEADBEEF")), 0.0001f); + } }