Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
0.2.0
0.3.0
-----
* File descriptor leak after file streamed in Sidecar Client (CASSANALYTICS-103)
* Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)

0.2.0
-----
* Generated distribution artifacts fix (CASSANALYTICS-105)
* Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98)
* Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.cassandra.sidecar.common.request.Request;
import org.apache.cassandra.sidecar.common.request.UploadableRequest;

import static java.lang.String.valueOf;
import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE;
import static org.apache.cassandra.sidecar.common.utils.StringUtils.isNullOrEmpty;

Expand Down Expand Up @@ -165,11 +166,7 @@ protected CompletableFuture<HttpResponse> executeUploadFileInternal(SidecarInsta
Promise<HttpResponse> promise = Promise.promise();
// open the local file
openFileForRead(vertx.fileSystem(), filename)
.compose(pair -> vertxRequest.ssl(config.ssl())
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
String.valueOf(pair.getKey()))
.sendStream(pair.getValue()
.setReadBufferSize(config.sendReadBufferSize())))
.compose(pair -> sendFileStream(vertxRequest, pair, filename))
.onFailure(promise::fail)
.onSuccess(response -> {
byte[] raw = response.body() != null ? response.body().getBytes() : null;
Expand All @@ -184,6 +181,32 @@ protected CompletableFuture<HttpResponse> executeUploadFileInternal(SidecarInsta
return promise.future().toCompletionStage().toCompletableFuture();
}

/**
* Sends the file stream via HTTP request.
*
* @param vertxRequest the HTTP request to send the file stream with
* @param pair a pair containing file size and the AsyncFile handle
* @param filename the name of the file being uploaded (for logging purposes)
* @return a Future that completes when the file has been sent
*/
protected Future<io.vertx.ext.web.client.HttpResponse<Buffer>> sendFileStream(
HttpRequest<Buffer> vertxRequest,
AbstractMap.SimpleEntry<Long, AsyncFile> pair,
String filename)
{
AsyncFile asyncFile = pair.getValue();
return vertxRequest.ssl(config.ssl())
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
valueOf(pair.getKey()))
.sendStream(pair.getValue()
.setReadBufferSize(config.sendReadBufferSize()))
.onComplete(ar -> {
asyncFile.close().onFailure(err ->
LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err)
);
});
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,38 @@

package org.apache.cassandra.sidecar.client;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.cassandra.sidecar.client.request.RequestExecutorTest;

import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static java.nio.file.Files.copy;
import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -37,17 +59,35 @@
public class VertxHttpClientTest
{
private static Vertx vertx;
private MockWebServer mockServer;
private HttpClientConfig config;
private SidecarInstanceImpl sidecarInstance;

@BeforeAll
public static void setUp()
@BeforeEach
public void setUp() throws IOException
{
vertx = Vertx.vertx();
mockServer = new MockWebServer();
mockServer.start();

config = new HttpClientConfig.Builder<>()
.ssl(false)
.timeoutMillis(30000)
.build();
sidecarInstance = RequestExecutorTest.newSidecarInstance(mockServer);
}

@AfterAll
public static void tearDown()
@AfterEach
public void tearDown() throws IOException
{
vertx.close();
if (mockServer != null)
{
mockServer.shutdown();
}
if (vertx != null)
{
vertx.close();
}
}

@Test
Expand All @@ -74,4 +114,116 @@ private HttpClientConfig.Builder<?> httpClientConfigBuilder()
.timeoutMillis(100)
.idleTimeoutMillis(100);
}

@Test
void testUploadSSTableClosesFile(@TempDir Path tempDirectory) throws Exception
{
runTestScenario(tempDirectory,
new MockResponse().setResponseCode(OK.code()),
new ExposeAsyncFileVertxHttpClient(vertx, config));
}

@Test
void testUploadClosesFileOnUploadFailure(@TempDir Path tempDirectory) throws Exception
{
runTestScenario(tempDirectory,
new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()),
new ExposeAsyncFileVertxHttpClient(vertx, config));
}

@Test
void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws Exception
{
mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));

Path fileToUpload = prepareFile(tempDirectory);

ExposeAsyncFileVertxHttpClient httpClient = new ExposeAsyncFileVertxHttpClient(vertx, config);

// Upload the same file 3 times (simulating multiple file uploads)
for (int i = 0; i < 3; i++)
{
HttpRequest<Buffer> vertxRequest = httpClient.webClient.put(mockServer.getPort(),
mockServer.getHostName(),
"/upload/test" + i);
httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString())
.get(30, TimeUnit.SECONDS);
}

assertThat(mockServer.getRequestCount()).isEqualTo(3);
assertThat(httpClient.capturedFiles).hasSize(3);

// Give async file close operations time to complete
Thread.sleep(100);

// Verify all the files are closed by attempting to call .end() which should throw IllegalStateException
for (AsyncFile file : httpClient.capturedFiles)
{
assertThatThrownBy(file::end)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("File handle is closed" );
}
}

private void runTestScenario(Path tempDirectory,
MockResponse mockResponse,
ExposeAsyncFileVertxHttpClient httpClient) throws Exception
{
mockServer.enqueue(mockResponse);

Path fileToUpload = prepareFile(tempDirectory);
HttpRequest<Buffer> vertxRequest = httpClient.webClient.put(mockServer.getPort(),
mockServer.getHostName(),
"/upload/test" );

httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString())
.get(30, TimeUnit.SECONDS);

assertThat(mockServer.getRequestCount()).isEqualTo(1);

// Give async file close operation time to complete
Thread.sleep(100);

// Verify file is closed by attempting to call .end() which should throw IllegalStateException
assertThat(httpClient.capturedFiles).hasSize(1);
assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end())
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("File handle is closed" );
}

/**
* Class that extends from {@link VertxHttpClient} for testing purposes and holds a reference to the
* {@link AsyncFile} to ensure that the file has been closed.
*/
static class ExposeAsyncFileVertxHttpClient extends VertxHttpClient
{
List<AsyncFile> capturedFiles = new ArrayList<>();

ExposeAsyncFileVertxHttpClient(Vertx vertx, HttpClientConfig config)
{
super(vertx, config);
}

@Override
protected Future<HttpResponse<Buffer>> sendFileStream(HttpRequest<Buffer> vertxRequest,
SimpleEntry<Long, AsyncFile> pair,
String filename)
{
capturedFiles.add(pair.getValue());
return super.sendFileStream(vertxRequest, pair, filename);
}
}

private Path prepareFile(Path tempDirectory) throws IOException
{
Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt" );
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt" ))
{
assertThat(inputStream).isNotNull();
copy(inputStream, fileToUpload, StandardCopyOption.REPLACE_EXISTING);
}
return fileToUpload;
}
}