Skip to content

Commit 6f5fe8c

Browse files
samueldlightfootaweisberg
authored andcommitted
Add Direct IO support for compaction reads
This change adds support for using O_DIRECT (direct I/O) when reading SSTables during compaction. Direct I/O bypasses the OS page cache, which can be beneficial for compaction workloads where data is typically read once and not accessed again soon after. Key changes: - Add DiskAccessMode.direct option for scan operations - Introduce DirectThreadLocalByteBufferHolder and DirectThreadLocalReadAheadBuffer for aligned buffer management required by O_DIRECT - Add ByteBufferHolder interface to abstract buffer management - Modify FileHandle to support building direct I/O readers via toBuilder() - Add startup check to verify Direct IO support on configured data directories - Enable read-ahead for uncompressed data in Direct CompressedChunkReader - Move direct IO support logic into FileHandle (from SSTableReader) - Add comprehensive tests for direct I/O chunk readers and buffer holders patch by Sam Lightfoot; reviewed by Ariel Weisberg and Maxwell Guo for CASSANDRA-19987
1 parent 940739a commit 6f5fe8c

43 files changed

Lines changed: 1666 additions & 234 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Direct I/O support for compaction reads (CASSANDRA-19987)
23
* Support custom StartupCheck implementations via SPI (CASSANDRA-21093)
34
* Make sstableexpiredblockers support human-readable output with SSTable sizes (CASSANDRA-20448)
45
* Enhance nodetool compactionhistory to report more compaction properities (CASSANDRA-20081)

conf/cassandra.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,11 @@ commitlog_segment_size: 32MiB
688688
# The default setting is legacy when the storage compatibility is set to 4 or auto otherwise.
689689
commitlog_disk_access_mode: legacy
690690

691+
# Set the disk access mode for reading SSTables during compaction. The allowed values are:
692+
# - auto: inherit from disk_access_mode (default)
693+
# - direct: use direct I/O for compaction reads, bypassing the OS page cache
694+
# compaction_read_disk_access_mode: auto
695+
691696
# Compression to apply to SSTables as they flush for compressed tables.
692697
# Note that tables without compression enabled do not respect this flag.
693698
#

conf/cassandra_latest.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,11 @@ commitlog_segment_size: 32MiB
695695
# The default setting is legacy when the storage compatibility is set to 4 or auto otherwise.
696696
commitlog_disk_access_mode: auto
697697

698+
# Set the disk access mode for reading SSTables during compaction. The allowed values are:
699+
# - auto: inherit from disk_access_mode (default)
700+
# - direct: use direct I/O for compaction reads, bypassing the OS page cache
701+
# compaction_read_disk_access_mode: auto
702+
698703
# Compression to apply to SSTables as they flush for compressed tables.
699704
# Note that tables without compression enabled do not respect this flag.
700705
#

src/java/org/apache/cassandra/config/Config.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ public static class SSTableConfig
428428
public FlushCompression flush_compression = FlushCompression.fast;
429429
public int commitlog_max_compression_buffers_in_pool = 3;
430430
public DiskAccessMode commitlog_disk_access_mode = DiskAccessMode.legacy;
431+
public DiskAccessMode compaction_read_disk_access_mode = DiskAccessMode.auto;
431432
@Replaces(oldName = "periodic_commitlog_sync_lag_block_in_ms", converter = Converters.MILLIS_DURATION_INT, deprecated = true)
432433
public DurationSpec.IntMillisecondsBound periodic_commitlog_sync_lag_block;
433434
public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions();

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ public class DatabaseDescriptor
221221

222222
private static DiskAccessMode commitLogWriteDiskAccessMode;
223223

224+
private static DiskAccessMode compactionReadDiskAccessMode;
225+
224226
private static AbstractCryptoProvider cryptoProvider;
225227
private static IAuthenticator authenticator;
226228
private static IAuthorizer authorizer;
@@ -662,6 +664,21 @@ else if (conf.disk_access_mode == DiskAccessMode.direct)
662664
}
663665
logger.info("DiskAccessMode is {}, indexAccessMode is {}", conf.disk_access_mode, indexAccessMode);
664666

667+
if (DiskAccessMode.auto == conf.compaction_read_disk_access_mode)
668+
{
669+
compactionReadDiskAccessMode = conf.disk_access_mode;
670+
}
671+
else if (DiskAccessMode.direct == conf.compaction_read_disk_access_mode)
672+
{
673+
compactionReadDiskAccessMode = DiskAccessMode.direct;
674+
}
675+
else
676+
{
677+
throw new IllegalArgumentException("Unsupported disk access mode for compaction_read_disk_access_mode " +
678+
"(options: direct/auto) " + conf.compaction_read_disk_access_mode);
679+
}
680+
logger.info("compaction_read_disk_access_mode resolved to: {}", compactionReadDiskAccessMode);
681+
665682
/* phi convict threshold for FailureDetector */
666683
if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
667684
{
@@ -1773,7 +1790,7 @@ private static Pair<DiskAccessMode, Boolean> resolveCommitLogWriteDiskAccessMode
17731790

17741791
File commitLogLocationDir = new File(commitLogLocation);
17751792
PathUtils.createDirectoriesIfNotExists(commitLogLocationDir.toPath());
1776-
directIOSupported = FileUtils.getBlockSize(commitLogLocationDir) > 0;
1793+
directIOSupported = FileUtils.isDirectIOSupported(commitLogLocationDir);
17771794
}
17781795
catch (IOError | ConfigurationException ex)
17791796
{
@@ -3301,6 +3318,18 @@ public static void setCommitLogSegmentSize(int sizeMebibytes)
33013318
conf.commitlog_segment_size = new DataStorageSpec.IntMebibytesBound(sizeMebibytes);
33023319
}
33033320

