diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java index c6e8ad5295070..3148a7c67ac0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.metrics.type.Rate; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -56,6 +57,7 @@ public void bindTo(final AbstractMetricService metricService) { private void createMetrics(final String taskID) { createRate(taskID); + createHistogram(taskID); } private void createRate(final String taskID) { @@ -72,6 +74,38 @@ private void createRate(final String taskID) { String.valueOf(connector.getCreationTime()))); } + private void createHistogram(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + + final Histogram schemaBatchSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_SCHEMA_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTabletBatchSizeHistogram(schemaBatchSizeHistogram); + + final Histogram schemaBatchTimeIntervalHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTabletBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram); + + final Histogram schemaBatchEventSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString()); + connector.setEventSizeHistogram(schemaBatchEventSizeHistogram); + } + @Override public void unbindFrom(final AbstractMetricService metricService) { ImmutableSet.copyOf(connectorMap.keySet()).forEach(this::deregister); @@ -83,6 +117,7 @@ public void unbindFrom(final AbstractMetricService metricService) { private void removeMetrics(final String taskID) { removeRate(taskID); + removeHistogram(taskID); } private void removeRate(final String taskID) { @@ -98,6 +133,29 @@ private void removeRate(final String taskID) { schemaRateMap.remove(taskID); } + private void removeHistogram(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_SCHEMA_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString()); + } + //////////////////////////// Register & deregister (pipe integration) //////////////////////////// public void register(final PipeSinkSubtask pipeSinkSubtask) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java new file mode 100644 index 0000000000000..e03ac6b63efbb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup; +import org.apache.iotdb.metrics.impl.DoNothingHistogram; +import org.apache.iotdb.metrics.type.Histogram; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY; + +public class PipeSchemaRegionWritePlanEventBatch implements AutoCloseable { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeSchemaRegionWritePlanEventBatch.class); + + private static final PlanNodeId EMPTY_PLAN_NODE_ID = new PlanNodeId(""); + + private final int maxDelayInMs; + private final long maxBatchSizeInBytes; + private final PipeMemoryBlock allocatedMemoryBlock; + + private final List events = new ArrayList<>(); + + private final Map> deviceMap = new HashMap<>(); + private final Map> templateActivationMap = new HashMap<>(); + + private BatchType batchType = BatchType.NONE; + private String pipeName; + private long creationTime; + + private long totalBufferSize = 0; + private long firstEventProcessingTime = Long.MIN_VALUE; + + private volatile boolean isClosed = false; + + private Histogram batchSizeHistogram = new DoNothingHistogram(); + private Histogram batchTimeIntervalHistogram = new DoNothingHistogram(); + private Histogram eventSizeHistogram = new DoNothingHistogram(); + + public PipeSchemaRegionWritePlanEventBatch(final PipeParameters parameters) { + final Integer requestMaxDelayInMillis = + parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, SINK_IOTDB_BATCH_DELAY_MS_KEY); + if (Objects.isNull(requestMaxDelayInMillis)) { + final int requestMaxDelayConfig = + parameters.getIntOrDefault( + Arrays.asList( + CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY, SINK_IOTDB_BATCH_DELAY_SECONDS_KEY), + CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE); + maxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE : requestMaxDelayConfig; + } else { + maxDelayInMs = requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : requestMaxDelayInMillis; + } + + maxBatchSizeInBytes = + parameters.getLongOrDefault( + Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), + CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(maxBatchSizeInBytes); + } + + public synchronized boolean onEvent(final PipeSchemaRegionWritePlanEvent event) { + if (isClosed || !canBatch(event)) { + return false; + } + + if (events.isEmpty() || !Objects.equals(events.get(events.size() - 1), event)) { + if (!event.increaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName())) { + LOGGER.warn("Cannot increase reference count for event: {}, ignore it in batch.", event); + return true; + } + + try { + if (Objects.isNull(pipeName)) { + pipeName = event.getPipeName(); + creationTime = event.getCreationTime(); + } + appendPlanNode(event.getPlanNode()); + totalBufferSize += event.getPlanNode().serializeToByteBuffer().limit(); + events.add(event); + } catch (final Exception e) { + event.decreaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName(), false); + throw e; + } + + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); + } + } + + return true; + } + + private boolean canBatch(final PipeSchemaRegionWritePlanEvent event) { + final BatchType eventBatchType = resolveBatchType(event.getPlanNode()); + if (eventBatchType == BatchType.NONE || containsNonEmptyProps(event.getPlanNode())) { + return false; + } + + if (events.isEmpty()) { + return !hasAlignmentConflict(event.getPlanNode()); + } + + return Objects.equals(pipeName, event.getPipeName()) + && creationTime == event.getCreationTime() + && batchType == eventBatchType + && !hasAlignmentConflict(event.getPlanNode()); + } + + private BatchType resolveBatchType(final PlanNode planNode) { + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + case CREATE_ALIGNED_TIME_SERIES: + case CREATE_MULTI_TIME_SERIES: + case INTERNAL_CREATE_TIME_SERIES: + case INTERNAL_CREATE_MULTI_TIMESERIES: + return BatchType.TIMESERIES; + case ACTIVATE_TEMPLATE: + case BATCH_ACTIVATE_TEMPLATE: + case INTERNAL_BATCH_ACTIVATE_TEMPLATE: + return BatchType.TEMPLATE_ACTIVATE; + default: + return BatchType.NONE; + } + } + + private boolean containsNonEmptyProps(final PlanNode planNode) { + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + return hasNonEmptyProps(((CreateTimeSeriesNode) planNode).getProps()); + case CREATE_MULTI_TIME_SERIES: + return ((CreateMultiTimeSeriesNode) planNode) + .getMeasurementGroupMap().values().stream() + .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps); + case INTERNAL_CREATE_TIME_SERIES: + return hasNonEmptyProps(((InternalCreateTimeSeriesNode) planNode).getMeasurementGroup()); + case INTERNAL_CREATE_MULTI_TIMESERIES: + return ((InternalCreateMultiTimeSeriesNode) planNode) + .getDeviceMap().values().stream() + .map(Pair::getRight) + .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps); + default: + return false; + } + } + + private static boolean hasNonEmptyProps(final MeasurementGroup measurementGroup) { + return Objects.nonNull(measurementGroup.getPropsList()) + && measurementGroup.getPropsList().stream() + .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps); + } + + private static boolean hasNonEmptyProps(final Map props) { + return Objects.nonNull(props) && !props.isEmpty(); + } + + private boolean hasAlignmentConflict(final PlanNode planNode) { + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + return hasAlignmentConflict( + ((CreateTimeSeriesNode) planNode).getPath().getDevicePath(), false); + case CREATE_ALIGNED_TIME_SERIES: + return hasAlignmentConflict(((CreateAlignedTimeSeriesNode) planNode).getDevicePath(), true); + case CREATE_MULTI_TIME_SERIES: + return ((CreateMultiTimeSeriesNode) planNode) + .getMeasurementGroupMap().keySet().stream() + .anyMatch(devicePath -> hasAlignmentConflict(devicePath, false)); + case INTERNAL_CREATE_TIME_SERIES: + return hasAlignmentConflict( + ((InternalCreateTimeSeriesNode) planNode).getDevicePath(), + ((InternalCreateTimeSeriesNode) planNode).isAligned()); + case INTERNAL_CREATE_MULTI_TIMESERIES: + return ((InternalCreateMultiTimeSeriesNode) planNode) + .getDeviceMap().entrySet().stream() + .anyMatch( + entry -> hasAlignmentConflict(entry.getKey(), entry.getValue().getLeft())); + default: + return false; + } + } + + private boolean hasAlignmentConflict(final PartialPath devicePath, final boolean isAligned) { + final Pair existing = deviceMap.get(devicePath); + return Objects.nonNull(existing) && !Objects.equals(existing.getLeft(), isAligned); + } + + private void appendPlanNode(final PlanNode planNode) { + if (batchType == BatchType.NONE) { + batchType = resolveBatchType(planNode); + } + + switch (planNode.getType()) { + case CREATE_TIME_SERIES: + appendCreateTimeSeriesNode((CreateTimeSeriesNode) planNode); + break; + case CREATE_ALIGNED_TIME_SERIES: + appendCreateAlignedTimeSeriesNode((CreateAlignedTimeSeriesNode) planNode); + break; + case CREATE_MULTI_TIME_SERIES: + appendCreateMultiTimeSeriesNode((CreateMultiTimeSeriesNode) planNode); + break; + case INTERNAL_CREATE_TIME_SERIES: + appendInternalCreateTimeSeriesNode((InternalCreateTimeSeriesNode) planNode); + break; + case INTERNAL_CREATE_MULTI_TIMESERIES: + appendInternalCreateMultiTimeSeriesNode((InternalCreateMultiTimeSeriesNode) planNode); + break; + case ACTIVATE_TEMPLATE: + appendActivateTemplateNode((ActivateTemplateNode) planNode); + break; + case BATCH_ACTIVATE_TEMPLATE: + appendBatchActivateTemplateNode((BatchActivateTemplateNode) planNode); + break; + case INTERNAL_BATCH_ACTIVATE_TEMPLATE: + appendInternalBatchActivateTemplateNode((InternalBatchActivateTemplateNode) planNode); + break; + default: + throw new IllegalArgumentException("Unsupported schema plan node " + planNode.getType()); + } + } + + private void appendCreateTimeSeriesNode(final CreateTimeSeriesNode node) { + appendMeasurement( + node.getPath().getDevicePath(), + false, + node.getPath().getMeasurement(), + node.getDataType(), + node.getEncoding(), + node.getCompressor(), + node.getAlias(), + node.getTags(), + node.getAttributes()); + } + + private void appendCreateAlignedTimeSeriesNode(final CreateAlignedTimeSeriesNode node) { + for (int i = 0; i < node.getMeasurements().size(); ++i) { + appendMeasurement( + node.getDevicePath(), + true, + node.getMeasurements().get(i), + node.getDataTypes().get(i), + node.getEncodings().get(i), + node.getCompressors().get(i), + Objects.nonNull(node.getAliasList()) ? node.getAliasList().get(i) : null, + Objects.nonNull(node.getTagsList()) ? node.getTagsList().get(i) : null, + Objects.nonNull(node.getAttributesList()) ? node.getAttributesList().get(i) : null); + } + } + + private void appendCreateMultiTimeSeriesNode(final CreateMultiTimeSeriesNode node) { + node.getMeasurementGroupMap() + .forEach( + (devicePath, measurementGroup) -> + appendMeasurementGroup(devicePath, false, measurementGroup)); + } + + private void appendInternalCreateTimeSeriesNode(final InternalCreateTimeSeriesNode node) { + appendMeasurementGroup(node.getDevicePath(), node.isAligned(), node.getMeasurementGroup()); + } + + private void appendInternalCreateMultiTimeSeriesNode( + final InternalCreateMultiTimeSeriesNode node) { + node.getDeviceMap() + .forEach( + (devicePath, isAlignedAndMeasurementGroup) -> + appendMeasurementGroup( + devicePath, + isAlignedAndMeasurementGroup.getLeft(), + isAlignedAndMeasurementGroup.getRight())); + } + + private void appendMeasurementGroup( + final PartialPath devicePath, + final boolean isAligned, + final MeasurementGroup measurementGroup) { + for (int i = 0; i < measurementGroup.size(); ++i) { + appendMeasurement( + devicePath, + isAligned, + measurementGroup.getMeasurements().get(i), + measurementGroup.getDataTypes().get(i), + measurementGroup.getEncodings().get(i), + measurementGroup.getCompressors().get(i), + Objects.nonNull(measurementGroup.getAliasList()) + ? measurementGroup.getAliasList().get(i) + : null, + Objects.nonNull(measurementGroup.getTagsList()) + ? measurementGroup.getTagsList().get(i) + : null, + Objects.nonNull(measurementGroup.getAttributesList()) + ? measurementGroup.getAttributesList().get(i) + : null); + } + } + + private void appendMeasurement( + final PartialPath devicePath, + final boolean isAligned, + final String measurement, + final TSDataType dataType, + final TSEncoding encoding, + final CompressionType compressor, + final String alias, + final Map tags, + final Map attributes) { + final MeasurementGroup group = + deviceMap + .computeIfAbsent(devicePath, key -> new Pair<>(isAligned, new MeasurementGroup())) + .getRight(); + if (group.addMeasurement(measurement, dataType, encoding, compressor)) { + group.addAlias(alias); + group.addTags(tags); + group.addAttributes(attributes); + } + } + + private void appendActivateTemplateNode(final ActivateTemplateNode node) { + templateActivationMap.putIfAbsent( + node.getActivatePath(), new Pair<>(node.getTemplateId(), node.getTemplateSetLevel())); + } + + private void appendBatchActivateTemplateNode(final BatchActivateTemplateNode node) { + node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent); + } + + private void appendInternalBatchActivateTemplateNode( + final InternalBatchActivateTemplateNode node) { + node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent); + } + + public synchronized boolean shouldEmit() { + if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) { + return false; + } + return totalBufferSize >= maxBatchSizeInBytes + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + } + + public synchronized void recordBatchMetrics() { + if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) { + return; + } + batchTimeIntervalHistogram.update(System.currentTimeMillis() - firstEventProcessingTime); + batchSizeHistogram.update(totalBufferSize); + eventSizeHistogram.update(events.size()); + } + + public synchronized PlanNode toPlanNode() { + switch (batchType) { + case TIMESERIES: + return new InternalCreateMultiTimeSeriesNode(EMPTY_PLAN_NODE_ID, new HashMap<>(deviceMap)); + case TEMPLATE_ACTIVATE: + return new BatchActivateTemplateNode( + EMPTY_PLAN_NODE_ID, new HashMap<>(templateActivationMap)); + default: + throw new IllegalStateException("Cannot build schema batch plan node from empty batch."); + } + } + + public synchronized void onSuccess() { + events.clear(); + deviceMap.clear(); + templateActivationMap.clear(); + batchType = BatchType.NONE; + pipeName = null; + creationTime = 0; + totalBufferSize = 0; + firstEventProcessingTime = Long.MIN_VALUE; + } + + public synchronized void decreaseEventsReferenceCount( + final String holderMessage, final boolean shouldReport) { + events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport)); + } + + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + final boolean removed = + events.removeIf( + event -> { + if (pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId()) { + event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName()); + return true; + } + return false; + }); + if (removed) { + rebuildFromEvents(); + } + } + + private void rebuildFromEvents() { + deviceMap.clear(); + templateActivationMap.clear(); + batchType = BatchType.NONE; + pipeName = null; + creationTime = 0; + totalBufferSize = 0; + + if (events.isEmpty()) { + firstEventProcessingTime = Long.MIN_VALUE; + return; + } + + // After a partial discard, the enqueue timestamp of the oldest remaining event is unknown. + // Reset the emit window conservatively to avoid flushing immediately because of removed events. + firstEventProcessingTime = System.currentTimeMillis(); + batchType = resolveBatchType(((PipeSchemaRegionWritePlanEvent) events.get(0)).getPlanNode()); + + for (final EnrichedEvent event : events) { + final PipeSchemaRegionWritePlanEvent schemaEvent = (PipeSchemaRegionWritePlanEvent) event; + if (Objects.isNull(pipeName)) { + pipeName = schemaEvent.getPipeName(); + creationTime = schemaEvent.getCreationTime(); + } + appendPlanNode(schemaEvent.getPlanNode()); + totalBufferSize += schemaEvent.getPlanNode().serializeToByteBuffer().limit(); + } + } + + public synchronized boolean isEmpty() { + return events.isEmpty(); + } + + public synchronized int size() { + return events.size(); + } + + public synchronized String getPipeName() { + return pipeName; + } + + public synchronized long getCreationTime() { + return creationTime; + } + + public void setBatchSizeHistogram(final Histogram batchSizeHistogram) { + if (Objects.nonNull(batchSizeHistogram)) { + this.batchSizeHistogram = batchSizeHistogram; + } + } + + public void setBatchTimeIntervalHistogram(final Histogram batchTimeIntervalHistogram) { + if (Objects.nonNull(batchTimeIntervalHistogram)) { + this.batchTimeIntervalHistogram = batchTimeIntervalHistogram; + } + } + + public void setEventSizeHistogram(final Histogram eventSizeHistogram) { + if (Objects.nonNull(eventSizeHistogram)) { + this.eventSizeHistogram = eventSizeHistogram; + } + } + + @Override + public synchronized void close() { + isClosed = true; + events.forEach( + event -> event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName())); + events.clear(); + deviceMap.clear(); + templateActivationMap.clear(); + allocatedMemoryBlock.close(); + } + + private enum BatchType { + NONE, + TIMESERIES, + TEMPLATE_ACTIVATE + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index bc056857c17c1..d15a913b5e050 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -24,11 +24,15 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -49,6 +53,19 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSchemaRegionAirGapSink.class); + private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch; + + @Override + public void customize( + final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) + throws Exception { + super.customize(parameters, configuration); + + if (isTabletBatchModeEnabled) { + schemaRegionWritePlanEventBatch = new PipeSchemaRegionWritePlanEventBatch(parameters); + } + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( @@ -68,12 +85,21 @@ public void transfer(final Event event) throws Exception { try { if (event instanceof PipeSchemaRegionWritePlanEvent) { - doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event); + if (isTabletBatchModeEnabled && Objects.nonNull(schemaRegionWritePlanEventBatch)) { + doTransferWithBatch(socket, (PipeSchemaRegionWritePlanEvent) event); + } else { + doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event); + } } else if (event instanceof PipeSchemaRegionSnapshotEvent) { + flushBatchedEventsIfNecessary(socket); doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event); - } else if (!(event instanceof PipeHeartbeatEvent)) { - LOGGER.warn( - "IoTDBSchemaRegionAirGapSink does not support transferring generic event: {}.", event); + } else { + flushBatchedEventsIfNecessary(socket); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn( + "IoTDBSchemaRegionAirGapSink does not support transferring generic event: {}.", + event); + } } } catch (final IOException e) { isSocketAlive.set(socketIndex, false); @@ -86,6 +112,58 @@ public void transfer(final Event event) throws Exception { } } + private void doTransferWithBatch( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) + throws PipeException, IOException { + if (tryTransferInBatch(socket, event)) { + return; + } + + doTransferWrapper(socket, event); + } + + private boolean tryTransferInBatch( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) + throws PipeException, IOException { + if (tryAppendToBatchAndFlushIfNecessary(socket, event)) { + return true; + } + + if (schemaRegionWritePlanEventBatch.isEmpty()) { + return false; + } + + flushBatchedEventsIfNecessary(socket); + return tryAppendToBatchAndFlushIfNecessary(socket, event); + } + + private boolean tryAppendToBatchAndFlushIfNecessary( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event) + throws PipeException, IOException { + if (!schemaRegionWritePlanEventBatch.onEvent(event)) { + return false; + } + + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(socket); + } + return true; + } + + private void flushBatchedEventsIfNecessary(final AirGapSocket socket) + throws PipeException, IOException { + if (Objects.isNull(schemaRegionWritePlanEventBatch) + || schemaRegionWritePlanEventBatch.isEmpty()) { + return; + } + + schemaRegionWritePlanEventBatch.recordBatchMetrics(); + doTransfer(socket, schemaRegionWritePlanEventBatch); + schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount( + IoTDBSchemaRegionAirGapSink.class.getName(), true); + schemaRegionWritePlanEventBatch.onSuccess(); + } + private void doTransferWrapper( final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) @@ -107,21 +185,39 @@ private void doTransfer( final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException, IOException { - if (!send( + doTransfer( + socket, + pipeSchemaRegionWritePlanEvent.getPlanNode(), pipeSchemaRegionWritePlanEvent.getPipeName(), pipeSchemaRegionWritePlanEvent.getCreationTime(), - socket, - PipeTransferPlanNodeReq.toTPipeTransferBytes( - pipeSchemaRegionWritePlanEvent.getPlanNode()))) { + pipeSchemaRegionWritePlanEvent.toString()); + } + + private void doTransfer( + final AirGapSocket socket, final PipeSchemaRegionWritePlanEventBatch batch) + throws PipeException, IOException { + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode = + batch.toPlanNode(); + doTransfer(socket, planNode, batch.getPipeName(), batch.getCreationTime(), planNode.toString()); + } + + private void doTransfer( + final AirGapSocket socket, + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode, + final String pipeName, + final long creationTime, + final String eventDescription) + throws PipeException, IOException { + if (!send( + pipeName, creationTime, socket, PipeTransferPlanNodeReq.toTPipeTransferBytes(planNode))) { final String errorMessage = String.format( - "Transfer data node write plan %s error. Socket: %s.", - pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), socket); + "Transfer data node write plan %s error. Socket: %s.", planNode.getType(), socket); receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeSchemaRegionWritePlanEvent.toString(), + eventDescription, true); } } @@ -215,4 +311,44 @@ protected byte[] getTransferMultiFilePieceBytes( final String fileName, final long position, final byte[] payLoad) throws IOException { return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferBytes(fileName, position, payLoad); } + + @Override + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.discardEventsOfPipe( + pipeNameToDrop, creationTimeToDrop, regionId); + } + } + + @Override + public void close() { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.close(); + } + super.close(); + } + + @Override + public void setTabletBatchSizeHistogram(final Histogram tabletBatchSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram( + final Histogram tabletBatchTimeIntervalHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram( + tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java index 15a72d1193aec..26c1464c183f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java @@ -25,11 +25,15 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -54,6 +58,19 @@ public class IoTDBSchemaRegionSink extends IoTDBDataNodeSyncSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSchemaRegionSink.class); + private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch; + + @Override + public void customize( + final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) + throws Exception { + super.customize(parameters, configuration); + + if (isTabletBatchModeEnabled) { + schemaRegionWritePlanEventBatch = new PipeSchemaRegionWritePlanEventBatch(parameters); + } + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( @@ -69,13 +86,69 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc @Override public void transfer(final Event event) throws Exception { if (event instanceof PipeSchemaRegionWritePlanEvent) { - doTransferWrapper((PipeSchemaRegionWritePlanEvent) event); + if (isTabletBatchModeEnabled && Objects.nonNull(schemaRegionWritePlanEventBatch)) { + doTransferWithBatch((PipeSchemaRegionWritePlanEvent) event); + } else { + doTransferWrapper((PipeSchemaRegionWritePlanEvent) event); + } } else if (event instanceof PipeSchemaRegionSnapshotEvent) { + flushBatchedEventsIfNecessary(); doTransferWrapper((PipeSchemaRegionSnapshotEvent) event); - } else if (!(event instanceof PipeHeartbeatEvent)) { - LOGGER.warn( - "IoTDBSchemaRegionConnector does not support transferring generic event: {}.", event); + } else { + flushBatchedEventsIfNecessary(); + if (!(event instanceof PipeHeartbeatEvent)) { + LOGGER.warn( + "IoTDBSchemaRegionConnector does not support transferring generic event: {}.", event); + } + } + } + + private void doTransferWithBatch(final PipeSchemaRegionWritePlanEvent event) + throws PipeException { + if (tryTransferInBatch(event)) { + return; + } + + doTransferWrapper(event); + } + + private boolean tryTransferInBatch(final PipeSchemaRegionWritePlanEvent event) + throws PipeException { + if (tryAppendToBatchAndFlushIfNecessary(event)) { + return true; } + + if (schemaRegionWritePlanEventBatch.isEmpty()) { + return false; + } + + flushBatchedEventsIfNecessary(); + return tryAppendToBatchAndFlushIfNecessary(event); + } + + private boolean tryAppendToBatchAndFlushIfNecessary(final PipeSchemaRegionWritePlanEvent event) + throws PipeException { + if (!schemaRegionWritePlanEventBatch.onEvent(event)) { + return false; + } + + if (schemaRegionWritePlanEventBatch.shouldEmit()) { + flushBatchedEventsIfNecessary(); + } + return true; + } + + private void flushBatchedEventsIfNecessary() throws PipeException { + if (Objects.isNull(schemaRegionWritePlanEventBatch) + || schemaRegionWritePlanEventBatch.isEmpty()) { + return; + } + + schemaRegionWritePlanEventBatch.recordBatchMetrics(); + doTransfer(schemaRegionWritePlanEventBatch); + schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount( + IoTDBSchemaRegionSink.class.getName(), true); + schemaRegionWritePlanEventBatch.onSuccess(); } private void doTransferWrapper( @@ -95,43 +168,56 @@ private void doTransferWrapper( private void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException { + doTransfer( + pipeSchemaRegionWritePlanEvent.getPlanNode(), + pipeSchemaRegionWritePlanEvent.getPipeName(), + pipeSchemaRegionWritePlanEvent.getCreationTime(), + pipeSchemaRegionWritePlanEvent.getPlanNode().toString()); + LOGGER.info("Successfully transferred schema event {}.", pipeSchemaRegionWritePlanEvent); + } + + private void doTransfer(final PipeSchemaRegionWritePlanEventBatch batch) throws PipeException { + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode = + batch.toPlanNode(); + doTransfer(planNode, batch.getPipeName(), batch.getCreationTime(), planNode.toString()); + LOGGER.info("Successfully transferred batched schema events, batch size {}.", batch.size()); + } + + private void doTransfer( + final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode, + final String pipeName, + final long creationTime, + final String eventDescription) + throws PipeException { final Pair clientAndStatus = clientManager.getClient(); final TPipeTransferResp resp; try { final TPipeTransferReq req = - compressIfNeeded( - PipeTransferPlanNodeReq.toTPipeTransferReq( - pipeSchemaRegionWritePlanEvent.getPlanNode())); + compressIfNeeded(PipeTransferPlanNodeReq.toTPipeTransferReq(planNode)); rateLimitIfNeeded( - pipeSchemaRegionWritePlanEvent.getPipeName(), - pipeSchemaRegionWritePlanEvent.getCreationTime(), - clientAndStatus.getLeft().getEndPoint(), - req.getBody().length); + pipeName, creationTime, clientAndStatus.getLeft().getEndPoint(), req.getBody().length); resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( String.format( "Network error when transfer schema region write plan %s, because %s.", - pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), e.getMessage()), + planNode.getType(), e.getMessage()), e); } final TSStatus status = resp.getStatus(); - // Only handle the failed statuses to avoid string format performance overhead - if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && resp.getStatus().getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { receiverStatusHandler.handle( status, String.format( "Transfer data node write plan %s error, result status %s.", - pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status), - pipeSchemaRegionWritePlanEvent.getPlanNode().toString(), + planNode.getType(), status), + eventDescription, true); } - - LOGGER.info("Successfully transferred schema event {}.", pipeSchemaRegionWritePlanEvent); } private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) @@ -245,6 +331,46 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq( return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } + @Override + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.discardEventsOfPipe( + pipeNameToDrop, creationTimeToDrop, regionId); + } + } + + @Override + public void close() { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.close(); + } + super.close(); + } + + @Override + public void setTabletBatchSizeHistogram(final Histogram tabletBatchSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram( + final Histogram tabletBatchTimeIntervalHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram( + tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) { + if (Objects.nonNull(schemaRegionWritePlanEventBatch)) { + schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram); + } + } + @Override protected void mayLimitRateAndRecordIO(final long requiredBytes) { // Do nothing diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java new file mode 100644 index 0000000000000..22a5a59662f32 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.metric.schema; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.type.Histogram; +import org.apache.iotdb.metrics.type.Rate; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PipeSchemaRegionSinkMetricsTest { + + @Test + public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exception { + final String taskId = "schema-task-" + System.nanoTime(); + boolean deregistered = false; + final AbstractMetricService metricService = Mockito.mock(AbstractMetricService.class); + final PipeSinkSubtask subtask = Mockito.mock(PipeSinkSubtask.class); + final Rate rate = Mockito.mock(Rate.class); + final Histogram batchSizeHistogram = Mockito.mock(Histogram.class); + final Histogram batchTimeHistogram = Mockito.mock(Histogram.class); + final Histogram eventSizeHistogram = Mockito.mock(Histogram.class); + + when(subtask.getTaskID()).thenReturn(taskId); + when(subtask.getAttributeSortedString()).thenReturn("schema_test"); + when(subtask.getCreationTime()).thenReturn(1L); + when(metricService.getOrCreateRate( + eq(Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) + .thenReturn(rate); + when(metricService.getOrCreateHistogram( + eq(Metric.PIPE_SCHEMA_BATCH_SIZE.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) + .thenReturn(batchSizeHistogram); + when(metricService.getOrCreateHistogram( + eq(Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"), + eq(Tag.CREATION_TIME.toString()), + eq("1"))) + .thenReturn(batchTimeHistogram); + when(metricService.getOrCreateHistogram( + eq(Metric.PIPE_CONNECTOR_BATCH_SIZE.toString()), + eq(MetricLevel.IMPORTANT), + eq(Tag.NAME.toString()), + eq("schema_test"))) + .thenReturn(eventSizeHistogram); + + final PipeSchemaRegionSinkMetrics metrics = PipeSchemaRegionSinkMetrics.getInstance(); + + final Field metricServiceField = + PipeSchemaRegionSinkMetrics.class.getDeclaredField("metricService"); + metricServiceField.setAccessible(true); + final Field connectorMapField = + PipeSchemaRegionSinkMetrics.class.getDeclaredField("connectorMap"); + connectorMapField.setAccessible(true); + final Field schemaRateMapField = + PipeSchemaRegionSinkMetrics.class.getDeclaredField("schemaRateMap"); + schemaRateMapField.setAccessible(true); + + ((Map) connectorMapField.get(metrics)).clear(); + ((Map) schemaRateMapField.get(metrics)).clear(); + metricServiceField.set(metrics, null); + + try { + metrics.register(subtask); + metrics.bindTo(metricService); + + verify(subtask).setTabletBatchSizeHistogram(batchSizeHistogram); + verify(subtask).setTabletBatchTimeIntervalHistogram(batchTimeHistogram); + verify(subtask).setEventSizeHistogram(eventSizeHistogram); + + metrics.deregister(taskId); + + verify(metricService) + .remove( + MetricType.HISTOGRAM, + Metric.PIPE_SCHEMA_BATCH_SIZE.toString(), + Tag.NAME.toString(), + "schema_test", + Tag.CREATION_TIME.toString(), + "1"); + verify(metricService) + .remove( + MetricType.HISTOGRAM, + Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(), + Tag.NAME.toString(), + "schema_test", + Tag.CREATION_TIME.toString(), + "1"); + verify(metricService) + .remove( + MetricType.HISTOGRAM, + Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(), + Tag.NAME.toString(), + "schema_test"); + verify(metricService) + .remove( + MetricType.RATE, + Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString(), + Tag.NAME.toString(), + "schema_test", + Tag.CREATION_TIME.toString(), + "1"); + deregistered = true; + } finally { + if (!deregistered) { + metrics.deregister(taskId); + } + ((Map) connectorMapField.get(metrics)).clear(); + ((Map) schemaRateMapField.get(metrics)).clear(); + metricServiceField.set(metrics, null); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java new file mode 100644 index 0000000000000..586b13b7667d2 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; +import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink; +import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.HashMap; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PipeSchemaRegionSinkTest { + + @Test + public void testSyncSinkFlushesBatchedEventsOnHeartbeat() throws Exception { + final TestIoTDBSchemaRegionSyncSink sink = new TestIoTDBSchemaRegionSyncSink(); + try { + final IoTDBDataNodeSyncClientManager clientManager = + Mockito.mock(IoTDBDataNodeSyncClientManager.class); + final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client = + Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class); + when(clientManager.getClient()).thenAnswer(invocation -> new Pair<>(client, true)); + when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667)); + when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp()); + + setField(sink, "clientManager", clientManager); + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent event = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + + sink.transfer(event); + Assert.assertFalse(event.isReleased()); + verify(client, never()).pipeTransfer(any(TPipeTransferReq.class)); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(event.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testSyncSinkFlushesBufferedEventsBeforeStandaloneTransfer() throws Exception { + final TestIoTDBSchemaRegionSyncSink sink = new TestIoTDBSchemaRegionSyncSink(); + try { + final IoTDBDataNodeSyncClientManager clientManager = + Mockito.mock(IoTDBDataNodeSyncClientManager.class); + final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client = + Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class); + when(clientManager.getClient()).thenAnswer(invocation -> new Pair<>(client, true)); + when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667)); + when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp()); + + setField(sink, "clientManager", clientManager); + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent batchedEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent standaloneEvent = + createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L); + + sink.transfer(batchedEvent); + sink.transfer(standaloneEvent); + + verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(batchedEvent.isReleased()); + Assert.assertTrue(standaloneEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testSyncSinkRetriesBatchingAfterFlushingIncompatibleBatch() throws Exception { + final TestIoTDBSchemaRegionSyncSink sink = new TestIoTDBSchemaRegionSyncSink(); + try { + final IoTDBDataNodeSyncClientManager clientManager = + Mockito.mock(IoTDBDataNodeSyncClientManager.class); + final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client = + Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class); + when(clientManager.getClient()).thenAnswer(invocation -> new Pair<>(client, true)); + when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667)); + when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp()); + + setField(sink, "clientManager", clientManager); + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent firstEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent secondEvent = + createBatchableEvent("root.db.d2.s1", "pipeB", 1L); + + sink.transfer(firstEvent); + sink.transfer(secondEvent); + + verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(firstEvent.isReleased()); + Assert.assertFalse(secondEvent.isReleased()); + Assert.assertEquals(1, getBatch(sink).size()); + Assert.assertEquals("pipeB", getBatch(sink).getPipeName()); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class)); + Assert.assertTrue(secondEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testAirGapSinkFlushesBatchedEventsOnHeartbeat() throws Exception { + final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); + try { + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent event = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + + sink.transfer(event); + Assert.assertFalse(event.isReleased()); + Assert.assertEquals(0, sink.getSendCount()); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + Assert.assertEquals(1, sink.getSendCount()); + Assert.assertTrue(event.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testAirGapSinkRetriesBatchingAfterFlushingIncompatibleBatch() throws Exception { + final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); + try { + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent firstEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent secondEvent = + createBatchableEvent("root.db.d2.s1", "pipeB", 1L); + + sink.transfer(firstEvent); + sink.transfer(secondEvent); + + Assert.assertEquals(1, sink.getSendCount()); + Assert.assertTrue(firstEvent.isReleased()); + Assert.assertFalse(secondEvent.isReleased()); + Assert.assertEquals(1, getBatch(sink).size()); + Assert.assertEquals("pipeB", getBatch(sink).getPipeName()); + + sink.transfer(new PipeHeartbeatEvent(-1, false)); + + Assert.assertEquals(2, sink.getSendCount()); + Assert.assertTrue(secondEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + @Test + public void testAirGapSinkFlushesBufferedEventsBeforeStandaloneTransfer() throws Exception { + final TestIoTDBSchemaRegionAirGapSink sink = new TestIoTDBSchemaRegionAirGapSink(); + try { + enableBatching(sink); + + final PipeSchemaRegionWritePlanEvent batchedEvent = + createBatchableEvent("root.db.d1.s1", "pipeA", 1L); + final PipeSchemaRegionWritePlanEvent standaloneEvent = + createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L); + + sink.transfer(batchedEvent); + sink.transfer(standaloneEvent); + + Assert.assertEquals(2, sink.getSendCount()); + Assert.assertTrue(batchedEvent.isReleased()); + Assert.assertTrue(standaloneEvent.isReleased()); + Assert.assertTrue(getBatch(sink).isEmpty()); + } finally { + sink.close(); + } + } + + private PipeParameters createParameters() { + return new PipeParameters( + new HashMap() { + { + put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "100000"); + put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576"); + } + }); + } + + private void enableBatching(final Object sink) throws Exception { + setField(sink, "isTabletBatchModeEnabled", true); + setField( + sink, + "schemaRegionWritePlanEventBatch", + new PipeSchemaRegionWritePlanEventBatch(createParameters())); + } + + private PipeSchemaRegionWritePlanEventBatch getBatch(final Object sink) throws Exception { + return (PipeSchemaRegionWritePlanEventBatch) + getFieldValue(sink, "schemaRegionWritePlanEventBatch"); + } + + private PipeSchemaRegionWritePlanEvent createBatchableEvent( + final String path, final String pipeName, final long creationTime) throws Exception { + return new PipeSchemaRegionWritePlanEvent( + new CreateTimeSeriesNode( + new org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path), + new org.apache.iotdb.commons.path.MeasurementPath(path), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + pipeName, + creationTime, + null, + null, + null, + null, + null, + null, + true, + false); + } + + private PipeSchemaRegionWritePlanEvent createNonBatchableEvent( + final String path, final String pipeName, final long creationTime) throws Exception { + return new PipeSchemaRegionWritePlanEvent( + new CreateTimeSeriesNode( + new org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path + "-p"), + new org.apache.iotdb.commons.path.MeasurementPath(path), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + java.util.Collections.singletonMap("prop", "v1"), + null, + null, + null), + pipeName, + creationTime, + null, + null, + null, + null, + null, + null, + true, + false); + } + + private TPipeTransferResp createSuccessResp() { + final TPipeTransferResp resp = new TPipeTransferResp(); + resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("success")); + return resp; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + Field field = findField(target.getClass(), fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private Object getFieldValue(final Object target, final String fieldName) throws Exception { + Field field = findField(target.getClass(), fieldName); + field.setAccessible(true); + return field.get(target); + } + + private Field findField(final Class clazz, final String fieldName) + throws NoSuchFieldException { + Class current = clazz; + while (current != null) { + try { + return current.getDeclaredField(fieldName); + } catch (NoSuchFieldException ignored) { + current = current.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } + + private static class TestIoTDBSchemaRegionSyncSink extends IoTDBSchemaRegionSink { + + @Override + public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) { + return req; + } + + @Override + public void rateLimitIfNeeded( + final String pipeName, + final long creationTime, + final TEndPoint endPoint, + final long bytesLength) { + // Do nothing in tests. + } + } + + private static class TestIoTDBSchemaRegionAirGapSink extends IoTDBSchemaRegionAirGapSink { + + private int sendCount = 0; + + private TestIoTDBSchemaRegionAirGapSink() { + sockets.add(new AirGapSocket("127.0.0.1", 6667)); + isSocketAlive.add(true); + } + + @Override + protected int nextSocketIndex() { + return 0; + } + + @Override + protected boolean send( + final String pipeName, + final long creationTime, + final AirGapSocket socket, + final byte[] bytes) { + sendCount++; + return true; + } + + private int getSendCount() { + return sendCount; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java new file mode 100644 index 0000000000000..2f31fdb41f796 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink; + +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup; +import org.apache.iotdb.metrics.type.Histogram; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class PipeSchemaRegionWritePlanEventBatchTest { + + @Test + public void testBatchTimeSeriesEvents() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + Collections.singletonMap("tag", "v1"), + Collections.singletonMap("attr", "a1"), + "alias1"), + "pipeA", + 1L))); + + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateAlignedTimeSeriesNode( + new PlanNodeId("2"), + new PartialPath("root.db.d2"), + Arrays.asList("s1", "s2"), + Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE), + Arrays.asList(TSEncoding.RLE, TSEncoding.GORILLA), + Arrays.asList(CompressionType.SNAPPY, CompressionType.ZSTD), + Arrays.asList("alias2", null), + Arrays.asList(Collections.singletonMap("tag", "v2"), null), + Arrays.asList(Collections.singletonMap("attr", "a2"), null)), + "pipeA", + 1L))); + + final PlanNode planNode = batch.toPlanNode(); + Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode); + + final Map> deviceMap = + ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap(); + Assert.assertEquals(2, deviceMap.size()); + + final Pair d1Group = deviceMap.get(new PartialPath("root.db.d1")); + Assert.assertNotNull(d1Group); + Assert.assertFalse(d1Group.getLeft()); + Assert.assertEquals(Collections.singletonList("s1"), d1Group.getRight().getMeasurements()); + Assert.assertEquals(Collections.singletonList("alias1"), d1Group.getRight().getAliasList()); + Assert.assertEquals( + Collections.singletonList(Collections.singletonMap("tag", "v1")), + d1Group.getRight().getTagsList()); + Assert.assertEquals( + Collections.singletonList(Collections.singletonMap("attr", "a1")), + d1Group.getRight().getAttributesList()); + + final Pair d2Group = deviceMap.get(new PartialPath("root.db.d2")); + Assert.assertNotNull(d2Group); + Assert.assertTrue(d2Group.getLeft()); + Assert.assertEquals(Arrays.asList("s1", "s2"), d2Group.getRight().getMeasurements()); + Assert.assertEquals(Arrays.asList("alias2", null), d2Group.getRight().getAliasList()); + } + } + + @Test + public void testBatchAdditionalTimeSeriesNodeTypes() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + final Map createMultiMap = new HashMap<>(); + createMultiMap.put(new PartialPath("root.db.d1"), createMeasurementGroup("s1", "alias1")); + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateMultiTimeSeriesNode(new PlanNodeId("1"), createMultiMap), + "pipeA", + 1L))); + + Assert.assertTrue( + batch.onEvent( + createEvent( + new InternalCreateTimeSeriesNode( + new PlanNodeId("2"), + new PartialPath("root.db.d2"), + createMeasurementGroup("s2", "alias2"), + true), + "pipeA", + 1L))); + + final Map> internalCreateMultiMap = + new HashMap<>(); + internalCreateMultiMap.put( + new PartialPath("root.db.d3"), new Pair<>(false, createMeasurementGroup("s3", "alias3"))); + Assert.assertTrue( + batch.onEvent( + createEvent( + new InternalCreateMultiTimeSeriesNode( + new PlanNodeId("3"), internalCreateMultiMap), + "pipeA", + 1L))); + + final PlanNode planNode = batch.toPlanNode(); + Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode); + + final Map> deviceMap = + ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap(); + Assert.assertEquals(3, deviceMap.size()); + Assert.assertFalse(deviceMap.get(new PartialPath("root.db.d1")).getLeft()); + Assert.assertTrue(deviceMap.get(new PartialPath("root.db.d2")).getLeft()); + Assert.assertFalse(deviceMap.get(new PartialPath("root.db.d3")).getLeft()); + Assert.assertEquals( + Collections.singletonList("s2"), + deviceMap.get(new PartialPath("root.db.d2")).getRight().getMeasurements()); + Assert.assertEquals( + Collections.singletonList("alias3"), + deviceMap.get(new PartialPath("root.db.d3")).getRight().getAliasList()); + } + } + + @Test + public void testBatchTemplateActivationEvents() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + Assert.assertTrue( + batch.onEvent( + createEvent( + new ActivateTemplateNode( + new PlanNodeId("1"), new PartialPath("root.db.d1"), 1, 10), + "pipeA", + 1L))); + + final Map> templateActivationMap = new HashMap<>(); + templateActivationMap.put(new PartialPath("root.db.d2"), new Pair<>(2, 20)); + Assert.assertTrue( + batch.onEvent( + createEvent( + new BatchActivateTemplateNode(new PlanNodeId("2"), templateActivationMap), + "pipeA", + 1L))); + + final Map> internalTemplateActivationMap = + new HashMap<>(); + internalTemplateActivationMap.put(new PartialPath("root.db.d3"), new Pair<>(3, 30)); + Assert.assertTrue( + batch.onEvent( + createEvent( + new InternalBatchActivateTemplateNode( + new PlanNodeId("3"), internalTemplateActivationMap), + "pipeA", + 1L))); + + final PlanNode planNode = batch.toPlanNode(); + Assert.assertTrue(planNode instanceof BatchActivateTemplateNode); + + final Map> batchedMap = + ((BatchActivateTemplateNode) planNode).getTemplateActivationMap(); + Assert.assertEquals(3, batchedMap.size()); + Assert.assertEquals(new Pair<>(10, 1), batchedMap.get(new PartialPath("root.db.d1"))); + Assert.assertEquals(new Pair<>(2, 20), batchedMap.get(new PartialPath("root.db.d2"))); + Assert.assertEquals(new Pair<>(3, 30), batchedMap.get(new PartialPath("root.db.d3"))); + + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("4"), + new MeasurementPath("root.db.d4.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L))); + } + } + + @Test + public void testRejectDifferentPipePropsAndAlignmentConflict() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + Collections.singletonMap("prop", "v1"), + null, + null, + null), + "pipeA", + 1L))); + + final Map measurementGroupMapWithProps = new HashMap<>(); + measurementGroupMapWithProps.put( + new PartialPath("root.db.d2"), createMeasurementGroupWithProps("s1")); + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateMultiTimeSeriesNode(new PlanNodeId("2"), measurementGroupMapWithProps), + "pipeA", + 1L))); + + Assert.assertTrue( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("3"), + new MeasurementPath("root.db.d3.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L))); + + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateAlignedTimeSeriesNode( + new PlanNodeId("4"), + new PartialPath("root.db.d3"), + Collections.singletonList("s2"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(TSEncoding.RLE), + Collections.singletonList(CompressionType.SNAPPY), + null, + null, + null), + "pipeA", + 1L))); + + Assert.assertFalse( + batch.onEvent( + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("5"), + new MeasurementPath("root.db.d4.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeB", + 1L))); + } + } + + @Test + public void testDiscardEventsOfPipeRebuildsBatchAndResetsEmitWindow() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(10000, 1048576))) { + final PipeSchemaRegionWritePlanEvent removedEvent = + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L, + 1); + final PipeSchemaRegionWritePlanEvent remainingEvent = + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("2"), + new MeasurementPath("root.db.d2.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L, + 2); + + Assert.assertTrue(batch.onEvent(removedEvent)); + Assert.assertTrue(batch.onEvent(remainingEvent)); + + setField(batch, "firstEventProcessingTime", System.currentTimeMillis() - 20000); + Assert.assertTrue(batch.shouldEmit()); + + batch.discardEventsOfPipe("pipeA", 1L, 1); + + Assert.assertTrue(removedEvent.isReleased()); + Assert.assertFalse(remainingEvent.isReleased()); + Assert.assertEquals(1, batch.size()); + Assert.assertEquals("pipeA", batch.getPipeName()); + Assert.assertEquals(1L, batch.getCreationTime()); + Assert.assertFalse(batch.shouldEmit()); + + final PlanNode planNode = batch.toPlanNode(); + final Map> deviceMap = + ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap(); + Assert.assertEquals(1, deviceMap.size()); + Assert.assertTrue(deviceMap.containsKey(new PartialPath("root.db.d2"))); + } + } + + @Test + public void testRecordMetricsAndCloseReleaseEvents() throws Exception { + try (PipeSchemaRegionWritePlanEventBatch batch = + new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) { + final Histogram batchSizeHistogram = Mockito.mock(Histogram.class); + final Histogram batchTimeIntervalHistogram = Mockito.mock(Histogram.class); + final Histogram eventSizeHistogram = Mockito.mock(Histogram.class); + batch.setBatchSizeHistogram(batchSizeHistogram); + batch.setBatchTimeIntervalHistogram(batchTimeIntervalHistogram); + batch.setEventSizeHistogram(eventSizeHistogram); + + final PipeSchemaRegionWritePlanEvent event = + createEvent( + new CreateTimeSeriesNode( + new PlanNodeId("1"), + new MeasurementPath("root.db.d1.s1"), + TSDataType.INT64, + TSEncoding.PLAIN, + CompressionType.LZ4, + null, + null, + null, + null), + "pipeA", + 1L); + + Assert.assertTrue(batch.onEvent(event)); + batch.recordBatchMetrics(); + + verify(batchTimeIntervalHistogram, times(1)).update(anyLong()); + verify(batchSizeHistogram, times(1)).update(anyLong()); + verify(eventSizeHistogram, times(1)).update(1L); + + batch.close(); + Assert.assertTrue(event.isReleased()); + } + } + + private PipeParameters createParameters(final int delayInMs, final long batchSizeInBytes) { + return new PipeParameters( + new HashMap() { + { + put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, String.valueOf(delayInMs)); + put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, String.valueOf(batchSizeInBytes)); + } + }); + } + + private MeasurementGroup createMeasurementGroup(final String measurement, final String alias) { + final MeasurementGroup measurementGroup = new MeasurementGroup(); + measurementGroup.addMeasurement( + measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4); + measurementGroup.addAlias(alias); + measurementGroup.addTags(Collections.singletonMap("tag", alias)); + measurementGroup.addAttributes(Collections.singletonMap("attr", alias)); + return measurementGroup; + } + + private MeasurementGroup createMeasurementGroupWithProps(final String measurement) { + final MeasurementGroup measurementGroup = new MeasurementGroup(); + measurementGroup.addMeasurement( + measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4); + measurementGroup.addProps(Collections.singletonMap("prop", "v1")); + return measurementGroup; + } + + private PipeSchemaRegionWritePlanEvent createEvent( + final PlanNode planNode, final String pipeName, final long creationTime) { + return createEvent(planNode, pipeName, creationTime, -1); + } + + private PipeSchemaRegionWritePlanEvent createEvent( + final PlanNode planNode, final String pipeName, final long creationTime, final int regionId) { + final PipeSchemaRegionWritePlanEvent event = + new PipeSchemaRegionWritePlanEvent( + planNode, pipeName, creationTime, null, null, null, null, null, null, true, false); + event.setCommitterKeyAndCommitId(new CommitterKey(pipeName, creationTime, regionId, -1), 1L); + return event; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index ec16c181e618b..3305191b5d3ad 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -152,8 +152,10 @@ public enum Metric { UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"), UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"), PIPE_INSERT_NODE_BATCH_SIZE("pipe_insert_node_batch_size"), + PIPE_SCHEMA_BATCH_SIZE("pipe_schema_batch_size"), PIPE_TSFILE_BATCH_SIZE("pipe_tsfile_batch_size"), PIPE_INSERT_NODE_BATCH_TIME_COST("pipe_insert_node_batch_time_cost"), + PIPE_SCHEMA_BATCH_TIME_COST("pipe_schema_batch_time_cost"), PIPE_TSFILE_BATCH_TIME_COST("pipe_tsfile_batch_time_cost"), PIPE_CONNECTOR_BATCH_SIZE("pipe_connector_batch_size"), PIPE_PENDING_HANDLERS_SIZE("pipe_pending_handlers_size"),