diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index f3ddfbab7b29..0319cf5be903 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -843,13 +843,20 @@ public TSStatus visitPipeEnrichedNonWritePlanNode( public TSStatus visitPipeOperateSchemaQueueNode( final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) { final SchemaRegionId id = schemaRegion.getSchemaRegionId(); - final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id); - if (node.isOpen() && !queue.isOpened()) { - logger.info("Opened pipe listening queue on schema region {}", id); - queue.open(); - } else if (!node.isOpen() && queue.isOpened()) { - logger.info("Closed pipe listening queue on schema region {}", id); - queue.close(); + if (node.isOpen()) { + final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id); + if (!queue.isOpened()) { + logger.info("Opened pipe listening queue on schema region {}", id); + queue.open(); + } + } else { + final SchemaRegionListeningQueue queue = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(id); + if (queue != null && queue.isOpened()) { + logger.info("Closed pipe listening queue on schema region {}", id); + queue.close(); + PipeDataNodeAgent.runtime().cleanupSchemaListenerIfUnused(id); + } } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java index edab5862d11e..d904da7df315 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java @@ -125,25 +125,34 @@ public boolean isReadOnly() { @Override public boolean takeSnapshot(final File snapshotDir) { - if (schemaRegion.createSnapshot(snapshotDir) - && PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .createSnapshot(snapshotDir)) { - listen2Snapshot4PipeListener(true); + if (!schemaRegion.createSnapshot(snapshotDir)) { + return false; + } + + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener == null) { return true; } - return false; + + if (!listener.createSnapshot(snapshotDir)) { + return false; + } + listen2Snapshot4PipeListener(true); + return true; } @Override public void loadSnapshot(final File latestSnapshotRootDir) { schemaRegion.loadSnapshot(latestSnapshotRootDir); - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .loadSnapshot(latestSnapshotRootDir); - // We recompute the snapshot for pipe listener when loading snapshot - // to recover the newest snapshot in cache - listen2Snapshot4PipeListener(false); + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener != null) { + listener.loadSnapshot(latestSnapshotRootDir); + // We recompute the snapshot for pipe listener when loading snapshot + // to recover the newest snapshot in cache + listen2Snapshot4PipeListener(false); + } } public void listen2Snapshot4PipeListener(final boolean isTmp) { @@ -154,7 +163,10 @@ public void listen2Snapshot4PipeListener(final boolean isTmp) { .toString(), isTmp); final SchemaRegionListeningQueue listener = - PipeDataNodeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId()); + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener == null) { + return; + } if (Objects.isNull(snapshotPaths) || Objects.isNull(snapshotPaths.get(0))) { if (listener.isOpened()) { logger.warn( @@ -181,9 +193,11 @@ public TSStatus write(final IConsensusRequest request) { final TSStatus result = ((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion); if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .tryListenToNode((PlanNode) request); + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener != null) { + listener.tryListenToNode((PlanNode) request); + } } return result; } catch (final IllegalArgumentException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index 8f75b7b18854..3d3547f649ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -148,6 +148,10 @@ public SchemaRegionListeningQueue schemaListener(SchemaRegionId schemaRegionId) return regionListenerManager.listener(schemaRegionId); } + public SchemaRegionListeningQueue schemaListenerIfPresent(SchemaRegionId schemaRegionId) { + return regionListenerManager.listenerIfPresent(schemaRegionId); + } + public int increaseAndGetSchemaListenerReferenceCount(SchemaRegionId schemaRegionId) { return regionListenerManager.increaseAndGetReferenceCount(schemaRegionId); } @@ -156,6 +160,14 @@ public int decreaseAndGetSchemaListenerReferenceCount(SchemaRegionId schemaRegio return regionListenerManager.decreaseAndGetReferenceCount(schemaRegionId); } + public void cleanupSchemaListenerIfUnused(SchemaRegionId schemaRegionId) { + regionListenerManager.cleanupListenerIfUnused(schemaRegionId); + } + + public void clearSchemaRegionState(SchemaRegionId schemaRegionId) { + regionListenerManager.clearSchemaRegionState(schemaRegionId); + } + public void notifySchemaLeaderReady(SchemaRegionId schemaRegionId) { regionListenerManager.notifyLeaderReady(schemaRegionId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java index d28929b36a21..3b03146256bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.agent.runtime; import org.apache.iotdb.commons.consensus.SchemaRegionId; -import org.apache.iotdb.commons.pipe.agent.task.PipeTask; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionListenerMetrics; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; @@ -34,6 +33,7 @@ public class PipeSchemaRegionListenerManager { private final Map id2ListenerMap = new ConcurrentHashMap<>(); + private final Map id2LeaderReadyMap = new ConcurrentHashMap<>(); public synchronized Set regionIds() { return id2ListenerMap.keySet(); @@ -44,6 +44,12 @@ public synchronized SchemaRegionListeningQueue listener(final SchemaRegionId sch .listeningQueue; } + public synchronized SchemaRegionListeningQueue listenerIfPresent( + final SchemaRegionId schemaRegionId) { + final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId); + return listener == null ? null : listener.listeningQueue; + } + public synchronized int increaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) { return id2ListenerMap .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) @@ -52,28 +58,55 @@ public synchronized int increaseAndGetReferenceCount(final SchemaRegionId schema } public synchronized int decreaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) { - return id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .listeningQueueReferenceCount - .updateAndGet(v -> v > 0 ? v - 1 : 0); + final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId); + if (listener == null) { + return 0; + } + + final int referenceCount = + listener.listeningQueueReferenceCount.updateAndGet(v -> v > 0 ? v - 1 : 0); + if (referenceCount == 0 && !listener.listeningQueue.isOpened()) { + cleanupListenerIfUnused(schemaRegionId, listener); + } + return referenceCount; } public synchronized void notifyLeaderReady(final SchemaRegionId schemaRegionId) { - id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .notifyLeaderReady(); + id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(true); } public synchronized void notifyLeaderUnavailable(final SchemaRegionId schemaRegionId) { - id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .notifyLeaderUnavailable(); + id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(false); } public synchronized boolean isLeaderReady(final SchemaRegionId schemaRegionId) { - return id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .isLeaderReady(); + final AtomicBoolean isLeaderReady = id2LeaderReadyMap.get(schemaRegionId); + return isLeaderReady != null && isLeaderReady.get(); + } + + public synchronized void cleanupListenerIfUnused(final SchemaRegionId schemaRegionId) { + final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId); + if (listener != null) { + cleanupListenerIfUnused(schemaRegionId, listener); + } + } + + public synchronized void clearSchemaRegionState(final SchemaRegionId schemaRegionId) { + final PipeSchemaRegionListener listener = id2ListenerMap.remove(schemaRegionId); + if (listener != null) { + PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId()); + } + id2LeaderReadyMap.remove(schemaRegionId); + } + + private void cleanupListenerIfUnused( + final SchemaRegionId schemaRegionId, final PipeSchemaRegionListener listener) { + if (listener.listeningQueueReferenceCount.get() > 0 || listener.listeningQueue.isOpened()) { + return; + } + if (id2ListenerMap.remove(schemaRegionId, listener)) { + PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId()); + } } private static class PipeSchemaRegionListener { @@ -81,33 +114,9 @@ private static class PipeSchemaRegionListener { private final SchemaRegionListeningQueue listeningQueue = new SchemaRegionListeningQueue(); private final AtomicInteger listeningQueueReferenceCount = new AtomicInteger(0); - private final AtomicBoolean isLeaderReady = new AtomicBoolean(false); - protected PipeSchemaRegionListener(final SchemaRegionId schemaRegionId) { PipeSchemaRegionListenerMetrics.getInstance() .register(listeningQueue, schemaRegionId.getId()); } - - /** - * Get leader ready state, DO NOT use consensus layer's leader ready flag because - * SimpleConsensus' ready flag is always {@code true}. Note that this flag has nothing to do - * with listening and a {@link PipeTask} starts only iff the current node is a leader and ready. - * - * @return {@code true} iff the current node is a leader and ready - */ - private boolean isLeaderReady() { - return isLeaderReady.get(); - } - - // Leader ready flag has the following effect - // 1. The linked list starts serving only after leader gets ready - // 2. Config pipe task is only created after leader gets ready - private void notifyLeaderReady() { - isLeaderReady.set(true); - } - - private void notifyLeaderUnavailable() { - isLeaderReady.set(false); - } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 06d6a512b30a..36e1fe1ae9fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -39,6 +39,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; +import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; @@ -59,6 +60,7 @@ import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -267,10 +269,14 @@ private Set clearSchemaRegionListeningQueueIfNecessary( } schemaRegionId2ListeningQueueNewFirstIndex.forEach( - (schemaRegionId, listeningQueueNewFirstIndex) -> - PipeDataNodeAgent.runtime() - .schemaListener(new SchemaRegionId(schemaRegionId)) - .removeBefore(listeningQueueNewFirstIndex)); + (schemaRegionId, listeningQueueNewFirstIndex) -> { + final SchemaRegionListeningQueue listeningQueue = + PipeDataNodeAgent.runtime() + .schemaListenerIfPresent(new SchemaRegionId(schemaRegionId)); + if (listeningQueue != null) { + listeningQueue.removeBefore(listeningQueueNewFirstIndex); + } + }); return schemaRegionId2ListeningQueueNewFirstIndex.keySet(); } @@ -323,6 +329,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) { final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); + PipeEventCommitManager.getInstance().clear(pipeName, creationTime); return true; } @@ -351,6 +358,7 @@ protected boolean dropPipe(final String pipeName) { final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); + PipeEventCommitManager.getInstance().clear(pipeName, creationTime); // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the // subscribed pipe, so the subscription needs to be manually marked as completed. if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 6535d371a915..b9d90fd8a68a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -139,12 +139,14 @@ public void unbindFrom(final AbstractMetricService metricService) { } private void removeMetrics(final String pipeID) { - removeAutoGauge(pipeID); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.remove(pipeID); + if (Objects.nonNull(operator) && Objects.nonNull(metricService)) { + removeAutoGauge(operator); + } } - private void removeAutoGauge(final String pipeID) { - final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.get(pipeID); + private void removeAutoGauge(final PipeDataNodeRemainingEventAndTimeOperator operator) { metricService.remove( MetricType.AUTO_GAUGE, Metric.PIPE_DATANODE_REMAINING_EVENT_COUNT.toString(), @@ -190,14 +192,31 @@ private void removeAutoGauge(final String pipeID) { Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(), Tag.NAME.toString(), operator.getPipeName()); - remainingEventAndTimeOperatorMap.remove(pipeID); } //////////////////////////// register & deregister (pipe integration) //////////////////////////// + private static String generatePipeID(final String pipeName, final long creationTime) { + return pipeName + "_" + creationTime; + } + + private boolean isPipeAlive(final String pipeName, final long creationTime) { + return PipeDataNodeAgent.task().getPipeCreationTime(pipeName) == creationTime; + } + + private PipeDataNodeRemainingEventAndTimeOperator getOrCreateOperatorIfPipeAlive( + final String pipeName, final long creationTime) { + if (!isPipeAlive(pipeName, creationTime)) { + return null; + } + return remainingEventAndTimeOperatorMap.computeIfAbsent( + generatePipeID(pipeName, creationTime), + key -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); + } + public void register(final IoTDBDataRegionSource source) { // The metric is global thus the regionId is omitted - final String pipeID = source.getPipeName() + "_" + source.getCreationTime(); + final String pipeID = generatePipeID(source.getPipeName(), source.getCreationTime()); remainingEventAndTimeOperatorMap.computeIfAbsent( pipeID, k -> @@ -210,7 +229,7 @@ public void register(final IoTDBDataRegionSource source) { public void register(final IoTDBSchemaRegionSource source) { // The metric is global thus the regionId is omitted - final String pipeID = source.getPipeName() + "_" + source.getCreationTime(); + final String pipeID = generatePipeID(source.getPipeName(), source.getCreationTime()); remainingEventAndTimeOperatorMap .computeIfAbsent( pipeID, @@ -224,19 +243,20 @@ public void register(final IoTDBSchemaRegionSource source) { } public void increaseInsertNodeEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseInsertNodeEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseInsertNodeEventCount(); + } } public void decreaseInsertNodeEventCount( final String pipeName, final long creationTime, final long transferTime) { final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.isNull(operator)) { + return; + } operator.decreaseInsertNodeEventCount(); @@ -247,46 +267,44 @@ public void decreaseInsertNodeEventCount( public void updateInsertNodeTransferTimer( final String pipeName, final long creationTime, final long transferTime) { - if (transferTime > 0) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .getInsertNodeTransferTimer() - .update(transferTime, TimeUnit.NANOSECONDS); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator) && transferTime > 0) { + operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); } } public void increaseRawTabletEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseRawTabletEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseRawTabletEventCount(); + } } public void decreaseRawTabletEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .decreaseRawTabletEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator)) { + operator.decreaseRawTabletEventCount(); + } } public void increaseTsFileEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseTsFileEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseTsFileEventCount(); + } } public void decreaseTsFileEventCount( final String pipeName, final long creationTime, final long transferTime) { final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.isNull(operator)) { + return; + } operator.decreaseTsFileEventCount(); @@ -297,30 +315,27 @@ public void decreaseTsFileEventCount( public void updateTsFileTransferTimer( final String pipeName, final long creationTime, final long transferTime) { - if (transferTime > 0) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .getTsFileTransferTimer() - .update(transferTime, TimeUnit.NANOSECONDS); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator) && transferTime > 0) { + operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); } } public void increaseHeartbeatEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseHeartbeatEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseHeartbeatEventCount(); + } } public void decreaseHeartbeatEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .decreaseHeartbeatEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator)) { + operator.decreaseHeartbeatEventCount(); + } } public void thawRate(final String pipeID) { @@ -350,9 +365,7 @@ public void deregister(final String pipeID) { pipeID); return; } - if (Objects.nonNull(metricService)) { - removeMetrics(pipeID); - } + removeMetrics(pipeID); } public void markRegionCommit(final String pipeID, final boolean isDataRegion) { @@ -395,7 +408,7 @@ public Pair getRemainingEventAndTime( final String pipeName, final long creationTime) { final PipeDataNodeRemainingEventAndTimeOperator operator = remainingEventAndTimeOperatorMap.computeIfAbsent( - pipeName + "_" + creationTime, + generatePipeID(pipeName, creationTime), k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); return new Pair<>(operator.getRemainingNonHeartbeatEvents(), operator.getRemainingTime()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java index f9436377bb3c..90260fd17fc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java @@ -110,39 +110,47 @@ public void unbindFrom(final AbstractMetricService metricService) { } private void removeMetrics(final String pipeID) { + pipeTimerMap.remove(pipeID); metricService.remove( MetricType.TIMER, Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(), Tag.NAME.toString(), pipeID); - pipeTimerMap.remove(pipeID); + pipeRateMap.remove(pipeID); metricService.remove( MetricType.RATE, Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(), Tag.NAME.toString(), pipeID); - pipeRateMap.remove(pipeID); + pipeTabletCountMap.remove(pipeID); metricService.remove( MetricType.COUNTER, Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(), Tag.NAME.toString(), pipeID); - pipeTabletCountMap.remove(pipeID); + pipeTabletMemoryMap.remove(pipeID); metricService.remove( MetricType.COUNTER, Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(), Tag.NAME.toString(), pipeID); - pipeTabletMemoryMap.remove(pipeID); + pipeParseFileCountMap.remove(pipeID); metricService.remove( MetricType.COUNTER, Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(), Tag.NAME.toString(), pipeID); + } + + private void removePipeStats(final String pipeID) { + pipeTimerMap.remove(pipeID); + pipeRateMap.remove(pipeID); + pipeTabletCountMap.remove(pipeID); + pipeTabletMemoryMap.remove(pipeID); pipeParseFileCountMap.remove(pipeID); } @@ -165,6 +173,8 @@ public void deregister(final String pipeID) { try { if (Objects.nonNull(metricService)) { removeMetrics(pipeID); + } else { + removePipeStats(pipeID); } } finally { pipe.remove(pipeID); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 63bdbd1daec3..b6531897ba30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -842,12 +842,20 @@ public void collectWindowOutputs( @Override public void close() throws Exception { - if (Objects.nonNull(pipeName) - && pipeName2referenceCountMap.compute( - pipeName, (name, count) -> Objects.nonNull(count) ? count - 1 : 0) - == 0) { - pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).clear(); - pipeName2timeSeries2TimeSeriesRuntimeStateMap.remove(pipeName); + boolean isLastReference = false; + if (Objects.nonNull(pipeName)) { + isLastReference = + pipeName2referenceCountMap.computeIfPresent( + pipeName, (name, count) -> count > 1 ? count - 1 : null) + == null; + } + if (isLastReference) { + final ConcurrentMap> + timeSeries2RuntimeStateMap = + pipeName2timeSeries2TimeSeriesRuntimeStateMap.remove(pipeName); + if (timeSeries2RuntimeStateMap != null) { + timeSeries2RuntimeStateMap.clear(); + } pipeName2LastValueReceiveTimeMap.remove(pipeName); } if (Objects.nonNull(windowingProcessor)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 554c6c43dfbb..4030ae580c71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -743,6 +743,7 @@ public boolean isEnableSendTsFileLimit() { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index e8c4420861c5..b255e67404ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -601,6 +601,7 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); if (Objects.nonNull(tabletBatchBuilder)) { tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java index 4fc2cfbff9c6..1cd499632406 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.rpc.RpcUtils; @@ -203,7 +204,12 @@ public TSStatus deleteDataRegion(DataRegionId dataRegionId) { public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) { try { schemaEngine.deleteSchemaRegion(schemaRegionId); - PipeDataNodeAgent.runtime().schemaListener(schemaRegionId).close(); + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegionId); + if (listener != null) { + listener.close(); + } + PipeDataNodeAgent.runtime().clearSchemaRegionState(schemaRegionId); schemaRegionLockMap.remove(schemaRegionId); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); } catch (MetadataException e) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java new file mode 100644 index 000000000000..3e08d23978b2 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.runtime; + +import org.apache.iotdb.commons.consensus.SchemaRegionId; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; + +import org.junit.Assert; +import org.junit.Test; + +public class PipeSchemaRegionListenerManagerTest { + + @Test + public void testLeaderStateDoesNotCreateListener() { + final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager(); + final SchemaRegionId schemaRegionId = new SchemaRegionId(1); + + manager.notifyLeaderReady(schemaRegionId); + + Assert.assertTrue(manager.isLeaderReady(schemaRegionId)); + Assert.assertTrue(manager.regionIds().isEmpty()); + Assert.assertNull(manager.listenerIfPresent(schemaRegionId)); + } + + @Test + public void testCleanupUnusedClosedListener() { + final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager(); + final SchemaRegionId schemaRegionId = new SchemaRegionId(2); + + final SchemaRegionListeningQueue listeningQueue = manager.listener(schemaRegionId); + Assert.assertEquals(1, manager.increaseAndGetReferenceCount(schemaRegionId)); + + listeningQueue.open(); + Assert.assertEquals(0, manager.decreaseAndGetReferenceCount(schemaRegionId)); + Assert.assertNotNull(manager.listenerIfPresent(schemaRegionId)); + + listeningQueue.close(); + manager.cleanupListenerIfUnused(schemaRegionId); + + Assert.assertNull(manager.listenerIfPresent(schemaRegionId)); + Assert.assertTrue(manager.regionIds().isEmpty()); + } + + @Test + public void testAutoCleanupAfterLastReferenceReleased() { + final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager(); + final SchemaRegionId schemaRegionId = new SchemaRegionId(3); + + manager.increaseAndGetReferenceCount(schemaRegionId); + + Assert.assertEquals(0, manager.decreaseAndGetReferenceCount(schemaRegionId)); + Assert.assertNull(manager.listenerIfPresent(schemaRegionId)); + Assert.assertTrue(manager.regionIds().isEmpty()); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 99d452987943..41efbef9c047 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -60,6 +60,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -104,10 +105,14 @@ public abstract class PipeTaskAgent { protected final PipeMetaKeeper pipeMetaKeeper; protected final PipeTaskManager pipeTaskManager; + private final Map pipeNameWithCreationTime2FloatingMemoryUsageInByteMap; + private final Map> pipeName2CreationTimeSetMap; protected PipeTaskAgent() { pipeMetaKeeper = new PipeMetaKeeper(); pipeTaskManager = new PipeTaskManager(); + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap = new ConcurrentHashMap<>(); + pipeName2CreationTimeSetMap = new ConcurrentHashMap<>(); // Help PipeEndPointRateLimiter to check if the pipe is still alive PipeEndPointRateLimiter.setTaskAgent(this); @@ -601,6 +606,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) { // Remove pipe meta from pipe meta keeper pipeMetaKeeper.removePipeMeta(pipeName); + cleanupFloatingMemoryUsageCounterIfNecessary(pipeName, creationTime); return true; } @@ -639,6 +645,8 @@ protected boolean dropPipe(final String pipeName) { // Remove pipe meta from pipe meta keeper pipeMetaKeeper.removePipeMeta(pipeName); + cleanupFloatingMemoryUsageCounterIfNecessary( + pipeName, existedPipeMeta.getStaticMeta().getCreationTime()); return true; } @@ -1170,6 +1178,57 @@ public static boolean isSnapshotMode(final PipeParameters parameters) { ///////////////////////// Maintain meta info ///////////////////////// + private static String generatePipeNameWithCreationTime( + final String pipeName, final long creationTime) { + return pipeName + "_" + creationTime; + } + + private AtomicLong getOrCreateFloatingMemoryUsageCounter( + final String pipeName, final long creationTime) { + pipeName2CreationTimeSetMap + .computeIfAbsent(pipeName, key -> ConcurrentHashMap.newKeySet()) + .add(creationTime); + return pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.computeIfAbsent( + generatePipeNameWithCreationTime(pipeName, creationTime), key -> new AtomicLong(0)); + } + + private void tryCleanupFloatingMemoryUsageCounter( + final String pipeName, final long creationTime, final AtomicLong counter) { + if (counter.get() != 0) { + return; + } + + final PipeMeta currentPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + if (Objects.nonNull(currentPipeMeta) + && currentPipeMeta.getStaticMeta().getCreationTime() == creationTime) { + return; + } + + final String pipeNameWithCreationTime = + generatePipeNameWithCreationTime(pipeName, creationTime); + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.remove(pipeNameWithCreationTime, counter); + + if (!pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.containsKey( + pipeNameWithCreationTime)) { + pipeName2CreationTimeSetMap.computeIfPresent( + pipeName, + (key, creationTimes) -> { + creationTimes.remove(creationTime); + return creationTimes.isEmpty() ? null : creationTimes; + }); + } + } + + private void cleanupFloatingMemoryUsageCounterIfNecessary( + final String pipeName, final long creationTime) { + final AtomicLong counter = + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.get( + generatePipeNameWithCreationTime(pipeName, creationTime)); + if (Objects.nonNull(counter)) { + tryCleanupFloatingMemoryUsageCounter(pipeName, creationTime, counter); + } + } + public long getPipeCreationTime(final String pipeName) { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime(); @@ -1178,7 +1237,7 @@ public long getPipeCreationTime(final String pipeName) { public String getPipeNameWithCreationTime(final String pipeName, final long creationTime) { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); return pipeMeta == null - ? pipeName + "_" + creationTime + ? generatePipeNameWithCreationTime(pipeName, creationTime) : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getPipeNameWithCreationTime(); } @@ -1192,22 +1251,23 @@ public CommitterKey getCommitterKey( } public long getAllFloatingMemoryUsageInByte() { - final AtomicLong bytes = new AtomicLong(0); - pipeMetaKeeper - .getPipeMetaList() - .forEach( - pipeMeta -> - bytes.addAndGet( - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .getFloatingMemoryUsageInByte())); - return bytes.get(); + return pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.values().stream() + .mapToLong(AtomicLong::get) + .sum(); } public long getFloatingMemoryUsageInByte(final String pipeName) { - final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - return pipeMeta == null - ? 0 - : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getFloatingMemoryUsageInByte(); + final Set creationTimes = pipeName2CreationTimeSetMap.get(pipeName); + if (Objects.isNull(creationTimes)) { + return 0; + } + + return creationTimes.stream() + .map(creationTime -> generatePipeNameWithCreationTime(pipeName, creationTime)) + .map(pipeNameWithCreationTime2FloatingMemoryUsageInByteMap::get) + .filter(Objects::nonNull) + .mapToLong(AtomicLong::get) + .sum(); } public void addFloatingMemoryUsageInByte( @@ -1215,18 +1275,18 @@ public void addFloatingMemoryUsageInByte( final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); // To avoid stale pipe before alter if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) { - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .addFloatingMemoryUsageInByte(sizeInByte); + getOrCreateFloatingMemoryUsageCounter(pipeName, creationTime).addAndGet(sizeInByte); } } public void decreaseFloatingMemoryUsageInByte( final String pipeName, final long creationTime, final long sizeInByte) { - final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - // To avoid stale pipe before alter - if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) { - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .decreaseFloatingMemoryUsageInByte(sizeInByte); + final AtomicLong counter = + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.get( + generatePipeNameWithCreationTime(pipeName, creationTime)); + if (Objects.nonNull(counter)) { + counter.addAndGet(-sizeInByte); + tryCleanupFloatingMemoryUsageCounter(pipeName, creationTime, counter); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index b47441753c40..82ee87f6c8d7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -78,6 +78,13 @@ public void deregister(final String pipeName, final long creationTime, final int LOGGER.info("Pipe committer deregistered for pipe on region: {}", committerKey); } + public void clear(final String pipeName, final long creationTime) { + eventCommitterMap.keySet().removeIf(key -> clearIfMatches(key, pipeName, creationTime, true)); + eventCommitterRestartTimesMap + .keySet() + .removeIf(key -> clearIfMatches(key, pipeName, creationTime, false)); + } + /** * Assign a commit id and a key for commit. Make sure {@code EnrichedEvent.pipeName} is set before * calling this. @@ -197,6 +204,20 @@ public void setCommitRateMarker(final BiConsumer commitRateMark this.commitRateMarker = commitRateMarker; } + private boolean clearIfMatches( + final CommitterKey key, + final String pipeName, + final long creationTime, + final boolean shouldClearMetrics) { + if (!Objects.equals(key.getPipeName(), pipeName) || key.getCreationTime() != creationTime) { + return false; + } + if (shouldClearMetrics) { + PipeEventCommitMetrics.getInstance().deregister(key.stringify()); + } + return true; + } + //////////////////////////// singleton //////////////////////////// private PipeEventCommitManager() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index ebb3d90bc8ca..a7e603bd9bc4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -643,7 +643,7 @@ public void rateLimitIfNeeded( */ public synchronized void discardEventsOfPipe( final String pipeName, final long creationTime, final int regionId) { - // Do nothing by default + PIPE_END_POINT_RATE_LIMITER_MAP.remove(new Pair<>(pipeName, creationTime)); } public PipeReceiverStatusHandler statusHandler() { diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java new file mode 100644 index 000000000000..50a485df9d26 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.task; + +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; + +import org.apache.thrift.TException; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public class PipeTaskAgentTest { + + @Test + public void testFloatingMemoryUsageSurvivesDropUntilLateRelease() throws IllegalPathException { + final DummyPipeTaskAgent agent = new DummyPipeTaskAgent(); + + agent.createPipeForTest(generatePipeMeta("pipe", 1L)); + agent.addFloatingMemoryUsageInByte("pipe", 1L, 100L); + Assert.assertEquals(100L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(100L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.dropPipeForTest("pipe", 1L); + Assert.assertEquals(100L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(100L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.createPipeForTest(generatePipeMeta("pipe", 2L)); + agent.addFloatingMemoryUsageInByte("pipe", 2L, 20L); + Assert.assertEquals(120L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(120L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.decreaseFloatingMemoryUsageInByte("pipe", 1L, 100L); + Assert.assertEquals(20L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(20L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.dropPipeForTest("pipe", 2L); + agent.decreaseFloatingMemoryUsageInByte("pipe", 2L, 20L); + Assert.assertEquals(0L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(0L, agent.getFloatingMemoryUsageInByte("pipe")); + } + + @Test + public void testZeroFloatingMemoryUsageCounterIsCleanedAfterDrop() throws Exception { + final DummyPipeTaskAgent agent = new DummyPipeTaskAgent(); + + agent.createPipeForTest(generatePipeMeta("pipe", 1L)); + agent.addFloatingMemoryUsageInByte("pipe", 1L, 100L); + agent.decreaseFloatingMemoryUsageInByte("pipe", 1L, 100L); + agent.dropPipeForTest("pipe", 1L); + + Assert.assertTrue( + getMapField(agent, "pipeNameWithCreationTime2FloatingMemoryUsageInByteMap").isEmpty()); + Assert.assertTrue(getMapField(agent, "pipeName2CreationTimeSetMap").isEmpty()); + } + + @SuppressWarnings("unchecked") + private static Map getMapField(final PipeTaskAgent agent, final String fieldName) + throws NoSuchFieldException, IllegalAccessException { + final Field field = PipeTaskAgent.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (Map) field.get(agent); + } + + private static PipeMeta generatePipeMeta(final String pipeName, final long creationTime) { + return new PipeMeta( + new PipeStaticMeta( + pipeName, creationTime, new HashMap<>(), new HashMap<>(), new HashMap<>()), + new PipeRuntimeMeta(new ConcurrentHashMap<>())); + } + + private static class DummyPipeTaskAgent extends PipeTaskAgent { + + private boolean createPipeForTest(final PipeMeta pipeMeta) throws IllegalPathException { + return createPipe(pipeMeta); + } + + private boolean dropPipeForTest(final String pipeName, final long creationTime) { + return dropPipe(pipeName, creationTime); + } + + @Override + protected boolean isShutdown() { + return false; + } + + @Override + protected void thawRate(final String pipeName, final long creationTime) { + // Do nothing + } + + @Override + protected void freezeRate(final String pipeName, final long creationTime) { + // Do nothing + } + + @Override + protected Map buildPipeTasks(final PipeMeta pipeMetaFromCoordinator) { + return new HashMap<>(); + } + + @Override + protected void createPipeTask( + final int consensusGroupId, + final PipeStaticMeta pipeStaticMeta, + final org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta pipeTaskMeta) { + // Do nothing + } + + @Override + protected void collectPipeMetaListInternal( + final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { + // Do nothing + } + + @Override + public void runPipeTasks( + final Collection pipeTasks, final Consumer runSingle) { + pipeTasks.forEach(runSingle); + } + } +}