3321+
public static DiskAccessMode getCompactionReadDiskAccessMode()
3322+
{
3323+
return compactionReadDiskAccessMode;
3324+
}
3325+
3326+
@VisibleForTesting
3327+
public static void setCompactionReadDiskAccessMode(DiskAccessMode scanDiskAccessMode)
3328+
{
3329+
compactionReadDiskAccessMode = scanDiskAccessMode;
3330+
conf.compaction_read_disk_access_mode = scanDiskAccessMode;
3331+
}
3332+
33043333
/**
33053334
* Return commitlog disk access mode.
33063335
*/

src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35+
import org.apache.cassandra.config.DatabaseDescriptor;
3536
import org.apache.cassandra.db.ColumnFamilyStore;
3637
import org.apache.cassandra.db.Directories;
3738
import org.apache.cassandra.db.SerializationHeader;
@@ -262,7 +263,7 @@ public ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Ra
262263
try
263264
{
264265
for (SSTableReader sstable : sstables)
265-
scanners.add(sstable.getScanner(ranges));
266+
scanners.add(sstable.getScanner(ranges, DatabaseDescriptor.getCompactionReadDiskAccessMode()));
266267
}
267268
catch (Throwable t)
268269
{

src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,7 +1767,7 @@ public ISSTableScanner getScanner(SSTableReader sstable)
17671767
{
17681768
rangesToScan = Collections2.filter(ranges, range -> !transientRanges.contains(range));
17691769
}
1770-
return sstable.getScanner(rangesToScan);
1770+
return sstable.getScanner(rangesToScan, DatabaseDescriptor.getCompactionReadDiskAccessMode());
17711771
}
17721772

17731773
@Override
@@ -1790,7 +1790,7 @@ public Full(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, long nowInSe
17901790
@Override
17911791
public ISSTableScanner getScanner(SSTableReader sstable)
17921792
{
1793-
return sstable.getScanner();
1793+
return sstable.getScanner(DatabaseDescriptor.getCompactionReadDiskAccessMode());
17941794
}
17951795

17961796
@Override

src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.slf4j.Logger;
4545
import org.slf4j.LoggerFactory;
4646

47+
import org.apache.cassandra.config.DatabaseDescriptor;
4748
import org.apache.cassandra.db.ColumnFamilyStore;
4849
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
4950
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -343,7 +344,7 @@ public ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Ra
343344
{
344345
// L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
345346
for (SSTableReader sstable : byLevel.get(level))
346-
scanners.add(sstable.getScanner(ranges));
347+
scanners.add(sstable.getScanner(ranges, DatabaseDescriptor.getCompactionReadDiskAccessMode()));
347348
}
348349
else
349350
{
@@ -445,7 +446,7 @@ public LeveledScanner(TableMetadata metadata, Collection<SSTableReader> sstables
445446
sstableIterator = this.sstables.iterator();
446447
assert sstableIterator.hasNext(); // caller should check intersecting first
447448
SSTableReader currentSSTable = sstableIterator.next();
448-
currentScanner = currentSSTable.getScanner(ranges);
449+
currentScanner = currentSSTable.getScanner(ranges, DatabaseDescriptor.getCompactionReadDiskAccessMode());
449450
}
450451

451452
@Override
@@ -498,7 +499,7 @@ protected UnfilteredRowIterator computeNext()
498499
return endOfData();
499500
}
500501
SSTableReader currentSSTable = sstableIterator.next();
501-
currentScanner = currentSSTable.getScanner(ranges);
502+
currentScanner = currentSSTable.getScanner(ranges, DatabaseDescriptor.getCompactionReadDiskAccessMode());
502503
}
503504
}
504505

src/java/org/apache/cassandra/index/accord/SSTableIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static SSTableIndex create(IndexDescriptor id) throws IOException
7676
{
7777
Map<IndexComponent, FileHandle> files = new EnumMap<>(IndexComponent.class);
7878
for (IndexComponent c : id.getLiveComponents())
79-
files.put(c, new FileHandle.Builder(id.fileFor(c)).mmapped(true).complete());
79+
files.put(c, new FileHandle.Builder(id.fileFor(c)).mmapped().complete());
8080
List<Segment> segments = RouteIndexFormat.readSegments(files);
8181
files.remove(IndexComponent.SEGMENT).close();
8282
files.remove(IndexComponent.METADATA).close();

src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public FileHandle createPerSSTableFileHandle(IndexComponent indexComponent, Thro
269269
if (logger.isTraceEnabled())
270270
logger.trace(logMessage("Opening file handle for {} ({})"), file, FBUtilities.prettyPrintMemory(file.length()));
271271

272-
return new FileHandle.Builder(file).mmapped(true).complete();
272+
return new FileHandle.Builder(file).mmapped().complete();
273273
}
274274
catch (Throwable t)
275275
{
@@ -291,7 +291,7 @@ public FileHandle createPerIndexFileHandle(IndexComponent indexComponent, IndexI
291291
if (logger.isTraceEnabled())
292292
logger.trace(logMessage("Opening file handle for {} ({})"), file, FBUtilities.prettyPrintMemory(file.length()));
293293

294-
return new FileHandle.Builder(file).mmapped(true).complete();
294+
return new FileHandle.Builder(file).mmapped().complete();
295295
}
296296
catch (Throwable t)
297297
{

0 commit comments

Comments
 (0)