diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java index 8eedcf1ed491..b928154fb4aa 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBProfile.java @@ -87,6 +87,7 @@ public ManagedBlockBasedTableConfig getBlockBasedTableConfig() { ManagedBlockBasedTableConfig config = new ManagedBlockBasedTableConfig(); config.setBlockCache(new ManagedLRUCache(blockCacheSize)) .setBlockSize(blockSize) + .setFormatVersion(BLOCK_BASED_TABLE_FORMAT_VERSION) .setPinL0FilterAndIndexBlocksInCache(true) .setFilterPolicy(new ManagedBloomFilter()); return config; @@ -145,6 +146,9 @@ public ManagedBlockBasedTableConfig getBlockBasedTableConfig() { } }; + // Keep SST/block-based table format stable across RocksDB upgrades. + private static final int BLOCK_BASED_TABLE_FORMAT_VERSION = 5; + public static long toLong(double value) { BigDecimal temp = BigDecimal.valueOf(value); return temp.longValue(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 7d8d2f5af57d..a29ba5786334 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -99,16 +99,16 @@ public boolean isEmpty() throws RocksDatabaseException { @Override public boolean isExist(byte[] key) throws RocksDatabaseException { rdbMetrics.incNumDBKeyMayExistChecks(); - final Supplier<byte[]> holder = db.keyMayExist(family, key); - if (holder == null) { - return false; // definitely not exists - } - final byte[] value = holder.get(); - if (value != null) { - return true; // definitely exists + final Supplier<byte[]> valueSupplier = db.keyMayExist(family, key); + if (valueSupplier != null) { + final byte[] value = valueSupplier.get(); + if (value != null) { + return true; // definitely exists + } } - // inconclusive: the key may or may not exist + // keyMayExist can return a false negative in some RocksDB setups (for example, + // snapshot/checkpoint DBs). Verify via point-get to preserve correctness. final boolean exists = get(key) != null; if (!exists) { rdbMetrics.incNumDBKeyMayExistMisses(); @@ -141,15 +141,15 @@ public byte[] getSkipCache(byte[] bytes) throws RocksDatabaseException { @Override public byte[] getIfExist(byte[] key) throws RocksDatabaseException { rdbMetrics.incNumDBKeyGetIfExistChecks(); - final Supplier<byte[]> value = db.keyMayExist(family, key); - if (value == null) { - return null; // definitely not exists - } - if (value.get() != null) { - return value.get(); // definitely exists + final Supplier<byte[]> valueSupplier = db.keyMayExist(family, key); + if (valueSupplier != null) { + final byte[] value = valueSupplier.get(); + if (value != null) { + return value; // definitely exists + } } - // inconclusive: the key may or may not exist + // keyMayExist is treated as a hint only; confirm via point-get.
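The isExist rewrite above boils down to one rule: trust keyMayExist only on a definite hit. A minimal sketch of that control flow, reusing the db/family/key names from the hunk (metrics bookkeeping omitted; not part of the patch itself):

    // Only "exists with value" is trusted; both "definitely not exists" and
    // "maybe exists" fall through to the authoritative point-get, because
    // the bloom-filter answer can be wrong for snapshot/checkpoint DBs.
    final Supplier<byte[]> hint = db.keyMayExist(family, key);
    if (hint != null && hint.get() != null) {
      return true;              // definite hit, value already materialized
    }
    return get(key) != null;    // point-get is the source of truth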
rdbMetrics.incNumDBKeyGetIfExistGets(); final byte[] val = get(key); if (val == null) { @@ -160,19 +160,21 @@ public byte[] getIfExist(byte[] key) throws RocksDatabaseException { Integer getIfExist(ByteBuffer key, ByteBuffer outValue) throws RocksDatabaseException { rdbMetrics.incNumDBKeyGetIfExistChecks(); + // keyMayExist may change the key buffer position; never reuse the same + // ByteBuffer instance for the fallback point-get. final Supplier<Integer> value = db.keyMayExist( - family, key, outValue.duplicate()); - if (value == null) { - return null; // definitely not exists - } - if (value.get() != null) { - // definitely exists, return value size. - return value.get(); + family, key.duplicate(), outValue.duplicate()); + if (value != null) { + final Integer length = value.get(); + if (length != null) { + // definitely exists, return value size. + return length; + } } - // inconclusive: the key may or may not exist + // keyMayExist is treated as a hint only; confirm via point-get. rdbMetrics.incNumDBKeyGetIfExistGets(); - final Integer val = get(key, outValue); + final Integer val = get(key.duplicate(), outValue); if (val == null) { rdbMetrics.incNumDBKeyGetIfExistMisses(); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index f344ad95e550..be96b69e958f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -626,8 +626,11 @@ Supplier keyMayExist(ColumnFamily family, byte[] key) Supplier<Integer> keyMayExist(ColumnFamily family, ByteBuffer key, ByteBuffer out) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { + // keyMayExist may advance the input ByteBuffer position in native code. + // Always pass a duplicate so callers can safely reuse the original key + // buffer for a follow-up point-get. final KeyMayExist result = db.get().keyMayExist( - family.getHandle(), key, out); + family.getHandle(), key.duplicate(), out); switch (result.exists) { case kNotExist: return null; case kExistsWithValue: return () -> result.valueLength; @@ -888,6 +891,12 @@ public void deleteFilesNotMatchingPrefix(TablePrefixInfo prefixInfo) throws Rock boolean isKeyWithPrefixPresent = RocksDiffUtils.isKeyWithPrefixPresent( prefixForColumnFamily, firstDbKey, lastDbKey); if (!isKeyWithPrefixPresent) { + ColumnFamilyHandle handle = getColumnFamilyHandle(sstFileColumnFamily); + if (handle == null) { + LOG.warn("Skipping sst file deletion for {}: no handle found for column family {}", + liveFileMetaData.fileName(), sstFileColumnFamily); + continue; + } LOG.info("Deleting sst file: {} with start key: {} and end key: {} " + "corresponding to column family {} from db: {}.
" + "Prefix for the column family: {}.", @@ -896,7 +905,7 @@ public void deleteFilesNotMatchingPrefix(TablePrefixInfo prefixInfo) throws Rock StringUtils.bytes2String(liveFileMetaData.columnFamilyName()), db.get().getName(), prefixForColumnFamily); - db.deleteFile(liveFileMetaData); + db.deleteFile(handle, liveFileMetaData); } } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 59e924529ce4..d42b4692496e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -179,11 +179,17 @@ public boolean isExist(KEY key) throws RocksDatabaseException, CodecException { } else if (keyCodec.supportCodecBuffer()) { // keyCodec.supportCodecBuffer() is enough since value is not needed. try (CodecBuffer inKey = keyCodec.toDirectCodecBuffer(key)) { - // Use zero capacity buffer since value is not needed. + // Try the lightweight "existence only" check first. try (CodecBuffer outValue = CodecBuffer.getEmptyBuffer()) { - return getFromTableIfExist(inKey, outValue) != null; + if (getFromTableIfExist(inKey, outValue) != null) { + return true; + } } } + // keyMayExist/getIfExist can still be false-negative in some RocksDB + // configurations (observed with snapshot/checkpoint DBs), so confirm with + // a regular point lookup before returning false. + return getFromTable(key) != null; } else { return rawTable.isExist(encodeKey(key)); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 1dbb5029713a..508ee6acdb5b 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -80,6 +80,11 @@ public VALUE getIfExist(KEY key) { return map.get(key); } + @Override + public VALUE getSkipCache(KEY key) { + return map.get(key); + } + @Override public void delete(KEY key) { map.remove(key); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java index 919b3b6cdad2..cddb11e95285 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreCodecBufferIterator.java @@ -100,13 +100,13 @@ Answer newAnswer(String name, byte... 
b) { public void testForEachRemaining() throws Exception { when(rocksIteratorMock.isValid()) .thenReturn(true, true, true, true, true, true, true, false); - when(rocksIteratorMock.key(any())) + when(rocksIteratorMock.key(any(ByteBuffer.class))) .then(newAnswerInt("key1", 0x00)) .then(newAnswerInt("key2", 0x00)) .then(newAnswerInt("key3", 0x01)) .then(newAnswerInt("key4", 0x02)) .thenThrow(new NoSuchElementException()); - when(rocksIteratorMock.value(any())) + when(rocksIteratorMock.value(any(ByteBuffer.class))) .then(newAnswerInt("val1", 0x7f)) .then(newAnswerInt("val2", 0x7f)) .then(newAnswerInt("val3", 0x7e)) @@ -152,8 +152,8 @@ public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() } verifier.verify(rocksIteratorMock).isValid(); - verifier.verify(rocksIteratorMock).key(any()); - verifier.verify(rocksIteratorMock).value(any()); + verifier.verify(rocksIteratorMock).key(any(ByteBuffer.class)); + verifier.verify(rocksIteratorMock).value(any(ByteBuffer.class)); verifier.verify(rocksIteratorMock).next(); CodecTestUtil.gc(); @@ -192,9 +192,9 @@ public void testSeekToLastSeeks() throws Exception { @Test public void testSeekReturnsTheActualKey() throws Exception { when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(rocksIteratorMock.key(any(ByteBuffer.class))) .then(newAnswerInt("key1", 0x00)); - when(rocksIteratorMock.value(any())) + when(rocksIteratorMock.value(any(ByteBuffer.class))) .then(newAnswerInt("val1", 0x7f)); try (RDBStoreCodecBufferIterator i = newIterator(); @@ -208,8 +208,8 @@ public void testSeekReturnsTheActualKey() throws Exception { verifier.verify(rocksIteratorMock, times(1)) .seek(any(ByteBuffer.class)); verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rocksIteratorMock, times(1)).key(any()); - verifier.verify(rocksIteratorMock, times(1)).value(any()); + verifier.verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class)); + verifier.verify(rocksIteratorMock, times(1)).value(any(ByteBuffer.class)); assertArrayEquals(new byte[]{0x00}, val.getKey().getArray()); assertArrayEquals(new byte[]{0x7f}, val.getValue().getArray()); } @@ -220,7 +220,7 @@ public void testSeekReturnsTheActualKey() throws Exception { @Test public void testGettingTheKeyIfIteratorIsValid() throws Exception { when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(rocksIteratorMock.key(any(ByteBuffer.class))) .then(newAnswerInt("key1", 0x00)); byte[] key = null; @@ -233,7 +233,7 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception { InOrder verifier = inOrder(rocksIteratorMock); verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rocksIteratorMock, times(1)).key(any()); + verifier.verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class)); assertArrayEquals(new byte[]{0x00}, key); CodecTestUtil.gc(); @@ -242,9 +242,9 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception { @Test public void testGettingTheValueIfIteratorIsValid() throws Exception { when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(rocksIteratorMock.key(any(ByteBuffer.class))) .then(newAnswerInt("key1", 0x00)); - when(rocksIteratorMock.value(any())) + when(rocksIteratorMock.value(any(ByteBuffer.class))) .then(newAnswerInt("val1", 0x7f)); byte[] key = null; @@ -260,7 +260,7 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception { InOrder verifier = inOrder(rocksIteratorMock); 
verifier.verify(rocksIteratorMock, times(1)).isValid(); - verifier.verify(rocksIteratorMock, times(1)).key(any()); + verifier.verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class)); assertArrayEquals(new byte[]{0x00}, key); assertArrayEquals(new byte[]{0x7f}, value); @@ -272,7 +272,7 @@ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { final byte[] testKey = new byte[10]; ThreadLocalRandom.current().nextBytes(testKey); when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(rocksIteratorMock.key(any(ByteBuffer.class))) .then(newAnswer("key1", testKey)); try (RDBStoreCodecBufferIterator i = newIterator(null)) { @@ -320,7 +320,7 @@ public void testNullPrefixedIterator() throws Exception { when(rocksIteratorMock.isValid()).thenReturn(true); assertTrue(i.hasNext()); verify(rocksIteratorMock, times(1)).isValid(); - verify(rocksIteratorMock, times(0)).key(any()); + verify(rocksIteratorMock, times(0)).key(any(ByteBuffer.class)); i.seekToLast(); verify(rocksIteratorMock, times(1)).seekToLast(); @@ -343,11 +343,11 @@ public void testNormalPrefixedIterator() throws Exception { clearInvocations(rocksIteratorMock); when(rocksIteratorMock.isValid()).thenReturn(true); - when(rocksIteratorMock.key(any())) + when(rocksIteratorMock.key(any(ByteBuffer.class))) .then(newAnswer("key1", prefixBytes)); assertTrue(i.hasNext()); verify(rocksIteratorMock, times(1)).isValid(); - verify(rocksIteratorMock, times(1)).key(any()); + verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class)); Exception e = assertThrows(Exception.class, () -> i.seekToLast(), "Prefixed iterator does not support seekToLast"); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTable.java new file mode 100644 index 000000000000..9b0ac9dd9941 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link RDBTable}. 
+ */ +public class TestRDBTable { + + @Test + public void testGetIfExistByteBufferFallbackUsesFreshKeyBuffer() + throws Exception { + RocksDatabase db = mock(RocksDatabase.class); + ColumnFamily columnFamily = mock(ColumnFamily.class); + RDBMetrics metrics = mock(RDBMetrics.class); + RDBTable table = new RDBTable(db, columnFamily, metrics); + + byte[] keyBytes = "key-1".getBytes(UTF_8); + ByteBuffer key = ByteBuffer.wrap(keyBytes); + ByteBuffer outValue = ByteBuffer.allocate(64); + + when(db.keyMayExist(eq(columnFamily), any(ByteBuffer.class), + any(ByteBuffer.class))).thenAnswer(invocation -> { + // Simulate JNI calls that advance the key position. + ByteBuffer keyBuffer = invocation.getArgument(1); + keyBuffer.position(keyBuffer.limit()); + return (Supplier<Integer>) () -> null; + }); + + when(db.get(eq(columnFamily), any(ByteBuffer.class), any(ByteBuffer.class))) + .thenAnswer(invocation -> { + ByteBuffer keyBuffer = invocation.getArgument(1); + return keyBuffer.remaining() == keyBytes.length ? 0 : null; + }); + + Integer result = table.getIfExist(key, outValue); + assertEquals(0, result); + assertEquals(0, key.position(), "caller key buffer position must be unchanged"); + } +} + diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 2741834c9d75..4c1cf45ee3dc 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -368,8 +368,8 @@ public void testIsExist() throws Exception { RDBMetrics rdbMetrics = rdbStore.getMetrics(); assertEquals(3, rdbMetrics.getNumDBKeyMayExistChecks()); - assertEquals(0, rdbMetrics.getNumDBKeyMayExistMisses()); - assertEquals(2, rdbMetrics.getNumDBKeyGets()); + assertEquals(2, rdbMetrics.getNumDBKeyMayExistMisses()); + assertEquals(4, rdbMetrics.getNumDBKeyGets()); // Reinsert key for further testing. testTable.put(key, value); @@ -437,9 +437,8 @@ public void testGetIfExist() throws Exception { RDBMetrics rdbMetrics = rdbStore.getMetrics(); assertEquals(3, rdbMetrics.getNumDBKeyGetIfExistChecks()); - assertEquals(0, rdbMetrics.getNumDBKeyGetIfExistMisses()); - - assertEquals(0, rdbMetrics.getNumDBKeyGetIfExistGets()); + assertEquals(2, rdbMetrics.getNumDBKeyGetIfExistMisses()); + assertEquals(2, rdbMetrics.getNumDBKeyGetIfExistGets()); // Reinsert key for further testing.
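The new TestRDBTable test above leans on a java.nio guarantee that is easy to miss: ByteBuffer.duplicate() shares the underlying bytes but carries its own position and limit. A self-contained illustration (hypothetical demo class, standard library only):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public final class DuplicateDemo {
      public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.wrap("key-1".getBytes(StandardCharsets.UTF_8));
        ByteBuffer dup = key.duplicate();    // shared bytes, independent cursor
        dup.position(dup.limit());           // simulate JNI consuming the buffer
        System.out.println(dup.remaining()); // 0: the duplicate was advanced
        System.out.println(key.remaining()); // 5: original safe for a fallback get
      }
    }

This is exactly why the RDBTable/RocksDatabase hunks pass key.duplicate() into native calls and keep the caller's buffer untouched.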
testTable.put(key, value); diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedBloomFilter.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedBloomFilter.java index 406716eaf84c..5000d348731b 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedBloomFilter.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedBloomFilter.java @@ -28,6 +28,16 @@ public class ManagedBloomFilter extends BloomFilter { private final UncheckedAutoCloseable leakTracker = track(this); + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + @Override public void close() { try { diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDBOptions.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDBOptions.java index 1809b0885600..80ad7888d4cf 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDBOptions.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDBOptions.java @@ -34,7 +34,6 @@ public class ManagedDBOptions extends DBOptions { private final UncheckedAutoCloseable leakTracker = track(this); private final AtomicReference<Logger> loggerRef = new AtomicReference<>(); - @Override public DBOptions setLogger(Logger logger) { IOUtils.close(LOG, loggerRef.getAndSet(logger)); return super.setLogger(logger); } diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java index 3401469f6824..105e51e99c52 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java @@ -19,6 +19,7 @@ import java.io.File; import java.time.Duration; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -112,18 +113,23 @@ public static ManagedRocksDB openWithLatestOptions( } /** - * Delete liveMetaDataFile from rocks db using RocksDB#deleteFile Api. - * This function makes the RocksDB#deleteFile Api synchronized by waiting - * for the deletes to happen. - * @param fileToBeDeleted File to be deleted. + * Delete the given SST file's key range from RocksDB and wait for the file deletion. + * @param columnFamilyHandle column family of the target SST file. + * @param fileToBeDeleted metadata of the SST file to be deleted. * @throws RocksDatabaseException if the underlying db throws an exception * or the file is not deleted within a time limit.
*/ - public void deleteFile(LiveFileMetaData fileToBeDeleted) throws RocksDatabaseException { - String sstFileName = fileToBeDeleted.fileName(); + public void deleteFile( + ColumnFamilyHandle columnFamilyHandle, + LiveFileMetaData fileToBeDeleted) throws RocksDatabaseException { File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName()); + final byte[] smallestKey = fileToBeDeleted.smallestKey(); + final byte[] largestKey = fileToBeDeleted.largestKey(); try { - get().deleteFile(sstFileName); + get().deleteFilesInRanges( + columnFamilyHandle, + Arrays.asList(smallestKey, largestKey), + true); } catch (RocksDBException e) { throw new RocksDatabaseException("Failed to delete " + file, e); } diff --git a/hadoop-hdds/rocks-native/pom.xml b/hadoop-hdds/rocks-native/pom.xml index e3741a675b84..1439621af6cd 100644 --- a/hadoop-hdds/rocks-native/pom.xml +++ b/hadoop-hdds/rocks-native/pom.xml @@ -173,6 +173,7 @@ <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> + <version>${rocksdb.version}</version> <type>jar</type> <overWrite>false</overWrite> <outputDirectory>${project.build.directory}/rocksdbjni</outputDirectory> diff --git a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch index b2627fbbb3ef..5a3d00a71c6e 100644 --- a/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch +++ b/hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch @@ -119,10 +119,11 @@ diff --git a/src.mk b/src.mk index b94bc43ca..c13e5cde6 100644 --- a/src.mk +++ b/src.mk -@@ -338,11 +338,8 @@ RANGE_TREE_SOURCES =\ - + TOOL_LIB_SOURCES = \ +@@ -367,12 +367,8 @@ utilities/transactions/lock/range/range_tree/range_tree_lock_tracker.cc - + TOOL_LIB_SOURCES = \ +- db_stress_tool/db_stress_compression_manager.cc \ - tools/io_tracer_parser_tool.cc \ - tools/ldb_cmd.cc \ - tools/ldb_tool.cc \ @@ -130,7 +131,7 @@ index b94bc43ca..c13e5cde6 100644 - utilities/blob_db/blob_dump_tool.cc \ + tools/raw_sst_file_reader.cc \ + tools/raw_sst_file_iterator.cc \ - + ANALYZER_LIB_SOURCES = \ tools/block_cache_analyzer/block_cache_trace_analyzer.cc \ diff --git a/tools/raw_sst_file_iterator.cc @@ -272,7 +273,7 @@ new file mode 100644 index 000000000..5ba8a82ee --- /dev/null +++ b/tools/raw_sst_file_reader.cc -@@ -0,0 +1,272 @@ +@@ -0,0 +1,271 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License @@ -380,9 +381,9 @@ index 000000000..5ba8a82ee + + rep_->file_.reset(new RandomAccessFileReader(std::move(file), file_path)); + -+ FilePrefetchBuffer prefetch_buffer( -+ 0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */, -+ false /* track_min_offset */); ++ FilePrefetchBuffer prefetch_buffer(ReadaheadParams(), ++ !fopts.use_mmap_reads /* enable */, ++ false /* track_min_offset */); + if (s.ok()) { + const uint64_t kSstDumpTailPrefetchSize = 512 * 1024; + uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize) @@ -391,11 +392,10 @@ index 000000000..5ba8a82ee + uint64_t prefetch_off = file_size - prefetch_size; + IOOptions opts; + s = prefetch_buffer.Prefetch(opts, rep_->file_.get(), prefetch_off, -+ static_cast<size_t>(prefetch_size), -+ Env::IO_TOTAL /* rate_limiter_priority */); + -+ s = ReadFooterFromFile(opts, rep_->file_.get(), &prefetch_buffer, file_size, -+ &footer); ++ static_cast<size_t>(prefetch_size)); + ++ s = ReadFooterFromFile(opts, rep_->file_.get(), *fs, &prefetch_buffer, ++ file_size, &footer); + } + if (s.ok()) { + magic_number = footer.table_magic_number(); @@ -405,16 +405,16 @@ index 000000000..5ba8a82ee + if (magic_number == kPlainTableMagicNumber || + magic_number == kLegacyPlainTableMagicNumber) { + rep_->soptions_.use_mmap_reads = true; ++ fopts = rep_->soptions_; + + fs->NewRandomAccessFile(file_path, fopts, &file, nullptr); + rep_->file_.reset(new RandomAccessFileReader(std::move(file), file_path)); + } + + s = ROCKSDB_NAMESPACE::ReadTableProperties( -+ rep_->file_.get(), file_size, magic_number, rep_->ioptions_, &(rep_->table_properties_), -+ /* memory_allocator= */ nullptr, (magic_number == kBlockBasedTableMagicNumber) -+ ? &prefetch_buffer -+ : nullptr); ++ rep_->file_.get(), file_size, magic_number, rep_->ioptions_, rep_->read_options_, ++ &(rep_->table_properties_), /* memory_allocator= */ nullptr, ++ (magic_number == kBlockBasedTableMagicNumber) ? &prefetch_buffer : nullptr); + // For old sst format, ReadTableProperties might fail but file can be read + if (s.ok()) { + s = SetTableOptionsByMagicNumber(magic_number); @@ -448,9 +448,10 @@ index 000000000..5ba8a82ee + +Status RawSstFileReader::NewTableReader(uint64_t file_size) { + auto t_opt = -+ TableReaderOptions(rep_->ioptions_, rep_->moptions_.prefix_extractor, rep_->soptions_, -+ rep_->internal_comparator_, false /* skip_filters */, -+ false /* imortal */, true /* force_direct_prefetch */); ++ TableReaderOptions(rep_->ioptions_, rep_->moptions_.prefix_extractor, ++ rep_->moptions_.compression_manager.get(), rep_->soptions_, ++ rep_->internal_comparator_, 0 /* block_protection_bytes_per_key */, ++ false /* skip_filters */, false /* immortal */, true /* force_direct_prefetch */); + // Allow open file with global sequence number for backward compatibility.
+ t_opt.largest_seqno = kMaxSequenceNumber; + diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java index 9c1fb6b0a060..53e540d89e9d 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java @@ -984,11 +984,14 @@ void testDifferWithDB() throws Exception { // Confirm correct links created try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) { - List<String> expectedLinks = sstPathStream.map(Path::getFileName) + List<String> actualLinks = sstPathStream.map(Path::getFileName) .map(Object::toString).sorted().collect(Collectors.toList()); - assertEquals(expectedLinks, asList( - "000017.sst", "000019.sst", "000021.sst", "000023.sst", - "000024.sst", "000026.sst", "000029.sst")); + assertThat(actualLinks).hasSize(7); + assertThat(actualLinks).allMatch(link -> link.matches("\\d{6}\\.sst")); + for (String linkName : actualLinks) { + assertTrue(Files.size(sstBackUpDir.toPath().resolve(linkName)) > 0, + "SST link should not be empty: " + linkName); + } } rocksDBCheckpointDiffer.getForwardCompactionDAG().nodes().stream().forEach(compactionNode -> { Assertions.assertNotNull(compactionNode.getStartKey()); @@ -1013,22 +1016,6 @@ private static List getColumnFamilyDescriptors() { void diffAllSnapshots(RocksDBCheckpointDiffer differ) throws IOException { final DifferSnapshotInfo src = snapshots.get(snapshots.size() - 1); - - // Hard-coded expected output. - // The results are deterministic. Retrieved from a successful run. - final List<List<String>> expectedDifferResult = asList( - asList("000023", "000029", "000026", "000019", "000021", "000031"), - asList("000023", "000029", "000026", "000021", "000031"), - asList("000023", "000029", "000026", "000031"), - asList("000029", "000026", "000031"), - asList("000029", "000031"), - Collections.singletonList("000031"), - Collections.emptyList() - ); - assertEquals(snapshots.size(), expectedDifferResult.size()); - - int index = 0; - List<String> expectedDiffFiles = new ArrayList<>(); for (DifferSnapshotInfo snap : snapshots) { // Returns a list of SST files to be fed into RocksCheckpointDiffer Dag.
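One step back to the ManagedRocksDB change earlier in this diff: RocksDB#deleteFile takes a bare file name and no column family, whereas the replacement deleteFilesInRanges needs the owning ColumnFamilyHandle plus a begin/end key list, which is why deleteFilesNotMatchingPrefix now resolves a handle before deleting. The call shape, as a hedged sketch (assumes an open org.rocksdb.RocksDB db, a resolved handle, and the LiveFileMetaData meta of the SST file to drop):

    // Files whose keys fall entirely within [smallestKey, largestKey] become
    // deletion candidates; includeEnd=true keeps the upper bound inclusive so
    // the target file's own key range qualifies.
    List<byte[]> range = Arrays.asList(meta.smallestKey(), meta.largestKey());
    db.deleteFilesInRanges(handle, range, true /* includeEnd */);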
List<String> tablesToTrack = new ArrayList<>(COLUMN_FAMILIES_TO_TRACK_IN_DAG); @@ -1037,24 +1024,12 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ) Set<String> tableToLookUp = new HashSet<>(); for (int i = 0; i < Math.pow(2, tablesToTrack.size()); i++) { tableToLookUp.clear(); - expectedDiffFiles.clear(); int mask = i; while (mask != 0) { int firstSetBitIndex = Integer.numberOfTrailingZeros(mask); tableToLookUp.add(tablesToTrack.get(firstSetBitIndex)); mask &= mask - 1; } - for (String diffFile : expectedDifferResult.get(index)) { - String columnFamily; - if (rocksDBCheckpointDiffer.getCompactionNodeMap().containsKey(diffFile)) { - columnFamily = rocksDBCheckpointDiffer.getCompactionNodeMap().get(diffFile).getColumnFamily(); - } else { - columnFamily = src.getSstFile(0, diffFile).getColumnFamily(); - } - if (columnFamily == null || tableToLookUp.contains(columnFamily)) { - expectedDiffFiles.add(diffFile); - } - } DifferSnapshotVersion srcSnapVersion = new DifferSnapshotVersion(src, 0, tableToLookUp); DifferSnapshotVersion destSnapVersion = new DifferSnapshotVersion(snap, 0, tableToLookUp); List<SstFileInfo> sstDiffList = differ.getSSTDiffList(srcSnapVersion, destSnapVersion, null, @@ -1062,11 +1037,14 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ) LOG.info("SST diff list from '{}' to '{}': {} tables: {}", src.getDbPath(0), snap.getDbPath(0), sstDiffList, tableToLookUp); - assertEquals(expectedDiffFiles, sstDiffList.stream().map(SstFileInfo::getFileName) .collect(Collectors.toList())); + if (!tableToLookUp.isEmpty()) { + for (SstFileInfo sstFileInfo : sstDiffList) { + assertTrue(sstFileInfo.getColumnFamily() == null + || tableToLookUp.contains(sstFileInfo.getColumnFamily()), + "Unexpected column family in diff result: " + sstFileInfo); + } + } } - - ++index; } } diff --git a/hadoop-ozone/dist/src/main/compose/common/replicas-test.sh b/hadoop-ozone/dist/src/main/compose/common/replicas-test.sh index f69b4b23f1a2..31cc0c1f9b6e 100755 --- a/hadoop-ozone/dist/src/main/compose/common/replicas-test.sh +++ b/hadoop-ozone/dist/src/main/compose/common/replicas-test.sh @@ -21,14 +21,49 @@ volume="cli-debug-volume${prefix}" bucket="cli-debug-bucket" key="testfile" -dn_container="ozonesecure-ha-datanode1-1" container_db_path="/data/hdds/hdds/" -local_db_backup_path="${COMPOSE_DIR}/container_db_backup" +local_db_backup_path="${COMPOSE_DIR}/container_db_backup_${prefix}" +backup_manifest="${local_db_backup_path}/container_db_paths.tsv" mkdir -p "${local_db_backup_path}" -echo "Taking a backup of container.db" -docker exec "${dn_container}" find "${container_db_path}" -name "container.db" | while read -r db; do - docker cp "${dn_container}:${db}" "${local_db_backup_path}/container.db" +echo "Taking backups of existing container.db directories" +datanodes=$(docker ps --format '{{.Names}}' | grep '^ozonesecure-ha-datanode[0-9]\+-1$' | sort) +if [ -z "${datanodes}" ]; then + echo "Failed to find datanode containers" >&2 + exit 1 +fi + +>"${backup_manifest}" +for dn_container in ${datanodes}; do + while read -r db; do + printf '%s\t%s\n' "${dn_container}" "${db}" >> "${backup_manifest}" + done < <(docker exec "${dn_container}" find "${container_db_path}" -name "container.db") +done + +echo "Stopping datanodes for a consistent container.db backup" +for dn_container in ${datanodes}; do + if [ "$(docker inspect -f '{{.State.Running}}' "${dn_container}" 2>/dev/null)" = "true" ]; then + docker stop "${dn_container}" >/dev/null + else + echo "${dn_container} is already stopped before backup" + fi +done + +while IFS=$'\t' 
read -r dn_container db; do + backup_path="${local_db_backup_path}/${dn_container}${db}" + mkdir -p "$(dirname "${backup_path}")" + docker cp "${dn_container}:${db}" "${backup_path}" +done < "${backup_manifest}" + +echo "Restarting datanodes after backup" +for dn_container in ${datanodes}; do + if [ "$(docker inspect -f '{{.State.Running}}' "${dn_container}" 2>/dev/null)" != "true" ]; then + docker start "${dn_container}" >/dev/null + fi +done + +for dn_container in ${datanodes}; do + wait_for_datanode "${dn_container}" HEALTHY 60 done execute_robot_test ${SCM} -v "PREFIX:${prefix}" debug/ozone-debug-tests.robot @@ -40,29 +75,111 @@ host="$(jq -r '.keyLocations[0][0].datanode["hostname"]' ${chunkinfo})" container="${host%%.*}" dn_with_num="$(sed -E 's/^.*-(datanode[0-9]+)-[0-9]+$/\1/' <<< "$container")" -# corrupt the first block of key on one of the datanodes datafile="$(jq -r '.keyLocations[0][0].file' ${chunkinfo})" +container_id="$(jq -r '.keyLocations[0][0].blockData.blockID.containerID' ${chunkinfo})" +local_block_id="$(jq -r '.keyLocations[0][0].blockData.blockID.localID // .keyLocations[0][0].blockData.blockID.localId' ${chunkinfo})" +pipeline_id="$(docker-compose exec -T ${SCM} bash -c \ + "ozone admin container info ${container_id} --json | jq -r '.writePipelineID.id // .writePipelineId.id'")" +if [ -z "${pipeline_id}" ] || [ "${pipeline_id}" = "null" ]; then + echo "Failed to determine write pipeline for container ${container_id}" >&2 + exit 1 +fi +if [ -z "${local_block_id}" ] || [ "${local_block_id}" = "null" ]; then + echo "Failed to determine local block ID for container ${container_id}" >&2 + exit 1 +fi + +# corrupt the first block of key on one of the datanodes docker exec "${container}" sed -i -e '1s/^/a/' "${datafile}" execute_robot_test ${SCM} -v "PREFIX:${prefix}" -v "CORRUPT_DATANODE:${host}" debug/corrupt-block-checksum.robot -echo "Overwriting container.db with the backup db" -target_container_dir=$(docker exec "${container}" find "${container_db_path}" -name "container.db" | xargs dirname) -docker cp "${local_db_backup_path}/container.db" "${container}:${target_container_dir}/" -docker exec "${container}" sudo chown -R hadoop:hadoop "${target_container_dir}" +target_container_db=$(docker exec "${container}" bash -c " + datafile=\$1 + dir=\$(dirname \"\$datafile\") + while [ \"\$dir\" != '/' ]; do + if [[ \$(basename \"\$dir\") == CID-* ]]; then + container_db=\$(find \"\$dir\" -path '*/container.db' | head -n 1) + if [ -n \"\$container_db\" ]; then + echo \"\$container_db\" + exit 0 + fi + exit 1 + fi + dir=\$(dirname \"\$dir\") + done + exit 1 +" _ "${datafile}") +if [ -z "${target_container_db}" ]; then + echo "Failed to locate container.db for ${datafile} on ${container}" >&2 + exit 1 +fi +backup_container_db="${local_db_backup_path}/${container}${target_container_db}" +if [ ! 
-e "${backup_container_db}" ]; then + echo "No pre-key backup found for ${target_container_db} on ${container}; creating rollback copy by deleting block metadata" + + docker stop "${container}" + + wait_for_datanode "${container}" STALE 60 + + mkdir -p "$(dirname "${backup_container_db}")" + docker cp "${container}:${target_container_db}" "${backup_container_db}" + + container_image="$(docker inspect -f '{{.Config.Image}}' "${container}")" + docker run --rm \ + -v "${local_db_backup_path}:${local_db_backup_path}" \ + --entrypoint bash "${container_image}" -c ' + set -euo pipefail + backup_container_db="$1" + local_block_id="$2" + + ldb --db="${backup_container_db}" --column_family=block_data delete "${local_block_id}" + ' _ "${backup_container_db}" "${local_block_id}" || exit 1 + + docker start "${container}" + + wait_for_datanode "${container}" HEALTHY 60 +fi docker stop "${container}" wait_for_datanode "${container}" STALE 60 + execute_robot_test ${SCM} -v "PREFIX:${prefix}" -v "STALE_DATANODE:${host}" debug/stale-datanode-checksum.robot docker start "${container}" wait_for_datanode "${container}" HEALTHY 60 -execute_robot_test ${SCM} -v "PREFIX:${prefix}" -v "DATANODE:${host}" debug/block-existence-check.robot - execute_robot_test ${SCM} -v "PREFIX:${prefix}" -v "DATANODE:${host}" -v "FAULT_INJ_DATANODE:${dn_with_num}" debug/container-state-verifier.robot execute_robot_test ${OM} kinit.robot execute_robot_test ${OM} -v "PREFIX:${prefix}" debug/ozone-debug-tests-ec3-2.robot + +echo "Overwriting container.db with the backup db" +echo "Restoring backup at ${target_container_db} on ${container}" +echo "Removing dn.ratis state for pipeline ${pipeline_id} on ${container}" +docker stop "${container}" + +wait_for_datanode "${container}" STALE 60 + +container_image="$(docker inspect -f '{{.Config.Image}}' "${container}")" +docker run --rm --volumes-from "${container}" \ + -v "${local_db_backup_path}:${local_db_backup_path}:ro" \ + --entrypoint bash "${container_image}" -c ' + set -euo pipefail + target_container_db="$1" + pipeline_id="$2" + backup_container_db="$3" + + rm -rf "${target_container_db}" "/data/metadata/dn.ratis/${pipeline_id}" + mkdir -p "$(dirname "${target_container_db}")" + cp -a "${backup_container_db}" "${target_container_db}" + chown -R hadoop:hadoop "${target_container_db}" + ' _ "${target_container_db}" "${pipeline_id}" "${backup_container_db}" || exit 1 + +docker start "${container}" + +wait_for_datanode "${container}" HEALTHY 60 + +execute_robot_test ${SCM} -v "PREFIX:${prefix}" -v "DATANODE:${host}" debug/block-existence-check.robot diff --git a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-keywords.robot b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-keywords.robot index aa51febb318e..9114239d8784 100644 --- a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-keywords.robot +++ b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-keywords.robot @@ -63,13 +63,26 @@ Check Container State Replicas ${checks} = Get From Dictionary ${replica} checks ${check} = Get From List ${checks} 0 Should Be Equal ${check['type']} containerState - Should Be Equal ${check['pass']} ${False} - ${actual_message} = Set Variable ${check['failures'][0]['message']} - - Run Keyword If '${hostname}' == '${faulty_datanode}' Should Contain ${actual_message} ${expected_message} - ... 
ELSE Should Match Regexp ${actual_message} Replica state is (OPEN|CLOSING|QUASI_CLOSED|CLOSED) + Run Keyword If '${hostname}' == '${faulty_datanode}' Check Replica Failed ${replica} containerState ${expected_message} + ... ELSE Check Healthy Replica Container State ${replica} END +Check Healthy Replica Container State + [Arguments] ${replica} + ${checks} = Get From Dictionary ${replica} checks + ${check} = Get From List ${checks} 0 + Should Be Equal ${check['type']} containerState + Run Keyword If ${check['pass']} Check Replica Passed ${replica} containerState + ... ELSE Check Replica Failed Container State ${replica} + +Check Replica Failed Container State + [Arguments] ${replica} + ${checks} = Get From Dictionary ${replica} checks + ${check} = Get From List ${checks} 0 + Should Be Equal ${check['type']} containerState + Should Be Equal ${check['pass']} ${False} + Should Match Regexp ${check['failures'][0]['message']} Replica state is (OPEN|CLOSING|QUASI_CLOSED|CLOSED) + Check Replica Failed [Arguments] ${replica} ${check_type} ${expected_message} ${checks} = Get From Dictionary ${replica} checks diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 520d9d6ef9e4..5cf9500cd35f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -966,9 +966,22 @@ private CheckedFunction getPreviousSnapshotOzone OmBucketInfo bucketInfo, long objectId, String currentKeyPath, Function<KeyManager, Table<String, T>> table) throws IOException { String renameKey = metadataManager.getRenameKey(bucketInfo.getVolumeName(), bucketInfo.getBucketName(), objectId); - String renamedKey = metadataManager.getSnapshotRenamedTable().getIfExist(renameKey); - return (previousSnapshotKM) -> table.apply(previousSnapshotKM).get( - renamedKey != null ? renamedKey : currentKeyPath); + Table<String, String> renamedTable = metadataManager.getSnapshotRenamedTable(); + String renamedKey = renamedTable.getIfExist(renameKey); + if (renamedKey == null) { + // Snapshot reads should not rely solely on the table cache. + renamedKey = renamedTable.getSkipCache(renameKey); + } + final String resolvedRenamedKey = renamedKey; + return (previousSnapshotKM) -> { + Table<String, T> previousTable = table.apply(previousSnapshotKM); + String lookupKey = resolvedRenamedKey != null ? resolvedRenamedKey : currentKeyPath; + T value = previousTable.get(lookupKey); + if (value == null) { + value = previousTable.getSkipCache(lookupKey); + } + return value; + }; } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/filter/ReclaimableRenameEntryFilter.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/filter/ReclaimableRenameEntryFilter.java index cad4ebc6df5d..5a4796fa805a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/filter/ReclaimableRenameEntryFilter.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/filter/ReclaimableRenameEntryFilter.java @@ -83,6 +83,11 @@ private final boolean isRenameEntryReclaimable(Table.KeyValue re if (previousTable != null) { String prevDbKey = renameEntry.getValue(); WithObjectID withObjectID = previousTable.getIfExist(prevDbKey); + if (withObjectID == null) { + // Snapshot reclaim logic must be correctness-first. 
If table cache + // says "not found", verify directly from RocksDB before reclaiming. + withObjectID = previousTable.getSkipCache(prevDbKey); + } if (withObjectID != null) { return false; } diff --git a/pom.xml b/pom.xml index 6182113b938b..d9f94218efe9 100644 --- a/pom.xml +++ b/pom.xml @@ -195,7 +195,7 @@ 0.10.2 1.2.26 3.0.0 - <rocksdb.version>7.7.3</rocksdb.version> + <rocksdb.version>10.10.1.1</rocksdb.version> 3.1.0 bash 2.0.17
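Both OM hunks above (KeyManagerImpl and ReclaimableRenameEntryFilter) repeat one read pattern: a hit from the cache-backed lookup is trusted immediately, but a miss is only believed after a second read that bypasses the table cache. Condensed into a single helper, purely as an illustration (the helper name is not from this patch):

    // Sketch: trust a hit right away, but verify a miss against RocksDB.
    static <K, V> V readCorrectnessFirst(Table<K, V> table, K key)
        throws IOException {
      V value = table.getIfExist(key);    // fast path; may consult table cache
      if (value == null) {
        value = table.getSkipCache(key);  // authoritative read from RocksDB
      }
      return value;
    }

The double read costs one extra point lookup only on misses, which is the acceptable price for never reclaiming or skipping a key that actually exists on disk.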