From ca8ef09c28d8d2ac183740acefa0c52952ec58f6 Mon Sep 17 00:00:00 2001 From: samlightfoot Date: Tue, 19 May 2026 18:22:50 +0100 Subject: [PATCH] CASSANDRA-21134: Direct I/O for background SSTable writes Adds an opt-in O_DIRECT write path for background SSTable producers, bypassing the OS page cache for data that is unlikely to be re-read soon after being written. Memtable flushes remain buffered. Enabled via two new YAML knobs: - background_write_disk_access_mode: standard (default) | direct - direct_write_buffer_size: 1MiB (default; aligned up to FS block size, auto-grown to chunk_length) The path is gated by config, table compression being enabled, and an OperationType allowlist in DataComponent. The allowlist is exhaustive: any new OperationType with writesData=true that is not classified will fail static initialization. Operations on the DIO path: COMPACTION, MAJOR_COMPACTION, TOMBSTONE_COMPACTION, ANTICOMPACTION, GARBAGE_COLLECT, CLEANUP, UPGRADE_SSTABLES, WRITE, STREAM (chunked receiver only). Operations off the DIO path: - FLUSH (policy: just-flushed data is hot, keep in page cache) - SCRUB (correctness: tryAppend needs mark/resetAndTruncate) - Zero-Copy Streaming (bypasses DataComponent.buildWriter) - Uncompressed writers (only CompressedSequentialWriter has a DIO subclass in this change) StartupChecks fails fast if 'direct' is requested on a platform/FS that does not support O_DIRECT. 
patch by Sam Lightfoot; reviewed by for CASSANDRA-21134 --- conf/cassandra.yaml | 15 + .../org/apache/cassandra/config/Config.java | 12 + .../cassandra/config/DatabaseDescriptor.java | 79 ++ .../apache/cassandra/io/DirectIoSupport.java | 49 ++ .../compress/CompressedSequentialWriter.java | 101 ++- .../DirectCompressedSequentialWriter.java | 335 ++++++++ .../io/sstable/format/DataComponent.java | 92 +- .../cassandra/io/util/SequentialWriter.java | 34 +- .../cassandra/service/StartupChecks.java | 24 +- .../test/StreamingDirectWriteTest.java | 74 ++ .../config/DatabaseDescriptorTest.java | 29 + .../db/compaction/AntiCompactionTest.java | 83 +- .../db/compaction/CompactionsTest.java | 105 ++- .../DirectCompressedSequentialWriterTest.java | 813 ++++++++++++++++++ .../sstable/CQLSSTableWriterDaemonTest.java | 34 + .../io/sstable/CQLSSTableWriterTest.java | 50 ++ ...DataComponentDirectWriteSelectionTest.java | 192 +++++ .../cassandra/io/util/DirectIoTestUtils.java | 68 ++ 18 files changed, 2117 insertions(+), 72 deletions(-) create mode 100644 src/java/org/apache/cassandra/io/DirectIoSupport.java create mode 100644 src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/StreamingDirectWriteTest.java create mode 100644 test/unit/org/apache/cassandra/io/compress/DirectCompressedSequentialWriterTest.java create mode 100644 test/unit/org/apache/cassandra/io/sstable/format/DataComponentDirectWriteSelectionTest.java create mode 100644 test/unit/org/apache/cassandra/io/util/DirectIoTestUtils.java diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index af65d54b4bb4..01613e56eb02 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -693,6 +693,21 @@ commitlog_disk_access_mode: legacy # - direct: use direct I/O for compaction reads, bypassing the OS page cache # compaction_read_disk_access_mode: auto +# Set the disk access mode for writing compressed SSTables during 
background operations +# (compaction, streaming, cleanup, repair, etc.). The allowed values are: +# - standard: use buffered I/O (default) +# - direct: use direct I/O, bypassing the OS page cache +# Note: Only applies to compressed tables. Uncompressed tables always use buffered I/O. +# Note: Memtable flushes and scrub always use buffered I/O regardless of this setting: flushed +# data benefits from page cache for subsequent reads, and scrub must be able to rewind writes. +# background_write_disk_access_mode: standard + +# Size of the in-memory staging buffer for Direct IO background writes. Trades off syscall +# frequency against per-flush blocking latency on the compaction thread. +# Aligned up to filesystem block size; auto-expands to fit a single compressed chunk + CRC +# + one block when chunk_length exceeds this value. +# direct_write_buffer_size: 1MiB + # Compression to apply to SSTables as they flush for compressed tables. # Note that tables without compression enabled do not respect this flag. # diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 3a485fddc5b2..dac8416da9e1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -362,6 +362,18 @@ public MemtableOptions() public DataStorageSpec.IntKibibytesBound compressed_read_ahead_buffer_size = new DataStorageSpec.IntKibibytesBound("256KiB"); + // Direct IO for background SSTable writes (compaction, streaming, cleanup, etc.) + // When 'direct' is set, background writes bypass the OS page cache using O_DIRECT. + // Memtable flushes and scrub always use buffered I/O regardless of this setting. + // Default is 'standard' (buffered I/O) - users must opt-in to Direct IO + public DiskAccessMode background_write_disk_access_mode = DiskAccessMode.standard; + + // Size of the in-memory staging buffer for Direct IO background writes. Trades off syscall + // frequency against per-flush blocking latency on the compaction thread. 
+ // Aligned up to filesystem block size; auto-expands to fit a single compressed chunk + CRC + // + one block when chunk_length exceeds this value. + public DataStorageSpec.IntKibibytesBound direct_write_buffer_size = new DataStorageSpec.IntKibibytesBound("1MiB"); + // fraction of free disk space available for compaction after min free space is subtracted public volatile Double max_space_usable_for_compactions_in_percentage = .95; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 9ee09db70709..edf664986168 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -224,6 +224,8 @@ public class DatabaseDescriptor private static DiskAccessMode compactionReadDiskAccessMode; + private static DiskAccessMode backgroundWriteDiskAccessMode; + private static AbstractCryptoProvider cryptoProvider; private static IAuthenticator authenticator; private static IAuthorizer authorizer; @@ -897,6 +899,10 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m if (conf.hints_directory.equals(conf.saved_caches_directory)) throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false); + initializeBackgroundWriteDiskAccessMode(); + if (backgroundWriteDiskAccessMode != conf.background_write_disk_access_mode) + logger.info("background_write_disk_access_mode resolved to: {}", backgroundWriteDiskAccessMode); + if (conf.memtable_flush_writers == 0) { conf.memtable_flush_writers = conf.data_file_directories.length == 1 ? 
2 : 1; @@ -3406,6 +3412,79 @@ public static void initializeCommitLogDiskAccessMode() commitLogWriteDiskAccessMode = accessModeDirectIoPair.left; } + public static DiskAccessMode getBackgroundWriteDiskAccessMode() + { + return backgroundWriteDiskAccessMode; + } + + @VisibleForTesting + public static void setBackgroundWriteDiskAccessMode(DiskAccessMode diskAccessMode) + { + backgroundWriteDiskAccessMode = diskAccessMode; + conf.background_write_disk_access_mode = diskAccessMode; + } + + public static DataStorageSpec.IntKibibytesBound getDirectWriteBufferSize() + { + return conf.direct_write_buffer_size; + } + + @VisibleForTesting + public static void initializeBackgroundWriteDiskAccessMode() + { + DiskAccessMode providedMode = conf.background_write_disk_access_mode; + + if (providedMode == DiskAccessMode.auto) + { + providedMode = DiskAccessMode.standard; + } + + if (providedMode == DiskAccessMode.direct) + { + // DataStorageSpec already rejects negatives at parse time; zero is the remaining + // nonsense value. The writer's Math.max would silently coerce it to minRequiredSize, + // which masks a likely operator mistake — fail fast instead. + if (conf.direct_write_buffer_size.toBytes() <= 0) + throw new ConfigurationException("direct_write_buffer_size must be > 0 when background_write_disk_access_mode is 'direct'. 
" + + "Got: " + conf.direct_write_buffer_size, false); + + if (!toolInitialized) + { + List unsupportedLocations = new ArrayList<>(); + + for (String dataDir : conf.data_file_directories) + { + try + { + File dataDirFile = new File(dataDir); + PathUtils.createDirectoriesIfNotExists(dataDirFile.toPath()); + + if (!FileUtils.isDirectIOSupported(dataDirFile)) + { + unsupportedLocations.add(dataDir); + } + } + catch (RuntimeException e) + { + logger.warn("Unable to determine Direct IO support for data directory {}: {}", dataDir, e.getMessage()); + unsupportedLocations.add(dataDir + " (check failed: " + e.getMessage() + ")"); + } + } + + if (!unsupportedLocations.isEmpty()) + { + throw new ConfigurationException( + String.format("background_write_disk_access_mode is set to 'direct', but the following data directories " + + "do not support Direct I/O: %s. Either change background_write_disk_access_mode to 'standard' " + + "in cassandra.yaml, or ensure all data directories are on filesystems that support Direct I/O.", + unsupportedLocations), false); + } + } + } + + backgroundWriteDiskAccessMode = providedMode; + } + public static String getSavedCachesLocation() { return conf.saved_caches_directory; diff --git a/src/java/org/apache/cassandra/io/DirectIoSupport.java b/src/java/org/apache/cassandra/io/DirectIoSupport.java new file mode 100644 index 000000000000..3ece854ed590 --- /dev/null +++ b/src/java/org/apache/cassandra/io/DirectIoSupport.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io; + +/** + * Classifies an operation's eligibility for a direct-IO (O_DIRECT) data path, encoding both + * the answer and the rationale class. Consumers maintain their own per-operation classification + * and apply this alongside their own gates (e.g. compression, configuration mode); + * {@link #SUPPORTED} is necessary but not sufficient. + */ +public enum DirectIoSupport +{ + /** + * Eligible for the direct-IO data path. + * */ + SUPPORTED, + + /** + * The direct-IO path is mechanically incompatible with this operation. Removing this + * exclusion requires code changes, not policy. + */ + UNSUPPORTED_CORRECTNESS, + + /** + * Direct IO would work but is deliberately disabled for performance or cache-residency + * reasons. Removing this exclusion requires re-evaluating the policy, not code changes. 
+ */ + UNSUPPORTED_POLICY; + + public boolean isSupported() + { + return this == SUPPORTED; + } +} diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 484b50c5e555..6da3d5984242 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.file.OpenOption; import java.util.Optional; import java.util.zip.CRC32; @@ -50,32 +51,47 @@ public class CompressedSequentialWriter extends SequentialWriter // holds offset in the file where current chunk should be written // changed only by flush() method where data buffer gets compressed and stored to the file - private long chunkOffset = 0; + protected long chunkOffset = 0; // index file writer (random I/O) - private final CompressionMetadata.Writer metadataWriter; + protected final CompressionMetadata.Writer metadataWriter; private final ICompressor compressor; // used to store compressed data private ByteBuffer compressed; // holds a number of already written chunks - private int chunkCount = 0; + protected int chunkCount = 0; - private long uncompressedSize = 0, compressedSize = 0; + protected long uncompressedSize = 0; + protected long compressedSize = 0; - private final MetadataCollector sstableMetadataCollector; + protected final MetadataCollector sstableMetadataCollector; private final CompressionDictionaryManager compressionDictionaryManager; private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4); - private final Optional<File> digestFile; + protected final Optional<File> digestFile; private final int maxCompressedLength; private final boolean isDictionaryEnabled; + private static ByteBuffer allocateBuffer(CompressionParams parameters) + { + return 
parameters.getSstableCompressor().preferredBufferType().allocate(parameters.chunkLength()); + } + + private static SequentialWriterOption buildOption(SequentialWriterOption option, CompressionParams parameters) + { + return SequentialWriterOption.newBuilder() + .bufferSize(parameters.chunkLength()) + .bufferType(parameters.getSstableCompressor().preferredBufferType()) + .finishOnClose(option.finishOnClose()) + .build(); + } + public CompressedSequentialWriter(File file, File offsetsFile, - File digestFile, + @Nullable File digestFile, SequentialWriterOption option, CompressionParams parameters, MetadataCollector sstableMetadataCollector) @@ -83,33 +99,28 @@ public CompressedSequentialWriter(File file, this(file, offsetsFile, digestFile, option, parameters, sstableMetadataCollector, null); } - /** - * Create CompressedSequentialWriter without digest file. + * Create CompressedSequentialWriter with optional compression dictionary and channel options. * * @param file File to write * @param offsetsFile File to write compression metadata - * @param digestFile File to write digest + * @param digestFile File to write digest, or null if not needed * @param option Write option (buffer size and type will be set the same as compression params) * @param parameters Compression parameters * @param sstableMetadataCollector Metadata collector * @param compressionDictionaryManager manages compression dictionary; null if absent + * @param extraOpenOptions additional options to pass to FileChannel.open (e.g., ExtendedOpenOption.DIRECT) */ public CompressedSequentialWriter(File file, File offsetsFile, - File digestFile, + @Nullable File digestFile, SequentialWriterOption option, CompressionParams parameters, MetadataCollector sstableMetadataCollector, - @Nullable CompressionDictionaryManager compressionDictionaryManager) + @Nullable CompressionDictionaryManager compressionDictionaryManager, + OpenOption... 
extraOpenOptions) { - super(file, SequentialWriterOption.newBuilder() - .bufferSize(option.bufferSize()) - .bufferType(option.bufferType()) - .bufferSize(parameters.chunkLength()) - .bufferType(parameters.getSstableCompressor().preferredBufferType()) - .finishOnClose(option.finishOnClose()) - .build()); + super(file, allocateBuffer(parameters), buildOption(option, parameters), true, extraOpenOptions); ICompressor compressor = parameters.getSstableCompressor(); this.digestFile = Optional.ofNullable(digestFile); @@ -142,7 +153,7 @@ public CompressedSequentialWriter(File file, metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsFile, compressionDictionary); this.sstableMetadataCollector = sstableMetadataCollector; - crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel))); + crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(this.channel))); } @Override @@ -178,7 +189,9 @@ public void flush() @Override protected void flushData() { - seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation + // resetAndTruncate leaves fchannel.position() past EOF after its verification reads + truncate; + // re-seek so the next chunk lands at chunkOffset. No-op under linear writes. 
+ seekToChunkStart(); try { @@ -216,32 +229,36 @@ protected void flushData() } compressedSize += compressedLength; + // write an offset of the newly written chunk to the index file + metadataWriter.addOffset(chunkOffset); + chunkCount++; + + // write out the compressed data and checksum + toWrite.flip(); + writeChunk(toWrite); + lastFlushOffset = uncompressedSize; + + if (toWrite == buffer) + buffer.position(uncompressedLength); + + // next chunk should be written right after current + length of the checksum (int) + chunkOffset += compressedLength + 4; + if (runPostFlush != null) + runPostFlush.accept(getLastFlushOffset()); + } + + protected void writeChunk(ByteBuffer toWrite) + { try { - // write an offset of the newly written chunk to the index file - metadataWriter.addOffset(chunkOffset); - chunkCount++; - - // write out the compressed data - toWrite.flip(); channel.write(toWrite); - - // write corresponding checksum toWrite.rewind(); crcMetadata.appendDirect(toWrite, true); - lastFlushOffset = uncompressedSize; } catch (IOException e) { throw new FSWriteError(e, getPath()); } - if (toWrite == buffer) - buffer.position(uncompressedLength); - - // next chunk should be written right after current + length of the checksum (int) - chunkOffset += compressedLength + 4; - if (runPostFlush != null) - runPostFlush.accept(getLastFlushOffset()); } public CompressionMetadata open(long overrideLength) @@ -358,10 +375,16 @@ private void truncate(long toFileSize, long toBufferOffset) } } + protected void writeDigestFile() + { + digestFile.ifPresent(crcMetadata::writeFullChecksum); + } + /** * Seek to the offset where next compressed data chunk should be stored. + * Subclasses may override if they manage their own channel. 
*/ - private void seekToChunkStart() + protected void seekToChunkStart() { if (getOnDiskFilePointer() != chunkOffset) { @@ -429,7 +452,7 @@ protected Throwable doAbort(Throwable accumulate) protected void doPrepare() { syncInternal(); - digestFile.ifPresent(crcMetadata::writeFullChecksum); + writeDigestFile(); sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize); metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit(); } diff --git a/src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java new file mode 100644 index 000000000000..05254a10f682 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.CRC32; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.sun.nio.file.ExtendedOpenOption; + +import org.agrona.BitUtil; +import org.agrona.BufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compression.CompressionDictionaryManager; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileOutputStreamPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.memory.MemoryUtil; + +import sun.nio.ch.DirectBuffer; + +import static org.apache.cassandra.utils.Throwables.merge; + +/** + * Uses O_DIRECT to bypass the OS page cache, reducing memory pressure during compaction. + *
<p>
+ * O_DIRECT requires all writes to be block-aligned, so compressed chunks are accumulated in an aligned buffer. + * Only complete blocks are flushed; at close, remaining data is padded, written, and the file is truncated to + * actual size. + */ +public class DirectCompressedSequentialWriter extends CompressedSequentialWriter +{ + private static final Logger logger = LoggerFactory.getLogger(DirectCompressedSequentialWriter.class); + + // Fires the "configured buffer below minimum required, was coerced" warning at most once + // per JVM so a misconfiguration is operator-visible without per-SSTable spam. + private static final AtomicBoolean undersizedBufferWarned = new AtomicBoolean(false); + + private static final int CRC_LENGTH = Integer.BYTES; + + // Sized to hold at least one full chunk + CRC + post-flush leftover: + // capacity >= maxChunkWrite + CRC_LENGTH + blockSize + // writeToAlignedBuffer puts a chunk in a single ByteBuffer.put, so the chunk must + // fit contiguously. flushCompleteBlocks aligns down to blockSize, leaving up to + // (blockSize - 1) bytes carried over via compact(); the floor accounts for that. + private ByteBuffer writeBuffer; + private int writeBufferPosition = 0; + private long actualDataSize = 0; + + private final int blockSize; + // ChecksumWriter writes CRCs directly to the channel, bypassing writeBuffer; track checksums ourselves. 
+ private final CRC32 fullFileChecksum = new CRC32(); + private final CRC32 chunkChecksum = new CRC32(); + private final ByteBuffer crcBuffer = ByteBuffer.allocate(CRC_LENGTH); + + public DirectCompressedSequentialWriter(File file, + File offsetsFile, + @Nullable File digestFile, + SequentialWriterOption option, + CompressionParams parameters, + MetadataCollector sstableMetadataCollector, + @Nullable CompressionDictionaryManager compressionDictionaryManager) + { + super(file, offsetsFile, digestFile, option, parameters, sstableMetadataCollector, compressionDictionaryManager, ExtendedOpenOption.DIRECT); + + // super() opened the O_DIRECT FileChannel and allocated parent buffers; if anything below throws + // the caller never gets a reference to clean them up, so abort the txn proxy ourselves. + try + { + this.blockSize = FileUtils.getBlockSize(file.parent()); + if (blockSize <= 0) + throw new IllegalStateException("Unable to determine filesystem block size for Direct IO. " + + "Block size: " + blockSize); + + if ((blockSize & (blockSize - 1)) != 0) + throw new IllegalStateException("Filesystem block size must be a power of two for Direct IO. " + + "Block size: " + blockSize); + + int configuredSize = DatabaseDescriptor.getDirectWriteBufferSize().toBytes(); + int maxChunkWrite = parameters.getSstableCompressor().initialCompressedBufferLength(parameters.chunkLength()); + int minRequiredSize = maxChunkWrite + CRC_LENGTH + blockSize; + if (configuredSize < minRequiredSize && undersizedBufferWarned.compareAndSet(false, true)) + logger.warn("direct_write_buffer_size ({} bytes) is below the minimum required for SSTable {} " + + "(worst-case chunk {} + CRC 4 + blockSize {} = {} bytes); using the minimum. 
" + + "Increase direct_write_buffer_size in cassandra.yaml to silence this warning.", + configuredSize, file, maxChunkWrite, blockSize, minRequiredSize); + int bufferSize = BitUtil.align(Math.max(configuredSize, minRequiredSize), blockSize); + + this.writeBuffer = BufferUtil.allocateDirectAligned(bufferSize, blockSize); + } + catch (Throwable t) + { + Throwable merged = t; + try { merged = abort(t); } + catch (Throwable t2) { t.addSuppressed(t2); } + Throwables.maybeFail(merged); + // Unreachable: maybeFail(non-null) always throws. Present for definite-assignment of `blockSize`. + throw new AssertionError("Throwables.maybeFail should have thrown", merged); + } + } + + // Parent reads fchannel.position(), which lags by writeBuffer contents under O_DIRECT. + // getEstimatedOnDiskBytesWritten is intentionally NOT overridden: parent returns chunkOffset, + // which already represents the eventual on-disk size — correct under DIO. + @Override + public long getOnDiskFilePointer() + { + return actualDataSize; + } + + @Override + protected void seekToChunkStart() + { + // No-op: bytes staged in writeBuffer would be skipped by a seek, leaving a hole. + // resetAndTruncate (the parent's reason for this seek) is unsupported under DIO. 
+ } + + @Override + protected void writeChunk(ByteBuffer toWrite) + { + Preconditions.checkArgument(toWrite.position() == 0, + "writeChunk requires a flipped buffer (position == 0), got position=%s", + toWrite.position()); + + int chunkLength = toWrite.remaining(); + + chunkChecksum.reset(); + chunkChecksum.update(toWrite); + int crcValue = (int) chunkChecksum.getValue(); + toWrite.rewind(); + + writeToAlignedBuffer(toWrite); + writeCrcToAlignedBuffer(crcValue); + + toWrite.rewind(); + updateFullChecksum(toWrite, crcValue); + + actualDataSize = chunkOffset + chunkLength + CRC_LENGTH; + } + + private void writeToAlignedBuffer(ByteBuffer data) + { + int dataLength = data.remaining(); + + // Buffer is sized for worst-case chunk + CRC + blockSize, so a flush always frees enough room. + if (writeBufferPosition + dataLength > writeBuffer.capacity()) + flushCompleteBlocks(); + + writeBuffer.position(writeBufferPosition); + writeBuffer.put(data); + writeBufferPosition = writeBuffer.position(); + } + + private void writeCrcToAlignedBuffer(int crcValue) + { + // After flush, leftover < blockSize, so there's always room for the CRC trailer. 
+ if (writeBufferPosition + CRC_LENGTH > writeBuffer.capacity()) + flushCompleteBlocks(); + + writeBuffer.position(writeBufferPosition); + writeBuffer.putInt(crcValue); + writeBufferPosition = writeBuffer.position(); + } + + private void flushCompleteBlocks() + { + // Align down: O_DIRECT cannot write partial blocks + int flushLimit = writeBufferPosition & -blockSize; + + if (flushLimit == 0) + return; + + try + { + writeBuffer.position(0); + writeBuffer.limit(flushLimit); + fchannel.write(writeBuffer); + + int leftover = writeBufferPosition - flushLimit; + if (leftover > 0) + { + writeBuffer.limit(writeBufferPosition); + writeBuffer.position(flushLimit); + writeBuffer.compact(); + } + else + { + writeBuffer.clear(); + } + writeBufferPosition = leftover; + } + catch (IOException e) + { + throw new FSWriteError(e, getPath()); + } + } + + private void flushFinalWithPadding() + { + if (writeBufferPosition == 0) + return; + + try + { + int flushLimit = BitUtil.align(writeBufferPosition, blockSize); + + writeBuffer.position(writeBufferPosition); + ByteBufferUtil.writeZeroes(writeBuffer, flushLimit - writeBufferPosition); + + writeBuffer.position(0); + writeBuffer.limit(flushLimit); + fchannel.write(writeBuffer); + + // O_DIRECT required padding; truncate back to actual data size. + fchannel.truncate(actualDataSize); + } + catch (IOException e) + { + throw new FSWriteError(e, getPath()); + } + } + + private void updateFullChecksum(ByteBuffer data, int crcValue) + { + fullFileChecksum.update(data); + + // Include CRC bytes in the full-file checksum to match ChecksumWriter.appendDirect(..., true). 
+ crcBuffer.clear(); + crcBuffer.putInt(crcValue); + crcBuffer.flip(); + fullFileChecksum.update(crcBuffer); + } + + @Override + protected void writeDigestFile() + { + digestFile.ifPresent(file -> { + try (FileOutputStreamPlus fos = new FileOutputStreamPlus(file)) + { + fos.write(String.valueOf(fullFileChecksum.getValue()).getBytes(StandardCharsets.UTF_8)); + fos.flush(); + fos.getChannel().force(true); + } + catch (IOException e) + { + throw new FSWriteError(e, file); + } + }); + } + + // Gated out for SCRUB in DataComponent.buildWriter; these throws are a canary if the gate is bypassed. + @Override + public DataPosition mark() + { + throw new UnsupportedOperationException("mark() not supported under O_DIRECT"); + } + + @Override + public synchronized void resetAndTruncate(DataPosition mark) + { + throw new UnsupportedOperationException("resetAndTruncate() not supported under O_DIRECT"); + } + + protected class DirectTransactionalProxy extends CompressedSequentialWriter.TransactionalProxy + { + @Override + protected void doPrepare() + { + doFlush(0); + // doFlush leaves a partial-block tail in writeBuffer; pad to a block, write, then truncate. + flushFinalWithPadding(); + syncDataOnlyInternal(); + writeDigestFile(); + sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize); + metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit(); + } + + @Override + protected Throwable doPreCleanup(Throwable accumulate) + { + if (writeBuffer != null) + { + try + { + // allocateDirectAligned returns a slice; clean the backing buffer via the attachment. 
+ MemoryUtil.clean((ByteBuffer) ((DirectBuffer) writeBuffer).attachment()); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + writeBuffer = null; + } + + return super.doPreCleanup(accumulate); + } + } + + @Override + protected SequentialWriter.TransactionalProxy txnProxy() + { + return new DirectTransactionalProxy(); + } +} diff --git a/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java b/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java index 69528628d348..b31346c2cc0e 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java +++ b/src/java/org/apache/cassandra/io/sstable/format/DataComponent.java @@ -18,10 +18,21 @@ package org.apache.cassandra.io.sstable.format; +import java.util.EnumMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.config.Config.FlushCompression; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compression.CompressionDictionaryManager; +import org.apache.cassandra.io.DirectIoSupport; import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.compress.DirectCompressedSequentialWriter; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; @@ -32,8 +43,45 @@ import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.TableMetadata; +import static org.apache.cassandra.io.DirectIoSupport.SUPPORTED; +import static org.apache.cassandra.io.DirectIoSupport.UNSUPPORTED_CORRECTNESS; +import static org.apache.cassandra.io.DirectIoSupport.UNSUPPORTED_POLICY; + public class DataComponent { + private static final Logger logger = 
LoggerFactory.getLogger(DataComponent.class); + + private static final EnumMap<OperationType, DirectIoSupport> DIRECT_WRITE_SUPPORT = buildDirectWriteSupport(); + + private static final Set<OperationType> directWriteLogged = ConcurrentHashMap.newKeySet(); + + private static EnumMap<OperationType, DirectIoSupport> buildDirectWriteSupport() + { + EnumMap<OperationType, DirectIoSupport> m = new EnumMap<>(OperationType.class); + + // append()-only writers; safe under O_DIRECT. + m.put(OperationType.CLEANUP, SUPPORTED); + m.put(OperationType.UPGRADE_SSTABLES, SUPPORTED); + m.put(OperationType.MAJOR_COMPACTION, SUPPORTED); + m.put(OperationType.GARBAGE_COLLECT, SUPPORTED); + m.put(OperationType.WRITE, SUPPORTED); + m.put(OperationType.ANTICOMPACTION, SUPPORTED); + m.put(OperationType.COMPACTION, SUPPORTED); + m.put(OperationType.TOMBSTONE_COMPACTION, SUPPORTED); + m.put(OperationType.STREAM, SUPPORTED); + + // tryAppend() needs mark()/resetAndTruncate(), unsupported under O_DIRECT. + m.put(OperationType.SCRUB, UNSUPPORTED_CORRECTNESS); + + // Flushed data is hot; keep it in the page cache. + m.put(OperationType.FLUSH, UNSUPPORTED_POLICY); + + for (OperationType op : OperationType.values()) + if (op.writesData && !m.containsKey(op)) + throw new AssertionError("Missing direct-write classification for " + op); + return m; + } + public static SequentialWriter buildWriter(Descriptor descriptor, TableMetadata metadata, SequentialWriterOption options, @@ -44,15 +92,31 @@ public static SequentialWriter buildWriter(Descriptor descriptor, { if (metadata.params.compression.isEnabled()) { - final CompressionParams compressionParams = buildCompressionParams(metadata, operationType, flushCompression); - - return new CompressedSequentialWriter(descriptor.fileFor(Components.DATA), - descriptor.fileFor(Components.COMPRESSION_INFO), - descriptor.fileFor(Components.DIGEST), - options, - compressionParams, - metadataCollector, - compressionDictionaryManager); + CompressionParams compressionParams = buildCompressionParams(metadata, operationType, flushCompression); + + if
(DatabaseDescriptor.getBackgroundWriteDiskAccessMode() == DiskAccessMode.direct + && isDirectWriteSupported(operationType)) + { + if (directWriteLogged.add(operationType)) + logger.info("Direct I/O writer activated for {}", operationType); + return new DirectCompressedSequentialWriter(descriptor.fileFor(Components.DATA), + descriptor.fileFor(Components.COMPRESSION_INFO), + descriptor.fileFor(Components.DIGEST), + options, + compressionParams, + metadataCollector, + compressionDictionaryManager); + } + else + { + return new CompressedSequentialWriter(descriptor.fileFor(Components.DATA), + descriptor.fileFor(Components.COMPRESSION_INFO), + descriptor.fileFor(Components.DIGEST), + options, + compressionParams, + metadataCollector, + compressionDictionaryManager); + } } else { @@ -103,4 +167,14 @@ private static CompressionParams buildCompressionParams(TableMetadata metadata, } return compressionParams; } + + private static boolean isDirectWriteSupported(OperationType operationType) + { + DirectIoSupport support = DIRECT_WRITE_SUPPORT.get(operationType); + if (support == null) + throw new IllegalArgumentException("OperationType " + operationType + + " has no direct-write classification — likely a read-only operation routed through buildWriter()"); + return support.isSupported(); + } + } diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index e76cc234ed45..f68b12e8d53b 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -20,7 +20,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.OpenOption; import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.function.LongConsumer; import org.apache.cassandra.io.FSReadError; @@ -109,25 +113,36 @@ protected 
Throwable doAbort(Throwable accumulate) } // TODO: we should specify as a parameter if we permit an existing file or not - private static FileChannel openChannel(File file) + private static FileChannel openChannel(File file, OpenOption... extraOptions) { try { + Set<OpenOption> options = new LinkedHashSet<>(Arrays.asList(StandardOpenOption.READ, StandardOpenOption.WRITE)); + options.addAll(Arrays.asList(extraOptions)); + if (file.exists()) { - return FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE); + return FileChannel.open(file.toPath(), options.toArray(OpenOption[]::new)); } else { - FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); + options.add(StandardOpenOption.CREATE_NEW); + FileChannel channel = FileChannel.open(file.toPath(), options.toArray(OpenOption[]::new)); try { SyncUtil.trySyncDir(file.parent()); } catch (Throwable t) { - try { channel.close(); } - catch (Throwable t2) { t.addSuppressed(t2); } + try + { + channel.close(); + } + catch (Throwable t2) + { + t.addSuppressed(t2); + } + throw t; } return channel; } @@ -170,15 +185,14 @@ public SequentialWriter(File file, SequentialWriterOption option, boolean strict this(file, option.allocateBuffer(), option, strictFlushing); } - protected SequentialWriter(File file, ByteBuffer buffer, SequentialWriterOption option, boolean strictFlushing) + protected SequentialWriter(File file, ByteBuffer buffer, SequentialWriterOption option, boolean strictFlushing, + OpenOption...
extraOpenOptions) { - super(openChannel(file), buffer); + super(openChannel(file, extraOpenOptions), buffer); this.strictFlushing = strictFlushing; - this.fchannel = (FileChannel)channel; - + this.fchannel = (FileChannel) this.channel; this.file = file; this.filePath = file.absolutePath(); - this.option = option; } diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index 31a2686af7b3..82f351c3e0b3 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -259,8 +259,12 @@ public void execute(StartupChecksConfiguration configuration) throws StartupExce Set directIOWritePaths = new HashSet<>(); if (DatabaseDescriptor.getCommitLogWriteDiskAccessMode() == Config.DiskAccessMode.direct) directIOWritePaths.add(new File(DatabaseDescriptor.getCommitLogLocation()).toPath()); - // Note: Data directories for direct IO compaction reads are checked in checkDirectIOSupport. - // This check is specifically for direct IO writes which are currently only supported for commit log. 
+ + if (DatabaseDescriptor.getBackgroundWriteDiskAccessMode() == Config.DiskAccessMode.direct) + { + for (String dataDir : DatabaseDescriptor.getAllDataFileLocations()) + directIOWritePaths.add(new File(dataDir).toPath()); + } if (!directIOWritePaths.isEmpty() && IGNORE_KERNEL_BUG_1057843_CHECK.getBoolean()) { @@ -734,21 +738,27 @@ public void execute(StartupChecksConfiguration configuration) throws StartupExce if (configuration.isDisabled(name())) return; - // Only check if compaction_read_disk_access_mode is direct - if (DatabaseDescriptor.getCompactionReadDiskAccessMode() != Config.DiskAccessMode.direct) + boolean directReads = DatabaseDescriptor.getCompactionReadDiskAccessMode() == Config.DiskAccessMode.direct; + boolean directWrites = DatabaseDescriptor.getBackgroundWriteDiskAccessMode() == Config.DiskAccessMode.direct; + + if (!directReads && !directWrites) return; List unsupportedLocations = findDirectIOUnsupportedLocations(DatabaseDescriptor.getAllDataFileLocations()); if (!unsupportedLocations.isEmpty()) { + String configuredModes = directReads && directWrites + ? "compaction reads and background writes" + : directReads ? "compaction reads" : "background writes"; + throw new StartupException(StartupException.ERR_WRONG_DISK_STATE, - String.format("Direct I/O is configured for compaction reads (compaction_read_disk_access_mode=direct), " + + String.format("Direct I/O is configured for %s, " + "but the following data directories do not support Direct I/O: %s. " + - "Either change compaction_read_disk_access_mode to 'standard' in cassandra.yaml, " + + "Either change the disk access mode to 'standard' in cassandra.yaml, " + "or ensure all data directories are on filesystems that support Direct I/O. 
" + "Network filesystems (NFS, CIFS) and some virtual filesystems do not support Direct I/O.", - unsupportedLocations)); + configuredModes, unsupportedLocations)); } } }; diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamingDirectWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamingDirectWriteTest.java new file mode 100644 index 000000000000..ea720470fe9d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingDirectWriteTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; + +import static org.assertj.core.api.Assertions.assertThat; + +public class StreamingDirectWriteTest extends TestBaseImpl +{ + @Test + public void testChunkedStreamReceiverWithDirectWrites() throws IOException + { + // ZCS streaming bypasses DataComponent.buildWriter; disable to exercise the chunked receiver path. 
+ try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP) + .set("stream_entire_sstables", false) + .set("background_write_disk_access_mode", "direct")) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k text PRIMARY KEY, v blob) WITH compression = {'enabled':'true', 'class':'LZ4Compressor'};")); + cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()); + + IInvokableInstance node1 = cluster.get(1); + IInvokableInstance node2 = cluster.get(2); + + // ~800 KiB of incompressible data spread across many flushes — multiple sstables to stream chunked. + int rowCount = 100; + int valBytes = 8 * 1024; + byte[] val = new byte[valBytes]; + new Random(42).nextBytes(val); + ByteBuffer blob = ByteBuffer.wrap(val); + for (int i = 0; i < rowCount; i++) + { + node1.executeInternal(withKeyspace("INSERT INTO %s.t (k, v) VALUES (?, ?)"), "k" + i, blob); + if ((i % 20) == 19) + node1.flush(KEYSPACE); + } + node1.flush(KEYSPACE); + + node2.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success(); + + Object[][] rows = node2.executeInternal(withKeyspace("SELECT k FROM %s.t")); + assertThat(rows.length).isEqualTo(rowCount); + + assertThat(node2.logs().grep("Direct I/O writer activated for Stream").getResult()) + .describedAs("expected DIO writer to engage for STREAM op on receiver") + .isNotEmpty(); + } + } +} diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java index f93a0d00269b..d8582ac0180c 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java @@ -1074,4 +1074,33 @@ public void testMaxValueSize() Config config = DatabaseDescriptor.loadConfig(); Assert.assertEquals(config.max_value_size.toMebibytes() * 1024 * 1024, DatabaseDescriptor.getMaxValueSize()); } + + @Test 
+ public void testInitializeBackgroundWriteDiskAccessModeRejectsZeroBufferSize() + { + Config conf = DatabaseDescriptor.getRawConfig(); + Config.DiskAccessMode savedMode = conf.background_write_disk_access_mode; + DataStorageSpec.IntKibibytesBound savedBufferSize = conf.direct_write_buffer_size; + try + { + conf.background_write_disk_access_mode = Config.DiskAccessMode.direct; + conf.direct_write_buffer_size = new DataStorageSpec.IntKibibytesBound("0KiB"); + + try + { + DatabaseDescriptor.initializeBackgroundWriteDiskAccessMode(); + fail("expected ConfigurationException for direct_write_buffer_size == 0"); + } + catch (ConfigurationException expected) + { + Assert.assertTrue("expected message to mention direct_write_buffer_size, got: " + expected.getMessage(), + expected.getMessage().contains("direct_write_buffer_size")); + } + } + finally + { + conf.background_write_disk_access_mode = savedMode; + conf.direct_write_buffer_size = savedBufferSize; + } + } } diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index cb55129909a4..fe07f51b62b6 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -18,12 +18,15 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -55,12 +58,15 @@ import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableTxnWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; import 
org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DirectIoTestUtils; import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.repair.NoSuchRepairSessionException; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.schema.Schema; @@ -78,6 +84,7 @@ import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -89,6 +96,7 @@ public class AntiCompactionTest { private static final String KEYSPACE1 = "AntiCompactionTest"; private static final String CF = "AntiCompactionTest"; + private static final String CF_COMPRESSED = "AntiCompactionCompressed"; private static final Collection> NO_RANGES = Collections.emptyList(); private static TableMetadata metadata; @@ -101,7 +109,10 @@ public static void defineSchema() throws Throwable { SchemaLoader.prepareServer(); metadata = SchemaLoader.standardCFMD(KEYSPACE1, CF).build(); - SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata); + TableMetadata compressedMetadata = SchemaLoader.standardCFMD(KEYSPACE1, CF_COMPRESSED) + .compression(CompressionParams.lz4()) + .build(); + SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata, compressedMetadata); cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id); local = InetAddressAndPort.getByName("127.0.0.1"); } @@ -233,6 +244,76 @@ public void antiCompactOneMixed() throws Exception 
assertOnDiskState(store, 3); } + @Test + public void testAntiCompactionWithCompressedTableAndDirectWrites() throws Throwable + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_COMPRESSED); + try + { + store.disableAutoCompaction(); + assertTrue("CF must be compressed for DIO to engage", + store.metadata().params.compression.isEnabled()); + + long ts = 1L; + populateDeterministic(store, ts); + SSTableStats baselineStats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES)); + assertEquals(2, baselineStats.numLiveSSTables); + assertEquals(4, baselineStats.pendingKeys); + assertEquals(6, baselineStats.unrepairedKeys); + Map<Boolean, byte[]> baselineBytes = captureDataBytesByPendingRepair(store); + store.truncateBlocking(); + + populateDeterministic(store, ts); + SSTableStats directStats = DirectIoTestUtils.withDirectWrites(() -> + antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES))); + assertEquals(2, directStats.numLiveSSTables); + assertEquals(4, directStats.pendingKeys); + assertEquals(6, directStats.unrepairedKeys); + Map<Boolean, byte[]> directBytes = captureDataBytesByPendingRepair(store); + + assertArrayEquals("pending-repair data file must match standard-writer output", + baselineBytes.get(true), directBytes.get(true)); + assertArrayEquals("unrepaired data file must match standard-writer output", + baselineBytes.get(false), directBytes.get(false)); + + Set<OperationType> activated = DirectIoTestUtils.activatedDirectWriteOperations(); + assertTrue("DirectCompressedSequentialWriter must have engaged for ANTICOMPACTION; activated=" + activated, + activated.contains(OperationType.ANTICOMPACTION)); + } + finally + { + store.truncateBlocking(); + } + } + + private void populateDeterministic(ColumnFamilyStore store, long ts) + { + TableMetadata md = store.metadata(); + for (int i = 0; i < 10; i++) + { + new RowUpdateBuilder(md, ts, Integer.toString(i)) + .clustering("c") + .add("val", "val") + .build() + .applyUnsafe(); + } +
Util.flush(store); + } + + private Map<Boolean, byte[]> captureDataBytesByPendingRepair(ColumnFamilyStore store) throws IOException + { + Map<Boolean, byte[]> bytes = new HashMap<>(); + for (SSTableReader sstable : store.getLiveSSTables()) + { + byte[] prev = bytes.put(sstable.isPendingRepair(), + Files.readAllBytes(sstable.descriptor.fileFor(Components.DATA).toPath())); + assertTrue("expected one sstable per pending-repair status; duplicate at " + sstable.isPendingRepair(), + prev == null); + } + return bytes; + } + @Test public void antiCompactOneTransOnly() throws Exception { diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 3c536fc677ce..36ab11a02bc6 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -18,13 +18,17 @@ */ package org.apache.cassandra.db.compaction; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -53,10 +57,16 @@ import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.Slice; import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import
org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.ValueAccessors; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.ImmutableBTreePartition; @@ -77,6 +87,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.SchemaTestUtil; import org.apache.cassandra.schema.TableMetadata; @@ -97,6 +108,7 @@ public class CompactionsTest private static final String CF_STANDARD2 = "Standard2"; private static final String CF_STANDARD3 = "Standard3"; private static final String CF_STANDARD4 = "Standard4"; + private static final String CF_COMPRESSED_STANDARD1 = "CF_COMPRESSED_STANDARD1"; @Parameterized.Parameter(0) public DiskAccessMode compactionReadDiskAccessMode; @@ -104,25 +116,33 @@ public class CompactionsTest @Parameterized.Parameter(1) public boolean cursorCompactionEnabled; - @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1}") + @Parameterized.Parameter(2) + public DiskAccessMode backgroundWriteDiskAccessMode; + + @Parameterized.Parameters(name = "diskAccessMode={0},cursor={1},backgroundWriteMode={2}") public static Collection params() { - return Arrays.asList(new Object[]{ DiskAccessMode.standard, true }, - new Object[]{ DiskAccessMode.standard, false }, - new Object[]{ DiskAccessMode.direct, true }, - new Object[]{ DiskAccessMode.direct, false }); + // One direct-write cell instead of cross-multiplying: uncompressed CFs ignore the write mode. 
+ return Arrays.asList(new Object[]{ DiskAccessMode.standard, true, DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.standard, false, DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.direct, true, DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.direct, false, DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.standard, false, DiskAccessMode.direct }); } private DiskAccessMode originalDiskAccessMode; private boolean originalCursorCompactionEnabled; + private DiskAccessMode originalBackgroundWriteDiskAccessMode; @Before public void setCompactionParams() { originalDiskAccessMode = DatabaseDescriptor.getCompactionReadDiskAccessMode(); originalCursorCompactionEnabled = DatabaseDescriptor.cursorCompactionEnabled(); + originalBackgroundWriteDiskAccessMode = DatabaseDescriptor.getBackgroundWriteDiskAccessMode(); DatabaseDescriptor.setCompactionReadDiskAccessMode(compactionReadDiskAccessMode); DatabaseDescriptor.setCursorCompactionEnabled(cursorCompactionEnabled); + DatabaseDescriptor.setBackgroundWriteDiskAccessMode(backgroundWriteDiskAccessMode); } @After @@ -130,6 +150,7 @@ public void restoreCompactionParams() { DatabaseDescriptor.setCompactionReadDiskAccessMode(originalDiskAccessMode); DatabaseDescriptor.setCursorCompactionEnabled(originalCursorCompactionEnabled); + DatabaseDescriptor.setBackgroundWriteDiskAccessMode(originalBackgroundWriteDiskAccessMode); } @BeforeClass @@ -149,7 +170,10 @@ public static void defineSchema() throws ConfigurationException .compaction(CompactionParams.stcs(compactionOptions)), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4)); + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4), + SchemaLoader.standardCFMD(KEYSPACE1, CF_COMPRESSED_STANDARD1, + 0, AsciiType.instance, BytesType.instance, AsciiType.instance) + .compression(CompressionParams.lz4())); } public static long populate(String ks, String 
cf, int startRowKey, int endRowKey, int ttl) @@ -422,6 +446,75 @@ public void testRangeTombstones() assertEquals(keys, k); } + @Test + public void testCompactionWithSizeLimitedRewriter() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_COMPRESSED_STANDARD1); + cfs.clearUnsafe(); + cfs.disableAutoCompaction(); + + assertTrue("CF_COMPRESSED_STANDARD1 must be compressed for DIO to engage", + cfs.metadata().params.compression.isEnabled()); + assertEquals(backgroundWriteDiskAccessMode, DatabaseDescriptor.getBackgroundWriteDiskAccessMode()); + + // ~800 KiB of incompressible random data across 200 partitions -> >=2 output sstables at 64 KiB cap. + int valSize = 4096; + byte[] val = new byte[valSize]; + new Random(42).nextBytes(val); + TableMetadata table = cfs.metadata(); + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 200; i++) + { + DecoratedKey key = Util.dk(String.format("%05d", i)); + new RowUpdateBuilder(table, timestamp, key.getKey()) + .clustering("0") + .add("val", ByteBuffer.wrap(val)) + .build() + .applyUnsafe(); + } + Util.flush(cfs); + assertEquals(1, cfs.getLiveSSTables().size()); + + Set originals = new HashSet<>(cfs.getLiveSSTables()); + long maxSSTableSize = 64L * 1024; + + LifecycleTransaction txn = cfs.getTracker().tryModify(originals, OperationType.COMPACTION); + CompactionTask task = new CompactionTask(cfs, txn, FBUtilities.nowInSeconds()) + { + @Override + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, + ILifecycleTransaction transaction, + Set nonExpiredSSTables) + { + return new MaxSSTableSizeWriter(cfs, directories, transaction, nonExpiredSSTables, maxSSTableSize, 0); + } + }; + task.execute(CompactionManager.instance.active); + + Set result = cfs.getLiveSSTables(); + assertTrue("expected segment rotation to produce >= 2 SSTables, got " + result.size(), result.size() >= 2); + + int 
partitionsRead = 0; + for (SSTableReader sstable : result) + { + try (ISSTableScanner scanner = sstable.getScanner()) + { + while (scanner.hasNext()) + { + try (UnfilteredRowIterator it = scanner.next()) + { + while (it.hasNext()) + it.next(); + partitionsRead++; + } + } + } + } + assertEquals(200, partitionsRead); + } + private void testDontPurgeAccidentally(String k, String cfname) throws InterruptedException { // This test catches the regression of CASSANDRA-2786 diff --git a/test/unit/org/apache/cassandra/io/compress/DirectCompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/DirectCompressedSequentialWriterTest.java new file mode 100644 index 000000000000..192a6fae9ed6 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/compress/DirectCompressedSequentialWriterTest.java @@ -0,0 +1,813 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.slf4j.LoggerFactory; + +import accord.utils.Gen; +import accord.utils.Gens; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.util.DataIntegrityMetadata; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.schema.CompressionParams; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; + +import static accord.utils.Property.qt; +import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mockStatic; + +public class DirectCompressedSequentialWriterTest +{ + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(DirectCompressedSequentialWriterTest.class); + + // Logged in @BeforeClass so a property-test failure in CI can be reproduced locally. + private static long seed; + + @BeforeClass + public static void setupClass() + { + DatabaseDescriptor.daemonInitialization(); + seed = new Random().nextLong(); + logger.info("DirectCompressedSequentialWriterTest property-based seed: {}", seed); + } + + private static MetadataCollector newCollector() + { + return new MetadataCollector(new ClusteringComparator(Collections.singletonList(BytesType.instance))); + } + + @Test + public void testBufferSizedForWorstCaseCompressedOutput() throws Exception + { + // Chunk large enough that minRequiredSize dominates the default configured buffer, + // so the buffer sizing formula is exercised rather than masked by the config. 
+ int largeChunk = 1024 * 1024; + CompressionParams params = CompressionParams.lz4(largeChunk, Integer.MAX_VALUE); + + File dataFile = FileUtils.createTempFile("bufferSize", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + + try + { + MetadataCollector collector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, + SequentialWriterOption.DEFAULT, params, collector, null)) + { + int blockSize = FileUtils.getBlockSize(dataFile.parent()); + int maxChunkWrite = params.getSstableCompressor().initialCompressedBufferLength(largeChunk); + + Field f = DirectCompressedSequentialWriter.class.getDeclaredField("writeBuffer"); + f.setAccessible(true); + int bufferCapacity = ((ByteBuffer) f.get(writer)).capacity(); + + // After flushCompleteBlocks(), up to (blockSize - 1) bytes of leftover can remain. + // The buffer must still have room for the worst-case chunk write + CRC trailer. + int requiredCapacity = maxChunkWrite + Integer.BYTES + blockSize; + Assert.assertTrue( + String.format("Write buffer (%d) too small for worst-case chunk write (%d) + CRC (%d) + blockSize (%d) = %d", + bufferCapacity, maxChunkWrite, Integer.BYTES, blockSize, requiredCapacity), + bufferCapacity >= requiredCapacity); + } + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + /** + * Canary: FLUSH is the only path that produces NOOP today, and it is gated out of DIO. + * If that gating ever changes, NOOP's identity initialCompressedBufferLength would hit a + * different ctor sizing branch than real compressors. The randomized sweep covers NOOP at + * random sizes; this is the named beacon for that scenario. 
+ */ + @Test + public void testNoopCompressor() throws IOException + { + testWriteAndRead("noop", DEFAULT_CHUNK_LENGTH * 3 + 137, CompressionParams.NOOP); + } + + @Test + public void testDigestMatchesStandardWriter() throws IOException + { + CompressionParams[] paramSet = { CompressionParams.lz4(), + CompressionParams.zstd(), + CompressionParams.snappy(), + CompressionParams.deflate() }; + int chunk = DEFAULT_CHUNK_LENGTH; + int[] sizes = { 1, chunk - 1, chunk, chunk + 1, chunk * 3 + 137 }; + + for (CompressionParams params : paramSet) + { + for (int dataSize : sizes) + { + byte[] testData = new byte[dataSize]; + new Random(0xC0FFEEL ^ dataSize).nextBytes(testData); + + byte[] standardDigest = writeAndReadDigest(testData, params, false); + byte[] directDigest = writeAndReadDigest(testData, params, true); + + String label = params.getSstableCompressor().getClass().getSimpleName() + "/" + dataSize; + assertArrayEquals("Digest mismatch for " + label, standardDigest, directDigest); + } + } + } + + private byte[] writeAndReadDigest(byte[] data, CompressionParams params, boolean direct) throws IOException + { + String prefix = (direct ? "direct" : "standard") + "_digest_"; + File dataFile = FileUtils.createTempFile(prefix, ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + File digestFile = FileUtils.createTempFile(prefix, ".digest"); + try + { + MetadataCollector collector = newCollector(); + try (SequentialWriter writer = direct + ? 
new DirectCompressedSequentialWriter(dataFile, metadataFile, digestFile, + SequentialWriterOption.DEFAULT, params, collector, null) + : new CompressedSequentialWriter(dataFile, metadataFile, digestFile, + SequentialWriterOption.DEFAULT, params, collector)) + { + writer.write(data); + writer.finish(); + } + return Files.readAllBytes(digestFile.toPath()); + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + digestFile.tryDelete(); + } + } + + @Test + public void testMarkThrowsUnsupportedOperationException() throws IOException + { + File dataFile = FileUtils.createTempFile("mark_test", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + + try + { + MetadataCollector collector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, + SequentialWriterOption.DEFAULT, CompressionParams.lz4(), collector, null)) + { + writer.write(new byte[100]); + try + { + writer.mark(); + fail("Expected UnsupportedOperationException"); + } + catch (UnsupportedOperationException expected) + { + } + } + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + @Test + public void testResetAndTruncateThrowsUnsupportedOperationException() throws IOException + { + File dataFile = FileUtils.createTempFile("reset_test", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + + try + { + MetadataCollector collector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, + SequentialWriterOption.DEFAULT, CompressionParams.lz4(), collector, null)) + { + writer.write(new byte[100]); + try + { + writer.resetAndTruncate(null); + fail("Expected UnsupportedOperationException"); + } + catch (UnsupportedOperationException expected) + { + } + } + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + @Test + public void 
testAbortCleansUpResources() throws IOException + { + File dataFile = FileUtils.createTempFile("abort_test", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + + try + { + byte[] testData = new byte[1024]; + new Random(99).nextBytes(testData); + + MetadataCollector collector = newCollector(); + + // finishOnClose(false) routes close() through the abort path instead of finish(). + SequentialWriterOption abortOnCloseOption = SequentialWriterOption.newBuilder() + .finishOnClose(false) + .build(); + + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, + abortOnCloseOption, CompressionParams.lz4(), collector, null)) + { + writer.write(testData); + } + + // 1024 bytes < any reasonable blockSize (≥4096), so flushCompleteBlocks never had a + // full block to flush. doPrepare (where flushFinalWithPadding lives) is not called on + // the abort path, so nothing reaches the channel — the data file must be empty. + assertEquals("aborted data file should be empty (no flush, no doPrepare)", + 0L, dataFile.length()); + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + @Test + public void testDirectMemoryIsCleanedOnClose() throws IOException + { + // Sized to dominate baseline allocator noise; matches DirectThreadLocalReadAheadBufferTest. 
+ int bufferSize = 64 * 1024 * 1024; + Config conf = DatabaseDescriptor.getRawConfig(); + DataStorageSpec.IntKibibytesBound savedBufferSize = conf.direct_write_buffer_size; + + File dataFile = FileUtils.createTempFile("direct_mem_clean", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + try + { + conf.direct_write_buffer_size = new DataStorageSpec.IntKibibytesBound(bufferSize / 1024 + "KiB"); + + BufferPoolMXBean directPool = getDirectBufferPool(); + MetadataCollector collector = newCollector(); + SequentialWriterOption abortOnClose = SequentialWriterOption.newBuilder().finishOnClose(false).build(); + + long memoryUsedBefore; + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, abortOnClose, CompressionParams.lz4(), collector, null)) + { + writer.write(new byte[1024]); + memoryUsedBefore = directPool.getMemoryUsed(); + } + long memoryUsedAfter = directPool.getMemoryUsed(); + long actualDecrease = memoryUsedBefore - memoryUsedAfter; + + Assert.assertTrue("Direct memory should drop by ~bufferSize on close. before=" + memoryUsedBefore + + ", after=" + memoryUsedAfter + ", decrease=" + actualDecrease + ", expected~=" + bufferSize, + actualDecrease >= bufferSize * 0.9); // 10% tolerance for alignment overhead + } + finally + { + conf.direct_write_buffer_size = savedBufferSize; + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + private static BufferPoolMXBean getDirectBufferPool() + { + for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) + if (pool.getName().equals("direct")) + return pool; + throw new IllegalStateException("Direct buffer pool not found"); + } + + // Both ctor calls in this method intentionally throw before producing a writer; there is no + // AutoCloseable to manage. The IDE's try-with-resources hint would defeat the test. 
+    /**
+     * Verifies that a failing constructor does not leak the {@code FileChannel} opened by the
+     * parent constructor.
+     * <p>
+     * {@code FileUtils.getBlockSize} is stubbed to return 0 so that construction throws
+     * {@code IllegalStateException}. {@code ChannelCapturingWriter} records the parent channel in
+     * a static slot while the object is being built, and the test asserts that this channel has
+     * been closed once the constructor has failed.
+     */
+    @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
+    @Test
+    public void testConstructorFailureClosesParentChannel()
+    {
+        File dataFile = FileUtils.createTempFile("ctor_leak", ".db");
+        File metadataFile = new File(dataFile.absolutePath() + ".metadata");
+
+        try
+        {
+            MetadataCollector collector = newCollector();
+
+            // Force getBlockSize() to 0 so the constructor throws after super() has opened
+            // the channel and allocated parent buffers, but before writeBuffer is assigned.
+            ChannelCapturingWriter.lastCapturedChannel = null;
+            try (MockedStatic<FileUtils> mocked = mockStatic(FileUtils.class, CALLS_REAL_METHODS))
+            {
+                mocked.when(() -> FileUtils.getBlockSize(any())).thenReturn(0);
+
+                try
+                {
+                    new ChannelCapturingWriter(dataFile, metadataFile, SequentialWriterOption.DEFAULT,
+                                               CompressionParams.lz4(), collector);
+                    fail("Expected IllegalStateException from constructor");
+                }
+                catch (IllegalStateException expected)
+                {
+                    // Null-safe: a message-less exception must fail the assertion, not raise an NPE.
+                    String message = expected.getMessage();
+                    Assert.assertTrue("expected block-size message, got: " + message,
+                                      message != null && message.contains("block size"));
+                }
+            }
+
+            FileChannel parentChannel = ChannelCapturingWriter.lastCapturedChannel;
+            assertNotNull("test subclass should have captured the parent FileChannel", parentChannel);
+            assertFalse("parent FileChannel must be closed after constructor failure",
+                        parentChannel.isOpen());
+        }
+        finally
+        {
+            // Clear the static slot so the captured channel is not retained after this test.
+            ChannelCapturingWriter.lastCapturedChannel = null;
+            dataFile.tryDelete();
+            metadataFile.tryDelete();
+        }
+    }
+
+    // The ctor call below intentionally throws before producing a writer; no AutoCloseable to manage.
+    /**
+     * Verifies that the constructor rejects a filesystem block size that is not a power of two.
+     * <p>
+     * {@code FileUtils.getBlockSize} is stubbed to return 4097 and the test expects an
+     * {@code IllegalStateException} whose message mentions "power of two".
+     */
+    @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
+    @Test
+    public void testConstructorRejectsNonPowerOfTwoBlockSize()
+    {
+        File dataFile = FileUtils.createTempFile("nonpow2_blocksize", ".db");
+        File metadataFile = new File(dataFile.absolutePath() + ".metadata");
+
+        try
+        {
+            MetadataCollector collector = newCollector();
+
+            try (MockedStatic<FileUtils> mocked = mockStatic(FileUtils.class, CALLS_REAL_METHODS))
+            {
+                mocked.when(() -> FileUtils.getBlockSize(any())).thenReturn(4097);
+
+                try
+                {
+                    new DirectCompressedSequentialWriter(dataFile, metadataFile, null,
+                                                         SequentialWriterOption.DEFAULT,
+                                                         CompressionParams.lz4(), collector, null);
+                    fail("Expected IllegalStateException for non-power-of-two block size");
+                }
+                catch (IllegalStateException expected)
+                {
+                    // Null-safe: a message-less exception must fail the assertion, not raise an NPE.
+                    String message = expected.getMessage();
+                    Assert.assertTrue("expected power-of-two message, got: " + message,
+                                      message != null && message.contains("power of two"));
+                }
+            }
+        }
+        finally
+        {
+            dataFile.tryDelete();
+            metadataFile.tryDelete();
+        }
+    }
+
+    /**
+     * Verifies that an undersized {@code direct_write_buffer_size} produces exactly one WARN from
+     * {@code DirectCompressedSequentialWriter}, even when two writers are constructed.
+     * <p>
+     * The test mutates shared state: the raw config, the writer's static
+     * {@code undersizedBufferWarned} guard (reached reflectively) and the writer's logger.
+     * All three are put back in the {@code finally} block.
+     */
+    @Test
+    public void testUndersizedBufferLogsWarningOnce() throws Exception
+    {
+        Config conf = DatabaseDescriptor.getRawConfig();
+        DataStorageSpec.IntKibibytesBound savedBufferSize = conf.direct_write_buffer_size;
+
+        Logger writerLogger = (Logger) LoggerFactory.getLogger(DirectCompressedSequentialWriter.class);
+        ListAppender<ILoggingEvent> appender = new ListAppender<>();
+        appender.start();
+        writerLogger.addAppender(appender);
+
+        // Reset the once-per-JVM guard so the test outcome is independent of execution order,
+        // remembering the value it held so it can be put back afterwards.
+        Field warnedField = DirectCompressedSequentialWriter.class.getDeclaredField("undersizedBufferWarned");
+        warnedField.setAccessible(true);
+        AtomicBoolean warned = (AtomicBoolean) warnedField.get(null);
+        boolean warnedBefore = warned.getAndSet(false);
+
+        File file1 = FileUtils.createTempFile("undersized_1", ".db");
+        File meta1 = new File(file1.absolutePath() + ".metadata");
+        File file2 = FileUtils.createTempFile("undersized_2", ".db");
+        File meta2 = new File(file2.absolutePath() + ".metadata");
+        try
+        {
+            // 1 KiB is well below lz4's minRequiredSize (worst-case 64 KiB chunk + 4 + 4 KiB block).
+            conf.direct_write_buffer_size = new DataStorageSpec.IntKibibytesBound("1KiB");
+
+            MetadataCollector c1 = newCollector();
+            try (DirectCompressedSequentialWriter ignored = new DirectCompressedSequentialWriter(
+                file1, meta1, null, SequentialWriterOption.DEFAULT, CompressionParams.lz4(), c1, null))
+            {
+                // construction is enough to trigger the warning
+            }
+
+            MetadataCollector c2 = newCollector();
+            try (DirectCompressedSequentialWriter ignored = new DirectCompressedSequentialWriter(
+                file2, meta2, null, SequentialWriterOption.DEFAULT, CompressionParams.lz4(), c2, null))
+            {
+                // second construction must not re-fire the warning
+            }
+
+            List<ILoggingEvent> warnings = appender.list.stream()
+                                                   .filter(e -> e.getLevel() == Level.WARN)
+                                                   .filter(e -> e.getFormattedMessage().contains("direct_write_buffer_size"))
+                                                   .collect(Collectors.toList());
+            assertEquals("Expected exactly one undersized-buffer warning across two writers, got: " + warnings,
+                         1, warnings.size());
+        }
+        finally
+        {
+            conf.direct_write_buffer_size = savedBufferSize;
+            writerLogger.detachAppender(appender);
+            // Put the guard back to the value it held before the test rather than forcing it to false.
+            warned.set(warnedBefore);
+            file1.tryDelete();
+            meta1.tryDelete();
+            file2.tryDelete();
+            meta2.tryDelete();
+        }
+    }
+
+    /**
+     * Captures the parent {@code FileChannel} via the overridden {@code txnProxy()}, which
+     * {@code SequentialWriter}'s field initializer invokes before this subclass's instance
+     * fields are assigned — hence the static slot.
+ */ + private static class ChannelCapturingWriter extends DirectCompressedSequentialWriter + { + static volatile FileChannel lastCapturedChannel; + + ChannelCapturingWriter(File file, + File offsetsFile, + SequentialWriterOption option, + CompressionParams parameters, + MetadataCollector collector) + { + super(file, offsetsFile, null, option, parameters, collector, null); + } + + @Override + protected SequentialWriter.TransactionalProxy txnProxy() + { + lastCapturedChannel = (FileChannel) channel; + return super.txnProxy(); + } + } + + @Test + public void testDigestFileValidation() throws IOException + { + CompressionParams params = CompressionParams.lz4(); + int chunkLength = params.chunkLength(); + + for (int dataSize : new int[]{ 100, chunkLength, chunkLength * 3 + 500 }) + { + File dataFile = FileUtils.createTempFile("digest_validate_" + dataSize, ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + File digestFile = FileUtils.createTempFile("digest_validate_" + dataSize, ".digest"); + + try + { + byte[] testData = new byte[dataSize]; + new Random(42).nextBytes(testData); + + MetadataCollector collector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, digestFile, + SequentialWriterOption.DEFAULT, params, collector, null)) + { + writer.write(testData); + writer.finish(); + } + + DataIntegrityMetadata.FileDigestValidator validator = + new DataIntegrityMetadata.FileDigestValidator(dataFile, digestFile); + validator.validate(); + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + digestFile.tryDelete(); + } + } + } + + @Test + public void testCompressionFailureFallback() throws IOException + { + int chunkLength = DEFAULT_CHUNK_LENGTH; + // maxCompressedLength == chunkLength forces fallback to the uncompressed path when + // a chunk doesn't compress smaller than the input. 
+ CompressionParams params = CompressionParams.lz4(chunkLength, chunkLength); + int dataSize = chunkLength * 3; + + File dataFile = FileUtils.createTempFile("compressionFailure_direct", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + + // Random bytes don't compress under LZ4, so at least one chunk MUST take the + // fallback. Inlined rather than via testWriteAndRead so we can inspect the metadata. + byte[] testData = new byte[dataSize]; + new Random(42).nextBytes(testData); + + try + { + MetadataCollector collector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, + SequentialWriterOption.DEFAULT, params, collector, null)) + { + writer.write(testData); + writer.finish(); + } + + try (CompressionMetadata metadata = CompressionMetadata.open(metadataFile, dataFile.length(), true); + FileHandle fh = new FileHandle.Builder(dataFile).withCompressionMetadata(metadata).complete(); + RandomAccessReader reader = fh.createReader()) + { + assertEquals("Length should match", dataSize, reader.length()); + + byte[] readBack = new byte[dataSize]; + reader.readFully(readBack); + assertArrayEquals("Data should match", testData, readBack); + + // Fallback chunks have stored length == chunkLength (data stored verbatim); + // compressed chunks have length < chunkLength. + int chunks = (dataSize + chunkLength - 1) / chunkLength; + boolean anyUncompressed = false; + for (int i = 0; i < chunks; i++) + if (metadata.chunkFor((long) i * chunkLength).length == chunkLength) + { + anyUncompressed = true; + break; + } + assertTrue("expected at least one chunk to take the uncompressed fallback path", + anyUncompressed); + } + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + /** + * Data-file byte equivalence in the uncompressed-fallback mode (maxCompressedLength == chunkLength + * on incompressible data). 
testDigestMatchesStandardWriter covers digest equivalence in normal + * mode; this is its companion for fallback mode. Digest equivalence is implied here because the + * digest is a deterministic function of the data + per-chunk CRC bytes. + */ + @Test + public void testCompressionFailureMatchesStandardWriter() throws IOException + { + int chunkLength = DEFAULT_CHUNK_LENGTH; + CompressionParams params = CompressionParams.lz4(chunkLength, chunkLength); + byte[] payload = new byte[chunkLength * 2 + 100]; // partial last chunk exercises padding + new Random(42).nextBytes(payload); + assertWriteAndReadMatchesStandardWriter(payload, params); + } + + private void testWriteAndRead(String testName, int dataSize, CompressionParams params) throws IOException + { + File dataFile = FileUtils.createTempFile(testName + "_direct", ".db"); + File metadataFile = new File(dataFile.absolutePath() + ".metadata"); + + byte[] testData = new byte[dataSize]; + new Random(42).nextBytes(testData); + + try + { + MetadataCollector collector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + dataFile, metadataFile, null, + SequentialWriterOption.DEFAULT, params, collector, null)) + { + writer.write(testData); + writer.finish(); + } + + try (CompressionMetadata metadata = CompressionMetadata.open(metadataFile, dataFile.length(), true); + FileHandle fh = new FileHandle.Builder(dataFile).withCompressionMetadata(metadata).complete(); + RandomAccessReader reader = fh.createReader()) + { + assertEquals("Length should match", dataSize, reader.length()); + + byte[] readBack = new byte[dataSize]; + reader.readFully(readBack); + + assertArrayEquals("Data should match", testData, readBack); + } + } + finally + { + dataFile.tryDelete(); + metadataFile.tryDelete(); + } + } + + private enum CompressorKind { Lz4, Zstd, Snappy, Deflate, Noop } + + // Powers of two 1 KiB → 64 KiB span below-block (1 KiB on most filesystems), at-block, + // above-block, 
and many-blocks-per-chunk regimes — each exercises a different combination + // of branches in writeToAlignedBuffer / flushCompleteBlocks / flushFinalWithPadding. + private static Gen mixedChunkLengths() + { + return Gens.pick(Stream.iterate(1024, n -> n <= 64 * 1024, n -> n * 2) + .collect(Collectors.toList())); + } + + private static Gen compressionParamsGen(Gen chunkLengths) + { + return rs -> { + int cl = chunkLengths.next(rs); + switch (rs.pick(CompressorKind.values())) + { + case Lz4: return CompressionParams.lz4(cl); + case Zstd: return CompressionParams.zstd(cl); + case Snappy: return CompressionParams.snappy(cl, 1.1); + case Deflate: return CompressionParams.deflate(cl); + case Noop: return CompressionParams.NOOP; + default: throw new AssertionError(); + } + }; + } + + @Test + public void testRandomizedWriteAndReadMatchesStandardWriter() + { + Gen paramsGen = compressionParamsGen(mixedChunkLengths()); + // 1 B – 1 MiB covers below-chunk, single-chunk, multi-chunk, many-chunk, and + // large-file regimes in one sweep. + Gen.LongGen fileLengthGen = Gens.longs().between(1, 1 << 20); + + // 100 iterations keeps wall time bounded while exercising hundreds of + // (compressor, chunkLength, fileSize) combinations. 
+ qt().withSeed(seed).withExamples(100).forAll(Gens.random(), paramsGen).check((rs, params) -> { + int dataSize = (int) fileLengthGen.nextLong(rs); + byte[] payload = new byte[dataSize]; + rs.nextBytes(payload); + assertWriteAndReadMatchesStandardWriter(payload, params); + }); + } + + private static void assertWriteAndReadMatchesStandardWriter(byte[] payload, CompressionParams params) throws IOException + { + File directFile = FileUtils.createTempFile("rt_direct", ".db"); + File directMeta = new File(directFile.absolutePath() + ".metadata"); + File standardFile = FileUtils.createTempFile("rt_standard", ".db"); + File standardMeta = new File(standardFile.absolutePath() + ".metadata"); + + try + { + MetadataCollector dCollector = newCollector(); + try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter( + directFile, directMeta, null, SequentialWriterOption.DEFAULT, params, dCollector, null)) + { + writer.write(payload); + writer.finish(); + } + + byte[] directReadBack; + try (CompressionMetadata metadata = CompressionMetadata.open(directMeta, directFile.length(), true); + FileHandle fh = new FileHandle.Builder(directFile).withCompressionMetadata(metadata).complete(); + RandomAccessReader reader = fh.createReader()) + { + assertEquals("DIO writer write/read length mismatch", payload.length, reader.length()); + directReadBack = new byte[payload.length]; + reader.readFully(directReadBack); + } + assertArrayEquals("DIO writer write/read data mismatch", payload, directReadBack); + + MetadataCollector sCollector = newCollector(); + try (CompressedSequentialWriter writer = new CompressedSequentialWriter( + standardFile, standardMeta, null, SequentialWriterOption.DEFAULT, params, sCollector)) + { + writer.write(payload); + writer.finish(); + } + + byte[] standardBytes = Files.readAllBytes(standardFile.toPath()); + byte[] directBytes = Files.readAllBytes(directFile.toPath()); + assertArrayEquals("Direct vs standard writer on-disk bytes differ for 
" + + params.getSstableCompressor().getClass().getSimpleName() + + ", chunkLength=" + params.chunkLength() + ", size=" + payload.length, + standardBytes, directBytes); + } + finally + { + directFile.tryDelete(); + directMeta.tryDelete(); + standardFile.tryDelete(); + standardMeta.tryDelete(); + } + } + + @Test + public void testWriteAndReadAcrossBufferSizes() throws IOException + { + int chunk = DEFAULT_CHUNK_LENGTH; + CompressionParams params = CompressionParams.lz4(chunk); + + int max = params.getSstableCompressor().initialCompressedBufferLength(chunk); + // 4 KiB is the common-case filesystem block; the writer rounds up to the actual + // blockSize at runtime, so these sizes remain valid lower bounds on 8 KiB FSes too. + int blockProxy = 4096; + int minRequired = max + Integer.BYTES + blockProxy; + + // Three buffer regimes pin different flushCompleteBlocks branches: + // exactly minRequired → clear path dominates + // minRequired + block → frequent compact() of sub-block tail + // 1 MiB → many chunks accumulate before flush + // Crossed with three data sizes hitting different final-tail regimes: + // chunk - 1 → flushFinalWithPadding only + // chunk * 3 → 3 full chunks, partial-block tail at end + // chunk * 3 + 137 → 3 full chunks + explicit non-aligned tail + int[] bufferSizesBytes = { roundUpToKiB(minRequired), + roundUpToKiB(minRequired + blockProxy), + 1024 * 1024 }; + int[] dataSizes = { chunk - 1, chunk * 3, chunk * 3 + 137 }; + + Config conf = DatabaseDescriptor.getRawConfig(); + DataStorageSpec.IntKibibytesBound saved = conf.direct_write_buffer_size; + try + { + for (int sz : bufferSizesBytes) + { + conf.direct_write_buffer_size = new DataStorageSpec.IntKibibytesBound(sz / 1024 + "KiB"); + for (int dataSize : dataSizes) + testWriteAndRead("bufSize_" + sz + "_" + dataSize, dataSize, params); + } + } + finally + { + conf.direct_write_buffer_size = saved; + } + } + + // Round up so we never land below minRequired due to integer division when converting to 
KiB. + private static int roundUpToKiB(int bytes) + { + return ((bytes + 1023) / 1024) * 1024; + } +} diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java index 64c617667cfe..10de90b4f12a 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java @@ -18,18 +18,39 @@ package org.apache.cassandra.io.sstable; +import java.util.Arrays; +import java.util.Collection; + +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.Config.DiskAccessMode; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +@RunWith(Parameterized.class) public class CQLSSTableWriterDaemonTest extends CQLSSTableWriterTest { + @Parameterized.Parameter + public DiskAccessMode backgroundWriteMode; + + @Parameterized.Parameters(name = "backgroundWriteMode={0}") + public static Collection modes() + { + return Arrays.asList(new Object[]{ DiskAccessMode.standard }, + new Object[]{ DiskAccessMode.direct }); + } + + private DiskAccessMode originalBackgroundWriteMode; + @BeforeClass public static void setup() throws Exception { @@ -41,4 +62,17 @@ public static void setup() throws Exception MessagingService.instance().waitUntilListeningUnchecked(); StorageService.instance.initServer(); } + + @Before + public void setBackgroundWriteMode() + { + originalBackgroundWriteMode = DatabaseDescriptor.getBackgroundWriteDiskAccessMode(); + 
DatabaseDescriptor.setBackgroundWriteDiskAccessMode(backgroundWriteMode); + } + + @After + public void restoreBackgroundWriteMode() + { + DatabaseDescriptor.setBackgroundWriteDiskAccessMode(originalBackgroundWriteMode); + } } diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index a9a304787bf7..bede3ac49694 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -53,6 +54,8 @@ import org.junit.rules.TemporaryFolder; import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config.DiskAccessMode; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -62,6 +65,7 @@ import org.apache.cassandra.cql3.functions.types.TypeCodec; import org.apache.cassandra.cql3.functions.types.UDTValue; import org.apache.cassandra.cql3.functions.types.UserType; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compression.CompressionDictionary; import org.apache.cassandra.db.compression.CompressionDictionary.DictId; import org.apache.cassandra.db.compression.ZstdCompressionDictionary; @@ -81,6 +85,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.io.sstable.format.bti.BtiFormat; +import org.apache.cassandra.io.util.DirectIoTestUtils; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.locator.RangesAtEndpoint; @@ 
-202,6 +207,51 @@ private void testWritingSstableWithFormat(SSTableFormat format) throws Exc } } + @Test + public void testCompressedWriteEngagesDirectWriterAndReadsBack() throws Exception + { + String schema = "CREATE TABLE " + qualifiedTable + " (" + + " k int PRIMARY KEY," + + " v text" + + ") WITH compression = {'enabled':'true', 'class':'LZ4Compressor'}"; + String insert = "INSERT INTO " + qualifiedTable + " (k, v) VALUES (?, ?)"; + + DirectIoTestUtils.activatedDirectWriteOperations().clear(); + + try (CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using(insert) + .build()) + { + for (int i = 0; i < 50; i++) + writer.addRow(i, "row-" + i); + } + + if (DatabaseDescriptor.getBackgroundWriteDiskAccessMode() == DiskAccessMode.direct) + { + Set activated = DirectIoTestUtils.activatedDirectWriteOperations(); + assertTrue("DirectCompressedSequentialWriter must have engaged for WRITE; activated=" + activated, + activated.contains(OperationType.WRITE)); + } + + loadSSTables(dataDir, keyspace, table); + + if (verifyDataAfterLoading) + { + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT k, v FROM " + qualifiedTable); + assertEquals(50, rs.size()); + + Set seen = new HashSet<>(); + for (UntypedResultSet.Row row : rs) + { + int k = row.getInt("k"); + assertTrue("duplicate key " + k, seen.add(k)); + assertEquals("row-" + k, row.getString("v")); + } + } + } + private void validateFilesAreInFormat(SSTableFormat format) throws IOException { try (Stream dataFilePaths = Files.list(dataDir.toPath()).filter(p -> p.toString().endsWith("Data.db"))) diff --git a/test/unit/org/apache/cassandra/io/sstable/format/DataComponentDirectWriteSelectionTest.java b/test/unit/org/apache/cassandra/io/sstable/format/DataComponentDirectWriteSelectionTest.java new file mode 100644 index 000000000000..68c9501de1e1 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/format/DataComponentDirectWriteSelectionTest.java @@ -0,0 
+1,192 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.io.sstable.format; + +import java.nio.file.Files; +import java.util.Collections; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.Config.DiskAccessMode; +import org.apache.cassandra.config.Config.FlushCompression; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.compress.DirectCompressedSequentialWriter; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SequenceBasedSSTableId; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.util.ChecksummedSequentialWriter; +import org.apache.cassandra.io.util.DirectIoTestUtils; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SequentialWriter; +import 
org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks which {@link SequentialWriter} implementation {@code DataComponent.buildWriter} hands back
+ * for different combinations of background write disk access mode, table compression and
+ * {@link OperationType}.
+ */
+public class DataComponentDirectWriteSelectionTest
+{
+    private static final String KS = "ks_dio_selection";
+    private static final String CF = "cf_dio_selection";
+
+    // Operation types for which a direct writer is expected once compression and direct mode are both on.
+    private static final OperationType[] SUPPORTED_OPS = {
+        OperationType.WRITE,
+        OperationType.COMPACTION,
+        OperationType.MAJOR_COMPACTION,
+        OperationType.GARBAGE_COLLECT,
+        OperationType.ANTICOMPACTION,
+        OperationType.TOMBSTONE_COMPACTION,
+        OperationType.CLEANUP,
+        OperationType.UPGRADE_SSTABLES,
+        OperationType.STREAM,
+    };
+
+    // Scratch directory that receives the descriptors created by build(); recreated for every test.
+    private File tmpDir;
+    // Source of distinct SSTable ids within a single test.
+    private int nextId;
+
+    /** Initialises {@link DatabaseDescriptor} once for the whole class. */
+    @BeforeClass
+    public static void setupClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    /** Creates a fresh temporary directory and restarts the id counter. */
+    @Before
+    public void setUp() throws Exception
+    {
+        tmpDir = new File(Files.createTempDirectory("dio_selection"));
+        nextId = 0;
+    }
+
+    /** Removes the temporary directory, if one was created. */
+    @After
+    public void tearDown()
+    {
+        if (tmpDir == null)
+            return;
+
+        FileUtils.deleteRecursive(tmpDir);
+    }
+
+    /** Compressed table + direct mode: every operation in SUPPORTED_OPS must get the direct writer. */
+    @Test
+    public void testSupportedOpsReturnDirectWriterWhenCompressedAndDirectMode() throws Exception
+    {
+        DirectIoTestUtils.withDirectWrites(() -> {
+            for (OperationType operation : SUPPORTED_OPS)
+            {
+                try (SequentialWriter writer = build(compressedMetadata(), operation))
+                {
+                    boolean direct = writer instanceof DirectCompressedSequentialWriter;
+                    assertTrue("Expected DirectCompressedSequentialWriter for " + operation + ", got " + simpleName(writer),
+                               direct);
+                }
+            }
+        });
+    }
+
+    /** FLUSH must keep the non-direct compressed writer even while direct mode is on. */
+    @Test
+    public void testFlushFallsBackToStandardEvenInDirectMode() throws Exception
+    {
+        DirectIoTestUtils.withDirectWrites(() -> {
+            assertStandardCompressedWriter("FLUSH must not use DirectCompressedSequentialWriter (UNSUPPORTED_POLICY), got ",
+                                           OperationType.FLUSH);
+        });
+    }
+
+    /** SCRUB must keep the non-direct compressed writer even while direct mode is on. */
+    @Test
+    public void testScrubFallsBackToStandardEvenInDirectMode() throws Exception
+    {
+        DirectIoTestUtils.withDirectWrites(() -> {
+            assertStandardCompressedWriter("SCRUB must not use DirectCompressedSequentialWriter (UNSUPPORTED_CORRECTNESS), got ",
+                                           OperationType.SCRUB);
+        });
+    }
+
+    /** Without compression the checksummed writer is expected for every operation, direct mode or not. */
+    @Test
+    public void testUncompressedAlwaysUsesStandardWriter() throws Exception
+    {
+        DirectIoTestUtils.withDirectWrites(() -> {
+            for (OperationType operation : SUPPORTED_OPS)
+            {
+                try (SequentialWriter writer = build(uncompressedMetadata(), operation))
+                {
+                    boolean checksummed = writer instanceof ChecksummedSequentialWriter;
+                    assertTrue("Uncompressed tables must use ChecksummedSequentialWriter for " + operation + ", got " + simpleName(writer),
+                               checksummed);
+                }
+            }
+        });
+    }
+
+    /** With the mode forced to standard, no operation may receive the direct writer. */
+    @Test
+    public void testBufferedModeNeverPicksDirectWriter()
+    {
+        DiskAccessMode previousMode = DatabaseDescriptor.getBackgroundWriteDiskAccessMode();
+        DatabaseDescriptor.setBackgroundWriteDiskAccessMode(DiskAccessMode.standard);
+        try
+        {
+            for (OperationType operation : SUPPORTED_OPS)
+            {
+                assertStandardCompressedWriter("Buffered mode must not produce DirectCompressedSequentialWriter for " + operation + ", got ",
+                                               operation);
+            }
+        }
+        finally
+        {
+            // Put back whatever mode was configured before this test ran.
+            DatabaseDescriptor.setBackgroundWriteDiskAccessMode(previousMode);
+        }
+    }
+
+    /**
+     * Builds a writer for a compressed table and the given operation, then asserts that it is a
+     * {@link CompressedSequentialWriter} but not a {@link DirectCompressedSequentialWriter}.
+     * The failure message is {@code messagePrefix} followed by the simple class name of the writer.
+     */
+    private void assertStandardCompressedWriter(String messagePrefix, OperationType operation)
+    {
+        try (SequentialWriter writer = build(compressedMetadata(), operation))
+        {
+            boolean compressed = writer instanceof CompressedSequentialWriter;
+            boolean direct = writer instanceof DirectCompressedSequentialWriter;
+            assertTrue(messagePrefix + simpleName(writer), compressed && !direct);
+        }
+    }
+
+    /** Simple class name of the writer, used in assertion messages. */
+    private static String simpleName(SequentialWriter writer)
+    {
+        return writer.getClass().getSimpleName();
+    }
+
+    /**
+     * Asks {@code DataComponent.buildWriter} for a data writer under {@link #tmpDir}, using a new
+     * sequence-based SSTable id on every call.
+     */
+    private SequentialWriter build(TableMetadata metadata, OperationType op)
+    {
+        SequenceBasedSSTableId id = new SequenceBasedSSTableId(nextId++);
+        Descriptor descriptor = new Descriptor(tmpDir, KS, CF, id, DatabaseDescriptor.getSelectedSSTableFormat());
+        ClusteringComparator comparator = new ClusteringComparator(Collections.singletonList(BytesType.instance));
+        MetadataCollector collector = new MetadataCollector(comparator);
+        // With finishOnClose(false), closing through try-with-resources goes via abort instead of finish(),
+        // so an empty writer does not trigger finish() side effects.
+        SequentialWriterOption options = SequentialWriterOption.newBuilder()
+                                                               .finishOnClose(false)
+                                                               .build();
+        return DataComponent.buildWriter(descriptor, metadata, options, collector, op, FlushCompression.fast, null);
+    }
+
+    /** Table metadata with LZ4 compression. */
+    private static TableMetadata compressedMetadata()
+    {
+        return metadataWith(CompressionParams.lz4());
+    }
+
+    /** Table metadata with compression disabled. */
+    private static TableMetadata uncompressedMetadata()
+    {
+        return metadataWith(CompressionParams.noCompression());
+    }
+
+    /** Single-partition-key table in KS/CF with the given compression parameters. */
+    private static TableMetadata metadataWith(CompressionParams compression)
+    {
+        return TableMetadata.builder(KS, CF)
+                            .addPartitionKeyColumn("k", BytesType.instance)
+                            .compression(compression)
+                            .build();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/io/util/DirectIoTestUtils.java b/test/unit/org/apache/cassandra/io/util/DirectIoTestUtils.java
new file mode 100644
index 000000000000..7c71ccd2cc09
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/DirectIoTestUtils.java
@@ -0,0 +1,68 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.lang.reflect.Field;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.DataComponent;
+import org.apache.cassandra.utils.Throwables.ThrowingRunnable;
+
+/**
+ * Static helpers for tests that exercise the direct background write path.
+ *
+ * The mode is switched through the static accessors on {@link DatabaseDescriptor}.
+ * NOTE(review): presumably this makes the helpers unsuitable for tests that run
+ * concurrently in one JVM - confirm before relying on it.
+ */
+public final class DirectIoTestUtils
+{
+    private DirectIoTestUtils() {}
+
+    /**
+     * Runs {@code body} with the background write disk access mode set to
+     * {@link DiskAccessMode#direct}, restoring the previous mode afterwards.
+     *
+     * @param body the action to run while direct mode is in effect
+     * @throws Exception whatever {@code body} throws
+     */
+    public static void withDirectWrites(ThrowingRunnable body) throws Exception
+    {
+        // The value-returning block lambda selects the Callable overload below.
+        withDirectWrites(() -> { body.run(); return null; });
+    }
+
+    /**
+     * Calls {@code body} with the background write disk access mode set to
+     * {@link DiskAccessMode#direct} and returns its result. The mode that was in
+     * effect on entry is restored in a {@code finally} block, so it is put back
+     * even when {@code body} throws.
+     *
+     * @param body the computation to run while direct mode is in effect
+     * @param <T> the result type of {@code body}
+     * @return the value returned by {@code body}
+     * @throws Exception whatever {@code body} throws
+     */
+    public static <T> T withDirectWrites(Callable<T> body) throws Exception
+    {
+        DiskAccessMode original = DatabaseDescriptor.getBackgroundWriteDiskAccessMode();
+        DatabaseDescriptor.setBackgroundWriteDiskAccessMode(DiskAccessMode.direct);
+        try
+        {
+            return body.call();
+        }
+        finally
+        {
+            DatabaseDescriptor.setBackgroundWriteDiskAccessMode(original);
+        }
+    }
+
+    /**
+     * Reads the static field {@code directWriteLogged} of {@link DataComponent} via reflection.
+     * The object stored in the field is returned as-is, not a copy.
+     * NOTE(review): the name suggests it holds the operation types for which the direct
+     * write path has been logged - confirm against DataComponent.
+     *
+     * @return the current value of {@code DataComponent.directWriteLogged}
+     * @throws AssertionError if the field is missing or cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    public static Set<OperationType> activatedDirectWriteOperations()
+    {
+        try
+        {
+            Field f = DataComponent.class.getDeclaredField("directWriteLogged");
+            f.setAccessible(true);
+            // Unchecked: the element type is assumed here; reflection cannot verify it.
+            return (Set<OperationType>) f.get(null);
+        }
+        catch (ReflectiveOperationException e)
+        {
+            throw new AssertionError("DataComponent.directWriteLogged not accessible", e);
+        }
+    }
+}