From 20b870da86a6bd0d9a31d0dcd62704c9422e882c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 16:20:29 +0800 Subject: [PATCH 1/2] sl --- .../PipeTransferTabletBatchEventHandler.java | 2 +- ...eTransferTabletInsertNodeEventHandler.java | 2 +- .../PipeTransferTabletRawEventHandler.java | 2 +- .../handler/PipeTransferTrackableHandler.java | 141 +++++++++++ .../handler/PipeTransferTsFileHandler.java | 2 +- .../PipeTransferTrackableHandlerTest.java | 218 ++++++++++++++++++ .../AsyncPipeDataTransferServiceClient.java | 4 + 7 files changed, 367 insertions(+), 4 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 52c52b1038e9..e6899dee3c57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -129,7 +129,7 @@ protected void onErrorInternal(final Exception exception) { protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java index 912a1e724f74..56d1ce41b029 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java @@ -41,7 +41,7 @@ public PipeTransferTabletInsertNodeEventHandler( protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java index b64e446827af..eb4677de358e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java @@ -40,7 +40,7 @@ public PipeTransferTabletRawEventHandler( protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index a8b4a3b7a79a..51d6135fc49c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -21,7 +21,13 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -31,10 +37,12 @@ import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public abstract class PipeTransferTrackableHandler implements AsyncMethodCallback, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class); + private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0); protected final IoTDBDataRegionAsyncSink sink; protected volatile AsyncPipeDataTransferServiceClient client; @@ -126,8 +134,141 @@ protected abstract void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException; + protected final void transferWithOptionalRequestSlicing( + final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + throws TException { + final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes(); + if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion() + || req.body.limit() < bodySizeLimit) { + client.pipeTransfer(req, this); + return; + } + + LOGGER.warn( + "The body size of the request is too large. The request will be sliced. Origin req: {}-{}. " + + "Request body size: {}, threshold: {}", + req.getVersion(), + req.getType(), + req.body.limit(), + bodySizeLimit); + + final int sliceCount = + req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1); + final boolean shouldReturnSelf = client.shouldReturnSelf(); + try { + transferSlicedRequest( + client, + req, + shouldReturnSelf, + SLICE_ORDER_ID_GENERATOR.getAndIncrement(), + 0, + sliceCount, + bodySizeLimit); + } catch (final Exception e) { + fallbackToWholeRequest(client, req, shouldReturnSelf, e); + } + } + public abstract void clearEventsReferenceCount(); + private void transferSlicedRequest( + final AsyncPipeDataTransferServiceClient client, + final TPipeTransferReq originalReq, + final boolean shouldReturnSelf, + final int sliceOrderId, + final int sliceIndex, + final int sliceCount, + final int bodySizeLimit) + throws Exception { + final int startIndexInBody = sliceIndex * bodySizeLimit; + final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, originalReq.body.limit()); + client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 1); + client.pipeTransfer( + PipeTransferSliceReq.toTPipeTransferReq( + sliceOrderId, + originalReq.getType(), + sliceIndex, + sliceCount, + originalReq.body.duplicate(), + startIndexInBody, + endIndexInBody), + new AsyncMethodCallback() { + @Override + public void onComplete(final TPipeTransferResp response) { + if (sink.isClosed() || sliceIndex == sliceCount - 1) { + PipeTransferTrackableHandler.this.onComplete(response); + return; + } + + if (response == null) { + fallbackToWholeRequest( + client, + originalReq, + shouldReturnSelf, + new PipeException("TPipeTransferResp is null when transferring slice.")); + return; + } + + if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + fallbackToWholeRequest( + client, + originalReq, + shouldReturnSelf, + new PipeConnectionException( + String.format( + "Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s", + originalReq.getVersion(), + originalReq.getType(), + sliceIndex, + sliceCount, + response.getStatus()))); + return; + } + + try { + transferSlicedRequest( + client, + originalReq, + shouldReturnSelf, + sliceOrderId, + sliceIndex + 1, + sliceCount, + bodySizeLimit); + } catch (final Exception e) { + fallbackToWholeRequest(client, originalReq, shouldReturnSelf, e); + } + } + + @Override + public void onError(final Exception exception) { + if (sink.isClosed() || sliceIndex == sliceCount - 1) { + PipeTransferTrackableHandler.this.onError(exception); + return; + } + fallbackToWholeRequest(client, originalReq, shouldReturnSelf, exception); + } + }); + } + + private void fallbackToWholeRequest( + final AsyncPipeDataTransferServiceClient client, + final TPipeTransferReq originalReq, + final boolean shouldReturnSelf, + final Exception exception) { + LOGGER.warn( + "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.", + originalReq.getVersion(), + originalReq.getType(), + exception); + + try { + client.setShouldReturnSelf(shouldReturnSelf); + client.pipeTransfer(originalReq, this); + } catch (final Exception e) { + PipeTransferTrackableHandler.this.onError(e); + } + } + public void closeClient() { if (Objects.isNull(client)) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 35a28d1413a5..b6d3785de7da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -449,7 +449,7 @@ protected void doTransfer( return; } - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java new file mode 100644 index 000000000000..7d62d22a17a2 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class PipeTransferTrackableHandlerTest { + + private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + + private int originalRequestSliceThresholdBytes; + + @Before + public void setUp() { + originalRequestSliceThresholdBytes = commonConfig.getPipeSinkRequestSliceThresholdBytes(); + commonConfig.setPipeSinkRequestSliceThresholdBytes(4); + } + + @After + public void tearDown() { + commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes); + } + + @Test + public void testLargeRequestWillBeSlicedForAsyncTransfer() throws Exception { + final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class); + final AsyncPipeDataTransferServiceClient client = + Mockito.mock(AsyncPipeDataTransferServiceClient.class); + Mockito.when(client.shouldReturnSelf()).thenReturn(true); + + final List transferredRequests = new ArrayList<>(); + Mockito.doAnswer( + invocation -> { + final TPipeTransferReq req = invocation.getArgument(0); + final AsyncMethodCallback callback = invocation.getArgument(1); + transferredRequests.add(req); + callback.onComplete(successResp()); + return null; + }) + .when(client) + .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any()); + + final TestPipeTransferTrackableHandler handler = new TestPipeTransferTrackableHandler(sink); + final TPipeTransferReq originalReq = createReq(10); + + handler.transfer(client, originalReq); + + Assert.assertEquals(3, transferredRequests.size()); + Assert.assertEquals(1, handler.completeCount); + Assert.assertEquals(0, handler.errorCount); + + final PipeTransferSliceReq firstSlice = + PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(0)); + final PipeTransferSliceReq secondSlice = + PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(1)); + final PipeTransferSliceReq thirdSlice = + PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2)); + + Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); + Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId()); + Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId()); + Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType()); + Assert.assertEquals(10, firstSlice.getOriginBodySize()); + Assert.assertEquals(3, firstSlice.getSliceCount()); + Assert.assertEquals(0, firstSlice.getSliceIndex()); + Assert.assertEquals(1, secondSlice.getSliceIndex()); + Assert.assertEquals(2, thirdSlice.getSliceIndex()); + Assert.assertEquals(4, firstSlice.getSliceBody().length); + Assert.assertEquals(4, secondSlice.getSliceBody().length); + Assert.assertEquals(2, thirdSlice.getSliceBody().length); + + final ArgumentCaptor shouldReturnSelfCaptor = ArgumentCaptor.forClass(Boolean.class); + Mockito.verify(client, Mockito.times(3)).setShouldReturnSelf(shouldReturnSelfCaptor.capture()); + Assert.assertEquals(Arrays.asList(false, false, true), shouldReturnSelfCaptor.getAllValues()); + } + + @Test + public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() throws Exception { + final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class); + final AsyncPipeDataTransferServiceClient client = + Mockito.mock(AsyncPipeDataTransferServiceClient.class); + Mockito.when(client.shouldReturnSelf()).thenReturn(true); + + final List transferredRequests = new ArrayList<>(); + Mockito.doAnswer( + invocation -> { + final TPipeTransferReq req = invocation.getArgument(0); + final AsyncMethodCallback callback = invocation.getArgument(1); + transferredRequests.add(req); + if (req.getType() == PipeRequestType.TRANSFER_SLICE.getType()) { + callback.onComplete(failedResp()); + } else { + callback.onComplete(successResp()); + } + return null; + }) + .when(client) + .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any()); + + final TestPipeTransferTrackableHandler handler = new TestPipeTransferTrackableHandler(sink); + final TPipeTransferReq originalReq = createReq(10); + + handler.transfer(client, originalReq); + + Assert.assertEquals(2, transferredRequests.size()); + Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); + Assert.assertEquals(originalReq.getType(), transferredRequests.get(1).getType()); + Assert.assertEquals(originalReq.getVersion(), transferredRequests.get(1).getVersion()); + Assert.assertArrayEquals(originalReq.getBody(), transferredRequests.get(1).getBody()); + Assert.assertEquals(1, handler.completeCount); + Assert.assertEquals(0, handler.errorCount); + } + + private static TPipeTransferReq createReq(final int bodySize) { + final byte[] body = new byte[bodySize]; + for (int i = 0; i < body.length; ++i) { + body[i] = (byte) i; + } + + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = (short) 123; + req.body = ByteBuffer.wrap(body); + return req; + } + + private static TPipeTransferResp successResp() { + final TPipeTransferResp resp = new TPipeTransferResp(); + resp.setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + return resp; + } + + private static TPipeTransferResp failedResp() { + final TPipeTransferResp resp = new TPipeTransferResp(); + resp.setStatus( + new TSStatus().setCode(TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER.getStatusCode())); + return resp; + } + + private static class TestPipeTransferTrackableHandler extends PipeTransferTrackableHandler { + + private int completeCount; + private int errorCount; + + private TestPipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) { + super(sink); + } + + private void transfer(final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + throws TException { + tryTransfer(client, req); + } + + @Override + protected boolean onCompleteInternal(final TPipeTransferResp response) { + completeCount++; + return true; + } + + @Override + protected void onErrorInternal(final Exception exception) { + errorCount++; + } + + @Override + protected void doTransfer( + final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + throws TException { + transferWithOptionalRequestSlicing(client, req); + } + + @Override + public void clearEventsReferenceCount() { + // Do nothing + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 36295ec8500f..b7edc0c1088f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -135,6 +135,10 @@ public void setShouldReturnSelf(final boolean shouldReturnSelf) { this.shouldReturnSelf.set(shouldReturnSelf); } + public boolean shouldReturnSelf() { + return shouldReturnSelf.get(); + } + public void setTimeoutDynamically(final int timeout) { try { ((TNonblockingSocket) ___transport).setTimeout(timeout); From 97a405d1df7421437a6e2d98070f5dad1a3fdcbf Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 17:01:04 +0800 Subject: [PATCH 2/2] chew --- .../handler/PipeTransferTrackableHandler.java | 28 ++--- .../PipeTransferTrackableHandlerTest.java | 9 +- .../pipe/sink/client/IoTDBSyncClient.java | 31 ++--- .../common/PipeTransferSliceReqBuilder.java | 73 ++++++++++++ .../PipeTransferSliceReqBuilderTest.java | 106 ++++++++++++++++++ 5 files changed, 199 insertions(+), 48 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index 51d6135fc49c..a0e6ad73fe7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -21,9 +21,7 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; -import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; -import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -37,12 +35,10 @@ import org.slf4j.LoggerFactory; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; public abstract class PipeTransferTrackableHandler implements AsyncMethodCallback, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class); - private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0); protected final IoTDBDataRegionAsyncSink sink; protected volatile AsyncPipeDataTransferServiceClient client; @@ -137,9 +133,8 @@ protected abstract void doTransfer( protected final void transferWithOptionalRequestSlicing( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes(); - if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion() - || req.body.limit() < bodySizeLimit) { + final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit(); + if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) { client.pipeTransfer(req, this); return; } @@ -152,15 +147,14 @@ protected final void transferWithOptionalRequestSlicing( req.body.limit(), bodySizeLimit); - final int sliceCount = - req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1); + final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit); final boolean shouldReturnSelf = client.shouldReturnSelf(); try { transferSlicedRequest( client, req, shouldReturnSelf, - SLICE_ORDER_ID_GENERATOR.getAndIncrement(), + PipeTransferSliceReqBuilder.nextSliceOrderId(), 0, sliceCount, bodySizeLimit); @@ -180,18 +174,10 @@ private void transferSlicedRequest( final int sliceCount, final int bodySizeLimit) throws Exception { - final int startIndexInBody = sliceIndex * bodySizeLimit; - final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, originalReq.body.limit()); client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 1); client.pipeTransfer( - PipeTransferSliceReq.toTPipeTransferReq( - sliceOrderId, - originalReq.getType(), - sliceIndex, - sliceCount, - originalReq.body.duplicate(), - startIndexInBody, - endIndexInBody), + PipeTransferSliceReqBuilder.buildSliceReq( + originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit), new AsyncMethodCallback() { @Override public void onComplete(final TPipeTransferResp response) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java index 7d62d22a17a2..60b692350854 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java @@ -97,7 +97,8 @@ public void testLargeRequestWillBeSlicedForAsyncTransfer() throws Exception { final PipeTransferSliceReq thirdSlice = PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2)); - Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); + Assert.assertEquals( + PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId()); Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId()); Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType()); @@ -144,7 +145,8 @@ public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() thro handler.transfer(client, originalReq); Assert.assertEquals(2, transferredRequests.size()); - Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); + Assert.assertEquals( + PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); Assert.assertEquals(originalReq.getType(), transferredRequests.get(1).getType()); Assert.assertEquals(originalReq.getVersion(), transferredRequests.get(1).getVersion()); Assert.assertArrayEquals(originalReq.getBody(), transferredRequests.get(1).getBody()); @@ -187,7 +189,8 @@ private TestPipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) { super(sink); } - private void transfer(final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + private void transfer( + final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { tryTransfer(client, req); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java index b7f42295e6cc..1ad5d0a855f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java @@ -22,9 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; -import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; -import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; import org.apache.iotdb.rpc.TSStatusCode; @@ -39,15 +37,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicInteger; - public class IoTDBSyncClient extends IClientRPCService.Client implements ThriftClient, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncClient.class); - private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0); - private final String ipAddress; private final int port; private final TEndPoint endPoint; @@ -100,9 +94,8 @@ public void setTimeout(int timeout) { @Override public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException { - final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes(); - if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion() - || req.body.limit() < bodySizeLimit) { + final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit(); + if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) { return super.pipeTransfer(req); } @@ -115,23 +108,13 @@ public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TExcept bodySizeLimit); try { - final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement(); - // Slice the buffer to avoid the buffer being too large - final int sliceCount = - req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1); + final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId(); + final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit); for (int i = 0; i < sliceCount; ++i) { - final int startIndexInBody = i * bodySizeLimit; - final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, req.body.limit()); final TPipeTransferResp sliceResp = super.pipeTransfer( - PipeTransferSliceReq.toTPipeTransferReq( - sliceOrderId, - req.getType(), - i, - sliceCount, - req.body.duplicate(), - startIndexInBody, - endIndexInBody)); + PipeTransferSliceReqBuilder.buildSliceReq( + req, sliceOrderId, i, sliceCount, bodySizeLimit)); if (i == sliceCount - 1) { return sliceResp; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java new file mode 100644 index 000000000000..b108d6f1d3ab --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.common; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public final class PipeTransferSliceReqBuilder { + + private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0); + + private PipeTransferSliceReqBuilder() { + // Utility class + } + + public static int getBodySizeLimit() { + return PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes(); + } + + public static boolean shouldSlice(final TPipeTransferReq req, final int bodySizeLimit) { + return req.getVersion() == IoTDBSinkRequestVersion.VERSION_1.getVersion() + && req.body.limit() >= bodySizeLimit; + } + + public static int nextSliceOrderId() { + return SLICE_ORDER_ID_GENERATOR.getAndIncrement(); + } + + public static int getSliceCount(final TPipeTransferReq req, final int bodySizeLimit) { + return req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1); + } + + public static PipeTransferSliceReq buildSliceReq( + final TPipeTransferReq originalReq, + final int sliceOrderId, + final int sliceIndex, + final int sliceCount, + final int bodySizeLimit) + throws IOException { + final int startIndexInBody = sliceIndex * bodySizeLimit; + final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, originalReq.body.limit()); + return PipeTransferSliceReq.toTPipeTransferReq( + sliceOrderId, + originalReq.getType(), + sliceIndex, + sliceCount, + originalReq.body.duplicate(), + startIndexInBody, + endIndexInBody); + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java new file mode 100644 index 000000000000..290ce3979807 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.common; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class PipeTransferSliceReqBuilderTest { + + private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + + private int originalRequestSliceThresholdBytes; + + @Before + public void setUp() { + originalRequestSliceThresholdBytes = commonConfig.getPipeSinkRequestSliceThresholdBytes(); + commonConfig.setPipeSinkRequestSliceThresholdBytes(4); + } + + @After + public void tearDown() { + commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes); + } + + @Test + public void testBuildSliceReq() throws Exception { + final TPipeTransferReq req = createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 10); + final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit(); + + Assert.assertTrue(PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)); + Assert.assertEquals(3, PipeTransferSliceReqBuilder.getSliceCount(req, bodySizeLimit)); + + final PipeTransferSliceReq firstSlice = + PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 0, 3, bodySizeLimit); + final PipeTransferSliceReq secondSlice = + PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 1, 3, bodySizeLimit); + final PipeTransferSliceReq thirdSlice = + PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 2, 3, bodySizeLimit); + + Assert.assertArrayEquals(new byte[] {0, 1, 2, 3}, firstSlice.getSliceBody()); + Assert.assertArrayEquals(new byte[] {4, 5, 6, 7}, secondSlice.getSliceBody()); + Assert.assertArrayEquals(new byte[] {8, 9}, thirdSlice.getSliceBody()); + Assert.assertEquals(0, firstSlice.getSliceIndex()); + Assert.assertEquals(1, secondSlice.getSliceIndex()); + Assert.assertEquals(2, thirdSlice.getSliceIndex()); + Assert.assertEquals(3, firstSlice.getSliceCount()); + Assert.assertEquals(req.getType(), firstSlice.getOriginReqType()); + Assert.assertEquals(10, firstSlice.getOriginBodySize()); + } + + @Test + public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() { + final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit(); + + Assert.assertFalse( + PipeTransferSliceReqBuilder.shouldSlice( + createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 3), bodySizeLimit)); + Assert.assertFalse( + PipeTransferSliceReqBuilder.shouldSlice( + createReq((byte) (IoTDBSinkRequestVersion.VERSION_1.getVersion() + 1), 10), + bodySizeLimit)); + Assert.assertTrue( + PipeTransferSliceReqBuilder.shouldSlice( + createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4), bodySizeLimit)); + } + + private static TPipeTransferReq createReq(final byte version, final int bodySize) { + final byte[] body = new byte[bodySize]; + for (int i = 0; i < body.length; ++i) { + body[i] = (byte) i; + } + + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = version; + req.type = (short) 123; + req.body = ByteBuffer.wrap(body); + return req; + } +}