diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index c71a42aea5d1..6b020ffe1d22 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; @@ -129,8 +130,8 @@ TableSnapshotInputFormatImpl.InputSplit getDelegate() { } } - @InterfaceAudience.Private - static class TableSnapshotRegionRecordReader + @InterfaceAudience.Public + public static class TableSnapshotRegionRecordReader extends RecordReader { private TableSnapshotInputFormatImpl.RecordReader delegate = new TableSnapshotInputFormatImpl.RecordReader(); @@ -170,6 +171,14 @@ public float getProgress() throws IOException, InterruptedException { return delegate.getProgress(); } + /** + * Returns the set of store file paths that were successfully read by the underlying region + * scanner. Typically populated after the reader (or scanner) is closed. 
+ */ + public Set getFilesRead() { + return delegate.getScanner().getFilesRead(); + } + @Override public void close() throws IOException { delegate.close(); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 501209f1c902..633a9b25cdd2 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -562,7 +562,7 @@ public static List getBestLocations(Configuration conf, return getBestLocations(conf, blockDistribution, 3); } - private static String getSnapshotName(Configuration conf) { + public static String getSnapshotName(Configuration conf) { String snapshotName = conf.get(SNAPSHOT_NAME_KEY); if (snapshotName == null) { throw new IllegalArgumentException("Snapshot name must be provided"); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index c24f8e62c816..2242cabe3a86 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -29,8 +29,12 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -44,7 +48,9 @@ import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.Table; import 
org.apache.hadoop.hbase.client.TestTableSnapshotScanner; +import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionRecordReader; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; @@ -142,6 +148,97 @@ public void testGetBestLocations() throws IOException { TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); } + @Test + public void testTableSnapshotRegionRecordReaderGetFilesRead() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + String snapshotName = name.getMethodName() + "_snapshot"; + try { + // Setup: create table, load data, snapshot, and configure job with restore dir + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1); + + Configuration conf = UTIL.getConfiguration(); + Job job = new Job(conf); + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false, + tmpTableDir); + + // Get splits (one per region) and extract delegate split for restore path and region info + TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); + List splits = tsif.getSplits(job); + Assert.assertEquals(1, splits.size()); + + InputSplit split = splits.get(0); + Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split; + TableSnapshotInputFormatImpl.InputSplit implSplit = snapshotRegionSplit.getDelegate(); + + // Collect expected store file paths from the 
restored region directory + Set expectedFiles = new HashSet<>(); + Path restorePath = new Path(implSplit.getRestoreDir()); + FileSystem fs = restorePath.getFileSystem(conf); + Path tableDir = + CommonFSUtils.getTableDir(restorePath, implSplit.getTableDescriptor().getTableName()); + Path regionPath = new Path(tableDir, implSplit.getRegionInfo().getEncodedName()); + FileStatus[] familyDirs = fs.listStatus(regionPath); + if (familyDirs != null) { + for (FileStatus fam : familyDirs) { + if (fam.isDirectory()) { + FileStatus[] files = fs.listStatus(fam.getPath()); + if (files != null) { + for (FileStatus f : files) { + if (f.isFile()) { + String referenceFileName = f.getPath().getName(); + expectedFiles.add(HFileLink.getReferencedHFileName(referenceFileName)); + } + } + } + } + } + } + Assert.assertFalse("Should have at least one store file after snapshot restore", + expectedFiles.isEmpty()); + + // Create record reader, initialize with split (opens underlying ClientSideRegionScanner) + TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); + when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration()); + + RecordReader rr = + tsif.createRecordReader(split, taskAttemptContext); + Assert.assertTrue(rr instanceof TableSnapshotRegionRecordReader); + TableSnapshotRegionRecordReader recordReader = (TableSnapshotRegionRecordReader) rr; + recordReader.initialize(split, taskAttemptContext); + + // Before close: getFilesRead() must be empty + Set filesReadBeforeClose = recordReader.getFilesRead(); + Assert.assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty()); + + // Read a few key-values; getFilesRead() must still be empty until close + int count = 0; + while (count < 3 && recordReader.nextKeyValue()) { + count++; + } + + filesReadBeforeClose = recordReader.getFilesRead(); + Assert.assertTrue("Should return empty set before closing even after reading", + filesReadBeforeClose.isEmpty()); + + // Close reader so 
underlying scanner reports files successfully read + recordReader.close(); + + // After close: getFilesRead() must match expected store file set + Set filesReadAfterClose = recordReader.getFilesRead().stream() + .map(Path::getName).collect(Collectors.toSet()); + + Assert.assertEquals("Should contain all expected file paths", expectedFiles, + filesReadAfterClose); + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + } + } + public static enum TestTableSnapshotCounters { VALIDATION_ERROR } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index eb7c77554b02..3c43bea19d8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -19,7 +19,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -53,6 +56,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner { RegionScanner scanner; List values; boolean hasMore = true; + private final Set filesRead; public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { @@ -85,6 +89,7 @@ public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, // create an internal region scanner this.scanner = region.getScanner(scan); + this.filesRead = new HashSet<>(); values = new ArrayList<>(); if (scanMetrics == null) { @@ -129,6 +134,7 @@ public void close() { if (this.scanner != null) { try { this.scanner.close(); + 
this.filesRead.addAll(this.scanner.getFilesRead()); this.scanner = null; } catch (IOException ex) { LOG.warn("Exception while closing scanner", ex); @@ -162,6 +168,14 @@ HRegion getRegion() { return region; } + /** + * Returns the set of store file paths that were successfully read by the underlying region + * scanner. Populated when this scanner is closed. + */ + public Set getFilesRead() { + return Collections.unmodifiableSet(this.filesRead); + } + @Override public boolean renewLease() { throw new UnsupportedOperationException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java index fe66535ee55e..116008b7a6b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCell.java @@ -19,6 +19,9 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; @@ -63,6 +66,14 @@ public ExtendedCell getCell() { return cell; } + /** + * Returns the set of file paths successfully read by the underlying MOB store file scanner. + * Should be called after {@link #close()} to get the path of the MOB file that was read. + */ + public Set getFilesRead() { + return sfScanner != null ? 
sfScanner.getFilesRead() : Collections.emptySet(); + } + @Override public void close() throws IOException { if (this.sfScanner != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 6fd030c13c25..057b7cb1201f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -19,10 +19,14 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; +import java.util.Set; import java.util.function.IntConsumer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.ExtendedCell; @@ -66,6 +70,8 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner protected KVScannerComparator comparator; + private final Set filesRead = new HashSet<>(); + /** * Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners. */ @@ -216,19 +222,31 @@ public CellComparator getComparator() { public void close() { for (KeyValueScanner scanner : this.scannersForDelayedClose) { scanner.close(); + filesRead.addAll(scanner.getFilesRead()); } this.scannersForDelayedClose.clear(); if (this.current != null) { this.current.close(); + filesRead.addAll(this.current.getFilesRead()); } if (this.heap != null) { // Order of closing the scanners shouldn't matter here, so simply iterate and close them. for (KeyValueScanner scanner : heap) { scanner.close(); + filesRead.addAll(scanner.getFilesRead()); } } } + /** + * Returns the set of store file paths successfully read by the scanners in this heap. Populated + * as each scanner is closed (e.g. in close() or shipped()). 
+ */ + @Override + public Set getFilesRead() { + return Collections.unmodifiableSet(filesRead); + } + /** * Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end * up skipping values that were never reached yet. Rather than iterating down, we want to give the @@ -419,6 +437,7 @@ public ExtendedCell getNextIndexedKey() { public void shipped() throws IOException { for (KeyValueScanner scanner : this.scannersForDelayedClose) { scanner.close(); // There wont be further fetch of Cells from these scanners. Just close. + filesRead.addAll(scanner.getFilesRead()); } this.scannersForDelayedClose.clear(); if (this.current != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index bfe47772f1aa..564a374498a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Set; import java.util.function.IntConsumer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ExtendedCell; @@ -139,6 +140,12 @@ default long getScannerOrder() { */ Path getFilePath(); + /** + * Returns the set of store file paths that were successfully read by this scanner. Typically + * populated only after the scanner is closed. 
+ */ + Set getFilesRead(); + // Support for "Reversed Scanner" /** * Seek the scanner at or before the row of specified Cell, it firstly tries to seek the scanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java index 0d6cfb2b2112..48abf0660dee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java @@ -19,8 +19,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.NavigableSet; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mob.MobCell; @@ -43,6 +47,7 @@ public class MobStoreScanner extends StoreScanner { private boolean readEmptyValueOnMobCellMiss = false; private final HMobStore mobStore; private final List referencedMobCells; + private final Set mobFilesRead = new HashSet<>(); public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, final NavigableSet columns, long readPt) throws IOException { @@ -94,10 +99,23 @@ public boolean next(List outResult, ScannerContext ctx) th private void freeAllReferencedMobCells() throws IOException { for (MobCell cell : referencedMobCells) { cell.close(); + mobFilesRead.addAll(cell.getFilesRead()); } referencedMobCells.clear(); } + /** + * Returns the set of store file paths and MOB file paths successfully read by this scanner. + * Combines paths from the underlying store scanner with paths from resolved MOB cells (populated + * when referenced mob cells are closed, e.g. in close() or shipped()). 
+ */ + @Override + public Set getFilesRead() { + Set allFiles = new HashSet<>(super.getFilesRead()); + allFiles.addAll(mobFilesRead); + return Collections.unmodifiableSet(allFiles); + } + @Override public void shipped() throws IOException { super.shipped(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index f55bbcc639de..79e5cf91e336 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collections; +import java.util.Set; import java.util.function.IntConsumer; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.fs.Path; @@ -76,6 +78,15 @@ public Path getFilePath() { return null; } + /** + * Returns the set of store file paths successfully read by this scanner. Default implementation + * returns an empty set for non-file scanners (e.g. memstore). 
+ */ + @Override + public Set getFilesRead() { + return Collections.emptySet(); + } + @Override public ExtendedCell getNextIndexedKey() { return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index aed08ebd84d1..a9693585c3e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; @@ -111,4 +113,10 @@ default String getOperationId() { */ boolean nextRaw(List result, ScannerContext scannerContext) throws IOException; + + /** + * Returns the set of store file paths that were successfully read by this scanner. Typically + * populated only after the scanner is closed. 
+ */ + Set getFilesRead(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index c69dc6e2df6a..05787dc73f38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -21,11 +21,15 @@ import java.util.AbstractList; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; @@ -95,6 +99,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { private RegionServerServices rsServices; + private final Set filesRead = new HashSet<>(); + @Override public RegionInfo getRegionInfo() { return region.getRegionInfo(); @@ -760,10 +766,12 @@ protected boolean shouldStop(Cell currentRowCell) { private void closeInternal() { if (storeHeap != null) { storeHeap.close(); + filesRead.addAll(storeHeap.getFilesRead()); storeHeap = null; } if (joinedHeap != null) { joinedHeap.close(); + filesRead.addAll(joinedHeap.getFilesRead()); joinedHeap = null; } // no need to synchronize here. @@ -776,6 +784,15 @@ public synchronized void close() { TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close")); } + /** + * Returns the set of store file paths that were successfully read by this scanner. Populated at + * close from the underlying store heap and joined heap (if any). 
+ */ + @Override + public Set getFilesRead() { + return Collections.unmodifiableSet(filesRead); + } + @Override public synchronized boolean reseek(byte[] row) throws IOException { return TraceUtil.trace(() -> { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java index 398b716fda69..054cdeecc65f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java @@ -19,8 +19,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.NavigableSet; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mob.MobCell; @@ -42,6 +46,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner { private boolean readEmptyValueOnMobCellMiss = false; private final HMobStore mobStore; private final List referencedMobCells; + private final Set mobFilesRead = new HashSet<>(); ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, long readPt) throws IOException { @@ -92,10 +97,23 @@ public boolean next(List outResult, ScannerContext ctx) th private void freeAllReferencedMobCells() throws IOException { for (MobCell mobCell : referencedMobCells) { mobCell.close(); + mobFilesRead.addAll(mobCell.getFilesRead()); } referencedMobCells.clear(); } + /** + * Returns the set of store file paths that were successfully read by this scanner. Includes paths + * from the underlying store scanner and from resolved MOB cell references; typically populated as + * scanners and referenced MOB cells are closed. 
+ */ + @Override + public Set getFilesRead() { + Set allFiles = new HashSet<>(super.getFilesRead()); + allFiles.addAll(mobFilesRead); + return Collections.unmodifiableSet(allFiles); + } + @Override public void shipped() throws IOException { super.shipped(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 6a053b52669e..1bb1a619fde5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; +import java.util.Set; import java.util.SortedSet; import java.util.function.IntConsumer; import org.apache.commons.lang3.NotImplementedException; @@ -302,6 +304,15 @@ public Path getFilePath() { return null; } + /** + * Returns the set of store file paths that were successfully read by this scanner. This + * implementation always returns an empty set (segment scanners do not track file paths). 
+ */ + @Override + public Set getFilesRead() { + return Collections.emptySet(); + } + /** * @return the next key in the index (the key to seek to the next block) if known, or null * otherwise Not relevant for in-memory scanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 6ce1a3236c47..b5c6a3403d79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; import java.util.concurrent.atomic.LongAdder; import java.util.function.IntConsumer; import org.apache.hadoop.fs.Path; @@ -83,6 +84,9 @@ public class StoreFileScanner implements KeyValueScanner { // Higher values means scanner has newer data. private final long scannerOrder; + // The single file path when this scanner is closed (successfully read). + private Path fileRead; + /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param useMVCC If true, scanner will filter out updates with MVCC larger @@ -313,11 +317,23 @@ public void close() { cur = null; this.hfs.close(); if (this.reader != null) { + this.fileRead = this.reader.getHFileReader().getPath(); this.reader.readCompleted(); } closed = true; } + /** + * Returns the set of store file paths that were successfully read by this scanner. Contains the + * single store file path if this scanner successfully read it; typically set at close. + */ + @Override + public Set getFilesRead() { + return fileRead != null + ? Collections.singleton(fileRead) + : Collections.emptySet(); + } + /** Returns false if not found or if k is after the end. 
*/ public static boolean seekAtOrAfter(HFileScanner s, ExtendedCell k) throws IOException { int result = s.seekTo(k); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 634b6fffcf53..2e560e6f93d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -20,13 +20,17 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.IntConsumer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; @@ -108,6 +112,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // updates and the data will be corrupt. private final List scannersForDelayedClose = new ArrayList<>(); + // Tracks file paths successfully read (scanners closed) by this store scanner. + private final Set filesRead = new HashSet<>(); + /** * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via * seeking to next row/column. TODO: estimate them? @@ -271,6 +278,7 @@ public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet selectScannersFrom(HStore store, memOnly = false; filesOnly = false; } - List scanners = new ArrayList<>(allScanners.size()); // We can only exclude store files based on TTL if minVersions is set to 0. 
@@ -479,6 +486,7 @@ protected List<KeyValueScanner> selectScannersFrom(HStore store, boolean isFile = kvs.isFileScanner(); if ((!isFile && filesOnly) || (isFile && memOnly)) { kvs.close(); + filesRead.addAll(kvs.getFilesRead()); continue; } @@ -486,6 +494,7 @@ protected List<KeyValueScanner> selectScannersFrom(HStore store, scanners.add(kvs); } else { kvs.close(); + filesRead.addAll(kvs.getFilesRead()); } } return scanners; } @@ -528,6 +537,7 @@ private void close(boolean withDelayedScannersClose) { clearAndClose(flushedstoreFileScanners); if (this.heap != null) { this.heap.close(); + this.filesRead.addAll(this.heap.getFilesRead()); this.currentScanners.clear(); this.heap = null; // CLOSED! } @@ -976,12 +986,19 @@ public long getReadPoint() { return this.readPt; } - private static void clearAndClose(List<KeyValueScanner> scanners) { + private void clearAndClose(List<KeyValueScanner> scanners) { + clearAndClose(scanners, true); + } + + private void clearAndClose(List<KeyValueScanner> scanners, boolean trackFiles) { if (scanners == null) { return; } for (KeyValueScanner s : scanners) { s.close(); + if (trackFiles) { + this.filesRead.addAll(s.getFilesRead()); + } } scanners.clear(); } @@ -1188,7 +1205,10 @@ void trySwitchToStreamRead() { addCurrentScanners(newCurrentScanners); this.heap = newHeap; resetQueryMatcher(lastTop); - scannersToClose.forEach(KeyValueScanner::close); + for (KeyValueScanner scanner : scannersToClose) { + scanner.close(); + this.filesRead.addAll(scanner.getFilesRead()); + } if (hasSwitchedToStreamRead != null) { hasSwitchedToStreamRead.set(true); } @@ -1270,6 +1290,15 @@ public long getEstimatedNumberOfKvsScanned() { return this.kvsScanned; } + /** + * Returns the set of store file paths that were successfully read by this scanner. Populated at + * close from the key-value heap and any closed child scanners. 
+ */ + @Override + public Set<Path> getFilesRead() { + return Collections.unmodifiableSet(filesRead); + } + @Override public ExtendedCell getNextIndexedKey() { return this.heap.getNextIndexedKey(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java index 41996f904c18..1f9bbd3aa7af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -26,12 +26,14 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -1527,6 +1529,11 @@ public boolean nextRaw(List result, ScannerContext context return nextRaw; } + @Override + public Set<Path> getFilesRead() { + return delegate.getFilesRead(); + } + @Override public void close() throws IOException { delegate.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java index 253e61f995cf..375406e2d05a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java @@ -30,7 +30,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.Map; +import 
java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -54,8 +58,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; @Category({ SmallTests.class, ClientTests.class }) public class TestClientSideRegionScanner { @@ -74,6 +80,9 @@ public class TestClientSideRegionScanner { private RegionInfo hri; private Scan scan; + @Rule + public TestName name = new TestName(); + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(1); @@ -264,6 +273,75 @@ public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException { testScanMetricByRegion(new ScanMetrics()); } + @Test + public void testGetFilesRead() throws Exception { + // Create a table and add some data + TableName tableName = TableName.valueOf(name.getMethodName()); + try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAME })) { + TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(tableName); + RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(tableName).get(0); + + // Add some data + for (int i = 0; i < 5; i++) { + byte[] row = Bytes.toBytes(i); + Put put = new Put(row); + put.addColumn(FAM_NAME, row, row); + table.put(put); + } + + // Flush contents to disk so we can scan the fs + 
TEST_UTIL.getAdmin().flush(tableName); + + // Create ClientSideRegionScanner with the correct table descriptor and region info + Configuration copyConf = new Configuration(conf); + Scan tableScan = new Scan(); + ClientSideRegionScanner clientSideRegionScanner = + new ClientSideRegionScanner(copyConf, fs, rootDir, tableHtd, tableHri, tableScan, null); + + // Get expected file paths from the region before closing + // (after closing, the region will be closed too) + Set expectedFilePaths = new HashSet<>(); + HStore store = clientSideRegionScanner.getRegion().getStore(FAM_NAME); + for (HStoreFile storeFile : store.getStorefiles()) { + Path qualifiedPath = fs.makeQualified(storeFile.getPath()); + expectedFilePaths.add(qualifiedPath); + } + int expectedFileCount = expectedFilePaths.size(); + assertTrue("Should have at least one store file after flush", expectedFileCount >= 1); + + // Before closing, should return empty set + Set filesReadBeforeClose = clientSideRegionScanner.getFilesRead(); + assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty()); + + // Scan through some results + Result result; + int count = 0; + while ((result = clientSideRegionScanner.next()) != null && count < 3) { + assertNotNull("Result should not be null", result); + count++; + } + + // Still should return empty set before closing + filesReadBeforeClose = clientSideRegionScanner.getFilesRead(); + assertTrue("Should return empty set before closing even after scanning", + filesReadBeforeClose.isEmpty()); + + // Close the scanner - this should collect files from the underlying scanner + clientSideRegionScanner.close(); + + // After closing, should return files from the underlying scanner + Set filesReadAfterClose = clientSideRegionScanner.getFilesRead(); + // Verify exact file count + assertEquals("Should have exact file count after closing", expectedFileCount, + filesReadAfterClose.size()); + // Verify exact file names match + assertEquals("Should contain all expected 
file paths", expectedFilePaths, + filesReadAfterClose); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + private static Put createPut(int rowAsInt) { byte[] row = Bytes.toBytes(rowAsInt); Put put = new Put(row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 3f75a62e4d6d..01584736a8da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -114,6 +115,11 @@ public boolean nextRaw(List result, ScannerContext context return delegate.nextRaw(result, context); } + @Override + public Set getFilesRead() { + return delegate.getFilesRead(); + } + @Override public void close() throws IOException { delegate.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java index 5ce880485312..8f3c858d75f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Set; import java.util.function.IntConsumer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ExtendedCell; @@ -96,6 +97,11 @@ public Path getFilePath() { return delegate.getFilePath(); } + @Override + public Set getFilesRead() { + return 
delegate.getFilesRead(); + } + @Override public boolean backwardSeek(ExtendedCell key) throws IOException { return delegate.backwardSeek(key); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 9ff116ee53f9..26c1cd21ce30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -25,13 +25,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import javax.crypto.spec.SecretKeySpec; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ArrayBackedTag; @@ -446,6 +449,103 @@ public void testResolve() throws Exception { Assert.assertEquals(Bytes.toString(value2), Bytes.toString(CellUtil.cloneValue(resultCell3))); } + @Test + public void testMobStoreScannerGetFilesRead() throws IOException { + doTestMobStoreScannerGetFilesRead(false); + } + + @Test + public void testReversedMobStoreScannerGetFilesRead() throws IOException { + doTestMobStoreScannerGetFilesRead(true); + } + + /** + * Utility method for getFilesRead tests on MOB store scanners. Uses values above mob + * threshold so DefaultMobStoreFlusher creates the mob file and refs. + */ + private void doTestMobStoreScannerGetFilesRead(boolean reversed) throws IOException { + // Setup: conf, root dir, and MOB store init (mob threshold causes large values to go to MOB). 
+ final Configuration conf = HBaseConfiguration.create(); + Path basedir = new Path(DIR + name.getMethodName()); + CommonFSUtils.setRootDir(conf, basedir); + init(name.getMethodName(), conf, false); + + // Add values above MOB threshold and flush so DefaultMobStoreFlusher creates mob file and refs. + byte[] valueAboveThreshold = Bytes.toBytes("value"); // threshold in setup is 3 bytes + this.store.add(new KeyValue(row, family, qf1, 1, valueAboveThreshold), null); + this.store.add(new KeyValue(row, family, qf2, 1, valueAboveThreshold), null); + this.store.add(new KeyValue(row2, family, qf3, 1, valueAboveThreshold), null); + flush(1); + + // Collect expected paths: store files (refs) plus actual MOB files under mob family path. + FileSystem storeFs = store.getFileSystem(); + Set expectedFilePaths = new HashSet<>(); + for (HStoreFile storeFile : this.store.getStorefiles()) { + expectedFilePaths.add(storeFs.makeQualified(storeFile.getPath())); + } + Path mobFamilyPath = + MobUtils.getMobFamilyPath(conf, TableName.valueOf(table), Bytes.toString(family)); + if (storeFs.exists(mobFamilyPath)) { + FileStatus[] mobFiles = storeFs.listStatus(mobFamilyPath); + for (FileStatus f : mobFiles) { + if (!f.isDirectory()) { + expectedFilePaths.add(storeFs.makeQualified(f.getPath())); + } + } + } + Assert.assertTrue("Should have at least one store file and one mob file", + expectedFilePaths.size() >= 2); + + // Build scan (optionally reversed) and target columns; get store scanner and verify type. 
+ Scan scan = new Scan(); + if (reversed) { + scan.setReversed(true); + } + scan.addColumn(family, qf1); + scan.addColumn(family, qf2); + scan.addColumn(family, qf3); + NavigableSet targetCols = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR); + targetCols.add(qf1); + targetCols.add(qf2); + targetCols.add(qf3); + + KeyValueScanner kvScanner = store.getScanner(scan, targetCols, 0); + if (reversed) { + Assert.assertTrue("Store scanner should be ReversedMobStoreScanner", + kvScanner instanceof ReversedMobStoreScanner); + } else { + Assert.assertTrue("Store scanner should be MobStoreScanner", + kvScanner instanceof MobStoreScanner); + } + + // Before close: getFilesRead must be empty; then drain scanner to resolve MOB refs. + try { + Set filesReadBeforeClose = kvScanner.getFilesRead(); + Assert.assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty()); + Assert.assertEquals("Should have 0 files before closing", 0, filesReadBeforeClose.size()); + + List results = new ArrayList<>(); + StoreScanner storeScanner = (StoreScanner) kvScanner; + while (storeScanner.next(results)) { + results.clear(); + } + + // Still before close: set must remain empty until scanner is closed. + filesReadBeforeClose = kvScanner.getFilesRead(); + Assert.assertTrue("Should return empty set before closing even after reading", + filesReadBeforeClose.isEmpty()); + } finally { + kvScanner.close(); + } + + // After close: set must contain exactly the expected store + MOB file paths. 
+ Set filesReadAfterClose = kvScanner.getFilesRead(); + Assert.assertEquals("Should have exact file count after closing", expectedFilePaths.size(), + filesReadAfterClose.size()); + Assert.assertEquals("Should contain all expected file paths", expectedFilePaths, + filesReadAfterClose); + } + /** * Flush the memstore */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index e8e2263bc04d..d5de6418a8bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -3756,6 +3757,60 @@ public void testGetScanner_WithRegionClosed() throws IOException { } } + @Test + public void testRegionScanner_getFilesRead() throws IOException { + // Setup: init region with one family; put two rows and flush to create store files. + byte[] family = Bytes.toBytes("fam1"); + byte[][] families = { family }; + this.region = initHRegion(tableName, method, CONF, families); + Put put = new Put(Bytes.toBytes("row1")); + put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); + region.put(put); + put = new Put(Bytes.toBytes("row2")); + put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); + region.put(put); + region.flush(true); + + // Collect expected store file paths from all stores (before opening the scanner). 
+ Set expectedFilePaths = new HashSet<>(); + FileSystem fs = region.getFilesystem(); + for (HStore store : region.getStores()) { + for (HStoreFile storeFile : store.getStorefiles()) { + expectedFilePaths.add(fs.makeQualified(storeFile.getPath())); + } + } + assertTrue("Should have at least one store file after flush", expectedFilePaths.size() >= 1); + + // Get region scanner; before close getFilesRead must be empty. + RegionScannerImpl scanner = region.getScanner(new Scan()); + + Set filesReadBeforeClose = scanner.getFilesRead(); + assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty()); + + // Drain scanner (next up to two rows) to exercise store heap reads. + List cells = new ArrayList<>(); + int count = 0; + while (count < 2) { + if (!scanner.next(cells)) { + break; + } + cells.clear(); + count++; + } + + // Still before close: set must remain empty until scanner is closed. + filesReadBeforeClose = scanner.getFilesRead(); + assertTrue("Should return empty set before closing even after scanning", + filesReadBeforeClose.isEmpty()); + scanner.close(); + + // After close: set must contain exactly the expected store file paths. 
+ Set filesReadAfterClose = scanner.getFilesRead(); + assertEquals("Should have exact file count after closing", expectedFilePaths.size(), + filesReadAfterClose.size()); + assertEquals("Should contain all expected file paths", expectedFilePaths, filesReadAfterClose); + } + @Test public void testRegionScanner_Next() throws IOException { byte[] row1 = Bytes.toBytes("row1"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index fea25b424e10..2ee0645f1573 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.ExtendedCell; @@ -209,6 +212,57 @@ public void testPriorityId() throws IOException { assertCells(expected, Arrays.asList(scan1, scan2)); } + @Test + public void testGetFilesRead() throws IOException { + // Create test scanners with file paths + Path file1 = new Path("/test/file1"); + Path file2 = new Path("/test/file2"); + Path file3 = new Path("/test/file3"); + + FileTrackingScanner scanner1 = + new FileTrackingScanner(Arrays.asList(kv115, kv211, kv212), file1); + FileTrackingScanner scanner2 = new FileTrackingScanner(Arrays.asList(kv111, kv112), file2); + FileTrackingScanner scanner3 = + new FileTrackingScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213), file3); + + // Add a non-file-based scanner (e.g., memstore scanner) that doesn't return files + TestScanner memStoreScanner = new TestScanner(Arrays.asList(kv114)); + + List scanners = + new 
ArrayList<>(Arrays.asList(scanner1, scanner2, scanner3, memStoreScanner)); + + // Create KeyValueHeap and scan through all cells + KeyValueHeap keyValueHeap = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR); + + // Before closing, should return empty set even after scanning + // Scan through all cells first + while (keyValueHeap.peek() != null) { + keyValueHeap.next(); + } + + // Verify that before closing, files are not returned + Set filesReadBeforeClose = keyValueHeap.getFilesRead(); + assertTrue("Should return empty set before closing heap", filesReadBeforeClose.isEmpty()); + assertEquals("Should have 0 files before closing", 0, filesReadBeforeClose.size()); + + // Now close the heap + keyValueHeap.close(); + + // After closing, should return all files from file-based scanners only + // Non-file-based scanners (like memstore) should not contribute files + Set filesReadAfterClose = keyValueHeap.getFilesRead(); + assertEquals("Should return set with 3 file paths after closing (excluding non-file scanner)", + 3, filesReadAfterClose.size()); + assertTrue("Should contain file1", filesReadAfterClose.contains(file1)); + assertTrue("Should contain file2", filesReadAfterClose.contains(file2)); + assertTrue("Should contain file3", filesReadAfterClose.contains(file3)); + + // Verify that non-file-based scanner doesn't contribute any files + // (memStoreScanner.getFilesRead() should return empty set) + Set memStoreFiles = memStoreScanner.getFilesRead(); + assertTrue("Non-file-based scanner should return empty set", memStoreFiles.isEmpty()); + } + private static class TestScanner extends CollectionBackedScanner { private boolean closed = false; private long scannerOrder = 0; @@ -269,4 +323,26 @@ public void enforceSeek() throws IOException { throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); } } + + private static class FileTrackingScanner extends TestScanner { + private final Path filePath; + private boolean closed = false; + + 
public FileTrackingScanner(List list, Path filePath) { + super(list); + this.filePath = filePath; + } + + @Override + public void close() { + super.close(); + closed = true; + } + + @Override + public Set getFilesRead() { + // Only return the file path after the scanner is closed + return closed ? Collections.singleton(filePath) : Collections.emptySet(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScanner.java new file mode 100644 index 000000000000..84566a9651cc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScanner.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Test StoreFileScanner + */ +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestStoreFileScanner { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStoreFileScanner.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final String TEST_FAMILY = "cf"; + + @Rule + public TestName name = new TestName(); + + private Configuration conf; + private Path testDir; + 
private FileSystem fs; + private CacheConfig cacheConf; + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + testDir = TEST_UTIL.getDataTestDir(name.getMethodName()); + fs = testDir.getFileSystem(conf); + cacheConf = new CacheConfig(conf); + } + + private void writeStoreFile(final StoreFileWriter writer) throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + byte[] family = Bytes.toBytes(TEST_FAMILY); + byte[] qualifier = Bytes.toBytes("col"); + for (char d = 'a'; d <= 'z'; d++) { + for (char e = 'a'; e <= 'z'; e++) { + byte[] row = new byte[] { (byte) d, (byte) e }; + writer.append(new KeyValue(row, family, qualifier, now, row)); + } + } + } + + @Test + public void testGetFilesRead() throws Exception { + // Setup: region info, region fs, and HFile context; create store file and write data. + final RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build(); + HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, + new Path(testDir, hri.getTable().getNameAsString()), hri); + HFileContext hFileContext = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + + StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(regionFs.createTempName()).withFileContext(hFileContext).build(); + writeStoreFile(writer); + Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath()); + writer.close(); + + // Open HStoreFile and reader; get qualified path and create StoreFileScanner. 
+ StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, + StoreContext.getBuilder() + .withFamilyStoreDirectoryPath(new Path(regionFs.getRegionDir(), TEST_FAMILY)) + .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)) + .withRegionFileSystem(regionFs).build()); + HStoreFile file = new HStoreFile(fs, hsfPath, conf, cacheConf, BloomType.NONE, true, sft); + file.initReader(); + StoreFileReader r = file.getReader(); + assertNotNull(r); + Path qualifiedPath = fs.makeQualified(hsfPath); + StoreFileScanner scanner = r.getStoreFileScanner(false, false, false, 0, 0, false); + + // Before close: getFilesRead must be empty. + Set filesRead = scanner.getFilesRead(); + assertTrue("Should return empty set before closing scanner", filesRead.isEmpty()); + + scanner.close(); + + // After close: set must contain the single qualified store file path. + filesRead = scanner.getFilesRead(); + assertEquals("Should return set with one file path after closing", 1, filesRead.size()); + assertTrue("Should contain the qualified file path", filesRead.contains(qualifiedPath)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 1a6801666145..9d2ac98b07ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -31,9 +32,12 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; +import 
java.util.Set; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellComparator; @@ -48,12 +52,21 @@ import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -67,6 +80,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1093,4 +1107,203 @@ public void close() { assertFalse(memStoreScanner.closed); } } + + @Test + public void testGetFilesRead() throws Exception { + // Setup: test util, conf, fs, cache, region fs, and HFile context. 
+ HBaseTestingUtil testUtil = new HBaseTestingUtil(); + Configuration conf = testUtil.getConfiguration(); + Path testDir = testUtil.getDataTestDir(name.getMethodName() + "_directory"); + FileSystem fs = testDir.getFileSystem(conf); + CacheConfig cacheConf = new CacheConfig(conf); + final String TEST_FAMILY = "cf"; + + final RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build(); + HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, + new Path(testDir, hri.getTable().getNameAsString()), hri); + HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); + + StoreFileTracker sft = StoreFileTrackerFactory.create(conf, false, + StoreContext.getBuilder() + .withFamilyStoreDirectoryPath(new Path(regionFs.getRegionDir(), TEST_FAMILY)) + .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)) + .withRegionFileSystem(regionFs).build()); + + long now = EnvironmentEdgeManager.currentTime(); + List filePaths = new ArrayList<>(); + List storeFiles = new ArrayList<>(); + + // File 1: rows "row01" to "row05" - in scan key range, fresh timestamp + StoreFileWriter writer1 = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(regionFs.createTempName()).withFileContext(meta).build(); + for (int i = 1; i <= 5; i++) { + writer1.append(new KeyValue(Bytes.toBytes(String.format("row%02d", i)), CF, + Bytes.toBytes("col"), now, Bytes.toBytes("value" + i))); + } + Path path1 = regionFs.commitStoreFile(TEST_FAMILY, writer1.getPath()); + writer1.close(); + filePaths.add(fs.makeQualified(path1)); + HStoreFile file1 = new HStoreFile(fs, path1, conf, cacheConf, BloomType.NONE, true, sft); + file1.initReader(); + storeFiles.add(file1); + + // File 2: rows "row06" to "row10" - in scan key range, fresh timestamp + StoreFileWriter writer2 = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(regionFs.createTempName()).withFileContext(meta).build(); + for (int i = 6; 
i <= 10; i++) { + writer2.append(new KeyValue(Bytes.toBytes(String.format("row%02d", i)), CF, + Bytes.toBytes("col"), now, Bytes.toBytes("value" + i))); + } + writer2.close(); + Path path2 = regionFs.commitStoreFile(TEST_FAMILY, writer2.getPath()); + filePaths.add(fs.makeQualified(path2)); + HStoreFile file2 = new HStoreFile(fs, path2, conf, cacheConf, BloomType.NONE, true, sft); + file2.initReader(); + storeFiles.add(file2); + + // File 3: rows "row20" to "row25" - OUT of scan key range (after stop row) + StoreFileWriter writer3 = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(regionFs.createTempName()).withFileContext(meta).build(); + for (int i = 20; i <= 25; i++) { + writer3.append(new KeyValue(Bytes.toBytes(String.format("row%02d", i)), CF, + Bytes.toBytes("col"), now, Bytes.toBytes("value" + i))); + } + writer3.close(); + Path path3 = regionFs.commitStoreFile(TEST_FAMILY, writer3.getPath()); + filePaths.add(fs.makeQualified(path3)); + HStoreFile file3 = new HStoreFile(fs, path3, conf, cacheConf, BloomType.NONE, true, sft); + file3.initReader(); + storeFiles.add(file3); + + // File 4: row "row00" - OUT of key range (before start row) + StoreFileWriter writer4 = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(regionFs.createTempName()).withFileContext(meta).build(); + writer4.append(new KeyValue(Bytes.toBytes("row00"), CF, Bytes.toBytes("col"), + now, Bytes.toBytes("value0"))); + writer4.close(); + Path path4 = regionFs.commitStoreFile(TEST_FAMILY, writer4.getPath()); + filePaths.add(fs.makeQualified(path4)); + HStoreFile file4 = new HStoreFile(fs, path4, conf, cacheConf, BloomType.NONE, true, sft); + file4.initReader(); + storeFiles.add(file4); + + // File 5: row "row11" with expired timestamp (1 hour ago); TTL-filtered but still tracked.
+ long expiredTime = now - (1000 * 60 * 60); + StoreFileWriter writer5 = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(regionFs.createTempName()).withFileContext(meta).build(); + writer5.append(new KeyValue(Bytes.toBytes("row11"), CF, Bytes.toBytes("col"), expiredTime, + Bytes.toBytes("expired_value"))); + writer5.close(); + Path path5 = regionFs.commitStoreFile(TEST_FAMILY, writer5.getPath()); + filePaths.add(fs.makeQualified(path5)); + HStoreFile file5 = new HStoreFile(fs, path5, conf, cacheConf, BloomType.NONE, true, sft); + file5.initReader(); + storeFiles.add(file5); + + // Create StoreFileScanners from all five files. + List scanners = new ArrayList<>(); + for (HStoreFile storeFile : storeFiles) { + StoreFileReader reader = storeFile.getReader(); + StoreFileScanner scanner = reader.getStoreFileScanner(false, false, false, 0, 0, false); + scanners.add(scanner); + } + + // Scan row01–row15 with 30-minute TTL so file 5's expired cell is filtered after read. + Scan scan = + new Scan().withStartRow(Bytes.toBytes("row01")).withStopRow(Bytes.toBytes("row15"), false); + long ttl = 30 * 60 * 1000; + ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, ttl, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); + + // Create StoreScanner; drain with next(), then close. + StoreScanner storeScanner = new StoreScanner(scan, scanInfo, null, scanners); + + List results = new ArrayList<>(); + while (storeScanner.next(results)) { + results.clear(); + } + storeScanner.close(); + + // After close: all 5 files must be tracked (in-range, out-of-range, and TTL-expired).
+ Set filesRead = storeScanner.getFilesRead(); + + assertTrue("File 1 (in range) should be tracked", filesRead.contains(filePaths.get(0))); + assertTrue("File 2 (in range) should be tracked", filesRead.contains(filePaths.get(1))); + assertTrue("File 3 (out of key range) should be tracked", filesRead.contains(filePaths.get(2))); + assertTrue("File 4 (before start row) should be tracked", filesRead.contains(filePaths.get(3))); + assertTrue("File 5 (expired TTL, filtered after read) should be tracked", + filesRead.contains(filePaths.get(4))); + assertEquals("Should have all 5 files read", 5, filesRead.size()); + } + + /** + * Test that when StoreScanner initialization fails after scanners are created, files are not + * tracked + */ + @Test + public void testGetFilesReadOnInitializationFailure() throws Exception { + HStore mockStore = Mockito.mock(HStore.class); + ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); + Scan scan = new Scan(); + NavigableSet columns = null; + long readPt = 100L; + + // Create mock scanners that will be returned by getScanners + KeyValueScanner mockScanner1 = Mockito.mock(StoreFileScanner.class); + KeyValueScanner mockScanner2 = Mockito.mock(StoreFileScanner.class); + Path filePath1 = new Path("/test/file1"); + Path filePath2 = new Path("/test/file2"); + Mockito.when(mockScanner1.isFileScanner()).thenReturn(true); + Mockito.when(mockScanner2.isFileScanner()).thenReturn(true); + Mockito.doReturn(true).when(mockScanner1).shouldUseScanner(Mockito.any(), Mockito.any(), + Mockito.anyLong()); + Mockito.doReturn(true).when(mockScanner2).shouldUseScanner(Mockito.any(), Mockito.any(), + Mockito.anyLong()); + Mockito.when(mockScanner1.getFilesRead()) + .thenReturn(Collections.singleton(filePath1)); + Mockito.when(mockScanner2.getFilesRead()) + .thenReturn(Collections.singleton(filePath2)); + + List mockScanners = new 
ArrayList<>(); + mockScanners.add(mockScanner1); + mockScanners.add(mockScanner2); + + // Make getScanners return the mock scanners + Mockito.when(mockStore.getScanners(Mockito.anyBoolean(), Mockito.anyBoolean(), + Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), + Mockito.anyBoolean(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(mockScanners); + + Mockito.when(mockStore.getCoprocessorHost()).thenReturn(null); + + // Make seek throw IOException on one scanner to simulate failure during seekScanners + Mockito.doThrow(new IOException("Test seek failure")).when(mockScanner1).seek(Mockito.any()); + + // Verify that IOException is thrown during construction + StoreScanner storeScanner = null; + IOException caughtException = null; + try { + storeScanner = new StoreScanner(mockStore, scanInfo, scan, columns, readPt); + } catch (IOException e) { + caughtException = e; + } + + // Verify that exception was thrown + assertNotNull("Should have thrown IOException during initialization", caughtException); + + // Verify that store methods were called (cleanup happened in catch block) + Mockito.verify(mockStore, Mockito.times(1)).addChangedReaderObserver(Mockito.any()); + Mockito.verify(mockStore, Mockito.times(1)).deleteChangedReaderObserver(Mockito.any()); + + // Verify that scanners were closed (clearAndClose was called in catch block) + Mockito.verify(mockScanner1, Mockito.times(1)).close(); + Mockito.verify(mockScanner2, Mockito.times(1)).close(); + + // Verify that getFilesRead was NOT called on the scanners + // (because trackFiles=false was passed to clearAndClose, so files weren't tracked) + Mockito.verify(mockScanner1, Mockito.never()).getFilesRead(); + Mockito.verify(mockScanner2, Mockito.never()).getFilesRead(); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java index 
c0f444e5b463..0b6b7b838ee2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java @@ -19,15 +19,21 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; @@ -257,4 +263,87 @@ public boolean hasFilterRow() { public void testFilterRow() throws IOException { testFilter(new MatchLastRowFilterRowFilter()); } + + /** + * Verifies that when the store scanner switches from pread to stream read successfully, all store + * files that were read (including those closed during the switch) are reported by + * {@link StoreScanner#getFilesRead()} even without an explicit close. + */ + @Test + public void testGetFilesReadOnTrySwitchToStreamRead() throws Exception { + HStore store = REGION.getStore(FAMILY); + FileSystem fs = REGION.getFilesystem(); + + // Set a very small preadMaxBytes so that trySwitchToStreamRead is triggered during scan.
+ long originalPreadMaxBytes = + UTIL.getConfiguration().getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048); + try { + UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 10L); + + ScanInfo scanInfo = + new ScanInfo(UTIL.getConfiguration(), FAMILY, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + org.apache.hadoop.hbase.KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, + org.apache.hadoop.hbase.CellComparator.getInstance(), false); + Scan scan = new Scan().setReadType(Scan.ReadType.DEFAULT); + long readPt = + REGION.getReadPoint(org.apache.hadoop.hbase.client.IsolationLevel.READ_COMMITTED); + + StoreScanner storeScanner = new StoreScanner(store, scanInfo, scan, null, readPt); + + // Collect expected store file paths (qualified) for assertion after close. + Set expectedFilePaths = new HashSet<>(); + for (HStoreFile sf : store.getStorefiles()) { + expectedFilePaths.add(fs.makeQualified(sf.getPath())); + } + assertFalse("Should have at least one store file", expectedFilePaths.isEmpty()); + + // Verify scanners start in PREAD mode before the switch. + for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { + if (kvs instanceof StoreFileScanner) { + StoreFileScanner sfScanner = (StoreFileScanner) kvs; + assertSame("Scanner should start in PREAD mode", ReaderType.PREAD, + sfScanner.getReader().getReaderContext().getReaderType()); + } + } + + // Scan a few rows and call shipped() to trigger trySwitchToStreamRead. + List results = new ArrayList<>(); + ScannerContext scannerContext = ScannerContext.newBuilder().build(); + boolean switchVerified = false; + while (storeScanner.next(results, scannerContext)) { + results.clear(); + storeScanner.shipped(); + + // Check mid-scan, whether the switch happened. 
+ if (!switchVerified) { + boolean allSwitched = true; + for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { + if (kvs instanceof StoreFileScanner) { + StoreFileScanner sfScanner = (StoreFileScanner) kvs; + if (sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD) { + allSwitched = false; + break; + } + } + } + if (allSwitched) { + switchVerified = true; + } + } + } + assertTrue("trySwitchToStreamRead should have been invoked and scanners switched to stream", + switchVerified); + + // Not closing the scanners explicitly, because those must be closed during trySwitchToStreamRead + + // Without an explicit close: files that were read (including those closed during the switch) must be tracked. + Set filesRead = storeScanner.getFilesRead(); + assertEquals("Should have exact file count after close", expectedFilePaths.size(), + filesRead.size()); + assertEquals("Should contain all expected store file paths", expectedFilePaths, filesRead); + } finally { + UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, + originalPreadMaxBytes); + } + } }