1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
* Fix LEAK DETECTED errors during bulk read (CASSANALYTICS-87)
* Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84)
* Analytics job fails when source table has secondary indexes (CASSANALYTICS-86)
* Set KeyStore to be optional (CASSANALYTICS-69)
1 change: 1 addition & 0 deletions cassandra-five-zero-bridge/build.gradle
@@ -72,6 +72,7 @@ dependencies {
testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4')
testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}")

testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}")
testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}")
@@ -92,6 +92,7 @@
import org.apache.cassandra.spark.reader.CompactionStreamScanner;
import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.reader.IndexReader;
import org.apache.cassandra.spark.reader.IndexSummaryComponent;
import org.apache.cassandra.spark.reader.ReaderUtils;
import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.SchemaBuilder;
@@ -567,7 +568,7 @@ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner,
{
try
{
SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval);
IndexSummaryComponent summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval);
Pair<DecoratedKey, DecoratedKey> keys = summary == null ? null : Pair.of(summary.first(), summary.last());
if (summary == null)
{
@@ -53,7 +53,7 @@ public static Long findDataDbOffset(@NotNull IndexSummary indexSummary,
@NotNull SSTable ssTable,
@NotNull Stats stats) throws IOException
{
long searchStartOffset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue());
long searchStartOffset = findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue());

// Open the Index.db, skip to nearest offset found in Summary.db and find start & end offset for the Data.db file
return findDataDbOffset(range, partitioner, ssTable, stats, searchStartOffset);
@@ -171,4 +171,50 @@ static BigInteger readNextToken(@NotNull IPartitioner partitioner,
stats.readPartitionIndexDb((ByteBuffer) key.rewind(), token);
return token;
}

/**
* Binary search Summary.db to find the nearest offset in Index.db that precedes the token we are looking for
*
* @param summary IndexSummary from Summary.db file
* @param partitioner Cassandra partitioner to hash partition keys to token
* @param token the token we are trying to find
* @return offset into the Index.db file for the closest partition in the Summary.db file that precedes the token we are looking for
*/
public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token)
{
return summary.getPosition(binarySearchSummary(summary, partitioner, token));
}

/**
* This class is intentionally private.
* Think carefully before widening the access modifier from private to public:
* the IndexSummary's underlying memory could be released, and a leaked reference would then cause a segmentation fault.
*/
private static class IndexSummaryTokenList implements SummaryDbUtils.TokenList
{
final IPartitioner partitioner;
final IndexSummary summary;

IndexSummaryTokenList(IPartitioner partitioner,
IndexSummary summary)
{
this.partitioner = partitioner;
this.summary = summary;
}

public int size()
{
return summary.size();
}

public BigInteger tokenAt(int index)
{
return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken());
}
}

public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token)
{
return SummaryDbUtils.binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token);
}
}
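For orientation, SummaryDbUtils.binarySearchSummary(TokenList, BigInteger) itself is not shown in this diff. The sketch below only illustrates the kind of search the javadoc above describes (the last entry whose token does not exceed the target), assuming TokenList entries ascend by token; it is not the project's implementation, and binarySearchSketch is a hypothetical name.

// Sketch only; would sit in the same package so SummaryDbUtils.TokenList is visible.
static int binarySearchSketch(SummaryDbUtils.TokenList list, java.math.BigInteger token)
{
    int low = 0;
    int high = list.size() - 1;
    int result = 0;                        // fall back to the first entry when every token exceeds the target
    while (low <= high)
    {
        int mid = (low + high) >>> 1;      // unsigned shift avoids overflow for large lists
        if (list.tokenAt(mid).compareTo(token) <= 0)
        {
            result = mid;                  // candidate entry at or before the target token
            low = mid + 1;                 // keep looking for a later candidate
        }
        else
        {
            high = mid - 1;
        }
    }
    return result;
}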
@@ -34,6 +34,7 @@
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.FileType;
@@ -73,7 +74,7 @@ public IndexReader(@NotNull SSTable ssTable,
now = System.nanoTime();
if (rangeFilter != null)
{
SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
if (summary != null)
{
this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()),
@@ -87,10 +88,13 @@
return;
}

skipAhead = summary.summary().getPosition(
SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())
);
stats.indexSummaryFileRead(System.nanoTime() - now);
try (IndexSummary indexSummary = summary.summarySharedCopy())
{
skipAhead = indexSummary.getPosition(
IndexDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())
);
stats.indexSummaryFileRead(System.nanoTime() - now);
}
now = System.nanoTime();
}
}
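The block above is the core of the leak fix: instead of borrowing the cached component's IndexSummary directly, the reader takes a reference-counted copy via summarySharedCopy(), so the off-heap summary stays valid even if SSTableCache evicts and closes the component mid-read, and the try-with-resources block releases that reference once the lookup finishes. A condensed sketch of the discipline, using only names that appear in this diff (skipAheadFor is a hypothetical helper, not part of the patch):

// Sketch: acquire a ref-counted copy, use it, release it via try-with-resources.
static long skipAheadFor(IndexSummaryComponent component, IPartitioner partitioner, BigInteger token)
{
    try (IndexSummary copy = component.summarySharedCopy())   // reference count +1
    {
        return copy.getPosition(IndexDbUtils.binarySearchSummary(copy, partitioner, token));
    }                                                          // reference count -1 when the copy closes
}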
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.reader;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.RebufferingChannelInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.jetbrains.annotations.Nullable;

public class IndexSummaryComponent implements AutoCloseable
{
private final IndexSummary indexSummary;
private final DecoratedKey firstKey;
private final DecoratedKey lastKey;

/**
* Read and deserialize the Summary.db file
*
* @param summaryStream input stream for Summary.db file
* @param partitioner token partitioner
* @param minIndexInterval min index interval
* @param maxIndexInterval max index interval
* @return IndexSummaryComponent wrapping the deserialized summary and the first/last keys, or null if the stream is null
* @throws IOException if reading or deserializing the Summary.db stream fails
*/
@Nullable
static IndexSummaryComponent readSummary(InputStream summaryStream,
IPartitioner partitioner,
int minIndexInterval,
int maxIndexInterval) throws IOException
{
if (summaryStream == null)
{
return null;
}

int bufferSize = ReaderUtils.inputStreamBufferSize(summaryStream);
try (DataInputStream is = new DataInputStream(summaryStream);
DataInputPlus.DataInputStreamPlus dis = new RebufferingChannelInputStream(is, bufferSize))
{
IndexSummary indexSummary = IndexSummary.serializer.deserialize(dis, partitioner, minIndexInterval, maxIndexInterval);
DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis));
DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis));
return new IndexSummaryComponent(indexSummary, firstKey, lastKey);
}
}

IndexSummaryComponent(IndexSummary indexSummary,
DecoratedKey firstKey,
DecoratedKey lastKey)
{
this.indexSummary = indexSummary;
this.firstKey = firstKey;
this.lastKey = lastKey;
}

/**
* Get a shared copy of the IndexSummary, whose reference count is incremented.
* It is important to close the shared copy to decrement the reference count.
* @return a shared copy of the IndexSummary object
*/
public IndexSummary summarySharedCopy()
{
return indexSummary.sharedCopy();
}

public DecoratedKey first()
{
return firstKey;
}

public DecoratedKey last()
{
return lastKey;
}

@Override // This method is expected to be called when the object is evicted from the SSTable cache; do not call it explicitly.
public void close() throws Exception
{
indexSummary.close();
}
}
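A hedged usage sketch for the new class: readSummary, summarySharedCopy, first(), last() and close() come from this file, and getPosition(int) from IndexSummary as used elsewhere in this diff; the stream, the partitioner argument and the 128/2048 interval values are illustrative placeholders, and inspectSummary is a hypothetical caller.

// Sketch; readSummary is package-private, so this would sit in the same package.
static void inspectSummary(InputStream summaryStream, IPartitioner partitioner) throws Exception
{
    IndexSummaryComponent component = IndexSummaryComponent.readSummary(summaryStream, partitioner, 128, 2048);
    if (component == null)
    {
        return;                                           // no Summary.db stream was available
    }
    try (IndexSummary summary = component.summarySharedCopy())
    {
        long firstOffset = summary.getPosition(0);        // Index.db offset of the first summarized entry
        System.out.println("covers tokens " + component.first().getToken()
                           + " to " + component.last().getToken()
                           + ", first Index.db offset " + firstOffset);
    }
    component.close();                                    // the sketch owns the component here; in the patch the SSTable cache owns it
}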
@@ -249,7 +249,7 @@ public SSTableReader(@NotNull TableMetadata metadata,
Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
this.version = descriptor.version;

SummaryDbUtils.Summary summary = null;
IndexSummaryComponent summary = null;
Pair<DecoratedKey, DecoratedKey> keys = null;
try
{
@@ -390,11 +390,16 @@ public SSTableReader(@NotNull TableMetadata metadata,
buildColumnFilter(metadata, columnFilter));
this.metadata = metadata;

// The summary might already have been evicted if downloading the other components took too long;
// fetch it from the cache again to avoid using an evicted object
summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
if (readIndexOffset && summary != null)
{
SummaryDbUtils.Summary finalSummary = summary;
extractRange(sparkRangeFilter, partitionKeyFilters)
.ifPresent(range -> readOffsets(finalSummary.summary(), range));
try (IndexSummary indexSummary = summary.summarySharedCopy())
{
extractRange(sparkRangeFilter, partitionKeyFilters)
.ifPresent(range -> readOffsets(indexSummary, range));
}
}
else
{
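The comment above captures the hazard this hunk closes: the summary fetched at the start of the constructor may be evicted, and its memory released, while the remaining components download. Condensed, the ordering the new code relies on looks like the sketch below (keysFromSummary, summarySharedCopy, extractRange and readOffsets all appear in this diff; the download step is elided):

// Sketch of the ordering: fetch -> long download -> re-fetch -> shared copy -> use.
IndexSummaryComponent early = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
// ... long-running component downloads; 'early' may be evicted and closed in the meantime ...
IndexSummaryComponent fresh = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
if (fresh != null)
{
    try (IndexSummary indexSummary = fresh.summarySharedCopy())    // the held reference keeps the memory alive
    {
        extractRange(sparkRangeFilter, partitionKeyFilters)
                .ifPresent(range -> readOffsets(indexSummary, range));
    }
}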