diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java index 48a1abc83f38e..0ff49d5ecf274 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -424,11 +424,15 @@ public static TableAccumulator createBuiltinMultiInputAccumulator( TAggregationType aggregationType, List inputDataTypes) { switch (aggregationType) { case MAX_BY: - checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); - // return new MaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + { + checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); + // return new MaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + } case MIN_BY: - checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); - // return new MinByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + { + checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size."); + // return new MinByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); + } default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedMaxMinByBaseAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedMaxMinByBaseAccumulator.java index 0f59a575f45c6..7d1220ae6ddca 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedMaxMinByBaseAccumulator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedMaxMinByBaseAccumulator.java @@ -645,6 +645,7 @@ private void updateX(int groupId, Column xColumn, int xIndex) { break; case BOOLEAN: xBooleanValues.set(groupId, xColumn.getBoolean(xIndex)); + break; default: throw new UnSupportedDataTypeException( String.format("Unsupported data type in MAX_BY/MIN_BY Aggregation: %s", xDataType)); diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/unary/scalar/util/SpookyHashV2Utils.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/unary/scalar/util/SpookyHashV2Utils.java index d726f4deff6a7..c1c0f42c64123 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/unary/scalar/util/SpookyHashV2Utils.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/unary/scalar/util/SpookyHashV2Utils.java @@ -200,36 +200,47 @@ private static long shortHash64(byte[] data, int offset, int length, long seed) switch (remainder) { case 15: h3 += (data[current + 14] & 0xFFL) << 48; + // fall through case 14: h3 += (data[current + 13] & 0xFFL) << 40; + // fall through case 13: h3 += (data[current + 12] & 0xFFL) << 32; + // fall through case 12: h3 += getUnsignedIntFromBytesWithLittleEndian(data, current + 8); h2 += getLongFromBytesWithLittleEndian(data, current); break; case 11: h3 += (data[current + 10] & 0xFFL) << 16; + // fall through case 10: h3 += (data[current + 9] & 0xFFL) << 8; + // fall through case 9: h3 += (data[current + 8] & 0xFFL); + // fall through case 8: h2 += getLongFromBytesWithLittleEndian(data, current); break; case 7: h2 += (data[current + 6] & 0xFFL) << 48; + // fall through case 6: h2 += (data[current + 5] & 0xFFL) << 40; + // fall through case 5: h2 += (data[current + 4] & 0xFFL) << 32; + // fall through case 4: h2 += getUnsignedIntFromBytesWithLittleEndian(data, current); break; case 3: h2 += (data[current + 2] & 0xFFL) << 16; + // fall through case 2: h2 += (data[current + 1] & 0xFFL) << 8; + // fall through case 1: h2 += (data[current] & 0xFFL); break; @@ -389,26 +400,37 @@ private static long longHash64(byte[] data, int offset, int length, long seed) { switch (sequences) { case 11: h10 += getLongFromBytesWithLittleEndian(data, current + 10 * SIZE_OF_LONG); + // fall through case 10: h9 += getLongFromBytesWithLittleEndian(data, current + 9 * SIZE_OF_LONG); + // fall through case 9: h8 += getLongFromBytesWithLittleEndian(data, current + 8 * SIZE_OF_LONG); + // fall through case 8: h7 += getLongFromBytesWithLittleEndian(data, current + 7 * SIZE_OF_LONG); + // fall through case 7: h6 += getLongFromBytesWithLittleEndian(data, current + 6 * SIZE_OF_LONG); + // fall through case 6: h5 += getLongFromBytesWithLittleEndian(data, current + 5 * SIZE_OF_LONG); + // fall through case 5: h4 += getLongFromBytesWithLittleEndian(data, current + 4 * SIZE_OF_LONG); + // fall through case 4: h3 += getLongFromBytesWithLittleEndian(data, current + 3 * SIZE_OF_LONG); + // fall through case 3: h2 += getLongFromBytesWithLittleEndian(data, current + 2 * SIZE_OF_LONG); + // fall through case 2: h1 += getLongFromBytesWithLittleEndian(data, current + SIZE_OF_LONG); + // fall through case 1: h0 += getLongFromBytesWithLittleEndian(data, current); + // fall through case 0: break; default: @@ -422,18 +444,25 @@ private static long longHash64(byte[] data, int offset, int length, long seed) { switch (limit - current) { case 7: last |= (data[current + 6] & 0xFFL) << 48; + // fall through case 6: last |= (data[current + 5] & 0xFFL) << 40; + // fall through case 5: last |= (data[current + 4] & 0xFFL) << 32; + // fall through case 4: last |= (data[current + 3] & 0xFFL) << 24; + // fall through case 3: last |= (data[current + 2] & 0xFFL) << 16; + // fall through case 2: last |= (data[current + 1] & 0xFFL) << 8; + // fall through case 1: last |= (data[current] & 0xFFL); + // fall through case 0: break; default: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index b2bae24de38b7..93c27fde51095 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -418,10 +418,12 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) return clusterSchemaInfo.adjustMaxRegionGroupCount( (AdjustMaxRegionGroupNumPlan) physicalPlan); case DeleteDatabase: - try { - return clusterSchemaInfo.deleteDatabase((DeleteDatabasePlan) physicalPlan); - } finally { - partitionInfo.deleteDatabase((DeleteDatabasePlan) physicalPlan); + { + try { + return clusterSchemaInfo.deleteDatabase((DeleteDatabasePlan) physicalPlan); + } finally { + partitionInfo.deleteDatabase((DeleteDatabasePlan) physicalPlan); + } } case PreDeleteDatabase: return partitionInfo.preDeleteDatabase((PreDeleteDatabasePlan) physicalPlan); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java index a531d67955cec..9190fa60e6ccb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java @@ -116,6 +116,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState removedDataNodes); return Flow.NO_MORE_STATE; } + break; case REMOVE_DATA_NODE_PREPARE: Map removedNodeStatusMap = new HashMap<>(); removedDataNodes.forEach( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index 9d11d51d31a3f..51c09a8a750b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -101,7 +101,6 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator columnTypes; private List measurementList; private List dataTypeList; - private List fieldSchemaList; private int deviceIdSize; private List modsInfoList; @@ -163,75 +162,103 @@ public boolean hasNext() { while (true) { switch (state) { case CHECK_DATA: - if (batchData != null && batchData.hasCurrent()) { - return true; + { + if (batchData != null && batchData.hasCurrent()) { + return true; + } } case INIT_DATA: - if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) { - batchData = chunkReader.nextPageData(); - final long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData); - if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForBatchData, size); + { + if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) { + batchData = chunkReader.nextPageData(); + final long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData); + if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForBatchData, size); + } + state = State.CHECK_DATA; + break; } - state = State.CHECK_DATA; - break; } case INIT_CHUNK_READER: - if (currentChunkMetadata != null - || (chunkMetadataList != null && chunkMetadataList.hasNext())) { - if (currentChunkMetadata == null) { - currentChunkMetadata = chunkMetadataList.next(); - timeChunk = null; - offset = 0; + { + if (currentChunkMetadata != null + || (chunkMetadataList != null && chunkMetadataList.hasNext())) { + if (currentChunkMetadata == null) { + currentChunkMetadata = chunkMetadataList.next(); + timeChunk = null; + offset = 0; + } + initChunkReader(currentChunkMetadata); + state = State.INIT_DATA; + break; } - initChunkReader(currentChunkMetadata); - state = State.INIT_DATA; - break; } case INIT_CHUNK_METADATA: - if (deviceMetaIterator != null && deviceMetaIterator.hasNext()) { - final Pair pair = deviceMetaIterator.next(); - - long size = 0; - List iChunkMetadataList = - reader.getAlignedChunkMetadata(pair.left, false); - - Iterator chunkMetadataIterator = - iChunkMetadataList.iterator(); - while (chunkMetadataIterator.hasNext()) { - final AbstractAlignedChunkMetadata alignedChunkMetadata = - chunkMetadataIterator.next(); - if (alignedChunkMetadata == null) { - throw new PipeException( - "Table model tsfile parsing does not support this type of ChunkMeta"); - } + { + if (deviceMetaIterator != null && deviceMetaIterator.hasNext()) { + final Pair pair = deviceMetaIterator.next(); + + long size = 0; + List iChunkMetadataList = + reader.getAlignedChunkMetadata(pair.left, true); + + Iterator chunkMetadataIterator = + iChunkMetadataList.iterator(); + while (chunkMetadataIterator.hasNext()) { + final AbstractAlignedChunkMetadata alignedChunkMetadata = + chunkMetadataIterator.next(); + if (alignedChunkMetadata == null) { + throw new PipeException( + "Table model tsfile parsing does not support this type of ChunkMeta"); + } - // Reduce the number of times Chunks are read - if (alignedChunkMetadata.getEndTime() < startTime - || alignedChunkMetadata.getStartTime() > endTime) { - chunkMetadataIterator.remove(); - continue; - } + // Reduce the number of times Chunks are read + if (alignedChunkMetadata.getEndTime() < startTime + || alignedChunkMetadata.getStartTime() > endTime) { + chunkMetadataIterator.remove(); + continue; + } - if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) { - chunkMetadataIterator.remove(); - continue; - } + Iterator iChunkMetadataIterator = + alignedChunkMetadata.getValueChunkMetadataList().iterator(); + while (iChunkMetadataIterator.hasNext()) { + IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next(); + if (iChunkMetadata == null) { + iChunkMetadataIterator.remove(); + continue; + } + + if (!modifications.isEmpty() + && ModsOperationUtil.isAllDeletedByMods( + pair.getLeft(), + iChunkMetadata.getMeasurementUid(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime(), + modifications)) { + iChunkMetadataIterator.remove(); + } + } - size += - PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata); - if (allocatedMemoryBlockForChunkMeta.getMemoryUsageInBytes() < size) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunkMeta, size); + if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) { + chunkMetadataIterator.remove(); + continue; + } + + size += + PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata); + if (allocatedMemoryBlockForChunkMeta.getMemoryUsageInBytes() < size) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForChunkMeta, size); + } } - } - deviceID = pair.getLeft(); - chunkMetadataList = iChunkMetadataList.iterator(); + deviceID = pair.getLeft(); + chunkMetadataList = iChunkMetadataList.iterator(); - state = State.INIT_CHUNK_READER; - break; + state = State.INIT_CHUNK_READER; + break; + } } case INIT_DEVICE_META: if (filteredTableSchemaIterator != null && filteredTableSchemaIterator.hasNext()) { @@ -248,7 +275,6 @@ public boolean hasNext() { dataTypeList = new ArrayList<>(); columnTypes = new ArrayList<>(); measurementList = new ArrayList<>(); - fieldSchemaList = new ArrayList<>(); for (int i = 0; i < columnSchemaSize; i++) { final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i); @@ -262,9 +288,6 @@ public boolean hasNext() { measurementList.add(measurementName); dataTypeList.add(schema.getType()); } - if (ColumnCategory.FIELD.equals(columnCategory)) { - fieldSchemaList.add(schema); - } } } deviceIdSize = dataTypeList.size(); @@ -316,9 +339,9 @@ private Tablet buildNextTablet() { tablet = new Tablet( tableName, - new ArrayList<>(measurementList), - new ArrayList<>(dataTypeList), - new ArrayList<>(columnTypes), + measurementList, + dataTypeList, + columnTypes, rowCountAndMemorySize.getLeft()); tablet.initBitMaps(); isFirstRow = false; @@ -361,20 +384,6 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta long size = timeChunkSize; final List valueChunkList = new ArrayList<>(); - final Map valueChunkMetadataMap = - alignedChunkMetadata.getValueChunkMetadataList().stream() - .filter(Objects::nonNull) - .filter( - metadata -> - !isFieldDeletedByMods( - metadata.getMeasurementUid(), - alignedChunkMetadata.getStartTime(), - alignedChunkMetadata.getEndTime())) - .collect( - Collectors.toMap( - IChunkMetadata::getMeasurementUid, - metadata -> metadata, - (left, right) -> left)); // To ensure that the Tablet has the same alignedChunk column as the current one, // you need to create a new Tablet to fill in the data. @@ -391,98 +400,50 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta measurementList.subList(deviceIdSize, measurementList.size()).clear(); dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear(); - boolean hasSelectedField = fieldSchemaList.isEmpty(); - boolean hasSelectedNonNullChunk = false; - for (; offset < fieldSchemaList.size(); ++offset) { - final IMeasurementSchema schema = fieldSchemaList.get(offset); - if (isFieldDeletedByMods( - schema.getMeasurementName(), - alignedChunkMetadata.getStartTime(), - alignedChunkMetadata.getEndTime())) { - continue; - } - - final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName()); - Chunk chunk = null; + for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) { + final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset); if (metadata != null) { - chunk = reader.readMemChunk((ChunkMetadata) metadata); - final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk); - if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - if (!hasSelectedNonNullChunk) { + final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata); + size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk); + if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + if (valueChunkList.isEmpty()) { // If the first chunk exceeds the memory limit, we need to allocate more memory - size = newSize; PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size); - } else { - break; + columnTypes.add(ColumnCategory.FIELD); + measurementList.add(metadata.getMeasurementUid()); + dataTypeList.add(metadata.getDataType()); + valueChunkList.add(chunk); + ++offset; } + break; } else { - size = newSize; + // Record the column information corresponding to Meta to fill in Tablet + columnTypes.add(ColumnCategory.FIELD); + measurementList.add(metadata.getMeasurementUid()); + dataTypeList.add(metadata.getDataType()); + valueChunkList.add(chunk); } - hasSelectedNonNullChunk = true; } - columnTypes.add(ColumnCategory.FIELD); - measurementList.add(schema.getMeasurementName()); - dataTypeList.add(schema.getType()); - valueChunkList.add(chunk); - hasSelectedField = true; } - if (offset >= fieldSchemaList.size()) { + if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) { currentChunkMetadata = null; } - if (!hasSelectedField) { - this.chunkReader = null; - this.batchData = null; - return; - } - this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null); this.modsInfoList = ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications); } - private boolean areAllFieldsDeletedByMods( - final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) { - if (modifications.isEmpty() || fieldSchemaList.isEmpty()) { - return false; - } - - for (final IMeasurementSchema schema : fieldSchemaList) { - if (!ModsOperationUtil.isAllDeletedByMods( - currentDeviceID, - schema.getMeasurementName(), - alignedChunkMetadata.getStartTime(), - alignedChunkMetadata.getEndTime(), - modifications)) { - return false; - } - } - return true; - } - - private boolean isFieldDeletedByMods( - final String measurementID, final long startTime, final long endTime) { - return !modifications.isEmpty() - && ModsOperationUtil.isAllDeletedByMods( - deviceID, measurementID, startTime, endTime, modifications); - } - private boolean fillMeasurementValueColumns( final BatchData data, final Tablet tablet, final int rowIndex) { - final TsPrimitiveType[] primitiveTypes = - Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0]; + final TsPrimitiveType[] primitiveTypes = data.getVector(); boolean needFillTime = false; - boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize; for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) { - final TsPrimitiveType primitiveType = - i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null; - final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i)); - if (!isDeleted) { - hasNonDeletedField = true; - } - if (primitiveType == null || isDeleted) { + final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize]; + if (primitiveType == null + || ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) { switch (dataTypeList.get(i)) { case TEXT: case BLOB: @@ -527,7 +488,7 @@ private boolean fillMeasurementValueColumns( throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType()); } } - return needFillTime || hasNonDeletedField; + return needFillTime; } private void fillDeviceIdColumns( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java index 27ce077252ef0..191e2bf3282e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java @@ -280,6 +280,7 @@ private TIoTConsensusV2TransferResp loadEvent(final TIoTConsensusV2TransferReq r IoTConsensusV2TsFileSealWithModReq.fromTIoTConsensusV2TransferReq(req)); case TRANSFER_TABLET_BATCH: LOGGER.info("IoTConsensusV2 transfer batch hasn't been implemented yet."); + break; default: break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index c10fdbc4f6720..a3dd749136d59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -207,231 +207,189 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { } switch (requestType) { case HANDSHAKE_DATANODE_V1: - { - try { - if (PipeConfig.getInstance().isPipeEnableMemoryCheck() - && PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() - < PipeConfig.getInstance().getPipeMinimumReceiverMemory()) { - return new TPipeTransferResp( - RpcUtils.getStatus( - TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), - "The receiver memory is not enough to handle the handshake request from datanode.")); - } - return handleTransferHandshakeV1( - PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + try { + if (PipeConfig.getInstance().isPipeEnableMemoryCheck() + && PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() + < PipeConfig.getInstance().getPipeMinimumReceiverMemory()) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), + "The receiver memory is not enough to handle the handshake request from datanode.")); } + return handleTransferHandshakeV1( + PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); } case HANDSHAKE_DATANODE_V2: - { - try { - if (PipeConfig.getInstance().isPipeEnableMemoryCheck() - && PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() - < PipeConfig.getInstance().getPipeMinimumReceiverMemory()) { - return new TPipeTransferResp( - RpcUtils.getStatus( - TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), - "The receiver memory is not enough to handle the handshake request from datanode.")); - } - return handleTransferHandshakeV2( - PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + try { + if (PipeConfig.getInstance().isPipeEnableMemoryCheck() + && PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() + < PipeConfig.getInstance().getPipeMinimumReceiverMemory()) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), + "The receiver memory is not enough to handle the handshake request from datanode.")); } + return handleTransferHandshakeV2( + PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); } case TRANSFER_TABLET_INSERT_NODE: - { - try { - return handleTransferTabletInsertNode( - PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); - - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletInsertNode( + PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); } case TRANSFER_TABLET_INSERT_NODE_V2: - { - try { - return handleTransferTabletInsertNode( - PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletInsertNode( + PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); } case TRANSFER_TABLET_RAW: - { - try { - return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawTimer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletRawTimer(System.nanoTime() - startTime); } case TRANSFER_TABLET_RAW_V2: - { - try { - return handleTransferTabletRaw( - PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawV2Timer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletRaw(PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletRawV2Timer(System.nanoTime() - startTime); } case TRANSFER_TABLET_BINARY: - { - try { - return handleTransferTabletBinary( - PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletBinary( + PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); } case TRANSFER_TABLET_BINARY_V2: - { - try { - return handleTransferTabletBinary( - PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletBinary( + PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); } case TRANSFER_TABLET_BATCH: - { - try { - return handleTransferTabletBatch( - PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchTimer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletBatch( + PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBatchTimer(System.nanoTime() - startTime); } case TRANSFER_TABLET_BATCH_V2: - { - try { - return handleTransferTabletBatchV2( - PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); - } + try { + return handleTransferTabletBatchV2( + PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); } case TRANSFER_TS_FILE_PIECE: - { - try { - return handleTransferFilePiece( - PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - true); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); - } + try { + return handleTransferFilePiece( + PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + true); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); } case TRANSFER_TS_FILE_SEAL: - { - try { - return handleTransferFileSealV1( - PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealTimer(System.nanoTime() - startTime); - } + try { + return handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealTimer(System.nanoTime() - startTime); } case TRANSFER_TS_FILE_PIECE_WITH_MOD: - { - try { - return handleTransferFilePiece( - PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); - - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); - } + try { + return handleTransferFilePiece( + PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false); + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); } case TRANSFER_TS_FILE_SEAL_WITH_MOD: - { - try { - return handleTransferFileSealV2( - PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); - } + try { + return handleTransferFileSealV2( + PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); } case TRANSFER_PLAN_NODE: - { - try { - return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); - } + try { + return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); } case TRANSFER_SCHEMA_SNAPSHOT_PIECE: - { - try { - return handleTransferFilePiece( - PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), - req instanceof AirGapPseudoTPipeTransferRequest, - false); - - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); - } + try { + return handleTransferFilePiece( + PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), + req instanceof AirGapPseudoTPipeTransferRequest, + false); + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); } case TRANSFER_SCHEMA_SNAPSHOT_SEAL: - { - try { - return handleTransferFileSealV2( - PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); - - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); - } + try { + return handleTransferFileSealV2( + PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); } case HANDSHAKE_CONFIGNODE_V1: case HANDSHAKE_CONFIGNODE_V2: case TRANSFER_CONFIG_PLAN: case TRANSFER_CONFIG_SNAPSHOT_PIECE: case TRANSFER_CONFIG_SNAPSHOT_SEAL: - { - try { - // Config requests will first be received by the DataNode receiver, - // then transferred to ConfigNode receiver to execute. - return handleTransferConfigPlan(req); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferConfigPlanTimer(System.nanoTime() - startTime); - } + try { + // Config requests will first be received by the DataNode receiver, + // then transferred to ConfigNode receiver to execute. + return handleTransferConfigPlan(req); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferConfigPlanTimer(System.nanoTime() - startTime); } case TRANSFER_SLICE: - { - try { - return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSliceTimer(System.nanoTime() - startTime); - } + try { + return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSliceTimer(System.nanoTime() - startTime); } case TRANSFER_COMPRESSED: - { - try { - return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferCompressedTimer(System.nanoTime() - startTime); - } + try { + return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferCompressedTimer(System.nanoTime() - startTime); } default: break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 8412161e63e39..f4c207272c89c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -1379,6 +1379,7 @@ private TsBlock getTransferedDataTypeTsBlock(TsBlock tsBlock) { break; case OBJECT: newValueColumns[i] = valueColumns[i]; + break; case VECTOR: case UNKNOWN: default: