Skip to content

Commit ef6f3f0

Browse files
Wire compaction_read_disk_access_mode through cursor-based compaction
This change wires DiskAccessMode through cursor-based compaction, enabling direct I/O reads for cursor compaction to match the existing support in the iterator-based compaction path.

Key changes:
- Thread DiskAccessMode from DatabaseDescriptor.getCompactionReadDiskAccessMode() through CursorCompactor, StatefulCursor, and SSTableCursorReader
- Consolidate SSTableReader's openDataReader/openDataReaderForScan variants into a unified openDataReaderInternal with canReuseDfile guard
- Fix DirectThreadLocalReadAheadBuffer.cleanBuffer() to clean the backing buffer rather than the aligned slice
1 parent 816524a commit ef6f3f0

10 files changed

Lines changed: 176 additions & 34 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Wire compaction_read_disk_access_mode through cursor-based compaction (CASSANDRA-21147)
23
* Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141)
34
* Direct I/O support for compaction reads (CASSANDRA-19987)
45
* Support custom StartupCheck implementations via SPI (CASSANDRA-21093)

src/java/org/apache/cassandra/db/compaction/CursorCompactor.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import java.util.function.LongPredicate;
2828

2929
import com.google.common.collect.ImmutableSet;
30-
import com.google.common.collect.UnmodifiableIterator;
3130

3231
import org.slf4j.Logger;
3332
import org.slf4j.LoggerFactory;
3433

34+
import org.apache.cassandra.config.Config.DiskAccessMode;
3535
import org.apache.cassandra.config.DatabaseDescriptor;
3636
import org.apache.cassandra.db.AbstractCompactionController;
3737
import org.apache.cassandra.db.ClusteringComparator;
@@ -70,6 +70,7 @@
7070
import org.apache.cassandra.schema.CompactionParams;
7171
import org.apache.cassandra.schema.SchemaConstants;
7272
import org.apache.cassandra.schema.TableMetadata;
73+
import org.apache.cassandra.utils.Throwables;
7374
import org.apache.cassandra.utils.TimeUUID;
7475

7576
import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
@@ -294,15 +295,8 @@ private CursorCompactor(OperationType type,
294295
* {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)}
295296
*/
296297

297-
// Convert Readers to Cursors
298-
this.sstableCursors = new StatefulCursor[sstables.size()];
298+
this.sstableCursors = convertScannersToCursors(scanners, sstables, DatabaseDescriptor.getCompactionReadDiskAccessMode());
299299
this.sstableCursorsEqualsNext = new boolean[sstables.size()];
300-
UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
301-
for (int i = 0; i < this.sstableCursors.length; i++)
302-
{
303-
SSTableReader ssTableReader = iterator.next();
304-
this.sstableCursors[i] = new StatefulCursor(ssTableReader);
305-
}
306300
this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness();
307301

308302
purger = new Purger(type, controller, nowInSec);
@@ -1553,6 +1547,33 @@ private static String mergeHistogramToString(long[] histogram)
15531547
return sb.toString();
15541548
}
15551549

1550+
/**
1551+
* Closes scanner-opened readers before opening cursor-specific readers with the configured disk access mode.
1552+
* In cursor compaction, scanners are only used for metadata; closing them avoids holding redundant file
1553+
* descriptors and prevents conflicts when scan and non-scan readers for the same file share thread-local
1554+
* buffer state on the same thread.
1555+
*/
1556+
private static StatefulCursor[] convertScannersToCursors(List<ISSTableScanner> scanners, ImmutableSet<SSTableReader> sstables,
1557+
DiskAccessMode diskAccessMode)
1558+
{
1559+
for (ISSTableScanner scanner : scanners)
1560+
scanner.close();
1561+
1562+
StatefulCursor[] cursors = new StatefulCursor[sstables.size()];
1563+
int i = 0;
1564+
try
1565+
{
1566+
for (SSTableReader reader : sstables)
1567+
cursors[i++] = new StatefulCursor(reader, diskAccessMode);
1568+
return cursors;
1569+
}
1570+
catch (RuntimeException | Error e)
1571+
{
1572+
Throwables.closeNonNullAndAddSuppressed(e, cursors);
1573+
throw e;
1574+
}
1575+
}
1576+
15561577
public void close()
15571578
{
15581579
try

src/java/org/apache/cassandra/db/compaction/StatefulCursor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.cassandra.db.compaction;
2020

2121
import org.apache.cassandra.config.Config;
22+
import org.apache.cassandra.config.Config.DiskAccessMode;
2223
import org.apache.cassandra.config.DatabaseDescriptor;
2324
import org.apache.cassandra.db.DecoratedKey;
2425
import org.apache.cassandra.db.ReusableLivenessInfo;
@@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader
5556

5657
private boolean isOpenRangeTombstonePresent = false;
5758

58-
public StatefulCursor(SSTableReader reader)
59+
public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode)
5960
{
60-
super(reader);
61+
super(reader, diskAccessMode);
6162
currPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
6263
prevPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
6364
unfiltered = new UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new));

src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.google.common.collect.ImmutableList;
2424

25+
import org.apache.cassandra.config.Config.DiskAccessMode;
2526
import org.apache.cassandra.db.ClusteringPrefix;
2627
import org.apache.cassandra.db.Columns;
2728
import org.apache.cassandra.db.DeletionTime;
@@ -197,15 +198,20 @@ public static SSTableCursorReader fromDescriptor(Descriptor desc) throws IOExcep
197198
{
198199
TableMetadata metadata = Util.metadataFromSSTable(desc);
199200
SSTableReader reader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata));
200-
return new SSTableCursorReader(reader, metadata, reader.ref());
201+
return new SSTableCursorReader(reader, metadata, reader.ref(), null);
201202
}
202203

203204
public SSTableCursorReader(SSTableReader reader)
204205
{
205-
this(reader, reader.metadata(), null);
206+
this(reader, reader.metadata(), null, null);
206207
}
207208

208-
private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef)
209+
public SSTableCursorReader(SSTableReader reader, DiskAccessMode diskAccessMode)
210+
{
211+
this(reader, reader.metadata(), null, diskAccessMode);
212+
}
213+
214+
private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
209215
{
210216
ssTableReader = reader;
211217
ssTableReaderRef = readerRef;
@@ -221,7 +227,7 @@ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SS
221227
deserializationHelper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, null);
222228
serializationHeader = reader.header;
223229

224-
dataReader = reader.openDataReader();
230+
dataReader = reader.openDataReader(diskAccessMode);
225231
hasStaticColumns = metadata.hasStaticColumns();
226232
}
227233

src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.concurrent.locks.ReentrantReadWriteLock;
4040

41+
import javax.annotation.Nullable;
42+
4143
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
4244
import com.clearspring.analytics.stream.cardinality.ICardinality;
4345
import com.google.common.annotations.VisibleForTesting;
@@ -1417,44 +1419,60 @@ public StatsMetadata getSSTableMetadata()
14171419
return sstableMetadata;
14181420
}
14191421

1422+
public RandomAccessReader openDataReader()
1423+
{
1424+
return openDataReaderInternal(null, null, false);
1425+
}
1426+
14201427
public RandomAccessReader openDataReader(RateLimiter limiter)
14211428
{
14221429
assert limiter != null;
1423-
return dfile.createReader(limiter);
1430+
return openDataReaderInternal(null, limiter, false);
14241431
}
14251432

1426-
public RandomAccessReader openDataReader()
1433+
public RandomAccessReader openDataReader(DiskAccessMode diskAccessMode)
14271434
{
1428-
return dfile.createReader();
1435+
return openDataReaderInternal(diskAccessMode, null, false);
14291436
}
14301437

14311438
public RandomAccessReader openDataReaderForScan()
14321439
{
1433-
return openDataReaderForScan(dfile.diskAccessMode());
1440+
return openDataReaderInternal(null, null, true);
14341441
}
14351442

14361443
public RandomAccessReader openDataReaderForScan(DiskAccessMode diskAccessMode)
14371444
{
1438-
boolean isSameDiskAccessMode = diskAccessMode == dfile.diskAccessMode();
1439-
boolean isDirectIONotSupported = diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO();
1445+
return openDataReaderInternal(diskAccessMode, null, true);
1446+
}
14401447

