diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index a11a1a68f0ce..205ce4c16bdb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionSinkMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.metrics.type.Histogram; @@ -299,6 +300,9 @@ public int getBatchSize() { if (outputPipeSink instanceof IoTDBDataRegionSyncSink) { return ((IoTDBDataRegionSyncSink) outputPipeSink).getBatchSize(); } + if (outputPipeSink instanceof IoTDBDataRegionAirGapSink) { + return ((IoTDBDataRegionAirGapSink) outputPipeSink).getBatchSize(); + } return 0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 7f904324bbbc..2207a1bf284a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; +import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -30,6 +31,10 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2; @@ -38,6 +43,7 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -48,13 +54,26 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.external.commons.io.FileUtils; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.NoSuchFileException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Objects; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; @@ -67,6 +86,7 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAirGapSink.class); + private PipeTransferBatchReqBuilder tabletBatchBuilder; private boolean enableSendTsFileLimit; @Override @@ -75,6 +95,10 @@ public void customize( throws Exception { super.customize(parameters, configuration); + if (isTabletBatchModeEnabled) { + tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); + } + enableSendTsFileLimit = parameters.getBooleanOrDefault( Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), @@ -101,7 +125,10 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc // When receiver encountered packet loss, the transfer will time out // We need to restore the transfer quickly by retry under this circumstance socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs()); - if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { + if (isTabletBatchModeEnabled) { + tabletBatchBuilder.onEvent(tabletInsertionEvent); + doTransferWrapper(socket); + } else if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); } else { doTransferWrapper(socket, (PipeRawTabletInsertionEvent) tabletInsertionEvent); @@ -140,6 +167,9 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc final AirGapSocket socket = sockets.get(socketIndex); try { + if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) { + doTransferWrapper(socket); + } doTransferWrapper(socket, (PipeTsFileInsertionEvent) tsFileInsertionEvent); } catch (final IOException e) { isSocketAlive.set(socketIndex, false); @@ -155,13 +185,33 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc @Override public void transfer(final Event event) throws Exception { + if (event instanceof PipeDeleteDataNodeEvent) { + final int socketIndex = nextSocketIndex(); + final AirGapSocket socket = sockets.get(socketIndex); + + try { + doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event); + } catch (final IOException e) { + isSocketAlive.set(socketIndex, false); + + throw new PipeConnectionException( + String.format( + "Network error when transfer tsfile event %s, because %s.", + ((EnrichedEvent) event).coreReportMessage(), e.getMessage()), + e); + } + return; + } + final int socketIndex = nextSocketIndex(); final AirGapSocket socket = sockets.get(socketIndex); try { - if (event instanceof PipeDeleteDataNodeEvent) { - doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event); - } else if (!(event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent)) { + if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) { + doTransferWrapper(socket); + } + + if (!(event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent)) { LOGGER.warn( "IoTDBDataRegionAirGapConnector does not support transferring generic event: {}.", event); @@ -177,6 +227,64 @@ public void transfer(final Event event) throws Exception { } } + private void doTransferWrapper(final AirGapSocket socket) + throws IOException, WriteProcessException { + for (final Pair nonEmptyAndShouldEmitBatch : + tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) { + final PipeTabletEventBatch batch = nonEmptyAndShouldEmitBatch.getRight(); + if (batch instanceof PipeTabletEventPlainBatch) { + doTransfer(socket, (PipeTabletEventPlainBatch) batch); + } else if (batch instanceof PipeTabletEventTsFileBatch) { + doTransfer(socket, (PipeTabletEventTsFileBatch) batch); + } else { + LOGGER.warn("Unsupported batch type {}.", batch.getClass()); + } + batch.decreaseEventsReferenceCount(IoTDBDataRegionAirGapSink.class.getName(), true); + batch.onSuccess(); + } + } + + private void doTransfer( + final AirGapSocket socket, final PipeTabletEventPlainBatch batchToTransfer) + throws IOException { + if (!sendBatch( + socket, + toTPipeTransferBytes(batchToTransfer.toTPipeTransferReq()), + batchToTransfer.getPipe2BytesAccumulated())) { + final String errorMessage = + String.format("Transfer PipeTransferTabletBatchReq error. Socket: %s.", socket); + receiverStatusHandler.handle( + new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(errorMessage), + errorMessage, + batchToTransfer.deepCopyEvents().toString()); + } + } + + private void doTransfer( + final AirGapSocket socket, final PipeTabletEventTsFileBatch batchToTransfer) + throws IOException, WriteProcessException { + final List> dbTsFilePairs = batchToTransfer.sealTsFiles(); + final Map, Double> pipe2WeightMap = batchToTransfer.deepCopyPipe2WeightMap(); + + for (final Pair dbTsFile : dbTsFilePairs) { + doTransfer( + pipe2WeightMap, socket, dbTsFile.right, null, dbTsFile.left, dbTsFile.right.getName()); + try { + RetryUtils.retryOnException( + () -> { + FileUtils.delete(dbTsFile.right); + return null; + }); + } catch (final NoSuchFileException e) { + LOGGER.info("The file {} is not found, may already be deleted.", dbTsFile); + } catch (final Exception e) { + LOGGER.warn( + "Failed to delete batch file {}, this file should be deleted manually later", dbTsFile); + } + } + } + private void doTransferWrapper( final AirGapSocket socket, final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent) throws PipeException, IOException { @@ -319,61 +427,157 @@ private void doTransferWrapper( private void doTransfer( final AirGapSocket socket, final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { - final String pipeName = pipeTsFileInsertionEvent.getPipeName(); - final long creationTime = pipeTsFileInsertionEvent.getCreationTime(); - final File tsFile = pipeTsFileInsertionEvent.getTsFile(); + doTransfer( + Collections.singletonMap( + new Pair<>( + pipeTsFileInsertionEvent.getPipeName(), pipeTsFileInsertionEvent.getCreationTime()), + 1.0), + socket, + pipeTsFileInsertionEvent.getTsFile(), + pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver + ? pipeTsFileInsertionEvent.getModFile() + : null, + pipeTsFileInsertionEvent.isTableModelEvent() + ? pipeTsFileInsertionEvent.getTableModelDatabaseName() + : null, + pipeTsFileInsertionEvent.toString()); + } + + private void doTransfer( + final Map, Double> pipe2WeightMap, + final AirGapSocket socket, + final File tsFile, + final File modFile, + final String dataBaseName, + final String receiverStatusContext) + throws PipeException, IOException { final String errorMessage = String.format("Seal file %s error. Socket %s.", tsFile, socket); - // 1. Transfer file piece by piece, and mod if needed - if (pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver) { - final File modFile = pipeTsFileInsertionEvent.getModFile(); - transferFilePieces(pipeName, creationTime, modFile, socket, true); - transferFilePieces(pipeName, creationTime, tsFile, socket, true); - // 2. Transfer file seal signal with mod, which means the file is transferred completely - if (!send( - pipeName, - creationTime, + if (Objects.nonNull(modFile)) { + transferFilePieces(pipe2WeightMap, modFile, socket, true); + transferFilePieces(pipe2WeightMap, tsFile, socket, true); + if (!sendWeighted( socket, PipeTransferTsFileSealWithModReq.toTPipeTransferBytes( - modFile.getName(), - modFile.length(), - tsFile.getName(), - tsFile.length(), - pipeTsFileInsertionEvent.isTableModelEvent() - ? pipeTsFileInsertionEvent.getTableModelDatabaseName() - : null))) { + modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length(), dataBaseName), + pipe2WeightMap)) { receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeTsFileInsertionEvent.toString()); + receiverStatusContext); } else { LOGGER.info("Successfully transferred file {}.", tsFile); } } else { - transferFilePieces(pipeName, creationTime, tsFile, socket, false); - // 2. Transfer file seal signal without mod, which means the file is transferred completely - if (!send( - pipeName, - creationTime, + transferFilePieces(pipe2WeightMap, tsFile, socket, false); + if (!sendWeighted( socket, PipeTransferTsFileSealWithModReq.toTPipeTransferBytes( - tsFile.getName(), - tsFile.length(), - pipeTsFileInsertionEvent.isTableModelEvent() - ? pipeTsFileInsertionEvent.getTableModelDatabaseName() - : null))) { + tsFile.getName(), tsFile.length(), dataBaseName), + pipe2WeightMap)) { receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeTsFileInsertionEvent.toString()); + receiverStatusContext); } else { LOGGER.info("Successfully transferred file {}.", tsFile); } } } + private void transferFilePieces( + final Map, Double> pipe2WeightMap, + final File file, + final AirGapSocket socket, + final boolean isMultiFile) + throws PipeException, IOException { + final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize(); + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; + try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + while (true) { + mayLimitRateAndRecordIO(readFileBufferSize); + final int readLength = reader.read(readBuffer); + if (readLength == -1) { + break; + } + + final byte[] payload = + readLength == readFileBufferSize + ? readBuffer + : Arrays.copyOfRange(readBuffer, 0, readLength); + if (!sendWeighted( + socket, + isMultiFile + ? getTransferMultiFilePieceBytes(file.getName(), position, payload) + : getTransferSingleFilePieceBytes(file.getName(), position, payload), + pipe2WeightMap)) { + final String errorMessage = + String.format("Transfer file %s error. Socket %s.", file, socket); + receiverStatusHandler.handle( + new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(errorMessage), + errorMessage, + file.toString()); + } else { + position += readLength; + } + } + } + } + + private boolean sendBatch( + final AirGapSocket socket, + byte[] bytes, + final Map, Long> pipe2BytesAccumulated) + throws IOException { + final long uncompressedSize = bytes.length; + bytes = compressIfNeeded(bytes); + + final double compressionRatio = + uncompressedSize == 0 ? 1 : (double) bytes.length / uncompressedSize; + for (final Map.Entry, Long> entry : pipe2BytesAccumulated.entrySet()) { + rateLimitIfNeeded( + entry.getKey().getLeft(), + entry.getKey().getRight(), + socket.getEndPoint(), + (long) (entry.getValue() * compressionRatio)); + } + return sendBytes(socket, bytes); + } + + private boolean sendWeighted( + final AirGapSocket socket, byte[] bytes, final Map, Double> pipe2WeightMap) + throws IOException { + bytes = compressIfNeeded(bytes); + + for (final Map.Entry, Double> entry : pipe2WeightMap.entrySet()) { + rateLimitIfNeeded( + entry.getKey().getLeft(), + entry.getKey().getRight(), + socket.getEndPoint(), + (long) (bytes.length * entry.getValue())); + } + return sendBytes(socket, bytes); + } + + private byte[] toTPipeTransferBytes(final TPipeTransferReq req) throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(req.version, outputStream); + ReadWriteIOUtils.write(req.type, outputStream); + + final ByteBuffer bodyBuffer = req.body.duplicate(); + final byte[] body = new byte[bodyBuffer.remaining()]; + bodyBuffer.get(body); + outputStream.write(body); + + return byteArrayOutputStream.toByteArray(); + } + } + @Override protected void mayLimitRateAndRecordIO(final long requiredBytes) { PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); @@ -402,4 +606,60 @@ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException { } return super.compressIfNeeded(reqInBytes); } + + @Override + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (Objects.nonNull(tabletBatchBuilder)) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + } + } + + public int getBatchSize() { + return Objects.nonNull(tabletBatchBuilder) ? tabletBatchBuilder.size() : 0; + } + + @Override + public void close() { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.close(); + } + + super.close(); + } + + @Override + public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); + } + } + + @Override + public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java new file mode 100644 index 000000000000..d49ef82ce055 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSinkTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.airgap; + +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IoTDBDataRegionAirGapSinkTest { + + @Test + public void testTransferTabletBatchOverAirGap() throws Exception { + try (final RecordingIoTDBDataRegionAirGapSink sink = new RecordingIoTDBDataRegionAirGapSink()) { + final PipeParameters parameters = buildParameters(false); + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + sink.prepareSocket(); + + sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 1L)); + sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 2L)); + + Thread.sleep(300L); + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + Assert.assertEquals(1, sink.sentRequests.size()); + + final TPipeTransferReq req = toTPipeTransferReq(sink.sentRequests.get(0)); + Assert.assertEquals(PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType(), req.type); + + final PipeTransferTabletBatchReqV2 batchReq = + PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req); + Assert.assertEquals(2, batchReq.getTabletReqs().size()); + } + } + + @Test + public void testTransferTsFileBatchOverAirGap() throws Exception { + try (final RecordingIoTDBDataRegionAirGapSink sink = new RecordingIoTDBDataRegionAirGapSink()) { + final PipeParameters parameters = buildParameters(true); + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + sink.prepareSocket(); + + sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 1L)); + sink.transfer(createPipeRawTabletInsertionEvent("pipe", 1L, 2L)); + + Thread.sleep(300L); + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + final List requestTypes = new ArrayList<>(); + for (final byte[] requestBytes : sink.sentRequests) { + requestTypes.add(toTPipeTransferReq(requestBytes).type); + } + + Assert.assertTrue(requestTypes.contains(PipeRequestType.TRANSFER_TS_FILE_PIECE.getType())); + Assert.assertTrue( + requestTypes.contains(PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD.getType())); + Assert.assertFalse(requestTypes.contains(PipeRequestType.TRANSFER_TABLET_RAW_V2.getType())); + Assert.assertFalse(requestTypes.contains(PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType())); + } + } + + private PipeParameters buildParameters(final boolean useTsFileBatch) { + final Map attributes = new HashMap<>(); + attributes.put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName()); + attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "200"); + attributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576"); + if (useTsFileBatch) { + attributes.put(PipeSinkConstant.CONNECTOR_FORMAT_KEY, "tsfile"); + } + return new PipeParameters(attributes); + } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime, final long value) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d1", schemaList, 1); + tablet.addTimestamp(0, value); + tablet.addValue("s1", 0, value); + return new PipeRawTabletInsertionEvent( + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); + } + + private static TPipeTransferReq toTPipeTransferReq(final byte[] requestBytes) { + final ByteBuffer buffer = ByteBuffer.wrap(requestBytes); + + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = ReadWriteIOUtils.readByte(buffer); + req.type = ReadWriteIOUtils.readShort(buffer); + req.body = buffer.slice(); + return req; + } + + private static class RecordingIoTDBDataRegionAirGapSink extends IoTDBDataRegionAirGapSink { + + private final List sentRequests = new ArrayList<>(); + + private void prepareSocket() { + sockets.set(0, new TestingAirGapSocket()); + } + + @Override + protected int nextSocketIndex() { + return 0; + } + + @Override + protected boolean sendBytes(final AirGapSocket socket, final byte[] bytes) { + sentRequests.add(Arrays.copyOf(bytes, bytes.length)); + return true; + } + + private static class TestingAirGapSocket extends AirGapSocket { + + private TestingAirGapSocket() { + super("127.0.0.1", 6668); + } + + @Override + public synchronized void setSoTimeout(final int timeout) { + // No-op for unit test. + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index 7d84e3bb98fc..01b15d804320 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -109,13 +109,6 @@ public void customize( throws Exception { super.customize(parameters, configuration); - if (isTabletBatchModeEnabled) { - LOGGER.warn( - "Batch mode is enabled by the given parameters. " - + "IoTDBAirGapConnector does not support batch mode. " - + "Disable batch mode."); - } - for (int i = 0; i < nodeUrls.size(); i++) { isSocketAlive.add(false); sockets.add(null); @@ -327,15 +320,17 @@ protected int nextSocketIndex() { protected boolean send( final String pipeName, final long creationTime, final AirGapSocket socket, byte[] bytes) throws IOException { + bytes = compressIfNeeded(bytes); + rateLimitIfNeeded(pipeName, creationTime, socket.getEndPoint(), bytes.length); + return sendBytes(socket, bytes); + } + + protected boolean sendBytes(final AirGapSocket socket, byte[] bytes) throws IOException { if (!socket.isConnected()) { throw new SocketException( String.format("Socket %s is closed, will try to handshake", socket)); } - bytes = compressIfNeeded(bytes); - - rateLimitIfNeeded(pipeName, creationTime, socket.getEndPoint(), bytes.length); - final BufferedOutputStream outputStream = new BufferedOutputStream(socket.getOutputStream()); bytes = enrichWithLengthAndChecksum(bytes); outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);