diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java new file mode 100644 index 000000000000..755e0cb9e934 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HexStringRowKeyProgress.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * {@link RowKeyProgress} implementation for hex-encoded row keys (e.g. MD5/SHA prefixes). Non-hex + * bytes contribute zero. + */ +@InterfaceAudience.Public +public class HexStringRowKeyProgress implements RowKeyProgress { + /** + * Cap on hex characters interpreted. A {@code double} mantissa carries ~53 bits (~13 hex chars); + * reading more adds no information and risks precision loss. + */ + private static final int MAX_PREFIX_LENGTH = 13; + + /** + * Hex characters past the start/stop divergence point to include for resolution. 4 hex chars = + * 65,536 buckets, finer than any progress bar can display. 
+ */ + private static final int RESOLUTION_PADDING = 4; + + private int prefixLength; + private double start; + private double stop; + + @Override + public void setStartStopRows(byte[] startRow, byte[] stopRow) { + int common = commonPrefixLength(startRow, stopRow); + this.prefixLength = Math.min(common + RESOLUTION_PADDING, MAX_PREFIX_LENGTH); + this.start = hexPrefixToDouble(startRow); + this.stop = hexPrefixToDouble(stopRow); + } + + @Override + public float getProgress(byte[] currentRow) { + if (currentRow == null || stop <= start) { + return 0.0f; + } + double current = hexPrefixToDouble(currentRow); + float progress = (float) ((current - start) / (stop - start)); + return Math.min(1.0f, Math.max(0.0f, progress)); + } + + private static int commonPrefixLength(byte[] a, byte[] b) { + if (a == null || b == null) { + return 0; + } + return Bytes.findCommonPrefix(a, b, a.length, b.length, 0, 0); + } + + private double hexPrefixToDouble(byte[] row) { + if (row == null) { + return 0; + } + int len = Math.min(prefixLength, row.length); + double d = 0; + for (int i = 0; i < prefixLength; i++) { + d *= 16; + if (i < len) { + d += hexCharToInt(row[i]); + } + } + return d; + } + + private static int hexCharToInt(byte b) { + if (b >= '0' && b <= '9') { + return b - '0'; + } + if (b >= 'a' && b <= 'f') { + return 10 + (b - 'a'); + } + if (b >= 'A' && b <= 'F') { + return 10 + (b - 'A'); + } + return 0; + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java new file mode 100644 index 000000000000..5ec029fdba20 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowKeyProgress.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Estimates scan progress based on row key positions within a start/stop range. Custom + * implementations can be plugged in via {@link RowKeyProgress#PROGRESS_CLASS_KEY}. + */ +@InterfaceAudience.Public +public interface RowKeyProgress { + String PROGRESS_CLASS_KEY = "hbase.mapreduce.rowkey.progress.class"; + + /** + * Initialize the progress estimator with the start and stop row keys. + * @param startRow the start row of the scan (inclusive), may be null or empty + * @param stopRow the stop row of the scan (exclusive), may be null or empty + */ + void setStartStopRows(byte[] startRow, byte[] stopRow); + + /** + * Estimate progress as a fraction between 0.0 and 1.0 based on where {@code currentRow} falls in + * the range. 
+ * @param currentRow the last successfully read row key, or null if no row has been read yet + * @return estimated progress between 0.0 and 1.0, or 0.0 if progress cannot be estimated + */ + float getProgress(byte[] currentRow); +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 7d0ffe02e6f0..678ccaab73c4 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.ConnectionConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -66,6 +68,7 @@ public class TableRecordReaderImpl { private int rowcount; private boolean logScannerActivity = false; private int logPerRowCount = 100; + private RowKeyProgress rowKeyProgress = null; /** * Restart from survivable exceptions by creating a new scanner. @@ -141,9 +144,67 @@ public void initialize(InputSplit inputsplit, TaskAttemptContext context) if (context != null) { this.context = context; } + initProgressBounds(); restart(scan.getStartRow()); } + /** + * Resolve the start/stop row keys used for progress estimation. 
The TableInputFormat splitter + sets start and stop row keys from region boundaries, so they are only empty for the table's + very first region (empty start) or last region (empty stop). In those cases, probe the table to + discover the actual first or last row key as an approximation. + */ + private void initProgressBounds() { + if (context == null) { + return; + } + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + if (startRow == null || startRow.length == 0) { + startRow = probeFirstRow(); + } + if (stopRow == null || stopRow.length == 0) { + stopRow = probeLastRow(); + } + Configuration conf = context.getConfiguration(); + Class<? extends RowKeyProgress> progressClass = conf.getClass(RowKeyProgress.PROGRESS_CLASS_KEY, + UniformRowKeyProgress.class, RowKeyProgress.class); + rowKeyProgress = ReflectionUtils.newInstance(progressClass, conf); + rowKeyProgress.setStartStopRows(startRow, stopRow); + } + + private byte[] probeFirstRow() { + try { + Scan probeScan = new Scan(scan); + probeScan.setOneRowLimit(); + try (ResultScanner probeScanner = htable.getScanner(probeScan)) { + Result result = probeScanner.next(); + return result != null ? result.getRow() : null; + } + } catch (IOException e) { + LOG.warn("Failed to probe first row for progress estimation", e); + return null; + } + } + + private byte[] probeLastRow() { + try { + Scan probeScan = new Scan(scan); + // Only called for the last region, so swap row bounds for the reversed scan. + probeScan.withStartRow(HConstants.EMPTY_START_ROW); + probeScan.withStopRow(scan.getStartRow(), scan.includeStartRow()); + probeScan.setReversed(true); + probeScan.setOneRowLimit(); + try (ResultScanner probeScanner = htable.getScanner(probeScan)) { + Result result = probeScanner.next(); + return result != null ? result.getRow() : null; + } + } catch (IOException e) { + LOG.warn("Failed to probe last row for progress estimation", e); + return null; + } + } + + /** + * Closes the split. 
*/ @@ -318,8 +379,10 @@ protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRes * @return A number between 0.0 and 1.0, the fraction of the data read. */ public float getProgress() { - // Depends on the total number of tuples - return 0; + if (rowKeyProgress == null) { + return 0; + } + return rowKeyProgress.getProgress(lastSuccessfulRow); } } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/UniformRowKeyProgress.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/UniformRowKeyProgress.java new file mode 100644 index 000000000000..80928cc63225 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/UniformRowKeyProgress.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * {@link RowKeyProgress} implementation that treats row keys as raw byte sequences. Converts the + * leading bytes to a big-endian unsigned numeric value and computes progress as a linear fraction + * of the key space. 
+ */ +@InterfaceAudience.Public +public class UniformRowKeyProgress implements RowKeyProgress { + private static final int BYTES_FOR_PROGRESS = Double.BYTES; + + private double start; + private double stop; + + @Override + public void setStartStopRows(byte[] startRow, byte[] stopRow) { + this.start = rowKeyToDouble(startRow); + this.stop = rowKeyToDouble(stopRow); + } + + @Override + public float getProgress(byte[] currentRow) { + if (currentRow == null || stop <= start) { + return 0.0f; + } + double current = rowKeyToDouble(currentRow); + float progress = (float) ((current - start) / (stop - start)); + return Math.min(1.0f, Math.max(0.0f, progress)); + } + + /** + * Interpret the leading bytes of a row key as an unsigned big-endian value. Keys shorter than + * {@link #BYTES_FOR_PROGRESS} bytes are treated as if right-padded with zeros. + */ + private static double rowKeyToDouble(byte[] row) { + if (row == null) { + return 0; + } + double d = 0; + for (int i = 0; i < BYTES_FOR_PROGRESS; i++) { + d *= 256; // shift left by one byte (2^8) to build a big-endian base-256 number + if (i < row.length) { + d += (row[i] & 0xFF); + } + } + return d; + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java new file mode 100644 index 000000000000..f0dd27a3f90f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHexStringRowKeyProgress.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag(MapReduceTests.TAG) +@Tag(SmallTests.TAG) +public class TestHexStringRowKeyProgress { + private static RowKeyProgress create(byte[] start, byte[] stop) { + HexStringRowKeyProgress p = new HexStringRowKeyProgress(); + p.setStartStopRows(start, stop); + return p; + } + + @Test + public void testNullCurrentRow() { + assertEquals(0.0f, create(Bytes.toBytes("00"), Bytes.toBytes("ff")).getProgress(null)); + } + + @Test + public void testMidpoint() { + assertEquals(0.5f, + create(Bytes.toBytes("0000"), Bytes.toBytes("ffff")).getProgress(Bytes.toBytes("8000")), + 0.01f); + } + + @Test + public void testQuarterPoint() { + assertEquals(0.25f, + create(Bytes.toBytes("0000"), Bytes.toBytes("ffff")).getProgress(Bytes.toBytes("4000")), + 0.01f); + } + + @Test + public void testAcross9ToAGap() { + RowKeyProgress p = create(Bytes.toBytes("00"), Bytes.toBytes("ff")); + float at0f = p.getProgress(Bytes.toBytes("0f")); + float at10 = p.getProgress(Bytes.toBytes("10")); + assertEquals(1.0f / 255, at10 - at0f, 0.001f); + } + + @Test + public void testProgressNeverExceedsOne() { + assertEquals(1.0f, + create(Bytes.toBytes("0000"), 
Bytes.toBytes("8000")).getProgress(Bytes.toBytes("ffff"))); + } + + @Test + public void testProgressNeverBelowZero() { + assertEquals(0.0f, + create(Bytes.toBytes("8000"), Bytes.toBytes("ffff")).getProgress(Bytes.toBytes("0000"))); + } + + @Test + public void testNonHexSuffixIgnored() { + RowKeyProgress p = create(Bytes.toBytes("00"), Bytes.toBytes("ff")); + float progressA = p.getProgress(Bytes.toBytes("80:dataA")); + float progressB = p.getProgress(Bytes.toBytes("80:dataZ")); + assertEquals(progressA, progressB); + } + + @Test + public void testMonotonicWithMixedSuffix() { + RowKeyProgress p = create(Bytes.toBytes("00"), Bytes.toBytes("ff")); + float at0f = p.getProgress(Bytes.toBytes("0f:zzz")); + float at10 = p.getProgress(Bytes.toBytes("10:aaa")); + assertTrue(at10 > at0f); + } + + @Test + public void testUppercaseHexEquivalentToLowercase() { + RowKeyProgress lower = create(Bytes.toBytes("0000"), Bytes.toBytes("ffff")); + RowKeyProgress upper = create(Bytes.toBytes("0000"), Bytes.toBytes("FFFF")); + assertEquals(lower.getProgress(Bytes.toBytes("deadbeef")), + upper.getProgress(Bytes.toBytes("DEADBEEF")), 0.0001f); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestUniformRowKeyProgress.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestUniformRowKeyProgress.java new file mode 100644 index 000000000000..b50db0b29fcc --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestUniformRowKeyProgress.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag(MapReduceTests.TAG) +@Tag(SmallTests.TAG) +public class TestUniformRowKeyProgress { + private static RowKeyProgress create(byte[] start, byte[] stop) { + RowKeyProgress p = new UniformRowKeyProgress(); + p.setStartStopRows(start, stop); + return p; + } + + @Test + public void testNullCurrentRow() { + assertEquals(0.0f, create(Bytes.toBytes("a"), Bytes.toBytes("z")).getProgress(null)); + } + + @Test + public void testNullStopRow() { + assertEquals(0.0f, create(Bytes.toBytes("a"), null).getProgress(Bytes.toBytes("m"))); + } + + @Test + public void testEmptyStopRow() { + assertEquals(0.0f, create(Bytes.toBytes("a"), new byte[0]).getProgress(Bytes.toBytes("m"))); + } + + @Test + public void testMidpoint() { + assertEquals(0.5f, create(new byte[] { 0x00 }, new byte[] { (byte) 0xFF }) + .getProgress(new byte[] { (byte) 0x80 }), 0.01f); + } + + @Test + public void testAtStart() { + assertEquals(0.0f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("aaa")), 0.001f); + } + + @Test + public void testNearMid() { + assertEquals(0.48f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("mmm")), 0.01f); + } + + @Test + public void testNearEnd() { 
+ assertEquals(1.0f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("zzy")), 0.01f); + } + + @Test + public void testEmptyStartRow() { + assertEquals(0.5f, + create(new byte[0], new byte[] { (byte) 0xFF }).getProgress(new byte[] { (byte) 0x80 }), + 0.01f); + } + + @Test + public void testNullStartRow() { + assertEquals(0.5f, + create(null, new byte[] { (byte) 0xFF }).getProgress(new byte[] { (byte) 0x80 }), 0.01f); + } + + @Test + public void testProgressNeverExceedsOne() { + assertEquals(1.0f, + create(Bytes.toBytes("aaa"), Bytes.toBytes("mmm")).getProgress(Bytes.toBytes("zzz"))); + } + + @Test + public void testProgressNeverBelowZero() { + assertEquals(0.0f, + create(Bytes.toBytes("mmm"), Bytes.toBytes("zzz")).getProgress(Bytes.toBytes("aaa"))); + } +}