1441-
if (isSameDiskAccessMode || isDirectIONotSupported)
1442-
return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
1448+
private RandomAccessReader openDataReaderInternal(@Nullable DiskAccessMode diskAccessMode,
1449+
@Nullable RateLimiter limiter,
1450+
boolean forScan)
1451+
{
1452+
if (canReuseDfile(diskAccessMode))
1453+
return dfile.createReader(limiter, forScan, OnReaderClose.RETAIN_FILE_OPEN);
14431454

1444-
FileHandle dataFile = dfile.toBuilder()
1445-
.withDiskAccessMode(diskAccessMode)
1446-
.complete();
1455+
FileHandle handle = dfile.toBuilder()
1456+
.withDiskAccessMode(diskAccessMode)
1457+
.complete();
14471458
try
14481459
{
1449-
return dataFile.createReaderForScan(OnReaderClose.CLOSE_FILE);
1460+
return handle.createReader(limiter, forScan, OnReaderClose.CLOSE_FILE);
14501461
}
14511462
catch (Throwable t)
14521463
{
1453-
dataFile.close();
1464+
handle.close();
14541465
throw t;
14551466
}
14561467
}
14571468

1469+
private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
1470+
{
1471+
return diskAccessMode == null
1472+
|| diskAccessMode == dfile.diskAccessMode()
1473+
|| (diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO());
1474+
}
1475+
14581476
public void trySkipFileCacheBefore(DecoratedKey key)
14591477
{
14601478
long position = getPosition(key, SSTableReader.Operator.GE);

src/java/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBuffer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.agrona.BufferUtil;
2525

2626
import org.apache.cassandra.io.sstable.CorruptSSTableException;
27+
import org.apache.cassandra.utils.memory.MemoryUtil;
28+
29+
import sun.nio.ch.DirectBuffer;
2730

2831
public final class DirectThreadLocalReadAheadBuffer extends ThreadLocalReadAheadBuffer
2932
{
@@ -46,4 +49,14 @@ protected void loadBlock(ByteBuffer blockBuffer, long blockPosition, int sizeToR
4649
if (channel.read(blockBuffer, blockPosition) < sizeToRead)
4750
throw new CorruptSSTableException(null, channel.filePath());
4851
}
52+
53+
@Override
54+
protected void cleanBuffer(ByteBuffer buffer)
55+
{
56+
// Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment)
57+
DirectBuffer db = (DirectBuffer) buffer;
58+
ByteBuffer attachment = (ByteBuffer) db.attachment();
59+
MemoryUtil.clean(attachment != null ? attachment : buffer);
60+
}
61+
4962
}

src/java/org/apache/cassandra/io/util/FileHandle.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,6 @@ public RandomAccessReader createReader()
196196
return createReader(null);
197197
}
198198

199-
public RandomAccessReader createReaderForScan(OnReaderClose onReaderClose)
200-
{
201-
return createReader(null, true, onReaderClose);
202-
}
203-
204199
/**
205200
* Create {@link RandomAccessReader} with configured method of reading content of the file.
206201
* Reading from file will be rate limited by given {@link RateLimiter}.

src/java/org/apache/cassandra/io/util/ThreadLocalReadAheadBuffer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize, BufferTy
6262

6363
public ThreadLocalReadAheadBuffer(ChannelProxy channel, Supplier<ByteBuffer> bufferSupplier)
6464
{
65+
if (blockMap.get().containsKey(channel.filePath()))
66+
throw new IllegalStateException("ThreadLocalReadAheadBuffer already exists for " + channel.filePath() + "; previous instance must be closed first");
6567
this.channel = channel;
6668
this.channelSize = channel.size();
6769
this.bufferSupplier = bufferSupplier;
@@ -156,11 +158,16 @@ public void clear(boolean deallocate)
156158
blockBuffer.clear();
157159
if (deallocate)
158160
{
159-
MemoryUtil.clean(blockBuffer);
161+
cleanBuffer(blockBuffer);
160162
block.buffer = null;
161163
}
162164
}
163165

166+
protected void cleanBuffer(ByteBuffer buffer)
167+
{
168+
MemoryUtil.clean(buffer);
169+
}
170+
164171
@Override
165172
public void close()
166173
{

test/unit/org/apache/cassandra/db/compaction/simple/SimpleCompactionTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,49 @@
1919
package org.apache.cassandra.db.compaction.simple;
2020

2121
import java.io.IOException;
22+
import java.util.Arrays;
23+
import java.util.Collection;
2224
import java.util.concurrent.ExecutionException;
2325

26+
import org.junit.After;
2427
import org.junit.AfterClass;
28+
import org.junit.Before;
2529
import org.junit.Ignore;
30+
import org.junit.runner.RunWith;
31+
import org.junit.runners.Parameterized;
2632

33+
import org.apache.cassandra.config.Config.DiskAccessMode;
34+
import org.apache.cassandra.config.DatabaseDescriptor;
2735
import org.apache.cassandra.cql3.CQLTester;
2836
import org.apache.cassandra.utils.TestHelper;
2937

3038

3139
@Ignore
40+
@RunWith(Parameterized.class)
3241
public abstract class SimpleCompactionTest extends CQLTester
3342
{
43+
@Parameterized.Parameter
44+
public DiskAccessMode compactionReadDiskAccessMode;
45+
46+
@Parameterized.Parameters(name = "{0}")
47+
public static Collection<Object[]> diskAccessModes()
48+
{
49+
return Arrays.asList(new Object[]{ DiskAccessMode.standard },
50+
new Object[]{ DiskAccessMode.direct });
51+
}
52+
53+
@Before
54+
public void setCompactionReadDiskAccessMode()
55+
{
56+
DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode);
57+
}
58+
59+
@After
60+
public void restoreCompactionReadDiskAccessMode()
61+
{
62+
DatabaseDescriptor.setCompactionReadDiskAccessMode(DiskAccessMode.standard);
63+
}
64+
3465
@AfterClass
3566
public static void teardown() throws IOException, InterruptedException, ExecutionException
3667
{

test/unit/org/apache/cassandra/io/util/DirectThreadLocalReadAheadBufferTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
*/
1818
package org.apache.cassandra.io.util;
1919

20+
import java.lang.management.BufferPoolMXBean;
21+
import java.lang.management.ManagementFactory;
2022
import java.util.Arrays;
23+
import java.util.List;
2124

25+
import org.junit.Assert;
2226
import org.junit.Test;
2327

2428
import org.apache.cassandra.utils.Pair;
@@ -61,4 +65,49 @@ private void testReads(InputData propertyInputs, int bufferSize, int blockSize)
6165
tlrab.close();
6266
}
6367
}
68+
69+
@Test
70+
public void testDirectMemoryIsCleanedOnClose()
71+
{
72+
BufferPoolMXBean directPool = getDirectBufferPool();
73+
int blockSize = FileUtils.getFileBlockSize(files[0]);
74+
int bufferSize = 64 * 1024 * 1024; // 64MB - large enough to reliably detect
75+
76+
try (ChannelProxy channel = new ChannelProxy(files[0], ChannelProxy.IOMode.DIRECT))
77+
{
78+
DirectThreadLocalReadAheadBuffer tlrab =
79+
new DirectThreadLocalReadAheadBuffer(channel, bufferSize, blockSize);
80+
81+
// Force buffer allocation
82+
tlrab.allocateBuffer();
83+
84+
long memoryUsedBefore = directPool.getMemoryUsed();
85+
86+
// Close should clean the direct memory
87+
tlrab.close();
88+
89+
long memoryUsedAfter = directPool.getMemoryUsed();
90+
91+
// Memory should decrease by approximately buffer size (+ alignment overhead)
92+
long expectedDecrease = bufferSize;
93+
long actualDecrease = memoryUsedBefore - memoryUsedAfter;
94+
95+
Assert.assertTrue(
96+
"Direct memory should decrease after close(). " +
97+
"Before: " + memoryUsedBefore + ", After: " + memoryUsedAfter +
98+
", Expected decrease: ~" + expectedDecrease + ", Actual: " + actualDecrease,
99+
actualDecrease >= expectedDecrease * 0.9); // 10% tolerance for alignment
100+
}
101+
}
102+
103+
private static BufferPoolMXBean getDirectBufferPool()
104+
{
105+
List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
106+
for (BufferPoolMXBean pool : pools)
107+
if (pool.getName().equals("direct"))
108+
return pool;
109+
110+
throw new IllegalStateException("Direct buffer pool not found");
111+
}
112+
64113
}

0 commit comments

Comments
 (0)