Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
Expand Down Expand Up @@ -129,8 +130,8 @@ TableSnapshotInputFormatImpl.InputSplit getDelegate() {
}
}

@InterfaceAudience.Private
static class TableSnapshotRegionRecordReader
@InterfaceAudience.Public
public static class TableSnapshotRegionRecordReader
extends RecordReader<ImmutableBytesWritable, Result> {
private TableSnapshotInputFormatImpl.RecordReader delegate =
new TableSnapshotInputFormatImpl.RecordReader();
Expand Down Expand Up @@ -170,6 +171,14 @@ public float getProgress() throws IOException, InterruptedException {
return delegate.getProgress();
}

/**
 * Returns the set of store file paths that were successfully read by the underlying region
 * scanner. Typically populated after the reader (or scanner) is closed; before close the
 * underlying scanner reports an empty set.
 */
public Set<Path> getFilesRead() {
return delegate.getScanner().getFilesRead();
}

@Override
public void close() throws IOException {
delegate.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ public static List<String> getBestLocations(Configuration conf,
return getBestLocations(conf, blockDistribution, 3);
}

private static String getSnapshotName(Configuration conf) {
public static String getSnapshotName(Configuration conf) {
String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
if (snapshotName == null) {
throw new IllegalArgumentException("Snapshot name must be provided");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand All @@ -44,7 +48,9 @@
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionRecordReader;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
Expand Down Expand Up @@ -142,6 +148,97 @@ public void testGetBestLocations() throws IOException {
TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
}

@Test
public void testTableSnapshotRegionRecordReaderGetFilesRead() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  String snapshotName = name.getMethodName() + "_snapshot";
  try {
    // Setup: create table, load data, snapshot, and configure job with restore dir
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);

    Configuration conf = UTIL.getConfiguration();
    // Use the Job.getInstance factory; the Job(Configuration) constructor is deprecated.
    Job job = Job.getInstance(conf);
    Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);

    // Get splits (one per region) and extract delegate split for restore path and region info
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);
    Assert.assertEquals(1, splits.size());

    InputSplit split = splits.get(0);
    Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
    TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
    TableSnapshotInputFormatImpl.InputSplit implSplit = snapshotRegionSplit.getDelegate();

    // Collect expected store file paths from the restored region directory
    Set<String> expectedFiles = new HashSet<>();
    Path restorePath = new Path(implSplit.getRestoreDir());
    FileSystem fs = restorePath.getFileSystem(conf);
    Path tableDir =
      CommonFSUtils.getTableDir(restorePath, implSplit.getTableDescriptor().getTableName());
    Path regionPath = new Path(tableDir, implSplit.getRegionInfo().getEncodedName());
    FileStatus[] familyDirs = fs.listStatus(regionPath);
    if (familyDirs != null) {
      for (FileStatus fam : familyDirs) {
        if (fam.isDirectory()) {
          FileStatus[] files = fs.listStatus(fam.getPath());
          if (files != null) {
            for (FileStatus f : files) {
              if (f.isFile()) {
                // Restored files are links; resolve to the referenced hfile name for comparison
                String referenceFileName = f.getPath().getName();
                expectedFiles.add(HFileLink.getReferencedHFileName(referenceFileName));
              }
            }
          }
        }
      }
    }
    Assert.assertFalse("Should have at least one store file after snapshot restore",
      expectedFiles.isEmpty());

    // Create record reader, initialize with split (opens underlying ClientSideRegionScanner)
    TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
    when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());

    RecordReader<ImmutableBytesWritable, Result> rr =
      tsif.createRecordReader(split, taskAttemptContext);
    Assert.assertTrue(rr instanceof TableSnapshotRegionRecordReader);
    TableSnapshotRegionRecordReader recordReader = (TableSnapshotRegionRecordReader) rr;
    recordReader.initialize(split, taskAttemptContext);

    // Before close: getFilesRead() must be empty
    Set<Path> filesReadBeforeClose = recordReader.getFilesRead();
    Assert.assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty());

    // Read a few key-values; getFilesRead() must still be empty until close
    int count = 0;
    while (count < 3 && recordReader.nextKeyValue()) {
      count++;
    }

    filesReadBeforeClose = recordReader.getFilesRead();
    Assert.assertTrue("Should return empty set before closing even after reading",
      filesReadBeforeClose.isEmpty());

    // Close reader so underlying scanner reports files successfully read
    recordReader.close();

    // After close: getFilesRead() must match expected store file set (compared by file name)
    Set<String> filesReadAfterClose = recordReader.getFilesRead().stream()
      .map(Path::getName).collect(Collectors.toSet());

    Assert.assertEquals("Should contain all expected file paths", expectedFiles,
      filesReadAfterClose);
  } finally {
    UTIL.getAdmin().deleteSnapshot(snapshotName);
    UTIL.deleteTable(tableName);
  }
}

