diff --git a/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java b/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java index c2cfb2b90106..dda42c90c482 100644 --- a/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java +++ b/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java @@ -128,7 +128,7 @@ final void putShort(int pos, short value) final void putShortVolatile(int pos, short value) { - getChunk(pos).putShort(inChunkPointer(pos), value); + getChunk(pos).putShortVolatile(inChunkPointer(pos), value); } final void putByte(int pos, byte value) diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableIterator.java index a320684dd833..a7a0881f7c9b 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableIterator.java @@ -97,6 +97,7 @@ protected AbstractSSTableIterator(SSTableReader sstable, } else { + Reader reader = null; boolean shouldCloseFile = file == null; try { @@ -119,15 +120,18 @@ protected AbstractSSTableIterator(SSTableReader sstable, // Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow // (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format). - this.reader = createReader(indexEntry, file, shouldCloseFile); + reader = createReader(indexEntry, file, shouldCloseFile); this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics); } else { this.partitionLevelDeletion = indexEntry.deletionTime(); this.staticRow = Rows.EMPTY_STATIC_ROW; - this.reader = createReader(indexEntry, file, shouldCloseFile); + reader = createReader(indexEntry, file, shouldCloseFile); } + + this.reader = reader; + if (!partitionLevelDeletion.validate()) UnfilteredValidation.handleInvalid(metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString()); @@ -140,6 +144,19 @@ protected AbstractSSTableIterator(SSTableReader sstable, catch (IOException e) { sstable.markSuspect(); + + if (reader != null) + { + try + { + reader.close(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + } + String filePath = file.getPath(); if (shouldCloseFile) { diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java index bf28a83a3720..b0b7b2455c2a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java @@ -330,13 +330,16 @@ protected SequentialWriter openDataWriter() { checkState(!dataWriterOpened, "Data writer has been already opened."); - return DataComponent.buildWriter(descriptor, - getTableMetadataRef().getLocal(), - getIOOptions().writerOptions, - getMetadataCollector(), - ensuringInBuildInternalContext(operationType), - getIOOptions().flushCompression, - getCompressionDictionaryManager()); + SequentialWriter sequentialWriter = DataComponent.buildWriter(descriptor, + getTableMetadataRef().getLocal(), + getIOOptions().writerOptions, + getMetadataCollector(), + ensuringInBuildInternalContext(operationType), + getIOOptions().flushCompression, + getCompressionDictionaryManager()); + dataWriterOpened = true; + + return sequentialWriter; } @Override diff --git a/src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java b/src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java index 27c199a68523..509d83a6b959 100644 --- a/src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java +++ b/src/java/org/apache/cassandra/io/tries/ReverseValueIterator.java @@ -94,39 +94,55 @@ void initializeWithRightBound(long root, ByteSource endStream, boolean admitPref int limitByte; reportingPrefixes = admitPrefix; - // Follow end position while we still have a prefix, stacking path. - go(root); - while (true) + try { - int s = endStream.next(); - childIndex = search(s); - - limitByte = NOT_AT_LIMIT; - if (atLimit) + // Follow end position while we still have a prefix, stacking path. + go(root); + while (true) { - limitByte = limit.next(); - if (s > limitByte) - atLimit = false; + int s = endStream.next(); + childIndex = search(s); + + limitByte = NOT_AT_LIMIT; + if (atLimit) + { + limitByte = limit.next(); + if (s > limitByte) + atLimit = false; + } + if (childIndex < 0) + break; + + prev = new IterationPosition(position, childIndex, limitByte, prev); + go(transition(childIndex)); // childIndex is positive, this transition must exist } - if (childIndex < 0) - break; - prev = new IterationPosition(position, childIndex, limitByte, prev); - go(transition(childIndex)); // childIndex is positive, this transition must exist + // Advancing now gives us first match. + childIndex = -1 - childIndex; + stack = new IterationPosition(position, childIndex, limitByte, prev); + next = advanceNode(); + } + catch (Throwable t) + { + super.close(); + throw t; } - - // Advancing now gives us first match. - childIndex = -1 - childIndex; - stack = new IterationPosition(position, childIndex, limitByte, prev); - next = advanceNode(); } private void initializeNoRightBound(long root, int limitByte, boolean admitPrefix) { - go(root); - stack = new IterationPosition(root, -1 - search(256), limitByte, null); - next = advanceNode(); - reportingPrefixes = admitPrefix; + try + { + go(root); + stack = new IterationPosition(root, -1 - search(256), limitByte, null); + next = advanceNode(); + reportingPrefixes = admitPrefix; + } + catch (Throwable t) + { + super.close(); + throw t; + } } diff --git a/src/java/org/apache/cassandra/io/tries/Walker.java b/src/java/org/apache/cassandra/io/tries/Walker.java index 2f91ab23af14..fa943309931d 100644 --- a/src/java/org/apache/cassandra/io/tries/Walker.java +++ b/src/java/org/apache/cassandra/io/tries/Walker.java @@ -46,7 +46,7 @@ public class Walker> implements AutoCloseable { /** Value used to indicate a branch (e.g. lesser/greaterBranch) does not exist. */ - public static int NONE = TrieNode.NONE; + public static final int NONE = TrieNode.NONE; private final Rebufferer source; protected final long root; diff --git a/test/unit/org/apache/cassandra/io/tries/ReverseValueIteratorLeakTest.java b/test/unit/org/apache/cassandra/io/tries/ReverseValueIteratorLeakTest.java new file mode 100644 index 000000000000..41c580c6a068 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/tries/ReverseValueIteratorLeakTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.tries; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.Rebufferer; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Reproducer: ReverseValueIterator leaks Rebufferer on constructor exception + * + *

If initializeWithRightBound() or initializeNoRightBound() throws after the Walker + * super-constructor acquires a Rebufferer, the Rebufferer is never closed. Compare with + * ValueIterator which wraps initialization in try-catch with super.close(). + * + * Expected: Rebufferer.closeReader() is called when initialization throws. + * Actual (buggy): Rebufferer leaks — closeReader() never called. + * Failure criterion: closeReader() not called after constructor exception. + */ +@SuppressWarnings({"unchecked", "RedundantSuppression"}) +public class ReverseValueIteratorLeakTest extends AbstractTrieTestBase +{ + // ORACLE: Rebufferer.closeReader() must be called if constructor throws + @Test + public void testRebuffererClosedOnInitWithRightBoundFailure() throws IOException + { + // HARNESS: build a valid trie, wrap its Rebufferer with close tracking, + // then trigger a failure during initializeWithRightBound + DataOutputBuffer buf = new DataOutputBufferPaged(); + IncrementalTrieWriter builder = newTrieWriter(serializer, buf); + builder.add(source("aaa"), 1); + builder.add(source("bbb"), 2); + long root = builder.complete(); + + AtomicBoolean closeReaderCalled = new AtomicBoolean(false); + Rebufferer trackingSource = new TrackingRebufferer(buf.asNewBuffer(), closeReaderCalled); + + // TRIGGER: provide a ByteComparable whose ByteSource throws during iteration, + // causing initializeWithRightBound to fail after Walker acquires the Rebufferer + ByteComparable throwingEnd = v -> new ByteSource() + { + private int calls = 0; + + @Override + public int next() + { + if (++calls > 1) + throw new RuntimeException("simulated failure during trie traversal"); + return 'b'; + } + }; + + try + { + new ReverseValueIterator<>(trackingSource, root, source("aaa"), throwingEnd, false); + fail("Constructor should have thrown"); + } + catch (RuntimeException e) + { + assertTrue("Expected simulated failure", e.getMessage().contains("simulated failure")); + } + + // ORACLE: Rebufferer must have been closed despite the constructor failure + assertTrue("Rebufferer.closeReader() must be called when ReverseValueIterator " + + "constructor fails during initializeWithRightBound — without the fix, " + + "the Rebufferer leaks", + closeReaderCalled.get()); + } + + private static class TrackingRebufferer extends ByteBufRebufferer + { + private final AtomicBoolean closeReaderCalled; + + TrackingRebufferer(ByteBuffer buffer, AtomicBoolean closeReaderCalled) + { + super(buffer); + this.closeReaderCalled = closeReaderCalled; + } + + @Override + public void closeReader() + { + closeReaderCalled.set(true); + } + } +}