Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected void onErrorInternal(final Exception exception) {
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
throws TException {
client.pipeTransfer(req, this);
transferWithOptionalRequestSlicing(client, req);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public PipeTransferTabletInsertNodeEventHandler(
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
throws TException {
client.pipeTransfer(req, this);
transferWithOptionalRequestSlicing(client, req);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public PipeTransferTabletRawEventHandler(
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
throws TException {
client.pipeTransfer(req, this);
transferWithOptionalRequestSlicing(client, req);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

Expand Down Expand Up @@ -126,8 +130,131 @@
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
throws TException;

protected final void transferWithOptionalRequestSlicing(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
throws TException {
final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
client.pipeTransfer(req, this);
return;
}

LOGGER.warn(
"The body size of the request is too large. The request will be sliced. Origin req: {}-{}. "
+ "Request body size: {}, threshold: {}",
req.getVersion(),
req.getType(),
req.body.limit(),
bodySizeLimit);

final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit);
final boolean shouldReturnSelf = client.shouldReturnSelf();
try {
transferSlicedRequest(
client,
req,
shouldReturnSelf,
PipeTransferSliceReqBuilder.nextSliceOrderId(),
0,
sliceCount,
bodySizeLimit);
} catch (final Exception e) {
fallbackToWholeRequest(client, req, shouldReturnSelf, e);
}
}

public abstract void clearEventsReferenceCount();

private void transferSlicedRequest(
final AsyncPipeDataTransferServiceClient client,
final TPipeTransferReq originalReq,
final boolean shouldReturnSelf,
final int sliceOrderId,
final int sliceIndex,
final int sliceCount,
final int bodySizeLimit)
throws Exception {

Check warning on line 176 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4l3u7u6txHQD_wsoi5&open=AZ4l3u7u6txHQD_wsoi5&pullRequest=17668
client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 1);
client.pipeTransfer(
PipeTransferSliceReqBuilder.buildSliceReq(
originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit),
new AsyncMethodCallback<TPipeTransferResp>() {
@Override
public void onComplete(final TPipeTransferResp response) {
if (sink.isClosed() || sliceIndex == sliceCount - 1) {
PipeTransferTrackableHandler.this.onComplete(response);
return;
}

if (response == null) {
fallbackToWholeRequest(
client,
originalReq,
shouldReturnSelf,
new PipeException("TPipeTransferResp is null when transferring slice."));
return;
}

if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
fallbackToWholeRequest(
client,
originalReq,
shouldReturnSelf,
new PipeConnectionException(
String.format(
"Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s",

Check warning on line 205 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 118).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4l3u7u6txHQD_wsoi6&open=AZ4l3u7u6txHQD_wsoi6&pullRequest=17668
originalReq.getVersion(),
originalReq.getType(),
sliceIndex,
sliceCount,
response.getStatus())));
return;
}

try {
transferSlicedRequest(
client,
originalReq,
shouldReturnSelf,
sliceOrderId,
sliceIndex + 1,
sliceCount,
bodySizeLimit);
} catch (final Exception e) {
fallbackToWholeRequest(client, originalReq, shouldReturnSelf, e);
}
}

@Override
public void onError(final Exception exception) {
if (sink.isClosed() || sliceIndex == sliceCount - 1) {
PipeTransferTrackableHandler.this.onError(exception);
return;
}
fallbackToWholeRequest(client, originalReq, shouldReturnSelf, exception);
}
});
}

private void fallbackToWholeRequest(
final AsyncPipeDataTransferServiceClient client,
final TPipeTransferReq originalReq,
final boolean shouldReturnSelf,
final Exception exception) {
LOGGER.warn(
"Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.",
originalReq.getVersion(),
originalReq.getType(),
exception);

try {
client.setShouldReturnSelf(shouldReturnSelf);
client.pipeTransfer(originalReq, this);
} catch (final Exception e) {
PipeTransferTrackableHandler.this.onError(e);
}
}

public void closeClient() {
if (Objects.isNull(client)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
return tsFile;
}

public void transfer(

Check warning on line 140 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 89 to 64, Complexity from 16 to 14, Nesting Level from 3 to 2, Number of Variables from 13 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4l3u286txHQD_wsoi4&open=AZ4l3u286txHQD_wsoi4&pullRequest=17668
final IoTDBDataNodeAsyncClientManager clientManager,
final AsyncPipeDataTransferServiceClient client)
throws TException, IOException {
Expand Down Expand Up @@ -449,7 +449,7 @@
return;
}

client.pipeTransfer(req, this);
transferWithOptionalRequestSlicing(client, req);
}

@Override
Expand Down
Loading
Loading