/** Counter for reporting validation failures from the test job. */
public enum TestTableSnapshotCounters {
  VALIDATION_ERROR
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -53,6 +56,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
RegionScanner scanner;
List<Cell> values;
boolean hasMore = true;
private final Set<Path> filesRead;

public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
Expand Down Expand Up @@ -85,6 +89,7 @@ public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,

// create an internal region scanner
this.scanner = region.getScanner(scan);
this.filesRead = new HashSet<>();
values = new ArrayList<>();

if (scanMetrics == null) {
Expand Down Expand Up @@ -129,6 +134,7 @@ public void close() {
if (this.scanner != null) {
try {
this.scanner.close();
this.filesRead.addAll(this.scanner.getFilesRead());
this.scanner = null;
} catch (IOException ex) {
LOG.warn("Exception while closing scanner", ex);
Expand Down Expand Up @@ -162,6 +168,14 @@ HRegion getRegion() {
return region;
}

/**
 * Exposes the store file paths this scanner read successfully. The internal set is filled in
 * when the scanner is closed, so the returned view is empty until then.
 */
public Set<Path> getFilesRead() {
  Set<Path> readFiles = this.filesRead;
  return Collections.unmodifiableSet(readFiles);
}

@Override
public boolean renewLease() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
Expand Down Expand Up @@ -63,6 +66,14 @@ public ExtendedCell getCell() {
return cell;
}

/**
 * Returns the file paths successfully read by the underlying MOB store file scanner, or an
 * empty set when no scanner is attached. Should be called after {@link #close()} to get the
 * path of the MOB file that was read.
 */
public Set<Path> getFilesRead() {
  if (sfScanner == null) {
    return Collections.emptySet();
  }
  return sfScanner.getFilesRead();
}

@Override
public void close() throws IOException {
if (this.sfScanner != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
Expand Down Expand Up @@ -66,6 +70,8 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner

protected KVScannerComparator comparator;

private final Set<Path> filesRead = new HashSet<>();

/**
* Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners.
*/
Expand Down Expand Up @@ -216,19 +222,31 @@ public CellComparator getComparator() {
public void close() {
// Close scanners whose close was deferred; capture which store files each one read.
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close();
filesRead.addAll(scanner.getFilesRead());
}
this.scannersForDelayedClose.clear();
// "current" is tracked separately from the heap, so close it on its own.
if (this.current != null) {
this.current.close();
filesRead.addAll(this.current.getFilesRead());
}
if (this.heap != null) {
// Order of closing the scanners shouldn't matter here, so simply iterate and close them.
for (KeyValueScanner scanner : heap) {
scanner.close();
filesRead.addAll(scanner.getFilesRead());
}
}
}

/**
 * Store file paths successfully read by the scanners owned by this heap. Entries accumulate as
 * individual scanners are closed (e.g. in close() or shipped()).
 */
@Override
public Set<Path> getFilesRead() {
  Set<Path> accumulated = this.filesRead;
  return Collections.unmodifiableSet(accumulated);
}

/**
* Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end
* up skipping values that were never reached yet. Rather than iterating down, we want to give the
Expand Down Expand Up @@ -419,6 +437,7 @@ public ExtendedCell getNextIndexedKey() {
public void shipped() throws IOException {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
filesRead.addAll(scanner.getFilesRead());
}
this.scannersForDelayedClose.clear();
if (this.current != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
Expand Down Expand Up @@ -139,6 +140,12 @@ default long getScannerOrder() {
*/
Path getFilePath();

/**
 * Returns the set of store file paths that were successfully read by this scanner. Typically
 * populated only after the scanner is closed.
 * @return the (possibly empty) set of store file paths read; callers should treat it as
 *         read-only
 */
Set<Path> getFilesRead();

// Support for "Reversed Scanner"
/**
* Seek the scanner at or before the row of specified Cell, it firstly tries to seek the scanner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobCell;
Expand All @@ -43,6 +47,7 @@ public class MobStoreScanner extends StoreScanner {
private boolean readEmptyValueOnMobCellMiss = false;
private final HMobStore mobStore;
private final List<MobCell> referencedMobCells;
private final Set<Path> mobFilesRead = new HashSet<>();

public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
final NavigableSet<byte[]> columns, long readPt) throws IOException {
Expand Down Expand Up @@ -94,10 +99,23 @@ public boolean next(List<? super ExtendedCell> outResult, ScannerContext ctx) th
/**
 * Closes all referenced MOB cells, recording the MOB files each one read, and clears the
 * reference list. A failure closing one cell no longer aborts the loop: every cell is still
 * closed and the list cleared, and the first failure (with the rest suppressed) is rethrown.
 */
private void freeAllReferencedMobCells() throws IOException {
  IOException firstFailure = null;
  for (MobCell cell : referencedMobCells) {
    try {
      cell.close();
      mobFilesRead.addAll(cell.getFilesRead());
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }
  referencedMobCells.clear();
  if (firstFailure != null) {
    throw firstFailure;
  }
}

/**
 * Returns the store file paths and MOB file paths successfully read by this scanner. The result
 * is the union of the underlying store scanner's files and the MOB files recorded as referenced
 * mob cells are closed (e.g. in close() or shipped()).
 */
@Override
public Set<Path> getFilesRead() {
  Set<Path> combined = new HashSet<>(mobFilesRead);
  combined.addAll(super.getFilesRead());
  return Collections.unmodifiableSet(combined);
}

@Override
public void shipped() throws IOException {
super.shipped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.function.IntConsumer;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -76,6 +78,15 @@ public Path getFilePath() {
return null;
}

/**
 * Returns the set of store file paths successfully read by this scanner. Default implementation
 * returns an empty set for non-file scanners (e.g. memstore).
 * @return an empty, immutable set
 */
@Override
public Set<Path> getFilesRead() {
return Collections.emptySet();
}

@Override
public ExtendedCell getNextIndexedKey() {
return null;
Expand Down
Loading
Loading